Files
DanDingNoneBot/danding_bot/plugins/danding_qqpush/sender.py
Mr.Xia f240ba2882 perf+fix(danding_qqpush): perf优化+安全修复+代码DRY
- image_render: cached draw object, font.getlength() 替代逐字符创建临时Image
- image_render: 移除PNG无效的quality参数
- api.py: ImageRenderer单例复用(避免每请求重载字体)
- api.py: 异常详情不再泄露到API响应
- sender.py: 提取_send_msg()消除重复代码
2026-05-09 23:46:44 +08:00

90 lines
2.8 KiB
Python

"""消息发送模块 - 负责向 QQ 群发送消息"""
from typing import Optional
from nonebot import get_bots, logger
from nonebot.adapters.onebot.v11 import Bot, Message, MessageSegment
class MessageSender:
"""消息发送器"""
def __init__(self):
"""初始化消息发送器"""
self.bot: Optional[Bot] = None
def set_bot(self, bot: Bot):
"""
设置 Bot 实例
Args:
bot: OneBot V11 Bot 实例
"""
self.bot = bot
def get_bot(self) -> Optional[Bot]:
"""
获取 Bot 实例
Returns:
Bot 实例,如果未设置则尝试从全局获取
"""
if self.bot:
return self.bot
# 尝试从全局获取 Bot
try:
bots = get_bots()
if bots:
bot = list(bots.values())[0]
self.bot = bot
return bot
except Exception as e:
logger.warning(f"[QqPush] 获取全局Bot失败: {e}")
return None
async def _send_msg(self, group_id: int, qq: int, segment) -> dict:
"""
内部通用发送方法(@用户 + 任意消息段)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
segment: MessageSegment 实例
Returns:
发送结果字典
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
message = Message()
message.append(MessageSegment.at(qq))
message.append(segment)
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message,
__qqpush_source="danding_qqpush"
)
return {"success": True, "data": result, "message": "消息发送成功"}
except Exception as e:
logger.warning(f"[QqPush] 消息发送失败 group={group_id} qq={qq}: {e}")
return {"success": False, "error": str(e), "message": f"消息发送失败: {e}"}
async def send_to_group(self, group_id: int, qq: int, image_base64: str) -> dict:
"""向指定群发送消息(@用户 + 图片)"""
return await self._send_msg(group_id, qq, MessageSegment.image(image_base64))
async def send_text_to_group(self, group_id: int, qq: int, text: str) -> dict:
"""向指定群发送纯文本消息(@用户 + 文本)"""
return await self._send_msg(group_id, qq, MessageSegment.text(text))
# 全局消息发送器实例
sender = MessageSender()