1. Overview of Distributed Cache Architecture

In modern microservice architectures, distributed caching is a key technique for improving system performance and availability. By caching data across multiple nodes and using a Kafka message queue to keep the copies synchronized, you can build a highly available, highly concurrent caching system. This article walks through the design principles of distributed caching, a Kafka integration scheme, and a production-grade implementation.

1.1 Why Distributed Caching Matters

  1. Performance: fewer database round trips, faster responses
  2. High availability: multi-node redundancy removes single points of failure
  3. Horizontal scalability: dynamic scale-out to absorb high concurrency
  4. Data consistency: a message queue keeps replicas synchronized
  5. Failure recovery: automatic failover and recovery mechanisms

1.2 Core Design Principles

  • Consistency: keep cached data consistent across nodes
  • Availability: keep the service highly available
  • Partition tolerance: survive network partitions and node failures
  • Performance: maximize cache hit rate and response speed
  • Observability: thorough monitoring and alerting

1.3 Technical Architecture

  1. Cache layer: Redis cluster plus local caches
  2. Messaging layer: Kafka message queue
  3. Application layer: a cluster of Spring Boot applications
  4. Monitoring layer: Prometheus + Grafana
  5. Load balancing: Nginx + service discovery

2. Analyzing and Optimizing the Original Cache Code

2.1 Problems in the Original Code
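
The original implementation was a single-node, in-process cache. The original source is not reproduced here, but the problems summarized in 2.2 refer to a pattern roughly like the following sketch (class and method names are illustrative, not the original code):

@Service
public class LegacyTokenCache {

    // Process-local map: invisible to the other instances behind the load balancer
    private final Map<String, SysUserLoginInfo> cache = new ConcurrentHashMap<>();

    // Coarse-grained synchronization serializes every reader and writer
    public synchronized SysUserLoginInfo get(String token) {
        return cache.get(token);
    }

    public synchronized void put(String token, SysUserLoginInfo info) {
        cache.put(token, info);
    }

    // Imprecise periodic cleanup: stale entries linger, live ones get dropped
    @Scheduled(fixedRate = 600000)
    public void evictAll() {
        cache.clear();
    }
}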

2.2 Summary of the Problems

  1. Single-node limitation: a local Map cannot support multi-instance deployment
  2. Data inconsistency: instances never synchronize with each other
  3. Memory leaks: the scheduled cleanup is too imprecise
  4. Performance bottleneck: synchronized operations hurt throughput
  5. No failure recovery: there is no failover mechanism

3. Designing a Highly Available Distributed Cache Architecture

3.1 Overall Architecture

// Distributed cache configuration
@Configuration
@EnableCaching
public class DistributedCacheConfig {

    @Bean
    public CacheManager cacheManager() {
        return RedisCacheManager.RedisCacheManagerBuilder
                .fromConnectionFactory(redisConnectionFactory())
                .cacheDefaults(cacheConfiguration())
                .build();
    }

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        // The cluster configuration is passed through the constructor
        return new LettuceConnectionFactory(clusterConfiguration());
    }

    private RedisClusterConfiguration clusterConfiguration() {
        // Seed nodes as "host:port" strings; the client discovers the rest of the cluster
        RedisClusterConfiguration config = new RedisClusterConfiguration(Arrays.asList(
                "192.168.1.10:7000",
                "192.168.1.11:7000",
                "192.168.1.12:7000",
                "192.168.1.13:7000",
                "192.168.1.14:7000",
                "192.168.1.15:7000"
        ));
        config.setMaxRedirects(3);
        return config;
    }

    private RedisCacheConfiguration cacheConfiguration() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30))
                .serializeKeysWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }
}
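
With this configuration in place, Spring's caching annotations run transparently against the Redis cluster. Below is a minimal usage sketch; ProductService, ProductRepository, and the "products" cache name are illustrative, not part of the original code:

@Service
public class ProductService {

    @Autowired
    private ProductRepository productRepository; // hypothetical data access layer

    // The result is stored in the "products" cache with the default 30-minute TTL
    @Cacheable(value = "products", key = "#productId")
    public Product getProduct(Long productId) {
        return productRepository.findById(productId).orElse(null);
    }

    // Evict the entry whenever the product changes so readers never see stale data
    @CacheEvict(value = "products", key = "#product.id")
    public void updateProduct(Product product) {
        productRepository.save(product);
    }
}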

3.2 Cache Service Layer Design

@Service
@Slf4j
public class DistributedCacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private UserService userService;

    @Autowired
    private LocalCacheManager localCacheManager; // L1 cache, defined in section 3.3

    private static final String TOKEN_CACHE_PREFIX = "token:";
    private static final String USER_INFO_CACHE_PREFIX = "user:";
    private static final String HEARTBEAT_CACHE_PREFIX = "heartbeat:";

    // Cache expiration settings
    private static final Duration TOKEN_EXPIRE_TIME = Duration.ofMinutes(30);
    private static final Duration HEARTBEAT_EXPIRE_TIME = Duration.ofMinutes(10);

    /**
     * Get user login info (high-availability version)
     */
    public SysUserLoginInfo getLoginInfo(String token) {
        try {
            // 1. Try the local cache first
            SysUserLoginInfo cachedInfo = localCacheManager.getFromLocalCache(token);
            if (cachedInfo != null) {
                return cachedInfo;
            }

            // 2. Fall back to the Redis cluster
            cachedInfo = getFromRedisCache(token);
            if (cachedInfo != null) {
                // Refresh the local cache
                localCacheManager.updateLocalCache(token, cachedInfo);
                return cachedInfo;
            }

            // 3. Load from the database and populate both cache tiers
            cachedInfo = loadFromDatabase(token);
            if (cachedInfo != null) {
                updateCache(token, cachedInfo);
                // Notify the other nodes via Kafka
                sendCacheUpdateMessage(token, cachedInfo);
            }

            return cachedInfo;

        } catch (Exception e) {
            log.error("Failed to get user login info, token: {}", token, e);
            // Degradation strategy: read straight from the database
            return loadFromDatabase(token);
        }
    }

    /**
     * Read from the Redis cluster
     */
    private SysUserLoginInfo getFromRedisCache(String token) {
        try {
            String cacheKey = TOKEN_CACHE_PREFIX + token;
            Object cached = redisTemplate.opsForValue().get(cacheKey);

            if (cached instanceof SysUserLoginInfo) {
                // Refresh the heartbeat timestamp
                updateHeartbeat(token);
                return (SysUserLoginInfo) cached;
            }

            return null;
        } catch (Exception e) {
            log.error("Failed to read from Redis, token: {}", token, e);
            return null;
        }
    }

    /**
     * Update both cache tiers (Redis + local)
     */
    private void updateCache(String token, SysUserLoginInfo userInfo) {
        try {
            String cacheKey = TOKEN_CACHE_PREFIX + token;
            String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token;

            // Update the Redis cache
            redisTemplate.opsForValue().set(cacheKey, userInfo, TOKEN_EXPIRE_TIME);
            redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), HEARTBEAT_EXPIRE_TIME);

            // Update the local cache
            localCacheManager.updateLocalCache(token, userInfo);

        } catch (Exception e) {
            log.error("Failed to update cache, token: {}", token, e);
        }
    }

    /**
     * Check whether the cache entry needs refreshing
     */
    private boolean shouldUpdateCache(String token) {
        try {
            String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token;
            Long lastHeartbeat = (Long) redisTemplate.opsForValue().get(heartbeatKey);

            if (lastHeartbeat == null) {
                return true;
            }

            long currentTime = System.currentTimeMillis();
            return (currentTime - lastHeartbeat) > HEARTBEAT_EXPIRE_TIME.toMillis();

        } catch (Exception e) {
            log.error("Failed to check cache freshness, token: {}", token, e);
            return true;
        }
    }

    /**
     * Refresh the heartbeat timestamp
     */
    private void updateHeartbeat(String token) {
        try {
            String heartbeatKey = HEARTBEAT_CACHE_PREFIX + token;
            redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), HEARTBEAT_EXPIRE_TIME);
        } catch (Exception e) {
            log.error("Failed to update heartbeat, token: {}", token, e);
        }
    }

    /**
     * Load user info from the database
     */
    private SysUserLoginInfo loadFromDatabase(String token) {
        try {
            return userService.getUserByToken(token);
        } catch (Exception e) {
            log.error("Failed to load user info from database, token: {}", token, e);
            return null;
        }
    }

    /**
     * Publish a cache-update message
     */
    private void sendCacheUpdateMessage(String token, SysUserLoginInfo userInfo) {
        try {
            CacheUpdateMessage message = new CacheUpdateMessage();
            message.setToken(token);
            message.setUserInfo(userInfo);
            message.setTimestamp(System.currentTimeMillis());
            message.setOperation("UPDATE");

            kafkaTemplate.send("cache-update-topic", token, message);

        } catch (Exception e) {
            log.error("Failed to publish cache-update message, token: {}", token, e);
        }
    }
}

3.3 Local Cache Management

@Component
@Slf4j
public class LocalCacheManager {

    // Local (L1) caches
    private final Cache<String, SysUserLoginInfo> localCache;
    private final Cache<String, Long> heartbeatCache;

    public LocalCacheManager() {
        // Caffeine as the local cache implementation
        this.localCache = Caffeine.newBuilder()
                .maximumSize(10000)
                .expireAfterWrite(Duration.ofMinutes(5))
                .expireAfterAccess(Duration.ofMinutes(2))
                .recordStats()
                .build();

        this.heartbeatCache = Caffeine.newBuilder()
                .maximumSize(10000)
                .expireAfterWrite(Duration.ofMinutes(10))
                .build();
    }

    /**
     * Read from the local cache
     */
    public SysUserLoginInfo getFromLocalCache(String token) {
        try {
            SysUserLoginInfo cached = localCache.getIfPresent(token);
            if (cached != null) {
                // Check the heartbeat timestamp
                Long lastHeartbeat = heartbeatCache.getIfPresent(token);
                if (lastHeartbeat != null) {
                    long currentTime = System.currentTimeMillis();
                    if ((currentTime - lastHeartbeat) < Duration.ofMinutes(10).toMillis()) {
                        return cached;
                    }
                }
            }
            return null;
        } catch (Exception e) {
            log.error("Failed to read local cache, token: {}", token, e);
            return null;
        }
    }

    /**
     * Update the local cache
     */
    public void updateLocalCache(String token, SysUserLoginInfo userInfo) {
        try {
            localCache.put(token, userInfo);
            heartbeatCache.put(token, System.currentTimeMillis());
        } catch (Exception e) {
            log.error("Failed to update local cache, token: {}", token, e);
        }
    }

    /**
     * Remove an entry from the local cache
     */
    public void removeFromLocalCache(String token) {
        try {
            localCache.invalidate(token);
            heartbeatCache.invalidate(token);
        } catch (Exception e) {
            log.error("Failed to remove local cache entry, token: {}", token, e);
        }
    }

    /**
     * Clear the local cache
     */
    public void clearLocalCache() {
        try {
            localCache.invalidateAll();
            heartbeatCache.invalidateAll();
        } catch (Exception e) {
            log.error("Failed to clear local cache", e);
        }
    }

    /**
     * Cache statistics
     */
    public CacheStats getCacheStats() {
        return localCache.stats();
    }
}
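
Because the builder above calls recordStats(), Caffeine tracks hits, misses, and evictions, and getCacheStats() exposes them. A small sketch of surfacing those numbers (the one-minute interval is an assumption):

@Component
@Slf4j
public class LocalCacheStatsLogger {

    @Autowired
    private LocalCacheManager localCacheManager;

    // Log L1 cache effectiveness once a minute
    @Scheduled(fixedRate = 60000)
    public void logLocalCacheStats() {
        CacheStats stats = localCacheManager.getCacheStats();
        log.info("L1 cache: hitRate={}, hits={}, misses={}, evictions={}",
                stats.hitRate(), stats.hitCount(), stats.missCount(), stats.evictionCount());
    }
}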

4. Kafka Message Queue Integration

4.1 Kafka Configuration

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // High-availability settings
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Allow JsonDeserializer to instantiate our message classes
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        // High-availability settings
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Concurrency settings
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        // Manual acknowledgment, required because auto-commit is disabled and the
        // listener in 4.2 commits offsets via its Acknowledgment parameter
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }
}
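
The synchronization topic itself also needs redundancy: if it lives on a single broker, losing that broker silences cache sync cluster-wide. A sketch of declaring it with Spring Kafka's TopicBuilder (the partition and replica counts are assumptions to tune to your cluster):

@Bean
public NewTopic cacheUpdateTopic() {
    return TopicBuilder.name("cache-update-topic")
            .partitions(3)  // allows up to 3 consumers in a group to work in parallel
            .replicas(3)    // survives the loss of a broker
            .build();
}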

4.2 Handling Cache Synchronization Messages

@Component
@Slf4j
public class CacheSyncConsumer {

    @Autowired
    private LocalCacheManager localCacheManager;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Handle cache synchronization messages
     */
    @KafkaListener(topics = "cache-update-topic", groupId = "cache-sync-group")
    public void handleCacheUpdate(CacheUpdateMessage message,
                                  Acknowledgment acknowledgment) {
        try {
            log.info("Received cache-sync message: token={}, operation={}",
                    message.getToken(), message.getOperation());

            switch (message.getOperation()) {
                case "UPDATE":
                    handleCacheUpdate(message);
                    break;
                case "DELETE":
                    handleCacheDelete(message);
                    break;
                case "CLEAR":
                    handleCacheClear(message);
                    break;
                default:
                    log.warn("Unknown cache operation: {}", message.getOperation());
            }

            // Commit the offset manually
            acknowledgment.acknowledge();

        } catch (Exception e) {
            log.error("Failed to process cache-sync message", e);
            // The message is not acknowledged; it can be retried or routed to an error channel
        }
    }

    /**
     * Apply a cache update
     */
    private void handleCacheUpdate(CacheUpdateMessage message) {
        try {
            String token = message.getToken();
            SysUserLoginInfo userInfo = message.getUserInfo();

            // Update the local cache
            localCacheManager.updateLocalCache(token, userInfo);

            // Update the Redis cache
            String cacheKey = "token:" + token;
            String heartbeatKey = "heartbeat:" + token;

            redisTemplate.opsForValue().set(cacheKey, userInfo, Duration.ofMinutes(30));
            redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), Duration.ofMinutes(10));

            log.info("Cache updated: token={}", token);

        } catch (Exception e) {
            log.error("Failed to apply cache update", e);
        }
    }

    /**
     * Apply a cache delete
     */
    private void handleCacheDelete(CacheUpdateMessage message) {
        try {
            String token = message.getToken();

            // Remove from the local cache
            localCacheManager.removeFromLocalCache(token);

            // Remove from Redis
            String cacheKey = "token:" + token;
            String heartbeatKey = "heartbeat:" + token;

            redisTemplate.delete(cacheKey);
            redisTemplate.delete(heartbeatKey);

            log.info("Cache entry deleted: token={}", token);

        } catch (Exception e) {
            log.error("Failed to apply cache delete", e);
        }
    }

    /**
     * Apply a cache clear
     */
    private void handleCacheClear(CacheUpdateMessage message) {
        try {
            // Clear the local cache
            localCacheManager.clearLocalCache();

            log.info("Local cache cleared");

        } catch (Exception e) {
            log.error("Failed to apply cache clear", e);
        }
    }
}
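
Two details worth handling here. First, for every instance to receive every sync message, each instance needs its own consumer group (for example by suffixing groupId with a node identifier); with a single shared group, Kafka delivers each update to only one instance and the other local caches stay stale. Second, the node that published an update also receives its own message back. The sourceNode field carried on the message (defined in section 4.3 below) lets the consumer skip that echo; a hedged sketch, placed at the top of the listener's try block:

// Skip messages this node published itself; it already holds the latest value
String localNode = InetAddress.getLocalHost().getHostName();
if (localNode.equals(message.getSourceNode())) {
    acknowledgment.acknowledge();
    return;
}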

4.3 Message Model Definition

@Data
@AllArgsConstructor
public class CacheUpdateMessage implements Serializable {

    private String token;
    private SysUserLoginInfo userInfo;
    private String operation; // UPDATE, DELETE, CLEAR
    private Long timestamp;
    private String sourceNode; // identifies the sending node

    // Explicit no-arg constructor that stamps the timestamp and source node.
    // (It replaces Lombok's @NoArgsConstructor, which would collide with it.)
    public CacheUpdateMessage() {
        this.timestamp = System.currentTimeMillis();
        this.sourceNode = getLocalNodeId();
    }

    private String getLocalNodeId() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            return "unknown";
        }
    }
}
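
A typical producer of the DELETE operation is the logout path: the local node clears its own tiers immediately and broadcasts the invalidation. A minimal sketch (the method placement is illustrative; it assumes the templates and LocalCacheManager defined earlier are injected):

public void logout(String token) {
    // Drop the entry from this node's caches right away
    localCacheManager.removeFromLocalCache(token);
    redisTemplate.delete("token:" + token);
    redisTemplate.delete("heartbeat:" + token);

    // Tell every other node to drop its local copy
    CacheUpdateMessage message = new CacheUpdateMessage();
    message.setToken(token);
    message.setOperation("DELETE");
    kafkaTemplate.send("cache-update-topic", token, message);
}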

5. Optimizations for High-Concurrency Scenarios

5.1 Cache Warmup Strategy

@Component
@Slf4j
public class CacheWarmupService {

    @Autowired
    private UserService userService;

    @Autowired
    private DistributedCacheService cacheService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Warm up the cache on application startup
     */
    @EventListener(ApplicationReadyEvent.class)
    public void warmupCache() {
        log.info("Starting cache warmup...");

        try {
            // Fetch the list of active user tokens
            List<String> activeTokens = userService.getActiveTokens();

            // Warm up in batches
            int batchSize = 100;
            for (int i = 0; i < activeTokens.size(); i += batchSize) {
                List<String> batch = activeTokens.subList(i,
                        Math.min(i + batchSize, activeTokens.size()));

                warmupBatch(batch);

                // Pause between batches to avoid overloading the database
                Thread.sleep(100);
            }

            log.info("Cache warmup finished; {} users warmed", activeTokens.size());

        } catch (Exception e) {
            log.error("Cache warmup failed", e);
        }
    }

    /**
     * Warm up one batch asynchronously
     */
    private void warmupBatch(List<String> tokens) {
        CompletableFuture.runAsync(() -> {
            for (String token : tokens) {
                try {
                    // Loading through getLoginInfo populates both cache tiers
                    cacheService.getLoginInfo(token);
                } catch (Exception e) {
                    log.warn("Failed to warm cache, token: {}", token, e);
                }
            }
        });
    }

    /**
     * Periodically re-warm hot data
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void warmupHotData() {
        try {
            // Fetch the hot-user tokens
            List<String> hotTokens = userService.getHotTokens();

            for (String token : hotTokens) {
                cacheService.getLoginInfo(token);
            }

            log.debug("Hot-data warmup finished; {} users warmed", hotTokens.size());

        } catch (Exception e) {
            log.error("Hot-data warmup failed", e);
        }
    }
}

5.2 Cache Avalanche Protection

@Component
@Slf4j
public class CacheAvalancheProtection {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final Random random = new Random();

    // Bloom filter of keys known to exist; assumed to be populated elsewhere
    // (e.g. during warmup). Shown here with Guava's implementation.
    private final BloomFilter<String> bloomFilter =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    /**
     * Jittered expiry to avoid synchronized mass expiration (cache avalanche)
     */
    public Duration getRandomExpireTime(Duration baseExpireTime) {
        // Jitter the base TTL by up to ±10%: (nextDouble() - 0.5) is in [-0.5, 0.5)
        long baseMillis = baseExpireTime.toMillis();
        long randomOffset = (long) (baseMillis * 0.2 * (random.nextDouble() - 0.5));
        long finalMillis = baseMillis + randomOffset;

        return Duration.ofMillis(Math.max(finalMillis, 1000)); // at least 1 second
    }

    /**
     * Lock-guarded load with null caching (penetration and breakdown protection)
     */
    public Object getWithPenetrationProtection(String key, Supplier<Object> dataLoader) {
        try {
            // Try the cache first
            Object cached = redisTemplate.opsForValue().get(key);
            if (cached != null) {
                return cached;
            }

            // Distributed lock so only one caller rebuilds a hot key (breakdown protection)
            String lockKey = "lock:" + key;
            Boolean lockAcquired = redisTemplate.opsForValue()
                    .setIfAbsent(lockKey, "locked", Duration.ofSeconds(10));

            if (Boolean.TRUE.equals(lockAcquired)) {
                try {
                    // Double-check after acquiring the lock
                    cached = redisTemplate.opsForValue().get(key);
                    if (cached != null) {
                        return cached;
                    }

                    // Load from the data source
                    Object data = dataLoader.get();

                    if (data != null) {
                        // Cache with jittered TTL
                        Duration expireTime = getRandomExpireTime(Duration.ofMinutes(30));
                        redisTemplate.opsForValue().set(key, data, expireTime);
                    } else {
                        // Cache the empty value briefly to stop penetration by missing keys
                        redisTemplate.opsForValue().set(key, "", Duration.ofMinutes(5));
                    }

                    return data;

                } finally {
                    // Release the lock (see the safer compare-and-delete variant below)
                    redisTemplate.delete(lockKey);
                }
            } else {
                // Wait for the lock holder to finish, then read the rebuilt value
                Thread.sleep(100);
                return redisTemplate.opsForValue().get(key);
            }

        } catch (Exception e) {
            log.error("Penetration protection failed, key: {}", key, e);
            return dataLoader.get();
        }
    }

    /**
     * Bloom-filter pre-check: reject keys that cannot exist before touching Redis
     */
    public Object getWithBreakdownProtection(String key, Supplier<Object> dataLoader) {
        try {
            // If the filter has never seen the key, it certainly does not exist
            if (!bloomFilter.mightContain(key)) {
                return null;
            }

            return getWithPenetrationProtection(key, dataLoader);

        } catch (Exception e) {
            log.error("Breakdown protection failed, key: {}", key, e);
            return dataLoader.get();
        }
    }
}
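
One caveat in the lock handling above: redisTemplate.delete(lockKey) in the finally block can delete a lock that has already expired and been re-acquired by another caller. A common remedy is to store a unique per-holder value and release with a compare-and-delete Lua script; a sketch under that assumption (acquisition would then use UUID.randomUUID().toString() instead of the fixed "locked" value):

private static final String UNLOCK_SCRIPT =
        "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "  return redis.call('DEL', KEYS[1]) " +
        "else " +
        "  return 0 " +
        "end";

private void unlockSafely(String lockKey, String lockValue) {
    DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
    // Deletes the lock only if this caller still owns it
    redisTemplate.execute(script, Collections.singletonList(lockKey), lockValue);
}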

5.3 Rate Limiting and Circuit Breaking

@Component
@Slf4j
public class CacheRateLimiter {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Redis-based sliding-window rate limiting
     */
    public boolean isAllowed(String key, int maxRequests, Duration windowSize) {
        try {
            long currentTime = System.currentTimeMillis();
            long windowStart = currentTime - windowSize.toMillis();

            // ZSET of request timestamps: trim the window, count, then add if under the limit.
            // ARGV values arrive as strings, so the limit must go through tonumber().
            String script =
                    "local key = KEYS[1] " +
                    "local window_start = ARGV[1] " +
                    "local current_time = ARGV[2] " +
                    "local max_requests = tonumber(ARGV[3]) " +
                    "redis.call('ZREMRANGEBYSCORE', key, 0, window_start) " +
                    "local current_count = redis.call('ZCARD', key) " +
                    "if current_count < max_requests then " +
                    "  redis.call('ZADD', key, current_time, current_time) " +
                    "  redis.call('EXPIRE', key, 60) " +
                    "  return 1 " +
                    "else " +
                    "  return 0 " +
                    "end";

            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            redisScript.setResultType(Long.class);

            Long result = redisTemplate.execute(redisScript,
                    Collections.singletonList(key),
                    String.valueOf(windowStart),
                    String.valueOf(currentTime),
                    String.valueOf(maxRequests));

            return result != null && result == 1;

        } catch (Exception e) {
            log.error("Rate-limit check failed, key: {}", key, e);
            return true; // fail open: allow the request when the check itself fails
        }
    }

    /**
     * Cache access rate limiting
     */
    public boolean isCacheAccessAllowed(String token) {
        String key = "cache_limit:" + token;
        return isAllowed(key, 100, Duration.ofMinutes(1)); // at most 100 requests per minute
    }
}
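
Wired into the read path, the limiter rejects abusive callers before they reach Redis or the database. A sketch of the guard (throwing on rejection is an assumption; returning a stale value or an HTTP 429 are equally valid policies):

public SysUserLoginInfo getLoginInfoWithRateLimit(String token) {
    if (!cacheRateLimiter.isCacheAccessAllowed(token)) {
        // Over the per-token budget: fail fast instead of hammering the backend
        throw new IllegalStateException("Too many cache lookups for this token");
    }
    return cacheService.getLoginInfo(token);
}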

6. Failover and Recovery

6.1 Failure Detection

@Component
@Slf4j
public class CacheHealthChecker {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private volatile boolean redisHealthy = true;
    private volatile boolean kafkaHealthy = true;

    /**
     * Check Redis health
     */
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void checkRedisHealth() {
        try {
            // Round-trip a probe key through the cluster
            String testKey = "health_check:" + System.currentTimeMillis();
            redisTemplate.opsForValue().set(testKey, "test", Duration.ofSeconds(10));
            String result = (String) redisTemplate.opsForValue().get(testKey);
            redisTemplate.delete(testKey);

            boolean healthy = "test".equals(result);
            if (healthy != redisHealthy) {
                redisHealthy = healthy;
                log.warn("Redis health changed: {}", healthy ? "healthy" : "unhealthy");
                notifyHealthChange("redis", healthy);
            }

        } catch (Exception e) {
            if (redisHealthy) {
                redisHealthy = false;
                log.error("Redis health check failed", e);
                notifyHealthChange("redis", false);
            }
        }
    }

    /**
     * Check Kafka health
     */
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void checkKafkaHealth() {
        try {
            String testMessage = "health_check:" + System.currentTimeMillis();
            // Block briefly on the send future; a fire-and-forget send would
            // report healthy even when the brokers are unreachable
            kafkaTemplate.send("health-check-topic", testMessage).get(5, TimeUnit.SECONDS);

            if (!kafkaHealthy) {
                kafkaHealthy = true;
                log.warn("Kafka health changed: healthy");
                notifyHealthChange("kafka", true);
            }

        } catch (Exception e) {
            if (kafkaHealthy) {
                kafkaHealthy = false;
                log.error("Kafka health check failed", e);
                notifyHealthChange("kafka", false);
            }
        }
    }

    /**
     * Broadcast a health-state change
     */
    private void notifyHealthChange(String component, boolean healthy) {
        try {
            HealthChangeMessage message = new HealthChangeMessage();
            message.setComponent(component);
            message.setHealthy(healthy);
            message.setTimestamp(System.currentTimeMillis());

            kafkaTemplate.send("health-change-topic", message);

        } catch (Exception e) {
            log.error("Failed to send health-change notification", e);
        }
    }

    /**
     * Current Redis health flag
     */
    public boolean isRedisHealthy() {
        return redisHealthy;
    }

    /**
     * Current Kafka health flag
     */
    public boolean isKafkaHealthy() {
        return kafkaHealthy;
    }
}
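
If the application runs Spring Boot Actuator, the same flags can be surfaced on /actuator/health so load balancers stop routing to a degraded node. A minimal sketch, assuming Actuator is on the classpath:

@Component
public class CacheHealthIndicator implements HealthIndicator {

    @Autowired
    private CacheHealthChecker healthChecker;

    @Override
    public Health health() {
        boolean redisUp = healthChecker.isRedisHealthy();
        boolean kafkaUp = healthChecker.isKafkaHealthy();

        // Report DOWN if either dependency is unhealthy, with per-component detail
        Health.Builder builder = (redisUp && kafkaUp) ? Health.up() : Health.down();
        return builder
                .withDetail("redis", redisUp ? "UP" : "DOWN")
                .withDetail("kafka", kafkaUp ? "UP" : "DOWN")
                .build();
    }
}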

6.2 Failover Strategy

@Service
@Slf4j
public class CacheFailoverService {

    @Autowired
    private CacheHealthChecker healthChecker;

    @Autowired
    private LocalCacheManager localCacheManager;

    @Autowired
    private UserService userService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Get user info (with failover)
     */
    public SysUserLoginInfo getLoginInfoWithFailover(String token) {
        // 1. Try the local cache
        SysUserLoginInfo userInfo = localCacheManager.getFromLocalCache(token);
        if (userInfo != null) {
            return userInfo;
        }

        // 2. If Redis is healthy, try Redis
        if (healthChecker.isRedisHealthy()) {
            try {
                userInfo = getFromRedisWithRetry(token);
                if (userInfo != null) {
                    localCacheManager.updateLocalCache(token, userInfo);
                    return userInfo;
                }
            } catch (Exception e) {
                log.warn("Redis read failed, falling back, token: {}", token, e);
            }
        }

        // 3. Failover: read straight from the database
        log.info("Failing over to the database for user info, token: {}", token);
        userInfo = userService.getUserByToken(token);

        if (userInfo != null) {
            // Refresh the local cache
            localCacheManager.updateLocalCache(token, userInfo);

            // If Redis has recovered, repopulate it asynchronously.
            // Capture into an effectively final local for the lambda.
            final SysUserLoginInfo loaded = userInfo;
            if (healthChecker.isRedisHealthy()) {
                CompletableFuture.runAsync(() -> {
                    try {
                        updateRedisCache(token, loaded);
                    } catch (Exception e) {
                        log.warn("Async Redis repopulation failed", e);
                    }
                });
            }
        }

        return userInfo;
    }

    /**
     * Read from Redis with retries
     */
    private SysUserLoginInfo getFromRedisWithRetry(String token) {
        int maxRetries = 3;
        int retryDelay = 100; // milliseconds

        for (int i = 0; i < maxRetries; i++) {
            try {
                return getFromRedis(token);
            } catch (Exception e) {
                if (i == maxRetries - 1) {
                    throw e;
                }

                try {
                    // Linear backoff between attempts
                    Thread.sleep(retryDelay * (i + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }

        return null;
    }

    /**
     * Repopulate Redis (same keys and TTLs as DistributedCacheService)
     */
    private void updateRedisCache(String token, SysUserLoginInfo userInfo) {
        redisTemplate.opsForValue().set("token:" + token, userInfo, Duration.ofMinutes(30));
        redisTemplate.opsForValue().set("heartbeat:" + token, System.currentTimeMillis(), Duration.ofMinutes(10));
    }

    /**
     * Read one entry from Redis
     */
    private SysUserLoginInfo getFromRedis(String token) {
        Object cached = redisTemplate.opsForValue().get("token:" + token);
        return (cached instanceof SysUserLoginInfo) ? (SysUserLoginInfo) cached : null;
    }
}

7. Monitoring and Alerting

7.1 Cache Monitoring Metrics

@Component
@Slf4j
public class CacheMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter cacheHitCounter;
    private final Counter cacheMissCounter;
    private final Timer cacheAccessTimer;
    private final Gauge cacheSizeGauge;

    public CacheMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        // Hit/miss counters backing the hit-rate metric
        this.cacheHitCounter = Counter.builder("cache.hits")
                .description("Number of cache hits")
                .register(meterRegistry);

        this.cacheMissCounter = Counter.builder("cache.misses")
                .description("Number of cache misses")
                .register(meterRegistry);

        // Cache access latency
        this.cacheAccessTimer = Timer.builder("cache.access.time")
                .description("Cache access time")
                .register(meterRegistry);

        // Cache size (Micrometer's gauge builder takes the state object and extractor up front)
        this.cacheSizeGauge = Gauge.builder("cache.size", this, CacheMetrics::getCacheSize)
                .description("Cache size")
                .register(meterRegistry);
    }

    /**
     * Record a cache hit
     */
    public void recordCacheHit() {
        cacheHitCounter.increment();
    }

    /**
     * Record a cache miss
     */
    public void recordCacheMiss() {
        cacheMissCounter.increment();
    }

    /**
     * Record cache access latency
     */
    public void recordAccessTime(Duration duration) {
        cacheAccessTimer.record(duration);
    }

    /**
     * Current cache size
     */
    private double getCacheSize() {
        // Placeholder: wire this up to the real cache size (e.g. Caffeine estimatedSize)
        return 0.0;
    }

    /**
     * Current cache hit rate
     */
    public double getCacheHitRate() {
        double hits = cacheHitCounter.count();
        double misses = cacheMissCounter.count();
        double total = hits + misses;

        return total > 0 ? hits / total : 0.0;
    }
}
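
These counters only move if the read path reports into them. A sketch of an instrumented lookup (treating any null result as a miss is a simplification; a stricter version would distinguish cache-tier hits from database loads):

public SysUserLoginInfo getLoginInfoInstrumented(String token) {
    long start = System.nanoTime();
    try {
        SysUserLoginInfo info = cacheService.getLoginInfo(token);
        if (info != null) {
            cacheMetrics.recordCacheHit();
        } else {
            cacheMetrics.recordCacheMiss();
        }
        return info;
    } finally {
        // Always record latency, hit or miss
        cacheMetrics.recordAccessTime(Duration.ofNanos(System.nanoTime() - start));
    }
}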

7.2 Alert Configuration

@Component
@Slf4j
public class CacheAlertService {

    @Autowired
    private CacheMetrics cacheMetrics;

    @Autowired
    private CacheHealthChecker healthChecker;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Alert on a low cache hit rate
     */
    @Scheduled(fixedRate = 60000) // check every minute
    public void checkCacheHitRate() {
        double hitRate = cacheMetrics.getCacheHitRate();

        if (hitRate < 0.8) { // hit rate below 80%
            sendAlert("CACHE_HIT_RATE_LOW",
                    String.format("Cache hit rate is too low: %.2f%%", hitRate * 100));
        }
    }

    /**
     * Alert on Redis health
     */
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void checkRedisHealth() {
        if (!healthChecker.isRedisHealthy()) {
            sendAlert("REDIS_DOWN", "Redis service is unavailable");
        }
    }

    /**
     * Alert on Kafka health
     */
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void checkKafkaHealth() {
        if (!healthChecker.isKafkaHealthy()) {
            sendAlert("KAFKA_DOWN", "Kafka service is unavailable");
        }
    }

    /**
     * Publish an alert
     */
    private void sendAlert(String alertType, String message) {
        try {
            AlertMessage alert = new AlertMessage();
            alert.setAlertType(alertType);
            alert.setMessage(message);
            alert.setTimestamp(System.currentTimeMillis());
            alert.setSeverity("WARNING");

            // Note: when Kafka itself is down this publish will fail;
            // a secondary channel (webhook, SMS) is advisable for KAFKA_DOWN
            kafkaTemplate.send("alert-topic", alert);

            log.warn("Alert sent: {} - {}", alertType, message);

        } catch (Exception e) {
            log.error("Failed to send alert", e);
        }
    }
}

8. Performance Testing and Optimization

8.1 Performance Testing

@RestController
@RequestMapping("/test")
@Slf4j
public class CachePerformanceTestController {

    @Autowired
    private DistributedCacheService cacheService;

    @Autowired
    private CacheFailoverService failoverService;

    /**
     * Cache performance test
     */
    @GetMapping("/cache/performance")
    public ResponseEntity<Map<String, Object>> testCachePerformance(
            @RequestParam(defaultValue = "1000") int requestCount,
            @RequestParam(defaultValue = "10") int threadCount) {

        Map<String, Object> result = new HashMap<>();

        try {
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            CountDownLatch latch = new CountDownLatch(requestCount);

            long startTime = System.currentTimeMillis();
            AtomicInteger successCount = new AtomicInteger(0);
            AtomicInteger errorCount = new AtomicInteger(0);

            for (int i = 0; i < requestCount; i++) {
                final int index = i;
                executor.submit(() -> {
                    try {
                        String token = "test_token_" + index;
                        SysUserLoginInfo userInfo = cacheService.getLoginInfo(token);

                        if (userInfo != null) {
                            successCount.incrementAndGet();
                        }

                    } catch (Exception e) {
                        errorCount.incrementAndGet();
                        log.error("Cache test request failed", e);
                    } finally {
                        latch.countDown();
                    }
                });
            }

            latch.await();
            long endTime = System.currentTimeMillis();

            executor.shutdown();

            result.put("totalRequests", requestCount);
            result.put("successCount", successCount.get());
            result.put("errorCount", errorCount.get());
            result.put("totalTime", endTime - startTime);
            result.put("avgResponseTime", (endTime - startTime) / (double) requestCount);
            result.put("throughput", requestCount * 1000.0 / (endTime - startTime));

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            log.error("Performance test failed", e);
            result.put("error", e.getMessage());
            return ResponseEntity.status(500).body(result);
        }
    }

    /**
     * Failover test
     */
    @GetMapping("/cache/failover")
    public ResponseEntity<Map<String, Object>> testFailover() {
        Map<String, Object> result = new HashMap<>();

        try {
            String token = "failover_test_token";
            long startTime = System.currentTimeMillis();

            SysUserLoginInfo userInfo = failoverService.getLoginInfoWithFailover(token);

            long endTime = System.currentTimeMillis();

            result.put("success", userInfo != null);
            result.put("responseTime", endTime - startTime);
            result.put("userInfo", userInfo);

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            log.error("Failover test failed", e);
            result.put("error", e.getMessage());
            return ResponseEntity.status(500).body(result);
        }
    }
}

8.2 Performance Optimization Suggestions

@Component
@Slf4j
public class CacheOptimizationService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private LocalCacheManager localCacheManager;

    /**
     * Cache optimization suggestions
     */
    public Map<String, Object> getOptimizationSuggestions() {
        Map<String, Object> suggestions = new HashMap<>();

        try {
            // 1. Check the cache hit rate
            double hitRate = getCacheHitRate();
            if (hitRate < 0.8) {
                suggestions.put("hitRateSuggestion",
                        "Hit rate is low; consider more warmup and a better caching strategy");
            }

            // 2. Check the cache size
            long cacheSize = getCacheSize();
            if (cacheSize > 1000000) { // one million entries
                suggestions.put("sizeSuggestion",
                        "Cache is large; consider an eviction policy");
            }

            // 3. Check memory usage
            long memoryUsage = getMemoryUsage();
            if (memoryUsage > 1024 * 1024 * 1024) { // 1 GB
                suggestions.put("memorySuggestion",
                        "Memory usage is high; consider more compact cached data structures");
            }

            // 4. Check network latency
            long networkLatency = getNetworkLatency();
            if (networkLatency > 10) { // 10 ms
                suggestions.put("networkSuggestion",
                        "Network latency is high; review the network configuration");
            }

        } catch (Exception e) {
            log.error("Failed to compute optimization suggestions", e);
        }

        return suggestions;
    }

    private double getCacheHitRate() {
        // Placeholder: wire this up to the real hit-rate source (e.g. CacheMetrics)
        return 0.0;
    }

    private long getCacheSize() {
        // Placeholder: wire this up to the real cache-size source
        return 0;
    }

    private long getMemoryUsage() {
        // Placeholder: wire this up to the real memory-usage source
        return 0;
    }

    private long getNetworkLatency() {
        // Placeholder: wire this up to a real latency probe
        return 0;
    }
}

9. Summary

Through the design and implementation above, we have built a highly available, highly concurrent distributed caching system. Its main characteristics are:

9.1 Core Strengths

  1. High availability: multi-level caching plus failover mechanisms
  2. High performance: local cache + Redis cluster + asynchronous processing
  3. Data consistency: the Kafka message queue keeps replicas in sync
  4. Scalability: supports horizontal scaling and dynamic capacity growth
  5. Observability: comprehensive monitoring metrics and alerting

9.2 Key Techniques

  • Multi-level cache architecture: an L1 local cache backed by an L2 Redis cluster
  • Message queue integration: Kafka for cache synchronization and event notification
  • Failover: automatic failure detection and degradation strategies
  • Performance optimization: cache warmup, rate limiting, circuit breaking, and related mechanisms
  • Monitoring and alerting: a complete set of metrics and alerts

9.3 Best Practices

  1. Caching strategy: set expiration times and update policies deliberately
  2. Failure handling: build detection and recovery before you need them
  3. Performance monitoring: watch cache performance and health in real time
  4. Capacity planning: size the cache according to actual business load
  5. Protective measures: guard against cache penetration, breakdown, and avalanche

With this distributed caching solution in place, system performance and availability improve markedly, giving high-concurrency workloads a solid technical foundation.