_pulsar_manager.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import logging
  17. import json
  18. import requests
  19. from requests.adapters import HTTPAdapter
  20. from urllib3.util.retry import Retry
  21. from fate_arch.common.log import getLogger
  22. logger = getLogger()
  23. MAX_RETRIES = 10
  24. MAX_REDIRECT = 5
  25. BACKOFF_FACTOR = 1
  26. # sleep time equals {BACKOFF_FACTOR} * (2 ** ({NUMBER_OF_TOTALRETRIES} - 1))
  27. CLUSTER = 'clusters/{}'
  28. TENANT = 'tenants/{}'
  29. # For the APIs, refer to https://pulsar.apache.org/admin-rest-api/?version=2.7.0&apiversion=v2
  30. class PulsarManager():
  31. def __init__(self, host: str, port: str, runtime_config: dict = {}):
  32. self.service_url = "http://{}:{}/admin/v2/".format(host, port)
  33. self.runtime_config = runtime_config
  34. # create session is used to construct url and request parameters
  def _create_session(self):
      """Build a ``requests.Session`` configured for the admin API.

      The session sends a JSON ``Content-Type`` header and retries
      requests according to MAX_RETRIES, MAX_REDIRECT and BACKOFF_FACTOR.
      The retry mechanism is described at
      https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry

      Returns:
          The configured session.
      """
      retry_policy = Retry(
          total=MAX_RETRIES,
          redirect=MAX_REDIRECT,
          backoff_factor=BACKOFF_FACTOR,
      )
      session = requests.Session()
      session.headers.update({'Content-Type': 'application/json'})
      adapter = HTTPAdapter(max_retries=retry_policy)
      # One retrying adapter serves both plain and TLS endpoints.
      for scheme in ('http://', 'https://'):
          session.mount(scheme, adapter)
      return session
  47. # allocator
  48. def get_allocator(self, allocator: str = 'default'):
  49. session = self._create_session()
  50. response = session.get(
  51. self.service_url + 'broker-stats/allocator-stats/{}'.format(allocator))
  52. return response
  53. # cluster
  def get_cluster(self, cluster_name: str = ''):
      """Issue a GET on ``clusters/{cluster_name}``.

      Args:
          cluster_name: Name of the cluster to look up. With the default
              empty string the request targets the ``clusters/`` URL.

      Returns:
          The response of the GET request.
      """
      url = self.service_url + CLUSTER.format(cluster_name)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.get(url)
  def delete_cluster(self, cluster_name: str = ''):
      """Issue a DELETE on ``clusters/{cluster_name}``.

      Args:
          cluster_name: Name of the cluster to delete.

      Returns:
          The response of the DELETE request.
      """
      url = self.service_url + CLUSTER.format(cluster_name)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.delete(url)
  64. # service_url needs to include the "http://" prefix
  def create_cluster(self, cluster_name: str, broker_url: str, service_url: str = '',
                     service_url_tls: str = '', broker_url_tls: str = '',
                     proxy_url: str = '', proxy_protocol: str = "SNI", peer_cluster_names: list = None,
                     ):
      """Create a cluster with a PUT on ``clusters/{cluster_name}``.

      Args:
          cluster_name: Name of the cluster to create.
          broker_url: Value sent as ``brokerServiceUrl``.
          service_url: Value sent as ``serviceUrl``; it needs to include
              the "http://" prefix.
          service_url_tls: Value sent as ``serviceUrlTls``.
          broker_url_tls: Value sent as ``brokerServiceUrlTls``.
          proxy_url: Value sent as ``proxyServiceUrl``.
          proxy_protocol: Value sent as ``proxyProtocol``.
          peer_cluster_names: Value sent as ``peerClusterNames``; an empty
              list is sent when omitted.

      Returns:
          The response of the PUT request.
      """
      # ``None`` replaces the former shared mutable ``[]`` default; the
      # serialized payload is identical for callers that omit it.
      peers = [] if peer_cluster_names is None else peer_cluster_names
      payload = {
          'serviceUrl': service_url,
          'serviceUrlTls': service_url_tls,
          'brokerServiceUrl': broker_url,
          'brokerServiceUrlTls': broker_url_tls,
          'peerClusterNames': peers,
          'proxyServiceUrl': proxy_url,
          'proxyProtocol': proxy_protocol
      }
      url = self.service_url + CLUSTER.format(cluster_name)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.put(url, data=json.dumps(payload))
  def update_cluster(self, cluster_name: str, broker_url: str, service_url: str = '',
                     service_url_tls: str = '', broker_url_tls: str = '',
                     proxy_url: str = '', proxy_protocol: str = "SNI", peer_cluster_names: list = None,
                     ):
      """Update a cluster with a POST on ``clusters/{cluster_name}``.

      Args:
          cluster_name: Name of the cluster to update.
          broker_url: Value sent as ``brokerServiceUrl``.
          service_url: Value sent as ``serviceUrl``.
          service_url_tls: Value sent as ``serviceUrlTls``.
          broker_url_tls: Value sent as ``brokerServiceUrlTls``.
          proxy_url: Value sent as ``proxyServiceUrl``.
          proxy_protocol: Value sent as ``proxyProtocol``.
          peer_cluster_names: Value sent as ``peerClusterNames``; an empty
              list is sent when omitted.

      Returns:
          The response of the POST request.
      """
      # ``None`` replaces the former shared mutable ``[]`` default; the
      # serialized payload is identical for callers that omit it.
      peers = [] if peer_cluster_names is None else peer_cluster_names
      payload = {
          'serviceUrl': service_url,
          'serviceUrlTls': service_url_tls,
          'brokerServiceUrl': broker_url,
          'brokerServiceUrlTls': broker_url_tls,
          'peerClusterNames': peers,
          'proxyServiceUrl': proxy_url,
          'proxyProtocol': proxy_protocol
      }
      url = self.service_url + CLUSTER.format(cluster_name)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.post(url, data=json.dumps(payload))
  101. # tenants
  def get_tenant(self, tenant: str = ''):
      """Issue a GET on ``tenants/{tenant}``.

      Args:
          tenant: Name of the tenant to look up. With the default empty
              string the request targets the ``tenants/`` URL.

      Returns:
          The response of the GET request.
      """
      url = self.service_url + TENANT.format(tenant)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.get(url)
  def create_tenant(self, tenant: str, admins: list, clusters: list):
      """Create a tenant with a PUT on ``tenants/{tenant}``.

      Args:
          tenant: Name of the tenant to create.
          admins: Value sent as ``adminRoles``.
          clusters: Value sent as ``allowedClusters``.

      Returns:
          The response of the PUT request.
      """
      payload = {'adminRoles': admins,
                 'allowedClusters': clusters}
      url = self.service_url + TENANT.format(tenant)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.put(url, data=json.dumps(payload))
  def delete_tenant(self, tenant: str):
      """Issue a DELETE on ``tenants/{tenant}``.

      Args:
          tenant: Name of the tenant to delete.

      Returns:
          The response of the DELETE request.
      """
      url = self.service_url + TENANT.format(tenant)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.delete(url)
  def update_tenant(self, tenant: str, admins: list, clusters: list):
      """Update a tenant with a POST on ``tenants/{tenant}``.

      Args:
          tenant: Name of the tenant to update.
          admins: Value sent as ``adminRoles``.
          clusters: Value sent as ``allowedClusters``.

      Returns:
          The response of the POST request.
      """
      payload = {'adminRoles': admins,
                 'allowedClusters': clusters}
      url = self.service_url + TENANT.format(tenant)
      # Close the session once the request finishes so its pooled
      # connections are released instead of lingering until collection.
      with self._create_session() as session:
          return session.post(url, data=json.dumps(payload))
  125. # namespace
  126. def get_namespace(self, tenant: str):
  127. session = self._create_session()
  128. response = session.get(
  129. self.service_url + 'namespaces/{}'.format(tenant))
  130. return response
  131. # 'replication_clusters' is always required
  132. def create_namespace(self, tenant: str, namespace: str, policies: dict = {}):
  133. session = self._create_session()
  134. response = session.put(
  135. self.service_url + 'namespaces/{}/{}'.format(tenant, namespace),
  136. data=json.dumps(policies)
  137. )
  138. return response
  139. def delete_namespace(self, tenant: str, namespace: str):
  140. session = self._create_session()
  141. response = session.delete(
  142. self.service_url +
  143. 'namespaces/{}/{}'.format(tenant, namespace)
  144. )
  145. return response
  146. def set_clusters_to_namespace(self, tenant: str, namespace: str, clusters: list):
  147. session = self._create_session()
  148. response = session.post(
  149. self.service_url + 'namespaces/{}/{}/replication'.format(tenant, namespace), json=clusters
  150. )
  151. return response
  152. def get_cluster_from_namespace(self, tenant: str, namespace: str):
  153. session = self._create_session()
  154. response = session.get(
  155. self.service_url +
  156. 'namespaces/{}/{}/replication'.format(tenant, namespace)
  157. )
  158. return response
  159. def set_subscription_expiration_time(self, tenant: str, namespace: str, mintues: int = 0):
  160. session = self._create_session()
  161. response = session.post(
  162. self.service_url + 'namespaces/{}/{}/subscriptionExpirationTime'.format(tenant, namespace), json=mintues
  163. )
  164. return response
  165. def set_message_ttl(self, tenant: str, namespace: str, mintues: int = 0):
  166. session = self._create_session()
  167. response = session.post(
  168. # the API accepts data as seconds
  169. self.service_url + 'namespaces/{}/{}/messageTTL'.format(tenant, namespace), json=mintues * 60
  170. )
  171. return response
  172. def unsubscribe_namespace_all_topics(self, tenant: str, namespace: str, subscription_name: str):
  173. session = self._create_session()
  174. response = session.post(
  175. self.service_url +
  176. 'namespaces/{}/{}/unsubscribe/{}'.format(
  177. tenant, namespace, subscription_name)
  178. )
  179. return response
  180. def set_retention(self, tenant: str, namespace: str,
  181. retention_time_in_minutes: int = 0, retention_size_in_MB: int = 0):
  182. session = self._create_session()
  183. data = {'retentionTimeInMinutes': retention_time_in_minutes,
  184. 'retentionSizeInMB': retention_size_in_MB}
  185. response = session.post(
  186. self.service_url +
  187. 'namespaces/{}/{}/retention'.format(tenant, namespace), data=json.dumps(data)
  188. )
  189. return response
  190. def remove_retention(self, tenant: str, namespace: str):
  191. session = self._create_session()
  192. response = session.delete(
  193. self.service_url +
  194. 'namespaces/{}/{}/retention'.format(tenant, namespace),
  195. )
  196. return response
  197. # topic
  198. def unsubscribe_topic(self, tenant: str, namespace: str, topic: str, subscription_name: str):
  199. session = self._create_session()
  200. response = session.delete(
  201. self.service_url +
  202. 'persistent/{}/{}/{}/subscription/{}'.format(
  203. tenant, namespace, topic, subscription_name)
  204. )
  205. return response