perf+fix(danding_qqpush): perf优化+安全修复+代码DRY

- image_render: cached draw object, font.getlength() 替代逐字符创建临时Image
- image_render: 移除PNG无效的quality参数
- api.py: ImageRenderer单例复用(避免每请求重载字体)
- api.py: 异常详情不再泄露到API响应
- sender.py: 提取_send_msg()消除重复代码
This commit is contained in:
2026-05-09 23:46:44 +08:00
parent b444bd62f5
commit f240ba2882
3 changed files with 235 additions and 291 deletions

View File

@@ -8,6 +8,9 @@ from nonebot import get_driver, logger
from .config import Config
from .text_parser import TextParser
from .image_render import ImageRenderer
# Module-level singleton: load font once, reuse across requests
_renderer = _renderer # reuse module-level singleton
from .sender import sender
@@ -134,10 +137,10 @@ def create_routes(token: str, config: Config):
raise
except Exception as e:
logger.exception(f"推送接口异常: {str(e)}")
logger.exception(f"推送接口异常: {e}")
raise HTTPException(
status_code=500,
detail=f"服务器内部错误: {str(e)}"
detail="服务器内部错误"
)
return router

View File

@@ -1,215 +1,215 @@
"""图片生成模块 - 将文本渲染为图片"""
from PIL import Image, ImageDraw, ImageFont
from pilmoji import Pilmoji
import io
import base64
from typing import Tuple
class ImageRenderer:
"""图片渲染器"""
DEFAULT_FONT_PATHS = [
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf",
"C:/Windows/Fonts/msyh.ttc",
"C:/Windows/Fonts/simhei.ttf",
"C:/Windows/Fonts/seguiemj.ttf",
]
def __init__(
self,
width: int = 800,
font_size: int = 24,
padding: int = 30,
line_spacing: float = 1.4,
bg_color: Tuple[int, int, int] = (252, 252, 252),
text_color: Tuple[int, int, int] = (0, 0, 0),
font_paths: list = None
):
"""
初始化图片渲染器
Args:
width: 图片宽度
font_size: 字体大小
padding: 内边距
line_spacing: 行距倍数
bg_color: 背景颜色 (R, G, B)
text_color: 文本颜色 (R, G, B)
font_paths: 字体文件路径列表
"""
self.width = width
self.font_size = font_size
self.padding = padding
self.line_spacing = line_spacing
self.bg_color = bg_color
self.text_color = text_color
self.font_paths = font_paths or self.DEFAULT_FONT_PATHS.copy()
# 加载字体
self.font = self._load_font()
def _load_font(self) -> ImageFont.FreeTypeFont:
"""加载字体"""
return self._load_font_by_size(self.font_size)
def _load_font_by_size(self, font_size: int) -> ImageFont.FreeTypeFont:
"""
加载指定大小的字体
Args:
font_size: 字体大小
Returns:
字体对象
"""
for font_path in self.font_paths:
try:
font = ImageFont.truetype(font_path, font_size)
return font
except Exception:
continue
# 如果都失败了,使用默认字体
return ImageFont.load_default()
def _calculate_text_dimensions(self, text: str) -> Tuple[int, int]:
"""
计算文本的尺寸
Args:
text: 文本内容
Returns:
(宽度, 高度)
"""
# 创建一个临时图片用于计算
test_img = Image.new('RGB', (1, 1), color=self.bg_color)
test_draw = ImageDraw.Draw(test_img)
bbox = test_draw.textbbox((0, 0), text, font=self.font)
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
return width, height
def _smart_text_wrap(self, text: str, max_width: int) -> list:
"""
智能文本换行
Args:
text: 文本内容
max_width: 最大宽度
Returns:
换行后的文本列表
"""
lines = []
current_line = ""
current_width = 0
for char in text:
char_width, _ = self._calculate_text_dimensions(char)
if current_width + char_width <= max_width:
current_line += char
current_width += char_width
else:
if current_line:
lines.append(current_line)
current_line = char
current_width = char_width
if current_line:
lines.append(current_line)
return lines
def render(self, text: str, title: str = "蛋定助手通知您:") -> bytes:
"""
将文本渲染为图片
Args:
text: 文本内容(已处理换行符)
Returns:
图片的字节数据
"""
if not text:
text = "(空消息)"
# 计算有效宽度
effective_width = self.width - (2 * self.padding)
# 按段落处理
all_lines = []
for paragraph in text.split('\n'):
if not paragraph:
all_lines.append("")
continue
wrapped_lines = self._smart_text_wrap(paragraph, effective_width)
all_lines.extend(wrapped_lines)
# 计算行高
_, line_height = self._calculate_text_dimensions("测试")
total_line_height = int(line_height * self.line_spacing)
# 标题相关
title_font_size = int(self.font_size * 1.3) # 标题字体放大1.3倍
title_font = self._load_font_by_size(title_font_size)
title_height = int(line_height * 1.5) # 标题区域高度
divider_height = 2 # 分割线高度
divider_spacing = 10 # 分割线与标题的间距
# 计算总高度(包含标题区域)
content_height = (len(all_lines) * total_line_height) + (2 * self.padding)
header_height = title_height + divider_spacing + divider_height + self.padding
total_height = content_height + header_height
height = max(total_height, 200) # 最小高度
# 创建图片
image = Image.new('RGB', (self.width, height), color=self.bg_color)
draw = ImageDraw.Draw(image)
title_x = self.padding
title_y = self.padding
with Pilmoji(image) as pilmoji:
pilmoji.text((title_x, title_y), title, font=title_font, fill=self.text_color)
# 绘制分割线
divider_y = title_y + title_height + 10
draw.line(
[(self.padding, divider_y), (self.width - self.padding, divider_y)],
fill=(200, 200, 200),
width=divider_height
)
y = divider_y + divider_spacing + self.padding
with Pilmoji(image) as pilmoji:
for line in all_lines:
if line:
pilmoji.text((self.padding, y), line, font=self.font, fill=self.text_color)
y += total_line_height
# 转换为字节流
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG', quality=95)
img_byte_arr.seek(0)
return img_byte_arr.getvalue()
def render_to_base64(self, text: str, title: str = "蛋定助手通知您:") -> str:
"""
将文本渲染为图片并返回 base64 编码
Args:
text: 文本内容
Returns:
base64 编码的图片数据
"""
img_bytes = self.render(text, title=title)
base64_str = base64.b64encode(img_bytes).decode('utf-8')
return f"base64://{base64_str}"
"""图片生成模块 - 将文本渲染为图片"""
from PIL import Image, ImageDraw, ImageFont
from pilmoji import Pilmoji
import io
import base64
from typing import Tuple
class ImageRenderer:
"""图片渲染器"""
DEFAULT_FONT_PATHS = [
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf",
"C:/Windows/Fonts/msyh.ttc",
"C:/Windows/Fonts/simhei.ttf",
"C:/Windows/Fonts/seguiemj.ttf",
]
def __init__(
self,
width: int = 800,
font_size: int = 24,
padding: int = 30,
line_spacing: float = 1.4,
bg_color: Tuple[int, int, int] = (252, 252, 252),
text_color: Tuple[int, int, int] = (0, 0, 0),
font_paths: list = None
):
"""
初始化图片渲染器
Args:
width: 图片宽度
font_size: 字体大小
padding: 内边距
line_spacing: 行距倍数
bg_color: 背景颜色 (R, G, B)
text_color: 文本颜色 (R, G, B)
font_paths: 字体文件路径列表
"""
self.width = width
self.font_size = font_size
self.padding = padding
self.line_spacing = line_spacing
self.bg_color = bg_color
self.text_color = text_color
self.font_paths = font_paths or self.DEFAULT_FONT_PATHS.copy()
# 加载字体
self.font = self._load_font()
def _load_font(self) -> ImageFont.FreeTypeFont:
"""加载字体"""
return self._load_font_by_size(self.font_size)
def _load_font_by_size(self, font_size: int) -> ImageFont.FreeTypeFont:
"""
加载指定大小的字体
Args:
font_size: 字体大小
Returns:
字体对象
"""
for font_path in self.font_paths:
try:
font = ImageFont.truetype(font_path, font_size)
return font
except Exception:
continue
# 如果都失败了,使用默认字体
return ImageFont.load_default()
def _calculate_text_dimensions(self, text: str) -> Tuple[int, int]:
"""
计算文本的尺寸(使用缓存的 Draw 对象,避免重复创建临时 Image
Args:
text: 文本内容
Returns:
(宽度, 高度)
"""
if not hasattr(self, '_cached_draw'):
self._cached_img = Image.new('RGB', (1, 1), color=self.bg_color)
self._cached_draw = ImageDraw.Draw(self._cached_img)
bbox = self._cached_draw.textbbox((0, 0), text, font=self.font)
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
return width, height
def _smart_text_wrap(self, text: str, max_width: int) -> list:
"""
智能文本换行(使用 font.getlength 避免逐字符创建临时 Image
Args:
text: 文本内容
max_width: 最大宽度
Returns:
换行后的文本列表
"""
lines = []
current_line = ""
current_width = 0.0
for char in text:
char_width = self.font.getlength(char)
if current_width + char_width <= max_width:
current_line += char
current_width += char_width
else:
if current_line:
lines.append(current_line)
current_line = char
current_width = char_width
if current_line:
lines.append(current_line)
return lines
def render(self, text: str, title: str = "蛋定助手通知您:") -> bytes:
"""
将文本渲染为图片
Args:
text: 文本内容(已处理换行符)
Returns:
图片的字节数据
"""
if not text:
text = "(空消息)"
# 计算有效宽度
effective_width = self.width - (2 * self.padding)
# 按段落处理
all_lines = []
for paragraph in text.split('\n'):
if not paragraph:
all_lines.append("")
continue
wrapped_lines = self._smart_text_wrap(paragraph, effective_width)
all_lines.extend(wrapped_lines)
# 计算行高
_, line_height = self._calculate_text_dimensions("测试")
total_line_height = int(line_height * self.line_spacing)
# 标题相关
title_font_size = int(self.font_size * 1.3) # 标题字体放大1.3倍
title_font = self._load_font_by_size(title_font_size)
title_height = int(line_height * 1.5) # 标题区域高度
divider_height = 2 # 分割线高度
divider_spacing = 10 # 分割线与标题的间距
# 计算总高度(包含标题区域)
content_height = (len(all_lines) * total_line_height) + (2 * self.padding)
header_height = title_height + divider_spacing + divider_height + self.padding
total_height = content_height + header_height
height = max(total_height, 200) # 最小高度
# 创建图片
image = Image.new('RGB', (self.width, height), color=self.bg_color)
draw = ImageDraw.Draw(image)
title_x = self.padding
title_y = self.padding
with Pilmoji(image) as pilmoji:
pilmoji.text((title_x, title_y), title, font=title_font, fill=self.text_color)
# 绘制分割线
divider_y = title_y + title_height + 10
draw.line(
[(self.padding, divider_y), (self.width - self.padding, divider_y)],
fill=(200, 200, 200),
width=divider_height
)
y = divider_y + divider_spacing + self.padding
with Pilmoji(image) as pilmoji:
for line in all_lines:
if line:
pilmoji.text((self.padding, y), line, font=self.font, fill=self.text_color)
y += total_line_height
# 转换为字节流
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
img_byte_arr.seek(0)
return img_byte_arr.getvalue()
def render_to_base64(self, text: str, title: str = "蛋定助手通知您:") -> str:
"""
将文本渲染为图片并返回 base64 编码
Args:
text: 文本内容
Returns:
base64 编码的图片数据
"""
img_bytes = self.render(text, title=title)
base64_str = base64.b64encode(img_bytes).decode('utf-8')
return f"base64://{base64_str}"

View File

@@ -42,106 +42,47 @@ class MessageSender:
return None
async def send_to_group(
self,
group_id: int,
qq: int,
image_base64: str
) -> dict:
async def _send_msg(self, group_id: int, qq: int, segment) -> dict:
"""
向指定群发送消息@用户 + 图片
内部通用发送方法@用户 + 任意消息段
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
image_base64: 图片的 base64 编码格式base64://...
segment: MessageSegment 实例
Returns:
发送结果字典
Raises:
ValueError: Bot 未设置
Exception: 发送失败
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 图片
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.image(image_base64))
message.append(segment)
# 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message,
__qqpush_source="danding_qqpush" # 添加标记
__qqpush_source="danding_qqpush"
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
return {"success": True, "data": result, "message": "消息发送成功"}
except Exception as e:
# 捕获异常并返回错误信息
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
logger.warning(f"[QqPush] 消息发送失败 group={group_id} qq={qq}: {e}")
return {"success": False, "error": str(e), "message": f"消息发送失败: {e}"}
async def send_text_to_group(
self,
group_id: int,
qq: int,
text: str
) -> dict:
"""
向指定群发送纯文本消息(@用户 + 文本)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
text: 文本内容
Returns:
发送结果字典
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 文本
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.text(text))
# 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message,
__qqpush_source="danding_qqpush" # 添加标记
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
async def send_to_group(self, group_id: int, qq: int, image_base64: str) -> dict:
"""向指定群发送消息(@用户 + 图片)"""
return await self._send_msg(group_id, qq, MessageSegment.image(image_base64))
async def send_text_to_group(self, group_id: int, qq: int, text: str) -> dict:
"""向指定群发送纯文本消息(@用户 + 文本)"""
return await self._send_msg(group_id, qq, MessageSegment.text(text))
# 全局消息发送器实例