#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import threading

from fate_arch.common import base_utils
from fate_flow.scheduler import SchedulerBase
from fate_flow.utils.api_utils import federated_api
from fate_flow.utils.log_utils import start_log, failed_log, successful_log, warning_log, schedule_logger
from fate_flow.entity import RetCode
from fate_flow.entity.run_status import FederatedSchedulingStatusCode
from fate_flow.entity.types import ResourceOperation, TaskCleanResourceType
from fate_flow.db.db_models import Job, Task
from fate_flow.operation.job_saver import JobSaver


class FederatedScheduler(SchedulerBase):
    """
    Send scheduling commands to the participating parties and report task
    info back to the initiator.
    """

    # Task fields reported back to the initiator
    REPORT_TO_INITIATOR_FIELDS = ["party_status", "start_time", "update_time", "end_time", "elapsed", "error_report"]

    # Job
    @classmethod
    def create_job(cls, job: Job):
        return cls.job_command(job=job, command="create", command_body=job.to_human_model_dict(), parallel=False)

    @classmethod
    def update_parameter(cls, job: Job, updated_parameters):
        return cls.job_command(job=job, command="parameter/update", command_body=updated_parameters, parallel=False)

    @classmethod
    def resource_for_job(cls, job, operation_type: ResourceOperation, specific_dest=None):
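        """
        Broadcast a resource operation (e.g. apply/return) for the job as
        command resource/{operation_type.value}; specific_dest can limit the
        call to a {role: [party_id, ...]} subset.
        """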
- schedule_logger(job.f_job_id).info(f"try to {operation_type} job resource")
- status_code, response = cls.job_command(job=job, command=f"resource/{operation_type.value}", specific_dest=specific_dest)
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job.f_job_id).info(f"{operation_type} job resource successfully")
- else:
- schedule_logger(job.f_job_id).info(f"{operation_type} job resource failed")
- return status_code, response

    @classmethod
    def check_component(cls, job, check_type, specific_dest=None):
        schedule_logger(job.f_job_id).info("try to check component inheritance dependence")
        status_code, response = cls.job_command(job=job, command=f"component/{check_type}/check", specific_dest=specific_dest)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("check component inheritance dependence successfully")
        else:
            schedule_logger(job.f_job_id).info("check component inheritance dependence failed")
        return status_code, response

    @classmethod
    def dependence_for_job(cls, job, specific_dest=None):
        schedule_logger(job.f_job_id).info("try to check job dependence")
        status_code, response = cls.job_command(job=job, command="dependence/check", specific_dest=specific_dest)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("check job dependence successfully")
        else:
            schedule_logger(job.f_job_id).info("check job dependence failed")
        return status_code, response

    @classmethod
    def connect(cls, job):
        return cls.job_command(job=job, command="align", command_body={"job_id": job.f_job_id,
                                                                       "role": job.f_role,
                                                                       "party_id": job.f_party_id})

    @classmethod
    def start_job(cls, job, command_body=None):
        return cls.job_command(job=job, command="start", command_body=command_body)

    @classmethod
    def align_args(cls, job, command_body):
        return cls.job_command(job=job, command="align", command_body=command_body)

    @classmethod
    def sync_job(cls, job, update_fields):
        sync_info = job.to_human_model_dict(only_primary_with=update_fields)
        schedule_logger(job.f_job_id).info("sync job info to all parties")
        status_code, response = cls.job_command(job=job, command="update", command_body=sync_info)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("sync job info to all parties successfully")
        else:
            schedule_logger(job.f_job_id).info(f"sync job info to all parties failed: \n{response}")
        return status_code, response

    @classmethod
    def sync_job_status(cls, job):
        schedule_logger(job.f_job_id).info(f"job is {job.f_status}, sync to all parties")
        status_code, response = cls.job_command(job=job, command=f"status/{job.f_status}", command_body=job.to_human_model_dict())
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info(f"sync job status {job.f_status} to all parties successfully")
        else:
            schedule_logger(job.f_job_id).info(f"sync job status {job.f_status} to all parties failed: \n{response}")
        return status_code, response

    @classmethod
    def save_pipelined_model(cls, job):
        schedule_logger(job.f_job_id).info("try to save job pipelined model")
        status_code, response = cls.job_command(job=job, command="model")
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("save job pipelined model successfully")
        else:
            schedule_logger(job.f_job_id).info(f"save job pipelined model failed:\n{response}")
        return status_code, response

    @classmethod
    def stop_job(cls, job, stop_status):
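        """
        Set the local job object to the target stop status, then broadcast
        stop/{stop_status} to all parties of the job.
        """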
- schedule_logger(job.f_job_id).info("try to stop job")
- job.f_status = stop_status
- status_code, response = cls.job_command(job=job, command="stop/{}".format(stop_status))
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job.f_job_id).info("stop job success")
- else:
- schedule_logger(job.f_job_id).info(f"stop job failed:\n{response}")
- return status_code, response
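
    # The two request_* commands below are sent only to the initiator
    # (dest_only_initiator=True); the initiator-side scheduler is expected to
    # drive the actual stop/rerun across all parties afterwards.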
    @classmethod
    def request_stop_job(cls, job, stop_status, command_body=None):
        return cls.job_command(job=job, command="stop/{}".format(stop_status), dest_only_initiator=True, command_body=command_body)

    @classmethod
    def request_rerun_job(cls, job, command_body):
        return cls.job_command(job=job, command="rerun", command_body=command_body, dest_only_initiator=True)

    @classmethod
    def clean_job(cls, job):
        schedule_logger(job.f_job_id).info("try to clean job")
        status_code, response = cls.job_command(job=job, command="clean", command_body=job.f_runtime_conf_on_party["role"].copy())
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("clean job successfully")
        else:
            schedule_logger(job.f_job_id).info(f"clean job failed:\n{response}")
        return status_code, response

    @classmethod
    def job_command(cls, job, command, command_body=None, dest_only_initiator=False, specific_dest=None, parallel=False):
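        """
        Send a job-level command to the destination parties.

        The destination set is, in order of precedence: only the initiator
        (dest_only_initiator=True), an explicit {role: [party_id, ...]} map
        (specific_dest), or every party in job.f_roles. Requests are sent via
        federated_command, optionally in parallel threads, and the per-party
        replies are aggregated by return_federated_response.
        """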
        federated_response = {}
        job_parameters = job.f_runtime_conf_on_party["job_parameters"]
        if dest_only_initiator:
            dest_parties = [(job.f_initiator_role, [job.f_initiator_party_id])]
            api_type = "initiator"
        elif specific_dest:
            dest_parties = specific_dest.items()
            api_type = "party"
        else:
            dest_parties = job.f_roles.items()
            api_type = "party"
        threads = []
        for dest_role, dest_party_ids in dest_parties:
            federated_response[dest_role] = {}
            for dest_party_id in dest_party_ids:
                endpoint = f"/{api_type}/{job.f_job_id}/{dest_role}/{dest_party_id}/{command}"
                args = (job.f_job_id, job.f_role, job.f_party_id, dest_role, dest_party_id, endpoint, command_body, job_parameters["federated_mode"], federated_response)
                if parallel:
                    t = threading.Thread(target=cls.federated_command, args=args)
                    threads.append(t)
                    t.start()
                else:
                    cls.federated_command(*args)
        for thread in threads:
            thread.join()
        return cls.return_federated_response(federated_response=federated_response)

    @classmethod
    def create_task(cls, job, task):
        return cls.task_command(job=job, task=task, command="create", command_body=task.to_human_model_dict())

    @classmethod
    def start_task(cls, job, task):
        return cls.task_command(job=job, task=task, command="start", command_body={}, need_user=True)

    @classmethod
    def collect_task(cls, job, task):
        return cls.task_command(job=job, task=task, command="collect")

    @classmethod
    def sync_task(cls, job, task, update_fields):
        sync_info = task.to_human_model_dict(only_primary_with=update_fields)
        schedule_logger(task.f_job_id).info("sync task {} {} info to all parties".format(task.f_task_id, task.f_task_version))
        status_code, response = cls.task_command(job=job, task=task, command="update", command_body=sync_info)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(task.f_job_id).info("sync task {} {} info to all parties successfully".format(task.f_task_id, task.f_task_version))
        else:
            schedule_logger(task.f_job_id).info("sync task {} {} info to all parties failed: \n{}".format(task.f_task_id, task.f_task_version, response))
        return status_code, response

    @classmethod
    def sync_task_status(cls, job, task):
        schedule_logger(task.f_job_id).info("task {} {} is {}, sync to all parties".format(task.f_task_id, task.f_task_version, task.f_status))
        status_code, response = cls.task_command(job=job, task=task, command=f"status/{task.f_status}")
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(task.f_job_id).info("sync task {} {} status {} to all parties successfully".format(task.f_task_id, task.f_task_version, task.f_status))
        else:
            schedule_logger(task.f_job_id).info("sync task {} {} status {} to all parties failed: \n{}".format(task.f_task_id, task.f_task_version, task.f_status, response))
        return status_code, response

    @classmethod
    def stop_task(cls, job, task, stop_status, command_body=None):
        schedule_logger(task.f_job_id).info("try to stop task {} {}".format(task.f_task_id, task.f_task_version))
        task.f_status = stop_status
        status_code, response = cls.task_command(job=job, task=task, command="stop/{}".format(stop_status), command_body=command_body)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            schedule_logger(job.f_job_id).info("stop task {} {} successfully".format(task.f_task_id, task.f_task_version))
        else:
            schedule_logger(job.f_job_id).info("stop task {} {} failed:\n{}".format(task.f_task_id, task.f_task_version, response))
        return status_code, response

    @classmethod
    def clean_task(cls, job, task, content_type: TaskCleanResourceType):
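        """
        Ask each party running this task to clean up resources of the given
        content_type (command clean/{content_type.value}).
        """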
- schedule_logger(task.f_job_id).info("try to clean task {} {} {}".format(task.f_task_id, task.f_task_version, content_type))
- status_code, response = cls.task_command(job=job, task=task, command="clean/{}".format(content_type.value))
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job.f_job_id).info("clean task {} {} {} successfully".format(task.f_task_id, task.f_task_version, content_type))
- else:
- schedule_logger(job.f_job_id).info("clean task {} {} {} failed:\n{}".format(task.f_task_id, task.f_task_version, content_type, response))
- return status_code, response

    @classmethod
    def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False):
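        """
        Send a task-level command to every party running the latest version
        of the task (as recorded by JobSaver), optionally in parallel threads.
        When need_user is set, the per-party user_id from job.f_user is added
        to command_body, so callers must pass a dict (start_task passes {}).
        """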
- msg = f"execute federated task {task.f_component_name} command({command})"
- federated_response = {}
- job_parameters = job.f_runtime_conf_on_party["job_parameters"]
- tasks = JobSaver.query_task(task_id=task.f_task_id, only_latest=True)
- threads = []
- for task in tasks:
- dest_role, dest_party_id = task.f_role, task.f_party_id
- federated_response[dest_role] = federated_response.get(dest_role, {})
- endpoint = f"/party/{task.f_job_id}/{task.f_component_name}/{task.f_task_id}/{task.f_task_version}/{dest_role}/{dest_party_id}/{command}"
- if need_user:
- command_body["user_id"] = job.f_user.get(dest_role, {}).get(str(dest_party_id), "")
- schedule_logger(job.f_job_id).info(f'user:{job.f_user}, dest_role:{dest_role}, dest_party_id:{dest_party_id}')
- schedule_logger(job.f_job_id).info(f'command_body: {command_body}')
- args = (job.f_job_id, job.f_role, job.f_party_id, dest_role, dest_party_id, endpoint, command_body, job_parameters["federated_mode"], federated_response)
- if parallel:
- t = threading.Thread(target=cls.federated_command, args=args)
- threads.append(t)
- t.start()
- else:
- cls.federated_command(*args)
- for thread in threads:
- thread.join()
- status_code, response = cls.return_federated_response(federated_response=federated_response)
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job.f_job_id).info(successful_log(msg))
- elif status_code == FederatedSchedulingStatusCode.NOT_EFFECTIVE:
- schedule_logger(job.f_job_id).warning(warning_log(msg))
- elif status_code == FederatedSchedulingStatusCode.ERROR:
- schedule_logger(job.f_job_id).critical(failed_log(msg, detail=response))
- else:
- schedule_logger(job.f_job_id).error(failed_log(msg, detail=response))
- return status_code, response

    @classmethod
    def federated_command(cls, job_id, src_role, src_party_id, dest_role, dest_party_id, endpoint, body, federated_mode, federated_response):
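        """
        POST one federated command to a single destination party and record
        the reply in federated_response[dest_role][dest_party_id]. Any
        exception is converted into a RetCode.FEDERATED_ERROR reply so that a
        failing party does not break the fan-out loop.
        """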
        st = base_utils.current_timestamp()
        log_msg = f"sending {endpoint} federated command"
        schedule_logger(job_id).info(start_log(msg=log_msg))
        try:
            response = federated_api(job_id=job_id,
                                     method='POST',
                                     endpoint=endpoint,
                                     src_role=src_role,
                                     src_party_id=src_party_id,
                                     dest_party_id=dest_party_id,
                                     json_body=body if body else {},
                                     federated_mode=federated_mode)
        except Exception as e:
            schedule_logger(job_id=job_id).exception(e)
            response = {
                "retcode": RetCode.FEDERATED_ERROR,
                "retmsg": "Federated schedule error, {}".format(e)
            }
        if response["retcode"] != RetCode.SUCCESS:
            if response["retcode"] in [RetCode.NOT_EFFECTIVE, RetCode.RUNNING]:
                schedule_logger(job_id).warning(warning_log(msg=log_msg, role=dest_role, party_id=dest_party_id))
            else:
                schedule_logger(job_id).error(failed_log(msg=log_msg, role=dest_role, party_id=dest_party_id, detail=response["retmsg"]))
        federated_response[dest_role][dest_party_id] = response
        et = base_utils.current_timestamp()
        schedule_logger(job_id).info(f"{log_msg} took {et - st} ms")

    @classmethod
    def report_task_to_initiator(cls, task: Task):
- """
- :param task:
- :return:
- """
- if task.f_role != task.f_initiator_role and task.f_party_id != task.f_initiator_party_id:
- try:
- response = federated_api(job_id=task.f_job_id,
- method='POST',
- endpoint='/initiator/{}/{}/{}/{}/{}/{}/report'.format(
- task.f_job_id,
- task.f_component_name,
- task.f_task_id,
- task.f_task_version,
- task.f_role,
- task.f_party_id),
- src_party_id=task.f_party_id,
- dest_party_id=task.f_initiator_party_id,
- src_role=task.f_role,
- json_body=task.to_human_model_dict(only_primary_with=cls.REPORT_TO_INITIATOR_FIELDS),
- federated_mode=task.f_federated_mode)
- except Exception as e:
- schedule_logger(task.f_job_id).error(f"report task to initiator error: {e}")
- return False
- if response["retcode"] != RetCode.SUCCESS:
- retmsg = response["retmsg"]
- schedule_logger(task.f_job_id).error(f"report task to initiator error: {retmsg}")
- return False
- else:
- return True
- else:
- return False

    @classmethod
    def tracker_command(cls, job, request_data, command, json_body=None):
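        """
        Forward a tracker command to the party identified in request_data
        (job_id, component_name, role, party_id) and return its raw reply.
        """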
        job_parameters = job.f_runtime_conf_on_party["job_parameters"]
        response = federated_api(job_id=str(request_data['job_id']),
                                 method='POST',
                                 endpoint='/tracker/{}/{}/{}/{}/{}'.format(
                                     request_data['job_id'],
                                     request_data['component_name'],
                                     request_data['role'],
                                     request_data['party_id'],
                                     command),
                                 src_party_id=job.f_party_id,
                                 dest_party_id=request_data['party_id'],
                                 src_role=job.f_role,
                                 json_body=json_body if json_body else {},
                                 federated_mode=job_parameters["federated_mode"])
        return response