#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import deepcopy

import requests

from fate_flow.utils.log_utils import schedule_logger
from fate_flow.controller.engine_controller.engine import EngineABC
from fate_flow.db.runtime_config import RuntimeConfig
from fate_flow.entity.types import KillProcessRetCode
from fate_flow.entity.run_status import LinkisJobStatus
from fate_flow.settings import LINKIS_EXECUTE_ENTRANCE, LINKIS_SUBMIT_PARAMS, LINKIS_RUNTYPE, \
    LINKIS_LABELS, LINKIS_QUERT_STATUS, LINKIS_KILL_ENTRANCE, detect_logger
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.db.db_models import Task


class LinkisSparkEngine(EngineABC):
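    """Submit, stop and monitor FATE tasks on Spark through the Linkis entrance REST API."""
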
    def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs):
        linkis_execute_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                     ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                     LINKIS_EXECUTE_ENTRANCE)
        headers = {"Token-Code": ServerRegistry.LINKIS_SPARK_CONFIG.get("token_code"),
                   "Token-User": kwargs.get("user_name"),
                   "Content-Type": "application/json"}
        schedule_logger(task.f_job_id).info(f"headers:{headers}")
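        # Code string executed by Linkis on the remote engine: put FATE on sys.path,
        # run the task executor and report the result back to this job server.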
        python_path = ServerRegistry.LINKIS_SPARK_CONFIG.get("python_path")
        execution_code = 'import sys\nsys.path.append("{}")\n' \
                         'from fate_flow.worker.task_executor import TaskExecutor\n' \
                         'task_info = TaskExecutor.run_task(job_id="{}",component_name="{}",' \
                         'task_id="{}",task_version={},role="{}",party_id={},' \
                         'run_ip="{}",config="{}",job_server="{}")\n' \
                         'TaskExecutor.report_task_update_to_driver(task_info=task_info)'.format(
                             python_path, task.f_job_id, task.f_component_name, task.f_task_id,
                             task.f_task_version, task.f_role, task.f_party_id, RuntimeConfig.JOB_SERVER_HOST,
                             run_parameters_path, '{}:{}'.format(RuntimeConfig.JOB_SERVER_HOST, RuntimeConfig.HTTP_PORT))
        schedule_logger(task.f_job_id).info(f"execution code:{execution_code}")
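        # Start from the default Linkis submit parameters and overlay the per-job Spark settings that Linkis accepts.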
        params = deepcopy(LINKIS_SUBMIT_PARAMS)
        schedule_logger(task.f_job_id).info(f"spark run parameters:{run_parameters.spark_run}")
        for spark_key, v in run_parameters.spark_run.items():
            if spark_key in ["spark.executor.memory", "spark.driver.memory", "spark.executor.instances",
                             "wds.linkis.rm.yarnqueue"]:
                params["configuration"]["startup"][spark_key] = v
        data = {
            "method": LINKIS_EXECUTE_ENTRANCE,
            "params": params,
            "executeApplicationName": "spark",
            "executionCode": execution_code,
            "runType": LINKIS_RUNTYPE,
            "source": {},
            "labels": LINKIS_LABELS
        }
        schedule_logger(task.f_job_id).info(f'submit linkis spark, data:{data}')
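        # Keep the request body and headers in engine_conf; kill() and is_alive() read them back from task.f_engine_conf.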
        task_info = {
            "engine_conf": {}
        }
        task_info["engine_conf"]["data"] = data
        task_info["engine_conf"]["headers"] = headers
        res = requests.post(url=linkis_execute_url, headers=headers, json=data)
        schedule_logger(task.f_job_id).info(f"start linkis spark task: {res.text}")
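        # Linkis answers with HTTP 200 and a JSON body; a truthy "status" field in that body means the submission failed.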
        if res.status_code == 200:
            if res.json().get("status"):
                raise Exception(f"submit linkis spark failed: {res.json()}")
            task_info["engine_conf"]["execID"] = res.json().get("data").get("execID")
            task_info["engine_conf"]["taskID"] = res.json().get("data").get("taskID")
            schedule_logger(task.f_job_id).info('submit linkis spark success')
        else:
            raise Exception(f"submit linkis spark failed: {res.text}")
        return task_info

    @staticmethod
    def kill(task):
        linkis_query_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                   ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                   LINKIS_QUERT_STATUS.replace("execID",
                                                                               task.f_engine_conf.get("execID")))
        headers = task.f_engine_conf.get("headers")
        response = requests.get(linkis_query_url, headers=headers).json()
        schedule_logger(task.f_job_id).info(f"query task response:{response}")
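        # Only send a kill request when Linkis has not already reported the job as successfully finished.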
        if response.get("data").get("status") != LinkisJobStatus.SUCCESS:
            linkis_execute_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                         ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                         LINKIS_KILL_ENTRANCE.replace("execID",
                                                                                      task.f_engine_conf.get("execID")))
            schedule_logger(task.f_job_id).info(f"start stop task:{linkis_execute_url}")
            schedule_logger(task.f_job_id).info(f"headers: {headers}")
            kill_result = requests.get(linkis_execute_url, headers=headers)
            schedule_logger(task.f_job_id).info(f"kill result:{kill_result}")
            if kill_result.status_code == 200:
                pass
        return KillProcessRetCode.KILLED

    def is_alive(self, task):
        process_exist = True
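        # Poll the Linkis status endpoint; a FAILED status or any error while querying is treated as "not alive".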
        try:
            linkis_query_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                       ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                       LINKIS_QUERT_STATUS.replace("execID",
                                                                                   task.f_engine_conf.get("execID")))
            headers = task.f_engine_conf["headers"]
            response = requests.get(linkis_query_url, headers=headers).json()
            detect_logger.info(response)
            if response.get("data").get("status") == LinkisJobStatus.FAILED:
                process_exist = False
        except Exception as e:
            detect_logger.exception(e)
            process_exist = False
        return process_exist