Files
DanDingNoneBot/danding_bot/plugins/danding_qqpush/sender.py

149 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""消息发送模块 - 负责向 QQ 群发送消息"""
from typing import Optional
from nonebot import get_bots
from nonebot.adapters.onebot.v11 import Bot, Message, MessageSegment
class MessageSender:
"""消息发送器"""
def __init__(self):
"""初始化消息发送器"""
self.bot: Optional[Bot] = None
def set_bot(self, bot: Bot):
"""
设置 Bot 实例
Args:
bot: OneBot V11 Bot 实例
"""
self.bot = bot
def get_bot(self) -> Optional[Bot]:
"""
获取 Bot 实例
Returns:
Bot 实例,如果未设置则尝试从全局获取
"""
if self.bot:
return self.bot
# 尝试从全局获取 Bot
try:
bots = get_bots()
if bots:
bot = list(bots.values())[0]
self.bot = bot
return bot
except Exception:
pass
return None
async def send_to_group(
self,
group_id: int,
qq: int,
image_base64: str
) -> dict:
"""
向指定群发送消息(@用户 + 图片)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
image_base64: 图片的 base64 编码格式base64://...
Returns:
发送结果字典
Raises:
ValueError: Bot 未设置
Exception: 发送失败
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 图片
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.image(image_base64))
# 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message,
__qqpush_source="danding_qqpush" # 添加标记
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
except Exception as e:
# 捕获异常并返回错误信息
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
async def send_text_to_group(
self,
group_id: int,
qq: int,
text: str
) -> dict:
"""
向指定群发送纯文本消息(@用户 + 文本)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
text: 文本内容
Returns:
发送结果字典
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 文本
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.text(text))
# 发送群消息,添加 __qqpush_source 标记供 auto_recall 识别
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message,
__qqpush_source="danding_qqpush" # 添加标记
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
# 全局消息发送器实例
sender = MessageSender()