_mq_channel.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import pulsar
  17. from fate_arch.common import log
  18. from fate_arch.federation._nretry import nretry
  19. LOGGER = log.getLogger()
  20. CHANNEL_TYPE_PRODUCER = "producer"
  21. CHANNEL_TYPE_CONSUMER = "consumer"
  22. DEFAULT_TENANT = "fl-tenant"
  23. DEFAULT_CLUSTER = "standalone"
  24. TOPIC_PREFIX = "{}/{}/{}"
  25. UNIQUE_PRODUCER_NAME = "unique_producer"
  26. UNIQUE_CONSUMER_NAME = "unique_consumer"
  27. DEFAULT_SUBSCRIPTION_NAME = "unique"
  28. # A channel can only be used to send or receive messages.
  29. class MQChannel(object):
  30. # TODO add credential to secure pulsar cluster
  31. def __init__(
  32. self,
  33. host,
  34. port,
  35. tenant,
  36. namespace,
  37. send_topic,
  38. receive_topic,
  39. src_party_id,
  40. src_role,
  41. dst_party_id,
  42. dst_role,
  43. credential=None,
  44. extra_args: dict = None,
  45. ):
  46. # "host:port" is used to connect the pulsar broker
  47. self._host = host
  48. self._port = port
  49. self._tenant = tenant
  50. self._namespace = namespace
  51. self._send_topic = send_topic
  52. self._receive_topic = receive_topic
  53. self._credential = credential
  54. self._src_party_id = src_party_id
  55. self._src_role = src_role
  56. self._dst_party_id = dst_party_id
  57. self._dst_role = dst_role
  58. self._extra_args = extra_args
  59. # "_channel" is the subscriptor for the topic
  60. self._producer_send = None
  61. self._producer_conn = None
  62. self._consumer_receive = None
  63. self._consumer_conn = None
  64. self._sequence_id = None
  65. # these are pulsar message id
  66. self._latest_confirmed = None
  67. self._first_confirmed = None
  68. self._subscription_config = {}
  69. if self._extra_args.get("subscription") is not None:
  70. self._subscription_config.update(self._extra_args["subscription"])
  71. self._producer_config = {}
  72. if self._extra_args.get("producer") is not None:
  73. self._producer_config.update(self._extra_args["producer"])
  74. self._consumer_config = {}
  75. if self._extra_args.get("consumer") is not None:
  76. self._consumer_config.update(self._extra_args["consumer"])
  77. # split the creation of the producer and the consumer to avoid wasting resources
  78. @nretry
  79. def produce(self, body, properties):
  80. self._get_or_create_producer()
  81. LOGGER.debug("send queue: {}".format(self._producer_send.topic()))
  82. LOGGER.debug("send data size: {}".format(len(body)))
  83. message_id = self._producer_send.send(
  84. content=body, properties=properties)
  85. if message_id is None:
  86. raise Exception("publish failed")
  87. self._sequence_id = message_id
  88. @nretry
  89. def consume(self):
  90. self._get_or_create_consumer()
  91. try:
  92. LOGGER.debug("receive topic: {}".format(
  93. self._consumer_receive.topic()))
  94. receive_timeout = self._consumer_config.get(
  95. 'receive_timeout_millis', None)
  96. if receive_timeout is not None:
  97. LOGGER.debug(
  98. f"receive timeout millis {receive_timeout}")
  99. message = self._consumer_receive.receive(
  100. timeout_millis=receive_timeout)
  101. return message
  102. except Exception:
  103. self._consumer_receive.seek(pulsar.MessageId.earliest)
  104. raise TimeoutError("meet receive timeout, try to reset the cursor")
  105. @nretry
  106. def ack(self, message):
  107. # assume consumer is alive
  108. try:
  109. self._consumer_receive.acknowledge(message)
  110. self._latest_confirmed = message
  111. if self._first_confirmed is None:
  112. self._first_confirmed = message
  113. except Exception as e:
  114. LOGGER.debug("meet {} when trying to ack message".format(e))
  115. self._get_or_create_consumer()
  116. self._consumer_receive.negative_acknowledge(message)
  117. @nretry
  118. def unack_all(self):
  119. self._get_or_create_consumer()
  120. self._consumer_receive.seek(pulsar.MessageId.earliest)
  121. @nretry
  122. def cancel(self):
  123. if self._consumer_conn is not None:
  124. try:
  125. self._consumer_receive.close()
  126. self._consumer_conn.close()
  127. except Exception as e:
  128. LOGGER.debug("meet {} when trying to close consumer".format(e))
  129. self._consumer_receive = None
  130. self._consumer_conn = None
  131. if self._producer_conn is not None:
  132. try:
  133. self._producer_send.close()
  134. self._producer_conn.close()
  135. except Exception as e:
  136. LOGGER.debug("meet {} when trying to close producer".format(e))
  137. self._producer_send = None
  138. self._producer_conn = None
  139. def _get_or_create_producer(self):
  140. if self._check_producer_alive() != True:
  141. # if self._producer_conn is None:
  142. try:
  143. self._producer_conn = pulsar.Client(
  144. service_url="pulsar://{}:{}".format(
  145. self._host, self._port),
  146. operation_timeout_seconds=30,
  147. )
  148. except Exception as e:
  149. self._producer_conn = None
  150. # alway used current client to fetch producer
  151. try:
  152. self._producer_send = self._producer_conn.create_producer(
  153. TOPIC_PREFIX.format(
  154. self._tenant, self._namespace, self._send_topic
  155. ),
  156. producer_name=UNIQUE_PRODUCER_NAME,
  157. send_timeout_millis=60000,
  158. max_pending_messages=500,
  159. compression_type=pulsar.CompressionType.LZ4,
  160. **self._producer_config,
  161. )
  162. except Exception as e:
  163. LOGGER.debug(
  164. f"catch exception {e} in creating pulsar producer")
  165. self._producer_conn = None
  166. def _get_or_create_consumer(self):
  167. if not self._check_consumer_alive():
  168. try:
  169. self._consumer_conn = pulsar.Client(
  170. service_url="pulsar://{}:{}".format(
  171. self._host, self._port),
  172. operation_timeout_seconds=30,
  173. )
  174. except Exception:
  175. self._consumer_conn = None
  176. try:
  177. self._consumer_receive = self._consumer_conn.subscribe(
  178. TOPIC_PREFIX.format(
  179. self._tenant, self._namespace, self._receive_topic
  180. ),
  181. subscription_name=DEFAULT_SUBSCRIPTION_NAME,
  182. consumer_name=UNIQUE_CONSUMER_NAME,
  183. initial_position=pulsar.InitialPosition.Earliest,
  184. replicate_subscription_state_enabled=True,
  185. **self._subscription_config,
  186. )
  187. # set cursor to latest confirmed
  188. if self._latest_confirmed is not None:
  189. self._consumer_receive.seek(self._latest_confirmed)
  190. except Exception as e:
  191. LOGGER.debug(
  192. f"catch exception {e} in creating pulsar consumer")
  193. self._consumer_conn.close()
  194. self._consumer_conn = None
  195. def _check_producer_alive(self):
  196. if self._producer_conn is None or self._producer_send is None:
  197. return False
  198. try:
  199. self._producer_conn.get_topic_partitions("test-alive")
  200. self._producer_send.flush()
  201. return True
  202. except Exception as e:
  203. LOGGER.debug("catch {}, closing producer client".format(e))
  204. if self._producer_conn is not None:
  205. try:
  206. self._producer_conn.close()
  207. except Exception:
  208. pass
  209. self._producer_conn = None
  210. self._producer_send = None
  211. return False
  212. def _check_consumer_alive(self):
  213. try:
  214. if self._latest_confirmed is not None:
  215. self._consumer_receive.acknowledge(self._latest_confirmed)
  216. return True
  217. else:
  218. return False
  219. except Exception as e:
  220. self._consumer_conn = None
  221. self._consumer_receive = None
  222. return False