group_horse_racing: - settle_race: rewrite with 7 bug fixes (race condition, draw double-credit, empty participants, etc.) - models.py: reorder fields for correct defaults, add indexes - message_service: add logger import danding_points: - api.py: add finally blocks to 3 methods (add_points, get_history, get_leaderboard) - database.py: add finally block to get_user_balance chatai: - __init__.py: deprecated API→asyncio.to_thread, deduplicate logging, taskkill filter for safety - screenshot.py: XSS protection with bleach on HTML content - requirements.txt: add bleach dependency danding_qqpush: - api.py L13: fix self-referencing _renderer NameError crash - api.py: lazy singleton pattern via _get_renderer() instead of per-request ImageRenderer - __init__.py: mask Token in log output (security) All 34 tests pass.
67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
"""Danding_QqPush 插件初始化模块"""
|
|
from nonebot import get_driver
|
|
from nonebot.log import logger
|
|
from nonebot.plugin import PluginMetadata
|
|
|
|
from .config import Config
|
|
from .api import create_routes
|
|
from .sender import sender
|
|
|
|
|
|
__plugin_meta__ = PluginMetadata(
|
|
name="danding_qqpush",
|
|
description="通过外部 HTTP API 向 QQ 群定向推送通知",
|
|
usage="""
|
|
API 接口:
|
|
POST /danding/qqpush/{token}
|
|
|
|
请求参数:
|
|
{
|
|
"group_id": 123456789,
|
|
"qq": 987654321,
|
|
"text": "系统告警#数据库连接失败#请立即处理"
|
|
}
|
|
|
|
说明:
|
|
- text 中的 # 表示换行
|
|
- 消息会自动渲染为图片并发送到指定群
|
|
""",
|
|
config=Config,
|
|
)
|
|
|
|
|
|
# 加载配置
|
|
plugin_config = Config.model_validate(get_driver().config.model_dump())
|
|
|
|
|
|
def register_routes():
|
|
"""注册 FastAPI 路由"""
|
|
driver = get_driver()
|
|
|
|
# 创建并注册路由
|
|
routes = create_routes(plugin_config.Token, plugin_config)
|
|
driver.server_app.include_router(routes)
|
|
|
|
masked = plugin_config.Token[:4] + "***" if len(plugin_config.Token) > 4 else "***"
|
|
logger.info(f"[Danding_QqPush] API 路由已注册: /danding/qqpush/{masked}")
|
|
|
|
|
|
# 插件加载时注册路由
|
|
try:
|
|
register_routes()
|
|
logger.info("[Danding_QqPush] 插件加载成功")
|
|
except Exception as e:
|
|
logger.error(f"[Danding_QqPush] 插件加载失败: {str(e)}")
|
|
|
|
|
|
# Bot 连接时自动初始化 sender
|
|
driver = get_driver()
|
|
@driver.on_bot_connect
|
|
async def _(bot):
|
|
"""Bot 连接时自动设置 sender"""
|
|
try:
|
|
sender.set_bot(bot)
|
|
logger.info(f"[Danding_QqPush] Bot 已连接: {bot.self_id}")
|
|
except Exception as e:
|
|
logger.error(f"[Danding_QqPush] Bot 连接初始化失败: {e}")
|