1. 云端Redis请求数管理架构概述

在云原生时代,Redis作为高性能的内存数据库,承载着企业核心业务的数据存储和缓存需求。随着业务规模的不断扩大,Redis的请求数管理成为了架构师必须面对的重要挑战。如何有效控制QPS、实现智能限流、支持弹性扩缩容,是构建高可用Redis架构的关键技术。本文从架构师的角度深入分析Redis请求数管理的实现原理、优化策略和最佳实践,为企业级应用提供完整的Redis集群请求数管理解决方案。

1.1 Redis请求数监控架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ (业务接口、API网关、微服务) │
├─────────────────────────────────────────────────────────┤
│ 监控层 │
│ (QPS监控、性能统计、健康检查) │
├─────────────────────────────────────────────────────────┤
│ 控制层 │
│ (限流控制、QPS调整、扩缩容决策) │
├─────────────────────────────────────────────────────────┤
│ 数据层 │
│ (Redis集群、时序数据库、缓存) │
├─────────────────────────────────────────────────────────┤
│ 分析层 │
│ (性能分析、趋势预测、异常检测) │
├─────────────────────────────────────────────────────────┤
│ 告警层 │
│ (告警规则、通知机制、自动处理) │
└─────────────────────────────────────────────────────────┘

1.2 Redis性能关键指标

  1. QPS: 每秒查询数、请求频率、吞吐量
  2. 响应时间: 平均响应时间、P95、P99响应时间
  3. 连接数: 并发连接数、连接池大小
  4. 内存使用率: 内存占用、内存碎片率
  5. 错误率: 请求失败率、超时率

2. Redis请求数监控与QPS控制

2.1 Redis请求数监控架构

Redis请求数监控是确保系统稳定运行的基础,需要实时监控QPS、响应时间、错误率等关键指标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
graph TB
A[Redis集群] --> B[监控代理]
B --> C[指标收集器]
C --> D[时间序列数据库]
D --> E[监控面板]
E --> F[告警系统]

G[限流器] --> A
H[负载均衡器] --> A
I[弹性扩缩容] --> A

J[QPS控制器] --> G
K[限流算法] --> G
L[扩缩容策略] --> I

2.2 Redis QPS监控实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
/**
* Redis QPS监控器
* 实时监控Redis集群的QPS、响应时间、错误率等指标
*/
public class RedisQPSMonitor {
private final RedisClusterManager clusterManager;
private final MetricsCollector metricsCollector;
private final AlertManager alertManager;
private final ScheduledExecutorService monitorExecutor;
private final Map<String, QPSMetrics> nodeMetrics;

public RedisQPSMonitor(RedisClusterManager clusterManager) {
this.clusterManager = clusterManager;
this.metricsCollector = new MetricsCollector();
this.alertManager = new AlertManager();
this.monitorExecutor = Executors.newScheduledThreadPool(4);
this.nodeMetrics = new ConcurrentHashMap<>();

// 启动监控任务
startMonitoring();
}

/**
* 启动监控
*/
private void startMonitoring() {
// 监控QPS
monitorExecutor.scheduleAtFixedRate(this::monitorQPS, 0, 1, TimeUnit.SECONDS);

// 监控响应时间
monitorExecutor.scheduleAtFixedRate(this::monitorResponseTime, 0, 5, TimeUnit.SECONDS);

// 监控错误率
monitorExecutor.scheduleAtFixedRate(this::monitorErrorRate, 0, 10, TimeUnit.SECONDS);

// 监控内存使用率
monitorExecutor.scheduleAtFixedRate(this::monitorMemoryUsage, 0, 30, TimeUnit.SECONDS);
}

/**
* 监控QPS
*/
private void monitorQPS() {
try {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
QPSMetrics metrics = getQPSMetrics(node);
nodeMetrics.put(node.getId(), metrics);

// 检查QPS阈值
if (metrics.getCurrentQPS() > metrics.getMaxQPS()) {
alertManager.sendAlert("QPS超限",
"节点 " + node.getId() + " QPS: " + metrics.getCurrentQPS() +
" 超过阈值: " + metrics.getMaxQPS());
}

// 记录指标
metricsCollector.recordQPS(node.getId(), metrics.getCurrentQPS());
}
} catch (Exception e) {
System.err.println("监控QPS失败: " + e.getMessage());
}
}

/**
* 监控响应时间
*/
private void monitorResponseTime() {
try {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
long responseTime = measureResponseTime(node);

// 检查响应时间阈值
if (responseTime > 1000) { // 1秒
alertManager.sendAlert("响应时间过长",
"节点 " + node.getId() + " 响应时间: " + responseTime + "ms");
}

// 记录指标
metricsCollector.recordResponseTime(node.getId(), responseTime);
}
} catch (Exception e) {
System.err.println("监控响应时间失败: " + e.getMessage());
}
}

/**
* 监控错误率
*/
private void monitorErrorRate() {
try {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
double errorRate = calculateErrorRate(node);

// 检查错误率阈值
if (errorRate > 0.01) { // 1%
alertManager.sendAlert("错误率过高",
"节点 " + node.getId() + " 错误率: " + errorRate);
}

// 记录指标
metricsCollector.recordErrorRate(node.getId(), errorRate);
}
} catch (Exception e) {
System.err.println("监控错误率失败: " + e.getMessage());
}
}

/**
* 监控内存使用率
*/
private void monitorMemoryUsage() {
try {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
double memoryUsage = getMemoryUsage(node);

// 检查内存使用率阈值
if (memoryUsage > 0.8) { // 80%
alertManager.sendAlert("内存使用率过高",
"节点 " + node.getId() + " 内存使用率: " + memoryUsage);
}

// 记录指标
metricsCollector.recordMemoryUsage(node.getId(), memoryUsage);
}
} catch (Exception e) {
System.err.println("监控内存使用率失败: " + e.getMessage());
}
}

/**
* 获取QPS指标
*/
private QPSMetrics getQPSMetrics(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

// 获取Redis INFO信息
String info = jedis.info("stats");
Map<String, String> stats = parseInfo(info);

// 计算QPS
long totalCommands = Long.parseLong(stats.getOrDefault("total_commands_processed", "0"));
long currentQPS = calculateQPS(node.getId(), totalCommands);

jedis.close();

return new QPSMetrics(currentQPS, node.getMaxQPS());
} catch (Exception e) {
return new QPSMetrics(0, node.getMaxQPS());
}
}

/**
* 计算QPS
*/
private long calculateQPS(String nodeId, long totalCommands) {
QPSMetrics previousMetrics = nodeMetrics.get(nodeId);
if (previousMetrics == null) {
return 0;
}

long previousCommands = previousMetrics.getTotalCommands();
long commandsDiff = totalCommands - previousCommands;

// 每秒的命令数
return commandsDiff;
}

/**
* 测量响应时间
*/
private long measureResponseTime(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

long startTime = System.currentTimeMillis();
jedis.ping();
long endTime = System.currentTimeMillis();

jedis.close();

return endTime - startTime;
} catch (Exception e) {
return -1;
}
}

/**
* 计算错误率
*/
private double calculateErrorRate(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

String info = jedis.info("stats");
Map<String, String> stats = parseInfo(info);

long totalCommands = Long.parseLong(stats.getOrDefault("total_commands_processed", "0"));
long rejectedCommands = Long.parseLong(stats.getOrDefault("rejected_connections", "0"));

jedis.close();

if (totalCommands == 0) {
return 0.0;
}

return (double) rejectedCommands / totalCommands;
} catch (Exception e) {
return 0.0;
}
}

/**
* 获取内存使用率
*/
private double getMemoryUsage(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

String info = jedis.info("memory");
Map<String, String> memoryInfo = parseInfo(info);

long usedMemory = Long.parseLong(memoryInfo.getOrDefault("used_memory", "0"));
long maxMemory = Long.parseLong(memoryInfo.getOrDefault("maxmemory", "0"));

jedis.close();

if (maxMemory == 0) {
return 0.0;
}

return (double) usedMemory / maxMemory;
} catch (Exception e) {
return 0.0;
}
}

/**
* 解析Redis INFO信息
*/
private Map<String, String> parseInfo(String info) {
Map<String, String> result = new HashMap<>();

String[] lines = info.split("\r\n");
for (String line : lines) {
if (line.contains(":")) {
String[] parts = line.split(":", 2);
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
}

return result;
}

/**
* 获取节点QPS指标
*/
public QPSMetrics getNodeQPSMetrics(String nodeId) {
return nodeMetrics.get(nodeId);
}

/**
* 获取集群总QPS
*/
public long getClusterTotalQPS() {
return nodeMetrics.values().stream()
.mapToLong(QPSMetrics::getCurrentQPS)
.sum();
}
}

/**
* QPS指标
*/
class QPSMetrics {
private final long currentQPS;
private final long maxQPS;
private final long totalCommands;
private final long timestamp;

public QPSMetrics(long currentQPS, long maxQPS) {
this.currentQPS = currentQPS;
this.maxQPS = maxQPS;
this.totalCommands = 0;
this.timestamp = System.currentTimeMillis();
}

public QPSMetrics(long currentQPS, long maxQPS, long totalCommands) {
this.currentQPS = currentQPS;
this.maxQPS = maxQPS;
this.totalCommands = totalCommands;
this.timestamp = System.currentTimeMillis();
}

public long getCurrentQPS() { return currentQPS; }
public long getMaxQPS() { return maxQPS; }
public long getTotalCommands() { return totalCommands; }
public long getTimestamp() { return timestamp; }

public double getQPSUtilization() {
if (maxQPS == 0) return 0.0;
return (double) currentQPS / maxQPS;
}
}

/**
* Redis节点
*/
class RedisNode {
private final String id;
private final String host;
private final int port;
private final long maxQPS;
private final boolean isMaster;

public RedisNode(String id, String host, int port, long maxQPS, boolean isMaster) {
this.id = id;
this.host = host;
this.port = port;
this.maxQPS = maxQPS;
this.isMaster = isMaster;
}

public String getId() { return id; }
public String getHost() { return host; }
public int getPort() { return port; }
public long getMaxQPS() { return maxQPS; }
public boolean isMaster() { return isMaster; }

public String getAddress() {
return host + ":" + port;
}
}

/**
* Redis集群管理器
*/
class RedisClusterManager {
private final List<RedisNode> nodes;

public RedisClusterManager() {
this.nodes = new CopyOnWriteArrayList<>();
}

public void addNode(RedisNode node) {
nodes.add(node);
}

public void removeNode(String nodeId) {
nodes.removeIf(node -> node.getId().equals(nodeId));
}

public List<RedisNode> getAllNodes() {
return new ArrayList<>(nodes);
}

public RedisNode getNode(String nodeId) {
return nodes.stream()
.filter(node -> node.getId().equals(nodeId))
.findFirst()
.orElse(null);
}
}

2.3 QPS控制器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
* Redis QPS控制器
* 实现QPS的动态控制和调整
*/
public class RedisQPSController {
private final RedisQPSMonitor monitor;
private final RedisClusterManager clusterManager;
private final QPSAdjustmentStrategy adjustmentStrategy;
private final ScheduledExecutorService controllerExecutor;

public RedisQPSController(RedisQPSMonitor monitor, RedisClusterManager clusterManager) {
this.monitor = monitor;
this.clusterManager = clusterManager;
this.adjustmentStrategy = new AdaptiveQPSStrategy();
this.controllerExecutor = Executors.newScheduledThreadPool(2);

// 启动QPS控制
startQPSControl();
}

/**
* 启动QPS控制
*/
private void startQPSControl() {
// 定期调整QPS
controllerExecutor.scheduleAtFixedRate(this::adjustQPS, 0, 30, TimeUnit.SECONDS);

// 定期检查QPS状态
controllerExecutor.scheduleAtFixedRate(this::checkQPSStatus, 0, 10, TimeUnit.SECONDS);
}

/**
* 调整QPS
*/
private void adjustQPS() {
try {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
QPSMetrics metrics = monitor.getNodeQPSMetrics(node.getId());
if (metrics == null) continue;

// 计算QPS利用率
double utilization = metrics.getQPSUtilization();

// 根据利用率调整QPS
QPSAdjustment adjustment = adjustmentStrategy.calculateAdjustment(utilization, node);

if (adjustment.getAction() == QPSAction.INCREASE) {
increaseQPS(node, adjustment.getAmount());
} else if (adjustment.getAction() == QPSAction.DECREASE) {
decreaseQPS(node, adjustment.getAmount());
}
}
} catch (Exception e) {
System.err.println("调整QPS失败: " + e.getMessage());
}
}

/**
* 检查QPS状态
*/
private void checkQPSStatus() {
try {
long clusterTotalQPS = monitor.getClusterTotalQPS();
long clusterMaxQPS = clusterManager.getAllNodes().stream()
.mapToLong(RedisNode::getMaxQPS)
.sum();

double clusterUtilization = (double) clusterTotalQPS / clusterMaxQPS;

if (clusterUtilization > 0.9) {
// 集群QPS利用率过高,需要扩容
triggerScaling(ScalingAction.SCALE_OUT);
} else if (clusterUtilization < 0.3) {
// 集群QPS利用率过低,可以考虑缩容
triggerScaling(ScalingAction.SCALE_IN);
}
} catch (Exception e) {
System.err.println("检查QPS状态失败: " + e.getMessage());
}
}

/**
* 增加QPS
*/
private void increaseQPS(RedisNode node, long amount) {
try {
// 调整Redis配置
adjustRedisConfig(node, "maxmemory-policy", "allkeys-lru");
adjustRedisConfig(node, "timeout", "0");

// 记录调整日志
System.out.println("增加节点 " + node.getId() + " QPS: " + amount);
} catch (Exception e) {
System.err.println("增加QPS失败: " + e.getMessage());
}
}

/**
* 减少QPS
*/
private void decreaseQPS(RedisNode node, long amount) {
try {
// 调整Redis配置
adjustRedisConfig(node, "maxmemory-policy", "volatile-lru");
adjustRedisConfig(node, "timeout", "300");

// 记录调整日志
System.out.println("减少节点 " + node.getId() + " QPS: " + amount);
} catch (Exception e) {
System.err.println("减少QPS失败: " + e.getMessage());
}
}

/**
* 调整Redis配置
*/
private void adjustRedisConfig(RedisNode node, String key, String value) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());
jedis.configSet(key, value);
jedis.close();
} catch (Exception e) {
System.err.println("调整Redis配置失败: " + e.getMessage());
}
}

/**
* 触发扩缩容
*/
private void triggerScaling(ScalingAction action) {
// 实现扩缩容逻辑
System.out.println("触发扩缩容: " + action);
}
}

/**
* QPS调整策略
*/
interface QPSAdjustmentStrategy {
QPSAdjustment calculateAdjustment(double utilization, RedisNode node);
}

/**
* 自适应QPS策略
*/
class AdaptiveQPSStrategy implements QPSAdjustmentStrategy {
private static final double HIGH_UTILIZATION_THRESHOLD = 0.8;
private static final double LOW_UTILIZATION_THRESHOLD = 0.3;
private static final long DEFAULT_ADJUSTMENT_AMOUNT = 1000;

@Override
public QPSAdjustment calculateAdjustment(double utilization, RedisNode node) {
if (utilization > HIGH_UTILIZATION_THRESHOLD) {
// 利用率过高,减少QPS
long amount = (long) (node.getMaxQPS() * 0.1);
return new QPSAdjustment(QPSAction.DECREASE, amount);
} else if (utilization < LOW_UTILIZATION_THRESHOLD) {
// 利用率过低,增加QPS
long amount = (long) (node.getMaxQPS() * 0.1);
return new QPSAdjustment(QPSAction.INCREASE, amount);
} else {
// 利用率正常,不调整
return new QPSAdjustment(QPSAction.NO_CHANGE, 0);
}
}
}

enum QPSAction {
INCREASE, DECREASE, NO_CHANGE
}

class QPSAdjustment {
private final QPSAction action;
private final long amount;

public QPSAdjustment(QPSAction action, long amount) {
this.action = action;
this.amount = amount;
}

public QPSAction getAction() { return action; }
public long getAmount() { return amount; }
}

enum ScalingAction {
SCALE_OUT, SCALE_IN
}

3. 限流算法深度实现

3.1 限流算法分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
graph TD
A[限流算法] --> B[计数器算法]
A --> C[滑动窗口算法]
A --> D[令牌桶算法]
A --> E[漏桶算法]

B --> B1[固定窗口]
B --> B2[滑动窗口]

C --> C1[时间窗口]
C --> C2[请求窗口]

D --> D1[固定速率]
D --> D2[动态速率]

E --> E1[固定容量]
E --> E2[动态容量]

3.2 令牌桶限流算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
/**
* 令牌桶限流器
* 支持固定速率和动态速率调整
*/
public class TokenBucketRateLimiter {
private final long capacity;
private final long refillRate;
private final AtomicLong tokens;
private final AtomicLong lastRefillTime;
private final ScheduledExecutorService refillExecutor;

public TokenBucketRateLimiter(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = new AtomicLong(capacity);
this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
this.refillExecutor = Executors.newScheduledThreadPool(1);

// 启动令牌补充
startTokenRefill();
}

/**
* 尝试获取令牌
*/
public boolean tryAcquire() {
return tryAcquire(1);
}

/**
* 尝试获取指定数量的令牌
*/
public boolean tryAcquire(long requestedTokens) {
if (requestedTokens <= 0) {
return true;
}

refillTokens();

long currentTokens = tokens.get();
if (currentTokens >= requestedTokens) {
return tokens.compareAndSet(currentTokens, currentTokens - requestedTokens);
}

return false;
}

/**
* 获取令牌(阻塞)
*/
public void acquire() throws InterruptedException {
acquire(1);
}

/**
* 获取指定数量的令牌(阻塞)
*/
public void acquire(long requestedTokens) throws InterruptedException {
while (!tryAcquire(requestedTokens)) {
Thread.sleep(calculateWaitTime(requestedTokens));
}
}

/**
* 补充令牌
*/
private void refillTokens() {
long now = System.currentTimeMillis();
long lastRefill = lastRefillTime.get();

if (now > lastRefill) {
long timePassed = now - lastRefill;
long tokensToAdd = timePassed * refillRate / 1000;

if (tokensToAdd > 0) {
long currentTokens = tokens.get();
long newTokens = Math.min(capacity, currentTokens + tokensToAdd);

if (tokens.compareAndSet(currentTokens, newTokens)) {
lastRefillTime.compareAndSet(lastRefill, now);
}
}
}
}

/**
* 启动令牌补充
*/
private void startTokenRefill() {
refillExecutor.scheduleAtFixedRate(this::refillTokens, 0, 100, TimeUnit.MILLISECONDS);
}

/**
* 计算等待时间
*/
private long calculateWaitTime(long requestedTokens) {
long currentTokens = tokens.get();
if (currentTokens >= requestedTokens) {
return 0;
}

long tokensNeeded = requestedTokens - currentTokens;
return tokensNeeded * 1000 / refillRate;
}

/**
* 获取当前令牌数
*/
public long getCurrentTokens() {
refillTokens();
return tokens.get();
}

/**
* 获取容量
*/
public long getCapacity() {
return capacity;
}

/**
* 获取补充速率
*/
public long getRefillRate() {
return refillRate;
}

/**
* 动态调整补充速率
*/
public void adjustRefillRate(long newRefillRate) {
// 实现动态调整逻辑
System.out.println("调整补充速率: " + newRefillRate);
}
}

/**
* 分布式令牌桶限流器
* 基于Redis实现分布式限流
*/
public class DistributedTokenBucketRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
private final String keyPrefix;
private final long capacity;
private final long refillRate;

public DistributedTokenBucketRateLimiter(RedisTemplate<String, String> redisTemplate,
String keyPrefix, long capacity, long refillRate) {
this.redisTemplate = redisTemplate;
this.keyPrefix = keyPrefix;
this.capacity = capacity;
this.refillRate = refillRate;
}

/**
* 尝试获取令牌
*/
public boolean tryAcquire(String key) {
return tryAcquire(key, 1);
}

/**
* 尝试获取指定数量的令牌
*/
public boolean tryAcquire(String key, long requestedTokens) {
String redisKey = keyPrefix + ":" + key;

// 使用Lua脚本保证原子性
String luaScript = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refillRate = tonumber(ARGV[2])
local requestedTokens = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')
local tokens = tonumber(bucket[1]) or capacity
local lastRefillTime = tonumber(bucket[2]) or now

-- 计算需要补充的令牌数
local timePassed = now - lastRefillTime
local tokensToAdd = math.floor(timePassed * refillRate / 1000)

if tokensToAdd > 0 then
tokens = math.min(capacity, tokens + tokensToAdd)
lastRefillTime = now
end

-- 检查是否有足够的令牌
if tokens >= requestedTokens then
tokens = tokens - requestedTokens
redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', lastRefillTime)
redis.call('EXPIRE', key, 3600)
return 1
else
redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', lastRefillTime)
redis.call('EXPIRE', key, 3600)
return 0
end
""";

DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);

Long result = redisTemplate.execute(script,
Collections.singletonList(redisKey),
String.valueOf(capacity),
String.valueOf(refillRate),
String.valueOf(requestedTokens),
String.valueOf(System.currentTimeMillis()));

return result != null && result == 1;
}

/**
* 获取当前令牌数
*/
public long getCurrentTokens(String key) {
String redisKey = keyPrefix + ":" + key;

String luaScript = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refillRate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')
local tokens = tonumber(bucket[1]) or capacity
local lastRefillTime = tonumber(bucket[2]) or now

-- 计算需要补充的令牌数
local timePassed = now - lastRefillTime
local tokensToAdd = math.floor(timePassed * refillRate / 1000)

if tokensToAdd > 0 then
tokens = math.min(capacity, tokens + tokensToAdd)
lastRefillTime = now
redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', lastRefillTime)
redis.call('EXPIRE', key, 3600)
end

return tokens
""";

DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);

Long result = redisTemplate.execute(script,
Collections.singletonList(redisKey),
String.valueOf(capacity),
String.valueOf(refillRate),
String.valueOf(System.currentTimeMillis()));

return result != null ? result : 0;
}
}

3.3 滑动窗口限流算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/**
* 滑动窗口限流器
* 基于时间窗口的精确限流
*/
public class SlidingWindowRateLimiter {
private final long windowSize;
private final long maxRequests;
private final Map<String, List<Long>> requestWindows;
private final ScheduledExecutorService cleanupExecutor;

public SlidingWindowRateLimiter(long windowSize, long maxRequests) {
this.windowSize = windowSize;
this.maxRequests = maxRequests;
this.requestWindows = new ConcurrentHashMap<>();
this.cleanupExecutor = Executors.newScheduledThreadPool(1);

// 启动清理任务
startCleanup();
}

/**
* 尝试获取请求许可
*/
public boolean tryAcquire(String key) {
return tryAcquire(key, 1);
}

/**
* 尝试获取指定数量的请求许可
*/
public boolean tryAcquire(String key, long requestedRequests) {
long now = System.currentTimeMillis();
List<Long> window = requestWindows.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());

// 清理过期请求
cleanupExpiredRequests(window, now);

// 检查是否超过限制
if (window.size() + requestedRequests > maxRequests) {
return false;
}

// 添加请求时间戳
for (int i = 0; i < requestedRequests; i++) {
window.add(now);
}

return true;
}

/**
* 清理过期请求
*/
private void cleanupExpiredRequests(List<Long> window, long now) {
long cutoffTime = now - windowSize;
window.removeIf(timestamp -> timestamp < cutoffTime);
}

/**
* 启动清理任务
*/
private void startCleanup() {
cleanupExecutor.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
long cutoffTime = now - windowSize;

requestWindows.entrySet().removeIf(entry -> {
List<Long> window = entry.getValue();
window.removeIf(timestamp -> timestamp < cutoffTime);
return window.isEmpty();
});
}, 0, windowSize / 4, TimeUnit.MILLISECONDS);
}

/**
* 获取当前请求数
*/
public long getCurrentRequests(String key) {
long now = System.currentTimeMillis();
List<Long> window = requestWindows.get(key);

if (window == null) {
return 0;
}

cleanupExpiredRequests(window, now);
return window.size();
}

/**
* 获取剩余请求数
*/
public long getRemainingRequests(String key) {
return maxRequests - getCurrentRequests(key);
}

/**
* 获取窗口大小
*/
public long getWindowSize() {
return windowSize;
}

/**
* 获取最大请求数
*/
public long getMaxRequests() {
return maxRequests;
}
}

/**
* 分布式滑动窗口限流器
* 基于Redis实现分布式滑动窗口限流
*/
public class DistributedSlidingWindowRateLimiter {
private final RedisTemplate<String, String> redisTemplate;
private final String keyPrefix;
private final long windowSize;
private final long maxRequests;

public DistributedSlidingWindowRateLimiter(RedisTemplate<String, String> redisTemplate,
String keyPrefix, long windowSize, long maxRequests) {
this.redisTemplate = redisTemplate;
this.keyPrefix = keyPrefix;
this.windowSize = windowSize;
this.maxRequests = maxRequests;
}

/**
* 尝试获取请求许可
*/
public boolean tryAcquire(String key) {
return tryAcquire(key, 1);
}

/**
* 尝试获取指定数量的请求许可
*/
public boolean tryAcquire(String key, long requestedRequests) {
String redisKey = keyPrefix + ":" + key;

// 使用Lua脚本保证原子性
String luaScript = """
local key = KEYS[1]
local windowSize = tonumber(ARGV[1])
local maxRequests = tonumber(ARGV[2])
local requestedRequests = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

-- 清理过期请求
local cutoffTime = now - windowSize
redis.call('ZREMRANGEBYSCORE', key, 0, cutoffTime)

-- 获取当前请求数
local currentRequests = redis.call('ZCARD', key)

-- 检查是否超过限制
if currentRequests + requestedRequests > maxRequests then
return 0
end

-- 添加请求时间戳
for i = 1, requestedRequests do
redis.call('ZADD', key, now + i * 0.001, now + i * 0.001)
end

redis.call('EXPIRE', key, math.ceil(windowSize / 1000))
return 1
""";

DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);

Long result = redisTemplate.execute(script,
Collections.singletonList(redisKey),
String.valueOf(windowSize),
String.valueOf(maxRequests),
String.valueOf(requestedRequests),
String.valueOf(System.currentTimeMillis()));

return result != null && result == 1;
}

/**
* 获取当前请求数
*/
public long getCurrentRequests(String key) {
String redisKey = keyPrefix + ":" + key;

String luaScript = """
local key = KEYS[1]
local windowSize = tonumber(ARGV[1])
local now = tonumber(ARGV[2])

-- 清理过期请求
local cutoffTime = now - windowSize
redis.call('ZREMRANGEBYSCORE', key, 0, cutoffTime)

-- 获取当前请求数
return redis.call('ZCARD', key)
""";

DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);

Long result = redisTemplate.execute(script,
Collections.singletonList(redisKey),
String.valueOf(windowSize),
String.valueOf(System.currentTimeMillis()));

return result != null ? result : 0;
}

/**
* 获取剩余请求数
*/
public long getRemainingRequests(String key) {
return maxRequests - getCurrentRequests(key);
}
}

3.4 漏桶限流算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* 漏桶限流器
* 控制请求的流出速率
*/
public class LeakyBucketRateLimiter {
private final long capacity;
private final long leakRate;
private final AtomicLong currentLevel;
private final AtomicLong lastLeakTime;
private final ScheduledExecutorService leakExecutor;

public LeakyBucketRateLimiter(long capacity, long leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
this.currentLevel = new AtomicLong(0);
this.lastLeakTime = new AtomicLong(System.currentTimeMillis());
this.leakExecutor = Executors.newScheduledThreadPool(1);

// 启动漏水
startLeaking();
}

/**
* 尝试添加请求
*/
public boolean tryAddRequest() {
return tryAddRequest(1);
}

/**
* 尝试添加指定数量的请求
*/
public boolean tryAddRequest(long requestedAmount) {
if (requestedAmount <= 0) {
return true;
}

leakWater();

long current = currentLevel.get();
if (current + requestedAmount <= capacity) {
return currentLevel.compareAndSet(current, current + requestedAmount);
}

return false;
}

/**
* 添加请求(阻塞)
*/
public void addRequest() throws InterruptedException {
addRequest(1);
}

/**
* 添加指定数量的请求(阻塞)
*/
public void addRequest(long requestedAmount) throws InterruptedException {
while (!tryAddRequest(requestedAmount)) {
Thread.sleep(calculateWaitTime(requestedAmount));
}
}

/**
* 漏水
*/
private void leakWater() {
long now = System.currentTimeMillis();
long lastLeak = lastLeakTime.get();

if (now > lastLeak) {
long timePassed = now - lastLeak;
long waterToLeak = timePassed * leakRate / 1000;

if (waterToLeak > 0) {
long current = currentLevel.get();
long newLevel = Math.max(0, current - waterToLeak);

if (currentLevel.compareAndSet(current, newLevel)) {
lastLeakTime.compareAndSet(lastLeak, now);
}
}
}
}

/**
* 启动漏水
*/
private void startLeaking() {
leakExecutor.scheduleAtFixedRate(this::leakWater, 0, 100, TimeUnit.MILLISECONDS);
}

/**
* 计算等待时间
*/
private long calculateWaitTime(long requestedAmount) {
long current = currentLevel.get();
if (current + requestedAmount <= capacity) {
return 0;
}

long excess = current + requestedAmount - capacity;
return excess * 1000 / leakRate;
}

/**
* 获取当前水位
*/
public long getCurrentLevel() {
leakWater();
return currentLevel.get();
}

/**
* 获取容量
*/
public long getCapacity() {
return capacity;
}

/**
* 获取漏水速率
*/
public long getLeakRate() {
return leakRate;
}

/**
* 获取剩余容量
*/
public long getRemainingCapacity() {
return capacity - getCurrentLevel();
}
}

4. 弹性扩缩容机制

4.1 弹性扩缩容架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
graph TB
A[监控系统] --> B[扩缩容决策器]
B --> C[扩缩容执行器]
C --> D[Redis集群]

E[QPS监控] --> A
F[CPU监控] --> A
G[内存监控] --> A
H[延迟监控] --> A

I[扩容策略] --> B
J[缩容策略] --> B
K[健康检查] --> B

L[节点添加] --> C
M[节点移除] --> C
N[负载重分配] --> C

4.2 弹性扩缩容实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
/**
* Redis弹性扩缩容管理器
* 实现基于指标的自动扩缩容
*/
public class RedisElasticScalingManager {
private final RedisClusterManager clusterManager;
private final RedisQPSMonitor qpsMonitor;
private final ScalingDecisionEngine decisionEngine;
private final ScalingExecutor scalingExecutor;
private final ScheduledExecutorService scalingExecutorService;

public RedisElasticScalingManager(RedisClusterManager clusterManager,
RedisQPSMonitor qpsMonitor) {
this.clusterManager = clusterManager;
this.qpsMonitor = qpsMonitor;
this.decisionEngine = new ScalingDecisionEngine();
this.scalingExecutor = new ScalingExecutor(clusterManager);
this.scalingExecutorService = Executors.newScheduledThreadPool(2);

// 启动扩缩容监控
startScalingMonitoring();
}

/**
* 启动扩缩容监控
*/
private void startScalingMonitoring() {
// 定期检查扩缩容需求
scalingExecutorService.scheduleAtFixedRate(this::checkScalingNeeds,
0, 60, TimeUnit.SECONDS);

// 定期执行扩缩容操作
scalingExecutorService.scheduleAtFixedRate(this::executeScaling,
0, 300, TimeUnit.SECONDS);
}

/**
* 检查扩缩容需求
*/
private void checkScalingNeeds() {
try {
// 收集集群指标
ClusterMetrics metrics = collectClusterMetrics();

// 分析扩缩容需求
ScalingDecision decision = decisionEngine.analyzeScalingNeeds(metrics);

if (decision.getAction() != ScalingAction.NO_SCALING) {
// 记录扩缩容决策
System.out.println("扩缩容决策: " + decision.getAction() +
", 原因: " + decision.getReason());

// 执行扩缩容
executeScalingDecision(decision);
}
} catch (Exception e) {
System.err.println("检查扩缩容需求失败: " + e.getMessage());
}
}

/**
* 执行扩缩容
*/
private void executeScaling() {
try {
// 检查是否有待执行的扩缩容操作
List<ScalingOperation> pendingOperations = scalingExecutor.getPendingOperations();

for (ScalingOperation operation : pendingOperations) {
if (operation.isReadyToExecute()) {
scalingExecutor.executeOperation(operation);
}
}
} catch (Exception e) {
System.err.println("执行扩缩容失败: " + e.getMessage());
}
}

/**
* 执行扩缩容决策
*/
private void executeScalingDecision(ScalingDecision decision) {
try {
ScalingOperation operation = createScalingOperation(decision);
scalingExecutor.scheduleOperation(operation);
} catch (Exception e) {
System.err.println("执行扩缩容决策失败: " + e.getMessage());
}
}

/**
* 创建扩缩容操作
*/
private ScalingOperation createScalingOperation(ScalingDecision decision) {
if (decision.getAction() == ScalingAction.SCALE_OUT) {
return new ScaleOutOperation(decision.getTargetNodeCount());
} else if (decision.getAction() == ScalingAction.SCALE_IN) {
return new ScaleInOperation(decision.getTargetNodeCount());
} else {
throw new IllegalArgumentException("不支持的扩缩容操作: " + decision.getAction());
}
}

/**
* 收集集群指标
*/
private ClusterMetrics collectClusterMetrics() {
ClusterMetrics metrics = new ClusterMetrics();

// 收集QPS指标
long totalQPS = qpsMonitor.getClusterTotalQPS();
metrics.setTotalQPS(totalQPS);

// 收集节点指标
List<RedisNode> nodes = clusterManager.getAllNodes();
metrics.setNodeCount(nodes.size());

// 计算平均QPS
double avgQPS = nodes.isEmpty() ? 0 : (double) totalQPS / nodes.size();
metrics.setAverageQPS(avgQPS);

// 计算QPS利用率
long maxQPS = nodes.stream().mapToLong(RedisNode::getMaxQPS).sum();
double qpsUtilization = maxQPS == 0 ? 0 : (double) totalQPS / maxQPS;
metrics.setQPSUtilization(qpsUtilization);

// 收集其他指标
collectAdditionalMetrics(metrics, nodes);

return metrics;
}

/**
* 收集其他指标
*/
private void collectAdditionalMetrics(ClusterMetrics metrics, List<RedisNode> nodes) {
// 收集CPU使用率
double avgCPUUsage = nodes.stream()
.mapToDouble(this::getNodeCPUUsage)
.average()
.orElse(0.0);
metrics.setAverageCPUUsage(avgCPUUsage);

// 收集内存使用率
double avgMemoryUsage = nodes.stream()
.mapToDouble(this::getNodeMemoryUsage)
.average()
.orElse(0.0);
metrics.setAverageMemoryUsage(avgMemoryUsage);

// 收集延迟
double avgLatency = nodes.stream()
.mapToDouble(this::getNodeLatency)
.average()
.orElse(0.0);
metrics.setAverageLatency(avgLatency);
}

private double getNodeCPUUsage(RedisNode node) {
// 实现CPU使用率获取
return 0.0;
}

private double getNodeMemoryUsage(RedisNode node) {
// 实现内存使用率获取
return 0.0;
}

private double getNodeLatency(RedisNode node) {
// 实现延迟获取
return 0.0;
}
}

/**
* 扩缩容决策引擎
*/
class ScalingDecisionEngine {
private static final double SCALE_OUT_QPS_THRESHOLD = 0.8;
private static final double SCALE_IN_QPS_THRESHOLD = 0.3;
private static final double SCALE_OUT_CPU_THRESHOLD = 0.8;
private static final double SCALE_IN_CPU_THRESHOLD = 0.3;
private static final double SCALE_OUT_MEMORY_THRESHOLD = 0.8;
private static final double SCALE_IN_MEMORY_THRESHOLD = 0.3;
private static final double SCALE_OUT_LATENCY_THRESHOLD = 1000.0;
private static final int MIN_NODE_COUNT = 3;
private static final int MAX_NODE_COUNT = 20;

/**
* 分析扩缩容需求
*/
public ScalingDecision analyzeScalingNeeds(ClusterMetrics metrics) {
// 检查扩容条件
if (shouldScaleOut(metrics)) {
int targetNodeCount = calculateTargetNodeCount(metrics, true);
return new ScalingDecision(ScalingAction.SCALE_OUT, targetNodeCount,
"QPS利用率: " + metrics.getQPSUtilization() +
", CPU使用率: " + metrics.getAverageCPUUsage() +
", 内存使用率: " + metrics.getAverageMemoryUsage());
}

// 检查缩容条件
if (shouldScaleIn(metrics)) {
int targetNodeCount = calculateTargetNodeCount(metrics, false);
return new ScalingDecision(ScalingAction.SCALE_IN, targetNodeCount,
"QPS利用率: " + metrics.getQPSUtilization() +
", CPU使用率: " + metrics.getAverageCPUUsage() +
", 内存使用率: " + metrics.getAverageMemoryUsage());
}

return new ScalingDecision(ScalingAction.NO_SCALING, metrics.getNodeCount(), "无需扩缩容");
}

/**
* 检查是否应该扩容
*/
private boolean shouldScaleOut(ClusterMetrics metrics) {
// 检查节点数量限制
if (metrics.getNodeCount() >= MAX_NODE_COUNT) {
return false;
}

// 检查QPS利用率
if (metrics.getQPSUtilization() > SCALE_OUT_QPS_THRESHOLD) {
return true;
}

// 检查CPU使用率
if (metrics.getAverageCPUUsage() > SCALE_OUT_CPU_THRESHOLD) {
return true;
}

// 检查内存使用率
if (metrics.getAverageMemoryUsage() > SCALE_OUT_MEMORY_THRESHOLD) {
return true;
}

// 检查延迟
if (metrics.getAverageLatency() > SCALE_OUT_LATENCY_THRESHOLD) {
return true;
}

return false;
}

/**
* 检查是否应该缩容
*/
private boolean shouldScaleIn(ClusterMetrics metrics) {
// 检查节点数量限制
if (metrics.getNodeCount() <= MIN_NODE_COUNT) {
return false;
}

// 检查QPS利用率
if (metrics.getQPSUtilization() < SCALE_IN_QPS_THRESHOLD) {
return true;
}

// 检查CPU使用率
if (metrics.getAverageCPUUsage() < SCALE_IN_CPU_THRESHOLD) {
return true;
}

// 检查内存使用率
if (metrics.getAverageMemoryUsage() < SCALE_IN_MEMORY_THRESHOLD) {
return true;
}

return false;
}

/**
* 计算目标节点数量
*/
private int calculateTargetNodeCount(ClusterMetrics metrics, boolean isScaleOut) {
int currentNodeCount = metrics.getNodeCount();

if (isScaleOut) {
// 扩容:基于QPS利用率计算
double qpsUtilization = metrics.getQPSUtilization();
int additionalNodes = (int) Math.ceil((qpsUtilization - SCALE_OUT_QPS_THRESHOLD) * currentNodeCount);

return Math.min(currentNodeCount + additionalNodes, MAX_NODE_COUNT);
} else {
// 缩容:基于QPS利用率计算
double qpsUtilization = metrics.getQPSUtilization();
int removeNodes = (int) Math.floor((SCALE_IN_QPS_THRESHOLD - qpsUtilization) * currentNodeCount);

return Math.max(currentNodeCount - removeNodes, MIN_NODE_COUNT);
}
}
}

/**
* 扩缩容执行器
*/
class ScalingExecutor {
private final RedisClusterManager clusterManager;
private final Queue<ScalingOperation> pendingOperations;
private final Map<String, ScalingOperation> executingOperations;

public ScalingExecutor(RedisClusterManager clusterManager) {
this.clusterManager = clusterManager;
this.pendingOperations = new ConcurrentLinkedQueue<>();
this.executingOperations = new ConcurrentHashMap<>();
}

/**
* 调度扩缩容操作
*/
public void scheduleOperation(ScalingOperation operation) {
pendingOperations.offer(operation);
}

/**
* 执行扩缩容操作
*/
public void executeOperation(ScalingOperation operation) {
try {
executingOperations.put(operation.getId(), operation);

if (operation instanceof ScaleOutOperation) {
executeScaleOut((ScaleOutOperation) operation);
} else if (operation instanceof ScaleInOperation) {
executeScaleIn((ScaleInOperation) operation);
}

executingOperations.remove(operation.getId());
} catch (Exception e) {
System.err.println("执行扩缩容操作失败: " + e.getMessage());
executingOperations.remove(operation.getId());
}
}

/**
* 执行扩容
*/
private void executeScaleOut(ScaleOutOperation operation) {
try {
int targetCount = operation.getTargetNodeCount();
int currentCount = clusterManager.getAllNodes().size();
int nodesToAdd = targetCount - currentCount;

for (int i = 0; i < nodesToAdd; i++) {
// 创建新节点
RedisNode newNode = createNewNode();

// 添加到集群
clusterManager.addNode(newNode);

// 等待节点就绪
waitForNodeReady(newNode);

// 重新分配数据
redistributeData();

System.out.println("扩容完成,添加节点: " + newNode.getId());
}
} catch (Exception e) {
System.err.println("扩容失败: " + e.getMessage());
}
}

/**
* 执行缩容
*/
private void executeScaleIn(ScaleInOperation operation) {
try {
int targetCount = operation.getTargetNodeCount();
int currentCount = clusterManager.getAllNodes().size();
int nodesToRemove = currentCount - targetCount;

for (int i = 0; i < nodesToRemove; i++) {
// 选择要移除的节点
RedisNode nodeToRemove = selectNodeToRemove();

if (nodeToRemove != null) {
// 迁移数据
migrateDataFromNode(nodeToRemove);

// 从集群中移除
clusterManager.removeNode(nodeToRemove.getId());

// 销毁节点
destroyNode(nodeToRemove);

System.out.println("缩容完成,移除节点: " + nodeToRemove.getId());
}
}
} catch (Exception e) {
System.err.println("缩容失败: " + e.getMessage());
}
}

/**
* 创建新节点
*/
private RedisNode createNewNode() {
// 实现新节点创建逻辑
String nodeId = "node-" + System.currentTimeMillis();
String host = "redis-" + nodeId + ".example.com";
int port = 6379;
long maxQPS = 10000;

return new RedisNode(nodeId, host, port, maxQPS, false);
}

/**
* 等待节点就绪
*/
private void waitForNodeReady(RedisNode node) {
// 实现节点就绪等待逻辑
try {
Thread.sleep(5000); // 模拟等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* 重新分配数据
*/
private void redistributeData() {
// 实现数据重新分配逻辑
System.out.println("重新分配数据...");
}

/**
* 选择要移除的节点
*/
private RedisNode selectNodeToRemove() {
List<RedisNode> nodes = clusterManager.getAllNodes();

// 优先移除从节点
return nodes.stream()
.filter(node -> !node.isMaster())
.findFirst()
.orElse(nodes.get(0));
}

/**
* 从节点迁移数据
*/
private void migrateDataFromNode(RedisNode node) {
// 实现数据迁移逻辑
System.out.println("从节点 " + node.getId() + " 迁移数据...");
}

/**
* 销毁节点
*/
private void destroyNode(RedisNode node) {
// 实现节点销毁逻辑
System.out.println("销毁节点: " + node.getId());
}

/**
* 获取待执行的操作
*/
public List<ScalingOperation> getPendingOperations() {
return new ArrayList<>(pendingOperations);
}
}

/**
* 集群指标
*/
class ClusterMetrics {
private long totalQPS;
private int nodeCount;
private double averageQPS;
private double qpsUtilization;
private double averageCPUUsage;
private double averageMemoryUsage;
private double averageLatency;

// getters and setters
public long getTotalQPS() { return totalQPS; }
public void setTotalQPS(long totalQPS) { this.totalQPS = totalQPS; }
public int getNodeCount() { return nodeCount; }
public void setNodeCount(int nodeCount) { this.nodeCount = nodeCount; }
public double getAverageQPS() { return averageQPS; }
public void setAverageQPS(double averageQPS) { this.averageQPS = averageQPS; }
public double getQPSUtilization() { return qpsUtilization; }
public void setQPSUtilization(double qpsUtilization) { this.qpsUtilization = qpsUtilization; }
public double getAverageCPUUsage() { return averageCPUUsage; }
public void setAverageCPUUsage(double averageCPUUsage) { this.averageCPUUsage = averageCPUUsage; }
public double getAverageMemoryUsage() { return averageMemoryUsage; }
public void setAverageMemoryUsage(double averageMemoryUsage) { this.averageMemoryUsage = averageMemoryUsage; }
public double getAverageLatency() { return averageLatency; }
public void setAverageLatency(double averageLatency) { this.averageLatency = averageLatency; }
}

/**
* 扩缩容决策
*/
class ScalingDecision {
private final ScalingAction action;
private final int targetNodeCount;
private final String reason;

public ScalingDecision(ScalingAction action, int targetNodeCount, String reason) {
this.action = action;
this.targetNodeCount = targetNodeCount;
this.reason = reason;
}

public ScalingAction getAction() { return action; }
public int getTargetNodeCount() { return targetNodeCount; }
public String getReason() { return reason; }
}

enum ScalingAction {
SCALE_OUT, SCALE_IN, NO_SCALING
}

/**
* 扩缩容操作基类
*/
abstract class ScalingOperation {
private final String id;
private final long createTime;
private final int targetNodeCount;

public ScalingOperation(int targetNodeCount) {
this.id = "op-" + System.currentTimeMillis();
this.createTime = System.currentTimeMillis();
this.targetNodeCount = targetNodeCount;
}

public String getId() { return id; }
public long getCreateTime() { return createTime; }
public int getTargetNodeCount() { return targetNodeCount; }

public boolean isReadyToExecute() {
return System.currentTimeMillis() - createTime > 5000; // 5秒后执行
}
}

/**
* 扩容操作
*/
class ScaleOutOperation extends ScalingOperation {
public ScaleOutOperation(int targetNodeCount) {
super(targetNodeCount);
}
}

/**
* 缩容操作
*/
class ScaleInOperation extends ScalingOperation {
public ScaleInOperation(int targetNodeCount) {
super(targetNodeCount);
}
}

5. 企业级Redis集群请求数管理

5.1 Redis集群请求数管理架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
/**
* 企业级Redis集群请求数管理器
* 实现多维度请求数管理、智能路由、故障转移
*/
public class EnterpriseRedisClusterManager {
private final RedisClusterManager clusterManager;
private final RedisQPSMonitor qpsMonitor;
private final RedisQPSController qpsController;
private final RateLimiterManager rateLimiterManager;
private final RedisElasticScalingManager scalingManager;
private final RequestRouter requestRouter;
private final HealthChecker healthChecker;

public EnterpriseRedisClusterManager() {
this.clusterManager = new RedisClusterManager();
this.qpsMonitor = new RedisQPSMonitor(clusterManager);
this.qpsController = new RedisQPSController(qpsMonitor, clusterManager);
this.rateLimiterManager = new RateLimiterManager();
this.scalingManager = new RedisElasticScalingManager(clusterManager, qpsMonitor);
this.requestRouter = new RequestRouter(clusterManager);
this.healthChecker = new HealthChecker(clusterManager);

// 初始化集群
initializeCluster();
}

/**
* 初始化集群
*/
private void initializeCluster() {
// 添加初始节点
addInitialNodes();

// 配置限流器
configureRateLimiters();

// 启动健康检查
healthChecker.start();
}

/**
* 添加初始节点
*/
private void addInitialNodes() {
// 添加主节点
RedisNode master1 = new RedisNode("master-1", "redis-master-1.example.com", 6379, 15000, true);
RedisNode master2 = new RedisNode("master-2", "redis-master-2.example.com", 6379, 15000, true);
RedisNode master3 = new RedisNode("master-3", "redis-master-3.example.com", 6379, 15000, true);

clusterManager.addNode(master1);
clusterManager.addNode(master2);
clusterManager.addNode(master3);

// 添加从节点
RedisNode slave1 = new RedisNode("slave-1", "redis-slave-1.example.com", 6379, 10000, false);
RedisNode slave2 = new RedisNode("slave-2", "redis-slave-2.example.com", 6379, 10000, false);
RedisNode slave3 = new RedisNode("slave-3", "redis-slave-3.example.com", 6379, 10000, false);

clusterManager.addNode(slave1);
clusterManager.addNode(slave2);
clusterManager.addNode(slave3);
}

/**
* 配置限流器
*/
private void configureRateLimiters() {
// 配置全局限流器
TokenBucketRateLimiter globalLimiter = new TokenBucketRateLimiter(100000, 1000);
rateLimiterManager.addLimiter("global", globalLimiter);

// 配置用户限流器
DistributedTokenBucketRateLimiter userLimiter = new DistributedTokenBucketRateLimiter(
null, "user", 1000, 100);
rateLimiterManager.addLimiter("user", userLimiter);

// 配置API限流器
SlidingWindowRateLimiter apiLimiter = new SlidingWindowRateLimiter(60000, 1000);
rateLimiterManager.addLimiter("api", apiLimiter);
}

/**
* 处理Redis请求
*/
public RedisResponse handleRequest(RedisRequest request) {
try {
// 1. 全局限流检查
if (!rateLimiterManager.tryAcquire("global", "global")) {
return RedisResponse.rateLimited("全局限流");
}

// 2. 用户限流检查
if (!rateLimiterManager.tryAcquire("user", request.getUserId())) {
return RedisResponse.rateLimited("用户限流");
}

// 3. API限流检查
if (!rateLimiterManager.tryAcquire("api", request.getApiKey())) {
return RedisResponse.rateLimited("API限流");
}

// 4. 选择目标节点
RedisNode targetNode = requestRouter.selectNode(request);
if (targetNode == null) {
return RedisResponse.error("没有可用的节点");
}

// 5. 执行Redis操作
RedisResponse response = executeRedisOperation(targetNode, request);

// 6. 记录指标
recordMetrics(request, response, targetNode);

return response;

} catch (Exception e) {
return RedisResponse.error("请求处理失败: " + e.getMessage());
}
}

/**
* 执行Redis操作
*/
private RedisResponse executeRedisOperation(RedisNode node, RedisRequest request) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

long startTime = System.currentTimeMillis();
String result = null;

switch (request.getOperation()) {
case GET:
result = jedis.get(request.getKey());
break;
case SET:
result = jedis.set(request.getKey(), request.getValue());
break;
case DEL:
result = String.valueOf(jedis.del(request.getKey()));
break;
case EXISTS:
result = String.valueOf(jedis.exists(request.getKey()));
break;
case INCR:
result = String.valueOf(jedis.incr(request.getKey()));
break;
case DECR:
result = String.valueOf(jedis.decr(request.getKey()));
break;
default:
throw new IllegalArgumentException("不支持的操作: " + request.getOperation());
}

long endTime = System.currentTimeMillis();

jedis.close();

return RedisResponse.success(result, endTime - startTime);

} catch (Exception e) {
return RedisResponse.error("Redis操作失败: " + e.getMessage());
}
}

/**
* 记录指标
*/
private void recordMetrics(RedisRequest request, RedisResponse response, RedisNode node) {
// 记录QPS
qpsMonitor.recordRequest(node.getId(), request, response);

// 记录响应时间
qpsMonitor.recordResponseTime(node.getId(), response.getResponseTime());

// 记录错误率
if (!response.isSuccess()) {
qpsMonitor.recordError(node.getId(), response.getError());
}
}

/**
* 获取集群状态
*/
public ClusterStatus getClusterStatus() {
ClusterStatus status = new ClusterStatus();

// 收集节点状态
List<RedisNode> nodes = clusterManager.getAllNodes();
status.setNodeCount(nodes.size());

// 收集QPS信息
long totalQPS = qpsMonitor.getClusterTotalQPS();
status.setTotalQPS(totalQPS);

// 收集健康状态
Map<String, Boolean> nodeHealth = healthChecker.getNodeHealthStatus();
status.setNodeHealth(nodeHealth);

// 收集限流状态
Map<String, Long> limiterStatus = rateLimiterManager.getLimiterStatus();
status.setLimiterStatus(limiterStatus);

return status;
}

/**
* 动态调整限流配置
*/
public void adjustRateLimit(String limiterName, long newCapacity, long newRate) {
rateLimiterManager.adjustLimiter(limiterName, newCapacity, newRate);
}

/**
* 手动触发扩缩容
*/
public void triggerScaling(ScalingAction action, int targetNodeCount) {
ScalingDecision decision = new ScalingDecision(action, targetNodeCount, "手动触发");
scalingManager.executeScalingDecision(decision);
}
}

/**
* 限流器管理器
*/
class RateLimiterManager {
private final Map<String, Object> limiters;

public RateLimiterManager() {
this.limiters = new ConcurrentHashMap<>();
}

/**
* 添加限流器
*/
public void addLimiter(String name, Object limiter) {
limiters.put(name, limiter);
}

/**
* 尝试获取许可
*/
public boolean tryAcquire(String limiterName, String key) {
Object limiter = limiters.get(limiterName);
if (limiter == null) {
return true; // 没有限流器,允许通过
}

if (limiter instanceof TokenBucketRateLimiter) {
return ((TokenBucketRateLimiter) limiter).tryAcquire();
} else if (limiter instanceof SlidingWindowRateLimiter) {
return ((SlidingWindowRateLimiter) limiter).tryAcquire(key);
} else if (limiter instanceof DistributedTokenBucketRateLimiter) {
return ((DistributedTokenBucketRateLimiter) limiter).tryAcquire(key);
}

return true;
}

/**
* 调整限流器
*/
public void adjustLimiter(String limiterName, long newCapacity, long newRate) {
Object limiter = limiters.get(limiterName);
if (limiter instanceof TokenBucketRateLimiter) {
((TokenBucketRateLimiter) limiter).adjustRefillRate(newRate);
}
}

/**
* 获取限流器状态
*/
public Map<String, Long> getLimiterStatus() {
Map<String, Long> status = new HashMap<>();

for (Map.Entry<String, Object> entry : limiters.entrySet()) {
String name = entry.getKey();
Object limiter = entry.getValue();

if (limiter instanceof TokenBucketRateLimiter) {
status.put(name, ((TokenBucketRateLimiter) limiter).getCurrentTokens());
}
}

return status;
}
}

/**
* 请求路由器
*/
class RequestRouter {
private final RedisClusterManager clusterManager;
private final LoadBalanceAlgorithm loadBalanceAlgorithm;

public RequestRouter(RedisClusterManager clusterManager) {
this.clusterManager = clusterManager;
this.loadBalanceAlgorithm = new ConsistentHashAlgorithm(100);
}

/**
* 选择目标节点
*/
public RedisNode selectNode(RedisRequest request) {
List<RedisNode> availableNodes = clusterManager.getAllNodes().stream()
.filter(node -> isNodeAvailable(node))
.collect(Collectors.toList());

if (availableNodes.isEmpty()) {
return null;
}

// 根据请求类型选择节点
if (request.isReadOnly()) {
return selectReadNode(availableNodes, request);
} else {
return selectWriteNode(availableNodes, request);
}
}

/**
* 选择读节点
*/
private RedisNode selectReadNode(List<RedisNode> nodes, RedisRequest request) {
// 优先选择从节点
List<RedisNode> slaveNodes = nodes.stream()
.filter(node -> !node.isMaster())
.collect(Collectors.toList());

if (!slaveNodes.isEmpty()) {
return loadBalanceAlgorithm.select(slaveNodes, request);
}

// 没有从节点,选择主节点
return loadBalanceAlgorithm.select(nodes, request);
}

/**
* 选择写节点
*/
private RedisNode selectWriteNode(List<RedisNode> nodes, RedisRequest request) {
// 写操作只能选择主节点
List<RedisNode> masterNodes = nodes.stream()
.filter(RedisNode::isMaster)
.collect(Collectors.toList());

if (masterNodes.isEmpty()) {
return null;
}

return loadBalanceAlgorithm.select(masterNodes, request);
}

/**
* 检查节点是否可用
*/
private boolean isNodeAvailable(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());
String result = jedis.ping();
jedis.close();
return "PONG".equals(result);
} catch (Exception e) {
return false;
}
}
}

/**
* 健康检查器
*/
class HealthChecker {
private final RedisClusterManager clusterManager;
private final Map<String, Boolean> nodeHealth;
private final ScheduledExecutorService healthCheckExecutor;

public HealthChecker(RedisClusterManager clusterManager) {
this.clusterManager = clusterManager;
this.nodeHealth = new ConcurrentHashMap<>();
this.healthCheckExecutor = Executors.newScheduledThreadPool(2);
}

/**
* 启动健康检查
*/
public void start() {
healthCheckExecutor.scheduleAtFixedRate(this::checkNodeHealth,
0, 30, TimeUnit.SECONDS);
}

/**
* 检查节点健康状态
*/
private void checkNodeHealth() {
List<RedisNode> nodes = clusterManager.getAllNodes();

for (RedisNode node : nodes) {
boolean isHealthy = checkNodeHealth(node);
nodeHealth.put(node.getId(), isHealthy);
}
}

/**
* 检查单个节点健康状态
*/
private boolean checkNodeHealth(RedisNode node) {
try {
Jedis jedis = new Jedis(node.getHost(), node.getPort());

// 检查连接
String pingResult = jedis.ping();
if (!"PONG".equals(pingResult)) {
return false;
}

// 检查内存使用率
String info = jedis.info("memory");
Map<String, String> memoryInfo = parseInfo(info);
long usedMemory = Long.parseLong(memoryInfo.getOrDefault("used_memory", "0"));
long maxMemory = Long.parseLong(memoryInfo.getOrDefault("maxmemory", "0"));

if (maxMemory > 0) {
double memoryUsage = (double) usedMemory / maxMemory;
if (memoryUsage > 0.95) { // 95%
return false;
}
}

jedis.close();
return true;

} catch (Exception e) {
return false;
}
}

/**
* 获取节点健康状态
*/
public Map<String, Boolean> getNodeHealthStatus() {
return new HashMap<>(nodeHealth);
}

private Map<String, String> parseInfo(String info) {
Map<String, String> result = new HashMap<>();

String[] lines = info.split("\r\n");
for (String line : lines) {
if (line.contains(":")) {
String[] parts = line.split(":", 2);
if (parts.length == 2) {
result.put(parts[0], parts[1]);
}
}
}

return result;
}
}

/**
* Redis请求
*/
class RedisRequest {
private final String userId;
private final String apiKey;
private final RedisOperation operation;
private final String key;
private final String value;
private final long timestamp;

public RedisRequest(String userId, String apiKey, RedisOperation operation,
String key, String value) {
this.userId = userId;
this.apiKey = apiKey;
this.operation = operation;
this.key = key;
this.value = value;
this.timestamp = System.currentTimeMillis();
}

public String getUserId() { return userId; }
public String getApiKey() { return apiKey; }
public RedisOperation getOperation() { return operation; }
public String getKey() { return key; }
public String getValue() { return value; }
public long getTimestamp() { return timestamp; }

public boolean isReadOnly() {
return operation == RedisOperation.GET || operation == RedisOperation.EXISTS;
}
}

enum RedisOperation {
GET, SET, DEL, EXISTS, INCR, DECR
}

/**
* Redis响应
*/
class RedisResponse {
private final boolean success;
private final String result;
private final String error;
private final long responseTime;
private final long timestamp;

private RedisResponse(boolean success, String result, String error, long responseTime) {
this.success = success;
this.result = result;
this.error = error;
this.responseTime = responseTime;
this.timestamp = System.currentTimeMillis();
}

public static RedisResponse success(String result, long responseTime) {
return new RedisResponse(true, result, null, responseTime);
}

public static RedisResponse error(String error) {
return new RedisResponse(false, null, error, 0);
}

public static RedisResponse rateLimited(String reason) {
return new RedisResponse(false, null, "限流: " + reason, 0);
}

public boolean isSuccess() { return success; }
public String getResult() { return result; }
public String getError() { return error; }
public long getResponseTime() { return responseTime; }
public long getTimestamp() { return timestamp; }
}

/**
* 集群状态
*/
class ClusterStatus {
private int nodeCount;
private long totalQPS;
private Map<String, Boolean> nodeHealth;
private Map<String, Long> limiterStatus;

// getters and setters
public int getNodeCount() { return nodeCount; }
public void setNodeCount(int nodeCount) { this.nodeCount = nodeCount; }
public long getTotalQPS() { return totalQPS; }
public void setTotalQPS(long totalQPS) { this.totalQPS = totalQPS; }
public Map<String, Boolean> getNodeHealth() { return nodeHealth; }
public void setNodeHealth(Map<String, Boolean> nodeHealth) { this.nodeHealth = nodeHealth; }
public Map<String, Long> getLimiterStatus() { return limiterStatus; }
public void setLimiterStatus(Map<String, Long> limiterStatus) { this.limiterStatus = limiterStatus; }
}

6. 总结

本文深入探讨了云端Redis请求数管理的架构师级别技术,涵盖了QPS监控与控制、多种限流算法的实现、弹性扩缩容机制,以及企业级Redis集群的请求数管理解决方案。

关键技术要点:

  1. QPS监控与控制

    • 实时监控Redis集群的QPS、响应时间、错误率
    • 动态调整QPS阈值和配置参数
    • 智能告警和故障检测
  2. 限流算法实现

    • 令牌桶算法:支持固定速率和动态速率调整
    • 滑动窗口算法:基于时间窗口的精确限流
    • 漏桶算法:控制请求的流出速率
    • 分布式限流:基于Redis的分布式限流实现
  3. 弹性扩缩容机制

    • 基于多维度指标的扩缩容决策
    • 自动化的节点添加和移除
    • 数据迁移和负载重分配
    • 健康检查和故障恢复
  4. 企业级集群管理

    • 多维度请求数管理
    • 智能路由和负载均衡
    • 故障转移和容错机制
    • 实时监控和状态管理

架构设计原则:

  • 高可用性:通过冗余、故障转移、健康检查确保服务可用性
  • 高性能:通过限流、负载均衡、缓存优化提升性能
  • 可扩展性:支持水平扩展、动态配置、弹性伸缩
  • 可观测性:全面的监控、告警、性能分析

作为架构师,我们需要深入理解Redis请求数管理的核心技术,掌握各种限流算法和扩缩容策略,并能够根据业务需求设计出高性能、高可用的Redis集群架构。通过本文的实战案例,我们可以更好地理解云端Redis请求数管理在企业级应用中的重要作用。

Redis请求数管理的优化是一个持续的过程,需要根据业务发展和技术演进不断调整和优化。只有深入理解Redis技术的本质,才能设计出真正优秀的Redis集群架构解决方案。