- 增加了通过外部 HTTP API 向 QQ 群组发送消息的核心功能。 - 实现了对长文本消息的图片渲染,以避免被认定为垃圾信息。 - 支持在消息中提及特定的 QQ 用户。 - 创建了用于 API 令牌和图片渲染设置的配置选项。 - 开发了一个测试脚本以验证 API 功能。 - 对现有代码进行了重构,以提高组织性和可维护性。
247 lines
9.6 KiB
Python
247 lines
9.6 KiB
Python
import requests
|
||
import json
|
||
from typing import Dict, Optional, Tuple
|
||
from nonebot import logger
|
||
from .config import Config
|
||
|
||
def mask_username(username: str) -> str:
|
||
"""
|
||
对用户名进行脱敏处理,只显示前两位和后两位,中间用*号隐藏
|
||
|
||
Args:
|
||
username: 原始用户名
|
||
|
||
Returns:
|
||
脱敏后的用户名
|
||
"""
|
||
if not username:
|
||
return username
|
||
|
||
# 如果用户名长度小于等于4,直接显示前两位和后两位(可能重叠)
|
||
if len(username) <= 4:
|
||
return username
|
||
|
||
# 显示前两位和后两位,中间用*号填充
|
||
return f"{username[:2]}{'*' * (len(username) - 4)}{username[-2:]}"
|
||
|
||
# 获取配置
|
||
config = Config()
|
||
|
||
# API 端点配置
|
||
DD_API_HOST = "https://api.danding.vip/DD/" # 蛋定服务器连接地址
|
||
BOT_TOKEN = "3340e353a49447f1be640543cbdcd937" # 对接服务器的Token
|
||
BOT_USER_ID = "1424473282" # 机器人用户ID
|
||
|
||
async def query_qq_binding(qq: str) -> Tuple[bool, Optional[str], Optional[str]]:
|
||
"""
|
||
查询QQ号是否绑定了蛋定用户名
|
||
|
||
Args:
|
||
qq: 要查询的QQ号
|
||
|
||
Returns:
|
||
Tuple[是否绑定, 用户名, VIP到期时间]
|
||
"""
|
||
try:
|
||
url = f"{DD_API_HOST}query_qq_binding"
|
||
data = {"qq": qq}
|
||
|
||
response = requests.post(url=url, json=data)
|
||
logger.debug(f"查询QQ绑定状态响应: {response}")
|
||
|
||
if response.status_code != 200:
|
||
logger.error(f"查询QQ绑定状态失败,状态码: {response.status_code}")
|
||
return False, None, None
|
||
|
||
result = response.json()
|
||
logger.debug(f"查询QQ绑定状态结果: {result}")
|
||
|
||
if result.get("code") == 200:
|
||
data = result.get("data", {})
|
||
is_bound = data.get("is_bound", False)
|
||
|
||
if is_bound:
|
||
username = data.get("username")
|
||
vip_time = data.get("vip_time")
|
||
return True, username, vip_time
|
||
else:
|
||
return False, None, None
|
||
else:
|
||
logger.error(f"查询QQ绑定状态失败,错误信息: {result.get('message')}")
|
||
return False, None, None
|
||
|
||
except Exception as e:
|
||
logger.error(f"查询QQ绑定状态异常: {str(e)}")
|
||
return False, None, None
|
||
|
||
async def add_user_viptime(username: str, time_class: str = "Day", count: int = 1) -> Tuple[bool, str]:
|
||
"""
|
||
为用户添加VIP时间
|
||
|
||
Args:
|
||
username: 蛋定用户名
|
||
time_class: 时间类型 (Hour/Day/Week/Month/Season/Year)
|
||
count: 添加次数(默认为1)
|
||
|
||
Returns:
|
||
Tuple[是否成功, 响应消息]
|
||
"""
|
||
try:
|
||
url = f"{DD_API_HOST}bot_add_user_viptime"
|
||
|
||
# 如果count大于1,需要多次调用API
|
||
success_count = 0
|
||
last_message = ""
|
||
|
||
for i in range(count):
|
||
data = {
|
||
"user": BOT_USER_ID,
|
||
"token": BOT_TOKEN,
|
||
"username": username,
|
||
"classes": time_class
|
||
}
|
||
|
||
response = requests.post(url=url, json=data)
|
||
logger.debug(f"添加VIP时间响应({i+1}/{count}): {response}")
|
||
|
||
if response.status_code != 200:
|
||
error_msg = f"添加VIP时间失败({i+1}/{count}),状态码: {response.status_code}"
|
||
logger.error(error_msg)
|
||
continue
|
||
|
||
result = response.json()
|
||
logger.debug(f"添加VIP时间结果({i+1}/{count}): {result}")
|
||
|
||
if result.get("code") == 200:
|
||
success_count += 1
|
||
last_message = result.get("msg", "添加VIP时间成功")
|
||
else:
|
||
error_msg = result.get("msg", "添加VIP时间失败")
|
||
logger.error(f"添加VIP时间失败({i+1}/{count}): {error_msg}")
|
||
|
||
if success_count == count:
|
||
return True, f"成功添加{count}次{time_class}时长。{last_message}"
|
||
elif success_count > 0:
|
||
return False, f"仅成功添加{success_count}/{count}次{time_class}时长。{last_message}"
|
||
else:
|
||
return False, f"添加{count}次{time_class}时长全部失败。"
|
||
|
||
except Exception as e:
|
||
error_msg = f"添加VIP时间异常: {str(e)}"
|
||
logger.error(error_msg)
|
||
return False, error_msg
|
||
|
||
async def process_ssr_sp_reward(user_id: str, count: int = 1) -> Tuple[bool, str]:
|
||
"""
|
||
处理SSR/SP奖励发放
|
||
|
||
Args:
|
||
user_id: QQ用户ID
|
||
count: 奖励数量(默认为1)
|
||
|
||
Returns:
|
||
Tuple[是否自动发放成功, 消息内容]
|
||
"""
|
||
# 查询QQ绑定状态
|
||
is_bound, username, vip_time = await query_qq_binding(user_id)
|
||
|
||
if not is_bound:
|
||
# 用户未绑定,返回提示信息
|
||
if count == 1:
|
||
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神!🎉\n"
|
||
f"获得奖励:蛋定助手天卡一张\n"
|
||
f"获取奖励请联系管理员,或前往蛋定云服务中绑定QQ号即可体验自动加时!")
|
||
else:
|
||
msg = (f"🎉恭喜您抽中了{count}张SSR/SP稀有度式神!🎉\n"
|
||
f"获得奖励:蛋定助手天卡{count}张\n"
|
||
f"获取奖励请联系管理员,或前往蛋定云服务中绑定QQ号即可体验自动加时!")
|
||
return False, msg
|
||
else:
|
||
# 用户已绑定,自动加时
|
||
success, message = await add_user_viptime(username, "Day", count)
|
||
|
||
if success:
|
||
masked_username = mask_username(username)
|
||
if count == 1:
|
||
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神!🎉\n"
|
||
f"🎁已自动为您的蛋定账号({masked_username})添加天卡时长!\n"
|
||
f"📅到期时间: {message.split('到期时间为:')[-1] if '到期时间为:' in message else '请查看您的账号详情'}")
|
||
else:
|
||
msg = (f"🎉恭喜您抽中了{count}张SSR/SP稀有度式神!🎉\n"
|
||
f"🎁已自动为您的蛋定账号({masked_username})添加{count}天卡时长!\n"
|
||
f"📅到期时间: {message.split('到期时间为:')[-1] if '到期时间为:' in message else '请查看您的账号详情'}")
|
||
return True, msg
|
||
else:
|
||
# 自动加时失败,返回错误信息和手动领取提示
|
||
if count == 1:
|
||
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神!🎉\n"
|
||
f"获得奖励:蛋定助手天卡一张\n"
|
||
f"⚠️自动加时失败: {message}\n"
|
||
f"请联系管理员手动领取奖励!")
|
||
else:
|
||
msg = (f"🎉恭喜您抽中了{count}张SSR/SP稀有度式神!🎉\n"
|
||
f"获得奖励:蛋定助手天卡{count}张\n"
|
||
f"⚠️自动加时失败: {message}\n"
|
||
f"请联系管理员手动领取奖励!")
|
||
return False, msg
|
||
|
||
async def process_achievement_reward(user_id: str, achievement_id: str) -> Tuple[bool, str]:
|
||
"""
|
||
处理成就奖励发放
|
||
|
||
Args:
|
||
user_id: QQ用户ID
|
||
achievement_id: 成就ID
|
||
|
||
Returns:
|
||
Tuple[是否自动发放成功, 消息内容]
|
||
"""
|
||
# 获取成就配置
|
||
achievement_config = config.ACHIEVEMENTS.get(achievement_id)
|
||
if not achievement_config:
|
||
# 检查是否是重复奖励
|
||
if "_repeat_" in achievement_id:
|
||
base_achievement_id = achievement_id.split("_repeat_")[0]
|
||
base_config = config.ACHIEVEMENTS.get(base_achievement_id)
|
||
if base_config:
|
||
reward_type = base_config.get("repeat_reward", "天卡")
|
||
else:
|
||
reward_type = "天卡"
|
||
else:
|
||
return False, f"未找到成就配置: {achievement_id}"
|
||
else:
|
||
reward_type = achievement_config.get("reward", "天卡")
|
||
|
||
# 查询QQ绑定状态
|
||
is_bound, username, vip_time = await query_qq_binding(user_id)
|
||
|
||
if not is_bound:
|
||
# 用户未绑定,返回提示信息
|
||
msg = (f"🏆 恭喜解锁成就奖励!\n"
|
||
f"获得奖励:蛋定助手{reward_type}一张\n"
|
||
f"获取奖励请联系管理员,或前往蛋定云服务中绑定QQ号即可体验自动加时!")
|
||
return False, msg
|
||
else:
|
||
# 用户已绑定,自动加时
|
||
# 将奖励类型转换为API需要的时间类型
|
||
time_class = "Day" # 默认为天卡
|
||
if "周卡" in reward_type:
|
||
time_class = "Week"
|
||
elif "月卡" in reward_type:
|
||
time_class = "Month"
|
||
|
||
success, message = await add_user_viptime(username, time_class)
|
||
|
||
if success:
|
||
masked_username = mask_username(username)
|
||
msg = (f"🏆 恭喜解锁成就奖励!\n"
|
||
f"🎁已自动为您的蛋定账号({masked_username})添加{reward_type}时长!\n"
|
||
f"📅到期时间: {message.split('到期时间为:')[-1] if '到期时间为:' in message else '请查看您的账号详情'}")
|
||
return True, msg
|
||
else:
|
||
# 自动加时失败,返回错误信息和手动领取提示
|
||
msg = (f"🏆 恭喜解锁成就奖励!\n"
|
||
f"获得奖励:蛋定助手{reward_type}一张\n"
|
||
f"⚠️自动加时失败: {message}\n"
|
||
f"请联系管理员手动领取奖励!")
|
||
return False, msg |