_table.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import operator
  17. from typing import Iterable
  18. import peewee
  19. from fate_arch.abc import StorageTableMetaABC, StorageTableABC, AddressABC
  20. from fate_arch.common.base_utils import current_timestamp
  21. from fate_arch.common.log import getLogger
  22. from fate_arch.relation_ship import Relationship
  23. from fate_arch.metastore.db_models import DB, StorageTableMetaModel
  # Module-level logger obtained from fate_arch.common.log.getLogger.
  LOGGER = getLogger()
  class StorageTableBase(StorageTableABC):
      """Common base for storage tables whose metadata is kept in a ``StorageTableMeta``.

      The public data operations stamp a read/write access time on the meta
      record and then delegate to the ``_``-prefixed hooks at the end of the
      class, which engine-specific subclasses are expected to implement.
      """

      def __init__(self, name, namespace, address, partitions, options, engine, store_type):
          self._name = name
          self._namespace = namespace
          self._address = address
          self._partitions = partitions
          # Falsy options (None, {}) are normalised to a fresh empty dict.
          self._options = options if options else {}
          self._engine = engine
          self._store_type = store_type
          # Set by create_meta() or through the ``meta`` setter; put_all, collect,
          # count, read and destroy dereference it, so it must be set before use.
          self._meta = None
          # Timestamps of the most recent read/write made through this object.
          self._read_access_time = None
          self._write_access_time = None

      @property
      def name(self):
          return self._name

      @property
      def namespace(self):
          return self._namespace

      @property
      def address(self):
          return self._address

      @property
      def partitions(self):
          return self._partitions

      @property
      def options(self):
          return self._options

      @property
      def engine(self):
          return self._engine

      @property
      def store_type(self):
          return self._store_type

      @property
      def meta(self):
          return self._meta

      @meta.setter
      def meta(self, meta):
          self._meta = meta

      @property
      def read_access_time(self):
          """Timestamp of the last read made through this object, or None."""
          return self._read_access_time

      @property
      def write_access_time(self):
          """Timestamp of the last write made through this object, or None."""
          return self._write_access_time

      def update_meta(self,
                      schema=None,
                      count=None,
                      part_of_data=None,
                      description=None,
                      partitions=None,
                      **kwargs):
          """Persist the given meta fields through ``self.meta.update_metas``.

          All arguments, including extra keyword arguments, are forwarded as-is.

          Returns:
              Whatever ``self.meta.update_metas`` returns.
          """
          return self._meta.update_metas(schema=schema,
                                         count=count,
                                         part_of_data=part_of_data,
                                         description=description,
                                         partitions=partitions,
                                         **kwargs)

      def create_meta(self, **kwargs):
          """Create, persist and attach a new ``StorageTableMeta`` for this table.

          ``kwargs`` are applied first via ``set_metas``; address, partitions,
          engine, store_type and options are then taken from this table, so
          they cannot be overridden through ``kwargs``.

          Returns:
              The newly created meta object, also stored as ``self.meta``.
          """
          table_meta = StorageTableMeta(name=self._name, namespace=self._namespace, new=True)
          table_meta.set_metas(**kwargs)
          table_meta.address = self._address
          table_meta.partitions = self._partitions
          table_meta.engine = self._engine
          table_meta.store_type = self._store_type
          table_meta.options = self._options
          table_meta.create()
          self._meta = table_meta
          return table_meta

      def check_address(self):
          """Base implementation accepts any address."""
          return True

      def put_all(self, kv_list: Iterable, **kwargs):
          """Record a write access, then store ``kv_list`` via ``_put_all``."""
          self._update_write_access_time()
          self._put_all(kv_list, **kwargs)

      def collect(self, **kwargs) -> list:
          """Record a read access, then return the result of ``_collect``."""
          self._update_read_access_time()
          return self._collect(**kwargs)

      def count(self):
          """Record a read access, compute the count via ``_count`` and save it to the meta."""
          self._update_read_access_time()
          count = self._count()
          self.meta.update_metas(count=count)
          return count

      def read(self):
          """Record a read access, then return the result of ``_read``."""
          self._update_read_access_time()
          return self._read()

      def destroy(self):
          """Delete the meta record first, then the underlying data via ``_destroy``."""
          self.meta.destroy_metas()
          self._destroy()

      def save_as(self, address, name, namespace, partitions=None, **kwargs):
          """Copy this table via ``_save_as`` and create meta for the copy.

          ``kwargs`` are forwarded both to ``_save_as`` and to the new table's
          ``create_meta``.

          Returns:
              The table object produced by ``_save_as``.
          """
          table = self._save_as(address, name, namespace, partitions, **kwargs)
          table.create_meta(**kwargs)
          return table

      def _update_read_access_time(self, read_access_time=None):
          # A falsy argument means "now".
          read_access_time = read_access_time or current_timestamp()
          self._meta.update_metas(read_access_time=read_access_time)
          # Keep the ``read_access_time`` property in sync with what was persisted.
          self._read_access_time = read_access_time

      def _update_write_access_time(self, write_access_time=None):
          # A falsy argument means "now".
          write_access_time = write_access_time or current_timestamp()
          self._meta.update_metas(write_access_time=write_access_time)
          # Keep the ``write_access_time`` property in sync with what was persisted.
          self._write_access_time = write_access_time

      # Engine-specific hooks: subclasses must implement these.
      def _put_all(self, kv_list: Iterable, **kwargs):
          raise NotImplementedError()

      def _collect(self, **kwargs) -> list:
          raise NotImplementedError()

      def _count(self):
          raise NotImplementedError()

      def _read(self):
          raise NotImplementedError()

      def _destroy(self):
          raise NotImplementedError()

      def _save_as(self, address, name, namespace, partitions=None, schema=None, **kwargs):
          raise NotImplementedError()
  class StorageTableMeta(StorageTableMetaABC):
      """In-memory copy of one ``StorageTableMetaModel`` row, identified by name and namespace.

      Construction is overloaded through ``__new__``:

      * ``StorageTableMeta(name=..., namespace=...)`` loads the matching row and
        evaluates to ``None`` when either key is empty or no row matches.
      * ``StorageTableMeta(name=..., namespace=..., new=True)`` returns a blank,
        unsaved instance; fill it with ``set_metas`` and persist it with ``create``.
      """

      def __init__(self, name, namespace, new=False, create_address=True):
          self.name = name
          self.namespace = namespace
          self.address = None
          self.engine = None
          self.store_type = None
          self.options = {}
          self.partitions = None
          self.in_serialized = None
          self.have_head = None
          self.id_delimiter = None
          self.extend_sid = False
          self.auto_increasing_sid = None
          self.schema = {}
          self.count = None
          self.part_of_data = []
          self.description = None
          self.origin = None
          self.disable = None
          self.create_time = None
          self.update_time = None
          self.read_access_time = None
          self.write_access_time = None
          if not new:
              # Overwrite the defaults above with the row fetched in __new__.
              self.build(create_address)

      def build(self, create_address):
          """Copy every column of ``self.table_meta`` onto this object.

          Column names lose their ``f_`` prefix (``f_count`` -> ``count``). When
          ``create_address`` is true, the stored address dict is converted into
          an address object for ``self.engine`` via ``create_address``.
          """
          for column, value in self.table_meta.__dict__["__data__"].items():
              setattr(self, self._column_to_attr(column), value)
          if create_address:
              self.address = self.create_address(storage_engine=self.engine, address_dict=self.address)

      @staticmethod
      def _column_to_attr(column):
          """Strip the model's ``f_`` column prefix: ``f_name`` -> ``name``.

          ``str.lstrip("f_")`` is not equivalent: it removes any run of leading
          ``f``/``_`` characters, e.g. it would turn ``f_fate_x`` into ``ate_x``.
          """
          return column[2:] if column.startswith("f_") else column

      def __new__(cls, *args, **kwargs):
          # Map positional arguments in __init__'s order so positional and
          # keyword construction behave the same.
          params = dict(zip(("name", "namespace", "new"), args))
          params.update(kwargs)
          if params.get("new", False):
              return super().__new__(cls)
          name, namespace = params.get("name"), params.get("namespace")
          # Returning None (not an instance) means __init__ is skipped and the
          # constructor call itself evaluates to None.
          if not name or not namespace:
              return None
          tables_meta = cls.query_table_meta(filter_fields=dict(name=name, namespace=namespace))
          if not tables_meta:
              return None
          instance = super().__new__(cls)
          # Read by __init__ -> build() and by exists().
          instance.table_meta = tables_meta[0]
          return instance

      def exists(self):
          """Return True if this object was loaded from an existing row."""
          return hasattr(self, "table_meta")

      @DB.connection_context()
      def create(self):
          """Insert this object's attributes as a new ``StorageTableMetaModel`` row.

          Attributes without a matching ``f_``-prefixed column are skipped and
          address objects are stored as their attribute dict. An insert rejected
          as a duplicate key is logged and ignored; any other failure is raised.

          Raises:
              peewee.IntegrityError: for integrity failures other than duplicates.
              Exception: if the insert does not report exactly one row.
          """
          table_meta = StorageTableMetaModel()
          table_meta.f_create_time = current_timestamp()
          # Defaults, overwritten below when this object carries a value.
          table_meta.f_schema = {}
          table_meta.f_part_of_data = []
          for k, v in self.to_dict().items():
              attr_name = 'f_%s' % k
              if hasattr(StorageTableMetaModel, attr_name):
                  setattr(table_meta, attr_name, v.__dict__ if isinstance(v, AddressABC) else v)
          try:
              rows = table_meta.save(force_insert=True)
          except peewee.IntegrityError as e:
              if not self._is_duplicate_entry_error(e):
                  raise
              LOGGER.warning("storage table meta %s %s already exists, skip creating it",
                             self.namespace, self.name)
              return
          if rows != 1:
              raise Exception("create table meta failed")

      @staticmethod
      def _is_duplicate_entry_error(error):
          """Tell whether a peewee ``IntegrityError`` reports a duplicate key.

          Matches the numeric code 1062 (presumably MySQL's ER_DUP_ENTRY) and
          messages containing "UNIQUE constraint failed" (presumably SQLite).
          An error without args is not treated as a duplicate.
          """
          detail = error.args[0] if error.args else None
          if detail == 1062:
              return True
          return isinstance(detail, str) and "UNIQUE constraint failed" in detail

      def set_metas(self, **kwargs):
          """Assign each keyword to the attribute of the same name; unknown names are ignored."""
          for k, v in kwargs.items():
              if hasattr(self, k):
                  setattr(self, k, v)

      @classmethod
      @DB.connection_context()
      def query_table_meta(cls, filter_fields, query_fields=None):
          """Return the model rows whose columns equal ``filter_fields``.

          Keys of ``filter_fields`` and entries of ``query_fields`` are column
          names without the ``f_`` prefix; names with no matching column are
          ignored. ``query_fields`` restricts the selected columns. If no usable
          filter remains, an empty list is returned instead of the whole table.
          """
          filters = [getattr(StorageTableMetaModel, 'f_%s' % f_n) == f_v
                     for f_n, f_v in filter_fields.items()
                     if hasattr(StorageTableMetaModel, 'f_%s' % f_n)]
          querys = [getattr(StorageTableMetaModel, 'f_%s' % f_n)
                    for f_n in (query_fields or [])
                    if hasattr(StorageTableMetaModel, 'f_%s' % f_n)]
          if not filters:
              # Querying the whole table is deliberately not allowed.
              return []
          # select() takes the columns as separate arguments; no arguments selects all columns.
          return list(StorageTableMetaModel.select(*querys).where(*filters))

      @DB.connection_context()
      def update_metas(self, schema=None, count=None, part_of_data=None, description=None, partitions=None,
                       in_serialized=None, **kwargs):
          """Write the given fields to this table's row.

          Named arguments left as ``None`` are not written; extra ``kwargs`` are
          written as-is. ``name``/``namespace`` default to this object's values
          and select the row through the model's primary-key columns.
          ``part_of_data`` is capped at its first 100 entries. Only ``count`` is
          mirrored onto this object.

          Returns:
              ``(updated, meta)`` where ``updated`` is True if the UPDATE reported
              at least one affected row, and ``meta`` is a fresh
              ``StorageTableMeta`` re-read for this object's name/namespace
              (``None`` if no such row exists).
          """
          named_fields = dict(schema=schema, count=count, part_of_data=part_of_data,
                              description=description, partitions=partitions, in_serialized=in_serialized)
          meta_info = {k: v for k, v in named_fields.items() if v is not None}
          meta_info.update(kwargs)
          meta_info["name"] = meta_info.get("name", self.name)
          meta_info["namespace"] = meta_info.get("namespace", self.namespace)

          primary_keys = StorageTableMetaModel._meta.primary_key.field_names
          update_filters = [getattr(StorageTableMetaModel, p_k) == meta_info[self._column_to_attr(p_k)]
                            for p_k in primary_keys]

          update_fields = {}
          for k, v in meta_info.items():
              attr_name = 'f_%s' % k
              if not hasattr(StorageTableMetaModel, attr_name) or attr_name in primary_keys:
                  continue
              if k == "part_of_data" and len(v) >= 100:
                  v = v[:100]
              update_fields[getattr(StorageTableMetaModel, attr_name)] = v

          operate = StorageTableMetaModel.update(update_fields)
          if update_filters:
              operate = operate.where(*update_filters)
          # `is not None` so that a count of 0 is mirrored as well.
          if count is not None:
              self.count = count
          updated_rows = operate.execute()
          return updated_rows > 0, StorageTableMeta(name=self.name, namespace=self.namespace)

      @DB.connection_context()
      def destroy_metas(self):
          """Delete the rows matching this object's name and namespace."""
          StorageTableMetaModel \
              .delete() \
              .where(StorageTableMetaModel.f_name == self.name,
                     StorageTableMetaModel.f_namespace == self.namespace) \
              .execute()

      @classmethod
      def create_address(cls, storage_engine, address_dict):
          """Build the address object registered for ``storage_engine``.

          Only named parameters of the address class's ``__init__`` are read
          from ``address_dict``, and only when their value there is truthy;
          other parameters are omitted from the constructor call.
          """
          address_class = Relationship.EngineToAddress.get(storage_engine)
          init_code = address_class.__init__.__code__
          # co_varnames lists the parameters first, followed by the function's
          # other local variables; keep only the parameters.
          param_names = init_code.co_varnames[:init_code.co_argcount + init_code.co_kwonlyargcount]
          kwargs = {k: address_dict[k] for k in param_names if k != "self" and address_dict.get(k, None)}
          return address_class(**kwargs)

      # Plain accessors, presumably required by StorageTableMetaABC.
      def get_name(self):
          return self.name

      def get_namespace(self):
          return self.namespace

      def get_address(self):
          return self.address

      def get_engine(self):
          return self.engine

      def get_store_type(self):
          return self.store_type

      def get_options(self):
          return self.options

      def get_partitions(self):
          return self.partitions

      def get_in_serialized(self):
          return self.in_serialized

      def get_id_delimiter(self):
          return self.id_delimiter

      def get_extend_sid(self):
          return self.extend_sid

      def get_auto_increasing_sid(self):
          return self.auto_increasing_sid

      def get_have_head(self):
          return self.have_head

      def get_origin(self):
          return self.origin

      def get_disable(self):
          return self.disable

      def get_schema(self):
          return self.schema

      def get_count(self):
          return self.count

      def get_part_of_data(self):
          return self.part_of_data

      def get_description(self):
          return self.description

      def to_dict(self) -> dict:
          """Return the non-None instance attributes, excluding the raw ``table_meta`` row."""
          return {k: v for k, v in self.__dict__.items() if v is not None and k != "table_meta"}
  335. continue
  336. d[k] = v
  337. return d