首次提交

This commit is contained in:
2025-12-26 22:41:42 +08:00
commit 4a944316fe
143 changed files with 17550 additions and 0 deletions

View File

@@ -0,0 +1,771 @@
import os
from nonebot import on_command, on_startswith
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, MessageEvent, Message
from nonebot.adapters.onebot.v11.message import MessageSegment
from nonebot.typing import T_State
from nonebot.rule import Rule
from pathlib import Path
from .config import Config
from .gacha import GachaSystem
from .utils import format_user_mention, get_image_path
from .api_utils import process_ssr_sp_reward, process_achievement_reward
from . import web_api
# 创建Config实例
config = Config()
# 允许的群聊ID和用户ID
ALLOWED_GROUP_ID = config.ALLOWED_GROUP_ID
ALLOWED_USER_ID = config.ALLOWED_USER_ID
GACHA_COMMANDS = config.GACHA_COMMANDS
STATS_COMMANDS = config.STATS_COMMANDS
DAILY_STATS_COMMANDS = config.DAILY_STATS_COMMANDS
TRIPLE_GACHA_COMMANDS = config.TRIPLE_GACHA_COMMANDS
ACHIEVEMENT_COMMANDS = config.ACHIEVEMENT_COMMANDS
INTRO_COMMANDS = config.INTRO_COMMANDS
DAILY_LIMIT = config.DAILY_LIMIT
gacha_system = GachaSystem()
# 检查是否允许使用功能的规则
def check_permission() -> Rule:
async def _checker(event: MessageEvent) -> bool:
# 允许特定用户在任何场景下使用
if event.user_id == ALLOWED_USER_ID:
return True
# 在允许的群聊中任何人都可以使用
if isinstance(event, GroupMessageEvent) and event.group_id == ALLOWED_GROUP_ID:
return True
return False
return Rule(_checker)
# 注册抽卡命令,添加权限检查规则
gacha_matcher = on_command("抽卡", aliases=set(GACHA_COMMANDS), priority=10, rule=check_permission())
@gacha_matcher.handle()
async def handle_gacha(bot: Bot, event: MessageEvent, state: T_State):
user_id = str(event.user_id)
user_name = event.sender.card if isinstance(event, GroupMessageEvent) else event.sender.nickname
# 执行抽卡
result = gacha_system.draw(user_id)
if not result["success"]:
await gacha_matcher.finish(format_user_mention(user_id, user_name) + "" + result["message"])
# 成功抽卡,格式化消息
rarity = result["rarity"]
name = result["name"]
image_url = result["image_url"]
draws_left = result["draws_left"]
unlocked_achievements = result.get("unlocked_achievements", [])
# 构建消息
msg = Message()
# 根据稀有度设置不同的消息样式
if rarity == "SSR":
msg.append(f"🌟✨ 恭喜 {format_user_mention(user_id, user_name)} ✨🌟\n")
msg.append(f"🎊 抽到了 SSR 式神:{name} 🎊\n")
msg.append(f"💫 真是太幸运了!💫")
elif rarity == "SP":
msg.append(f"🌈🎆 恭喜 {format_user_mention(user_id, user_name)} 🎆🌈\n")
msg.append(f"🎉 抽到了 SP 式神:{name} 🎉\n")
msg.append(f"🔥 这是传说中的SP🔥")
elif rarity == "SR":
msg.append(f"⭐ 恭喜 {format_user_mention(user_id, user_name)}\n")
msg.append(f"✨ 抽到了 SR 式神:{name}")
else: # R
msg.append(f"🍀 {format_user_mention(user_id, user_name)} 🍀\n")
msg.append(f"📜 抽到了 R 式神:{name}")
# 添加图片
if image_url and os.path.exists(image_url):
msg.append(MessageSegment.image(f"file:///{get_image_path(image_url)}"))
# 添加成就通知
if unlocked_achievements:
msg.append("\n\n🏆 恭喜解锁新成就!\n")
has_manual_rewards = False
for achievement_id in unlocked_achievements:
# 尝试自动发放成就奖励
auto_success, reward_msg = await process_achievement_reward(user_id, achievement_id)
# 检查是否是重复奖励
if "_repeat_" in achievement_id:
base_achievement_id = achievement_id.split("_repeat_")[0]
achievement_config = config.ACHIEVEMENTS.get(base_achievement_id)
if achievement_config:
achievement_name = achievement_config["name"]
# 使用重复奖励或默认为天卡
reward = achievement_config.get("repeat_reward", "天卡")
status = "✅ 已自动发放" if auto_success else "⚠️ 需手动领取"
msg.append(f"🎖️ {achievement_name} 重复奖励 (奖励:{reward}) {status}\n")
else:
msg.append(f"🎖️ {achievement_id}\n")
else:
achievement_config = config.ACHIEVEMENTS.get(achievement_id)
if achievement_config:
achievement_name = achievement_config["name"]
reward = achievement_config["reward"]
status = "✅ 已自动发放" if auto_success else "⚠️ 需手动领取"
msg.append(f"🎖️ {achievement_name} (奖励:{reward}) {status}\n")
else:
msg.append(f"🎖️ {achievement_id}\n")
# 记录是否有需要手动领取的奖励
if not auto_success:
has_manual_rewards = True
# 如果有未自动发放的奖励,提示联系管理员
if has_manual_rewards:
msg.append("💰 未自动发放的奖励请联系管理员\n")
# 添加成就进度提示
achievement_data = gacha_system.get_user_achievements(user_id)
if achievement_data["success"]:
progress = achievement_data["progress"]
consecutive_days = progress.get("consecutive_days", 0)
no_ssr_streak = progress.get("no_ssr_streak", 0)
msg.append("\n📈 成就进度:\n")
# 连续抽卡天数进度
if consecutive_days > 0:
if consecutive_days < 30:
msg.append(f"📅 勤勤恳恳Ⅰ:{consecutive_days}/30天 🎯\n")
elif consecutive_days < 60:
msg.append(f"📅 勤勤恳恳Ⅱ:{consecutive_days}/60天 🎯\n")
elif consecutive_days < 90:
msg.append(f"📅 勤勤恳恳Ⅲ:{consecutive_days}/90天 🎯\n")
elif consecutive_days < 120:
msg.append(f"📅 勤勤恳恳Ⅳ:{consecutive_days}/120天 ⭐\n")
elif consecutive_days < 150:
msg.append(f"📅 勤勤恳恳Ⅴ:{consecutive_days}/150天 ⭐\n")
else:
next_reward_days = 30 - (consecutive_days % 30)
if next_reward_days == 30:
next_reward_days = 0
if next_reward_days > 0:
msg.append(f"📅 勤勤恳恳Ⅴ (满级){consecutive_days}天,距离下次奖励{next_reward_days}天 🎁\n")
else:
msg.append(f"📅 勤勤恳恳Ⅴ (满级){consecutive_days}天,可获得奖励!🎉\n")
# 无SSR/SP连击进度
if no_ssr_streak > 0:
if no_ssr_streak < 60:
msg.append(f"💔 非酋进度:{no_ssr_streak}/60次 😭\n")
elif no_ssr_streak < 120:
msg.append(f"💔 顶级非酋:{no_ssr_streak}/120次 😱\n")
elif no_ssr_streak < 180:
msg.append(f"💔 月见黑:{no_ssr_streak}/180次 🌙\n")
else:
msg.append(f"💔 已达月见黑级别:{no_ssr_streak}次 🌚\n")
# 添加剩余次数和概率信息
msg.append(f"\n📊 今日剩余抽卡次数:{draws_left}/{DAILY_LIMIT}\n")
msg.append(gacha_system.get_probability_text())
# 如果抽到了SSR或SP处理奖励发放
if rarity in ["SSR", "SP"]:
# 尝试自动发放奖励
auto_success, reward_msg = await process_ssr_sp_reward(user_id)
msg.append(f"\n\n{reward_msg}")
# 通知管理员好友
admin_id = 2185330092
notify_msg = Message()
if auto_success:
notify_msg.append(f"用户 {user_name}({user_id}) 抽中了{rarity}稀有度式神: {name}! 已自动发放奖励!")
else:
notify_msg.append(f"用户 {user_name}({user_id}) 抽中了{rarity}稀有度式神: {name}! 需要手动发放奖励!")
await bot.send_private_msg(user_id=admin_id, message=notify_msg)
else:
msg.append(f"\n\n抽中SSR或SP时可获得蛋定助手天卡一张哦~~")
await gacha_matcher.finish(msg)
async def notify_admin(bot: Bot, message: str):
"""通知管理员"""
admin_id = 2185330092
try:
await bot.send_private_msg(user_id=admin_id, message=message)
except Exception as e:
pass # 忽略通知失败的错误
# 注册查询命令,添加权限检查规则
stats_matcher = on_command("我的抽卡", aliases=set(STATS_COMMANDS), priority=5, rule=check_permission())
# 注册今日统计命令
daily_stats_matcher = on_command("今日抽卡", aliases=set(DAILY_STATS_COMMANDS), priority=5, rule=check_permission())
# 注册三连抽命令
triple_gacha_matcher = on_command("三连抽", aliases=set(TRIPLE_GACHA_COMMANDS), priority=5, rule=check_permission())
# 注册成就查询命令
achievement_matcher = on_command("查询成就", aliases=set(ACHIEVEMENT_COMMANDS), priority=5, rule=check_permission())
@stats_matcher.handle()
async def handle_stats(bot: Bot, event: MessageEvent, state: T_State):
user_id = str(event.user_id)
user_name = event.sender.card if isinstance(event, GroupMessageEvent) else event.sender.nickname
# 获取用户统计
stats = gacha_system.get_user_stats(user_id)
if not stats["success"]:
await stats_matcher.finish(format_user_mention(user_id, user_name) + " " + stats["message"])
# 构建消息
msg = Message()
msg.append(f"📊 {format_user_mention(user_id, user_name)} 的抽卡统计:\n\n")
msg.append(f"🎲 总抽卡次数:{stats['total_draws']}\n\n")
# 稀有度统计
msg.append("🎯 稀有度分布:\n")
msg.append(f"📜 R{stats['R_count']}张 ({stats['R_count']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"⭐ SR{stats['SR_count']}张 ({stats['SR_count']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"🌟 SSR{stats['SSR_count']}张 ({stats['SSR_count']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"🌈 SP{stats['SP_count']}张 ({stats['SP_count']/stats['total_draws']*100:.1f}%)\n")
# 添加最近抽卡记录
if stats["recent_draws"]:
msg.append("\n🕐 最近抽卡记录:\n")
for draw in reversed(stats["recent_draws"]):
# 根据稀有度添加emoji
if draw['rarity'] == "SSR":
emoji = "🌟"
elif draw['rarity'] == "SP":
emoji = "🌈"
elif draw['rarity'] == "SR":
emoji = ""
else:
emoji = "📜"
msg.append(f"{emoji} {draw['rarity']} - {draw['name']} ({draw['date']})\n")
await stats_matcher.finish(msg)
@triple_gacha_matcher.handle()
async def handle_triple_gacha(bot: Bot, event: MessageEvent, state: T_State):
"""处理三连抽命令"""
user_id = str(event.user_id)
user_name = event.sender.card or event.sender.nickname or "未知用户"
# 执行三连抽
result = gacha_system.triple_draw(user_id)
if not result["success"]:
await triple_gacha_matcher.finish(f"{result['message']}")
# 构建三连抽结果消息
msg = Message()
msg.append(f"🎯 {format_user_mention(user_id, user_name)} 的三连抽结果:\n\n")
# 显示每次抽卡结果
for i, draw_result in enumerate(result["results"], 1):
rarity = draw_result["rarity"]
name = draw_result["name"]
# 根据稀有度添加emoji
if rarity == "SSR":
msg.append(f"🌟 第{i}SSR - {name}\n")
elif rarity == "SP":
msg.append(f"🌈 第{i}SP - {name}\n")
elif rarity == "SR":
msg.append(f"⭐ 第{i}SR - {name}\n")
else: # R
msg.append(f"📜 第{i}R - {name}\n")
# 统计结果
ssr_count = sum(1 for r in result["results"] if r["rarity"] in ["SSR", "SP"])
sr_count = sum(1 for r in result["results"] if r["rarity"] == "SR")
r_count = sum(1 for r in result["results"] if r["rarity"] == "R")
msg.append(f"\n📈 本次三连抽统计:\n")
if ssr_count > 0:
msg.append(f"🎊 SSR/SP{ssr_count}\n")
if sr_count > 0:
msg.append(f"✨ SR{sr_count}\n")
if r_count > 0:
msg.append(f"📜 R{r_count}\n")
# 添加成就通知
unlocked_achievements = result.get("unlocked_achievements", [])
if unlocked_achievements:
msg.append("\n🏆 恭喜解锁新成就!\n")
has_manual_rewards = False
for achievement_id in unlocked_achievements:
# 尝试自动发放成就奖励
auto_success, reward_msg = await process_achievement_reward(user_id, achievement_id)
# 检查是否是重复奖励
if "_repeat_" in achievement_id:
base_achievement_id = achievement_id.split("_repeat_")[0]
achievement_config = config.ACHIEVEMENTS.get(base_achievement_id)
if achievement_config:
achievement_name = achievement_config["name"]
# 使用重复奖励或默认为天卡
reward = achievement_config.get("repeat_reward", "天卡")
status = "✅ 已自动发放" if auto_success else "⚠️ 需手动领取"
msg.append(f"🎖️ {achievement_name} 重复奖励 (奖励:{reward}) {status}\n")
else:
msg.append(f"🎖️ {achievement_id}\n")
else:
achievement_config = config.ACHIEVEMENTS.get(achievement_id)
if achievement_config:
achievement_name = achievement_config["name"]
reward = achievement_config["reward"]
status = "✅ 已自动发放" if auto_success else "⚠️ 需手动领取"
msg.append(f"🎖️ {achievement_name} (奖励:{reward}) {status}\n")
else:
msg.append(f"🎖️ {achievement_id}\n")
# 记录是否有需要手动领取的奖励
if not auto_success:
has_manual_rewards = True
# 如果有未自动发放的奖励,提示联系管理员
if has_manual_rewards:
msg.append("💰 未自动发放的奖励请联系管理员\n")
# 添加成就进度提示
achievement_data = gacha_system.get_user_achievements(user_id)
if achievement_data["success"]:
progress = achievement_data["progress"]
consecutive_days = progress.get("consecutive_days", 0)
no_ssr_streak = progress.get("no_ssr_streak", 0)
msg.append("\n📈 成就进度:\n")
# 连续抽卡天数进度
if consecutive_days > 0:
if consecutive_days < 30:
msg.append(f"📅 勤勤恳恳Ⅰ:{consecutive_days}/30天 🎯\n")
elif consecutive_days < 60:
msg.append(f"📅 勤勤恳恳Ⅱ:{consecutive_days}/60天 🎯\n")
elif consecutive_days < 90:
msg.append(f"📅 勤勤恳恳Ⅲ:{consecutive_days}/90天 🎯\n")
elif consecutive_days < 120:
msg.append(f"📅 勤勤恳恳Ⅳ:{consecutive_days}/120天 ⭐\n")
elif consecutive_days < 150:
msg.append(f"📅 勤勤恳恳Ⅴ:{consecutive_days}/150天 ⭐\n")
else:
next_reward_days = 30 - (consecutive_days % 30)
if next_reward_days == 30:
next_reward_days = 0
if next_reward_days > 0:
msg.append(f"📅 勤勤恳恳Ⅴ (满级){consecutive_days}天,距离下次奖励{next_reward_days}天 🎁\n")
else:
msg.append(f"📅 勤勤恳恳Ⅴ (满级){consecutive_days}天,可获得奖励!🎉\n")
# 无SSR/SP连击进度
if no_ssr_streak > 0:
if no_ssr_streak < 60:
msg.append(f"💔 非酋进度:{no_ssr_streak}/60次 😭\n")
elif no_ssr_streak < 120:
msg.append(f"💔 顶级非酋:{no_ssr_streak}/120次 😱\n")
elif no_ssr_streak < 180:
msg.append(f"💔 月见黑:{no_ssr_streak}/180次 🌙\n")
else:
msg.append(f"💔 已达月见黑级别:{no_ssr_streak}次 🌚\n")
# 添加剩余次数
draws_left = result["draws_left"]
msg.append(f"\n📊 今日剩余抽卡次数:{draws_left}/{DAILY_LIMIT}")
# 如果抽到SSR/SP处理奖励发放
if ssr_count > 0:
# 为每张SSR/SP处理奖励
auto_rewards = 0
manual_rewards = 0
# 这里简化处理,只处理一次奖励(因为每次抽卡都是独立的)
# 如果需要为每张SSR/SP都发放奖励可以循环处理
auto_success, reward_msg = await process_ssr_sp_reward(user_id)
if auto_success:
auto_rewards += 1
else:
manual_rewards += 1
msg.append(f"\n\n{reward_msg}")
# 通知管理员
admin_msg = f"🎉 用户 {user_name}({user_id}) 在三连抽中抽到了 {ssr_count} 张 SSR/SP"
if auto_rewards > 0:
admin_msg += f" 已自动发放 {auto_rewards} 张奖励!"
if manual_rewards > 0:
admin_msg += f" 需要手动发放 {manual_rewards} 张奖励!"
await notify_admin(bot, admin_msg)
await triple_gacha_matcher.finish(msg)
@achievement_matcher.handle()
async def handle_achievement(bot: Bot, event: MessageEvent, state: T_State):
"""处理成就查询命令"""
user_id = str(event.user_id)
user_name = event.sender.card or event.sender.nickname or "未知用户"
# 获取用户成就信息
result = gacha_system.get_user_achievements(user_id)
if not result["success"]:
await achievement_matcher.finish(f"{result['message']}")
# 构建成就消息
msg = Message()
msg.append(f"🏆 {format_user_mention(user_id, user_name)} 的成就信息:\n\n")
# 显示已解锁成就
unlocked = result["achievements"]
if unlocked:
msg.append("🎖️ 已解锁成就:\n")
for achievement in unlocked:
# 检查是否是重复奖励
if "_repeat_" in achievement:
base_achievement_id = achievement.split("_repeat_")[0]
achievement_config = config.ACHIEVEMENTS.get(base_achievement_id)
if achievement_config:
achievement_name = achievement_config["name"]
reward = achievement_config.get("repeat_reward", "天卡")
msg.append(f"{achievement_name} 重复奖励 (奖励:{reward})\n")
else:
msg.append(f"{achievement}\n")
else:
achievement_config = config.ACHIEVEMENTS.get(achievement)
if achievement_config:
achievement_name = achievement_config["name"]
reward = achievement_config["reward"]
msg.append(f"{achievement_name} (奖励:{reward})\n")
else:
msg.append(f"{achievement}\n")
msg.append("\n💰 获取奖励请联系管理员\n\n")
# 显示成就进度
progress = result["progress"]
msg.append("📊 成就进度:\n")
# 连续抽卡天数 - 勤勤恳恳系列成就
consecutive_days = progress.get("consecutive_days", 0)
if consecutive_days > 0:
# 判断当前应该显示哪个等级的进度
if consecutive_days < 30:
msg.append(f"📅 勤勤恳恳Ⅰ:{consecutive_days}/30 天 (奖励:天卡)\n")
elif consecutive_days < 60:
msg.append(f"📅 勤勤恳恳Ⅱ:{consecutive_days}/60 天 (奖励:天卡)\n")
elif consecutive_days < 90:
msg.append(f"📅 勤勤恳恳Ⅲ:{consecutive_days}/90 天 (奖励:天卡)\n")
elif consecutive_days < 120:
msg.append(f"📅 勤勤恳恳Ⅳ:{consecutive_days}/120 天 (奖励:周卡)\n")
elif consecutive_days < 150:
msg.append(f"📅 勤勤恳恳Ⅴ:{consecutive_days}/150 天 (奖励:周卡)\n")
else:
# 已达到最高等级,显示下次奖励进度
next_reward_days = 30 - (consecutive_days % 30)
if next_reward_days == 30:
next_reward_days = 0
msg.append(f"📅 勤勤恳恳Ⅴ (已满级){consecutive_days}\n")
if next_reward_days > 0:
msg.append(f"🎁 距离下次奖励:{next_reward_days} 天 (奖励:天卡)\n")
else:
msg.append(f"🎁 可获得奖励!请联系管理员 (奖励:天卡)\n")
# 无SSR/SP连击数
no_ssr_streak = progress.get("no_ssr_streak", 0)
if no_ssr_streak > 0:
msg.append(f"💔 无SSR/SP连击{no_ssr_streak}\n")
# 显示各个非酋成就的进度
if no_ssr_streak < 60:
msg.append(f" 🎯 非酋成就:{no_ssr_streak}/60 (奖励:天卡)\n")
elif no_ssr_streak < 120:
msg.append(f" 🎯 顶级非酋成就:{no_ssr_streak}/120 (奖励:周卡)\n")
elif no_ssr_streak < 180:
msg.append(f" 🎯 月见黑成就:{no_ssr_streak}/180 (奖励:月卡)\n")
else:
msg.append(f" 🌙 已达到月见黑级别!\n")
# 如果没有任何进度,显示提示
if consecutive_days == 0 and no_ssr_streak == 0:
msg.append("🌱 还没有任何成就进度,快去抽卡吧!")
await achievement_matcher.finish(msg)
# 注册查询抽卡指令,支持@用户查询功能
query_matcher = on_command("查询抽卡", aliases={"查询抽奖"}, priority=5, rule=check_permission())
@query_matcher.handle()
async def handle_query(bot: Bot, event: MessageEvent, state: T_State):
# 获取消息中的@用户
message = event.get_message()
at_segment = None
for segment in message:
if segment.type == "at":
at_segment = segment
break
# 确定查询的用户ID
if at_segment:
# 查询被@的用户
target_user_id = str(at_segment.data.get("qq", ""))
# 获取被@用户的信息
if isinstance(event, GroupMessageEvent):
try:
group_id = event.group_id
user_info = await bot.get_group_member_info(group_id=group_id, user_id=int(target_user_id))
target_user_name = user_info.get("card") or user_info.get("nickname", "用户")
except:
target_user_name = "用户"
else:
target_user_name = "用户"
else:
# 查询自己
target_user_id = str(event.user_id)
target_user_name = event.sender.card if isinstance(event, GroupMessageEvent) else event.sender.nickname
# 获取用户统计
stats = gacha_system.get_user_stats(target_user_id)
# 构建响应消息
msg = Message()
# 如果查询的是他人
if target_user_id != str(event.user_id):
msg.append(format_user_mention(str(event.user_id), event.sender.card if isinstance(event, GroupMessageEvent) else event.sender.nickname))
msg.append(f" 查询了 ")
msg.append(format_user_mention(target_user_id, target_user_name))
msg.append(f" 的抽卡记录\n\n")
else:
msg.append(format_user_mention(target_user_id, target_user_name) + "\n")
if not stats["success"]:
msg.append(f"该用户还没有抽卡记录哦!")
await query_matcher.finish(msg)
# 构建统计信息
msg.append(f"总抽卡次数:{stats['total_draws']}\n")
msg.append(f"R卡数量{stats['R_count']}\n")
msg.append(f"SR卡数量{stats['SR_count']}\n")
msg.append(f"SSR卡数量{stats['SSR_count']}\n")
msg.append(f"SP卡数量{stats['SP_count']}\n")
# 计算每种稀有度的比例
if stats['total_draws'] > 0:
msg.append("\n稀有度比例:\n")
msg.append(f"R: {stats['R_count']/stats['total_draws']*100:.2f}%\n")
msg.append(f"SR: {stats['SR_count']/stats['total_draws']*100:.2f}%\n")
msg.append(f"SSR: {stats['SSR_count']/stats['total_draws']*100:.2f}%\n")
msg.append(f"SP: {stats['SP_count']/stats['total_draws']*100:.2f}%\n")
# 添加最近抽卡记录
if stats["recent_draws"]:
msg.append("\n最近5次抽卡记录\n")
for draw in reversed(stats["recent_draws"]):
msg.append(f"{draw['date']}: {draw['rarity']} - {draw['name']}\n")
await query_matcher.finish(msg)
# 自定义排行榜权限检查(仅检查白名单,不检查抽卡次数)
def check_rank_permission() -> Rule:
async def _checker(event: MessageEvent) -> bool:
# 允许特定用户在任何场景下使用
if event.user_id == ALLOWED_USER_ID:
return True
# 在允许的群聊中任何人都可以使用
if isinstance(event, GroupMessageEvent) and event.group_id == ALLOWED_GROUP_ID:
return True
return False
return Rule(_checker)
rank_matcher = on_startswith(("抽卡排行","抽卡榜"), priority=1, rule=check_rank_permission())
@rank_matcher.handle()
async def handle_rank(bot: Bot, event: MessageEvent, state: T_State):
# 获取排行榜数据
rank_data = gacha_system.get_rank_list()
if not rank_data:
await rank_matcher.finish("暂无抽卡排行榜数据")
# 构建消息
msg = Message("✨┈┈┈┈┈ 抽卡排行榜 ┈┈┈┈┈✨\n")
msg.append("🏆 SSR/SP稀有度式神排行榜 🏆\n")
msg.append("┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈\n")
for i, (user_id, data) in enumerate(rank_data[:10], 1):
# 获取用户昵称
user_name = "未知用户"
try:
if isinstance(event, GroupMessageEvent):
# 群聊场景获取群名片或昵称
user_info = await bot.get_group_member_info(group_id=event.group_id, user_id=int(user_id))
user_name = user_info.get("card") or user_info.get("nickname", "未知用户")
else:
# 私聊场景获取昵称
user_info = await bot.get_stranger_info(user_id=int(user_id))
user_name = user_info.get("nickname", "未知用户")
except Exception as e:
# 如果获取失败,尝试从事件中获取发送者信息
if str(user_id) == str(event.user_id):
user_name = event.sender.card if isinstance(event, GroupMessageEvent) else event.sender.nickname
# 美化输出格式
rank_icon = "🥇" if i == 1 else "🥈" if i == 2 else "🥉" if i == 3 else f"{i}."
ssr_icon = "🌟"
sp_icon = "💫"
total = data['SSR_count'] + data['SP_count']
msg.append(f"{rank_icon} {user_name}\n")
msg.append(f" {ssr_icon}SSR: {data['SSR_count']}{sp_icon}SP: {data['SP_count']}\n")
msg.append(f" 🔮总计: {total}\n")
msg.append("┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈\n")
await rank_matcher.finish(msg)
@daily_stats_matcher.handle()
async def handle_daily_stats(bot: Bot, event: MessageEvent, state: T_State):
"""处理今日抽卡统计命令"""
result = gacha_system.get_daily_stats()
if not result["success"]:
await daily_stats_matcher.finish(f"{result['message']}")
stats = result["stats"]
date = result["date"]
# 构建统计消息
msg = Message()
msg.append(f"📊 今日抽卡统计 ({date})\n\n")
msg.append(f"👥 参与人数:{stats['total_users']}\n")
msg.append(f"🎲 总抽卡次数:{stats['total_draws']}\n\n")
# 稀有度分布
msg.append("🎯 稀有度分布:\n")
if stats['total_draws'] > 0:
msg.append(f"📜 R{stats['rarity_stats']['R']}张 ({stats['rarity_stats']['R']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"⭐ SR{stats['rarity_stats']['SR']}张 ({stats['rarity_stats']['SR']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"🌟 SSR{stats['rarity_stats']['SSR']}张 ({stats['rarity_stats']['SSR']/stats['total_draws']*100:.1f}%)\n")
msg.append(f"🌈 SP{stats['rarity_stats']['SP']}张 ({stats['rarity_stats']['SP']/stats['total_draws']*100:.1f}%)\n\n")
else:
msg.append("暂无数据\n\n")
# SSR/SP排行榜
if stats['top_users']:
msg.append("🏆 今日SSR/SP排行榜\n")
for i, user_data in enumerate(stats['top_users'][:5], 1):
user_id = user_data['user_id']
ssr_count = user_data['ssr_count']
# 尝试获取用户昵称
try:
user_info = await bot.get_stranger_info(user_id=int(user_id))
user_name = user_info.get('nickname', f'用户{user_id}')
except:
user_name = f'用户{user_id}'
if i == 1:
msg.append(f"🥇 {user_name}{ssr_count}\n")
elif i == 2:
msg.append(f"🥈 {user_name}{ssr_count}\n")
elif i == 3:
msg.append(f"🥉 {user_name}{ssr_count}\n")
else:
msg.append(f"🏅 {user_name}{ssr_count}\n")
else:
msg.append("🏆 今日还没有人抽到SSR/SP哦~")
await daily_stats_matcher.finish(msg)
# 抽卡介绍命令
intro_matcher = on_command("抽卡介绍", aliases=set(INTRO_COMMANDS), priority=5, rule=check_permission())
@intro_matcher.handle()
async def handle_intro(bot: Bot, event: MessageEvent, state: T_State):
"""处理抽卡介绍命令"""
# 构建介绍消息
msg = "🎮 阴阳师抽卡系统介绍 🎮\n\n"
# 抽卡机制
msg += "📋 抽卡机制:\n"
msg += f"• 每日限制:{DAILY_LIMIT}次免费抽卡\n"
msg += "• 稀有度概率:\n"
for rarity, prob in config.RARITY_PROBABILITY.items():
msg += f" - {rarity}: {prob}%\n"
msg += "\n"
# 可用指令
msg += "🎯 可用指令:\n"
msg += f"• 抽卡:{', '.join(GACHA_COMMANDS[:3])}\n"
msg += f"• 三连抽:{', '.join(TRIPLE_GACHA_COMMANDS)}\n"
msg += f"• 个人统计:{', '.join(STATS_COMMANDS[:2])}\n"
msg += f"• 今日统计:{', '.join(DAILY_STATS_COMMANDS[:2])}\n"
msg += f"• 查询成就:{', '.join(ACHIEVEMENT_COMMANDS)}\n"
msg += "• 查询抽卡/查询抽奖:查询自己的或@他人的抽卡数据\n"
msg += "• 抽卡排行/抽卡榜查看SSR/SP排行榜\n"
msg += "\n"
# 成就系统
msg += "🏆 成就系统:\n"
msg += "\n📅 勤勤恳恳系列(连续抽卡):\n"
consecutive_achievements = [
("勤勤恳恳Ⅰ", "30天", "天卡"),
("勤勤恳恳Ⅱ", "60天", "天卡"),
("勤勤恳恳Ⅲ", "90天", "天卡"),
("勤勤恳恳Ⅳ", "120天", "周卡"),
("勤勤恳恳Ⅴ", "150天", "周卡")
]
for name, days, reward in consecutive_achievements:
msg += f"{name}:连续{days}{reward} 💎\n"
msg += " ※ 达到最高等级后每30天可重复获得天卡奖励\n\n"
msg += "😭 非酋系列无SSR/SP连击\n"
no_ssr_achievements = [
("非酋", "60次", "天卡"),
("顶级非酋", "120次", "周卡"),
("月见黑", "180次", "月卡")
]
for name, count, reward in no_ssr_achievements:
msg += f"{name}:连续{count}未中SSR/SP → {reward} 💎\n"
msg += "\n"
# 奖励说明
msg += "🎁 奖励说明:\n"
msg += "• 天卡:蛋定助手天卡奖励\n"
msg += "• 周卡:蛋定助手周卡奖励\n"
msg += "• 月卡:蛋定助手月卡奖励\n"
msg += "\n"
# 联系管理员
msg += "📞 重要提醒:\n"
msg += "🔸 所有奖励需要联系管理员获取 🔸\n"
msg += "请在获得成就后主动联系管理员领取奖励哦~ 😊\n\n"
# 祝福语
msg += "🍀 祝您抽卡愉快,欧气满满! ✨"
await intro_matcher.finish(msg)
# 导入 Web API 模块,使其路由能够注册到 NoneBot 的 FastAPI 应用
from . import web_api
# 注册 Web 路由
try:
web_api.register_web_routes()
except Exception as e:
print(f"❌ 注册 onmyoji_gacha Web 路由失败: {e}")

View File

@@ -0,0 +1,216 @@
import requests
import json
from typing import Dict, Optional, Tuple
from nonebot import logger
from .config import Config
def mask_username(username: str) -> str:
"""
对用户名进行脱敏处理,只显示前两位和后两位,中间用*号隐藏
Args:
username: 原始用户名
Returns:
脱敏后的用户名
"""
if not username:
return username
# 如果用户名长度小于等于4直接显示前两位和后两位可能重叠
if len(username) <= 4:
return username
# 显示前两位和后两位,中间用*号填充
return f"{username[:2]}{'*' * (len(username) - 4)}{username[-2:]}"
# 获取配置
config = Config()
# API 端点配置
DD_API_HOST = "https://api.danding.vip/DD/" # 蛋定服务器连接地址
BOT_TOKEN = "3340e353a49447f1be640543cbdcd937" # 对接服务器的Token
BOT_USER_ID = "1424473282" # 机器人用户ID
async def query_qq_binding(qq: str) -> Tuple[bool, Optional[str], Optional[str]]:
"""
查询QQ号是否绑定了蛋定用户名
Args:
qq: 要查询的QQ号
Returns:
Tuple[是否绑定, 用户名, VIP到期时间]
"""
try:
url = f"{DD_API_HOST}query_qq_binding"
data = {"qq": qq}
response = requests.post(url=url, json=data)
logger.debug(f"查询QQ绑定状态响应: {response}")
if response.status_code != 200:
logger.error(f"查询QQ绑定状态失败状态码: {response.status_code}")
return False, None, None
result = response.json()
logger.debug(f"查询QQ绑定状态结果: {result}")
if result.get("code") == 200:
data = result.get("data", {})
is_bound = data.get("is_bound", False)
if is_bound:
username = data.get("username")
vip_time = data.get("vip_time")
return True, username, vip_time
else:
return False, None, None
else:
logger.error(f"查询QQ绑定状态失败错误信息: {result.get('message')}")
return False, None, None
except Exception as e:
logger.error(f"查询QQ绑定状态异常: {str(e)}")
return False, None, None
async def add_user_viptime(username: str, time_class: str = "Day") -> Tuple[bool, str]:
"""
为用户添加VIP时间
Args:
username: 蛋定用户名
time_class: 时间类型 (Hour/Day/Week/Month/Season/Year)
Returns:
Tuple[是否成功, 响应消息]
"""
try:
url = f"{DD_API_HOST}bot_add_user_viptime"
data = {
"user": BOT_USER_ID,
"token": BOT_TOKEN,
"username": username,
"classes": time_class
}
response = requests.post(url=url, json=data)
logger.debug(f"添加VIP时间响应: {response}")
if response.status_code != 200:
error_msg = f"添加VIP时间失败状态码: {response.status_code}"
logger.error(error_msg)
return False, error_msg
result = response.json()
logger.debug(f"添加VIP时间结果: {result}")
if result.get("code") == 200:
return True, result.get("msg", "添加VIP时间成功")
else:
error_msg = result.get("msg", "添加VIP时间失败")
logger.error(f"添加VIP时间失败: {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"添加VIP时间异常: {str(e)}"
logger.error(error_msg)
return False, error_msg
async def process_ssr_sp_reward(user_id: str) -> Tuple[bool, str]:
"""
处理SSR/SP奖励发放
Args:
user_id: QQ用户ID
Returns:
Tuple[是否自动发放成功, 消息内容]
"""
# 查询QQ绑定状态
is_bound, username, vip_time = await query_qq_binding(user_id)
if not is_bound:
# 用户未绑定,返回提示信息
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神🎉\n"
f"获得奖励:蛋定助手天卡一张\n"
f"获取奖励请联系管理员或前往蛋定云服务中绑定QQ号即可体验自动加时")
return False, msg
else:
# 用户已绑定,自动加时
success, message = await add_user_viptime(username, "Day")
if success:
masked_username = mask_username(username)
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神🎉\n"
f"🎁已自动为您的蛋定账号({masked_username})添加天卡时长!\n"
f"📅到期时间: {message.split('到期时间为:')[-1] if '到期时间为:' in message else '请查看您的账号详情'}")
return True, msg
else:
# 自动加时失败,返回错误信息和手动领取提示
msg = (f"🎉恭喜您抽中了SSR/SP稀有度式神🎉\n"
f"获得奖励:蛋定助手天卡一张\n"
f"⚠️自动加时失败: {message}\n"
f"请联系管理员手动领取奖励!")
return False, msg
async def process_achievement_reward(user_id: str, achievement_id: str) -> Tuple[bool, str]:
"""
处理成就奖励发放
Args:
user_id: QQ用户ID
achievement_id: 成就ID
Returns:
Tuple[是否自动发放成功, 消息内容]
"""
# 获取成就配置
achievement_config = config.ACHIEVEMENTS.get(achievement_id)
if not achievement_config:
# 检查是否是重复奖励
if "_repeat_" in achievement_id:
base_achievement_id = achievement_id.split("_repeat_")[0]
base_config = config.ACHIEVEMENTS.get(base_achievement_id)
if base_config:
reward_type = base_config.get("repeat_reward", "天卡")
else:
reward_type = "天卡"
else:
return False, f"未找到成就配置: {achievement_id}"
else:
reward_type = achievement_config.get("reward", "天卡")
# 查询QQ绑定状态
is_bound, username, vip_time = await query_qq_binding(user_id)
if not is_bound:
# 用户未绑定,返回提示信息
msg = (f"🏆 恭喜解锁成就奖励!\n"
f"获得奖励:蛋定助手{reward_type}一张\n"
f"获取奖励请联系管理员或前往蛋定云服务中绑定QQ号即可体验自动加时")
return False, msg
else:
# 用户已绑定,自动加时
# 将奖励类型转换为API需要的时间类型
time_class = "Day" # 默认为天卡
if "周卡" in reward_type:
time_class = "Week"
elif "月卡" in reward_type:
time_class = "Month"
success, message = await add_user_viptime(username, time_class)
if success:
masked_username = mask_username(username)
msg = (f"🏆 恭喜解锁成就奖励!\n"
f"🎁已自动为您的蛋定账号({masked_username})添加{reward_type}时长!\n"
f"📅到期时间: {message.split('到期时间为:')[-1] if '到期时间为:' in message else '请查看您的账号详情'}")
return True, msg
else:
# 自动加时失败,返回错误信息和手动领取提示
msg = (f"🏆 恭喜解锁成就奖励!\n"
f"获得奖励:蛋定助手{reward_type}一张\n"
f"⚠️自动加时失败: {message}\n"
f"请联系管理员手动领取奖励!")
return False, msg

View File

@@ -0,0 +1,118 @@
from pydantic import BaseSettings
import os
class Config(BaseSettings):
# 抽卡概率配置
RARITY_PROBABILITY: dict = {
"R": 78.75,
"SR": 20.0,
"SSR": 1.0,
"SP": 0.25
}
# 每日抽卡限制
DAILY_LIMIT: int = 3
# 数据文件路径
DB_FILE: str = "data/onmyoji_gacha/gacha.db"
DAILY_DRAWS_FILE: str = "data/onmyoji_gacha/daily_draws.json" # 保留用于迁移
USER_STATS_FILE: str = "data/onmyoji_gacha/user_stats.json" # 保留用于迁移
# 式神图片目录
SHIKIGAMI_IMG_DIR: str = "data/chouka/"
# 触发指令
GACHA_COMMANDS: list = ["抽卡","抽奖", "召唤"]
STATS_COMMANDS: list = ["我的抽卡","我的抽奖", "我的图鉴"]
DAILY_STATS_COMMANDS: list = ["今日抽卡", "今日统计", "抽卡统计"]
TRIPLE_GACHA_COMMANDS: list = ["三连", "三连抽"]
ACHIEVEMENT_COMMANDS: list = ["查询成就", "抽卡成就"]
INTRO_COMMANDS: list = ["抽卡介绍", "抽卡说明", "抽卡帮助"]
# 成就系统配置
ACHIEVEMENTS: dict = {
"consecutive_days_30_1": {
"name": "勤勤恳恳Ⅰ",
"description": "连续抽卡30天",
"reward": "天卡",
"threshold": 30,
"type": "consecutive_days",
"level": 1,
"repeatable": True
},
"consecutive_days_30_2": {
"name": "勤勤恳恳Ⅱ",
"description": "连续抽卡60天",
"reward": "天卡",
"threshold": 60,
"type": "consecutive_days",
"level": 2,
"repeatable": True
},
"consecutive_days_30_3": {
"name": "勤勤恳恳Ⅲ",
"description": "连续抽卡90天",
"reward": "天卡",
"threshold": 90,
"type": "consecutive_days",
"level": 3,
"repeatable": True
},
"consecutive_days_30_4": {
"name": "勤勤恳恳Ⅳ",
"description": "连续抽卡120天",
"reward": "周卡",
"threshold": 120,
"type": "consecutive_days",
"level": 4,
"repeatable": True
},
"consecutive_days_30_5": {
"name": "勤勤恳恳Ⅴ",
"description": "连续抽卡150天",
"reward": "周卡",
"threshold": 150,
"type": "consecutive_days",
"level": 5,
"repeatable": True,
"repeat_reward": "天卡"
},
"no_ssr_60": {
"name": "非酋",
"description": "连续60次未抽到SSR/SP",
"reward": "天卡",
"threshold": 60,
"type": "no_ssr_streak"
},
"no_ssr_120": {
"name": "顶级非酋",
"description": "连续120次未抽到SSR/SP",
"reward": "周卡",
"threshold": 120,
"type": "no_ssr_streak"
},
"no_ssr_180": {
"name": "月见黑",
"description": "连续180次未抽到SSR/SP",
"reward": "月卡",
"threshold": 180,
"type": "no_ssr_streak"
}
}
# 权限配置
ALLOWED_GROUP_ID: int = 621016172
ALLOWED_USER_ID: int = 1424473282
# 特殊概率用户配置
SPECIAL_PROBABILITY_USERS: list = ["1424473282"] # 100%抽到SSR或SP的用户列表
# Web后台管理配置
WEB_ADMIN_TOKEN: str = os.getenv("WEB_ADMIN_TOKEN", "onmyoji_admin_token_2024")
WEB_ADMIN_PORT: int = int(os.getenv("WEB_ADMIN_PORT", "8080"))
# 时区
TIMEZONE: str = "Asia/Shanghai"
class Config:
extra = "ignore"

View File

@@ -0,0 +1,616 @@
import os
import json
import sqlite3
import datetime
from typing import Dict, List, Any, Optional
import logging
from pathlib import Path
from .config import Config
# 创建Config实例
config = Config()
class DataManager:
def __init__(self):
# 确保目录存在
os.makedirs(os.path.dirname(config.DB_FILE), exist_ok=True)
# 初始化数据库
self._init_db()
# 加载式神数据
self.shikigami_data = self._load_shikigami_data()
def _init_db(self):
"""初始化数据库"""
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
# 创建式神表
cursor.execute("""
CREATE TABLE IF NOT EXISTS shikigami (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
rarity TEXT NOT NULL,
image_path TEXT NOT NULL
)
""")
# 创建每日抽卡记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS daily_draws (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
user_id TEXT NOT NULL,
rarity TEXT NOT NULL,
shikigami_id INTEGER NOT NULL,
timestamp TEXT NOT NULL,
FOREIGN KEY (shikigami_id) REFERENCES shikigami(id)
)
""")
# 创建用户统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_stats (
user_id TEXT PRIMARY KEY,
total_draws INTEGER DEFAULT 0,
R_count INTEGER DEFAULT 0,
SR_count INTEGER DEFAULT 0,
SSR_count INTEGER DEFAULT 0,
SP_count INTEGER DEFAULT 0
)
""")
# 创建抽卡历史表
cursor.execute("""
CREATE TABLE IF NOT EXISTS draw_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
date TEXT NOT NULL,
rarity TEXT NOT NULL,
shikigami_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES user_stats(user_id),
FOREIGN KEY (shikigami_id) REFERENCES shikigami(id)
)
""")
# 创建成就表
cursor.execute("""
CREATE TABLE IF NOT EXISTS achievements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
achievement_id TEXT NOT NULL,
unlocked_date TEXT NOT NULL,
reward_claimed INTEGER DEFAULT 0,
UNIQUE(user_id, achievement_id)
)
""")
# 创建用户成就进度表
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_achievement_progress (
user_id TEXT PRIMARY KEY,
consecutive_days INTEGER DEFAULT 0,
last_draw_date TEXT DEFAULT '',
no_ssr_streak INTEGER DEFAULT 0,
total_consecutive_days INTEGER DEFAULT 0
)
""")
conn.commit()
def update_achievement_progress(self, user_id: str, rarity: str) -> List[str]:
"""更新用户成就进度,返回新解锁的成就列表"""
today = self.get_today_date()
unlocked_achievements = []
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
# 获取或创建用户成就进度
cursor.execute(
"SELECT * FROM user_achievement_progress WHERE user_id = ?",
(user_id,)
)
progress = cursor.fetchone()
if not progress:
cursor.execute(
"INSERT INTO user_achievement_progress (user_id, last_draw_date) VALUES (?, ?)",
(user_id, today)
)
consecutive_days = 1
no_ssr_streak = 1 if rarity not in ["SSR", "SP"] else 0
total_consecutive_days = 1
else:
last_draw_date = progress[2]
consecutive_days = progress[1]
no_ssr_streak = progress[3]
total_consecutive_days = progress[4]
# 更新连续抽卡天数
if last_draw_date != today:
# 检查是否是连续的一天
last_date = datetime.datetime.strptime(last_draw_date, "%Y-%m-%d")
current_date = datetime.datetime.strptime(today, "%Y-%m-%d")
days_diff = (current_date - last_date).days
if days_diff == 1:
consecutive_days += 1
total_consecutive_days += 1
elif days_diff > 1:
consecutive_days = 1
total_consecutive_days += 1
# days_diff == 0 表示今天已经抽过卡了,不更新连续天数
# 更新无SSR连击数
if rarity in ["SSR", "SP"]:
no_ssr_streak = 0
else:
no_ssr_streak += 1
# 更新进度
cursor.execute("""
INSERT OR REPLACE INTO user_achievement_progress
(user_id, consecutive_days, last_draw_date, no_ssr_streak, total_consecutive_days)
VALUES (?, ?, ?, ?, ?)
""", (user_id, consecutive_days, today, no_ssr_streak, total_consecutive_days))
# 检查是否解锁新成就
for achievement_id, achievement_config in config.ACHIEVEMENTS.items():
# 对于可重复获得的成就(勤勤恳恳系列),需要特殊处理
if achievement_config.get("repeatable", False) and achievement_config["type"] == "consecutive_days":
# 检查连续抽卡成就的升级逻辑
if consecutive_days >= achievement_config["threshold"]:
# 检查是否已经解锁过这个等级
cursor.execute(
"SELECT id FROM achievements WHERE user_id = ? AND achievement_id = ?",
(user_id, achievement_id)
)
if not cursor.fetchone():
# 解锁新等级的成就
cursor.execute("""
INSERT INTO achievements (user_id, achievement_id, unlocked_date)
VALUES (?, ?, ?)
""", (user_id, achievement_id, today))
unlocked_achievements.append(achievement_id)
# 如果是最高等级(Ⅴ),检查是否需要给重复奖励
elif achievement_config["level"] == 5 and consecutive_days >= 150:
# 每30天给一次重复奖励
days_over_150 = consecutive_days - 150
if days_over_150 > 0 and days_over_150 % 30 == 0:
# 检查这个重复奖励是否已经给过
repeat_id = f"{achievement_id}_repeat_{days_over_150//30}"
cursor.execute(
"SELECT id FROM achievements WHERE user_id = ? AND achievement_id = ?",
(user_id, repeat_id)
)
if not cursor.fetchone():
cursor.execute("""
INSERT INTO achievements (user_id, achievement_id, unlocked_date)
VALUES (?, ?, ?)
""", (user_id, repeat_id, today))
unlocked_achievements.append(achievement_id)
else:
# 非重复成就的原有逻辑
# 检查是否已经解锁
cursor.execute(
"SELECT id FROM achievements WHERE user_id = ? AND achievement_id = ?",
(user_id, achievement_id)
)
if cursor.fetchone():
continue
# 检查成就条件
unlocked = False
if achievement_config["type"] == "consecutive_days":
if consecutive_days >= achievement_config["threshold"]:
unlocked = True
elif achievement_config["type"] == "no_ssr_streak":
if no_ssr_streak >= achievement_config["threshold"]:
unlocked = True
if unlocked:
cursor.execute("""
INSERT INTO achievements (user_id, achievement_id, unlocked_date)
VALUES (?, ?, ?)
""", (user_id, achievement_id, today))
unlocked_achievements.append(achievement_id)
conn.commit()
return unlocked_achievements
def get_user_achievements(self, user_id: str) -> Dict[str, Any]:
"""获取用户成就信息"""
with sqlite3.connect(config.DB_FILE) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 获取已解锁的成就
cursor.execute(
"SELECT achievement_id, unlocked_date, reward_claimed FROM achievements WHERE user_id = ?",
(user_id,)
)
unlocked = {row["achievement_id"]: {
"unlocked_date": row["unlocked_date"],
"reward_claimed": bool(row["reward_claimed"])
} for row in cursor.fetchall()}
# 获取进度
cursor.execute(
"SELECT * FROM user_achievement_progress WHERE user_id = ?",
(user_id,)
)
progress_row = cursor.fetchone()
if not progress_row:
progress = {
"consecutive_days": 0,
"no_ssr_streak": 0,
"total_consecutive_days": 0
}
else:
progress = {
"consecutive_days": progress_row["consecutive_days"],
"no_ssr_streak": progress_row["no_ssr_streak"],
"total_consecutive_days": progress_row["total_consecutive_days"]
}
return {
"unlocked": unlocked,
"progress": progress
}
def claim_achievement_reward(self, user_id: str, achievement_id: str) -> bool:
"""领取成就奖励"""
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE achievements
SET reward_claimed = 1
WHERE user_id = ? AND achievement_id = ? AND reward_claimed = 0
""", (user_id, achievement_id))
conn.commit()
return cursor.rowcount > 0
# 迁移现有JSON数据到SQLite
self._migrate_data()
def _migrate_data(self):
"""迁移JSON数据到SQLite"""
try:
# 迁移每日抽卡记录
if os.path.exists(config.DAILY_DRAWS_FILE):
with open(config.DAILY_DRAWS_FILE, 'r', encoding='utf-8') as f:
daily_draws = json.load(f)
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
for date, users in daily_draws.items():
for user_id, draws in users.items():
for draw in draws:
# 查找式神ID
cursor.execute(
"SELECT id FROM shikigami WHERE name=? AND rarity=?",
(draw["name"], draw["rarity"])
)
shikigami_id = cursor.fetchone()
if shikigami_id:
cursor.execute(
"INSERT INTO daily_draws (date, user_id, rarity, shikigami_id, timestamp) VALUES (?, ?, ?, ?, ?)",
(date, user_id, draw["rarity"], shikigami_id[0], draw["timestamp"])
)
conn.commit()
# 迁移用户统计数据
if os.path.exists(config.USER_STATS_FILE):
with open(config.USER_STATS_FILE, 'r', encoding='utf-8') as f:
user_stats = json.load(f)
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
for user_id, stats in user_stats.items():
cursor.execute(
"INSERT OR REPLACE INTO user_stats (user_id, total_draws, R_count, SR_count, SSR_count, SP_count) VALUES (?, ?, ?, ?, ?, ?)",
(user_id, stats["total_draws"], stats["R_count"], stats["SR_count"], stats["SSR_count"], stats["SP_count"])
)
# 迁移抽卡历史
for draw in stats.get("draw_history", []):
cursor.execute(
"SELECT id FROM shikigami WHERE name=? AND rarity=?",
(draw["name"], draw["rarity"])
)
shikigami_id = cursor.fetchone()
if shikigami_id:
cursor.execute(
"INSERT INTO draw_history (user_id, date, rarity, shikigami_id) VALUES (?, ?, ?, ?)",
(user_id, draw["date"], draw["rarity"], shikigami_id[0])
)
conn.commit()
except Exception as e:
logging.error(f"数据迁移失败: {e}")
def _load_shikigami_data(self) -> Dict[str, List[Dict[str, str]]]:
"""加载式神数据到数据库"""
result = {"R": [], "SR": [], "SSR": [], "SP": []}
rarity_dirs = {
"R": "r",
"SR": "sr",
"SSR": "ssr",
"SP": "sp"
}
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
# 清空现有式神数据
cursor.execute("DELETE FROM shikigami")
for rarity, dir_name in rarity_dirs.items():
dir_path = os.path.join(config.SHIKIGAMI_IMG_DIR, dir_name)
if os.path.exists(dir_path):
for file_name in os.listdir(dir_path):
if file_name.endswith(('.png', '.jpg', '.jpeg')):
name = os.path.splitext(file_name)[0]
image_path = os.path.join(dir_path, file_name)
# 插入式神数据
cursor.execute(
"INSERT INTO shikigami (name, rarity, image_path) VALUES (?, ?, ?)",
(name, rarity, image_path)
)
result[rarity].append({
"name": name,
"image_url": image_path
})
conn.commit()
return result
def get_today_date(self) -> str:
"""获取当前日期字符串"""
return datetime.datetime.now().strftime("%Y-%m-%d")
def get_current_time(self) -> str:
"""获取当前时间字符串"""
return datetime.datetime.now().strftime("%H:%M:%S")
def get_daily_draws(self) -> Dict[str, Dict[str, List[Dict[str, str]]]]:
"""获取每日抽卡记录"""
result = {}
today = self.get_today_date()
with sqlite3.connect(config.DB_FILE) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 先查询今日的抽卡记录
cursor.execute("""
SELECT date, user_id, rarity, shikigami_id, timestamp
FROM daily_draws
WHERE date = ?
ORDER BY timestamp
""", (today,))
rows = cursor.fetchall()
# 获取所有涉及的式神ID
shikigami_ids = list(set(row["shikigami_id"] for row in rows))
# 查询式神信息
shikigami_info = {}
if shikigami_ids:
placeholders = ','.join('?' * len(shikigami_ids))
cursor.execute(f"""
SELECT id, name, rarity
FROM shikigami
WHERE id IN ({placeholders})
""", shikigami_ids)
for shikigami_row in cursor.fetchall():
shikigami_info[shikigami_row["id"]] = {
"name": shikigami_row["name"],
"rarity": shikigami_row["rarity"]
}
# 构建结果
for row in rows:
date = row["date"]
user_id = row["user_id"]
shikigami_id = row["shikigami_id"]
if date not in result:
result[date] = {}
if user_id not in result[date]:
result[date][user_id] = []
# 如果找不到式神信息使用daily_draws表中的稀有度和默认名称
if shikigami_id in shikigami_info:
name = shikigami_info[shikigami_id]["name"]
rarity = shikigami_info[shikigami_id]["rarity"]
else:
name = f"式神{shikigami_id}"
rarity = row["rarity"]
result[date][user_id].append({
"rarity": rarity,
"name": name,
"timestamp": row["timestamp"]
})
return result
def save_daily_draws(self, data: Dict[str, Dict[str, List[Dict[str, str]]]]):
"""保存每日抽卡记录"""
# SQLite实现中此方法为空因为记录时直接插入数据库
pass
def get_user_stats(self) -> Dict[str, Dict[str, Any]]:
"""获取用户统计数据"""
result = {}
with sqlite3.connect(config.DB_FILE) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 获取基础统计
cursor.execute("SELECT * FROM user_stats")
user_stats = cursor.fetchall()
for stat in user_stats:
user_id = stat["user_id"]
result[user_id] = {
"total_draws": stat["total_draws"],
"R_count": stat["R_count"],
"SR_count": stat["SR_count"],
"SSR_count": stat["SSR_count"],
"SP_count": stat["SP_count"],
"draw_history": []
}
# 获取抽卡历史
cursor.execute("""
SELECT draw_history.date, draw_history.rarity, shikigami.name
FROM draw_history
JOIN shikigami ON draw_history.shikigami_id = shikigami.id
WHERE draw_history.user_id = ?
ORDER BY draw_history.date DESC
LIMIT 100
""", (user_id,))
history = cursor.fetchall()
result[user_id]["draw_history"] = [
{
"date": row["date"],
"rarity": row["rarity"],
"name": row["name"]
} for row in history
]
return result
def save_user_stats(self, data: Dict[str, Dict[str, Any]]):
"""保存用户统计数据"""
# SQLite实现中此方法为空因为统计时直接更新数据库
pass
def check_daily_limit(self, user_id: str) -> bool:
"""检查用户是否达到每日抽卡限制"""
today = self.get_today_date()
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*)
FROM daily_draws
WHERE date = ? AND user_id = ?
""", (today, user_id))
count = cursor.fetchone()[0]
return count < config.DAILY_LIMIT
def get_draws_left(self, user_id: str) -> int:
"""获取用户今日剩余抽卡次数"""
today = self.get_today_date()
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*)
FROM daily_draws
WHERE date = ? AND user_id = ?
""", (today, user_id))
count = cursor.fetchone()[0]
return max(0, config.DAILY_LIMIT - count)
def record_draw(self, user_id: str, rarity: str, shikigami_name: str) -> List[str]:
"""记录一次抽卡,返回新解锁的成就列表"""
today = self.get_today_date()
current_time = self.get_current_time()
with sqlite3.connect(config.DB_FILE) as conn:
cursor = conn.cursor()
# 获取式神ID
cursor.execute(
"SELECT id FROM shikigami WHERE name = ? AND rarity = ?",
(shikigami_name, rarity)
)
shikigami_id = cursor.fetchone()
if not shikigami_id:
logging.error(f"找不到式神: {shikigami_name} ({rarity})")
return []
shikigami_id = shikigami_id[0]
# 记录每日抽卡
cursor.execute("""
INSERT INTO daily_draws (date, user_id, rarity, shikigami_id, timestamp)
VALUES (?, ?, ?, ?, ?)
""", (today, user_id, rarity, shikigami_id, current_time))
# 更新用户统计
cursor.execute("""
INSERT OR IGNORE INTO user_stats (user_id) VALUES (?)
""", (user_id,))
cursor.execute("""
UPDATE user_stats
SET total_draws = total_draws + 1,
R_count = R_count + ?,
SR_count = SR_count + ?,
SSR_count = SSR_count + ?,
SP_count = SP_count + ?
WHERE user_id = ?
""", (
1 if rarity == "R" else 0,
1 if rarity == "SR" else 0,
1 if rarity == "SSR" else 0,
1 if rarity == "SP" else 0,
user_id
))
# 添加抽卡历史
cursor.execute("""
INSERT INTO draw_history (user_id, date, rarity, shikigami_id)
VALUES (?, ?, ?, ?)
""", (user_id, today, rarity, shikigami_id))
# 保持历史记录不超过100条
cursor.execute("""
DELETE FROM draw_history
WHERE user_id = ? AND id NOT IN (
SELECT id FROM draw_history
WHERE user_id = ?
ORDER BY date DESC
LIMIT 100
)
""", (user_id, user_id))
conn.commit()
# 更新成就进度
unlocked_achievements = self.update_achievement_progress(user_id, rarity)
return unlocked_achievements

View File

@@ -0,0 +1,307 @@
import random
from typing import Dict, Tuple, List, Optional
import os
from pathlib import Path
from .config import Config
from .data_manager import DataManager
config = Config()
data_manager = DataManager()
class GachaSystem:
def __init__(self):
self.data_manager = data_manager
def draw(self, user_id: str) -> Dict:
"""执行一次抽卡"""
# 检查抽卡限制
if not self.data_manager.check_daily_limit(user_id):
draws_left = self.data_manager.get_draws_left(user_id)
return {
"success": False,
"message": f"您今日的抽卡次数已用完,每日限制{config.DAILY_LIMIT}次,明天再来吧!"
}
# 抽取稀有度传递用户ID
rarity = self._draw_rarity(user_id)
# 从该稀有度中抽取式神
shikigami_data = self.data_manager.shikigami_data.get(rarity, [])
if not shikigami_data:
return {
"success": False,
"message": f"系统错误:{rarity}稀有度下没有可用式神"
}
# 随机选择式神
shikigami = random.choice(shikigami_data)
# 记录抽卡
unlocked_achievements = self.data_manager.record_draw(user_id, rarity, shikigami["name"])
# 剩余次数
draws_left = self.data_manager.get_draws_left(user_id)
return {
"success": True,
"rarity": rarity,
"name": shikigami["name"],
"image_url": shikigami["image_url"],
"draws_left": draws_left,
"unlocked_achievements": unlocked_achievements
}
def _draw_rarity(self, user_id: str = None) -> str:
"""按概率抽取稀有度"""
# 检查是否是特殊概率用户
if user_id and user_id in config.SPECIAL_PROBABILITY_USERS:
# 100%概率抽到SSR或SP随机选择
return random.choice(["SSR", "SP"])
# 普通用户的概率逻辑
r = random.random() * 100 # 0-100的随机数
cumulative = 0
for rarity, prob in config.RARITY_PROBABILITY.items():
cumulative += prob
if r < cumulative:
return rarity
# 默认返回R理论上不会执行到这里
return "R"
def get_user_stats(self, user_id: str) -> Dict:
"""获取用户抽卡统计"""
user_stats = self.data_manager.get_user_stats()
if user_id not in user_stats:
return {
"success": False,
"message": "您还没有抽卡记录哦!"
}
stats = user_stats[user_id]
return {
"success": True,
"total_draws": stats["total_draws"],
"R_count": stats["R_count"],
"SR_count": stats["SR_count"],
"SSR_count": stats["SSR_count"],
"SP_count": stats["SP_count"],
"recent_draws": stats["draw_history"][-5:] if stats["draw_history"] else []
}
def get_probability_text(self) -> str:
"""获取概率展示文本"""
probs = config.RARITY_PROBABILITY
return f"--- 系统概率 ---\nR: {probs['R']}% | SR: {probs['SR']}% | SSR: {probs['SSR']}% | SP: {probs['SP']}%"
def get_rank_list(self) -> List[Tuple[str, Dict[str, int]]]:
"""获取抽卡排行榜数据"""
user_stats = self.data_manager.get_user_stats()
# 过滤有SSR/SP记录的用户
ranked_users = [
(user_id, stats)
for user_id, stats in user_stats.items()
if stats.get("SSR_count", 0) > 0 or stats.get("SP_count", 0) > 0
]
# 按SSR+SP总数降序排序
ranked_users.sort(
key=lambda x: (x[1].get("SSR_count", 0) + x[1].get("SP_count", 0)),
reverse=True
)
return ranked_users
def get_daily_stats(self) -> Dict:
"""获取今日抽卡统计"""
daily_draws = self.data_manager.get_daily_draws()
today = self.data_manager.get_today_date()
if not daily_draws or today not in daily_draws:
return {
"success": False,
"message": "今日还没有人抽卡哦!"
}
today_stats = daily_draws[today]
total_stats = {
"total_users": len(today_stats),
"total_draws": 0,
"R_count": 0,
"SR_count": 0,
"SSR_count": 0,
"SP_count": 0,
"user_stats": []
}
# 统计每个用户的抽卡情况
for user_id, draws in today_stats.items():
user_stats = {
"user_id": user_id,
"total_draws": len(draws),
"R_count": sum(1 for d in draws if d["rarity"] == "R"),
"SR_count": sum(1 for d in draws if d["rarity"] == "SR"),
"SSR_count": sum(1 for d in draws if d["rarity"] == "SSR"),
"SP_count": sum(1 for d in draws if d["rarity"] == "SP")
}
# 更新总统计
total_stats["total_draws"] += user_stats["total_draws"]
total_stats["R_count"] += user_stats["R_count"]
total_stats["SR_count"] += user_stats["SR_count"]
total_stats["SSR_count"] += user_stats["SSR_count"]
total_stats["SP_count"] += user_stats["SP_count"]
# 只记录抽到SSR或SP的用户
if user_stats["SSR_count"] > 0 or user_stats["SP_count"] > 0:
total_stats["user_stats"].append(user_stats)
# 按SSR+SP数量排序用户统计
total_stats["user_stats"].sort(
key=lambda x: (x["SSR_count"] + x["SP_count"]),
reverse=True
)
# 构建稀有度统计
rarity_stats = {
"R": total_stats["R_count"],
"SR": total_stats["SR_count"],
"SSR": total_stats["SSR_count"],
"SP": total_stats["SP_count"]
}
# 构建排行榜数据
top_users = []
for user_stat in total_stats["user_stats"]:
top_users.append({
"user_id": user_stat["user_id"],
"ssr_count": user_stat["SSR_count"] + user_stat["SP_count"]
})
final_stats = {
"total_users": total_stats["total_users"],
"total_draws": total_stats["total_draws"],
"rarity_stats": rarity_stats,
"top_users": top_users
}
return {
"success": True,
"date": today,
"stats": final_stats
}
def triple_draw(self, user_id: str) -> Dict:
"""执行三连抽"""
# 检查是否有足够的抽卡次数
draws_left = self.data_manager.get_draws_left(user_id)
if draws_left < 3:
return {
"success": False,
"message": f"抽卡次数不足,您今日还剩{draws_left}次抽卡机会三连抽需要3次机会"
}
results = []
all_unlocked_achievements = []
# 执行三次抽卡
for i in range(3):
# 抽取稀有度传递用户ID
rarity = self._draw_rarity(user_id)
# 从该稀有度中抽取式神
shikigami_data = self.data_manager.shikigami_data.get(rarity, [])
if not shikigami_data:
return {
"success": False,
"message": f"系统错误:{rarity}稀有度下没有可用式神"
}
# 随机选择式神
shikigami = random.choice(shikigami_data)
# 记录抽卡
unlocked_achievements = self.data_manager.record_draw(user_id, rarity, shikigami["name"])
all_unlocked_achievements.extend(unlocked_achievements)
results.append({
"rarity": rarity,
"name": shikigami["name"],
"image_url": shikigami["image_url"]
})
# 剩余次数
draws_left = self.data_manager.get_draws_left(user_id)
return {
"success": True,
"results": results,
"draws_left": draws_left,
"unlocked_achievements": list(set(all_unlocked_achievements)) # 去重
}
def get_user_achievements(self, user_id: str) -> Dict:
"""获取用户成就信息"""
achievement_data = self.data_manager.get_user_achievements(user_id)
if not achievement_data["unlocked"] and all(v == 0 for v in achievement_data["progress"].values()):
return {
"success": False,
"message": "您还没有任何成就进度哦!快去抽卡吧!"
}
return {
"success": True,
"achievements": achievement_data["unlocked"],
"progress": achievement_data["progress"]
}
def get_daily_detailed_records(self, date: Optional[str] = None) -> Dict:
"""获取每日详细抽卡记录"""
if not date:
date = self.data_manager.get_today_date()
daily_draws = self.data_manager.get_daily_draws()
if not daily_draws or date not in daily_draws:
return {
"success": False,
"message": f"{date} 没有抽卡记录"
}
records = []
for user_id, draws in daily_draws[date].items():
for draw in draws:
# 检查这次抽卡是否解锁了成就
unlocked_achievements = []
draw_time = draw.get("timestamp", "未知时间")
# 获取用户成就信息
achievement_data = self.data_manager.get_user_achievements(user_id)
if achievement_data["unlocked"]:
# 检查是否有在抽卡时间之后解锁的成就
for achievement_id, achievement_info in achievement_data["unlocked"].items():
if achievement_info["unlocked_date"] == f"{date} {draw_time}":
unlocked_achievements.append(achievement_id)
records.append({
"user_id": user_id,
"draw_time": draw_time,
"shikigami_name": draw["name"],
"rarity": draw["rarity"],
"unlocked_achievements": unlocked_achievements
})
# 按时间排序
records.sort(key=lambda x: x["draw_time"])
return {
"success": True,
"date": date,
"records": records,
"total_count": len(records)
}

View File

@@ -0,0 +1,799 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>阴阳师抽卡系统 - 管理后台</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.10.0/font/bootstrap-icons.css">
<style>
.stat-card {
border-radius: 15px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.3s;
}
.stat-card:hover {
transform: translateY(-5px);
}
.rarity-r { color: #6c757d; }
.rarity-sr { color: #0d6efd; }
.rarity-ssr { color: #ffc107; }
.rarity-sp { color: #dc3545; }
.achievement-card {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
border-radius: 10px;
padding: 15px;
margin-bottom: 15px;
}
.progress {
height: 25px;
}
.navbar-brand {
font-weight: bold;
}
.table th {
border-top: none;
}
</style>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary">
<div class="container">
<a class="navbar-brand" href="#">
<i class="bi bi-dice-5"></i> 阴阳师抽卡系统 - 管理后台
</a>
<div class="ms-auto">
<span class="navbar-text">
<i class="bi bi-shield-lock"></i> 管理员访问
</span>
</div>
</div>
</nav>
<div class="container mt-4">
<!-- 统计概览 -->
<div class="row mb-4">
<div class="col-md-3">
<div class="card stat-card bg-light">
<div class="card-body text-center">
<h5 class="card-title"><i class="bi bi-people"></i> 参与人数</h5>
<h2 class="text-primary" id="totalUsers">-</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card stat-card bg-light">
<div class="card-body text-center">
<h5 class="card-title"><i class="bi bi-dice-6"></i> 总抽卡次数</h5>
<h2 class="text-success" id="totalDraws">-</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card stat-card bg-light">
<div class="card-body text-center">
<h5 class="card-title"><i class="bi bi-trophy"></i> SSR/SP总数</h5>
<h2 class="text-warning" id="totalSSRSP">-</h2>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card stat-card bg-light">
<div class="card-body text-center">
<h5 class="card-title"><i class="bi bi-percent"></i> SSR/SP概率</h5>
<h2 class="text-danger" id="ssrSpRate">-</h2>
</div>
</div>
</div>
</div>
<!-- 稀有度分布 -->
<div class="row mb-4">
<div class="col-md-12">
<div class="card">
<div class="card-header">
<h5><i class="bi bi-pie-chart"></i> 稀有度分布</h5>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-3">
<div class="text-center">
<h3 class="rarity-r">R</h3>
<div class="progress mb-2">
<div class="progress-bar bg-secondary" id="rRate" style="width: 0%"></div>
</div>
<h4 id="rCount">-</h4>
</div>
</div>
<div class="col-md-3">
<div class="text-center">
<h3 class="rarity-sr">SR</h3>
<div class="progress mb-2">
<div class="progress-bar bg-primary" id="srRate" style="width: 0%"></div>
</div>
<h4 id="srCount">-</h4>
</div>
</div>
<div class="col-md-3">
<div class="text-center">
<h3 class="rarity-ssr">SSR</h3>
<div class="progress mb-2">
<div class="progress-bar bg-warning" id="ssrRate" style="width: 0%"></div>
</div>
<h4 id="ssrCount">-</h4>
</div>
</div>
<div class="col-md-3">
<div class="text-center">
<h3 class="rarity-sp">SP</h3>
<div class="progress mb-2">
<div class="progress-bar bg-danger" id="spRate" style="width: 0%"></div>
</div>
<h4 id="spCount">-</h4>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 排行榜 -->
<div class="row mb-4">
<div class="col-md-12">
<div class="card">
<div class="card-header d-flex justify-content-between align-items-center">
<h5><i class="bi bi-trophy"></i> SSR/SP排行榜</h5>
<button class="btn btn-sm btn-outline-primary" onclick="refreshRankList()">
<i class="bi bi-arrow-clockwise"></i> 刷新
</button>
</div>
<div class="card-body">
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>排名</th>
<th>用户ID</th>
<th>总抽卡次数</th>
<th class="rarity-r">R</th>
<th class="rarity-sr">SR</th>
<th class="rarity-ssr">SSR</th>
<th class="rarity-sp">SP</th>
<th>SSR/SP总数</th>
<th>操作</th>
</tr>
</thead>
<tbody id="rankListBody">
<tr>
<td colspan="9" class="text-center">加载中...</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
<!-- 用户查询 -->
<div class="row mb-4">
<div class="col-md-12">
<div class="card">
<div class="card-header">
<h5><i class="bi bi-person-search"></i> 用户查询</h5>
</div>
<div class="card-body">
<div class="input-group mb-3">
<input type="text" class="form-control" id="userIdInput" placeholder="输入用户ID">
<button class="btn btn-primary" onclick="queryUser()">
<i class="bi bi-search"></i> 查询
</button>
</div>
<div id="userStatsResult" style="display: none;">
<div class="row">
<div class="col-md-6">
<h6>抽卡统计</h6>
<table class="table table-sm">
<tr>
<td>总抽卡次数</td>
<td id="userTotalDraws">-</td>
</tr>
<tr>
<td class="rarity-r">R卡数量</td>
<td id="userRCount">-</td>
</tr>
<tr>
<td class="rarity-sr">SR卡数量</td>
<td id="userSRCount">-</td>
</tr>
<tr>
<td class="rarity-ssr">SSR卡数量</td>
<td id="userSSRCount">-</td>
</tr>
<tr>
<td class="rarity-sp">SP卡数量</td>
<td id="userSPCount">-</td>
</tr>
</table>
</div>
<div class="col-md-6">
<h6>最近抽卡记录</h6>
<div id="recentDraws">
加载中...
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 成就查询 -->
<div class="row mb-4">
<div class="col-md-12">
<div class="card">
<div class="card-header">
<h5><i class="bi bi-award"></i> 成就查询</h5>
</div>
<div class="card-body">
<div class="input-group mb-3">
<input type="text" class="form-control" id="achievementUserIdInput" placeholder="输入用户ID">
<button class="btn btn-primary" onclick="queryAchievements()">
<i class="bi bi-search"></i> 查询
</button>
</div>
<div id="achievementResult" style="display: none;">
<div class="row">
<div class="col-md-6">
<h6>已解锁成就</h6>
<div id="unlockedAchievements">
加载中...
</div>
</div>
<div class="col-md-6">
<h6>成就进度</h6>
<div class="achievement-card">
<div class="d-flex justify-content-between">
<span>连续抽卡天数</span>
<span id="consecutiveDays">-</span>
</div>
<div class="progress mt-2">
<div class="progress-bar" id="consecutiveDaysProgress" style="width: 0%"></div>
</div>
</div>
<div class="achievement-card">
<div class="d-flex justify-content-between">
<span>无SSR/SP连击</span>
<span id="noSsrStreak">-</span>
</div>
<div class="progress mt-2">
<div class="progress-bar bg-danger" id="noSsrStreakProgress" style="width: 0%"></div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 每日详细抽卡记录 -->
<div class="row mb-4">
<div class="col-md-12">
<div class="card">
<div class="card-header d-flex justify-content-between align-items-center">
<h5><i class="bi bi-table"></i> 今日抽卡详细记录</h5>
<div>
<input type="date" class="form-control form-control-sm d-inline-block me-2" id="recordDateInput" style="width: auto;">
<button class="btn btn-sm btn-outline-primary" onclick="loadDailyRecords()">
<i class="bi bi-arrow-clockwise"></i> 刷新
</button>
</div>
</div>
<div class="card-body">
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>抽卡时间</th>
<th>用户ID</th>
<th>式神名称</th>
<th>稀有度</th>
<th>成就解锁</th>
</tr>
</thead>
<tbody id="dailyRecordsBody">
<tr>
<td colspan="5" class="text-center">加载中...</td>
</tr>
</tbody>
</table>
</div>
<div class="d-flex justify-content-between align-items-center mt-3">
<div>
<span>总记录数: <strong id="totalRecordsCount">0</strong></span>
</div>
<div>
<nav aria-label="Page navigation">
<ul class="pagination pagination-sm" id="recordsPagination">
<!-- 分页控件将通过JS动态生成 -->
</ul>
</nav>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<footer class="bg-light text-center py-3 mt-5">
<div class="container">
<p class="mb-0">阴阳师抽卡系统 - 管理后台 &copy; 2024</p>
</div>
</footer>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
<script>
// API配置
const API_BASE = '/onmyoji_gacha/api';
let ADMIN_TOKEN = localStorage.getItem('adminToken');
// 令牌重置函数
function resetToken() {
localStorage.removeItem('adminToken');
const newToken = prompt('请输入管理员令牌:');
if (newToken) {
ADMIN_TOKEN = newToken;
localStorage.setItem('adminToken', ADMIN_TOKEN);
// 更新请求头
headers.Authorization = `Bearer ${ADMIN_TOKEN}`;
console.log('令牌已重置:', ADMIN_TOKEN);
// 重新加载数据
loadDailyStats();
loadRankList();
loadDailyRecords();
} else {
alert('需要管理员令牌才能访问');
}
}
// 如果没有保存的令牌,提示输入
if (!ADMIN_TOKEN) {
ADMIN_TOKEN = prompt('请输入管理员令牌:');
if (ADMIN_TOKEN) {
localStorage.setItem('adminToken', ADMIN_TOKEN);
} else {
alert('需要管理员令牌才能访问');
window.location.reload();
}
}
console.log('使用的管理员令牌:', ADMIN_TOKEN);
// API请求头
const headers = {
'Authorization': `Bearer ${ADMIN_TOKEN}`,
'Content-Type': 'application/json'
};
console.log('请求头:', headers);
// 添加令牌重置按钮到导航栏
window.addEventListener('DOMContentLoaded', function() {
const navbar = document.querySelector('.navbar .ms-auto');
const resetButton = document.createElement('button');
resetButton.className = 'btn btn-outline-light btn-sm ms-2';
resetButton.innerHTML = '<i class="bi bi-key"></i> 重置令牌';
resetButton.onclick = resetToken;
navbar.appendChild(resetButton);
});
// 页面加载时获取数据
document.addEventListener('DOMContentLoaded', function() {
loadDailyStats();
loadRankList();
// 设置今天的日期为默认值
const today = new Date().toISOString().split('T')[0];
document.getElementById('recordDateInput').value = today;
loadDailyRecords();
});
// 加载今日统计
async function loadDailyStats() {
try {
console.log('正在请求每日统计...');
const response = await fetch(`${API_BASE}/stats/daily`, { headers });
console.log('响应状态:', response.status);
console.log('响应头:', response.headers);
if (!response.ok) {
const errorText = await response.text();
console.error('API 错误响应:', errorText);
throw new Error(`HTTP ${response.status}: ${errorText}`);
}
const data = await response.json();
console.log('每日统计数据:', data);
if (data.success) {
const stats = data.stats;
document.getElementById('totalUsers').textContent = stats.total_users;
document.getElementById('totalDraws').textContent = stats.total_draws;
const ssrSpTotal = stats.rarity_stats.SSR + stats.rarity_stats.SP;
document.getElementById('totalSSRSP').textContent = ssrSpTotal;
const ssrSpRate = ((ssrSpTotal / stats.total_draws) * 100).toFixed(2);
document.getElementById('ssrSpRate').textContent = ssrSpRate + '%';
// 更新稀有度分布
updateRarityDistribution(stats.rarity_stats, stats.total_draws);
} else {
console.error('API 返回失败:', data);
}
} catch (error) {
console.error('加载统计数据失败:', error);
// 显示错误信息给用户
document.getElementById('totalUsers').textContent = '错误';
document.getElementById('totalDraws').textContent = '错误';
document.getElementById('totalSSRSP').textContent = '错误';
document.getElementById('ssrSpRate').textContent = '错误';
}
}
// 更新稀有度分布
function updateRarityDistribution(rarityStats, totalDraws) {
const rarities = ['R', 'SR', 'SSR', 'SP'];
rarities.forEach(rarity => {
const count = rarityStats[rarity];
const rate = totalDraws > 0 ? (count / totalDraws * 100).toFixed(1) : 0;
document.getElementById(`${rarity.toLowerCase()}Count`).textContent = count;
document.getElementById(`${rarity.toLowerCase()}Rate`).style.width = rate + '%';
document.getElementById(`${rarity.toLowerCase()}Rate`).textContent = rate + '%';
});
}
// 加载排行榜
async function loadRankList() {
try {
console.log('正在请求排行榜数据...');
const response = await fetch(`${API_BASE}/stats/rank`, { headers });
console.log('排行榜响应状态:', response.status);
if (!response.ok) {
const errorText = await response.text();
console.error('排行榜 API 错误响应:', errorText);
throw new Error(`HTTP ${response.status}: ${errorText}`);
}
const data = await response.json();
console.log('排行榜数据:', data);
if (data.success) {
const tbody = document.getElementById('rankListBody');
tbody.innerHTML = '';
data.data.forEach((user, index) => {
const row = document.createElement('tr');
row.innerHTML = `
<td>${index + 1}</td>
<td>${user.user_id}</td>
<td>${user.total_draws}</td>
<td class="rarity-r">${user.R_count}</td>
<td class="rarity-sr">${user.SR_count}</td>
<td class="rarity-ssr">${user.SSR_count}</td>
<td class="rarity-sp">${user.SP_count}</td>
<td><strong>${user.ssr_sp_total}</strong></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="viewUserDetails('${user.user_id}')">
<i class="bi bi-eye"></i> 详情
</button>
</td>
`;
tbody.appendChild(row);
});
if (data.data.length === 0) {
tbody.innerHTML = '<tr><td colspan="9" class="text-center">暂无数据</td></tr>';
}
} else {
console.error('排行榜 API 返回失败:', data);
document.getElementById('rankListBody').innerHTML = '<tr><td colspan="9" class="text-center">数据加载失败</td></tr>';
}
} catch (error) {
console.error('加载排行榜失败:', error);
document.getElementById('rankListBody').innerHTML = '<tr><td colspan="9" class="text-center">加载失败,请检查令牌</td></tr>';
}
}
// 刷新排行榜
function refreshRankList() {
loadRankList();
}
// 查询用户
async function queryUser() {
const userId = document.getElementById('userIdInput').value.trim();
if (!userId) {
alert('请输入用户ID');
return;
}
try {
const response = await fetch(`${API_BASE}/stats/user/${userId}`, { headers });
const data = await response.json();
if (data.success) {
document.getElementById('userStatsResult').style.display = 'block';
document.getElementById('userTotalDraws').textContent = data.total_draws;
document.getElementById('userRCount').textContent = data.R_count;
document.getElementById('userSRCount').textContent = data.SR_count;
document.getElementById('userSSRCount').textContent = data.SSR_count;
document.getElementById('userSPCount').textContent = data.SP_count;
// 显示最近抽卡记录
const recentDrawsDiv = document.getElementById('recentDraws');
if (data.recent_draws && data.recent_draws.length > 0) {
recentDrawsDiv.innerHTML = data.recent_draws.map(draw =>
`<div class="mb-1">
<span class="badge bg-secondary">${draw.date}</span>
<span class="badge rarity-${draw.rarity.toLowerCase()}">${draw.rarity}</span>
${draw.name}
</div>`
).join('');
} else {
recentDrawsDiv.innerHTML = '<p class="text-muted">暂无抽卡记录</p>';
}
} else {
alert('未找到用户数据');
document.getElementById('userStatsResult').style.display = 'none';
}
} catch (error) {
console.error('查询用户失败:', error);
alert('查询失败');
}
}
// 查看用户详情
function viewUserDetails(userId) {
document.getElementById('userIdInput').value = userId;
queryUser();
document.getElementById('achievementUserIdInput').value = userId;
queryAchievements();
// 滚动到用户查询区域
document.querySelector('.card:has(#userIdInput)').scrollIntoView({ behavior: 'smooth' });
}
// 查询成就
async function queryAchievements() {
const userId = document.getElementById('achievementUserIdInput').value.trim();
if (!userId) {
alert('请输入用户ID');
return;
}
try {
const response = await fetch(`${API_BASE}/achievements/${userId}`, { headers });
const data = await response.json();
if (data.success) {
document.getElementById('achievementResult').style.display = 'block';
// 显示已解锁成就
const unlockedDiv = document.getElementById('unlockedAchievements');
const achievements = Object.entries(data.achievements);
if (achievements.length > 0) {
unlockedDiv.innerHTML = achievements.map(([id, info]) =>
`<div class="achievement-card">
<div class="d-flex justify-content-between align-items-center">
<span>${id}</span>
<span class="badge bg-success">已解锁</span>
</div>
<small class="text-muted">${info.unlocked_date}</small>
</div>`
).join('');
} else {
unlockedDiv.innerHTML = '<p class="text-muted">暂无已解锁成就</p>';
}
// 显示成就进度
const progress = data.progress;
document.getElementById('consecutiveDays').textContent = progress.consecutive_days + ' 天';
document.getElementById('noSsrStreak').textContent = progress.no_ssr_streak + ' 次';
// 更新进度条
const consecutiveProgress = Math.min((progress.consecutive_days / 150) * 100, 100);
document.getElementById('consecutiveDaysProgress').style.width = consecutiveProgress + '%';
const noSsrProgress = Math.min((progress.no_ssr_streak / 180) * 100, 100);
document.getElementById('noSsrStreakProgress').style.width = noSsrProgress + '%';
} else {
alert('未找到用户成就数据');
document.getElementById('achievementResult').style.display = 'none';
}
} catch (error) {
console.error('查询成就失败:', error);
alert('查询失败');
}
}
// 每页记录数
const RECORDS_PER_PAGE = 20;
let currentRecords = [];
let currentPage = 1;
// 加载每日详细抽卡记录
async function loadDailyRecords() {
const date = document.getElementById('recordDateInput').value;
if (!date) {
alert('请选择日期');
return;
}
try {
console.log('正在请求每日详细抽卡记录...', date);
const response = await fetch(`${API_BASE}/records/daily?date=${date}`, { headers });
console.log('每日详细记录响应状态:', response.status);
if (!response.ok) {
const errorText = await response.text();
console.error('每日详细记录 API 错误响应:', errorText);
throw new Error(`HTTP ${response.status}: ${errorText}`);
}
const data = await response.json();
console.log('每日详细记录数据:', data);
if (data.success) {
currentRecords = data.records;
currentPage = 1;
document.getElementById('totalRecordsCount').textContent = data.total_count;
displayRecords();
} else {
console.error('每日详细记录 API 返回失败:', data);
document.getElementById('dailyRecordsBody').innerHTML = '<tr><td colspan="5" class="text-center">暂无数据</td></tr>';
document.getElementById('totalRecordsCount').textContent = '0';
document.getElementById('recordsPagination').innerHTML = '';
}
} catch (error) {
console.error('加载每日详细记录失败:', error);
document.getElementById('dailyRecordsBody').innerHTML = '<tr><td colspan="5" class="text-center">加载失败,请检查令牌</td></tr>';
document.getElementById('totalRecordsCount').textContent = '0';
document.getElementById('recordsPagination').innerHTML = '';
}
}
// 显示记录(支持分页)
function displayRecords() {
const tbody = document.getElementById('dailyRecordsBody');
tbody.innerHTML = '';
// 计算分页
const startIndex = (currentPage - 1) * RECORDS_PER_PAGE;
const endIndex = Math.min(startIndex + RECORDS_PER_PAGE, currentRecords.length);
const pageRecords = currentRecords.slice(startIndex, endIndex);
if (pageRecords.length === 0) {
tbody.innerHTML = '<tr><td colspan="5" class="text-center">暂无数据</td></tr>';
document.getElementById('recordsPagination').innerHTML = '';
return;
}
// 显示当前页的记录
pageRecords.forEach(record => {
const row = document.createElement('tr');
// 格式化成就解锁信息
let achievementsHtml = '';
if (record.unlocked_achievements && record.unlocked_achievements.length > 0) {
achievementsHtml = record.unlocked_achievements.map(achievement =>
`<span class="badge bg-success me-1">${achievement}</span>`
).join('');
} else {
achievementsHtml = '<span class="text-muted">无</span>';
}
row.innerHTML = `
<td>${record.draw_time}</td>
<td>${record.user_id}</td>
<td>${record.shikigami_name}</td>
<td><span class="badge rarity-${record.rarity.toLowerCase()}">${record.rarity}</span></td>
<td>${achievementsHtml}</td>
`;
tbody.appendChild(row);
});
// 更新分页控件
updatePagination();
}
// 更新分页控件
function updatePagination() {
const totalPages = Math.ceil(currentRecords.length / RECORDS_PER_PAGE);
const pagination = document.getElementById('recordsPagination');
pagination.innerHTML = '';
if (totalPages <= 1) {
return;
}
// 上一页按钮
const prevLi = document.createElement('li');
prevLi.className = `page-item ${currentPage === 1 ? 'disabled' : ''}`;
prevLi.innerHTML = `<a class="page-link" href="#" onclick="changePage(${currentPage - 1})">上一页</a>`;
pagination.appendChild(prevLi);
// 页码按钮
const maxVisiblePages = 5;
let startPage = Math.max(1, currentPage - Math.floor(maxVisiblePages / 2));
let endPage = Math.min(totalPages, startPage + maxVisiblePages - 1);
if (endPage - startPage + 1 < maxVisiblePages) {
startPage = Math.max(1, endPage - maxVisiblePages + 1);
}
if (startPage > 1) {
const firstLi = document.createElement('li');
firstLi.className = 'page-item';
firstLi.innerHTML = `<a class="page-link" href="#" onclick="changePage(1)">1</a>`;
pagination.appendChild(firstLi);
if (startPage > 2) {
const ellipsisLi = document.createElement('li');
ellipsisLi.className = 'page-item disabled';
ellipsisLi.innerHTML = `<a class="page-link" href="#">...</a>`;
pagination.appendChild(ellipsisLi);
}
}
for (let i = startPage; i <= endPage; i++) {
const pageLi = document.createElement('li');
pageLi.className = `page-item ${i === currentPage ? 'active' : ''}`;
pageLi.innerHTML = `<a class="page-link" href="#" onclick="changePage(${i})">${i}</a>`;
pagination.appendChild(pageLi);
}
if (endPage < totalPages) {
if (endPage < totalPages - 1) {
const ellipsisLi = document.createElement('li');
ellipsisLi.className = 'page-item disabled';
ellipsisLi.innerHTML = `<a class="page-link" href="#">...</a>`;
pagination.appendChild(ellipsisLi);
}
const lastLi = document.createElement('li');
lastLi.className = 'page-item';
lastLi.innerHTML = `<a class="page-link" href="#" onclick="changePage(${totalPages})">${totalPages}</a>`;
pagination.appendChild(lastLi);
}
// 下一页按钮
const nextLi = document.createElement('li');
nextLi.className = `page-item ${currentPage === totalPages ? 'disabled' : ''}`;
nextLi.innerHTML = `<a class="page-link" href="#" onclick="changePage(${currentPage + 1})">下一页</a>`;
pagination.appendChild(nextLi);
}
// 切换页面
function changePage(page) {
const totalPages = Math.ceil(currentRecords.length / RECORDS_PER_PAGE);
if (page < 1 || page > totalPages) {
return;
}
currentPage = page;
displayRecords();
}
</script>
</body>
</html>

View File

@@ -0,0 +1,12 @@
import os
from typing import Union, Optional
from pathlib import Path
def get_image_path(file_path: str) -> str:
"""获取图片的绝对路径"""
return os.path.abspath(file_path)
def format_user_mention(user_id: str, user_name: Optional[str] = None) -> str:
"""格式化用户@信息"""
display_name = user_name if user_name else f"用户{user_id}"
return f"@{display_name}"

View File

@@ -0,0 +1,202 @@
"""
onmyoji_gacha 插件的 Web API 接口
使用 NoneBot 内置的 FastAPI 适配器提供管理员后台接口
"""
import os
from typing import Dict, List, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Header, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from nonebot import get_driver
from .config import Config
from .gacha import GachaSystem
# 创建配置实例
config = Config()
gacha_system = GachaSystem()
# 创建 FastAPI 路由
router = APIRouter(prefix="/onmyoji_gacha", tags=["onmyoji_gacha"])
# 设置模板目录
templates = Jinja2Templates(directory="danding_bot/plugins/onmyoji_gacha/templates")
# 依赖:验证管理员权限
async def verify_admin_token(authorization: Optional[str] = Header(None)):
"""验证管理员权限"""
print(f"🔐 验证管理员令牌: {authorization}")
if not authorization:
print("❌ 缺少认证令牌")
raise HTTPException(status_code=401, detail="缺少认证令牌")
token = authorization.replace("Bearer ", "")
print(f"🔑 提取的令牌: {token}")
print(f"🎯 期望的令牌: {config.WEB_ADMIN_TOKEN}")
if token != config.WEB_ADMIN_TOKEN:
print("❌ 令牌验证失败")
raise HTTPException(status_code=403, detail="无效的认证令牌")
print("✅ 令牌验证成功")
return True
# API 响应模型
class DailyStatsResponse(BaseModel):
success: bool
date: str
stats: Dict[str, Any]
class UserStatsResponse(BaseModel):
success: bool
user_id: str
total_draws: int
R_count: int
SR_count: int
SSR_count: int
SP_count: int
recent_draws: List[Dict[str, str]]
class RankListResponse(BaseModel):
success: bool
data: List[Dict[str, Any]]
class AchievementResponse(BaseModel):
success: bool
user_id: str
achievements: Dict[str, Any]
progress: Dict[str, Any]
class DailyDetailedRecordsResponse(BaseModel):
success: bool
date: str
records: List[Dict[str, Any]]
total_count: int
# 管理后台页面
@router.get("/admin", response_class=HTMLResponse)
async def admin_page(request: Request):
"""管理后台页面"""
return templates.TemplateResponse("admin.html", {"request": request})
# API 端点
@router.get("/api/stats/daily", response_model=DailyStatsResponse, dependencies=[Depends(verify_admin_token)])
async def get_daily_stats():
"""获取今日抽卡统计"""
result = gacha_system.get_daily_stats()
if not result["success"]:
return result
return {
"success": True,
"date": result["date"],
"stats": result["stats"]
}
@router.get("/api/stats/user/{user_id}", response_model=UserStatsResponse, dependencies=[Depends(verify_admin_token)])
async def get_user_stats(user_id: str):
"""获取用户抽卡统计"""
result = gacha_system.get_user_stats(user_id)
if not result["success"]:
return {
"success": False,
"user_id": user_id,
"total_draws": 0,
"R_count": 0,
"SR_count": 0,
"SSR_count": 0,
"SP_count": 0,
"recent_draws": []
}
return {
"success": True,
"user_id": user_id,
"total_draws": result["total_draws"],
"R_count": result["R_count"],
"SR_count": result["SR_count"],
"SSR_count": result["SSR_count"],
"SP_count": result["SP_count"],
"recent_draws": result["recent_draws"]
}
@router.get("/api/stats/rank", response_model=RankListResponse, dependencies=[Depends(verify_admin_token)])
async def get_rank_list():
"""获取抽卡排行榜"""
rank_data = gacha_system.get_rank_list()
# 转换数据格式
formatted_data = []
for user_id, stats in rank_data:
formatted_data.append({
"user_id": user_id,
"total_draws": stats["total_draws"],
"R_count": stats["R_count"],
"SR_count": stats["SR_count"],
"SSR_count": stats["SSR_count"],
"SP_count": stats["SP_count"],
"ssr_sp_total": stats["SSR_count"] + stats["SP_count"]
})
return {
"success": True,
"data": formatted_data
}
@router.get("/api/achievements/{user_id}", response_model=AchievementResponse, dependencies=[Depends(verify_admin_token)])
async def get_user_achievements(user_id: str):
"""获取用户成就信息"""
result = gacha_system.get_user_achievements(user_id)
if not result["success"]:
return {
"success": False,
"user_id": user_id,
"achievements": {},
"progress": {}
}
return {
"success": True,
"user_id": user_id,
"achievements": result["achievements"],
"progress": result["progress"]
}
@router.get("/api/records/daily", response_model=DailyDetailedRecordsResponse, dependencies=[Depends(verify_admin_token)])
async def get_daily_detailed_records(date: Optional[str] = None):
"""获取每日详细抽卡记录"""
result = gacha_system.get_daily_detailed_records(date)
if not result["success"]:
return {
"success": False,
"date": date or gacha_system.data_manager.get_today_date(),
"records": [],
"total_count": 0
}
return {
"success": True,
"date": result["date"],
"records": result["records"],
"total_count": result["total_count"]
}
# 注册路由到 NoneBot 的 FastAPI 应用
# 将在插件加载时由 __init__.py 调用
def register_web_routes():
"""注册 Web 路由到 NoneBot 的 FastAPI 应用"""
try:
from nonebot import get_driver
driver = get_driver()
# 获取 FastAPI 应用实例
app = driver.server_app
# 注册路由
app.include_router(router)
print("✅ onmyoji_gacha Web API 路由注册成功")
return True
except Exception as e:
print(f"❌ 注册 Web 路由时出错: {e}")
return False