如何设计”可回放、可对账、可补偿”的链路?

1. 概述

1.1 “可回放、可对账、可补偿”的重要性

**”可回放、可对账、可补偿”**是分布式系统设计的核心原则,特别是在金融、支付、电商等对数据一致性要求极高的场景下。

三者的意义

  • 可回放:能够重新执行历史操作,用于数据修复、问题排查、审计等
  • 可对账:能够核对不同系统间的数据一致性,发现数据差异
  • 可补偿:能够回滚或补偿失败的操作,保证最终一致性

1.2 应用场景

应用场景

  • 金融系统:支付、转账、结算等
  • 电商系统:订单、库存、账户等
  • 对账系统:与第三方系统对账
  • 审计系统:操作审计、数据追溯

1.3 本文内容结构

本文将从以下几个方面全面解析”可回放、可对账、可补偿”的链路设计:

  1. 可回放设计:事件溯源、操作日志、状态重建
  2. 可对账设计:对账模型、对账流程、差异处理
  3. 可补偿设计:补偿机制、Saga模式、TCC模式
  4. 整体链路设计:三者的结合设计
  5. 实战案例:实际项目中的完整实现

2. 可回放设计

2.1 事件溯源(Event Sourcing)

2.1.1 基本原理

事件溯源(Event Sourcing):将所有状态变化记录为事件序列,通过重放事件恢复状态。

特点

  • 所有状态变化都是事件
  • 通过重放事件恢复状态
  • 支持时间旅行(查询任意时间点的状态)
  • 支持审计和追溯

2.1.2 事件模型设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Entity
@Table(name = "domain_event", indexes = {
@Index(name = "idx_aggregate_id", columnList = "aggregateId"),
@Index(name = "idx_event_time", columnList = "eventTime")
})
public class DomainEvent {
@Id
private String eventId;

@Column(nullable = false)
private String aggregateId; // 聚合根ID

@Column(nullable = false)
private String aggregateType; // 聚合根类型:Order, Payment等

@Column(nullable = false)
private String eventType; // 事件类型:OrderCreated, OrderPaid等

@Column(nullable = false, columnDefinition = "TEXT")
private String eventData; // 事件数据(JSON)

@Column(nullable = false)
private Long eventVersion; // 事件版本号

@Column(nullable = false)
private LocalDateTime eventTime; // 事件时间

@Column(nullable = false)
private String userId; // 操作人

@Column(nullable = false)
private String traceId; // 链路追踪ID
}

2.1.3 事件存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Service
public class EventStore {

@Autowired
private DomainEventMapper eventMapper;

/**
* 保存事件
*/
@Transactional
public void saveEvent(DomainEvent event) {
// 1. 生成事件ID
event.setEventId(UUID.randomUUID().toString());

// 2. 获取当前版本号
Long currentVersion = eventMapper.selectMaxVersionByAggregateId(event.getAggregateId());
event.setEventVersion(currentVersion == null ? 1L : currentVersion + 1);

// 3. 设置事件时间
event.setEventTime(LocalDateTime.now());

// 4. 保存事件
eventMapper.insert(event);
}

/**
* 查询事件(按聚合根ID)
*/
public List<DomainEvent> getEventsByAggregateId(String aggregateId) {
return eventMapper.selectByAggregateId(aggregateId);
}

/**
* 查询事件(按时间范围)
*/
public List<DomainEvent> getEventsByTimeRange(LocalDateTime startTime, LocalDateTime endTime) {
return eventMapper.selectByTimeRange(startTime, endTime);
}

/**
* 查询事件(按聚合根ID和版本号)
*/
public List<DomainEvent> getEventsByAggregateIdAndVersion(String aggregateId, Long fromVersion, Long toVersion) {
return eventMapper.selectByAggregateIdAndVersion(aggregateId, fromVersion, toVersion);
}
}

2.1.4 状态重建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@Service
public class OrderService {

@Autowired
private EventStore eventStore;

@Autowired
private OrderMapper orderMapper;

/**
* 通过事件重建订单状态
*/
public Order rebuildOrderState(String orderId) {
// 1. 查询所有事件
List<DomainEvent> events = eventStore.getEventsByAggregateId(orderId);

// 2. 创建订单对象
Order order = new Order();
order.setId(orderId);

// 3. 重放事件
for (DomainEvent event : events) {
applyEvent(order, event);
}

return order;
}

/**
* 应用事件到订单对象
*/
private void applyEvent(Order order, DomainEvent event) {
switch (event.getEventType()) {
case "OrderCreated":
OrderCreatedEventData data = JSON.parseObject(event.getEventData(), OrderCreatedEventData.class);
order.setUserId(data.getUserId());
order.setSkuId(data.getSkuId());
order.setQuantity(data.getQuantity());
order.setAmount(data.getAmount());
order.setStatus(OrderStatus.PENDING);
break;

case "OrderPaid":
OrderPaidEventData paidData = JSON.parseObject(event.getEventData(), OrderPaidEventData.class);
order.setStatus(OrderStatus.PAID);
order.setPaidTime(paidData.getPaidTime());
order.setPaymentId(paidData.getPaymentId());
break;

case "OrderShipped":
OrderShippedEventData shippedData = JSON.parseObject(event.getEventData(), OrderShippedEventData.class);
order.setStatus(OrderStatus.SHIPPED);
order.setShippedTime(shippedData.getShippedTime());
order.setTrackingNumber(shippedData.getTrackingNumber());
break;

case "OrderCompleted":
order.setStatus(OrderStatus.COMPLETED);
break;

case "OrderCancelled":
OrderCancelledEventData cancelledData = JSON.parseObject(event.getEventData(), OrderCancelledEventData.class);
order.setStatus(OrderStatus.CANCELLED);
order.setCancelReason(cancelledData.getCancelReason());
break;
}
}

/**
* 查询订单(优先从事件重建,保证数据一致性)
*/
public Order getOrder(String orderId) {
// 1. 先从数据库查询(快速路径)
Order order = orderMapper.selectById(orderId);
if (order != null) {
// 2. 从事件重建状态(保证一致性)
Order rebuiltOrder = rebuildOrderState(orderId);

// 3. 对比状态,如果不一致,记录告警
if (!order.equals(rebuiltOrder)) {
log.warn("Order state mismatch: orderId={}", orderId);
// 可以选择修复数据库状态
// orderMapper.updateById(rebuiltOrder);
}

return rebuiltOrder;
}

// 4. 如果数据库不存在,从事件重建
return rebuildOrderState(orderId);
}
}

2.1.5 事件回放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Service
public class EventReplayService {

@Autowired
private EventStore eventStore;

@Autowired
private OrderService orderService;

/**
* 回放事件(用于数据修复)
*/
public void replayEvents(String aggregateId, LocalDateTime fromTime, LocalDateTime toTime) {
// 1. 查询指定时间范围的事件
List<DomainEvent> events = eventStore.getEventsByTimeRange(fromTime, toTime);

// 2. 按聚合根ID分组
Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
.filter(e -> e.getAggregateId().equals(aggregateId))
.collect(Collectors.groupingBy(DomainEvent::getAggregateId));

// 3. 重放每个聚合根的事件
for (Map.Entry<String, List<DomainEvent>> entry : eventsByAggregate.entrySet()) {
String aggregateId = entry.getKey();
List<DomainEvent> aggregateEvents = entry.getValue();

// 4. 重建状态
Order order = orderService.rebuildOrderState(aggregateId);

// 5. 更新数据库(如果需要)
orderMapper.updateById(order);
}
}

/**
* 回放所有事件(用于全量数据修复)
*/
public void replayAllEvents(LocalDateTime fromTime, LocalDateTime toTime) {
List<DomainEvent> events = eventStore.getEventsByTimeRange(fromTime, toTime);

// 按聚合根ID分组
Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
.collect(Collectors.groupingBy(DomainEvent::getAggregateId));

// 批量处理
for (Map.Entry<String, List<DomainEvent>> entry : eventsByAggregate.entrySet()) {
String aggregateId = entry.getKey();
try {
Order order = orderService.rebuildOrderState(aggregateId);
orderMapper.updateById(order);
} catch (Exception e) {
log.error("Replay events failed: aggregateId={}", aggregateId, e);
}
}
}
}

2.2 操作日志

2.2.1 操作日志模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Entity
@Table(name = "operation_log", indexes = {
@Index(name = "idx_business_id", columnList = "businessId"),
@Index(name = "idx_operation_time", columnList = "operationTime")
})
public class OperationLog {
@Id
private String logId;

@Column(nullable = false)
private String businessId; // 业务ID

@Column(nullable = false)
private String businessType; // 业务类型:ORDER, PAYMENT等

@Column(nullable = false)
private String operationType; // 操作类型:CREATE, UPDATE, DELETE等

@Column(nullable = false, columnDefinition = "TEXT")
private String requestData; // 请求数据(JSON)

@Column(columnDefinition = "TEXT")
private String responseData; // 响应数据(JSON)

@Column(nullable = false)
private Integer status; // 状态:0-成功,1-失败

@Column(columnDefinition = "TEXT")
private String errorMessage; // 错误信息

@Column(nullable = false)
private LocalDateTime operationTime; // 操作时间

@Column(nullable = false)
private String userId; // 操作人

@Column(nullable = false)
private String traceId; // 链路追踪ID
}

2.2.2 操作日志记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Aspect
@Component
public class OperationLogAspect {

@Autowired
private OperationLogMapper operationLogMapper;

@Around("@annotation(operationLog)")
public Object logOperation(ProceedingJoinPoint joinPoint, OperationLog operationLog) throws Throwable {
// 1. 获取请求参数
Object[] args = joinPoint.getArgs();
String requestData = JSON.toJSONString(args);

// 2. 获取业务ID
String businessId = extractBusinessId(args);

// 3. 创建操作日志
OperationLog log = new OperationLog();
log.setLogId(UUID.randomUUID().toString());
log.setBusinessId(businessId);
log.setBusinessType(operationLog.businessType());
log.setOperationType(operationLog.operationType());
log.setRequestData(requestData);
log.setOperationTime(LocalDateTime.now());
log.setUserId(getCurrentUserId());
log.setTraceId(getTraceId());

try {
// 4. 执行方法
Object result = joinPoint.proceed();

// 5. 记录成功日志
log.setStatus(0);
log.setResponseData(JSON.toJSONString(result));
operationLogMapper.insert(log);

return result;
} catch (Exception e) {
// 6. 记录失败日志
log.setStatus(1);
log.setErrorMessage(e.getMessage());
operationLogMapper.insert(log);
throw e;
}
}
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperationLog {
String businessType();
String operationType();
}

@Service
public class OrderService {

@OperationLog(businessType = "ORDER", operationType = "CREATE")
public Order createOrder(OrderRequest request) {
// 创建订单逻辑
}
}

3. 可对账设计

3.1 对账模型设计

3.1.1 对账数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Entity
@Table(name = "reconciliation_record", indexes = {
@Index(name = "idx_business_date", columnList = "businessDate"),
@Index(name = "idx_reconciliation_status", columnList = "reconciliationStatus")
})
public class ReconciliationRecord {
@Id
private String recordId;

@Column(nullable = false)
private LocalDate businessDate; // 对账日期

@Column(nullable = false)
private String businessType; // 业务类型:PAYMENT, ORDER等

@Column(nullable = false)
private String systemA; // 系统A名称

@Column(nullable = false)
private String systemB; // 系统B名称

@Column(nullable = false)
private Long systemACount; // 系统A记录数

@Column(nullable = false)
private Long systemBCount; // 系统B记录数

@Column(nullable = false)
private BigDecimal systemAAmount; // 系统A金额

@Column(nullable = false)
private BigDecimal systemBAmount; // 系统B金额

@Column(nullable = false)
private Integer reconciliationStatus; // 对账状态:0-待对账,1-对账中,2-对账成功,3-对账失败

@Column(columnDefinition = "TEXT")
private String differenceDetails; // 差异详情(JSON)

@Column(nullable = false)
private LocalDateTime reconciliationTime; // 对账时间

@Column(nullable = false)
private LocalDateTime createTime;

@Column(nullable = false)
private LocalDateTime updateTime;
}

@Entity
@Table(name = "reconciliation_detail", indexes = {
@Index(name = "idx_record_id", columnList = "recordId"),
@Index(name = "idx_business_id", columnList = "businessId")
})
public class ReconciliationDetail {
@Id
private String detailId;

@Column(nullable = false)
private String recordId; // 对账记录ID

@Column(nullable = false)
private String businessId; // 业务ID

@Column(nullable = false)
private String businessType; // 业务类型

@Column(nullable = false)
private Integer differenceType; // 差异类型:0-无差异,1-系统A有系统B无,2-系统B有系统A无,3-金额不一致

@Column(columnDefinition = "TEXT")
private String systemAData; // 系统A数据(JSON)

@Column(columnDefinition = "TEXT")
private String systemBData; // 系统B数据(JSON)

@Column(nullable = false)
private LocalDateTime createTime;
}

3.1.2 对账服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@Service
public class ReconciliationService {

@Autowired
private ReconciliationRecordMapper recordMapper;

@Autowired
private ReconciliationDetailMapper detailMapper;

@Autowired
private PaymentService paymentService;

@Autowired
private ThirdPartyPaymentService thirdPartyPaymentService;

/**
* 执行对账
*/
@Transactional
public ReconciliationResult reconcile(LocalDate businessDate, String businessType) {
// 1. 创建对账记录
ReconciliationRecord record = new ReconciliationRecord();
record.setRecordId(UUID.randomUUID().toString());
record.setBusinessDate(businessDate);
record.setBusinessType(businessType);
record.setSystemA("内部系统");
record.setSystemB("第三方支付系统");
record.setReconciliationStatus(0); // 待对账
record.setCreateTime(LocalDateTime.now());
recordMapper.insert(record);

try {
// 2. 查询系统A数据
List<PaymentRecord> systemAData = paymentService.getPaymentsByDate(businessDate);

// 3. 查询系统B数据
List<ThirdPartyPaymentRecord> systemBData = thirdPartyPaymentService.getPaymentsByDate(businessDate);

// 4. 统计信息
record.setSystemACount((long) systemAData.size());
record.setSystemBCount((long) systemBData.size());
record.setSystemAAmount(systemAData.stream()
.map(PaymentRecord::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add));
record.setSystemBAmount(systemBData.stream()
.map(ThirdPartyPaymentRecord::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add));

// 5. 执行对账
List<ReconciliationDetail> differences = compareData(systemAData, systemBData, record.getRecordId());

// 6. 保存差异明细
if (!differences.isEmpty()) {
detailMapper.insertBatch(differences);
record.setReconciliationStatus(3); // 对账失败
record.setDifferenceDetails(JSON.toJSONString(differences));
} else {
record.setReconciliationStatus(2); // 对账成功
}

record.setReconciliationTime(LocalDateTime.now());
record.setUpdateTime(LocalDateTime.now());
recordMapper.updateById(record);

return ReconciliationResult.success(record, differences);
} catch (Exception e) {
log.error("Reconciliation failed", e);
record.setReconciliationStatus(3);
record.setUpdateTime(LocalDateTime.now());
recordMapper.updateById(record);
throw new BusinessException("对账失败", e);
}
}

/**
* 对比数据
*/
private List<ReconciliationDetail> compareData(
List<PaymentRecord> systemAData,
List<ThirdPartyPaymentRecord> systemBData,
String recordId) {

List<ReconciliationDetail> differences = new ArrayList<>();

// 1. 构建系统B的数据映射(按业务ID)
Map<String, ThirdPartyPaymentRecord> systemBMap = systemBData.stream()
.collect(Collectors.toMap(
ThirdPartyPaymentRecord::getPaymentId,
record -> record
));

// 2. 检查系统A的数据
for (PaymentRecord systemARecord : systemAData) {
ThirdPartyPaymentRecord systemBRecord = systemBMap.get(systemARecord.getPaymentId());

if (systemBRecord == null) {
// 系统A有,系统B无
ReconciliationDetail detail = createDifferenceDetail(
recordId,
systemARecord.getPaymentId(),
1, // 系统A有系统B无
JSON.toJSONString(systemARecord),
null
);
differences.add(detail);
} else {
// 检查金额是否一致
if (systemARecord.getAmount().compareTo(systemBRecord.getAmount()) != 0) {
ReconciliationDetail detail = createDifferenceDetail(
recordId,
systemARecord.getPaymentId(),
3, // 金额不一致
JSON.toJSONString(systemARecord),
JSON.toJSONString(systemBRecord)
);
differences.add(detail);
}
// 从映射中移除,表示已匹配
systemBMap.remove(systemARecord.getPaymentId());
}
}

// 3. 检查系统B剩余的数据(系统B有,系统A无)
for (ThirdPartyPaymentRecord systemBRecord : systemBMap.values()) {
ReconciliationDetail detail = createDifferenceDetail(
recordId,
systemBRecord.getPaymentId(),
2, // 系统B有系统A无
null,
JSON.toJSONString(systemBRecord)
);
differences.add(detail);
}

return differences;
}

private ReconciliationDetail createDifferenceDetail(
String recordId,
String businessId,
Integer differenceType,
String systemAData,
String systemBData) {
ReconciliationDetail detail = new ReconciliationDetail();
detail.setDetailId(UUID.randomUUID().toString());
detail.setRecordId(recordId);
detail.setBusinessId(businessId);
detail.setBusinessType("PAYMENT");
detail.setDifferenceType(differenceType);
detail.setSystemAData(systemAData);
detail.setSystemBData(systemBData);
detail.setCreateTime(LocalDateTime.now());
return detail;
}
}

3.2 定时对账

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class ReconciliationScheduler {

@Autowired
private ReconciliationService reconciliationService;

/**
* 每日对账(每天凌晨2点执行)
*/
@Scheduled(cron = "0 0 2 * * ?")
public void dailyReconciliation() {
// 对账昨天的数据
LocalDate yesterday = LocalDate.now().minusDays(1);

try {
reconciliationService.reconcile(yesterday, "PAYMENT");
log.info("Daily reconciliation completed: date={}", yesterday);
} catch (Exception e) {
log.error("Daily reconciliation failed: date={}", yesterday, e);
// 发送告警
}
}
}

4. 可补偿设计

4.1 Saga模式

4.1.1 Saga事务模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Entity
@Table(name = "saga_transaction", indexes = {
@Index(name = "idx_saga_id", columnList = "sagaId"),
@Index(name = "idx_status", columnList = "status")
})
public class SagaTransaction {
@Id
private String id;

@Column(nullable = false, unique = true)
private String sagaId; // Saga事务ID

@Column(nullable = false)
private String businessType; // 业务类型

@Column(nullable = false)
private String businessId; // 业务ID

@Column(nullable = false)
private Integer status; // 状态:0-进行中,1-已完成,2-已补偿

@Column(columnDefinition = "TEXT")
private String steps; // 步骤列表(JSON)

@Column(nullable = false)
private LocalDateTime createTime;

@Column(nullable = false)
private LocalDateTime updateTime;
}

@Entity
@Table(name = "saga_step", indexes = {
@Index(name = "idx_saga_id", columnList = "sagaId")
})
public class SagaStep {
@Id
private String id;

@Column(nullable = false)
private String sagaId; // Saga事务ID

@Column(nullable = false)
private String stepName; // 步骤名称

@Column(nullable = false)
private Integer stepOrder; // 步骤顺序

@Column(nullable = false)
private Integer status; // 状态:0-待执行,1-执行中,2-执行成功,3-执行失败,4-已补偿

@Column(columnDefinition = "TEXT")
private String requestData; // 请求数据(JSON)

@Column(columnDefinition = "TEXT")
private String responseData; // 响应数据(JSON)

@Column(columnDefinition = "TEXT")
private String compensateData; // 补偿数据(JSON)

@Column(nullable = false)
private LocalDateTime createTime;

@Column(nullable = false)
private LocalDateTime updateTime;
}

4.1.2 Saga编排器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@Service
public class SagaOrchestrator {

@Autowired
private SagaTransactionMapper sagaTransactionMapper;

@Autowired
private SagaStepMapper sagaStepMapper;

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

/**
* 执行Saga事务
*/
@Transactional
public void executeSaga(OrderRequest request) {
String sagaId = UUID.randomUUID().toString();

// 1. 创建Saga事务
SagaTransaction saga = new SagaTransaction();
saga.setId(UUID.randomUUID().toString());
saga.setSagaId(sagaId);
saga.setBusinessType("ORDER");
saga.setBusinessId(request.getOrderId());
saga.setStatus(0); // 进行中
saga.setCreateTime(LocalDateTime.now());
saga.setUpdateTime(LocalDateTime.now());
sagaTransactionMapper.insert(saga);

List<SagaStep> steps = new ArrayList<>();

try {
// 2. 步骤1:创建订单
SagaStep step1 = executeStep(sagaId, "createOrder", 1, () -> {
return orderService.createOrder(request);
});
steps.add(step1);

// 3. 步骤2:扣减库存
SagaStep step2 = executeStep(sagaId, "deductStock", 2, () -> {
return inventoryService.deductStock(request.getSkuId(), request.getQuantity());
});
steps.add(step2);

// 4. 步骤3:扣减账户余额
SagaStep step3 = executeStep(sagaId, "deductBalance", 3, () -> {
return accountService.deductBalance(request.getUserId(), request.getAmount());
});
steps.add(step3);

// 5. 所有步骤成功,完成Saga
saga.setStatus(1); // 已完成
saga.setSteps(JSON.toJSONString(steps));
saga.setUpdateTime(LocalDateTime.now());
sagaTransactionMapper.updateById(saga);

} catch (Exception e) {
log.error("Saga execution failed: sagaId={}", sagaId, e);

// 6. 执行补偿(按相反顺序)
compensate(sagaId, steps);

saga.setStatus(2); // 已补偿
saga.setUpdateTime(LocalDateTime.now());
sagaTransactionMapper.updateById(saga);

throw new BusinessException("Saga执行失败", e);
}
}

/**
* 执行步骤
*/
private SagaStep executeStep(String sagaId, String stepName, Integer stepOrder, Supplier<Object> action) {
SagaStep step = new SagaStep();
step.setId(UUID.randomUUID().toString());
step.setSagaId(sagaId);
step.setStepName(stepName);
step.setStepOrder(stepOrder);
step.setStatus(1); // 执行中
step.setCreateTime(LocalDateTime.now());
step.setUpdateTime(LocalDateTime.now());
sagaStepMapper.insert(step);

try {
// 执行操作
Object result = action.get();

step.setStatus(2); // 执行成功
step.setResponseData(JSON.toJSONString(result));
step.setUpdateTime(LocalDateTime.now());
sagaStepMapper.updateById(step);

return step;
} catch (Exception e) {
step.setStatus(3); // 执行失败
step.setResponseData(e.getMessage());
step.setUpdateTime(LocalDateTime.now());
sagaStepMapper.updateById(step);
throw e;
}
}

/**
* 执行补偿
*/
private void compensate(String sagaId, List<SagaStep> steps) {
// 按相反顺序补偿
Collections.reverse(steps);

for (SagaStep step : steps) {
if (step.getStatus() == 2) { // 只补偿执行成功的步骤
try {
compensateStep(step);
step.setStatus(4); // 已补偿
step.setUpdateTime(LocalDateTime.now());
sagaStepMapper.updateById(step);
} catch (Exception e) {
log.error("Compensate step failed: stepId={}", step.getId(), e);
// 补偿失败,记录日志,可能需要人工介入
}
}
}
}

/**
* 补偿单个步骤
*/
private void compensateStep(SagaStep step) {
switch (step.getStepName()) {
case "createOrder":
Order order = JSON.parseObject(step.getResponseData(), Order.class);
orderService.cancelOrder(order.getId());
break;

case "deductStock":
InventoryDeductResult result = JSON.parseObject(step.getResponseData(), InventoryDeductResult.class);
inventoryService.restoreStock(result.getSkuId(), result.getQuantity());
break;

case "deductBalance":
AccountDeductResult accountResult = JSON.parseObject(step.getResponseData(), AccountDeductResult.class);
accountService.restoreBalance(accountResult.getUserId(), accountResult.getAmount());
break;
}
}
}

4.2 TCC模式

4.2.1 TCC事务模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Entity
@Table(name = "tcc_transaction", indexes = {
@Index(name = "idx_transaction_id", columnList = "transactionId"),
@Index(name = "idx_status", columnList = "status")
})
public class TccTransaction {
@Id
private String id;

@Column(nullable = false, unique = true)
private String transactionId; // TCC事务ID

@Column(nullable = false)
private String businessType; // 业务类型

@Column(nullable = false)
private String businessId; // 业务ID

@Column(nullable = false)
private Integer status; // 状态:0-Try中,1-Confirm中,2-已Confirm,3-已Cancel

@Column(nullable = false)
private LocalDateTime createTime;

@Column(nullable = false)
private LocalDateTime updateTime;

@Column(nullable = false)
private LocalDateTime expireTime; // 过期时间
}

4.2.2 TCC实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Service
public class TccTransactionService {

@Autowired
private TccTransactionMapper tccTransactionMapper;

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

/**
* 执行TCC事务
*/
@Transactional
public void executeTcc(OrderRequest request) {
String transactionId = UUID.randomUUID().toString();

// 1. 创建TCC事务
TccTransaction tcc = new TccTransaction();
tcc.setId(UUID.randomUUID().toString());
tcc.setTransactionId(transactionId);
tcc.setBusinessType("ORDER");
tcc.setBusinessId(request.getOrderId());
tcc.setStatus(0); // Try中
tcc.setCreateTime(LocalDateTime.now());
tcc.setUpdateTime(LocalDateTime.now());
tcc.setExpireTime(LocalDateTime.now().plusMinutes(30));
tccTransactionMapper.insert(tcc);

try {
// 2. Try阶段
OrderTccResult orderResult = orderService.tryCreateOrder(request, transactionId);
InventoryTccResult inventoryResult = inventoryService.tryDeductStock(
request.getSkuId(), request.getQuantity(), transactionId);
AccountTccResult accountResult = accountService.tryDeductBalance(
request.getUserId(), request.getAmount(), transactionId);

if (orderResult.isSuccess() && inventoryResult.isSuccess() && accountResult.isSuccess()) {
// 3. Confirm阶段
tcc.setStatus(1); // Confirm中
tcc.setUpdateTime(LocalDateTime.now());
tccTransactionMapper.updateById(tcc);

orderService.confirmCreateOrder(transactionId);
inventoryService.confirmDeductStock(transactionId);
accountService.confirmDeductBalance(transactionId);

tcc.setStatus(2); // 已Confirm
tcc.setUpdateTime(LocalDateTime.now());
tccTransactionMapper.updateById(tcc);
} else {
// 4. Cancel阶段
cancel(tcc);
}
} catch (Exception e) {
log.error("TCC execution failed: transactionId={}", transactionId, e);
cancel(tcc);
throw new BusinessException("TCC执行失败", e);
}
}

/**
* Cancel阶段
*/
private void cancel(TccTransaction tcc) {
tcc.setStatus(3); // 已Cancel
tcc.setUpdateTime(LocalDateTime.now());
tccTransactionMapper.updateById(tcc);

try {
orderService.cancelCreateOrder(tcc.getTransactionId());
inventoryService.cancelDeductStock(tcc.getTransactionId());
accountService.cancelDeductBalance(tcc.getTransactionId());
} catch (Exception e) {
log.error("TCC cancel failed: transactionId={}", tcc.getTransactionId(), e);
}
}
}

5. 整体链路设计

5.1 完整链路设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Service
public class OrderService {

@Autowired
private EventStore eventStore;

@Autowired
private OperationLogMapper operationLogMapper;

@Autowired
private SagaOrchestrator sagaOrchestrator;

/**
* 创建订单(完整链路:可回放、可对账、可补偿)
*/
@Transactional
public Order createOrder(OrderRequest request) {
String traceId = getTraceId();
String orderId = generateOrderId();

try {
// 1. 记录操作日志(可回放)
OperationLog operationLog = createOperationLog(orderId, "ORDER", "CREATE", request, traceId);

// 2. 执行Saga事务(可补偿)
sagaOrchestrator.executeSaga(request);

// 3. 创建订单
Order order = new Order();
order.setId(orderId);
order.setUserId(request.getUserId());
order.setSkuId(request.getSkuId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);

// 4. 保存事件(可回放)
DomainEvent event = createEvent(orderId, "OrderCreated", order, traceId);
eventStore.saveEvent(event);

// 5. 更新操作日志
operationLog.setStatus(0);
operationLog.setResponseData(JSON.toJSONString(order));
operationLogMapper.updateById(operationLog);

return order;
} catch (Exception e) {
// 记录失败日志
OperationLog operationLog = getOperationLog(orderId);
if (operationLog != null) {
operationLog.setStatus(1);
operationLog.setErrorMessage(e.getMessage());
operationLogMapper.updateById(operationLog);
}
throw e;
}
}
}

5.2 链路追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Component
public class TraceContext {

private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

public static String getTraceId() {
String traceId = TRACE_ID.get();
if (traceId == null) {
traceId = UUID.randomUUID().toString();
TRACE_ID.set(traceId);
}
return traceId;
}

public static void setTraceId(String traceId) {
TRACE_ID.set(traceId);
}

public static void clear() {
TRACE_ID.remove();
}
}

@Aspect
@Component
public class TraceAspect {

@Around("execution(* com.example.service.*.*(..))")
public Object trace(ProceedingJoinPoint joinPoint) throws Throwable {
String traceId = TraceContext.getTraceId();
if (traceId == null) {
traceId = UUID.randomUUID().toString();
TraceContext.setTraceId(traceId);
}

try {
return joinPoint.proceed();
} finally {
// 可以在这里记录链路信息
}
}
}

6. 实战案例

6.1 案例:支付系统完整链路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Service
public class PaymentService {

@Autowired
private EventStore eventStore;

@Autowired
private ReconciliationService reconciliationService;

@Autowired
private SagaOrchestrator sagaOrchestrator;

/**
* 支付订单(完整链路)
*/
@Transactional
public PaymentResult payOrder(PaymentRequest request) {
String traceId = TraceContext.getTraceId();
String paymentId = generatePaymentId();

try {
// 1. 记录操作日志
OperationLog log = createOperationLog(paymentId, "PAYMENT", "PAY", request, traceId);

// 2. 执行支付(Saga事务,可补偿)
PaymentResult result = sagaOrchestrator.executePayment(request);

// 3. 保存事件(可回放)
DomainEvent event = createEvent(paymentId, "PaymentCompleted", result, traceId);
eventStore.saveEvent(event);

// 4. 更新操作日志
log.setStatus(0);
log.setResponseData(JSON.toJSONString(result));
operationLogMapper.updateById(log);

return result;
} catch (Exception e) {
// 记录失败日志
OperationLog log = getOperationLog(paymentId);
if (log != null) {
log.setStatus(1);
log.setErrorMessage(e.getMessage());
operationLogMapper.updateById(log);
}
throw e;
}
}

/**
* 对账(定时任务)
*/
@Scheduled(cron = "0 0 2 * * ?")
public void dailyReconciliation() {
LocalDate yesterday = LocalDate.now().minusDays(1);
reconciliationService.reconcile(yesterday, "PAYMENT");
}

/**
* 回放事件(数据修复)
*/
public void replayPaymentEvents(String paymentId) {
List<DomainEvent> events = eventStore.getEventsByAggregateId(paymentId);
PaymentRecord payment = rebuildPaymentState(events);
paymentMapper.updateById(payment);
}
}

7. 总结

7.1 核心要点

  1. 可回放:事件溯源、操作日志、状态重建
  2. 可对账:对账模型、对账流程、差异处理
  3. 可补偿:Saga模式、TCC模式、补偿机制
  4. 整体链路:三者结合,形成完整的可追溯、可修复的链路
  5. 最佳实践:根据业务场景选择合适的方案

7.2 关键理解

  1. 可回放:通过事件溯源和操作日志,可以重建任意时间点的状态
  2. 可对账:通过定期对账,可以发现数据差异并及时处理
  3. 可补偿:通过Saga或TCC模式,可以回滚或补偿失败的操作
  4. 链路追踪:通过TraceId,可以追踪整个请求链路

7.3 最佳实践

  1. 事件溯源:所有状态变化都记录为事件
  2. 操作日志:所有操作都记录日志
  3. 定期对账:每天定时对账,发现差异
  4. 补偿机制:失败操作自动补偿
  5. 链路追踪:使用TraceId追踪整个链路

相关文章