log_sharing_utils.py

import os
import subprocess

from fate_flow.utils.base_utils import get_fate_flow_directory
from fate_flow.utils.log_utils import replace_ip

# Log types grouped by the parameters they require.
JOB = ["jobSchedule", "jobScheduleError"]
PARTY = ["partyError", "partyWarning", "partyInfo", "partyDebug"]
COMPONENT = ["componentInfo"]

# Mapping from log type to the corresponding log file name on disk.
LOGMapping = {
    "jobSchedule": "fate_flow_schedule.log",
    "jobScheduleError": "fate_flow_schedule_error.log",
    "partyError": "ERROR.log",
    "partyWarning": "WARNING.log",
    "partyInfo": "INFO.log",
    "partyDebug": "DEBUG.log",
    "componentInfo": "INFO.log"
}


def parameters_check(log_type, job_id, role, party_id, component_name):
    # Job-level logs need a job id; party-level logs also need role and party id;
    # component-level logs additionally need the component name.
    if log_type in JOB:
        if not job_id:
            return False
    if log_type in PARTY:
        if not job_id or not role or not party_id:
            return False
    if log_type in COMPONENT:
        if not job_id or not role or not party_id or not component_name:
            return False
    return True


class LogCollector:
    def __init__(self, log_type, job_id, party_id="", role="", component_name="", **kwargs):
        self.log_type = log_type
        self.job_id = job_id
        self.party_id = str(party_id)
        self.role = role
        self.component_name = component_name

    def get_log_file_path(self):
        status = parameters_check(self.log_type, self.job_id, self.role, self.party_id, self.component_name)
        if not status:
            raise Exception(f"log type {self.log_type}: missing required parameters")
        # Relative path of each log type under the fate_flow logs directory.
        type_dict = {
            "jobSchedule": os.path.join(self.job_id, "fate_flow_schedule.log"),
            "jobScheduleError": os.path.join(self.job_id, "fate_flow_schedule_error.log"),
            "partyError": os.path.join(self.job_id, self.role, self.party_id, "ERROR.log"),
            "partyWarning": os.path.join(self.job_id, self.role, self.party_id, "WARNING.log"),
            "partyInfo": os.path.join(self.job_id, self.role, self.party_id, "INFO.log"),
            "partyDebug": os.path.join(self.job_id, self.role, self.party_id, "DEBUG.log"),
            "componentInfo": os.path.join(self.job_id, self.role, self.party_id, self.component_name, "INFO.log")
        }
        if self.log_type not in type_dict:
            raise Exception(f"log type {self.log_type} not found")
        return os.path.join(get_fate_flow_directory('logs'), type_dict[self.log_type])

    def cat_log(self, begin, end):
        # Read lines [begin, end] (1-based, inclusive) from the log file;
        # if only one bound is given, read from or up to that line.
        line_list = []
        log_path = self.get_log_file_path()
        if begin and end:
            cmd = f"cat {log_path} | tail -n +{begin} | head -n {end - begin + 1}"
        elif begin:
            cmd = f"cat {log_path} | tail -n +{begin}"
        elif end:
            cmd = f"cat {log_path} | head -n {end}"
        else:
            cmd = f"cat {log_path}"
        lines = self.execute(cmd)
        if lines:
            line_num = begin if begin else 1
            for line in lines.split("\n"):
                # Mask IP addresses before returning log content.
                line = replace_ip(line)
                line_list.append({"line_num": line_num, "content": line})
                line_num += 1
        return line_list

    def get_size(self):
        # Total number of lines in the log file; 0 if the file is missing or unreadable.
        try:
            return int(self.execute(f"cat {self.get_log_file_path()} | wc -l").strip())
        except Exception:
            return 0

    @staticmethod
    def execute(cmd):
        res = subprocess.run(
            cmd, shell=True, universal_newlines=True,
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        )
        return res.stdout
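

# A minimal usage sketch, not part of the original module: it assumes the
# schedule log for the (hypothetical) job id below exists under the fate_flow
# logs directory resolved by get_fate_flow_directory('logs').
if __name__ == "__main__":
    collector = LogCollector(log_type="jobSchedule", job_id="202401010000000000000")
    print(f"total lines: {collector.get_size()}")
    # Fetch the first 20 lines; each entry carries its line number and content.
    for item in collector.cat_log(begin=1, end=20):
        print(item["line_num"], item["content"])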