cluster_scheduler.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. #
  2. # Copyright 2022 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from concurrent.futures import ThreadPoolExecutor, as_completed
  17. from unittest import result
  18. from fate_flow.db.runtime_config import RuntimeConfig
  19. from fate_flow.entity import RetCode
  20. from fate_flow.scheduler import SchedulerBase
  21. from fate_flow.utils.api_utils import cluster_api
  22. from fate_flow.utils.log_utils import failed_log, schedule_logger, start_log
  23. class ClusterScheduler(SchedulerBase):
  24. @classmethod
  25. def cluster_command(cls, endpoint, json_body):
  26. log_msg = f'sending {endpoint} cluster federated command'
  27. schedule_logger().info(start_log(msg=log_msg))
  28. instance_list = RuntimeConfig.SERVICE_DB.get_servers()
  29. result = {}
  30. with ThreadPoolExecutor(max_workers=len(instance_list)) as executor:
  31. futures = {
  32. executor.submit(
  33. cluster_api,
  34. method='POST',
  35. host=instance.host,
  36. port=instance.http_port,
  37. endpoint=endpoint,
  38. json_body=json_body,
  39. ): instance_id
  40. for instance_id, instance in instance_list.items()
  41. }
  42. for future in as_completed(futures):
  43. instance = instance_list[futures[future]]
  44. try:
  45. response = future.result()
  46. except Exception as e:
  47. schedule_logger().exception(e)
  48. response = {
  49. 'retcode': RetCode.FEDERATED_ERROR,
  50. 'retmsg': f'Federated schedule error: {instance.instance_id}\n{e}',
  51. }
  52. else:
  53. if response['retcode'] != RetCode.SUCCESS:
  54. schedule_logger().error(failed_log(msg=log_msg, detail=response))
  55. result[instance.instance_id] = response
  56. return result