Redis 热点 key 怎么治理?

1. 概述

1.1 热点key的重要性

热点key是Redis性能优化的核心问题之一,热点key可能导致Redis单点压力过大,影响系统性能和稳定性。

热点key的影响

  • 单点压力:大量请求集中在单个key,导致Redis单点压力过大
  • 性能下降:热点key可能导致Redis性能下降
  • 系统不稳定:热点key可能导致系统不稳定
  • 资源浪费:热点key可能导致资源浪费

1.2 热点key的定义

热点key:访问频率远高于其他key的key。

热点key的特征

  • 访问频率高:QPS远高于其他key
  • 访问集中:大量请求集中在单个key
  • 影响范围大:影响整个系统的性能

1.3 本文内容结构

本文将从以下几个方面全面解析Redis热点key治理:

  1. 热点key检测:如何检测热点key
  2. 本地缓存方案:使用本地缓存减少Redis压力
  3. 分片方案:将热点key分片到多个key
  4. 限流方案:对热点key进行限流
  5. 预热方案:预热热点key到本地缓存
  6. 其他方案:其他治理方案
  7. 实战案例:实际项目中的热点key治理

2. 热点key检测

2.1 检测方法

2.1.1 基于监控统计

方法:通过监控系统统计每个key的访问频率。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class HotKeyDetector {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private MeterRegistry meterRegistry;

/**
* 检测热点key
*/
public List<HotKey> detectHotKeys(int threshold) {
// 从监控系统获取key访问统计
Map<String, Long> keyAccessCount = getKeyAccessCount();

List<HotKey> hotKeys = new ArrayList<>();
for (Map.Entry<String, Long> entry : keyAccessCount.entrySet()) {
if (entry.getValue() > threshold) {
HotKey hotKey = new HotKey();
hotKey.setKey(entry.getKey());
hotKey.setAccessCount(entry.getValue());
hotKey.setQps(entry.getValue() / 60.0); // 假设统计周期为1分钟
hotKeys.add(hotKey);
}
}

return hotKeys;
}

/**
* 记录key访问
*/
public void recordKeyAccess(String key) {
// 记录到监控系统
meterRegistry.counter("redis.key.access", "key", key).increment();
}
}

2.1.2 基于Redis监控

方法:通过Redis的MONITOR命令或INFO命令监控key访问。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
public class RedisHotKeyMonitor {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private final Map<String, AtomicLong> keyAccessCount = new ConcurrentHashMap<>();

/**
* 监控Redis命令
*/
@PostConstruct
public void startMonitoring() {
new Thread(() -> {
Jedis jedis = new Jedis("localhost", 6379);
jedis.monitor(new JedisMonitor() {
@Override
public void onCommand(String command) {
// 解析命令,提取key
String key = extractKey(command);
if (key != null) {
keyAccessCount.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
}
}
});
}).start();
}

/**
* 获取热点key
*/
public List<HotKey> getHotKeys(int threshold) {
List<HotKey> hotKeys = new ArrayList<>();
for (Map.Entry<String, AtomicLong> entry : keyAccessCount.entrySet()) {
if (entry.getValue().get() > threshold) {
HotKey hotKey = new HotKey();
hotKey.setKey(entry.getKey());
hotKey.setAccessCount(entry.getValue().get());
hotKeys.add(hotKey);
}
}
return hotKeys;
}

private String extractKey(String command) {
// 解析Redis命令,提取key
// 简化处理,实际需要完整解析
return null;
}
}

2.2 实时检测

2.2.1 基于AOP拦截

方法:通过AOP拦截Redis操作,统计key访问频率。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Aspect
@Component
public class RedisKeyAccessAspect {

@Autowired
private HotKeyDetector hotKeyDetector;

private final Map<String, AtomicLong> keyAccessCount = new ConcurrentHashMap<>();

@Around("execution(* org.springframework.data.redis.core.RedisTemplate.*(..))")
public Object interceptRedisOperation(ProceedingJoinPoint joinPoint) throws Throwable {
// 提取key
String key = extractKey(joinPoint);

if (key != null) {
// 记录访问
keyAccessCount.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
hotKeyDetector.recordKeyAccess(key);
}

return joinPoint.proceed();
}

private String extractKey(ProceedingJoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
if (args.length > 0 && args[0] instanceof String) {
return (String) args[0];
}
return null;
}
}

3. 本地缓存方案

3.1 原理

3.1.1 基本思路

本地缓存方案:将热点key的数据缓存到应用本地,减少对Redis的访问。

优势

  • 减少Redis压力:减少对Redis的访问
  • 提高响应速度:本地缓存访问速度更快
  • 降低网络开销:减少网络请求

缺点

  • 内存占用:本地缓存占用应用内存
  • 数据一致性:需要处理数据一致性问题
  • 多实例问题:多实例环境下,每个实例都需要缓存

3.2 实现代码

3.2.1 Caffeine本地缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

// 本地缓存(Caffeine)
private final Cache<String, User> localCache = Caffeine.newBuilder()
.maximumSize(10000) // 最大缓存数量
.expireAfterWrite(5, TimeUnit.MINUTES) // 写入后5分钟过期
.expireAfterAccess(3, TimeUnit.MINUTES) // 访问后3分钟过期
.recordStats() // 启用统计
.build();

/**
* 获取用户(多级缓存:本地缓存 + Redis)
*/
public User getUser(Long userId) {
String cacheKey = "user:" + userId;

// 1. 从本地缓存获取
User user = localCache.getIfPresent(cacheKey);
if (user != null) {
return user;
}

// 2. 从Redis获取
String userJson = redisTemplate.opsForValue().get(cacheKey);
if (userJson != null) {
user = JSON.parseObject(userJson, User.class);
// 回填本地缓存
localCache.put(cacheKey, user);
return user;
}

// 3. 从数据库获取
user = userMapper.selectById(userId);
if (user != null) {
// 写入Redis
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS);
// 写入本地缓存
localCache.put(cacheKey, user);
}

return user;
}

/**
* 更新用户(更新多级缓存)
*/
public void updateUser(User user) {
String cacheKey = "user:" + user.getId();

// 1. 更新数据库
userMapper.updateById(user);

// 2. 删除Redis缓存
redisTemplate.delete(cacheKey);

// 3. 删除本地缓存
localCache.invalidate(cacheKey);

// 4. 可选:发送消息通知其他实例删除本地缓存
notifyCacheInvalidation(cacheKey);
}
}

3.2.2 热点key自动识别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Service
public class HotKeyLocalCache {

@Autowired
private RedisTemplate<String, String> redisTemplate;

// 热点key本地缓存
private final Cache<String, String> hotKeyCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();

// key访问统计
private final Map<String, AtomicLong> keyAccessCount = new ConcurrentHashMap<>();

/**
* 获取数据(自动识别热点key)
*/
public String get(String key) {
// 1. 统计访问
long accessCount = keyAccessCount.computeIfAbsent(key, k -> new AtomicLong(0))
.incrementAndGet();

// 2. 判断是否是热点key(访问次数 > 100)
if (accessCount > 100) {
// 从本地缓存获取
String value = hotKeyCache.getIfPresent(key);
if (value != null) {
return value;
}

// 从Redis获取
value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 写入本地缓存
hotKeyCache.put(key, value);
}
return value;
} else {
// 非热点key,直接从Redis获取
return redisTemplate.opsForValue().get(key);
}
}
}

4. 分片方案

4.1 原理

4.1.1 基本思路

分片方案:将热点key分片到多个key,分散访问压力。

优势

  • 分散压力:将单个key的压力分散到多个key
  • 提高并发:多个key可以并发访问
  • 易于扩展:可以动态增加分片数量

缺点

  • 实现复杂:需要实现分片逻辑
  • 数据一致性:需要保证分片数据的一致性
  • 查询复杂:查询时需要聚合多个分片的数据

4.2 实现代码

4.2.1 简单分片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Service
public class HotKeySharding {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final int SHARD_COUNT = 10; // 分片数量

/**
* 写入数据(分片)
*/
public void set(String key, String value) {
// 1. 计算分片key
List<String> shardKeys = getShardKeys(key);

// 2. 写入所有分片
for (String shardKey : shardKeys) {
redisTemplate.opsForValue().set(shardKey, value, 1, TimeUnit.HOURS);
}
}

/**
* 读取数据(分片)
*/
public String get(String key) {
// 1. 计算分片key
List<String> shardKeys = getShardKeys(key);

// 2. 随机选择一个分片读取(负载均衡)
String shardKey = shardKeys.get(new Random().nextInt(shardKeys.size()));
return redisTemplate.opsForValue().get(shardKey);
}

/**
* 获取分片key列表
*/
private List<String> getShardKeys(String key) {
List<String> shardKeys = new ArrayList<>();
for (int i = 0; i < SHARD_COUNT; i++) {
shardKeys.add(key + ":shard:" + i);
}
return shardKeys;
}
}

4.2.2 基于Hash分片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Service
public class HotKeyHashSharding {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final int SHARD_COUNT = 10;

/**
* 写入数据(Hash分片)
*/
public void set(String key, String value) {
// 1. 计算分片索引
int shardIndex = getShardIndex(key);
String shardKey = key + ":shard:" + shardIndex;

// 2. 写入分片
redisTemplate.opsForValue().set(shardKey, value, 1, TimeUnit.HOURS);
}

/**
* 读取数据(Hash分片)
*/
public String get(String key) {
// 1. 计算分片索引
int shardIndex = getShardIndex(key);
String shardKey = key + ":shard:" + shardIndex;

// 2. 从分片读取
return redisTemplate.opsForValue().get(shardKey);
}

/**
* 计算分片索引
*/
private int getShardIndex(String key) {
// 使用key的hash值计算分片索引
return Math.abs(key.hashCode()) % SHARD_COUNT;
}
}

4.2.3 动态分片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Service
public class DynamicHotKeySharding {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

// key的分片数量配置
private final Map<String, Integer> keyShardCount = new ConcurrentHashMap<>();

/**
* 获取数据(动态分片)
*/
public String get(String key) {
// 1. 获取分片数量(默认1,热点key增加分片)
int shardCount = getShardCount(key);

if (shardCount == 1) {
// 非热点key,直接读取
return redisTemplate.opsForValue().get(key);
} else {
// 热点key,从分片读取
int shardIndex = getShardIndex(key, shardCount);
String shardKey = key + ":shard:" + shardIndex;
return redisTemplate.opsForValue().get(shardKey);
}
}

/**
* 设置数据(动态分片)
*/
public void set(String key, String value) {
// 1. 获取分片数量
int shardCount = getShardCount(key);

if (shardCount == 1) {
// 非热点key,直接写入
redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);
} else {
// 热点key,写入所有分片
for (int i = 0; i < shardCount; i++) {
String shardKey = key + ":shard:" + i;
redisTemplate.opsForValue().set(shardKey, value, 1, TimeUnit.HOURS);
}
}
}

/**
* 获取分片数量
*/
private int getShardCount(String key) {
// 检查是否是热点key
if (hotKeyDetector.isHotKey(key)) {
// 热点key,增加分片数量
return keyShardCount.computeIfAbsent(key, k -> 10);
} else {
// 非热点key,不分片
return 1;
}
}

/**
* 计算分片索引
*/
private int getShardIndex(String key, int shardCount) {
return Math.abs(key.hashCode()) % shardCount;
}
}

5. 限流方案

5.1 原理

5.1.1 基本思路

限流方案:对热点key的访问进行限流,防止过度访问。

优势

  • 保护Redis:防止热点key过度访问Redis
  • 保证稳定性:保证系统稳定性
  • 资源保护:保护Redis资源

缺点

  • 可能影响业务:限流可能影响正常业务
  • 需要合理设置:需要合理设置限流阈值

5.2 实现代码

5.2.1 基于令牌桶限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Service
public class HotKeyRateLimiter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

// 热点key限流器
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();

/**
* 获取数据(限流保护)
*/
public String get(String key) {
// 1. 检查是否是热点key
if (isHotKey(key)) {
// 2. 获取限流器
RateLimiter rateLimiter = getRateLimiter(key);

// 3. 尝试获取令牌
if (!rateLimiter.tryAcquire()) {
// 限流,返回降级数据或抛出异常
throw new BusinessException("热点key访问过于频繁,请稍后再试");
}
}

// 4. 访问Redis
return redisTemplate.opsForValue().get(key);
}

/**
* 获取限流器
*/
private RateLimiter getRateLimiter(String key) {
return rateLimiters.computeIfAbsent(key, k -> {
// 热点key限流:每秒1000次
return RateLimiter.create(1000);
});
}

private boolean isHotKey(String key) {
// 检查是否是热点key
return hotKeyDetector.isHotKey(key);
}
}

5.2.2 基于Redis限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class RedisHotKeyRateLimiter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

/**
* 获取数据(Redis限流)
*/
public String get(String key) {
// 1. 检查是否是热点key
if (isHotKey(key)) {
// 2. Redis限流检查
String limitKey = "rate_limit:hot_key:" + key;
String countStr = redisTemplate.opsForValue().get(limitKey);
int count = countStr == null ? 0 : Integer.parseInt(countStr);

// 限流阈值:每秒1000次
if (count >= 1000) {
throw new BusinessException("热点key访问过于频繁,请稍后再试");
}

// 增加计数
redisTemplate.opsForValue().increment(limitKey);
redisTemplate.expire(limitKey, 1, TimeUnit.SECONDS);
}

// 3. 访问Redis
return redisTemplate.opsForValue().get(key);
}
}

6. 预热方案

6.1 原理

6.1.1 基本思路

预热方案:在系统启动或定时任务中,将热点key的数据预热到本地缓存。

优势

  • 减少冷启动:减少系统启动时的缓存未命中
  • 提高性能:提前加载热点数据,提高响应速度
  • 降低压力:减少对Redis的访问压力

6.2 实现代码

6.2.1 启动预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class HotKeyPreloader {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private Cache<String, String> localCache;

/**
* 系统启动时预热热点key
*/
@PostConstruct
public void preloadHotKeys() {
// 1. 获取热点key列表
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

// 2. 预热到本地缓存
for (HotKey hotKey : hotKeys) {
String value = redisTemplate.opsForValue().get(hotKey.getKey());
if (value != null) {
localCache.put(hotKey.getKey(), value);
}
}

log.info("Preloaded {} hot keys", hotKeys.size());
}
}

6.2.2 定时预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class HotKeyPreloader {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private Cache<String, String> localCache;

/**
* 定时预热热点key(每5分钟执行一次)
*/
@Scheduled(fixedDelay = 300000)
public void preloadHotKeys() {
// 1. 获取热点key列表
List<HotKey> hotKeys = hotKeyDetector.getHotKeys(1000);

// 2. 预热到本地缓存
for (HotKey hotKey : hotKeys) {
try {
String value = redisTemplate.opsForValue().get(hotKey.getKey());
if (value != null) {
localCache.put(hotKey.getKey(), value);
}
} catch (Exception e) {
log.error("Preload hot key failed: {}", hotKey.getKey(), e);
}
}

log.info("Preloaded {} hot keys", hotKeys.size());
}
}

7. 其他方案

7.1 互斥锁方案

7.1.1 原理

互斥锁方案:在缓存失效时,使用分布式锁确保只有一个线程进行缓存重建,防止缓存击穿。

优势

  • 防止缓存击穿:避免大量请求同时访问数据库
  • 保证数据一致性:确保缓存重建的一致性

7.1.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Service
public class HotKeyMutexLock {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private RedissonClient redissonClient;

@Autowired
private UserMapper userMapper;

/**
* 获取用户(互斥锁防止缓存击穿)
*/
public User getUser(Long userId) {
String cacheKey = "user:" + userId;

// 1. 从缓存获取
String userJson = redisTemplate.opsForValue().get(cacheKey);
if (userJson != null) {
return JSON.parseObject(userJson, User.class);
}

// 2. 缓存未命中,获取分布式锁
String lockKey = "lock:user:" + userId;
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试加锁,最多等待100ms,锁定10秒
if (lock.tryLock(100, 10, TimeUnit.MILLISECONDS)) {
try {
// 3. 双重检查(其他线程可能已经重建缓存)
userJson = redisTemplate.opsForValue().get(cacheKey);
if (userJson != null) {
return JSON.parseObject(userJson, User.class);
}

// 4. 从数据库加载
User user = userMapper.selectById(userId);
if (user != null) {
// 5. 写入缓存
redisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(user),
1,
TimeUnit.HOURS
);
}

return user;
} finally {
lock.unlock();
}
} else {
// 获取锁失败,等待一段时间后重试
Thread.sleep(50);
return getUser(userId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("获取锁被中断", e);
}
}
}

7.2 设置合理的过期时间

7.2.1 原理

设置合理的过期时间:为热点key设置适当的过期时间,防止缓存数据长期占用内存,同时避免缓存雪崩。

策略

  • 随机过期时间:避免大量key同时过期
  • 分层过期时间:不同类型的数据设置不同的过期时间
  • 动态调整:根据业务特点动态调整过期时间

7.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Service
public class HotKeyExpirationStrategy {

@Autowired
private RedisTemplate<String, String> redisTemplate;

/**
* 设置缓存(随机过期时间)
*/
public void set(String key, String value, int baseExpireSeconds) {
// 随机过期时间:基础时间 ± 20%
int randomOffset = (int) (baseExpireSeconds * 0.2 * (Math.random() - 0.5));
int expireSeconds = baseExpireSeconds + randomOffset;

redisTemplate.opsForValue().set(key, value, expireSeconds, TimeUnit.SECONDS);
}

/**
* 设置热点key缓存(分层过期时间)
*/
public void setHotKey(String key, String value, KeyType keyType) {
int expireSeconds;

switch (keyType) {
case USER_INFO:
expireSeconds = 3600; // 用户信息:1小时
break;
case PRODUCT_INFO:
expireSeconds = 1800; // 商品信息:30分钟
break;
case ORDER_INFO:
expireSeconds = 7200; // 订单信息:2小时
break;
default:
expireSeconds = 3600;
}

// 添加随机偏移,避免缓存雪崩
int randomOffset = (int) (expireSeconds * 0.1 * (Math.random() - 0.5));
expireSeconds += randomOffset;

redisTemplate.opsForValue().set(key, value, expireSeconds, TimeUnit.SECONDS);
}
}

enum KeyType {
USER_INFO,
PRODUCT_INFO,
ORDER_INFO
}

7.3 读写分离

7.3.1 原理

读写分离:热点key的读操作分散到多个Redis实例。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class HotKeyReadWriteSplit {

@Autowired
private RedisTemplate<String, String> masterRedis;

@Autowired
private List<RedisTemplate<String, String>> slaveRedisList;

/**
* 读取数据(读写分离)
*/
public String get(String key) {
if (isHotKey(key)) {
// 热点key,从从库读取(负载均衡)
RedisTemplate<String, String> slaveRedis =
slaveRedisList.get(new Random().nextInt(slaveRedisList.size()));
return slaveRedis.opsForValue().get(key);
} else {
// 非热点key,从主库读取
return masterRedis.opsForValue().get(key);
}
}

/**
* 写入数据(写主库)
*/
public void set(String key, String value) {
// 写入主库
masterRedis.opsForValue().set(key, value, 1, TimeUnit.HOURS);
}
}

7.4 异步更新

7.4.1 原理

异步更新:热点key的更新操作异步处理,减少对Redis的压力。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class HotKeyAsyncUpdate {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private ExecutorService asyncExecutor;

/**
* 更新数据(异步更新)
*/
public void set(String key, String value) {
if (isHotKey(key)) {
// 热点key,异步更新
asyncExecutor.submit(() -> {
redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);
});
} else {
// 非热点key,同步更新
redisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);
}
}
}

8. 综合方案

8.1 完整治理方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@Service
public class HotKeyGovernance {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private HotKeyDetector hotKeyDetector;

@Autowired
private Cache<String, String> localCache;

@Autowired
private RedissonClient redissonClient;

/**
* 获取数据(综合治理方案)
*/
public String get(String key) {
// 1. 检查是否是热点key
boolean isHotKey = hotKeyDetector.isHotKey(key);

if (isHotKey) {
// 热点key治理方案
return getHotKey(key);
} else {
// 非热点key,正常处理
return getNormalKey(key);
}
}

/**
* 热点key获取(综合方案)
*/
private String getHotKey(String key) {
// 1. 从本地缓存获取(减少Redis压力)
String value = localCache.getIfPresent(key);
if (value != null) {
return value;
}

// 2. 限流保护
if (!rateLimiter.tryAcquire(key)) {
// 限流,返回降级数据
return getFallbackValue(key);
}

// 3. 从Redis获取(可能使用分片)
value = getFromRedis(key);

if (value != null) {
// 4. 回填本地缓存
localCache.put(key, value);
} else {
// 5. 缓存未命中,使用互斥锁防止缓存击穿
value = getWithMutexLock(key);
}

return value;
}

/**
* 非热点key获取
*/
private String getNormalKey(String key) {
// 1. 从Redis获取
String value = redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}

// 2. 缓存未命中,使用互斥锁防止缓存击穿
return getWithMutexLock(key);
}

/**
* 使用互斥锁获取数据(防止缓存击穿)
*/
private String getWithMutexLock(String key) {
String lockKey = "lock:" + key;
RLock lock = redissonClient.getLock(lockKey);

try {
if (lock.tryLock(100, 10, TimeUnit.MILLISECONDS)) {
try {
// 双重检查
String value = redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}

// 从数据库加载(这里简化处理)
value = loadFromDatabase(key);

if (value != null) {
// 写入缓存(随机过期时间)
int expireSeconds = 3600 + (int)(Math.random() * 600); // 1小时 ± 10分钟
redisTemplate.opsForValue().set(key, value, expireSeconds, TimeUnit.SECONDS);
}

return value;
} finally {
lock.unlock();
}
} else {
// 获取锁失败,等待后重试
Thread.sleep(50);
return get(key);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("获取锁被中断", e);
}
}

/**
* 从Redis获取(可能使用分片)
*/
private String getFromRedis(String key) {
// 如果使用了分片,从分片读取
if (isSharded(key)) {
return getFromShardedRedis(key);
} else {
return redisTemplate.opsForValue().get(key);
}
}

private String loadFromDatabase(String key) {
// 从数据库加载数据
return null;
}

private String getFallbackValue(String key) {
// 返回降级数据
return null;
}

private boolean isSharded(String key) {
// 检查是否使用了分片
return false;
}

private String getFromShardedRedis(String key) {
// 从分片Redis获取
return null;
}
}

9. 实战案例

9.1 案例:电商商品详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
public class ProductService {

@Autowired
private ProductMapper productMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

// 本地缓存(热点商品)
private final Cache<String, Product> localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();

/**
* 获取商品详情(热点key治理)
*/
public Product getProduct(Long productId) {
String cacheKey = "product:" + productId;

// 1. 从本地缓存获取(热点商品)
Product product = localCache.getIfPresent(cacheKey);
if (product != null) {
return product;
}

// 2. 从Redis获取
String productJson = redisTemplate.opsForValue().get(cacheKey);
if (productJson != null) {
product = JSON.parseObject(productJson, Product.class);
// 检查是否是热点商品(访问次数 > 1000)
if (isHotProduct(productId)) {
// 回填本地缓存
localCache.put(cacheKey, product);
}
return product;
}

// 3. 从数据库获取
product = productMapper.selectById(productId);
if (product != null) {
// 写入Redis
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(product), 1, TimeUnit.HOURS);
// 如果是热点商品,写入本地缓存
if (isHotProduct(productId)) {
localCache.put(cacheKey, product);
}
}

return product;
}

private boolean isHotProduct(Long productId) {
// 检查是否是热点商品
return hotKeyDetector.isHotKey("product:" + productId);
}
}

10. 总结

10.1 核心要点

  1. 热点key检测:通过监控、AOP等方式检测热点key
  2. 本地缓存:使用本地缓存减少Redis压力(最有效方案)
  3. 分片方案:将热点key分片到多个key,分散压力
  4. 限流方案:对热点key进行限流保护,防止系统过载
  5. 预热方案:预热热点key到本地缓存,减少冷启动
  6. 互斥锁:使用分布式锁防止缓存击穿
  7. 合理过期时间:设置随机过期时间,避免缓存雪崩
  8. 综合方案:组合使用多种方案,综合治理

10.2 关键理解

  1. 检测是基础:首先要能检测到热点key
  2. 本地缓存优先:本地缓存是最有效的方案,优先使用
  3. 分片分散压力:分片可以分散单个key的压力
  4. 限流保护系统:限流可以保护系统不被热点key压垮
  5. 互斥锁防击穿:使用分布式锁防止缓存击穿
  6. 随机过期时间:避免大量key同时过期导致缓存雪崩

10.3 最佳实践

  1. 实时检测:实时检测热点key,及时发现问题
  2. 本地缓存:热点key优先使用本地缓存,减少Redis压力
  3. 分片处理:超热点key使用分片,分散访问压力
  4. 限流保护:对热点key进行限流,保护系统稳定性
  5. 互斥锁:缓存失效时使用互斥锁,防止缓存击穿
  6. 随机过期:设置随机过期时间,避免缓存雪崩
  7. 监控告警:监控热点key的访问情况,及时告警
  8. 预热机制:系统启动或定时预热热点key,减少冷启动

相关文章