1. 物联网架构概述

在物联网时代,支撑百万台设备的系统架构是企业数字化转型的核心。本文将详细介绍如何设计、实现和优化支撑百万台设备的Java物联网架构,包括设备接入、数据采集、消息处理、设备管理等完整解决方案。

1.1 核心挑战

  1. 设备接入: 支持多种协议和设备的统一接入
  2. 数据采集: 高效采集和处理海量设备数据
  3. 消息处理: 实时处理设备消息和指令
  4. 设备管理: 统一管理百万台设备的生命周期
  5. 系统扩展: 支持设备数量的水平扩展

1.2 技术架构

1
2
3
4
5
设备端 → 协议网关 → 消息队列 → 数据处理 → 业务服务
↓ ↓ ↓ ↓ ↓
传感器 → MQTT/HTTP → Kafka → 数据清洗 → 设备管理
↓ ↓ ↓ ↓ ↓
控制器 → 协议转换 → 消息分发 → 数据存储 → 监控告警

2. 设备接入架构

2.1 Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.82.Final</version>
</dependency>

<!-- MyBatis Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>

<!-- Druid连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
</dependencies>

2.2 应用配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# application.yml
server:
port: 8080

spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 16384
buffer-memory: 33554432
consumer:
group-id: iot-device-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
max-poll-records: 500

redis:
host: localhost
port: 6379
database: 0
timeout: 5000
lettuce:
pool:
max-active: 200
max-idle: 20
min-idle: 5

# MQTT配置
mqtt:
broker-url: tcp://localhost:1883
username: admin
password: admin
client-id: iot-server
qos: 1
retain: false

3. 设备管理服务

3.1 设备实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* 设备实体类
* 定义设备的基本信息和状态
*/
@Data
@TableName("iot_device")
public class Device {

@TableId(type = IdType.AUTO)
private Long id; // 设备ID

private String deviceId; // 设备唯一标识

private String deviceName; // 设备名称

private String deviceType; // 设备类型

private String protocol; // 通信协议

private String ipAddress; // IP地址

private Integer port; // 端口

private String status; // 设备状态

private Date lastHeartbeat; // 最后心跳时间

private Date createTime; // 创建时间

private Date updateTime; // 更新时间

private String description; // 设备描述

private String location; // 设备位置

private String manufacturer; // 制造商

private String model; // 设备型号

private String version; // 固件版本
}

/**
* 设备数据实体类
* 存储设备上报的数据
*/
@Data
@TableName("iot_device_data")
public class DeviceData {

@TableId(type = IdType.AUTO)
private Long id; // 数据ID

private String deviceId; // 设备ID

private String dataType; // 数据类型

private String dataValue; // 数据值

private String unit; // 数据单位

private Date timestamp; // 时间戳

private String quality; // 数据质量

private String location; // 数据位置
}

3.2 设备管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/**
* 设备管理服务
* 提供设备的CRUD操作和状态管理
*/
@Service
public class DeviceService {

@Autowired
private DeviceMapper deviceMapper;

@Autowired
private DeviceDataMapper deviceDataMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 注册设备
* 新设备接入时调用
*/
@Transactional
public void registerDevice(Device device) {
// 1. 检查设备是否已存在
Device existingDevice = deviceMapper.selectByDeviceId(device.getDeviceId());
if (existingDevice != null) {
throw new DeviceAlreadyExistsException("设备已存在: " + device.getDeviceId());
}

// 2. 设置设备状态
device.setStatus("OFFLINE");
device.setCreateTime(new Date());
device.setUpdateTime(new Date());

// 3. 保存设备信息
deviceMapper.insert(device);

// 4. 发送设备注册事件
DeviceRegisterEvent event = new DeviceRegisterEvent();
event.setDeviceId(device.getDeviceId());
event.setDeviceType(device.getDeviceType());
event.setRegisterTime(new Date());

kafkaTemplate.send("device.register", device.getDeviceId(), JSON.toJSONString(event));

// 5. 更新设备缓存
updateDeviceCache(device);

log.info("设备注册成功: deviceId={}, deviceName={}",
device.getDeviceId(), device.getDeviceName());
}

/**
* 更新设备状态
* 设备上线/下线时调用
*/
public void updateDeviceStatus(String deviceId, String status) {
// 1. 更新数据库
Device device = deviceMapper.selectByDeviceId(deviceId);
if (device != null) {
device.setStatus(status);
device.setUpdateTime(new Date());
if ("ONLINE".equals(status)) {
device.setLastHeartbeat(new Date());
}
deviceMapper.updateById(device);

// 2. 更新缓存
updateDeviceCache(device);

// 3. 发送状态变更事件
DeviceStatusEvent event = new DeviceStatusEvent();
event.setDeviceId(deviceId);
event.setOldStatus(device.getStatus());
event.setNewStatus(status);
event.setTimestamp(new Date());

kafkaTemplate.send("device.status", deviceId, JSON.toJSONString(event));

log.info("设备状态更新: deviceId={}, status={}", deviceId, status);
}
}

/**
* 处理设备心跳
* 设备定期发送心跳保持连接
*/
public void handleHeartbeat(String deviceId) {
// 1. 更新心跳时间
Device device = deviceMapper.selectByDeviceId(deviceId);
if (device != null) {
device.setLastHeartbeat(new Date());
device.setUpdateTime(new Date());
deviceMapper.updateById(device);

// 2. 更新缓存
updateDeviceCache(device);

// 3. 检查设备状态
if (!"ONLINE".equals(device.getStatus())) {
updateDeviceStatus(deviceId, "ONLINE");
}
}
}

/**
* 获取设备信息
* 先查缓存,再查数据库
*/
public Device getDevice(String deviceId) {
// 1. 先查缓存
String cacheKey = "device:" + deviceId;
Device device = (Device) redisTemplate.opsForValue().get(cacheKey);
if (device != null) {
return device;
}

// 2. 再查数据库
device = deviceMapper.selectByDeviceId(deviceId);
if (device != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(cacheKey, device, 30, TimeUnit.MINUTES);
}

return device;
}

/**
* 批量获取设备信息
*/
public List<Device> getDevices(List<String> deviceIds) {
List<Device> devices = new ArrayList<>();
List<String> notInCacheIds = new ArrayList<>();

// 1. 批量查缓存
for (String deviceId : deviceIds) {
Device device = (Device) redisTemplate.opsForValue().get("device:" + deviceId);
if (device != null) {
devices.add(device);
} else {
notInCacheIds.add(deviceId);
}
}

// 2. 批量查数据库
if (!notInCacheIds.isEmpty()) {
List<Device> dbDevices = deviceMapper.selectByDeviceIds(notInCacheIds);
devices.addAll(dbDevices);

// 3. 批量写入缓存
for (Device device : dbDevices) {
redisTemplate.opsForValue().set("device:" + device.getDeviceId(),
device, 30, TimeUnit.MINUTES);
}
}

return devices;
}

/**
* 更新设备缓存
*/
private void updateDeviceCache(Device device) {
String cacheKey = "device:" + device.getDeviceId();
redisTemplate.opsForValue().set(cacheKey, device, 30, TimeUnit.MINUTES);
}
}

4. 消息处理架构

4.1 MQTT消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/**
* MQTT消息处理器
* 处理设备通过MQTT协议发送的消息
*/
@Component
public class MqttMessageHandler {

@Autowired
private DeviceService deviceService;

@Autowired
private DeviceDataService deviceDataService;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 处理设备数据消息
* 设备上报数据时调用
*/
@EventListener
public void handleDeviceData(MqttMessageEvent event) {
try {
String topic = event.getTopic();
String payload = event.getPayload();

// 1. 解析消息
DeviceDataMessage message = JSON.parseObject(payload, DeviceDataMessage.class);

// 2. 验证设备
Device device = deviceService.getDevice(message.getDeviceId());
if (device == null) {
log.warn("设备不存在: {}", message.getDeviceId());
return;
}

// 3. 处理数据
processDeviceData(message);

// 4. 发送到Kafka
kafkaTemplate.send("device.data", message.getDeviceId(), payload);

log.info("处理设备数据: deviceId={}, dataType={}",
message.getDeviceId(), message.getDataType());

} catch (Exception e) {
log.error("处理设备数据失败: {}", e.getMessage(), e);
}
}

/**
* 处理设备心跳消息
*/
@EventListener
public void handleHeartbeat(MqttMessageEvent event) {
try {
String topic = event.getTopic();
String payload = event.getPayload();

// 解析心跳消息
HeartbeatMessage message = JSON.parseObject(payload, HeartbeatMessage.class);

// 更新设备心跳
deviceService.handleHeartbeat(message.getDeviceId());

log.debug("处理设备心跳: deviceId={}", message.getDeviceId());

} catch (Exception e) {
log.error("处理设备心跳失败: {}", e.getMessage(), e);
}
}

/**
* 处理设备状态消息
*/
@EventListener
public void handleDeviceStatus(MqttMessageEvent event) {
try {
String topic = event.getTopic();
String payload = event.getPayload();

// 解析状态消息
DeviceStatusMessage message = JSON.parseObject(payload, DeviceStatusMessage.class);

// 更新设备状态
deviceService.updateDeviceStatus(message.getDeviceId(), message.getStatus());

log.info("处理设备状态: deviceId={}, status={}",
message.getDeviceId(), message.getStatus());

} catch (Exception e) {
log.error("处理设备状态失败: {}", e.getMessage(), e);
}
}

/**
* 处理设备数据
*/
private void processDeviceData(DeviceDataMessage message) {
// 1. 数据验证
if (!validateDeviceData(message)) {
log.warn("设备数据验证失败: deviceId={}", message.getDeviceId());
return;
}

// 2. 数据清洗
DeviceData deviceData = cleanDeviceData(message);

// 3. 保存数据
deviceDataService.saveDeviceData(deviceData);

// 4. 实时告警检查
checkRealTimeAlarm(deviceData);
}

/**
* 验证设备数据
*/
private boolean validateDeviceData(DeviceDataMessage message) {
// 检查必要字段
if (StringUtils.isEmpty(message.getDeviceId()) ||
StringUtils.isEmpty(message.getDataType()) ||
StringUtils.isEmpty(message.getDataValue())) {
return false;
}

// 检查数据值格式
try {
Double.parseDouble(message.getDataValue());
} catch (NumberFormatException e) {
return false;
}

return true;
}

/**
* 清洗设备数据
*/
private DeviceData cleanDeviceData(DeviceDataMessage message) {
DeviceData deviceData = new DeviceData();
deviceData.setDeviceId(message.getDeviceId());
deviceData.setDataType(message.getDataType());
deviceData.setDataValue(message.getDataValue());
deviceData.setUnit(message.getUnit());
deviceData.setTimestamp(new Date());
deviceData.setQuality("GOOD");
deviceData.setLocation(message.getLocation());

return deviceData;
}

/**
* 实时告警检查
*/
private void checkRealTimeAlarm(DeviceData deviceData) {
// 检查数据是否超出阈值
if (isDataOutOfRange(deviceData)) {
// 发送告警消息
AlarmMessage alarm = new AlarmMessage();
alarm.setDeviceId(deviceData.getDeviceId());
alarm.setDataType(deviceData.getDataType());
alarm.setDataValue(deviceData.getDataValue());
alarm.setAlarmLevel("HIGH");
alarm.setAlarmTime(new Date());
alarm.setDescription("数据超出正常范围");

kafkaTemplate.send("device.alarm", deviceData.getDeviceId(),
JSON.toJSONString(alarm));
}
}

/**
* 检查数据是否超出范围
*/
private boolean isDataOutOfRange(DeviceData deviceData) {
// 根据设备类型和数据类型检查阈值
// 这里简化处理,实际应该从配置中读取阈值
try {
double value = Double.parseDouble(deviceData.getDataValue());
return value > 100.0 || value < 0.0;
} catch (NumberFormatException e) {
return false;
}
}
}

4.2 Kafka消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
* Kafka消息消费者
* 处理设备数据、状态、告警等消息
*/
@Component
public class KafkaMessageConsumer {

@Autowired
private DeviceDataService deviceDataService;

@Autowired
private AlarmService alarmService;

@Autowired
private NotificationService notificationService;

/**
* 消费设备数据消息
*/
@KafkaListener(topics = "device.data", groupId = "device-data-group")
public void consumeDeviceData(String message) {
try {
DeviceDataMessage dataMessage = JSON.parseObject(message, DeviceDataMessage.class);

// 1. 数据预处理
DeviceData deviceData = preprocessDeviceData(dataMessage);

// 2. 批量保存数据
deviceDataService.batchSaveDeviceData(Arrays.asList(deviceData));

// 3. 更新设备统计
updateDeviceStatistics(dataMessage.getDeviceId());

log.debug("消费设备数据: deviceId={}", dataMessage.getDeviceId());

} catch (Exception e) {
log.error("消费设备数据失败: {}", e.getMessage(), e);
}
}

/**
* 消费设备状态消息
*/
@KafkaListener(topics = "device.status", groupId = "device-status-group")
public void consumeDeviceStatus(String message) {
try {
DeviceStatusEvent statusEvent = JSON.parseObject(message, DeviceStatusEvent.class);

// 1. 更新设备状态统计
updateDeviceStatusStatistics(statusEvent);

// 2. 发送状态变更通知
if ("OFFLINE".equals(statusEvent.getNewStatus())) {
notificationService.sendDeviceOfflineNotification(statusEvent.getDeviceId());
}

log.info("消费设备状态: deviceId={}, status={}",
statusEvent.getDeviceId(), statusEvent.getNewStatus());

} catch (Exception e) {
log.error("消费设备状态失败: {}", e.getMessage(), e);
}
}

/**
* 消费设备告警消息
*/
@KafkaListener(topics = "device.alarm", groupId = "device-alarm-group")
public void consumeDeviceAlarm(String message) {
try {
AlarmMessage alarmMessage = JSON.parseObject(message, AlarmMessage.class);

// 1. 保存告警记录
alarmService.saveAlarm(alarmMessage);

// 2. 发送告警通知
notificationService.sendAlarmNotification(alarmMessage);

// 3. 更新告警统计
updateAlarmStatistics(alarmMessage);

log.warn("消费设备告警: deviceId={}, level={}",
alarmMessage.getDeviceId(), alarmMessage.getAlarmLevel());

} catch (Exception e) {
log.error("消费设备告警失败: {}", e.getMessage(), e);
}
}

/**
* 预处理设备数据
*/
private DeviceData preprocessDeviceData(DeviceDataMessage message) {
DeviceData deviceData = new DeviceData();
deviceData.setDeviceId(message.getDeviceId());
deviceData.setDataType(message.getDataType());
deviceData.setDataValue(message.getDataValue());
deviceData.setUnit(message.getUnit());
deviceData.setTimestamp(new Date());
deviceData.setQuality("GOOD");
deviceData.setLocation(message.getLocation());

return deviceData;
}

/**
* 更新设备统计
*/
private void updateDeviceStatistics(String deviceId) {
// 更新设备数据统计信息
// 这里可以更新Redis中的统计数据
}

/**
* 更新设备状态统计
*/
private void updateDeviceStatusStatistics(DeviceStatusEvent event) {
// 更新设备状态统计信息
}

/**
* 更新告警统计
*/
private void updateAlarmStatistics(AlarmMessage alarm) {
// 更新告警统计信息
}
}

5. 数据采集与存储

5.1 数据采集服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 数据采集服务
* 负责采集和处理设备数据
*/
@Service
public class DataCollectionService {

@Autowired
private DeviceDataMapper deviceDataMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 批量保存设备数据
* 使用批量插入提高性能
*/
@Transactional
public void batchSaveDeviceData(List<DeviceData> deviceDataList) {
if (deviceDataList.isEmpty()) {
return;
}

// 1. 批量插入数据库
deviceDataMapper.batchInsert(deviceDataList);

// 2. 更新实时数据缓存
updateRealTimeDataCache(deviceDataList);

// 3. 发送数据到实时处理流
sendToRealTimeStream(deviceDataList);

log.info("批量保存设备数据: count={}", deviceDataList.size());
}

/**
* 获取设备历史数据
*/
public List<DeviceData> getDeviceHistoryData(String deviceId, Date startTime, Date endTime) {
// 1. 先查缓存
String cacheKey = "device:history:" + deviceId + ":" + startTime.getTime() + ":" + endTime.getTime();
List<DeviceData> cachedData = (List<DeviceData>) redisTemplate.opsForValue().get(cacheKey);
if (cachedData != null) {
return cachedData;
}

// 2. 查数据库
List<DeviceData> data = deviceDataMapper.selectByDeviceIdAndTimeRange(deviceId, startTime, endTime);

// 3. 写入缓存
redisTemplate.opsForValue().set(cacheKey, data, 10, TimeUnit.MINUTES);

return data;
}

/**
* 获取设备实时数据
*/
public DeviceData getDeviceRealTimeData(String deviceId) {
String cacheKey = "device:realtime:" + deviceId;
return (DeviceData) redisTemplate.opsForValue().get(cacheKey);
}

/**
* 更新实时数据缓存
*/
private void updateRealTimeDataCache(List<DeviceData> deviceDataList) {
for (DeviceData data : deviceDataList) {
String cacheKey = "device:realtime:" + data.getDeviceId();
redisTemplate.opsForValue().set(cacheKey, data, 5, TimeUnit.MINUTES);
}
}

/**
* 发送数据到实时处理流
*/
private void sendToRealTimeStream(List<DeviceData> deviceDataList) {
for (DeviceData data : deviceDataList) {
kafkaTemplate.send("device.data.stream", data.getDeviceId(), JSON.toJSONString(data));
}
}
}

6. 设备监控与告警

6.1 监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* 设备监控服务
* 监控设备状态和性能
*/
@Service
public class DeviceMonitoringService {

@Autowired
private DeviceService deviceService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void checkDeviceHealth() {
// 1. 获取所有在线设备
List<Device> onlineDevices = deviceService.getOnlineDevices();

// 2. 检查设备心跳
for (Device device : onlineDevices) {
checkDeviceHeartbeat(device);
}

// 3. 更新监控统计
updateMonitoringStatistics();
}

/**
* 检查设备心跳
*/
private void checkDeviceHeartbeat(Device device) {
Date lastHeartbeat = device.getLastHeartbeat();
if (lastHeartbeat == null) {
return;
}

// 检查心跳是否超时(5分钟)
long timeout = 5 * 60 * 1000; // 5分钟
if (System.currentTimeMillis() - lastHeartbeat.getTime() > timeout) {
// 设备心跳超时,标记为离线
deviceService.updateDeviceStatus(device.getDeviceId(), "OFFLINE");

// 发送设备离线告警
sendDeviceOfflineAlarm(device);

log.warn("设备心跳超时: deviceId={}", device.getDeviceId());
}
}

/**
* 发送设备离线告警
*/
private void sendDeviceOfflineAlarm(Device device) {
AlarmMessage alarm = new AlarmMessage();
alarm.setDeviceId(device.getDeviceId());
alarm.setDataType("HEARTBEAT");
alarm.setDataValue("TIMEOUT");
alarm.setAlarmLevel("HIGH");
alarm.setAlarmTime(new Date());
alarm.setDescription("设备心跳超时,可能已离线");

// 发送到Kafka
kafkaTemplate.send("device.alarm", device.getDeviceId(), JSON.toJSONString(alarm));
}

/**
* 更新监控统计
*/
private void updateMonitoringStatistics() {
// 更新设备统计信息到Redis
long totalDevices = deviceService.getTotalDeviceCount();
long onlineDevices = deviceService.getOnlineDeviceCount();
long offlineDevices = totalDevices - onlineDevices;

Map<String, Object> stats = new HashMap<>();
stats.put("totalDevices", totalDevices);
stats.put("onlineDevices", onlineDevices);
stats.put("offlineDevices", offlineDevices);
stats.put("timestamp", System.currentTimeMillis());

redisTemplate.opsForValue().set("device:statistics", stats, 1, TimeUnit.HOURS);
}
}

7. 总结

本文详细介绍了支撑百万台设备的Java物联网架构设计,包括:

7.1 核心技术点

  1. 设备接入: MQTT协议、多种设备类型支持
  2. 消息处理: Kafka消息队列、实时数据处理
  3. 数据存储: 批量数据插入、多级缓存
  4. 设备管理: 设备生命周期管理、状态监控
  5. 监控告警: 实时监控、自动告警

7.2 架构优势

  1. 高并发: 支持百万台设备同时接入
  2. 高可用: 消息队列保证数据不丢失
  3. 可扩展: 水平扩展支持更多设备
  4. 实时性: 实时数据处理和告警

7.3 最佳实践

  1. 设备管理: 统一设备注册和状态管理
  2. 数据采集: 批量处理提高性能
  3. 消息处理: 异步处理保证系统响应
  4. 监控告警: 实时监控设备健康状态

通过以上架构设计,可以构建支撑百万台设备的物联网系统,满足大规模设备接入和数据处理的需求。