Files
DanDingNoneBot/danding_bot/plugins/danding_qqpush/__init__.py
Mr.Xia c62ac37611 review: fix critical/medium bugs in 4 plugins (round 2)
group_horse_racing:
- settle_race: rewrite with 7 bug fixes (race condition, draw double-credit, empty participants, etc.)
- models.py: reorder fields for correct defaults, add indexes
- message_service: add logger import

danding_points:
- api.py: add finally blocks to 3 methods (add_points, get_history, get_leaderboard)
- database.py: add finally block to get_user_balance

chatai:
- __init__.py: deprecated API→asyncio.to_thread, deduplicate logging, taskkill filter for safety
- screenshot.py: XSS protection with bleach on HTML content
- requirements.txt: add bleach dependency

danding_qqpush:
- api.py L13: fix self-referencing _renderer NameError crash
- api.py: lazy singleton pattern via _get_renderer() instead of per-request ImageRenderer
- __init__.py: mask Token in log output (security)

All 34 tests pass.
2026-05-10 00:30:22 +08:00

67 lines
1.8 KiB
Python

"""Danding_QqPush 插件初始化模块"""
from nonebot import get_driver
from nonebot.log import logger
from nonebot.plugin import PluginMetadata
from .config import Config
from .api import create_routes
from .sender import sender
# Plugin metadata: name, description, usage help text and the config model.
# The usage text documents the POST /danding/qqpush/{token} endpoint and its
# JSON body (group_id, qq, text). It is a runtime string and is kept verbatim.
__plugin_meta__ = PluginMetadata(
    name="danding_qqpush",
    description="通过外部 HTTP API 向 QQ 群定向推送通知",
    usage="""
API 接口:
POST /danding/qqpush/{token}
请求参数:
{
"group_id": 123456789,
"qq": 987654321,
"text": "系统告警#数据库连接失败#请立即处理"
}
说明:
- text 中的 # 表示换行
- 消息会自动渲染为图片并发送到指定群
""",
    config=Config,
)
# Load the plugin configuration: dump the driver's global config and validate
# it against this plugin's Config model. Runs once, at import time.
plugin_config = Config.model_validate(get_driver().config.model_dump())
def register_routes():
    """Build the push API router and mount it on the driver's server app.

    The router is created by ``create_routes`` from the configured token and
    the plugin config, then included into ``driver.server_app``. The
    registered path is logged with the token masked: only its first four
    characters are shown, and a token of four characters or fewer is hidden
    completely.
    """
    drv = get_driver()
    token = plugin_config.Token

    # Create the router and attach it to the server application.
    router = create_routes(token, plugin_config)
    drv.server_app.include_router(router)

    # Never write the full token to the log.
    if len(token) > 4:
        shown = token[:4] + "***"
    else:
        shown = "***"
    logger.info(f"[Danding_QqPush] API 路由已注册: /danding/qqpush/{shown}")
# Register the HTTP routes once, when the plugin module is imported.
try:
    register_routes()
    logger.info("[Danding_QqPush] 插件加载成功")
except Exception as exc:
    # Best-effort: the failure is logged rather than re-raised, so importing
    # this module continues. The exception is attached to the record so the
    # log carries the full traceback, not just the message text.
    logger.opt(exception=exc).error(f"[Danding_QqPush] 插件加载失败: {exc}")
# Hand each newly connected bot to the shared sender.
driver = get_driver()


@driver.on_bot_connect
async def _(bot):
    """Pass the connected bot to ``sender`` and log its ``self_id``.

    Any exception raised while doing so is caught and logged as an error
    instead of propagating out of the hook.
    """
    try:
        sender.set_bot(bot)
        bot_id = bot.self_id
        logger.info(f"[Danding_QqPush] Bot 已连接: {bot_id}")
    except Exception as exc:
        logger.error(f"[Danding_QqPush] Bot 连接初始化失败: {exc}")