Files
DanDingNoneBot/danding_bot/plugins/group_horse_racing/race_engine.py
Mr.Xia fe081f43cf fix(race): 代码质量审查修复 + commands包拆分 + 赛马取消命令
- P1: bet.py赔率计算移入锁内防竞态
- P1: config.py TESTERS解析失败添加warning日志
- P2: 新增赛马取消命令(积分退还/任务取消/状态重置)
- P3: bet.py清理未使用的_send_to_scope导入
- 将commands.py拆分为commands/包(access/bet/help/race/register)
- OpenSpec变更提案: fix-race-conditions-and-logs
2026-05-02 14:33:34 +08:00

86 lines
3.1 KiB
Python

import asyncio
import random
import unicodedata
from typing import Optional

from .models import Room, RoomState, Horse, HorseState
from .config import Config
class RaceEngine:
    """Advance horse races tick by tick and keep track of their asyncio tasks.

    The engine itself is stateless with respect to a single race: all race
    state lives on the ``Room`` passed to each method. The only state held
    here is ``active_tasks``, a mapping from a scope key to the task that
    was registered for it.
    """

    # Upper bound on tie-break rounds in determine_champion (safety valve).
    MAX_ROUNDS = 100
    # Number of glyphs in each rendered progress bar.
    BAR_WIDTH = 20
    # Glyphs for the covered / remaining part of the bar. They must be
    # non-empty strings, otherwise the bar collapses to nothing.
    BAR_FILLED = "█"
    BAR_EMPTY = "░"

    def __init__(self, config: Config):
        """Store *config* and start with no registered tasks."""
        self.config = config
        self.active_tasks: dict[str, asyncio.Task] = {}

    def tick(self, room: Room) -> list[Horse]:
        """Advance the race in *room* by one tick.

        Increments ``room.tick_count`` and moves every horse whose state is
        ``HorseState.RACING`` forward by a Gaussian-distributed step
        (mean 10, standard deviation 3) that is never smaller than 0.5.

        Returns:
            Every horse in the room whose position has reached
            ``config.RACE_DISTANCE``, irrespective of its state.
        """
        room.tick_count += 1
        for horse in room.horses.values():
            if horse.state == HorseState.RACING:
                # Floor the step at 0.5 so a horse always makes progress.
                horse.position += max(0.5, random.gauss(10, 3))
        finish_line = self.config.RACE_DISTANCE
        return [h for h in room.horses.values() if h.position >= finish_line]

    def determine_champion(self, horses: list[Horse]) -> Horse:
        """Pick a single champion out of *horses* that finished together.

        Each round draws a random sprint distance per contender and keeps
        only those that share the maximum. After ``MAX_ROUNDS`` rounds
        without a unique leader, one of the remaining contenders is chosen
        at random. The caller's list is not modified.

        Raises:
            ValueError: if *horses* is empty.
        """
        if not horses:
            raise ValueError("determine_champion() requires at least one horse")
        contenders = list(horses)
        if len(contenders) == 1:
            return contenders[0]
        for _ in range(self.MAX_ROUNDS):
            sprints = [max(0, random.gauss(10, 3)) for _ in contenders]
            best = max(sprints)
            contenders = [h for h, d in zip(contenders, sprints) if d == best]
            if len(contenders) == 1:
                return contenders[0]
        # Fallback: random choice after max rounds.
        return random.choice(contenders)

    def register_task(self, scope: str, task: asyncio.Task):
        """Remember *task* as the race task for *scope*.

        Any task previously stored under the same scope is replaced in the
        mapping without being cancelled.
        """
        self.active_tasks[scope] = task

    def stop_race(self, scope: str):
        """Forget the task registered for *scope*, cancelling it if unfinished.

        Does nothing when no task is registered for *scope*.
        """
        task = self.active_tasks.pop(scope, None)
        if task is not None and not task.done():
            task.cancel()

    @staticmethod
    def _display_width(text: str) -> int:
        """Return the column width of *text*: 2 per wide/fullwidth char, else 1."""
        return sum(
            2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1
            for ch in text
        )

    def format_progress(self, room: Room) -> str:
        """Render the state of the race in *room* as text, one line per horse.

        The first line is a header carrying ``room.tick_count``. Horses
        follow in ascending ``index`` order, each with a fixed-width bar
        showing ``position / config.RACE_DISTANCE`` clamped to [0, 1] and
        the raw position in metres. Names are padded to a common display
        width so the bars line up. A room without horses yields only the
        header line.
        """
        distance = self.config.RACE_DISTANCE
        bar_width = self.BAR_WIDTH
        lines = [f"【第{room.tick_count}回合】"]
        sorted_horses = sorted(room.horses.values(), key=lambda h: h.index)
        name_widths = [self._display_width(h.name) for h in sorted_horses]
        # default=0 keeps an empty room from raising ValueError.
        max_name_width = max(name_widths, default=0)
        for horse, name_width in zip(sorted_horses, name_widths):
            if distance > 0:
                progress = min(max(horse.position / distance, 0.0), 1.0)
            else:
                # A non-positive race distance is already covered.
                progress = 1.0
            filled = int(progress * bar_width)
            bar = self.BAR_FILLED * filled + self.BAR_EMPTY * (bar_width - filled)
            # Pad with fullwidth spaces (2 columns each); an odd remainder
            # is covered by one ordinary space.
            diff = max_name_width - name_width
            pad = "\u3000" * (diff // 2) + (" " if diff % 2 else "")
            lines.append(
                f" {horse.index:02d}{horse.name}{pad} |{bar}| {horse.position:.1f}m"
            )
        return "\n".join(lines)