detector.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import time
  17. from typing import List
  18. from fate_arch.common.base_utils import current_timestamp
  19. from fate_arch.session import Session
  20. from fate_flow.controller.engine_adapt import build_engine
  21. from fate_flow.controller.job_controller import JobController
  22. from fate_flow.controller.task_controller import TaskController
  23. from fate_flow.db.db_models import DB, DependenciesStorageMeta, Job
  24. from fate_flow.db.job_default_config import JobDefaultConfig
  25. from fate_flow.db.runtime_config import RuntimeConfig
  26. from fate_flow.entity.run_status import (
  27. EndStatus, FederatedSchedulingStatusCode,
  28. JobStatus, TaskStatus,
  29. )
  30. from fate_flow.manager.dependence_manager import DependenceManager
  31. from fate_flow.manager.resource_manager import ResourceManager
  32. from fate_flow.operation.job_saver import JobSaver
  33. from fate_flow.scheduler.federated_scheduler import FederatedScheduler
  34. from fate_flow.settings import SESSION_VALID_PERIOD
  35. from fate_flow.utils.api_utils import is_localhost
  36. from fate_flow.utils.cron import Cron
  37. from fate_flow.utils.job_utils import check_job_is_timeout, generate_retry_interval
  38. from fate_flow.utils.process_utils import check_process
  39. from fate_flow.utils.log_utils import detect_logger
  40. class Detector(Cron):
  41. def run_do(self):
  42. self.detect_running_task()
  43. self.detect_end_task()
  44. self.detect_running_job()
  45. self.detect_resource_record()
  46. self.detect_expired_session()
  47. self.detect_dependence_upload_record()
  48. @classmethod
  49. def detect_running_task(cls):
  50. detect_logger().info('start to detect running task..')
  51. count = 0
  52. try:
  53. running_tasks = JobSaver.query_task(party_status=TaskStatus.RUNNING, run_on_this_party=True)
  54. stop_job_ids = set()
  55. for task in running_tasks:
  56. if task.f_run_ip != RuntimeConfig.JOB_SERVER_HOST:
  57. cls.detect_cluster_instance_status(task, stop_job_ids)
  58. continue
  59. if not task.f_engine_conf or task.f_run_ip != RuntimeConfig.JOB_SERVER_HOST:
  60. continue
  61. count += 1
  62. try:
  63. process_exist = build_engine(task.f_engine_conf.get("computing_engine")).is_alive(task)
  64. if not process_exist:
  65. msg = f"task {task.f_task_id} {task.f_task_version} on {task.f_role} {task.f_party_id}"
  66. detect_logger(job_id=task.f_job_id).info(f"{msg} with {task.f_party_status} process {task.f_run_pid} does not exist")
  67. time.sleep(3)
  68. _tasks = JobSaver.query_task(task_id=task.f_task_id, task_version=task.f_task_version, role=task.f_role, party_id=task.f_party_id)
  69. if _tasks:
  70. if _tasks[0].f_party_status == TaskStatus.RUNNING:
  71. stop_job_ids.add(task.f_job_id)
  72. detect_logger(job_id=task.f_job_id).info(f"{msg} party status has been checked twice, try to stop job")
  73. else:
  74. detect_logger(job_id=task.f_job_id).info(f"{msg} party status has changed to {_tasks[0].f_party_status}, may be stopped by task_controller.stop_task, pass stop job again")
  75. else:
  76. detect_logger(job_id=task.f_job_id).warning(f"{msg} can not found on db")
  77. except Exception as e:
  78. detect_logger(job_id=task.f_job_id).exception(e)
  79. if stop_job_ids:
  80. detect_logger().info('start to stop jobs: {}'.format(stop_job_ids))
  81. stop_jobs = set()
  82. for job_id in stop_job_ids:
  83. jobs = JobSaver.query_job(job_id=job_id)
  84. if jobs:
  85. stop_jobs.add(jobs[0])
  86. cls.request_stop_jobs(jobs=stop_jobs, stop_msg="task executor process abort", stop_status=JobStatus.FAILED)
  87. except Exception as e:
  88. detect_logger().exception(e)
  89. finally:
  90. detect_logger().info(f"finish detect {count} running task")
  91. @classmethod
  92. def detect_end_task(cls):
  93. detect_logger().info('start to detect end status task..')
  94. count = 0
  95. try:
  96. tasks = JobSaver.query_task(
  97. run_ip=RuntimeConfig.JOB_SERVER_HOST,
  98. run_port=RuntimeConfig.HTTP_PORT,
  99. status=set(EndStatus.status_list()),
  100. kill_status=False
  101. )
  102. for task in tasks:
  103. try:
  104. if task.f_end_time and task.f_end_time - current_timestamp() < 5 * 60 * 1000:
  105. continue
  106. detect_logger().info(f'start to stop task {task.f_role} {task.f_party_id} {task.f_task_id}'
  107. f' {task.f_task_version}')
  108. kill_task_status = TaskController.stop_task(task=task, stop_status=TaskStatus.FAILED)
  109. detect_logger().info( f'kill task status: {kill_task_status}')
  110. count += 1
  111. except Exception as e:
  112. detect_logger().exception(e)
  113. except Exception as e:
  114. detect_logger().exception(e)
  115. finally:
  116. detect_logger().info(f"finish detect {count} end task")
  117. @classmethod
  118. def detect_running_job(cls):
  119. detect_logger().info('start detect running job')
  120. try:
  121. running_jobs = JobSaver.query_job(status=JobStatus.RUNNING, is_initiator=True)
  122. stop_jobs = set()
  123. for job in running_jobs:
  124. try:
  125. if check_job_is_timeout(job):
  126. stop_jobs.add(job)
  127. except Exception as e:
  128. detect_logger(job_id=job.f_job_id).exception(e)
  129. cls.request_stop_jobs(jobs=stop_jobs, stop_msg="running timeout", stop_status=JobStatus.TIMEOUT)
  130. except Exception as e:
  131. detect_logger().exception(e)
  132. finally:
  133. detect_logger().info('finish detect running job')
  134. @classmethod
  135. @DB.connection_context()
  136. def detect_resource_record(cls):
  137. detect_logger().info('start detect resource recycle')
  138. try:
  139. filter_status = EndStatus.status_list()
  140. filter_status.append(JobStatus.WAITING)
  141. jobs = Job.select().where(Job.f_resource_in_use == True, current_timestamp() - Job.f_apply_resource_time > 10 * 60 * 1000, Job.f_status << filter_status)
  142. stop_jobs = set()
  143. for job in jobs:
  144. if job.f_status == JobStatus.WAITING:
  145. stop_jobs.add(job)
  146. else:
  147. try:
  148. detect_logger(job_id=job.f_job_id).info(f"start to return job {job.f_job_id} on {job.f_role} {job.f_party_id} resource")
  149. flag = ResourceManager.return_job_resource(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)
  150. if flag:
  151. detect_logger(job_id=job.f_job_id).info(f"return job {job.f_job_id} on {job.f_role} {job.f_party_id} resource successfully")
  152. else:
  153. detect_logger(job_id=job.f_job_id).info(f"return job {job.f_job_id} on {job.f_role} {job.f_party_id} resource failed")
  154. except Exception as e:
  155. detect_logger(job_id=job.f_job_id).exception(e)
  156. cls.request_stop_jobs(jobs=stop_jobs, stop_msg="start timeout", stop_status=JobStatus.TIMEOUT)
  157. except Exception as e:
  158. detect_logger().exception(e)
  159. finally:
  160. detect_logger().info('finish detect resource recycle')
  161. @classmethod
  162. @DB.connection_context()
  163. def detect_dependence_upload_record(cls):
  164. detect_logger().info('start detect dependence upload process')
  165. try:
  166. upload_process_list = DependenciesStorageMeta.select().where(DependenciesStorageMeta.f_upload_status==True)
  167. for dependence in upload_process_list:
  168. if int(dependence.f_pid):
  169. is_alive = check_process(pid=int(dependence.f_pid))
  170. if not is_alive:
  171. try:
  172. DependenceManager.kill_upload_process(version=dependence.f_version,
  173. storage_engine=dependence.f_storage_engine,
  174. dependence_type=dependence.f_type)
  175. except Exception as e:
  176. detect_logger().exception(e)
  177. except Exception as e:
  178. detect_logger().exception(e)
  179. finally:
  180. detect_logger().info('finish detect dependence upload process')
  181. @classmethod
  182. def detect_expired_session(cls):
  183. ttl = SESSION_VALID_PERIOD
  184. detect_logger().info(f'start detect expired session by ttl {ttl/1000} s')
  185. try:
  186. session_records = Session.query_sessions(create_time=[None, current_timestamp() - ttl])
  187. manager_session_id_list = []
  188. for session_record in session_records:
  189. manager_session_id = session_record.f_manager_session_id
  190. if manager_session_id in manager_session_id_list:
  191. continue
  192. manager_session_id_list.append(manager_session_id)
  193. detect_logger().info(f'start destroy session {manager_session_id}')
  194. try:
  195. sess = Session(session_id=manager_session_id, options={"logger": detect_logger()})
  196. sess.destroy_all_sessions()
  197. except Exception as e:
  198. detect_logger().error(f'stop session {manager_session_id} error', e)
  199. finally:
  200. detect_logger().info(f'stop session {manager_session_id} successfully')
  201. except Exception as e:
  202. detect_logger().error('detect expired session error', e)
  203. finally:
  204. detect_logger().info('finish detect expired session')
  205. @classmethod
  206. def request_stop_jobs(cls, jobs: List[Job], stop_msg, stop_status):
  207. if not len(jobs):
  208. return
  209. detect_logger().info(f"have {len(jobs)} should be stopped, because of {stop_msg}")
  210. for job in jobs:
  211. try:
  212. detect_logger(job_id=job.f_job_id).info(f"detector request start to stop job {job.f_job_id}, because of {stop_msg}")
  213. status = FederatedScheduler.request_stop_job(job=job, stop_status=stop_status)
  214. detect_logger(job_id=job.f_job_id).info(f"detector request stop job {job.f_job_id} {status}")
  215. except Exception as e:
  216. detect_logger(job_id=job.f_job_id).exception(e)
  217. @classmethod
  218. def detect_cluster_instance_status(cls, task, stop_job_ids):
  219. detect_logger(job_id=task.f_job_id).info('start detect running task instance status')
  220. try:
  221. latest_tasks = JobSaver.query_task(task_id=task.f_task_id, role=task.f_role, party_id=task.f_party_id)
  222. if len(latest_tasks) != 1:
  223. detect_logger(job_id=task.f_job_id).error(
  224. f'query latest tasks of {task.f_task_id} failed, '
  225. f'have {len(latest_tasks)} tasks'
  226. )
  227. return
  228. if task.f_task_version != latest_tasks[0].f_task_version:
  229. detect_logger(job_id=task.f_job_id).info(
  230. f'{task.f_task_id} {task.f_task_version} is not the latest task, '
  231. 'update task status to failed'
  232. )
  233. JobSaver.update_task_status({
  234. 'task_id': task.f_task_id,
  235. 'role': task.f_role,
  236. 'party_id': task.f_party_id,
  237. 'task_version': task.f_task_version,
  238. 'status': JobStatus.FAILED,
  239. 'party_status': JobStatus.FAILED,
  240. })
  241. return
  242. if not task.f_run_ip or not task.f_run_port or is_localhost(task.f_run_ip):
  243. return
  244. instance_list = RuntimeConfig.SERVICE_DB.get_servers()
  245. instance_list = {instance.http_address for instance_id, instance in instance_list.items()}
  246. if f'{task.f_run_ip}:{task.f_run_port}' not in instance_list:
  247. detect_logger(job_id=task.f_job_id).warning(
  248. 'detect cluster instance status failed, '
  249. f'add task {task.f_task_id} {task.f_task_version} to stop list'
  250. )
  251. stop_job_ids.add(task.f_job_id)
  252. except Exception as e:
  253. detect_logger(job_id=task.f_job_id).exception(e)
  254. class FederatedDetector(Detector):
  255. def run_do(self):
  256. self.detect_running_job_federated()
  257. @classmethod
  258. def detect_running_job_federated(cls):
  259. detect_logger().info('start federated detect running job')
  260. try:
  261. running_jobs = JobSaver.query_job(status=JobStatus.RUNNING)
  262. stop_jobs = set()
  263. for job in running_jobs:
  264. cur_retry = 0
  265. max_retry_cnt = JobDefaultConfig.detect_connect_max_retry_count
  266. long_retry_cnt = JobDefaultConfig.detect_connect_long_retry_count
  267. exception = None
  268. while cur_retry < max_retry_cnt:
  269. detect_logger().info(f"start federated detect running job {job.f_job_id} cur_retry={cur_retry}")
  270. try:
  271. status_code, response = FederatedScheduler.connect(job)
  272. if status_code != FederatedSchedulingStatusCode.SUCCESS:
  273. exception = f"connect code: {status_code}"
  274. else:
  275. exception = None
  276. detect_logger().info(f"federated detect running job {job.f_job_id} success")
  277. break
  278. except Exception as e:
  279. exception = e
  280. detect_logger(job_id=job.f_job_id).debug(e)
  281. finally:
  282. retry_interval = generate_retry_interval(cur_retry, max_retry_cnt, long_retry_cnt)
  283. time.sleep(retry_interval)
  284. cur_retry += 1
  285. if exception is not None:
  286. try:
  287. JobController.stop_jobs(job_id=job.f_job_id, stop_status=JobStatus.FAILED)
  288. except exception as e:
  289. detect_logger().exception(f"stop job failed: {e}")
  290. detect_logger(job.f_job_id).info(f"job {job.f_job_id} connect failed: {exception}")
  291. stop_jobs.add(job)
  292. cls.request_stop_jobs(jobs=stop_jobs, stop_msg="federated error", stop_status=JobStatus.FAILED)
  293. except Exception as e:
  294. detect_logger().exception(e)
  295. finally:
  296. detect_logger().info('finish federated detect running job')