设备服务接入系统Java微服务后端架构实战

1. 架构概述

设备服务接入系统是IoT平台的核心基础设施,需要支持海量设备的接入、认证、连接管理、指令下发和数据上报。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用、可扩展的设备服务接入系统。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
物理设备 → 接入网关 → 设备服务 → 数据库/缓存

设备注册/认证

建立连接(MQTT/WebSocket)

心跳检测

设备指令下发

设备数据上报

设备状态管理

设备服务 → 消息队列 → 业务服务

设备数据转发

设备事件通知

1.2 核心组件

  • 接入网关(Access Gateway):负责设备连接接入、协议解析、消息路由
  • 设备服务(Device Service):负责设备注册、设备认证、设备管理、设备状态维护
  • MQTT Broker:负责MQTT协议连接管理和消息转发
  • 设备连接管理器(Device Connection Manager):负责设备连接状态管理、心跳检测
  • 设备指令服务(Device Command Service):负责设备指令下发、指令状态跟踪
  • 设备数据服务(Device Data Service):负责设备数据接收、数据解析、数据转发
  • 数据库(MySQL):持久化设备信息、设备连接记录、设备指令记录
  • 缓存(Redis):缓存设备连接状态、设备在线状态、设备会话信息
  • 消息队列(Kafka/RocketMQ):异步处理设备数据、设备事件通知

2. 接入网关服务实现

2.1 MQTT接入网关核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
/**
* MQTT接入网关
* 负责设备MQTT连接接入、协议解析、消息路由
*/
@Component
@Slf4j
public class MqttAccessGateway {

@Autowired
private DeviceServiceClient deviceServiceClient;

@Autowired
private DeviceConnectionManager deviceConnectionManager;

@Autowired
private DeviceCommandService deviceCommandService;

@Autowired
private DeviceDataService deviceDataService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 处理设备连接请求
* 流程:设备连接 → 设备认证 → 建立连接 → 注册设备连接
*/
public MqttConnectResult handleConnect(MqttConnectMessage message, Channel channel) {
try {
String clientId = message.payload().clientIdentifier();
String username = message.payload().userName();
String password = message.payload().passwordInBytes() != null ?
new String(message.payload().passwordInBytes()) : null;

// 1. 设备认证
DeviceAuthResult authResult = deviceServiceClient.authenticateDevice(
clientId, username, password);

if (!authResult.isSuccess()) {
log.warn("设备认证失败: clientId={}, username={}, reason={}",
clientId, username, authResult.getMessage());
return MqttConnectResult.failed(authResult.getMessage());
}

// 2. 注册设备连接
DeviceConnection connection = deviceConnectionManager.registerConnection(
clientId, channel, authResult.getDeviceInfo());

// 3. 更新设备在线状态
updateDeviceOnlineStatus(clientId, true);

// 4. 发送设备上线事件
sendDeviceOnlineEvent(authResult.getDeviceInfo());

log.info("设备连接成功: clientId={}, deviceId={}, deviceType={}",
clientId, authResult.getDeviceInfo().getDeviceId(),
authResult.getDeviceInfo().getDeviceType());

return MqttConnectResult.success(connection);

} catch (Exception e) {
log.error("处理设备连接失败: error={}", e.getMessage(), e);
return MqttConnectResult.failed("设备连接处理失败: " + e.getMessage());
}
}

/**
* 处理设备断开连接
*/
public void handleDisconnect(String clientId, Channel channel) {
try {
// 1. 移除设备连接
deviceConnectionManager.removeConnection(clientId, channel);

// 2. 更新设备离线状态
updateDeviceOnlineStatus(clientId, false);

// 3. 发送设备离线事件
DeviceInfo deviceInfo = deviceServiceClient.getDeviceInfo(clientId);
if (deviceInfo != null) {
sendDeviceOfflineEvent(deviceInfo);
}

log.info("设备断开连接: clientId={}", clientId);

} catch (Exception e) {
log.error("处理设备断开连接失败: clientId={}, error={}",
clientId, e.getMessage(), e);
}
}

/**
* 处理设备发布消息(数据上报)
*/
public void handlePublish(String clientId, String topic, MqttPublishMessage message) {
try {
// 1. 解析设备数据
byte[] payload = message.payload().array();
DeviceData deviceData = parseDeviceData(clientId, topic, payload);

// 2. 更新设备心跳时间
updateDeviceHeartbeat(clientId);

// 3. 处理设备数据
deviceDataService.processDeviceData(deviceData);

log.debug("处理设备数据上报: clientId={}, topic={}, dataSize={}",
clientId, topic, payload.length);

} catch (Exception e) {
log.error("处理设备数据上报失败: clientId={}, topic={}, error={}",
clientId, topic, e.getMessage(), e);
}
}

/**
* 处理设备订阅请求
*/
public void handleSubscribe(String clientId, MqttSubscribeMessage message) {
try {
// 1. 验证订阅权限
List<MqttTopicSubscription> subscriptions = message.payload().topicSubscriptions();
for (MqttTopicSubscription subscription : subscriptions) {
String topic = subscription.topicName();
if (!validateSubscribePermission(clientId, topic)) {
log.warn("设备订阅权限不足: clientId={}, topic={}", clientId, topic);
continue;
}

// 2. 记录订阅关系
recordSubscription(clientId, topic);
}

log.info("设备订阅成功: clientId={}, subscriptions={}",
clientId, subscriptions.size());

} catch (Exception e) {
log.error("处理设备订阅失败: clientId={}, error={}",
clientId, e.getMessage(), e);
}
}

/**
* 下发设备指令
*/
public void sendDeviceCommand(String clientId, DeviceCommand command) {
try {
// 1. 获取设备连接
DeviceConnection connection = deviceConnectionManager.getConnection(clientId);
if (connection == null || !connection.isActive()) {
throw new BusinessException("设备未连接: " + clientId);
}

// 2. 构建MQTT消息
String topic = buildCommandTopic(clientId, command.getCommandType());
MqttPublishMessage mqttMessage = buildMqttPublishMessage(topic, command);

// 3. 发送MQTT消息
connection.getChannel().writeAndFlush(mqttMessage);

// 4. 记录指令下发
deviceCommandService.recordCommandSent(clientId, command);

log.info("设备指令下发成功: clientId={}, commandId={}, commandType={}",
clientId, command.getCommandId(), command.getCommandType());

} catch (Exception e) {
log.error("设备指令下发失败: clientId={}, commandId={}, error={}",
clientId, command.getCommandId(), e.getMessage(), e);
throw new BusinessException("设备指令下发失败: " + e.getMessage());
}
}

/**
* 更新设备在线状态
*/
private void updateDeviceOnlineStatus(String clientId, boolean online) {
String statusKey = "device:status:" + clientId;
redisTemplate.opsForValue().set(statusKey, online ? "ONLINE" : "OFFLINE",
30, TimeUnit.MINUTES);
}

/**
* 更新设备心跳时间
*/
private void updateDeviceHeartbeat(String clientId) {
String heartbeatKey = "device:heartbeat:" + clientId;
redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(),
5, TimeUnit.MINUTES);
}

/**
* 发送设备上线事件
*/
private void sendDeviceOnlineEvent(DeviceInfo deviceInfo) {
DeviceOnlineEvent event = new DeviceOnlineEvent();
event.setDeviceId(deviceInfo.getDeviceId());
event.setDeviceType(deviceInfo.getDeviceType());
event.setOnlineTime(LocalDateTime.now());

kafkaTemplate.send("device.online", deviceInfo.getDeviceId(),
JSON.toJSONString(event));
}

/**
* 发送设备离线事件
*/
private void sendDeviceOfflineEvent(DeviceInfo deviceInfo) {
DeviceOfflineEvent event = new DeviceOfflineEvent();
event.setDeviceId(deviceInfo.getDeviceId());
event.setDeviceType(deviceInfo.getDeviceType());
event.setOfflineTime(LocalDateTime.now());

kafkaTemplate.send("device.offline", deviceInfo.getDeviceId(),
JSON.toJSONString(event));
}

/**
* 解析设备数据
*/
private DeviceData parseDeviceData(String clientId, String topic, byte[] payload) {
DeviceData deviceData = new DeviceData();
deviceData.setClientId(clientId);
deviceData.setTopic(topic);
deviceData.setPayload(payload);
deviceData.setReceiveTime(LocalDateTime.now());

// 根据topic解析数据
if (topic.contains("/data/")) {
// 数据上报
deviceData.setDataType("DATA_REPORT");
deviceData.setDataContent(JSON.parseObject(new String(payload), Map.class));
} else if (topic.contains("/status/")) {
// 状态上报
deviceData.setDataType("STATUS_REPORT");
deviceData.setDataContent(JSON.parseObject(new String(payload), Map.class));
} else if (topic.contains("/event/")) {
// 事件上报
deviceData.setDataType("EVENT_REPORT");
deviceData.setDataContent(JSON.parseObject(new String(payload), Map.class));
}

return deviceData;
}

/**
* 验证订阅权限
*/
private boolean validateSubscribePermission(String clientId, String topic) {
// 设备只能订阅自己的指令主题
return topic.startsWith("/device/" + clientId + "/command/");
}

/**
* 记录订阅关系
*/
private void recordSubscription(String clientId, String topic) {
String subscriptionKey = "device:subscription:" + clientId + ":" + topic;
redisTemplate.opsForValue().set(subscriptionKey, "SUBSCRIBED",
24, TimeUnit.HOURS);
}

/**
* 构建指令主题
*/
private String buildCommandTopic(String clientId, String commandType) {
return "/device/" + clientId + "/command/" + commandType;
}

/**
* 构建MQTT发布消息
*/
private MqttPublishMessage buildMqttPublishMessage(String topic, DeviceCommand command) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);

MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(
topic, 0);

byte[] payload = JSON.toJSONString(command).getBytes();

return new MqttPublishMessage(fixedHeader, variableHeader,
Unpooled.wrappedBuffer(payload));
}
}

2.2 Netty MQTT服务器启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* MQTT服务器启动类
*/
@Component
@Slf4j
public class MqttServer {

@Autowired
private MqttAccessGateway mqttAccessGateway;

@Value("${mqtt.server.port:1883}")
private int mqttPort;

@Value("${mqtt.server.boss-threads:1}")
private int bossThreads;

@Value("${mqtt.server.worker-threads:4}")
private int workerThreads;

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel serverChannel;

@PostConstruct
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(bossThreads);
workerGroup = new NioEventLoopGroup(workerThreads);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new MqttChannelInitializer(mqttAccessGateway));

ChannelFuture future = bootstrap.bind(mqttPort).sync();
serverChannel = future.channel();

log.info("MQTT服务器启动成功: port={}", mqttPort);
}

@PreDestroy
public void stop() {
if (serverChannel != null) {
serverChannel.close();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}

log.info("MQTT服务器已关闭");
}
}

/**
* MQTT通道初始化器
*/
public class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {

private final MqttAccessGateway mqttAccessGateway;

public MqttChannelInitializer(MqttAccessGateway mqttAccessGateway) {
this.mqttAccessGateway = mqttAccessGateway;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 创建一个ChannelPipeline对象,用于管理和执行一系列ChannelHandler
ChannelPipeline pipeline = ch.pipeline();

// MQTT编解码器
// 添加MQTT解码器到ChannelPipeline,设置最大消息大小为1MB
pipeline.addLast(new MqttDecoder(1024 * 1024));
// 添加MQTT编码器到ChannelPipeline
pipeline.addLast(new MqttEncoder());

// MQTT协议处理器
pipeline.addLast(new MqttProtocolHandler(mqttAccessGateway));

// 空闲检测
// 添加空闲状态处理器,设置读空闲时间为60秒,写空闲时间和所有空闲时间均为0
// 当连接在指定时间内没有读事件发生时,会触发空闲状态事件
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
// 添加自定义的MQTT空闲状态处理器,用于处理MQTT连接的空闲状态事件
// 当IdleStateHandler触发空闲事件时,此处理器会相应地处理连接状态
pipeline.addLast(new MqttIdleHandler());
}
}

3. 设备服务实现

3.1 设备认证服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/**
* 设备认证服务
* 负责设备注册、设备认证
*/
@Service
@Slf4j
public class DeviceAuthService {

@Autowired
private DeviceMapper deviceMapper;

@Autowired
private DeviceCredentialMapper deviceCredentialMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 设备认证
*/
public DeviceAuthResult authenticateDevice(String clientId, String username, String password) {
try {
// 1. 查询设备凭证
DeviceCredential credential = deviceCredentialMapper.selectByClientId(clientId);
if (credential == null) {
return DeviceAuthResult.failed("设备不存在");
}

// 2. 验证用户名
if (!credential.getUsername().equals(username)) {
return DeviceAuthResult.failed("用户名错误");
}

// 3. 验证密码
if (!validatePassword(password, credential.getPassword())) {
return DeviceAuthResult.failed("密码错误");
}

// 4. 查询设备信息
Device device = deviceMapper.selectById(credential.getDeviceId());
if (device == null) {
return DeviceAuthResult.failed("设备信息不存在");
}

// 5. 检查设备状态
if (!"ACTIVE".equals(device.getStatus())) {
return DeviceAuthResult.failed("设备未激活");
}

// 6. 构建设备信息
DeviceInfo deviceInfo = convertToDeviceInfo(device);

log.info("设备认证成功: clientId={}, deviceId={}", clientId, device.getDeviceId());

return DeviceAuthResult.success(deviceInfo);

} catch (Exception e) {
log.error("设备认证失败: clientId={}, error={}", clientId, e.getMessage(), e);
return DeviceAuthResult.failed("设备认证失败: " + e.getMessage());
}
}

/**
* 设备注册
*/
@Transactional(rollbackFor = Exception.class)
public DeviceRegisterResult registerDevice(DeviceRegisterRequest request) {
try {
// 1. 检查设备是否已存在
Device existingDevice = deviceMapper.selectByDeviceNo(request.getDeviceNo());
if (existingDevice != null) {
return DeviceRegisterResult.failed("设备已存在");
}

// 2. 创建设备信息
Device device = new Device();
device.setDeviceNo(request.getDeviceNo());
device.setDeviceName(request.getDeviceName());
device.setDeviceType(request.getDeviceType());
device.setManufacturer(request.getManufacturer());
device.setModel(request.getModel());
device.setStatus("INACTIVE");
device.setCreateTime(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());

deviceMapper.insert(device);

// 3. 创建设备凭证
String clientId = generateClientId(device.getId());
String username = generateUsername(device.getId());
String password = generatePassword();

DeviceCredential credential = new DeviceCredential();
credential.setDeviceId(device.getId());
credential.setClientId(clientId);
credential.setUsername(username);
credential.setPassword(encryptPassword(password));
credential.setCreateTime(LocalDateTime.now());
credential.setUpdateTime(LocalDateTime.now());

deviceCredentialMapper.insert(credential);

log.info("设备注册成功: deviceId={}, deviceNo={}, clientId={}",
device.getId(), device.getDeviceNo(), clientId);

DeviceRegisterResult result = new DeviceRegisterResult();
result.setSuccess(true);
result.setDeviceId(device.getId());
result.setClientId(clientId);
result.setUsername(username);
result.setPassword(password);
result.setMessage("设备注册成功");

return result;

} catch (Exception e) {
log.error("设备注册失败: deviceNo={}, error={}",
request.getDeviceNo(), e.getMessage(), e);
return DeviceRegisterResult.failed("设备注册失败: " + e.getMessage());
}
}

/**
* 验证密码
*/
private boolean validatePassword(String inputPassword, String storedPassword) {
// 使用BCrypt或其他加密算法验证密码
return BCrypt.checkpw(inputPassword, storedPassword);
}

/**
* 加密密码
*/
private String encryptPassword(String password) {
return BCrypt.hashpw(password, BCrypt.gensalt());
}

/**
* 生成客户端ID
*/
private String generateClientId(Long deviceId) {
return "DEVICE_" + deviceId + "_" + System.currentTimeMillis();
}

/**
* 生成用户名
*/
private String generateUsername(Long deviceId) {
return "USER_" + deviceId;
}

/**
* 生成密码
*/
private String generatePassword() {
return UUID.randomUUID().toString().replace("-", "");
}

/**
* 转换为设备信息
*/
private DeviceInfo convertToDeviceInfo(Device device) {
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.setDeviceId(device.getId());
deviceInfo.setDeviceNo(device.getDeviceNo());
deviceInfo.setDeviceName(device.getDeviceName());
deviceInfo.setDeviceType(device.getDeviceType());
deviceInfo.setManufacturer(device.getManufacturer());
deviceInfo.setModel(device.getModel());
deviceInfo.setStatus(device.getStatus());
return deviceInfo;
}
}

3.2 设备连接管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
* 设备连接管理器
* 负责设备连接状态管理、心跳检测
*/
@Service
@Slf4j
public class DeviceConnectionManager {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private DeviceMapper deviceMapper;

private final ConcurrentHashMap<String, DeviceConnection> connections = new ConcurrentHashMap<>();

/**
* 注册设备连接
*/
public DeviceConnection registerConnection(String clientId, Channel channel, DeviceInfo deviceInfo) {
/**
* 创建设备连接对象并设置相关属性
* 该代码块主要用于初始化一个设备连接实例,并配置其基本属性
*/
DeviceConnection connection = new DeviceConnection(); // 创建新的设备连接对象
// 设置客户端ID
connection.setClientId(clientId);
// 设置设备ID,从设备信息中获取
connection.setDeviceId(deviceInfo.getDeviceId());
// 设置通信通道
connection.setChannel(channel);
// 设置设备信息对象
connection.setDeviceInfo(deviceInfo);
// 设置连接时间为当前系统时间
connection.setConnectTime(LocalDateTime.now());
// 设置最后心跳时间为当前系统时间
connection.setLastHeartbeatTime(LocalDateTime.now());
// 设置连接状态为活跃
connection.setActive(true);

connections.put(clientId, connection);

// 缓存连接信息
String connectionKey = "device:connection:" + clientId;
redisTemplate.opsForValue().set(connectionKey, connection,
30, TimeUnit.MINUTES);

log.info("设备连接注册成功: clientId={}, deviceId={}",
clientId, deviceInfo.getDeviceId());

return connection;
}

/**
* 移除设备连接
*/
public void removeConnection(String clientId, Channel channel) {
DeviceConnection connection = connections.remove(clientId);
if (connection != null) {
// 将连接状态设置为非活跃状态
connection.setActive(false); // 设置连接的活跃状态为false,表示连接已断开或不再活跃
// 记录连接断开的具体时间
connection.setDisconnectTime(LocalDateTime.now()); // 使用当前系统时间设置连接的断开时间

// 删除缓存
String connectionKey = "device:connection:" + clientId;
redisTemplate.delete(connectionKey);

log.info("设备连接移除成功: clientId={}", clientId);
}
}

/**
* 获取设备连接
*/
public DeviceConnection getConnection(String clientId) {
// 1. 从内存获取
DeviceConnection connection = connections.get(clientId);
if (connection != null && connection.isActive()) {
return connection;
}

// 2. 从缓存获取
String connectionKey = "device:connection:" + clientId;
connection = (DeviceConnection) redisTemplate.opsForValue().get(connectionKey);

return connection;
}

/**
* 更新设备心跳
*/
public void updateHeartbeat(String clientId) {
DeviceConnection connection = connections.get(clientId);
if (connection != null) {
connection.setLastHeartbeatTime(LocalDateTime.now());

// 更新缓存
String connectionKey = "device:connection:" + clientId;
redisTemplate.opsForValue().set(connectionKey, connection,
30, TimeUnit.MINUTES);
}
}

/**
* 检查设备心跳超时
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkHeartbeatTimeout() {
try {
LocalDateTime now = LocalDateTime.now();
LocalDateTime timeoutThreshold = now.minusMinutes(5); // 5分钟超时

List<String> timeoutClients = new ArrayList<>();

for (Map.Entry<String, DeviceConnection> entry : connections.entrySet()) {
/**
* 检查设备连接的心跳时间,判断是否超时
* 遍历设备连接映射,将超时的客户端ID添加到超时列表中
*/
// 获取当前遍历项的客户端ID
String clientId = entry.getKey();
// 获取与客户端ID对应的设备连接对象
DeviceConnection connection = entry.getValue();

// 判断设备连接的最后心跳时间是否早于超时阈值时间
if (connection.getLastHeartbeatTime().isBefore(timeoutThreshold)) {
// 如果超时,将客户端ID添加到超时客户端列表中
timeoutClients.add(clientId);
}
}

// 处理超时设备
for (String clientId : timeoutClients) {
handleHeartbeatTimeout(clientId);
}

// 检查是否存在心跳超时的设备
if (!timeoutClients.isEmpty()) {
// 如果存在超时设备,输出警告日志
// 日志内容包括超时设备的数量和具体设备列表
log.warn("检测到心跳超时设备: count={}, clients={}",
timeoutClients.size(), timeoutClients);
}

} catch (Exception e) {
log.error("检查设备心跳超时失败: error={}", e.getMessage(), e);
}
}

/**
* 处理心跳超时
*/
/**
* 处理设备心跳超时的方法
* 当设备在一定时间内未发送心跳时,此方法会被调用以处理超时逻辑
* @param clientId 客户端ID,用于标识设备连接
*/
private void handleHeartbeatTimeout(String clientId) {
// 从连接映射中获取设备连接对象
DeviceConnection connection = connections.get(clientId);
if (connection != null) {
// 关闭连接
if (connection.getChannel() != null && connection.getChannel().isActive()) {
connection.getChannel().close();
}

// 移除连接
removeConnection(clientId, connection.getChannel());

// 更新设备状态为离线
updateDeviceOfflineStatus(connection.getDeviceId());

log.warn("设备心跳超时,断开连接: clientId={}, deviceId={}",
clientId, connection.getDeviceId());
}
}

/**
* 更新设备离线状态
*/
/**
* 更新设备离线状态
* @param deviceId 设备ID,用于标识需要更新状态的设备
*/
private void updateDeviceOfflineStatus(Long deviceId) {
// 根据设备ID查询设备信息
Device device = deviceMapper.selectById(deviceId);
// 判断设备是否存在
if (device != null) {
// 设置设备状态为离线
device.setStatus("OFFLINE");
// 更新设备修改时间为当前时间
device.setUpdateTime(LocalDateTime.now());
// 执行更新操作
deviceMapper.updateById(device);
}
}
}

4. 设备指令服务实现

4.1 设备指令服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
* 设备指令服务
* 负责设备指令下发、指令状态跟踪
*/
@Service
@Slf4j
public class DeviceCommandService {

@Autowired
private DeviceCommandMapper deviceCommandMapper;

@Autowired
private MqttAccessGateway mqttAccessGateway;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 下发设备指令
*/
public DeviceCommandResult sendCommand(DeviceCommandRequest request) {
try {
// 1. 创建设备指令记录
DeviceCommand command = createDeviceCommand(request);
deviceCommandMapper.insert(command);

// 2. 下发指令到设备
mqttAccessGateway.sendDeviceCommand(request.getClientId(), command);

// 3. 记录指令下发
recordCommandSent(command);

// 4. 构建返回结果
/**
* 创建设备命令结果对象
* 并设置命令的基本信息
*/
// 创建一个新的DeviceCommandResult对象
DeviceCommandResult result = new DeviceCommandResult();
// 设置命令执行结果为成功
result.setSuccess(true);
// 设置命令ID
result.setCommandId(command.getId());
// 设置命令编号
result.setCommandNo(command.getCommandNo());
// 设置命令状态为"已发送"
result.setStatus("SENT");
// 设置命令执行结果的消息
result.setMessage("指令下发成功");

log.info("设备指令下发成功: commandId={}, clientId={}, commandType={}",
command.getId(), request.getClientId(), request.getCommandType());

return result;

} catch (Exception e) {
log.error("设备指令下发失败: clientId={}, commandType={}, error={}",
request.getClientId(), request.getCommandType(), e.getMessage(), e);
return DeviceCommandResult.failed("指令下发失败: " + e.getMessage());
}
}

/**
* 处理设备指令响应
*/
@KafkaListener(topics = "device.command.response", groupId = "device-command-group")
public void handleCommandResponse(String message) {
try {
DeviceCommandResponse response = JSON.parseObject(message, DeviceCommandResponse.class);

// 1. 查询指令记录
DeviceCommand command = deviceCommandMapper.selectByCommandNo(
response.getCommandNo());
if (command == null) {
log.warn("设备指令记录不存在: commandNo={}", response.getCommandNo());
return;
}

// 2. 更新指令状态
// 设置命令执行状态,根据响应是否成功决定状态为"SUCCESS"或"FAILED"
command.setStatus(response.isSuccess() ? "SUCCESS" : "FAILED");
// 设置命令的响应时间为当前时间
command.setResponseTime(LocalDateTime.now());
// 设置命令的响应数据,从响应对象中获取
command.setResponseData(response.getResponseData());
// 设置命令的错误信息,从响应对象中获取
command.setErrorMessage(response.getErrorMessage());
// 更新命令的修改时间为当前时间
command.setUpdateTime(LocalDateTime.now());

deviceCommandMapper.updateById(command);

// 3. 发送指令响应事件
sendCommandResponseEvent(command, response);

log.info("设备指令响应处理成功: commandId={}, commandNo={}, status={}",
command.getId(), command.getCommandNo(), command.getStatus());

} catch (Exception e) {
log.error("处理设备指令响应失败: error={}", e.getMessage(), e);
}
}

/**
* 创建设备指令
*/
private DeviceCommand createDeviceCommand(DeviceCommandRequest request) {
/**
* 创建设备命令对象并设置相关属性
* 该代码块用于初始化一个设备命令对象,填充从请求中获取的各项信息,并设置初始状态
*/
DeviceCommand command = new DeviceCommand(); // 创建新的设备命令对象
// 设置命令的基本属性
command.setCommandNo(generateCommandNo()); // 生成并设置命令编号
command.setClientId(request.getClientId()); // 设置客户端ID
command.setDeviceId(request.getDeviceId()); // 设置设备ID
command.setCommandType(request.getCommandType()); // 设置命令类型
command.setCommandData(request.getCommandData()); // 设置命令数据
// 设置命令的状态和时间信息
command.setStatus("PENDING"); // 设置命令状态为"待处理"
command.setCreateTime(LocalDateTime.now()); // 设置创建时间为当前时间
command.setUpdateTime(LocalDateTime.now()); // 设置更新时间为当前时间

return command;
}

/**
* 记录指令下发
*/
private void recordCommandSent(DeviceCommand command) {
String commandKey = "device:command:" + command.getCommandNo();
redisTemplate.opsForValue().set(commandKey, command,
24, TimeUnit.HOURS);
}

/**
* 发送指令响应事件
*/
private void sendCommandResponseEvent(DeviceCommand command, DeviceCommandResponse response) {
/**
* 创建设备命令响应事件对象并进行发送
* 该代码块主要功能是构建一个设备命令响应事件,并通过Kafka发送相关数据
*/
// 创建一个新的设备命令响应事件对象
DeviceCommandResponseEvent event = new DeviceCommandResponseEvent();
// 设置命令ID,关联到原始命令
event.setCommandId(command.getId());
// 设置命令编号,用于唯一标识一条命令
event.setCommandNo(command.getCommandNo());
// 设置客户端ID,标识发送命令的客户端
event.setClientId(command.getClientId());
// 设置设备ID,标识接收命令的设备
event.setDeviceId(command.getDeviceId());
// 设置命令类型,用于区分不同类型的命令
event.setCommandType(command.getCommandType());
// 设置命令执行是否成功,来自响应结果
event.setSuccess(response.isSuccess());
// 设置命令响应的具体数据
event.setResponseData(response.getResponseData());
// 设置响应时间,记录当前时间戳
event.setResponseTime(LocalDateTime.now());

// 通过Kafka发送设备命令响应事件
// 使用指定的主题"device.command.response.event"
// 以命令编号作为消息键,确保消息顺序和可追踪性
// 将事件对象转换为JSON格式作为消息内容发送
kafkaTemplate.send("device.command.response.event",
command.getCommandNo(), JSON.toJSONString(event));
}

/**
* 生成指令编号
*/
private String generateCommandNo() {
return "CMD_" + System.currentTimeMillis() + "_" +
UUID.randomUUID().toString().substring(0, 8);
}
}

5. 设备数据服务实现

5.1 设备数据服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
* 设备数据服务
* 负责设备数据接收、数据解析、数据转发
*/
@Service
@Slf4j
public class DeviceDataService {

@Autowired
private DeviceDataMapper deviceDataMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 处理设备数据
*/
@Async("deviceDataExecutor")
public void processDeviceData(DeviceData deviceData) {
try {
// 1. 保存设备数据
saveDeviceData(deviceData);

// 2. 根据数据类型处理
/**
* 根据设备数据类型进行相应的处理
* 这是一个switch语句,用于判断设备数据的类型并调用对应的处理方法
*
* @param deviceData 包含设备数据的对象,通过其getDataType()方法获取数据类型
*/
switch (deviceData.getDataType()) { // 使用switch语句判断设备数据类型
case "DATA_REPORT": // 当数据类型为"DATA_REPORT"时
processDataReport(deviceData); // 调用处理数据报告的方法
break; // 跳出switch语句
case "STATUS_REPORT": // 当数据类型为"STATUS_REPORT"时
processStatusReport(deviceData); // 调用处理状态报告的方法
break; // 跳出switch语句
case "EVENT_REPORT": // 当数据类型为"EVENT_REPORT"时
processEventReport(deviceData); // 调用处理事件报告的方法
break; // 跳出switch语句
default: // 当数据类型不匹配以上任何情况时
log.warn("未知的设备数据类型: dataType={}", deviceData.getDataType()); // 记录警告日志
}

log.debug("设备数据处理成功: clientId={}, dataType={}",
deviceData.getClientId(), deviceData.getDataType());

} catch (Exception e) {
log.error("处理设备数据失败: clientId={}, dataType={}, error={}",
deviceData.getClientId(), deviceData.getDataType(), e.getMessage(), e);
}
}

/**
* 保存设备数据
*/
/**
* 保存设备数据的方法
* 将传入的设备数据对象转换为记录对象并保存到数据库
* @param deviceData 包含设备数据的对象,包含客户端ID、设备ID、数据类型等信息
*/
private void saveDeviceData(DeviceData deviceData) {
// 创建一个新的设备数据记录对象
DeviceDataRecord record = new DeviceDataRecord();
// 设置记录的客户端ID
record.setClientId(deviceData.getClientId());
// 设置记录的设备ID
record.setDeviceId(deviceData.getDeviceId());
// 设置记录的数据类型
record.setDataType(deviceData.getDataType());
// 设置记录的主题
record.setTopic(deviceData.getTopic());
// 将数据内容转换为JSON字符串格式并设置
record.setDataContent(JSON.toJSONString(deviceData.getDataContent()));
// 设置记录的接收时间
record.setReceiveTime(deviceData.getReceiveTime());
// 设置记录的创建时间为当前时间
record.setCreateTime(LocalDateTime.now());

// 将记录插入到数据库中
deviceDataMapper.insert(record);
}

/**
* 处理数据上报
*/
private void processDataReport(DeviceData deviceData) {
// 1. 发送到数据上报主题
DeviceDataReportEvent event = new DeviceDataReportEvent();
event.setClientId(deviceData.getClientId());
event.setDeviceId(deviceData.getDeviceId());
event.setDataContent(deviceData.getDataContent());
event.setReceiveTime(deviceData.getReceiveTime());

kafkaTemplate.send("device.data.report",
deviceData.getClientId(), JSON.toJSONString(event));

// 2. 更新设备数据缓存
updateDeviceDataCache(deviceData);
}

/**
* 处理状态上报
*/
private void processStatusReport(DeviceData deviceData) {
// 1. 发送到状态上报主题
DeviceStatusReportEvent event = new DeviceStatusReportEvent();
event.setClientId(deviceData.getClientId());
event.setDeviceId(deviceData.getDeviceId());
event.setStatusData(deviceData.getDataContent());
event.setReportTime(deviceData.getReceiveTime());

kafkaTemplate.send("device.status.report",
deviceData.getClientId(), JSON.toJSONString(event));

// 2. 更新设备状态缓存
updateDeviceStatusCache(deviceData);
}

/**
* 处理事件上报
*/
private void processEventReport(DeviceData deviceData) {
// 发送到事件上报主题
DeviceEventReportEvent event = new DeviceEventReportEvent();
event.setClientId(deviceData.getClientId());
event.setDeviceId(deviceData.getDeviceId());
event.setEventData(deviceData.getDataContent());
event.setEventTime(deviceData.getReceiveTime());

kafkaTemplate.send("device.event.report",
deviceData.getClientId(), JSON.toJSONString(event));
}

/**
* 更新设备数据缓存
*/
private void updateDeviceDataCache(DeviceData deviceData) {
String cacheKey = "device:data:latest:" + deviceData.getClientId();
redisTemplate.opsForValue().set(cacheKey, deviceData.getDataContent(),
1, TimeUnit.HOURS);
}

/**
* 更新设备状态缓存
*/
private void updateDeviceStatusCache(DeviceData deviceData) {
String cacheKey = "device:status:latest:" + deviceData.getClientId();
redisTemplate.opsForValue().set(cacheKey, deviceData.getDataContent(),
30, TimeUnit.MINUTES);
}
}

6. 数据模型定义

6.1 设备实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 设备实体
*/
@Data
@TableName("t_device")
public class Device {

/**
* 设备ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 设备编号
*/
private String deviceNo;

/**
* 设备名称
*/
private String deviceName;

/**
* 设备类型
*/
private String deviceType;

/**
* 制造商
*/
private String manufacturer;

/**
* 型号
*/
private String model;

/**
* 设备状态:INACTIVE-未激活, ACTIVE-激活, OFFLINE-离线, ONLINE-在线
*/
private String status;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}

6.2 设备凭证实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 设备凭证实体
*/
@Data
@TableName("t_device_credential")
public class DeviceCredential {

/**
* 凭证ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 设备ID
*/
private Long deviceId;

/**
* 客户端ID
*/
private String clientId;

/**
* 用户名
*/
private String username;

/**
* 密码(加密)
*/
private String password;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}

6.3 设备指令实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 设备指令实体
*/
@Data
@TableName("t_device_command")
public class DeviceCommand {

/**
* 指令ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 指令编号
*/
private String commandNo;

/**
* 客户端ID
*/
private String clientId;

/**
* 设备ID
*/
private Long deviceId;

/**
* 指令类型
*/
private String commandType;

/**
* 指令数据
*/
private String commandData;

/**
* 指令状态:PENDING-待发送, SENT-已发送, SUCCESS-成功, FAILED-失败
*/
private String status;

/**
* 响应时间
*/
private LocalDateTime responseTime;

/**
* 响应数据
*/
private String responseData;

/**
* 错误信息
*/
private String errorMessage;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}

6.4 设备数据记录实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 设备数据记录实体
*/
@Data
@TableName("t_device_data")
public class DeviceDataRecord {

/**
* 记录ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 客户端ID
*/
private String clientId;

/**
* 设备ID
*/
private Long deviceId;

/**
* 数据类型:DATA_REPORT-数据上报, STATUS_REPORT-状态上报, EVENT_REPORT-事件上报
*/
private String dataType;

/**
* 主题
*/
private String topic;

/**
* 数据内容(JSON)
*/
private String dataContent;

/**
* 接收时间
*/
private LocalDateTime receiveTime;

/**
* 创建时间
*/
private LocalDateTime createTime;
}

7. 数据库设计

7.1 设备表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE `t_device` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '设备ID',
`device_no` VARCHAR(64) NOT NULL COMMENT '设备编号',
`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',
`device_type` VARCHAR(32) NOT NULL COMMENT '设备类型',
`manufacturer` VARCHAR(64) DEFAULT NULL COMMENT '制造商',
`model` VARCHAR(64) DEFAULT NULL COMMENT '型号',
`status` VARCHAR(32) NOT NULL COMMENT '设备状态:INACTIVE-未激活, ACTIVE-激活, OFFLINE-离线, ONLINE-在线',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_device_no` (`device_no`),
KEY `idx_device_type` (`device_type`),
KEY `idx_status` (`status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备表';

7.2 设备凭证表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `t_device_credential` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '凭证ID',
`device_id` BIGINT(20) NOT NULL COMMENT '设备ID',
`client_id` VARCHAR(128) NOT NULL COMMENT '客户端ID',
`username` VARCHAR(64) NOT NULL COMMENT '用户名',
`password` VARCHAR(255) NOT NULL COMMENT '密码(加密)',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_client_id` (`client_id`),
UNIQUE KEY `uk_username` (`username`),
KEY `idx_device_id` (`device_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备凭证表';

7.3 设备指令表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE TABLE `t_device_command` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '指令ID',
`command_no` VARCHAR(64) NOT NULL COMMENT '指令编号',
`client_id` VARCHAR(128) NOT NULL COMMENT '客户端ID',
`device_id` BIGINT(20) NOT NULL COMMENT '设备ID',
`command_type` VARCHAR(32) NOT NULL COMMENT '指令类型',
`command_data` TEXT COMMENT '指令数据',
`status` VARCHAR(32) NOT NULL COMMENT '指令状态:PENDING-待发送, SENT-已发送, SUCCESS-成功, FAILED-失败',
`response_time` DATETIME DEFAULT NULL COMMENT '响应时间',
`response_data` TEXT COMMENT '响应数据',
`error_message` VARCHAR(512) DEFAULT NULL COMMENT '错误信息',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_command_no` (`command_no`),
KEY `idx_client_id` (`client_id`),
KEY `idx_device_id` (`device_id`),
KEY `idx_command_type` (`command_type`),
KEY `idx_status` (`status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令表';

7.4 设备数据表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE `t_device_data` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '记录ID',
`client_id` VARCHAR(128) NOT NULL COMMENT '客户端ID',
`device_id` BIGINT(20) NOT NULL COMMENT '设备ID',
`data_type` VARCHAR(32) NOT NULL COMMENT '数据类型:DATA_REPORT-数据上报, STATUS_REPORT-状态上报, EVENT_REPORT-事件上报',
`topic` VARCHAR(256) NOT NULL COMMENT '主题',
`data_content` TEXT COMMENT '数据内容(JSON)',
`receive_time` DATETIME NOT NULL COMMENT '接收时间',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_client_id` (`client_id`),
KEY `idx_device_id` (`device_id`),
KEY `idx_data_type` (`data_type`),
KEY `idx_receive_time` (`receive_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备数据表';

8. Mapper实现

8.1 设备Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 设备Mapper
*/
@Mapper
public interface DeviceMapper extends BaseMapper<Device> {

/**
* 根据设备编号查询设备
*/
@Select("SELECT * FROM t_device WHERE device_no = #{deviceNo}")
Device selectByDeviceNo(@Param("deviceNo") String deviceNo);

/**
* 根据设备ID查询设备
*/
@Select("SELECT * FROM t_device WHERE id = #{deviceId}")
Device selectById(@Param("deviceId") Long deviceId);
}

8.2 设备凭证Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 设备凭证Mapper
*/
@Mapper
public interface DeviceCredentialMapper extends BaseMapper<DeviceCredential> {

/**
* 根据客户端ID查询凭证
*/
@Select("SELECT * FROM t_device_credential WHERE client_id = #{clientId}")
DeviceCredential selectByClientId(@Param("clientId") String clientId);

/**
* 根据设备ID查询凭证
*/
@Select("SELECT * FROM t_device_credential WHERE device_id = #{deviceId}")
DeviceCredential selectByDeviceId(@Param("deviceId") Long deviceId);
}

8.3 设备指令Mapper

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 设备指令Mapper
*/
@Mapper
public interface DeviceCommandMapper extends BaseMapper<DeviceCommand> {

/**
* 根据指令编号查询指令
*/
@Select("SELECT * FROM t_device_command WHERE command_no = #{commandNo}")
DeviceCommand selectByCommandNo(@Param("commandNo") String commandNo);
}

9. 配置类

9.1 MQTT配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
# MQTT配置部分
mqtt:
# MQTT服务器配置
server:
# 服务器端口号,默认为1883
port: 1883
# Boss线程数,用于处理连接请求,默认为1
boss-threads: 1
# Worker线程数,用于处理IO请求,默认为4
worker-threads: 4
# 最大消息大小限制,单位为字节,默认为1MB(1048576字节)
max-message-size: 1048576
# 保持连接时间,单位为秒,默认为60秒
keep-alive: 60

9.2 异步任务配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 异步任务配置
*/
@Configuration
@EnableAsync
public class AsyncConfig {

/**
* 创建一个名为"deviceDataExecutor"的线程池Bean
* 该线程池用于处理设备数据相关的异步任务
*
* @return 配置好的ThreadPoolTaskExecutor实例
*/
@Bean("deviceDataExecutor")
public Executor deviceDataExecutor() {
// 创建线程池任务执行器实例
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程池大小,即保持活跃的线程数
executor.setCorePoolSize(20);
// 设置最大线程池大小,当任务队列满时可以创建的最大线程数
executor.setMaxPoolSize(50);
// 设置任务队列容量,用于存放待执行任务
executor.setQueueCapacity(1000);
// 设置线程空闲后的存活时间,单位为秒
executor.setKeepAliveSeconds(60);
// 设置线程名称前缀,方便日志追踪和问题排查
executor.setThreadNamePrefix("device-data-");
// 设置拒绝策略,当任务队列满且线程数达到最大值时的处理策略
// CallerRunsPolicy表示由提交任务的线程自己执行该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池执行器
executor.initialize();
// 返回配置好的线程池执行器
return executor;
}
}

9.3 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# application.yml
spring:
kafka:
# Kafka 服务器地址,用于连接 Kafka 集群
bootstrap-servers: localhost:9092
producer:
# 消息键的序列化器,用于将键转换为字节
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息值的序列化器,用于将值转换为字节
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者确认机制,all 表示所有副本都收到消息才认为发送成功
acks: all
# 发送失败时的重试次数
retries: 3
# 批量发送消息的大小限制(字节)
batch-size: 16384
# 等待时间(毫秒),如果达到该时间或 batch-size 满足则发送消息
linger-ms: 1
# 生产者可以使用的缓冲区大小(字节)
buffer-memory: 33554432
consumer:
# 消费者组 ID,用于标识消费者组
group-id: device-access-group
# 消息键的反序列化器,用于将字节转换为键
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
enable-auto-commit: true
# 消息值的反序列化器,用于将字节转换为值

10. 性能优化策略

10.1 连接池优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Netty连接池配置
*/
@Configuration
public class NettyConfig {

@Bean
public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(1);
}

/**
* 创建并配置一个EventLoopGroup Bean,用于Netty的 worker 线程组
* workerGroup用于处理客户端连接的IO操作,如读写等
*
* @return 返回一个配置好的NioEventLoopGroup实例
* 线程数设置为可用处理器的两倍,以充分利用多核CPU资源
* 提高并发处理能力
*/
@Bean
public EventLoopGroup workerGroup() {
return new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
}
}

10.2 缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 设备连接缓存服务
*/
@Service
@Slf4j
public class DeviceConnectionCacheService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 缓存设备连接信息
*/
public void cacheConnection(String clientId, DeviceConnection connection) {
String cacheKey = "device:connection:" + clientId;
redisTemplate.opsForValue().set(cacheKey, connection,
30, TimeUnit.MINUTES);
}

/**
* 获取缓存的设备连接
*/
public DeviceConnection getCachedConnection(String clientId) {
String cacheKey = "device:connection:" + clientId;
return (DeviceConnection) redisTemplate.opsForValue().get(cacheKey);
}
}

10.3 批量处理优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 设备数据批量处理服务
*/
@Service
@Slf4j
public class DeviceDataBatchService {

@Autowired
private DeviceDataMapper deviceDataMapper;

private final List<DeviceDataRecord> batchBuffer = new ArrayList<>();
private final int BATCH_SIZE = 100;

/**
* 批量保存设备数据
*/
public synchronized void batchSave(DeviceDataRecord record) {
batchBuffer.add(record);

if (batchBuffer.size() >= BATCH_SIZE) {
flushBatch();
}
}

/**
* 刷新批次
*/
private void flushBatch() {
if (batchBuffer.isEmpty()) {
return;
}

try {
// 批量插入
deviceDataMapper.insertBatch(batchBuffer);
batchBuffer.clear();

log.debug("批量保存设备数据成功: count={}", BATCH_SIZE);

} catch (Exception e) {
log.error("批量保存设备数据失败: error={}", e.getMessage(), e);
}
}

/**
* 定时刷新批次
*/
@Scheduled(fixedRate = 5000) // 每5秒刷新一次
public void scheduledFlush() {
flushBatch();
}
}

11. 监控告警

11.1 业务指标监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 业务指标监控
*/
@Component
@Slf4j
public class DeviceAccessMetricsMonitor {

@Autowired
private MeterRegistry meterRegistry;

/**
* 记录设备连接
*/
public void recordDeviceConnect(String deviceType) {
Counter.builder("device.connect.total")
.tag("device_type", deviceType)
.register(meterRegistry)
.increment();
}

/**
* 记录设备断开
*/
public void recordDeviceDisconnect(String deviceType) {
Counter.builder("device.disconnect.total")
.tag("device_type", deviceType)
.register(meterRegistry)
.increment();
}

/**
* 记录设备数据上报
*/
public void recordDeviceDataReport(String dataType) {
Counter.builder("device.data.report.total")
.tag("data_type", dataType)
.register(meterRegistry)
.increment();
}

/**
* 记录设备指令下发
*/
/**
* 记录已发送的设备命令
* 该方法用于统计和记录设备命令的发送情况,通过指标计数器实现
*
* @param commandType 命令类型,作为标签值用于区分不同类型的命令
*/
public void recordDeviceCommandSent(String commandType) {
// 使用Counter构建器创建一个名为"device.command.sent.total"的计数器
// 添加"command_type"标签,值为传入的commandType参数
// 将计数器注册到meterRegistry中
// 调用increment()方法使计数器加1
Counter.builder("device.command.sent.total")
.tag("command_type", commandType)
.register(meterRegistry)
.increment();
}

/**
* 记录设备心跳超时
*/
public void recordHeartbeatTimeout() {
Counter.builder("device.heartbeat.timeout.total")
.register(meterRegistry)
.increment();
}
}

11.2 告警规则配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# prometheus-alert-rules.yml
groups:
# 设备访问告警规则配置
# 用于监控设备连接、心跳和指令下发相关的异常情况
- name: device_access_alerts # 告警规则组名称,表示设备访问相关的告警
rules: # 具体的告警规则列表

# 设备连接失败率告警规则
- alert: DeviceConnectFailureRateHigh # 告警名称,表示设备连接失败率过高
expr: rate(device_connect_total{status="failed"}[5m]) > 0.1 # PromQL表达式,计算5分钟内设备连接失败率,超过0.1触发告警
for: 5m # 告警持续时间,表示指标持续5分钟满足条件后才触发告警
labels: # 告警标签,用于标识告警的严重程度等元信息
severity: warning # 告警级别,设置为警告级别
annotations: # 告警注释,用于提供告警的详细描述信息
summary: "设备连接失败率过高" # 告警摘要,简明扼要地描述告警内容
description: "设备连接失败率超过10%,当前值: {{ $value }}" # 告警详细描述,包含当前失败率的具体数值



# 设备心跳超时率告警规则
- alert: DeviceHeartbeatTimeoutHigh # 告警名称,表示设备心跳超时率过高
expr: rate(device_heartbeat_timeout_total[5m]) > 0.05 # PromQL表达式,计算5分钟内设备心跳超时率,超过0.05触发告警
for: 5m # 告警持续时间,表示指标持续5分钟满足条件后才触发告警
labels: # 告警标签,用于标识告警的严重程度等元信息
severity: warning # 告警级别,设置为警告级别
annotations: # 告警注释,用于提供告警的详细描述信息
summary: "设备心跳超时率过高" # 告警摘要,简明扼要地描述告警内容
description: "设备心跳超时率超过5%,当前值: {{ $value }}" # 告警详细描述,包含当前超时率的具体数值



# 设备指令下发失败率告警规则
- alert: DeviceCommandFailureRateHigh # 告警名称,表示设备指令下发失败率过高
expr: rate(device_command_sent_total{status="failed"}[5m]) > 0.1 # PromQL表达式,计算5分钟内设备指令下发失败率,超过0.1触发告警
for: 5m # 告警持续时间,表示指标持续5分钟满足条件后才触发告警
labels: # 告警标签,用于标识告警的严重程度等元信息
severity: warning # 告警级别,设置为警告级别
annotations: # 告警注释,用于提供告警的详细描述信息
summary: "设备指令下发失败率过高" # 告警摘要,简明扼要地描述告警内容
description: "设备指令下发失败率超过10%,当前值: {{ $value }}" # 告警详细描述,包含当前失败率的具体数值

12. 总结

本文深入讲解了设备服务接入系统的Java微服务后端架构实战,涵盖了以下核心内容:

  1. 系统架构设计:采用微服务架构,通过接入网关、设备服务、指令服务、数据服务协同完成设备接入管理
  2. MQTT协议支持:基于Netty实现MQTT协议服务器,支持设备连接、认证、消息发布订阅
  3. 设备注册认证:支持设备注册、设备凭证管理、设备认证
  4. 连接管理:实现设备连接状态管理、心跳检测、连接超时处理
  5. 指令下发:支持设备指令下发、指令状态跟踪、指令响应处理
  6. 数据上报:支持设备数据接收、数据解析、数据转发
  7. 性能优化:通过连接池、缓存、批量处理提升系统性能
  8. 监控告警:通过业务指标监控、告警规则保障系统稳定运行

通过本文的学习,读者可以掌握如何基于Java微服务架构实现一个高性能、高可用、可扩展的设备服务接入系统,为实际项目开发提供参考和指导。