refactor(plugins): comprehensive code review - ~35 fixes across 14 plugins

Phase 1 - Plugin code review (14/14 plugins):
- Security: 3x token leak in print→logger.debug, Bearer prefix handling
- Bug: bare except→specific exceptions, HorseState type safety, sync→async
- Critical: response_model undefined, route dead code, sync blocking event loop
- Quality: 11x print()→logger, variable name shadowing, consistent logging

Phase 2 - Deep analysis:
- Fix: payout int truncation→max(1, round(amount*odds))
- Fix: room_store get_lock race condition→dict.setdefault()
- Verify: data_manager f-string SQL is safe (uses ? placeholders)

Infrastructure: review reports generated for all plugins.
This commit is contained in:
2026-05-09 23:22:28 +08:00
parent 9a8cb3ad6d
commit c01338f496
43 changed files with 4233 additions and 3645 deletions

View File

@@ -1,183 +1,186 @@
import asyncio
import random
import os
import signal
import sys
import atexit
import subprocess
import threading
from nonebot import on_message, get_plugin_config, get_driver
from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment
from nonebot.plugin import PluginMetadata
from nonebot.exception import FinishedException
from openai import OpenAI
from .config import Config
from .utils.text_image import create_text_image
from .screenshot import markdown_to_image
import pyppeteer
import pyppeteer.launcher
import types
# 插件元信息
__plugin_meta__ = PluginMetadata(
name="chatai",
description="一个对接 DeepSeek 的聊天 AI 插件",
usage="发送以 * 开头的消息AI 会回复你,两分钟后自动撤销",
config=Config,
)
# 获取插件配置
plugin_config = get_plugin_config(Config)
# 全局浏览器实例
browser = None
browser_lock = threading.Lock()
# 注册消息事件处理器
message_handler = on_message(priority=50, block=True)
# 确保输出目录存在
os.makedirs("data/chatai", exist_ok=True)
# 获取 NoneBot 驱动器
driver = get_driver()
# 定义强制终止 Chrome 的函数
def force_kill_chrome():
"""强制终止所有 Chrome 进程"""
try:
if sys.platform == 'win32':
subprocess.run(['taskkill', '/F', '/IM', 'chrome.exe'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
subprocess.run(['pkill', '-9', '-f', 'chrome'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except:
pass
# 在启动时确保没有残留的 Chrome 进程
force_kill_chrome()
# 注册退出处理函数
atexit.register(force_kill_chrome)
# 注册信号处理
def signal_handler(sig, frame):
"""处理终止信号"""
# 直接强制终止 Chrome 进程,不使用 Pyppeteer 的关闭方法
force_kill_chrome()
# 强制退出程序
os._exit(0)
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
@driver.on_shutdown
async def close_browser():
"""在 NoneBot 关闭时关闭浏览器"""
global browser
with browser_lock:
if browser is not None:
try:
await browser.close()
except:
pass
browser = None
# 确保所有 Chrome 进程都被终止
force_kill_chrome()
# 替代方案:直接替换信号处理器
def noop_signal_handler(sig, frame):
pass
# 保存原始信号处理器
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGTERM)
# 在启动浏览器前替换信号处理器
async def init_browser():
"""初始化浏览器实例"""
global browser
with browser_lock:
if browser is None or not hasattr(browser, 'process') or not browser.process:
# 替换信号处理器
signal.signal(signal.SIGINT, noop_signal_handler)
signal.signal(signal.SIGTERM, noop_signal_handler)
try:
browser = await pyppeteer.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-gpu']
)
finally:
# 恢复我们的信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
return browser
async def call_ai_api(message: str) -> str:
"""调用 AI 接口"""
client = OpenAI(
api_key=plugin_config.deepseek_token,
base_url="https://api.siliconflow.cn/v1"
)
response = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3",
messages=[
{"role": "system", "content": "你是一个活泼可爱的群助手。请在回复中使用丰富的 Emoji 表情(如 😊 😄 🎉 ✨ 💖 等)。你的语气要俏皮活泼像在和朋友聊天一样自然。在回答问题时要保持专业性的同时也要让回复显得生动有趣。每条回复都必须包含至少2-3个 Emoji 表情。如果你需要展示代码片段,请确保代码中不包含任何颜文字或表情符号,保持代码的专业性和可读性。"},
{"role": "user", "content": message},
],
stream=False
)
return response.choices[0].message.content or ""
@message_handler.handle()
async def handle_message(event: MessageEvent, bot: Bot):
# 获取用户发送的消息内容
user_message = event.get_plaintext().strip()
# 检查消息是否以 * 开头
if not user_message.startswith("*"):
return # 如果不是以 * 开头,直接返回,不处理
# 去掉开头的 * 并去除多余空格
user_message = user_message[1:].strip()
# 如果消息为空,直接返回
if not user_message:
await asyncio.sleep(random.uniform(2, 3))
await message_handler.finish("请输入有效内容哦~")
# 调用模型 API
try:
# 初始化浏览器
browser = await init_browser()
# 调用 AI API
response = await call_ai_api(user_message)
if response:
await asyncio.sleep(random.uniform(2, 3))
# 使用 markdown_to_image 生成图片
image_path = 'data/chatai/output.png'
await markdown_to_image(response, image_path, browser)
# 发送图片消息
sent_message = await bot.send(event, MessageSegment.image(f"file:///{os.path.abspath(image_path)}"))
# 启动定时任务,两分钟后撤销消息
asyncio.create_task(delete_message_after_delay(bot, sent_message["message_id"]))
except FinishedException:
pass
except Exception as e:
await asyncio.sleep(random.uniform(2, 3))
await message_handler.finish(f"出错了: {str(e)}")
async def delete_message_after_delay(bot: Bot, message_id: int):
"""两分钟后撤销消息"""
await asyncio.sleep(120) # 等待两分钟
try:
await bot.delete_msg(message_id=message_id)
except:
pass
import asyncio
import random
import os
import sys
import subprocess
from nonebot import on_message, get_plugin_config, get_driver, logger
from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment
from nonebot.plugin import PluginMetadata
from nonebot.exception import FinishedException
from openai import OpenAI
from .config import Config
from .screenshot import markdown_to_image
import pyppeteer
# 插件元信息
__plugin_meta__ = PluginMetadata(
name="chatai",
description="一个对接 DeepSeek 的聊天 AI 插件",
usage="发送以 * 开头的消息AI 会回复你,两分钟后自动撤销",
config=Config,
)
# 获取插件配置
plugin_config = get_plugin_config(Config)
# 全局浏览器实例
_browser: pyppeteer.browser.Browser | None = None
_browser_lock = asyncio.Lock()
# OpenAI 客户端(延迟初始化)
_ai_client: OpenAI | None = None
# 撤回任务引用集合防止被GC回收
_recall_tasks: set[asyncio.Task] = set()
# 注册消息事件处理器
message_handler = on_message(priority=50, block=True)
# 确保输出目录存在
os.makedirs("data/chatai", exist_ok=True)
# 获取 NoneBot 驱动器
driver = get_driver()
def _force_kill_chrome():
"""强制终止残留 Chrome 进程"""
try:
if sys.platform == "win32":
subprocess.run(
["taskkill", "/F", "/IM", "chrome.exe"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
else:
subprocess.run(
["pkill", "-9", "-f", "chrome"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
except Exception:
pass
@driver.on_startup
async def startup_cleanup():
"""启动时清理残留Chrome进程"""
_force_kill_chrome()
@driver.on_shutdown
async def close_browser():
"""在 NoneBot 关闭时关闭浏览器"""
global _browser
async with _browser_lock:
if _browser is not None:
try:
await _browser.close()
except Exception as e:
logger.warning(f"关闭浏览器异常: {e}")
_browser = None
_force_kill_chrome()
async def init_browser() -> "pyppeteer.browser.Browser":
"""初始化或复用浏览器实例"""
global _browser
async with _browser_lock:
if _browser is None or not _browser.process:
try:
_browser = await pyppeteer.launch(
headless=True,
args=["--no-sandbox", "--disable-setuid-sandbox", "--disable-gpu"],
)
logger.info("chatai: 浏览器实例已创建")
except Exception as e:
logger.error(f"chatai: 浏览器启动失败: {e}")
raise
return _browser
def _get_ai_client() -> OpenAI:
"""获取或创建 OpenAI 客户端(单例)"""
global _ai_client
if _ai_client is None:
_ai_client = OpenAI(
api_key=plugin_config.deepseek_token,
base_url="https://api.siliconflow.cn/v1",
)
return _ai_client
async def call_ai_api(message: str) -> str:
"""调用 AI 接口"""
client = _get_ai_client()
response = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3",
messages=[
{"role": "system", "content": (
"你是一个活泼可爱的群助手。请在回复中使用丰富的 Emoji 表情"
"(如 😊 😄 🎉 ✨ 💖 等)。你的语气要俏皮活泼,像在和朋友聊天一样自然。"
"在回答问题时要保持专业性的同时,也要让回复显得生动有趣。"
"每条回复都必须包含至少2-3个 Emoji 表情。"
"如果你需要展示代码片段,请确保代码中不包含任何颜文字或表情符号,"
"保持代码的专业性和可读性。"
)},
{"role": "user", "content": message},
],
stream=False,
)
return response.choices[0].message.content or ""
@message_handler.handle()
async def handle_message(event: MessageEvent, bot: Bot):
user_message = event.get_plaintext().strip()
if not user_message.startswith("*"):
return
user_message = user_message[1:].strip()
if not user_message:
await asyncio.sleep(random.uniform(2, 3))
await message_handler.finish("请输入有效内容哦~")
try:
browser = await init_browser()
response = await call_ai_api(user_message)
if response:
await asyncio.sleep(random.uniform(2, 3))
# 使用事件ID+时间戳避免并发路径冲突
image_path = f"data/chatai/output_{event.message_id}.png"
await markdown_to_image(response, image_path, browser)
sent_message = await bot.send(
event, MessageSegment.image(f"file:///{os.path.abspath(image_path)}")
)
# 清理临时图片文件
try:
os.remove(image_path)
except OSError:
pass
# 保存task引用防止GC回收
task = asyncio.create_task(
_delete_message_after_delay(bot, sent_message["message_id"])
)
_recall_tasks.add(task)
task.add_done_callback(_recall_tasks.discard)
except FinishedException:
raise
except Exception as e:
logger.error(f"chatai处理失败: user_id={event.user_id} error={e}")
await asyncio.sleep(random.uniform(2, 3))
await message_handler.finish(f"出错了: {e}")
async def _delete_message_after_delay(bot: Bot, message_id: int):
"""两分钟后撤回消息"""
await asyncio.sleep(120)
try:
await bot.delete_msg(message_id=message_id)
except Exception as e:
logger.debug(f"chatai撤回消息失败(可忽略): msg_id={message_id} error={e}")

View File

@@ -1,164 +1,174 @@
import asyncio
import markdown
from pyppeteer import launch
async def markdown_to_image(markdown_text: str, output_path: str, browser=None):
"""将 Markdown 转换为 HTML 并使用 Puppeteer 截图。"""
try:
# 将 Markdown 转换为 HTML
html = markdown.markdown(markdown_text)
# 使用传入的浏览器实例或创建新的
should_close_browser = False
if browser is None:
browser = await launch(headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'])
should_close_browser = True
page = await browser.newPage()
# 设置页面样式,使内容更美观
await page.setContent(f"""
<html>
<head>
<style>
body {{
font-family: "PingFang SC", "Microsoft YaHei", sans-serif;
line-height: 1.6;
padding: 30px;
max-width: 800px;
margin: 0 auto;
background-color: transparent;
color: #333;
}}
.container {{
background-color: #ffffff;
border-radius: 15px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
padding: 25px;
overflow: hidden;
}}
p {{
margin-bottom: 16px;
}}
code {{
font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace;
background-color: #f5f7f9;
padding: 2px 6px;
border-radius: 4px;
font-size: 0.9em;
}}
pre {{
background-color: #f5f7f9;
padding: 15px;
border-radius: 8px;
overflow-x: auto;
margin: 20px 0;
}}
pre code {{
background-color: transparent;
padding: 0;
}}
h1, h2, h3, h4, h5, h6 {{
margin-top: 24px;
margin-bottom: 16px;
font-weight: 600;
line-height: 1.25;
}}
h1 {{
font-size: 1.8em;
border-bottom: 1px solid #eaecef;
padding-bottom: 0.3em;
}}
h2 {{
font-size: 1.5em;
border-bottom: 1px solid #eaecef;
padding-bottom: 0.3em;
}}
blockquote {{
padding: 0 1em;
color: #6a737d;
border-left: 0.25em solid #dfe2e5;
margin: 16px 0;
}}
ul, ol {{
padding-left: 2em;
margin-bottom: 16px;
}}
img {{
max-width: 100%;
border-radius: 8px;
}}
a {{
color: #0366d6;
text-decoration: none;
}}
a:hover {{
text-decoration: underline;
}}
table {{
border-collapse: collapse;
width: 100%;
margin: 16px 0;
}}
table th, table td {{
padding: 8px 12px;
border: 1px solid #dfe2e5;
}}
table th {{
background-color: #f6f8fa;
}}
</style>
</head>
<body>
<div class="container">
{html}
</div>
</body>
</html>
""")
# 等待内容渲染完成
await asyncio.sleep(0.5)
# 获取内容尺寸并设置视口
dimensions = await page.evaluate('''() => {
const container = document.querySelector('.container');
return {
width: container.offsetWidth + 60, // 加上 body 的 padding
height: container.offsetHeight + 60
}
}''')
# 设置视口大小
await page.setViewport({
'width': dimensions['width'],
'height': dimensions['height'],
'deviceScaleFactor': 2 # 提高图片清晰度
})
# 截图,使用透明背景
await page.screenshot({
'path': output_path,
'omitBackground': True, # 透明背景
'clip': {
'x': 0,
'y': 0,
'width': dimensions['width'],
'height': dimensions['height']
}
})
# 关闭页面
await page.close()
# 如果是我们创建的浏览器实例,则关闭它
if should_close_browser:
await browser.close()
except Exception as e:
# 确保资源被释放
if 'page' in locals() and page is not None:
await page.close()
if should_close_browser and 'browser' in locals() and browser is not None:
await browser.close()
raise # 重新抛出异常以便上层处理
import asyncio
import html as html_module
import markdown
from nonebot import logger
async def markdown_to_image(markdown_text: str, output_path: str, browser=None):
"""将 Markdown 转换为 HTML 并使用 Puppeteer 截图。"""
page = None
should_close_browser = False
try:
# 转义用户输入中的HTML特殊字符防止XSS
safe_text = html_module.escape(markdown_text)
html_content = markdown.markdown(safe_text)
# 使用传入的浏览器实例或创建新的
if browser is None:
from pyppeteer import launch
browser = await launch(headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'])
should_close_browser = True
page = await browser.newPage()
# 设置页面样式,使内容更美观
await page.setContent(f"""
<html>
<head>
<style>
body {{
font-family: "PingFang SC", "Microsoft YaHei", sans-serif;
line-height: 1.6;
padding: 30px;
max-width: 800px;
margin: 0 auto;
background-color: transparent;
color: #333;
}}
.container {{
background-color: #ffffff;
border-radius: 15px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
padding: 25px;
overflow: hidden;
}}
p {{
margin-bottom: 16px;
}}
code {{
font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace;
background-color: #f5f7f9;
padding: 2px 6px;
border-radius: 4px;
font-size: 0.9em;
}}
pre {{
background-color: #f5f7f9;
padding: 15px;
border-radius: 8px;
overflow-x: auto;
margin: 20px 0;
}}
pre code {{
background-color: transparent;
padding: 0;
}}
h1, h2, h3, h4, h5, h6 {{
margin-top: 24px;
margin-bottom: 16px;
font-weight: 600;
line-height: 1.25;
}}
h1 {{
font-size: 1.8em;
border-bottom: 1px solid #eaecef;
padding-bottom: 0.3em;
}}
h2 {{
font-size: 1.5em;
border-bottom: 1px solid #eaecef;
padding-bottom: 0.3em;
}}
blockquote {{
padding: 0 1em;
color: #6a737d;
border-left: 0.25em solid #dfe2e5;
margin: 16px 0;
}}
ul, ol {{
padding-left: 2em;
margin-bottom: 16px;
}}
img {{
max-width: 100%;
border-radius: 8px;
}}
a {{
color: #0366d6;
text-decoration: none;
}}
a:hover {{
text-decoration: underline;
}}
table {{
border-collapse: collapse;
width: 100%;
margin: 16px 0;
}}
table th, table td {{
padding: 8px 12px;
border: 1px solid #dfe2e5;
}}
table th {{
background-color: #f6f8fa;
}}
</style>
</head>
<body>
<div class="container">
{html_content}
</div>
</body>
</html>
""")
# 等待内容渲染完成
await asyncio.sleep(0.5)
# 获取内容尺寸并设置视口
dimensions = await page.evaluate('''() => {
const container = document.querySelector('.container');
return {
width: container.offsetWidth + 60, // 加上 body 的 padding
height: container.offsetHeight + 60
}
}''')
# 设置视口大小
await page.setViewport({
'width': dimensions['width'],
'height': dimensions['height'],
'deviceScaleFactor': 2 # 提高图片清晰度
})
# 截图,使用透明背景
await page.screenshot({
'path': output_path,
'omitBackground': True, # 透明背景
'clip': {
'x': 0,
'y': 0,
'width': dimensions['width'],
'height': dimensions['height']
}
})
# 关闭页面
await page.close()
# 如果是我们创建的浏览器实例,则关闭它
if should_close_browser:
await browser.close()
except Exception as e:
# 确保资源被释放
if page is not None:
try:
await page.close()
except Exception:
pass
if should_close_browser and browser is not None:
try:
await browser.close()
except Exception:
pass
raise # 重新抛出异常以便上层处理

View File

@@ -1,143 +1,144 @@
from PIL import Image, ImageDraw, ImageFont
import io
def create_text_image(text: str, width: int = 1000, font_size: int = 25) -> bytes:
"""将文本转换为图片,智能处理各种字符"""
def load_fonts():
"""加载文本和 Emoji 字体"""
# 尝试加载 Emoji 字体
emoji_font = None
try:
emoji_font = ImageFont.truetype("/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", font_size)
print("成功加载 Emoji 字体")
except Exception as e:
print(f"加载 Emoji 字体失败: {e}")
# 尝试加载文本字体
text_font = None
font_paths = [
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/wqy-microhei/wqy-microhei.ttc",
"/usr/share/fonts/wqy-zenhei/wqy-zenhei.ttc",
"C:/Windows/Fonts/msyh.ttc",
"C:/Windows/Fonts/simhei.ttf",
]
for path in font_paths:
try:
text_font = ImageFont.truetype(path, font_size)
print(f"成功加载文本字体: {path}")
break
except Exception:
continue
if text_font is None:
text_font = ImageFont.load_default()
print("使用默认字体")
return text_font, emoji_font
def is_emoji(char):
"""判断字符是否为 Emoji"""
return len(char.encode('utf-8')) >= 4
def draw_text_with_fonts(draw, text, x, y, text_font, emoji_font):
"""使用不同的字体绘制文本和 Emoji"""
current_x = x
for char in text:
# 选择合适的字体
font = emoji_font if (is_emoji(char) and emoji_font) else text_font
# 绘制字符
draw.text((current_x, y), char, font=font, fill=(0, 0, 0))
# 计算字符宽度
bbox = draw.textbbox((current_x, y), char, font=font)
char_width = bbox[2] - bbox[0]
current_x += char_width
return current_x - x
def calculate_text_dimensions(text, text_font, emoji_font):
"""计算文本尺寸"""
test_img = Image.new('RGB', (1, 1), color=(255, 255, 255))
test_draw = ImageDraw.Draw(test_img)
total_width = 0
max_height = 0
for char in text:
font = emoji_font if (is_emoji(char) and emoji_font) else text_font
bbox = test_draw.textbbox((0, 0), char, font=font)
char_width = bbox[2] - bbox[0]
char_height = bbox[3] - bbox[1]
total_width += char_width
max_height = max(max_height, char_height)
return total_width, max_height
# 加载字体
text_font, emoji_font = load_fonts()
# 基础配置
padding = 40
effective_width = width - (2 * padding)
def smart_text_wrap(text):
"""智能文本换行"""
lines = []
current_line = ""
current_width = 0
for paragraph in text.split('\n'):
if not paragraph:
lines.append("")
continue
for char in paragraph:
char_width, _ = calculate_text_dimensions(char, text_font, emoji_font)
if current_width + char_width <= effective_width:
current_line += char
current_width += char_width
else:
if current_line:
lines.append(current_line)
current_line = char
current_width = char_width
if current_line:
lines.append(current_line)
current_line = ""
current_width = 0
return lines
# 智能换行处理
lines = smart_text_wrap(text)
# 计算行高
_, line_height = calculate_text_dimensions("测试", text_font, emoji_font)
line_spacing = int(line_height * 0.5) # 行间距为行高的50%
total_line_height = line_height + line_spacing
# 计算总高度
total_height = (len(lines) * total_line_height) + (2 * padding)
height = max(total_height, 200) # 最小高度200像素
# 创建图片
image = Image.new('RGB', (width, int(height)), color=(252, 252, 252))
draw = ImageDraw.Draw(image)
# 绘制文本
y = padding
for line in lines:
if line: # 跳过空行
draw_text_with_fonts(draw, line, padding, y, text_font, emoji_font)
y += total_line_height
# 转换为字节流
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG', quality=95)
img_byte_arr.seek(0)
from PIL import Image, ImageDraw, ImageFont
from nonebot.log import logger
import io
def create_text_image(text: str, width: int = 1000, font_size: int = 25) -> bytes:
"""将文本转换为图片,智能处理各种字符"""
def load_fonts():
"""加载文本和 Emoji 字体"""
# 尝试加载 Emoji 字体
emoji_font = None
try:
emoji_font = ImageFont.truetype("/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", font_size)
logger.info("成功加载 Emoji 字体")
except Exception as e:
logger.warning(f"加载 Emoji 字体失败: {e}")
# 尝试加载文本字体
text_font = None
font_paths = [
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/wqy-microhei/wqy-microhei.ttc",
"/usr/share/fonts/wqy-zenhei/wqy-zenhei.ttc",
"C:/Windows/Fonts/msyh.ttc",
"C:/Windows/Fonts/simhei.ttf",
]
for path in font_paths:
try:
text_font = ImageFont.truetype(path, font_size)
logger.info(f"成功加载文本字体: {path}")
break
except Exception:
continue
if text_font is None:
text_font = ImageFont.load_default()
logger.warning("使用默认字体")
return text_font, emoji_font
def is_emoji(char):
"""判断字符是否为 Emoji"""
return len(char.encode('utf-8')) >= 4
def draw_text_with_fonts(draw, text, x, y, text_font, emoji_font):
"""使用不同的字体绘制文本和 Emoji"""
current_x = x
for char in text:
# 选择合适的字体
font = emoji_font if (is_emoji(char) and emoji_font) else text_font
# 绘制字符
draw.text((current_x, y), char, font=font, fill=(0, 0, 0))
# 计算字符宽度
bbox = draw.textbbox((current_x, y), char, font=font)
char_width = bbox[2] - bbox[0]
current_x += char_width
return current_x - x
def calculate_text_dimensions(text, text_font, emoji_font):
"""计算文本尺寸"""
test_img = Image.new('RGB', (1, 1), color=(255, 255, 255))
test_draw = ImageDraw.Draw(test_img)
total_width = 0
max_height = 0
for char in text:
font = emoji_font if (is_emoji(char) and emoji_font) else text_font
bbox = test_draw.textbbox((0, 0), char, font=font)
char_width = bbox[2] - bbox[0]
char_height = bbox[3] - bbox[1]
total_width += char_width
max_height = max(max_height, char_height)
return total_width, max_height
# 加载字体
text_font, emoji_font = load_fonts()
# 基础配置
padding = 40
effective_width = width - (2 * padding)
def smart_text_wrap(text):
"""智能文本换行"""
lines = []
current_line = ""
current_width = 0
for paragraph in text.split('\n'):
if not paragraph:
lines.append("")
continue
for char in paragraph:
char_width, _ = calculate_text_dimensions(char, text_font, emoji_font)
if current_width + char_width <= effective_width:
current_line += char
current_width += char_width
else:
if current_line:
lines.append(current_line)
current_line = char
current_width = char_width
if current_line:
lines.append(current_line)
current_line = ""
current_width = 0
return lines
# 智能换行处理
lines = smart_text_wrap(text)
# 计算行高
_, line_height = calculate_text_dimensions("测试", text_font, emoji_font)
line_spacing = int(line_height * 0.5) # 行间距为行高的50%
total_line_height = line_height + line_spacing
# 计算总高度
total_height = (len(lines) * total_line_height) + (2 * padding)
height = max(total_height, 200) # 最小高度200像素
# 创建图片
image = Image.new('RGB', (width, int(height)), color=(252, 252, 252))
draw = ImageDraw.Draw(image)
# 绘制文本
y = padding
for line in lines:
if line: # 跳过空行
draw_text_with_fonts(draw, line, padding, y, text_font, emoji_font)
y += total_line_height
# 转换为字节流
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG', quality=95)
img_byte_arr.seek(0)
return img_byte_arr.getvalue()