# job_app.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
import json
import os
import tarfile

from flask import abort, request, send_file

from fate_arch.common.base_utils import json_dumps, json_loads
from fate_flow.controller.job_controller import JobController
from fate_flow.entity import JobConfigurationBase, RetCode
from fate_flow.entity.run_status import FederatedSchedulingStatusCode, JobStatus
from fate_flow.operation.job_clean import JobClean
from fate_flow.operation.job_saver import JobSaver
from fate_flow.operation.job_tracker import Tracker
from fate_flow.scheduler.dag_scheduler import DAGScheduler
from fate_flow.scheduler.federated_scheduler import FederatedScheduler
from fate_flow.settings import TEMP_DIRECTORY, stat_logger
from fate_flow.utils import job_utils, log_utils, schedule_utils, api_utils
from fate_flow.utils.api_utils import error_response, get_json_result
from fate_flow.utils.config_adapter import JobRuntimeConfigAdapter
from fate_flow.utils.log_utils import schedule_logger
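
# NOTE: `manager` is not defined or imported in this module. This is an
# assumption based on how fate_flow loads its app pages: the app loader
# (fate_flow/apps/__init__.py) injects a Flask Blueprint named `manager`
# into each *_app module before executing it, so the @manager.route(...)
# decorators below register against that injected blueprint.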


@manager.route('/submit', methods=['POST'])
def submit_job():
    submit_result = DAGScheduler.submit(JobConfigurationBase(**request.json))
    return get_json_result(retcode=submit_result["code"], retmsg=submit_result["message"],
                           job_id=submit_result["job_id"],
                           data=submit_result if submit_result["code"] == RetCode.SUCCESS else None)
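
# A minimal sketch of a client submit call. The mount point (/v1/job), the port
# (9380), and the payload keys accepted by JobConfigurationBase (e.g. "dsl" and
# "runtime_conf") are assumptions, not confirmed by this file:
#
#   curl -s -X POST http://127.0.0.1:9380/v1/job/submit \
#        -H 'Content-Type: application/json' \
#        -d '{"dsl": {"components": {...}}, "runtime_conf": {"initiator": {...}, "role": {...}}}'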


@manager.route('/stop', methods=['POST'])
def stop_job():
    job_id = request.json.get('job_id')
    stop_status = request.json.get("stop_status", "canceled")
    jobs = JobSaver.query_job(job_id=job_id)
    if jobs:
        schedule_logger(job_id).info("stop job on this party")
        kill_status, kill_details = JobController.stop_jobs(job_id=job_id, stop_status=stop_status)
        schedule_logger(job_id).info(f"stop job on this party status {kill_status}")
        schedule_logger(job_id).info(f"request stop job to {stop_status}")
        status_code, response = FederatedScheduler.request_stop_job(job=jobs[0], stop_status=stop_status, command_body=jobs[0].to_dict())
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            return get_json_result(retcode=RetCode.SUCCESS, retmsg=f"stop job on this party {'success' if kill_status else 'failed'}; stop job on all parties success")
        else:
            return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"stop job on this party {kill_status}", data=response)
    else:
        schedule_logger(job_id).info("cannot find job to stop")
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg="cannot find job")
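
# Example stop request body, read directly off the handler above
# ("stop_status" is optional and defaults to "canceled"):
#
#   {"job_id": "<job_id>", "stop_status": "canceled"}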


@manager.route('/rerun', methods=['POST'])
def rerun_job():
    job_id = request.json.get("job_id")
    jobs = JobSaver.query_job(job_id=job_id)
    if jobs:
        status_code, response = FederatedScheduler.request_rerun_job(job=jobs[0], command_body=request.json)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            return get_json_result(retcode=RetCode.SUCCESS, retmsg="rerun job success")
        else:
            return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg="rerun job failed:\n{}".format(json_dumps(response)))
    else:
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg="cannot find job")


@manager.route('/query', methods=['POST'])
def query_job():
    jobs = JobSaver.query_job(**request.json)
    if not jobs:
        return get_json_result(retcode=0, retmsg='no job could be found', data=[])
    return get_json_result(retcode=0, retmsg='success', data=[job.to_dict() for job in jobs])


@manager.route('/list/job', methods=['POST'])
def list_job():
    limit, offset = parse_limit_and_offset()
    query = {
        'tag': ('!=', 'submit_failed'),
    }
    for i in ('job_id', 'description'):
        if request.json.get(i) is not None:
            query[i] = ('contains', request.json[i])
    if request.json.get('party_id') is not None:
        try:
            query['party_id'] = int(request.json['party_id'])
        except Exception:
            return error_response(400, "Invalid parameter 'party_id'.")
        query['party_id'] = ('contains', query['party_id'])
    if request.json.get('partner') is not None:
        # match jobs whose roles JSON contains the partner's party id
        query['roles'] = ('contains', request.json['partner'])
    for i in ('role', 'status'):
        if request.json.get(i) is None:
            continue
        if isinstance(request.json[i], str):
            request.json[i] = [request.json[i]]
        if not isinstance(request.json[i], list):
            return error_response(400, f"Invalid parameter '{i}'.")
        request.json[i] = set(request.json[i])
        for j in request.json[i]:
            if j not in valid_query_parameters[i]:
                return error_response(400, f"Invalid parameter '{i}'.")
        query[i] = ('in_', request.json[i])
    jobs, count = job_utils.list_job(limit, offset, query, parse_order_by(('create_time', 'desc')))
    jobs = [job.to_human_model_dict() for job in jobs]
    for job in jobs:
        job['party_id'] = int(job['party_id'])
        job['partners'] = set()
        for i in ('guest', 'host', 'arbiter'):
            job['partners'].update(job['roles'].get(i, []))
        job['partners'].discard(job['party_id'])
        job['partners'] = sorted(job['partners'])
    return get_json_result(data={
        'jobs': jobs,
        'count': count,
    })
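
# Example list_job filter body (shape inferred from the parsing above; every
# key is optional, and "role"/"status" accept either a single string or a list):
#
#   {"limit": 20, "page": 1, "party_id": 9999, "role": ["guest", "host"],
#    "status": "running", "order_by": "create_time", "order": "desc"}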


@manager.route('/update', methods=['POST'])
def update_job():
    job_info = request.json
    jobs = JobSaver.query_job(job_id=job_info['job_id'], party_id=job_info['party_id'], role=job_info['role'])
    if not jobs:
        return get_json_result(retcode=101, retmsg='find job failed')
    else:
        JobSaver.update_job(job_info={'description': job_info.get('notes', ''), 'job_id': job_info['job_id'],
                                      'role': job_info['role'], 'party_id': job_info['party_id']})
        return get_json_result(retcode=0, retmsg='success')


@manager.route('/report', methods=['POST'])
def job_report():
    tasks = JobSaver.query_task(**request.json)
    if not tasks:
        return get_json_result(retcode=101, retmsg='find task failed')
    return get_json_result(retcode=0, retmsg='success', data=job_utils.task_report(tasks))


@manager.route('/parameter/update', methods=['POST'])
@api_utils.validate_request("job_id")
def update_parameters():
    job_info = request.json
    component_parameters = job_info.pop("component_parameters", None)
    job_parameters = job_info.pop("job_parameters", None)
    job_info["is_initiator"] = True
    jobs = JobSaver.query_job(**job_info)
    if not jobs:
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg=log_utils.failed_log(f"query job by {job_info}"))
    else:
        retcode, retdata = DAGScheduler.update_parameters(jobs[0], job_parameters, component_parameters)
        return get_json_result(retcode=retcode, data=retdata)


@manager.route('/config', methods=['POST'])
def job_config():
    jobs = JobSaver.query_job(**request.json)
    if not jobs:
        return get_json_result(retcode=101, retmsg='find job failed')
    else:
        job = jobs[0]
        response_data = dict()
        response_data['job_id'] = job.f_job_id
        response_data['dsl'] = job.f_dsl
        response_data['runtime_conf'] = job.f_runtime_conf
        response_data['train_runtime_conf'] = job.f_train_runtime_conf
        adapter = JobRuntimeConfigAdapter(job.f_runtime_conf)
        job_parameters = adapter.get_common_parameters().to_dict()
        response_data['model_info'] = {'model_id': job_parameters.get('model_id'),
                                       'model_version': job_parameters.get('model_version')}
        return get_json_result(retcode=0, retmsg='success', data=response_data)
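
# Assuming get_json_result(...) wraps its keyword arguments into a
# {"retcode", "retmsg", "data"} envelope (its usage throughout this file
# suggests that), a successful /config response looks like:
#
#   {"retcode": 0, "retmsg": "success",
#    "data": {"job_id": ..., "dsl": ..., "runtime_conf": ..., "train_runtime_conf": ...,
#             "model_info": {"model_id": ..., "model_version": ...}}}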


def check_job_log_dir():
    job_id = str(request.json['job_id'])
    job_log_dir = job_utils.get_job_log_directory(job_id=job_id)
    if not os.path.exists(job_log_dir):
        abort(error_response(404, f"Log file path: '{job_log_dir}' not found. Please check if the job id is valid."))
    return job_id, job_log_dir


@manager.route('/log/download', methods=['POST'])
@api_utils.validate_request('job_id')
def job_log_download():
    job_id, job_log_dir = check_job_log_dir()
    # build the gzipped tarball entirely in memory, with paths relative to the job's log directory
    memory_file = io.BytesIO()
    tar = tarfile.open(fileobj=memory_file, mode='w:gz')
    for root, dirs, files in os.walk(job_log_dir):
        for file in files:
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, job_log_dir)
            tar.add(full_path, rel_path)
    tar.close()
    memory_file.seek(0)
    return send_file(memory_file, attachment_filename=f'job_{job_id}_log.tar.gz', as_attachment=True)
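
# A client-side sketch for fetching and unpacking the archive, assuming the
# service listens on 127.0.0.1:9380 and the blueprint is mounted under /v1/job
# (both assumptions; adjust to your deployment):
#
#   import tarfile, requests  # requests is an extra dependency here
#
#   resp = requests.post("http://127.0.0.1:9380/v1/job/log/download",
#                        json={"job_id": "<job_id>"})
#   with open("job_log.tar.gz", "wb") as f:
#       f.write(resp.content)
#   with tarfile.open("job_log.tar.gz", "r:gz") as tar:
#       tar.extractall("job_log")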


@manager.route('/log/path', methods=['POST'])
@api_utils.validate_request('job_id')
def job_log_path():
    job_id, job_log_dir = check_job_log_dir()
    return get_json_result(data={"logs_directory": job_log_dir})


@manager.route('/task/query', methods=['POST'])
def query_task():
    tasks = JobSaver.query_task(**request.json)
    if not tasks:
        return get_json_result(retcode=101, retmsg='find task failed')
    return get_json_result(retcode=0, retmsg='success', data=[task.to_dict() for task in tasks])


@manager.route('/list/task', methods=['POST'])
def list_task():
    limit, offset = parse_limit_and_offset()
    query = {}
    for i in ('job_id', 'role', 'party_id', 'component_name'):
        if request.json.get(i) is not None:
            query[i] = request.json[i]
    if query.get('role') is not None:
        if query['role'] not in valid_query_parameters['role']:
            return error_response(400, "Invalid parameter 'role'.")
    if query.get('party_id') is not None:
        try:
            query['party_id'] = int(query['party_id'])
        except Exception:
            return error_response(400, "Invalid parameter 'party_id'.")
    tasks, count = job_utils.list_task(limit, offset, query, parse_order_by(('create_time', 'asc')))
    return get_json_result(data={
        'tasks': [task.to_human_model_dict() for task in tasks],
        'count': count,
    })


@manager.route('/data/view/query', methods=['POST'])
def query_component_output_data_info():
    output_data_infos = Tracker.query_output_data_infos(**request.json)
    if not output_data_infos:
        return get_json_result(retcode=101, retmsg='find data view failed')
    return get_json_result(retcode=0, retmsg='success', data=[output_data_info.to_dict() for output_data_info in output_data_infos])


@manager.route('/clean', methods=['POST'])
def clean_job():
    JobClean.start_clean_job(**request.json)
    return get_json_result(retcode=0, retmsg='success')


@manager.route('/clean/queue', methods=['POST'])
def clean_queue():
    jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING)
    clean_status = {}
    for job in jobs:
        status_code, response = FederatedScheduler.request_stop_job(job=job, stop_status=JobStatus.CANCELED)
        clean_status[job.f_job_id] = status_code
    return get_json_result(retcode=0, retmsg='success', data=clean_status)


@manager.route('/dsl/generate', methods=['POST'])
def dsl_generator():
    data = request.json
    cpn_str = data.get("cpn_str", "")
    try:
        if not cpn_str:
            raise Exception("Component list should not be empty.")
        if isinstance(cpn_str, list):
            cpn_list = cpn_str
        else:
            # reject path separators in the component list string
            if "/" in cpn_str or "\\" in cpn_str:
                raise Exception("Component list string should not contain '/' or '\\'.")
            cpn_str = cpn_str.replace(" ", "").replace("\n", "").strip(",[]")
            cpn_list = cpn_str.split(",")
        train_dsl = json_loads(data.get("train_dsl"))
        parser = schedule_utils.get_dsl_parser_by_version(data.get("version", "2"))
        predict_dsl = parser.deploy_component(cpn_list, train_dsl)
        if data.get("filename"):
            os.makedirs(TEMP_DIRECTORY, exist_ok=True)
            temp_filepath = os.path.join(TEMP_DIRECTORY, data.get("filename"))
            with open(temp_filepath, "w") as fout:
                fout.write(json.dumps(predict_dsl, indent=4))
            return send_file(open(temp_filepath, 'rb'), as_attachment=True, attachment_filename=data.get("filename"))
        return get_json_result(data=predict_dsl)
    except Exception as e:
        stat_logger.exception(e)
        return error_response(210, "DSL generating failed. For more details, "
                                   "please check logs/fate_flow/fate_flow_stat.log.")


@manager.route('/url/get', methods=['POST'])
@api_utils.validate_request('job_id', 'role', 'party_id')
def get_url():
    request_data = request.json
    jobs = JobSaver.query_job(job_id=request_data.get('job_id'), role=request_data.get('role'),
                              party_id=request_data.get('party_id'))
    if jobs:
        board_urls = []
        for job in jobs:
            board_url = job_utils.get_board_url(job.f_job_id, job.f_role, job.f_party_id)
            board_urls.append(board_url)
        return get_json_result(data={'board_url': board_urls})
    else:
        return get_json_result(retcode=101, retmsg='job not found')


def parse_limit_and_offset():
    try:
        limit = int(request.json.get('limit', 0))
        page = int(request.json.get('page', 1)) - 1
    except Exception:
        abort(error_response(400, "Invalid parameter 'limit' or 'page'."))
    return limit, limit * page
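
# Worked example of the paging arithmetic: limit=10, page=3 gives
# offset = 10 * (3 - 1) = 20, i.e. rows 21-30. The default limit of 0 is
# presumably treated as "no limit" by job_utils.list_job / list_task
# downstream (an assumption; those helpers are not shown in this file).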


def parse_order_by(default=None):
    order_by = []
    if request.json.get('order_by') is not None:
        if request.json['order_by'] not in valid_query_parameters['order_by']:
            abort(error_response(400, "Invalid parameter 'order_by'."))
        order_by.append(request.json['order_by'])
    if request.json.get('order') is not None:
        if request.json['order'] not in valid_query_parameters['order']:
            abort(error_response(400, "Invalid parameter 'order'."))
        order_by.append(request.json['order'])
    return order_by or default
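
# Example: a body of {"order_by": "create_time", "order": "desc"} yields
# ['create_time', 'desc']; with neither key present, the caller's default
# (e.g. ('create_time', 'desc') in list_job) is returned instead.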


valid_query_parameters = {
    'role': {'guest', 'host', 'arbiter', 'local'},
    'status': {'success', 'running', 'waiting', 'failed', 'canceled'},
    'order_by': {'job_id', 'task_version', 'create_time', 'start_time', 'end_time', 'elapsed'},
    'order': {'asc', 'desc'},
}