Episode 343 – Redis LargeKey Architecture in Practice: Memory Imbalance, Large-Scale Scanning, and a Complete Enterprise-Grade Performance Optimization Solution | Word count: 3.6k | Reading time: 18 min
# Redis LargeKey Architecture in Practice: Memory Imbalance, Large-Scale Scanning, and Enterprise-Grade Performance Optimization

## 1. LargeKey Overview

### 1.1 What Counts as a LargeKey

LargeKey classification:

By size:
- A single key larger than 10 KB
- A List/Set/ZSet with more than 1,000 elements
- A Hash with more than 100 fields

By type:
- String: value too large (>10 KB)
- List: too many elements (>1,000)
- Set: too many elements (>1,000)
- Hash: too many fields (>100)
- ZSet: too many members (>1,000)

Common causes:
- Large serialized objects stored as a single value
- Data structures that were never split
- Caching an entire list or collection under one key
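A single key can be checked against these thresholds directly with type-specific length commands. Below is a minimal sketch using redis-py; `check_key` is a hypothetical helper and the thresholds simply mirror the list above:

```python
import redis

client = redis.Redis(host='127.0.0.1', port=6379)


def check_key(key):
    """Flag a key as 'large' using the thresholds listed above."""
    key_type = client.type(key)
    if key_type == b'string':
        return client.strlen(key) > 10 * 1024        # value > 10 KB
    if key_type in (b'list', b'set', b'zset'):
        length = {b'list': client.llen,
                  b'set': client.scard,
                  b'zset': client.zcard}[key_type](key)
        return length > 1000                          # > 1,000 elements/members
    if key_type == b'hash':
        return client.hlen(key) > 100                 # > 100 fields
    return False
```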
### 1.2 Why LargeKeys Are Harmful

Problems caused by LargeKeys:

Memory:
- Excessive memory consumption
- Risk of OOM
- Memory fragmentation

Performance:
- Blocks the single-threaded Redis server
- Slower command execution
- High network bandwidth usage

Replication:
- Increased replication lag
- Network congestion
- Disk I/O pressure

Persistence:
- Oversized RDB/AOF files
- Slow backup and recovery
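The blocking effect is usually visible in the slow log. A minimal sketch with redis-py follows; the 10 ms threshold is an assumption for illustration, not a recommended production value:

```python
import redis

client = redis.Redis(host='127.0.0.1', port=6379)

# Log every command slower than 10 ms (10,000 microseconds, assumed threshold)
client.config_set('slowlog-log-slower-than', 10000)

# Commands touching LargeKeys (HGETALL, LRANGE, DEL, ...) tend to show up here
for entry in client.slowlog_get(10):
    print(entry['id'], entry['duration'], entry['command'])
```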
## 2. Detecting LargeKeys

### 2.1 Detection with redis-cli

```bash
# Scan for the biggest key of each type (uses SCAN, non-blocking)
redis-cli --bigkeys

# With host, port, and password
redis-cli -h 127.0.0.1 -p 6379 -a password --bigkeys

# Sleep 0.1 s between SCAN batches to reduce server load
redis-cli --bigkeys -i 0.1

# Force formatted (non-raw) output
redis-cli --bigkeys --no-raw
```
### 2.2 Detection with a Python Script

```python
import redis


class LargeKeyDetector:
    def __init__(self, host='localhost', port=6379, password=None, db=0):
        self.client = redis.Redis(host=host, port=port, password=password, db=db)

    def get_memory_usage(self, key):
        """Return the memory usage of a key in bytes."""
        try:
            return self.client.memory_usage(key)
        except redis.exceptions.ResponseError:
            # MEMORY USAGE is unavailable (Redis < 4.0); fall back to an estimate
            return self._estimate_size(key)

    def _estimate_size(self, key):
        """Roughly estimate a key's size from its type and length."""
        key_type = self.client.type(key)
        size = 0
        if key_type == b'string':
            value = self.client.get(key)
            size = len(value) if value else 0
        elif key_type == b'list':
            size = self.client.llen(key) * 100
        elif key_type == b'set':
            size = self.client.scard(key) * 100
        elif key_type == b'zset':
            size = self.client.zcard(key) * 150
        elif key_type == b'hash':
            size = self.client.hlen(key) * 100
        return size

    def scan_all_keys(self, pattern='*', count=1000):
        """Scan the keyspace with SCAN and collect keys larger than 10 KB."""
        cursor = 0
        total_keys = 0
        large_keys = []
        while True:
            cursor, keys = self.client.scan(cursor, match=pattern, count=count)
            for key in keys:
                size = self.get_memory_usage(key)
                total_keys += 1
                if size > 10240:
                    large_keys.append({
                        'key': key,
                        'size': size,
                        'type': self.client.type(key).decode()
                    })
                if total_keys % 1000 == 0:
                    print(f"Scanned: {total_keys} keys, large keys found: {len(large_keys)}")
            if cursor == 0:
                break
        return large_keys

    def detect(self):
        """Detect LargeKeys and print the top 20 by size."""
        print("Starting LargeKey detection...")
        large_keys = self.scan_all_keys()
        large_keys.sort(key=lambda x: x['size'], reverse=True)
        print(f"\nFound {len(large_keys)} LargeKeys:")
        print("-" * 80)
        for item in large_keys[:20]:
            size_kb = item['size'] / 1024
            size_mb = item['size'] / 1024 / 1024
            size_str = f"{size_mb:.2f} MB" if size_mb > 1 else f"{size_kb:.2f} KB"
            print(f"Key: {item['key']}")
            print(f"  Type: {item['type']}")
            print(f"  Size: {size_str} ({item['size']} bytes)")
            print()


if __name__ == '__main__':
    detector = LargeKeyDetector(host='127.0.0.1', port=6379)
    detector.detect()
```
### 2.3 Detection with rdb-tools

```bash
# Install rdb-tools and the lzf decompressor
pip install rdbtools python-lzf

# Dump the RDB file from the server to the local machine
redis-cli --rdb dump.rdb

# Generate a per-key memory report as CSV
rdb -c memory dump.rdb > memory.csv

# Inspect the CSV header
cat memory.csv | head -n 1

# Top 10 keys by size_in_bytes (column 4)
cat memory.csv | sort -t',' -k4 -nr | head -n 10

# Total bytes used by string keys in database 0
cat memory.csv | grep "^0,string" | awk -F',' '{sum+=$4} END {print sum}'
```
### 2.4 Detection with RedisInsight

RedisInsight capabilities:

Visual interface:
- Connect to a Redis instance
- Browse the keyspace
- Inspect key details

LargeKey detection:
- Automatic scanning
- Memory usage analysis
- Statistical reports

Steps:
1. Install RedisInsight
2. Add a connection
3. Open Database Analysis
4. Review the report
## 3. Optimizing the SCAN Command

### 3.1 SCAN Basics

```python
import redis

client = redis.Redis(host='127.0.0.1', port=6379)


def basic_scan():
    """Iterate the keyspace manually with SCAN and an explicit cursor."""
    cursor = 0
    keys = []
    while True:
        cursor, batch = client.scan(cursor, match='user:*', count=100)
        keys.extend(batch)
        if cursor == 0:
            break
    print(f"Found {len(keys)} keys")
    return keys


def scan_iter_pattern():
    """Use scan_iter, which hides the cursor handling."""
    keys = []
    for key in client.scan_iter(match='user:*', count=100):
        keys.append(key)
    print(f"Found {len(keys)} keys")
    return keys


def scan_by_type(type_name):
    """Filter scanned keys by type (costs one extra TYPE call per key)."""
    keys = []
    for key in client.scan_iter(match='*', count=100):
        if client.type(key).decode() == type_name:
            keys.append(key)
    return keys
```
### 3.2 SCAN Optimization Strategies

```python
import redis
from concurrent.futures import ThreadPoolExecutor


class OptimizedScanner:
    def __init__(self, host='127.0.0.1', port=6379):
        self.client = redis.Redis(host=host, port=port)

    def scan_with_pipeline(self, pattern='*', batch_size=100):
        """Queue a GET for every scanned key on a pipeline, then execute once."""
        pipeline = self.client.pipeline()
        cursor = 0
        while True:
            cursor, keys = self.client.scan(
                cursor, match=pattern, count=batch_size
            )
            for key in keys:
                pipeline.get(key)
            if cursor == 0:
                break
        return pipeline.execute()

    def parallel_scan(self, patterns, max_workers=4):
        """Scan several patterns in parallel threads."""
        def scan_pattern(pattern):
            keys = []
            for key in self.client.scan_iter(match=pattern, count=100):
                keys.append(key)
            return keys

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = executor.map(scan_pattern, patterns)
        return list(results)

    def batch_process(self, pattern='*', batch_size=1000, processor=None):
        """Process scanned keys in fixed-size batches to bound memory usage."""
        batch = []
        for key in self.client.scan_iter(match=pattern, count=100):
            batch.append(key)
            if len(batch) >= batch_size:
                if processor:
                    processor(batch)
                batch = []
        if batch and processor:
            processor(batch)
```
### 3.3 Sharded Scanning

```python
import redis
from redis.cluster import RedisCluster, ClusterNode


class ShardedScanner:
    def __init__(self, nodes):
        # nodes: e.g. [ClusterNode('192.168.1.10', 7001), ClusterNode('192.168.1.10', 7002)]
        self.client = RedisCluster(
            startup_nodes=nodes,
            decode_responses=True
        )

    def scan_cluster(self, pattern='*'):
        """Scan every primary shard of the cluster separately."""
        all_keys = []
        for node in self.client.get_primaries():
            # Open a direct connection to the primary and scan only that shard
            node_client = redis.Redis(host=node.host, port=node.port,
                                      decode_responses=True)
            for key in node_client.scan_iter(match=pattern, count=100):
                all_keys.append(key)
        return all_keys

    def get_keys_per_slot(self, slot):
        """Collect the keys that hash to a specific slot."""
        keys = []
        for key in self.client.scan_iter(count=100):
            # keyslot() computes the CRC16 slot client-side, no extra round trip
            if self.client.keyslot(key) == slot:
                keys.append(key)
        return keys
```
## 4. LargeKey Remediation

### 4.1 Splitting a Large String

```python
import redis


class LargeStringHandler:
    def __init__(self, client):
        # Assumes a client created with decode_responses=True so GET returns str
        self.client = client
        self.chunk_size = 1024  # bytes per chunk

    def set_large_string(self, key, value):
        """Split a large string value across multiple keys."""
        chunks = []
        num_chunks = (len(value) + self.chunk_size - 1) // self.chunk_size
        # Record the chunk count so the value can be reassembled later
        self.client.set(f"{key}:metadata", num_chunks)
        for i in range(num_chunks):
            start = i * self.chunk_size
            end = start + self.chunk_size
            chunk_key = f"{key}:chunk:{i}"
            self.client.set(chunk_key, value[start:end])
            chunks.append(chunk_key)
        return chunks

    def get_large_string(self, key):
        """Reassemble a string that was stored in chunks."""
        metadata = self.client.get(f"{key}:metadata")
        if not metadata:
            return None
        num_chunks = int(metadata)
        chunks = []
        for i in range(num_chunks):
            chunk_key = f"{key}:chunk:{i}"
            chunk = self.client.get(chunk_key)
            if chunk:
                chunks.append(chunk)
        return ''.join(chunks)
```
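A possible round trip with the handler above, assuming the client is created with `decode_responses=True`; the key name `report:2024` is only an example:

```python
import redis

client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
handler = LargeStringHandler(client)

payload = 'x' * 50000                       # ~50 KB value, too big for one key
handler.set_large_string('report:2024', payload)
restored = handler.get_large_string('report:2024')
assert restored == payload                  # value survives the split/reassemble cycle
```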
### 4.2 Splitting a Large List

```python
class LargeListHandler:
    def __init__(self, client):
        self.client = client
        self.chunk_size = 100  # elements per sub-list

    def rpush_chunked(self, key, *values):
        """Distribute values across multiple smaller lists."""
        chunks = []
        for i in range(0, len(values), self.chunk_size):
            chunk = values[i:i + self.chunk_size]
            chunks.append(chunk)
        for i, chunk in enumerate(chunks):
            list_key = f"{key}:{i}"
            # RPUSH keeps insertion order, so the index math below stays valid
            self.client.rpush(list_key, *chunk)

    def lrange_chunked(self, key, start, end):
        """Read an absolute index range across the chunked lists.

        start/end are non-negative absolute indexes over the whole logical list.
        """
        all_items = []
        for i in range(start, end + 1):
            chunk_index = i // self.chunk_size
            item_index = i % self.chunk_size
            list_key = f"{key}:{chunk_index}"
            item = self.client.lindex(list_key, item_index)
            all_items.append(item)
        return all_items
```
### 4.3 Splitting a Large Hash

```python
class LargeHashHandler:
    def __init__(self, client):
        self.client = client
        self.fields_per_hash = 100

    def hset_chunked(self, key, mapping):
        """Split a large hash into several smaller hashes."""
        fields = list(mapping.items())
        for i in range(0, len(fields), self.fields_per_hash):
            chunk = dict(fields[i:i + self.fields_per_hash])
            hash_key = f"{key}:{i // self.fields_per_hash}"
            self.client.hset(hash_key, mapping=chunk)

    def hgetall_chunked(self, key):
        """Merge all chunked hashes back into one dict."""
        all_fields = {}
        i = 0
        while True:
            hash_key = f"{key}:{i}"
            fields = self.client.hgetall(hash_key)
            if not fields:
                break
            all_fields.update(fields)
            i += 1
        return all_fields
```
## 5. Memory Optimization

### 5.1 Analyzing Memory Imbalance

```bash
# Check memory usage on a single node
redis-cli -h 192.168.1.10 -p 7001 info memory

# Compare key counts across cluster nodes
for i in 7001 7002 7003; do
  echo "Node $i:"
  redis-cli -h 192.168.1.10 -p $i dbsize
done

# Record the big-key scan result for later comparison
redis-cli --bigkeys -i 0.1 > /tmp/bigkeys_output.txt
```
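The same comparison can be scripted so the imbalance is easy to quantify. A minimal sketch; the node addresses are the placeholder ones used in the shell example above:

```python
import redis

# Primary nodes to compare (placeholder addresses)
nodes = [('192.168.1.10', 7001), ('192.168.1.10', 7002), ('192.168.1.10', 7003)]

usage = {}
for host, port in nodes:
    info = redis.Redis(host=host, port=port).info('memory')
    usage[f"{host}:{port}"] = info['used_memory']

avg = sum(usage.values()) / len(usage)
for node, used in usage.items():
    # Deviation from the average points at the shard that holds the LargeKeys
    print(f"{node}: {used / 1024 / 1024:.1f} MB ({(used - avg) / avg:+.1%} vs avg)")
```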
### 5.2 Memory Defragmentation

```python
import redis


class MemoryOptimizer:
    def __init__(self, client):
        self.client = client

    def get_memory_info(self):
        """Return the key metrics from INFO memory."""
        info = self.client.info('memory')
        return {
            'used_memory': info['used_memory_human'],
            'used_memory_peak': info['used_memory_peak_human'],
            'mem_fragmentation_ratio': info['mem_fragmentation_ratio'],
            'total_system_memory': info['total_system_memory_human']
        }

    def check_fragmentation(self):
        """Report whether the fragmentation ratio looks unhealthy."""
        info = self.client.info('memory')
        ratio = float(info['mem_fragmentation_ratio'])
        if ratio > 1.5:
            print(f"Memory fragmentation ratio is too high: {ratio}")
            return True
        return False

    def memory_purge(self):
        """Ask the allocator to release dirty pages (jemalloc only)."""
        try:
            self.client.execute_command('MEMORY PURGE')
            print("Memory purge finished")
        except redis.exceptions.ResponseError as e:
            print(f"MEMORY PURGE is not supported: {e}")

    def defrag_manual(self):
        """Fallback: persist with BGSAVE, then restart Redis to compact memory."""
        self.client.bgsave()
        print("RDB file generated; restart Redis to defragment memory")
```
### 5.3 Proactively Cleaning Up LargeKeys

```python
class LargeKeyCleaner:
    def __init__(self, client):
        self.client = client

    def delete_large_key(self, key):
        """Delete a LargeKey asynchronously with UNLINK, falling back to DEL."""
        try:
            self.client.unlink(key)
        except redis.exceptions.ResponseError:
            # UNLINK requires Redis >= 4.0
            self.client.delete(key)

    def cleanup_by_pattern(self, pattern='temp:*', limit=1000):
        """Delete up to `limit` keys that match a pattern."""
        deleted_count = 0
        for key in self.client.scan_iter(match=pattern, count=100):
            self.delete_large_key(key)
            deleted_count += 1
            if deleted_count >= limit:
                break
        return deleted_count

    def expire_large_keys(self, large_keys, seconds=3600):
        """Attach a TTL to LargeKeys that have none.

        large_keys: e.g. the output of LargeKeyDetector.scan_all_keys().
        """
        for key_info in large_keys:
            key = key_info['key']
            # TTL returns -1 when the key exists but has no expiration
            if self.client.ttl(key) == -1:
                self.client.expire(key, seconds)
```
## 6. Bandwidth Optimization

### 6.1 Compressed Transfer

```python
import gzip
import json


class CompressedRedis:
    def __init__(self, client):
        self.client = client

    def set_compressed(self, key, value):
        """Serialize to JSON, gzip it, and store the compressed bytes."""
        serialized = json.dumps(value).encode('utf-8')
        compressed = gzip.compress(serialized)
        self.client.set(key, compressed)

    def get_compressed(self, key):
        """Fetch, decompress, and deserialize a compressed value."""
        compressed = self.client.get(key)
        if not compressed:
            return None
        decompressed = gzip.decompress(compressed)
        return json.loads(decompressed.decode('utf-8'))
```
### 6.2 Batch Operations

```python
class BatchOperator:
    def __init__(self, client):
        self.client = client

    def batch_get(self, keys):
        """Fetch many keys in one round trip via a pipeline."""
        pipeline = self.client.pipeline()
        for key in keys:
            pipeline.get(key)
        return pipeline.execute()

    def batch_set(self, mapping):
        """Write many keys in one round trip via a pipeline."""
        pipeline = self.client.pipeline()
        for key, value in mapping.items():
            pipeline.set(key, value)
        pipeline.execute()

    def mget(self, keys):
        """Use MGET, a single command and usually the most efficient option."""
        return self.client.mget(keys)
```
### 6.3 Pipeline Optimization

```python
class PipelineOptimizer:
    def __init__(self, client, pipeline_size=100):
        self.client = client
        self.pipeline_size = pipeline_size

    def batch_scan(self, pattern='*'):
        """Scan keys and fetch their values in pipeline-sized batches."""
        pipeline = self.client.pipeline()
        batch_count = 0
        results = []
        for key in self.client.scan_iter(match=pattern, count=100):
            pipeline.get(key)
            batch_count += 1
            if batch_count >= self.pipeline_size:
                results.extend(pipeline.execute())
                batch_count = 0
                pipeline = self.client.pipeline()
        if batch_count > 0:
            results.extend(pipeline.execute())
        return results
```
## 7. Monitoring and Alerting

### 7.1 Monitoring LargeKeys

```python
import redis
import time
from datetime import datetime


class LargeKeyMonitor:
    def __init__(self, client):
        self.client = client
        self.large_key_threshold = 10240  # bytes

    def monitor_large_keys(self, interval=300):
        """Run the scan-and-report loop every `interval` seconds."""
        while True:
            self.scan_and_report()
            time.sleep(interval)

    def scan_and_report(self):
        """Scan the keyspace and report any LargeKeys found."""
        large_keys = []
        cursor = 0
        start_time = time.time()
        while True:
            cursor, keys = self.client.scan(cursor, count=100)
            for key in keys:
                size = self.get_key_size(key)
                if size > self.large_key_threshold:
                    large_keys.append({
                        'key': key,
                        'size': size,
                        'time': datetime.now().isoformat()
                    })
            if cursor == 0:
                break
        elapsed_time = time.time() - start_time
        self.generate_report(large_keys, elapsed_time)

    def get_key_size(self, key):
        """Return the key size, falling back to a rough estimate."""
        try:
            return self.client.memory_usage(key)
        except redis.exceptions.ResponseError:
            key_type = self.client.type(key)
            if key_type == b'string':
                return len(self.client.get(key) or b'')
            elif key_type == b'list':
                return self.client.llen(key) * 50
            elif key_type == b'set':
                return self.client.scard(key) * 50
            return 0

    def generate_report(self, large_keys, elapsed_time):
        """Print a summary and alert when there are too many LargeKeys."""
        print("\n=== LargeKey Monitoring Report ===")
        print(f"Scan duration: {elapsed_time:.2f} s")
        print(f"LargeKeys found: {len(large_keys)}")
        if large_keys:
            large_keys.sort(key=lambda x: x['size'], reverse=True)
            print("\nTop 10 Largest Keys:")
            for i, item in enumerate(large_keys[:10], 1):
                size_kb = item['size'] / 1024
                print(f"{i}. {item['key']}: {size_kb:.2f} KB")
        if len(large_keys) > 100:
            self.send_alert(large_keys)

    def send_alert(self, large_keys):
        """Placeholder alert hook; wire this to email/IM/webhook as needed."""
        print(f"ALERT: {len(large_keys)} LargeKeys detected")
```
### 7.2 Prometheus Monitoring

```bash
# Download and run redis_exporter
wget https://github.com/oliver006/redis_exporter/releases/download/v1.36.0/redis_exporter-v1.36.0.linux-amd64.tar.gz
tar -xzf redis_exporter-v1.36.0.linux-amd64.tar.gz
cd redis_exporter-v1.36.0.linux-amd64
./redis_exporter --redis.addr=localhost:6379
```

```yaml
# prometheus.yml scrape config for the exporter (default port 9121)
scrape_configs:
  - job_name: 'redis'
    static_configs:
      - targets: ['localhost:9121']
```
## 8. Best Practices

### 8.1 Preventing LargeKeys

Preventive measures:

1. Design sensible key structures:
   - Avoid storing large objects in a single key
   - Split data across multiple keys
   - Choose appropriate data structures

2. Clean up regularly:
   - Set expiration times
   - Periodically remove unused keys
   - Monitor key sizes

3. Limit writes (see the sketch after this list):
   - Cap the size of a single value
   - Cap the number of elements per collection
   - Choose appropriate data structures

4. Compress data:
   - Optimize serialization
   - Use compression algorithms
   - Reduce duplicated data
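On the application side, the "limit writes" rule can be enforced with a thin wrapper that rejects oversized writes before they reach Redis. A minimal sketch; `GuardedWriter` is a hypothetical helper and the limits simply reuse the thresholds from section 1.1:

```python
class GuardedWriter:
    MAX_VALUE_BYTES = 10 * 1024    # cap single String values at 10 KB
    MAX_ELEMENTS = 1000            # cap collection size at 1,000 elements

    def __init__(self, client):
        self.client = client

    def safe_set(self, key, value):
        data = value.encode() if isinstance(value, str) else value
        if len(data) > self.MAX_VALUE_BYTES:
            raise ValueError(f"value for {key} exceeds {self.MAX_VALUE_BYTES} bytes; split it first")
        return self.client.set(key, data)

    def safe_rpush(self, key, *values):
        # LLEN returns 0 for a missing key, so this also covers first writes
        if self.client.llen(key) + len(values) > self.MAX_ELEMENTS:
            raise ValueError(f"list {key} would exceed {self.MAX_ELEMENTS} elements; shard it first")
        return self.client.rpush(key, *values)
```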
### 8.2 Optimization Suggestions

```python
import gzip


class RedisOptimizer:
    """Helpers that shrink values before they become LargeKeys."""

    @staticmethod
    def optimize_large_string(original_value):
        """Compress a large string; signal that it should be chunked if still too big."""
        compressed = gzip.compress(original_value.encode())
        if len(compressed) > 10240:
            return "SPLIT_CHUNKS"
        return compressed

    @staticmethod
    def optimize_large_list(list_items):
        """Store only the IDs and keep the full objects elsewhere."""
        ids = [item.get('id') for item in list_items]
        return ids

    @staticmethod
    def optimize_large_hash(hash_data):
        """Break a large hash mapping into chunks of 50 fields."""
        chunks = {}
        chunk_size = 50
        items = list(hash_data.items())
        for i in range(0, len(items), chunk_size):
            chunk = dict(items[i:i + chunk_size])
            chunks[f"chunk_{i // chunk_size}"] = chunk
        return chunks
```
## 9. Summary

This article covered detecting and optimizing Redis LargeKeys:
### Key Points

- LargeKey detection: --bigkeys and Python-based scanning
- SCAN optimization: sharded scanning and pipelines
- Splitting strategies: splitting large String/Hash/List keys
- Memory optimization: defragmentation and cleanup
- Bandwidth optimization: compression, batching, pipelines
- Monitoring and alerting: continuous monitoring with alerts
### Technical Points

- Detection tools: --bigkeys, rdb-tools, RedisInsight
- SCAN command: non-blocking, cursor-based, pattern matching
- Splitting approaches: by size, by element count, partitioned storage
- Performance optimization: pipelines, batching, asynchronous deletion (UNLINK)
- Monitoring: periodic scans and continuous monitoring
### Practical Recommendations

- Run --bigkeys scans regularly
- Split and optimize keys larger than 10 KB
- Replace KEYS with SCAN and use pipelines to reduce latency
- Set TTLs and clean up regularly; watch fragmentation and apply compression
- Monitor continuously and configure alerts
With these measures in place, LargeKey risk can be significantly reduced and overall performance improved.