前言

CompletableFuture作为Java 8引入的异步编程核心工具,能够有效实现复杂的异步任务编排,提高系统的并发处理能力和响应性能。通过合理的异步任务编排策略和并发处理,能够构建一个高效、稳定、可扩展的异步任务编排系统,确保系统的稳定运行。本文从异步任务编排策略到并发处理,从基础实现到企业级应用,系统梳理CompletableFuture异步任务编排的完整解决方案。

一、CompletableFuture异步任务编排架构设计

1.1 异步任务编排整体架构

1.2 异步任务编排策略架构

二、CompletableFuture异步任务编排实现

2.1 基础异步任务编排

2.1.1 异步任务管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
 * Creates asynchronous tasks backed by {@link CompletableFuture} and keeps
 * bookkeeping about them.
 *
 * <p>Every task is run on the injected {@code taskExecutor}. A status record is
 * written to Redis under {@code async_task:<taskId>}, and when a task fails the
 * failure is logged, the record is marked {@code FAILED} and an event is
 * published through the injected {@code RabbitTemplate}.
 */
@Service
public class CompletableFutureTaskManager {

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Key prefix of the per-task status record in Redis. */
    private static final String ASYNC_TASK_CACHE_PREFIX = "async_task:";

    /** Lifetime of a status record, in seconds (one hour). */
    private static final long ASYNC_TASK_CACHE_EXPIRE = 3600;

    /**
     * Starts {@code task} on the task executor.
     *
     * @param task   the work to run; must not be {@code null}
     * @param taskId identifier used for the status record, logs and events;
     *               must not be {@code null} or blank
     * @return a future that completes with the task's result. If the task throws,
     *         the failure is handled here (logged, recorded, published) and the
     *         returned future completes normally with {@code null}. If the task
     *         could not be created at all (invalid arguments, submission error),
     *         an already-failed future is returned instead.
     */
    public <T> CompletableFuture<T> createAsyncTask(Supplier<T> task, String taskId) {
        try {
            validateTask(task, taskId);

            CompletableFuture<T> future = CompletableFuture.supplyAsync(task, taskExecutor);

            // Stamp the task id onto the result when the result type supports it.
            future = future.thenApply(result -> {
                setTaskId(result, taskId);
                return result;
            });

            recordTaskInfo(taskId, future);

            // Failure handling is attached last so it also covers the stage above.
            future = future.exceptionally(throwable -> {
                handleTaskException(taskId, throwable);
                return null;
            });

            return future;

        } catch (Exception e) {
            log.error("创建异步任务失败: {}", taskId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Rejects a missing task or a missing/blank task id.
     *
     * @throws IllegalArgumentException if either argument is invalid
     */
    private void validateTask(Object task, String taskId) {
        if (task == null) {
            throw new IllegalArgumentException("任务不能为空");
        }

        if (taskId == null || taskId.trim().isEmpty()) {
            throw new IllegalArgumentException("任务ID不能为空");
        }
    }

    /**
     * Writes {@code taskId} into {@code result} when it is a {@code TaskResult};
     * any other result is returned untouched. Errors are logged, never propagated.
     */
    private <T> T setTaskId(T result, String taskId) {
        try {
            if (result instanceof TaskResult) {
                ((TaskResult) result).setTaskId(taskId);
            }
            return result;
        } catch (Exception e) {
            log.error("设置任务ID失败", e);
            return result;
        }
    }

    /**
     * Stores a {@code RUNNING} status record for the task in Redis.
     * Best effort: a Redis failure is logged and does not affect the task.
     * The {@code future} argument is currently not used.
     */
    private void recordTaskInfo(String taskId, CompletableFuture<?> future) {
        try {
            AsyncTaskInfo taskInfo = new AsyncTaskInfo();
            taskInfo.setTaskId(taskId);
            taskInfo.setCreateTime(LocalDateTime.now());
            taskInfo.setStatus(AsyncTaskStatus.RUNNING);

            String cacheKey = ASYNC_TASK_CACHE_PREFIX + taskId;
            redisTemplate.opsForValue().set(cacheKey, taskInfo, Duration.ofSeconds(ASYNC_TASK_CACHE_EXPIRE));

        } catch (Exception e) {
            log.error("记录任务信息失败", e);
        }
    }

    /**
     * Logs a task failure, marks the status record as {@code FAILED} and
     * publishes an exception event.
     *
     * <p>The throwable handed to a dependent stage is normally a
     * {@code CompletionException} wrapping the real failure, so the wrapper is
     * stripped before the message and type are recorded. The full, wrapped
     * throwable is still what gets logged.
     */
    private void handleTaskException(String taskId, Throwable throwable) {
        try {
            log.error("任务执行异常: {}", taskId, throwable);

            Throwable rootFailure = unwrapCompletionFailure(throwable);

            updateTaskStatus(taskId, AsyncTaskStatus.FAILED, rootFailure.getMessage());

            sendTaskExceptionEvent(taskId, rootFailure);

        } catch (Exception e) {
            log.error("处理任务异常失败", e);
        }
    }

    /**
     * Strips {@code CompletionException}/{@code ExecutionException} wrappers and
     * returns the first underlying cause. A wrapper without a cause is returned
     * as is.
     */
    private static Throwable unwrapCompletionFailure(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof java.util.concurrent.CompletionException
                || current instanceof java.util.concurrent.ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Updates the cached status record of a task, if one exists, and renews its
     * expiry. Best effort: failures are logged only.
     */
    private void updateTaskStatus(String taskId, AsyncTaskStatus status, String errorMessage) {
        try {
            String cacheKey = ASYNC_TASK_CACHE_PREFIX + taskId;
            AsyncTaskInfo taskInfo = (AsyncTaskInfo) redisTemplate.opsForValue().get(cacheKey);

            if (taskInfo != null) {
                taskInfo.setStatus(status);
                taskInfo.setErrorMessage(errorMessage);
                taskInfo.setEndTime(LocalDateTime.now());

                redisTemplate.opsForValue().set(cacheKey, taskInfo, Duration.ofSeconds(ASYNC_TASK_CACHE_EXPIRE));
            }

        } catch (Exception e) {
            log.error("更新任务状态失败", e);
        }
    }

    /**
     * Publishes a {@code TaskExceptionEvent} describing the failure.
     * Best effort: a messaging failure is logged only.
     */
    private void sendTaskExceptionEvent(String taskId, Throwable throwable) {
        try {
            TaskExceptionEvent event = new TaskExceptionEvent();
            event.setTaskId(taskId);
            event.setExceptionType(throwable.getClass().getSimpleName());
            event.setExceptionMessage(throwable.getMessage());
            event.setEventTime(LocalDateTime.now());

            rabbitTemplate.convertAndSend("task.exception.queue", event);

        } catch (Exception e) {
            log.error("发送任务异常事件失败", e);
        }
    }
}

2.1.2 顺序任务编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
 * Runs a list of tasks one after another, each as its own asynchronous task
 * created through {@code CompletableFutureTaskManager}.
 */
@Service
public class SequentialTaskOrchestrationService {

    @Autowired
    private CompletableFutureTaskManager taskManager;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Key prefix of the orchestration record in Redis. */
    private static final String SEQUENTIAL_TASK_CACHE_PREFIX = "sequential_task:";

    /**
     * Executes the suppliers in list order; task {@code i+1} is only started
     * once task {@code i} has completed.
     *
     * @param tasks           non-empty list of tasks
     * @param orchestrationId non-blank id; task ids are {@code <id>_<index>}
     * @return a future completing with the result of the LAST task, or an
     *         already-failed future when validation or chain set-up fails
     */
    public <T> CompletableFuture<T> executeSequentialTasks(List<Supplier<T>> tasks, String orchestrationId) {
        try {
            validateTaskList(tasks, orchestrationId);

            // Seed the chain with a completed future so every task is attached the same way.
            CompletableFuture<T> result = CompletableFuture.completedFuture(null);

            for (int i = 0; i < tasks.size(); i++) {
                final Supplier<T> task = tasks.get(i);
                final String taskId = orchestrationId + "_" + i;

                // The previous result is intentionally ignored; only ordering matters here.
                result = result.thenCompose(previousResult -> {
                    return taskManager.createAsyncTask(task, taskId);
                });
            }

            recordOrchestrationInfo(orchestrationId, "SEQUENTIAL", tasks.size());

            return result;

        } catch (Exception e) {
            log.error("顺序执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Rejects a null/empty task list or a null/blank orchestration id.
     *
     * @throws IllegalArgumentException if either argument is invalid
     */
    private void validateTaskList(List<?> tasks, String orchestrationId) {
        if (tasks == null || tasks.isEmpty()) {
            throw new IllegalArgumentException("任务列表不能为空");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }

    /**
     * Stores a summary of the orchestration in Redis for one hour.
     * Best effort: failures are logged only.
     */
    private void recordOrchestrationInfo(String orchestrationId, String type, int taskCount) {
        try {
            TaskOrchestrationInfo info = new TaskOrchestrationInfo();
            info.setOrchestrationId(orchestrationId);
            info.setType(type);
            info.setTaskCount(taskCount);
            info.setCreateTime(LocalDateTime.now());

            String cacheKey = SEQUENTIAL_TASK_CACHE_PREFIX + orchestrationId;
            redisTemplate.opsForValue().set(cacheKey, info, Duration.ofHours(1));

        } catch (Exception e) {
            log.error("记录编排信息失败", e);
        }
    }

    /**
     * Executes the functions in list order, feeding each function the output of
     * the previous one (the first receives {@code initialValue}).
     *
     * <p>Because every function is declared as {@code Function<T, R>} but is fed
     * the previous function's {@code R}, the pipeline is only type-safe when
     * each output is also a valid {@code T} (typically {@code T == R}). The
     * intermediate values are therefore carried as {@code Object} and cast
     * unchecked; a mismatch surfaces as a {@code ClassCastException} inside the
     * offending function.
     *
     * @param tasks           non-empty list of transformation steps
     * @param initialValue    input of the first step
     * @param orchestrationId non-blank id; task ids are {@code <id>_<index>}
     * @return a future completing with the output of the last step, or an
     *         already-failed future when validation or chain set-up fails
     */
    @SuppressWarnings("unchecked")
    public <T, R> CompletableFuture<R> executeSequentialTasksWithResult(
            List<Function<T, R>> tasks, T initialValue, String orchestrationId) {
        try {
            validateTaskList(tasks, orchestrationId);

            // Carried as Object: the value is a T before the first step and an R afterwards.
            CompletableFuture<Object> chain = CompletableFuture.completedFuture(initialValue);

            for (int i = 0; i < tasks.size(); i++) {
                final Function<T, R> task = tasks.get(i);
                final String taskId = orchestrationId + "_" + i;

                chain = chain.thenCompose(input -> {
                    return taskManager.<Object>createAsyncTask(() -> task.apply((T) input), taskId);
                });
            }

            recordOrchestrationInfo(orchestrationId, "SEQUENTIAL_WITH_RESULT", tasks.size());

            return chain.thenApply(value -> (R) value);

        } catch (Exception e) {
            log.error("顺序执行任务(带结果传递)失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }
}

2.1.3 并行任务编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/**
 * Runs a list of tasks concurrently, each as its own asynchronous task created
 * through {@code CompletableFutureTaskManager}, and gathers their results.
 */
@Service
public class ParallelTaskOrchestrationService {

    @Autowired
    private CompletableFutureTaskManager taskManager;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Key prefix of the orchestration record in Redis. */
    private static final String PARALLEL_TASK_CACHE_PREFIX = "parallel_task:";

    /**
     * Starts all tasks at once and waits for every one of them.
     *
     * @param tasks           non-empty list of tasks
     * @param orchestrationId non-blank id; task ids are {@code <id>_<index>}
     * @return a future completing with the results in task order, or an
     *         already-failed future when validation or set-up fails
     */
    public <T> CompletableFuture<List<T>> executeParallelTasks(List<Supplier<T>> tasks, String orchestrationId) {
        try {
            validateTaskList(tasks, orchestrationId);

            List<CompletableFuture<T>> futures = launchTasks(tasks, orchestrationId);

            CompletableFuture<List<T>> result = collectResults(futures);

            recordOrchestrationInfo(orchestrationId, "PARALLEL", tasks.size());

            return result;

        } catch (Exception e) {
            log.error("并行执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Rejects a null/empty task list or a null/blank orchestration id.
     *
     * @throws IllegalArgumentException if either argument is invalid
     */
    private void validateTaskList(List<?> tasks, String orchestrationId) {
        if (tasks == null || tasks.isEmpty()) {
            throw new IllegalArgumentException("任务列表不能为空");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }

    /**
     * Stores a summary of the orchestration in Redis for one hour.
     * Best effort: failures are logged only.
     */
    private void recordOrchestrationInfo(String orchestrationId, String type, int taskCount) {
        try {
            TaskOrchestrationInfo info = new TaskOrchestrationInfo();
            info.setOrchestrationId(orchestrationId);
            info.setType(type);
            info.setTaskCount(taskCount);
            info.setCreateTime(LocalDateTime.now());

            String cacheKey = PARALLEL_TASK_CACHE_PREFIX + orchestrationId;
            redisTemplate.opsForValue().set(cacheKey, info, Duration.ofHours(1));

        } catch (Exception e) {
            log.error("记录编排信息失败", e);
        }
    }

    /**
     * Same as {@link #executeParallelTasks} but the combined future fails with a
     * timeout when the results are not available within the given time.
     *
     * <p>The time unit is checked BEFORE any task is submitted. Previously a
     * null unit was only detected when the timeout was attached, i.e. after all
     * tasks were already running, leaving them orphaned behind a failed future.
     * The timeout only affects the returned future; nothing in this method
     * cancels the individual tasks.
     *
     * @param tasks           non-empty list of tasks
     * @param timeout         maximum time to wait
     * @param timeUnit        unit of {@code timeout}; must not be {@code null}
     * @param orchestrationId non-blank id; task ids are {@code <id>_<index>}
     * @return a future completing with the results in task order, or failing on
     *         timeout, invalid arguments or set-up errors
     */
    public <T> CompletableFuture<List<T>> executeParallelTasksWithTimeout(
            List<Supplier<T>> tasks, long timeout, TimeUnit timeUnit, String orchestrationId) {
        try {
            validateTaskList(tasks, orchestrationId);
            if (timeUnit == null) {
                throw new IllegalArgumentException("超时时间单位不能为空");
            }

            List<CompletableFuture<T>> futures = launchTasks(tasks, orchestrationId);

            CompletableFuture<List<T>> result = collectResults(futures).orTimeout(timeout, timeUnit);

            recordOrchestrationInfo(orchestrationId, "PARALLEL_WITH_TIMEOUT", tasks.size());

            return result;

        } catch (Exception e) {
            log.error("并行执行任务(带超时控制)失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Starts all tasks at once and returns only the non-null results; a task
     * that fails contributes {@code null} and is therefore dropped.
     *
     * @param tasks           non-empty list of tasks
     * @param orchestrationId non-blank id; task ids are {@code <id>_<index>}
     * @return a future completing with the non-null results in task order
     *         (possibly fewer than {@code tasks.size()}), or an already-failed
     *         future when validation or set-up fails
     */
    public <T> CompletableFuture<List<T>> executeParallelTasksPartialSuccess(
            List<Supplier<T>> tasks, String orchestrationId) {
        try {
            validateTaskList(tasks, orchestrationId);

            List<CompletableFuture<T>> futures = new ArrayList<>(tasks.size());

            for (int i = 0; i < tasks.size(); i++) {
                final Supplier<T> task = tasks.get(i);
                final String taskId = orchestrationId + "_" + i;

                CompletableFuture<T> future = taskManager.createAsyncTask(task, taskId)
                        .exceptionally(throwable -> {
                            log.error("任务执行失败: {}", taskId, throwable);
                            return null; // null marks a failed task
                        });
                futures.add(future);
            }

            CompletableFuture<List<T>> result = collectResults(futures)
                    .thenApply(values -> values.stream()
                            .filter(Objects::nonNull) // drop failed tasks
                            .collect(Collectors.toList()));

            recordOrchestrationInfo(orchestrationId, "PARALLEL_PARTIAL_SUCCESS", tasks.size());

            return result;

        } catch (Exception e) {
            log.error("并行执行任务(部分成功)失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Submits every task through the task manager, using {@code <id>_<index>} as task id. */
    private <T> List<CompletableFuture<T>> launchTasks(List<Supplier<T>> tasks, String orchestrationId) {
        List<CompletableFuture<T>> futures = new ArrayList<>(tasks.size());
        for (int i = 0; i < tasks.size(); i++) {
            futures.add(taskManager.createAsyncTask(tasks.get(i), orchestrationId + "_" + i));
        }
        return futures;
    }

    /** Returns a future that completes, once all inputs are done, with their results in order. */
    private <T> CompletableFuture<List<T>> collectResults(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // join() cannot block here: allOf guarantees every future is already complete.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}

2.2 复杂任务编排

2.2.1 条件任务编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
 * Chooses which task to run based on asynchronously evaluated conditions.
 * Conditions and tasks are all executed through
 * {@code CompletableFutureTaskManager}.
 */
@Service
public class ConditionalTaskOrchestrationService {

    @Autowired
    private CompletableFutureTaskManager taskManager;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Key prefix of the orchestration record in Redis. */
    private static final String CONDITIONAL_TASK_CACHE_PREFIX = "conditional_task:";

    /**
     * Evaluates {@code condition} and then runs {@code trueTask} or
     * {@code falseTask} accordingly.
     *
     * @param condition       condition to evaluate; must not be {@code null}
     * @param trueTask        task run when the condition yields {@code true}
     * @param falseTask       task run when the condition yields {@code false}
     * @param orchestrationId non-blank id; task ids are {@code <id>_condition},
     *                        {@code <id>_true} and {@code <id>_false}
     * @return a future completing with the chosen task's result. It fails when
     *         the condition produced no result (see
     *         {@link #requireConditionResult}); an already-failed future is
     *         returned when validation or set-up fails.
     */
    public <T> CompletableFuture<T> executeConditionalTask(
            Supplier<Boolean> condition, Supplier<T> trueTask, Supplier<T> falseTask, String orchestrationId) {
        try {
            validateConditionalTask(condition, trueTask, falseTask, orchestrationId);

            final String conditionTaskId = orchestrationId + "_condition";
            CompletableFuture<Boolean> conditionFuture = taskManager.createAsyncTask(condition, conditionTaskId);

            CompletableFuture<T> result = conditionFuture.thenCompose(conditionResult -> {
                if (requireConditionResult(conditionResult, conditionTaskId)) {
                    return taskManager.createAsyncTask(trueTask, orchestrationId + "_true");
                } else {
                    return taskManager.createAsyncTask(falseTask, orchestrationId + "_false");
                }
            });

            recordOrchestrationInfo(orchestrationId, "CONDITIONAL", 3);

            return result;

        } catch (Exception e) {
            log.error("条件执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Converts a condition outcome to a primitive, failing with a descriptive
     * error when there is none.
     *
     * <p>A condition future yields {@code null} when the condition supplier
     * returned {@code null} or when the condition task failed (the task manager
     * turns failures into a {@code null} result). Unboxing that directly would
     * raise a bare {@code NullPointerException}.
     *
     * @throws IllegalStateException if {@code conditionResult} is {@code null}
     */
    private static boolean requireConditionResult(Boolean conditionResult, String conditionTaskId) {
        if (conditionResult == null) {
            throw new IllegalStateException("条件判断未返回结果: " + conditionTaskId);
        }
        return conditionResult;
    }

    /**
     * Rejects a missing condition, missing branch task or null/blank id.
     *
     * @throws IllegalArgumentException if any argument is invalid
     */
    private void validateConditionalTask(Object condition, Object trueTask, Object falseTask, String orchestrationId) {
        if (condition == null) {
            throw new IllegalArgumentException("条件不能为空");
        }

        if (trueTask == null) {
            throw new IllegalArgumentException("条件为真时的任务不能为空");
        }

        if (falseTask == null) {
            throw new IllegalArgumentException("条件为假时的任务不能为空");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }

    /**
     * Stores a summary of the orchestration in Redis for one hour.
     * Best effort: failures are logged only.
     */
    private void recordOrchestrationInfo(String orchestrationId, String type, int taskCount) {
        try {
            TaskOrchestrationInfo info = new TaskOrchestrationInfo();
            info.setOrchestrationId(orchestrationId);
            info.setType(type);
            info.setTaskCount(taskCount);
            info.setCreateTime(LocalDateTime.now());

            String cacheKey = CONDITIONAL_TASK_CACHE_PREFIX + orchestrationId;
            redisTemplate.opsForValue().set(cacheKey, info, Duration.ofHours(1));

        } catch (Exception e) {
            log.error("记录编排信息失败", e);
        }
    }

    /**
     * Evaluates the conditional tasks in list order and runs the task of the
     * first one whose condition holds; if none produced a result and
     * {@code defaultTask} is not {@code null}, the default task runs.
     *
     * <p>NOTE(review): a {@code null} result is used as the "nothing matched
     * yet" marker. A matched task that itself yields {@code null} is therefore
     * indistinguishable from no match, and evaluation continues with the next
     * condition / the default task.
     *
     * @param conditionalTasks non-empty list of condition/task pairs
     * @param defaultTask      fallback task; may be {@code null}
     * @param orchestrationId  non-blank id; task ids are
     *                         {@code <id>_<index>_condition},
     *                         {@code <id>_<index>_task} and {@code <id>_default}
     * @return a future completing with the selected task's result (or
     *         {@code null} when nothing ran). It fails when a condition produced
     *         no result; an already-failed future is returned when validation or
     *         set-up fails.
     */
    public <T> CompletableFuture<T> executeMultiConditionalTask(
            List<ConditionalTask<T>> conditionalTasks, Supplier<T> defaultTask, String orchestrationId) {
        try {
            validateMultiConditionalTask(conditionalTasks, defaultTask, orchestrationId);

            CompletableFuture<T> result = CompletableFuture.completedFuture(null);

            for (int i = 0; i < conditionalTasks.size(); i++) {
                final ConditionalTask<T> conditionalTask = conditionalTasks.get(i);
                final String taskId = orchestrationId + "_" + i;

                result = result.thenCompose(previousResult -> {
                    // An earlier branch already produced a value: pass it through untouched.
                    if (previousResult != null) {
                        return CompletableFuture.completedFuture(previousResult);
                    }

                    return taskManager.createAsyncTask(conditionalTask.getCondition(), taskId + "_condition")
                            .thenCompose(conditionResult -> {
                                if (requireConditionResult(conditionResult, taskId + "_condition")) {
                                    return taskManager.createAsyncTask(conditionalTask.getTask(), taskId + "_task");
                                } else {
                                    return CompletableFuture.completedFuture(null);
                                }
                            });
                });
            }

            // Fall back to the default task when no branch produced a value.
            result = result.thenCompose(finalResult -> {
                if (finalResult == null && defaultTask != null) {
                    return taskManager.createAsyncTask(defaultTask, orchestrationId + "_default");
                } else {
                    return CompletableFuture.completedFuture(finalResult);
                }
            });

            recordOrchestrationInfo(orchestrationId, "MULTI_CONDITIONAL", conditionalTasks.size() + 1);

            return result;

        } catch (Exception e) {
            log.error("多条件执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Rejects a null/empty list of conditional tasks or a null/blank id.
     * {@code defaultTask} is optional and not checked.
     *
     * @throws IllegalArgumentException if an argument is invalid
     */
    private void validateMultiConditionalTask(List<?> conditionalTasks, Object defaultTask, String orchestrationId) {
        if (conditionalTasks == null || conditionalTasks.isEmpty()) {
            throw new IllegalArgumentException("条件任务列表不能为空");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }
}

2.2.2 循环任务编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
 * Runs the same task repeatedly, one iteration after another, collecting the
 * result of every executed iteration. All work goes through
 * {@code CompletableFutureTaskManager}.
 */
@Service
public class LoopTaskOrchestrationService {

    @Autowired
    private CompletableFutureTaskManager taskManager;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Key prefix of the orchestration record in Redis. */
    private static final String LOOP_TASK_CACHE_PREFIX = "loop_task:";

    /**
     * Runs {@code task} exactly {@code maxIterations} times, sequentially.
     *
     * @param task            the task to repeat; must not be {@code null}
     * @param maxIterations   number of iterations; must be greater than 0
     * @param orchestrationId non-blank id; task ids are {@code <id>_<iteration>}
     * @return a future completing with the results in iteration order, or an
     *         already-failed future when validation or set-up fails
     */
    public <T> CompletableFuture<List<T>> executeLoopTask(
            Supplier<T> task, int maxIterations, String orchestrationId) {
        try {
            validateLoopTask(task, maxIterations, orchestrationId);

            // One shared list is threaded through the chain; stages run strictly one after another.
            CompletableFuture<List<T>> result = CompletableFuture.completedFuture(new ArrayList<>());

            for (int i = 0; i < maxIterations; i++) {
                final String taskId = orchestrationId + "_" + i;

                result = result.thenCompose(results -> {
                    return taskManager.createAsyncTask(task, taskId)
                            .thenApply(taskResult -> {
                                results.add(taskResult);
                                return results;
                            });
                });
            }

            recordOrchestrationInfo(orchestrationId, "LOOP", maxIterations);

            return result;

        } catch (Exception e) {
            log.error("循环执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Rejects a missing task, a non-positive iteration count or a null/blank id.
     *
     * @throws IllegalArgumentException if any argument is invalid
     */
    private void validateLoopTask(Object task, int maxIterations, String orchestrationId) {
        if (task == null) {
            throw new IllegalArgumentException("任务不能为空");
        }

        if (maxIterations <= 0) {
            throw new IllegalArgumentException("最大迭代次数必须大于0");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }

    /**
     * Stores a summary of the orchestration in Redis for one hour.
     * Best effort: failures are logged only.
     */
    private void recordOrchestrationInfo(String orchestrationId, String type, int taskCount) {
        try {
            TaskOrchestrationInfo info = new TaskOrchestrationInfo();
            info.setOrchestrationId(orchestrationId);
            info.setType(type);
            info.setTaskCount(taskCount);
            info.setCreateTime(LocalDateTime.now());

            String cacheKey = LOOP_TASK_CACHE_PREFIX + orchestrationId;
            redisTemplate.opsForValue().set(cacheKey, info, Duration.ofHours(1));

        } catch (Exception e) {
            log.error("记录编排信息失败", e);
        }
    }

    /**
     * For each of {@code maxIterations} iterations, evaluates {@code condition}
     * and runs {@code task} only when it holds.
     *
     * <p>A {@code false} condition skips that iteration but does not end the
     * loop: the condition is evaluated again in every remaining iteration.
     *
     * @param task            the task to repeat; must not be {@code null}
     * @param condition       evaluated before each iteration; must not be {@code null}
     * @param maxIterations   number of iterations; must be greater than 0
     * @param orchestrationId non-blank id; task ids are
     *                        {@code <id>_<iteration>_condition} and
     *                        {@code <id>_<iteration>_task}
     * @return a future completing with the results of the executed iterations,
     *         in order. It fails when a condition produced no result (see
     *         {@link #requireConditionResult}); an already-failed future is
     *         returned when validation or set-up fails.
     */
    public <T> CompletableFuture<List<T>> executeConditionalLoopTask(
            Supplier<T> task, Supplier<Boolean> condition, int maxIterations, String orchestrationId) {
        try {
            validateConditionalLoopTask(task, condition, maxIterations, orchestrationId);

            CompletableFuture<List<T>> result = CompletableFuture.completedFuture(new ArrayList<>());

            for (int i = 0; i < maxIterations; i++) {
                final String taskId = orchestrationId + "_" + i;

                result = result.thenCompose(results -> {
                    return taskManager.createAsyncTask(condition, taskId + "_condition")
                            .thenCompose(conditionResult -> {
                                if (requireConditionResult(conditionResult, taskId + "_condition")) {
                                    return taskManager.createAsyncTask(task, taskId + "_task")
                                            .thenApply(taskResult -> {
                                                results.add(taskResult);
                                                return results;
                                            });
                                } else {
                                    return CompletableFuture.completedFuture(results);
                                }
                            });
                });
            }

            recordOrchestrationInfo(orchestrationId, "CONDITIONAL_LOOP", maxIterations);

            return result;

        } catch (Exception e) {
            log.error("条件循环执行任务失败: {}", orchestrationId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Converts a condition outcome to a primitive, failing with a descriptive
     * error when there is none.
     *
     * <p>A condition future yields {@code null} when the condition supplier
     * returned {@code null} or when the condition task failed (the task manager
     * turns failures into a {@code null} result). Unboxing that directly would
     * raise a bare {@code NullPointerException}.
     *
     * @throws IllegalStateException if {@code conditionResult} is {@code null}
     */
    private static boolean requireConditionResult(Boolean conditionResult, String conditionTaskId) {
        if (conditionResult == null) {
            throw new IllegalStateException("条件判断未返回结果: " + conditionTaskId);
        }
        return conditionResult;
    }

    /**
     * Rejects a missing task or condition, a non-positive iteration count or a
     * null/blank id.
     *
     * @throws IllegalArgumentException if any argument is invalid
     */
    private void validateConditionalLoopTask(Object task, Object condition, int maxIterations, String orchestrationId) {
        if (task == null) {
            throw new IllegalArgumentException("任务不能为空");
        }

        if (condition == null) {
            throw new IllegalArgumentException("条件不能为空");
        }

        if (maxIterations <= 0) {
            throw new IllegalArgumentException("最大迭代次数必须大于0");
        }

        if (orchestrationId == null || orchestrationId.trim().isEmpty()) {
            throw new IllegalArgumentException("编排ID不能为空");
        }
    }
}

三、企业级CompletableFuture应用方案

3.1 任务编排管理服务

3.1.1 任务编排管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/**
* 任务编排管理服务
*/
@Service
public class TaskOrchestrationManagementService {

@Autowired
private SequentialTaskOrchestrationService sequentialService;

@Autowired
private ParallelTaskOrchestrationService parallelService;

@Autowired
private ConditionalTaskOrchestrationService conditionalService;

@Autowired
private LoopTaskOrchestrationService loopService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final String TASK_ORCHESTRATION_MANAGEMENT_CACHE_PREFIX = "task_orchestration_management:";
private final long TASK_ORCHESTRATION_MANAGEMENT_CACHE_EXPIRE = 3600; // 1小时

/**
* 智能选择任务编排策略
*/
public <T> CompletableFuture<T> executeTaskOrchestration(TaskOrchestrationRequest<T> request) {
try {
// 1. 验证请求
validateRequest(request);

// 2. 选择编排策略
TaskOrchestrationStrategy strategy = selectOrchestrationStrategy(request);

// 3. 执行编排策略
CompletableFuture<T> result = executeOrchestrationStrategy(request, strategy);

// 4. 记录编排统计
recordOrchestrationStatistics(request, strategy);

return result;

} catch (Exception e) {
log.error("执行任务编排失败: {}", request.getOrchestrationId(), e);
return CompletableFuture.failedFuture(e);
}
}

/**
* 验证请求
*/
private void validateRequest(TaskOrchestrationRequest<?> request) {
if (request == null) {
throw new IllegalArgumentException("请求不能为空");
}

if (request.getOrchestrationId() == null) {
throw new IllegalArgumentException("编排ID不能为空");
}

if (request.getTasks() == null || request.getTasks().isEmpty()) {
throw new IllegalArgumentException("任务列表不能为空");
}
}

/**
* 选择编排策略
*/
private TaskOrchestrationStrategy selectOrchestrationStrategy(TaskOrchestrationRequest<?> request) {
try {
if (request.getStrategy() != null) {
return request.getStrategy();
}

// 根据任务特征选择策略
if (request.getTasks().size() == 1) {
return TaskOrchestrationStrategy.SEQUENTIAL;
} else if (request.isParallel()) {
return TaskOrchestrationStrategy.PARALLEL;
} else if (request.isConditional()) {
return TaskOrchestrationStrategy.CONDITIONAL;
} else if (request.isLoop()) {
return TaskOrchestrationStrategy.LOOP;
} else {
return TaskOrchestrationStrategy.SEQUENTIAL; // 默认策略
}

} catch (Exception e) {
log.error("选择编排策略失败", e);
return TaskOrchestrationStrategy.SEQUENTIAL; // 默认策略
}
}

/**
* 执行编排策略
*/
private <T> CompletableFuture<T> executeOrchestrationStrategy(
TaskOrchestrationRequest<T> request, TaskOrchestrationStrategy strategy) {
try {
switch (strategy) {
case SEQUENTIAL:
return executeSequentialStrategy(request);
case PARALLEL:
return executeParallelStrategy(request);
case CONDITIONAL:
return executeConditionalStrategy(request);
case LOOP:
return executeLoopStrategy(request);
default:
throw new IllegalArgumentException("不支持的编排策略: " + strategy);
}

} catch (Exception e) {
log.error("执行编排策略失败", e);
throw new TaskOrchestrationException("执行编排策略失败", e);
}
}

/**
* 执行顺序策略
*/
private <T> CompletableFuture<T> executeSequentialStrategy(TaskOrchestrationRequest<T> request) {
try {
List<Supplier<T>> tasks = request.getTasks();
return sequentialService.executeSequentialTasks(tasks, request.getOrchestrationId())
.thenApply(results -> results.get(results.size() - 1)); // 返回最后一个结果

} catch (Exception e) {
log.error("执行顺序策略失败", e);
throw new TaskOrchestrationException("执行顺序策略失败", e);
}
}

/**
* 执行并行策略
*/
private <T> CompletableFuture<T> executeParallelStrategy(TaskOrchestrationRequest<T> request) {
try {
List<Supplier<T>> tasks = request.getTasks();
return parallelService.executeParallelTasks(tasks, request.getOrchestrationId())
.thenApply(results -> results.get(0)); // 返回第一个结果

} catch (Exception e) {
log.error("执行并行策略失败", e);
throw new TaskOrchestrationException("执行并行策略失败", e);
}
}

/**
* 执行条件策略
*/
private <T> CompletableFuture<T> executeConditionalStrategy(TaskOrchestrationRequest<T> request) {
try {
// 实现条件策略逻辑
return CompletableFuture.completedFuture(null);

} catch (Exception e) {
log.error("执行条件策略失败", e);
throw new TaskOrchestrationException("执行条件策略失败", e);
}
}

/**
* 执行循环策略
*/
private <T> CompletableFuture<T> executeLoopStrategy(TaskOrchestrationRequest<T> request) {
try {
// 实现循环策略逻辑
return CompletableFuture.completedFuture(null);

} catch (Exception e) {
log.error("执行循环策略失败", e);
throw new TaskOrchestrationException("执行循环策略失败", e);
}
}

/**
* 记录编排统计
*/
private void recordOrchestrationStatistics(TaskOrchestrationRequest<?> request, TaskOrchestrationStrategy strategy) {
try {
TaskOrchestrationStatistics statistics = new TaskOrchestrationStatistics();
statistics.setOrchestrationId(request.getOrchestrationId());
statistics.setStrategy(strategy);
statistics.setTaskCount(request.getTasks().size());
statistics.setExecutionTime(LocalDateTime.now());

// 异步记录统计
CompletableFuture.runAsync(() -> {
try {
saveOrchestrationStatistics(statistics);
} catch (Exception e) {
log.error("保存编排统计失败", e);
}
});

} catch (Exception e) {
log.error("记录编排统计失败", e);
}
}

/**
* 保存编排统计
*/
private void saveOrchestrationStatistics(TaskOrchestrationStatistics statistics) {
// 实现统计保存逻辑
log.info("保存编排统计: {}", statistics.getOrchestrationId());
}

/**
* 获取任务编排统计
*/
public TaskOrchestrationStatisticsResult getTaskOrchestrationStatistics(Date startTime, Date endTime) {
try {
TaskOrchestrationStatisticsResult result = new TaskOrchestrationStatisticsResult();
result.setStartTime(startTime);
result.setEndTime(endTime);

// 统计编排次数
result.setTotalOrchestrations(1000L); // 实际应用中需要从数据库统计

// 统计策略使用情况
Map<TaskOrchestrationStrategy, Long> strategyCount = new HashMap<>();
strategyCount.put(TaskOrchestrationStrategy.SEQUENTIAL, 400L);
strategyCount.put(TaskOrchestrationStrategy.PARALLEL, 300L);
strategyCount.put(TaskOrchestrationStrategy.CONDITIONAL, 200L);
strategyCount.put(TaskOrchestrationStrategy.LOOP, 100L);
result.setStrategyCount(strategyCount);

// 统计成功率
result.setSuccessRate(0.98); // 98%

// 统计平均执行时间
result.setAverageExecutionTime(5000.0); // 5秒

return result;

} catch (Exception e) {
log.error("获取任务编排统计失败", e);
throw new TaskOrchestrationException("获取任务编排统计失败", e);
}
}
}

四、性能优化与监控

4.1 性能优化

4.1.1 CompletableFuture性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/**
 * Produces tuning recommendations (thread pool sizing and cache settings) for
 * CompletableFuture-based workloads, derived from a per-task-type analysis.
 */
@Service
public class CompletableFuturePerformanceOptimizationService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private CaffeineCache localCache;

    private final String COMPLETABLE_FUTURE_PERFORMANCE_CACHE_PREFIX = "completable_future_performance:";

    /**
     * Runs the full optimization pipeline: pattern analysis, then thread pool
     * recommendation, then cache recommendation.
     *
     * @param request the optimization request
     * @return the assembled result, marked {@code COMPLETED}
     * @throws CompletableFutureException if any step fails
     */
    public CompletableFutureOptimizationResult optimizeCompletableFuturePerformance(CompletableFutureOptimizationRequest request) {
        try {
            CompletableFutureOptimizationResult outcome = new CompletableFutureOptimizationResult();
            outcome.setRequestId(request.getRequestId());
            outcome.setStartTime(new Date());

            // Step 1: analyse the usage pattern; the later steps are driven by it.
            CompletableFuturePatternAnalysis pattern = analyzeCompletableFuturePattern(request);
            outcome.setPatternAnalysis(pattern);

            // Step 2: thread pool recommendation.
            outcome.setThreadPoolOptimization(optimizeThreadPoolStrategy(request, pattern));

            // Step 3: cache recommendation.
            outcome.setCacheOptimization(optimizeCompletableFutureCacheStrategy(request, pattern));

            outcome.setStatus(CompletableFutureOptimizationStatus.COMPLETED);
            outcome.setEndTime(new Date());
            return outcome;
        } catch (Exception e) {
            log.error("优化CompletableFuture性能失败", e);
            throw new CompletableFutureException("优化CompletableFuture性能失败", e);
        }
    }

    /**
     * Builds the pattern analysis (execution frequency and performance) for the
     * request's task type.
     *
     * @throws CompletableFutureException if the analysis fails
     */
    private CompletableFuturePatternAnalysis analyzeCompletableFuturePattern(CompletableFutureOptimizationRequest request) {
        try {
            CompletableFuturePatternAnalysis pattern = new CompletableFuturePatternAnalysis();
            pattern.setRequestId(request.getRequestId());
            pattern.setTaskExecutionFrequency(analyzeTaskExecutionFrequency(request.getTaskType()));
            pattern.setTaskPerformance(analyzeTaskPerformance(request.getTaskType()));
            return pattern;
        } catch (Exception e) {
            log.error("分析CompletableFuture模式失败", e);
            throw new CompletableFutureException("分析CompletableFuture模式失败", e);
        }
    }

    /**
     * Returns the execution frequency for a task type. The counts are fixed
     * values; on error an empty frequency object is returned instead.
     */
    private TaskExecutionFrequency analyzeTaskExecutionFrequency(String taskType) {
        try {
            TaskExecutionFrequency stats = new TaskExecutionFrequency();
            stats.setTaskType(taskType);
            stats.setDailyCount(10000);
            stats.setHourlyCount(1000);
            stats.setMinuteCount(100);
            return stats;
        } catch (Exception e) {
            log.error("分析任务执行频率失败", e);
            return new TaskExecutionFrequency();
        }
    }

    /**
     * Returns the performance figures for a task type. The figures are fixed
     * values; on error an empty performance object is returned instead.
     */
    private TaskPerformance analyzeTaskPerformance(String taskType) {
        try {
            TaskPerformance stats = new TaskPerformance();
            stats.setTaskType(taskType);
            stats.setAverageExecutionTime(50.0); // 50 ms
            stats.setSuccessRate(0.99);          // 99%
            stats.setThroughput(2000.0);         // 2000 per second
            return stats;
        } catch (Exception e) {
            log.error("分析任务性能失败", e);
            return new TaskPerformance();
        }
    }

    /**
     * Recommends thread pool settings from the daily execution count:
     * above 10000, above 1000, or anything lower.
     *
     * @throws CompletableFutureException if the recommendation cannot be built
     */
    private ThreadPoolOptimizationResult optimizeThreadPoolStrategy(CompletableFutureOptimizationRequest request,
            CompletableFuturePatternAnalysis analysis) {
        try {
            ThreadPoolOptimizationResult recommendation = new ThreadPoolOptimizationResult();
            recommendation.setRequestId(request.getRequestId());

            boolean heavyLoad = analysis.getTaskExecutionFrequency().getDailyCount() > 10000;
            boolean mediumLoad = !heavyLoad && analysis.getTaskExecutionFrequency().getDailyCount() > 1000;

            if (heavyLoad) {
                applyThreadPoolTier(recommendation, 20, 50, 1000, 60);
            } else if (mediumLoad) {
                applyThreadPoolTier(recommendation, 10, 20, 500, 120);
            } else {
                applyThreadPoolTier(recommendation, 5, 10, 100, 300);
            }
            return recommendation;
        } catch (Exception e) {
            log.error("优化线程池策略失败", e);
            throw new CompletableFutureException("优化线程池策略失败", e);
        }
    }

    /** Writes one set of thread pool recommendations into {@code target}. */
    private void applyThreadPoolTier(ThreadPoolOptimizationResult target,
            int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveTime) {
        target.setRecommendedCorePoolSize(corePoolSize);
        target.setRecommendedMaxPoolSize(maxPoolSize);
        target.setRecommendedQueueCapacity(queueCapacity);
        target.setRecommendedKeepAliveTime(keepAliveTime);
    }

    /**
     * Recommends cache settings from the daily execution count:
     * above 10000, above 1000, or anything lower.
     *
     * @throws CompletableFutureException if the recommendation cannot be built
     */
    private CompletableFutureCacheOptimizationResult optimizeCompletableFutureCacheStrategy(CompletableFutureOptimizationRequest request,
            CompletableFuturePatternAnalysis analysis) {
        try {
            CompletableFutureCacheOptimizationResult recommendation = new CompletableFutureCacheOptimizationResult();
            recommendation.setRequestId(request.getRequestId());

            boolean heavyLoad = analysis.getTaskExecutionFrequency().getDailyCount() > 10000;
            boolean mediumLoad = !heavyLoad && analysis.getTaskExecutionFrequency().getDailyCount() > 1000;

            if (heavyLoad) {
                applyCacheTier(recommendation, 3600, 2000, "LRU");  // 1 hour
            } else if (mediumLoad) {
                applyCacheTier(recommendation, 1800, 1000, "LFU");  // 30 minutes
            } else {
                applyCacheTier(recommendation, 600, 500, "FIFO");   // 10 minutes
            }
            return recommendation;
        } catch (Exception e) {
            log.error("优化CompletableFuture缓存策略失败", e);
            throw new CompletableFutureException("优化CompletableFuture缓存策略失败", e);
        }
    }

    /** Writes one set of cache recommendations into {@code target}. */
    private void applyCacheTier(CompletableFutureCacheOptimizationResult target,
            int expire, int size, String strategy) {
        target.setRecommendedCacheExpire(expire);
        target.setRecommendedCacheSize(size);
        target.setRecommendedCacheStrategy(strategy);
    }
}

4.2 监控告警

4.2.1 CompletableFuture监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Records CompletableFuture task metrics (counters, a timer and gauges) in a Micrometer registry.
 *
 * <p>Counters and the timer are resolved by name and tags on every call. Gauges work
 * differently: a gauge samples a value holder that is bound when the gauge is first registered,
 * so this class keeps one holder per metric name and task type and overwrites it on each call.
 */
@Component
public class CompletableFutureMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * Latest gauge values, stored as raw double bits and keyed by metric name plus task type.
     * The map also keeps the holders strongly reachable, since Micrometer gauges reference
     * their sampled object weakly by default.
     */
    private final java.util.concurrent.ConcurrentMap<String, java.util.concurrent.atomic.AtomicLong> gaugeValues =
            new java.util.concurrent.ConcurrentHashMap<>();

    public CompletableFutureMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Increments the execution counter for a task type and outcome.
     *
     * @param taskType value of the {@code task_type} tag
     * @param status   value of the {@code status} tag
     */
    public void recordCompletableFutureExecutionCount(String taskType, String status) {
        Counter.builder("completable_future.execution.count")
                .description("CompletableFuture执行次数")
                .tag("task_type", taskType)
                .tag("status", status)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Records one execution duration for a task type and outcome.
     *
     * @param taskType value of the {@code task_type} tag
     * @param status   value of the {@code status} tag
     * @param duration elapsed time in milliseconds
     */
    public void recordCompletableFutureExecutionTime(String taskType, String status, long duration) {
        Timer.builder("completable_future.execution.time")
                .description("CompletableFuture执行时间")
                .tag("task_type", taskType)
                .tag("status", status)
                // Publish P95 so registries that export quantiles (e.g. Prometheus) expose a
                // quantile="0.95" series that alert rules can query.
                .publishPercentiles(0.95)
                .register(meterRegistry)
                .record(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Sets the current success-rate gauge value for a task type.
     *
     * @param taskType    value of the {@code task_type} tag
     * @param successRate latest success rate
     */
    public void recordCompletableFutureSuccessRate(String taskType, double successRate) {
        updateGauge("completable_future.success.rate", "CompletableFuture成功率", taskType, successRate);
    }

    /**
     * Sets the current failure-rate gauge value for a task type.
     *
     * @param taskType    value of the {@code task_type} tag
     * @param failureRate latest failure rate
     */
    public void recordCompletableFutureFailureRate(String taskType, double failureRate) {
        updateGauge("completable_future.failure.rate", "CompletableFuture失败率", taskType, failureRate);
    }

    /**
     * Sets the current throughput gauge value for a task type.
     *
     * @param taskType   value of the {@code task_type} tag
     * @param throughput latest throughput
     */
    public void recordCompletableFutureThroughput(String taskType, double throughput) {
        updateGauge("completable_future.throughput", "CompletableFuture吞吐量", taskType, throughput);
    }

    /**
     * Increments the exception counter for a task type and exception type.
     *
     * @param taskType      value of the {@code task_type} tag
     * @param exceptionType value of the {@code exception_type} tag
     */
    public void recordCompletableFutureExceptionCount(String taskType, String exceptionType) {
        Counter.builder("completable_future.exception.count")
                .description("CompletableFuture异常次数")
                .tag("task_type", taskType)
                .tag("exception_type", exceptionType)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Registers the gauge on first use for a (name, task type) pair, then stores the new value.
     * Later calls only overwrite the holder; the registered gauge reads it when sampled.
     */
    private void updateGauge(String name, String description, String taskType, double value) {
        java.util.concurrent.atomic.AtomicLong holder = gaugeValues.computeIfAbsent(name + '|' + taskType, key -> {
            java.util.concurrent.atomic.AtomicLong bits =
                    new java.util.concurrent.atomic.AtomicLong(Double.doubleToLongBits(value));
            Gauge.builder(name, bits, b -> Double.longBitsToDouble(b.get()))
                    .description(description)
                    .tag("task_type", taskType)
                    .register(meterRegistry);
            return bits;
        });
        holder.set(Double.doubleToLongBits(value));
    }
}

4.2.2 CompletableFuture告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# prometheus-rules.yml
# Alert rules for the CompletableFuture metrics. Metric names follow Micrometer's Prometheus
# naming: dots become underscores, timers get a _seconds suffix, counters get a _total suffix.
groups:
  - name: completable_future_alerts
    rules:
      - alert: HighCompletableFutureExecutionTime
        # Timers are exported in seconds, so the 10-second threshold is 10, not 10000.
        # The quantile series exists only when the timer publishes the 0.95 percentile.
        expr: completable_future_execution_time_seconds{quantile="0.95"} > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "CompletableFuture执行时间过长"
          description: "CompletableFuture执行时间P95超过10秒,当前值: {{ $value }}s"

      - alert: HighCompletableFutureFailureRate
        expr: completable_future_failure_rate > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "CompletableFuture失败率过高"
          description: "CompletableFuture失败率超过5%,当前值: {{ $value }}"

      - alert: LowCompletableFutureThroughput
        expr: completable_future_throughput < 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CompletableFuture吞吐量过低"
          description: "CompletableFuture吞吐量低于100次/秒,当前值: {{ $value }}"

      - alert: HighCompletableFutureExceptionCount
        # rate() yields a per-second value; multiply by 60 to compare against 10 per minute.
        expr: rate(completable_future_exception_count_total[5m]) * 60 > 10
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "CompletableFuture异常次数过多"
          description: "CompletableFuture异常频率超过10次/分钟,当前值: {{ $value }}"

      - alert: CompletableFutureServiceDown
        expr: up{job="completable-future-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CompletableFuture服务宕机"
          description: "CompletableFuture服务已宕机超过1分钟"

五、总结

CompletableFuture是Java 8引入的异步编程核心工具。借助合理的异步任务编排策略和并发处理,可以基于它构建一个高效、稳定、可扩展的异步任务编排系统。本文从异步任务编排策略到并发处理,从基础实现到企业级应用,系统梳理了CompletableFuture异步任务编排的完整解决方案。

5.1 关键要点

  1. 任务编排:通过多种编排策略实现不同场景下的异步任务处理需求
  2. 并发处理:通过CompletableFuture实现高效的并发处理
  3. 性能优化:通过线程池优化、缓存优化等手段优化处理性能
  4. 监控告警:建立完善的监控体系,及时发现和处理问题
  5. 企业级方案:提供完整的企业级部署和监控方案

5.2 最佳实践

  1. 策略选择:根据任务特征、执行频率、系统负载选择合适的编排策略
  2. 线程池优化:合理配置线程池参数,提高并发处理能力
  3. 异常处理:实现完善的异常处理机制,确保系统稳定性
  4. 监控告警:建立完善的监控体系,确保CompletableFuture服务稳定运行
  5. 性能调优:通过缓存、批量处理等手段优化系统性能

通过以上措施,可以构建一个高效、稳定、可扩展的CompletableFuture异步任务编排系统,为企业的各种业务场景提供异步任务编排支持。