import asyncio import logging import uuid from datetime import datetime from nonebot import on_command from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, Message, MessageSegment, PrivateMessageEvent from danding_bot.plugins.danding_qqpush.config import Config as QqPushConfig from danding_bot.plugins.danding_qqpush.image_render import ImageRenderer from .room_store import RoomStore from .points_service import PointsService from .race_engine import RaceEngine from .message_service import MessageService from .models import Room, Horse, Bet, HorseState, RoomState, RaceResult # Import config from __init__ to ensure it's loaded through NoneBot driver from . import plugin_config as config logger = logging.getLogger("horse_racing.commands") room_store = RoomStore(config) points_service = PointsService(config) race_engine = RaceEngine(config) message_service = MessageService(config) _race_image_renderer: ImageRenderer | None = None async def _get_user_name(bot: Bot, scope: str, user_id: str) -> str: """Get user display name (group card > nickname > user_id).""" try: if scope.startswith("group_"): group_id = int(scope.split("_", 1)[1]) info = await bot.get_group_member_info(group_id=group_id, user_id=int(user_id)) return info.get("card") or info.get("nickname") or user_id except Exception: pass return user_id async def _build_name_map(bot: Bot, scope: str, user_ids: list[str]) -> dict[str, str]: """Build a mapping from user_id to display name.""" name_map: dict[str, str] = {} for uid in user_ids: if uid not in name_map: name_map[uid] = await _get_user_name(bot, scope, uid) return name_map def _get_race_image_renderer() -> ImageRenderer: global _race_image_renderer if _race_image_renderer is None: qqpush_config = QqPushConfig() _race_image_renderer = ImageRenderer( width=config.RACE_IMAGE_WIDTH, font_size=config.RACE_IMAGE_FONT_SIZE, padding=config.RACE_IMAGE_PADDING, line_spacing=config.RACE_IMAGE_LINE_SPACING, font_paths=qqpush_config.FontPaths, ) return _race_image_renderer def _build_race_image_message(message: str) -> Message: if message.startswith("比赛开始!"): title = "🏇 赛马开赛" body = message.replace("比赛开始!", "发令枪响,比赛正式开始!", 1) elif message.startswith("比赛结束!"): title = "🏆 赛马结果" body = message else: title = "📣 赛马进度" body = f"🏁 实时播报\n{message}" renderer = _get_race_image_renderer() image_base64 = renderer.render_to_base64(body, title=title) message_obj = Message() message_obj.append(MessageSegment.image(image_base64)) return message_obj def get_scope(event: Event) -> str: """Get room scope from event.""" if isinstance(event, GroupMessageEvent): return f"group_{event.group_id}" elif isinstance(event, PrivateMessageEvent): return f"test_{event.user_id}" return "" async def check_access(bot: Bot, event: Event) -> bool: """Check if user has access to horse racing.""" if isinstance(event, PrivateMessageEvent): if not config.TEST_MODE: return False return event.user_id in config.TESTERS if isinstance(event, GroupMessageEvent): if config.TEST_MODE: return event.group_id in config.TEST_GROUPS return event.group_id in config.ALLOWED_GROUPS return False def get_event_id(event: Event) -> str: """Get user id as string from event.""" return str(event.user_id) def _normalize_horse_name(horse_name: str) -> str: return horse_name.strip().casefold() def _get_horses_in_order(room: Room) -> list[Horse]: return sorted(room.horses.values(), key=lambda horse: horse.index) def _format_horse_label(horse: Horse) -> str: return f"{horse.index:02d}号 {horse.name}" def _find_user_horse(room: Room, user_id: str) -> Horse | None: for horse in _get_horses_in_order(room): if horse.owner_id == user_id: return horse return None def _find_duplicate_horse(room: Room, horse_name: str) -> Horse | None: normalized_name = _normalize_horse_name(horse_name) for horse in room.horses.values(): if _normalize_horse_name(horse.name) == normalized_name: return horse return None def _resolve_horse_selector(room: Room, selector: str) -> Horse | None: selector = selector.strip() if selector.isdigit(): target_index = int(selector) for horse in room.horses.values(): if horse.index == target_index: return horse return None normalized_selector = _normalize_horse_name(selector) for horse in room.horses.values(): if _normalize_horse_name(horse.name) == normalized_selector: return horse return None def _describe_points_delta(delta: int) -> str: if delta >= 300: return "血赚翻倍" if delta >= 150: return "大赚特赚" if delta > 0: return "小有收获" if delta == 0: return "稳住不亏" if delta <= -300: return "倾家荡产" if delta <= -150: return "伤筋动骨" return "略有损失" def _build_point_changes(room: Room, odds: dict[str, float]) -> tuple[dict[str, int], dict[str, str]]: point_changes: dict[str, int] = {} # Participation reward for all horse owners for horse in room.horses.values(): point_changes[horse.owner_id] = point_changes.get(horse.owner_id, 0) + config.PARTICIPANT_REWARD for bet in room.bets: point_changes[bet.user_id] = point_changes.get(bet.user_id, 0) - bet.amount champion = room.horses.get(room.champion_name) if champion: point_changes[champion.owner_id] = point_changes.get(champion.owner_id, 0) + config.CHAMPION_REWARD for bet in room.bets: if bet.horse_name == room.champion_name: payout = int(bet.amount * odds.get(bet.horse_name, config.MIN_ODDS)) point_changes[bet.user_id] = point_changes.get(bet.user_id, 0) + payout point_summaries = { user_id: _describe_points_delta(delta) for user_id, delta in point_changes.items() } return point_changes, point_summaries async def _format_point_change_lines(room: Room, point_changes: dict[str, int], point_summaries: dict[str, str], name_map: dict[str, str]) -> list[str]: ordered_user_ids: list[str] = [] for horse in _get_horses_in_order(room): if horse.owner_id not in ordered_user_ids: ordered_user_ids.append(horse.owner_id) for bet in room.bets: if bet.user_id not in ordered_user_ids: ordered_user_ids.append(bet.user_id) lines = ["积分变化:"] for user_id in ordered_user_ids: delta = point_changes.get(user_id, 0) summary = point_summaries.get(user_id, _describe_points_delta(delta)) display_name = name_map.get(user_id, user_id) balance = await points_service.get_balance(user_id) lines.append(f" {display_name} {delta:+d} 积分 · {summary}(余额: {balance})") return lines def calculate_odds(room: Room) -> dict[str, float]: """Calculate odds for each horse based on bet distribution.""" total_bet = sum(b.amount for b in room.bets) odds = {} for name in room.horses: horse_bet = sum(b.amount for b in room.bets if b.horse_name == name) if horse_bet == 0: odds[name] = config.MIN_ODDS else: raw_odds = total_bet / horse_bet odds[name] = max(config.MIN_ODDS, round(raw_odds, 2)) return odds async def settle_race(room: Room) -> tuple[RaceResult, dict[str, float]] | None: """Settle bets and rewards after race finishes. Returns (result, odds) or None.""" champion = room.horses.get(room.champion_name) if not champion: return None # 1. Collect all user IDs involved user_ids = set() for horse in room.horses.values(): user_ids.add(horse.owner_id) for horse in room.horses.values(): for bet in horse.bets: user_ids.add(bet.user_id) # 2. Take balance snapshot BEFORE settlements pre_balances = {} for uid in user_ids: balance = await points_service.get_balance(uid) pre_balances[uid] = balance if balance is not None else 0 # 3. Execute all reward/payout operations participant_points = config.PARTICIPANT_REWARD for horse in room.horses.values(): ret, code = await points_service.reward_participant(horse.owner_id, participant_points) if not ret and code != POINTS_ERR_CODE_DUPLICATE: logger.warning(f"reward_participant failed for {horse.owner_id}: code={code}") champion_points = config.CHAMPION_REWARD ret, code = await points_service.reward_champion(champion.owner_id, champion_points) if not ret and code != POINTS_ERR_CODE_DUPLICATE: logger.warning(f"reward_champion failed for {champion.owner_id}: code={code}") all_bets = [] for horse_name, horse in room.horses.items(): all_bets.extend(horse.bets) total_bet = sum(bet.amount for bet in all_bets) if total_bet == 0: odds = {} else: odds = {} for horse_name, horse in room.horses.items(): horse_bet = sum(bet.amount for bet in horse.bets) if horse_bet == 0: odds[horse_name] = config.MAX_ODDS else: odds[horse_name] = max(config.MIN_ODDS, total_bet / horse_bet) champion_bets = room.horses[room.champion_name].bets for bet in champion_bets: win_amount = int(bet.amount * odds[room.champion_name]) ret, code = await points_service.payout_winnings(bet.user_id, win_amount) if not ret and code != POINTS_ERR_CODE_DUPLICATE: logger.warning(f"payout_winnings failed for {bet.user_id}: code={code}") # 4. Take post-settlement snapshot and compute actual deltas post_balances = {} for uid in user_ids: balance = await points_service.get_balance(uid) post_balances[uid] = balance if balance is not None else 0 point_changes = {} for uid in user_ids: delta = post_balances[uid] - pre_balances[uid] if delta != 0: point_changes[uid] = delta result = RaceResult( champion_name=room.champion_name, finishing_order=[name for name in room.finishing_order if name in room.horses], point_changes=point_changes # actual deltas ) return result, odds async def _send_to_scope(bot: Bot, scope: str, message: str, message_type: str = "race_update", critical: bool = False): """Send message to group or private chat based on scope. Args: critical: If True, retry once on failure, then fallback to plain text. """ outbound_message: str | Message = message if config.RACE_RENDER_AS_IMAGE: try: outbound_message = _build_race_image_message(message) except Exception as e: logger.warning(f"_build_race_image_message failed, using plain text: {e}") outbound_message = message try: await message_service.send_with_recall(bot, scope, message_type, outbound_message) except Exception as e: if critical: logger.warning(f"_send_to_scope failed (critical={critical}) for {scope} [{message_type}]: {e}, retrying...") try: # Retry once with plain text await message_service.send_with_recall(bot, scope, message_type, message) except Exception as e2: logger.error(f"_send_to_scope retry also failed for {scope} [{message_type}]: {e2}") else: logger.warning(f"_send_to_scope failed for {scope} [{message_type}]: {e}") async def run_race_with_settlement(bot: Bot, room: Room, scope: str): """Run race with live progress updates and settlement.""" room.state = RoomState.RUNNING horse_list = "\n".join(f" {_format_horse_label(h)}" for h in _get_horses_in_order(room)) await _send_to_scope(bot, scope, f"比赛开始!\n{horse_list}", "race_update") # Race loop with progress updates try: while room.state == RoomState.RUNNING: await asyncio.sleep(config.RACE_TICK_INTERVAL) finished = race_engine.tick(room) progress = race_engine.format_progress(room) await _send_to_scope(bot, scope, progress, "race_update") if finished: champion = race_engine.determine_champion(finished) room.champion_name = champion.name room.state = RoomState.FINISHED break except asyncio.CancelledError: room.state = RoomState.INTERRUPTED for bet in room.bets: await points_service.refund_bet_points(bet.user_id, bet.amount, "比赛中断退还") return # Settle (returns (result, odds) or None) settlement = await settle_race(room) result = settlement[0] if settlement else None odds = settlement[1] if settlement else {} # Build user_id -> display name mapping all_user_ids = [h.owner_id for h in room.horses.values()] + [b.user_id for b in room.bets] name_map = await _build_name_map(bot, scope, all_user_ids) champion = room.horses.get(room.champion_name) champion_name_display = name_map.get(champion.owner_id, champion.owner_id) if champion else '?' result_lines = [ f"比赛结束!冠军:{_format_horse_label(champion) if champion else room.champion_name}", f"马主 {champion_name_display} 获得 {config.CHAMPION_REWARD} 积分", ] winning_bets = [b for b in room.bets if b.horse_name == room.champion_name] if winning_bets: result_lines.append("下注中奖:") for b in winning_bets: payout = int(b.amount * odds.get(b.horse_name, config.MIN_ODDS)) bettor_name = name_map.get(b.user_id, b.user_id) result_lines.append(f" {bettor_name} 下注 {b.amount} -> 获得 {payout}") if result: result_lines.extend(await _format_point_change_lines(room, result.point_changes, result.point_change_summaries, name_map)) # Before sending result, we can recall the last update await message_service.recall_previous_of_type(bot, scope, "race_update") await _send_to_scope(bot, scope, "\n".join(result_lines), "race_result") # Cleanup race_engine.stop_race(scope) room_store.delete_room(scope) message_service.clear_pending_recalls(scope) # --- Commands --- register_cmd = on_command("赛马报名", priority=5) @register_cmd.handle() async def handle_register(bot: Bot, event: Event): """Handle horse registration.""" if not await check_access(bot, event): await register_cmd.finish("无权限访问此功能") return msg = str(event.get_message()).strip() parts = msg.split(None, 1) user_id = get_event_id(event) horse_name = _normalize_horse_name(parts[1]) if len(parts) > 1 else await room_store.get_last_horse_name(user_id) or "" if not horse_name: scope = get_scope(event) horse_name = await _get_user_name(bot, scope, user_id) # Ensure name is not too long when using nickname as default if len(horse_name) > 10: horse_name = horse_name[:10] if len(horse_name) > 10: await register_cmd.finish("马匹名不能超过10个字符") return scope = get_scope(event) lock = room_store.get_lock(scope) async with lock: room = room_store.get_room(scope) if not room: room = await room_store.create_room(scope) if room.state != RoomState.WAITING: await register_cmd.finish("比赛正在进行中,无法报名") return if len(room.horses) >= 8: await register_cmd.finish("房间已满(最多8匹马)") return duplicate_horse = _find_duplicate_horse(room, horse_name) if duplicate_horse: await register_cmd.finish(f"马匹名 \"{horse_name}\" 已被 {_format_horse_label(duplicate_horse)} 使用") return existing_user_horse = _find_user_horse(room, user_id) if existing_user_horse: await register_cmd.finish(f"你已经报名了,当前马匹为 {_format_horse_label(existing_user_horse)}") return horse_index = room.next_horse_index room.next_horse_index += 1 room.horses[horse_name] = Horse(owner_id=user_id, name=horse_name, index=horse_index) await room_store.set_last_horse_name(user_id, horse_name) count = len(room.horses) registered_horse = room.horses[horse_name] await register_cmd.finish(f"报名成功!{_format_horse_label(registered_horse)} 已加入比赛({count}/8)") cancel_cmd = on_command("赛马取消报名", priority=5) @cancel_cmd.handle() async def handle_cancel(bot: Bot, event: Event): """Handle cancel registration.""" if not await check_access(bot, event): await cancel_cmd.finish("无权限访问此功能") return scope = get_scope(event) user_id = get_event_id(event) lock = room_store.get_lock(scope) async with lock: room = room_store.get_room(scope) if not room: await cancel_cmd.finish("房间不存在") return if room.state != RoomState.WAITING: await cancel_cmd.finish("比赛正在进行中,无法取消报名") return user_horse = _find_user_horse(room, user_id) if not user_horse: await cancel_cmd.finish("你还没有报名") return bets_to_refund = [b for b in room.bets if b.horse_name == user_horse.name] for bet in bets_to_refund: await points_service.refund_bet_points(bet.user_id, bet.amount, "取消报名退还下注") room.bets = [b for b in room.bets if b.horse_name != user_horse.name] del room.horses[user_horse.name] await cancel_cmd.finish(f"已取消报名,{_format_horse_label(user_horse)} 已退出") bet_cmd = on_command("赛马下注", priority=5) cancel_bet_cmd = on_command("赛马取消下注", priority=5) @cancel_bet_cmd.handle() async def handle_cancel_bet(bot: Bot, event: Event): """Handle cancel bet - refund all bets placed by the user in current room.""" if not await check_access(bot, event): await cancel_bet_cmd.finish("无权限访问此功能") return scope = get_scope(event) user_id = get_event_id(event) lock = room_store.get_lock(scope) async with lock: room = room_store.get_room(scope) if not room: await cancel_bet_cmd.finish("房间不存在") return if room.state != RoomState.WAITING: await cancel_bet_cmd.finish("比赛已开始,无法取消下注") return user_bets = [b for b in room.bets if b.user_id == user_id] if not user_bets: await cancel_bet_cmd.finish("你还没有下注") return total_refund = 0 refund_errors = [] for bet in user_bets: try: await points_service.refund_bet_points(bet.user_id, bet.amount, "取消下注退还") total_refund += bet.amount except Exception as e: logger.error(f"退还下注失败 user={bet.user_id} amount={bet.amount}: {e}") refund_errors.append(bet) # 只移除已成功退还的下注 if refund_errors: failed_amount = sum(b.amount for b in refund_errors) room.bets = [b for b in room.bets if b.user_id != user_id or b in refund_errors] await cancel_bet_cmd.finish(f"退还部分失败:成功退还 {total_refund} 积分,{len(refund_errors)} 笔退还失败({failed_amount} 积分),请联系管理员") return else: room.bets = [b for b in room.bets if b.user_id != user_id] await cancel_bet_cmd.finish(f"已取消 {len(user_bets)} 笔下注,退还 {total_refund} 积分") @bet_cmd.handle() async def handle_bet(bot: Bot, event: Event): """Handle bet placement.""" if not await check_access(bot, event): await bet_cmd.finish("无权限访问此功能") return msg = str(event.get_message()).strip() parts = msg.split() if len(parts) < 3: await bet_cmd.finish("请使用:/赛马下注 <序号|马匹名> <金额>") return horse_selector = parts[1] try: amount = int(parts[2]) except ValueError: await bet_cmd.finish("金额必须是正整数") return if amount < config.MIN_BET: await bet_cmd.finish(f"最低下注金额为 {config.MIN_BET}") return scope = get_scope(event) user_id = get_event_id(event) lock = room_store.get_lock(scope) async with lock: room = room_store.get_room(scope) if not room: await bet_cmd.finish("房间不存在,请先报名") return if room.state != RoomState.WAITING: await bet_cmd.finish("比赛正在进行中,无法下注") return horse = _resolve_horse_selector(room, horse_selector) if not horse: await bet_cmd.finish(f"马匹序号/名称 \"{horse_selector}\" 不存在") return success, balance = await points_service.spend_bet_points(user_id, amount, f"下注 {_format_horse_label(horse)}") if not success: await bet_cmd.finish(f"积分不足(当前余额:{balance})") return room.bets.append(Bet(user_id=user_id, horse_name=horse.name, amount=amount)) odds = calculate_odds(room) await bet_cmd.finish(f"下注成功!{_format_horse_label(horse)} {amount}积分(当前赔率:{odds.get(horse.name, config.MIN_ODDS):.2f})") odds_cmd = on_command("赛马赔率", priority=5) @odds_cmd.handle() async def handle_odds(bot: Bot, event: Event): """Handle odds display.""" if not await check_access(bot, event): await odds_cmd.finish("无权限访问此功能") return scope = get_scope(event) room = room_store.get_room(scope) if not room: await odds_cmd.finish("房间不存在,请先报名") return if not room.horses: await odds_cmd.finish("还没有马匹报名") return odds = calculate_odds(room) lines = ["当前赔率:"] total_bet = sum(b.amount for b in room.bets) for horse in _get_horses_in_order(room): odd = odds.get(horse.name, config.MIN_ODDS) horse_bet = sum(b.amount for b in room.bets if b.horse_name == horse.name) lines.append(f" {_format_horse_label(horse)} - {odd:.2f}倍 (总下注: {horse_bet})") lines.append(f"总下注池: {total_bet}") await odds_cmd.finish("\n".join(lines)) race_list_cmd = on_command("赛马列表", priority=5) @race_list_cmd.handle() async def handle_race_list(bot: Bot, event: Event): """显示当前房间所有报名马匹信息。""" if not await check_access(bot, event): await race_list_cmd.finish("无权限访问此功能") return scope = get_scope(event) room = room_store.get_room(scope) if not room or not room.horses: await race_list_cmd.finish("暂无报名马匹") return lines = ["🏇 当前报名马匹:"] for horse in _get_horses_in_order(room): owner_display = await _get_user_name(bot, scope, horse.owner_id) lines.append(f" {horse.index}. {horse.name} - 主人: {owner_display}") lines.append(f"\n共 {len(room.horses)} 匹马") await race_list_cmd.finish("\n".join(lines)) start_cmd = on_command("赛马开赛", priority=5) @start_cmd.handle() async def handle_start(bot: Bot, event: Event): """Handle race start - only participants or admins can start.""" if not await check_access(bot, event): await start_cmd.finish("无权限访问此功能") return scope = get_scope(event) user_id = get_event_id(event) lock = room_store.get_lock(scope) async with lock: room = room_store.get_room(scope) if not room: await start_cmd.finish("房间不存在,请先报名") return if room.state != RoomState.WAITING: await start_cmd.finish("比赛已经在进行中") return if len(room.horses) < 2: await start_cmd.finish("至少需要2匹马才能开赛") return # 开赛权限限制:仅参赛者或群管理员可手动开赛(满8匹自动开赛不受影响) is_participant = user_id in [h.owner_id for h in room.horses.values()] is_admin = False if isinstance(event, GroupMessageEvent): try: member_info = await bot.get_group_member_info( group_id=event.group_id, user_id=int(user_id) ) role = member_info.get("role", "") is_admin = role in ("admin", "owner") except Exception: pass if not is_participant and not is_admin: await start_cmd.finish("只有参赛者或群管理员可以开赛") return # Set all horses to racing state for horse in room.horses.values(): horse.state = HorseState.RACING await start_cmd.send("比赛开始!") # Run race in background (outside command handler) task = asyncio.create_task(run_race_with_settlement(bot, room, scope)) race_engine.register_task(scope, task) help_cmd = on_command("赛马帮助", priority=5) @help_cmd.handle() async def handle_help(bot: Bot, event: Event): """Handle help command.""" help_text = f"""🏇 赛马游戏帮助 📌 命令列表: /赛马报名 <马匹名> - 报名参赛(最多8匹马) /赛马报名 - 复用上次绑定的马名,若无则使用群昵称 /赛马取消报名 - 取消报名并退还下注 /赛马下注 <序号|马匹名> <金额> - 下注 /赛马取消下注 - 取消本人在当前房间的所有下注并退还积分 /赛马赔率 - 查看当前赔率和下注池 /赛马列表 - 查看当前报名马匹列表 /赛马开赛 - 开始比赛(至少2匹马) /赛马帮助 - 显示此帮助 📏 规则说明: • 最低下注金额:{config.MIN_BET} 积分 • 参赛马匹上限:8匹 • 开赛要求:至少2匹马报名 • 手动开赛权限:仅当前参赛者或群管理员可操作 💰 奖励机制: • 参赛奖励:参赛者均可获得 {config.PARTICIPANT_REWARD} 积分 • 冠军马主:获得 {config.CHAMPION_REWARD} 积分 • 下注中奖:下注金额 × 赔率 📊 赔率说明: • 赔率根据各马匹下注总额动态计算 • 下注越少的马,赔率越高 • 最低赔率:{config.MIN_ODDS} 倍 🎮 游戏流程: 1️⃣ 玩家报名并绑定马匹名 2️⃣ 玩家可以给任意马匹下注 3️⃣ 满足开赛后,由参赛者或管理员开赛 4️⃣ 比赛实时进行,定期播报进度 5️⃣ 比赛结束后结算积分和奖金""" await help_cmd.finish(help_text)