#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import deepcopy

import requests

from fate_flow.utils.log_utils import schedule_logger
from fate_flow.controller.engine_controller.engine import EngineABC
from fate_flow.db.runtime_config import RuntimeConfig
from fate_flow.entity.types import KillProcessRetCode
from fate_flow.entity.run_status import LinkisJobStatus
from fate_flow.settings import LINKIS_EXECUTE_ENTRANCE, LINKIS_SUBMIT_PARAMS, LINKIS_RUNTYPE, \
    LINKIS_LABELS, LINKIS_QUERT_STATUS, LINKIS_KILL_ENTRANCE, detect_logger
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.db.db_models import Task


class LinkisSparkEngine(EngineABC):
    def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs):
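        """Submit the task to Linkis as a Spark job and return its engine_conf metadata."""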
        linkis_execute_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                     ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                     LINKIS_EXECUTE_ENTRANCE)
        headers = {"Token-Code": ServerRegistry.LINKIS_SPARK_CONFIG.get("token_code"),
                   "Token-User": kwargs.get("user_name"),
                   "Content-Type": "application/json"}
        schedule_logger(task.f_job_id).info(f"headers: {headers}")
        python_path = ServerRegistry.LINKIS_SPARK_CONFIG.get("python_path")
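        # Build the Python snippet that Linkis executes remotely: it extends sys.path,
        # runs the task via TaskExecutor, then reports the result back to the driver.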
        execution_code = 'import sys\nsys.path.append("{}")\n' \
                         'from fate_flow.worker.task_executor import TaskExecutor\n' \
                         'task_info = TaskExecutor.run_task(job_id="{}",component_name="{}",' \
                         'task_id="{}",task_version={},role="{}",party_id={},' \
                         'run_ip="{}",config="{}",job_server="{}")\n' \
                         'TaskExecutor.report_task_update_to_driver(task_info=task_info)'.format(
                             python_path, task.f_job_id, task.f_component_name, task.f_task_id,
                             task.f_task_version, task.f_role, task.f_party_id,
                             RuntimeConfig.JOB_SERVER_HOST, run_parameters_path,
                             '{}:{}'.format(RuntimeConfig.JOB_SERVER_HOST, RuntimeConfig.HTTP_PORT))
        schedule_logger(task.f_job_id).info(f"execution code: {execution_code}")
        params = deepcopy(LINKIS_SUBMIT_PARAMS)
        schedule_logger(task.f_job_id).info(f"spark run parameters: {run_parameters.spark_run}")
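        # Only a whitelist of Spark settings may override the default startup configuration.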
        for spark_key, v in run_parameters.spark_run.items():
            if spark_key in ["spark.executor.memory", "spark.driver.memory", "spark.executor.instances",
                             "wds.linkis.rm.yarnqueue"]:
                params["configuration"]["startup"][spark_key] = v
        data = {
            "method": LINKIS_EXECUTE_ENTRANCE,
            "params": params,
            "executeApplicationName": "spark",
            "executionCode": execution_code,
            "runType": LINKIS_RUNTYPE,
            "source": {},
            "labels": LINKIS_LABELS
        }
        schedule_logger(task.f_job_id).info(f"submit linkis spark, data: {data}")
        task_info = {
            "engine_conf": {}
        }
        task_info["engine_conf"]["data"] = data
        task_info["engine_conf"]["headers"] = headers
        res = requests.post(url=linkis_execute_url, headers=headers, json=data)
        schedule_logger(task.f_job_id).info(f"start linkis spark task: {res.text}")
        if res.status_code == 200:
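            # Linkis reports submission failures via a non-zero "status" field in the
            # response body, even when the HTTP status code is 200.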
            if res.json().get("status"):
                raise Exception(f"submit linkis spark failed: {res.json()}")
            task_info["engine_conf"]["execID"] = res.json().get("data").get("execID")
            task_info["engine_conf"]["taskID"] = res.json().get("data").get("taskID")
            schedule_logger(task.f_job_id).info("submit linkis spark success")
        else:
            raise Exception(f"submit linkis spark failed: {res.text}")
        return task_info

    @staticmethod
    def kill(task):
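        """Ask Linkis to stop the job unless it has already finished successfully."""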
        linkis_query_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                   ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                   LINKIS_QUERT_STATUS.replace("execID",
                                                                               task.f_engine_conf.get("execID")))
        headers = task.f_engine_conf.get("headers")
        response = requests.get(linkis_query_url, headers=headers).json()
        schedule_logger(task.f_job_id).info(f"query task response: {response}")
        if response.get("data").get("status") != LinkisJobStatus.SUCCESS:
            linkis_execute_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                         ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                         LINKIS_KILL_ENTRANCE.replace("execID",
                                                                                      task.f_engine_conf.get("execID")))
            schedule_logger(task.f_job_id).info(f"start stop task: {linkis_execute_url}")
            schedule_logger(task.f_job_id).info(f"headers: {headers}")
            kill_result = requests.get(linkis_execute_url, headers=headers)
            schedule_logger(task.f_job_id).info(f"kill result: {kill_result}")
            if kill_result.status_code == 200:
                pass
        return KillProcessRetCode.KILLED

    def is_alive(self, task):
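        """Probe the Linkis status endpoint; treat FAILED status or any query error as dead."""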
        process_exist = True
        try:
            linkis_query_url = "http://{}:{}{}".format(ServerRegistry.LINKIS_SPARK_CONFIG.get("host"),
                                                       ServerRegistry.LINKIS_SPARK_CONFIG.get("port"),
                                                       LINKIS_QUERT_STATUS.replace("execID", task.f_engine_conf.get("execID")))
            headers = task.f_engine_conf["headers"]
            response = requests.get(linkis_query_url, headers=headers).json()
            detect_logger.info(response)
            if response.get("data").get("status") == LinkisJobStatus.FAILED:
                process_exist = False
        except Exception as e:
            detect_logger.exception(e)
            process_exist = False
        return process_exist
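
# A minimal usage sketch (hypothetical; in FATE the scheduler drives this engine),
# assuming a populated Task model and parsed run parameters:
#
#     engine = LinkisSparkEngine()
#     task_info = engine.run(task, run_parameters, run_parameters_path,
#                            config_dir, log_dir, cwd_dir, user_name="alice")
#     # Persist task_info["engine_conf"] on the Task so that kill() and is_alive()
#     # can later rebuild the execID-based status and kill URLs from it.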