Files
DanDingNoneBot/danding_bot/plugins/danding_qqpush/sender.py
XiaM-Admin dedc872f1b 功能:通过 HTTP API 实现 Danding_QqPush 插件,用于 QQ 群通知
- 增加了通过外部 HTTP API 向 QQ 群组发送消息的核心功能。
- 实现了对长文本消息的图片渲染,以避免被认定为垃圾信息。
- 支持在消息中提及特定的 QQ 用户。
- 创建了用于 API 令牌和图片渲染设置的配置选项。
- 开发了一个测试脚本以验证 API 功能。
- 对现有代码进行了重构,以提高组织性和可维护性。
2026-01-20 21:19:05 +08:00

147 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""消息发送模块 - 负责向 QQ 群发送消息"""
from typing import Optional
from nonebot import get_bots
from nonebot.adapters.onebot.v11 import Bot, Message, MessageSegment
class MessageSender:
"""消息发送器"""
def __init__(self):
"""初始化消息发送器"""
self.bot: Optional[Bot] = None
def set_bot(self, bot: Bot):
"""
设置 Bot 实例
Args:
bot: OneBot V11 Bot 实例
"""
self.bot = bot
def get_bot(self) -> Optional[Bot]:
"""
获取 Bot 实例
Returns:
Bot 实例,如果未设置则尝试从全局获取
"""
if self.bot:
return self.bot
# 尝试从全局获取 Bot
try:
bots = get_bots()
if bots:
bot = list(bots.values())[0]
self.bot = bot
return bot
except Exception:
pass
return None
async def send_to_group(
self,
group_id: int,
qq: int,
image_base64: str
) -> dict:
"""
向指定群发送消息(@用户 + 图片)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
image_base64: 图片的 base64 编码格式base64://...
Returns:
发送结果字典
Raises:
ValueError: Bot 未设置
Exception: 发送失败
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 图片
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.image(image_base64))
# 发送群消息
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
except Exception as e:
# 捕获异常并返回错误信息
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
async def send_text_to_group(
self,
group_id: int,
qq: int,
text: str
) -> dict:
"""
向指定群发送纯文本消息(@用户 + 文本)
Args:
group_id: 群号
qq: 要 @ 的 QQ 号
text: 文本内容
Returns:
发送结果字典
"""
bot = self.get_bot()
if not bot:
raise ValueError("Bot 实例未设置,无法发送消息")
try:
# 构造消息:@用户 + 文本
message = Message()
message.append(MessageSegment.at(qq))
message.append(MessageSegment.text(text))
# 发送群消息
result = await bot.call_api(
"send_group_msg",
group_id=group_id,
message=message
)
return {
"success": True,
"data": result,
"message": "消息发送成功"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"message": f"消息发送失败: {str(e)}"
}
# 全局消息发送器实例
sender = MessageSender()