|
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import operator
- from typing import Iterable
- import peewee
- from fate_arch.abc import StorageTableMetaABC, StorageTableABC, AddressABC
- from fate_arch.common.base_utils import current_timestamp
- from fate_arch.common.log import getLogger
- from fate_arch.relation_ship import Relationship
- from fate_arch.metastore.db_models import DB, StorageTableMetaModel
- LOGGER = getLogger()
- class StorageTableBase(StorageTableABC):
- def __init__(self, name, namespace, address, partitions, options, engine, store_type):
- self._name = name
- self._namespace = namespace
- self._address = address
- self._partitions = partitions
- self._options = options if options else {}
- self._engine = engine
- self._store_type = store_type
- self._meta = None
- self._read_access_time = None
- self._write_access_time = None
- @property
- def name(self):
- return self._name
- @property
- def namespace(self):
- return self._namespace
- @property
- def address(self):
- return self._address
- @property
- def partitions(self):
- return self._partitions
- @property
- def options(self):
- return self._options
- @property
- def engine(self):
- return self._engine
- @property
- def store_type(self):
- return self._store_type
- @property
- def meta(self):
- return self._meta
- @meta.setter
- def meta(self, meta):
- self._meta = meta
- @property
- def read_access_time(self):
- return self._read_access_time
- @property
- def write_access_time(self):
- return self._write_access_time
- def update_meta(self,
- schema=None,
- count=None,
- part_of_data=None,
- description=None,
- partitions=None,
- **kwargs):
- self._meta.update_metas(schema=schema,
- count=count,
- part_of_data=part_of_data,
- description=description,
- partitions=partitions,
- **kwargs)
- def create_meta(self, **kwargs):
- table_meta = StorageTableMeta(name=self._name, namespace=self._namespace, new=True)
- table_meta.set_metas(**kwargs)
- table_meta.address = self._address
- table_meta.partitions = self._partitions
- table_meta.engine = self._engine
- table_meta.store_type = self._store_type
- table_meta.options = self._options
- table_meta.create()
- self._meta = table_meta
- return table_meta
- def check_address(self):
- return True
- def put_all(self, kv_list: Iterable, **kwargs):
- self._update_write_access_time()
- self._put_all(kv_list, **kwargs)
- def collect(self, **kwargs) -> list:
- self._update_read_access_time()
- return self._collect(**kwargs)
- def count(self):
- self._update_read_access_time()
- count = self._count()
- self.meta.update_metas(count=count)
- return count
- def read(self):
- self._update_read_access_time()
- return self._read()
- def destroy(self):
- self.meta.destroy_metas()
- self._destroy()
- def save_as(self, address, name, namespace, partitions=None, **kwargs):
- table = self._save_as(address, name, namespace, partitions, **kwargs)
- table.create_meta(**kwargs)
- return table
- def _update_read_access_time(self, read_access_time=None):
- read_access_time = current_timestamp() if not read_access_time else read_access_time
- self._meta.update_metas(read_access_time=read_access_time)
- def _update_write_access_time(self, write_access_time=None):
- write_access_time = current_timestamp() if not write_access_time else write_access_time
- self._meta.update_metas(write_access_time=write_access_time)
- # to be implemented
- def _put_all(self, kv_list: Iterable, **kwargs):
- raise NotImplementedError()
- def _collect(self, **kwargs) -> list:
- raise NotImplementedError()
- def _count(self):
- raise NotImplementedError()
- def _read(self):
- raise NotImplementedError()
- def _destroy(self):
- raise NotImplementedError()
- def _save_as(self, address, name, namespace, partitions=None, schema=None, **kwargs):
- raise NotImplementedError()
- class StorageTableMeta(StorageTableMetaABC):
- def __init__(self, name, namespace, new=False, create_address=True):
- self.name = name
- self.namespace = namespace
- self.address = None
- self.engine = None
- self.store_type = None
- self.options = None
- self.partitions = None
- self.in_serialized = None
- self.have_head = None
- self.id_delimiter = None
- self.extend_sid = False
- self.auto_increasing_sid = None
- self.schema = None
- self.count = None
- self.part_of_data = None
- self.description = None
- self.origin = None
- self.disable = None
- self.create_time = None
- self.update_time = None
- self.read_access_time = None
- self.write_access_time = None
- if self.options is None:
- self.options = {}
- if self.schema is None:
- self.schema = {}
- if self.part_of_data is None:
- self.part_of_data = []
- if not new:
- self.build(create_address)
- def build(self, create_address):
- for k, v in self.table_meta.__dict__["__data__"].items():
- setattr(self, k.lstrip("f_"), v)
- if create_address:
- self.address = self.create_address(storage_engine=self.engine, address_dict=self.address)
- def __new__(cls, *args, **kwargs):
- if not kwargs.get("new", False):
- name, namespace = kwargs.get("name"), kwargs.get("namespace")
- if not name or not namespace:
- return None
- tables_meta = cls.query_table_meta(filter_fields=dict(name=name, namespace=namespace))
- if not tables_meta:
- return None
- self = super().__new__(cls)
- setattr(self, "table_meta", tables_meta[0])
- return self
- else:
- return super().__new__(cls)
- def exists(self):
- if hasattr(self, "table_meta"):
- return True
- else:
- return False
- @DB.connection_context()
- def create(self):
- table_meta = StorageTableMetaModel()
- table_meta.f_create_time = current_timestamp()
- table_meta.f_schema = {}
- table_meta.f_part_of_data = []
- for k, v in self.to_dict().items():
- attr_name = 'f_%s' % k
- if hasattr(StorageTableMetaModel, attr_name):
- setattr(table_meta, attr_name, v if not issubclass(type(v), AddressABC) else v.__dict__)
- try:
- rows = table_meta.save(force_insert=True)
- if rows != 1:
- raise Exception("create table meta failed")
- except peewee.IntegrityError as e:
- if e.args[0] == 1062:
- # warning
- pass
- elif isinstance(e.args[0], str) and "UNIQUE constraint failed" in e.args[0]:
- pass
- else:
- raise e
- except Exception as e:
- raise e
- def set_metas(self, **kwargs):
- for k, v in kwargs.items():
- if hasattr(self, k):
- setattr(self, k, v)
- @classmethod
- @DB.connection_context()
- def query_table_meta(cls, filter_fields, query_fields=None):
- filters = []
- querys = []
- for f_n, f_v in filter_fields.items():
- attr_name = 'f_%s' % f_n
- if hasattr(StorageTableMetaModel, attr_name):
- filters.append(operator.attrgetter('f_%s' % f_n)(StorageTableMetaModel) == f_v)
- if query_fields:
- for f_n in query_fields:
- attr_name = 'f_%s' % f_n
- if hasattr(StorageTableMetaModel, attr_name):
- querys.append(operator.attrgetter('f_%s' % f_n)(StorageTableMetaModel))
- if filters:
- if querys:
- tables_meta = StorageTableMetaModel.select(querys).where(*filters)
- else:
- tables_meta = StorageTableMetaModel.select().where(*filters)
- return [table_meta for table_meta in tables_meta]
- else:
- # not allow query all table
- return []
- @DB.connection_context()
- def update_metas(self, schema=None, count=None, part_of_data=None, description=None, partitions=None,
- in_serialized=None, **kwargs):
- meta_info = {}
- for k, v in locals().items():
- if k not in ["self", "kwargs", "meta_info"] and v is not None:
- meta_info[k] = v
- meta_info.update(kwargs)
- meta_info["name"] = meta_info.get("name", self.name)
- meta_info["namespace"] = meta_info.get("namespace", self.namespace)
- update_filters = []
- primary_keys = StorageTableMetaModel._meta.primary_key.field_names
- for p_k in primary_keys:
- update_filters.append(operator.attrgetter(p_k)(StorageTableMetaModel) == meta_info[p_k.lstrip("f_")])
- table_meta = StorageTableMetaModel()
- update_fields = {}
- for k, v in meta_info.items():
- attr_name = 'f_%s' % k
- if hasattr(StorageTableMetaModel, attr_name) and attr_name not in primary_keys:
- if k == "part_of_data":
- if len(v) < 100:
- tmp = v
- else:
- tmp = v[:100]
- update_fields[operator.attrgetter(attr_name)(StorageTableMetaModel)] = tmp
- else:
- update_fields[operator.attrgetter(attr_name)(StorageTableMetaModel)] = v
- if update_filters:
- operate = table_meta.update(update_fields).where(*update_filters)
- else:
- operate = table_meta.update(update_fields)
- if count:
- self.count = count
- _return = operate.execute()
- _meta = StorageTableMeta(name=self.name, namespace=self.namespace)
- return _return > 0, _meta
- @DB.connection_context()
- def destroy_metas(self):
- StorageTableMetaModel \
- .delete() \
- .where(StorageTableMetaModel.f_name == self.name,
- StorageTableMetaModel.f_namespace == self.namespace) \
- .execute()
- @classmethod
- def create_address(cls, storage_engine, address_dict):
- address_class = Relationship.EngineToAddress.get(storage_engine)
- kwargs = {}
- for k in address_class.__init__.__code__.co_varnames:
- if k == "self":
- continue
- if address_dict.get(k, None):
- kwargs[k] = address_dict[k]
- return address_class(**kwargs)
- def get_name(self):
- return self.name
- def get_namespace(self):
- return self.namespace
- def get_address(self):
- return self.address
- def get_engine(self):
- return self.engine
- def get_store_type(self):
- return self.store_type
- def get_options(self):
- return self.options
- def get_partitions(self):
- return self.partitions
- def get_in_serialized(self):
- return self.in_serialized
- def get_id_delimiter(self):
- return self.id_delimiter
- def get_extend_sid(self):
- return self.extend_sid
- def get_auto_increasing_sid(self):
- return self.auto_increasing_sid
- def get_have_head(self):
- return self.have_head
- def get_origin(self):
- return self.origin
- def get_disable(self):
- return self.disable
- def get_schema(self):
- return self.schema
- def get_count(self):
- return self.count
- def get_part_of_data(self):
- return self.part_of_data
- def get_description(self):
- return self.description
- def to_dict(self) -> dict:
- d = {}
- for k, v in self.__dict__.items():
- if v is None or k == "table_meta":
- continue
- d[k] = v
- return d
|