fate_flow_server.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # init env. must be the first import
  17. import fate_flow as _
  18. import logging
  19. import os
  20. import signal
  21. import sys
  22. import traceback
  23. import grpc
  24. from werkzeug.serving import run_simple
  25. from fate_arch.common import file_utils
  26. from fate_arch.common.versions import get_versions
  27. from fate_arch.metastore.db_models import init_database_tables as init_arch_db
  28. from fate_arch.protobuf.python import proxy_pb2_grpc
  29. from fate_flow.apps import app
  30. from fate_flow.controller.version_controller import VersionController
  31. from fate_flow.db.component_registry import ComponentRegistry
  32. from fate_flow.db.config_manager import ConfigManager
  33. from fate_flow.db.db_models import init_database_tables as init_flow_db
  34. from fate_flow.db.db_services import service_db
  35. from fate_flow.db.key_manager import RsaKeyManager
  36. from fate_flow.db.runtime_config import RuntimeConfig
  37. from fate_flow.detection.detector import Detector, FederatedDetector
  38. from fate_flow.entity.types import ProcessRole
  39. from fate_flow.hook import HookManager
  40. from fate_flow.manager.provider_manager import ProviderManager
  41. from fate_flow.scheduler.dag_scheduler import DAGScheduler
  42. from fate_flow.settings import (
  43. GRPC_OPTIONS, GRPC_PORT, GRPC_SERVER_MAX_WORKERS, HOST, HTTP_PORT,
  44. access_logger, database_logger, detect_logger, stat_logger,
  45. )
  46. from fate_flow.utils.base_utils import get_fate_flow_directory
  47. from fate_flow.utils.grpc_utils import UnaryService
  48. from fate_flow.utils.log_utils import schedule_logger
  49. from fate_flow.utils.xthread import ThreadPoolExecutor
  50. if __name__ == '__main__':
  51. stat_logger.info(
  52. f'project base: {file_utils.get_project_base_directory()}, '
  53. f'fate base: {file_utils.get_fate_directory()}, '
  54. f'fate flow base: {get_fate_flow_directory()}'
  55. )
  56. # init db
  57. init_flow_db()
  58. init_arch_db()
  59. # init runtime config
  60. import argparse
  61. parser = argparse.ArgumentParser()
  62. parser.add_argument('--version', default=False, help="fate flow version", action='store_true')
  63. parser.add_argument('--debug', default=False, help="debug mode", action='store_true')
  64. args = parser.parse_args()
  65. if args.version:
  66. print(get_versions())
  67. sys.exit(0)
  68. # todo: add a general init steps?
  69. RuntimeConfig.DEBUG = args.debug
  70. if RuntimeConfig.DEBUG:
  71. stat_logger.info("run on debug mode")
  72. ConfigManager.load()
  73. RuntimeConfig.init_env()
  74. RuntimeConfig.init_config(JOB_SERVER_HOST=HOST, HTTP_PORT=HTTP_PORT)
  75. RuntimeConfig.set_process_role(ProcessRole.DRIVER)
  76. RuntimeConfig.set_service_db(service_db())
  77. RuntimeConfig.SERVICE_DB.register_flow()
  78. RuntimeConfig.SERVICE_DB.register_models()
  79. ComponentRegistry.load()
  80. default_algorithm_provider = ProviderManager.register_default_providers()
  81. RuntimeConfig.set_component_provider(default_algorithm_provider)
  82. ComponentRegistry.load()
  83. HookManager.init()
  84. RsaKeyManager.init()
  85. VersionController.init()
  86. Detector(interval=5 * 1000, logger=detect_logger).start()
  87. FederatedDetector(interval=10 * 1000, logger=detect_logger).start()
  88. DAGScheduler(interval=2 * 1000, logger=schedule_logger()).start()
  89. peewee_logger = logging.getLogger('peewee')
  90. peewee_logger.propagate = False
  91. # fate_arch.common.log.ROpenHandler
  92. peewee_logger.addHandler(database_logger.handlers[0])
  93. peewee_logger.setLevel(database_logger.level)
  94. thread_pool_executor = ThreadPoolExecutor(max_workers=GRPC_SERVER_MAX_WORKERS)
  95. stat_logger.info(f"start grpc server thread pool by {thread_pool_executor._max_workers} max workers")
  96. server = grpc.server(thread_pool=thread_pool_executor, options=GRPC_OPTIONS)
  97. proxy_pb2_grpc.add_DataTransferServiceServicer_to_server(UnaryService(), server)
  98. server.add_insecure_port(f"{HOST}:{GRPC_PORT}")
  99. server.start()
  100. print("FATE Flow grpc server start successfully")
  101. stat_logger.info("FATE Flow grpc server start successfully")
  102. # start http server
  103. try:
  104. print("FATE Flow http server start...")
  105. stat_logger.info("FATE Flow http server start...")
  106. werkzeug_logger = logging.getLogger("werkzeug")
  107. for h in access_logger.handlers:
  108. werkzeug_logger.addHandler(h)
  109. run_simple(hostname=HOST, port=HTTP_PORT, application=app, threaded=True, use_reloader=RuntimeConfig.DEBUG, use_debugger=RuntimeConfig.DEBUG)
  110. except Exception:
  111. traceback.print_exc()
  112. os.kill(os.getpid(), signal.SIGKILL)