|
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import logging
- import json
- import requests
- from requests.adapters import HTTPAdapter
- from urllib3.util.retry import Retry
- from fate_arch.common.log import getLogger
- logger = getLogger()
- MAX_RETRIES = 10
- MAX_REDIRECT = 5
- BACKOFF_FACTOR = 1
- # sleep time equips to {BACKOFF_FACTOR} * (2 ** ({NUMBER_OF_TOTALRETRIES} - 1))
- CLUSTER = 'clusters/{}'
- TENANT = 'tenants/{}'
- # APIs are refer to https://pulsar.apache.org/admin-rest-api/?version=2.7.0&apiversion=v2
- class PulsarManager():
- def __init__(self, host: str, port: str, runtime_config: dict = {}):
- self.service_url = "http://{}:{}/admin/v2/".format(host, port)
- self.runtime_config = runtime_config
- # create session is used to construct url and request parameters
- def _create_session(self):
- # retry mechanism refers to
- # https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry
- retry = Retry(total=MAX_RETRIES, redirect=MAX_REDIRECT,
- backoff_factor=BACKOFF_FACTOR)
- s = requests.Session()
- # initialize headers
- s.headers.update({'Content-Type': 'application/json'})
- http_adapter = HTTPAdapter(max_retries=retry)
- s.mount('http://', http_adapter)
- s.mount('https://', http_adapter)
- return s
- # allocator
- def get_allocator(self, allocator: str = 'default'):
- session = self._create_session()
- response = session.get(
- self.service_url + 'broker-stats/allocator-stats/{}'.format(allocator))
- return response
- # cluster
- def get_cluster(self, cluster_name: str = ''):
- session = self._create_session()
- response = session.get(
- self.service_url + CLUSTER.format(cluster_name))
- return response
- def delete_cluster(self, cluster_name: str = ''):
- session = self._create_session()
- response = session.delete(
- self.service_url + CLUSTER.format(cluster_name))
- return response
- # service_url need to provide "http://" prefix
- def create_cluster(self, cluster_name: str, broker_url: str, service_url: str = '',
- service_url_tls: str = '', broker_url_tls: str = '',
- proxy_url: str = '', proxy_protocol: str = "SNI", peer_cluster_names: list = [],
- ):
- # initialize data
- data = {
- 'serviceUrl': service_url,
- 'serviceUrlTls': service_url_tls,
- 'brokerServiceUrl': broker_url,
- 'brokerServiceUrlTls': broker_url_tls,
- 'peerClusterNames': peer_cluster_names,
- 'proxyServiceUrl': proxy_url,
- 'proxyProtocol': proxy_protocol
- }
- session = self._create_session()
- response = session.put(
- self.service_url + CLUSTER.format(cluster_name), data=json.dumps(data))
- return response
- def update_cluster(self, cluster_name: str, broker_url: str, service_url: str = '',
- service_url_tls: str = '', broker_url_tls: str = '',
- proxy_url: str = '', proxy_protocol: str = "SNI", peer_cluster_names: list = [],
- ):
- # initialize data
- data = {
- 'serviceUrl': service_url,
- 'serviceUrlTls': service_url_tls,
- 'brokerServiceUrl': broker_url,
- 'brokerServiceUrlTls': broker_url_tls,
- 'peerClusterNames': peer_cluster_names,
- 'proxyServiceUrl': proxy_url,
- 'proxyProtocol': proxy_protocol
- }
- session = self._create_session()
- response = session.post(
- self.service_url + CLUSTER.format(cluster_name), data=json.dumps(data))
- return response
- # tenants
- def get_tenant(self, tenant: str = ''):
- session = self._create_session()
- response = session.get(self.service_url + TENANT.format(tenant))
- return response
- def create_tenant(self, tenant: str, admins: list, clusters: list):
- session = self._create_session()
- data = {'adminRoles': admins,
- 'allowedClusters': clusters}
- response = session.put(
- self.service_url + TENANT.format(tenant), data=json.dumps(data))
- return response
- def delete_tenant(self, tenant: str):
- session = self._create_session()
- response = session.delete(
- self.service_url + TENANT.format(tenant))
- return response
- def update_tenant(self, tenant: str, admins: list, clusters: list):
- session = self._create_session()
- data = {'adminRoles': admins,
- 'allowedClusters': clusters}
- response = session.post(
- self.service_url + TENANT.format(tenant), data=json.dumps(data))
- return response
- # namespace
- def get_namespace(self, tenant: str):
- session = self._create_session()
- response = session.get(
- self.service_url + 'namespaces/{}'.format(tenant))
- return response
- # 'replication_clusters' is always required
- def create_namespace(self, tenant: str, namespace: str, policies: dict = {}):
- session = self._create_session()
- response = session.put(
- self.service_url + 'namespaces/{}/{}'.format(tenant, namespace),
- data=json.dumps(policies)
- )
- return response
- def delete_namespace(self, tenant: str, namespace: str):
- session = self._create_session()
- response = session.delete(
- self.service_url +
- 'namespaces/{}/{}'.format(tenant, namespace)
- )
- return response
- def set_clusters_to_namespace(self, tenant: str, namespace: str, clusters: list):
- session = self._create_session()
- response = session.post(
- self.service_url + 'namespaces/{}/{}/replication'.format(tenant, namespace), json=clusters
- )
- return response
- def get_cluster_from_namespace(self, tenant: str, namespace: str):
- session = self._create_session()
- response = session.get(
- self.service_url +
- 'namespaces/{}/{}/replication'.format(tenant, namespace)
- )
- return response
- def set_subscription_expiration_time(self, tenant: str, namespace: str, mintues: int = 0):
- session = self._create_session()
- response = session.post(
- self.service_url + 'namespaces/{}/{}/subscriptionExpirationTime'.format(tenant, namespace), json=mintues
- )
- return response
- def set_message_ttl(self, tenant: str, namespace: str, mintues: int = 0):
- session = self._create_session()
- response = session.post(
- # the API accepts data as seconds
- self.service_url + 'namespaces/{}/{}/messageTTL'.format(tenant, namespace), json=mintues * 60
- )
- return response
- def unsubscribe_namespace_all_topics(self, tenant: str, namespace: str, subscription_name: str):
- session = self._create_session()
- response = session.post(
- self.service_url +
- 'namespaces/{}/{}/unsubscribe/{}'.format(
- tenant, namespace, subscription_name)
- )
- return response
- def set_retention(self, tenant: str, namespace: str,
- retention_time_in_minutes: int = 0, retention_size_in_MB: int = 0):
- session = self._create_session()
- data = {'retentionTimeInMinutes': retention_time_in_minutes,
- 'retentionSizeInMB': retention_size_in_MB}
- response = session.post(
- self.service_url +
- 'namespaces/{}/{}/retention'.format(tenant, namespace), data=json.dumps(data)
- )
- return response
- def remove_retention(self, tenant: str, namespace: str):
- session = self._create_session()
- response = session.delete(
- self.service_url +
- 'namespaces/{}/{}/retention'.format(tenant, namespace),
- )
- return response
- # topic
- def unsubscribe_topic(self, tenant: str, namespace: str, topic: str, subscription_name: str):
- session = self._create_session()
- response = session.delete(
- self.service_url +
- 'persistent/{}/{}/{}/subscription/{}'.format(
- tenant, namespace, topic, subscription_name)
- )
- return response
|