工作空间路径

#!/usr/bin/env python3
"""
企业微信 <-> nanobot 桥接服务 (HTTP API 版本)
使用 gateway HTTP API 而非 CLI 调用,大幅提升响应速度
"""
import asyncio
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Optional
from datetime import datetime

工作空间路径

NANOBOT_WORKSPACE = "/home/nanobot/.nanobot"

try:
import httpx
HTTPX_OK = True
except ImportError:
HTTPX_OK = False
print("[!] 错误:需要安装 httpx - pip install httpx")
sys.exit(1)

==================== 配置 ====================

从配置文件加载环境变量

CONFIG_FILE = Path(NANOBOT_WORKSPACE) / "config" / "wechat_config.env"
if CONFIG_FILE.exists():
print(f"[+] 加载配置文件:{CONFIG_FILE}")
with open(CONFIG_FILE, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ.setdefault(key.strip(), value.strip())

企业微信 API

WEIXIN_CORP_ID = os.environ.get("WEIXIN_CORP_ID", "")
WEIXIN_AGENT_SECRET = os.environ.get("WEIXIN_AGENT_SECRET", "")
WEIXIN_PUSH_URL = os.environ.get("WEIXIN_PUSH_URL", "https://api.yuangs.cc/weixinpush")
WEIXIN_UPLOAD_URL = os.environ.get("WEIXIN_UPLOAD_URL", "https://qyapi.weixin.qq.com/cgi-bin/media/upload")

轮询认证

WECHAT_WEBHOOK_SECRET = os.environ.get("WECHAT_WEBHOOK_SECRET", "")

nanobot Gateway HTTP API

NANOBOT_API_URL = os.environ.get("NANOBOT_API_URL", "http://127.0.0.1:18790")

nanobot CLI fallback

NANOBOT_PATH = os.environ.get("NANOBOT_PATH", "/home/nanobot/.nanobot/.venv/bin/nanobot")
OUTPUT_DIR = Path(NANOBOT_WORKSPACE) / "outputs"

并发控制

MAX_CONCURRENT_TASKS = 5

文件清理

AUTO_CLEANUP_FILES = True
CLEANUP_DELAY_SECONDS = 300

支持的文件类型

SUPPORTED_FILE_TYPES = {
'docx': 'file',
'pptx': 'file',
'xlsx': 'file',
'pdf': 'file',
'png': 'image',
'gif': 'image',
'jpg': 'image',
'jpeg': 'image',
}

==================== 企业微信客户端 ====================

class WeChatClient:
"""企业微信 API 客户端"""

def __init__(self, corp_id: str = "", agent_secret: str = ""):  
    self.corp_id = corp_id or WEIXIN_CORP_ID  
    self.agent_secret = agent_secret or WEIXIN_AGENT_SECRET  
    self._access_token: Optional[str] = None  
    self._token_expires_at: float = 0  

async def get_access_token(self) -> str:  
    """获取访问令牌"""  
    if self._access_token and datetime.now().timestamp() < self._token_expires_at:  
        return self._access_token  

    if not self.corp_id or not self.agent_secret:  
        return ""  

    async with httpx.AsyncClient() as client:  
        try:  
            resp = await client.get(  
                "https://qyapi.weixin.qq.com/cgi-bin/gettoken",  
                params={"corpid": self.corp_id, "corpsecret": self.agent_secret},  
                timeout=10  
            )  
            data = resp.json()  
            if data.get("errcode") == 0:  
                self._access_token = data["access_token"]  
                self._token_expires_at = datetime.now().timestamp() + 7000  
                return self._access_token  
        except Exception as e:  
            print(f"[!] 获取 token 失败:{e}")  

    return ""  

async def send_text(self, content: str, to_user: str = "@all") -> bool:  
    """发送文本消息"""  
    async with httpx.AsyncClient() as client:  
        try:  
            resp = await client.post(  
                WEIXIN_PUSH_URL,  
                json={  
                    "msgtype": "text",  
                    "content": content,  
                    "to_user": to_user  
                },  
                timeout=10  
            )  
            if resp.status_code == 200:  
                data = resp.json()  
                return data.get("status") == "success"  
            return False  
        except Exception as e:  
            print(f"[!] 发送文本失败:{e}")  
            return False  

async def upload_media(self, file_path: str, media_type: str = "file") -> Optional[str]:  
    """上传文件到企业微信,返回 media_id"""  
    if not self.corp_id or not self.agent_secret:  
        print(f"[!] 未配置企业微信凭证,无法上传文件:{file_path}")  
        return None  

    token = await self.get_access_token()  
    if not token:  
        return None  

    async with httpx.AsyncClient() as client:  
        try:  
            with open(file_path, 'rb') as f:  
                files = {'media': f}  
                resp = await client.post(  
                    WEIXIN_UPLOAD_URL,  
                    params={"type": media_type, "access_token": token},  
                    files=files,  
                    timeout=30  
                )  
                data = resp.json()  
                if data.get("errcode") == 0:  
                    media_id = data.get("media_id")  
                    print(f"[+] 文件上传成功:{media_id}")  
                    return media_id  
                else:  
                    print(f"[!] 上传失败:{data}")  
                    return None  
        except Exception as e:  
            print(f"[!] 上传文件异常:{e}")  
            return None  

async def send_file(self, file_path: str, to_user: str = "@all") -> bool:  
    """发送文件消息"""  
    ext = Path(file_path).suffix.lower().lstrip('.')  
    media_type = SUPPORTED_FILE_TYPES.get(ext, 'file')  

    media_id = await self.upload_media(file_path, media_type)  
    if not media_id:  
        return False  

    async with httpx.AsyncClient() as client:  
        try:  
            token = await self.get_access_token()  
            if media_type == 'image':  
                msg_type = "image"  
            else:  
                msg_type = "file"  

            resp = await client.post(  
                f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}",  
                json={  
                    "touser": to_user,  
                    "msgtype": msg_type,  
                    "agentid": 1000002,  
                    msg_type: {"media_id": media_id},  
                    "safe": 0  
                },  
                timeout=10  
            )  
            data = resp.json()  
            return data.get("errcode") == 0  
        except Exception as e:  
            print(f"[!] 发送文件失败:{e}")  
            return False  

==================== nanobot 客户端 (HTTP API) ====================

class NanobotClient:
"""通过 HTTP API 调用 nanobot gateway"""

def __init__(self, workspace: str):  
    self.workspace = Path(workspace)  
    self.nanobot_path = NANOBOT_PATH  
    self.api_url = NANOBOT_API_URL  

@staticmethod  
def filter_thought(text: str) -> str:  
    """清理思考标签"""  
    if not text:  
        return ""  
    # 清除 think 标签  
    text = re.sub(r'<think.*?>.*?</think', '', text, flags=re.DOTALL | re.IGNORECASE)  
    text = re.sub(r'</?think>', '', text, flags=re.IGNORECASE)  
    return text.strip()  

async def chat(self, message: str, user_id: str, channel: str = "wechat") -> tuple[str, list[str]]:  
    """发送消息并获取回复 - 使用 HTTP API  

    Returns:  
        tuple: (response_text, media_paths)  
    """  
    # 注入微信环境上下文  
    enhanced_message = (  
        "[系统约束:你正运行在企业微信受限环境。"  
        "1. 禁止输出思考标签或任何内心独白。"  
        "2. 直接输出给用户的最终回复内容。"  
        "3. 请尽量使用简洁的回复,适合手机屏幕阅读。"  
        "生成文件时,请在回复中说明文件路径。]"  
        f"用户消息:{message}"  
    )  

    try:  
        # 使用 HTTP API 调用 gateway  
        async with httpx.AsyncClient() as client:  
            resp = await client.post(  
                f"{self.api_url}/api/message",  
                json={  
                    "message": enhanced_message,  
                    "user_id": user_id,  
                    "channel": channel,  
                    "timeout": 180  
                },  
                timeout=200  
            )  

            if resp.status_code == 200:  
                data = resp.json()  
                response = data.get("response", "")  
                media = data.get("media", [])  
                response = self.filter_thought(response)  
                return (response if response else "(无回复)", media)  
            else:  
                try:  
                    error = resp.json().get("error", "unknown error")  
                except:  
                    error = f"HTTP {resp.status_code}"  
                print(f"[!] API 错误: {error}")  
                return (f"[错误] {error}", [])  

    except httpx.TimeoutException:  
        return ("[超时] 消息处理超时,请稍后重试", [])  
    except Exception as e:  
        print(f"[!] API 调用异常:{e}")  
        return (f"[错误] {str(e)}", [])  

==================== 文件处理器 ====================

class FileHandler:
"""文件处理工具"""

@staticmethod  
def extract_file_paths(text: str) -> list[tuple[str, str]]:  
    """从文本中提取文件路径"""  
    paths = []  
    # 匹配 OUTPUT_DIR 下的文件路径  
    patterns = [  
        rf'{OUTPUT_DIR}/[\w\-\.]+\.\w{{2,4}}',  
        rf'/tmp/[\w\-\.]+\.\w{{2,4}}',  
    ]  
    for pattern in patterns:  
        matches = re.findall(pattern, text)  
        for match in matches:  
            if os.path.exists(match):  
                ext = Path(match).suffix.lower().lstrip('.')  
                paths.append((match, ext))  
    return paths  

@staticmethod  
async def cleanup_file(file_path: str, delay: int = CLEANUP_DELAY_SECONDS):  
    """延迟清理文件"""  
    await asyncio.sleep(delay)  
    try:  
        if os.path.exists(file_path):  
            os.remove(file_path)  
            print(f"[-] 已清理文件:{file_path}")  
    except Exception as e:  
        print(f"[!] 清理文件失败:{e}")  

==================== 消息处理器 ====================

class MessageProcessor:
"""消息处理器"""

def __init__(self, wechat: WeChatClient, nanobot: NanobotClient):  
    self.wechat = wechat  
    self.nanobot = nanobot  
    self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)  
    self.last_check_time = 0  

async def process(self, content: str, user_id: str):  
    """处理单条消息(带并发控制)"""  
    async with self.semaphore:  
        try:  
            print(f"[>] 处理消息:{user_id}: {content[:50]}...")  

            response, media_paths = await self.nanobot.chat(content, user_id)  
            print(f"[<] 回复:{response[:100]}...")  
            if media_paths:  
                print(f"[<] 媒体:{media_paths}")  

            await self.wechat.send_text(response, user_id)  

            # 发送 API 返回的媒体文件  
            for file_path in media_paths:  
                if os.path.exists(file_path):  
                    print(f"[#] 发送媒体文件:{file_path}")  
                    success = await self.wechat.send_file(file_path, user_id)  
                    if success:  
                        asyncio.create_task(FileHandler.cleanup_file(file_path))  

            # 提取并发送回复文本中的文件路径(兼容旧模式)  
            files = FileHandler.extract_file_paths(response)  
            for file_path, ext in files:  
                if file_path not in media_paths:  # 避免重复发送  
                    print(f"[#] 发送文件:{file_path}")  
                    success = await self.wechat.send_file(file_path, user_id)  
                    if success:  
                        asyncio.create_task(FileHandler.cleanup_file(file_path))  

        except Exception as e:  
            print(f"[!] 处理消息异常:{e}")  
            await self.wechat.send_text(f"[错误] 消息处理失败:{e}", user_id)  

==================== 轮询服务 ====================

class PollingService:
"""轮询企业微信消息"""

def __init__(self, wechat: WeChatClient, processor: MessageProcessor):  
    self.wechat = wechat  
    self.processor = processor  
    self.last_msg_time = 0  
    self.processed_msg_ids = set()  
    # 轮询接口地址  
    self.poll_url = "https://api.yuangs.cc/wechat/poll"  
    # ACK 接口地址  
    self.ack_url = "https://api.yuangs.cc/wechat/ack"  

async def ack_message(self, msg_id: str) -> bool:  
    """确认消息已处理,从队列中删除"""  
    if not WECHAT_WEBHOOK_SECRET:  
        return False  

    async with httpx.AsyncClient() as client:  
        try:  
            resp = await client.post(  
                self.ack_url,  
                headers={"X-Webhook-Secret": WECHAT_WEBHOOK_SECRET, "Content-Type": "application/json"},  
                json={"msg_id": msg_id},  
                timeout=5  
            )  
            if resp.status_code == 200:  
                data = resp.json()  
                return data.get("status") == "ok"  
        except Exception as e:  
            print(f"[!] ACK 失败:{e}")  
    return False  

async def poll(self):  
    """轮询消息"""  
    print(f"[+] 开始轮询企业微信消息 (HTTP API 模式)...")  

    if not WECHAT_WEBHOOK_SECRET:  
        print(f"[!] 警告:未配置 WECHAT_WEBHOOK_SECRET,轮询接口将无法认证")  

    while True:  
        try:  
            # 调用轮询 API 获取消息 (GET 请求,带认证 Header)  
            headers = {"X-Webhook-Secret": WECHAT_WEBHOOK_SECRET} if WECHAT_WEBHOOK_SECRET else {}  

            async with httpx.AsyncClient() as client:  
                resp = await client.get(  
                    self.poll_url,  
                    headers=headers,  
                    timeout=10  
                )  

                if resp.status_code == 200:  
                    data = resp.json()  
                    messages = data.get("messages", [])  

                    for msg in messages:  
                        msg_id = msg.get("id", "")  
                        user_id = msg.get("user_id", "")  
                        content = msg.get("content", "")  

                        # 跳过已处理的消息  
                        if msg_id in self.processed_msg_ids:  
                            continue  

                        self.processed_msg_ids.add(msg_id)  

                        # 限制缓存大小  
                        if len(self.processed_msg_ids) > 1000:  
                            self.processed_msg_ids = set(list(self.processed_msg_ids)[-500:])  

                        # 确认消息  
                        await self.ack_message(msg_id)  
                        print(f"[+] ACK 消息:{msg_id[:8]}...")  

                        # 处理消息  
                        await self.processor.process(content, user_id)  
                elif resp.status_code == 401:  
                    print(f"[!] 认证失败,请检查 WECHAT_WEBHOOK_SECRET")  
                else:  
                    print(f"[!] 轮询失败:HTTP {resp.status_code}")  

        except httpx.TimeoutException:  
            pass  # 超时是正常的,继续轮询  
        except Exception as e:  
            print(f"[!] 轮询异常:{e}")  

        # 轮询间隔  
        await asyncio.sleep(1)  

==================== 主程序 ====================

async def main():
print("=" * 50)
print("企业微信 <-> nanobot 桥接服务 (HTTP API 版本)")
print("=" * 50)
print(f"[+] Gateway API: {NANOBOT_API_URL}")
print(f"[+] 轮询地址: https://api.yuangs.cc/wechat/poll")

# 检查 Gateway API 是否可用  
async with httpx.AsyncClient() as client:  
    try:  
        resp = await client.get(f"{NANOBOT_API_URL}/api/health", timeout=5)  
        if resp.status_code == 200:  
            print(f"[+] Gateway API 连接成功")  
        else:  
            print(f"[!] Gateway API 返回 {resp.status_code}")  
    except Exception as e:  
        print(f"[!] Gateway API 不可用: {e}")  
        print(f"[!] 请确保 nanobot gateway 正在运行")  
        return  

# 创建客户端  
wechat = WeChatClient()  
nanobot = NanobotClient(NANOBOT_WORKSPACE)  
processor = MessageProcessor(wechat, nanobot)  
polling = PollingService(wechat, processor)  

# 开始轮询  
await polling.poll()  

if name == "main":
asyncio.run(main())