_rabbit_manager.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import requests
  17. import time
  18. from fate_arch.common import log
  19. LOGGER = log.getLogger()  # module-wide logger obtained from the fate_arch log helper
  20. C_HTTP_TEMPLATE = "http://{}/api/{}"  # formatted with (endpoint, API path) to build a management API URL
  21. C_COMMON_HTTP_HEADER = {'Content-Type': 'application/json'}  # JSON content-type header passed to most requests calls in this module
  22. """
  23. The RabbitMQ management HTTP API endpoints used below are documented at https://rawcdn.githack.com/rabbitmq/rabbitmq-management/v3.8.3/priv/www/api/index.html
  24. """
  25. class RabbitManager:
  26. def __init__(self, user, password, endpoint, runtime_config=None):
  27. self.user = user
  28. self.password = password
  29. self.endpoint = endpoint
  30. # The runtime_config defines the parameters to create queue, exchange .etc
  31. self.runtime_config = runtime_config if runtime_config is not None else {}
  32. def create_user(self, user, password):
  33. url = C_HTTP_TEMPLATE.format(self.endpoint, "users/" + user)
  34. body = {
  35. "password": password,
  36. "tags": ""
  37. }
  38. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  39. json=body, auth=(self.user, self.password))
  40. LOGGER.debug(f"[rabbitmanager.create_user] {result}")
  41. if result.status_code == 201 or result.status_code == 204:
  42. return True
  43. else:
  44. return False
  45. def delete_user(self, user):
  46. url = C_HTTP_TEMPLATE.format(self.endpoint, "users/" + user)
  47. result = requests.delete(url, auth=(self.user, self.password))
  48. LOGGER.debug(f"[rabbitmanager.delete_user] {result}")
  49. return result
  50. def create_vhost(self, vhost):
  51. url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts/" + vhost)
  52. result = requests.put(
  53. url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
  54. LOGGER.debug(f"[rabbitmanager.create_vhost] {result}")
  55. self.add_user_to_vhost(self.user, vhost)
  56. return True
  57. def delete_vhost(self, vhost):
  58. url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts/" + vhost)
  59. result = requests.delete(url, auth=(self.user, self.password))
  60. LOGGER.debug(f"[rabbitmanager.delete_vhost] {result}")
  61. return result
  62. def delete_vhosts(self):
  63. result = self.get_vhosts()
  64. names = None
  65. try:
  66. if result.status_code == 200:
  67. names = [e["name"] for e in result.json()]
  68. except BaseException:
  69. names = None
  70. LOGGER.debug(f"[rabbitmanager.delete_vhosts] {names}")
  71. if names is not None:
  72. LOGGER.debug("[rabbitmanager.delete_vhosts]start to delete_vhosts")
  73. for name in names:
  74. self.delete_vhost(name)
  75. def get_vhosts(self):
  76. url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts")
  77. result = requests.get(url, auth=(self.user, self.password))
  78. LOGGER.debug(f"[rabbitmanager.get_vhosts] {result}")
  79. return result
  80. def add_user_to_vhost(self, user, vhost):
  81. url = C_HTTP_TEMPLATE.format(
  82. self.endpoint, "{}/{}/{}".format("permissions", vhost, user))
  83. body = {
  84. "configure": ".*",
  85. "write": ".*",
  86. "read": ".*"
  87. }
  88. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  89. json=body, auth=(self.user, self.password))
  90. LOGGER.debug(f"[rabbitmanager.add_user_to_vhost] {result}")
  91. if result.status_code == 201 or result.status_code == 204:
  92. return True
  93. else:
  94. return False
  95. def remove_user_from_vhost(self, user, vhost):
  96. url = C_HTTP_TEMPLATE.format(
  97. self.endpoint, "{}/{}/{}".format("permissions", vhost, user))
  98. result = requests.delete(url, auth=(self.user, self.password))
  99. LOGGER.debug(f"[rabbitmanager.remove_user_from_vhost] {result}")
  100. return result
  101. def get_exchanges(self, vhost):
  102. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}".format("exchanges", vhost))
  103. result = requests.get(url, auth=(self.user, self.password))
  104. LOGGER.debug(f"[rabbitmanager.get_exchanges] {result}")
  105. try:
  106. if result.status_code == 200:
  107. exchange_names = [e["name"] for e in result.json()]
  108. LOGGER.debug(f"[rabbitmanager.get_exchanges] exchange_names={exchange_names}")
  109. return exchange_names
  110. else:
  111. return None
  112. except BaseException:
  113. return None
  114. def create_exchange(self, vhost, exchange_name):
  115. url = C_HTTP_TEMPLATE.format(
  116. self.endpoint, "{}/{}/{}".format("exchanges", vhost, exchange_name))
  117. basic_config = {
  118. "type": "direct",
  119. "auto_delete": False,
  120. "durable": True,
  121. "internal": False,
  122. "arguments": {}
  123. }
  124. exchange_runtime_config = self.runtime_config.get("exchange", {})
  125. basic_config.update(exchange_runtime_config)
  126. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  127. json=basic_config, auth=(self.user, self.password))
  128. LOGGER.debug(result)
  129. return result
  130. def delete_exchange(self, vhost, exchange_name):
  131. url = C_HTTP_TEMPLATE.format(
  132. self.endpoint, "{}/{}/{}".format("exchanges", vhost, exchange_name))
  133. result = requests.delete(url, auth=(self.user, self.password))
  134. LOGGER.debug(f"[rabbitmanager.delete_exchange] vhost={vhost}, exchange_name={exchange_name}, {result}")
  135. return result
  136. def get_policies(self, vhost):
  137. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}".format("policies", vhost))
  138. result = requests.get(url, auth=(self.user, self.password))
  139. LOGGER.debug(f"[rabbitmanager.get_policies] {result}")
  140. try:
  141. if result.status_code == 200:
  142. policies_names = [e["name"] for e in result.json()]
  143. LOGGER.debug(f"[rabbitmanager.get_policies] policies_names={policies_names}")
  144. return policies_names
  145. else:
  146. return None
  147. except BaseException:
  148. return None
  149. def delete_policy(self, vhost, policy_name):
  150. url = C_HTTP_TEMPLATE.format(
  151. self.endpoint, "{}/{}/{}".format("policies", vhost, policy_name))
  152. result = requests.delete(url, auth=(self.user, self.password))
  153. LOGGER.debug(f"[rabbitmanager.delete_policy] vhost={vhost}, policy_name={policy_name}, {result}")
  154. return result
  155. def create_queue(self, vhost, queue_name):
  156. url = C_HTTP_TEMPLATE.format(
  157. self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
  158. basic_config = {
  159. "auto_delete": False,
  160. "durable": True,
  161. "arguments": {}
  162. }
  163. queue_runtime_config = self.runtime_config.get("queue", {})
  164. basic_config.update(queue_runtime_config)
  165. LOGGER.debug(basic_config)
  166. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  167. json=basic_config, auth=(self.user, self.password))
  168. LOGGER.debug(f"[rabbitmanager.create_queue] {result}")
  169. if result.status_code == 201 or result.status_code == 204:
  170. return True
  171. else:
  172. return False
  173. def get_queue(self, vhost, queue_name):
  174. url = C_HTTP_TEMPLATE.format(
  175. self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
  176. result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
  177. return result
  178. def get_queues(self, vhost):
  179. url = C_HTTP_TEMPLATE.format(
  180. self.endpoint, "{}/{}".format("queues", vhost))
  181. result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
  182. try:
  183. if result.status_code == 200:
  184. queue_names = [e["name"] for e in result.json()]
  185. LOGGER.debug(f"[rabbitmanager.get_all_queue] queue_names={queue_names}")
  186. return queue_names
  187. else:
  188. return None
  189. except BaseException:
  190. return None
  191. def delete_queue(self, vhost, queue_name):
  192. url = C_HTTP_TEMPLATE.format(
  193. self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
  194. result = requests.delete(url, auth=(self.user, self.password))
  195. LOGGER.debug(f"[rabbitmanager.delete_queue] vhost={vhost}, queue_name={queue_name}, {result}")
  196. return result
  197. def get_connections(self):
  198. url = C_HTTP_TEMPLATE.format(
  199. self.endpoint, "connections")
  200. result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
  201. LOGGER.debug(f"[rabbitmanager.get_connections] {result}")
  202. return result
  203. def delete_connections(self, vhost=None):
  204. result = self.get_connections()
  205. names = None
  206. try:
  207. if result.status_code == 200:
  208. if vhost is None:
  209. names = [e["name"] for e in result.json()]
  210. else:
  211. names = [e["name"] for e in result.json() if e["vhost"] == vhost]
  212. except BaseException:
  213. names = None
  214. LOGGER.debug(f"[rabbitmanager.delete_connections] {names}")
  215. if names is not None:
  216. LOGGER.debug("[rabbitmanager.delete_connections] start....")
  217. for name in names:
  218. url = C_HTTP_TEMPLATE.format(
  219. self.endpoint, "{}/{}".format("connections", name))
  220. result = requests.delete(url, auth=(self.user, self.password))
  221. LOGGER.debug(result)
  222. def bind_exchange_to_queue(self, vhost, exchange_name, queue_name):
  223. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/e/{}/q/{}".format("bindings",
  224. vhost,
  225. exchange_name,
  226. queue_name))
  227. body = {
  228. "routing_key": queue_name,
  229. "arguments": {}
  230. }
  231. result = requests.post(
  232. url, headers=C_COMMON_HTTP_HEADER, json=body, auth=(self.user, self.password))
  233. LOGGER.debug(result)
  234. return result
  235. def unbind_exchange_to_queue(self, vhost, exchange_name, queue_name):
  236. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/e/{}/q/{}/{}".format("bindings",
  237. vhost,
  238. exchange_name,
  239. queue_name,
  240. queue_name))
  241. result = requests.delete(url, auth=(self.user, self.password))
  242. LOGGER.debug(result)
  243. return result
  244. def _set_federated_upstream(self, upstream_host, vhost, receive_queue_name):
  245. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}/{}".format("parameters",
  246. "federation-upstream",
  247. vhost,
  248. receive_queue_name))
  249. upstream_runtime_config = self.runtime_config.get("upstream", {})
  250. upstream_runtime_config['uri'] = upstream_host
  251. upstream_runtime_config['queue'] = receive_queue_name.replace(
  252. "receive", "send", 1)
  253. body = {
  254. "value": upstream_runtime_config
  255. }
  256. LOGGER.debug(f"[rabbitmanager._set_federated_upstream]set_federated_upstream, url: {url} body: {body}")
  257. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  258. json=body, auth=(self.user, self.password))
  259. LOGGER.debug(f"[rabbitmanager._set_federated_upstream] {result}")
  260. if result.status_code != 201 and result.status_code != 204:
  261. LOGGER.debug(f"[rabbitmanager._set_federated_upstream] _set_federated_upstream fail. {result}")
  262. return False
  263. return True
  264. def _unset_federated_upstream(self, upstream_name, vhost):
  265. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}/{}".format("parameters",
  266. "federation-upstream",
  267. vhost,
  268. upstream_name))
  269. result = requests.delete(url, auth=(self.user, self.password))
  270. LOGGER.debug(result)
  271. return result
  272. def _set_federated_queue_policy(self, vhost, receive_queue_name):
  273. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}".format("policies",
  274. vhost,
  275. receive_queue_name))
  276. body = {
  277. "pattern": '^' + receive_queue_name + '$',
  278. "apply-to": "queues",
  279. "definition":
  280. {
  281. "federation-upstream": receive_queue_name
  282. }
  283. }
  284. LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy]set_federated_queue_policy, url: {url} body: {body}")
  285. result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
  286. json=body, auth=(self.user, self.password))
  287. LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy] {result}")
  288. if result.status_code != 201 and result.status_code != 204:
  289. LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy] _set_federated_queue_policy fail. {result}")
  290. return False
  291. return True
  292. def _unset_federated_queue_policy(self, policy_name, vhost):
  293. url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}".format("policies",
  294. vhost,
  295. policy_name))
  296. result = requests.delete(url, auth=(self.user, self.password))
  297. LOGGER.debug(result)
  298. return result
  299. # Create federate queue with upstream
  300. def federate_queue(self, upstream_host, vhost, send_queue_name, receive_queue_name):
  301. time.sleep(0.1)
  302. LOGGER.debug(f"[rabbitmanager.federate_queue] create federate_queue {receive_queue_name}")
  303. result = self._set_federated_upstream(
  304. upstream_host, vhost, receive_queue_name)
  305. if result is False:
  306. # should be logged
  307. LOGGER.debug(f"[rabbitmanager.federate_queue] result_set_upstream fail.")
  308. return False
  309. result = self._set_federated_queue_policy(
  310. vhost, receive_queue_name)
  311. if result is False:
  312. LOGGER.debug(f"[rabbitmanager.federate_queue] result_set_policy fail.")
  313. return False
  314. return True
  315. def de_federate_queue(self, vhost, receive_queue_name):
  316. result = self._unset_federated_queue_policy(receive_queue_name, vhost)
  317. LOGGER.debug(
  318. f"delete federate queue policy status code: {result.status_code}")
  319. result = self._unset_federated_upstream(receive_queue_name, vhost)
  320. LOGGER.debug(
  321. f"delete federate queue upstream status code: {result.status_code}")
  322. return True
  323. def clean(self, vhost):
  324. time.sleep(1)
  325. queue_names = self.get_queues(vhost)
  326. if queue_names is not None:
  327. for name in queue_names:
  328. self.delete_queue(vhost, name)
  329. exchange_names = self.get_exchanges(vhost)
  330. if exchange_names is not None:
  331. for name in exchange_names:
  332. self.delete_exchange(vhost, name)
  333. policy_names = self.get_policies(vhost)
  334. if policy_names is not None:
  335. for name in policy_names:
  336. self.delete_policy(vhost, name)
  337. self.delete_vhost(vhost=vhost)
  338. time.sleep(1)
  339. self.delete_connections(vhost=vhost)