#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing

from fate_arch.abc import StorageSessionABC, CTableABC
from fate_arch.common import EngineType, engine_utils
from fate_arch.common.base_utils import current_timestamp
from fate_arch.common.data_utils import default_output_fs_path
from fate_arch.common.log import getLogger
from fate_arch.relation_ship import Relationship
from fate_arch.storage._table import StorageTableMeta
from fate_arch.storage._types import (
    StorageEngine,
    EggRollStoreType,
    StandaloneStoreType,
    HDFSStoreType,
    HiveStoreType,
    LinkisHiveStoreType,
    LocalFSStoreType,
    PathStoreType,
    StorageTableOrigin,
)

LOGGER = getLogger()

class StorageSessionBase(StorageSessionABC):

    def __init__(self, session_id, engine):
        self._session_id = session_id
        self._engine = engine

    def create_table(self, address, name, namespace, partitions=None, **kwargs):
        table = self.table(address=address, name=name, namespace=namespace, partitions=partitions, **kwargs)
        table.create_meta(**kwargs)
        return table
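
    # Creation sketch: builds the table via the engine-specific `table()`
    # implementation, then persists its metadata. The `address` object here is
    # assumed to come from StorageTableMeta.create_address():
    #
    #     table = session.create_table(
    #         address=address, name="breast_hetero_guest",
    #         namespace="experiment", partitions=4,
    #     )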

    def get_table(self, name, namespace):
        meta = StorageTableMeta(name=name, namespace=namespace)
        if meta and meta.exists():
            table = self.table(name=meta.get_name(),
                               namespace=meta.get_namespace(),
                               address=meta.get_address(),
                               partitions=meta.get_partitions(),
                               store_type=meta.get_store_type(),
                               options=meta.get_options())
            table.meta = meta
            return table
        else:
            return None
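
    # Lookup sketch, assuming a concrete subclass that implements `table()`
    # (the subclass name is hypothetical):
    #
    #     session = StandaloneStorageSession(session_id="sid-001", engine="STANDALONE")
    #     table = session.get_table(name="breast_hetero_guest", namespace="experiment")
    #     if table is not None:
    #         print(table.meta.get_partitions())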

    @classmethod
    def get_table_meta(cls, name, namespace):
        meta = StorageTableMeta(name=name, namespace=namespace)
        if meta and meta.exists():
            return meta
        else:
            return None

    @classmethod
    def persistent(cls, computing_table: CTableABC, namespace, name, schema=None,
                   part_of_data=None, engine=None, engine_address=None,
                   store_type=None, token: typing.Dict = None) -> StorageTableMeta:
        if engine:
            if engine != StorageEngine.PATH and engine not in Relationship.Computing.get(
                    computing_table.engine, {}).get(EngineType.STORAGE, {}).get("support", []):
                raise Exception(f"storage engine {engine} not supported with computing engine {computing_table.engine}")
        else:
            engine = Relationship.Computing.get(computing_table.engine, {}).get(
                EngineType.STORAGE, {}).get("default", None)
            if not engine:
                raise Exception(f"cannot find default storage engine for computing engine {computing_table.engine}")
        if engine_address is None:
            # find engine address from service_conf.yaml
            engine_address = engine_utils.get_engines_config_from_conf().get(EngineType.STORAGE, {}).get(engine, {})
        address_dict = engine_address.copy()
        partitions = computing_table.partitions
        if engine == StorageEngine.STANDALONE:
            address_dict.update({"name": name, "namespace": namespace})
            store_type = StandaloneStoreType.ROLLPAIR_LMDB if store_type is None else store_type
        elif engine == StorageEngine.EGGROLL:
            address_dict.update({"name": name, "namespace": namespace})
            store_type = EggRollStoreType.ROLLPAIR_LMDB if store_type is None else store_type
        elif engine == StorageEngine.HIVE:
            address_dict.update({"database": namespace, "name": name})
            store_type = HiveStoreType.DEFAULT if store_type is None else store_type
        elif engine == StorageEngine.LINKIS_HIVE:
            address_dict.update({"database": None, "name": f"{namespace}_{name}",
                                 "username": (token or {}).get("username", "")})
            store_type = LinkisHiveStoreType.DEFAULT if store_type is None else store_type
        elif engine == StorageEngine.HDFS:
            if not address_dict.get("path"):
                address_dict.update({"path": default_output_fs_path(
                    name=name, namespace=namespace, prefix=address_dict.get("path_prefix"))})
            store_type = HDFSStoreType.DISK if store_type is None else store_type
        elif engine == StorageEngine.LOCALFS:
            if not address_dict.get("path"):
                address_dict.update({"path": default_output_fs_path(
                    name=name, namespace=namespace, storage_engine=StorageEngine.LOCALFS)})
            store_type = LocalFSStoreType.DISK if store_type is None else store_type
        elif engine == StorageEngine.PATH:
            store_type = PathStoreType.PICTURE if store_type is None else store_type
        else:
            raise RuntimeError(f"{engine} storage is not supported")
        address = StorageTableMeta.create_address(storage_engine=engine, address_dict=address_dict)
        schema = schema if schema else {}
        computing_table.save(address, schema=schema, partitions=partitions, store_type=store_type)
        table_count = computing_table.count()
        table_meta = StorageTableMeta(name=name, namespace=namespace, new=True)
        table_meta.address = address
        table_meta.partitions = computing_table.partitions
        table_meta.engine = engine
        table_meta.store_type = store_type
        table_meta.schema = schema
        table_meta.part_of_data = part_of_data if part_of_data else {}
        table_meta.count = table_count
        table_meta.write_access_time = current_timestamp()
        table_meta.origin = StorageTableOrigin.OUTPUT
        table_meta.create()
        return table_meta
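
    # Persistence sketch: save a computing table as a storage table and record
    # its metadata. `computing_table` is any CTableABC implementation; the
    # namespace/name values are illustrative:
    #
    #     table_meta = StorageSessionBase.persistent(
    #         computing_table=computing_table,
    #         namespace="experiment",
    #         name="hetero_breast_guest_output",
    #     )
    #     print(table_meta.get_address(), table_meta.get_partitions())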

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.destroy()

    def destroy(self):
        try:
            self.stop()
        except Exception as e:
            LOGGER.warning(f"stop storage session {self._session_id} failed: {e}, try to kill")
            self.kill()
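
    # Lifecycle sketch: used as a context manager, the session is destroyed on
    # exit (stopped, or killed if stop() fails). `MyStorageSession` is a
    # hypothetical concrete subclass:
    #
    #     with MyStorageSession(session_id="sid-001", engine="EGGROLL") as session:
    #         session.cleanup(name="tmp_table", namespace="experiment")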

    def table(self, name, namespace, address, store_type, partitions=None, **kwargs):
        raise NotImplementedError()

    def stop(self):
        raise NotImplementedError()

    def kill(self):
        raise NotImplementedError()

    def cleanup(self, name, namespace):
        raise NotImplementedError()

    @property
    def session_id(self):
        return self._session_id

    @property
    def engine(self):
        return self._engine
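
# A minimal subclass sketch showing the contract a concrete engine session
# fills in; everything below is illustrative, not part of fate_arch:
#
#     class DummyStorageSession(StorageSessionBase):
#         def table(self, name, namespace, address, store_type, partitions=None, **kwargs):
#             ...  # build and return an engine-specific storage table
#
#         def stop(self):
#             pass  # release engine resources gracefully
#
#         def kill(self):
#             pass  # force-release resources when stop() fails
#
#         def cleanup(self, name, namespace):
#             pass  # drop tables matching name/namespace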