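Part 231: Semaphore-Based API Rate Limiting in Practice: High-Concurrency Traffic Control, the Token Bucket Algorithm, and Enterprise-Grade Distributed Rate Limiting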

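Preface

In high-concurrency, high-traffic enterprise applications, API rate limiting is a key safeguard for system stability and availability. Traditional rate-limiting solutions are often single-purpose and hard to extend, so they struggle with the traffic-control needs of complex business scenarios. A Semaphore gives precise concurrency control, meaning a hard cap on the number of requests being processed at the same time. Combined with rate algorithms such as the token bucket, a Semaphore-based system also supports flexible limiting strategies, traffic allocation, and real-time monitoring and alerting. As microservice and cloud-native architectures spread, building an extensible and accurate rate-limiting framework has become a key skill for enterprise architects.

This article covers the design and implementation of a Semaphore-based API rate-limiting architecture. It moves from high-concurrency traffic control and the token bucket algorithm to distributed rate limiting and degradation, as a practical guide to building production-grade API rate limiting.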

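1. Overview and Core Principles of Semaphore-Based API Rate Limiting

1.1 Architecture Design

The Semaphore-based rate-limiting system uses a layered architecture that combines semaphore control, the token bucket algorithm, and distributed coordination. Each request passes through an interceptor that applies the configured limiting strategy and acquires a permit. The request is then handled, and the permit is released before the response is returned. The strategies, semaphore types, monitoring, and distributed coordination around this path are shown below.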

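```mermaid
graph TB
A[Client request] --> B[Rate-limit interceptor]
B --> C[Limiting strategy]
C --> D[Semaphore control]
D --> E[Acquire permit]
E --> F[Handle request]
F --> G[Release permit]
G --> H[Return response]

I[Limiting strategies] --> J[Fixed window]
I --> K[Sliding window]
I --> L[Token bucket]
I --> M[Leaky bucket]

N[Semaphore types] --> O[Counting semaphore]
N --> P[Fair semaphore]
N --> Q[Non-fair semaphore]
N --> R[Reentrant wrapper, custom]

S[Monitoring] --> T[QPS monitoring]
S --> U[Response time]
S --> V[Rate-limit statistics]
S --> W[Error alerts]

X[Distributed limiting] --> Y[Redis coordination]
X --> Z[ZooKeeper coordination]
X --> AA[Consul coordination]
X --> BB[etcd coordination]
```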
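The request path in the diagram (acquire a permit, handle the request, release the permit) can be sketched with a plain java.util.concurrent.Semaphore. This is a minimal illustration and not the article's implementation. PermitGuardDemo and handle are hypothetical names, and the sketch assumes a rejected request just returns a fallback value instead of an HTTP 429.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical guard illustrating: acquire permit -> handle request -> release permit
class PermitGuardDemo {
    private final Semaphore semaphore;

    PermitGuardDemo(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
    }

    // Runs the handler if a permit is free; otherwise returns the rejection value at once
    <T> T handle(Supplier<T> handler, T rejected) {
        if (!semaphore.tryAcquire()) {
            return rejected;              // limited: no permit was taken, so nothing to release
        }
        try {
            return handler.get();         // the request is handled while the permit is held
        } finally {
            semaphore.release();          // returned even if the handler throws
        }
    }

    int available() {
        return semaphore.availablePermits();
    }
}
```

Releasing in finally keeps permits from leaking when the handler throws. A nested handle call on a guard with a single permit is rejected, because the semaphore is not reentrant.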

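1.2 Core Features of Semaphore-Based API Rate Limiting

1.2.1 Precise Concurrency Control

  • Semaphore mechanism: a Semaphore caps the number of concurrent (in-flight) requests. On its own it limits concurrency, not requests per second.
  • Token management: request rate is controlled with the token bucket algorithm.
  • Fairness: both fair (FIFO) and non-fair permit allocation are supported.
  • Reentrancy: java.util.concurrent.Semaphore is not reentrant because permits are not owned by threads. A thread that acquires twice uses two permits, so reentrant limiting needs a wrapper that tracks nested acquisitions per thread.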
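The first point can be made concrete: a Semaphore with N permits bounds how many requests run at the same time, however many threads arrive. The hypothetical ConcurrencyCapDemo below submits tasks from a thread pool through a semaphore and records the peak number of tasks running at once. With 3 permits the peak stays at 3 or below, even on 8 threads. The 5 ms sleep stands in for request work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: measures the peak number of tasks running concurrently
class ConcurrencyCapDemo {
    static int peakConcurrency(int permits, int threads, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    semaphore.acquire();                      // block until a permit is free
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
                        Thread.sleep(5);                       // simulated request work
                    } finally {
                        inFlight.decrementAndGet();
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```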

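1.2.2 Multiple Limiting Strategies

  • Fixed window: counts requests per fixed time window and resets the count at each window boundary.
  • Sliding window: counts requests over a window that moves with the current time.
  • Token bucket: tokens are added at a fixed rate up to a capacity, and each request consumes a token.
  • Leaky bucket: requests drain at a constant rate, and requests that would overflow the bucket are rejected.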
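The article does not implement the fixed-window strategy elsewhere, so here is a minimal single-process sketch. It assumes the caller passes the current time in milliseconds, which keeps it testable. FixedWindowCounter is a hypothetical name.

```java
// Hypothetical single-process fixed-window counter
class FixedWindowCounter {
    private final int limit;
    private final long windowMillis;
    private long windowStart = Long.MIN_VALUE;   // start of the current window
    private int count;

    FixedWindowCounter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Align the timestamp to the start of its window
        long start = nowMillis - Math.floorMod(nowMillis, windowMillis);
        if (start != windowStart) {      // a new window began: reset the counter
            windowStart = start;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

This shows the known weakness of fixed windows. With a limit of 2 per 1000 ms, requests at 998, 999, 1000 and 1001 ms are all admitted, so 4 requests get through in 3 ms because the burst straddles the window boundary.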

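1.2.3 Intelligent Traffic Management

  • Dynamic adjustment: adjust limiter parameters according to system load.
  • Priority control: apply different limits to traffic of different priorities.
  • Circuit breaking and degradation: break circuits and degrade service automatically when the system is overloaded.
  • Error handling: handle exceptions thoroughly and recover cleanly.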
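For dynamic adjustment, a Semaphore's capacity can be changed at runtime without recreating it. release(n) adds permits, and the protected reducePermits(int) removes them. Because reducePermits is protected, this sketch subclasses Semaphore. ResizableSemaphore and resize are hypothetical names, and the sketch is one possible mechanism, not the article's.

```java
import java.util.concurrent.Semaphore;

// Hypothetical Semaphore whose total capacity can be changed at runtime
class ResizableSemaphore extends Semaphore {
    private int maxPermits;

    ResizableSemaphore(int maxPermits) {
        super(maxPermits);
        this.maxPermits = maxPermits;
    }

    // Grow by releasing extra permits; shrink by reducing them. Shrinking while requests
    // are in flight can make availablePermits() negative; new acquisitions then fail
    // until enough permits have been released.
    synchronized void resize(int newMax) {
        if (newMax <= 0) {
            throw new IllegalArgumentException("newMax must be positive");
        }
        int delta = newMax - maxPermits;
        if (delta > 0) {
            release(delta);
        } else if (delta < 0) {
            reducePermits(-delta);
        }
        maxPermits = newMax;
    }

    synchronized int getMaxPermits() {
        return maxPermits;
    }
}
```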

2. Core Implementation of Semaphore-Based API Rate Limiting

2.1 Basic Rate Limiter

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

// Basic rate limiter interface
public interface RateLimiter {

    /**
     * Try to acquire one permit without blocking.
     */
    boolean tryAcquire();

    /**
     * Try to acquire one permit, waiting at most the given timeout.
     */
    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Acquire one permit, blocking until one is available.
     */
    void acquire() throws InterruptedException;

    /**
     * Release a permit obtained by a successful acquire. Concurrency limiters return the
     * permit; rate-based limiters (token bucket, fixed window) may treat this as a no-op.
     */
    void release();

    /**
     * Number of permits currently available.
     */
    int getAvailablePermits();

    /**
     * Limiter name.
     */
    String getName();
}

// Semaphore-based limiter: caps the number of concurrent (in-flight) requests.
// RateLimiterManager creates instances with "new"; it is not a Spring @Component,
// because Spring cannot inject the name/maxPermits constructor arguments.
@Slf4j
public class SemaphoreRateLimiter implements RateLimiter {

    private final String name;
    private final Semaphore semaphore;
    private final int maxPermits;
    private final RateLimiterConfig config;

    public SemaphoreRateLimiter(String name, int maxPermits, RateLimiterConfig config) {
        if (maxPermits <= 0) {
            throw new IllegalArgumentException("maxPermits must be positive: " + maxPermits);
        }
        this.name = name;
        this.maxPermits = maxPermits;
        this.config = config;
        // Fair: waiting threads get permits in FIFO order; non-fair: higher throughput, barging allowed
        this.semaphore = new Semaphore(maxPermits, config.isFair());
    }

    @Override
    public boolean tryAcquire() {
        // Semaphore.tryAcquire() barges even on a fair semaphore;
        // tryAcquire(0, TimeUnit.SECONDS) honors the fairness setting instead.
        boolean acquired = semaphore.tryAcquire();
        log.debug("Permit {}: limiter={}, available={}",
                acquired ? "acquired" : "rejected", name, getAvailablePermits());
        return acquired;
    }

    @Override
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        boolean acquired = semaphore.tryAcquire(timeout, unit);
        log.debug("Permit {}: limiter={}, timeout={}ms, available={}",
                acquired ? "acquired" : "timed out", name, unit.toMillis(timeout), getAvailablePermits());
        return acquired;
    }

    @Override
    public void acquire() throws InterruptedException {
        semaphore.acquire();
        log.debug("Permit acquired: limiter={}, available={}", name, getAvailablePermits());
    }

    @Override
    public void release() {
        // A Semaphore does not track which thread holds a permit: an unmatched release
        // raises the count above maxPermits, so release exactly once per successful acquire.
        semaphore.release();
        log.debug("Permit released: limiter={}, available={}", name, getAvailablePermits());
    }

    @Override
    public int getAvailablePermits() {
        return semaphore.availablePermits();
    }

    @Override
    public String getName() {
        return name;
    }

    /**
     * Snapshot of the semaphore state.
     */
    public SemaphoreStatus getStatus() {
        int available = getAvailablePermits();   // read once so all fields agree
        int acquired = maxPermits - available;
        SemaphoreStatus status = new SemaphoreStatus();
        status.setName(name);
        status.setMaxPermits(maxPermits);
        status.setAvailablePermits(available);
        status.setAcquiredPermits(acquired);
        status.setUtilizationRate((double) acquired / maxPermits);
        return status;
    }
}

// Limiter configuration (Lombok @Data generates getters, setters and toString)
@Data
public class RateLimiterConfig {
    private int maxPermits;                  // permits / bucket size / requests per window (must be set)
    private boolean fair = false;            // fair (FIFO) semaphore
    private long refillPeriod = 1000;        // token refill period (ms)
    private int refillTokens = 1;            // tokens added per refill period
    private long windowSize = 1000;          // sliding-window length (ms)
    private boolean enableWarmup = false;    // enable warm-up (not implemented in this article)
    private int warmupPeriod = 3000;         // warm-up period (ms)
    private boolean enableMonitoring = true; // register the limiter with the monitoring service
}

// Semaphore status
@Data
public class SemaphoreStatus {
    private String name;
    private int maxPermits;
    private int availablePermits;
    private int acquiredPermits;
    private double utilizationRate;
}
```
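SemaphoreRateLimiter.release() depends on the caller. A Semaphore does not track which thread holds a permit, so an unmatched release() silently raises the permit count above maxPermits. One way to guard against this is to hand out a permit object that releases at most once and works with try-with-resources. The Permit class below is a hypothetical sketch and not part of the article's RateLimiter interface.

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical permit handle: closing it releases the semaphore exactly once
final class Permit implements AutoCloseable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    private Permit(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    // Returns a permit if one is available; empty means the request should be rejected
    static Optional<Permit> tryAcquire(Semaphore semaphore) {
        return semaphore.tryAcquire() ? Optional.of(new Permit(semaphore)) : Optional.empty();
    }

    @Override
    public void close() {
        if (released.compareAndSet(false, true)) {   // idempotent: later closes are no-ops
            semaphore.release();
        }
    }
}
```

With this design, an early close() inside a try-with-resources block followed by the implicit close at the end of the block still returns only one permit.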

2.2 Token Bucket Rate Limiter

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

// Token bucket limiter. The Semaphore is the bucket: its available permits are the tokens left.
// Acquiring consumes tokens. They are NOT returned by release(); refill() adds refillTokens per
// refillPeriod, up to maxTokens. Created by RateLimiterManager, so not a Spring @Component.
@Slf4j
public class TokenBucketRateLimiter implements RateLimiter {

    private final String name;
    private final Semaphore semaphore;
    private final int maxTokens;
    private final int refillTokens;
    private final long refillPeriod;                 // ms
    private final ScheduledExecutorService scheduler;
    private volatile long lastRefillTime;            // written only inside refill()

    public TokenBucketRateLimiter(String name, int maxTokens, int refillTokens, long refillPeriod) {
        if (maxTokens <= 0 || refillTokens <= 0 || refillPeriod <= 0) {
            throw new IllegalArgumentException("maxTokens, refillTokens and refillPeriod must be positive");
        }
        this.name = name;
        this.maxTokens = maxTokens;
        this.refillTokens = refillTokens;
        this.refillPeriod = refillPeriod;
        this.semaphore = new Semaphore(maxTokens);   // the bucket starts full
        this.lastRefillTime = System.currentTimeMillis();

        // Background refill, so threads blocked in acquire() wake up even without new calls
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "token-bucket-" + name);
            thread.setDaemon(true);
            return thread;
        });
        startTokenRefillTask();
    }

    @Override
    public boolean tryAcquire() {
        return tryAcquire(1);
    }

    @Override
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return tryAcquire(1, timeout, unit);
    }

    @Override
    public void acquire() throws InterruptedException {
        acquire(1);
    }

    @Override
    public void release() {
        release(1);
    }

    /**
     * Try to take the given number of tokens without blocking.
     */
    public boolean tryAcquire(int tokens) {
        if (!validate(tokens)) {
            return true;
        }
        refill();   // lazy refill, so the result does not depend on scheduler timing
        boolean acquired = semaphore.tryAcquire(tokens);
        log.debug("Token bucket {}: limiter={}, tokens={}, available={}",
                acquired ? "acquired" : "rejected", name, tokens, semaphore.availablePermits());
        return acquired;
    }

    /**
     * Try to take the given number of tokens, waiting at most the given timeout.
     */
    public boolean tryAcquire(int tokens, long timeout, TimeUnit unit) throws InterruptedException {
        if (!validate(tokens)) {
            return true;
        }
        refill();
        // Waiters are woken when the scheduled refill releases permits
        return semaphore.tryAcquire(tokens, timeout, unit);
    }

    /**
     * Take the given number of tokens, blocking until they are available.
     */
    public void acquire(int tokens) throws InterruptedException {
        if (!validate(tokens)) {
            return;
        }
        refill();
        semaphore.acquire(tokens);
    }

    /**
     * Intentionally a no-op: in a token bucket, consumed tokens are not given back.
     * Returning them would turn this into a concurrency limiter and break the rate limit.
     */
    public void release(int tokens) {
        log.debug("Token bucket release ignored: limiter={}, tokens={}", name, tokens);
    }

    @Override
    public int getAvailablePermits() {
        refill();
        return semaphore.availablePermits();
    }

    @Override
    public String getName() {
        return name;
    }

    /**
     * Returns false for non-positive requests (nothing to take) and rejects requests
     * larger than the bucket, which could never be satisfied.
     */
    private boolean validate(int tokens) {
        if (tokens > maxTokens) {
            throw new IllegalArgumentException("tokens (" + tokens + ") exceed bucket size " + maxTokens);
        }
        return tokens > 0;
    }

    /**
     * Start the periodic refill task.
     */
    private void startTokenRefillTask() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                refill();
            } catch (Exception e) {
                log.error("Token refill task failed: limiter={}", name, e);
            }
        }, refillPeriod, refillPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Add refillTokens for every full refillPeriod since the last refill, capped at maxTokens.
     * Only this synchronized method adds permits, and concurrent acquires can only lower
     * availablePermits(), so the bucket never exceeds maxTokens.
     */
    private synchronized void refill() {
        long now = System.currentTimeMillis();
        long periods = (now - lastRefillTime) / refillPeriod;
        if (periods <= 0) {
            return;
        }
        // Advance by whole periods only, so the partial period is not lost
        lastRefillTime += periods * refillPeriod;
        long space = maxTokens - semaphore.availablePermits();
        int toAdd = (int) Math.min(periods * refillTokens, space);
        if (toAdd > 0) {
            semaphore.release(toAdd);
            log.debug("Tokens refilled: limiter={}, added={}, total={}",
                    name, toAdd, semaphore.availablePermits());
        }
    }

    /**
     * Snapshot of the bucket state.
     */
    public TokenBucketStatus getStatus() {
        TokenBucketStatus status = new TokenBucketStatus();
        status.setName(name);
        status.setMaxTokens(maxTokens);
        status.setCurrentTokens(getAvailablePermits());
        status.setRefillTokens(refillTokens);
        status.setRefillPeriod(refillPeriod);
        status.setLastRefillTime(lastRefillTime);
        return status;
    }

    /**
     * Stop the refill thread.
     */
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// Token bucket status
@Data
public class TokenBucketStatus {
    private String name;
    private int maxTokens;
    private int currentTokens;
    private int refillTokens;
    private long refillPeriod;
    private long lastRefillTime;
}
```
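The token bucket above spreads its refill logic across a scheduler thread and the acquire path, which makes the arithmetic hard to check in isolation. The following sketch is a lazy token bucket (refill computed on each call) that takes the current time as a parameter. Like the class above, it adds refillTokens per full refillPeriod and caps the count at the capacity. LazyTokenBucket is a hypothetical standalone class, not the article's implementation.

```java
// Hypothetical lazy token bucket: tokens are refilled on demand from elapsed time
class LazyTokenBucket {
    private final long capacity;
    private final long refillTokens;
    private final long refillPeriodMillis;
    private long tokens;
    private long lastRefill;

    LazyTokenBucket(long capacity, long refillTokens, long refillPeriodMillis, long nowMillis) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillPeriodMillis = refillPeriodMillis;
        this.tokens = capacity;          // start full, allowing an initial burst
        this.lastRefill = nowMillis;
    }

    synchronized boolean tryAcquire(long permits, long nowMillis) {
        long periods = (nowMillis - lastRefill) / refillPeriodMillis;
        if (periods > 0) {
            tokens = Math.min(capacity, tokens + periods * refillTokens);
            lastRefill += periods * refillPeriodMillis;   // keep the unfinished partial period
        }
        if (tokens >= permits) {
            tokens -= permits;
            return true;
        }
        return false;
    }

    synchronized long available() {
        return tokens;
    }
}
```

With capacity 5 and one token per 200 ms, the bucket admits a burst of 5 and then one request per 200 ms. After a long idle period it is back at 5, not above.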

2.3 Sliding Window Rate Limiter

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

// Sliding-window (log) limiter: admits at most maxRequests within any windowSize-ms window.
// The Semaphore also caps in-flight requests at maxRequests; release() returns that permit
// when a request completes, while window entries expire with time.
// Created by RateLimiterManager, so not a Spring @Component.
@Slf4j
public class SlidingWindowRateLimiter implements RateLimiter {

    private final String name;
    private final Semaphore semaphore;
    private final int maxRequests;
    private final long windowSize;                                 // ms
    private final Deque<Long> requestTimes = new ArrayDeque<>();   // guarded by "this"
    private final ScheduledExecutorService cleanupScheduler;

    public SlidingWindowRateLimiter(String name, int maxRequests, long windowSize, TimeUnit timeUnit) {
        if (maxRequests <= 0 || windowSize <= 0) {
            throw new IllegalArgumentException("maxRequests and windowSize must be positive");
        }
        this.name = name;
        this.maxRequests = maxRequests;
        this.windowSize = timeUnit.toMillis(windowSize);
        this.semaphore = new Semaphore(maxRequests);

        // Background cleanup of expired entries
        this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "sliding-window-" + name);
            thread.setDaemon(true);
            return thread;
        });
        startCleanupTask();
    }

    @Override
    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        cleanupExpiredRequests(currentTime);

        // Check and record under the same lock, so concurrent callers cannot both pass the check
        if (requestTimes.size() >= maxRequests) {
            log.debug("Sliding window full: limiter={}, current={}, max={}",
                    name, requestTimes.size(), maxRequests);
            return false;
        }
        if (!semaphore.tryAcquire()) {
            log.debug("Too many in-flight requests: limiter={}", name);
            return false;
        }
        requestTimes.addLast(currentTime);
        log.debug("Sliding window admitted: limiter={}, current={}, max={}",
                name, requestTimes.size(), maxRequests);
        return true;
    }

    @Override
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (tryAcquire()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Poll outside the lock until entries expire or permits are released
            TimeUnit.NANOSECONDS.sleep(Math.min(remaining, TimeUnit.MILLISECONDS.toNanos(10)));
        }
    }

    @Override
    public void acquire() throws InterruptedException {
        while (!tryAcquire()) {
            Thread.sleep(10);
        }
    }

    @Override
    public void release() {
        // Returns only the in-flight permit; the window entry stays until it expires
        semaphore.release();
        log.debug("Sliding window released: limiter={}, available={}", name, getAvailablePermits());
    }

    @Override
    public synchronized int getAvailablePermits() {
        cleanupExpiredRequests(System.currentTimeMillis());
        return Math.min(maxRequests - requestTimes.size(), semaphore.availablePermits());
    }

    @Override
    public String getName() {
        return name;
    }

    /**
     * Drop entries outside the window (now - windowSize, now].
     */
    private synchronized void cleanupExpiredRequests(long currentTime) {
        long cutoffTime = currentTime - windowSize;
        while (!requestTimes.isEmpty() && requestTimes.peekFirst() <= cutoffTime) {
            requestTimes.pollFirst();
        }
    }

    /**
     * Start the periodic cleanup task.
     */
    private void startCleanupTask() {
        long period = Math.max(1, windowSize / 4);   // scheduleAtFixedRate rejects a period of 0
        cleanupScheduler.scheduleAtFixedRate(() -> {
            try {
                cleanupExpiredRequests(System.currentTimeMillis());
            } catch (Exception e) {
                log.error("Sliding window cleanup failed: limiter={}", name, e);
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    /**
     * Snapshot of the window state.
     */
    public synchronized SlidingWindowStatus getStatus() {
        cleanupExpiredRequests(System.currentTimeMillis());
        SlidingWindowStatus status = new SlidingWindowStatus();
        status.setName(name);
        status.setMaxRequests(maxRequests);
        status.setCurrentRequests(requestTimes.size());
        status.setWindowSize(windowSize);
        status.setAvailableRequests(maxRequests - requestTimes.size());
        return status;
    }

    /**
     * Stop the cleanup thread.
     */
    public void shutdown() {
        cleanupScheduler.shutdown();
        try {
            if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                cleanupScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            cleanupScheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// Sliding window status
@Data
public class SlidingWindowStatus {
    private String name;
    private int maxRequests;
    private int currentRequests;
    private long windowSize;
    private int availableRequests;
}
```
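SlidingWindowRateLimiter stores one timestamp per admitted request (a sliding-window log), so its memory grows with maxRequests. A common lower-memory alternative is the sliding-window counter, which is a different technique from the class above and an approximation. It keeps counts for the current and previous fixed windows and weights the previous count by how much of that window still overlaps the sliding window. Below is a hypothetical sketch that takes the time as a parameter.

```java
// Hypothetical sliding-window counter (approximation using two fixed windows)
class SlidingWindowCounter {
    private final int limit;
    private final long windowMillis;
    private long currentStart;   // start of the current fixed window
    private int currentCount;
    private int previousCount;

    SlidingWindowCounter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        long start = nowMillis - Math.floorMod(nowMillis, windowMillis);
        if (start != currentStart) {
            // Roll over: the old window counts as "previous" only if it is directly adjacent
            previousCount = (start - currentStart == windowMillis) ? currentCount : 0;
            currentCount = 0;
            currentStart = start;
        }
        // Fraction of the previous window still covered by the sliding window
        double overlap = 1.0 - (double) (nowMillis - start) / windowMillis;
        double estimated = previousCount * overlap + currentCount;
        if (estimated < limit) {
            currentCount++;
            return true;
        }
        return false;
    }
}
```

Unlike a plain fixed window, a burst at the end of one window still counts against the start of the next one, so the boundary burst is rejected. The cost is that the estimate assumes requests were spread evenly over the previous window.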

2.4 Rate Limiter Manager

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;   // jakarta.annotation.PreDestroy on Spring Boot 3

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Rate limiter manager: creates, looks up, replaces and shuts down limiters by name
@Service
@Slf4j
public class RateLimiterManager {

    private final Map<String, RateLimiter> limiters = new ConcurrentHashMap<>();
    private final Map<String, RateLimiterConfig> configs = new ConcurrentHashMap<>();

    @Autowired
    private RateLimiterMonitorService monitorService;

    /**
     * Return the limiter with this name, creating it on first use.
     */
    public RateLimiter createRateLimiter(String name, RateLimiterType type, RateLimiterConfig config) {
        boolean[] created = {false};
        RateLimiter limiter = limiters.computeIfAbsent(name, k -> {
            created[0] = true;
            return newLimiter(name, type, config);
        });

        // Register config and monitoring once, in the call that actually created the limiter
        if (created[0]) {
            configs.put(name, config);
            if (config.isEnableMonitoring()) {
                monitorService.registerLimiter(name, limiter);
            }
            log.info("Rate limiter created: name={}, type={}", name, type);
        }
        return limiter;
    }

    /**
     * Look up a limiter by name.
     */
    public RateLimiter getRateLimiter(String name) {
        return limiters.get(name);
    }

    /**
     * Remove a limiter and stop its background threads.
     */
    public boolean removeRateLimiter(String name) {
        RateLimiter limiter = limiters.remove(name);
        configs.remove(name);
        if (limiter == null) {
            return false;
        }
        monitorService.unregisterLimiter(name);
        shutdownLimiter(limiter);
        log.info("Rate limiter removed: name={}", name);
        return true;
    }

    /**
     * Replace a limiter when its configuration changes. The new instance is swapped in
     * atomically, so callers never see a missing limiter; requests that already hold a
     * permit release it on the old instance they stored.
     */
    public boolean updateRateLimiterConfig(String name, RateLimiterConfig newConfig) {
        RateLimiterConfig oldConfig = configs.get(name);
        RateLimiter oldLimiter = limiters.get(name);
        if (oldConfig == null || oldLimiter == null || !isConfigChanged(oldConfig, newConfig)) {
            return false;
        }
        RateLimiter newLimiter = newLimiter(name, getLimiterType(oldLimiter), newConfig);
        if (!limiters.replace(name, oldLimiter, newLimiter)) {
            shutdownLimiter(newLimiter);   // lost a race with a concurrent update or removal
            return false;
        }
        configs.put(name, newConfig);
        monitorService.unregisterLimiter(name);
        if (newConfig.isEnableMonitoring()) {
            monitorService.registerLimiter(name, newLimiter);
        }
        shutdownLimiter(oldLimiter);
        log.info("Rate limiter config updated: name={}", name);
        return true;
    }

    /**
     * Status snapshots of all limiters.
     */
    public Map<String, Object> getAllLimiterStatus() {
        Map<String, Object> statusMap = new HashMap<>();
        limiters.forEach((name, limiter) -> {
            if (limiter instanceof SemaphoreRateLimiter) {
                statusMap.put(name, ((SemaphoreRateLimiter) limiter).getStatus());
            } else if (limiter instanceof TokenBucketRateLimiter) {
                statusMap.put(name, ((TokenBucketRateLimiter) limiter).getStatus());
            } else if (limiter instanceof SlidingWindowRateLimiter) {
                statusMap.put(name, ((SlidingWindowRateLimiter) limiter).getStatus());
            }
        });
        return statusMap;
    }

    private RateLimiter newLimiter(String name, RateLimiterType type, RateLimiterConfig config) {
        switch (type) {
            case SEMAPHORE:
                return new SemaphoreRateLimiter(name, config.getMaxPermits(), config);
            case TOKEN_BUCKET:
                return new TokenBucketRateLimiter(name, config.getMaxPermits(),
                        config.getRefillTokens(), config.getRefillPeriod());
            case SLIDING_WINDOW:
                return new SlidingWindowRateLimiter(name, config.getMaxPermits(),
                        config.getWindowSize(), TimeUnit.MILLISECONDS);
            default:
                throw new IllegalArgumentException("Unsupported limiter type: " + type);
        }
    }

    // Stop background threads of limiters that own a scheduler
    private void shutdownLimiter(RateLimiter limiter) {
        if (limiter instanceof TokenBucketRateLimiter) {
            ((TokenBucketRateLimiter) limiter).shutdown();
        } else if (limiter instanceof SlidingWindowRateLimiter) {
            ((SlidingWindowRateLimiter) limiter).shutdown();
        }
    }

    private boolean isConfigChanged(RateLimiterConfig oldConfig, RateLimiterConfig newConfig) {
        return oldConfig.getMaxPermits() != newConfig.getMaxPermits() ||
                oldConfig.getRefillPeriod() != newConfig.getRefillPeriod() ||
                oldConfig.getRefillTokens() != newConfig.getRefillTokens() ||
                oldConfig.getWindowSize() != newConfig.getWindowSize() ||
                oldConfig.isFair() != newConfig.isFair();
    }

    private RateLimiterType getLimiterType(RateLimiter limiter) {
        if (limiter instanceof TokenBucketRateLimiter) {
            return RateLimiterType.TOKEN_BUCKET;
        } else if (limiter instanceof SlidingWindowRateLimiter) {
            return RateLimiterType.SLIDING_WINDOW;
        }
        return RateLimiterType.SEMAPHORE;
    }

    @PreDestroy
    public void destroy() {
        limiters.values().forEach(this::shutdownLimiter);
        limiters.clear();
        configs.clear();
        log.info("Rate limiter manager shut down");
    }
}

// Limiter type
public enum RateLimiterType {
    SEMAPHORE,       // semaphore (concurrency) limiter
    TOKEN_BUCKET,    // token bucket limiter
    SLIDING_WINDOW   // sliding window limiter
}
```

2.5 Rate-Limit Interceptor

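```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;                 // jakarta.* on Spring Boot 3
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.servlet.HandlerInterceptor;

// Rate-limit interceptor; it takes effect only once registered via WebMvcConfigurer#addInterceptors
@Component
@Slf4j
public class RateLimitInterceptor implements HandlerInterceptor {

    private static final String LIMITER_ATTRIBUTE = "rateLimiter";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();   // thread-safe; reuse it

    @Autowired
    private RateLimiterManager limiterManager;

    @Autowired
    private RateLimiterConfigService configService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                             Object handler) throws Exception {

        // 1. Find the most specific configured pattern (getRequestURI includes the context path)
        String requestPath = request.getRequestURI();
        String pattern = configService.resolvePattern(requestPath);
        if (pattern == null) {
            return true; // no rate limit configured: let the request through
        }
        RateLimiterConfig config = configService.getConfig(pattern);

        // 2. One limiter per configured pattern; keying by the raw URI would create an
        //    unbounded number of limiters for paths such as /api/users/{id}
        String limiterName = "api:" + pattern;
        RateLimiter limiter = limiterManager.getRateLimiter(limiterName);
        if (limiter == null) {
            limiter = limiterManager.createRateLimiter(limiterName, RateLimiterType.SEMAPHORE, config);
        }

        // 3. Try to acquire a permit without blocking
        if (!limiter.tryAcquire()) {
            // 4. Reject with 429
            handleRateLimit(request, response, limiter, config);
            return false;
        }

        // 5. Remember the limiter so afterCompletion releases the permit on the same instance
        request.setAttribute(LIMITER_ATTRIBUTE, limiter);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                                Object handler, Exception ex) throws Exception {
        // Called only when preHandle returned true, including when the handler threw
        RateLimiter limiter = (RateLimiter) request.getAttribute(LIMITER_ATTRIBUTE);
        if (limiter != null) {
            request.removeAttribute(LIMITER_ATTRIBUTE);   // never release twice
            limiter.release();
        }
    }

    /**
     * Write the 429 response.
     */
    private void handleRateLimit(HttpServletRequest request, HttpServletResponse response,
                                 RateLimiter limiter, RateLimiterConfig config) throws IOException {
        response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());
        response.setContentType("application/json;charset=UTF-8");

        // X-RateLimit-* headers are a widespread convention, not a standard; Retry-After is standard HTTP
        response.setHeader("X-RateLimit-Limit", String.valueOf(config.getMaxPermits()));
        response.setHeader("X-RateLimit-Remaining", "0");
        response.setHeader("Retry-After", "1");   // seconds; a fixed hint, not a computed reset time

        Map<String, Object> errorResponse = new HashMap<>();
        errorResponse.put("code", 429);
        errorResponse.put("message", "Too many requests, please try again later");
        errorResponse.put("timestamp", System.currentTimeMillis());
        response.getWriter().write(OBJECT_MAPPER.writeValueAsString(errorResponse));

        log.warn("Request rate-limited: path={}, available={}", request.getRequestURI(),
                limiter.getAvailablePermits());
    }
}

// Rate-limit configuration service
@Service
@Slf4j
public class RateLimiterConfigService {

    private final Map<String, RateLimiterConfig> configs = new ConcurrentHashMap<>();
    private final AntPathMatcher pathMatcher = new AntPathMatcher();

    @PostConstruct
    public void init() {
        loadDefaultConfigs();
        loadConfigsFromDatabase();
        log.info("Rate-limit configuration service initialized");
    }

    /**
     * Configuration registered under this exact path or pattern.
     */
    public RateLimiterConfig getConfig(String pathOrPattern) {
        return configs.get(pathOrPattern);
    }

    /**
     * Most specific configured pattern matching the request path, or null.
     * For /api/login, "/api/login" wins over "/api/**".
     */
    public String resolvePattern(String path) {
        if (configs.containsKey(path)) {
            return path;
        }
        return configs.keySet().stream()
                .filter(pattern -> pathMatcher.match(pattern, path))
                .min(pathMatcher.getPatternComparator(path))   // most specific first
                .orElse(null);
    }

    public void setConfig(String path, RateLimiterConfig config) {
        configs.put(path, config);
        log.info("Rate-limit config set: path={}, config={}", path, config);
    }

    public void removeConfig(String path) {
        configs.remove(path);
        log.info("Rate-limit config removed: path={}", path);
    }

    public Map<String, RateLimiterConfig> getAllConfigs() {
        return new HashMap<>(configs);
    }

    /**
     * Built-in defaults. The refill settings only apply to TOKEN_BUCKET limiters;
     * the interceptor above creates SEMAPHORE (concurrency) limiters.
     */
    private void loadDefaultConfigs() {
        // Default limit for all API endpoints
        RateLimiterConfig apiConfig = new RateLimiterConfig();
        apiConfig.setMaxPermits(100);
        apiConfig.setRefillPeriod(1000);
        apiConfig.setRefillTokens(10);
        apiConfig.setFair(false);
        apiConfig.setEnableMonitoring(true);
        configs.put("/api/**", apiConfig);

        // Login endpoint
        RateLimiterConfig loginConfig = new RateLimiterConfig();
        loginConfig.setMaxPermits(10);
        loginConfig.setRefillPeriod(60000);
        loginConfig.setRefillTokens(1);
        loginConfig.setFair(true);
        loginConfig.setEnableMonitoring(true);
        configs.put("/api/login", loginConfig);

        // Upload endpoint
        RateLimiterConfig uploadConfig = new RateLimiterConfig();
        uploadConfig.setMaxPermits(5);
        uploadConfig.setRefillPeriod(1000);
        uploadConfig.setRefillTokens(1);
        uploadConfig.setFair(false);
        uploadConfig.setEnableMonitoring(true);
        configs.put("/api/upload", uploadConfig);
    }

    /**
     * Load configurations from the database.
     */
    private void loadConfigsFromDatabase() {
        // TODO: load rate-limit configurations dynamically from the database
    }
}
```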
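The interceptor tells rejected clients to come back after a fixed one second. A token-bucket limiter can estimate the wait from its refill schedule instead: the time until enough tokens have been refilled, rounded up to whole seconds as the Retry-After header requires. The RetryAfter helper below is a hypothetical sketch. It assumes the caller knows the current token count and the time of the last refill, and that refills land at whole multiples of the refill period after that time.

```java
// Hypothetical helper: seconds a client should wait until enough tokens are refilled
final class RetryAfter {
    private RetryAfter() {}

    static long seconds(long needed, long available, long refillTokens,
                        long refillPeriodMillis, long lastRefillMillis, long nowMillis) {
        if (available >= needed) {
            return 0;                                                   // can retry immediately
        }
        long missing = needed - available;
        long periods = (missing + refillTokens - 1) / refillTokens;    // refills required (ceiling)
        long readyAt = lastRefillMillis + periods * refillPeriodMillis; // when the last needed refill lands
        long waitMillis = Math.max(0, readyAt - nowMillis);
        return (waitMillis + 999) / 1000;                               // Retry-After uses whole seconds
    }
}
```

For example, with the login configuration (1 token per 60 s) and an empty bucket last refilled 15 s ago, the helper returns 45 seconds rather than 1.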

3. Distributed Rate Limiting

3.1 Redis Distributed Rate Limiting

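```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

// Redis distributed limiter: a fixed-window counter shared by all application instances.
// Instances are created with "new" (not a Spring @Component), so the @Autowired template must
// be injected afterwards, e.g. with AutowireCapableBeanFactory#autowireBean.
// StringRedisTemplate keeps keys, arguments and the counter as plain strings; with the default
// JDK serializer of RedisTemplate<String, Object>, tonumber(ARGV[...]) in Lua would return nil.
@Slf4j
public class RedisDistributedRateLimiter implements RateLimiter {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final String name;
    private final int maxPermits;
    private final long windowMillis;
    private final String redisKey;

    // Atomic check-and-increment: the first request of a window creates the key with a TTL of
    // one window (PX, in ms); later requests increment it until the limit is reached.
    // Returns the new count, or -1 when the limit is exceeded (the counter is left unchanged).
    private static final DefaultRedisScript<Long> SCRIPT = new DefaultRedisScript<>(
            "local key = KEYS[1]\n" +
            "local limit = tonumber(ARGV[2])\n" +
            "local current = redis.call('GET', key)\n" +
            "if current == false then\n" +
            "  redis.call('SET', key, 1, 'PX', ARGV[1])\n" +
            "  return 1\n" +
            "end\n" +
            "if tonumber(current) < limit then\n" +
            "  return redis.call('INCR', key)\n" +
            "end\n" +
            "return -1",
            Long.class);

    public RedisDistributedRateLimiter(String name, int maxPermits, long windowSize, TimeUnit timeUnit) {
        if (maxPermits <= 0 || timeUnit.toMillis(windowSize) <= 0) {
            throw new IllegalArgumentException("maxPermits and windowSize must be positive");
        }
        this.name = name;
        this.maxPermits = maxPermits;
        // Milliseconds, so windows shorter than one second also work
        this.windowMillis = timeUnit.toMillis(windowSize);
        // One stable key per limiter; the TTL set by the script delimits each window
        this.redisKey = "rate_limit:" + name;
    }

    @Override
    public boolean tryAcquire() {
        try {
            Long result = redisTemplate.execute(SCRIPT,
                    Collections.singletonList(redisKey),
                    String.valueOf(windowMillis),
                    String.valueOf(maxPermits));
            boolean acquired = result != null && result > 0;
            log.debug("Redis limiter {}: limiter={}, result={}",
                    acquired ? "acquired" : "rejected", name, result);
            return acquired;
        } catch (Exception e) {
            // Fail closed: reject while Redis is unavailable. Failing open or falling back
            // to a local limiter are alternative policies.
            log.error("Redis limiter error: limiter={}", name, e);
            return false;
        }
    }

    @Override
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (tryAcquire()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Rejected calls do not increment the counter, so polling does not consume quota
            TimeUnit.NANOSECONDS.sleep(Math.min(remaining, TimeUnit.MILLISECONDS.toNanos(10)));
        }
    }

    @Override
    public void acquire() throws InterruptedException {
        while (!tryAcquire()) {
            Thread.sleep(10);
        }
    }

    @Override
    public void release() {
        // Fixed-window counter: nothing to give back; the window ends when the key expires
        log.debug("Redis limiter release (no-op): limiter={}", name);
    }

    @Override
    public int getAvailablePermits() {
        try {
            String countStr = redisTemplate.opsForValue().get(redisKey);
            if (countStr == null) {
                return maxPermits;   // no request in the current window yet
            }
            return Math.max(0, maxPermits - Integer.parseInt(countStr));
        } catch (Exception e) {
            log.error("Failed to read available permits: limiter={}", name, e);
            return maxPermits;
        }
    }

    @Override
    public String getName() {
        return name;
    }
}
```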

3.2 Distributed Rate Limiter Coordinator

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// Distributed limiter coordinator. @Scheduled methods run only when @EnableScheduling is enabled.
@Service
@Slf4j
public class DistributedRateLimiterCoordinator {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;   // stores the status snapshots

    @Autowired
    private AutowireCapableBeanFactory beanFactory;        // injects dependencies into limiters built with new

    private final Map<String, DistributedRateLimiterInfo> distributedLimiters = new ConcurrentHashMap<>();
    private final Map<String, RateLimiter> redisLimiters = new ConcurrentHashMap<>();

    /**
     * Create (or return the existing) Redis-backed limiter with this name. It is kept here
     * rather than in RateLimiterManager, whose limiters are local to one JVM.
     */
    public RateLimiter createDistributedRateLimiter(String name, int maxPermits,
                                                    long windowSize, TimeUnit timeUnit) {
        return redisLimiters.computeIfAbsent(name, k -> {
            RedisDistributedRateLimiter limiter =
                    new RedisDistributedRateLimiter(name, maxPermits, windowSize, timeUnit);
            beanFactory.autowireBean(limiter);   // fills its @Autowired Redis template

            DistributedRateLimiterInfo info = new DistributedRateLimiterInfo();
            info.setName(name);
            info.setMaxPermits(maxPermits);
            info.setWindowSize(timeUnit.toMillis(windowSize));
            info.setCreateTime(System.currentTimeMillis());
            distributedLimiters.put(name, info);

            log.info("Distributed rate limiter created: name={}, maxPermits={}, windowSize={} {}",
                    name, maxPermits, windowSize, timeUnit);
            return limiter;
        });
    }

    /**
     * Publish a status snapshot of every distributed limiter to Redis.
     */
    @Scheduled(fixedRate = 5000)
    public void syncLimiterStatus() {
        redisLimiters.forEach((name, limiter) -> {
            try {
                Map<String, Object> status = new HashMap<>();
                status.put("availablePermits", limiter.getAvailablePermits());
                status.put("maxPermits", distributedLimiters.get(name).getMaxPermits());
                status.put("lastUpdateTime", System.currentTimeMillis());
                // The snapshot expires after 30 s, so it disappears if no node refreshes it
                redisTemplate.opsForValue().set("rate_limit_status:" + name, status, 30, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("Failed to sync limiter status: name={}", name, e);
            }
        });
    }

    /**
     * Status snapshot of one distributed limiter, or null if none is stored.
     */
    @SuppressWarnings("unchecked")
    public Map<String, Object> getDistributedLimiterStatus(String name) {
        return (Map<String, Object>) redisTemplate.opsForValue().get("rate_limit_status:" + name);
    }

    /**
     * Status snapshots of all distributed limiters known to this node.
     */
    public Map<String, Object> getAllDistributedLimiterStatus() {
        Map<String, Object> allStatus = new HashMap<>();
        distributedLimiters.keySet().forEach(name -> {
            Map<String, Object> status = getDistributedLimiterStatus(name);
            if (status != null) {
                allStatus.put(name, status);
            }
        });
        return allStatus;
    }
}

// Distributed limiter metadata
@Data
public class DistributedRateLimiterInfo {
    private String name;
    private int maxPermits;
    private long windowSize;   // ms
    private long createTime;
}
```
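The coordinator depends on Redis for every decision. One way to keep limiting while Redis is unreachable is a local limiter on each node, sized to that node's share of the global limit. The hypothetical QuotaSplitter below assumes the number of live nodes is known, for example from service discovery. It splits a global limit so the shares add up to exactly the global value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split a global limit across nodes as evenly as possible
final class QuotaSplitter {
    private QuotaSplitter() {}

    // Node i gets base + 1 when i < remainder, so the shares sum to globalLimit exactly
    static List<Integer> split(int globalLimit, int nodes) {
        if (nodes <= 0) {
            throw new IllegalArgumentException("nodes must be positive");
        }
        int base = globalLimit / nodes;
        int remainder = globalLimit % nodes;
        List<Integer> shares = new ArrayList<>(nodes);
        for (int i = 0; i < nodes; i++) {
            shares.add(base + (i < remainder ? 1 : 0));
        }
        return shares;
    }
}
```

With 100 permits across 3 nodes this gives 34, 33 and 33. When there are fewer permits than nodes, some nodes get 0, so a real deployment needs a policy for that case and must recompute the shares as nodes join or leave.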

4. Rate-Limit Monitoring and Management

4.1 Rate Limiter Monitoring Service

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;   // jakarta.annotation.* on Spring Boot 3
import javax.annotation.PreDestroy;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

// Rate limiter monitoring service
@Service
@Slf4j
public class RateLimiterMonitorService {

    private final Map<String, RateLimiter> monitoredLimiters = new ConcurrentHashMap<>();
    private final Map<String, RateLimiterMetrics> metricsMap = new ConcurrentHashMap<>();

    private final ScheduledExecutorService metricsCollector = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "rate-limiter-metrics");
        thread.setDaemon(true);
        return thread;
    });

    @PostConstruct
    public void init() {
        // Sample limiter state once per second
        metricsCollector.scheduleAtFixedRate(this::collectMetrics, 1, 1, TimeUnit.SECONDS);
        log.info("Rate limiter monitoring service initialized");
    }

    public void registerLimiter(String name, RateLimiter limiter) {
        monitoredLimiters.put(name, limiter);
        metricsMap.put(name, new RateLimiterMetrics(name));
        log.info("Limiter registered for monitoring: name={}", name);
    }

    public void unregisterLimiter(String name) {
        monitoredLimiters.remove(name);
        metricsMap.remove(name);
        log.info("Limiter unregistered from monitoring: name={}", name);
    }

    /**
     * Record one request outcome. Callers (e.g. the interceptor) must invoke this for
     * request counts and response times to be populated.
     */
    public void recordRequest(String name, boolean success, long responseTimeMillis) {
        RateLimiterMetrics metrics = metricsMap.get(name);
        if (metrics != null) {
            metrics.recordRequest(success, responseTimeMillis);
        }
    }

    private void collectMetrics() {
        monitoredLimiters.forEach((name, limiter) -> {
            try {
                RateLimiterMetrics metrics = metricsMap.get(name);
                if (metrics != null) {
                    metrics.update(limiter.getAvailablePermits());
                }
            } catch (Exception e) {
                log.error("Metrics collection failed: name={}", name, e);
            }
        });
    }

    public RateLimiterMetrics getMetrics(String name) {
        return metricsMap.get(name);
    }

    public Map<String, RateLimiterMetrics> getAllMetrics() {
        return new HashMap<>(metricsMap);
    }

    /**
     * Statistics report for one limiter, or null if it is not monitored.
     */
    public RateLimiterReport getReport(String name) {
        RateLimiterMetrics metrics = metricsMap.get(name);
        if (metrics == null) {
            return null;
        }
        RateLimiterReport report = new RateLimiterReport();
        report.setName(name);
        report.setTotalRequests(metrics.getTotalRequests());
        report.setSuccessfulRequests(metrics.getSuccessfulRequests());
        report.setFailedRequests(metrics.getFailedRequests());
        report.setSuccessRate(metrics.getSuccessRate());
        report.setAverageResponseTime(metrics.getAverageResponseTime());
        report.setMaxResponseTime(metrics.getMaxResponseTime());
        report.setMinResponseTime(metrics.getMinResponseTime());
        return report;
    }

    @PreDestroy
    public void destroy() {
        metricsCollector.shutdown();
        try {
            if (!metricsCollector.awaitTermination(5, TimeUnit.SECONDS)) {
                metricsCollector.shutdownNow();
            }
        } catch (InterruptedException e) {
            metricsCollector.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Rate limiter monitoring service shut down");
    }
}

// Limiter metrics. Request counts and response times come only from recordRequest();
// update() records the sampled available-permit gauge. Deriving request counts from permit
// deltas between one-second samples would miss acquire/release pairs that happen in between.
public class RateLimiterMetrics {
    private final String name;
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong successfulRequests = new AtomicLong(0);
    private final AtomicLong failedRequests = new AtomicLong(0);
    private final AtomicLong totalResponseTime = new AtomicLong(0);
    private final AtomicLong maxResponseTime = new AtomicLong(0);
    private final AtomicLong minResponseTime = new AtomicLong(Long.MAX_VALUE);

    private volatile int lastAvailablePermits = 0;
    private volatile long lastUpdateTime = System.currentTimeMillis();

    public RateLimiterMetrics(String name) {
        this.name = name;
    }

    /** Store the latest sampled number of available permits. */
    public void update(int availablePermits) {
        lastAvailablePermits = availablePermits;
        lastUpdateTime = System.currentTimeMillis();
    }

    public void recordRequest(boolean success, long responseTime) {
        totalRequests.incrementAndGet();
        if (success) {
            successfulRequests.incrementAndGet();
        } else {
            failedRequests.incrementAndGet();
        }
        totalResponseTime.addAndGet(responseTime);

        // Lock-free update of the maximum response time
        long currentMax = maxResponseTime.get();
        while (responseTime > currentMax && !maxResponseTime.compareAndSet(currentMax, responseTime)) {
            currentMax = maxResponseTime.get();
        }
        // Lock-free update of the minimum response time
        long currentMin = minResponseTime.get();
        while (responseTime < currentMin && !minResponseTime.compareAndSet(currentMin, responseTime)) {
            currentMin = minResponseTime.get();
        }
    }

    public double getSuccessRate() {
        long total = totalRequests.get();
        return total > 0 ? (double) successfulRequests.get() / total : 0.0;
    }

    public double getAverageResponseTime() {
        long total = totalRequests.get();
        return total > 0 ? (double) totalResponseTime.get() / total : 0.0;
    }

    public String getName() { return name; }
    public long getTotalRequests() { return totalRequests.get(); }
    public long getSuccessfulRequests() { return successfulRequests.get(); }
    public long getFailedRequests() { return failedRequests.get(); }
    public long getMaxResponseTime() { return maxResponseTime.get(); }
    public long getMinResponseTime() {
        long min = minResponseTime.get();
        return min == Long.MAX_VALUE ? 0 : min;   // no requests recorded yet
    }
    public int getLastAvailablePermits() { return lastAvailablePermits; }
    public long getLastUpdateTime() { return lastUpdateTime; }
}

// Limiter report
@Data
public class RateLimiterReport {
    private String name;
    private long totalRequests;
    private long successfulRequests;
    private long failedRequests;
    private double successRate;
    private double averageResponseTime;
    private long maxResponseTime;
    private long minResponseTime;
}
```
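RateLimiterMetrics keeps running totals but does not report a per-second rate, which the QPS monitoring node in the section 1.1 diagram calls for. Here is a hypothetical sketch of a rolling QPS meter. It uses a ring of one-second buckets, each tagged with the second it belongs to, so stale buckets are ignored without a cleanup thread. The time is passed in as a parameter.

```java
import java.util.Arrays;

// Hypothetical rolling QPS meter over the last N seconds
class RollingQpsMeter {
    private final int seconds;
    private final long[] bucketSecond;   // epoch second each bucket currently holds
    private final long[] bucketCount;

    RollingQpsMeter(int seconds) {
        this.seconds = seconds;
        this.bucketSecond = new long[seconds];
        this.bucketCount = new long[seconds];
        Arrays.fill(bucketSecond, -1);
    }

    synchronized void record(long nowMillis) {
        long second = Math.floorDiv(nowMillis, 1000L);
        int idx = (int) Math.floorMod(second, (long) seconds);
        if (bucketSecond[idx] != second) {   // bucket holds an older second: reuse it
            bucketSecond[idx] = second;
            bucketCount[idx] = 0;
        }
        bucketCount[idx]++;
    }

    // Average requests per second over the last `seconds` seconds, including the current one
    synchronized double qps(long nowMillis) {
        long current = Math.floorDiv(nowMillis, 1000L);
        long total = 0;
        for (int i = 0; i < seconds; i++) {
            if (bucketSecond[i] <= current && current - bucketSecond[i] < seconds) {
                total += bucketCount[i];
            }
        }
        return (double) total / seconds;
    }
}
```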

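5. Best Practices and Summary

5.1 Best Practices for Semaphore-Based API Rate Limiting

5.1.1 Choosing a Limiting Strategy

  • Fixed window: suits simple limits. A burst that straddles a window boundary can reach twice the limit.
  • Sliding window: suits scenarios that need smoother limiting without the boundary burst.
  • Token bucket: suits bursty traffic. It allows bursts up to the bucket capacity while enforcing the average refill rate.
  • Leaky bucket: suits scenarios that need strict control, because requests drain at a constant rate.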
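The leaky bucket is listed here but not implemented elsewhere in the article. The following sketch shows the "leaky bucket as a meter" variant under a few assumptions: time is passed in, the leak rate is given in requests per second, and the water level is tracked in thousandths of a request so the arithmetic stays exact. The level drains continuously, each admitted request adds one unit, and a request that would overflow is rejected. LeakyBucket is a hypothetical name.

```java
// Hypothetical leaky bucket (meter variant) using integer arithmetic
class LeakyBucket {
    private final long capacityMilli;   // capacity in thousandths of a request
    private final long leakPerSecond;   // requests drained per second
    private long levelMilli;            // current level in thousandths of a request
    private long lastLeak;

    LeakyBucket(long capacity, long leakPerSecond, long nowMillis) {
        this.capacityMilli = capacity * 1000;
        this.leakPerSecond = leakPerSecond;
        this.lastLeak = nowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // leakPerSecond requests per second equals leakPerSecond thousandths per millisecond
        long drained = (nowMillis - lastLeak) * leakPerSecond;
        levelMilli = Math.max(0, levelMilli - drained);
        lastLeak = nowMillis;
        if (levelMilli + 1000 <= capacityMilli) {
            levelMilli += 1000;         // admit: the request adds one unit of water
            return true;
        }
        return false;                   // the bucket would overflow: reject
    }
}
```

With capacity 1 and a leak rate of 10 per second, requests are admitted at most once every 100 ms, which is the strict pacing described above. A larger capacity tolerates short bursts. The "queue" variant of the leaky bucket delays excess requests instead of rejecting them.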

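5.1.2 Parameter Tuning

  • Concurrency: set the permit count from the capacity the system sustains in load tests.
  • Window size: choose the time window to match the traffic pattern of the business.
  • Token refill: set the refill rate to the sustained rate you want to allow, and the bucket size to the largest burst the system can absorb.
  • Fairness: fair mode hands permits to waiting threads in FIFO order at some cost in throughput. Non-fair mode gives higher throughput, but waiting threads can starve. Fairness is not a priority mechanism.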
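For the concurrency setting, Little's Law (L = λW) gives a starting estimate. It states that the average number of requests in a system equals the arrival rate times the average time each request spends there. So permits ≈ target QPS × average latency: 200 QPS at 50 ms average latency keeps about 10 requests in flight. The hypothetical PermitSizing helper below adds a headroom factor. Treat the result as a starting value to confirm with load tests, not a guarantee.

```java
// Hypothetical sizing helper based on Little's Law (L = lambda * W)
final class PermitSizing {
    private PermitSizing() {}

    // targetQps: expected arrival rate; avgLatencyMillis: average time a request is in flight;
    // headroom: multiplier for spikes (1.0 = none)
    static int permitsFor(double targetQps, double avgLatencyMillis, double headroom) {
        double inFlight = targetQps * (avgLatencyMillis / 1000.0);   // average concurrent requests
        return (int) Math.max(1, Math.ceil(inFlight * headroom));
    }
}
```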

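5.1.3 Monitoring and Alerting

  • Real-time monitoring: track limiter state and metrics continuously.
  • Alerting: raise alerts promptly when limiting behaves abnormally, for example when rejections rise sharply.
  • Performance analysis: measure how limiting affects system performance.
  • Capacity planning: plan capacity from monitoring data.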

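5.1.4 Distributed Rate Limiting

  • Consistency: enforce limits consistently across all instances.
  • Failure handling: define a degradation strategy for when Redis or other middleware fails, such as rejecting requests (fail closed), admitting them (fail open), or falling back to local limits.
  • Load balancing: distribute the limiting load sensibly across nodes.
  • Data synchronization: keep limiter state synchronized in real time.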
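For failure handling, a limiter can fall back to a local check when the shared store throws. The FallbackLimiter below is hypothetical. It wraps a BooleanSupplier for the remote check (for example a Redis call) and another for the local check, and it consults the local one only when the remote call throws. A normal remote rejection is respected. The RedisDistributedRateLimiter in section 3.1 catches Redis exceptions itself and rejects, so using this pattern with it would mean letting those exceptions propagate.

```java
import java.util.function.BooleanSupplier;

// Hypothetical decorator: ask the shared (remote) limiter first and fall back to a
// node-local limiter only when the remote check fails with an exception
class FallbackLimiter {
    private final BooleanSupplier remote;
    private final BooleanSupplier local;

    FallbackLimiter(BooleanSupplier remote, BooleanSupplier local) {
        this.remote = remote;
        this.local = local;
    }

    boolean tryAcquire() {
        try {
            return remote.getAsBoolean();   // "false" here is a real rejection, not a failure
        } catch (RuntimeException e) {
            return local.getAsBoolean();    // degrade to the node-local limit
        }
    }
}
```

Whether to fail open, fail closed, or fall back locally is a policy decision. A local fallback keeps some protection in place without rejecting all traffic during an outage.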

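5.2 Architecture Evolution

5.2.1 Toward Microservices

  • Service decomposition: split rate limiting into an independent microservice.
  • Service governance: provide service registration, discovery, and load balancing.
  • Configuration management: manage rate-limit configuration centrally.
  • Monitoring integration: integrate with the microservice monitoring stack.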

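5.2.2 Toward Cloud Native

  • Containerized deployment: deploy with container technologies such as Docker.
  • Elastic scaling: scale out and in automatically based on load.
  • Service mesh: use service-mesh technology such as Istio.
  • Cloud-native storage: use cloud-native storage services.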

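5.2.3 Intelligent Rate Limiting

  • AI-driven optimization: tune limiter parameters with machine learning.
  • Adaptive limiting: adjust the limiting strategy automatically based on system load.
  • Predictive limiting: predict traffic peaks and tighten limits ahead of time.
  • Intelligent degradation: degrade features according to their business importance.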
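Adaptive limiting can be approached with additive-increase/multiplicative-decrease (AIMD), the scheme TCP congestion control uses. The limit rises by one while latency stays under a target and is cut by a factor when latency exceeds it. This is one possible technique, not the article's method. The class name, the latency target, and the 0.5 decrease factor are assumptions, and the resulting value would still have to be applied to a limiter's capacity.

```java
// Hypothetical AIMD controller for a concurrency limit
class AimdLimit {
    private final int min;
    private final int max;
    private final long targetLatencyMillis;
    private final double decreaseFactor;
    private int limit;

    AimdLimit(int initial, int min, int max, long targetLatencyMillis, double decreaseFactor) {
        if (min <= 0 || max < min || decreaseFactor <= 0 || decreaseFactor >= 1) {
            throw new IllegalArgumentException("invalid AIMD parameters");
        }
        this.min = min;
        this.max = max;
        this.targetLatencyMillis = targetLatencyMillis;
        this.decreaseFactor = decreaseFactor;
        this.limit = Math.max(min, Math.min(max, initial));
    }

    // Feed one latency sample and return the updated limit
    synchronized int onSample(long latencyMillis) {
        if (latencyMillis > targetLatencyMillis) {
            limit = Math.max(min, (int) (limit * decreaseFactor));   // multiplicative decrease
        } else {
            limit = Math.min(max, limit + 1);                        // additive increase
        }
        return limit;
    }

    synchronized int getLimit() {
        return limit;
    }
}
```

The asymmetry is deliberate: the limit backs off quickly when the system shows stress and probes upward slowly when it recovers.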

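5.3 Summary

Semaphore-based API rate limiting gives enterprises a flexible way to control traffic. The semaphore bounds concurrency, the token bucket and window algorithms bound request rates, and Redis extends those limits across instances. With a sound architecture, suitable strategies, and precise concurrency control, these pieces add up to a production-grade rate-limiting solution. As microservice and cloud-native technologies spread, rate-limiting systems are likely to become more automated and adaptive.

Going forward, teams should follow technology trends and keep refining their limiting strategies as business needs and the technical environment change. We hope the analysis and practical guidance in this article serve as a useful reference for building reliable API rate limiting and for keeping enterprise applications stable under heavy traffic.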