123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- import grpc
- from fate_arch.common.base_utils import json_loads
- from fate_arch.protobuf.python import model_service_pb2, model_service_pb2_grpc
- from fate_flow.model.sync_model import SyncModel
- from fate_flow.pipelined_model import pipelined_model
- from fate_flow.pipelined_model.homo_model_deployer.model_deploy import model_deploy
- from fate_flow.settings import (
- ENABLE_MODEL_STORE, FATE_FLOW_MODEL_TRANSFER_ENDPOINT,
- GRPC_OPTIONS, HOST, HTTP_PORT, USE_REGISTRY, stat_logger,
- )
- from fate_flow.utils import model_utils
- def generate_publish_model_info(config_data):
- model_id = config_data['job_parameters']['model_id']
- model_version = config_data['job_parameters']['model_version']
- config_data['model'] = {}
- for role, role_party in config_data.get("role").items():
- config_data['model'][role] = {}
- for party_id in role_party:
- config_data['model'][role][party_id] = {
- 'model_id': model_utils.gen_party_model_id(model_id, role, party_id),
- 'model_version': model_version
- }
- def load_model(config_data):
- stat_logger.info(config_data)
- if not config_data.get('servings'):
- return 100, 'Please configure servings address'
- for serving in config_data['servings']:
- with grpc.insecure_channel(serving, GRPC_OPTIONS) as channel:
- stub = model_service_pb2_grpc.ModelServiceStub(channel)
- load_model_request = model_service_pb2.PublishRequest()
- for role_name, role_partys in config_data.get("role", {}).items():
- for _party_id in role_partys:
- load_model_request.role[role_name].partyId.append(str(_party_id))
- for role_name, role_model_config in config_data.get("model", {}).items():
- for _party_id, role_party_model_config in role_model_config.items():
- load_model_request.model[role_name].roleModelInfo[str(_party_id)].tableName = \
- role_party_model_config['model_version']
- load_model_request.model[role_name].roleModelInfo[str(_party_id)].namespace = \
- role_party_model_config['model_id']
- stat_logger.info('request serving: {} load model'.format(serving))
- load_model_request.local.role = config_data.get('local', {}).get('role', '')
- load_model_request.local.partyId = str(config_data.get('local', {}).get('party_id', ''))
- load_model_request.loadType = config_data['job_parameters'].get("load_type", "FATEFLOW")
- # make use of 'model.transfer.url' in serving server
- use_serving_url = config_data['job_parameters'].get('use_transfer_url_on_serving', False)
- if not USE_REGISTRY and not use_serving_url:
- load_model_request.filePath = f"http://{HOST}:{HTTP_PORT}{FATE_FLOW_MODEL_TRANSFER_ENDPOINT}"
- else:
- load_model_request.filePath = config_data['job_parameters'].get("file_path", "")
- stat_logger.info(load_model_request)
- response = stub.publishLoad(load_model_request)
- stat_logger.info(
- '{} {} load model status: {}'.format(load_model_request.local.role, load_model_request.local.partyId,
- response.statusCode))
- if response.statusCode != 0:
- return response.statusCode, '{} {}'.format(response.message, response.error)
- return 0, 'success'
- def bind_model_service(config_data):
- if not config_data.get('servings'):
- return 100, 'Please configure servings address'
- service_id = str(config_data.get('service_id', ''))
- initiator_role = config_data['initiator']['role']
- initiator_party_id = str(config_data['initiator']['party_id'])
- model_id = config_data['job_parameters']['model_id']
- model_version = config_data['job_parameters']['model_version']
- for serving in config_data['servings']:
- with grpc.insecure_channel(serving, GRPC_OPTIONS) as channel:
- stub = model_service_pb2_grpc.ModelServiceStub(channel)
- publish_model_request = model_service_pb2.PublishRequest()
- publish_model_request.serviceId = service_id
- # {"role": {"guest": ["9999"], "host": ["10000"], "arbiter": ["9999"]}}
- for role_name, role_party in config_data.get("role").items():
- publish_model_request.role[role_name].partyId.extend([str(party_id) for party_id in role_party])
- party_model_id = model_utils.gen_party_model_id(model_id, initiator_role, initiator_party_id)
- publish_model_request.model[initiator_role].roleModelInfo[initiator_party_id].tableName = model_version
- publish_model_request.model[initiator_role].roleModelInfo[initiator_party_id].namespace = party_model_id
- publish_model_request.local.role = initiator_role
- publish_model_request.local.partyId = initiator_party_id
- stat_logger.info(publish_model_request)
- response = stub.publishBind(publish_model_request)
- stat_logger.info(response)
- if response.statusCode != 0:
- return response.statusCode, response.message
- return 0, None
- def download_model(party_model_id, model_version):
- if ENABLE_MODEL_STORE:
- sync_model = SyncModel(
- party_model_id=party_model_id,
- model_version=model_version,
- )
- if sync_model.remote_exists():
- sync_model.download(True)
- model = pipelined_model.PipelinedModel(party_model_id, model_version)
- if not model.exists():
- return {}
- return model.collect_models(in_bytes=True)
- def convert_homo_model(request_data):
- party_model_id = model_utils.gen_party_model_id(model_id=request_data["model_id"],
- role=request_data["role"],
- party_id=request_data["party_id"])
- model_version = request_data.get("model_version")
- model = pipelined_model.PipelinedModel(model_id=party_model_id, model_version=model_version)
- if not model.exists():
- return 100, 'Model {} {} does not exist'.format(party_model_id, model_version), None
- define_meta = model.pipelined_component.get_define_meta()
- framework_name = request_data.get("framework_name")
- detail = []
- # todo: use subprocess?
- convert_tool = model.get_homo_model_convert_tool()
- for key, value in define_meta.get("model_proto", {}).items():
- if key == 'pipeline':
- continue
- for model_alias in value.keys():
- buffer_obj = model.read_component_model(key, model_alias)
- module_name = define_meta.get("component_define", {}).get(key, {}).get('module_name')
- converted_framework, converted_model = convert_tool.model_convert(model_contents=buffer_obj,
- module_name=module_name,
- framework_name=framework_name)
- if converted_model:
- converted_model_dir = os.path.join(model.pipelined_component.variables_data_path, key, model_alias, "converted_model")
- os.makedirs(converted_model_dir, exist_ok=True)
- saved_path = convert_tool.save_converted_model(converted_model,
- converted_framework,
- converted_model_dir)
- detail.append({
- "component_name": key,
- "model_alias": model_alias,
- "converted_model_path": saved_path
- })
- if len(detail) > 0:
- return (0,
- f"Conversion of homogeneous federated learning component(s) in model "
- f"{party_model_id}:{model_version} completed. Use export or homo/deploy "
- f"to download or deploy the converted model.",
- detail)
- else:
- return 100, f"No component in model {party_model_id}:{model_version} can be converted.", None
- def deploy_homo_model(request_data):
- party_model_id = model_utils.gen_party_model_id(model_id=request_data["model_id"],
- role=request_data["role"],
- party_id=request_data["party_id"])
- model_version = request_data["model_version"]
- component_name = request_data['component_name']
- service_id = request_data['service_id']
- framework_name = request_data.get('framework_name')
- model = pipelined_model.PipelinedModel(model_id=party_model_id, model_version=model_version)
- if not model.exists():
- return 100, 'Model {} {} does not exist'.format(party_model_id, model_version), None
- # get the model alias from the dsl saved with the pipeline
- pipeline = model.read_pipeline_model()
- train_dsl = json_loads(pipeline.train_dsl)
- if component_name not in train_dsl.get('components', {}):
- return 100, 'Model {} {} does not contain component {}'.\
- format(party_model_id, model_version, component_name), None
- model_alias_list = train_dsl['components'][component_name].get('output', {}).get('model')
- if not model_alias_list:
- return 100, 'Component {} in Model {} {} does not have output model'. \
- format(component_name, party_model_id, model_version), None
- # currently there is only one model output
- model_alias = model_alias_list[0]
- converted_model_dir = os.path.join(model.pipelined_component.variables_data_path, component_name, model_alias, "converted_model")
- if not os.path.isdir(converted_model_dir):
- return 100, '''Component {} in Model {} {} isn't converted'''.\
- format(component_name, party_model_id, model_version), None
- # todo: use subprocess?
- convert_tool = model.get_homo_model_convert_tool()
- if not framework_name:
- module_name = train_dsl['components'][component_name].get('module')
- buffer_obj = model.read_component_model(component_name, model_alias)
- framework_name = convert_tool.get_default_target_framework(model_contents=buffer_obj, module_name=module_name)
- model_object = convert_tool.load_converted_model(base_dir=converted_model_dir,
- framework_name=framework_name)
- deployed_service = model_deploy(party_model_id,
- model_version,
- model_object,
- framework_name,
- service_id,
- request_data['deployment_type'],
- request_data['deployment_parameters'])
- return (0,
- f"An online serving service is started in the {request_data['deployment_type']} system.",
- deployed_service)
|