meg_handler.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import json
  2. from handler.card_handler import choose_updated_card
  3. from utils.config_io import add_to_user_reply, if_has_processed, load_config_from_json, save_config_to_json
  4. from utils.image_io import create_image_path, save_image
  5. from utils.request_api import get_img, reply_message, update_message
  6. from utils.logger import logger
  7. from utils.template_io import load_template
  8. def handle_meg(meg, event_id, user_open_id):
  9. if 'message_id' in meg and 'content' in meg:
  10. # 正常消息,保存message_id可用于回复
  11. message_id = meg['message_id']
  12. if not if_has_processed(event_id):
  13. content = json.loads(meg['content'])
  14. if 'content' in content:
  15. # 为富文本消息
  16. rich_text_content = content['content']
  17. if if_at_bot_rich_text(rich_text_content):
  18. # @了机器人
  19. if if_has_img(rich_text_content):
  20. # 有图片
  21. response_data = img_in_meg_card(rich_text_content, if_has_download = False)
  22. res = reply_message(message_id, response_data)
  23. if res:
  24. reply_msg_id = res.json()['data']['message_id']
  25. # 保存到全局config防止重复回复
  26. add_to_user_reply(event_id, message_id, reply_msg_id)
  27. # 保存user信息到msg_config
  28. config_data = load_config_from_json(reply_msg_id)
  29. config_data['user_message'] = message_id
  30. config_data['user_open_id'] = user_open_id
  31. save_config_to_json(config_data, reply_msg_id)
  32. # 下载图片
  33. download_img_in_content(message_id, reply_msg_id, rich_text_content)
  34. # 更新卡片
  35. config_data = load_config_from_json(reply_msg_id)
  36. config_data['if_download'] = True
  37. save_config_to_json(config_data, reply_msg_id)
  38. if config_data['action'] != "" and config_data['lora'] != "":
  39. update_card_data = choose_updated_card(reply_msg_id, config_data)
  40. update_data ={
  41. "msg_type": "interactive",
  42. }
  43. update_data['content'] = json.dumps(update_card_data)
  44. else:
  45. update_data = img_in_meg_card(rich_text_content, if_has_download = True, reply_message_id = reply_msg_id)
  46. update_message(reply_msg_id, update_data)
  47. else:
  48. # 无图片
  49. return
  50. else:
  51. # 未@机器人
  52. return
  53. else:
  54. # 为普通消息
  55. if 'mentions' in meg:
  56. if any(item['name'] == 'Stable Diffusion Of HOXI' for item in meg['mentions']):
  57. logger.info('返回使用手册')
  58. response_data = no_img_in_meg_card()
  59. res = reply_message(message_id, response_data)
  60. return
  61. else:
  62. # 处理过的消息
  63. logger.info(message_id + '已处理过')
  64. return
  65. else:
  66. # 非正常消息
  67. return
  68. # 判断消息中是否@了机器人
  69. def if_at_bot_rich_text(content):
  70. for section in content:
  71. for item in section:
  72. if item.get('tag') == 'at' and item.get('user_name') == 'Stable Diffusion Of HOXI':
  73. return True
  74. return False
  75. # 判断消息中是否有图片
  76. def if_has_img(content):
  77. for section in content:
  78. for item in section:
  79. if item.get('tag') == 'img':
  80. return True
  81. return False
  82. # 下载用户信息中的图片
  83. def download_img_in_content(message_id, reply_message_id, content):
  84. for section in content:
  85. for item in section:
  86. if item.get('tag') == 'img':
  87. img_key = item.get('image_key')
  88. create_image_path(reply_message_id, img_key)
  89. for section in content:
  90. for item in section:
  91. if item.get('tag') == 'img':
  92. img_key = item.get('image_key')
  93. response = get_img(message_id, img_key)
  94. if response.status_code == 200:
  95. save_image(response.content, reply_message_id, img_key)
  96. logger.info(img_key + "保存成功")
  97. # 生成无图片的卡片消息
  98. def no_img_in_meg_card():
  99. response_data ={
  100. "msg_type": "interactive",
  101. }
  102. card_content = load_template('no_img_card')
  103. response_data['content'] = json.dumps(card_content)
  104. return response_data
  105. # 生成交互卡片消息
  106. def img_in_meg_card(content, if_has_download = False, reply_message_id = None):
  107. response_data ={
  108. "msg_type": "interactive",
  109. }
  110. card_content = load_template('choose_card' if if_has_download else 'upload_card', reply_message_id)
  111. image_columns = []
  112. for section in content:
  113. for item in section:
  114. if item.get('tag') == 'img':
  115. image_columns.append(
  116. {
  117. "tag": "column",
  118. "width": "weighted",
  119. "weight": 1,
  120. "vertical_align": "top",
  121. "elements":
  122. [
  123. {
  124. "tag": "img",
  125. "img_key": item.get('image_key'),
  126. "alt": {
  127. "tag": "plain_text",
  128. "content": ""
  129. },
  130. "mode": "fit_horizontal",
  131. "preview": True
  132. },
  133. ]
  134. }
  135. )
  136. card_content['elements'][1]['columns'] = image_columns
  137. response_data['content'] = json.dumps(card_content)
  138. return response_data