设备报警数据处理Java微服务后端架构实战

1. 架构概述

设备报警数据处理系统是物联网平台的关键模块,需要支持高并发、低延迟的报警信息查询和实时报警推送。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用的设备报警数据处理系统。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
管理端 → 管理网关 → 设备服务 → 缓存/数据库

身份认证

读取缓存、数据库准报警信息

返回请求结果

设备服务 → 消息队列 → 设备服务

报警信息处理及持久化数据库

推送管理用户报警信息

1.2 核心组件

  • 管理网关(Management Gateway):负责管理用户请求的接入、身份认证、请求路由
  • 设备服务(Device Service):负责设备报警信息管理、报警数据处理、消息队列消费
  • 消息队列(Kafka/RocketMQ):异步处理设备报警信息推送
  • 缓存层(Redis):存储设备准实时报警信息
  • 数据库(MySQL/MongoDB):持久化设备报警历史数据
  • WebSocket/SSE:实时推送报警信息给管理用户

2. 管理网关服务实现

2.1 网关服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* 管理网关服务
* 负责管理用户请求的接入、身份认证、请求路由
*/
@RestController
@RequestMapping("/api/management/gateway")
@Slf4j
public class ManagementGatewayController {

@Autowired
private AuthService authService;

@Autowired
private DeviceServiceClient deviceServiceClient;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 管理用户报警信息请求
* 流程:身份认证 → 读取缓存/数据库 → 返回结果
*/
@PostMapping("/alarm/info")
public Result<DeviceAlarmInfo> getDeviceAlarmInfo(
@RequestHeader("Authorization") String token,
@RequestBody DeviceAlarmRequest request) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 权限校验
if (!authService.hasPermission(userInfo.getUserId(), request.getDeviceId())) {
return Result.error("无权限访问该设备");
}

// 3. 读取缓存、数据库准报警信息
DeviceAlarmInfo alarmInfo = getDeviceAlarmInfo(request.getDeviceId(),
request.getAlarmId());

// 4. 返回请求结果
return Result.success(alarmInfo);

} catch (Exception e) {
log.error("获取设备报警信息失败: deviceId={}, alarmId={}, error={}",
request.getDeviceId(), request.getAlarmId(), e.getMessage(), e);
return Result.error("获取设备报警信息失败");
}
}

/**
* 查询设备报警列表
*/
@PostMapping("/alarm/list")
public Result<PageResult<DeviceAlarmInfo>> getDeviceAlarmList(
@RequestHeader("Authorization") String token,
@RequestBody DeviceAlarmQueryRequest request) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 读取缓存、数据库准报警信息
PageResult<DeviceAlarmInfo> alarmList = getDeviceAlarmList(request);

return Result.success(alarmList);

} catch (Exception e) {
log.error("查询设备报警列表失败: error={}", e.getMessage(), e);
return Result.error("查询设备报警列表失败");
}
}

/**
* 读取缓存、数据库准报警信息
* 优先从缓存读取,缓存未命中则查询数据库
*/
private DeviceAlarmInfo getDeviceAlarmInfo(String deviceId, String alarmId) {
// 1. 先查Redis缓存(准实时报警信息)
String cacheKey = "device:alarm:" + deviceId + ":" + alarmId;
DeviceAlarmInfo alarmInfo = (DeviceAlarmInfo)
redisTemplate.opsForValue().get(cacheKey);

if (alarmInfo != null) {
log.debug("从缓存获取报警信息: deviceId={}, alarmId={}", deviceId, alarmId);
return alarmInfo;
}

// 2. 缓存未命中,调用设备服务查询数据库
log.debug("缓存未命中,查询数据库: deviceId={}, alarmId={}", deviceId, alarmId);
alarmInfo = deviceServiceClient.getDeviceAlarmInfo(deviceId, alarmId);

// 3. 将查询结果写入缓存(设置过期时间10分钟)
if (alarmInfo != null) {
redisTemplate.opsForValue().set(cacheKey, alarmInfo, 10, TimeUnit.MINUTES);
}

return alarmInfo;
}

/**
* 查询设备报警列表
*/
private PageResult<DeviceAlarmInfo> getDeviceAlarmList(DeviceAlarmQueryRequest request) {
// 1. 先查Redis缓存(最近报警列表)
String cacheKey = "device:alarm:list:" + request.getDeviceId() + ":" +
request.getPageNum() + ":" + request.getPageSize();
PageResult<DeviceAlarmInfo> alarmList = (PageResult<DeviceAlarmInfo>)
redisTemplate.opsForValue().get(cacheKey);

if (alarmList != null) {
log.debug("从缓存获取报警列表: deviceId={}", request.getDeviceId());
return alarmList;
}

// 2. 缓存未命中,调用设备服务查询数据库
alarmList = deviceServiceClient.getDeviceAlarmList(request);

// 3. 将查询结果写入缓存(设置过期时间5分钟)
if (alarmList != null) {
redisTemplate.opsForValue().set(cacheKey, alarmList, 5, TimeUnit.MINUTES);
}

return alarmList;
}
}

2.2 身份认证服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* 身份认证服务
*/
@Service
@Slf4j
public class AuthService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private UserServiceClient userServiceClient;

/**
* 身份认证
* 验证Token有效性
*/
public UserInfo authenticate(String token) {
try {
// 1. 从Token中解析用户信息
String userId = parseToken(token);
if (StringUtils.isEmpty(userId)) {
return null;
}

// 2. 从缓存中获取用户信息
String userCacheKey = "user:info:" + userId;
UserInfo userInfo = (UserInfo) redisTemplate.opsForValue().get(userCacheKey);

if (userInfo != null) {
return userInfo;
}

// 3. 缓存未命中,调用用户服务查询
userInfo = userServiceClient.getUserInfo(userId);

// 4. 将用户信息写入缓存
if (userInfo != null) {
redisTemplate.opsForValue().set(userCacheKey, userInfo, 30, TimeUnit.MINUTES);
}

return userInfo;

} catch (Exception e) {
log.error("身份认证失败: token={}, error={}", token, e.getMessage(), e);
return null;
}
}

/**
* 权限校验
* 检查用户是否有权限访问指定设备
*/
public boolean hasPermission(String userId, String deviceId) {
try {
// 1. 检查用户设备权限缓存
String permissionKey = "user:device:permission:" + userId + ":" + deviceId;
Boolean hasPermission = (Boolean) redisTemplate.opsForValue().get(permissionKey);

if (hasPermission != null) {
return hasPermission;
}

// 2. 调用权限服务查询
hasPermission = userServiceClient.checkDevicePermission(userId, deviceId);

// 3. 将权限信息写入缓存
redisTemplate.opsForValue().set(permissionKey, hasPermission, 10, TimeUnit.MINUTES);

return hasPermission;

} catch (Exception e) {
log.error("权限校验失败: userId={}, deviceId={}, error={}",
userId, deviceId, e.getMessage(), e);
return false;
}
}

/**
* 解析Token
*/
private String parseToken(String token) {
// JWT Token解析逻辑
if (StringUtils.isEmpty(token) || !token.startsWith("Bearer ")) {
return null;
}

String jwtToken = token.substring(7);
// 解析JWT获取userId
return extractUserIdFromJWT(jwtToken);
}

private String extractUserIdFromJWT(String jwtToken) {
// JWT解析实现
// 实际实现需要使用JWT库
return "user123";
}
}

3. 设备服务实现

3.1 设备服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/**
* 设备服务
* 负责设备报警信息管理、报警数据处理、消息队列消费
*/
@Service
@Slf4j
public class DeviceAlarmService {

@Autowired
private DeviceAlarmMapper deviceAlarmMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private AlarmPushService alarmPushService;

/**
* 推送设备报警信息消息队列
* 设备服务接收报警信息,发送到消息队列
*/
public void pushDeviceAlarmToQueue(DeviceAlarmMessage alarmMessage) {
try {
// 1. 发送到Kafka消息队列
String topic = "device.alarm";
String key = alarmMessage.getDeviceId();
String value = JSON.toJSONString(alarmMessage);

kafkaTemplate.send(topic, key, value);

log.info("设备报警信息已发送到消息队列: deviceId={}, alarmId={}",
alarmMessage.getDeviceId(), alarmMessage.getAlarmId());

} catch (Exception e) {
log.error("推送设备报警信息到消息队列失败: deviceId={}, error={}",
alarmMessage.getDeviceId(), e.getMessage(), e);
}
}

/**
* 消费设备报警信息消息队列
* 从Kafka消费报警信息,进行处理和持久化
*/
@KafkaListener(topics = "device.alarm", groupId = "device-alarm-group")
public void consumeDeviceAlarm(String message) {
try {
// 1. 解析消息
DeviceAlarmMessage alarmMessage = JSON.parseObject(message, DeviceAlarmMessage.class);

// 2. 报警信息处理及持久化数据库
processAndPersistAlarm(alarmMessage);

// 3. 推送管理用户报警信息
pushAlarmToManagementUsers(alarmMessage);

log.info("设备报警信息处理完成: deviceId={}, alarmId={}",
alarmMessage.getDeviceId(), alarmMessage.getAlarmId());

} catch (Exception e) {
log.error("消费设备报警信息失败: error={}", e.getMessage(), e);
}
}

/**
* 报警信息处理及持久化数据库
*/
private void processAndPersistAlarm(DeviceAlarmMessage alarmMessage) {
try {
// 1. 构建报警记录
DeviceAlarmRecord alarmRecord = buildAlarmRecord(alarmMessage);

// 2. 保存到数据库
deviceAlarmMapper.insert(alarmRecord);

// 3. 更新缓存
updateAlarmCache(alarmRecord);

// 4. 更新报警统计
updateAlarmStatistics(alarmRecord);

log.info("报警信息已持久化: deviceId={}, alarmId={}",
alarmMessage.getDeviceId(), alarmMessage.getAlarmId());

} catch (Exception e) {
log.error("报警信息处理失败: deviceId={}, error={}",
alarmMessage.getDeviceId(), e.getMessage(), e);
}
}

/**
* 构建报警记录
*/
private DeviceAlarmRecord buildAlarmRecord(DeviceAlarmMessage alarmMessage) {
DeviceAlarmRecord record = new DeviceAlarmRecord();
record.setAlarmId(UUID.randomUUID().toString());
record.setDeviceId(alarmMessage.getDeviceId());
record.setAlarmType(alarmMessage.getAlarmType());
record.setAlarmLevel(alarmMessage.getAlarmLevel());
record.setAlarmMessage(alarmMessage.getAlarmMessage());
record.setAlarmData(alarmMessage.getAlarmData());
record.setStatus("UNHANDLED"); // 未处理
record.setCreateTime(new Date());
record.setTimestamp(alarmMessage.getTimestamp());
return record;
}

/**
* 更新报警缓存
*/
private void updateAlarmCache(DeviceAlarmRecord alarmRecord) {
// 1. 更新单个报警信息缓存
String cacheKey = "device:alarm:" + alarmRecord.getDeviceId() + ":" +
alarmRecord.getAlarmId();
DeviceAlarmInfo alarmInfo = convertToAlarmInfo(alarmRecord);
redisTemplate.opsForValue().set(cacheKey, alarmInfo, 10, TimeUnit.MINUTES);

// 2. 更新设备最新报警列表缓存
String listCacheKey = "device:alarm:latest:" + alarmRecord.getDeviceId();
List<DeviceAlarmInfo> latestAlarms = getLatestAlarms(alarmRecord.getDeviceId());
latestAlarms.add(0, alarmInfo); // 添加到列表头部
if (latestAlarms.size() > 100) {
latestAlarms = latestAlarms.subList(0, 100); // 只保留最近100条
}
redisTemplate.opsForValue().set(listCacheKey, latestAlarms, 5, TimeUnit.MINUTES);
}

/**
* 更新报警统计
*/
private void updateAlarmStatistics(DeviceAlarmRecord alarmRecord) {
// 1. 更新设备报警统计
String statsKey = "device:alarm:stats:" + alarmRecord.getDeviceId();
DeviceAlarmStats stats = (DeviceAlarmStats) redisTemplate.opsForValue().get(statsKey);
if (stats == null) {
stats = new DeviceAlarmStats();
stats.setDeviceId(alarmRecord.getDeviceId());
}

// 更新统计信息
stats.setTotalCount(stats.getTotalCount() + 1);
if ("CRITICAL".equals(alarmRecord.getAlarmLevel())) {
stats.setCriticalCount(stats.getCriticalCount() + 1);
} else if ("WARNING".equals(alarmRecord.getAlarmLevel())) {
stats.setWarningCount(stats.getWarningCount() + 1);
}
stats.setLastAlarmTime(new Date());

redisTemplate.opsForValue().set(statsKey, stats, 1, TimeUnit.HOURS);
}

/**
* 推送管理用户报警信息
* 实时推送报警信息给管理用户
*/
private void pushAlarmToManagementUsers(DeviceAlarmMessage alarmMessage) {
try {
// 1. 获取需要推送的管理用户列表
List<String> userIds = getManagementUserIds(alarmMessage.getDeviceId());

// 2. 构建推送消息
AlarmPushMessage pushMessage = buildAlarmPushMessage(alarmMessage);

// 3. 推送给每个管理用户
for (String userId : userIds) {
alarmPushService.pushAlarmToUser(userId, pushMessage);
}

log.info("报警信息已推送给管理用户: deviceId={}, userIds={}",
alarmMessage.getDeviceId(), userIds);

} catch (Exception e) {
log.error("推送管理用户报警信息失败: deviceId={}, error={}",
alarmMessage.getDeviceId(), e.getMessage(), e);
}
}

/**
* 获取管理用户ID列表
*/
private List<String> getManagementUserIds(String deviceId) {
// 1. 从缓存获取
String cacheKey = "device:management:users:" + deviceId;
List<String> userIds = (List<String>) redisTemplate.opsForValue().get(cacheKey);

if (userIds != null) {
return userIds;
}

// 2. 从数据库查询
userIds = deviceAlarmMapper.selectManagementUserIds(deviceId);

// 3. 写入缓存
if (userIds != null && !userIds.isEmpty()) {
redisTemplate.opsForValue().set(cacheKey, userIds, 30, TimeUnit.MINUTES);
}

return userIds != null ? userIds : new ArrayList<>();
}

/**
* 构建报警推送消息
*/
private AlarmPushMessage buildAlarmPushMessage(DeviceAlarmMessage alarmMessage) {
AlarmPushMessage pushMessage = new AlarmPushMessage();
pushMessage.setAlarmId(alarmMessage.getAlarmId());
pushMessage.setDeviceId(alarmMessage.getDeviceId());
pushMessage.setAlarmType(alarmMessage.getAlarmType());
pushMessage.setAlarmLevel(alarmMessage.getAlarmLevel());
pushMessage.setAlarmMessage(alarmMessage.getAlarmMessage());
pushMessage.setTimestamp(alarmMessage.getTimestamp());
return pushMessage;
}

/**
* 获取设备实时信息
* 优先从缓存读取,未命中则查询数据库
*/
public DeviceAlarmInfo getDeviceAlarmInfo(String deviceId, String alarmId) {
// 1. 先查Redis缓存
String cacheKey = "device:alarm:" + deviceId + ":" + alarmId;
DeviceAlarmInfo alarmInfo = (DeviceAlarmInfo)
redisTemplate.opsForValue().get(cacheKey);

if (alarmInfo != null) {
return alarmInfo;
}

// 2. Redis缓存未命中,查询数据库
DeviceAlarmRecord record = deviceAlarmMapper.selectByAlarmId(alarmId);
if (record != null) {
alarmInfo = convertToAlarmInfo(record);

// 3. 将查询结果写入缓存
redisTemplate.opsForValue().set(cacheKey, alarmInfo, 10, TimeUnit.MINUTES);

return alarmInfo;
}

return null;
}

/**
* 查询设备报警列表
*/
public PageResult<DeviceAlarmInfo> getDeviceAlarmList(DeviceAlarmQueryRequest request) {
// 1. 构建查询条件
QueryWrapper<DeviceAlarmRecord> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("device_id", request.getDeviceId());
if (StringUtils.isNotEmpty(request.getAlarmType())) {
queryWrapper.eq("alarm_type", request.getAlarmType());
}
if (StringUtils.isNotEmpty(request.getAlarmLevel())) {
queryWrapper.eq("alarm_level", request.getAlarmLevel());
}
if (StringUtils.isNotEmpty(request.getStatus())) {
queryWrapper.eq("status", request.getStatus());
}
queryWrapper.orderByDesc("create_time");

// 2. 分页查询
Page<DeviceAlarmRecord> page = new Page<>(request.getPageNum(), request.getPageSize());
Page<DeviceAlarmRecord> pageResult = deviceAlarmMapper.selectPage(page, queryWrapper);

// 3. 转换为返回对象
List<DeviceAlarmInfo> alarmInfoList = pageResult.getRecords().stream()
.map(this::convertToAlarmInfo)
.collect(Collectors.toList());

PageResult<DeviceAlarmInfo> result = new PageResult<>();
result.setList(alarmInfoList);
result.setTotal(pageResult.getTotal());
result.setPageNum(request.getPageNum());
result.setPageSize(request.getPageSize());

return result;
}

/**
* 获取最新报警列表
*/
private List<DeviceAlarmInfo> getLatestAlarms(String deviceId) {
String listCacheKey = "device:alarm:latest:" + deviceId;
List<DeviceAlarmInfo> latestAlarms = (List<DeviceAlarmInfo>)
redisTemplate.opsForValue().get(listCacheKey);

if (latestAlarms == null) {
latestAlarms = new ArrayList<>();
}

return latestAlarms;
}

/**
* 转换报警记录为报警信息
*/
private DeviceAlarmInfo convertToAlarmInfo(DeviceAlarmRecord record) {
DeviceAlarmInfo info = new DeviceAlarmInfo();
info.setAlarmId(record.getAlarmId());
info.setDeviceId(record.getDeviceId());
info.setAlarmType(record.getAlarmType());
info.setAlarmLevel(record.getAlarmLevel());
info.setAlarmMessage(record.getAlarmMessage());
info.setAlarmData(record.getAlarmData());
info.setStatus(record.getStatus());
info.setTimestamp(record.getTimestamp());
info.setCreateTime(record.getCreateTime());
return info;
}
}

3.2 报警推送服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 报警推送服务
* 负责实时推送报警信息给管理用户
*/
@Service
@Slf4j
public class AlarmPushService {

@Autowired
private WebSocketService webSocketService;

@Autowired
private SSEService sseService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 推送报警信息给用户
*/
public void pushAlarmToUser(String userId, AlarmPushMessage pushMessage) {
try {
// 1. 检查用户是否在线
if (!isUserOnline(userId)) {
// 用户不在线,存储到离线消息队列
saveOfflineMessage(userId, pushMessage);
return;
}

// 2. 通过WebSocket推送
webSocketService.sendMessageToUser(userId, pushMessage);

// 3. 通过SSE推送(备用)
sseService.sendMessageToUser(userId, pushMessage);

log.info("报警信息已推送给用户: userId={}, alarmId={}",
userId, pushMessage.getAlarmId());

} catch (Exception e) {
log.error("推送报警信息失败: userId={}, error={}",
userId, e.getMessage(), e);
}
}

/**
* 检查用户是否在线
*/
private boolean isUserOnline(String userId) {
String onlineKey = "user:online:" + userId;
return redisTemplate.hasKey(onlineKey);
}

/**
* 保存离线消息
*/
private void saveOfflineMessage(String userId, AlarmPushMessage pushMessage) {
String queueKey = "user:offline:message:" + userId;
redisTemplate.opsForList().rightPush(queueKey, pushMessage);
redisTemplate.expire(queueKey, 7, TimeUnit.DAYS); // 保存7天
}

/**
* 获取用户离线消息
*/
public List<AlarmPushMessage> getOfflineMessages(String userId) {
String queueKey = "user:offline:message:" + userId;
List<Object> messages = redisTemplate.opsForList().range(queueKey, 0, -1);

if (messages == null || messages.isEmpty()) {
return new ArrayList<>();
}

return messages.stream()
.map(msg -> (AlarmPushMessage) msg)
.collect(Collectors.toList());
}

/**
* 清除用户离线消息
*/
public void clearOfflineMessages(String userId) {
String queueKey = "user:offline:message:" + userId;
redisTemplate.delete(queueKey);
}
}

3.3 WebSocket服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* WebSocket服务
* 提供实时通信能力
*/
@Component
@Slf4j
public class WebSocketService {

private final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();

/**
* 用户连接
*/
public void onUserConnect(String userId, WebSocketSession session) {
userSessions.put(userId, session);
log.info("用户WebSocket连接: userId={}", userId);
}

/**
* 用户断开连接
*/
public void onUserDisconnect(String userId) {
userSessions.remove(userId);
log.info("用户WebSocket断开: userId={}", userId);
}

/**
* 发送消息给用户
*/
public void sendMessageToUser(String userId, Object message) {
WebSocketSession session = userSessions.get(userId);
if (session != null && session.isOpen()) {
try {
String messageStr = JSON.toJSONString(message);
session.sendMessage(new TextMessage(messageStr));
} catch (Exception e) {
log.error("发送WebSocket消息失败: userId={}, error={}",
userId, e.getMessage(), e);
}
}
}

/**
* 广播消息给所有用户
*/
public void broadcastMessage(Object message) {
String messageStr = JSON.toJSONString(message);
for (Map.Entry<String, WebSocketSession> entry : userSessions.entrySet()) {
try {
WebSocketSession session = entry.getValue();
if (session.isOpen()) {
session.sendMessage(new TextMessage(messageStr));
}
} catch (Exception e) {
log.error("广播WebSocket消息失败: userId={}, error={}",
entry.getKey(), e.getMessage(), e);
}
}
}
}

4. 数据模型定义

4.1 设备报警消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 设备报警消息(消息队列)
*/
@Data
public class DeviceAlarmMessage {
/**
* 报警ID
*/
private String alarmId;

/**
* 设备ID
*/
private String deviceId;

/**
* 报警类型:TEMPERATURE, PRESSURE, VOLTAGE等
*/
private String alarmType;

/**
* 报警级别:CRITICAL, WARNING, INFO
*/
private String alarmLevel;

/**
* 报警消息
*/
private String alarmMessage;

/**
* 报警数据(JSON格式)
*/
private String alarmData;

/**
* 时间戳
*/
private Date timestamp;
}

4.2 设备报警信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 设备报警信息(返回给管理用户)
*/
@Data
public class DeviceAlarmInfo {
/**
* 报警ID
*/
private String alarmId;

/**
* 设备ID
*/
private String deviceId;

/**
* 报警类型
*/
private String alarmType;

/**
* 报警级别
*/
private String alarmLevel;

/**
* 报警消息
*/
private String alarmMessage;

/**
* 报警数据
*/
private String alarmData;

/**
* 处理状态:UNHANDLED, HANDLING, HANDLED
*/
private String status;

/**
* 时间戳
*/
private Date timestamp;

/**
* 创建时间
*/
private Date createTime;
}

4.3 设备报警记录(数据库)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 设备报警记录(数据库实体)
*/
@Data
@TableName("device_alarm_record")
public class DeviceAlarmRecord {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 报警ID
*/
private String alarmId;

/**
* 设备ID
*/
private String deviceId;

/**
* 报警类型
*/
private String alarmType;

/**
* 报警级别
*/
private String alarmLevel;

/**
* 报警消息
*/
private String alarmMessage;

/**
* 报警数据
*/
private String alarmData;

/**
* 处理状态
*/
private String status;

/**
* 处理人
*/
private String handler;

/**
* 处理时间
*/
private Date handleTime;

/**
* 时间戳
*/
private Date timestamp;

/**
* 创建时间
*/
private Date createTime;

/**
* 更新时间
*/
private Date updateTime;
}

4.4 报警推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 报警推送消息
*/
@Data
public class AlarmPushMessage {
/**
* 报警ID
*/
private String alarmId;

/**
* 设备ID
*/
private String deviceId;

/**
* 报警类型
*/
private String alarmType;

/**
* 报警级别
*/
private String alarmLevel;

/**
* 报警消息
*/
private String alarmMessage;

/**
* 时间戳
*/
private Date timestamp;
}

5. 数据库Mapper实现

5.1 DeviceAlarmMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 设备报警Mapper
*/
@Mapper
public interface DeviceAlarmMapper extends BaseMapper<DeviceAlarmRecord> {

/**
* 根据报警ID查询
*/
@Select("SELECT * FROM device_alarm_record WHERE alarm_id = #{alarmId}")
DeviceAlarmRecord selectByAlarmId(@Param("alarmId") String alarmId);

/**
* 查询管理用户ID列表
*/
@Select("SELECT DISTINCT user_id FROM device_management_user " +
"WHERE device_id = #{deviceId}")
List<String> selectManagementUserIds(@Param("deviceId") String deviceId);

/**
* 查询设备报警统计
*/
@Select("SELECT COUNT(*) as total_count, " +
"SUM(CASE WHEN alarm_level = 'CRITICAL' THEN 1 ELSE 0 END) as critical_count, " +
"SUM(CASE WHEN alarm_level = 'WARNING' THEN 1 ELSE 0 END) as warning_count " +
"FROM device_alarm_record " +
"WHERE device_id = #{deviceId} " +
"AND create_time >= #{startTime}")
DeviceAlarmStats selectAlarmStats(@Param("deviceId") String deviceId,
@Param("startTime") Date startTime);
}

6. 配置类

6.1 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Kafka配置
*/
@Configuration
@EnableKafka
public class KafkaConfig {

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "1");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "device-alarm-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 3个消费者线程
return factory;
}
}

6.2 WebSocket配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* WebSocket配置
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Autowired
private AlarmWebSocketHandler alarmWebSocketHandler;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(alarmWebSocketHandler, "/ws/alarm")
.setAllowedOrigins("*");
}
}

7. 性能优化策略

7.1 消息队列批量处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 批量处理优化
* 使用批量操作减少网络往返
*/
@Service
@Slf4j
public class BatchAlarmProcessService {

@Autowired
private DeviceAlarmService deviceAlarmService;

private final List<DeviceAlarmMessage> alarmBuffer = new ArrayList<>();
private final Object lock = new Object();

@Scheduled(fixedRate = 5000) // 每5秒处理一次
public void batchProcessAlarms() {
List<DeviceAlarmMessage> alarmsToProcess;

synchronized (lock) {
if (alarmBuffer.isEmpty()) {
return;
}
alarmsToProcess = new ArrayList<>(alarmBuffer);
alarmBuffer.clear();
}

// 批量处理报警信息
for (DeviceAlarmMessage alarm : alarmsToProcess) {
deviceAlarmService.pushDeviceAlarmToQueue(alarm);
}

log.info("批量处理报警信息: count={}", alarmsToProcess.size());
}

/**
* 添加报警到缓冲区
*/
public void addAlarmToBuffer(DeviceAlarmMessage alarm) {
synchronized (lock) {
alarmBuffer.add(alarm);
if (alarmBuffer.size() >= 100) {
// 缓冲区满了,立即处理
batchProcessAlarms();
}
}
}
}

7.2 异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 异步处理服务
* 使用异步方式处理非关键路径操作
*/
@Service
@Slf4j
public class AsyncAlarmProcessService {

@Autowired
private DeviceAlarmService deviceAlarmService;

@Async("alarmTaskExecutor")
public void asyncProcessAlarm(DeviceAlarmMessage alarmMessage) {
try {
// 异步处理报警信息
deviceAlarmService.processAndPersistAlarm(alarmMessage);
} catch (Exception e) {
log.error("异步处理报警信息失败: deviceId={}, error={}",
alarmMessage.getDeviceId(), e.getMessage(), e);
}
}

@Async("alarmTaskExecutor")
public void asyncPushAlarm(DeviceAlarmMessage alarmMessage) {
try {
// 异步推送报警信息
deviceAlarmService.pushAlarmToManagementUsers(alarmMessage);
} catch (Exception e) {
log.error("异步推送报警信息失败: deviceId={}, error={}",
alarmMessage.getDeviceId(), e.getMessage(), e);
}
}
}

8. 监控与告警

8.1 报警处理监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 报警处理监控服务
*/
@Service
@Slf4j
public class AlarmMonitorService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorAlarmProcessing() {
// 1. 监控报警处理延迟
monitorAlarmProcessingDelay();

// 2. 监控报警队列积压
monitorAlarmQueueBacklog();

// 3. 监控报警推送成功率
monitorAlarmPushSuccessRate();
}

/**
* 监控报警处理延迟
*/
private void monitorAlarmProcessingDelay() {
// 统计报警处理延迟
// 如果延迟超过阈值,发送告警
}

/**
* 监控报警队列积压
*/
private void monitorAlarmQueueBacklog() {
// 检查Kafka队列积压情况
// 如果积压超过阈值,发送告警
}

/**
* 监控报警推送成功率
*/
private void monitorAlarmPushSuccessRate() {
// 统计报警推送成功率
// 如果成功率低于阈值,发送告警
}
}

9. 总结

本文详细介绍了设备报警数据处理的Java微服务架构实现,包括:

  1. 管理网关服务:负责管理用户请求接入、身份认证、请求路由
  2. 设备服务:负责设备报警信息管理、报警数据处理、消息队列消费
  3. 消息队列处理:使用Kafka异步处理设备报警信息
  4. 实时推送机制:通过WebSocket/SSE实时推送报警信息给管理用户
  5. 多级缓存架构:Redis缓存 + 数据库持久化
  6. 性能优化:批量处理、异步处理、消息队列优化

该架构具有以下优势:

  • 高性能:消息队列异步处理,支持高并发
  • 高可用:消息队列保证消息不丢失,支持重试
  • 实时性:WebSocket/SSE实时推送,延迟<100ms
  • 可扩展:微服务架构,支持水平扩展
  • 可靠性:消息持久化,支持离线消息

通过本文的实战代码,可以快速搭建一个高性能的设备报警数据处理系统。