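Device Service: Complete Business Logic for a Java Microservice Backend in Practice

1. Architecture Overview

The device service is the core business module of an IoT platform. It has to support connection, authentication, data reporting, command delivery, and state management for a very large number of devices. This article walks through how to build a complete device service system on a Java microservice architecture, with the business logic code for each part: Netty device access, the Kafka message queue, the device service, the data service, and the command service.

1.1 System Architecture Diagram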

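Device service architecture
├── Netty access service
│   ├── Device connection handling
│   ├── Protocol parsing
│   ├── Message routing
│   └── Connection management
│
├── Device Service
│   ├── Device registration
│   ├── Device authentication
│   ├── Device management
│   └── Device status
│
├── Data Service
│   ├── Device data ingestion
│   ├── Data parsing
│   ├── Data storage
│   └── Data query
│
├── Command Service
│   ├── Command delivery
│   ├── Command status tracking
│   └── Command response handling
│
├── Kafka message queue
│   ├── Device data topic
│   ├── Device event topic
│   ├── Device command topic
│   └── Device status topic
│
└── Database / cache
    ├── MySQL: device information, command records
    └── Redis: device status, connection information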

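1.2 Core Components

  • Netty access service: accepts device TCP connections, parses the protocol, and routes messages
  • Device Service: device registration, authentication, management, and status maintenance
  • Data Service: receives, parses, stores, and queries device data
  • Command Service: delivers commands to devices, tracks their status, and handles responses
  • Kafka message queue: asynchronous processing of device data, events, and commands
  • Database (MySQL): persists device information, command records, and data records
  • Cache (Redis): caches device status, connection information, and online state

2. Netty Access Service Implementation

2.1 Netty Server Bootstrap Class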

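import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Netty server bootstrap class.
 * Accepts TCP connections from devices.
 */
@SpringBootApplication
@Slf4j
public class NettyDeviceServerApplication {

    @Autowired
    private NettyServerConfig nettyConfig;

    @Autowired
    private DeviceChannelHandler deviceChannelHandler;

    public static void main(String[] args) {
        SpringApplication.run(NettyDeviceServerApplication.class, args);
    }

    @PostConstruct
    public void startNettyServer() {
        // startServer() blocks until the server channel closes, so it runs on
        // its own thread to keep Spring startup from hanging.
        new Thread(() -> {
            try {
                startServer();
            } catch (Exception e) {
                log.error("Netty server failed to start: error={}", e.getMessage(), e);
                System.exit(1);
            }
        }, "netty-device-server").start();
    }

    /** Starts the Netty server and blocks until it is closed. */
    private void startServer() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(nettyConfig.getBossThreads());
        EventLoopGroup workerGroup = new NioEventLoopGroup(nettyConfig.getWorkerThreads());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, nettyConfig.getBacklog())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_RCVBUF, nettyConfig.getReceiveBufferSize())
                    .childOption(ChannelOption.SO_SNDBUF, nettyConfig.getSendBufferSize())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            // Codecs. Inbound: split frames on a 4-byte length
                            // header (stripped), then decode to DeviceMessage.
                            // Outbound: encode, then prepend the 4-byte length.
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                    nettyConfig.getMaxFrameLength(), 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            pipeline.addLast(new DeviceMessageDecoder());
                            pipeline.addLast(new DeviceMessageEncoder());

                            // Business handler (a shared @Sharable singleton)
                            pipeline.addLast(deviceChannelHandler);
                        }
                    });

            ChannelFuture future = bootstrap.bind(nettyConfig.getPort()).sync();
            log.info("Netty device access server started: port={}", nettyConfig.getPort());

            future.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}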
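
The two framing handlers in the pipeline agree on a simple wire format: a 4-byte big-endian length followed by that many payload bytes. The sketch below reproduces that format with only the JDK so it can be inspected without Netty. The class name `LengthPrefixedFrames` and its methods are hypothetical, and the length check is simplified compared with what `LengthFieldBasedFrameDecoder` actually does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** JDK-only illustration of a 4-byte length-prefixed frame (not Netty code). */
class LengthPrefixedFrames {

    /** Prepends a 4-byte big-endian payload length, like LengthFieldPrepender(4). */
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Returns the next payload with the header stripped, or null when the
     * buffer does not hold a complete frame yet (the caller waits for more bytes).
     */
    static byte[] decode(ByteBuffer in, int maxFrameLength) {
        if (in.remaining() < 4) {
            return null; // header incomplete
        }
        in.mark();
        int length = in.getInt();
        if (length < 0 || length > maxFrameLength) {
            throw new IllegalArgumentException("bad frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // body incomplete: rewind to the start of the header
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] payload = decode(ByteBuffer.wrap(frame), 1024);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```

A device-side client has to produce exactly this layout; a partial frame leaves the read position untouched so decoding can resume when more bytes arrive.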

2.2 Netty Channel Handler

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Device channel handler.
 * Handles device connections and inbound and outbound messages.
 */
@Component
@Slf4j
@ChannelHandler.Sharable
public class DeviceChannelHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private DeviceConnectionManager connectionManager;

    @Autowired
    private DeviceMessageProcessor messageProcessor;

    @Autowired
    private DeviceAuthService deviceAuthService;

    /** Channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Device connected: remoteAddress={}", ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    /** Channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String deviceId = connectionManager.getDeviceIdByChannel(ctx.channel());
        if (deviceId != null) {
            connectionManager.removeConnection(deviceId);
            log.info("Device disconnected: deviceId={}, remoteAddress={}",
                    deviceId, ctx.channel().remoteAddress());
        }
        super.channelInactive(ctx);
    }

    /** Reads one decoded message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            DeviceMessage message = (DeviceMessage) msg;

            // 1. Handle device authentication
            if (message.getType() == DeviceMessageType.AUTH) {
                handleAuth(ctx, message);
                return;
            }

            // 2. Reject messages from channels that have not authenticated
            String deviceId = connectionManager.getDeviceIdByChannel(ctx.channel());
            if (deviceId == null) {
                log.warn("Device not authenticated, rejecting message: remoteAddress={}",
                        ctx.channel().remoteAddress());
                ctx.close();
                return;
            }

            // 3. Process the device message.
            // Note: this runs on the Netty event loop; the processor performs
            // database, Redis, and Kafka calls, so a busy deployment may want
            // to hand the work to a separate executor.
            messageProcessor.processMessage(deviceId, message, ctx.channel());

        } catch (Exception e) {
            log.error("Failed to process device message: error={}", e.getMessage(), e);
            ctx.close();
        }
    }

    /** Handles a device authentication request. */
    private void handleAuth(ChannelHandlerContext ctx, DeviceMessage message) {
        try {
            AuthMessage authMessage = (AuthMessage) message.getData();

            // 1. Authenticate the device
            DeviceAuthResult authResult = deviceAuthService.authenticate(
                    authMessage.getDeviceId(),
                    authMessage.getDeviceKey());

            if (!authResult.isSuccess()) {
                // Authentication failed: send the failure response and close
                // the channel once the write has completed.
                DeviceMessage response = DeviceMessage.createAuthResponse(
                        false, authResult.getMessage());
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                return;
            }

            // 2. Register the device connection
            connectionManager.registerConnection(
                    authResult.getDeviceInfo().getDeviceId(),
                    ctx.channel());

            // 3. Send the success response
            DeviceMessage response = DeviceMessage.createAuthResponse(true, "Authentication succeeded");
            ctx.writeAndFlush(response);

            log.info("Device authenticated: deviceId={}, remoteAddress={}",
                    authResult.getDeviceInfo().getDeviceId(),
                    ctx.channel().remoteAddress());

        } catch (Exception e) {
            log.error("Failed to handle device authentication: error={}", e.getMessage(), e);
            ctx.close();
        }
    }

    /** Handles exceptions raised in the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Channel exception: remoteAddress={}, error={}",
                ctx.channel().remoteAddress(), cause.getMessage(), cause);
        ctx.close();
    }
}

2.3 Device Connection Manager

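import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

/**
 * Device connection manager.
 * Maintains the mapping between devices and channels.
 */
@Component
@Slf4j
public class DeviceConnectionManager {

    private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Channel, String> channelDevices = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, DeviceInfo> deviceInfos = new ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Registers a device connection. */
    public void registerConnection(String deviceId, Channel channel) {
        Channel previous = deviceChannels.put(deviceId, channel);
        channelDevices.put(channel, deviceId);

        // A device that reconnects leaves a stale channel behind. Drop its
        // reverse mapping first so that its channelInactive callback cannot
        // remove the new connection, then close it.
        if (previous != null && previous != channel) {
            channelDevices.remove(previous);
            previous.close();
        }

        // Mark the device as online in Redis
        String key = "device:online:" + deviceId;
        redisTemplate.opsForValue().set(key, "1", 30, TimeUnit.MINUTES);

        log.info("Registered device connection: deviceId={}, channelId={}", deviceId, channel.id());
    }

    /** Removes a device connection. */
    public void removeConnection(String deviceId) {
        Channel channel = deviceChannels.remove(deviceId);
        if (channel != null) {
            channelDevices.remove(channel);

            // Mark the device as offline in Redis
            String key = "device:online:" + deviceId;
            redisTemplate.delete(key);

            log.info("Removed device connection: deviceId={}, channelId={}", deviceId, channel.id());
        }
    }

    /** Returns the channel for a device ID, or null. */
    public Channel getChannelByDeviceId(String deviceId) {
        return deviceChannels.get(deviceId);
    }

    /** Returns the device ID bound to a channel, or null. */
    public String getDeviceIdByChannel(Channel channel) {
        return channelDevices.get(channel);
    }

    /** Checks whether a device is connected to this node. */
    public boolean isDeviceOnline(String deviceId) {
        return deviceChannels.containsKey(deviceId);
    }

    /**
     * Sends a message to a device.
     * Returns true once the write has been queued. The write itself is
     * asynchronous, so a later failure is reported by the listener.
     */
    public boolean sendMessage(String deviceId, DeviceMessage message) {
        Channel channel = deviceChannels.get(deviceId);
        if (channel == null || !channel.isActive()) {
            log.warn("Device is offline, cannot send message: deviceId={}", deviceId);
            return false;
        }

        channel.writeAndFlush(message).addListener(future -> {
            if (!future.isSuccess()) {
                log.error("Failed to send message to device: deviceId={}, error={}",
                        deviceId, future.cause().getMessage(), future.cause());
            }
        });
        return true;
    }

    /** Returns a snapshot of all online device IDs. */
    public Set<String> getAllOnlineDeviceIds() {
        return new HashSet<>(deviceChannels.keySet());
    }
}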
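
The connection manager keeps two maps in step, and the delicate case is a device that reconnects before its old connection has been torn down. The sketch below isolates that bookkeeping with only the JDK. `ConnectionRegistry` is a hypothetical name, and the type parameter `C` stands in for Netty's `Channel`. The pair of maps is not updated atomically, so treat this as an illustration of the idea rather than a hardened implementation.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Two-way device/connection registry; C stands in for a Netty Channel. */
class ConnectionRegistry<C> {
    private final ConcurrentHashMap<String, C> byDevice = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<C, String> byConn = new ConcurrentHashMap<>();

    /** Registers a connection and returns the stale one it replaced, or null. */
    C register(String deviceId, C conn) {
        C previous = byDevice.put(deviceId, conn);
        byConn.put(conn, deviceId);
        if (previous != null && !previous.equals(conn)) {
            byConn.remove(previous); // the caller would also close the stale connection
            return previous;
        }
        return null;
    }

    /**
     * Called when a connection closes. Removes the device mapping only if
     * this connection is still the current one; returns true in that case.
     */
    boolean unregister(C conn) {
        String deviceId = byConn.remove(conn);
        if (deviceId == null) {
            return false; // already replaced by a newer connection
        }
        return byDevice.remove(deviceId, conn); // conditional remove
    }

    C get(String deviceId) {
        return byDevice.get(deviceId);
    }
}
```

With this shape, a late close event from the old connection returns false and leaves the new mapping in place, so the device is not reported offline by mistake.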

2.4 Device Message Processor

import com.alibaba.fastjson.JSON; // assumes fastjson 1.x; fastjson2 uses com.alibaba.fastjson2.JSON
import io.netty.channel.Channel;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Device message processor.
 * Handles the different message types reported by devices.
 */
@Component
@Slf4j
public class DeviceMessageProcessor {

    @Autowired
    private DeviceDataService deviceDataService;

    @Autowired
    private DeviceCommandService deviceCommandService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Used by handleHeartbeat; the original listing referenced this field
    // without declaring it.
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private DeviceConnectionManager connectionManager;

    /** Dispatches a device message by type. */
    public void processMessage(String deviceId, DeviceMessage message, Channel channel) {
        try {
            switch (message.getType()) {
                case DATA_REPORT:
                    handleDataReport(deviceId, message);
                    break;
                case HEARTBEAT:
                    handleHeartbeat(deviceId, message);
                    break;
                case COMMAND_RESPONSE:
                    handleCommandResponse(deviceId, message);
                    break;
                case STATUS_REPORT:
                    handleStatusReport(deviceId, message);
                    break;
                default:
                    log.warn("Unknown message type: deviceId={}, messageType={}",
                            deviceId, message.getType());
            }

        } catch (Exception e) {
            log.error("Failed to process device message: deviceId={}, messageType={}, error={}",
                    deviceId, message.getType(), e.getMessage(), e);
        }
    }

    /** Handles a data report. */
    private void handleDataReport(String deviceId, DeviceMessage message) {
        try {
            DataReportMessage dataMessage = (DataReportMessage) message.getData();

            // 1. Build the device data object
            DeviceData deviceData = new DeviceData();
            deviceData.setDeviceId(deviceId);
            deviceData.setDataType(dataMessage.getDataType());
            deviceData.setDataContent(dataMessage.getDataContent());
            deviceData.setTimestamp(LocalDateTime.now());

            // 2. Save the device data
            deviceDataService.saveDeviceData(deviceData);

            // 3. Publish to Kafka
            sendToKafka("device.data.report", deviceId, deviceData);

            log.debug("Processed device data report: deviceId={}, dataType={}",
                    deviceId, dataMessage.getDataType());

        } catch (Exception e) {
            log.error("Failed to process device data report: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
        }
    }

    /** Handles a heartbeat. */
    private void handleHeartbeat(String deviceId, DeviceMessage message) {
        try {
            // 1. Record the heartbeat time; the key expires after 5 minutes
            String key = "device:heartbeat:" + deviceId;
            redisTemplate.opsForValue().set(key, System.currentTimeMillis(),
                    5, TimeUnit.MINUTES);

            // 2. Send the heartbeat response
            DeviceMessage response = DeviceMessage.createHeartbeatResponse();
            connectionManager.sendMessage(deviceId, response);

            log.debug("Processed device heartbeat: deviceId={}", deviceId);

        } catch (Exception e) {
            log.error("Failed to process device heartbeat: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
        }
    }

    /** Handles a command response. */
    private void handleCommandResponse(String deviceId, DeviceMessage message) {
        try {
            CommandResponseMessage responseMessage = (CommandResponseMessage) message.getData();

            // 1. Update the command status
            deviceCommandService.updateCommandStatus(
                    responseMessage.getCommandId(),
                    responseMessage.getStatus(),
                    responseMessage.getResponseData());

            // 2. Publish to Kafka
            sendToKafka("device.command.response", deviceId, responseMessage);

            log.info("Processed device command response: deviceId={}, commandId={}, status={}",
                    deviceId, responseMessage.getCommandId(), responseMessage.getStatus());

        } catch (Exception e) {
            log.error("Failed to process device command response: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
        }
    }

    /** Handles a status report. */
    private void handleStatusReport(String deviceId, DeviceMessage message) {
        try {
            StatusReportMessage statusMessage = (StatusReportMessage) message.getData();

            // 1. Update the device status
            deviceDataService.updateDeviceStatus(deviceId, statusMessage.getStatus());

            // 2. Publish to Kafka
            sendToKafka("device.status.report", deviceId, statusMessage);

            log.info("Processed device status report: deviceId={}, status={}",
                    deviceId, statusMessage.getStatus());

        } catch (Exception e) {
            log.error("Failed to process device status report: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
        }
    }

    /** Publishes a payload to Kafka, keyed by device ID. */
    private void sendToKafka(String topic, String deviceId, Object data) {
        try {
            String message = JSON.toJSONString(data);
            kafkaTemplate.send(topic, deviceId, message);
        } catch (Exception e) {
            log.error("Failed to publish to Kafka: topic={}, deviceId={}, error={}",
                    topic, deviceId, e.getMessage(), e);
        }
    }
}
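
The heartbeat handler records a last-seen time that expires after five minutes, but the listing does not show who acts when a device goes silent. One possible approach, sketched here with only the JDK, is a tracker that a scheduled task polls for expired devices. `HeartbeatTracker` and its methods are hypothetical names, and the in-memory map stands in for the Redis keys.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks last heartbeat times and reports devices that have gone silent. */
class HeartbeatTracker {
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatTracker(Duration timeout) {
        this.timeoutMillis = timeout.toMillis();
    }

    /** Records a heartbeat received at the given time. */
    void onHeartbeat(String deviceId, long nowMillis) {
        lastSeenMillis.put(deviceId, nowMillis);
    }

    /** Removes and returns devices whose last heartbeat is older than the timeout. */
    List<String> evictExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
            boolean tooOld = nowMillis - e.getValue() > timeoutMillis;
            // The conditional remove skips devices that sent a fresh
            // heartbeat while this scan was running.
            if (tooOld && lastSeenMillis.remove(e.getKey(), e.getValue())) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

Each evicted ID could then be passed to `DeviceConnectionManager.removeConnection`. Within a Netty pipeline, `IdleStateHandler` is a common alternative that raises an event when a channel has been idle for a configured period.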

3. Device Service Implementation

3.1 Device Service Controller

import javax.validation.Valid; // jakarta.validation.Valid on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Device service controller.
 * Exposes the device management API.
 */
@RestController
@RequestMapping("/api/device")
@Slf4j
public class DeviceController {

    @Autowired
    private DeviceService deviceService;

    @Autowired
    private DeviceAuthService deviceAuthService;

    /** Registers a device. */
    @PostMapping("/register")
    public Result<DeviceInfo> registerDevice(@RequestBody @Valid DeviceRegisterRequest request) {
        try {
            DeviceInfo deviceInfo = deviceService.registerDevice(request);
            return Result.success(deviceInfo);
        } catch (Exception e) {
            log.error("Device registration failed: error={}", e.getMessage(), e);
            return Result.error("Device registration failed: " + e.getMessage());
        }
    }

    /** Authenticates a device. */
    @PostMapping("/auth")
    public Result<DeviceAuthResult> authenticateDevice(
            @RequestBody @Valid DeviceAuthRequest request) {
        try {
            DeviceAuthResult result = deviceAuthService.authenticate(
                    request.getDeviceId(), request.getDeviceKey());
            return Result.success(result);
        } catch (Exception e) {
            log.error("Device authentication failed: error={}", e.getMessage(), e);
            return Result.error("Device authentication failed: " + e.getMessage());
        }
    }

    /** Returns device information. */
    @GetMapping("/{deviceId}")
    public Result<DeviceInfo> getDeviceInfo(@PathVariable String deviceId) {
        try {
            DeviceInfo deviceInfo = deviceService.getDeviceInfo(deviceId);
            return Result.success(deviceInfo);
        } catch (Exception e) {
            log.error("Failed to get device info: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return Result.error("Failed to get device info: " + e.getMessage());
        }
    }

    /** Updates device information. */
    @PutMapping("/{deviceId}")
    public Result<DeviceInfo> updateDeviceInfo(
            @PathVariable String deviceId,
            @RequestBody @Valid DeviceUpdateRequest request) {
        try {
            DeviceInfo deviceInfo = deviceService.updateDeviceInfo(deviceId, request);
            return Result.success(deviceInfo);
        } catch (Exception e) {
            log.error("Failed to update device info: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return Result.error("Failed to update device info: " + e.getMessage());
        }
    }

    /** Returns a page of devices, optionally filtered by type and status. */
    @GetMapping("/list")
    public Result<PageResult<DeviceInfo>> getDeviceList(
            @RequestParam(required = false) String deviceType,
            @RequestParam(required = false) String status,
            @RequestParam(defaultValue = "1") Integer pageNum,
            @RequestParam(defaultValue = "10") Integer pageSize) {
        try {
            PageResult<DeviceInfo> result = deviceService.getDeviceList(
                    deviceType, status, pageNum, pageSize);
            return Result.success(result);
        } catch (Exception e) {
            log.error("Failed to get device list: error={}", e.getMessage(), e);
            return Result.error("Failed to get device list: " + e.getMessage());
        }
    }
}

3.2 Device Service Implementation

// The query API below is assumed to be MyBatis-Plus.
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Device service.
 * Handles device registration, management, and status maintenance.
 */
@Service
@Slf4j
public class DeviceService {

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Registers a device. */
    @Transactional(rollbackFor = Exception.class)
    public DeviceInfo registerDevice(DeviceRegisterRequest request) {
        try {
            // 1. Check whether the device already exists
            Device device = deviceMapper.selectOne(
                    new LambdaQueryWrapper<Device>()
                            .eq(Device::getDeviceId, request.getDeviceId()));
            if (device != null) {
                throw new BusinessException("Device already exists");
            }

            // 2. Generate the device key
            String deviceKey = generateDeviceKey();

            // 3. Create the device
            device = new Device();
            device.setDeviceId(request.getDeviceId());
            device.setDeviceName(request.getDeviceName());
            device.setDeviceType(request.getDeviceType());
            device.setDeviceKey(deviceKey);
            device.setStatus("ACTIVE");
            device.setCreateTime(LocalDateTime.now());
            device.setUpdateTime(LocalDateTime.now());

            deviceMapper.insert(device);

            // 4. Build the device info
            DeviceInfo deviceInfo = convertToDeviceInfo(device);

            // 5. Publish the device registration event
            sendDeviceRegisterEvent(deviceInfo);

            log.info("Device registered: deviceId={}, deviceType={}",
                    request.getDeviceId(), request.getDeviceType());

            return deviceInfo;

        } catch (Exception e) {
            log.error("Device registration failed: error={}", e.getMessage(), e);
            throw new BusinessException("Device registration failed: " + e.getMessage());
        }
    }

    /** Returns device information, reading through the Redis cache. */
    public DeviceInfo getDeviceInfo(String deviceId) {
        try {
            // 1. Try the cache
            String cacheKey = "device:info:" + deviceId;
            DeviceInfo cachedInfo = (DeviceInfo) redisTemplate.opsForValue().get(cacheKey);
            if (cachedInfo != null) {
                return cachedInfo;
            }

            // 2. Query the database
            Device device = deviceMapper.selectOne(
                    new LambdaQueryWrapper<Device>()
                            .eq(Device::getDeviceId, deviceId));
            if (device == null) {
                throw new BusinessException("Device does not exist");
            }

            // 3. Build the device info
            DeviceInfo deviceInfo = convertToDeviceInfo(device);

            // 4. Cache the device info for one hour
            redisTemplate.opsForValue().set(cacheKey, deviceInfo, 1, TimeUnit.HOURS);

            return deviceInfo;

        } catch (Exception e) {
            log.error("Failed to get device info: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            throw new BusinessException("Failed to get device info: " + e.getMessage());
        }
    }

    /** Updates device information. */
    @Transactional(rollbackFor = Exception.class)
    public DeviceInfo updateDeviceInfo(String deviceId, DeviceUpdateRequest request) {
        try {
            // 1. Load the device
            Device device = deviceMapper.selectOne(
                    new LambdaQueryWrapper<Device>()
                            .eq(Device::getDeviceId, deviceId));
            if (device == null) {
                throw new BusinessException("Device does not exist");
            }

            // 2. Apply the changes that were supplied
            if (request.getDeviceName() != null) {
                device.setDeviceName(request.getDeviceName());
            }
            if (request.getStatus() != null) {
                device.setStatus(request.getStatus());
            }
            device.setUpdateTime(LocalDateTime.now());

            deviceMapper.updateById(device);

            // 3. Invalidate the cache
            String cacheKey = "device:info:" + deviceId;
            redisTemplate.delete(cacheKey);

            // 4. Build the device info
            DeviceInfo deviceInfo = convertToDeviceInfo(device);

            log.info("Device info updated: deviceId={}", deviceId);

            return deviceInfo;

        } catch (Exception e) {
            log.error("Failed to update device info: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            throw new BusinessException("Failed to update device info: " + e.getMessage());
        }
    }

    /** Returns a page of devices. */
    public PageResult<DeviceInfo> getDeviceList(String deviceType, String status,
                                                Integer pageNum, Integer pageSize) {
        try {
            // 1. Build the query conditions
            LambdaQueryWrapper<Device> wrapper = new LambdaQueryWrapper<>();
            if (deviceType != null && !deviceType.isEmpty()) {
                wrapper.eq(Device::getDeviceType, deviceType);
            }
            if (status != null && !status.isEmpty()) {
                wrapper.eq(Device::getStatus, status);
            }
            wrapper.orderByDesc(Device::getCreateTime);

            // 2. Run the paged query
            Page<Device> page = new Page<>(pageNum, pageSize);
            Page<Device> result = deviceMapper.selectPage(page, wrapper);

            // 3. Convert to a list of device info objects
            List<DeviceInfo> deviceInfoList = result.getRecords().stream()
                    .map(this::convertToDeviceInfo)
                    .collect(Collectors.toList());

            // 4. Build the page result
            PageResult<DeviceInfo> pageResult = new PageResult<>();
            pageResult.setList(deviceInfoList);
            pageResult.setTotal(result.getTotal());
            pageResult.setPageNum(pageNum);
            pageResult.setPageSize(pageSize);

            return pageResult;

        } catch (Exception e) {
            log.error("Failed to get device list: error={}", e.getMessage(), e);
            throw new BusinessException("Failed to get device list: " + e.getMessage());
        }
    }

    /** Generates a device key: a random UUID without hyphens. */
    private String generateDeviceKey() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Converts a Device entity to a DeviceInfo (the device key is not copied). */
    private DeviceInfo convertToDeviceInfo(Device device) {
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.setDeviceId(device.getDeviceId());
        deviceInfo.setDeviceName(device.getDeviceName());
        deviceInfo.setDeviceType(device.getDeviceType());
        deviceInfo.setStatus(device.getStatus());
        deviceInfo.setCreateTime(device.getCreateTime());
        return deviceInfo;
    }

    /** Publishes the device registration event. */
    private void sendDeviceRegisterEvent(DeviceInfo deviceInfo) {
        try {
            DeviceEvent event = new DeviceEvent();
            event.setEventType("DEVICE_REGISTER");
            event.setDeviceId(deviceInfo.getDeviceId());
            event.setDeviceInfo(deviceInfo);
            event.setEventTime(LocalDateTime.now());

            String message = JSON.toJSONString(event);
            kafkaTemplate.send("device.event", deviceInfo.getDeviceId(), message);

        } catch (Exception e) {
            log.error("Failed to publish device registration event: error={}", e.getMessage(), e);
        }
    }
}
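
`getDeviceInfo` and `updateDeviceInfo` together follow the cache-aside pattern: read through the cache, fall back to the database on a miss, and delete the cached entry on update. The sketch below shows that pattern in isolation with only the JDK. `CacheAside` is a hypothetical name, the in-memory map stands in for Redis, the `loader` function stands in for the database query, and expiry is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Minimal cache-aside helper; the map stands in for Redis. */
class CacheAside<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // stands in for the database query
    final AtomicInteger loads = new AtomicInteger(); // loader calls, for demonstration

    CacheAside(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, loading and caching it on a miss. */
    V get(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        V loaded = loader.apply(key);
        loads.incrementAndGet();
        if (loaded != null) {
            cache.put(key, loaded);
        }
        return loaded;
    }

    /** Drops the cached value so that the next read reloads it. */
    void invalidate(K key) {
        cache.remove(key);
    }
}
```

The trade-off is visible in use: until `invalidate` is called (or, in Redis, until the entry expires), readers keep seeing the old value, which is why the update path deletes the key after writing to the database.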

3.3 Device Authentication Service

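import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * Device authentication service.
 */
@Service
@Slf4j
public class DeviceAuthService {

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Authenticates a device by ID and key. */
    public DeviceAuthResult authenticate(String deviceId, String deviceKey) {
        try {
            // 1. Try the cached authentication result.
            // Caveats: the cache key embeds the raw device key, and a cached
            // success stays valid for up to 30 minutes even if the device is
            // disabled in the meantime.
            String cacheKey = "device:auth:" + deviceId + ":" + deviceKey;
            DeviceAuthResult cachedResult = (DeviceAuthResult)
                    redisTemplate.opsForValue().get(cacheKey);
            if (cachedResult != null) {
                return cachedResult;
            }

            // 2. Load the device
            Device device = deviceMapper.selectOne(
                    new LambdaQueryWrapper<Device>()
                            .eq(Device::getDeviceId, deviceId));
            if (device == null) {
                return DeviceAuthResult.failed("Device does not exist");
            }

            // 3. Verify the device key
            if (!keysMatch(device.getDeviceKey(), deviceKey)) {
                return DeviceAuthResult.failed("Incorrect device key");
            }

            // 4. Verify the device status
            if (!"ACTIVE".equals(device.getStatus())) {
                return DeviceAuthResult.failed("Device is not active");
            }

            // 5. Build the authentication result
            DeviceInfo deviceInfo = new DeviceInfo();
            deviceInfo.setDeviceId(device.getDeviceId());
            deviceInfo.setDeviceName(device.getDeviceName());
            deviceInfo.setDeviceType(device.getDeviceType());
            deviceInfo.setStatus(device.getStatus());

            DeviceAuthResult result = DeviceAuthResult.success(deviceInfo);

            // 6. Cache the authentication result
            redisTemplate.opsForValue().set(cacheKey, result, 30, TimeUnit.MINUTES);

            log.info("Device authenticated: deviceId={}", deviceId);

            return result;

        } catch (Exception e) {
            log.error("Device authentication failed: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return DeviceAuthResult.failed("Device authentication failed: " + e.getMessage());
        }
    }

    /** Null-safe key comparison that avoids String.equals' early exit. */
    private static boolean keysMatch(String expected, String actual) {
        if (expected == null || actual == null) {
            return false;
        }
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                actual.getBytes(StandardCharsets.UTF_8));
    }
}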
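
Step 3 of `authenticate` compares a stored secret with one presented by the device. A plain `String.equals` returns at the first differing character, so its running time can leak how much of the key matched. The sketch below shows one way to compare secrets with only the JDK: hash both sides, then compare the digests with `MessageDigest.isEqual`. `DeviceKeyCheck` is a hypothetical helper, not part of the article's code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Compares device keys without an early exit on the first mismatch. */
class DeviceKeyCheck {

    /** Returns the SHA-256 digest of the UTF-8 bytes of the value. */
    static byte[] sha256(String value) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    /** Returns true only when both keys are non-null and identical. */
    static boolean matches(String storedKey, String presentedKey) {
        if (storedKey == null || presentedKey == null) {
            return false;
        }
        // Hashing first gives both inputs the same length before comparing.
        return MessageDigest.isEqual(sha256(storedKey), sha256(presentedKey));
    }

    public static void main(String[] args) {
        System.out.println(matches("abc123", "abc123"));
        System.out.println(matches("abc123", "abc124"));
    }
}
```

The same digest could also replace the raw key inside a cache key, so that the secret itself never appears in Redis key names.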

4. Data Service Implementation

4.1 Data Service Controller

import java.util.List;
import javax.validation.Valid; // jakarta.validation.Valid on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Data service controller.
 * Exposes the device data API.
 */
@RestController
@RequestMapping("/api/data")
@Slf4j
public class DataController {

    @Autowired
    private DeviceDataService deviceDataService;

    /** Saves device data. */
    @PostMapping("/save")
    public Result<Void> saveDeviceData(@RequestBody @Valid DeviceData deviceData) {
        try {
            deviceDataService.saveDeviceData(deviceData);
            return Result.success();
        } catch (Exception e) {
            log.error("Failed to save device data: error={}", e.getMessage(), e);
            return Result.error("Failed to save device data: " + e.getMessage());
        }
    }

    /** Returns device data, optionally filtered by type and time range. */
    @GetMapping("/{deviceId}")
    public Result<List<DeviceData>> getDeviceData(
            @PathVariable String deviceId,
            @RequestParam(required = false) String dataType,
            @RequestParam(required = false) String startTime,
            @RequestParam(required = false) String endTime) {
        try {
            List<DeviceData> dataList = deviceDataService.getDeviceData(
                    deviceId, dataType, startTime, endTime);
            return Result.success(dataList);
        } catch (Exception e) {
            log.error("Failed to get device data: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return Result.error("Failed to get device data: " + e.getMessage());
        }
    }

    /** Updates the device status. */
    @PutMapping("/status/{deviceId}")
    public Result<Void> updateDeviceStatus(
            @PathVariable String deviceId,
            @RequestBody @Valid DeviceStatusUpdateRequest request) {
        try {
            deviceDataService.updateDeviceStatus(deviceId, request.getStatus());
            return Result.success();
        } catch (Exception e) {
            log.error("Failed to update device status: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return Result.error("Failed to update device status: " + e.getMessage());
        }
    }
}

4.2 Data Service Implementation

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Device data service.
 * Saves and queries device data and updates device status.
 */
@Service
@Slf4j
public class DeviceDataService {

    @Autowired
    private DeviceDataMapper deviceDataMapper;

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Saves device data. */
    @Transactional(rollbackFor = Exception.class)
    public void saveDeviceData(DeviceData deviceData) {
        try {
            // 1. Persist to the database
            DeviceDataRecord record = new DeviceDataRecord();
            record.setDeviceId(deviceData.getDeviceId());
            record.setDataType(deviceData.getDataType());
            record.setDataContent(JSON.toJSONString(deviceData.getDataContent()));
            record.setTimestamp(deviceData.getTimestamp());
            record.setCreateTime(LocalDateTime.now());

            deviceDataMapper.insert(record);

            // 2. Store the latest value in Redis (real-time data)
            String cacheKey = "device:data:realtime:" + deviceData.getDeviceId() +
                    ":" + deviceData.getDataType();
            redisTemplate.opsForValue().set(cacheKey, deviceData, 1, TimeUnit.HOURS);

            // 3. Publish to Kafka
            sendToKafka("device.data.save", deviceData.getDeviceId(), deviceData);

            log.debug("Device data saved: deviceId={}, dataType={}",
                    deviceData.getDeviceId(), deviceData.getDataType());

        } catch (Exception e) {
            log.error("Failed to save device data: deviceId={}, error={}",
                    deviceData.getDeviceId(), e.getMessage(), e);
            throw new BusinessException("Failed to save device data: " + e.getMessage());
        }
    }

    /**
     * Returns device data, newest first, capped at 1000 rows.
     * startTime and endTime use the ISO-8601 local format accepted by
     * LocalDateTime.parse, for example 2024-01-01T08:30:00.
     */
    public List<DeviceData> getDeviceData(String deviceId, String dataType,
                                          String startTime, String endTime) {
        try {
            // 1. Build the query conditions
            LambdaQueryWrapper<DeviceDataRecord> wrapper = new LambdaQueryWrapper<>();
            wrapper.eq(DeviceDataRecord::getDeviceId, deviceId);
            if (dataType != null && !dataType.isEmpty()) {
                wrapper.eq(DeviceDataRecord::getDataType, dataType);
            }
            if (startTime != null && !startTime.isEmpty()) {
                wrapper.ge(DeviceDataRecord::getTimestamp, LocalDateTime.parse(startTime));
            }
            if (endTime != null && !endTime.isEmpty()) {
                wrapper.le(DeviceDataRecord::getTimestamp, LocalDateTime.parse(endTime));
            }
            wrapper.orderByDesc(DeviceDataRecord::getTimestamp);
            wrapper.last("LIMIT 1000");

            // 2. Run the query
            List<DeviceDataRecord> records = deviceDataMapper.selectList(wrapper);

            // 3. Convert to a list of device data objects
            List<DeviceData> dataList = records.stream().map(record -> {
                DeviceData data = new DeviceData();
                data.setDeviceId(record.getDeviceId());
                data.setDataType(record.getDataType());
                data.setDataContent(JSON.parseObject(record.getDataContent(), Map.class));
                data.setTimestamp(record.getTimestamp());
                return data;
            }).collect(Collectors.toList());

            return dataList;

        } catch (Exception e) {
            log.error("Failed to get device data: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            throw new BusinessException("Failed to get device data: " + e.getMessage());
        }
    }

    /** Updates the device status. */
    public void updateDeviceStatus(String deviceId, String status) {
        try {
            // 1. Update the database
            Device device = deviceMapper.selectOne(
                    new LambdaQueryWrapper<Device>()
                            .eq(Device::getDeviceId, deviceId));
            if (device != null) {
                device.setStatus(status);
                device.setUpdateTime(LocalDateTime.now());
                deviceMapper.updateById(device);
            }

            // 2. Update Redis
            String cacheKey = "device:status:" + deviceId;
            redisTemplate.opsForValue().set(cacheKey, status, 1, TimeUnit.HOURS);

            // 3. Publish to Kafka
            DeviceStatusEvent event = new DeviceStatusEvent();
            event.setDeviceId(deviceId);
            event.setStatus(status);
            event.setEventTime(LocalDateTime.now());
            sendToKafka("device.status.update", deviceId, event);

            log.info("Device status updated: deviceId={}, status={}", deviceId, status);

        } catch (Exception e) {
            log.error("Failed to update device status: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            throw new BusinessException("Failed to update device status: " + e.getMessage());
        }
    }

    /** Publishes a payload to Kafka, keyed by device ID. */
    private void sendToKafka(String topic, String deviceId, Object data) {
        try {
            String message = JSON.toJSONString(data);
            kafkaTemplate.send(topic, deviceId, message);
        } catch (Exception e) {
            log.error("Failed to publish to Kafka: topic={}, deviceId={}, error={}",
                    topic, deviceId, e.getMessage(), e);
        }
    }
}
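
`getDeviceData` passes `startTime` and `endTime` straight to `LocalDateTime.parse`, which accepts only the ISO-8601 local form such as `2024-01-01T08:30:00`. A value with a space instead of the `T` fails with a `DateTimeParseException`. The sketch below shows one way to validate the range up front and give callers a clearer error. `TimeRange` is a hypothetical helper, and treating a missing bound as unbounded is an assumption that matches the optional request parameters.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;

/** Parses and validates an optional [start, end] query range. */
class TimeRange {
    final LocalDateTime start; // null means no lower bound
    final LocalDateTime end;   // null means no upper bound

    private TimeRange(LocalDateTime start, LocalDateTime end) {
        this.start = start;
        this.end = end;
    }

    static TimeRange parse(String startTime, String endTime) {
        LocalDateTime s = parseOrNull(startTime, "startTime");
        LocalDateTime e = parseOrNull(endTime, "endTime");
        if (s != null && e != null && s.isAfter(e)) {
            throw new IllegalArgumentException("startTime must not be after endTime");
        }
        return new TimeRange(s, e);
    }

    private static LocalDateTime parseOrNull(String text, String name) {
        if (text == null || text.isEmpty()) {
            return null;
        }
        try {
            return LocalDateTime.parse(text); // ISO-8601 local date-time
        } catch (DateTimeParseException ex) {
            throw new IllegalArgumentException(
                    name + " must look like 2024-01-01T08:30:00", ex);
        }
    }

    public static void main(String[] args) {
        TimeRange range = TimeRange.parse("2024-01-01T00:00:00", "2024-01-02T00:00:00");
        System.out.println(range.start + " .. " + range.end);
    }
}
```

Rejecting a bad range before the query is built keeps a malformed request from surfacing as a generic data-access failure.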

5. Command Service Implementation

5.1 Command Service Controller

import java.util.List;
import javax.validation.Valid; // jakarta.validation.Valid on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Command service controller.
 * Exposes the device command API.
 */
@RestController
@RequestMapping("/api/command")
@Slf4j
public class CommandController {

    @Autowired
    private DeviceCommandService deviceCommandService;

    /** Sends a command to a device. */
    @PostMapping("/send")
    public Result<DeviceCommand> sendCommand(@RequestBody @Valid CommandSendRequest request) {
        try {
            DeviceCommand command = deviceCommandService.sendCommand(request);
            return Result.success(command);
        } catch (Exception e) {
            log.error("Failed to send device command: error={}", e.getMessage(), e);
            return Result.error("Failed to send device command: " + e.getMessage());
        }
    }

    /** Returns the status of a command. */
    @GetMapping("/{commandId}")
    public Result<DeviceCommand> getCommandStatus(@PathVariable Long commandId) {
        try {
            DeviceCommand command = deviceCommandService.getCommand(commandId);
            return Result.success(command);
        } catch (Exception e) {
            log.error("Failed to get command status: commandId={}, error={}",
                    commandId, e.getMessage(), e);
            return Result.error("Failed to get command status: " + e.getMessage());
        }
    }

    /** Returns the commands of a device, optionally filtered by status. */
    @GetMapping("/device/{deviceId}")
    public Result<List<DeviceCommand>> getDeviceCommands(
            @PathVariable String deviceId,
            @RequestParam(required = false) String status) {
        try {
            List<DeviceCommand> commands = deviceCommandService.getDeviceCommands(
                    deviceId, status);
            return Result.success(commands);
        } catch (Exception e) {
            log.error("Failed to get device command list: deviceId={}, error={}",
                    deviceId, e.getMessage(), e);
            return Result.error("Failed to get device command list: " + e.getMessage());
        }
    }
}
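
The command API reports a status string for each command, and the service code in this article uses `PENDING`, `SENT`, and `FAILED`. Status tracking is easier to reason about when the allowed transitions are written down. The sketch below is one way to do that with an enum. `SUCCESS` and `TIMEOUT` are hypothetical values added for illustration (the article does not name the statuses a device reports back), and the transition table itself is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

/** Command lifecycle with an explicit transition table. */
enum CommandStatus {
    PENDING, SENT, SUCCESS, FAILED, TIMEOUT; // SUCCESS and TIMEOUT are hypothetical

    /** Statuses that may follow this one. */
    Set<CommandStatus> next() {
        switch (this) {
            case PENDING:
                return EnumSet.of(SENT, FAILED);
            case SENT:
                return EnumSet.of(SUCCESS, FAILED, TIMEOUT);
            default:
                return EnumSet.noneOf(CommandStatus.class); // terminal states
        }
    }

    /** Returns true when moving to the target status is allowed. */
    boolean canMoveTo(CommandStatus target) {
        return next().contains(target);
    }
}
```

A check such as `CommandStatus.valueOf(current).canMoveTo(CommandStatus.valueOf(requested))` before an update would keep a late or duplicate device response from overwriting a terminal status.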

5.2 Command Service Implementation

/**
 * Device command service.
 * Handles command dispatch and status tracking.
 */
@Service
@Slf4j
public class DeviceCommandService {

    @Autowired
    private DeviceCommandMapper deviceCommandMapper;

    @Autowired
    private DeviceConnectionManager connectionManager;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a command to a device.
     *
     * Deliberately not @Transactional: the PENDING insert and the later
     * FAILED/SENT update must each be committed on their own. Inside a single
     * transaction, the BusinessException thrown after a failed send would roll
     * back the FAILED record together with the insert.
     */
    public DeviceCommand sendCommand(CommandSendRequest request) {
        try {
            // 1. Check that the device is online
            if (!connectionManager.isDeviceOnline(request.getDeviceId())) {
                throw new BusinessException("Device is offline");
            }

            // 2. Create the command record
            DeviceCommand command = new DeviceCommand();
            command.setCommandNo(generateCommandNo());
            command.setDeviceId(request.getDeviceId());
            command.setCommandType(request.getCommandType());
            command.setCommandData(JSON.toJSONString(request.getCommandData()));
            command.setStatus("PENDING");
            command.setCreateTime(LocalDateTime.now());
            command.setUpdateTime(LocalDateTime.now());

            deviceCommandMapper.insert(command);

            // 3. Send the command to the device
            DeviceMessage message = DeviceMessage.createCommandMessage(
                    command.getId(),
                    request.getCommandType(),
                    request.getCommandData());

            boolean sent = connectionManager.sendMessage(request.getDeviceId(), message);
            if (!sent) {
                command.setStatus("FAILED");
                command.setErrorMessage("Device is offline");
                command.setUpdateTime(LocalDateTime.now());
                deviceCommandMapper.updateById(command);
                throw new BusinessException("Failed to send command: device is offline");
            }

            // 4. Mark the command as sent
            command.setStatus("SENT");
            command.setUpdateTime(LocalDateTime.now());
            deviceCommandMapper.updateById(command);

            // 5. Publish to Kafka
            sendToKafka("device.command.send", request.getDeviceId(), command);

            log.info("Device command sent: commandId={}, deviceId={}, commandType={}",
                    command.getId(), request.getDeviceId(), request.getCommandType());

            return command;

        } catch (BusinessException e) {
            // Already a business error: rethrow as is so the message is not wrapped twice
            log.warn("Device command rejected: deviceId={}, error={}",
                    request.getDeviceId(), e.getMessage());
            throw e;
        } catch (Exception e) {
            log.error("Failed to send device command: deviceId={}, error={}",
                    request.getDeviceId(), e.getMessage(), e);
            throw new BusinessException("Failed to send device command: " + e.getMessage());
        }
    }

    /**
     * Update the status of a command
     */
    @Transactional(rollbackFor = Exception.class)
    public void updateCommandStatus(Long commandId, String status, String responseData) {
        try {
            // 1. Load the command
            DeviceCommand command = deviceCommandMapper.selectById(commandId);
            if (command == null) {
                throw new BusinessException("Command not found");
            }

            // 2. Update the status
            command.setStatus(status);
            command.setResponseData(responseData);
            command.setUpdateTime(LocalDateTime.now());

            deviceCommandMapper.updateById(command);

            // 3. Publish to Kafka
            sendToKafka("device.command.status", command.getDeviceId(), command);

            log.info("Command status updated: commandId={}, status={}", commandId, status);

        } catch (BusinessException e) {
            // Rethrow as is so the message is not wrapped twice
            throw e;
        } catch (Exception e) {
            log.error("Failed to update command status: commandId={}, error={}",
                    commandId, e.getMessage(), e);
            throw new BusinessException("Failed to update command status: " + e.getMessage());
        }
    }

    /**
     * Get a command by ID
     */
    public DeviceCommand getCommand(Long commandId) {
        DeviceCommand command = deviceCommandMapper.selectById(commandId);
        if (command == null) {
            throw new BusinessException("Command not found");
        }
        return command;
    }

    /**
     * List the most recent commands of a device (at most 100), newest first
     */
    public List<DeviceCommand> getDeviceCommands(String deviceId, String status) {
        LambdaQueryWrapper<DeviceCommand> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(DeviceCommand::getDeviceId, deviceId);
        if (status != null && !status.isEmpty()) {
            wrapper.eq(DeviceCommand::getStatus, status);
        }
        wrapper.orderByDesc(DeviceCommand::getCreateTime);
        wrapper.last("LIMIT 100");

        return deviceCommandMapper.selectList(wrapper);
    }

    /**
     * Generate a command number: CMD_<epoch millis>_<first 8 characters of a UUID>
     */
    private String generateCommandNo() {
        return "CMD_" + System.currentTimeMillis() + "_" +
                UUID.randomUUID().toString().substring(0, 8);
    }

    /**
     * Publish to Kafka, keyed by deviceId so that messages of one device stay in order.
     * send() is asynchronous: this catch only covers failures raised while building
     * or submitting the record; delivery failures surface on the returned future.
     */
    private void sendToKafka(String topic, String deviceId, Object data) {
        try {
            String message = JSON.toJSONString(data);
            kafkaTemplate.send(topic, deviceId, message);
        } catch (Exception e) {
            log.error("Failed to publish to Kafka: topic={}, deviceId={}, error={}",
                    topic, deviceId, e.getMessage(), e);
        }
    }
}
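
`updateCommandStatus` accepts any status string, so a late or duplicated device response could move a command from SUCCESS back to SENT. The following is a minimal sketch of a transition check, assuming the four states used in this article (PENDING, SENT, SUCCESS, FAILED). The enum `CommandStatus` and its methods are hypothetical names and are not part of the original code, which stores the status as a plain string.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical status enum; the article stores these values as plain strings. */
enum CommandStatus {
    PENDING, SENT, SUCCESS, FAILED;

    /** Returns the states that may legally follow this one. */
    Set<CommandStatus> nextStates() {
        switch (this) {
            case PENDING:
                return EnumSet.of(SENT, FAILED);
            case SENT:
                return EnumSet.of(SUCCESS, FAILED);
            default:
                // SUCCESS and FAILED are terminal states
                return EnumSet.noneOf(CommandStatus.class);
        }
    }

    boolean canTransitionTo(CommandStatus target) {
        return nextStates().contains(target);
    }

    /** Parses two stored status strings and validates the move in one step. */
    static boolean isAllowed(String from, String to) {
        try {
            return valueOf(from).canTransitionTo(valueOf(to));
        } catch (IllegalArgumentException | NullPointerException e) {
            return false; // unknown or missing status values are rejected
        }
    }
}
```

In `updateCommandStatus`, a call such as `CommandStatus.isAllowed(command.getStatus(), status)` before the update would be one place to reject stale transitions.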

6. Kafka Consumer Implementation

6.1 Device Data Consumer

/**
 * Device data consumer.
 * Consumes device data messages from Kafka.
 *
 * Note: both listeners log and swallow exceptions, so the offset still advances
 * and a failed message is not retried.
 */
@Component
@Slf4j
public class DeviceDataConsumer {

    @Autowired
    private DeviceDataService deviceDataService;

    /**
     * Consume device data report messages
     */
    @KafkaListener(topics = "device.data.report", groupId = "device-data-group")
    public void consumeDataReport(String message) {
        try {
            DeviceData deviceData = JSON.parseObject(message, DeviceData.class);

            // Persist the device data
            deviceDataService.saveDeviceData(deviceData);

            log.debug("Device data report consumed: deviceId={}", deviceData.getDeviceId());

        } catch (Exception e) {
            log.error("Failed to consume device data report: error={}", e.getMessage(), e);
        }
    }

    /**
     * Consume device data saved messages
     */
    @KafkaListener(topics = "device.data.save", groupId = "device-data-group")
    public void consumeDataSave(String message) {
        try {
            DeviceData deviceData = JSON.parseObject(message, DeviceData.class);

            // Extra processing can go here, e.g. statistics or alerting

            log.debug("Device data saved message consumed: deviceId={}", deviceData.getDeviceId());

        } catch (Exception e) {
            log.error("Failed to consume device data saved message: error={}", e.getMessage(), e);
        }
    }
}
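
Because the listeners above catch and log every exception, a message that fails once is effectively dropped. A common mitigation is a bounded retry followed by parking the message for later inspection. The sketch below shows that idea in plain Java with no Kafka dependency; `RetryingProcessor`, `maxAttempts` and the in-memory dead-letter list are hypothetical names chosen for illustration. In a Spring Kafka application the container's own error-handling support (for example `DefaultErrorHandler` with a `DeadLetterPublishingRecoverer` in recent versions) would usually play this role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: retries a handler a fixed number of times, then parks the message. */
class RetryingProcessor {
    private final Consumer<String> handler;
    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>();

    RetryingProcessor(Consumer<String> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the handler succeeded within maxAttempts. */
    boolean process(String message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // A real consumer would log the attempt and back off here.
            }
        }
        // Stands in for publishing the message to a dead-letter topic.
        deadLetters.add(message);
        return false;
    }

    List<String> getDeadLetters() {
        return deadLetters;
    }
}
```

The handler passed in would wrap the parse-and-save logic of `consumeDataReport`; whether retrying is safe depends on `saveDeviceData` being idempotent, which the article does not state.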

6.2 Device Event Consumer

/**
 * Device event consumer.
 * Consumes device event messages from Kafka.
 */
@Component
@Slf4j
public class DeviceEventConsumer {

    @Autowired
    private DeviceService deviceService;

    /**
     * Consume device event messages
     */
    @KafkaListener(topics = "device.event", groupId = "device-event-group")
    public void consumeDeviceEvent(String message) {
        try {
            DeviceEvent event = JSON.parseObject(message, DeviceEvent.class);

            // A switch on a null string throws NullPointerException, so guard first
            if (event == null || event.getEventType() == null) {
                log.warn("Ignoring device event without eventType: message={}", message);
                return;
            }

            // Dispatch by event type
            switch (event.getEventType()) {
                case "DEVICE_REGISTER":
                    handleDeviceRegister(event);
                    break;
                case "DEVICE_ONLINE":
                    handleDeviceOnline(event);
                    break;
                case "DEVICE_OFFLINE":
                    handleDeviceOffline(event);
                    break;
                default:
                    log.warn("Unknown device event type: eventType={}", event.getEventType());
            }

        } catch (Exception e) {
            log.error("Failed to consume device event: error={}", e.getMessage(), e);
        }
    }

    /**
     * Handle a device registration event
     */
    private void handleDeviceRegister(DeviceEvent event) {
        log.info("Handling device registration event: deviceId={}", event.getDeviceId());
    }

    /**
     * Handle a device online event
     */
    private void handleDeviceOnline(DeviceEvent event) {
        log.info("Handling device online event: deviceId={}", event.getDeviceId());
    }

    /**
     * Handle a device offline event
     */
    private void handleDeviceOffline(DeviceEvent event) {
        log.info("Handling device offline event: deviceId={}", event.getDeviceId());
    }
}
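
The `switch` in `consumeDeviceEvent` grows with every new event type. One alternative is a registry that maps an event type to its handler. The sketch below illustrates this with a stripped-down event class; `SimpleDeviceEvent` and `EventDispatcher` are hypothetical names, and the article's `DeviceEvent` presumably carries more fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical minimal event used only for this sketch. */
class SimpleDeviceEvent {
    final String eventType;
    final String deviceId;

    SimpleDeviceEvent(String eventType, String deviceId) {
        this.eventType = eventType;
        this.deviceId = deviceId;
    }
}

/** Routes events to handlers registered per event type. */
class EventDispatcher {
    private final Map<String, Consumer<SimpleDeviceEvent>> handlers = new HashMap<>();

    void register(String eventType, Consumer<SimpleDeviceEvent> handler) {
        handlers.put(eventType, handler);
    }

    /** Returns false when the event is null or no handler matches its type. */
    boolean dispatch(SimpleDeviceEvent event) {
        if (event == null || event.eventType == null) {
            return false;
        }
        Consumer<SimpleDeviceEvent> handler = handlers.get(event.eventType);
        if (handler == null) {
            return false; // corresponds to the default branch of the switch
        }
        handler.accept(event);
        return true;
    }
}
```

Handlers would be registered once at startup, for example `dispatcher.register("DEVICE_ONLINE", this::handleDeviceOnline)`, so adding an event type no longer touches the dispatch code.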

6.3 Device Command Consumer

/**
 * Device command consumer.
 * Consumes device command messages from Kafka.
 */
@Component
@Slf4j
public class DeviceCommandConsumer {

    @Autowired
    private DeviceCommandService deviceCommandService;

    /**
     * Consume command sent messages
     */
    @KafkaListener(topics = "device.command.send", groupId = "device-command-group")
    public void consumeCommandSend(String message) {
        try {
            DeviceCommand command = JSON.parseObject(message, DeviceCommand.class);

            // Extra processing can go here, e.g. command statistics or monitoring

            log.debug("Command sent message consumed: commandId={}, deviceId={}",
                    command.getId(), command.getDeviceId());

        } catch (Exception e) {
            log.error("Failed to consume command sent message: error={}", e.getMessage(), e);
        }
    }

    /**
     * Consume command status messages
     */
    @KafkaListener(topics = "device.command.status", groupId = "device-command-group")
    public void consumeCommandStatus(String message) {
        try {
            DeviceCommand command = JSON.parseObject(message, DeviceCommand.class);

            // Extra processing can go here, e.g. status statistics or alerting

            log.debug("Command status message consumed: commandId={}, status={}",
                    command.getId(), command.getStatus());

        } catch (Exception e) {
            log.error("Failed to consume command status message: error={}", e.getMessage(), e);
        }
    }
}
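
The comments in both listeners mention statistics as possible extra processing. A minimal sketch of a per-status counter follows, assuming an in-memory count is sufficient. `CommandStatusCounter` is a hypothetical class; it uses `ConcurrentHashMap` and `LongAdder` because listener methods may run on several consumer threads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical in-memory counter of command status messages, safe for concurrent use. */
class CommandStatusCounter {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** ConcurrentHashMap rejects null keys, so a missing status is mapped to "UNKNOWN". */
    private static String keyOf(String status) {
        return status == null ? "UNKNOWN" : status;
    }

    /** Records one message with the given status. */
    void record(String status) {
        counts.computeIfAbsent(keyOf(status), k -> new LongAdder()).increment();
    }

    /** Returns how many messages with the given status were recorded so far. */
    long count(String status) {
        LongAdder adder = counts.get(keyOf(status));
        return adder == null ? 0L : adder.sum();
    }
}
```

In `consumeCommandStatus`, a call such as `counter.record(command.getStatus())` after parsing would feed the counter; the counts are lost on restart, so a metrics library or a database table would be needed for anything durable.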

7. Data Model Definitions

7.1 Device Entity

/**
 * Device entity
 */
@Data
@TableName("t_device")
public class Device {

    @TableId(type = IdType.AUTO)
    private Long id;

    private String deviceId;

    private String deviceName;

    private String deviceType;

    private String deviceKey;

    private String status;

    private LocalDateTime createTime;

    private LocalDateTime updateTime;
}

7.2 Device Data Entity

/**
 * Device data entity
 */
@Data
public class DeviceData {

    private String deviceId;

    private String dataType;

    private Map<String, Object> dataContent;

    private LocalDateTime timestamp;
}

7.3 Device Command Entity

/**
 * Device command entity
 */
@Data
@TableName("t_device_command")
public class DeviceCommand {

    @TableId(type = IdType.AUTO)
    private Long id;

    private String commandNo;

    private String deviceId;

    private String commandType;

    private String commandData;

    private String status;

    private String responseData;

    private String errorMessage;

    private LocalDateTime createTime;

    private LocalDateTime updateTime;
}

8. Database Design

8.1 Device Table

CREATE TABLE `t_device` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `device_id` VARCHAR(64) NOT NULL COMMENT 'Device ID',
  `device_name` VARCHAR(128) DEFAULT NULL COMMENT 'Device name',
  `device_type` VARCHAR(32) NOT NULL COMMENT 'Device type',
  `device_key` VARCHAR(128) NOT NULL COMMENT 'Device key',
  `status` VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' COMMENT 'Status',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_device_id` (`device_id`),
  KEY `idx_device_type` (`device_type`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Device table';

8.2 Device Data Table

CREATE TABLE `t_device_data` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `device_id` VARCHAR(64) NOT NULL COMMENT 'Device ID',
  `data_type` VARCHAR(32) NOT NULL COMMENT 'Data type',
  `data_content` TEXT COMMENT 'Data content',
  `timestamp` DATETIME NOT NULL COMMENT 'Timestamp',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  PRIMARY KEY (`id`),
  KEY `idx_device_id` (`device_id`),
  KEY `idx_data_type` (`data_type`),
  KEY `idx_timestamp` (`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Device data table';

8.3 Device Command Table

CREATE TABLE `t_device_command` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `command_no` VARCHAR(64) NOT NULL COMMENT 'Command number',
  `device_id` VARCHAR(64) NOT NULL COMMENT 'Device ID',
  `command_type` VARCHAR(32) NOT NULL COMMENT 'Command type',
  `command_data` TEXT COMMENT 'Command data',
  `status` VARCHAR(32) NOT NULL COMMENT 'Status: PENDING (waiting to be sent), SENT, SUCCESS, FAILED',
  `response_data` TEXT COMMENT 'Response data',
  `error_message` VARCHAR(512) DEFAULT NULL COMMENT 'Error message',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_command_no` (`command_no`),
  KEY `idx_device_id` (`device_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Device command table';

9. Configuration

9.1 Netty Configuration

# application.yml
netty:
  server:
    port: 8888
    boss-threads: 1
    worker-threads: 8
    backlog: 1024
    max-frame-length: 65535
    receive-buffer-size: 65535
    send-buffer-size: 65535
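
The startup class reads these values through calls such as `nettyConfig.getBossThreads()` and `nettyConfig.getWorkerThreads()`. The sketch below is not the article's `NettyServerConfig`; it is a hypothetical holder, `NettyServerSettings`, whose defaults mirror some of the keys above and which adds a sanity check before the server starts. In a Spring Boot application such a class would typically be bound with `@ConfigurationProperties(prefix = "netty.server")`; the annotation is left out here so the example compiles with the JDK alone.

```java
/** Hypothetical settings holder mirroring part of the netty.server keys. */
class NettyServerSettings {
    private int port = 8888;
    private int bossThreads = 1;
    private int workerThreads = 8;
    private int maxFrameLength = 65535;

    int getPort() { return port; }
    void setPort(int port) { this.port = port; }

    int getBossThreads() { return bossThreads; }
    void setBossThreads(int bossThreads) { this.bossThreads = bossThreads; }

    int getWorkerThreads() { return workerThreads; }
    void setWorkerThreads(int workerThreads) { this.workerThreads = workerThreads; }

    int getMaxFrameLength() { return maxFrameLength; }
    void setMaxFrameLength(int maxFrameLength) { this.maxFrameLength = maxFrameLength; }

    /** Throws IllegalStateException when a value cannot work at runtime. */
    void validate() {
        if (port < 1 || port > 65535) {
            throw new IllegalStateException("port out of range: " + port);
        }
        if (bossThreads < 1 || workerThreads < 1) {
            throw new IllegalStateException("thread counts must be >= 1");
        }
        if (maxFrameLength < 1) {
            throw new IllegalStateException("maxFrameLength must be >= 1");
        }
    }
}
```

Calling `validate()` before creating the event loop groups turns a misconfigured port or thread count into an immediate, readable startup error.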

9.2 Kafka Configuration

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      group-id: device-service-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      auto-offset-reset: latest

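10. Summary

This article walked through the complete business logic of a device service built on a Java microservice back end. It covered the following:

  1. Netty access service: device TCP connection handling, protocol parsing, message routing, and connection management
  2. Device service: device registration, authentication, management, and status maintenance
  3. Data service: receiving, parsing, storing, and querying device data, plus status updates
  4. Command service: command dispatch, status tracking, and response handling
  5. Kafka message queue: asynchronous processing and consumption of device data, events, and commands
  6. Service collaboration: decoupling the device, data, and command services through Kafka
  7. Database design: table definitions for devices, device data, and device commands
  8. Caching: Redis for device information, status, and online state
  9. Performance: connection pooling, batch processing, and asynchronous processing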
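
The batch processing mentioned in item 9 can be illustrated with a small size-triggered buffer that collects records and hands them to a sink in groups, as a consumer might do before a bulk insert. This is a sketch under stated assumptions: `BatchBuffer`, the batch size, and the sink callback are hypothetical, the class is not thread-safe, and a time-based flush is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical size-triggered batch buffer; not thread-safe. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Adds one item and flushes automatically once the batch is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands the buffered items to the sink; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>(); // start a fresh buffer so the sink owns the batch
        sink.accept(batch);
    }
}
```

The sink would wrap a bulk write such as a multi-row insert; a final `flush()` on shutdown keeps a partially filled batch from being lost.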

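With this material, readers should be able to build a complete device service on a Java microservice architecture, including Netty device access, Kafka messaging, and collaboration between services, and use the code as a reference for device service development in real projects.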