简介
Cloudflare Durable Objects (DOs) 是 Cloudflare Workers 生态系统中的一个强大组件,它为开发者提供了一种构建有状态(Stateful)、实时(Real-time)、全球分布式应用程序的全新方式,同时极大地简化了传统分布式系统中的复杂性。
让我们详细解释其作用与优势:
Cloudflare Durable Objects 的核心作用 (Function)
作用概括: Durable Objects 允许你在 Cloudflare 的全球网络边缘创建单例(Singleton)、有状态(Stateful)的 JavaScript/TypeScript 对象。每个 Durable Object 实例都拥有一个唯一的 ID 和独立的持久化存储。所有针对特定 ID 的请求都会被路由到同一个 Durable Object 实例上,无论这些请求来自全球何处。
具体解释:
-
有状态的计算单元:
-
传统的无服务器(Serverless)函数(如 Cloudflare Workers 本身,或 AWS Lambda)通常是无状态的。这意味着每次请求都可能在一个全新的、独立的实例上运行,它们不保留之前的任何信息。
-
Durable Objects 解决了这个问题。每个 DO 实例都可以维护自己的内部状态(变量、内存数据),并且这个状态是持久的,即使在没有请求时也不会丢失。
-
你可以把它想象成一个在 Cloudflare 边缘网络上运行的“微型数据库”或“微型服务实例”,它不仅能处理请求,还能记住自己的过去。
-
-
全球唯一的单例:
-
这是 Durable Objects 最核心的特性。当你创建一个 Durable Object 并给它一个 ID(例如,一个聊天室的 ID,一个文档的 ID,一个游戏房间的 ID),Cloudflare 会确保全球范围内只有一个这个 ID 对应的 Durable Object 实例在运行。
-
所有针对这个 ID 的请求(无论来自哪个地理位置)都会被智能地路由到这个唯一的实例上。这个实例通常会运行在离第一个请求发起者最近的 Cloudflare 数据中心,然后后续请求会“粘滞”到这个实例上。
-
-
内置的持久化存储:
-
每个 Durable Object 实例都自带一个简单的键值(Key-Value)存储,可以直接在对象内部的代码中访问。这意味着你不需要单独连接外部数据库(如 Redis、PostgreSQL、DynamoDB)来存储这个对象的状态。
-
这个存储是高度一致的,并且与对象实例本身紧密集成,提供了极低的读写延迟。
-
Cloudflare Durable Objects 的核心优势 (Advantages)
Cloudflare 官方描述中的“无需协调状态、无需独立存储、无需管理基础设施”正是其最突出的优势:
-
无需协调状态 (No need to coordinate state):
-
传统问题: 在构建协作应用(如多人在线文档、聊天室、实时游戏)时,最大的挑战之一是如何管理和同步共享状态。当多个用户同时修改一个数据时,你需要复杂的分布式锁、事务、消息队列等机制来避免数据冲突和不一致,这非常复杂且容易出错。
-
DO 优势: 由于每个 Durable Object 都是一个全球唯一的单例,所有针对特定共享资源的请求都汇聚到这一个实例上。这意味着在这个实例内部,你可以像编写单线程应用一样管理状态,无需担心分布式并发控制问题。Durable Object 内部的请求是串行处理的,保证了数据的一致性。这极大地简化了复杂实时应用的开发。
-
-
无需独立存储 (No separate storage):
-
传统问题: 无服务器函数通常是无状态的,因此任何需要持久化的数据都必须存储在外部数据库中。这增加了架构的复杂性(需要管理数据库连接、数据模型、额外的服务费用),并可能引入额外的网络延迟。
-
DO 优势: 每个 Durable Object 都内置了持久化的键值存储。这意味着对象的状态可以直接存储在与其计算逻辑紧密结合的地方,无需额外的数据库服务。这不仅简化了开发和部署,还因为数据与计算的紧密性而提供了极低的延迟。
-
-
无需管理基础设施 (No infrastructure management):
-
传统问题: 即使是“无服务器”服务,你可能仍然需要关心数据库的扩容、负载均衡、高可用性、灾备等问题。
-
DO 优势: Durable Objects 是 Cloudflare 完全托管的服务。你只需编写 JavaScript/TypeScript 代码,Cloudflare 负责处理所有的底层基础设施:对象的实例化、路由、状态的持久化、全球复制、故障恢复、扩容等。开发者可以完全专注于业务逻辑,而无需担心服务器、数据库或网络配置。
-
其他重要优势:
-
卓越的实时性能: 由于对象实例运行在 Cloudflare 的全球边缘网络上,并且所有请求都路由到同一个实例,结合内置的低延迟存储,Durable Objects 非常适合构建需要毫秒级响应的实时应用。
-
全球分布式与低延迟: 对象实例会智能地在离第一个请求最近的 Cloudflare 数据中心被激活,后续请求会“粘滞”到该实例。如果该数据中心出现问题,对象会自动迁移到其他健康的边缘节点,确保高可用性和低延迟。
-
成本效益: 采用按请求和存储量计费的模式,没有闲置成本,非常适合间歇性或突发性负载的应用。
-
简化开发模型: 将分布式系统的复杂性抽象化,让开发者能够以更直观、更接近单机应用的方式来思考和实现复杂的协作逻辑。
应用场景 (Use Cases)
正如描述中所说,Durable Objects 非常适合:
-
AI Agents (AI 代理): 每个 AI 代理可以是一个 Durable Object,维护其对话历史、用户偏好、学习状态等,实现有记忆和上下文的 AI 交互。
-
Collaborative Applications (协作应用):
-
实时文档编辑: 每个文档可以是一个 Durable Object,管理所有用户的实时修改和同步。
-
白板应用: 管理画布上的所有元素和用户操作。
-
-
Real-time Interactions like Chat (实时聊天):
-
聊天室: 每个聊天室可以是一个 Durable Object,维护消息历史、在线用户列表,并向所有参与者广播新消息。
-
私聊: 两个用户之间的私聊会话可以是一个 Durable Object。
-
-
Online Gaming (在线游戏):
-
游戏房间: 每个游戏房间可以是一个 Durable Object,管理游戏状态、玩家位置、得分等,并处理玩家的实时操作。
-
排行榜: 维护和更新实时排行榜。
-
-
IoT 设备状态管理: 每个 IoT 设备可以对应一个 Durable Object,存储其最新状态、传感器读数,并处理来自设备的命令。
-
分布式锁/计数器: 利用其单例特性,轻松实现全局唯一的锁或计数器。
总而言之,Cloudflare Durable Objects 提供了一种革命性的方式来构建复杂的、有状态的、全球分布式的实时应用,它将传统上属于后端工程师的分布式系统难题,通过其独特的架构设计,极大地简化并推向了边缘计算的范畴。
举个例子
好的,我们来用一个“实时协作文档”的场景来详细说明 Cloudflare Durable Objects 的作用和代码实现。
场景: 多个用户可以同时打开同一个文档,并实时看到其他用户的修改。
核心思想:
-
每个文档 对应一个 Durable Object 实例。
-
这个 Durable Object 实例负责维护该文档的最新内容。
-
所有连接到该文档的用户,都通过 WebSocket 连接到这个唯一的 Durable Object 实例。
-
当一个用户修改文档时,其修改通过 WebSocket 发送给 Durable Object,Durable Object 更新内容并广播给所有其他连接的用户。
1. 项目结构
my-collaborative-doc/
├── src/
│ ├── worker.js # Cloudflare Worker 入口,负责路由请求到 Durable Object
│ └── document_do.js # Durable Object 的实现代码
├── public/
│ └── index.html # 客户端网页,包含 JavaScript
└── wrangler.toml # Cloudflare Workers 项目配置文件
2. wrangler.toml (Cloudflare Workers 配置)
这个文件告诉 Cloudflare 如何部署你的 Worker 和 Durable Object。
name = "my-collaborative-doc"
main = "src/worker.js"
compatibility_date = "2023-10-26" # 使用最新的兼容性日期
# 定义 Durable Object
[[durable_objects.bindings]]
name = "DOCUMENT_DO" # 在 worker.js 中通过 env.DOCUMENT_DO 访问
class_name = "DocumentDurableObject" # 对应 document_do.js 中导出的类名
# Durable Object 迁移(首次部署或修改 DO 类时需要)
[[migrations]]
tag = "v1" # 任意版本标签
new_classes = ["DocumentDurableObject"] # 首次部署时指定新的 DO 类
3. src/document_do.js (Durable Object 实现)
这是核心逻辑,每个文档实例都运行在这里。
// src/document_do.js
export class DocumentDurableObject {
constructor(state, env) {
this.state = state;
this.env = env;
this.content = ""; // 文档的当前内容
this.websockets = new Set(); // 存储所有连接到此文档的 WebSocket 客户端
// 从持久化存储中加载文档内容
this.state.storage.get("content").then(storedContent => {
if (storedContent) {
this.content = storedContent;
}
});
}
// Durable Object 的主要入口点,处理所有传入请求
async fetch(request) {
// 确保请求是串行处理的,避免并发问题
return this.state.blockConcurrencyWhile(async () => {
const url = new URL(request.url);
// 处理 WebSocket 连接请求
if (url.pathname === "/websocket") {
// 检查请求是否是 WebSocket 升级请求
const upgradeHeader = request.headers.get("Upgrade");
if (!upgradeHeader || upgradeHeader !== "websocket") {
return new Response("Expected Upgrade: websocket", { status: 426 });
}
// 创建 WebSocket 对
const { 0: client, 1: server } = new WebSocketPair();
// 将服务器端 WebSocket 添加到我们的集合中
this.websockets.add(server);
// 设置 WebSocket 事件监听器
server.addEventListener("message", async event => {
// 收到客户端发送的文档更新
const newContent = event.data;
this.content = newContent; // 更新内存中的内容
// 持久化到存储
await this.state.storage.put("content", this.content);
// 广播给所有其他连接的客户端
this.broadcast(this.content, server);
});
server.addEventListener("close", evt => {
console.log(`WebSocket closed: ${evt.code} ${evt.reason}`);
this.websockets.delete(server); // 客户端断开连接,从集合中移除
});
server.addEventListener("error", err => {
console.error("WebSocket error:", err);
this.websockets.delete(server); // 发生错误,从集合中移除
});
// 首次连接时,将当前文档内容发送给新连接的客户端
server.accept();
server.send(this.content);
// 返回客户端 WebSocket,完成升级
return new Response(null, { status: 101, webSocket: client });
} else if (url.pathname === "/content") {
// 处理 HTTP GET 请求,获取文档内容
if (request.method === "GET") {
return new Response(this.content, { headers: { "Content-Type": "text/plain" } });
}
// 处理 HTTP POST 请求,更新文档内容 (可选,WebSocket 更适合实时)
else if (request.method === "POST") {
const newContent = await request.text();
this.content = newContent;
await this.state.storage.put("content", this.content);
this.broadcast(this.content); // 广播给所有连接的客户端
return new Response("Content updated", { status: 200 });
}
}
return new Response("Not Found", { status: 404 });
});
}
// 辅助方法:向所有连接的 WebSocket 广播消息
broadcast(message, sender = null) {
this.websockets.forEach(ws => {
// 避免将消息发回给发送者,除非 sender 为 null (例如 HTTP POST 更新)
if (ws !== sender) {
try {
ws.send(message);
} catch (err) {
console.error("Failed to send message to WebSocket:", err);
this.websockets.delete(ws); // 发送失败,移除该 WebSocket
}
}
});
}
}
代码解释:
-
constructor(state, env):-
state: 提供了访问 Durable Object 存储 (state.storage) 和管理并发 (state.blockConcurrencyWhile) 的能力。 -
env: 包含 Worker 脚本中定义的任何环境变量(本例中未使用)。 -
this.content: 内存中存储的文档内容。 -
this.websockets: 一个Set,用于跟踪所有连接到此 DO 实例的 WebSocket 连接。 -
在构造函数中,尝试从
state.storage加载之前保存的文档内容,确保持久性。
-
-
fetch(request):-
这是 Durable Object 的入口点,所有针对此 DO 实例的请求都会到达这里。
-
this.state.blockConcurrencyWhile(async () => { ... }): 非常重要! 这确保了对 Durable Object 的所有请求(包括 WebSocket 消息)都是串行处理的。这意味着你不需要担心并发读写this.content或this.websockets导致的竞态条件,极大地简化了状态管理。 -
/websocket路径: 处理 WebSocket 升级请求。-
new WebSocketPair(): 创建一个 WebSocket 对,一个用于客户端,一个用于服务器端(即 DO 内部)。 -
this.websockets.add(server): 将服务器端 WebSocket 实例添加到集合中,以便后续广播。 -
server.addEventListener("message", ...): 监听客户端发送的消息。当客户端(用户)输入内容时,消息会发送到这里。DO 更新this.content,将其持久化到state.storage,然后调用broadcast方法。 -
server.addEventListener("close", ...)/server.addEventListener("error", ...): 处理 WebSocket 断开连接或错误,并从集合中移除对应的 WebSocket。 -
server.accept(): 接受 WebSocket 连接。 -
server.send(this.content): 首次连接时,将当前文档内容发送给新连接的客户端。
-
-
/content路径 (可选): 提供了通过 HTTP GET 获取内容和 HTTP POST 更新内容的能力,但对于实时协作,WebSocket 是首选。
-
-
broadcast(message, sender = null):-
遍历
this.websockets集合中的所有连接。 -
ws.send(message): 将更新后的文档内容发送给每个连接的客户端。 -
if (ws !== sender): 避免将消息发回给发送者,减少不必要的网络流量(尽管发送回去通常也无害)。 -
错误处理:如果发送失败(例如,客户端已断开但尚未触发
close事件),则从集合中移除该 WebSocket。
-
4. src/worker.js (Cloudflare Worker 入口)
这个 Worker 负责接收来自客户端的请求,并将其路由到正确的 Durable Object 实例。
// src/worker.js
export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
// 假设 URL 结构是 /docs/<document_id>/websocket 或 /docs/<document_id>/content
const pathParts = url.pathname.split("/");
if (pathParts.length < 3 || pathParts[1] !== "docs") {
return new Response("Invalid URL. Expected /docs/<document_id>/...", { status: 400 });
}
const documentId = pathParts[2]; // 从 URL 中提取文档 ID
const subPath = "/" + pathParts.slice(3).join("/"); // 提取子路径,如 /websocket 或 /content
// 获取 Durable Object ID
// idFromName 确保对于同一个 documentId,总是得到同一个 Durable Object 实例
const id = env.DOCUMENT_DO.idFromName(documentId);
// 获取 Durable Object 的 stub (存根)
// stub 是一个代理对象,允许你向 Durable Object 发送请求
const stub = env.DOCUMENT_DO.get(id);
// 将原始请求转发给 Durable Object
// 注意:这里需要修改请求的 URL,使其只包含 Durable Object 内部的路径
request.url = new URL(subPath, request.url).toString(); // 比如,将 /docs/doc123/websocket 变为 /websocket
return stub.fetch(request);
},
};
代码解释:
-
fetch(request, env, ctx): Worker 的入口函数。 -
URL 解析: 从请求 URL 中提取
documentId。例如,如果请求是https://your-worker.your-domain.com/docs/my-unique-doc-id/websocket,那么documentId将是my-unique-doc-id。 -
env.DOCUMENT_DO.idFromName(documentId): 这是关键!它根据一个字符串名称(documentId)生成一个 Durable Object ID。对于相同的名称,它总是返回相同的 ID。 这确保了所有针对my-unique-doc-id的请求都会被路由到同一个 Durable Object 实例。 -
env.DOCUMENT_DO.get(id): 获取一个 Durable Object 的“存根”(stub)。这个存根是一个代理对象,你可以通过它向实际的 Durable Object 实例发送请求。 -
request.url = new URL(subPath, request.url).toString();: 在将请求转发给 Durable Object 之前,我们修改了请求的 URL。Durable Object 内部只关心/websocket或/content这样的路径,而不关心前面的/docs/<document_id>部分。 -
stub.fetch(request): 将修改后的请求转发给 Durable Object 实例。Cloudflare 的网络会自动将这个请求路由到正确的 Durable Object 实例所在的边缘数据中心。
5. public/index.html (客户端网页)
这个 HTML 文件包含一个 textarea 和一些 JavaScript,用于连接 WebSocket 并发送/接收文档更新。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Collaborative Document</title>
<style>
body { font-family: sans-serif; margin: 20px; }
textarea { width: 80%; height: 400px; padding: 10px; font-size: 16px; border: 1px solid #ccc; }
#status { margin-top: 10px; color: green; }
</style>
</head>
<body>
<h1>Collaborative Document</h1>
<p>Open this page in multiple tabs/browsers to see real-time collaboration.</p>
<p>Document ID: <span id="docIdDisplay"></span></p>
<textarea id="documentContent"></textarea>
<div id="status">Connecting...</div>
<script>
const docIdDisplay = document.getElementById('docIdDisplay');
const documentContent = document.getElementById('documentContent');
const statusDiv = document.getElementById('status');
// 从 URL 中获取文档 ID,例如:http://localhost:8787/docs/my-first-doc
const pathParts = window.location.pathname.split('/');
const documentId = pathParts[2] || 'default-doc'; // 如果没有指定,使用默认ID
docIdDisplay.textContent = documentId;
// 构建 WebSocket URL
// 假设你的 Worker 部署在当前域名下,并且 WebSocket 路径是 /docs/<document_id>/websocket
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${wsProtocol}//${window.location.host}/docs/${documentId}/websocket`;
let ws;
let debounceTimeout;
function connectWebSocket() {
statusDiv.textContent = 'Connecting...';
ws = new WebSocket(wsUrl);
ws.onopen = () => {
statusDiv.textContent = 'Connected!';
console.log('WebSocket connected.');
};
ws.onmessage = (event) => {
// 收到来自 Durable Object 的文档更新
const receivedContent = event.data;
// 只有当内容不同时才更新,避免光标跳动
if (documentContent.value !== receivedContent) {
const cursorStart = documentContent.selectionStart;
const cursorEnd = documentContent.selectionEnd;
documentContent.value = receivedContent;
// 尝试恢复光标位置
documentContent.setSelectionRange(cursorStart, cursorEnd);
}
};
ws.onclose = (event) => {
statusDiv.textContent = `Disconnected: ${event.code} ${event.reason}. Reconnecting in 3s...`;
console.log('WebSocket disconnected:', event);
setTimeout(connectWebSocket, 3000); // 尝试重连
};
ws.onerror = (error) => {
statusDiv.textContent = 'WebSocket error. Reconnecting in 3s...';
console.error('WebSocket error:', error);
ws.close(); // 强制关闭以触发 onclose 和重连
};
}
// 当用户在 textarea 中输入时,发送更新
documentContent.addEventListener('input', () => {
clearTimeout(debounceTimeout);
debounceTimeout = setTimeout(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(documentContent.value);
}
}, 200); // 200ms 防抖,避免频繁发送
});
// 页面加载时连接 WebSocket
connectWebSocket();
</script>
</body>
</html>
代码解释:
-
获取
documentId: 从当前 URL 的路径中提取文档 ID。 -
构建 WebSocket URL: 根据当前页面的协议和主机,以及文档 ID,构建连接到 Worker 的 WebSocket URL。
-
connectWebSocket():-
创建
WebSocket实例。 -
ws.onopen: 连接成功时,更新状态。 -
ws.onmessage: 收到来自 Durable Object 的消息时(即其他用户的修改),更新textarea的内容。这里做了简单的光标位置恢复,但更复杂的协作编辑器会使用 Operational Transformation (OT) 或 Conflict-free Replicated Data Types (CRDTs) 来处理更精细的同步和光标管理。 -
ws.onclose/ws.onerror: 处理连接断开或错误,并尝试在几秒后重连。
-
-
documentContent.addEventListener('input', ...):-
当用户在
textarea中输入时触发。 -
使用**防抖(debounce)**技术:在用户停止输入一小段时间(200ms)后才发送内容,避免每次按键都发送消息,减少网络流量。
-
ws.send(documentContent.value): 将textarea的当前全部内容发送给 Durable Object。
-
6. 部署和测试
-
安装 Wrangler CLI:
npm install -g wrangler -
登录 Cloudflare:
wrangler login -
在项目根目录运行部署命令:
wrangler deploy首次部署时,Wrangler 会提示你确认 Durable Object 的迁移。
-
访问: 部署成功后,Wrangler 会给你一个 Worker 的 URL,例如
https://my-collaborative-doc.<your-worker-name>.workers.dev。-
你可以通过
https://my-collaborative-doc.<your-worker-name>.workers.dev/docs/my-first-doc访问你的协作文档。 -
在不同的浏览器标签页或不同的设备上打开相同的 URL,然后尝试在其中一个
textarea中输入内容,你会看到其他标签页/设备上的内容实时更新。
-
总结与优势体现
通过这个例子,我们可以看到 Cloudflare Durable Objects 如何简化了实时协作应用的开发:
-
无需协调状态:
DocumentDurableObject内部的this.content和this.websockets都是单线程访问的,因为state.blockConcurrencyWhile保证了所有请求的串行执行。开发者无需编写复杂的分布式锁或事务逻辑。 -
无需独立存储: 文档内容直接通过
this.state.storage.put()和this.state.storage.get()在 Durable Object 内部进行持久化,无需配置和管理外部数据库。 -
无需管理基础设施: 你只需编写业务逻辑代码,Cloudflare 负责 Durable Object 的实例化、路由、高可用性、全球分布和持久化。你不需要关心服务器、负载均衡、数据库集群等。
-
实时性: WebSocket 连接直接建立到文档对应的 Durable Object 实例,且该实例通常运行在离用户最近的边缘,提供了极低的延迟,实现了真正的实时协作。
-
可伸缩性: 不同的文档会对应不同的 Durable Object 实例,它们可以独立地在 Cloudflare 的全球网络上运行和扩展。
这个例子虽然简单,但它展示了 Durable Objects 在构建复杂、有状态、实时、全球分布式应用方面的强大能力和开发体验的巨大提升。