"""Small client for Feishu Open Platform message and image endpoints.

Every API helper is wrapped by ``keep_tenant_access_token``, which makes sure a
tenant access token is stored on ``GlobalModel()`` and refreshes it once when a
call fails or reports an expired token.
"""
import json
import os
from functools import wraps
from typing import Optional

import requests
from dotenv import load_dotenv
from requests_toolbelt import MultipartEncoder

from utils.logger import logger
from model.global_model import GlobalModel

load_dotenv()  # Load environment variables (from a .env file when present).

APP_ID = os.getenv('APP_ID')
APP_SECRET = os.getenv('APP_SECRET')

# (connect, read) timeouts in seconds, so a stalled connection cannot block
# the caller forever.
REQUEST_TIMEOUT = (5, 30)

# Business codes that this module treats as "tenant token expired".
_TOKEN_EXPIRED_CODES = frozenset({99991661, 99991663})


def _response_code(response):
    """Return the ``code`` field of a JSON object body, or None.

    None is returned when the body is not JSON (for example a binary image)
    or is JSON but not an object.
    """
    try:
        payload = response.json()
    except ValueError:
        return None
    if isinstance(payload, dict):
        return payload.get('code')
    return None


def _describe_body(response):
    """Return the response body for logging: parsed JSON if possible, else raw text."""
    try:
        return str(response.json())
    except ValueError:
        return response.text


def _succeeded(response):
    """Return True when the call returned HTTP 200 and business code 0."""
    return response.status_code == 200 and _response_code(response) == 0


def _auth_headers(content_type=None):
    """Build request headers carrying the stored tenant token.

    Args:
        content_type: Optional value for the ``Content-Type`` header.
    """
    # Fall back to an empty token rather than raising on None: the API then
    # rejects the call and the helpers report the failure in the usual way.
    headers = {'Authorization': f"Bearer {GlobalModel().tenant_token or ''}"}
    if content_type is not None:
        headers['Content-Type'] = content_type
    return headers


def get_tenant_token() -> None:
    """Request a tenant access token and store it on ``GlobalModel()``.

    The stored token is only overwritten when the response actually contains
    one; otherwise the failure is logged and the previous value is kept.

    Raises:
        requests.RequestException: If the HTTP request itself fails.
    """
    url = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
    credentials = {
        'app_id': APP_ID,
        'app_secret': APP_SECRET,
    }
    r = requests.post(url, json=credentials, timeout=REQUEST_TIMEOUT)
    try:
        payload = r.json()
    except ValueError:
        payload = None
    token = payload.get('tenant_access_token') if isinstance(payload, dict) else None
    if not token:
        logger.error('Failed to obtain tenant_access_token\n' + r.text)
        return
    GlobalModel().tenant_token = token


def keep_tenant_access_token(f):
    """Decorate an API helper so it runs with a usable tenant token.

    The wrapped function must return a ``requests.Response`` or None. If no
    token is stored yet, one is fetched before the call. Otherwise the call is
    made once, and the token is refreshed and the call repeated a single time
    when the first attempt raised, returned a falsy result (None or a non-2xx
    response), or returned a body whose ``code`` marks the token as expired.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not GlobalModel().tenant_token:
            # Token not initialised yet.
            get_tenant_token()
            return f(*args, **kwargs)

        # Token already initialised: try with the current one first.
        try:
            r = f(*args, **kwargs)
        except Exception as exc:
            logger.error(
                f"{f.__name__} raised {exc!r}; "
                "refreshing tenant_access_token and retrying"
            )
        else:
            # A non-JSON body (e.g. a downloaded image) has no business code
            # and is a normal result, not a reason to refresh and call again.
            if r and _response_code(r) not in _TOKEN_EXPIRED_CODES:
                return r

        # Failure or expired token: refresh once and retry once.
        get_tenant_token()
        return f(*args, **kwargs)
    return decorated


@keep_tenant_access_token
def reply_message(message_id, response_json) -> Optional[requests.Response]:
    """Reply to a message.

    Args:
        message_id: ID of the message being replied to.
        response_json: JSON-serialisable request body.

    Returns:
        The response on success (HTTP 200 and code 0), otherwise None.
    """
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id}/reply"
    res = requests.post(
        url,
        json=response_json,
        headers=_auth_headers('application/json; charset=utf-8'),
        timeout=REQUEST_TIMEOUT,
    )
    if _succeeded(res):
        logger.info(f"{message_id}回复成功")
        return res
    logger.error(f"{message_id}回复失败\n" + _describe_body(res))
    return None


@keep_tenant_access_token
def get_img(message_id, img_key) -> requests.Response:
    """Fetch an image resource attached to a message.

    Args:
        message_id: ID of the message that carries the image.
        img_key: Key of the image resource.

    Returns:
        The raw response; the caller is responsible for checking its status.
    """
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id}/resources/{img_key}?type=image"
    return requests.get(url, headers=_auth_headers(), timeout=REQUEST_TIMEOUT)


@keep_tenant_access_token
def update_message(message_id, update_json) -> Optional[requests.Response]:
    """Update an existing message via HTTP PATCH.

    Args:
        message_id: ID of the message to update.
        update_json: JSON-serialisable request body.

    Returns:
        The response on success (HTTP 200 and code 0), otherwise None.
    """
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id}"
    res = requests.patch(
        url,
        json=update_json,
        headers=_auth_headers('application/json; charset=utf-8'),
        timeout=REQUEST_TIMEOUT,
    )
    if _succeeded(res):
        logger.info(f"{message_id}更新成功")
        return res
    logger.error(f"{message_id}更新失败\n" + _describe_body(res))
    return None


@keep_tenant_access_token
def upload_image(img_key, image_data, note) -> Optional[requests.Response]:
    """Upload an image as a multipart form with ``image_type`` "message".

    Args:
        img_key: Identifier used only in log messages.
        image_data: Value sent as the ``image`` form field.
        note: Free-form label used only in log messages.

    Returns:
        The response on success (HTTP 200 and code 0), otherwise None.
    """
    url = "https://open.feishu.cn/open-apis/im/v1/images"
    # A fresh encoder is built on every call, so a retry by the decorator
    # sends a complete body again.
    multi_form = MultipartEncoder({
        "image_type": "message",
        'image': image_data,
    })
    res = requests.post(
        url,
        data=multi_form,
        headers=_auth_headers(multi_form.content_type),
        timeout=REQUEST_TIMEOUT,
    )
    if _succeeded(res):
        logger.info(f"{img_key}-{note}上传成功")
        return res
    logger.error(f"{img_key}-{note}上传失败\n" + _describe_body(res))
    return None