第229集JUC压测工具类架构实战:高并发测试、性能监控、压力分析的企业级解决方案

前言

在当今高并发、大流量的企业级应用中,压力测试已成为系统性能评估和优化的重要手段。传统的压测工具往往功能单一、扩展性差,无法满足复杂业务场景的测试需求。基于JUC(Java并发工具包)构建的压测工具类,不仅能够提供强大的并发控制能力,还能实现精确的性能监控、详细的压力分析和灵活的测试策略。随着微服务架构和云原生技术的普及,构建可扩展、高精度的压测框架,已成为企业级架构师必须掌握的关键技能。

本文将深入探讨基于JUC的压测工具类架构设计与实战应用,从高并发测试到性能监控,从压力分析到结果统计,为企业构建专业级的压力测试解决方案提供全面的技术指导。

一、JUC压测工具架构概述与核心原理

1.1 JUC压测工具架构设计

基于JUC的压测工具采用分层架构设计,通过线程池管理、并发控制、性能监控等技术,实现高效的压力测试能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
graph TB
A[压测任务] --> B[任务分发器]
B --> C[线程池管理]
C --> D[并发执行]
D --> E[性能监控]
E --> F[结果统计]
F --> G[报告生成]

H[JUC组件] --> I[ThreadPoolExecutor]
H --> J[CountDownLatch]
H --> K[CyclicBarrier]
H --> L[ConcurrentHashMap]
H --> M[AtomicLong]
H --> N[Semaphore]

O[监控指标] --> P[QPS统计]
O --> Q[响应时间]
O --> R[错误率]
O --> S[资源使用率]

T[测试策略] --> U[固定并发]
T --> V[阶梯递增]
T --> W[随机波动]
T --> X[峰值测试]

1.2 JUC压测工具核心特性

1.2.1 高并发控制

  • 线程池管理:使用ThreadPoolExecutor精确控制并发数
  • 同步机制:利用CountDownLatch和CyclicBarrier实现任务同步
  • 资源控制:通过Semaphore控制资源访问并发度
  • 原子操作:使用AtomicLong等原子类保证计数准确性

1.2.2 性能监控体系

  • 实时监控:实时统计QPS、响应时间等关键指标
  • 资源监控:监控CPU、内存、网络等系统资源
  • 错误监控:统计各种错误类型和错误率
  • 自定义指标:支持业务自定义监控指标

1.2.3 压力分析能力

  • 压力分布:分析不同时间段的压力分布
  • 性能瓶颈:识别系统性能瓶颈点
  • 容量评估:评估系统最大承载能力
  • 趋势分析:分析性能变化趋势

二、JUC压测工具类核心实现

2.1 压测任务接口设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// 压测任务接口
public interface StressTestTask {

/**
* 执行压测任务
*/
StressTestResult execute() throws Exception;

/**
* 获取任务名称
*/
String getTaskName();

/**
* 获取任务描述
*/
String getTaskDescription();

/**
* 获取任务超时时间
*/
long getTimeout();

/**
* 是否支持重试
*/
boolean isRetryable();

/**
* 获取重试次数
*/
int getRetryCount();
}

// 压测结果
public class StressTestResult {
private boolean success;
private long responseTime;
private String errorMessage;
private Map<String, Object> customMetrics;
private long timestamp;

public StressTestResult(boolean success, long responseTime) {
this.success = success;
this.responseTime = responseTime;
this.timestamp = System.currentTimeMillis();
this.customMetrics = new HashMap<>();
}

// 构造函数和getter/setter方法
}

// HTTP压测任务
public class HttpStressTestTask implements StressTestTask {

private final String url;
private final String method;
private final Map<String, String> headers;
private final String body;
private final long timeout;

public HttpStressTestTask(String url, String method, Map<String, String> headers,
String body, long timeout) {
this.url = url;
this.method = method;
this.headers = headers != null ? headers : new HashMap<>();
this.body = body;
this.timeout = timeout;
}

@Override
public StressTestResult execute() throws Exception {
long startTime = System.currentTimeMillis();

try {
// 创建HTTP客户端
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(timeout, TimeUnit.MILLISECONDS)
.readTimeout(timeout, TimeUnit.MILLISECONDS)
.writeTimeout(timeout, TimeUnit.MILLISECONDS)
.build();

// 构建请求
Request.Builder requestBuilder = new Request.Builder().url(url);

// 设置请求头
headers.forEach(requestBuilder::addHeader);

// 设置请求方法和body
if ("POST".equalsIgnoreCase(method) && body != null) {
RequestBody requestBody = RequestBody.create(
MediaType.parse("application/json"), body);
requestBuilder.post(requestBody);
} else if ("GET".equalsIgnoreCase(method)) {
requestBuilder.get();
}

Request request = requestBuilder.build();

// 执行请求
try (Response response = client.newCall(request).execute()) {
long responseTime = System.currentTimeMillis() - startTime;

StressTestResult result = new StressTestResult(
response.isSuccessful(), responseTime);

// 添加自定义指标
result.getCustomMetrics().put("statusCode", response.code());
result.getCustomMetrics().put("responseSize",
response.body() != null ? response.body().contentLength() : 0);

return result;
}

} catch (Exception e) {
long responseTime = System.currentTimeMillis() - startTime;
StressTestResult result = new StressTestResult(false, responseTime);
result.setErrorMessage(e.getMessage());
return result;
}
}

@Override
public String getTaskName() {
return "HTTP_" + method + "_" + url;
}

@Override
public String getTaskDescription() {
return "HTTP压测任务: " + method + " " + url;
}

@Override
public long getTimeout() {
return timeout;
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public int getRetryCount() {
return 3;
}
}

2.2 JUC压测引擎

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// JUC压测引擎
@Component
@Slf4j
public class JucStressTestEngine {

private final AtomicLong totalRequests = new AtomicLong(0);
private final AtomicLong successRequests = new AtomicLong(0);
private final AtomicLong failedRequests = new AtomicLong(0);

private final ConcurrentHashMap<String, AtomicLong> customCounters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, AtomicLong> responseTimeBuckets = new ConcurrentHashMap<>();

private volatile boolean isRunning = false;
private volatile long startTime;
private volatile long endTime;

/**
* 执行固定并发压测
*/
public StressTestReport executeFixedConcurrencyTest(StressTestTask task,
int concurrency,
int totalRequests,
long duration) {
log.info("开始执行固定并发压测: concurrency={}, totalRequests={}, duration={}ms",
concurrency, totalRequests, duration);

// 1. 初始化压测环境
initializeTestEnvironment();

// 2. 创建线程池
ThreadPoolExecutor executor = createThreadPool(concurrency);

// 3. 创建同步器
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(totalRequests);

// 4. 创建监控器
StressTestMonitor monitor = new StressTestMonitor();
monitor.start();

try {
// 5. 提交压测任务
for (int i = 0; i < totalRequests; i++) {
executor.submit(() -> {
try {
// 等待所有线程准备就绪
startLatch.await();

// 执行压测任务
executeTask(task, monitor);

} catch (Exception e) {
log.error("压测任务执行失败", e);
} finally {
finishLatch.countDown();
}
});
}

// 6. 开始压测
startTime = System.currentTimeMillis();
startLatch.countDown();

// 7. 等待压测完成
boolean finished = finishLatch.await(duration, TimeUnit.MILLISECONDS);
endTime = System.currentTimeMillis();

if (!finished) {
log.warn("压测超时,强制结束");
executor.shutdownNow();
}

// 8. 生成压测报告
return generateReport(monitor);

} catch (Exception e) {
log.error("压测执行失败", e);
throw new RuntimeException("压测执行失败", e);
} finally {
// 9. 清理资源
cleanup(executor, monitor);
}
}

/**
* 执行阶梯递增压测
*/
public StressTestReport executeRampUpTest(StressTestTask task,
int initialConcurrency,
int maxConcurrency,
int stepSize,
long stepDuration,
long totalDuration) {
log.info("开始执行阶梯递增压测: initialConcurrency={}, maxConcurrency={}, stepSize={}, stepDuration={}ms, totalDuration={}ms",
initialConcurrency, maxConcurrency, stepSize, stepDuration, totalDuration);

// 1. 初始化压测环境
initializeTestEnvironment();

// 2. 创建监控器
StressTestMonitor monitor = new StressTestMonitor();
monitor.start();

// 3. 创建线程池
ThreadPoolExecutor executor = createThreadPool(maxConcurrency);

// 4. 创建同步器
CyclicBarrier barrier = new CyclicBarrier(maxConcurrency);
CountDownLatch finishLatch = new CountDownLatch(maxConcurrency);

try {
startTime = System.currentTimeMillis();

// 5. 阶梯递增执行
for (int concurrency = initialConcurrency;
concurrency <= maxConcurrency &&
(System.currentTimeMillis() - startTime) < totalDuration;
concurrency += stepSize) {

log.info("当前并发数: {}", concurrency);

// 创建当前阶梯的同步器
CountDownLatch stepStartLatch = new CountDownLatch(1);
CountDownLatch stepFinishLatch = new CountDownLatch(concurrency);

// 提交当前阶梯的任务
for (int i = 0; i < concurrency; i++) {
executor.submit(() -> {
try {
stepStartLatch.await();

// 执行压测任务
executeTask(task, monitor);

} catch (Exception e) {
log.error("阶梯压测任务执行失败", e);
} finally {
stepFinishLatch.countDown();
}
});
}

// 开始当前阶梯
stepStartLatch.countDown();

// 等待当前阶梯完成
stepFinishLatch.await(stepDuration, TimeUnit.MILLISECONDS);

// 阶梯间隔
Thread.sleep(1000);
}

endTime = System.currentTimeMillis();

// 6. 生成压测报告
return generateReport(monitor);

} catch (Exception e) {
log.error("阶梯压测执行失败", e);
throw new RuntimeException("阶梯压测执行失败", e);
} finally {
// 7. 清理资源
cleanup(executor, monitor);
}
}

/**
* 执行峰值测试
*/
public StressTestReport executePeakTest(StressTestTask task,
int baseConcurrency,
int peakConcurrency,
long baseDuration,
long peakDuration,
int peakCount) {
log.info("开始执行峰值测试: baseConcurrency={}, peakConcurrency={}, baseDuration={}ms, peakDuration={}ms, peakCount={}",
baseConcurrency, peakConcurrency, baseDuration, peakDuration, peakCount);

// 1. 初始化压测环境
initializeTestEnvironment();

// 2. 创建监控器
StressTestMonitor monitor = new StressTestMonitor();
monitor.start();

// 3. 创建线程池
ThreadPoolExecutor executor = createThreadPool(peakConcurrency);

try {
startTime = System.currentTimeMillis();

// 4. 执行峰值测试
for (int peak = 0; peak < peakCount; peak++) {
// 基础负载阶段
log.info("执行基础负载阶段: 并发数={}", baseConcurrency);
executeConcurrencyPhase(task, executor, baseConcurrency, baseDuration, monitor);

// 峰值负载阶段
log.info("执行峰值负载阶段: 并发数={}", peakConcurrency);
executeConcurrencyPhase(task, executor, peakConcurrency, peakDuration, monitor);

// 峰值间隔
Thread.sleep(2000);
}

endTime = System.currentTimeMillis();

// 5. 生成压测报告
return generateReport(monitor);

} catch (Exception e) {
log.error("峰值测试执行失败", e);
throw new RuntimeException("峰值测试执行失败", e);
} finally {
// 6. 清理资源
cleanup(executor, monitor);
}
}

/**
* 执行并发阶段
*/
private void executeConcurrencyPhase(StressTestTask task,
ThreadPoolExecutor executor,
int concurrency,
long duration,
StressTestMonitor monitor) throws InterruptedException {

CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(concurrency);

// 提交任务
for (int i = 0; i < concurrency; i++) {
executor.submit(() -> {
try {
startLatch.await();
executeTask(task, monitor);
} catch (Exception e) {
log.error("并发阶段任务执行失败", e);
} finally {
finishLatch.countDown();
}
});
}

// 开始执行
startLatch.countDown();

// 等待完成
finishLatch.await(duration, TimeUnit.MILLISECONDS);
}

/**
* 执行单个任务
*/
private void executeTask(StressTestTask task, StressTestMonitor monitor) {
int retryCount = 0;
int maxRetries = task.isRetryable() ? task.getRetryCount() : 0;

while (retryCount <= maxRetries) {
try {
long taskStartTime = System.currentTimeMillis();

// 执行任务
StressTestResult result = task.execute();

long taskEndTime = System.currentTimeMillis();
long responseTime = taskEndTime - taskStartTime;

// 更新统计
totalRequests.incrementAndGet();

if (result.isSuccess()) {
successRequests.incrementAndGet();
} else {
failedRequests.incrementAndGet();
}

// 更新响应时间分布
updateResponseTimeDistribution(responseTime);

// 更新监控指标
monitor.recordResult(result, responseTime);

return; // 成功执行,退出重试循环

} catch (Exception e) {
retryCount++;

if (retryCount > maxRetries) {
// 超过最大重试次数
totalRequests.incrementAndGet();
failedRequests.incrementAndGet();

StressTestResult result = new StressTestResult(false, 0);
result.setErrorMessage(e.getMessage());
monitor.recordResult(result, 0);

log.error("任务执行失败,超过最大重试次数: {}", task.getTaskName(), e);
return;
}

// 重试延迟
try {
Thread.sleep(1000 * retryCount); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}

/**
* 创建线程池
*/
private ThreadPoolExecutor createThreadPool(int concurrency) {
return new ThreadPoolExecutor(
concurrency, // 核心线程数
concurrency, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(1000), // 队列
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "stress-test-" + threadNumber.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}

/**
* 更新响应时间分布
*/
private void updateResponseTimeDistribution(long responseTime) {
String bucket = getResponseTimeBucket(responseTime);
responseTimeBuckets.computeIfAbsent(bucket, k -> new AtomicLong(0)).incrementAndGet();
}

/**
* 获取响应时间分桶
*/
private String getResponseTimeBucket(long responseTime) {
if (responseTime < 100) return "0-100ms";
if (responseTime < 500) return "100-500ms";
if (responseTime < 1000) return "500ms-1s";
if (responseTime < 3000) return "1-3s";
if (responseTime < 5000) return "3-5s";
return "5s+";
}

/**
* 初始化测试环境
*/
private void initializeTestEnvironment() {
totalRequests.set(0);
successRequests.set(0);
failedRequests.set(0);
customCounters.clear();
responseTimeBuckets.clear();
isRunning = true;
}

/**
* 生成压测报告
*/
private StressTestReport generateReport(StressTestMonitor monitor) {
long totalTime = endTime - startTime;
long totalReq = totalRequests.get();
long successReq = successRequests.get();
long failedReq = failedRequests.get();

double qps = totalTime > 0 ? (double) totalReq * 1000 / totalTime : 0;
double successRate = totalReq > 0 ? (double) successReq / totalReq : 0;
double errorRate = totalReq > 0 ? (double) failedReq / totalReq : 0;

StressTestReport report = new StressTestReport();
report.setTotalRequests(totalReq);
report.setSuccessRequests(successReq);
report.setFailedRequests(failedReq);
report.setTotalTime(totalTime);
report.setQps(qps);
report.setSuccessRate(successRate);
report.setErrorRate(errorRate);
report.setResponseTimeDistribution(new HashMap<>(responseTimeBuckets));
report.setCustomMetrics(monitor.getCustomMetrics());
report.setStartTime(startTime);
report.setEndTime(endTime);

return report;
}

/**
* 清理资源
*/
private void cleanup(ThreadPoolExecutor executor, StressTestMonitor monitor) {
try {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}

monitor.stop();
isRunning = false;
}
}

2.3 压测监控器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// 压测监控器
@Component
@Slf4j
public class StressTestMonitor {

private final ConcurrentHashMap<String, AtomicLong> customMetrics = new ConcurrentHashMap<>();
private final List<Long> responseTimes = Collections.synchronizedList(new ArrayList<>());
private final List<StressTestResult> results = Collections.synchronizedList(new ArrayList<>());

private volatile boolean isRunning = false;
private volatile long startTime;
private volatile long lastReportTime;

private final ScheduledExecutorService reportExecutor = Executors.newSingleThreadScheduledExecutor();

/**
* 开始监控
*/
public void start() {
isRunning = true;
startTime = System.currentTimeMillis();
lastReportTime = startTime;

// 启动定期报告
reportExecutor.scheduleAtFixedRate(this::generatePeriodicReport,
5, 5, TimeUnit.SECONDS);

log.info("压测监控器已启动");
}

/**
* 停止监控
*/
public void stop() {
isRunning = false;
reportExecutor.shutdown();

try {
if (!reportExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
reportExecutor.shutdownNow();
}
} catch (InterruptedException e) {
reportExecutor.shutdownNow();
Thread.currentThread().interrupt();
}

log.info("压测监控器已停止");
}

/**
* 记录压测结果
*/
public void recordResult(StressTestResult result, long responseTime) {
if (!isRunning) return;

// 记录结果
results.add(result);
responseTimes.add(responseTime);

// 更新自定义指标
if (result.getCustomMetrics() != null) {
result.getCustomMetrics().forEach((key, value) -> {
if (value instanceof Number) {
customMetrics.computeIfAbsent(key, k -> new AtomicLong(0))
.addAndGet(((Number) value).longValue());
}
});
}
}

/**
* 生成定期报告
*/
private void generatePeriodicReport() {
if (!isRunning) return;

long currentTime = System.currentTimeMillis();
long periodTime = currentTime - lastReportTime;

// 计算期间统计
long periodRequests = results.size();
long periodSuccess = results.stream().mapToLong(r -> r.isSuccess() ? 1 : 0).sum();
long periodFailed = periodRequests - periodSuccess;

double periodQps = periodTime > 0 ? (double) periodRequests * 1000 / periodTime : 0;
double periodSuccessRate = periodRequests > 0 ? (double) periodSuccess / periodRequests : 0;

// 计算响应时间统计
List<Long> periodResponseTimes = responseTimes.subList(
Math.max(0, responseTimes.size() - (int) periodRequests), responseTimes.size());

double avgResponseTime = periodResponseTimes.stream().mapToLong(Long::longValue).average().orElse(0);
long maxResponseTime = periodResponseTimes.stream().mapToLong(Long::longValue).max().orElse(0);
long minResponseTime = periodResponseTimes.stream().mapToLong(Long::longValue).min().orElse(0);

log.info("压测监控报告 - 期间: {}ms, 请求数: {}, 成功: {}, 失败: {}, QPS: {:.2f}, 成功率: {:.2f}%, 平均响应时间: {:.2f}ms, 最大响应时间: {}ms, 最小响应时间: {}ms",
periodTime, periodRequests, periodSuccess, periodFailed, periodQps,
periodSuccessRate * 100, avgResponseTime, maxResponseTime, minResponseTime);

lastReportTime = currentTime;
}

/**
* 获取自定义指标
*/
public Map<String, Long> getCustomMetrics() {
Map<String, Long> metrics = new HashMap<>();
customMetrics.forEach((key, value) -> metrics.put(key, value.get()));
return metrics;
}

/**
* 获取响应时间统计
*/
public ResponseTimeStatistics getResponseTimeStatistics() {
if (responseTimes.isEmpty()) {
return new ResponseTimeStatistics();
}

List<Long> sortedTimes = new ArrayList<>(responseTimes);
Collections.sort(sortedTimes);

ResponseTimeStatistics stats = new ResponseTimeStatistics();
stats.setMin(sortedTimes.get(0));
stats.setMax(sortedTimes.get(sortedTimes.size() - 1));
stats.setAvg(responseTimes.stream().mapToLong(Long::longValue).average().orElse(0));
stats.setP50(getPercentile(sortedTimes, 50));
stats.setP90(getPercentile(sortedTimes, 90));
stats.setP95(getPercentile(sortedTimes, 95));
stats.setP99(getPercentile(sortedTimes, 99));

return stats;
}

/**
* 计算百分位数
*/
private long getPercentile(List<Long> sortedTimes, double percentile) {
int index = (int) Math.ceil(sortedTimes.size() * percentile / 100) - 1;
return sortedTimes.get(Math.max(0, Math.min(index, sortedTimes.size() - 1)));
}
}

// 响应时间统计
public class ResponseTimeStatistics {
private long min;
private long max;
private double avg;
private long p50;
private long p90;
private long p95;
private long p99;

// 构造函数和getter/setter方法
}

// 压测报告
public class StressTestReport {
private long totalRequests;
private long successRequests;
private long failedRequests;
private long totalTime;
private double qps;
private double successRate;
private double errorRate;
private Map<String, AtomicLong> responseTimeDistribution;
private Map<String, Long> customMetrics;
private ResponseTimeStatistics responseTimeStatistics;
private long startTime;
private long endTime;

// 构造函数和getter/setter方法
}

三、压测工具使用示例

3.1 HTTP接口压测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// HTTP接口压测示例
@RestController
@RequestMapping("/stress-test")
@Slf4j
public class StressTestController {

@Autowired
private JucStressTestEngine stressTestEngine;

/**
* HTTP接口压测
*/
@PostMapping("/http")
public ResponseEntity<StressTestReport> httpStressTest(@RequestBody HttpStressTestRequest request) {
try {
// 1. 创建HTTP压测任务
HttpStressTestTask task = new HttpStressTestTask(
request.getUrl(),
request.getMethod(),
request.getHeaders(),
request.getBody(),
request.getTimeout()
);

// 2. 执行压测
StressTestReport report;
switch (request.getTestType()) {
case "FIXED_CONCURRENCY":
report = stressTestEngine.executeFixedConcurrencyTest(
task, request.getConcurrency(), request.getTotalRequests(), request.getDuration());
break;
case "RAMP_UP":
report = stressTestEngine.executeRampUpTest(
task, request.getInitialConcurrency(), request.getMaxConcurrency(),
request.getStepSize(), request.getStepDuration(), request.getTotalDuration());
break;
case "PEAK":
report = stressTestEngine.executePeakTest(
task, request.getBaseConcurrency(), request.getPeakConcurrency(),
request.getBaseDuration(), request.getPeakDuration(), request.getPeakCount());
break;
default:
throw new IllegalArgumentException("不支持的测试类型: " + request.getTestType());
}

return ResponseEntity.ok(report);

} catch (Exception e) {
log.error("HTTP压测执行失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

// HTTP压测请求
public class HttpStressTestRequest {
private String url;
private String method;
private Map<String, String> headers;
private String body;
private long timeout;
private String testType;
private int concurrency;
private int totalRequests;
private long duration;
private int initialConcurrency;
private int maxConcurrency;
private int stepSize;
private long stepDuration;
private long totalDuration;
private int baseConcurrency;
private int peakConcurrency;
private long baseDuration;
private long peakDuration;
private int peakCount;

// 构造函数和getter/setter方法
}

3.2 数据库压测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 数据库压测任务
public class DatabaseStressTestTask implements StressTestTask {

private final String sql;
private final Map<String, Object> parameters;
private final DataSource dataSource;
private final long timeout;

public DatabaseStressTestTask(String sql, Map<String, Object> parameters,
DataSource dataSource, long timeout) {
this.sql = sql;
this.parameters = parameters;
this.dataSource = dataSource;
this.timeout = timeout;
}

@Override
public StressTestResult execute() throws Exception {
long startTime = System.currentTimeMillis();

try (Connection connection = dataSource.getConnection()) {
// 设置超时
connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), (int) timeout);

try (PreparedStatement statement = connection.prepareStatement(sql)) {
// 设置参数
if (parameters != null) {
int index = 1;
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
statement.setObject(index++, entry.getValue());
}
}

// 执行SQL
boolean isQuery = sql.trim().toUpperCase().startsWith("SELECT");

if (isQuery) {
try (ResultSet resultSet = statement.executeQuery()) {
int rowCount = 0;
while (resultSet.next()) {
rowCount++;
}

long responseTime = System.currentTimeMillis() - startTime;
StressTestResult result = new StressTestResult(true, responseTime);
result.getCustomMetrics().put("rowCount", rowCount);
return result;
}
} else {
int updateCount = statement.executeUpdate();

long responseTime = System.currentTimeMillis() - startTime;
StressTestResult result = new StressTestResult(true, responseTime);
result.getCustomMetrics().put("updateCount", updateCount);
return result;
}
}

} catch (Exception e) {
long responseTime = System.currentTimeMillis() - startTime;
StressTestResult result = new StressTestResult(false, responseTime);
result.setErrorMessage(e.getMessage());
return result;
}
}

@Override
public String getTaskName() {
return "DB_" + sql.substring(0, Math.min(20, sql.length()));
}

@Override
public String getTaskDescription() {
return "数据库压测任务: " + sql;
}

@Override
public long getTimeout() {
return timeout;
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public int getRetryCount() {
return 2;
}
}

四、最佳实践与总结

4.1 JUC压测工具最佳实践

4.1.1 线程池配置策略

  • 核心线程数:根据CPU核心数和测试目标设置
  • 最大线程数:避免创建过多线程导致系统资源耗尽
  • 队列容量:合理设置队列大小,避免内存溢出
  • 拒绝策略:选择合适的拒绝策略处理任务溢出

4.1.2 压测策略选择

  • 固定并发:适合稳定性测试和容量评估
  • 阶梯递增:适合发现系统性能拐点
  • 峰值测试:适合测试系统在突发流量下的表现
  • 随机波动:适合模拟真实业务场景

4.1.3 监控指标设计

  • 基础指标:QPS、响应时间、错误率
  • 系统指标:CPU、内存、网络、磁盘使用率
  • 业务指标:自定义业务相关指标
  • 趋势分析:性能变化趋势和异常检测

4.1.4 结果分析策略

  • 性能基准:建立性能基准线
  • 瓶颈识别:识别系统性能瓶颈
  • 容量规划:基于压测结果进行容量规划
  • 优化建议:提供具体的优化建议

4.2 架构演进建议

4.2.1 分布式压测

  • 多节点压测:支持多节点分布式压测
  • 负载均衡:智能分配压测任务到不同节点
  • 结果聚合:聚合多节点压测结果
  • 统一管理:统一的压测任务管理和监控

4.2.2 智能化压测

  • 自适应压测:根据系统响应自动调整压测强度
  • AI驱动分析:使用机器学习分析压测结果
  • 智能告警:基于压测结果的智能告警
  • 自动优化:基于压测结果的自动优化建议

4.2.3 云原生压测

  • 容器化部署:使用Docker等容器技术部署
  • 弹性伸缩:基于压测负载的自动扩缩容
  • 服务网格:集成Istio等服务网格技术
  • 云原生监控:集成云原生的监控和告警

4.3 总结

基于JUC的压测工具类为企业提供了强大、灵活的压力测试能力。通过合理的架构设计,完善的并发控制,精确的性能监控,可以实现专业级的压力测试解决方案。随着微服务架构和云原生技术的普及,压测工具将更加智能化和自动化。

在未来的发展中,企业需要持续关注技术发展趋势,不断优化和完善压测策略,以适应不断变化的业务需求和技术环境。通过本文的深入分析和实践指导,希望能够为企业构建高质量的压测解决方案提供有价值的参考和帮助,推动企业级应用在性能测试场景下的稳定运行和持续发展。