Files
Mr.Xia 0fd011fa1e 功能:实现 Danding_Points 积分系统插件
- 新增积分系统插件,支持积分查询、签到、转账等核心功能
- 包含对应的测试脚本

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 00:24:00 +08:00

295 lines
10 KiB
Python

import asyncio
import threading
from datetime import datetime
from typing import Tuple, List, Dict, Any
from .config import Config
from .database import PointsDatabase
class PointsAPI:
"""Points system API for managing user points."""
def __init__(self, config: Config):
self.config = config
self.db = PointsDatabase(config)
self._lock = threading.Lock()
async def get_balance(self, user_id: str) -> int:
"""Get user's current points balance."""
return await asyncio.to_thread(self.db.get_user_balance, user_id)
async def add_points(
self, user_id: str, amount: int, source: str, reason: str = None
) -> Tuple[bool, int]:
"""Add points to user account.
Returns: (success, new_balance)
"""
# Parameter validation
if not isinstance(amount, int) or amount <= 0:
return False, 0
if not user_id or not source:
return False, 0
# Operation limit validation
if self.config.POINTS_MAX_PER_OPERATION > 0:
if amount > self.config.POINTS_MAX_PER_OPERATION:
return False, 0
def _add():
with self._lock:
conn = self.db.get_connection()
cursor = conn.cursor()
try:
# Ensure user exists
self.db.ensure_user_exists(user_id)
# Get current balance
cursor.execute(
"SELECT points FROM user_points WHERE user_id = ?",
(user_id,),
)
row = cursor.fetchone()
current_balance = row["points"] if row else 0
# Check balance limit
new_balance = current_balance + amount
if self.config.POINTS_MAX_BALANCE > 0:
if new_balance > self.config.POINTS_MAX_BALANCE:
conn.close()
return False, current_balance
# Update balance and total_earned
now = datetime.now().isoformat()
cursor.execute(
"""
UPDATE user_points
SET points = ?, total_earned = total_earned + ?, updated_at = ?
WHERE user_id = ?
""",
(new_balance, amount, now, user_id),
)
# Write transaction log
cursor.execute(
"""
INSERT INTO point_transactions
(user_id, amount, balance_after, source, reason, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(user_id, amount, new_balance, source, reason, now),
)
conn.commit()
conn.close()
return True, new_balance
except Exception:
conn.close()
return False, 0
return await asyncio.to_thread(_add)
async def spend_points(
self, user_id: str, amount: int, source: str, reason: str = None
) -> Tuple[bool, int]:
"""Spend points from user account.
Returns: (success, new_balance)
"""
# Parameter validation
if not isinstance(amount, int) or amount <= 0:
return False, 0
if not user_id or not source:
return False, 0
# Operation limit validation
if self.config.POINTS_MAX_PER_OPERATION > 0:
if amount > self.config.POINTS_MAX_PER_OPERATION:
return False, 0
def _spend():
with self._lock:
conn = self.db.get_connection()
cursor = conn.cursor()
try:
# Ensure user exists
self.db.ensure_user_exists(user_id)
# Get current balance
cursor.execute(
"SELECT points FROM user_points WHERE user_id = ?",
(user_id,),
)
row = cursor.fetchone()
current_balance = row["points"] if row else 0
# Check sufficient balance
if current_balance < amount:
conn.close()
return False, current_balance
# Update balance and total_spent
new_balance = current_balance - amount
now = datetime.now().isoformat()
cursor.execute(
"""
UPDATE user_points
SET points = ?, total_spent = total_spent + ?, updated_at = ?
WHERE user_id = ?
""",
(new_balance, amount, now, user_id),
)
# Write transaction log (amount as negative)
cursor.execute(
"""
INSERT INTO point_transactions
(user_id, amount, balance_after, source, reason, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(user_id, -amount, new_balance, source, reason, now),
)
conn.commit()
conn.close()
return True, new_balance
except Exception:
conn.close()
return False, 0
return await asyncio.to_thread(_spend)
async def set_points(
self, user_id: str, amount: int, source: str, reason: str = None
) -> Tuple[bool, int]:
"""Set user's points to exact amount.
Returns: (success, new_balance)
"""
# Parameter validation
if not isinstance(amount, int) or amount < 0:
return False, 0
if not user_id or not source:
return False, 0
def _set():
with self._lock:
conn = self.db.get_connection()
cursor = conn.cursor()
try:
# Ensure user exists
self.db.ensure_user_exists(user_id)
# Get current balance
cursor.execute(
"SELECT points, total_earned FROM user_points WHERE user_id = ?",
(user_id,),
)
row = cursor.fetchone()
current_balance = row["points"] if row else 0
current_earned = row["total_earned"] if row else 0
# If new value equals old value, return without writing
if current_balance == amount:
conn.close()
return True, amount
# Calculate difference for total_earned (only positive diff)
diff = amount - current_balance
earned_diff = max(0, diff)
# Update balance and total_earned
now = datetime.now().isoformat()
cursor.execute(
"""
UPDATE user_points
SET points = ?, total_earned = total_earned + ?, updated_at = ?
WHERE user_id = ?
""",
(amount, earned_diff, now, user_id),
)
# Write transaction log
cursor.execute(
"""
INSERT INTO point_transactions
(user_id, amount, balance_after, source, reason, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(user_id, diff, amount, source, reason, now),
)
conn.commit()
conn.close()
return True, amount
except Exception:
conn.close()
return False, 0
return await asyncio.to_thread(_set)
async def get_transactions(
self, user_id: str, limit: int = 20, offset: int = 0
) -> List[Dict[str, Any]]:
"""Get transaction history for a user.
Returns: List of transaction dicts
"""
# Normalize parameters
limit = max(1, min(100, limit))
offset = max(0, offset)
def _get():
conn = self.db.get_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, user_id, amount, balance_after, source, reason, created_at
FROM point_transactions
WHERE user_id = ?
ORDER BY id DESC
LIMIT ? OFFSET ?
""",
(user_id, limit, offset),
)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
return await asyncio.to_thread(_get)
async def get_ranking(
self, limit: int = 10, order_by: str = "points"
) -> List[Dict[str, Any]]:
"""Get points ranking.
Returns: List of ranking dicts with rank field
"""
# Normalize parameters
limit = max(1, min(100, limit))
if order_by not in ("points", "total_earned"):
order_by = "points"
def _get():
conn = self.db.get_connection()
cursor = conn.cursor()
order_column = "points" if order_by == "points" else "total_earned"
query = f"""
SELECT
RANK() OVER (ORDER BY {order_column} DESC) as rank,
user_id,
points,
total_earned,
total_spent
FROM user_points
ORDER BY {order_column} DESC, user_id ASC
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
return await asyncio.to_thread(_get)