第367集设备实时信息处理Java微服务后端架构实战
|字数总计:3.5k|阅读时长:15分钟|阅读量:
设备实时信息处理Java微服务后端架构实战
1. 架构概述
设备实时信息处理系统是物联网平台的核心模块,需要支持高并发、低延迟的设备状态查询和实时状态推送。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用的设备实时信息处理系统。
1.1 系统架构图
1 2 3 4 5 6 7 8 9 10 11
| 用户端 → 用户网关 → 设备服务 → 缓存/数据库 ↓ 身份认证 ↓ 读取缓存、数据库准实时信息 ↓ 返回请求结果
设备服务 → 内存缓存 ↓ 定时同步至缓存、数据库
|
1.2 核心组件
- 用户网关(User Gateway):负责用户请求的接入、身份认证、请求路由
- 设备服务(Device Service):负责设备状态管理、实时数据处理、缓存管理
- 缓存层(Redis):存储设备准实时状态信息
- 数据库(MySQL/MongoDB):持久化设备历史状态数据
- 内存缓存(本地缓存):存储设备最新实时状态
2. 用户网关服务实现
2.1 网关服务核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
@RestController @RequestMapping("/api/gateway") @Slf4j public class UserGatewayController {
@Autowired private AuthService authService; @Autowired private DeviceServiceClient deviceServiceClient; @Autowired private RedisTemplate<String, Object> redisTemplate;
@PostMapping("/device/realtime") public Result<DeviceRealtimeInfo> getDeviceRealtimeInfo( @RequestHeader("Authorization") String token, @RequestBody DeviceRealtimeRequest request) { try { UserInfo userInfo = authService.authenticate(token); if (userInfo == null) { return Result.error("身份认证失败"); } if (!authService.hasPermission(userInfo.getUserId(), request.getDeviceId())) { return Result.error("无权限访问该设备"); } DeviceRealtimeInfo deviceInfo = getDeviceRealtimeInfo(request.getDeviceId()); return Result.success(deviceInfo); } catch (Exception e) { log.error("获取设备实时信息失败: deviceId={}, error={}", request.getDeviceId(), e.getMessage(), e); return Result.error("获取设备实时信息失败"); } }
private DeviceRealtimeInfo getDeviceRealtimeInfo(String deviceId) { String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = (DeviceRealtimeInfo) redisTemplate.opsForValue().get(cacheKey); if (deviceInfo != null) { log.debug("从缓存获取设备信息: deviceId={}", deviceId); return deviceInfo; } log.debug("缓存未命中,查询数据库: deviceId={}", deviceId); deviceInfo = deviceServiceClient.getDeviceRealtimeInfo(deviceId); if (deviceInfo != null) { redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); } return deviceInfo; }
@PostMapping("/device/realtime/batch") public Result<List<DeviceRealtimeInfo>> batchGetDeviceRealtimeInfo( @RequestHeader("Authorization") String token, @RequestBody BatchDeviceRealtimeRequest request) { try { UserInfo userInfo = authService.authenticate(token); if (userInfo == null) { return Result.error("身份认证失败"); } List<DeviceRealtimeInfo> deviceInfoList = new ArrayList<>(); for (String deviceId : request.getDeviceIds()) { DeviceRealtimeInfo deviceInfo = getDeviceRealtimeInfo(deviceId); if (deviceInfo != null) { deviceInfoList.add(deviceInfo); } } return Result.success(deviceInfoList); } catch (Exception e) { log.error("批量获取设备实时信息失败: error={}", e.getMessage(), e); return Result.error("批量获取设备实时信息失败"); } } }
|
2.2 身份认证服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
@Service @Slf4j public class AuthService {
@Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private UserServiceClient userServiceClient;
public UserInfo authenticate(String token) { try { String userId = parseToken(token); if (StringUtils.isEmpty(userId)) { return null; } String userCacheKey = "user:info:" + userId; UserInfo userInfo = (UserInfo) redisTemplate.opsForValue().get(userCacheKey); if (userInfo != null) { return userInfo; } userInfo = userServiceClient.getUserInfo(userId); if (userInfo != null) { redisTemplate.opsForValue().set(userCacheKey, userInfo, 30, TimeUnit.MINUTES); } return userInfo; } catch (Exception e) { log.error("身份认证失败: token={}, error={}", token, e.getMessage(), e); return null; } }
public boolean hasPermission(String userId, String deviceId) { try { String permissionKey = "user:device:permission:" + userId + ":" + deviceId; Boolean hasPermission = (Boolean) redisTemplate.opsForValue().get(permissionKey); if (hasPermission != null) { return hasPermission; } hasPermission = userServiceClient.checkDevicePermission(userId, deviceId); redisTemplate.opsForValue().set(permissionKey, hasPermission, 10, TimeUnit.MINUTES); return hasPermission; } catch (Exception e) { log.error("权限校验失败: userId={}, deviceId={}, error={}", userId, deviceId, e.getMessage(), e); return false; } }
private String parseToken(String token) { if (StringUtils.isEmpty(token) || !token.startsWith("Bearer ")) { return null; } String jwtToken = token.substring(7); return extractUserIdFromJWT(jwtToken); }
private String extractUserIdFromJWT(String jwtToken) { return "user123"; } }
|
3. 设备服务实现
3.1 设备服务核心代码

|
@Service @Slf4j public class DeviceService {
@Autowired private DeviceMapper deviceMapper; @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private DeviceRealtimeCache deviceRealtimeCache; @Autowired private ScheduledExecutorService scheduledExecutor;
public void pushDeviceRealtimeStatus(DeviceRealtimeStatus status) { try { String deviceId = status.getDeviceId(); deviceRealtimeCache.updateDeviceStatus(deviceId, status); log.info("设备实时状态已更新到内存: deviceId={}, status={}", deviceId, status.getStatus()); } catch (Exception e) { log.error("推送设备实时状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); } }
public void cacheDeviceRealtimeStatusToMemory(String deviceId, DeviceRealtimeStatus status) { deviceRealtimeCache.updateDeviceStatus(deviceId, status); }
@Scheduled(fixedRate = 30000) public void syncDeviceStatusToCacheAndDatabase() { try { Map<String, DeviceRealtimeStatus> deviceStatusMap = deviceRealtimeCache.getAllDeviceStatus(); if (deviceStatusMap.isEmpty()) { return; } syncToRedisCache(deviceStatusMap); syncToDatabase(deviceStatusMap); log.info("设备状态同步完成: count={}", deviceStatusMap.size()); } catch (Exception e) { log.error("同步设备状态失败: error={}", e.getMessage(), e); } }
private void syncToRedisCache(Map<String, DeviceRealtimeStatus> deviceStatusMap) { for (Map.Entry<String, DeviceRealtimeStatus> entry : deviceStatusMap.entrySet()) { String deviceId = entry.getKey(); DeviceRealtimeStatus status = entry.getValue(); String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = convertToDeviceInfo(status); redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); } }
private void syncToDatabase(Map<String, DeviceRealtimeStatus> deviceStatusMap) { List<DeviceStatusRecord> statusRecords = new ArrayList<>(); for (Map.Entry<String, DeviceRealtimeStatus> entry : deviceStatusMap.entrySet()) { String deviceId = entry.getKey(); DeviceRealtimeStatus status = entry.getValue(); DeviceStatusRecord record = new DeviceStatusRecord(); record.setDeviceId(deviceId); record.setStatus(status.getStatus()); record.setData(status.getData()); record.setTimestamp(status.getTimestamp()); record.setCreateTime(new Date()); statusRecords.add(record); } if (!statusRecords.isEmpty()) { deviceMapper.batchInsertDeviceStatus(statusRecords); } }
public DeviceRealtimeInfo getDeviceRealtimeInfo(String deviceId) { DeviceRealtimeStatus status = deviceRealtimeCache.getDeviceStatus(deviceId); if (status != null) { return convertToDeviceInfo(status); } String cacheKey = "device:realtime:" + deviceId; DeviceRealtimeInfo deviceInfo = (DeviceRealtimeInfo) redisTemplate.opsForValue().get(cacheKey); if (deviceInfo != null) { return deviceInfo; } DeviceStatusRecord record = deviceMapper.selectLatestDeviceStatus(deviceId); if (record != null) { deviceInfo = convertToDeviceInfo(record); redisTemplate.opsForValue().set(cacheKey, deviceInfo, 5, TimeUnit.MINUTES); return deviceInfo; } return null; }
private DeviceRealtimeInfo convertToDeviceInfo(DeviceRealtimeStatus status) { DeviceRealtimeInfo info = new DeviceRealtimeInfo(); info.setDeviceId(status.getDeviceId()); info.setStatus(status.getStatus()); info.setData(status.getData()); info.setTimestamp(status.getTimestamp()); return info; }
private DeviceRealtimeInfo convertToDeviceInfo(DeviceStatusRecord record) { DeviceRealtimeInfo info = new DeviceRealtimeInfo(); info.setDeviceId(record.getDeviceId()); info.setStatus(record.getStatus()); info.setData(record.getData()); info.setTimestamp(record.getTimestamp()); return info; } }
|
3.2 设备实时状态内存缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
@Component @Slf4j public class DeviceRealtimeCache {
private final Cache<String, DeviceRealtimeStatus> cache = Caffeine.newBuilder() .maximumSize(100000) .expireAfterWrite(5, TimeUnit.MINUTES) .recordStats() .build();
public void updateDeviceStatus(String deviceId, DeviceRealtimeStatus status) { cache.put(deviceId, status); }
public DeviceRealtimeStatus getDeviceStatus(String deviceId) { return cache.getIfPresent(deviceId); }
public Map<String, DeviceRealtimeStatus> getAllDeviceStatus() { return cache.asMap(); }
public void removeDeviceStatus(String deviceId) { cache.invalidate(deviceId); }
public CacheStats getCacheStats() { return cache.stats(); } }
|
3.3 设备状态推送接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@RestController @RequestMapping("/api/device") @Slf4j public class DeviceStatusPushController {
@Autowired private DeviceService deviceService;
@PostMapping("/status/push") public Result<Void> pushDeviceStatus(@RequestBody DeviceRealtimeStatus status) { try { deviceService.pushDeviceRealtimeStatus(status); deviceService.cacheDeviceRealtimeStatusToMemory(status.getDeviceId(), status); return Result.success(); } catch (Exception e) { log.error("推送设备状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); return Result.error("推送设备状态失败"); } }
@PostMapping("/status/push/batch") public Result<Void> batchPushDeviceStatus(@RequestBody List<DeviceRealtimeStatus> statusList) { try { for (DeviceRealtimeStatus status : statusList) { deviceService.pushDeviceRealtimeStatus(status); deviceService.cacheDeviceRealtimeStatusToMemory(status.getDeviceId(), status); } return Result.success(); } catch (Exception e) { log.error("批量推送设备状态失败: error={}", e.getMessage(), e); return Result.error("批量推送设备状态失败"); } } }
|
4. 数据模型定义
4.1 设备实时状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Data public class DeviceRealtimeStatus {
private String deviceId;
private String status;
private String data;
private Date timestamp; }
|
4.2 设备实时信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Data public class DeviceRealtimeInfo {
private String deviceId;
private String status;
private String data;
private Date timestamp; }
|
4.3 设备状态记录(数据库)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
@Data @TableName("device_status_record") public class DeviceStatusRecord {
@TableId(type = IdType.AUTO) private Long id;
private String deviceId;
private String status;
private String data;
private Date timestamp;
private Date createTime; }
|
5. 数据库Mapper实现
5.1 DeviceMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Mapper public interface DeviceMapper extends BaseMapper<DeviceStatusRecord> {
@Select("SELECT * FROM device_status_record " + "WHERE device_id = #{deviceId} " + "ORDER BY timestamp DESC LIMIT 1") DeviceStatusRecord selectLatestDeviceStatus(@Param("deviceId") String deviceId);
@Insert("<script>" + "INSERT INTO device_status_record (device_id, status, data, timestamp, create_time) " + "VALUES " + "<foreach collection='records' item='record' separator=','>" + "(#{record.deviceId}, #{record.status}, #{record.data}, #{record.timestamp}, #{record.createTime})" + "</foreach>" + "</script>") void batchInsertDeviceStatus(@Param("records") List<DeviceStatusRecord> records); }
|
6. 配置类
6.1 Redis配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Configuration public class RedisConfig {
@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } }
|
6.2 定时任务配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Configuration @EnableScheduling public class ScheduledConfig implements SchedulingConfigurer {
@Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskExecutor()); }
@Bean(destroyMethod = "shutdown") public Executor taskExecutor() { return Executors.newScheduledThreadPool(10); } }
|
7. 性能优化策略
7.1 多级缓存策略
系统采用三级缓存架构:
- 内存缓存(Caffeine):最快,存储设备最新状态,延迟<1ms
- Redis缓存:次快,存储准实时状态,延迟<5ms
- 数据库:最慢,存储历史状态,延迟10-50ms
7.2 批量处理优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
@Service public class BatchProcessService {
@Autowired private RedisTemplate<String, Object> redisTemplate;
public List<DeviceRealtimeInfo> batchGetDeviceInfo(List<String> deviceIds) { List<Object> results = redisTemplate.executePipelined( new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection) connection; for (String deviceId : deviceIds) { String key = "device:realtime:" + deviceId; stringRedisConn.get(key); } return null; } } ); List<DeviceRealtimeInfo> deviceInfoList = new ArrayList<>(); for (Object result : results) { if (result != null) { deviceInfoList.add((DeviceRealtimeInfo) result); } } return deviceInfoList; } }
|
7.3 异步处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
@Service @Slf4j public class AsyncProcessService {
@Autowired private DeviceService deviceService;
@Async("deviceTaskExecutor") public void asyncSyncDeviceStatus(DeviceRealtimeStatus status) { try { deviceService.syncToDatabase(Collections.singletonMap( status.getDeviceId(), status)); } catch (Exception e) { log.error("异步同步设备状态失败: deviceId={}, error={}", status.getDeviceId(), e.getMessage(), e); } } }
|
8. 监控与告警
8.1 缓存监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Service @Slf4j public class CacheMonitorService {
@Autowired private DeviceRealtimeCache deviceRealtimeCache;
@Scheduled(fixedRate = 60000) public void monitorCache() { CacheStats stats = deviceRealtimeCache.getCacheStats(); log.info("缓存统计: hitCount={}, missCount={}, hitRate={}, evictionCount={}", stats.hitCount(), stats.missCount(), stats.hitRate(), stats.evictionCount()); if (stats.hitRate() < 0.8) { log.warn("缓存命中率过低: hitRate={}", stats.hitRate()); } } }
|
9. 总结
本文详细介绍了设备实时信息处理的Java微服务架构实现,包括:
- 用户网关服务:负责用户请求接入、身份认证、请求路由
- 设备服务:负责设备状态管理、实时数据处理、缓存管理
- 多级缓存架构:内存缓存 + Redis缓存 + 数据库
- 定时同步机制:定期将内存状态同步到缓存和数据库
- 性能优化:批量处理、异步处理、Pipeline优化
该架构具有以下优势:
- 高性能:多级缓存架构,查询延迟<5ms
- 高可用:缓存降级,数据库兜底
- 可扩展:微服务架构,支持水平扩展
- 实时性:内存缓存保证最新状态实时可用
通过本文的实战代码,可以快速搭建一个高性能的设备实时信息处理系统。