123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import json
- from handler.card_handler import choose_updated_card
- from utils.config_io import add_to_user_reply, if_has_processed, load_config_from_json, save_config_to_json
- from utils.image_io import create_image_path, save_image
- from utils.request_api import get_img, reply_message, update_message
- from utils.logger import logger
- from utils.template_io import load_template
def handle_meg(meg, event_id, user_open_id):
    """Handle one incoming message event and reply with a card when needed.

    Flow, in order:

    1. Ignore messages that lack ``message_id`` or ``content``.
    2. Ignore (and log) events that ``if_has_processed`` reports as handled.
    3. For a plain message (no nested ``content`` key in the decoded body),
       reply with the no-image card if the bot is among ``meg['mentions']``.
    4. For a rich-text message that both @-mentions the bot and contains an
       image: reply with the upload card, record the reply, store user info
       in the per-reply config, download the images, then update the card.

    Args:
        meg: Message dict from the event; ``meg['content']`` is a JSON string.
        event_id: Identifier of the event, used for de-duplication.
        user_open_id: Open id of the sender, stored in the per-reply config.

    Returns:
        None on every path.
    """
    # Abnormal message: both fields are required to build a reply.
    if 'message_id' not in meg or 'content' not in meg:
        return

    # Normal message; keep message_id so it can be used for replying.
    message_id = meg['message_id']

    if if_has_processed(event_id):
        # This event has been handled before.
        logger.info(message_id + '已处理过')
        return

    parsed = json.loads(meg['content'])

    if 'content' not in parsed:
        # Plain message: only react when the bot itself is mentioned.
        bot_mentioned = 'mentions' in meg and any(
            mention['name'] == 'Stable Diffusion Of HOXI'
            for mention in meg['mentions']
        )
        if bot_mentioned:
            # The log line says the usage manual is being returned.
            logger.info('返回使用手册')
            reply_message(message_id, no_img_in_meg_card())
        return

    # Rich-text message: requires both an @bot and at least one image.
    rich_text = parsed['content']
    if not (if_at_bot_rich_text(rich_text) and if_has_img(rich_text)):
        return

    upload_card = img_in_meg_card(rich_text, if_has_download=False)
    res = reply_message(message_id, upload_card)
    if not res:
        return

    reply_msg_id = res.json()['data']['message_id']

    # Record in the global config to prevent duplicate replies.
    add_to_user_reply(event_id, message_id, reply_msg_id)

    # Store the user's info in the config that belongs to this reply.
    config_data = load_config_from_json(reply_msg_id)
    config_data['user_message'] = message_id
    config_data['user_open_id'] = user_open_id
    save_config_to_json(config_data, reply_msg_id)

    # Download the images attached to the user's message.
    download_img_in_content(message_id, reply_msg_id, rich_text)

    # Reload the config before flagging the download as finished.
    # NOTE(review): presumably it can change while downloading - confirm.
    config_data = load_config_from_json(reply_msg_id)
    config_data['if_download'] = True
    save_config_to_json(config_data, reply_msg_id)

    # Update the card: fall back to the selection card while either option
    # is still empty, otherwise render the card for the stored choices.
    if config_data['action'] == "" or config_data['lora'] == "":
        update_data = img_in_meg_card(
            rich_text,
            if_has_download=True,
            reply_message_id=reply_msg_id,
        )
    else:
        update_card_data = choose_updated_card(reply_msg_id, config_data)
        update_data = {
            "msg_type": "interactive",
            "content": json.dumps(update_card_data),
        }
    update_message(reply_msg_id, update_data)
def if_at_bot_rich_text(content, bot_name='Stable Diffusion Of HOXI'):
    """Tell whether a rich-text message @-mentions the bot.

    Args:
        content: Rich-text body as an iterable of sections, each section an
            iterable of dict items.
        bot_name: Display name to look for. Defaults to the name that was
            previously hard-coded, so existing callers behave the same.

    Returns:
        True if any item has ``tag == 'at'`` and ``user_name == bot_name``,
        otherwise False.
    """
    return any(
        item.get('tag') == 'at' and item.get('user_name') == bot_name
        for section in content
        for item in section
    )
def if_has_img(content):
    """Tell whether a rich-text message contains at least one image item.

    Args:
        content: Rich-text body as an iterable of sections, each section an
            iterable of dict items.

    Returns:
        True if any item has ``tag == 'img'``, otherwise False.
    """
    return any(
        entry.get('tag') == 'img'
        for row in content
        for entry in row
    )
def download_img_in_content(message_id, reply_message_id, content):
    """Download every image referenced in a user's rich-text message.

    Works in two phases, as before: first ``create_image_path`` is called
    for every image key, then each image is fetched with ``get_img`` and
    stored with ``save_image``.

    Args:
        message_id: Id of the user's message that carries the images.
        reply_message_id: Id of the bot's reply; passed to
            ``create_image_path`` and ``save_image``.
        content: Rich-text body as an iterable of sections, each section an
            iterable of dict items.

    Returns:
        None. A download that does not answer with HTTP 200 is logged and
        skipped instead of being dropped silently.
    """
    # Collect the keys once so both phases see the same images in order.
    img_keys = [
        item.get('image_key')
        for section in content
        for item in section
        if item.get('tag') == 'img'
    ]

    # Phase 1: prepare the storage location of every image up front.
    for img_key in img_keys:
        create_image_path(reply_message_id, img_key)

    # Phase 2: fetch and store each image.
    for img_key in img_keys:
        response = get_img(message_id, img_key)
        if response.status_code == 200:
            save_image(response.content, reply_message_id, img_key)
            logger.info(img_key + "保存成功")
        else:
            # Surface failed downloads; previously they were ignored.
            logger.warning(
                f"failed to download image {img_key}: "
                f"HTTP {response.status_code}"
            )
def no_img_in_meg_card():
    """Build the interactive card payload used when a message has no image.

    Returns:
        A dict with ``msg_type`` set to ``"interactive"`` and ``content``
        holding the JSON-encoded ``no_img_card`` template.
    """
    template = load_template('no_img_card')
    return {
        "msg_type": "interactive",
        "content": json.dumps(template),
    }
def img_in_meg_card(content, if_has_download=False, reply_message_id=None):
    """Build the interactive card payload that shows the message's images.

    Args:
        content: Rich-text body as an iterable of sections, each section an
            iterable of dict items.
        if_has_download: Selects the ``choose_card`` template when true,
            the ``upload_card`` template otherwise.
        reply_message_id: Passed through to ``load_template``.

    Returns:
        A dict with ``msg_type`` set to ``"interactive"`` and ``content``
        holding the JSON-encoded card. The ``columns`` of the template's
        second element (``elements[1]``) are replaced by one column per
        image found in ``content``.
    """

    def _image_column(image_key):
        # One equally weighted column wrapping a single previewable image.
        return {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "vertical_align": "top",
            "elements": [
                {
                    "tag": "img",
                    "img_key": image_key,
                    "alt": {
                        "tag": "plain_text",
                        "content": "",
                    },
                    "mode": "fit_horizontal",
                    "preview": True,
                },
            ],
        }

    template_name = 'choose_card' if if_has_download else 'upload_card'
    card = load_template(template_name, reply_message_id)

    card['elements'][1]['columns'] = [
        _image_column(entry.get('image_key'))
        for row in content
        for entry in row
        if entry.get('tag') == 'img'
    ]

    return {
        "msg_type": "interactive",
        "content": json.dumps(card),
    }
|