123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- import re
- import typing
- import traceback
- import logging
- from fate_arch.common.log import LoggerFactory, getLogger
- from fate_flow.utils.base_utils import get_fate_flow_directory
def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format a "<msg> ready" log line.

    The text is wrapped in the prefix/suffix produced by ``base_msg`` for the
    given ``job``/``task``/``role``/``party_id``/``detail`` arguments.
    """
    head, tail = base_msg(job, task, role, party_id, detail)
    return head + f"{msg} ready" + tail
def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format a "start to <msg>" log line.

    The text is wrapped in the prefix/suffix produced by ``base_msg`` for the
    given ``job``/``task``/``role``/``party_id``/``detail`` arguments.
    """
    head, tail = base_msg(job, task, role, party_id, detail)
    return head + f"start to {msg}" + tail
def successful_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format a "<msg> successfully" log line.

    The text is wrapped in the prefix/suffix produced by ``base_msg`` for the
    given ``job``/``task``/``role``/``party_id``/``detail`` arguments.
    """
    head, tail = base_msg(job, task, role, party_id, detail)
    return head + f"{msg} successfully" + tail
def warning_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format a "<msg> is not effective" log line.

    The text is wrapped in the prefix/suffix produced by ``base_msg`` for the
    given ``job``/``task``/``role``/``party_id``/``detail`` arguments.
    """
    head, tail = base_msg(job, task, role, party_id, detail)
    return head + f"{msg} is not effective" + tail
def failed_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    """Format a "failed to <msg>" log line.

    The text is wrapped in the prefix/suffix produced by ``base_msg`` for the
    given ``job``/``task``/``role``/``party_id``/``detail`` arguments.
    """
    head, tail = base_msg(job, task, role, party_id, detail)
    return head + f"failed to {msg}" + tail
def base_msg(job=None, task=None, role: str = None, party_id: typing.Union[str, int] = None, detail=None):
    """Build the ``(prefix, suffix)`` pair used by the ``*_log`` message helpers.

    The context comes from the first available source, in this order:
    ``task``, then ``job``, then the explicit ``role``/``party_id`` pair.

    Args:
        job: object exposing ``f_role`` and ``f_party_id``; used when ``task``
            is None.
        task: object exposing ``f_task_id``, ``f_task_version``, ``f_role``
            and ``f_party_id``.
        role: role name; only used when neither ``task`` nor ``job`` is given.
        party_id: party identifier paired with ``role``. The integer ``0`` is
            accepted; only ``None`` and the empty string count as missing.
        detail: optional extra information appended to the suffix.

    Returns:
        A tuple ``(prefix, suffix)`` of strings. ``prefix`` is non-empty only
        when ``task`` is given.
    """
    detail_msg = f" detail: \n{detail}" if detail else ""

    if task is not None:
        return (
            f"task {task.f_task_id} {task.f_task_version} ",
            f" on {task.f_role} {task.f_party_id}{detail_msg}",
        )
    if job is not None:
        return "", f" on {job.f_role} {job.f_party_id}{detail_msg}"

    # A plain truthiness test would discard the valid integer party id 0.
    has_party_id = party_id is not None and party_id != ""
    if role and has_party_id:
        return "", f" on {role} {party_id}{detail_msg}"
    return "", f"{detail_msg}"
def exception_to_trace_string(ex):
    """Return the formatted traceback of exception ``ex`` as a single string."""
    trace = traceback.TracebackException.from_exception(ex)
    rendered_lines = trace.format()
    return "".join(rendered_lines)
def get_logger_base_dir():
    """Return the directory that ``get_fate_flow_directory('logs')`` resolves to."""
    return get_fate_flow_directory('logs')
def get_job_logger(job_id, log_type):
    """Create a logger for ``job_id``/``log_type``, attach handlers and cache it.

    Handlers are attached for these directories:
      * no ``job_id``: only the shared ``logs/fate_flow`` directory;
      * ``log_type == 'audit'``: the job directory and the shared directory;
      * otherwise: only the job directory.

    Both the job directory and the shared directory are created if missing.
    The new logger is stored in ``LoggerFactory.schedule_logger_dict`` under
    the key ``job_id + log_type``.

    Args:
        job_id: job identifier; a falsy value selects the shared directory.
        log_type: log category, also used in the logger name and cache key.

    Returns:
        The logger returned by ``LoggerFactory.new_logger``, with two handlers
        per target directory (one at ``LoggerFactory.LEVEL``, one at
        ``logging.ERROR``).
    """
    fate_flow_log_dir = get_fate_flow_directory('logs', 'fate_flow')
    job_log_dir = get_fate_flow_directory('logs', job_id)

    if not job_id:
        log_dirs = [fate_flow_log_dir]
    elif log_type == 'audit':
        log_dirs = [job_log_dir, fate_flow_log_dir]
    else:
        log_dirs = [job_log_dir]

    if LoggerFactory.log_share:
        # Clear the umask while creating the directories so the process umask
        # does not restrict their mode. Restore it even if makedirs raises,
        # otherwise the whole process would keep running with umask 0.
        oldmask = os.umask(0)
        try:
            os.makedirs(job_log_dir, exist_ok=True)
            os.makedirs(fate_flow_log_dir, exist_ok=True)
        finally:
            os.umask(oldmask)
    else:
        os.makedirs(job_log_dir, exist_ok=True)
        os.makedirs(fate_flow_log_dir, exist_ok=True)

    logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")
    for log_dir in log_dirs:
        handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,
                                            log_dir=log_dir, log_type=log_type, job_id=job_id)
        error_handler = LoggerFactory.get_handler(class_name=None, level=logging.ERROR,
                                                  log_dir=log_dir, log_type=log_type, job_id=job_id)
        logger.addHandler(handler)
        logger.addHandler(error_handler)

    with LoggerFactory.lock:
        LoggerFactory.schedule_logger_dict[job_id + log_type] = logger
    return logger
def schedule_logger(job_id=None, delete=False):
    """Return the schedule logger for ``job_id``, or drop that job's cached loggers.

    Args:
        job_id: job identifier. When falsy, the shared ``"fate_flow_schedule"``
            logger is returned and ``delete`` is ignored.
        delete: when true, remove every entry of
            ``LoggerFactory.schedule_logger_dict`` whose key contains
            ``job_id`` instead of returning a logger.

    Returns:
        ``True`` after a delete request; otherwise the logger cached under
        ``job_id + 'schedule'``, or a new one from ``get_job_logger``.
    """
    if not job_id:
        return getLogger("fate_flow_schedule")

    if delete:
        with LoggerFactory.lock:
            cache = LoggerFactory.schedule_logger_dict
            try:
                # Snapshot the matching keys first: deleting while iterating
                # the dict raises RuntimeError after the first removal.
                stale_keys = [key for key in cache if job_id in key]
            except TypeError:
                # Deletion is best-effort: a job_id that cannot be
                # substring-matched against the keys removes nothing.
                stale_keys = []
            for key in stale_keys:
                del cache[key]
        return True

    key = job_id + 'schedule'
    try:
        return LoggerFactory.schedule_logger_dict[key]
    except KeyError:
        pass
    return get_job_logger(job_id, "schedule")
def audit_logger(job_id='', log_type='audit'):
    """Return the logger cached under ``job_id + log_type``, creating it if absent.

    The cache is ``LoggerFactory.schedule_logger_dict``; on a miss the logger
    is built by ``get_job_logger``.
    """
    cache_key = job_id + log_type
    try:
        return LoggerFactory.schedule_logger_dict[cache_key]
    except KeyError:
        pass
    return get_job_logger(job_id=job_id, log_type=log_type)
def sql_logger(job_id='', log_type='sql'):
    """Return the logger cached under ``job_id + log_type``, creating it if absent.

    The cache is ``LoggerFactory.schedule_logger_dict``; on a miss the logger
    is built by ``get_job_logger``.
    """
    cache_key = job_id + log_type
    try:
        return LoggerFactory.schedule_logger_dict[cache_key]
    except KeyError:
        pass
    return get_job_logger(job_id=job_id, log_type=log_type)
def detect_logger(job_id='', log_type='detect'):
    """Return the logger cached under ``job_id + log_type``, creating it if absent.

    The cache is ``LoggerFactory.schedule_logger_dict``; on a miss the logger
    is built by ``get_job_logger``.
    """
    cache_key = job_id + log_type
    try:
        return LoggerFactory.schedule_logger_dict[cache_key]
    except KeyError:
        pass
    return get_job_logger(job_id=job_id, log_type=log_type)
# Four dot-separated groups of one to three digits (octet ranges are not validated).
_IP_PATTERN = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')


def replace_ip(line):
    """Mask every IPv4-looking substring of ``line`` with ``xxx.xxx.xxx.xxx``.

    Args:
        line: text that may contain dotted-quad addresses.

    Returns:
        ``line`` with each match replaced; unchanged when nothing matches.
    """
    # Substitute with the pattern itself. Reusing each matched address as a
    # regex would let its dots match any character and mask non-IP text.
    return _IP_PATTERN.sub("xxx.xxx.xxx.xxx", line)
|