#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import operator
import typing

from fate_arch import session, storage
from fate_arch.abc import CTableABC
from fate_arch.common import EngineType
from fate_arch.common.base_utils import current_timestamp, deserialize_b64, serialize_b64
from fate_arch.common.data_utils import default_output_info
from fate_arch.storage import StorageEngine

from fate_flow.db.db_models import DB, ComponentSummary, TrackingOutputDataInfo
from fate_flow.db.db_utils import bulk_insert_into_db
from fate_flow.db.job_default_config import JobDefaultConfig
from fate_flow.entity import DataCache, Metric, MetricMeta, RunParameters
from fate_flow.manager.cache_manager import CacheManager
from fate_flow.manager.metric_manager import MetricManager
from fate_flow.pipelined_model.pipelined_model import PipelinedModel
from fate_flow.utils import job_utils, model_utils
from fate_flow.utils.log_utils import schedule_logger


class Tracker(object):
    """
    Tracker for Job/Task/Metric
    """
    METRIC_DATA_PARTITION = 48
    METRIC_LIST_PARTITION = 48
    JOB_VIEW_PARTITION = 8

    def __init__(self, job_id: str, role: str, party_id: int,
                 model_id: str = None,
                 model_version: str = None,
                 component_name: str = None,
                 component_module_name: str = None,
                 task_id: str = None,
                 task_version: int = None,
                 job_parameters: RunParameters = None
                 ):
        self.job_id = job_id
        self.job_parameters = job_parameters
        self.role = role
        self.party_id = party_id
        self.component_name = component_name if component_name else job_utils.PIPELINE_COMPONENT_NAME
        self.module_name = component_module_name if component_module_name else job_utils.PIPELINE_COMPONENT_MODULE_NAME
        self.task_id = task_id
        self.task_version = task_version
        self.model_id = model_id
        self.party_model_id = model_utils.gen_party_model_id(model_id=model_id, role=role, party_id=party_id)
        self.model_version = model_version
        self.pipelined_model = None
        if self.party_model_id and self.model_version:
            self.pipelined_model = PipelinedModel(self.party_model_id, self.model_version)
        self.metric_manager = MetricManager(job_id=self.job_id, role=self.role, party_id=self.party_id,
                                            component_name=self.component_name,
                                            task_id=self.task_id, task_version=self.task_version)

    def save_metric_data(self, metric_namespace: str, metric_name: str, metrics: typing.List[Metric], job_level=False):
        schedule_logger(self.job_id).info(
            'save component {} on {} {} {} {} metric data'.format(self.component_name, self.role,
                                                                   self.party_id, metric_namespace, metric_name))
        kv = []
        for metric in metrics:
            kv.append((metric.key, metric.value))
        self.metric_manager.insert_metrics_into_db(metric_namespace, metric_name, 1, kv, job_level)

    def get_job_metric_data(self, metric_namespace: str, metric_name: str):
        return self.read_metric_data(metric_namespace=metric_namespace, metric_name=metric_name, job_level=True)

    def get_metric_data(self, metric_namespace: str, metric_name: str):
        return self.read_metric_data(metric_namespace=metric_namespace, metric_name=metric_name, job_level=False)

    @DB.connection_context()
    def read_metric_data(self, metric_namespace: str, metric_name: str, job_level=False):
        metrics = []
        for k, v in self.metric_manager.read_metrics_from_db(metric_namespace, metric_name, 1, job_level):
            metrics.append(Metric(key=k, value=v))
        return metrics

    def save_metric_meta(self, metric_namespace: str, metric_name: str, metric_meta: MetricMeta,
                         job_level: bool = False):
        schedule_logger(self.job_id).info(
            'save component {} on {} {} {} {} metric meta'.format(self.component_name, self.role,
                                                                   self.party_id, metric_namespace, metric_name))
        self.metric_manager.insert_metrics_into_db(metric_namespace, metric_name, 0, metric_meta.to_dict().items(), job_level)

    @DB.connection_context()
    def get_metric_meta(self, metric_namespace: str, metric_name: str, job_level: bool = False):
        kv = dict()
        for k, v in self.metric_manager.read_metrics_from_db(metric_namespace, metric_name, 0, job_level):
            kv[k] = v
        return MetricMeta(name=kv.get('name'), metric_type=kv.get('metric_type'), extra_metas=kv)

    def log_job_view(self, view_data: dict):
        self.metric_manager.insert_metrics_into_db('job', 'job_view', 2, view_data.items(), job_level=True)

    @DB.connection_context()
    def get_job_view(self):
        view_data = {}
        for k, v in self.metric_manager.read_metrics_from_db('job', 'job_view', 2, job_level=True):
            view_data[k] = v
        return view_data

    def save_output_data(self, computing_table, output_storage_engine, output_storage_address=None,
                         output_table_namespace=None, output_table_name=None, schema=None, token=None, need_read=True):
        if computing_table:
            if not output_table_namespace or not output_table_name:
                output_table_namespace, output_table_name = default_output_info(task_id=self.task_id, task_version=self.task_version, output_type="data")
            schedule_logger(self.job_id).info(
                'persisting the component output temporary table to {} {}'.format(output_table_namespace,
                                                                                   output_table_name))
            part_of_limit = JobDefaultConfig.output_data_summary_count_limit
            part_of_data = []
            if need_read:
                for k, v in computing_table.collect():
                    part_of_data.append((k, v))
                    part_of_limit -= 1
                    if part_of_limit == 0:
                        break
            session.Session.persistent(computing_table=computing_table,
                                       namespace=output_table_namespace,
                                       name=output_table_name,
                                       schema=schema,
                                       part_of_data=part_of_data,
                                       engine=output_storage_engine,
                                       engine_address=output_storage_address,
                                       token=token)
            return output_table_namespace, output_table_name
        else:
            schedule_logger(self.job_id).info('task id {} output data table is none'.format(self.task_id))
            return None, None

    def save_table_meta(self, meta):
        schedule_logger(self.job_id).info(f'start to save table meta: {meta}')
        address = storage.StorageTableMeta.create_address(storage_engine=meta.get("engine"),
                                                          address_dict=meta.get("address"))
        table_meta = storage.StorageTableMeta(name=meta.get("name"), namespace=meta.get("namespace"), new=True)
        table_meta.set_metas(**meta)
        meta["address"] = address
        meta["part_of_data"] = deserialize_b64(meta["part_of_data"])
        meta["schema"] = deserialize_b64(meta["schema"])
        table_meta.create()
        schedule_logger(self.job_id).info('save table meta success')

    def get_table_meta(self, table_info):
        schedule_logger(self.job_id).info(f'start to get table meta: {table_info}')
        table_meta_dict = storage.StorageTableMeta(namespace=table_info.get("namespace"), name=table_info.get("table_name"), create_address=False).to_dict()
        schedule_logger(self.job_id).info(f'get table meta success: {table_meta_dict}')
        table_meta_dict["part_of_data"] = serialize_b64(table_meta_dict["part_of_data"], to_str=True)
        table_meta_dict["schema"] = serialize_b64(table_meta_dict["schema"], to_str=True)
        return table_meta_dict

    def get_output_data_table(self, output_data_infos, tracker_client=None):
        """
        Get the component output data table meta; runs in the task executor process
        :param output_data_infos: output data info records of the task
        :param tracker_client: optional client used to fetch table meta remotely
        :return: dict mapping data name to table meta
        """
        output_tables_meta = {}
        if output_data_infos:
            for output_data_info in output_data_infos:
                schedule_logger(self.job_id).info("get task {} {} output table {} {}".format(output_data_info.f_task_id, output_data_info.f_task_version, output_data_info.f_table_namespace, output_data_info.f_table_name))
                if not tracker_client:
                    data_table_meta = storage.StorageTableMeta(name=output_data_info.f_table_name, namespace=output_data_info.f_table_namespace)
                else:
                    data_table_meta = tracker_client.get_table_meta(output_data_info.f_table_name, output_data_info.f_table_namespace)
                output_tables_meta[output_data_info.f_data_name] = data_table_meta
        return output_tables_meta

    def save_output_cache(self, cache_data: typing.Dict[str, CTableABC], cache_meta: dict, cache_name, output_storage_engine, output_storage_address: dict, token=None):
        output_namespace, output_name = default_output_info(task_id=self.task_id, task_version=self.task_version, output_type="cache")
        cache = CacheManager.persistent(cache_name, cache_data, cache_meta, output_namespace, output_name, output_storage_engine, output_storage_address, token=token)
        cache_key = self.tracking_output_cache(cache=cache, cache_name=cache_name)
        return cache_key

    def tracking_output_cache(self, cache: DataCache, cache_name: str) -> str:
        cache_key = CacheManager.record(cache=cache,
                                        job_id=self.job_id,
                                        role=self.role,
                                        party_id=self.party_id,
                                        component_name=self.component_name,
                                        task_id=self.task_id,
                                        task_version=self.task_version,
                                        cache_name=cache_name)
        schedule_logger(self.job_id).info(f"tracking {self.task_id} {self.task_version} output cache, cache key is {cache_key}")
        return cache_key

    def get_output_cache(self, cache_key=None, cache_name=None):
        caches = self.query_output_cache(cache_key=cache_key, cache_name=cache_name)
        if caches:
            return CacheManager.load(cache=caches[0])
        else:
            return None, None

    def query_output_cache(self, cache_key=None, cache_name=None) -> typing.List[DataCache]:
        caches = CacheManager.query(job_id=self.job_id, role=self.role, party_id=self.party_id, component_name=self.component_name, cache_name=cache_name, cache_key=cache_key)
        group = {}
        # only the latest version of the task output is retrieved
        for cache in caches:
            group_key = f"{cache.task_id}-{cache.name}"
            if group_key not in group:
                group[group_key] = cache
            elif cache.task_version > group[group_key].task_version:
                group[group_key] = cache
        return list(group.values())

    def query_output_cache_record(self):
        return CacheManager.query_record(job_id=self.job_id, role=self.role, party_id=self.party_id, component_name=self.component_name,
                                         task_version=self.task_version)

    @DB.connection_context()
    def insert_summary_into_db(self, summary_data: dict, need_serialize=True):
        try:
            summary_model = self.get_dynamic_db_model(ComponentSummary, self.job_id)
            DB.create_tables([summary_model])
            summary_obj = summary_model.get_or_none(
                summary_model.f_job_id == self.job_id,
                summary_model.f_component_name == self.component_name,
                summary_model.f_role == self.role,
                summary_model.f_party_id == self.party_id,
                summary_model.f_task_id == self.task_id,
                summary_model.f_task_version == self.task_version
            )
            if summary_obj:
                summary_obj.f_summary = serialize_b64(summary_data, to_str=True) if need_serialize else summary_data
                summary_obj.f_update_time = current_timestamp()
                summary_obj.save()
            else:
                self.get_dynamic_db_model(ComponentSummary, self.job_id).create(
                    f_job_id=self.job_id,
                    f_component_name=self.component_name,
                    f_role=self.role,
                    f_party_id=self.party_id,
                    f_task_id=self.task_id,
                    f_task_version=self.task_version,
                    f_summary=serialize_b64(summary_data, to_str=True) if need_serialize else summary_data,
                    f_create_time=current_timestamp()
                )
        except Exception as e:
            schedule_logger(self.job_id).exception(
                "An exception occurred while saving the summary of job id: {} "
                "component name: {} to database:\n{}".format(self.job_id, self.component_name, e))

    @DB.connection_context()
    def read_summary_from_db(self, need_deserialize=True):
        cpn_summary = ""
        try:
            summary_model = self.get_dynamic_db_model(ComponentSummary, self.job_id)
            summary = summary_model.get_or_none(
                summary_model.f_job_id == self.job_id,
                summary_model.f_component_name == self.component_name,
                summary_model.f_role == self.role,
                summary_model.f_party_id == self.party_id
            )
            if summary:
                cpn_summary = deserialize_b64(summary.f_summary) if need_deserialize else summary.f_summary
        except Exception as e:
            schedule_logger(self.job_id).exception(e)
        return cpn_summary

    @DB.connection_context()
    def reload_summary(self, source_tracker):
        cpn_summary = source_tracker.read_summary_from_db(need_deserialize=False)
        if cpn_summary:
            self.insert_summary_into_db(cpn_summary, need_serialize=False)

    def log_output_data_info(self, data_name: str, table_namespace: str, table_name: str):
        self.insert_output_data_info_into_db(data_name=data_name, table_namespace=table_namespace, table_name=table_name)

    @DB.connection_context()
    def insert_output_data_info_into_db(self, data_name: str, table_namespace: str, table_name: str):
        try:
            tracking_output_data_info = self.get_dynamic_db_model(TrackingOutputDataInfo, self.job_id)()
            tracking_output_data_info.f_job_id = self.job_id
            tracking_output_data_info.f_component_name = self.component_name
            tracking_output_data_info.f_task_id = self.task_id
            tracking_output_data_info.f_task_version = self.task_version
            tracking_output_data_info.f_data_name = data_name
            tracking_output_data_info.f_role = self.role
            tracking_output_data_info.f_party_id = self.party_id
            tracking_output_data_info.f_table_namespace = table_namespace
            tracking_output_data_info.f_table_name = table_name
            tracking_output_data_info.f_create_time = current_timestamp()
            bulk_insert_into_db(
                self.get_dynamic_db_model(TrackingOutputDataInfo, self.job_id),
                (tracking_output_data_info.to_dict(), ),
            )
        except Exception as e:
            schedule_logger(self.job_id).exception(
                "An exception occurred while inserting output data info {} {} {} into database:\n{}".format(
                    data_name, table_namespace, table_name, e))

    def save_as_table(self, computing_table, name, namespace):
        if self.job_parameters.storage_engine == StorageEngine.LINKIS_HIVE:
            return
        self.save_output_data(computing_table=computing_table,
                              output_storage_engine=self.job_parameters.storage_engine,
                              output_storage_address=self.job_parameters.engines_address.get(EngineType.STORAGE, {}),
                              output_table_namespace=namespace, output_table_name=name)

    @DB.connection_context()
    def clean_metrics(self):
        return self.metric_manager.clean_metrics()

    @DB.connection_context()
    def get_metric_list(self, job_level: bool = False):
        return self.metric_manager.get_metric_list(job_level=job_level)

    @DB.connection_context()
    def reload_metric(self, source_tracker):
        return self.metric_manager.reload_metric(source_tracker.metric_manager)

    def get_output_data_info(self, data_name=None):
        return self.read_output_data_info_from_db(data_name=data_name)

    def read_output_data_info_from_db(self, data_name=None):
        filter_dict = {}
        filter_dict["job_id"] = self.job_id
        filter_dict["component_name"] = self.component_name
        filter_dict["role"] = self.role
        filter_dict["party_id"] = self.party_id
        if data_name:
            filter_dict["data_name"] = data_name
        return self.query_output_data_infos(**filter_dict)

    @classmethod
    @DB.connection_context()
    def query_output_data_infos(cls, **kwargs) -> typing.List[TrackingOutputDataInfo]:
        try:
            tracking_output_data_info_model = cls.get_dynamic_db_model(TrackingOutputDataInfo, kwargs.get("job_id"))
            filters = []
            for f_n, f_v in kwargs.items():
                attr_name = 'f_%s' % f_n
                if hasattr(tracking_output_data_info_model, attr_name):
                    filters.append(operator.attrgetter(attr_name)(tracking_output_data_info_model) == f_v)
            if filters:
                output_data_infos_tmp = tracking_output_data_info_model.select().where(*filters)
            else:
                output_data_infos_tmp = tracking_output_data_info_model.select()
            output_data_infos_group = {}
            # only the latest version of the task output data is retrieved
            for output_data_info in output_data_infos_tmp:
                group_key = cls.get_output_data_group_key(output_data_info.f_task_id, output_data_info.f_data_name)
                if group_key not in output_data_infos_group:
                    output_data_infos_group[group_key] = output_data_info
                elif output_data_info.f_task_version > output_data_infos_group[group_key].f_task_version:
                    output_data_infos_group[group_key] = output_data_info
            return list(output_data_infos_group.values())
        except Exception:
            return []

    @classmethod
    def get_output_data_group_key(cls, task_id, data_name):
        return task_id + data_name

    def clean_task(self):
        schedule_logger(self.job_id).info(
            'clean task {} {} on {} {}'.format(self.task_id, self.task_version, self.role, self.party_id))
        session_id = job_utils.generate_session_id(self.task_id, self.task_version, self.role, self.party_id)
        sess = session.Session(session_id=session_id, options={"logger": schedule_logger(self.job_id)})
        sess.destroy_all_sessions()
        return True

    @classmethod
    def get_dynamic_db_model(cls, base, job_id):
        return type(base.model(table_index=cls.get_dynamic_tracking_table_index(job_id=job_id)))

    @classmethod
    def get_dynamic_tracking_table_index(cls, job_id):
        # tracking tables are sharded by the first 8 characters of the job id
        return job_id[:8]
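

# Minimal usage sketch (illustrative only, not part of the original module): it shows how a
# Tracker is typically constructed for a single task and used to record component metrics.
# It assumes a configured FATE Flow deployment with an initialized tracking database; the
# job id, party id, component name, task id and metric values below are placeholders.
if __name__ == "__main__":
    tracker = Tracker(job_id="202201010000000000000", role="guest", party_id=9999,
                      component_name="hetero_lr_0",
                      task_id="202201010000000000000_hetero_lr_0",
                      task_version=0)
    # record a few training-loss points under the "train" metric namespace
    tracker.save_metric_data(metric_namespace="train", metric_name="loss",
                             metrics=[Metric(key=i, value=1.0 / (i + 1)) for i in range(3)])
    # attach meta information describing the metric; extra_metas is left empty here
    tracker.save_metric_meta(metric_namespace="train", metric_name="loss",
                             metric_meta=MetricMeta(name="loss", metric_type="LOSS", extra_metas={}))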
|