chatroom_do.js文件解读

逐句解释 src/chatroom_do.js 文件中的代码。这个文件定义了一个 Cloudflare Durable Object (DO),用于实现持久化的聊天室功能,包括消息存储、用户状态管理和 WebSocket 连接处理。


  
// src/chatroom_do.js (Corrected and Final Version)
  
  • // src/chatroom_do.js (Corrected and Final Version): 注释,表明这是聊天室 Durable Object 的源文件,并且是经过修正的最终版本。
  
// 定义应用层协议的消息类型
  
const MSG_TYPE_CHAT = 'chat';
  
const MSG_TYPE_DELETE = 'delete';
  
const MSG_TYPE_RENAME = 'rename';
  
const MSG_TYPE_SYSTEM_STATE = 'system_state';
  
const MSG_TYPE_HISTORY = 'history';
  
const MSG_TYPE_OFFER = 'offer';
  
const MSG_TYPE_ANSWER = 'answer';
  
const MSG_TYPE_CANDIDATE = 'candidate';
  
const MSG_TYPE_CALL_END = 'call_end';
  
  • // 定义应用层协议的消息类型: 注释,说明以下常量定义了客户端和服务器之间通过 WebSocket 传输的不同消息类型。

  • const MSG_TYPE_CHAT = 'chat';: 表示普通聊天消息。

  • const MSG_TYPE_DELETE = 'delete';: 表示删除消息的请求。

  • const MSG_TYPE_RENAME = 'rename';: 表示用户改名的请求。

  • const MSG_TYPE_SYSTEM_STATE = 'system_state';: 表示系统状态更新(例如在线用户列表)。

  • const MSG_TYPE_HISTORY = 'history';: 表示请求或发送历史消息。

  • const MSG_TYPE_OFFER = 'offer';: WebRTC 信令消息,表示发起方发送的 SDP Offer。

  • const MSG_TYPE_ANSWER = 'answer';: WebRTC 信令消息,表示接收方发送的 SDP Answer。

  • const MSG_TYPE_CANDIDATE = 'candidate';: WebRTC 信令消息,表示 ICE Candidate(网络连接信息)。

  • const MSG_TYPE_CALL_END = 'call_end';: WebRTC 信令消息,表示通话结束。

  
export class HibernatingChatRoom {
  
    constructor(state, env) {
  
        this.state = state;
  
        this.env = env;
  
        // 在实例首次创建时,将内存状态初始化为 null。
  
        // 这是 `loadState` 函数判断是否需要从持久化存储加载数据的关键。
  
        this.messages = null;
  
        this.userStats = null;
  
    }
  
  • export class HibernatingChatRoom { ... }: 定义并导出了 HibernatingChatRoom 类。这是 Cloudflare Durable Object 的核心类,每个聊天室实例都是这个类的一个对象。

  • constructor(state, env) { ... }: 类的构造函数,在 Durable Object 实例被 Cloudflare 平台创建时调用。

    • this.state = state;: state 对象是 Durable Object 运行时提供的,它包含了 state.storage(用于持久化存储)和 state.getWebSockets() 等方法,用于管理 WebSocket 连接。

    • this.env = env;: env 对象包含了在 Cloudflare Worker 配置中绑定的环境变量和资源(例如 R2 存储桶)。

    • this.messages = null;: 初始化 messages 数组为 nullmessages 将用于存储聊天消息历史。将其初始化为 null 是一个重要的标志,表示当前内存中还没有加载持久化数据。

    • this.userStats = null;: 初始化 userStatsnulluserStats 将用于存储用户的统计数据(如消息数量、在线时长)。同样,null 标志着数据尚未从持久化存储加载。

  
    /**
  
     * 从持久化存储加载状态到内存中。
  
     * 这个函数是幂等的:在 Durable Object 实例的生命周期内,它只会真正执行一次加载操作。
  
     * 后续的调用会因为 this.messages 不再是 null 而直接返回。
  
     */
  
// 新的、更稳健的 loadState
  
async loadState() {
  
    if (this.messages === null) { // 只在实例生命周期内从存储加载一次
  
        console.log("State not in memory. Loading from storage...");
  
        const data = await this.state.storage.get(["messages", "userStats"]);
  
        
  
        this.messages = data.get("messages") || [];
  
        
  
        const storedUserStats = data.get("userStats");
  
        console.log('Raw storedUserStats from storage:', storedUserStats); // Add this line
  
        // 更稳健地处理:确保 storedUserStats 是一个对象才进行转换
  
        if (storedUserStats && typeof storedUserStats === 'object') {
  
            this.userStats = new Map(Object.entries(storedUserStats));
  
        } else {
  
            this.userStats = new Map(); // 如果数据损坏或不存在,则重新开始
  
        }
  

  
        console.log(`State loaded. Messages: ${this.messages.length}, Users: ${this.userStats.size}`);
  
        console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats))); // Add this line
  
    }
  
}
  
  • async loadState() { ... }: 异步函数,负责从 Durable Object 的持久化存储中加载聊天室的状态。

    • if (this.messages === null) { ... }: 核心逻辑:这个条件确保 loadState 在 Durable Object 实例的整个生命周期中只执行一次实际的加载操作。一旦 this.messages 被赋值(不再是 null),后续调用将直接跳过加载逻辑,提高效率。

    • console.log("State not in memory. Loading from storage...");: 打印日志,表示正在从存储加载数据。

    • const data = await this.state.storage.get(["messages", "userStats"]);: 使用 this.state.storage.get() 方法从持久化存储中获取名为 "messages" 和 "userStats" 的数据。

    • this.messages = data.get("messages") || [];: 从获取到的数据中取出 "messages",如果不存在则默认为一个空数组。

    • const storedUserStats = data.get("userStats");: 从获取到的数据中取出 "userStats"。

    • console.log('Raw storedUserStats from storage:', storedUserStats);: 调试日志,显示从存储中读取的原始 userStats 数据。

    • if (storedUserStats && typeof storedUserStats === 'object') { this.userStats = new Map(Object.entries(storedUserStats)); } else { this.userStats = new Map(); }: 关键修复:Durable Object 的 state.storage 默认会将 Map 对象序列化为普通 JavaScript 对象。因此,在加载时,需要将存储的普通对象(如果存在且是对象类型)转换回 Map 对象,以便在内存中正确使用。如果数据损坏或不存在,则初始化为空 Map

    • console.log(State loaded. Messages: this.messages.length,Users:{this.messages.length}, Users: {this.userStats.size});: 打印加载状态的摘要。

    • console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats)));: 调试日志,显示加载并转换后的 userStats Map 的内容。

  
    /**
  
     * 将当前内存中的状态写入持久化存储。
  
     * 这是一个“直写(write-through)”策略,确保每次状态变更都持久化,防止数据丢失。
  
     */
  
// 这是最终修复后的版本
  
async saveState() {
  
    if (this.messages === null) {
  
        console.warn("Attempted to save state before loading. Aborting save.");
  
        return;
  
    }
  

  
    // ***核心改动在这里***
  
    // 在保存前,明确地将 Map 转换为普通的、JSON友好的对象。
  
    // 这保证了存储操作的稳定性和可预测性。
  
    const serializableUserStats = Object.fromEntries(this.userStats);
  

  
    console.log("Saving state to storage...");
  
    console.log('Saving userStats:', JSON.stringify(serializableUserStats)); // Add this line
  
    try {
  
        await this.state.storage.put({
  
            "messages": this.messages,
  
            "userStats": serializableUserStats // 保存转换后的普通对象,而不是 Map
  
        });
  
        console.log("State saved successfully.");
  
    } catch(e) {
  
        console.error("Failed to save state:", e);
  
    }
  
}
  
  • async saveState() { ... }: 异步函数,负责将当前内存中的聊天室状态保存到持久化存储。

    • if (this.messages === null) { ... }: 检查是否在状态加载之前尝试保存,如果是则发出警告并中止保存,因为此时内存中的数据可能不完整或不正确。

    • const serializableUserStats = Object.fromEntries(this.userStats);: 核心改动:在保存 userStats 之前,将其从 Map 对象转换回普通的 JavaScript 对象。这是因为 state.storage.put() 更适合存储普通的 JSON 可序列化对象,而不是 Map 实例。Object.fromEntries()Object.entries() 的逆操作。

    • console.log("Saving state to storage...");: 打印日志,表示正在保存状态。

    • console.log('Saving userStats:', JSON.stringify(serializableUserStats));: 调试日志,显示即将保存的 userStats 对象。

    • try { await this.state.storage.put({ ... }); ... } catch(e) { ... }: 使用 this.state.storage.put() 方法将 messages 数组和转换后的 userStats 对象保存到持久化存储。使用 try...catch 块捕获并处理保存过程中可能发生的错误。

  
    // Main fetch handler - 这是所有外部请求(包括WebSocket升级)的入口
  
    async fetch(request) {
  
        // **核心修复**: 在处理任何请求之前,首先确保状态已从存储中加载。
  
        // 这保证了即使DO刚刚从休眠中唤醒,它也能拥有正确的历史数据。
  
        await this.loadState();
  

  
        const url = new URL(request.url);
  

  
        // --- 新增:处理 /history-messages 路径 ---
  
        if (url.pathname === '/history-messages') {
  
            return new Response(JSON.stringify(this.messages), {
  
                headers: { 
  
                    'Content-Type': 'application/json',
  
                    'Access-Control-Allow-Origin': '*', // 或者更严格地设置为 'https://chats.want.biz'
  
                    'Access-Control-Allow-Methods': 'GET,HEAD,POST,OPTIONS',
  
                    'Access-Control-Max-Age': '86400',
  
                },
  
            });
  
        }
  

  
        
  

  
        const upgradeHeader = request.headers.get("Upgrade");
  
        if (upgradeHeader !== "websocket") {
  
            return new Response("Expected Upgrade: websocket", { status: 426 });
  
        }
  

  
        const username = url.searchParams.get("username") || "Anonymous";
  
        const { 0: client, 1: server } = new WebSocketPair();
  
        this.state.acceptWebSocket(server, [username]);
  

  
        return new Response(null, { status: 101, webSocket: client });
  
    }
  
  • async fetch(request) { ... }: 这是 Durable Object 的主要入口点,处理所有发送到该 DO 实例的 HTTP 请求(包括 WebSocket 升级请求)。

    • await this.loadState();: 核心修复:在处理任何请求之前,强制加载 Durable Object 的状态。这确保了无论 DO 实例是首次创建还是从休眠中唤醒,它都能在处理请求时拥有最新的持久化数据。

    • const url = new URL(request.url);: 解析请求的 URL。

    • if (url.pathname === '/history-messages') { ... }: 处理 /history-messages 路径的 GET 请求。这是为了让外部 Worker 或客户端可以通过 HTTP 请求获取聊天历史。

      • return new Response(JSON.stringify(this.messages), { headers: { ... } });: 返回包含所有聊天消息的 JSON 响应,并设置适当的 CORS 头部。
    • const upgradeHeader = request.headers.get("Upgrade");: 获取请求头中的 Upgrade 字段。

    • if (upgradeHeader !== "websocket") { ... }: 检查 Upgrade 头部是否为 "websocket"。如果不是,则返回 426 状态码(Upgrade Required),表示期望 WebSocket 连接。

    • const username = url.searchParams.get("username") || "Anonymous";: 从 URL 查询参数中获取用户名,如果未提供则默认为 "Anonymous"。

    • const { 0: client, 1: server } = new WebSocketPair();: 创建一个 WebSocketPair 对象。client 是一个 WebSocket 对象,可以返回给客户端;server 是一个 WebSocket 对象,用于在 Durable Object 内部与客户端通信。

    • this.state.acceptWebSocket(server, [username]);: 接受 WebSocket 连接。server 端 WebSocket 被 Durable Object 接管,并将其与提供的标签(这里是用户名)关联起来。

    • return new Response(null, { status: 101, webSocket: client });: 返回一个 HTTP 101 (Switching Protocols) 响应,将 client 端 WebSocket 返回给客户端,完成 WebSocket 升级握手。

  
    // --- WebSocket Handlers ---
  

  
// 新的、带安全检查的 webSocketOpen
  
async webSocketOpen(ws) {
  
    await this.loadState(); // 确保状态已加载
  
    const username = this.state.getTags(ws)[0];
  
    console.log(`WebSocket opened for: ${username}`);
  
    console.log(`Username from tags: ${username}`); // Added log
  
    
  
    // *** 核心增加部分:安全检查 ***
  
    // 这是一个保险措施,确保 this.userStats 绝对是 Map 类型
  
    if (!(this.userStats instanceof Map)) {
  
        console.error("CRITICAL: this.userStats is not a Map after loading! Re-initializing.");
  
        this.userStats = new Map();
  
    }
  

  
    let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 };
  
    console.log(`User ${username} stats before update:`, JSON.stringify(stats));
  
    
  
    stats.lastSeen = Date.now();
  
    stats.onlineSessions = (stats.onlineSessions || 0) + 1;
  
    if (stats.onlineSessions === 1) {
  
        stats.currentSessionStart = Date.now();
  
    }
  
    this.userStats.set(username, stats);
  
    console.log(`User ${username} stats after update:`, JSON.stringify(this.userStats.get(username)));
  
    console.log(`userStats map size after webSocketOpen: ${this.userStats.size}`); // Added log
  
    
  
    this.broadcastSystemState();
  
    
  
    await this.saveState(); // Ensure state is saved after userStats update
  
}
  
  • async webSocketOpen(ws) { ... }: 当一个新的 WebSocket 连接成功建立时,Durable Object 会自动调用此方法。

    • await this.loadState();: 确保在处理 WebSocket 连接打开事件时,Durable Object 的状态是最新的。

    • const username = this.state.getTags(ws)[0];: 从 WebSocket 关联的标签中获取用户名。

    • console.log(...): 调试日志。

    • if (!(this.userStats instanceof Map)) { ... }: 核心增加部分:一个防御性编程检查,确保 this.userStats 确实是一个 Map 实例。如果由于某种原因(例如存储数据损坏或反序列化问题)它不是 Map,则重新初始化为一个新的 Map,防止后续操作出错。

    • let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 };: 获取该用户的统计数据,如果不存在则初始化一个新对象。

    • stats.lastSeen = Date.now();: 更新用户最后在线时间。

    • stats.onlineSessions = (stats.onlineSessions || 0) + 1;: 增加在线会话计数。

    • if (stats.onlineSessions === 1) { stats.currentSessionStart = Date.now(); }: 如果这是用户当前唯一的在线会话,记录会话开始时间。

    • this.userStats.set(username, stats);: 更新 userStats Map 中该用户的统计数据。

    • this.broadcastSystemState();: 广播系统状态(例如更新在线用户列表)给所有连接的客户端。

    • await this.saveState();: 在用户统计数据更新后,将状态保存到持久化存储。

  
    async webSocketMessage(ws, message) {
  
        await this.loadState();
  
        const username = this.state.getTags(ws)[0];
  
        const user = { ws, username };
  
        try {
  
            const data = JSON.parse(message);
  
            switch (data.type) {
  
                case MSG_TYPE_CHAT:
  
                    await this.handleChatMessage(user, data.payload);
  
                    break;
  
                case MSG_TYPE_DELETE:
  
                    await this.handleDeleteMessage(data.payload);
  
                    break;
  
                case MSG_TYPE_RENAME:
  
                    await this.handleRename(user, data.payload);
  
                    break;
  
                // WebRTC 信号转发逻辑保持不变
  
                case MSG_TYPE_OFFER: this.handleOffer(user, data.payload); break;
  
                case MSG_TYPE_ANSWER: this.handleAnswer(user, data.payload); break;
  
                case MSG_TYPE_CANDIDATE: this.handleCandidate(user, data.payload); break;
  
                case MSG_TYPE_CALL_END: this.handleCallEnd(user, data.payload); break;
  
                default: console.warn('Unknown message type:', data.type);
  
            }
  
        } catch (err) {
  
            console.error('Failed to handle message:', err);
  
            this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } });
  
        }
  
    }
  
  • async webSocketMessage(ws, message) { ... }: 当 Durable Object 接收到来自客户端的 WebSocket 消息时,自动调用此方法。

    • await this.loadState();: 确保状态已加载。

    • const username = this.state.getTags(ws)[0];: 获取发送消息的用户名。

    • const user = { ws, username };: 创建一个包含 WebSocket 和用户名的对象,方便后续处理。

    • try { ... } catch (err) { ... }: 捕获消息处理过程中的错误。

    • const data = JSON.parse(message);: 将接收到的 JSON 字符串消息解析为 JavaScript 对象。

    • switch (data.type) { ... }: 根据消息的 type 字段,将消息分发到不同的处理函数。

      • MSG_TYPE_CHAT: 调用 handleChatMessage 处理聊天消息。

      • MSG_TYPE_DELETE: 调用 handleDeleteMessage 处理删除消息请求。

      • MSG_TYPE_RENAME: 调用 handleRename 处理改名请求。

      • WebRTC 信令消息 (OFFER, ANSWER, CANDIDATE, CALL_END): 调用相应的 handle... 函数进行转发。

      • default: 如果消息类型未知,则打印警告。

    • console.error('Failed to handle message:', err);: 打印错误信息。

    • this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } });: 向发送方客户端发送一个错误消息。

  
    async webSocketClose(ws, code, reason, wasClean) {
  
        await this.loadState();
  
        const username = this.state.getTags(ws)[0];
  
        console.log(`WebSocket closed for: ${username}`);
  
        
  
        let stats = this.userStats.get(username);
  
        if (stats) {
  
            console.log(`User ${username} stats before close update:`, JSON.stringify(stats));
  
            stats.lastSeen = Date.now();
  
            stats.onlineSessions = (stats.onlineSessions || 1) - 1;
  
            if (stats.onlineSessions === 0 && stats.currentSessionStart) {
  
                stats.totalOnlineDuration += (Date.now() - stats.currentSessionStart);
  
                delete stats.currentSessionStart;
  
            }
  
            this.userStats.set(username, stats);
  
            console.log(`User ${username} stats after close update:`, JSON.stringify(this.userStats.get(username)));
  
            console.log(`userStats map size after webSocketClose: ${this.userStats.size}`); // Added log
  
        }
  
        
  
        this.broadcastSystemState();
  
        await this.saveState();
  
    }
  
  • async webSocketClose(ws, code, reason, wasClean) { ... }: 当 WebSocket 连接关闭时,Durable Object 会自动调用此方法。

    • await this.loadState();: 确保状态已加载。

    • const username = this.state.getTags(ws)[0];: 获取关闭连接的用户名。

    • let stats = this.userStats.get(username);: 获取该用户的统计数据。

    • if (stats) { ... }: 如果找到了用户的统计数据:

      • stats.lastSeen = Date.now();: 更新最后在线时间。

      • stats.onlineSessions = (stats.onlineSessions || 1) - 1;: 减少在线会话计数。

      • if (stats.onlineSessions === 0 && stats.currentSessionStart) { ... }: 如果所有会话都已关闭,计算并累加本次会话的总在线时长,并清除 currentSessionStart

      • this.userStats.set(username, stats);: 更新 userStats Map。

    • this.broadcastSystemState();: 广播系统状态,通知其他客户端该用户可能已离线。

    • await this.saveState();: 将更新后的用户统计数据保存到持久化存储。

  
    async webSocketError(ws, error) {
  
        // loadState 不是必须的,因为 webSocketClose 会被调用
  
        const username = this.state.getTags(ws)[0];
  
        console.error(`WebSocket error for user ${username}:`, error);
  
    }
  
  • async webSocketError(ws, error) { ... }: 当 WebSocket 连接发生错误时,Durable Object 会自动调用此方法。

    • const username = this.state.getTags(ws)[0];: 获取发生错误的用户。

    • console.error(...): 打印错误信息。这里不需要 loadState,因为通常错误发生后会紧接着调用 webSocketClose,而 webSocketClose 会处理状态加载和保存。

  
    // --- Core Logic (所有核心逻辑函数也需要先加载状态) ---
  

  
    async handleChatMessage(user, payload) {
  
        await this.loadState();
  
        try {
  
            let message;
  
            if (payload.type === 'image') {
  
                message = await this.#processImageMessage(user, payload);
  
            } else if (payload.type === 'audio') {
  
                message = await this.#processAudioMessage(user, payload);
  
            } else {
  
                message = this.#processTextMessage(user, payload);
  
            }
  
            this.messages.push(message);
  
            if (this.messages.length > 100) this.messages.shift();
  

  
            let stats = this.userStats.get(user.username);
  
            if (stats) {
  
                stats.messageCount = (stats.messageCount || 0) + 1;
  
                this.userStats.set(user.username, stats);
  
                console.log(`User ${user.username} messageCount updated to: ${stats.messageCount}`);
  
                console.log(`userStats map size after handleChatMessage: ${this.userStats.size}`); // Added log
  
            }
  
            this.broadcast({ type: MSG_TYPE_CHAT, payload: message });
  
            this.broadcastSystemState(); // Add this line to update active users after a new message
  
            await this.saveState(); // Ensure state is saved after userStats update
  
        } catch (error) {
  
            console.error('处理聊天消息失败:', error);
  
            this.sendMessage(user.ws, { type: 'error', payload: { message: `消息发送失败: ${error.message}` } });
  
        }
  
    }
  
  • async handleChatMessage(user, payload) { ... }: 处理聊天消息的核心逻辑。

    • await this.loadState();: 确保状态已加载。

    • try { ... } catch (error) { ... }: 捕获消息处理过程中的错误。

    • if (payload.type === 'image') { ... } else if (payload.type === 'audio') { ... } else { ... }: 根据消息类型(文本、图片、音频)调用不同的私有处理方法 (#process...Message) 来构建消息对象。

    • this.messages.push(message);: 将新消息添加到 messages 数组。

    • if (this.messages.length > 100) this.messages.shift();: 限制消息历史的长度为 100 条,超出则移除最旧的消息。

    • let stats = this.userStats.get(user.username); if (stats) { ... }: 更新发送消息用户的消息计数。

    • this.broadcast({ type: MSG_TYPE_CHAT, payload: message });: 将新消息广播给所有连接的客户端。

    • this.broadcastSystemState();: 广播系统状态,确保在线用户列表等信息及时更新。

    • await this.saveState();: 将更新后的消息历史和用户统计数据保存到持久化存储。

    • this.sendMessage(user.ws, { type: 'error', payload: { message: 消息发送失败: ${error.message} } });: 如果处理失败,向发送方发送错误消息。

  
    // ... (所有 #process... 和 handle... 方法保持不变,因为它们都被 `await this.loadState()` 保护了) ...
  

  
    #processTextMessage(user, payload) {
  
        return {
  
            id: crypto.randomUUID(),
  
            username: user.username,
  
            timestamp: Date.now(),
  
            text: payload.text,
  
        };
  
    }
  
    async #processImageMessage(user, payload) {
  
        const imageUrl = await this.uploadImageToR2(payload.image, payload.filename);
  
        return {
  
            id: crypto.randomUUID(),
  
            username: user.username,
  
            timestamp: Date.now(),
  
            type: 'image',
  
            imageUrl,
  
            filename: payload.filename,
  
            size: payload.size,
  
            caption: payload.caption || ''
  
        };
  
    }
  
    async #processAudioMessage(user, payload) {
  
        const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType);
  
        return {
  
            id: crypto.randomUUID(),
  
            username: user.username,
  
            timestamp: Date.now(),
  
            type: 'audio',
  
            audioUrl,
  
            filename: payload.filename,
  
            size: payload.size,
  
        };
  
    }
  
  • #processTextMessage(user, payload) { ... }: 私有方法,用于创建文本消息对象。生成唯一 ID、记录用户名、时间戳和文本内容。

  • async #processImageMessage(user, payload) { ... }: 私有异步方法,用于处理图片消息。

    • const imageUrl = await this.uploadImageToR2(payload.image, payload.filename);: 调用 uploadImageToR2 方法将图片数据上传到 R2 存储桶,并获取图片 URL。

    • 返回包含图片 URL、文件名、大小和可选标题的图片消息对象。

  • async #processAudioMessage(user, payload) { ... }: 私有异步方法,用于处理音频消息。

    • const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType);: 调用 uploadAudioToR2 方法将音频数据上传到 R2 存储桶,并获取音频 URL。

    • 返回包含音频 URL、文件名、大小和 MIME 类型的音频消息对象。

  
    async handleDeleteMessage(payload) {
  
        await this.loadState();
  
        this.messages = this.messages.filter(m => m.id !== payload.id);
  
        this.broadcast({ type: MSG_TYPE_DELETE, payload: payload });
  
        await this.saveState();
  
    }
  
  • async handleDeleteMessage(payload) { ... }: 处理删除消息的请求。

    • await this.loadState();: 确保状态已加载。

    • this.messages = this.messages.filter(m => m.id !== payload.id);: 从 messages 数组中过滤掉指定 ID 的消息。

    • this.broadcast({ type: MSG_TYPE_DELETE, payload: payload });: 广播删除消息事件给所有客户端,通知它们更新 UI。

    • await this.saveState();: 将更新后的消息历史保存到持久化存储。

  
    async handleRename(user, payload) {
  
        await this.loadState();
  
        const oldUsername = user.username;
  
        const newUsername = payload.newUsername;
  
        if (oldUsername === newUsername) return;
  

  
        const socketsToUpdate = this.state.getWebSockets(oldUsername);
  
        for (const sock of socketsToUpdate) {
  
            this.state.setTags(sock, [newUsername]);
  
        }
  

  
        if (this.userStats.has(oldUsername)) {
  
            const stats = this.userStats.get(oldUsername);
  
            const existingNewStats = this.userStats.get(newUsername) || { messageCount: 0, totalOnlineDuration: 0 };
  
            existingNewStats.messageCount += stats.messageCount || 0;
  
            existingNewStats.totalOnlineDuration += stats.totalOnlineDuration || 0;
  
            if (stats.onlineSessions > 0) {
  
                existingNewStats.onlineSessions = (existingNewStats.onlineSessions || 0) + stats.onlineSessions;
  
                existingNewStats.currentSessionStart = stats.currentSessionStart;
  
            }
  
            this.userStats.set(newUsername, existingNewStats);
  
            this.userStats.delete(oldUsername);
  
        }
  

  
        this.messages.forEach(msg => {
  
            if (msg.username === oldUsername) msg.username = newUsername;
  
        });
  

  
        this.broadcastSystemState();
  
        // this.broadcast({ type: MSG_TYPE_HISTORY, payload: this.messages }); // Removed, as history is fetched via HTTP
  
        await this.saveState();
  
    }
  
  • async handleRename(user, payload) { ... }: 处理用户改名的请求。

    • await this.loadState();: 确保状态已加载。

    • const oldUsername = user.username; const newUsername = payload.newUsername;: 获取旧用户名和新用户名。

    • if (oldUsername === newUsername) return;: 如果新旧用户名相同,则不做任何操作。

    • const socketsToUpdate = this.state.getWebSockets(oldUsername); for (const sock of socketsToUpdate) { this.state.setTags(sock, [newUsername]); }: 获取所有与旧用户名关联的 WebSocket 连接,并更新它们的标签为新用户名。

    • if (this.userStats.has(oldUsername)) { ... }: 如果旧用户名存在于 userStats 中:

      • 将旧用户的统计数据(消息计数、在线时长、在线会话数)合并到新用户的统计数据中(如果新用户已存在,则累加;否则创建新条目)。

      • this.userStats.set(newUsername, existingNewStats);: 更新新用户的统计数据。

      • this.userStats.delete(oldUsername);: 删除旧用户的统计数据。

    • this.messages.forEach(msg => { if (msg.username === oldUsername) msg.username = newUsername; });: 遍历所有历史消息,将旧用户名替换为新用户名。

    • this.broadcastSystemState();: 广播系统状态,通知所有客户端用户列表已更新。

    • await this.saveState();: 将更新后的用户统计数据和消息历史保存到持久化存储。

  
    // --- 辅助方法 ---
  
    
  
    
  

  
    getWsByUsername(username) {
  
        const wss = this.state.getWebSockets(username);
  
        return wss.length > 0 ? wss[0] : null;
  
    }
  

  
    handleOffer(fromUser, payload) {
  
        const targetWs = this.getWsByUsername(payload.target);
  
        if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 offer`);
  
        this.sendMessage(targetWs, { type: MSG_TYPE_OFFER, payload: { from: fromUser.username, sdp: payload.sdp } });
  
    }
  

  
    handleAnswer(fromUser, payload) {
  
        const targetWs = this.getWsByUsername(payload.target);
  
        if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 answer`);
  
        this.sendMessage(targetWs, { type: MSG_TYPE_ANSWER, payload: { from: fromUser.username, sdp: payload.sdp } });
  
    }
  
    
  
    handleCandidate(fromUser, payload) {
  
        const targetWs = this.getWsByUsername(payload.target);
  
        if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 candidate`);
  
        this.sendMessage(targetWs, { type: MSG_TYPE_CANDIDATE, payload: { from: fromUser.username, candidate: payload.candidate } });
  
    }
  

  
    handleCallEnd(fromUser, payload) {
  
        const targetWs = this.getWsByUsername(payload.target);
  
        if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 call_end`);
  
        this.sendMessage(targetWs, { type: MSG_TYPE_CALL_END, payload: { from: fromUser.username } });
  
    }
  
  • getWsByUsername(username) { ... }: 辅助方法,根据用户名获取其对应的第一个 WebSocket 连接。

  • handleOffer(fromUser, payload) { ... }: 处理 WebRTC SDP Offer 消息。查找目标用户的 WebSocket,并将 Offer 转发给它。

  • handleAnswer(fromUser, payload) { ... }: 处理 WebRTC SDP Answer 消息。查找目标用户的 WebSocket,并将 Answer 转发给它。

  • handleCandidate(fromUser, payload) { ... }: 处理 WebRTC ICE Candidate 消息。查找目标用户的 WebSocket,并将 Candidate 转发给它。

  • handleCallEnd(fromUser, payload) { ... }: 处理 WebRTC 通话结束消息。查找目标用户的 WebSocket,并将结束通知转发给它。

  
    async uploadImageToR2(imageData, filename) {
  
        try {
  
            const base64Data = imageData.split(',')[1];
  
            if (!base64Data) throw new Error('无效的图片数据');
  
            const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));
  
            const fileExtension = filename.split('.').pop() || 'jpg';
  
            const key = `chat-images/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`;
  
            await this.env.R2_BUCKET.put(key, imageBuffer, {
  
                httpMetadata: { contentType: this.getContentType(fileExtension), cacheControl: 'public, max-age=31536000' },
  
            });
  
            return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;
  
        } catch (error) {
  
            console.error('R2 上传失败:', error);
  
            throw new Error(`图片上传失败: ${error.message}`);
  
        }
  
    }
  

  
    getContentType(extension) {
  
        const contentTypes = { 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'webp': 'image/webp' };
  
        return contentTypes[extension.toLowerCase()] || 'image/jpeg';
  
    }
  

  
    async uploadAudioToR2(audioData, filename, mimeType) {
  
        try {
  
            const base64Data = audioData.split(',')[1];
  
            if (!base64Data) throw new Error('无效的音频数据');
  
            const audioBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));
  
            const fileExtension = filename.split('.').pop() || 'bin';
  
            const key = `chat-audio/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`;
  
            await this.env.R2_BUCKET.put(key, audioBuffer, {
  
                httpMetadata: { contentType: mimeType || 'application/octet-stream', cacheControl: 'public, max-age=31536000' },
  
            });
  
            return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;
  
        } catch (error) {
  
            console.error('R2 音频上传失败:', error);
  
            throw new Error(`音频上传失败: ${error.message}`);
  
        }
  
    }
  
  • async uploadImageToR2(imageData, filename) { ... }: 异步方法,将 Base64 编码的图片数据上传到 Cloudflare R2 存储桶。

    • const base64Data = imageData.split(',')[1];: 从 data:image/...;base64,... 格式中提取 Base64 数据部分。

    • const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));: 使用 atob 解码 Base64 字符串为二进制数据,并转换为 Uint8Array

    • const key = chat-images/Date.now(){Date.now()}-{crypto.randomUUID()}.${fileExtension};: 生成一个唯一的 R2 对象键(文件名),包含时间戳和 UUID。

    • await this.env.R2_BUCKET.put(key, imageBuffer, { ... });: 使用 env.R2_BUCKET.put() 方法将图片数据上传到 R2。httpMetadata 设置了 Content-Type 和缓存控制。

    • return https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;`: 返回 R2 对象的公共访问 URL。

    • try...catch: 捕获并处理上传错误。

  • getContentType(extension) { ... }: 辅助方法,根据文件扩展名返回对应的 MIME 类型。

  • async uploadAudioToR2(audioData, filename, mimeType) { ... }: 异步方法,将 Base64 编码的音频数据上传到 Cloudflare R2 存储桶。逻辑与 uploadImageToR2 类似,只是处理的是音频数据。

  
    sendMessage(ws, message) {
  
        try {
  
            ws.send(JSON.stringify(message));
  
        } catch (err) {
  
            console.error('Failed to send message:', err);
  
        }
  
    }
  

  
    broadcast(message) {
  
        for (const ws of this.state.getWebSockets()) {
  
            this.sendMessage(ws, message);
  
        }
  
    }
  

  
broadcastSystemState() {
  
    // 确保状态已加载,以防万一
  
    if (!this.userStats) return;
  

  
    // 1. 获取当前所有在线用户的名字 (基于WebSocket连接)
  
    const onlineUsernames = new Set();
  
    for (const ws of this.state.getWebSockets()) {
  
        const username = this.state.getTags(ws)[0];
  
        if (username) {
  
            onlineUsernames.add(username);
  
        }
  
    }
  

  
    // 2. 构建完整的用户列表,包含在线状态
  
    // 遍历所有已知用户(包括不活跃的),并标记其在线状态
  
    const userList = Array.from(this.userStats.keys()).map(username => {
  
        return { username, isOnline: onlineUsernames.has(username) };
  
    });
  
    
  
    // 3. 广播这个列表
  
    this.broadcast({
  
        type: MSG_TYPE_SYSTEM_STATE,
  
        payload: { users: userList } // 前端会根据这个列表来更新UI
  
    });
  
}
  
}
  
  • sendMessage(ws, message) { ... }: 辅助方法,用于向单个 WebSocket 连接发送 JSON 消息。包含 try...catch 确保发送失败时不会崩溃。

  • broadcast(message) { ... }: 辅助方法,用于向所有当前连接到此 Durable Object 实例的 WebSocket 客户端广播消息。它遍历 this.state.getWebSockets() 返回的所有 WebSocket 连接,并调用 sendMessage

  • broadcastSystemState() { ... }: 广播系统状态(主要是在线用户列表)给所有客户端。

    • if (!this.userStats) return;: 防御性检查,确保 userStats 已加载。

    • const onlineUsernames = new Set(); for (const ws of this.state.getWebSockets()) { ... }: 遍历所有当前活跃的 WebSocket 连接,从它们的标签中提取用户名,并存储在一个 Set 中,以获取唯一的在线用户列表。

    • const userList = Array.from(this.userStats.keys()).map(username => { ... });: 遍历 this.userStats 中所有已知用户的用户名(包括在线和离线的),为每个用户创建一个对象,包含其用户名和 isOnline 状态(通过检查其是否在 onlineUsernames Set 中)。

    • this.broadcast({ type: MSG_TYPE_SYSTEM_STATE, payload: { users: userList } });: 将包含完整用户列表(及其在线状态)的系统状态消息广播给所有客户端。


总结:

chatroom_do.js 文件是 Cloudflare Durable Object 的核心实现,它:

  1. 管理持久化状态:通过 loadStatesaveState 方法,将聊天消息 (messages) 和用户统计数据 (userStats) 在内存和 Durable Object 的持久化存储之间同步。特别处理了 Map 对象在存储时的序列化和反序列化问题。

  2. 处理 WebSocket 连接:作为 WebSocket 服务器,处理连接的建立 (fetchwebSocketOpen)、消息的接收 (webSocketMessage) 和连接的关闭 (webSocketClose)。

  3. 实现聊天室核心功能:包括发送/接收文本、图片、音频消息,删除消息,用户改名等。

  4. 维护用户统计:记录用户的消息数量、在线会话数和总在线时长。

  5. 支持 WebRTC 信令:转发 WebRTC 的 SDP Offer/Answer 和 ICE Candidate 消息,实现点对点通信的信令交换。

  6. 集成 R2 存储:提供将图片和音频文件上传到 Cloudflare R2 的功能。

  7. 广播机制:能够向所有连接的客户端广播消息和系统状态更新(如在线用户列表)。

  8. HTTP 接口:除了 WebSocket,还提供了一个 HTTP GET 接口 /history-messages 用于获取历史消息。

这个 Durable Object 实例为每个聊天室提供了独立的、持久化的状态和实时通信能力,并且能够从休眠中快速恢复,确保了聊天室的连续性和数据完整性。