Skip to content

接入新渠道

渠道(Channel)是 CocoCat 与外部用户交互的入口。目前支持微信、企业微信、飞书和 Web API。接入新渠道需要实现 Channel 基类并注册到入口管理器。

架构概览

外部消息 → Channel实现 → on_message回调 → SceneKeeper → Agent邮箱

Agent回复 ← Channel.send() ← Agent处理完成后回调 ← Heartbeat消费

第一步:创建 Channel 子类

cococat/core/channels/ 下创建新文件,实现 Channel 基类:

python
"""My new channel implementation."""
from cococat.core.channels import Channel, ChatMessage


class MyChannel(Channel):
    channel_type = "my_channel"  # 唯一标识

    def start(self, scene_id: str, config: dict):
        """启动频道连接。config 包含渠道配置参数。"""
        self.scene_id = scene_id
        # 初始化连接、WebSocket、Webhook 等
        # 收到消息时调用 self.on_message(msg)

    def send(self, reply: str, user_id: str):
        """发送回复给用户。"""
        # 调用渠道 API 发送消息
        pass

Channel 基类

python
class Channel:
    channel_type = ""

    def __init__(self):
        self.scene_id = ""
        self.on_message = None  # 外部设置的回调

    def start(self, scene_id: str, config: dict):
        raise NotImplementedError

    def send(self, reply: str, user_id: str):
        raise NotImplementedError
方法说明
start(scene_id, config)启动频道,建立与外部平台的连接
send(reply, user_id)向指定用户发送回复

ChatMessage 格式

python
class ChatMessage:
    channel_type: str   # 渠道类型标识
    scene_id: str       # 所属场景
    user_id: str        # 用户 ID(外部平台用户标识)
    content: str        # 消息内容
    msg_type: str       # 消息类型(text, image 等)
    msg_id: str         # 消息 ID
    timestamp: str      # ISO 格式时间戳

第二步:实现消息接收循环

渠道需要持续监听外部消息。典型实现方式:

WebSocket 模式(如飞书):

python
def start(self, scene_id: str, config: dict):
    import asyncio
    import websockets

    async def listen():
        async with websockets.connect(config["ws_url"]) as ws:
            async for message in ws:
                msg = self._parse_message(message)
                if msg and self.on_message:
                    self.on_message(msg)

    threading.Thread(target=lambda: asyncio.run(listen()), daemon=True).start()

Webhook 模式(如微信公众号):

python
def start(self, scene_id: str, config: dict):
    # Webhook 由 FastAPI 路由处理
    pass

第三步:注册到入口管理器

编辑 cococat/core/scene_keeper.py,新增频道启动函数和路由:

python
def _start_my_channel(scene_id: str, config: dict, target_id: str, target_type: str):
    try:
        from cococat.core.channels.my_channel import MyChannel
        ch = MyChannel()
        ch.on_message = lambda msg: _route_to_agent(
            target_id, "my_channel", msg.user_id, msg.content
        )
        ch.start(scene_id, config)
    except Exception as e:
        print(f"[SceneKeeper] MyChannel failed: {e}")

start_agent_entriesstart_scene_entries 中添加分支:

python
if channel == "my_channel":
    t = threading.Thread(target=_start_my_channel, ...)
    t.start()

第四步:添加入口配置 UI

如果渠道需要配置参数(API Key、Webhook URL 等),在 Web 面板的入口管理页面添加对应的配置表单。配置以 JSON 形式存储在 agents/{id}/entries.jsonscenes/{id}/entries.json

json
{
  "entries": [
    {
      "type": "channel",
      "channel": "my_channel",
      "enabled": true,
      "config": {
        "api_key": "xxx",
        "webhook_url": "https://..."
      }
    }
  ]
}

最佳实践

  • 异常处理:渠道启动不应阻塞主流程,使用 daemon=True 线程
  • 重连机制:WebSocket 类渠道需要断线重连
  • 消息去重:使用 msg_id 防止重复消费
  • 速率限制:遵守外部平台的 API 调用限制
  • 日志:记录渠道连接状态和错误

基于 MIT 协议开源