task_controller.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. from fate_arch.common import FederatedCommunicationType
  18. from fate_flow.utils.job_utils import asynchronous_function
  19. from fate_flow.utils.log_utils import schedule_logger
  20. from fate_flow.controller.engine_adapt import build_engine
  21. from fate_flow.db.db_models import Task
  22. from fate_flow.scheduler.federated_scheduler import FederatedScheduler
  23. from fate_flow.entity.run_status import TaskStatus, EndStatus
  24. from fate_flow.utils import job_utils
  25. from fate_flow.operation.job_saver import JobSaver
  26. from fate_arch.common.base_utils import json_dumps, current_timestamp
  27. from fate_arch.common import base_utils
  28. from fate_flow.entity import RunParameters
  29. from fate_flow.manager.resource_manager import ResourceManager
  30. from fate_flow.operation.job_tracker import Tracker
  31. from fate_flow.manager.worker_manager import WorkerManager
  32. from fate_flow.entity.types import TaskCleanResourceType
  33. class TaskController(object):
  34. INITIATOR_COLLECT_FIELDS = ["status", "party_status", "start_time", "update_time", "end_time", "elapsed"]
  35. @classmethod
  36. def create_task(cls, role, party_id, run_on_this_party, task_info):
  37. task_info["role"] = role
  38. task_info["party_id"] = str(party_id)
  39. task_info["status"] = TaskStatus.WAITING
  40. task_info["party_status"] = TaskStatus.WAITING
  41. task_info["create_time"] = base_utils.current_timestamp()
  42. task_info["run_on_this_party"] = run_on_this_party
  43. if task_info.get("task_id") is None:
  44. task_info["task_id"] = job_utils.generate_task_id(job_id=task_info["job_id"], component_name=task_info["component_name"])
  45. if task_info.get("task_version") is None:
  46. task_info["task_version"] = 0
  47. task = JobSaver.create_task(task_info=task_info)
  48. @classmethod
  49. def start_task(cls, job_id, component_name, task_id, task_version, role, party_id, **kwargs):
  50. """
  51. Start task, update status and party status
  52. :param job_id:
  53. :param component_name:
  54. :param task_id:
  55. :param task_version:
  56. :param role:
  57. :param party_id:
  58. :return:
  59. """
  60. job_dsl = job_utils.get_job_dsl(job_id, role, party_id)
  61. schedule_logger(job_id).info(
  62. f"try to start task {task_id} {task_version} on {role} {party_id} executor subprocess")
  63. task_executor_process_start_status = False
  64. task_info = {
  65. "job_id": job_id,
  66. "task_id": task_id,
  67. "task_version": task_version,
  68. "role": role,
  69. "party_id": party_id,
  70. }
  71. is_failed = False
  72. try:
  73. task = JobSaver.query_task(task_id=task_id, task_version=task_version, role=role, party_id=party_id)[0]
  74. run_parameters_dict = job_utils.get_job_parameters(job_id, role, party_id)
  75. run_parameters_dict["src_user"] = kwargs.get("src_user")
  76. run_parameters = RunParameters(**run_parameters_dict)
  77. config_dir = job_utils.get_task_directory(job_id, role, party_id, component_name, task_id, task_version)
  78. os.makedirs(config_dir, exist_ok=True)
  79. run_parameters_path = os.path.join(config_dir, 'task_parameters.json')
  80. with open(run_parameters_path, 'w') as fw:
  81. fw.write(json_dumps(run_parameters_dict))
  82. schedule_logger(job_id).info(f"use computing engine {run_parameters.computing_engine}")
  83. task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine}
  84. backend_engine = build_engine(run_parameters.computing_engine)
  85. run_info = backend_engine.run(task=task,
  86. run_parameters=run_parameters,
  87. run_parameters_path=run_parameters_path,
  88. config_dir=config_dir,
  89. log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),
  90. cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),
  91. user_name=kwargs.get("user_id"))
  92. task_info.update(run_info)
  93. task_info["start_time"] = current_timestamp()
  94. task_executor_process_start_status = True
  95. except Exception as e:
  96. schedule_logger(job_id).exception(e)
  97. is_failed = True
  98. finally:
  99. try:
  100. cls.update_task(task_info=task_info)
  101. task_info["party_status"] = TaskStatus.RUNNING
  102. cls.update_task_status(task_info=task_info)
  103. if is_failed:
  104. task_info["party_status"] = TaskStatus.FAILED
  105. cls.update_task_status(task_info=task_info)
  106. except Exception as e:
  107. schedule_logger(job_id).exception(e)
  108. schedule_logger(job_id).info(
  109. "task {} {} on {} {} executor subprocess start {}".format(task_id, task_version, role, party_id, "success" if task_executor_process_start_status else "failed"))
  110. @classmethod
  111. def update_task(cls, task_info):
  112. """
  113. Save to local database and then report to Initiator
  114. :param task_info:
  115. :return:
  116. """
  117. update_status = False
  118. try:
  119. update_status = JobSaver.update_task(task_info=task_info)
  120. cls.report_task_to_initiator(task_info=task_info)
  121. except Exception as e:
  122. schedule_logger(task_info["job_id"]).exception(e)
  123. finally:
  124. return update_status
  125. @classmethod
  126. def update_task_status(cls, task_info):
  127. update_status = JobSaver.update_task_status(task_info=task_info)
  128. if update_status and EndStatus.contains(task_info.get("status")):
  129. ResourceManager.return_task_resource(task_info=task_info)
  130. cls.clean_task(job_id=task_info["job_id"],
  131. task_id=task_info["task_id"],
  132. task_version=task_info["task_version"],
  133. role=task_info["role"],
  134. party_id=task_info["party_id"],
  135. content_type=TaskCleanResourceType.TABLE,
  136. is_asynchronous=True)
  137. cls.report_task_to_initiator(task_info=task_info)
  138. return update_status
  139. @classmethod
  140. def report_task_to_initiator(cls, task_info):
  141. tasks = JobSaver.query_task(task_id=task_info["task_id"],
  142. task_version=task_info["task_version"],
  143. role=task_info["role"],
  144. party_id=task_info["party_id"])
  145. if task_info.get("error_report"):
  146. tasks[0].f_error_report = task_info.get("error_report")
  147. if tasks[0].f_federated_status_collect_type == FederatedCommunicationType.PUSH:
  148. FederatedScheduler.report_task_to_initiator(task=tasks[0])
  149. @classmethod
  150. def collect_task(cls, job_id, component_name, task_id, task_version, role, party_id):
  151. tasks = JobSaver.query_task(job_id=job_id, component_name=component_name, task_id=task_id, task_version=task_version, role=role, party_id=party_id)
  152. if tasks:
  153. return tasks[0].to_human_model_dict(only_primary_with=cls.INITIATOR_COLLECT_FIELDS)
  154. else:
  155. return None
  156. @classmethod
  157. @asynchronous_function
  158. def stop_task(cls, task, stop_status):
  159. """
  160. Try to stop the task, but the status depends on the final operation result
  161. :param task:
  162. :param stop_status:
  163. :return:
  164. """
  165. kill_status = cls.kill_task(task=task)
  166. task_info = {
  167. "job_id": task.f_job_id,
  168. "task_id": task.f_task_id,
  169. "task_version": task.f_task_version,
  170. "role": task.f_role,
  171. "party_id": task.f_party_id,
  172. "party_status": stop_status,
  173. "kill_status": True
  174. }
  175. cls.update_task_status(task_info=task_info)
  176. cls.update_task(task_info=task_info)
  177. return kill_status
  178. @classmethod
  179. def kill_task(cls, task: Task):
  180. kill_status = False
  181. try:
  182. # kill task executor
  183. backend_engine = build_engine(task.f_engine_conf.get("computing_engine"))
  184. if backend_engine:
  185. backend_engine.kill(task)
  186. WorkerManager.kill_task_all_workers(task)
  187. except Exception as e:
  188. schedule_logger(task.f_job_id).exception(e)
  189. else:
  190. kill_status = True
  191. finally:
  192. schedule_logger(task.f_job_id).info(
  193. 'task {} {} on {} {} process {} kill {}'.format(task.f_task_id,
  194. task.f_task_version,
  195. task.f_role,
  196. task.f_party_id,
  197. task.f_run_pid,
  198. 'success' if kill_status else 'failed'))
  199. return kill_status
  200. @classmethod
  201. @asynchronous_function
  202. def clean_task(cls, job_id, task_id, task_version, role, party_id, content_type: TaskCleanResourceType):
  203. status = set()
  204. if content_type == TaskCleanResourceType.METRICS:
  205. tracker = Tracker(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version)
  206. status.add(tracker.clean_metrics())
  207. elif content_type == TaskCleanResourceType.TABLE:
  208. jobs = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id)
  209. if jobs:
  210. job = jobs[0]
  211. job_parameters = RunParameters(**job.f_runtime_conf_on_party["job_parameters"])
  212. tracker = Tracker(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version, job_parameters=job_parameters)
  213. status.add(tracker.clean_task())
  214. if len(status) == 1 and True in status:
  215. return True
  216. else:
  217. return False