CompletableFuture:异步编程与任务组合实战

1. CompletableFuture概述

CompletableFuture是Java 8引入的异步编程工具,提供了强大的异步任务处理能力,支持任务组合、异常处理、超时控制等功能。本文将详细介绍CompletableFuture的异步编程、任务组合策略、异常处理机制、超时控制和性能优化的完整解决方案。需要注意的是,文中超时控制所使用的 orTimeout 方法自 Java 9 起才提供,因此示例代码需要在 Java 9 及以上版本运行。

1.1 核心功能

  1. 异步编程: 异步任务执行、非阻塞操作
  2. 任务组合: 任务串行、并行、依赖关系
  3. 异常处理: 异常捕获、错误恢复、重试机制
  4. 超时控制: 任务超时、超时处理、资源释放
  5. 性能优化: 线程池管理、任务调度、资源优化

1.2 技术架构

异步任务 → CompletableFuture → 任务组合 → 异常处理 → 结果返回
↓ ↓ ↓ ↓ ↓
非阻塞 → 异步执行 → 依赖管理 → 错误处理 → 性能优化
↓ ↓ ↓ ↓ ↓
高并发 → 线程池 → 任务调度 → 超时控制 → 资源管理

2. CompletableFuture配置

2.1 Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!-- pom.xml: dependency section for the examples in this article. -->
<!-- NOTE(review): most entries carry no <version>; presumably a Spring Boot parent/BOM -->
<!-- that is not shown here manages them. Confirm before copying this snippet. -->
<dependencies>
<!-- Spring Boot Web starter (the REST controller in this article relies on Spring MVC) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot Data Redis starter. -->
<!-- NOTE(review): no Redis API is referenced by the Java snippets visible in this article. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- Spring Boot Actuator starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- Guava; the only dependency in this list with an explicitly pinned version -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!-- Apache Commons Lang3 (no <version> given; see the note at the top) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>

2.2 CompletableFuture配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Spring configuration for the asynchronous-task infrastructure.
 *
 * <p>Thread-pool sizing is read from the {@code completable-future.thread-pool.*}
 * properties (defaults: core 10, max 50, queue capacity 1000) and exposed both as a
 * {@link CompletableFutureProperties} bean and as the {@code asyncTaskExecutor} pool.
 *
 * <p>{@code CompletableFutureService} is intentionally NOT declared as a {@code @Bean}
 * here. That class is annotated with {@code @Service}, so component scanning already
 * registers it under the default name {@code completableFutureService}. A second
 * definition with the same name from this class collides with the scanned one, and
 * containers that forbid bean-definition overriding refuse to start.
 */
@Configuration
public class CompletableFutureConfig {

    @Value("${completable-future.thread-pool.core-size:10}")
    private int corePoolSize;

    @Value("${completable-future.thread-pool.max-size:50}")
    private int maxPoolSize;

    @Value("${completable-future.thread-pool.queue-capacity:1000}")
    private int queueCapacity;

    /**
     * Exposes the configured pool sizes as a properties object.
     *
     * <p>Only the three pool settings are passed to the builder; every other field
     * keeps whatever value the builder assigns when a setter is not called.
     *
     * @return properties populated with core size, max size and queue capacity
     */
    @Bean
    public CompletableFutureProperties completableFutureProperties() {
        return CompletableFutureProperties.builder()
                .corePoolSize(corePoolSize)
                .maxPoolSize(maxPoolSize)
                .queueCapacity(queueCapacity)
                .build();
    }

    /**
     * Creates the thread pool registered under the bean name {@code asyncTaskExecutor}.
     *
     * <p>When the pool rejects a task, {@link ThreadPoolExecutor.CallerRunsPolicy} makes
     * the submitting thread run it instead of dropping it. On shutdown the pool waits up
     * to 60 seconds for submitted tasks to finish.
     *
     * @return the initialized executor
     */
    @Bean("asyncTaskExecutor")
    public ThreadPoolTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("AsyncTask-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

/**
* CompletableFuture配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CompletableFutureProperties {
private int corePoolSize;
private int maxPoolSize;
private int queueCapacity;

// 超时配置
private int defaultTimeout = 30; // 秒
private boolean enableTimeout = true;
private int maxTimeout = 300; // 秒

// 重试配置
private boolean enableRetry = true;
private int maxRetryAttempts = 3;
private int retryDelay = 1000; // 毫秒

// 监控配置
private boolean enableMonitoring = true;
private int monitoringInterval = 60; // 秒
private boolean enableMetrics = true;
}

3. CompletableFuture基础用法

3.1 CompletableFuture基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* CompletableFuture基础服务
*/
@Service
public class CompletableFutureService {

private final CompletableFutureProperties properties;
private final ThreadPoolTaskExecutor asyncTaskExecutor;

public CompletableFutureService(CompletableFutureProperties properties) {
this.properties = properties;
this.asyncTaskExecutor = null; // 注入
}

/**
* 异步执行任务(无返回值)
* @param task 任务
* @return CompletableFuture
*/
public CompletableFuture<Void> runAsync(Runnable task) {
return CompletableFuture.runAsync(task, asyncTaskExecutor);
}

/**
* 异步执行任务(有返回值)
* @param task 任务
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> supplyAsync(Supplier<T> task) {
return CompletableFuture.supplyAsync(task, asyncTaskExecutor);
}

/**
* 异步执行任务(带超时)
* @param task 任务
* @param timeout 超时时间(秒)
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> task, int timeout) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(task, asyncTaskExecutor);

if (properties.isEnableTimeout()) {
return future.orTimeout(timeout, TimeUnit.SECONDS);
}

return future;
}

/**
* 异步执行任务(带重试)
* @param task 任务
* @param maxRetries 最大重试次数
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> task, int maxRetries) {
return supplyAsyncWithRetryInternal(task, maxRetries, 0);
}

/**
* 内部重试方法
* @param task 任务
* @param maxRetries 最大重试次数
* @param currentRetry 当前重试次数
* @param <T> 返回类型
* @return CompletableFuture
*/
private <T> CompletableFuture<T> supplyAsyncWithRetryInternal(Supplier<T> task, int maxRetries, int currentRetry) {
return CompletableFuture.supplyAsync(task, asyncTaskExecutor)
.handle((result, throwable) -> {
if (throwable != null && currentRetry < maxRetries) {
log.warn("任务执行失败,准备重试: retry={}, maxRetries={}, error={}",
currentRetry + 1, maxRetries, throwable.getMessage());

try {
Thread.sleep(properties.getRetryDelay());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return supplyAsyncWithRetryInternal(task, maxRetries, currentRetry + 1);
} else if (throwable != null) {
log.error("任务执行失败,重试次数已用完: maxRetries={}", maxRetries, throwable);
throw new RuntimeException(throwable);
} else {
return CompletableFuture.completedFuture(result);
}
})
.thenCompose(Function.identity());
}

/**
* 异步执行任务(带异常处理)
* @param task 任务
* @param exceptionHandler 异常处理器
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> supplyAsyncWithExceptionHandling(
Supplier<T> task,
Function<Throwable, T> exceptionHandler) {
return CompletableFuture.supplyAsync(task, asyncTaskExecutor)
.handle((result, throwable) -> {
if (throwable != null) {
log.error("任务执行异常", throwable);
return exceptionHandler.apply(throwable);
}
return result;
});
}
}

4. 任务组合策略

4.1 任务组合策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
 * Combines several tasks into one future: sequentially, in parallel, first-to-finish,
 * or in parallel while tolerating individual failures.
 *
 * <p>All tasks are submitted through {@link CompletableFutureService}.
 */
@Service
public class TaskCombinationService {

    private final CompletableFutureService completableFutureService;

    public TaskCombinationService(CompletableFutureService completableFutureService) {
        this.completableFutureService = completableFutureService;
    }

    /**
     * Runs the tasks one after another; a task is submitted only after the previous one
     * has completed successfully.
     *
     * @param tasks tasks in execution order
     * @param <T>   result type
     * @return a future holding the results in task order (an empty list for no tasks)
     */
    public <T> CompletableFuture<List<T>> executeSequentially(List<Supplier<T>> tasks) {
        CompletableFuture<List<T>> chain = CompletableFuture.completedFuture(new ArrayList<>());

        for (Supplier<T> task : tasks) {
            chain = chain.thenCompose(collected ->
                    completableFutureService.supplyAsync(task)
                            .thenApply(value -> {
                                collected.add(value);
                                return collected;
                            }));
        }

        return chain;
    }

    /**
     * Submits all tasks at once and gathers their results.
     *
     * @param tasks tasks to run
     * @param <T>   result type
     * @return a future holding the results in task order; it fails if any task fails
     */
    public <T> CompletableFuture<List<T>> executeInParallel(List<Supplier<T>> tasks) {
        return joinAll(submitAll(tasks));
    }

    /**
     * Submits all tasks at once and completes as soon as the first of them completes,
     * whether normally or exceptionally. The remaining tasks are not cancelled.
     *
     * <p>An empty task list yields a future that has already failed with
     * {@link IllegalArgumentException}. {@code CompletableFuture.anyOf()} with no inputs
     * returns a future that never completes, which would leave callers waiting forever.
     *
     * @param tasks tasks to run
     * @param <T>   result type
     * @return a future holding the outcome of the first task to complete
     */
    public <T> CompletableFuture<T> executeAndWaitForFirst(List<Supplier<T>> tasks) {
        if (tasks.isEmpty()) {
            CompletableFuture<T> noTasks = new CompletableFuture<>();
            noTasks.completeExceptionally(new IllegalArgumentException("tasks must not be empty"));
            return noTasks;
        }

        List<CompletableFuture<T>> futures = submitAll(tasks);

        // anyOf() reports Object; every input future produces a T, so the cast is safe.
        @SuppressWarnings("unchecked")
        CompletableFuture<T> first = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(value -> (T) value);
        return first;
    }

    /**
     * Submits all tasks at once and waits for every one of them.
     *
     * @param tasks tasks to run
     * @param <T>   result type
     * @return a future holding the results in task order; it fails if any task fails
     */
    public <T> CompletableFuture<List<T>> executeAndWaitForAll(List<Supplier<T>> tasks) {
        return joinAll(submitAll(tasks));
    }

    /**
     * Submits all tasks at once and keeps only the results of the tasks that succeeded.
     *
     * <p>A failed task is mapped to {@code null} and then filtered out. A task that
     * legitimately returns {@code null} is therefore dropped from the result as well.
     *
     * @param tasks tasks to run
     * @param <T>   result type
     * @return a future holding the non-null results in task order
     */
    public <T> CompletableFuture<List<T>> executeWithPartialFailureHandling(List<Supplier<T>> tasks) {
        List<CompletableFuture<T>> futures = tasks.stream()
                .map(task -> completableFutureService.supplyAsyncWithExceptionHandling(
                        task,
                        throwable -> null))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
    }

    /**
     * Submits all tasks at once and collects their results.
     *
     * @param tasks tasks to run
     * @param <T>   result type
     * @return a future holding the results in task order; it fails if any task fails
     */
    public <T> CompletableFuture<List<T>> executeAndCollectResults(List<Supplier<T>> tasks) {
        return joinAll(submitAll(tasks));
    }

    /** Submits every task to the underlying service and returns the futures in task order. */
    private <T> List<CompletableFuture<T>> submitAll(List<Supplier<T>> tasks) {
        return tasks.stream()
                .map(task -> completableFutureService.supplyAsync(task))
                .collect(Collectors.toList());
    }

    /** Waits for all futures and gathers their results; join() cannot block once allOf() is done. */
    private <T> CompletableFuture<List<T>> joinAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}

5. 异常处理机制

5.1 异常处理机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
* 异常处理服务
*/
@Service
public class ExceptionHandlingService {

private final CompletableFutureService completableFutureService;

public ExceptionHandlingService(CompletableFutureService completableFutureService) {
this.completableFutureService = completableFutureService;
}

/**
* 处理异常并返回默认值
* @param task 任务
* @param defaultValue 默认值
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> handleExceptionWithDefault(Supplier<T> task, T defaultValue) {
return completableFutureService.supplyAsync(task)
.handle((result, throwable) -> {
if (throwable != null) {
log.error("任务执行异常,返回默认值", throwable);
return defaultValue;
}
return result;
});
}

/**
* 处理异常并重试
* @param task 任务
* @param maxRetries 最大重试次数
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> handleExceptionWithRetry(Supplier<T> task, int maxRetries) {
return completableFutureService.supplyAsyncWithRetry(task, maxRetries);
}

/**
* 处理异常并执行备用任务
* @param primaryTask 主要任务
* @param fallbackTask 备用任务
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> handleExceptionWithFallback(
Supplier<T> primaryTask,
Supplier<T> fallbackTask) {
return completableFutureService.supplyAsync(primaryTask)
.handle((result, throwable) -> {
if (throwable != null) {
log.warn("主要任务执行失败,执行备用任务", throwable);
return completableFutureService.supplyAsync(fallbackTask);
} else {
return CompletableFuture.completedFuture(result);
}
})
.thenCompose(Function.identity());
}

/**
* 处理异常并记录日志
* @param task 任务
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> handleExceptionWithLogging(Supplier<T> task) {
return completableFutureService.supplyAsync(task)
.handle((result, throwable) -> {
if (throwable != null) {
log.error("任务执行异常", throwable);
throw new RuntimeException(throwable);
}
return result;
});
}

/**
* 处理异常并发送告警
* @param task 任务
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> handleExceptionWithAlert(Supplier<T> task) {
return completableFutureService.supplyAsync(task)
.handle((result, throwable) -> {
if (throwable != null) {
log.error("任务执行异常,发送告警", throwable);
sendAlert(throwable);
throw new RuntimeException(throwable);
}
return result;
});
}

/**
* 发送告警
* @param throwable 异常
*/
private void sendAlert(Throwable throwable) {
try {
// 实现告警发送逻辑
log.info("发送告警: {}", throwable.getMessage());
} catch (Exception e) {
log.error("发送告警失败", e);
}
}
}

6. 超时控制机制

6.1 超时控制机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
* 超时控制服务
*/
@Service
public class TimeoutControlService {

private final CompletableFutureService completableFutureService;

public TimeoutControlService(CompletableFutureService completableFutureService) {
this.completableFutureService = completableFutureService;
}

/**
* 执行任务并设置超时
* @param task 任务
* @param timeout 超时时间(秒)
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> executeWithTimeout(Supplier<T> task, int timeout) {
return completableFutureService.supplyAsyncWithTimeout(task, timeout);
}

/**
* 执行任务并设置超时(带默认值)
* @param task 任务
* @param timeout 超时时间(秒)
* @param defaultValue 默认值
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> executeWithTimeoutAndDefault(
Supplier<T> task,
int timeout,
T defaultValue) {
return completableFutureService.supplyAsyncWithTimeout(task, timeout)
.handle((result, throwable) -> {
if (throwable instanceof TimeoutException) {
log.warn("任务执行超时,返回默认值: timeout={}s", timeout);
return defaultValue;
} else if (throwable != null) {
log.error("任务执行异常", throwable);
throw new RuntimeException(throwable);
}
return result;
});
}

/**
* 执行任务并设置超时(带重试)
* @param task 任务
* @param timeout 超时时间(秒)
* @param maxRetries 最大重试次数
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> executeWithTimeoutAndRetry(
Supplier<T> task,
int timeout,
int maxRetries) {
return executeWithTimeoutAndRetryInternal(task, timeout, maxRetries, 0);
}

/**
* 内部重试方法
* @param task 任务
* @param timeout 超时时间(秒)
* @param maxRetries 最大重试次数
* @param currentRetry 当前重试次数
* @param <T> 返回类型
* @return CompletableFuture
*/
private <T> CompletableFuture<T> executeWithTimeoutAndRetryInternal(
Supplier<T> task,
int timeout,
int maxRetries,
int currentRetry) {
return completableFutureService.supplyAsyncWithTimeout(task, timeout)
.handle((result, throwable) -> {
if (throwable instanceof TimeoutException && currentRetry < maxRetries) {
log.warn("任务执行超时,准备重试: retry={}, maxRetries={}, timeout={}s",
currentRetry + 1, maxRetries, timeout);

try {
Thread.sleep(1000); // 等待1秒后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return executeWithTimeoutAndRetryInternal(task, timeout, maxRetries, currentRetry + 1);
} else if (throwable instanceof TimeoutException) {
log.error("任务执行超时,重试次数已用完: maxRetries={}, timeout={}s", maxRetries, timeout);
throw new RuntimeException(throwable);
} else if (throwable != null) {
log.error("任务执行异常", throwable);
throw new RuntimeException(throwable);
} else {
return CompletableFuture.completedFuture(result);
}
})
.thenCompose(Function.identity());
}

/**
* 执行任务并设置超时(带备用任务)
* @param primaryTask 主要任务
* @param fallbackTask 备用任务
* @param timeout 超时时间(秒)
* @param <T> 返回类型
* @return CompletableFuture
*/
public <T> CompletableFuture<T> executeWithTimeoutAndFallback(
Supplier<T> primaryTask,
Supplier<T> fallbackTask,
int timeout) {
return completableFutureService.supplyAsyncWithTimeout(primaryTask, timeout)
.handle((result, throwable) -> {
if (throwable instanceof TimeoutException) {
log.warn("主要任务执行超时,执行备用任务: timeout={}s", timeout);
return completableFutureService.supplyAsync(fallbackTask);
} else if (throwable != null) {
log.error("主要任务执行异常", throwable);
return completableFutureService.supplyAsync(fallbackTask);
} else {
return CompletableFuture.completedFuture(result);
}
})
.thenCompose(Function.identity());
}
}

7. CompletableFuture控制器

7.1 CompletableFuture控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
* CompletableFuture控制器
*/
@RestController
@RequestMapping("/api/v1/completable-future")
public class CompletableFutureController {

@Autowired
private CompletableFutureService completableFutureService;

@Autowired
private TaskCombinationService taskCombinationService;

@Autowired
private ExceptionHandlingService exceptionHandlingService;

@Autowired
private TimeoutControlService timeoutControlService;

/**
* 异步执行任务
*/
@PostMapping("/async")
public ResponseEntity<Map<String, Object>> executeAsync(@RequestBody AsyncTaskRequest request) {
try {
CompletableFuture<String> future = completableFutureService.supplyAsync(() -> {
// 模拟任务执行
try {
Thread.sleep(request.getDelay());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成: " + request.getTaskName();
});

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "任务已提交");
response.put("taskName", request.getTaskName());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("异步执行任务失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "异步执行任务失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 并行执行任务
*/
@PostMapping("/parallel")
public ResponseEntity<Map<String, Object>> executeParallel(@RequestBody ParallelTaskRequest request) {
try {
List<Supplier<String>> tasks = request.getTaskNames().stream()
.map(taskName -> (Supplier<String>) () -> {
try {
Thread.sleep(request.getDelay());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成: " + taskName;
})
.collect(Collectors.toList());

CompletableFuture<List<String>> future = taskCombinationService.executeInParallel(tasks);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "并行任务已提交");
response.put("taskCount", request.getTaskNames().size());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("并行执行任务失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "并行执行任务失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 执行任务并处理异常
*/
@PostMapping("/exception-handling")
public ResponseEntity<Map<String, Object>> executeWithExceptionHandling(@RequestBody ExceptionTaskRequest request) {
try {
CompletableFuture<String> future = exceptionHandlingService.handleExceptionWithDefault(
() -> {
if (request.isShouldFail()) {
throw new RuntimeException("任务执行失败");
}
return "任务执行成功: " + request.getTaskName();
},
"任务执行失败,返回默认值"
);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "异常处理任务已提交");
response.put("taskName", request.getTaskName());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("执行异常处理任务失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "执行异常处理任务失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 执行任务并设置超时
*/
@PostMapping("/timeout")
public ResponseEntity<Map<String, Object>> executeWithTimeout(@RequestBody TimeoutTaskRequest request) {
try {
CompletableFuture<String> future = timeoutControlService.executeWithTimeoutAndDefault(
() -> {
try {
Thread.sleep(request.getDelay());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行成功: " + request.getTaskName();
},
request.getTimeout(),
"任务执行超时,返回默认值"
);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "超时控制任务已提交");
response.put("taskName", request.getTaskName());
response.put("timeout", request.getTimeout());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("执行超时控制任务失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "执行超时控制任务失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}

/**
 * Request body for the single asynchronous task endpoint.
 * Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AsyncTaskRequest {
// Name of the task; echoed back in the response and in the task's result text.
private String taskName;
// How long the simulated task sleeps, in milliseconds.
private int delay; // milliseconds
}

/**
 * Request body for the parallel task endpoint.
 * Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ParallelTaskRequest {
// One task is created per name in this list.
private List<String> taskNames;
// How long each simulated task sleeps, in milliseconds (shared by all tasks).
private int delay; // milliseconds
}

/**
 * Request body for the exception-handling endpoint.
 * Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ExceptionTaskRequest {
// Name of the task; echoed back in the response and in the task's result text.
private String taskName;
// When true, the submitted task throws instead of returning a result.
private boolean shouldFail;
}

/**
 * Request body for the timeout endpoint.
 * Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TimeoutTaskRequest {
// Name of the task; echoed back in the response and in the task's result text.
private String taskName;
// How long the simulated task sleeps, in milliseconds.
private int delay; // milliseconds
// Timeout applied to the task, in seconds.
private int timeout; // seconds
}

8. 总结

通过CompletableFuture的实现,我们成功构建了一个强大的异步编程框架。关键特性包括:

8.1 核心优势

  1. 异步编程: 异步任务执行、非阻塞操作
  2. 任务组合: 任务串行、并行、依赖关系
  3. 异常处理: 异常捕获、错误恢复、重试机制
  4. 超时控制: 任务超时、超时处理、资源释放
  5. 性能优化: 线程池管理、任务调度、资源优化

8.2 最佳实践

  1. 异步编程: 合理使用线程池、避免阻塞操作
  2. 任务组合: 选择合适的组合策略、处理依赖关系
  3. 异常处理: 完善的异常处理机制、错误恢复策略
  4. 超时控制: 合理的超时设置、超时处理机制(注意: orTimeout 只会让 Future 以超时异常结束,并不会中断仍在执行的底层任务,资源释放需要任务自身配合处理)
  5. 性能优化: 线程池调优、任务调度优化、资源管理

这套CompletableFuture方案不仅能够提供强大的异步编程能力,还包含了完整的任务组合、异常处理、超时控制等核心功能,是企业级Java应用的重要技术基础。