_session.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import typing
  17. import uuid
  18. import peewee
  19. from fate_arch.abc import CSessionABC, FederationABC, CTableABC, StorageSessionABC, StorageTableABC, StorageTableMetaABC
  20. from fate_arch.common import engine_utils, EngineType, Party
  21. from fate_arch.common import log, base_utils
  22. from fate_arch.common import remote_status
  23. from fate_arch.common._parties import PartiesInfo
  24. from fate_arch.computing import ComputingEngine
  25. from fate_arch.federation import FederationEngine
  26. from fate_arch.metastore.db_models import DB, SessionRecord, init_database_tables
  27. from fate_arch.storage import StorageEngine, StorageSessionBase
  28. LOGGER = log.getLogger()
  29. class Session(object):
  30. __GLOBAL_SESSION = None
  31. @classmethod
  32. def get_global(cls):
  33. return cls.__GLOBAL_SESSION
  34. @classmethod
  35. def _as_global(cls, sess):
  36. cls.__GLOBAL_SESSION = sess
  37. def as_global(self):
  38. self._as_global(self)
  39. return self
  40. def __init__(self, session_id: str = None, options=None):
  41. if options is None:
  42. options = {}
  43. engines = engine_utils.get_engines()
  44. LOGGER.info(f"using engines: {engines}")
  45. computing_type = engines.get(EngineType.COMPUTING, None)
  46. if computing_type is None:
  47. raise RuntimeError(f"must set default engines on conf/service_conf.yaml")
  48. self._computing_type = engines.get(EngineType.COMPUTING, None)
  49. self._federation_type = engines.get(EngineType.FEDERATION, None)
  50. self._storage_engine = engines.get(EngineType.STORAGE, None)
  51. self._computing_session: typing.Optional[CSessionABC] = None
  52. self._federation_session: typing.Optional[FederationABC] = None
  53. self._storage_session: typing.Dict[StorageSessionABC] = {}
  54. self._parties_info: typing.Optional[PartiesInfo] = None
  55. self._all_party_info: typing.List[Party] = []
  56. self._session_id = str(uuid.uuid1()) if not session_id else session_id
  57. self._logger = LOGGER if options.get("logger", None) is None else options.get("logger", None)
  58. self._logger.info(f"create manager session {self._session_id}")
  59. # init meta db
  60. init_database_tables()
  61. @property
  62. def session_id(self) -> str:
  63. return self._session_id
  64. def _open(self):
  65. return self
  66. def _close(self):
  67. self.destroy_all_sessions()
  68. def __enter__(self):
  69. return self._open()
  70. def __exit__(self, exc_type, exc_val, exc_tb):
  71. if exc_tb:
  72. self._logger.exception("", exc_info=(exc_type, exc_val, exc_tb))
  73. return self._close()
  74. def init_computing(self,
  75. computing_session_id: str = None,
  76. record: bool = True,
  77. **kwargs):
  78. computing_session_id = f"{self._session_id}_computing_{uuid.uuid1()}" if not computing_session_id else computing_session_id
  79. if self.is_computing_valid:
  80. raise RuntimeError(f"computing session already valid")
  81. if record:
  82. self.save_record(engine_type=EngineType.COMPUTING,
  83. engine_name=self._computing_type,
  84. engine_session_id=computing_session_id)
  85. if self._computing_type == ComputingEngine.STANDALONE:
  86. from fate_arch.computing.standalone import CSession
  87. options = kwargs.get("options", {})
  88. self._computing_session = CSession(session_id=computing_session_id, options=options)
  89. self._computing_type = ComputingEngine.STANDALONE
  90. return self
  91. if self._computing_type == ComputingEngine.EGGROLL:
  92. from fate_arch.computing.eggroll import CSession
  93. options = kwargs.get("options", {})
  94. self._computing_session = CSession(
  95. session_id=computing_session_id, options=options
  96. )
  97. return self
  98. if self._computing_type == ComputingEngine.SPARK:
  99. from fate_arch.computing.spark import CSession
  100. self._computing_session = CSession(session_id=computing_session_id)
  101. self._computing_type = ComputingEngine.SPARK
  102. return self
  103. if self._computing_type == ComputingEngine.LINKIS_SPARK:
  104. from fate_arch.computing.spark import CSession
  105. self._computing_session = CSession(session_id=computing_session_id)
  106. self._computing_type = ComputingEngine.LINKIS_SPARK
  107. return self
  108. raise RuntimeError(f"{self._computing_type} not supported")
  109. def init_federation(
  110. self,
  111. federation_session_id: str,
  112. *,
  113. runtime_conf: typing.Optional[dict] = None,
  114. parties_info: typing.Optional[PartiesInfo] = None,
  115. service_conf: typing.Optional[dict] = None,
  116. record: bool = True,
  117. ):
  118. if record:
  119. self.save_record(engine_type=EngineType.FEDERATION,
  120. engine_name=self._federation_type,
  121. engine_session_id=federation_session_id,
  122. engine_runtime_conf={"runtime_conf": runtime_conf, "service_conf": service_conf})
  123. if parties_info is None:
  124. if runtime_conf is None:
  125. raise RuntimeError(f"`party_info` and `runtime_conf` are both `None`")
  126. parties_info = PartiesInfo.from_conf(runtime_conf)
  127. self._parties_info = parties_info
  128. self._all_party_info = [Party(k, p) for k, v in runtime_conf['role'].items() for p in v]
  129. if self.is_federation_valid:
  130. raise RuntimeError("federation session already valid")
  131. if self._federation_type == FederationEngine.STANDALONE:
  132. from fate_arch.computing.standalone import CSession
  133. from fate_arch.federation.standalone import Federation
  134. if not self.is_computing_valid or not isinstance(
  135. self._computing_session, CSession
  136. ):
  137. raise RuntimeError(
  138. f"require computing with type {ComputingEngine.STANDALONE} valid"
  139. )
  140. self._federation_session = Federation(
  141. standalone_session=self._computing_session.get_standalone_session(),
  142. federation_session_id=federation_session_id,
  143. party=parties_info.local_party,
  144. )
  145. return self
  146. if self._federation_type == FederationEngine.EGGROLL:
  147. from fate_arch.computing.eggroll import CSession
  148. from fate_arch.federation.eggroll import Federation
  149. if not self.is_computing_valid or not isinstance(
  150. self._computing_session, CSession
  151. ):
  152. raise RuntimeError(
  153. f"require computing with type {ComputingEngine.EGGROLL} valid"
  154. )
  155. self._federation_session = Federation(
  156. rp_ctx=self._computing_session.get_rpc(),
  157. rs_session_id=federation_session_id,
  158. party=parties_info.local_party,
  159. proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
  160. )
  161. return self
  162. if self._federation_type == FederationEngine.RABBITMQ:
  163. from fate_arch.federation.rabbitmq import Federation
  164. self._federation_session = Federation.from_conf(
  165. federation_session_id=federation_session_id,
  166. party=parties_info.local_party,
  167. runtime_conf=runtime_conf,
  168. rabbitmq_config=service_conf,
  169. )
  170. return self
  171. # Add pulsar support
  172. if self._federation_type == FederationEngine.PULSAR:
  173. from fate_arch.federation.pulsar import Federation
  174. self._federation_session = Federation.from_conf(
  175. federation_session_id=federation_session_id,
  176. party=parties_info.local_party,
  177. runtime_conf=runtime_conf,
  178. pulsar_config=service_conf,
  179. )
  180. return self
  181. raise RuntimeError(f"{self._federation_type} not supported")
  182. def _get_or_create_storage(self,
  183. storage_session_id=None,
  184. storage_engine=None,
  185. record: bool = True,
  186. **kwargs) -> StorageSessionABC:
  187. storage_session_id = f"{self._session_id}_storage_{uuid.uuid1()}" if not storage_session_id else storage_session_id
  188. if storage_session_id in self._storage_session:
  189. return self._storage_session[storage_session_id]
  190. else:
  191. if storage_engine is None:
  192. storage_engine = self._storage_engine
  193. for session in self._storage_session.values():
  194. if storage_engine == session.engine:
  195. return session
  196. if record:
  197. self.save_record(engine_type=EngineType.STORAGE,
  198. engine_name=storage_engine,
  199. engine_session_id=storage_session_id)
  200. if storage_engine == StorageEngine.EGGROLL:
  201. from fate_arch.storage.eggroll import StorageSession
  202. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  203. elif storage_engine == StorageEngine.STANDALONE:
  204. from fate_arch.storage.standalone import StorageSession
  205. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  206. elif storage_engine == StorageEngine.MYSQL:
  207. from fate_arch.storage.mysql import StorageSession
  208. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  209. elif storage_engine == StorageEngine.HDFS:
  210. from fate_arch.storage.hdfs import StorageSession
  211. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  212. elif storage_engine == StorageEngine.HIVE:
  213. from fate_arch.storage.hive import StorageSession
  214. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  215. elif storage_engine == StorageEngine.LINKIS_HIVE:
  216. from fate_arch.storage.linkis_hive import StorageSession
  217. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  218. elif storage_engine == StorageEngine.PATH:
  219. from fate_arch.storage.path import StorageSession
  220. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  221. elif storage_engine == StorageEngine.LOCALFS:
  222. from fate_arch.storage.localfs import StorageSession
  223. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  224. elif storage_engine == StorageEngine.API:
  225. from fate_arch.storage.api import StorageSession
  226. storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
  227. else:
  228. raise NotImplementedError(f"can not be initialized with storage engine: {storage_engine}")
  229. self._storage_session[storage_session_id] = storage_session
  230. return storage_session
  231. def get_table(self, name, namespace, ignore_disable=False) -> typing.Union[StorageTableABC, None]:
  232. meta = Session.get_table_meta(name=name, namespace=namespace)
  233. if meta is None:
  234. return None
  235. if meta.get_disable() and not ignore_disable:
  236. raise Exception(f"table {namespace} {name} disable: {meta.get_disable()}")
  237. engine = meta.get_engine()
  238. storage_session = self._get_or_create_storage(storage_engine=engine)
  239. table = storage_session.get_table(name=name, namespace=namespace)
  240. return table
  241. @classmethod
  242. def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
  243. meta = StorageSessionBase.get_table_meta(name=name, namespace=namespace)
  244. return meta
  245. @classmethod
  246. def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
  247. engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
  248. return StorageSessionBase.persistent(computing_table=computing_table,
  249. namespace=namespace,
  250. name=name,
  251. schema=schema,
  252. part_of_data=part_of_data,
  253. engine=engine,
  254. engine_address=engine_address,
  255. store_type=store_type,
  256. token=token)
  257. @property
  258. def computing(self) -> CSessionABC:
  259. return self._computing_session
  260. @property
  261. def federation(self) -> FederationABC:
  262. return self._federation_session
  263. def storage(self, **kwargs):
  264. return self._get_or_create_storage(**kwargs)
  265. @property
  266. def parties(self):
  267. return self._parties_info
  268. @property
  269. def is_computing_valid(self):
  270. return self._computing_session is not None
  271. @property
  272. def is_federation_valid(self):
  273. return self._federation_session is not None
  274. @DB.connection_context()
  275. def save_record(self, engine_type, engine_name, engine_session_id, engine_runtime_conf=None):
  276. self._logger.info(
  277. f"try to save session record for manager {self._session_id}, {engine_type} {engine_name}"
  278. f" {engine_session_id}")
  279. session_record = SessionRecord()
  280. session_record.f_manager_session_id = self._session_id
  281. session_record.f_engine_type = engine_type
  282. session_record.f_engine_name = engine_name
  283. session_record.f_engine_session_id = engine_session_id
  284. session_record.f_engine_address = engine_runtime_conf if engine_runtime_conf else {}
  285. session_record.f_create_time = base_utils.current_timestamp()
  286. msg = f"save storage session record for manager {self._session_id}, {engine_type} {engine_name} " \
  287. f"{engine_session_id}"
  288. try:
  289. effect_count = session_record.save(force_insert=True)
  290. if effect_count != 1:
  291. raise RuntimeError(f"{msg} failed")
  292. except peewee.IntegrityError as e:
  293. LOGGER.warning(e)
  294. except Exception as e:
  295. raise RuntimeError(f"{msg} exception", e)
  296. self._logger.info(
  297. f"save session record for manager {self._session_id}, {engine_type} {engine_name} "
  298. f"{engine_session_id} successfully")
  299. @DB.connection_context()
  300. def delete_session_record(self, engine_session_id, manager_session_id=None):
  301. if not manager_session_id:
  302. rows = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id).execute()
  303. else:
  304. rows = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id,
  305. SessionRecord.f_manager_session_id == manager_session_id).execute()
  306. if rows > 0:
  307. self._logger.info(f"delete session {engine_session_id} record successfully")
  308. else:
  309. self._logger.warning(f"delete session {engine_session_id} record failed")
  310. @classmethod
  311. @DB.connection_context()
  312. def query_sessions(cls, reverse=None, order_by=None, **kwargs):
  313. try:
  314. session_records = SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)
  315. return session_records
  316. except BaseException:
  317. return []
  318. @DB.connection_context()
  319. def get_session_from_record(self, **kwargs):
  320. self._logger.info(f"query by manager session id {self._session_id}")
  321. session_records = self.query_sessions(manager_session_id=self.session_id, **kwargs)
  322. self._logger.info([session_record.f_engine_session_id for session_record in session_records])
  323. for session_record in session_records:
  324. try:
  325. engine_session_id = session_record.f_engine_session_id
  326. if session_record.f_engine_type == EngineType.COMPUTING:
  327. self._init_computing_if_not_valid(computing_session_id=engine_session_id)
  328. elif session_record.f_engine_type == EngineType.STORAGE:
  329. self._get_or_create_storage(storage_session_id=engine_session_id,
  330. storage_engine=session_record.f_engine_name,
  331. record=False)
  332. elif session_record.f_engine_type == EngineType.FEDERATION:
  333. self._logger.info(f"engine runtime conf: {session_record.f_engine_address}")
  334. self._init_federation_if_not_valid(federation_session_id=engine_session_id,
  335. engine_runtime_conf=session_record.f_engine_address)
  336. except Exception as e:
  337. self._logger.info(e)
  338. self.delete_session_record(engine_session_id=session_record.f_engine_session_id)
  339. def _init_computing_if_not_valid(self, computing_session_id):
  340. if not self.is_computing_valid:
  341. self.init_computing(computing_session_id=computing_session_id, record=False)
  342. return True
  343. elif self._computing_session.session_id != computing_session_id:
  344. self._logger.warning(
  345. f"manager session had computing session {self._computing_session.session_id} "
  346. f"different with query from db session {computing_session_id}")
  347. return False
  348. else:
  349. # already exists
  350. return True
  351. def _init_federation_if_not_valid(self, federation_session_id, engine_runtime_conf):
  352. if not self.is_federation_valid:
  353. try:
  354. self._logger.info(f"init federation session {federation_session_id} type {self._federation_type}")
  355. self.init_federation(federation_session_id=federation_session_id,
  356. runtime_conf=engine_runtime_conf.get("runtime_conf"),
  357. service_conf=engine_runtime_conf.get("service_conf"),
  358. record=False)
  359. self._logger.info(f"init federation session {federation_session_id} type {self._federation_type} done")
  360. return True
  361. except Exception as e:
  362. self._logger.warning(
  363. f"init federation session {federation_session_id} type {self._federation_type} failed: {e}")
  364. return False
  365. elif self._federation_session.session_id != federation_session_id:
  366. self._logger.warning(
  367. f"manager session had federation session {self._federation_session.session_id} different with query from db session {federation_session_id}")
  368. return False
  369. else:
  370. # already exists
  371. return True
  372. def destroy_all_sessions(self, **kwargs):
  373. self._logger.info(f"start destroy manager session {self._session_id} all sessions")
  374. self.get_session_from_record(**kwargs)
  375. self.destroy_federation_session()
  376. self.destroy_storage_session()
  377. self.destroy_computing_session()
  378. self._logger.info(f"finish destroy manager session {self._session_id} all sessions")
  379. def destroy_computing_session(self):
  380. if self.is_computing_valid:
  381. try:
  382. self._logger.info(f"try to destroy computing session {self._computing_session.session_id}")
  383. self._computing_session.destroy()
  384. except Exception as e:
  385. self._logger.info(f"destroy computing session {self._computing_session.session_id} failed", e)
  386. self.delete_session_record(engine_session_id=self._computing_session.session_id)
  387. self._computing_session = None
  388. def destroy_storage_session(self):
  389. for session_id, session in self._storage_session.items():
  390. try:
  391. self._logger.info(f"try to destroy storage session {session_id}")
  392. session.destroy()
  393. self._logger.info(f"destroy storage session {session_id} successfully")
  394. except Exception as e:
  395. self._logger.exception(f"destroy storage session {session_id} failed", e)
  396. self.delete_session_record(engine_session_id=session_id)
  397. self._storage_session = {}
  398. def destroy_federation_session(self):
  399. if self.is_federation_valid:
  400. try:
  401. if self._parties_info.local_party.role != "local":
  402. self._logger.info(
  403. f"try to destroy federation session {self._federation_session.session_id} type"
  404. f" {EngineType.FEDERATION} role {self._parties_info.local_party.role}")
  405. self._federation_session.destroy(parties=self._all_party_info)
  406. self._logger.info(f"destroy federation session {self._federation_session.session_id} done")
  407. except Exception as e:
  408. self._logger.info(f"destroy federation failed: {e}")
  409. self.delete_session_record(engine_session_id=self._federation_session.session_id,
  410. manager_session_id=self.session_id)
  411. self._federation_session = None
  412. def wait_remote_all_done(self, timeout=None):
  413. LOGGER.info(f"remote futures: {remote_status._remote_futures}, waiting...")
  414. remote_status.wait_all_remote_done(timeout)
  415. LOGGER.info(f"remote futures: {remote_status._remote_futures}, all done")
  416. def get_session() -> Session:
  417. return Session.get_global()
  418. def get_parties() -> PartiesInfo:
  419. return get_session().parties
  420. def get_computing_session() -> CSessionABC:
  421. return get_session().computing
  422. # noinspection PyPep8Naming
  423. class computing_session(object):
  424. @staticmethod
  425. def init(session_id, options=None):
  426. Session(options=options).as_global().init_computing(session_id)
  427. @staticmethod
  428. def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
  429. return get_computing_session().parallelize(data, partition=partition, include_key=include_key, **kwargs)
  430. @staticmethod
  431. def stop():
  432. return get_computing_session().stop()