Files
DanDingNoneBot/danding_bot/plugins/group_horse_racing/test_commands.py
Mr.Xia e445878ed8 fix: 修正测试中模拟赛马奖励次数检查逻辑
原检查条件错误地减去了1,导致在赛马数量为0时出现负值。
现在直接比较奖励调用次数与赛马数量是否相等。
2026-04-07 20:26:02 +08:00

381 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, PrivateMessageEvent
from . import plugin_config as config
from .commands import get_scope, check_access, room_store, points_service, race_engine
from .models import Horse, HorseState, RoomState, Bet, RaceResult
import asyncio
import random
from datetime import datetime
import traceback
from . import commands as commands_mod
async def check_tester(event: Event) -> bool:
    """Tell whether the event's sender may use the test commands.

    Returns False whenever ``config.TEST_MODE`` is falsy; otherwise returns
    whether ``event.user_id`` is contained in ``config.TESTERS``.
    """
    if config.TEST_MODE:
        # Membership is only evaluated when test mode is switched on.
        return event.user_id in config.TESTERS
    return False
test_reset_points_cmd = on_command("测试重置积分", priority=5)


@test_reset_points_cmd.handle()
async def handle_test_reset_points(bot: Bot, event: Event):
    """Set the sender's points to 1000 (testers only) and report the outcome."""
    allowed = await check_tester(event)
    if not allowed:
        await test_reset_points_cmd.finish("权限不足")
        return

    # set_points yields a pair; only its first element (success flag) is used.
    ok, _ = await points_service.set_points(event.user_id, 1000, "测试重置积分")
    reply = "积分已重置为1000" if ok else "重置失败"
    await test_reset_points_cmd.finish(reply)
test_set_points_cmd = on_command("测试设置积分", priority=5)


@test_set_points_cmd.handle()
async def handle_test_set_points(bot: Bot, event: Event):
    """Set the sender's points to the amount given in the message (testers only).

    The message text is split on whitespace and the second token is parsed as
    a non-negative integer; any other input gets a usage or validation reply.
    """
    allowed = await check_tester(event)
    if not allowed:
        await test_set_points_cmd.finish("权限不足")
        return

    # The first token is the command itself; the amount is expected next.
    tokens = str(event.get_message()).strip().split()
    if len(tokens) < 2:
        await test_set_points_cmd.finish("请使用: /测试设置积分 <金额>")
        return

    try:
        amount = int(tokens[1])
    except ValueError:
        await test_set_points_cmd.finish("金额必须是整数")
        return

    if amount < 0:
        await test_set_points_cmd.finish("金额必须为非负数")
        return

    ok, _ = await points_service.set_points(event.user_id, amount, f"测试设置积分为{amount}")
    reply = f"积分已设置为 {amount}" if ok else "设置失败"
    await test_set_points_cmd.finish(reply)
test_query_points_cmd = on_command("测试查询积分", priority=5)


@test_query_points_cmd.handle()
async def handle_test_query_points(bot: Bot, event: Event):
    """Reply with the balance the points service reports for the sender (testers only)."""
    allowed = await check_tester(event)
    if not allowed:
        await test_query_points_cmd.finish("权限不足")
        return

    current = await points_service.get_balance(event.user_id)
    reply = f"当前积分: {current}"
    await test_query_points_cmd.finish(reply)
test_clear_room_cmd = on_command("测试清空房间", priority=5)


@test_clear_room_cmd.handle()
async def handle_test_clear_room(bot: Bot, event: Event):
    """Delete the room stored for the event's scope (testers only) and confirm."""
    if await check_tester(event):
        # The scope key is derived from the event by get_scope().
        target_scope = get_scope(event)
        room_store.delete_room(target_scope)
        await test_clear_room_cmd.finish("房间已清空")
    else:
        await test_clear_room_cmd.finish("权限不足")
test_force_start_cmd = on_command("测试强制开赛", priority=5)


@test_force_start_cmd.handle()
async def handle_test_force_start(bot: Bot, event: Event):
    """Acknowledge the force-start test command (testers only).

    No race is started here; the handler only replies with a fixed text.
    """
    allowed = await check_tester(event)
    reply = "测试强制开赛命令" if allowed else "权限不足"
    await test_force_start_cmd.finish(reply)
def _generate_random_horse_names(count: int) -> list[str]:
prefixes = ["赤焰", "踏雪", "追风", "流星", "疾电", "破晓", "青岚", "玄影", "星尘", "霜刃", "烈阳", "苍穹"]
cores = ["", "", "", "", "", "", "", "", "", "", "", ""]
suffixes = ["", "", "", "", "", "", "", "", "", ""]
names: set[str] = set()
attempts = 0
while len(names) < count and attempts < 500:
attempts += 1
name = f"{random.choice(prefixes)}{random.choice(cores)}{random.choice(suffixes)}"
if len(name) > 10:
name = name[:10]
names.add(name)
while len(names) < count:
names.add(f"测试马{len(names) + 1}")
return list(names)[:count]
# Matcher for the sandboxed race simulation. Unlike the priority=5 test
# commands, it is registered with priority=1 and block=True — presumably so it
# is handled first and stops further matching; confirm against NoneBot docs.
test_simulate_race_cmd = on_command(
    "测试模拟赛马",
    aliases={"测试模拟"},
    priority=1,
    block=True,
)
class _FakeBot:
def __init__(self):
self.messages: list[dict] = []
self._next_message_id = 1
async def send_msg(self, **kwargs):
self.messages.append(dict(kwargs))
message_id = str(self._next_message_id)
self._next_message_id += 1
return message_id
class _InMemoryRoomStore:
    """Room store replacement that keeps rooms and race results in memory only."""

    def __init__(self):
        # Rooms keyed by scope string.
        self.rooms: dict[str, "commands_mod.Room"] = {}
        # Every result passed to save_race_result, in call order.
        self.saved_results: list[RaceResult] = []

    def get_room(self, scope: str):
        """Return the room stored for ``scope``, or None when there is none."""
        return self.rooms.get(scope)

    def create_room(self, scope: str):
        """Create a new Room for ``scope``, replacing any existing one, and return it."""
        new_room = commands_mod.Room(scope=scope)
        self.rooms[scope] = new_room
        return new_room

    def delete_room(self, scope: str):
        """Forget the room for ``scope``; a missing scope is ignored."""
        self.rooms.pop(scope, None)

    def save_race_result(self, result: RaceResult):
        """Append ``result`` to the in-memory history."""
        self.saved_results.append(result)
class _InMemoryPointsService:
def __init__(self):
self.calls: list[tuple[str, dict]] = []
async def reward_champion(self, user_id: str):
self.calls.append(("reward_champion", {"user_id": user_id}))
return True, 0
async def reward_participant(self, user_id: str):
self.calls.append(("reward_participant", {"user_id": user_id}))
return True, 0
async def payout_winnings(self, user_id: str, amount: int, odds: float):
self.calls.append(("payout_winnings", {"user_id": user_id, "amount": amount, "odds": odds}))
return True, 0
async def refund_bet_points(self, user_id: str, amount: int, reason: str = "比赛中断退还"):
self.calls.append(("refund_bet_points", {"user_id": user_id, "amount": amount, "reason": reason}))
return True, 0
async def get_balance(self, user_id: str) -> int:
self.calls.append(("get_balance", {"user_id": user_id}))
return 0
class _NoopMessageService:
def clear_pending_recalls(self, scope: str):
return
async def send_with_recall(self, bot, scope, message_type, message):
return "fake_msg_id"
async def recall_previous_of_type(self, bot, scope, message_type):
return
def _populate_simulated_room(room, horse_names: list[str]) -> None:
    """Fill ``room`` with one racing horse per name plus two sample bets."""
    for idx, horse_name in enumerate(horse_names, start=1):
        owner_id = f"sim_user_{idx}"
        room.horses[horse_name] = Horse(owner_id=owner_id, name=horse_name, index=idx, state=HorseState.RACING)
    room.next_horse_index = len(horse_names) + 1

    # Two bettors back the first two horses; the stake is at least 10.
    bet_amount = max(commands_mod.config.MIN_BET, 10)
    room.bets.append(Bet(user_id="bettor_1", horse_name=horse_names[0], amount=bet_amount))
    room.bets.append(Bet(user_id="bettor_2", horse_name=horse_names[1], amount=bet_amount * 2))

    room.state = RoomState.WAITING
    for horse in room.horses.values():
        horse.state = HorseState.RACING


def _find_simulation_failure(messages, horse_names, room, saved_results, service_calls) -> "str | None":
    """Return the report text of the first failed simulation check, or None.

    Args:
        messages: Message payloads captured by the fake bot, in send order.
        horse_names: Names of the simulated horses, in registration order.
        room: The simulated room; only ``room.horses`` is read.
        saved_results: Race results recorded by the in-memory room store.
        service_calls: ``(method_name, arguments)`` records from the fake
            points service.
    """
    if not messages:
        return "完全模拟失败:未捕获到任何消息"

    # The first captured message must be the start announcement and list
    # every horse as "<two-digit index><name>".
    opening = messages[0]
    if "比赛开始!" not in opening:
        return "完全模拟失败:未发送开赛消息"
    if any(f"{idx:02d}{name}" not in opening for idx, name in enumerate(horse_names, start=1)):
        return "完全模拟失败:开赛名单未按序号展示"

    # The first round message must have one line per horse after its header
    # line, each starting with that horse's two-digit index.
    progress_messages = [msg for msg in messages if "【第" in msg and "回合】" in msg]
    if not progress_messages:
        return "完全模拟失败:未发送回合进度消息"
    progress_lines = progress_messages[0].splitlines()[1:]
    if len(progress_lines) != len(horse_names):
        return "完全模拟失败:回合进度展示条目数量不匹配"
    if any(not line.strip().startswith(f"{idx:02d}") for idx, line in enumerate(progress_lines, start=1)):
        return "完全模拟失败:回合进度未按报名序号固定排序"

    # The last captured message must be the settlement summary.
    result_msg = messages[-1]
    if "比赛结束!冠军:" not in result_msg:
        return "完全模拟失败:未发送结束结算消息"
    if "积分变化:" not in result_msg:
        return "完全模拟失败:未发送积分变化总结"

    if not saved_results:
        return "完全模拟失败:未写入赛史结果(内存)"
    saved = saved_results[-1]
    if saved.champion_name not in room.horses:
        return "完全模拟失败:赛史冠军不在参赛马匹中"
    if not saved.point_changes:
        return "完全模拟失败:赛史未记录积分变化"
    if not saved.point_change_summaries:
        return "完全模拟失败:赛史未记录积分变化总结"

    # The first champion reward must go to the champion horse's owner, and
    # participant rewards must number exactly one per horse.
    champion_owner_id = room.horses[saved.champion_name].owner_id
    champion_rewards = [arguments for method, arguments in service_calls if method == "reward_champion"]
    if not champion_rewards or champion_rewards[0]["user_id"] != champion_owner_id:
        return "完全模拟失败:未正确发放冠军奖励(内存记录)"
    participant_rewards = sum(1 for method, _ in service_calls if method == "reward_participant")
    if participant_rewards != len(room.horses):
        return "完全模拟失败:参赛奖励次数不匹配(内存记录)"
    return None


@test_simulate_race_cmd.handle()
async def handle_test_simulate_race(bot: Bot, event: Event):
    """Run a full race against in-memory fakes and report whether it behaved.

    For the duration of the run, the commands module's room store, points
    service, message service, tick interval, ``race_engine.stop_race`` and
    ``_send_to_scope`` are replaced; all of them are restored in ``finally``.
    The race is driven by ``commands_mod.run_race_with_settlement`` with a
    fake bot, then the captured messages, saved result and recorded points
    calls are checked. Progress is streamed to the real chat only when the
    triggering message contains "展示" or "实时".

    NOTE(review): the replacements are module-wide. If two invocations
    overlap, the later one captures the earlier one's fakes as "originals"
    and restores those — consider serialising this command.
    """
    if not await check_tester(event):
        await test_simulate_race_cmd.send("权限不足")
        return
    await test_simulate_race_cmd.send("收到:测试模拟赛马,开始执行完全模拟(无真实积分/数据库副作用)")

    raw_msg = str(event.get_message()).strip()
    # Only non-empty keywords are checked: an empty keyword is a substring of
    # every message and would switch streaming on unconditionally.
    stream_keywords = ("展示", "实时")
    stream_progress = any(keyword in raw_msg for keyword in stream_keywords)
    scope = get_scope(event)

    # Best-effort pre-clean of the real state for this scope; failures are
    # deliberately ignored. Each step is attempted independently so a failing
    # stop_race does not skip delete_room.
    # NOTE(review): this touches the real race engine and room store before
    # the fakes are installed.
    for cleanup in (race_engine.stop_race, room_store.delete_room):
        try:
            cleanup(scope)
        except Exception:
            pass

    original_room_store = commands_mod.room_store
    original_points_service = commands_mod.points_service
    original_message_service = commands_mod.message_service
    original_tick_interval = commands_mod.config.RACE_TICK_INTERVAL
    original_stop_race = commands_mod.race_engine.stop_race
    original_send_to_scope = commands_mod._send_to_scope

    fake_room_store = _InMemoryRoomStore()
    fake_points_service = _InMemoryPointsService()
    fake_message_service = _NoopMessageService()
    fake_bot = _FakeBot()
    start_task: asyncio.Task | None = None
    room = None

    try:
        await test_simulate_race_cmd.send("阶段:构建沙盒环境与参赛数据")
        commands_mod.room_store = fake_room_store
        commands_mod.points_service = fake_points_service
        commands_mod.message_service = fake_message_service
        # Ticks are paced at 1 when streaming, otherwise the interval is 0.
        commands_mod.config.RACE_TICK_INTERVAL = 1 if stream_progress else 0
        commands_mod.race_engine.stop_race = lambda _scope: commands_mod.race_engine.active_tasks.pop(_scope, None)

        progress_count = 0
        max_progress = 30

        async def _test_send_to_scope(_bot: Bot, _scope: str, message: str, *args, **kwargs):
            """Capture every message; when streaming, also forward it to the real chat."""
            nonlocal progress_count
            await fake_bot.send_msg(message_type="private", user_id=event.user_id, message=message)
            if not stream_progress:
                return
            # Round-progress messages are forwarded at most max_progress times.
            if message.startswith("【第") and "回合】" in message:
                progress_count += 1
                if progress_count > max_progress:
                    return
            await original_send_to_scope(bot, scope, message, *args, **kwargs)

        commands_mod._send_to_scope = _test_send_to_scope

        room = fake_room_store.create_room(scope)
        horse_names = _generate_random_horse_names(8)
        _populate_simulated_room(room, horse_names)

        await test_simulate_race_cmd.send("阶段:执行赛程(后台任务)")
        start_task = asyncio.create_task(commands_mod.run_race_with_settlement(fake_bot, room, scope))
        commands_mod.race_engine.register_task(scope, start_task)
        await asyncio.wait_for(start_task, timeout=60 if stream_progress else 15)

        messages = [m.get("message", "") for m in fake_bot.messages]
        failure = _find_simulation_failure(
            messages,
            horse_names,
            room,
            fake_room_store.saved_results,
            fake_points_service.calls,
        )
        if failure is not None:
            await test_simulate_race_cmd.send(failure)
            return

        # Validation guarantees at least one saved result.
        saved = fake_room_store.saved_results[-1]
        await test_simulate_race_cmd.send(
            "\n".join(
                [
                    "完全模拟赛马完成(无真实积分/数据库副作用)",
                    f"参赛马匹:{', '.join(horse_names)}",
                    f"冠军:{saved.champion_name}(马主:{saved.champion_owner})",
                    f"总回合:{saved.duration_ticks}",
                    f"消息条数:{len(messages)}(开赛/进度/结算均已覆盖)",
                    f"结算调用:{len(fake_points_service.calls)}(冠军/参赛/下注派奖)",
                    f"积分变化用户数:{len(saved.point_changes)}",
                    f"过程展示:{'开启' if stream_progress else '关闭'}",
                ]
            )
        )
        return
    except asyncio.TimeoutError:
        ticks = room.tick_count if room else 0
        await test_simulate_race_cmd.send(f"完全模拟失败:超时未完成(当前回合:{ticks})")
    except asyncio.CancelledError:
        ticks = room.tick_count if room else 0
        await test_simulate_race_cmd.send(f"完全模拟失败:任务被取消(当前回合:{ticks})")
    except Exception as e:
        # Report only the last lines of the traceback to keep the reply short.
        tail = "\n".join(traceback.format_exc().splitlines()[-8:])
        await test_simulate_race_cmd.send(f"完全模拟异常:{type(e).__name__}: {e}\n{tail}")
    finally:
        # Stop a still-running race and put every replaced object back.
        if start_task and not start_task.done():
            start_task.cancel()
        commands_mod.room_store = original_room_store
        commands_mod.points_service = original_points_service
        commands_mod.message_service = original_message_service
        commands_mod.config.RACE_TICK_INTERVAL = original_tick_interval
        commands_mod.race_engine.stop_race = original_stop_race
        commands_mod._send_to_scope = original_send_to_scope