Redis热点key治理深入实战

1. 概述

1.1 热点key治理的实战重要性

热点key治理在真实项目中是必须解决的核心问题,特别是在高并发、大流量场景下,热点key可能导致Redis单点压力过大,甚至导致系统崩溃。

真实项目中的挑战

  • 高并发场景:大量并发请求集中在单个key
  • 流量突增:突发流量导致热点key访问激增
  • 系统稳定性:热点key可能影响整个系统的稳定性
  • 资源消耗:热点key可能导致Redis资源耗尽

1.2 本文重点

本文将从实战角度深入解析Redis热点key治理:

  1. 热点key检测实战:实时检测、统计分析、告警机制
  2. 本地缓存深度优化:多级缓存、缓存更新策略、一致性保证
  3. 分片策略实战:动态分片、负载均衡、数据一致性
  4. 限流保护实战:多级限流、动态限流、降级策略
  5. 预热机制实战:启动预热、定时预热、智能预热
  6. 监控告警:实时监控、指标收集、自动告警
  7. 真实项目案例:从实际项目中总结的经验

2. 热点key检测实战

2.1 实时检测系统

2.1.1 基于Redis命令统计

方法:通过拦截Redis命令,统计每个key的访问频率。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Component
public class RedisCommandInterceptor implements CommandListener {

private final Map<String, AtomicLong> keyAccessCount = new ConcurrentHashMap<>();
private final Map<String, Long> keyLastAccessTime = new ConcurrentHashMap<>();

@Override
public void commandExecuted(CommandEvent event) {
// 解析命令,提取key
String key = extractKey(event.getCommand());
if (key != null) {
// 统计访问次数
keyAccessCount.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
keyLastAccessTime.put(key, System.currentTimeMillis());
}
}

/**
* 获取热点key(实时统计)
*/
public List<HotKey> getHotKeys(int threshold, long timeWindow) {
long currentTime = System.currentTimeMillis();
List<HotKey> hotKeys = new ArrayList<>();

for (Map.Entry<String, AtomicLong> entry : keyAccessCount.entrySet()) {
String key = entry.getKey();
Long lastAccessTime = keyLastAccessTime.get(key);

// 检查时间窗口
if (lastAccessTime != null && (currentTime - lastAccessTime) < timeWindow) {
long accessCount = entry.getValue().get();
double qps = accessCount * 1000.0 / timeWindow;

if (qps > threshold) {
HotKey hotKey = new HotKey();
hotKey.setKey(key);
hotKey.setAccessCount(accessCount);
hotKey.setQps(qps);
hotKey.setLastAccessTime(new Date(lastAccessTime));
hotKeys.add(hotKey);
}
}
}

// 按QPS排序
hotKeys.sort((a, b) -> Double.compare(b.getQps(), a.getQps()));

return hotKeys;
}

private String extractKey(Command command) {
// 解析Redis命令,提取key
// 简化处理
return null;
}
}

2.1.2 基于AOP统计

方法:通过AOP拦截Redis操作,统计key访问。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Aspect
@Component
public class RedisKeyStatisticsAspect {

private final Map<String, AtomicLong> keyAccessCount = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> keyAccessTime = new ConcurrentHashMap<>();

@Around("execution(* org.springframework.data.redis.core.RedisTemplate.*(..))")
public Object interceptRedisOperation(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();

try {
Object result = joinPoint.proceed();

// 提取key
String key = extractKey(joinPoint);
if (key != null) {
// 统计访问
recordKeyAccess(key, System.currentTimeMillis() - startTime);
}

return result;
} catch (Exception e) {
log.error("Redis operation failed", e);
throw e;
}
}

/**
* 记录key访问
*/
private void recordKeyAccess(String key, long responseTime) {
// 统计访问次数
keyAccessCount.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();

// 统计响应时间
keyAccessTime.computeIfAbsent(key, k -> new AtomicLong(0))
.addAndGet(responseTime);
}

/**
* 获取热点key统计
*/
public Map<String, KeyStatistics> getKeyStatistics() {
Map<String, KeyStatistics> statistics = new HashMap<>();

for (Map.Entry<String, AtomicLong> entry : keyAccessCount.entrySet()) {
String key = entry.getKey();
long accessCount = entry.getValue().get();
long totalTime = keyAccessTime.getOrDefault(key, new AtomicLong(0)).get();

KeyStatistics stats = new KeyStatistics();
stats.setKey(key);
stats.setAccessCount(accessCount);
stats.setAvgResponseTime(totalTime / (double) accessCount);
stats.setQps(calculateQps(key, accessCount));

statistics.put(key, stats);
}

return statistics;
}

private double calculateQps(String key, long accessCount) {
// 计算QPS(简化处理)
return accessCount / 60.0; // 假设统计周期为1分钟
}
}

2.2 热点key告警

2.2.1 实时告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
public class HotKeyAlert {

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private AlertService alertService;

/**
* 实时监控热点key
*/
@Scheduled(fixedDelay = 10000) // 每10秒检查一次
public void monitorHotKeys() {
// 获取热点key(QPS > 1000)
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

for (HotKey hotKey : hotKeys) {
// 发送告警
alertService.sendAlert(
"热点key检测",
String.format("Key: %s, QPS: %.2f, 访问次数: %d",
hotKey.getKey(),
hotKey.getQps(),
hotKey.getAccessCount())
);

// 自动治理
autoGovernance(hotKey);
}
}

/**
* 自动治理
*/
private void autoGovernance(HotKey hotKey) {
// 1. 自动加入本地缓存
hotKeyLocalCache.addHotKey(hotKey.getKey());

// 2. 自动分片(如果QPS > 5000)
if (hotKey.getQps() > 5000) {
hotKeySharding.enableSharding(hotKey.getKey());
}

// 3. 自动限流(如果QPS > 10000)
if (hotKey.getQps() > 10000) {
hotKeyRateLimiter.enableRateLimit(hotKey.getKey());
}
}
}

3. 本地缓存深度优化

3.1 多级缓存架构

3.1.1 三级缓存架构

架构:L1(本地缓存) → L2(Redis) → L3(数据库)

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Service
public class MultiLevelCacheService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private UserMapper userMapper;

// L1: 本地缓存(Caffeine)
private final Cache<String, User> l1Cache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterAccess(3, TimeUnit.MINUTES)
.recordStats()
.build();

/**
* 获取用户(三级缓存)
*/
public User getUser(Long userId) {
String cacheKey = "user:" + userId;

// 1. L1缓存(本地缓存)
User user = l1Cache.getIfPresent(cacheKey);
if (user != null) {
return user;
}

// 2. L2缓存(Redis)
String userJson = redisTemplate.opsForValue().get(cacheKey);
if (userJson != null) {
user = JSON.parseObject(userJson, User.class);
// 回填L1缓存
l1Cache.put(cacheKey, user);
return user;
}

// 3. L3缓存(数据库)
user = userMapper.selectById(userId);
if (user != null) {
// 写入L2缓存
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS);
// 写入L1缓存
l1Cache.put(cacheKey, user);
}

return user;
}
}

3.2 缓存更新策略

3.2.1 主动更新

策略:数据更新时,主动更新多级缓存。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
public class CacheUpdateService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private Cache<String, User> localCache;

/**
* 更新用户(主动更新缓存)
*/
@Transactional
public void updateUser(User user) {
String cacheKey = "user:" + user.getId();

// 1. 更新数据库
userMapper.updateById(user);

// 2. 更新L2缓存(Redis)
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS);

// 3. 更新L1缓存(本地缓存)
localCache.put(cacheKey, user);

// 4. 通知其他实例更新本地缓存(可选)
notifyOtherInstances(cacheKey, user);
}

/**
* 通知其他实例
*/
private void notifyOtherInstances(String cacheKey, User user) {
// 通过消息队列通知其他实例
CacheUpdateEvent event = new CacheUpdateEvent();
event.setKey(cacheKey);
event.setEventType("UPDATE");
event.setData(JSON.toJSONString(user));

kafkaTemplate.send("cache-update", JSON.toJSONString(event));
}
}

3.2.2 被动更新

策略:缓存失效时,被动更新缓存。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
public class CacheRefreshService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private Cache<String, User> localCache;

/**
* 刷新缓存(被动更新)
*/
public User refreshCache(Long userId) {
String cacheKey = "user:" + userId;

// 1. 从数据库加载
User user = userMapper.selectById(userId);
if (user != null) {
// 2. 更新L2缓存
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS);

// 3. 更新L1缓存
localCache.put(cacheKey, user);
}

return user;
}
}

4. 分片策略实战

4.1 动态分片

4.1.1 基于QPS动态分片

策略:根据key的QPS动态调整分片数量。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@Service
public class DynamicHotKeySharding {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

// key的分片配置
private final Map<String, ShardConfig> shardConfigs = new ConcurrentHashMap<>();

/**
* 获取数据(动态分片)
*/
public String get(String key) {
ShardConfig config = getShardConfig(key);

if (config.getShardCount() == 1) {
// 不分片,直接读取
return redisTemplate.opsForValue().get(key);
} else {
// 分片读取
int shardIndex = calculateShardIndex(key, config.getShardCount());
String shardKey = key + ":shard:" + shardIndex;
return redisTemplate.opsForValue().get(shardKey);
}
}

/**
* 设置数据(动态分片)
*/
public void set(String key, String value) {
ShardConfig config = getShardConfig(key);

if (config.getShardCount() == 1) {
// 不分片,直接写入
redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);
} else {
// 分片写入(写入所有分片)
for (int i = 0; i < config.getShardCount(); i++) {
String shardKey = key + ":shard:" + i;
redisTemplate.opsForValue().set(shardKey, value, 1, TimeUnit.HOURS);
}
}
}

/**
* 获取分片配置
*/
private ShardConfig getShardConfig(String key) {
return shardConfigs.computeIfAbsent(key, k -> {
// 检查是否是热点key
HotKey hotKey = hotKeyDetector.getHotKey(k);
if (hotKey != null) {
// 根据QPS计算分片数量
int shardCount = calculateShardCount(hotKey.getQps());
return new ShardConfig(shardCount);
} else {
// 非热点key,不分片
return new ShardConfig(1);
}
});
}

/**
* 计算分片数量
*/
private int calculateShardCount(double qps) {
if (qps > 10000) {
return 20; // 超热点key,20个分片
} else if (qps > 5000) {
return 10; // 热点key,10个分片
} else if (qps > 2000) {
return 5; // 中等热点key,5个分片
} else {
return 1; // 不分片
}
}

/**
* 计算分片索引
*/
private int calculateShardIndex(String key, int shardCount) {
// 使用一致性Hash
return Math.abs(key.hashCode()) % shardCount;
}
}

@Data
class ShardConfig {
private int shardCount;

public ShardConfig(int shardCount) {
this.shardCount = shardCount;
}
}

4.2 一致性Hash分片

4.2.1 实现一致性Hash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Service
public class ConsistentHashSharding {

@Autowired
private List<RedisTemplate<String, String>> redisTemplates;

private final ConsistentHash<String> consistentHash;

public ConsistentHashSharding() {
// 初始化一致性Hash
List<String> nodes = new ArrayList<>();
for (int i = 0; i < redisTemplates.size(); i++) {
nodes.add("node-" + i);
}
this.consistentHash = new ConsistentHash<>(nodes, 160); // 160个虚拟节点
}

/**
* 获取数据(一致性Hash分片)
*/
public String get(String key) {
// 选择节点
String node = consistentHash.get(key);
int nodeIndex = getNodeIndex(node);

// 从对应节点读取
return redisTemplates.get(nodeIndex).opsForValue().get(key);
}

/**
* 设置数据(一致性Hash分片)
*/
public void set(String key, String value) {
// 选择节点
String node = consistentHash.get(key);
int nodeIndex = getNodeIndex(node);

// 写入对应节点
redisTemplates.get(nodeIndex).opsForValue().set(key, value, 1, TimeUnit.HOURS);
}

private int getNodeIndex(String node) {
return Integer.parseInt(node.split("-")[1]);
}
}

5. 限流保护实战

5.1 多级限流

5.1.1 应用级限流 + Redis级限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Service
public class MultiLevelRateLimiter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

// 应用级限流(Guava RateLimiter)
private final Map<String, RateLimiter> appRateLimiters = new ConcurrentHashMap<>();

/**
* 获取数据(多级限流)
*/
public String get(String key) {
// 1. 应用级限流
RateLimiter appLimiter = appRateLimiters.computeIfAbsent(key, k ->
RateLimiter.create(1000) // 每秒1000次
);

if (!appLimiter.tryAcquire()) {
throw new BusinessException("应用级限流:访问过于频繁");
}

// 2. Redis级限流
if (!redisRateLimit(key)) {
throw new BusinessException("Redis级限流:访问过于频繁");
}

// 3. 访问Redis
return redisTemplate.opsForValue().get(key);
}

/**
* Redis级限流
*/
private boolean redisRateLimit(String key) {
String limitKey = "rate_limit:" + key;
String countStr = redisTemplate.opsForValue().get(limitKey);
int count = countStr == null ? 0 : Integer.parseInt(countStr);

// 限流阈值:每秒1000次
if (count >= 1000) {
return false;
}

// 增加计数
redisTemplate.opsForValue().increment(limitKey);
redisTemplate.expire(limitKey, 1, TimeUnit.SECONDS);

return true;
}
}

5.2 动态限流

5.2.1 基于系统负载动态调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Service
public class DynamicRateLimiter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private SystemMetrics systemMetrics;

/**
* 获取限流阈值(动态调整)
*/
private int getRateLimitThreshold(String key) {
// 基础阈值
int baseThreshold = 1000;

// 获取系统负载
double cpuUsage = systemMetrics.getCpuUsage();
double memoryUsage = systemMetrics.getMemoryUsage();

// 根据系统负载调整阈值
if (cpuUsage > 0.8 || memoryUsage > 0.8) {
// 系统负载高,降低阈值
return (int) (baseThreshold * 0.5);
} else if (cpuUsage > 0.6 || memoryUsage > 0.6) {
// 系统负载中等,适度降低阈值
return (int) (baseThreshold * 0.7);
} else {
// 系统负载正常,使用基础阈值
return baseThreshold;
}
}

/**
* 限流检查
*/
public boolean tryAcquire(String key) {
int threshold = getRateLimitThreshold(key);
return redisRateLimit(key, threshold);
}

private boolean redisRateLimit(String key, int threshold) {
String limitKey = "rate_limit:" + key;
String countStr = redisTemplate.opsForValue().get(limitKey);
int count = countStr == null ? 0 : Integer.parseInt(countStr);

if (count >= threshold) {
return false;
}

redisTemplate.opsForValue().increment(limitKey);
redisTemplate.expire(limitKey, 1, TimeUnit.SECONDS);
return true;
}
}

6. 预热机制实战

6.1 智能预热

6.1.1 基于历史数据预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Component
public class IntelligentHotKeyPreloader {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private Cache<String, String> localCache;

/**
* 智能预热(基于历史数据)
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void intelligentPreload() {
// 1. 获取历史热点key(过去7天的数据)
List<HotKey> historicalHotKeys = getHistoricalHotKeys(7);

// 2. 预测今日热点key
List<String> predictedHotKeys = predictHotKeys(historicalHotKeys);

// 3. 预热预测的热点key
for (String key : predictedHotKeys) {
try {
String value = redisTemplate.opsForValue().get(key);
if (value != null) {
localCache.put(key, value);
}
} catch (Exception e) {
log.error("Preload key failed: {}", key, e);
}
}

log.info("Intelligent preload completed: {} keys", predictedHotKeys.size());
}

/**
* 获取历史热点key
*/
private List<HotKey> getHistoricalHotKeys(int days) {
// 从历史数据中获取热点key
// 简化处理
return new ArrayList<>();
}

/**
* 预测热点key
*/
private List<String> predictHotKeys(List<HotKey> historicalHotKeys) {
// 基于历史数据预测今日热点key
// 简化处理:选择过去7天平均QPS最高的key
return historicalHotKeys.stream()
.filter(hk -> hk.getAvgQps() > 1000)
.map(HotKey::getKey)
.collect(Collectors.toList());
}
}

6.2 渐进式预热

6.2.1 分批预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Component
public class GradualHotKeyPreloader {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private Cache<String, String> localCache;

/**
* 渐进式预热(分批预热,避免系统压力)
*/
@Scheduled(fixedDelay = 60000) // 每分钟执行一次
public void gradualPreload() {
// 1. 获取热点key列表
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

// 2. 分批预热(每批10个)
int batchSize = 10;
for (int i = 0; i < hotKeys.size(); i += batchSize) {
int end = Math.min(i + batchSize, hotKeys.size());
List<HotKey> batch = hotKeys.subList(i, end);

// 预热当前批次
preloadBatch(batch);

// 等待一段时间,避免系统压力
try {
Thread.sleep(1000); // 等待1秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

/**
* 预热批次
*/
private void preloadBatch(List<HotKey> batch) {
for (HotKey hotKey : batch) {
try {
String value = redisTemplate.opsForValue().get(hotKey.getKey());
if (value != null) {
localCache.put(hotKey.getKey(), value);
}
} catch (Exception e) {
log.error("Preload key failed: {}", hotKey.getKey(), e);
}
}
}
}

7. 监控告警实战

7.1 实时监控

7.1.1 监控指标收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
public class HotKeyMonitor {

@Autowired
private MeterRegistry meterRegistry;

@Autowired
private HotKeyDetector hotKeyDetector;

/**
* 监控热点key
*/
@Scheduled(fixedDelay = 5000) // 每5秒执行一次
public void monitorHotKeys() {
// 1. 获取热点key列表
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

// 2. 记录监控指标
for (HotKey hotKey : hotKeys) {
// QPS指标
meterRegistry.gauge("hotkey.qps",
Tags.of("key", hotKey.getKey()),
hotKey.getQps());

// 访问次数指标
meterRegistry.gauge("hotkey.access_count",
Tags.of("key", hotKey.getKey()),
hotKey.getAccessCount());

// 响应时间指标
meterRegistry.gauge("hotkey.response_time",
Tags.of("key", hotKey.getKey()),
hotKey.getAvgResponseTime());
}

// 3. 热点key数量
meterRegistry.gauge("hotkey.count", hotKeys.size());
}
}

7.2 自动告警

7.2.1 多级告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Component
public class HotKeyAlert {

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private AlertService alertService;

/**
* 多级告警
*/
@Scheduled(fixedDelay = 10000) // 每10秒检查一次
public void multiLevelAlert() {
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

for (HotKey hotKey : hotKeys) {
double qps = hotKey.getQps();

if (qps > 10000) {
// 严重告警:QPS > 10000
alertService.sendCriticalAlert(
"热点key严重告警",
String.format("Key: %s, QPS: %.2f, 需要立即处理", hotKey.getKey(), qps)
);

// 自动启用所有治理措施
enableAllGovernance(hotKey);

} else if (qps > 5000) {
// 警告:QPS > 5000
alertService.sendWarningAlert(
"热点key警告",
String.format("Key: %s, QPS: %.2f, 建议处理", hotKey.getKey(), qps)
);

// 自动启用部分治理措施
enablePartialGovernance(hotKey);

} else if (qps > 2000) {
// 提示:QPS > 2000
alertService.sendInfoAlert(
"热点key提示",
String.format("Key: %s, QPS: %.2f, 持续监控", hotKey.getKey(), qps)
);
}
}
}

/**
* 启用所有治理措施
*/
private void enableAllGovernance(HotKey hotKey) {
// 1. 启用本地缓存
hotKeyLocalCache.addHotKey(hotKey.getKey());

// 2. 启用分片(20个分片)
hotKeySharding.enableSharding(hotKey.getKey(), 20);

// 3. 启用限流(每秒5000次)
hotKeyRateLimiter.enableRateLimit(hotKey.getKey(), 5000);
}

/**
* 启用部分治理措施
*/
private void enablePartialGovernance(HotKey hotKey) {
// 1. 启用本地缓存
hotKeyLocalCache.addHotKey(hotKey.getKey());

// 2. 启用分片(10个分片)
hotKeySharding.enableSharding(hotKey.getKey(), 10);
}
}

8. 真实项目案例

8.1 案例:高并发商品详情页

8.1.1 场景

场景:电商平台商品详情页,某些热门商品QPS达到5万+。

挑战

  • 单个商品key访问量巨大
  • Redis单点压力过大
  • 需要保证响应速度

8.1.2 解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Service
public class ProductDetailService {

@Autowired
private ProductMapper productMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

// L1缓存:本地缓存(热点商品)
private final Cache<String, ProductDetail> l1Cache = Caffeine.newBuilder()
.maximumSize(5000)
.expireAfterWrite(3, TimeUnit.MINUTES)
.build();

// 热点商品分片配置
private final Map<Long, Integer> productShardConfig = new ConcurrentHashMap<>();

/**
* 获取商品详情(综合治理)
*/
public ProductDetail getProductDetail(Long productId) {
String cacheKey = "product:detail:" + productId;

// 1. L1缓存(本地缓存)
ProductDetail product = l1Cache.getIfPresent(cacheKey);
if (product != null) {
return product;
}

// 2. 检查是否是热点商品
boolean isHotProduct = isHotProduct(productId);

if (isHotProduct) {
// 热点商品:使用分片
product = getFromShardedCache(productId);
} else {
// 非热点商品:直接读取
product = getFromRedis(cacheKey);
}

if (product == null) {
// 3. 从数据库加载
product = loadFromDatabase(productId);

if (product != null) {
// 写入缓存
if (isHotProduct) {
setToShardedCache(productId, product);
} else {
setToRedis(cacheKey, product);
}
}
}

// 4. 回填L1缓存(如果是热点商品)
if (isHotProduct && product != null) {
l1Cache.put(cacheKey, product);
}

return product;
}

/**
* 从分片缓存获取
*/
private ProductDetail getFromShardedCache(Long productId) {
int shardCount = productShardConfig.getOrDefault(productId, 10);
int shardIndex = (int) (productId % shardCount);

String shardKey = "product:detail:" + productId + ":shard:" + shardIndex;
String productJson = redisTemplate.opsForValue().get(shardKey);

if (productJson != null) {
return JSON.parseObject(productJson, ProductDetail.class);
}

return null;
}

/**
* 写入分片缓存
*/
private void setToShardedCache(Long productId, ProductDetail product) {
int shardCount = productShardConfig.getOrDefault(productId, 10);

// 写入所有分片
for (int i = 0; i < shardCount; i++) {
String shardKey = "product:detail:" + productId + ":shard:" + i;
redisTemplate.opsForValue().set(
shardKey,
JSON.toJSONString(product),
1,
TimeUnit.HOURS
);
}
}

/**
* 检查是否是热点商品
*/
private boolean isHotProduct(Long productId) {
String cacheKey = "product:detail:" + productId;
return hotKeyDetector.isHotKey(cacheKey);
}
}

9. 总结

9.1 核心要点

  1. 实时检测:通过AOP、命令拦截等方式实时检测热点key
  2. 多级缓存:L1本地缓存 + L2 Redis + L3数据库
  3. 动态分片:根据QPS动态调整分片数量
  4. 多级限流:应用级限流 + Redis级限流
  5. 智能预热:基于历史数据预测和预热
  6. 监控告警:实时监控、多级告警、自动治理

9.2 关键理解

  1. 检测是基础:首先要能实时检测到热点key
  2. 本地缓存最有效:本地缓存是最有效的治理方案
  3. 动态调整:根据实际情况动态调整治理策略
  4. 自动化治理:自动检测、自动告警、自动治理

9.3 最佳实践

  1. 实时检测:实时检测热点key,及时发现问题
  2. 多级缓存:使用多级缓存架构,提高性能
  3. 动态分片:根据QPS动态调整分片数量
  4. 多级限流:应用级和Redis级双重限流保护
  5. 智能预热:基于历史数据智能预热
  6. 监控告警:实时监控,多级告警,自动治理

相关文章