#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
import json
import os
import tarfile

from flask import abort, request, send_file

from fate_arch.common.base_utils import json_dumps, json_loads

from fate_flow.controller.job_controller import JobController
from fate_flow.entity import JobConfigurationBase, RetCode
from fate_flow.entity.run_status import FederatedSchedulingStatusCode, JobStatus
from fate_flow.operation.job_clean import JobClean
from fate_flow.operation.job_saver import JobSaver
from fate_flow.operation.job_tracker import Tracker
from fate_flow.scheduler.dag_scheduler import DAGScheduler
from fate_flow.scheduler.federated_scheduler import FederatedScheduler
from fate_flow.settings import TEMP_DIRECTORY, stat_logger
from fate_flow.utils import job_utils, log_utils, schedule_utils, api_utils
from fate_flow.utils.api_utils import error_response, get_json_result
from fate_flow.utils.config_adapter import JobRuntimeConfigAdapter
from fate_flow.utils.log_utils import schedule_logger
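
# NOTE: `manager` is deliberately not imported here. In FATE Flow, the app
# loader (fate_flow.apps) creates a Flask Blueprint per app module and injects
# it as `manager` before executing the module, so the route decorators below
# bind to that injected blueprint.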


@manager.route('/submit', methods=['POST'])
def submit_job():
    submit_result = DAGScheduler.submit(JobConfigurationBase(**request.json))
    return get_json_result(retcode=submit_result["code"], retmsg=submit_result["message"],
                           job_id=submit_result["job_id"],
                           data=submit_result if submit_result["code"] == RetCode.SUCCESS else None)
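# Example request body (a sketch: the exact keys accepted by
# JobConfigurationBase are defined in fate_flow.entity; `dsl` and
# `runtime_conf` shown here are the usual ones, treat them as an assumption):
# {
#     "dsl": {...},
#     "runtime_conf": {...}
# }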


@manager.route('/stop', methods=['POST'])
def stop_job():
    job_id = request.json.get('job_id')
    stop_status = request.json.get("stop_status", "canceled")
    jobs = JobSaver.query_job(job_id=job_id)
    if jobs:
        schedule_logger(job_id).info("stop job on this party")
        kill_status, kill_details = JobController.stop_jobs(job_id=job_id, stop_status=stop_status)
        schedule_logger(job_id).info(f"stop job on this party status {kill_status}")
        schedule_logger(job_id).info(f"request stop job to {stop_status}")
        status_code, response = FederatedScheduler.request_stop_job(job=jobs[0], stop_status=stop_status, command_body=jobs[0].to_dict())
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            return get_json_result(retcode=RetCode.SUCCESS, retmsg=f"stop job on this party {'success' if kill_status else 'failed'}; stop job on all parties success")
        else:
            return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"stop job on this party {kill_status}", data=response)
    else:
        schedule_logger(job_id).info("no job found to stop")
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg="cannot find job")
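# Example request body (a sketch; `stop_status` defaults to "canceled", and
# the set of other accepted values, e.g. "failed", is an assumption here —
# the job id shown is hypothetical):
# {"job_id": "202201010000000000000", "stop_status": "canceled"}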


@manager.route('/rerun', methods=['POST'])
def rerun_job():
    job_id = request.json.get("job_id")
    jobs = JobSaver.query_job(job_id=job_id)
    if jobs:
        status_code, response = FederatedScheduler.request_rerun_job(job=jobs[0], command_body=request.json)
        if status_code == FederatedSchedulingStatusCode.SUCCESS:
            return get_json_result(retcode=RetCode.SUCCESS, retmsg="rerun job success")
        else:
            return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg="rerun job failed:\n{}".format(json_dumps(response)))
    else:
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg="cannot find job")


@manager.route('/query', methods=['POST'])
def query_job():
    jobs = JobSaver.query_job(**request.json)
    if not jobs:
        return get_json_result(retcode=0, retmsg='no job could be found', data=[])
    return get_json_result(retcode=0, retmsg='success', data=[job.to_dict() for job in jobs])


@manager.route('/list/job', methods=['POST'])
def list_job():
    limit, offset = parse_limit_and_offset()
    query = {
        'tag': ('!=', 'submit_failed'),
    }
    for i in ('job_id', 'description'):
        if request.json.get(i) is not None:
            query[i] = ('contains', request.json[i])
    if request.json.get('party_id') is not None:
        try:
            query['party_id'] = int(request.json['party_id'])
        except Exception:
            return error_response(400, "Invalid parameter 'party_id'.")
        query['party_id'] = ('contains', query['party_id'])
    if request.json.get('partner') is not None:
        # match jobs whose roles JSON mentions the partner's party id
        query['roles'] = ('contains', request.json['partner'])
    for i in ('role', 'status'):
        if request.json.get(i) is None:
            continue
        if isinstance(request.json[i], str):
            request.json[i] = [request.json[i]]
        if not isinstance(request.json[i], list):
            return error_response(400, f"Invalid parameter '{i}'.")
        request.json[i] = set(request.json[i])
        for j in request.json[i]:
            if j not in valid_query_parameters[i]:
                return error_response(400, f"Invalid parameter '{i}'.")
        query[i] = ('in_', request.json[i])
    jobs, count = job_utils.list_job(limit, offset, query, parse_order_by(('create_time', 'desc')))
    jobs = [job.to_human_model_dict() for job in jobs]
    for job in jobs:
        job['party_id'] = int(job['party_id'])
        job['partners'] = set()
        for i in ('guest', 'host', 'arbiter'):
            job['partners'].update(job['roles'].get(i, []))
        job['partners'].discard(job['party_id'])
        job['partners'] = sorted(job['partners'])
    return get_json_result(data={
        'jobs': jobs,
        'count': count,
    })
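# Example request body (a sketch; every filter is optional and the party ids
# shown are hypothetical — `role` and `status` accept a string or a list of
# the values in `valid_query_parameters` below):
# {
#     "limit": 20, "page": 1,
#     "party_id": 9999, "role": ["guest", "host"], "status": "running",
#     "order_by": "create_time", "order": "desc"
# }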


@manager.route('/update', methods=['POST'])
def update_job():
    job_info = request.json
    jobs = JobSaver.query_job(job_id=job_info['job_id'], party_id=job_info['party_id'], role=job_info['role'])
    if not jobs:
        return get_json_result(retcode=101, retmsg='find job failed')
    else:
        JobSaver.update_job(job_info={'description': job_info.get('notes', ''), 'job_id': job_info['job_id'],
                                      'role': job_info['role'], 'party_id': job_info['party_id']})
        return get_json_result(retcode=0, retmsg='success')


@manager.route('/report', methods=['POST'])
def job_report():
    tasks = JobSaver.query_task(**request.json)
    if not tasks:
        return get_json_result(retcode=101, retmsg='find task failed')
    return get_json_result(retcode=0, retmsg='success', data=job_utils.task_report(tasks))


@manager.route('/parameter/update', methods=['POST'])
@api_utils.validate_request("job_id")
def update_parameters():
    job_info = request.json
    component_parameters = job_info.pop("component_parameters", None)
    job_parameters = job_info.pop("job_parameters", None)
    job_info["is_initiator"] = True
    jobs = JobSaver.query_job(**job_info)
    if not jobs:
        return get_json_result(retcode=RetCode.DATA_ERROR, retmsg=log_utils.failed_log(f"query job by {job_info}"))
    else:
        retcode, retdata = DAGScheduler.update_parameters(jobs[0], job_parameters, component_parameters)
        return get_json_result(retcode=retcode, data=retdata)


@manager.route('/config', methods=['POST'])
def job_config():
    jobs = JobSaver.query_job(**request.json)
    if not jobs:
        return get_json_result(retcode=101, retmsg='find job failed')
    else:
        job = jobs[0]
        response_data = dict()
        response_data['job_id'] = job.f_job_id
        response_data['dsl'] = job.f_dsl
        response_data['runtime_conf'] = job.f_runtime_conf
        response_data['train_runtime_conf'] = job.f_train_runtime_conf
        adapter = JobRuntimeConfigAdapter(job.f_runtime_conf)
        job_parameters = adapter.get_common_parameters().to_dict()
        response_data['model_info'] = {'model_id': job_parameters.get('model_id'),
                                       'model_version': job_parameters.get('model_version')}
        return get_json_result(retcode=0, retmsg='success', data=response_data)


def check_job_log_dir():
    job_id = str(request.json['job_id'])
    job_log_dir = job_utils.get_job_log_directory(job_id=job_id)
    if not os.path.exists(job_log_dir):
        abort(error_response(404, f"Log file path: '{job_log_dir}' not found. Please check if the job id is valid."))
    return job_id, job_log_dir


@manager.route('/log/download', methods=['POST'])
@api_utils.validate_request('job_id')
def job_log_download():
    job_id, job_log_dir = check_job_log_dir()
    # build the gzipped tarball entirely in memory, then stream it back
    memory_file = io.BytesIO()
    tar = tarfile.open(fileobj=memory_file, mode='w:gz')
    for root, dirs, files in os.walk(job_log_dir):
        for file in files:
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, job_log_dir)
            tar.add(full_path, rel_path)
    tar.close()
    memory_file.seek(0)
    return send_file(memory_file, attachment_filename=f'job_{job_id}_log.tar.gz', as_attachment=True)
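# Example usage (a sketch; the host, port, and the /v1/job URL prefix under
# which fate_flow.apps mounts this blueprint are assumptions, and the job id
# is hypothetical):
#   curl -X POST -H 'Content-Type: application/json' \
#        -d '{"job_id": "202201010000000000000"}' \
#        http://127.0.0.1:9380/v1/job/log/download -o job_log.tar.gz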


@manager.route('/log/path', methods=['POST'])
@api_utils.validate_request('job_id')
def job_log_path():
    job_id, job_log_dir = check_job_log_dir()
    return get_json_result(data={"logs_directory": job_log_dir})


@manager.route('/task/query', methods=['POST'])
def query_task():
    tasks = JobSaver.query_task(**request.json)
    if not tasks:
        return get_json_result(retcode=101, retmsg='find task failed')
    return get_json_result(retcode=0, retmsg='success', data=[task.to_dict() for task in tasks])


@manager.route('/list/task', methods=['POST'])
def list_task():
    limit, offset = parse_limit_and_offset()
    query = {}
    for i in ('job_id', 'role', 'party_id', 'component_name'):
        if request.json.get(i) is not None:
            query[i] = request.json[i]
    if query.get('role') is not None:
        if query['role'] not in valid_query_parameters['role']:
            return error_response(400, "Invalid parameter 'role'.")
    if query.get('party_id') is not None:
        try:
            query['party_id'] = int(query['party_id'])
        except Exception:
            return error_response(400, "Invalid parameter 'party_id'.")
    tasks, count = job_utils.list_task(limit, offset, query, parse_order_by(('create_time', 'asc')))
    return get_json_result(data={
        'tasks': [task.to_human_model_dict() for task in tasks],
        'count': count,
    })


@manager.route('/data/view/query', methods=['POST'])
def query_component_output_data_info():
    output_data_infos = Tracker.query_output_data_infos(**request.json)
    if not output_data_infos:
        return get_json_result(retcode=101, retmsg='find data view failed')
    return get_json_result(retcode=0, retmsg='success', data=[output_data_info.to_dict() for output_data_info in output_data_infos])


@manager.route('/clean', methods=['POST'])
def clean_job():
    JobClean.start_clean_job(**request.json)
    return get_json_result(retcode=0, retmsg='success')


@manager.route('/clean/queue', methods=['POST'])
def clean_queue():
    jobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING)
    clean_status = {}
    for job in jobs:
        status_code, response = FederatedScheduler.request_stop_job(job=job, stop_status=JobStatus.CANCELED)
        clean_status[job.f_job_id] = status_code
    return get_json_result(retcode=0, retmsg='success', data=clean_status)


@manager.route('/dsl/generate', methods=['POST'])
def dsl_generator():
    data = request.json
    cpn_str = data.get("cpn_str", "")
    try:
        if not cpn_str:
            raise Exception("Component list should not be empty.")
        if isinstance(cpn_str, list):
            cpn_list = cpn_str
        else:
            if "/" in cpn_str or "\\" in cpn_str:
                raise Exception("Component list string should not contain '/' or '\\'.")
            cpn_str = cpn_str.replace(" ", "").replace("\n", "").strip(",[]")
            cpn_list = cpn_str.split(",")
        train_dsl = json_loads(data.get("train_dsl"))
        parser = schedule_utils.get_dsl_parser_by_version(data.get("version", "2"))
        predict_dsl = parser.deploy_component(cpn_list, train_dsl)
        if data.get("filename"):
            os.makedirs(TEMP_DIRECTORY, exist_ok=True)
            temp_filepath = os.path.join(TEMP_DIRECTORY, data.get("filename"))
            with open(temp_filepath, "w") as fout:
                fout.write(json.dumps(predict_dsl, indent=4))
            return send_file(open(temp_filepath, 'rb'), as_attachment=True, attachment_filename=data.get("filename"))
        return get_json_result(data=predict_dsl)
    except Exception as e:
        stat_logger.exception(e)
        return error_response(210, "DSL generating failed. For more details, "
                                   "please check logs/fate_flow/fate_flow_stat.log.")


@manager.route('/url/get', methods=['POST'])
@api_utils.validate_request('job_id', 'role', 'party_id')
def get_url():
    request_data = request.json
    jobs = JobSaver.query_job(job_id=request_data.get('job_id'), role=request_data.get('role'),
                              party_id=request_data.get('party_id'))
    if jobs:
        board_urls = []
        for job in jobs:
            board_url = job_utils.get_board_url(job.f_job_id, job.f_role, job.f_party_id)
            board_urls.append(board_url)
        return get_json_result(data={'board_url': board_urls})
    else:
        return get_json_result(retcode=101, retmsg='no job found')


def parse_limit_and_offset():
    try:
        limit = int(request.json.get('limit', 0))
        page = int(request.json.get('page', 1)) - 1
    except Exception:
        abort(error_response(400, "Invalid parameter 'limit' or 'page'."))
    return limit, limit * page
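# Worked example: {"limit": 10, "page": 3} yields (10, 20) — `page` is
# 1-based, so page 3 starts at offset limit * (page - 1) = 20. With the
# default limit of 0, the offset is always 0 as well.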


def parse_order_by(default=None):
    order_by = []
    if request.json.get('order_by') is not None:
        if request.json['order_by'] not in valid_query_parameters['order_by']:
            abort(error_response(400, "Invalid parameter 'order_by'."))
        order_by.append(request.json['order_by'])
    if request.json.get('order') is not None:
        if request.json['order'] not in valid_query_parameters['order']:
            abort(error_response(400, "Invalid parameter 'order'."))
        order_by.append(request.json['order'])
    return order_by or default
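# Example: {"order_by": "create_time", "order": "desc"} returns
# ['create_time', 'desc']; with neither key present, the caller's default is
# returned instead, e.g. ('create_time', 'desc') in list_job above.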


valid_query_parameters = {
    'role': {'guest', 'host', 'arbiter', 'local'},
    'status': {'success', 'running', 'waiting', 'failed', 'canceled'},
    'order_by': {'job_id', 'task_version', 'create_time', 'start_time', 'end_time', 'elapsed'},
    'order': {'asc', 'desc'},
}