第39集CompletableFuture异步编程runAsync与supplyAsync实战
|字数总计:5.1k|阅读时长:24分钟|阅读量:
1. CompletableFuture概述
CompletableFuture是Java 8引入的异步编程工具,它提供了强大的异步任务处理能力。相比传统的Future,CompletableFuture支持链式调用、异常处理和任务组合,是现代Java异步编程的核心组件。
1.1 核心特性
- 异步执行: 支持异步任务执行,不阻塞主线程
- 链式调用: 支持任务链式组合,提高代码可读性
- 异常处理: 完善的异常处理机制
- 任务组合: 支持多个异步任务的组合和协调
- 回调机制: 支持完成后的回调处理
1.2 主要方法分类
- 创建方法: runAsync(), supplyAsync(), completedFuture()
- 转换方法: thenApply(), thenCompose(), handle()
- 消费方法: thenAccept(), thenRun(), whenComplete()
- 组合方法: thenCombine(), allOf(), anyOf()
- 异常处理: exceptionally(), handle()
2. 基础用法详解
2.1 runAsync() - 无返回值异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| @Service
public class AsyncTaskService {

    /** Fixed pool of 10 threads for the tasks that are given an explicit executor. */
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * Launches two independent no-result tasks and blocks until both are done.
     *
     * <p>The first task is started without an executor argument, the second one
     * is submitted to {@link #executor}. Any failure while waiting is printed.
     */
    public void basicRunAsync() {
        Runnable firstJob = () -> {
            System.out.println("任务1开始执行,线程: " + Thread.currentThread().getName());
            pause(2000);
            System.out.println("任务1执行完成");
        };
        Runnable secondJob = () -> {
            System.out.println("任务2开始执行,线程: " + Thread.currentThread().getName());
            pause(1500);
            System.out.println("任务2执行完成");
        };

        CompletableFuture<Void> withoutExecutor = CompletableFuture.runAsync(firstJob);
        CompletableFuture<Void> withExecutor = CompletableFuture.runAsync(secondJob, executor);

        try {
            withoutExecutor.get();
            withExecutor.get();
            System.out.println("所有任务执行完成");
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Runs three steps one after another with {@code thenRun} and blocks until
     * the last step has finished.
     */
    public void chainedRunAsync() {
        CompletableFuture<Void> preprocessed = CompletableFuture.runAsync(() -> {
            System.out.println("第一步:数据预处理");
            pause(1000);
        });
        CompletableFuture<Void> validated = preprocessed.thenRun(() -> {
            System.out.println("第二步:数据验证");
            pause(800);
        });
        CompletableFuture<Void> saved = validated.thenRun(() -> {
            System.out.println("第三步:数据保存");
            pause(600);
        });

        try {
            saved.get();
            System.out.println("链式任务执行完成");
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Runs a task that throws when {@code Math.random() > 0.5}, reports the
     * failure in {@code exceptionally}, then runs a follow-up step either way.
     */
    public void runAsyncWithExceptionHandling() {
        CompletableFuture<Void> risky = CompletableFuture.runAsync(() -> {
            System.out.println("开始执行可能出错的任务");
            if (Math.random() > 0.5) {
                throw new RuntimeException("任务执行失败");
            }
            System.out.println("任务执行成功");
        });

        // exceptionally() turns the failure into a normal (null) completion,
        // so the thenRun() stage below executes in both outcomes.
        CompletableFuture<Void> recovered = risky.exceptionally(cause -> {
            System.err.println("任务执行异常: " + cause.getMessage());
            return null;
        });
        CompletableFuture<Void> cleanedUp =
                recovered.thenRun(() -> System.out.println("异常处理后的清理工作"));

        try {
            cleanedUp.get();
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /** Sleeps for {@code millis}; on interruption restores the interrupt flag and returns. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
|
2.2 supplyAsync() - 有返回值异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| @Service
public class AsyncDataService {

    /** Fixed pool of 10 threads for the tasks that are given an explicit executor. */
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * Starts two value-producing tasks (a String and an Integer), waits for
     * both and prints their results.
     */
    public void basicSupplyAsync() {
        CompletableFuture<String> query = CompletableFuture.supplyAsync(() -> {
            System.out.println("数据查询任务开始,线程: " + Thread.currentThread().getName());
            pause(2000);
            return "查询结果1";
        });
        CompletableFuture<Integer> computation = CompletableFuture.supplyAsync(() -> {
            System.out.println("数据计算任务开始,线程: " + Thread.currentThread().getName());
            pause(1500);
            return 42;
        }, executor);

        try {
            String queryResult = query.get();
            Integer computed = computation.get();
            System.out.println("任务1结果: " + queryResult);
            System.out.println("任务2结果: " + computed);
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Produces a value and pushes it through three {@code thenApply}
     * transformations, then prints the final String.
     */
    public void chainedSupplyAsync() {
        CompletableFuture<String> raw = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一步:获取原始数据");
            return "原始数据";
        });
        CompletableFuture<String> upperCased = raw.thenApply(text -> {
            System.out.println("第二步:数据转换");
            return text.toUpperCase();
        });
        CompletableFuture<String> formatted = upperCased.thenApply(text -> {
            System.out.println("第三步:数据格式化");
            return "格式化后的数据: " + text;
        });
        CompletableFuture<String> verified = formatted.thenApply(text -> {
            System.out.println("第四步:数据验证");
            // Only values longer than 10 characters receive the verified marker.
            return text.length() > 10 ? text + " [已验证]" : text;
        });

        try {
            String outcome = verified.get();
            System.out.println("最终结果: " + outcome);
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Runs a supplier that throws when {@code Math.random() > 0.5}, substitutes
     * a default value on failure and post-processes whichever value results.
     */
    public void supplyAsyncWithExceptionHandling() {
        CompletableFuture<String> fetched = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行可能出错的任务");
            if (Math.random() > 0.5) {
                throw new RuntimeException("数据获取失败");
            }
            return "成功获取数据";
        });
        CompletableFuture<String> withFallback = fetched.exceptionally(cause -> {
            System.err.println("任务执行异常: " + cause.getMessage());
            return "默认数据";
        });
        CompletableFuture<String> processed = withFallback.thenApply(value -> {
            System.out.println("处理数据: " + value);
            return "处理后的数据: " + value;
        });

        try {
            String outcome = processed.get();
            System.out.println("最终结果: " + outcome);
        } catch (InterruptedException | ExecutionException failure) {
            failure.printStackTrace();
        }
    }

    /** Sleeps for {@code millis}; on interruption restores the interrupt flag and returns. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
|
3. 实际应用场景
3.1 用户服务异步处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| @Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EmailService emailService;

    @Autowired
    private SmsService smsService;

    @Autowired
    private AuditService auditService;

    /** Fixed pool of 20 threads used for every asynchronous stage started by this service. */
    private final ExecutorService executor = Executors.newFixedThreadPool(20);

    /**
     * Creates the user synchronously, then starts the welcome e-mail, the
     * registration SMS and the audit log entry as fire-and-forget tasks.
     *
     * <p>The returned future is already completed with the saved user; it does
     * NOT wait for the three side tasks. Each side task catches and prints its
     * own failure.
     *
     * @param request registration data (name, e-mail, phone, IP address)
     * @return an already-completed future holding the saved user
     */
    public CompletableFuture<User> registerUser(UserRegistrationRequest request) {
        User user = createUser(request);

        CompletableFuture.runAsync(() -> {
            try {
                emailService.sendWelcomeEmail(user.getEmail(), user.getName());
                System.out.println("欢迎邮件发送成功: " + user.getEmail());
            } catch (Exception e) {
                System.err.println("欢迎邮件发送失败: " + e.getMessage());
            }
        }, executor);

        CompletableFuture.runAsync(() -> {
            try {
                smsService.sendRegistrationSms(user.getPhone());
                System.out.println("注册短信发送成功: " + user.getPhone());
            } catch (Exception e) {
                System.err.println("注册短信发送失败: " + e.getMessage());
            }
        }, executor);

        CompletableFuture.runAsync(() -> {
            try {
                auditService.logUserRegistration(user.getId(), request.getIpAddress());
                System.out.println("审计日志记录成功: " + user.getId());
            } catch (Exception e) {
                System.err.println("审计日志记录失败: " + e.getMessage());
            }
        }, executor);

        // No allOf(...) here: nothing ever awaited the combined future, so it
        // only suggested a wait that never happened.
        return CompletableFuture.completedFuture(user);
    }

    /**
     * Loads a user and enriches the profile step by step with statistics,
     * permissions and recent activities.
     *
     * <p>Any failure along the chain is printed and replaced by an empty
     * {@code UserProfile}, so the returned future does not complete exceptionally.
     *
     * @param userId id of the user to load
     * @return future with the assembled profile, or an empty profile on failure
     */
    public CompletableFuture<UserProfile> getUserProfile(Long userId) {
        return CompletableFuture
                .supplyAsync(() -> userRepository.findById(userId)
                        .orElseThrow(() -> new UserNotFoundException("用户不存在")), executor)
                .thenApply(user -> {
                    UserStatistics stats = getUserStatistics(userId);
                    return new UserProfile(user, stats);
                })
                .thenApply(profile -> {
                    List<Permission> permissions = getUserPermissions(userId);
                    profile.setPermissions(permissions);
                    return profile;
                })
                .thenApply(profile -> {
                    List<UserActivity> activities = getUserRecentActivities(userId);
                    profile.setRecentActivities(activities);
                    return profile;
                })
                .exceptionally(throwable -> {
                    System.err.println("获取用户信息失败: " + throwable.getMessage());
                    return new UserProfile();
                });
    }

    /**
     * Loads several profiles concurrently via {@link #getUserProfile(Long)}.
     *
     * @param userIds ids to load
     * @return future with the profiles in the same order as {@code userIds}
     */
    public CompletableFuture<List<UserProfile>> getMultipleUserProfiles(List<Long> userIds) {
        List<CompletableFuture<UserProfile>> futures = userIds.stream()
                .map(this::getUserProfile)
                .collect(Collectors.toList());

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );

        // join() cannot block here: allTasks completes only after every
        // element of futures has completed.
        return allTasks.thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );
    }

    /** Copies the request fields into a new user, stamps the creation time and saves it. */
    private User createUser(UserRegistrationRequest request) {
        User user = new User();
        user.setName(request.getName());
        user.setEmail(request.getEmail());
        user.setPhone(request.getPhone());
        user.setCreatedTime(LocalDateTime.now());
        return userRepository.save(user);
    }

    /** Placeholder: returns empty statistics. */
    private UserStatistics getUserStatistics(Long userId) {
        return new UserStatistics();
    }

    /** Placeholder: returns an empty, mutable permission list. */
    private List<Permission> getUserPermissions(Long userId) {
        return new ArrayList<>();
    }

    /** Placeholder: returns an empty, mutable activity list. */
    private List<UserActivity> getUserRecentActivities(Long userId) {
        return new ArrayList<>();
    }
}
|
3.2 订单处理异步流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| @Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private NotificationService notificationService;

    /** Fixed pool of 15 threads used for every asynchronous stage started by this service. */
    private final ExecutorService executor = Executors.newFixedThreadPool(15);

    /**
     * Validates the request, checks stock, saves a PENDING order and then
     * starts stock deduction and the confirmation notification as
     * fire-and-forget tasks.
     *
     * <p>Any failure in validation, stock check or saving is printed and
     * mapped to a {@code null} result; the returned future does not complete
     * exceptionally.
     *
     * @param request order data to validate and persist
     * @return future with the saved order, or {@code null} when creation failed
     */
    public CompletableFuture<Order> createOrder(OrderRequest request) {
        return CompletableFuture
                .supplyAsync(() -> {
                    validateOrderRequest(request);
                    return request;
                }, executor)
                .thenApply(validatedRequest -> {
                    boolean hasStock = inventoryService.checkStock(
                            validatedRequest.getProductId(),
                            validatedRequest.getQuantity()
                    );
                    if (!hasStock) {
                        throw new InsufficientStockException("库存不足");
                    }
                    return validatedRequest;
                })
                // thenApply has no executor overload; the Async variant is the
                // one that accepts the pool to run this stage on.
                .thenApplyAsync(validatedRequest -> {
                    Order order = new Order();
                    order.setUserId(validatedRequest.getUserId());
                    order.setProductId(validatedRequest.getProductId());
                    order.setQuantity(validatedRequest.getQuantity());
                    order.setAmount(validatedRequest.getAmount());
                    order.setStatus(OrderStatus.PENDING);
                    order.setCreatedTime(LocalDateTime.now());
                    return orderRepository.save(order);
                }, executor)
                .thenApply(order -> {
                    // Fire-and-forget: the order is returned before deduction finishes.
                    CompletableFuture.runAsync(() -> {
                        try {
                            inventoryService.deductStock(order.getProductId(), order.getQuantity());
                            System.out.println("库存扣减成功: " + order.getId());
                        } catch (Exception e) {
                            System.err.println("库存扣减失败: " + e.getMessage());
                            order.setStatus(OrderStatus.FAILED);
                            orderRepository.save(order);
                        }
                    }, executor);
                    return order;
                })
                .thenApply(order -> {
                    // Fire-and-forget: a failed notification does not fail the order.
                    CompletableFuture.runAsync(() -> {
                        try {
                            notificationService.sendOrderConfirmation(order);
                            System.out.println("订单确认通知发送成功: " + order.getId());
                        } catch (Exception e) {
                            System.err.println("订单确认通知发送失败: " + e.getMessage());
                        }
                    }, executor);
                    return order;
                })
                .exceptionally(throwable -> {
                    System.err.println("订单创建失败: " + throwable.getMessage());
                    return null;
                });
    }

    /**
     * Loads the order, requires it to be PENDING, runs the payment and, on
     * success, marks the order PAID and sends a notification asynchronously.
     *
     * <p>Any failure is printed and mapped to a failed {@code PaymentResult}.
     *
     * @param orderId id of the order to pay
     * @param request payment details passed to the payment service
     * @return future with the payment result; never completes exceptionally
     */
    public CompletableFuture<PaymentResult> processPayment(Long orderId, PaymentRequest request) {
        return CompletableFuture
                .supplyAsync(() -> orderRepository.findById(orderId)
                        .orElseThrow(() -> new OrderNotFoundException("订单不存在")), executor)
                .thenApply(order -> {
                    if (order.getStatus() != OrderStatus.PENDING) {
                        throw new InvalidOrderStatusException("订单状态不正确");
                    }
                    return order;
                })
                // thenApply has no executor overload; use the Async variant.
                .thenApplyAsync(order -> paymentService.processPayment(order, request), executor)
                .thenApply(paymentResult -> {
                    if (paymentResult.isSuccess()) {
                        Order order = orderRepository.findById(orderId).orElse(null);
                        if (order != null) {
                            order.setStatus(OrderStatus.PAID);
                            order.setPaidTime(LocalDateTime.now());
                            orderRepository.save(order);

                            CompletableFuture.runAsync(() -> {
                                try {
                                    notificationService.sendPaymentSuccessNotification(order);
                                    System.out.println("支付成功通知发送完成: " + orderId);
                                } catch (Exception e) {
                                    System.err.println("支付成功通知发送失败: " + e.getMessage());
                                }
                            }, executor);
                        }
                    }
                    return paymentResult;
                })
                .exceptionally(throwable -> {
                    System.err.println("支付处理失败: " + throwable.getMessage());
                    return new PaymentResult(false, "支付处理失败: " + throwable.getMessage());
                });
    }

    /**
     * Creates several orders concurrently via {@link #createOrder(OrderRequest)}.
     *
     * @param requests order requests to process
     * @return future with the successfully created orders; failed ones
     *         ({@code null} results) are filtered out
     */
    public CompletableFuture<List<Order>> processBatchOrders(List<OrderRequest> requests) {
        List<CompletableFuture<Order>> futures = requests.stream()
                .map(this::createOrder)
                .collect(Collectors.toList());

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );

        return allTasks.thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList())
        );
    }

    /**
     * Rejects requests with a missing user or product id, a non-positive
     * quantity, or a missing or non-positive amount.
     *
     * @throws IllegalArgumentException when any of these checks fails
     */
    private void validateOrderRequest(OrderRequest request) {
        if (request.getUserId() == null) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (request.getProductId() == null) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (request.getQuantity() <= 0) {
            throw new IllegalArgumentException("数量必须大于0");
        }
        if (request.getAmount() == null || request.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("金额必须大于0");
        }
    }
}
|
4. 高级用法和最佳实践
4.1 任务组合和协调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| @Service
public class TaskCoordinationService {

    /** Fixed pool of 20 threads on which every task of this service is started. */
    private final ExecutorService executor = Executors.newFixedThreadPool(20);

    /**
     * Runs two tasks in parallel and merges their results with {@code thenCombine}.
     *
     * @return future with both results joined by {@code " + "}
     */
    public CompletableFuture<String> combineTasks() {
        CompletableFuture<String> first = sleepingTask("任务1开始执行", 1000, "任务1结果");
        CompletableFuture<String> second = sleepingTask("任务2开始执行", 1500, "任务2结果");

        return first.thenCombine(second, (left, right) -> {
            System.out.println("组合结果: " + left + " + " + right);
            return left + " + " + right;
        });
    }

    /**
     * Chains three dependent asynchronous steps with {@code thenCompose}:
     * user id, then user info, then permissions.
     *
     * @return future with the String produced by the last step
     */
    public CompletableFuture<String> composeTasks() {
        CompletableFuture<Long> userIdStage = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一步:获取用户ID");
            return 123L;
        }, executor);

        return userIdStage
                .thenCompose(this::fetchUserInfo)
                .thenCompose(this::fetchPermissions);
    }

    /**
     * Starts tasks A, B and C and waits for all of them with {@code allOf}.
     *
     * @return future with the three results in A, B, C order
     */
    public CompletableFuture<List<String>> waitForAllTasks() {
        List<CompletableFuture<String>> pending = Arrays.asList(
                sleepingTask("任务A开始执行", 1000, "任务A完成"),
                sleepingTask("任务B开始执行", 1500, "任务B完成"),
                sleepingTask("任务C开始执行", 800, "任务C完成")
        );

        CompletableFuture<Void> barrier = CompletableFuture.allOf(
                pending.toArray(new CompletableFuture[0])
        );

        // Every element is already complete when the barrier fires, so join() returns at once.
        return barrier.thenApply(ignored -> pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );
    }

    /**
     * Starts a fast and a slow task and completes with whichever finishes
     * first, using {@code anyOf}.
     *
     * @return future with the result of the first task to complete
     */
    public CompletableFuture<String> waitForAnyTask() {
        List<CompletableFuture<String>> racers = Arrays.asList(
                sleepingTask("快速任务开始执行", 500, "快速任务完成"),
                sleepingTask("慢速任务开始执行", 2000, "慢速任务完成")
        );

        CompletableFuture<Object> winner = CompletableFuture.anyOf(
                racers.toArray(new CompletableFuture[0])
        );

        // anyOf() is typed as Object; all racers produce Strings, so the cast is safe.
        return winner.thenApply(value -> (String) value);
    }

    /** Second compose step: derives the user-info String from the id on the pool. */
    private CompletableFuture<String> fetchUserInfo(Long userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("第二步:获取用户信息,用户ID: " + userId);
            return "用户信息_" + userId;
        }, executor);
    }

    /** Third compose step: appends the permission suffix to the user info on the pool. */
    private CompletableFuture<String> fetchPermissions(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("第三步:获取用户权限,用户信息: " + userInfo);
            return userInfo + "_权限信息";
        }, executor);
    }

    /**
     * Starts a task on the pool that prints {@code startMessage}, sleeps for
     * {@code sleepMillis} and then yields {@code result}.
     */
    private CompletableFuture<String> sleepingTask(String startMessage, long sleepMillis, String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(startMessage);
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            return result;
        }, executor);
    }
}
|
4.2 异常处理和恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| @Service
public class ExceptionHandlingService {

    /** Fixed pool of 10 threads on which every task of this service is started. */
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * Runs a task that throws when {@code Math.random() > 0.5} and uses
     * {@code handle} to map both outcomes to a String.
     *
     * @return future with the task result, or a default text when the task failed
     */
    public CompletableFuture<String> handleExceptions() {
        return CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("开始执行可能出错的任务");
                    if (Math.random() > 0.5) {
                        throw new RuntimeException("任务执行失败");
                    }
                    return "任务执行成功";
                }, executor)
                .handle((result, throwable) -> {
                    if (throwable != null) {
                        System.err.println("任务执行异常: " + throwable.getMessage());
                        return "任务执行失败,使用默认结果";
                    } else {
                        System.out.println("任务执行成功: " + result);
                        return result;
                    }
                });
    }

    /**
     * Runs a task that throws when {@code Math.random() > 0.5}, falls back to
     * a cached-data placeholder on failure and post-processes the value.
     *
     * @return future with the processed result (real or fallback)
     */
    public CompletableFuture<String> recoverFromException() {
        return CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("开始执行可能出错的任务");
                    if (Math.random() > 0.5) {
                        throw new RuntimeException("网络连接失败");
                    }
                    return "数据获取成功";
                }, executor)
                .exceptionally(throwable -> {
                    System.err.println("任务执行异常: " + throwable.getMessage());
                    return "使用缓存数据";
                })
                .thenApply(result -> {
                    System.out.println("处理结果: " + result);
                    return "处理后的结果: " + result;
                });
    }

    /**
     * Runs a one-second task and attaches a {@code whenComplete} callback that
     * reports the outcome and prints a cleanup message.
     *
     * @return future with the task result
     */
    public CompletableFuture<String> whenComplete() {
        return CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("开始执行任务");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "任务执行完成";
                }, executor)
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        System.err.println("任务执行异常: " + throwable.getMessage());
                    } else {
                        System.out.println("任务执行成功: " + result);
                    }
                    System.out.println("执行清理工作");
                });
    }

    /**
     * Runs a task that throws when {@code Math.random() > 0.7}, retrying up to
     * three times with a delay that starts at 1000 ms and doubles each time.
     *
     * @return future with the task result, or a failure text once retries are used up
     */
    public CompletableFuture<String> retryMechanism() {
        return retryWithBackoff(() -> {
            System.out.println("尝试执行任务");
            if (Math.random() > 0.7) {
                throw new RuntimeException("任务执行失败");
            }
            return "任务执行成功";
        }, 3, 1000);
    }

    /**
     * Runs {@code task} on the pool and schedules another attempt after
     * {@code baseDelayMs} when it fails, doubling the delay each time.
     *
     * <p>The wait and the retry are chained with a delayed executor and
     * {@code thenCompose} rather than {@code Thread.sleep} plus {@code join()},
     * so no pool thread is blocked while a retry is pending.
     *
     * @param task        work to attempt
     * @param maxRetries  retries still allowed after this attempt
     * @param baseDelayMs delay before the next attempt, in milliseconds
     * @return future with the task result, or the final-failure text
     */
    private CompletableFuture<String> retryWithBackoff(
            Supplier<String> task, int maxRetries, long baseDelayMs) {
        return CompletableFuture
                .supplyAsync(task, executor)
                .<CompletableFuture<String>>handle((result, throwable) -> {
                    if (throwable == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (maxRetries <= 0) {
                        System.err.println("重试次数用完,任务最终失败");
                        return CompletableFuture.completedFuture("任务最终失败");
                    }
                    System.out.println("任务失败,准备重试,剩余重试次数: " + (maxRetries - 1));
                    // An empty task on the delayed executor acts as a non-blocking timer.
                    return CompletableFuture
                            .runAsync(() -> { }, CompletableFuture.delayedExecutor(
                                    baseDelayMs, java.util.concurrent.TimeUnit.MILLISECONDS, executor))
                            .thenCompose(ignored ->
                                    retryWithBackoff(task, maxRetries - 1, baseDelayMs * 2));
                })
                // Flatten the nested future produced by handle().
                .thenCompose(next -> next);
    }
}
|
4.3 性能优化和监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| @Service
public class PerformanceOptimizedService {

    /** Fixed pool of 20 threads on which every task of this service is started. */
    private final ExecutorService executor = Executors.newFixedThreadPool(20);

    /** Registry that receives the timer and counters recorded by this service. */
    private final MeterRegistry meterRegistry;

    public PerformanceOptimizedService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Runs a one-second task and records its duration plus a success or
     * failure count in the meter registry.
     *
     * @return future with the task result
     */
    public CompletableFuture<String> monitoredAsyncTask() {
        Timer.Sample stopwatch = Timer.start(meterRegistry);

        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行监控任务");
            pause(1000);
            return "任务执行完成";
        }, executor);

        return work.whenComplete((value, failure) -> recordOutcome(stopwatch, failure));
    }

    /**
     * Starts a five-second task and caps it at three seconds with
     * {@code orTimeout}; a timeout or any other failure becomes a descriptive String.
     *
     * @return future with the task result or a failure description
     */
    public CompletableFuture<String> timeoutControl() {
        CompletableFuture<String> slowWork = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行可能超时的任务");
            pause(5000);
            return "任务执行完成";
        }, executor);

        return slowWork
                .orTimeout(3, TimeUnit.SECONDS)
                .exceptionally(this::describeFailure);
    }

    /**
     * Runs a one-second task and prints a cleanup message once it completes.
     *
     * @return future with the task result
     */
    public CompletableFuture<String> resourceCleanup() {
        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行任务");
            pause(1000);
            return "任务执行完成";
        }, executor);

        return work.whenComplete((value, failure) -> System.out.println("执行资源清理"));
    }

    /** Stops the timer sample, then bumps the failure or success counter. */
    private void recordOutcome(Timer.Sample stopwatch, Throwable failure) {
        stopwatch.stop(Timer.builder("async.task.duration")
                .description("异步任务执行时间")
                .register(meterRegistry));

        if (failure == null) {
            Counter.builder("async.task.successes")
                    .description("异步任务成功次数")
                    .register(meterRegistry)
                    .increment();
            return;
        }
        Counter.builder("async.task.failures")
                .description("异步任务失败次数")
                .register(meterRegistry)
                .increment();
    }

    /** Maps a failure to its fallback text, treating {@code TimeoutException} separately. */
    private String describeFailure(Throwable failure) {
        if (!(failure instanceof TimeoutException)) {
            System.err.println("任务执行异常: " + failure.getMessage());
            return "任务执行异常";
        }
        System.err.println("任务执行超时");
        return "任务执行超时";
    }

    /** Sleeps for {@code millis}; on interruption restores the interrupt flag and returns. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
|
5. 配置和最佳实践
5.1 线程池配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * Builds the thread pool exposed as the {@code taskExecutor} bean:
     * 10 core / 50 max threads, a queue of 200, 60 s keep-alive, and
     * caller-runs as the rejection policy. On shutdown it waits up to 60 s
     * for queued tasks to complete.
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Sizing.
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(50);
        pool.setQueueCapacity(200);
        pool.setKeepAliveSeconds(60);

        // Naming and saturation behaviour.
        pool.setThreadNamePrefix("async-task-");
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Shutdown behaviour.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);

        pool.initialize();
        return pool;
    }

    /**
     * Supplies the executor above as the async executor and installs a handler
     * that prints uncaught exceptions from async methods.
     */
    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {

            @Override
            public Executor getAsyncExecutor() {
                return taskExecutor();
            }

            @Override
            public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                return (ex, method, params) -> {
                    System.err.println("异步方法执行异常: " + method.getName());
                    System.err.println("异常信息: " + ex.getMessage());
                    ex.printStackTrace();
                };
            }
        };
    }
}
|
5.2 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @RestController
@RequestMapping("/api/async")
public class AsyncController {

    @Autowired
    private AsyncTaskService asyncTaskService;

    @Autowired
    private AsyncDataService asyncDataService;

    @Autowired
    private UserService userService;

    /** Needed by the order endpoint: createOrder is defined on OrderService. */
    @Autowired
    private OrderService orderService;

    /** Runs the basic {@code runAsync} demo and reports whether it finished. */
    @PostMapping("/run-async")
    public ResponseEntity<String> testRunAsync() {
        try {
            asyncTaskService.basicRunAsync();
            return ResponseEntity.ok("runAsync测试完成");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("runAsync测试失败: " + e.getMessage());
        }
    }

    /** Runs the basic {@code supplyAsync} demo and reports whether it finished. */
    @PostMapping("/supply-async")
    public ResponseEntity<String> testSupplyAsync() {
        try {
            asyncDataService.basicSupplyAsync();
            return ResponseEntity.ok("supplyAsync测试完成");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("supplyAsync测试失败: " + e.getMessage());
        }
    }

    /**
     * Registers a user and waits up to 10 seconds for the result.
     *
     * @return 200 with the created user, or 500 with an empty body on any failure
     */
    @PostMapping("/user-registration")
    public ResponseEntity<User> testUserRegistration(@RequestBody UserRegistrationRequest request) {
        try {
            CompletableFuture<User> future = userService.registerUser(request);
            User user = future.get(10, TimeUnit.SECONDS);
            return ResponseEntity.ok(user);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(null);
        }
    }

    /**
     * Creates an order and waits up to 15 seconds for the result.
     *
     * @return 200 with the created order, or 500 with an empty body when
     *         creation failed or timed out
     */
    @PostMapping("/order-creation")
    public ResponseEntity<Order> testOrderCreation(@RequestBody OrderRequest request) {
        try {
            CompletableFuture<Order> future = orderService.createOrder(request);
            Order order = future.get(15, TimeUnit.SECONDS);
            if (order == null) {
                // createOrder reports every failure as a null result, not an exception.
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                        .body(null);
            }
            return ResponseEntity.ok(order);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(null);
        }
    }
}
|
6. 总结
CompletableFuture是Java异步编程的强大工具,通过runAsync和supplyAsync可以轻松实现异步任务处理。关键要点包括:
6.1 核心优势
- 非阻塞: 异步执行不阻塞主线程
- 链式调用: 支持任务链式组合
- 异常处理: 完善的异常处理机制
- 任务组合: 支持多个异步任务的协调
- 性能提升: 提高系统并发处理能力
6.2 最佳实践
- 合理使用线程池: 避免创建过多线程
- 异常处理: 完善的异常处理和恢复机制
- 超时控制: 设置合理的超时时间(注意: orTimeout()需要Java 9及以上; 在Java 8中可使用带超时参数的get(timeout, unit))
- 资源管理: 及时清理资源,避免内存泄漏
- 监控告警: 完善的监控和告警机制
通过合理使用CompletableFuture,可以显著提升系统的并发处理能力和用户体验。