1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
| @Service @Slf4j public class DistributedCacheService { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @Autowired private UserService userService; private static final String TOKEN_CACHE_PREFIX = "token:"; private static final String USER_INFO_CACHE_PREFIX = "user:"; private static final String HEARTBEAT_CACHE_PREFIX = "heartbeat:"; private static final Duration TOKEN_EXPIRE_TIME = Duration.ofMinutes(30); private static final Duration HEARTBEAT_EXPIRE_TIME = Duration.ofMinutes(10);
public SysUserLoginInfo getLoginInfo(String token) { try { SysUserLoginInfo cachedInfo = getFromLocalCache(token); if (cachedInfo != null) { return cachedInfo; } cachedInfo = getFromRedisCache(token); if (cachedInfo != null) { updateLocalCache(token, cachedInfo); return cachedInfo; } cachedInfo = loadFromDatabase(token); if (cachedInfo != null) { updateCache(token, cachedInfo); sendCacheUpdateMessage(token, cachedInfo); } return cachedInfo; } catch (Exception e) { log.error("获取用户登录信息失败, token: {}", token, e); return loadFromDatabase(token); } }
private SysUserLoginInfo getFromRedisCache(String token) { try { String cacheKey = TOKEN_CACHE_PREFIX + token; Object cached = redisTemplate.opsForValue().get(cacheKey); if (cached instanceof SysUserLoginInfo) { updateHeartbeat(token); return (SysUserLoginInfo) cached; } return null; } catch (Exception e) { log.error("从Redis获取缓存失败, token: {}", token, e); return null; } }
private void updateCache(String token, SysUserLoginInfo userInfo) { try { String cacheKey = TOKEN_CACHE_PREFIX + token; String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token; redisTemplate.opsForValue().set(cacheKey, userInfo, TOKEN_EXPIRE_TIME); redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), HEARTBEAT_EXPIRE_TIME); updateLocalCache(token, userInfo); } catch (Exception e) { log.error("更新缓存失败, token: {}", token, e); } }
private boolean shouldUpdateCache(String token) { try { String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token; Long lastHeartbeat = (Long) redisTemplate.opsForValue().get(heartbeatKey); if (lastHeartbeat == null) { return true; } long currentTime = System.currentTimeMillis(); return (currentTime - lastHeartbeat) > HEARTBEAT_EXPIRE_TIME.toMillis(); } catch (Exception e) { log.error("检查缓存更新条件失败, token: {}", token, e); return true; } }
private void updateHeartbeat(String token) { try { String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token; redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), HEARTBEAT_EXPIRE_TIME); } catch (Exception e) { log.error("更新心跳时间失败, token: {}", token, e); } }
private SysUserLoginInfo loadFromDatabase(String token) { try { return userService.getUserByToken(token); } catch (Exception e) { log.error("从数据库加载用户信息失败, token: {}", token, e); return null; } }
private void sendCacheUpdateMessage(String token, SysUserLoginInfo userInfo) { try { CacheUpdateMessage message = new CacheUpdateMessage(); message.setToken(token); message.setUserInfo(userInfo); message.setTimestamp(System.currentTimeMillis()); message.setOperation("UPDATE"); kafkaTemplate.send("cache-update-topic", token, message); } catch (Exception e) { log.error("发送缓存更新消息失败, token: {}", token, e); } } }
|