第367集设备实时信息处理Java微服务后端架构实战
|字数总计:3.5k|阅读时长:15分钟|阅读量:
设备实时信息处理Java微服务后端架构实战
1. 架构概述
设备实时信息处理系统是物联网平台的核心模块,需要支持高并发、低延迟的设备状态查询和实时状态推送。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用的设备实时信息处理系统。
1.1 系统架构图
1 2 3 4 5 6 7 8 9 10 11
| 用户端 → 用户网关 → 设备服务 → 缓存/数据库
  ↓
身份认证
  ↓
读取缓存、数据库准实时信息
  ↓
返回请求结果

设备服务 → 内存缓存
  ↓
定时同步至缓存、数据库
|
1.2 核心组件
- 用户网关(User Gateway):负责用户请求的接入、身份认证、请求路由
- 设备服务(Device Service):负责设备状态管理、实时数据处理、缓存管理
- 缓存层(Redis):存储设备准实时状态信息
- 数据库(MySQL/MongoDB):持久化设备历史状态数据
- 内存缓存(本地缓存):存储设备最新实时状态
2. 用户网关服务实现
2.1 网关服务核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
@RestController @RequestMapping("/api/gateway") @Slf4j public class UserGatewayController {
@Autowired private AuthService authService; @Autowired private DeviceServiceClient deviceServiceClient; @Autowired private RedisTemplate<String, Object> redisTemplate;
@PostMapping("/device/realtime") public Result<DeviceRealtimeInfo> getDeviceRealtimeInfo( @RequestHeader("Authorization") String token, @RequestBody DeviceRealtimeRequest request) { try { UserInfo userInfo = authService.authenticate(token); if (userInfo == null) { return Result.error("身份认证失败"); } if (!authService.hasPermission(userInfo.getUserId(), request.getDeviceId())) { return Result.error("无权限访问该设备"); } DeviceRealtimeInfo deviceInfo = getDeviceRealtimeInfo(request.getDeviceId()); return Result.success(deviceInfo); } catch (Exception e) { log.error("获取设备实时信息失败: deviceId={}, error={}", request.getDeviceId(), e.getMessage(), e); return Result.error("获取设备实时信息失败"); } }
private DeviceRealtimeInfo getDeviceRealtimeInfo(String deviceId) { String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = (DeviceRealtimeInfo) redisTemplate.opsForValue().get(cacheKey); if (deviceInfo != null) { log.debug("从缓存获取设备信息: deviceId={}", deviceId); return deviceInfo; } log.debug("缓存未命中,查询数据库: deviceId={}", deviceId); deviceInfo = deviceServiceClient.getDeviceRealtimeInfo(deviceId); if (deviceInfo != null) { redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); } return deviceInfo; }
@PostMapping("/device/realtime/batch") public Result<List<DeviceRealtimeInfo>> batchGetDeviceRealtimeInfo( @RequestHeader("Authorization") String token, @RequestBody BatchDeviceRealtimeRequest request) { try { UserInfo userInfo = authService.authenticate(token); if (userInfo == null) { return Result.error("身份认证失败"); } List<DeviceRealtimeInfo> deviceInfoList = new ArrayList<>(); for (String deviceId : request.getDeviceIds()) { DeviceRealtimeInfo deviceInfo = getDeviceRealtimeInfo(deviceId); if (deviceInfo != null) { deviceInfoList.add(deviceInfo); } } return Result.success(deviceInfoList); } catch (Exception e) { log.error("批量获取设备实时信息失败: error={}", e.getMessage(), e); return Result.error("批量获取设备实时信息失败"); } } }
|
2.2 身份认证服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
@Service @Slf4j public class AuthService {
@Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private UserServiceClient userServiceClient;
public UserInfo authenticate(String token) { try { String userId = parseToken(token); if (StringUtils.isEmpty(userId)) { return null; } String userCacheKey = "user:info:" + userId; UserInfo userInfo = (UserInfo) redisTemplate.opsForValue().get(userCacheKey); if (userInfo != null) { return userInfo; } userInfo = userServiceClient.getUserInfo(userId); if (userInfo != null) { redisTemplate.opsForValue().set(userCacheKey, userInfo, 30, TimeUnit.MINUTES); } return userInfo; } catch (Exception e) { log.error("身份认证失败: token={}, error={}", token, e.getMessage(), e); return null; } }
public boolean hasPermission(String userId, String deviceId) { try { String permissionKey = "user:device:permission:" + userId + ":" + deviceId; Boolean hasPermission = (Boolean) redisTemplate.opsForValue().get(permissionKey); if (hasPermission != null) { return hasPermission; } hasPermission = userServiceClient.checkDevicePermission(userId, deviceId); redisTemplate.opsForValue().set(permissionKey, hasPermission, 10, TimeUnit.MINUTES); return hasPermission; } catch (Exception e) { log.error("权限校验失败: userId={}, deviceId={}, error={}", userId, deviceId, e.getMessage(), e); return false; } }
private String parseToken(String token) { if (StringUtils.isEmpty(token) || !token.startsWith("Bearer ")) { return null; } String jwtToken = token.substring(7); return extractUserIdFromJWT(jwtToken); }
private String extractUserIdFromJWT(String jwtToken) { return "user123"; } }
|
3. 设备服务实现
3.1 设备服务核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
@Service @Slf4j public class DeviceService {
@Autowired private DeviceMapper deviceMapper; @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private DeviceRealtimeCache deviceRealtimeCache; @Autowired private ScheduledExecutorService scheduledExecutor;
public void pushDeviceRealtimeStatus(DeviceRealtimeStatus status) { try { String deviceId = status.getDeviceId(); deviceRealtimeCache.updateDeviceStatus(deviceId, status); log.info("设备实时状态已更新到内存: deviceId={}, status={}", deviceId, status.getStatus()); } catch (Exception e) { log.error("推送设备实时状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); } }
public void cacheDeviceRealtimeStatusToMemory(String deviceId, DeviceRealtimeStatus status) { deviceRealtimeCache.updateDeviceStatus(deviceId, status); }
@Scheduled(fixedRate = 30000) public void syncDeviceStatusToCacheAndDatabase() { try { Map<String, DeviceRealtimeStatus> deviceStatusMap = deviceRealtimeCache.getAllDeviceStatus(); if (deviceStatusMap.isEmpty()) { return; } syncToRedisCache(deviceStatusMap); syncToDatabase(deviceStatusMap); log.info("设备状态同步完成: count={}", deviceStatusMap.size()); } catch (Exception e) { log.error("同步设备状态失败: error={}", e.getMessage(), e); } }
private void syncToRedisCache(Map<String, DeviceRealtimeStatus> deviceStatusMap) { for (Map.Entry<String, DeviceRealtimeStatus> entry : deviceStatusMap.entrySet()) { String deviceId = entry.getKey(); DeviceRealtimeStatus status = entry.getValue(); String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = convertToDeviceInfo(status); redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); } }
private void syncToDatabase(Map<String, DeviceRealtimeStatus> deviceStatusMap) { List<DeviceStatusRecord> statusRecords = new ArrayList<>(); for (Map.Entry<String, DeviceRealtimeStatus> entry : deviceStatusMap.entrySet()) { String deviceId = entry.getKey(); DeviceRealtimeStatus status = entry.getValue(); DeviceStatusRecord record = new DeviceStatusRecord(); record.setDeviceId(deviceId); record.setStatus(status.getStatus()); record.setData(status.getData()); record.setTimestamp(status.getTimestamp()); record.setCreateTime(new Date()); statusRecords.add(record); } if (!statusRecords.isEmpty()) { deviceMapper.batchInsertDeviceStatus(statusRecords); } }
public DeviceRealtimeInfo getDeviceRealtimeInfo(String deviceId) { DeviceRealtimeStatus status = deviceRealtimeCache.getDeviceStatus(deviceId); if (status != null) { return convertToDeviceInfo(status); } String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = (DeviceRealtimeInfo) redisTemplate.opsForValue().get(cacheKey); if (deviceInfo != null) { return deviceInfo; } DeviceStatusRecord record = deviceMapper.selectLatestDeviceStatus(deviceId); if (record != null) { deviceInfo = convertToDeviceInfo(record); redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); return deviceInfo; } return null; }
private DeviceRealtimeInfo convertToDeviceInfo(DeviceRealtimeStatus status) { DeviceRealtimeInfo info = new DeviceRealtimeInfo(); info.setDeviceId(status.getDeviceId()); info.setStatus(status.getStatus()); info.setData(status.getData()); info.setTimestamp(status.getTimestamp()); return info; }
private DeviceRealtimeInfo convertToDeviceInfo(DeviceStatusRecord record) { DeviceRealtimeInfo info = new DeviceRealtimeInfo(); info.setDeviceId(record.getDeviceId()); info.setStatus(record.getStatus()); info.setData(record.getData()); info.setTimestamp(record.getTimestamp()); return info; } }
|
3.2 设备实时状态内存缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
/**
 * In-process cache of the latest status per device, backed by Caffeine.
 *
 * <p>Holds at most 100000 entries; an entry expires 5 minutes after it was
 * last written. Statistics recording is enabled.
 */
@Component
@Slf4j
public class DeviceRealtimeCache {

    private final Cache<String, DeviceRealtimeStatus> cache = Caffeine.newBuilder()
            .maximumSize(100000)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .recordStats()
            .build();

    /** Stores or replaces the status of a device. */
    public void updateDeviceStatus(String deviceId, DeviceRealtimeStatus status) {
        cache.put(deviceId, status);
    }

    /**
     * @return the cached status of the device, or {@code null} when none is cached
     */
    public DeviceRealtimeStatus getDeviceStatus(String deviceId) {
        return cache.getIfPresent(deviceId);
    }

    /**
     * Returns a point-in-time copy of all cached statuses.
     *
     * <p>The copy is unmodifiable and detached from the cache, so callers can
     * iterate it repeatedly and get the same entries, and cannot alter the
     * cache through it.
     *
     * @return an unmodifiable snapshot keyed by device id; empty when nothing is cached
     */
    public Map<String, DeviceRealtimeStatus> getAllDeviceStatus() {
        return java.util.Collections.unmodifiableMap(new java.util.HashMap<>(cache.asMap()));
    }

    /** Drops the cached status of a device, if any. */
    public void removeDeviceStatus(String deviceId) {
        cache.invalidate(deviceId);
    }

    /** @return the statistics recorded by the underlying cache */
    public CacheStats getCacheStats() {
        return cache.stats();
    }
}
|
3.3 设备状态推送接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@RestController @RequestMapping("/api/device") @Slf4j public class DeviceStatusPushController {
@Autowired private DeviceService deviceService;
@PostMapping("/status/push") public Result<Void> pushDeviceStatus(@RequestBody DeviceRealtimeStatus status) { try { deviceService.pushDeviceRealtimeStatus(status); deviceService.cacheDeviceRealtimeStatusToMemory(status.getDeviceId(), status); return Result.success(); } catch (Exception e) { log.error("推送设备状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); return Result.error("推送设备状态失败"); } }
@PostMapping("/status/push/batch") public Result<Void> batchPushDeviceStatus(@RequestBody List<DeviceRealtimeStatus> statusList) { try { for (DeviceRealtimeStatus status : statusList) { deviceService.pushDeviceRealtimeStatus(status); deviceService.cacheDeviceRealtimeStatusToMemory(status.getDeviceId(), status); } return Result.success(); } catch (Exception e) { log.error("批量推送设备状态失败: error={}", e.getMessage(), e); return Result.error("批量推送设备状态失败"); } } }
|
4. 数据模型定义
4.1 设备实时状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
/**
 * Status report pushed by a device and held in the in-memory cache.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data public class DeviceRealtimeStatus {
/** Identifier of the reporting device; used as the key of the in-memory cache. */
private String deviceId;
/** Status value reported by the device; the set of valid values is not defined here. */
private String status;
/** Payload accompanying the status, carried as a string; its format is not defined here. */
private String data;
/** Time associated with the report - presumably when the device produced it; confirm with the sender. */
private Date timestamp; }
|
4.2 设备实时信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
/**
 * Realtime device info returned to API callers and stored in Redis.
 * Carries the same four fields as a device status report or status record.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data public class DeviceRealtimeInfo {
/** Identifier of the device this info describes. */
private String deviceId;
/** Status value copied from the source status or record. */
private String status;
/** Payload copied from the source status or record; its format is not defined here. */
private String data;
/** Timestamp copied from the source status or record. */
private Date timestamp; }
|
4.3 设备状态记录(数据库)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
/**
 * Row of the {@code device_status_record} table: one persisted device status.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data @TableName("device_status_record") public class DeviceStatusRecord {
/** Primary key; declared with IdType.AUTO - presumably assigned by the database, confirm against the schema. */
@TableId(type = IdType.AUTO) private Long id;
/** Identifier of the device the status belongs to. */
private String deviceId;
/** Status value copied from the device report. */
private String status;
/** Payload copied from the device report; its format is not defined here. */
private String data;
/** Timestamp copied from the device report; the latest-status query orders by this column. */
private Date timestamp;
/** Time at which this record object was created by the service before insertion. */
private Date createTime; }
|
5. 数据库Mapper实现
5.1 DeviceMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
/**
 * MyBatis mapper for the {@code device_status_record} table.
 */
@Mapper
public interface DeviceMapper extends BaseMapper<DeviceStatusRecord> {

    /**
     * Fetches the record with the greatest {@code timestamp} for a device.
     *
     * @param deviceId device whose latest record is wanted
     * @return the newest record; presumably {@code null} when the device has no
     *         rows (MyBatis single-result convention - confirm)
     */
    @Select("SELECT * FROM device_status_record WHERE device_id = #{deviceId} "
            + "ORDER BY timestamp DESC LIMIT 1")
    DeviceStatusRecord selectLatestDeviceStatus(@Param("deviceId") String deviceId);

    /**
     * Inserts all given records with a single multi-row INSERT statement.
     *
     * @param records rows to insert; the generated statement has no rows when
     *                the list is empty, so callers should pass a non-empty list
     */
    @Insert("<script>INSERT INTO device_status_record "
            + "(device_id, status, data, timestamp, create_time) VALUES "
            + "<foreach collection='records' item='record' separator=','>"
            + "(#{record.deviceId}, #{record.status}, #{record.data}, "
            + "#{record.timestamp}, #{record.createTime})"
            + "</foreach></script>")
    void batchInsertDeviceStatus(@Param("records") List<DeviceStatusRecord> records);
}
|
6. 配置类
6.1 Redis配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Configuration public class RedisConfig {
@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } }
|
6.2 定时任务配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
/**
 * Enables {@code @Scheduled} processing and runs scheduled tasks on a
 * dedicated thread pool instead of the default scheduler.
 */
@Configuration
@EnableScheduling
public class ScheduledConfig implements SchedulingConfigurer {

    /** Number of threads available to scheduled tasks. */
    private static final int SCHEDULER_POOL_SIZE = 10;

    /** Registers the pool below as the scheduler for all {@code @Scheduled} methods. */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    /**
     * Thread pool used for scheduled tasks; shut down when the context closes.
     *
     * <p>The return type is the concrete scheduling interface rather than
     * {@code Executor}, so the container can match this bean when another
     * component injects a {@code ScheduledExecutorService} by type.
     *
     * @return a scheduled pool with {@value #SCHEDULER_POOL_SIZE} threads
     */
    @Bean(destroyMethod = "shutdown")
    public java.util.concurrent.ScheduledExecutorService taskExecutor() {
        return Executors.newScheduledThreadPool(SCHEDULER_POOL_SIZE);
    }
}
|
7. 性能优化策略
7.1 多级缓存策略
系统采用三级存储架构(两级缓存 + 数据库兜底),查询时按以下顺序逐级回退:
- 内存缓存(Caffeine):最快,存储设备最新状态,延迟<1ms
- Redis缓存:次快,存储准实时状态,延迟<5ms
- 数据库:最慢,存储历史状态,延迟10-50ms
7.2 批量处理优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
/**
 * Reads the realtime info of many devices from Redis in one pipelined round trip.
 */
@Service
public class BatchProcessService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Fetches {@code device:realtime:<deviceId>} for every given id using a
     * single Redis pipeline.
     *
     * @param deviceIds ids to look up; {@code null} or empty yields an empty list
     * @return the infos found in Redis; ids with no cached value are omitted
     */
    public List<DeviceRealtimeInfo> batchGetDeviceInfo(List<String> deviceIds) {
        List<DeviceRealtimeInfo> deviceInfoList = new ArrayList<>();
        if (deviceIds == null || deviceIds.isEmpty()) {
            return deviceInfoList;
        }

        List<Object> results = redisTemplate.executePipelined(
                new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        // A plain RedisTemplate hands over a raw connection, not a
                        // StringRedisConnection, so keys are serialized to bytes here.
                        for (String deviceId : deviceIds) {
                            byte[] rawKey = redisTemplate.getStringSerializer()
                                    .serialize("device:realtime:" + deviceId);
                            connection.stringCommands().get(rawKey);
                        }
                        // Pipelined callbacks must return null; results are collected
                        // by executePipelined.
                        return null;
                    }
                });

        for (Object result : results) {
            // Missing keys come back as null; anything of another type is skipped
            // rather than failing the whole batch with a ClassCastException.
            if (result instanceof DeviceRealtimeInfo) {
                deviceInfoList.add((DeviceRealtimeInfo) result);
            }
        }
        return deviceInfoList;
    }
}
|
7.3 异步处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
@Service @Slf4j public class AsyncProcessService {
@Autowired private DeviceService deviceService;
@Async("deviceTaskExecutor") public void asyncSyncDeviceStatus(DeviceRealtimeStatus status) { try { deviceService.syncToDatabase(Collections.singletonMap( status.getDeviceId(), status)); } catch (Exception e) { log.error("异步同步设备状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); } } }
|
8. 监控与告警
8.1 缓存监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
/**
 * Periodically reports the statistics of the in-memory device cache.
 */
@Service
@Slf4j
public class CacheMonitorService {

    @Autowired
    private DeviceRealtimeCache deviceRealtimeCache;

    /**
     * Runs every 60 seconds: logs hit count, miss count, hit rate and eviction
     * count, and adds a warning when the hit rate is below 0.8.
     */
    @Scheduled(fixedRate = 60000)
    public void monitorCache() {
        final CacheStats snapshot = deviceRealtimeCache.getCacheStats();
        final double hitRatio = snapshot.hitRate();

        log.info("缓存统计: hitCount={}, missCount={}, hitRate={}, evictionCount={}",
                snapshot.hitCount(),
                snapshot.missCount(),
                hitRatio,
                snapshot.evictionCount());

        if (hitRatio < 0.8) {
            log.warn("缓存命中率过低: hitRate={}", hitRatio);
        }
    }
}
|
9. 总结
本文详细介绍了设备实时信息处理的Java微服务架构实现,包括:
- 用户网关服务:负责用户请求接入、身份认证、请求路由
- 设备服务:负责设备状态管理、实时数据处理、缓存管理
- 多级缓存架构:内存缓存 + Redis缓存 + 数据库
- 定时同步机制:定期将内存状态同步到缓存和数据库
- 性能优化:批量处理、异步处理、Pipeline优化
该架构具有以下优势:
- 高性能:多级缓存架构,缓存命中时查询延迟<5ms(回源数据库时约10-50ms)
- 高可用:缓存降级,数据库兜底
- 可扩展:微服务架构,支持水平扩展
- 实时性:内存缓存保证最新状态实时可用
通过本文的实战代码,可以快速搭建一个高性能的设备实时信息处理系统。