|
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import requests
- import time
- from fate_arch.common import log
- LOGGER = log.getLogger()
- C_HTTP_TEMPLATE = "http://{}/api/{}"
- C_COMMON_HTTP_HEADER = {'Content-Type': 'application/json'}
- """
- APIs are refered to https://rawcdn.githack.com/rabbitmq/rabbitmq-management/v3.8.3/priv/www/api/index.html
- """
- class RabbitManager:
- def __init__(self, user, password, endpoint, runtime_config=None):
- self.user = user
- self.password = password
- self.endpoint = endpoint
- # The runtime_config defines the parameters to create queue, exchange .etc
- self.runtime_config = runtime_config if runtime_config is not None else {}
- def create_user(self, user, password):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "users/" + user)
- body = {
- "password": password,
- "tags": ""
- }
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=body, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.create_user] {result}")
- if result.status_code == 201 or result.status_code == 204:
- return True
- else:
- return False
- def delete_user(self, user):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "users/" + user)
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.delete_user] {result}")
- return result
- def create_vhost(self, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts/" + vhost)
- result = requests.put(
- url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.create_vhost] {result}")
- self.add_user_to_vhost(self.user, vhost)
- return True
- def delete_vhost(self, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts/" + vhost)
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.delete_vhost] {result}")
- return result
- def delete_vhosts(self):
- result = self.get_vhosts()
- names = None
- try:
- if result.status_code == 200:
- names = [e["name"] for e in result.json()]
- except BaseException:
- names = None
- LOGGER.debug(f"[rabbitmanager.delete_vhosts] {names}")
- if names is not None:
- LOGGER.debug("[rabbitmanager.delete_vhosts]start to delete_vhosts")
- for name in names:
- self.delete_vhost(name)
- def get_vhosts(self):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "vhosts")
- result = requests.get(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.get_vhosts] {result}")
- return result
- def add_user_to_vhost(self, user, vhost):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("permissions", vhost, user))
- body = {
- "configure": ".*",
- "write": ".*",
- "read": ".*"
- }
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=body, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.add_user_to_vhost] {result}")
- if result.status_code == 201 or result.status_code == 204:
- return True
- else:
- return False
- def remove_user_from_vhost(self, user, vhost):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("permissions", vhost, user))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.remove_user_from_vhost] {result}")
- return result
- def get_exchanges(self, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}".format("exchanges", vhost))
- result = requests.get(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.get_exchanges] {result}")
- try:
- if result.status_code == 200:
- exchange_names = [e["name"] for e in result.json()]
- LOGGER.debug(f"[rabbitmanager.get_exchanges] exchange_names={exchange_names}")
- return exchange_names
- else:
- return None
- except BaseException:
- return None
- def create_exchange(self, vhost, exchange_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("exchanges", vhost, exchange_name))
- basic_config = {
- "type": "direct",
- "auto_delete": False,
- "durable": True,
- "internal": False,
- "arguments": {}
- }
- exchange_runtime_config = self.runtime_config.get("exchange", {})
- basic_config.update(exchange_runtime_config)
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=basic_config, auth=(self.user, self.password))
- LOGGER.debug(result)
- return result
- def delete_exchange(self, vhost, exchange_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("exchanges", vhost, exchange_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.delete_exchange] vhost={vhost}, exchange_name={exchange_name}, {result}")
- return result
- def get_policies(self, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}".format("policies", vhost))
- result = requests.get(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.get_policies] {result}")
- try:
- if result.status_code == 200:
- policies_names = [e["name"] for e in result.json()]
- LOGGER.debug(f"[rabbitmanager.get_policies] policies_names={policies_names}")
- return policies_names
- else:
- return None
- except BaseException:
- return None
- def delete_policy(self, vhost, policy_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("policies", vhost, policy_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.delete_policy] vhost={vhost}, policy_name={policy_name}, {result}")
- return result
- def create_queue(self, vhost, queue_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
- basic_config = {
- "auto_delete": False,
- "durable": True,
- "arguments": {}
- }
- queue_runtime_config = self.runtime_config.get("queue", {})
- basic_config.update(queue_runtime_config)
- LOGGER.debug(basic_config)
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=basic_config, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.create_queue] {result}")
- if result.status_code == 201 or result.status_code == 204:
- return True
- else:
- return False
- def get_queue(self, vhost, queue_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
- result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
- return result
- def get_queues(self, vhost):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}".format("queues", vhost))
- result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
- try:
- if result.status_code == 200:
- queue_names = [e["name"] for e in result.json()]
- LOGGER.debug(f"[rabbitmanager.get_all_queue] queue_names={queue_names}")
- return queue_names
- else:
- return None
- except BaseException:
- return None
- def delete_queue(self, vhost, queue_name):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}/{}".format("queues", vhost, queue_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.delete_queue] vhost={vhost}, queue_name={queue_name}, {result}")
- return result
- def get_connections(self):
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "connections")
- result = requests.get(url, headers=C_COMMON_HTTP_HEADER, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager.get_connections] {result}")
- return result
- def delete_connections(self, vhost=None):
- result = self.get_connections()
- names = None
- try:
- if result.status_code == 200:
- if vhost is None:
- names = [e["name"] for e in result.json()]
- else:
- names = [e["name"] for e in result.json() if e["vhost"] == vhost]
- except BaseException:
- names = None
- LOGGER.debug(f"[rabbitmanager.delete_connections] {names}")
- if names is not None:
- LOGGER.debug("[rabbitmanager.delete_connections] start....")
- for name in names:
- url = C_HTTP_TEMPLATE.format(
- self.endpoint, "{}/{}".format("connections", name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(result)
- def bind_exchange_to_queue(self, vhost, exchange_name, queue_name):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/e/{}/q/{}".format("bindings",
- vhost,
- exchange_name,
- queue_name))
- body = {
- "routing_key": queue_name,
- "arguments": {}
- }
- result = requests.post(
- url, headers=C_COMMON_HTTP_HEADER, json=body, auth=(self.user, self.password))
- LOGGER.debug(result)
- return result
- def unbind_exchange_to_queue(self, vhost, exchange_name, queue_name):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/e/{}/q/{}/{}".format("bindings",
- vhost,
- exchange_name,
- queue_name,
- queue_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(result)
- return result
- def _set_federated_upstream(self, upstream_host, vhost, receive_queue_name):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}/{}".format("parameters",
- "federation-upstream",
- vhost,
- receive_queue_name))
- upstream_runtime_config = self.runtime_config.get("upstream", {})
- upstream_runtime_config['uri'] = upstream_host
- upstream_runtime_config['queue'] = receive_queue_name.replace(
- "receive", "send", 1)
- body = {
- "value": upstream_runtime_config
- }
- LOGGER.debug(f"[rabbitmanager._set_federated_upstream]set_federated_upstream, url: {url} body: {body}")
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=body, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager._set_federated_upstream] {result}")
- if result.status_code != 201 and result.status_code != 204:
- LOGGER.debug(f"[rabbitmanager._set_federated_upstream] _set_federated_upstream fail. {result}")
- return False
- return True
- def _unset_federated_upstream(self, upstream_name, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}/{}".format("parameters",
- "federation-upstream",
- vhost,
- upstream_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(result)
- return result
- def _set_federated_queue_policy(self, vhost, receive_queue_name):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}".format("policies",
- vhost,
- receive_queue_name))
- body = {
- "pattern": '^' + receive_queue_name + '$',
- "apply-to": "queues",
- "definition":
- {
- "federation-upstream": receive_queue_name
- }
- }
- LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy]set_federated_queue_policy, url: {url} body: {body}")
- result = requests.put(url, headers=C_COMMON_HTTP_HEADER,
- json=body, auth=(self.user, self.password))
- LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy] {result}")
- if result.status_code != 201 and result.status_code != 204:
- LOGGER.debug(f"[rabbitmanager._set_federated_queue_policy] _set_federated_queue_policy fail. {result}")
- return False
- return True
- def _unset_federated_queue_policy(self, policy_name, vhost):
- url = C_HTTP_TEMPLATE.format(self.endpoint, "{}/{}/{}".format("policies",
- vhost,
- policy_name))
- result = requests.delete(url, auth=(self.user, self.password))
- LOGGER.debug(result)
- return result
- # Create federate queue with upstream
- def federate_queue(self, upstream_host, vhost, send_queue_name, receive_queue_name):
- time.sleep(0.1)
- LOGGER.debug(f"[rabbitmanager.federate_queue] create federate_queue {receive_queue_name}")
- result = self._set_federated_upstream(
- upstream_host, vhost, receive_queue_name)
- if result is False:
- # should be logged
- LOGGER.debug(f"[rabbitmanager.federate_queue] result_set_upstream fail.")
- return False
- result = self._set_federated_queue_policy(
- vhost, receive_queue_name)
- if result is False:
- LOGGER.debug(f"[rabbitmanager.federate_queue] result_set_policy fail.")
- return False
- return True
- def de_federate_queue(self, vhost, receive_queue_name):
- result = self._unset_federated_queue_policy(receive_queue_name, vhost)
- LOGGER.debug(
- f"delete federate queue policy status code: {result.status_code}")
- result = self._unset_federated_upstream(receive_queue_name, vhost)
- LOGGER.debug(
- f"delete federate queue upstream status code: {result.status_code}")
- return True
- def clean(self, vhost):
- time.sleep(1)
- queue_names = self.get_queues(vhost)
- if queue_names is not None:
- for name in queue_names:
- self.delete_queue(vhost, name)
- exchange_names = self.get_exchanges(vhost)
- if exchange_names is not None:
- for name in exchange_names:
- self.delete_exchange(vhost, name)
- policy_names = self.get_policies(vhost)
- if policy_names is not None:
- for name in policy_names:
- self.delete_policy(vhost, name)
- self.delete_vhost(vhost=vhost)
- time.sleep(1)
- self.delete_connections(vhost=vhost)
|