手搓一个健壮的UniApp WebSocket通信库

这是一个为 UniApp 开发的 WebSocket JavaScript 库,它封装了原生 uni.connectSocket 的 API,提供了更方便、更健壮的连接管理、消息处理、重连机制和事件监听。

文件名:socket.js

/**
 * UniApp WebSocket 封装库
 * 提供连接管理、自动重连、消息队列、事件订阅等功能
 */

class UniSocket {
    constructor(options = {}) {
        // 默认配置
        this.config = {
            url: '', // 必填:WebSocket 服务器地址
            protocols: [], // 可选:子协议数组
            connectTimeout: 10000, // 连接超时时间 (毫秒)
            reconnectInterval: 3000, // 重连间隔时间 (毫秒)
            maxReconnectAttempts: 5, // 最大重连尝试次数,0 表示无限次
            heartbeatInterval: 30000, // 心跳间隔时间 (毫秒),0 表示不发送心跳
            heartbeatMessage: 'ping', // 心跳消息内容
            debug: false, // 是否开启调试日志
            ...options // 用户自定义配置覆盖默认值
        };

        // 校验必要参数
        if (!this.config.url) {
            throw new Error('WebSocket URL is required!');
        }

        // 内部状态
        this.socketTask = null; // socketTask 实例
        this.isConnected = false; // 连接状态
        this.isConnecting = false; // 正在连接中
        this.reconnectAttempts = 0; // 当前重连尝试次数
        this.shouldReconnect = true; // 是否应该尝试重连
        this.heartbeatTimer = null; // 心跳定时器
        this.messageQueue = []; // 消息队列 (用于连接未建立时暂存消息)
        this.eventListeners = {}; // 事件监听器 { eventName: [callback1, callback2] }

        this._init();
    }

    /**
     * 初始化
     * @private
     */
    _init() {
        this._log('Socket 初始化');
        this._connect();
    }

    /**
     * 建立 WebSocket 连接
     * @private
     */
    _connect() {
        if (this.isConnecting || this.isConnected) {
            this._log('正在连接或已连接,跳过');
            return;
        }

        this.isConnecting = true;
        this._log(`正在连接: ${this.config.url}`);

        // 创建 socketTask
        this.socketTask = uni.connectSocket({
            url: this.config.url,
            protocols: this.config.protocols,
            success: (res) => {
                this._log('connectSocket success:', res);
            },
            fail: (err) => {
                this._log('connectSocket fail:', err);
                this._handleConnectionError(err);
            }
        });

        // 监听打开事件
        this.socketTask.onOpen((res) => {
            this._log('WebSocket 连接已打开', res);
            this.isConnected = true;
            this.isConnecting = false;
            this.reconnectAttempts = 0; // 连接成功,重置重连计数
            this._startHeartbeat(); // 启动心跳
            this._notifyEvent('open', res);

            // 连接成功后,发送队列中的消息
            this._flushMessageQueue();
        });

        // 监听消息事件
        this.socketTask.onMessage((res) => {
            this._log('收到消息:', res);
            // 尝试解析 JSON,如果失败则保留原始字符串
            let data;
            try {
                data = JSON.parse(res.data);
            } catch (e) {
                data = res.data;
            }
            this._notifyEvent('message', data);
        });

        // 监听关闭事件
        this.socketTask.onClose((res) => {
            this._log('WebSocket 连接已关闭', res);
            this.isConnected = false;
            this.isConnecting = false;
            this._stopHeartbeat(); // 停止心跳
            this._notifyEvent('close', res);

            // 根据情况判断是否需要重连
            if (this.shouldReconnect && (this.config.maxReconnectAttempts === 0 || this.reconnectAttempts < this.config.maxReconnectAttempts)) {
                this._scheduleReconnect();
            } else {
                this._log('达到最大重连次数或不应重连,停止重连');
            }
        });

        // 监听错误事件
        this.socketTask.onError((err) => {
            this._log('WebSocket 错误:', err);
            this._handleConnectionError(err);
            this._notifyEvent('error', err);
        });

        // 处理连接超时
        if (this.config.connectTimeout > 0) {
            setTimeout(() => {
                if (this.isConnecting && !this.isConnected) {
                    this._log('连接超时');
                    this._handleConnectionError(new Error('Connection timeout'));
                    // 注意:这里不能直接调用 socketTask.close(),因为它可能还没创建成功
                    // 超时逻辑主要在 onOpen 未触发时由外部处理
                }
            }, this.config.connectTimeout);
        }
    }

    /**
     * 处理连接错误
     * @private
     * @param {Object|Error} err - 错误信息
     */
    _handleConnectionError(err) {
        this.isConnecting = false;
        // 如果连接未建立就出错,可能需要重连
        if (!this.isConnected) {
            if (this.shouldReconnect && (this.config.maxReconnectAttempts === 0 || this.reconnectAttempts < this.config.maxReconnectAttempts)) {
                this._scheduleReconnect();
            }
        }
        // 如果已经连接过,onClose 会处理重连
    }

    /**
     * 安排重连
     * @private
     */
    _scheduleReconnect() {
        this.reconnectAttempts++;
        this._log(`准备第 ${this.reconnectAttempts} 次重连,间隔 ${this.config.reconnectInterval}ms`);

        setTimeout(() => {
            this._connect();
        }, this.config.reconnectInterval);
    }

    /**
     * 启动心跳
     * @private
     */
    _startHeartbeat() {
        if (this.config.heartbeatInterval > 0 && this.heartbeatTimer === null) {
            this.heartbeatTimer = setInterval(() => {
                if (this.isConnected) {
                    this.send(this.config.heartbeatMessage);
                    this._log('发送心跳:', this.config.heartbeatMessage);
                }
            }, this.config.heartbeatInterval);
        }
    }

    /**
     * 停止心跳
     * @private
     */
    _stopHeartbeat() {
        if (this.heartbeatTimer !== null) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
            this._log('心跳已停止');
        }
    }

    /**
     * 清空并发送消息队列中的消息
     * @private
     */
    _flushMessageQueue() {
        if (this.messageQueue.length > 0 && this.isConnected) {
            this._log(`发送队列中的 ${this.messageQueue.length} 条消息`);
            this.messageQueue.forEach((msg) => {
                this.send(msg);
            });
            this.messageQueue = [];
        }
    }

    /**
     * 触发事件监听器
     * @private
     * @param {String} eventName - 事件名称
     * @param {*} data - 传递给监听器的数据
     */
    _notifyEvent(eventName, data) {
        if (this.eventListeners[eventName]) {
            this.eventListeners[eventName].forEach(callback => {
                try {
                    callback(data);
                } catch (error) {
                    this._log(`事件 "${eventName}" 的监听器执行出错:`, error);
                }
            });
        }
    }

    /**
     * 发送消息
     * @public
     * @param {String|Object} message - 要发送的消息,Object 会自动转为 JSON 字符串
     * @returns {Boolean} - 发送是否成功 (仅表示调用成功,不保证送达)
     */
    send(message) {
        if (!this.isConnected) {
            // 连接未建立,将消息加入队列
            this.messageQueue.push(message);
            this._log('连接未建立,消息已加入队列:', message);
            return false;
        }

        // 如果是对象,序列化为 JSON
        const msgToSend = typeof message === 'object' ? JSON.stringify(message) : message;

        try {
            this.socketTask.send({
                data: msgToSend,
                success: (res) => {
                    this._log('消息发送成功:', msgToSend);
                },
                fail: (err) => {
                    this._log('消息发送失败:', err);
                    // 发送失败,可以考虑加入重发队列或通知上层
                    this._notifyEvent('sendFail', { message: msgToSend, error: err });
                }
            });
            return true;
        } catch (error) {
            this._log('发送消息时发生异常:', error);
            return false;
        }
    }

    /**
     * 监听事件
     * @public
     * @param {String} eventName - 事件名称 ('open', 'close', 'message', 'error', 'sendFail')
     * @param {Function} callback - 回调函数
     */
    on(eventName, callback) {
        if (typeof callback !== 'function') {
            this._log('监听器必须是函数');
            return;
        }
        if (!this.eventListeners[eventName]) {
            this.eventListeners[eventName] = [];
        }
        this.eventListeners[eventName].push(callback);
    }

    /**
     * 取消监听事件
     * @public
     * @param {String} eventName - 事件名称
     * @param {Function} [callback] - 要移除的特定回调函数,不传则移除该事件所有监听器
     */
    off(eventName, callback) {
        if (!this.eventListeners[eventName]) return;

        if (callback) {
            const index = this.eventListeners[eventName].indexOf(callback);
            if (index > -1) {
                this.eventListeners[eventName].splice(index, 1);
            }
        } else {
            delete this.eventListeners[eventName];
        }
    }

    /**
     * 只监听一次事件
     * @public
     * @param {String} eventName - 事件名称
     * @param {Function} callback - 回调函数
     */
    once(eventName, callback) {
        const onceWrapper = (data) => {
            callback(data);
            this.off(eventName, onceWrapper);
        };
        this.on(eventName, onceWrapper);
    }

    /**
     * 断开连接
     * @public
     * @param {Number} [code=1000] - 断开状态码
     * @param {String} [reason] - 断开原因
     */
    close(code = 1000, reason = '') {
        this.shouldReconnect = false; // 明确关闭,不应再重连
        this._stopHeartbeat();

        if (this.socketTask) {
            this.socketTask.close({
                code: code,
                reason: reason,
                success: (res) => {
                    this._log('WebSocket 断开连接成功', res);
                },
                fail: (err) => {
                    this._log('断开连接失败:', err);
                }
            });
        } else {
            this._log('socketTask 不存在,无法断开');
        }
    }

    /**
     * 重新连接 (可用于手动触发重连)
     * @public
     */
    reconnect() {
        this.shouldReconnect = true;
        // 如果当前有连接,先关闭它
        if (this.isConnected || this.isConnecting) {
            this.close();
        }
        // 等待关闭完成或直接尝试连接
        setTimeout(() => {
            this._connect();
        }, 100); // 给关闭一点时间
    }

    /**
     * 获取当前连接状态
     * @public
     * @returns {Object} - 状态信息
     */
    getStatus() {
        return {
            isConnected: this.isConnected,
            isConnecting: this.isConnecting,
            reconnectAttempts: this.reconnectAttempts,
            messageQueueLength: this.messageQueue.length
        };
    }

    /**
     * 输出日志 (仅在 debug 模式下)
     * @private
     * @param {...*} args - 日志内容
     */
    _log(...args) {
        if (this.config.debug) {
            console.log('[UniSocket]', ...args);
        }
    }

    /**
     * 销毁实例 (清理资源)
     * @public
     */
    destroy() {
        this.close();
        this.shouldReconnect = false;
        this._stopHeartbeat();
        this.eventListeners = {};
        this.messageQueue = [];
        this.socketTask = null;
        this._log('Socket 实例已销毁');
    }
}

// 导出类 (适用于模块化环境)
if (typeof module !== 'undefined' && module.exports) {
    module.exports = UniSocket;
}
// 如果在浏览器环境且需要全局变量 (UniApp 中通常用 import)
// window.UniSocket = UniSocket; // 不推荐在 UniApp 中直接挂全局

export default UniSocket; // 推荐的 ES6 导出方式

如何在 UniApp 项目中使用这个库:

  1. 创建文件:在你的 UniApp 项目中(例如 utils/ 目录下)创建一个文件 socket.js,并将上面的代码粘贴进去。
  2. 在页面或组件中引入和使用
// pages/index/index.vue 或你的组件中

// 引入 Socket 库
import UniSocket from '@/utils/socket.js'; // 路径根据你的实际位置调整

export default {
    data() {
        return {
            socket: null, // 存储 socket 实例
            messages: [], // 存储收到的消息
            status: '未连接' // 连接状态
        };
    },

    onLoad() {
        // 初始化 WebSocket 连接
        this.initSocket();
    },

    onUnload() {
        // 页面卸载时,断开连接或销毁实例
        if (this.socket) {
            // 方式一:断开连接,但保留实例 (如果需要重连)
            // this.socket.close();

            // 方式二:彻底销毁实例 (推荐在页面销毁时)
            this.socket.destroy();
            this.socket = null;
        }
    },

    methods: {
        initSocket() {
            // 创建 socket 实例
            this.socket = new UniSocket({
                url: 'wss://your-websocket-server.com/path', // 替换成你的 WebSocket 服务器地址
                // protocols: ['protocol1', 'protocol2'], // 可选
                connectTimeout: 10000,
                reconnectInterval: 3000,
                maxReconnectAttempts: 5,
                heartbeatInterval: 30000,
                heartbeatMessage: 'ping', // 或者 { type: 'ping' }
                debug: true // 开发时开启,上线前关闭
            });

            // 监听事件
            this.socket.on('open', (res) => {
                console.log('连接成功:', res);
                this.status = '已连接';
                // 可以在这里发送登录消息等
                // this.socket.send({ type: 'login', token: 'xxx' });
            });

            this.socket.on('message', (data) => {
                console.log('收到消息:', data);
                // 更新消息列表
                this.messages.push(data);
                // 视图更新可能需要 nextTick 或确保数据响应式
                this.$nextTick(() => {
                    // 滚动到底部等操作
                });
            });

            this.socket.on('close', (res) => {
                console.log('连接关闭:', res);
                this.status = '已关闭';
                // 可以提示用户
            });

            this.socket.on('error', (err) => {
                console.error('连接错误:', err);
                // 可以提示用户
            });

            // 监听发送失败 (可选)
            this.socket.on('sendFail', ({ message, error }) => {
                console.warn('消息发送失败:', message, error);
                // 可以尝试重发或记录
            });
        },

        // 发送消息
        sendMessage() {
            const msg = 'Hello from UniApp!';
            // 或者发送对象
            // const msg = { type: 'chat', content: 'Hello!' };
            if (this.socket) {
                const success = this.socket.send(msg);
                if (!success) {
                    // 发送失败,可能连接未建立,消息已在队列中
                    console.log('消息已加入发送队列');
                }
            }
        },

        // 手动断开连接
        disconnect() {
            if (this.socket) {
                this.socket.close();
            }
        },

        // 手动重连
        manualReconnect() {
            if (this.socket) {
                this.socket.reconnect();
            }
        },

        // 获取状态
        getStatus() {
            if (this.socket) {
                const status = this.socket.getStatus();
                console.log('Socket Status:', status);
                return status;
            }
            return null;
        }
    }
};
  1. :支持配置重连间隔和最大重连次数,连接断开后自动尝试重连。
  2. 消息队列:在连接未建立或断开时,调用 send 的消息会被暂存到队列中,连接成功后自动发送。
  3. 心跳机制:可配置心跳间隔和消息,维持长连接,检测连接状态。
  4. 事件系统:提供 onoffonce 方法订阅和取消订阅 openclosemessageerrorsendFail 等事件。
  5. 连接状态管理:内部维护连接状态 (isConnectedisConnecting),并提供 getStatus 方法查询。
  6. 错误处理:处理连接超时、连接错误、发送失败等情况。
  7. 调试模式:通过 debug: true 开启详细日志输出。
  8. 资源清理:提供 destroy 方法彻底清理定时器和事件监听器,防止内存泄漏。

注意事项:

  • HTTPS/WSS:在真机上,WebSocket 地址通常需要是 wss:// (安全的) 协议,并且域名需要在小程序/APP的后台配置中添加到 socket 合法域名列表。
  • 平台差异:虽然 uni.connectSocket 是跨平台的,但不同平台(H5、小程序、App)的底层实现和限制可能略有差异,建议充分测试。
  • 性能:心跳和重连频率需要根据实际业务需求和服务器承受能力合理设置。
  • 消息格式:库会自动将 Object 类型的消息 JSON.stringify 后发送,接收时会尝试 JSON.parse。确保你的服务器和客户端消息格式兼容。
  • 并发:这个库是单例设计,一个实例管理一个连接。如果需要多个连接,需要创建多个 UniSocket 实例。

这个库应该能满足你在 UniApp 中使用 WebSocket 接收实时消息的基本需求。你可以根据具体项目需求进一步修改和扩展它。

+1

发表回复