服务与监控Java微服务后端架构实战

1. 架构概述

服务与监控系统是IoT平台的核心数据服务模块,包含数据服务(柜控-service)和数据监控(柜控-monitor)两个主要服务。数据服务负责提供设备实时和历史数据服务,支持动态切换实时库与历史库,以及原报文解析。数据监控负责实时信息订阅(WebSocket)、心跳信息获取和设备下发接口。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用、可扩展的服务与监控系统。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
graph LR
A[数据服务(柜控-service)] --> B[数据监控(柜控-monitor)]
B --> C[WebSocket服务]
B --> D[动态数据源管理器]
B --> E[原报文解析服务]
B --> F[心跳监控服务]
B --> G[设备下发服务]
B --> H[消息队列(Kafka)]
B --> I[Redis]
B --> J[Cassandra]

1.2 核心组件

  • 数据服务(柜控-service):负责设备实时和历史数据服务提供、动态数据库切换、原报文解析
  • 数据监控(柜控-monitor):负责实时信息订阅(WebSocket)、心跳信息获取、设备下发接口
  • WebSocket服务:负责实时信息推送、客户端连接管理
  • 动态数据源管理器:负责实时库(Redis)和历史库(Cassandra)的动态切换
  • 原报文解析服务:负责设备原始报文的解析和转换
  • 心跳监控服务:负责设备心跳信息的收集和监控
  • 设备下发服务:负责设备指令的下发和管理
  • 消息队列(Kafka):负责应用事件、心跳信息、实时信息的异步处理
  • Redis:存储实时信息和心跳信息
  • Cassandra:存储历史信息

2. 数据服务(柜控-service)实现

2.1 数据服务核心控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 数据服务控制器
* 提供设备实时、历史数据服务
*/
@RestController
@RequestMapping("/api/data")
@Slf4j
public class DataServiceController {

@Autowired
private DeviceDataService deviceDataService;

@Autowired
private MessageParseService messageParseService;

@Autowired
private DynamicDataSourceManager dataSourceManager;

/**
* 获取设备实时数据
* 流程:查询实时库(Redis) → 返回实时数据
*/
@GetMapping("/realtime/{deviceId}")
public Result<DeviceRealtimeData> getRealtimeData(
@PathVariable String deviceId,
@RequestParam(required = false) String dataType) {

try {
// 切换到实时库
dataSourceManager.switchToRealtime();

// 查询实时数据
DeviceRealtimeData data = deviceDataService.getRealtimeData(deviceId, dataType);

return Result.success(data);

} catch (Exception e) {
log.error("获取设备实时数据失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
return Result.error("获取实时数据失败: " + e.getMessage());
}
}

/**
* 获取设备历史数据
* 流程:查询历史库(Cassandra) → 返回历史数据
*/
@GetMapping("/history/{deviceId}")
public Result<List<DeviceHistoryData>> getHistoryData(
@PathVariable String deviceId,
@RequestParam String startTime,
@RequestParam String endTime,
@RequestParam(required = false) String dataType) {

try {
// 切换到历史库
dataSourceManager.switchToHistory();

// 查询历史数据
List<DeviceHistoryData> dataList = deviceDataService.getHistoryData(
deviceId, startTime, endTime, dataType);

return Result.success(dataList);

} catch (Exception e) {
log.error("获取设备历史数据失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
return Result.error("获取历史数据失败: " + e.getMessage());
}
}

/**
* 解析原报文
* 流程:接收原报文 → 解析报文 → 保存数据 → 返回解析结果
*/
@PostMapping("/parse/message")
public Result<MessageParseResult> parseMessage(
@RequestBody @Valid MessageParseRequest request) {

try {
// 解析原报文
MessageParseResult result = messageParseService.parseMessage(
request.getDeviceId(),
request.getRawMessage(),
request.getMessageType());

return Result.success(result);

} catch (Exception e) {
log.error("解析原报文失败: deviceId={}, error={}",
request.getDeviceId(), e.getMessage(), e);
return Result.error("解析报文失败: " + e.getMessage());
}
}
}

2.2 设备数据服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/**
* 设备数据服务
* 负责设备实时和历史数据查询
*/
@Service
@Slf4j
public class DeviceDataService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CassandraTemplate cassandraTemplate;

@Autowired
private DynamicDataSourceManager dataSourceManager;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 获取设备实时数据
*/
public DeviceRealtimeData getRealtimeData(String deviceId, String dataType) {
try {
// 切换到实时库
dataSourceManager.switchToRealtime();

// 从Redis查询实时数据
String cacheKey = buildRealtimeCacheKey(deviceId, dataType);
DeviceRealtimeData data = (DeviceRealtimeData) redisTemplate.opsForValue().get(cacheKey);

if (data == null) {
// 如果缓存中没有,返回空数据
data = new DeviceRealtimeData();
data.setDeviceId(deviceId);
data.setDataType(dataType);
data.setDataAvailable(false);
} else {
data.setDataAvailable(true);
}

return data;

} catch (Exception e) {
log.error("获取设备实时数据失败: deviceId={}, dataType={}, error={}",
deviceId, dataType, e.getMessage(), e);
throw new BusinessException("获取实时数据失败: " + e.getMessage());
}
}

/**
* 获取设备历史数据
*/
public List<DeviceHistoryData> getHistoryData(String deviceId, String startTime,
String endTime, String dataType) {
try {
// 切换到历史库
dataSourceManager.switchToHistory();

// 从Cassandra查询历史数据
String cql = "SELECT * FROM device_history_data " +
"WHERE device_id = ? AND data_time >= ? AND data_time <= ?";

List<Object> params = new ArrayList<>();
params.add(deviceId);
params.add(LocalDateTime.parse(startTime));
params.add(LocalDateTime.parse(endTime));

if (dataType != null && !dataType.isEmpty()) {
cql += " AND data_type = ?";
params.add(dataType);
}

cql += " ORDER BY data_time DESC";

List<DeviceHistoryData> dataList = cassandraTemplate.select(cql,
params.toArray(), DeviceHistoryData.class);

return dataList;

} catch (Exception e) {
log.error("获取设备历史数据失败: deviceId={}, startTime={}, endTime={}, error={}",
deviceId, startTime, endTime, e.getMessage(), e);
throw new BusinessException("获取历史数据失败: " + e.getMessage());
}
}

/**
* 保存实时数据
*/
public void saveRealtimeData(String deviceId, DeviceRealtimeData data) {
try {
// 切换到实时库
dataSourceManager.switchToRealtime();

// 保存到Redis
String cacheKey = buildRealtimeCacheKey(deviceId, data.getDataType());
redisTemplate.opsForValue().set(cacheKey, data, 1, TimeUnit.HOURS);

// 发送实时数据事件
sendRealtimeDataEvent(deviceId, data);

log.debug("保存设备实时数据成功: deviceId={}, dataType={}",
deviceId, data.getDataType());

} catch (Exception e) {
log.error("保存设备实时数据失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 保存历史数据
*/
public void saveHistoryData(String deviceId, DeviceHistoryData data) {
try {
// 切换到历史库
dataSourceManager.switchToHistory();

// 保存到Cassandra
cassandraTemplate.insert(data);

log.debug("保存设备历史数据成功: deviceId={}, dataTime={}",
deviceId, data.getDataTime());

} catch (Exception e) {
log.error("保存设备历史数据失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 构建实时数据缓存键
*/
private String buildRealtimeCacheKey(String deviceId, String dataType) {
if (dataType == null || dataType.isEmpty()) {
return "device:realtime:" + deviceId;
}
return "device:realtime:" + deviceId + ":" + dataType;
}

/**
* 发送实时数据事件
*/
private void sendRealtimeDataEvent(String deviceId, DeviceRealtimeData data) {
try {
DeviceRealtimeDataEvent event = new DeviceRealtimeDataEvent();
event.setDeviceId(deviceId);
event.setDataType(data.getDataType());
event.setDataContent(data.getDataContent());
event.setUpdateTime(LocalDateTime.now());

kafkaTemplate.send("device.realtime.data", deviceId,
JSON.toJSONString(event));

} catch (Exception e) {
log.error("发送实时数据事件失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}
}

2.3 动态数据源管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 动态数据源管理器
* 负责实时库(Redis)和历史库(Cassandra)的动态切换
*/
@Service
@Slf4j
public class DynamicDataSourceManager {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CassandraTemplate cassandraTemplate;

private static final ThreadLocal<String> dataSourceContext = new ThreadLocal<>();

/**
* 切换到实时库
*/
public void switchToRealtime() {
dataSourceContext.set("REALTIME");
log.debug("切换到实时库(Redis)");
}

/**
* 切换到历史库
*/
public void switchToHistory() {
dataSourceContext.set("HISTORY");
log.debug("切换到历史库(Cassandra)");
}

/**
* 获取当前数据源类型
*/
public String getCurrentDataSource() {
return dataSourceContext.get();
}

/**
* 清除数据源上下文
*/
public void clearDataSource() {
dataSourceContext.remove();
}

/**
* 执行实时库操作
*/
public <T> T executeOnRealtime(Supplier<T> operation) {
try {
switchToRealtime();
return operation.get();
} finally {
clearDataSource();
}
}

/**
* 执行历史库操作
*/
public <T> T executeOnHistory(Supplier<T> operation) {
try {
switchToHistory();
return operation.get();
} finally {
clearDataSource();
}
}
}

2.4 原报文解析服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
* 原报文解析服务
* 负责设备原始报文的解析和转换
*/
@Service
@Slf4j
public class MessageParseService {

@Autowired
private DeviceDataService deviceDataService;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

private final Map<String, MessageParser> parserMap = new ConcurrentHashMap<>();

/**
* 解析原报文
*/
public MessageParseResult parseMessage(String deviceId, String rawMessage,
String messageType) {
try {
// 1. 获取对应的解析器
MessageParser parser = getParser(messageType);
if (parser == null) {
throw new BusinessException("不支持的报文类型: " + messageType);
}

// 2. 解析报文
ParsedMessage parsedMessage = parser.parse(rawMessage);

// 3. 保存解析后的数据
saveParsedData(deviceId, parsedMessage);

// 4. 发送解析事件
sendParseEvent(deviceId, parsedMessage);

// 5. 构建返回结果
MessageParseResult result = new MessageParseResult();
result.setSuccess(true);
result.setDeviceId(deviceId);
result.setMessageType(messageType);
result.setParsedData(parsedMessage.getData());
result.setParseTime(LocalDateTime.now());
result.setMessage("报文解析成功");

log.info("原报文解析成功: deviceId={}, messageType={}",
deviceId, messageType);

return result;

} catch (Exception e) {
log.error("原报文解析失败: deviceId={}, messageType={}, error={}",
deviceId, messageType, e.getMessage(), e);
throw new BusinessException("报文解析失败: " + e.getMessage());
}
}

/**
* 获取解析器
*/
private MessageParser getParser(String messageType) {
return parserMap.computeIfAbsent(messageType, type -> {
switch (type) {
case "JSON":
return new JsonMessageParser();
case "XML":
return new XmlMessageParser();
case "BINARY":
return new BinaryMessageParser();
case "PROTOBUF":
return new ProtobufMessageParser();
default:
return null;
}
});
}

/**
* 保存解析后的数据
*/
private void saveParsedData(String deviceId, ParsedMessage parsedMessage) {
// 保存实时数据
DeviceRealtimeData realtimeData = convertToRealtimeData(deviceId, parsedMessage);
deviceDataService.saveRealtimeData(deviceId, realtimeData);

// 保存历史数据
DeviceHistoryData historyData = convertToHistoryData(deviceId, parsedMessage);
deviceDataService.saveHistoryData(deviceId, historyData);
}

/**
* 发送解析事件
*/
private void sendParseEvent(String deviceId, ParsedMessage parsedMessage) {
try {
MessageParseEvent event = new MessageParseEvent();
event.setDeviceId(deviceId);
event.setMessageType(parsedMessage.getMessageType());
event.setParsedData(parsedMessage.getData());
event.setParseTime(LocalDateTime.now());

kafkaTemplate.send("message.parse.event", deviceId,
JSON.toJSONString(event));

} catch (Exception e) {
log.error("发送解析事件失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 转换为实时数据
*/
private DeviceRealtimeData convertToRealtimeData(String deviceId,
ParsedMessage parsedMessage) {
DeviceRealtimeData data = new DeviceRealtimeData();
data.setDeviceId(deviceId);
data.setDataType(parsedMessage.getDataType());
data.setDataContent(parsedMessage.getData());
data.setUpdateTime(LocalDateTime.now());
return data;
}

/**
* 转换为历史数据
*/
private DeviceHistoryData convertToHistoryData(String deviceId,
ParsedMessage parsedMessage) {
DeviceHistoryData data = new DeviceHistoryData();
data.setDeviceId(deviceId);
data.setDataType(parsedMessage.getDataType());
data.setDataContent(parsedMessage.getData());
data.setDataTime(LocalDateTime.now());
return data;
}
}

/**
* 报文解析器接口
*/
public interface MessageParser {
ParsedMessage parse(String rawMessage);
}

/**
* JSON报文解析器
*/
@Component
public class JsonMessageParser implements MessageParser {

@Override
public ParsedMessage parse(String rawMessage) {
try {
Map<String, Object> data = JSON.parseObject(rawMessage, Map.class);

ParsedMessage parsedMessage = new ParsedMessage();
parsedMessage.setMessageType("JSON");
parsedMessage.setDataType((String) data.get("dataType"));
parsedMessage.setData(data);

return parsedMessage;

} catch (Exception e) {
throw new BusinessException("JSON报文解析失败: " + e.getMessage());
}
}
}

/**
* XML报文解析器
*/
@Component
public class XmlMessageParser implements MessageParser {

@Override
public ParsedMessage parse(String rawMessage) {
try {
// XML解析实现
Document document = DocumentHelper.parseText(rawMessage);
Element root = document.getRootElement();

Map<String, Object> data = new HashMap<>();
data.put("dataType", root.elementText("dataType"));
data.put("content", root.elementText("content"));

ParsedMessage parsedMessage = new ParsedMessage();
parsedMessage.setMessageType("XML");
parsedMessage.setDataType(root.elementText("dataType"));
parsedMessage.setData(data);

return parsedMessage;

} catch (Exception e) {
throw new BusinessException("XML报文解析失败: " + e.getMessage());
}
}
}

2.5 Kafka消息消费者(应用事件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 应用事件消费者
* 消费Kafka应用事件
*/
@Component
@Slf4j
public class ApplicationEventConsumer {

@Autowired
private DeviceDataService deviceDataService;

/**
* 消费应用事件
*/
@KafkaListener(topics = "application.event", groupId = "data-service-group")
public void consumeApplicationEvent(String message) {
try {
ApplicationEvent event = JSON.parseObject(message, ApplicationEvent.class);

// 根据事件类型处理
switch (event.getEventType()) {
case "DEVICE_DATA_UPDATE":
handleDeviceDataUpdate(event);
break;
case "DEVICE_STATUS_CHANGE":
handleDeviceStatusChange(event);
break;
default:
log.warn("未知的应用事件类型: eventType={}", event.getEventType());
}

} catch (Exception e) {
log.error("消费应用事件失败: error={}", e.getMessage(), e);
}
}

/**
* 处理设备数据更新事件
*/
private void handleDeviceDataUpdate(ApplicationEvent event) {
try {
Map<String, Object> eventData = event.getEventData();
String deviceId = (String) eventData.get("deviceId");
DeviceRealtimeData data = JSON.parseObject(
JSON.toJSONString(eventData.get("data")), DeviceRealtimeData.class);

// 保存实时数据
deviceDataService.saveRealtimeData(deviceId, data);

log.debug("处理设备数据更新事件成功: deviceId={}", deviceId);

} catch (Exception e) {
log.error("处理设备数据更新事件失败: error={}", e.getMessage(), e);
}
}

/**
* 处理设备状态变更事件
*/
private void handleDeviceStatusChange(ApplicationEvent event) {
try {
Map<String, Object> eventData = event.getEventData();
String deviceId = (String) eventData.get("deviceId");
String status = (String) eventData.get("status");

// 更新设备状态
updateDeviceStatus(deviceId, status);

log.debug("处理设备状态变更事件成功: deviceId={}, status={}",
deviceId, status);

} catch (Exception e) {
log.error("处理设备状态变更事件失败: error={}", e.getMessage(), e);
}
}

/**
* 更新设备状态
*/
private void updateDeviceStatus(String deviceId, String status) {
// 更新设备状态的实现
log.info("更新设备状态: deviceId={}, status={}", deviceId, status);
}
}

3. 数据监控(柜控-monitor)实现

3.1 数据监控核心控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 数据监控控制器
* 提供实时信息订阅、心跳信息获取、设备下发接口
*/
@RestController
@RequestMapping("/api/monitor")
@Slf4j
public class DataMonitorController {

@Autowired
private RealtimeSubscriptionService subscriptionService;

@Autowired
private HeartbeatService heartbeatService;

@Autowired
private DeviceCommandService deviceCommandService;

/**
* 建立WebSocket连接(实时信息订阅)
*/
@GetMapping("/websocket/connect")
public void connectWebSocket(HttpServletRequest request, HttpServletResponse response) {
// WebSocket连接处理在WebSocketConfig中配置
}

/**
* 获取心跳信息
* 流程:查询Redis心跳信息 → 返回心跳数据
*/
@GetMapping("/heartbeat/{deviceId}")
public Result<HeartbeatInfo> getHeartbeatInfo(@PathVariable String deviceId) {
try {
HeartbeatInfo heartbeatInfo = heartbeatService.getHeartbeatInfo(deviceId);
return Result.success(heartbeatInfo);

} catch (Exception e) {
log.error("获取心跳信息失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
return Result.error("获取心跳信息失败: " + e.getMessage());
}
}

/**
* 获取所有设备心跳信息
*/
@GetMapping("/heartbeat/list")
public Result<List<HeartbeatInfo>> getAllHeartbeatInfo() {
try {
List<HeartbeatInfo> heartbeatList = heartbeatService.getAllHeartbeatInfo();
return Result.success(heartbeatList);

} catch (Exception e) {
log.error("获取所有心跳信息失败: error={}", e.getMessage(), e);
return Result.error("获取心跳信息失败: " + e.getMessage());
}
}

/**
* 设备下发接口
* 流程:接收下发请求 → 下发设备指令 → 返回下发结果
*/
@PostMapping("/device/command/send")
public Result<DeviceCommandResult> sendDeviceCommand(
@RequestBody @Valid DeviceCommandRequest request) {

try {
DeviceCommandResult result = deviceCommandService.sendCommand(request);
return Result.success(result);

} catch (Exception e) {
log.error("设备指令下发失败: deviceId={}, error={}",
request.getDeviceId(), e.getMessage(), e);
return Result.error("指令下发失败: " + e.getMessage());
}
}

/**
* 查询设备指令状态
*/
@GetMapping("/device/command/{commandId}")
public Result<DeviceCommandStatus> getCommandStatus(@PathVariable String commandId) {
try {
DeviceCommandStatus status = deviceCommandService.getCommandStatus(commandId);
return Result.success(status);

} catch (Exception e) {
log.error("查询设备指令状态失败: commandId={}, error={}",
commandId, e.getMessage(), e);
return Result.error("查询指令状态失败: " + e.getMessage());
}
}
}

3.2 WebSocket实时信息订阅服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
* WebSocket实时信息订阅服务
* 负责实时信息推送、客户端连接管理
*/
@Component
@Slf4j
public class RealtimeSubscriptionService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<String>> deviceSubscriptions = new ConcurrentHashMap<>();

/**
* 处理WebSocket连接
*/
public void handleConnection(WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);

log.info("WebSocket连接建立: sessionId={}", sessionId);
}

/**
* 处理WebSocket断开
*/
public void handleDisconnection(WebSocketSession session) {
String sessionId = session.getId();
sessions.remove(sessionId);

// 清除订阅关系
deviceSubscriptions.remove(sessionId);

log.info("WebSocket连接断开: sessionId={}", sessionId);
}

/**
* 处理订阅请求
*/
public void handleSubscribe(String sessionId, String deviceId) {
try {
// 添加订阅关系
deviceSubscriptions.computeIfAbsent(sessionId, k -> new HashSet<>())
.add(deviceId);

log.info("订阅设备实时信息: sessionId={}, deviceId={}", sessionId, deviceId);

} catch (Exception e) {
log.error("处理订阅请求失败: sessionId={}, deviceId={}, error={}",
sessionId, deviceId, e.getMessage(), e);
}
}

/**
* 处理取消订阅请求
*/
public void handleUnsubscribe(String sessionId, String deviceId) {
try {
Set<String> deviceIds = deviceSubscriptions.get(sessionId);
if (deviceIds != null) {
deviceIds.remove(deviceId);
}

log.info("取消订阅设备实时信息: sessionId={}, deviceId={}", sessionId, deviceId);

} catch (Exception e) {
log.error("处理取消订阅请求失败: sessionId={}, deviceId={}, error={}",
sessionId, deviceId, e.getMessage(), e);
}
}

/**
* 推送实时信息
*/
public void pushRealtimeData(String deviceId, DeviceRealtimeData data) {
try {
// 查找订阅该设备的会话
for (Map.Entry<String, Set<String>> entry : deviceSubscriptions.entrySet()) {
String sessionId = entry.getKey();
Set<String> deviceIds = entry.getValue();

if (deviceIds != null && deviceIds.contains(deviceId)) {
WebSocketSession session = sessions.get(sessionId);
if (session != null && session.isOpen()) {
// 发送实时数据
sendMessage(session, buildRealtimeDataMessage(deviceId, data));
}
}
}

} catch (Exception e) {
log.error("推送实时信息失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 发送消息
*/
private void sendMessage(WebSocketSession session, String message) {
try {
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
log.error("发送WebSocket消息失败: sessionId={}, error={}",
session.getId(), e.getMessage(), e);
}
}

/**
* 构建实时数据消息
*/
private String buildRealtimeDataMessage(String deviceId, DeviceRealtimeData data) {
Map<String, Object> message = new HashMap<>();
message.put("type", "REALTIME_DATA");
message.put("deviceId", deviceId);
message.put("data", data);
message.put("timestamp", LocalDateTime.now().toString());

return JSON.toJSONString(message);
}
}

/**
* WebSocket配置
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Autowired
private RealtimeSubscriptionService subscriptionService;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new RealtimeDataWebSocketHandler(subscriptionService),
"/api/monitor/websocket")
.setAllowedOrigins("*");
}
}

/**
* WebSocket处理器
*/
public class RealtimeDataWebSocketHandler extends TextWebSocketHandler {

private final RealtimeSubscriptionService subscriptionService;

public RealtimeDataWebSocketHandler(RealtimeSubscriptionService subscriptionService) {
this.subscriptionService = subscriptionService;
}

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
subscriptionService.handleConnection(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
subscriptionService.handleDisconnection(session);
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
try {
// 解析消息
Map<String, Object> msg = JSON.parseObject(message.getPayload(), Map.class);
String action = (String) msg.get("action");
String deviceId = (String) msg.get("deviceId");

if ("SUBSCRIBE".equals(action)) {
subscriptionService.handleSubscribe(session.getId(), deviceId);
} else if ("UNSUBSCRIBE".equals(action)) {
subscriptionService.handleUnsubscribe(session.getId(), deviceId);
}

} catch (Exception e) {
log.error("处理WebSocket消息失败: sessionId={}, error={}",
session.getId(), e.getMessage(), e);
}
}
}

3.3 心跳信息服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
* 心跳信息服务
* 负责心跳信息的收集和监控
*/
@Service
@Slf4j
public class HeartbeatService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 获取心跳信息
*/
public HeartbeatInfo getHeartbeatInfo(String deviceId) {
try {
// 从Redis查询心跳信息
String heartbeatKey = "device:heartbeat:" + deviceId;
HeartbeatInfo heartbeatInfo = (HeartbeatInfo) redisTemplate.opsForValue()
.get(heartbeatKey);

if (heartbeatInfo == null) {
// 如果不存在,返回默认值
heartbeatInfo = new HeartbeatInfo();
heartbeatInfo.setDeviceId(deviceId);
heartbeatInfo.setOnline(false);
heartbeatInfo.setLastHeartbeatTime(null);
}

return heartbeatInfo;

} catch (Exception e) {
log.error("获取心跳信息失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
throw new BusinessException("获取心跳信息失败: " + e.getMessage());
}
}

/**
* 获取所有设备心跳信息
*/
public List<HeartbeatInfo> getAllHeartbeatInfo() {
try {
// 从Redis查询所有心跳信息
Set<String> keys = redisTemplate.keys("device:heartbeat:*");

List<HeartbeatInfo> heartbeatList = new ArrayList<>();
if (keys != null && !keys.isEmpty()) {
for (String key : keys) {
HeartbeatInfo heartbeatInfo = (HeartbeatInfo) redisTemplate.opsForValue()
.get(key);
if (heartbeatInfo != null) {
heartbeatList.add(heartbeatInfo);
}
}
}

return heartbeatList;

} catch (Exception e) {
log.error("获取所有心跳信息失败: error={}", e.getMessage(), e);
throw new BusinessException("获取心跳信息失败: " + e.getMessage());
}
}

/**
* 更新心跳信息
*/
public void updateHeartbeatInfo(String deviceId) {
try {
HeartbeatInfo heartbeatInfo = new HeartbeatInfo();
heartbeatInfo.setDeviceId(deviceId);
heartbeatInfo.setOnline(true);
heartbeatInfo.setLastHeartbeatTime(LocalDateTime.now());

// 保存到Redis
String heartbeatKey = "device:heartbeat:" + deviceId;
redisTemplate.opsForValue().set(heartbeatKey, heartbeatInfo,
10, TimeUnit.MINUTES);

// 发送心跳事件
sendHeartbeatEvent(deviceId, heartbeatInfo);

log.debug("更新心跳信息成功: deviceId={}", deviceId);

} catch (Exception e) {
log.error("更新心跳信息失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 发送心跳事件
*/
private void sendHeartbeatEvent(String deviceId, HeartbeatInfo heartbeatInfo) {
try {
HeartbeatEvent event = new HeartbeatEvent();
event.setDeviceId(deviceId);
event.setOnline(heartbeatInfo.isOnline());
event.setLastHeartbeatTime(heartbeatInfo.getLastHeartbeatTime());
event.setEventTime(LocalDateTime.now());

kafkaTemplate.send("heartbeat.event", deviceId,
JSON.toJSONString(event));

} catch (Exception e) {
log.error("发送心跳事件失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}
}

/**
* 心跳信息消费者
* 消费Kafka心跳信息
*/
@Component
@Slf4j
public class HeartbeatConsumer {

@Autowired
private HeartbeatService heartbeatService;

/**
* 消费心跳信息
*/
@KafkaListener(topics = "heartbeat.info", groupId = "monitor-service-group")
public void consumeHeartbeatInfo(String message) {
try {
HeartbeatMessage heartbeatMessage = JSON.parseObject(message, HeartbeatMessage.class);

// 更新心跳信息
heartbeatService.updateHeartbeatInfo(heartbeatMessage.getDeviceId());

log.debug("消费心跳信息成功: deviceId={}", heartbeatMessage.getDeviceId());

} catch (Exception e) {
log.error("消费心跳信息失败: error={}", e.getMessage(), e);
}
}
}

3.4 设备下发服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/**
* 设备下发服务
* 负责设备指令的下发和管理
*/
@Service
@Slf4j
public class DeviceCommandService {

@Autowired
private DeviceCommandMapper deviceCommandMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 下发设备指令
*/
public DeviceCommandResult sendCommand(DeviceCommandRequest request) {
try {
// 1. 创建设备指令记录
DeviceCommand command = createDeviceCommand(request);
deviceCommandMapper.insert(command);

// 2. 发送指令到Kafka
sendCommandToKafka(command);

// 3. 缓存指令状态
cacheCommandStatus(command);

// 4. 构建返回结果
DeviceCommandResult result = new DeviceCommandResult();
result.setSuccess(true);
result.setCommandId(command.getId());
result.setCommandNo(command.getCommandNo());
result.setStatus("SENT");
result.setMessage("指令下发成功");

log.info("设备指令下发成功: commandId={}, deviceId={}, commandType={}",
command.getId(), request.getDeviceId(), request.getCommandType());

return result;

} catch (Exception e) {
log.error("设备指令下发失败: deviceId={}, commandType={}, error={}",
request.getDeviceId(), request.getCommandType(), e.getMessage(), e);
return DeviceCommandResult.failed("指令下发失败: " + e.getMessage());
}
}

/**
* 查询指令状态
*/
public DeviceCommandStatus getCommandStatus(String commandId) {
try {
// 1. 从缓存查询
String cacheKey = "device:command:status:" + commandId;
DeviceCommandStatus status = (DeviceCommandStatus) redisTemplate.opsForValue()
.get(cacheKey);

if (status != null) {
return status;
}

// 2. 从数据库查询
DeviceCommand command = deviceCommandMapper.selectById(Long.parseLong(commandId));
if (command == null) {
throw new BusinessException("指令不存在");
}

status = new DeviceCommandStatus();
status.setCommandId(command.getId());
status.setCommandNo(command.getCommandNo());
status.setStatus(command.getStatus());
status.setResponseData(command.getResponseData());
status.setErrorMessage(command.getErrorMessage());

return status;

} catch (Exception e) {
log.error("查询指令状态失败: commandId={}, error={}",
commandId, e.getMessage(), e);
throw new BusinessException("查询指令状态失败: " + e.getMessage());
}
}

/**
* 创建设备指令
*/
private DeviceCommand createDeviceCommand(DeviceCommandRequest request) {
DeviceCommand command = new DeviceCommand();
command.setCommandNo(generateCommandNo());
command.setDeviceId(request.getDeviceId());
command.setCommandType(request.getCommandType());
command.setCommandData(request.getCommandData());
command.setStatus("PENDING");
command.setCreateTime(LocalDateTime.now());
command.setUpdateTime(LocalDateTime.now());

return command;
}

/**
* 发送指令到Kafka
*/
private void sendCommandToKafka(DeviceCommand command) {
try {
DeviceCommandMessage message = new DeviceCommandMessage();
message.setCommandId(command.getId());
message.setCommandNo(command.getCommandNo());
message.setDeviceId(command.getDeviceId());
message.setCommandType(command.getCommandType());
message.setCommandData(command.getCommandData());
message.setCreateTime(command.getCreateTime());

kafkaTemplate.send("device.command.send", command.getDeviceId(),
JSON.toJSONString(message));

} catch (Exception e) {
log.error("发送指令到Kafka失败: commandId={}, error={}",
command.getId(), e.getMessage(), e);
}
}

/**
* 缓存指令状态
*/
private void cacheCommandStatus(DeviceCommand command) {
try {
DeviceCommandStatus status = new DeviceCommandStatus();
status.setCommandId(command.getId());
status.setCommandNo(command.getCommandNo());
status.setStatus(command.getStatus());

String cacheKey = "device:command:status:" + command.getId();
redisTemplate.opsForValue().set(cacheKey, status,
24, TimeUnit.HOURS);

} catch (Exception e) {
log.error("缓存指令状态失败: commandId={}, error={}",
command.getId(), e.getMessage(), e);
}
}

/**
* 生成指令编号
*/
private String generateCommandNo() {
return "CMD_" + System.currentTimeMillis() + "_" +
UUID.randomUUID().toString().substring(0, 8);
}
}

/**
* 实时信息订阅消费者
* 消费Kafka实时信息
*/
@Component
@Slf4j
public class RealtimeDataConsumer {

@Autowired
private RealtimeSubscriptionService subscriptionService;

/**
* 消费实时信息
*/
@KafkaListener(topics = "realtime.data.subscription", groupId = "monitor-service-group")
public void consumeRealtimeData(String message) {
try {
RealtimeDataMessage dataMessage = JSON.parseObject(message, RealtimeDataMessage.class);

// 推送实时信息到WebSocket客户端
DeviceRealtimeData data = dataMessage.getData();
subscriptionService.pushRealtimeData(dataMessage.getDeviceId(), data);

log.debug("消费实时信息成功: deviceId={}", dataMessage.getDeviceId());

} catch (Exception e) {
log.error("消费实时信息失败: error={}", e.getMessage(), e);
}
}
}

/**
* 应用事件消费者(生产消费)
*/
@Component
@Slf4j
public class MonitorApplicationEventConsumer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 消费应用事件
*/
@KafkaListener(topics = "application.event", groupId = "monitor-service-group")
public void consumeApplicationEvent(String message) {
try {
ApplicationEvent event = JSON.parseObject(message, ApplicationEvent.class);

// 根据事件类型处理
switch (event.getEventType()) {
case "DEVICE_ONLINE":
handleDeviceOnline(event);
break;
case "DEVICE_OFFLINE":
handleDeviceOffline(event);
break;
default:
log.warn("未知的应用事件类型: eventType={}", event.getEventType());
}

} catch (Exception e) {
log.error("消费应用事件失败: error={}", e.getMessage(), e);
}
}

/**
* 处理设备上线事件
*/
private void handleDeviceOnline(ApplicationEvent event) {
try {
Map<String, Object> eventData = event.getEventData();
String deviceId = (String) eventData.get("deviceId");

// 发送设备上线通知
sendDeviceOnlineNotification(deviceId);

log.info("处理设备上线事件成功: deviceId={}", deviceId);

} catch (Exception e) {
log.error("处理设备上线事件失败: error={}", e.getMessage(), e);
}
}

/**
* 处理设备离线事件
*/
private void handleDeviceOffline(ApplicationEvent event) {
try {
Map<String, Object> eventData = event.getEventData();
String deviceId = (String) eventData.get("deviceId");

// 发送设备离线通知
sendDeviceOfflineNotification(deviceId);

log.info("处理设备离线事件成功: deviceId={}", deviceId);

} catch (Exception e) {
log.error("处理设备离线事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送设备上线通知
*/
private void sendDeviceOnlineNotification(String deviceId) {
try {
DeviceNotificationEvent event = new DeviceNotificationEvent();
event.setDeviceId(deviceId);
event.setNotificationType("DEVICE_ONLINE");
event.setNotificationTime(LocalDateTime.now());

kafkaTemplate.send("device.notification", deviceId,
JSON.toJSONString(event));

} catch (Exception e) {
log.error("发送设备上线通知失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}

/**
* 发送设备离线通知
*/
private void sendDeviceOfflineNotification(String deviceId) {
try {
DeviceNotificationEvent event = new DeviceNotificationEvent();
event.setDeviceId(deviceId);
event.setNotificationType("DEVICE_OFFLINE");
event.setNotificationTime(LocalDateTime.now());

kafkaTemplate.send("device.notification", deviceId,
JSON.toJSONString(event));

} catch (Exception e) {
log.error("发送设备离线通知失败: deviceId={}, error={}",
deviceId, e.getMessage(), e);
}
}
}

4. 数据模型定义

4.1 设备实时数据实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 设备实时数据实体
*/
@Data
public class DeviceRealtimeData {

/**
* 设备ID
*/
private String deviceId;

/**
* 数据类型
*/
private String dataType;

/**
* 数据内容
*/
private Map<String, Object> dataContent;

/**
* 更新时间
*/
private LocalDateTime updateTime;

/**
* 数据是否可用
*/
private boolean dataAvailable;
}

4.2 设备历史数据实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 设备历史数据实体
*/
@Data
@Table("device_history_data")
public class DeviceHistoryData {

/**
* 记录ID
*/
@PrimaryKey
private String id;

/**
* 设备ID
*/
@PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED)
private String deviceId;

/**
* 数据时间
*/
@PrimaryKeyColumn(ordinal = 1, type = PrimaryKeyType.CLUSTERED)
private LocalDateTime dataTime;

/**
* 数据类型
*/
private String dataType;

/**
* 数据内容
*/
private String dataContent;
}

4.3 心跳信息实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 心跳信息实体
*/
@Data
public class HeartbeatInfo {

/**
* 设备ID
*/
private String deviceId;

/**
* 是否在线
*/
private boolean online;

/**
* 最后心跳时间
*/
private LocalDateTime lastHeartbeatTime;
}

4.4 设备指令实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 设备指令实体
*/
@Data
@TableName("t_device_command")
public class DeviceCommand {

/**
* 指令ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 指令编号
*/
private String commandNo;

/**
* 设备ID
*/
private String deviceId;

/**
* 指令类型
*/
private String commandType;

/**
* 指令数据
*/
private String commandData;

/**
* 指令状态:PENDING-待发送, SENT-已发送, SUCCESS-成功, FAILED-失败
*/
private String status;

/**
* 响应数据
*/
private String responseData;

/**
* 错误信息
*/
private String errorMessage;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}

5. 数据库设计

5.1 设备指令表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE `t_device_command` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '指令ID',
`command_no` VARCHAR(64) NOT NULL COMMENT '指令编号',
`device_id` VARCHAR(64) NOT NULL COMMENT '设备ID',
`command_type` VARCHAR(32) NOT NULL COMMENT '指令类型',
`command_data` TEXT COMMENT '指令数据',
`status` VARCHAR(32) NOT NULL COMMENT '指令状态:PENDING-待发送, SENT-已发送, SUCCESS-成功, FAILED-失败',
`response_data` TEXT COMMENT '响应数据',
`error_message` VARCHAR(512) DEFAULT NULL COMMENT '错误信息',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_command_no` (`command_no`),
KEY `idx_device_id` (`device_id`),
KEY `idx_command_type` (`command_type`),
KEY `idx_status` (`status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令表';

5.2 Cassandra历史数据表

1
2
3
4
5
6
7
8
CREATE TABLE device_history_data (
id TEXT,
device_id TEXT,
data_time TIMESTAMP,
data_type TEXT,
data_content TEXT,
PRIMARY KEY (device_id, data_time)
) WITH CLUSTERING ORDER BY (data_time DESC);

6. 配置类

6.1 Redis配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# application.yml
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5

6.2 Cassandra配置

1
2
3
4
5
6
7
8
# application.yml
spring:
data:
cassandra:
contact-points: localhost
port: 9042
keyspace-name: device_data
cluster-name: Test Cluster

6.3 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: service-monitor-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
enable-auto-commit: true

6.4 WebSocket配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* WebSocket配置
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Autowired
private RealtimeSubscriptionService subscriptionService;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new RealtimeDataWebSocketHandler(subscriptionService),
"/api/monitor/websocket")
.setAllowedOrigins("*")
.withSockJS();
}
}

7. 性能优化策略

7.1 缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 数据缓存服务
*/
@Service
@Slf4j
public class DataCacheService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 缓存实时数据
*/
public void cacheRealtimeData(String deviceId, DeviceRealtimeData data) {
String cacheKey = "device:realtime:" + deviceId;
redisTemplate.opsForValue().set(cacheKey, data, 1, TimeUnit.HOURS);
}

/**
* 获取缓存的实时数据
*/
public DeviceRealtimeData getCachedRealtimeData(String deviceId) {
String cacheKey = "device:realtime:" + deviceId;
return (DeviceRealtimeData) redisTemplate.opsForValue().get(cacheKey);
}
}

7.2 批量处理优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 批量数据处理服务
*/
@Service
@Slf4j
public class BatchDataProcessService {

@Autowired
private CassandraTemplate cassandraTemplate;

private final List<DeviceHistoryData> batchBuffer = new ArrayList<>();
private final int BATCH_SIZE = 100;

/**
* 批量保存历史数据
*/
public synchronized void batchSave(DeviceHistoryData data) {
batchBuffer.add(data);

if (batchBuffer.size() >= BATCH_SIZE) {
flushBatch();
}
}

/**
* 刷新批次
*/
private void flushBatch() {
if (batchBuffer.isEmpty()) {
return;
}

try {
// 批量插入到Cassandra
cassandraTemplate.insert(batchBuffer);
batchBuffer.clear();

log.debug("批量保存历史数据成功: count={}", BATCH_SIZE);

} catch (Exception e) {
log.error("批量保存历史数据失败: error={}", e.getMessage(), e);
}
}

/**
* 定时刷新批次
*/
@Scheduled(fixedRate = 5000) // 每5秒刷新一次
public void scheduledFlush() {
flushBatch();
}
}

8. 监控告警

8.1 业务指标监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 业务指标监控
*/
@Component
@Slf4j
public class ServiceMonitorMetrics {

@Autowired
private MeterRegistry meterRegistry;

/**
* 记录实时数据查询
*/
public void recordRealtimeDataQuery(String deviceId) {
Counter.builder("realtime.data.query.total")
.tag("device_id", deviceId)
.register(meterRegistry)
.increment();
}

/**
* 记录历史数据查询
*/
public void recordHistoryDataQuery(String deviceId) {
Counter.builder("history.data.query.total")
.tag("device_id", deviceId)
.register(meterRegistry)
.increment();
}

/**
* 记录WebSocket连接
*/
public void recordWebSocketConnection() {
Counter.builder("websocket.connection.total")
.register(meterRegistry)
.increment();
}

/**
* 记录心跳信息查询
*/
public void recordHeartbeatQuery(String deviceId) {
Counter.builder("heartbeat.query.total")
.tag("device_id", deviceId)
.register(meterRegistry)
.increment();
}
}

8.2 告警规则配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# prometheus-alert-rules.yml
groups:
- name: service_monitor_alerts
rules:
- alert: RealtimeDataQueryFailureRateHigh
expr: rate(realtime_data_query_total{status="failed"}[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "实时数据查询失败率过高"
description: "实时数据查询失败率超过10%,当前值: {{ $value }}"

- alert: WebSocketConnectionFailureRateHigh
expr: rate(websocket_connection_total{status="failed"}[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "WebSocket连接失败率过高"
description: "WebSocket连接失败率超过10%,当前值: {{ $value }}"

9. 总结

本文深入讲解了服务与监控的Java微服务后端架构实战,涵盖了以下核心内容:

  1. 数据服务(柜控-service):提供设备实时和历史数据服务,支持动态切换实时库(Redis)和历史库(Cassandra),实现原报文解析
  2. 数据监控(柜控-monitor):实现实时信息订阅(WebSocket)、心跳信息获取、设备下发接口
  3. 动态数据源管理:实现实时库和历史库的动态切换,支持不同数据源的灵活访问
  4. 原报文解析:支持多种报文格式(JSON、XML、Binary、Protobuf)的解析和转换
  5. WebSocket实时推送:实现实时信息的WebSocket订阅和推送机制
  6. 心跳监控:实现设备心跳信息的收集、存储和查询
  7. 设备指令下发:实现设备指令的下发、状态跟踪和响应处理
  8. 消息队列集成:通过Kafka实现应用事件、心跳信息、实时信息的异步处理
  9. 性能优化:通过缓存、批量处理提升系统性能
  10. 监控告警:通过业务指标监控、告警规则保障系统稳定运行

通过本文的学习,读者可以掌握如何基于Java微服务架构实现一个高性能、高可用、可扩展的服务与监控系统,为实际项目开发提供参考和指导。