|
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import datetime
- import inspect
- import os
- import sys
- from functools import wraps
- from peewee import (
- BigAutoField, BigIntegerField, BooleanField, CharField,
- CompositeKey, Insert, IntegerField, TextField,
- )
- from playhouse.hybrid import hybrid_property
- from playhouse.pool import PooledMySQLDatabase
- from fate_arch.common import file_utils
- from fate_arch.metastore.base_model import (
- BaseModel, DateTimeField, JSONField, ListField,
- LongTextField, SerializedField, SerializedType,
- )
- from fate_flow.db.runtime_config import RuntimeConfig
- from fate_flow.settings import DATABASE, IS_STANDALONE, stat_logger
- from fate_flow.utils.log_utils import getLogger
- from fate_flow.utils.object_utils import from_dict_hook
# Module-level logger obtained from fate_flow's log utilities.
LOGGER = getLogger()
class JsonSerializedField(SerializedField):
    """Serialized field preconfigured for the JSON serialization type.

    ``object_hook`` defaults to ``from_dict_hook``. Both hooks and any extra
    keyword arguments are forwarded unchanged to ``SerializedField``.
    """

    def __init__(self, object_hook=from_dict_hook, object_pairs_hook=None, **kwargs):
        super().__init__(
            serialized_type=SerializedType.JSON,
            object_hook=object_hook,
            object_pairs_hook=object_pairs_hook,
            **kwargs
        )
def singleton(cls, *args, **kw):
    """Turn ``cls`` into a zero-argument factory yielding one instance per process.

    The returned callable builds ``cls(*args, **kw)`` the first time it is
    called in a given process and hands back that same object afterwards.
    Instances are keyed by ``str(cls)`` plus the current PID, so a process
    with a different PID gets its own instance.
    """
    per_process = {}

    def _singleton():
        slot = str(cls) + str(os.getpid())
        if slot in per_process:
            return per_process[slot]
        created = cls(*args, **kw)
        per_process[slot] = created
        return created

    return _singleton
@singleton
class BaseDataBase:
    """Holder for the process-wide database connection.

    Exposes the connection as ``database_connection``. A local SQLite (APSW)
    database is used in standalone mode unless the ``FORCE_USE_MYSQL``
    environment variable parses to a non-zero integer; otherwise a pooled
    MySQL connection is built from the ``DATABASE`` settings.
    """

    def __init__(self):
        mysql_options = DATABASE.copy()
        schema_name = mysql_options.pop("name")

        if not IS_STANDALONE or bool(int(os.environ.get("FORCE_USE_MYSQL", 0))):
            self.database_connection = PooledMySQLDatabase(schema_name, **mysql_options)
            stat_logger.info('init mysql database on cluster mode successfully')
            return

        # sqlite does not support other options
        Insert.on_conflict = lambda self, *args, **kwargs: self.on_conflict_replace()
        from playhouse.apsw_ext import APSWDatabase
        sqlite_file = file_utils.get_project_base_directory("fate_sqlite.db")
        self.database_connection = APSWDatabase(sqlite_file)
        RuntimeConfig.init_config(USE_LOCAL_DATABASE=True)
        stat_logger.info('init sqlite database on standalone mode successfully')
class DatabaseLock:
    """Named lock built on the ``GET_LOCK`` / ``RELEASE_LOCK`` SQL functions.

    Can be used as a context manager or as a function decorator. In both of
    those forms SQL is only issued when ``db`` is a ``PooledMySQLDatabase``;
    for any other backend the wrapped code simply runs without locking.
    ``lock()`` and ``unlock()`` themselves always issue the SQL.
    """

    def __init__(self, lock_name, timeout=10, db=None):
        self.lock_name = lock_name
        self.timeout = int(timeout)
        # fall back to the module-level connection when no db is supplied
        self.db = db or DB

    def lock(self):
        """Acquire the lock; return True or raise ``Exception`` on timeout/failure."""
        # SQL parameters only support %s format placeholders
        row = self.db.execute_sql(
            "SELECT GET_LOCK(%s, %s)", (self.lock_name, self.timeout)
        ).fetchone()
        status = row[0]
        if status == 1:
            return True
        if status == 0:
            raise Exception(f'acquire mysql lock {self.lock_name} timeout')
        raise Exception(f'failed to acquire lock {self.lock_name}')

    def unlock(self):
        """Release the lock; return True or raise ``Exception`` if it was not released."""
        row = self.db.execute_sql(
            "SELECT RELEASE_LOCK(%s)", (self.lock_name, )
        ).fetchone()
        status = row[0]
        if status == 1:
            return True
        if status == 0:
            raise Exception(f'mysql lock {self.lock_name} was not established by this thread')
        raise Exception(f'mysql lock {self.lock_name} does not exist')

    def __enter__(self):
        if isinstance(self.db, PooledMySQLDatabase):
            self.lock()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if isinstance(self.db, PooledMySQLDatabase):
            self.unlock()

    def __call__(self, func):
        """Decorate ``func`` so that every call runs inside ``with self``."""
        @wraps(func)
        def guarded(*args, **kwargs):
            with self:
                return func(*args, **kwargs)

        return guarded
# Module-wide database connection, taken from the BaseDataBase singleton.
DB = BaseDataBase().database_connection
# Expose DatabaseLock as ``DB.lock`` so callers can write ``DB.lock(name)``.
DB.lock = DatabaseLock
def close_connection():
    """Close the module-level ``DB`` connection, logging any error instead of raising."""
    try:
        if not DB:
            return
        DB.close()
    except Exception as err:
        LOGGER.exception(err)
class DataBaseModel(BaseModel):
    """Base class for the models in this module, bound to the module-level ``DB``.

    ``init_database_tables`` creates a table for every subclass of this class
    that is defined as a member of this module.
    """

    class Meta:
        database = DB
@DB.connection_context()
def init_database_tables():
    """Create the table of every ``DataBaseModel`` subclass defined in this module.

    Each model is attempted independently: a failure is logged and recorded,
    and the remaining models are still processed.

    Raises:
        Exception: after all models were attempted, if at least one
            ``create_table`` call failed. The message lists the model names.
    """
    members = inspect.getmembers(sys.modules[__name__], inspect.isclass)
    create_failed_list = []
    for name, obj in members:
        # skip the abstract base itself; only concrete subclasses map to tables
        if obj is not DataBaseModel and issubclass(obj, DataBaseModel):
            LOGGER.info(f"start create table {obj.__name__}")
            try:
                obj.create_table()
                LOGGER.info(f"create table success: {obj.__name__}")
            except Exception as e:
                LOGGER.exception(e)
                create_failed_list.append(obj.__name__)
    if create_failed_list:
        LOGGER.info(f"create tables failed: {create_failed_list}")
        raise Exception(f"create tables failed: {create_failed_list}")
def fill_db_model_object(model_object, human_model_dict):
    """Copy values from ``human_model_dict`` onto matching ``f_``-prefixed attributes.

    For each key ``k`` the value is assigned to ``model_object.f_<k>``, but
    only when the object's class defines an attribute of that name; other
    keys are ignored. The same ``model_object`` is returned.
    """
    owner = model_object.__class__
    for key, value in human_model_dict.items():
        column = 'f_%s' % key
        if not hasattr(owner, column):
            continue
        setattr(model_object, column, value)
    return model_object
class Job(DataBaseModel):
    """Job record stored in table ``t_job``.

    Rows are keyed by the composite primary key
    ``(f_job_id, f_role, f_party_id)``.
    """

    # multi-party common configuration
    f_user_id = CharField(max_length=25, null=True)
    f_job_id = CharField(max_length=25, index=True)
    f_name = CharField(max_length=500, null=True, default='')
    f_description = TextField(null=True, default='')
    f_tag = CharField(max_length=50, null=True, default='')
    f_dsl = JSONField()
    f_runtime_conf = JSONField()
    f_runtime_conf_on_party = JSONField()
    f_train_runtime_conf = JSONField(null=True)
    f_roles = JSONField()
    f_initiator_role = CharField(max_length=50)
    f_initiator_party_id = CharField(max_length=50)
    f_status = CharField(max_length=50)
    f_status_code = IntegerField(null=True)
    f_user = JSONField()
    # this party configuration
    f_role = CharField(max_length=50, index=True)
    f_party_id = CharField(max_length=10, index=True)
    f_is_initiator = BooleanField(null=True, default=False)
    f_progress = IntegerField(null=True, default=0)
    f_ready_signal = BooleanField(default=False)
    f_ready_time = BigIntegerField(null=True)
    f_cancel_signal = BooleanField(default=False)
    f_cancel_time = BigIntegerField(null=True)
    f_rerun_signal = BooleanField(default=False)
    f_end_scheduling_updates = IntegerField(null=True, default=0)

    # engine and resource accounting
    f_engine_name = CharField(max_length=50, null=True)
    f_engine_type = CharField(max_length=10, null=True)
    f_cores = IntegerField(default=0)
    f_memory = IntegerField(default=0)  # MB
    f_remaining_cores = IntegerField(default=0)
    f_remaining_memory = IntegerField(default=0)  # MB
    f_resource_in_use = BooleanField(default=False)
    f_apply_resource_time = BigIntegerField(null=True)
    f_return_resource_time = BigIntegerField(null=True)

    f_inheritance_info = JSONField(null=True)
    f_inheritance_status = CharField(max_length=50, null=True)

    # timing; NOTE(review): *_time / f_elapsed look like epoch-based integers — confirm unit with writers
    f_start_time = BigIntegerField(null=True)
    f_start_date = DateTimeField(null=True)
    f_end_time = BigIntegerField(null=True)
    f_end_date = DateTimeField(null=True)
    f_elapsed = BigIntegerField(null=True)

    class Meta:
        db_table = "t_job"
        primary_key = CompositeKey('f_job_id', 'f_role', 'f_party_id')
class Task(DataBaseModel):
    """Task record stored in table ``t_task``.

    Rows are keyed by the composite primary key
    ``(f_job_id, f_task_id, f_task_version, f_role, f_party_id)``.
    """

    # multi-party common configuration
    f_job_id = CharField(max_length=25, index=True)
    f_component_name = TextField()
    f_component_module = CharField(max_length=200)
    f_task_id = CharField(max_length=100)
    f_task_version = BigIntegerField()
    f_initiator_role = CharField(max_length=50)
    # NOTE(review): the default is the int -1 on a CharField — confirm this is intended
    f_initiator_party_id = CharField(max_length=50, default=-1)
    f_federated_mode = CharField(max_length=10)
    f_federated_status_collect_type = CharField(max_length=10)
    f_status = CharField(max_length=50, index=True)
    f_status_code = IntegerField(null=True)
    f_auto_retries = IntegerField(default=0)
    f_auto_retry_delay = IntegerField(default=0)
    # this party configuration
    f_role = CharField(max_length=50, index=True)
    f_party_id = CharField(max_length=10, index=True)
    f_run_on_this_party = BooleanField(null=True, index=True, default=False)
    f_worker_id = CharField(null=True, max_length=100)
    f_cmd = JSONField(null=True)
    f_run_ip = CharField(max_length=100, null=True)
    f_run_port = IntegerField(null=True)
    f_run_pid = IntegerField(null=True)
    f_party_status = CharField(max_length=50)
    f_provider_info = JSONField()
    f_component_parameters = JSONField()
    f_engine_conf = JSONField(null=True)
    f_kill_status = BooleanField(default=False)
    f_error_report = TextField(default="")

    f_start_time = BigIntegerField(null=True)
    f_start_date = DateTimeField(null=True)
    f_end_time = BigIntegerField(null=True)
    f_end_date = DateTimeField(null=True)
    f_elapsed = BigIntegerField(null=True)

    class Meta:
        db_table = "t_task"
        primary_key = CompositeKey('f_job_id', 'f_task_id', 'f_task_version', 'f_role', 'f_party_id')
class TrackingMetric(DataBaseModel):
    """Metric rows, split across per-index tables ``t_tracking_metric_<index>``.

    Use :meth:`model` to obtain an instance bound to one of those tables.
    """

    # generated per-index subclasses, keyed by 'TrackingMetric_<index>'
    _mapper = {}

    @classmethod
    def model(cls, table_index=None, date=None):
        """Return a new instance of the subclass bound to ``table_index``.

        When ``table_index`` is falsy it is derived as ``%Y%m%d`` from
        ``date``, or from ``datetime.datetime.now()`` when ``date`` is falsy
        as well. Subclasses are created on first use and cached in
        ``_mapper``.
        """
        if not table_index:
            moment = date if date else datetime.datetime.now()
            table_index = moment.strftime('%Y%m%d')

        cache_key = 'TrackingMetric_%s' % table_index
        shard_class = TrackingMetric._mapper.get(cache_key, None)
        if shard_class is not None:
            return shard_class()

        class Meta:
            db_table = '%s_%s' % ('t_tracking_metric', table_index)

        shard_name = "%s_%s" % (cls.__name__, table_index)
        namespace = {'__module__': cls.__module__, 'Meta': Meta}
        shard_class = type(shard_name, (cls,), namespace)
        TrackingMetric._mapper[cache_key] = shard_class
        return shard_class()

    f_id = BigAutoField(primary_key=True)
    f_job_id = CharField(max_length=25, index=True)
    f_component_name = CharField(max_length=30, index=True)
    f_task_id = CharField(max_length=100, null=True)
    f_task_version = BigIntegerField(null=True)
    f_role = CharField(max_length=10, index=True)
    f_party_id = CharField(max_length=10)
    f_metric_namespace = CharField(max_length=80, index=True)
    f_metric_name = CharField(max_length=80, index=True)
    f_key = CharField(max_length=200)
    f_value = LongTextField()
    f_type = IntegerField()  # 0 is data, 1 is meta
class TrackingOutputDataInfo(DataBaseModel):
    """Output-data rows, split across tables ``t_tracking_output_data_info_<index>``.

    Use :meth:`model` to obtain an instance bound to one of those tables.
    """

    # generated per-index subclasses, keyed by 'TrackingOutputDataInfo_<index>'
    _mapper = {}

    @classmethod
    def model(cls, table_index=None, date=None):
        """Return a new instance of the subclass bound to ``table_index``.

        When ``table_index`` is falsy it is derived as ``%Y%m%d`` from
        ``date``, or from ``datetime.datetime.now()`` when ``date`` is falsy
        as well. Subclasses are created on first use and cached in
        ``_mapper``; each declares a six-column composite primary key.
        """
        if not table_index:
            moment = date if date else datetime.datetime.now()
            table_index = moment.strftime('%Y%m%d')

        cache_key = 'TrackingOutputDataInfo_%s' % table_index
        shard_class = TrackingOutputDataInfo._mapper.get(cache_key, None)
        if shard_class is not None:
            return shard_class()

        class Meta:
            db_table = '%s_%s' % ('t_tracking_output_data_info', table_index)
            primary_key = CompositeKey(
                'f_job_id', 'f_task_id', 'f_task_version',
                'f_data_name', 'f_role', 'f_party_id',
            )

        shard_name = "%s_%s" % (cls.__name__, table_index)
        namespace = {'__module__': cls.__module__, 'Meta': Meta}
        shard_class = type(shard_name, (cls,), namespace)
        TrackingOutputDataInfo._mapper[cache_key] = shard_class
        return shard_class()

    # multi-party common configuration
    f_job_id = CharField(max_length=25, index=True)
    f_component_name = TextField()
    f_task_id = CharField(max_length=100, null=True, index=True)
    f_task_version = BigIntegerField(null=True)
    f_data_name = CharField(max_length=30)
    # this party configuration
    f_role = CharField(max_length=50, index=True)
    f_party_id = CharField(max_length=10, index=True)
    f_table_name = CharField(max_length=500, null=True)
    f_table_namespace = CharField(max_length=500, null=True)
    f_description = TextField(null=True, default='')
class MachineLearningModelInfo(DataBaseModel):
    """Model metadata stored in table ``t_machine_learning_model_info``.

    Rows are keyed by the composite primary key
    ``(f_role, f_party_id, f_model_id, f_model_version)``.

    JSON columns use ``default=dict`` (a callable) rather than a literal
    ``{}``: a literal would be one shared dict handed to every new instance,
    so mutating one instance's default would leak into the others.
    """

    f_role = CharField(max_length=50)
    f_party_id = CharField(max_length=10)
    f_roles = JSONField(default=dict)
    f_job_id = CharField(max_length=25, index=True)
    f_model_id = CharField(max_length=100, index=True)
    f_model_version = CharField(max_length=100, index=True)
    f_size = BigIntegerField(default=0)
    f_initiator_role = CharField(max_length=50)
    # NOTE(review): the default is the int -1 on a CharField — confirm this is intended
    f_initiator_party_id = CharField(max_length=50, default=-1)
    # TODO: deprecated. use f_train_runtime_conf instead
    f_runtime_conf = JSONField(default=dict)
    f_train_dsl = JSONField(default=dict)
    f_train_runtime_conf = JSONField(default=dict)
    f_runtime_conf_on_party = JSONField(default=dict)
    f_inference_dsl = JSONField(default=dict)
    f_fate_version = CharField(max_length=10, null=True, default='')
    f_parent = BooleanField(null=True, default=None)
    f_parent_info = JSONField(default=dict)
    # loaded times in api /model/load/do
    f_loaded_times = IntegerField(default=0)
    # imported from api /model/import
    f_imported = IntegerField(default=0)
    f_archive_sha256 = CharField(max_length=100, null=True)
    f_archive_from_ip = CharField(max_length=100, null=True)

    @hybrid_property
    def f_party_model_id(self):
        """Return ``f_role``, ``f_party_id`` and ``f_model_id`` joined with ``#``."""
        return '#'.join([self.f_role, self.f_party_id, self.f_model_id])

    class Meta:
        db_table = "t_machine_learning_model_info"
        primary_key = CompositeKey('f_role', 'f_party_id', 'f_model_id', 'f_model_version')
class DataTableTracking(DataBaseModel):
    """Table-lineage record stored in ``t_data_table_tracking``.

    Each row names a table (name + namespace) together with optional parent
    and source table references; ``f_table_id`` is the auto-increment key.
    """

    f_table_id = BigAutoField(primary_key=True)
    f_table_name = CharField(max_length=300, null=True)
    f_table_namespace = CharField(max_length=300, null=True)
    f_job_id = CharField(max_length=25, index=True, null=True)
    f_have_parent = BooleanField(default=False)
    f_parent_number = IntegerField(default=0)

    f_parent_table_name = CharField(max_length=500, null=True)
    f_parent_table_namespace = CharField(max_length=500, null=True)
    f_source_table_name = CharField(max_length=500, null=True)
    f_source_table_namespace = CharField(max_length=500, null=True)

    class Meta:
        db_table = "t_data_table_tracking"
class CacheRecord(DataBaseModel):
    """Cache entry stored in table ``t_cache_record``.

    ``f_cache`` holds the JSON-serialized payload; the remaining columns
    identify the job/task that the entry is associated with.
    """

    f_cache_key = CharField(max_length=500)
    f_cache = JsonSerializedField()
    f_job_id = CharField(max_length=25, index=True, null=True)
    f_role = CharField(max_length=50, index=True, null=True)
    f_party_id = CharField(max_length=10, index=True, null=True)
    f_component_name = TextField(null=True)
    f_task_id = CharField(max_length=100, null=True)
    f_task_version = BigIntegerField(null=True, index=True)
    f_cache_name = CharField(max_length=50, null=True)
    # NOTE(review): uses a ``t_`` prefix unlike every other ``f_`` column, so
    # fill_db_model_object (which maps key -> 'f_<key>') will never set it.
    # Renaming would change the DB column name — confirm before touching.
    t_ttl = BigIntegerField(default=0)

    class Meta:
        db_table = "t_cache_record"
class ModelTag(DataBaseModel):
    """Row of table ``t_model_tag`` holding an ``f_m_id`` / ``f_t_id`` pair.

    NOTE(review): presumably links a model id to a ``Tag`` id — confirm
    against the callers; no foreign keys are declared here.
    """

    f_id = BigAutoField(primary_key=True)
    f_m_id = CharField(max_length=25, null=False)
    f_t_id = BigIntegerField(null=False)

    class Meta:
        db_table = "t_model_tag"
class Tag(DataBaseModel):
    """Tag definition stored in table ``t_tags``; ``f_name`` is unique."""

    f_id = BigAutoField(primary_key=True)
    f_name = CharField(max_length=100, unique=True)
    f_desc = TextField(null=True)

    class Meta:
        db_table = "t_tags"
class ComponentSummary(DataBaseModel):
    """Component summaries, split across tables ``t_component_summary_<index>``.

    Use :meth:`model` to obtain an instance bound to one of those tables.
    """

    # generated per-index subclasses, keyed by 'ComponentSummary_<index>'
    _mapper = {}

    @classmethod
    def model(cls, table_index=None, date=None):
        """Return a new instance of the subclass bound to ``table_index``.

        When ``table_index`` is falsy it is derived as ``%Y%m%d`` from
        ``date``, or from ``datetime.datetime.now()`` when ``date`` is falsy
        as well. Subclasses are created on first use and cached in
        ``_mapper``.
        """
        if not table_index:
            table_index = date.strftime(
                '%Y%m%d') if date else datetime.datetime.now().strftime(
                '%Y%m%d')
        class_name = 'ComponentSummary_%s' % table_index
        # Look up in this class's own cache. Reading TrackingMetric._mapper
        # here never hit, so a new model class was built on every call.
        ModelClass = ComponentSummary._mapper.get(class_name, None)
        if ModelClass is None:
            class Meta:
                db_table = '%s_%s' % ('t_component_summary', table_index)

            attrs = {'__module__': cls.__module__, 'Meta': Meta}
            ModelClass = type("%s_%s" % (cls.__name__, table_index), (cls,), attrs)
            ComponentSummary._mapper[class_name] = ModelClass
        return ModelClass()

    f_id = BigAutoField(primary_key=True)
    f_job_id = CharField(max_length=25, index=True)
    f_role = CharField(max_length=25, index=True)
    f_party_id = CharField(max_length=10, index=True)
    f_component_name = CharField(max_length=50)
    f_task_id = CharField(max_length=50, null=True, index=True)
    f_task_version = CharField(max_length=50, null=True)
    f_summary = LongTextField()
class EngineRegistry(DataBaseModel):
    """Engine registration stored in table ``t_engine_registry``.

    Rows are keyed by the composite primary key
    ``(f_engine_name, f_engine_type)``.
    """

    f_engine_type = CharField(max_length=10, index=True)
    f_engine_name = CharField(max_length=50, index=True)
    f_engine_entrance = CharField(max_length=50, index=True)
    f_engine_config = JSONField()
    f_cores = IntegerField()
    f_memory = IntegerField()  # MB
    f_remaining_cores = IntegerField()
    f_remaining_memory = IntegerField()  # MB
    f_nodes = IntegerField()

    class Meta:
        db_table = "t_engine_registry"
        primary_key = CompositeKey('f_engine_name', 'f_engine_type')
- # component registry
class ComponentRegistryInfo(DataBaseModel):
    """Component registration stored in table ``t_component_registry``.

    Rows are keyed by the composite primary key
    ``(f_provider_name, f_version, f_component_name)``.
    """

    f_provider_name = CharField(max_length=20, index=True)
    f_version = CharField(max_length=10, index=True)
    f_component_name = CharField(max_length=30, index=True)
    f_module = CharField(max_length=128)

    class Meta:
        db_table = "t_component_registry"
        primary_key = CompositeKey('f_provider_name', 'f_version', 'f_component_name')
class ComponentProviderInfo(DataBaseModel):
    """Component provider stored in table ``t_component_provider_info``.

    Rows are keyed by the composite primary key
    ``(f_provider_name, f_version)``.
    """

    f_provider_name = CharField(max_length=20, index=True)
    f_version = CharField(max_length=10, index=True)
    f_class_path = JSONField()
    f_path = CharField(max_length=128, null=False)
    f_python = CharField(max_length=128, null=False)

    class Meta:
        db_table = "t_component_provider_info"
        primary_key = CompositeKey('f_provider_name', 'f_version')
class ComponentInfo(DataBaseModel):
    """Component description stored in table ``t_component_info``.

    ``f_component_name`` is the primary key.
    """

    f_component_name = CharField(max_length=30, primary_key=True)
    f_component_alias = JSONField()
    f_default_provider = CharField(max_length=20)
    f_support_provider = ListField(null=True)

    class Meta:
        db_table = "t_component_info"
class WorkerInfo(DataBaseModel):
    """Worker process record stored in table ``t_worker``.

    ``f_worker_id`` is the primary key; the other columns tie the worker to
    a job/task and record where and when it ran.
    """

    f_worker_id = CharField(max_length=100, primary_key=True)
    f_worker_name = CharField(max_length=50, index=True)
    f_job_id = CharField(max_length=25, index=True)
    f_task_id = CharField(max_length=100)
    f_task_version = BigIntegerField(index=True)
    f_role = CharField(max_length=50)
    f_party_id = CharField(max_length=10, index=True)
    f_run_ip = CharField(max_length=100, null=True)
    f_run_pid = IntegerField(null=True)
    f_http_port = IntegerField(null=True)
    f_grpc_port = IntegerField(null=True)
    f_config = JSONField(null=True)
    f_cmd = JSONField(null=True)
    f_start_time = BigIntegerField(null=True)
    f_start_date = DateTimeField(null=True)
    f_end_time = BigIntegerField(null=True)
    f_end_date = DateTimeField(null=True)

    class Meta:
        db_table = "t_worker"
class DependenciesStorageMeta(DataBaseModel):
    """Dependency storage metadata stored in ``t_dependencies_storage_meta``.

    Rows are keyed by the composite primary key
    ``(f_storage_engine, f_type, f_version)``.
    """

    f_storage_engine = CharField(max_length=30)
    f_type = CharField(max_length=20)
    f_version = CharField(max_length=10, index=True)
    f_storage_path = CharField(max_length=256, null=True)
    f_snapshot_time = BigIntegerField(null=True)
    f_fate_flow_snapshot_time = BigIntegerField(null=True)
    f_dependencies_conf = JSONField(null=True)
    f_upload_status = BooleanField(default=False)
    f_pid = IntegerField(null=True)

    class Meta:
        db_table = "t_dependencies_storage_meta"
        primary_key = CompositeKey('f_storage_engine', 'f_type', 'f_version')
class ServerRegistryInfo(DataBaseModel):
    """Server endpoint record stored in table ``t_server_registry_info``.

    No explicit primary key is declared on this model.
    """

    f_server_name = CharField(max_length=30, index=True)
    f_host = CharField(max_length=30)
    f_port = IntegerField()
    f_protocol = CharField(max_length=10)

    class Meta:
        db_table = "t_server_registry_info"
class ServiceRegistryInfo(DataBaseModel):
    """Service endpoint record stored in table ``t_service_registry_info``.

    Rows are keyed by the composite primary key
    ``(f_server_name, f_service_name)``.
    """

    f_server_name = CharField(max_length=30)
    f_service_name = CharField(max_length=30)
    f_url = CharField(max_length=100)
    f_method = CharField(max_length=10)
    f_params = JSONField(null=True)
    f_data = JSONField(null=True)
    f_headers = JSONField(null=True)

    class Meta:
        db_table = "t_service_registry_info"
        primary_key = CompositeKey('f_server_name', 'f_service_name')
class SiteKeyInfo(DataBaseModel):
    """Per-party key material stored in table ``t_site_key_info``.

    Rows are keyed by the composite primary key ``(f_party_id, f_key_name)``.
    """

    f_party_id = CharField(max_length=10, index=True)
    f_key_name = CharField(max_length=10, index=True)
    f_key = LongTextField()

    class Meta:
        db_table = "t_site_key_info"
        primary_key = CompositeKey('f_party_id', 'f_key_name')
class PipelineComponentMeta(DataBaseModel):
    """Per-component model metadata stored in ``t_pipeline_component_meta``.

    A unique index covers
    ``(f_model_id, f_model_version, f_role, f_party_id, f_component_name)``.
    """

    f_model_id = CharField(max_length=100, index=True)
    f_model_version = CharField(max_length=100, index=True)
    f_role = CharField(max_length=50, index=True)
    f_party_id = CharField(max_length=10, index=True)
    f_component_name = CharField(max_length=100, index=True)
    f_component_module_name = CharField(max_length=100)
    f_model_alias = CharField(max_length=100, index=True)
    f_model_proto_index = JSONField(null=True)
    f_run_parameters = JSONField(null=True)
    f_archive_sha256 = CharField(max_length=100, null=True)
    f_archive_from_ip = CharField(max_length=100, null=True)

    class Meta:
        db_table = 't_pipeline_component_meta'
        # the trailing True marks this multi-column index as unique
        indexes = (
            (('f_model_id', 'f_model_version', 'f_role', 'f_party_id', 'f_component_name'), True),
        )
|