db_services.py 13 KB


  1. #
  2. # Copyright 2021 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import abc
  17. import atexit
  18. import json
  19. import time
  20. from functools import wraps
  21. from queue import Queue
  22. from threading import Thread
  23. from urllib import parse
  24. from kazoo.client import KazooClient
  25. from kazoo.exceptions import NodeExistsError, NoNodeError, ZookeeperError
  26. from kazoo.security import make_digest_acl
  27. from shortuuid import ShortUUID
  28. from fate_arch.common.versions import get_fate_version
  29. from fate_flow.db.service_registry import ServerRegistry
  30. from fate_flow.entity.instance import FlowInstance
  31. from fate_flow.errors.error_services import *
  32. from fate_flow.settings import (
  33. FATE_FLOW_MODEL_TRANSFER_ENDPOINT, GRPC_PORT, HOST, HTTP_PORT, NGINX_HOST, NGINX_HTTP_PORT,
  34. RANDOM_INSTANCE_ID, USE_REGISTRY, ZOOKEEPER, ZOOKEEPER_REGISTRY, stat_logger,
  35. )
  36. from fate_flow.utils.model_utils import models_group_by_party_model_id_and_model_version
  37. model_download_endpoint = f'http://{NGINX_HOST}:{NGINX_HTTP_PORT}{FATE_FLOW_MODEL_TRANSFER_ENDPOINT}'
  38. instance_id = ShortUUID().random(length=8) if RANDOM_INSTANCE_ID else f'flow-{HOST}-{HTTP_PORT}'
  39. server_instance = (
  40. f'{HOST}:{GRPC_PORT}',
  41. json.dumps({
  42. 'instance_id': instance_id,
  43. 'timestamp': round(time.time() * 1000),
  44. 'version': get_fate_version() or '',
  45. 'host': HOST,
  46. 'grpc_port': GRPC_PORT,
  47. 'http_port': HTTP_PORT,
  48. }),
  49. )
  50. def check_service_supported(method):
  51. """Decorator to check if `service_name` is supported.
  52. The attribute `supported_services` MUST be defined in class.
  53. The first and second arguments of `method` MUST be `self` and `service_name`.
  54. :param Callable method: The class method.
  55. :return: The inner wrapper function.
  56. :rtype: Callable
  57. """
  58. @wraps(method)
  59. def magic(self, service_name, *args, **kwargs):
  60. if service_name not in self.supported_services:
  61. raise ServiceNotSupported(service_name=service_name)
  62. return method(self, service_name, *args, **kwargs)
  63. return magic
  64. def get_model_download_url(party_model_id, model_version):
  65. """Get the full url of model download.
  66. :param str party_model_id: The party model id, `#` will be replaced with `~`.
  67. :param str model_version: The model version.
  68. :return: The download url.
  69. :rtype: str
  70. """
  71. return '{endpoint}/{model_id}/{model_version}'.format(
  72. endpoint=model_download_endpoint,
  73. model_id=party_model_id.replace('#', '~'),
  74. model_version=model_version,
  75. )
  76. class ServicesDB(abc.ABC):
  77. """Database for storage service urls.
  78. Abstract base class for the real backends.
  79. """
  80. @property
  81. @abc.abstractmethod
  82. def supported_services(self):
  83. """The names of supported services.
  84. The returned list SHOULD contain `fateflow` (model download) and `servings` (FATE-Serving).
  85. :return: The service names.
  86. :rtype: list
  87. """
  88. pass
  89. @abc.abstractmethod
  90. def _insert(self, service_name, service_url, value=''):
  91. pass
  92. @check_service_supported
  93. def insert(self, service_name, service_url, value=''):
  94. """Insert a service url to database.
  95. :param str service_name: The service name.
  96. :param str service_url: The service url.
  97. :return: None
  98. """
  99. try:
  100. self._insert(service_name, service_url, value)
  101. except ServicesError as e:
  102. stat_logger.exception(e)
  103. @abc.abstractmethod
  104. def _delete(self, service_name, service_url):
  105. pass
  106. @check_service_supported
  107. def delete(self, service_name, service_url):
  108. """Delete a service url from database.
  109. :param str service_name: The service name.
  110. :param str service_url: The service url.
  111. :return: None
  112. """
  113. try:
  114. self._delete(service_name, service_url)
  115. except ServicesError as e:
  116. stat_logger.exception(e)
  117. def register_model(self, party_model_id, model_version):
  118. """Call `self.insert` for insert a service url to database.
  119. Currently, only `fateflow` (model download) urls are supported.
  120. :param str party_model_id: The party model id, `#` will be replaced with `_`.
  121. :param str model_version: The model version.
  122. :return: None
  123. """
  124. self.insert('fateflow', get_model_download_url(party_model_id, model_version))
  125. def unregister_model(self, party_model_id, model_version):
  126. """Call `self.delete` for delete a service url from database.
  127. Currently, only `fateflow` (model download) urls are supported.
  128. :param str party_model_id: The party model id, `#` will be replaced with `_`.
  129. :param str model_version: The model version.
  130. :return: None
  131. """
  132. self.delete('fateflow', get_model_download_url(party_model_id, model_version))
  133. def register_flow(self):
  134. """Call `self.insert` for insert the flow server address to databae.
  135. :return: None
  136. """
  137. self.insert('flow-server', *server_instance)
  138. def unregister_flow(self):
  139. """Call `self.delete` for delete the flow server address from databae.
  140. :return: None
  141. """
  142. self.delete('flow-server', server_instance[0])
  143. @abc.abstractmethod
  144. def _get_urls(self, service_name, with_values=False):
  145. pass
  146. @check_service_supported
  147. def get_urls(self, service_name, with_values=False):
  148. """Query service urls from database. The urls may belong to other nodes.
  149. Currently, only `fateflow` (model download) urls and `servings` (FATE-Serving) urls are supported.
  150. `fateflow` is a url containing scheme, host, port and path,
  151. while `servings` only contains host and port.
  152. :param str service_name: The service name.
  153. :return: The service urls.
  154. :rtype: list
  155. """
  156. try:
  157. return self._get_urls(service_name, with_values)
  158. except ServicesError as e:
  159. stat_logger.exception(e)
  160. return []
  161. def register_models(self):
  162. """Register all service urls of each model to database on this node.
  163. :return: None
  164. """
  165. for model in models_group_by_party_model_id_and_model_version():
  166. self.register_model(model.f_party_model_id, model.f_model_version)
  167. def unregister_models(self):
  168. """Unregister all service urls of each model to database on this node.
  169. :return: None
  170. """
  171. for model in models_group_by_party_model_id_and_model_version():
  172. self.unregister_model(model.f_party_model_id, model.f_model_version)
  173. def get_servers(self):
  174. servers = {}
  175. for znode, value in self.get_urls('flow-server', True):
  176. instance = FlowInstance(**json.loads(value))
  177. servers[instance.instance_id] = instance
  178. return servers
  179. class ZooKeeperDB(ServicesDB):
  180. """ZooKeeper Database
  181. """
  182. znodes = ZOOKEEPER_REGISTRY
  183. supported_services = znodes.keys()
  184. def __init__(self):
  185. hosts = ZOOKEEPER.get('hosts')
  186. if not isinstance(hosts, list) or not hosts:
  187. raise ZooKeeperNotConfigured()
  188. client_kwargs = {'hosts': hosts}
  189. use_acl = ZOOKEEPER.get('use_acl', False)
  190. if use_acl:
  191. username = ZOOKEEPER.get('user')
  192. password = ZOOKEEPER.get('password')
  193. if not username or not password:
  194. raise MissingZooKeeperUsernameOrPassword()
  195. client_kwargs['default_acl'] = [make_digest_acl(username, password, all=True)]
  196. client_kwargs['auth_data'] = [('digest', ':'.join([username, password]))]
  197. try:
  198. # `KazooClient` is thread-safe, it contains `_thread.RLock` and can not be pickle.
  199. # So be careful when using `self.client` outside the class.
  200. self.client = KazooClient(**client_kwargs)
  201. self.client.start()
  202. except ZookeeperError as e:
  203. raise ZooKeeperBackendError(error_message=repr(e))
  204. atexit.register(self.client.stop)
  205. self.znodes_list = Queue()
  206. Thread(target=self._watcher).start()
  207. def _insert(self, service_name, service_url, value=''):
  208. znode = self._get_znode_path(service_name, service_url)
  209. value = value.encode('utf-8')
  210. try:
  211. self.client.create(znode, value, ephemeral=True, makepath=True)
  212. except NodeExistsError:
  213. stat_logger.warning(f'Znode `{znode}` exists, add it to watch list.')
  214. self.znodes_list.put((znode, value))
  215. except ZookeeperError as e:
  216. raise ZooKeeperBackendError(error_message=repr(e))
  217. def _delete(self, service_name, service_url):
  218. znode = self._get_znode_path(service_name, service_url)
  219. try:
  220. self.client.delete(znode)
  221. except NoNodeError:
  222. stat_logger.warning(f'Znode `{znode}` not found, ignore deletion.')
  223. except ZookeeperError as e:
  224. raise ZooKeeperBackendError(error_message=repr(e))
  225. def _get_znode_path(self, service_name, service_url):
  226. """Get the znode path by service_name.
  227. :param str service_name: The service name.
  228. :param str service_url: The service url.
  229. :return: The znode path composed of `self.znodes[service_name]` and escaped `service_url`.
  230. :rtype: str
  231. :example:
  232. >>> self._get_znode_path('fateflow', 'http://127.0.0.1:9380/v1/model/transfer/arbiter-10000_guest-9999_host-10000_model/202105060929263278441')
  233. '/FATE-SERVICES/flow/online/transfer/providers/http%3A%2F%2F127.0.0.1%3A9380%2Fv1%2Fmodel%2Ftransfer%2Farbiter-10000_guest-9999_host-10000_model%2F202105060929263278441'
  234. """
  235. return '/'.join([self.znodes[service_name], parse.quote(service_url, safe='')])
  236. def _get_urls(self, service_name, with_values=False):
  237. try:
  238. _urls = self.client.get_children(self.znodes[service_name])
  239. except ZookeeperError as e:
  240. raise ZooKeeperBackendError(error_message=repr(e))
  241. urls = []
  242. for url in _urls:
  243. url = parse.unquote(url)
  244. data = ''
  245. znode = self._get_znode_path(service_name, url)
  246. if service_name == 'servings':
  247. url = parse.urlparse(url).netloc or url
  248. if with_values:
  249. try:
  250. data = self.client.get(znode)
  251. except NoNodeError:
  252. stat_logger.warning(f'Znode `{znode}` not found, return empty value.')
  253. except ZookeeperError as e:
  254. raise ZooKeeperBackendError(error_message=repr(e))
  255. else:
  256. data = data[0].decode('utf-8')
  257. urls.append((url, data) if with_values else url)
  258. return urls
  259. def _watcher(self):
  260. while True:
  261. znode, value = self.znodes_list.get()
  262. try:
  263. self.client.create(znode, value, ephemeral=True, makepath=True)
  264. except NodeExistsError:
  265. stat = self.client.exists(znode)
  266. if stat is not None:
  267. if stat.owner_session_id is None:
  268. stat_logger.warning(f'Znode `{znode}` is not an ephemeral node.')
  269. continue
  270. if stat.owner_session_id == self.client.client_id[0]:
  271. stat_logger.warning(f'Duplicate znode `{znode}`.')
  272. continue
  273. self.znodes_list.put((znode, value))
  274. class FallbackDB(ServicesDB):
  275. """Fallback Database.
  276. This class get the service url from `conf/service_conf.yaml`
  277. It cannot insert or delete the service url.
  278. """
  279. supported_services = (
  280. 'fateflow',
  281. 'flow-server',
  282. 'servings',
  283. )
  284. def _insert(self, *args, **kwargs):
  285. pass
  286. def _delete(self, *args, **kwargs):
  287. pass
  288. def _get_urls(self, service_name, with_values=False):
  289. if service_name == 'fateflow':
  290. return [(model_download_endpoint, '')] if with_values else [model_download_endpoint]
  291. if service_name == 'flow-server':
  292. return [server_instance] if with_values else [server_instance[0]]
  293. urls = getattr(ServerRegistry, service_name.upper(), [])
  294. if isinstance(urls, dict):
  295. urls = urls.get('hosts', [])
  296. if not isinstance(urls, list):
  297. urls = [urls]
  298. return [(url, '') for url in urls] if with_values else urls
  299. def service_db():
  300. """Initialize services database.
  301. Currently only ZooKeeper is supported.
  302. :return ZooKeeperDB if `use_registry` is `True`, else FallbackDB.
  303. FallbackDB is a compatible class and it actually does nothing.
  304. """
  305. if not USE_REGISTRY:
  306. return FallbackDB()
  307. if isinstance(USE_REGISTRY, str):
  308. if USE_REGISTRY.lower() == 'zookeeper':
  309. return ZooKeeperDB()
  310. # backward compatibility
  311. return ZooKeeperDB()