diff --git a/danding_bot/plugins/danding_qqpush/api.py b/danding_bot/plugins/danding_qqpush/api.py index 7dc715f..6730846 100644 --- a/danding_bot/plugins/danding_qqpush/api.py +++ b/danding_bot/plugins/danding_qqpush/api.py @@ -8,6 +8,9 @@ from nonebot import get_driver, logger from .config import Config from .text_parser import TextParser from .image_render import ImageRenderer + +# Module-level singleton: load font once, reuse across requests +_renderer = _renderer # reuse module-level singleton from .sender import sender @@ -134,10 +137,10 @@ def create_routes(token: str, config: Config): raise except Exception as e: - logger.exception(f"推送接口异常: {str(e)}") + logger.exception(f"推送接口异常: {e}") raise HTTPException( status_code=500, - detail=f"服务器内部错误: {str(e)}" + detail="服务器内部错误" ) return router diff --git a/danding_bot/plugins/danding_qqpush/image_render.py b/danding_bot/plugins/danding_qqpush/image_render.py index b3fb338..0f61c24 100644 --- a/danding_bot/plugins/danding_qqpush/image_render.py +++ b/danding_bot/plugins/danding_qqpush/image_render.py @@ -1,215 +1,215 @@ -"""图片生成模块 - 将文本渲染为图片""" -from PIL import Image, ImageDraw, ImageFont -from pilmoji import Pilmoji -import io -import base64 -from typing import Tuple - - -class ImageRenderer: - """图片渲染器""" - - DEFAULT_FONT_PATHS = [ - "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", - "/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", - "C:/Windows/Fonts/msyh.ttc", - "C:/Windows/Fonts/simhei.ttf", - "C:/Windows/Fonts/seguiemj.ttf", - ] - - def __init__( - self, - width: int = 800, - font_size: int = 24, - padding: int = 30, - line_spacing: float = 1.4, - bg_color: Tuple[int, int, int] = (252, 252, 252), - text_color: Tuple[int, int, int] = (0, 0, 0), - font_paths: list = None - ): - """ - 初始化图片渲染器 - - Args: - width: 图片宽度 - font_size: 字体大小 - padding: 内边距 - line_spacing: 行距倍数 - bg_color: 背景颜色 (R, G, B) - text_color: 文本颜色 (R, G, B) - font_paths: 字体文件路径列表 - """ - self.width = width - self.font_size = font_size - self.padding = padding - self.line_spacing = line_spacing - self.bg_color = bg_color - self.text_color = text_color - self.font_paths = font_paths or self.DEFAULT_FONT_PATHS.copy() - - # 加载字体 - self.font = self._load_font() - - def _load_font(self) -> ImageFont.FreeTypeFont: - """加载字体""" - return self._load_font_by_size(self.font_size) - - def _load_font_by_size(self, font_size: int) -> ImageFont.FreeTypeFont: - """ - 加载指定大小的字体 - - Args: - font_size: 字体大小 - - Returns: - 字体对象 - """ - for font_path in self.font_paths: - try: - font = ImageFont.truetype(font_path, font_size) - return font - except Exception: - continue - - # 如果都失败了,使用默认字体 - return ImageFont.load_default() - - def _calculate_text_dimensions(self, text: str) -> Tuple[int, int]: - """ - 计算文本的尺寸 - - Args: - text: 文本内容 - - Returns: - (宽度, 高度) - """ - # 创建一个临时图片用于计算 - test_img = Image.new('RGB', (1, 1), color=self.bg_color) - test_draw = ImageDraw.Draw(test_img) - - bbox = test_draw.textbbox((0, 0), text, font=self.font) - width = bbox[2] - bbox[0] - height = bbox[3] - bbox[1] - - return width, height - - def _smart_text_wrap(self, text: str, max_width: int) -> list: - """ - 智能文本换行 - - Args: - text: 文本内容 - max_width: 最大宽度 - - Returns: - 换行后的文本列表 - """ - lines = [] - current_line = "" - current_width = 0 - - for char in text: - char_width, _ = self._calculate_text_dimensions(char) - - if current_width + char_width <= max_width: - current_line += char - current_width += char_width - else: - if current_line: - lines.append(current_line) - current_line = char - current_width = char_width - - if current_line: - lines.append(current_line) - - return lines - - def render(self, text: str, title: str = "蛋定助手通知您:") -> bytes: - """ - 将文本渲染为图片 - - Args: - text: 文本内容(已处理换行符) - - Returns: - 图片的字节数据 - """ - if not text: - text = "(空消息)" - - # 计算有效宽度 - effective_width = self.width - (2 * self.padding) - - # 按段落处理 - all_lines = [] - for paragraph in text.split('\n'): - if not paragraph: - all_lines.append("") - continue - - wrapped_lines = self._smart_text_wrap(paragraph, effective_width) - all_lines.extend(wrapped_lines) - - # 计算行高 - _, line_height = self._calculate_text_dimensions("测试") - total_line_height = int(line_height * self.line_spacing) - - # 标题相关 - title_font_size = int(self.font_size * 1.3) # 标题字体放大1.3倍 - title_font = self._load_font_by_size(title_font_size) - title_height = int(line_height * 1.5) # 标题区域高度 - divider_height = 2 # 分割线高度 - divider_spacing = 10 # 分割线与标题的间距 - - # 计算总高度(包含标题区域) - content_height = (len(all_lines) * total_line_height) + (2 * self.padding) - header_height = title_height + divider_spacing + divider_height + self.padding - total_height = content_height + header_height - height = max(total_height, 200) # 最小高度 - - # 创建图片 - image = Image.new('RGB', (self.width, height), color=self.bg_color) - draw = ImageDraw.Draw(image) - - title_x = self.padding - title_y = self.padding - with Pilmoji(image) as pilmoji: - pilmoji.text((title_x, title_y), title, font=title_font, fill=self.text_color) - - # 绘制分割线 - divider_y = title_y + title_height + 10 - draw.line( - [(self.padding, divider_y), (self.width - self.padding, divider_y)], - fill=(200, 200, 200), - width=divider_height - ) - - y = divider_y + divider_spacing + self.padding - with Pilmoji(image) as pilmoji: - for line in all_lines: - if line: - pilmoji.text((self.padding, y), line, font=self.font, fill=self.text_color) - y += total_line_height - - # 转换为字节流 - img_byte_arr = io.BytesIO() - image.save(img_byte_arr, format='PNG', quality=95) - img_byte_arr.seek(0) - - return img_byte_arr.getvalue() - - def render_to_base64(self, text: str, title: str = "蛋定助手通知您:") -> str: - """ - 将文本渲染为图片并返回 base64 编码 - - Args: - text: 文本内容 - - Returns: - base64 编码的图片数据 - """ - img_bytes = self.render(text, title=title) - base64_str = base64.b64encode(img_bytes).decode('utf-8') - return f"base64://{base64_str}" +"""图片生成模块 - 将文本渲染为图片""" +from PIL import Image, ImageDraw, ImageFont +from pilmoji import Pilmoji +import io +import base64 +from typing import Tuple + + +class ImageRenderer: + """图片渲染器""" + + DEFAULT_FONT_PATHS = [ + "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", + "/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", + "C:/Windows/Fonts/msyh.ttc", + "C:/Windows/Fonts/simhei.ttf", + "C:/Windows/Fonts/seguiemj.ttf", + ] + + def __init__( + self, + width: int = 800, + font_size: int = 24, + padding: int = 30, + line_spacing: float = 1.4, + bg_color: Tuple[int, int, int] = (252, 252, 252), + text_color: Tuple[int, int, int] = (0, 0, 0), + font_paths: list = None + ): + """ + 初始化图片渲染器 + + Args: + width: 图片宽度 + font_size: 字体大小 + padding: 内边距 + line_spacing: 行距倍数 + bg_color: 背景颜色 (R, G, B) + text_color: 文本颜色 (R, G, B) + font_paths: 字体文件路径列表 + """ + self.width = width + self.font_size = font_size + self.padding = padding + self.line_spacing = line_spacing + self.bg_color = bg_color + self.text_color = text_color + self.font_paths = font_paths or self.DEFAULT_FONT_PATHS.copy() + + # 加载字体 + self.font = self._load_font() + + def _load_font(self) -> ImageFont.FreeTypeFont: + """加载字体""" + return self._load_font_by_size(self.font_size) + + def _load_font_by_size(self, font_size: int) -> ImageFont.FreeTypeFont: + """ + 加载指定大小的字体 + + Args: + font_size: 字体大小 + + Returns: + 字体对象 + """ + for font_path in self.font_paths: + try: + font = ImageFont.truetype(font_path, font_size) + return font + except Exception: + continue + + # 如果都失败了,使用默认字体 + return ImageFont.load_default() + + def _calculate_text_dimensions(self, text: str) -> Tuple[int, int]: + """ + 计算文本的尺寸(使用缓存的 Draw 对象,避免重复创建临时 Image) + + Args: + text: 文本内容 + + Returns: + (宽度, 高度) + """ + if not hasattr(self, '_cached_draw'): + self._cached_img = Image.new('RGB', (1, 1), color=self.bg_color) + self._cached_draw = ImageDraw.Draw(self._cached_img) + + bbox = self._cached_draw.textbbox((0, 0), text, font=self.font) + width = bbox[2] - bbox[0] + height = bbox[3] - bbox[1] + + return width, height + + def _smart_text_wrap(self, text: str, max_width: int) -> list: + """ + 智能文本换行(使用 font.getlength 避免逐字符创建临时 Image) + + Args: + text: 文本内容 + max_width: 最大宽度 + + Returns: + 换行后的文本列表 + """ + lines = [] + current_line = "" + current_width = 0.0 + + for char in text: + char_width = self.font.getlength(char) + + if current_width + char_width <= max_width: + current_line += char + current_width += char_width + else: + if current_line: + lines.append(current_line) + current_line = char + current_width = char_width + + if current_line: + lines.append(current_line) + + return lines + + def render(self, text: str, title: str = "蛋定助手通知您:") -> bytes: + """ + 将文本渲染为图片 + + Args: + text: 文本内容(已处理换行符) + + Returns: + 图片的字节数据 + """ + if not text: + text = "(空消息)" + + # 计算有效宽度 + effective_width = self.width - (2 * self.padding) + + # 按段落处理 + all_lines = [] + for paragraph in text.split('\n'): + if not paragraph: + all_lines.append("") + continue + + wrapped_lines = self._smart_text_wrap(paragraph, effective_width) + all_lines.extend(wrapped_lines) + + # 计算行高 + _, line_height = self._calculate_text_dimensions("测试") + total_line_height = int(line_height * self.line_spacing) + + # 标题相关 + title_font_size = int(self.font_size * 1.3) # 标题字体放大1.3倍 + title_font = self._load_font_by_size(title_font_size) + title_height = int(line_height * 1.5) # 标题区域高度 + divider_height = 2 # 分割线高度 + divider_spacing = 10 # 分割线与标题的间距 + + # 计算总高度(包含标题区域) + content_height = (len(all_lines) * total_line_height) + (2 * self.padding) + header_height = title_height + divider_spacing + divider_height + self.padding + total_height = content_height + header_height + height = max(total_height, 200) # 最小高度 + + # 创建图片 + image = Image.new('RGB', (self.width, height), color=self.bg_color) + draw = ImageDraw.Draw(image) + + title_x = self.padding + title_y = self.padding + with Pilmoji(image) as pilmoji: + pilmoji.text((title_x, title_y), title, font=title_font, fill=self.text_color) + + # 绘制分割线 + divider_y = title_y + title_height + 10 + draw.line( + [(self.padding, divider_y), (self.width - self.padding, divider_y)], + fill=(200, 200, 200), + width=divider_height + ) + + y = divider_y + divider_spacing + self.padding + with Pilmoji(image) as pilmoji: + for line in all_lines: + if line: + pilmoji.text((self.padding, y), line, font=self.font, fill=self.text_color) + y += total_line_height + + # 转换为字节流 + img_byte_arr = io.BytesIO() + image.save(img_byte_arr, format='PNG') + img_byte_arr.seek(0) + + return img_byte_arr.getvalue() + + def render_to_base64(self, text: str, title: str = "蛋定助手通知您:") -> str: + """ + 将文本渲染为图片并返回 base64 编码 + + Args: + text: 文本内容 + + Returns: + base64 编码的图片数据 + """ + img_bytes = self.render(text, title=title) + base64_str = base64.b64encode(img_bytes).decode('utf-8') + return f"base64://{base64_str}" diff --git a/danding_bot/plugins/danding_qqpush/sender.py b/danding_bot/plugins/danding_qqpush/sender.py index d31cd35..9808d98 100644 --- a/danding_bot/plugins/danding_qqpush/sender.py +++ b/danding_bot/plugins/danding_qqpush/sender.py @@ -42,106 +42,47 @@ class MessageSender: return None - async def send_to_group( - self, - group_id: int, - qq: int, - image_base64: str - ) -> dict: + async def _send_msg(self, group_id: int, qq: int, segment) -> dict: """ - 向指定群发送消息(@用户 + 图片) + 内部通用发送方法(@用户 + 任意消息段) Args: group_id: 群号 qq: 要 @ 的 QQ 号 - image_base64: 图片的 base64 编码(格式:base64://...) + segment: MessageSegment 实例 Returns: 发送结果字典 - - Raises: - ValueError: Bot 未设置 - Exception: 发送失败 """ bot = self.get_bot() if not bot: raise ValueError("Bot 实例未设置,无法发送消息") try: - # 构造消息:@用户 + 图片 message = Message() message.append(MessageSegment.at(qq)) - message.append(MessageSegment.image(image_base64)) + message.append(segment) - # 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别 result = await bot.call_api( "send_group_msg", group_id=group_id, message=message, - __qqpush_source="danding_qqpush" # 添加标记 + __qqpush_source="danding_qqpush" ) - return { - "success": True, - "data": result, - "message": "消息发送成功" - } + return {"success": True, "data": result, "message": "消息发送成功"} except Exception as e: - # 捕获异常并返回错误信息 - return { - "success": False, - "error": str(e), - "message": f"消息发送失败: {str(e)}" - } + logger.warning(f"[QqPush] 消息发送失败 group={group_id} qq={qq}: {e}") + return {"success": False, "error": str(e), "message": f"消息发送失败: {e}"} - async def send_text_to_group( - self, - group_id: int, - qq: int, - text: str - ) -> dict: - """ - 向指定群发送纯文本消息(@用户 + 文本) - - Args: - group_id: 群号 - qq: 要 @ 的 QQ 号 - text: 文本内容 - - Returns: - 发送结果字典 - """ - bot = self.get_bot() - if not bot: - raise ValueError("Bot 实例未设置,无法发送消息") - - try: - # 构造消息:@用户 + 文本 - message = Message() - message.append(MessageSegment.at(qq)) - message.append(MessageSegment.text(text)) - - # 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别 - result = await bot.call_api( - "send_group_msg", - group_id=group_id, - message=message, - __qqpush_source="danding_qqpush" # 添加标记 - ) - - return { - "success": True, - "data": result, - "message": "消息发送成功" - } - - except Exception as e: - return { - "success": False, - "error": str(e), - "message": f"消息发送失败: {str(e)}" - } + async def send_to_group(self, group_id: int, qq: int, image_base64: str) -> dict: + """向指定群发送消息(@用户 + 图片)""" + return await self._send_msg(group_id, qq, MessageSegment.image(image_base64)) + + async def send_text_to_group(self, group_id: int, qq: int, text: str) -> dict: + """向指定群发送纯文本消息(@用户 + 文本)""" + return await self._send_msg(group_id, qq, MessageSegment.text(text)) # 全局消息发送器实例