Episode 459: Building a Million-Connection Live Comment System with WebSocket
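1. Overview
1.1 Challenges of a Live Comment System
A live comment system has very strict real-time requirements. It needs to support:
- Million-scale concurrency: more than one million users online at the same time
- Real-time delivery: comments reach all users within about a second
- High availability: stable operation 24/7
- Data persistence: comment data is stored reliably
- Bandwidth optimization: bandwidth consumption is kept low
- Page stability: the front end stays smooth and does not stutter
1.2 Advantages of WebSocket
WebSocket is a full-duplex communication protocol introduced alongside HTML5. Compared with HTTP polling it has the following advantages:
- Low latency: once the connection is established, messages are pushed immediately
- Low overhead: the connection is not re-established for every message
- Full duplex: client and server can send data at the same time
- Protocol upgrade: the connection starts as an HTTP request and is upgraded
1.3 Structure of This Article
This article covers a million-scale WebSocket live comment system from the following angles:
- WebSocket basics: protocol principles, connection establishment, message format
- System architecture design: overall architecture, component selection
- High availability: load balancing, service cluster, failover
- Data storage: MySQL, Redis, message queue
- Real-time synchronization: message broadcast, room management
- Performance optimization: bandwidth optimization, connection management, message compression
- Front-end optimization: page stability, message rendering, debounce and throttle
- Monitoring and alerting: performance monitoring, alerting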
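2. WebSocket Basics
2.1 The WebSocket Protocol
2.1.1 Protocol Characteristics
The WebSocket protocol:
- Runs over TCP
- Provides full-duplex communication
- Has low latency and low per-message overhead
- Supports both binary and text messages
2.1.2 Connection Establishment
Handshake process:
    Client → Server: HTTP Upgrade request
    Server → Client: HTTP 101 Switching Protocols
    Connection established; WebSocket communication begins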
Client request:
    GET /chat HTTP/1.1
    Host: server.example.com
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Sec-WebSocket-Version: 13
Server response:
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
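The `Sec-WebSocket-Accept` value in the response is derived from the client's `Sec-WebSocket-Key`: the server appends a fixed GUID defined by RFC 6455, hashes the result with SHA-1, and Base64-encodes the digest. The following is a minimal sketch of that step using only the JDK; the class and method names (`HandshakeAccept`, `acceptFor`) are illustrative and do not come from the article. In practice Netty or Spring performs this for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class HandshakeAccept {
    // Fixed GUID that RFC 6455 defines for the opening handshake
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key. */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the algorithms every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Key taken from the client request shown above
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
        // prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```

Running it on the key from the sample request reproduces the value in the sample response, which is a quick way to check a hand-written handshake.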
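2.1.3 Message Format
WebSocket frame fields:
- FIN: whether this is the final frame of a message
- Opcode: the frame type (text, binary, close, and so on)
- Mask: whether the payload is masked (frames sent from client to server must be masked)
- Payload Length: length of the data
- Payload Data: the actual data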
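To make the fields above concrete, here is a small sketch that decodes one short frame by hand. It only handles payloads under 126 bytes (the extended length encodings are skipped), and the names `FrameDecoder` and `decodeShortFrame` are made up for this illustration. A real server would rely on Netty's WebSocket codec instead.

```java
import java.nio.charset.StandardCharsets;

final class FrameDecoder {
    /** Decoded view of a single frame. */
    static final class Frame {
        final boolean fin;
        final int opcode;
        final boolean masked;
        final byte[] payload;

        Frame(boolean fin, int opcode, boolean masked, byte[] payload) {
            this.fin = fin;
            this.opcode = opcode;
            this.masked = masked;
            this.payload = payload;
        }

        String text() {
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    /** Decodes a frame whose payload length fits in the 7-bit length field. */
    static Frame decodeShortFrame(byte[] raw) {
        boolean fin = (raw[0] & 0x80) != 0;     // highest bit of byte 0
        int opcode = raw[0] & 0x0F;             // low 4 bits of byte 0 (1 = text)
        boolean masked = (raw[1] & 0x80) != 0;  // highest bit of byte 1
        int length = raw[1] & 0x7F;             // low 7 bits of byte 1
        if (length > 125) {
            throw new IllegalArgumentException("extended lengths are not handled in this sketch");
        }
        int offset = 2;
        byte[] maskKey = new byte[4];
        if (masked) {
            System.arraycopy(raw, offset, maskKey, 0, 4);
            offset += 4;
        }
        byte[] payload = new byte[length];
        for (int i = 0; i < length; i++) {
            byte b = raw[offset + i];
            // Unmasking XORs each payload byte with the mask key, cycling every 4 bytes
            payload[i] = masked ? (byte) (b ^ maskKey[i % 4]) : b;
        }
        return new Frame(fin, opcode, masked, payload);
    }

    public static void main(String[] args) {
        // Masked text frame carrying "Hello" (example bytes from RFC 6455)
        byte[] raw = {(byte) 0x81, (byte) 0x85, 0x37, (byte) 0xfa, 0x21, 0x3d,
                      0x7f, (byte) 0x9f, 0x4d, 0x51, 0x58};
        Frame frame = decodeShortFrame(raw);
        System.out.println(frame.fin + " " + frame.opcode + " " + frame.text());
        // prints true 1 Hello
    }
}
```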
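2.2 WebSocket API
2.2.1 Client API
JavaScript API:
    // Open a connection
    const ws = new WebSocket('ws://localhost:8080/chat');

    // Connection established: it is only safe to send from this point on
    ws.onopen = function(event) {
        console.log('WebSocket connection established');
        ws.send('Hello Server');
    };

    // A message arrived from the server
    ws.onmessage = function(event) {
        console.log('Message received:', event.data);
    };

    // Connection closed
    ws.onclose = function(event) {
        console.log('WebSocket connection closed');
    };

    // Connection error
    ws.onerror = function(error) {
        console.error('WebSocket error:', error);
    };

    // Close the connection when the user leaves the page
    window.addEventListener('beforeunload', function() {
        ws.close();
    });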
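2.2.2 Server API
Java Spring WebSocket:
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.TextMessage;
    import org.springframework.web.socket.WebSocketSession;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    import org.springframework.web.socket.handler.TextWebSocketHandler;

    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {

        private final ChatWebSocketHandler chatHandler;

        // Inject the Spring-managed handler instead of creating a second instance with "new"
        public WebSocketConfig(ChatWebSocketHandler chatHandler) {
            this.chatHandler = chatHandler;
        }

        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry.addHandler(chatHandler, "/chat")
                    // "*" accepts any origin; restrict this to your own domains in production
                    .setAllowedOrigins("*");
        }
    }

    @Component
    public class ChatWebSocketHandler extends TextWebSocketHandler {

        @Override
        public void afterConnectionEstablished(WebSocketSession session) {
            // Connection established
        }

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) {
            // Handle an incoming text message
        }

        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
            // Connection closed
        }
    }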
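2.3 WebSocket vs. HTTP Polling
2.3.1 Performance Comparison
| Feature | WebSocket | HTTP polling |
| Latency | Milliseconds | Seconds |
| Overhead | Low (connection reuse) | High (frequent requests) |
| Server load | Low | High |
| Bandwidth consumption | Low | High |
| Real-time capability | High | Low |
2.3.2 Applicable Scenarios
WebSocket is suitable for:
HTTP polling is suitable for: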
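3. System Architecture Design
3.1 Overall Architecture
3.1.1 Architecture Diagram
    One million users
        ↓
    Nginx (load balancing + WebSocket proxy)
        ↓
    WebSocket server cluster (10-20 servers)
        ↓
        ├──→ Redis (room management, message cache)
        ├──→ Kafka (message queue, asynchronous processing)
        └──→ MySQL (data persistence)
        ↓
    Monitoring and alerting system
3.1.2 Architecture Notes
Access layer:
- Nginx load balancing
- WebSocket proxy
- SSL termination
Service layer:
- WebSocket server cluster
- Room management service
- Message processing service
Data layer:
- Redis: room information, online users, message cache
- Kafka: message queue, asynchronous processing
- MySQL: persistent storage of comment data
Monitoring layer: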
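3.2 Component Selection
3.2.1 WebSocket Server
Options:
- Spring WebSocket: part of the Java ecosystem, easy to integrate
- Netty: high performance, suited to high concurrency
- Node.js + Socket.io: lightweight, fast to develop with
Recommendation: Netty (for million-scale concurrency)
3.2.2 Message Middleware
Options:
- Redis Pub/Sub: lightweight, low latency
- Kafka: high throughput, persistent
- RabbitMQ: feature-rich, highly reliable
Recommendation: Redis Pub/Sub (real-time delivery) + Kafka (persistence)
3.2.3 Data Storage
Options:
- MySQL: persistent storage of comment data
- Redis: hot-data cache, room management
- MongoDB: optional, suited to unstructured data
Recommendation: MySQL + Redis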
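3.3 Server Planning
3.3.1 Server Configuration
Minimum-cost configuration (targeting million-scale concurrency):
| Server | Specification | Count | Purpose |
| Nginx | 4 cores, 8 GB | 2 (active/standby) | Load balancing |
| WebSocket server | 8 cores, 16 GB | 10-20 | WebSocket service |
| Redis cluster | 4 cores, 16 GB | 6 (3 masters, 3 replicas) | Cache, Pub/Sub |
| Kafka cluster | 4 cores, 16 GB | 3 | Message queue |
| MySQL | 8 cores, 32 GB | 2 (primary/replica) | Data persistence |
| Monitoring server | 4 cores, 8 GB | 1 | Monitoring and alerting |
Total: about 24-34 servers, depending on how many WebSocket servers are deployed
3.3.2 Performance Estimate
A single WebSocket server:
- 8-core, 16 GB machine
- Expected to hold 50,000-100,000 concurrent connections
- At the upper figure of 100,000 per server, 10-20 servers hold 1-2 million concurrent connections
Bandwidth estimate:
- Each comment is about 100 bytes
- 1 million users, 1,000 comments per second
- Downstream bandwidth if every comment is pushed to every user: 1,000 × 100 bytes × 1,000,000 = 100 GB/s (a CDN is required)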
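The bandwidth figure is plain multiplication, but writing it out makes it easy to try other inputs. The sketch below reproduces the 100 GB/s estimate, splits it across 20 servers (the upper end of the 10-20 range above), and compares it with a hypothetical display cap of 20 comments per second. That cap is an assumed number for illustration, not one taken from the article.

```java
final class BandwidthEstimate {
    /** Downstream bytes per second when every comment is pushed to every user. */
    static long fanOutBytesPerSecond(long commentsPerSecond, long bytesPerComment, long users) {
        return commentsPerSecond * bytesPerComment * users;
    }

    public static void main(String[] args) {
        long gb = 1_000_000_000L;

        // Figures from the estimate: 1,000 comments/s, 100 bytes each, 1,000,000 users
        long full = fanOutBytesPerSecond(1_000, 100, 1_000_000);
        System.out.println("full fan-out: " + full / gb + " GB/s");             // 100 GB/s
        System.out.println("per server (20 nodes): " + full / 20 / gb + " GB/s"); // 5 GB/s

        // Assumed cap: only 20 comments per second are actually pushed to each viewer
        long capped = fanOutBytesPerSecond(20, 100, 1_000_000);
        System.out.println("capped fan-out: " + capped / gb + " GB/s");         // 2 GB/s
    }
}
```

At 5 GB/s (40 Gbit/s) per server, unrestricted fan-out is far beyond what a typical network interface sustains, which is why the batching, compression, and rate limiting in section 7 matter as much as raw capacity.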
4. High Availability
4.1 Nginx Load Balancing
4.1.1 WebSocket Proxy Configuration
    # WebSocket backend pool
    upstream websocket_backend {
        # Send new connections to the server with the fewest active connections
        least_conn;
        server 192.168.1.101:8080 weight=1 max_fails=3 fail_timeout=30s;
        server 192.168.1.102:8080 weight=1 max_fails=3 fail_timeout=30s;
        server 192.168.1.103:8080 weight=1 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 80;
        server_name chat.example.com;

        location /chat {
            proxy_pass http://websocket_backend;

            # Required for the WebSocket upgrade
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";

            # Pass client information to the backend
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            # Connecting to an upstream should be quick; the nginx documentation notes
            # that this timeout cannot usually exceed 75 seconds
            proxy_connect_timeout 5s;
            # Long send/read timeouts keep idle WebSocket connections open
            proxy_send_timeout 7d;
            proxy_read_timeout 7d;
        }

        # Health check endpoint
        location /health {
            access_log off;
            return 200 "healthy\n";
        }
    }
4.1.2 SSL Configuration
    server {
        listen 443 ssl;
        server_name chat.example.com;

        # Certificate and private key
        ssl_certificate /etc/nginx/ssl/chat.example.com.crt;
        ssl_certificate_key /etc/nginx/ssl/chat.example.com.key;

        # Protocols and cipher suites
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;
        ssl_prefer_server_ciphers on;

        # TLS ends at Nginx; the backend connection is plain HTTP
        location /chat {
            proxy_pass http://websocket_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
4.2 WebSocket Server Cluster
4.2.1 Netty WebSocket Server
Maven dependencies:
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.68.Final</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.7.0</version>
        </dependency>
        <!-- The handler code below uses fastjson (JSON, JSONObject) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>
Netty server code:
    package com.example.websocket;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class WebSocketServer {

        private static final int PORT = 8080;

        public void start() throws Exception {
            // Boss group accepts connections; worker group handles their I/O
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ChannelPipeline pipeline = ch.pipeline();
                                // HTTP codec, needed for the upgrade handshake
                                pipeline.addLast(new HttpServerCodec());
                                pipeline.addLast(new ChunkedWriteHandler());
                                // Aggregate HTTP message parts into one full request
                                pipeline.addLast(new HttpObjectAggregator(65536));
                                // Handles the handshake plus ping/pong/close frames on /chat
                                pipeline.addLast(new WebSocketServerProtocolHandler("/chat"));
                                // Business logic; one instance per connection
                                pipeline.addLast(new ChatWebSocketHandler());
                            }
                        })
                        // Accept queue length; 128 is small if many clients connect at once
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                ChannelFuture future = bootstrap.bind(PORT).sync();
                System.out.println("WebSocket server started on port " + PORT);
                // Block until the server channel is closed
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) throws Exception {
            new WebSocketServer().start();
        }
    }
4.2.2 WebSocket Handler
    package com.example.websocket;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;

    // One handler instance exists per connection (see initChannel), so per-connection
    // state such as roomId and userId can live in fields.
    public class ChatWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

        private static final String REDIS_HOST = "192.168.1.201";
        private static final int REDIS_PORT = 6379;
        private static final JedisPool jedisPool;

        static {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(100);
            config.setMaxIdle(20);
            config.setMinIdle(5);
            jedisPool = new JedisPool(config, REDIS_HOST, REDIS_PORT);
        }

        private String roomId;
        private String userId;
        private RedisSubscriber subscriber;

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("New connection: " + ctx.channel().remoteAddress());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
            if (frame instanceof TextWebSocketFrame) {
                String message = ((TextWebSocketFrame) frame).text();
                handleMessage(ctx, message);
            }
        }

        private void handleMessage(ChannelHandlerContext ctx, String message) {
            try {
                JSONObject json = JSON.parseObject(message);
                String type = json.getString("type");
                if ("join".equals(type)) {
                    // Join a room
                    roomId = json.getString("roomId");
                    userId = json.getString("userId");
                    joinRoom(ctx, roomId, userId);
                } else if ("comment".equals(type)) {
                    // Ignore comments sent before the client has joined a room
                    if (roomId != null) {
                        sendComment(roomId, userId, json.getString("content"));
                    }
                } else if ("heartbeat".equals(type)) {
                    sendHeartbeat(ctx);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void joinRoom(ChannelHandlerContext ctx, String roomId, String userId) {
            stopSubscriber(); // drop a previous subscription if the client joins again
            try (Jedis jedis = jedisPool.getResource()) {
                // Record this channel as a member of the room
                String channelKey = "room:" + roomId + ":channels";
                jedis.sadd(channelKey, ctx.channel().id().asShortText());
            }
            // subscribe() blocks, so it runs on its own thread with a dedicated connection
            // rather than holding a pooled connection forever. One thread per connection
            // only works for a demo: at scale, keep one subscription per room per server
            // and fan messages out to the local channels.
            RedisSubscriber sub = new RedisSubscriber(ctx);
            subscriber = sub;
            new Thread(() -> {
                try (Jedis subJedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
                    subJedis.subscribe(sub, "room:" + roomId);
                }
            }).start();
        }

        private void sendComment(String roomId, String userId, String content) {
            JSONObject comment = new JSONObject();
            comment.put("type", "comment");
            comment.put("roomId", roomId);
            comment.put("userId", userId);
            comment.put("content", content);
            comment.put("timestamp", System.currentTimeMillis());
            // Note: Jedis calls block, and this runs on the Netty event loop
            try (Jedis jedis = jedisPool.getResource()) {
                // Publish to the room's Redis channel
                jedis.publish("room:" + roomId, comment.toJSONString());
            }
            // Persist asynchronously
            saveCommentAsync(roomId, userId, content);
        }

        private void saveCommentAsync(String roomId, String userId, String content) {
            // Send to Kafka; a consumer writes to MySQL (see section 5.3)
        }

        private void sendHeartbeat(ChannelHandlerContext ctx) {
            JSONObject response = new JSONObject();
            response.put("type", "heartbeat");
            response.put("timestamp", System.currentTimeMillis());
            ctx.writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
        }

        private void stopSubscriber() {
            // Unsubscribing lets the blocked subscribe() call return and its thread exit
            if (subscriber != null && subscriber.isSubscribed()) {
                subscriber.unsubscribe();
            }
            subscriber = null;
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            stopSubscriber();
            if (roomId != null) {
                try (Jedis jedis = jedisPool.getResource()) {
                    String channelKey = "room:" + roomId + ":channels";
                    jedis.srem(channelKey, ctx.channel().id().asShortText());
                }
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
4.3 Redis Cluster
4.3.1 Redis Cluster Configuration
Redis cluster (3 masters, 3 replicas):
    # Nodes 1-3
    redis-server --port 7001 --cluster-enabled yes --cluster-config-file nodes-7001.conf
    redis-server --port 7002 --cluster-enabled yes --cluster-config-file nodes-7002.conf
    redis-server --port 7003 --cluster-enabled yes --cluster-config-file nodes-7003.conf

    # Nodes 4-6
    redis-server --port 7004 --cluster-enabled yes --cluster-config-file nodes-7004.conf
    redis-server --port 7005 --cluster-enabled yes --cluster-config-file nodes-7005.conf
    redis-server --port 7006 --cluster-enabled yes --cluster-config-file nodes-7006.conf
Create the cluster:
    redis-cli --cluster create \
        192.168.1.201:7001 192.168.1.202:7002 192.168.1.203:7003 \
        192.168.1.201:7004 192.168.1.202:7005 192.168.1.203:7006 \
        --cluster-replicas 1
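4.3.2 Redis Pub/Sub
Publishing a message:
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.publish("room:" + roomId, message);
    }
Subscribing to messages:
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import redis.clients.jedis.JedisPubSub;

    public class RedisSubscriber extends JedisPubSub {

        private final ChannelHandlerContext ctx;

        public RedisSubscriber(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void onMessage(String channel, String message) {
            // Forward the published message to the WebSocket client if it is still connected
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush(new TextWebSocketFrame(message));
            }
        }
    }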
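A subscriber per WebSocket connection means one Redis connection and one blocked thread per user, which does not hold up at a million connections. A common alternative is for each server to subscribe to a room once and fan messages out to its own local connections. The following sketch shows only that bookkeeping, with no Netty or Jedis dependency: `LocalRoomRegistry` and its callbacks are hypothetical names, each connection is modeled as a `Consumer<String>`, and the subscribe/unsubscribe callbacks stand in for whatever issues the Redis commands.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class LocalRoomRegistry {
    // roomId -> connections on this server that joined the room
    private final Map<String, Set<Consumer<String>>> rooms = new ConcurrentHashMap<>();
    private final Consumer<String> subscribe;   // invoked when a room gets its first local member
    private final Consumer<String> unsubscribe; // invoked when a room loses its last local member

    LocalRoomRegistry(Consumer<String> subscribe, Consumer<String> unsubscribe) {
        this.subscribe = subscribe;
        this.unsubscribe = unsubscribe;
    }

    void join(String roomId, Consumer<String> connection) {
        rooms.compute(roomId, (id, members) -> {
            if (members == null) {
                members = ConcurrentHashMap.newKeySet();
                subscribe.accept(id); // first local member: subscribe this server once
            }
            members.add(connection);
            return members;
        });
    }

    void leave(String roomId, Consumer<String> connection) {
        rooms.computeIfPresent(roomId, (id, members) -> {
            members.remove(connection);
            if (members.isEmpty()) {
                unsubscribe.accept(id);
                return null; // returning null removes the room entry
            }
            return members;
        });
    }

    /** Called by the server's single subscriber for every message published to a room. */
    void dispatch(String roomId, String message) {
        Set<Consumer<String>> members = rooms.get(roomId);
        if (members != null) {
            members.forEach(connection -> connection.accept(message));
        }
    }

    int localMemberCount(String roomId) {
        Set<Consumer<String>> members = rooms.get(roomId);
        return members == null ? 0 : members.size();
    }
}
```

With this shape, the number of Redis subscriptions grows with the number of active rooms per server rather than with the number of users. In a Netty server the connection callback would typically wrap `channel.writeAndFlush(...)`, and the callbacks passed to the constructor should be quick, since they run inside the map update.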
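4.4 Failover
4.4.1 Nginx Active/Standby
Keepalived configuration:
    # /etc/keepalived/keepalived.conf
    vrrp_script chk_nginx {
        script "/etc/keepalived/check_nginx.sh"
        interval 2
        # Lower the priority by 5 when the check fails; the standby's priority
        # must be within 5 of the master's for failover to happen
        weight -5
    }

    vrrp_instance VI_1 {
        state MASTER
        interface eth0
        virtual_router_id 51
        priority 100
        virtual_ipaddress {
            192.168.1.100
        }
        track_script {
            chk_nginx
        }
    }
4.4.2 Service Health Check
Health check endpoint:
    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class HealthController {

        // Connection counter from section 9.1.1
        private final ConnectionMonitor connectionMonitor;

        public HealthController(ConnectionMonitor connectionMonitor) {
            this.connectionMonitor = connectionMonitor;
        }

        @GetMapping("/health")
        public Map<String, Object> health() {
            Map<String, Object> result = new HashMap<>();
            result.put("status", "UP");
            result.put("connections", connectionMonitor.getConnectionCount());
            result.put("timestamp", System.currentTimeMillis());
            return result;
        }
    }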
5. Data Storage
5.1 MySQL Persistence
5.1.1 Database Design
Comment table:
    CREATE TABLE `live_comment` (
        `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'Comment ID',
        `room_id` VARCHAR(50) NOT NULL COMMENT 'Room ID',
        `user_id` VARCHAR(50) NOT NULL COMMENT 'User ID',
        `user_name` VARCHAR(100) NOT NULL COMMENT 'User name',
        `content` TEXT NOT NULL COMMENT 'Comment content',
        `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
        PRIMARY KEY (`id`),
        KEY `idx_room_id` (`room_id`),
        KEY `idx_create_time` (`create_time`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Live comment table';
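5.1.2 Asynchronous Writes
A Kafka consumer writes to the database:
    @Component
    public class CommentConsumer {

        private static final Logger log = LoggerFactory.getLogger(CommentConsumer.class);

        @Autowired
        private CommentService commentService;

        @KafkaListener(topics = "live-comment", groupId = "comment-processor")
        public void processComment(String message) {
            try {
                JSONObject comment = JSON.parseObject(message);
                // Hand the comment to the service, which buffers it for batch insert
                commentService.saveComment(
                        comment.getString("roomId"),
                        comment.getString("userId"),
                        comment.getString("userName"),
                        comment.getString("content")
                );
            } catch (Exception e) {
                // Log and skip so one malformed message does not stall consumption
                log.error("Failed to process comment: " + message, e);
            }
        }
    }
Batch insert optimization. The buffer is filled by listener threads and drained by the scheduler, so access is synchronized, and the scheduled flush writes whatever is pending so that a partly filled buffer is not held back indefinitely:
    @Service
    public class CommentService {

        private static final int BATCH_SIZE = 100;

        // CommentMapper is the assumed type name of the MyBatis-style mapper behind commentMapper
        @Autowired
        private CommentMapper commentMapper;

        // Guarded by "this"
        private final List<Comment> commentBuffer = new ArrayList<>();

        // Flush once per second, even if fewer than BATCH_SIZE comments are waiting
        @Scheduled(fixedDelay = 1000)
        public void batchInsert() {
            flush();
        }

        public void saveComment(String roomId, String userId, String userName, String content) {
            Comment comment = new Comment();
            comment.setRoomId(roomId);
            comment.setUserId(userId);
            comment.setUserName(userName);
            comment.setContent(content);
            boolean full;
            synchronized (this) {
                commentBuffer.add(comment);
                full = commentBuffer.size() >= BATCH_SIZE;
            }
            // Flush early when a full batch has accumulated
            if (full) {
                flush();
            }
        }

        private void flush() {
            List<Comment> batch;
            synchronized (this) {
                if (commentBuffer.isEmpty()) {
                    return;
                }
                batch = new ArrayList<>(commentBuffer);
                commentBuffer.clear();
            }
            // Insert outside the lock so producers are not blocked by the database call
            commentMapper.batchInsert(batch);
        }
    }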
5.2 Redis Cache
5.2.1 Hot Comment Cache
Caching the latest comments:
    public void cacheLatestComments(String roomId, Comment comment) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "room:" + roomId + ":comments";
            // Push to the head of the list
            jedis.lpush(key, JSON.toJSONString(comment));
            // Keep only the most recent 1000 comments
            jedis.ltrim(key, 0, 999);
            // Expire after one hour without new comments
            jedis.expire(key, 3600);
        }
    }
Fetching the latest comments:
    public List<Comment> getLatestComments(String roomId, int count) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "room:" + roomId + ":comments";
            // Newest first, because comments are pushed to the head of the list
            List<String> comments = jedis.lrange(key, 0, count - 1);
            return comments.stream()
                    .map(c -> JSON.parseObject(c, Comment.class))
                    .collect(Collectors.toList());
        }
    }
5.2.2 Online User Statistics
Counting online users:
    public void addOnlineUser(String roomId, String userId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "room:" + roomId + ":users";
            // A set, so the same user is counted once
            jedis.sadd(key, userId);
            jedis.expire(key, 3600);
        }
    }

    public long getOnlineUserCount(String roomId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "room:" + roomId + ":users";
            return jedis.scard(key);
        }
    }
5.3 Kafka Message Queue
5.3.1 Sending Messages
Sending a comment to Kafka:
    @Service
    public class CommentProducer {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void sendComment(String roomId, String userId, String userName, String content) {
            JSONObject comment = new JSONObject();
            comment.put("roomId", roomId);
            comment.put("userId", userId);
            comment.put("userName", userName);
            comment.put("content", content);
            comment.put("timestamp", System.currentTimeMillis());
            // Using roomId as the record key keeps each room's comments in one partition, in order
            kafkaTemplate.send("live-comment", roomId, comment.toJSONString());
        }
    }
5.3.2 Consuming Messages
Consuming comment messages:
    @Component
    public class CommentConsumer {

        private static final Logger log = LoggerFactory.getLogger(CommentConsumer.class);

        @Autowired
        private CommentService commentService;

        @KafkaListener(topics = "live-comment", groupId = "comment-processor")
        public void consume(String message) {
            try {
                JSONObject comment = JSON.parseObject(message);
                // Save to the database
                commentService.saveComment(
                        comment.getString("roomId"),
                        comment.getString("userId"),
                        comment.getString("userName"),
                        comment.getString("content")
                );
            } catch (Exception e) {
                log.error("Failed to consume comment: " + message, e);
            }
        }
    }
6. Real-Time Synchronization
6.1 Message Broadcast
6.1.1 Broadcast Within a Room
Broadcast through Redis Pub/Sub:
    public void broadcastComment(String roomId, Comment comment) {
        try (Jedis jedis = jedisPool.getResource()) {
            String channel = "room:" + roomId;
            String message = JSON.toJSONString(comment);
            jedis.publish(channel, message);
        }
    }
6.1.2 Broadcast Across Rooms
Site-wide broadcast:
    public void broadcastToAllRooms(Comment comment) {
        try (Jedis jedis = jedisPool.getResource()) {
            String message = JSON.toJSONString(comment);
            // Every server subscribes to this channel and forwards to all of its connections
            jedis.publish("broadcast:all", message);
        }
    }
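6.2 Room Management
6.2.1 Room Information
Room data structure:
    public class Room {
        private String roomId;
        private String roomName;
        private long onlineCount;
        private List<Comment> latestComments;
        // Getters and setters omitted
    }
Caching room information:
    public void cacheRoomInfo(Room room) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "room:info:" + room.getRoomId();
            jedis.setex(key, 3600, JSON.toJSONString(room));
        }
    }
6.2.2 Room List
Fetching the room list. KEYS scans the whole keyspace in one blocking call, so this version iterates with SCAN instead:
    // Requires redis.clients.jedis.ScanParams and redis.clients.jedis.ScanResult
    public List<Room> getRoomList() {
        List<Room> rooms = new ArrayList<>();
        try (Jedis jedis = jedisPool.getResource()) {
            ScanParams params = new ScanParams().match("room:info:*").count(500);
            String cursor = ScanParams.SCAN_POINTER_START;
            do {
                ScanResult<String> page = jedis.scan(cursor, params);
                for (String key : page.getResult()) {
                    String json = jedis.get(key);
                    // The key may have expired between SCAN and GET
                    if (json != null) {
                        rooms.add(JSON.parseObject(json, Room.class));
                    }
                }
                cursor = page.getCursor();
            } while (!ScanParams.SCAN_POINTER_START.equals(cursor));
        }
        return rooms;
    }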
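6.3 Message Deduplication
6.3.1 Message ID
Generating a message ID:
    public String generateMessageId(String roomId, String userId, long timestamp) {
        // Two comments from the same user in the same millisecond share an ID
        return roomId + ":" + userId + ":" + timestamp;
    }
Deduplicating messages. A separate EXISTS followed by SETEX lets two concurrent callers both see "not present", so the check and the write are combined into one atomic SET with NX and EX:
    // Requires redis.clients.jedis.params.SetParams
    public boolean isDuplicate(String messageId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "message:id:" + messageId;
            // SET key 1 NX EX 60 returns "OK" only for the first caller within 60 seconds
            String result = jedis.set(key, "1", SetParams.setParams().nx().ex(60));
            return result == null;
        }
    }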
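7. Performance Optimization
7.1 Bandwidth Optimization
7.1.1 Message Compression
Gzip compression. Gzip adds a fixed header and trailer, and Base64 encoding adds roughly a third on top, so this pays off for batches and long messages rather than for single short comments:
    // Requires java.io.ByteArrayOutputStream, java.io.IOException,
    // java.nio.charset.StandardCharsets, java.util.Base64, java.util.zip.GZIPOutputStream
    public String compressMessage(String message) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources closes the stream, which also writes the gzip trailer
        try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) {
            gzos.write(message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // Fall back to the uncompressed message; the receiver must be able to tell the two apart
            return message;
        }
        return Base64.getEncoder().encodeToString(baos.toByteArray());
    }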
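Whether compression helps depends heavily on message size, so it is worth measuring before enabling it. The sketch below, using only the JDK, compresses a payload to raw gzip bytes (no Base64) and decompresses it again so the sizes can be compared. `GzipCheck` is an illustrative name, and the sample payloads in `main` are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipCheck {
    /** Compresses UTF-8 text to raw gzip bytes. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Reverses gzip(); used here to confirm the round trip is lossless. */
    static String gunzip(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String single = "{\"t\":\"c\",\"r\":\"room-001\",\"u\":\"user-001\",\"c\":\"hello\"}";
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            batch.append(single);
        }
        // Compare raw and compressed sizes for one message and for a batch of 100
        System.out.println("single: " + single.length() + " -> " + gzip(single).length);
        System.out.println("batch:  " + batch.length() + " -> " + gzip(batch.toString()).length);
    }
}
```

If the server is Netty, the standard WebSocket compression extension (permessage-deflate, available through `WebSocketServerCompressionHandler`) is usually a better fit than compressing at the application level, because browsers decompress it transparently and binary frames avoid the Base64 overhead.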
7.1.2 Compact Messages
Shortening the message format.
Original format:
    {
        "type": "comment",
        "roomId": "room-001",
        "userId": "user-001",
        "userName": "张三",
        "content": "comment text",
        "timestamp": 1234567890
    }
Compact format:
    {
        "t": "c",
        "r": "room-001",
        "u": "user-001",
        "n": "张三",
        "c": "comment text",
        "ts": 1234567890
    }
7.1.3 Batch Sending
Sending messages in batches:
    // Fields of a per-connection handler; ctx is that connection's ChannelHandlerContext
    private final List<String> messageBuffer = new ArrayList<>();
    private static final int BATCH_SIZE = 10;

    public void sendMessage(String message) {
        messageBuffer.add(message);
        if (messageBuffer.size() >= BATCH_SIZE) {
            sendBatch();
        }
    }

    // Also call this from a short periodic timer, otherwise fewer than
    // BATCH_SIZE buffered messages would wait until more arrive
    private void sendBatch() {
        if (messageBuffer.isEmpty()) {
            return;
        }
        JSONArray batch = new JSONArray();
        batch.addAll(messageBuffer);
        String batchMessage = batch.toJSONString();
        // One frame carries the whole batch as a JSON array
        ctx.writeAndFlush(new TextWebSocketFrame(batchMessage));
        messageBuffer.clear();
    }
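7.2 Connection Management
7.2.1 Connection Pool
Worker thread configuration:
    // Netty worker threads that handle connection I/O; with no argument
    // Netty defaults to twice the number of CPU cores
    EventLoopGroup workerGroup = new NioEventLoopGroup(16);
7.2.2 Heartbeat
Client heartbeat:
    // Send a heartbeat every 30 seconds while the connection is open
    setInterval(function() {
        if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({type: 'heartbeat'}));
        }
    }, 30000);
Server heartbeat:
    // channelGroup is a Netty ChannelGroup holding all open channels on this server
    @Scheduled(fixedRate = 30000)
    public void sendHeartbeat() {
        channelGroup.writeAndFlush(new TextWebSocketFrame(
                JSON.toJSONString(Collections.singletonMap("type", "heartbeat"))
        ));
    }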
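A heartbeat only helps if the server also drops connections that stop sending it. Netty ships `IdleStateHandler` for exactly this purpose; the sketch below shows the underlying bookkeeping in plain Java so the logic is visible. `IdleTracker` is a hypothetical name, and the 90-second timeout used in the usage note (three missed 30-second heartbeats) is an assumed value rather than one from the article.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class IdleTracker {
    // connectionId -> time the last heartbeat (or any message) was received
    private final Map<String, Long> lastSeenMs = new ConcurrentHashMap<>();
    private final long timeoutMs;

    IdleTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Records activity from a connection. */
    void touch(String connectionId, long nowMs) {
        lastSeenMs.put(connectionId, nowMs);
    }

    /** Removes and returns every connection that has been silent longer than the timeout. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

A server would call `touch` whenever a frame arrives, run `expire(System.currentTimeMillis())` on a timer with something like `new IdleTracker(90_000)`, and close the channels whose IDs come back.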
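7.3 Message Rate Limiting
7.3.1 Per-User Limit
Limiting how often a user can send:
    public boolean checkRateLimit(String userId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "rate:limit:" + userId;
            long count = jedis.incr(key);
            // Start the 60-second window on the first request; the TTL check repairs a
            // counter left without an expiry if a previous call failed between INCR and EXPIRE
            if (count == 1 || jedis.ttl(key) == -1) {
                jedis.expire(key, 60);
            }
            // At most 10 messages per minute
            return count <= 10;
        }
    }
7.3.2 Per-Room Limit
Limiting the message rate of a room:
    public boolean checkRoomRateLimit(String roomId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "rate:room:" + roomId;
            long count = jedis.incr(key);
            if (count == 1 || jedis.ttl(key) == -1) {
                jedis.expire(key, 1);
            }
            // At most 1000 messages per second
            return count <= 1000;
        }
    }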
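Both limiters above are fixed-window counters: the first request opens a window, every request increments the counter, and the counter disappears when the window expires. The same behavior can be sketched in memory, which makes it easy to test with an explicit clock. `FixedWindowLimiter` is an illustrative name; the Redis version remains the one to use when several servers must share the limit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FixedWindowLimiter {
    private static final class Window {
        final long startMs;
        int count;

        Window(long startMs) {
            this.startMs = startMs;
        }
    }

    private final Map<String, Window> windows = new ConcurrentHashMap<>();
    private final int limit;
    private final long windowMs;

    FixedWindowLimiter(int limit, long windowMs) {
        this.limit = limit;
        this.windowMs = windowMs;
    }

    /** Returns true if this request fits within the limit for the key's current window. */
    boolean tryAcquire(String key, long nowMs) {
        boolean[] allowed = new boolean[1];
        windows.compute(key, (k, current) -> {
            // Open a new window on the first request or once the old window has elapsed
            Window w = (current == null || nowMs - current.startMs >= windowMs)
                    ? new Window(nowMs)
                    : current;
            w.count++;                       // same role as INCR
            allowed[0] = w.count <= limit;   // same check as count <= 10
            return w;
        });
        return allowed[0];
    }
}
```

One property to keep in mind is that a fixed window permits short bursts across a boundary: a user can send the full quota at the end of one window and again at the start of the next. If that matters, a sliding window or token bucket is the usual replacement.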
8. Front-End Optimization
8.1 Page Stability
8.1.1 Reconnection
Automatic reconnection:
    class WebSocketClient {
        constructor(url) {
            this.url = url;
            this.ws = null;
            this.reconnectInterval = 1000;      // first retry delay in ms
            this.maxReconnectInterval = 30000;  // upper bound for the retry delay
            this.reconnectAttempts = 0;
            this.connect();
        }

        connect() {
            try {
                this.ws = new WebSocket(this.url);

                this.ws.onopen = () => {
                    console.log('WebSocket connected');
                    // Reset the backoff after a successful connection
                    this.reconnectAttempts = 0;
                    this.reconnectInterval = 1000;
                };

                this.ws.onclose = () => {
                    console.log('WebSocket closed, preparing to reconnect');
                    this.reconnect();
                };

                this.ws.onerror = (error) => {
                    console.error('WebSocket error:', error);
                };

                this.ws.onmessage = (event) => {
                    this.handleMessage(event.data);
                };
            } catch (error) {
                console.error('Connection failed:', error);
                this.reconnect();
            }
        }

        reconnect() {
            // Exponential backoff: 1s, 2s, 4s ... capped at 30s, at most 10 attempts
            if (this.reconnectAttempts < 10) {
                setTimeout(() => {
                    this.reconnectAttempts++;
                    this.reconnectInterval = Math.min(
                        this.reconnectInterval * 2,
                        this.maxReconnectInterval
                    );
                    console.log(`Reconnect attempt ${this.reconnectAttempts}...`);
                    this.connect();
                }, this.reconnectInterval);
            }
        }

        handleMessage(data) {
            let messages;
            try {
                messages = JSON.parse(data);
            } catch (error) {
                console.error('Invalid message:', error);
                return;
            }
            // The server may send a single comment or a batch (JSON array)
            if (Array.isArray(messages)) {
                messages.forEach(msg => this.renderComment(msg));
            } else {
                this.renderComment(messages);
            }
        }

        renderComment(comment) {
            const comments = document.getElementById('comments');
            const commentElement = document.createElement('div');
            commentElement.className = 'comment';

            // Use textContent rather than innerHTML so user-supplied text cannot inject markup
            const user = document.createElement('span');
            user.className = 'user';
            user.textContent = comment.n;
            const content = document.createElement('span');
            content.className = 'content';
            content.textContent = comment.c;
            commentElement.append(user, ' ', content);
            comments.appendChild(commentElement);

            // Cap the number of comment nodes to keep the DOM small
            if (comments.children.length > 1000) {
                comments.removeChild(comments.firstElementChild);
            }

            // Scroll to the newest comment
            comments.scrollTop = comments.scrollHeight;
        }
    }
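The retry delays produced by the reconnect logic follow a simple doubling schedule. At this scale it is also worth noting that when a server restarts, every affected client follows the same schedule and reconnects in synchronized waves. Adding random jitter spreads those attempts out. Jitter is an addition that the article's client does not include. The sketch below is written in Java to match the server-side examples, and the same arithmetic ports directly to the JavaScript client; `ReconnectBackoff` and its methods are illustrative names.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ReconnectBackoff {
    /** Delay before retry number `attempt` (0-based): initialMs doubled each time, capped at maxMs. */
    static long baseDelayMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay = Math.min(delay * 2, maxMs);
        }
        return delay;
    }

    /** Same schedule with jitter: a random delay between half the base delay and the base delay. */
    static long jitteredDelayMs(int attempt, long initialMs, long maxMs) {
        long base = baseDelayMs(attempt, initialMs, maxMs);
        return base / 2 + ThreadLocalRandom.current().nextLong(base / 2 + 1);
    }

    public static void main(String[] args) {
        // Same parameters as the client: 1s initial delay, 30s cap
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println(baseDelayMs(attempt, 1000, 30000));
        }
        // prints 1000, 2000, 4000, 8000, 16000, 30000, 30000 (one per line)
    }
}
```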
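8.1.2 Message Queue
Front-end message queue:
    class MessageQueue {
        // render: callback that draws one comment, for example msg => client.renderComment(msg)
        // maxPerFrame: how many comments to draw per animation frame (50 is an arbitrary default)
        constructor(render, maxPerFrame = 50) {
            this.queue = [];
            this.processing = false;
            this.render = render;
            this.maxPerFrame = maxPerFrame;
        }

        add(message) {
            this.queue.push(message);
            if (!this.processing) {
                this.processing = true;
                requestAnimationFrame(() => this.process());
            }
        }

        process() {
            // Draw a bounded number of comments per frame so a burst cannot block the main thread
            const batch = this.queue.splice(0, this.maxPerFrame);
            batch.forEach(message => this.render(message));
            if (this.queue.length > 0) {
                requestAnimationFrame(() => this.process());
            } else {
                this.processing = false;
            }
        }
    }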
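8.2 Message Rendering Optimization
8.2.1 Virtual Scrolling
Virtual scrolling implementation:
    class VirtualScroll {
        // container: scrollable element with a fixed height and overflow-y: auto
        constructor(container, itemHeight, visibleCount) {
            this.container = container;
            this.itemHeight = itemHeight;
            this.visibleCount = visibleCount;
            this.items = [];
            this.startIndex = 0;
            this.endIndex = visibleCount;

            // A spacer as tall as the full list keeps the scrollbar accurate
            this.spacer = document.createElement('div');
            this.spacer.style.position = 'relative';
            this.container.appendChild(this.spacer);

            // Redraw the visible window while the user scrolls
            this.container.addEventListener('scroll', () => this.update());
        }

        addItem(item) {
            this.items.push(item);
            this.update();
        }

        update() {
            const scrollTop = this.container.scrollTop;
            this.startIndex = Math.floor(scrollTop / this.itemHeight);
            this.endIndex = Math.min(
                this.startIndex + this.visibleCount,
                this.items.length
            );
            this.render();
        }

        render() {
            const fragment = document.createDocumentFragment();
            for (let i = this.startIndex; i < this.endIndex; i++) {
                const element = this.createItemElement(this.items[i]);
                // Position each visible item where it would sit in the full list
                element.style.position = 'absolute';
                element.style.top = `${i * this.itemHeight}px`;
                element.style.height = `${this.itemHeight}px`;
                fragment.appendChild(element);
            }
            this.spacer.style.height = `${this.items.length * this.itemHeight}px`;
            this.spacer.innerHTML = '';
            this.spacer.appendChild(fragment);
        }

        // Builds the DOM node for one comment in the compact format (n = name, c = content)
        createItemElement(item) {
            const element = document.createElement('div');
            element.className = 'comment';
            element.textContent = `${item.n}: ${item.c}`;
            return element;
        }
    }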
8.2.2 Debounce and Throttle
Debounce:
    function debounce(func, wait) {
        let timeout;
        return function(...args) {
            // Restart the timer on every call; func runs once calls stop for `wait` ms
            clearTimeout(timeout);
            timeout = setTimeout(() => func.apply(this, args), wait);
        };
    }

    // Usage: only the arguments of the last call are rendered, so pass the full
    // list of pending comments each time rather than one new batch per call
    const renderComments = debounce(function(comments) {
        comments.forEach(comment => renderComment(comment));
    }, 100);
Throttle:
    function throttle(func, limit) {
        let inThrottle;
        return function(...args) {
            // Run at most once per `limit` ms; calls in between are dropped
            if (!inThrottle) {
                func.apply(this, args);
                inThrottle = true;
                setTimeout(() => inThrottle = false, limit);
            }
        };
    }

    // Usage
    const handleScroll = throttle(function() {
        // Handle the scroll event
    }, 100);
8.3 Performance Monitoring
8.3.1 Front-End Monitoring
Metrics:
    class PerformanceMonitor {
        constructor() {
            this.metrics = {
                messageCount: 0,
                renderTime: 0,
                errorCount: 0
            };
        }

        recordMessage() {
            this.metrics.messageCount++;
        }

        recordRenderTime(time) {
            this.metrics.renderTime += time;
        }

        recordError() {
            this.metrics.errorCount++;
        }

        getReport() {
            const count = this.metrics.messageCount;
            return {
                ...this.metrics,
                // Avoid dividing by zero before any message has been recorded
                avgRenderTime: count > 0 ? this.metrics.renderTime / count : 0
            };
        }

        sendReport() {
            // Send the report to the server
            fetch('/api/metrics', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify(this.getReport())
            });
        }
    }
9. Monitoring and Alerting
9.1 Performance Monitoring
9.1.1 Connection Count
Monitoring the number of connections:
    @Component
    public class ConnectionMonitor {

        // MetricsService is an assumed name for the application's metrics client
        @Autowired
        private MetricsService metricsService;

        private final AtomicLong connectionCount = new AtomicLong(0);

        // Call when a connection is opened
        public void increment() {
            connectionCount.incrementAndGet();
        }

        // Call when a connection is closed
        public void decrement() {
            connectionCount.decrementAndGet();
        }

        public long getConnectionCount() {
            return connectionCount.get();
        }

        // Report to the monitoring system every 5 seconds
        @Scheduled(fixedRate = 5000)
        public void report() {
            long count = getConnectionCount();
            metricsService.record("websocket.connections", count);
        }
    }
9.1.2 Message Rate
Monitoring the message rate:
    @Component
    public class MessageRateMonitor {

        // MetricsService is an assumed name for the application's metrics client
        @Autowired
        private MetricsService metricsService;

        private final AtomicLong messageCount = new AtomicLong(0);

        public void increment() {
            messageCount.incrementAndGet();
        }

        // Runs once per second, so the counter read here is messages per second
        @Scheduled(fixedRate = 1000)
        public void report() {
            long count = messageCount.getAndSet(0);
            metricsService.record("websocket.messages.rate", count);
        }
    }
9.2 Alerting
9.2.1 Alert Rules
Alert configuration:
    alerts:
      - name: high_connection_count
        condition: websocket.connections > 100000
        severity: warning

      - name: high_message_rate
        condition: websocket.messages.rate > 10000
        severity: warning

      - name: service_down
        condition: service.health == "DOWN"
        severity: critical
9.2.2 Alert Notification
Sending an alert:
    @Service
    public class AlertService {

        // emailService, smsService and dingtalkService are application-specific notification clients

        public void sendAlert(String alertName, String message) {
            // Email
            emailService.send("admin@example.com", alertName, message);
            // SMS
            smsService.send("13800138000", message);
            // DingTalk
            dingtalkService.send(alertName + ": " + message);
        }
    }
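10. Summary
10.1 Key Points
- WebSocket basics: protocol principles, connection establishment, message format
- System architecture: load balancing, service cluster, data storage
- High availability: failover, health checks, monitoring and alerting
- Performance optimization: bandwidth optimization, connection management, message rate limiting
- Front-end optimization: page stability, message rendering, performance monitoring
10.2 Architect's Recommendations
Server planning:
- 10-20 WebSocket servers
- 6-node Redis cluster
- 3-node Kafka cluster
- 2 MySQL servers (primary/replica)
Performance optimization:
Stability assurance:
10.3 Best Practices
- Standardization: a unified message format and error handling
- Monitoring: real-time monitoring and alerting
- Documentation: maintain architecture and API documentation
- Testing: load testing and failure drills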