federated_scheduler.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from fate_arch.common import base_utils
  17. from fate_flow.scheduler import SchedulerBase
  18. from fate_flow.utils.api_utils import federated_api
  19. from fate_flow.utils.log_utils import start_log, failed_log, successful_log, warning_log
  20. from fate_flow.utils.log_utils import schedule_logger
  21. from fate_flow.entity import RetCode
  22. from fate_flow.entity.run_status import FederatedSchedulingStatusCode
  23. from fate_flow.entity.types import ResourceOperation
  24. from fate_flow.db.db_models import Job, Task
  25. from fate_flow.operation.job_saver import JobSaver
  26. import threading
  27. from fate_flow.entity.types import TaskCleanResourceType
class FederatedScheduler(SchedulerBase):
    """
    Send commands to party,
    Report info to initiator
    """
    # Task: fields of a Task that report_task_to_initiator() pushes back to
    # the initiator party (along with the record's primary keys).
    REPORT_TO_INITIATOR_FIELDS = ["party_status", "start_time", "update_time", "end_time", "elapsed", "error_report"]
    # Job
  36. @classmethod
  37. def create_job(cls, job: Job):
  38. return cls.job_command(job=job, command="create", command_body=job.to_human_model_dict(), parallel=False)
  39. @classmethod
  40. def update_parameter(cls, job: Job, updated_parameters):
  41. return cls.job_command(job=job, command="parameter/update", command_body=updated_parameters, parallel=False)
  42. @classmethod
  43. def resource_for_job(cls, job, operation_type: ResourceOperation, specific_dest=None):
  44. schedule_logger(job.f_job_id).info(f"try to {operation_type} job resource")
  45. status_code, response = cls.job_command(job=job, command=f"resource/{operation_type.value}", specific_dest=specific_dest)
  46. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  47. schedule_logger(job.f_job_id).info(f"{operation_type} job resource successfully")
  48. else:
  49. schedule_logger(job.f_job_id).info(f"{operation_type} job resource failed")
  50. return status_code, response
  51. @classmethod
  52. def check_component(cls, job, check_type, specific_dest=None):
  53. schedule_logger(job.f_job_id).info(f"try to check component inheritance dependence")
  54. status_code, response = cls.job_command(job=job, command=f"component/{check_type}/check", specific_dest=specific_dest)
  55. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  56. schedule_logger(job.f_job_id).info(f"check job dependence successfully")
  57. else:
  58. schedule_logger(job.f_job_id).info(f"check job dependence failed")
  59. return status_code, response
  60. @classmethod
  61. def dependence_for_job(cls, job, specific_dest=None):
  62. schedule_logger(job.f_job_id).info(f"try to check job dependence")
  63. status_code, response = cls.job_command(job=job, command=f"dependence/check", specific_dest=specific_dest)
  64. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  65. schedule_logger(job.f_job_id).info(f"check job dependence successfully")
  66. else:
  67. schedule_logger(job.f_job_id).info(f"check job dependence failed")
  68. return status_code, response
  69. @classmethod
  70. def connect(cls, job):
  71. return cls.job_command(job=job, command="align", command_body={"job_id": job.f_job_id, "role": job.f_role,
  72. "party_id": job.f_party_id})
  73. @classmethod
  74. def start_job(cls, job, command_body=None):
  75. return cls.job_command(job=job, command="start", command_body=command_body)
  76. @classmethod
  77. def align_args(cls, job, command_body):
  78. return cls.job_command(job=job, command="align", command_body=command_body)
  79. @classmethod
  80. def sync_job(cls, job, update_fields):
  81. sync_info = job.to_human_model_dict(only_primary_with=update_fields)
  82. schedule_logger(job.f_job_id).info("sync job info to all party")
  83. status_code, response = cls.job_command(job=job, command="update", command_body=sync_info)
  84. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  85. schedule_logger(job.f_job_id).info("sync job info to all party successfully")
  86. else:
  87. schedule_logger(job.f_job_id).info(f"sync job info to all party failed: \n{response}")
  88. return status_code, response
  89. @classmethod
  90. def sync_job_status(cls, job):
  91. schedule_logger(job.f_job_id).info(f"job is {job.f_status}, sync to all party")
  92. status_code, response = cls.job_command(job=job, command=f"status/{job.f_status}", command_body=job.to_human_model_dict())
  93. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  94. schedule_logger(job.f_job_id).info(f"sync job status {job.f_status} to all party success")
  95. else:
  96. schedule_logger(job.f_job_id).info(f"sync job status {job.f_status} to all party failed: \n{response}")
  97. return status_code, response
  98. @classmethod
  99. def save_pipelined_model(cls, job):
  100. schedule_logger(job.f_job_id).info("try to save job pipelined model")
  101. status_code, response = cls.job_command(job=job, command="model")
  102. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  103. schedule_logger(job.f_job_id).info("save job pipelined model success")
  104. else:
  105. schedule_logger(job.f_job_id).info(f"save job pipelined model failed:\n{response}")
  106. return status_code, response
  107. @classmethod
  108. def stop_job(cls, job, stop_status):
  109. schedule_logger(job.f_job_id).info("try to stop job")
  110. job.f_status = stop_status
  111. status_code, response = cls.job_command(job=job, command="stop/{}".format(stop_status))
  112. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  113. schedule_logger(job.f_job_id).info("stop job success")
  114. else:
  115. schedule_logger(job.f_job_id).info(f"stop job failed:\n{response}")
  116. return status_code, response
  117. @classmethod
  118. def request_stop_job(cls, job, stop_status, command_body=None):
  119. return cls.job_command(job=job, command="stop/{}".format(stop_status), dest_only_initiator=True, command_body=command_body)
  120. @classmethod
  121. def request_rerun_job(cls, job, command_body):
  122. return cls.job_command(job=job, command="rerun", command_body=command_body, dest_only_initiator=True)
  123. @classmethod
  124. def clean_job(cls, job):
  125. schedule_logger(job.f_job_id).info("try to clean job")
  126. status_code, response = cls.job_command(job=job, command="clean", command_body=job.f_runtime_conf_on_party["role"].copy())
  127. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  128. schedule_logger(job.f_job_id).info("clean job success")
  129. else:
  130. schedule_logger(job.f_job_id).info(f"clean job failed:\n{response}")
  131. return status_code, response
  132. @classmethod
  133. def job_command(cls, job, command, command_body=None, dest_only_initiator=False, specific_dest=None, parallel=False):
  134. federated_response = {}
  135. job_parameters = job.f_runtime_conf_on_party["job_parameters"]
  136. if dest_only_initiator:
  137. dest_partis = [(job.f_initiator_role, [job.f_initiator_party_id])]
  138. api_type = "initiator"
  139. elif specific_dest:
  140. dest_partis = specific_dest.items()
  141. api_type = "party"
  142. else:
  143. dest_partis = job.f_roles.items()
  144. api_type = "party"
  145. threads = []
  146. for dest_role, dest_party_ids in dest_partis:
  147. federated_response[dest_role] = {}
  148. for dest_party_id in dest_party_ids:
  149. endpoint = f"/{api_type}/{job.f_job_id}/{dest_role}/{dest_party_id}/{command}"
  150. args = (job.f_job_id, job.f_role, job.f_party_id, dest_role, dest_party_id, endpoint, command_body, job_parameters["federated_mode"], federated_response)
  151. if parallel:
  152. t = threading.Thread(target=cls.federated_command, args=args)
  153. threads.append(t)
  154. t.start()
  155. else:
  156. cls.federated_command(*args)
  157. for thread in threads:
  158. thread.join()
  159. return cls.return_federated_response(federated_response=federated_response)
  160. @classmethod
  161. def create_task(cls, job, task):
  162. return cls.task_command(job=job, task=task, command="create", command_body=task.to_human_model_dict())
  163. @classmethod
  164. def start_task(cls, job, task):
  165. return cls.task_command(job=job, task=task, command="start", command_body={}, need_user=True)
  166. @classmethod
  167. def collect_task(cls, job, task):
  168. return cls.task_command(job=job, task=task, command="collect")
  169. @classmethod
  170. def sync_task(cls, job, task, update_fields):
  171. sync_info = task.to_human_model_dict(only_primary_with=update_fields)
  172. schedule_logger(task.f_job_id).info("sync task {} {} info to all party".format(task.f_task_id, task.f_task_version))
  173. status_code, response = cls.task_command(job=job, task=task, command="update", command_body=sync_info)
  174. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  175. schedule_logger(task.f_job_id).info("sync task {} {} info to all party successfully".format(task.f_task_id, task.f_task_version))
  176. else:
  177. schedule_logger(task.f_job_id).info("sync task {} {} info to all party failed: \n{}".format(task.f_task_id, task.f_task_version, response))
  178. return status_code, response
  179. @classmethod
  180. def sync_task_status(cls, job, task):
  181. schedule_logger(task.f_job_id).info("task {} {} is {}, sync to all party".format(task.f_task_id, task.f_task_version, task.f_status))
  182. status_code, response = cls.task_command(job=job, task=task, command=f"status/{task.f_status}")
  183. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  184. schedule_logger(task.f_job_id).info("sync task {} {} status {} to all party success".format(task.f_task_id, task.f_task_version, task.f_status))
  185. else:
  186. schedule_logger(task.f_job_id).info("sync task {} {} status {} to all party failed: \n{}".format(task.f_task_id, task.f_task_version, task.f_status, response))
  187. return status_code, response
  188. @classmethod
  189. def stop_task(cls, job, task, stop_status, command_body=None):
  190. schedule_logger(task.f_job_id).info("try to stop task {} {}".format(task.f_task_id, task.f_task_version))
  191. task.f_status = stop_status
  192. status_code, response = cls.task_command(job=job, task=task, command="stop/{}".format(stop_status), command_body=command_body)
  193. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  194. schedule_logger(job.f_job_id).info("stop task {} {} success".format(task.f_task_id, task.f_task_version))
  195. else:
  196. schedule_logger(job.f_job_id).info("stop task {} {} failed:\n{}".format(task.f_task_id, task.f_task_version, response))
  197. return status_code, response
  198. @classmethod
  199. def clean_task(cls, job, task, content_type: TaskCleanResourceType):
  200. schedule_logger(task.f_job_id).info("try to clean task {} {} {}".format(task.f_task_id, task.f_task_version, content_type))
  201. status_code, response = cls.task_command(job=job, task=task, command="clean/{}".format(content_type.value))
  202. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  203. schedule_logger(job.f_job_id).info("clean task {} {} {} successfully".format(task.f_task_id, task.f_task_version, content_type))
  204. else:
  205. schedule_logger(job.f_job_id).info("clean task {} {} {} failed:\n{}".format(task.f_task_id, task.f_task_version, content_type, response))
  206. return status_code, response
  207. @classmethod
  208. def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False):
  209. msg = f"execute federated task {task.f_component_name} command({command})"
  210. federated_response = {}
  211. job_parameters = job.f_runtime_conf_on_party["job_parameters"]
  212. tasks = JobSaver.query_task(task_id=task.f_task_id, only_latest=True)
  213. threads = []
  214. for task in tasks:
  215. dest_role, dest_party_id = task.f_role, task.f_party_id
  216. federated_response[dest_role] = federated_response.get(dest_role, {})
  217. endpoint = f"/party/{task.f_job_id}/{task.f_component_name}/{task.f_task_id}/{task.f_task_version}/{dest_role}/{dest_party_id}/{command}"
  218. if need_user:
  219. command_body["user_id"] = job.f_user.get(dest_role, {}).get(str(dest_party_id), "")
  220. schedule_logger(job.f_job_id).info(f'user:{job.f_user}, dest_role:{dest_role}, dest_party_id:{dest_party_id}')
  221. schedule_logger(job.f_job_id).info(f'command_body: {command_body}')
  222. args = (job.f_job_id, job.f_role, job.f_party_id, dest_role, dest_party_id, endpoint, command_body, job_parameters["federated_mode"], federated_response)
  223. if parallel:
  224. t = threading.Thread(target=cls.federated_command, args=args)
  225. threads.append(t)
  226. t.start()
  227. else:
  228. cls.federated_command(*args)
  229. for thread in threads:
  230. thread.join()
  231. status_code, response = cls.return_federated_response(federated_response=federated_response)
  232. if status_code == FederatedSchedulingStatusCode.SUCCESS:
  233. schedule_logger(job.f_job_id).info(successful_log(msg))
  234. elif status_code == FederatedSchedulingStatusCode.NOT_EFFECTIVE:
  235. schedule_logger(job.f_job_id).warning(warning_log(msg))
  236. elif status_code == FederatedSchedulingStatusCode.ERROR:
  237. schedule_logger(job.f_job_id).critical(failed_log(msg, detail=response))
  238. else:
  239. schedule_logger(job.f_job_id).error(failed_log(msg, detail=response))
  240. return status_code, response
  241. @classmethod
  242. def federated_command(cls, job_id, src_role, src_party_id, dest_role, dest_party_id, endpoint, body, federated_mode, federated_response):
  243. st = base_utils.current_timestamp()
  244. log_msg = f"sending {endpoint} federated command"
  245. schedule_logger(job_id).info(start_log(msg=log_msg))
  246. try:
  247. response = federated_api(job_id=job_id,
  248. method='POST',
  249. endpoint=endpoint,
  250. src_role=src_role,
  251. src_party_id=src_party_id,
  252. dest_party_id=dest_party_id,
  253. json_body=body if body else {},
  254. federated_mode=federated_mode)
  255. except Exception as e:
  256. schedule_logger(job_id=job_id).exception(e)
  257. response = {
  258. "retcode": RetCode.FEDERATED_ERROR,
  259. "retmsg": "Federated schedule error, {}".format(e)
  260. }
  261. if response["retcode"] != RetCode.SUCCESS:
  262. if response["retcode"] in [RetCode.NOT_EFFECTIVE, RetCode.RUNNING]:
  263. schedule_logger(job_id).warning(warning_log(msg=log_msg, role=dest_role, party_id=dest_party_id))
  264. else:
  265. schedule_logger(job_id).error(failed_log(msg=log_msg, role=dest_role, party_id=dest_party_id, detail=response["retmsg"]))
  266. federated_response[dest_role][dest_party_id] = response
  267. et = base_utils.current_timestamp()
  268. schedule_logger(job_id).info(f"{log_msg} use {et - st} ms")
    @classmethod
    def report_task_to_initiator(cls, task: Task):
        """
        Report this party's task info (REPORT_TO_INITIATOR_FIELDS plus primary
        keys) to the initiator via the /initiator/.../report endpoint.

        :param task: Task whose fields are reported
        :return: True when the initiator acknowledged with SUCCESS; False on
                 request failure, non-SUCCESS retcode, or when this task is
                 not eligible to report (see condition below)
        """
        # NOTE(review): reports only when BOTH role and party_id differ from
        # the initiator's; a task sharing the initiator's party_id but running
        # a different role (or vice versa) is skipped — confirm this `and` is
        # intended rather than `or`.
        if task.f_role != task.f_initiator_role and task.f_party_id != task.f_initiator_party_id:
            try:
                response = federated_api(job_id=task.f_job_id,
                                         method='POST',
                                         endpoint='/initiator/{}/{}/{}/{}/{}/{}/report'.format(
                                             task.f_job_id,
                                             task.f_component_name,
                                             task.f_task_id,
                                             task.f_task_version,
                                             task.f_role,
                                             task.f_party_id),
                                         src_party_id=task.f_party_id,
                                         dest_party_id=task.f_initiator_party_id,
                                         src_role=task.f_role,
                                         json_body=task.to_human_model_dict(only_primary_with=cls.REPORT_TO_INITIATOR_FIELDS),
                                         federated_mode=task.f_federated_mode)
            except Exception as e:
                # best-effort: a failed report is logged, not raised
                schedule_logger(task.f_job_id).error(f"report task to initiator error: {e}")
                return False
            if response["retcode"] != RetCode.SUCCESS:
                retmsg = response["retmsg"]
                schedule_logger(task.f_job_id).error(f"report task to initiator error: {retmsg}")
                return False
            else:
                return True
        else:
            return False
  302. @classmethod
  303. def tracker_command(cls, job, request_data, command, json_body=None):
  304. job_parameters = job.f_runtime_conf_on_party["job_parameters"]
  305. response = federated_api(job_id=str(request_data['job_id']),
  306. method='POST',
  307. endpoint='/tracker/{}/{}/{}/{}/{}'.format(
  308. request_data['job_id'],
  309. request_data['component_name'],
  310. request_data['role'],
  311. request_data['party_id'],
  312. command),
  313. src_party_id=job.f_party_id,
  314. dest_party_id=request_data['party_id'],
  315. src_role=job.f_role,
  316. json_body=json_body if json_body else {},
  317. federated_mode=job_parameters["federated_mode"])
  318. return response