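Episode 236: SpringBoot Dynamic Thread Pool Architecture in Practice: An Enterprise Solution for Adaptive Adjustment, Intelligent Scaling, and Load Awareness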

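Preface

In today's high-concurrency, fast-changing business environments, a statically configured thread pool cannot keep up with a varying load. Peak periods need more threads to process requests, while quiet periods need fewer threads to save resources. A SpringBoot-based dynamic thread pool can adjust its thread count automatically according to real-time load, and can also support intelligent scaling, load awareness, and adaptive optimization. As microservice architectures and cloud-native technology become widespread, building an adaptive, intelligent dynamic thread pool framework has become a core skill for enterprise architects.

This article examines the architecture and practical use of dynamic thread pools in SpringBoot, from adaptive adjustment to intelligent scaling and from load awareness to performance optimization, as technical guidance for building a stable, efficient dynamic thread pool solution.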

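1. SpringBoot Dynamic Thread Pool: Architecture Overview and Core Principles

1.1 Dynamic Thread Pool Architecture Design

The SpringBoot dynamic thread pool system uses a layered architecture that combines load monitoring, adaptive adjustment, and intelligent scaling to manage the thread pool efficiently at runtime.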

graph TB
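A[Business Request] --> B[Dynamic Thread Pool]
B --> C[Load Monitor]
C --> D[Adaptive Adjuster]
D --> E[Intelligent Scaler]
E --> F[Performance Optimizer]
F --> G[Configuration Manager]
G --> H[Monitoring and Alerting]

I[Load Awareness] --> J[CPU Load]
I --> K[Memory Load]
I --> L[Queue Load]
I --> M[Response Time]
I --> N[Throughput]

O[Adjustment Strategies] --> P[Linear Adjustment]
O --> Q[Exponential Adjustment]
O --> R[Predictive Adjustment]
O --> S[Machine Learning Adjustment]

T[Scaling Mechanisms] --> U[Vertical Scaling]
T --> V[Horizontal Scaling]
T --> W[Elastic Scaling]
T --> X[Pre-scaling]

Y[Optimization Strategies] --> Z[Parameter Optimization]
Y --> AA[Strategy Optimization]
Y --> BB[Resource Optimization]
Y --> CC[Performance Optimization]

DD[Monitoring System] --> EE[Real-time Monitoring]
DD --> FF[Historical Analysis]
DD --> GG[Trend Prediction]
DD --> HH[Anomaly Alerting]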

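1.2 Core Features of the Dynamic Thread Pool

1.2.1 Adaptive Adjustment

  • Load awareness: detects changes in system load in real time
  • Dynamic adjustment: changes the thread count automatically according to load
  • Smooth transition: avoids abrupt swings in the thread count
  • Intelligent prediction: forecasts load trends from historical data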
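
One way to picture the "smooth transition" item is sketched below: the load signal is smoothed with an exponential moving average, and every resize is clamped to a maximum step. This is only an illustration. The class `SmoothAdjuster` and its parameters `alpha` and `maxStep` are hypothetical names and are not part of the listings that follow.

```java
public class SmoothAdjuster {
    private final double alpha;   // EMA weight of the newest sample (0 < alpha <= 1)
    private final int maxStep;    // largest change in thread count per adjustment
    private double smoothedLoad = Double.NaN;

    public SmoothAdjuster(double alpha, int maxStep) {
        this.alpha = alpha;
        this.maxStep = maxStep;
    }

    /** Feeds one load sample (0.0 to 1.0) and returns the smoothed load. */
    public double observe(double load) {
        smoothedLoad = Double.isNaN(smoothedLoad)
                ? load
                : alpha * load + (1 - alpha) * smoothedLoad;
        return smoothedLoad;
    }

    /** Moves current toward desired by at most maxStep, staying within [min, max]. */
    public int nextSize(int current, int desired, int min, int max) {
        int bounded = Math.max(min, Math.min(max, desired));
        int delta = Math.max(-maxStep, Math.min(maxStep, bounded - current));
        return current + delta;
    }

    public static void main(String[] args) {
        SmoothAdjuster adjuster = new SmoothAdjuster(0.5, 4);
        adjuster.observe(0.2);
        System.out.println("smoothed load after a spike: " + adjuster.observe(1.0));
        System.out.println("next pool size: " + adjuster.nextSize(10, 40, 1, 50));
    }
}
```

A smaller `alpha` reacts more slowly to spikes, and a smaller `maxStep` spreads a large resize over several adjustment cycles; both values would need tuning against real traffic.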

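1.2.2 Intelligent Scaling

  • Vertical scaling: increases the number of threads in a single thread pool
  • Horizontal scaling: creates additional thread pool instances
  • Elastic scaling: grows and shrinks with business demand
  • Pre-scaling: scales up ahead of an expected load increase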
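
Vertical scaling and pre-scaling map directly onto standard `ThreadPoolExecutor` methods. The sketch below raises the maximum before the core size and then calls `prestartAllCoreThreads()` so the threads exist before the expected load arrives. The class name `VerticalScalingDemo` and the pool sizes are illustrative assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class VerticalScalingDemo {

    /** Grows the pool: raise the maximum first so the core size never exceeds it. */
    static void scaleUp(ThreadPoolExecutor pool, int newCore, int newMax) {
        pool.setMaximumPoolSize(newMax);
        pool.setCorePoolSize(newCore);
    }

    /** Builds a small pool, scales it up, pre-starts the core threads, and returns the live thread count. */
    static int scaleUpAndPrestart(int newCore, int newMax) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        try {
            scaleUp(pool, newCore, newMax);
            pool.prestartAllCoreThreads(); // pre-scaling: create idle core threads now
            return pool.getPoolSize();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads after pre-scaling: " + scaleUpAndPrestart(8, 16));
    }
}
```

Without the pre-start call, a `ThreadPoolExecutor` creates core threads lazily as tasks arrive, so the first requests of a traffic spike would pay the thread creation cost.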

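1.2.3 Load Awareness

  • CPU monitoring: tracks CPU utilization
  • Memory monitoring: tracks memory usage
  • Queue monitoring: tracks the state of the task queue
  • Response time monitoring: tracks task response times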
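
As a rough illustration of where these signals can come from, the sketch below samples CPU, heap, and queue load using only JDK APIs. It assumes the system load average divided by the processor count is an acceptable CPU proxy (the JVM reports a negative value on platforms that do not provide it), and the class `LoadSnapshot` is a hypothetical stand-in for the `LoadMetrics` type used later.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoadSnapshot {
    final double cpuLoad;     // load average per core, or -1 if the platform does not report it
    final double memoryLoad;  // used heap / max heap
    final double queueLoad;   // queued tasks / queue capacity

    LoadSnapshot(double cpuLoad, double memoryLoad, double queueLoad) {
        this.cpuLoad = cpuLoad;
        this.memoryLoad = memoryLoad;
        this.queueLoad = queueLoad;
    }

    static LoadSnapshot capture(ThreadPoolExecutor pool) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        double loadAvg = os.getSystemLoadAverage();
        double cpu = loadAvg < 0 ? -1 : loadAvg / os.getAvailableProcessors();

        Runtime rt = Runtime.getRuntime();
        double memory = (double) (rt.totalMemory() - rt.freeMemory()) / rt.maxMemory();

        long size = pool.getQueue().size();
        long capacity = size + pool.getQueue().remainingCapacity(); // long avoids int overflow
        double queue = capacity == 0 ? 0.0 : (double) size / capacity;
        return new LoadSnapshot(cpu, memory, queue);
    }

    /** One busy worker plus two queued tasks in a queue of four. */
    static LoadSnapshot demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // handed straight to the single worker
            pool.execute(blocked); // queued
            pool.execute(blocked); // queued
            return capture(pool);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        LoadSnapshot s = demo();
        System.out.println("cpu=" + s.cpuLoad + " memory=" + s.memoryLoad + " queue=" + s.queueLoad);
    }
}
```

Response time is not covered here because the executor does not expose it; it has to be measured around task execution.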

2. SpringBoot Dynamic Thread Pool: Core Implementation

2.1 Core Components of the Dynamic Thread Pool

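// Dynamic thread pool
// Imports assumed by this listing (on Spring Boot 3, use jakarta.annotation.* instead of javax.annotation.*)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
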
@Component
@Slf4j
public class DynamicThreadPool {

@Autowired
private LoadMonitorService loadMonitorService;

@Autowired
private AdaptiveAdjusterService adaptiveAdjusterService;

@Autowired
private IntelligentScalerService intelligentScalerService;

@Autowired
private PerformanceOptimizerService performanceOptimizerService;

private volatile ThreadPoolExecutor executor;
private volatile DynamicThreadPoolConfig config;
private volatile LoadMetrics currentLoad;
private volatile AdjustmentHistory adjustmentHistory;

private final ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService adjustExecutor = Executors.newSingleThreadScheduledExecutor();

private final AtomicLong totalTasks = new AtomicLong(0);
private final AtomicLong completedTasks = new AtomicLong(0);
private final AtomicLong rejectedTasks = new AtomicLong(0);
private final AtomicLong failedTasks = new AtomicLong(0);

@PostConstruct
public void init() {
// 1. Initialize the configuration
initializeConfig();

// 2. Create the thread pool
createThreadPool();

// 3. Start monitoring
startMonitoring();

// 4. Start adaptive adjustment
startAdaptiveAdjustment();

log.info("Dynamic thread pool initialized: poolName={}", config.getPoolName());
}

/**
* Execute a task
*/
public CompletableFuture<Void> execute(Runnable task) {
totalTasks.incrementAndGet();

return CompletableFuture.runAsync(() -> {
try {
task.run();
completedTasks.incrementAndGet();
} catch (Exception e) {
failedTasks.incrementAndGet();
log.error("Task execution failed", e);
throw e;
}
}, executor);
}

/**
* Submit a task that returns a result
*/
public <T> CompletableFuture<T> submit(Callable<T> task) {
totalTasks.incrementAndGet();

return CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedTasks.incrementAndGet();
return result;
} catch (Exception e) {
failedTasks.incrementAndGet();
log.error("Submitted task failed", e);
throw new RuntimeException(e);
}
}, executor);
}

/**
* Get the thread pool status
*/
public DynamicThreadPoolStatus getStatus() {
DynamicThreadPoolStatus status = new DynamicThreadPoolStatus();
status.setPoolName(config.getPoolName());
status.setCorePoolSize(executor.getCorePoolSize());
status.setMaximumPoolSize(executor.getMaximumPoolSize());
status.setCurrentPoolSize(executor.getPoolSize());
status.setActiveThreadCount(executor.getActiveCount());
status.setLargestPoolSize(executor.getLargestPoolSize());
status.setTaskCount(executor.getTaskCount());
status.setCompletedTaskCount(executor.getCompletedTaskCount());
status.setQueueSize(executor.getQueue().size());
status.setQueueRemainingCapacity(executor.getQueue().remainingCapacity());
status.setTotalTasks(totalTasks.get());
status.setCompletedTasks(completedTasks.get());
status.setRejectedTasks(rejectedTasks.get());
status.setFailedTasks(failedTasks.get());
status.setCurrentLoad(currentLoad);
status.setLastAdjustmentTime(adjustmentHistory.getLastAdjustmentTime());
status.setAdjustmentCount(adjustmentHistory.getAdjustmentCount());

return status;
}

/**
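* Adjust the thread pool at runtime.
* Synchronized because both the adjustment scheduler and external callers may invoke it.
*/
public synchronized boolean adjustThreadPool(AdjustmentRequest request) {
try {
// 1. Validate the adjustment request
validateAdjustmentRequest(request);

// 2. Compute the new configuration
DynamicThreadPoolConfig newConfig = calculateNewConfig(request);

// 3. Apply the adjustment
boolean success = performAdjustment(newConfig);

if (success) {
// 4. Record the adjustment history
recordAdjustmentHistory(request, newConfig);

// 5. Switch to the new configuration
this.config = newConfig;

log.info("Dynamic thread pool adjusted: poolName={}, newConfig={}", config.getPoolName(), newConfig);
}

return success;

} catch (Exception e) {
log.error("Dynamic thread pool adjustment failed: poolName={}", config.getPoolName(), e);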
return false;
}
}

/**
* Create the thread pool
*/
private void createThreadPool() {
executor = new ThreadPoolExecutor(
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveTime(),
config.getTimeUnit(),
createBlockingQueue(config),
createThreadFactory(),
createRejectedExecutionHandler()
);

// Apply additional pool settings
executor.allowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());

log.info("Created dynamic thread pool: corePoolSize={}, maxPoolSize={}, queueCapacity={}",
config.getCorePoolSize(), config.getMaximumPoolSize(), config.getQueueCapacity());
}

/**
* Create the blocking queue
*/
private BlockingQueue<Runnable> createBlockingQueue(DynamicThreadPoolConfig config) {
switch (config.getQueueType()) {
case LINKED_BLOCKING_QUEUE:
return new LinkedBlockingQueue<>(config.getQueueCapacity());
case ARRAY_BLOCKING_QUEUE:
return new ArrayBlockingQueue<>(config.getQueueCapacity());
case SYNCHRONOUS_QUEUE:
return new SynchronousQueue<>();
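case PRIORITY_BLOCKING_QUEUE:
// Note: PriorityBlockingQueue is unbounded (the argument is only the initial capacity)
// and needs Comparable tasks, which the CompletableFuture wrappers used above are not.
return new PriorityBlockingQueue<>(config.getQueueCapacity());
case DYNAMIC_QUEUE:
// DynamicBlockingQueue is a custom resizable queue that is not shown in this listing.
return new DynamicBlockingQueue(config.getQueueCapacity());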
default:
return new LinkedBlockingQueue<>(config.getQueueCapacity());
}
}

/**
* Create the thread factory
*/
private ThreadFactory createThreadFactory() {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = config.getPoolName() + "-dynamic-thread-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
}

/**
* Create the rejection handler
*/
private RejectedExecutionHandler createRejectedExecutionHandler() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectedTasks.incrementAndGet();

// Log the rejection
log.warn("Task rejected: poolName={}, activeThreads={}, queueSize={}",
config.getPoolName(), executor.getActiveCount(), executor.getQueue().size());

// Trigger scaling
triggerScaling();

// Fall back to running the task on the caller's thread (caller-runs policy)
if (!executor.isShutdown()) {
r.run();
}
}
};
}

/**
* Start monitoring
*/
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
try {
// 1. Collect load metrics
collectLoadMetrics();

// 2. Analyze the load trend
analyzeLoadTrend();

// 3. Check whether an adjustment is needed
checkAdjustmentNeeded();

} catch (Exception e) {
log.error("Monitoring task failed", e);
}
}, 5, 5, TimeUnit.SECONDS);
}

/**
* Start adaptive adjustment
*/
private void startAdaptiveAdjustment() {
adjustExecutor.scheduleAtFixedRate(() -> {
try {
// 1. Get an adjustment recommendation
AdjustmentRecommendation recommendation = adaptiveAdjusterService.getRecommendation(currentLoad, config);

if (recommendation != null && recommendation.isRecommended()) {
// 2. Build the adjustment request
AdjustmentRequest request = createAdjustmentRequest(recommendation);

// 3. Apply the adjustment
adjustThreadPool(request);
}

} catch (Exception e) {
log.error("Adaptive adjustment task failed", e);
}
}, 30, 30, TimeUnit.SECONDS);
}

/**
* Collect load metrics
*/
private void collectLoadMetrics() {
try {
LoadMetrics metrics = loadMonitorService.collectMetrics(executor);
this.currentLoad = metrics;

log.debug("Collected load metrics: cpuLoad={}, memoryLoad={}, queueLoad={}, responseTime={}",
metrics.getCpuLoad(), metrics.getMemoryLoad(),
metrics.getQueueLoad(), metrics.getAverageResponseTime());

} catch (Exception e) {
log.error("Failed to collect load metrics", e);
}
}

/**
* Analyze the load trend
*/
private void analyzeLoadTrend() {
try {
LoadTrendAnalysis analysis = loadMonitorService.analyzeTrend(currentLoad);

if (analysis.isTrendingUp()) {
log.info("Load is trending up: trend={}, prediction={}", analysis.getTrend(), analysis.getPrediction());
} else if (analysis.isTrendingDown()) {
log.info("Load is trending down: trend={}, prediction={}", analysis.getTrend(), analysis.getPrediction());
}

} catch (Exception e) {
log.error("Failed to analyze the load trend", e);
}
}

/**
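* Check whether an adjustment is needed
*/
private void checkAdjustmentNeeded() {
try {
if (currentLoad == null) {
return;
}

// Compare CPU load, queue load, and response time against their thresholds
boolean cpuHigh = currentLoad.getCpuLoad() > config.getCpuThreshold();
boolean queueHigh = currentLoad.getQueueLoad() > config.getQueueThreshold();
boolean responseSlow = currentLoad.getAverageResponseTime() > config.getResponseTimeThreshold();

// Trigger scaling at most once per check, even when several thresholds are exceeded
if (cpuHigh || queueHigh || responseSlow) {
triggerScaling();
}

} catch (Exception e) {
log.error("Failed to check whether an adjustment is needed", e);
}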
}

/**
* Trigger scaling
*/
private void triggerScaling() {
try {
ScalingRequest request = intelligentScalerService.createScalingRequest(currentLoad, config);

if (request != null && request.isScalingNeeded()) {
boolean success = intelligentScalerService.performScaling(request);

if (success) {
log.info("Scaling triggered: scalingType={}, targetSize={}",
request.getScalingType(), request.getTargetSize());
}
}

} catch (Exception e) {
log.error("Failed to trigger scaling", e);
}
}

/**
* Apply the adjustment.
* The core size is set before the maximum; calculateNewCorePoolSize caps the new core size
* at the current maximum, so this order stays valid.
*/
private boolean performAdjustment(DynamicThreadPoolConfig newConfig) {
try {
// 1. Adjust the core pool size
if (config.getCorePoolSize() != newConfig.getCorePoolSize()) {
executor.setCorePoolSize(newConfig.getCorePoolSize());
log.debug("Core pool size adjusted: {} -> {}", config.getCorePoolSize(), newConfig.getCorePoolSize());
}

// 2. Adjust the maximum pool size
if (config.getMaximumPoolSize() != newConfig.getMaximumPoolSize()) {
executor.setMaximumPoolSize(newConfig.getMaximumPoolSize());
log.debug("Maximum pool size adjusted: {} -> {}", config.getMaximumPoolSize(), newConfig.getMaximumPoolSize());
}

// 3. Adjust the keep-alive time
if (config.getKeepAliveTime() != newConfig.getKeepAliveTime() ||
config.getTimeUnit() != newConfig.getTimeUnit()) {
executor.setKeepAliveTime(newConfig.getKeepAliveTime(), newConfig.getTimeUnit());
log.debug("Keep-alive time adjusted: {} {} -> {} {}",
config.getKeepAliveTime(), config.getTimeUnit(),
newConfig.getKeepAliveTime(), newConfig.getTimeUnit());
}

return true;

} catch (Exception e) {
log.error("Failed to apply the adjustment", e);
return false;
}
}

/**
* Compute the new configuration
*/
private DynamicThreadPoolConfig calculateNewConfig(AdjustmentRequest request) {
DynamicThreadPoolConfig newConfig = new DynamicThreadPoolConfig();
newConfig.setPoolName(config.getPoolName());

// Compute the new core pool size
int newCorePoolSize = calculateNewCorePoolSize(request);
newConfig.setCorePoolSize(newCorePoolSize);

// Compute the new maximum pool size
int newMaxPoolSize = calculateNewMaxPoolSize(request);
newConfig.setMaximumPoolSize(newMaxPoolSize);

// Carry over the remaining settings unchanged
newConfig.setKeepAliveTime(config.getKeepAliveTime());
newConfig.setTimeUnit(config.getTimeUnit());
newConfig.setQueueType(config.getQueueType());
newConfig.setQueueCapacity(config.getQueueCapacity());
newConfig.setAllowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());
newConfig.setCpuThreshold(config.getCpuThreshold());
newConfig.setQueueThreshold(config.getQueueThreshold());
newConfig.setResponseTimeThreshold(config.getResponseTimeThreshold());
// Without this copy the limit would reset to 0 and break the next adjustment
newConfig.setMaxPoolSizeLimit(config.getMaxPoolSizeLimit());

return newConfig;
}

/**
* Compute the new core pool size
*/
private int calculateNewCorePoolSize(AdjustmentRequest request) {
int currentCorePoolSize = config.getCorePoolSize();

switch (request.getAdjustmentType()) {
case INCREASE:
return Math.min(currentCorePoolSize + request.getAdjustmentValue(),
config.getMaximumPoolSize());
case DECREASE:
return Math.max(currentCorePoolSize - request.getAdjustmentValue(), 1);
case SET:
return Math.min(request.getAdjustmentValue(), config.getMaximumPoolSize());
default:
return currentCorePoolSize;
}
}

/**
* Compute the new maximum pool size
*/
private int calculateNewMaxPoolSize(AdjustmentRequest request) {
int currentMaxPoolSize = config.getMaximumPoolSize();

switch (request.getAdjustmentType()) {
case INCREASE:
return Math.min(currentMaxPoolSize + request.getAdjustmentValue(),
config.getMaxPoolSizeLimit());
case DECREASE:
return Math.max(currentMaxPoolSize - request.getAdjustmentValue(),
config.getCorePoolSize());
case SET:
return Math.min(request.getAdjustmentValue(), config.getMaxPoolSizeLimit());
default:
return currentMaxPoolSize;
}
}

/**
* Build an adjustment request from a recommendation
*/
private AdjustmentRequest createAdjustmentRequest(AdjustmentRecommendation recommendation) {
AdjustmentRequest request = new AdjustmentRequest();
request.setPoolName(config.getPoolName());
request.setAdjustmentType(recommendation.getAdjustmentType());
request.setAdjustmentValue(recommendation.getAdjustmentValue());
request.setReason(recommendation.getReason());
request.setTimestamp(System.currentTimeMillis());

return request;
}

/**
* Record the adjustment history
*/
private void recordAdjustmentHistory(AdjustmentRequest request, DynamicThreadPoolConfig newConfig) {
AdjustmentRecord record = new AdjustmentRecord();
record.setPoolName(config.getPoolName());
record.setOldConfig(config);
record.setNewConfig(newConfig);
record.setRequest(request);
record.setTimestamp(System.currentTimeMillis());

adjustmentHistory.addRecord(record);
}

/**
* Validate the adjustment request
*/
private void validateAdjustmentRequest(AdjustmentRequest request) {
if (request == null) {
throw new IllegalArgumentException("Adjustment request must not be null");
}

if (request.getAdjustmentValue() <= 0) {
throw new IllegalArgumentException("Adjustment value must be greater than 0");
}

if (request.getAdjustmentType() == null) {
throw new IllegalArgumentException("Adjustment type must not be null");
}
}

/**
* Initialize the configuration with default values
*/
private void initializeConfig() {
config = new DynamicThreadPoolConfig();
config.setPoolName("dynamic-pool");
config.setCorePoolSize(10);
config.setMaximumPoolSize(50);
config.setKeepAliveTime(60L);
config.setTimeUnit(TimeUnit.SECONDS);
config.setQueueType(QueueType.LINKED_BLOCKING_QUEUE);
config.setQueueCapacity(1000);
config.setAllowCoreThreadTimeOut(false);
config.setCpuThreshold(0.8);
config.setQueueThreshold(0.8);
config.setResponseTimeThreshold(5000L);
config.setMaxPoolSizeLimit(100);

adjustmentHistory = new AdjustmentHistory();
}

@PreDestroy
public void destroy() {
// 1. Stop the monitoring and adjustment schedulers
monitorExecutor.shutdown();
adjustExecutor.shutdown();

// 2. Shut down the thread pool
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

log.info("Dynamic thread pool shut down: poolName={}", config.getPoolName());
}
}

// Dynamic thread pool configuration
public class DynamicThreadPoolConfig {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit timeUnit;
private QueueType queueType;
private int queueCapacity;
private boolean allowCoreThreadTimeOut;
private double cpuThreshold;
private double queueThreshold;
private long responseTimeThreshold;
private int maxPoolSizeLimit;

// Constructors and getters/setters omitted
}

// Dynamic thread pool status
public class DynamicThreadPoolStatus {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private int currentPoolSize;
private int activeThreadCount;
private int largestPoolSize;
private long taskCount;
private long completedTaskCount;
private int queueSize;
private int queueRemainingCapacity;
private long totalTasks;
private long completedTasks;
private long rejectedTasks;
private long failedTasks;
private LoadMetrics currentLoad;
private long lastAdjustmentTime;
private int adjustmentCount;

// Constructors and getters/setters omitted
}

// Adjustment request
public class AdjustmentRequest {
private String poolName;
private AdjustmentType adjustmentType;
private int adjustmentValue;
private String reason;
private long timestamp;

public enum AdjustmentType {
INCREASE, DECREASE, SET
}

// Constructors and getters/setters omitted
}

// Adjustment record
public class AdjustmentRecord {
private String poolName;
private DynamicThreadPoolConfig oldConfig;
private DynamicThreadPoolConfig newConfig;
private AdjustmentRequest request;
private long timestamp;

// Constructors and getters/setters omitted
}

// Adjustment history
public class AdjustmentHistory {
private final List<AdjustmentRecord> records = new ArrayList<>();

// Methods are synchronized because records are written by the adjustment thread
// and read by callers of getStatus().
public synchronized void addRecord(AdjustmentRecord record) {
records.add(record);
}

public synchronized long getLastAdjustmentTime() {
return records.isEmpty() ? 0 : records.get(records.size() - 1).getTimestamp();
}

public synchronized int getAdjustmentCount() {
return records.size();
}

// Getters omitted
}
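
The resize step in `performAdjustment` depends on the order in which the two setters are called: `ThreadPoolExecutor.setMaximumPoolSize` rejects a maximum below the current core size, and recent JDKs also reject a core size above the current maximum. A helper that chooses the order explicitly is less fragile than relying on clamping elsewhere. The following is a minimal sketch; `PoolResizer` is a hypothetical name and is not part of the listing above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizer {

    /** Applies new core and max sizes in an order that never leaves core above max. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("require 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the core size first
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Grows a 10/50 pool to 60/100, then shrinks it to 2/4; returns {core, max}. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        try {
            resize(pool, 60, 100);
            resize(pool, 2, 4);
            return new int[] {pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println("core=" + sizes[0] + " max=" + sizes[1]);
    }
}
```

Shrinking does not stop running tasks; surplus threads are retired as they become idle.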

2.2 Load Monitoring Service

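// Load monitoring service
// Imports assumed by this listing:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
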
@Service
@Slf4j
public class LoadMonitorService {

@Autowired
private SystemMetricsCollector systemMetricsCollector;

@Autowired
private LoadTrendAnalyzer loadTrendAnalyzer;

private final Map<String, LoadMetricsHistory> metricsHistory = new ConcurrentHashMap<>();

/**
* Collect load metrics
*/
public LoadMetrics collectMetrics(ThreadPoolExecutor executor) {
try {
LoadMetrics metrics = new LoadMetrics();

// 1. Collect system metrics
SystemMetrics systemMetrics = systemMetricsCollector.collectSystemMetrics();
metrics.setCpuLoad(systemMetrics.getCpuUsage());
metrics.setMemoryLoad(systemMetrics.getMemoryUsage());
metrics.setDiskLoad(systemMetrics.getDiskUsage());
metrics.setNetworkLoad(systemMetrics.getNetworkUsage());

// 2. Collect thread pool metrics
ThreadPoolMetrics threadPoolMetrics = collectThreadPoolMetrics(executor);
metrics.setActiveThreadCount(threadPoolMetrics.getActiveThreadCount());
metrics.setQueueSize(threadPoolMetrics.getQueueSize());
metrics.setQueueCapacity(threadPoolMetrics.getQueueCapacity());
metrics.setTaskCount(threadPoolMetrics.getTaskCount());
metrics.setCompletedTaskCount(threadPoolMetrics.getCompletedTaskCount());

// 3. Compute the queue load
double queueLoad = calculateQueueLoad(threadPoolMetrics);
metrics.setQueueLoad(queueLoad);

// 4. Compute the average response time
long averageResponseTime = calculateAverageResponseTime(threadPoolMetrics);
metrics.setAverageResponseTime(averageResponseTime);

// 5. Compute the throughput
double throughput = calculateThroughput(threadPoolMetrics);
metrics.setThroughput(throughput);

// 6. Set the timestamp
metrics.setTimestamp(System.currentTimeMillis());

// 7. Save the sample to the history
saveMetricsHistory(metrics);

return metrics;

} catch (Exception e) {
log.error("Failed to collect load metrics", e);
return new LoadMetrics();
}
}

/**
* Analyze the load trend
*/
public LoadTrendAnalysis analyzeTrend(LoadMetrics currentMetrics) {
try {
String poolName = "default"; // should come from the calling context
LoadMetricsHistory history = metricsHistory.get(poolName);

if (history == null || history.getMetrics().size() < 2) {
return new LoadTrendAnalysis();
}

// 1. Analyze the CPU load trend
Trend cpuTrend = analyzeMetricTrend(history.getCpuLoadHistory());

// 2. Analyze the memory load trend
Trend memoryTrend = analyzeMetricTrend(history.getMemoryLoadHistory());

// 3. Analyze the queue load trend
Trend queueTrend = analyzeMetricTrend(history.getQueueLoadHistory());

// 4. Analyze the response time trend (the history is a List<Long>, so convert it first)
Trend responseTimeTrend = analyzeMetricTrend(
history.getResponseTimeHistory().stream()
.map(Long::doubleValue)
.collect(java.util.stream.Collectors.toList()));

// 5. Predict future load
LoadPrediction prediction = predictFutureLoad(history);

// 6. Build the trend analysis result
LoadTrendAnalysis analysis = new LoadTrendAnalysis();
analysis.setCpuTrend(cpuTrend);
analysis.setMemoryTrend(memoryTrend);
analysis.setQueueTrend(queueTrend);
analysis.setResponseTimeTrend(responseTimeTrend);
analysis.setPrediction(prediction);
analysis.setAnalysisTime(System.currentTimeMillis());

return analysis;

} catch (Exception e) {
log.error("Failed to analyze the load trend", e);
return new LoadTrendAnalysis();
}
}

/**
* Collect thread pool metrics
*/
private ThreadPoolMetrics collectThreadPoolMetrics(ThreadPoolExecutor executor) {
ThreadPoolMetrics metrics = new ThreadPoolMetrics();
metrics.setCorePoolSize(executor.getCorePoolSize());
metrics.setMaximumPoolSize(executor.getMaximumPoolSize());
metrics.setCurrentPoolSize(executor.getPoolSize());
metrics.setActiveThreadCount(executor.getActiveCount());
metrics.setLargestPoolSize(executor.getLargestPoolSize());
metrics.setTaskCount(executor.getTaskCount());
metrics.setCompletedTaskCount(executor.getCompletedTaskCount());
metrics.setQueueSize(executor.getQueue().size());
metrics.setQueueRemainingCapacity(executor.getQueue().remainingCapacity());
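// Add in long arithmetic: remainingCapacity() is Integer.MAX_VALUE for unbounded queues
metrics.setQueueCapacity((int) Math.min(Integer.MAX_VALUE,
(long) executor.getQueue().size() + executor.getQueue().remainingCapacity()));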

return metrics;
}

/**
* Compute the queue load (queued tasks divided by queue capacity)
*/
private double calculateQueueLoad(ThreadPoolMetrics metrics) {
if (metrics.getQueueCapacity() == 0) {
return 0.0;
}

return (double) metrics.getQueueSize() / metrics.getQueueCapacity();
}

/**
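* Compute the average response time
*/
private long calculateAverageResponseTime(ThreadPoolMetrics metrics) {
// A real implementation would derive this from measured task execution times.
// Simplified placeholder: returns a fixed value.
return 1000L; // 1 second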
}

/**
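* Compute the throughput
*/
private double calculateThroughput(ThreadPoolMetrics metrics) {
// A real implementation would derive this from the number of tasks completed per interval.
// Simplified placeholder: returns a fixed value.
return 100.0; // 100 tasks/second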
}

/**
* Analyze the trend of a single metric
*/
private Trend analyzeMetricTrend(List<Double> values) {
if (values.size() < 2) {
return Trend.STABLE;
}

// Average change per sample (the summed differences equal last value minus first value)
double sum = 0;
for (int i = 1; i < values.size(); i++) {
sum += values.get(i) - values.get(i - 1);
}

double averageChange = sum / (values.size() - 1);

if (averageChange > 0.1) {
return Trend.INCREASING;
} else if (averageChange < -0.1) {
return Trend.DECREASING;
} else {
return Trend.STABLE;
}
}

/**
* 预测未来负载
*/
private LoadPrediction predictFutureLoad(LoadMetricsHistory history) {
LoadPrediction prediction = new LoadPrediction();

// 简单的线性预测
if (history.getCpuLoadHistory().size() >= 2) {
List<Double> cpuHistory = history.getCpuLoadHistory();
double lastCpu = cpuHistory.get(cpuHistory.size() - 1);
double prevCpu = cpuHistory.get(cpuHistory.size() - 2);
double cpuChange = lastCpu - prevCpu;

// 预测未来5分钟的CPU负载
double predictedCpu = lastCpu + cpuChange * 5;
prediction.setPredictedCpuLoad(Math.max(0, Math.min(1, predictedCpu)));
}

if (history.getQueueLoadHistory().size() >= 2) {
List<Double> queueHistory = history.getQueueLoadHistory();
double lastQueue = queueHistory.get(queueHistory.size() - 1);
double prevQueue = queueHistory.get(queueHistory.size() - 2);
double queueChange = lastQueue - prevQueue;

// 预测未来5分钟的队列负载
double predictedQueue = lastQueue + queueChange * 5;
prediction.setPredictedQueueLoad(Math.max(0, Math.min(1, predictedQueue)));
}

prediction.setPredictionTime(System.currentTimeMillis() + 5 * 60 * 1000); // 5分钟后

return prediction;
}

/**
* Save a sample to the metrics history
*/
private void saveMetricsHistory(LoadMetrics metrics) {
String poolName = "default"; // should come from the calling context
LoadMetricsHistory history = metricsHistory.computeIfAbsent(poolName, k -> new LoadMetricsHistory());

history.addCpuLoad(metrics.getCpuLoad());
history.addMemoryLoad(metrics.getMemoryLoad());
history.addQueueLoad(metrics.getQueueLoad());
history.addResponseTime(metrics.getAverageResponseTime());

// Keep only the most recent 1000 samples
if (history.getCpuLoadHistory().size() > 1000) {
history.getCpuLoadHistory().remove(0);
history.getMemoryLoadHistory().remove(0);
history.getQueueLoadHistory().remove(0);
history.getResponseTimeHistory().remove(0);
}
}
}

// Load metrics
public class LoadMetrics {
private double cpuLoad;
private double memoryLoad;
private double diskLoad;
private double networkLoad;
private int activeThreadCount;
private int queueSize;
private int queueCapacity;
private double queueLoad;
private long taskCount;
private long completedTaskCount;
private long averageResponseTime;
private double throughput;
private long timestamp;

// Constructors and getters/setters omitted
}

// Thread pool metrics
public class ThreadPoolMetrics {
private int corePoolSize;
private int maximumPoolSize;
private int currentPoolSize;
private int activeThreadCount;
private int largestPoolSize;
private long taskCount;
private long completedTaskCount;
private int queueSize;
private int queueRemainingCapacity;
private int queueCapacity;

// Constructors and getters/setters omitted
}

// Load trend analysis
public class LoadTrendAnalysis {
private Trend cpuTrend;
private Trend memoryTrend;
private Trend queueTrend;
private Trend responseTimeTrend;
private LoadPrediction prediction;
private long analysisTime;

public boolean isTrendingUp() {
return cpuTrend == Trend.INCREASING ||
memoryTrend == Trend.INCREASING ||
queueTrend == Trend.INCREASING;
}

public boolean isTrendingDown() {
return cpuTrend == Trend.DECREASING ||
memoryTrend == Trend.DECREASING ||
queueTrend == Trend.DECREASING;
}

// Constructors and getters/setters omitted
}

// Trend enum
public enum Trend {
INCREASING, DECREASING, STABLE
}

// Load prediction
public class LoadPrediction {
private double predictedCpuLoad;
private double predictedQueueLoad;
private long predictionTime;

// Constructors and getters/setters omitted
}

// Load metrics history
public class LoadMetricsHistory {
private final List<Double> cpuLoadHistory = new ArrayList<>();
private final List<Double> memoryLoadHistory = new ArrayList<>();
private final List<Double> queueLoadHistory = new ArrayList<>();
private final List<Long> responseTimeHistory = new ArrayList<>();

public void addCpuLoad(double cpuLoad) {
cpuLoadHistory.add(cpuLoad);
}

public void addMemoryLoad(double memoryLoad) {
memoryLoadHistory.add(memoryLoad);
}

public void addQueueLoad(double queueLoad) {
queueLoadHistory.add(queueLoad);
}

public void addResponseTime(long responseTime) {
responseTimeHistory.add(responseTime);
}

// Getters omitted
}
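
`calculateAverageResponseTime` and `calculateThroughput` above return fixed placeholder values. One possible way to obtain real numbers, sketched here under the assumption that execution time (excluding queue wait) is an acceptable measure, is to subclass `ThreadPoolExecutor` and time each task in its `beforeExecute` and `afterExecute` hooks. `TimedThreadPoolExecutor` is a hypothetical class and is not used by the listings in this article.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class TimedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final LongAdder totalNanos = new LongAdder();
    private final LongAdder finished = new LongAdder();

    public TimedThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> queue) {
        super(core, max, 60L, TimeUnit.SECONDS, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.add(System.nanoTime() - startNanos.get());
            finished.increment();
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    public long finishedCount() {
        return finished.sum();
    }

    /** Mean execution time per finished task, in milliseconds (0 if nothing has finished). */
    public double averageExecutionMillis() {
        long n = finished.sum();
        return n == 0 ? 0.0 : totalNanos.sum() / 1_000_000.0 / n;
    }

    /** Runs five 20 ms tasks on two threads and returns the terminated pool for inspection. */
    static TimedThreadPoolExecutor demo() {
        TimedThreadPoolExecutor pool = new TimedThreadPoolExecutor(2, 2, new LinkedBlockingQueue<>());
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        TimedThreadPoolExecutor pool = demo();
        System.out.println(pool.finishedCount() + " tasks, average ms: " + pool.averageExecutionMillis());
    }
}
```

Throughput could be derived in a similar way by sampling `finishedCount()` at two points in time and dividing the difference by the elapsed interval.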

2.3 Adaptive Adjustment Service

// Adaptive adjustment service
@Service
@Slf4j
public class AdaptiveAdjusterService {

@Autowired
private LoadMonitorService loadMonitorService;

@Autowired
private AdjustmentStrategyManager strategyManager;

private final Map<String, AdjustmentStrategy> strategies = new ConcurrentHashMap<>();

@PostConstruct
public void init() {
// Initialize the adjustment strategies
initializeStrategies();
}

/**
* Get an adjustment recommendation
*/
public AdjustmentRecommendation getRecommendation(LoadMetrics currentLoad, DynamicThreadPoolConfig config) {
try {
if (currentLoad == null) {
return null;
}

// 1. Analyze the current load
LoadAnalysis analysis = analyzeCurrentLoad(currentLoad, config);

// 2. Select an adjustment strategy
AdjustmentStrategy strategy = selectStrategy(analysis);

// 3. Generate the recommendation
AdjustmentRecommendation recommendation = strategy.generateRecommendation(analysis);

if (recommendation != null && recommendation.isRecommended()) {
log.info("Generated adjustment recommendation: strategy={}, recommendation={}",
strategy.getName(), recommendation);
}

return recommendation;

} catch (Exception e) {
log.error("Failed to get an adjustment recommendation", e);
return null;
}
}

/**
* 分析当前负载
*/
private LoadAnalysis analyzeCurrentLoad(LoadMetrics currentLoad, DynamicThreadPoolConfig config) {
LoadAnalysis analysis = new LoadAnalysis();
analysis.setCurrentLoad(currentLoad);
analysis.setConfig(config);
analysis.setAnalysisTime(System.currentTimeMillis());

// 1. 分析CPU负载
if (currentLoad.getCpuLoad() > config.getCpuThreshold()) {
analysis.setCpuOverloaded(true);
analysis.setCpuOverloadLevel(calculateOverloadLevel(currentLoad.getCpuLoad(), config.getCpuThreshold()));
}

// 2. 分析队列负载
if (currentLoad.getQueueLoad() > config.getQueueThreshold()) {
analysis.setQueueOverloaded(true);
analysis.setQueueOverloadLevel(calculateOverloadLevel(currentLoad.getQueueLoad(), config.getQueueThreshold()));
}

// 3. 分析响应时间
if (currentLoad.getAverageResponseTime() > config.getResponseTimeThreshold()) {
analysis.setResponseTimeOverloaded(true);
analysis.setResponseTimeOverloadLevel(calculateOverloadLevel(
(double) currentLoad.getAverageResponseTime() / config.getResponseTimeThreshold(), 1.0));
}

// 4. Resource utilization
double resourceUtilization = calculateResourceUtilization(currentLoad, config);
analysis.setResourceUtilization(resourceUtilization);

// 5. Load pattern
LoadPattern pattern = analyzeLoadPattern(currentLoad);
analysis.setLoadPattern(pattern);

return analysis;
}

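/**
* Selects an adjustment strategy.
* analyzeLoadPattern never returns null, so testing the load pattern first as a
* complete if/else chain would leave the overload, under-utilization and default
* branches unreachable. The checks are therefore ordered from most to least specific.
*/
private AdjustmentStrategy selectStrategy(LoadAnalysis analysis) {
// 1. Burst load: CPU and queue are both saturated
if (analysis.getLoadPattern() == LoadPattern.BURST) {
return strategies.get("burst-strategy");
}

// 2. Overload without a burst pattern
if (analysis.isCpuOverloaded() || analysis.isQueueOverloaded()) {
return strategies.get("overload-strategy");
}

// 3. Low resource utilization
if (analysis.getResourceUtilization() < 0.3) {
return strategies.get("underutilized-strategy");
}

// 4. Remaining load patterns
if (analysis.getLoadPattern() == LoadPattern.STEADY) {
return strategies.get("steady-strategy");
} else if (analysis.getLoadPattern() == LoadPattern.GRADUAL) {
return strategies.get("gradual-strategy");
}

// 5. Fallback when no pattern was determined
return strategies.get("default-strategy");
}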

/**
* Maps the ratio of current value to threshold onto an overload level.
*/
private OverloadLevel calculateOverloadLevel(double currentValue, double threshold) {
double ratio = currentValue / threshold;

if (ratio >= 2.0) {
return OverloadLevel.CRITICAL;
} else if (ratio >= 1.5) {
return OverloadLevel.HIGH;
} else if (ratio >= 1.2) {
return OverloadLevel.MEDIUM;
} else {
return OverloadLevel.LOW;
}
}

/**
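* Computes overall resource utilization as the mean of thread and queue utilization.
*/
private double calculateResourceUtilization(LoadMetrics currentLoad, DynamicThreadPoolConfig config) {
// Thread utilization: active threads relative to the maximum pool size
double threadUtilization = (double) currentLoad.getActiveThreadCount() / config.getMaximumPoolSize();

// Queue utilization: getQueueLoad() is treated as a ratio, as in the threshold checks above
double queueUtilization = currentLoad.getQueueLoad();

// Combined utilization
return (threadUtilization + queueUtilization) / 2.0;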
}

/**
* Classifies the load pattern.
*/
private LoadPattern analyzeLoadPattern(LoadMetrics currentLoad) {
// Simplified classification: only the current sample is used, no history
if (currentLoad.getCpuLoad() > 0.8 && currentLoad.getQueueLoad() > 0.8) {
return LoadPattern.BURST;
} else if (currentLoad.getCpuLoad() > 0.5 && currentLoad.getQueueLoad() > 0.5) {
return LoadPattern.STEADY;
} else {
return LoadPattern.GRADUAL;
}
}

/**
* Registers the built-in strategies under the keys used by selectStrategy.
*/
private void initializeStrategies() {
// 1. Burst load
strategies.put("burst-strategy", new BurstLoadStrategy());

// 2. Steady load
strategies.put("steady-strategy", new SteadyLoadStrategy());

// 3. Gradual load
strategies.put("gradual-strategy", new GradualLoadStrategy());

// 4. Overload
strategies.put("overload-strategy", new OverloadStrategy());

// 5. Low utilization
strategies.put("underutilized-strategy", new UnderutilizedStrategy());

// 6. Default
strategies.put("default-strategy", new DefaultStrategy());

log.info("Adjustment strategies initialized: count={}", strategies.size());
}
}

// Result of a load analysis
public class LoadAnalysis {
private LoadMetrics currentLoad;
private DynamicThreadPoolConfig config;
private boolean cpuOverloaded;
private boolean queueOverloaded;
private boolean responseTimeOverloaded;
private OverloadLevel cpuOverloadLevel;
private OverloadLevel queueOverloadLevel;
private OverloadLevel responseTimeOverloadLevel;
private double resourceUtilization;
private LoadPattern loadPattern;
private long analysisTime;

// Constructors and getters/setters omitted
}

// Overload level
public enum OverloadLevel {
LOW, MEDIUM, HIGH, CRITICAL
}

// Load pattern
public enum LoadPattern {
BURST, // Sudden spike in load
STEADY, // Sustained, stable load
GRADUAL // Gradually increasing load
}

// Adjustment recommendation
public class AdjustmentRecommendation {
private boolean recommended;
private AdjustmentType adjustmentType;
private int adjustmentValue;
private String reason;
private double confidence;
private long timestamp;

public enum AdjustmentType {
INCREASE_CORE_POOL_SIZE,
INCREASE_MAX_POOL_SIZE,
DECREASE_CORE_POOL_SIZE,
DECREASE_MAX_POOL_SIZE,
ADJUST_QUEUE_CAPACITY,
CHANGE_REJECTION_POLICY
}

// Constructors and getters/setters omitted
}

// Adjustment strategy interface
public interface AdjustmentStrategy {
String getName();
AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis);
}

// Strategy for burst load
public class BurstLoadStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "BurstLoadStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
AdjustmentRecommendation recommendation = new AdjustmentRecommendation();

if (analysis.isCpuOverloaded() || analysis.isQueueOverloaded()) {
recommendation.setRecommended(true);
recommendation.setAdjustmentType(AdjustmentRecommendation.AdjustmentType.INCREASE_MAX_POOL_SIZE);
recommendation.setAdjustmentValue(calculateAdjustmentValue(analysis));
recommendation.setReason("Burst load detected; increase the maximum pool size");
recommendation.setConfidence(0.9);
recommendation.setTimestamp(System.currentTimeMillis());
}

return recommendation;
}

private int calculateAdjustmentValue(LoadAnalysis analysis) {
if (analysis.getCpuOverloadLevel() == OverloadLevel.CRITICAL) {
return 20;
} else if (analysis.getCpuOverloadLevel() == OverloadLevel.HIGH) {
return 15;
} else if (analysis.getCpuOverloadLevel() == OverloadLevel.MEDIUM) {
return 10;
} else {
return 5;
}
}
}

// Strategy for steady load
public class SteadyLoadStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "SteadyLoadStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
AdjustmentRecommendation recommendation = new AdjustmentRecommendation();

if (analysis.getResourceUtilization() > 0.8) {
recommendation.setRecommended(true);
recommendation.setAdjustmentType(AdjustmentRecommendation.AdjustmentType.INCREASE_CORE_POOL_SIZE);
recommendation.setAdjustmentValue(5);
recommendation.setReason("Sustained high load detected; increase the core pool size");
recommendation.setConfidence(0.8);
recommendation.setTimestamp(System.currentTimeMillis());
}

return recommendation;
}
}

// Strategy for gradually increasing load
public class GradualLoadStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "GradualLoadStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
AdjustmentRecommendation recommendation = new AdjustmentRecommendation();

if (analysis.getResourceUtilization() > 0.6) {
recommendation.setRecommended(true);
recommendation.setAdjustmentType(AdjustmentRecommendation.AdjustmentType.INCREASE_MAX_POOL_SIZE);
recommendation.setAdjustmentValue(3);
recommendation.setReason("Gradual load growth detected; moderately increase the maximum pool size");
recommendation.setConfidence(0.7);
recommendation.setTimestamp(System.currentTimeMillis());
}

return recommendation;
}
}

// Strategy for overload
public class OverloadStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "OverloadStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
AdjustmentRecommendation recommendation = new AdjustmentRecommendation();

recommendation.setRecommended(true);
recommendation.setAdjustmentType(AdjustmentRecommendation.AdjustmentType.INCREASE_MAX_POOL_SIZE);
recommendation.setAdjustmentValue(calculateAdjustmentValue(analysis));
recommendation.setReason("System overload detected; increase the thread count urgently");
recommendation.setConfidence(0.95);
recommendation.setTimestamp(System.currentTimeMillis());

return recommendation;
}

private int calculateAdjustmentValue(LoadAnalysis analysis) {
int adjustmentValue = 0;

if (analysis.isCpuOverloaded()) {
adjustmentValue += analysis.getCpuOverloadLevel().ordinal() * 5;
}

if (analysis.isQueueOverloaded()) {
adjustmentValue += analysis.getQueueOverloadLevel().ordinal() * 5;
}

return Math.max(adjustmentValue, 10);
}
}

// Strategy for low utilization
public class UnderutilizedStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "UnderutilizedStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
AdjustmentRecommendation recommendation = new AdjustmentRecommendation();

if (analysis.getResourceUtilization() < 0.3) {
recommendation.setRecommended(true);
recommendation.setAdjustmentType(AdjustmentRecommendation.AdjustmentType.DECREASE_CORE_POOL_SIZE);
recommendation.setAdjustmentValue(2);
recommendation.setReason("Low resource utilization detected; decrease the core pool size");
recommendation.setConfidence(0.6);
recommendation.setTimestamp(System.currentTimeMillis());
}

return recommendation;
}
}

// Default strategy
public class DefaultStrategy implements AdjustmentStrategy {

@Override
public String getName() {
return "DefaultStrategy";
}

@Override
public AdjustmentRecommendation generateRecommendation(LoadAnalysis analysis) {
// The default strategy never recommends an adjustment
return new AdjustmentRecommendation();
}
}
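
The strategies above only say what to change; they do not show how a recommendation reaches the executor. The following is a minimal sketch of that step, assuming the pool is backed by a `java.util.concurrent.ThreadPoolExecutor`. `PoolResizer` and its `hardMax` parameter are hypothetical names, not part of the article's code. The order of the two setter calls matters: `setMaximumPoolSize` rejects a value below the current core size, and since JDK 9 `setCorePoolSize` rejects a value above the current maximum.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper: resizes a pool in an order that never violates core <= max. */
final class PoolResizer {

    private PoolResizer() {
    }

    /**
     * Applies new core and maximum sizes.
     * The maximum is clamped to [1, hardMax] and the core size to [1, maximum].
     */
    static void resize(ThreadPoolExecutor executor, int newCore, int newMax, int hardMax) {
        int max = Math.max(1, Math.min(newMax, hardMax));
        int core = Math.max(1, Math.min(newCore, max));

        if (max >= executor.getMaximumPoolSize()) {
            // Growing: raise the ceiling first so the new core size fits under it.
            executor.setMaximumPoolSize(max);
            executor.setCorePoolSize(core);
        } else {
            // Shrinking: lower the core size first so the new ceiling stays >= core.
            executor.setCorePoolSize(core);
            executor.setMaximumPoolSize(max);
        }
    }
}
```

An `INCREASE_MAX_POOL_SIZE` recommendation with value 10 would then translate to `PoolResizer.resize(executor, executor.getCorePoolSize(), executor.getMaximumPoolSize() + 10, hardMax)`.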

2.4 Intelligent Scaling Service

// Intelligent scaling service
@Service
@Slf4j
public class IntelligentScalerService {

@Autowired
private LoadMonitorService loadMonitorService;

@Autowired
private ScalingStrategyManager scalingStrategyManager;

private final Map<String, ScalingHistory> scalingHistories = new ConcurrentHashMap<>();

/**
* Creates a scaling request; returns null when no scaling is needed or the analysis fails.
*/
public ScalingRequest createScalingRequest(LoadMetrics currentLoad, DynamicThreadPoolConfig config) {
try {
// 1. Analyze whether scaling is needed
ScalingAnalysis analysis = analyzeScalingNeeded(currentLoad, config);

if (!analysis.isScalingNeeded()) {
return null;
}

// 2. Select a scaling strategy
ScalingStrategy strategy = scalingStrategyManager.selectStrategy(analysis);

// 3. Generate the scaling request
ScalingRequest request = strategy.generateScalingRequest(analysis);

if (request != null) {
log.info("Scaling request created: scalingType={}, targetSize={}, reason={}",
request.getScalingType(), request.getTargetSize(), request.getReason());
}

return request;

} catch (Exception e) {
log.error("Failed to create scaling request", e);
return null;
}
}

/**
* Validates, rate-limits, and executes a scaling request.
*/
public boolean performScaling(ScalingRequest request) {
try {
// 1. Validate the request
validateScalingRequest(request);

// 2. Check the scaling limits
if (!checkScalingLimits(request)) {
log.warn("Scaling request rejected by limits: request={}", request);
return false;
}

// 3. Execute the scaling operation
boolean success = executeScaling(request);

if (success) {
// 4. Record the scaling history
recordScalingHistory(request);

// 5. Send the scaling notification
sendScalingNotification(request);

log.info("Scaling succeeded: scalingType={}, targetSize={}",
request.getScalingType(), request.getTargetSize());
}

return success;

} catch (Exception e) {
log.error("Scaling failed: request={}", request, e);
return false;
}
}

/**
* Analyzes whether scaling is needed.
*/
private ScalingAnalysis analyzeScalingNeeded(LoadMetrics currentLoad, DynamicThreadPoolConfig config) {
ScalingAnalysis analysis = new ScalingAnalysis();
analysis.setCurrentLoad(currentLoad);
analysis.setConfig(config);
analysis.setAnalysisTime(System.currentTimeMillis());

// 1. CPU load
if (currentLoad.getCpuLoad() > config.getCpuThreshold()) {
analysis.setCpuScalingNeeded(true);
analysis.setCpuScalingLevel(calculateScalingLevel(currentLoad.getCpuLoad(), config.getCpuThreshold()));
}

// 2. Queue load
if (currentLoad.getQueueLoad() > config.getQueueThreshold()) {
analysis.setQueueScalingNeeded(true);
analysis.setQueueScalingLevel(calculateScalingLevel(currentLoad.getQueueLoad(), config.getQueueThreshold()));
}

// 3. Response time
if (currentLoad.getAverageResponseTime() > config.getResponseTimeThreshold()) {
analysis.setResponseTimeScalingNeeded(true);
analysis.setResponseTimeScalingLevel(calculateScalingLevel(
(double) currentLoad.getAverageResponseTime() / config.getResponseTimeThreshold(), 1.0));
}

// 4. Throughput below the configured minimum
if (currentLoad.getThroughput() < config.getMinThroughput()) {
analysis.setThroughputScalingNeeded(true);
}

// 5. Scaling is needed if any single dimension requires it
analysis.setScalingNeeded(analysis.isCpuScalingNeeded() ||
analysis.isQueueScalingNeeded() ||
analysis.isResponseTimeScalingNeeded() ||
analysis.isThroughputScalingNeeded());

return analysis;
}

/**
* Maps the ratio of current value to threshold onto a scaling level.
*/
private ScalingLevel calculateScalingLevel(double currentValue, double threshold) {
double ratio = currentValue / threshold;

if (ratio >= 2.0) {
return ScalingLevel.AGGRESSIVE;
} else if (ratio >= 1.5) {
return ScalingLevel.MODERATE;
} else if (ratio >= 1.2) {
return ScalingLevel.CONSERVATIVE;
} else {
return ScalingLevel.MINIMAL;
}
}

/**
* Dispatches the request to the handler for its scaling type.
*/
private boolean executeScaling(ScalingRequest request) {
try {
switch (request.getScalingType()) {
case VERTICAL_SCALING:
return performVerticalScaling(request);
case HORIZONTAL_SCALING:
return performHorizontalScaling(request);
case ELASTIC_SCALING:
return performElasticScaling(request);
case PREEMPTIVE_SCALING:
return performPreemptiveScaling(request);
default:
log.warn("Unsupported scaling type: {}", request.getScalingType());
return false;
}
} catch (Exception e) {
log.error("Scaling operation failed: scalingType={}", request.getScalingType(), e);
return false;
}
}

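/**
* Performs vertical scaling: more threads in a single pool.
*/
private boolean performVerticalScaling(ScalingRequest request) {
try {
log.info("Vertical scaling: targetSize={}", request.getTargetSize());

// Placeholder: nothing is resized here, yet the method reports success.
// A real implementation needs the thread pool instance injected and resized at this point.

return true;

} catch (Exception e) {
log.error("Vertical scaling failed", e);
return false;
}
}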

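/**
* Performs horizontal scaling: an additional thread pool instance.
*/
private boolean performHorizontalScaling(ScalingRequest request) {
try {
log.info("Horizontal scaling: targetSize={}", request.getTargetSize());

// Placeholder: no pool is created here, yet the method reports success.
// A real implementation needs a thread pool manager to create the new instance.

return true;

} catch (Exception e) {
log.error("Horizontal scaling failed", e);
return false;
}
}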

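/**
* Performs elastic scaling: the pool follows the load up and down.
*/
private boolean performElasticScaling(ScalingRequest request) {
try {
log.info("Elastic scaling: targetSize={}", request.getTargetSize());

// Placeholder: the elastic scaling logic is not implemented, yet the method reports success.
// A real implementation needs a load balancer.

return true;

} catch (Exception e) {
log.error("Elastic scaling failed", e);
return false;
}
}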

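/**
* Performs pre-scaling: capacity is added ahead of expected load.
*/
private boolean performPreemptiveScaling(ScalingRequest request) {
try {
log.info("Pre-scaling: targetSize={}", request.getTargetSize());

// Placeholder: the pre-scaling logic is not implemented, yet the method reports success.
// A real implementation needs a load predictor.

return true;

} catch (Exception e) {
log.error("Pre-scaling failed", e);
return false;
}
}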

/**
* Validates a scaling request.
*/
private void validateScalingRequest(ScalingRequest request) {
if (request == null) {
throw new IllegalArgumentException("Scaling request must not be null");
}

if (request.getScalingType() == null) {
throw new IllegalArgumentException("Scaling type must not be null");
}

if (request.getTargetSize() <= 0) {
throw new IllegalArgumentException("Target size must be greater than 0");
}
}

/**
* Checks the scaling limits: an upper bound on the target size and a cool-down between operations.
*/
private boolean checkScalingLimits(ScalingRequest request) {
// Upper bound on the thread count
if (request.getTargetSize() > 1000) {
log.warn("Scaling request exceeds the maximum thread count: targetSize={}", request.getTargetSize());
return false;
}

// Cool-down between scaling operations
String poolName = "default"; // Should come from the request context
ScalingHistory history = scalingHistories.get(poolName);

if (history != null) {
long lastScalingTime = history.getLastScalingTime();
long currentTime = System.currentTimeMillis();

if (currentTime - lastScalingTime < 30000) { // No repeat scaling within 30 seconds
log.warn("Scaling requested too frequently: lastScalingTime={}", lastScalingTime);
return false;
}
}

return true;
}

/**
* Records a completed scaling operation.
*/
private void recordScalingHistory(ScalingRequest request) {
String poolName = "default"; // Should come from the request context
ScalingHistory history = scalingHistories.computeIfAbsent(poolName, k -> new ScalingHistory());

ScalingRecord record = new ScalingRecord();
record.setPoolName(poolName);
record.setScalingType(request.getScalingType());
record.setTargetSize(request.getTargetSize());
record.setReason(request.getReason());
record.setTimestamp(System.currentTimeMillis());

history.addRecord(record);

// Keep only the 100 most recent records
if (history.getRecords().size() > 100) {
history.getRecords().remove(0);
}
}

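/**
* Sends a scaling notification (log output only in this example).
*/
private void sendScalingNotification(ScalingRequest request) {
log.info("Scaling notification: scalingType={}, targetSize={}",
request.getScalingType(), request.getTargetSize());
}

/**
* Returns a copy of the scaling records for a pool.
* The controller's /scaling-history endpoint calls this method.
*/
public List<ScalingRecord> getScalingHistory(String poolName) {
ScalingHistory history = scalingHistories.get(poolName);
return history == null ? new ArrayList<>() : new ArrayList<>(history.getRecords());
}
}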

// Result of a scaling analysis
public class ScalingAnalysis {
private LoadMetrics currentLoad;
private DynamicThreadPoolConfig config;
private boolean scalingNeeded;
private boolean cpuScalingNeeded;
private boolean queueScalingNeeded;
private boolean responseTimeScalingNeeded;
private boolean throughputScalingNeeded;
private ScalingLevel cpuScalingLevel;
private ScalingLevel queueScalingLevel;
private ScalingLevel responseTimeScalingLevel;
private long analysisTime;

// Constructors and getters/setters omitted
}

// Scaling level
public enum ScalingLevel {
MINIMAL, // Smallest step
CONSERVATIVE, // Conservative step
MODERATE, // Moderate step
AGGRESSIVE // Aggressive step
}

// Scaling request
public class ScalingRequest {
private ScalingType scalingType;
private int targetSize;
private String reason;
private double confidence;
private long timestamp;

public enum ScalingType {
VERTICAL_SCALING, // More threads in a single pool
HORIZONTAL_SCALING, // Additional pool instances
ELASTIC_SCALING, // Follows demand up and down
PREEMPTIVE_SCALING // Scales ahead of expected load
}

// Constructors and getters/setters omitted
}

// Scaling record
public class ScalingRecord {
private String poolName;
private ScalingRequest.ScalingType scalingType;
private int targetSize;
private String reason;
private long timestamp;

// Constructors and getters/setters omitted
}

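// Scaling history
public class ScalingHistory {
// Synchronized wrapper: records are written by scaling calls and read by other threads
private final List<ScalingRecord> records = Collections.synchronizedList(new ArrayList<>());

public void addRecord(ScalingRecord record) {
records.add(record);
}

public long getLastScalingTime() {
// Hold the list's lock so the size check and the read see the same state
synchronized (records) {
return records.isEmpty() ? 0 : records.get(records.size() - 1).getTimestamp();
}
}

// Getter omitted
}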
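
In the service above, `checkScalingLimits` reads the history and `recordScalingHistory` writes it later, so two concurrent requests can both pass the 30-second check. One way to close that gap is to make the check and the update a single atomic step. The sketch below assumes one cool-down window per pool name; `ScalingCooldown` and `tryAcquire` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-pool cool-down gate with an atomic check-and-record step. */
final class ScalingCooldown {

    private final long cooldownMillis;
    private final ConcurrentMap<String, Long> lastScaling = new ConcurrentHashMap<>();

    ScalingCooldown(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }

    /**
     * Returns true and records nowMillis if the pool is outside its cool-down window.
     * Returns false and leaves the recorded time unchanged otherwise.
     */
    boolean tryAcquire(String poolName, long nowMillis) {
        boolean[] granted = {false};
        // ConcurrentHashMap.compute runs the function atomically for the given key.
        lastScaling.compute(poolName, (name, last) -> {
            if (last == null || nowMillis - last >= cooldownMillis) {
                granted[0] = true;
                return nowMillis;
            }
            return last;
        });
        return granted[0];
    }
}
```

With this gate, `performScaling` would call `tryAcquire(poolName, System.currentTimeMillis())` once, instead of checking the history before scaling and recording it afterwards.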

3. Dynamic Thread Pool Controller

3.1 Dynamic Thread Pool Controller

// Dynamic thread pool controller
@RestController
@RequestMapping("/api/dynamic-thread-pool")
@Slf4j
public class DynamicThreadPoolController {

@Autowired
private DynamicThreadPool dynamicThreadPool;

@Autowired
private LoadMonitorService loadMonitorService;

@Autowired
private AdaptiveAdjusterService adaptiveAdjusterService;

@Autowired
private IntelligentScalerService intelligentScalerService;

/**
* Returns the thread pool status.
*/
@GetMapping("/status")
public ResponseEntity<DynamicThreadPoolStatus> getStatus() {
try {
DynamicThreadPoolStatus status = dynamicThreadPool.getStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
log.error("Failed to get thread pool status", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Returns the current load metrics.
*/
@GetMapping("/metrics")
public ResponseEntity<LoadMetrics> getMetrics() {
try {
LoadMetrics metrics = loadMonitorService.collectMetrics(dynamicThreadPool.getExecutor());
return ResponseEntity.ok(metrics);
} catch (Exception e) {
log.error("Failed to get load metrics", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Returns the load trend analysis.
*/
@GetMapping("/trend-analysis")
public ResponseEntity<LoadTrendAnalysis> getTrendAnalysis() {
try {
LoadMetrics currentLoad = loadMonitorService.collectMetrics(dynamicThreadPool.getExecutor());
LoadTrendAnalysis analysis = loadMonitorService.analyzeTrend(currentLoad);
return ResponseEntity.ok(analysis);
} catch (Exception e) {
log.error("Failed to get load trend analysis", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Returns the adjustment recommendation for the current load.
*/
@GetMapping("/adjustment-recommendation")
public ResponseEntity<AdjustmentRecommendation> getAdjustmentRecommendation() {
try {
LoadMetrics currentLoad = loadMonitorService.collectMetrics(dynamicThreadPool.getExecutor());
AdjustmentRecommendation recommendation = adaptiveAdjusterService.getRecommendation(
currentLoad, dynamicThreadPool.getConfig());
return ResponseEntity.ok(recommendation);
} catch (Exception e) {
log.error("Failed to get adjustment recommendation", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Adjusts the thread pool manually.
*/
@PostMapping("/adjust")
public ResponseEntity<Void> adjustThreadPool(@RequestBody AdjustmentRequest request) {
try {
boolean success = dynamicThreadPool.adjustThreadPool(request);

if (success) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
}

} catch (Exception e) {
log.error("Manual thread pool adjustment failed", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Triggers scaling based on the current load.
*/
@PostMapping("/scale")
public ResponseEntity<Void> triggerScaling() {
try {
LoadMetrics currentLoad = loadMonitorService.collectMetrics(dynamicThreadPool.getExecutor());
ScalingRequest request = intelligentScalerService.createScalingRequest(
currentLoad, dynamicThreadPool.getConfig());

if (request != null) {
boolean success = intelligentScalerService.performScaling(request);

if (success) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
}
} else {
return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
}

} catch (Exception e) {
log.error("Failed to trigger scaling", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Runs a fire-and-forget task.
*/
@PostMapping("/execute")
public ResponseEntity<Void> executeTask(@RequestBody TaskRequest request) {
try {
dynamicThreadPool.execute(() -> {
log.info("Executing task: taskId={}", request.getTaskId());
// Task logic goes here
});

// The task runs asynchronously, so acknowledge with 202 Accepted
// instead of returning the future as the response body
return ResponseEntity.accepted().build();

} catch (Exception e) {
log.error("Failed to execute task", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

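/**
* Submits a task and returns its result asynchronously.
*/
@PostMapping("/submit")
public CompletableFuture<ResponseEntity<String>> submitTask(@RequestBody TaskRequest request) {
try {
CompletableFuture<String> future = dynamicThreadPool.submit(() -> {
log.info("Running submitted task: taskId={}", request.getTaskId());
// Task logic goes here
return "Task completed: " + request.getTaskId();
});

// Spring MVC documents CompletableFuture as a top-level asynchronous return type,
// so the ResponseEntity is wrapped inside the future rather than the reverse
return future.thenApply(ResponseEntity::ok);

} catch (Exception e) {
log.error("Failed to submit task", e);
return CompletableFuture.completedFuture(
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
}
}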

/**
* Returns the adjustment history.
*/
@GetMapping("/adjustment-history")
public ResponseEntity<List<AdjustmentRecord>> getAdjustmentHistory() {
try {
List<AdjustmentRecord> history = dynamicThreadPool.getAdjustmentHistory();
return ResponseEntity.ok(history);
} catch (Exception e) {
log.error("Failed to get adjustment history", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Returns the scaling history.
*/
@GetMapping("/scaling-history")
public ResponseEntity<List<ScalingRecord>> getScalingHistory() {
try {
List<ScalingRecord> history = intelligentScalerService.getScalingHistory("default");
return ResponseEntity.ok(history);
} catch (Exception e) {
log.error("Failed to get scaling history", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

// Task request
public class TaskRequest {
private String taskId;
private String taskName;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}
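
The controller relies on `dynamicThreadPool.execute` and `dynamicThreadPool.submit` returning a `CompletableFuture`. How the article's `DynamicThreadPool` does this is not shown in this section, so the following is only a sketch of one way to get that behavior on top of any `Executor`. `AsyncTasks` is a hypothetical name, and taking a `Supplier` in `submit` is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical adapter that exposes executor submissions as CompletableFutures. */
final class AsyncTasks {

    private final Executor executor;

    AsyncTasks(Executor executor) {
        this.executor = executor;
    }

    /** Runs the task on the executor; the future completes when the task finishes. */
    CompletableFuture<Void> execute(Runnable task) {
        return CompletableFuture.runAsync(task, executor);
    }

    /** Runs the supplier on the executor; the future completes with its result. */
    <T> CompletableFuture<T> submit(Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, executor);
    }
}
```

An exception thrown inside the task completes the future exceptionally instead of propagating to the caller, so callers should attach `exceptionally` or `handle` when they need to react to failures.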

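4. Best Practices and Summary

4.1 Best Practices for Dynamic Thread Pools in Spring Boot

4.1.1 Adaptive Adjustment

  • Load awareness: detect changes in system load in real time
  • Dynamic adjustment: resize the pool automatically as load changes
  • Smooth transitions: avoid abrupt swings in thread count
  • Predictive adjustment: forecast load trends from historical data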
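
The "smooth transitions" point can be made concrete. The sketch below assumes utilization is reported as a ratio in [0, 1]: an exponential moving average damps single-sample spikes, and a per-round step limit caps how far the pool size can move at once. `SmoothAdjuster`, `alpha`, `targetUtilization`, and `maxStep` are hypothetical names introduced for illustration.

```java
/** Hypothetical sizer that smooths utilization and limits the change per round. */
final class SmoothAdjuster {

    private final double alpha;             // weight of the newest sample, in (0, 1]
    private final double targetUtilization; // utilization the pool should settle at
    private final int maxStep;              // largest size change allowed per round
    private double smoothed = -1.0;         // negative means "no sample yet"

    SmoothAdjuster(double alpha, double targetUtilization, int maxStep) {
        this.alpha = alpha;
        this.targetUtilization = targetUtilization;
        this.maxStep = maxStep;
    }

    /** Returns the next pool size for the given current size and utilization sample. */
    int nextSize(int currentSize, double utilization, int minSize, int maxSize) {
        // Exponential moving average; the first sample initializes it.
        smoothed = smoothed < 0
                ? utilization
                : alpha * utilization + (1 - alpha) * smoothed;

        // Size at which the smoothed load would sit at the target utilization.
        int desired = (int) Math.ceil(currentSize * smoothed / targetUtilization);

        // Limit the change per round, then clamp to the allowed range.
        int step = Math.max(-maxStep, Math.min(maxStep, desired - currentSize));
        return Math.max(minSize, Math.min(maxSize, currentSize + step));
    }
}
```

A smaller `alpha` reacts more slowly but ignores short spikes; a smaller `maxStep` trades reaction speed for stability.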

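4.1.2 Intelligent Scaling

  • Vertical scaling: add threads to a single pool
  • Horizontal scaling: create additional pool instances
  • Elastic scaling: grow and shrink with business demand
  • Pre-scaling: add capacity ahead of expected load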
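
For pre-scaling on a single pool, `ThreadPoolExecutor` already offers the needed primitive: `prestartAllCoreThreads()` starts idle core threads before any task arrives. The following is a minimal sketch under the assumption that the expected core size is known in advance; `PreScaler` is a hypothetical name.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper that warms a pool up before an expected load peak. */
final class PreScaler {

    private PreScaler() {
    }

    /**
     * Raises the pool to at least expectedCore core threads and starts them eagerly.
     * Returns the number of threads started by this call.
     */
    static int preScale(ThreadPoolExecutor executor, int expectedCore) {
        // Raise the ceiling first so the core size never exceeds the maximum.
        if (expectedCore > executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(expectedCore);
        }
        if (expectedCore > executor.getCorePoolSize()) {
            executor.setCorePoolSize(expectedCore);
        }
        // Without this call, core threads are only created as tasks arrive.
        return executor.prestartAllCoreThreads();
    }
}
```

The started threads stay alive until the core size is lowered again, unless `allowCoreThreadTimeOut(true)` is set on the executor.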

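4.1.3 Load Monitoring

  • Real-time monitoring: track system load metrics as they change
  • Historical analysis: analyze past load data
  • Trend forecasting: predict future load trends
  • Anomaly alerts: raise an alert promptly when load is abnormal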
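
Most of the raw numbers for real-time monitoring are available from the JDK itself. The sketch below assumes a `ThreadPoolExecutor` and reads the system load average through `OperatingSystemMXBean.getSystemLoadAverage()`, which returns a negative value on platforms where it is unavailable. `PoolSnapshot` is a hypothetical name and is not the article's `LoadMetrics` class.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical immutable snapshot of one pool's load at a point in time. */
final class PoolSnapshot {

    final int poolSize;
    final int activeThreads;
    final int queued;
    final double threadLoad;        // active threads / maximum pool size
    final double queueLoad;         // queued tasks / queue capacity
    final double systemLoadAverage; // negative if the platform does not report it

    private PoolSnapshot(int poolSize, int activeThreads, int queued,
                         double threadLoad, double queueLoad, double systemLoadAverage) {
        this.poolSize = poolSize;
        this.activeThreads = activeThreads;
        this.queued = queued;
        this.threadLoad = threadLoad;
        this.queueLoad = queueLoad;
        this.systemLoadAverage = systemLoadAverage;
    }

    /** Reads the executor's counters; the values are approximate under concurrent use. */
    static PoolSnapshot capture(ThreadPoolExecutor executor) {
        int queued = executor.getQueue().size();
        // long arithmetic: remainingCapacity() is Integer.MAX_VALUE for unbounded queues.
        long capacity = (long) queued + executor.getQueue().remainingCapacity();
        double queueLoad = capacity == 0 ? 0.0 : (double) queued / capacity;

        int active = executor.getActiveCount();
        double threadLoad = (double) active / executor.getMaximumPoolSize();

        double loadAverage = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        return new PoolSnapshot(executor.getPoolSize(), active, queued,
                threadLoad, queueLoad, loadAverage);
    }
}
```

Calling `capture` from a scheduled task and keeping the last N snapshots gives the history that trend analysis and alerting need.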

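4.1.4 Performance Optimization

  • Parameter tuning: tune the pool's configuration parameters
  • Strategy tuning: refine the adjustment and scaling strategies
  • Resource optimization: reduce system resource usage
  • Monitoring optimization: make metric collection cheaper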
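
A commonly cited starting point for parameter tuning is the sizing heuristic from Java Concurrency in Practice: threads = CPU cores × target CPU utilization × (1 + wait time / compute time). The sketch below only evaluates that formula; `PoolSizing` is a hypothetical name, and the wait and compute times are assumed to come from measurement of the actual workload.

```java
/** Hypothetical calculator for an initial pool size estimate. */
final class PoolSizing {

    private PoolSizing() {
    }

    /**
     * threads = cores * targetCpuUtilization * (1 + waitTime / computeTime)
     *
     * @param cpuCores             number of available cores
     * @param targetCpuUtilization desired CPU utilization in (0, 1]
     * @param waitTimeMs           average time a task spends blocked (I/O, locks)
     * @param computeTimeMs        average time a task spends on the CPU, must be > 0
     */
    static int recommendedThreads(int cpuCores, double targetCpuUtilization,
                                  double waitTimeMs, double computeTimeMs) {
        if (computeTimeMs <= 0) {
            throw new IllegalArgumentException("computeTimeMs must be > 0");
        }
        double threads = cpuCores * targetCpuUtilization * (1 + waitTimeMs / computeTimeMs);
        return Math.max(1, (int) Math.ceil(threads));
    }
}
```

For a CPU-bound workload (no waiting) on 8 cores at full utilization the estimate is 8 threads; for tasks that wait 90 ms per 10 ms of computation at 50% target utilization it is 40. The result is a starting value to validate under load, not a substitute for measurement.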

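4.2 Recommendations for Architecture Evolution

4.2.1 Toward a Microservice Architecture

  • Service decomposition: split dynamic thread pool management into an independent microservice
  • Service governance: implement service registration, discovery, and load balancing
  • Configuration center: manage configuration centrally through a configuration center
  • Fault tolerance: implement circuit breaking, degradation, and retries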

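4.2.2 Toward a Cloud-Native Architecture

  • Containerized deployment: deploy with container technology such as Docker
  • Elastic scaling: scale out and in automatically based on load
  • Service mesh: adopt a service mesh such as Istio
  • Cloud-native monitoring: integrate cloud-native monitoring and alerting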

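4.2.3 Intelligent Management

  • AI-driven optimization: use machine learning to tune thread pool parameters
  • Predictive adjustment: forecast load changes and adjust in advance
  • Intelligent alerting: AI-based alerting and fault diagnosis
  • Adaptive management: adapt the management strategy to the characteristics of the workload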

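4.3 Summary

A dynamic thread pool is an important tool for tuning the performance of enterprise applications in Spring Boot. Combining adaptive adjustment, intelligent scaling, and load awareness lets a pool follow the actual load instead of a fixed configuration, which keeps it stable and efficient. As microservice and cloud-native architectures spread, thread pool management is likely to become more intelligent and more automated.

Teams should keep following these developments and keep refining their thread pool management strategies as business requirements and the technical environment change. The analysis and examples in this article are intended as a practical reference for building a dynamic thread pool solution that stays stable under concurrent load.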