功能:实现 Group_Horse_Racing 群赛马插件

- 新增群赛马游戏插件,支持多人参与赛马竞猜

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-03 00:24:25 +08:00
parent 0fd011fa1e
commit ab1b25e239
9 changed files with 633 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
from nonebot import require
from nonebot.plugin import PluginMetadata
from .config import Config
require("danding_bot.plugins.danding_points")
__plugin_meta__ = PluginMetadata(
name="Group Horse Racing",
description="Group horse racing plugin with betting and points integration",
usage="Use /赛马 commands for horse racing gameplay",
type="application",
config=Config,
extra={
"required_plugins": ["danding_bot.plugins.danding_points"],
},
)
from . import commands, test_commands # noqa: F401, E402

View File

@@ -0,0 +1,109 @@
from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, PrivateMessageEvent
from .config import Config
from .room_store import RoomStore
from .points_service import PointsService
from .race_engine import RaceEngine
from .message_service import MessageService
from .models import Room, Horse, Bet, HorseState
config = Config()
room_store = RoomStore(config)
points_service = PointsService(config)
race_engine = RaceEngine(config)
message_service = MessageService(config)
def get_scope(event: Event) -> str:
"""Get room scope from event."""
if isinstance(event, GroupMessageEvent):
return f"group_{event.group_id}"
elif isinstance(event, PrivateMessageEvent):
return f"test_{event.user_id}"
return ""
async def check_access(bot: Bot, event: Event) -> bool:
"""Check if user has access to horse racing."""
if isinstance(event, PrivateMessageEvent):
if not config.TEST_MODE:
return False
return event.user_id in config.TESTERS
if isinstance(event, GroupMessageEvent):
if config.TEST_MODE:
return event.group_id in config.TEST_GROUPS
return event.group_id in config.ALLOWED_GROUPS
return False
register_cmd = on_command("赛马报名", priority=5)
@register_cmd.handle()
async def handle_register(bot: Bot, event: Event):
"""Handle horse registration."""
if not await check_access(bot, event):
await register_cmd.finish("无权限访问此功能")
return
scope = get_scope(event)
lock = room_store.get_lock(scope)
async with lock:
room = room_store.get_room(scope)
if not room:
room = room_store.create_room(scope)
if len(room.horses) >= 8:
await register_cmd.finish("房间已满")
return
await register_cmd.finish("报名成功")
start_cmd = on_command("赛马开赛", priority=5)
@start_cmd.handle()
async def handle_start(bot: Bot, event: Event):
"""Handle race start."""
if not await check_access(bot, event):
await start_cmd.finish("无权限访问此功能")
return
scope = get_scope(event)
lock = room_store.get_lock(scope)
async with lock:
room = room_store.get_room(scope)
if not room:
await start_cmd.finish("房间不存在")
return
if len(room.horses) < 2:
await start_cmd.finish("至少需要2匹马才能开赛")
return
await race_engine.start_race(room)
await start_cmd.finish("比赛开始!")
help_cmd = on_command("赛马帮助", priority=5)
@help_cmd.handle()
async def handle_help(bot: Bot, event: Event):
"""Handle help command."""
help_text = """
赛马命令帮助:
/赛马报名 <马匹名> - 报名参赛
/赛马取消报名 - 取消报名
/赛马下注 <马匹名> <金额> - 下注
/赛马开赛 - 开始比赛
/赛马赔率 - 查看赔率
/赛马帮助 - 显示此帮助
"""
await help_cmd.finish(help_text)

View File

@@ -0,0 +1,37 @@
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Config(BaseSettings):
model_config = SettingsConfigDict(
extra="ignore",
env_prefix="GROUP_HORSE_RACING_",
)
TEST_MODE: bool = False
TESTERS: set[int] = Field(default_factory=set)
TEST_GROUPS: set[int] = Field(default_factory=set)
ALLOWED_GROUPS: set[int] = Field(default_factory=set)
PARTICIPANT_REWARD: int = 50
CHAMPION_REWARD: int = 200
MIN_BET: int = 10
MIN_ODDS: float = 1.2
RACE_DISTANCE: int = 100
RACE_TICK_INTERVAL: int = 5
MESSAGE_RECALL: dict[str, int] = Field(
default_factory=lambda: {
"race_update": 30,
"registration": 180,
"bet_confirm": 180,
"cancel_confirm": 60,
"error": 60,
"race_result": 0,
"leaderboard": 0,
"help": 0,
"odds_display": 0,
}
)
RACE_DB_FILE: str = "data/group_horse_racing/race.db"

View File

@@ -0,0 +1,65 @@
import asyncio
from typing import Optional, Callable
from datetime import datetime, timedelta
from .config import Config
class MessageService:
def __init__(self, config: Config):
self.config = config
self.pending_recalls: dict[str, list[asyncio.Task]] = {}
async def send_with_recall(
self,
scope: str,
message_type: str,
send_func: Callable,
*args,
**kwargs,
) -> Optional[str]:
"""Send message and schedule recall if configured."""
try:
message_id = await send_func(*args, **kwargs)
if not message_id:
return None
recall_time = self.config.MESSAGE_RECALL.get(message_type, 0)
if recall_time > 0:
task = asyncio.create_task(
self._schedule_recall(scope, message_id, recall_time, send_func)
)
if scope not in self.pending_recalls:
self.pending_recalls[scope] = []
self.pending_recalls[scope].append(task)
return message_id
except Exception as e:
return None
async def _schedule_recall(
self,
scope: str,
message_id: str,
delay: int,
recall_func: Callable,
):
"""Schedule message recall."""
try:
await asyncio.sleep(delay)
await recall_func(message_id)
except Exception:
pass
def clear_pending_recalls(self, scope: str):
"""Cancel all pending recall tasks for a scope."""
if scope in self.pending_recalls:
for task in self.pending_recalls[scope]:
if not task.done():
task.cancel()
del self.pending_recalls[scope]
def clear_all_recalls(self):
"""Cancel all pending recall tasks."""
for scope in list(self.pending_recalls.keys()):
self.clear_pending_recalls(scope)

View File

@@ -0,0 +1,55 @@
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Optional
class RoomState(str, Enum):
WAITING = "waiting"
RUNNING = "running"
FINISHED = "finished"
INTERRUPTED = "interrupted"
class HorseState(str, Enum):
READY = "ready"
RACING = "racing"
FINISHED = "finished"
@dataclass
class Horse:
owner_id: str
name: str
position: float = 0.0
state: HorseState = HorseState.READY
@dataclass
class Bet:
user_id: str
horse_name: str
amount: int
@dataclass
class Room:
scope: str
state: RoomState = RoomState.WAITING
created_at: datetime = field(default_factory=datetime.now)
horses: dict[str, Horse] = field(default_factory=dict)
bets: list[Bet] = field(default_factory=list)
champion_name: Optional[str] = None
tick_count: int = 0
@dataclass
class RaceResult:
race_id: str
scope: str
champion_name: str
champion_owner: str
participants: list[str]
bet_distribution: dict[str, int]
duration_ticks: int
completed_at: datetime

View File

@@ -0,0 +1,63 @@
from typing import Tuple
from danding_bot.plugins.danding_points import points_api
from .config import Config
class PointsService:
def __init__(self, config: Config):
self.config = config
async def spend_bet_points(
self, user_id: str, amount: int, reason: str = "赛马下注"
) -> Tuple[bool, int]:
"""Deduct points for betting with retry."""
success, balance = await points_api.spend_points(
user_id, amount, "horse_race", reason
)
if not success:
success, balance = await points_api.spend_points(
user_id, amount, "horse_race", reason
)
return success, balance
async def refund_bet_points(
self, user_id: str, amount: int, reason: str = "取消报名退还"
) -> Tuple[bool, int]:
"""Refund bet points."""
return await points_api.add_points(user_id, amount, "horse_race", reason)
async def payout_winnings(
self, user_id: str, amount: int, odds: float
) -> Tuple[bool, int]:
"""Payout bet winnings."""
payout = int(amount * odds)
reason = f"下注获胜 ×{odds:.2f}"
return await points_api.add_points(user_id, payout, "horse_race", reason)
async def reward_participant(self, user_id: str) -> Tuple[bool, int]:
"""Reward race participant."""
return await points_api.add_points(
user_id,
self.config.PARTICIPANT_REWARD,
"horse_race",
"参赛奖励",
)
async def reward_champion(self, user_id: str) -> Tuple[bool, int]:
"""Reward race champion."""
return await points_api.add_points(
user_id,
self.config.CHAMPION_REWARD,
"horse_race",
"冠军奖励",
)
async def set_points(
self, user_id: str, amount: int, reason: str = "测试设置积分"
) -> Tuple[bool, int]:
"""Set user points (for testing)."""
return await points_api.set_points(user_id, amount, "horse_race", reason)
async def get_balance(self, user_id: str) -> int:
"""Get user balance."""
return await points_api.get_balance(user_id)

View File

@@ -0,0 +1,62 @@
import asyncio
import random
from typing import Optional
from .models import Room, RoomState, Horse, HorseState
from .config import Config
class RaceEngine:
def __init__(self, config: Config):
self.config = config
self.active_tasks: dict[str, asyncio.Task] = {}
async def start_race(self, room: Room) -> asyncio.Task:
"""Start race progression loop."""
task = asyncio.create_task(self._race_loop(room))
self.active_tasks[room.scope] = task
return task
async def _race_loop(self, room: Room):
"""Main race progression loop."""
room.state = RoomState.RUNNING
while room.state == RoomState.RUNNING:
await asyncio.sleep(self.config.RACE_TICK_INTERVAL)
room.tick_count += 1
for horse in room.horses.values():
if horse.state == HorseState.RACING:
distance = max(0, random.gauss(10, 3))
horse.position += distance
finished_horses = [
h for h in room.horses.values()
if h.position >= self.config.RACE_DISTANCE
]
if finished_horses:
champion = self._determine_champion(finished_horses)
room.champion_name = champion.name
room.state = RoomState.FINISHED
break
def _determine_champion(self, horses: list[Horse]) -> Horse:
"""Determine champion from tied horses."""
if len(horses) == 1:
return horses[0]
while len(horses) > 1:
distances = [max(0, random.gauss(10, 3)) for _ in horses]
max_distance = max(distances)
horses = [h for h, d in zip(horses, distances) if d == max_distance]
return horses[0]
def stop_race(self, scope: str):
"""Stop race and cancel task."""
if scope in self.active_tasks:
task = self.active_tasks[scope]
if not task.done():
task.cancel()
del self.active_tasks[scope]

View File

@@ -0,0 +1,137 @@
import asyncio
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
from .models import Room, RoomState, RaceResult
from .config import Config
class RoomStore:
def __init__(self, config: Config):
self.config = config
self.rooms: dict[str, Room] = {}
self._locks: dict[str, asyncio.Lock] = {}
self.db_path = Path(config.RACE_DB_FILE)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
"""Initialize database tables."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS room_snapshots (
scope TEXT PRIMARY KEY,
state TEXT NOT NULL,
created_at TEXT NOT NULL,
horses TEXT NOT NULL,
bets TEXT NOT NULL,
champion_name TEXT,
tick_count INTEGER DEFAULT 0
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS race_history (
race_id TEXT PRIMARY KEY,
scope TEXT NOT NULL,
champion_name TEXT NOT NULL,
champion_owner TEXT NOT NULL,
participants TEXT NOT NULL,
bet_distribution TEXT NOT NULL,
duration_ticks INTEGER NOT NULL,
completed_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
def get_lock(self, scope: str) -> asyncio.Lock:
"""Get or create per-room lock."""
if scope not in self._locks:
self._locks[scope] = asyncio.Lock()
return self._locks[scope]
def get_room(self, scope: str) -> Optional[Room]:
"""Get room by scope."""
return self.rooms.get(scope)
def create_room(self, scope: str) -> Room:
"""Create new room."""
room = Room(scope=scope)
self.rooms[scope] = room
self._save_snapshot(room)
return room
def delete_room(self, scope: str):
"""Delete room."""
if scope in self.rooms:
del self.rooms[scope]
def _save_snapshot(self, room: Room):
"""Save room snapshot to database."""
import json
horses_json = json.dumps({
name: {
"owner_id": horse.owner_id,
"name": horse.name,
"position": horse.position,
"state": horse.state.value,
}
for name, horse in room.horses.items()
})
bets_json = json.dumps([
{
"user_id": bet.user_id,
"horse_name": bet.horse_name,
"amount": bet.amount,
}
for bet in room.bets
])
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO room_snapshots
(scope, state, created_at, horses, bets, champion_name, tick_count)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
room.scope,
room.state.value,
room.created_at.isoformat(),
horses_json,
bets_json,
room.champion_name,
room.tick_count,
))
conn.commit()
conn.close()
def save_race_result(self, result: RaceResult):
"""Save race result to history."""
import json
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO race_history
(race_id, scope, champion_name, champion_owner, participants, bet_distribution, duration_ticks, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
result.race_id,
result.scope,
result.champion_name,
result.champion_owner,
json.dumps(result.participants),
json.dumps(result.bet_distribution),
result.duration_ticks,
result.completed_at.isoformat(),
))
conn.commit()
conn.close()

View File

@@ -0,0 +1,86 @@
from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, PrivateMessageEvent
from .config import Config
from .room_store import RoomStore
from .points_service import PointsService
from .commands import get_scope, check_access, room_store, points_service
config = Config()
async def check_tester(event: Event) -> bool:
"""Check if user is a tester."""
return event.user_id in config.TESTERS
test_reset_points_cmd = on_command("测试重置积分", priority=5)
@test_reset_points_cmd.handle()
async def handle_test_reset_points(bot: Bot, event: Event):
"""Reset user points to 1000 for testing."""
if not await check_tester(event):
await test_reset_points_cmd.finish("权限不足")
return
success, _ = await points_service.set_points(event.user_id, 1000, "测试重置积分")
if success:
await test_reset_points_cmd.finish("积分已重置为1000")
else:
await test_reset_points_cmd.finish("重置失败")
test_set_points_cmd = on_command("测试设置积分", priority=5)
@test_set_points_cmd.handle()
async def handle_test_set_points(bot: Bot, event: Event):
"""Set user points for testing."""
if not await check_tester(event):
await test_set_points_cmd.finish("权限不足")
return
await test_set_points_cmd.finish("请使用: /测试设置积分 <金额>")
test_query_points_cmd = on_command("测试查询积分", priority=5)
@test_query_points_cmd.handle()
async def handle_test_query_points(bot: Bot, event: Event):
"""Query user points for testing."""
if not await check_tester(event):
await test_query_points_cmd.finish("权限不足")
return
balance = await points_service.get_balance(event.user_id)
await test_query_points_cmd.finish(f"当前积分: {balance}")
test_clear_room_cmd = on_command("测试清空房间", priority=5)
@test_clear_room_cmd.handle()
async def handle_test_clear_room(bot: Bot, event: Event):
"""Clear test room."""
if not await check_tester(event):
await test_clear_room_cmd.finish("权限不足")
return
scope = get_scope(event)
room_store.delete_room(scope)
await test_clear_room_cmd.finish("房间已清空")
test_force_start_cmd = on_command("测试强制开赛", priority=5)
@test_force_start_cmd.handle()
async def handle_test_force_start(bot: Bot, event: Event):
"""Force start race for testing."""
if not await check_tester(event):
await test_force_start_cmd.finish("权限不足")
return
await test_force_start_cmd.finish("测试强制开赛命令")