import asyncio from typing import Optional, Dict, Any, Set from nonebot import get_plugin_config, logger from nonebot.adapters.onebot.v11 import Bot from nonebot.plugin import PluginMetadata from .config import Config # 插件元信息 __plugin_meta__ = PluginMetadata( name="auto_recall", description="一个通用的消息撤回插件,监控所有发出的消息并在指定时间后撤回", usage="无需手动调用,插件会自动监控并撤回消息", config=Config, ) # 获取插件配置 plugin_config = get_plugin_config(Config) # 撤回任务引用集合,防止被GC回收 _recall_tasks: Set[asyncio.Task] = set() def _track_task(task: asyncio.Task) -> None: """跟踪异步任务,完成后自动移除""" _recall_tasks.add(task) task.add_done_callback(_recall_tasks.discard) # 注册 API 调用后钩子 @Bot.on_called_api async def handle_api_result( bot: Bot, exception: Optional[Exception], api: str, data: Dict[str, Any], result: Any ): """拦截发送消息API调用,监控发出的消息""" if api not in ("send_msg", "send_group_msg", "send_private_msg") or exception: return message_id = result.get("message_id") if not message_id: return recall_delay = plugin_config.recall_delay # 检查是否为 danding_qqpush 发送的消息 if data.get("__qqpush_source") == "danding_qqpush": recall_delay = plugin_config.qqpush_recall_delay logger.info(f"danding_qqpush 消息将在 {recall_delay}s 后撤回: msg_id={message_id}") task = asyncio.create_task(recall_message_after_delay(bot, message_id, recall_delay)) _track_task(task) async def recall_message_after_delay(bot: Bot, message_id: int, delay: int): """在指定时间后撤回消息""" await asyncio.sleep(delay) try: await bot.delete_msg(message_id=message_id) logger.debug(f"消息已撤回: msg_id={message_id}") except Exception as e: logger.error(f"撤回消息失败: msg_id={message_id} error={e}")