Files
DanDingNoneBot/danding_bot/plugins/onmyoji_gacha/data_manager.py

292 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阴阳师抽卡插件 - xapi 数据管理模块。
本模块只负责调用 xapi /bot/gacha 运行时 API。抽卡概率、奖励发放和 QQ 消息编排
仍由 nonebot 插件本地负责。
"""
from __future__ import annotations
import asyncio
import datetime
import logging
from typing import Any, Dict, List, Optional
import aiohttp
from .config import Config
logger = logging.getLogger(__name__)
config = Config()
class DataManager:
"""抽卡数据管理器,封装 /bot/gacha HTTP 调用。"""
def __init__(self):
self.shikigami_data: Dict[str, List[Dict[str, Any]]] = {"R": [], "SR": [], "SSR": [], "SP": []}
def _url(self, path: str) -> str:
"""拼接 /bot/gacha 端点地址。"""
return f"{config.GACHA_API_HOST}/{path.lstrip('/')}"
def _auth(self) -> Dict[str, str]:
"""生成 xapi Bot 鉴权参数。"""
return {
"user": config.BOT_USER_ID,
"token": config.BOT_TOKEN,
}
async def _request(
self,
method: str,
path: str,
*,
payload: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
"""调用 xapi /bot/gacha并只向上层暴露 data。"""
request_url = self._url(path)
timeout = aiohttp.ClientTimeout(total=10)
try:
async with aiohttp.ClientSession() as session:
if method == "GET":
request_params = {**self._auth(), **(params or {})}
async with session.get(request_url, params=request_params, timeout=timeout) as resp:
return await self._parse_response(resp, path)
request_payload = {**self._auth(), **(payload or {})}
async with session.post(request_url, json=request_payload, timeout=timeout) as resp:
return await self._parse_response(resp, path)
except aiohttp.ClientError as exc:
logger.error("gacha api request failed path=%s error=%s", path, exc)
return None
except asyncio.TimeoutError as exc:
logger.error("gacha api request timeout path=%s error=%s", path, exc)
return None
async def _parse_response(self, resp: aiohttp.ClientResponse, path: str) -> Optional[Dict[str, Any]]:
"""解析 xapi 统一响应,失败时返回 None 维持旧调用方失败语义。"""
if resp.status != 200:
logger.error("gacha api bad status path=%s status=%s", path, resp.status)
return None
body = await resp.json()
if body.get("code") != 200:
logger.error("gacha api fail path=%s code=%s message=%s", path, body.get("code"), body.get("message"))
return None
data = body.get("data")
return data if isinstance(data, dict) else None
async def refresh_shikigami_data(self) -> Dict[str, List[Dict[str, Any]]]:
"""从 xapi 拉取式神基础数据并按稀有度缓存。"""
data = await self._request("GET", "shikigami")
items = data.get("items", []) if data else []
grouped: Dict[str, List[Dict[str, Any]]] = {"R": [], "SR": [], "SSR": [], "SP": []}
for item in items:
rarity = item.get("rarity")
if rarity not in grouped:
continue
image_path = item.get("image_path") or item.get("image_url") or ""
grouped[rarity].append(
{
"id": item.get("id"),
"name": item.get("name"),
"rarity": rarity,
"image_path": image_path,
"image_url": image_path,
}
)
self.shikigami_data = grouped
return self.shikigami_data
async def ensure_shikigami_data(self) -> Dict[str, List[Dict[str, Any]]]:
"""确保式神缓存已加载。"""
if not any(self.shikigami_data.values()):
await self.refresh_shikigami_data()
return self.shikigami_data
def get_today_date(self) -> str:
"""获取当前日期字符串。"""
return datetime.datetime.now().strftime("%Y-%m-%d")
def get_current_time(self) -> str:
"""获取当前时间字符串。"""
return datetime.datetime.now().strftime("%H:%M:%S")
def _find_shikigami(self, rarity: str, shikigami_name: str) -> Optional[Dict[str, Any]]:
"""从本地缓存查找 xapi 托管式神。"""
for item in self.shikigami_data.get(rarity, []):
if item.get("name") == shikigami_name:
return item
return None
async def get_draws_left(self, user_id: str) -> int:
"""获取用户今日剩余抽卡次数。"""
data = await self._request("GET", "draws-left", params={"user_id": user_id})
if data is None:
return 0
return int(data.get("draws_left", 0) or 0)
async def check_daily_limit(self, user_id: str) -> bool:
"""检查用户是否还有抽卡次数。"""
return await self.get_draws_left(user_id) > 0
async def record_draw_result(self, user_id: str, rarity: str, shikigami: Dict[str, Any]) -> Dict[str, Any]:
"""写入一次抽卡并返回 xapi 原始业务结果。"""
data = await self._request(
"POST",
"draw",
payload={
"user_id": user_id,
"shikigami_id": int(shikigami["id"]),
"rarity": rarity,
"name": shikigami["name"],
},
)
if data is None:
return {"success": False, "message": "抽卡记录写入失败"}
return data
async def record_triple_draw_result(self, user_id: str, draws: List[Dict[str, Any]]) -> Dict[str, Any]:
"""写入三连抽并返回 xapi 原始业务结果。"""
payload_draws = [
{
"shikigami_id": int(item["id"]),
"rarity": item["rarity"],
"name": item["name"],
}
for item in draws
]
data = await self._request("POST", "draw/triple", payload={"user_id": user_id, "draws": payload_draws})
if data is None:
return {"success": False, "message": "三连抽记录写入失败"}
return data
async def record_draw(self, user_id: str, rarity: str, shikigami_name: str) -> List[str]:
"""记录一次抽卡,返回新解锁的成就列表。"""
await self.ensure_shikigami_data()
shikigami = self._find_shikigami(rarity, shikigami_name)
if not shikigami:
logger.error("找不到式神: %s (%s)", shikigami_name, rarity)
return []
result = await self.record_draw_result(user_id, rarity, shikigami)
if not result.get("success"):
logger.error("抽卡记录写入失败 user_id=%s message=%s", user_id, result.get("message"))
return []
return result.get("unlocked_achievements", [])
async def record_sign_in(self, user_id: str, points_awarded: int) -> bool:
"""记录每日签到,重复签到返回 False。"""
data = await self._request(
"POST",
"sign-in",
payload={"user_id": user_id, "points_awarded": points_awarded},
)
if data is None:
return False
return bool(data.get("success")) and not bool(data.get("signed_already"))
async def get_user_stats(self, user_id: str) -> Dict[str, Any]:
"""获取用户抽卡统计。"""
data = await self._request("GET", "user-stats", params={"user_id": user_id})
return data or {"success": False, "message": "您还没有抽卡记录哦!"}
async def get_daily_stats(self, date: Optional[str] = None) -> Dict[str, Any]:
"""获取指定日期抽卡统计。"""
params = {"date": date} if date else {}
data = await self._request("GET", "daily-stats", params=params)
return data or {"success": False, "message": "今日还没有人抽卡哦!"}
async def get_rank(self, limit: int = 10) -> List[Dict[str, Any]]:
"""获取抽卡排行榜。"""
data = await self._request("GET", "rank", params={"limit": max(1, min(100, limit))})
if data is None:
return []
items = data.get("items", [])
return items if isinstance(items, list) else []
async def get_user_achievements(self, user_id: str) -> Dict[str, Any]:
"""获取用户成就信息。"""
data = await self._request("GET", f"achievements/{user_id}")
if data is None:
return {
"unlocked": {},
"progress": {
"consecutive_days": 0,
"no_ssr_streak": 0,
"total_consecutive_days": 0,
},
}
return {
"unlocked": data.get("achievements", {}),
"progress": data.get("progress", {}),
}
async def claim_achievement_reward(self, user_id: str, achievement_id: str) -> bool:
"""标记成就奖励已领取。"""
data = await self._request("POST", f"achievements/{user_id}/claim", payload={"achievement_id": achievement_id})
return bool(data and data.get("success"))
async def get_daily_records(self, date: Optional[str] = None) -> Dict[str, Any]:
"""获取每日详细抽卡记录。"""
params = {"date": date} if date else {}
data = await self._request("GET", "records/daily", params=params)
return data or {"success": False, "date": date or self.get_today_date(), "records": [], "total_count": 0}
async def get_daily_draws(self, date: Optional[str] = None) -> Dict[str, Dict[str, List[Dict[str, str]]]]:
"""按旧结构返回每日抽卡记录。"""
data = await self.get_daily_records(date)
result: Dict[str, Dict[str, List[Dict[str, str]]]] = {}
if not data.get("success"):
return result
target_date = data.get("date") or date or self.get_today_date()
result[target_date] = {}
for record in data.get("records", []):
user_id = record.get("user_id")
if not user_id:
continue
result[target_date].setdefault(user_id, []).append(
{
"rarity": record.get("rarity", ""),
"name": record.get("shikigami_name", ""),
"timestamp": record.get("draw_time", ""),
}
)
return result
async def has_signed_in_today(self, user_id: str) -> bool:
"""保留旧方法名;当前无独立查询端点,签到去重由 xapi sign-in 写接口处理。"""
return False
def save_daily_draws(self, data: Dict[str, Dict[str, List[Dict[str, str]]]]) -> None:
"""兼容旧空方法,运行时不写本地文件。"""
return None
def save_user_stats(self, data: Dict[str, Dict[str, Any]]) -> None:
"""兼容旧空方法,运行时不写本地文件。"""
return None