data_access_app.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from uuid import uuid1
from pathlib import Path

from flask import request

from fate_arch import storage
from fate_arch.common import FederatedMode
from fate_arch.common.base_utils import json_loads

from fate_flow.entity import JobConfigurationBase
from fate_flow.entity.run_status import StatusSet
from fate_flow.operation.job_saver import JobSaver
from fate_flow.scheduler.dag_scheduler import DAGScheduler
from fate_flow.settings import UPLOAD_DATA_FROM_CLIENT
from fate_flow.utils import detect_utils, job_utils
from fate_flow.utils.api_utils import get_json_result, error_response

page_name = 'data'

# NOTE: `manager` is not imported here; the fate_flow apps loader injects a
# Flask blueprint named `manager` into every app module at startup and mounts
# it under `page_name`.


@manager.route('/<access_module>', methods=['post'])
def download_upload(access_module):
    job_id = job_utils.generate_job_id()
    if access_module == "upload" and UPLOAD_DATA_FROM_CLIENT and not (request.json and request.json.get("use_local_data") == 0):
        file = request.files['file']
        filename = Path(job_utils.get_job_directory(job_id), 'fate_upload_tmp', uuid1().hex)
        filename.parent.mkdir(parents=True, exist_ok=True)
        try:
            file.save(str(filename))
        except Exception as e:
            # remove the partially written temp file before reporting the error
            try:
                filename.unlink()
            except FileNotFoundError:
                pass
            return error_response(500, f'Save file error: {e}')
        job_config = request.args.to_dict() or request.form.to_dict()
        if "namespace" not in job_config or "table_name" not in job_config:
            # FATE versions above 1.5.1 support eggroll run parameters: the
            # whole config arrives as a single JSON-encoded key instead of
            # flat query/form fields
            job_config = json_loads(list(job_config.keys())[0])
        job_config['file'] = str(filename)
    else:
        job_config = request.json
    required_arguments = ['namespace', 'table_name']
    if access_module == 'upload':
        required_arguments.extend(['file', 'head', 'partition'])
    elif access_module == 'download':
        required_arguments.extend(['output_path'])
    elif access_module == 'writer':
        pass
    else:
        return error_response(400, f'Unsupported access module: {access_module}')
    detect_utils.check_config(job_config, required_arguments=required_arguments)
    data = {}
    # compatibility
    if "table_name" in job_config:
        job_config["name"] = job_config["table_name"]
    # normalize values that may arrive as strings from query/form fields
    for _ in ["head", "partition", "drop", "extend_sid", "auto_increasing_sid"]:
        if _ in job_config:
            if job_config[_] == "false":
                job_config[_] = False
            elif job_config[_] == "true":
                job_config[_] = True
            else:
                job_config[_] = int(job_config[_])
    if access_module == "upload":
        # a positive `drop` allows an existing table to be destroyed and re-created
        job_config["destroy"] = int(job_config.get('drop', 0)) > 0
        data['table_name'] = job_config["table_name"]
        data['namespace'] = job_config["namespace"]
        data_table_meta = storage.StorageTableMeta(name=job_config["table_name"], namespace=job_config["namespace"])
        if data_table_meta and not job_config["destroy"]:
            return get_json_result(retcode=100,
                                   retmsg='The data table already exists. '
                                          'If you still want to continue uploading, please add the parameter --drop')
    job_dsl, job_runtime_conf = gen_data_access_job_config(job_config, access_module)
    submit_result = DAGScheduler.submit(JobConfigurationBase(dsl=job_dsl, runtime_conf=job_runtime_conf), job_id=job_id)
    data.update(submit_result)
    return get_json_result(job_id=job_id, data=data)
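
# A minimal client-side sketch of calling the endpoint above. The host, port,
# URL prefix and file path are assumptions for a local standalone deployment;
# in practice the FATE client's `flow data upload` command wraps this call.
#
#   import requests
#
#   with open("/data/breast_hetero_guest.csv", "rb") as f:  # hypothetical file
#       resp = requests.post(
#           "http://127.0.0.1:9380/v1/data/upload",  # assumed fate_flow address
#           params={"namespace": "experiment", "table_name": "breast_hetero_guest",
#                   "head": 1, "partition": 4},
#           files={"file": f},
#       )
#   print(resp.json())  # retcode/retmsg, the job id and the submit result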


@manager.route('/upload/history', methods=['POST'])
def upload_history():
    request_data = request.json
    if request_data.get('job_id'):
        tasks = JobSaver.query_task(component_name='upload_0', status=StatusSet.SUCCESS, job_id=request_data.get('job_id'), run_on_this_party=True)
    else:
        tasks = JobSaver.query_task(component_name='upload_0', status=StatusSet.SUCCESS, run_on_this_party=True)
    limit = request_data.get('limit')
    if not limit:
        tasks = tasks[-1::-1]  # all tasks, newest first
    else:
        tasks = tasks[-1:-limit - 1:-1]  # only the `limit` newest tasks
    jobs_run_conf = job_utils.get_upload_job_configuration_summary(upload_tasks=tasks)
    data = get_upload_info(jobs_run_conf=jobs_run_conf)
    return get_json_result(retcode=0, retmsg='success', data=data)
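
# Example request to the history endpoint above (values illustrative; the /v1
# prefix is an assumption based on the standard fate_flow HTTP API):
#
#   POST /v1/data/upload/history
#   {"job_id": "202201010000000000000000", "limit": 5}
#
# `limit` caps how many of the most recent successful upload tasks are
# returned; omitting it returns all of them, newest first.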


def get_upload_info(jobs_run_conf):
    data = []
    for job_id, job_run_conf in jobs_run_conf.items():
        info = {}
        table_name = job_run_conf["name"]
        namespace = job_run_conf["namespace"]
        table_meta = storage.StorageTableMeta(name=table_name, namespace=namespace)
        if table_meta:
            partition = job_run_conf["partition"]
            info["upload_info"] = {
                "table_name": table_name,
                "namespace": namespace,
                "partition": partition,
                "upload_count": table_meta.get_count()
            }
            info["notes"] = job_run_conf["notes"]
            info["schema"] = table_meta.get_schema()
            data.append({job_id: info})
    return data
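
# Sketch of the list returned by get_upload_info, one entry per job id (values
# illustrative; the schema dict depends on what table_meta.get_schema() stores):
#
#   [
#       {"202201010000000000000000": {
#           "upload_info": {"table_name": "breast_hetero_guest",
#                           "namespace": "experiment",
#                           "partition": 4,
#                           "upload_count": 569},
#           "notes": "",
#           "schema": {...}}},
#   ]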


def gen_data_access_job_config(config_data, access_module):
    job_runtime_conf = {
        "initiator": {},
        "job_parameters": {"common": {}},
        "role": {},
        "component_parameters": {"role": {"local": {"0": {}}}}
    }
    initiator_role = "local"
    initiator_party_id = config_data.get('party_id', 0)
    job_runtime_conf["initiator"]["role"] = initiator_role
    job_runtime_conf["initiator"]["party_id"] = initiator_party_id
    job_parameters_fields = {"task_cores", "eggroll_run", "spark_run", "computing_engine", "storage_engine", "federation_engine"}
    for _ in job_parameters_fields:
        if _ in config_data:
            job_runtime_conf["job_parameters"]["common"][_] = config_data[_]
    # data access jobs always run on a single party
    job_runtime_conf["job_parameters"]["common"]["federated_mode"] = FederatedMode.SINGLE
    job_runtime_conf["role"][initiator_role] = [initiator_party_id]
    job_dsl = {
        "components": {}
    }
    if access_module == 'upload':
        parameters = {
            "head",
            "partition",
            "file",
            "namespace",
            "name",
            "id_delimiter",
            "storage_engine",
            "storage_address",
            "destroy",
            "extend_sid",
            "auto_increasing_sid",
            "block_size",
            "schema",
            "with_meta",
            "meta"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    if access_module == 'download':
        parameters = {
            "delimiter",
            "output_path",
            "namespace",
            "name"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    if access_module == 'writer':
        parameters = {
            "namespace",
            "name",
            "storage_engine",
            "address",
            "output_namespace",
            "output_name"
        }
        update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data)
    return job_dsl, job_runtime_conf
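
# For access_module == "upload", gen_data_access_job_config yields roughly the
# following pair (party id and component parameters illustrative;
# FederatedMode.SINGLE pins the whole job to this one party):
#
#   job_dsl = {"components": {"upload_0": {"module": "Upload"}}}
#   job_runtime_conf = {
#       "initiator": {"role": "local", "party_id": 0},
#       "job_parameters": {"common": {"federated_mode": FederatedMode.SINGLE}},
#       "role": {"local": [0]},
#       "component_parameters": {"role": {"local": {"0": {"upload_0": {
#           "file": "/.../fate_upload_tmp/...", "head": 1, "partition": 4,
#           "namespace": "experiment", "name": "breast_hetero_guest"}}}}},
#       "dsl_version": 2,
#   }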


def update_config(job_runtime_conf, job_dsl, initiator_role, parameters, access_module, config_data):
    # register the single data-access component (e.g. "upload_0") and copy over
    # whichever of its supported parameters were supplied
    component_name = f"{access_module}_0"
    job_runtime_conf["component_parameters"]["role"][initiator_role]["0"][component_name] = {}
    for p in parameters:
        if p in config_data:
            job_runtime_conf["component_parameters"]["role"][initiator_role]["0"][component_name][p] = config_data[p]
    job_runtime_conf['dsl_version'] = 2
    job_dsl["components"][component_name] = {
        "module": access_module.capitalize()
    }