1. SpringBoot+MQTT+Netty架构概述

SpringBoot+MQTT+Netty架构是基于SpringBoot框架、MQTT协议和Netty网络框架构建的物联网设备通信系统,具有高并发、低延迟、可扩展等特点,广泛应用于物联网、智能家居、工业自动化等场景。本文将详细介绍MQTT协议、Netty MQTT服务器、SpringBoot集成、消息处理和设备管理的完整解决方案。

1.1 核心功能

  1. MQTT服务器: 基于Netty的高性能MQTT服务器
  2. 设备管理: 物联网设备连接和状态管理
  3. 消息处理: MQTT消息发布订阅处理
  4. 协议解析: MQTT协议解析和验证
  5. SpringBoot集成: SpringBoot框架集成和配置

1.2 技术架构

设备客户端 → MQTT协议 → Netty服务器 → SpringBoot应用
↓ ↓ ↓ ↓
设备连接 → 协议握手 → 消息处理 → 业务逻辑
↓ ↓ ↓ ↓
消息发布 → 主题订阅 → 消息路由 → 数据存储

2. MQTT协议配置

2.1 MQTT配置类

/**
 * Spring configuration for the embedded MQTT server.
 *
 * <p>Reads the {@code mqtt.server.*} settings and exposes them as one
 * {@link MqttProperties} bean.
 *
 * <p>{@code MqttServer}, {@code MqttMessageHandler} and {@code MqttDeviceManager} are
 * intentionally NOT declared here. They are already annotated with {@code @Component}
 * and receive their collaborators through constructor injection, so the container
 * creates them itself. Declaring them again with {@code new MqttServer(properties)} /
 * {@code new MqttMessageHandler()} does not match their constructors and would register
 * a second definition under the same bean name.
 */
@Configuration
public class MqttConfig {

    /** TCP port the server listens on. */
    @Value("${mqtt.server.port}")
    private int port;

    /** Local address the server binds to. */
    @Value("${mqtt.server.host}")
    private String host;

    /** Keep-alive interval in seconds, used for idle-connection detection. */
    @Value("${mqtt.server.keep-alive}")
    private int keepAlive;

    /** Upper bound, in bytes, handed to the MQTT decoder. */
    @Value("${mqtt.server.max-message-size}")
    private int maxMessageSize;

    /** Maximum accepted topic length. */
    @Value("${mqtt.server.max-topic-length}")
    private int maxTopicLength;

    /**
     * Builds the immutable-by-convention settings object shared by the server components.
     *
     * @return properties populated from the {@code mqtt.server.*} configuration keys
     */
    @Bean
    public MqttProperties mqttProperties() {
        return MqttProperties.builder()
                .port(port)
                .host(host)
                .keepAlive(keepAlive)
                .maxMessageSize(maxMessageSize)
                .maxTopicLength(maxTopicLength)
                .build();
    }
}

/**
* MQTT配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties {
private int port;
private String host;
private int keepAlive;
private int maxMessageSize;
private int maxTopicLength;

// 连接配置
private int maxConnections = 10000;
private int connectionTimeout = 30000;
private int readTimeout = 60000;
private int writeTimeout = 30000;

// 消息配置
private int maxInflightMessages = 100;
private int messageQueueSize = 1000;
private int batchSize = 100;

// 性能配置
private boolean tcpNodelay = true;
private boolean keepAliveEnabled = true;
private int soBacklog = 128;
}

2.2 应用配置

# application.yml
# All MQTT settings live under ONE top-level "mqtt" key; repeating the key in the
# same document is a duplicate-key error for the YAML loader.
mqtt:
  server:
    port: 1883
    host: 0.0.0.0
    keep-alive: 60
    max-message-size: 65536
    max-topic-length: 255

  # MQTT device settings
  device:
    max-connections: 10000
    connection-timeout: 30000
    read-timeout: 60000
    write-timeout: 30000

  # NOTE(review): assumed to be a sibling of "device" (mqtt.message.*) — confirm against the binder.
  message:
    max-inflight: 100
    queue-size: 1000
    batch-size: 100

3. Netty MQTT服务器

3.1 Netty MQTT服务器

/**
 * Netty-based MQTT server: owns the event loop groups and the listening channel.
 *
 * <p>{@link #start()} binds the listening socket and {@link #stop()} closes it and
 * shuts the event loops down. Both are expected to be driven by the application
 * lifecycle; the class performs no synchronization of its own.
 */
@Component
public class MqttServer {

    private final MqttProperties properties;
    private final MqttMessageHandler messageHandler;
    private final MqttDeviceManager deviceManager;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    /**
     * @param properties     server settings (port, host, socket options)
     * @param messageHandler handler invoked for decoded MQTT packets
     * @param deviceManager  registry of connections and devices
     */
    public MqttServer(MqttProperties properties,
                      MqttMessageHandler messageHandler,
                      MqttDeviceManager deviceManager) {
        this.properties = properties;
        this.messageHandler = messageHandler;
        this.deviceManager = deviceManager;
    }

    /**
     * Starts the server and blocks until the listening socket is bound.
     *
     * <p>The socket is bound to the configured host when one is set, otherwise to the
     * wildcard address. If startup fails, the event loop groups created by this call are
     * shut down again so their threads do not leak.
     *
     * @throws RuntimeException if binding fails or the calling thread is interrupted
     */
    public void start() {
        try {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, properties.getSoBacklog())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, properties.isKeepAliveEnabled())
                    .childOption(ChannelOption.TCP_NODELAY, properties.isTcpNodelay())
                    .childHandler(new MqttChannelInitializer(messageHandler, deviceManager, properties));

            String host = properties.getHost();
            ChannelFuture future = (host == null || host.trim().isEmpty())
                    ? bootstrap.bind(properties.getPort())
                    : bootstrap.bind(host, properties.getPort());
            serverChannel = future.sync().channel();

            log.info("MQTT服务器启动成功: port={}", properties.getPort());

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            shutdownEventLoops();
            log.error("MQTT服务器启动失败", e);
            throw new RuntimeException("服务器启动失败", e);
        } catch (Exception e) {
            shutdownEventLoops();
            log.error("MQTT服务器启动失败", e);
            throw new RuntimeException("服务器启动失败", e);
        }
    }

    /**
     * Stops the server: closes the listening channel and shuts down both event loop
     * groups. The groups are shut down even when closing the channel fails.
     */
    public void stop() {
        try {
            if (serverChannel != null) {
                serverChannel.close().sync();
            }
            log.info("MQTT服务器停止成功");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("MQTT服务器停止失败", e);
        } catch (Exception e) {
            log.error("MQTT服务器停止失败", e);
        } finally {
            shutdownEventLoops();
        }
    }

    /**
     * Builds a snapshot of the server state.
     *
     * @return running flag, port and connection counters
     */
    public MqttServerStatus getServerStatus() {
        return MqttServerStatus.builder()
                .isRunning(serverChannel != null && serverChannel.isActive())
                .port(properties.getPort())
                .activeConnections(deviceManager.getActiveConnectionCount())
                .totalConnections(deviceManager.getTotalConnectionCount())
                .build();
    }

    /** Requests a graceful shutdown of whichever event loop groups currently exist. */
    private void shutdownEventLoops() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
            workerGroup = null;
        }
    }
}

/**
 * Builds the handler pipeline for every accepted MQTT connection.
 */
public class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final MqttMessageHandler messageHandler;
    private final MqttDeviceManager deviceManager;
    private final MqttProperties properties;

    /**
     * @param messageHandler handler invoked for decoded MQTT packets
     * @param deviceManager  registry of connections and devices
     * @param properties     server settings (decoder limit, keep-alive)
     */
    public MqttChannelInitializer(MqttMessageHandler messageHandler,
                                  MqttDeviceManager deviceManager,
                                  MqttProperties properties) {
        this.messageHandler = messageHandler;
        this.deviceManager = deviceManager;
        this.properties = properties;
    }

    /**
     * Installs idle detection, the MQTT codec and the protocol handlers.
     *
     * @param ch the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // Idle detection has to sit in FRONT of the protocol handler. That handler is a
        // SimpleChannelInboundHandler and consumes every message, so an IdleStateHandler
        // placed behind it never sees a read and reports READER_IDLE on active connections.
        // The MQTT specification gives clients one and a half keep-alive periods.
        long readerIdleSeconds = properties.getKeepAlive() * 3L / 2;
        pipeline.addLast(new IdleStateHandler(readerIdleSeconds, 0, 0, TimeUnit.SECONDS));

        // MQTT codec. The encoder is stateless and only available as a shared singleton.
        pipeline.addLast(new MqttDecoder(properties.getMaxMessageSize()));
        pipeline.addLast(MqttEncoder.INSTANCE);

        // Protocol dispatch.
        pipeline.addLast(new MqttProtocolHandler(messageHandler, deviceManager, properties));

        // Reacts to the idle events raised by the IdleStateHandler at the head of the
        // pipeline; user events pass through the handlers above untouched.
        pipeline.addLast(new MqttIdleHandler());
    }
}

/**
 * Inbound handler that dispatches decoded MQTT control packets to the matching
 * {@code handleXxx} method and writes the protocol-level replies (CONNACK, PUBACK,
 * PUBREC, PUBCOMP, SUBACK, UNSUBACK, PINGRESP).
 *
 * <p>{@code MqttMessage} in this class is the Netty codec type, not the application's
 * message DTO of the same simple name.
 */
public class MqttProtocolHandler extends SimpleChannelInboundHandler<MqttMessage> {

    private final MqttMessageHandler messageHandler;
    private final MqttDeviceManager deviceManager;
    private final MqttProperties properties;

    /**
     * @param messageHandler business-level handler for connect/publish/subscribe
     * @param deviceManager  registry of connections and devices
     * @param properties     server settings
     */
    public MqttProtocolHandler(MqttMessageHandler messageHandler,
                               MqttDeviceManager deviceManager,
                               MqttProperties properties) {
        this.messageHandler = messageHandler;
        this.deviceManager = deviceManager;
        this.properties = properties;
    }

    /** Counts the new connection, then lets the event continue down the pipeline. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        deviceManager.addConnection(ctx.channel());
        log.info("MQTT连接建立: {}", ctx.channel().remoteAddress());

        super.channelActive(ctx);
    }

    /** Removes the connection from the registry, then lets the event continue. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        deviceManager.removeConnection(ctx.channel());
        log.info("MQTT连接断开: {}", ctx.channel().remoteAddress());

        super.channelInactive(ctx);
    }

    /**
     * Dispatches one decoded packet by its message type. Any failure while handling the
     * packet closes the connection.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage message) throws Exception {
        try {
            // A packet the decoder could not parse may carry no fixed header at all; reject
            // it explicitly instead of relying on a NullPointerException in the switch.
            if (message.decoderResult().isFailure() || message.fixedHeader() == null) {
                log.warn("MQTT报文解码失败: {}", ctx.channel().remoteAddress(),
                        message.decoderResult().cause());
                ctx.close();
                return;
            }

            switch (message.fixedHeader().messageType()) {
                case CONNECT:
                    handleConnect(ctx, (MqttConnectMessage) message);
                    break;
                case CONNACK:
                    handleConnAck(ctx, (MqttConnAckMessage) message);
                    break;
                case PUBLISH:
                    handlePublish(ctx, (MqttPublishMessage) message);
                    break;
                case PUBACK:
                    handlePubAck(ctx, (MqttPubAckMessage) message);
                    break;
                case PUBREC:
                    handlePubRec(ctx, message);
                    break;
                case PUBREL:
                    handlePubRel(ctx, message);
                    break;
                case PUBCOMP:
                    handlePubComp(ctx, message);
                    break;
                case SUBSCRIBE:
                    handleSubscribe(ctx, (MqttSubscribeMessage) message);
                    break;
                case SUBACK:
                    handleSubAck(ctx, (MqttSubAckMessage) message);
                    break;
                case UNSUBSCRIBE:
                    handleUnsubscribe(ctx, (MqttUnsubscribeMessage) message);
                    break;
                case UNSUBACK:
                    handleUnsubAck(ctx, (MqttUnsubAckMessage) message);
                    break;
                case PINGREQ:
                    handlePingReq(ctx, message);
                    break;
                case PINGRESP:
                    handlePingResp(ctx, message);
                    break;
                case DISCONNECT:
                    handleDisconnect(ctx, message);
                    break;
                default:
                    log.warn("未知的MQTT消息类型: {}", message.fixedHeader().messageType());
            }

        } catch (Exception e) {
            log.error("处理MQTT消息失败", e);
            ctx.close();
        }
    }

    /** Logs the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("MQTT连接异常: {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }

    /**
     * Handles CONNECT: validates the client through the message handler, answers with
     * CONNACK and registers the device on success; on rejection the channel is closed.
     */
    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage message) {
        try {
            String clientId = message.payload().clientIdentifier();
            String username = message.payload().userName();
            byte[] passwordBytes = message.payload().passwordInBytes();
            // Decode with an explicit charset; the platform default differs between hosts.
            String password = passwordBytes != null
                    ? new String(passwordBytes, java.nio.charset.StandardCharsets.UTF_8)
                    : null;

            MqttConnectResult result = messageHandler.handleConnect(clientId, username, password);

            if (result.isSuccess()) {
                MqttConnAckMessage connAck = MqttMessageBuilders.connAck()
                        .returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED)
                        .sessionPresent(false)
                        .build();

                ctx.writeAndFlush(connAck);

                deviceManager.registerDevice(ctx.channel(), clientId, username);

                log.info("MQTT客户端连接成功: clientId={}, username={}", clientId, username);
            } else {
                MqttConnAckMessage connAck = MqttMessageBuilders.connAck()
                        .returnCode(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)
                        .sessionPresent(false)
                        .build();

                ctx.writeAndFlush(connAck);
                ctx.close();

                log.warn("MQTT客户端连接失败: clientId={}, reason={}", clientId, result.getMessage());
            }

        } catch (Exception e) {
            log.error("处理连接消息失败", e);
            ctx.close();
        }
    }

    /**
     * Handles PUBLISH: forwards the message to the message handler and acknowledges it
     * according to its QoS (PUBACK for QoS 1, PUBREC for QoS 2, nothing for QoS 0).
     */
    private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage message) {
        try {
            String topic = message.variableHeader().topicName();
            ByteBuf payload = message.payload();
            MqttQoS qos = message.fixedHeader().qosLevel();

            messageHandler.handlePublish(ctx.channel(), topic, payload, qos);

            if (qos == MqttQoS.AT_LEAST_ONCE) {
                ctx.writeAndFlush(MqttMessageBuilders.pubAck()
                        .packetId(message.variableHeader().packetId())
                        .build());
            } else if (qos == MqttQoS.EXACTLY_ONCE) {
                // MqttMessageBuilders offers no PUBREC builder; assemble the packet directly.
                ctx.writeAndFlush(newControlPacket(
                        io.netty.handler.codec.mqtt.MqttMessageType.PUBREC,
                        io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from(
                                message.variableHeader().packetId())));
            }

            log.debug("处理发布消息: topic={}, qos={}", topic, qos);

        } catch (Exception e) {
            log.error("处理发布消息失败", e);
        }
    }

    /**
     * Handles SUBSCRIBE: registers every requested filter and answers with one SUBACK
     * that carries, per filter, either the granted QoS or the failure code.
     */
    private void handleSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage message) {
        try {
            List<MqttTopicSubscription> subscriptions = message.payload().topicSubscriptions();
            MqttMessageBuilders.SubAckBuilder subAck = MqttMessageBuilders.subAck()
                    .packetId(message.variableHeader().messageId());

            for (MqttTopicSubscription subscription : subscriptions) {
                String topic = subscription.topicName();
                MqttQoS qos = subscription.qualityOfService();

                boolean success = messageHandler.handleSubscribe(ctx.channel(), topic, qos);

                // The builder takes MqttQoS values, not their integer codes.
                if (success) {
                    subAck.addGrantedQos(qos);
                    log.info("订阅成功: topic={}, qos={}", topic, qos);
                } else {
                    subAck.addGrantedQos(MqttQoS.FAILURE);
                    log.warn("订阅失败: topic={}", topic);
                }
            }

            ctx.writeAndFlush(subAck.build());

        } catch (Exception e) {
            log.error("处理订阅消息失败", e);
        }
    }

    /** Handles UNSUBSCRIBE: removes every listed filter and answers with UNSUBACK. */
    private void handleUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage message) {
        try {
            List<String> topics = message.payload().topics();

            for (String topic : topics) {
                messageHandler.handleUnsubscribe(ctx.channel(), topic);
                log.info("取消订阅: topic={}", topic);
            }

            MqttUnsubAckMessage unsubAck = MqttMessageBuilders.unsubAck()
                    .packetId(message.variableHeader().messageId())
                    .build();

            ctx.writeAndFlush(unsubAck);

        } catch (Exception e) {
            log.error("处理取消订阅消息失败", e);
        }
    }

    /** Handles PINGREQ by answering with PINGRESP. */
    private void handlePingReq(ChannelHandlerContext ctx, MqttMessage message) {
        // MqttMessageBuilders offers no PINGRESP builder; the packet is a bare fixed header.
        ctx.writeAndFlush(newControlPacket(
                io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP, null));

        log.debug("处理Ping请求: {}", ctx.channel().remoteAddress());
    }

    /** Handles DISCONNECT: lets the message handler clean up, then closes the channel. */
    private void handleDisconnect(ChannelHandlerContext ctx, MqttMessage message) {
        messageHandler.handleDisconnect(ctx.channel());
        ctx.close();

        log.info("MQTT客户端断开连接: {}", ctx.channel().remoteAddress());
    }

    /**
     * Handles PUBREL, the third step of an inbound QoS 2 exchange, by answering with
     * PUBCOMP for the same packet identifier. Without this reply a QoS 2 publisher never
     * completes the exchange started by the PUBREC sent from {@code handlePublish}.
     */
    private void handlePubRel(ChannelHandlerContext ctx, MqttMessage message) {
        Object variableHeader = message.variableHeader();
        if (!(variableHeader instanceof io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader)) {
            log.warn("PUBREL缺少报文标识符: {}", ctx.channel().remoteAddress());
            return;
        }

        int packetId =
                ((io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader) variableHeader).messageId();
        ctx.writeAndFlush(newControlPacket(
                io.netty.handler.codec.mqtt.MqttMessageType.PUBCOMP,
                io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from(packetId)));
    }

    // Packets that need no reply from the server side; intentionally left as no-ops.
    private void handleConnAck(ChannelHandlerContext ctx, MqttConnAckMessage message) {}
    private void handlePubAck(ChannelHandlerContext ctx, MqttPubAckMessage message) {}
    private void handlePubRec(ChannelHandlerContext ctx, MqttMessage message) {}
    private void handlePubComp(ChannelHandlerContext ctx, MqttMessage message) {}
    private void handleSubAck(ChannelHandlerContext ctx, MqttSubAckMessage message) {}
    private void handleUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage message) {}
    private void handlePingResp(ChannelHandlerContext ctx, MqttMessage message) {}

    /**
     * Creates a control packet whose fixed header carries no flags (QoS 0, not DUP, not
     * retained), which is the required form for PUBREC, PUBCOMP and PINGRESP.
     *
     * @param type           packet type
     * @param variableHeader variable header, or {@code null} for packets without one
     * @return the assembled packet; the encoder computes the remaining length itself
     */
    private static MqttMessage newControlPacket(io.netty.handler.codec.mqtt.MqttMessageType type,
                                                Object variableHeader) {
        io.netty.handler.codec.mqtt.MqttFixedHeader fixedHeader =
                new io.netty.handler.codec.mqtt.MqttFixedHeader(
                        type, false, MqttQoS.AT_MOST_ONCE, false, 0);
        return new MqttMessage(fixedHeader, variableHeader);
    }
}

/**
 * Closes connections that have gone quiet.
 *
 * <p>Reacts only to {@link IdleStateEvent}s: a reader-idle event closes the channel,
 * any other idle state is ignored, and every non-idle user event is passed on.
 */
public class MqttIdleHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Anything that is not an idle notification belongs to someone else.
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        // Writer-idle / all-idle events are swallowed without action.
        if (((IdleStateEvent) evt).state() != IdleState.READER_IDLE) {
            return;
        }

        log.warn("MQTT连接超时: {}", ctx.channel().remoteAddress());
        ctx.close();
    }
}

/**
 * Snapshot of the MQTT server's state, as assembled by MqttServer.getServerStatus().
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttServerStatus {
// True when the server channel existed and was active at the moment the snapshot was built.
private boolean isRunning;
// Configured listening port.
private int port;
// Number of currently open connections reported by the device manager.
private int activeConnections;
// Number of connections accepted since the device manager was created.
private long totalConnections;
}

4. MQTT消息处理

4.1 MQTT消息处理器

/**
 * Business-level handling of MQTT requests: connect validation, publish storage and
 * distribution, subscription bookkeeping and disconnect clean-up.
 *
 * <p>{@code MqttMessage} in this class is the application's message DTO.
 */
@Component
public class MqttMessageHandler {

    /** Upper bound on the topic length accepted by {@link #validateTopic(String)}. */
    private static final int MAX_TOPIC_LENGTH = 255;

    /** Characters allowed in topics and filters; compiled once instead of per call. */
    private static final java.util.regex.Pattern TOPIC_PATTERN =
            java.util.regex.Pattern.compile("^[a-zA-Z0-9/+#]+$");

    private final MqttDeviceManager deviceManager;
    private final MqttTopicManager topicManager;
    private final MqttMessageStore messageStore;

    /**
     * @param deviceManager registry used to resolve channels to devices
     * @param topicManager  subscription registry and message distributor
     * @param messageStore  persistence for published messages
     */
    public MqttMessageHandler(MqttDeviceManager deviceManager,
                              MqttTopicManager topicManager,
                              MqttMessageStore messageStore) {
        this.deviceManager = deviceManager;
        this.topicManager = topicManager;
        this.messageStore = messageStore;
    }

    /**
     * Validates a connect request.
     *
     * @param clientId client identifier; must not be blank
     * @param username user name
     * @param password password
     * @return a success result, or an error result with the rejection reason; a client id
     *         that already has an active channel is rejected
     */
    public MqttConnectResult handleConnect(String clientId, String username, String password) {
        try {
            if (clientId == null || clientId.trim().isEmpty()) {
                return MqttConnectResult.error("客户端ID不能为空");
            }

            if (!validateCredentials(username, password)) {
                return MqttConnectResult.error("用户名或密码错误");
            }

            if (deviceManager.isClientConnected(clientId)) {
                return MqttConnectResult.error("客户端已连接");
            }

            log.info("MQTT客户端连接验证成功: clientId={}, username={}", clientId, username);

            return MqttConnectResult.success();

        } catch (Exception e) {
            log.error("处理连接请求失败: clientId={}", clientId, e);
            return MqttConnectResult.error("连接处理失败");
        }
    }

    /**
     * Stores a published message and distributes it to subscribers. Messages with an
     * invalid topic name, or from a channel with no registered device, are dropped.
     *
     * @param channel channel the message arrived on
     * @param topic   topic name; must not contain wildcards
     * @param payload message payload
     * @param qos     QoS level of the publication
     */
    public void handlePublish(Channel channel, String topic, ByteBuf payload, MqttQoS qos) {
        try {
            if (!validatePublishTopic(topic)) {
                log.warn("无效的主题: {}", topic);
                return;
            }

            MqttDevice device = deviceManager.getDeviceByChannel(channel);
            if (device == null) {
                log.warn("未找到设备信息: {}", channel.remoteAddress());
                return;
            }

            // retain(): the inbound handler releases its own reference once it returns, so
            // the stored message needs one of its own.
            // NOTE(review): nothing visible here releases that reference — confirm that the
            // store / topic manager does.
            MqttMessage message = MqttMessage.builder()
                    .topic(topic)
                    .payload(payload.retain())
                    .qos(qos)
                    .deviceId(device.getClientId())
                    .timestamp(System.currentTimeMillis())
                    .build();

            messageStore.storeMessage(message);

            topicManager.distributeMessage(topic, message);

            log.debug("处理发布消息: deviceId={}, topic={}, qos={}",
                    device.getClientId(), topic, qos);

        } catch (Exception e) {
            log.error("处理发布消息失败: topic={}", topic, e);
        }
    }

    /**
     * Registers a subscription for the device behind the channel.
     *
     * @param channel channel the request arrived on
     * @param topic   topic filter; wildcards are allowed in their valid positions
     * @param qos     requested QoS level
     * @return {@code true} if the subscription was added, {@code false} if the filter is
     *         invalid, the device is unknown or an error occurred
     */
    public boolean handleSubscribe(Channel channel, String topic, MqttQoS qos) {
        try {
            if (!validateTopicFilter(topic)) {
                log.warn("无效的订阅主题: {}", topic);
                return false;
            }

            MqttDevice device = deviceManager.getDeviceByChannel(channel);
            if (device == null) {
                log.warn("未找到设备信息: {}", channel.remoteAddress());
                return false;
            }

            topicManager.addSubscription(device.getClientId(), topic, qos);

            log.info("设备订阅成功: deviceId={}, topic={}, qos={}",
                    device.getClientId(), topic, qos);

            return true;

        } catch (Exception e) {
            log.error("处理订阅请求失败: topic={}", topic, e);
            return false;
        }
    }

    /**
     * Removes a subscription of the device behind the channel.
     *
     * @param channel channel the request arrived on
     * @param topic   topic filter to remove
     */
    public void handleUnsubscribe(Channel channel, String topic) {
        try {
            MqttDevice device = deviceManager.getDeviceByChannel(channel);
            if (device == null) {
                log.warn("未找到设备信息: {}", channel.remoteAddress());
                return;
            }

            topicManager.removeSubscription(device.getClientId(), topic);

            log.info("设备取消订阅: deviceId={}, topic={}", device.getClientId(), topic);

        } catch (Exception e) {
            log.error("处理取消订阅请求失败: topic={}", topic, e);
        }
    }

    /**
     * Cleans up after a client-initiated disconnect: drops all subscriptions of the
     * device and marks it as disconnected.
     *
     * @param channel channel that is disconnecting
     */
    public void handleDisconnect(Channel channel) {
        try {
            MqttDevice device = deviceManager.getDeviceByChannel(channel);
            if (device != null) {
                topicManager.removeAllSubscriptions(device.getClientId());

                device.setStatus("DISCONNECTED");
                device.setLastSeen(System.currentTimeMillis());

                log.info("设备断开连接: deviceId={}", device.getClientId());
            }

        } catch (Exception e) {
            log.error("处理断开连接失败", e);
        }
    }

    /**
     * Checks the supplied credentials.
     *
     * <p>NOTE(review): placeholder — any non-null user name and password pair is
     * accepted. Replace with a real credential check before exposing the server.
     *
     * @param username user name
     * @param password password
     * @return {@code true} if both values are present
     */
    private boolean validateCredentials(String username, String password) {
        return username != null && password != null;
    }

    /**
     * Basic check shared by topic names and topic filters: non-blank, within the length
     * limit and made only of the allowed characters.
     *
     * @param topic topic name or filter
     * @return {@code true} if the basic rules are met
     */
    private boolean validateTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            return false;
        }

        if (topic.length() > MAX_TOPIC_LENGTH) {
            return false;
        }

        return TOPIC_PATTERN.matcher(topic).matches();
    }

    /**
     * Validates the topic name of a PUBLISH. On top of the basic rules, a topic name
     * must not contain the wildcard characters {@code +} and {@code #}; those are only
     * meaningful in subscription filters.
     */
    private boolean validatePublishTopic(String topic) {
        return validateTopic(topic)
                && topic.indexOf('+') < 0
                && topic.indexOf('#') < 0;
    }

    /**
     * Validates a subscription filter. On top of the basic rules, {@code +} has to
     * occupy a whole level and {@code #} has to be the whole last level.
     */
    private boolean validateTopicFilter(String filter) {
        if (!validateTopic(filter)) {
            return false;
        }

        // limit -1 keeps trailing empty levels, so "a/" is seen as two levels.
        String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            boolean lastLevel = i == levels.length - 1;

            if (level.indexOf('#') >= 0 && !(lastLevel && "#".equals(level))) {
                return false;
            }
            if (level.indexOf('+') >= 0 && !"+".equals(level)) {
                return false;
            }
        }
        return true;
    }
}

/**
* MQTT连接结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttConnectResult {
private boolean success;
private String message;

public static MqttConnectResult success() {
return MqttConnectResult.builder().success(true).build();
}

public static MqttConnectResult error(String message) {
return MqttConnectResult.builder().success(false).message(message).build();
}
}

/**
 * Application-level representation of a published MQTT message, built by
 * MqttMessageHandler.handlePublish before it is stored and distributed.
 *
 * NOTE(review): the simple name matches Netty's codec class MqttMessage; files that use
 * both need to import one and fully qualify the other — confirm the package layout.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttMessage {
// Topic the message was published to.
private String topic;
// Message body; handlePublish stores a retained reference to the inbound buffer here.
private ByteBuf payload;
// QoS level of the publication.
private MqttQoS qos;
// Client id of the publishing device.
private String deviceId;
// Time handlePublish processed the message, from System.currentTimeMillis().
private long timestamp;
}

5. MQTT设备管理

5.1 MQTT设备管理器

/**
* MQTT设备管理器
*/
@Component
public class MqttDeviceManager {

private final ConcurrentHashMap<String, MqttDevice> devices = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Channel, String> channelToDeviceId = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Channel> deviceIdToChannel = new ConcurrentHashMap<>();

private final AtomicLong totalConnectionCount = new AtomicLong(0);
private final AtomicLong activeConnectionCount = new AtomicLong(0);

/**
* 添加连接
* @param channel 通道
*/
public void addConnection(Channel channel) {
totalConnectionCount.incrementAndGet();
activeConnectionCount.incrementAndGet();

log.debug("添加MQTT连接: {}, activeCount={}",
channel.remoteAddress(), activeConnectionCount.get());
}

/**
* 移除连接
* @param channel 通道
*/
public void removeConnection(Channel channel) {
String deviceId = channelToDeviceId.remove(channel);
if (deviceId != null) {
deviceIdToChannel.remove(deviceId);
MqttDevice device = devices.get(deviceId);
if (device != null) {
device.setStatus("DISCONNECTED");
device.setLastSeen(System.currentTimeMillis());
}
}

activeConnectionCount.decrementAndGet();

log.debug("移除MQTT连接: {}, activeCount={}",
channel.remoteAddress(), activeConnectionCount.get());
}

/**
* 注册设备
* @param channel 通道
* @param clientId 客户端ID
* @param username 用户名
*/
public void registerDevice(Channel channel, String clientId, String username) {
try {
MqttDevice device = MqttDevice.builder()
.clientId(clientId)
.username(username)
.channel(channel)
.status("CONNECTED")
.connectTime(System.currentTimeMillis())
.lastSeen(System.currentTimeMillis())
.build();

devices.put(clientId, device);
channelToDeviceId.put(channel, clientId);
deviceIdToChannel.put(clientId, channel);

log.info("注册MQTT设备: clientId={}, username={}", clientId, username);

} catch (Exception e) {
log.error("注册MQTT设备失败: clientId={}", clientId, e);
}
}

/**
* 获取设备
* @param clientId 客户端ID
* @return 设备信息
*/
public MqttDevice getDevice(String clientId) {
return devices.get(clientId);
}

/**
* 根据通道获取设备
* @param channel 通道
* @return 设备信息
*/
public MqttDevice getDeviceByChannel(Channel channel) {
String deviceId = channelToDeviceId.get(channel);
return deviceId != null ? devices.get(deviceId) : null;
}

/**
* 获取设备通道
* @param clientId 客户端ID
* @return 通道
*/
public Channel getDeviceChannel(String clientId) {
return deviceIdToChannel.get(clientId);
}

/**
* 检查客户端是否已连接
* @param clientId 客户端ID
* @return 是否已连接
*/
public boolean isClientConnected(String clientId) {
Channel channel = deviceIdToChannel.get(clientId);
return channel != null && channel.isActive();
}

/**
* 获取所有设备
* @return 设备列表
*/
public List<MqttDevice> getAllDevices() {
return new ArrayList<>(devices.values());
}

/**
* 获取在线设备
* @return 在线设备列表
*/
public List<MqttDevice> getOnlineDevices() {
return devices.values().stream()
.filter(device -> "CONNECTED".equals(device.getStatus()))
.filter(device -> device.getChannel() != null && device.getChannel().isActive())
.collect(Collectors.toList());
}

/**
* 向设备发送消息
* @param clientId 客户端ID
* @param topic 主题
* @param payload 载荷
* @param qos QoS级别
*/
public void sendMessageToDevice(String clientId, String topic, ByteBuf payload, MqttQoS qos) {
try {
Channel channel = getDeviceChannel(clientId);
if (channel != null && channel.isActive()) {
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
.qos(qos)
.build();

channel.writeAndFlush(message);

log.debug("向设备发送消息: clientId={}, topic={}, qos={}", clientId, topic, qos);
} else {
log.warn("设备通道不可用: clientId={}", clientId);
}

} catch (Exception e) {
log.error("向设备发送消息失败: clientId={}", clientId, e);
}
}

/**
* 广播消息
* @param topic 主题
* @param payload 载荷
* @param qos QoS级别
*/
public void broadcastMessage(String topic, ByteBuf payload, MqttQoS qos) {
try {
List<MqttDevice> onlineDevices = getOnlineDevices();

for (MqttDevice device : onlineDevices) {
sendMessageToDevice(device.getClientId(), topic, payload.retain(), qos);
}

log.debug("广播消息: topic={}, deviceCount={}", topic, onlineDevices.size());

} catch (Exception e) {
log.error("广播消息失败: topic={}", topic, e);
}
}

/**
* 获取活跃连接数
* @return 活跃连接数
*/
public int getActiveConnectionCount() {
return activeConnectionCount.get();
}

/**
* 获取总连接数
* @return 总连接数
*/
public long getTotalConnectionCount() {
return totalConnectionCount.get();
}

/**
* 获取在线设备数
* @return 在线设备数
*/
public int getOnlineDeviceCount() {
return getOnlineDevices().size();
}
}

/**
 * A device known to the MQTT server, created by MqttDeviceManager.registerDevice.
 *
 * NOTE(review): MqttController returns these objects in REST responses; the channel
 * field is a live Netty Channel and is presumably not meant to be serialized — confirm
 * and exclude it from the JSON view if so.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttDevice {
// MQTT client identifier; used as the registry key.
private String clientId;
// User name supplied in the CONNECT packet.
private String username;
// Channel the device is (or was last) connected through.
private Channel channel;
// Connection state; the visible code writes "CONNECTED" and "DISCONNECTED".
private String status;
// Registration time, from System.currentTimeMillis().
private long connectTime;
// Time of registration or of the last disconnect, from System.currentTimeMillis().
private long lastSeen;
// Free-form attributes; not populated by any code visible here.
private Map<String, Object> attributes;
}

6. SpringBoot集成

6.1 SpringBoot MQTT服务

/**
 * Application-facing facade over the MQTT server: ties the server lifecycle to the
 * Spring context and exposes status, device queries and message sending.
 */
@Service
public class SpringBootMqttService {

    @Autowired
    private MqttServer mqttServer;

    @Autowired
    private MqttDeviceManager deviceManager;

    @Autowired
    private MqttTopicManager topicManager;

    /**
     * Starts the MQTT server once this bean is initialized. A startup failure is logged
     * and does not abort application startup.
     */
    @PostConstruct
    public void startMqttService() {
        try {
            mqttServer.start();
            log.info("SpringBoot MQTT服务启动成功");
        } catch (Exception e) {
            log.error("SpringBoot MQTT服务启动失败", e);
        }
    }

    /** Stops the MQTT server when the application context shuts down. */
    @PreDestroy
    public void stopMqttService() {
        try {
            mqttServer.stop();
            log.info("SpringBoot MQTT服务停止成功");
        } catch (Exception e) {
            log.error("SpringBoot MQTT服务停止失败", e);
        }
    }

    /**
     * @return current server status snapshot
     */
    public MqttServerStatus getMqttServiceStatus() {
        return mqttServer.getServerStatus();
    }

    /**
     * @return all known devices
     */
    public List<MqttDevice> getDevices() {
        return deviceManager.getAllDevices();
    }

    /**
     * @return devices that are currently online
     */
    public List<MqttDevice> getOnlineDevices() {
        return deviceManager.getOnlineDevices();
    }

    /**
     * Sends a text message to one device. Failures are logged, not propagated.
     *
     * @param clientId client identifier of the recipient
     * @param topic    topic name
     * @param message  message text, encoded as UTF-8
     * @param qos      QoS level
     */
    public void sendMessageToDevice(String clientId, String topic, String message, MqttQoS qos) {
        try {
            // The buffer is handed over to the device manager, which passes it on to the
            // channel write; it is not touched again here.
            ByteBuf payload = Unpooled.copiedBuffer(message, StandardCharsets.UTF_8);
            deviceManager.sendMessageToDevice(clientId, topic, payload, qos);

            log.info("向设备发送消息: clientId={}, topic={}, message={}", clientId, topic, message);

        } catch (Exception e) {
            log.error("向设备发送消息失败: clientId={}", clientId, e);
        }
    }

    /**
     * Sends a text message to every online device. Failures are logged, not propagated.
     *
     * @param topic   topic name
     * @param message message text, encoded as UTF-8
     * @param qos     QoS level
     */
    public void broadcastMessage(String topic, String message, MqttQoS qos) {
        ByteBuf payload = null;
        try {
            payload = Unpooled.copiedBuffer(message, StandardCharsets.UTF_8);
            deviceManager.broadcastMessage(topic, payload, qos);

            log.info("广播消息: topic={}, message={}", topic, message);

        } catch (Exception e) {
            log.error("广播消息失败: topic={}", topic, e);
        } finally {
            // broadcastMessage takes an extra reference per recipient; the reference created
            // here still belongs to this method and has to be dropped.
            if (payload != null) {
                payload.release();
            }
        }
    }

    /**
     * @return device and connection counters
     */
    public MqttDeviceStats getDeviceStats() {
        return MqttDeviceStats.builder()
                .totalDevices(deviceManager.getAllDevices().size())
                .onlineDevices(deviceManager.getOnlineDeviceCount())
                .activeConnections(deviceManager.getActiveConnectionCount())
                .totalConnections(deviceManager.getTotalConnectionCount())
                .build();
    }
}

/**
 * Device and connection counters, as assembled by SpringBootMqttService.getDeviceStats().
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttDeviceStats {
// Number of devices known to the device manager, online or not.
private int totalDevices;
// Number of devices currently reported as online.
private int onlineDevices;
// Number of currently open connections.
private int activeConnections;
// Number of connections accepted since the device manager was created.
private long totalConnections;
}

7. MQTT控制器

7.1 MQTT控制器

/**
* MQTT控制器
*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {

@Autowired
private SpringBootMqttService mqttService;

/**
* 获取MQTT服务状态
*/
@GetMapping("/status")
public ResponseEntity<Map<String, Object>> getMqttStatus() {
try {
MqttServerStatus status = mqttService.getMqttServiceStatus();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("status", status);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取MQTT服务状态失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取服务状态失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取设备列表
*/
@GetMapping("/devices")
public ResponseEntity<Map<String, Object>> getDevices() {
try {
List<MqttDevice> devices = mqttService.getDevices();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("devices", devices);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取设备列表失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取设备列表失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取在线设备列表
*/
@GetMapping("/devices/online")
public ResponseEntity<Map<String, Object>> getOnlineDevices() {
try {
List<MqttDevice> devices = mqttService.getOnlineDevices();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("devices", devices);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取在线设备列表失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取在线设备列表失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 向设备发送消息
*/
@PostMapping("/send")
public ResponseEntity<Map<String, Object>> sendMessage(
@RequestParam String clientId,
@RequestParam String topic,
@RequestBody String message,
@RequestParam(defaultValue = "AT_MOST_ONCE") String qos) {
try {
MqttQoS mqttQos = MqttQoS.valueOf(qos);
mqttService.sendMessageToDevice(clientId, topic, message, mqttQos);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "消息发送成功");

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("向设备发送消息失败: clientId={}", clientId, e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "发送消息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 广播消息
*/
@PostMapping("/broadcast")
public ResponseEntity<Map<String, Object>> broadcastMessage(
@RequestParam String topic,
@RequestBody String message,
@RequestParam(defaultValue = "AT_MOST_ONCE") String qos) {
try {
MqttQoS mqttQos = MqttQoS.valueOf(qos);
mqttService.broadcastMessage(topic, message, mqttQos);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "广播消息成功");

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("广播消息失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "广播消息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取设备统计信息
*/
@GetMapping("/stats")
public ResponseEntity<Map<String, Object>> getDeviceStats() {
try {
MqttDeviceStats stats = mqttService.getDeviceStats();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("stats", stats);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取设备统计信息失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取统计信息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}

8. 总结

通过SpringBoot+MQTT+Netty架构的实现,我们成功构建了一个高性能的物联网设备通信系统。关键特性包括:

8.1 核心优势

  1. MQTT服务器: 基于Netty的高性能MQTT服务器
  2. 设备管理: 物联网设备连接和状态管理
  3. 消息处理: MQTT消息发布订阅处理
  4. 协议解析: MQTT协议解析和验证
  5. SpringBoot集成: SpringBoot框架集成和配置

8.2 最佳实践

  1. 协议实现: 完整的MQTT协议实现
  2. 设备管理: 高效的设备连接管理
  3. 消息路由: 灵活的消息路由机制
  4. 性能优化: 内存池和连接池优化
  5. 监控管理: 全面的设备监控和管理

这套SpringBoot+MQTT+Netty架构方案不仅能够提供高性能的物联网设备通信能力,还包含了设备管理、消息处理、协议解析等核心功能,是现代物联网系统的重要基础设施。