dag_scheduler.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing
from copy import deepcopy

from fate_arch.common import FederatedMode
from fate_arch.common.base_utils import current_timestamp, json_dumps, json_loads
from fate_flow.controller.job_controller import JobController
from fate_flow.db.db_models import DB, Job, Task
from fate_flow.db.job_default_config import JobDefaultConfig
from fate_flow.entity import JobConfigurationBase, RetCode
from fate_flow.entity.run_status import (
    EndStatus, FederatedSchedulingStatusCode, InterruptStatus, JobInheritanceStatus,
    JobStatus, SchedulingStatusCode, StatusSet, TaskStatus,
)
from fate_flow.entity.types import ResourceOperation
from fate_flow.model.sync_model import SyncModel
from fate_flow.operation.job_saver import JobSaver
from fate_flow.operation.job_tracker import Tracker
from fate_flow.scheduler.federated_scheduler import FederatedScheduler
from fate_flow.scheduler.task_scheduler import TaskScheduler
from fate_flow.settings import ENABLE_MODEL_STORE
from fate_flow.utils import detect_utils, job_utils, model_utils, schedule_utils
from fate_flow.utils.config_adapter import JobRuntimeConfigAdapter
from fate_flow.utils.cron import Cron
from fate_flow.utils.log_utils import exception_to_trace_string, schedule_logger


class DAGScheduler(Cron):
    @classmethod
    def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None):
        if not job_id:
            job_id = job_utils.generate_job_id()
        submit_result = {
            "job_id": job_id
        }
        schedule_logger(job_id).info(f"submit job, body {submit_job_conf.to_dict()}")
        try:
            dsl = submit_job_conf.dsl
            runtime_conf = deepcopy(submit_job_conf.runtime_conf)
            job_utils.check_job_conf(runtime_conf, dsl)
            job_initiator = runtime_conf["initiator"]
            conf_adapter = JobRuntimeConfigAdapter(runtime_conf)
            common_job_parameters = conf_adapter.get_common_parameters()

            if common_job_parameters.job_type != "predict":
                # generate job model info
                conf_version = schedule_utils.get_conf_version(runtime_conf)
                if conf_version != 2:
                    raise Exception("only the v2 version runtime conf is supported")
                common_job_parameters.model_id = model_utils.gen_model_id(runtime_conf["role"])
                common_job_parameters.model_version = job_id
                train_runtime_conf = {}
            else:
                # check predict job parameters
                detect_utils.check_config(common_job_parameters.to_dict(), ["model_id", "model_version"])
                # get inference dsl from pipeline model as job dsl
                tracker = Tracker(job_id=job_id, role=job_initiator["role"], party_id=job_initiator["party_id"],
                                  model_id=common_job_parameters.model_id, model_version=common_job_parameters.model_version)
                if ENABLE_MODEL_STORE:
                    sync_model = SyncModel(
                        role=tracker.role, party_id=tracker.party_id,
                        model_id=tracker.model_id, model_version=tracker.model_version,
                    )
                    if sync_model.remote_exists():
                        sync_model.download(True)
                if not model_utils.check_if_deployed(
                    role=tracker.role, party_id=tracker.party_id,
                    model_id=tracker.model_id, model_version=tracker.model_version,
                ):
                    raise Exception("model has not been deployed yet")
                pipeline_model = tracker.pipelined_model.read_pipeline_model()
                train_runtime_conf = json_loads(pipeline_model.train_runtime_conf)
                dsl = json_loads(pipeline_model.inference_dsl)

            job = Job()
            job.f_job_id = job_id
            job.f_dsl = dsl
            job.f_train_runtime_conf = train_runtime_conf
            job.f_roles = runtime_conf["role"]
            job.f_initiator_role = job_initiator["role"]
            job.f_initiator_party_id = job_initiator["party_id"]
            job.f_role = job_initiator["role"]
            job.f_party_id = job_initiator["party_id"]

            path_dict = job_utils.save_job_conf(job_id=job_id,
                                                role=job.f_initiator_role,
                                                party_id=job.f_initiator_party_id,
                                                dsl=dsl,
                                                runtime_conf=runtime_conf,
                                                runtime_conf_on_party={},
                                                train_runtime_conf=train_runtime_conf,
                                                pipeline_dsl=None)

            if job.f_initiator_party_id not in runtime_conf["role"][job.f_initiator_role]:
                msg = f"initiator party id {job.f_initiator_party_id} not in roles {runtime_conf['role']}"
                schedule_logger(job_id).info(msg)
                raise Exception(msg)

            # create common parameters on initiator
            JobController.create_common_job_parameters(job_id=job.f_job_id, initiator_role=job.f_initiator_role, common_job_parameters=common_job_parameters)
            job.f_runtime_conf = conf_adapter.update_common_parameters(common_parameters=common_job_parameters)
            dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
                                                           runtime_conf=job.f_runtime_conf,
                                                           train_runtime_conf=job.f_train_runtime_conf)

            # initiator runtime conf as template
            job.f_runtime_conf_on_party = job.f_runtime_conf.copy()
            job.f_runtime_conf_on_party["job_parameters"] = common_job_parameters.to_dict()

            # inherit job
            job.f_inheritance_info = common_job_parameters.inheritance_info
            job.f_inheritance_status = JobInheritanceStatus.WAITING if common_job_parameters.inheritance_info else JobInheritanceStatus.PASS
            if job.f_inheritance_info:
                inheritance_jobs = JobSaver.query_job(job_id=job.f_inheritance_info.get("job_id"), role=job_initiator["role"], party_id=job_initiator["party_id"])
                inheritance_tasks = JobSaver.query_task(job_id=job.f_inheritance_info.get("job_id"), role=job_initiator["role"], party_id=job_initiator["party_id"], only_latest=True)
                job_utils.check_job_inheritance_parameters(job, inheritance_jobs, inheritance_tasks)

            status_code, response = FederatedScheduler.create_job(job=job)
            if status_code != FederatedSchedulingStatusCode.SUCCESS:
                job.f_status = JobStatus.FAILED
                job.f_tag = "submit_failed"
                FederatedScheduler.sync_job_status(job=job)
                raise Exception("create job failed", response)
            else:
                need_run_components = {}
                for role in response:
                    need_run_components[role] = {}
                    for party, res in response[role].items():
                        need_run_components[role][party] = [name for name, value in response[role][party]["data"]["components"].items() if value["need_run"] is True]
                if common_job_parameters.federated_mode == FederatedMode.MULTIPLE:
                    # create the task holders in db on the initiator to record information of all participants for scheduling
                    for role, party_ids in job.f_roles.items():
                        for party_id in party_ids:
                            if role == job.f_initiator_role and party_id == job.f_initiator_party_id:
                                continue
                            if not need_run_components[role][party_id]:
                                continue
                            JobController.initialize_tasks(job_id=job_id,
                                                           role=role,
                                                           party_id=party_id,
                                                           run_on_this_party=False,
                                                           initiator_role=job.f_initiator_role,
                                                           initiator_party_id=job.f_initiator_party_id,
                                                           job_parameters=common_job_parameters,
                                                           dsl_parser=dsl_parser,
                                                           components=need_run_components[role][party_id],
                                                           runtime_conf=runtime_conf,
                                                           is_scheduler=True)
                job.f_status = JobStatus.WAITING
                status_code, response = FederatedScheduler.sync_job_status(job=job)
                if status_code != FederatedSchedulingStatusCode.SUCCESS:
                    raise Exception(f"set job to waiting status failed: {response}")

            schedule_logger(job_id).info(f"submit job successfully, job id is {job.f_job_id}, model id is {common_job_parameters.model_id}")
            logs_directory = job_utils.get_job_log_directory(job_id)
            result = {
                "code": RetCode.SUCCESS,
                "message": "success",
                "model_info": {"model_id": common_job_parameters.model_id, "model_version": common_job_parameters.model_version},
                "logs_directory": logs_directory,
                "board_url": job_utils.get_board_url(job_id, job_initiator["role"], job_initiator["party_id"])
            }
            warn_parameter = JobRuntimeConfigAdapter(submit_job_conf.runtime_conf).check_removed_parameter()
            if warn_parameter:
                result["message"] = f"[WARN] {warn_parameter} is removed, it does not take effect!"
            submit_result.update(result)
            submit_result.update(path_dict)
        except Exception as e:
            submit_result["code"] = RetCode.OPERATING_ERROR
            submit_result["message"] = exception_to_trace_string(e)
            schedule_logger(job_id).exception(e)
        return submit_result
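
    # A minimal, illustrative v2 runtime conf accepted by submit(); the top-level
    # keys follow what the method reads above ("initiator", "role", and the
    # "job_parameters" consumed via JobRuntimeConfigAdapter), but the concrete
    # values are assumptions for illustration, not a complete reference:
    #
    # {
    #     "dsl_version": 2,
    #     "initiator": {"role": "guest", "party_id": 9999},
    #     "role": {"guest": [9999], "host": [10000], "arbiter": [10000]},
    #     "job_parameters": {"common": {"job_type": "train", "federated_mode": "MULTIPLE"}}
    # }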

    @classmethod
    def update_parameters(cls, job, job_parameters, component_parameters):
        updated_job_parameters, updated_component_parameters, updated_components = JobController.gen_updated_parameters(
            job_id=job.f_job_id,
            initiator_role=job.f_initiator_role,
            initiator_party_id=job.f_initiator_party_id,
            input_job_parameters=job_parameters,
            input_component_parameters=component_parameters)
        schedule_logger(job.f_job_id).info(f"components {updated_components} parameters have been updated")
        updated_parameters = {
            "job_parameters": updated_job_parameters,
            "component_parameters": updated_component_parameters,
            "components": updated_components
        }
        status_code, response = FederatedScheduler.update_parameter(job, updated_parameters=updated_parameters)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            return RetCode.SUCCESS, updated_parameters
        else:
            return RetCode.OPERATING_ERROR, response

    def run_do(self):
        schedule_logger().info("start schedule waiting jobs")
        jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING, order_by="create_time", reverse=False)
        schedule_logger().info(f"have {len(jobs)} waiting jobs")
        if len(jobs):
            # FIFO
            job = jobs[0]
            schedule_logger().info(f"schedule waiting job {job.f_job_id}")
            try:
                self.schedule_waiting_jobs(job=job, lock=True)
            except Exception as e:
                schedule_logger(job.f_job_id).exception(e)
                schedule_logger(job.f_job_id).error("schedule waiting job failed")
        schedule_logger().info("schedule waiting jobs finished")

        schedule_logger().info("start schedule running jobs")
        jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.RUNNING, order_by="create_time", reverse=False)
        schedule_logger().info(f"have {len(jobs)} running jobs")
        for job in jobs:
            schedule_logger().info(f"schedule running job {job.f_job_id}")
            try:
                self.schedule_running_job(job=job, lock=True)
            except Exception as e:
                schedule_logger(job.f_job_id).exception(e)
                schedule_logger(job.f_job_id).error("schedule job failed")
        schedule_logger().info("schedule running jobs finished")

        # some ready jobs exit before start
        schedule_logger().info("start schedule ready jobs")
        jobs = JobSaver.query_job(is_initiator=True, ready_signal=True, order_by="create_time", reverse=False)
        schedule_logger().info(f"have {len(jobs)} ready jobs")
        for job in jobs:
            schedule_logger().info(f"schedule ready job {job.f_job_id}")
            try:
                self.schedule_ready_job(job=job)
            except Exception as e:
                schedule_logger(job.f_job_id).exception(e)
                schedule_logger(job.f_job_id).error(f"schedule ready job failed:\n{e}")
        schedule_logger().info("schedule ready jobs finished")

        schedule_logger().info("start schedule rerun jobs")
        jobs = JobSaver.query_job(is_initiator=True, rerun_signal=True, order_by="create_time", reverse=False)
        schedule_logger().info(f"have {len(jobs)} rerun jobs")
        for job in jobs:
            schedule_logger(job.f_job_id).info(f"schedule rerun job {job.f_job_id}")
            try:
                self.schedule_rerun_job(job=job, lock=True)
            except Exception as e:
                schedule_logger(job.f_job_id).exception(e)
                schedule_logger(job.f_job_id).error("schedule job failed")
        schedule_logger().info("schedule rerun jobs finished")

        schedule_logger().info("start schedule end status jobs to update status")
        jobs = JobSaver.query_job(is_initiator=True, status=set(EndStatus.status_list()), end_time=[current_timestamp() - JobDefaultConfig.end_status_job_scheduling_time_limit, current_timestamp()])
        schedule_logger().info(f"have {len(jobs)} end status jobs")
        for job in jobs:
            schedule_logger().info(f"schedule end status job {job.f_job_id}")
            try:
                update_status = self.end_scheduling_updates(job_id=job.f_job_id)
                if update_status:
                    schedule_logger(job.f_job_id).info("try to update status by scheduling like a running job")
                else:
                    schedule_logger(job.f_job_id).info("the number of updates has been exceeded")
                    continue
                self.schedule_running_job(job=job, force_sync_status=True, lock=True)
            except Exception as e:
                schedule_logger(job.f_job_id).exception(e)
                schedule_logger(job.f_job_id).error("schedule job failed")
        schedule_logger().info("schedule end status jobs finished")
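
    # run_do() is driven by the Cron base class. A minimal sketch of the presumed
    # driver (the constructor arguments and start() are assumptions for
    # illustration, not a documented fate_flow API):
    #
    #     scheduler = DAGScheduler(interval=2 * 1000)  # assumed: poll interval in ms
    #     scheduler.start()                            # periodically invokes run_do()
    #
    # Each pass handles the queues in a fixed order: waiting (FIFO, one job per
    # pass), running, ready, rerun, then recently ended jobs whose status may
    # still need syncing.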

    @classmethod
    @schedule_utils.schedule_lock
    def schedule_waiting_jobs(cls, job):
        job_id, initiator_role, initiator_party_id = job.f_job_id, job.f_initiator_role, job.f_initiator_party_id
        if job.f_cancel_signal:
            job.f_status = JobStatus.CANCELED
            FederatedScheduler.sync_job_status(job=job)
            schedule_logger(job_id).info("job has cancel signal")
            return
        if job.f_inheritance_status != JobInheritanceStatus.PASS:
            cls.check_component(job)
        schedule_logger(job_id).info("job dependence check")
        dependence_status_code, federated_dependence_response = FederatedScheduler.dependence_for_job(job=job)
        schedule_logger(job_id).info(f"dependence check: {dependence_status_code}, {federated_dependence_response}")
        if dependence_status_code == FederatedSchedulingStatusCode.SUCCESS:
            apply_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.APPLY)
            if apply_status_code == FederatedSchedulingStatusCode.SUCCESS:
                cls.start_job(job_id=job_id, initiator_role=initiator_role, initiator_party_id=initiator_party_id)
            else:
                # rollback resource
                rollback_party = {}
                failed_party = {}
                for dest_role in federated_response.keys():
                    for dest_party_id in federated_response[dest_role].keys():
                        retcode = federated_response[dest_role][dest_party_id]["retcode"]
                        if retcode == 0:
                            rollback_party[dest_role] = rollback_party.get(dest_role, [])
                            rollback_party[dest_role].append(dest_party_id)
                        else:
                            failed_party[dest_role] = failed_party.get(dest_role, [])
                            failed_party[dest_role].append(dest_party_id)
                schedule_logger(job_id).info("job apply resource failed on {}, rollback {}".format(
                    ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in failed_party.items()]),
                    ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in rollback_party.items()]),
                ))
                if rollback_party:
                    return_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.RETURN, specific_dest=rollback_party)
                    if return_status_code != FederatedSchedulingStatusCode.SUCCESS:
                        schedule_logger(job_id).info(f"job return resource failed:\n{federated_response}")
                else:
                    schedule_logger(job_id).info("job has no party resources to roll back")
                if apply_status_code == FederatedSchedulingStatusCode.ERROR:
                    cls.stop_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id, stop_status=JobStatus.FAILED)
                    schedule_logger(job_id).info("apply resource error, stop job")
        else:
            retcode_set = set()
            for dest_role in federated_dependence_response.keys():
                for party_id in federated_dependence_response[dest_role].keys():
                    retcode_set.add(federated_dependence_response[dest_role][party_id]["retcode"])
            if not retcode_set.issubset({RetCode.RUNNING, RetCode.SUCCESS}):
                FederatedScheduler.stop_job(job, StatusSet.FAILED)
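
    # Worked example of the rollback bookkeeping above (illustrative values):
    # if applying resources returns retcode 0 for guest 9999 but a non-zero
    # retcode for host 10000, then
    #     rollback_party == {"guest": [9999]}   # succeeded, must be returned
    #     failed_party   == {"host": [10000]}   # failed, nothing to return
    # so only guest 9999 receives a ResourceOperation.RETURN request.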

    @classmethod
    def check_component(cls, job, check_type="inheritance"):
        schedule_logger(job.f_job_id).info("component check")
        dependence_status_code, response = FederatedScheduler.check_component(job=job, check_type=check_type)
        schedule_logger(job.f_job_id).info(f"component check response: {response}")
        dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
                                                       runtime_conf=job.f_runtime_conf,
                                                       train_runtime_conf=job.f_train_runtime_conf)
        component_set = set(cpn.name for cpn in dsl_parser.get_source_connect_sub_graph(job.f_inheritance_info.get("component_list")))
        for dest_role in response.keys():
            for party_id in response[dest_role].keys():
                component_set = component_set.intersection(set(response[dest_role][party_id].get("data")))
        if component_set != set(job.f_inheritance_info.get("component_list")):
            schedule_logger(job.f_job_id).info(f"dsl parser components: {component_set}")
            component_list = [cpn.name for cpn in dsl_parser.get_source_connect_sub_graph(list(component_set))]
            schedule_logger(job.f_job_id).info(f"parser result: {component_list}")
            command_body = {"inheritance_info": job.f_inheritance_info}
            command_body["inheritance_info"].update({"component_list": component_list})
            schedule_logger(job.f_job_id).info(f"start align job info: {command_body}")
            status_code, response = FederatedScheduler.align_args(job, command_body=command_body)
            schedule_logger(job.f_job_id).info(f"align result: {status_code}, {response}")
        schedule_logger(job.f_job_id).info("check success")

    @classmethod
    def schedule_ready_job(cls, job):
        job_id = job.f_job_id
        update_status = schedule_utils.ready_signal(job_id=job_id, set_or_reset=False, ready_timeout_ttl=60 * 1000)
        schedule_logger(job_id).info(f"reset job ready signal {update_status}")

    @classmethod
    @schedule_utils.schedule_lock
    def schedule_rerun_job(cls, job):
        if EndStatus.contains(job.f_status):
            job.f_status = JobStatus.WAITING
            job.f_ready_signal = False
            job.f_ready_time = None
            job.f_rerun_signal = False
            job.f_progress = 0
            job.f_end_time = None
            job.f_elapsed = None
            schedule_logger(job.f_job_id).info("job has finished, set to waiting for rerun")
            status, response = FederatedScheduler.sync_job_status(job=job)
            if status == FederatedSchedulingStatusCode.SUCCESS:
                schedule_utils.rerun_signal(job_id=job.f_job_id, set_or_reset=False)
                FederatedScheduler.sync_job(job=job, update_fields=["ready_signal", "ready_time", "rerun_signal", "progress", "end_time", "elapsed"])
                schedule_logger(job.f_job_id).info("job set to waiting for rerun successfully")
            else:
                schedule_logger(job.f_job_id).info("job set to waiting for rerun failed")
        else:
            schedule_utils.rerun_signal(job_id=job.f_job_id, set_or_reset=False)
            cls.schedule_running_job(job)

    @classmethod
    def start_job(cls, job_id, initiator_role, initiator_party_id):
        schedule_logger(job_id).info(f"try to start job on initiator {initiator_role} {initiator_party_id}")
        jobs = JobSaver.query_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id)
        if jobs:
            job = jobs[0]
            FederatedScheduler.start_job(job=job)
            schedule_logger(job_id).info(f"start job on initiator {initiator_role} {initiator_party_id}")
        else:
            schedule_logger(job_id).error(f"cannot find job on initiator {initiator_role} {initiator_party_id}")

    @classmethod
    @schedule_utils.schedule_lock
    def schedule_running_job(cls, job: Job, force_sync_status=False):
        schedule_logger(job.f_job_id).info("scheduling running job")
        dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
                                                       runtime_conf=job.f_runtime_conf_on_party,
                                                       train_runtime_conf=job.f_train_runtime_conf)
        task_scheduling_status_code, auto_rerun_tasks, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal)
        tasks_status = dict([(task.f_component_name, task.f_status) for task in tasks])
        new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status.values())
        if new_job_status == JobStatus.WAITING and job.f_cancel_signal:
            new_job_status = JobStatus.CANCELED
        total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)
        new_progress = float(finished_count) / total * 100
        schedule_logger(job.f_job_id).info(f"job status is {new_job_status}, calculated from task status list: {tasks_status}")
        if new_job_status != job.f_status or new_progress != job.f_progress:
            # update the two fields separately: each one only ever moves forward, never backward
            if int(new_progress) - job.f_progress > 0:
                job.f_progress = new_progress
                FederatedScheduler.sync_job(job=job, update_fields=["progress"])
                cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])
            if new_job_status != job.f_status:
                job.f_status = new_job_status
                if EndStatus.contains(job.f_status):
                    FederatedScheduler.save_pipelined_model(job=job)
                FederatedScheduler.sync_job_status(job=job)
                cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])
        if EndStatus.contains(job.f_status):
            cls.finish(job=job, end_status=job.f_status)
        if auto_rerun_tasks:
            schedule_logger(job.f_job_id).info("job has auto rerun tasks")
            cls.set_job_rerun(job_id=job.f_job_id, initiator_role=job.f_initiator_role, initiator_party_id=job.f_initiator_party_id, tasks=auto_rerun_tasks, auto=True)
        if force_sync_status:
            FederatedScheduler.sync_job_status(job=job)
        schedule_logger(job.f_job_id).info("finish scheduling running job")
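
    # Worked example of the forward-only progress update above (illustrative
    # numbers): with 4 of 5 tasks in an end status, new_progress = 4 / 5 * 100
    # = 80.0; the sync fires only if int(80.0) exceeds the stored
    # job.f_progress, so a stale pass that recomputes 60.0 after 80 has been
    # recorded will not regress the persisted value.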

    @classmethod
    def set_job_rerun(cls, job_id, initiator_role, initiator_party_id, auto, force=False,
                      tasks: typing.List[Task] = None, component_name: typing.Union[str, list] = None):
        schedule_logger(job_id).info(f"try to rerun job on initiator {initiator_role} {initiator_party_id}")
        jobs = JobSaver.query_job(job_id=job_id, role=initiator_role, party_id=initiator_party_id)
        if not jobs:
            raise RuntimeError(f"cannot find job on initiator {initiator_role} {initiator_party_id}")
        job = jobs[0]
        dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
                                                       runtime_conf=job.f_runtime_conf_on_party,
                                                       train_runtime_conf=job.f_train_runtime_conf)
        component_name, force = cls.get_rerun_component(component_name, job, dsl_parser, force)
        schedule_logger(job_id).info(f"rerun component: {component_name}")
        if tasks:
            schedule_logger(job_id).info(f"require {[task.f_component_name for task in tasks]} to rerun")
        else:
            task_query = {
                'job_id': job_id,
                'role': initiator_role,
                'party_id': initiator_party_id,
            }
            if not component_name or component_name == job_utils.PIPELINE_COMPONENT_NAME:
                # rerun all tasks
                schedule_logger(job_id).info("require all components of the pipeline to rerun")
            else:
                _require_reruns = {component_name} if isinstance(component_name, str) else set(component_name)
                _should_reruns = _require_reruns.copy()
                for _cpn in _require_reruns:
                    _components = dsl_parser.get_downstream_dependent_components(_cpn)
                    for _c in _components:
                        _should_reruns.add(_c.get_name())
                schedule_logger(job_id).info(f"require {_require_reruns} to rerun, "
                                             f"and then found {_should_reruns} need to be rerun")
                task_query['component_name'] = _should_reruns
            tasks = JobSaver.query_task(**task_query)
        job_can_rerun = any([TaskScheduler.prepare_rerun_task(
            job=job, task=task, dsl_parser=dsl_parser, auto=auto, force=force,
        ) for task in tasks])
        if not job_can_rerun:
            FederatedScheduler.sync_job_status(job=job)
            schedule_logger(job_id).info("job has no task to rerun")
            return False
        schedule_logger(job_id).info("job set rerun signal")
        status = schedule_utils.rerun_signal(job_id=job_id, set_or_reset=True)
        schedule_logger(job_id).info(f"job set rerun signal {'successfully' if status else 'failed'}")
        return True
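
    # Illustrative downstream closure for the rerun logic above (component
    # names are made up): in a DAG reader_0 -> intersection_0 -> hetero_lr_0
    # -> evaluation_0, requesting a rerun of "intersection_0" also schedules
    # "hetero_lr_0" and "evaluation_0", because
    # get_downstream_dependent_components() pulls in every component that
    # consumes the rerun component's output.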

    @classmethod
    def get_rerun_component(cls, component_name, job, dsl_parser, force):
        if component_name and component_name != job_utils.PIPELINE_COMPONENT_NAME:
            dependence_status_code, response = FederatedScheduler.check_component(job=job, check_type="rerun")
            success_task_list = [task.f_component_name for task in JobSaver.query_task(job_id=job.f_job_id, party_id=job.f_party_id, role=job.f_role,
                                                                                       status=TaskStatus.SUCCESS, only_latest=True)]
            component_set = set()
            for dest_role in response.keys():
                for party_id in response[dest_role].keys():
                    component_set = component_set.union(set(response[dest_role][party_id].get("data")))
            schedule_logger(job.f_job_id).info(f"success task list: {success_task_list}, check failed component list: {list(component_set)}")
            need_rerun = [cpn.name for cpn in dsl_parser.get_need_revisit_nodes(success_task_list, list(component_set))]
            schedule_logger(job.f_job_id).info(f"need rerun success component: {need_rerun}")
            if component_set:
                force = True
            if isinstance(component_name, str):
                component_name = set(need_rerun).union({component_name})
            else:
                component_name = set(need_rerun).union(set(component_name))
        return component_name, force

    @classmethod
    def update_job_on_initiator(cls, initiator_job: Job, update_fields: list):
        schedule_logger(initiator_job.f_job_id).info(f"try to update job {update_fields} on initiator")
        jobs = JobSaver.query_job(job_id=initiator_job.f_job_id)
        if not jobs:
            raise Exception("Failed to update job status on initiator")
        job_info = initiator_job.to_human_model_dict(only_primary_with=update_fields)
        for field in update_fields:
            job_info[field] = getattr(initiator_job, "f_%s" % field)
        for job in jobs:
            job_info["role"] = job.f_role
            job_info["party_id"] = job.f_party_id
            JobSaver.update_job_status(job_info=job_info)
            JobSaver.update_job(job_info=job_info)
        schedule_logger(initiator_job.f_job_id).info(f"update job {update_fields} on initiator finished")

    @classmethod
    def calculate_job_status(cls, task_scheduling_status_code, tasks_status):
        # 1. all waiting
        # 2. have running
        # 3. waiting + end status
        # 4. all end status but different
        # 5. all the same end status
        tmp_status_set = set(tasks_status)
        if TaskStatus.PASS in tmp_status_set:
            tmp_status_set.remove(TaskStatus.PASS)
            tmp_status_set.add(TaskStatus.SUCCESS)
        if len(tmp_status_set) == 1:
            # cases 1 and 5
            return tmp_status_set.pop()
        else:
            if TaskStatus.RUNNING in tmp_status_set:
                # case 2
                return JobStatus.RUNNING
            if TaskStatus.WAITING in tmp_status_set:
                # case 3
                if task_scheduling_status_code == SchedulingStatusCode.HAVE_NEXT:
                    return JobStatus.RUNNING
                # else: waiting tasks remain but none can be scheduled; fall through
            # waiting with no next, or case 4: pick the highest-level interrupt status
            for status in sorted(InterruptStatus.status_list(), key=lambda s: StatusSet.get_level(status=s), reverse=True):
                if status in tmp_status_set:
                    return status
            if tmp_status_set == {TaskStatus.WAITING, TaskStatus.SUCCESS} and task_scheduling_status_code == SchedulingStatusCode.NO_NEXT:
                return JobStatus.CANCELED
        raise Exception("calculate job status failed, all task status: {}".format(tasks_status))
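
    # Examples of the calculation above (illustrative task status sets):
    #   {RUNNING, SUCCESS}             -> JobStatus.RUNNING   (case 2)
    #   {WAITING, SUCCESS} + HAVE_NEXT -> JobStatus.RUNNING   (case 3)
    #   {WAITING, SUCCESS} + NO_NEXT   -> JobStatus.CANCELED
    #   {SUCCESS, FAILED}              -> the highest-level InterruptStatus
    #                                     present, i.e. FAILED (case 4)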

    @classmethod
    def calculate_job_progress(cls, tasks_status):
        total = 0
        finished_count = 0
        for task_status in tasks_status.values():
            total += 1
            if EndStatus.contains(task_status):
                finished_count += 1
        return total, finished_count
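
    # Worked example: a tasks_status dict with 5 entries of which 3 are in an
    # end status returns (5, 3); the caller then reports 3 / 5 * 100 = 60.0 as
    # the job progress.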

    @classmethod
    def stop_job(cls, job_id, role, party_id, stop_status):
        schedule_logger(job_id).info(f"request stop job with {stop_status}")
        jobs = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id, is_initiator=True)
        if len(jobs) > 0:
            if stop_status == JobStatus.CANCELED:
                schedule_logger(job_id).info("cancel job")
                set_cancel_status = schedule_utils.cancel_signal(job_id=job_id, set_or_reset=True)
                schedule_logger(job_id).info(f"set job cancel signal {set_cancel_status}")
            job = jobs[0]
            job.f_status = stop_status
            schedule_logger(job_id).info(f"request stop job with {stop_status} to all parties")
            status_code, response = FederatedScheduler.stop_job(job=job, stop_status=stop_status)
            if status_code == FederatedSchedulingStatusCode.SUCCESS:
                schedule_logger(job_id).info(f"stop job with {stop_status} successfully")
                return RetCode.SUCCESS, "success"
            else:
                initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)
                for initiator_task in initiator_tasks_group.values():
                    TaskScheduler.collect_task_of_all_party(job, initiator_task=initiator_task, set_status=stop_status)
                schedule_logger(job_id).info(f"stop job with {stop_status} failed, {response}")
                return RetCode.FEDERATED_ERROR, json_dumps(response)
        else:
            return RetCode.SUCCESS, "cannot find job"

    @classmethod
    @DB.connection_context()
    def end_scheduling_updates(cls, job_id):
        operate = Job.update({Job.f_end_scheduling_updates: Job.f_end_scheduling_updates + 1}).where(
            Job.f_job_id == job_id,
            Job.f_end_scheduling_updates < JobDefaultConfig.end_status_job_scheduling_updates)
        update_status = operate.execute() > 0
        return update_status
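
    # The update above is a single atomic, bounded counter increment. In SQL
    # terms it is roughly the following (a sketch; the table and column names
    # are assumed from the peewee model, not verified against the schema):
    #
    #   UPDATE t_job SET f_end_scheduling_updates = f_end_scheduling_updates + 1
    #   WHERE f_job_id = ? AND f_end_scheduling_updates < <configured limit>;
    #
    # execute() returns the affected row count, so the method yields True only
    # while the job is still under its end-status rescheduling quota.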

    @classmethod
    def finish(cls, job, end_status):
        schedule_logger(job.f_job_id).info(f"job finished with {end_status}, do something...")
        cls.stop_job(job_id=job.f_job_id, role=job.f_initiator_role, party_id=job.f_initiator_party_id, stop_status=end_status)
        FederatedScheduler.clean_job(job=job)
        schedule_logger(job.f_job_id).info(f"job finished with {end_status}, done")