# Source viewer metadata: 475 lines, 16 KiB, Python
"""group_horse_racing 运行时 API 改造测试。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import sys
|
|
import types
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import aiohttp
|
|
import aiosqlite
|
|
import pytest
|
|
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
PLUGIN_DIR = PROJECT_ROOT / "danding_bot" / "plugins" / "group_horse_racing"
|
|
|
|
|
|
def _load_module(full_name: str, path: Path):
    """Load the source file at ``path`` and register it as ``full_name``.

    The module is placed in ``sys.modules`` before it is executed so that
    imports triggered while it runs can resolve it by name.

    Args:
        full_name: Dotted name to register the module under.
        path: Location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for ``path``.
    """
    spec = importlib.util.spec_from_file_location(full_name, path)
    # Validate before the spec is used: module_from_spec(None) would fail with
    # an unrelated AttributeError, and an ``assert`` is stripped under ``-O``.
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {full_name!r} from {path}")

    module = importlib.util.module_from_spec(spec)
    sys.modules[full_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module registered for later imports.
        sys.modules.pop(full_name, None)
        raise
    return module
|
|
|
|
|
|
def _install_nonebot_stubs() -> None:
    """Install the minimal NoneBot type stubs that importing shared.py needs."""

    def _ensure_module(dotted_name: str):
        # Reuse a module that is already registered, otherwise register an empty one.
        return sys.modules.setdefault(dotted_name, types.ModuleType(dotted_name))

    nonebot_pkg = _ensure_module("nonebot")
    adapters_pkg = _ensure_module("nonebot.adapters")
    onebot_pkg = _ensure_module("nonebot.adapters.onebot")
    # The v11 module is always rebuilt and overwrites any existing registration.
    v11_stub = types.ModuleType("nonebot.adapters.onebot.v11")

    class Bot:
        async def get_group_member_info(self, **kwargs):
            return {}

    class Event:
        def get_user_id(self):
            return str(getattr(self, "user_id", ""))

    class GroupMessageEvent(Event):
        pass

    class PrivateMessageEvent(Event):
        pass

    class Message(list):
        pass

    class MessageSegment:
        @staticmethod
        def image(data):
            return {"type": "image", "data": data}

    exported = (Bot, Event, GroupMessageEvent, PrivateMessageEvent, Message, MessageSegment)
    for stub_class in exported:
        setattr(v11_stub, stub_class.__name__, stub_class)

    sys.modules["nonebot.adapters.onebot.v11"] = v11_stub
    # Link each package to its child so attribute-style access works as well.
    nonebot_pkg.adapters = adapters_pkg
    adapters_pkg.onebot = onebot_pkg
    onebot_pkg.v11 = v11_stub
|
|
|
|
|
|
def _install_qqpush_stubs() -> None:
    """Register stand-ins for the danding_qqpush ``config`` and ``image_render`` modules."""

    class QqPushConfig:
        FontPaths = []

    class ImageRenderer:
        def __init__(self, **kwargs):
            # Keep the constructor keywords so callers can inspect them.
            self.kwargs = kwargs

        def render_to_base64(self, body, title=""):
            return f"base64://{title}:{body}"

    config_stub = types.ModuleType("danding_bot.plugins.danding_qqpush.config")
    config_stub.Config = QqPushConfig

    image_stub = types.ModuleType("danding_bot.plugins.danding_qqpush.image_render")
    image_stub.ImageRenderer = ImageRenderer

    sys.modules["danding_bot.plugins.danding_qqpush.config"] = config_stub
    sys.modules["danding_bot.plugins.danding_qqpush.image_render"] = image_stub
|
|
|
|
|
|
def _install_points_stub() -> None:
    """Register a danding_points stand-in whose ``points_api`` returns fixed values."""

    class PointsApi:
        async def add_points(self, user_id, amount, source, reason=None):
            return True, 0

        async def spend_points(self, user_id, amount, source, reason=None):
            return True, 0

        async def set_points(self, user_id, amount, source, reason=None):
            # Echo the requested amount back as the second tuple element.
            return True, amount

        async def get_balance(self, user_id):
            return 0

    points_stub = types.ModuleType("danding_bot.plugins.danding_points")
    points_stub.points_api = PointsApi()
    sys.modules["danding_bot.plugins.danding_points"] = points_stub
|
|
|
|
|
|
def load_room_store_modules():
    """Load the room_store-related submodules directly, without running the plugin entry point.

    Returns:
        A ``(config_module, models_module, room_store_module)`` tuple.
    """
    package_name = "_horse_racing_room_store_under_test"

    # Synthetic parent package whose search path is the plugin directory.
    synthetic_package = types.ModuleType(package_name)
    synthetic_package.__path__ = [str(PLUGIN_DIR)]
    sys.modules[package_name] = synthetic_package

    def _load(submodule: str, filename: str):
        return _load_module(f"{package_name}.{submodule}", PLUGIN_DIR / filename)

    return (
        _load("config", "config.py"),
        _load("models", "models.py"),
        _load("room_store", "room_store.py"),
    )
|
|
|
|
|
|
def load_shared_modules():
    """Load shared.py and its dependencies while isolating NoneBot and external plugin dependencies.

    Returns:
        A ``(config_module, models_module, shared_module)`` tuple.
    """
    for install_stub in (_install_nonebot_stubs, _install_qqpush_stubs, _install_points_stub):
        install_stub()

    package_name = "_horse_racing_shared_under_test"
    package = types.ModuleType(package_name)
    package.__path__ = [str(PLUGIN_DIR)]
    sys.modules[package_name] = package

    def _load(submodule: str, filename: str):
        return _load_module(f"{package_name}.{submodule}", PLUGIN_DIR / filename)

    config_module = _load("config", "config.py")
    models_module = _load("models", "models.py")
    # plugin_config is set on the package before the remaining submodules are loaded.
    package.plugin_config = config_module.Config(
        RACE_RENDER_AS_IMAGE=False,
        RACE_TICK_INTERVAL=0,
        RACE_DISTANCE=1,
    )
    for submodule, filename in (
        ("room_store", "room_store.py"),
        ("points_service", "points_service.py"),
        ("race_engine", "race_engine.py"),
        ("message_service", "message_service.py"),
    ):
        _load(submodule, filename)

    commands_package_name = f"{package_name}.commands"
    commands_package = types.ModuleType(commands_package_name)
    commands_package.__path__ = [str(PLUGIN_DIR / "commands")]
    sys.modules[commands_package_name] = commands_package

    # Hand-written replacement for commands.access, registered before shared.py is loaded.
    def _get_event_id(event):
        return str(getattr(event, "user_id", ""))

    def _get_scope(event):
        return f"group_{getattr(event, 'group_id', '')}"

    async def _check_access(bot, event):
        return True

    access_module = types.ModuleType(f"{commands_package_name}.access")
    access_module.get_event_id = _get_event_id
    access_module.get_scope = _get_scope
    access_module.check_access = _check_access
    sys.modules[f"{commands_package_name}.access"] = access_module

    shared_module = _load_module(
        f"{commands_package_name}.shared",
        PLUGIN_DIR / "commands" / "shared.py",
    )
    return config_module, models_module, shared_module
|
|
|
|
|
|
# Load the plugin submodules once at import time and expose the classes the tests use.
config_module, models_module, room_store_module = load_room_store_modules()
Config, RoomStore, RaceResult = (
    config_module.Config,
    room_store_module.RoomStore,
    models_module.RaceResult,
)
|
|
|
|
|
|
class FakeResponse:
    """Stand-in for an aiohttp response used as an async context manager."""

    def __init__(self, payload, status=200):
        self.payload, self.status = payload, status

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning None means exceptions raised inside the block are not suppressed.
        return None

    async def json(self):
        """Return the payload given at construction."""
        return self.payload
|
|
|
|
|
|
class FakeSession:
    """aiohttp ClientSession double that records the parameters of each horse-racing HTTP request."""

    def __init__(self, responses, calls):
        # Both lists are supplied by the caller, so it can queue payloads and inspect calls.
        self.responses = responses
        self.calls = calls

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return None

    def _record(self, method, url, payload_key, payload, timeout):
        # Log the request, then serve the oldest queued payload.
        self.calls.append({"method": method, "url": url, payload_key: payload, "timeout": timeout})
        return FakeResponse(self.responses.pop(0))

    def get(self, url, params=None, timeout=None):
        return self._record("GET", url, "params", params, timeout)

    def post(self, url, json=None, timeout=None):
        return self._record("POST", url, "json", json, timeout)

    def put(self, url, json=None, timeout=None):
        return self._record("PUT", url, "json", json, timeout)
|
|
|
|
|
|
def success_data(data):
    """Wrap ``data`` in a response envelope with code 200 and an empty message."""
    envelope = {"code": 200, "message": ""}
    envelope["data"] = data
    return envelope
|
|
|
|
|
|
@pytest.fixture
def fake_aiohttp(monkeypatch):
    """Patch ``ClientSession`` on the aiohttp module used by room_store with a FakeSession factory.

    Returns:
        A ``(responses, calls)`` pair: the list of payloads to serve and the
        list that receives one entry per recorded request.
    """
    calls, responses = [], []

    def _session_factory():
        return FakeSession(responses, calls)

    monkeypatch.setattr(room_store_module.aiohttp, "ClientSession", _session_factory)
    return responses, calls
|
|
|
|
|
|
def make_store(tmp_path: Path) -> RoomStore:
    """Build a RoomStore whose database file lives under ``tmp_path`` and whose API settings are test values."""
    settings = Config(
        RACE_DB_FILE=str(tmp_path / "race.db"),
        RACE_API_HOST="http://xapi.test/bot/race/",
        BOT_USER="robot",
        BOT_TOKEN="secret",
    )
    return RoomStore(settings)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_room_store_race_history_and_horse_names_use_xapi(fake_aiohttp, tmp_path):
    """Horse-name reads/writes and race-result saves are issued as xapi HTTP requests."""
    responses, calls = fake_aiohttp
    # One queued payload per request, in call order: GET, PUT, POST.
    responses += [
        success_data({"user_id": "10001", "horse_name": "赤焰"}),
        success_data({"success": True}),
        success_data({"race_id": "race-001"}),
    ]
    store = make_store(tmp_path)
    result_fields = {
        "race_id": "race-001",
        "scope": "group_1000",
        "champion_name": "赤焰",
        "champion_owner": "10001",
        "participants": ["赤焰", "青岚"],
        "bet_distribution": {"赤焰": 30, "青岚": 10},
        "duration_ticks": 8,
        "completed_at": datetime(2026, 6, 20, 10, 0, 0),
        "point_changes": {"10001": 170},
        "point_change_summaries": {"10001": "大赚特赚"},
        "odds_snapshot": {"赤焰": 1.5},
    }
    result = RaceResult(**result_fields)

    horse_name = await store.get_last_horse_name("10001")
    await store.set_last_horse_name("10001", "青岚")
    await store.save_race_result(result)

    assert horse_name == "赤焰"

    credentials = {"user": "robot", "token": "secret", "user_id": "10001"}

    get_call = calls[0]
    assert get_call["method"] == "GET"
    assert get_call["url"] == "http://xapi.test/bot/race/horse-name"
    assert get_call["params"] == credentials

    put_call = calls[1]
    assert put_call["method"] == "PUT"
    assert put_call["json"] == {**credentials, "horse_name": "青岚"}

    post_call = calls[2]
    assert post_call["method"] == "POST"
    assert post_call["url"] == "http://xapi.test/bot/race/history"
    assert post_call["json"]["race_id"] == "race-001"
    assert post_call["json"]["participants"] == ["赤焰", "青岚"]
    assert post_call["json"]["odds_snapshot"] == {"赤焰": 1.5}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_room_snapshots_still_use_local_sqlite_without_race_http(fake_aiohttp, tmp_path):
    """Creating a room writes a snapshot row to local SQLite and records no HTTP call."""
    _responses, calls = fake_aiohttp
    store = make_store(tmp_path)

    try:
        room = await store.create_room("group_1000")
        loaded = store.get_room("group_1000")

        assert loaded is room
        assert calls == []
        # Read the snapshot table through a separate connection to the same database file.
        async with aiosqlite.connect(store.db_path) as db:
            cursor = await db.execute("SELECT scope, state FROM room_snapshots")
            rows = await cursor.fetchall()
    finally:
        # Close the store even when a step above fails, so it is never leaked.
        await store.close()
    assert rows == [("group_1000", "waiting")]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_race_api_network_error_keeps_old_failure_shapes(monkeypatch, tmp_path):
    """With a session that fails on entry, the horse-name read yields None and the write does not raise."""

    class FailingSession:
        async def __aenter__(self):
            raise aiohttp.ClientError("offline")

        async def __aexit__(self, exc_type, exc, tb):
            return None

    def _failing_factory():
        return FailingSession()

    monkeypatch.setattr(room_store_module.aiohttp, "ClientSession", _failing_factory)
    store = make_store(tmp_path)

    last_name = await store.get_last_horse_name("10001")
    assert last_name is None
    # No assertion follows: the test passes as long as this call does not raise.
    await store.set_last_horse_name("10001", "赤焰")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_save_race_result_raises_when_xapi_rejects(fake_aiohttp, tmp_path):
    """save_race_result raises RuntimeError when the queued response carries code 500."""
    responses, _calls = fake_aiohttp
    rejection = {"code": 500, "message": "写入失败", "data": None}
    responses.append(rejection)
    store = make_store(tmp_path)
    result_fields = {
        "race_id": "race-rejected",
        "scope": "group_1000",
        "champion_name": "赤焰",
        "champion_owner": "10001",
        "participants": ["赤焰"],
        "bet_distribution": {"赤焰": 0},
        "duration_ticks": 1,
        "completed_at": datetime(2026, 6, 20, 10, 0, 0),
        "point_changes": {"10001": 170},
        "point_change_summaries": {"10001": "大赚特赚"},
        "odds_snapshot": {"赤焰": 1.2},
    }
    result = RaceResult(**result_fields)

    with pytest.raises(RuntimeError, match="赛马赛果写入 xapi 失败"):
        await store.save_race_result(result)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_settle_race_awaits_balances_and_builds_complete_result():
    """settle_race returns a (result, odds) pair whose fields match the room and the fake balances."""
    _config_module, shared_models, shared = load_shared_modules()
    Room, Horse, HorseState, Bet = (
        shared_models.Room,
        shared_models.Horse,
        shared_models.HorseState,
        shared_models.Bet,
    )

    class FakePointsService:
        """In-memory points service whose methods are all coroutines."""

        def __init__(self):
            self.balances = {"10001": 100, "10002": 50, "20001": 200}

        def _credit(self, user_id: str, delta):
            # Apply the change and report the new balance in the (ok, balance) shape.
            self.balances[user_id] += delta
            return True, self.balances[user_id]

        async def get_balance(self, user_id: str) -> int:
            return self.balances[user_id]

        async def reward_participant(self, user_id: str):
            return self._credit(user_id, shared.config.PARTICIPANT_REWARD)

        async def reward_champion(self, user_id: str):
            return self._credit(user_id, shared.config.CHAMPION_REWARD)

        async def payout_winnings(self, user_id: str, amount: int, odds: float):
            return self._credit(user_id, max(1, round(amount * odds)))

    shared.points_service = FakePointsService()

    room = Room(scope="group_1000")
    room.horses = {
        "赤焰": Horse(owner_id="10001", name="赤焰", index=1, state=HorseState.RACING),
        "青岚": Horse(owner_id="10002", name="青岚", index=2, state=HorseState.RACING),
    }
    room.bets = [Bet(user_id="20001", horse_name="赤焰", amount=30)]
    room.champion_name = "赤焰"
    room.tick_count = 7

    settlement = await shared.settle_race(room)

    assert settlement is not None
    result, odds = settlement
    assert result.race_id
    assert result.scope == "group_1000"
    assert result.participants == ["赤焰", "青岚"]
    assert result.bet_distribution == {"赤焰": 30, "青岚": 0}
    assert result.duration_ticks == 7
    assert result.odds_snapshot == odds
    assert result.point_changes == {"10001": 170, "10002": 20, "20001": 36}
    assert all(isinstance(change, int) for change in result.point_changes.values())
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_race_with_settlement_saves_result_before_deleting_room():
    """run_race_with_settlement saves one result, deletes the room, stops the race and clears recalls."""
    _config_module, shared_models, shared = load_shared_modules()
    Room, Horse, HorseState, Bet = (
        shared_models.Room,
        shared_models.Horse,
        shared_models.HorseState,
        shared_models.Bet,
    )

    class FakeRaceEngine:
        """Engine double whose single tick finishes the race with the horse named 赤焰."""

        def __init__(self):
            self.stopped = []

        def tick(self, room):
            room.tick_count = 3
            return [room.horses["赤焰"]]

        def format_progress(self, room):
            return "progress"

        def determine_champion(self, horses):
            return horses[0]

        def stop_race(self, scope):
            self.stopped.append(scope)

    class FakePointsService:
        """In-memory points service whose methods are all coroutines."""

        def __init__(self):
            self.balances = {"10001": 100, "10002": 50, "20001": 200}

        def _credit(self, user_id: str, delta):
            # Apply the change and report the new balance in the (ok, balance) shape.
            self.balances[user_id] += delta
            return True, self.balances[user_id]

        async def get_balance(self, user_id: str) -> int:
            return self.balances[user_id]

        async def reward_participant(self, user_id: str):
            return self._credit(user_id, shared.config.PARTICIPANT_REWARD)

        async def reward_champion(self, user_id: str):
            return self._credit(user_id, shared.config.CHAMPION_REWARD)

        async def payout_winnings(self, user_id: str, amount: int, odds: float):
            return self._credit(user_id, max(1, round(amount * odds)))

    class FakeMessageService:
        """Message service double that logs sends and recall clears."""

        def __init__(self):
            self.sent = []
            self.cleared = []

        async def send_with_recall(self, bot, scope, message_type, message):
            self.sent.append((scope, message_type, str(message)))
            return "msg"

        async def recall_previous_of_type(self, bot, scope, message_type):
            return None

        def clear_pending_recalls(self, scope):
            self.cleared.append(scope)

    class FakeRoomStore:
        """Room store double that logs saved results and deleted scopes."""

        def __init__(self):
            self.saved = []
            self.deleted = []

        async def save_race_result(self, result):
            self.saved.append(result)

        def delete_room(self, scope):
            self.deleted.append(scope)

    # Swap every collaborator of the shared module for its double.
    shared.race_engine = FakeRaceEngine()
    shared.points_service = FakePointsService()
    shared.message_service = FakeMessageService()
    shared.room_store = FakeRoomStore()
    shared.config.RACE_TICK_INTERVAL = 0

    room = Room(scope="group_1000")
    room.horses = {
        "赤焰": Horse(owner_id="10001", name="赤焰", index=1, state=HorseState.RACING),
        "青岚": Horse(owner_id="10002", name="青岚", index=2, state=HorseState.RACING),
    }
    room.bets = [Bet(user_id="20001", horse_name="赤焰", amount=30)]

    await shared.run_race_with_settlement(object(), room, "group_1000")

    assert len(shared.room_store.saved) == 1
    saved = shared.room_store.saved[0]
    assert saved.scope == "group_1000"
    assert saved.duration_ticks == 3
    assert saved.odds_snapshot == {"赤焰": 1.2, "青岚": 1.2}
    assert shared.room_store.deleted == ["group_1000"]
    assert shared.race_engine.stopped == ["group_1000"]
    assert shared.message_service.cleared == ["group_1000"]
|