分布式事务架构实战:ACID特性、事务管理与企业级事务解决方案

一、事务概述

1.1 事务基本概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
事务定义:
事务(Transaction):
- 一组数据库操作组成的工作单位
- 要么全部成功,要么全部失败
- 保证数据一致性

ACID特性:
原子性(Atomicity):
- 事务是不可分割的工作单位
- 要么全部执行,要么全部不执行
- 中间状态不可见

一致性(Consistency):
- 事务前后数据库状态一致
- 约束条件保持有效
- 数据完整性不变

隔离性(Isolation):
- 并发事务相互隔离
- 不同隔离级别提供不同保障
- 避免脏读、不可重复读、幻读

持久性(Durability):
- 事务提交后结果永久保存
- 即使系统崩溃也不丢失
- 通过日志和持久化保证

1.2 本地事务vs分布式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
事务类型对比:
本地事务:
- 单个数据库
- 简单高效
- ACID保证强
- 性能开销小

分布式事务:
- 跨多个数据库
- 跨多个服务
- 2PC、3PC、TCC、Saga
- 性能开销大

挑战:
分布式事务:
- 网络延迟
- 节点故障
- 数据一致性
- 性能影响

二、本地事务实现

2.1 JDBC事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// JDBC事务示例
public void transferMoney(Connection conn,
String fromAccount,
String toAccount,
double amount) throws SQLException {

conn.setAutoCommit(false); // 关闭自动提交

try {
PreparedStatement stmt1 = conn.prepareStatement(
"UPDATE accounts SET balance = balance - ? WHERE account_id = ?"
);
stmt1.setDouble(1, amount);
stmt1.setString(2, fromAccount);
stmt1.executeUpdate();

PreparedStatement stmt2 = conn.prepareStatement(
"UPDATE accounts SET balance = balance + ? WHERE account_id = ?"
);
stmt2.setDouble(1, amount);
stmt2.setString(2, toAccount);
stmt2.executeUpdate();

conn.commit(); // 提交事务

} catch (SQLException e) {
conn.rollback(); // 回滚事务
throw e;
}
}

2.2 Spring声明式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// @Transactional注解
@Service
public class AccountService {

@Autowired
private AccountRepository accountRepository;

/**
* 转账方法
*/
@Transactional(rollbackFor = Exception.class)
public void transferMoney(String fromAccount, String toAccount, double amount) {
// 1. 扣减转出账户
Account from = accountRepository.findByAccountId(fromAccount);
from.setBalance(from.getBalance() - amount);
accountRepository.save(from);

// 2. 增加转入账户
Account to = accountRepository.findByAccountId(toAccount);
to.setBalance(to.getBalance() + amount);
accountRepository.save(to);

// 3. 如果任何操作失败,整个事务回滚
}
}

2.3 编程式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 编程式事务
@Service
public class AccountService {

@Autowired
private TransactionTemplate transactionTemplate;

@Autowired
private PlatformTransactionManager transactionManager;

public void transferMoney(String fromAccount, String toAccount, double amount) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
// 业务逻辑
accountRepository.decreaseBalance(fromAccount, amount);
accountRepository.increaseBalance(toAccount, amount);
} catch (Exception e) {
status.setRollbackOnly(); // 标记回滚
throw e;
}
}
});
}
}

三、事务隔离级别

3.1 隔离级别说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 隔离级别从低到高:
-- 1. READ UNCOMMITTED (读未提交)
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
-- 问题: 脏读、不可重复读、幻读

-- 2. READ COMMITTED (读已提交)
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 问题: 不可重复读、幻读

-- 3. REPEATABLE READ (可重复读) - MySQL默认
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 问题: 幻读

-- 4. SERIALIZABLE (串行化)
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- 无问题,但性能最差

3.2 隔离级别配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Spring事务配置
@Configuration
@EnableTransactionManagement
public class TransactionConfig {

@Bean
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
DataSourceTransactionManager manager = new DataSourceTransactionManager();
manager.setDataSource(dataSource);

// 设置默认隔离级别
manager.setDefaultTimeout(30); // 超时30秒
manager.setRollbackOnCommitFailure(true); // 提交失败回滚

return manager;
}

/**
* 指定隔离级别
*/
@Transactional(
isolation = Isolation.REPEATABLE_READ,
timeout = 30,
rollbackFor = Exception.class
)
public void isolatedOperation() {
// 使用REPEATABLE READ隔离级别
}
}

四、分布式事务场景

4.1 典型场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
分布式事务场景:
场景1: 跨数据库转账
- 银行A数据库 -> 银行B数据库
- 需要保证一致性
- 2PC模式

场景2: 微服务调用
- 订单服务 -> 库存服务 -> 支付服务
- 需要保证一致性
- Saga/TCC模式

场景3: 消息中间件
- 数据库事务 + 发送消息
- 本地消息表
- 事务消息

场景4: 分布式锁
- Redis/Database锁
- 需要全局一致性

五、分布式事务解决方案

5.1 2PC两阶段提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 2PC实现
public class TwoPhaseCommit {

/**
* 第一阶段: Prepare
*/
public boolean prepare(TransactionContext context) {
List<Participant> participants = context.getParticipants();

for (Participant p : participants) {
try {
boolean prepared = p.prepare(context.getTransactionId());
if (!prepared) {
// 有参与者prepare失败,全部abort
abort(participants);
return false;
}
} catch (Exception e) {
abort(participants);
return false;
}
}

return true;
}

/**
* 第二阶段: Commit
*/
public void commit(List<Participant> participants) {
for (Participant p : participants) {
p.commit();
}
}

/**
* 第二阶段: Abort
*/
public void abort(List<Participant> participants) {
for (Participant p : participants) {
p.rollback();
}
}
}

// 参与者接口
public interface Participant {
boolean prepare(String transactionId);
void commit();
void rollback();
}

5.2 TCC模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// TCC: Try-Confirm-Cancel
@Service
public class OrderService {

/**
* Try阶段: 预留资源
*/
@Transactional
public void tryOrder(Order order) {
// 1. 冻结库存
inventoryService.freeze(order.getProductId(), order.getQuantity());

// 2. 创建订单(待确认状态)
order.setStatus("TRYING");
orderRepository.save(order);

// 3. 冻结优惠券
couponService.freeze(order.getCouponId());

// 如果失败,抛出异常,触发Cancel
}

/**
* Confirm阶段: 确认操作
*/
@Transactional
public void confirmOrder(Order order) {
// 1. 确认订单
order.setStatus("CONFIRMED");
orderRepository.save(order);

// 2. 实际扣减库存
inventoryService.deduct(order.getProductId(), order.getQuantity());

// 3. 使用优惠券
couponService.use(order.getCouponId());
}

/**
* Cancel阶段: 取消操作
*/
@Transactional
public void cancelOrder(Order order) {
// 1. 取消订单
order.setStatus("CANCELLED");
orderRepository.save(order);

// 2. 释放库存
inventoryService.release(order.getProductId(), order.getQuantity());

// 3. 释放优惠券
couponService.release(order.getCouponId());
}
}

5.3 Saga模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Saga: 长事务的补偿模式
@Service
public class OrderSaga {

/**
* Saga协调者
*/
public void processOrder(Order order) {
List<SagaStep> steps = buildSteps(order);

for (SagaStep step : steps) {
try {
step.execute();
} catch (Exception e) {
// 执行失败,补偿之前的步骤
compensate(step);
throw e;
}
}
}

private List<SagaStep> buildSteps(Order order) {
return Arrays.asList(
new CreateOrderStep(order),
new ReserveInventoryStep(order),
new CreatePaymentStep(order),
new ShipOrderStep(order)
);
}

/**
* 补偿操作
*/
private void compensate(SagaStep failedStep) {
// 按相反顺序补偿
for (SagaStep step : reversed(steps)) {
if (step != failedStep) {
step.compensate();
}
}
}
}

// Saga步骤
public abstract class SagaStep {
public abstract void execute();
public abstract void compensate();
}

5.4 Seata分布式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Seata AT模式
@Configuration
public class SeataConfig {

@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("my-tx-service-group", "my_app");
}
}

// 使用Seata
@Service
public class OrderService {

@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 创建订单
orderRepository.save(order);

// 2. 扣减库存
inventoryService.decrease(order.getProductId(), order.getQuantity());

// 3. 扣减余额
accountService.decrease(order.getUserId(), order.getAmount());

// 4. 如果任何操作失败,Seata自动回滚所有操作
}
}

六、本地消息表

6.1 本地消息表实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 本地消息表
@Entity
public class OutboxMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

private String topic;
private String payload;
private String status; // PENDING, SENT, FAILED

private LocalDateTime createdAt;
private LocalDateTime sentAt;
}

@Service
public class OutboxService {

@Autowired
private OutboxRepository outboxRepository;

@Autowired
private KafkaProducer kafkaProducer;

/**
* 事务性发送消息
*/
@Transactional(rollbackFor = Exception.class)
public void saveWithMessage(Order order) {
// 1. 保存订单(事务)
orderRepository.save(order);

// 2. 保存消息到本地表(同一事务)
OutboxMessage message = new OutboxMessage();
message.setTopic("order-created");
message.setPayload(JSON.toJSONString(order));
message.setStatus("PENDING");
message.setCreatedAt(LocalDateTime.now());
outboxRepository.save(message);

// 事务提交后,消息在本地表中
}

/**
* 定时发送消息
*/
@Scheduled(fixedDelay = 1000)
public void sendPendingMessages() {
List<OutboxMessage> messages = outboxRepository.findByStatus("PENDING");

for (OutboxMessage msg : messages) {
try {
kafkaProducer.send(msg.getTopic(), msg.getPayload());
msg.setStatus("SENT");
msg.setSentAt(LocalDateTime.now());
outboxRepository.save(msg);
} catch (Exception e) {
msg.setStatus("FAILED");
outboxRepository.save(msg);
log.error("发送消息失败", e);
}
}
}
}

七、事务消息

7.1 RocketMQ事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// RocketMQ事务消息
@Service
public class TransactionMessageService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 使用事务消息
*/
@Transactional(rollbackFor = Exception.class)
public void createOrderWithMessage(Order order) {
// 1. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-group",
"order-topic:order-tag",
MessageBuilder.withPayload(order).build(),
order // 本地事务参数
);

// 2. 保存订单
orderRepository.save(order);
}

/**
* 本地事务执行
*/
@RocketMQTransactionListener(txProducerGroup = "order-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;

// 执行本地事务
orderRepository.save(order);
inventoryService.decrease(order.getProductId(), order.getQuantity());

// 提交本地事务
return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {
// 回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
String orderId = (String) msg.getHeaders().get("orderId");
Order order = orderRepository.findById(orderId);

if (order != null) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
}

八、事务监控

8.1 事务监控实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 事务监控
@Service
public class TransactionMonitorService {

private final MeterRegistry meterRegistry;

public TransactionMonitorService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

/**
* 监控事务
*/
@Around("@annotation(org.springframework.transaction.annotation.Transactional)")
public Object monitorTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
String methodName = joinPoint.getSignature().toShortString();

Timer.Sample sample = Timer.start(meterRegistry);

try {
Object result = joinPoint.proceed();

// 记录成功
meterRegistry.counter("transaction.success", "method", methodName).increment();

return result;
} catch (Exception e) {
// 记录失败
meterRegistry.counter("transaction.failure", "method", methodName).increment();
throw e;
} finally {
sample.stop(meterRegistry.timer("transaction.duration", "method", methodName));
}
}
}

九、最佳实践

9.1 实践建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
事务管理最佳实践:
1. 事务边界:
- 事务尽量小
- 避免长事务
- 不在事务中调用外部服务

2. 异常处理:
- 明确指定rollbackFor
- 区分业务异常和系统异常
- 正确处理检查异常

3. 隔离级别:
- 根据业务选择合适级别
- 避免过度使用SERIALIZABLE
- 注意死锁问题

4. 性能优化:
- 使用读写分离
- 选择合适的锁粒度
- 避免热点数据

5. 分布式事务:
- 避免强一致性
- 使用最终一致性
- 选择合适的模式

十、总结

10.1 核心要点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
事务管理核心:
本地事务:
- ACID保证强
- 使用@Transactional
- 控制事务边界

分布式事务:
- 2PC: 强一致性,性能差
- TCC: 强一致性,需要业务改造
- Saga: 最终一致性,性能好
- Seata: 易集成

选择原则:
- 能不用分布式事务就不用
- 能用本地消息表就用
- 优先最终一致性

10.2 实践建议

  1. 优先本地事务: 满足需求则避免分布式事务
  2. 合理使用隔离级别: 按业务选择级别
  3. 避免长事务: 控制事务边界
  4. 监控与告警: 监控事务性能与错误率
  5. 降级策略: 为分布式事务准备降级

采用上述方案可实现本地与分布式事务的可靠管理。