job_submitter.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import tempfile
import time
from datetime import timedelta
from pathlib import Path

from flow_sdk.client import FlowClient
from pipeline.backend import config as conf
from pipeline.backend.config import JobStatus
from pipeline.backend.config import StatusCode
from pipeline.utils.logger import LOGGER


class JobInvoker(object):
    def __init__(self):
        self.client = FlowClient(ip=conf.PipelineConfig.IP, port=conf.PipelineConfig.PORT,
                                 version=conf.SERVER_VERSION,
                                 app_key=conf.PipelineConfig.APP_KEY, secret_key=conf.PipelineConfig.SECRET_KEY)
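
    # Illustrative usage sketch (not part of the original module); assumes a running
    # fate_flow server reachable via the configured PipelineConfig IP/port. The DSL/conf
    # variables, component name and party id below are hypothetical placeholders.
    #
    #     invoker = JobInvoker()
    #     job_id, data = invoker.submit_job(dsl=train_dsl, submit_conf=train_conf)
    #     invoker.monitor_job_status(job_id, role="guest", party_id=9999)
    #     summary = invoker.get_summary(job_id, "hetero_lr_0", role="guest", party_id=9999)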

    def submit_job(self, dsl=None, submit_conf=None, callback_func=None):
        LOGGER.debug(f"submit dsl is: \n {json.dumps(dsl, indent=4, ensure_ascii=False)}")
        LOGGER.debug(f"submit conf is: \n {json.dumps(submit_conf, indent=4, ensure_ascii=False)}")
        result = self.run_job_with_retry(self.client.job.submit, params=dict(config_data=submit_conf,
                                                                             dsl_data=dsl))
        # result = self.client.job.submit(config_data=submit_conf, dsl_data=dsl)
        if callback_func is not None:
            callback_func(result)
        try:
            if "retcode" not in result or result["retcode"] != 0:
                raise ValueError(f"retcode err, callback result is {result}")
            if "jobId" not in result:
                raise ValueError(f"jobId not in result: {result}")
            job_id = result["jobId"]
            data = result["data"]
        except ValueError:
            raise ValueError("job submit failed, err msg: {}".format(result))
        return job_id, data

    def upload_data(self, submit_conf=None, drop=0):
        result = self.client.data.upload(config_data=submit_conf, verbose=1, drop=drop)
        try:
            if "retcode" not in result or result["retcode"] != 0:
                raise ValueError
            if "jobId" not in result:
                raise ValueError
            job_id = result["jobId"]
            data = result["data"]
        except BaseException:
            raise ValueError("data upload failed, err msg: {}".format(result))
        return job_id, data
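
    # Illustrative upload sketch (not part of the original module); the exact keys of the
    # upload conf follow the fate_flow upload config format and are assumptions here.
    #
    #     upload_conf = {"file": "examples/data/breast_hetero_guest.csv",
    #                    "table_name": "breast_hetero_guest", "namespace": "experiment",
    #                    "head": 1, "partition": 4}
    #     job_id, data = invoker.upload_data(submit_conf=upload_conf, drop=1)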

    def monitor_job_status(self, job_id, role, party_id, previous_status=None):
        if previous_status in [StatusCode.SUCCESS, StatusCode.CANCELED]:
            if previous_status == StatusCode.SUCCESS:
                status = JobStatus.SUCCESS
            else:
                status = JobStatus.CANCELED
            raise ValueError(f"Previous fit status is {status}, please don't fit again")

        party_id = str(party_id)
        start_time = time.time()
        pre_cpn = None
        LOGGER.info(f"Job id is {job_id}\n")
        while True:
            ret_code, ret_msg, data = self.query_job(job_id, role, party_id)
            status = data["f_status"]
            if status == JobStatus.SUCCESS:
                elapse_seconds = timedelta(seconds=int(time.time() - start_time))
                LOGGER.info(f"Job succeeded! Job id is {job_id}")
                LOGGER.info(f"Total time: {elapse_seconds}")
                return StatusCode.SUCCESS
            elif status == JobStatus.FAILED:
                raise ValueError(f"Job failed, please check job {job_id} on FATE Board or with the fate_flow cli")
            elif status == JobStatus.WAITING:
                elapse_seconds = timedelta(seconds=int(time.time() - start_time))
                # the escape sequence moves the cursor up one line and clears it,
                # so the progress message is updated in place
                LOGGER.info(f"\x1b[80D\x1b[1A\x1b[KJob is still waiting, time elapse: {elapse_seconds}")
            elif status == JobStatus.CANCELED:
                elapse_seconds = timedelta(seconds=int(time.time() - start_time))
                LOGGER.info(f"Job is canceled, time elapse: {elapse_seconds}\r")
                return StatusCode.CANCELED
            elif status == JobStatus.TIMEOUT:
                elapse_seconds = timedelta(seconds=int(time.time() - start_time))
                raise ValueError(f"Job timed out, time elapse: {elapse_seconds}\r")
            elif status == JobStatus.RUNNING:
                ret_code, _, data = self.query_task(job_id=job_id, role=role, party_id=party_id,
                                                    status=JobStatus.RUNNING)
                if ret_code != 0 or len(data) == 0:
                    time.sleep(conf.TIME_QUERY_FREQS)
                    continue
                elapse_seconds = timedelta(seconds=int(time.time() - start_time))
                if len(data) == 1:
                    cpn = data[0]["f_component_name"]
                else:
                    cpn = []
                    for cpn_data in data:
                        cpn.append(cpn_data["f_component_name"])
                if cpn != pre_cpn:
                    LOGGER.info("\r")
                    pre_cpn = cpn
                LOGGER.info(f"\x1b[80D\x1b[1A\x1b[KRunning component {cpn}, time elapse: {elapse_seconds}")
            else:
                raise ValueError(f"Unknown status: {status}")

            time.sleep(conf.TIME_QUERY_FREQS)

    def query_job(self, job_id, role, party_id):
        party_id = str(party_id)
        result = self.run_job_with_retry(self.client.job.query, params=dict(job_id=job_id, role=role,
                                                                            party_id=party_id))
        # result = self.client.job.query(job_id=job_id, role=role, party_id=party_id)
        try:
            if "retcode" not in result or result["retcode"] != 0:
                raise ValueError("cannot query job")
            ret_code = result["retcode"]
            ret_msg = result["retmsg"]
            data = result["data"][0]
            return ret_code, ret_msg, data
        except ValueError:
            raise ValueError("query job result is {}, cannot parse useful info".format(result))

    def get_output_data_table(self, job_id, cpn_name, role, party_id):
        """
        Parameters
        ----------
        job_id: str
        cpn_name: str
        role: str
        party_id: int

        Returns
        -------
        dict
            single output example:
            {
                table_name: [],
                table_namespace: []
            }
            multiple output example:
            {
                train_data: {
                    table_name: [],
                    table_namespace: []
                },
                validate_data: {
                    table_name: [],
                    table_namespace: []
                },
                test_data: {
                    table_name: [],
                    table_namespace: []
                }
            }
        """
        party_id = str(party_id)
        result = self.client.component.output_data_table(job_id=job_id, role=role,
                                                         party_id=party_id, component_name=cpn_name)
        data = {}
        try:
            if "retcode" not in result or result["retcode"] != 0:
                raise ValueError(f"No retcode found in result or retcode is non-zero: {result}")
            if "data" not in result:
                raise ValueError(f"No data returned: {result}")
            all_data = result["data"]
            n = len(all_data)
            # single data table
            if n == 1:
                single_data = all_data[0]
                del single_data["data_name"]
                data = single_data
            # multiple data tables
            elif n > 1:
                for single_data in all_data:
                    data_name = single_data["data_name"]
                    del single_data["data_name"]
                    data[data_name] = single_data
            # no data table obtained
            else:
                LOGGER.info(f"No output data table found in {result}")
        except ValueError:
            raise ValueError(f"Get output data table failed, err msg: {result}")
        return data
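
    # Illustrative call (the component name and party id are hypothetical placeholders):
    #
    #     tables = invoker.get_output_data_table(job_id, "hetero_lr_0", role="guest", party_id=9999)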

    def query_task(self, job_id, role, party_id, status=None):
        party_id = str(party_id)
        result = self.client.task.query(job_id=job_id, role=role,
                                        party_id=party_id, status=status)
        try:
            if "retcode" not in result:
                raise ValueError("Cannot query task status of job {}".format(job_id))
            ret_code = result["retcode"]
            ret_msg = result["retmsg"]
            if ret_code != 0:
                data = None
            else:
                data = result["data"]
            return ret_code, ret_msg, data
        except ValueError:
            raise ValueError("Query task result is {}, cannot parse useful info".format(result))

    def get_output_data(self, job_id, cpn_name, role, party_id, limits=None, to_pandas=True):
        """
        Parameters
        ----------
        job_id: str
        cpn_name: str
        role: str
        party_id: int
        limits: int or None, default None. Maximum number of lines returned, including the header.
            If None, return all lines.
        to_pandas: bool, default True. Whether to convert output data to pandas.DataFrame.

        Returns
        -------
        single output example: pandas.DataFrame
        multiple output example:
            {
                train_data: train_data_df,
                validate_data: validate_data_df,
                test_data: test_data_df
            }
        """
        party_id = str(party_id)
        with tempfile.TemporaryDirectory() as job_dir:
            result = self.client.component.output_data(job_id=job_id, role=role, output_path=job_dir,
                                                       party_id=party_id, component_name=cpn_name)
            output_dir = result["directory"]
            n = 0
            data_files = []
            for file in os.listdir(output_dir):
                if file.endswith("csv"):
                    n += 1
                    data_files.append(file[:-4])

            data_dict = {}
            if n > 0:
                for data_name in data_files:
                    curr_data_dict = JobInvoker.create_data_meta_dict(data_name, output_dir, limits)
                    if curr_data_dict is not None:
                        if to_pandas:
                            data_dict[data_name] = self.to_pandas(curr_data_dict)
                        else:
                            data_dict[data_name] = curr_data_dict
            # no output data obtained
            else:
                raise ValueError(f"No output data found in directory {output_dir}")

            if len(data_dict) == 1:
                return list(data_dict.values())[0]
            return data_dict
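
    # Illustrative call returning a pandas.DataFrame for a single-output component
    # (the component name and party id are hypothetical placeholders):
    #
    #     df = invoker.get_output_data(job_id, "hetero_lr_0", role="guest", party_id=9999, limits=100)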

    @staticmethod
    def create_data_meta_dict(data_name, output_dir, limits):
        data_file = f"{data_name}.csv"
        meta_file = f"{data_name}.meta"
        output_data = os.path.join(output_dir, data_file)
        output_meta = os.path.join(output_dir, meta_file)
        if not Path(output_data).resolve().exists():
            return None
        data = JobInvoker.extract_output_data(output_data, limits)
        meta = JobInvoker.extract_output_meta(output_meta)
        data_dict = {"data": data, "meta": meta}
        return data_dict

    @staticmethod
    def to_pandas(data_dict):
        import pandas as pd

        data = data_dict["data"]
        meta = data_dict["meta"]
        if JobInvoker.is_normal_predict_task(meta):
            # skip the header line; predict_detail is a dict-like string that itself contains
            # commas, so rejoin its columns and parse it as JSON before building the frame
            rows = []
            for i in range(1, len(data)):
                cols = data[i].split(",", -1)
                predict_detail = json.loads(",".join(cols[len(meta) - 2: -1])[1:-1].replace("\'", "\""))
                value = cols[: len(meta) - 2] + [predict_detail] + cols[-1:]
                rows.append(value)
            return pd.DataFrame(rows, columns=meta)
        else:
            # skip the header line
            rows = []
            for i in range(1, len(data)):
                cols = data[i].split(",", -1)
                rows.append(cols)
            return pd.DataFrame(rows, columns=meta)

    @staticmethod
    def is_normal_predict_task(col_names):
        if len(col_names) <= 5:
            return False
        template_col_names = ["label", "predict_result", "predict_score", "predict_detail", "type"]
        for i in range(5):
            if template_col_names[i] != col_names[-5 + i]:
                return False
        return True
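
    # Example of a header treated as a "normal" predict output: the last five columns must be
    # exactly label, predict_result, predict_score, predict_detail, type, e.g.
    #     ["id", "label", "predict_result", "predict_score", "predict_detail", "type"]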

    @staticmethod
    def extract_output_data(output_data, limits):
        data = []
        with open(output_data, "r") as fin:
            for i, line in enumerate(fin):
                if i == limits:
                    break
                data.append(line.strip())
        return data

    @staticmethod
    def extract_output_meta(output_meta):
        with open(output_meta, "r") as fin:
            try:
                meta_dict = json.load(fin)
                meta = meta_dict["header"]
            except ValueError as e:
                raise ValueError(f"Cannot get output data meta. err msg: {e}")
        return meta

    def get_model_param(self, job_id, cpn_name, role, party_id):
        result = None
        party_id = str(party_id)
        try:
            result = self.client.component.output_model(job_id=job_id, role=role,
                                                        party_id=party_id, component_name=cpn_name)
            if "data" not in result:
                raise ValueError(f"{result['retmsg']} job {job_id}, component {cpn_name} has no output model param")
            return result["data"]
        except BaseException:
            raise ValueError(f"Cannot get output model, err msg: {result}")

    def get_metric(self, job_id, cpn_name, role, party_id):
        result = None
        party_id = str(party_id)
        try:
            result = self.client.component.metric_all(job_id=job_id, role=role,
                                                      party_id=party_id, component_name=cpn_name)
            if "data" not in result:
                raise ValueError(f"job {job_id}, component {cpn_name} has no output metric")
            return result["data"]
        except BaseException:
            raise ValueError(f"Cannot get output metric, err msg: {result}")
            # raise

    def get_summary(self, job_id, cpn_name, role, party_id):
        result = None
        party_id = str(party_id)
        try:
            result = self.client.component.get_summary(job_id=job_id, role=role,
                                                       party_id=party_id, component_name=cpn_name)
            if "data" not in result:
                raise ValueError(f"Job {job_id}, component {cpn_name} has no output summary")
            return result["data"]
        except BaseException:
            raise ValueError(f"Cannot get summary, err msg: {result}")

    def model_deploy(self, model_id, model_version, cpn_list=None, predict_dsl=None, components_checkpoint=None):
        if cpn_list:
            result = self.client.model.deploy(model_id=model_id, model_version=model_version, cpn_list=cpn_list)
        elif predict_dsl:
            result = self.client.model.deploy(model_id=model_id, model_version=model_version,
                                              predict_dsl=predict_dsl, components_checkpoint=components_checkpoint)
        else:
            result = self.client.model.deploy(model_id=model_id, model_version=model_version,
                                              components_checkpoint=components_checkpoint)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow deploy failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError(f"Cannot deploy components, error msg is {result['data']}")
        else:
            return result["data"]
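
    # Illustrative deploy call, selecting a subset of components for prediction
    # (the model id/version and component names are hypothetical placeholders):
    #
    #     predict_info = invoker.model_deploy(model_id=my_model_id, model_version=my_model_version,
    #                                         cpn_list=["reader_0", "data_transform_0", "hetero_lr_0"])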

    def get_predict_dsl(self, model_id, model_version):
        result = self.client.model.get_predict_dsl(model_id=model_id, model_version=model_version)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow get predict dsl failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError("Cannot get predict dsl, error msg is {}".format(result["retmsg"]))
        else:
            return result["data"]

    def load_model(self, load_conf):
        result = self.client.model.load(load_conf)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow load failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError("Cannot load model, error msg is {}".format(result["retmsg"]))
        else:
            return result["data"]

    def bind_model(self, bind_conf):
        result = self.client.model.bind(bind_conf)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow bind failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError("Cannot bind model, error msg is {}".format(result["retmsg"]))
        else:
            return result["retmsg"]

    def convert_homo_model(self, convert_conf):
        result = self.client.model.homo_convert(convert_conf)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow homo convert failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError("Cannot convert homo model, error msg is {}".format(result["retmsg"]))
        else:
            return result["data"]

    def bind_table(self, **kwargs):
        result = self.client.table.bind(**kwargs)
        if result is None or "retcode" not in result:
            raise ValueError("Call flow table bind failed, check if fate_flow server is up!")
        elif result["retcode"] != 0:
            raise ValueError(f"Cannot bind table, error msg is {result['retmsg']}")
        else:
            return result["data"]
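
    # run_job_with_retry below wraps a flow_sdk call and retries it while the server looks
    # temporarily unreachable; submit_job and query_job above use it, for example, as:
    #
    #     result = self.run_job_with_retry(self.client.job.query,
    #                                      params=dict(job_id=job_id, role=role, party_id=party_id))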

    @staticmethod
    def run_job_with_retry(api_func, params):
        for i in range(conf.MAX_RETRY + 1):
            try:
                result = api_func(**params)
                if result is None or "retmsg" not in result:
                    return result
                if i == conf.MAX_RETRY:
                    return result
                ret_msg = result["retmsg"]
                # only retry when the server appears unreachable;
                # any other response is returned to the caller as-is
                if "connection refused" in ret_msg.lower() \
                        or "max retries" in ret_msg.lower():
                    pass
                else:
                    return result
            except AttributeError:
                pass
            # linear back-off between retries
            time.sleep(conf.TIME_QUERY_FREQS * (i + 1))