WebSocket百万级别直播评论系统实战

1. 概述

1.1 直播评论系统挑战

直播评论系统是实时性要求极高的应用场景,需要支持:

  • 百万级并发:同时在线用户百万+
  • 实时性:评论秒级同步到所有用户
  • 高可用:7×24小时稳定运行
  • 数据持久化:评论数据可靠存储
  • 带宽优化:减少带宽消耗
  • 页面稳定:前端流畅不卡顿

1.2 WebSocket技术优势

WebSocket是HTML5提供的全双工通信协议,相比HTTP轮询具有以下优势:

  • 低延迟:建立连接后实时通信
  • 低开销:无需重复建立连接
  • 全双工:客户端和服务器可以同时发送数据
  • 协议升级:基于HTTP协议升级

1.3 本文内容结构

本文将从以下几个方面全面解析WebSocket百万级直播评论系统:

  1. WebSocket基础:协议原理、连接建立、消息格式
  2. 系统架构设计:整体架构、组件选型
  3. 高可用实现:负载均衡、服务集群、故障转移
  4. 数据存储:MySQL、Redis、消息队列
  5. 实时同步:消息广播、房间管理
  6. 性能优化:带宽优化、连接管理、消息压缩
  7. 前端优化:页面稳定性、消息渲染、防抖节流
  8. 监控告警:性能监控、告警机制

2. WebSocket基础

2.1 WebSocket协议

2.1.1 协议特点

WebSocket协议

  • 基于TCP协议
  • 全双工通信
  • 低延迟、低开销
  • 支持二进制和文本消息

2.1.2 连接建立

握手过程

1
2
3
客户端 → 服务器:HTTP Upgrade请求
服务器 → 客户端:HTTP 101 Switching Protocols
连接建立,开始WebSocket通信

客户端请求

1
2
3
4
5
6
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务器响应

1
2
3
4
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

2.1.3 消息格式

WebSocket帧格式

  • FIN:是否最后一帧
  • Opcode:操作码(文本、二进制、关闭等)
  • Mask:是否掩码
  • Payload Length:数据长度
  • Payload Data:实际数据

2.2 WebSocket API

2.2.1 客户端API

JavaScript API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建WebSocket连接
const ws = new WebSocket('ws://localhost:8080/chat');

// 连接打开
ws.onopen = function(event) {
console.log('WebSocket连接已建立');
};

// 接收消息
ws.onmessage = function(event) {
console.log('收到消息:', event.data);
};

// 连接关闭
ws.onclose = function(event) {
console.log('WebSocket连接已关闭');
};

// 连接错误
ws.onerror = function(error) {
console.error('WebSocket错误:', error);
};

// 发送消息
ws.send('Hello Server');

// 关闭连接
ws.close();

2.2.2 服务器端API

Java Spring WebSocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ChatWebSocketHandler(), "/chat")
.setAllowedOrigins("*");
}
}

@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {

@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 连接建立
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 处理消息
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
// 连接关闭
}
}

2.3 WebSocket vs HTTP轮询

2.3.1 性能对比

特性 WebSocket HTTP轮询
延迟 毫秒级 秒级
开销 低(连接复用) 高(频繁请求)
服务器压力
带宽消耗
实时性

2.3.2 适用场景

WebSocket适用

  • 实时聊天
  • 直播评论
  • 在线游戏
  • 实时数据推送

HTTP轮询适用

  • 低频更新
  • 简单场景
  • 兼容性要求高

3. 系统架构设计

3.1 整体架构

3.1.1 架构图

1
2
3
4
5
6
7
8
9
10
11
百万用户

Nginx(负载均衡 + WebSocket代理)

WebSocket服务器集群(10-20台)

├──→ Redis(房间管理、消息缓存)
├──→ Kafka(消息队列、异步处理)
└──→ MySQL(数据持久化)

监控告警系统

3.1.2 架构说明

接入层

  • Nginx负载均衡
  • WebSocket代理
  • SSL终端

服务层

  • WebSocket服务器集群
  • 房间管理服务
  • 消息处理服务

数据层

  • Redis:房间信息、在线用户、消息缓存
  • Kafka:消息队列、异步处理
  • MySQL:评论数据持久化

监控层

  • 性能监控
  • 告警系统

3.2 组件选型

3.2.1 WebSocket服务器

选型

  • Spring WebSocket:Java生态,易于集成
  • Netty:高性能,适合高并发
  • Node.js + Socket.io:轻量级,开发快速

推荐:Netty(百万级并发)

3.2.2 消息中间件

选型

  • Redis Pub/Sub:轻量级,低延迟
  • Kafka:高吞吐,持久化
  • RabbitMQ:功能丰富,可靠性高

推荐:Redis Pub/Sub(实时)+ Kafka(持久化)

3.2.3 数据存储

选型

  • MySQL:评论数据持久化
  • Redis:热点数据缓存、房间管理
  • MongoDB:可选,适合非结构化数据

推荐:MySQL + Redis

3.3 服务器规划

3.3.1 服务器配置

最低成本配置(支持百万级并发):

服务器 配置 数量 用途
Nginx 4核8G 2台(主备) 负载均衡
WebSocket服务器 8核16G 10-20台 WebSocket服务
Redis集群 4核16G 6台(3主3从) 缓存、Pub/Sub
Kafka集群 4核16G 3台 消息队列
MySQL 8核32G 2台(主从) 数据持久化
监控服务器 4核8G 1台 监控告警

总成本:约25-30台服务器

3.3.2 性能估算

单台WebSocket服务器

  • 8核16G服务器
  • 预计支持5-10万并发连接
  • 10-20台服务器可支持100-200万并发

带宽估算

  • 每条评论约100字节
  • 100万用户,每秒1000条评论
  • 下行带宽:1000 × 100 × 100万 = 100GB/s(需要CDN)

4. 高可用实现

4.1 Nginx负载均衡

4.1.1 WebSocket代理配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# /etc/nginx/conf.d/websocket.conf

# WebSocket服务器upstream
upstream websocket_backend {
least_conn; # 最少连接
server 192.168.1.101:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.102:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.103:8080 weight=1 max_fails=3 fail_timeout=30s;
# ... 更多服务器
}

server {
listen 80;
server_name chat.example.com;

# WebSocket代理
location /chat {
proxy_pass http://websocket_backend;

# WebSocket必需配置
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

# 其他代理配置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

# 超时配置(WebSocket长连接)
proxy_connect_timeout 7d;
proxy_send_timeout 7d;
proxy_read_timeout 7d;
}

# 健康检查
location /health {
access_log off;
return 200 "healthy\n";
}
}

4.1.2 SSL配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server {
listen 443 ssl;
server_name chat.example.com;

# SSL证书
ssl_certificate /etc/nginx/ssl/chat.example.com.crt;
ssl_certificate_key /etc/nginx/ssl/chat.example.com.key;

# SSL优化
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;

# WebSocket代理(WSS)
location /chat {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# ... 其他配置
}
}

4.2 WebSocket服务器集群

4.2.1 Netty WebSocket服务器

Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>

Netty服务器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {

private static final int PORT = 8080;

public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();

// HTTP编解码器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(65536));

// WebSocket协议处理器
pipeline.addLast(new WebSocketServerProtocolHandler("/chat"));

// 自定义处理器
pipeline.addLast(new ChatWebSocketHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("WebSocket服务器启动,端口: " + PORT);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new WebSocketServer().start();
}
}

4.2.2 WebSocket处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.example.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class ChatWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

private static JedisPool jedisPool;

static {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(20);
config.setMinIdle(5);
jedisPool = new JedisPool(config, "192.168.1.201", 6379);
}

private String roomId;
private String userId;

@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接建立
System.out.println("新连接: " + ctx.channel().remoteAddress());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof TextWebSocketFrame) {
String message = ((TextWebSocketFrame) frame).text();
handleMessage(ctx, message);
}
}

private void handleMessage(ChannelHandlerContext ctx, String message) {
try {
// 解析消息
JSONObject json = JSON.parseObject(message);
String type = json.getString("type");

if ("join".equals(type)) {
// 加入房间
roomId = json.getString("roomId");
userId = json.getString("userId");
joinRoom(ctx, roomId, userId);
} else if ("comment".equals(type)) {
// 发送评论
String content = json.getString("content");
sendComment(roomId, userId, content);
} else if ("heartbeat".equals(type)) {
// 心跳
sendHeartbeat(ctx);
}
} catch (Exception e) {
e.printStackTrace();
}
}

private void joinRoom(ChannelHandlerContext ctx, String roomId, String userId) {
try (Jedis jedis = jedisPool.getResource()) {
// 存储连接信息
String channelKey = "room:" + roomId + ":channels";
jedis.sadd(channelKey, ctx.channel().id().asShortText());

// 订阅房间消息
RedisSubscriber subscriber = new RedisSubscriber(ctx);
new Thread(() -> {
try (Jedis subJedis = jedisPool.getResource()) {
subJedis.subscribe(subscriber, "room:" + roomId);
}
}).start();
}
}

private void sendComment(String roomId, String userId, String content) {
// 构建评论消息
JSONObject comment = new JSONObject();
comment.put("type", "comment");
comment.put("roomId", roomId);
comment.put("userId", userId);
comment.put("content", content);
comment.put("timestamp", System.currentTimeMillis());

// 发布到Redis
try (Jedis jedis = jedisPool.getResource()) {
jedis.publish("room:" + roomId, comment.toJSONString());
}

// 异步保存到数据库
saveCommentAsync(roomId, userId, content);
}

private void saveCommentAsync(String roomId, String userId, String content) {
// 发送到Kafka异步处理
// ... Kafka Producer代码
}

private void sendHeartbeat(ChannelHandlerContext ctx) {
JSONObject response = new JSONObject();
response.put("type", "heartbeat");
response.put("timestamp", System.currentTimeMillis());
ctx.writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 连接关闭
if (roomId != null) {
try (Jedis jedis = jedisPool.getResource()) {
String channelKey = "room:" + roomId + ":channels";
jedis.srem(channelKey, ctx.channel().id().asShortText());
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

4.3 Redis集群

4.3.1 Redis集群配置

Redis集群(3主3从)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 节点1(主)
redis-server --port 7001 --cluster-enabled yes --cluster-config-file nodes-7001.conf

# 节点2(主)
redis-server --port 7002 --cluster-enabled yes --cluster-config-file nodes-7002.conf

# 节点3(主)
redis-server --port 7003 --cluster-enabled yes --cluster-config-file nodes-7003.conf

# 节点4(从)
redis-server --port 7004 --cluster-enabled yes --cluster-config-file nodes-7004.conf

# 节点5(从)
redis-server --port 7005 --cluster-enabled yes --cluster-config-file nodes-7005.conf

# 节点6(从)
redis-server --port 7006 --cluster-enabled yes --cluster-config-file nodes-7006.conf

创建集群

1
2
3
4
redis-cli --cluster create \
192.168.1.201:7001 192.168.1.202:7002 192.168.1.203:7003 \
192.168.1.201:7004 192.168.1.202:7005 192.168.1.203:7006 \
--cluster-replicas 1

4.3.2 Redis Pub/Sub

发布消息

1
2
3
try (Jedis jedis = jedisPool.getResource()) {
jedis.publish("room:" + roomId, message);
}

订阅消息

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RedisSubscriber extends JedisPubSub {
private ChannelHandlerContext ctx;

public RedisSubscriber(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void onMessage(String channel, String message) {
// 转发消息到WebSocket客户端
ctx.writeAndFlush(new TextWebSocketFrame(message));
}
}

4.4 故障转移

4.4.1 Nginx主备

Keepalived配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# /etc/keepalived/keepalived.conf
vrrp_script chk_nginx {
script "/etc/keepalived/check_nginx.sh"
interval 2
weight -5
}

vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 51
priority 100
virtual_ipaddress {
192.168.1.100
}
track_script {
chk_nginx
}
}

4.4.2 服务健康检查

健康检查接口

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
public class HealthController {

@GetMapping("/health")
public Map<String, Object> health() {
Map<String, Object> result = new HashMap<>();
result.put("status", "UP");
result.put("connections", getActiveConnections());
result.put("timestamp", System.currentTimeMillis());
return result;
}
}

5. 数据存储

5.1 MySQL数据持久化

5.1.1 数据库设计

评论表

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `live_comment` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '评论ID',
`room_id` VARCHAR(50) NOT NULL COMMENT '房间ID',
`user_id` VARCHAR(50) NOT NULL COMMENT '用户ID',
`user_name` VARCHAR(100) NOT NULL COMMENT '用户名',
`content` TEXT NOT NULL COMMENT '评论内容',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_room_id` (`room_id`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播评论表';

5.1.2 异步写入

Kafka Consumer写入数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class CommentConsumer {

@KafkaListener(topics = "live-comment", groupId = "comment-processor")
public void processComment(String message) {
try {
JSONObject comment = JSON.parseObject(message);

// 批量写入数据库
commentService.saveComment(
comment.getString("roomId"),
comment.getString("userId"),
comment.getString("userName"),
comment.getString("content")
);
} catch (Exception e) {
// 记录错误日志
log.error("处理评论失败: " + message, e);
}
}
}

批量插入优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class CommentService {

private List<Comment> commentBuffer = new ArrayList<>();
private final int BATCH_SIZE = 100;

@Scheduled(fixedDelay = 1000) // 每秒批量插入
public void batchInsert() {
if (commentBuffer.size() >= BATCH_SIZE) {
commentMapper.batchInsert(commentBuffer);
commentBuffer.clear();
}
}

public void saveComment(String roomId, String userId, String userName, String content) {
Comment comment = new Comment();
comment.setRoomId(roomId);
comment.setUserId(userId);
comment.setUserName(userName);
comment.setContent(content);
commentBuffer.add(comment);
}
}

5.2 Redis缓存

5.2.1 热点评论缓存

缓存最新评论

1
2
3
4
5
6
7
8
9
10
public void cacheLatestComments(String roomId, Comment comment) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "room:" + roomId + ":comments";

// 添加到列表(最多保留1000条)
jedis.lpush(key, JSON.toJSONString(comment));
jedis.ltrim(key, 0, 999);
jedis.expire(key, 3600); // 1小时过期
}
}

获取最新评论

1
2
3
4
5
6
7
8
9
10
public List<Comment> getLatestComments(String roomId, int count) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "room:" + roomId + ":comments";
List<String> comments = jedis.lrange(key, 0, count - 1);

return comments.stream()
.map(c -> JSON.parseObject(c, Comment.class))
.collect(Collectors.toList());
}
}

5.2.2 在线用户统计

统计在线用户数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void addOnlineUser(String roomId, String userId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "room:" + roomId + ":users";
jedis.sadd(key, userId);
jedis.expire(key, 3600);
}
}

public long getOnlineUserCount(String roomId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "room:" + roomId + ":users";
return jedis.scard(key);
}
}

5.3 Kafka消息队列

5.3.1 消息发送

发送评论到Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class CommentProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendComment(String roomId, String userId, String userName, String content) {
JSONObject comment = new JSONObject();
comment.put("roomId", roomId);
comment.put("userId", userId);
comment.put("userName", userName);
comment.put("content", content);
comment.put("timestamp", System.currentTimeMillis());

kafkaTemplate.send("live-comment", comment.toJSONString());
}
}

5.3.2 消息消费

消费评论消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class CommentConsumer {

@Autowired
private CommentService commentService;

@KafkaListener(topics = "live-comment", groupId = "comment-processor")
public void consume(String message) {
try {
JSONObject comment = JSON.parseObject(message);
commentService.saveComment(
comment.getString("roomId"),
comment.getString("userId"),
comment.getString("userName"),
comment.getString("content")
);
} catch (Exception e) {
log.error("消费评论失败: " + message, e);
}
}
}

6. 实时同步

6.1 消息广播

6.1.1 房间内广播

Redis Pub/Sub广播

1
2
3
4
5
6
7
public void broadcastComment(String roomId, Comment comment) {
try (Jedis jedis = jedisPool.getResource()) {
String channel = "room:" + roomId;
String message = JSON.toJSONString(comment);
jedis.publish(channel, message);
}
}

6.1.2 多房间广播

全站广播

1
2
3
4
5
6
public void broadcastToAllRooms(Comment comment) {
try (Jedis jedis = jedisPool.getResource()) {
String message = JSON.toJSONString(comment);
jedis.publish("broadcast:all", message);
}
}

6.2 房间管理

6.2.1 房间信息

房间数据结构

1
2
3
4
5
6
public class Room {
private String roomId;
private String roomName;
private long onlineCount;
private List<Comment> latestComments;
}

房间信息缓存

1
2
3
4
5
6
public void cacheRoomInfo(Room room) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "room:info:" + room.getRoomId();
jedis.setex(key, 3600, JSON.toJSONString(room));
}
}

6.2.2 房间列表

获取房间列表

1
2
3
4
5
6
7
8
9
10
11
public List<Room> getRoomList() {
try (Jedis jedis = jedisPool.getResource()) {
Set<String> roomKeys = jedis.keys("room:info:*");
return roomKeys.stream()
.map(key -> {
String json = jedis.get(key);
return JSON.parseObject(json, Room.class);
})
.collect(Collectors.toList());
}
}

6.3 消息去重

6.3.1 消息ID

生成消息ID

1
2
3
public String generateMessageId(String roomId, String userId, long timestamp) {
return roomId + ":" + userId + ":" + timestamp;
}

消息去重

1
2
3
4
5
6
7
8
9
10
public boolean isDuplicate(String messageId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "message:id:" + messageId;
boolean exists = jedis.exists(key);
if (!exists) {
jedis.setex(key, 60, "1"); // 60秒过期
}
return exists;
}
}

7. 性能优化

7.1 带宽优化

7.1.1 消息压缩

Gzip压缩

1
2
3
4
5
6
7
8
9
10
11
public String compressMessage(String message) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzos = new GZIPOutputStream(baos);
gzos.write(message.getBytes("UTF-8"));
gzos.close();
return Base64.getEncoder().encodeToString(baos.toByteArray());
} catch (Exception e) {
return message;
}
}

7.1.2 消息精简

精简消息格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 完整格式
{
"type": "comment",
"roomId": "room-001",
"userId": "user-001",
"userName": "张三",
"content": "评论内容",
"timestamp": 1234567890
}

// 精简格式
{
"t": "c",
"r": "room-001",
"u": "user-001",
"n": "张三",
"c": "评论内容",
"ts": 1234567890
}

7.1.3 批量发送

批量发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private List<String> messageBuffer = new ArrayList<>();
private final int BATCH_SIZE = 10;

public void sendMessage(String message) {
messageBuffer.add(message);
if (messageBuffer.size() >= BATCH_SIZE) {
sendBatch();
}
}

private void sendBatch() {
JSONArray batch = new JSONArray();
batch.addAll(messageBuffer);
String batchMessage = batch.toJSONString();
// 发送批量消息
ctx.writeAndFlush(new TextWebSocketFrame(batchMessage));
messageBuffer.clear();
}

7.2 连接管理

7.2.1 连接池

连接池配置

1
EventLoopGroup workerGroup = new NioEventLoopGroup(16);  // 16个工作线程

7.2.2 心跳机制

客户端心跳

1
2
3
4
5
6
// 每30秒发送心跳
setInterval(function() {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({type: 'heartbeat'}));
}
}, 30000);

服务器心跳

1
2
3
4
5
6
7
@Scheduled(fixedRate = 30000)  // 每30秒
public void sendHeartbeat() {
// 向所有连接发送心跳
channelGroup.writeAndFlush(new TextWebSocketFrame(
JSON.toJSONString(Collections.singletonMap("type", "heartbeat"))
));
}

7.3 消息限流

7.3.1 用户限流

限制用户发送频率

1
2
3
4
5
6
7
8
9
10
public boolean checkRateLimit(String userId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "rate:limit:" + userId;
long count = jedis.incr(key);
if (count == 1) {
jedis.expire(key, 60); // 60秒窗口
}
return count <= 10; // 每分钟最多10条
}
}

7.3.2 房间限流

限制房间消息频率

1
2
3
4
5
6
7
8
9
10
public boolean checkRoomRateLimit(String roomId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "rate:room:" + roomId;
long count = jedis.incr(key);
if (count == 1) {
jedis.expire(key, 1); // 1秒窗口
}
return count <= 1000; // 每秒最多1000条
}
}

8. 前端优化

8.1 页面稳定性

8.1.1 连接重连

自动重连机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.reconnectInterval = 1000;
this.maxReconnectInterval = 30000;
this.reconnectAttempts = 0;
this.connect();
}

connect() {
try {
this.ws = new WebSocket(this.url);

this.ws.onopen = () => {
console.log('WebSocket连接成功');
this.reconnectAttempts = 0;
this.reconnectInterval = 1000;
};

this.ws.onclose = () => {
console.log('WebSocket连接关闭,准备重连');
this.reconnect();
};

this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
};

this.ws.onmessage = (event) => {
this.handleMessage(event.data);
};
} catch (error) {
console.error('连接失败:', error);
this.reconnect();
}
}

reconnect() {
if (this.reconnectAttempts < 10) {
setTimeout(() => {
this.reconnectAttempts++;
this.reconnectInterval = Math.min(
this.reconnectInterval * 2,
this.maxReconnectInterval
);
console.log(`重连尝试 ${this.reconnectAttempts}...`);
this.connect();
}, this.reconnectInterval);
}
}

handleMessage(data) {
// 处理消息
const messages = JSON.parse(data);
if (Array.isArray(messages)) {
messages.forEach(msg => this.renderComment(msg));
} else {
this.renderComment(messages);
}
}

renderComment(comment) {
// 渲染评论
const commentElement = document.createElement('div');
commentElement.className = 'comment';
commentElement.innerHTML = `
<span class="user">${comment.n}</span>
<span class="content">${comment.c}</span>
`;
document.getElementById('comments').appendChild(commentElement);

// 限制显示数量
const comments = document.getElementById('comments');
if (comments.children.length > 1000) {
comments.removeChild(comments.firstChild);
}

// 滚动到底部
comments.scrollTop = comments.scrollHeight;
}
}

8.1.2 消息队列

前端消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MessageQueue {
constructor() {
this.queue = [];
this.processing = false;
}

add(message) {
this.queue.push(message);
if (!this.processing) {
this.process();
}
}

process() {
this.processing = true;
while (this.queue.length > 0) {
const message = this.queue.shift();
this.renderComment(message);
}
this.processing = false;
}
}

8.2 消息渲染优化

8.2.1 虚拟滚动

虚拟滚动实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class VirtualScroll {
constructor(container, itemHeight, visibleCount) {
this.container = container;
this.itemHeight = itemHeight;
this.visibleCount = visibleCount;
this.items = [];
this.startIndex = 0;
this.endIndex = visibleCount;
}

addItem(item) {
this.items.push(item);
this.update();
}

update() {
const scrollTop = this.container.scrollTop;
this.startIndex = Math.floor(scrollTop / this.itemHeight);
this.endIndex = Math.min(
this.startIndex + this.visibleCount,
this.items.length
);
this.render();
}

render() {
const fragment = document.createDocumentFragment();
for (let i = this.startIndex; i < this.endIndex; i++) {
const item = this.items[i];
const element = this.createItemElement(item);
fragment.appendChild(element);
}
this.container.innerHTML = '';
this.container.appendChild(fragment);
}
}

8.2.2 防抖节流

防抖处理

1
2
3
4
5
6
7
8
9
10
11
12
function debounce(func, wait) {
let timeout;
return function(...args) {
clearTimeout(timeout);
timeout = setTimeout(() => func.apply(this, args), wait);
};
}

// 使用防抖
const renderComments = debounce(function(comments) {
comments.forEach(comment => renderComment(comment));
}, 100);

节流处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function throttle(func, limit) {
let inThrottle;
return function(...args) {
if (!inThrottle) {
func.apply(this, args);
inThrottle = true;
setTimeout(() => inThrottle = false, limit);
}
};
}

// 使用节流
const handleScroll = throttle(function() {
// 处理滚动
}, 100);

8.3 性能监控

8.3.1 前端监控

监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PerformanceMonitor {
constructor() {
this.metrics = {
messageCount: 0,
renderTime: 0,
errorCount: 0
};
}

recordMessage() {
this.metrics.messageCount++;
}

recordRenderTime(time) {
this.metrics.renderTime += time;
}

recordError() {
this.metrics.errorCount++;
}

getReport() {
return {
...this.metrics,
avgRenderTime: this.metrics.renderTime / this.metrics.messageCount
};
}

sendReport() {
// 发送监控数据到服务器
fetch('/api/metrics', {
method: 'POST',
body: JSON.stringify(this.getReport())
});
}
}

9. 监控告警

9.1 性能监控

9.1.1 连接数监控

监控连接数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class ConnectionMonitor {

private AtomicLong connectionCount = new AtomicLong(0);

public void increment() {
connectionCount.incrementAndGet();
}

public void decrement() {
connectionCount.decrementAndGet();
}

public long getConnectionCount() {
return connectionCount.get();
}

@Scheduled(fixedRate = 5000) // 每5秒
public void report() {
long count = getConnectionCount();
// 发送到监控系统
metricsService.record("websocket.connections", count);
}
}

9.1.2 消息速率监控

监控消息速率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class MessageRateMonitor {

private AtomicLong messageCount = new AtomicLong(0);

public void increment() {
messageCount.incrementAndGet();
}

@Scheduled(fixedRate = 1000) // 每秒
public void report() {
long count = messageCount.getAndSet(0);
metricsService.record("websocket.messages.rate", count);
}
}

9.2 告警机制

9.2.1 告警规则

告警配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# alert-rules.yml
alerts:
- name: high_connection_count
condition: websocket.connections > 100000
severity: warning

- name: high_message_rate
condition: websocket.messages.rate > 10000
severity: warning

- name: service_down
condition: service.health == "DOWN"
severity: critical

9.2.2 告警通知

发送告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class AlertService {

public void sendAlert(String alertName, String message) {
// 发送邮件
emailService.send("admin@example.com", alertName, message);

// 发送短信
smsService.send("13800138000", message);

// 发送到钉钉/企业微信
dingtalkService.send(alertName + ": " + message);
}
}

10. 总结

10.1 核心要点

  1. WebSocket基础:协议原理、连接建立、消息格式
  2. 系统架构:负载均衡、服务集群、数据存储
  3. 高可用:故障转移、健康检查、监控告警
  4. 性能优化:带宽优化、连接管理、消息限流
  5. 前端优化:页面稳定性、消息渲染、性能监控

10.2 架构师建议

  1. 服务器规划

    • 10-20台WebSocket服务器
    • 6台Redis集群
    • 3台Kafka集群
    • 2台MySQL主从
  2. 性能优化

    • 消息压缩
    • 批量发送
    • 连接池优化
    • 限流机制
  3. 稳定性保障

    • 自动重连
    • 心跳机制
    • 监控告警
    • 故障转移

10.3 最佳实践

  1. 标准化:统一消息格式、错误处理
  2. 监控化:实时监控、告警机制
  3. 文档化:维护架构文档、API文档
  4. 测试化:压力测试、故障演练

相关文章