#
#  Copyright 2019 The FATE Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
  16. import math
  17. import typing
  18. from fate_arch.common import EngineType
  19. from fate_arch.common import base_utils
  20. from fate_flow.utils.log_utils import schedule_logger
  21. from fate_arch.computing import ComputingEngine
  22. from fate_arch.common import engine_utils
  23. from fate_flow.db.db_models import DB, EngineRegistry, Job
  24. from fate_flow.entity.types import ResourceOperation
  25. from fate_flow.entity import RunParameters
  26. from fate_flow.operation.job_saver import JobSaver
  27. from fate_flow.settings import stat_logger, IGNORE_RESOURCE_ROLES, SUPPORT_IGNORE_RESOURCE_ENGINES, \
  28. IGNORE_RESOURCE_COMPUTING_ENGINE, ENGINES
  29. from fate_flow.utils import job_utils
  30. from fate_flow.db.job_default_config import JobDefaultConfig
  31. class ResourceManager(object):
  32. @classmethod
  33. def initialize(cls):
  34. engines_config, engine_group_map = engine_utils.get_engines_config_from_conf(group_map=True)
  35. for engine_type, engine_configs in engines_config.items():
  36. for engine_name, engine_config in engine_configs.items():
  37. cls.register_engine(engine_type=engine_type, engine_name=engine_name, engine_entrance=engine_group_map[engine_type][engine_name], engine_config=engine_config)
    @classmethod
    @DB.connection_context()
    def register_engine(cls, engine_type, engine_name, engine_entrance, engine_config):
        """Create or refresh the EngineRegistry row for one engine.

        Total cores/memory are scaled by the configured overweight percentages,
        so the scheduler may oversubscribe the physical capacity.
        """
        nodes = engine_config.get("nodes", 1)
        # cluster capacity = per-node capacity * node count * overweight factor
        cores = engine_config.get("cores_per_node", 0) * nodes * JobDefaultConfig.total_cores_overweight_percent
        memory = engine_config.get("memory_per_node", 0) * nodes * JobDefaultConfig.total_memory_overweight_percent
        filters = [EngineRegistry.f_engine_type == engine_type, EngineRegistry.f_engine_name == engine_name]
        resources = EngineRegistry.select().where(*filters)
        if resources:
            # Engine already registered: update the totals, and shift the
            # remaining counters by the capacity delta so that resource
            # currently held by running jobs stays accounted for.
            resource = resources[0]
            update_fields = {}
            update_fields[EngineRegistry.f_engine_config] = engine_config
            update_fields[EngineRegistry.f_cores] = cores
            update_fields[EngineRegistry.f_memory] = memory
            update_fields[EngineRegistry.f_remaining_cores] = EngineRegistry.f_remaining_cores + (
                cores - resource.f_cores)
            update_fields[EngineRegistry.f_remaining_memory] = EngineRegistry.f_remaining_memory + (
                memory - resource.f_memory)
            update_fields[EngineRegistry.f_nodes] = nodes
            operate = EngineRegistry.update(update_fields).where(*filters)
            update_status = operate.execute() > 0
            if update_status:
                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information")
            else:
                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information takes no effect")
        else:
            # First registration: insert a fresh row with the full capacity free.
            resource = EngineRegistry()
            resource.f_create_time = base_utils.current_timestamp()
            resource.f_engine_type = engine_type
            resource.f_engine_name = engine_name
            resource.f_engine_entrance = engine_entrance
            resource.f_engine_config = engine_config
            resource.f_cores = cores
            resource.f_memory = memory
            resource.f_remaining_cores = cores
            resource.f_remaining_memory = memory
            resource.f_nodes = nodes
            try:
                resource.save(force_insert=True)
            except Exception as e:
                # best-effort insert: a concurrent registration may have won the race
                stat_logger.warning(e)
            stat_logger.info(f"create {engine_type} engine {engine_name} {engine_entrance} registration information")
  80. @classmethod
  81. def check_resource_apply(cls, job_parameters: RunParameters, role, party_id, engines_info):
  82. computing_engine, cores, memory = cls.calculate_job_resource(job_parameters=job_parameters, role=role, party_id=party_id)
  83. max_cores_per_job = math.floor(engines_info[EngineType.COMPUTING].f_cores * JobDefaultConfig.max_cores_percent_per_job) \
  84. if engines_info.get(EngineType.COMPUTING) else 0
  85. if cores > max_cores_per_job:
  86. return False, cores, max_cores_per_job
  87. return True, cores, max_cores_per_job
  88. @classmethod
  89. def apply_for_job_resource(cls, job_id, role, party_id):
  90. return cls.resource_for_job(job_id=job_id, role=role, party_id=party_id, operation_type=ResourceOperation.APPLY)
  91. @classmethod
  92. def return_job_resource(cls, job_id, role, party_id):
  93. return cls.resource_for_job(job_id=job_id, role=role, party_id=party_id,
  94. operation_type=ResourceOperation.RETURN)
  95. @classmethod
  96. def query_resource(cls, resource_in_use=True, engine_name=None):
  97. if not engine_name:
  98. engine_name = ENGINES.get(EngineType.COMPUTING)
  99. use_resource_jobs = JobSaver.query_job(resource_in_use=resource_in_use)
  100. used = []
  101. for job in use_resource_jobs:
  102. used.append({"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
  103. "core": job.f_cores, "memory": job.f_memory})
  104. computing_engine_resource = cls.get_engine_registration_info(engine_type=EngineType.COMPUTING, engine_name=engine_name)
  105. return used, computing_engine_resource.to_dict() if computing_engine_resource else {}
  106. @classmethod
  107. def return_resource(cls, job_id):
  108. jobs = JobSaver.query_job(job_id=job_id)
  109. if not jobs:
  110. raise Exception(f'no found job {job_id}')
  111. return_resource_job_list = []
  112. for job in jobs:
  113. job_info = {"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
  114. "resource_in_use": job.f_resource_in_use, "resource_return_status": False}
  115. if job.f_resource_in_use:
  116. return_status = cls.return_job_resource(job.f_job_id, job.f_role, job.f_party_id)
  117. job_info["resource_return_status"] = return_status
  118. return_resource_job_list.append(job_info)
  119. return return_resource_job_list
    @classmethod
    @DB.connection_context()
    def resource_for_job(cls, job_id, role, party_id, operation_type: ResourceOperation):
        """Apply or return a whole job's resource in one atomic transaction.

        Updates the Job row (cores/memory bookkeeping plus the in-use flag) and
        the EngineRegistry row (remaining cores/memory) together; if either
        update matches no row the transaction rolls back.  Returns True on
        success, False on failure (failures are logged, not raised).
        """
        operate_status = False
        engine_name, cores, memory = cls.calculate_job_resource(job_id=job_id, role=role, party_id=party_id)
        try:
            with DB.atomic():
                updates = {
                    Job.f_engine_type: EngineType.COMPUTING,
                    Job.f_engine_name: engine_name,
                    Job.f_cores: cores,
                    Job.f_memory: memory,
                }
                filters = [
                    Job.f_job_id == job_id,
                    Job.f_role == role,
                    Job.f_party_id == party_id,
                ]
                if operation_type is ResourceOperation.APPLY:
                    updates[Job.f_remaining_cores] = cores
                    updates[Job.f_remaining_memory] = memory
                    updates[Job.f_resource_in_use] = True
                    updates[Job.f_apply_resource_time] = base_utils.current_timestamp()
                    # guard: only apply when the job does not already hold resource
                    filters.append(Job.f_resource_in_use == False)
                elif operation_type is ResourceOperation.RETURN:
                    updates[Job.f_resource_in_use] = False
                    updates[Job.f_return_resource_time] = base_utils.current_timestamp()
                    # guard: only return when the job currently holds resource
                    filters.append(Job.f_resource_in_use == True)
                operate = Job.update(updates).where(*filters)
                record_status = operate.execute() > 0
                if not record_status:
                    raise RuntimeError(f"record job {job_id} resource {operation_type} failed on {role} {party_id}")
                if cores or memory:
                    # move the cores/memory on the engine registry row,
                    # conditioned (on APPLY) on enough remaining capacity
                    filters, updates = cls.update_resource_sql(resource_model=EngineRegistry,
                                                               cores=cores,
                                                               memory=memory,
                                                               operation_type=operation_type,
                                                               )
                    filters.append(EngineRegistry.f_engine_type == EngineType.COMPUTING)
                    filters.append(EngineRegistry.f_engine_name == engine_name)
                    operate = EngineRegistry.update(updates).where(*filters)
                    apply_status = operate.execute() > 0
                else:
                    # zero-resource job: nothing to move on the engine registry
                    apply_status = True
                if not apply_status:
                    raise RuntimeError(
                        f"update engine {engine_name} record for job {job_id} resource {operation_type} on {role} {party_id} failed")
                operate_status = True
        except Exception as e:
            schedule_logger(job_id).warning(e)
            schedule_logger(job_id).warning(
                f"{operation_type} job resource(cores {cores} memory {memory}) on {role} {party_id} failed")
            operate_status = False
        finally:
            # log the outcome together with the engine's remaining capacity
            remaining_cores, remaining_memory = cls.get_remaining_resource(EngineRegistry,
                                                                           [
                                                                               EngineRegistry.f_engine_type == EngineType.COMPUTING,
                                                                               EngineRegistry.f_engine_name == engine_name])
            operate_msg = "successfully" if operate_status else "failed"
            schedule_logger(job_id).info(
                f"{operation_type} job resource(cores {cores} memory {memory}) on {role} {party_id} {operate_msg}, remaining cores: {remaining_cores} remaining memory: {remaining_memory}")
        return operate_status
    @classmethod
    def adapt_engine_parameters(cls, role, job_parameters: RunParameters, create_initiator_baseline=False):
        """Fill in / adapt task_nodes and task_cores_per_node for the computing engine.

        When *create_initiator_baseline* is True, a fresh adaptation baseline is
        built from the initiator's requested cores; otherwise the existing
        baseline is adapted to this party's engine deployment.  Mutates
        *job_parameters* in place.
        """
        computing_engine_info = ResourceManager.get_engine_registration_info(engine_type=EngineType.COMPUTING,
                                                                             engine_name=job_parameters.computing_engine)
        if not job_parameters.adaptation_parameters or create_initiator_baseline:
            job_parameters.adaptation_parameters = {
                "task_nodes": 0,
                "task_cores_per_node": 0,
                "task_memory_per_node": 0,
                # request_task_cores base on initiator and distribute to all parties, using job conf parameters or initiator fateflow server default settings
                "request_task_cores": int(job_parameters.task_cores) if job_parameters.task_cores else JobDefaultConfig.task_cores,
                "if_initiator_baseline": True
            }
        else:
            # use initiator baseline
            if role == "arbiter":
                # the arbiter role gets a single core regardless of the baseline
                job_parameters.adaptation_parameters["request_task_cores"] = 1
            elif "request_task_cores" not in job_parameters.adaptation_parameters:
                # compatibility 1.5.0
                job_parameters.adaptation_parameters["request_task_cores"] = job_parameters.adaptation_parameters["task_nodes"] * job_parameters.adaptation_parameters["task_cores_per_node"]
            job_parameters.adaptation_parameters["if_initiator_baseline"] = False
        adaptation_parameters = job_parameters.adaptation_parameters
        if job_parameters.computing_engine in {ComputingEngine.STANDALONE, ComputingEngine.EGGROLL}:
            # node count is fixed by the eggroll/standalone deployment
            adaptation_parameters["task_nodes"] = computing_engine_info.f_nodes
            # an explicit per-node processor count in the job conf wins
            if int(job_parameters.eggroll_run.get("eggroll.session.processors.per.node", 0)) > 0:
                adaptation_parameters["task_cores_per_node"] = int(job_parameters.eggroll_run["eggroll.session.processors.per.node"])
            else:
                # spread the requested cores across nodes, at least one per node
                adaptation_parameters["task_cores_per_node"] = max(1, int(adaptation_parameters["request_task_cores"] / adaptation_parameters["task_nodes"]))
            if not create_initiator_baseline:
                # set the adaptation parameters to the actual engine operation parameters
                job_parameters.eggroll_run["eggroll.session.processors.per.node"] = adaptation_parameters["task_cores_per_node"]
        elif job_parameters.computing_engine == ComputingEngine.SPARK or job_parameters.computing_engine == ComputingEngine.LINKIS_SPARK:
            # executors may be set in spark_run; fall back to the registered node count
            adaptation_parameters["task_nodes"] = int(job_parameters.spark_run.get("num-executors", computing_engine_info.f_nodes))
            if int(job_parameters.spark_run.get("executor-cores", 0)) > 0:
                adaptation_parameters["task_cores_per_node"] = int(job_parameters.spark_run["executor-cores"])
            else:
                adaptation_parameters["task_cores_per_node"] = max(1, int(adaptation_parameters["request_task_cores"] / adaptation_parameters["task_nodes"]))
            if not create_initiator_baseline:
                # set the adaptation parameters to the actual engine operation parameters
                job_parameters.spark_run["num-executors"] = adaptation_parameters["task_nodes"]
                job_parameters.spark_run["executor-cores"] = adaptation_parameters["task_cores_per_node"]
  223. @classmethod
  224. def calculate_job_resource(cls, job_parameters: RunParameters = None, job_id=None, role=None, party_id=None):
  225. if not job_parameters:
  226. job_parameters = job_utils.get_job_parameters(job_id=job_id,
  227. role=role,
  228. party_id=party_id)
  229. job_parameters = RunParameters(**job_parameters)
  230. cores = 0
  231. memory = 0
  232. if not (job_parameters.computing_engine in IGNORE_RESOURCE_COMPUTING_ENGINE or
  233. role in IGNORE_RESOURCE_ROLES and job_parameters.computing_engine in SUPPORT_IGNORE_RESOURCE_ENGINES):
  234. cores = (int(job_parameters.adaptation_parameters["task_cores_per_node"] or 0) *
  235. int(job_parameters.adaptation_parameters["task_nodes"] or 0) *
  236. int(job_parameters.task_parallelism or 0))
  237. memory = (int(job_parameters.adaptation_parameters["task_memory_per_node"] or 0) *
  238. int(job_parameters.adaptation_parameters["task_nodes"] or 0) *
  239. int(job_parameters.task_parallelism or 0))
  240. return job_parameters.computing_engine, cores, memory
  241. @classmethod
  242. def calculate_task_resource(cls, task_parameters: RunParameters = None, task_info: dict = None):
  243. if not task_parameters:
  244. job_parameters = job_utils.get_job_parameters(job_id=task_info["job_id"],
  245. role=task_info["role"],
  246. party_id=task_info["party_id"])
  247. task_parameters = RunParameters(**job_parameters)
  248. if task_parameters.computing_engine in IGNORE_RESOURCE_COMPUTING_ENGINE:
  249. cores_per_task = 0
  250. memory_per_task = 0
  251. elif task_info["role"] in IGNORE_RESOURCE_ROLES and task_parameters.computing_engine in SUPPORT_IGNORE_RESOURCE_ENGINES:
  252. cores_per_task = 0
  253. memory_per_task = 0
  254. else:
  255. cores_per_task = task_parameters.adaptation_parameters["task_cores_per_node"] * \
  256. task_parameters.adaptation_parameters["task_nodes"]
  257. memory_per_task = task_parameters.adaptation_parameters["task_memory_per_node"] * \
  258. task_parameters.adaptation_parameters["task_nodes"]
  259. return cores_per_task, memory_per_task
  260. @classmethod
  261. def apply_for_task_resource(cls, task_info):
  262. return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.APPLY)
  263. @classmethod
  264. def return_task_resource(cls, task_info):
  265. return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.RETURN)
  266. @classmethod
  267. def resource_for_task(cls, task_info, operation_type):
  268. cores_per_task, memory_per_task = cls.calculate_task_resource(task_info=task_info)
  269. schedule_logger(task_info["job_id"]).info(f"cores_per_task:{cores_per_task}, memory_per_task:{memory_per_task}")
  270. if cores_per_task or memory_per_task:
  271. filters, updates = cls.update_resource_sql(resource_model=Job,
  272. cores=cores_per_task,
  273. memory=memory_per_task,
  274. operation_type=operation_type,
  275. )
  276. filters.append(Job.f_job_id == task_info["job_id"])
  277. filters.append(Job.f_role == task_info["role"])
  278. filters.append(Job.f_party_id == task_info["party_id"])
  279. filters.append(Job.f_resource_in_use == True)
  280. operate = Job.update(updates).where(*filters)
  281. operate_status = operate.execute() > 0
  282. else:
  283. operate_status = True
  284. if operate_status:
  285. schedule_logger(task_info["job_id"]).info(
  286. "task {} {} {} resource successfully".format(task_info["task_id"],
  287. task_info["task_version"], operation_type))
  288. else:
  289. schedule_logger(task_info["job_id"]).warning(
  290. "task {} {} {} resource failed".format(task_info["task_id"],
  291. task_info["task_version"], operation_type))
  292. return operate_status
  293. @classmethod
  294. def update_resource_sql(cls, resource_model: typing.Union[EngineRegistry, Job], cores, memory, operation_type: ResourceOperation):
  295. if operation_type is ResourceOperation.APPLY:
  296. filters = [
  297. resource_model.f_remaining_cores >= cores,
  298. resource_model.f_remaining_memory >= memory
  299. ]
  300. updates = {resource_model.f_remaining_cores: resource_model.f_remaining_cores - cores,
  301. resource_model.f_remaining_memory: resource_model.f_remaining_memory - memory}
  302. elif operation_type is ResourceOperation.RETURN:
  303. filters = []
  304. updates = {resource_model.f_remaining_cores: resource_model.f_remaining_cores + cores,
  305. resource_model.f_remaining_memory: resource_model.f_remaining_memory + memory}
  306. else:
  307. raise RuntimeError(f"can not support {operation_type} resource operation type")
  308. return filters, updates
  309. @classmethod
  310. @DB.connection_context()
  311. def get_remaining_resource(cls, resource_model: typing.Union[EngineRegistry, Job], filters):
  312. remaining_cores, remaining_memory = None, None
  313. try:
  314. objs = resource_model.select(resource_model.f_remaining_cores, resource_model.f_remaining_memory).where(
  315. *filters)
  316. if objs:
  317. remaining_cores, remaining_memory = objs[0].f_remaining_cores, objs[0].f_remaining_memory
  318. except Exception as e:
  319. schedule_logger().exception(e)
  320. finally:
  321. return remaining_cores, remaining_memory
  322. @classmethod
  323. @DB.connection_context()
  324. def get_engine_registration_info(cls, engine_type, engine_name) -> EngineRegistry:
  325. engines = EngineRegistry.select().where(EngineRegistry.f_engine_type == engine_type,
  326. EngineRegistry.f_engine_name == engine_name)
  327. if engines:
  328. return engines[0]