Cloudflare Durable Objects 作用及优势

简介

Cloudflare Durable Objects (DOs) 是 Cloudflare Workers 生态系统中的一个强大组件,它为开发者提供了一种构建有状态(Stateful)实时(Real-time)全球分布式应用程序的全新方式,同时极大地简化了传统分布式系统中的复杂性。

让我们详细解释其作用与优势:


Cloudflare Durable Objects 的核心作用 (Function)

作用概括: Durable Objects 允许你在 Cloudflare 的全球网络边缘创建单例(Singleton)有状态(Stateful)的 JavaScript/TypeScript 对象。每个 Durable Object 实例都拥有一个唯一的 ID独立的持久化存储。所有针对特定 ID 的请求都会被路由到同一个 Durable Object 实例上,无论这些请求来自全球何处。

具体解释:

  1. 有状态的计算单元:

    • 传统的无服务器(Serverless)函数(如 Cloudflare Workers 本身,或 AWS Lambda)通常是无状态的。这意味着每次请求都可能在一个全新的、独立的实例上运行,它们不保留之前的任何信息。

    • Durable Objects 解决了这个问题。每个 DO 实例都可以维护自己的内部状态(变量、内存数据),并且这个状态是持久的,即使在没有请求时也不会丢失。

    • 你可以把它想象成一个在 Cloudflare 边缘网络上运行的“微型数据库”或“微型服务实例”,它不仅能处理请求,还能记住自己的过去。

  2. 全球唯一的单例:

    • 这是 Durable Objects 最核心的特性。当你创建一个 Durable Object 并给它一个 ID(例如,一个聊天室的 ID,一个文档的 ID,一个游戏房间的 ID),Cloudflare 会确保全球范围内只有一个这个 ID 对应的 Durable Object 实例在运行。

    • 所有针对这个 ID 的请求(无论来自哪个地理位置)都会被智能地路由到这个唯一的实例上。这个实例通常会运行在离第一个请求发起者最近的 Cloudflare 数据中心,然后后续请求会“粘滞”到这个实例上。

  3. 内置的持久化存储:

    • 每个 Durable Object 实例都自带一个简单的键值(Key-Value)存储,可以直接在对象内部的代码中访问。这意味着你不需要单独连接外部数据库(如 Redis、PostgreSQL、DynamoDB)来存储这个对象的状态。

    • 这个存储是高度一致的,并且与对象实例本身紧密集成,提供了极低的读写延迟。


Cloudflare Durable Objects 的核心优势 (Advantages)

Cloudflare 官方描述中的“无需协调状态、无需独立存储、无需管理基础设施”正是其最突出的优势:

  1. 无需协调状态 (No need to coordinate state):

    • 传统问题: 在构建协作应用(如多人在线文档、聊天室、实时游戏)时,最大的挑战之一是如何管理和同步共享状态。当多个用户同时修改一个数据时,你需要复杂的分布式锁、事务、消息队列等机制来避免数据冲突和不一致,这非常复杂且容易出错。

    • DO 优势: 由于每个 Durable Object 都是一个全球唯一的单例,所有针对特定共享资源的请求都汇聚到这一个实例上。这意味着在这个实例内部,你可以像编写单线程应用一样管理状态,无需担心分布式并发控制问题。Durable Object 内部的请求是串行处理的,保证了数据的一致性。这极大地简化了复杂实时应用的开发。

  2. 无需独立存储 (No separate storage):

    • 传统问题: 无服务器函数通常是无状态的,因此任何需要持久化的数据都必须存储在外部数据库中。这增加了架构的复杂性(需要管理数据库连接、数据模型、额外的服务费用),并可能引入额外的网络延迟。

    • DO 优势: 每个 Durable Object 都内置了持久化的键值存储。这意味着对象的状态可以直接存储在与其计算逻辑紧密结合的地方,无需额外的数据库服务。这不仅简化了开发和部署,还因为数据与计算的紧密性而提供了极低的延迟

  3. 无需管理基础设施 (No infrastructure management):

    • 传统问题: 即使是“无服务器”服务,你可能仍然需要关心数据库的扩容、负载均衡、高可用性、灾备等问题。

    • DO 优势: Durable Objects 是 Cloudflare 完全托管的服务。你只需编写 JavaScript/TypeScript 代码,Cloudflare 负责处理所有的底层基础设施:对象的实例化、路由、状态的持久化、全球复制、故障恢复、扩容等。开发者可以完全专注于业务逻辑,而无需担心服务器、数据库或网络配置。

其他重要优势:

  • 卓越的实时性能: 由于对象实例运行在 Cloudflare 的全球边缘网络上,并且所有请求都路由到同一个实例,结合内置的低延迟存储,Durable Objects 非常适合构建需要毫秒级响应的实时应用。

  • 全球分布式与低延迟: 对象实例会智能地在离第一个请求最近的 Cloudflare 数据中心被激活,后续请求会“粘滞”到该实例。如果该数据中心出现问题,对象会自动迁移到其他健康的边缘节点,确保高可用性和低延迟。

  • 成本效益: 采用按请求和存储量计费的模式,没有闲置成本,非常适合间歇性或突发性负载的应用。

  • 简化开发模型: 将分布式系统的复杂性抽象化,让开发者能够以更直观、更接近单机应用的方式来思考和实现复杂的协作逻辑。


应用场景 (Use Cases)

正如描述中所说,Durable Objects 非常适合:

  • AI Agents (AI 代理): 每个 AI 代理可以是一个 Durable Object,维护其对话历史、用户偏好、学习状态等,实现有记忆和上下文的 AI 交互。

  • Collaborative Applications (协作应用):

    • 实时文档编辑: 每个文档可以是一个 Durable Object,管理所有用户的实时修改和同步。

    • 白板应用: 管理画布上的所有元素和用户操作。

  • Real-time Interactions like Chat (实时聊天):

    • 聊天室: 每个聊天室可以是一个 Durable Object,维护消息历史、在线用户列表,并向所有参与者广播新消息。

    • 私聊: 两个用户之间的私聊会话可以是一个 Durable Object。

  • Online Gaming (在线游戏):

    • 游戏房间: 每个游戏房间可以是一个 Durable Object,管理游戏状态、玩家位置、得分等,并处理玩家的实时操作。

    • 排行榜: 维护和更新实时排行榜。

  • IoT 设备状态管理: 每个 IoT 设备可以对应一个 Durable Object,存储其最新状态、传感器读数,并处理来自设备的命令。

  • 分布式锁/计数器: 利用其单例特性,轻松实现全局唯一的锁或计数器。

总而言之,Cloudflare Durable Objects 提供了一种革命性的方式来构建复杂的、有状态的、全球分布式的实时应用,它将传统上属于后端工程师的分布式系统难题,通过其独特的架构设计,极大地简化并推向了边缘计算的范畴。

举个例子

好的,我们来用一个“实时协作文档”的场景来详细说明 Cloudflare Durable Objects 的作用和代码实现。

场景: 多个用户可以同时打开同一个文档,并实时看到其他用户的修改。

核心思想:

  • 每个文档 对应一个 Durable Object 实例

  • 这个 Durable Object 实例负责维护该文档的最新内容

  • 所有连接到该文档的用户,都通过 WebSocket 连接到这个唯一的 Durable Object 实例。

  • 当一个用户修改文档时,其修改通过 WebSocket 发送给 Durable Object,Durable Object 更新内容并广播给所有其他连接的用户。


1. 项目结构

  
my-collaborative-doc/
  
├── src/
  
│   ├── worker.js         # Cloudflare Worker 入口,负责路由请求到 Durable Object
  
│   └── document_do.js    # Durable Object 的实现代码
  
├── public/
  
│   └── index.html        # 客户端网页,包含 JavaScript
  
└── wrangler.toml         # Cloudflare Workers 项目配置文件
  

2. wrangler.toml (Cloudflare Workers 配置)

这个文件告诉 Cloudflare 如何部署你的 Worker 和 Durable Object。

  
name = "my-collaborative-doc"
  
main = "src/worker.js"
  
compatibility_date = "2023-10-26" # 使用最新的兼容性日期
  

  
# 定义 Durable Object
  
[[durable_objects.bindings]]
  
name = "DOCUMENT_DO" # 在 worker.js 中通过 env.DOCUMENT_DO 访问
  
class_name = "DocumentDurableObject" # 对应 document_do.js 中导出的类名
  

  
# Durable Object 迁移(首次部署或修改 DO 类时需要)
  
[[migrations]]
  
tag = "v1" # 任意版本标签
  
new_classes = ["DocumentDurableObject"] # 首次部署时指定新的 DO 类
  

3. src/document_do.js (Durable Object 实现)

这是核心逻辑,每个文档实例都运行在这里。

  
// src/document_do.js
  

  
export class DocumentDurableObject {
  
    constructor(state, env) {
  
        this.state = state;
  
        this.env = env;
  
        this.content = ""; // 文档的当前内容
  
        this.websockets = new Set(); // 存储所有连接到此文档的 WebSocket 客户端
  

  
        // 从持久化存储中加载文档内容
  
        this.state.storage.get("content").then(storedContent => {
  
            if (storedContent) {
  
                this.content = storedContent;
  
            }
  
        });
  
    }
  

  
    // Durable Object 的主要入口点,处理所有传入请求
  
    async fetch(request) {
  
        // 确保请求是串行处理的,避免并发问题
  
        return this.state.blockConcurrencyWhile(async () => {
  
            const url = new URL(request.url);
  

  
            // 处理 WebSocket 连接请求
  
            if (url.pathname === "/websocket") {
  
                // 检查请求是否是 WebSocket 升级请求
  
                const upgradeHeader = request.headers.get("Upgrade");
  
                if (!upgradeHeader || upgradeHeader !== "websocket") {
  
                    return new Response("Expected Upgrade: websocket", { status: 426 });
  
                }
  

  
                // 创建 WebSocket 对
  
                const { 0: client, 1: server } = new WebSocketPair();
  

  
                // 将服务器端 WebSocket 添加到我们的集合中
  
                this.websockets.add(server);
  

  
                // 设置 WebSocket 事件监听器
  
                server.addEventListener("message", async event => {
  
                    // 收到客户端发送的文档更新
  
                    const newContent = event.data;
  
                    this.content = newContent; // 更新内存中的内容
  

  
                    // 持久化到存储
  
                    await this.state.storage.put("content", this.content);
  

  
                    // 广播给所有其他连接的客户端
  
                    this.broadcast(this.content, server);
  
                });
  

  
                server.addEventListener("close", evt => {
  
                    console.log(`WebSocket closed: ${evt.code} ${evt.reason}`);
  
                    this.websockets.delete(server); // 客户端断开连接,从集合中移除
  
                });
  

  
                server.addEventListener("error", err => {
  
                    console.error("WebSocket error:", err);
  
                    this.websockets.delete(server); // 发生错误,从集合中移除
  
                });
  

  
                // 首次连接时,将当前文档内容发送给新连接的客户端
  
                server.accept();
  
                server.send(this.content);
  

  
                // 返回客户端 WebSocket,完成升级
  
                return new Response(null, { status: 101, webSocket: client });
  

  
            } else if (url.pathname === "/content") {
  
                // 处理 HTTP GET 请求,获取文档内容
  
                if (request.method === "GET") {
  
                    return new Response(this.content, { headers: { "Content-Type": "text/plain" } });
  
                }
  
                // 处理 HTTP POST 请求,更新文档内容 (可选,WebSocket 更适合实时)
  
                else if (request.method === "POST") {
  
                    const newContent = await request.text();
  
                    this.content = newContent;
  
                    await this.state.storage.put("content", this.content);
  
                    this.broadcast(this.content); // 广播给所有连接的客户端
  
                    return new Response("Content updated", { status: 200 });
  
                }
  
            }
  

  
            return new Response("Not Found", { status: 404 });
  
        });
  
    }
  

  
    // 辅助方法:向所有连接的 WebSocket 广播消息
  
    broadcast(message, sender = null) {
  
        this.websockets.forEach(ws => {
  
            // 避免将消息发回给发送者,除非 sender 为 null (例如 HTTP POST 更新)
  
            if (ws !== sender) {
  
                try {
  
                    ws.send(message);
  
                } catch (err) {
  
                    console.error("Failed to send message to WebSocket:", err);
  
                    this.websockets.delete(ws); // 发送失败,移除该 WebSocket
  
                }
  
            }
  
        });
  
    }
  
}
  

代码解释:

  • constructor(state, env):

    • state: 提供了访问 Durable Object 存储 (state.storage) 和管理并发 (state.blockConcurrencyWhile) 的能力。

    • env: 包含 Worker 脚本中定义的任何环境变量(本例中未使用)。

    • this.content: 内存中存储的文档内容。

    • this.websockets: 一个 Set,用于跟踪所有连接到此 DO 实例的 WebSocket 连接。

    • 在构造函数中,尝试从 state.storage 加载之前保存的文档内容,确保持久性。

  • fetch(request):

    • 这是 Durable Object 的入口点,所有针对此 DO 实例的请求都会到达这里。

    • this.state.blockConcurrencyWhile(async () => { ... }): 非常重要! 这确保了对 Durable Object 的所有请求(包括 WebSocket 消息)都是串行处理的。这意味着你不需要担心并发读写 this.contentthis.websockets 导致的竞态条件,极大地简化了状态管理。

    • /websocket 路径: 处理 WebSocket 升级请求。

      • new WebSocketPair(): 创建一个 WebSocket 对,一个用于客户端,一个用于服务器端(即 DO 内部)。

      • this.websockets.add(server): 将服务器端 WebSocket 实例添加到集合中,以便后续广播。

      • server.addEventListener("message", ...): 监听客户端发送的消息。当客户端(用户)输入内容时,消息会发送到这里。DO 更新 this.content,将其持久化到 state.storage,然后调用 broadcast 方法。

      • server.addEventListener("close", ...) / server.addEventListener("error", ...): 处理 WebSocket 断开连接或错误,并从集合中移除对应的 WebSocket。

      • server.accept(): 接受 WebSocket 连接。

      • server.send(this.content): 首次连接时,将当前文档内容发送给新连接的客户端。

    • /content 路径 (可选): 提供了通过 HTTP GET 获取内容和 HTTP POST 更新内容的能力,但对于实时协作,WebSocket 是首选。

  • broadcast(message, sender = null):

    • 遍历 this.websockets 集合中的所有连接。

    • ws.send(message): 将更新后的文档内容发送给每个连接的客户端。

    • if (ws !== sender): 避免将消息发回给发送者,减少不必要的网络流量(尽管发送回去通常也无害)。

    • 错误处理:如果发送失败(例如,客户端已断开但尚未触发 close 事件),则从集合中移除该 WebSocket。


4. src/worker.js (Cloudflare Worker 入口)

这个 Worker 负责接收来自客户端的请求,并将其路由到正确的 Durable Object 实例。

  
// src/worker.js
  

  
export default {
  
    async fetch(request, env, ctx) {
  
        const url = new URL(request.url);
  

  
        // 假设 URL 结构是 /docs/<document_id>/websocket 或 /docs/<document_id>/content
  
        const pathParts = url.pathname.split("/");
  
        if (pathParts.length < 3 || pathParts[1] !== "docs") {
  
            return new Response("Invalid URL. Expected /docs/<document_id>/...", { status: 400 });
  
        }
  

  
        const documentId = pathParts[2]; // 从 URL 中提取文档 ID
  
        const subPath = "/" + pathParts.slice(3).join("/"); // 提取子路径,如 /websocket 或 /content
  

  
        // 获取 Durable Object ID
  
        // idFromName 确保对于同一个 documentId,总是得到同一个 Durable Object 实例
  
        const id = env.DOCUMENT_DO.idFromName(documentId);
  

  
        // 获取 Durable Object 的 stub (存根)
  
        // stub 是一个代理对象,允许你向 Durable Object 发送请求
  
        const stub = env.DOCUMENT_DO.get(id);
  

  
        // 将原始请求转发给 Durable Object
  
        // 注意:这里需要修改请求的 URL,使其只包含 Durable Object 内部的路径
  
        request.url = new URL(subPath, request.url).toString(); // 比如,将 /docs/doc123/websocket 变为 /websocket
  

  
        return stub.fetch(request);
  
    },
  
};
  

代码解释:

  • fetch(request, env, ctx): Worker 的入口函数。

  • URL 解析: 从请求 URL 中提取 documentId。例如,如果请求是 https://your-worker.your-domain.com/docs/my-unique-doc-id/websocket,那么 documentId 将是 my-unique-doc-id

  • env.DOCUMENT_DO.idFromName(documentId): 这是关键!它根据一个字符串名称(documentId)生成一个 Durable Object ID。对于相同的名称,它总是返回相同的 ID。 这确保了所有针对 my-unique-doc-id 的请求都会被路由到同一个 Durable Object 实例。

  • env.DOCUMENT_DO.get(id): 获取一个 Durable Object 的“存根”(stub)。这个存根是一个代理对象,你可以通过它向实际的 Durable Object 实例发送请求。

  • request.url = new URL(subPath, request.url).toString();: 在将请求转发给 Durable Object 之前,我们修改了请求的 URL。Durable Object 内部只关心 /websocket/content 这样的路径,而不关心前面的 /docs/<document_id> 部分。

  • stub.fetch(request): 将修改后的请求转发给 Durable Object 实例。Cloudflare 的网络会自动将这个请求路由到正确的 Durable Object 实例所在的边缘数据中心。


5. public/index.html (客户端网页)

这个 HTML 文件包含一个 textarea 和一些 JavaScript,用于连接 WebSocket 并发送/接收文档更新。

  
<!DOCTYPE html>
  
<html lang="en">
  
<head>
  
    <meta charset="UTF-8">
  
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
  
    <title>Collaborative Document</title>
  
    <style>
  
        body { font-family: sans-serif; margin: 20px; }
  
        textarea { width: 80%; height: 400px; padding: 10px; font-size: 16px; border: 1px solid #ccc; }
  
        #status { margin-top: 10px; color: green; }
  
    </style>
  
</head>
  
<body>
  
    <h1>Collaborative Document</h1>
  
    <p>Open this page in multiple tabs/browsers to see real-time collaboration.</p>
  
    <p>Document ID: <span id="docIdDisplay"></span></p>
  
    <textarea id="documentContent"></textarea>
  
    <div id="status">Connecting...</div>
  

  
    <script>
  
        const docIdDisplay = document.getElementById('docIdDisplay');
  
        const documentContent = document.getElementById('documentContent');
  
        const statusDiv = document.getElementById('status');
  

  
        // 从 URL 中获取文档 ID,例如:http://localhost:8787/docs/my-first-doc
  
        const pathParts = window.location.pathname.split('/');
  
        const documentId = pathParts[2] || 'default-doc'; // 如果没有指定,使用默认ID
  
        docIdDisplay.textContent = documentId;
  

  
        // 构建 WebSocket URL
  
        // 假设你的 Worker 部署在当前域名下,并且 WebSocket 路径是 /docs/<document_id>/websocket
  
        const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
  
        const wsUrl = `${wsProtocol}//${window.location.host}/docs/${documentId}/websocket`;
  

  
        let ws;
  
        let debounceTimeout;
  

  
        function connectWebSocket() {
  
            statusDiv.textContent = 'Connecting...';
  
            ws = new WebSocket(wsUrl);
  

  
            ws.onopen = () => {
  
                statusDiv.textContent = 'Connected!';
  
                console.log('WebSocket connected.');
  
            };
  

  
            ws.onmessage = (event) => {
  
                // 收到来自 Durable Object 的文档更新
  
                const receivedContent = event.data;
  
                // 只有当内容不同时才更新,避免光标跳动
  
                if (documentContent.value !== receivedContent) {
  
                    const cursorStart = documentContent.selectionStart;
  
                    const cursorEnd = documentContent.selectionEnd;
  
                    documentContent.value = receivedContent;
  
                    // 尝试恢复光标位置
  
                    documentContent.setSelectionRange(cursorStart, cursorEnd);
  
                }
  
            };
  

  
            ws.onclose = (event) => {
  
                statusDiv.textContent = `Disconnected: ${event.code} ${event.reason}. Reconnecting in 3s...`;
  
                console.log('WebSocket disconnected:', event);
  
                setTimeout(connectWebSocket, 3000); // 尝试重连
  
            };
  

  
            ws.onerror = (error) => {
  
                statusDiv.textContent = 'WebSocket error. Reconnecting in 3s...';
  
                console.error('WebSocket error:', error);
  
                ws.close(); // 强制关闭以触发 onclose 和重连
  
            };
  
        }
  

  
        // 当用户在 textarea 中输入时,发送更新
  
        documentContent.addEventListener('input', () => {
  
            clearTimeout(debounceTimeout);
  
            debounceTimeout = setTimeout(() => {
  
                if (ws && ws.readyState === WebSocket.OPEN) {
  
                    ws.send(documentContent.value);
  
                }
  
            }, 200); // 200ms 防抖,避免频繁发送
  
        });
  

  
        // 页面加载时连接 WebSocket
  
        connectWebSocket();
  
    </script>
  
</body>
  
</html>
  

代码解释:

  • 获取 documentId: 从当前 URL 的路径中提取文档 ID。

  • 构建 WebSocket URL: 根据当前页面的协议和主机,以及文档 ID,构建连接到 Worker 的 WebSocket URL。

  • connectWebSocket():

    • 创建 WebSocket 实例。

    • ws.onopen: 连接成功时,更新状态。

    • ws.onmessage: 收到来自 Durable Object 的消息时(即其他用户的修改),更新 textarea 的内容。这里做了简单的光标位置恢复,但更复杂的协作编辑器会使用 Operational Transformation (OT) 或 Conflict-free Replicated Data Types (CRDTs) 来处理更精细的同步和光标管理。

    • ws.onclose / ws.onerror: 处理连接断开或错误,并尝试在几秒后重连。

  • documentContent.addEventListener('input', ...):

    • 当用户在 textarea 中输入时触发。

    • 使用**防抖(debounce)**技术:在用户停止输入一小段时间(200ms)后才发送内容,避免每次按键都发送消息,减少网络流量。

    • ws.send(documentContent.value): 将 textarea 的当前全部内容发送给 Durable Object。


6. 部署和测试

  1. 安装 Wrangler CLI:

    
    npm install -g wrangler
    
  2. 登录 Cloudflare:

    
    wrangler login
    
  3. 在项目根目录运行部署命令:

    
    wrangler deploy
    

    首次部署时,Wrangler 会提示你确认 Durable Object 的迁移。

  4. 访问: 部署成功后,Wrangler 会给你一个 Worker 的 URL,例如 https://my-collaborative-doc.<your-worker-name>.workers.dev

    • 你可以通过 https://my-collaborative-doc.<your-worker-name>.workers.dev/docs/my-first-doc 访问你的协作文档。

    • 在不同的浏览器标签页或不同的设备上打开相同的 URL,然后尝试在其中一个 textarea 中输入内容,你会看到其他标签页/设备上的内容实时更新。


总结与优势体现

通过这个例子,我们可以看到 Cloudflare Durable Objects 如何简化了实时协作应用的开发:

  1. 无需协调状态: DocumentDurableObject 内部的 this.contentthis.websockets 都是单线程访问的,因为 state.blockConcurrencyWhile 保证了所有请求的串行执行。开发者无需编写复杂的分布式锁或事务逻辑。

  2. 无需独立存储: 文档内容直接通过 this.state.storage.put()this.state.storage.get() 在 Durable Object 内部进行持久化,无需配置和管理外部数据库。

  3. 无需管理基础设施: 你只需编写业务逻辑代码,Cloudflare 负责 Durable Object 的实例化、路由、高可用性、全球分布和持久化。你不需要关心服务器、负载均衡、数据库集群等。

  4. 实时性: WebSocket 连接直接建立到文档对应的 Durable Object 实例,且该实例通常运行在离用户最近的边缘,提供了极低的延迟,实现了真正的实时协作。

  5. 可伸缩性: 不同的文档会对应不同的 Durable Object 实例,它们可以独立地在 Cloudflare 的全球网络上运行和扩展。

这个例子虽然简单,但它展示了 Durable Objects 在构建复杂、有状态、实时、全球分布式应用方面的强大能力和开发体验的巨大提升。