component_app.py 9.9 KB


  1. #
  2. # Copyright 2021 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from flask import request
  17. from fate_arch.common.file_utils import get_federatedml_setting_conf_directory
  18. from fate_flow.component_env_utils.env_utils import get_class_object
  19. from fate_flow.db.component_registry import ComponentRegistry
  20. from fate_flow.db.db_models import PipelineComponentMeta
  21. from fate_flow.model.sync_model import SyncComponent
  22. from fate_flow.pipelined_model.pipelined_model import PipelinedModel
  23. from fate_flow.settings import ENABLE_MODEL_STORE
  24. from fate_flow.utils.api_utils import error_response, get_json_result, validate_request
  25. from fate_flow.utils.detect_utils import check_config
  26. from fate_flow.utils.job_utils import generate_job_id
  27. from fate_flow.utils.model_utils import gen_party_model_id
  28. from fate_flow.utils.schedule_utils import get_dsl_parser_by_version
  29. @manager.route('/get', methods=['POST'])
  30. def get_components():
  31. return get_json_result(data=ComponentRegistry.get_components())
  32. @manager.route('/<component_name>/get', methods=['POST'])
  33. def get_component(component_name):
  34. return get_json_result(data=ComponentRegistry.get_components().get(component_name))
  35. @manager.route('/validate', methods=['POST'])
  36. def validate_component_param():
  37. if not request.json or not isinstance(request.json, dict):
  38. return error_response(400)
  39. required_keys = [
  40. 'component_name',
  41. 'component_module_name',
  42. ]
  43. config_keys = ['role']
  44. dsl_version = int(request.json.get('dsl_version', 0))
  45. parser_class = get_dsl_parser_by_version(dsl_version)
  46. if dsl_version == 1:
  47. config_keys += ['role_parameters', 'algorithm_parameters']
  48. elif dsl_version == 2:
  49. config_keys += ['component_parameters']
  50. else:
  51. return error_response(400, 'unsupported dsl_version')
  52. try:
  53. check_config(request.json, required_keys + config_keys)
  54. except Exception as e:
  55. return error_response(400, str(e))
  56. try:
  57. parser_class.validate_component_param(
  58. get_federatedml_setting_conf_directory(),
  59. {i: request.json[i] for i in config_keys},
  60. *[request.json[i] for i in required_keys])
  61. except Exception as e:
  62. return error_response(400, str(e))
  63. return get_json_result()
  64. @manager.route('/hetero/merge', methods=['POST'])
  65. @validate_request(
  66. 'model_id', 'model_version', 'guest_party_id', 'host_party_ids',
  67. 'component_name', 'model_type', 'output_format',
  68. )
  69. def hetero_model_merge():
  70. request_data = request.json
  71. if ENABLE_MODEL_STORE:
  72. sync_component = SyncComponent(
  73. role='guest',
  74. party_id=request_data['guest_party_id'],
  75. model_id=request_data['model_id'],
  76. model_version=request_data['model_version'],
  77. component_name=request_data['component_name'],
  78. )
  79. if not sync_component.local_exists() and sync_component.remote_exists():
  80. sync_component.download()
  81. for party_id in request_data['host_party_ids']:
  82. sync_component = SyncComponent(
  83. role='host',
  84. party_id=party_id,
  85. model_id=request_data['model_id'],
  86. model_version=request_data['model_version'],
  87. component_name=request_data['component_name'],
  88. )
  89. if not sync_component.local_exists() and sync_component.remote_exists():
  90. sync_component.download()
  91. model = PipelinedModel(
  92. gen_party_model_id(
  93. request_data['model_id'],
  94. 'guest',
  95. request_data['guest_party_id'],
  96. ),
  97. request_data['model_version'],
  98. ).read_component_model(
  99. request_data['component_name'],
  100. output_json=True,
  101. )
  102. guest_param = None
  103. guest_meta = None
  104. for k, v in model.items():
  105. if k.endswith('Param'):
  106. guest_param = v
  107. elif k.endswith('Meta'):
  108. guest_meta = v
  109. else:
  110. return error_response(400, f'Unknown guest model key: "{k}".')
  111. if guest_param is None or guest_meta is None:
  112. return error_response(400, 'Invalid guest model.')
  113. host_params = []
  114. host_metas = []
  115. for party_id in request_data['host_party_ids']:
  116. model = PipelinedModel(
  117. gen_party_model_id(
  118. request_data['model_id'],
  119. 'host',
  120. party_id,
  121. ),
  122. request_data['model_version'],
  123. ).read_component_model(
  124. request_data['component_name'],
  125. output_json=True,
  126. )
  127. for k, v in model.items():
  128. if k.endswith('Param'):
  129. host_params.append(v)
  130. elif k.endswith('Meta'):
  131. host_metas.append(v)
  132. else:
  133. return error_response(400, f'Unknown host model key: "{k}".')
  134. if not host_params or not host_metas or len(host_params) != len(host_metas):
  135. return error_response(400, 'Invalid host models.')
  136. data = get_class_object('hetero_model_merge')(
  137. guest_param, guest_meta,
  138. host_params, host_metas,
  139. request_data['model_type'],
  140. request_data['output_format'],
  141. request_data.get('target_name', 'y'),
  142. request_data.get('host_rename', False),
  143. request_data.get('include_guest_coef', False),
  144. )
  145. return get_json_result(data=data)
  146. @manager.route('/woe_array/extract', methods=['POST'])
  147. @validate_request(
  148. 'model_id', 'model_version', 'role', 'party_id', 'component_name',
  149. )
  150. def woe_array_extract():
  151. if request.json['role'] != 'guest':
  152. return error_response(400, 'Only support guest role.')
  153. if ENABLE_MODEL_STORE:
  154. sync_component = SyncComponent(
  155. role=request.json['role'],
  156. party_id=request.json['party_id'],
  157. model_id=request.json['model_id'],
  158. model_version=request.json['model_version'],
  159. component_name=request.json['component_name'],
  160. )
  161. if not sync_component.local_exists() and sync_component.remote_exists():
  162. sync_component.download()
  163. model = PipelinedModel(
  164. gen_party_model_id(
  165. request.json['model_id'],
  166. request.json['role'],
  167. request.json['party_id'],
  168. ),
  169. request.json['model_version'],
  170. ).read_component_model(
  171. request.json['component_name'],
  172. output_json=True,
  173. )
  174. param = None
  175. meta = None
  176. for k, v in model.items():
  177. if k.endswith('Param'):
  178. param = v
  179. elif k.endswith('Meta'):
  180. meta = v
  181. else:
  182. return error_response(400, f'Unknown model key: "{k}".')
  183. if param is None or meta is None:
  184. return error_response(400, 'Invalid model.')
  185. data = get_class_object('extract_woe_array_dict')(param)
  186. return get_json_result(data=data)
  187. @manager.route('/woe_array/merge', methods=['POST'])
  188. @validate_request(
  189. 'model_id', 'model_version', 'role', 'party_id', 'component_name', 'woe_array',
  190. )
  191. def woe_array_merge():
  192. if request.json['role'] != 'host':
  193. return error_response(400, 'Only support host role.')
  194. pipelined_model = PipelinedModel(
  195. gen_party_model_id(
  196. request.json['model_id'],
  197. request.json['role'],
  198. request.json['party_id'],
  199. ),
  200. request.json['model_version'],
  201. )
  202. query = pipelined_model.pipelined_component.get_define_meta_from_db(
  203. PipelineComponentMeta.f_component_name == request.json['component_name'],
  204. )
  205. if not query:
  206. return error_response(404, 'Component not found.')
  207. query = query[0]
  208. if ENABLE_MODEL_STORE:
  209. sync_component = SyncComponent(
  210. role=query.f_role,
  211. party_id=query.f_party_id,
  212. model_id=query.f_model_id,
  213. model_version=query.f_model_version,
  214. component_name=query.f_component_name,
  215. )
  216. if not sync_component.local_exists() and sync_component.remote_exists():
  217. sync_component.download()
  218. model = pipelined_model._read_component_model(
  219. query.f_component_name,
  220. query.f_model_alias,
  221. )
  222. for model_name, (
  223. buffer_name,
  224. buffer_string,
  225. buffer_dict,
  226. ) in model.items():
  227. if model_name.endswith('Param'):
  228. string_merged, dict_merged = get_class_object('merge_woe_array_dict')(
  229. buffer_name,
  230. buffer_string,
  231. buffer_dict,
  232. request.json['woe_array'],
  233. )
  234. model[model_name] = (
  235. buffer_name,
  236. string_merged,
  237. dict_merged,
  238. )
  239. break
  240. pipelined_model = PipelinedModel(
  241. pipelined_model.party_model_id,
  242. generate_job_id()
  243. )
  244. pipelined_model.save_component_model(
  245. query.f_component_name,
  246. query.f_component_module_name,
  247. query.f_model_alias,
  248. model,
  249. query.f_run_parameters,
  250. )
  251. if ENABLE_MODEL_STORE:
  252. sync_component = SyncComponent(
  253. role=query.f_role,
  254. party_id=query.f_party_id,
  255. model_id=query.f_model_id,
  256. model_version=pipelined_model.model_version,
  257. component_name=query.f_component_name,
  258. )
  259. sync_component.upload()
  260. return get_json_result(data={
  261. 'role': query.f_role,
  262. 'party_id': query.f_party_id,
  263. 'model_id': query.f_model_id,
  264. 'model_version': pipelined_model.model_version,
  265. 'component_name': query.f_component_name,
  266. })