party_app.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from flask import request
  17. from fate_flow.controller.job_controller import JobController
  18. from fate_flow.controller.task_controller import TaskController
  19. from fate_flow.entity import RetCode
  20. from fate_flow.entity.types import TaskCleanResourceType
  21. from fate_flow.manager.dependence_manager import DependenceManager
  22. from fate_flow.manager.resource_manager import ResourceManager
  23. from fate_flow.operation.job_saver import JobSaver
  24. from fate_flow.utils.api_utils import get_json_result, create_job_request_check
  25. from fate_flow.utils.task_utils import task_request_proxy
  26. @manager.route('/<job_id>/<role>/<party_id>/create', methods=['POST'])
  27. @create_job_request_check
  28. def create_job(job_id, role, party_id):
  29. try:
  30. result = JobController.create_job(job_id=job_id, role=role, party_id=int(party_id), job_info=request.json)
  31. return get_json_result(retcode=0, retmsg='success', data=result)
  32. except RuntimeError as e:
  33. return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=str(e), data={"job_id": job_id})
  34. @manager.route('/<job_id>/<role>/<party_id>/component/inheritance/check', methods=['POST'])
  35. def component_inheritance_check(job_id, role, party_id):
  36. job = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id)[0]
  37. component_list = DependenceManager.component_check(job, check_type="inheritance")
  38. return get_json_result(data=component_list)
  39. @manager.route('/<job_id>/<role>/<party_id>/component/rerun/check', methods=['POST'])
  40. def component_rerun_check(job_id, role, party_id):
  41. job = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id)[0]
  42. component_list = DependenceManager.component_check(job, check_type="rerun")
  43. return get_json_result(data=component_list)
  44. @manager.route('/<job_id>/<role>/<party_id>/dependence/check', methods=['POST'])
  45. def check_dependence(job_id, role, party_id):
  46. job = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id)[0]
  47. status = DependenceManager.check_job_dependence(job)
  48. if status:
  49. return get_json_result(retcode=0, retmsg='success')
  50. else:
  51. return get_json_result(retcode=RetCode.RUNNING,
  52. retmsg=f"check for job {job_id} dependence failed, "
  53. f"dependencies are being installed automatically, it may take a few minutes")
  54. @manager.route('/<job_id>/<role>/<party_id>/resource/apply', methods=['POST'])
  55. def apply_resource(job_id, role, party_id):
  56. status = ResourceManager.apply_for_job_resource(job_id=job_id, role=role, party_id=int(party_id))
  57. if status:
  58. return get_json_result(retcode=0, retmsg='success')
  59. else:
  60. return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"apply for job {job_id} resource failed")
  61. @manager.route('/<job_id>/<role>/<party_id>/resource/return', methods=['POST'])
  62. def return_resource(job_id, role, party_id):
  63. status = ResourceManager.return_job_resource(job_id=job_id, role=role, party_id=int(party_id))
  64. if status:
  65. return get_json_result(retcode=0, retmsg='success')
  66. else:
  67. return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"apply for job {job_id} resource failed")
  68. @manager.route('/<job_id>/<role>/<party_id>/start', methods=['POST'])
  69. def start_job(job_id, role, party_id):
  70. JobController.start_job(job_id=job_id, role=role, party_id=int(party_id), extra_info=request.json)
  71. return get_json_result(retcode=0, retmsg='success')
  72. @manager.route('/<job_id>/<role>/<party_id>/align', methods=['POST'])
  73. def align_job_args(job_id, role, party_id):
  74. JobController.align_job_args(job_info=request.json, role=role, party_id=party_id, job_id=job_id)
  75. return get_json_result(retcode=0, retmsg='success')
  76. @manager.route('/<job_id>/<role>/<party_id>/update', methods=['POST'])
  77. def update_job(job_id, role, party_id):
  78. job_info = {}
  79. job_info.update(request.json)
  80. job_info.update({
  81. "job_id": job_id,
  82. "role": role,
  83. "party_id": party_id
  84. })
  85. JobController.update_job(job_info=job_info)
  86. return get_json_result(retcode=0, retmsg='success')
  87. @manager.route('/<job_id>/<role>/<party_id>/parameter/update', methods=['POST'])
  88. def update_parameters(job_id, role, party_id):
  89. JobController.update_parameter(job_id=job_id, role=role, party_id=party_id, updated_parameters=request.json)
  90. return get_json_result(retcode=0, retmsg='success')
  91. @manager.route('/<job_id>/<role>/<party_id>/status/<status>', methods=['POST'])
  92. def job_status(job_id, role, party_id, status):
  93. job_info = request.json
  94. # some value of job_info is initiator, should be updated
  95. job_info.update({
  96. "job_id": job_id,
  97. "role": role,
  98. "party_id": party_id,
  99. "status": status
  100. })
  101. if JobController.update_job_status(job_info=job_info):
  102. return get_json_result(retcode=0, retmsg='success')
  103. else:
  104. return get_json_result(retcode=RetCode.NOT_EFFECTIVE, retmsg="update job status does not take effect")
  105. @manager.route('/<job_id>/<role>/<party_id>/model', methods=['POST'])
  106. def save_pipelined_model(job_id, role, party_id):
  107. JobController.save_pipelined_model(job_id=job_id, role=role, party_id=party_id)
  108. return get_json_result(retcode=0, retmsg='success')
  109. @manager.route('/<job_id>/<role>/<party_id>/stop/<stop_status>', methods=['POST'])
  110. def stop_job(job_id, role, party_id, stop_status):
  111. kill_status, kill_details = JobController.stop_jobs(job_id=job_id, stop_status=stop_status, role=role, party_id=party_id)
  112. return get_json_result(retcode=RetCode.SUCCESS if kill_status else RetCode.EXCEPTION_ERROR,
  113. retmsg='success' if kill_status else 'failed',
  114. data=kill_details)
  115. @manager.route('/<job_id>/<role>/<party_id>/clean', methods=['POST'])
  116. def clean(job_id, role, party_id):
  117. JobController.clean_job(job_id=job_id, role=role, party_id=party_id, roles=request.json)
  118. return get_json_result(retcode=0, retmsg='success')
  119. # Control API for task
  120. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/create', methods=['POST'])
  121. def create_task(job_id, component_name, task_id, task_version, role, party_id):
  122. JobController.initialize_task(role, party_id, request.json)
  123. return get_json_result(retcode=0, retmsg='success')
  124. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start', methods=['POST'])
  125. @task_request_proxy(filter_local=True)
  126. def start_task(job_id, component_name, task_id, task_version, role, party_id):
  127. TaskController.start_task(job_id, component_name, task_id, task_version, role, party_id, **request.json)
  128. return get_json_result(retcode=0, retmsg='success')
  129. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report', methods=['POST'])
  130. def report_task(job_id, component_name, task_id, task_version, role, party_id):
  131. task_info = {}
  132. task_info.update(request.json)
  133. task_info.update({
  134. "job_id": job_id,
  135. "task_id": task_id,
  136. "task_version": task_version,
  137. "role": role,
  138. "party_id": party_id,
  139. })
  140. TaskController.update_task(task_info=task_info)
  141. if task_info.get("party_status"):
  142. if not TaskController.update_task_status(task_info=task_info):
  143. return get_json_result(retcode=RetCode.NOT_EFFECTIVE, retmsg="update job status does not take effect")
  144. return get_json_result(retcode=0, retmsg='success')
  145. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/update', methods=['POST'])
  146. def update_task(job_id, component_name, task_id, task_version, role, party_id):
  147. task_info = {}
  148. task_info.update(request.json)
  149. task_info.update({
  150. "job_id": job_id,
  151. "task_id": task_id,
  152. "task_version": task_version,
  153. "role": role,
  154. "party_id": party_id,
  155. })
  156. TaskController.update_task(task_info=task_info)
  157. return get_json_result(retcode=0, retmsg='success')
  158. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/collect', methods=['POST'])
  159. def collect_task(job_id, component_name, task_id, task_version, role, party_id):
  160. task_info = TaskController.collect_task(job_id=job_id, component_name=component_name, task_id=task_id, task_version=task_version, role=role, party_id=party_id)
  161. if task_info:
  162. return get_json_result(retcode=RetCode.SUCCESS, retmsg="success", data=task_info)
  163. else:
  164. return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg="query task failed")
  165. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/<status>', methods=['POST'])
  166. def task_status(job_id, component_name, task_id, task_version, role, party_id, status):
  167. task_info = {}
  168. task_info.update({
  169. "job_id": job_id,
  170. "task_id": task_id,
  171. "task_version": task_version,
  172. "role": role,
  173. "party_id": party_id,
  174. "status": status
  175. })
  176. if TaskController.update_task_status(task_info=task_info):
  177. return get_json_result(retcode=0, retmsg='success')
  178. else:
  179. return get_json_result(retcode=RetCode.NOT_EFFECTIVE, retmsg="update job status does not take effect")
  180. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/stop/<stop_status>', methods=['POST'])
  181. @task_request_proxy()
  182. def stop_task(job_id, component_name, task_id, task_version, role, party_id, stop_status):
  183. tasks = JobSaver.query_task(job_id=job_id, task_id=task_id, task_version=task_version, role=role, party_id=int(party_id))
  184. kill_status = True
  185. for task in tasks:
  186. kill_status = kill_status & TaskController.stop_task(task=task, stop_status=stop_status, is_asynchronous=request.json.get("is_asynchronous"))
  187. return get_json_result(retcode=RetCode.SUCCESS if kill_status else RetCode.EXCEPTION_ERROR,
  188. retmsg='success' if kill_status else 'failed')
  189. @manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/clean/<content_type>', methods=['POST'])
  190. def clean_task(job_id, component_name, task_id, task_version, role, party_id, content_type):
  191. TaskController.clean_task(job_id=job_id, task_id=task_id, task_version=task_version, role=role, party_id=int(party_id), content_type=TaskCleanResourceType(content_type))
  192. return get_json_result(retcode=0, retmsg='success')