#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
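"""Spark engine controller for FATE Flow.

Launches task executors via ``spark-submit`` and, when
``DEPENDENT_DISTRIBUTION`` is enabled, ships FATE's source code and Python
environment to executors as archives.
"""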
import os

from fate_arch.common import EngineType

from fate_flow.controller.engine_controller.engine import EngineABC
from fate_flow.db.db_models import Task
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.run_status import TaskStatus
from fate_flow.entity.types import KillProcessRetCode, WorkerName, FateDependenceName, FateDependenceStorageEngine
from fate_flow.manager.resource_manager import ResourceManager
from fate_flow.manager.worker_manager import WorkerManager
from fate_flow.settings import DEPENDENT_DISTRIBUTION
from fate_flow.utils import job_utils, process_utils
from fate_flow.utils.log_utils import schedule_logger


class SparkEngine(EngineABC):
    def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs):
        spark_home = ServerRegistry.FATE_ON_SPARK.get("spark", {}).get("home")
        if not spark_home:
            # Fall back to the pyspark package location when no Spark home
            # is configured in conf/service_conf.yaml.
            try:
                import pyspark
                spark_home = pyspark.__path__[0]
            except Exception as e:
                raise RuntimeError("cannot import pyspark") from e
        # additional configs
        spark_submit_config = run_parameters.spark_run
        deploy_mode = spark_submit_config.get("deploy-mode", "client")
        if deploy_mode != "client":
            raise ValueError(f"deploy mode {deploy_mode} not supported")

        spark_submit_cmd = os.path.join(spark_home, "bin/spark-submit")
        executable = [spark_submit_cmd, f"--name={task.f_task_id}#{task.f_role}"]
        for k, v in spark_submit_config.items():
            if k != "conf":
                executable.append(f"--{k}={v}")
        for ck, cv in spark_submit_config.get("conf", {}).items():
            executable.append("--conf")
            executable.append(f"{ck}={cv}")
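        # Illustrative only -- with a hypothetical spark_run of
        # {"num-executors": 2, "conf": {"spark.executor.memory": "4g"}},
        # the command assembled above resembles:
        #   <spark_home>/bin/spark-submit --name=<task_id>#<role> \
        #       --num-executors=2 --conf spark.executor.memory=4g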
        extra_env = {"SPARK_HOME": spark_home}
        if DEPENDENT_DISTRIBUTION:
            # Ship FATE's source code and Python env to executors as archives.
            dependence = Dependence()
            dependence.init(provider=ComponentProvider(**task.f_provider_info))
            executor_env_pythonpath, executor_python_env, driver_python_env, archives = \
                dependence.get_task_dependence_info()
            schedule_logger(task.f_job_id).info(f"executor_env_python {executor_python_env}, "
                                                f"driver_env_python {driver_python_env}, archives {archives}")
            executable.extend([
                "--archives", archives,
                "--conf", f"spark.pyspark.python={executor_python_env}",
                "--conf", f"spark.executorEnv.PYTHONPATH={executor_env_pythonpath}",
                "--conf", f"spark.pyspark.driver.python={driver_python_env}",
            ])
        return WorkerManager.start_task_worker(worker_name=WorkerName.TASK_EXECUTOR, task=task,
                                               task_parameters=run_parameters, executable=executable,
                                               extra_env=extra_env)

    def kill(self, task):
        kill_status_code = process_utils.kill_task_executor_process(task)
        # session stop
        if kill_status_code is KillProcessRetCode.KILLED or task.f_status not in {TaskStatus.WAITING}:
            job_utils.start_session_stop(task)

    def is_alive(self, task):
        return process_utils.check_process(pid=int(task.f_run_pid), task=task)
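
# A minimal sketch of the job conf section this engine consumes; keys are
# standard spark-submit options and every value below is hypothetical:
#
#   "spark_run": {
#       "deploy-mode": "client",
#       "num-executors": 4,
#       "executor-memory": "4g",
#       "conf": {"spark.task.cpus": "2"}
#   }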


class Dependence:
    dependence_config = None

    @classmethod
    def init(cls, provider):
        cls.set_version_dependence(provider)

    @classmethod
    def set_version_dependence(cls, provider, storage_engine=FateDependenceStorageEngine.HDFS.value):
        dependence_config = {}
        for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
            dependencies_storage_info = DependenceRegistry.get_dependencies_storage_meta(
                storage_engine=storage_engine,
                version=provider.version,
                type=dependence_type,
                get_or_one=True,
            )
            dependence_config[dependence_type] = dependencies_storage_info.to_dict()
        cls.dependence_config = dependence_config
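
    # After init(), cls.dependence_config is keyed by dependence type, each
    # entry holding the stored meta as a dict. Sketch of the shape (field
    # values hypothetical):
    #   {FateDependenceName.Fate_Source_Code.value: {
    #        "f_dependencies_conf": {"executor_env_pythonpath": "...",
    #                                "archives": "..."}, ...},
    #    FateDependenceName.Python_Env.value: {
    #        "f_dependencies_conf": {"executor_python": "...",
    #                                "driver_python": "...",
    #                                "archives": "..."}, ...}}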

    @classmethod
    def get_task_dependence_info(cls):
        return (cls.get_executor_env_pythonpath(), cls.get_executor_python_env(),
                cls.get_driver_python_env(), cls.get_archives())

    @classmethod
    def get_executor_env_pythonpath(cls):
        return cls.dependence_config.get(FateDependenceName.Fate_Source_Code.value).get(
            "f_dependencies_conf").get("executor_env_pythonpath")

    @classmethod
    def get_executor_python_env(cls):
        return cls.dependence_config.get(FateDependenceName.Python_Env.value).get(
            "f_dependencies_conf").get("executor_python")

    @classmethod
    def get_driver_python_env(cls):
        return cls.dependence_config.get(FateDependenceName.Python_Env.value).get(
            "f_dependencies_conf").get("driver_python")

    @classmethod
    def get_archives(cls, storage_engine=FateDependenceStorageEngine.HDFS.value):
        # Build a comma-separated archives list, each entry prefixed with the
        # storage engine's name node address.
        archives = []
        name_node = ResourceManager.get_engine_registration_info(
            engine_type=EngineType.STORAGE,
            engine_name=storage_engine,
        ).f_engine_config.get("name_node")
        for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
            archives.append(
                name_node + cls.dependence_config.get(dependence_type).get("f_dependencies_conf").get("archives")
            )
        return ",".join(archives)
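
    # Illustrative only: with a hypothetical name node of "hdfs://fate-cluster"
    # and hypothetical archive paths, the returned string resembles
    # "hdfs://fate-cluster/fate/fate_code.tar.gz,hdfs://fate-cluster/fate/python_env.tar.gz".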