1. ExecutorService概述

ExecutorService是Java并发包中用于管理线程池的核心接口,提供了异步执行任务的能力。本文将详细介绍ExecutorService的使用方法,包括线程池配置、任务调度、异步处理、性能优化的完整解决方案。

1.1 核心功能

  1. 线程池管理: 创建和管理线程池
  2. 任务调度: 提交和执行异步任务
  3. 生命周期管理: 启动、关闭线程池
  4. 结果获取: 获取任务执行结果
  5. 异常处理: 处理任务执行异常

1.2 技术架构

1
2
3
任务提交 → ExecutorService → 线程池 → 任务执行 → 结果返回
↓ ↓ ↓ ↓ ↓
Future/Callable → 队列管理 → 线程调度 → 异常处理 → 回调通知

2. Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.executor</groupId>
<artifactId>executor-service-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
</properties>

<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Commons工具 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!-- Micrometer(监控指标) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

3. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# application.yml
server:
port: 8080

spring:
application:
name: executor-service-demo

# 线程池配置
executor:
# 核心线程池配置
core-pool:
core-size: 5
max-size: 20
keep-alive-time: 60
queue-capacity: 100
thread-name-prefix: "core-pool-"

# IO密集型线程池配置
io-pool:
core-size: 10
max-size: 50
keep-alive-time: 30
queue-capacity: 200
thread-name-prefix: "io-pool-"

# CPU密集型线程池配置
cpu-pool:
core-size: 4
max-size: 8
keep-alive-time: 120
queue-capacity: 50
thread-name-prefix: "cpu-pool-"

# 定时任务线程池配置
scheduled-pool:
core-size: 3
thread-name-prefix: "scheduled-pool-"

# 监控配置
management:
endpoints:
web:
exposure:
include: "*"
metrics:
export:
prometheus:
enabled: true

4. 线程池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.executor.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池配置类
* @author Java实战
*/
@Data
@Configuration
@EnableAsync
@ConfigurationProperties(prefix = "executor")
public class ExecutorConfig {

/**
* 核心线程池配置
*/
private PoolConfig corePool;

/**
* IO密集型线程池配置
*/
private PoolConfig ioPool;

/**
* CPU密集型线程池配置
*/
private PoolConfig cpuPool;

/**
* 定时任务线程池配置
*/
private ScheduledPoolConfig scheduledPool;

/**
* 核心线程池
*/
@Bean("coreExecutor")
public Executor coreExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePool.getCoreSize());
executor.setMaxPoolSize(corePool.getMaxSize());
executor.setQueueCapacity(corePool.getQueueCapacity());
executor.setKeepAliveSeconds(corePool.getKeepAliveTime());
executor.setThreadNamePrefix(corePool.getThreadNamePrefix());

// 拒绝策略:调用者运行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待任务完成后关闭
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

/**
* IO密集型线程池
*/
@Bean("ioExecutor")
public Executor ioExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(ioPool.getCoreSize());
executor.setMaxPoolSize(ioPool.getMaxSize());
executor.setQueueCapacity(ioPool.getQueueCapacity());
executor.setKeepAliveSeconds(ioPool.getKeepAliveTime());
executor.setThreadNamePrefix(ioPool.getThreadNamePrefix());

// 拒绝策略:丢弃最老的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

/**
* CPU密集型线程池
*/
@Bean("cpuExecutor")
public Executor cpuExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuPool.getCoreSize());
executor.setMaxPoolSize(cpuPool.getMaxSize());
executor.setQueueCapacity(cpuPool.getQueueCapacity());
executor.setKeepAliveSeconds(cpuPool.getKeepAliveTime());
executor.setThreadNamePrefix(cpuPool.getThreadNamePrefix());

// 拒绝策略:丢弃任务并抛出异常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

/**
* 定时任务线程池
*/
@Bean("scheduledExecutor")
public Executor scheduledExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(scheduledPool.getCoreSize());
executor.setThreadNamePrefix(scheduledPool.getThreadNamePrefix());

// 拒绝策略:调用者运行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

@Data
public static class PoolConfig {
private int coreSize;
private int maxSize;
private int keepAliveTime;
private int queueCapacity;
private String threadNamePrefix;
}

@Data
public static class ScheduledPoolConfig {
private int coreSize;
private String threadNamePrefix;
}
}

5. 异步任务服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package com.executor.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;

/**
* 异步任务服务
* @author Java实战
*/
@Slf4j
@Service
public class AsyncTaskService {

@Autowired
@Qualifier("coreExecutor")
private Executor coreExecutor;

@Autowired
@Qualifier("ioExecutor")
private Executor ioExecutor;

@Autowired
@Qualifier("cpuExecutor")
private Executor cpuExecutor;

/**
* 异步执行简单任务
*/
@Async("coreExecutor")
public CompletableFuture<String> executeSimpleTask(String taskName) {
try {
log.info("开始执行简单任务: {}", taskName);

// 模拟任务执行
Thread.sleep(1000);

String result = "任务完成: " + taskName;
log.info("简单任务执行完成: {}", result);

return CompletableFuture.completedFuture(result);

} catch (InterruptedException e) {
log.error("简单任务执行被中断: {}", taskName, e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
} catch (Exception e) {
log.error("简单任务执行失败: {}", taskName, e);
return CompletableFuture.failedFuture(e);
}
}

/**
* 异步执行IO密集型任务
*/
@Async("ioExecutor")
public CompletableFuture<String> executeIOTask(String taskName) {
try {
log.info("开始执行IO任务: {}", taskName);

// 模拟IO操作
Thread.sleep(2000);

String result = "IO任务完成: " + taskName;
log.info("IO任务执行完成: {}", result);

return CompletableFuture.completedFuture(result);

} catch (InterruptedException e) {
log.error("IO任务执行被中断: {}", taskName, e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
} catch (Exception e) {
log.error("IO任务执行失败: {}", taskName, e);
return CompletableFuture.failedFuture(e);
}
}

/**
* 异步执行CPU密集型任务
*/
@Async("cpuExecutor")
public CompletableFuture<Integer> executeCPUTask(String taskName) {
try {
log.info("开始执行CPU任务: {}", taskName);

// 模拟CPU密集型计算
int result = 0;
for (int i = 0; i < 1000000; i++) {
result += ThreadLocalRandom.current().nextInt(100);
}

log.info("CPU任务执行完成: {}, 结果: {}", taskName, result);
return CompletableFuture.completedFuture(result);

} catch (Exception e) {
log.error("CPU任务执行失败: {}", taskName, e);
return CompletableFuture.failedFuture(e);
}
}

/**
* 异步执行带参数的任务
*/
@Async("coreExecutor")
public CompletableFuture<String> executeTaskWithParams(String taskName, int delay, String data) {
try {
log.info("开始执行带参数任务: {}, 延迟: {}ms, 数据: {}", taskName, delay, data);

// 模拟任务执行
Thread.sleep(delay);

String result = String.format("任务完成: %s, 处理数据: %s", taskName, data);
log.info("带参数任务执行完成: {}", result);

return CompletableFuture.completedFuture(result);

} catch (InterruptedException e) {
log.error("带参数任务执行被中断: {}", taskName, e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
} catch (Exception e) {
log.error("带参数任务执行失败: {}", taskName, e);
return CompletableFuture.failedFuture(e);
}
}

/**
* 异步执行可能失败的任务
*/
@Async("coreExecutor")
public CompletableFuture<String> executeTaskWithException(String taskName, boolean shouldFail) {
try {
log.info("开始执行可能失败的任务: {}", taskName);

// 模拟任务执行
Thread.sleep(1000);

if (shouldFail) {
throw new RuntimeException("模拟任务执行失败");
}

String result = "任务成功完成: " + taskName;
log.info("可能失败的任务执行完成: {}", result);

return CompletableFuture.completedFuture(result);

} catch (Exception e) {
log.error("可能失败的任务执行失败: {}", taskName, e);
return CompletableFuture.failedFuture(e);
}
}
}

6. 任务管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package com.executor.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
* 任务管理服务
* @author Java实战
*/
@Slf4j
@Service
public class TaskManagementService {

@Autowired
@Qualifier("coreExecutor")
private Executor coreExecutor;

/**
* 批量执行任务
*/
public List<String> executeBatchTasks(List<String> taskNames) {
log.info("开始批量执行任务,任务数量: {}", taskNames.size());

List<CompletableFuture<String>> futures = taskNames.stream()
.map(taskName -> CompletableFuture.supplyAsync(() -> {
try {
log.info("执行任务: {}", taskName);
Thread.sleep(1000); // 模拟任务执行
return "任务完成: " + taskName;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断: " + taskName, e);
}
}, coreExecutor))
.collect(Collectors.toList());

// 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

try {
allTasks.get(30, TimeUnit.SECONDS); // 30秒超时

List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

log.info("批量任务执行完成,成功数量: {}", results.size());
return results;

} catch (TimeoutException e) {
log.error("批量任务执行超时", e);
throw new RuntimeException("批量任务执行超时", e);
} catch (Exception e) {
log.error("批量任务执行失败", e);
throw new RuntimeException("批量任务执行失败", e);
}
}

/**
* 执行任务并获取结果
*/
public String executeTaskWithResult(String taskName) {
log.info("开始执行任务并获取结果: {}", taskName);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
log.info("执行任务: {}", taskName);
Thread.sleep(2000); // 模拟任务执行
return "任务结果: " + taskName;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断: " + taskName, e);
}
}, coreExecutor);

try {
String result = future.get(10, TimeUnit.SECONDS);
log.info("任务执行完成,结果: {}", result);
return result;

} catch (TimeoutException e) {
log.error("任务执行超时: {}", taskName, e);
throw new RuntimeException("任务执行超时: " + taskName, e);
} catch (Exception e) {
log.error("任务执行失败: {}", taskName, e);
throw new RuntimeException("任务执行失败: " + taskName, e);
}
}

/**
* 执行任务并处理异常
*/
public String executeTaskWithExceptionHandling(String taskName, boolean shouldFail) {
log.info("开始执行任务并处理异常: {}", taskName);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
log.info("执行任务: {}", taskName);
Thread.sleep(1000);

if (shouldFail) {
throw new RuntimeException("模拟任务失败");
}

return "任务成功: " + taskName;

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断: " + taskName, e);
}
}, coreExecutor);

// 处理异常
CompletableFuture<String> handledFuture = future.handle((result, throwable) -> {
if (throwable != null) {
log.error("任务执行异常: {}", taskName, throwable);
return "任务失败: " + taskName + ", 原因: " + throwable.getMessage();
}
return result;
});

try {
String result = handledFuture.get(10, TimeUnit.SECONDS);
log.info("任务处理完成: {}", result);
return result;

} catch (Exception e) {
log.error("任务处理失败: {}", taskName, e);
throw new RuntimeException("任务处理失败: " + taskName, e);
}
}

/**
* 执行任务链
*/
public String executeTaskChain(String initialTask) {
log.info("开始执行任务链: {}", initialTask);

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
log.info("执行第一个任务: {}", initialTask);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "第一步完成: " + initialTask;
}, coreExecutor)
.thenApplyAsync(result -> {
log.info("执行第二个任务,基于: {}", result);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return result + " -> 第二步完成";
}, coreExecutor)
.thenApplyAsync(result -> {
log.info("执行第三个任务,基于: {}", result);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return result + " -> 第三步完成";
}, coreExecutor);

try {
String result = future.get(30, TimeUnit.SECONDS);
log.info("任务链执行完成: {}", result);
return result;

} catch (Exception e) {
log.error("任务链执行失败: {}", initialTask, e);
throw new RuntimeException("任务链执行失败: " + initialTask, e);
}
}

/**
* 执行并行任务
*/
public List<String> executeParallelTasks(List<String> taskNames) {
log.info("开始执行并行任务,任务数量: {}", taskNames.size());

List<CompletableFuture<String>> futures = taskNames.stream()
.map(taskName -> CompletableFuture.supplyAsync(() -> {
try {
log.info("并行执行任务: {}", taskName);
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 3000));
return "并行任务完成: " + taskName;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断: " + taskName, e);
}
}, coreExecutor))
.collect(Collectors.toList());

// 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

try {
allTasks.get(60, TimeUnit.SECONDS);

List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

log.info("并行任务执行完成,成功数量: {}", results.size());
return results;

} catch (Exception e) {
log.error("并行任务执行失败", e);
throw new RuntimeException("并行任务执行失败", e);
}
}
}

7. 定时任务服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.executor.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* 定时任务服务
* @author Java实战
*/
@Slf4j
@Service
public class ScheduledTaskService {

@Autowired
@Qualifier("scheduledExecutor")
private Executor scheduledExecutor;

/**
* 定时执行任务(每5秒)
*/
@Scheduled(fixedRate = 5000)
public void scheduledTask() {
log.info("开始执行定时任务,时间: {}", LocalDateTime.now());

CompletableFuture.runAsync(() -> {
try {
log.info("定时任务执行中...");
Thread.sleep(2000); // 模拟任务执行
log.info("定时任务执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("定时任务被中断", e);
}
}, scheduledExecutor);
}

/**
* 延迟执行任务(启动后10秒开始,每30秒执行一次)
*/
@Scheduled(initialDelay = 10000, fixedDelay = 30000)
public void delayedScheduledTask() {
log.info("开始执行延迟定时任务,时间: {}", LocalDateTime.now());

CompletableFuture.runAsync(() -> {
try {
log.info("延迟定时任务执行中...");
Thread.sleep(3000); // 模拟任务执行
log.info("延迟定时任务执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("延迟定时任务被中断", e);
}
}, scheduledExecutor);
}

/**
* Cron表达式定时任务(每天上午10点执行)
*/
@Scheduled(cron = "0 0 10 * * ?")
public void cronScheduledTask() {
log.info("开始执行Cron定时任务,时间: {}", LocalDateTime.now());

CompletableFuture.runAsync(() -> {
try {
log.info("Cron定时任务执行中...");
Thread.sleep(5000); // 模拟任务执行
log.info("Cron定时任务执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Cron定时任务被中断", e);
}
}, scheduledExecutor);
}

/**
* 手动触发定时任务
*/
public void triggerManualTask(String taskName) {
log.info("手动触发任务: {}", taskName);

CompletableFuture.runAsync(() -> {
try {
log.info("手动任务执行中: {}", taskName);
Thread.sleep(2000);
log.info("手动任务执行完成: {}", taskName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("手动任务被中断: {}", taskName, e);
}
}, scheduledExecutor);
}
}

8. 线程池监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.executor.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池监控服务
* @author Java实战
*/
@Slf4j
@Service
public class ThreadPoolMonitorService {

@Autowired
@Qualifier("coreExecutor")
private ThreadPoolTaskExecutor coreExecutor;

@Autowired
@Qualifier("ioExecutor")
private ThreadPoolTaskExecutor ioExecutor;

@Autowired
@Qualifier("cpuExecutor")
private ThreadPoolTaskExecutor cpuExecutor;

/**
* 监控线程池状态(每30秒)
*/
@Scheduled(fixedRate = 30000)
public void monitorThreadPools() {
log.info("=== 线程池监控报告 ===");

// 监控核心线程池
monitorThreadPool("核心线程池", coreExecutor);

// 监控IO线程池
monitorThreadPool("IO线程池", ioExecutor);

// 监控CPU线程池
monitorThreadPool("CPU线程池", cpuExecutor);

log.info("=== 监控报告结束 ===");
}

/**
* 监控单个线程池
*/
private void monitorThreadPool(String poolName, ThreadPoolTaskExecutor executor) {
ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();

int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int activeCount = threadPoolExecutor.getActiveCount();
long taskCount = threadPoolExecutor.getTaskCount();
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
int queueSize = threadPoolExecutor.getQueue().size();

log.info("{} - 核心线程数: {}, 最大线程数: {}, 活跃线程数: {}, " +
"总任务数: {}, 已完成任务数: {}, 队列大小: {}",
poolName, corePoolSize, maximumPoolSize, activeCount,
taskCount, completedTaskCount, queueSize);

// 检查线程池健康状态
if (activeCount >= maximumPoolSize) {
log.warn("{} 线程池已满,可能需要调整配置", poolName);
}

if (queueSize > 50) {
log.warn("{} 队列积压严重,队列大小: {}", poolName, queueSize);
}
}

/**
* 获取线程池状态信息
*/
public ThreadPoolStatus getThreadPoolStatus(String poolName) {
ThreadPoolTaskExecutor executor = getExecutorByName(poolName);
if (executor == null) {
return null;
}

ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();

return ThreadPoolStatus.builder()
.poolName(poolName)
.corePoolSize(threadPoolExecutor.getCorePoolSize())
.maximumPoolSize(threadPoolExecutor.getMaximumPoolSize())
.activeCount(threadPoolExecutor.getActiveCount())
.taskCount(threadPoolExecutor.getTaskCount())
.completedTaskCount(threadPoolExecutor.getCompletedTaskCount())
.queueSize(threadPoolExecutor.getQueue().size())
.build();
}

/**
* 根据名称获取线程池
*/
private ThreadPoolTaskExecutor getExecutorByName(String poolName) {
switch (poolName.toLowerCase()) {
case "core":
case "coreExecutor":
return coreExecutor;
case "io":
case "ioExecutor":
return ioExecutor;
case "cpu":
case "cpuExecutor":
return cpuExecutor;
default:
return null;
}
}

/**
* 线程池状态信息
*/
@lombok.Data
@lombok.Builder
public static class ThreadPoolStatus {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private int activeCount;
private long taskCount;
private long completedTaskCount;
private int queueSize;
}
}

9. 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.executor.controller;

import com.executor.service.AsyncTaskService;
import com.executor.service.ScheduledTaskService;
import com.executor.service.TaskManagementService;
import com.executor.service.ThreadPoolMonitorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* ExecutorService控制器
* @author Java实战
*/
@Slf4j
@RestController
@RequestMapping("/api/executor")
public class ExecutorController {

@Autowired
private AsyncTaskService asyncTaskService;

@Autowired
private TaskManagementService taskManagementService;

@Autowired
private ScheduledTaskService scheduledTaskService;

@Autowired
private ThreadPoolMonitorService threadPoolMonitorService;

/**
* 执行简单异步任务
*/
@PostMapping("/task/simple")
public ResponseEntity<CompletableFuture<String>> executeSimpleTask(@RequestParam String taskName) {
try {
CompletableFuture<String> future = asyncTaskService.executeSimpleTask(taskName);
return ResponseEntity.ok(future);
} catch (Exception e) {
log.error("执行简单任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 执行IO密集型任务
*/
@PostMapping("/task/io")
public ResponseEntity<CompletableFuture<String>> executeIOTask(@RequestParam String taskName) {
try {
CompletableFuture<String> future = asyncTaskService.executeIOTask(taskName);
return ResponseEntity.ok(future);
} catch (Exception e) {
log.error("执行IO任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 执行CPU密集型任务
*/
@PostMapping("/task/cpu")
public ResponseEntity<CompletableFuture<Integer>> executeCPUTask(@RequestParam String taskName) {
try {
CompletableFuture<Integer> future = asyncTaskService.executeCPUTask(taskName);
return ResponseEntity.ok(future);
} catch (Exception e) {
log.error("执行CPU任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 批量执行任务
*/
@PostMapping("/task/batch")
public ResponseEntity<List<String>> executeBatchTasks(@RequestBody List<String> taskNames) {
try {
List<String> results = taskManagementService.executeBatchTasks(taskNames);
return ResponseEntity.ok(results);
} catch (Exception e) {
log.error("批量执行任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 执行任务链
*/
@PostMapping("/task/chain")
public ResponseEntity<String> executeTaskChain(@RequestParam String initialTask) {
try {
String result = taskManagementService.executeTaskChain(initialTask);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("执行任务链失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 执行并行任务
*/
@PostMapping("/task/parallel")
public ResponseEntity<List<String>> executeParallelTasks(@RequestBody List<String> taskNames) {
try {
List<String> results = taskManagementService.executeParallelTasks(taskNames);
return ResponseEntity.ok(results);
} catch (Exception e) {
log.error("执行并行任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 手动触发定时任务
*/
@PostMapping("/scheduled/trigger")
public ResponseEntity<String> triggerScheduledTask(@RequestParam String taskName) {
try {
scheduledTaskService.triggerManualTask(taskName);
return ResponseEntity.ok("定时任务已触发: " + taskName);
} catch (Exception e) {
log.error("触发定时任务失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取线程池状态
*/
@GetMapping("/monitor/status/{poolName}")
public ResponseEntity<ThreadPoolMonitorService.ThreadPoolStatus> getThreadPoolStatus(@PathVariable String poolName) {
try {
ThreadPoolMonitorService.ThreadPoolStatus status = threadPoolMonitorService.getThreadPoolStatus(poolName);
if (status != null) {
return ResponseEntity.ok(status);
} else {
return ResponseEntity.notFound().build();
}
} catch (Exception e) {
log.error("获取线程池状态失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取所有线程池状态
*/
@GetMapping("/monitor/status/all")
public ResponseEntity<Map<String, ThreadPoolMonitorService.ThreadPoolStatus>> getAllThreadPoolStatus() {
try {
Map<String, ThreadPoolMonitorService.ThreadPoolStatus> statusMap = Map.of(
"core", threadPoolMonitorService.getThreadPoolStatus("core"),
"io", threadPoolMonitorService.getThreadPoolStatus("io"),
"cpu", threadPoolMonitorService.getThreadPoolStatus("cpu")
);
return ResponseEntity.ok(statusMap);
} catch (Exception e) {
log.error("获取所有线程池状态失败", e);
return ResponseEntity.internalServerError().build();
}
}
}

10. 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.executor.service;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* ExecutorService测试类
* @author Java实战
*/
@Slf4j
@SpringBootTest
public class ExecutorServiceTest {

@Autowired
private AsyncTaskService asyncTaskService;

@Autowired
private TaskManagementService taskManagementService;

/**
* 测试简单异步任务
*/
@Test
public void testSimpleAsyncTask() throws Exception {
log.info("开始测试简单异步任务");

CompletableFuture<String> future = asyncTaskService.executeSimpleTask("测试任务1");
String result = future.get(10, TimeUnit.SECONDS);

log.info("简单异步任务测试完成,结果: {}", result);
}

/**
* 测试IO密集型任务
*/
@Test
public void testIOTask() throws Exception {
log.info("开始测试IO密集型任务");

CompletableFuture<String> future = asyncTaskService.executeIOTask("IO测试任务");
String result = future.get(15, TimeUnit.SECONDS);

log.info("IO密集型任务测试完成,结果: {}", result);
}

/**
* 测试CPU密集型任务
*/
@Test
public void testCPUTask() throws Exception {
log.info("开始测试CPU密集型任务");

CompletableFuture<Integer> future = asyncTaskService.executeCPUTask("CPU测试任务");
Integer result = future.get(20, TimeUnit.SECONDS);

log.info("CPU密集型任务测试完成,结果: {}", result);
}

/**
* 测试批量任务
*/
@Test
public void testBatchTasks() throws Exception {
log.info("开始测试批量任务");

List<String> taskNames = Arrays.asList("任务1", "任务2", "任务3", "任务4", "任务5");
List<String> results = taskManagementService.executeBatchTasks(taskNames);

log.info("批量任务测试完成,结果数量: {}", results.size());
results.forEach(result -> log.info("批量任务结果: {}", result));
}

/**
* 测试任务链
*/
@Test
public void testTaskChain() throws Exception {
log.info("开始测试任务链");

String result = taskManagementService.executeTaskChain("初始任务");

log.info("任务链测试完成,结果: {}", result);
}

/**
* 测试并行任务
*/
@Test
public void testParallelTasks() throws Exception {
log.info("开始测试并行任务");

List<String> taskNames = Arrays.asList("并行任务1", "并行任务2", "并行任务3");
List<String> results = taskManagementService.executeParallelTasks(taskNames);

log.info("并行任务测试完成,结果数量: {}", results.size());
results.forEach(result -> log.info("并行任务结果: {}", result));
}

/**
* 测试异常处理
*/
@Test
public void testExceptionHandling() throws Exception {
log.info("开始测试异常处理");

String result = taskManagementService.executeTaskWithExceptionHandling("异常测试任务", true);

log.info("异常处理测试完成,结果: {}", result);
}
}

11. 总结

ExecutorService是Java并发编程的核心组件。通过本文的详细介绍,我们了解了:

  1. 线程池配置: 不同类型线程池的配置和使用
  2. 异步任务执行: 使用@Async注解执行异步任务
  3. 任务管理: 批量任务、任务链、并行任务的处理
  4. 定时任务: 使用@Scheduled注解执行定时任务
  5. 线程池监控: 监控线程池状态和性能

通过合理的线程池配置和任务管理,可以为应用程序提供高效的并发处理能力。


Java实战要点:

  • ExecutorService提供线程池管理能力
  • @Async注解简化异步任务执行
  • CompletableFuture提供强大的异步编程能力
  • 合理配置线程池参数提升性能
  • 监控线程池状态确保系统稳定

代码注解说明:

  • @Async: 异步方法注解
  • @Scheduled: 定时任务注解
  • CompletableFuture: 异步任务结果处理
  • ThreadPoolTaskExecutor: Spring线程池实现
  • 拒绝策略: 处理线程池满时的策略