123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import typing
- from copy import deepcopy
- from fate_arch.common import FederatedMode
- from fate_arch.common.base_utils import current_timestamp, json_dumps, json_loads
- from fate_flow.controller.job_controller import JobController
- from fate_flow.db.db_models import DB, Job, Task
- from fate_flow.db.job_default_config import JobDefaultConfig
- from fate_flow.entity import JobConfigurationBase, RetCode
- from fate_flow.entity.run_status import (
- EndStatus, FederatedSchedulingStatusCode, InterruptStatus, JobInheritanceStatus,
- JobStatus, SchedulingStatusCode, StatusSet, TaskStatus,
- )
- from fate_flow.entity.types import ResourceOperation
- from fate_flow.model.sync_model import SyncModel
- from fate_flow.operation.job_saver import JobSaver
- from fate_flow.operation.job_tracker import Tracker
- from fate_flow.scheduler.federated_scheduler import FederatedScheduler
- from fate_flow.scheduler.task_scheduler import TaskScheduler
- from fate_flow.settings import ENABLE_MODEL_STORE
- from fate_flow.utils import detect_utils, job_utils, model_utils, schedule_utils
- from fate_flow.utils.config_adapter import JobRuntimeConfigAdapter
- from fate_flow.utils.cron import Cron
- from fate_flow.utils.log_utils import exception_to_trace_string, schedule_logger
- class DAGScheduler(Cron):
- @classmethod
- def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None):
- if not job_id:
- job_id = job_utils.generate_job_id()
- submit_result = {
- "job_id": job_id
- }
- schedule_logger(job_id).info(f"submit job, body {submit_job_conf.to_dict()}")
- try:
- dsl = submit_job_conf.dsl
- runtime_conf = deepcopy(submit_job_conf.runtime_conf)
- job_utils.check_job_conf(runtime_conf, dsl)
- job_initiator = runtime_conf["initiator"]
- conf_adapter = JobRuntimeConfigAdapter(runtime_conf)
- common_job_parameters = conf_adapter.get_common_parameters()
- if common_job_parameters.job_type != "predict":
- # generate job model info
- conf_version = schedule_utils.get_conf_version(runtime_conf)
- if conf_version != 2:
- raise Exception("only the v2 version runtime conf is supported")
- common_job_parameters.model_id = model_utils.gen_model_id(runtime_conf["role"])
- common_job_parameters.model_version = job_id
- train_runtime_conf = {}
- else:
- # check predict job parameters
- detect_utils.check_config(common_job_parameters.to_dict(), ["model_id", "model_version"])
- # get inference dsl from pipeline model as job dsl
- tracker = Tracker(job_id=job_id, role=job_initiator["role"], party_id=job_initiator["party_id"],
- model_id=common_job_parameters.model_id, model_version=common_job_parameters.model_version)
- if ENABLE_MODEL_STORE:
- sync_model = SyncModel(
- role=tracker.role, party_id=tracker.party_id,
- model_id=tracker.model_id, model_version=tracker.model_version,
- )
- if sync_model.remote_exists():
- sync_model.download(True)
- if not model_utils.check_if_deployed(
- role=tracker.role, party_id=tracker.party_id,
- model_id=tracker.model_id, model_version=tracker.model_version,
- ):
- raise Exception(f"model has not been deployed yet")
- pipeline_model = tracker.pipelined_model.read_pipeline_model()
- train_runtime_conf = json_loads(pipeline_model.train_runtime_conf)
- dsl = json_loads(pipeline_model.inference_dsl)
- job = Job()
- job.f_job_id = job_id
- job.f_dsl = dsl
- job.f_train_runtime_conf = train_runtime_conf
- job.f_roles = runtime_conf["role"]
- job.f_initiator_role = job_initiator["role"]
- job.f_initiator_party_id = job_initiator["party_id"]
- job.f_role = job_initiator["role"]
- job.f_party_id = job_initiator["party_id"]
- path_dict = job_utils.save_job_conf(job_id=job_id,
- role=job.f_initiator_role,
- party_id=job.f_initiator_party_id,
- dsl=dsl,
- runtime_conf=runtime_conf,
- runtime_conf_on_party={},
- train_runtime_conf=train_runtime_conf,
- pipeline_dsl=None)
- if job.f_initiator_party_id not in runtime_conf["role"][job.f_initiator_role]:
- msg = f"initiator party id {job.f_initiator_party_id} not in roles {runtime_conf['role']}"
- schedule_logger(job_id).info(msg)
- raise Exception(msg)
- # create common parameters on initiator
- JobController.create_common_job_parameters(job_id=job.f_job_id, initiator_role=job.f_initiator_role, common_job_parameters=common_job_parameters)
- job.f_runtime_conf = conf_adapter.update_common_parameters(common_parameters=common_job_parameters)
- dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
- runtime_conf=job.f_runtime_conf,
- train_runtime_conf=job.f_train_runtime_conf)
- # initiator runtime conf as template
- job.f_runtime_conf_on_party = job.f_runtime_conf.copy()
- job.f_runtime_conf_on_party["job_parameters"] = common_job_parameters.to_dict()
- # inherit job
- job.f_inheritance_info = common_job_parameters.inheritance_info
- job.f_inheritance_status = JobInheritanceStatus.WAITING if common_job_parameters.inheritance_info else JobInheritanceStatus.PASS
- if job.f_inheritance_info:
- inheritance_jobs = JobSaver.query_job(job_id=job.f_inheritance_info.get("job_id"), role=job_initiator["role"], party_id=job_initiator["party_id"])
- inheritance_tasks = JobSaver.query_task(job_id=job.f_inheritance_info.get("job_id"), role=job_initiator["role"], party_id=job_initiator["party_id"], only_latest=True)
- job_utils.check_job_inheritance_parameters(job, inheritance_jobs, inheritance_tasks)
- status_code, response = FederatedScheduler.create_job(job=job)
- if status_code != FederatedSchedulingStatusCode.SUCCESS:
- job.f_status = JobStatus.FAILED
- job.f_tag = "submit_failed"
- FederatedScheduler.sync_job_status(job=job)
- raise Exception("create job failed", response)
- else:
- need_run_components = {}
- for role in response:
- need_run_components[role] = {}
- for party, res in response[role].items():
- need_run_components[role][party] = [name for name, value in response[role][party]["data"]["components"].items() if value["need_run"] is True]
- if common_job_parameters.federated_mode == FederatedMode.MULTIPLE:
- # create the task holder in db to record information of all participants in the initiator for scheduling
- for role, party_ids in job.f_roles.items():
- for party_id in party_ids:
- if role == job.f_initiator_role and party_id == job.f_initiator_party_id:
- continue
- if not need_run_components[role][party_id]:
- continue
- JobController.initialize_tasks(job_id=job_id,
- role=role,
- party_id=party_id,
- run_on_this_party=False,
- initiator_role=job.f_initiator_role,
- initiator_party_id=job.f_initiator_party_id,
- job_parameters=common_job_parameters,
- dsl_parser=dsl_parser,
- components=need_run_components[role][party_id],
- runtime_conf=runtime_conf,
- is_scheduler=True)
- job.f_status = JobStatus.WAITING
- status_code, response = FederatedScheduler.sync_job_status(job=job)
- if status_code != FederatedSchedulingStatusCode.SUCCESS:
- raise Exception(f"set job to waiting status failed: {response}")
- schedule_logger(job_id).info(f"submit job successfully, job id is {job.f_job_id}, model id is {common_job_parameters.model_id}")
- logs_directory = job_utils.get_job_log_directory(job_id)
- result = {
- "code": RetCode.SUCCESS,
- "message": "success",
- "model_info": {"model_id": common_job_parameters.model_id, "model_version": common_job_parameters.model_version},
- "logs_directory": logs_directory,
- "board_url": job_utils.get_board_url(job_id, job_initiator["role"], job_initiator["party_id"])
- }
- warn_parameter = JobRuntimeConfigAdapter(submit_job_conf.runtime_conf).check_removed_parameter()
- if warn_parameter:
- result["message"] = f"[WARN]{warn_parameter} is removed,it does not take effect!"
- submit_result.update(result)
- submit_result.update(path_dict)
- except Exception as e:
- submit_result["code"] = RetCode.OPERATING_ERROR
- submit_result["message"] = exception_to_trace_string(e)
- schedule_logger(job_id).exception(e)
- return submit_result
- @classmethod
- def update_parameters(cls, job, job_parameters, component_parameters):
- updated_job_parameters, updated_component_parameters, updated_components = JobController.gen_updated_parameters(job_id=job.f_job_id,
- initiator_role=job.f_initiator_role,
- initiator_party_id=job.f_initiator_party_id,
- input_job_parameters=job_parameters,
- input_component_parameters=component_parameters)
- schedule_logger(job.f_job_id).info(f"components {updated_components} parameters has been updated")
- updated_parameters = {
- "job_parameters": updated_job_parameters,
- "component_parameters": updated_component_parameters,
- "components": updated_components
- }
- status_code, response = FederatedScheduler.update_parameter(job, updated_parameters=updated_parameters)
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- return RetCode.SUCCESS, updated_parameters
- else:
- return RetCode.OPERATING_ERROR, response
- def run_do(self):
- schedule_logger().info("start schedule waiting jobs")
- jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING, order_by="create_time", reverse=False)
- schedule_logger().info(f"have {len(jobs)} waiting jobs")
- if len(jobs):
- # FIFO
- job = jobs[0]
- schedule_logger().info(f"schedule waiting job {job.f_job_id}")
- try:
- self.schedule_waiting_jobs(job=job, lock=True)
- except Exception as e:
- schedule_logger(job.f_job_id).exception(e)
- schedule_logger(job.f_job_id).error("schedule waiting job failed")
- schedule_logger().info("schedule waiting jobs finished")
- schedule_logger().info("start schedule running jobs")
- jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.RUNNING, order_by="create_time", reverse=False)
- schedule_logger().info(f"have {len(jobs)} running jobs")
- for job in jobs:
- schedule_logger().info(f"schedule running job {job.f_job_id}")
- try:
- self.schedule_running_job(job=job, lock=True)
- except Exception as e:
- schedule_logger(job.f_job_id).exception(e)
- schedule_logger(job.f_job_id).error("schedule job failed")
- schedule_logger().info("schedule running jobs finished")
- # some ready job exit before start
- schedule_logger().info("start schedule ready jobs")
- jobs = JobSaver.query_job(is_initiator=True, ready_signal=True, order_by="create_time", reverse=False)
- schedule_logger().info(f"have {len(jobs)} ready jobs")
- for job in jobs:
- schedule_logger().info(f"schedule ready job {job.f_job_id}")
- try:
- self.schedule_ready_job(job=job)
- except Exception as e:
- schedule_logger(job.f_job_id).exception(e)
- schedule_logger(job.f_job_id).error(f"schedule ready job failed:\n{e}")
- schedule_logger().info("schedule ready jobs finished")
- schedule_logger().info("start schedule rerun jobs")
- jobs = JobSaver.query_job(is_initiator=True, rerun_signal=True, order_by="create_time", reverse=False)
- schedule_logger().info(f"have {len(jobs)} rerun jobs")
- for job in jobs:
- schedule_logger(job.f_job_id).info(f"schedule rerun job {job.f_job_id}")
- try:
- self.schedule_rerun_job(job=job, lock=True)
- except Exception as e:
- schedule_logger(job.f_job_id).exception(e)
- schedule_logger(job.f_job_id).error("schedule job failed")
- schedule_logger().info("schedule rerun jobs finished")
- schedule_logger().info("start schedule end status jobs to update status")
- jobs = JobSaver.query_job(is_initiator=True, status=set(EndStatus.status_list()), end_time=[current_timestamp() - JobDefaultConfig.end_status_job_scheduling_time_limit, current_timestamp()])
- schedule_logger().info(f"have {len(jobs)} end status jobs")
- for job in jobs:
- schedule_logger().info(f"schedule end status job {job.f_job_id}")
- try:
- update_status = self.end_scheduling_updates(job_id=job.f_job_id)
- if update_status:
- schedule_logger(job.f_job_id).info("try update status by scheduling like running job")
- else:
- schedule_logger(job.f_job_id).info("the number of updates has been exceeded")
- continue
- self.schedule_running_job(job=job, force_sync_status=True, lock=True)
- except Exception as e:
- schedule_logger(job.f_job_id).exception(e)
- schedule_logger(job.f_job_id).error("schedule job failed")
- schedule_logger().info("schedule end status jobs finished")
- @classmethod
- @schedule_utils.schedule_lock
- def schedule_waiting_jobs(cls, job):
- job_id, initiator_role, initiator_party_id, = job.f_job_id, job.f_initiator_role, job.f_initiator_party_id,
- if job.f_cancel_signal:
- job.f_status = JobStatus.CANCELED
- FederatedScheduler.sync_job_status(job=job)
- schedule_logger(job_id).info("job have cancel signal")
- return
- if job.f_inheritance_status != JobInheritanceStatus.PASS:
- cls.check_component(job)
- schedule_logger(job_id).info("job dependence check")
- dependence_status_code, federated_dependence_response = FederatedScheduler.dependence_for_job(job=job)
- schedule_logger(job_id).info(f"dependence check: {dependence_status_code}, {federated_dependence_response}")
- if dependence_status_code == FederatedSchedulingStatusCode.SUCCESS:
- apply_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.APPLY)
- if apply_status_code == FederatedSchedulingStatusCode.SUCCESS:
- cls.start_job(job_id=job_id, initiator_role=initiator_role, initiator_party_id=initiator_party_id)
- else:
- # rollback resource
- rollback_party = {}
- failed_party = {}
- for dest_role in federated_response.keys():
- for dest_party_id in federated_response[dest_role].keys():
- retcode = federated_response[dest_role][dest_party_id]["retcode"]
- if retcode == 0:
- rollback_party[dest_role] = rollback_party.get(dest_role, [])
- rollback_party[dest_role].append(dest_party_id)
- else:
- failed_party[dest_role] = failed_party.get(dest_role, [])
- failed_party[dest_role].append(dest_party_id)
- schedule_logger(job_id).info("job apply resource failed on {}, rollback {}".format(
- ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in failed_party.items()]),
- ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in rollback_party.items()]),
- ))
- if rollback_party:
- return_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.RETURN, specific_dest=rollback_party)
- if return_status_code != FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job_id).info(f"job return resource failed:\n{federated_response}")
- else:
- schedule_logger(job_id).info("job no party should be rollback resource")
- if apply_status_code == FederatedSchedulingStatusCode.ERROR:
- cls.stop_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id, stop_status=JobStatus.FAILED)
- schedule_logger(job_id).info("apply resource error, stop job")
- else:
- retcode_set = set()
- for dest_role in federated_dependence_response.keys():
- for party_id in federated_dependence_response[dest_role].keys():
- retcode_set.add(federated_dependence_response[dest_role][party_id]["retcode"])
- if not retcode_set.issubset({RetCode.RUNNING, RetCode.SUCCESS}):
- FederatedScheduler.stop_job(job, StatusSet.FAILED)
- @classmethod
- def check_component(cls, job, check_type="inheritance"):
- schedule_logger(job.f_job_id).info("component check")
- dependence_status_code, response = FederatedScheduler.check_component(job=job, check_type=check_type)
- schedule_logger(job.f_job_id).info(f"component check response: {response}")
- dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
- runtime_conf=job.f_runtime_conf,
- train_runtime_conf=job.f_train_runtime_conf)
- component_set = set([cpn.name for cpn in dsl_parser.get_source_connect_sub_graph(job.f_inheritance_info.get("component_list"))])
- for dest_role in response.keys():
- for party_id in response[dest_role].keys():
- component_set = component_set.intersection(set(response[dest_role][party_id].get("data")))
- if component_set != set(job.f_inheritance_info.get("component_list")):
- schedule_logger(job.f_job_id).info(f"dsl parser components:{component_set}")
- component_list = [cpn.name for cpn in dsl_parser.get_source_connect_sub_graph(list(component_set))]
- schedule_logger(job.f_job_id).info(f"parser result:{component_list}")
- command_body = {"inheritance_info": job.f_inheritance_info}
- command_body["inheritance_info"].update({"component_list": component_list})
- schedule_logger(job.f_job_id).info(f"start align job info:{command_body}")
- status_code, response = FederatedScheduler.align_args(job, command_body=command_body)
- schedule_logger(job.f_job_id).info(f"align result:{status_code}, {response}")
- schedule_logger(job.f_job_id).info("check success")
- @classmethod
- def schedule_ready_job(cls, job):
- job_id = job.f_job_id
- update_status = schedule_utils.ready_signal(job_id=job_id, set_or_reset=False, ready_timeout_ttl=60 * 1000)
- schedule_logger(job_id).info(f"reset job ready signal {update_status}")
- @classmethod
- @schedule_utils.schedule_lock
- def schedule_rerun_job(cls, job):
- if EndStatus.contains(job.f_status):
- job.f_status = JobStatus.WAITING
- job.f_ready_signal = False
- job.f_ready_time = None
- job.f_rerun_signal = False
- job.f_progress = 0
- job.f_end_time = None
- job.f_elapsed = None
- schedule_logger(job.f_job_id).info("job has been finished, set waiting to rerun")
- status, response = FederatedScheduler.sync_job_status(job=job)
- if status == FederatedSchedulingStatusCode.SUCCESS:
- schedule_utils.rerun_signal(job_id=job.f_job_id, set_or_reset=False)
- FederatedScheduler.sync_job(job=job, update_fields=["ready_signal", "ready_time", "rerun_signal", "progress", "end_time", "elapsed"])
- schedule_logger(job.f_job_id).info("job set waiting to rerun successfully")
- else:
- schedule_logger(job.f_job_id).info("job set waiting to rerun failed")
- else:
- schedule_utils.rerun_signal(job_id=job.f_job_id, set_or_reset=False)
- cls.schedule_running_job(job)
- @classmethod
- def start_job(cls, job_id, initiator_role, initiator_party_id):
- schedule_logger(job_id).info(f"try to start job on initiator {initiator_role} {initiator_party_id}")
- jobs = JobSaver.query_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id)
- if jobs:
- job = jobs[0]
- FederatedScheduler.start_job(job=job)
- schedule_logger(job_id).info(f"start job on initiator {initiator_role} {initiator_party_id}")
- else:
- schedule_logger(job_id).error(f"can not found job on initiator {initiator_role} {initiator_party_id}")
- @classmethod
- @schedule_utils.schedule_lock
- def schedule_running_job(cls, job: Job, force_sync_status=False):
- schedule_logger(job.f_job_id).info("scheduling running job")
- dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
- runtime_conf=job.f_runtime_conf_on_party,
- train_runtime_conf=job.f_train_runtime_conf)
- task_scheduling_status_code, auto_rerun_tasks, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal)
- tasks_status = dict([(task.f_component_name, task.f_status) for task in tasks])
- new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status.values())
- if new_job_status == JobStatus.WAITING and job.f_cancel_signal:
- new_job_status = JobStatus.CANCELED
- total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)
- new_progress = float(finished_count) / total * 100
- schedule_logger(job.f_job_id).info(f"job status is {new_job_status}, calculate by task status list: {tasks_status}")
- if new_job_status != job.f_status or new_progress != job.f_progress:
- # Make sure to update separately, because these two fields update with anti-weight logic
- if int(new_progress) - job.f_progress > 0:
- job.f_progress = new_progress
- FederatedScheduler.sync_job(job=job, update_fields=["progress"])
- cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])
- if new_job_status != job.f_status:
- job.f_status = new_job_status
- if EndStatus.contains(job.f_status):
- FederatedScheduler.save_pipelined_model(job=job)
- FederatedScheduler.sync_job_status(job=job)
- cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])
- if EndStatus.contains(job.f_status):
- cls.finish(job=job, end_status=job.f_status)
- if auto_rerun_tasks:
- schedule_logger(job.f_job_id).info("job have auto rerun tasks")
- cls.set_job_rerun(job_id=job.f_job_id, initiator_role=job.f_initiator_role, initiator_party_id=job.f_initiator_party_id, tasks=auto_rerun_tasks, auto=True)
- if force_sync_status:
- FederatedScheduler.sync_job_status(job=job)
- schedule_logger(job.f_job_id).info("finish scheduling running job")
- @classmethod
- def set_job_rerun(cls, job_id, initiator_role, initiator_party_id, auto, force=False,
- tasks: typing.List[Task] = None, component_name: typing.Union[str, list] = None):
- schedule_logger(job_id).info(f"try to rerun job on initiator {initiator_role} {initiator_party_id}")
- jobs = JobSaver.query_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id)
- if not jobs:
- raise RuntimeError(f"can not found job on initiator {initiator_role} {initiator_party_id}")
- job = jobs[0]
- dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
- runtime_conf=job.f_runtime_conf_on_party,
- train_runtime_conf=job.f_train_runtime_conf)
- component_name, force = cls.get_rerun_component(component_name, job, dsl_parser, force)
- schedule_logger(job_id).info(f"rerun component: {component_name}")
- if tasks:
- schedule_logger(job_id).info(f"require {[task.f_component_name for task in tasks]} to rerun")
- else:
- task_query = {
- 'job_id': job_id,
- 'role': initiator_role,
- 'party_id': initiator_party_id,
- }
- if not component_name or component_name == job_utils.PIPELINE_COMPONENT_NAME:
- # rerun all tasks
- schedule_logger(job_id).info("require all component of pipeline to rerun")
- else:
- _require_reruns = {component_name} if isinstance(component_name, str) else set(component_name)
- _should_reruns = _require_reruns.copy()
- for _cpn in _require_reruns:
- _components = dsl_parser.get_downstream_dependent_components(_cpn)
- for _c in _components:
- _should_reruns.add(_c.get_name())
- schedule_logger(job_id).info(f"require {_require_reruns} to rerun, "
- f"and then found {_should_reruns} need be to rerun")
- task_query['component_name'] = _should_reruns
- tasks = JobSaver.query_task(**task_query)
- job_can_rerun = any([TaskScheduler.prepare_rerun_task(
- job=job, task=task, dsl_parser=dsl_parser, auto=auto, force=force,
- ) for task in tasks])
- if not job_can_rerun:
- FederatedScheduler.sync_job_status(job=job)
- schedule_logger(job_id).info("job no task to rerun")
- return False
- schedule_logger(job_id).info("job set rerun signal")
- status = schedule_utils.rerun_signal(job_id=job_id, set_or_reset=True)
- schedule_logger(job_id).info(f"job set rerun signal {'successfully' if status else 'failed'}")
- return True
- @classmethod
- def get_rerun_component(cls, component_name, job, dsl_parser, force):
- if not component_name or component_name == job_utils.PIPELINE_COMPONENT_NAME:
- pass
- else:
- dependence_status_code, response = FederatedScheduler.check_component(job=job, check_type="rerun")
- success_task_list = [task.f_component_name for task in JobSaver.query_task(job_id=job.f_job_id, party_id=job.f_party_id, role=job.f_role,
- status=TaskStatus.SUCCESS, only_latest=True)]
- component_set = set()
- for dest_role in response.keys():
- for party_id in response[dest_role].keys():
- component_set = component_set.union(set(response[dest_role][party_id].get("data")))
- schedule_logger(job.f_job_id).info(f"success task list: {success_task_list}, check failed component list: {list(component_set)}")
- need_rerun = [cpn.name for cpn in dsl_parser.get_need_revisit_nodes(success_task_list, list(component_set))]
- schedule_logger(job.f_job_id).info(f"need rerun success component: {need_rerun}")
- if component_set:
- force = True
- if isinstance(component_name, str):
- component_name = set(need_rerun).union({component_name})
- else:
- component_name = set(need_rerun).union(set(component_name))
- return component_name, force
- @classmethod
- def update_job_on_initiator(cls, initiator_job: Job, update_fields: list):
- schedule_logger(initiator_job.f_job_id).info(f"try to update job {update_fields} on initiator")
- jobs = JobSaver.query_job(job_id=initiator_job.f_job_id)
- if not jobs:
- raise Exception("Failed to update job status on initiator")
- job_info = initiator_job.to_human_model_dict(only_primary_with=update_fields)
- for field in update_fields:
- job_info[field] = getattr(initiator_job, "f_%s" % field)
- for job in jobs:
- job_info["role"] = job.f_role
- job_info["party_id"] = job.f_party_id
- JobSaver.update_job_status(job_info=job_info)
- JobSaver.update_job(job_info=job_info)
- schedule_logger(initiator_job.f_job_id).info(f"update job {update_fields} on initiator finished")
- @classmethod
- def calculate_job_status(cls, task_scheduling_status_code, tasks_status):
- # 1. all waiting
- # 2. have running
- # 3. waiting + end status
- # 4. all end status and difference
- # 5. all the same end status
- tmp_status_set = set(tasks_status)
- if TaskStatus.PASS in tmp_status_set:
- tmp_status_set.remove(TaskStatus.PASS)
- tmp_status_set.add(TaskStatus.SUCCESS)
- if len(tmp_status_set) == 1:
- # 1 and 5
- return tmp_status_set.pop()
- else:
- if TaskStatus.RUNNING in tmp_status_set:
- # 2
- return JobStatus.RUNNING
- if TaskStatus.WAITING in tmp_status_set:
- # 3
- if task_scheduling_status_code == SchedulingStatusCode.HAVE_NEXT:
- return JobStatus.RUNNING
- else:
- # have waiting with no next
- pass
- # have waiting with no next or 4
- for status in sorted(InterruptStatus.status_list(), key=lambda s: StatusSet.get_level(status=s), reverse=True):
- if status in tmp_status_set:
- return status
- if tmp_status_set == {TaskStatus.WAITING, TaskStatus.SUCCESS} and task_scheduling_status_code == SchedulingStatusCode.NO_NEXT:
- return JobStatus.CANCELED
- raise Exception("calculate job status failed, all task status: {}".format(tasks_status))
- @classmethod
- def calculate_job_progress(cls, tasks_status):
- total = 0
- finished_count = 0
- for task_status in tasks_status.values():
- total += 1
- if EndStatus.contains(task_status):
- finished_count += 1
- return total, finished_count
- @classmethod
- def stop_job(cls, job_id, role, party_id, stop_status):
- schedule_logger(job_id).info(f"request stop job with {stop_status}")
- jobs = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id, is_initiator=True)
- if len(jobs) > 0:
- if stop_status == JobStatus.CANCELED:
- schedule_logger(job_id).info("cancel job")
- set_cancel_status = schedule_utils.cancel_signal(job_id=job_id, set_or_reset=True)
- schedule_logger(job_id).info(f"set job cancel signal {set_cancel_status}")
- job = jobs[0]
- job.f_status = stop_status
- schedule_logger(job_id).info(f"request stop job with {stop_status} to all party")
- status_code, response = FederatedScheduler.stop_job(job=jobs[0], stop_status=stop_status)
- if status_code == FederatedSchedulingStatusCode.SUCCESS:
- schedule_logger(job_id).info(f"stop job with {stop_status} successfully")
- return RetCode.SUCCESS, "success"
- else:
- initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)
- for initiator_task in initiator_tasks_group.values():
- TaskScheduler.collect_task_of_all_party(job, initiator_task=initiator_task, set_status=stop_status)
- schedule_logger(job_id).info(f"stop job with {stop_status} failed, {response}")
- return RetCode.FEDERATED_ERROR, json_dumps(response)
- else:
- return RetCode.SUCCESS, "can not found job"
- @classmethod
- @DB.connection_context()
- def end_scheduling_updates(cls, job_id):
- operate = Job.update({Job.f_end_scheduling_updates: Job.f_end_scheduling_updates + 1}).where(Job.f_job_id == job_id,
- Job.f_end_scheduling_updates < JobDefaultConfig.end_status_job_scheduling_updates)
- update_status = operate.execute() > 0
- return update_status
- @classmethod
- def finish(cls, job, end_status):
- schedule_logger(job.f_job_id).info(f"job finished with {end_status}, do something...")
- cls.stop_job(job_id=job.f_job_id, role=job.f_initiator_role, party_id=job.f_initiator_party_id, stop_status=end_status)
- FederatedScheduler.clean_job(job=job)
- schedule_logger(job.f_job_id).info(f"job finished with {end_status}, done")
|