逐句解释 src/chatroom_do.js 文件中的代码。这个文件定义了一个 Cloudflare Durable Object (DO),用于实现持久化的聊天室功能,包括消息存储、用户状态管理和 WebSocket 连接处理。
// src/chatroom_do.js (Corrected and Final Version)
// src/chatroom_do.js (Corrected and Final Version): 注释,表明这是聊天室 Durable Object 的源文件,并且是经过修正的最终版本。
// 定义应用层协议的消息类型
const MSG_TYPE_CHAT = 'chat';
const MSG_TYPE_DELETE = 'delete';
const MSG_TYPE_RENAME = 'rename';
const MSG_TYPE_SYSTEM_STATE = 'system_state';
const MSG_TYPE_HISTORY = 'history';
const MSG_TYPE_OFFER = 'offer';
const MSG_TYPE_ANSWER = 'answer';
const MSG_TYPE_CANDIDATE = 'candidate';
const MSG_TYPE_CALL_END = 'call_end';
-
// 定义应用层协议的消息类型: 注释,说明以下常量定义了客户端和服务器之间通过 WebSocket 传输的不同消息类型。 -
const MSG_TYPE_CHAT = 'chat';: 表示普通聊天消息。 -
const MSG_TYPE_DELETE = 'delete';: 表示删除消息的请求。 -
const MSG_TYPE_RENAME = 'rename';: 表示用户改名的请求。 -
const MSG_TYPE_SYSTEM_STATE = 'system_state';: 表示系统状态更新(例如在线用户列表)。 -
const MSG_TYPE_HISTORY = 'history';: 表示请求或发送历史消息。 -
const MSG_TYPE_OFFER = 'offer';: WebRTC 信令消息,表示发起方发送的 SDP Offer。 -
const MSG_TYPE_ANSWER = 'answer';: WebRTC 信令消息,表示接收方发送的 SDP Answer。 -
const MSG_TYPE_CANDIDATE = 'candidate';: WebRTC 信令消息,表示 ICE Candidate(网络连接信息)。 -
const MSG_TYPE_CALL_END = 'call_end';: WebRTC 信令消息,表示通话结束。
export class HibernatingChatRoom {
constructor(state, env) {
this.state = state;
this.env = env;
// 在实例首次创建时,将内存状态初始化为 null。
// 这是 `loadState` 函数判断是否需要从持久化存储加载数据的关键。
this.messages = null;
this.userStats = null;
}
-
export class HibernatingChatRoom { ... }: 定义并导出了HibernatingChatRoom类。这是 Cloudflare Durable Object 的核心类,每个聊天室实例都是这个类的一个对象。 -
constructor(state, env) { ... }: 类的构造函数,在 Durable Object 实例被 Cloudflare 平台创建时调用。-
this.state = state;:state对象是 Durable Object 运行时提供的,它包含了state.storage(用于持久化存储)和state.getWebSockets()等方法,用于管理 WebSocket 连接。 -
this.env = env;:env对象包含了在 Cloudflare Worker 配置中绑定的环境变量和资源(例如 R2 存储桶)。 -
this.messages = null;: 初始化messages数组为null。messages将用于存储聊天消息历史。将其初始化为null是一个重要的标志,表示当前内存中还没有加载持久化数据。 -
this.userStats = null;: 初始化userStats为null。userStats将用于存储用户的统计数据(如消息数量、在线时长)。同样,null标志着数据尚未从持久化存储加载。
-
/**
* 从持久化存储加载状态到内存中。
* 这个函数是幂等的:在 Durable Object 实例的生命周期内,它只会真正执行一次加载操作。
* 后续的调用会因为 this.messages 不再是 null 而直接返回。
*/
// 新的、更稳健的 loadState
async loadState() {
if (this.messages === null) { // 只在实例生命周期内从存储加载一次
console.log("State not in memory. Loading from storage...");
const data = await this.state.storage.get(["messages", "userStats"]);
this.messages = data.get("messages") || [];
const storedUserStats = data.get("userStats");
console.log('Raw storedUserStats from storage:', storedUserStats); // Add this line
// 更稳健地处理:确保 storedUserStats 是一个对象才进行转换
if (storedUserStats && typeof storedUserStats === 'object') {
this.userStats = new Map(Object.entries(storedUserStats));
} else {
this.userStats = new Map(); // 如果数据损坏或不存在,则重新开始
}
console.log(`State loaded. Messages: ${this.messages.length}, Users: ${this.userStats.size}`);
console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats))); // Add this line
}
}
-
async loadState() { ... }: 异步函数,负责从 Durable Object 的持久化存储中加载聊天室的状态。-
if (this.messages === null) { ... }: 核心逻辑:这个条件确保loadState在 Durable Object 实例的整个生命周期中只执行一次实际的加载操作。一旦this.messages被赋值(不再是null),后续调用将直接跳过加载逻辑,提高效率。 -
console.log("State not in memory. Loading from storage...");: 打印日志,表示正在从存储加载数据。 -
const data = await this.state.storage.get(["messages", "userStats"]);: 使用this.state.storage.get()方法从持久化存储中获取名为 "messages" 和 "userStats" 的数据。 -
this.messages = data.get("messages") || [];: 从获取到的数据中取出 "messages",如果不存在则默认为一个空数组。 -
const storedUserStats = data.get("userStats");: 从获取到的数据中取出 "userStats"。 -
console.log('Raw storedUserStats from storage:', storedUserStats);: 调试日志,显示从存储中读取的原始userStats数据。 -
if (storedUserStats && typeof storedUserStats === 'object') { this.userStats = new Map(Object.entries(storedUserStats)); } else { this.userStats = new Map(); }: 关键修复:Durable Object 的state.storage默认会将Map对象序列化为普通 JavaScript 对象。因此,在加载时,需要将存储的普通对象(如果存在且是对象类型)转换回Map对象,以便在内存中正确使用。如果数据损坏或不存在,则初始化为空Map。 -
console.log(State loaded. Messages: {this.userStats.size});: 打印加载状态的摘要。 -
console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats)));: 调试日志,显示加载并转换后的userStatsMap 的内容。
-
/**
* 将当前内存中的状态写入持久化存储。
* 这是一个“直写(write-through)”策略,确保每次状态变更都持久化,防止数据丢失。
*/
// 这是最终修复后的版本
async saveState() {
if (this.messages === null) {
console.warn("Attempted to save state before loading. Aborting save.");
return;
}
// ***核心改动在这里***
// 在保存前,明确地将 Map 转换为普通的、JSON友好的对象。
// 这保证了存储操作的稳定性和可预测性。
const serializableUserStats = Object.fromEntries(this.userStats);
console.log("Saving state to storage...");
console.log('Saving userStats:', JSON.stringify(serializableUserStats)); // Add this line
try {
await this.state.storage.put({
"messages": this.messages,
"userStats": serializableUserStats // 保存转换后的普通对象,而不是 Map
});
console.log("State saved successfully.");
} catch(e) {
console.error("Failed to save state:", e);
}
}
-
async saveState() { ... }: 异步函数,负责将当前内存中的聊天室状态保存到持久化存储。-
if (this.messages === null) { ... }: 检查是否在状态加载之前尝试保存,如果是则发出警告并中止保存,因为此时内存中的数据可能不完整或不正确。 -
const serializableUserStats = Object.fromEntries(this.userStats);: 核心改动:在保存userStats之前,将其从Map对象转换回普通的 JavaScript 对象。这是因为state.storage.put()更适合存储普通的 JSON 可序列化对象,而不是Map实例。Object.fromEntries()是Object.entries()的逆操作。 -
console.log("Saving state to storage...");: 打印日志,表示正在保存状态。 -
console.log('Saving userStats:', JSON.stringify(serializableUserStats));: 调试日志,显示即将保存的userStats对象。 -
try { await this.state.storage.put({ ... }); ... } catch(e) { ... }: 使用this.state.storage.put()方法将messages数组和转换后的userStats对象保存到持久化存储。使用try...catch块捕获并处理保存过程中可能发生的错误。
-
// Main fetch handler - 这是所有外部请求(包括WebSocket升级)的入口
async fetch(request) {
// **核心修复**: 在处理任何请求之前,首先确保状态已从存储中加载。
// 这保证了即使DO刚刚从休眠中唤醒,它也能拥有正确的历史数据。
await this.loadState();
const url = new URL(request.url);
// --- 新增:处理 /history-messages 路径 ---
if (url.pathname === '/history-messages') {
return new Response(JSON.stringify(this.messages), {
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*', // 或者更严格地设置为 'https://chats.want.biz'
'Access-Control-Allow-Methods': 'GET,HEAD,POST,OPTIONS',
'Access-Control-Max-Age': '86400',
},
});
}
const upgradeHeader = request.headers.get("Upgrade");
if (upgradeHeader !== "websocket") {
return new Response("Expected Upgrade: websocket", { status: 426 });
}
const username = url.searchParams.get("username") || "Anonymous";
const { 0: client, 1: server } = new WebSocketPair();
this.state.acceptWebSocket(server, [username]);
return new Response(null, { status: 101, webSocket: client });
}
-
async fetch(request) { ... }: 这是 Durable Object 的主要入口点,处理所有发送到该 DO 实例的 HTTP 请求(包括 WebSocket 升级请求)。-
await this.loadState();: 核心修复:在处理任何请求之前,强制加载 Durable Object 的状态。这确保了无论 DO 实例是首次创建还是从休眠中唤醒,它都能在处理请求时拥有最新的持久化数据。 -
const url = new URL(request.url);: 解析请求的 URL。 -
if (url.pathname === '/history-messages') { ... }: 处理/history-messages路径的 GET 请求。这是为了让外部 Worker 或客户端可以通过 HTTP 请求获取聊天历史。return new Response(JSON.stringify(this.messages), { headers: { ... } });: 返回包含所有聊天消息的 JSON 响应,并设置适当的 CORS 头部。
-
const upgradeHeader = request.headers.get("Upgrade");: 获取请求头中的Upgrade字段。 -
if (upgradeHeader !== "websocket") { ... }: 检查Upgrade头部是否为 "websocket"。如果不是,则返回 426 状态码(Upgrade Required),表示期望 WebSocket 连接。 -
const username = url.searchParams.get("username") || "Anonymous";: 从 URL 查询参数中获取用户名,如果未提供则默认为 "Anonymous"。 -
const { 0: client, 1: server } = new WebSocketPair();: 创建一个 WebSocketPair 对象。client是一个WebSocket对象,可以返回给客户端;server是一个WebSocket对象,用于在 Durable Object 内部与客户端通信。 -
this.state.acceptWebSocket(server, [username]);: 接受 WebSocket 连接。server端 WebSocket 被 Durable Object 接管,并将其与提供的标签(这里是用户名)关联起来。 -
return new Response(null, { status: 101, webSocket: client });: 返回一个 HTTP 101 (Switching Protocols) 响应,将client端 WebSocket 返回给客户端,完成 WebSocket 升级握手。
-
// --- WebSocket Handlers ---
// 新的、带安全检查的 webSocketOpen
async webSocketOpen(ws) {
await this.loadState(); // 确保状态已加载
const username = this.state.getTags(ws)[0];
console.log(`WebSocket opened for: ${username}`);
console.log(`Username from tags: ${username}`); // Added log
// *** 核心增加部分:安全检查 ***
// 这是一个保险措施,确保 this.userStats 绝对是 Map 类型
if (!(this.userStats instanceof Map)) {
console.error("CRITICAL: this.userStats is not a Map after loading! Re-initializing.");
this.userStats = new Map();
}
let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 };
console.log(`User ${username} stats before update:`, JSON.stringify(stats));
stats.lastSeen = Date.now();
stats.onlineSessions = (stats.onlineSessions || 0) + 1;
if (stats.onlineSessions === 1) {
stats.currentSessionStart = Date.now();
}
this.userStats.set(username, stats);
console.log(`User ${username} stats after update:`, JSON.stringify(this.userStats.get(username)));
console.log(`userStats map size after webSocketOpen: ${this.userStats.size}`); // Added log
this.broadcastSystemState();
await this.saveState(); // Ensure state is saved after userStats update
}
-
async webSocketOpen(ws) { ... }: 当一个新的 WebSocket 连接成功建立时,Durable Object 会自动调用此方法。-
await this.loadState();: 确保在处理 WebSocket 连接打开事件时,Durable Object 的状态是最新的。 -
const username = this.state.getTags(ws)[0];: 从 WebSocket 关联的标签中获取用户名。 -
console.log(...): 调试日志。 -
if (!(this.userStats instanceof Map)) { ... }: 核心增加部分:一个防御性编程检查,确保this.userStats确实是一个Map实例。如果由于某种原因(例如存储数据损坏或反序列化问题)它不是Map,则重新初始化为一个新的Map,防止后续操作出错。 -
let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 };: 获取该用户的统计数据,如果不存在则初始化一个新对象。 -
stats.lastSeen = Date.now();: 更新用户最后在线时间。 -
stats.onlineSessions = (stats.onlineSessions || 0) + 1;: 增加在线会话计数。 -
if (stats.onlineSessions === 1) { stats.currentSessionStart = Date.now(); }: 如果这是用户当前唯一的在线会话,记录会话开始时间。 -
this.userStats.set(username, stats);: 更新userStatsMap 中该用户的统计数据。 -
this.broadcastSystemState();: 广播系统状态(例如更新在线用户列表)给所有连接的客户端。 -
await this.saveState();: 在用户统计数据更新后,将状态保存到持久化存储。
-
async webSocketMessage(ws, message) {
await this.loadState();
const username = this.state.getTags(ws)[0];
const user = { ws, username };
try {
const data = JSON.parse(message);
switch (data.type) {
case MSG_TYPE_CHAT:
await this.handleChatMessage(user, data.payload);
break;
case MSG_TYPE_DELETE:
await this.handleDeleteMessage(data.payload);
break;
case MSG_TYPE_RENAME:
await this.handleRename(user, data.payload);
break;
// WebRTC 信号转发逻辑保持不变
case MSG_TYPE_OFFER: this.handleOffer(user, data.payload); break;
case MSG_TYPE_ANSWER: this.handleAnswer(user, data.payload); break;
case MSG_TYPE_CANDIDATE: this.handleCandidate(user, data.payload); break;
case MSG_TYPE_CALL_END: this.handleCallEnd(user, data.payload); break;
default: console.warn('Unknown message type:', data.type);
}
} catch (err) {
console.error('Failed to handle message:', err);
this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } });
}
}
-
async webSocketMessage(ws, message) { ... }: 当 Durable Object 接收到来自客户端的 WebSocket 消息时,自动调用此方法。-
await this.loadState();: 确保状态已加载。 -
const username = this.state.getTags(ws)[0];: 获取发送消息的用户名。 -
const user = { ws, username };: 创建一个包含 WebSocket 和用户名的对象,方便后续处理。 -
try { ... } catch (err) { ... }: 捕获消息处理过程中的错误。 -
const data = JSON.parse(message);: 将接收到的 JSON 字符串消息解析为 JavaScript 对象。 -
switch (data.type) { ... }: 根据消息的type字段,将消息分发到不同的处理函数。-
MSG_TYPE_CHAT: 调用handleChatMessage处理聊天消息。 -
MSG_TYPE_DELETE: 调用handleDeleteMessage处理删除消息请求。 -
MSG_TYPE_RENAME: 调用handleRename处理改名请求。 -
WebRTC 信令消息 (
OFFER,ANSWER,CANDIDATE,CALL_END): 调用相应的handle...函数进行转发。 -
default: 如果消息类型未知,则打印警告。
-
-
console.error('Failed to handle message:', err);: 打印错误信息。 -
this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } });: 向发送方客户端发送一个错误消息。
-
async webSocketClose(ws, code, reason, wasClean) {
await this.loadState();
const username = this.state.getTags(ws)[0];
console.log(`WebSocket closed for: ${username}`);
let stats = this.userStats.get(username);
if (stats) {
console.log(`User ${username} stats before close update:`, JSON.stringify(stats));
stats.lastSeen = Date.now();
stats.onlineSessions = (stats.onlineSessions || 1) - 1;
if (stats.onlineSessions === 0 && stats.currentSessionStart) {
stats.totalOnlineDuration += (Date.now() - stats.currentSessionStart);
delete stats.currentSessionStart;
}
this.userStats.set(username, stats);
console.log(`User ${username} stats after close update:`, JSON.stringify(this.userStats.get(username)));
console.log(`userStats map size after webSocketClose: ${this.userStats.size}`); // Added log
}
this.broadcastSystemState();
await this.saveState();
}
-
async webSocketClose(ws, code, reason, wasClean) { ... }: 当 WebSocket 连接关闭时,Durable Object 会自动调用此方法。-
await this.loadState();: 确保状态已加载。 -
const username = this.state.getTags(ws)[0];: 获取关闭连接的用户名。 -
let stats = this.userStats.get(username);: 获取该用户的统计数据。 -
if (stats) { ... }: 如果找到了用户的统计数据:-
stats.lastSeen = Date.now();: 更新最后在线时间。 -
stats.onlineSessions = (stats.onlineSessions || 1) - 1;: 减少在线会话计数。 -
if (stats.onlineSessions === 0 && stats.currentSessionStart) { ... }: 如果所有会话都已关闭,计算并累加本次会话的总在线时长,并清除currentSessionStart。 -
this.userStats.set(username, stats);: 更新userStatsMap。
-
-
this.broadcastSystemState();: 广播系统状态,通知其他客户端该用户可能已离线。 -
await this.saveState();: 将更新后的用户统计数据保存到持久化存储。
-
async webSocketError(ws, error) {
// loadState 不是必须的,因为 webSocketClose 会被调用
const username = this.state.getTags(ws)[0];
console.error(`WebSocket error for user ${username}:`, error);
}
-
async webSocketError(ws, error) { ... }: 当 WebSocket 连接发生错误时,Durable Object 会自动调用此方法。-
const username = this.state.getTags(ws)[0];: 获取发生错误的用户。 -
console.error(...): 打印错误信息。这里不需要loadState,因为通常错误发生后会紧接着调用webSocketClose,而webSocketClose会处理状态加载和保存。
-
// --- Core Logic (所有核心逻辑函数也需要先加载状态) ---
async handleChatMessage(user, payload) {
await this.loadState();
try {
let message;
if (payload.type === 'image') {
message = await this.#processImageMessage(user, payload);
} else if (payload.type === 'audio') {
message = await this.#processAudioMessage(user, payload);
} else {
message = this.#processTextMessage(user, payload);
}
this.messages.push(message);
if (this.messages.length > 100) this.messages.shift();
let stats = this.userStats.get(user.username);
if (stats) {
stats.messageCount = (stats.messageCount || 0) + 1;
this.userStats.set(user.username, stats);
console.log(`User ${user.username} messageCount updated to: ${stats.messageCount}`);
console.log(`userStats map size after handleChatMessage: ${this.userStats.size}`); // Added log
}
this.broadcast({ type: MSG_TYPE_CHAT, payload: message });
this.broadcastSystemState(); // Add this line to update active users after a new message
await this.saveState(); // Ensure state is saved after userStats update
} catch (error) {
console.error('处理聊天消息失败:', error);
this.sendMessage(user.ws, { type: 'error', payload: { message: `消息发送失败: ${error.message}` } });
}
}
-
async handleChatMessage(user, payload) { ... }: 处理聊天消息的核心逻辑。-
await this.loadState();: 确保状态已加载。 -
try { ... } catch (error) { ... }: 捕获消息处理过程中的错误。 -
if (payload.type === 'image') { ... } else if (payload.type === 'audio') { ... } else { ... }: 根据消息类型(文本、图片、音频)调用不同的私有处理方法 (#process...Message) 来构建消息对象。 -
this.messages.push(message);: 将新消息添加到messages数组。 -
if (this.messages.length > 100) this.messages.shift();: 限制消息历史的长度为 100 条,超出则移除最旧的消息。 -
let stats = this.userStats.get(user.username); if (stats) { ... }: 更新发送消息用户的消息计数。 -
this.broadcast({ type: MSG_TYPE_CHAT, payload: message });: 将新消息广播给所有连接的客户端。 -
this.broadcastSystemState();: 广播系统状态,确保在线用户列表等信息及时更新。 -
await this.saveState();: 将更新后的消息历史和用户统计数据保存到持久化存储。 -
this.sendMessage(user.ws, { type: 'error', payload: { message:消息发送失败: ${error.message}} });: 如果处理失败,向发送方发送错误消息。
-
// ... (所有 #process... 和 handle... 方法保持不变,因为它们都被 `await this.loadState()` 保护了) ...
#processTextMessage(user, payload) {
return {
id: crypto.randomUUID(),
username: user.username,
timestamp: Date.now(),
text: payload.text,
};
}
async #processImageMessage(user, payload) {
const imageUrl = await this.uploadImageToR2(payload.image, payload.filename);
return {
id: crypto.randomUUID(),
username: user.username,
timestamp: Date.now(),
type: 'image',
imageUrl,
filename: payload.filename,
size: payload.size,
caption: payload.caption || ''
};
}
async #processAudioMessage(user, payload) {
const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType);
return {
id: crypto.randomUUID(),
username: user.username,
timestamp: Date.now(),
type: 'audio',
audioUrl,
filename: payload.filename,
size: payload.size,
};
}
-
#processTextMessage(user, payload) { ... }: 私有方法,用于创建文本消息对象。生成唯一 ID、记录用户名、时间戳和文本内容。 -
async #processImageMessage(user, payload) { ... }: 私有异步方法,用于处理图片消息。-
const imageUrl = await this.uploadImageToR2(payload.image, payload.filename);: 调用uploadImageToR2方法将图片数据上传到 R2 存储桶,并获取图片 URL。 -
返回包含图片 URL、文件名、大小和可选标题的图片消息对象。
-
-
async #processAudioMessage(user, payload) { ... }: 私有异步方法,用于处理音频消息。-
const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType);: 调用uploadAudioToR2方法将音频数据上传到 R2 存储桶,并获取音频 URL。 -
返回包含音频 URL、文件名、大小和 MIME 类型的音频消息对象。
-
async handleDeleteMessage(payload) {
await this.loadState();
this.messages = this.messages.filter(m => m.id !== payload.id);
this.broadcast({ type: MSG_TYPE_DELETE, payload: payload });
await this.saveState();
}
-
async handleDeleteMessage(payload) { ... }: 处理删除消息的请求。-
await this.loadState();: 确保状态已加载。 -
this.messages = this.messages.filter(m => m.id !== payload.id);: 从messages数组中过滤掉指定 ID 的消息。 -
this.broadcast({ type: MSG_TYPE_DELETE, payload: payload });: 广播删除消息事件给所有客户端,通知它们更新 UI。 -
await this.saveState();: 将更新后的消息历史保存到持久化存储。
-
async handleRename(user, payload) {
await this.loadState();
const oldUsername = user.username;
const newUsername = payload.newUsername;
if (oldUsername === newUsername) return;
const socketsToUpdate = this.state.getWebSockets(oldUsername);
for (const sock of socketsToUpdate) {
this.state.setTags(sock, [newUsername]);
}
if (this.userStats.has(oldUsername)) {
const stats = this.userStats.get(oldUsername);
const existingNewStats = this.userStats.get(newUsername) || { messageCount: 0, totalOnlineDuration: 0 };
existingNewStats.messageCount += stats.messageCount || 0;
existingNewStats.totalOnlineDuration += stats.totalOnlineDuration || 0;
if (stats.onlineSessions > 0) {
existingNewStats.onlineSessions = (existingNewStats.onlineSessions || 0) + stats.onlineSessions;
existingNewStats.currentSessionStart = stats.currentSessionStart;
}
this.userStats.set(newUsername, existingNewStats);
this.userStats.delete(oldUsername);
}
this.messages.forEach(msg => {
if (msg.username === oldUsername) msg.username = newUsername;
});
this.broadcastSystemState();
// this.broadcast({ type: MSG_TYPE_HISTORY, payload: this.messages }); // Removed, as history is fetched via HTTP
await this.saveState();
}
-
async handleRename(user, payload) { ... }: 处理用户改名的请求。-
await this.loadState();: 确保状态已加载。 -
const oldUsername = user.username; const newUsername = payload.newUsername;: 获取旧用户名和新用户名。 -
if (oldUsername === newUsername) return;: 如果新旧用户名相同,则不做任何操作。 -
const socketsToUpdate = this.state.getWebSockets(oldUsername); for (const sock of socketsToUpdate) { this.state.setTags(sock, [newUsername]); }: 获取所有与旧用户名关联的 WebSocket 连接,并更新它们的标签为新用户名。 -
if (this.userStats.has(oldUsername)) { ... }: 如果旧用户名存在于userStats中:-
将旧用户的统计数据(消息计数、在线时长、在线会话数)合并到新用户的统计数据中(如果新用户已存在,则累加;否则创建新条目)。
-
this.userStats.set(newUsername, existingNewStats);: 更新新用户的统计数据。 -
this.userStats.delete(oldUsername);: 删除旧用户的统计数据。
-
-
this.messages.forEach(msg => { if (msg.username === oldUsername) msg.username = newUsername; });: 遍历所有历史消息,将旧用户名替换为新用户名。 -
this.broadcastSystemState();: 广播系统状态,通知所有客户端用户列表已更新。 -
await this.saveState();: 将更新后的用户统计数据和消息历史保存到持久化存储。
-
// --- 辅助方法 ---
getWsByUsername(username) {
const wss = this.state.getWebSockets(username);
return wss.length > 0 ? wss[0] : null;
}
handleOffer(fromUser, payload) {
const targetWs = this.getWsByUsername(payload.target);
if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 offer`);
this.sendMessage(targetWs, { type: MSG_TYPE_OFFER, payload: { from: fromUser.username, sdp: payload.sdp } });
}
handleAnswer(fromUser, payload) {
const targetWs = this.getWsByUsername(payload.target);
if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 answer`);
this.sendMessage(targetWs, { type: MSG_TYPE_ANSWER, payload: { from: fromUser.username, sdp: payload.sdp } });
}
handleCandidate(fromUser, payload) {
const targetWs = this.getWsByUsername(payload.target);
if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 candidate`);
this.sendMessage(targetWs, { type: MSG_TYPE_CANDIDATE, payload: { from: fromUser.username, candidate: payload.candidate } });
}
handleCallEnd(fromUser, payload) {
const targetWs = this.getWsByUsername(payload.target);
if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 call_end`);
this.sendMessage(targetWs, { type: MSG_TYPE_CALL_END, payload: { from: fromUser.username } });
}
-
getWsByUsername(username) { ... }: 辅助方法,根据用户名获取其对应的第一个 WebSocket 连接。 -
handleOffer(fromUser, payload) { ... }: 处理 WebRTC SDP Offer 消息。查找目标用户的 WebSocket,并将 Offer 转发给它。 -
handleAnswer(fromUser, payload) { ... }: 处理 WebRTC SDP Answer 消息。查找目标用户的 WebSocket,并将 Answer 转发给它。 -
handleCandidate(fromUser, payload) { ... }: 处理 WebRTC ICE Candidate 消息。查找目标用户的 WebSocket,并将 Candidate 转发给它。 -
handleCallEnd(fromUser, payload) { ... }: 处理 WebRTC 通话结束消息。查找目标用户的 WebSocket,并将结束通知转发给它。
async uploadImageToR2(imageData, filename) {
try {
const base64Data = imageData.split(',')[1];
if (!base64Data) throw new Error('无效的图片数据');
const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));
const fileExtension = filename.split('.').pop() || 'jpg';
const key = `chat-images/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`;
await this.env.R2_BUCKET.put(key, imageBuffer, {
httpMetadata: { contentType: this.getContentType(fileExtension), cacheControl: 'public, max-age=31536000' },
});
return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;
} catch (error) {
console.error('R2 上传失败:', error);
throw new Error(`图片上传失败: ${error.message}`);
}
}
getContentType(extension) {
const contentTypes = { 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'webp': 'image/webp' };
return contentTypes[extension.toLowerCase()] || 'image/jpeg';
}
async uploadAudioToR2(audioData, filename, mimeType) {
try {
const base64Data = audioData.split(',')[1];
if (!base64Data) throw new Error('无效的音频数据');
const audioBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));
const fileExtension = filename.split('.').pop() || 'bin';
const key = `chat-audio/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`;
await this.env.R2_BUCKET.put(key, audioBuffer, {
httpMetadata: { contentType: mimeType || 'application/octet-stream', cacheControl: 'public, max-age=31536000' },
});
return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;
} catch (error) {
console.error('R2 音频上传失败:', error);
throw new Error(`音频上传失败: ${error.message}`);
}
}
-
async uploadImageToR2(imageData, filename) { ... }: 异步方法,将 Base64 编码的图片数据上传到 Cloudflare R2 存储桶。-
const base64Data = imageData.split(',')[1];: 从data:image/...;base64,...格式中提取 Base64 数据部分。 -
const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));: 使用atob解码 Base64 字符串为二进制数据,并转换为Uint8Array。 -
const key =chat-images/{crypto.randomUUID()}.${fileExtension};: 生成一个唯一的 R2 对象键(文件名),包含时间戳和 UUID。 -
await this.env.R2_BUCKET.put(key, imageBuffer, { ... });: 使用env.R2_BUCKET.put()方法将图片数据上传到 R2。httpMetadata设置了Content-Type和缓存控制。 -
returnhttps://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;`: 返回 R2 对象的公共访问 URL。 -
try...catch: 捕获并处理上传错误。
-
-
getContentType(extension) { ... }: 辅助方法,根据文件扩展名返回对应的 MIME 类型。 -
async uploadAudioToR2(audioData, filename, mimeType) { ... }: 异步方法,将 Base64 编码的音频数据上传到 Cloudflare R2 存储桶。逻辑与uploadImageToR2类似,只是处理的是音频数据。
sendMessage(ws, message) {
try {
ws.send(JSON.stringify(message));
} catch (err) {
console.error('Failed to send message:', err);
}
}
broadcast(message) {
for (const ws of this.state.getWebSockets()) {
this.sendMessage(ws, message);
}
}
broadcastSystemState() {
// 确保状态已加载,以防万一
if (!this.userStats) return;
// 1. 获取当前所有在线用户的名字 (基于WebSocket连接)
const onlineUsernames = new Set();
for (const ws of this.state.getWebSockets()) {
const username = this.state.getTags(ws)[0];
if (username) {
onlineUsernames.add(username);
}
}
// 2. 构建完整的用户列表,包含在线状态
// 遍历所有已知用户(包括不活跃的),并标记其在线状态
const userList = Array.from(this.userStats.keys()).map(username => {
return { username, isOnline: onlineUsernames.has(username) };
});
// 3. 广播这个列表
this.broadcast({
type: MSG_TYPE_SYSTEM_STATE,
payload: { users: userList } // 前端会根据这个列表来更新UI
});
}
}
-
sendMessage(ws, message) { ... }: 辅助方法,用于向单个 WebSocket 连接发送 JSON 消息。包含try...catch确保发送失败时不会崩溃。 -
broadcast(message) { ... }: 辅助方法,用于向所有当前连接到此 Durable Object 实例的 WebSocket 客户端广播消息。它遍历this.state.getWebSockets()返回的所有 WebSocket 连接,并调用sendMessage。 -
broadcastSystemState() { ... }: 广播系统状态(主要是在线用户列表)给所有客户端。-
if (!this.userStats) return;: 防御性检查,确保userStats已加载。 -
const onlineUsernames = new Set(); for (const ws of this.state.getWebSockets()) { ... }: 遍历所有当前活跃的 WebSocket 连接,从它们的标签中提取用户名,并存储在一个Set中,以获取唯一的在线用户列表。 -
const userList = Array.from(this.userStats.keys()).map(username => { ... });: 遍历this.userStats中所有已知用户的用户名(包括在线和离线的),为每个用户创建一个对象,包含其用户名和isOnline状态(通过检查其是否在onlineUsernamesSet 中)。 -
this.broadcast({ type: MSG_TYPE_SYSTEM_STATE, payload: { users: userList } });: 将包含完整用户列表(及其在线状态)的系统状态消息广播给所有客户端。
-
总结:
chatroom_do.js 文件是 Cloudflare Durable Object 的核心实现,它:
-
管理持久化状态:通过
loadState和saveState方法,将聊天消息 (messages) 和用户统计数据 (userStats) 在内存和 Durable Object 的持久化存储之间同步。特别处理了Map对象在存储时的序列化和反序列化问题。 -
处理 WebSocket 连接:作为 WebSocket 服务器,处理连接的建立 (
fetch和webSocketOpen)、消息的接收 (webSocketMessage) 和连接的关闭 (webSocketClose)。 -
实现聊天室核心功能:包括发送/接收文本、图片、音频消息,删除消息,用户改名等。
-
维护用户统计:记录用户的消息数量、在线会话数和总在线时长。
-
支持 WebRTC 信令:转发 WebRTC 的 SDP Offer/Answer 和 ICE Candidate 消息,实现点对点通信的信令交换。
-
集成 R2 存储:提供将图片和音频文件上传到 Cloudflare R2 的功能。
-
广播机制:能够向所有连接的客户端广播消息和系统状态更新(如在线用户列表)。
-
HTTP 接口:除了 WebSocket,还提供了一个 HTTP GET 接口
/history-messages用于获取历史消息。
这个 Durable Object 实例为每个聊天室提供了独立的、持久化的状态和实时通信能力,并且能够从休眠中快速恢复,确保了聊天室的连续性和数据完整性。