123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import json
- import os
- from flask import jsonify, request, send_file
- from fate_flow.component_env_utils import feature_utils
- from fate_flow.component_env_utils.env_utils import import_component_output_depend
- from fate_flow.db.db_models import DB, Job
- from fate_flow.manager.data_manager import TableStorage, delete_metric_data, get_component_output_data_schema
- from fate_flow.model.sync_model import SyncComponent
- from fate_flow.operation.job_saver import JobSaver
- from fate_flow.operation.job_tracker import Tracker
- from fate_flow.scheduler.federated_scheduler import FederatedScheduler
- from fate_flow.settings import TEMP_DIRECTORY, stat_logger, ENABLE_MODEL_STORE
- from fate_flow.utils import job_utils, schedule_utils
- from fate_flow.utils.api_utils import error_response, get_json_result, validate_request
@manager.route('/job/data_view', methods=['post'])
def job_view():
    """Return the job view enriched with a job-level metric summary.

    Reads the JSON request body, lets ``check_request_parameters`` fill in
    ``role`` / ``party_id`` when they are absent, and asks the tracker for
    the job view.  When the view is empty the response carries retcode 101.

    On success the view gains a ``model_summary`` entry laid out as
    ``{metric_namespace: {metric_name: {metric key: metric value}}}``.
    """
    payload = request.json
    check_request_parameters(payload)
    tracker = Tracker(job_id=payload['job_id'], role=payload['role'], party_id=payload['party_id'])
    view = tracker.get_job_view()
    if not view:
        return get_json_result(retcode=101, retmsg='error')

    metric_index = tracker.get_metric_list(job_level=True)
    summary = view['model_summary'] = {}
    for namespace, names in metric_index.items():
        per_namespace = summary.setdefault(namespace, {})
        for name in names:
            per_metric = per_namespace.setdefault(name, {})
            for item in tracker.get_job_metric_data(metric_namespace=namespace, metric_name=name):
                per_metric[item.key] = item.value
    return get_json_result(retcode=0, retmsg='success', data=view)
@manager.route('/component/metric/all', methods=['post'])
def component_metric_all():
    """Return data and meta for every metric recorded by one component.

    The response ``data`` is shaped as
    ``{metric_namespace: {metric_name: {'data': [...], 'meta': {...}}}}``.
    When the tracker lists no metrics, an empty dict is returned with the
    message ``no data``.
    """
    payload = request.json
    check_request_parameters(payload)
    tracker = Tracker(job_id=payload['job_id'], component_name=payload['component_name'],
                      role=payload['role'], party_id=payload['party_id'])
    metric_index = tracker.get_metric_list()
    if not metric_index:
        return get_json_result(retcode=0, retmsg='no data', data={})

    collected = {}
    for namespace, names in metric_index.items():
        namespace_bucket = collected.setdefault(namespace, {})
        for name in names:
            entry = namespace_bucket.setdefault(name, {})
            points, meta = get_metric_all_data(tracker=tracker, metric_namespace=namespace,
                                               metric_name=name)
            entry['data'] = points
            entry['meta'] = meta
    return get_json_result(retcode=0, retmsg='success', data=collected)
@manager.route('/component/metrics', methods=['post'])
def component_metrics():
    """Return the tracker's metric list for one component.

    Responds with ``no data`` and an empty dict when the list is empty.
    """
    payload = request.json
    check_request_parameters(payload)
    tracker = Tracker(job_id=payload['job_id'], component_name=payload['component_name'],
                      role=payload['role'], party_id=payload['party_id'])
    metric_index = tracker.get_metric_list()
    if not metric_index:
        return get_json_result(retcode=0, retmsg='no data', data={})
    return get_json_result(retcode=0, retmsg='success', data=metric_index)
@manager.route('/component/metric_data', methods=['post'])
def component_metric_data():
    """Return the data points and meta of a single named metric.

    The metric is identified by ``metric_namespace`` and ``metric_name`` in
    the JSON request body.  When neither data nor meta exists the response
    carries ``no data`` with an empty list and an empty meta dict.
    """
    payload = request.json
    check_request_parameters(payload)
    tracker = Tracker(job_id=payload['job_id'], component_name=payload['component_name'],
                      role=payload['role'], party_id=payload['party_id'])
    points, meta = get_metric_all_data(tracker=tracker,
                                       metric_namespace=payload['metric_namespace'],
                                       metric_name=payload['metric_name'])
    if not points and not meta:
        return get_json_result(retcode=0, retmsg='no data', data=[], meta={})
    return get_json_result(retcode=0, retmsg='success', data=points, meta=meta)
def get_metric_all_data(tracker, metric_namespace, metric_name):
    """Fetch one metric's data points and meta from ``tracker``.

    Args:
        tracker: object exposing ``get_metric_data`` and ``get_metric_meta``,
            both accepting ``metric_namespace`` and ``metric_name`` keywords.
        metric_namespace: namespace of the metric to read.
        metric_name: name of the metric to read.

    Returns:
        A ``(points, meta)`` tuple.  ``points`` is a list of ``(key, value)``
        pairs sorted by key; ``meta`` is ``metric_meta.to_dict()`` or ``{}``
        when no meta exists.  ``([], {})`` when neither is available.
    """
    raw_points = tracker.get_metric_data(metric_namespace=metric_namespace,
                                         metric_name=metric_name)
    raw_meta = tracker.get_metric_meta(metric_namespace=metric_namespace,
                                       metric_name=metric_name)
    if not raw_points and not raw_meta:
        return [], {}

    ordered_points = sorted(((point.key, point.value) for point in raw_points),
                            key=lambda pair: pair[0])
    if raw_meta:
        meta_dict = raw_meta.to_dict()
    else:
        meta_dict = {}
    return ordered_points, meta_dict
@manager.route('/component/metric/delete', methods=['post'])
def component_metric_delete():
    """Hand the JSON request body to ``delete_metric_data`` and echo its result.

    NOTE(review): the original local was named ``sql``, so the returned value
    is presumably the executed statement -- confirm against data_manager.
    """
    return get_json_result(retcode=0, retmsg='success', data=delete_metric_data(request.json))
@manager.route('/component/parameters', methods=['post'])
def component_parameters():
    """Return the module name and ``*Param`` entries of a task's parameters.

    The whole JSON request body (after ``check_request_parameters``) is used
    as the task query filter; only the first matching task is inspected.
    Responds with retcode 101 when no task matches.
    """
    payload = request.json
    check_request_parameters(payload)
    matched = JobSaver.query_task(only_latest=True, **payload)
    if not matched:
        return get_json_result(retcode=101, retmsg='can not found this task')

    task_parameters = matched[0].f_component_parameters
    exposed = {'module': task_parameters.get('module', '')}
    # Only keys ending in 'Param' are exposed alongside the module name.
    exposed.update((key, value) for key, value in task_parameters.items() if key.endswith('Param'))
    return get_json_result(retcode=0, retmsg='success', data=exposed)
@manager.route('/component/output/model', methods=['post'])
def component_output_model():
    """Return the output model of a component as JSON.

    The model id comes from the job's ``runtime_conf_on_party`` and the model
    version is the job id.  The response ``data`` is the buffer whose name
    ends in ``Param``; ``meta`` holds the ``Meta`` buffer under ``meta_data``
    merged with the component's define-meta entry.

    Responds with ``no define_meta`` when the component has no define-meta
    entry, and ``no data`` when no ``Param`` buffer is found.
    """
    payload = request.json
    check_request_parameters(payload)
    job_id = payload['job_id']
    role = payload['role']
    party_id = payload['party_id']

    job_conf = job_utils.get_job_configuration(job_id, role, party_id)
    model_id = job_conf.runtime_conf_on_party['job_parameters']['model_id']
    model_version = job_id
    component_name = payload['component_name']

    tracker = Tracker(
        job_id=job_id,
        role=role, party_id=party_id,
        model_id=model_id, model_version=model_version,
        component_name=component_name,
    )

    define_meta = tracker.pipelined_model.pipelined_component.get_define_meta()
    if not (define_meta and component_name in define_meta['component_define']):
        return get_json_result(retcode=0, retmsg='no define_meta', data={})
    component_define = define_meta['component_define'][component_name]

    # There is only one model output at the current dsl version.
    model_alias = next(iter(define_meta['model_proto'][component_name].keys()))

    if ENABLE_MODEL_STORE:
        remote_copy = SyncComponent(
            role=role,
            party_id=party_id,
            model_id=model_id,
            model_version=model_version,
            component_name=component_name,
        )
        # Download only when the component is missing locally but present remotely.
        if not remote_copy.local_exists() and remote_copy.remote_exists():
            remote_copy.download()

    buffers = tracker.pipelined_model.read_component_model(
        component_name=component_name,
        model_alias=model_alias,
        output_json=True,
    )

    model_param = {}
    model_meta = {}
    for buffer_name, buffer_json in buffers.items():
        if buffer_name.endswith('Param'):
            model_param = buffer_json
        elif buffer_name.endswith('Meta'):
            model_meta = {'meta_data': buffer_json}

    if not model_param:
        return get_json_result(retcode=0, retmsg='no data', data={})

    model_meta.update(component_define)
    return get_json_result(retcode=0, retmsg='success', data=model_param, meta=model_meta)
@manager.route('/component/output/data', methods=['post'])
def component_output_data():
    """Return a preview of every output data table of a component.

    The JSON request body must carry ``job_id``, ``component_name``, ``role``
    and ``party_id``.  For each output table the handler collects the rows
    yielded by ``get_part_of_data()``, the table's row count and a header.

    Returns:
        A JSON result whose ``data`` is a list of row lists (one per output
        table) and whose ``meta`` holds the parallel lists ``header``,
        ``total`` and ``names``.  ``no data`` with an empty list is returned
        when there are no output tables, or when the single output table
        yields no preview rows.

    Raises:
        ValueError: when no task matches the request parameters.
    """
    request_data = request.json
    tasks = JobSaver.query_task(only_latest=True, job_id=request_data['job_id'],
                                component_name=request_data['component_name'],
                                role=request_data['role'], party_id=request_data['party_id'])
    if not tasks:
        raise ValueError(f'no found task, please check if the parameters are correct:{request_data}')
    import_component_output_depend(tasks[0].f_provider_info)

    output_tables_meta = get_component_output_tables_meta(task_data=request_data)
    if not output_tables_meta:
        return get_json_result(retcode=0, retmsg='no data', data=[])

    output_data_list = []
    headers = []
    totals = []
    data_names = []
    for output_name, output_table_meta in output_tables_meta.items():
        output_data = []
        is_str = False
        all_extend_header = {}
        # Reset per table: previously ``total`` was only bound inside the
        # ``if`` below, so a falsy table meta either raised
        # UnboundLocalError (first table) or reused the previous table's
        # count (later tables).
        total = 0
        if output_table_meta:
            for k, v in output_table_meta.get_part_of_data():
                data_line, is_str, all_extend_header = feature_utils.get_component_output_data_line(
                    src_key=k, src_value=v, schema=output_table_meta.get_schema(),
                    all_extend_header=all_extend_header)
                output_data.append(data_line)
            total = output_table_meta.get_count()
        output_data_list.append(output_data)
        data_names.append(output_name)
        totals.append(total)

        if output_data:
            extend_header = feature_utils.generate_header(all_extend_header,
                                                          schema=output_table_meta.get_schema())
            # A schema with is_display set to a falsy value suppresses the header.
            if output_table_meta.schema.get("is_display", True):
                header = get_component_output_data_schema(output_table_meta=output_table_meta,
                                                          is_str=is_str,
                                                          extend_header=extend_header)
            else:
                header = []
            headers.append(header)
        else:
            # Keep ``headers`` aligned with the other per-table lists.
            headers.append(None)

    if len(output_data_list) == 1 and not output_data_list[0]:
        return get_json_result(retcode=0, retmsg='no data', data=[])
    return get_json_result(retcode=0, retmsg='success', data=output_data_list,
                           meta={'header': headers, 'total': totals, 'names': data_names})
@manager.route('/component/output/data/download', methods=['get'])
def component_output_data_download():
    """Send a component's output data tables as a tar.gz attachment.

    The JSON request body must carry ``job_id``, ``component_name``, ``role``
    and ``party_id``; it may carry ``limit`` (default -1) and ``head``
    (default True), which are forwarded to ``TableStorage.send_table``.

    Responds with error code 210 when the table meta lookup fails, when there
    are no output tables, or when ``limit`` is 0.

    Raises:
        ValueError: when no task matches the request parameters.
    """
    payload = request.json
    matched = JobSaver.query_task(only_latest=True, job_id=payload['job_id'],
                                  component_name=payload['component_name'],
                                  role=payload['role'], party_id=payload['party_id'])
    if not matched:
        raise ValueError(f'no found task, please check if the parameters are correct:{payload}')
    import_component_output_depend(matched[0].f_provider_info)

    try:
        tables_meta = get_component_output_tables_meta(task_data=payload)
    except Exception as exc:
        stat_logger.exception(exc)
        return error_response(210, str(exc))

    row_limit = payload.get('limit', -1)
    if not tables_meta:
        return error_response(response_code=210, retmsg='no data')
    if row_limit == 0:
        return error_response(response_code=210, retmsg='limit is 0')

    name_parts = (payload['job_id'], payload['component_name'], payload['role'], payload['party_id'])
    archive_name = 'job_{}_{}_{}_{}_output_data.tar.gz'.format(*name_parts)
    with_header = payload.get("head", True)
    return TableStorage.send_table(tables_meta, archive_name, limit=row_limit, need_head=with_header)
@manager.route('/component/output/data/table', methods=['post'])
@validate_request('job_id', 'role', 'party_id', 'component_name')
def component_output_data_table():
    """Forward an ``output/table`` tracker command for the first matching job.

    Responds with retcode 100 when no job matches ``job_id``.
    """
    payload = request.json
    matched_jobs = JobSaver.query_job(job_id=payload.get('job_id'))
    if not matched_jobs:
        return get_json_result(retcode=100, retmsg='No found job')
    first_job = matched_jobs[0]
    command_result = FederatedScheduler.tracker_command(first_job, payload, 'output/table')
    return jsonify(command_result)
@manager.route('/component/summary/download', methods=['POST'])
@validate_request("job_id", "component_name", "role", "party_id")
def get_component_summary():
    """Return a component's summary, inline or as a downloadable JSON file.

    The JSON request body must carry ``job_id``, ``component_name``, ``role``
    and ``party_id``; ``task_id``, ``task_version`` and ``filename`` are
    optional.  With ``filename`` the summary is written (indented JSON) to
    that name inside ``TEMP_DIRECTORY`` and sent as an attachment; without it
    the summary is returned in the JSON result.

    Any exception is logged and reported as error code 210, as is a missing
    summary or an unusable ``filename``.
    """
    request_data = request.json
    try:
        tracker = Tracker(job_id=request_data["job_id"], component_name=request_data["component_name"],
                          role=request_data["role"], party_id=request_data["party_id"],
                          task_id=request_data.get("task_id", None),
                          task_version=request_data.get("task_version", None))
        summary = tracker.read_summary_from_db()
        if summary:
            requested_name = request_data.get("filename")
            if requested_name:
                # SECURITY: ``filename`` comes straight from the client.  Keep
                # only its final path component so that directory separators
                # or an absolute path cannot place the file outside
                # TEMP_DIRECTORY.
                safe_name = os.path.basename(requested_name)
                if safe_name in ("", ".", ".."):
                    return error_response(210, f"Invalid filename: {requested_name}")
                temp_filepath = os.path.join(TEMP_DIRECTORY, safe_name)
                with open(temp_filepath, "w") as fout:
                    fout.write(json.dumps(summary, indent=4))
                # The handle is passed to send_file as a file object, exactly
                # as before.
                # NOTE(review): ``attachment_filename`` was renamed to
                # ``download_name`` in newer Flask releases -- confirm the
                # pinned Flask version before upgrading.
                return send_file(open(temp_filepath, "rb"), as_attachment=True,
                                 attachment_filename=safe_name)
            else:
                return get_json_result(data=summary)
        return error_response(210, "No component summary found, please check if arguments are specified correctly.")
    except Exception as e:
        stat_logger.exception(e)
        return error_response(210, str(e))
@manager.route('/component/list', methods=['POST'])
def component_list():
    """List the component names declared in a job's DSL.

    Responds with retcode 100 when no DSL parser can be obtained for the
    given ``job_id``.
    """
    payload = request.json
    dsl_parser, _, _ = schedule_utils.get_job_dsl_parser_by_job_id(job_id=payload.get('job_id'))
    if not dsl_parser:
        return get_json_result(retcode=100, retmsg='No job matched, please make sure the job id is valid.')
    components = dsl_parser.get_dsl().get('components')
    component_names = list(components.keys())
    return get_json_result(data={'components': component_names})
def get_component_output_tables_meta(task_data):
    """Look up the output data tables of the component described by ``task_data``.

    ``task_data`` is first passed through ``check_request_parameters``, which
    may add ``role`` / ``party_id`` to it in place.  It must provide
    ``job_id`` and ``component_name``.

    Returns:
        Whatever ``Tracker.get_output_data_table`` yields for the component's
        output data info.
    """
    check_request_parameters(task_data)
    component_tracker = Tracker(job_id=task_data['job_id'], component_name=task_data['component_name'],
                                role=task_data['role'], party_id=task_data['party_id'])
    data_infos = component_tracker.get_output_data_info()
    return component_tracker.get_output_data_table(output_data_infos=data_infos)
@DB.connection_context()
def check_request_parameters(request_data):
    """Fill in missing ``role`` / ``party_id`` from the job's initiator config.

    Mutates ``request_data`` in place.  Nothing happens when both keys are
    already present, or when no initiator-side job record matches
    ``job_id``.  Keys that are already present are never overwritten.
    Defaults are ``''`` for ``role`` and ``0`` for ``party_id`` when the
    initiator section lacks them.
    """
    if 'role' in request_data and 'party_id' in request_data:
        return

    # ``== True`` is passed to ``where()`` as a query condition, so it must
    # stay a ``==`` comparison rather than an ``is`` test.
    initiator_jobs = Job.select(Job.f_runtime_conf_on_party).where(
        Job.f_job_id == request_data.get('job_id', ''),
        Job.f_is_initiator == True)
    if not initiator_jobs:
        return

    runtime_conf = initiator_jobs[0].f_runtime_conf_on_party
    initiator = runtime_conf.get('initiator', {})
    initiator_role = initiator.get('role', '')
    initiator_party_id = initiator.get('party_id', 0)
    request_data.setdefault('role', initiator_role)
    request_data.setdefault('party_id', initiator_party_id)
|