_federation.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from fate_arch.common import Party
  17. from fate_arch.common import file_utils
  18. from fate_arch.common.log import getLogger
  19. from fate_arch.federation._federation import FederationBase
  20. from fate_arch.federation.pulsar._mq_channel import (
  21. MQChannel,
  22. DEFAULT_TENANT,
  23. DEFAULT_CLUSTER,
  24. DEFAULT_SUBSCRIPTION_NAME,
  25. )
  26. from fate_arch.federation.pulsar._pulsar_manager import PulsarManager
LOGGER = getLogger()
# Default maximum message size, in bytes.
# NOTE(review): the previous comment claimed "1MB", but 104857 bytes is
# roughly 102 KiB (1 MiB would be 1048576) - confirm which one is intended.
DEFAULT_MESSAGE_MAX_SIZE = 104857
  30. class MQ(object):
  31. def __init__(self, host, port, route_table):
  32. self.host = host
  33. self.port = port
  34. self.route_table = route_table
  35. def __str__(self):
  36. return (
  37. f"MQ(host={self.host}, port={self.port} "
  38. f"route_table={self.route_table}), "
  39. f"type=pulsar"
  40. )
  41. def __repr__(self):
  42. return self.__str__()
  43. class _TopicPair(object):
  44. def __init__(self, tenant, namespace, send, receive):
  45. self.tenant = tenant
  46. self.namespace = namespace
  47. self.send = send
  48. self.receive = receive
  49. class Federation(FederationBase):
  50. @staticmethod
  51. def from_conf(
  52. federation_session_id: str,
  53. party: Party,
  54. runtime_conf: dict,
  55. **kwargs
  56. ):
  57. pulsar_config = kwargs["pulsar_config"]
  58. LOGGER.debug(f"pulsar_config: {pulsar_config}")
  59. host = pulsar_config.get("host", "localhost")
  60. port = pulsar_config.get("port", "6650")
  61. mng_port = pulsar_config.get("mng_port", "8080")
  62. topic_ttl = int(pulsar_config.get("topic_ttl", 0))
  63. cluster = pulsar_config.get("cluster", DEFAULT_CLUSTER)
  64. # tenant name should be unified between parties
  65. tenant = pulsar_config.get("tenant", DEFAULT_TENANT)
  66. # max_message_size;
  67. max_message_size = int(pulsar_config.get("max_message_size", DEFAULT_MESSAGE_MAX_SIZE))
  68. pulsar_run = runtime_conf.get(
  69. "job_parameters", {}).get("pulsar_run", {})
  70. LOGGER.debug(f"pulsar_run: {pulsar_run}")
  71. max_message_size = int(pulsar_run.get(
  72. "max_message_size", max_message_size))
  73. LOGGER.debug(f"set max message size to {max_message_size} Bytes")
  74. # topic ttl could be overwritten by run time config
  75. topic_ttl = int(pulsar_run.get("topic_ttl", topic_ttl))
  76. # pulsar not use user and password so far
  77. # TODO add credential to connections
  78. base_user = pulsar_config.get("user")
  79. base_password = pulsar_config.get("password")
  80. mode = pulsar_config.get("mode", "replication")
  81. pulsar_manager = PulsarManager(
  82. host=host, port=mng_port, runtime_config=pulsar_run
  83. )
  84. # init tenant
  85. tenant_info = pulsar_manager.get_tenant(tenant=tenant).json()
  86. if tenant_info.get("allowedClusters") is None:
  87. pulsar_manager.create_tenant(
  88. tenant=tenant, admins=[], clusters=[cluster])
  89. route_table_path = pulsar_config.get("route_table")
  90. if route_table_path is None:
  91. route_table_path = "conf/pulsar_route_table.yaml"
  92. route_table = file_utils.load_yaml_conf(conf_path=route_table_path)
  93. mq = MQ(host, port, route_table)
  94. conf = pulsar_manager.runtime_config.get(
  95. "connection", {}
  96. )
  97. LOGGER.debug(f"federation mode={mode}")
  98. return Federation(
  99. federation_session_id,
  100. party,
  101. mq,
  102. pulsar_manager,
  103. max_message_size,
  104. topic_ttl,
  105. cluster,
  106. tenant,
  107. conf,
  108. mode
  109. )
  110. def __init__(self, session_id, party: Party, mq: MQ, pulsar_manager: PulsarManager, max_message_size, topic_ttl,
  111. cluster, tenant, conf, mode):
  112. super().__init__(session_id=session_id, party=party, mq=mq, max_message_size=max_message_size, conf=conf)
  113. self._pulsar_manager = pulsar_manager
  114. self._topic_ttl = topic_ttl
  115. self._cluster = cluster
  116. self._tenant = tenant
  117. self._mode = mode
  118. def __getstate__(self):
  119. pass
  120. def destroy(self, parties):
  121. # The idea cleanup strategy is to consume all message in topics,
  122. # and let pulsar cluster to collect the used topics.
  123. LOGGER.debug("[pulsar.cleanup]start to cleanup...")
  124. # 1. remove subscription
  125. response = self._pulsar_manager.unsubscribe_namespace_all_topics(
  126. tenant=self._tenant,
  127. namespace=self._session_id,
  128. subscription_name=DEFAULT_SUBSCRIPTION_NAME,
  129. )
  130. if response.ok:
  131. LOGGER.debug("successfully unsubscribe all topics")
  132. else:
  133. LOGGER.error(response.text)
  134. # 2. reset retention policy
  135. response = self._pulsar_manager.set_retention(
  136. self._tenant,
  137. self._session_id,
  138. retention_time_in_minutes=0,
  139. retention_size_in_MB=0,
  140. )
  141. if response.ok:
  142. LOGGER.debug("successfully reset all retention policy")
  143. else:
  144. LOGGER.error(response.text)
  145. # 3. remove cluster from namespace
  146. response = self._pulsar_manager.set_clusters_to_namespace(
  147. self._tenant, self._session_id, [self._cluster]
  148. )
  149. if response.ok:
  150. LOGGER.debug("successfully reset all replicated cluster")
  151. else:
  152. LOGGER.error(response.text)
  153. # # 4. remove namespace
  154. # response = self._pulsar_manager.delete_namespace(
  155. # self._tenant, self._session_id
  156. # )
  157. # if response.ok:
  158. # LOGGER.debug(f"successfully delete namespace={self._session_id}")
  159. # else:
  160. # LOGGER.error(response.text)
  161. def _maybe_create_topic_and_replication(self, party, topic_suffix):
  162. if self._mode == "replication":
  163. return self._create_topic_by_replication_mode(party, topic_suffix)
  164. if self._mode == "client":
  165. return self._create_topic_by_client_mode(party, topic_suffix)
  166. raise ValueError("mode={self._mode} is not support!")
  167. def _create_topic_by_client_mode(self, party, topic_suffix):
  168. send_topic_name = f"{self._party.role}-{self._party.party_id}-{party.role}-{party.party_id}-{topic_suffix}"
  169. receive_topic_name = f"{party.role}-{party.party_id}-{self._party.role}-{self._party.party_id}-{topic_suffix}"
  170. # topic_pair is a pair of topic for sending and receiving message respectively
  171. topic_pair = _TopicPair(
  172. tenant=self._tenant,
  173. namespace=self._session_id,
  174. send=send_topic_name,
  175. receive=receive_topic_name,
  176. )
  177. # init pulsar namespace
  178. namespaces = self._pulsar_manager.get_namespace(
  179. self._tenant).json()
  180. # create namespace
  181. if f"{self._tenant}/{self._session_id}" not in namespaces:
  182. # append target cluster to the pulsar namespace
  183. code = self._pulsar_manager.create_namespace(
  184. self._tenant, self._session_id
  185. ).status_code
  186. # according to https://pulsar.apache.org/admin-rest-api/?version=2.7.0&apiversion=v2#operation/getPolicies
  187. # return 409 if existed
  188. # return 204 if ok
  189. if code == 204 or code == 409:
  190. LOGGER.debug(
  191. "successfully create pulsar namespace: %s", self._session_id
  192. )
  193. else:
  194. raise Exception(
  195. "unable to create pulsar namespace with status code: {}".format(
  196. code
  197. )
  198. )
  199. # set message ttl for the namespace
  200. response = self._pulsar_manager.set_retention(
  201. self._tenant,
  202. self._session_id,
  203. retention_time_in_minutes=int(self._topic_ttl),
  204. retention_size_in_MB=-1,
  205. )
  206. LOGGER.debug(response.text)
  207. if response.ok:
  208. LOGGER.debug(
  209. "successfully set message ttl to namespace: {} about {} mintues".format(
  210. self._session_id, self._topic_ttl
  211. )
  212. )
  213. else:
  214. LOGGER.debug("failed to set message ttl to namespace")
  215. return topic_pair
  216. def _create_topic_by_replication_mode(self, party, topic_suffix):
  217. send_topic_name = f"{self._party.role}-{self._party.party_id}-{party.role}-{party.party_id}-{topic_suffix}"
  218. receive_topic_name = f"{party.role}-{party.party_id}-{self._party.role}-{self._party.party_id}-{topic_suffix}"
  219. # topic_pair is a pair of topic for sending and receiving message respectively
  220. topic_pair = _TopicPair(
  221. tenant=self._tenant,
  222. namespace=self._session_id,
  223. send=send_topic_name,
  224. receive=receive_topic_name,
  225. )
  226. if party.party_id == self._party.party_id:
  227. LOGGER.debug(
  228. "connecting to local broker, skipping cluster creation"
  229. )
  230. else:
  231. # init pulsar cluster
  232. cluster = self._pulsar_manager.get_cluster(
  233. party.party_id).json()
  234. if (
  235. cluster.get("brokerServiceUrl", "") == ""
  236. and cluster.get("brokerServiceUrlTls", "") == ""
  237. ):
  238. LOGGER.debug(
  239. "pulsar cluster with name %s does not exist or broker url is empty, creating...",
  240. party.party_id,
  241. )
  242. remote_party = self._mq.route_table.get(
  243. int(party.party_id), None
  244. )
  245. # handle party does not exist in route table first
  246. if remote_party is None:
  247. domain = self._mq.route_table.get(
  248. "default").get("domain")
  249. host = f"{party.party_id}.{domain}"
  250. port = self._mq.route_table.get("default").get(
  251. "brokerPort", "6650"
  252. )
  253. sslPort = self._mq.route_table.get("default").get(
  254. "brokerSslPort", "6651"
  255. )
  256. proxy = self._mq.route_table.get(
  257. "default").get("proxy", "")
  258. # fetch party info from the route table
  259. else:
  260. host = self._mq.route_table.get(int(party.party_id)).get(
  261. "host"
  262. )
  263. port = self._mq.route_table.get(int(party.party_id)).get(
  264. "port", "6650"
  265. )
  266. sslPort = self._mq.route_table.get(int(party.party_id)).get(
  267. "sslPort", "6651"
  268. )
  269. proxy = self._mq.route_table.get(int(party.party_id)).get(
  270. "proxy", ""
  271. )
  272. broker_url = f"pulsar://{host}:{port}"
  273. broker_url_tls = f"pulsar+ssl://{host}:{sslPort}"
  274. if proxy != "":
  275. proxy = f"pulsar+ssl://{proxy}"
  276. if self._pulsar_manager.create_cluster(
  277. cluster_name=party.party_id,
  278. broker_url=broker_url,
  279. broker_url_tls=broker_url_tls,
  280. proxy_url=proxy,
  281. ).ok:
  282. LOGGER.debug(
  283. "pulsar cluster with name: %s, broker_url: %s created",
  284. party.party_id,
  285. broker_url,
  286. )
  287. elif self._pulsar_manager.update_cluster(
  288. cluster_name=party.party_id,
  289. broker_url=broker_url,
  290. broker_url_tls=broker_url_tls,
  291. proxy_url=proxy,
  292. ).ok:
  293. LOGGER.debug(
  294. "pulsar cluster with name: %s, broker_url: %s updated",
  295. party.party_id,
  296. broker_url,
  297. )
  298. else:
  299. error_message = (
  300. "unable to create pulsar cluster: %s".format(
  301. party.party_id
  302. )
  303. )
  304. LOGGER.error(error_message)
  305. # just leave this alone.
  306. raise Exception(error_message)
  307. # update tenant
  308. tenant_info = self._pulsar_manager.get_tenant(
  309. self._tenant).json()
  310. if party.party_id not in tenant_info["allowedClusters"]:
  311. tenant_info["allowedClusters"].append(party.party_id)
  312. if self._pulsar_manager.update_tenant(
  313. self._tenant,
  314. tenant_info.get("admins", []),
  315. tenant_info.get(
  316. "allowedClusters",
  317. ),
  318. ).ok:
  319. LOGGER.debug(
  320. "successfully update tenant with cluster: %s",
  321. party.party_id,
  322. )
  323. else:
  324. raise Exception("unable to update tenant")
  325. # TODO: remove this for the loop
  326. # init pulsar namespace
  327. namespaces = self._pulsar_manager.get_namespace(
  328. self._tenant).json()
  329. # create namespace
  330. if f"{self._tenant}/{self._session_id}" not in namespaces:
  331. # append target cluster to the pulsar namespace
  332. clusters = [self._cluster]
  333. if (
  334. party.party_id != self._party.party_id
  335. and party.party_id not in clusters
  336. ):
  337. clusters.append(party.party_id)
  338. policy = {"replication_clusters": clusters}
  339. code = self._pulsar_manager.create_namespace(
  340. self._tenant, self._session_id, policies=policy
  341. ).status_code
  342. # according to https://pulsar.apache.org/admin-rest-api/?version=2.7.0&apiversion=v2#operation/getPolicies
  343. # return 409 if existed
  344. # return 204 if ok
  345. if code == 204 or code == 409:
  346. LOGGER.debug(
  347. "successfully create pulsar namespace: %s", self._session_id
  348. )
  349. else:
  350. raise Exception(
  351. "unable to create pulsar namespace with status code: {}".format(
  352. code
  353. )
  354. )
  355. # set message ttl for the namespace
  356. response = self._pulsar_manager.set_retention(
  357. self._tenant,
  358. self._session_id,
  359. retention_time_in_minutes=int(self._topic_ttl),
  360. retention_size_in_MB=-1,
  361. )
  362. LOGGER.debug(response.text)
  363. if response.ok:
  364. LOGGER.debug(
  365. "successfully set message ttl to namespace: {} about {} mintues".format(
  366. self._session_id, self._topic_ttl
  367. )
  368. )
  369. else:
  370. LOGGER.debug("failed to set message ttl to namespace")
  371. # update party to namespace
  372. else:
  373. if party.party_id != self._party.party_id:
  374. clusters = self._pulsar_manager.get_cluster_from_namespace(
  375. self._tenant, self._session_id
  376. ).json()
  377. if party.party_id not in clusters:
  378. clusters.append(party.party_id)
  379. if self._pulsar_manager.set_clusters_to_namespace(
  380. self._tenant, self._session_id, clusters
  381. ).ok:
  382. LOGGER.debug(
  383. "successfully set clusters: {} to pulsar namespace: {}".format(
  384. clusters, self._session_id
  385. )
  386. )
  387. else:
  388. raise Exception(
  389. "unable to update clusters: {} to pulsar namespaces: {}".format(
  390. clusters, self._session_id
  391. )
  392. )
  393. return topic_pair
  394. def _get_channel(self, topic_pair: _TopicPair, src_party_id, src_role, dst_party_id, dst_role, mq=None,
  395. conf: dict = None):
  396. return MQChannel(
  397. host=mq.host,
  398. port=mq.port,
  399. tenant=topic_pair.tenant,
  400. namespace=topic_pair.namespace,
  401. send_topic=topic_pair.send,
  402. receive_topic=topic_pair.receive,
  403. src_party_id=src_party_id,
  404. src_role=src_role,
  405. dst_party_id=dst_party_id,
  406. dst_role=dst_role,
  407. credential=None,
  408. extra_args=conf,
  409. )
  410. def _get_consume_message(self, channel_info):
  411. while True:
  412. message = channel_info.consume()
  413. body = message.data()
  414. properties = message.properties()
  415. message_id = message.message_id()
  416. yield message_id, properties, body
  417. def _consume_ack(self, channel_info, id):
  418. channel_info.ack(message=id)