base.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import inspect
  17. import json
  18. import sys
  19. import traceback
  20. from base64 import b64encode
  21. from hmac import HMAC
  22. from time import time
  23. from urllib.parse import quote, urlencode
  24. from uuid import uuid1
  25. import requests
  26. from flow_sdk.client.api.base import BaseFlowAPI
  27. def _is_api_endpoint(obj):
  28. return isinstance(obj, BaseFlowAPI)
  29. class BaseFlowClient:
  30. API_BASE_URL = ''
  31. def __new__(cls, *args, **kwargs):
  32. self = super().__new__(cls)
  33. api_endpoints = inspect.getmembers(self, _is_api_endpoint)
  34. for name, api in api_endpoints:
  35. api_cls = type(api)
  36. api = api_cls(self)
  37. setattr(self, name, api)
  38. return self
  39. def __init__(self, ip, port, version, app_key=None, secret_key=None):
  40. self._http = requests.Session()
  41. self.ip = ip
  42. self.port = port
  43. self.version = version
  44. self.app_key = app_key if app_key and secret_key else None
  45. self.secret_key = secret_key if app_key and secret_key else None
  46. def _request(self, method, url, **kwargs):
  47. stream = kwargs.pop('stream', self._http.stream)
  48. prepped = requests.Request(method, self.API_BASE_URL + url, **kwargs).prepare()
  49. if self.app_key and self.secret_key:
  50. timestamp = str(round(time() * 1000))
  51. nonce = str(uuid1())
  52. signature = b64encode(HMAC(self.secret_key.encode('ascii'), b'\n'.join([
  53. timestamp.encode('ascii'),
  54. nonce.encode('ascii'),
  55. self.app_key.encode('ascii'),
  56. prepped.path_url.encode('ascii'),
  57. prepped.body if kwargs.get('json') else b'',
  58. urlencode(sorted(kwargs['data'].items()), quote_via=quote, safe='-._~').encode('ascii')
  59. if kwargs.get('data') and isinstance(kwargs['data'], dict) else b'',
  60. ]), 'sha1').digest()).decode('ascii')
  61. prepped.headers.update({
  62. 'TIMESTAMP': timestamp,
  63. 'NONCE': nonce,
  64. 'APP_KEY': self.app_key,
  65. 'SIGNATURE': signature,
  66. })
  67. try:
  68. response = self._http.send(prepped, stream=stream)
  69. except Exception as e:
  70. response = {
  71. 'retcode': 100,
  72. 'retmsg': str(e),
  73. }
  74. if 'connection refused' in response['retmsg'].lower():
  75. response['retmsg'] = 'Connection refused, Please check if the fate flow service is started'
  76. else:
  77. exc_type, exc_value, exc_traceback_obj = sys.exc_info()
  78. response['traceback'] = traceback.format_exception(exc_type, exc_value, exc_traceback_obj)
  79. return response
  80. @staticmethod
  81. def _decode_result(response):
  82. try:
  83. result = json.loads(response.content.decode('utf-8', 'ignore'), strict=False)
  84. except (TypeError, ValueError):
  85. return response
  86. else:
  87. return result
  88. def _handle_result(self, response):
  89. try:
  90. if isinstance(response, requests.models.Response):
  91. return response.json()
  92. elif isinstance(response, dict):
  93. return response
  94. else:
  95. return self._decode_result(response)
  96. except json.decoder.JSONDecodeError:
  97. res = {'retcode': 100,
  98. 'retmsg': "Internal server error. Nothing in response. You may check out the configuration in "
  99. "'FATE/conf/service_conf.yaml' and restart fate flow server."}
  100. return res
  101. def get(self, url, **kwargs):
  102. return self._request(method='get', url=url, **kwargs)
  103. def post(self, url, **kwargs):
  104. return self._request(method='post', url=url, **kwargs)