第478集如何设计"可回放、可对账、可补偿"的链路?
|字数总计:4.8k|阅读时长:23分钟|阅读量:
如何设计“可回放、可对账、可补偿”的链路?
1. 概述
1.1 “可回放、可对账、可补偿”的重要性
**“可回放、可对账、可补偿”**是分布式系统设计的核心原则,特别是在金融、支付、电商等对数据一致性要求极高的场景下。
三者的意义:
- 可回放:能够重新执行历史操作,用于数据修复、问题排查、审计等
- 可对账:能够核对不同系统间的数据一致性,发现数据差异
- 可补偿:能够回滚或补偿失败的操作,保证最终一致性
1.2 应用场景
应用场景:
- 金融系统:支付、转账、结算等
- 电商系统:订单、库存、账户等
- 对账系统:与第三方系统对账
- 审计系统:操作审计、数据追溯
1.3 本文内容结构
本文将从以下几个方面全面解析“可回放、可对账、可补偿”的链路设计:
- 可回放设计:事件溯源、操作日志、状态重建
- 可对账设计:对账模型、对账流程、差异处理
- 可补偿设计:补偿机制、Saga模式、TCC模式
- 整体链路设计:三者的结合设计
- 实战案例:实际项目中的完整实现
2. 可回放设计
2.1 事件溯源(Event Sourcing)
2.1.1 基本原理
事件溯源(Event Sourcing):将所有状态变化记录为事件序列,通过重放事件恢复状态。
特点:
- 所有状态变化都是事件
- 通过重放事件恢复状态
- 支持时间旅行(查询任意时间点的状态)
- 支持审计和追溯
2.1.2 事件模型设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Entity @Table(name = "domain_event", indexes = { @Index(name = "idx_aggregate_id", columnList = "aggregateId"), @Index(name = "idx_event_time", columnList = "eventTime") }) public class DomainEvent { @Id private String eventId; @Column(nullable = false) private String aggregateId; @Column(nullable = false) private String aggregateType; @Column(nullable = false) private String eventType; @Column(nullable = false, columnDefinition = "TEXT") private String eventData; @Column(nullable = false) private Long eventVersion; @Column(nullable = false) private LocalDateTime eventTime; @Column(nullable = false) private String userId; @Column(nullable = false) private String traceId; }
|
2.1.3 事件存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Service public class EventStore { @Autowired private DomainEventMapper eventMapper;
/**
 * Stamps {@code event} with a fresh id, the next version for its aggregate and the
 * current time, then inserts it.
 *
 * <p>The version is computed by reading the current maximum for the aggregate and
 * adding one (or using 1 when there is none). Nothing in this method prevents two
 * concurrent writers from computing the same version.
 *
 * @param event the event to persist; its id, version and time are overwritten
 */
@Transactional
public void saveEvent(DomainEvent event) {
    final String generatedId = UUID.randomUUID().toString();
    event.setEventId(generatedId);

    final Long latestVersion = eventMapper.selectMaxVersionByAggregateId(event.getAggregateId());
    final long nextVersion;
    if (latestVersion == null) {
        nextVersion = 1L;
    } else {
        nextVersion = latestVersion + 1;
    }
    event.setEventVersion(nextVersion);

    event.setEventTime(LocalDateTime.now());
    eventMapper.insert(event);
}
/**
 * Looks up the events of one aggregate.
 *
 * @param aggregateId aggregate to query
 * @return whatever the mapper returns for that aggregate
 */
public List<DomainEvent> getEventsByAggregateId(String aggregateId) {
    final List<DomainEvent> aggregateEvents = eventMapper.selectByAggregateId(aggregateId);
    return aggregateEvents;
}
/**
 * Looks up events by time range.
 *
 * @param startTime start of the range, passed through to the mapper
 * @param endTime   end of the range, passed through to the mapper
 * @return whatever the mapper returns for that range
 */
public List<DomainEvent> getEventsByTimeRange(LocalDateTime startTime, LocalDateTime endTime) {
    final List<DomainEvent> eventsInRange = eventMapper.selectByTimeRange(startTime, endTime);
    return eventsInRange;
}
/**
 * Looks up the events of one aggregate within a version range.
 *
 * @param aggregateId aggregate to query
 * @param fromVersion lower version bound, passed through to the mapper
 * @param toVersion   upper version bound, passed through to the mapper
 * @return whatever the mapper returns for that aggregate and range
 */
public List<DomainEvent> getEventsByAggregateIdAndVersion(String aggregateId,
                                                          Long fromVersion,
                                                          Long toVersion) {
    final List<DomainEvent> versionSlice =
            eventMapper.selectByAggregateIdAndVersion(aggregateId, fromVersion, toVersion);
    return versionSlice;
}
}
|
2.1.4 状态重建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| @Service public class OrderService { @Autowired private EventStore eventStore; @Autowired private OrderMapper orderMapper;
/**
 * Rebuilds an order by applying every stored event of the aggregate, in the order
 * the event store returns them, to a fresh {@code Order} carrying only its id.
 *
 * <p>When the store returns no events the result is an order with just the id set.
 *
 * @param orderId aggregate id of the order
 * @return the rebuilt order (never {@code null})
 */
public Order rebuildOrderState(String orderId) {
    final List<DomainEvent> history = eventStore.getEventsByAggregateId(orderId);

    final Order rebuilt = new Order();
    rebuilt.setId(orderId);
    history.forEach(past -> applyEvent(rebuilt, past));
    return rebuilt;
}
/**
 * Applies one event to {@code order}, dispatching on the event type.
 * Event types not listed in the switch leave the order untouched.
 *
 * @param order order being rebuilt; mutated in place
 * @param event event to apply
 */
private void applyEvent(Order order, DomainEvent event) {
    switch (event.getEventType()) {
        case "OrderCreated":
            applyCreated(order, event);
            break;
        case "OrderPaid":
            applyPaid(order, event);
            break;
        case "OrderShipped":
            applyShipped(order, event);
            break;
        case "OrderCompleted":
            order.setStatus(OrderStatus.COMPLETED);
            break;
        case "OrderCancelled":
            applyCancelled(order, event);
            break;
    }
}

/** Copies the creation payload onto the order and marks it PENDING. */
private void applyCreated(Order order, DomainEvent event) {
    OrderCreatedEventData created = JSON.parseObject(event.getEventData(), OrderCreatedEventData.class);
    order.setUserId(created.getUserId());
    order.setSkuId(created.getSkuId());
    order.setQuantity(created.getQuantity());
    order.setAmount(created.getAmount());
    order.setStatus(OrderStatus.PENDING);
}

/** Marks the order PAID and copies the paid time and payment id from the payload. */
private void applyPaid(Order order, DomainEvent event) {
    OrderPaidEventData paid = JSON.parseObject(event.getEventData(), OrderPaidEventData.class);
    order.setStatus(OrderStatus.PAID);
    order.setPaidTime(paid.getPaidTime());
    order.setPaymentId(paid.getPaymentId());
}

/** Marks the order SHIPPED and copies the shipped time and tracking number from the payload. */
private void applyShipped(Order order, DomainEvent event) {
    OrderShippedEventData shipped = JSON.parseObject(event.getEventData(), OrderShippedEventData.class);
    order.setStatus(OrderStatus.SHIPPED);
    order.setShippedTime(shipped.getShippedTime());
    order.setTrackingNumber(shipped.getTrackingNumber());
}

/** Marks the order CANCELLED and copies the cancel reason from the payload. */
private void applyCancelled(Order order, DomainEvent event) {
    OrderCancelledEventData cancelled = JSON.parseObject(event.getEventData(), OrderCancelledEventData.class);
    order.setStatus(OrderStatus.CANCELLED);
    order.setCancelReason(cancelled.getCancelReason());
}
/**
 * Returns the order rebuilt from its events.
 *
 * <p>The stored row is read first and used only for a consistency check: when a
 * row exists and differs from the rebuilt order, a warning is logged. The rebuilt
 * order is returned in every case.
 *
 * @param orderId id of the order
 * @return the order rebuilt from the event store
 */
public Order getOrder(String orderId) {
    final Order stored = orderMapper.selectById(orderId);
    final Order rebuilt = rebuildOrderState(orderId);

    final boolean diverged = stored != null && !stored.equals(rebuilt);
    if (diverged) {
        log.warn("Order state mismatch: orderId={}", orderId);
    }
    return rebuilt;
}
}
|
2.1.5 事件回放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Service public class EventReplayService { @Autowired private EventStore eventStore; @Autowired private OrderService orderService;
public void replayEvents(String aggregateId, LocalDateTime fromTime, LocalDateTime toTime) { List<DomainEvent> events = eventStore.getEventsByTimeRange(fromTime, toTime); Map<String, List<DomainEvent>> eventsByAggregate = events.stream() .filter(e -> e.getAggregateId().equals(aggregateId)) .collect(Collectors.groupingBy(DomainEvent::getAggregateId)); for (Map.Entry<String, List<DomainEvent>> entry : eventsByAggregate.entrySet()) { String aggregateId = entry.getKey(); List<DomainEvent> aggregateEvents = entry.getValue(); Order order = orderService.rebuildOrderState(aggregateId); orderMapper.updateById(order); } }
public void replayAllEvents(LocalDateTime fromTime, LocalDateTime toTime) { List<DomainEvent> events = eventStore.getEventsByTimeRange(fromTime, toTime); Map<String, List<DomainEvent>> eventsByAggregate = events.stream() .collect(Collectors.groupingBy(DomainEvent::getAggregateId)); for (Map.Entry<String, List<DomainEvent>> entry : eventsByAggregate.entrySet()) { String aggregateId = entry.getKey(); try { Order order = orderService.rebuildOrderState(aggregateId); orderMapper.updateById(order); } catch (Exception e) { log.error("Replay events failed: aggregateId={}", aggregateId, e); } } } }
|
2.2 操作日志
2.2.1 操作日志模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Entity @Table(name = "operation_log", indexes = { @Index(name = "idx_business_id", columnList = "businessId"), @Index(name = "idx_operation_time", columnList = "operationTime") }) public class OperationLog { @Id private String logId; @Column(nullable = false) private String businessId; @Column(nullable = false) private String businessType; @Column(nullable = false) private String operationType; @Column(nullable = false, columnDefinition = "TEXT") private String requestData; @Column(columnDefinition = "TEXT") private String responseData; @Column(nullable = false) private Integer status; @Column(columnDefinition = "TEXT") private String errorMessage; @Column(nullable = false) private LocalDateTime operationTime; @Column(nullable = false) private String userId; @Column(nullable = false) private String traceId; }
|
2.2.2 操作日志记录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| @Aspect @Component public class OperationLogAspect { @Autowired private OperationLogMapper operationLogMapper; @Around("@annotation(operationLog)") public Object logOperation(ProceedingJoinPoint joinPoint, OperationLog operationLog) throws Throwable { Object[] args = joinPoint.getArgs(); String requestData = JSON.toJSONString(args); String businessId = extractBusinessId(args); OperationLog log = new OperationLog(); log.setLogId(UUID.randomUUID().toString()); log.setBusinessId(businessId); log.setBusinessType(operationLog.businessType()); log.setOperationType(operationLog.operationType()); log.setRequestData(requestData); log.setOperationTime(LocalDateTime.now()); log.setUserId(getCurrentUserId()); log.setTraceId(getTraceId()); try { Object result = joinPoint.proceed(); log.setStatus(0); log.setResponseData(JSON.toJSONString(result)); operationLogMapper.insert(log); return result; } catch (Exception e) { log.setStatus(1); log.setErrorMessage(e.getMessage()); operationLogMapper.insert(log); throw e; } } }
/**
 * Method-level marker carrying the business type and operation type of a call.
 * Retained at runtime so it can be read reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OperationLog {

    /** Business type of the annotated operation, e.g. {@code "ORDER"}. */
    String businessType();

    /** Operation type of the annotated operation, e.g. {@code "CREATE"}. */
    String operationType();
}
/**
 * Usage example: shows how a service method is tagged with {@code @OperationLog}.
 */
@Service
public class OrderService {

    // Tags this method with business type "ORDER" and operation type "CREATE".
    @OperationLog(businessType = "ORDER", operationType = "CREATE")
    public Order createOrder(OrderRequest request) {
        // NOTE(review): the body is empty in this excerpt; the business logic
        // (and the return statement this method needs) is presumably elided.
    }
}
|
3. 可对账设计
3.1 对账模型设计
3.1.1 对账数据模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| @Entity @Table(name = "reconciliation_record", indexes = { @Index(name = "idx_business_date", columnList = "businessDate"), @Index(name = "idx_reconciliation_status", columnList = "reconciliationStatus") }) public class ReconciliationRecord { @Id private String recordId; @Column(nullable = false) private LocalDate businessDate; @Column(nullable = false) private String businessType; @Column(nullable = false) private String systemA; @Column(nullable = false) private String systemB; @Column(nullable = false) private Long systemACount; @Column(nullable = false) private Long systemBCount; @Column(nullable = false) private BigDecimal systemAAmount; @Column(nullable = false) private BigDecimal systemBAmount; @Column(nullable = false) private Integer reconciliationStatus; @Column(columnDefinition = "TEXT") private String differenceDetails; @Column(nullable = false) private LocalDateTime reconciliationTime; @Column(nullable = false) private LocalDateTime createTime; @Column(nullable = false) private LocalDateTime updateTime; }
/**
 * One per-record difference found by a reconciliation run, mapped to
 * {@code reconciliation_detail}.
 *
 * <p>The table is indexed on {@code recordId} and on {@code businessId}.
 */
@Entity
@Table(
        name = "reconciliation_detail",
        indexes = {
                @Index(name = "idx_record_id", columnList = "recordId"),
                @Index(name = "idx_business_id", columnList = "businessId")
        })
public class ReconciliationDetail {

    /** Primary key. */
    @Id
    private String detailId;

    /** Id of the reconciliation record this detail belongs to. */
    @Column(nullable = false)
    private String recordId;

    /** Id of the business object that differs. */
    @Column(nullable = false)
    private String businessId;

    /** Business type name. */
    @Column(nullable = false)
    private String businessType;

    /** Numeric difference-type code; the meaning of each value is not defined in this class. */
    @Column(nullable = false)
    private Integer differenceType;

    /** System A side of the difference, stored in a nullable TEXT column. */
    @Column(columnDefinition = "TEXT")
    private String systemAData;

    /** System B side of the difference, stored in a nullable TEXT column. */
    @Column(columnDefinition = "TEXT")
    private String systemBData;

    /** Row creation timestamp. */
    @Column(nullable = false)
    private LocalDateTime createTime;
}
|
3.1.2 对账服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| @Service public class ReconciliationService { @Autowired private ReconciliationRecordMapper recordMapper; @Autowired private ReconciliationDetailMapper detailMapper; @Autowired private PaymentService paymentService; @Autowired private ThirdPartyPaymentService thirdPartyPaymentService;
/**
 * Reconciles the internal payments of {@code businessDate} against the third-party
 * payments of the same date and persists the outcome.
 *
 * <p>A record row is inserted first with status 0, then updated to status 2 (no
 * differences) or 3 (differences found, with one detail row per difference). On any
 * exception the record is updated to status 3 and a {@code BusinessException} is thrown.
 *
 * <p>The initial row carries zero counts/amounts and the start time for every
 * timestamp, because {@code ReconciliationRecord} declares those columns NOT NULL and
 * the real values are only known after both sides have been loaded.
 *
 * <p>NOTE(review): status 3 is written both for "differences found" and for "run
 * failed" — confirm whether a distinct failure code was intended. Also, if
 * {@code BusinessException} causes this transaction to roll back, the failure status
 * written in the catch block is rolled back with it — confirm.
 *
 * @param businessDate date whose payments are compared
 * @param businessType stored on the record; it does not influence which data is loaded
 * @return the persisted record together with the differences found
 */
@Transactional
public ReconciliationResult reconcile(LocalDate businessDate, String businessType) {
    LocalDateTime startedAt = LocalDateTime.now();

    ReconciliationRecord record = new ReconciliationRecord();
    record.setRecordId(UUID.randomUUID().toString());
    record.setBusinessDate(businessDate);
    record.setBusinessType(businessType);
    record.setSystemA("内部系统");
    record.setSystemB("第三方支付系统");
    record.setReconciliationStatus(0);
    // Placeholders for the NOT NULL columns; overwritten once the data is loaded.
    record.setSystemACount(0L);
    record.setSystemBCount(0L);
    record.setSystemAAmount(BigDecimal.ZERO);
    record.setSystemBAmount(BigDecimal.ZERO);
    record.setReconciliationTime(startedAt);
    record.setCreateTime(startedAt);
    record.setUpdateTime(startedAt);
    recordMapper.insert(record);

    try {
        List<PaymentRecord> systemAData = paymentService.getPaymentsByDate(businessDate);
        List<ThirdPartyPaymentRecord> systemBData = thirdPartyPaymentService.getPaymentsByDate(businessDate);

        record.setSystemACount((long) systemAData.size());
        record.setSystemBCount((long) systemBData.size());
        record.setSystemAAmount(systemAData.stream()
                .map(PaymentRecord::getAmount)
                .reduce(BigDecimal.ZERO, BigDecimal::add));
        record.setSystemBAmount(systemBData.stream()
                .map(ThirdPartyPaymentRecord::getAmount)
                .reduce(BigDecimal.ZERO, BigDecimal::add));

        List<ReconciliationDetail> differences = compareData(systemAData, systemBData, record.getRecordId());
        if (!differences.isEmpty()) {
            detailMapper.insertBatch(differences);
            record.setReconciliationStatus(3);
            record.setDifferenceDetails(JSON.toJSONString(differences));
        } else {
            record.setReconciliationStatus(2);
        }

        record.setReconciliationTime(LocalDateTime.now());
        record.setUpdateTime(LocalDateTime.now());
        recordMapper.updateById(record);
        return ReconciliationResult.success(record, differences);
    } catch (Exception e) {
        log.error("Reconciliation failed", e);
        record.setReconciliationStatus(3);
        record.setUpdateTime(LocalDateTime.now());
        try {
            recordMapper.updateById(record);
        } catch (RuntimeException updateFailure) {
            // Keep the original failure as the cause reported to the caller.
            e.addSuppressed(updateFailure);
        }
        throw new BusinessException("对账失败", e);
    }
}
/**
 * Compares both sides record by record, keyed by payment id.
 *
 * <p>Difference types produced:
 * <ul>
 *   <li>1 — the payment exists in system A but has no counterpart in system B;</li>
 *   <li>2 — the payment exists in system B but has no counterpart in system A
 *       (this includes a repeated payment id on the B side);</li>
 *   <li>3 — both sides have the payment but the amounts differ
 *       ({@code compareTo}, so scale is ignored).</li>
 * </ul>
 *
 * <p>A repeated payment id in {@code systemBData} is reported as a type 2 difference
 * instead of aborting the whole comparison, and B-only differences are emitted in
 * the order the records were supplied.
 *
 * @param systemAData internal payment records
 * @param systemBData third-party payment records
 * @param recordId    id of the reconciliation record the details belong to
 * @return the differences found; empty when both sides agree
 */
private List<ReconciliationDetail> compareData(
        List<PaymentRecord> systemAData,
        List<ThirdPartyPaymentRecord> systemBData,
        String recordId) {
    List<ReconciliationDetail> differences = new ArrayList<>();

    // Insertion-ordered so the leftover B records come out deterministically.
    Map<String, ThirdPartyPaymentRecord> unmatchedB = new java.util.LinkedHashMap<>();
    for (ThirdPartyPaymentRecord systemBRecord : systemBData) {
        ThirdPartyPaymentRecord earlier =
                unmatchedB.putIfAbsent(systemBRecord.getPaymentId(), systemBRecord);
        if (earlier != null) {
            // Same payment id seen again on the B side: an extra B record.
            differences.add(createDifferenceDetail(
                    recordId,
                    systemBRecord.getPaymentId(),
                    2,
                    null,
                    JSON.toJSONString(systemBRecord)));
        }
    }

    for (PaymentRecord systemARecord : systemAData) {
        // Each B record can be matched at most once.
        ThirdPartyPaymentRecord systemBRecord = unmatchedB.remove(systemARecord.getPaymentId());
        if (systemBRecord == null) {
            differences.add(createDifferenceDetail(
                    recordId,
                    systemARecord.getPaymentId(),
                    1,
                    JSON.toJSONString(systemARecord),
                    null));
        } else if (systemARecord.getAmount().compareTo(systemBRecord.getAmount()) != 0) {
            differences.add(createDifferenceDetail(
                    recordId,
                    systemARecord.getPaymentId(),
                    3,
                    JSON.toJSONString(systemARecord),
                    JSON.toJSONString(systemBRecord)));
        }
    }

    for (ThirdPartyPaymentRecord systemBRecord : unmatchedB.values()) {
        differences.add(createDifferenceDetail(
                recordId,
                systemBRecord.getPaymentId(),
                2,
                null,
                JSON.toJSONString(systemBRecord)));
    }

    return differences;
}

/**
 * Builds one detail row with a fresh id, business type {@code "PAYMENT"} and the
 * current time as creation time.
 *
 * @param recordId       id of the owning reconciliation record
 * @param businessId     payment id the difference refers to
 * @param differenceType difference code (1, 2 or 3 as produced by {@code compareData})
 * @param systemAData    serialized A-side record, or {@code null}
 * @param systemBData    serialized B-side record, or {@code null}
 * @return the populated, not yet persisted detail
 */
private ReconciliationDetail createDifferenceDetail(
        String recordId,
        String businessId,
        Integer differenceType,
        String systemAData,
        String systemBData) {
    ReconciliationDetail detail = new ReconciliationDetail();
    detail.setDetailId(UUID.randomUUID().toString());
    detail.setRecordId(recordId);
    detail.setBusinessId(businessId);
    detail.setBusinessType("PAYMENT");
    detail.setDifferenceType(differenceType);
    detail.setSystemAData(systemAData);
    detail.setSystemBData(systemBData);
    detail.setCreateTime(LocalDateTime.now());
    return detail;
}
}
|
3.2 定时对账
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class ReconciliationScheduler { @Autowired private ReconciliationService reconciliationService;
/**
 * Scheduled job (cron {@code 0 0 2 * * ?}) that reconciles the previous day's
 * {@code "PAYMENT"} data. Failures are logged and not rethrown.
 */
@Scheduled(cron = "0 0 2 * * ?")
public void dailyReconciliation() {
    final LocalDate previousDay = LocalDate.now().minusDays(1);

    try {
        reconciliationService.reconcile(previousDay, "PAYMENT");
        log.info("Daily reconciliation completed: date={}", previousDay);
    } catch (Exception e) {
        log.error("Daily reconciliation failed: date={}", previousDay, e);
    }
}
}
|
4. 可补偿设计
4.1 Saga模式
4.1.1 Saga事务模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| @Entity @Table(name = "saga_transaction", indexes = { @Index(name = "idx_saga_id", columnList = "sagaId"), @Index(name = "idx_status", columnList = "status") }) public class SagaTransaction { @Id private String id; @Column(nullable = false, unique = true) private String sagaId; @Column(nullable = false) private String businessType; @Column(nullable = false) private String businessId; @Column(nullable = false) private Integer status; @Column(columnDefinition = "TEXT") private String steps; @Column(nullable = false) private LocalDateTime createTime; @Column(nullable = false) private LocalDateTime updateTime; }
/**
 * One step of a saga run, mapped to the {@code saga_step} table.
 *
 * <p>The table is indexed on {@code sagaId}.
 */
@Entity
@Table(
        name = "saga_step",
        indexes = {
                @Index(name = "idx_saga_id", columnList = "sagaId")
        })
public class SagaStep {

    /** Primary key. */
    @Id
    private String id;

    /** Id of the saga this step belongs to. */
    @Column(nullable = false)
    private String sagaId;

    /** Name of the step. */
    @Column(nullable = false)
    private String stepName;

    /** Order number of the step. */
    @Column(nullable = false)
    private Integer stepOrder;

    /** Numeric status code; the meaning of each value is not defined in this class. */
    @Column(nullable = false)
    private Integer status;

    /** Request data, stored in a nullable TEXT column. */
    @Column(columnDefinition = "TEXT")
    private String requestData;

    /** Response data, stored in a nullable TEXT column. */
    @Column(columnDefinition = "TEXT")
    private String responseData;

    /** Compensation data, stored in a nullable TEXT column. */
    @Column(columnDefinition = "TEXT")
    private String compensateData;

    /** Row creation timestamp. */
    @Column(nullable = false)
    private LocalDateTime createTime;

    /** Row update timestamp. */
    @Column(nullable = false)
    private LocalDateTime updateTime;
}
|
4.1.2 Saga编排器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
| @Service public class SagaOrchestrator { @Autowired private SagaTransactionMapper sagaTransactionMapper; @Autowired private SagaStepMapper sagaStepMapper; @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService;
/**
 * Runs the order saga: create order, deduct stock, deduct balance, in that order.
 *
 * <p>A saga row is inserted with status 0. When all three steps succeed it is
 * updated to status 1 together with the serialized steps. When anything in that
 * sequence throws, the steps that completed are compensated, the saga is updated to
 * status 2 together with the serialized steps (so the executed and compensated
 * steps stay visible on the saga row), and a {@code BusinessException} wrapping the
 * original failure is thrown.
 *
 * <p>A failure while compensating or while writing the failure status is attached
 * to the original exception as suppressed, so the reported cause is always the
 * failure that aborted the saga.
 *
 * <p>NOTE(review): this method is {@code @Transactional}. If {@code BusinessException}
 * triggers a rollback, the saga/step rows and the status written in the catch block
 * are rolled back as well — confirm the intended transaction boundary.
 *
 * @param request the order request driving all three steps
 */
@Transactional
public void executeSaga(OrderRequest request) {
    String sagaId = UUID.randomUUID().toString();

    SagaTransaction saga = new SagaTransaction();
    saga.setId(UUID.randomUUID().toString());
    saga.setSagaId(sagaId);
    saga.setBusinessType("ORDER");
    saga.setBusinessId(request.getOrderId());
    saga.setStatus(0);
    saga.setCreateTime(LocalDateTime.now());
    saga.setUpdateTime(LocalDateTime.now());
    sagaTransactionMapper.insert(saga);

    // Only steps that completed are collected; a step that throws never gets here.
    List<SagaStep> steps = new ArrayList<>();
    try {
        steps.add(executeStep(sagaId, "createOrder", 1,
                () -> orderService.createOrder(request)));
        steps.add(executeStep(sagaId, "deductStock", 2,
                () -> inventoryService.deductStock(request.getSkuId(), request.getQuantity())));
        steps.add(executeStep(sagaId, "deductBalance", 3,
                () -> accountService.deductBalance(request.getUserId(), request.getAmount())));

        saga.setStatus(1);
        saga.setSteps(JSON.toJSONString(steps));
        saga.setUpdateTime(LocalDateTime.now());
        sagaTransactionMapper.updateById(saga);
    } catch (Exception e) {
        log.error("Saga execution failed: sagaId={}", sagaId, e);
        try {
            compensate(sagaId, steps);
            saga.setStatus(2);
            saga.setSteps(JSON.toJSONString(steps));
            saga.setUpdateTime(LocalDateTime.now());
            sagaTransactionMapper.updateById(saga);
        } catch (RuntimeException bookkeepingFailure) {
            e.addSuppressed(bookkeepingFailure);
        }
        throw new BusinessException("Saga执行失败", e);
    }
}
/**
 * Runs one saga step and records it.
 *
 * <p>The step row is inserted with status 1 before the action runs. On success it
 * is updated to status 2 with the serialized result; on any exception it is updated
 * to status 3 with the exception message and the exception is rethrown.
 *
 * @param sagaId    saga the step belongs to
 * @param stepName  name stored on the step
 * @param stepOrder position stored on the step
 * @param action    the work to perform
 * @return the step row in its final (status 2) state
 */
private SagaStep executeStep(String sagaId, String stepName, Integer stepOrder, Supplier<Object> action) {
    final SagaStep current = new SagaStep();
    current.setId(UUID.randomUUID().toString());
    current.setSagaId(sagaId);
    current.setStepName(stepName);
    current.setStepOrder(stepOrder);
    current.setStatus(1);
    current.setCreateTime(LocalDateTime.now());
    current.setUpdateTime(LocalDateTime.now());
    sagaStepMapper.insert(current);

    try {
        final Object outcome = action.get();
        recordStepOutcome(current, 2, JSON.toJSONString(outcome));
        return current;
    } catch (Exception e) {
        recordStepOutcome(current, 3, e.getMessage());
        throw e;
    }
}

/** Writes the given status and response data to the step row and persists it. */
private void recordStepOutcome(SagaStep step, int status, String responseData) {
    step.setStatus(status);
    step.setResponseData(responseData);
    step.setUpdateTime(LocalDateTime.now());
    sagaStepMapper.updateById(step);
}
/**
 * Compensates completed steps in reverse order of execution.
 *
 * <p>Only steps with status 2 are compensated; each one is then updated to
 * status 4. A failure while compensating one step is logged and the remaining
 * steps are still attempted. The list is walked backwards rather than reversed, so
 * the caller's list keeps its order.
 *
 * @param sagaId id of the saga being compensated
 * @param steps  steps in execution order
 */
private void compensate(String sagaId, List<SagaStep> steps) {
    for (int i = steps.size() - 1; i >= 0; i--) {
        SagaStep step = steps.get(i);
        // equals() instead of ==, so a step without a status is skipped instead of
        // failing on unboxing.
        if (!Integer.valueOf(2).equals(step.getStatus())) {
            continue;
        }
        try {
            compensateStep(step);
            step.setStatus(4);
            step.setUpdateTime(LocalDateTime.now());
            sagaStepMapper.updateById(step);
        } catch (Exception e) {
            log.error("Compensate step failed: stepId={}", step.getId(), e);
        }
    }
}
/**
 * Runs the compensating action for one step, chosen by step name. The data needed
 * for the compensation is parsed from the step's stored response.
 *
 * <ul>
 *   <li>{@code createOrder} — cancels the created order;</li>
 *   <li>{@code deductStock} — restores the deducted stock;</li>
 *   <li>{@code deductBalance} — restores the deducted balance.</li>
 * </ul>
 *
 * @param step a completed step
 * @throws IllegalStateException if the step name has no compensating action, so
 *         the caller does not record the step as compensated when nothing was undone
 */
private void compensateStep(SagaStep step) {
    switch (step.getStepName()) {
        case "createOrder":
            Order order = JSON.parseObject(step.getResponseData(), Order.class);
            orderService.cancelOrder(order.getId());
            break;
        case "deductStock":
            InventoryDeductResult result = JSON.parseObject(step.getResponseData(), InventoryDeductResult.class);
            inventoryService.restoreStock(result.getSkuId(), result.getQuantity());
            break;
        case "deductBalance":
            AccountDeductResult accountResult = JSON.parseObject(step.getResponseData(), AccountDeductResult.class);
            accountService.restoreBalance(accountResult.getUserId(), accountResult.getAmount());
            break;
        default:
            throw new IllegalStateException(
                    "No compensation defined for saga step: " + step.getStepName());
    }
}
}
|
4.2 TCC模式
4.2.1 TCC事务模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Entity @Table(name = "tcc_transaction", indexes = { @Index(name = "idx_transaction_id", columnList = "transactionId"), @Index(name = "idx_status", columnList = "status") }) public class TccTransaction { @Id private String id; @Column(nullable = false, unique = true) private String transactionId; @Column(nullable = false) private String businessType; @Column(nullable = false) private String businessId; @Column(nullable = false) private Integer status; @Column(nullable = false) private LocalDateTime createTime; @Column(nullable = false) private LocalDateTime updateTime; @Column(nullable = false) private LocalDateTime expireTime; }
|
4.2.2 TCC实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| @Service public class TccTransactionService { @Autowired private TccTransactionMapper tccTransactionMapper; @Autowired private OrderService orderService; @Autowired private InventoryService inventoryService;
@Transactional public void executeTcc(OrderRequest request) { String transactionId = UUID.randomUUID().toString(); TccTransaction tcc = new TccTransaction(); tcc.setId(UUID.randomUUID().toString()); tcc.setTransactionId(transactionId); tcc.setBusinessType("ORDER"); tcc.setBusinessId(request.getOrderId()); tcc.setStatus(0); tcc.setCreateTime(LocalDateTime.now()); tcc.setUpdateTime(LocalDateTime.now()); tcc.setExpireTime(LocalDateTime.now().plusMinutes(30)); tccTransactionMapper.insert(tcc); try { OrderTccResult orderResult = orderService.tryCreateOrder(request, transactionId); InventoryTccResult inventoryResult = inventoryService.tryDeductStock( request.getSkuId(), request.getQuantity(), transactionId); AccountTccResult accountResult = accountService.tryDeductBalance( request.getUserId(), request.getAmount(), transactionId); if (orderResult.isSuccess() && inventoryResult.isSuccess() && accountResult.isSuccess()) { tcc.setStatus(1); tcc.setUpdateTime(LocalDateTime.now()); tccTransactionMapper.updateById(tcc); orderService.confirmCreateOrder(transactionId); inventoryService.confirmDeductStock(transactionId); accountService.confirmDeductBalance(transactionId); tcc.setStatus(2); tcc.setUpdateTime(LocalDateTime.now()); tccTransactionMapper.updateById(tcc); } else { cancel(tcc); } } catch (Exception e) { log.error("TCC execution failed: transactionId={}", transactionId, e); cancel(tcc); throw new BusinessException("TCC执行失败", e); } }
private void cancel(TccTransaction tcc) { tcc.setStatus(3); tcc.setUpdateTime(LocalDateTime.now()); tccTransactionMapper.updateById(tcc); try { orderService.cancelCreateOrder(tcc.getTransactionId()); inventoryService.cancelDeductStock(tcc.getTransactionId()); accountService.cancelDeductBalance(tcc.getTransactionId()); } catch (Exception e) { log.error("TCC cancel failed: transactionId={}", tcc.getTransactionId(), e); } } }
|
5. 整体链路设计
5.1 完整链路设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Service public class OrderService { @Autowired private EventStore eventStore; @Autowired private OperationLogMapper operationLogMapper; @Autowired private SagaOrchestrator sagaOrchestrator;
/**
 * Creates an order while recording an operation log and a domain event.
 *
 * <p>Sequence: create the operation log, run the saga, insert the order with status
 * PENDING, save an {@code "OrderCreated"} event, then mark the log with status 0 and
 * the serialized order. On any exception the log (if it was created) is marked with
 * status 1 and the error message, and the exception is rethrown unchanged.
 *
 * <p>The log created at the start is reused on the failure path instead of being
 * looked up again, and a failure while updating it is attached to the original
 * exception as suppressed so it cannot replace the real cause.
 *
 * <p>NOTE(review): {@code SagaOrchestrator.executeSaga}, as shown elsewhere in this
 * document, calls {@code orderService.createOrder(request)} as its first step, and
 * this method calls {@code executeSaga}; if both refer to this method the calls
 * recurse — confirm which {@code createOrder} the saga is meant to invoke. The
 * generated {@code orderId} is also never written to {@code request} here.
 *
 * @param request the order request
 * @return the inserted order
 */
@Transactional
public Order createOrder(OrderRequest request) {
    String traceId = getTraceId();
    String orderId = generateOrderId();

    OperationLog operationLog = null;
    try {
        operationLog = createOperationLog(orderId, "ORDER", "CREATE", request, traceId);

        sagaOrchestrator.executeSaga(request);

        Order order = new Order();
        order.setId(orderId);
        order.setUserId(request.getUserId());
        order.setSkuId(request.getSkuId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);
        orderMapper.insert(order);

        DomainEvent event = createEvent(orderId, "OrderCreated", order, traceId);
        eventStore.saveEvent(event);

        operationLog.setStatus(0);
        operationLog.setResponseData(JSON.toJSONString(order));
        operationLogMapper.updateById(operationLog);
        return order;
    } catch (Exception e) {
        if (operationLog != null) {
            try {
                operationLog.setStatus(1);
                operationLog.setErrorMessage(e.getMessage());
                operationLogMapper.updateById(operationLog);
            } catch (RuntimeException logFailure) {
                e.addSuppressed(logFailure);
            }
        }
        throw e;
    }
}
}
|
5.2 链路追踪
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Component public class TraceContext { private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>(); public static String getTraceId() { String traceId = TRACE_ID.get(); if (traceId == null) { traceId = UUID.randomUUID().toString(); TRACE_ID.set(traceId); } return traceId; } public static void setTraceId(String traceId) { TRACE_ID.set(traceId); } public static void clear() { TRACE_ID.remove(); } }
/**
 * Around-advice for methods matched by {@code execution(* com.example.service.*.*(..))}
 * that makes sure a trace id is present before the call proceeds.
 *
 * <p>This aspect does not clear the trace id after the call; the id stays bound to
 * the thread until something else calls {@code TraceContext.clear()}.
 */
@Aspect
@Component
public class TraceAspect {

    /**
     * Ensures a trace id exists, then proceeds with the intercepted call.
     *
     * @param joinPoint the intercepted call
     * @return the intercepted method's return value
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("execution(* com.example.service.*.*(..))")
    public Object trace(ProceedingJoinPoint joinPoint) throws Throwable {
        if (TraceContext.getTraceId() == null) {
            final String fresh = UUID.randomUUID().toString();
            TraceContext.setTraceId(fresh);
        }
        return joinPoint.proceed();
    }
}
|
6. 实战案例
6.1 案例:支付系统完整链路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| @Service public class PaymentService { @Autowired private EventStore eventStore; @Autowired private ReconciliationService reconciliationService; @Autowired private SagaOrchestrator sagaOrchestrator;
/**
 * Executes a payment while recording an operation log and a domain event.
 *
 * <p>Sequence: create the operation log, run the payment through the saga
 * orchestrator, save a {@code "PaymentCompleted"} event, then mark the log with
 * status 0 and the serialized result. On any exception the log (if it was created)
 * is marked with status 1 and the error message, and the exception is rethrown
 * unchanged.
 *
 * <p>The log created at the start is reused on the failure path instead of being
 * looked up again, and a failure while updating it is attached to the original
 * exception as suppressed so it cannot replace the real cause.
 *
 * <p>NOTE(review): {@code operationLogMapper} and
 * {@code SagaOrchestrator.executePayment} are used here but not declared in the
 * parts of this document that define those classes — confirm they exist.
 *
 * @param request the payment request
 * @return the payment result returned by the orchestrator
 */
@Transactional
public PaymentResult payOrder(PaymentRequest request) {
    String traceId = TraceContext.getTraceId();
    String paymentId = generatePaymentId();

    OperationLog operationLog = null;
    try {
        operationLog = createOperationLog(paymentId, "PAYMENT", "PAY", request, traceId);

        PaymentResult result = sagaOrchestrator.executePayment(request);

        DomainEvent event = createEvent(paymentId, "PaymentCompleted", result, traceId);
        eventStore.saveEvent(event);

        operationLog.setStatus(0);
        operationLog.setResponseData(JSON.toJSONString(result));
        operationLogMapper.updateById(operationLog);
        return result;
    } catch (Exception e) {
        if (operationLog != null) {
            try {
                operationLog.setStatus(1);
                operationLog.setErrorMessage(e.getMessage());
                operationLogMapper.updateById(operationLog);
            } catch (RuntimeException logFailure) {
                e.addSuppressed(logFailure);
            }
        }
        throw e;
    }
}
/**
 * Scheduled job (cron {@code 0 0 2 * * ?}) that reconciles the previous day's
 * {@code "PAYMENT"} data. Exceptions from the reconciliation propagate to the scheduler.
 */
@Scheduled(cron = "0 0 2 * * ?")
public void dailyReconciliation() {
    final LocalDate previousDay = LocalDate.now().minusDays(1);
    reconciliationService.reconcile(previousDay, "PAYMENT");
}
/**
 * Rebuilds one payment from its stored events and writes the rebuilt record back.
 *
 * @param paymentId aggregate id of the payment
 */
public void replayPaymentEvents(String paymentId) {
    final PaymentRecord rebuilt = rebuildPaymentState(eventStore.getEventsByAggregateId(paymentId));
    paymentMapper.updateById(rebuilt);
}
}
|
7. 总结
7.1 核心要点
- 可回放:事件溯源、操作日志、状态重建
- 可对账:对账模型、对账流程、差异处理
- 可补偿:Saga模式、TCC模式、补偿机制
- 整体链路:三者结合,形成完整的可追溯、可修复的链路
- 最佳实践:根据业务场景选择合适的方案
7.2 关键理解
- 可回放:通过事件溯源和操作日志,可以重建任意时间点的状态
- 可对账:通过定期对账,可以发现数据差异并及时处理
- 可补偿:通过Saga或TCC模式,可以回滚或补偿失败的操作
- 链路追踪:通过TraceId,可以追踪整个请求链路
7.3 最佳实践
- 事件溯源:所有状态变化都记录为事件
- 操作日志:所有操作都记录日志
- 定期对账:每天定时对账,发现差异
- 补偿机制:失败操作自动补偿
- 链路追踪:使用TraceId追踪整个链路
相关文章: