分布式事务有哪些方案?适用场景?
1. 概述
1.1 分布式事务的重要性
分布式事务是分布式系统设计的核心问题之一,需要保证跨多个服务或数据库的操作要么全部成功,要么全部失败。
分布式事务的挑战:
- 网络分区:网络可能断开
- 节点故障:节点可能宕机
- 数据一致性:保证数据的一致性
- 性能影响:事务可能影响性能
1.2 分布式事务方案分类
分布式事务方案主要分为两类:
- 强一致性方案:2PC、3PC、TCC
- 最终一致性方案:Saga、本地消息表、事务消息
1.3 本文内容结构
本文将从以下几个方面全面解析分布式事务方案:
- 分布式事务概述:定义、挑战、分类
- 强一致性方案:2PC、3PC、TCC
- 最终一致性方案:Saga、本地消息表、事务消息
- 方案对比:优缺点、适用场景
- 实战案例:实际项目中的分布式事务实现
2. 分布式事务概述
2.1 分布式事务定义
2.1.1 什么是分布式事务
分布式事务:跨多个服务或数据库的事务,需要保证ACID特性。
特点:
- 跨多个服务
- 跨多个数据库
- 需要保证一致性
- 性能影响较大
2.1.2 分布式事务的挑战
挑战:
- 网络延迟:网络通信延迟
- 网络分区:网络可能断开
- 节点故障:节点可能宕机
- 数据一致性:保证数据的一致性
- 性能影响:事务可能影响性能
2.2 分布式事务分类
2.2.1 强一致性方案
强一致性方案:保证数据立即一致。
特点:
方案:
- 2PC:两阶段提交
- 3PC:三阶段提交
- TCC:Try-Confirm-Cancel
2.2.2 最终一致性方案
最终一致性方案:保证数据最终一致。
特点:
方案:
- Saga:长事务模式
- 本地消息表:本地消息表模式
- 事务消息:消息队列事务消息
3. 强一致性方案
3.1 两阶段提交(2PC)
3.1.1 原理
两阶段提交(2PC):协调者协调所有参与者完成事务。
阶段1:准备阶段(Prepare)
- 协调者向所有参与者发送Prepare请求
- 参与者执行事务,但不提交
- 参与者返回Yes/No
阶段2:提交阶段(Commit)
- 如果所有参与者都返回Yes,协调者发送Commit
- 如果任何参与者返回No,协调者发送Abort
- 参与者提交或回滚事务
3.1.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| @Service public class TwoPhaseCommitCoordinator { @Autowired private List<TransactionParticipant> participants; public boolean executeTransaction(Transaction transaction) { List<Boolean> prepareResults = new ArrayList<>(); for (TransactionParticipant participant : participants) { try { boolean prepared = participant.prepare(transaction); prepareResults.add(prepared); } catch (Exception e) { prepareResults.add(false); } } boolean allPrepared = prepareResults.stream().allMatch(result -> result); if (allPrepared) { for (TransactionParticipant participant : participants) { try { participant.commit(transaction); } catch (Exception e) { log.error("Participant commit failed", e); } } return true; } else { for (TransactionParticipant participant : participants) { try { participant.rollback(transaction); } catch (Exception e) { log.error("Participant rollback failed", e); } } return false; } } }
@Component public class TransactionParticipant { @Autowired private DataSource dataSource; public boolean prepare(Transaction transaction) { Connection conn = null; try { conn = dataSource.getConnection(); conn.setAutoCommit(false); executeTransactionOperations(conn, transaction); return true; } catch (Exception e) { log.error("Prepare failed", e); return false; } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { log.error("Close connection failed", e); } } } } public void commit(Transaction transaction) { Connection conn = null; try { conn = dataSource.getConnection(); conn.setAutoCommit(false); executeTransactionOperations(conn, transaction); conn.commit(); } catch (Exception e) { log.error("Commit failed", e); if (conn != null) { try { conn.rollback(); } catch (SQLException ex) { log.error("Rollback failed", ex); } } throw new RuntimeException("Commit failed", e); } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { log.error("Close connection failed", e); } } } } public void rollback(Transaction transaction) { log.info("Rollback transaction: {}", transaction.getId()); } private void executeTransactionOperations(Connection conn, Transaction transaction) throws SQLException { } }
|
3.1.3 优缺点
优点:
缺点:
- 性能较低:需要两轮网络通信
- 阻塞时间长:参与者需要等待协调者
- 单点故障:协调者故障会导致整个事务失败
- 数据锁定:准备阶段数据被锁定,影响并发
3.1.4 适用场景
适用场景:
- 对一致性要求极高的场景
- 参与方较少的场景(2-3个)
- 网络延迟较低的场景
- 可以接受性能损失的场景
不适用场景:
3.2 三阶段提交(3PC)
3.2.1 原理
三阶段提交(3PC):在2PC基础上增加CanCommit阶段,减少阻塞。
阶段1:CanCommit
- 协调者向所有参与者发送CanCommit请求
- 参与者检查是否可以提交
- 参与者返回Yes/No
阶段2:PreCommit
- 如果所有参与者都返回Yes,协调者发送PreCommit
- 参与者执行事务,但不提交
- 参与者返回Ack
阶段3:DoCommit
- 协调者发送DoCommit
- 参与者提交事务
- 参与者返回Ack
3.2.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| @Service public class ThreePhaseCommitCoordinator { @Autowired private List<TransactionParticipant> participants; public boolean executeTransaction(Transaction transaction) { List<Boolean> canCommitResults = new ArrayList<>(); for (TransactionParticipant participant : participants) { try { boolean canCommit = participant.canCommit(transaction); canCommitResults.add(canCommit); } catch (Exception e) { canCommitResults.add(false); } } boolean allCanCommit = canCommitResults.stream().allMatch(result -> result); if (!allCanCommit) { for (TransactionParticipant participant : participants) { participant.abort(transaction); } return false; } List<Boolean> preCommitResults = new ArrayList<>(); for (TransactionParticipant participant : participants) { try { boolean preCommitted = participant.preCommit(transaction); preCommitResults.add(preCommitted); } catch (Exception e) { preCommitResults.add(false); } } boolean allPreCommitted = preCommitResults.stream().allMatch(result -> result); if (!allPreCommitted) { for (TransactionParticipant participant : participants) { participant.abort(transaction); } return false; } for (TransactionParticipant participant : participants) { try { participant.doCommit(transaction); } catch (Exception e) { log.error("DoCommit failed", e); } } return true; } }
@Component public class TransactionParticipant { public boolean canCommit(Transaction transaction) { return checkResources(transaction); } public boolean preCommit(Transaction transaction) { try { executeTransactionOperations(transaction); return true; } catch (Exception e) { log.error("PreCommit failed", e); return false; } } public void doCommit(Transaction transaction) { try { commitTransaction(transaction); } catch (Exception e) { log.error("DoCommit failed", e); throw new RuntimeException("DoCommit failed", e); } } public void abort(Transaction transaction) { rollbackTransaction(transaction); } }
|
3.2.3 优缺点
优点:
缺点:
3.2.4 适用场景
适用场景:
- 对一致性要求极高的场景
- 可以接受3轮网络通信的场景
- 需要减少阻塞时间的场景
不适用场景:
3.3 TCC(Try-Confirm-Cancel)
3.3.1 原理
TCC(Try-Confirm-Cancel):补偿型事务,分为三个阶段。
阶段1:Try(尝试)
- 尝试执行业务操作
- 预留资源
- 不提交事务
阶段2:Confirm(确认)
- 如果所有Try都成功,执行Confirm
- 提交事务
- 释放预留资源
阶段3:Cancel(取消)
- 如果任何Try失败,执行Cancel
- 回滚事务
- 释放预留资源
3.3.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
| @Service public class TCCTransactionService { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; public void createOrder(Order order) { String transactionId = UUID.randomUUID().toString(); try { OrderTccResult orderResult = orderService.tryCreateOrder(order, transactionId); InventoryTccResult inventoryResult = inventoryService.tryDeductStock( order.getSkuId(), order.getQuantity(), transactionId); AccountTccResult accountResult = accountService.tryDeductBalance( order.getUserId(), order.getAmount(), transactionId); if (orderResult.isSuccess() && inventoryResult.isSuccess() && accountResult.isSuccess()) { orderService.confirmCreateOrder(transactionId); inventoryService.confirmDeductStock(transactionId); accountService.confirmDeductBalance(transactionId); } else { orderService.cancelCreateOrder(transactionId); inventoryService.cancelDeductStock(transactionId); accountService.cancelDeductBalance(transactionId); throw new BusinessException("创建订单失败"); } } catch (Exception e) { orderService.cancelCreateOrder(transactionId); inventoryService.cancelDeductStock(transactionId); accountService.cancelDeductBalance(transactionId); throw e; } } }
@Service public class InventoryService { @Autowired private InventoryMapper inventoryMapper; @Autowired private TccRecordMapper tccRecordMapper; public InventoryTccResult tryDeductStock(Long skuId, Integer quantity, String transactionId) { try { Inventory inventory = inventoryMapper.selectById(skuId); if (inventory.getStock() < quantity) { return InventoryTccResult.failure("库存不足"); } inventory.setFrozenStock(inventory.getFrozenStock() + quantity); inventory.setStock(inventory.getStock() - quantity); inventoryMapper.updateById(inventory); TccRecord record = new TccRecord(); record.setTransactionId(transactionId); record.setServiceName("InventoryService"); record.setMethodName("tryDeductStock"); record.setStatus(TccStatus.TRY); record.setParams(JSON.toJSONString(new Object[]{skuId, quantity})); tccRecordMapper.insert(record); return InventoryTccResult.success(); } catch (Exception e) { log.error("Try deduct stock failed", e); return InventoryTccResult.failure(e.getMessage()); } } public void confirmDeductStock(String transactionId) { TccRecord record = tccRecordMapper.selectByTransactionId(transactionId); if (record == null || record.getStatus() != TccStatus.TRY) { throw new BusinessException("TCC状态异常"); } record.setStatus(TccStatus.CONFIRM); tccRecordMapper.updateById(record); log.info("Confirm deduct stock: {}", transactionId); } public void cancelDeductStock(String transactionId) { TccRecord record = tccRecordMapper.selectByTransactionId(transactionId); if (record == null) { return; } Object[] params = JSON.parseObject(record.getParams(), Object[].class); Long skuId = Long.parseLong(params[0].toString()); Integer quantity = Integer.parseInt(params[1].toString()); Inventory inventory = inventoryMapper.selectById(skuId); inventory.setFrozenStock(inventory.getFrozenStock() - quantity); inventory.setStock(inventory.getStock() + quantity); inventoryMapper.updateById(inventory); record.setStatus(TccStatus.CANCEL); tccRecordMapper.updateById(record); log.info("Cancel deduct stock: {}", transactionId); } }
@Service public class AccountService { @Autowired private AccountMapper accountMapper; @Autowired private TccRecordMapper tccRecordMapper; public AccountTccResult tryDeductBalance(Long userId, BigDecimal amount, String transactionId) { try { Account account = accountMapper.selectById(userId); if (account.getBalance().compareTo(amount) < 0) { return AccountTccResult.failure("余额不足"); } account.setFrozenBalance(account.getFrozenBalance().add(amount)); account.setBalance(account.getBalance().subtract(amount)); accountMapper.updateById(account); TccRecord record = new TccRecord(); record.setTransactionId(transactionId); record.setServiceName("AccountService"); record.setMethodName("tryDeductBalance"); record.setStatus(TccStatus.TRY); record.setParams(JSON.toJSONString(new Object[]{userId, amount})); tccRecordMapper.insert(record); return AccountTccResult.success(); } catch (Exception e) { log.error("Try deduct balance failed", e); return AccountTccResult.failure(e.getMessage()); } } public void confirmDeductBalance(String transactionId) { TccRecord record = tccRecordMapper.selectByTransactionId(transactionId); if (record == null || record.getStatus() != TccStatus.TRY) { throw new BusinessException("TCC状态异常"); } record.setStatus(TccStatus.CONFIRM); tccRecordMapper.updateById(record); log.info("Confirm deduct balance: {}", transactionId); } public void cancelDeductBalance(String transactionId) { TccRecord record = tccRecordMapper.selectByTransactionId(transactionId); if (record == null) { return; } Object[] params = JSON.parseObject(record.getParams(), Object[].class); Long userId = Long.parseLong(params[0].toString()); BigDecimal amount = new BigDecimal(params[1].toString()); Account account = accountMapper.selectById(userId); account.setFrozenBalance(account.getFrozenBalance().subtract(amount)); account.setBalance(account.getBalance().add(amount)); accountMapper.updateById(account); record.setStatus(TccStatus.CANCEL); tccRecordMapper.updateById(record); log.info("Cancel deduct balance: {}", transactionId); } }
|
3.3.3 优缺点
优点:
- 性能较高(不需要全局锁)
- 适合高并发场景
- 可以灵活控制事务粒度
缺点:
- 实现复杂(需要实现Try、Confirm、Cancel三个方法)
- 业务侵入性强
- 需要保证幂等性
3.3.4 适用场景
适用场景:
- 高并发场景
- 对性能要求较高的场景
- 可以接受最终一致性的场景
- 业务逻辑相对简单的场景
不适用场景:
3.4 Seata AT模式
3.4.1 原理
Seata AT模式:自动补偿模式,基于数据源代理实现。
特点:
3.4.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @GlobalTransactional(rollbackFor = Exception.class) public void createOrder(Order order) { orderMapper.insert(order); inventoryService.deductStock(order.getSkuId(), order.getQuantity()); accountService.deductBalance(order.getUserId(), order.getAmount()); } }
@Service public class InventoryService { @Autowired private InventoryMapper inventoryMapper; public void deductStock(Long skuId, Integer quantity) { Inventory inventory = inventoryMapper.selectById(skuId); if (inventory.getStock() < quantity) { throw new BusinessException("库存不足"); } inventory.setStock(inventory.getStock() - quantity); inventoryMapper.updateById(inventory); } }
|
3.4.3 优缺点
优点:
缺点:
- 需要Seata框架支持
- 对数据库有要求(需要支持undo_log表)
3.4.4 适用场景
适用场景:
- 使用Seata框架的场景
- 需要业务代码无侵入的场景
- 对性能要求较高的场景
4. 最终一致性方案
4.1 Saga模式
4.1.1 原理
Saga模式:长事务模式,将大事务拆分为多个小事务。
两种实现方式:
- 编排式(Orchestration):中央协调器协调各个服务
- 协同式(Choreography):各个服务通过事件协同
4.1.2 编排式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| @Service public class SagaOrchestrator { @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @Autowired private SagaTransactionMapper sagaTransactionMapper; public void createOrder(Order order) { String sagaId = UUID.randomUUID().toString(); SagaTransaction saga = new SagaTransaction(); saga.setSagaId(sagaId); saga.setStatus(SagaStatus.STARTED); sagaTransactionMapper.insert(saga); try { orderService.createOrder(order, sagaId); saga.addStep(new SagaStep("createOrder", SagaStepStatus.COMPLETED)); inventoryService.deductStock(order.getSkuId(), order.getQuantity(), sagaId); saga.addStep(new SagaStep("deductStock", SagaStepStatus.COMPLETED)); accountService.deductBalance(order.getUserId(), order.getAmount(), sagaId); saga.addStep(new SagaStep("deductBalance", SagaStepStatus.COMPLETED)); saga.setStatus(SagaStatus.COMPLETED); sagaTransactionMapper.updateById(saga); } catch (Exception e) { compensate(saga); throw e; } } private void compensate(SagaTransaction saga) { List<SagaStep> steps = saga.getSteps(); Collections.reverse(steps); for (SagaStep step : steps) { if (step.getStatus() == SagaStepStatus.COMPLETED) { try { compensateStep(step); step.setStatus(SagaStepStatus.COMPENSATED); } catch (Exception e) { log.error("Compensate step failed: {}", step.getName(), e); } } } saga.setStatus(SagaStatus.COMPENSATED); sagaTransactionMapper.updateById(saga); } private void compensateStep(SagaStep step) { switch (step.getName()) { case "createOrder": orderService.cancelOrder(step.getSagaId()); break; case "deductStock": inventoryService.restoreStock(step.getSagaId()); break; case "deductBalance": accountService.restoreBalance(step.getSagaId()); break; } } }
|
4.1.3 协同式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void createOrder(Order order) { order.setStatus(OrderStatus.PENDING); orderMapper.insert(order); OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); event.setSkuId(order.getSkuId()); event.setQuantity(order.getQuantity()); event.setUserId(order.getUserId()); event.setAmount(order.getAmount()); kafkaTemplate.send("order-events", JSON.toJSONString(event)); } @KafkaListener(topics = "order-compensation", groupId = "order-service") public void handleCompensation(String message) { OrderCompensationEvent event = JSON.parseObject(message, OrderCompensationEvent.class); Order order = orderMapper.selectById(event.getOrderId()); order.setStatus(OrderStatus.CANCELLED); orderMapper.updateById(order); } }
@Service public class InventoryService { @Autowired private InventoryMapper inventoryMapper; @KafkaListener(topics = "order-events", groupId = "inventory-service") public void handleOrderCreated(String message) { OrderCreatedEvent event = JSON.parseObject(message, OrderCreatedEvent.class); try { Inventory inventory = inventoryMapper.selectById(event.getSkuId()); if (inventory.getStock() < event.getQuantity()) { throw new BusinessException("库存不足"); } inventory.setStock(inventory.getStock() - event.getQuantity()); inventoryMapper.updateById(inventory); InventoryDeductedEvent deductedEvent = new InventoryDeductedEvent(); deductedEvent.setOrderId(event.getOrderId()); deductedEvent.setSkuId(event.getSkuId()); deductedEvent.setQuantity(event.getQuantity()); kafkaTemplate.send("inventory-events", JSON.toJSONString(deductedEvent)); } catch (Exception e) { InventoryDeductFailedEvent failedEvent = new InventoryDeductFailedEvent(); failedEvent.setOrderId(event.getOrderId()); failedEvent.setReason(e.getMessage()); kafkaTemplate.send("order-compensation", JSON.toJSONString(failedEvent)); } } }
|
4.1.4 优缺点
优点:
缺点:
- 实现复杂
- 需要实现补偿逻辑
- 可能出现数据不一致的中间状态
4.1.5 适用场景
适用场景:
- 长事务场景
- 可以接受最终一致性的场景
- 业务逻辑相对简单的场景
不适用场景:
4.2 本地消息表
4.2.1 原理
本地消息表:在本地数据库中创建消息表,保证本地事务和消息发送的一致性。
流程:
- 执行业务操作,同时插入消息到本地消息表
- 定时任务扫描本地消息表,发送消息到消息队列
- 消费者处理消息
- 更新消息状态
4.2.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| @Entity public class LocalMessage { @Id private String id; private String businessId; private String topic; private String message; private Integer status; private Integer retryCount; private LocalDateTime createTime; private LocalDateTime updateTime; }
@Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private LocalMessageMapper localMessageMapper; @Transactional public void createOrder(Order order) { orderMapper.insert(order); LocalMessage message = new LocalMessage(); message.setId(UUID.randomUUID().toString()); message.setBusinessId(order.getId().toString()); message.setTopic("order-created"); message.setMessage(JSON.toJSONString(order)); message.setStatus(0); message.setRetryCount(0); message.setCreateTime(LocalDateTime.now()); localMessageMapper.insert(message); } }
@Component public class LocalMessageScheduler { @Autowired private LocalMessageMapper localMessageMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Scheduled(fixedDelay = 5000) public void sendPendingMessages() { List<LocalMessage> messages = localMessageMapper.selectByStatus(0); for (LocalMessage message : messages) { try { kafkaTemplate.send(message.getTopic(), message.getMessage()); message.setStatus(1); message.setUpdateTime(LocalDateTime.now()); localMessageMapper.updateById(message); } catch (Exception e) { log.error("Send message failed: {}", message.getId(), e); message.setRetryCount(message.getRetryCount() + 1); if (message.getRetryCount() >= 3) { message.setStatus(2); } message.setUpdateTime(LocalDateTime.now()); localMessageMapper.updateById(message); } } } }
@Component public class OrderEventConsumer { @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @KafkaListener(topics = "order-created", groupId = "order-processor") public void handleOrderCreated(String message) { Order order = JSON.parseObject(message, Order.class); try { inventoryService.deductStock(order.getSkuId(), order.getQuantity()); accountService.deductBalance(order.getUserId(), order.getAmount()); order.setStatus(OrderStatus.CONFIRMED); orderMapper.updateById(order); } catch (Exception e) { log.error("Handle order created event failed", e); } } }
|
4.2.3 优缺点
优点:
- 实现简单
- 保证本地事务和消息发送的一致性
- 不需要额外的消息中间件支持
缺点:
- 需要额外的消息表
- 定时任务可能有延迟
- 需要处理消息重复发送
4.2.4 适用场景
适用场景:
- 需要保证本地事务和消息发送一致性的场景
- 可以接受最终一致性的场景
- 消息量不是特别大的场景
不适用场景:
4.3 事务消息(RocketMQ)
4.3.1 原理
事务消息:RocketMQ提供的事务消息功能,保证消息发送和本地事务的一致性。
流程:
- 发送Half消息(预发送消息)
- 执行本地事务
- 提交或回滚消息
4.3.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private RocketMQTemplate rocketMQTemplate; @Transactional public void createOrder(Order order) { TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-tx-group", "order-created", MessageBuilder.withPayload(order).build(), new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { orderMapper.insert(order); return LocalTransactionState.COMMIT; } catch (Exception e) { log.error("Local transaction failed", e); return LocalTransactionState.ROLLBACK; } } @Override public LocalTransactionState checkLocalTransaction(Message msg) { String orderId = msg.getKeys(); Order order = orderMapper.selectById(Long.parseLong(orderId)); if (order != null) { return LocalTransactionState.COMMIT; } else { return LocalTransactionState.ROLLBACK; } } } ); } }
@Component @RocketMQMessageListener(topic = "order-created", consumerGroup = "order-processor") public class OrderEventConsumer implements RocketMQListener<Order> { @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @Override public void onMessage(Order order) { try { inventoryService.deductStock(order.getSkuId(), order.getQuantity()); accountService.deductBalance(order.getUserId(), order.getAmount()); order.setStatus(OrderStatus.CONFIRMED); orderMapper.updateById(order); } catch (Exception e) { log.error("Handle order created event failed", e); } } }
|
4.3.3 优缺点
优点:
- 保证消息发送和本地事务的一致性
- 不需要额外的消息表
- 性能较高
缺点:
4.3.4 适用场景
适用场景:
- 使用RocketMQ的场景
- 需要保证消息发送和本地事务一致性的场景
- 可以接受最终一致性的场景
不适用场景:
5. 方案对比
5.1 方案对比表
| 方案 |
一致性 |
性能 |
复杂度 |
适用场景 |
| 2PC |
强一致 |
低 |
简单 |
小规模系统,对一致性要求极高 |
| 3PC |
强一致 |
低 |
中等 |
需要减少阻塞时间的场景 |
| TCC |
强一致 |
高 |
复杂 |
高并发场景,可以接受业务侵入 |
| Seata AT |
强一致 |
高 |
简单 |
使用Seata框架的场景 |
| Saga |
最终一致 |
高 |
复杂 |
长事务场景 |
| 本地消息表 |
最终一致 |
中 |
简单 |
需要保证本地事务和消息发送一致性 |
| 事务消息 |
最终一致 |
高 |
中等 |
使用RocketMQ的场景 |
5.2 选择原则
5.2.1 一致性要求
强一致性要求:
- 选择:2PC、3PC、TCC、Seata AT
- 适用场景:金融系统、支付系统
最终一致性要求:
- 选择:Saga、本地消息表、事务消息
- 适用场景:电商系统、社交网络
5.2.2 性能要求
高性能要求:
- 选择:TCC、Saga、事务消息
- 避免:2PC、3PC
中等性能要求:
5.2.3 复杂度要求
简单实现:
可以接受复杂实现:
6. 实战案例
6.1 案例1:支付系统(TCC)
6.1.1 场景
需求:支付系统,需要保证强一致性。
选择方案:TCC
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Service public class PaymentService { @Autowired private AccountService accountService; @Autowired private PaymentRecordService paymentRecordService; public void pay(Long accountId, BigDecimal amount) { String transactionId = UUID.randomUUID().toString(); try { AccountTccResult accountResult = accountService.tryDeductBalance( accountId, amount, transactionId); PaymentRecordTccResult recordResult = paymentRecordService.tryCreateRecord( accountId, amount, transactionId); if (accountResult.isSuccess() && recordResult.isSuccess()) { accountService.confirmDeductBalance(transactionId); paymentRecordService.confirmCreateRecord(transactionId); } else { accountService.cancelDeductBalance(transactionId); paymentRecordService.cancelCreateRecord(transactionId); } } catch (Exception e) { accountService.cancelDeductBalance(transactionId); paymentRecordService.cancelCreateRecord(transactionId); throw e; } } }
|
6.2 案例2:电商系统(本地消息表)
6.2.1 场景
需求:电商系统,订单创建后异步处理库存和账户。
选择方案:本地消息表
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Service public class OrderService { @Transactional public void createOrder(Order order) { orderMapper.insert(order); LocalMessage message = new LocalMessage(); message.setId(UUID.randomUUID().toString()); message.setBusinessId(order.getId().toString()); message.setTopic("order-created"); message.setMessage(JSON.toJSONString(order)); message.setStatus(0); localMessageMapper.insert(message); } }
|
6.3 案例3:长事务系统(Saga)
6.3.1 场景
需求:订单系统,包含多个步骤,需要支持补偿。
选择方案:Saga
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Service public class OrderSagaService { public void createOrder(Order order) { String sagaId = UUID.randomUUID().toString(); try { orderService.createOrder(order, sagaId); inventoryService.deductStock(order.getSkuId(), order.getQuantity(), sagaId); accountService.deductBalance(order.getUserId(), order.getAmount(), sagaId); notificationService.sendNotification(order, sagaId); } catch (Exception e) { compensate(sagaId); throw e; } } }
|
7. 总结
7.1 核心要点
- 分布式事务方案:2PC、3PC、TCC、Saga、本地消息表、事务消息
- 强一致性方案:2PC、3PC、TCC、Seata AT
- 最终一致性方案:Saga、本地消息表、事务消息
- 选择原则:根据一致性要求、性能要求、复杂度要求选择
- 适用场景:不同场景选择不同方案
7.2 关键理解
- 强一致性:保证数据立即一致,但性能较低
- 最终一致性:保证数据最终一致,性能较高
- 业务驱动:根据业务需求选择方案
- 权衡取舍:在一致性和性能之间权衡
7.3 最佳实践
- 金融系统:TCC(强一致性)
- 电商系统:本地消息表或事务消息(最终一致性)
- 长事务系统:Saga(最终一致性)
- 高并发系统:TCC或Saga(高性能)
- 简单系统:2PC或本地消息表(简单实现)
相关文章: