job_tracker.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import operator
import typing

from fate_arch import session, storage
from fate_arch.abc import CTableABC
from fate_arch.common import EngineType
from fate_arch.common.base_utils import current_timestamp, deserialize_b64, serialize_b64
from fate_arch.common.data_utils import default_output_info
from fate_arch.storage import StorageEngine

from fate_flow.db.db_models import DB, ComponentSummary, TrackingOutputDataInfo
from fate_flow.db.db_utils import bulk_insert_into_db
from fate_flow.db.job_default_config import JobDefaultConfig
from fate_flow.entity import DataCache, Metric, MetricMeta, RunParameters
from fate_flow.manager.cache_manager import CacheManager
from fate_flow.manager.metric_manager import MetricManager
from fate_flow.pipelined_model.pipelined_model import PipelinedModel
from fate_flow.utils import job_utils, model_utils
from fate_flow.utils.log_utils import schedule_logger

class Tracker(object):
    """
    Tracker for Job/Task/Metric
    """
    METRIC_DATA_PARTITION = 48
    METRIC_LIST_PARTITION = 48
    JOB_VIEW_PARTITION = 8

    def __init__(self, job_id: str, role: str, party_id: int,
                 model_id: str = None,
                 model_version: str = None,
                 component_name: str = None,
                 component_module_name: str = None,
                 task_id: str = None,
                 task_version: int = None,
                 job_parameters: RunParameters = None
                 ):
        self.job_id = job_id
        self.job_parameters = job_parameters
        self.role = role
        self.party_id = party_id
        self.component_name = component_name if component_name else job_utils.PIPELINE_COMPONENT_NAME
        self.module_name = component_module_name if component_module_name else job_utils.PIPELINE_COMPONENT_MODULE_NAME
        self.task_id = task_id
        self.task_version = task_version
        self.model_id = model_id
        self.party_model_id = model_utils.gen_party_model_id(model_id=model_id, role=role, party_id=party_id)
        self.model_version = model_version
        self.pipelined_model = None
        if self.party_model_id and self.model_version:
            self.pipelined_model = PipelinedModel(self.party_model_id, self.model_version)
        self.metric_manager = MetricManager(job_id=self.job_id, role=self.role, party_id=self.party_id,
                                            component_name=self.component_name,
                                            task_id=self.task_id, task_version=self.task_version)
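
    # Typical construction, sketched (all identifiers below are illustrative,
    # not taken from a real job):
    #
    #   tracker = Tracker(job_id="202201011200000000000", role="guest", party_id=9999,
    #                     component_name="hetero_lr_0",
    #                     task_id="202201011200000000000_hetero_lr_0", task_version=0)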

    def save_metric_data(self, metric_namespace: str, metric_name: str, metrics: typing.List[Metric], job_level=False):
        schedule_logger(self.job_id).info(
            'save component {} on {} {} {} {} metric data'.format(self.component_name, self.role,
                                                                  self.party_id, metric_namespace, metric_name))
        kv = []
        for metric in metrics:
            kv.append((metric.key, metric.value))
        self.metric_manager.insert_metrics_into_db(metric_namespace, metric_name, 1, kv, job_level)
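
    # Sketch of saving a metric curve (a hedged example; it assumes the Metric
    # entity imported above accepts key/value keyword arguments, as it does in
    # read_metric_data below):
    #
    #   tracker.save_metric_data(metric_namespace="train", metric_name="loss",
    #                            metrics=[Metric(key=i, value=v)
    #                                     for i, v in enumerate([0.69, 0.52, 0.41])])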

    def get_job_metric_data(self, metric_namespace: str, metric_name: str):
        return self.read_metric_data(metric_namespace=metric_namespace, metric_name=metric_name, job_level=True)

    def get_metric_data(self, metric_namespace: str, metric_name: str):
        return self.read_metric_data(metric_namespace=metric_namespace, metric_name=metric_name, job_level=False)

    @DB.connection_context()
    def read_metric_data(self, metric_namespace: str, metric_name: str, job_level=False):
        metrics = []
        for k, v in self.metric_manager.read_metrics_from_db(metric_namespace, metric_name, 1, job_level):
            metrics.append(Metric(key=k, value=v))
        return metrics

    def save_metric_meta(self, metric_namespace: str, metric_name: str, metric_meta: MetricMeta,
                         job_level: bool = False):
        schedule_logger(self.job_id).info(
            'save component {} on {} {} {} {} metric meta'.format(self.component_name, self.role,
                                                                  self.party_id, metric_namespace, metric_name))
        self.metric_manager.insert_metrics_into_db(metric_namespace, metric_name, 0, metric_meta.to_dict().items(), job_level)

    @DB.connection_context()
    def get_metric_meta(self, metric_namespace: str, metric_name: str, job_level: bool = False):
        kv = dict()
        for k, v in self.metric_manager.read_metrics_from_db(metric_namespace, metric_name, 0, job_level):
            kv[k] = v
        return MetricMeta(name=kv.get('name'), metric_type=kv.get('metric_type'), extra_metas=kv)
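
    # Sketch of pairing a metric curve with its meta so consumers know how to
    # render it (the metric_type and extra_metas values here are illustrative):
    #
    #   tracker.save_metric_meta("train", "loss",
    #                            MetricMeta(name="loss", metric_type="LOSS",
    #                                       extra_metas={"unit_name": "iters"}))
    #   meta = tracker.get_metric_meta("train", "loss")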

    def log_job_view(self, view_data: dict):
        self.metric_manager.insert_metrics_into_db('job', 'job_view', 2, view_data.items(), job_level=True)

    @DB.connection_context()
    def get_job_view(self):
        view_data = {}
        for k, v in self.metric_manager.read_metrics_from_db('job', 'job_view', 2, job_level=True):
            view_data[k] = v
        return view_data

    def save_output_data(self, computing_table, output_storage_engine, output_storage_address=None,
                         output_table_namespace=None, output_table_name=None, schema=None, token=None, need_read=True):
        if computing_table:
            if not output_table_namespace or not output_table_name:
                output_table_namespace, output_table_name = default_output_info(
                    task_id=self.task_id, task_version=self.task_version, output_type="data")
            schedule_logger(self.job_id).info(
                'persisting the component output temporary table to {} {}'.format(output_table_namespace,
                                                                                  output_table_name))
            part_of_limit = JobDefaultConfig.output_data_summary_count_limit
            part_of_data = []
            if need_read:
                for k, v in computing_table.collect():
                    part_of_data.append((k, v))
                    part_of_limit -= 1
                    if part_of_limit == 0:
                        break
            session.Session.persistent(computing_table=computing_table,
                                       namespace=output_table_namespace,
                                       name=output_table_name,
                                       schema=schema,
                                       part_of_data=part_of_data,
                                       engine=output_storage_engine,
                                       engine_address=output_storage_address,
                                       token=token)
            return output_table_namespace, output_table_name
        else:
            schedule_logger(self.job_id).info('task id {} output data table is none'.format(self.task_id))
            return None, None
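
    # Hedged sketch of persisting a component's output table, letting the tracker
    # derive the namespace/name from the task (the engine choice is illustrative):
    #
    #   namespace, name = tracker.save_output_data(
    #       computing_table=table,
    #       output_storage_engine=StorageEngine.EGGROLL)
    #   tracker.log_output_data_info("train", namespace, name)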

    def save_table_meta(self, meta):
        schedule_logger(self.job_id).info(f'start save table meta:{meta}')
        address = storage.StorageTableMeta.create_address(storage_engine=meta.get("engine"),
                                                          address_dict=meta.get("address"))
        table_meta = storage.StorageTableMeta(name=meta.get("name"), namespace=meta.get("namespace"), new=True)
        # restore the fields that get_table_meta wire-encoded before applying them,
        # so set_metas receives the real address/schema objects
        meta["address"] = address
        meta["part_of_data"] = deserialize_b64(meta["part_of_data"])
        meta["schema"] = deserialize_b64(meta["schema"])
        table_meta.set_metas(**meta)
        table_meta.create()
        schedule_logger(self.job_id).info('save table meta success')

    def get_table_meta(self, table_info):
        schedule_logger(self.job_id).info(f'start get table meta:{table_info}')
        table_meta_dict = storage.StorageTableMeta(namespace=table_info.get("namespace"),
                                                   name=table_info.get("table_name"),
                                                   create_address=False).to_dict()
        schedule_logger(self.job_id).info(f'get table meta success: {table_meta_dict}')
        table_meta_dict["part_of_data"] = serialize_b64(table_meta_dict["part_of_data"], to_str=True)
        table_meta_dict["schema"] = serialize_b64(table_meta_dict["schema"], to_str=True)
        return table_meta_dict

    def get_output_data_table(self, output_data_infos, tracker_client=None):
        """
        Get component output data table, will run in the task executor process
        :param output_data_infos: output data info records to resolve
        :param tracker_client: optional client used to fetch table meta remotely
        :return: dict mapping data name to table meta
        """
        output_tables_meta = {}
        if output_data_infos:
            for output_data_info in output_data_infos:
                schedule_logger(self.job_id).info("get task {} {} output table {} {}".format(
                    output_data_info.f_task_id, output_data_info.f_task_version,
                    output_data_info.f_table_namespace, output_data_info.f_table_name))
                if not tracker_client:
                    data_table_meta = storage.StorageTableMeta(name=output_data_info.f_table_name,
                                                               namespace=output_data_info.f_table_namespace)
                else:
                    data_table_meta = tracker_client.get_table_meta(output_data_info.f_table_name,
                                                                    output_data_info.f_table_namespace)
                output_tables_meta[output_data_info.f_data_name] = data_table_meta
        return output_tables_meta
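
    # Usage sketch: resolve this component's recorded outputs and read their metas
    # locally, without a tracker client ("train" is an illustrative data name):
    #
    #   infos = tracker.read_output_data_info_from_db(data_name="train")
    #   tables_meta = tracker.get_output_data_table(infos)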

    def save_output_cache(self, cache_data: typing.Dict[str, CTableABC], cache_meta: dict, cache_name,
                          output_storage_engine, output_storage_address: dict, token=None):
        output_namespace, output_name = default_output_info(task_id=self.task_id, task_version=self.task_version,
                                                            output_type="cache")
        cache = CacheManager.persistent(cache_name, cache_data, cache_meta, output_namespace, output_name,
                                        output_storage_engine, output_storage_address, token=token)
        cache_key = self.tracking_output_cache(cache=cache, cache_name=cache_name)
        return cache_key

    def tracking_output_cache(self, cache: DataCache, cache_name: str) -> str:
        cache_key = CacheManager.record(cache=cache,
                                        job_id=self.job_id,
                                        role=self.role,
                                        party_id=self.party_id,
                                        component_name=self.component_name,
                                        task_id=self.task_id,
                                        task_version=self.task_version,
                                        cache_name=cache_name)
        schedule_logger(self.job_id).info(f"tracking {self.task_id} {self.task_version} output cache, cache key is {cache_key}")
        return cache_key
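
    # Cache round trip, sketched (cache_data maps names to computing tables; the
    # cache name and engine below are illustrative):
    #
    #   key = tracker.save_output_cache(cache_data={"host_0": table}, cache_meta={},
    #                                   cache_name="encrypted_gh",
    #                                   output_storage_engine=StorageEngine.EGGROLL,
    #                                   output_storage_address={})
    #   cache_data, cache_meta = tracker.get_output_cache(cache_key=key)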

    def get_output_cache(self, cache_key=None, cache_name=None):
        caches = self.query_output_cache(cache_key=cache_key, cache_name=cache_name)
        if caches:
            return CacheManager.load(cache=caches[0])
        else:
            return None, None

    def query_output_cache(self, cache_key=None, cache_name=None) -> typing.List[DataCache]:
        caches = CacheManager.query(job_id=self.job_id, role=self.role, party_id=self.party_id,
                                    component_name=self.component_name, cache_name=cache_name, cache_key=cache_key)
        group = {}
        # only the latest version of the task output is retrieved
        for cache in caches:
            group_key = f"{cache.task_id}-{cache.name}"
            if group_key not in group:
                group[group_key] = cache
            elif cache.task_version > group[group_key].task_version:
                group[group_key] = cache
        return list(group.values())

    def query_output_cache_record(self):
        return CacheManager.query_record(job_id=self.job_id, role=self.role, party_id=self.party_id,
                                         component_name=self.component_name, task_version=self.task_version)

    @DB.connection_context()
    def insert_summary_into_db(self, summary_data: dict, need_serialize=True):
        try:
            summary_model = self.get_dynamic_db_model(ComponentSummary, self.job_id)
            DB.create_tables([summary_model])
            summary_obj = summary_model.get_or_none(
                summary_model.f_job_id == self.job_id,
                summary_model.f_component_name == self.component_name,
                summary_model.f_role == self.role,
                summary_model.f_party_id == self.party_id,
                summary_model.f_task_id == self.task_id,
                summary_model.f_task_version == self.task_version
            )
            if summary_obj:
                summary_obj.f_summary = serialize_b64(summary_data, to_str=True) if need_serialize else summary_data
                summary_obj.f_update_time = current_timestamp()
                summary_obj.save()
            else:
                self.get_dynamic_db_model(ComponentSummary, self.job_id).create(
                    f_job_id=self.job_id,
                    f_component_name=self.component_name,
                    f_role=self.role,
                    f_party_id=self.party_id,
                    f_task_id=self.task_id,
                    f_task_version=self.task_version,
                    # honor need_serialize here as well, so reloading an
                    # already-serialized summary does not double-encode it
                    f_summary=serialize_b64(summary_data, to_str=True) if need_serialize else summary_data,
                    f_create_time=current_timestamp()
                )
        except Exception as e:
            schedule_logger(self.job_id).exception("An exception occurred while saving summary for job id: {} "
                                                   "component name: {} to database:\n{}".format(
                                                       self.job_id, self.component_name, e))

    @DB.connection_context()
    def read_summary_from_db(self, need_deserialize=True):
        cpn_summary = ""
        try:
            summary_model = self.get_dynamic_db_model(ComponentSummary, self.job_id)
            summary = summary_model.get_or_none(
                summary_model.f_job_id == self.job_id,
                summary_model.f_component_name == self.component_name,
                summary_model.f_role == self.role,
                summary_model.f_party_id == self.party_id
            )
            if summary:
                cpn_summary = deserialize_b64(summary.f_summary) if need_deserialize else summary.f_summary
        except Exception as e:
            schedule_logger(self.job_id).exception(e)
        return cpn_summary
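
    # Summary round trip, sketched (the payload is illustrative; values are stored
    # base64-serialized in the job's ComponentSummary shard):
    #
    #   tracker.insert_summary_into_db({"loss": 0.41, "iters": 30})
    #   tracker.read_summary_from_db()   # -> {"loss": 0.41, "iters": 30}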

    @DB.connection_context()
    def reload_summary(self, source_tracker):
        cpn_summary = source_tracker.read_summary_from_db(need_deserialize=False)
        if cpn_summary:
            self.insert_summary_into_db(cpn_summary, need_serialize=False)

    def log_output_data_info(self, data_name: str, table_namespace: str, table_name: str):
        self.insert_output_data_info_into_db(data_name=data_name, table_namespace=table_namespace, table_name=table_name)

    @DB.connection_context()
    def insert_output_data_info_into_db(self, data_name: str, table_namespace: str, table_name: str):
        try:
            tracking_output_data_info = self.get_dynamic_db_model(TrackingOutputDataInfo, self.job_id)()
            tracking_output_data_info.f_job_id = self.job_id
            tracking_output_data_info.f_component_name = self.component_name
            tracking_output_data_info.f_task_id = self.task_id
            tracking_output_data_info.f_task_version = self.task_version
            tracking_output_data_info.f_data_name = data_name
            tracking_output_data_info.f_role = self.role
            tracking_output_data_info.f_party_id = self.party_id
            tracking_output_data_info.f_table_namespace = table_namespace
            tracking_output_data_info.f_table_name = table_name
            tracking_output_data_info.f_create_time = current_timestamp()
            bulk_insert_into_db(
                self.get_dynamic_db_model(TrackingOutputDataInfo, self.job_id),
                (tracking_output_data_info.to_dict(),),
            )
        except Exception as e:
            schedule_logger(self.job_id).exception(
                "An exception occurred while inserting output data info {} {} {} into database:\n{}".format(
                    data_name, table_namespace, table_name, e))

    def save_as_table(self, computing_table, name, namespace):
        if self.job_parameters.storage_engine == StorageEngine.LINKIS_HIVE:
            return
        self.save_output_data(computing_table=computing_table,
                              output_storage_engine=self.job_parameters.storage_engine,
                              output_storage_address=self.job_parameters.engines_address.get(EngineType.STORAGE, {}),
                              output_table_namespace=namespace, output_table_name=name)

    @DB.connection_context()
    def clean_metrics(self):
        return self.metric_manager.clean_metrics()

    @DB.connection_context()
    def get_metric_list(self, job_level: bool = False):
        return self.metric_manager.get_metric_list(job_level=job_level)

    @DB.connection_context()
    def reload_metric(self, source_tracker):
        return self.metric_manager.reload_metric(source_tracker.metric_manager)

    def get_output_data_info(self, data_name=None):
        return self.read_output_data_info_from_db(data_name=data_name)

    def read_output_data_info_from_db(self, data_name=None):
        filter_dict = {
            "job_id": self.job_id,
            "component_name": self.component_name,
            "role": self.role,
            "party_id": self.party_id,
        }
        if data_name:
            filter_dict["data_name"] = data_name
        return self.query_output_data_infos(**filter_dict)

    @classmethod
    @DB.connection_context()
    def query_output_data_infos(cls, **kwargs) -> typing.List[TrackingOutputDataInfo]:
        try:
            tracking_output_data_info_model = cls.get_dynamic_db_model(TrackingOutputDataInfo, kwargs.get("job_id"))
            filters = []
            for f_n, f_v in kwargs.items():
                attr_name = 'f_%s' % f_n
                if hasattr(tracking_output_data_info_model, attr_name):
                    filters.append(operator.attrgetter(attr_name)(tracking_output_data_info_model) == f_v)
            if filters:
                output_data_infos_tmp = tracking_output_data_info_model.select().where(*filters)
            else:
                output_data_infos_tmp = tracking_output_data_info_model.select()
            output_data_infos_group = {}
            # only the latest version of the task output data is retrieved
            for output_data_info in output_data_infos_tmp:
                group_key = cls.get_output_data_group_key(output_data_info.f_task_id, output_data_info.f_data_name)
                if group_key not in output_data_infos_group:
                    output_data_infos_group[group_key] = output_data_info
                elif output_data_info.f_task_version > output_data_infos_group[group_key].f_task_version:
                    output_data_infos_group[group_key] = output_data_info
            return list(output_data_infos_group.values())
        except Exception:
            # any query error yields an empty result rather than propagating
            return []

    @classmethod
    def get_output_data_group_key(cls, task_id, data_name):
        return task_id + data_name
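
    # e.g. get_output_data_group_key("202201011200000000000_hetero_lr_0", "train")
    # returns "202201011200000000000_hetero_lr_0train" (plain concatenation; the
    # task id here is illustrative). The key only serves to deduplicate task
    # versions in query_output_data_infos above.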

    def clean_task(self):
        schedule_logger(self.job_id).info(
            'clean task {} {} on {} {}'.format(self.task_id, self.task_version, self.role, self.party_id))
        session_id = job_utils.generate_session_id(self.task_id, self.task_version, self.role, self.party_id)
        sess = session.Session(session_id=session_id, options={"logger": schedule_logger(self.job_id)})
        sess.destroy_all_sessions()
        return True

    @classmethod
    def get_dynamic_db_model(cls, base, job_id):
        return type(base.model(table_index=cls.get_dynamic_tracking_table_index(job_id=job_id)))

    @classmethod
    def get_dynamic_tracking_table_index(cls, job_id):
        return job_id[:8]
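
    # Tracking tables are sharded by the first 8 characters of the job id; FATE job
    # ids typically begin with a creation timestamp, so a job id starting with
    # "20220101" keeps all of that day's tracking records in the same table shard.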