1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import os
- import subprocess
- from fate_flow.utils.base_utils import get_fate_flow_directory
- from fate_flow.utils.log_utils import replace_ip
- JOB = ["jobSchedule", "jobScheduleError"]
- PARTY = ["partyError", "partyWarning", "partyInfo", "partyDebug"]
- COMPONENT = ["componentInfo"]
- LOGMapping = {
- "jobSchedule": "fate_flow_schedule.log",
- "jobScheduleError": "fate_flow_schedule_error.log",
- "partyError": "ERROR.log",
- "partyWarning": "WARNING.log",
- "partyInfo": "INFO.log",
- "partyDebug": "DEBUG.log",
- "componentInfo": "INFO.log"
- }
- def parameters_check(log_type, job_id, role, party_id, component_name):
- if log_type in JOB:
- if not job_id:
- return False
- if log_type in PARTY:
- if not job_id or not role or not party_id:
- return False
- if log_type in COMPONENT:
- if not job_id or not role or not party_id or not component_name:
- return False
- return True
- class LogCollector():
- def __init__(self, log_type, job_id, party_id="", role="", component_name="", **kwargs):
- self.log_type = log_type
- self.job_id = job_id
- self.party_id = str(party_id)
- self.role = role
- self.component_name = component_name
- def get_log_file_path(self):
- status = parameters_check(self.log_type, self.job_id, self.role, self.party_id, self.component_name)
- if not status:
- raise Exception(f"job type {self.log_type} Missing parameters")
- type_dict = {
- "jobSchedule": os.path.join(self.job_id, "fate_flow_schedule.log"),
- "jobScheduleError": os.path.join(self.job_id, "fate_flow_schedule_error.log"),
- "partyError": os.path.join(self.job_id, self.role, self.party_id, "ERROR.log"),
- "partyWarning": os.path.join(self.job_id, self.role, self.party_id, "WARNING.log"),
- "partyInfo": os.path.join(self.job_id,self.role, self.party_id, "INFO.log"),
- "partyDebug": os.path.join(self.job_id, self.role, self.party_id, "DEBUG.log"),
- "componentInfo": os.path.join(self.job_id, self.role, self.party_id, self.component_name, "INFO.log")
- }
- if self.log_type not in type_dict.keys():
- raise Exception(f"no found log type {self.log_type}")
- return os.path.join(get_fate_flow_directory('logs'), type_dict[self.log_type])
- def cat_log(self, begin, end):
- line_list = []
- log_path = self.get_log_file_path()
- if begin and end:
- cmd = f"cat {log_path} | tail -n +{begin}| head -n {end-begin+1}"
- elif begin:
- cmd = f"cat {log_path} | tail -n +{begin}"
- elif end:
- cmd = f"cat {log_path} | head -n {end}"
- else:
- cmd = f"cat {log_path}"
- lines = self.execute(cmd)
- if lines:
- line_list = []
- line_num = begin if begin else 1
- for line in lines.split("\n"):
- line = replace_ip(line)
- line_list.append({"line_num": line_num, "content": line})
- line_num += 1
- return line_list
- def get_size(self):
- try:
- return int(self.execute(f"cat {self.get_log_file_path()} | wc -l").strip())
- except:
- return 0
- @staticmethod
- def execute(cmd):
- res = subprocess.run(
- cmd, shell=True, universal_newlines=True,
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
- )
- return res.stdout
|