import asyncio import random import os import signal import sys import atexit import subprocess import threading from nonebot import on_message, get_plugin_config, get_driver from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment from nonebot.plugin import PluginMetadata from nonebot.exception import FinishedException from openai import OpenAI from .config import Config from .utils.text_image import create_text_image from .screenshot import markdown_to_image import pyppeteer import pyppeteer.launcher import types # 插件元信息 __plugin_meta__ = PluginMetadata( name="chatai", description="一个对接 DeepSeek 的聊天 AI 插件", usage="发送以 * 开头的消息,AI 会回复你,两分钟后自动撤销", config=Config, ) # 获取插件配置 plugin_config = get_plugin_config(Config) # 全局浏览器实例 browser = None browser_lock = threading.Lock() # 注册消息事件处理器 message_handler = on_message(priority=50, block=True) # 确保输出目录存在 os.makedirs("data/chatai", exist_ok=True) # 获取 NoneBot 驱动器 driver = get_driver() # 定义强制终止 Chrome 的函数 def force_kill_chrome(): """强制终止所有 Chrome 进程""" try: if sys.platform == 'win32': subprocess.run(['taskkill', '/F', '/IM', 'chrome.exe'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: subprocess.run(['pkill', '-9', '-f', 'chrome'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except: pass # 在启动时确保没有残留的 Chrome 进程 force_kill_chrome() # 注册退出处理函数 atexit.register(force_kill_chrome) # 注册信号处理 def signal_handler(sig, frame): """处理终止信号""" # 直接强制终止 Chrome 进程,不使用 Pyppeteer 的关闭方法 force_kill_chrome() # 强制退出程序 os._exit(0) # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @driver.on_shutdown async def close_browser(): """在 NoneBot 关闭时关闭浏览器""" global browser with browser_lock: if browser is not None: try: await browser.close() except: pass browser = None # 确保所有 Chrome 进程都被终止 force_kill_chrome() # 替代方案:直接替换信号处理器 def noop_signal_handler(sig, frame): pass # 保存原始信号处理器 original_sigint = signal.getsignal(signal.SIGINT) original_sigterm = signal.getsignal(signal.SIGTERM) # 在启动浏览器前替换信号处理器 async def init_browser(): """初始化浏览器实例""" global browser with browser_lock: if browser is None or not hasattr(browser, 'process') or not browser.process: # 替换信号处理器 signal.signal(signal.SIGINT, noop_signal_handler) signal.signal(signal.SIGTERM, noop_signal_handler) try: browser = await pyppeteer.launch( headless=True, args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-gpu'] ) finally: # 恢复我们的信号处理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) return browser async def call_ai_api(message: str) -> str: """调用 AI 接口""" client = OpenAI( api_key=plugin_config.deepseek_token, base_url="https://api.siliconflow.cn/v1" ) response = client.chat.completions.create( model="deepseek-ai/DeepSeek-V3", messages=[ {"role": "system", "content": "你是一个活泼可爱的群助手。请在回复中使用丰富的 Emoji 表情(如 😊 😄 🎉 ✨ 💖 等)。你的语气要俏皮活泼,像在和朋友聊天一样自然。在回答问题时要保持专业性的同时,也要让回复显得生动有趣。每条回复都必须包含至少2-3个 Emoji 表情。如果你需要展示代码片段,请确保代码中不包含任何颜文字或表情符号,保持代码的专业性和可读性。"}, {"role": "user", "content": message}, ], stream=False ) return response.choices[0].message.content or "" @message_handler.handle() async def handle_message(event: MessageEvent, bot: Bot): # 获取用户发送的消息内容 user_message = event.get_plaintext().strip() # 检查消息是否以 * 开头 if not user_message.startswith("*"): return # 如果不是以 * 开头,直接返回,不处理 # 去掉开头的 * 并去除多余空格 user_message = user_message[1:].strip() # 如果消息为空,直接返回 if not user_message: await asyncio.sleep(random.uniform(2, 3)) await message_handler.finish("请输入有效内容哦~") # 调用模型 API try: # 初始化浏览器 browser = await init_browser() # 调用 AI API response = await call_ai_api(user_message) if response: await asyncio.sleep(random.uniform(2, 3)) # 使用 markdown_to_image 生成图片 image_path = 'data/chatai/output.png' await markdown_to_image(response, image_path, browser) # 发送图片消息 sent_message = await bot.send(event, MessageSegment.image(f"file:///{os.path.abspath(image_path)}")) # 启动定时任务,两分钟后撤销消息 asyncio.create_task(delete_message_after_delay(bot, sent_message["message_id"])) except FinishedException: pass except Exception as e: await asyncio.sleep(random.uniform(2, 3)) await message_handler.finish(f"出错了: {str(e)}") async def delete_message_after_delay(bot: Bot, message_id: int): """两分钟后撤销消息""" await asyncio.sleep(120) # 等待两分钟 try: await bot.delete_msg(message_id=message_id) except: pass