"""消息发送模块 - 负责向 QQ 群发送消息""" from typing import Optional from nonebot import get_bots, logger from nonebot.adapters.onebot.v11 import Bot, Message, MessageSegment class MessageSender: """消息发送器""" def __init__(self): """初始化消息发送器""" self.bot: Optional[Bot] = None def set_bot(self, bot: Bot): """ 设置 Bot 实例 Args: bot: OneBot V11 Bot 实例 """ self.bot = bot def get_bot(self) -> Optional[Bot]: """ 获取 Bot 实例 Returns: Bot 实例,如果未设置则尝试从全局获取 """ if self.bot: return self.bot # 尝试从全局获取 Bot try: bots = get_bots() if bots: bot = list(bots.values())[0] self.bot = bot return bot except Exception as e: logger.warning(f"[QqPush] 获取全局Bot失败: {e}") return None async def _send_msg(self, group_id: int, qq: int, segment) -> dict: """ 内部通用发送方法(@用户 + 任意消息段) Args: group_id: 群号 qq: 要 @ 的 QQ 号 segment: MessageSegment 实例 Returns: 发送结果字典 """ bot = self.get_bot() if not bot: raise ValueError("Bot 实例未设置,无法发送消息") try: message = Message() message.append(MessageSegment.at(qq)) message.append(segment) result = await bot.call_api( "send_group_msg", group_id=group_id, message=message, __qqpush_source="danding_qqpush" ) return {"success": True, "data": result, "message": "消息发送成功"} except Exception as e: logger.warning(f"[QqPush] 消息发送失败 group={group_id} qq={qq}: {e}") return {"success": False, "error": str(e), "message": f"消息发送失败: {e}"} async def send_to_group(self, group_id: int, qq: int, image_base64: str) -> dict: """向指定群发送消息(@用户 + 图片)""" return await self._send_msg(group_id, qq, MessageSegment.image(image_base64)) async def send_text_to_group(self, group_id: int, qq: int, text: str) -> dict: """向指定群发送纯文本消息(@用户 + 文本)""" return await self._send_msg(group_id, qq, MessageSegment.text(text)) # 全局消息发送器实例 sender = MessageSender()