#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
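"""Data access endpoints for FATE Flow: submitting upload, download and writer
jobs, and querying the history of successful uploads."""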

from pathlib import Path
from uuid import uuid1

from flask import request

from fate_arch import storage
from fate_arch.common import FederatedMode
from fate_arch.common.base_utils import json_loads

from fate_flow.entity import JobConfigurationBase
from fate_flow.entity.run_status import StatusSet
from fate_flow.operation.job_saver import JobSaver
from fate_flow.scheduler.dag_scheduler import DAGScheduler
from fate_flow.settings import UPLOAD_DATA_FROM_CLIENT
from fate_flow.utils import detect_utils, job_utils
from fate_flow.utils.api_utils import get_json_result, error_response


page_name = 'data'
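

# NOTE: `manager` is not imported in this module. The assumption here is that the
# FATE Flow app loader (fate_flow.apps) injects `manager` when it imports this file
# and uses `page_name` to build the URL prefix; check the loader for the exact
# mechanism in your version.
# The handler below accepts access_module in {"upload", "download", "writer"}.
# A minimal client-side upload might look like this (illustrative only, assuming
# the default FATE Flow HTTP port 9380 and the /v1 API prefix):
#   curl -X POST 'http://127.0.0.1:9380/v1/data/upload?table_name=breast&namespace=experiment&head=1&partition=4' \
#        -F 'file=@breast.csv'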
@manager.route('/<access_module>', methods=['post'])
def download_upload(access_module):
    job_id = job_utils.generate_job_id()
    if access_module == "upload" and UPLOAD_DATA_FROM_CLIENT and not (request.json and request.json.get("use_local_data") == 0):
        # Client-side upload: the file arrives in the multipart body and is saved
        # to a temporary path under the job directory.
        file = request.files['file']
        filename = Path(job_utils.get_job_directory(job_id), 'fate_upload_tmp', uuid1().hex)
        filename.parent.mkdir(parents=True, exist_ok=True)
        try:
            file.save(str(filename))
        except Exception as e:
            try:
                filename.unlink()
            except FileNotFoundError:
                pass
            return error_response(500, f'Save file error: {e}')
        job_config = request.args.to_dict() or request.form.to_dict()
        if "namespace" not in job_config or "table_name" not in job_config:
            # Versions above 1.5.1 pack the whole job config, including eggroll
            # run parameters, into a single JSON-encoded key.
            job_config = json_loads(list(job_config.keys())[0])
        job_config['file'] = str(filename)
    else:
        # Server-side upload / download / writer: the job config is the JSON body.
        job_config = request.json

    required_arguments = ['namespace', 'table_name']
    if access_module == 'upload':
        required_arguments.extend(['file', 'head', 'partition'])
    elif access_module == 'download':
        required_arguments.extend(['output_path'])
    elif access_module == 'writer':
        pass
    else:
        return error_response(400, f'Unsupported operation: {access_module}')
    detect_utils.check_config(job_config, required_arguments=required_arguments)

    data = {}
    # compatibility: older callers pass "table_name", the components expect "name"
    if "table_name" in job_config:
        job_config["name"] = job_config["table_name"]
    # normalize string flags to booleans / integers
    for _ in ["head", "partition", "drop", "extend_sid", "auto_increasing_sid"]:
        if _ in job_config:
            if job_config[_] == "false":
                job_config[_] = False
            elif job_config[_] == "true":
                job_config[_] = True
            else:
                job_config[_] = int(job_config[_])

    if access_module == "upload":
        job_config["destroy"] = int(job_config.get('drop', 0)) > 0
        data['table_name'] = job_config["table_name"]
        data['namespace'] = job_config["namespace"]
        data_table_meta = storage.StorageTableMeta(name=job_config["table_name"], namespace=job_config["namespace"])
        if data_table_meta and not job_config["destroy"]:
            return get_json_result(retcode=100,
                                   retmsg='The data table already exists. '
                                          'If you still want to continue uploading, please add the parameter --drop')
    job_dsl, job_runtime_conf = gen_data_access_job_config(job_config, access_module)
    submit_result = DAGScheduler.submit(JobConfigurationBase(dsl=job_dsl, runtime_conf=job_runtime_conf), job_id=job_id)
    data.update(submit_result)
    return get_json_result(job_id=job_id, data=data)
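

# The /upload/history endpoint summarizes successful upload_0 tasks run on this
# party. Fields read from the request body below: an optional "job_id" to filter a
# single job and an optional integer "limit" on the number of most recent records,
# e.g. (illustrative values only): {"job_id": "202201010000000000000", "limit": 10}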
@manager.route('/upload/history', methods=['POST'])
def upload_history():
    request_data = request.json
    if request_data.get('job_id'):
        tasks = JobSaver.query_task(component_name='upload_0', status=StatusSet.SUCCESS, job_id=request_data.get('job_id'), run_on_this_party=True)
    else:
        tasks = JobSaver.query_task(component_name='upload_0', status=StatusSet.SUCCESS, run_on_this_party=True)
    limit = request_data.get('limit')
    if not limit:
        tasks = tasks[-1::-1]
    else:
        tasks = tasks[-1:-limit - 1:-1]
    jobs_run_conf = job_utils.get_upload_job_configuration_summary(upload_tasks=tasks)
    data = get_upload_info(jobs_run_conf=jobs_run_conf)
    return get_json_result(retcode=0, retmsg='success', data=data)
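

# Builds the /upload/history response payload: one {job_id: info} entry per job
# whose storage table still exists, where info carries the table name, namespace,
# partition count, row count, notes and schema.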
def get_upload_info(jobs_run_conf):
    data = []
    for job_id, job_run_conf in jobs_run_conf.items():
        info = {}
        table_name = job_run_conf["name"]
        namespace = job_run_conf["namespace"]
        table_meta = storage.StorageTableMeta(name=table_name, namespace=namespace)
        if table_meta:
            partition = job_run_conf["partition"]
            info["upload_info"] = {
                "table_name": table_name,
                "namespace": namespace,
                "partition": partition,
                'upload_count': table_meta.get_count()
            }
            info["notes"] = job_run_conf["notes"]
            info["schema"] = table_meta.get_schema()
            data.append({job_id: info})
    return data
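

# Builds a minimal DSL v2 job configuration for a data access job. The job always
# runs with the "local" role (party_id taken from config_data, defaulting to 0) in
# FederatedMode.SINGLE, with a single component named "<access_module>_0".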
def gen_data_access_job_config(config_data, access_module):
    job_runtime_conf = {
        "initiator": {},
        "job_parameters": {"common": {}},
        "role": {},
        "component_parameters": {"role": {"local": {"0": {}}}}
    }
    initiator_role = "local"
    initiator_party_id = config_data.get('party_id', 0)
    job_runtime_conf["initiator"]["role"] = initiator_role
    job_runtime_conf["initiator"]["party_id"] = initiator_party_id
    job_parameters_fields = {"task_cores", "eggroll_run", "spark_run", "computing_engine", "storage_engine", "federation_engine"}
    for _ in job_parameters_fields:
        if _ in config_data:
            job_runtime_conf["job_parameters"]["common"][_] = config_data[_]
    job_runtime_conf["job_parameters"]["common"]["federated_mode"] = FederatedMode.SINGLE
    job_runtime_conf["role"][initiator_role] = [initiator_party_id]
    job_dsl = {
        "components": {}
    }

    if access_module == 'upload':
        parameters = {
            "head",
            "partition",
            "file",
            "namespace",
            "name",
            "id_delimiter",
            "storage_engine",
            "storage_address",
            "destroy",
            "extend_sid",
            "auto_increasing_sid",
            "block_size",
            "schema",
            "with_meta",
            "meta"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    if access_module == 'download':
        parameters = {
            "delimiter",
            "output_path",
            "namespace",
            "name"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    if access_module == 'writer':
        parameters = {
            "namespace",
            "name",
            "storage_engine",
            "address",
            "output_namespace",
            "output_name"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    return job_dsl, job_runtime_conf
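

# Copies the whitelisted parameters from config_data into the component
# configuration of "<access_module>_0" and registers that component in the DSL
# under its capitalized module name (Upload / Download / Writer).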
def update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data):
    component_name = f"{access_module}_0"
    job_runtime_conf["component_parameters"]['role'][initiator_role]["0"][component_name] = {}
    for p in parameters:
        if p in config_data:
            job_runtime_conf["component_parameters"]['role'][initiator_role]["0"][component_name][p] = config_data[p]
    job_runtime_conf['dsl_version'] = 2
    job_dsl["components"][component_name] = {
        "module": access_module.capitalize()
    }
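

# For example (illustrative, hypothetical values), an upload job generated from
# config_data = {"file": "/data/breast.csv", "head": 1, "partition": 4,
#                "namespace": "experiment", "name": "breast"}
# yields roughly:
#   job_dsl == {"components": {"upload_0": {"module": "Upload"}}}
#   job_runtime_conf["component_parameters"]["role"]["local"]["0"]["upload_0"]
#       == {"file": "/data/breast.csv", "head": 1, "partition": 4,
#           "namespace": "experiment", "name": "breast"}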