分布式事务有哪些方案?适用场景?

1. 概述

1.1 分布式事务的重要性

分布式事务是分布式系统设计的核心问题之一,需要保证跨多个服务或数据库的操作要么全部成功,要么全部失败。

分布式事务的挑战

  • 网络分区:网络可能断开
  • 节点故障:节点可能宕机
  • 数据一致性:保证数据的一致性
  • 性能影响:事务可能影响性能

1.2 分布式事务方案分类

分布式事务方案主要分为两类:

  1. 强一致性方案:2PC、3PC、TCC
  2. 最终一致性方案:Saga、本地消息表、事务消息

1.3 本文内容结构

本文将从以下几个方面全面解析分布式事务方案:

  1. 分布式事务概述:定义、挑战、分类
  2. 强一致性方案:2PC、3PC、TCC
  3. 最终一致性方案:Saga、本地消息表、事务消息
  4. 方案对比:优缺点、适用场景
  5. 实战案例:实际项目中的分布式事务实现

2. 分布式事务概述

2.1 分布式事务定义

2.1.1 什么是分布式事务

分布式事务:跨多个服务或数据库的事务,需要保证ACID特性。

特点

  • 跨多个服务
  • 跨多个数据库
  • 需要保证一致性
  • 性能影响较大

2.1.2 分布式事务的挑战

挑战

  • 网络延迟:网络通信延迟
  • 网络分区:网络可能断开
  • 节点故障:节点可能宕机
  • 数据一致性:保证数据的一致性
  • 性能影响:事务可能影响性能

2.2 分布式事务分类

2.2.1 强一致性方案

强一致性方案:保证数据立即一致。

特点

  • 数据立即一致
  • 性能较低
  • 实现复杂

方案

  • 2PC:两阶段提交
  • 3PC:三阶段提交
  • TCC:Try-Confirm-Cancel

2.2.2 最终一致性方案

最终一致性方案:保证数据最终一致。

特点

  • 数据最终一致
  • 性能较高
  • 实现相对简单

方案

  • Saga:长事务模式
  • 本地消息表:本地消息表模式
  • 事务消息:消息队列事务消息

3. 强一致性方案

3.1 两阶段提交(2PC)

3.1.1 原理

两阶段提交(2PC):协调者协调所有参与者完成事务。

阶段1:准备阶段(Prepare)

  1. 协调者向所有参与者发送Prepare请求
  2. 参与者执行事务,但不提交
  3. 参与者返回Yes/No

阶段2:提交阶段(Commit)

  1. 如果所有参与者都返回Yes,协调者发送Commit
  2. 如果任何参与者返回No,协调者发送Abort
  3. 参与者提交或回滚事务

3.1.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@Service
public class TwoPhaseCommitCoordinator {

@Autowired
private List<TransactionParticipant> participants;

public boolean executeTransaction(Transaction transaction) {
// 阶段1:准备阶段
List<Boolean> prepareResults = new ArrayList<>();
for (TransactionParticipant participant : participants) {
try {
boolean prepared = participant.prepare(transaction);
prepareResults.add(prepared);
} catch (Exception e) {
prepareResults.add(false);
}
}

// 检查所有参与者是否都准备成功
boolean allPrepared = prepareResults.stream().allMatch(result -> result);

// 阶段2:提交阶段
if (allPrepared) {
// 所有参与者都准备成功,提交
for (TransactionParticipant participant : participants) {
try {
participant.commit(transaction);
} catch (Exception e) {
// 提交失败,需要人工介入
log.error("Participant commit failed", e);
}
}
return true;
} else {
// 有参与者准备失败,回滚
for (TransactionParticipant participant : participants) {
try {
participant.rollback(transaction);
} catch (Exception e) {
log.error("Participant rollback failed", e);
}
}
return false;
}
}
}

@Component
public class TransactionParticipant {

@Autowired
private DataSource dataSource;

public boolean prepare(Transaction transaction) {
Connection conn = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);

// 执行事务操作,但不提交
executeTransactionOperations(conn, transaction);

// 准备成功,但不提交
return true;
} catch (Exception e) {
log.error("Prepare failed", e);
return false;
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
log.error("Close connection failed", e);
}
}
}
}

public void commit(Transaction transaction) {
Connection conn = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);

// 执行事务操作
executeTransactionOperations(conn, transaction);

// 提交事务
conn.commit();
} catch (Exception e) {
log.error("Commit failed", e);
if (conn != null) {
try {
conn.rollback();
} catch (SQLException ex) {
log.error("Rollback failed", ex);
}
}
throw new RuntimeException("Commit failed", e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
log.error("Close connection failed", e);
}
}
}
}

public void rollback(Transaction transaction) {
// 回滚事务
log.info("Rollback transaction: {}", transaction.getId());
}

private void executeTransactionOperations(Connection conn, Transaction transaction) throws SQLException {
// 执行具体的事务操作
// 例如:扣减库存、扣减账户余额等
}
}

3.1.3 优缺点

优点

  • 保证强一致性
  • 实现相对简单
  • 适合小规模系统

缺点

  • 性能较低:需要两轮网络通信
  • 阻塞时间长:参与者需要等待协调者
  • 单点故障:协调者故障会导致整个事务失败
  • 数据锁定:准备阶段数据被锁定,影响并发

3.1.4 适用场景

适用场景

  • 对一致性要求极高的场景
  • 参与方较少的场景(2-3个)
  • 网络延迟较低的场景
  • 可以接受性能损失的场景

不适用场景

  • 高并发场景
  • 网络延迟较高的场景
  • 参与方较多的场景

3.2 三阶段提交(3PC)

3.2.1 原理

三阶段提交(3PC):在2PC基础上增加CanCommit阶段,减少阻塞。

阶段1:CanCommit

  1. 协调者向所有参与者发送CanCommit请求
  2. 参与者检查是否可以提交
  3. 参与者返回Yes/No

阶段2:PreCommit

  1. 如果所有参与者都返回Yes,协调者发送PreCommit
  2. 参与者执行事务,但不提交
  3. 参与者返回Ack

阶段3:DoCommit

  1. 协调者发送DoCommit
  2. 参与者提交事务
  3. 参与者返回Ack

3.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Service
public class ThreePhaseCommitCoordinator {

@Autowired
private List<TransactionParticipant> participants;

public boolean executeTransaction(Transaction transaction) {
// 阶段1:CanCommit
List<Boolean> canCommitResults = new ArrayList<>();
for (TransactionParticipant participant : participants) {
try {
boolean canCommit = participant.canCommit(transaction);
canCommitResults.add(canCommit);
} catch (Exception e) {
canCommitResults.add(false);
}
}

boolean allCanCommit = canCommitResults.stream().allMatch(result -> result);
if (!allCanCommit) {
// 有参与者不能提交,中止
for (TransactionParticipant participant : participants) {
participant.abort(transaction);
}
return false;
}

// 阶段2:PreCommit
List<Boolean> preCommitResults = new ArrayList<>();
for (TransactionParticipant participant : participants) {
try {
boolean preCommitted = participant.preCommit(transaction);
preCommitResults.add(preCommitted);
} catch (Exception e) {
preCommitResults.add(false);
}
}

boolean allPreCommitted = preCommitResults.stream().allMatch(result -> result);
if (!allPreCommitted) {
// 有参与者预提交失败,中止
for (TransactionParticipant participant : participants) {
participant.abort(transaction);
}
return false;
}

// 阶段3:DoCommit
for (TransactionParticipant participant : participants) {
try {
participant.doCommit(transaction);
} catch (Exception e) {
log.error("DoCommit failed", e);
}
}
return true;
}
}

@Component
public class TransactionParticipant {

public boolean canCommit(Transaction transaction) {
// 检查是否可以提交
// 例如:检查资源是否充足、系统是否正常等
return checkResources(transaction);
}

public boolean preCommit(Transaction transaction) {
// 预提交:执行事务操作,但不提交
try {
executeTransactionOperations(transaction);
return true;
} catch (Exception e) {
log.error("PreCommit failed", e);
return false;
}
}

public void doCommit(Transaction transaction) {
// 提交事务
try {
commitTransaction(transaction);
} catch (Exception e) {
log.error("DoCommit failed", e);
throw new RuntimeException("DoCommit failed", e);
}
}

public void abort(Transaction transaction) {
// 中止事务
rollbackTransaction(transaction);
}
}

3.2.3 优缺点

优点

  • 减少阻塞时间
  • 提高可用性
  • 减少单点故障影响

缺点

  • 实现更复杂
  • 仍然有性能问题
  • 网络通信次数增加

3.2.4 适用场景

适用场景

  • 对一致性要求极高的场景
  • 可以接受3轮网络通信的场景
  • 需要减少阻塞时间的场景

不适用场景

  • 高并发场景
  • 网络延迟较高的场景

3.3 TCC(Try-Confirm-Cancel)

3.3.1 原理

TCC(Try-Confirm-Cancel):补偿型事务,分为三个阶段。

阶段1:Try(尝试)

  1. 尝试执行业务操作
  2. 预留资源
  3. 不提交事务

阶段2:Confirm(确认)

  1. 如果所有Try都成功,执行Confirm
  2. 提交事务
  3. 释放预留资源

阶段3:Cancel(取消)

  1. 如果任何Try失败,执行Cancel
  2. 回滚事务
  3. 释放预留资源

3.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@Service
public class TCCTransactionService {

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

public void createOrder(Order order) {
String transactionId = UUID.randomUUID().toString();

try {
// 阶段1:Try(尝试)
// 1. 尝试创建订单
OrderTccResult orderResult = orderService.tryCreateOrder(order, transactionId);

// 2. 尝试扣减库存
InventoryTccResult inventoryResult = inventoryService.tryDeductStock(
order.getSkuId(), order.getQuantity(), transactionId);

// 3. 尝试扣减账户余额
AccountTccResult accountResult = accountService.tryDeductBalance(
order.getUserId(), order.getAmount(), transactionId);

// 检查所有Try是否成功
if (orderResult.isSuccess() && inventoryResult.isSuccess() && accountResult.isSuccess()) {
// 阶段2:Confirm(确认)
orderService.confirmCreateOrder(transactionId);
inventoryService.confirmDeductStock(transactionId);
accountService.confirmDeductBalance(transactionId);
} else {
// 阶段3:Cancel(取消)
orderService.cancelCreateOrder(transactionId);
inventoryService.cancelDeductStock(transactionId);
accountService.cancelDeductBalance(transactionId);
throw new BusinessException("创建订单失败");
}
} catch (Exception e) {
// 发生异常,执行Cancel
orderService.cancelCreateOrder(transactionId);
inventoryService.cancelDeductStock(transactionId);
accountService.cancelDeductBalance(transactionId);
throw e;
}
}
}

@Service
public class InventoryService {

@Autowired
private InventoryMapper inventoryMapper;

@Autowired
private TccRecordMapper tccRecordMapper;

public InventoryTccResult tryDeductStock(Long skuId, Integer quantity, String transactionId) {
try {
// 1. 检查库存
Inventory inventory = inventoryMapper.selectById(skuId);
if (inventory.getStock() < quantity) {
return InventoryTccResult.failure("库存不足");
}

// 2. 预留库存(冻结)
inventory.setFrozenStock(inventory.getFrozenStock() + quantity);
inventory.setStock(inventory.getStock() - quantity);
inventoryMapper.updateById(inventory);

// 3. 记录TCC状态
TccRecord record = new TccRecord();
record.setTransactionId(transactionId);
record.setServiceName("InventoryService");
record.setMethodName("tryDeductStock");
record.setStatus(TccStatus.TRY);
record.setParams(JSON.toJSONString(new Object[]{skuId, quantity}));
tccRecordMapper.insert(record);

return InventoryTccResult.success();
} catch (Exception e) {
log.error("Try deduct stock failed", e);
return InventoryTccResult.failure(e.getMessage());
}
}

public void confirmDeductStock(String transactionId) {
// 确认:提交预留的库存
TccRecord record = tccRecordMapper.selectByTransactionId(transactionId);
if (record == null || record.getStatus() != TccStatus.TRY) {
throw new BusinessException("TCC状态异常");
}

// 更新TCC状态
record.setStatus(TccStatus.CONFIRM);
tccRecordMapper.updateById(record);

// 确认操作:预留的库存已经扣减,不需要额外操作
log.info("Confirm deduct stock: {}", transactionId);
}

public void cancelDeductStock(String transactionId) {
// 取消:释放预留的库存
TccRecord record = tccRecordMapper.selectByTransactionId(transactionId);
if (record == null) {
return;
}

// 解析参数
Object[] params = JSON.parseObject(record.getParams(), Object[].class);
Long skuId = Long.parseLong(params[0].toString());
Integer quantity = Integer.parseInt(params[1].toString());

// 释放预留的库存
Inventory inventory = inventoryMapper.selectById(skuId);
inventory.setFrozenStock(inventory.getFrozenStock() - quantity);
inventory.setStock(inventory.getStock() + quantity);
inventoryMapper.updateById(inventory);

// 更新TCC状态
record.setStatus(TccStatus.CANCEL);
tccRecordMapper.updateById(record);

log.info("Cancel deduct stock: {}", transactionId);
}
}

@Service
public class AccountService {

@Autowired
private AccountMapper accountMapper;

@Autowired
private TccRecordMapper tccRecordMapper;

public AccountTccResult tryDeductBalance(Long userId, BigDecimal amount, String transactionId) {
try {
// 1. 检查余额
Account account = accountMapper.selectById(userId);
if (account.getBalance().compareTo(amount) < 0) {
return AccountTccResult.failure("余额不足");
}

// 2. 冻结余额
account.setFrozenBalance(account.getFrozenBalance().add(amount));
account.setBalance(account.getBalance().subtract(amount));
accountMapper.updateById(account);

// 3. 记录TCC状态
TccRecord record = new TccRecord();
record.setTransactionId(transactionId);
record.setServiceName("AccountService");
record.setMethodName("tryDeductBalance");
record.setStatus(TccStatus.TRY);
record.setParams(JSON.toJSONString(new Object[]{userId, amount}));
tccRecordMapper.insert(record);

return AccountTccResult.success();
} catch (Exception e) {
log.error("Try deduct balance failed", e);
return AccountTccResult.failure(e.getMessage());
}
}

public void confirmDeductBalance(String transactionId) {
// 确认:提交冻结的余额
TccRecord record = tccRecordMapper.selectByTransactionId(transactionId);
if (record == null || record.getStatus() != TccStatus.TRY) {
throw new BusinessException("TCC状态异常");
}

record.setStatus(TccStatus.CONFIRM);
tccRecordMapper.updateById(record);

log.info("Confirm deduct balance: {}", transactionId);
}

public void cancelDeductBalance(String transactionId) {
// 取消:释放冻结的余额
TccRecord record = tccRecordMapper.selectByTransactionId(transactionId);
if (record == null) {
return;
}

Object[] params = JSON.parseObject(record.getParams(), Object[].class);
Long userId = Long.parseLong(params[0].toString());
BigDecimal amount = new BigDecimal(params[1].toString());

Account account = accountMapper.selectById(userId);
account.setFrozenBalance(account.getFrozenBalance().subtract(amount));
account.setBalance(account.getBalance().add(amount));
accountMapper.updateById(account);

record.setStatus(TccStatus.CANCEL);
tccRecordMapper.updateById(record);

log.info("Cancel deduct balance: {}", transactionId);
}
}

3.3.3 优缺点

优点

  • 性能较高(不需要全局锁)
  • 适合高并发场景
  • 可以灵活控制事务粒度

缺点

  • 实现复杂(需要实现Try、Confirm、Cancel三个方法)
  • 业务侵入性强
  • 需要保证幂等性

3.3.4 适用场景

适用场景

  • 高并发场景
  • 对性能要求较高的场景
  • 可以接受最终一致性的场景
  • 业务逻辑相对简单的场景

不适用场景

  • 业务逻辑复杂的场景
  • 需要强一致性的场景

3.4 Seata AT模式

3.4.1 原理

Seata AT模式:自动补偿模式,基于数据源代理实现。

特点

  • 自动生成回滚SQL
  • 业务代码无侵入
  • 性能较高

3.4.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);

// 2. 扣减库存(Seata自动管理事务)
inventoryService.deductStock(order.getSkuId(), order.getQuantity());

// 3. 扣减账户余额(Seata自动管理事务)
accountService.deductBalance(order.getUserId(), order.getAmount());

// 如果任何步骤失败,Seata会自动回滚所有操作
}
}

@Service
public class InventoryService {

@Autowired
private InventoryMapper inventoryMapper;

public void deductStock(Long skuId, Integer quantity) {
Inventory inventory = inventoryMapper.selectById(skuId);
if (inventory.getStock() < quantity) {
throw new BusinessException("库存不足");
}
inventory.setStock(inventory.getStock() - quantity);
inventoryMapper.updateById(inventory);
}
}

3.4.3 优缺点

优点

  • 业务代码无侵入
  • 自动生成回滚SQL
  • 性能较高

缺点

  • 需要Seata框架支持
  • 对数据库有要求(需要支持undo_log表)

3.4.4 适用场景

适用场景

  • 使用Seata框架的场景
  • 需要业务代码无侵入的场景
  • 对性能要求较高的场景

4. 最终一致性方案

4.1 Saga模式

4.1.1 原理

Saga模式:长事务模式,将大事务拆分为多个小事务。

两种实现方式

  1. 编排式(Orchestration):中央协调器协调各个服务
  2. 协同式(Choreography):各个服务通过事件协同

4.1.2 编排式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Service
public class SagaOrchestrator {

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@Autowired
private SagaTransactionMapper sagaTransactionMapper;

public void createOrder(Order order) {
String sagaId = UUID.randomUUID().toString();
SagaTransaction saga = new SagaTransaction();
saga.setSagaId(sagaId);
saga.setStatus(SagaStatus.STARTED);
sagaTransactionMapper.insert(saga);

try {
// 步骤1:创建订单
orderService.createOrder(order, sagaId);
saga.addStep(new SagaStep("createOrder", SagaStepStatus.COMPLETED));

// 步骤2:扣减库存
inventoryService.deductStock(order.getSkuId(), order.getQuantity(), sagaId);
saga.addStep(new SagaStep("deductStock", SagaStepStatus.COMPLETED));

// 步骤3:扣减账户余额
accountService.deductBalance(order.getUserId(), order.getAmount(), sagaId);
saga.addStep(new SagaStep("deductBalance", SagaStepStatus.COMPLETED));

// 所有步骤成功,完成Saga
saga.setStatus(SagaStatus.COMPLETED);
sagaTransactionMapper.updateById(saga);
} catch (Exception e) {
// 发生异常,执行补偿
compensate(saga);
throw e;
}
}

private void compensate(SagaTransaction saga) {
// 按照相反顺序执行补偿操作
List<SagaStep> steps = saga.getSteps();
Collections.reverse(steps);

for (SagaStep step : steps) {
if (step.getStatus() == SagaStepStatus.COMPLETED) {
try {
compensateStep(step);
step.setStatus(SagaStepStatus.COMPENSATED);
} catch (Exception e) {
log.error("Compensate step failed: {}", step.getName(), e);
// 补偿失败,需要人工介入
}
}
}

saga.setStatus(SagaStatus.COMPENSATED);
sagaTransactionMapper.updateById(saga);
}

private void compensateStep(SagaStep step) {
switch (step.getName()) {
case "createOrder":
orderService.cancelOrder(step.getSagaId());
break;
case "deductStock":
inventoryService.restoreStock(step.getSagaId());
break;
case "deductBalance":
accountService.restoreBalance(step.getSagaId());
break;
}
}
}

4.1.3 协同式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void createOrder(Order order) {
// 1. 创建订单
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);

// 2. 发布事件
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());
event.setSkuId(order.getSkuId());
event.setQuantity(order.getQuantity());
event.setUserId(order.getUserId());
event.setAmount(order.getAmount());

kafkaTemplate.send("order-events", JSON.toJSONString(event));
}

@KafkaListener(topics = "order-compensation", groupId = "order-service")
public void handleCompensation(String message) {
OrderCompensationEvent event = JSON.parseObject(message, OrderCompensationEvent.class);

// 补偿:取消订单
Order order = orderMapper.selectById(event.getOrderId());
order.setStatus(OrderStatus.CANCELLED);
orderMapper.updateById(order);
}
}

@Service
public class InventoryService {

@Autowired
private InventoryMapper inventoryMapper;

@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderCreated(String message) {
OrderCreatedEvent event = JSON.parseObject(message, OrderCreatedEvent.class);

try {
// 扣减库存
Inventory inventory = inventoryMapper.selectById(event.getSkuId());
if (inventory.getStock() < event.getQuantity()) {
throw new BusinessException("库存不足");
}
inventory.setStock(inventory.getStock() - event.getQuantity());
inventoryMapper.updateById(inventory);

// 发布库存扣减成功事件
InventoryDeductedEvent deductedEvent = new InventoryDeductedEvent();
deductedEvent.setOrderId(event.getOrderId());
deductedEvent.setSkuId(event.getSkuId());
deductedEvent.setQuantity(event.getQuantity());
kafkaTemplate.send("inventory-events", JSON.toJSONString(deductedEvent));
} catch (Exception e) {
// 发布库存扣减失败事件,触发补偿
InventoryDeductFailedEvent failedEvent = new InventoryDeductFailedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setReason(e.getMessage());
kafkaTemplate.send("order-compensation", JSON.toJSONString(failedEvent));
}
}
}

4.1.4 优缺点

优点

  • 适合长事务
  • 性能较高
  • 可以灵活控制事务粒度

缺点

  • 实现复杂
  • 需要实现补偿逻辑
  • 可能出现数据不一致的中间状态

4.1.5 适用场景

适用场景

  • 长事务场景
  • 可以接受最终一致性的场景
  • 业务逻辑相对简单的场景

不适用场景

  • 需要强一致性的场景
  • 业务逻辑复杂的场景

4.2 本地消息表

4.2.1 原理

本地消息表:在本地数据库中创建消息表,保证本地事务和消息发送的一致性。

流程

  1. 执行业务操作,同时插入消息到本地消息表
  2. 定时任务扫描本地消息表,发送消息到消息队列
  3. 消费者处理消息
  4. 更新消息状态

4.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Entity
public class LocalMessage {
@Id
private String id;
private String businessId;
private String topic;
private String message;
private Integer status; // 0-待发送, 1-已发送, 2-发送失败
private Integer retryCount;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private LocalMessageMapper localMessageMapper;

@Transactional
public void createOrder(Order order) {
// 1. 创建订单(本地事务)
orderMapper.insert(order);

// 2. 插入消息到本地消息表(同一事务)
LocalMessage message = new LocalMessage();
message.setId(UUID.randomUUID().toString());
message.setBusinessId(order.getId().toString());
message.setTopic("order-created");
message.setMessage(JSON.toJSONString(order));
message.setStatus(0); // 待发送
message.setRetryCount(0);
message.setCreateTime(LocalDateTime.now());
localMessageMapper.insert(message);

// 事务提交后,消息会被定时任务发送
}
}

@Component
public class LocalMessageScheduler {

@Autowired
private LocalMessageMapper localMessageMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Scheduled(fixedDelay = 5000) // 每5秒执行一次
public void sendPendingMessages() {
// 查询待发送的消息
List<LocalMessage> messages = localMessageMapper.selectByStatus(0);

for (LocalMessage message : messages) {
try {
// 发送消息到Kafka
kafkaTemplate.send(message.getTopic(), message.getMessage());

// 更新消息状态为已发送
message.setStatus(1);
message.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(message);
} catch (Exception e) {
log.error("Send message failed: {}", message.getId(), e);

// 更新重试次数
message.setRetryCount(message.getRetryCount() + 1);
if (message.getRetryCount() >= 3) {
// 重试次数超过3次,标记为发送失败
message.setStatus(2);
}
message.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(message);
}
}
}
}

@Component
public class OrderEventConsumer {

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@KafkaListener(topics = "order-created", groupId = "order-processor")
public void handleOrderCreated(String message) {
Order order = JSON.parseObject(message, Order.class);

try {
// 处理订单创建事件
// 1. 扣减库存
inventoryService.deductStock(order.getSkuId(), order.getQuantity());

// 2. 扣减账户余额
accountService.deductBalance(order.getUserId(), order.getAmount());

// 3. 更新订单状态
order.setStatus(OrderStatus.CONFIRMED);
orderMapper.updateById(order);
} catch (Exception e) {
log.error("Handle order created event failed", e);
// 处理失败,可以发送补偿消息
}
}
}

4.2.3 优缺点

优点

  • 实现简单
  • 保证本地事务和消息发送的一致性
  • 不需要额外的消息中间件支持

缺点

  • 需要额外的消息表
  • 定时任务可能有延迟
  • 需要处理消息重复发送

4.2.4 适用场景

适用场景

  • 需要保证本地事务和消息发送一致性的场景
  • 可以接受最终一致性的场景
  • 消息量不是特别大的场景

不适用场景

  • 需要强一致性的场景
  • 消息量特别大的场景

4.3 事务消息(RocketMQ)

4.3.1 原理

事务消息:RocketMQ提供的事务消息功能,保证消息发送和本地事务的一致性。

流程

  1. 发送Half消息(预发送消息)
  2. 执行本地事务
  3. 提交或回滚消息

4.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Transactional
public void createOrder(Order order) {
// 1. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-tx-group",
"order-created",
MessageBuilder.withPayload(order).build(),
new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 2. 执行本地事务
orderMapper.insert(order);

// 3. 本地事务成功,提交消息
return LocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("Local transaction failed", e);
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK;
}
}

@Override
public LocalTransactionState checkLocalTransaction(Message msg) {
// 4. 检查本地事务状态
String orderId = msg.getKeys();
Order order = orderMapper.selectById(Long.parseLong(orderId));
if (order != null) {
return LocalTransactionState.COMMIT;
} else {
return LocalTransactionState.ROLLBACK;
}
}
}
);
}
}

@Component
@RocketMQMessageListener(topic = "order-created", consumerGroup = "order-processor")
public class OrderEventConsumer implements RocketMQListener<Order> {

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@Override
public void onMessage(Order order) {
try {
// 处理订单创建事件
// 1. 扣减库存
inventoryService.deductStock(order.getSkuId(), order.getQuantity());

// 2. 扣减账户余额
accountService.deductBalance(order.getUserId(), order.getAmount());

// 3. 更新订单状态
order.setStatus(OrderStatus.CONFIRMED);
orderMapper.updateById(order);
} catch (Exception e) {
log.error("Handle order created event failed", e);
// 处理失败,可以发送补偿消息
}
}
}

4.3.3 优缺点

优点

  • 保证消息发送和本地事务的一致性
  • 不需要额外的消息表
  • 性能较高

缺点

  • 需要RocketMQ支持
  • 实现相对复杂

4.3.4 适用场景

适用场景

  • 使用RocketMQ的场景
  • 需要保证消息发送和本地事务一致性的场景
  • 可以接受最终一致性的场景

不适用场景

  • 需要强一致性的场景
  • 不使用RocketMQ的场景

5. 方案对比

5.1 方案对比表

方案 一致性 性能 复杂度 适用场景
2PC 强一致 简单 小规模系统,对一致性要求极高
3PC 强一致 中等 需要减少阻塞时间的场景
TCC 强一致 复杂 高并发场景,可以接受业务侵入
Seata AT 强一致 简单 使用Seata框架的场景
Saga 最终一致 复杂 长事务场景
本地消息表 最终一致 简单 需要保证本地事务和消息发送一致性
事务消息 最终一致 中等 使用RocketMQ的场景

5.2 选择原则

5.2.1 一致性要求

强一致性要求

  • 选择:2PC、3PC、TCC、Seata AT
  • 适用场景:金融系统、支付系统

最终一致性要求

  • 选择:Saga、本地消息表、事务消息
  • 适用场景:电商系统、社交网络

5.2.2 性能要求

高性能要求

  • 选择:TCC、Saga、事务消息
  • 避免:2PC、3PC

中等性能要求

  • 选择:Seata AT、本地消息表

5.2.3 复杂度要求

简单实现

  • 选择:2PC、本地消息表、Seata AT

可以接受复杂实现

  • 选择:TCC、Saga

6. 实战案例

6.1 案例1:支付系统(TCC)

6.1.1 场景

需求:支付系统,需要保证强一致性。

选择方案TCC

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
public class PaymentService {

@Autowired
private AccountService accountService;

@Autowired
private PaymentRecordService paymentRecordService;

public void pay(Long accountId, BigDecimal amount) {
String transactionId = UUID.randomUUID().toString();

try {
// Try阶段
AccountTccResult accountResult = accountService.tryDeductBalance(
accountId, amount, transactionId);
PaymentRecordTccResult recordResult = paymentRecordService.tryCreateRecord(
accountId, amount, transactionId);

if (accountResult.isSuccess() && recordResult.isSuccess()) {
// Confirm阶段
accountService.confirmDeductBalance(transactionId);
paymentRecordService.confirmCreateRecord(transactionId);
} else {
// Cancel阶段
accountService.cancelDeductBalance(transactionId);
paymentRecordService.cancelCreateRecord(transactionId);
}
} catch (Exception e) {
accountService.cancelDeductBalance(transactionId);
paymentRecordService.cancelCreateRecord(transactionId);
throw e;
}
}
}

6.2 案例2:电商系统(本地消息表)

6.2.1 场景

需求:电商系统,订单创建后异步处理库存和账户。

选择方案本地消息表

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class OrderService {

@Transactional
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);

// 2. 插入消息到本地消息表
LocalMessage message = new LocalMessage();
message.setId(UUID.randomUUID().toString());
message.setBusinessId(order.getId().toString());
message.setTopic("order-created");
message.setMessage(JSON.toJSONString(order));
message.setStatus(0);
localMessageMapper.insert(message);
}
}

6.3 案例3:长事务系统(Saga)

6.3.1 场景

需求:订单系统,包含多个步骤,需要支持补偿。

选择方案Saga

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service
public class OrderSagaService {

public void createOrder(Order order) {
String sagaId = UUID.randomUUID().toString();

try {
// 步骤1:创建订单
orderService.createOrder(order, sagaId);

// 步骤2:扣减库存
inventoryService.deductStock(order.getSkuId(), order.getQuantity(), sagaId);

// 步骤3:扣减账户余额
accountService.deductBalance(order.getUserId(), order.getAmount(), sagaId);

// 步骤4:发送通知
notificationService.sendNotification(order, sagaId);
} catch (Exception e) {
// 执行补偿
compensate(sagaId);
throw e;
}
}
}

7. 总结

7.1 核心要点

  1. 分布式事务方案:2PC、3PC、TCC、Saga、本地消息表、事务消息
  2. 强一致性方案:2PC、3PC、TCC、Seata AT
  3. 最终一致性方案:Saga、本地消息表、事务消息
  4. 选择原则:根据一致性要求、性能要求、复杂度要求选择
  5. 适用场景:不同场景选择不同方案

7.2 关键理解

  1. 强一致性:保证数据立即一致,但性能较低
  2. 最终一致性:保证数据最终一致,性能较高
  3. 业务驱动:根据业务需求选择方案
  4. 权衡取舍:在一致性和性能之间权衡

7.3 最佳实践

  1. 金融系统:TCC(强一致性)
  2. 电商系统:本地消息表或事务消息(最终一致性)
  3. 长事务系统:Saga(最终一致性)
  4. 高并发系统:TCC或Saga(高性能)
  5. 简单系统:2PC或本地消息表(简单实现)

相关文章