Files
DanDingNoneBot/danding_bot/plugins/group_horse_racing/commands.py
Mr.Xia a2b7e1fc11 fix: settle_race返回tuple消除odds重复计算
- settle_race() 返回 tuple[RaceResult, odds] | None
- run_race_with_settlement 解包使用,移除多余 calculate_odds 调用
- _test_send_to_scope 签名已兼容(含*args/**kwargs)
2026-05-01 23:04:32 +08:00

609 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import uuid
from datetime import datetime
from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, Event, GroupMessageEvent, Message, MessageSegment, PrivateMessageEvent
from danding_bot.plugins.danding_qqpush.config import Config as QqPushConfig
from danding_bot.plugins.danding_qqpush.image_render import ImageRenderer
from .room_store import RoomStore
from .points_service import PointsService
from .race_engine import RaceEngine
from .message_service import MessageService
from .models import Room, Horse, Bet, HorseState, RoomState, RaceResult
# Import config from __init__ to ensure it's loaded through NoneBot driver
from . import plugin_config as config
# Module-level logger for the horse racing command handlers.
logger = logging.getLogger("horse_racing.commands")
# Shared service objects, each constructed once from the plugin configuration
# and used by every handler in this module.
room_store = RoomStore(config)
points_service = PointsService(config)
race_engine = RaceEngine(config)
message_service = MessageService(config)
# Cached renderer instance; stays None until _get_race_image_renderer() builds it.
_race_image_renderer: ImageRenderer | None = None
async def _get_user_name(bot: Bot, scope: str, user_id: str) -> str:
    """Return the display name for ``user_id`` within ``scope``.

    For a ``group_<id>`` scope the group member info is queried and the group
    card is preferred, then the nickname.  For any other scope, or when the
    lookup fails for any reason, ``user_id`` itself is returned.

    Args:
        bot: Bot used to call ``get_group_member_info``.
        scope: Room scope string, e.g. ``group_123``.
        user_id: User id as a string.

    Returns:
        The first non-empty value of card, nickname and ``user_id``.
    """
    if not scope.startswith("group_"):
        return user_id
    try:
        group_id = int(scope.split("_", 1)[1])
        info = await bot.get_group_member_info(group_id=group_id, user_id=int(user_id))
        return info.get("card") or info.get("nickname") or user_id
    except Exception as exc:
        # Name resolution is best-effort and must never break a command, but
        # the failure is logged so a broken lookup does not go unnoticed.
        logger.debug("display name lookup failed for %s in %s: %r", user_id, scope, exc)
        return user_id
async def _build_name_map(bot: Bot, scope: str, user_ids: list[str]) -> dict[str, str]:
    """Resolve each distinct id in ``user_ids`` to a display name.

    Ids are looked up one at a time, in the order given; an id that appears
    more than once is only resolved the first time.
    """
    resolved: dict[str, str] = {}
    for user_id in user_ids:
        if user_id in resolved:
            continue
        resolved[user_id] = await _get_user_name(bot, scope, user_id)
    return resolved
def _get_race_image_renderer() -> ImageRenderer:
    """Return the shared race image renderer, building it on first use.

    The renderer is sized from the ``RACE_IMAGE_*`` plugin settings and uses
    the font paths from the qqpush plugin configuration.
    """
    global _race_image_renderer
    if _race_image_renderer is not None:
        return _race_image_renderer

    qqpush_config = QqPushConfig()
    renderer_options = {
        "width": config.RACE_IMAGE_WIDTH,
        "font_size": config.RACE_IMAGE_FONT_SIZE,
        "padding": config.RACE_IMAGE_PADDING,
        "line_spacing": config.RACE_IMAGE_LINE_SPACING,
        "font_paths": qqpush_config.FontPaths,
    }
    _race_image_renderer = ImageRenderer(**renderer_options)
    return _race_image_renderer
def _build_race_image_message(message: str) -> Message:
    """Render ``message`` as an image and wrap it in a ``Message``.

    The title (and, for the start announcement, the first sentence of the
    body) is chosen from the prefix of ``message``: race start, race result,
    or a progress update for anything else.
    """
    start_prefix = "比赛开始!"
    if message.startswith(start_prefix):
        title, body = "🏇 赛马开赛", message.replace(start_prefix, "发令枪响,比赛正式开始!", 1)
    elif message.startswith("比赛结束!"):
        title, body = "🏆 赛马结果", message
    else:
        title, body = "📣 赛马进度", f"🏁 实时播报\n{message}"

    encoded_image = _get_race_image_renderer().render_to_base64(body, title=title)
    image_message = Message()
    image_message.append(MessageSegment.image(encoded_image))
    return image_message
def get_scope(event: Event) -> str:
    """Derive the room scope key for ``event``.

    Group messages map to ``group_<group_id>``, private messages to
    ``test_<user_id>``; any other event type yields an empty string.
    """
    if isinstance(event, GroupMessageEvent):
        return "group_{}".format(event.group_id)
    if isinstance(event, PrivateMessageEvent):
        return "test_{}".format(event.user_id)
    return ""
async def check_access(bot: Bot, event: Event) -> bool:
    """Decide whether the sender of ``event`` may use the horse racing commands.

    Private chats are only allowed in test mode and only for configured
    testers.  Group chats are checked against the test group list in test
    mode and against the allowed group list otherwise.  Other event types
    are always denied.  ``bot`` is not used.
    """
    if isinstance(event, PrivateMessageEvent):
        return bool(config.TEST_MODE) and event.user_id in config.TESTERS
    if isinstance(event, GroupMessageEvent):
        permitted_groups = config.TEST_GROUPS if config.TEST_MODE else config.ALLOWED_GROUPS
        return event.group_id in permitted_groups
    return False
def get_event_id(event: Event) -> str:
    """Return the ``user_id`` attribute of ``event`` converted to a string."""
    sender_id = event.user_id
    return str(sender_id)
def _normalize_horse_name(horse_name: str) -> str:
return horse_name.strip().casefold()
def _get_horses_in_order(room: Room) -> list[Horse]:
    """Return the room's horses as a new list sorted by ascending ``index``."""
    ordered = list(room.horses.values())
    ordered.sort(key=lambda entry: entry.index)
    return ordered
def _format_horse_label(horse: Horse) -> str:
    """Return the horse's zero-padded two-digit index followed by its name."""
    return "{:02d}{}".format(horse.index, horse.name)
def _find_user_horse(room: Room, user_id: str) -> Horse | None:
    """Return the lowest-index horse owned by ``user_id``, or None if there is none."""
    owned = (entry for entry in _get_horses_in_order(room) if entry.owner_id == user_id)
    return next(owned, None)
def _find_duplicate_horse(room: Room, horse_name: str) -> Horse | None:
    """Return a horse in ``room`` whose normalized name equals that of ``horse_name``.

    Names are compared after ``_normalize_horse_name``; None is returned when
    no horse matches.
    """
    wanted = _normalize_horse_name(horse_name)
    matches = (
        entry for entry in room.horses.values()
        if _normalize_horse_name(entry.name) == wanted
    )
    return next(matches, None)
def _resolve_horse_selector(room: Room, selector: str) -> Horse | None:
    """Resolve a user-supplied selector to a horse in ``room``.

    A selector made only of decimal digits is first treated as a horse index.
    If no horse has that index, or the selector is not numeric, it is matched
    against horse names after normalization.

    Args:
        room: Room whose horses are searched.
        selector: Horse index or horse name as typed by the user.

    Returns:
        The matching horse, or None when nothing matches.
    """
    selector = selector.strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects with ValueError.
    if selector.isdecimal():
        target_index = int(selector)
        for horse in room.horses.values():
            if horse.index == target_index:
                return horse
        # No such index: fall through so a horse whose name is all digits can
        # still be selected by name.
    normalized_selector = _normalize_horse_name(selector)
    for horse in room.horses.values():
        if _normalize_horse_name(horse.name) == normalized_selector:
            return horse
    return None
def _describe_points_delta(delta: int) -> str:
if delta >= 300:
return "血赚翻倍"
if delta >= 150:
return "大赚特赚"
if delta > 0:
return "小有收获"
if delta == 0:
return "稳住不亏"
if delta <= -300:
return "倾家荡产"
if delta <= -150:
return "伤筋动骨"
return "略有损失"
def _build_point_changes(room: Room, odds: dict[str, float]) -> tuple[dict[str, int], dict[str, str]]:
    """Compute each user's net points change for the race, plus a label for it.

    Every horse owner is credited the participation reward, every bet is
    debited from its bettor, the champion's owner is credited the champion
    reward, and each bet on the champion is credited ``int(amount * odds)``.

    Returns:
        ``(point_changes, point_summaries)``, both keyed by user id.
    """
    deltas: dict[str, int] = {}

    def _apply(user_id: str, amount: int) -> None:
        deltas[user_id] = deltas.get(user_id, 0) + amount

    # Participation reward for every horse owner.
    for entrant in room.horses.values():
        _apply(entrant.owner_id, config.PARTICIPANT_REWARD)

    # Stakes leave the bettors' balances whether or not the bet wins.
    for wager in room.bets:
        _apply(wager.user_id, -wager.amount)

    winner = room.horses.get(room.champion_name)
    if winner:
        _apply(winner.owner_id, config.CHAMPION_REWARD)

    # Winning bets are paid at the odds of the champion horse.
    for wager in room.bets:
        if wager.horse_name != room.champion_name:
            continue
        _apply(wager.user_id, int(wager.amount * odds.get(wager.horse_name, config.MIN_ODDS)))

    labels = {user_id: _describe_points_delta(change) for user_id, change in deltas.items()}
    return deltas, labels
async def _format_point_change_lines(room: Room, point_changes: dict[str, int], point_summaries: dict[str, str], name_map: dict[str, str]) -> list[str]:
    """Build the "points change" section of the race result message.

    Users are listed once each: horse owners first in horse-index order, then
    bettors in bet order.  Each line shows the display name, the signed
    delta, its summary label and the balance read from the points service.

    Args:
        room: Finished room supplying horses and bets.
        point_changes: Net delta per user id; missing users count as 0.
        point_summaries: Label per user id; derived from the delta if missing.
        name_map: Display name per user id; the id itself is used if missing.

    Returns:
        The header line followed by one line per user.
    """
    # dict.fromkeys keeps first-seen order while dropping repeats.
    ordered_user_ids = list(dict.fromkeys(
        [horse.owner_id for horse in _get_horses_in_order(room)]
        + [bet.user_id for bet in room.bets]
    ))
    lines = ["积分变化:"]
    for user_id in ordered_user_ids:
        delta = point_changes.get(user_id, 0)
        summary = point_summaries.get(user_id, _describe_points_delta(delta))
        display_name = name_map.get(user_id, user_id)
        balance = await points_service.get_balance(user_id)
        # Closing parenthesis added: the balance note was left unbalanced.
        lines.append(f" {display_name} {delta:+d} 积分 · {summary}(余额: {balance})")
    return lines
def calculate_odds(room: Room) -> dict[str, float]:
    """Return the current odds for every horse in ``room``.

    A horse's odds are the total pool divided by the amount bet on that
    horse, rounded to two decimals and never below ``config.MIN_ODDS``.  A
    horse with nothing bet on it gets ``config.MIN_ODDS``.
    """
    pool = 0
    staked_per_horse: dict[str, int] = {}
    for wager in room.bets:
        pool += wager.amount
        staked_per_horse[wager.horse_name] = staked_per_horse.get(wager.horse_name, 0) + wager.amount

    odds = {}
    for name in room.horses:
        staked = staked_per_horse.get(name, 0)
        if staked == 0:
            odds[name] = config.MIN_ODDS
            continue
        odds[name] = max(config.MIN_ODDS, round(pool / staked, 2))
    return odds
async def settle_race(room: Room) -> tuple[RaceResult, dict[str, float]] | None:
    """Pay out rewards and winning bets for a finished race and store the result.

    Returns:
        ``(result, odds)`` where ``odds`` are the odds used for the payouts,
        or None when ``room.champion_name`` does not name a horse in the room
        (in which case nothing is paid or stored).
    """
    winner = room.horses.get(room.champion_name)
    if not winner:
        return None

    # Participation reward for every owner, then the champion bonus.
    for entrant in room.horses.values():
        await points_service.reward_participant(entrant.owner_id)
    await points_service.reward_champion(winner.owner_id)

    # Pay out bets placed on the champion.
    odds = calculate_odds(room)
    for wager in room.bets:
        if wager.horse_name != room.champion_name:
            continue
        winning_odds = odds.get(wager.horse_name, config.MIN_ODDS)
        await points_service.payout_winnings(wager.user_id, wager.amount, winning_odds)

    # Record the outcome.
    point_changes, point_summaries = _build_point_changes(room, odds)
    staked_per_horse = {}
    for name in room.horses:
        staked_per_horse[name] = sum(w.amount for w in room.bets if w.horse_name == name)
    participant_names = [entrant.name for entrant in _get_horses_in_order(room)]
    record = RaceResult(
        race_id=str(uuid.uuid4()),
        scope=room.scope,
        champion_name=winner.name,
        champion_owner=winner.owner_id,
        participants=participant_names,
        bet_distribution=staked_per_horse,
        duration_ticks=room.tick_count,
        completed_at=datetime.now(),
        point_changes=point_changes,
        point_change_summaries=point_summaries,
        odds_snapshot=odds,
    )
    await room_store.save_race_result(record)
    return record, odds
async def _send_to_scope(bot: Bot, scope: str, message: str, message_type: str = "race_update"):
    """Send ``message`` to ``scope`` through the message service.

    When ``config.RACE_RENDER_AS_IMAGE`` is set the text is rendered to an
    image first.  Any exception from rendering or sending is logged as a
    warning and swallowed.
    """
    try:
        payload: str | Message = (
            _build_race_image_message(message) if config.RACE_RENDER_AS_IMAGE else message
        )
        await message_service.send_with_recall(bot, scope, message_type, payload)
    except Exception as e:
        logger.warning(f"_send_to_scope failed for {scope} [{message_type}]: {e}")
def _release_race_room(scope: str) -> None:
    """Run the end-of-race cleanup calls for ``scope`` (race task, room, pending recalls)."""
    race_engine.stop_race(scope)
    room_store.delete_room(scope)
    message_service.clear_pending_recalls(scope)


async def run_race_with_settlement(bot: Bot, room: Room, scope: str):
    """Run a race to completion, announce progress, settle it and clean up.

    Flow:
      1. Mark the room RUNNING and announce the line-up.
      2. Tick the race engine every ``config.RACE_TICK_INTERVAL`` seconds,
         announcing progress, until a tick reports finished horses.
      3. Settle rewards and bets, then send the result message.
      4. Release the room so the scope can host a new race.

    If the task is cancelled during step 2 the room is marked INTERRUPTED,
    all bets are refunded and the function returns without further cleanup.
    If step 2 fails with any other exception the bets are refunded the same
    way, the room is released and the exception is re-raised.  The room is
    also released when step 3 fails, so an error there cannot leave the
    scope permanently occupied.

    Args:
        bot: Bot used for outgoing messages and name lookups.
        room: Room being raced; mutated in place.
        scope: Scope key of the room.
    """
    room.state = RoomState.RUNNING
    horse_list = "\n".join(f" {_format_horse_label(h)}" for h in _get_horses_in_order(room))
    await _send_to_scope(bot, scope, f"比赛开始!\n{horse_list}", "race_update")

    # Race loop with progress updates.
    try:
        while room.state == RoomState.RUNNING:
            await asyncio.sleep(config.RACE_TICK_INTERVAL)
            finished = race_engine.tick(room)
            progress = race_engine.format_progress(room)
            await _send_to_scope(bot, scope, progress, "race_update")
            if finished:
                champion = race_engine.determine_champion(finished)
                room.champion_name = champion.name
                room.state = RoomState.FINISHED
                break
    except asyncio.CancelledError:
        room.state = RoomState.INTERRUPTED
        for bet in room.bets:
            await points_service.refund_bet_points(bet.user_id, bet.amount, "比赛中断退还")
        return
    except Exception:
        # A crash in the engine would otherwise leave the room RUNNING forever
        # with every stake locked in; treat it like an interruption.
        logger.exception("race loop crashed for %s; refunding bets", scope)
        room.state = RoomState.INTERRUPTED
        try:
            for bet in room.bets:
                await points_service.refund_bet_points(bet.user_id, bet.amount, "比赛中断退还")
        finally:
            _release_race_room(scope)
        raise

    try:
        # Settle (returns (result, odds) or None).
        settlement = await settle_race(room)
        result, odds = settlement if settlement else (None, {})

        # Build user_id -> display name mapping.
        all_user_ids = [h.owner_id for h in room.horses.values()] + [b.user_id for b in room.bets]
        name_map = await _build_name_map(bot, scope, all_user_ids)

        champion = room.horses.get(room.champion_name)
        champion_name_display = name_map.get(champion.owner_id, champion.owner_id) if champion else '?'
        result_lines = [
            f"比赛结束!冠军:{_format_horse_label(champion) if champion else room.champion_name}",
            f"马主 {champion_name_display} 获得 {config.CHAMPION_REWARD} 积分",
        ]

        winning_bets = [b for b in room.bets if b.horse_name == room.champion_name]
        if winning_bets:
            result_lines.append("下注中奖:")
            for b in winning_bets:
                payout = int(b.amount * odds.get(b.horse_name, config.MIN_ODDS))
                bettor_name = name_map.get(b.user_id, b.user_id)
                result_lines.append(f" {bettor_name} 下注 {b.amount} -> 获得 {payout}")

        if result:
            result_lines.extend(
                await _format_point_change_lines(room, result.point_changes, result.point_change_summaries, name_map)
            )

        # Recall the last progress update before posting the final result.
        await message_service.recall_previous_of_type(bot, scope, "race_update")
        await _send_to_scope(bot, scope, "\n".join(result_lines), "race_result")
    finally:
        # Always release the room, even if settlement or reporting failed.
        _release_race_room(scope)
# --- Commands ---
register_cmd = on_command("赛马报名", priority=5)


@register_cmd.handle()
async def handle_register(bot: Bot, event: Event):
    """Register the sender's horse in the room for the current scope.

    The horse name is taken from the command argument; if none is given the
    sender's last used horse name is reused, and failing that the sender's
    display name (cut to 10 characters).  Registration is refused when the
    name is longer than 10 characters, the room is not in the WAITING state,
    the room already has 8 horses, the name is already taken, or the sender
    already has a horse.  The room is created on demand.
    """
    if not await check_access(bot, event):
        await register_cmd.finish("无权限访问此功能")
        return

    msg = str(event.get_message()).strip()
    parts = msg.split(None, 1)
    user_id = get_event_id(event)
    scope = get_scope(event)

    if len(parts) > 1:
        horse_name = parts[1].strip()
    else:
        horse_name = await room_store.get_last_horse_name(user_id) or ""
    if not horse_name:
        horse_name = await _get_user_name(bot, scope, user_id)
        # Ensure name is not too long when using nickname as default.
        if len(horse_name) > 10:
            horse_name = horse_name[:10]
    if len(horse_name) > 10:
        await register_cmd.finish("马匹名不能超过10个字符")
        return

    lock = room_store.get_lock(scope)
    async with lock:
        room = room_store.get_room(scope)
        if not room:
            room = await room_store.create_room(scope)
        if room.state != RoomState.WAITING:
            await register_cmd.finish("比赛正在进行中,无法报名")
            return
        if len(room.horses) >= 8:
            await register_cmd.finish("房间已满最多8匹马")
            return
        duplicate_horse = _find_duplicate_horse(room, horse_name)
        if duplicate_horse:
            await register_cmd.finish(f"马匹名 \"{horse_name}\" 已被 {_format_horse_label(duplicate_horse)} 使用")
            return
        existing_user_horse = _find_user_horse(room, user_id)
        if existing_user_horse:
            await register_cmd.finish(f"你已经报名了,当前马匹为 {_format_horse_label(existing_user_horse)}")
            return

        horse_index = room.next_horse_index
        room.next_horse_index += 1
        room.horses[horse_name] = Horse(owner_id=user_id, name=horse_name, index=horse_index)
        await room_store.set_last_horse_name(user_id, horse_name)
        # Snapshot the values for the reply while the lock is still held.
        count = len(room.horses)
        registered_horse = room.horses[horse_name]

    # Closing parenthesis added: the "(n/8" suffix was left unbalanced.
    await register_cmd.finish(f"报名成功!{_format_horse_label(registered_horse)} 已加入比赛({count}/8)")
cancel_cmd = on_command("赛马取消报名", priority=5)


@cancel_cmd.handle()
async def handle_cancel(bot: Bot, event: Event):
    """Withdraw the sender's horse and refund every bet placed on it.

    Only possible while the room exists and is still in the WAITING state.
    """
    if not await check_access(bot, event):
        await cancel_cmd.finish("无权限访问此功能")
        return

    scope = get_scope(event)
    user_id = get_event_id(event)
    async with room_store.get_lock(scope):
        room = room_store.get_room(scope)
        if not room:
            await cancel_cmd.finish("房间不存在")
            return
        if room.state != RoomState.WAITING:
            await cancel_cmd.finish("比赛正在进行中,无法取消报名")
            return
        withdrawn = _find_user_horse(room, user_id)
        if not withdrawn:
            await cancel_cmd.finish("你还没有报名")
            return

        # Refund bets on the withdrawn horse and keep all the others.
        remaining_bets = []
        for wager in room.bets:
            if wager.horse_name == withdrawn.name:
                await points_service.refund_bet_points(wager.user_id, wager.amount, "取消报名退还下注")
            else:
                remaining_bets.append(wager)
        room.bets = remaining_bets

        del room.horses[withdrawn.name]
        await cancel_cmd.finish(f"已取消报名,{_format_horse_label(withdrawn)} 已退出")
bet_cmd = on_command("赛马下注", priority=5)


@bet_cmd.handle()
async def handle_bet(bot: Bot, event: Event):
    """Place a bet for the sender on a horse in the current room.

    Expected form: ``/赛马下注 <index|horse name> <amount>``.  The amount must
    be a positive integer of at least ``config.MIN_BET``.  The room must
    exist and be in the WAITING state, the selector must resolve to a horse,
    and the points service must accept the spend.  On success the bet is
    recorded and the horse's current odds are reported.
    """
    if not await check_access(bot, event):
        await bet_cmd.finish("无权限访问此功能")
        return

    msg = str(event.get_message()).strip()
    parts = msg.split()
    if len(parts) < 3:
        await bet_cmd.finish("请使用:/赛马下注 <序号|马匹名> <金额>")
        return
    horse_selector = parts[1]
    try:
        amount = int(parts[2])
    except ValueError:
        await bet_cmd.finish("金额必须是正整数")
        return
    # Reject zero and negative stakes explicitly instead of relying on
    # MIN_BET being configured as a positive number.
    if amount <= 0:
        await bet_cmd.finish("金额必须是正整数")
        return
    if amount < config.MIN_BET:
        await bet_cmd.finish(f"最低下注金额为 {config.MIN_BET}")
        return

    scope = get_scope(event)
    user_id = get_event_id(event)
    lock = room_store.get_lock(scope)
    async with lock:
        room = room_store.get_room(scope)
        if not room:
            await bet_cmd.finish("房间不存在,请先报名")
            return
        if room.state != RoomState.WAITING:
            await bet_cmd.finish("比赛正在进行中,无法下注")
            return
        horse = _resolve_horse_selector(room, horse_selector)
        if not horse:
            await bet_cmd.finish(f"马匹序号/名称 \"{horse_selector}\" 不存在")
            return
        success, balance = await points_service.spend_bet_points(user_id, amount, f"下注 {_format_horse_label(horse)}")
        if not success:
            # Closing parenthesis added: the balance note was left unbalanced.
            await bet_cmd.finish(f"积分不足(当前余额:{balance})")
            return
        room.bets.append(Bet(user_id=user_id, horse_name=horse.name, amount=amount))
        odds = calculate_odds(room)
        # Closing parenthesis added here as well.
        await bet_cmd.finish(f"下注成功!{_format_horse_label(horse)} {amount}积分(当前赔率:{odds.get(horse.name, config.MIN_ODDS):.2f})")
odds_cmd = on_command("赛马赔率", priority=5)


@odds_cmd.handle()
async def handle_odds(bot: Bot, event: Event):
    """Reply with the current odds and bet totals for each horse in the room."""
    if not await check_access(bot, event):
        await odds_cmd.finish("无权限访问此功能")
        return

    room = room_store.get_room(get_scope(event))
    if not room:
        await odds_cmd.finish("房间不存在,请先报名")
        return
    if not room.horses:
        await odds_cmd.finish("还没有马匹报名")
        return

    odds = calculate_odds(room)
    # Aggregate the pool and the per-horse stakes in one pass over the bets.
    pool = 0
    staked_per_horse: dict[str, int] = {}
    for wager in room.bets:
        pool += wager.amount
        staked_per_horse[wager.horse_name] = staked_per_horse.get(wager.horse_name, 0) + wager.amount

    report = ["当前赔率:"]
    for entrant in _get_horses_in_order(room):
        odd = odds.get(entrant.name, config.MIN_ODDS)
        horse_bet = staked_per_horse.get(entrant.name, 0)
        report.append(f" {_format_horse_label(entrant)} - {odd:.2f}倍 (总下注: {horse_bet})")
    report.append(f"总下注池: {pool}")
    await odds_cmd.finish("\n".join(report))
start_cmd = on_command("赛马开赛", priority=5)


@start_cmd.handle()
async def handle_start(bot: Bot, event: Event):
    """Start the race for the current scope in a background task.

    Requires an existing room in the WAITING state with at least two horses.
    The room is switched to RUNNING while the scope lock is held and before
    the first await, so a concurrent start, registration or bet cannot still
    observe WAITING once this handler has committed to starting.  If the
    start announcement cannot be sent the room is put back to WAITING and
    the error propagates, so the start can be retried.
    """
    if not await check_access(bot, event):
        await start_cmd.finish("无权限访问此功能")
        return

    scope = get_scope(event)
    lock = room_store.get_lock(scope)
    async with lock:
        room = room_store.get_room(scope)
        if not room:
            await start_cmd.finish("房间不存在,请先报名")
            return
        if room.state != RoomState.WAITING:
            await start_cmd.finish("比赛已经在进行中")
            return
        if len(room.horses) < 2:
            await start_cmd.finish("至少需要2匹马才能开赛")
            return

        # Claim the room now; the background task sets RUNNING again when it
        # starts, which is harmless.
        room.state = RoomState.RUNNING
        # Set all horses to racing state.
        for horse in room.horses.values():
            horse.state = HorseState.RACING

        try:
            await start_cmd.send("比赛开始!")
        except BaseException:
            # No race task exists yet, so release the claim before failing.
            room.state = RoomState.WAITING
            raise

        # Run race in background (outside command handler).
        task = asyncio.create_task(run_race_with_settlement(bot, room, scope))
        race_engine.register_task(scope, task)
help_cmd = on_command("赛马帮助", priority=5)
@help_cmd.handle()
async def handle_help(bot: Bot, event: Event):
    """Reply with the help text for the horse racing game.

    The text lists the commands, rules, rewards, odds explanation and game
    flow, and interpolates ``config.MIN_BET``, ``config.CHAMPION_REWARD`` and
    ``config.MIN_ODDS``.  Unlike the other handlers in this module, no access
    check is performed here; ``bot`` and ``event`` are not used.
    """
    # The literal below is sent verbatim, so its line breaks are part of the message.
    help_text = f"""🏇 赛马游戏帮助
📌 命令列表:
/赛马报名 <马匹名> - 报名参赛最多8匹马
/赛马报名 - 复用上次绑定的马名,若无则使用群昵称
/赛马取消报名 - 取消报名并退还下注
/赛马下注 <序号|马匹名> <金额> - 下注
/赛马赔率 - 查看当前赔率和下注池
/赛马开赛 - 开始比赛至少2匹马
/赛马帮助 - 显示此帮助
📏 规则说明:
• 最低下注金额:{config.MIN_BET} 积分
• 参赛马匹上限8匹
• 开赛要求至少2匹马报名
💰 奖励机制:
• 冠军马主:获得 {config.CHAMPION_REWARD} 积分
• 下注中奖:下注金额 × 赔率
📊 赔率说明:
• 赔率根据各马匹下注总额动态计算
• 下注越少的马,赔率越高
• 最低赔率:{config.MIN_ODDS}
🎮 游戏流程:
1⃣ 玩家报名并绑定马匹名
2⃣ 玩家可以给任意马匹下注
3⃣ 满足开赛条件后,任意玩家可开赛
4⃣ 比赛实时进行,定期播报进度
5⃣ 比赛结束后结算积分和奖金"""
    await help_cmd.finish(help_text)