job_utils.py

# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import errno
import os
import random
import sys
import threading
import typing
from functools import wraps

from fate_arch.common import FederatedMode, file_utils
from fate_arch.common.base_utils import current_timestamp, fate_uuid, json_dumps

from fate_flow.db.db_models import DB, Job, Task
from fate_flow.db.db_utils import query_db
from fate_flow.db.job_default_config import JobDefaultConfig
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.entity import JobConfiguration, RunParameters
from fate_flow.entity.run_status import JobStatus, TaskStatus
from fate_flow.entity.types import InputSearchType
from fate_flow.settings import FATE_BOARD_DASHBOARD_ENDPOINT
from fate_flow.utils import data_utils, detect_utils, process_utils, session_utils
from fate_flow.utils.base_utils import get_fate_flow_directory
from fate_flow.utils.log_utils import schedule_logger
from fate_flow.utils.schedule_utils import get_dsl_parser_by_version

PIPELINE_COMPONENT_NAME = 'pipeline'
PIPELINE_MODEL_ALIAS = 'pipeline'
PIPELINE_COMPONENT_MODULE_NAME = 'Pipeline'
PIPELINE_MODEL_NAME = 'Pipeline'


class JobIdGenerator(object):
    _lock = threading.RLock()

    def __init__(self, initial_value=0):
        self._value = initial_value
        self._pre_timestamp = None
        self._max = 99999

    def next_id(self):
        """
        generate next job id with locking
        """
        # TODO: ids may still collide when multiple instances are deployed,
        #       because the counter and lock are process-local
        now = datetime.datetime.now()
        with JobIdGenerator._lock:
            if self._pre_timestamp == now:
                if self._value < self._max:
                    self._value += 1
                else:
                    now += datetime.timedelta(microseconds=1)
                    self._pre_timestamp = now
                    self._value = 0
            else:
                self._pre_timestamp = now
                self._value = 0
            return "{}{}".format(now.strftime("%Y%m%d%H%M%S%f"), self._value)


job_id_generator = JobIdGenerator()


def generate_job_id():
    return job_id_generator.next_id()
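
# A minimal usage sketch (the value below is illustrative): the id is the
# "%Y%m%d%H%M%S%f" timestamp followed by a per-microsecond counter suffix.
#   >>> generate_job_id()
#   '202201011200001234560'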


def generate_task_id(job_id, component_name):
    return '{}_{}'.format(job_id, component_name)


def generate_task_version_id(task_id, task_version):
    return "{}_{}".format(task_id, task_version)


def generate_session_id(task_id, task_version, role, party_id, suffix=None, random_end=False):
    items = [task_id, str(task_version), role, str(party_id)]
    if suffix:
        items.append(suffix)
    if random_end:
        items.append(fate_uuid())
    return "_".join(items)
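
# Illustrative example (hypothetical ids): the pieces are joined with "_":
#   >>> generate_session_id("202201011200001234560_upload_0", 0, "guest", 9999)
#   '202201011200001234560_upload_0_0_guest_9999'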


def generate_task_input_data_namespace(task_id, task_version, role, party_id):
    return "input_data_{}".format(generate_session_id(task_id=task_id,
                                                      task_version=task_version,
                                                      role=role,
                                                      party_id=party_id))


def get_job_directory(job_id, *args):
    return os.path.join(get_fate_flow_directory(), 'jobs', job_id, *args)


def get_job_log_directory(job_id, *args):
    return os.path.join(get_fate_flow_directory(), 'logs', job_id, *args)


def get_task_directory(job_id, role, party_id, component_name, task_id, task_version, **kwargs):
    return get_job_directory(job_id, role, party_id, component_name, task_id, task_version)


def get_general_worker_directory(worker_name, worker_id, *args):
    return os.path.join(get_fate_flow_directory(), worker_name, worker_id, *args)


def get_general_worker_log_directory(worker_name, worker_id, *args):
    return os.path.join(get_fate_flow_directory(), 'logs', worker_name, worker_id, *args)


def check_config(config: typing.Dict, required_parameters: typing.List):
    for parameter in required_parameters:
        if parameter not in config:
            return False, 'configuration has no {} parameter'.format(parameter)
    return True, 'ok'
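
# Usage sketch (hypothetical config dicts):
#   >>> check_config({"initiator": {}, "role": {}}, ["initiator", "role"])
#   (True, 'ok')
#   >>> check_config({"initiator": {}}, ["initiator", "role"])
#   (False, 'configuration has no role parameter')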


def check_job_conf(runtime_conf, job_dsl):
    detect_utils.check_config(runtime_conf, ['initiator', 'role'])
    detect_utils.check_config(runtime_conf['initiator'], ['role', 'party_id'])
    # normalize all party ids to int
    runtime_conf['initiator']['party_id'] = int(runtime_conf['initiator']['party_id'])
    for r in runtime_conf['role'].keys():
        for i in range(len(runtime_conf['role'][r])):
            runtime_conf['role'][r][i] = int(runtime_conf['role'][r][i])
    constraint_check(runtime_conf, job_dsl)


def runtime_conf_basic(if_local=False):
    job_runtime_conf = {
        "dsl_version": 2,
        "initiator": {},
        "job_parameters": {
            "common": {
                "federated_mode": FederatedMode.SINGLE
            },
        },
        "role": {},
        "component_parameters": {}
    }
    if if_local:
        job_runtime_conf["initiator"]["role"] = "local"
        job_runtime_conf["initiator"]["party_id"] = 0
        job_runtime_conf["role"]["local"] = [0]
    return job_runtime_conf
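
# With if_local=True the skeleton is filled in for a single "local" party:
#   >>> runtime_conf_basic(if_local=True)["initiator"]
#   {'role': 'local', 'party_id': 0}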


def new_runtime_conf(job_dir, method, module, role, party_id):
    if role:
        conf_path_dir = os.path.join(job_dir, method, module, role, str(party_id))
    else:
        conf_path_dir = os.path.join(job_dir, method, module, str(party_id))
    os.makedirs(conf_path_dir, exist_ok=True)
    return os.path.join(conf_path_dir, 'runtime_conf.json')


def save_job_conf(job_id, role, party_id, dsl, runtime_conf, runtime_conf_on_party, train_runtime_conf, pipeline_dsl=None):
    path_dict = get_job_conf_path(job_id=job_id, role=role, party_id=party_id)
    dump_job_conf(path_dict=path_dict,
                  dsl=dsl,
                  runtime_conf=runtime_conf,
                  runtime_conf_on_party=runtime_conf_on_party,
                  train_runtime_conf=train_runtime_conf,
                  pipeline_dsl=pipeline_dsl)
    return path_dict


def save_task_using_job_conf(task: Task):
    task_dir = get_task_directory(job_id=task.f_job_id,
                                  role=task.f_role,
                                  party_id=task.f_party_id,
                                  component_name=task.f_component_name,
                                  task_id=task.f_task_id,
                                  task_version=str(task.f_task_version))
    return save_using_job_conf(task.f_job_id, task.f_role, task.f_party_id, config_dir=task_dir)


def save_using_job_conf(job_id, role, party_id, config_dir):
    path_dict = get_job_conf_path(job_id=job_id, role=role, party_id=party_id, specified_dir=config_dir)
    job_configuration = get_job_configuration(job_id=job_id,
                                              role=role,
                                              party_id=party_id)
    dump_job_conf(path_dict=path_dict,
                  dsl=job_configuration.dsl,
                  runtime_conf=job_configuration.runtime_conf,
                  runtime_conf_on_party=job_configuration.runtime_conf_on_party,
                  train_runtime_conf=job_configuration.train_runtime_conf,
                  pipeline_dsl=None)
    return path_dict


def dump_job_conf(path_dict, dsl, runtime_conf, runtime_conf_on_party, train_runtime_conf, pipeline_dsl=None):
    os.makedirs(os.path.dirname(path_dict.get('dsl_path')), exist_ok=True)
    os.makedirs(os.path.dirname(path_dict.get('runtime_conf_on_party_path')), exist_ok=True)
    for data, conf_path in [(dsl, path_dict['dsl_path']),
                            (runtime_conf, path_dict['runtime_conf_path']),
                            (runtime_conf_on_party, path_dict['runtime_conf_on_party_path']),
                            (train_runtime_conf, path_dict['train_runtime_conf_path']),
                            (pipeline_dsl, path_dict['pipeline_dsl_path'])]:
        with open(conf_path, 'w+') as f:
            f.truncate()
            if not data:
                data = {}
            f.write(json_dumps(data, indent=4))
            f.flush()
    return path_dict


@DB.connection_context()
def get_job_configuration(job_id, role, party_id) -> JobConfiguration:
    jobs = Job.select(Job.f_dsl, Job.f_runtime_conf, Job.f_train_runtime_conf, Job.f_runtime_conf_on_party).where(Job.f_job_id == job_id,
                                                                                                                  Job.f_role == role,
                                                                                                                  Job.f_party_id == party_id)
    if jobs:
        job = jobs[0]
        return JobConfiguration(**job.to_human_model_dict())


def get_task_using_job_conf(task_info: dict):
    task_dir = get_task_directory(**task_info)
    return read_job_conf(task_info["job_id"], task_info["role"], task_info["party_id"], task_dir)


def read_job_conf(job_id, role, party_id, specified_dir=None):
    path_dict = get_job_conf_path(job_id=job_id, role=role, party_id=party_id, specified_dir=specified_dir)
    conf_dict = {}
    for key, path in path_dict.items():
        config = file_utils.load_json_conf(path)
        # strip the "_path" suffix explicitly; str.rstrip("_path") would strip
        # a *character set* rather than the suffix and can eat too many characters
        conf_dict[key[:-len("_path")]] = config
    return JobConfiguration(**conf_dict)


def get_job_conf_path(job_id, role, party_id, specified_dir=None):
    conf_dir = get_job_directory(job_id) if not specified_dir else specified_dir
    job_dsl_path = os.path.join(conf_dir, 'job_dsl.json')
    job_runtime_conf_path = os.path.join(conf_dir, 'job_runtime_conf.json')
    if not specified_dir:
        job_runtime_conf_on_party_path = os.path.join(conf_dir, role, str(party_id), 'job_runtime_on_party_conf.json')
    else:
        job_runtime_conf_on_party_path = os.path.join(conf_dir, 'job_runtime_on_party_conf.json')
    train_runtime_conf_path = os.path.join(conf_dir, 'train_runtime_conf.json')
    pipeline_dsl_path = os.path.join(conf_dir, 'pipeline_dsl.json')
    return {'dsl_path': job_dsl_path,
            'runtime_conf_path': job_runtime_conf_path,
            'runtime_conf_on_party_path': job_runtime_conf_on_party_path,
            'train_runtime_conf_path': train_runtime_conf_path,
            'pipeline_dsl_path': pipeline_dsl_path}
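
# The returned dict maps conf names to files under the job directory; an
# illustrative shape (paths abbreviated, '<fate_flow>' is the base directory):
#   {'dsl_path': '<fate_flow>/jobs/<job_id>/job_dsl.json',
#    'runtime_conf_path': '<fate_flow>/jobs/<job_id>/job_runtime_conf.json',
#    ...}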


@DB.connection_context()
def get_upload_job_configuration_summary(upload_tasks: typing.List[Task]):
    jobs_run_conf = {}
    for task in upload_tasks:
        jobs = Job.select(Job.f_job_id, Job.f_runtime_conf_on_party, Job.f_description).where(Job.f_job_id == task.f_job_id)
        job = jobs[0]
        jobs_run_conf[job.f_job_id] = job.f_runtime_conf_on_party["component_parameters"]["role"]["local"]["0"]["upload_0"]
        jobs_run_conf[job.f_job_id]["notes"] = job.f_description
    return jobs_run_conf


@DB.connection_context()
def get_job_parameters(job_id, role, party_id):
    jobs = Job.select(Job.f_runtime_conf_on_party).where(Job.f_job_id == job_id,
                                                         Job.f_role == role,
                                                         Job.f_party_id == party_id)
    if jobs:
        job = jobs[0]
        return job.f_runtime_conf_on_party.get("job_parameters")
    else:
        return {}


@DB.connection_context()
def get_job_dsl(job_id, role, party_id):
    jobs = Job.select(Job.f_dsl).where(Job.f_job_id == job_id,
                                       Job.f_role == role,
                                       Job.f_party_id == party_id)
    if jobs:
        job = jobs[0]
        return job.f_dsl
    else:
        return {}


@DB.connection_context()
def list_job(limit=0, offset=0, query=None, order_by=None):
    return query_db(Job, limit, offset, query, order_by)


@DB.connection_context()
def list_task(limit=0, offset=0, query=None, order_by=None):
    return query_db(Task, limit, offset, query, order_by)


def check_job_process(pid):
    if pid < 0:
        return False
    if pid == 0:
        raise ValueError('invalid PID 0')
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH == No such process
            return False
        elif err.errno == errno.EPERM:
            # EPERM clearly means there's a process to deny access to
            return True
        else:
            # According to "man 2 kill" possible error values are
            # (EINVAL, EPERM, ESRCH)
            raise
    else:
        return True
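
# Quick sanity check (POSIX only; signal 0 probes a pid without signalling):
#   >>> check_job_process(os.getpid())
#   True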


def check_job_is_timeout(job: Job):
    job_parameters = job.f_runtime_conf_on_party["job_parameters"]
    timeout = job_parameters.get("timeout", JobDefaultConfig.job_timeout)
    now_time = current_timestamp()
    # timestamps are in milliseconds, timeout is in seconds
    running_time = (now_time - job.f_create_time) / 1000
    if running_time > timeout:
        schedule_logger(job.f_job_id).info(f'running time {running_time}s exceeds the timeout {timeout}s')
        return True
    else:
        return False


def start_session_stop(task):
    job_parameters = RunParameters(**get_job_parameters(job_id=task.f_job_id, role=task.f_role, party_id=task.f_party_id))
    session_manager_id = generate_session_id(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id)
    if task.f_status == TaskStatus.WAITING:
        schedule_logger(task.f_job_id).info(f'task is waiting, skip stopping sessions {session_manager_id}')
        return
    schedule_logger(task.f_job_id).info(f'start a subprocess to stop task sessions {session_manager_id}')
    task_dir = os.path.join(get_job_directory(job_id=task.f_job_id), task.f_role,
                            task.f_party_id, task.f_component_name, 'session_stop')
    os.makedirs(task_dir, exist_ok=True)
    process_cmd = [
        sys.executable or 'python3',
        sys.modules[session_utils.SessionStop.__module__].__file__,
        '--session', session_manager_id,
        '--computing', job_parameters.computing_engine,
        '--federation', job_parameters.federation_engine,
        '--storage', job_parameters.storage_engine,
        # task status is compared against JobStatus here; the SUCCESS values coincide
        '-c', 'stop' if task.f_status == JobStatus.SUCCESS else 'kill'
    ]
    p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=task_dir, process_cmd=process_cmd)
    p.wait()
    p.poll()


def get_timeout(job_id, timeout, runtime_conf, dsl):
    try:
        if timeout > 0:
            schedule_logger(job_id).info(f'setting job timeout {timeout}')
            return timeout
        else:
            default_timeout = job_default_timeout(runtime_conf, dsl)
            schedule_logger(job_id).info(f'job timeout {timeout} is not a positive number, using the default timeout {default_timeout}')
            return default_timeout
    except Exception:
        default_timeout = job_default_timeout(runtime_conf, dsl)
        schedule_logger(job_id).info(f'job timeout {timeout} is invalid, using the default timeout {default_timeout}')
        return default_timeout


def job_default_timeout(runtime_conf, dsl):
    # TODO: derive the default from runtime_conf / dsl in future versions
    timeout = JobDefaultConfig.job_timeout
    return timeout


def get_board_url(job_id, role, party_id):
    board_url = "http://{}:{}{}".format(
        ServerRegistry.FATEBOARD.get("host"),
        ServerRegistry.FATEBOARD.get("port"),
        FATE_BOARD_DASHBOARD_ENDPOINT).format(job_id, role, party_id)
    return board_url
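
# FATE_BOARD_DASHBOARD_ENDPOINT is itself a format template, so the second
# .format() call fills in job_id, role and party_id. An illustrative result
# (host, port and endpoint shape are hypothetical):
#   'http://127.0.0.1:8080/index.html#/dashboard?job_id=<job_id>&role=guest&party_id=9999'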


def check_job_inheritance_parameters(job, inheritance_jobs, inheritance_tasks):
    if not inheritance_jobs:
        raise Exception(
            f"job {job.f_inheritance_info.get('job_id')} (role {job.f_role}, party id {job.f_party_id}) not found")
    inheritance_job = inheritance_jobs[0]
    task_status = {}
    for task in inheritance_tasks:
        task_status[task.f_component_name] = task.f_status
    for component in job.f_inheritance_info.get('component_list'):
        if component not in task_status.keys():
            raise Exception(f"component {component} not found in job {job.f_inheritance_info.get('job_id')}")
        elif task_status[component] not in [TaskStatus.SUCCESS, TaskStatus.PASS]:
            raise Exception(f"job {job.f_inheritance_info.get('job_id')} component {component} status: {task_status[component]}")
    dsl_parser = get_dsl_parser_by_version()
    dsl_parser.verify_conf_reusability(inheritance_job.f_runtime_conf, job.f_runtime_conf, job.f_inheritance_info.get('component_list'))
    dsl_parser.verify_dsl_reusability(inheritance_job.f_dsl, job.f_dsl, job.f_inheritance_info.get('component_list', []))


def get_job_all_components(dsl):
    return [dsl['components'][component_name]['module'].lower() for component_name in dsl['components'].keys()]


def constraint_check(job_runtime_conf, job_dsl):
    if job_dsl:
        all_components = get_job_all_components(job_dsl)
        glm = ['heterolr', 'heterolinr', 'heteropoisson']
        for cpn in glm:
            if cpn in all_components:
                roles = job_runtime_conf.get('role')
                if 'guest' in roles.keys() and 'arbiter' in roles.keys() and 'host' in roles.keys():
                    # a party appearing as both guest and arbiter must also be a
                    # host, and such parties must cover the whole host list
                    for party_id in set(roles['guest']) & set(roles['arbiter']):
                        if party_id not in roles['host'] or len(set(roles['guest']) & set(roles['arbiter'])) != len(roles['host']):
                            raise Exception("{} component party id constraint violated, please check the role config: {}".format(cpn, job_runtime_conf.get('role')))


def get_job_dataset(is_initiator, role, party_id, roles, job_args):
    dataset = {}
    dsl_version = 1
    if job_args.get('dsl_version'):
        if job_args.get('dsl_version') == 2:
            dsl_version = 2
    for _role, _role_party_args in job_args.items():
        if _role == "dsl_version":
            continue
        if is_initiator or _role == role:
            for _party_index in range(len(_role_party_args)):
                _party_id = roles[_role][_party_index]
                if is_initiator or _party_id == party_id:
                    dataset[_role] = dataset.get(_role, {})
                    dataset[_role][_party_id] = dataset[_role].get(_party_id, {})
                    if dsl_version == 1:
                        for _data_type, _data_location in _role_party_args[_party_index]['args']['data'].items():
                            dataset[_role][_party_id][_data_type] = '{}.{}'.format(
                                _data_location['namespace'], _data_location['name'])
                    else:
                        for key in _role_party_args[_party_index].keys():
                            for _data_type, _data_location in _role_party_args[_party_index][key].items():
                                search_type = data_utils.get_input_search_type(parameters=_data_location)
                                if search_type is InputSearchType.TABLE_INFO:
                                    dataset[_role][_party_id][key] = '{}.{}'.format(_data_location['namespace'], _data_location['name'])
                                elif search_type is InputSearchType.JOB_COMPONENT_OUTPUT:
                                    dataset[_role][_party_id][key] = '{}.{}.{}'.format(_data_location['job_id'], _data_location['component_name'], _data_location['data_name'])
                                else:
                                    dataset[_role][_party_id][key] = "unknown"
    return dataset
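
# The result maps role -> party id -> data key -> "namespace.name" (or
# "job_id.component_name.data_name" for component-output inputs); an
# illustrative shape with hypothetical names:
#   {'guest': {9999: {'data': 'experiment.breast_hetero_guest'}}}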


def asynchronous_function(func):
    @wraps(func)
    def _wrapper(*args, **kwargs):
        is_asynchronous = kwargs.pop("is_asynchronous", False)
        if is_asynchronous:
            thread = threading.Thread(target=func, args=args, kwargs=kwargs)
            thread.start()
            return True
        else:
            return func(*args, **kwargs)
    return _wrapper
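
# Usage sketch (clean_job is a hypothetical function): callers opt in per call
# with the reserved "is_asynchronous" keyword, which the wrapper strips before
# dispatching.
#   @asynchronous_function
#   def clean_job(job_id):
#       ...
#
#   clean_job("202201011200001234560", is_asynchronous=True)  # fires a thread, returns True
#   clean_job("202201011200001234560")                        # runs synchronously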


def task_report(tasks):
    now_time = current_timestamp()
    report_list = [{"component_name": task.f_component_name, "start_time": task.f_start_time,
                    "end_time": task.f_end_time, "elapsed": task.f_elapsed, "status": task.f_status}
                   for task in tasks]
    # tasks that have not started yet sort as if they started "now"
    report_list.sort(key=lambda x: (x["start_time"] if x["start_time"] else now_time, x["status"]))
    return report_list


def get_component_parameters(job_providers, dsl_parser, provider_detail, role, party_id):
    component_parameters = dict()
    for component in job_providers.keys():
        provider_info = job_providers[component]["provider"]
        provider_name = provider_info["name"]
        provider_version = provider_info["version"]
        parameter = dsl_parser.parse_component_parameters(component,
                                                          provider_detail,
                                                          provider_name,
                                                          provider_version,
                                                          local_role=role,
                                                          local_party_id=party_id)
        module_name = dsl_parser.get_component_info(component_name=component).get_module().lower()
        if module_name not in component_parameters.keys():
            component_parameters[module_name] = [parameter.get("ComponentParam", {})]
        else:
            component_parameters[module_name].append(parameter.get("ComponentParam", {}))
    return component_parameters


def generate_retry_interval(cur_retry, max_retry_cnt, long_retry_cnt):
    # early retries back off quickly (5-15s); the last `long_retry_cnt`
    # retries wait about five minutes (300-310s), both with random jitter
    if cur_retry < max_retry_cnt - long_retry_cnt:
        retry_interval = random.random() * 10 + 5
    else:
        retry_interval = round(300 + random.random() * 10, 3)
    return retry_interval
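
# Illustrative intervals for max_retry_cnt=5, long_retry_cnt=2 (jitter omitted):
#   retries 0-2 -> ~5-15s each; retries 3-4 -> ~300-310s each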