Redis LargeKey in Practice: Memory Imbalance, Large-Scale Key Scanning, and a Complete Enterprise-Grade Performance Optimization Solution

1. LargeKey Problem Overview

1.1 What Is a LargeKey?

LargeKey classification (a quick threshold-check sketch follows this list):
By size:
- A single key larger than 10 KB
- A List/Set/ZSet with more than 1,000 elements
- A Hash with more than 100 fields

By type:
- String: value too large (>10 KB)
- List: too many elements (>1,000)
- Set: too many elements (>1,000)
- Hash: too many fields (>100)
- ZSet: too many members (>1,000)

Common causes:
- Serializing large objects into a single value
- Data structures that were never split
- Caching an entire list/collection under one key
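
A minimal sketch for checking a single key against these thresholds, assuming a local instance; the 10 KB / 1,000 / 100 limits are the rule-of-thumb values above, not hard Redis limits.

# check_key_threshold.py -- quick threshold check for one key (sketch)
import redis

client = redis.Redis(host='127.0.0.1', port=6379)

def is_large_key(key):
    """Return True if `key` crosses the size/element thresholds listed above."""
    key_type = client.type(key)
    if key_type == b'string':
        return (client.strlen(key) or 0) > 10 * 1024
    if key_type == b'list':
        return client.llen(key) > 1000
    if key_type == b'set':
        return client.scard(key) > 1000
    if key_type == b'zset':
        return client.zcard(key) > 1000
    if key_type == b'hash':
        return client.hlen(key) > 100
    return False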

1.2 Why LargeKeys Are Harmful

Problems caused by LargeKeys:
Memory:
- Excessive memory usage
- Risk of OOM
- Memory fragmentation

Performance:
- Blocks the Redis server (commands execute on a single thread)
- Longer operation latency (see the timing sketch below)
- Heavy network bandwidth usage

Replication:
- Increased replication lag
- Network congestion
- Disk I/O pressure

Persistence:
- Oversized RDB/AOF files
- Slow backup and recovery
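
The blocking effect is easy to observe: one bulk command on a LargeKey produces a single huge reply that the server must assemble while nothing else runs, whereas incremental iteration spreads the same work over many small commands. A small sketch (key name and sizes are illustrative) comparing HGETALL with HSCAN on a large hash; the point is not end-to-end speed but that each HSCAN call is cheap, so other clients are not stalled.

# largekey_latency_demo.py -- illustrate bulk vs incremental reads (sketch)
import time
import redis

client = redis.Redis(host='127.0.0.1', port=6379)

# Build a test hash with 100,000 fields (hypothetical key name).
client.delete('demo:big_hash')
with client.pipeline() as pipe:
    for i in range(0, 100_000, 1000):
        pipe.hset('demo:big_hash', mapping={f'f{j}': 'x' * 64 for j in range(i, i + 1000)})
    pipe.execute()

start = time.time()
client.hgetall('demo:big_hash')             # one large, blocking reply
print(f"HGETALL: {time.time() - start:.3f}s")

start = time.time()
for _ in client.hscan_iter('demo:big_hash', count=500):
    pass                                    # many small replies; the server stays responsive
print(f"HSCAN:   {time.time() - start:.3f}s")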

2. Detecting LargeKeys

2.1 Detection with redis-cli

# Method 1: redis-cli --bigkeys
redis-cli --bigkeys

# Sample output
# -------- summary -------
# Sampled 100000 keys in the keyspace!
# Total key length in bytes is 1114003 (avg len 11.14)

# Biggest string found 'product:12345:details' has 512345 bytes
# Biggest list found 'order:queue' has 10000 items
# Biggest set found 'tags:popular' has 50000 members
# Biggest hash found 'user:12345:profile' has 200 fields
# Biggest zset found 'leaderboard' has 100000 members

# 12345 strings with 3456 bytes (12.35% of keys, avg size 0.03)
# 123 list(s) with 1234 items (0.12% of keys, avg size 10.04)
# 456 set(s) with 5678 members (0.46% of keys, avg size 12.45)
# 789 hash(es) with 9012 fields (0.79% of keys, avg size 11.43)
# 234 zset(s) with 3456 members (0.23% of keys, avg size 14.78)

# Method 2: specify host and password
redis-cli -h 127.0.0.1 -p 6379 -a password --bigkeys

# Method 3: throttle the scan with a sleep interval
# (-i 0.1 sleeps 0.1s periodically -- slower overall, but gentler on a busy server)
redis-cli --bigkeys -i 0.1

# Method 4: force escaped (non-raw) output, e.g. when redirecting to a file
redis-cli --bigkeys --no-raw

2.2 Detection with a Python Script

#!/usr/bin/env python
# redis_largekey_detector.py

import redis


class LargeKeyDetector:
    def __init__(self, host='localhost', port=6379, password=None, db=0):
        self.client = redis.Redis(host=host, port=port, password=password, db=db)

    def get_memory_usage(self, key):
        """Get the memory usage of a key in bytes."""
        try:
            return self.client.memory_usage(key)
        except redis.exceptions.ResponseError:
            # MEMORY USAGE is unavailable before Redis 4.0 -- fall back to an estimate
            return self._estimate_size(key)

    def _estimate_size(self, key):
        """Roughly estimate the size of a key."""
        key_type = self.client.type(key)
        size = 0

        if key_type == b'string':
            value = self.client.get(key)
            size = len(value) if value else 0
        elif key_type == b'list':
            size = self.client.llen(key) * 100   # assume ~100 bytes per element
        elif key_type == b'set':
            size = self.client.scard(key) * 100
        elif key_type == b'zset':
            size = self.client.zcard(key) * 150
        elif key_type == b'hash':
            size = self.client.hlen(key) * 100

        return size

    def scan_all_keys(self, pattern='*', count=1000):
        """Scan the keyspace and collect keys larger than 10 KB."""
        cursor = 0
        total_keys = 0
        large_keys = []

        while True:
            cursor, keys = self.client.scan(cursor, match=pattern, count=count)

            for key in keys:
                size = self.get_memory_usage(key)
                total_keys += 1

                if size and size > 10240:  # larger than 10 KB
                    large_keys.append({
                        'key': key,
                        'size': size,
                        'type': self.client.type(key).decode()
                    })

            if total_keys % 1000 == 0:
                print(f"Scanned: {total_keys} keys, large keys found: {len(large_keys)}")

            if cursor == 0:
                break

        return large_keys

    def detect(self):
        """Detect LargeKeys and print a report."""
        print("Starting LargeKey detection...")
        large_keys = self.scan_all_keys()

        # Sort by size, descending
        large_keys.sort(key=lambda x: x['size'], reverse=True)

        print(f"\nFound {len(large_keys)} LargeKeys:")
        print("-" * 80)

        for item in large_keys[:20]:  # show only the top 20
            size_kb = item['size'] / 1024
            size_mb = size_kb / 1024
            size_str = f"{size_mb:.2f} MB" if size_mb > 1 else f"{size_kb:.2f} KB"

            print(f"Key: {item['key']}")
            print(f"  Type: {item['type']}")
            print(f"  Size: {size_str} ({item['size']} bytes)")
            print()


if __name__ == '__main__':
    detector = LargeKeyDetector(host='127.0.0.1', port=6379)
    detector.detect()

2.3 Analysis with rdb-tools

# Install rdb-tools
pip install rdbtools python-lzf

# Dump an RDB file from the running server
redis-cli --rdb dump.rdb

# Analyze the RDB file
rdb -c memory dump.rdb > memory.csv

# Inspect the CSV header
cat memory.csv | head -n 1
# database,type,key,size_in_bytes,encoding,num_elements,len_largest_element

# Sort to find the biggest keys
cat memory.csv | sort -t',' -k4 -nr | head -n 10

# Aggregate by type (total bytes used by strings in db 0)
cat memory.csv | grep "^0,string" | awk -F',' '{sum+=$4} END {print sum}'
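
The CSV produced by the memory report is also easy to post-process in Python. A minimal sketch (file name and column names follow the header shown above) that prints the top-N keys per type:

# top_keys_from_rdb.py -- post-process memory.csv from rdbtools (sketch)
import csv
from collections import defaultdict

def top_keys(csv_path='memory.csv', top_n=10):
    by_type = defaultdict(list)
    with open(csv_path, newline='') as f:
        for row in csv.DictReader(f):
            by_type[row['type']].append((int(row['size_in_bytes']), row['key']))

    for key_type, rows in by_type.items():
        rows.sort(reverse=True)
        print(f"== {key_type} ==")
        for size, key in rows[:top_n]:
            print(f"{size / 1024:8.1f} KB  {key}")

if __name__ == '__main__':
    top_keys()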

2.4 Detection with RedisInsight

RedisInsight:
GUI features:
- Connect to a Redis instance
- Browse the keyspace
- Inspect key details

LargeKey detection:
- Automatic scanning
- Memory usage analysis
- Summary reports

Usage steps:
1. Install RedisInsight
2. Add a connection
3. Open Database Analysis
4. Review the report

3. Optimizing the SCAN Command

3.1 SCAN Basics

# scan_basic.py
import redis

client = redis.Redis(host='127.0.0.1', port=6379)


# Basic SCAN loop
def basic_scan():
    cursor = 0
    keys = []

    while True:
        cursor, batch = client.scan(cursor, match='user:*', count=100)
        keys.extend(batch)

        if cursor == 0:
            break

    print(f"Found {len(keys)} keys")
    return keys


# scan_iter (recommended) -- redis-py manages the cursor for you
def scan_iter_pattern():
    keys = []
    for key in client.scan_iter(match='user:*', count=100):
        keys.append(key)

    print(f"Found {len(keys)} keys")
    return keys


# Scan and filter by type (Redis 6.0+ can also filter server-side via SCAN's
# TYPE option, exposed as scan_iter(_type=...) in redis-py)
def scan_by_type(type_name):
    keys = []
    for key in client.scan_iter(match='*', count=100):
        if client.type(key).decode() == type_name:
            keys.append(key)

    return keys

3.2 SCAN Optimization Strategies

# optimized_scan.py
import redis
from concurrent.futures import ThreadPoolExecutor


class OptimizedScanner:
    def __init__(self, host='127.0.0.1', port=6379):
        self.client = redis.Redis(host=host, port=port)

    def scan_with_pipeline(self, pattern='*', batch_size=100):
        """Fetch the values for each SCAN batch in a single pipelined round trip."""
        cursor = 0
        results = []

        while True:
            cursor, keys = self.client.scan(cursor, match=pattern, count=batch_size)

            if keys:
                pipeline = self.client.pipeline()
                for key in keys:
                    pipeline.get(key)
                results.extend(pipeline.execute())

            if cursor == 0:
                break

        return results

    def parallel_scan(self, patterns, max_workers=4):
        """Scan several patterns in parallel."""
        def scan_pattern(pattern):
            return list(self.client.scan_iter(match=pattern, count=100))

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = executor.map(scan_pattern, patterns)

        return list(results)

    def batch_process(self, pattern='*', batch_size=1000, processor=None):
        """Process SCAN results in batches via a user-supplied callback."""
        batch = []

        for key in self.client.scan_iter(match=pattern, count=100):
            batch.append(key)

            if len(batch) >= batch_size:
                if processor:
                    processor(batch)
                batch = []

        # Process the remaining keys
        if batch and processor:
            processor(batch)
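
A minimal usage sketch for batch_process (the pattern and batch size are illustrative): pass a callback that handles each batch, for example unlinking temporary cache keys.

# Usage sketch for OptimizedScanner.batch_process (illustrative pattern/sizes).
scanner = OptimizedScanner()

def unlink_batch(keys):
    # UNLINK frees memory asynchronously (Redis 4.0+), so deleting a batch
    # of candidates does not block the server the way a huge DEL can.
    scanner.client.unlink(*keys)

scanner.batch_process(pattern='cache:tmp:*', batch_size=500, processor=unlink_batch)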

3.3 Sharded Scanning (Cluster)

# sharded_scan.py
import redis
from redis.cluster import RedisCluster, ClusterNode


class ShardedScanner:
    def __init__(self, nodes):
        # nodes: e.g. [("192.168.1.10", 7001), ("192.168.1.10", 7002)]
        self.client = RedisCluster(
            startup_nodes=[ClusterNode(host, port) for host, port in nodes],
            decode_responses=True
        )

    def scan_cluster(self, pattern='*'):
        """Scan all keys across every master in the cluster."""
        all_keys = []

        # Connect to each master individually so every shard is scanned exactly once
        for node in self.client.get_primaries():
            node_client = redis.Redis(host=node.host, port=node.port, decode_responses=True)
            for key in node_client.scan_iter(match=pattern, count=100):
                all_keys.append(key)

        return all_keys

    def get_keys_per_slot(self, slot):
        """Collect the keys that hash to a specific slot."""
        keys = []
        for key in self.scan_cluster():
            if self.client.cluster_keyslot(key) == slot:  # CLUSTER KEYSLOT
                keys.append(key)
        return keys

4. LargeKey Remediation

4.1 Splitting a Large String

# split_large_string.py
import redis


class LargeStringHandler:
    def __init__(self, client):
        self.client = client
        self.chunk_size = 1024  # 1 KB per chunk

    def set_large_string(self, key, value):
        """Split a large string across multiple keys."""
        chunks = []

        # How many chunks do we need?
        num_chunks = (len(value) + self.chunk_size - 1) // self.chunk_size

        # Store the metadata (chunk count)
        self.client.set(f"{key}:metadata", num_chunks)

        # Store the chunks
        for i in range(num_chunks):
            start = i * self.chunk_size
            end = start + self.chunk_size
            chunk_key = f"{key}:chunk:{i}"
            self.client.set(chunk_key, value[start:end])
            chunks.append(chunk_key)

        return chunks

    def get_large_string(self, key):
        """Reassemble a split string."""
        metadata = self.client.get(f"{key}:metadata")
        if not metadata:
            return None

        num_chunks = int(metadata)
        chunks = []

        for i in range(num_chunks):
            chunk = self.client.get(f"{key}:chunk:{i}")
            if chunk:
                chunks.append(chunk)

        # Chunks come back as bytes unless the client uses decode_responses=True
        if chunks and isinstance(chunks[0], bytes):
            return b''.join(chunks)
        return ''.join(chunks)
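
A quick round-trip usage sketch (the key and payload are illustrative). Note that reading N chunks costs N GETs; a pipeline or MGET over the chunk keys would collapse that into one round trip.

# Usage sketch for LargeStringHandler (illustrative key/value).
client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
handler = LargeStringHandler(client)

payload = 'x' * 50_000                      # ~50 KB value
handler.set_large_string('doc:42', payload)
assert handler.get_large_string('doc:42') == payload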

4.2 Splitting a Large List

# split_large_list.py
class LargeListHandler:
    def __init__(self, client):
        self.client = client
        self.chunk_size = 100

    def rpush_chunked(self, key, *values):
        """Spread a large list across several smaller lists.

        RPUSH (rather than LPUSH) preserves insertion order, so the
        index math in lrange_chunked stays straightforward.
        """
        for i in range(0, len(values), self.chunk_size):
            chunk = values[i:i + self.chunk_size]
            list_key = f"{key}:{i // self.chunk_size}"
            self.client.rpush(list_key, *chunk)

    def total_length(self, key):
        """Total number of elements across all chunk lists."""
        total, i = 0, 0
        while True:
            length = self.client.llen(f"{key}:{i}")
            if length == 0:
                break
            total += length
            i += 1
        return total

    def lrange_chunked(self, key, start=0, end=-1):
        """Read a range from the split lists (end=-1 means 'up to the last element')."""
        if end == -1:
            end = self.total_length(key) - 1

        all_items = []
        for i in range(start, end + 1):
            chunk_index = i // self.chunk_size
            item_index = i % self.chunk_size
            item = self.client.lindex(f"{key}:{chunk_index}", item_index)
            all_items.append(item)

        return all_items

4.3 Splitting a Large Hash

# split_large_hash.py
class LargeHashHandler:
    def __init__(self, client):
        self.client = client
        self.fields_per_hash = 100

    def hset_chunked(self, key, mapping):
        """Split a large hash into several smaller hashes."""
        fields = list(mapping.items())

        for i in range(0, len(fields), self.fields_per_hash):
            chunk = dict(fields[i:i + self.fields_per_hash])
            hash_key = f"{key}:{i // self.fields_per_hash}"
            self.client.hset(hash_key, mapping=chunk)

    def hgetall_chunked(self, key):
        """Read all fields back from the split hashes."""
        all_fields = {}
        i = 0

        while True:
            fields = self.client.hgetall(f"{key}:{i}")

            if not fields:
                break

            all_fields.update(fields)
            i += 1

        return all_fields

5. Memory Optimization

5.1 Analyzing Memory Imbalance

# Check per-node memory usage
redis-cli -h 192.168.1.10 -p 7001 info memory

# In cluster mode, check each node's key count
for i in 7001 7002 7003; do
    echo "Node $i:"
    redis-cli -h 192.168.1.10 -p $i dbsize
done

# Capture the big-key distribution
redis-cli --bigkeys -i 0.1 > /tmp/bigkeys_output.txt
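
The same comparison can be scripted. A minimal sketch that prints key counts and used memory side by side across nodes (the addresses mirror the shell example above and are illustrative); a node that stands out is often the one hosting the LargeKeys.

# memory_balance_report.py -- compare memory/key counts across nodes (sketch)
import redis

NODES = [('192.168.1.10', 7001), ('192.168.1.10', 7002), ('192.168.1.10', 7003)]

def report():
    for host, port in NODES:
        client = redis.Redis(host=host, port=port)
        info = client.info('memory')
        used_mb = info['used_memory'] / 1024 / 1024
        print(f"{host}:{port}  keys={client.dbsize():>8}  used_memory={used_mb:8.1f} MB")

if __name__ == '__main__':
    report()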

5.2 Memory Defragmentation

# memory_optimization.py
import redis


class MemoryOptimizer:
    def __init__(self, client):
        self.client = client

    def get_memory_info(self):
        """Collect key memory metrics."""
        info = self.client.info('memory')
        return {
            'used_memory': info['used_memory_human'],
            'used_memory_peak': info['used_memory_peak_human'],
            'mem_fragmentation_ratio': info['mem_fragmentation_ratio'],
            'total_system_memory': info.get('total_system_memory_human')
        }

    def check_fragmentation(self):
        """Check whether fragmentation is excessive."""
        info = self.client.info('memory')
        ratio = float(info['mem_fragmentation_ratio'])

        if ratio > 1.5:
            print(f"Memory fragmentation ratio is too high: {ratio}")
            return True

        return False

    def memory_purge(self):
        """Ask the allocator to release dirty pages (MEMORY PURGE, Redis 4.0+, jemalloc only)."""
        try:
            self.client.execute_command('MEMORY PURGE')
            print("MEMORY PURGE completed")
        except redis.exceptions.ResponseError as e:
            print(f"MEMORY PURGE not supported: {e}")

    def defrag_manual(self):
        """Last-resort defragmentation: snapshot, then restart Redis."""
        # Trigger a background save; restarting from the RDB rebuilds memory compactly
        self.client.bgsave()
        print("RDB snapshot written; restart the instance to rebuild memory without fragmentation")
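
Besides MEMORY PURGE and a restart, Redis 4.0+ built with jemalloc can defragment online via the activedefrag feature. A minimal sketch of enabling it from a client; the thresholds are illustrative and should be tuned per workload, and the commands fail gracefully on builds without the feature.

# Enable online active defragmentation (Redis 4.0+, jemalloc builds only).
import redis

client = redis.Redis(host='127.0.0.1', port=6379)

try:
    client.config_set('activedefrag', 'yes')
    client.config_set('active-defrag-ignore-bytes', '100mb')   # ignore below 100 MB of waste
    client.config_set('active-defrag-threshold-lower', '10')   # start at 10% fragmentation
except redis.exceptions.ResponseError as e:
    print(f"Active defrag not available on this build: {e}")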

5.3 Proactively Cleaning Up LargeKeys

# cleanup_largekey.py
import redis


class LargeKeyCleaner:
    def __init__(self, client):
        self.client = client

    def delete_large_key(self, key):
        """Delete a LargeKey without blocking the server."""
        # Option 1: plain DELETE (synchronous, can block on a huge key)
        # self.client.delete(key)

        # Option 2: UNLINK (Redis 4.0+, frees memory asynchronously -- preferred)
        try:
            self.client.unlink(key)
        except redis.exceptions.ResponseError:
            # UNLINK not supported, fall back to DELETE
            self.client.delete(key)

    def cleanup_by_pattern(self, pattern='temp:*', limit=1000):
        """Delete keys matching a pattern, up to `limit`."""
        deleted_count = 0

        for key in self.client.scan_iter(match=pattern, count=100):
            self.delete_large_key(key)
            deleted_count += 1

            if deleted_count >= limit:
                break

        return deleted_count

    def expire_large_keys(self, large_keys, seconds=3600):
        """Attach a TTL to LargeKeys that do not have one.

        `large_keys` is a list of {'key': ...} dicts, e.g. produced by the
        LargeKeyDetector in section 2.2.
        """
        for key_info in large_keys:
            key = key_info['key']
            if self.client.ttl(key) == -1:  # -1 means the key exists but has no expiry
                self.client.expire(key, seconds)
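
On versions before 4.0 (no UNLINK), or when freeing a huge structure in one go is still a concern, a common alternative is to shrink the key incrementally before deleting it. A hedged sketch for a large hash using HSCAN plus HDEL (the batch size is illustrative); the same idea applies to lists (LPOP/LTRIM) and sets (SSCAN + SREM).

# Incrementally empty a large hash before deleting it, so no single command
# has to free the whole structure at once.
def drain_large_hash(client, key, batch=500):
    cursor = 0
    while True:
        cursor, fields = client.hscan(key, cursor, count=batch)
        if fields:
            client.hdel(key, *fields.keys())
        if cursor == 0:
            break
    client.delete(key)  # now cheap: the hash is empty (or nearly so)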

6. Bandwidth Optimization

6.1 Compressed Transfer

# compression.py
import gzip
import json


class CompressedRedis:
    def __init__(self, client):
        self.client = client

    def set_compressed(self, key, value):
        """Store a value compressed."""
        # Serialize
        serialized = json.dumps(value).encode('utf-8')

        # Compress
        compressed = gzip.compress(serialized)

        # Store
        self.client.set(key, compressed)

    def get_compressed(self, key):
        """Read and decompress a value."""
        compressed = self.client.get(key)
        if not compressed:
            return None

        # Decompress
        decompressed = gzip.decompress(compressed)

        # Deserialize
        return json.loads(decompressed.decode('utf-8'))

6.2 Batch Operations

# batch_operations.py
class BatchOperator:
    def __init__(self, client):
        self.client = client

    def batch_get(self, keys):
        """Fetch many keys in one round trip via a pipeline."""
        pipeline = self.client.pipeline()
        for key in keys:
            pipeline.get(key)
        return pipeline.execute()

    def batch_set(self, mapping):
        """Set many keys in one round trip via a pipeline."""
        pipeline = self.client.pipeline()
        for key, value in mapping.items():
            pipeline.set(key, value)
        pipeline.execute()

    def mget(self, keys):
        """Use MGET -- a single command, even cheaper than a pipeline of GETs."""
        return self.client.mget(keys)

6.3 Pipeline Optimization

# pipeline_optimization.py
class PipelineOptimizer:
    def __init__(self, client, pipeline_size=100):
        self.client = client
        self.pipeline_size = pipeline_size

    def batch_scan(self, pattern='*'):
        """Scan matching keys and fetch their values in fixed-size pipeline batches."""
        pipeline = self.client.pipeline()
        batch_count = 0
        results = []

        for key in self.client.scan_iter(match=pattern, count=100):
            pipeline.get(key)
            batch_count += 1

            if batch_count >= self.pipeline_size:
                results.extend(pipeline.execute())
                batch_count = 0
                pipeline = self.client.pipeline()

        # Flush the remaining commands
        if batch_count > 0:
            results.extend(pipeline.execute())

        return results

7. Monitoring and Alerting

7.1 Monitoring LargeKeys

# largekey_monitor.py
import time
from datetime import datetime

import redis


class LargeKeyMonitor:
    def __init__(self, client):
        self.client = client
        self.large_key_threshold = 10240  # 10 KB

    def monitor_large_keys(self, interval=300):
        """Scan for LargeKeys periodically."""
        while True:
            self.scan_and_report()
            time.sleep(interval)

    def scan_and_report(self):
        """Run one full scan and produce a report."""
        large_keys = []
        cursor = 0
        start_time = time.time()

        while True:
            cursor, keys = self.client.scan(cursor, count=100)

            for key in keys:
                size = self.get_key_size(key)

                if size > self.large_key_threshold:
                    large_keys.append({
                        'key': key,
                        'size': size,
                        'time': datetime.now().isoformat()
                    })

            if cursor == 0:
                break

        elapsed_time = time.time() - start_time
        self.generate_report(large_keys, elapsed_time)

    def get_key_size(self, key):
        """Get (or estimate) the size of a key in bytes."""
        try:
            return self.client.memory_usage(key) or 0
        except redis.exceptions.ResponseError:
            # MEMORY USAGE unavailable -- fall back to a rough estimate
            key_type = self.client.type(key)
            if key_type == b'string':
                return len(self.client.get(key) or b'')
            elif key_type == b'list':
                return self.client.llen(key) * 50
            elif key_type == b'set':
                return self.client.scard(key) * 50
            # ... other types
            return 0

    def generate_report(self, large_keys, elapsed_time):
        """Print a monitoring report and trigger alerts if needed."""
        large_keys.sort(key=lambda x: x['size'], reverse=True)

        print("\n=== LargeKey monitoring report ===")
        print(f"Scan duration: {elapsed_time:.2f}s")
        print(f"LargeKeys found: {len(large_keys)}")

        if large_keys:
            print("\nTop 10 largest keys:")
            for i, item in enumerate(large_keys[:10], 1):
                size_kb = item['size'] / 1024
                print(f"{i}. {item['key']}: {size_kb:.2f} KB")

        # Fire an alert when too many LargeKeys accumulate
        if len(large_keys) > 100:
            self.send_alert(large_keys)

    def send_alert(self, large_keys):
        """Hook for alert delivery (mail, IM, webhook, ...); left as a stub here."""
        print(f"ALERT: {len(large_keys)} LargeKeys detected")

7.2 Prometheus Monitoring

# redis_exporter setup
# Install redis_exporter
wget https://github.com/oliver006/redis_exporter/releases/download/v1.36.0/redis_exporter-v1.36.0.linux-amd64.tar.gz
tar -xzf redis_exporter-v1.36.0.linux-amd64.tar.gz
./redis_exporter --redis.addr=localhost:6379

# Prometheus configuration
scrape_configs:
  - job_name: 'redis'
    static_configs:
      - targets: ['localhost:9121']
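
redis_exporter covers instance-level metrics; to alert on LargeKeys specifically, one option is to publish the scanner's findings as a custom metric. A minimal sketch using the prometheus_client library, reusing the idea from 7.1 (metric name, port, and scan interval are illustrative):

# largekey_exporter.py -- expose a LargeKey count to Prometheus (sketch)
import time

import redis
from prometheus_client import Gauge, start_http_server

LARGE_KEY_COUNT = Gauge('redis_large_key_count', 'Number of keys above the LargeKey threshold')

def count_large_keys(client, threshold=10240):
    count = 0
    for key in client.scan_iter(count=100):
        size = client.memory_usage(key) or 0
        if size > threshold:
            count += 1
    return count

if __name__ == '__main__':
    client = redis.Redis(host='127.0.0.1', port=6379)
    start_http_server(9300)                 # Prometheus scrapes this port
    while True:
        LARGE_KEY_COUNT.set(count_large_keys(client))
        time.sleep(300)                     # rescan every 5 minutes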

8. Best Practices

8.1 Preventing LargeKeys

Preventive measures:
1. Design sensible key structures:
   - Avoid storing whole large objects
   - Split data across multiple keys
   - Choose an appropriate data structure

2. Clean up regularly:
   - Set expiration times
   - Periodically remove unused keys
   - Monitor key sizes

3. Limit writes (see the sketch after this list):
   - Cap the size of a single value
   - Cap the number of elements in a collection
   - Choose an appropriate data structure

4. Compress data:
   - Optimize serialization
   - Apply a compression algorithm
   - Remove redundant data
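
A minimal sketch of the "limit writes" idea: a thin wrapper that rejects oversized values at the application layer before they ever reach Redis (the limits, class names, and exception type are illustrative).

# guarded_client.py -- reject oversized writes before they reach Redis (sketch)
import redis

MAX_VALUE_BYTES = 10 * 1024   # 10 KB per string value
MAX_HASH_FIELDS = 100         # fields per hash

class ValueTooLargeError(ValueError):
    pass

class GuardedRedis:
    def __init__(self, client):
        self.client = client

    def set(self, key, value):
        data = value.encode('utf-8') if isinstance(value, str) else value
        if len(data) > MAX_VALUE_BYTES:
            raise ValueTooLargeError(f"{key}: value of {len(data)} bytes exceeds {MAX_VALUE_BYTES}")
        return self.client.set(key, data)

    def hset(self, key, mapping):
        if self.client.hlen(key) + len(mapping) > MAX_HASH_FIELDS:
            raise ValueTooLargeError(f"{key}: hash would exceed {MAX_HASH_FIELDS} fields")
        return self.client.hset(key, mapping=mapping)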

8.2 Optimization Tips

# Optimization examples
import gzip


class RedisOptimizer:
    """Examples of value-level optimizations."""

    @staticmethod
    def optimize_large_string(original_value):
        """Optimize a large string value."""
        # 1. Compress it
        compressed = gzip.compress(original_value.encode())

        # 2. If it is still too large, fall back to chunked storage
        if len(compressed) > 10240:  # 10 KB
            return "SPLIT_CHUNKS"

        return compressed

    @staticmethod
    def optimize_large_list(list_items):
        """Optimize a large list."""
        # 1. Store only the fields you need
        # 2. Keep encodings compact
        # 3. Split the storage

        # Example: store only the IDs; keep the full records under separate keys
        ids = [item.get('id') for item in list_items]
        return ids

    @staticmethod
    def optimize_large_hash(hash_data):
        """Optimize a large hash by splitting it."""
        chunks = {}
        chunk_size = 50

        items = list(hash_data.items())
        for i in range(0, len(items), chunk_size):
            chunk = dict(items[i:i + chunk_size])
            chunks[f"chunk_{i // chunk_size}"] = chunk

        return chunks

9. Summary

This article covered LargeKey detection and optimization for Redis:

Key Points

  1. LargeKey detection: --bigkeys and Python-based scanning
  2. SCAN optimization: per-shard scanning and pipelines
  3. Splitting strategies: breaking up large Strings/Hashes/Lists
  4. Memory optimization: defragmentation and cleanup
  5. Bandwidth optimization: compression, batching, pipelines
  6. Monitoring and alerting: continuous scanning with alerts

Technical Highlights

  • Detection tools: --bigkeys, rdb-tools, RedisInsight
  • SCAN: non-blocking, cursor-based, pattern matching
  • Splitting schemes: by size, by element count, partitioned storage
  • Performance: pipelines, batch operations, asynchronous deletion
  • Monitoring: periodic scans, continuous observation

Practical Recommendations

  1. Scan regularly with --bigkeys
  2. Split and optimize keys larger than 10 KB
  3. Replace KEYS with SCAN and use pipelines to reduce latency
  4. Set TTLs and clean up periodically; watch fragmentation and compress where it helps
  5. Monitor continuously and configure alerts

Taken together, these measures significantly reduce LargeKey risk and improve Redis performance.