spark.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

from fate_arch.common import EngineType
from fate_flow.controller.engine_controller.engine import EngineABC
from fate_flow.db.db_models import Task
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.run_status import TaskStatus
from fate_flow.entity.types import KillProcessRetCode, WorkerName, FateDependenceName, FateDependenceStorageEngine
from fate_flow.manager.resource_manager import ResourceManager
from fate_flow.manager.worker_manager import WorkerManager
from fate_flow.utils import job_utils, process_utils
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.settings import DEPENDENT_DISTRIBUTION
from fate_flow.utils.log_utils import schedule_logger


class SparkEngine(EngineABC):
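    # Launches a FATE task on Spark in client deploy mode: builds a
    # spark-submit command line from the task's spark_run parameters and
    # hands it to WorkerManager as the executable for the task executor.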
    def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs):
        spark_home = ServerRegistry.FATE_ON_SPARK.get("spark", {}).get("home")
        if not spark_home:
            try:
                import pyspark
                spark_home = pyspark.__path__[0]
            except ImportError as e:
                raise RuntimeError("can not import pyspark")
            except Exception as e:
                raise RuntimeError("can not import pyspark")
        # else:
        #     raise ValueError(f"spark home must be configured in conf/service_conf.yaml when run on cluster mode")

        # additional configs
        spark_submit_config = run_parameters.spark_run
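        # Only client deploy mode is supported; every other spark_run entry is
        # forwarded to spark-submit as a "--key=value" flag, and the optional
        # "conf" dict is expanded into repeated "--conf key=value" pairs.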
        deploy_mode = spark_submit_config.get("deploy-mode", "client")
        if deploy_mode not in ["client"]:
            raise ValueError(f"deploy mode {deploy_mode} not supported")

        spark_submit_cmd = os.path.join(spark_home, "bin/spark-submit")
        executable = [spark_submit_cmd, f"--name={task.f_task_id}#{task.f_role}"]
        for k, v in spark_submit_config.items():
            if k != "conf":
                executable.append(f"--{k}={v}")
        if "conf" in spark_submit_config:
            for ck, cv in spark_submit_config["conf"].items():
                executable.append(f"--conf")
                executable.append(f"{ck}={cv}")
        extra_env = {}
        extra_env["SPARK_HOME"] = spark_home
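        # When dependency distribution is enabled, ship the packaged Python
        # environment and FATE source code to the cluster via --archives and
        # point the executor/driver Python and PYTHONPATH at the shipped copies.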
        if DEPENDENT_DISTRIBUTION:
            dependence = Dependence()
            dependence.init(provider=ComponentProvider(**task.f_provider_info))
            executor_env_pythonpath, executor_python_env, driver_python_env, archives = dependence.get_task_dependence_info()
            schedule_logger(task.f_job_id).info(f"executor_env_python {executor_python_env},"
                                                f"driver_env_python {driver_python_env}, archives {archives}")
            executable.append(f'--archives')
            executable.append(archives)
            executable.append(f'--conf')
            executable.append(f'spark.pyspark.python={executor_python_env}')
            executable.append(f'--conf')
            executable.append(f'spark.executorEnv.PYTHONPATH={executor_env_pythonpath}')
            executable.append(f'--conf')
            executable.append(f'spark.pyspark.driver.python={driver_python_env}')
        return WorkerManager.start_task_worker(worker_name=WorkerName.TASK_EXECUTOR, task=task,
                                               task_parameters=run_parameters, executable=executable,
                                               extra_env=extra_env)

    def kill(self, task):
        kill_status_code = process_utils.kill_task_executor_process(task)
        # session stop
        if kill_status_code is KillProcessRetCode.KILLED or task.f_status not in {TaskStatus.WAITING}:
            job_utils.start_session_stop(task)

    def is_alive(self, task):
        return process_utils.check_process(pid=int(task.f_run_pid), task=task)


class Dependence:
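    # Caches the dependency storage metadata (FATE source code and Python env)
    # registered for the current provider version, and exposes the paths used
    # to build the spark-submit dependency options above.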
    dependence_config = None

    @classmethod
    def init(cls, provider):
        cls.set_version_dependence(provider)

    @classmethod
    def set_version_dependence(cls, provider, storage_engine=FateDependenceStorageEngine.HDFS.value):
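        # Look up the registered storage record for each dependency type at the
        # provider's version and cache it as a plain dict keyed by type.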
        dependence_config = {}
        for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
            dependencies_storage_info = DependenceRegistry.get_dependencies_storage_meta(storage_engine=storage_engine,
                                                                                         version=provider.version,
                                                                                         type=dependence_type,
                                                                                         get_or_one=True
                                                                                         )
            dependence_config[dependence_type] = dependencies_storage_info.to_dict()
        cls.dependence_config = dependence_config
    @classmethod
    def get_task_dependence_info(cls):
        return cls.get_executor_env_pythonpath(), cls.get_executor_python_env(), cls.get_driver_python_env(), \
               cls.get_archives()

    @classmethod
    def get_executor_env_pythonpath(cls):
        return cls.dependence_config.get(FateDependenceName.Fate_Source_Code.value).get("f_dependencies_conf").get(
            "executor_env_pythonpath")

    @classmethod
    def get_executor_python_env(cls):
        return cls.dependence_config.get(FateDependenceName.Python_Env.value).get("f_dependencies_conf").get(
            "executor_python")

    @classmethod
    def get_driver_python_env(cls):
        return cls.dependence_config.get(FateDependenceName.Python_Env.value).get("f_dependencies_conf").get(
            "driver_python")

    @classmethod
    def get_archives(cls, storage_engine=FateDependenceStorageEngine.HDFS.value):
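        # Prefix each registered archive path with the storage engine's name node
        # address and join them into the comma-separated value that spark-submit
        # expects for --archives.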
        archives = []
        name_node = ResourceManager.get_engine_registration_info(engine_type=EngineType.STORAGE,
                                                                 engine_name=storage_engine
                                                                 ).f_engine_config.get("name_node")
        for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
            archives.append(
                name_node + cls.dependence_config.get(dependence_type).get("f_dependencies_conf").get("archives")
            )
        return ','.join(archives)
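

# Rough shape of the spark-submit prefix assembled by SparkEngine.run() and
# passed to WorkerManager.start_task_worker as the executable; all concrete
# values below are hypothetical, for illustration only:
#
#   spark-submit --name=<task_id>#<role> \
#       --num-executors=2 \
#       --conf spark.executor.memory=4g \
#       --archives hdfs://namenode:9000/fate/python_env.tar.gz,hdfs://namenode:9000/fate/fate_code.tar.gz \
#       --conf spark.pyspark.python=python_env/bin/python \
#       --conf spark.executorEnv.PYTHONPATH=fate_code/python \
#       --conf spark.pyspark.driver.python=python_env/bin/python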