"""Tester-only chat commands for the horse-racing plugin.

Every command in this module is gated by :func:`check_tester`, so it is
usable only while ``config.TEST_MODE`` is on and only by users listed in
``config.TESTERS``.
"""

import random

from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, PrivateMessageEvent

from . import plugin_config as config
from .commands import get_scope, check_access, room_store, points_service, race_engine
from .models import Horse, HorseState, RoomState

# Word lists combined as prefix + core + suffix to build random horse names.
_NAME_PREFIXES = ["赤焰", "踏雪", "追风", "流星", "疾电", "破晓", "青岚", "玄影", "星尘", "霜刃", "烈阳", "苍穹"]
_NAME_CORES = ["奔", "跃", "影", "翼", "刃", "雷", "岚", "焰", "星", "雪", "风", "光"]
_NAME_SUFFIXES = ["号", "骑", "王", "将", "卫", "客", "影", "者", "马", "军"]
_MAX_NAME_LENGTH = 10
_MAX_NAME_ATTEMPTS = 500

# Parameters of the simulated race.
_SIM_HORSE_COUNT = 8
_SIM_MAX_TICKS = 1_000
_SIM_SAMPLE_FRAMES = 3


async def check_tester(event: Event) -> bool:
    """Return True when test mode is on and the sender is a listed tester."""
    if not config.TEST_MODE:
        return False
    return event.user_id in config.TESTERS


test_reset_points_cmd = on_command("测试重置积分", priority=5)


@test_reset_points_cmd.handle()
async def handle_test_reset_points(bot: Bot, event: Event):
    """Reset the sender's points to 1000 for testing."""
    if not await check_tester(event):
        await test_reset_points_cmd.finish("权限不足")
        return

    success, _ = await points_service.set_points(event.user_id, 1000, "测试重置积分")
    if success:
        await test_reset_points_cmd.finish("积分已重置为1000")
    else:
        await test_reset_points_cmd.finish("重置失败")


test_set_points_cmd = on_command("测试设置积分", priority=5)


@test_set_points_cmd.handle()
async def handle_test_set_points(bot: Bot, event: Event):
    """Set the sender's points to the non-negative integer given as argument."""
    if not await check_tester(event):
        await test_set_points_cmd.finish("权限不足")
        return

    # The raw message still carries the command word, so the amount is the
    # second whitespace-separated token.
    msg = str(event.get_message()).strip()
    parts = msg.split()
    if len(parts) < 2:
        await test_set_points_cmd.finish("请使用: /测试设置积分 <金额>")
        return

    try:
        amount = int(parts[1])
    except ValueError:
        await test_set_points_cmd.finish("金额必须是整数")
        return
    if amount < 0:
        await test_set_points_cmd.finish("金额必须为非负数")
        return

    success, _ = await points_service.set_points(event.user_id, amount, f"测试设置积分为{amount}")
    if success:
        await test_set_points_cmd.finish(f"积分已设置为 {amount}")
    else:
        await test_set_points_cmd.finish("设置失败")


test_query_points_cmd = on_command("测试查询积分", priority=5)


@test_query_points_cmd.handle()
async def handle_test_query_points(bot: Bot, event: Event):
    """Reply with the sender's current point balance."""
    if not await check_tester(event):
        await test_query_points_cmd.finish("权限不足")
        return

    balance = await points_service.get_balance(event.user_id)
    await test_query_points_cmd.finish(f"当前积分: {balance}")


test_clear_room_cmd = on_command("测试清空房间", priority=5)


@test_clear_room_cmd.handle()
async def handle_test_clear_room(bot: Bot, event: Event):
    """Delete the room that belongs to the scope of this event."""
    if not await check_tester(event):
        await test_clear_room_cmd.finish("权限不足")
        return

    scope = get_scope(event)
    room_store.delete_room(scope)
    await test_clear_room_cmd.finish("房间已清空")


test_force_start_cmd = on_command("测试强制开赛", priority=5)


@test_force_start_cmd.handle()
async def handle_test_force_start(bot: Bot, event: Event):
    """Force start race for testing.

    NOTE: currently a stub. It only replies with a fixed string and does not
    touch any room or the race engine.
    """
    if not await check_tester(event):
        await test_force_start_cmd.finish("权限不足")
        return

    await test_force_start_cmd.finish("测试强制开赛命令")


def _generate_random_horse_names(count: int) -> list[str]:
    """Return up to ``count`` distinct horse names.

    Names are drawn at random from the word lists. If the attempt budget runs
    out before enough distinct names exist, the remainder is filled with
    sequential ``测试马N`` placeholders. A ``count`` of zero or less yields an
    empty list.
    """
    names: set[str] = set()
    attempts = 0
    while len(names) < count and attempts < _MAX_NAME_ATTEMPTS:
        attempts += 1
        name = (
            f"{random.choice(_NAME_PREFIXES)}"
            f"{random.choice(_NAME_CORES)}"
            f"{random.choice(_NAME_SUFFIXES)}"
        )
        # Guard only: with the current word lists a name is at most 4 chars.
        if len(name) > _MAX_NAME_LENGTH:
            name = name[:_MAX_NAME_LENGTH]
        names.add(name)

    # Each placeholder embeds the current set size, so every add is unique
    # and this loop always terminates.
    while len(names) < count:
        names.add(f"测试马{len(names) + 1}")

    return list(names)[:count]


def _run_simulated_race(room, horse_names: list[str]) -> str:
    """Run a race in ``room`` with simulated horses and return the reply text.

    The room is filled with one horse per name, ticked until the engine
    reports finishers (at most ``_SIM_MAX_TICKS`` times), and the final
    rendered progress is checked for line count and for every horse name.
    The return value is either the success report or a ``模拟失败`` message.
    Cleanup of the room is left to the caller.
    """
    for idx, horse_name in enumerate(horse_names, start=1):
        room.horses[horse_name] = Horse(
            owner_id=f"sim_user_{idx}",
            name=horse_name,
            state=HorseState.RACING,
        )
    room.state = RoomState.RUNNING

    progress_samples: list[str] = []
    last_progress = ""
    finished = []
    ticks_run = 0
    for _ in range(_SIM_MAX_TICKS):
        finished = race_engine.tick(room)
        ticks_run += 1
        last_progress = race_engine.format_progress(room)
        if len(progress_samples) < _SIM_SAMPLE_FRAMES:
            progress_samples.append(last_progress)
        if finished:
            break

    if not finished:
        return "模拟失败：超过最大回合仍未结束"

    champion = race_engine.determine_champion(finished)
    room.champion_name = champion.name
    room.state = RoomState.FINISHED

    # The rendered progress is expected to have one line per horse plus one
    # extra line.
    expected_lines = 1 + len(room.horses)
    actual_lines = len(last_progress.splitlines())
    if actual_lines != expected_lines:
        return f"模拟失败：进度渲染行数不匹配（期望{expected_lines}，实际{actual_lines}）"

    missing_names = [name for name in room.horses if name not in last_progress]
    if missing_names:
        return f"模拟失败：进度渲染缺少马名：{', '.join(missing_names)}"

    # Show up to two early frames plus the final one. When the race ended
    # within the sampling window the final frame is already the last sample,
    # so it must not be appended a second time.
    frames = progress_samples[-2:]
    if ticks_run > len(progress_samples):
        frames.append(last_progress)
    sample_block = "\n\n".join(frames)

    return "\n".join(
        [
            "模拟赛马完成",
            f"参赛马匹：{', '.join(horse_names)}",
            f"冠军：{room.champion_name}",
            f"总回合：{room.tick_count}",
            "进度片段：",
            sample_block,
        ]
    )


test_simulate_race_cmd = on_command("测试模拟赛马", priority=5)


@test_simulate_race_cmd.handle()
async def handle_test_simulate_race(bot: Bot, event: Event):
    """Simulate a full race in a throwaway room and report the outcome.

    Any existing race and room for the scope are discarded first. The
    simulated room is removed again on every exit path, including when the
    race engine raises, so no RUNNING test room is left behind.
    """
    if not await check_tester(event):
        await test_simulate_race_cmd.finish("权限不足")
        return

    scope = get_scope(event)
    lock = room_store.get_lock(scope)
    async with lock:
        race_engine.stop_race(scope)
        room_store.delete_room(scope)
        room = room_store.create_room(scope)
        try:
            horse_names = _generate_random_horse_names(_SIM_HORSE_COUNT)
            report = _run_simulated_race(room, horse_names)
        finally:
            race_engine.stop_race(scope)
            room_store.delete_room(scope)

        await test_simulate_race_cmd.finish(report)