1. Redis大Key问题概述

Redis大Key是指value体积特别大或元素数量特别多的Key,例如单个String类型Key的value超过10KB,或者Hash、List、Set、ZSet等复合数据结构中包含大量元素。大Key问题在Redis使用中非常常见,会严重影响Redis的性能和稳定性,是运维人员需要重点关注和解决的问题。需要注意的是,本文给出的阈值均为业界常用的经验值,具体标准应结合业务场景、硬件配置和访问模式适当调整。

1.1 大Key的定义标准

  1. String类型: value大小超过10KB
  2. Hash类型: field数量超过1000个或总大小超过10KB
  3. List类型: 元素数量超过1000个或总大小超过10KB
  4. Set类型: 元素数量超过1000个或总大小超过10KB
  5. ZSet类型: 元素数量超过1000个或总大小超过10KB

1.2 大Key产生的原因

  • 业务设计不当: 将大量数据存储在一个Key中
  • 数据聚合: 将多个小数据合并成一个大Key
  • 缓存策略: 缓存整个对象而不是部分字段
  • 历史数据: 历史数据积累导致Key变大
  • 批量操作: 批量插入数据时没有合理分片

1.3 大Key的影响

  • 内存占用: 单个Key占用大量内存,容易造成节点间数据倾斜
  • 网络阻塞: 大Key传输占用大量带宽,导致网络拥塞
  • 性能下降: 读写大Key的耗时显著上升
  • 阻塞风险: Redis命令执行是单线程的,对大Key的读取、序列化、删除等操作可能长时间阻塞其他请求
  • 内存碎片: 大Key删除后容易产生内存碎片

2. 大Key检测与识别

2.1 命令行检测工具

#!/bin/bash
# redis-bigkey-detector.sh

# 检测大Key脚本
detect_bigkeys() {
    local redis_host=$1
    local redis_port=$2
    local redis_password=$3
    local threshold=$4

    # 封装redis-cli调用,避免重复书写连接参数
    rcli() {
        redis-cli -h "$redis_host" -p "$redis_port" -a "$redis_password" "$@"
    }

    echo "开始检测Redis大Key..."

    # 使用SCAN渐进式遍历所有Key,避免KEYS命令阻塞服务
    rcli --scan --pattern "*" | while IFS= read -r key; do
        # 获取Key类型
        key_type=$(rcli type "$key")

        case $key_type in
        "string")
            # 检测String类型大Key(MEMORY USAGE需Redis 4.0+)
            key_size=$(rcli memory usage "$key")
            if [ -n "$key_size" ] && [ "$key_size" -gt "$threshold" ]; then
                echo "大Key发现: $key (String, 大小: $key_size bytes)"
            fi
            ;;
        "hash")
            # 检测Hash类型大Key
            field_count=$(rcli hlen "$key")
            if [ "$field_count" -gt 1000 ]; then
                echo "大Key发现: $key (Hash, field数量: $field_count)"
            fi
            ;;
        "list")
            # 检测List类型大Key
            list_length=$(rcli llen "$key")
            if [ "$list_length" -gt 1000 ]; then
                echo "大Key发现: $key (List, 长度: $list_length)"
            fi
            ;;
        "set")
            # 检测Set类型大Key
            set_size=$(rcli scard "$key")
            if [ "$set_size" -gt 1000 ]; then
                echo "大Key发现: $key (Set, 大小: $set_size)"
            fi
            ;;
        "zset")
            # 检测ZSet类型大Key
            zset_size=$(rcli zcard "$key")
            if [ "$zset_size" -gt 1000 ]; then
                echo "大Key发现: $key (ZSet, 大小: $zset_size)"
            fi
            ;;
        esac
    done
}

# 使用示例
detect_bigkeys "127.0.0.1" 6379 "password" 10240

2.2 Python检测脚本

#!/usr/bin/env python3
# redis-bigkey-detector.py

import redis
from typing import Dict, List, Optional


class RedisBigKeyDetector:
    def __init__(self, host: str, port: int, password: str = None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            password=password,
            decode_responses=True
        )
        self.big_keys = []

    def detect_big_keys(self, threshold: int = 10240) -> List[Dict]:
        """检测大Key"""
        print("开始检测Redis大Key...")

        # 使用SCAN渐进式遍历,避免KEYS *一次性取回全部Key阻塞Redis
        for i, key in enumerate(self.redis_client.scan_iter(match='*', count=1000)):
            if i > 0 and i % 1000 == 0:
                print(f"进度: 已扫描 {i} 个Key")

            key_info = self._analyze_key(key, threshold)
            if key_info:
                self.big_keys.append(key_info)

        return self.big_keys

    def _analyze_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析单个Key"""
        key_type = self.redis_client.type(key)

        if key_type == 'string':
            return self._analyze_string_key(key, threshold)
        elif key_type == 'hash':
            return self._analyze_hash_key(key, threshold)
        elif key_type == 'list':
            return self._analyze_list_key(key, threshold)
        elif key_type == 'set':
            return self._analyze_set_key(key, threshold)
        elif key_type == 'zset':
            return self._analyze_zset_key(key, threshold)

        return None

    def _analyze_string_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析String类型Key"""
        try:
            # 获取内存使用量(MEMORY USAGE需Redis 4.0+)
            memory_usage = self.redis_client.memory_usage(key)
            if memory_usage and memory_usage > threshold:
                return {
                    'key': key,
                    'type': 'string',
                    'size': memory_usage,
                    'description': f'String类型,大小: {memory_usage} bytes'
                }
        except Exception as e:
            print(f"分析String Key {key} 失败: {e}")

        return None

    def _analyze_hash_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析Hash类型Key"""
        try:
            field_count = self.redis_client.hlen(key)
            if field_count > 1000:
                # 通过HSCAN抽样估算内存使用量,避免HGETALL拉取全量数据
                sample_fields = self.redis_client.hscan(key, count=10)[1]
                if not sample_fields:
                    return None
                avg_field_size = sum(len(str(k)) + len(str(v)) for k, v in sample_fields.items()) / len(sample_fields)
                estimated_size = int(field_count * avg_field_size)

                return {
                    'key': key,
                    'type': 'hash',
                    'field_count': field_count,
                    'estimated_size': estimated_size,
                    'description': f'Hash类型,field数量: {field_count},估算大小: {estimated_size} bytes'
                }
        except Exception as e:
            print(f"分析Hash Key {key} 失败: {e}")

        return None

    def _analyze_list_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析List类型Key"""
        try:
            list_length = self.redis_client.llen(key)
            if list_length > 1000:
                # 抽样前10个元素估算内存使用量
                sample_elements = self.redis_client.lrange(key, 0, 9)
                if not sample_elements:
                    return None
                avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements)
                estimated_size = int(list_length * avg_element_size)

                return {
                    'key': key,
                    'type': 'list',
                    'length': list_length,
                    'estimated_size': estimated_size,
                    'description': f'List类型,长度: {list_length},估算大小: {estimated_size} bytes'
                }
        except Exception as e:
            print(f"分析List Key {key} 失败: {e}")

        return None

    def _analyze_set_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析Set类型Key"""
        try:
            set_size = self.redis_client.scard(key)
            if set_size > 1000:
                # 通过SSCAN抽样估算内存使用量
                sample_elements = list(self.redis_client.sscan(key, count=10)[1])
                if not sample_elements:
                    return None
                avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements)
                estimated_size = int(set_size * avg_element_size)

                return {
                    'key': key,
                    'type': 'set',
                    'size': set_size,
                    'estimated_size': estimated_size,
                    'description': f'Set类型,大小: {set_size},估算大小: {estimated_size} bytes'
                }
        except Exception as e:
            print(f"分析Set Key {key} 失败: {e}")

        return None

    def _analyze_zset_key(self, key: str, threshold: int) -> Optional[Dict]:
        """分析ZSet类型Key"""
        try:
            zset_size = self.redis_client.zcard(key)
            if zset_size > 1000:
                # 通过ZSCAN抽样估算内存使用量,score按8字节计
                sample_elements = self.redis_client.zscan(key, count=10)[1]
                if not sample_elements:
                    return None
                avg_element_size = sum(len(str(member)) + 8 for member, score in sample_elements) / len(sample_elements)
                estimated_size = int(zset_size * avg_element_size)

                return {
                    'key': key,
                    'type': 'zset',
                    'size': zset_size,
                    'estimated_size': estimated_size,
                    'description': f'ZSet类型,大小: {zset_size},估算大小: {estimated_size} bytes'
                }
        except Exception as e:
            print(f"分析ZSet Key {key} 失败: {e}")

        return None

    def generate_report(self) -> str:
        """生成检测报告"""
        if not self.big_keys:
            return "未发现大Key"

        report = f"发现 {len(self.big_keys)} 个大Key:\n\n"

        for i, key_info in enumerate(self.big_keys, 1):
            report += f"{i}. {key_info['description']}\n"

        return report


# 使用示例
if __name__ == "__main__":
    detector = RedisBigKeyDetector("127.0.0.1", 6379, "password")
    big_keys = detector.detect_big_keys(threshold=10240)
    report = detector.generate_report()
    print(report)
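
运行前需安装 redis-py 依赖: pip install redis。脚本通过 SCAN 渐进式遍历,对各类型Key仅抽样少量元素估算体积,因此结果是近似值,适合作为日常巡检参考。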

2.3 Java检测工具

// Redis大Key检测工具
@Component
public class RedisBigKeyDetector {

    private final RedisTemplate<String, Object> redisTemplate;

    public RedisBigKeyDetector(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public List<BigKeyInfo> detectBigKeys(int threshold) {
        List<BigKeyInfo> bigKeys = new ArrayList<>();

        // 使用SCAN渐进式遍历,避免KEYS *阻塞Redis
        Set<String> keys = scanAllKeys();

        for (String key : keys) {
            BigKeyInfo keyInfo = analyzeKey(key, threshold);
            if (keyInfo != null) {
                bigKeys.add(keyInfo);
            }
        }

        return bigKeys;
    }

    private Set<String> scanAllKeys() {
        return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
            Set<String> result = new HashSet<>();
            try (Cursor<byte[]> cursor = connection.scan(
                    ScanOptions.scanOptions().match("*").count(1000).build())) {
                while (cursor.hasNext()) {
                    result.add(new String(cursor.next(), StandardCharsets.UTF_8));
                }
            }
            return result;
        });
    }

    private BigKeyInfo analyzeKey(String key, int threshold) {
        DataType keyType = redisTemplate.type(key);

        switch (keyType) {
            case STRING:
                return analyzeStringKey(key, threshold);
            case HASH:
                return analyzeHashKey(key, threshold);
            case LIST:
                return analyzeListKey(key, threshold);
            case SET:
                return analyzeSetKey(key, threshold);
            case ZSET:
                return analyzeZSetKey(key, threshold);
            default:
                return null;
        }
    }

    private BigKeyInfo analyzeStringKey(String key, int threshold) {
        try {
            // MEMORY USAGE需Redis 4.0+
            Long memoryUsage = redisTemplate.execute((RedisCallback<Long>) connection ->
                    connection.memoryUsage(key.getBytes(StandardCharsets.UTF_8)));

            if (memoryUsage != null && memoryUsage > threshold) {
                return new BigKeyInfo(key, "string", memoryUsage,
                        "String类型,大小: " + memoryUsage + " bytes");
            }
        } catch (Exception e) {
            System.err.println("分析String Key " + key + " 失败: " + e.getMessage());
        }

        return null;
    }

    private BigKeyInfo analyzeHashKey(String key, int threshold) {
        try {
            Long fieldCount = redisTemplate.opsForHash().size(key);

            if (fieldCount != null && fieldCount > 1000) {
                // 通过HSCAN抽样前10个field估算内存使用量,避免HGETALL拉取全量数据
                long sampleSize = 0;
                int sampleCount = 0;
                try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash()
                        .scan(key, ScanOptions.scanOptions().count(10).build())) {
                    while (cursor.hasNext() && sampleCount < 10) {
                        Map.Entry<Object, Object> entry = cursor.next();
                        sampleSize += entry.getKey().toString().length()
                                + entry.getValue().toString().length();
                        sampleCount++;
                    }
                }
                long estimatedSize = sampleCount > 0 ? fieldCount * (sampleSize / sampleCount) : 0;

                return new BigKeyInfo(key, "hash", estimatedSize,
                        "Hash类型,field数量: " + fieldCount + ",估算大小: " + estimatedSize + " bytes");
            }
        } catch (Exception e) {
            System.err.println("分析Hash Key " + key + " 失败: " + e.getMessage());
        }

        return null;
    }

    private BigKeyInfo analyzeListKey(String key, int threshold) {
        try {
            Long listLength = redisTemplate.opsForList().size(key);

            if (listLength != null && listLength > 1000) {
                // 抽样前10个元素估算内存使用量
                List<Object> sampleElements = redisTemplate.opsForList().range(key, 0, 9);
                if (sampleElements == null || sampleElements.isEmpty()) {
                    return null;
                }
                long avgElementSize = sampleElements.stream()
                        .mapToLong(element -> element.toString().length())
                        .sum() / sampleElements.size();
                long estimatedSize = listLength * avgElementSize;

                return new BigKeyInfo(key, "list", estimatedSize,
                        "List类型,长度: " + listLength + ",估算大小: " + estimatedSize + " bytes");
            }
        } catch (Exception e) {
            System.err.println("分析List Key " + key + " 失败: " + e.getMessage());
        }

        return null;
    }

    private BigKeyInfo analyzeSetKey(String key, int threshold) {
        try {
            Long setSize = redisTemplate.opsForSet().size(key);

            if (setSize != null && setSize > 1000) {
                // 通过SSCAN抽样估算内存使用量,避免SMEMBERS拉取全量数据
                long sampleSize = 0;
                int sampleCount = 0;
                try (Cursor<Object> cursor = redisTemplate.opsForSet()
                        .scan(key, ScanOptions.scanOptions().count(10).build())) {
                    while (cursor.hasNext() && sampleCount < 10) {
                        sampleSize += cursor.next().toString().length();
                        sampleCount++;
                    }
                }
                long estimatedSize = sampleCount > 0 ? setSize * (sampleSize / sampleCount) : 0;

                return new BigKeyInfo(key, "set", estimatedSize,
                        "Set类型,大小: " + setSize + ",估算大小: " + estimatedSize + " bytes");
            }
        } catch (Exception e) {
            System.err.println("分析Set Key " + key + " 失败: " + e.getMessage());
        }

        return null;
    }

    private BigKeyInfo analyzeZSetKey(String key, int threshold) {
        try {
            Long zsetSize = redisTemplate.opsForZSet().size(key);

            if (zsetSize != null && zsetSize > 1000) {
                // 抽样前10个元素估算内存使用量,score按8字节计
                Set<ZSetOperations.TypedTuple<Object>> sampleElements =
                        redisTemplate.opsForZSet().rangeWithScores(key, 0, 9);
                if (sampleElements == null || sampleElements.isEmpty()) {
                    return null;
                }
                long avgElementSize = sampleElements.stream()
                        .mapToLong(tuple -> tuple.getValue().toString().length() + 8)
                        .sum() / sampleElements.size();
                long estimatedSize = zsetSize * avgElementSize;

                return new BigKeyInfo(key, "zset", estimatedSize,
                        "ZSet类型,大小: " + zsetSize + ",估算大小: " + estimatedSize + " bytes");
            }
        } catch (Exception e) {
            System.err.println("分析ZSet Key " + key + " 失败: " + e.getMessage());
        }

        return null;
    }

    // 大Key信息类
    public static class BigKeyInfo {
        private final String key;
        private final String type;
        private final Long size;
        private final String description;

        public BigKeyInfo(String key, String type, Long size, String description) {
            this.key = key;
            this.type = type;
            this.size = size;
            this.description = description;
        }

        public String getKey() { return key; }
        public String getType() { return type; }
        public Long getSize() { return size; }
        public String getDescription() { return description; }
    }
}

3. 大Key优化策略

3.1 数据分片策略

// 数据分片服务
@Service
public class DataShardingService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final int shardCount = 100; // 分片数量

    public DataShardingService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Hash类型分片: 按field的哈希值路由到固定分片,保证读写路径一致
    public void shardHashData(String originalKey, Map<String, Object> data) {
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            String shardKey = getShardKeyByField(originalKey, entry.getKey());
            redisTemplate.opsForHash().put(shardKey, entry.getKey(), entry.getValue());
        }
    }

    public Object getHashData(String originalKey, String field) {
        String shardKey = getShardKeyByField(originalKey, field);
        return redisTemplate.opsForHash().get(shardKey, field);
    }

    // List类型分片: 按顺序切成固定大小的片段
    public void shardListData(String originalKey, List<Object> data) {
        int batchSize = 100; // 每片100个元素

        for (int i = 0; i < data.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, data.size());
            List<Object> batch = data.subList(i, endIndex);

            String shardKey = getShardKey(originalKey, String.valueOf(i / batchSize));
            redisTemplate.opsForList().rightPushAll(shardKey, batch);
        }
    }

    public List<Object> getListData(String originalKey, int start, int end) {
        List<Object> result = new ArrayList<>();

        int batchSize = 100;
        int startBatch = start / batchSize;
        int endBatch = end / batchSize;

        for (int batch = startBatch; batch <= endBatch; batch++) {
            String shardKey = getShardKey(originalKey, String.valueOf(batch));
            List<Object> batchData = redisTemplate.opsForList().range(shardKey, 0, -1);
            if (batchData == null || batchData.isEmpty()) {
                continue;
            }

            if (batch == startBatch && batch == endBatch) {
                // 起止都在同一个分片
                int localStart = start % batchSize;
                int localEnd = end % batchSize;
                result.addAll(batchData.subList(localStart, localEnd + 1));
            } else if (batch == startBatch) {
                // 第一个分片
                int localStart = start % batchSize;
                result.addAll(batchData.subList(localStart, batchData.size()));
            } else if (batch == endBatch) {
                // 最后一个分片
                int localEnd = end % batchSize;
                result.addAll(batchData.subList(0, localEnd + 1));
            } else {
                // 中间分片
                result.addAll(batchData);
            }
        }

        return result;
    }

    // Set类型分片: 按写入顺序切片,用本地计数器代替每次SCARD查询
    public void shardSetData(String originalKey, Set<Object> data) {
        int batchSize = 100;
        int count = 0;

        for (Object element : data) {
            String shardKey = getShardKey(originalKey, String.valueOf(count / batchSize));
            redisTemplate.opsForSet().add(shardKey, element);
            count++;
        }
    }

    public Set<Object> getSetData(String originalKey) {
        Set<Object> result = new HashSet<>();

        for (int i = 0; i < shardCount; i++) {
            String shardKey = getShardKey(originalKey, String.valueOf(i));
            Set<Object> shardData = redisTemplate.opsForSet().members(shardKey);
            if (shardData != null) {
                result.addAll(shardData);
            }
        }

        return result;
    }

    // ZSet类型分片
    public void shardZSetData(String originalKey, Set<ZSetOperations.TypedTuple<Object>> data) {
        int batchSize = 100;
        int count = 0;

        for (ZSetOperations.TypedTuple<Object> tuple : data) {
            String shardKey = getShardKey(originalKey, String.valueOf(count / batchSize));
            redisTemplate.opsForZSet().add(shardKey, tuple.getValue(), tuple.getScore());
            count++;
        }
    }

    // 注意: start/end是每个分片内的局部排名,需要全局排名时应归并后重新排序
    public Set<ZSetOperations.TypedTuple<Object>> getZSetData(String originalKey, long start, long end) {
        Set<ZSetOperations.TypedTuple<Object>> result = new HashSet<>();

        for (int i = 0; i < shardCount; i++) {
            String shardKey = getShardKey(originalKey, String.valueOf(i));
            Set<ZSetOperations.TypedTuple<Object>> shardData =
                    redisTemplate.opsForZSet().rangeWithScores(shardKey, start, end);
            if (shardData != null) {
                result.addAll(shardData);
            }
        }

        return result;
    }

    private String getShardKey(String originalKey, String shardId) {
        return originalKey + ":shard:" + shardId;
    }

    // 按field哈希取模,保证同一field总是落在同一分片
    private String getShardKeyByField(String originalKey, String field) {
        int shardId = Math.abs(field.hashCode() % shardCount);
        return originalKey + ":shard:" + shardId;
    }
}
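
下面给出一个简要的调用示例(假设 shardingService 已注入,Key名与数据仅作演示):

// 使用示例: 将用户画像Hash按field哈希路由写入分片,读取时走同样的路由
Map<String, Object> profile = new HashMap<>();
profile.put("name", "Alice");
profile.put("age", 30);

shardingService.shardHashData("user:profile:10086", profile);

// 读取单个field时只访问其所在分片,避免加载整个大Hash
Object name = shardingService.getHashData("user:profile:10086", "name");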

3.2 数据压缩策略

// 数据压缩服务
@Service
public class DataCompressionService {

    private final RedisTemplate<String, Object> redisTemplate;

    public DataCompressionService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // 压缩存储: 先JDK序列化,再GZIP压缩
    public void compressAndStore(String key, Object data) {
        try {
            // 序列化(要求对象实现Serializable)
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(data);
            }

            byte[] serializedData = baos.toByteArray();

            // 压缩
            byte[] compressedData = compress(serializedData);

            // 存储
            redisTemplate.opsForValue().set(key, compressedData);

        } catch (Exception e) {
            throw new RuntimeException("压缩存储失败", e);
        }
    }

    // 解压读取
    public Object decompressAndRead(String key) {
        try {
            byte[] compressedData = (byte[]) redisTemplate.opsForValue().get(key);
            if (compressedData == null) {
                return null;
            }

            // 解压
            byte[] decompressedData = decompress(compressedData);

            // 反序列化
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(decompressedData))) {
                return ois.readObject();
            }

        } catch (Exception e) {
            throw new RuntimeException("解压读取失败", e);
        }
    }

    // GZIP压缩
    private byte[] compress(byte[] data) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) {
                gzos.write(data);
            }
            return baos.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException("压缩失败", e);
        }
    }

    // GZIP解压
    private byte[] decompress(byte[] compressedData) {
        try (GZIPInputStream gzis = new GZIPInputStream(
                new ByteArrayInputStream(compressedData))) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            byte[] buffer = new byte[1024];
            int len;
            while ((len = gzis.read(buffer)) != -1) {
                baos.write(buffer, 0, len);
            }

            return baos.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException("解压失败", e);
        }
    }
}
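
压缩本质上是用CPU时间换内存和带宽,适合文本、JSON等冗余度较高的数据;若压缩后仍超过阈值,应与分片策略配合使用。此外,上面的实现依赖JDK序列化,要求被存储对象实现 Serializable,实际项目中也可以换成 JSON 序列化(如 Jackson)后再做 GZIP 压缩,跨语言兼容性更好。一个简要的调用示例(假设 compressionService 已注入,Key名仅作演示):

// 使用示例: 存入并读回一个较大的可序列化对象
ArrayList<String> report = new ArrayList<>(Collections.nCopies(10000, "line of report text"));
compressionService.compressAndStore("report:2024:q1", report);

@SuppressWarnings("unchecked")
ArrayList<String> restored = (ArrayList<String>) compressionService.decompressAndRead("report:2024:q1");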

3.3 数据过期策略

// 数据过期策略服务
@Service
public class DataExpirationService {

    private final RedisTemplate<String, Object> redisTemplate;

    public DataExpirationService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // 设置过期时间
    public void setExpiration(String key, long seconds) {
        redisTemplate.expire(key, Duration.ofSeconds(seconds));
    }

    // 批量设置过期时间
    public void batchSetExpiration(Map<String, Long> keyExpirationMap) {
        for (Map.Entry<String, Long> entry : keyExpirationMap.entrySet()) {
            redisTemplate.expire(entry.getKey(), Duration.ofSeconds(entry.getValue()));
        }
    }

    // 根据访问频率设置过期时间: 访问越频繁,缓存保留越久
    public void setExpirationByAccessFrequency(String key, int accessCount) {
        long expirationSeconds;

        if (accessCount > 1000) {
            // 高频访问,设置较长过期时间
            expirationSeconds = 3600; // 1小时
        } else if (accessCount > 100) {
            // 中频访问,设置中等过期时间
            expirationSeconds = 1800; // 30分钟
        } else {
            // 低频访问,设置较短过期时间
            expirationSeconds = 600; // 10分钟
        }

        redisTemplate.expire(key, Duration.ofSeconds(expirationSeconds));
    }

    // 为未设置过期时间的大Key补设过期时间
    public void cleanupExpiredData() {
        // 演示用法: 生产环境应使用SCAN游标遍历,避免KEYS *阻塞Redis
        Set<String> keys = redisTemplate.keys("*");
        if (keys == null) {
            return;
        }

        for (String key : keys) {
            Long ttl = redisTemplate.getExpire(key);
            if (ttl != null && ttl == -1) {
                // TTL为-1表示未设置过期时间,按内存占用补设
                Long memoryUsage = redisTemplate.execute((RedisCallback<Long>) connection ->
                        connection.memoryUsage(key.getBytes(StandardCharsets.UTF_8)));

                if (memoryUsage != null && memoryUsage > 10240) { // 10KB
                    redisTemplate.expire(key, Duration.ofHours(1));
                }
            }
        }
    }
}

4. 大Key监控与告警

4.1 实时监控

// 大Key实时监控
@Component
public class BigKeyMonitor {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    // 监控线程与读取方并发访问,使用线程安全的列表
    private final List<BigKeyAlert> alerts = new CopyOnWriteArrayList<>();

    public BigKeyMonitor(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;

        // 启动监控,每60秒扫描一次
        scheduler.scheduleAtFixedRate(this::monitorBigKeys, 0, 60, TimeUnit.SECONDS);
    }

    private void monitorBigKeys() {
        try {
            // 演示用法: 生产环境应使用SCAN游标遍历并控制扫描频率,避免KEYS *阻塞Redis
            Set<String> keys = redisTemplate.keys("*");
            if (keys == null) {
                return;
            }

            for (String key : keys) {
                DataType keyType = redisTemplate.type(key);
                Long memoryUsage = redisTemplate.execute((RedisCallback<Long>) connection ->
                        connection.memoryUsage(key.getBytes(StandardCharsets.UTF_8)));

                if (memoryUsage != null && memoryUsage > 10240) { // 10KB阈值
                    BigKeyAlert alert = new BigKeyAlert(key, keyType, memoryUsage,
                            System.currentTimeMillis());
                    alerts.add(alert);

                    // 发送告警
                    sendAlert(alert);
                }
            }

        } catch (Exception e) {
            System.err.println("监控大Key失败: " + e.getMessage());
        }
    }

    private void sendAlert(BigKeyAlert alert) {
        System.out.println("大Key告警: " + alert.getKey() +
                " (" + alert.getType() +
                ", 大小: " + alert.getSize() + " bytes)");

        // 这里可以集成邮件、短信、钉钉等告警方式
    }

    public List<BigKeyAlert> getAlerts() {
        return new ArrayList<>(alerts);
    }

    // 大Key告警信息
    public static class BigKeyAlert {
        private final String key;
        private final DataType type;
        private final Long size;
        private final Long timestamp;

        public BigKeyAlert(String key, DataType type, Long size, Long timestamp) {
            this.key = key;
            this.type = type;
            this.size = size;
            this.timestamp = timestamp;
        }

        public String getKey() { return key; }
        public DataType getType() { return type; }
        public Long getSize() { return size; }
        public Long getTimestamp() { return timestamp; }
    }
}
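
上面的实现省略了线程池的收尾。实际部署时建议在Bean销毁时关闭调度线程,避免应用停机后监控线程继续运行,例如:

@PreDestroy
public void shutdown() {
    scheduler.shutdown();
}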

4.2 告警配置

# 大Key告警配置
bigkey:
  monitor:
    enabled: true
    interval: 60s
    threshold: 10240 # 10KB

  alert:
    email:
      enabled: true
      recipients:
        - admin@example.com
        - ops@example.com

    webhook:
      enabled: true
      url: "https://hooks.slack.com/services/xxx"

    sms:
      enabled: false
      phoneNumbers:
        - "+8613800138000"

5. 大Key处理最佳实践

5.1 预防措施

// 大Key预防服务
@Service
public class BigKeyPreventionService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final int maxStringSize = 10240; // 10KB
    private final int maxHashFields = 1000;
    private final int maxListLength = 1000;
    private final int maxSetSize = 1000;
    private final int maxZSetSize = 1000;

    public BigKeyPreventionService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // 检查String大小
    public boolean checkStringSize(String key, String value) {
        int byteLength = value.getBytes(StandardCharsets.UTF_8).length;
        if (byteLength > maxStringSize) {
            System.out.println("警告: String Key " + key + " 大小超过限制: " + byteLength + " bytes");
            return false;
        }
        return true;
    }

    // 检查Hash字段数量
    public boolean checkHashFields(String key, Map<String, Object> data) {
        if (data.size() > maxHashFields) {
            System.out.println("警告: Hash Key " + key + " 字段数量超过限制: " + data.size());
            return false;
        }
        return true;
    }

    // 检查List长度
    public boolean checkListLength(String key, List<Object> data) {
        if (data.size() > maxListLength) {
            System.out.println("警告: List Key " + key + " 长度超过限制: " + data.size());
            return false;
        }
        return true;
    }

    // 检查Set大小
    public boolean checkSetSize(String key, Set<Object> data) {
        if (data.size() > maxSetSize) {
            System.out.println("警告: Set Key " + key + " 大小超过限制: " + data.size());
            return false;
        }
        return true;
    }

    // 检查ZSet大小
    public boolean checkZSetSize(String key, Set<ZSetOperations.TypedTuple<Object>> data) {
        if (data.size() > maxZSetSize) {
            System.out.println("警告: ZSet Key " + key + " 大小超过限制: " + data.size());
            return false;
        }
        return true;
    }

    // 安全存储String: 超限时自动切分为多个chunk
    public void safeStoreString(String key, String value) {
        if (checkStringSize(key, value)) {
            redisTemplate.opsForValue().set(key, value);
        } else {
            // 分片存储
            shardStringData(key, value);
        }
    }

    // 安全存储Hash: 超限时自动分片
    public void safeStoreHash(String key, Map<String, Object> data) {
        if (checkHashFields(key, data)) {
            redisTemplate.opsForHash().putAll(key, data);
        } else {
            // 分片存储
            shardHashData(key, data);
        }
    }

    // 分片存储String
    private void shardStringData(String key, String value) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        int chunkSize = 1024; // 每片1KB

        for (int i = 0; i < data.length; i += chunkSize) {
            int endIndex = Math.min(i + chunkSize, data.length);
            byte[] chunk = Arrays.copyOfRange(data, i, endIndex);

            String chunkKey = key + ":chunk:" + (i / chunkSize);
            redisTemplate.opsForValue().set(chunkKey, chunk);
        }

        // 存储元数据,读取时据此拼回完整value
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("totalSize", data.length);
        metadata.put("chunkCount", (data.length + chunkSize - 1) / chunkSize);
        redisTemplate.opsForHash().putAll(key + ":metadata", metadata);
    }

    // 分片存储Hash: 用本地计数器切片,避免每次写入都调用HLEN
    private void shardHashData(String key, Map<String, Object> data) {
        int batchSize = 100;
        int count = 0;

        for (Map.Entry<String, Object> entry : data.entrySet()) {
            String shardKey = key + ":shard:" + (count / batchSize);
            redisTemplate.opsForHash().put(shardKey, entry.getKey(), entry.getValue());
            count++;
        }

        // 存储元数据
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("totalFields", data.size());
        metadata.put("shardCount", (data.size() + batchSize - 1) / batchSize);
        redisTemplate.opsForHash().putAll(key + ":metadata", metadata);
    }
}

5.2 清理策略

// 大Key清理服务
@Service
public class BigKeyCleanupService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final DataShardingService shardingService;

    public BigKeyCleanupService(RedisTemplate<String, Object> redisTemplate,
                                DataShardingService shardingService) {
        this.redisTemplate = redisTemplate;
        this.shardingService = shardingService;
    }

    // 清理大Key: 先分批缩减,最后删除,避免一次性DEL长时间阻塞Redis
    public void cleanupBigKey(String key) {
        try {
            DataType keyType = redisTemplate.type(key);

            switch (keyType) {
                case STRING:
                    cleanupStringKey(key);
                    break;
                case HASH:
                    cleanupHashKey(key);
                    break;
                case LIST:
                    cleanupListKey(key);
                    break;
                case SET:
                    cleanupSetKey(key);
                    break;
                case ZSET:
                    cleanupZSetKey(key);
                    break;
            }

        } catch (Exception e) {
            System.err.println("清理大Key " + key + " 失败: " + e.getMessage());
        }
    }

    // 清理String类型大Key
    private void cleanupStringKey(String key) {
        // String无法分批缩减,直接删除
        redisTemplate.delete(key);
        System.out.println("已清理String大Key: " + key);
    }

    // 清理Hash类型大Key: 通过HSCAN游标分批删除field,避免HKEYS全量拉取
    private void cleanupHashKey(String key) {
        int batchSize = 100;
        List<Object> batchFields = new ArrayList<>(batchSize);

        try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash()
                .scan(key, ScanOptions.scanOptions().count(batchSize).build())) {
            while (cursor.hasNext()) {
                batchFields.add(cursor.next().getKey());
                if (batchFields.size() >= batchSize) {
                    redisTemplate.opsForHash().delete(key, batchFields.toArray());
                    batchFields.clear();
                }
            }
        }
        if (!batchFields.isEmpty()) {
            redisTemplate.opsForHash().delete(key, batchFields.toArray());
        }

        redisTemplate.delete(key);
        System.out.println("已清理Hash大Key: " + key);
    }

    // 清理List类型大Key: 通过LTRIM每次裁掉头部batchSize个元素
    private void cleanupListKey(String key) {
        int batchSize = 100;
        Long listLength = redisTemplate.opsForList().size(key);
        if (listLength == null) {
            return;
        }

        for (long i = 0; i < listLength; i += batchSize) {
            redisTemplate.opsForList().trim(key, batchSize, -1);
        }

        redisTemplate.delete(key);
        System.out.println("已清理List大Key: " + key);
    }

    // 清理Set类型大Key: 通过SPOP分批弹出元素,避免SMEMBERS全量拉取
    private void cleanupSetKey(String key) {
        int batchSize = 100;
        Long setSize = redisTemplate.opsForSet().size(key);
        if (setSize == null) {
            return;
        }

        for (long i = 0; i < setSize; i += batchSize) {
            redisTemplate.opsForSet().pop(key, batchSize);
        }

        redisTemplate.delete(key);
        System.out.println("已清理Set大Key: " + key);
    }

    // 清理ZSet类型大Key: 通过ZREMRANGEBYRANK按排名分批删除
    private void cleanupZSetKey(String key) {
        int batchSize = 100;
        Long zsetSize = redisTemplate.opsForZSet().size(key);
        if (zsetSize == null) {
            return;
        }

        for (long i = 0; i < zsetSize; i += batchSize) {
            redisTemplate.opsForZSet().removeRange(key, 0, batchSize - 1);
        }

        redisTemplate.delete(key);
        System.out.println("已清理ZSet大Key: " + key);
    }

    // 批量清理大Key
    public void batchCleanupBigKeys(List<String> keys) {
        for (String key : keys) {
            cleanupBigKey(key);
        }
    }
}
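
对于Redis 4.0及以上版本,更简单的做法是用 UNLINK 代替 DEL:UNLINK 在主线程中只解除Key的引用,真正的内存回收由后台线程异步完成,天然避免了删除大Key时的阻塞。Spring Data Redis 2.1+ 提供了对应的 unlink 方法,简要示意如下:

// 异步删除大Key(需Redis 4.0+)
public void asyncDeleteBigKey(String key) {
    redisTemplate.unlink(key);
}

此外,Redis 6.0 还提供 lazyfree-lazy-user-del 配置项,开启后普通的 DEL 也会按 UNLINK 的方式异步释放内存。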

6. 最佳实践总结

6.1 预防大Key产生

  1. 合理设计: 避免将大量数据存储在一个Key中
  2. 数据分片: 使用分片策略分散数据
  3. 定期检查: 定期检测和清理大Key
  4. 监控告警: 建立完善的监控和告警体系
  5. 代码审查: 在代码审查中关注大Key问题

6.2 大Key处理策略

  • 数据分片: 将大Key拆分成多个小Key
  • 数据压缩: 使用压缩算法减少数据大小
  • 过期策略: 设置合理的过期时间
  • 分批处理: 使用分批操作避免阻塞
  • 异步处理: 使用异步操作提高性能

6.3 监控运维

  • 实时监控: 实时监控大Key的产生
  • 定期检测: 定期检测现有的大Key
  • 告警机制: 建立完善的告警机制
  • 自动清理: 实现自动清理大Key的功能
  • 性能优化: 持续优化Redis性能

通过合理的预防措施和处理策略,可以有效解决Redis大Key问题,提升Redis的性能和稳定性。