Files
2025-12-26 22:41:42 +08:00

616 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import contextlib
import datetime
import json
import logging
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional

from .config import Config
# Create the module-level Config instance that DataManager reads its settings from.
config = Config()
class DataManager:
    """SQLite-backed storage for shikigami, draw records, user stats and achievements.

    All settings (database path, image directory, daily limit, achievement
    definitions, legacy JSON paths) are read from the module-level ``config``.
    Every public method opens its own short-lived connection through
    :meth:`_connect`, which commits on success, rolls back on error and always
    closes the connection.
    """

    # Rarity label -> sub-directory of config.SHIKIGAMI_IMG_DIR holding its images.
    _RARITY_DIRS = {
        "R": "r",
        "SR": "sr",
        "SSR": "ssr",
        "SP": "sp",
    }
    # Accepted image extensions; compared against the lower-cased file name.
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')
    # Rarities that reset the "no SSR" streak.
    _TOP_RARITIES = ("SSR", "SP")

    def __init__(self):
        """Create the database directory and schema, then sync the shikigami table."""
        # dirname() is '' when DB_FILE is a bare file name, and makedirs('') raises.
        db_dir = os.path.dirname(config.DB_FILE)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        # Initialise the database schema.
        self._init_db()
        # Load shikigami data (also refreshes the shikigami table).
        self.shikigami_data = self._load_shikigami_data()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    @contextlib.contextmanager
    def _connect(row_factory=None):
        """Yield a connection to ``config.DB_FILE`` and close it afterwards.

        ``sqlite3.Connection`` used as a context manager only commits or rolls
        back; it does not close the connection, so this wrapper adds the close.

        Args:
            row_factory: Optional row factory (e.g. ``sqlite3.Row``) to install.
        """
        conn = sqlite3.connect(config.DB_FILE)
        try:
            if row_factory is not None:
                conn.row_factory = row_factory
            with conn:  # commit on normal exit, rollback on exception
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _find_shikigami_id(cursor, name: str, rarity: str) -> Optional[int]:
        """Return the id of the shikigami with this name and rarity, or None."""
        cursor.execute(
            "SELECT id FROM shikigami WHERE name = ? AND rarity = ?",
            (name, rarity)
        )
        found = cursor.fetchone()
        return found[0] if found else None

    def _count_draws_today(self, user_id: str) -> int:
        """Return how many rows ``daily_draws`` holds for this user today."""
        today = self.get_today_date()
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*)
                FROM daily_draws
                WHERE date = ? AND user_id = ?
            """, (today, user_id))
            return cursor.fetchone()[0]

    def _init_db(self):
        """Create every table used by this class if it does not exist yet."""
        schema = (
            # Shikigami catalogue.
            """
            CREATE TABLE IF NOT EXISTS shikigami (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                rarity TEXT NOT NULL,
                image_path TEXT NOT NULL
            )
            """,
            # Per-day draw records.
            """
            CREATE TABLE IF NOT EXISTS daily_draws (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date TEXT NOT NULL,
                user_id TEXT NOT NULL,
                rarity TEXT NOT NULL,
                shikigami_id INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                FOREIGN KEY (shikigami_id) REFERENCES shikigami(id)
            )
            """,
            # Per-user counters.
            """
            CREATE TABLE IF NOT EXISTS user_stats (
                user_id TEXT PRIMARY KEY,
                total_draws INTEGER DEFAULT 0,
                R_count INTEGER DEFAULT 0,
                SR_count INTEGER DEFAULT 0,
                SSR_count INTEGER DEFAULT 0,
                SP_count INTEGER DEFAULT 0
            )
            """,
            # Per-user draw history.
            """
            CREATE TABLE IF NOT EXISTS draw_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                date TEXT NOT NULL,
                rarity TEXT NOT NULL,
                shikigami_id INTEGER NOT NULL,
                FOREIGN KEY (user_id) REFERENCES user_stats(user_id),
                FOREIGN KEY (shikigami_id) REFERENCES shikigami(id)
            )
            """,
            # Unlocked achievements.
            """
            CREATE TABLE IF NOT EXISTS achievements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                achievement_id TEXT NOT NULL,
                unlocked_date TEXT NOT NULL,
                reward_claimed INTEGER DEFAULT 0,
                UNIQUE(user_id, achievement_id)
            )
            """,
            # Per-user achievement progress counters.
            """
            CREATE TABLE IF NOT EXISTS user_achievement_progress (
                user_id TEXT PRIMARY KEY,
                consecutive_days INTEGER DEFAULT 0,
                last_draw_date TEXT DEFAULT '',
                no_ssr_streak INTEGER DEFAULT 0,
                total_consecutive_days INTEGER DEFAULT 0
            )
            """,
        )
        with self._connect() as conn:
            cursor = conn.cursor()
            for statement in schema:
                cursor.execute(statement)

    # ------------------------------------------------------------------
    # Achievements
    # ------------------------------------------------------------------
    def _next_progress(self, row, today: str, rarity: str):
        """Compute the progress counters after one draw.

        Args:
            row: ``(consecutive_days, last_draw_date, no_ssr_streak,
                total_consecutive_days)`` as stored, or None for a new user.
            today: Today's date as ``YYYY-MM-DD``.
            rarity: Rarity of the draw being recorded.

        Returns:
            ``(consecutive_days, no_ssr_streak, total_consecutive_days)``.
        """
        missed_top = rarity not in self._TOP_RARITIES
        if row is None:
            return 1, (1 if missed_top else 0), 1

        consecutive_days, last_draw_date, no_ssr_streak, total_consecutive_days = row
        if last_draw_date != today:
            try:
                last_date = datetime.datetime.strptime(last_draw_date, "%Y-%m-%d")
                current_date = datetime.datetime.strptime(today, "%Y-%m-%d")
                days_diff = (current_date - last_date).days
            except (TypeError, ValueError):
                # The column defaults to ''; an unparsable date cannot prove a
                # streak, so handle it like a gap of several days.
                days_diff = 2
            if days_diff == 1:
                consecutive_days += 1
                total_consecutive_days += 1
            elif days_diff > 1:
                consecutive_days = 1
                total_consecutive_days += 1
            # A negative difference (stored date in the future) changes nothing.

        # Any SSR/SP resets the streak; everything else extends it.
        no_ssr_streak = no_ssr_streak + 1 if missed_top else 0
        return consecutive_days, no_ssr_streak, total_consecutive_days

    @staticmethod
    def _pending_unlock(achievement_id: str, achievement_config: Dict[str, Any],
                        consecutive_days: int, no_ssr_streak: int,
                        already_unlocked) -> Optional[str]:
        """Return the id to store in ``achievements`` for a new unlock, or None.

        For repeatable ``consecutive_days`` achievements whose ``level`` is 5,
        an extra ``<id>_repeat_<k>`` record is produced every 30 days past day
        150 once the base record already exists.
        """
        kind = achievement_config.get("type")

        if achievement_config.get("repeatable", False) and kind == "consecutive_days":
            if consecutive_days < achievement_config["threshold"]:
                return None
            if achievement_id not in already_unlocked:
                # First time this tier is reached.
                return achievement_id
            if achievement_config.get("level") == 5 and consecutive_days >= 150:
                days_over_150 = consecutive_days - 150
                if days_over_150 > 0 and days_over_150 % 30 == 0:
                    repeat_id = f"{achievement_id}_repeat_{days_over_150//30}"
                    if repeat_id not in already_unlocked:
                        return repeat_id
            return None

        # One-off achievements.
        if achievement_id in already_unlocked:
            return None
        if kind == "consecutive_days":
            reached = consecutive_days >= achievement_config["threshold"]
        elif kind == "no_ssr_streak":
            reached = no_ssr_streak >= achievement_config["threshold"]
        else:
            reached = False
        return achievement_id if reached else None

    def update_achievement_progress(self, user_id: str, rarity: str) -> List[str]:
        """Update a user's achievement progress for one draw.

        Args:
            user_id: The drawing user.
            rarity: Rarity of the shikigami that was drawn.

        Returns:
            Ids (keys of ``config.ACHIEVEMENTS``) unlocked by this draw. A
            repeat reward is reported under the base achievement id, while the
            stored record uses the ``_repeat_<k>`` id.
        """
        today = self.get_today_date()
        unlocked_achievements = []
        with self._connect() as conn:
            cursor = conn.cursor()

            # Load the stored counters (None for a first-time user).
            cursor.execute(
                "SELECT consecutive_days, last_draw_date, no_ssr_streak, total_consecutive_days "
                "FROM user_achievement_progress WHERE user_id = ?",
                (user_id,)
            )
            consecutive_days, no_ssr_streak, total_consecutive_days = self._next_progress(
                cursor.fetchone(), today, rarity
            )

            # Persist the new counters.
            cursor.execute("""
                INSERT OR REPLACE INTO user_achievement_progress
                (user_id, consecutive_days, last_draw_date, no_ssr_streak, total_consecutive_days)
                VALUES (?, ?, ?, ?, ?)
            """, (user_id, consecutive_days, today, no_ssr_streak, total_consecutive_days))

            # Fetch the unlocked ids once instead of one query per achievement.
            cursor.execute(
                "SELECT achievement_id FROM achievements WHERE user_id = ?",
                (user_id,)
            )
            already_unlocked = {record[0] for record in cursor.fetchall()}

            for achievement_id, achievement_config in config.ACHIEVEMENTS.items():
                record_id = self._pending_unlock(
                    achievement_id, achievement_config,
                    consecutive_days, no_ssr_streak, already_unlocked
                )
                if record_id is None:
                    continue
                cursor.execute("""
                    INSERT INTO achievements (user_id, achievement_id, unlocked_date)
                    VALUES (?, ?, ?)
                """, (user_id, record_id, today))
                already_unlocked.add(record_id)
                unlocked_achievements.append(achievement_id)
        return unlocked_achievements

    def get_user_achievements(self, user_id: str) -> Dict[str, Any]:
        """Return a user's unlocked achievements and progress counters.

        Returns:
            ``{"unlocked": {achievement_id: {"unlocked_date", "reward_claimed"}},
            "progress": {"consecutive_days", "no_ssr_streak",
            "total_consecutive_days"}}``. The counters are 0 when the user has
            no progress row.
        """
        with self._connect(sqlite3.Row) as conn:
            cursor = conn.cursor()

            # Unlocked achievements.
            cursor.execute(
                "SELECT achievement_id, unlocked_date, reward_claimed FROM achievements WHERE user_id = ?",
                (user_id,)
            )
            unlocked = {
                row["achievement_id"]: {
                    "unlocked_date": row["unlocked_date"],
                    "reward_claimed": bool(row["reward_claimed"]),
                }
                for row in cursor.fetchall()
            }

            # Progress counters.
            cursor.execute(
                "SELECT * FROM user_achievement_progress WHERE user_id = ?",
                (user_id,)
            )
            progress_row = cursor.fetchone()

        counters = ("consecutive_days", "no_ssr_streak", "total_consecutive_days")
        if progress_row:
            progress = {key: progress_row[key] for key in counters}
        else:
            progress = {key: 0 for key in counters}
        return {
            "unlocked": unlocked,
            "progress": progress
        }

    def claim_achievement_reward(self, user_id: str, achievement_id: str) -> bool:
        """Mark an achievement reward as claimed.

        Returns:
            True if an unclaimed record for this user and achievement was
            updated, False otherwise (unknown record or already claimed).
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE achievements
                SET reward_claimed = 1
                WHERE user_id = ? AND achievement_id = ? AND reward_claimed = 0
            """, (user_id, achievement_id))
            claimed = cursor.rowcount > 0
        return claimed

    # ------------------------------------------------------------------
    # Legacy JSON migration
    # ------------------------------------------------------------------
    def _migrate_data(self):
        """Copy legacy JSON data (daily draws, user stats, history) into SQLite.

        Draws whose shikigami (name + rarity) is not in the ``shikigami`` table
        are skipped. Any failure is logged and swallowed (best effort).

        NOTE(review): nothing in this class calls this method. Its only call
        site used to sit after an unconditional ``return`` in
        ``claim_achievement_reward`` and was therefore unreachable. It is not
        re-enabled here because the inserts into ``daily_draws`` and
        ``draw_history`` are not idempotent: running it on every start would
        duplicate rows. Confirm the intended trigger before wiring it back in.
        """
        try:
            # Migrate daily draw records.
            if os.path.exists(config.DAILY_DRAWS_FILE):
                with open(config.DAILY_DRAWS_FILE, 'r', encoding='utf-8') as f:
                    daily_draws = json.load(f)
                with self._connect() as conn:
                    cursor = conn.cursor()
                    for date, users in daily_draws.items():
                        for user_id, draws in users.items():
                            for draw in draws:
                                shikigami_id = self._find_shikigami_id(
                                    cursor, draw["name"], draw["rarity"]
                                )
                                if shikigami_id is None:
                                    continue
                                cursor.execute(
                                    "INSERT INTO daily_draws (date, user_id, rarity, shikigami_id, timestamp) VALUES (?, ?, ?, ?, ?)",
                                    (date, user_id, draw["rarity"], shikigami_id, draw["timestamp"])
                                )

            # Migrate user statistics.
            if os.path.exists(config.USER_STATS_FILE):
                with open(config.USER_STATS_FILE, 'r', encoding='utf-8') as f:
                    user_stats = json.load(f)
                with self._connect() as conn:
                    cursor = conn.cursor()
                    for user_id, stats in user_stats.items():
                        cursor.execute(
                            "INSERT OR REPLACE INTO user_stats (user_id, total_draws, R_count, SR_count, SSR_count, SP_count) VALUES (?, ?, ?, ?, ?, ?)",
                            (user_id, stats["total_draws"], stats["R_count"], stats["SR_count"], stats["SSR_count"], stats["SP_count"])
                        )
                        # Migrate this user's draw history.
                        for draw in stats.get("draw_history", []):
                            shikigami_id = self._find_shikigami_id(
                                cursor, draw["name"], draw["rarity"]
                            )
                            if shikigami_id is None:
                                continue
                            cursor.execute(
                                "INSERT INTO draw_history (user_id, date, rarity, shikigami_id) VALUES (?, ?, ?, ?)",
                                (user_id, draw["date"], draw["rarity"], shikigami_id)
                            )
        except Exception as e:
            logging.error(f"数据迁移失败: {e}")

    # ------------------------------------------------------------------
    # Shikigami catalogue
    # ------------------------------------------------------------------
    def _load_shikigami_data(self) -> Dict[str, List[Dict[str, str]]]:
        """Scan the image directories and sync the ``shikigami`` table with them.

        Existing rows are matched by (name, rarity) and keep their id, so
        references from ``daily_draws`` and ``draw_history`` stay valid across
        restarts. Rows whose image file no longer exists are deleted.

        Returns:
            ``{rarity: [{"name": ..., "image_url": ...}, ...]}`` for the
            rarities R, SR, SSR and SP; ``image_url`` is the image file path.
        """
        result = {rarity: [] for rarity in self._RARITY_DIRS}
        with self._connect() as conn:
            cursor = conn.cursor()

            # Ids currently in use, grouped by (name, rarity), oldest first.
            cursor.execute("SELECT id, name, rarity FROM shikigami ORDER BY id")
            reusable_ids = {}
            for row_id, name, rarity in cursor.fetchall():
                reusable_ids.setdefault((name, rarity), []).append(row_id)

            for rarity, dir_name in self._RARITY_DIRS.items():
                dir_path = os.path.join(config.SHIKIGAMI_IMG_DIR, dir_name)
                if not os.path.isdir(dir_path):
                    continue
                # Sorted so the returned order does not depend on the filesystem.
                for file_name in sorted(os.listdir(dir_path)):
                    if not file_name.lower().endswith(self._IMAGE_SUFFIXES):
                        continue
                    name = os.path.splitext(file_name)[0]
                    image_path = os.path.join(dir_path, file_name)

                    spare_ids = reusable_ids.get((name, rarity))
                    if spare_ids:
                        # Known shikigami: keep its id, refresh only the path.
                        cursor.execute(
                            "UPDATE shikigami SET image_path = ? WHERE id = ?",
                            (image_path, spare_ids.pop(0))
                        )
                    else:
                        cursor.execute(
                            "INSERT INTO shikigami (name, rarity, image_path) VALUES (?, ?, ?)",
                            (name, rarity, image_path)
                        )
                    result[rarity].append({
                        "name": name,
                        "image_url": image_path
                    })

            # Whatever was not matched has no image file any more.
            stale = [(row_id,) for ids in reusable_ids.values() for row_id in ids]
            cursor.executemany("DELETE FROM shikigami WHERE id = ?", stale)
        return result

    # ------------------------------------------------------------------
    # Clock helpers
    # ------------------------------------------------------------------
    def get_today_date(self) -> str:
        """Return the current local date as ``YYYY-MM-DD``."""
        return datetime.datetime.now().strftime("%Y-%m-%d")

    def get_current_time(self) -> str:
        """Return the current local time as ``HH:MM:SS``."""
        return datetime.datetime.now().strftime("%H:%M:%S")

    # ------------------------------------------------------------------
    # Daily draws
    # ------------------------------------------------------------------
    def get_daily_draws(self) -> Dict[str, Dict[str, List[Dict[str, str]]]]:
        """Return today's draw records.

        Returns:
            ``{date: {user_id: [{"rarity", "name", "timestamp"}, ...]}}``
            containing only today's date (empty when nobody has drawn today).
            If a draw references a shikigami id that is no longer in the table,
            the rarity stored with the draw and a placeholder name are used.
        """
        result = {}
        today = self.get_today_date()
        with self._connect(sqlite3.Row) as conn:
            cursor = conn.cursor()
            # One LEFT JOIN instead of a second "IN (...)" lookup; the id
            # tie-breaker keeps same-second draws in insertion order.
            cursor.execute("""
                SELECT d.date, d.user_id, d.rarity, d.shikigami_id, d.timestamp,
                       s.name AS shikigami_name, s.rarity AS shikigami_rarity
                FROM daily_draws AS d
                LEFT JOIN shikigami AS s ON s.id = d.shikigami_id
                WHERE d.date = ?
                ORDER BY d.timestamp, d.id
            """, (today,))
            rows = cursor.fetchall()

        for row in rows:
            shikigami_id = row["shikigami_id"]
            if row["shikigami_name"] is not None:
                name = row["shikigami_name"]
                rarity = row["shikigami_rarity"]
            else:
                # Unknown shikigami: fall back to the draw's rarity and a default name.
                name = f"式神{shikigami_id}"
                rarity = row["rarity"]
            per_user = result.setdefault(row["date"], {})
            per_user.setdefault(row["user_id"], []).append({
                "rarity": rarity,
                "name": name,
                "timestamp": row["timestamp"]
            })
        return result

    def save_daily_draws(self, data: Dict[str, Dict[str, List[Dict[str, str]]]]):
        """Do nothing; draws are written to the database when they are recorded."""
        pass

    # ------------------------------------------------------------------
    # User statistics
    # ------------------------------------------------------------------
    def get_user_stats(self) -> Dict[str, Dict[str, Any]]:
        """Return the counters and recent draw history of every user.

        Returns:
            ``{user_id: {"total_draws", "R_count", "SR_count", "SSR_count",
            "SP_count", "draw_history": [{"date", "rarity", "name"}, ...]}}``.
            The history holds at most 100 entries, newest first, and only
            entries whose shikigami still exists in the ``shikigami`` table.
        """
        result = {}
        with self._connect(sqlite3.Row) as conn:
            cursor = conn.cursor()

            # Base counters.
            cursor.execute("SELECT * FROM user_stats")
            for stat in cursor.fetchall():
                result[stat["user_id"]] = {
                    "total_draws": stat["total_draws"],
                    "R_count": stat["R_count"],
                    "SR_count": stat["SR_count"],
                    "SSR_count": stat["SSR_count"],
                    "SP_count": stat["SP_count"],
                    "draw_history": []
                }

            # Draw history; the id breaks ties between draws made on the same date.
            for user_id, entry in result.items():
                cursor.execute("""
                    SELECT draw_history.date, draw_history.rarity, shikigami.name
                    FROM draw_history
                    JOIN shikigami ON draw_history.shikigami_id = shikigami.id
                    WHERE draw_history.user_id = ?
                    ORDER BY draw_history.date DESC, draw_history.id DESC
                    LIMIT 100
                """, (user_id,))
                entry["draw_history"] = [
                    {
                        "date": row["date"],
                        "rarity": row["rarity"],
                        "name": row["name"]
                    }
                    for row in cursor.fetchall()
                ]
        return result

    def save_user_stats(self, data: Dict[str, Dict[str, Any]]):
        """Do nothing; statistics are updated in the database as draws happen."""
        pass

    def check_daily_limit(self, user_id: str) -> bool:
        """Return True while the user has made fewer than ``config.DAILY_LIMIT`` draws today."""
        return self._count_draws_today(user_id) < config.DAILY_LIMIT

    def get_draws_left(self, user_id: str) -> int:
        """Return how many draws the user has left today (never negative)."""
        return max(0, config.DAILY_LIMIT - self._count_draws_today(user_id))

    def record_draw(self, user_id: str, rarity: str, shikigami_name: str) -> List[str]:
        """Record one draw and update stats, history and achievements.

        Args:
            user_id: The drawing user.
            rarity: Rarity of the drawn shikigami.
            shikigami_name: Name of the drawn shikigami.

        Returns:
            Achievement ids unlocked by this draw. An empty list is also
            returned, after logging an error and without recording anything,
            when no shikigami with this name and rarity exists.
        """
        today = self.get_today_date()
        current_time = self.get_current_time()
        with self._connect() as conn:
            cursor = conn.cursor()

            # Resolve the shikigami id.
            shikigami_id = self._find_shikigami_id(cursor, shikigami_name, rarity)
            if shikigami_id is None:
                logging.error(f"找不到式神: {shikigami_name} ({rarity})")
                return []

            # Daily draw record.
            cursor.execute("""
                INSERT INTO daily_draws (date, user_id, rarity, shikigami_id, timestamp)
                VALUES (?, ?, ?, ?, ?)
            """, (today, user_id, rarity, shikigami_id, current_time))

            # User counters: create the row on first draw, then increment.
            cursor.execute("""
                INSERT OR IGNORE INTO user_stats (user_id) VALUES (?)
            """, (user_id,))
            cursor.execute("""
                UPDATE user_stats
                SET total_draws = total_draws + 1,
                    R_count = R_count + ?,
                    SR_count = SR_count + ?,
                    SSR_count = SSR_count + ?,
                    SP_count = SP_count + ?
                WHERE user_id = ?
            """, (
                1 if rarity == "R" else 0,
                1 if rarity == "SR" else 0,
                1 if rarity == "SSR" else 0,
                1 if rarity == "SP" else 0,
                user_id
            ))

            # Draw history.
            cursor.execute("""
                INSERT INTO draw_history (user_id, date, rarity, shikigami_id)
                VALUES (?, ?, ?, ?)
            """, (user_id, today, rarity, shikigami_id))

            # Keep at most 100 history rows per user. Ordering by date alone
            # leaves same-day rows in arbitrary order, so the row just inserted
            # could be the one removed; the id tie-breaker keeps the newest.
            cursor.execute("""
                DELETE FROM draw_history
                WHERE user_id = ? AND id NOT IN (
                    SELECT id FROM draw_history
                    WHERE user_id = ?
                    ORDER BY date DESC, id DESC
                    LIMIT 100
                )
            """, (user_id, user_id))

        # The draw is committed and its connection closed before the
        # achievement update opens its own connection.
        return self.update_achievement_progress(user_id, rarity)