tracking_app.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import os
  18. from flask import jsonify, request, send_file
  19. from fate_flow.component_env_utils import feature_utils
  20. from fate_flow.component_env_utils.env_utils import import_component_output_depend
  21. from fate_flow.db.db_models import DB, Job
  22. from fate_flow.manager.data_manager import TableStorage, delete_metric_data, get_component_output_data_schema
  23. from fate_flow.model.sync_model import SyncComponent
  24. from fate_flow.operation.job_saver import JobSaver
  25. from fate_flow.operation.job_tracker import Tracker
  26. from fate_flow.scheduler.federated_scheduler import FederatedScheduler
  27. from fate_flow.settings import TEMP_DIRECTORY, stat_logger, ENABLE_MODEL_STORE
  28. from fate_flow.utils import job_utils, schedule_utils
  29. from fate_flow.utils.api_utils import error_response, get_json_result, validate_request
  30. @manager.route('/job/data_view', methods=['post'])
  31. def job_view():
  32. request_data = request.json
  33. check_request_parameters(request_data)
  34. job_tracker = Tracker(job_id=request_data['job_id'], role=request_data['role'], party_id=request_data['party_id'])
  35. job_view_data = job_tracker.get_job_view()
  36. if job_view_data:
  37. job_metric_list = job_tracker.get_metric_list(job_level=True)
  38. job_view_data['model_summary'] = {}
  39. for metric_namespace, namespace_metrics in job_metric_list.items():
  40. job_view_data['model_summary'][metric_namespace] = job_view_data['model_summary'].get(metric_namespace, {})
  41. for metric_name in namespace_metrics:
  42. job_view_data['model_summary'][metric_namespace][metric_name] = job_view_data['model_summary'][
  43. metric_namespace].get(metric_name, {})
  44. for metric_data in job_tracker.get_job_metric_data(metric_namespace=metric_namespace,
  45. metric_name=metric_name):
  46. job_view_data['model_summary'][metric_namespace][metric_name][metric_data.key] = metric_data.value
  47. return get_json_result(retcode=0, retmsg='success', data=job_view_data)
  48. else:
  49. return get_json_result(retcode=101, retmsg='error')
  50. @manager.route('/component/metric/all', methods=['post'])
  51. def component_metric_all():
  52. request_data = request.json
  53. check_request_parameters(request_data)
  54. tracker = Tracker(job_id=request_data['job_id'], component_name=request_data['component_name'],
  55. role=request_data['role'], party_id=request_data['party_id'])
  56. metrics = tracker.get_metric_list()
  57. all_metric_data = {}
  58. if metrics:
  59. for metric_namespace, metric_names in metrics.items():
  60. all_metric_data[metric_namespace] = all_metric_data.get(metric_namespace, {})
  61. for metric_name in metric_names:
  62. all_metric_data[metric_namespace][metric_name] = all_metric_data[metric_namespace].get(metric_name, {})
  63. metric_data, metric_meta = get_metric_all_data(tracker=tracker, metric_namespace=metric_namespace,
  64. metric_name=metric_name)
  65. all_metric_data[metric_namespace][metric_name]['data'] = metric_data
  66. all_metric_data[metric_namespace][metric_name]['meta'] = metric_meta
  67. return get_json_result(retcode=0, retmsg='success', data=all_metric_data)
  68. else:
  69. return get_json_result(retcode=0, retmsg='no data', data={})
  70. @manager.route('/component/metrics', methods=['post'])
  71. def component_metrics():
  72. request_data = request.json
  73. check_request_parameters(request_data)
  74. tracker = Tracker(job_id=request_data['job_id'], component_name=request_data['component_name'],
  75. role=request_data['role'], party_id=request_data['party_id'])
  76. metrics = tracker.get_metric_list()
  77. if metrics:
  78. return get_json_result(retcode=0, retmsg='success', data=metrics)
  79. else:
  80. return get_json_result(retcode=0, retmsg='no data', data={})
  81. @manager.route('/component/metric_data', methods=['post'])
  82. def component_metric_data():
  83. request_data = request.json
  84. check_request_parameters(request_data)
  85. tracker = Tracker(job_id=request_data['job_id'], component_name=request_data['component_name'],
  86. role=request_data['role'], party_id=request_data['party_id'])
  87. metric_data, metric_meta = get_metric_all_data(tracker=tracker, metric_namespace=request_data['metric_namespace'],
  88. metric_name=request_data['metric_name'])
  89. if metric_data or metric_meta:
  90. return get_json_result(retcode=0, retmsg='success', data=metric_data,
  91. meta=metric_meta)
  92. else:
  93. return get_json_result(retcode=0, retmsg='no data', data=[], meta={})
  94. def get_metric_all_data(tracker, metric_namespace, metric_name):
  95. metric_data = tracker.get_metric_data(metric_namespace=metric_namespace,
  96. metric_name=metric_name)
  97. metric_meta = tracker.get_metric_meta(metric_namespace=metric_namespace,
  98. metric_name=metric_name)
  99. if metric_data or metric_meta:
  100. metric_data_list = [(metric.key, metric.value) for metric in metric_data]
  101. metric_data_list.sort(key=lambda x: x[0])
  102. return metric_data_list, metric_meta.to_dict() if metric_meta else {}
  103. else:
  104. return [], {}
  105. @manager.route('/component/metric/delete', methods=['post'])
  106. def component_metric_delete():
  107. sql = delete_metric_data(request.json)
  108. return get_json_result(retcode=0, retmsg='success', data=sql)
  109. @manager.route('/component/parameters', methods=['post'])
  110. def component_parameters():
  111. request_data = request.json
  112. check_request_parameters(request_data)
  113. tasks = JobSaver.query_task(only_latest=True, **request_data)
  114. if not tasks:
  115. return get_json_result(retcode=101, retmsg='can not found this task')
  116. parameters = tasks[0].f_component_parameters
  117. output_parameters = {}
  118. output_parameters['module'] = parameters.get('module', '')
  119. for p_k, p_v in parameters.items():
  120. if p_k.endswith('Param'):
  121. output_parameters[p_k] = p_v
  122. return get_json_result(retcode=0, retmsg='success', data=output_parameters)
  123. @manager.route('/component/output/model', methods=['post'])
  124. def component_output_model():
  125. request_data = request.json
  126. check_request_parameters(request_data)
  127. job_configuration = job_utils.get_job_configuration(request_data['job_id'], request_data['role'], request_data['party_id'])
  128. model_id = job_configuration.runtime_conf_on_party['job_parameters']['model_id']
  129. model_version = request_data['job_id']
  130. tracker = Tracker(
  131. job_id=request_data['job_id'],
  132. role=request_data['role'], party_id=request_data['party_id'],
  133. model_id=model_id, model_version=model_version,
  134. component_name=request_data['component_name'],
  135. )
  136. define_meta = tracker.pipelined_model.pipelined_component.get_define_meta()
  137. if not define_meta or request_data['component_name'] not in define_meta['component_define']:
  138. return get_json_result(retcode=0, retmsg='no define_meta', data={})
  139. component_define = define_meta['component_define'][request_data['component_name']]
  140. # There is only one model output at the current dsl version.
  141. model_alias = next(iter(define_meta['model_proto'][request_data['component_name']].keys()))
  142. if ENABLE_MODEL_STORE:
  143. sync_component = SyncComponent(
  144. role=request_data['role'],
  145. party_id=request_data['party_id'],
  146. model_id=model_id,
  147. model_version=model_version,
  148. component_name=request_data['component_name'],
  149. )
  150. if not sync_component.local_exists() and sync_component.remote_exists():
  151. sync_component.download()
  152. output_model = tracker.pipelined_model.read_component_model(
  153. component_name=request_data['component_name'],
  154. model_alias=model_alias,
  155. output_json=True,
  156. )
  157. output_model_json = {}
  158. component_model_meta = {}
  159. for buffer_name, buffer_object_json_format in output_model.items():
  160. if buffer_name.endswith('Param'):
  161. output_model_json = buffer_object_json_format
  162. elif buffer_name.endswith('Meta'):
  163. component_model_meta = {
  164. 'meta_data': buffer_object_json_format,
  165. }
  166. if not output_model_json:
  167. return get_json_result(retcode=0, retmsg='no data', data={})
  168. component_model_meta.update(component_define)
  169. return get_json_result(retcode=0, retmsg='success', data=output_model_json, meta=component_model_meta)
  170. @manager.route('/component/output/data', methods=['post'])
  171. def component_output_data():
  172. request_data = request.json
  173. tasks = JobSaver.query_task(only_latest=True, job_id=request_data['job_id'],
  174. component_name=request_data['component_name'],
  175. role=request_data['role'], party_id=request_data['party_id'])
  176. if not tasks:
  177. raise ValueError(f'no found task, please check if the parameters are correct:{request_data}')
  178. import_component_output_depend(tasks[0].f_provider_info)
  179. output_tables_meta = get_component_output_tables_meta(task_data=request_data)
  180. if not output_tables_meta:
  181. return get_json_result(retcode=0, retmsg='no data', data=[])
  182. output_data_list = []
  183. headers = []
  184. totals = []
  185. data_names = []
  186. for output_name, output_table_meta in output_tables_meta.items():
  187. output_data = []
  188. is_str = False
  189. all_extend_header = {}
  190. if output_table_meta:
  191. for k, v in output_table_meta.get_part_of_data():
  192. data_line, is_str, all_extend_header = feature_utils.get_component_output_data_line(src_key=k, src_value=v, schema=output_table_meta.get_schema(), all_extend_header=all_extend_header)
  193. output_data.append(data_line)
  194. total = output_table_meta.get_count()
  195. output_data_list.append(output_data)
  196. data_names.append(output_name)
  197. totals.append(total)
  198. if output_data:
  199. extend_header = feature_utils.generate_header(all_extend_header, schema=output_table_meta.get_schema())
  200. if output_table_meta.schema.get("is_display", True):
  201. header = get_component_output_data_schema(output_table_meta=output_table_meta, is_str=is_str,
  202. extend_header=extend_header)
  203. else:
  204. header = []
  205. headers.append(header)
  206. else:
  207. headers.append(None)
  208. if len(output_data_list) == 1 and not output_data_list[0]:
  209. return get_json_result(retcode=0, retmsg='no data', data=[])
  210. return get_json_result(retcode=0, retmsg='success', data=output_data_list,
  211. meta={'header': headers, 'total': totals, 'names': data_names})
  212. @manager.route('/component/output/data/download', methods=['get'])
  213. def component_output_data_download():
  214. request_data = request.json
  215. tasks = JobSaver.query_task(only_latest=True, job_id=request_data['job_id'],
  216. component_name=request_data['component_name'],
  217. role=request_data['role'], party_id=request_data['party_id'])
  218. if not tasks:
  219. raise ValueError(f'no found task, please check if the parameters are correct:{request_data}')
  220. import_component_output_depend(tasks[0].f_provider_info)
  221. try:
  222. output_tables_meta = get_component_output_tables_meta(task_data=request_data)
  223. except Exception as e:
  224. stat_logger.exception(e)
  225. return error_response(210, str(e))
  226. limit = request_data.get('limit', -1)
  227. if not output_tables_meta:
  228. return error_response(response_code=210, retmsg='no data')
  229. if limit == 0:
  230. return error_response(response_code=210, retmsg='limit is 0')
  231. tar_file_name = 'job_{}_{}_{}_{}_output_data.tar.gz'.format(request_data['job_id'],
  232. request_data['component_name'],
  233. request_data['role'], request_data['party_id'])
  234. return TableStorage.send_table(output_tables_meta, tar_file_name, limit=limit, need_head=request_data.get("head", True))
  235. @manager.route('/component/output/data/table', methods=['post'])
  236. @validate_request('job_id', 'role', 'party_id', 'component_name')
  237. def component_output_data_table():
  238. request_data = request.json
  239. jobs = JobSaver.query_job(job_id=request_data.get('job_id'))
  240. if jobs:
  241. job = jobs[0]
  242. return jsonify(FederatedScheduler.tracker_command(job, request_data, 'output/table'))
  243. else:
  244. return get_json_result(retcode=100, retmsg='No found job')
  245. @manager.route('/component/summary/download', methods=['POST'])
  246. @validate_request("job_id", "component_name", "role", "party_id")
  247. def get_component_summary():
  248. request_data = request.json
  249. try:
  250. tracker = Tracker(job_id=request_data["job_id"], component_name=request_data["component_name"],
  251. role=request_data["role"], party_id=request_data["party_id"],
  252. task_id=request_data.get("task_id", None), task_version=request_data.get("task_version", None))
  253. summary = tracker.read_summary_from_db()
  254. if summary:
  255. if request_data.get("filename"):
  256. temp_filepath = os.path.join(TEMP_DIRECTORY, request_data.get("filename"))
  257. with open(temp_filepath, "w") as fout:
  258. fout.write(json.dumps(summary, indent=4))
  259. return send_file(open(temp_filepath, "rb"), as_attachment=True,
  260. attachment_filename=request_data.get("filename"))
  261. else:
  262. return get_json_result(data=summary)
  263. return error_response(210, "No component summary found, please check if arguments are specified correctly.")
  264. except Exception as e:
  265. stat_logger.exception(e)
  266. return error_response(210, str(e))
  267. @manager.route('/component/list', methods=['POST'])
  268. def component_list():
  269. request_data = request.json
  270. parser, _, _ = schedule_utils.get_job_dsl_parser_by_job_id(job_id=request_data.get('job_id'))
  271. if parser:
  272. return get_json_result(data={'components': list(parser.get_dsl().get('components').keys())})
  273. else:
  274. return get_json_result(retcode=100, retmsg='No job matched, please make sure the job id is valid.')
  275. def get_component_output_tables_meta(task_data):
  276. check_request_parameters(task_data)
  277. tracker = Tracker(job_id=task_data['job_id'], component_name=task_data['component_name'],
  278. role=task_data['role'], party_id=task_data['party_id'])
  279. output_data_table_infos = tracker.get_output_data_info()
  280. output_tables_meta = tracker.get_output_data_table(output_data_infos=output_data_table_infos)
  281. return output_tables_meta
  282. @DB.connection_context()
  283. def check_request_parameters(request_data):
  284. if 'role' not in request_data or 'party_id' not in request_data:
  285. jobs = Job.select(Job.f_runtime_conf_on_party).where(Job.f_job_id == request_data.get('job_id', ''),
  286. Job.f_is_initiator == True)
  287. if jobs:
  288. job = jobs[0]
  289. job_runtime_conf = job.f_runtime_conf_on_party
  290. job_initiator = job_runtime_conf.get('initiator', {})
  291. role = job_initiator.get('role', '')
  292. party_id = job_initiator.get('party_id', 0)
  293. request_data['role'] = role if 'role' not in request_data else request_data['role']
  294. request_data['party_id'] = party_id if 'party_id' not in request_data else request_data['party_id']