import asyncio
import logging
import threading
from contextlib import closing
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from .config import Config
from .database import PointsDatabase

logger = logging.getLogger(__name__)


class PointsAPI:
    """Points system API for managing user points.

    Every public method is a coroutine that pushes the blocking database
    work onto a worker thread with ``asyncio.to_thread``. The three mutating
    methods (``add_points``, ``spend_points``, ``set_points``) run their
    read-modify-write sequence under one instance-level lock, so concurrent
    calls on the same ``PointsAPI`` object cannot interleave.
    """

    def __init__(self, config: Config):
        self.config = config
        self.db = PointsDatabase(config)
        # Guards the read-balance / write-balance sequence of the mutators.
        self._lock = threading.Lock()

    # ------------------------------------------------------------------
    # Private helpers shared by the mutating methods
    # ------------------------------------------------------------------

    @staticmethod
    def _is_int(value: Any) -> bool:
        """Return True for genuine integers only.

        ``bool`` is a subclass of ``int``; without the extra check ``True``
        would be accepted as an amount of 1 point.
        """
        return isinstance(value, int) and not isinstance(value, bool)

    def _within_operation_limit(self, amount: int) -> bool:
        """Check ``amount`` against ``POINTS_MAX_PER_OPERATION``.

        A non-positive configured limit disables the check.
        """
        limit = self.config.POINTS_MAX_PER_OPERATION
        return limit <= 0 or amount <= limit

    @staticmethod
    def _read_balance(cursor: Any, user_id: str) -> int:
        """Return the stored balance for ``user_id``, or 0 if no row exists."""
        cursor.execute(
            "SELECT points FROM user_points WHERE user_id = ?",
            (user_id,),
        )
        row = cursor.fetchone()
        return row["points"] if row else 0

    @staticmethod
    def _log_transaction(
        cursor: Any,
        user_id: str,
        delta: int,
        balance_after: int,
        source: str,
        reason: Optional[str],
        created_at: str,
    ) -> None:
        """Insert one row into ``point_transactions`` (does not commit)."""
        cursor.execute(
            """
            INSERT INTO point_transactions
                (user_id, amount, balance_after, source, reason, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (user_id, delta, balance_after, source, reason, created_at),
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_balance(self, user_id: str) -> int:
        """Get user's current points balance."""
        return await asyncio.to_thread(self.db.get_user_balance, user_id)

    async def add_points(
        self,
        user_id: str,
        amount: int,
        source: str,
        reason: Optional[str] = None,
    ) -> Tuple[bool, int]:
        """Add points to user account.

        Args:
            user_id: Account to credit; must be non-empty.
            amount: Positive integer number of points (bools are rejected).
            source: Non-empty label stored with the transaction.
            reason: Optional free text stored with the transaction.

        Returns:
            (success, new_balance). Validation failures and database errors
            yield ``(False, 0)``. If the credit would push the balance above
            ``POINTS_MAX_BALANCE`` the result is ``(False, current_balance)``
            and nothing is written.
        """
        if not self._is_int(amount) or amount <= 0:
            return False, 0
        if not user_id or not source:
            return False, 0
        if not self._within_operation_limit(amount):
            return False, 0

        def _add() -> Tuple[bool, int]:
            # closing() guarantees the connection is released on every exit
            # path, including early returns and unexpected exceptions.
            with self._lock, closing(self.db.get_connection()) as conn:
                cursor = conn.cursor()
                try:
                    self.db.ensure_user_exists(user_id)
                    current_balance = self._read_balance(cursor, user_id)

                    new_balance = current_balance + amount
                    max_balance = self.config.POINTS_MAX_BALANCE
                    # A non-positive maximum disables the balance cap.
                    if max_balance > 0 and new_balance > max_balance:
                        return False, current_balance

                    now = datetime.now().isoformat()
                    cursor.execute(
                        """
                        UPDATE user_points
                        SET points = ?,
                            total_earned = total_earned + ?,
                            updated_at = ?
                        WHERE user_id = ?
                        """,
                        (new_balance, amount, now, user_id),
                    )
                    self._log_transaction(
                        cursor, user_id, amount, new_balance, source, reason, now
                    )
                    conn.commit()
                    return True, new_balance
                except Exception:
                    # Best-effort contract: report failure to the caller, but
                    # keep the traceback instead of swallowing it silently.
                    logger.exception("add_points failed for user_id=%s", user_id)
                    return False, 0

        return await asyncio.to_thread(_add)

    async def spend_points(
        self,
        user_id: str,
        amount: int,
        source: str,
        reason: Optional[str] = None,
    ) -> Tuple[bool, int]:
        """Spend points from user account.

        Args:
            user_id: Account to debit; must be non-empty.
            amount: Positive integer number of points (bools are rejected).
            source: Non-empty label stored with the transaction.
            reason: Optional free text stored with the transaction.

        Returns:
            (success, new_balance). Validation failures and database errors
            yield ``(False, 0)``. If the balance is lower than ``amount`` the
            result is ``(False, current_balance)`` and nothing is written.
        """
        if not self._is_int(amount) or amount <= 0:
            return False, 0
        if not user_id or not source:
            return False, 0
        if not self._within_operation_limit(amount):
            return False, 0

        def _spend() -> Tuple[bool, int]:
            with self._lock, closing(self.db.get_connection()) as conn:
                cursor = conn.cursor()
                try:
                    self.db.ensure_user_exists(user_id)
                    current_balance = self._read_balance(cursor, user_id)

                    if current_balance < amount:
                        return False, current_balance

                    new_balance = current_balance - amount
                    now = datetime.now().isoformat()
                    cursor.execute(
                        """
                        UPDATE user_points
                        SET points = ?,
                            total_spent = total_spent + ?,
                            updated_at = ?
                        WHERE user_id = ?
                        """,
                        (new_balance, amount, now, user_id),
                    )
                    # Debits are logged with a negative amount.
                    self._log_transaction(
                        cursor, user_id, -amount, new_balance, source, reason, now
                    )
                    conn.commit()
                    return True, new_balance
                except Exception:
                    logger.exception("spend_points failed for user_id=%s", user_id)
                    return False, 0

        return await asyncio.to_thread(_spend)

    async def set_points(
        self,
        user_id: str,
        amount: int,
        source: str,
        reason: Optional[str] = None,
    ) -> Tuple[bool, int]:
        """Set user's points to exact amount.

        Unlike ``add_points`` and ``spend_points`` this method applies neither
        the per-operation limit nor the maximum-balance limit.

        Args:
            user_id: Account to modify; must be non-empty.
            amount: Non-negative integer target balance (bools are rejected).
            source: Non-empty label stored with the transaction.
            reason: Optional free text stored with the transaction.

        Returns:
            (success, new_balance). Validation failures and database errors
            yield ``(False, 0)``. When the balance already equals ``amount``
            the result is ``(True, amount)`` and nothing is written.
        """
        if not self._is_int(amount) or amount < 0:
            return False, 0
        if not user_id or not source:
            return False, 0

        def _set() -> Tuple[bool, int]:
            with self._lock, closing(self.db.get_connection()) as conn:
                cursor = conn.cursor()
                try:
                    self.db.ensure_user_exists(user_id)
                    current_balance = self._read_balance(cursor, user_id)

                    # No-op: skip both the update and the transaction log.
                    if current_balance == amount:
                        return True, amount

                    diff = amount - current_balance
                    # Only an increase counts towards total_earned; a decrease
                    # leaves it untouched (and total_spent is not updated).
                    earned_diff = max(0, diff)

                    now = datetime.now().isoformat()
                    cursor.execute(
                        """
                        UPDATE user_points
                        SET points = ?,
                            total_earned = total_earned + ?,
                            updated_at = ?
                        WHERE user_id = ?
                        """,
                        (amount, earned_diff, now, user_id),
                    )
                    self._log_transaction(
                        cursor, user_id, diff, amount, source, reason, now
                    )
                    conn.commit()
                    return True, amount
                except Exception:
                    logger.exception("set_points failed for user_id=%s", user_id)
                    return False, 0

        return await asyncio.to_thread(_set)

    async def get_transactions(
        self, user_id: str, limit: int = 20, offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Get transaction history for a user, newest first.

        Args:
            user_id: Account whose transactions are listed.
            limit: Page size; clamped to the range 1..100.
            offset: Number of rows to skip; negative values are treated as 0.

        Returns:
            List of transaction dicts with keys ``id``, ``user_id``,
            ``amount``, ``balance_after``, ``source``, ``reason`` and
            ``created_at``.
        """
        limit = max(1, min(100, limit))
        offset = max(0, offset)

        def _get() -> List[Dict[str, Any]]:
            # closing() releases the connection even if the query raises.
            with closing(self.db.get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute(
                    """
                    SELECT id, user_id, amount, balance_after,
                           source, reason, created_at
                    FROM point_transactions
                    WHERE user_id = ?
                    ORDER BY id DESC
                    LIMIT ? OFFSET ?
                    """,
                    (user_id, limit, offset),
                )
                rows = cursor.fetchall()
            return [dict(row) for row in rows]

        return await asyncio.to_thread(_get)

    async def get_ranking(
        self, limit: int = 10, order_by: str = "points"
    ) -> List[Dict[str, Any]]:
        """Get points ranking.

        Args:
            limit: Maximum number of rows; clamped to the range 1..100.
            order_by: ``"points"`` or ``"total_earned"``; any other value
                falls back to ``"points"``.

        Returns:
            List of ranking dicts with keys ``rank``, ``user_id``, ``points``,
            ``total_earned`` and ``total_spent``. ``rank`` comes from SQL
            ``RANK()``, so users with equal values share a rank; rows are
            ordered by the chosen column descending, then ``user_id``.
        """
        limit = max(1, min(100, limit))
        # Whitelist the column name: it is interpolated into the SQL text
        # below, so it must never carry caller-controlled content.
        order_column = "total_earned" if order_by == "total_earned" else "points"

        def _get() -> List[Dict[str, Any]]:
            query = f"""
                SELECT
                    RANK() OVER (ORDER BY {order_column} DESC) as rank,
                    user_id, points, total_earned, total_spent
                FROM user_points
                ORDER BY {order_column} DESC, user_id ASC
                LIMIT ?
            """
            with closing(self.db.get_connection()) as conn:
                cursor = conn.cursor()
                cursor.execute(query, (limit,))
                rows = cursor.fetchall()
            return [dict(row) for row in rows]

        return await asyncio.to_thread(_get)