1. 多类设备网关概述

多类设备网关是IoT系统中的核心组件,负责连接和管理不同类型的设备,实现设备消息的接收、解析、路由和转发。本文将详细介绍基于Netty和Kafka的多类设备网关实现,包括设备连接管理、消息路由、协议解析、负载均衡的完整解决方案。

1.1 核心功能

  1. 设备连接管理: 管理多种类型设备的TCP连接
  2. 消息路由: 根据设备类型和消息内容进行智能路由
  3. 协议解析: 支持多种设备协议的数据解析
  4. 负载均衡: 实现设备连接的负载均衡
  5. 消息转发: 将设备消息转发到Kafka进行后续处理

1.2 技术架构

1
2
3
4
5
设备A → Netty网关 → 协议解析 → 消息路由 → Kafka Topic A
设备B → Netty网关 → 协议解析 → 消息路由 → Kafka Topic B
设备C → Netty网关 → 协议解析 → 消息路由 → Kafka Topic C
↓ ↓ ↓ ↓
连接管理 → 心跳检测 → 异常处理 → 监控告警

2. Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.gateway</groupId>
<artifactId>device-gateway-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
<netty.version>4.1.77.Final</netty.version>
<kafka.version>3.2.0</kafka.version>
</properties>

<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!-- Micrometer(监控指标) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- Commons工具 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
</dependencies>
</project>

3. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# application.yml
server:
port: 8080

spring:
application:
name: device-gateway-demo

# Kafka配置
kafka:
bootstrap-servers: localhost:9092

# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
max.in.flight.requests.per.connection: 5
enable.idempotence: true

# 消费者配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: device-gateway-group
auto-offset-reset: earliest
enable-auto-commit: false

# Redis配置
redis:
host: localhost
port: 6379
database: 0

# 数据库配置
datasource:
url: jdbc:mysql://localhost:3306/device_gateway?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver

jpa:
hibernate:
ddl-auto: update
show-sql: true

# 设备网关配置
device:
gateway:
# Netty服务器配置
netty:
# 监听端口
port: 9999
# Boss线程数
boss-threads: 1
# Worker线程数
worker-threads: 8
# 连接超时时间(秒)
connect-timeout: 30
# 心跳间隔(秒)
heartbeat-interval: 30
# 最大连接数
max-connections: 10000

# 设备类型配置
device-types:
- type: "SENSOR"
topic: "device-sensor-topic"
protocol: "JSON"
heartbeat-interval: 60
- type: "CAMERA"
topic: "device-camera-topic"
protocol: "BINARY"
heartbeat-interval: 30
- type: "CONTROLLER"
topic: "device-controller-topic"
protocol: "MODBUS"
heartbeat-interval: 15

# 消息路由配置
routing:
# 默认Topic
default-topic: "device-default-topic"
# 路由规则
rules:
- condition: "deviceType == 'SENSOR'"
topic: "device-sensor-topic"
- condition: "deviceType == 'CAMERA'"
topic: "device-camera-topic"
- condition: "deviceType == 'CONTROLLER'"
topic: "device-controller-topic"

# 监控配置
management:
endpoints:
web:
exposure:
include: "*"
metrics:
export:
prometheus:
enabled: true

4. 设备实体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.gateway.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

/**
* 设备实体
* @author Java实战
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "device", indexes = {
@Index(name = "idx_device_id", columnList = "device_id"),
@Index(name = "idx_device_type", columnList = "device_type"),
@Index(name = "idx_online_status", columnList = "online_status")
})
public class Device {

/**
* 主键ID
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/**
* 设备ID(唯一标识)
*/
@Column(nullable = false, unique = true, length = 64)
private String deviceId;

/**
* 设备名称
*/
@Column(nullable = false, length = 100)
private String deviceName;

/**
* 设备类型
*/
@Column(nullable = false, length = 50)
private String deviceType;

/**
* 设备协议
*/
@Column(length = 50)
private String protocol;

/**
* 在线状态:ONLINE-在线, OFFLINE-离线
*/
@Column(nullable = false, length = 20)
private String onlineStatus;

/**
* 最后心跳时间
*/
private LocalDateTime lastHeartbeat;

/**
* 连接时间
*/
private LocalDateTime connectTime;

/**
* 断开时间
*/
private LocalDateTime disconnectTime;

/**
* 创建时间
*/
@Column(nullable = false)
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;

/**
* 设备描述
*/
@Column(length = 500)
private String description;

/**
* 设备IP地址
*/
@Column(length = 50)
private String ipAddress;

/**
* 设备端口
*/
private Integer port;
}

5. Netty服务器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package com.gateway.netty;

import com.gateway.config.DeviceGatewayConfig;
import com.gateway.handler.DeviceChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
* Netty服务器
* @author Java实战
*/
@Slf4j
@Component
public class NettyServer {

@Autowired
private DeviceGatewayConfig config;

@Autowired
private DeviceChannelInitializer channelInitializer;

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture channelFuture;

/**
* 启动Netty服务器
*/
@PostConstruct
public void start() {
try {
log.info("启动Netty服务器,端口: {}", config.getNetty().getPort());

// 创建Boss和Worker线程组
bossGroup = new NioEventLoopGroup(config.getNetty().getBossThreads());
workerGroup = new NioEventLoopGroup(config.getNetty().getWorkerThreads());

// 创建服务器启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 1024 * 64)
.childOption(ChannelOption.SO_SNDBUF, 1024 * 64);

// 绑定端口并启动服务器
channelFuture = bootstrap.bind(config.getNetty().getPort()).sync();

log.info("Netty服务器启动成功,端口: {}", config.getNetty().getPort());

// 异步等待服务器关闭
new Thread(() -> {
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty服务器关闭异常", e);
Thread.currentThread().interrupt();
}
}).start();

} catch (Exception e) {
log.error("启动Netty服务器失败", e);
throw new RuntimeException("启动Netty服务器失败", e);
}
}

/**
* 关闭Netty服务器
*/
@PreDestroy
public void stop() {
try {
log.info("关闭Netty服务器");

if (channelFuture != null) {
channelFuture.channel().close().sync();
}

if (workerGroup != null) {
workerGroup.shutdownGracefully().sync();
}

if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}

log.info("Netty服务器关闭成功");

} catch (Exception e) {
log.error("关闭Netty服务器失败", e);
}
}
}

6. 设备连接处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package com.gateway.handler;

import com.gateway.entity.Device;
import com.gateway.service.DeviceService;
import com.gateway.service.MessageRouterService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
* 设备连接处理器
* @author Java实战
*/
@Slf4j
@Component
public class DeviceConnectionHandler extends ChannelInboundHandlerAdapter {

@Autowired
private DeviceService deviceService;

@Autowired
private MessageRouterService messageRouterService;

/**
* 客户端连接建立
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = address.getAddress().getHostAddress();
int clientPort = address.getPort();

log.info("设备连接建立: {}:{}", clientIp, clientPort);

// 将连接信息存储到Channel属性中
ctx.channel().attr(AttributeKeys.CLIENT_IP).set(clientIp);
ctx.channel().attr(AttributeKeys.CLIENT_PORT).set(clientPort);

super.channelActive(ctx);
}

/**
* 客户端连接断开
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String deviceId = ctx.channel().attr(AttributeKeys.DEVICE_ID).get();
String clientIp = ctx.channel().attr(AttributeKeys.CLIENT_IP).get();

log.info("设备连接断开: deviceId={}, ip={}", deviceId, clientIp);

// 更新设备离线状态
if (deviceId != null) {
deviceService.updateDeviceOffline(deviceId);
}

super.channelInactive(ctx);
}

/**
* 读取客户端消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String deviceId = ctx.channel().attr(AttributeKeys.DEVICE_ID).get();
String deviceType = ctx.channel().attr(AttributeKeys.DEVICE_TYPE).get();

log.debug("收到设备消息: deviceId={}, deviceType={}, msg={}",
deviceId, deviceType, msg);

// 路由消息到Kafka
messageRouterService.routeMessage(deviceId, deviceType, msg);

} catch (Exception e) {
log.error("处理设备消息失败", e);
} finally {
// 释放消息对象
if (msg instanceof io.netty.util.ReferenceCounted) {
((io.netty.util.ReferenceCounted) msg).release();
}
}
}

/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String deviceId = ctx.channel().attr(AttributeKeys.DEVICE_ID).get();
String clientIp = ctx.channel().attr(AttributeKeys.CLIENT_IP).get();

log.error("设备连接异常: deviceId={}, ip={}", deviceId, clientIp, cause);

// 关闭连接
ctx.close();
}

/**
* 心跳检测
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String deviceId = ctx.channel().attr(AttributeKeys.DEVICE_ID).get();

if (event.state() == IdleState.READER_IDLE) {
log.warn("设备心跳超时: deviceId={}", deviceId);

// 发送心跳检测消息
ctx.writeAndFlush("PING");

} else if (event.state() == IdleState.WRITER_IDLE) {
log.warn("设备写入超时: deviceId={}", deviceId);

} else if (event.state() == IdleState.ALL_IDLE) {
log.warn("设备读写超时: deviceId={}", deviceId);

// 关闭超时连接
ctx.close();
}
}

super.userEventTriggered(ctx, evt);
}
}

7. 协议解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package com.gateway.protocol;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
* 协议解析器
* @author Java实战
*/
@Slf4j
@Component
public class ProtocolParser {

/**
* 解析设备消息
*/
public DeviceMessage parseMessage(String deviceType, String protocol, Object rawMessage) {
try {
log.debug("解析设备消息: deviceType={}, protocol={}", deviceType, protocol);

DeviceMessage message = new DeviceMessage();
message.setDeviceType(deviceType);
message.setProtocol(protocol);
message.setRawMessage(rawMessage);
message.setTimestamp(System.currentTimeMillis());

// 根据协议类型解析消息
switch (protocol.toUpperCase()) {
case "JSON":
return parseJsonMessage(message);
case "BINARY":
return parseBinaryMessage(message);
case "MODBUS":
return parseModbusMessage(message);
default:
return parseDefaultMessage(message);
}

} catch (Exception e) {
log.error("解析设备消息失败: deviceType={}, protocol={}", deviceType, protocol, e);
return createErrorMessage(deviceType, protocol, rawMessage, e.getMessage());
}
}

/**
* 解析JSON协议消息
*/
private DeviceMessage parseJsonMessage(DeviceMessage message) {
try {
String jsonStr = message.getRawMessage().toString();
JSONObject json = JSON.parseObject(jsonStr);

message.setDeviceId(json.getString("deviceId"));
message.setMessageType(json.getString("messageType"));
message.setData(json.getJSONObject("data"));
message.setMessageId(json.getString("messageId"));

log.debug("JSON消息解析成功: deviceId={}, messageType={}",
message.getDeviceId(), message.getMessageType());

return message;

} catch (Exception e) {
log.error("JSON消息解析失败", e);
throw new RuntimeException("JSON消息解析失败", e);
}
}

/**
* 解析二进制协议消息
*/
private DeviceMessage parseBinaryMessage(DeviceMessage message) {
try {
byte[] data = (byte[]) message.getRawMessage();

// 简单的二进制协议解析
if (data.length < 8) {
throw new RuntimeException("消息长度不足");
}

// 解析设备ID(前4字节)
int deviceId = bytesToInt(data, 0);
message.setDeviceId(String.valueOf(deviceId));

// 解析消息类型(第5字节)
message.setMessageType(String.valueOf(data[4]));

// 解析数据长度(第6-7字节)
int dataLength = bytesToInt(data, 5);

// 解析数据内容
if (data.length >= 8 + dataLength) {
byte[] messageData = new byte[dataLength];
System.arraycopy(data, 8, messageData, 0, dataLength);
message.setData(parseBinaryData(messageData));
}

log.debug("二进制消息解析成功: deviceId={}, messageType={}",
message.getDeviceId(), message.getMessageType());

return message;

} catch (Exception e) {
log.error("二进制消息解析失败", e);
throw new RuntimeException("二进制消息解析失败", e);
}
}

/**
* 解析Modbus协议消息
*/
private DeviceMessage parseModbusMessage(DeviceMessage message) {
try {
byte[] data = (byte[]) message.getRawMessage();

if (data.length < 8) {
throw new RuntimeException("Modbus消息长度不足");
}

// 解析设备地址(第1字节)
int deviceAddress = data[0] & 0xFF;
message.setDeviceId(String.valueOf(deviceAddress));

// 解析功能码(第2字节)
int functionCode = data[1] & 0xFF;
message.setMessageType("FC" + functionCode);

// 解析数据
Map<String, Object> modbusData = new HashMap<>();
modbusData.put("deviceAddress", deviceAddress);
modbusData.put("functionCode", functionCode);

if (data.length > 2) {
byte[] payload = new byte[data.length - 2];
System.arraycopy(data, 2, payload, 0, payload.length);
modbusData.put("payload", payload);
}

message.setData(modbusData);

log.debug("Modbus消息解析成功: deviceId={}, functionCode={}",
message.getDeviceId(), functionCode);

return message;

} catch (Exception e) {
log.error("Modbus消息解析失败", e);
throw new RuntimeException("Modbus消息解析失败", e);
}
}

/**
* 解析默认协议消息
*/
private DeviceMessage parseDefaultMessage(DeviceMessage message) {
message.setDeviceId("UNKNOWN");
message.setMessageType("UNKNOWN");
message.setData(Map.of("raw", message.getRawMessage().toString()));

return message;
}

/**
* 创建错误消息
*/
private DeviceMessage createErrorMessage(String deviceType, String protocol,
Object rawMessage, String error) {
DeviceMessage message = new DeviceMessage();
message.setDeviceType(deviceType);
message.setProtocol(protocol);
message.setRawMessage(rawMessage);
message.setDeviceId("ERROR");
message.setMessageType("ERROR");
message.setData(Map.of("error", error));
message.setTimestamp(System.currentTimeMillis());

return message;
}

/**
* 解析二进制数据
*/
private Map<String, Object> parseBinaryData(byte[] data) {
Map<String, Object> result = new HashMap<>();

// 简单的二进制数据解析
for (int i = 0; i < data.length; i += 4) {
if (i + 4 <= data.length) {
int value = bytesToInt(data, i);
result.put("value" + (i / 4), value);
}
}

return result;
}

/**
* 字节数组转整数
*/
private int bytesToInt(byte[] bytes, int offset) {
return ((bytes[offset] & 0xFF) << 24) |
((bytes[offset + 1] & 0xFF) << 16) |
((bytes[offset + 2] & 0xFF) << 8) |
(bytes[offset + 3] & 0xFF);
}
}

8. 消息路由服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package com.gateway.service;

import com.alibaba.fastjson.JSON;
import com.gateway.config.DeviceGatewayConfig;
import com.gateway.entity.DeviceMessage;
import com.gateway.protocol.ProtocolParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

/**
* 消息路由服务
* @author Java实战
*/
@Slf4j
@Service
public class MessageRouterService {

@Autowired
private DeviceGatewayConfig config;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
private ProtocolParser protocolParser;

/**
* 路由设备消息
*/
public void routeMessage(String deviceId, String deviceType, Object rawMessage) {
try {
log.debug("路由设备消息: deviceId={}, deviceType={}", deviceId, deviceType);

// 获取设备协议
String protocol = getDeviceProtocol(deviceType);

// 解析消息
DeviceMessage message = protocolParser.parseMessage(deviceType, protocol, rawMessage);
message.setDeviceId(deviceId);
message.setMessageId(UUID.randomUUID().toString());

// 确定目标Topic
String targetTopic = determineTargetTopic(message);

// 发送到Kafka
sendToKafka(targetTopic, message);

log.debug("消息路由成功: deviceId={}, topic={}", deviceId, targetTopic);

} catch (Exception e) {
log.error("消息路由失败: deviceId={}, deviceType={}", deviceId, deviceType, e);
}
}

/**
* 确定目标Topic
*/
private String determineTargetTopic(DeviceMessage message) {
// 根据路由规则确定Topic
for (DeviceGatewayConfig.RoutingRule rule : config.getRouting().getRules()) {
if (evaluateCondition(rule.getCondition(), message)) {
return rule.getTopic();
}
}

// 根据设备类型确定Topic
String topicByType = getTopicByDeviceType(message.getDeviceType());
if (topicByType != null) {
return topicByType;
}

// 使用默认Topic
return config.getRouting().getDefaultTopic();
}

/**
* 根据设备类型获取Topic
*/
private String getTopicByDeviceType(String deviceType) {
for (DeviceGatewayConfig.DeviceTypeConfig typeConfig : config.getDeviceTypes()) {
if (typeConfig.getType().equals(deviceType)) {
return typeConfig.getTopic();
}
}
return null;
}

/**
* 获取设备协议
*/
private String getDeviceProtocol(String deviceType) {
for (DeviceGatewayConfig.DeviceTypeConfig typeConfig : config.getDeviceTypes()) {
if (typeConfig.getType().equals(deviceType)) {
return typeConfig.getProtocol();
}
}
return "DEFAULT";
}

/**
* 评估路由条件
*/
private boolean evaluateCondition(String condition, DeviceMessage message) {
try {
// 简单的条件评估实现
if (condition.contains("deviceType")) {
String expectedType = condition.split("==")[1].trim().replace("'", "");
return expectedType.equals(message.getDeviceType());
}

return false;

} catch (Exception e) {
log.error("评估路由条件失败: condition={}", condition, e);
return false;
}
}

/**
* 发送消息到Kafka
*/
private void sendToKafka(String topic, DeviceMessage message) {
try {
// 构建Kafka消息
Map<String, Object> kafkaMessage = Map.of(
"messageId", message.getMessageId(),
"deviceId", message.getDeviceId(),
"deviceType", message.getDeviceType(),
"messageType", message.getMessageType(),
"protocol", message.getProtocol(),
"data", message.getData(),
"timestamp", message.getTimestamp()
);

// 发送到Kafka
kafkaTemplate.send(topic, message.getDeviceId(), kafkaMessage);

log.debug("消息已发送到Kafka: topic={}, deviceId={}", topic, message.getDeviceId());

} catch (Exception e) {
log.error("发送消息到Kafka失败: topic={}, deviceId={}", topic, message.getDeviceId(), e);
}
}
}

9. 设备管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package com.gateway.service;

import com.gateway.entity.Device;
import com.gateway.repository.DeviceRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* 设备管理服务
* @author Java实战
*/
@Slf4j
@Service
public class DeviceService {

@Autowired
private DeviceRepository deviceRepository;

@Autowired
private StringRedisTemplate redisTemplate;

private static final String DEVICE_ONLINE_KEY = "device:online:";
private static final String DEVICE_HEARTBEAT_KEY = "device:heartbeat:";

/**
* 注册设备
*/
@Transactional
public Device registerDevice(String deviceId, String deviceName, String deviceType,
String protocol, String ipAddress, Integer port) {
try {
log.info("注册设备: deviceId={}, deviceName={}, deviceType={}",
deviceId, deviceName, deviceType);

// 检查设备是否已存在
Optional<Device> existingDevice = deviceRepository.findByDeviceId(deviceId);

Device device;
if (existingDevice.isPresent()) {
// 更新现有设备
device = existingDevice.get();
device.setDeviceName(deviceName);
device.setDeviceType(deviceType);
device.setProtocol(protocol);
device.setIpAddress(ipAddress);
device.setPort(port);
device.setUpdateTime(LocalDateTime.now());
} else {
// 创建新设备
device = Device.builder()
.deviceId(deviceId)
.deviceName(deviceName)
.deviceType(deviceType)
.protocol(protocol)
.onlineStatus("OFFLINE")
.ipAddress(ipAddress)
.port(port)
.createTime(LocalDateTime.now())
.updateTime(LocalDateTime.now())
.build();
}

device = deviceRepository.save(device);

log.info("设备注册成功: deviceId={}", deviceId);
return device;

} catch (Exception e) {
log.error("设备注册失败: deviceId={}", deviceId, e);
throw new RuntimeException("设备注册失败", e);
}
}

/**
* 更新设备在线状态
*/
@Transactional
public void updateDeviceOnline(String deviceId, String ipAddress, Integer port) {
try {
Optional<Device> deviceOpt = deviceRepository.findByDeviceId(deviceId);
if (deviceOpt.isPresent()) {
Device device = deviceOpt.get();
device.setOnlineStatus("ONLINE");
device.setConnectTime(LocalDateTime.now());
device.setLastHeartbeat(LocalDateTime.now());
device.setIpAddress(ipAddress);
device.setPort(port);
device.setUpdateTime(LocalDateTime.now());

deviceRepository.save(device);

// 更新Redis缓存
redisTemplate.opsForValue().set(
DEVICE_ONLINE_KEY + deviceId,
"ONLINE",
30,
TimeUnit.MINUTES
);

log.info("设备上线: deviceId={}", deviceId);
}

} catch (Exception e) {
log.error("更新设备在线状态失败: deviceId={}", deviceId, e);
}
}

/**
* 更新设备离线状态
*/
@Transactional
public void updateDeviceOffline(String deviceId) {
try {
Optional<Device> deviceOpt = deviceRepository.findByDeviceId(deviceId);
if (deviceOpt.isPresent()) {
Device device = deviceOpt.get();
device.setOnlineStatus("OFFLINE");
device.setDisconnectTime(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());

deviceRepository.save(device);

// 删除Redis缓存
redisTemplate.delete(DEVICE_ONLINE_KEY + deviceId);
redisTemplate.delete(DEVICE_HEARTBEAT_KEY + deviceId);

log.info("设备离线: deviceId={}", deviceId);
}

} catch (Exception e) {
log.error("更新设备离线状态失败: deviceId={}", deviceId, e);
}
}

/**
* 更新设备心跳
*/
@Transactional
public void updateDeviceHeartbeat(String deviceId) {
try {
Optional<Device> deviceOpt = deviceRepository.findByDeviceId(deviceId);
if (deviceOpt.isPresent()) {
Device device = deviceOpt.get();
device.setLastHeartbeat(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());

deviceRepository.save(device);

// 更新Redis心跳缓存
redisTemplate.opsForValue().set(
DEVICE_HEARTBEAT_KEY + deviceId,
String.valueOf(System.currentTimeMillis()),
5,
TimeUnit.MINUTES
);

log.debug("设备心跳更新: deviceId={}", deviceId);
}

} catch (Exception e) {
log.error("更新设备心跳失败: deviceId={}", deviceId, e);
}
}

/**
* 查询在线设备
*/
public List<Device> getOnlineDevices() {
return deviceRepository.findByOnlineStatus("ONLINE");
}

/**
* 根据设备类型查询设备
*/
public List<Device> getDevicesByType(String deviceType) {
return deviceRepository.findByDeviceType(deviceType);
}

/**
* 检查设备是否在线
*/
public boolean isDeviceOnline(String deviceId) {
String status = redisTemplate.opsForValue().get(DEVICE_ONLINE_KEY + deviceId);
return "ONLINE".equals(status);
}
}

10. 设备消息实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.gateway.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
* 设备消息实体
* @author Java实战
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceMessage {

/**
* 消息ID
*/
private String messageId;

/**
* 设备ID
*/
private String deviceId;

/**
* 设备类型
*/
private String deviceType;

/**
* 消息类型
*/
private String messageType;

/**
* 协议类型
*/
private String protocol;

/**
* 消息数据
*/
private Map<String, Object> data;

/**
* 原始消息
*/
private Object rawMessage;

/**
* 时间戳
*/
private Long timestamp;
}

11. 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.gateway.controller;

import com.gateway.entity.Device;
import com.gateway.service.DeviceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
* 设备网关控制器
* @author Java实战
*/
@Slf4j
@RestController
@RequestMapping("/api/gateway")
public class DeviceGatewayController {

@Autowired
private DeviceService deviceService;

/**
* 注册设备
*/
@PostMapping("/device/register")
public ResponseEntity<Device> registerDevice(@RequestBody Device device) {
try {
Device registeredDevice = deviceService.registerDevice(
device.getDeviceId(),
device.getDeviceName(),
device.getDeviceType(),
device.getProtocol(),
device.getIpAddress(),
device.getPort()
);

return ResponseEntity.ok(registeredDevice);

} catch (Exception e) {
log.error("设备注册失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 查询在线设备
*/
@GetMapping("/devices/online")
public ResponseEntity<List<Device>> getOnlineDevices() {
try {
List<Device> devices = deviceService.getOnlineDevices();
return ResponseEntity.ok(devices);

} catch (Exception e) {
log.error("查询在线设备失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 根据设备类型查询设备
*/
@GetMapping("/devices/type/{deviceType}")
public ResponseEntity<List<Device>> getDevicesByType(@PathVariable String deviceType) {
try {
List<Device> devices = deviceService.getDevicesByType(deviceType);
return ResponseEntity.ok(devices);

} catch (Exception e) {
log.error("根据设备类型查询设备失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 检查设备是否在线
*/
@GetMapping("/device/{deviceId}/online")
public ResponseEntity<Boolean> isDeviceOnline(@PathVariable String deviceId) {
try {
boolean online = deviceService.isDeviceOnline(deviceId);
return ResponseEntity.ok(online);

} catch (Exception e) {
log.error("检查设备在线状态失败", e);
return ResponseEntity.internalServerError().build();
}
}
}

12. Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.gateway.repository;

import com.gateway.entity.Device;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Optional;

/**
* 设备Repository
* @author Java实战
*/
@Repository
public interface DeviceRepository extends JpaRepository<Device, Long> {

/**
* 根据设备ID查询
*/
Optional<Device> findByDeviceId(String deviceId);

/**
* 根据在线状态查询
*/
List<Device> findByOnlineStatus(String onlineStatus);

/**
* 根据设备类型查询
*/
List<Device> findByDeviceType(String deviceType);
}

13. 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.gateway.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
* 设备网关配置
* @author Java实战
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "device.gateway")
public class DeviceGatewayConfig {

/**
* Netty配置
*/
private NettyConfig netty;

/**
* 设备类型配置
*/
private List<DeviceTypeConfig> deviceTypes;

/**
* 消息路由配置
*/
private RoutingConfig routing;

@Data
public static class NettyConfig {
private int port;
private int bossThreads;
private int workerThreads;
private int connectTimeout;
private int heartbeatInterval;
private int maxConnections;
}

@Data
public static class DeviceTypeConfig {
private String type;
private String topic;
private String protocol;
private int heartbeatInterval;
}

@Data
public static class RoutingConfig {
private String defaultTopic;
private List<RoutingRule> rules;
}

@Data
public static class RoutingRule {
private String condition;
private String topic;
}
}

14. 总结

Netty/Kafka多类设备网关是IoT系统的核心组件。通过本文的详细介绍,我们了解了:

  1. Netty服务器: 高性能的TCP服务器实现
  2. 设备连接管理: 设备注册、在线状态管理、心跳检测
  3. 协议解析: 支持JSON、二进制、Modbus等多种协议
  4. 消息路由: 智能路由到不同的Kafka Topic
  5. 负载均衡: 支持大量设备并发连接

通过合理的网关设计和实现,可以为IoT系统提供稳定、高效的设备接入能力。


Java实战要点:

  • Netty提供高性能的网络通信能力
  • 使用Channel属性存储设备信息
  • 心跳检测保证连接稳定性
  • 协议解析支持多种设备类型
  • Kafka实现消息的可靠传输

代码注解说明:

  • @ChannelHandler.Sharable: 可共享的处理器
  • ChannelInboundHandlerAdapter: 入站处理器适配器
  • IdleStateEvent: 空闲状态事件
  • AttributeKeys: Channel属性键
  • 消息路由: 根据设备类型和规则智能路由