123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- from fate_arch.common import FederatedCommunicationType
- from fate_flow.utils.job_utils import asynchronous_function
- from fate_flow.utils.log_utils import schedule_logger
- from fate_flow.controller.engine_adapt import build_engine
- from fate_flow.db.db_models import Task
- from fate_flow.scheduler.federated_scheduler import FederatedScheduler
- from fate_flow.entity.run_status import TaskStatus, EndStatus
- from fate_flow.utils import job_utils
- from fate_flow.operation.job_saver import JobSaver
- from fate_arch.common.base_utils import json_dumps, current_timestamp
- from fate_arch.common import base_utils
- from fate_flow.entity import RunParameters
- from fate_flow.manager.resource_manager import ResourceManager
- from fate_flow.operation.job_tracker import Tracker
- from fate_flow.manager.worker_manager import WorkerManager
- from fate_flow.entity.types import TaskCleanResourceType
- class TaskController(object):
- INITIATOR_COLLECT_FIELDS = ["status", "party_status", "start_time", "update_time", "end_time", "elapsed"]
- @classmethod
- def create_task(cls, role, party_id, run_on_this_party, task_info):
- task_info["role"] = role
- task_info["party_id"] = str(party_id)
- task_info["status"] = TaskStatus.WAITING
- task_info["party_status"] = TaskStatus.WAITING
- task_info["create_time"] = base_utils.current_timestamp()
- task_info["run_on_this_party"] = run_on_this_party
- if task_info.get("task_id") is None:
- task_info["task_id"] = job_utils.generate_task_id(job_id=task_info["job_id"], component_name=task_info["component_name"])
- if task_info.get("task_version") is None:
- task_info["task_version"] = 0
- task = JobSaver.create_task(task_info=task_info)
- @classmethod
- def start_task(cls, job_id, component_name, task_id, task_version, role, party_id, **kwargs):
- """
- Start task, update status and party status
- :param job_id:
- :param component_name:
- :param task_id:
- :param task_version:
- :param role:
- :param party_id:
- :return:
- """
- job_dsl = job_utils.get_job_dsl(job_id, role, party_id)
- schedule_logger(job_id).info(
- f"try to start task {task_id} {task_version} on {role} {party_id} executor subprocess")
- task_executor_process_start_status = False
- task_info = {
- "job_id": job_id,
- "task_id": task_id,
- "task_version": task_version,
- "role": role,
- "party_id": party_id,
- }
- is_failed = False
- try:
- task = JobSaver.query_task(task_id=task_id, task_version=task_version, role=role, party_id=party_id)[0]
- run_parameters_dict = job_utils.get_job_parameters(job_id, role, party_id)
- run_parameters_dict["src_user"] = kwargs.get("src_user")
- run_parameters = RunParameters(**run_parameters_dict)
- config_dir = job_utils.get_task_directory(job_id, role, party_id, component_name, task_id, task_version)
- os.makedirs(config_dir, exist_ok=True)
- run_parameters_path = os.path.join(config_dir, 'task_parameters.json')
- with open(run_parameters_path, 'w') as fw:
- fw.write(json_dumps(run_parameters_dict))
- schedule_logger(job_id).info(f"use computing engine {run_parameters.computing_engine}")
- task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine}
- backend_engine = build_engine(run_parameters.computing_engine)
- run_info = backend_engine.run(task=task,
- run_parameters=run_parameters,
- run_parameters_path=run_parameters_path,
- config_dir=config_dir,
- log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),
- cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),
- user_name=kwargs.get("user_id"))
- task_info.update(run_info)
- task_info["start_time"] = current_timestamp()
- task_executor_process_start_status = True
- except Exception as e:
- schedule_logger(job_id).exception(e)
- is_failed = True
- finally:
- try:
- cls.update_task(task_info=task_info)
- task_info["party_status"] = TaskStatus.RUNNING
- cls.update_task_status(task_info=task_info)
- if is_failed:
- task_info["party_status"] = TaskStatus.FAILED
- cls.update_task_status(task_info=task_info)
- except Exception as e:
- schedule_logger(job_id).exception(e)
- schedule_logger(job_id).info(
- "task {} {} on {} {} executor subprocess start {}".format(task_id, task_version, role, party_id, "success" if task_executor_process_start_status else "failed"))
- @classmethod
- def update_task(cls, task_info):
- """
- Save to local database and then report to Initiator
- :param task_info:
- :return:
- """
- update_status = False
- try:
- update_status = JobSaver.update_task(task_info=task_info)
- cls.report_task_to_initiator(task_info=task_info)
- except Exception as e:
- schedule_logger(task_info["job_id"]).exception(e)
- finally:
- return update_status
- @classmethod
- def update_task_status(cls, task_info):
- update_status = JobSaver.update_task_status(task_info=task_info)
- if update_status and EndStatus.contains(task_info.get("status")):
- ResourceManager.return_task_resource(task_info=task_info)
- cls.clean_task(job_id=task_info["job_id"],
- task_id=task_info["task_id"],
- task_version=task_info["task_version"],
- role=task_info["role"],
- party_id=task_info["party_id"],
- content_type=TaskCleanResourceType.TABLE,
- is_asynchronous=True)
- cls.report_task_to_initiator(task_info=task_info)
- return update_status
- @classmethod
- def report_task_to_initiator(cls, task_info):
- tasks = JobSaver.query_task(task_id=task_info["task_id"],
- task_version=task_info["task_version"],
- role=task_info["role"],
- party_id=task_info["party_id"])
- if task_info.get("error_report"):
- tasks[0].f_error_report = task_info.get("error_report")
- if tasks[0].f_federated_status_collect_type == FederatedCommunicationType.PUSH:
- FederatedScheduler.report_task_to_initiator(task=tasks[0])
- @classmethod
- def collect_task(cls, job_id, component_name, task_id, task_version, role, party_id):
- tasks = JobSaver.query_task(job_id=job_id, component_name=component_name, task_id=task_id, task_version=task_version, role=role, party_id=party_id)
- if tasks:
- return tasks[0].to_human_model_dict(only_primary_with=cls.INITIATOR_COLLECT_FIELDS)
- else:
- return None
- @classmethod
- @asynchronous_function
- def stop_task(cls, task, stop_status):
- """
- Try to stop the task, but the status depends on the final operation result
- :param task:
- :param stop_status:
- :return:
- """
- kill_status = cls.kill_task(task=task)
- task_info = {
- "job_id": task.f_job_id,
- "task_id": task.f_task_id,
- "task_version": task.f_task_version,
- "role": task.f_role,
- "party_id": task.f_party_id,
- "party_status": stop_status,
- "kill_status": True
- }
- cls.update_task_status(task_info=task_info)
- cls.update_task(task_info=task_info)
- return kill_status
- @classmethod
- def kill_task(cls, task: Task):
- kill_status = False
- try:
- # kill task executor
- backend_engine = build_engine(task.f_engine_conf.get("computing_engine"))
- if backend_engine:
- backend_engine.kill(task)
- WorkerManager.kill_task_all_workers(task)
- except Exception as e:
- schedule_logger(task.f_job_id).exception(e)
- else:
- kill_status = True
- finally:
- schedule_logger(task.f_job_id).info(
- 'task {} {} on {} {} process {} kill {}'.format(task.f_task_id,
- task.f_task_version,
- task.f_role,
- task.f_party_id,
- task.f_run_pid,
- 'success' if kill_status else 'failed'))
- return kill_status
- @classmethod
- @asynchronous_function
- def clean_task(cls, job_id, task_id, task_version, role, party_id, content_type: TaskCleanResourceType):
- status = set()
- if content_type == TaskCleanResourceType.METRICS:
- tracker = Tracker(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version)
- status.add(tracker.clean_metrics())
- elif content_type == TaskCleanResourceType.TABLE:
- jobs = JobSaver.query_job(job_id=job_id, role=role, party_id=party_id)
- if jobs:
- job = jobs[0]
- job_parameters = RunParameters(**job.f_runtime_conf_on_party["job_parameters"])
- tracker = Tracker(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version, job_parameters=job_parameters)
- status.add(tracker.clean_task())
- if len(status) == 1 and True in status:
- return True
- else:
- return False
|