Web API网络-实时通信方案
# Web API网络-实时通信方案
实时通信是现代Web应用的重要特性。本文将详细介绍WebSocket和Server-Sent Events(SSE)两种主流的实时通信方案,包括连接建立、消息传递、错误处理、心跳机制等实用技巧。
# 一、WebSocket基础
WebSocket提供了全双工通信通道,允许服务器主动向客户端推送数据。
# 1.1 WebSocket特点
- 全双工通信:客户端和服务器可以同时发送和接收消息
- 持久连接:建立连接后保持打开状态
- 低延迟:无需轮询,消息即时送达
- 二进制支持:可传输文本和二进制数据
- 协议升级:从HTTP升级到WebSocket协议
# 1.2 基本使用
// 创建WebSocket连接
const ws = new WebSocket('ws://localhost:8080');
// 或使用加密连接
// const ws = new WebSocket('wss://example.com');
// 连接打开
ws.addEventListener('open', function(event) {
console.log('WebSocket连接已建立');
// 发送消息
ws.send('Hello Server!');
ws.send(JSON.stringify({ type: 'greeting', message: 'Hello' }));
});
// 接收消息
ws.addEventListener('message', function(event) {
console.log('收到消息:', event.data);
// 如果是JSON数据
try {
const data = JSON.parse(event.data);
console.log('解析后的数据:', data);
} catch (e) {
console.log('不是JSON格式');
}
});
// 连接关闭
ws.addEventListener('close', function(event) {
console.log('WebSocket连接已关闭');
console.log('关闭代码:', event.code);
console.log('关闭原因:', event.reason);
console.log('是否正常关闭:', event.wasClean);
});
// 连接错误
ws.addEventListener('error', function(event) {
console.error('WebSocket错误:', event);
});
// 主动关闭连接
// ws.close();
// ws.close(1000, '正常关闭');
# 1.3 readyState状态
console.log(ws.readyState);
// 0: CONNECTING - 正在连接
// 1: OPEN - 连接已建立,可以通信
// 2: CLOSING - 连接正在关闭
// 3: CLOSED - 连接已关闭或无法打开
// 检查连接状态
function checkConnectionState() {
switch(ws.readyState) {
case WebSocket.CONNECTING:
console.log('正在连接...');
break;
case WebSocket.OPEN:
console.log('连接已建立');
break;
case WebSocket.CLOSING:
console.log('连接正在关闭...');
break;
case WebSocket.CLOSED:
console.log('连接已关闭');
break;
}
}
# 1.4 发送不同类型数据
const ws = new WebSocket('ws://localhost:8080');
ws.addEventListener('open', function() {
// 1. 发送文本
ws.send('Hello');
// 2. 发送JSON
ws.send(JSON.stringify({
type: 'message',
content: 'Hello World'
}));
// 3. 发送Blob
const blob = new Blob(['binary data'], { type: 'application/octet-stream' });
ws.send(blob);
// 4. 发送ArrayBuffer
const buffer = new ArrayBuffer(8);
const view = new Uint8Array(buffer);
view[0] = 1;
view[1] = 2;
ws.send(buffer);
// 5. 发送TypedArray
const uint8 = new Uint8Array([1, 2, 3, 4]);
ws.send(uint8);
});
// 接收不同类型数据
ws.addEventListener('message', function(event) {
// 文本数据
if (typeof event.data === 'string') {
console.log('文本消息:', event.data);
}
// Blob数据
if (event.data instanceof Blob) {
console.log('Blob消息:', event.data);
// 读取Blob
const reader = new FileReader();
reader.onload = function() {
console.log('Blob内容:', reader.result);
};
reader.readAsText(event.data);
}
// ArrayBuffer数据
if (event.data instanceof ArrayBuffer) {
console.log('ArrayBuffer消息:', event.data);
const view = new Uint8Array(event.data);
console.log('数据:', view);
}
});
// 设置接收数据格式
ws.binaryType = 'arraybuffer'; // 或 'blob'(默认)
# 二、WebSocket高级用法
# 2.1 封装WebSocket类
class WebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
reconnect: true, // 是否自动重连
reconnectInterval: 3000, // 重连间隔
reconnectAttempts: 5, // 最大重连次数
heartbeatInterval: 30000, // 心跳间隔
heartbeatMessage: 'ping', // 心跳消息
...options
};
this.ws = null;
this.reconnectCount = 0;
this.heartbeatTimer = null;
this.isManualClose = false;
this.messageHandlers = [];
this.openHandlers = [];
this.closeHandlers = [];
this.errorHandlers = [];
}
// 连接
connect() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
console.log('WebSocket已连接');
return;
}
this.ws = new WebSocket(this.url);
this.ws.addEventListener('open', (event) => {
console.log('WebSocket连接成功');
this.reconnectCount = 0;
this.startHeartbeat();
this.openHandlers.forEach(handler => handler(event));
});
this.ws.addEventListener('message', (event) => {
// 处理心跳响应
if (event.data === 'pong') {
return;
}
this.messageHandlers.forEach(handler => handler(event));
});
this.ws.addEventListener('close', (event) => {
console.log('WebSocket连接关闭');
this.stopHeartbeat();
this.closeHandlers.forEach(handler => handler(event));
// 自动重连
if (!this.isManualClose && this.options.reconnect) {
this.reconnect();
}
});
this.ws.addEventListener('error', (event) => {
console.error('WebSocket错误');
this.errorHandlers.forEach(handler => handler(event));
});
}
// 重连
reconnect() {
if (this.reconnectCount >= this.options.reconnectAttempts) {
console.log('达到最大重连次数');
return;
}
this.reconnectCount++;
console.log(`尝试重连 (${this.reconnectCount}/${this.options.reconnectAttempts})...`);
setTimeout(() => {
this.connect();
}, this.options.reconnectInterval);
}
// 发送消息
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
if (typeof data === 'object') {
this.ws.send(JSON.stringify(data));
} else {
this.ws.send(data);
}
} else {
console.error('WebSocket未连接');
}
}
// 关闭连接
close() {
this.isManualClose = true;
this.stopHeartbeat();
if (this.ws) {
this.ws.close();
}
}
// 心跳检测
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(this.options.heartbeatMessage);
}
}, this.options.heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
// 事件监听
onOpen(handler) {
this.openHandlers.push(handler);
}
onMessage(handler) {
this.messageHandlers.push(handler);
}
onClose(handler) {
this.closeHandlers.push(handler);
}
onError(handler) {
this.errorHandlers.push(handler);
}
}
// 使用示例
const client = new WebSocketClient('ws://localhost:8080', {
reconnect: true,
reconnectInterval: 3000,
reconnectAttempts: 5,
heartbeatInterval: 30000
});
client.onOpen(() => {
console.log('连接建立');
client.send({ type: 'auth', token: 'abc123' });
});
client.onMessage((event) => {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
});
client.onClose(() => {
console.log('连接关闭');
});
client.onError((error) => {
console.error('连接错误:', error);
});
client.connect();
# 2.2 消息队列
class WebSocketWithQueue extends WebSocketClient {
constructor(url, options) {
super(url, options);
this.messageQueue = [];
}
connect() {
super.connect();
// 连接成功后发送队列中的消息
this.onOpen(() => {
this.flushQueue();
});
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
super.send(data);
} else {
// 连接未建立,加入队列
this.messageQueue.push(data);
}
}
flushQueue() {
while (this.messageQueue.length > 0) {
const data = this.messageQueue.shift();
super.send(data);
}
}
}
# 2.3 消息确认机制
class WebSocketWithAck extends WebSocketClient {
constructor(url, options) {
super(url, options);
this.pendingMessages = new Map();
this.messageId = 0;
}
sendWithAck(data, timeout = 5000) {
return new Promise((resolve, reject) => {
const id = ++this.messageId;
const message = {
id: id,
data: data
};
this.send(message);
const timer = setTimeout(() => {
this.pendingMessages.delete(id);
reject(new Error('消息发送超时'));
}, timeout);
this.pendingMessages.set(id, {
resolve,
reject,
timer
});
});
}
handleAck(messageId, success, error) {
const pending = this.pendingMessages.get(messageId);
if (pending) {
clearTimeout(pending.timer);
this.pendingMessages.delete(messageId);
if (success) {
pending.resolve();
} else {
pending.reject(new Error(error || '消息处理失败'));
}
}
}
}
// 使用示例
const wsWithAck = new WebSocketWithAck('ws://localhost:8080');
wsWithAck.onMessage((event) => {
const data = JSON.parse(event.data);
if (data.type === 'ack') {
wsWithAck.handleAck(data.messageId, data.success, data.error);
}
});
// 发送并等待确认
try {
await wsWithAck.sendWithAck({ type: 'order', product: 'book' });
console.log('消息发送成功');
} catch (error) {
console.error('消息发送失败:', error.message);
}
# 三、Server-Sent Events(SSE)
SSE提供了服务器向客户端推送消息的单向通信机制。
# 3.1 SSE特点
- 单向通信:只能服务器推送到客户端
- 自动重连:连接断开后自动重连
- 文本数据:只支持文本数据(通常是JSON)
- 基于HTTP:使用普通HTTP协议
- 事件类型:支持自定义事件类型
# 3.2 基本使用
// 创建EventSource连接
const eventSource = new EventSource('/api/events');
// 或带查询参数
// const eventSource = new EventSource('/api/events?token=abc123');
// 监听消息(默认事件)
eventSource.addEventListener('message', function(event) {
console.log('收到消息:', event.data);
// 解析JSON
const data = JSON.parse(event.data);
console.log(data);
});
// 监听自定义事件
eventSource.addEventListener('notification', function(event) {
console.log('收到通知:', event.data);
});
eventSource.addEventListener('update', function(event) {
console.log('收到更新:', event.data);
});
// 连接打开
eventSource.addEventListener('open', function(event) {
console.log('SSE连接已建立');
});
// 连接错误
eventSource.addEventListener('error', function(event) {
if (event.target.readyState === EventSource.CLOSED) {
console.log('SSE连接已关闭');
} else {
console.error('SSE连接错误');
}
});
// 关闭连接
// eventSource.close();
# 3.3 readyState状态
console.log(eventSource.readyState);
// 0: CONNECTING - 正在连接
// 1: OPEN - 连接已建立
// 2: CLOSED - 连接已关闭
// 检查连接状态
function checkSSEState() {
switch(eventSource.readyState) {
case EventSource.CONNECTING:
console.log('正在连接...');
break;
case EventSource.OPEN:
console.log('连接已建立');
break;
case EventSource.CLOSED:
console.log('连接已关闭');
break;
}
}
# 3.4 服务端实现(Node.js示例)
// Express服务器端
app.get('/api/events', (req, res) => {
// 设置SSE响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 发送消息
const sendEvent = (data, event = 'message', id = null) => {
if (id) {
res.write(`id: ${id}\n`);
}
if (event !== 'message') {
res.write(`event: ${event}\n`);
}
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// 发送初始消息
sendEvent({ message: '连接成功' });
// 定时发送消息
const interval = setInterval(() => {
sendEvent({
time: new Date().toISOString(),
count: Math.random()
}, 'update');
}, 5000);
// 发送自定义事件
setTimeout(() => {
sendEvent({
title: '新通知',
content: '您有一条新消息'
}, 'notification', Date.now());
}, 10000);
// 客户端断开连接
req.on('close', () => {
clearInterval(interval);
console.log('客户端断开连接');
});
});
# 3.5 封装EventSource类
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
reconnect: true,
reconnectInterval: 3000,
withCredentials: false,
...options
};
this.eventSource = null;
this.eventHandlers = new Map();
}
connect() {
if (this.eventSource) {
this.eventSource.close();
}
this.eventSource = new EventSource(this.url, {
withCredentials: this.options.withCredentials
});
// 默认消息事件
this.eventSource.addEventListener('message', (event) => {
this.handleEvent('message', event);
});
// 连接打开
this.eventSource.addEventListener('open', (event) => {
console.log('SSE连接成功');
this.handleEvent('open', event);
});
// 连接错误
this.eventSource.addEventListener('error', (event) => {
console.error('SSE连接错误');
this.handleEvent('error', event);
if (this.options.reconnect &&
this.eventSource.readyState === EventSource.CLOSED) {
this.reconnect();
}
});
// 注册已有的事件监听器
this.eventHandlers.forEach((handlers, eventType) => {
if (eventType !== 'message' && eventType !== 'open' && eventType !== 'error') {
this.eventSource.addEventListener(eventType, (event) => {
this.handleEvent(eventType, event);
});
}
});
}
reconnect() {
console.log('尝试重连...');
setTimeout(() => {
this.connect();
}, this.options.reconnectInterval);
}
on(eventType, handler) {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
// 为新事件类型添加监听器
if (this.eventSource &&
eventType !== 'message' &&
eventType !== 'open' &&
eventType !== 'error') {
this.eventSource.addEventListener(eventType, (event) => {
this.handleEvent(eventType, event);
});
}
}
this.eventHandlers.get(eventType).push(handler);
}
off(eventType, handler) {
if (this.eventHandlers.has(eventType)) {
const handlers = this.eventHandlers.get(eventType);
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}
handleEvent(eventType, event) {
if (this.eventHandlers.has(eventType)) {
this.eventHandlers.get(eventType).forEach(handler => {
try {
handler(event);
} catch (error) {
console.error(`事件处理错误 (${eventType}):`, error);
}
});
}
}
close() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
// 使用示例
const sseClient = new SSEClient('/api/events', {
reconnect: true,
reconnectInterval: 3000
});
sseClient.on('open', () => {
console.log('连接建立');
});
sseClient.on('message', (event) => {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
});
sseClient.on('notification', (event) => {
const data = JSON.parse(event.data);
console.log('收到通知:', data);
showNotification(data);
});
sseClient.on('update', (event) => {
const data = JSON.parse(event.data);
console.log('收到更新:', data);
updateUI(data);
});
sseClient.on('error', () => {
console.error('连接错误');
});
sseClient.connect();
# 四、WebSocket vs SSE对比
# 4.1 对比表格
| 特性 | WebSocket | Server-Sent Events |
|---|---|---|
| 通信方向 | 双向 | 单向(服务器→客户端) |
| 协议 | WebSocket (ws://, wss://) | HTTP/HTTPS |
| 数据格式 | 文本、二进制 | 仅文本 |
| 自动重连 | 需手动实现 | 自动重连 |
| 浏览器兼容性 | 较好(IE10+) | 较好(IE不支持) |
| 服务端实现 | 需要WebSocket服务器 | 普通HTTP服务器即可 |
| 代理支持 | 可能被阻止 | 良好 |
| 消息ID | 需手动实现 | 内置支持 |
| 适用场景 | 聊天、游戏、协作编辑 | 通知、实时数据推送、日志流 |
# 4.2 选择建议
// 使用WebSocket:
// - 需要双向实时通信
// - 需要传输二进制数据
// - 高频率消息交互
// - 实时游戏、聊天应用
// 使用SSE:
// - 只需服务器推送
// - 简单的实时更新
// - 服务器无需特殊支持
// - 实时通知、股票行情、日志监控
# 五、综合示例
# 六、最佳实践
# 6.1 WebSocket最佳实践
- 实现自动重连:网络不稳定时自动重新连接
- 心跳检测:定期发送心跳消息保持连接
- 消息队列:连接断开时缓存消息
- 错误处理:捕获并处理所有错误
- 资源清理:组件销毁时关闭连接
# 6.2 SSE最佳实践
- 利用自动重连:SSE自带重连机制
- 设置合理的重试时间:服务端通过
retry字段控制 - 使用事件类型:区分不同类型的消息
- 处理错误状态:监听
error事件 - 正确关闭连接:不需要时调用
close()
# 6.3 通用最佳实践
- 使用HTTPS/WSS:加密传输保证安全
- 身份验证:通过Token或Cookie验证身份
- 消息压缩:大数据量时使用压缩
- 限流控制:避免消息过载
- 监控和日志:记录连接状态和错误
# 七、总结
实时通信是现代Web应用的关键技术:
- WebSocket:全双工通信,适合聊天、游戏、协作
- SSE:单向推送,适合通知、实时数据、日志流
- 选择建议:根据需求选择合适的方案
- 最佳实践:重连、心跳、错误处理、资源清理
掌握实时通信技术,能够构建高效、实时的交互应用。
祝你变得更强!
编辑 (opens new window)
上次更新: 2025/11/28