第84集:支撑百万台设备的 Java 物联网架构实战
|字数总计:3.6k|阅读时长:17分钟|阅读量:
1. 物联网架构概述
在物联网时代,支撑百万台设备的系统架构是企业数字化转型的核心。本文将详细介绍如何设计、实现和优化支撑百万台设备的Java物联网架构,包括设备接入、数据采集、消息处理、设备管理等完整解决方案。
1.1 核心挑战
- 设备接入: 支持多种协议和设备的统一接入
- 数据采集: 高效采集和处理海量设备数据
- 消息处理: 实时处理设备消息和指令
- 设备管理: 统一管理百万台设备的生命周期
- 系统扩展: 支持设备数量的水平扩展
1.2 技术架构
设备端 → 协议网关  → 消息队列 → 数据处理 → 业务服务
  ↓         ↓           ↓          ↓          ↓
传感器 → MQTT/HTTP → Kafka    → 数据清洗 → 设备管理
  ↓         ↓           ↓          ↓          ↓
控制器 → 协议转换  → 消息分发 → 数据存储 → 监控告警
2. 设备接入架构
2.1 Maven依赖配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.82.Final</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.2.8</version> </dependency> </dependencies>
|
2.2 应用配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# Application configuration for the IoT server.
# The nesting below restores the hierarchy that was lost when the snippet was
# collapsed onto single lines; every key and value is unchanged.
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384        # 16 KiB
      buffer-memory: 33554432  # 32 MiB
    consumer:
      # Default group; the @KafkaListener methods in this article set their own groupId.
      group-id: iot-device-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      max-poll-records: 500
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 200
        max-idle: 20
        min-idle: 5

# Custom MQTT settings (not a Spring Boot built-in namespace).
# NOTE(review): admin/admin looks like a placeholder; externalize credentials before deploying.
mqtt:
  broker-url: tcp://localhost:1883
  username: admin
  password: admin
  client-id: iot-server
  qos: 1
  retain: false
|
3. 设备管理服务
3.1 设备实体类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
/**
 * Device record mapped to the {@code iot_device} table.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; the class declares no behavior of
 * its own.
 */
@Data
@TableName("iot_device")
public class Device {

    /** Primary key, declared with {@code IdType.AUTO}. */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Business identifier; used for mapper lookups, Redis cache keys and Kafka message keys. */
    private String deviceId;

    /** Display name; logged when the device is registered. */
    private String deviceName;

    /** Device type; copied into the registration event. */
    private String deviceType;

    /** Protocol name as a free-form string; allowed values are not defined in this snippet. */
    private String protocol;

    /** IP address recorded for the device. */
    private String ipAddress;

    /** Port recorded for the device. */
    private Integer port;

    /**
     * Current status string. Values written in this article are {@code "ONLINE"} and
     * {@code "OFFLINE"}; status messages may carry other values, which are stored as-is.
     */
    private String status;

    /** Time of the last heartbeat; {@code null} until one has been recorded. */
    private Date lastHeartbeat;

    /** Set once when the device is registered. */
    private Date createTime;

    /** Refreshed on registration, status changes and heartbeats. */
    private Date updateTime;

    /** Free-form description. */
    private String description;

    /** Free-form location. */
    private String location;

    /** Manufacturer name. */
    private String manufacturer;

    /** Model name. */
    private String model;

    /** Version string; presumably firmware or hardware version — confirm. */
    private String version;
}
/**
 * Single device reading mapped to the {@code iot_device_data} table.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; the class declares no behavior of
 * its own.
 */
@Data
@TableName("iot_device_data")
public class DeviceData {

    /** Primary key, declared with {@code IdType.AUTO}. */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Identifier of the device that produced the reading. */
    private String deviceId;

    /** Kind of reading, as a free-form string. */
    private String dataType;

    /** Reading value kept as text; the MQTT handler only accepts values that parse as a double. */
    private String dataValue;

    /** Unit of the value, copied from the incoming message. */
    private String unit;

    /** Set to the server's current time when the record is built, not taken from the message. */
    private Date timestamp;

    /** Quality flag; the code in this article always writes {@code "GOOD"}. */
    private String quality;

    /** Location copied from the incoming message. */
    private String location;
}
|
3.2 设备管理服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
/**
 * Registers devices, tracks their status and heartbeats, and serves cached lookups.
 *
 * <p>Devices are persisted through {@code DeviceMapper}, mirrored in Redis under
 * {@code device:<deviceId>} for 30 minutes, and lifecycle changes are published as JSON to
 * the {@code device.register} and {@code device.status} Kafka topics.
 *
 * <p>NOTE(review): {@code log} is used but not declared here; presumably it is supplied by
 * an annotation such as Lombok's {@code @Slf4j} that is not visible in this snippet — confirm.
 */
@Service
public class DeviceService {

    private static final String STATUS_ONLINE = "ONLINE";
    private static final String STATUS_OFFLINE = "OFFLINE";
    private static final String CACHE_KEY_PREFIX = "device:";
    private static final long CACHE_TTL_MINUTES = 30;

    @Autowired
    private DeviceMapper deviceMapper;

    @Autowired
    private DeviceDataMapper deviceDataMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Registers a new device in the OFFLINE state, publishes a registration event and
     * caches the record.
     *
     * <p>NOTE(review): the Kafka send and the cache write run inside the transaction and are
     * not undone if the commit later fails — confirm whether that is acceptable.
     *
     * @param device device to register; its {@code deviceId} must not already exist
     * @throws DeviceAlreadyExistsException if a device with the same id is already stored
     */
    @Transactional
    public void registerDevice(Device device) {
        Device existingDevice = deviceMapper.selectByDeviceId(device.getDeviceId());
        if (existingDevice != null) {
            throw new DeviceAlreadyExistsException("设备已存在: " + device.getDeviceId());
        }

        device.setStatus(STATUS_OFFLINE);
        device.setCreateTime(new Date());
        device.setUpdateTime(new Date());
        deviceMapper.insert(device);

        DeviceRegisterEvent event = new DeviceRegisterEvent();
        event.setDeviceId(device.getDeviceId());
        event.setDeviceType(device.getDeviceType());
        event.setRegisterTime(new Date());
        kafkaTemplate.send("device.register", device.getDeviceId(), JSON.toJSONString(event));

        updateDeviceCache(device);

        log.info("设备注册成功: deviceId={}, deviceName={}",
                device.getDeviceId(), device.getDeviceName());
    }

    /**
     * Stores a new status for the device and publishes a {@code device.status} event.
     * Unknown device ids are ignored silently.
     *
     * @param deviceId business id of the device
     * @param status   new status value, e.g. {@code "ONLINE"} or {@code "OFFLINE"}
     */
    public void updateDeviceStatus(String deviceId, String status) {
        Device device = deviceMapper.selectByDeviceId(deviceId);
        if (device != null) {
            applyStatus(deviceId, device, status);
        }
    }

    /**
     * Records a heartbeat for the device and brings it ONLINE if it was not already.
     * Unknown device ids are ignored silently.
     *
     * @param deviceId business id of the device
     */
    public void handleHeartbeat(String deviceId) {
        Device device = deviceMapper.selectByDeviceId(deviceId);
        if (device == null) {
            return;
        }

        if (!STATUS_ONLINE.equals(device.getStatus())) {
            // The ONLINE transition already stamps lastHeartbeat and updateTime, so a
            // separate heartbeat write beforehand would only duplicate the DB/cache update.
            applyStatus(deviceId, device, STATUS_ONLINE);
            return;
        }

        device.setLastHeartbeat(new Date());
        device.setUpdateTime(new Date());
        deviceMapper.updateById(device);
        updateDeviceCache(device);
    }

    /**
     * Looks a device up in the cache first, then in the database; a database hit is cached.
     *
     * @param deviceId business id of the device
     * @return the device, or {@code null} if it is not stored
     */
    public Device getDevice(String deviceId) {
        String cacheKey = CACHE_KEY_PREFIX + deviceId;

        Device device = (Device) redisTemplate.opsForValue().get(cacheKey);
        if (device != null) {
            return device;
        }

        device = deviceMapper.selectByDeviceId(deviceId);
        if (device != null) {
            redisTemplate.opsForValue().set(cacheKey, device, CACHE_TTL_MINUTES, TimeUnit.MINUTES);
        }
        return device;
    }

    /**
     * Looks up several devices, reading the cache in one round trip and loading the misses
     * from the database in one query.
     *
     * @param deviceIds ids to resolve; {@code null} or empty yields an empty list
     * @return cached devices in input order, followed by devices loaded from the database
     */
    public List<Device> getDevices(List<String> deviceIds) {
        List<Device> devices = new ArrayList<>();
        if (deviceIds == null || deviceIds.isEmpty()) {
            return devices;
        }

        List<String> ids = new ArrayList<>(deviceIds);
        List<String> cacheKeys = new ArrayList<>(ids.size());
        for (String deviceId : ids) {
            cacheKeys.add(CACHE_KEY_PREFIX + deviceId);
        }

        // multiGet returns one slot per key (null for a miss); it may return null as a
        // whole when used inside a pipeline/transaction, which is treated as all misses.
        List<Object> cached = redisTemplate.opsForValue().multiGet(cacheKeys);

        List<String> notInCacheIds = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            Object hit = (cached != null && i < cached.size()) ? cached.get(i) : null;
            if (hit != null) {
                devices.add((Device) hit);
            } else {
                notInCacheIds.add(ids.get(i));
            }
        }

        if (!notInCacheIds.isEmpty()) {
            List<Device> dbDevices = deviceMapper.selectByDeviceIds(notInCacheIds);
            devices.addAll(dbDevices);
            for (Device device : dbDevices) {
                updateDeviceCache(device);
            }
        }
        return devices;
    }

    /**
     * Writes the status change to the database and cache, then publishes the status event.
     * The previous status is captured before the device is mutated so the event carries
     * the real transition.
     */
    private void applyStatus(String deviceId, Device device, String status) {
        String previousStatus = device.getStatus();

        device.setStatus(status);
        device.setUpdateTime(new Date());
        if (STATUS_ONLINE.equals(status)) {
            device.setLastHeartbeat(new Date());
        }
        deviceMapper.updateById(device);
        updateDeviceCache(device);

        DeviceStatusEvent event = new DeviceStatusEvent();
        event.setDeviceId(deviceId);
        event.setOldStatus(previousStatus);
        event.setNewStatus(status);
        event.setTimestamp(new Date());
        kafkaTemplate.send("device.status", deviceId, JSON.toJSONString(event));

        log.info("设备状态更新: deviceId={}, status={}", deviceId, status);
    }

    /** Stores the device in Redis under {@code device:<deviceId>} with the standard TTL. */
    private void updateDeviceCache(Device device) {
        String cacheKey = CACHE_KEY_PREFIX + device.getDeviceId();
        redisTemplate.opsForValue().set(cacheKey, device, CACHE_TTL_MINUTES, TimeUnit.MINUTES);
    }
}
|
4. 消息处理架构
4.1 MQTT消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
@Component public class MqttMessageHandler {
@Autowired private DeviceService deviceService; @Autowired private DeviceDataService deviceDataService; @Autowired private KafkaTemplate<String, String> kafkaTemplate;
@EventListener public void handleDeviceData(MqttMessageEvent event) { try { String topic = event.getTopic(); String payload = event.getPayload(); DeviceDataMessage message = JSON.parseObject(payload, DeviceDataMessage.class); Device device = deviceService.getDevice(message.getDeviceId()); if (device == null) { log.warn("设备不存在: {}", message.getDeviceId()); return; } processDeviceData(message); kafkaTemplate.send("device.data", message.getDeviceId(), payload); log.info("处理设备数据: deviceId={}, dataType={}", message.getDeviceId(), message.getDataType()); } catch (Exception e) { log.error("处理设备数据失败: {}", e.getMessage(), e); } }
@EventListener public void handleHeartbeat(MqttMessageEvent event) { try { String topic = event.getTopic(); String payload = event.getPayload(); HeartbeatMessage message = JSON.parseObject(payload, HeartbeatMessage.class); deviceService.handleHeartbeat(message.getDeviceId()); log.debug("处理设备心跳: deviceId={}", message.getDeviceId()); } catch (Exception e) { log.error("处理设备心跳失败: {}", e.getMessage(), e); } }
@EventListener public void handleDeviceStatus(MqttMessageEvent event) { try { String topic = event.getTopic(); String payload = event.getPayload(); DeviceStatusMessage message = JSON.parseObject(payload, DeviceStatusMessage.class); deviceService.updateDeviceStatus(message.getDeviceId(), message.getStatus()); log.info("处理设备状态: deviceId={}, status={}", message.getDeviceId(), message.getStatus()); } catch (Exception e) { log.error("处理设备状态失败: {}", e.getMessage(), e); } }
private void processDeviceData(DeviceDataMessage message) { if (!validateDeviceData(message)) { log.warn("设备数据验证失败: deviceId={}", message.getDeviceId()); return; } DeviceData deviceData = cleanDeviceData(message); deviceDataService.saveDeviceData(deviceData); checkRealTimeAlarm(deviceData); }
private boolean validateDeviceData(DeviceDataMessage message) { if (StringUtils.isEmpty(message.getDeviceId()) || StringUtils.isEmpty(message.getDataType()) || StringUtils.isEmpty(message.getDataValue())) { return false; } try { Double.parseDouble(message.getDataValue()); } catch (NumberFormatException e) { return false; } return true; }
private DeviceData cleanDeviceData(DeviceDataMessage message) { DeviceData deviceData = new DeviceData(); deviceData.setDeviceId(message.getDeviceId()); deviceData.setDataType(message.getDataType()); deviceData.setDataValue(message.getDataValue()); deviceData.setUnit(message.getUnit()); deviceData.setTimestamp(new Date()); deviceData.setQuality("GOOD"); deviceData.setLocation(message.getLocation()); return deviceData; }
private void checkRealTimeAlarm(DeviceData deviceData) { if (isDataOutOfRange(deviceData)) { AlarmMessage alarm = new AlarmMessage(); alarm.setDeviceId(deviceData.getDeviceId()); alarm.setDataType(deviceData.getDataType()); alarm.setDataValue(deviceData.getDataValue()); alarm.setAlarmLevel("HIGH"); alarm.setAlarmTime(new Date()); alarm.setDescription("数据超出正常范围"); kafkaTemplate.send("device.alarm", deviceData.getDeviceId(), JSON.toJSONString(alarm)); } }
private boolean isDataOutOfRange(DeviceData deviceData) { try { double value = Double.parseDouble(deviceData.getDataValue()); return value > 100.0 || value < 0.0; } catch (NumberFormatException e) { return false; } } }
|
4.2 Kafka消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
@Component public class KafkaMessageConsumer {
@Autowired private DeviceDataService deviceDataService; @Autowired private AlarmService alarmService; @Autowired private NotificationService notificationService;
@KafkaListener(topics = "device.data", groupId = "device-data-group") public void consumeDeviceData(String message) { try { DeviceDataMessage dataMessage = JSON.parseObject(message, DeviceDataMessage.class); DeviceData deviceData = preprocessDeviceData(dataMessage); deviceDataService.batchSaveDeviceData(Arrays.asList(deviceData)); updateDeviceStatistics(dataMessage.getDeviceId()); log.debug("消费设备数据: deviceId={}", dataMessage.getDeviceId()); } catch (Exception e) { log.error("消费设备数据失败: {}", e.getMessage(), e); } }
@KafkaListener(topics = "device.status", groupId = "device-status-group") public void consumeDeviceStatus(String message) { try { DeviceStatusEvent statusEvent = JSON.parseObject(message, DeviceStatusEvent.class); updateDeviceStatusStatistics(statusEvent); if ("OFFLINE".equals(statusEvent.getNewStatus())) { notificationService.sendDeviceOfflineNotification(statusEvent.getDeviceId()); } log.info("消费设备状态: deviceId={}, status={}", statusEvent.getDeviceId(), statusEvent.getNewStatus()); } catch (Exception e) { log.error("消费设备状态失败: {}", e.getMessage(), e); } }
@KafkaListener(topics = "device.alarm", groupId = "device-alarm-group") public void consumeDeviceAlarm(String message) { try { AlarmMessage alarmMessage = JSON.parseObject(message, AlarmMessage.class); alarmService.saveAlarm(alarmMessage); notificationService.sendAlarmNotification(alarmMessage); updateAlarmStatistics(alarmMessage); log.warn("消费设备告警: deviceId={}, level={}", alarmMessage.getDeviceId(), alarmMessage.getAlarmLevel()); } catch (Exception e) { log.error("消费设备告警失败: {}", e.getMessage(), e); } }
private DeviceData preprocessDeviceData(DeviceDataMessage message) { DeviceData deviceData = new DeviceData(); deviceData.setDeviceId(message.getDeviceId()); deviceData.setDataType(message.getDataType()); deviceData.setDataValue(message.getDataValue()); deviceData.setUnit(message.getUnit()); deviceData.setTimestamp(new Date()); deviceData.setQuality("GOOD"); deviceData.setLocation(message.getLocation()); return deviceData; }
private void updateDeviceStatistics(String deviceId) { }
private void updateDeviceStatusStatistics(DeviceStatusEvent event) { }
private void updateAlarmStatistics(AlarmMessage alarm) { } }
|
5. 数据采集与存储
5.1 数据采集服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
/**
 * Stores device readings in bulk and serves history and latest-value queries.
 *
 * <p>Each saved batch is inserted through {@code DeviceDataMapper}, mirrored into the
 * per-device real-time cache and republished to the {@code device.data.stream} topic.
 *
 * <p>NOTE(review): {@code log} is used but not declared here; presumably it is supplied by
 * an annotation such as Lombok's {@code @Slf4j} that is not visible in this snippet — confirm.
 */
@Service
public class DataCollectionService {

    @Autowired
    private DeviceDataMapper deviceDataMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Inserts the batch, refreshes the real-time cache and streams each reading to Kafka.
     *
     * <p>NOTE(review): the cache writes and Kafka sends run inside the transaction and are
     * not undone if the commit later fails — confirm whether that is acceptable.
     *
     * @param deviceDataList readings to store; {@code null} or empty is a no-op
     */
    @Transactional
    public void batchSaveDeviceData(List<DeviceData> deviceDataList) {
        if (deviceDataList == null || deviceDataList.isEmpty()) {
            return;
        }

        deviceDataMapper.batchInsert(deviceDataList);
        updateRealTimeDataCache(deviceDataList);
        sendToRealTimeStream(deviceDataList);

        log.info("批量保存设备数据: count={}", deviceDataList.size());
    }

    /**
     * Returns the readings of a device within a time range, cached for 10 minutes.
     *
     * <p>NOTE(review): the cache key contains both bounds at millisecond precision, so only
     * an identical repeated query hits the cache — confirm this is the intended granularity.
     *
     * @param deviceId  business id of the device
     * @param startTime range start, must not be {@code null}
     * @param endTime   range end, must not be {@code null}
     * @return readings from the cache if present, otherwise from the database
     * @throws NullPointerException if {@code startTime} or {@code endTime} is {@code null}
     */
    public List<DeviceData> getDeviceHistoryData(String deviceId, Date startTime, Date endTime) {
        if (startTime == null || endTime == null) {
            throw new NullPointerException("startTime and endTime must not be null");
        }

        String cacheKey = "device:history:" + deviceId + ":"
                + startTime.getTime() + ":" + endTime.getTime();

        Object cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached instanceof List) {
            // Safe as long as this key is only ever written by the set() call below.
            @SuppressWarnings("unchecked")
            List<DeviceData> cachedData = (List<DeviceData>) cached;
            return cachedData;
        }

        List<DeviceData> data =
                deviceDataMapper.selectByDeviceIdAndTimeRange(deviceId, startTime, endTime);
        redisTemplate.opsForValue().set(cacheKey, data, 10, TimeUnit.MINUTES);
        return data;
    }

    /**
     * Returns the most recently cached reading for the device.
     *
     * @param deviceId business id of the device
     * @return the cached reading, or {@code null} if none is cached
     */
    public DeviceData getDeviceRealTimeData(String deviceId) {
        String cacheKey = "device:realtime:" + deviceId;
        return (DeviceData) redisTemplate.opsForValue().get(cacheKey);
    }

    /** Caches each reading under {@code device:realtime:<deviceId>} for 5 minutes; later entries win. */
    private void updateRealTimeDataCache(List<DeviceData> deviceDataList) {
        for (DeviceData data : deviceDataList) {
            String cacheKey = "device:realtime:" + data.getDeviceId();
            redisTemplate.opsForValue().set(cacheKey, data, 5, TimeUnit.MINUTES);
        }
    }

    /** Publishes each reading as JSON to {@code device.data.stream}, keyed by device id. */
    private void sendToRealTimeStream(List<DeviceData> deviceDataList) {
        for (DeviceData data : deviceDataList) {
            kafkaTemplate.send("device.data.stream", data.getDeviceId(), JSON.toJSONString(data));
        }
    }
}
|
6. 设备监控与告警
6.1 监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
@Service public class DeviceMonitoringService {
@Autowired private DeviceService deviceService; @Autowired private RedisTemplate<String, Object> redisTemplate; @Scheduled(fixedRate = 60000) public void checkDeviceHealth() { List<Device> onlineDevices = deviceService.getOnlineDevices(); for (Device device : onlineDevices) { checkDeviceHeartbeat(device); } updateMonitoringStatistics(); }
private void checkDeviceHeartbeat(Device device) { Date lastHeartbeat = device.getLastHeartbeat(); if (lastHeartbeat == null) { return; } long timeout = 5 * 60 * 1000; if (System.currentTimeMillis() - lastHeartbeat.getTime() > timeout) { deviceService.updateDeviceStatus(device.getDeviceId(), "OFFLINE"); sendDeviceOfflineAlarm(device); log.warn("设备心跳超时: deviceId={}", device.getDeviceId()); } }
private void sendDeviceOfflineAlarm(Device device) { AlarmMessage alarm = new AlarmMessage(); alarm.setDeviceId(device.getDeviceId()); alarm.setDataType("HEARTBEAT"); alarm.setDataValue("TIMEOUT"); alarm.setAlarmLevel("HIGH"); alarm.setAlarmTime(new Date()); alarm.setDescription("设备心跳超时,可能已离线"); kafkaTemplate.send("device.alarm", device.getDeviceId(), JSON.toJSONString(alarm)); }
private void updateMonitoringStatistics() { long totalDevices = deviceService.getTotalDeviceCount(); long onlineDevices = deviceService.getOnlineDeviceCount(); long offlineDevices = totalDevices - onlineDevices; Map<String, Object> stats = new HashMap<>(); stats.put("totalDevices", totalDevices); stats.put("onlineDevices", onlineDevices); stats.put("offlineDevices", offlineDevices); stats.put("timestamp", System.currentTimeMillis()); redisTemplate.opsForValue().set("device:statistics", stats, 1, TimeUnit.HOURS); } }
|
7. 总结
本文详细介绍了支撑百万台设备的Java物联网架构设计,包括:
7.1 核心技术点
- 设备接入: MQTT协议、多种设备类型支持
- 消息处理: Kafka消息队列、实时数据处理
- 数据存储: 批量数据插入、多级缓存
- 设备管理: 设备生命周期管理、状态监控
- 监控告警: 实时监控、自动告警
7.2 架构优势
- 高并发: 支持百万台设备同时接入
- 高可用: 消息队列保证数据不丢失
- 可扩展: 水平扩展支持更多设备
- 实时性: 实时数据处理和告警
7.3 最佳实践
- 设备管理: 统一设备注册和状态管理
- 数据采集: 批量处理提高性能
- 消息处理: 异步处理保证系统响应
- 监控告警: 实时监控设备健康状态
通过以上架构设计,可以构建支撑百万台设备的物联网系统,满足大规模设备接入和数据处理的需求。