Files
DanDingNoneBot/danding_bot/plugins/group_horse_racing/test_commands.py
Mr.Xia 5df0487b88 fix(测试): 修复完全模拟比赛测试中的消息验证逻辑
- 将消息列表中的消息强制转换为字符串,避免类型错误
- 使用 any() 检查关键消息是否存在,而不是依赖固定索引
- 改进开赛名单和进度消息的验证逻辑
- 修复回合进度条目数量检查的逻辑
2026-04-07 20:38:31 +08:00

414 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, PrivateMessageEvent
from . import plugin_config as config
from .commands import get_scope, check_access, room_store, points_service, race_engine
from .models import Horse, HorseState, RoomState, Bet, RaceResult
import asyncio
import random
from datetime import datetime
import traceback
from . import commands as commands_mod
async def check_tester(event: Event) -> bool:
    """Tell whether the sender of ``event`` may use the test commands.

    Access is granted only when ``config.TEST_MODE`` is truthy and the
    event's ``user_id`` is listed in ``config.TESTERS``.
    """
    if config.TEST_MODE:
        return event.user_id in config.TESTERS
    return False
test_reset_points_cmd = on_command("测试重置积分", priority=5)


@test_reset_points_cmd.handle()
async def handle_test_reset_points(bot: Bot, event: Event):
    """Set the calling tester's points back to 1000.

    Non-testers get a permission-denied reply. Otherwise the balance is
    written through ``points_service.set_points`` and the reply reflects
    the success flag it returns.
    """
    allowed = await check_tester(event)
    if not allowed:
        await test_reset_points_cmd.finish("权限不足")
        return
    ok, _detail = await points_service.set_points(event.user_id, 1000, "测试重置积分")
    reply = "积分已重置为1000" if ok else "重置失败"
    await test_reset_points_cmd.finish(reply)
test_set_points_cmd = on_command("测试设置积分", priority=5)


@test_set_points_cmd.handle()
async def handle_test_set_points(bot: Bot, event: Event):
    """Set the calling tester's points to the amount given in the message.

    The message text is split on whitespace and the second token is parsed
    as the amount; it must be a non-negative integer. Each rejection
    (not a tester, missing amount, not an integer, negative) produces its
    own reply and stops the handler.
    """
    allowed = await check_tester(event)
    if not allowed:
        await test_set_points_cmd.finish("权限不足")
        return

    # tokens[0] is whatever precedes the first whitespace (the command
    # itself); the amount is expected as the next token.
    tokens = str(event.get_message()).strip().split()
    if len(tokens) < 2:
        await test_set_points_cmd.finish("请使用: /测试设置积分 <金额>")
        return

    try:
        amount = int(tokens[1])
    except ValueError:
        await test_set_points_cmd.finish("金额必须是整数")
        return
    if amount < 0:
        await test_set_points_cmd.finish("金额必须为非负数")
        return

    ok, _detail = await points_service.set_points(event.user_id, amount, f"测试设置积分为{amount}")
    reply = f"积分已设置为 {amount}" if ok else "设置失败"
    await test_set_points_cmd.finish(reply)
test_query_points_cmd = on_command("测试查询积分", priority=5)


@test_query_points_cmd.handle()
async def handle_test_query_points(bot: Bot, event: Event):
    """Reply with the calling tester's balance from ``points_service``."""
    allowed = await check_tester(event)
    if not allowed:
        await test_query_points_cmd.finish("权限不足")
        return
    current = await points_service.get_balance(event.user_id)
    reply = f"当前积分: {current}"
    await test_query_points_cmd.finish(reply)
test_clear_room_cmd = on_command("测试清空房间", priority=5)


@test_clear_room_cmd.handle()
async def handle_test_clear_room(bot: Bot, event: Event):
    """Delete the room belonging to the scope the event came from."""
    allowed = await check_tester(event)
    if not allowed:
        await test_clear_room_cmd.finish("权限不足")
        return
    room_store.delete_room(get_scope(event))
    await test_clear_room_cmd.finish("房间已清空")
test_force_start_cmd = on_command("测试强制开赛", priority=5)


@test_force_start_cmd.handle()
async def handle_test_force_start(bot: Bot, event: Event):
    """Placeholder for force-starting a race.

    Currently it only answers with a fixed text for testers (and a
    permission-denied text for everyone else); no race is started here.
    """
    reply = "测试强制开赛命令" if await check_tester(event) else "权限不足"
    await test_force_start_cmd.finish(reply)
def _generate_random_horse_names(count: int) -> list[str]:
    """Produce ``count`` distinct horse names for a simulated race.

    Each candidate is a random prefix + core + suffix, cut to at most 10
    characters. At most 500 candidates are drawn; if that does not yield
    enough distinct names, numbered placeholder names fill the remainder.
    The returned order follows set iteration order.
    """
    prefixes = ("赤焰", "踏雪", "追风", "流星", "疾电", "破晓", "青岚", "玄影", "星尘", "霜刃", "烈阳", "苍穹")
    # NOTE(review): every core and suffix is an empty string here, so a
    # generated name is effectively just its prefix — confirm this is intended.
    cores = ("", "", "", "", "", "", "", "", "", "", "", "")
    suffixes = ("", "", "", "", "", "", "", "", "", "")

    unique: set[str] = set()
    for _ in range(500):
        if len(unique) >= count:
            break
        candidate = random.choice(prefixes) + random.choice(cores) + random.choice(suffixes)
        unique.add(candidate[:10])

    # Fallback when random composition could not supply enough names.
    while len(unique) < count:
        unique.add(f"测试马{len(unique) + 1}")

    return list(unique)[:count]
# Matcher for the sandboxed race simulation command; also reachable through
# the alias "测试模拟". Registered with priority=1 and block=True.
test_simulate_race_cmd = on_command("测试模拟赛马", aliases={"测试模拟"}, priority=1, block=True)
class _FakeBot:
    """Stand-in bot that records outgoing messages instead of sending them."""

    def __init__(self):
        # Keyword arguments of every send_msg call, in call order.
        self.messages: list[dict] = []
        # Id that the next recorded message will receive; counting starts at 1.
        self._next_message_id = 1

    async def send_msg(self, **kwargs):
        """Store a copy of the call's keyword arguments and return its id.

        Returns a dict with a single ``"message_id"`` key holding a
        sequential integer.
        """
        self.messages.append({**kwargs})
        assigned = self._next_message_id
        self._next_message_id = assigned + 1
        return {"message_id": assigned}

    async def delete_msg(self, message_id: int):
        """Accept a recall request and ignore it."""
        return None
class _InMemoryRoomStore:
    """Room store replacement that keeps rooms and race results in memory."""

    def __init__(self):
        # scope -> room created through create_room.
        self.rooms: dict[str, "commands_mod.Room"] = {}
        # Results handed to save_race_result, oldest first.
        self.saved_results: list[RaceResult] = []

    def get_room(self, scope: str):
        """Return the room stored for ``scope``, or None when there is none."""
        return self.rooms.get(scope)

    def create_room(self, scope: str):
        """Create a room for ``scope``, replacing any existing one, and return it."""
        created = commands_mod.Room(scope=scope)
        self.rooms[scope] = created
        return created

    def delete_room(self, scope: str):
        """Forget the room for ``scope``; unknown scopes are ignored."""
        self.rooms.pop(scope, None)

    def save_race_result(self, result: RaceResult):
        """Keep ``result`` in memory instead of persisting it."""
        self.saved_results.append(result)
class _InMemoryPointsService:
    """Points service replacement that only logs the calls it receives.

    Every public method appends ``(method_name, arguments_dict)`` to
    ``calls``. The reward/payout/refund methods then return ``(True, 0)``
    and ``get_balance`` returns ``0``.
    """

    def __init__(self):
        # Chronological log of (method name, keyword arguments) pairs.
        self.calls: list[tuple[str, dict]] = []

    def _record(self, name: str, **details):
        """Append one call entry to the log."""
        self.calls.append((name, details))

    async def reward_champion(self, user_id: str):
        self._record("reward_champion", user_id=user_id)
        return True, 0

    async def reward_participant(self, user_id: str):
        self._record("reward_participant", user_id=user_id)
        return True, 0

    async def payout_winnings(self, user_id: str, amount: int, odds: float):
        self._record("payout_winnings", user_id=user_id, amount=amount, odds=odds)
        return True, 0

    async def refund_bet_points(self, user_id: str, amount: int, reason: str = "比赛中断退还"):
        self._record("refund_bet_points", user_id=user_id, amount=amount, reason=reason)
        return True, 0

    async def get_balance(self, user_id: str) -> int:
        self._record("get_balance", user_id=user_id)
        return 0
class _NoopMessageService:
    """Message service replacement used while a race is being simulated.

    Messages go straight to ``bot.send_msg``. The id of the latest message
    per scope and message type is remembered so that a new ``race_update``
    can recall the previous one.
    """

    def __init__(self):
        # scope -> message type -> id returned by the most recent send.
        self.last_messages: dict[str, dict[str, str]] = {}

    def clear_pending_recalls(self, scope: str):
        """Drop all remembered message ids for ``scope``."""
        self.last_messages.pop(scope, None)

    async def send_with_recall(self, bot, scope, message_type, message):
        """Send ``message`` to ``scope`` and remember its id.

        A scope starting with ``"group_"`` is sent as a group message,
        anything else as a private message; the number after the first
        underscore is the target id. Always returns ``"fake_msg_id"``.
        """
        # Recall the previous update first so the simulation does not flood the chat.
        if message_type == "race_update":
            await self.recall_previous_of_type(bot, scope, "race_update")

        target_id = int(scope.split("_", 1)[1])
        if scope.startswith("group_"):
            kind, group_id, user_id = "group", target_id, None
        else:
            kind, group_id, user_id = "private", None, target_id

        response = await bot.send_msg(
            message_type=kind,
            group_id=group_id,
            user_id=user_id,
            message=message,
        )

        per_scope = self.last_messages.setdefault(scope, {})
        if isinstance(response, dict) and "message_id" in response:
            per_scope[message_type] = response["message_id"]
        return "fake_msg_id"

    async def recall_previous_of_type(self, bot, scope, message_type):
        """Delete the remembered message of ``message_type`` in ``scope``, if any.

        Errors raised by ``bot.delete_msg`` are ignored; the remembered id
        is removed either way.
        """
        if message_type not in self.last_messages.get(scope, {}):
            return
        previous_id = self.last_messages[scope][message_type]
        try:
            await bot.delete_msg(message_id=previous_id)
        except Exception:
            pass
        del self.last_messages[scope][message_type]
def _find_simulation_failure(messages, horse_names, room, store, points):
    """Return the failure report for a finished simulation, or None if it passed.

    Args:
        messages: Text of every message captured by the fake bot, in order.
        horse_names: Names of the simulated horses; list position + 1 is the
            index expected in the start roster.
        room: The simulated room whose ``horses`` mapping took part in the race.
        store: The in-memory room store holding ``saved_results``.
        points: The in-memory points service holding the ``calls`` log.

    The checks run in a fixed order and the first failing one determines the
    returned text.
    """
    if not messages:
        return "完全模拟失败:未捕获到任何消息"

    # The roster is verified against the first message announcing the start.
    start_msg = next((msg for msg in messages if "比赛开始!" in msg), None)
    if start_msg is None:
        return "完全模拟失败:未发送开赛消息"
    for idx, horse_name in enumerate(horse_names, start=1):
        entry = f"{idx:02d}{horse_name}"
        if entry not in start_msg:
            return f"完全模拟失败:开赛名单中未找到 {entry}"

    progress_messages = [msg for msg in messages if "【第" in msg and "回合】" in msg]
    if not progress_messages:
        return "完全模拟失败:未发送回合进度消息"
    # Only the first progress message is inspected: one "|" line per horse.
    progress_lines = [line for line in progress_messages[0].splitlines() if "|" in line]
    if len(progress_lines) != len(horse_names):
        return "完全模拟失败:回合进度展示条目数量不匹配"

    if not any("比赛结束!冠军:" in msg for msg in messages):
        return "完全模拟失败:未发送结束结算消息"
    if not any("积分变化:" in msg for msg in messages):
        return "完全模拟失败:未发送积分变化总结"

    if not store.saved_results:
        return "完全模拟失败:未写入赛史结果(内存)"
    saved = store.saved_results[-1]
    if saved.champion_name not in room.horses:
        return "完全模拟失败:赛史冠军不在参赛马匹中"
    if not saved.point_changes:
        return "完全模拟失败:赛史未记录积分变化"
    if not saved.point_change_summaries:
        return "完全模拟失败:赛史未记录积分变化总结"

    champion_owner_id = room.horses[saved.champion_name].owner_id
    champion_calls = [call for call in points.calls if call[0] == "reward_champion"]
    if not champion_calls or champion_calls[0][1]["user_id"] != champion_owner_id:
        return "完全模拟失败:未正确发放冠军奖励(内存记录)"
    participant_calls = [call for call in points.calls if call[0] == "reward_participant"]
    if len(participant_calls) != len(room.horses):
        return "完全模拟失败:参赛奖励次数不匹配(内存记录)"
    return None


@test_simulate_race_cmd.handle()
async def handle_test_simulate_race(bot: Bot, event: Event):
    """Run a complete race inside a sandbox and report whether it behaved.

    The handler swaps ``room_store``, ``points_service``, ``message_service``,
    ``config.RACE_TICK_INTERVAL``, ``race_engine.stop_race`` and
    ``_send_to_scope`` on the ``commands`` module for in-memory fakes, runs
    ``run_race_with_settlement`` on a synthetic room (8 horses, 2 bets) and
    validates the captured messages, the saved result and the recorded points
    calls. All swapped attributes are restored in ``finally``.

    When the command text contains "展示" or "实时", messages are also
    forwarded to the real chat (round-progress messages capped at 30),
    ``RACE_TICK_INTERVAL`` is set to 1 instead of 0 and the timeout is 60
    instead of 15.

    NOTE(review): the fakes are installed on module-level attributes, so any
    other code reading them through the ``commands`` module while the
    simulation runs would presumably see the fakes as well — confirm this is
    acceptable.
    """
    if not await check_tester(event):
        await test_simulate_race_cmd.send("权限不足")
        return
    await test_simulate_race_cmd.send("收到:测试模拟赛马,开始执行完全模拟(无真实积分/数据库副作用)")

    raw_msg = str(event.get_message()).strip()
    # Stream only on explicit request. The previous check additionally tested
    # `"" in raw_msg`, which is true for every string and therefore switched
    # streaming on unconditionally.
    stream_progress = any(keyword in raw_msg for keyword in ("展示", "实时"))
    scope = get_scope(event)

    # Best-effort cleanup of a real race/room left over in this scope; a
    # failure here must not prevent the simulation from running.
    try:
        race_engine.stop_race(scope)
        room_store.delete_room(scope)
    except Exception:
        pass

    # Remember everything that gets patched so `finally` can put it back.
    original_room_store = commands_mod.room_store
    original_points_service = commands_mod.points_service
    original_message_service = commands_mod.message_service
    original_tick_interval = commands_mod.config.RACE_TICK_INTERVAL
    original_stop_race = commands_mod.race_engine.stop_race
    original_send_to_scope = commands_mod._send_to_scope

    fake_room_store = _InMemoryRoomStore()
    fake_points_service = _InMemoryPointsService()
    fake_message_service = _NoopMessageService()
    fake_bot = _FakeBot()
    start_task: asyncio.Task | None = None
    room = None
    try:
        await test_simulate_race_cmd.send("阶段:构建沙盒环境与参赛数据")
        commands_mod.room_store = fake_room_store
        commands_mod.points_service = fake_points_service
        commands_mod.message_service = fake_message_service
        commands_mod.config.RACE_TICK_INTERVAL = 1 if stream_progress else 0
        # Replacement stop_race only forgets the registered task for the scope.
        commands_mod.race_engine.stop_race = lambda _scope: commands_mod.race_engine.active_tasks.pop(_scope, None)

        progress_count = 0
        max_progress = 30

        async def _test_send_to_scope(_bot: Bot, _scope: str, message: str, *args, **kwargs):
            """Capture every message; optionally forward it to the real chat."""
            nonlocal progress_count
            await fake_bot.send_msg(message_type="private", user_id=event.user_id, message=message)
            if not stream_progress:
                return
            if message.startswith("【第") and "回合】" in message:
                progress_count += 1
                # NOTE(review): the cap is applied to round-progress messages
                # only, so later non-progress messages are still forwarded —
                # confirm against the original indentation.
                if progress_count > max_progress:
                    return
            await original_send_to_scope(bot, scope, message, *args, **kwargs)

        commands_mod._send_to_scope = _test_send_to_scope

        # Synthetic room: 8 horses owned by made-up users plus two bets.
        room = fake_room_store.create_room(scope)
        horse_names = _generate_random_horse_names(8)
        for idx, horse_name in enumerate(horse_names, start=1):
            owner_id = f"sim_user_{idx}"
            room.horses[horse_name] = Horse(owner_id=owner_id, name=horse_name, index=idx, state=HorseState.RACING)
        room.next_horse_index = len(horse_names) + 1
        bet_amount = max(commands_mod.config.MIN_BET, 10)
        room.bets.append(Bet(user_id="bettor_1", horse_name=horse_names[0], amount=bet_amount))
        room.bets.append(Bet(user_id="bettor_2", horse_name=horse_names[1], amount=bet_amount * 2))
        room.state = RoomState.WAITING
        for horse in room.horses.values():
            horse.state = HorseState.RACING

        await test_simulate_race_cmd.send("阶段:执行赛程(后台任务)")
        start_task = asyncio.create_task(commands_mod.run_race_with_settlement(fake_bot, room, scope))
        commands_mod.race_engine.register_task(scope, start_task)
        await asyncio.wait_for(start_task, timeout=60 if stream_progress else 15)

        # Coerce to str so the substring checks work for any message object.
        messages = [str(m.get("message", "")) for m in fake_bot.messages]
        failure = _find_simulation_failure(messages, horse_names, room, fake_room_store, fake_points_service)
        if failure is not None:
            await test_simulate_race_cmd.send(failure)
            return

        saved = fake_room_store.saved_results[-1]
        await test_simulate_race_cmd.send(
            "\n".join(
                [
                    "完全模拟赛马完成(无真实积分/数据库副作用)",
                    f"参赛马匹:{', '.join(horse_names)}",
                    f"冠军:{saved.champion_name}(马主:{saved.champion_owner})",
                    f"总回合:{saved.duration_ticks}",
                    f"消息条数:{len(messages)}(开赛/进度/结算均已覆盖)",
                    f"结算调用:{len(fake_points_service.calls)}(冠军/参赛/下注派奖)",
                    f"积分变化用户数:{len(saved.point_changes)}",
                    f"过程展示:{'开启' if stream_progress else '关闭'}",
                ]
            )
        )
        return
    except asyncio.TimeoutError:
        ticks = room.tick_count if room else 0
        await test_simulate_race_cmd.send(f"完全模拟失败:超时未完成(当前回合:{ticks})")
    except asyncio.CancelledError:
        ticks = room.tick_count if room else 0
        await test_simulate_race_cmd.send(f"完全模拟失败:任务被取消(当前回合:{ticks})")
    except Exception as e:
        # Only the last lines of the traceback are sent to keep the reply short.
        tail = "\n".join(traceback.format_exc().splitlines()[-8:])
        await test_simulate_race_cmd.send(f"完全模拟异常:{type(e).__name__}: {e}\n{tail}")
    finally:
        try:
            if start_task and not start_task.done():
                start_task.cancel()
                # Let the cancelled race unwind while the fakes are still
                # installed, so its cleanup cannot reach the real services.
                # Bounded so a task that ignores cancellation cannot hang us.
                await asyncio.wait({start_task}, timeout=1)
        finally:
            # Restoring must happen even if the wait above is interrupted.
            commands_mod.room_store = original_room_store
            commands_mod.points_service = original_points_service
            commands_mod.message_service = original_message_service
            commands_mod.config.RACE_TICK_INTERVAL = original_tick_interval
            commands_mod.race_engine.stop_race = original_stop_race
            commands_mod._send_to_scope = original_send_to_scope