123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import math
- import typing
- from fate_arch.common import EngineType
- from fate_arch.common import base_utils
- from fate_flow.utils.log_utils import schedule_logger
- from fate_arch.computing import ComputingEngine
- from fate_arch.common import engine_utils
- from fate_flow.db.db_models import DB, EngineRegistry, Job
- from fate_flow.entity.types import ResourceOperation
- from fate_flow.entity import RunParameters
- from fate_flow.operation.job_saver import JobSaver
- from fate_flow.settings import stat_logger, IGNORE_RESOURCE_ROLES, SUPPORT_IGNORE_RESOURCE_ENGINES, \
- IGNORE_RESOURCE_COMPUTING_ENGINE, ENGINES
- from fate_flow.utils import job_utils
- from fate_flow.db.job_default_config import JobDefaultConfig
class ResourceManager(object):
    """Bookkeeping of computing-engine resources for jobs and tasks.

    Engine capacity (total and remaining cores/memory) is stored in the
    ``EngineRegistry`` table; what a job currently holds is stored on its
    ``Job`` row. All methods are classmethods, so the class acts as a
    namespace and is never instantiated here.
    """

    @classmethod
    def initialize(cls):
        """Register every engine returned by the engine configuration loader."""
        engines_config, engine_group_map = engine_utils.get_engines_config_from_conf(group_map=True)
        for engine_type, engine_configs in engines_config.items():
            for engine_name, engine_config in engine_configs.items():
                cls.register_engine(
                    engine_type=engine_type,
                    engine_name=engine_name,
                    engine_entrance=engine_group_map[engine_type][engine_name],
                    engine_config=engine_config,
                )

    @classmethod
    @DB.connection_context()
    def register_engine(cls, engine_type, engine_name, engine_entrance, engine_config):
        """Create or refresh the ``EngineRegistry`` row of one engine.

        Total cores/memory are ``*_per_node * nodes`` scaled by the overweight
        percentages of ``JobDefaultConfig``. When the row already exists, the
        remaining cores/memory are shifted by the change in the totals, so
        resources currently handed out stay accounted for.

        :param engine_type: engine category used as part of the lookup key
        :param engine_name: engine name used as part of the lookup key
        :param engine_entrance: stored on new rows and echoed in log messages
        :param engine_config: mapping read for ``nodes``, ``cores_per_node``
            and ``memory_per_node``
        """
        nodes = engine_config.get("nodes", 1)
        cores = engine_config.get("cores_per_node", 0) * nodes * JobDefaultConfig.total_cores_overweight_percent
        memory = engine_config.get("memory_per_node", 0) * nodes * JobDefaultConfig.total_memory_overweight_percent
        filters = [EngineRegistry.f_engine_type == engine_type, EngineRegistry.f_engine_name == engine_name]
        resources = EngineRegistry.select().where(*filters)
        if resources:
            resource = resources[0]
            update_fields = {
                EngineRegistry.f_engine_config: engine_config,
                EngineRegistry.f_cores: cores,
                EngineRegistry.f_memory: memory,
                # Column expressions: the adjustment is computed by the database
                # from the current remaining value, not from a stale local copy.
                EngineRegistry.f_remaining_cores: EngineRegistry.f_remaining_cores + (cores - resource.f_cores),
                EngineRegistry.f_remaining_memory: EngineRegistry.f_remaining_memory + (memory - resource.f_memory),
                EngineRegistry.f_nodes: nodes,
            }
            operate = EngineRegistry.update(update_fields).where(*filters)
            update_status = operate.execute() > 0
            if update_status:
                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information")
            else:
                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information takes no effect")
        else:
            resource = EngineRegistry()
            resource.f_create_time = base_utils.current_timestamp()
            resource.f_engine_type = engine_type
            resource.f_engine_name = engine_name
            resource.f_engine_entrance = engine_entrance
            resource.f_engine_config = engine_config
            resource.f_cores = cores
            resource.f_memory = memory
            resource.f_remaining_cores = cores
            resource.f_remaining_memory = memory
            resource.f_nodes = nodes
            try:
                resource.save(force_insert=True)
            except Exception as e:
                # Best effort: a failed insert is reported but does not abort
                # registration of the remaining engines.
                stat_logger.warning(e)
            else:
                # Only claim the row was created when the insert succeeded.
                stat_logger.info(f"create {engine_type} engine {engine_name} {engine_entrance} registration information")

    @classmethod
    def check_resource_apply(cls, job_parameters: RunParameters, role, party_id, engines_info):
        """Check whether a job's core demand fits the per-job core limit.

        The limit is the computing engine's total cores multiplied by
        ``JobDefaultConfig.max_cores_percent_per_job`` (floored), or 0 when
        ``engines_info`` has no computing engine entry.

        :return: ``(fits, requested_cores, max_cores_per_job)``
        """
        _, cores, _ = cls.calculate_job_resource(job_parameters=job_parameters, role=role, party_id=party_id)
        computing_info = engines_info.get(EngineType.COMPUTING)
        if computing_info:
            max_cores_per_job = math.floor(computing_info.f_cores * JobDefaultConfig.max_cores_percent_per_job)
        else:
            max_cores_per_job = 0
        return cores <= max_cores_per_job, cores, max_cores_per_job

    @classmethod
    def apply_for_job_resource(cls, job_id, role, party_id):
        """Apply for the job's resources; see :meth:`resource_for_job`."""
        return cls.resource_for_job(job_id=job_id, role=role, party_id=party_id,
                                    operation_type=ResourceOperation.APPLY)

    @classmethod
    def return_job_resource(cls, job_id, role, party_id):
        """Return the job's resources; see :meth:`resource_for_job`."""
        return cls.resource_for_job(job_id=job_id, role=role, party_id=party_id,
                                    operation_type=ResourceOperation.RETURN)

    @classmethod
    def query_resource(cls, resource_in_use=True, engine_name=None):
        """List jobs by resource-in-use flag plus the computing engine record.

        :param resource_in_use: forwarded to ``JobSaver.query_job``
        :param engine_name: computing engine to report; falls back to
            ``ENGINES.get(EngineType.COMPUTING)`` when falsy
        :return: ``(used, engine)`` where ``used`` is a list of per-job dicts
            and ``engine`` is the registry row as a dict (``{}`` if not found)
        """
        if not engine_name:
            engine_name = ENGINES.get(EngineType.COMPUTING)
        used = [
            {"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
             "core": job.f_cores, "memory": job.f_memory}
            for job in JobSaver.query_job(resource_in_use=resource_in_use)
        ]
        computing_engine_resource = cls.get_engine_registration_info(engine_type=EngineType.COMPUTING,
                                                                     engine_name=engine_name)
        engine_resource = computing_engine_resource.to_dict() if computing_engine_resource else {}
        return used, engine_resource

    @classmethod
    def return_resource(cls, job_id):
        """Return the resources of every job row matching ``job_id``.

        :return: one dict per job row with its identity, whether it held
            resources, and whether the return succeeded
        :raises Exception: when no job row matches ``job_id``
        """
        jobs = JobSaver.query_job(job_id=job_id)
        if not jobs:
            raise Exception(f'no found job {job_id}')
        return_resource_job_list = []
        for job in jobs:
            job_info = {"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
                        "resource_in_use": job.f_resource_in_use, "resource_return_status": False}
            if job.f_resource_in_use:
                job_info["resource_return_status"] = cls.return_job_resource(job.f_job_id, job.f_role,
                                                                             job.f_party_id)
            return_resource_job_list.append(job_info)
        return return_resource_job_list

    @classmethod
    @DB.connection_context()
    def resource_for_job(cls, job_id, role, party_id, operation_type: ResourceOperation):
        """Apply for or return the resources of one job on one party.

        Inside a single ``DB.atomic()`` block the job row is flagged first
        (guarded by its current ``f_resource_in_use`` value so the operation
        cannot be applied twice), then the engine registry's remaining
        cores/memory are adjusted. Any failure is logged and reported through
        the return value rather than raised.

        :return: ``True`` when both updates took effect, otherwise ``False``
        """
        operate_status = False
        engine_name, cores, memory = cls.calculate_job_resource(job_id=job_id, role=role, party_id=party_id)
        try:
            with DB.atomic():
                updates = {
                    Job.f_engine_type: EngineType.COMPUTING,
                    Job.f_engine_name: engine_name,
                    Job.f_cores: cores,
                    Job.f_memory: memory,
                }
                filters = [
                    Job.f_job_id == job_id,
                    Job.f_role == role,
                    Job.f_party_id == party_id,
                ]
                # "== True/False" builds a query expression on the column;
                # "is" would compare the Python objects instead.
                if operation_type is ResourceOperation.APPLY:
                    updates[Job.f_remaining_cores] = cores
                    updates[Job.f_remaining_memory] = memory
                    updates[Job.f_resource_in_use] = True
                    updates[Job.f_apply_resource_time] = base_utils.current_timestamp()
                    filters.append(Job.f_resource_in_use == False)
                elif operation_type is ResourceOperation.RETURN:
                    updates[Job.f_resource_in_use] = False
                    updates[Job.f_return_resource_time] = base_utils.current_timestamp()
                    filters.append(Job.f_resource_in_use == True)
                operate = Job.update(updates).where(*filters)
                record_status = operate.execute() > 0
                if not record_status:
                    raise RuntimeError(f"record job {job_id} resource {operation_type} failed on {role} {party_id}")
                if cores or memory:
                    filters, updates = cls.update_resource_sql(resource_model=EngineRegistry,
                                                               cores=cores,
                                                               memory=memory,
                                                               operation_type=operation_type,
                                                               )
                    filters.append(EngineRegistry.f_engine_type == EngineType.COMPUTING)
                    filters.append(EngineRegistry.f_engine_name == engine_name)
                    operate = EngineRegistry.update(updates).where(*filters)
                    apply_status = operate.execute() > 0
                else:
                    # Nothing to reserve or release on the engine.
                    apply_status = True
                if not apply_status:
                    raise RuntimeError(
                        f"update engine {engine_name} record for job {job_id} resource {operation_type} on {role} {party_id} failed")
            operate_status = True
        except Exception as e:
            schedule_logger(job_id).warning(e)
            schedule_logger(job_id).warning(
                f"{operation_type} job resource(cores {cores} memory {memory}) on {role} {party_id} failed")
            operate_status = False
        finally:
            remaining_cores, remaining_memory = cls.get_remaining_resource(
                EngineRegistry,
                [
                    EngineRegistry.f_engine_type == EngineType.COMPUTING,
                    EngineRegistry.f_engine_name == engine_name,
                ],
            )
            operate_msg = "successfully" if operate_status else "failed"
            schedule_logger(job_id).info(
                f"{operation_type} job resource(cores {cores} memory {memory}) on {role} {party_id} {operate_msg}, remaining cores: {remaining_cores} remaining memory: {remaining_memory}")
        # Returned after the try statement (not from inside ``finally``) so an
        # in-flight BaseException such as KeyboardInterrupt is not swallowed.
        return operate_status

    @staticmethod
    def _derive_task_cores_per_node(adaptation_parameters):
        """Spread ``request_task_cores`` over ``task_nodes``, at least 1 core per node."""
        # NOTE(review): raises ZeroDivisionError when task_nodes is 0 — confirm
        # that callers never configure zero nodes/executors.
        return max(1, int(adaptation_parameters["request_task_cores"] / adaptation_parameters["task_nodes"]))

    @classmethod
    def adapt_engine_parameters(cls, role, job_parameters: RunParameters, create_initiator_baseline=False):
        """Fill ``job_parameters.adaptation_parameters`` for the local engine.

        Mutates ``job_parameters`` in place and returns ``None``. A fresh
        baseline is created when no adaptation parameters exist yet or when
        ``create_initiator_baseline`` is set; otherwise the existing
        (initiator) baseline is reused and adjusted for ``role``. Unless a
        baseline is being created, the resulting values are also written back
        to the engine run parameters (``eggroll_run`` / ``spark_run``).
        """
        computing_engine_info = ResourceManager.get_engine_registration_info(
            engine_type=EngineType.COMPUTING,
            engine_name=job_parameters.computing_engine)
        if not job_parameters.adaptation_parameters or create_initiator_baseline:
            job_parameters.adaptation_parameters = {
                "task_nodes": 0,
                "task_cores_per_node": 0,
                "task_memory_per_node": 0,
                # request_task_cores base on initiator and distribute to all parties, using job conf parameters or initiator fateflow server default settings
                "request_task_cores": int(job_parameters.task_cores) if job_parameters.task_cores else JobDefaultConfig.task_cores,
                "if_initiator_baseline": True
            }
        else:
            # use initiator baseline
            if role == "arbiter":
                job_parameters.adaptation_parameters["request_task_cores"] = 1
            elif "request_task_cores" not in job_parameters.adaptation_parameters:
                # compatibility 1.5.0
                job_parameters.adaptation_parameters["request_task_cores"] = \
                    job_parameters.adaptation_parameters["task_nodes"] * \
                    job_parameters.adaptation_parameters["task_cores_per_node"]
            job_parameters.adaptation_parameters["if_initiator_baseline"] = False
        adaptation_parameters = job_parameters.adaptation_parameters

        if job_parameters.computing_engine in {ComputingEngine.STANDALONE, ComputingEngine.EGGROLL}:
            # NOTE(review): computing_engine_info is None when the engine is not
            # registered, which surfaces here as an AttributeError.
            adaptation_parameters["task_nodes"] = computing_engine_info.f_nodes
            if int(job_parameters.eggroll_run.get("eggroll.session.processors.per.node", 0)) > 0:
                # An explicit positive engine setting wins over the derived value.
                adaptation_parameters["task_cores_per_node"] = int(
                    job_parameters.eggroll_run["eggroll.session.processors.per.node"])
            else:
                adaptation_parameters["task_cores_per_node"] = cls._derive_task_cores_per_node(adaptation_parameters)
            if not create_initiator_baseline:
                # set the adaptation parameters to the actual engine operation parameters
                job_parameters.eggroll_run["eggroll.session.processors.per.node"] = \
                    adaptation_parameters["task_cores_per_node"]
        elif job_parameters.computing_engine == ComputingEngine.SPARK or job_parameters.computing_engine == ComputingEngine.LINKIS_SPARK:
            adaptation_parameters["task_nodes"] = int(
                job_parameters.spark_run.get("num-executors", computing_engine_info.f_nodes))
            if int(job_parameters.spark_run.get("executor-cores", 0)) > 0:
                # An explicit positive engine setting wins over the derived value.
                adaptation_parameters["task_cores_per_node"] = int(job_parameters.spark_run["executor-cores"])
            else:
                adaptation_parameters["task_cores_per_node"] = cls._derive_task_cores_per_node(adaptation_parameters)
            if not create_initiator_baseline:
                # set the adaptation parameters to the actual engine operation parameters
                job_parameters.spark_run["num-executors"] = adaptation_parameters["task_nodes"]
                job_parameters.spark_run["executor-cores"] = adaptation_parameters["task_cores_per_node"]

    @classmethod
    def calculate_job_resource(cls, job_parameters: RunParameters = None, job_id=None, role=None, party_id=None):
        """Compute the cores and memory a whole job needs.

        ``job_parameters`` is loaded through ``job_utils.get_job_parameters``
        when not supplied. Both results are 0 when the computing engine is in
        ``IGNORE_RESOURCE_COMPUTING_ENGINE``, or when ``role`` is in
        ``IGNORE_RESOURCE_ROLES`` and the engine is in
        ``SUPPORT_IGNORE_RESOURCE_ENGINES``. Otherwise each is
        ``per_node * task_nodes * task_parallelism`` with missing values
        counted as 0.

        :return: ``(computing_engine, cores, memory)``
        """
        if not job_parameters:
            job_parameters = job_utils.get_job_parameters(job_id=job_id,
                                                          role=role,
                                                          party_id=party_id)
            job_parameters = RunParameters(**job_parameters)
        cores = 0
        memory = 0
        engine = job_parameters.computing_engine
        # Parentheses spell out the precedence the original relied on: A or (B and C).
        ignore_resource = (
            engine in IGNORE_RESOURCE_COMPUTING_ENGINE
            or (role in IGNORE_RESOURCE_ROLES and engine in SUPPORT_IGNORE_RESOURCE_ENGINES)
        )
        if not ignore_resource:
            adaptation = job_parameters.adaptation_parameters
            task_nodes = int(adaptation["task_nodes"] or 0)
            task_parallelism = int(job_parameters.task_parallelism or 0)
            cores = int(adaptation["task_cores_per_node"] or 0) * task_nodes * task_parallelism
            memory = int(adaptation["task_memory_per_node"] or 0) * task_nodes * task_parallelism
        return job_parameters.computing_engine, cores, memory

    @classmethod
    def calculate_task_resource(cls, task_parameters: RunParameters = None, task_info: dict = None):
        """Compute the cores and memory a single task needs.

        ``task_parameters`` is loaded from the job identified by ``task_info``
        when not supplied. The same ignore rules as in
        :meth:`calculate_job_resource` yield ``(0, 0)``; otherwise each value
        is ``per_node * task_nodes``.

        :return: ``(cores_per_task, memory_per_task)``
        """
        if not task_parameters:
            job_parameters = job_utils.get_job_parameters(job_id=task_info["job_id"],
                                                          role=task_info["role"],
                                                          party_id=task_info["party_id"])
            task_parameters = RunParameters(**job_parameters)
        # task_info defaults to None; without it no role-based exemption applies.
        role = task_info["role"] if task_info else None
        if task_parameters.computing_engine in IGNORE_RESOURCE_COMPUTING_ENGINE:
            cores_per_task = 0
            memory_per_task = 0
        elif role in IGNORE_RESOURCE_ROLES and task_parameters.computing_engine in SUPPORT_IGNORE_RESOURCE_ENGINES:
            cores_per_task = 0
            memory_per_task = 0
        else:
            adaptation = task_parameters.adaptation_parameters
            cores_per_task = adaptation["task_cores_per_node"] * adaptation["task_nodes"]
            memory_per_task = adaptation["task_memory_per_node"] * adaptation["task_nodes"]
        return cores_per_task, memory_per_task

    @classmethod
    def apply_for_task_resource(cls, task_info):
        """Apply for one task's resources; see :meth:`resource_for_task`."""
        return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.APPLY)

    @classmethod
    def return_task_resource(cls, task_info):
        """Return one task's resources; see :meth:`resource_for_task`."""
        return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.RETURN)

    @classmethod
    def resource_for_task(cls, task_info, operation_type):
        """Apply for or return one task's share of its job's resources.

        Adjusts the remaining cores/memory on the ``Job`` row, which must
        currently have ``f_resource_in_use`` set. A task that needs neither
        cores nor memory succeeds without touching the database.

        :param task_info: dict read for ``job_id``, ``role``, ``party_id``,
            ``task_id`` and ``task_version``
        :return: ``True`` when the update took effect or was not needed
        """
        cores_per_task, memory_per_task = cls.calculate_task_resource(task_info=task_info)
        schedule_logger(task_info["job_id"]).info(f"cores_per_task:{cores_per_task}, memory_per_task:{memory_per_task}")
        if cores_per_task or memory_per_task:
            filters, updates = cls.update_resource_sql(resource_model=Job,
                                                       cores=cores_per_task,
                                                       memory=memory_per_task,
                                                       operation_type=operation_type,
                                                       )
            filters.append(Job.f_job_id == task_info["job_id"])
            filters.append(Job.f_role == task_info["role"])
            filters.append(Job.f_party_id == task_info["party_id"])
            filters.append(Job.f_resource_in_use == True)
            operate = Job.update(updates).where(*filters)
            operate_status = operate.execute() > 0
        else:
            operate_status = True
        if operate_status:
            schedule_logger(task_info["job_id"]).info(
                f"task {task_info['task_id']} {task_info['task_version']} {operation_type} resource successfully")
        else:
            schedule_logger(task_info["job_id"]).warning(
                f"task {task_info['task_id']} {task_info['task_version']} {operation_type} resource failed")
        return operate_status

    @classmethod
    def update_resource_sql(cls, resource_model: typing.Union[EngineRegistry, Job], cores, memory, operation_type: ResourceOperation):
        """Build the filters and updates that move cores/memory on a model.

        APPLY subtracts from the remaining columns and adds filters requiring
        enough remaining resources; RETURN adds back with no extra filters.

        :return: ``(filters, updates)`` for ``Model.update(updates).where(*filters)``
        :raises RuntimeError: for any other ``operation_type``
        """
        if operation_type is ResourceOperation.APPLY:
            filters = [
                resource_model.f_remaining_cores >= cores,
                resource_model.f_remaining_memory >= memory
            ]
            updates = {resource_model.f_remaining_cores: resource_model.f_remaining_cores - cores,
                       resource_model.f_remaining_memory: resource_model.f_remaining_memory - memory}
        elif operation_type is ResourceOperation.RETURN:
            filters = []
            updates = {resource_model.f_remaining_cores: resource_model.f_remaining_cores + cores,
                       resource_model.f_remaining_memory: resource_model.f_remaining_memory + memory}
        else:
            raise RuntimeError(f"can not support {operation_type} resource operation type")
        return filters, updates

    @classmethod
    @DB.connection_context()
    def get_remaining_resource(cls, resource_model: typing.Union[EngineRegistry, Job], filters):
        """Read remaining cores/memory of the first row matching ``filters``.

        Query errors are logged, not raised.

        :return: ``(remaining_cores, remaining_memory)``; both ``None`` when
            no row matches or the query fails
        """
        remaining_cores, remaining_memory = None, None
        try:
            objs = resource_model.select(resource_model.f_remaining_cores, resource_model.f_remaining_memory).where(
                *filters)
            if objs:
                remaining_cores, remaining_memory = objs[0].f_remaining_cores, objs[0].f_remaining_memory
        except Exception as e:
            schedule_logger().exception(e)
        # Returned after the try statement (not from inside ``finally``) so an
        # in-flight BaseException is not swallowed.
        return remaining_cores, remaining_memory

    @classmethod
    @DB.connection_context()
    def get_engine_registration_info(cls, engine_type, engine_name) -> typing.Optional[EngineRegistry]:
        """Return the registry row for the given engine, or ``None`` if absent."""
        engines = EngineRegistry.select().where(EngineRegistry.f_engine_type == engine_type,
                                                EngineRegistry.f_engine_name == engine_name)
        if engines:
            return engines[0]
        return None
|