1. Netty-WebSocket架构概述

Netty-WebSocket架构是基于Netty框架构建的高性能WebSocket通信系统,具有高并发、低延迟、可扩展等特点,广泛应用于实时通信、在线游戏、直播系统等场景。本文将详细介绍Netty WebSocket服务器、客户端实现、消息处理、连接管理和性能优化的完整解决方案。

1.1 核心功能

  1. WebSocket服务器: 高性能WebSocket服务器实现
  2. 连接管理: 客户端连接管理和状态跟踪
  3. 消息处理: 实时消息处理和路由
  4. 负载均衡: 多服务器负载均衡
  5. 性能优化: 连接池和内存优化

1.2 技术架构

1
2
3
4
5
客户端 → WebSocket连接 → Netty服务器 → 消息处理
↓ ↓ ↓ ↓
连接建立 → 握手协议 → 事件循环 → 业务处理
↓ ↓ ↓ ↓
消息发送 → 协议解析 → 消息路由 → 响应处理

2. Netty WebSocket配置

2.1 Netty WebSocket配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* Netty WebSocket配置类
*/
@Configuration
public class NettyWebSocketConfig {

@Value("${netty.websocket.port}")
private int port;

@Value("${netty.websocket.boss-threads}")
private int bossThreads;

@Value("${netty.websocket.worker-threads}")
private int workerThreads;

@Value("${netty.websocket.max-frame-size}")
private int maxFrameSize;

@Value("${netty.websocket.idle-timeout}")
private int idleTimeout;

/**
* Netty WebSocket配置属性
*/
@Bean
public NettyWebSocketProperties nettyWebSocketProperties() {
return NettyWebSocketProperties.builder()
.port(port)
.bossThreads(bossThreads)
.workerThreads(workerThreads)
.maxFrameSize(maxFrameSize)
.idleTimeout(idleTimeout)
.build();
}

/**
* Boss事件循环组
*/
@Bean
public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(bossThreads);
}

/**
* Worker事件循环组
*/
@Bean
public EventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerThreads);
}

/**
* Netty WebSocket服务器
*/
@Bean
public NettyWebSocketServer nettyWebSocketServer() {
return new NettyWebSocketServer(nettyWebSocketProperties(), bossGroup(), workerGroup());
}
}

/**
* Netty WebSocket配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NettyWebSocketProperties {
private int port;
private int bossThreads;
private int workerThreads;
private int maxFrameSize;
private int idleTimeout;

// 连接配置
private int maxConnections = 10000;
private int connectionTimeout = 30000;
private int readTimeout = 60000;
private int writeTimeout = 30000;

// 消息配置
private int maxMessageSize = 65536;
private int messageQueueSize = 1000;
private int batchSize = 100;

// 性能配置
private boolean tcpNodelay = true;
private boolean keepAlive = true;
private int soBacklog = 128;
}

2.2 应用配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# application.yml
netty:
websocket:
port: 8080
boss-threads: 1
worker-threads: 8
max-frame-size: 65536
idle-timeout: 300

# WebSocket配置
websocket:
server:
max-connections: 10000
connection-timeout: 30000
read-timeout: 60000
write-timeout: 30000
message:
max-size: 65536
queue-size: 1000
batch-size: 100

3. Netty WebSocket服务器

3.1 Netty WebSocket服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/**
* Netty WebSocket服务器
*/
@Component
public class NettyWebSocketServer {

private final NettyWebSocketProperties properties;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final WebSocketChannelManager channelManager;
private final WebSocketMessageHandler messageHandler;

private Channel serverChannel;

public NettyWebSocketServer(NettyWebSocketProperties properties,
EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
WebSocketChannelManager channelManager,
WebSocketMessageHandler messageHandler) {
this.properties = properties;
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.channelManager = channelManager;
this.messageHandler = messageHandler;
}

/**
* 启动服务器
*/
public void start() {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, properties.getSoBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, properties.isKeepAlive())
.childOption(ChannelOption.TCP_NODELAY, properties.isTcpNodelay())
.childHandler(new WebSocketChannelInitializer(channelManager, messageHandler, properties));

ChannelFuture future = bootstrap.bind(properties.getPort()).sync();
serverChannel = future.channel();

log.info("Netty WebSocket服务器启动成功: port={}", properties.getPort());

} catch (Exception e) {
log.error("Netty WebSocket服务器启动失败", e);
throw new RuntimeException("服务器启动失败", e);
}
}

/**
* 停止服务器
*/
public void stop() {
try {
if (serverChannel != null) {
serverChannel.close().sync();
}

bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

log.info("Netty WebSocket服务器停止成功");

} catch (Exception e) {
log.error("Netty WebSocket服务器停止失败", e);
}
}

/**
* 获取服务器状态
* @return 服务器状态
*/
public ServerStatus getServerStatus() {
return ServerStatus.builder()
.isRunning(serverChannel != null && serverChannel.isActive())
.port(properties.getPort())
.activeConnections(channelManager.getActiveConnectionCount())
.totalConnections(channelManager.getTotalConnectionCount())
.build();
}
}

/**
* WebSocket通道初始化器
*/
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

private final WebSocketChannelManager channelManager;
private final WebSocketMessageHandler messageHandler;
private final NettyWebSocketProperties properties;

public WebSocketChannelInitializer(WebSocketChannelManager channelManager,
WebSocketMessageHandler messageHandler,
NettyWebSocketProperties properties) {
this.channelManager = channelManager;
this.messageHandler = messageHandler;
this.properties = properties;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// HTTP编解码器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(properties.getMaxFrameSize()));

// WebSocket协议处理器
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));

// 自定义处理器
pipeline.addLast(new WebSocketFrameHandler(channelManager, messageHandler));

// 空闲检测
pipeline.addLast(new IdleStateHandler(properties.getIdleTimeout(), 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new WebSocketIdleHandler());
}
}

/**
* WebSocket帧处理器
*/
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

private final WebSocketChannelManager channelManager;
private final WebSocketMessageHandler messageHandler;

public WebSocketFrameHandler(WebSocketChannelManager channelManager,
WebSocketMessageHandler messageHandler) {
this.channelManager = channelManager;
this.messageHandler = messageHandler;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立
channelManager.addChannel(ctx.channel());
log.info("WebSocket连接建立: {}", ctx.channel().remoteAddress());

super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开
channelManager.removeChannel(ctx.channel());
log.info("WebSocket连接断开: {}", ctx.channel().remoteAddress());

super.channelInactive(ctx);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
try {
if (frame instanceof TextWebSocketFrame) {
// 处理文本帧
String text = ((TextWebSocketFrame) frame).text();
messageHandler.handleTextMessage(ctx.channel(), text);

} else if (frame instanceof BinaryWebSocketFrame) {
// 处理二进制帧
ByteBuf content = ((BinaryWebSocketFrame) frame).content();
messageHandler.handleBinaryMessage(ctx.channel(), content);

} else if (frame instanceof PingWebSocketFrame) {
// 处理Ping帧
ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));

} else if (frame instanceof CloseWebSocketFrame) {
// 处理关闭帧
ctx.close();
}

} catch (Exception e) {
log.error("处理WebSocket帧失败", e);
ctx.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket连接异常: {}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
}

/**
* WebSocket空闲处理器
*/
public class WebSocketIdleHandler extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.warn("WebSocket连接超时: {}", ctx.channel().remoteAddress());
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}

/**
* 服务器状态
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ServerStatus {
private boolean isRunning;
private int port;
private int activeConnections;
private long totalConnections;
}

4. WebSocket连接管理

4.1 WebSocket通道管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/**
* WebSocket通道管理器
*/
@Component
public class WebSocketChannelManager {

private final ConcurrentHashMap<String, Channel> channels = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, String> channelUsers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<String>> userChannels = new ConcurrentHashMap<>();

private final AtomicLong totalConnectionCount = new AtomicLong(0);
private final AtomicLong activeConnectionCount = new AtomicLong(0);

/**
* 添加通道
* @param channel 通道
*/
public void addChannel(Channel channel) {
String channelId = channel.id().asShortText();
channels.put(channelId, channel);

totalConnectionCount.incrementAndGet();
activeConnectionCount.incrementAndGet();

log.debug("添加WebSocket通道: channelId={}, activeCount={}",
channelId, activeConnectionCount.get());
}

/**
* 移除通道
* @param channel 通道
*/
public void removeChannel(Channel channel) {
String channelId = channel.id().asShortText();
Channel removedChannel = channels.remove(channelId);

if (removedChannel != null) {
activeConnectionCount.decrementAndGet();

// 移除用户关联
String userId = channelUsers.remove(channelId);
if (userId != null) {
Set<String> userChannelIds = userChannels.get(userId);
if (userChannelIds != null) {
userChannelIds.remove(channelId);
if (userChannelIds.isEmpty()) {
userChannels.remove(userId);
}
}
}

log.debug("移除WebSocket通道: channelId={}, activeCount={}",
channelId, activeConnectionCount.get());
}
}

/**
* 绑定用户到通道
* @param channel 通道
* @param userId 用户ID
*/
public void bindUserToChannel(Channel channel, String userId) {
String channelId = channel.id().asShortText();

// 绑定通道到用户
channelUsers.put(channelId, userId);

// 绑定用户到通道
userChannels.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(channelId);

log.info("绑定用户到通道: userId={}, channelId={}", userId, channelId);
}

/**
* 解绑用户通道
* @param channel 通道
*/
public void unbindUserFromChannel(Channel channel) {
String channelId = channel.id().asShortText();
String userId = channelUsers.remove(channelId);

if (userId != null) {
Set<String> userChannelIds = userChannels.get(userId);
if (userChannelIds != null) {
userChannelIds.remove(channelId);
if (userChannelIds.isEmpty()) {
userChannels.remove(userId);
}
}

log.info("解绑用户通道: userId={}, channelId={}", userId, channelId);
}
}

/**
* 获取通道
* @param channelId 通道ID
* @return 通道
*/
public Channel getChannel(String channelId) {
return channels.get(channelId);
}

/**
* 获取用户的所有通道
* @param userId 用户ID
* @return 通道列表
*/
public List<Channel> getUserChannels(String userId) {
Set<String> channelIds = userChannels.get(userId);
if (channelIds == null || channelIds.isEmpty()) {
return new ArrayList<>();
}

return channelIds.stream()
.map(channels::get)
.filter(Objects::nonNull)
.filter(Channel::isActive)
.collect(Collectors.toList());
}

/**
* 获取所有活跃通道
* @return 通道列表
*/
public List<Channel> getAllActiveChannels() {
return channels.values().stream()
.filter(Channel::isActive)
.collect(Collectors.toList());
}

/**
* 广播消息
* @param message 消息
*/
public void broadcastMessage(String message) {
List<Channel> activeChannels = getAllActiveChannels();

for (Channel channel : activeChannels) {
if (channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(new TextWebSocketFrame(message));
}
}

log.debug("广播消息: message={}, channelCount={}", message, activeChannels.size());
}

/**
* 向用户发送消息
* @param userId 用户ID
* @param message 消息
*/
public void sendMessageToUser(String userId, String message) {
List<Channel> userChannels = getUserChannels(userId);

for (Channel channel : userChannels) {
if (channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(new TextWebSocketFrame(message));
}
}

log.debug("向用户发送消息: userId={}, message={}, channelCount={}",
userId, message, userChannels.size());
}

/**
* 向通道发送消息
* @param channelId 通道ID
* @param message 消息
*/
public void sendMessageToChannel(String channelId, String message) {
Channel channel = getChannel(channelId);

if (channel != null && channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(new TextWebSocketFrame(message));
log.debug("向通道发送消息: channelId={}, message={}", channelId, message);
} else {
log.warn("通道不可用: channelId={}", channelId);
}
}

/**
* 获取活跃连接数
* @return 活跃连接数
*/
public int getActiveConnectionCount() {
return activeConnectionCount.get();
}

/**
* 获取总连接数
* @return 总连接数
*/
public long getTotalConnectionCount() {
return totalConnectionCount.get();
}

/**
* 获取在线用户数
* @return 在线用户数
*/
public int getOnlineUserCount() {
return userChannels.size();
}

/**
* 检查用户是否在线
* @param userId 用户ID
* @return 是否在线
*/
public boolean isUserOnline(String userId) {
Set<String> channelIds = userChannels.get(userId);
if (channelIds == null || channelIds.isEmpty()) {
return false;
}

// 检查是否有活跃通道
return channelIds.stream()
.map(channels::get)
.anyMatch(channel -> channel != null && channel.isActive());
}
}

5. WebSocket消息处理

5.1 WebSocket消息处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
/**
* WebSocket消息处理器
*/
@Component
public class WebSocketMessageHandler {

private final WebSocketChannelManager channelManager;
private final ObjectMapper objectMapper;

public WebSocketMessageHandler(WebSocketChannelManager channelManager, ObjectMapper objectMapper) {
this.channelManager = channelManager;
this.objectMapper = objectMapper;
}

/**
* 处理文本消息
* @param channel 通道
* @param text 文本消息
*/
public void handleTextMessage(Channel channel, String text) {
try {
// 解析消息
WebSocketMessage message = objectMapper.readValue(text, WebSocketMessage.class);

// 处理不同类型的消息
switch (message.getType()) {
case "AUTH":
handleAuthMessage(channel, message);
break;
case "CHAT":
handleChatMessage(channel, message);
break;
case "HEARTBEAT":
handleHeartbeatMessage(channel, message);
break;
case "JOIN_ROOM":
handleJoinRoomMessage(channel, message);
break;
case "LEAVE_ROOM":
handleLeaveRoomMessage(channel, message);
break;
default:
handleUnknownMessage(channel, message);
}

} catch (Exception e) {
log.error("处理文本消息失败: channel={}, text={}", channel.id().asShortText(), text, e);
sendErrorMessage(channel, "消息处理失败");
}
}

/**
* 处理二进制消息
* @param channel 通道
* @param content 二进制内容
*/
public void handleBinaryMessage(Channel channel, ByteBuf content) {
try {
// 处理二进制消息(如文件传输)
byte[] data = new byte[content.readableBytes()];
content.readBytes(data);

log.debug("收到二进制消息: channel={}, size={}", channel.id().asShortText(), data.length);

// 这里可以实现二进制消息处理逻辑

} catch (Exception e) {
log.error("处理二进制消息失败: channel={}", channel.id().asShortText(), e);
}
}

/**
* 处理认证消息
* @param channel 通道
* @param message 消息
*/
private void handleAuthMessage(Channel channel, WebSocketMessage message) {
try {
String userId = message.getData().get("userId");
String token = message.getData().get("token");

// 验证用户身份
if (validateUser(userId, token)) {
// 绑定用户到通道
channelManager.bindUserToChannel(channel, userId);

// 发送认证成功消息
WebSocketMessage response = WebSocketMessage.builder()
.type("AUTH_SUCCESS")
.data(Map.of("userId", userId))
.timestamp(System.currentTimeMillis())
.build();

sendMessage(channel, response);

log.info("用户认证成功: userId={}, channel={}", userId, channel.id().asShortText());
} else {
// 发送认证失败消息
WebSocketMessage response = WebSocketMessage.builder()
.type("AUTH_FAILED")
.data(Map.of("reason", "认证失败"))
.timestamp(System.currentTimeMillis())
.build();

sendMessage(channel, response);

log.warn("用户认证失败: userId={}, channel={}", userId, channel.id().asShortText());
}

} catch (Exception e) {
log.error("处理认证消息失败", e);
sendErrorMessage(channel, "认证处理失败");
}
}

/**
* 处理聊天消息
* @param channel 通道
* @param message 消息
*/
private void handleChatMessage(Channel channel, WebSocketMessage message) {
try {
String userId = message.getData().get("userId");
String content = message.getData().get("content");
String roomId = message.getData().get("roomId");

// 构建聊天消息
ChatMessage chatMessage = ChatMessage.builder()
.userId(userId)
.content(content)
.roomId(roomId)
.timestamp(System.currentTimeMillis())
.build();

// 广播到房间
if (roomId != null) {
broadcastToRoom(roomId, chatMessage);
} else {
// 广播到所有用户
channelManager.broadcastMessage(objectMapper.writeValueAsString(chatMessage));
}

log.debug("处理聊天消息: userId={}, roomId={}, content={}", userId, roomId, content);

} catch (Exception e) {
log.error("处理聊天消息失败", e);
sendErrorMessage(channel, "聊天消息处理失败");
}
}

/**
* 处理心跳消息
* @param channel 通道
* @param message 消息
*/
private void handleHeartbeatMessage(Channel channel, WebSocketMessage message) {
try {
// 发送心跳响应
WebSocketMessage response = WebSocketMessage.builder()
.type("HEARTBEAT_RESPONSE")
.data(Map.of("timestamp", System.currentTimeMillis()))
.timestamp(System.currentTimeMillis())
.build();

sendMessage(channel, response);

} catch (Exception e) {
log.error("处理心跳消息失败", e);
}
}

/**
* 处理加入房间消息
* @param channel 通道
* @param message 消息
*/
private void handleJoinRoomMessage(Channel channel, WebSocketMessage message) {
try {
String roomId = message.getData().get("roomId");
String userId = message.getData().get("userId");

// 加入房间逻辑
joinRoom(channel, roomId, userId);

log.info("用户加入房间: userId={}, roomId={}", userId, roomId);

} catch (Exception e) {
log.error("处理加入房间消息失败", e);
sendErrorMessage(channel, "加入房间失败");
}
}

/**
* 处理离开房间消息
* @param channel 通道
* @param message 消息
*/
private void handleLeaveRoomMessage(Channel channel, WebSocketMessage message) {
try {
String roomId = message.getData().get("roomId");
String userId = message.getData().get("userId");

// 离开房间逻辑
leaveRoom(channel, roomId, userId);

log.info("用户离开房间: userId={}, roomId={}", userId, roomId);

} catch (Exception e) {
log.error("处理离开房间消息失败", e);
sendErrorMessage(channel, "离开房间失败");
}
}

/**
* 处理未知消息
* @param channel 通道
* @param message 消息
*/
private void handleUnknownMessage(Channel channel, WebSocketMessage message) {
log.warn("收到未知消息类型: type={}, channel={}", message.getType(), channel.id().asShortText());
sendErrorMessage(channel, "未知消息类型");
}

/**
* 发送消息
* @param channel 通道
* @param message 消息
*/
private void sendMessage(Channel channel, WebSocketMessage message) {
try {
String json = objectMapper.writeValueAsString(message);
channel.writeAndFlush(new TextWebSocketFrame(json));
} catch (Exception e) {
log.error("发送消息失败", e);
}
}

/**
* 发送错误消息
* @param channel 通道
* @param error 错误信息
*/
private void sendErrorMessage(Channel channel, String error) {
WebSocketMessage message = WebSocketMessage.builder()
.type("ERROR")
.data(Map.of("message", error))
.timestamp(System.currentTimeMillis())
.build();

sendMessage(channel, message);
}

/**
* 验证用户
* @param userId 用户ID
* @param token 令牌
* @return 是否有效
*/
private boolean validateUser(String userId, String token) {
// 这里可以实现用户验证逻辑
return userId != null && token != null;
}

/**
* 加入房间
* @param channel 通道
* @param roomId 房间ID
* @param userId 用户ID
*/
private void joinRoom(Channel channel, String roomId, String userId) {
// 实现加入房间逻辑
}

/**
* 离开房间
* @param channel 通道
* @param roomId 房间ID
* @param userId 用户ID
*/
private void leaveRoom(Channel channel, String roomId, String userId) {
// 实现离开房间逻辑
}

/**
* 广播到房间
* @param roomId 房间ID
* @param message 消息
*/
private void broadcastToRoom(String roomId, Object message) {
try {
String json = objectMapper.writeValueAsString(message);
// 实现房间广播逻辑
log.debug("广播到房间: roomId={}, message={}", roomId, json);
} catch (Exception e) {
log.error("广播到房间失败", e);
}
}
}

/**
* WebSocket消息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketMessage {
private String type;
private Map<String, String> data;
private long timestamp;
}

/**
* 聊天消息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
private String userId;
private String content;
private String roomId;
private long timestamp;
}

6. WebSocket客户端

6.1 WebSocket客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/**
* WebSocket客户端
*/
@Component
public class WebSocketClient {

private final EventLoopGroup group;
private final Bootstrap bootstrap;
private Channel channel;

public WebSocketClient() {
this.group = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();

bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new WebSocketClientInitializer());
}

/**
* 连接到服务器
* @param host 主机
* @param port 端口
* @param path 路径
* @return 连接结果
*/
public boolean connect(String host, int port, String path) {
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
channel = future.channel();

// 发送WebSocket握手请求
sendWebSocketHandshake(path);

log.info("WebSocket客户端连接成功: {}:{}", host, port);
return true;

} catch (Exception e) {
log.error("WebSocket客户端连接失败: {}:{}", host, port, e);
return false;
}
}

/**
* 发送WebSocket握手请求
* @param path 路径
*/
private void sendWebSocketHandshake(String path) {
String key = generateWebSocketKey();

FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
request.headers().set(HttpHeaderNames.HOST, "localhost");
request.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
request.headers().set(HttpHeaderNames.SEC_WEBSOCKET_KEY, key);
request.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "13");

channel.writeAndFlush(request);
}

/**
* 生成WebSocket密钥
* @return 密钥
*/
private String generateWebSocketKey() {
byte[] bytes = new byte[16];
new Random().nextBytes(bytes);
return Base64.getEncoder().encodeToString(bytes);
}

/**
* 发送消息
* @param message 消息
*/
public void sendMessage(String message) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(message));
}
}

/**
* 发送认证消息
* @param userId 用户ID
* @param token 令牌
*/
public void sendAuthMessage(String userId, String token) {
WebSocketMessage message = WebSocketMessage.builder()
.type("AUTH")
.data(Map.of("userId", userId, "token", token))
.timestamp(System.currentTimeMillis())
.build();

try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(message);
sendMessage(json);
} catch (Exception e) {
log.error("发送认证消息失败", e);
}
}

/**
* 发送聊天消息
* @param userId 用户ID
* @param content 内容
* @param roomId 房间ID
*/
public void sendChatMessage(String userId, String content, String roomId) {
WebSocketMessage message = WebSocketMessage.builder()
.type("CHAT")
.data(Map.of("userId", userId, "content", content, "roomId", roomId))
.timestamp(System.currentTimeMillis())
.build();

try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(message);
sendMessage(json);
} catch (Exception e) {
log.error("发送聊天消息失败", e);
}
}

/**
* 发送心跳消息
*/
public void sendHeartbeat() {
WebSocketMessage message = WebSocketMessage.builder()
.type("HEARTBEAT")
.data(Map.of("timestamp", String.valueOf(System.currentTimeMillis())))
.timestamp(System.currentTimeMillis())
.build();

try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(message);
sendMessage(json);
} catch (Exception e) {
log.error("发送心跳消息失败", e);
}
}

/**
* 关闭连接
*/
public void close() {
if (channel != null) {
channel.close();
}
group.shutdownGracefully();
}
}

/**
* WebSocket客户端初始化器
*/
public class WebSocketClientInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// HTTP编解码器
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(65536));

// WebSocket协议处理器
pipeline.addLast(new WebSocketClientProtocolHandler("ws://localhost:8080/ws"));

// 自定义处理器
pipeline.addLast(new WebSocketClientFrameHandler());
}
}

/**
* WebSocket客户端帧处理器
*/
public class WebSocketClientFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) frame).text();
log.info("收到消息: {}", text);

// 处理收到的消息
handleMessage(text);

} else if (frame instanceof PongWebSocketFrame) {
log.debug("收到Pong帧");
} else if (frame instanceof CloseWebSocketFrame) {
log.info("收到关闭帧");
ctx.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket客户端异常", cause);
ctx.close();
}

/**
* 处理消息
* @param message 消息
*/
private void handleMessage(String message) {
try {
ObjectMapper objectMapper = new ObjectMapper();
WebSocketMessage wsMessage = objectMapper.readValue(message, WebSocketMessage.class);

switch (wsMessage.getType()) {
case "AUTH_SUCCESS":
log.info("认证成功");
break;
case "AUTH_FAILED":
log.warn("认证失败: {}", wsMessage.getData().get("reason"));
break;
case "HEARTBEAT_RESPONSE":
log.debug("收到心跳响应");
break;
case "ERROR":
log.error("收到错误消息: {}", wsMessage.getData().get("message"));
break;
default:
log.info("收到消息: type={}, data={}", wsMessage.getType(), wsMessage.getData());
}

} catch (Exception e) {
log.error("处理消息失败: {}", message, e);
}
}
}

7. WebSocket控制器

7.1 WebSocket控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/**
* WebSocket控制器
*/
@RestController
@RequestMapping("/websocket")
public class WebSocketController {

@Autowired
private NettyWebSocketServer nettyWebSocketServer;

@Autowired
private WebSocketChannelManager channelManager;

/**
* 获取服务器状态
*/
@GetMapping("/status")
public ResponseEntity<Map<String, Object>> getServerStatus() {
try {
ServerStatus status = nettyWebSocketServer.getServerStatus();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("status", status);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取服务器状态失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取服务器状态失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取连接统计
*/
@GetMapping("/stats")
public ResponseEntity<Map<String, Object>> getConnectionStats() {
try {
Map<String, Object> stats = new HashMap<>();
stats.put("activeConnections", channelManager.getActiveConnectionCount());
stats.put("totalConnections", channelManager.getTotalConnectionCount());
stats.put("onlineUsers", channelManager.getOnlineUserCount());

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("stats", stats);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取连接统计失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取连接统计失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 向用户发送消息
*/
@PostMapping("/send/user")
public ResponseEntity<Map<String, Object>> sendMessageToUser(
@RequestParam String userId,
@RequestBody String message) {
try {
channelManager.sendMessageToUser(userId, message);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "消息发送成功");

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("向用户发送消息失败: userId={}", userId, e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "发送消息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 广播消息
*/
@PostMapping("/broadcast")
public ResponseEntity<Map<String, Object>> broadcastMessage(@RequestBody String message) {
try {
channelManager.broadcastMessage(message);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "广播消息成功");

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("广播消息失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "广播消息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 检查用户是否在线
*/
@GetMapping("/user/{userId}/online")
public ResponseEntity<Map<String, Object>> checkUserOnline(@PathVariable String userId) {
try {
boolean isOnline = channelManager.isUserOnline(userId);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("isOnline", isOnline);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("检查用户在线状态失败: userId={}", userId, e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "检查用户在线状态失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}

8. 总结

通过Netty-WebSocket架构的实现,我们成功构建了一个高性能的实时通信系统。关键特性包括:

8.1 核心优势

  1. WebSocket服务器: 高性能WebSocket服务器实现
  2. 连接管理: 客户端连接管理和状态跟踪
  3. 消息处理: 实时消息处理和路由
  4. 负载均衡: 多服务器负载均衡
  5. 性能优化: 连接池和内存优化

8.2 最佳实践

  1. 连接管理: 高效的连接管理和状态跟踪
  2. 消息路由: 灵活的消息路由和处理机制
  3. 性能优化: 内存池和连接池优化
  4. 错误处理: 完善的异常处理和恢复机制
  5. 监控管理: 全面的连接和性能监控

这套Netty-WebSocket架构方案不仅能够提供高性能的实时通信能力,还包含了连接管理、消息处理、负载均衡等核心功能,是现代实时通信系统的重要基础设施。