1. WebSocket前后端通信概述

WebSocket是一种在单个TCP连接上进行全双工通信的协议,能够实现客户端和服务器之间的实时双向通信。相比传统的HTTP请求-响应模式,WebSocket提供了更高效的实时通信能力。本文将详细介绍WebSocket服务端实现、前端连接、消息管理和实时推送的完整解决方案。

1.1 核心功能

  1. 实时通信: 客户端与服务器双向实时通信
  2. 消息推送: 服务器主动向客户端推送消息
  3. 用户管理: 在线用户管理和状态跟踪
  4. 消息管理: 消息存储、转发、历史记录
  5. 房间管理: 多房间聊天和群组管理

1.2 技术架构

前端客户端 → WebSocket连接 → 后端服务
↓ ↓ ↓
浏览器 → 连接建立 → 消息处理
↓ ↓ ↓
消息发送 → 实时推送 → 用户管理

2. WebSocket服务端配置

2.1 WebSocket配置类

以下代码依次给出 WebSocketConfig 配置类、WebSocketAuthInterceptor 认证拦截器和 WebSocketUser 用户主体类:
/**
 * STOMP-over-WebSocket configuration: message broker, STOMP endpoint,
 * transport limits and the inbound authentication interceptor.
 */
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Comma-separated list of allowed origin patterns. */
    @Value("${websocket.allowed-origins}")
    private String allowedOrigins;

    /** Path of the STOMP handshake endpoint. */
    @Value("${websocket.stomp-endpoint}")
    private String stompEndpoint;

    /** Prefix of destinations routed to {@code @MessageMapping} handlers. */
    @Value("${websocket.application-destination-prefix}")
    private String applicationDestinationPrefix;

    /** Prefix of destinations served by the simple broker. */
    @Value("${websocket.broker-destination-prefix}")
    private String brokerDestinationPrefix;

    /**
     * Use the Spring-managed interceptor bean instead of instantiating a new one,
     * so that anything injected into the interceptor is actually available.
     */
    @Autowired
    private WebSocketAuthInterceptor authInterceptor;

    /**
     * Configures the message broker.
     *
     * @param config broker registry supplied by Spring
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // The simple broker serves the configured broker prefix plus "/queue"
        // (the latter is needed for user-specific queues).
        config.enableSimpleBroker(brokerDestinationPrefix, "/queue");

        // Messages sent to this prefix are dispatched to controller methods.
        config.setApplicationDestinationPrefixes(applicationDestinationPrefix);

        // Prefix used for per-user destinations.
        config.setUserDestinationPrefix("/user");
    }

    /**
     * Registers the STOMP endpoint with SockJS fallback.
     *
     * @param registry endpoint registry supplied by Spring
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Split on commas and swallow surrounding whitespace so that
        // "http://a, http://b" does not yield a pattern with a leading space.
        String[] originPatterns = allowedOrigins.trim().split("\\s*,\\s*");

        registry.addEndpoint(stompEndpoint)
                .setAllowedOriginPatterns(originPatterns)
                .withSockJS();

        log.info("WebSocket端点注册成功: {}", stompEndpoint);
    }

    /**
     * Configures WebSocket transport limits.
     *
     * @param registration transport registration supplied by Spring
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(8192)        // 8 KB per inbound message
                .setSendBufferSizeLimit(512 * 1024)   // 512 KB outbound buffer
                .setSendTimeLimit(20 * 1000);         // 20 seconds to send a message
    }

    /**
     * Installs the authentication interceptor on the client inbound channel.
     *
     * @param registration inbound channel registration supplied by Spring
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(authInterceptor);
    }
}

/**
* WebSocket认证拦截器
*/
@Component
public class WebSocketAuthInterceptor implements ChannelInterceptor {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

// 处理连接和订阅消息
if (StompCommand.CONNECT.equals(accessor.getCommand()) ||
StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {

// 获取认证信息
String token = accessor.getFirstNativeHeader("Authorization");
if (StringUtils.hasText(token)) {
// 验证token并设置用户信息
String userId = validateToken(token);
if (userId != null) {
accessor.setUser(new WebSocketUser(userId));
}
}
}

return message;
}

/**
* 验证token
* @param token 认证token
* @return 用户ID
*/
private String validateToken(String token) {
try {
// 这里可以实现具体的token验证逻辑
// 例如:JWT token验证
return "user123"; // 示例返回
} catch (Exception e) {
log.error("Token验证失败", e);
return null;
}
}
}

/**
 * Principal representing an authenticated WebSocket user.
 *
 * <p>Implements {@link Principal} so instances can be passed to
 * {@code StompHeaderAccessor#setUser} and injected into message handlers.
 */
@Data
@AllArgsConstructor
public class WebSocketUser implements Principal {
    /** Identifier of the authenticated user. */
    private String userId;

    /**
     * @return the user id, used as the principal name for user destinations
     */
    @Override
    public String getName() {
        return userId;
    }
}

2.2 应用配置

以下为 application.yml 中与 WebSocket 及消息存储相关的配置:
# application.yml
websocket:
  allowed-origins: http://localhost:3000,http://localhost:8080
  stomp-endpoint: /ws
  application-destination-prefix: /app
  broker-destination-prefix: /topic
  # NOTE(review): the WebSocketConfig class shown in this article only reads the
  # four keys above; the heartbeat/message values below are not bound there
  # (the transport limits are hard-coded) — confirm whether they are consumed elsewhere.
  heartbeat:
    enabled: true
    interval: 30000
    timeout: 10000
  message:
    max-size: 8192
    send-buffer-size: 524288
    send-time-limit: 20000

# Message settings
message:
  storage:
    enabled: true
    max-history: 1000
    expire-days: 30
  broadcast:
    enabled: true
    max-users: 10000

3. WebSocket服务端实现

3.1 WebSocket消息控制器

以下代码给出 WebSocketMessageController 消息控制器,以及 ChatMessage、PrivateMessage、RoomMessage、SystemMessage 四个消息类:
/**
 * STOMP message controller: tracks session connect/disconnect events and routes
 * public chat, private and room messages.
 *
 * <p>Handlers return {@code null} (nothing is broadcast) when the session is
 * unauthenticated or processing fails.
 */
@Slf4j
@Controller
public class WebSocketMessageController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private WebSocketUserService userService;

    @Autowired
    private WebSocketMessageService messageService;

    /**
     * Registers the user of a newly connected session.
     *
     * @param event session-connected event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();

        log.info("用户连接: sessionId={}", sessionId);

        userService.addUser(sessionId, accessor.getUser());
    }

    /**
     * Removes the user of a disconnected session.
     *
     * @param event session-disconnect event
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();

        log.info("用户断开连接: sessionId={}", sessionId);

        userService.removeUser(sessionId);
    }

    /**
     * Handles a public chat message and broadcasts it to {@code /topic/chat}.
     *
     * @param message   payload sent by the client; sender, timestamp and id are overwritten server-side
     * @param principal authenticated user of the session, may be {@code null}
     * @return the stamped message, or {@code null} when nothing should be broadcast
     */
    @MessageMapping("/chat/send")
    @SendTo("/topic/chat")
    public ChatMessage handleChatMessage(ChatMessage message, Principal principal) {
        if (!isAuthenticated(principal, "/chat/send")) {
            return null;
        }
        try {
            // Never trust client-supplied sender/timestamp/id.
            message.setSenderId(principal.getName());
            message.setTimestamp(LocalDateTime.now());
            message.setMessageId(UUID.randomUUID().toString());

            messageService.saveMessage(message);

            log.debug("处理聊天消息: senderId={}, content={}",
                    message.getSenderId(), message.getContent());

            return message;
        } catch (Exception e) {
            log.error("处理聊天消息失败", e);
            return null;
        }
    }

    /**
     * Handles a private message and delivers it to the receiver's
     * {@code /user/queue/private} destination.
     *
     * @param message   payload sent by the client; must carry a receiver id
     * @param principal authenticated user of the session, may be {@code null}
     */
    @MessageMapping("/chat/private")
    public void handlePrivateMessage(PrivateMessage message, Principal principal) {
        if (!isAuthenticated(principal, "/chat/private")) {
            return;
        }
        try {
            String receiverId = message.getReceiverId();
            // Reject before persisting: a message without a receiver cannot be delivered.
            if (receiverId == null || receiverId.trim().isEmpty()) {
                log.warn("私聊消息缺少接收者,已忽略: senderId={}", principal.getName());
                return;
            }

            message.setSenderId(principal.getName());
            message.setTimestamp(LocalDateTime.now());
            message.setMessageId(UUID.randomUUID().toString());

            messageService.savePrivateMessage(message);

            messagingTemplate.convertAndSendToUser(receiverId, "/queue/private", message);

            log.debug("处理私聊消息: senderId={}, receiverId={}, content={}",
                    message.getSenderId(), message.getReceiverId(), message.getContent());
        } catch (Exception e) {
            log.error("处理私聊消息失败", e);
        }
    }

    /**
     * Handles a room message and broadcasts it to {@code /topic/room/{roomId}}.
     *
     * @param roomId    room taken from the destination
     * @param message   payload sent by the client
     * @param principal authenticated user of the session, may be {@code null}
     * @return the stamped message, or {@code null} when nothing should be broadcast
     */
    @MessageMapping("/room/{roomId}/send")
    @SendTo("/topic/room/{roomId}")
    public RoomMessage handleRoomMessage(@DestinationVariable String roomId,
                                         RoomMessage message,
                                         Principal principal) {
        if (!isAuthenticated(principal, "/room/send")) {
            return null;
        }
        try {
            message.setRoomId(roomId);
            message.setSenderId(principal.getName());
            message.setTimestamp(LocalDateTime.now());
            message.setMessageId(UUID.randomUUID().toString());

            messageService.saveRoomMessage(message);

            log.debug("处理房间消息: roomId={}, senderId={}, content={}",
                    roomId, message.getSenderId(), message.getContent());

            return message;
        } catch (Exception e) {
            log.error("处理房间消息失败: roomId={}", roomId, e);
            return null;
        }
    }

    /**
     * Adds the user to a room and announces it to the room topic.
     *
     * @param roomId    room taken from the destination
     * @param principal authenticated user of the session, may be {@code null}
     * @return a JOIN system message, or {@code null} when nothing should be broadcast
     */
    @MessageMapping("/room/{roomId}/join")
    @SendTo("/topic/room/{roomId}")
    public SystemMessage handleJoinRoom(@DestinationVariable String roomId,
                                        Principal principal) {
        if (!isAuthenticated(principal, "/room/join")) {
            return null;
        }
        try {
            String userId = principal.getName();
            userService.joinRoom(userId, roomId);

            SystemMessage systemMessage = newSystemMessage(roomId, userId + " 加入了房间", "JOIN");

            log.info("用户加入房间: userId={}, roomId={}", userId, roomId);

            return systemMessage;
        } catch (Exception e) {
            log.error("用户加入房间失败: roomId={}", roomId, e);
            return null;
        }
    }

    /**
     * Removes the user from a room and announces it to the room topic.
     *
     * @param roomId    room taken from the destination
     * @param principal authenticated user of the session, may be {@code null}
     * @return a LEAVE system message, or {@code null} when nothing should be broadcast
     */
    @MessageMapping("/room/{roomId}/leave")
    @SendTo("/topic/room/{roomId}")
    public SystemMessage handleLeaveRoom(@DestinationVariable String roomId,
                                         Principal principal) {
        if (!isAuthenticated(principal, "/room/leave")) {
            return null;
        }
        try {
            String userId = principal.getName();
            userService.leaveRoom(userId, roomId);

            SystemMessage systemMessage = newSystemMessage(roomId, userId + " 离开了房间", "LEAVE");

            log.info("用户离开房间: userId={}, roomId={}", userId, roomId);

            return systemMessage;
        } catch (Exception e) {
            log.error("用户离开房间失败: roomId={}", roomId, e);
            return null;
        }
    }

    /**
     * Broadcasts a system message to {@code /topic/system}.
     *
     * @param content message text
     */
    public void broadcastSystemMessage(String content) {
        try {
            SystemMessage message = newSystemMessage(null, content, "SYSTEM");

            messagingTemplate.convertAndSend("/topic/system", message);

            log.info("广播系统消息: content={}", content);
        } catch (Exception e) {
            log.error("广播系统消息失败", e);
        }
    }

    /** Logs and reports whether the session carries an authenticated user. */
    private boolean isAuthenticated(Principal principal, String action) {
        if (principal == null) {
            // The auth interceptor lets frames without a valid token through,
            // so handlers can be reached without a user.
            log.warn("未认证的会话,已忽略请求: action={}", action);
            return false;
        }
        return true;
    }

    /** Builds a system message stamped with the current time. */
    private SystemMessage newSystemMessage(String roomId, String content, String type) {
        SystemMessage systemMessage = new SystemMessage();
        systemMessage.setRoomId(roomId);
        systemMessage.setContent(content);
        systemMessage.setTimestamp(LocalDateTime.now());
        systemMessage.setType(type);
        return systemMessage;
    }
}

/**
* Public chat message payload.
*
* The all-args constructor generated by Lombok takes every field below in
* declaration order, including {@code type}.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
// Unique message id
private String messageId;
// Id of the sending user
private String senderId;
// Message text
private String content;
// Time associated with the message
private LocalDateTime timestamp;
// Message kind discriminator; defaults to "CHAT" when created via the no-args constructor
private String type = "CHAT";
}

/**
* Private (user-to-user) message payload.
*
* The all-args constructor generated by Lombok takes every field below in
* declaration order, including {@code type}.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PrivateMessage {
// Unique message id
private String messageId;
// Id of the sending user
private String senderId;
// Id of the user the message is addressed to
private String receiverId;
// Message text
private String content;
// Time associated with the message
private LocalDateTime timestamp;
// Message kind discriminator; defaults to "PRIVATE" when created via the no-args constructor
private String type = "PRIVATE";
}

/**
* Room (group) message payload.
*
* The all-args constructor generated by Lombok takes every field below in
* declaration order, including {@code type}.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RoomMessage {
// Unique message id
private String messageId;
// Id of the room the message belongs to
private String roomId;
// Id of the sending user
private String senderId;
// Message text
private String content;
// Time associated with the message
private LocalDateTime timestamp;
// Message kind discriminator; defaults to "ROOM" when created via the no-args constructor
private String type = "ROOM";
}

/**
* System notification payload (join/leave notices, broadcasts, admin messages).
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SystemMessage {
// Room the notice relates to; left unset for non-room messages
private String roomId;
// Notification text
private String content;
// Time the notification was created
private LocalDateTime timestamp;
// Notification kind; values used in this file are "JOIN", "LEAVE", "SYSTEM" and "ADMIN"
private String type;
}

3.2 WebSocket用户管理服务

以下代码给出 WebSocketUserService 用户管理服务及其使用的 WebSocketUserInfo 用户信息类:
/**
 * In-memory registry of online WebSocket users and room membership.
 *
 * <p>Both indexes are concurrent collections because the methods are invoked
 * from messaging threads and from the scheduled cleanup task.
 */
@Slf4j
@Service
public class WebSocketUserService {

    /** Online users keyed by user id. */
    private final Map<String, WebSocketUserInfo> onlineUsers = new ConcurrentHashMap<>();

    /** Member user ids keyed by room id. */
    private final Map<String, Set<String>> roomUsers = new ConcurrentHashMap<>();

    /**
     * Registers an online user.
     *
     * @param sessionId session id of the connection
     * @param user      authenticated principal; ignored when {@code null}
     */
    public void addUser(String sessionId, Principal user) {
        if (user == null) {
            return;
        }

        LocalDateTime now = LocalDateTime.now();
        WebSocketUserInfo userInfo = WebSocketUserInfo.builder()
                .userId(user.getName())
                .sessionId(sessionId)
                .loginTime(now)
                .lastActiveTime(now)
                .status("ONLINE")
                // Concurrent set: rooms are mutated by join/leave and read on disconnect.
                .rooms(ConcurrentHashMap.<String>newKeySet())
                .build();

        onlineUsers.put(user.getName(), userInfo);

        log.info("用户上线: userId={}, sessionId={}", user.getName(), sessionId);
    }

    /**
     * Removes the user that owns the given session, leaving all rooms first.
     *
     * @param sessionId session id of the closed connection
     */
    public void removeUser(String sessionId) {
        onlineUsers.values().stream()
                .filter(userInfo -> sessionId.equals(userInfo.getSessionId()))
                .findFirst()
                .ifPresent(userInfo -> {
                    evictFromRooms(userInfo);

                    onlineUsers.remove(userInfo.getUserId());

                    log.info("用户下线: userId={}, sessionId={}", userInfo.getUserId(), sessionId);
                });
    }

    /**
     * Adds an online user to a room; does nothing when the user is not online.
     *
     * @param userId user id
     * @param roomId room id
     */
    public void joinRoom(String userId, String roomId) {
        WebSocketUserInfo userInfo = onlineUsers.get(userId);
        if (userInfo == null) {
            return;
        }

        userInfo.getRooms().add(roomId);
        userInfo.setLastActiveTime(LocalDateTime.now());

        // compute() keeps create-and-add atomic with respect to the
        // remove-when-empty step in removeRoomMember.
        roomUsers.compute(roomId, (id, members) -> {
            Set<String> target = members != null ? members : ConcurrentHashMap.<String>newKeySet();
            target.add(userId);
            return target;
        });

        log.debug("用户加入房间: userId={}, roomId={}", userId, roomId);
    }

    /**
     * Removes an online user from a room; does nothing when the user is not online.
     *
     * @param userId user id
     * @param roomId room id
     */
    public void leaveRoom(String userId, String roomId) {
        WebSocketUserInfo userInfo = onlineUsers.get(userId);
        if (userInfo == null) {
            return;
        }

        userInfo.getRooms().remove(roomId);
        userInfo.setLastActiveTime(LocalDateTime.now());

        removeRoomMember(roomId, userId);

        log.debug("用户离开房间: userId={}, roomId={}", userId, roomId);
    }

    /**
     * @return snapshot of all online users
     */
    public List<WebSocketUserInfo> getOnlineUsers() {
        return new ArrayList<>(onlineUsers.values());
    }

    /**
     * @param roomId room id
     * @return snapshot of the user ids in the room; empty when the room has no members
     */
    public List<String> getRoomUsers(String roomId) {
        Set<String> users = roomUsers.get(roomId);
        return users != null ? new ArrayList<>(users) : Collections.emptyList();
    }

    /**
     * @return number of online users
     */
    public int getOnlineUserCount() {
        return onlineUsers.size();
    }

    /**
     * @param userId user id
     * @return whether the user is currently registered as online
     */
    public boolean isUserOnline(String userId) {
        return onlineUsers.containsKey(userId);
    }

    /**
     * Refreshes the user's last-active timestamp.
     *
     * <p>NOTE(review): within this class only join/leave refresh the timestamp, so
     * callers must invoke this on user activity or connected users will be evicted
     * by {@link #cleanupOfflineUsers()} after ten idle minutes.
     *
     * @param userId user id
     */
    public void updateUserActiveTime(String userId) {
        WebSocketUserInfo userInfo = onlineUsers.get(userId);
        if (userInfo != null) {
            userInfo.setLastActiveTime(LocalDateTime.now());
        }
    }

    /**
     * Periodically evicts users whose last activity is older than ten minutes,
     * including their room memberships.
     */
    @Scheduled(fixedRate = 300000) // runs every 5 minutes
    public void cleanupOfflineUsers() {
        LocalDateTime cutoffTime = LocalDateTime.now().minusMinutes(10);

        // Iterate over a snapshot; entries are removed from the live map below.
        for (WebSocketUserInfo userInfo : new ArrayList<>(onlineUsers.values())) {
            if (userInfo.getLastActiveTime().isBefore(cutoffTime)) {
                // Room membership must be cleared too, otherwise getRoomUsers keeps
                // reporting users that are no longer online.
                evictFromRooms(userInfo);
                onlineUsers.remove(userInfo.getUserId());
                log.info("清理离线用户: userId={}", userInfo.getUserId());
            }
        }
    }

    /**
     * Leaves every room the user is in. Iterates over a copy because
     * {@link #leaveRoom} mutates the user's room set.
     */
    private void evictFromRooms(WebSocketUserInfo userInfo) {
        for (String roomId : new ArrayList<>(userInfo.getRooms())) {
            leaveRoom(userInfo.getUserId(), roomId);
        }
    }

    /** Atomically removes a member and drops the room entry once it is empty. */
    private void removeRoomMember(String roomId, String userId) {
        roomUsers.computeIfPresent(roomId, (id, members) -> {
            members.remove(userId);
            return members.isEmpty() ? null : members;
        });
    }
}

/**
* State kept for one online WebSocket user.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketUserInfo {
// Id of the user (the principal name)
private String userId;
// Id of the WebSocket session the user connected with
private String sessionId;
// Time the user was registered as online
private LocalDateTime loginTime;
// Time of the last recorded activity; compared against a cutoff by the cleanup task
private LocalDateTime lastActiveTime;
// Presence status; WebSocketUserService sets "ONLINE" on registration
private String status;
// Ids of the rooms the user has joined
private Set<String> rooms;
}

4. WebSocket消息服务

4.1 WebSocket消息服务

以下代码给出 WebSocketMessageService 消息服务、MessageEntity 消息实体和 MessageRepository 消息仓库:
/**
 * Persists chat, private and room messages (database plus Redis cache) and
 * serves message history.
 *
 * <p>Save methods are best-effort: failures are logged and not propagated.
 */
@Slf4j
@Service
public class WebSocketMessageService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private MessageRepository messageRepository;

    /**
     * Saves a public chat message.
     *
     * @param message chat message
     */
    public void saveMessage(ChatMessage message) {
        try {
            MessageEntity entity = MessageEntity.builder()
                    .messageId(message.getMessageId())
                    .senderId(message.getSenderId())
                    .content(message.getContent())
                    .timestamp(message.getTimestamp())
                    .type("CHAT")
                    .build();

            messageRepository.save(entity);

            cache("message:chat:" + message.getMessageId(), message);

            log.debug("保存聊天消息: messageId={}", message.getMessageId());
        } catch (Exception e) {
            log.error("保存聊天消息失败: messageId={}", message.getMessageId(), e);
        }
    }

    /**
     * Saves a private message.
     *
     * @param message private message
     */
    public void savePrivateMessage(PrivateMessage message) {
        try {
            MessageEntity entity = MessageEntity.builder()
                    .messageId(message.getMessageId())
                    .senderId(message.getSenderId())
                    .receiverId(message.getReceiverId())
                    .content(message.getContent())
                    .timestamp(message.getTimestamp())
                    .type("PRIVATE")
                    .build();

            messageRepository.save(entity);

            cache("message:private:" + message.getMessageId(), message);

            log.debug("保存私聊消息: messageId={}", message.getMessageId());
        } catch (Exception e) {
            log.error("保存私聊消息失败: messageId={}", message.getMessageId(), e);
        }
    }

    /**
     * Saves a room message.
     *
     * @param message room message
     */
    public void saveRoomMessage(RoomMessage message) {
        try {
            MessageEntity entity = MessageEntity.builder()
                    .messageId(message.getMessageId())
                    .roomId(message.getRoomId())
                    .senderId(message.getSenderId())
                    .content(message.getContent())
                    .timestamp(message.getTimestamp())
                    .type("ROOM")
                    .build();

            messageRepository.save(entity);

            cache("message:room:" + message.getMessageId(), message);

            log.debug("保存房间消息: messageId={}", message.getMessageId());
        } catch (Exception e) {
            log.error("保存房间消息失败: messageId={}", message.getMessageId(), e);
        }
    }

    /**
     * Returns the most recent public chat messages, newest first.
     *
     * @param limit maximum number of messages; non-positive values yield an empty list
     * @return history, or an empty list on failure
     */
    public List<ChatMessage> getChatHistory(int limit) {
        if (limit <= 0) {
            // PageRequest rejects page sizes below one.
            return Collections.emptyList();
        }
        try {
            List<MessageEntity> entities = messageRepository.findByTypeOrderByTimestampDesc("CHAT",
                    PageRequest.of(0, limit));

            return entities.stream()
                    // The generated all-args constructor also takes `type`.
                    .map(entity -> new ChatMessage(
                            entity.getMessageId(),
                            entity.getSenderId(),
                            entity.getContent(),
                            entity.getTimestamp(),
                            "CHAT"))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("获取聊天历史消息失败", e);
            return Collections.emptyList();
        }
    }

    /**
     * Returns the most recent private messages sent from {@code senderId} to
     * {@code receiverId}, newest first.
     *
     * <p>NOTE(review): only one direction of the conversation is queried;
     * replies from the receiver are not included.
     *
     * @param senderId   id of the sender
     * @param receiverId id of the receiver
     * @param limit      maximum number of messages; non-positive values yield an empty list
     * @return history, or an empty list on failure
     */
    public List<PrivateMessage> getPrivateHistory(String senderId, String receiverId, int limit) {
        if (limit <= 0) {
            return Collections.emptyList();
        }
        try {
            List<MessageEntity> entities = messageRepository.findBySenderIdAndReceiverIdOrderByTimestampDesc(
                    senderId, receiverId, PageRequest.of(0, limit));

            return entities.stream()
                    .map(entity -> new PrivateMessage(
                            entity.getMessageId(),
                            entity.getSenderId(),
                            entity.getReceiverId(),
                            entity.getContent(),
                            entity.getTimestamp(),
                            "PRIVATE"))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("获取私聊历史消息失败: senderId={}, receiverId={}", senderId, receiverId, e);
            return Collections.emptyList();
        }
    }

    /**
     * Returns the most recent messages of a room, newest first.
     *
     * @param roomId room id
     * @param limit  maximum number of messages; non-positive values yield an empty list
     * @return history, or an empty list on failure
     */
    public List<RoomMessage> getRoomHistory(String roomId, int limit) {
        if (limit <= 0) {
            return Collections.emptyList();
        }
        try {
            List<MessageEntity> entities = messageRepository.findByRoomIdOrderByTimestampDesc(
                    roomId, PageRequest.of(0, limit));

            return entities.stream()
                    .map(entity -> new RoomMessage(
                            entity.getMessageId(),
                            entity.getRoomId(),
                            entity.getSenderId(),
                            entity.getContent(),
                            entity.getTimestamp(),
                            "ROOM"))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("获取房间历史消息失败: roomId={}", roomId, e);
            return Collections.emptyList();
        }
    }

    /** Stores the message in Redis under the given key with a 7-day expiry. */
    private void cache(String key, Object message) {
        redisTemplate.opsForValue().set(key, message, Duration.ofDays(7));
    }
}

/**
* JPA entity mapped to the "messages" table; one row per stored message of any kind.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "messages")
public class MessageEntity {
// Primary key: the message id
@Id
private String messageId;
// Id of the sending user
private String senderId;
// Id of the receiving user; WebSocketMessageService sets it for private messages only
private String receiverId;
// Id of the room; WebSocketMessageService sets it for room messages only
private String roomId;
// Message text
private String content;
// Time associated with the message
private LocalDateTime timestamp;
// Message kind; WebSocketMessageService writes "CHAT", "PRIVATE" or "ROOM"
private String type;
}

/**
* Repository for MessageEntity rows.
* No explicit queries are declared here; presumably Spring Data derives them
* from the method names below.
*/
@Repository
public interface MessageRepository extends JpaRepository<MessageEntity, String> {

/**
* Finds messages of the given type, ordered by timestamp descending.
*
* @param type message type to match
* @param pageable paging information (page index and size)
* @return matching messages for the requested page
*/
List<MessageEntity> findByTypeOrderByTimestampDesc(String type, Pageable pageable);

/**
* Finds messages whose sender and receiver equal the given ids, ordered by
* timestamp descending. Only this sender-to-receiver direction is matched.
*
* @param senderId sender id to match
* @param receiverId receiver id to match
* @param pageable paging information (page index and size)
* @return matching messages for the requested page
*/
List<MessageEntity> findBySenderIdAndReceiverIdOrderByTimestampDesc(
String senderId, String receiverId, Pageable pageable);

/**
* Finds messages of the given room, ordered by timestamp descending.
*
* @param roomId room id to match
* @param pageable paging information (page index and size)
* @return matching messages for the requested page
*/
List<MessageEntity> findByRoomIdOrderByTimestampDesc(String roomId, Pageable pageable);
}

5. 前端WebSocket实现

5.1 JavaScript WebSocket客户端

以下代码给出基于 SockJS 和 STOMP 的 WebSocketClient 客户端类及其使用示例:
/**
 * STOMP-over-SockJS client wrapper with subscription bookkeeping,
 * connect/disconnect listeners and a bounded number of reconnect attempts.
 */
class WebSocketClient {
    /**
     * @param {string} url SockJS endpoint URL
     * @param {Object} [options] headers sent with the STOMP CONNECT frame (e.g. Authorization)
     */
    constructor(url, options = {}) {
        this.url = url;
        this.options = options;
        this.stompClient = null;
        this.connected = false;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectInterval = 5000;

        // destination -> active subscription
        this.messageHandlers = new Map();
        this.connectionHandlers = [];
        this.disconnectionHandlers = [];
    }

    /**
     * Opens the connection. Registered connect handlers run once the STOMP
     * session is established.
     */
    connect() {
        try {
            const socket = new SockJS(this.url);
            this.stompClient = Stomp.over(socket);

            // Debug output
            this.stompClient.debug = (str) => {
                console.log('STOMP: ' + str);
            };

            // Close notification is set as a property: the positional connect()
            // signature below only carries the connect and error callbacks.
            this.stompClient.onWebSocketClose = (event) => {
                console.log('WebSocket连接关闭:', event);
                this.connected = false;
                this.handleDisconnection();
            };

            const headers = { ...this.options };

            const onConnected = (frame) => {
                console.log('WebSocket连接成功:', frame);
                this.connected = true;
                this.reconnectAttempts = 0;

                // Notify connect handlers
                this.connectionHandlers.forEach(handler => handler(frame));
            };

            const onError = (frame) => {
                console.error('STOMP错误:', frame);
                this.handleConnectionError();
            };

            // connect(headers, connectCallback, errorCallback): the callbacks must be
            // passed positionally. Passing one object with onConnect/onStompError
            // keys would be treated as headers and the callbacks would never fire.
            this.stompClient.connect(headers, onConnected, onError);

        } catch (error) {
            console.error('WebSocket连接失败:', error);
            this.handleConnectionError();
        }
    }

    /**
     * Closes the connection if it is open.
     */
    disconnect() {
        if (this.stompClient && this.connected) {
            this.stompClient.disconnect(() => {
                console.log('WebSocket连接已断开');
                this.connected = false;
            });
        }
    }

    /**
     * Subscribes to a destination; message bodies are parsed as JSON.
     * @param destination destination to subscribe to
     * @param callback invoked with the parsed message
     * @returns the subscription, or null when not connected
     */
    subscribe(destination, callback) {
        if (!(this.stompClient && this.connected)) {
            console.warn('WebSocket未连接,无法订阅主题:', destination);
            return null;
        }

        const subscription = this.stompClient.subscribe(destination, (message) => {
            try {
                const data = JSON.parse(message.body);
                callback(data);
            } catch (error) {
                // Covers both JSON parse failures and errors thrown by the callback.
                console.error('解析消息失败:', error);
            }
        });

        this.messageHandlers.set(destination, subscription);
        console.log('订阅主题:', destination);

        return subscription;
    }

    /**
     * Cancels the subscription of a destination, if any.
     * @param destination destination to unsubscribe from
     */
    unsubscribe(destination) {
        const subscription = this.messageHandlers.get(destination);
        if (subscription) {
            subscription.unsubscribe();
            this.messageHandlers.delete(destination);
            console.log('取消订阅主题:', destination);
        }
    }

    /**
     * Sends a JSON-serialised message; logs a warning when not connected.
     * @param destination destination to send to
     * @param message message payload
     */
    send(destination, message) {
        if (this.stompClient && this.connected) {
            this.stompClient.send(destination, {}, JSON.stringify(message));
            console.log('发送消息:', destination, message);
        } else {
            console.warn('WebSocket未连接,无法发送消息:', destination);
        }
    }

    /**
     * Sends a public chat message.
     * @param content message text
     */
    sendChatMessage(content) {
        const message = {
            content: content,
            timestamp: new Date().toISOString()
        };
        this.send('/app/chat/send', message);
    }

    /**
     * Sends a private message.
     * @param receiverId id of the receiving user
     * @param content message text
     */
    sendPrivateMessage(receiverId, content) {
        const message = {
            receiverId: receiverId,
            content: content,
            timestamp: new Date().toISOString()
        };
        this.send('/app/chat/private', message);
    }

    /**
     * Sends a message to a room.
     * @param roomId room id
     * @param content message text
     */
    sendRoomMessage(roomId, content) {
        const message = {
            content: content,
            timestamp: new Date().toISOString()
        };
        this.send(`/app/room/${roomId}/send`, message);
    }

    /**
     * Joins a room.
     * @param roomId room id
     */
    joinRoom(roomId) {
        this.send(`/app/room/${roomId}/join`, {});
    }

    /**
     * Leaves a room.
     * @param roomId room id
     */
    leaveRoom(roomId) {
        this.send(`/app/room/${roomId}/leave`, {});
    }

    /**
     * Registers a handler invoked after every successful connect.
     * @param handler handler function, receives the CONNECTED frame
     */
    onConnect(handler) {
        this.connectionHandlers.push(handler);
    }

    /**
     * Registers a handler invoked when the connection closes.
     * @param handler handler function
     */
    onDisconnect(handler) {
        this.disconnectionHandlers.push(handler);
    }

    /**
     * Marks the client disconnected and schedules a reconnect until the
     * attempt limit is reached.
     */
    handleConnectionError() {
        this.connected = false;

        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);

            setTimeout(() => {
                this.connect();
            }, this.reconnectInterval);
        } else {
            console.error('达到最大重连次数,停止重连');
        }
    }

    /**
     * Notifies disconnect handlers.
     */
    handleDisconnection() {
        this.disconnectionHandlers.forEach(handler => handler());
    }

    /**
     * @returns {boolean} whether the client is connected
     */
    isConnected() {
        return this.connected;
    }
}

// Usage example
const wsClient = new WebSocketClient('/ws', {
    // Authentication header sent with the STOMP CONNECT frame
    Authorization: 'Bearer your-token-here'
});

// Register listeners before connecting so no event can be missed.
wsClient.onConnect((frame) => {
    console.log('WebSocket连接成功');
    showConnectionStatus('connected');

    // Public chat messages
    wsClient.subscribe('/topic/chat', (message) => {
        console.log('收到聊天消息:', message);
        displayChatMessage(message);
    });

    // Private messages.
    // NOTE: displayPrivateMessage is not defined in this listing and must be
    // provided by the page.
    wsClient.subscribe('/user/queue/private', (message) => {
        console.log('收到私聊消息:', message);
        displayPrivateMessage(message);
    });

    // System messages
    wsClient.subscribe('/topic/system', (message) => {
        console.log('收到系统消息:', message);
        displaySystemMessage(message);
    });
});

wsClient.onDisconnect(() => {
    console.log('WebSocket连接断开');
    showConnectionStatus('disconnected');
});

// Open the connection
wsClient.connect();

// Example: send the text of the input box as a chat message.
// Does nothing when the trimmed text is empty or the client is not connected.
function sendMessage() {
    const inputEl = document.getElementById('messageInput');
    const text = inputEl.value.trim();

    if (!text || !wsClient.isConnected()) {
        return;
    }

    wsClient.sendChatMessage(text);
    inputEl.value = '';
}

// Renders a chat message and scrolls the list to the bottom.
// Fields are written with textContent rather than innerHTML: sender and content
// come from other users and must not be interpreted as markup.
function displayChatMessage(message) {
    const messagesContainer = document.getElementById('messages');
    const messageElement = document.createElement('div');
    messageElement.className = 'message';

    const parts = [
        ['sender', message.senderId],
        ['content', message.content],
        ['timestamp', new Date(message.timestamp).toLocaleString()]
    ];
    for (const [className, text] of parts) {
        const part = document.createElement('div');
        part.className = className;
        // String() keeps the rendering of missing values identical to template interpolation.
        part.textContent = String(text);
        messageElement.appendChild(part);
    }

    messagesContainer.appendChild(messageElement);
    messagesContainer.scrollTop = messagesContainer.scrollHeight;
}

// Renders a system message and scrolls the list to the bottom.
// Content is written with textContent rather than innerHTML because join/leave
// notices embed user names and must not be interpreted as markup.
function displaySystemMessage(message) {
    const messagesContainer = document.getElementById('messages');
    const messageElement = document.createElement('div');
    messageElement.className = 'system-message';

    const contentElement = document.createElement('div');
    contentElement.className = 'content';
    // String() keeps the rendering of missing values identical to template interpolation.
    contentElement.textContent = String(message.content);
    messageElement.appendChild(contentElement);

    const timestampElement = document.createElement('div');
    timestampElement.className = 'timestamp';
    timestampElement.textContent = new Date(message.timestamp).toLocaleString();
    messageElement.appendChild(timestampElement);

    messagesContainer.appendChild(messageElement);
    messagesContainer.scrollTop = messagesContainer.scrollHeight;
}

// Updates the connection indicator: the label reflects whether status is
// 'connected', and the status string itself becomes the element's CSS class.
function showConnectionStatus(status) {
    const indicator = document.getElementById('connectionStatus');

    let label;
    if (status === 'connected') {
        label = '已连接';
    } else {
        label = '已断开';
    }

    indicator.textContent = label;
    indicator.className = status;
}

6. WebSocket管理控制器

6.1 WebSocket管理控制器

以下代码给出 WebSocketManagementController,通过 REST 接口查询在线用户、历史消息并推送系统消息:
/**
 * REST endpoints for inspecting WebSocket state (online users, room members,
 * message history) and for pushing system/admin messages.
 *
 * <p>NOTE(review): no authorization checks are visible on these endpoints;
 * confirm they are protected elsewhere, since they expose private history and
 * allow broadcasting.
 */
@Slf4j
@RestController
@RequestMapping("/api/websocket")
public class WebSocketManagementController {

    /** Upper bound for client-supplied history page sizes. */
    private static final int MAX_HISTORY_LIMIT = 1000;

    @Autowired
    private WebSocketUserService userService;

    @Autowired
    private WebSocketMessageService messageService;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Lists online users.
     *
     * @return 200 with the user list, or 500 on failure
     */
    @GetMapping("/users/online")
    public ResponseEntity<List<WebSocketUserInfo>> getOnlineUsers() {
        try {
            List<WebSocketUserInfo> users = userService.getOnlineUsers();
            return ResponseEntity.ok(users);
        } catch (Exception e) {
            log.error("获取在线用户列表失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Returns the number of online users.
     *
     * @return 200 with {@code count} and {@code timestamp}, or 500 on failure
     */
    @GetMapping("/users/count")
    public ResponseEntity<Map<String, Object>> getOnlineUserCount() {
        try {
            int count = userService.getOnlineUserCount();

            Map<String, Object> result = new HashMap<>();
            result.put("count", count);
            result.put("timestamp", LocalDateTime.now());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("获取在线用户数量失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Lists the user ids in a room.
     *
     * @param roomId room id
     * @return 200 with the user ids, or 500 on failure
     */
    @GetMapping("/room/{roomId}/users")
    public ResponseEntity<List<String>> getRoomUsers(@PathVariable String roomId) {
        try {
            List<String> users = userService.getRoomUsers(roomId);
            return ResponseEntity.ok(users);
        } catch (Exception e) {
            log.error("获取房间用户列表失败: roomId={}", roomId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Returns public chat history.
     *
     * @param limit maximum number of messages (capped at {@value #MAX_HISTORY_LIMIT})
     * @return 200 with the messages, or 500 on failure
     */
    @GetMapping("/messages/chat/history")
    public ResponseEntity<List<ChatMessage>> getChatHistory(
            @RequestParam(defaultValue = "50") int limit) {
        try {
            List<ChatMessage> messages = messageService.getChatHistory(capLimit(limit));
            return ResponseEntity.ok(messages);
        } catch (Exception e) {
            log.error("获取聊天历史消息失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Returns private message history between a sender and a receiver.
     *
     * @param senderId   id of the sender
     * @param receiverId id of the receiver
     * @param limit      maximum number of messages (capped at {@value #MAX_HISTORY_LIMIT})
     * @return 200 with the messages, or 500 on failure
     */
    @GetMapping("/messages/private/history")
    public ResponseEntity<List<PrivateMessage>> getPrivateHistory(
            @RequestParam String senderId,
            @RequestParam String receiverId,
            @RequestParam(defaultValue = "50") int limit) {
        try {
            List<PrivateMessage> messages = messageService.getPrivateHistory(
                    senderId, receiverId, capLimit(limit));
            return ResponseEntity.ok(messages);
        } catch (Exception e) {
            log.error("获取私聊历史消息失败: senderId={}, receiverId={}", senderId, receiverId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Returns the message history of a room.
     *
     * @param roomId room id
     * @param limit  maximum number of messages (capped at {@value #MAX_HISTORY_LIMIT})
     * @return 200 with the messages, or 500 on failure
     */
    @GetMapping("/messages/room/{roomId}/history")
    public ResponseEntity<List<RoomMessage>> getRoomHistory(
            @PathVariable String roomId,
            @RequestParam(defaultValue = "50") int limit) {
        try {
            List<RoomMessage> messages = messageService.getRoomHistory(roomId, capLimit(limit));
            return ResponseEntity.ok(messages);
        } catch (Exception e) {
            log.error("获取房间历史消息失败: roomId={}", roomId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * Broadcasts a system message to {@code /topic/system}.
     *
     * @param content message text
     * @return 200 with a success result, or 500 with a failure result
     */
    @PostMapping("/broadcast/system")
    public ResponseEntity<Map<String, Object>> broadcastSystemMessage(
            @RequestParam String content) {
        try {
            messagingTemplate.convertAndSend("/topic/system", newSystemMessage(content, "SYSTEM"));

            return ResponseEntity.ok(successResult("系统消息广播成功"));
        } catch (Exception e) {
            log.error("广播系统消息失败", e);

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(failureResult("系统消息广播失败: " + e.getMessage()));
        }
    }

    /**
     * Sends an admin message to one online user's {@code /user/queue/admin} destination.
     *
     * @param userId  id of the target user
     * @param content message text
     * @return 200 on success, 400 when the user is offline, 500 on failure
     */
    @PostMapping("/send/user/{userId}")
    public ResponseEntity<Map<String, Object>> sendMessageToUser(
            @PathVariable String userId,
            @RequestParam String content) {
        try {
            if (!userService.isUserOnline(userId)) {
                return ResponseEntity.badRequest().body(failureResult("用户不在线"));
            }

            messagingTemplate.convertAndSendToUser(userId, "/queue/admin",
                    newSystemMessage(content, "ADMIN"));

            return ResponseEntity.ok(successResult("消息发送成功"));
        } catch (Exception e) {
            log.error("发送消息给用户失败: userId={}", userId, e);

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(failureResult("消息发送失败: " + e.getMessage()));
        }
    }

    /** Caps a client-supplied page size so a single request cannot load unbounded history. */
    private static int capLimit(int limit) {
        return Math.min(limit, MAX_HISTORY_LIMIT);
    }

    /** Builds a system message of the given type stamped with the current time. */
    private static SystemMessage newSystemMessage(String content, String type) {
        SystemMessage message = new SystemMessage();
        message.setContent(content);
        message.setTimestamp(LocalDateTime.now());
        message.setType(type);
        return message;
    }

    /** Builds a success payload with {@code success}, {@code message} and {@code timestamp}. */
    private static Map<String, Object> successResult(String message) {
        Map<String, Object> result = new HashMap<>();
        result.put("success", true);
        result.put("message", message);
        result.put("timestamp", LocalDateTime.now());
        return result;
    }

    /** Builds a failure payload with {@code success} and {@code message}. */
    private static Map<String, Object> failureResult(String message) {
        Map<String, Object> result = new HashMap<>();
        result.put("success", false);
        result.put("message", message);
        return result;
    }
}

7. 总结

通过WebSocket前后端通信的实现,我们成功构建了一个完整的实时通信系统。关键特性包括:

7.1 核心优势

  1. 实时通信: 客户端与服务器双向实时通信
  2. 消息推送: 服务器主动向客户端推送消息
  3. 用户管理: 在线用户管理和状态跟踪
  4. 消息管理: 消息存储、转发、历史记录
  5. 房间管理: 多房间聊天和群组管理

7.2 最佳实践

  1. 连接管理: 完善的连接建立和断开处理
  2. 消息处理: 可靠的消息发送和接收机制
  3. 用户管理: 在线用户状态跟踪和管理
  4. 错误处理: 完善的异常处理和重连机制
  5. 性能优化: 消息缓存和批量处理

这套WebSocket前后端通信方案不仅能够提供高效的实时通信能力,还包含了完善的用户管理、消息管理和错误处理机制,是现代实时应用的重要基础设施。