# deal_rollsite_audit_log.py

import os
import json
import sys
import re
import requests
import traceback
import datetime

from deal_rollsite_audit_log_settings import (
    LOG_INDEX,
    ELASTIC_SEARCH_URL,
    ELASTIC_SEARCH_AUTH,
    ELASTIC_SEARCH_USER,
    ELASTIC_SEARCH_PASSWORD,
    HOST_ROLE_PARTY_ID,
)

# Overridden from the command line in the __main__ block below.
LOG_PATH = ""
SERVER_IP = None
EXCHANGE_TYPE = "general"


def run():
    progress = read_progress()
    end_pos = progress.get("end_pos", -1)
    last_st_ino = progress.get("st_ino", -1)
    last_st_mtime = progress.get("st_mtime", -1)
    now_st_ino = os.stat(LOG_PATH).st_ino
    print(f"last inode num: {last_st_ino}, mtime: {last_st_mtime}")
    print(f"{LOG_PATH} inode num is {now_st_ino}")
    if last_st_ino != -1 and last_st_ino != now_st_ino:
        # The inode changed, so the log file has been rotated since the last
        # run: finish the previous file, then catch up on every rotated log.
        print(f"last inode num is {last_st_ino}, but now is {now_st_ino}, "
              f"log path has changed, searching all pending logs")
        last_deal_log_path, pending_paths = search_pending_logs(
            os.path.dirname(LOG_PATH), last_st_ino, last_st_mtime)
        print(f"found last deal log path: {last_deal_log_path}")
        print(f"found pending paths: {pending_paths}")
        deal_log(last_deal_log_path, end_pos)
        for pending_path in pending_paths:
            deal_log(pending_path, -1)
        # Start the current log file from the beginning.
        end_pos = -1
    end_pos = deal_log(LOG_PATH, end_pos)
    progress["end_pos"] = end_pos
    progress["st_mtime"] = os.stat(LOG_PATH).st_mtime
    progress["st_ino"] = os.stat(LOG_PATH).st_ino
    save_progress(progress)


def deal_log(log_path, end_pos):
    # end_pos is the offset of the last byte already consumed (-1 means
    # nothing consumed yet). Returns the updated offset.
    if not log_path:
        return end_pos
    audit_logs = []
    # Binary mode keeps tell()/seek() arithmetic byte-exact.
    with open(log_path, "rb") as fr:
        file_size = get_file_size(fr)
        print(f"{log_path} end pos: {end_pos}, file size: {file_size}")
        if file_size > end_pos + 1:
            fr.seek(end_pos + 1)
            while True:
                line = fr.readline()
                if not line:
                    break
                audit_log = deal_line(line.decode("utf-8", errors="replace"))
                merge_audit_log(audit_log, audit_logs)
                # tell() points at the next unread byte; the last consumed
                # byte is one before it (storing tell() itself would make
                # the seek above skip the first byte on the next run).
                end_pos = fr.tell() - 1
        else:
            print(f"{log_path} no change")
    if audit_logs:
        bulk_save(audit_logs)
    return end_pos


def merge_audit_log(audit_log, audit_logs):
    # Append the Elasticsearch bulk action line and the document itself,
    # one JSON object per line (NDJSON). The target index comes from the
    # request URL, so the action metadata can stay empty.
    if audit_log:
        # Alternative with an explicit index in the action line:
        # audit_logs.append(json.dumps({"index": {"_index": "fate_rollsite_exchange_audit"}}))
        audit_logs.append('{"index":{}}')
        audit_logs.append(json.dumps(audit_log))
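
# For a single parsed audit line, merge_audit_log appends two NDJSON rows
# like these (document abridged and illustrative):
#   {"index":{}}
#   {"logTime": "...", "jobId": "202103192007180194594", "operator": "POST", ...}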


def search_pending_logs(log_dir, st_ino, st_mtime):
    # Walk the year/month/day directories newest-first and collect rotated
    # log files written since st_mtime. Stop descending as soon as a
    # directory level yields nothing pending: anything older has already
    # been processed.
    last_deal_log_path = None
    pending_paths = []
    year_dirs = [os.path.join(log_dir, f) for f in os.listdir(log_dir)
                 if os.path.isdir(os.path.join(log_dir, f))]
    year_dirs.sort(key=lambda f: os.stat(f).st_mtime, reverse=True)
    for year_dir in year_dirs:
        print(f"search year dir: {year_dir}")
        month_dirs = [os.path.join(year_dir, f) for f in os.listdir(year_dir)
                      if os.path.isdir(os.path.join(year_dir, f))]
        month_dirs.sort(key=lambda f: os.stat(f).st_mtime, reverse=True)
        year_search = False
        for month_dir in month_dirs:
            print(f"search month dir: {month_dir}")
            day_dirs = [os.path.join(month_dir, f) for f in os.listdir(month_dir)
                        if os.path.isdir(os.path.join(month_dir, f))]
            day_dirs.sort(key=lambda f: os.stat(f).st_mtime, reverse=True)
            month_search = False
            for day_dir in day_dirs:
                print(f"search day dir: {day_dir}")
                day_last_path, day_pending_paths = get_pending_logs(day_dir, st_ino, st_mtime)
                if day_last_path:
                    # Only keep a real match; do not let a later directory
                    # overwrite it with None.
                    last_deal_log_path = day_last_path
                if day_pending_paths:
                    print(f"found pending paths: {day_pending_paths}")
                    pending_paths.extend(day_pending_paths)
                else:
                    print(f"{day_dir} has no pending path, stopping")
                    break
            else:
                # Every day dir had pending paths; keep searching older months.
                month_search = True
            if not month_search:
                break
        else:
            # Every month dir had pending paths; keep searching older years.
            year_search = True
        if not year_search:
            break
    return last_deal_log_path, pending_paths
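
# Assumed on-disk layout, inferred from run() and the traversal above:
#   <log_dir>/rollsite-audit.log                       (the live file, LOG_PATH)
#   <log_dir>/<yyyy>/<mm>/<dd>/rollsite-audit.log*     (rotated copies)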


def get_pending_logs(day_dir, st_ino, st_mtime):
    # Collect the rollsite-audit.log* files modified at or after st_mtime.
    # The file whose inode matches st_ino is the one that was being
    # processed before rotation, so it is returned separately: its saved
    # end_pos is still valid for it.
    pending_paths = []
    st_mtime_match_path = None
    for f in os.listdir(day_dir):
        f_p = os.path.join(day_dir, f)
        if os.path.isfile(f_p) and f.startswith("rollsite-audit.log") and os.stat(f_p).st_mtime >= st_mtime:
            if os.stat(f_p).st_ino == st_ino:
                st_mtime_match_path = f_p
            else:
                pending_paths.append(f_p)
    return st_mtime_match_path, pending_paths


def get_file_size(fp):
    # Seek to the end (whence=2) and report the offset: the file size in
    # bytes, which is what the end_pos bookkeeping compares against.
    fp.seek(0, 2)
    return fp.tell()


def progress_file_path():
    return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                        "deal_rollsite_log_progress.json")


def read_progress():
    p_p = progress_file_path()
    if not os.path.exists(p_p):
        return {}
    with open(p_p) as fr:
        return json.load(fr)


def save_progress(progress):
    p_p = progress_file_path()
    with open(p_p, "w") as fw:
        json.dump(progress, fw, indent=4)
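
# The progress file written above looks like this (illustrative values):
# {
#     "end_pos": 10239,
#     "st_mtime": 1616155685.935,
#     "st_ino": 268435512
# }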


def deal_line(src):
    # Sample input:
    # [INFO ][36165610][2021-03-19 20:08:05,935][grpc-server-9370-30,pid:32590,tid:89][audit:87] - task={taskId=202103192007180194594}|src={name=202103192007180194594,partyId=9999,role=fateflow,callback={ip=127.0.0.1,port=9360}}|dst={name=202103192007180194594,partyId=10000,role=fateflow}|command={name=/v1/party/202103192007180194594/arbiter/10000/clean}|operator=POST|conf={overallTimeout=30000}
    meta_data = {}
    try:
        split_items = src.split(" - ")
        meta_line = split_items[1].strip()
        # The third bracketed field of the prefix is the timestamp; shift
        # it from local time (UTC+8) to UTC and truncate to milliseconds.
        meta_data["logTime"] = re.findall(r"\[.*?\]", split_items[0])[2].strip("[").strip("]")
        meta_data["logTime"] = (datetime.datetime.strptime(meta_data["logTime"], "%Y-%m-%d %H:%M:%S,%f")
                                - datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        # Fields are separated by "|"; brace-wrapped values become nested dicts.
        for meta_item_str in meta_line.split("|"):
            meta_item_key = meta_item_str[:meta_item_str.index("=")]
            meta_item_value_str = meta_item_str[meta_item_str.index("=") + 1:]
            if meta_item_value_str.find("{") == 0:
                meta_item_value = str_to_dict(meta_item_value_str[1:-1])
            else:
                meta_item_value = meta_item_value_str
            meta_data[meta_item_key] = meta_item_value
        meta_data["jobId"] = meta_data["task"]["taskId"]
        # The first 14 digits of the job id encode its submission time.
        meta_data["jobDate"] = (datetime.datetime.strptime(meta_data["jobId"][:14], "%Y%m%d%H%M%S")
                                - datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
        meta_data["server"] = SERVER_IP
        meta_data["exchangeType"] = EXCHANGE_TYPE
        meta_data["src"]["role"] = "host" if meta_data["src"]["partyId"] in HOST_ROLE_PARTY_ID else "guest"
        meta_data["dst"]["role"] = "host" if meta_data["dst"]["partyId"] in HOST_ROLE_PARTY_ID else "guest"
    except Exception:
        traceback.print_exc()
    return meta_data
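
# For the sample line above, deal_line() returns roughly (abridged):
#   {"logTime": "2021-03-19 12:08:05,935",
#    "task": {"taskId": "202103192007180194594"},
#    "src": {...}, "dst": {...},
#    "command": {"name": "/v1/party/202103192007180194594/arbiter/10000/clean"},
#    "operator": "POST", "conf": {"overallTimeout": "30000"},
#    "jobId": "202103192007180194594", "jobDate": "2021-03-19 12:07:18", ...}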


def str_to_dict(src):
    # Parse "k1=v1,k2=v2,k3={...}" into a dict. Brace-wrapped values are
    # parsed recursively (the audit log nests at most one level deep, and
    # the inner scan stops at the first closing brace).
    key = ""
    value = ""
    current = 1  # 1 while reading a key, 2 while reading a value
    d = {}
    i = 0
    while i < len(src):
        c = src[i]
        if c == "{":
            # Collect everything up to the closing brace and recurse.
            j = i + 1
            sub_str = ""
            while src[j] != "}":
                sub_str += src[j]
                j = j + 1
            j = j + 1  # step past the closing brace
            sub = str_to_dict(sub_str)
            if current == 2:
                d[key] = sub
            # Reset so a trailing comma cannot overwrite the sub-dict
            # with an empty value.
            key = ""
            value = ""
            current = 1
            i = j
        else:
            if c == "=":
                current = 2
            elif c == ",":
                if key:
                    d[key] = value
                key = ""
                value = ""
                current = 1
            else:
                if current == 1:
                    key += c
                elif current == 2:
                    value += c
            i = i + 1
    if key and value:
        d[key] = value
    return d
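
# Quick sanity check (sample value mirroring the audit-log format above):
#   str_to_dict("name=123,partyId=9999,callback={ip=127.0.0.1,port=9360}")
#   -> {'name': '123', 'partyId': '9999',
#       'callback': {'ip': '127.0.0.1', 'port': '9360'}}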


def upload(audit_log):
    # Index a single document (currently unused; bulk_save is used instead).
    res = requests.post("/".join([ELASTIC_SEARCH_URL, LOG_INDEX, "_doc"]), json=audit_log)
    print(res.json())


def bulk_save(audit_logs):
    # The bulk API expects newline-delimited JSON terminated by a newline.
    data = "\n".join(audit_logs) + "\n"
    auth = (ELASTIC_SEARCH_USER, ELASTIC_SEARCH_PASSWORD) if ELASTIC_SEARCH_AUTH else None
    res = requests.post("/".join([ELASTIC_SEARCH_URL, LOG_INDEX, "_doc", "_bulk"]),
                        data=data,
                        headers={"content-type": "application/json; charset=UTF-8"},
                        timeout=(30, 300),
                        auth=auth)
    print(res.json())


if __name__ == "__main__":
    # Arguments: <log_path> <server_ip> <exchange_type>
    LOG_PATH = sys.argv[1]
    SERVER_IP = sys.argv[2]
    EXCHANGE_TYPE = sys.argv[3]
    run()
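
# Example invocation (hypothetical path and values):
#   python deal_rollsite_audit_log.py /data/logs/rollsite/rollsite-audit.log 192.0.2.10 general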