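Episode 235: SpringBoot Thread Pool Manager Architecture in Practice: An Enterprise Solution for Dynamic Configuration, Intelligent Monitoring, and Resource Control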

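Preface

In today's high-concurrency, multi-task enterprise applications, thread pool management is a key factor in system performance and stability. Traditional thread pool configuration is usually fixed at startup and cannot adapt to changing business load. A SpringBoot-based thread pool manager can adjust pool parameters at runtime and can also provide monitoring, resource isolation, and automatic tuning. As microservice architectures and cloud-native technology spread, building a scalable thread pool management framework has become a core skill for enterprise architects.

This article examines the architecture and practical use of a thread pool manager in SpringBoot, covering dynamic configuration, monitoring, resource control, and performance tuning, as technical guidance for building a stable and efficient thread pool management solution.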

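1. SpringBoot Thread Pool Manager: Architecture Overview and Core Principles

1.1 Thread Pool Manager Architecture

The SpringBoot thread pool manager uses a layered architecture: a manager delegates to a configuration manager, a pool factory, a metrics collector, and a dynamic tuner, which together provide dynamic configuration, monitoring, and resource control.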

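graph TB
A[Thread pool request] --> B[Thread pool manager]
B --> C[Configuration manager]
C --> D[Thread pool factory]
D --> E[Thread pool instance]
E --> F[Task executor]
F --> G[Metrics collector]
G --> H[Performance analyzer]
H --> I[Dynamic tuner]

J[Thread pool types] --> K[Core pool]
J --> L[Cached pool]
J --> M[Scheduled pool]
J --> N[Work-stealing pool]
J --> O[Custom pool]

P[Configuration strategies] --> Q[Static configuration]
P --> R[Dynamic configuration]
P --> S[Adaptive configuration]
P --> T[Environment-aware configuration]

U[Monitoring] --> V[Performance monitoring]
U --> W[Resource monitoring]
U --> X[Exception monitoring]
U --> Y[Business monitoring]

Z[Control strategies] --> AA[Resource isolation]
Z --> BB[Load balancing]
Z --> CC[Circuit breaking and degradation]
Z --> DD[Elastic scaling]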

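1.2 Core Features of the Thread Pool Manager

1.2.1 Dynamic Configuration

  • Runtime configuration: thread pool parameters can be adjusted while the application is running
  • Hot reload: configuration updates take effect without restarting the application
  • Environment awareness: configuration is adjusted automatically for the runtime environment
  • Configuration validation: parameters are checked for validity before they are applied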
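
Runtime adjustment ultimately comes down to calling the setters on a live `ThreadPoolExecutor`. The following is a minimal sketch, not the manager's actual adjuster: `PoolResizer` and `resize` are hypothetical names introduced here. It illustrates one detail that is easy to miss: the two setters should be called in an order that never leaves the core size above the maximum, because recent JDKs reject such a combination with an `IllegalArgumentException`.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: resizes a live ThreadPoolExecutor without ever leaving core > max. */
final class PoolResizer {
    private PoolResizer() {}

    static void resize(ThreadPoolExecutor executor, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("invalid sizes: core=" + newCore + ", max=" + newMax);
        }
        if (newMax >= executor.getMaximumPoolSize()) {
            // Growing (or same ceiling): raise the maximum first so the new core size fits under it.
            executor.setMaximumPoolSize(newMax);
            executor.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first so the new maximum is never below it.
            executor.setCorePoolSize(newCore);
            executor.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        resize(executor, 8, 16); // scale up
        System.out.println(executor.getCorePoolSize() + "/" + executor.getMaximumPoolSize());
        resize(executor, 1, 2);  // scale down
        System.out.println(executor.getCorePoolSize() + "/" + executor.getMaximumPoolSize());
        executor.shutdown();
    }
}
```

Queue capacity is not covered by this sketch: the standard JDK queues have a fixed capacity, so changing it at runtime would need a custom queue implementation.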

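1.2.2 Intelligent Monitoring

  • Performance monitoring: thread pool performance metrics are collected in real time
  • Resource monitoring: system resources such as CPU and memory are tracked
  • Exception monitoring: thread pool exceptions and errors are recorded
  • Business monitoring: business-level metrics are tracked per thread pool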
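
Most of the performance metrics listed above can be derived from getters that `ThreadPoolExecutor` already exposes. The sketch below shows two of them under stated assumptions: `PoolMetrics` is a hypothetical helper, pool utilization is taken as active threads over the maximum pool size, and queue utilization as queued tasks over total capacity. The capacity is computed as `size + remainingCapacity`, which yields 0 for a `SynchronousQueue`, so that case is guarded explicitly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper that derives utilization ratios from a ThreadPoolExecutor. */
final class PoolMetrics {
    private PoolMetrics() {}

    /** Active threads divided by the maximum pool size, in the range 0.0 to 1.0. */
    static double poolUtilization(ThreadPoolExecutor executor) {
        int max = executor.getMaximumPoolSize();
        return max == 0 ? 0.0 : (double) executor.getActiveCount() / max;
    }

    /** Queued tasks divided by total queue capacity; 0.0 for zero-capacity queues. */
    static double queueUtilization(ThreadPoolExecutor executor) {
        BlockingQueue<Runnable> queue = executor.getQueue();
        int size = queue.size();
        // Use long arithmetic: an unbounded queue reports Integer.MAX_VALUE - size as remaining.
        long capacity = (long) size + queue.remainingCapacity();
        return capacity == 0 ? 0.0 : (double) size / capacity;
    }
}
```

Both values are snapshots: `getActiveCount()` and the queue size can change while they are being read, so they are suited to dashboards and alert thresholds rather than exact accounting.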

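1.2.3 Resource Control

  • Resource isolation: each business domain uses its own thread pool
  • Load balancing: tasks are routed to a suitable thread pool
  • Circuit breaking and degradation: the pool protects itself automatically when overloaded
  • Elastic scaling: pool size is adjusted automatically according to load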
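
Resource isolation and overload protection can be illustrated together. The sketch below is one possible shape, not the article's implementation: `IsolatedPools`, `register`, and `submit` are hypothetical names. Each business domain gets its own bounded pool with `AbortPolicy`, and a rejected submission is answered with a caller-supplied fallback value, so a saturated pool degrades on its own without affecting the others.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical registry: one bounded pool per business domain, with a fallback on overload. */
final class IsolatedPools {
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    /** Registers a fixed-size pool with a bounded queue (queueCapacity must be at least 1). */
    ThreadPoolExecutor register(String name, int threads, int queueCapacity) {
        return pools.computeIfAbsent(name, n -> new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()));
    }

    /** Runs the task on the named pool; returns the fallback if that pool is saturated. */
    <T> CompletableFuture<T> submit(String name, Supplier<T> task, T fallback) {
        ThreadPoolExecutor pool = pools.get(name);
        if (pool == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("unknown pool: " + name));
        }
        try {
            return CompletableFuture.supplyAsync(task, pool);
        } catch (RejectedExecutionException e) {
            // AbortPolicy rejected the task: degrade instead of blocking the caller.
            return CompletableFuture.completedFuture(fallback);
        }
    }

    void shutdownAll() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

A full circuit breaker would additionally track the recent rejection rate and stop submitting for a cool-down period; this sketch only covers the per-call fallback.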

2. SpringBoot Thread Pool Manager: Core Implementation

2.1 Thread Pool Manager

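// Thread pool manager
// Imports assume Spring Boot 2.x; on Spring Boot 3.x use jakarta.annotation.* instead of javax.annotation.*.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ThreadPoolManager {

    @Autowired
    private ThreadPoolConfigManager configManager;

    @Autowired
    private ThreadPoolMonitorService monitorService;

    @Autowired
    private ThreadPoolDynamicAdjuster dynamicAdjuster;

    private final Map<String, ManagedThreadPool> threadPools = new ConcurrentHashMap<>();
    private final Map<String, ThreadPoolConfig> configs = new ConcurrentHashMap<>();

    private final ScheduledExecutorService configMonitor = Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void init() {
        // 1. Load the persisted thread pool configurations
        loadThreadPoolConfigs();

        // 2. Create the default thread pools
        createDefaultThreadPools();

        // 3. Start the periodic configuration check
        startConfigMonitoring();

        log.info("Thread pool manager initialized");
    }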

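    /**
     * Creates and registers a thread pool.
     */
    public ManagedThreadPool createThreadPool(String poolName, ThreadPoolConfig config) {
        try {
            // 1. Validate the configuration
            validateThreadPoolConfig(config);

            // 2. Create the thread pool
            ManagedThreadPool threadPool = createManagedThreadPool(poolName, config);

            // 3. Register it; refuse duplicates so an existing executor is never overwritten and leaked
            if (threadPools.putIfAbsent(poolName, threadPool) != null) {
                threadPool.shutdown();
                throw new IllegalStateException("Thread pool already exists: " + poolName);
            }
            configs.put(poolName, config);

            // 4. Start monitoring
            monitorService.startMonitoring(poolName, threadPool);

            log.info("Thread pool created: poolName={}, config={}", poolName, config);

            return threadPool;

        } catch (Exception e) {
            log.error("Failed to create thread pool: poolName={}", poolName, e);
            throw new RuntimeException("Failed to create thread pool", e);
        }
    }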

    /**
     * Returns the named thread pool, or null if it does not exist.
     */
    public ManagedThreadPool getThreadPool(String poolName) {
        return threadPools.get(poolName);
    }

    /**
     * Applies a new configuration to a running thread pool.
     */
    public boolean adjustThreadPoolConfig(String poolName, ThreadPoolConfig newConfig) {
        try {
            ManagedThreadPool threadPool = threadPools.get(poolName);
            if (threadPool == null) {
                log.warn("Thread pool does not exist: poolName={}", poolName);
                return false;
            }

            // 1. Validate the new configuration
            validateThreadPoolConfig(newConfig);

            // 2. Apply the adjustment
            boolean success = dynamicAdjuster.adjustThreadPool(threadPool, newConfig);

            if (success) {
                // 3. Record the new configuration
                configs.put(poolName, newConfig);

                // 4. Log the adjustment
                log.info("Thread pool configuration adjusted: poolName={}, newConfig={}", poolName, newConfig);
            }

            return success;

        } catch (Exception e) {
            log.error("Failed to adjust thread pool configuration: poolName={}", poolName, e);
            return false;
        }
    }

    /**
     * Shuts down and unregisters a thread pool.
     */
    public boolean destroyThreadPool(String poolName) {
        try {
            ManagedThreadPool threadPool = threadPools.remove(poolName);
            if (threadPool == null) {
                log.warn("Thread pool does not exist: poolName={}", poolName);
                return false;
            }

            // 1. Stop monitoring
            monitorService.stopMonitoring(poolName);

            // 2. Shut the pool down gracefully (blocks for up to 30 seconds, see ManagedThreadPool.shutdown)
            threadPool.shutdown();

            // 3. Remove the configuration
            configs.remove(poolName);

            log.info("Thread pool destroyed: poolName={}", poolName);

            return true;

        } catch (Exception e) {
            log.error("Failed to destroy thread pool: poolName={}", poolName, e);
            return false;
        }
    }

    /**
     * Returns the current status of every thread pool.
     */
    public Map<String, ThreadPoolStatus> getAllThreadPoolStatus() {
        Map<String, ThreadPoolStatus> statusMap = new HashMap<>();

        threadPools.forEach((poolName, threadPool) -> {
            ThreadPoolStatus status = threadPool.getStatus();
            statusMap.put(poolName, status);
        });

        return statusMap;
    }

    /**
     * Returns the collected statistics of every thread pool.
     */
    public Map<String, ThreadPoolStatistics> getAllThreadPoolStatistics() {
        Map<String, ThreadPoolStatistics> statisticsMap = new HashMap<>();

        threadPools.forEach((poolName, threadPool) -> {
            ThreadPoolStatistics statistics = monitorService.getStatistics(poolName);
            if (statistics != null) {
                statisticsMap.put(poolName, statistics);
            }
        });

        return statisticsMap;
    }

    /**
     * Runs a task without a result on the named pool.
     * CompletableFuture.failedFuture requires Java 9 or later.
     */
    public CompletableFuture<Void> executeTask(String poolName, Runnable task) {
        ManagedThreadPool threadPool = threadPools.get(poolName);
        if (threadPool == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("Thread pool does not exist: " + poolName));
        }

        return threadPool.executeTask(task);
    }

    /**
     * Submits a task with a result to the named pool.
     */
    public <T> CompletableFuture<T> submitTask(String poolName, Callable<T> task) {
        ManagedThreadPool threadPool = threadPools.get(poolName);
        if (threadPool == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("Thread pool does not exist: " + poolName));
        }

        return threadPool.submitTask(task);
    }

    /**
     * Builds the managed thread pool from its configuration.
     */
    private ManagedThreadPool createManagedThreadPool(String poolName, ThreadPoolConfig config) {
        // The thread factory and rejection handler are passed to the constructor,
        // so they do not need to be set again afterwards.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                config.getCorePoolSize(),
                config.getMaximumPoolSize(),
                config.getKeepAliveTime(),
                config.getTimeUnit(),
                createBlockingQueue(config),
                createThreadFactory(poolName),
                createRejectedExecutionHandler(config)
        );

        // Optionally let idle core threads time out as well
        executor.allowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());

        return new ManagedThreadPool(poolName, executor, config);
    }

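    /**
     * Creates the work queue for the configured queue type.
     */
    private BlockingQueue<Runnable> createBlockingQueue(ThreadPoolConfig config) {
        switch (config.getQueueType()) {
            case ARRAY_BLOCKING_QUEUE:
                return new ArrayBlockingQueue<>(config.getQueueCapacity());
            case SYNCHRONOUS_QUEUE:
                // Direct hand-off: the queue holds no tasks, so the capacity setting is ignored
                return new SynchronousQueue<>();
            case PRIORITY_BLOCKING_QUEUE:
                // Unbounded; the argument is only an initial capacity. Queued tasks must be
                // Comparable, which the CompletableFuture wrappers used by ManagedThreadPool are not.
                return new PriorityBlockingQueue<>(config.getQueueCapacity());
            case DELAY_QUEUE:
                // DelayQueue only accepts Delayed elements, so it cannot be used as a
                // BlockingQueue<Runnable>; use a ScheduledThreadPoolExecutor for delayed work.
                throw new IllegalArgumentException("DELAY_QUEUE is not supported by ThreadPoolExecutor");
            case LINKED_BLOCKING_QUEUE:
            default:
                return new LinkedBlockingQueue<>(config.getQueueCapacity());
        }
    }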

    /**
     * Creates a thread factory that names threads after their pool.
     */
    private ThreadFactory createThreadFactory(String poolName) {
        return new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix = poolName + "-thread-";

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
                thread.setDaemon(false);
                thread.setPriority(Thread.NORM_PRIORITY);
                return thread;
            }
        };
    }

    /**
     * Creates the rejection handler for the configured policy.
     */
    private RejectedExecutionHandler createRejectedExecutionHandler(ThreadPoolConfig config) {
        switch (config.getRejectedExecutionHandlerType()) {
            case ABORT_POLICY:
                return new ThreadPoolExecutor.AbortPolicy();
            case CALLER_RUNS_POLICY:
                return new ThreadPoolExecutor.CallerRunsPolicy();
            case DISCARD_POLICY:
                return new ThreadPoolExecutor.DiscardPolicy();
            case DISCARD_OLDEST_POLICY:
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            case CUSTOM_POLICY:
                return new CustomRejectedExecutionHandler(config.getCustomRejectedExecutionHandler());
            default:
                return new ThreadPoolExecutor.CallerRunsPolicy();
        }
    }

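    /**
     * Validates a thread pool configuration.
     */
    private void validateThreadPoolConfig(ThreadPoolConfig config) {
        if (config == null) {
            throw new IllegalArgumentException("Thread pool configuration must not be null");
        }

        if (config.getCorePoolSize() < 0) {
            throw new IllegalArgumentException("Core pool size must not be negative");
        }

        if (config.getMaximumPoolSize() <= 0) {
            throw new IllegalArgumentException("Maximum pool size must be greater than 0");
        }

        if (config.getCorePoolSize() > config.getMaximumPoolSize()) {
            throw new IllegalArgumentException("Core pool size must not exceed maximum pool size");
        }

        if (config.getKeepAliveTime() < 0) {
            throw new IllegalArgumentException("Keep-alive time must not be negative");
        }

        // ThreadPoolExecutor.allowCoreThreadTimeOut(true) requires a non-zero keep-alive time
        if (config.isAllowCoreThreadTimeOut() && config.getKeepAliveTime() == 0) {
            throw new IllegalArgumentException("Keep-alive time must be greater than 0 when core threads may time out");
        }

        // A SynchronousQueue holds no tasks, so capacity only matters for the other queue types.
        // Without this exemption the default "cache-pool" (capacity 0) would fail validation.
        if (config.getQueueType() != QueueType.SYNCHRONOUS_QUEUE && config.getQueueCapacity() <= 0) {
            throw new IllegalArgumentException("Queue capacity must be greater than 0");
        }
    }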

    /**
     * Loads the persisted thread pool configurations.
     */
    private void loadThreadPoolConfigs() {
        try {
            List<ThreadPoolConfig> configs = configManager.loadAllConfigs();
            for (ThreadPoolConfig config : configs) {
                this.configs.put(config.getPoolName(), config);
            }
            log.info("Thread pool configurations loaded: count={}", configs.size());
        } catch (Exception e) {
            log.error("Failed to load thread pool configurations", e);
        }
    }

    /**
     * Creates the default thread pools.
     */
    private void createDefaultThreadPools() {
        // 1. Core pool: bounded queue, caller runs the task when saturated
        ThreadPoolConfig coreConfig = new ThreadPoolConfig();
        coreConfig.setPoolName("core-pool");
        coreConfig.setCorePoolSize(10);
        coreConfig.setMaximumPoolSize(50);
        coreConfig.setKeepAliveTime(60L);
        coreConfig.setTimeUnit(TimeUnit.SECONDS);
        coreConfig.setQueueType(QueueType.LINKED_BLOCKING_QUEUE);
        coreConfig.setQueueCapacity(1000);
        coreConfig.setRejectedExecutionHandlerType(RejectedExecutionHandlerType.CALLER_RUNS_POLICY);
        coreConfig.setAllowCoreThreadTimeOut(false);

        createThreadPool("core-pool", coreConfig);

        // 2. Cached pool: same shape as Executors.newCachedThreadPool(), so the thread count is unbounded
        ThreadPoolConfig cacheConfig = new ThreadPoolConfig();
        cacheConfig.setPoolName("cache-pool");
        cacheConfig.setCorePoolSize(0);
        cacheConfig.setMaximumPoolSize(Integer.MAX_VALUE);
        cacheConfig.setKeepAliveTime(60L);
        cacheConfig.setTimeUnit(TimeUnit.SECONDS);
        cacheConfig.setQueueType(QueueType.SYNCHRONOUS_QUEUE);
        cacheConfig.setQueueCapacity(0); // unused by SynchronousQueue
        cacheConfig.setRejectedExecutionHandlerType(RejectedExecutionHandlerType.ABORT_POLICY);
        cacheConfig.setAllowCoreThreadTimeOut(true);

        createThreadPool("cache-pool", cacheConfig);

        // 3. Pool reserved for scheduled jobs. It is a plain ThreadPoolExecutor and does not
        //    schedule anything itself; the triggering is assumed to happen elsewhere.
        ThreadPoolConfig scheduledConfig = new ThreadPoolConfig();
        scheduledConfig.setPoolName("scheduled-pool");
        scheduledConfig.setCorePoolSize(5);
        scheduledConfig.setMaximumPoolSize(20);
        scheduledConfig.setKeepAliveTime(60L);
        scheduledConfig.setTimeUnit(TimeUnit.SECONDS);
        scheduledConfig.setQueueType(QueueType.LINKED_BLOCKING_QUEUE);
        scheduledConfig.setQueueCapacity(500);
        scheduledConfig.setRejectedExecutionHandlerType(RejectedExecutionHandlerType.CALLER_RUNS_POLICY);
        scheduledConfig.setAllowCoreThreadTimeOut(false);

        createThreadPool("scheduled-pool", scheduledConfig);
    }

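    /**
     * Starts the periodic configuration check (every 30 seconds).
     */
    private void startConfigMonitoring() {
        configMonitor.scheduleAtFixedRate(() -> {
            try {
                // Check for configuration changes
                checkConfigChanges();

                // Run automatic tuning
                performAutoTuning();

            } catch (Exception e) {
                // Catch everything: an exception escaping the task would cancel all future runs
                log.error("Configuration monitoring failed", e);
            }
        }, 30, 30, TimeUnit.SECONDS);
    }

    /**
     * Compares the stored configurations with the running ones and applies any differences.
     */
    private void checkConfigChanges() {
        try {
            List<ThreadPoolConfig> latestConfigs = configManager.loadAllConfigs();

            for (ThreadPoolConfig latestConfig : latestConfigs) {
                String poolName = latestConfig.getPoolName();
                ThreadPoolConfig currentConfig = configs.get(poolName);

                // This comparison relies on ThreadPoolConfig overriding equals(). With the inherited
                // identity comparison, every reloaded config looks changed and is re-applied on each run.
                if (currentConfig == null || !currentConfig.equals(latestConfig)) {
                    // The configuration changed and must be applied
                    if (threadPools.containsKey(poolName)) {
                        adjustThreadPoolConfig(poolName, latestConfig);
                    } else {
                        createThreadPool(poolName, latestConfig);
                    }
                }
            }

        } catch (Exception e) {
            log.error("Failed to check configuration changes", e);
        }
    }

    /**
     * Runs automatic tuning on every pool.
     */
    private void performAutoTuning() {
        try {
            threadPools.forEach((poolName, threadPool) -> {
                try {
                    dynamicAdjuster.performAutoTuning(threadPool);
                } catch (Exception e) {
                    log.error("Automatic tuning failed: poolName={}", poolName, e);
                }
            });
        } catch (Exception e) {
            log.error("Failed to run automatic tuning", e);
        }
    }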

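    @PreDestroy
    public void destroy() {
        // 1. Stop the configuration check
        configMonitor.shutdown();

        // 2. Shut down all thread pools (sequentially; each may wait up to 30 seconds)
        threadPools.values().forEach(ManagedThreadPool::shutdown);

        // 3. Stop all monitoring
        monitorService.stopAllMonitoring();

        log.info("Thread pool manager shut down");
    }
}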

// Managed thread pool: wraps a ThreadPoolExecutor and counts submitted, completed, and failed tasks.
// Requires java.util.concurrent.CompletionException and java.util.concurrent.atomic.AtomicLong
// in addition to the imports used by ThreadPoolManager.
public class ManagedThreadPool {
    private final String poolName;
    private final ThreadPoolExecutor executor;
    private final ThreadPoolConfig config;
    private final AtomicLong totalTasks = new AtomicLong(0);
    private final AtomicLong completedTasks = new AtomicLong(0);
    private final AtomicLong failedTasks = new AtomicLong(0);

    public ManagedThreadPool(String poolName, ThreadPoolExecutor executor, ThreadPoolConfig config) {
        this.poolName = poolName;
        this.executor = executor;
        this.config = config;
    }

    /**
     * Runs a task without a result.
     */
    public CompletableFuture<Void> executeTask(Runnable task) {
        totalTasks.incrementAndGet();

        return CompletableFuture.runAsync(() -> {
            try {
                task.run();
                completedTasks.incrementAndGet();
            } catch (RuntimeException | Error e) {
                // Count the failure, then let the future complete exceptionally
                failedTasks.incrementAndGet();
                throw e;
            }
        }, executor);
    }

    /**
     * Submits a task with a result.
     */
    public <T> CompletableFuture<T> submitTask(Callable<T> task) {
        totalTasks.incrementAndGet();

        return CompletableFuture.supplyAsync(() -> {
            try {
                T result = task.call();
                completedTasks.incrementAndGet();
                return result;
            } catch (Exception e) {
                failedTasks.incrementAndGet();
                // CompletionException keeps the original exception as the direct cause
                throw new CompletionException(e);
            }
        }, executor);
    }

    /**
     * Returns a snapshot of the pool's current status.
     */
    public ThreadPoolStatus getStatus() {
        ThreadPoolStatus status = new ThreadPoolStatus();
        status.setPoolName(poolName);
        status.setCorePoolSize(executor.getCorePoolSize());
        status.setMaximumPoolSize(executor.getMaximumPoolSize());
        status.setCurrentPoolSize(executor.getPoolSize());
        status.setActiveThreadCount(executor.getActiveCount());
        status.setLargestPoolSize(executor.getLargestPoolSize());
        status.setTaskCount(executor.getTaskCount());
        status.setCompletedTaskCount(executor.getCompletedTaskCount());
        status.setQueueSize(executor.getQueue().size());
        status.setQueueRemainingCapacity(executor.getQueue().remainingCapacity());
        status.setTotalTasks(totalTasks.get());
        status.setCompletedTasks(completedTasks.get());
        status.setFailedTasks(failedTasks.get());
        status.setIsShutdown(executor.isShutdown());
        status.setIsTerminated(executor.isTerminated());

        return status;
    }

    /**
     * Shuts the pool down: waits up to 30 seconds for running tasks, then forces termination.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Getters omitted
}

// Thread pool configuration.
// @Data (Lombok, an assumption consistent with the @Slf4j usage above) generates the getters, setters,
// and the equals()/hashCode() that checkConfigChanges() depends on. If the Redis template uses JDK
// serialization, this class must also implement java.io.Serializable.
@Data
public class ThreadPoolConfig {
    private String poolName;
    private int corePoolSize;
    private int maximumPoolSize;
    private long keepAliveTime;
    private TimeUnit timeUnit;
    private QueueType queueType;
    private int queueCapacity;
    private RejectedExecutionHandlerType rejectedExecutionHandlerType;
    private String customRejectedExecutionHandler;
    private boolean allowCoreThreadTimeOut;
    private String description;
    private Map<String, Object> customProperties;
}

// Queue types
public enum QueueType {
    LINKED_BLOCKING_QUEUE,
    ARRAY_BLOCKING_QUEUE,
    SYNCHRONOUS_QUEUE,
    PRIORITY_BLOCKING_QUEUE,
    DELAY_QUEUE
}

// Rejection policy types
public enum RejectedExecutionHandlerType {
    ABORT_POLICY,
    CALLER_RUNS_POLICY,
    DISCARD_POLICY,
    DISCARD_OLDEST_POLICY,
    CUSTOM_POLICY
}

// Thread pool status snapshot
public class ThreadPoolStatus {
    private String poolName;
    private int corePoolSize;
    private int maximumPoolSize;
    private int currentPoolSize;
    private int activeThreadCount;
    private int largestPoolSize;
    private long taskCount;
    private long completedTaskCount;
    private int queueSize;
    private int queueRemainingCapacity;
    private long totalTasks;
    private long completedTasks;
    private long failedTasks;
    private boolean isShutdown;
    private boolean isTerminated;

    // Constructors and getters/setters omitted
}
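
`ManagedThreadPool` counts submitted, completed, and failed tasks, but nothing in the listing counts rejected ones, and `ThreadPoolExecutor` itself keeps no rejection counter. One way to obtain that number, sketched here with a hypothetical `CountingRejectionHandler` class, is to wrap whichever policy `createRejectedExecutionHandler` returns in a decorator that increments a counter before delegating.

```java
import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical decorator: counts rejections, then applies the wrapped policy unchanged. */
final class CountingRejectionHandler implements RejectedExecutionHandler {
    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejected = new AtomicLong();

    CountingRejectionHandler(RejectedExecutionHandler delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Count first: the delegate may throw (AbortPolicy) or run the task (CallerRunsPolicy).
        rejected.incrementAndGet();
        delegate.rejectedExecution(task, executor);
    }

    /** Total number of tasks rejected since this handler was created. */
    long rejectedCount() {
        return rejected.get();
    }
}
```

A monitor could then derive a rejection rate by dividing the change in `rejectedCount()` between two collection cycles by the change in submitted tasks over the same interval.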

2.2 Thread Pool Configuration Manager

// Thread pool configuration manager: persists configurations and caches them in Redis
@Service
@Slf4j
public class ThreadPoolConfigManager {

    @Autowired
    private ThreadPoolConfigRepository configRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CONFIG_CACHE_PREFIX = "thread_pool_config:";
    private static final long CONFIG_CACHE_TTL = 3600; // seconds (1 hour)

    /**
     * Saves a thread pool configuration.
     */
    public ThreadPoolConfig saveConfig(ThreadPoolConfig config) {
        try {
            // 1. Validate the configuration
            validateConfig(config);

            // 2. Save to the database
            ThreadPoolConfig savedConfig = configRepository.save(config);

            // 3. Update the cache
            updateCache(savedConfig);

            log.info("Thread pool configuration saved: poolName={}", savedConfig.getPoolName());

            return savedConfig;

        } catch (Exception e) {
            log.error("Failed to save thread pool configuration: poolName={}",
                    config != null ? config.getPoolName() : null, e);
            throw new RuntimeException("Failed to save thread pool configuration", e);
        }
    }

    /**
     * Returns the configuration of one pool, reading through the cache.
     */
    public ThreadPoolConfig getConfig(String poolName) {
        try {
            // 1. Try the cache first
            String cacheKey = CONFIG_CACHE_PREFIX + poolName;
            ThreadPoolConfig cachedConfig = (ThreadPoolConfig) redisTemplate.opsForValue().get(cacheKey);

            if (cachedConfig != null) {
                log.debug("Thread pool configuration read from cache: poolName={}", poolName);
                return cachedConfig;
            }

            // 2. Fall back to the database
            ThreadPoolConfig config = configRepository.findByPoolName(poolName);

            if (config != null) {
                // 3. Populate the cache
                redisTemplate.opsForValue().set(cacheKey, config, CONFIG_CACHE_TTL, TimeUnit.SECONDS);
                log.debug("Thread pool configuration read from database and cached: poolName={}", poolName);
            }

            return config;

        } catch (Exception e) {
            log.error("Failed to get thread pool configuration: poolName={}", poolName, e);
            return null;
        }
    }

    /**
     * Deletes the configuration of one pool.
     */
    public boolean deleteConfig(String poolName) {
        try {
            // 1. Delete from the database
            boolean deleted = configRepository.deleteByPoolName(poolName);

            if (deleted) {
                // 2. Evict from the cache
                String cacheKey = CONFIG_CACHE_PREFIX + poolName;
                redisTemplate.delete(cacheKey);

                log.info("Thread pool configuration deleted: poolName={}", poolName);
            }

            return deleted;

        } catch (Exception e) {
            log.error("Failed to delete thread pool configuration: poolName={}", poolName, e);
            return false;
        }
    }

    /**
     * Returns all configurations.
     */
    public List<ThreadPoolConfig> loadAllConfigs() {
        try {
            // 1. Read from the database, which is the source of truth. Answering from the cache alone
            //    is unsafe here: entries expire independently, so a partially expired cache would
            //    silently drop pools, and the KEYS command scans the whole keyspace while blocking Redis.
            List<ThreadPoolConfig> configs = configRepository.findAll();

            // 2. Refresh the per-pool cache entries used by getConfig()
            for (ThreadPoolConfig config : configs) {
                updateCache(config);
            }

            log.debug("All thread pool configurations read from database: count={}", configs.size());

            return configs;

        } catch (Exception e) {
            log.error("Failed to get all thread pool configurations", e);
            return new ArrayList<>();
        }
    }

    /**
     * Returns the configurations for one environment.
     */
    public List<ThreadPoolConfig> getConfigsByEnvironment(String environment) {
        try {
            return configRepository.findByEnvironment(environment);
        } catch (Exception e) {
            log.error("Failed to get thread pool configurations by environment: environment={}", environment, e);
            return new ArrayList<>();
        }
    }

    /**
     * Copies a configuration under a new pool name.
     */
    public ThreadPoolConfig copyConfig(String sourcePoolName, String targetPoolName) {
        try {
            // 1. Load the source configuration
            ThreadPoolConfig sourceConfig = getConfig(sourcePoolName);
            if (sourceConfig == null) {
                throw new IllegalArgumentException("Source configuration does not exist: " + sourcePoolName);
            }

            // 2. Build the target configuration
            ThreadPoolConfig targetConfig = new ThreadPoolConfig();
            targetConfig.setPoolName(targetPoolName);
            targetConfig.setCorePoolSize(sourceConfig.getCorePoolSize());
            targetConfig.setMaximumPoolSize(sourceConfig.getMaximumPoolSize());
            targetConfig.setKeepAliveTime(sourceConfig.getKeepAliveTime());
            targetConfig.setTimeUnit(sourceConfig.getTimeUnit());
            targetConfig.setQueueType(sourceConfig.getQueueType());
            targetConfig.setQueueCapacity(sourceConfig.getQueueCapacity());
            targetConfig.setRejectedExecutionHandlerType(sourceConfig.getRejectedExecutionHandlerType());
            targetConfig.setCustomRejectedExecutionHandler(sourceConfig.getCustomRejectedExecutionHandler());
            targetConfig.setAllowCoreThreadTimeOut(sourceConfig.isAllowCoreThreadTimeOut());
            targetConfig.setDescription("Copied from: " + sourceConfig.getPoolName());

            // 3. Save the target configuration
            return saveConfig(targetConfig);

        } catch (Exception e) {
            log.error("Failed to copy thread pool configuration: sourcePoolName={}, targetPoolName={}",
                    sourcePoolName, targetPoolName, e);
            throw new RuntimeException("Failed to copy thread pool configuration", e);
        }
    }

    /**
     * Saves several configurations; failures are logged and skipped.
     */
    public List<ThreadPoolConfig> batchUpdateConfigs(List<ThreadPoolConfig> configs) {
        List<ThreadPoolConfig> updatedConfigs = new ArrayList<>();

        for (ThreadPoolConfig config : configs) {
            try {
                ThreadPoolConfig updatedConfig = saveConfig(config);
                updatedConfigs.add(updatedConfig);
            } catch (Exception e) {
                log.error("Batch update failed for configuration: poolName={}", config.getPoolName(), e);
            }
        }

        return updatedConfigs;
    }

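    /**
     * Validates a configuration before it is saved.
     */
    private void validateConfig(ThreadPoolConfig config) {
        if (config == null) {
            throw new IllegalArgumentException("Thread pool configuration must not be null");
        }

        if (config.getPoolName() == null || config.getPoolName().trim().isEmpty()) {
            throw new IllegalArgumentException("Thread pool name must not be empty");
        }

        if (config.getCorePoolSize() < 0) {
            throw new IllegalArgumentException("Core pool size must not be negative");
        }

        if (config.getMaximumPoolSize() <= 0) {
            throw new IllegalArgumentException("Maximum pool size must be greater than 0");
        }

        if (config.getCorePoolSize() > config.getMaximumPoolSize()) {
            throw new IllegalArgumentException("Core pool size must not exceed maximum pool size");
        }

        if (config.getKeepAliveTime() < 0) {
            throw new IllegalArgumentException("Keep-alive time must not be negative");
        }

        // A SynchronousQueue holds no tasks, so capacity only matters for the other queue types
        if (config.getQueueType() != QueueType.SYNCHRONOUS_QUEUE && config.getQueueCapacity() <= 0) {
            throw new IllegalArgumentException("Queue capacity must be greater than 0");
        }
    }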

    /**
     * Writes one configuration to the cache; cache failures are logged and do not fail the caller.
     */
    private void updateCache(ThreadPoolConfig config) {
        try {
            String cacheKey = CONFIG_CACHE_PREFIX + config.getPoolName();
            redisTemplate.opsForValue().set(cacheKey, config, CONFIG_CACHE_TTL, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("Failed to update cache: poolName={}", config.getPoolName(), e);
        }
    }
}

// Thread pool configuration repository (the persistence implementation is not shown)
@Repository
public interface ThreadPoolConfigRepository {

    ThreadPoolConfig findByPoolName(String poolName);

    ThreadPoolConfig save(ThreadPoolConfig config);

    boolean deleteByPoolName(String poolName);

    List<ThreadPoolConfig> findAll();

    List<ThreadPoolConfig> findByEnvironment(String environment);
}
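
The `getConfig` method above follows the cache-aside pattern: look in the cache, fall back to the store on a miss, then populate the cache with a time-to-live. The sketch below reproduces that flow with an in-memory map so that it can run without Redis. `TtlCache` and its methods are hypothetical names, and the clock is injected so that expiry can be exercised deterministically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical in-memory stand-in for the Redis cache: cache-aside reads with a TTL. */
final class TtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;

        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value if still fresh; otherwise calls the loader and caches its result. */
    V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry != null && entry.expiresAtMillis > now) {
            return entry.value; // cache hit
        }
        V loaded = loader.apply(key); // cache miss or expired entry: go to the store
        if (loaded == null) {
            entries.remove(key); // do not cache missing values
            return null;
        }
        entries.put(key, new Entry<>(loaded, now + ttlMillis));
        return loaded;
    }

    /** Evicts a key, as deleteConfig does after removing the database row. */
    void invalidate(K key) {
        entries.remove(key);
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`. Two concurrent misses on the same key may both call the loader, which is the same behavior as the Redis-backed version.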

2.3 Thread Pool Monitoring Service

// Thread pool monitoring service: collects metrics for every registered pool every 5 seconds
@Service
@Slf4j
public class ThreadPoolMonitorService {

    private final Map<String, ThreadPoolMonitor> monitors = new ConcurrentHashMap<>();
    private final ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor();

    @Autowired
    private ThreadPoolAlertService alertService;

    @PostConstruct
    public void init() {
        // Start the periodic metrics collection
        monitorExecutor.scheduleAtFixedRate(this::collectMetrics, 5, 5, TimeUnit.SECONDS);

        log.info("Thread pool monitoring service initialized");
    }

    /**
     * Starts monitoring a thread pool.
     */
    public void startMonitoring(String poolName, ManagedThreadPool threadPool) {
        try {
            ThreadPoolMonitor monitor = new ThreadPoolMonitor(poolName, threadPool);
            monitors.put(poolName, monitor);

            log.info("Started monitoring thread pool: poolName={}", poolName);

        } catch (Exception e) {
            log.error("Failed to start monitoring thread pool: poolName={}", poolName, e);
        }
    }

    /**
     * Stops monitoring a thread pool.
     */
    public void stopMonitoring(String poolName) {
        try {
            ThreadPoolMonitor monitor = monitors.remove(poolName);
            if (monitor != null) {
                monitor.stop();
                log.info("Stopped monitoring thread pool: poolName={}", poolName);
            }
        } catch (Exception e) {
            log.error("Failed to stop monitoring thread pool: poolName={}", poolName, e);
        }
    }

    /**
     * Stops monitoring all thread pools.
     */
    public void stopAllMonitoring() {
        try {
            monitors.values().forEach(ThreadPoolMonitor::stop);
            monitors.clear();

            log.info("Stopped monitoring all thread pools");

        } catch (Exception e) {
            log.error("Failed to stop all monitoring", e);
        }
    }

    /**
     * Returns the statistics of one thread pool, or null if it is not monitored.
     */
    public ThreadPoolStatistics getStatistics(String poolName) {
        ThreadPoolMonitor monitor = monitors.get(poolName);
        return monitor != null ? monitor.getStatistics() : null;
    }

    /**
     * Returns the statistics of all thread pools.
     */
    public Map<String, ThreadPoolStatistics> getAllStatistics() {
        Map<String, ThreadPoolStatistics> statisticsMap = new HashMap<>();

        monitors.forEach((poolName, monitor) -> {
            ThreadPoolStatistics statistics = monitor.getStatistics();
            if (statistics != null) {
                statisticsMap.put(poolName, statistics);
            }
        });

        return statisticsMap;
    }

    /**
     * Returns the health status of one thread pool, or null if it is not monitored.
     */
    public ThreadPoolHealthStatus getHealthStatus(String poolName) {
        ThreadPoolMonitor monitor = monitors.get(poolName);
        return monitor != null ? monitor.getHealthStatus() : null;
    }

    /**
     * Returns the health status of all thread pools.
     */
    public Map<String, ThreadPoolHealthStatus> getAllHealthStatus() {
        Map<String, ThreadPoolHealthStatus> healthStatusMap = new HashMap<>();

        monitors.forEach((poolName, monitor) -> {
            ThreadPoolHealthStatus healthStatus = monitor.getHealthStatus();
            if (healthStatus != null) {
                healthStatusMap.put(poolName, healthStatus);
            }
        });

        return healthStatusMap;
    }

    /**
     * Collects metrics for every monitored pool.
     */
    private void collectMetrics() {
        try {
            monitors.forEach((poolName, monitor) -> {
                try {
                    // 1. Collect basic metrics
                    monitor.collectBasicMetrics();

                    // 2. Collect performance metrics
                    monitor.collectPerformanceMetrics();

                    // 3. Evaluate the health status
                    monitor.checkHealthStatus();

                    // 4. Evaluate the alert conditions
                    checkAlertConditions(poolName, monitor);

                } catch (Exception e) {
                    log.error("Failed to collect metrics: poolName={}", poolName, e);
                }
            });

        } catch (Exception e) {
            log.error("Metrics collection failed", e);
        }
    }

    /**
     * Sends alerts when a pool crosses one of the warning thresholds.
     */
    private void checkAlertConditions(String poolName, ThreadPoolMonitor monitor) {
        try {
            ThreadPoolStatistics statistics = monitor.getStatistics();
            ThreadPoolHealthStatus healthStatus = monitor.getHealthStatus();

            // 1. Pool utilization above 80%
            if (statistics.getPoolUtilization() > 0.8) {
                alertService.sendHighUtilizationAlert(poolName, statistics.getPoolUtilization());
            }

            // 2. Queue utilization above 80%
            if (statistics.getQueueUtilization() > 0.8) {
                alertService.sendHighQueueUtilizationAlert(poolName, statistics.getQueueUtilization());
            }

            // 3. Rejection rate above 10%
            if (statistics.getRejectionRate() > 0.1) {
                alertService.sendHighRejectionRateAlert(poolName, statistics.getRejectionRate());
            }

            // 4. Average response time above 5000 (assumed to be milliseconds)
            if (statistics.getAverageResponseTime() > 5000) {
                alertService.sendSlowResponseAlert(poolName, statistics.getAverageResponseTime());
            }

            // 5. Unhealthy status
            if (healthStatus.getStatus() == HealthStatus.UNHEALTHY) {
                alertService.sendUnhealthyAlert(poolName, healthStatus.getMessage());
            }

        } catch (Exception e) {
            log.error("Failed to check alert conditions: poolName={}", poolName, e);
        }
    }
}

// Thread pool monitor: holds the statistics and health status of a single pool
@Slf4j // required because the methods below log through `log`
public class ThreadPoolMonitor {
    private final String poolName;
    private final ManagedThreadPool threadPool;
    private final ThreadPoolStatistics statistics;
    private final ThreadPoolHealthStatus healthStatus;
    private final AtomicLong lastCollectTime = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong lastTaskCount = new AtomicLong(0);
    private final AtomicLong lastCompletedTaskCount = new AtomicLong(0);
    private final AtomicLong lastRejectedTaskCount = new AtomicLong(0);

    public ThreadPoolMonitor(String poolName, ManagedThreadPool threadPool) {
        this.poolName = poolName;
        this.threadPool = threadPool;
        this.statistics = new ThreadPoolStatistics();
        this.healthStatus = new ThreadPoolHealthStatus();

        // Initialize the statistics
        initializeStatistics();
    }

    /**
     * Collects basic metrics from the pool's status snapshot.
     */
    public void collectBasicMetrics() {
        try {
            ThreadPoolStatus status = threadPool.getStatus();

            // Copy the basic metrics
            statistics.setPoolName(poolName);
            statistics.setCorePoolSize(status.getCorePoolSize());
            statistics.setMaximumPoolSize(status.getMaximumPoolSize());
            statistics.setCurrentPoolSize(status.getCurrentPoolSize());
            statistics.setActiveThreadCount(status.getActiveThreadCount());
            statistics.setLargestPoolSize(status.getLargestPoolSize());
            statistics.setTaskCount(status.getTaskCount());
            statistics.setCompletedTaskCount(status.getCompletedTaskCount());
            statistics.setQueueSize(status.getQueueSize());
            statistics.setQueueRemainingCapacity(status.getQueueRemainingCapacity());
            statistics.setTotalTasks(status.getTotalTasks());
            statistics.setCompletedTasks(status.getCompletedTasks());
            statistics.setFailedTasks(status.getFailedTasks());

            // Derive utilization
            calculateUtilization();

            // Derive QPS
            calculateQPS();

            // Derive the rejection rate
            calculateRejectionRate();

        } catch (Exception e) {
            log.error("Failed to collect basic metrics: poolName={}", poolName, e);
        }
    }

    /**
     * Collects rate metrics from the change since the previous collection.
     */
    public void collectPerformanceMetrics() {
        try {
            long currentTime = System.currentTimeMillis();
            long lastTime = lastCollectTime.getAndSet(currentTime);
            long timeDiff = currentTime - lastTime; // milliseconds

            if (timeDiff > 0) {
                // Task submission rate, in tasks per second
                long currentTaskCount = statistics.getTaskCount();
                long lastTaskCount = this.lastTaskCount.getAndSet(currentTaskCount);
                long taskDiff = currentTaskCount - lastTaskCount;

                double taskRate = (double) taskDiff * 1000 / timeDiff;
                statistics.setTaskRate(taskRate);

                // Task completion rate, in tasks per second
                long currentCompletedCount = statistics.getCompletedTaskCount();
                long lastCompletedCount = this.lastCompletedTaskCount.getAndSet(currentCompletedCount);
                long completedDiff = currentCompletedCount - lastCompletedCount;

                double completedRate = (double) completedDiff * 1000 / timeDiff;
                statistics.setCompletedRate(completedRate);
            }

        } catch (Exception e) {
            log.error("Failed to collect performance metrics: poolName={}", poolName, e);
        }
    }

/**
* 检查健康状态
*/
public void checkHealthStatus() {
try {
healthStatus.setPoolName(poolName);
healthStatus.setCheckTime(System.currentTimeMillis());

// 检查各项健康指标
boolean isHealthy = true;
List<String> issues = new ArrayList<>();

// 1. 检查线程池是否关闭
if (statistics.isShutdown()) {
isHealthy = false;
issues.add("线程池已关闭");
}

// 2. 检查线程池利用率
if (statistics.getPoolUtilization() > 0.9) {
isHealthy = false;
issues.add("线程池利用率过高: " + String.format("%.2f%%", statistics.getPoolUtilization() * 100));
}

// 3. 检查队列使用率
if (statistics.getQueueUtilization() > 0.9) {
isHealthy = false;
issues.add("队列使用率过高: " + String.format("%.2f%%", statistics.getQueueUtilization() * 100));
}

// 4. 检查任务拒绝率
if (statistics.getRejectionRate() > 0.2) {
isHealthy = false;
issues.add("任务拒绝率过高: " + String.format("%.2f%%", statistics.getRejectionRate() * 100));
}

// 5. 检查失败率
if (statistics.getFailureRate() > 0.1) {
isHealthy = false;
issues.add("任务失败率过高: " + String.format("%.2f%%", statistics.getFailureRate() * 100));
}

// 设置健康状态
if (isHealthy) {
healthStatus.setStatus(HealthStatus.HEALTHY);
healthStatus.setMessage("线程池运行正常");
} else {
healthStatus.setStatus(HealthStatus.UNHEALTHY);
healthStatus.setMessage(String.join("; ", issues));
}

} catch (Exception e) {
log.error("检查健康状态失败: poolName={}", poolName, e);
healthStatus.setStatus(HealthStatus.UNKNOWN);
healthStatus.setMessage("健康检查异常: " + e.getMessage());
}
}

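/**
* Calculates pool and queue utilization.
*/
private void calculateUtilization() {
if (statistics.getMaximumPoolSize() > 0) {
double poolUtilization = (double) statistics.getActiveThreadCount() / statistics.getMaximumPoolSize();
statistics.setPoolUtilization(Math.min(poolUtilization, 1.0));
}

// ThreadPoolStatistics has no queueCapacity field, so derive the capacity from
// size + remaining capacity. Use long: an unbounded queue reports
// Integer.MAX_VALUE as its remaining capacity, which would overflow an int sum.
long queueCapacity = (long) statistics.getQueueSize() + statistics.getQueueRemainingCapacity();
if (queueCapacity > 0) {
double queueUtilization = (double) statistics.getQueueSize() / queueCapacity;
statistics.setQueueUtilization(Math.min(queueUtilization, 1.0));
}
}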

/**
* 计算QPS
*/
private void calculateQPS() {
long currentTime = System.currentTimeMillis();
long lastTime = lastCollectTime.get();
long timeDiff = currentTime - lastTime;

if (timeDiff > 0) {
long currentCompletedCount = statistics.getCompletedTaskCount();
long lastCompletedCount = this.lastCompletedTaskCount.get();
long completedDiff = currentCompletedCount - lastCompletedCount;

double qps = (double) completedDiff * 1000 / timeDiff;
statistics.setQps(qps);
}
}

/**
* 计算拒绝率
*/
private void calculateRejectionRate() {
long totalTasks = statistics.getTotalTasks();
if (totalTasks > 0) {
long rejectedTasks = statistics.getRejectedTasks();
double rejectionRate = (double) rejectedTasks / totalTasks;
statistics.setRejectionRate(rejectionRate);
}
}

/**
* 初始化统计数据
*/
private void initializeStatistics() {
ThreadPoolStatus status = threadPool.getStatus();

lastTaskCount.set(status.getTaskCount());
lastCompletedTaskCount.set(status.getCompletedTaskCount());
lastRejectedTaskCount.set(status.getRejectedTasks());
}

/**
* 停止监控
*/
public void stop() {
// 清理资源
log.debug("停止监控线程池: poolName={}", poolName);
}

// getter方法
}

// Thread pool statistics
public class ThreadPoolStatistics {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private int currentPoolSize;
private int activeThreadCount;
private int largestPoolSize;
private long taskCount;
private long completedTaskCount;
private int queueSize;
private int queueRemainingCapacity;
private long totalTasks;
private long completedTasks;
private long failedTasks;
private long rejectedTasks;
private double poolUtilization;
private double queueUtilization;
private double rejectionRate;
private double failureRate;
private double taskRate;
private double completedRate;
private double qps;
private double averageResponseTime;
private boolean isShutdown;
private boolean isTerminated;

// Constructors and getters/setters
}

// Thread pool health status
public class ThreadPoolHealthStatus {
private String poolName;
private HealthStatus status;
private String message;
private long checkTime;

public enum HealthStatus {
HEALTHY, // all checks passed
UNHEALTHY, // at least one check failed
UNKNOWN // the health check itself threw an exception
}

// Constructors and getters/setters
}
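
The monitor above derives its utilization figures from status snapshots. As a rough sketch of the same two ratios computed directly from a `java.util.concurrent.ThreadPoolExecutor`, the helper below may be useful for experimenting outside the framework. `PoolUtilization` is a hypothetical name and not one of the article's classes; the queue capacity is assumed to be the current size plus the remaining capacity, summed as `long` so that unbounded queues do not overflow.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the article's classes. */
final class PoolUtilization {

    private PoolUtilization() {
    }

    /** Active threads divided by the maximum pool size, clamped to [0, 1]. */
    static double poolUtilization(ThreadPoolExecutor executor) {
        int max = executor.getMaximumPoolSize();
        return max > 0 ? Math.min((double) executor.getActiveCount() / max, 1.0) : 0.0;
    }

    /** Queued tasks divided by the total queue capacity, clamped to [0, 1]. */
    static double queueUtilization(ThreadPoolExecutor executor) {
        long size = executor.getQueue().size();
        long remaining = executor.getQueue().remainingCapacity();
        long capacity = size + remaining; // long: unbounded queues report Integer.MAX_VALUE
        return capacity > 0 ? Math.min((double) size / capacity, 1.0) : 0.0;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            System.out.println("pool utilization:  " + poolUtilization(executor));
            System.out.println("queue utilization: " + queueUtilization(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Both values are point-in-time approximations: `getActiveCount()` and the queue size can change while they are being read, so they suit trend monitoring rather than exact accounting.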

2.4 Thread Pool Dynamic Adjuster

// Thread pool dynamic adjuster
@Service
@Slf4j
public class ThreadPoolDynamicAdjuster {

@Autowired
private ThreadPoolConfigManager configManager;

@Autowired
private ThreadPoolAlertService alertService;

private final Map<String, AdjustmentHistory> adjustmentHistories = new ConcurrentHashMap<>();

/**
* 调整线程池
*/
public boolean adjustThreadPool(ManagedThreadPool threadPool, ThreadPoolConfig newConfig) {
String poolName = threadPool.getPoolName();

try {
// 1. 获取当前配置
ThreadPoolConfig currentConfig = configManager.getConfig(poolName);
if (currentConfig == null) {
log.warn("当前配置不存在: poolName={}", poolName);
return false;
}

// 2. 检查是否需要调整
if (!needsAdjustment(currentConfig, newConfig)) {
log.debug("无需调整线程池: poolName={}", poolName);
return true;
}

// 3. 执行调整
boolean success = performAdjustment(threadPool, currentConfig, newConfig);

if (success) {
// 4. 记录调整历史
recordAdjustmentHistory(poolName, currentConfig, newConfig);

// 5. 发送调整通知
alertService.sendAdjustmentNotification(poolName, currentConfig, newConfig);

log.info("线程池调整成功: poolName={}", poolName);
}

return success;

} catch (Exception e) {
log.error("调整线程池失败: poolName={}", poolName, e);
return false;
}
}

/**
* 执行自动调优
*/
public void performAutoTuning(ManagedThreadPool threadPool) {
String poolName = threadPool.getPoolName();

try {
// 1. 获取当前统计信息
ThreadPoolStatistics statistics = threadPool.getStatistics();

// 2. 分析性能指标
TuningRecommendation recommendation = analyzePerformance(statistics);

if (recommendation != null && recommendation.isRecommended()) {
// 3. 获取当前配置
ThreadPoolConfig currentConfig = configManager.getConfig(poolName);

// 4. 生成新配置
ThreadPoolConfig newConfig = generateNewConfig(currentConfig, recommendation);

// 5. 执行调整
adjustThreadPool(threadPool, newConfig);

log.info("自动调优完成: poolName={}, recommendation={}", poolName, recommendation);
}

} catch (Exception e) {
log.error("自动调优失败: poolName={}", poolName, e);
}
}

/**
* 检查是否需要调整
*/
private boolean needsAdjustment(ThreadPoolConfig currentConfig, ThreadPoolConfig newConfig) {
return currentConfig.getCorePoolSize() != newConfig.getCorePoolSize() ||
currentConfig.getMaximumPoolSize() != newConfig.getMaximumPoolSize() ||
currentConfig.getKeepAliveTime() != newConfig.getKeepAliveTime() ||
currentConfig.getTimeUnit() != newConfig.getTimeUnit() ||
currentConfig.getQueueCapacity() != newConfig.getQueueCapacity() ||
currentConfig.getRejectedExecutionHandlerType() != newConfig.getRejectedExecutionHandlerType();
}

/**
* Applies the changed parameters to the live executor.
*/
private boolean performAdjustment(ManagedThreadPool threadPool, ThreadPoolConfig currentConfig,
ThreadPoolConfig newConfig) {
try {
ThreadPoolExecutor executor = threadPool.getExecutor();

// 1-2. Resize core and maximum in an order that never leaves core > max.
// setMaximumPoolSize rejects max < core, and since JDK 9 setCorePoolSize
// rejects core > max, so grow the maximum first and shrink the core first.
if (currentConfig.getCorePoolSize() != newConfig.getCorePoolSize() ||
currentConfig.getMaximumPoolSize() != newConfig.getMaximumPoolSize()) {
if (newConfig.getMaximumPoolSize() >= executor.getMaximumPoolSize()) {
executor.setMaximumPoolSize(newConfig.getMaximumPoolSize());
executor.setCorePoolSize(newConfig.getCorePoolSize());
} else {
executor.setCorePoolSize(newConfig.getCorePoolSize());
executor.setMaximumPoolSize(newConfig.getMaximumPoolSize());
}
log.debug("Adjusted pool size: core {} -> {}, max {} -> {}",
currentConfig.getCorePoolSize(), newConfig.getCorePoolSize(),
currentConfig.getMaximumPoolSize(), newConfig.getMaximumPoolSize());
}

// 3. Adjust the keep-alive time
if (currentConfig.getKeepAliveTime() != newConfig.getKeepAliveTime() ||
currentConfig.getTimeUnit() != newConfig.getTimeUnit()) {
executor.setKeepAliveTime(newConfig.getKeepAliveTime(), newConfig.getTimeUnit());
log.debug("Adjusted keep-alive time: {} {} -> {} {}",
currentConfig.getKeepAliveTime(), currentConfig.getTimeUnit(),
newConfig.getKeepAliveTime(), newConfig.getTimeUnit());
}

// 4. Adjust the rejection policy
if (currentConfig.getRejectedExecutionHandlerType() != newConfig.getRejectedExecutionHandlerType()) {
RejectedExecutionHandler newHandler = createRejectedExecutionHandler(newConfig);
executor.setRejectedExecutionHandler(newHandler);
log.debug("Adjusted rejection policy: {} -> {}",
currentConfig.getRejectedExecutionHandlerType(),
newConfig.getRejectedExecutionHandlerType());
}

// Note: needsAdjustment() also detects a changed queue capacity, but it is not
// applied here. LinkedBlockingQueue and ArrayBlockingQueue have a fixed capacity,
// so resizing the queue requires a resizable queue or a new executor.
return true;

} catch (Exception e) {
log.error("Failed to apply adjustment", e);
return false;
}
}

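/**
* Analyzes the collected statistics and derives tuning recommendations.
*/
private TuningRecommendation analyzePerformance(ThreadPoolStatistics statistics) {
TuningRecommendation recommendation = new TuningRecommendation();

// 1. Pool utilization above 80%: suggest a larger maximum pool size
if (statistics.getPoolUtilization() > 0.8) {
recommendation.setRecommended(true);
recommendation.addRecommendation("Increase maximum pool size");
recommendation.setRecommendedMaxPoolSize(statistics.getMaximumPoolSize() * 2);
}

// 2. Queue utilization above 80%: suggest a larger queue.
// ThreadPoolStatistics has no queueCapacity field, so derive the capacity from
// size + remaining capacity (as long, to avoid int overflow on unbounded queues).
if (statistics.getQueueUtilization() > 0.8) {
long queueCapacity = (long) statistics.getQueueSize() + statistics.getQueueRemainingCapacity();
recommendation.setRecommended(true);
recommendation.addRecommendation("Increase queue capacity");
recommendation.setRecommendedQueueCapacity((int) Math.min(queueCapacity * 2, Integer.MAX_VALUE));
}

// 3. Rejection rate above 10%: flag it, without a concrete numeric recommendation
if (statistics.getRejectionRate() > 0.1) {
recommendation.setRecommended(true);
recommendation.addRecommendation("Change the rejection policy or add threads");
}

// 4. Low QPS and low utilization: suggest fewer core threads (never below 1)
if (statistics.getQps() < 10 && statistics.getPoolUtilization() < 0.3) {
recommendation.setRecommended(true);
recommendation.addRecommendation("Reduce core pool size");
recommendation.setRecommendedCorePoolSize(Math.max(1, statistics.getCorePoolSize() / 2));
}

return recommendation;
}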

/**
* 生成新配置
*/
private ThreadPoolConfig generateNewConfig(ThreadPoolConfig currentConfig, TuningRecommendation recommendation) {
ThreadPoolConfig newConfig = new ThreadPoolConfig();
newConfig.setPoolName(currentConfig.getPoolName());
newConfig.setCorePoolSize(recommendation.getRecommendedCorePoolSize() != null ?
recommendation.getRecommendedCorePoolSize() : currentConfig.getCorePoolSize());
newConfig.setMaximumPoolSize(recommendation.getRecommendedMaxPoolSize() != null ?
recommendation.getRecommendedMaxPoolSize() : currentConfig.getMaximumPoolSize());
newConfig.setKeepAliveTime(currentConfig.getKeepAliveTime());
newConfig.setTimeUnit(currentConfig.getTimeUnit());
newConfig.setQueueType(currentConfig.getQueueType());
newConfig.setQueueCapacity(recommendation.getRecommendedQueueCapacity() != null ?
recommendation.getRecommendedQueueCapacity() : currentConfig.getQueueCapacity());
newConfig.setRejectedExecutionHandlerType(currentConfig.getRejectedExecutionHandlerType());
newConfig.setCustomRejectedExecutionHandler(currentConfig.getCustomRejectedExecutionHandler());
newConfig.setAllowCoreThreadTimeOut(currentConfig.isAllowCoreThreadTimeOut());
newConfig.setDescription("自动调优生成");

return newConfig;
}

/**
* 创建拒绝策略
*/
private RejectedExecutionHandler createRejectedExecutionHandler(ThreadPoolConfig config) {
switch (config.getRejectedExecutionHandlerType()) {
case ABORT_POLICY:
return new ThreadPoolExecutor.AbortPolicy();
case CALLER_RUNS_POLICY:
return new ThreadPoolExecutor.CallerRunsPolicy();
case DISCARD_POLICY:
return new ThreadPoolExecutor.DiscardPolicy();
case DISCARD_OLDEST_POLICY:
return new ThreadPoolExecutor.DiscardOldestPolicy();
default:
return new ThreadPoolExecutor.CallerRunsPolicy();
}
}

/**
* 记录调整历史
*/
private void recordAdjustmentHistory(String poolName, ThreadPoolConfig oldConfig, ThreadPoolConfig newConfig) {
AdjustmentHistory history = adjustmentHistories.computeIfAbsent(poolName, k -> new AdjustmentHistory());

AdjustmentRecord record = new AdjustmentRecord();
record.setPoolName(poolName);
record.setOldConfig(oldConfig);
record.setNewConfig(newConfig);
record.setAdjustTime(System.currentTimeMillis());
record.setReason("动态调整");

history.addRecord(record);

// 只保留最近100条记录
if (history.getRecords().size() > 100) {
history.getRecords().remove(0);
}
}

/**
* 获取调整历史
*/
public List<AdjustmentRecord> getAdjustmentHistory(String poolName) {
AdjustmentHistory history = adjustmentHistories.get(poolName);
return history != null ? new ArrayList<>(history.getRecords()) : new ArrayList<>();
}
}

// 调优建议
public class TuningRecommendation {
private boolean recommended;
private List<String> recommendations;
private Integer recommendedCorePoolSize;
private Integer recommendedMaxPoolSize;
private Integer recommendedQueueCapacity;

public TuningRecommendation() {
this.recommended = false;
this.recommendations = new ArrayList<>();
}

public void addRecommendation(String recommendation) {
this.recommendations.add(recommendation);
}

// 构造函数和getter/setter方法
}

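// Adjustment history
public class AdjustmentHistory {
// Adjustments can be recorded and read from different threads, so use a
// thread-safe list (java.util.concurrent.CopyOnWriteArrayList) instead of ArrayList.
private final List<AdjustmentRecord> records = new CopyOnWriteArrayList<>();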

public void addRecord(AdjustmentRecord record) {
records.add(record);
}

// getter方法
}

// Adjustment record
public class AdjustmentRecord {
private String poolName;
private ThreadPoolConfig oldConfig;
private ThreadPoolConfig newConfig;
private long adjustTime;
private String reason;

// Constructors and getters/setters
}
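
`ThreadPoolExecutor` enforces `core <= max` in its setters, so the order of the two calls matters whenever both values change at once. The following is a minimal sketch of an order-safe resize, assuming a plain JDK executor; `SafeResize` and its `resize` method are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the article's classes. */
final class SafeResize {

    private SafeResize() {
    }

    /**
     * Applies new core and maximum sizes without ever passing through a state
     * where core > max: grow the maximum first, shrink the core first.
     */
    static void resize(ThreadPoolExecutor executor, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("require 0 <= core <= max and max > 0");
        }
        if (newMax >= executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(newMax);  // widen the upper bound first
            executor.setCorePoolSize(newCore);
        } else {
            executor.setCorePoolSize(newCore);    // lower the core first
            executor.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        try {
            resize(executor, 8, 16); // grow both values
            resize(executor, 1, 2);  // shrink both values
            System.out.println(executor.getCorePoolSize() + "/" + executor.getMaximumPoolSize());
        } finally {
            executor.shutdown();
        }
    }
}
```

Calling the setters in a fixed order works on some JDK versions and fails on others, which is why the sketch branches on the direction of the change rather than on which value differs.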

3. Thread Pool Manager Controller

3.1 Thread Pool Manager Controller

// Thread pool manager controller
@RestController
@RequestMapping("/api/thread-pool")
@Slf4j
public class ThreadPoolManagerController {

@Autowired
private ThreadPoolManager threadPoolManager;

@Autowired
private ThreadPoolConfigManager configManager;

@Autowired
private ThreadPoolMonitorService monitorService;

@Autowired
private ThreadPoolDynamicAdjuster dynamicAdjuster;

/**
* 创建线程池
*/
@PostMapping("/create")
public ResponseEntity<ManagedThreadPool> createThreadPool(@RequestBody ThreadPoolConfig config) {
try {
// 1. 验证配置
validateThreadPoolConfig(config);

// 2. 创建线程池
ManagedThreadPool threadPool = threadPoolManager.createThreadPool(config.getPoolName(), config);

return ResponseEntity.ok(threadPool);

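} catch (IllegalArgumentException e) {
// validateThreadPoolConfig rejects bad input; report it as a client error
log.warn("Invalid thread pool config: {}", e.getMessage());
return ResponseEntity.badRequest().build();
} catch (Exception e) {
log.error("Failed to create thread pool", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}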
}

/**
* 获取线程池
*/
@GetMapping("/{poolName}")
public ResponseEntity<ManagedThreadPool> getThreadPool(@PathVariable String poolName) {
try {
ManagedThreadPool threadPool = threadPoolManager.getThreadPool(poolName);

if (threadPool != null) {
return ResponseEntity.ok(threadPool);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取线程池失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 调整线程池配置
*/
@PutMapping("/{poolName}/adjust")
public ResponseEntity<Void> adjustThreadPoolConfig(@PathVariable String poolName,
@RequestBody ThreadPoolConfig newConfig) {
try {
// 1. 设置线程池名称
newConfig.setPoolName(poolName);

// 2. 验证配置
validateThreadPoolConfig(newConfig);

// 3. 调整配置
boolean success = threadPoolManager.adjustThreadPoolConfig(poolName, newConfig);

if (success) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.notFound().build();
}

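} catch (IllegalArgumentException e) {
// validateThreadPoolConfig rejects bad input; report it as a client error
log.warn("Invalid thread pool config: poolName={}, reason={}", poolName, e.getMessage());
return ResponseEntity.badRequest().build();
} catch (Exception e) {
log.error("Failed to adjust thread pool config: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}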
}

/**
* 销毁线程池
*/
@DeleteMapping("/{poolName}")
public ResponseEntity<Void> destroyThreadPool(@PathVariable String poolName) {
try {
boolean success = threadPoolManager.destroyThreadPool(poolName);

if (success) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("销毁线程池失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取所有线程池状态
*/
@GetMapping("/status")
public ResponseEntity<Map<String, ThreadPoolStatus>> getAllThreadPoolStatus() {
try {
Map<String, ThreadPoolStatus> statusMap = threadPoolManager.getAllThreadPoolStatus();
return ResponseEntity.ok(statusMap);
} catch (Exception e) {
log.error("获取所有线程池状态失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取线程池状态
*/
@GetMapping("/{poolName}/status")
public ResponseEntity<ThreadPoolStatus> getThreadPoolStatus(@PathVariable String poolName) {
try {
ManagedThreadPool threadPool = threadPoolManager.getThreadPool(poolName);

if (threadPool != null) {
ThreadPoolStatus status = threadPool.getStatus();
return ResponseEntity.ok(status);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取线程池状态失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取所有线程池统计信息
*/
@GetMapping("/statistics")
public ResponseEntity<Map<String, ThreadPoolStatistics>> getAllThreadPoolStatistics() {
try {
Map<String, ThreadPoolStatistics> statisticsMap = threadPoolManager.getAllThreadPoolStatistics();
return ResponseEntity.ok(statisticsMap);
} catch (Exception e) {
log.error("获取所有线程池统计信息失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取线程池统计信息
*/
@GetMapping("/{poolName}/statistics")
public ResponseEntity<ThreadPoolStatistics> getThreadPoolStatistics(@PathVariable String poolName) {
try {
ThreadPoolStatistics statistics = monitorService.getStatistics(poolName);

if (statistics != null) {
return ResponseEntity.ok(statistics);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取线程池统计信息失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取线程池健康状态
*/
@GetMapping("/{poolName}/health")
public ResponseEntity<ThreadPoolHealthStatus> getThreadPoolHealth(@PathVariable String poolName) {
try {
ThreadPoolHealthStatus healthStatus = monitorService.getHealthStatus(poolName);

if (healthStatus != null) {
return ResponseEntity.ok(healthStatus);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取线程池健康状态失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* Executes a fire-and-forget task on the named pool.
*/
@PostMapping("/{poolName}/execute")
public ResponseEntity<Void> executeTask(@PathVariable String poolName,
@RequestBody TaskRequest request) {
try {
// The returned future is deliberately not placed in the response body:
// a CompletableFuture is not a meaningful JSON payload.
threadPoolManager.executeTask(poolName, () -> {
// Task logic goes here
log.info("Executing task: poolName={}, taskId={}", poolName, request.getTaskId());
});

// 202 Accepted: the task was handed to the pool but may not have run yet
return ResponseEntity.accepted().build();

} catch (Exception e) {
log.error("Failed to execute task: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

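/**
* Submits a task and responds with its result once the task completes.
*/
@PostMapping("/{poolName}/submit")
public CompletableFuture<ResponseEntity<String>> submitTask(@PathVariable String poolName,
@RequestBody TaskRequest request) {
try {
CompletableFuture<String> future = threadPoolManager.submitTask(poolName, () -> {
// Task logic goes here
log.info("Task submitted: poolName={}, taskId={}", poolName, request.getTaskId());
return "Task completed: " + request.getTaskId();
});

// Returning the future as the handler's return value lets Spring MVC complete
// the response asynchronously; placing it inside a ResponseEntity body would
// hand the future object itself to the message converter.
return future.thenApply(result -> ResponseEntity.ok(result));

} catch (Exception e) {
log.error("Failed to submit task: poolName={}", poolName, e);
return CompletableFuture.completedFuture(
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
}
}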

/**
* 获取调整历史
*/
@GetMapping("/{poolName}/adjustment-history")
public ResponseEntity<List<AdjustmentRecord>> getAdjustmentHistory(@PathVariable String poolName) {
try {
List<AdjustmentRecord> history = dynamicAdjuster.getAdjustmentHistory(poolName);
return ResponseEntity.ok(history);
} catch (Exception e) {
log.error("获取调整历史失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 保存线程池配置
*/
@PostMapping("/config")
public ResponseEntity<ThreadPoolConfig> saveConfig(@RequestBody ThreadPoolConfig config) {
try {
ThreadPoolConfig savedConfig = configManager.saveConfig(config);
return ResponseEntity.ok(savedConfig);
} catch (Exception e) {
log.error("保存线程池配置失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取线程池配置
*/
@GetMapping("/config/{poolName}")
public ResponseEntity<ThreadPoolConfig> getConfig(@PathVariable String poolName) {
try {
ThreadPoolConfig config = configManager.getConfig(poolName);

if (config != null) {
return ResponseEntity.ok(config);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取线程池配置失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取所有线程池配置
*/
@GetMapping("/config")
public ResponseEntity<List<ThreadPoolConfig>> getAllConfigs() {
try {
List<ThreadPoolConfig> configs = configManager.loadAllConfigs();
return ResponseEntity.ok(configs);
} catch (Exception e) {
log.error("获取所有线程池配置失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 删除线程池配置
*/
@DeleteMapping("/config/{poolName}")
public ResponseEntity<Void> deleteConfig(@PathVariable String poolName) {
try {
boolean success = configManager.deleteConfig(poolName);

if (success) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("删除线程池配置失败: poolName={}", poolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 复制线程池配置
*/
@PostMapping("/config/{sourcePoolName}/copy")
public ResponseEntity<ThreadPoolConfig> copyConfig(@PathVariable String sourcePoolName,
@RequestParam String targetPoolName) {
try {
ThreadPoolConfig copiedConfig = configManager.copyConfig(sourcePoolName, targetPoolName);
return ResponseEntity.ok(copiedConfig);
} catch (Exception e) {
log.error("复制线程池配置失败: sourcePoolName={}, targetPoolName={}",
sourcePoolName, targetPoolName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 验证线程池配置
*/
private void validateThreadPoolConfig(ThreadPoolConfig config) {
if (config == null) {
throw new IllegalArgumentException("线程池配置不能为空");
}

if (config.getPoolName() == null || config.getPoolName().trim().isEmpty()) {
throw new IllegalArgumentException("线程池名称不能为空");
}

if (config.getCorePoolSize() < 0) {
throw new IllegalArgumentException("核心线程数不能小于0");
}

if (config.getMaximumPoolSize() <= 0) {
throw new IllegalArgumentException("最大线程数必须大于0");
}

if (config.getCorePoolSize() > config.getMaximumPoolSize()) {
throw new IllegalArgumentException("核心线程数不能大于最大线程数");
}

if (config.getKeepAliveTime() < 0) {
throw new IllegalArgumentException("线程存活时间不能小于0");
}

if (config.getQueueCapacity() <= 0) {
throw new IllegalArgumentException("队列容量必须大于0");
}
}
}

// Task request
public class TaskRequest {
private String taskId;
private String taskName;
private Map<String, Object> parameters;

// Constructors and getters/setters
}
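
The controller's `validateThreadPoolConfig` stops at the first violation it finds. One possible alternative, sketched below under the assumption that callers would rather see every problem at once, collects all violations into a list. `PoolConfigValidator` and its parameter list are hypothetical; the rules themselves mirror the checks in the controller.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical validator for illustration; mirrors the controller's rules. */
final class PoolConfigValidator {

    private PoolConfigValidator() {
    }

    /** Returns every rule violation; an empty list means the values are acceptable. */
    static List<String> validate(String poolName, int corePoolSize, int maximumPoolSize,
                                 long keepAliveTime, int queueCapacity) {
        List<String> errors = new ArrayList<>();
        if (poolName == null || poolName.trim().isEmpty()) {
            errors.add("poolName must not be blank");
        }
        if (corePoolSize < 0) {
            errors.add("corePoolSize must be >= 0");
        }
        if (maximumPoolSize <= 0) {
            errors.add("maximumPoolSize must be > 0");
        }
        if (corePoolSize > maximumPoolSize) {
            errors.add("corePoolSize must not exceed maximumPoolSize");
        }
        if (keepAliveTime < 0) {
            errors.add("keepAliveTime must be >= 0");
        }
        if (queueCapacity <= 0) {
            errors.add("queueCapacity must be > 0");
        }
        return errors;
    }

    public static void main(String[] args) {
        // Prints one line per violated rule
        validate("", 5, 2, -1L, 0).forEach(System.out::println);
    }
}
```

A controller could map a non-empty list to a 400 response whose body carries the messages, which tends to be friendlier to API clients than a single exception message.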

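4. Best Practices and Summary

4.1 Best Practices for the SpringBoot Thread Pool Manager

4.1.1 Configuration Management Strategy

  • Dynamic configuration: adjust thread pool parameters at runtime
  • Configuration validation: verify that parameters are valid before applying them
  • Configuration caching: cache configurations in Redis to speed up lookups
  • Configuration versioning: keep versioned configurations and support rollback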
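
Versioning and rollback can be prototyped in memory before involving Redis or a configuration center. The sketch below keeps a bounded history per pool name and lets the caller step back to the previous version. `VersionedConfigStore` is a hypothetical class, generic in the configuration type so that it does not assume anything about `ThreadPoolConfig`; configurations are assumed to be non-null.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store for illustration; newest version sits at the head. */
final class VersionedConfigStore<T> {

    private final Map<String, Deque<T>> versions = new ConcurrentHashMap<>();
    private final int maxVersions;

    VersionedConfigStore(int maxVersions) {
        if (maxVersions < 1) {
            throw new IllegalArgumentException("maxVersions must be >= 1");
        }
        this.maxVersions = maxVersions;
    }

    /** Stores a new current version and evicts the oldest one beyond the limit. */
    void save(String poolName, T config) {
        Deque<T> history = versions.computeIfAbsent(poolName, k -> new ArrayDeque<>());
        synchronized (history) {
            history.push(config);
            if (history.size() > maxVersions) {
                history.removeLast();
            }
        }
    }

    /** Returns the current version, if one was saved. */
    Optional<T> current(String poolName) {
        Deque<T> history = versions.get(poolName);
        if (history == null) {
            return Optional.empty();
        }
        synchronized (history) {
            return Optional.ofNullable(history.peek());
        }
    }

    /** Discards the current version and returns the previous one, if it still exists. */
    Optional<T> rollback(String poolName) {
        Deque<T> history = versions.get(poolName);
        if (history == null) {
            return Optional.empty();
        }
        synchronized (history) {
            if (history.size() < 2) {
                return Optional.empty(); // nothing older to roll back to
            }
            history.pop();
            return Optional.of(history.peek());
        }
    }
}
```

A rollback in this sketch only changes what the store reports as current; applying the restored configuration to the running executor remains a separate step.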

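4.1.2 Monitoring and Alerting Strategy

  • Real-time monitoring: track thread pool performance metrics in real time
  • Health checks: check thread pool health periodically
  • Exception alerting: raise timely alerts when a thread pool misbehaves
  • Performance analysis: analyze trends in thread pool performance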
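
Rejection-rate monitoring needs a rejection counter, and `ThreadPoolExecutor` does not expose one. A common approach, sketched here with a hypothetical `CountingRejectionHandler`, is to wrap the real policy in a handler that counts each rejection before delegating.

```java
import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical wrapper for illustration: counts rejections, then delegates. */
final class CountingRejectionHandler implements RejectedExecutionHandler {

    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejectedCount = new AtomicLong();

    CountingRejectionHandler(RejectedExecutionHandler delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        rejectedCount.incrementAndGet();            // record the rejection first
        delegate.rejectedExecution(task, executor); // then apply the real policy
    }

    /** Total number of tasks rejected since this handler was created. */
    long getRejectedCount() {
        return rejectedCount.get();
    }
}
```

The wrapper would typically be installed with `new CountingRejectionHandler(new ThreadPoolExecutor.AbortPolicy())` at construction time, and a monitor could then divide `getRejectedCount()` by the number of submitted tasks to obtain a rejection rate.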

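4.1.3 Dynamic Adjustment Strategy

  • Automatic tuning: adjust parameters automatically based on performance metrics
  • Manual adjustment: allow operators to adjust thread pool parameters by hand
  • Adjustment history: record each adjustment and its reason
  • Rollback: support rolling a configuration back to an earlier version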
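
Automatic tuning that doubles a value on every pass can grow without limit under sustained load, so it usually helps to clamp recommendations. The sketch below reuses the thresholds from the article's `analyzePerformance` (utilization above 0.8 to grow; QPS below 10 with utilization below 0.3 to shrink) and adds an upper bound. `BoundedTuning` and the `ceiling` parameter are assumptions introduced for illustration.

```java
/** Hypothetical tuning helper for illustration; thresholds follow analyzePerformance. */
final class BoundedTuning {

    private BoundedTuning() {
    }

    /** Doubles the maximum pool size when utilization exceeds 80%, capped at ceiling. */
    static int recommendMaxPoolSize(int currentMax, double poolUtilization, int ceiling) {
        if (currentMax <= 0 || ceiling < currentMax) {
            throw new IllegalArgumentException("require 0 < currentMax <= ceiling");
        }
        if (poolUtilization > 0.8) {
            long doubled = (long) currentMax * 2; // long avoids int overflow
            return (int) Math.min(doubled, ceiling);
        }
        return currentMax;
    }

    /** Halves the core pool size when the pool is mostly idle, never going below 1. */
    static int recommendCorePoolSize(int currentCore, double poolUtilization, double qps) {
        if (qps < 10 && poolUtilization < 0.3) {
            return Math.max(1, currentCore / 2);
        }
        return currentCore;
    }

    public static void main(String[] args) {
        // The ceiling is a caller-supplied bound, for example derived from the CPU count
        int ceiling = Math.max(8, Runtime.getRuntime().availableProcessors() * 4);
        System.out.println(recommendMaxPoolSize(8, 0.9, ceiling));
        System.out.println(recommendCorePoolSize(4, 0.1, 5.0));
    }
}
```

A production tuner would likely also add a cooldown between adjustments so that one short spike does not trigger several consecutive resizes.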

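4.1.4 Resource Control Strategy

  • Resource isolation: give each business domain its own thread pool
  • Load balancing: route tasks to a suitable thread pool
  • Circuit breaking and degradation: protect the system automatically when a pool is overloaded
  • Elastic scaling: resize thread pools automatically according to load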
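
Resource isolation can be illustrated with a small registry that lazily creates one bounded pool per business key, so that a backlog in one domain cannot consume another domain's threads. `IsolatedPools` is a hypothetical class, and the pool sizes (2 core, 4 max, queue of 100) are placeholder values rather than recommendations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry for illustration: one bounded pool per business key. */
final class IsolatedPools {

    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    /** Returns the pool for the given business key, creating it on first use. */
    ExecutorService poolFor(String business) {
        return pools.computeIfAbsent(business, IsolatedPools::newPool);
    }

    private static ThreadPoolExecutor newPool(String business) {
        AtomicInteger sequence = new AtomicInteger();
        // Name threads after the business key so thread dumps show who owns them
        ThreadFactory factory =
                task -> new Thread(task, business + "-worker-" + sequence.incrementAndGet());
        return new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100), // bounded queue: overload surfaces as rejection
                factory,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Initiates an orderly shutdown of every pool in the registry. */
    void shutdownAll() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

With this layout, `poolFor("orders")` and `poolFor("payments")` return different executors, so saturating one leaves the other's threads and queue untouched.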

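4.2 Architecture Evolution Recommendations

4.2.1 Evolving Toward Microservices

  • Service decomposition: split thread pool management out into an independent microservice
  • Service governance: implement service registration, discovery, and load balancing
  • Configuration center: manage thread pool configurations centrally in a configuration center
  • Fault tolerance: implement circuit breaking, degradation, and retry mechanisms

4.2.2 Evolving Toward Cloud-Native

  • Containerized deployment: deploy with container technologies such as Docker
  • Elastic scaling: scale out and in automatically based on load
  • Service mesh: adopt a service mesh such as Istio
  • Cloud-native monitoring: integrate with cloud-native monitoring and alerting

4.2.3 Intelligent Management

  • AI-driven optimization: use machine learning to optimize thread pool parameters
  • Predictive adjustment: forecast load changes and adjust ahead of time
  • Intelligent alerting: use AI-based alerting and fault diagnosis
  • Adaptive management: adapt management strategies to business characteristics

4.3 Summary

The SpringBoot thread pool manager is an important tool for tuning the performance of enterprise applications. By combining dynamic configuration, monitoring, and automatic tuning, it offers a stable and efficient way to manage thread pools. As microservice architectures and cloud-native technology spread, thread pool management is likely to become increasingly intelligent and automated.

Looking ahead, teams should keep tracking these trends and keep refining their thread pool management strategies as business requirements and the technical environment change. We hope the analysis and hands-on guidance in this article offer a useful reference for building a high-quality thread pool management solution and for keeping enterprise applications stable under concurrent workloads.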