1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
import redis import json import time from typing import Dict, List, Tuple
class RedisBigKeyDetector: def __init__(self, host: str, port: int, password: str = None): self.redis_client = redis.Redis( host=host, port=port, password=password, decode_responses=True ) self.big_keys = [] def detect_big_keys(self, threshold: int = 10240) -> List[Dict]: """检测大Key""" print("开始检测Redis大Key...") keys = self.redis_client.keys('*') total_keys = len(keys) for i, key in enumerate(keys): if i % 1000 == 0: print(f"进度: {i}/{total_keys}") key_info = self._analyze_key(key, threshold) if key_info: self.big_keys.append(key_info) return self.big_keys def _analyze_key(self, key: str, threshold: int) -> Dict: """分析单个Key""" key_type = self.redis_client.type(key) if key_type == 'string': return self._analyze_string_key(key, threshold) elif key_type == 'hash': return self._analyze_hash_key(key, threshold) elif key_type == 'list': return self._analyze_list_key(key, threshold) elif key_type == 'set': return self._analyze_set_key(key, threshold) elif key_type == 'zset': return self._analyze_zset_key(key, threshold) return None def _analyze_string_key(self, key: str, threshold: int) -> Dict: """分析String类型Key""" try: memory_usage = self.redis_client.memory_usage(key) if memory_usage > threshold: return { 'key': key, 'type': 'string', 'size': memory_usage, 'description': f'String类型,大小: {memory_usage} bytes' } except Exception as e: print(f"分析String Key {key} 失败: {e}") return None def _analyze_hash_key(self, key: str, threshold: int) -> Dict: """分析Hash类型Key""" try: field_count = self.redis_client.hlen(key) if field_count > 1000: sample_fields = self.redis_client.hscan(key, count=10)[1] avg_field_size = sum(len(str(k)) + len(str(v)) for k, v in sample_fields.items()) / len(sample_fields) estimated_size = field_count * avg_field_size return { 'key': key, 'type': 'hash', 'field_count': field_count, 'estimated_size': estimated_size, 'description': f'Hash类型,field数量: {field_count},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析Hash Key {key} 失败: {e}") return None def _analyze_list_key(self, key: str, threshold: int) -> Dict: """分析List类型Key""" try: list_length = self.redis_client.llen(key) if list_length > 1000: sample_elements = self.redis_client.lrange(key, 0, 9) avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements) estimated_size = list_length * avg_element_size return { 'key': key, 'type': 'list', 'length': list_length, 'estimated_size': estimated_size, 'description': f'List类型,长度: {list_length},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析List Key {key} 失败: {e}") return None def _analyze_set_key(self, key: str, threshold: int) -> Dict: """分析Set类型Key""" try: set_size = self.redis_client.scard(key) if set_size > 1000: sample_elements = list(self.redis_client.sscan(key, count=10)[1]) avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements) estimated_size = set_size * avg_element_size return { 'key': key, 'type': 'set', 'size': set_size, 'estimated_size': estimated_size, 'description': f'Set类型,大小: {set_size},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析Set Key {key} 失败: {e}") return None def _analyze_zset_key(self, key: str, threshold: int) -> Dict: """分析ZSet类型Key""" try: zset_size = self.redis_client.zcard(key) if zset_size > 1000: sample_elements = self.redis_client.zscan(key, count=10)[1] avg_element_size = sum(len(str(k)) + 8 for k, v in sample_elements) / len(sample_elements) estimated_size = zset_size * avg_element_size return { 'key': key, 'type': 'zset', 'size': zset_size, 'estimated_size': estimated_size, 'description': f'ZSet类型,大小: {zset_size},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析ZSet Key {key} 失败: {e}") return None def generate_report(self) -> str: """生成检测报告""" if not self.big_keys: return "未发现大Key" report = f"发现 {len(self.big_keys)} 个大Key:\n\n" for i, key_info in enumerate(self.big_keys, 1): report += f"{i}. {key_info['description']}\n" return report
if __name__ == "__main__": detector = RedisBigKeyDetector("127.0.0.1", 6379, "password") big_keys = detector.detect_big_keys(threshold=10240) report = detector.generate_report() print(report)
|