1. CompletableFuture概述

CompletableFuture是Java 8引入的异步编程工具,它提供了强大的异步任务处理能力。相比传统的Future,CompletableFuture支持链式调用、异常处理和任务组合,是现代Java异步编程的核心组件。

1.1 核心特性

  1. 异步执行: 支持异步任务执行,不阻塞主线程
  2. 链式调用: 支持任务链式组合,提高代码可读性
  3. 异常处理: 完善的异常处理机制
  4. 任务组合: 支持多个异步任务的组合和协调
  5. 回调机制: 支持完成后的回调处理

1.2 主要方法分类

  • 创建方法: runAsync(), supplyAsync(), completedFuture()
  • 转换方法: thenApply(), thenCompose(), handle()
  • 消费方法: thenAccept(), thenRun(), whenComplete()
  • 组合方法: thenCombine(), allOf(), anyOf()
  • 异常处理: exceptionally(), handle()

2. 基础用法详解

2.1 runAsync() - 无返回值异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
@Service
public class AsyncTaskService {

private final ExecutorService executor = Executors.newFixedThreadPool(10);

/**
* runAsync() 基础用法 - 无返回值
* 适用于执行不需要返回结果的任务
*/
public void basicRunAsync() {
// 1. 使用默认线程池执行
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("任务1开始执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务1执行完成");
});

// 2. 使用自定义线程池执行
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("任务2开始执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1500); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务2执行完成");
}, executor);

// 3. 等待任务完成
try {
future1.get(); // 阻塞等待任务1完成
future2.get(); // 阻塞等待任务2完成
System.out.println("所有任务执行完成");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

/**
* runAsync() 链式调用
* 任务完成后执行后续操作
*/
public void chainedRunAsync() {
CompletableFuture<Void> future = CompletableFuture
.runAsync(() -> {
System.out.println("第一步:数据预处理");
// 模拟数据预处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.thenRun(() -> {
System.out.println("第二步:数据验证");
// 模拟数据验证
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.thenRun(() -> {
System.out.println("第三步:数据保存");
// 模拟数据保存
try {
Thread.sleep(600);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

try {
future.get(); // 等待整个链式任务完成
System.out.println("链式任务执行完成");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

/**
* runAsync() 异常处理
*/
public void runAsyncWithExceptionHandling() {
CompletableFuture<Void> future = CompletableFuture
.runAsync(() -> {
System.out.println("开始执行可能出错的任务");
// 模拟可能出现的异常
if (Math.random() > 0.5) {
throw new RuntimeException("任务执行失败");
}
System.out.println("任务执行成功");
})
.exceptionally(throwable -> {
System.err.println("任务执行异常: " + throwable.getMessage());
return null; // runAsync返回Void,异常处理返回null
})
.thenRun(() -> {
System.out.println("异常处理后的清理工作");
});

try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

2.2 supplyAsync() - 有返回值异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@Service
public class AsyncDataService {

private final ExecutorService executor = Executors.newFixedThreadPool(10);

/**
* supplyAsync() 基础用法 - 有返回值
* 适用于需要返回结果的任务
*/
public void basicSupplyAsync() {
// 1. 使用默认线程池执行
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("数据查询任务开始,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟数据库查询
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "查询结果1";
});

// 2. 使用自定义线程池执行
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("数据计算任务开始,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1500); // 模拟复杂计算
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 42; // 返回计算结果
}, executor);

// 3. 获取结果
try {
String result1 = future1.get(); // 阻塞等待结果
Integer result2 = future2.get(); // 阻塞等待结果

System.out.println("任务1结果: " + result1);
System.out.println("任务2结果: " + result2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

/**
* supplyAsync() 链式转换
* 对异步任务的结果进行转换
*/
public void chainedSupplyAsync() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("第一步:获取原始数据");
return "原始数据";
})
.thenApply(data -> {
System.out.println("第二步:数据转换");
return data.toUpperCase(); // 转换为大写
})
.thenApply(data -> {
System.out.println("第三步:数据格式化");
return "格式化后的数据: " + data;
})
.thenApply(data -> {
System.out.println("第四步:数据验证");
if (data.length() > 10) {
return data + " [已验证]";
}
return data;
});

try {
String result = future.get();
System.out.println("最终结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

/**
* supplyAsync() 异常处理
*/
public void supplyAsyncWithExceptionHandling() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行可能出错的任务");
// 模拟可能出现的异常
if (Math.random() > 0.5) {
throw new RuntimeException("数据获取失败");
}
return "成功获取数据";
})
.exceptionally(throwable -> {
System.err.println("任务执行异常: " + throwable.getMessage());
return "默认数据"; // 异常时返回默认值
})
.thenApply(data -> {
System.out.println("处理数据: " + data);
return "处理后的数据: " + data;
});

try {
String result = future.get();
System.out.println("最终结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

3. 实际应用场景

3.1 用户服务异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@Service
public class UserService {

@Autowired
private UserRepository userRepository;

@Autowired
private EmailService emailService;

@Autowired
private SmsService smsService;

@Autowired
private AuditService auditService;

private final ExecutorService executor = Executors.newFixedThreadPool(20);

/**
* 用户注册异步处理
* 使用runAsync处理不需要返回值的任务
*/
public CompletableFuture<User> registerUser(UserRegistrationRequest request) {
// 1. 创建用户(同步操作)
User user = createUser(request);

// 2. 异步发送欢迎邮件
CompletableFuture<Void> emailTask = CompletableFuture.runAsync(() -> {
try {
emailService.sendWelcomeEmail(user.getEmail(), user.getName());
System.out.println("欢迎邮件发送成功: " + user.getEmail());
} catch (Exception e) {
System.err.println("欢迎邮件发送失败: " + e.getMessage());
}
}, executor);

// 3. 异步发送短信通知
CompletableFuture<Void> smsTask = CompletableFuture.runAsync(() -> {
try {
smsService.sendRegistrationSms(user.getPhone());
System.out.println("注册短信发送成功: " + user.getPhone());
} catch (Exception e) {
System.err.println("注册短信发送失败: " + e.getMessage());
}
}, executor);

// 4. 异步记录审计日志
CompletableFuture<Void> auditTask = CompletableFuture.runAsync(() -> {
try {
auditService.logUserRegistration(user.getId(), request.getIpAddress());
System.out.println("审计日志记录成功: " + user.getId());
} catch (Exception e) {
System.err.println("审计日志记录失败: " + e.getMessage());
}
}, executor);

// 5. 等待所有异步任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(emailTask, smsTask, auditTask);

// 6. 返回用户信息(不等待异步任务完成)
return CompletableFuture.completedFuture(user);
}

/**
* 用户信息查询异步处理
* 使用supplyAsync处理需要返回值的任务
*/
public CompletableFuture<UserProfile> getUserProfile(Long userId) {
return CompletableFuture
.supplyAsync(() -> {
// 1. 查询基础用户信息
User user = userRepository.findById(userId)
.orElseThrow(() -> new UserNotFoundException("用户不存在"));
return user;
}, executor)
.thenApply(user -> {
// 2. 查询用户统计信息
UserStatistics stats = getUserStatistics(userId);
return new UserProfile(user, stats);
})
.thenApply(profile -> {
// 3. 查询用户权限信息
List<Permission> permissions = getUserPermissions(userId);
profile.setPermissions(permissions);
return profile;
})
.thenApply(profile -> {
// 4. 查询用户最近活动
List<UserActivity> activities = getUserRecentActivities(userId);
profile.setRecentActivities(activities);
return profile;
})
.exceptionally(throwable -> {
System.err.println("获取用户信息失败: " + throwable.getMessage());
return new UserProfile(); // 返回空的用户档案
});
}

/**
* 并行处理多个用户查询
*/
public CompletableFuture<List<UserProfile>> getMultipleUserProfiles(List<Long> userIds) {
// 1. 为每个用户创建异步查询任务
List<CompletableFuture<UserProfile>> futures = userIds.stream()
.map(this::getUserProfile)
.collect(Collectors.toList());

// 2. 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

// 3. 收集所有结果
return allTasks.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}

private User createUser(UserRegistrationRequest request) {
// 创建用户的业务逻辑
User user = new User();
user.setName(request.getName());
user.setEmail(request.getEmail());
user.setPhone(request.getPhone());
user.setCreatedTime(LocalDateTime.now());
return userRepository.save(user);
}

private UserStatistics getUserStatistics(Long userId) {
// 查询用户统计信息的业务逻辑
return new UserStatistics();
}

private List<Permission> getUserPermissions(Long userId) {
// 查询用户权限的业务逻辑
return new ArrayList<>();
}

private List<UserActivity> getUserRecentActivities(Long userId) {
// 查询用户最近活动的业务逻辑
return new ArrayList<>();
}
}

3.2 订单处理异步流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@Service
public class OrderService {

@Autowired
private OrderRepository orderRepository;

@Autowired
private InventoryService inventoryService;

@Autowired
private PaymentService paymentService;

@Autowired
private NotificationService notificationService;

private final ExecutorService executor = Executors.newFixedThreadPool(15);

/**
* 订单创建异步处理
* 使用supplyAsync处理订单创建流程
*/
public CompletableFuture<Order> createOrder(OrderRequest request) {
return CompletableFuture
.supplyAsync(() -> {
// 1. 验证订单数据
validateOrderRequest(request);
return request;
}, executor)
.thenApply(validatedRequest -> {
// 2. 检查库存
boolean hasStock = inventoryService.checkStock(
validatedRequest.getProductId(),
validatedRequest.getQuantity()
);
if (!hasStock) {
throw new InsufficientStockException("库存不足");
}
return validatedRequest;
})
.thenApply(validatedRequest -> {
// 3. 创建订单
Order order = new Order();
order.setUserId(validatedRequest.getUserId());
order.setProductId(validatedRequest.getProductId());
order.setQuantity(validatedRequest.getQuantity());
order.setAmount(validatedRequest.getAmount());
order.setStatus(OrderStatus.PENDING);
order.setCreatedTime(LocalDateTime.now());

return orderRepository.save(order);
}, executor)
.thenApply(order -> {
// 4. 异步扣减库存
CompletableFuture.runAsync(() -> {
try {
inventoryService.deductStock(order.getProductId(), order.getQuantity());
System.out.println("库存扣减成功: " + order.getId());
} catch (Exception e) {
System.err.println("库存扣减失败: " + e.getMessage());
// 更新订单状态为失败
order.setStatus(OrderStatus.FAILED);
orderRepository.save(order);
}
}, executor);

return order;
})
.thenApply(order -> {
// 5. 异步发送订单确认通知
CompletableFuture.runAsync(() -> {
try {
notificationService.sendOrderConfirmation(order);
System.out.println("订单确认通知发送成功: " + order.getId());
} catch (Exception e) {
System.err.println("订单确认通知发送失败: " + e.getMessage());
}
}, executor);

return order;
})
.exceptionally(throwable -> {
System.err.println("订单创建失败: " + throwable.getMessage());
// 返回失败的订单或null
return null;
});
}

/**
* 订单支付异步处理
*/
public CompletableFuture<PaymentResult> processPayment(Long orderId, PaymentRequest request) {
return CompletableFuture
.supplyAsync(() -> {
// 1. 获取订单信息
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException("订单不存在"));
return order;
}, executor)
.thenApply(order -> {
// 2. 验证订单状态
if (order.getStatus() != OrderStatus.PENDING) {
throw new InvalidOrderStatusException("订单状态不正确");
}
return order;
})
.thenApply(order -> {
// 3. 处理支付
PaymentResult result = paymentService.processPayment(order, request);
return result;
}, executor)
.thenApply(paymentResult -> {
// 4. 更新订单状态
if (paymentResult.isSuccess()) {
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null) {
order.setStatus(OrderStatus.PAID);
order.setPaidTime(LocalDateTime.now());
orderRepository.save(order);

// 异步发送支付成功通知
CompletableFuture.runAsync(() -> {
try {
notificationService.sendPaymentSuccessNotification(order);
System.out.println("支付成功通知发送完成: " + orderId);
} catch (Exception e) {
System.err.println("支付成功通知发送失败: " + e.getMessage());
}
}, executor);
}
}
return paymentResult;
})
.exceptionally(throwable -> {
System.err.println("支付处理失败: " + throwable.getMessage());
return new PaymentResult(false, "支付处理失败: " + throwable.getMessage());
});
}

/**
* 批量订单处理
*/
public CompletableFuture<List<Order>> processBatchOrders(List<OrderRequest> requests) {
// 1. 为每个订单创建异步处理任务
List<CompletableFuture<Order>> futures = requests.stream()
.map(this::createOrder)
.collect(Collectors.toList());

// 2. 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

// 3. 收集所有结果
return allTasks.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull) // 过滤掉失败的订单
.collect(Collectors.toList())
);
}

private void validateOrderRequest(OrderRequest request) {
if (request.getUserId() == null) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (request.getProductId() == null) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (request.getQuantity() <= 0) {
throw new IllegalArgumentException("数量必须大于0");
}
if (request.getAmount() == null || request.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("金额必须大于0");
}
}
}

4. 高级用法和最佳实践

4.1 任务组合和协调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@Service
public class TaskCoordinationService {

private final ExecutorService executor = Executors.newFixedThreadPool(20);

/**
* thenCombine() - 组合两个异步任务的结果
*/
public CompletableFuture<String> combineTasks() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务1结果";
}, executor);

CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始执行");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务2结果";
}, executor);

// 组合两个任务的结果
return task1.thenCombine(task2, (result1, result2) -> {
System.out.println("组合结果: " + result1 + " + " + result2);
return result1 + " + " + result2;
});
}

/**
* thenCompose() - 链式组合异步任务
*/
public CompletableFuture<String> composeTasks() {
return CompletableFuture
.supplyAsync(() -> {
System.out.println("第一步:获取用户ID");
return 123L;
}, executor)
.thenCompose(userId -> {
// 基于用户ID获取用户信息
return CompletableFuture.supplyAsync(() -> {
System.out.println("第二步:获取用户信息,用户ID: " + userId);
return "用户信息_" + userId;
}, executor);
})
.thenCompose(userInfo -> {
// 基于用户信息获取用户权限
return CompletableFuture.supplyAsync(() -> {
System.out.println("第三步:获取用户权限,用户信息: " + userInfo);
return userInfo + "_权限信息";
}, executor);
});
}

/**
* allOf() - 等待所有任务完成
*/
public CompletableFuture<List<String>> waitForAllTasks() {
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务A完成";
}, executor),

CompletableFuture.supplyAsync(() -> {
System.out.println("任务B开始执行");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务B完成";
}, executor),

CompletableFuture.supplyAsync(() -> {
System.out.println("任务C开始执行");
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务C完成";
}, executor)
);

// 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);

// 收集所有结果
return allTasks.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}

/**
* anyOf() - 等待任意一个任务完成
*/
public CompletableFuture<String> waitForAnyTask() {
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> {
System.out.println("快速任务开始执行");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "快速任务完成";
}, executor),

CompletableFuture.supplyAsync(() -> {
System.out.println("慢速任务开始执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "慢速任务完成";
}, executor)
);

// 等待任意一个任务完成
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(
futures.toArray(new CompletableFuture[0])
);

return anyTask.thenApply(result -> (String) result);
}
}

4.2 异常处理和恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
@Service
public class ExceptionHandlingService {

private final ExecutorService executor = Executors.newFixedThreadPool(10);

/**
* handle() - 统一异常处理
*/
public CompletableFuture<String> handleExceptions() {
return CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行可能出错的任务");
if (Math.random() > 0.5) {
throw new RuntimeException("任务执行失败");
}
return "任务执行成功";
}, executor)
.handle((result, throwable) -> {
if (throwable != null) {
System.err.println("任务执行异常: " + throwable.getMessage());
return "任务执行失败,使用默认结果";
} else {
System.out.println("任务执行成功: " + result);
return result;
}
});
}

/**
* exceptionally() - 异常恢复
*/
public CompletableFuture<String> recoverFromException() {
return CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行可能出错的任务");
if (Math.random() > 0.5) {
throw new RuntimeException("网络连接失败");
}
return "数据获取成功";
}, executor)
.exceptionally(throwable -> {
System.err.println("任务执行异常: " + throwable.getMessage());
// 返回备用数据
return "使用缓存数据";
})
.thenApply(result -> {
System.out.println("处理结果: " + result);
return "处理后的结果: " + result;
});
}

/**
* whenComplete() - 完成时处理
*/
public CompletableFuture<String> whenComplete() {
return CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行任务");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成";
}, executor)
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("任务执行异常: " + throwable.getMessage());
} else {
System.out.println("任务执行成功: " + result);
}
// 无论成功还是失败,都执行清理工作
System.out.println("执行清理工作");
});
}

/**
* 重试机制
*/
public CompletableFuture<String> retryMechanism() {
return retryWithBackoff(() -> {
System.out.println("尝试执行任务");
if (Math.random() > 0.7) {
throw new RuntimeException("任务执行失败");
}
return "任务执行成功";
}, 3, 1000);
}

private CompletableFuture<String> retryWithBackoff(
Supplier<String> task,
int maxRetries,
long baseDelayMs) {

return CompletableFuture
.supplyAsync(task, executor)
.exceptionally(throwable -> {
if (maxRetries > 0) {
System.out.println("任务失败,准备重试,剩余重试次数: " + (maxRetries - 1));
try {
Thread.sleep(baseDelayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return retryWithBackoff(task, maxRetries - 1, baseDelayMs * 2).join();
} else {
System.err.println("重试次数用完,任务最终失败");
return "任务最终失败";
}
});
}
}

4.3 性能优化和监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@Service
public class PerformanceOptimizedService {

private final ExecutorService executor = Executors.newFixedThreadPool(20);
private final MeterRegistry meterRegistry;

public PerformanceOptimizedService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

/**
* 性能监控的异步任务
*/
public CompletableFuture<String> monitoredAsyncTask() {
Timer.Sample sample = Timer.start(meterRegistry);

return CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行监控任务");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成";
}, executor)
.whenComplete((result, throwable) -> {
// 记录执行时间
sample.stop(Timer.builder("async.task.duration")
.description("异步任务执行时间")
.register(meterRegistry));

// 记录成功/失败次数
if (throwable != null) {
Counter.builder("async.task.failures")
.description("异步任务失败次数")
.register(meterRegistry)
.increment();
} else {
Counter.builder("async.task.successes")
.description("异步任务成功次数")
.register(meterRegistry)
.increment();
}
});
}

/**
* 超时控制
*/
public CompletableFuture<String> timeoutControl() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行可能超时的任务");
try {
Thread.sleep(5000); // 模拟长时间任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成";
}, executor);

// 设置超时时间
CompletableFuture<String> timeoutFuture = future
.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
System.err.println("任务执行超时");
return "任务执行超时";
} else {
System.err.println("任务执行异常: " + throwable.getMessage());
return "任务执行异常";
}
});

return timeoutFuture;
}

/**
* 资源清理
*/
public CompletableFuture<String> resourceCleanup() {
return CompletableFuture
.supplyAsync(() -> {
System.out.println("开始执行任务");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务执行完成";
}, executor)
.whenComplete((result, throwable) -> {
// 执行资源清理
System.out.println("执行资源清理");
// 关闭连接、释放资源等
});
}
}

5. 配置和最佳实践

5.1 线程池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Configuration
@EnableAsync
public class AsyncConfig {

/**
* 自定义线程池配置
*/
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程数
executor.setCorePoolSize(10);

// 最大线程数
executor.setMaxPoolSize(50);

// 队列容量
executor.setQueueCapacity(200);

// 线程空闲时间
executor.setKeepAliveSeconds(60);

// 线程名前缀
executor.setThreadNamePrefix("async-task-");

// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待任务完成后关闭
executor.setWaitForTasksToCompleteOnShutdown(true);

// 等待时间
executor.setAwaitTerminationSeconds(60);

executor.initialize();
return executor;
}

/**
* 异步方法配置
*/
@Bean
public AsyncConfigurer asyncConfigurer() {
return new AsyncConfigurer() {
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.err.println("异步方法执行异常: " + method.getName());
System.err.println("异常信息: " + ex.getMessage());
ex.printStackTrace();
}
};
}
};
}
}

5.2 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@RestController
@RequestMapping("/api/async")
public class AsyncController {

@Autowired
private AsyncTaskService asyncTaskService;

@Autowired
private AsyncDataService asyncDataService;

@Autowired
private UserService userService;

/**
* 测试runAsync
*/
@PostMapping("/run-async")
public ResponseEntity<String> testRunAsync() {
try {
asyncTaskService.basicRunAsync();
return ResponseEntity.ok("runAsync测试完成");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("runAsync测试失败: " + e.getMessage());
}
}

/**
* 测试supplyAsync
*/
@PostMapping("/supply-async")
public ResponseEntity<String> testSupplyAsync() {
try {
asyncDataService.basicSupplyAsync();
return ResponseEntity.ok("supplyAsync测试完成");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("supplyAsync测试失败: " + e.getMessage());
}
}

/**
* 测试用户注册异步处理
*/
@PostMapping("/user-registration")
public ResponseEntity<User> testUserRegistration(@RequestBody UserRegistrationRequest request) {
try {
CompletableFuture<User> future = userService.registerUser(request);
User user = future.get(10, TimeUnit.SECONDS); // 设置超时时间
return ResponseEntity.ok(user);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(null);
}
}

/**
* 测试订单创建异步处理
*/
@PostMapping("/order-creation")
public ResponseEntity<Order> testOrderCreation(@RequestBody OrderRequest request) {
try {
CompletableFuture<Order> future = userService.createOrder(request);
Order order = future.get(15, TimeUnit.SECONDS); // 设置超时时间
return ResponseEntity.ok(order);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(null);
}
}
}

6. 总结

CompletableFuture是Java异步编程的强大工具,通过runAsync和supplyAsync可以轻松实现异步任务处理。关键要点包括:

6.1 核心优势

  1. 非阻塞: 异步执行不阻塞主线程
  2. 链式调用: 支持任务链式组合
  3. 异常处理: 完善的异常处理机制
  4. 任务组合: 支持多个异步任务的协调
  5. 性能提升: 提高系统并发处理能力

6.2 最佳实践

  1. 合理使用线程池: 避免创建过多线程
  2. 异常处理: 完善的异常处理和恢复机制
  3. 超时控制: 设置合理的超时时间
  4. 资源管理: 及时清理资源,避免内存泄漏
  5. 监控告警: 完善的监控和告警机制

通过合理使用CompletableFuture,可以显著提升系统的并发处理能力和用户体验。