log_utils.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
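"""Logging utilities for FATE Flow: helpers that build standardized job/task
log messages, factories for per-job loggers (schedule, audit, sql, detect),
exception-to-traceback formatting, and IP masking for log lines."""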

import os
import re
import typing
import traceback
import logging

from fate_arch.common.log import LoggerFactory, getLogger
from fate_flow.utils.base_utils import get_fate_flow_directory

# Message helpers: each returns a single formatted line that prefixes the
# message with task information and suffixes it with role/party and detail.
def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    prefix, suffix = base_msg(job, task, role, party_id, detail)
    return f"{prefix}{msg} ready{suffix}"


def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    prefix, suffix = base_msg(job, task, role, party_id, detail)
    return f"{prefix}start to {msg}{suffix}"


def successful_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    prefix, suffix = base_msg(job, task, role, party_id, detail)
    return f"{prefix}{msg} successfully{suffix}"


def warning_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    prefix, suffix = base_msg(job, task, role, party_id, detail)
    return f"{prefix}{msg} is not effective{suffix}"


def failed_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
    prefix, suffix = base_msg(job, task, role, party_id, detail)
    return f"{prefix}failed to {msg}{suffix}"

def base_msg(job=None, task=None, role: str = None, party_id: typing.Union[str, int] = None, detail=None):
    # Build the (prefix, suffix) pair shared by the message helpers above,
    # preferring task context, then job context, then an explicit role/party_id.
    if detail:
        detail_msg = f" detail: \n{detail}"
    else:
        detail_msg = ""
    if task is not None:
        return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}"
    elif job is not None:
        return "", f" on {job.f_role} {job.f_party_id}{detail_msg}"
    elif role and party_id:
        return "", f" on {role} {party_id}{detail_msg}"
    else:
        return "", f"{detail_msg}"
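
# Example: start_log("run component", role="guest", party_id=9999)
# -> "start to run component on guest 9999"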

def exception_to_trace_string(ex):
    return "".join(traceback.TracebackException.from_exception(ex).format())


def get_logger_base_dir():
    job_log_dir = get_fate_flow_directory('logs')
    return job_log_dir

def get_job_logger(job_id, log_type):
    fate_flow_log_dir = get_fate_flow_directory('logs', 'fate_flow')
    job_log_dir = get_fate_flow_directory('logs', job_id)
    if not job_id:
        log_dirs = [fate_flow_log_dir]
    else:
        if log_type == 'audit':
            log_dirs = [job_log_dir, fate_flow_log_dir]
        else:
            log_dirs = [job_log_dir]
    if LoggerFactory.log_share:
        # Shared log directories must stay writable for other processes, so
        # clear the umask while creating them and restore it afterwards.
        oldmask = os.umask(000)
        os.makedirs(job_log_dir, exist_ok=True)
        os.makedirs(fate_flow_log_dir, exist_ok=True)
        os.umask(oldmask)
    else:
        os.makedirs(job_log_dir, exist_ok=True)
        os.makedirs(fate_flow_log_dir, exist_ok=True)
    logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")
    for log_dir in log_dirs:
        handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,
                                            log_dir=log_dir, log_type=log_type, job_id=job_id)
        error_handler = LoggerFactory.get_handler(class_name=None, level=logging.ERROR,
                                                  log_dir=log_dir, log_type=log_type, job_id=job_id)
        logger.addHandler(handler)
        logger.addHandler(error_handler)
    with LoggerFactory.lock:
        LoggerFactory.schedule_logger_dict[job_id + log_type] = logger
    return logger

def schedule_logger(job_id=None, delete=False):
    if not job_id:
        return getLogger("fate_flow_schedule")
    else:
        if delete:
            with LoggerFactory.lock:
                # Iterate over a snapshot of the keys so entries can be deleted
                # from the dict while looping.
                for key in list(LoggerFactory.schedule_logger_dict.keys()):
                    if job_id in key:
                        del LoggerFactory.schedule_logger_dict[key]
            return True
        key = job_id + 'schedule'
        if key in LoggerFactory.schedule_logger_dict:
            return LoggerFactory.schedule_logger_dict[key]
        return get_job_logger(job_id, "schedule")

def audit_logger(job_id='', log_type='audit'):
    key = job_id + log_type
    if key in LoggerFactory.schedule_logger_dict:
        return LoggerFactory.schedule_logger_dict[key]
    return get_job_logger(job_id=job_id, log_type=log_type)


def sql_logger(job_id='', log_type='sql'):
    key = job_id + log_type
    if key in LoggerFactory.schedule_logger_dict:
        return LoggerFactory.schedule_logger_dict[key]
    return get_job_logger(job_id=job_id, log_type=log_type)


def detect_logger(job_id='', log_type='detect'):
    key = job_id + log_type
    if key in LoggerFactory.schedule_logger_dict:
        return LoggerFactory.schedule_logger_dict[key]
    return get_job_logger(job_id=job_id, log_type=log_type)

def replace_ip(line):
    # Mask any IPv4 addresses in a log line before it is exposed.
    match_ip = re.findall(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', line)
    if match_ip:
        for ip in match_ip:
            line = re.sub(ip, "xxx.xxx.xxx.xxx", line)
    return line