process_utils.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import errno
  17. import os
  18. import subprocess
  19. import psutil
  20. from fate_flow.utils.log_utils import schedule_logger
  21. from fate_flow.db.db_models import Task
  22. from fate_flow.entity.types import KillProcessRetCode, ProcessRole
  23. from fate_flow.settings import SUBPROCESS_STD_LOG_NAME
  24. from fate_flow.settings import stat_logger
  25. def run_subprocess(job_id, config_dir, process_cmd, added_env: dict = None, log_dir=None, cwd_dir=None, process_name="", process_id=""):
  26. logger = schedule_logger(job_id) if job_id else stat_logger
  27. process_cmd = [str(cmd) for cmd in process_cmd]
  28. logger.info("start process command: \n{}".format(" ".join(process_cmd)))
  29. os.makedirs(config_dir, exist_ok=True)
  30. if not log_dir:
  31. log_dir = config_dir
  32. if log_dir:
  33. os.makedirs(log_dir, exist_ok=True)
  34. std_path = get_std_path(log_dir=log_dir, process_name=process_name, process_id=process_id)
  35. std = open(std_path, 'w')
  36. pid_path = os.path.join(config_dir, f"{process_name}_pid")
  37. if os.name == 'nt':
  38. startupinfo = subprocess.STARTUPINFO()
  39. startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
  40. startupinfo.wShowWindow = subprocess.SW_HIDE
  41. else:
  42. startupinfo = None
  43. subprocess_env = os.environ.copy()
  44. subprocess_env["PROCESS_ROLE"] = ProcessRole.WORKER.value
  45. if added_env:
  46. for name, value in added_env.items():
  47. if name.endswith("PATH") and subprocess_env.get(name) is not None:
  48. value += ':' + subprocess_env[name]
  49. subprocess_env[name] = value
  50. subprocess_env.pop("CLASSPATH", None)
  51. p = subprocess.Popen(process_cmd,
  52. stdout=std,
  53. stderr=std,
  54. startupinfo=startupinfo,
  55. cwd=cwd_dir,
  56. env=subprocess_env
  57. )
  58. with open(pid_path, 'w') as f:
  59. f.truncate()
  60. f.write(str(p.pid) + "\n")
  61. f.flush()
  62. logger.info(f"start process successfully, pid: {p.pid}, std log path: {std_path}")
  63. return p
  64. def check_process(pid, task: Task = None, expected_cmdline: list = None):
  65. if pid < 0:
  66. return False
  67. if pid == 0:
  68. raise ValueError('invalid PID 0')
  69. try:
  70. os.kill(pid, 0)
  71. except OSError as err:
  72. if err.errno == errno.ESRCH:
  73. # ESRCH == No such process
  74. ret = False
  75. elif err.errno == errno.EPERM:
  76. # EPERM clearly means there's a process to deny access to
  77. ret = True
  78. else:
  79. # According to "man 2 kill" possible error values are
  80. # (EINVAL, EPERM, ESRCH)
  81. raise
  82. else:
  83. ret = True
  84. if ret and task is not None:
  85. p = get_process_instance(pid)
  86. return is_task_executor_process(task=task, process=p)
  87. elif ret and expected_cmdline is not None:
  88. p = get_process_instance(pid)
  89. return check_process_by_cmdline(actual=p.cmdline(), expected=expected_cmdline)
  90. else:
  91. return ret
  92. def check_process_by_keyword(keywords):
  93. if not keywords:
  94. return True
  95. keyword_filter_cmd = ' |'.join(['grep %s' % keyword for keyword in keywords])
  96. ret = os.system('ps aux | {} | grep -v grep | grep -v "ps aux "'.format(keyword_filter_cmd))
  97. return ret == 0
  98. def check_process_by_cmdline(actual: list, expected: list):
  99. if len(actual) != len(expected):
  100. return False
  101. for i, v in enumerate(actual):
  102. if str(v) != str(expected[i]):
  103. return False
  104. else:
  105. return True
  106. def get_std_path(log_dir, process_name="", process_id=""):
  107. std_log_path = f"{process_name}_{process_id}_{SUBPROCESS_STD_LOG_NAME}" if process_name else SUBPROCESS_STD_LOG_NAME
  108. return os.path.join(log_dir, std_log_path)
  109. def get_subprocess_std(log_dir, process_name="", process_id=""):
  110. with open(get_std_path(log_dir, process_name, process_id), "r") as fr:
  111. text = fr.read()
  112. return text
  113. def wait_child_process(signum, frame):
  114. try:
  115. while True:
  116. child_pid, status = os.waitpid(-1, os.WNOHANG)
  117. if child_pid == 0:
  118. stat_logger.info('no child process was immediately available')
  119. break
  120. exitcode = status >> 8
  121. stat_logger.info(f'child process {child_pid} exit with exitcode {exitcode}')
  122. except OSError as e:
  123. if e.errno == errno.ECHILD:
  124. stat_logger.info('current process has no existing unwaited-for child processes.')
  125. else:
  126. raise
  127. def is_task_executor_process(task: Task, process: psutil.Process):
  128. """
  129. check the process if task executor or not by command
  130. :param task:
  131. :param process:
  132. :return:
  133. """
  134. try:
  135. cmdline = process.cmdline()
  136. except Exception as e:
  137. # Not sure whether the process is a task executor process, operations processing is required
  138. schedule_logger(task.f_job_id).warning(e)
  139. return False
  140. else:
  141. schedule_logger(task.f_job_id).info(cmdline)
  142. if task.f_worker_id and task.f_worker_id in cmdline:
  143. return True
  144. if len(cmdline) != len(task.f_cmd):
  145. return False
  146. for i, v in enumerate(task.f_cmd):
  147. if cmdline[i] != str(v):
  148. return False
  149. return True
  150. def kill_task_executor_process(task: Task, only_child=False):
  151. try:
  152. if not task.f_run_pid:
  153. schedule_logger(task.f_job_id).info("task {} {} {} with {} party status no process pid".format(
  154. task.f_task_id, task.f_role, task.f_party_id, task.f_party_status))
  155. return KillProcessRetCode.NOT_FOUND
  156. pid = int(task.f_run_pid)
  157. schedule_logger(task.f_job_id).info("try to stop task {} {} {} with {} party status process pid:{}".format(
  158. task.f_task_id, task.f_role, task.f_party_id, task.f_party_status, pid))
  159. if not check_process(pid):
  160. schedule_logger(task.f_job_id).info("can not found task {} {} {} with {} party status process pid:{}".format(
  161. task.f_task_id, task.f_role, task.f_party_id, task.f_party_status, pid))
  162. return KillProcessRetCode.NOT_FOUND
  163. p = get_process_instance(pid)
  164. if p is None:
  165. return KillProcessRetCode.NOT_FOUND
  166. if not is_task_executor_process(task=task, process=p):
  167. schedule_logger(task.f_job_id).warning("this pid {} is not task {} {} {} executor".format(
  168. pid, task.f_task_id, task.f_role, task.f_party_id))
  169. return KillProcessRetCode.ERROR_PID
  170. for child in p.children(recursive=True):
  171. if check_process(pid=child.pid, task=task):
  172. child.kill()
  173. if not only_child:
  174. if check_process(pid, task=task):
  175. p.kill()
  176. schedule_logger(task.f_job_id).info("successfully stop task {} {} {} process pid:{}".format(
  177. task.f_task_id, task.f_role, task.f_party_id, pid))
  178. return KillProcessRetCode.KILLED
  179. except Exception as e:
  180. raise e
  181. def kill_process(process: psutil.Process = None, pid: int = None, expected_cmdline: list = None):
  182. process = process if process is not None else get_process_instance(pid)
  183. if process is None:
  184. return
  185. for child in process.children(recursive=True):
  186. try:
  187. if check_process(pid=child.pid):
  188. child.kill()
  189. except Exception as e:
  190. stat_logger.warning(f"kill {child.pid} process failed", exc_info=True)
  191. if check_process(pid=process.pid, expected_cmdline=expected_cmdline):
  192. process.kill()
  193. def get_process_instance(pid: int):
  194. try:
  195. return psutil.Process(int(pid))
  196. except psutil.NoSuchProcess:
  197. stat_logger.warning(f"no such process {pid}")
  198. return
  199. except Exception as e:
  200. raise e