
|
import redis import json import time from typing import Dict, List, Tuple
class RedisBigKeyDetector: def __init__(self, host: str, port: int, password: str = None): self.redis_client = redis.Redis( host=host, port=port, password=password, decode_responses=True ) self.big_keys = [] def detect_big_keys(self, threshold: int = 10240) -> List[Dict]: """检测大Key""" print("开始检测Redis大Key...") keys = self.redis_client.keys('*') total_keys = len(keys) for i, key in enumerate(keys): if i % 1000 == 0: print(f"进度: {i}/{total_keys}") key_info = self._analyze_key(key, threshold) if key_info: self.big_keys.append(key_info) return self.big_keys def _analyze_key(self, key: str, threshold: int) -> Dict: """分析单个Key""" key_type = self.redis_client.type(key) if key_type == 'string': return self._analyze_string_key(key, threshold) elif key_type == 'hash': return self._analyze_hash_key(key, threshold) elif key_type == 'list': return self._analyze_list_key(key, threshold) elif key_type == 'set': return self._analyze_set_key(key, threshold) elif key_type == 'zset': return self._analyze_zset_key(key, threshold) return None def _analyze_string_key(self, key: str, threshold: int) -> Dict: """分析String类型Key""" try: memory_usage = self.redis_client.memory_usage(key) if memory_usage > threshold: return { 'key': key, 'type': 'string', 'size': memory_usage, 'description': f'String类型,大小: {memory_usage} bytes' } except Exception as e: print(f"分析String Key {key} 失败: {e}") return None def _analyze_hash_key(self, key: str, threshold: int) -> Dict: """分析Hash类型Key""" try: field_count = self.redis_client.hlen(key) if field_count > 1000: sample_fields = self.redis_client.hscan(key, count=10)[1] avg_field_size = sum(len(str(k)) + len(str(v)) for k, v in sample_fields.items()) / len(sample_fields) estimated_size = field_count * avg_field_size return { 'key': key, 'type': 'hash', 'field_count': field_count, 'estimated_size': estimated_size, 'description': f'Hash类型,field数量: {field_count},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析Hash Key {key} 失败: {e}") return None def _analyze_list_key(self, key: str, threshold: int) -> Dict: """分析List类型Key""" try: list_length = self.redis_client.llen(key) if list_length > 1000: sample_elements = self.redis_client.lrange(key, 0, 9) avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements) estimated_size = list_length * avg_element_size return { 'key': key, 'type': 'list', 'length': list_length, 'estimated_size': estimated_size, 'description': f'List类型,长度: {list_length},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析List Key {key} 失败: {e}") return None def _analyze_set_key(self, key: str, threshold: int) -> Dict: """分析Set类型Key""" try: set_size = self.redis_client.scard(key) if set_size > 1000: sample_elements = list(self.redis_client.sscan(key, count=10)[1]) avg_element_size = sum(len(str(elem)) for elem in sample_elements) / len(sample_elements) estimated_size = set_size * avg_element_size return { 'key': key, 'type': 'set', 'size': set_size, 'estimated_size': estimated_size, 'description': f'Set类型,大小: {set_size},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析Set Key {key} 失败: {e}") return None def _analyze_zset_key(self, key: str, threshold: int) -> Dict: """分析ZSet类型Key""" try: zset_size = self.redis_client.zcard(key) if zset_size > 1000: sample_elements = self.redis_client.zscan(key, count=10)[1] avg_element_size = sum(len(str(k)) + 8 for k, v in sample_elements) / len(sample_elements) estimated_size = zset_size * avg_element_size return { 'key': key, 'type': 'zset', 'size': zset_size, 'estimated_size': estimated_size, 'description': f'ZSet类型,大小: {zset_size},估算大小: {estimated_size} bytes' } except Exception as e: print(f"分析ZSet Key {key} 失败: {e}") return None def generate_report(self) -> str: """生成检测报告""" if not self.big_keys: return "未发现大Key" report = f"发现 {len(self.big_keys)} 个大Key:\n\n" for i, key_info in enumerate(self.big_keys, 1): report += f"{i}. {key_info['description']}\n" return report
if __name__ == "__main__": detector = RedisBigKeyDetector("127.0.0.1", 6379, "password") big_keys = detector.detect_big_keys(threshold=10240) report = detector.generate_report() print(report)
|