job.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. import json
  18. from datetime import datetime
  19. import click
  20. import requests
  21. from contextlib import closing
  22. from flow_client.flow_cli.utils import cli_args
  23. from flow_client.flow_cli.utils.cli_utils import (preprocess, download_from_request, access_server,
  24. prettify, check_abs_path)
@click.group(short_help="Job Operations")
@click.pass_context
def job(ctx):
    """
    \b
    Provides numbers of job operational commands, including submit, stop, query, etc.
    For more details, please check out the help text.
    """
    # Group entry point only; each subcommand below does the actual work.
    pass
  34. @job.command("submit", short_help="Submit Job Command")
  35. @cli_args.CONF_PATH
  36. @cli_args.DSL_PATH
  37. @click.pass_context
  38. def submit(ctx, **kwargs):
  39. """
  40. - DESCRIPTION:
  41. \b
  42. Submit a pipeline job.
  43. Used to be 'submit_job'.
  44. \b
  45. - USAGE:
  46. flow job submit -c fate_flow/examples/test_hetero_lr_job_conf.json -d fate_flow/examples/test_hetero_lr_job_dsl.json
  47. """
  48. config_data, dsl_data = preprocess(**kwargs)
  49. post_data = {
  50. 'job_dsl': dsl_data,
  51. 'job_runtime_conf': config_data
  52. }
  53. access_server('post', ctx, 'job/submit', post_data)
  54. @job.command("list", short_help="List Job Command")
  55. @cli_args.LIMIT
  56. @click.pass_context
  57. def list_job(ctx, **kwargs):
  58. """
  59. - DESCRIPTION:
  60. List job.
  61. \b
  62. - USAGE:
  63. flow job list
  64. flow job list -l 30
  65. """
  66. config_data, dsl_data = preprocess(**kwargs)
  67. access_server('post', ctx, 'job/list/job', config_data)
  68. @job.command("query", short_help="Query Job Command")
  69. @cli_args.JOBID
  70. @cli_args.ROLE
  71. @cli_args.PARTYID
  72. @cli_args.STATUS
  73. @click.pass_context
  74. def query(ctx, **kwargs):
  75. """
  76. \b
  77. - DESCRIPTION:
  78. Query job information by filters.
  79. Used to be 'query_job'.
  80. \b
  81. - USAGE:
  82. flow job query -r guest -p 9999 -s success
  83. flow job query -j $JOB_ID -cpn hetero_feature_binning_0
  84. """
  85. config_data, dsl_data = preprocess(**kwargs)
  86. response = access_server('post', ctx, "job/query", config_data, False)
  87. if isinstance(response, requests.models.Response):
  88. response = response.json()
  89. if response['retcode'] == 0:
  90. for i in range(len(response['data'])):
  91. del response['data'][i]['f_runtime_conf']
  92. del response['data'][i]['f_dsl']
  93. prettify(response)
  94. # @job.command("clean", short_help="Clean Job Command")
  95. @cli_args.JOBID_REQUIRED
  96. @cli_args.ROLE
  97. @cli_args.PARTYID
  98. @cli_args.COMPONENT_NAME
  99. @click.pass_context
  100. def clean(ctx, **kwargs):
  101. """
  102. \b
  103. - DESCRIPTION:
  104. Clean processor, data table and metric data.
  105. Used to be 'clean_job'.
  106. \b
  107. - USAGE:
  108. flow job clean -j $JOB_ID -cpn hetero_feature_binning_0
  109. """
  110. config_data, dsl_data = preprocess(**kwargs)
  111. access_server('post', ctx, "job/clean", config_data)
  112. @job.command("stop", short_help="Stop Job Command")
  113. @cli_args.JOBID_REQUIRED
  114. @click.pass_context
  115. def stop(ctx, **kwargs):
  116. """
  117. \b
  118. - DESCRIPTION:
  119. Stop a specified job.
  120. \b
  121. - USAGE:
  122. flow job stop -j $JOB_ID
  123. """
  124. config_data, dsl_data = preprocess(**kwargs)
  125. access_server('post', ctx, "job/stop", config_data)
@job.command("rerun", short_help="Rerun Job Command")
@cli_args.JOBID_REQUIRED
@cli_args.FORCE
@cli_args.COMPONENT_NAME
@click.pass_context
def rerun(ctx, **kwargs):
    """
    \b
    - DESCRIPTION:
        Rerun a specified job. Accepts a force flag and a component name
        (see the options above); their exact server-side semantics are
        handled by the job/rerun endpoint.
    \b
    - USAGE:
        flow job rerun -j $JOB_ID
    """
    config_data, dsl_data = preprocess(**kwargs)
    access_server('post', ctx, "job/rerun", config_data)
  134. @job.command("config", short_help="Config Job Command")
  135. @cli_args.JOBID_REQUIRED
  136. @cli_args.ROLE_REQUIRED
  137. @cli_args.PARTYID_REQUIRED
  138. @cli_args.OUTPUT_PATH_REQUIRED
  139. @click.pass_context
  140. def config(ctx, **kwargs):
  141. """
  142. \b
  143. - DESCRIPTION:
  144. Download Configurations of A Specified Job.
  145. \b
  146. - USAGE:
  147. flow job config -j $JOB_ID -r host -p 10000 --output-path ./examples/
  148. """
  149. config_data, dsl_data = preprocess(**kwargs)
  150. response = access_server('post', ctx, 'job/config', config_data, False)
  151. if isinstance(response, requests.models.Response):
  152. response = response.json()
  153. if response['retcode'] == 0:
  154. job_id = response['data']['job_id']
  155. download_directory = os.path.join(os.path.abspath(config_data['output_path']), 'job_{}_config'.format(job_id))
  156. os.makedirs(download_directory, exist_ok=True)
  157. for k, v in response['data'].items():
  158. if k == 'job_id':
  159. continue
  160. with open('{}/{}.json'.format(download_directory, k), 'w') as fw:
  161. json.dump(v, fw, indent=4)
  162. del response['data']['dsl']
  163. del response['data']['runtime_conf']
  164. response['directory'] = download_directory
  165. response['retmsg'] = 'download successfully, please check {} directory'.format(download_directory)
  166. prettify(response)
  167. @job.command("log", short_help="Log Job Command")
  168. @cli_args.JOBID_REQUIRED
  169. @cli_args.OUTPUT_PATH_REQUIRED
  170. @click.pass_context
  171. def log(ctx, **kwargs):
  172. """
  173. \b
  174. - DESCRIPTION:
  175. Download Log Files of A Specified Job.
  176. \b
  177. - USAGE:
  178. flow job log -j JOB_ID --output-path ./examples/
  179. """
  180. config_data, dsl_data = preprocess(**kwargs)
  181. job_id = config_data['job_id']
  182. tar_file_name = 'job_{}_log.tar.gz'.format(job_id)
  183. extract_dir = os.path.join(config_data['output_path'], 'job_{}_log'.format(job_id))
  184. with closing(access_server('post', ctx, 'job/log/download', config_data, False, stream=True)) as response:
  185. if response.status_code == 200:
  186. download_from_request(http_response=response, tar_file_name=tar_file_name, extract_dir=extract_dir)
  187. res = {'retcode': 0,
  188. 'directory': extract_dir,
  189. 'retmsg': 'download successfully, please check {} directory'.format(extract_dir)}
  190. else:
  191. res = response.json() if isinstance(response, requests.models.Response) else response
  192. prettify(res)
  193. @job.command("view", short_help="Query Job Data View Command")
  194. @cli_args.JOBID
  195. @cli_args.ROLE
  196. @cli_args.PARTYID
  197. @cli_args.STATUS
  198. @click.pass_context
  199. def view(ctx, **kwargs):
  200. """
  201. \b
  202. - DESCRIPTION:
  203. Query job data view information by filters.
  204. Used to be 'data_view_query'.
  205. \b
  206. - USAGE:
  207. flow job view -r guest -p 9999
  208. flow job view -j $JOB_ID -cpn hetero_feature_binning_0
  209. """
  210. config_data, dsl_data = preprocess(**kwargs)
  211. access_server('post', ctx, 'job/data/view/query', config_data)
  212. @job.command("dsl", short_help="Generate Predict DSL Command")
  213. @click.option("--cpn-list", type=click.STRING,
  214. help="User inputs a string to specify component list")
  215. @click.option("--cpn-path", type=click.Path(exists=True),
  216. help="User specifies a file path which records the component list.")
  217. @click.option("--train-dsl-path", type=click.Path(exists=True), required=True,
  218. help="User specifies the train dsl file path.")
  219. @cli_args.OUTPUT_PATH
  220. @click.pass_context
  221. def dsl_generator(ctx, **kwargs):
  222. """
  223. \b
  224. - DESCRIPTION:
  225. A predict dsl generator.
  226. Before using predict dsl generator, users should prepare:
  227. 1. name list of component which you are going to use in predict progress,
  228. 2. the train dsl file path you specified in train progress.
  229. \b
  230. Notice that users can choose to specify the component name list by using a text file,
  231. or, by typing in terminal. We, however, strongly recommend users using prepared files
  232. to specify the component list in order to avoid some unnecessary mistakes.
  233. \b
  234. - USAGE:
  235. flow job dsl --cpn-path fate_flow/examples/component_list.txt --train-dsl-path fate_flow/examples/test_hetero_lr_job_dsl.json -o fate_flow/examples/
  236. flow job dsl --cpn-list "dataio_0, hetero_feature_binning_0, hetero_feature_selection_0, evaluation_0" --train-dsl-path fate_flow/examples/test_hetero_lr_job_dsl.json -o fate_flow/examples/
  237. flow job dsl --cpn-list [dataio_0,hetero_feature_binning_0,hetero_feature_selection_0,evaluation_0] --train-dsl-path fate_flow/examples/test_hetero_lr_job_dsl.json -o fate_flow/examples/
  238. """
  239. if kwargs.get("cpn_list"):
  240. cpn_str = kwargs.get("cpn_list")
  241. elif kwargs.get("cpn_path"):
  242. with open(kwargs.get("cpn_path"), "r") as fp:
  243. cpn_str = fp.read()
  244. else:
  245. cpn_str = ""
  246. with open(kwargs.get("train_dsl_path"), "r") as ft:
  247. train_dsl = ft.read()
  248. config_data = {
  249. "cpn_str": cpn_str,
  250. "train_dsl": train_dsl,
  251. "version": "2",
  252. }
  253. if kwargs.get("output_path"):
  254. dsl_filename = "predict_dsl_{}.json".format(datetime.now().strftime('%Y%m%d%H%M%S'))
  255. output_path = os.path.join(check_abs_path(kwargs.get("output_path")), dsl_filename)
  256. config_data["filename"] = dsl_filename
  257. with closing(access_server('post', ctx, 'job/dsl/generate', config_data, False, stream=True)) as response:
  258. if response.status_code == 200:
  259. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  260. with open(output_path, "wb") as fw:
  261. for chunk in response.iter_content(1024):
  262. if chunk:
  263. fw.write(chunk)
  264. res = {'retcode': 0,
  265. 'retmsg': "New predict dsl file has been generated successfully. "
  266. "File path is: {}".format(output_path)}
  267. else:
  268. try:
  269. res = response.json() if isinstance(response, requests.models.Response) else response
  270. except Exception:
  271. res = {'retcode': 100,
  272. 'retmsg': "New predict dsl file generated failed."
  273. "For more details, please check logs/fate_flow/fate_flow_stat.log"}
  274. prettify(res)
  275. else:
  276. access_server('post', ctx, 'job/dsl/generate', config_data)
@job.command("parameter-update", short_help="Update Job Components Parameters Command")
@cli_args.JOBID_REQUIRED
@cli_args.CONF_PATH
@click.pass_context
def update_parameter(ctx, **kwargs):
    """
    \b
    - DESCRIPTION:
        Update component parameters of a specified job using a new job conf
        file (exact merge semantics are handled by the job/parameter/update
        endpoint on the server side).
    \b
    - USAGE:
        flow job parameter-update -j $JOB_ID -c new_job_conf.json
    """
    config_data, dsl_data = preprocess(**kwargs)
    access_server('post', ctx, 'job/parameter/update', config_data)