1. 分布式事务与幂等保障概述

分布式事务是微服务架构中的核心挑战。借助 Seata 等分布式事务框架和幂等性设计，可以确保跨服务的数据一致性。本文将详细介绍分布式事务解决方案、幂等性保障机制、事务管理和补偿机制的完整实现。

1.1 核心功能

  1. 分布式事务: 跨服务事务管理和协调
  2. 幂等性保障: 防止重复操作和数据不一致
  3. 事务补偿: 失败事务的补偿和回滚机制
  4. 事务监控: 事务状态监控和告警
  5. 性能优化: 事务性能优化和并发控制

1.2 技术架构

事务发起 → 事务协调器 → 分支事务 → 事务提交/回滚
↓ ↓ ↓ ↓
业务服务 → Seata TC → 数据源代理 → 补偿机制
↓ ↓ ↓ ↓
幂等检查 → 事务日志 → 状态管理 → 异常处理

2. Seata分布式事务配置

2.1 Seata配置类

/**
 * Seata distributed-transaction configuration.
 *
 * <p>Registers a Druid {@code DataSource}, wraps it in a Seata {@code DataSourceProxy},
 * exposes a {@code JdbcTemplate} built on the proxy, and publishes the Seata settings as
 * JVM system properties once the bean has been constructed.
 *
 * <p>NOTE(review): both {@code dataSource} and {@code dataSourceProxy} are passed where a
 * {@code DataSource} is expected; other by-type injection points may need one of them
 * marked as primary — confirm against the rest of the application.
 */
@Configuration
public class SeataConfig {

    @Value("${seata.application-id}")
    private String applicationId;

    @Value("${seata.tx-service-group}")
    private String txServiceGroup;

    @Value("${seata.service.vgroup-mapping}")
    private String vgroupMapping;

    @Value("${seata.service.grouplist}")
    private String grouplist;

    // Store settings come from configuration instead of being hard-coded here.
    // The non-secret ones default to the values this class used to hard-code.
    @Value("${seata.store.mode:db}")
    private String storeMode;

    @Value("${seata.store.db.datasource:druid}")
    private String storeDatasource;

    @Value("${seata.store.db.db-type:mysql}")
    private String storeDbType;

    @Value("${seata.store.db.driver-class-name:com.mysql.cj.jdbc.Driver}")
    private String storeDriverClassName;

    // URL and credentials deliberately have no default: they must be supplied by
    // configuration (application.yml already defines seata.store.db.*).
    @Value("${seata.store.db.url}")
    private String storeUrl;

    @Value("${seata.store.db.user}")
    private String storeUser;

    @Value("${seata.store.db.password}")
    private String storePassword;

    /**
     * Creates the underlying Druid data source, populated from {@code spring.datasource.*}.
     *
     * @return the raw (un-proxied) data source
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        return new DruidDataSource();
    }

    /**
     * Wraps the business data source in Seata's proxy.
     *
     * @param dataSource the raw data source
     * @return the Seata data source proxy
     */
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    /**
     * Builds a {@code JdbcTemplate} on top of the proxied data source.
     *
     * @param dataSourceProxy the Seata data source proxy
     * @return the JDBC template
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSourceProxy dataSourceProxy) {
        return new JdbcTemplate(dataSourceProxy);
    }

    /**
     * Publishes the Seata settings as JVM system properties.
     *
     * <p>NOTE(review): the key {@code seata.application.id} differs from the injected
     * {@code seata.application-id} — verify which spelling the Seata version in use reads.
     */
    @PostConstruct
    public void initSeata() {
        // Application and transaction-group identity
        System.setProperty("seata.application.id", applicationId);
        System.setProperty("seata.tx-service-group", txServiceGroup);
        System.setProperty("seata.service.vgroup-mapping." + txServiceGroup, vgroupMapping);
        System.setProperty("seata.service.grouplist." + vgroupMapping, grouplist);

        // Transaction mode
        System.setProperty("seata.tx-service-mode", "AT");

        // Log store mode
        System.setProperty("seata.log.store-mode", storeMode);

        // Store database settings
        System.setProperty("seata.store.db.datasource", storeDatasource);
        System.setProperty("seata.store.db.db-type", storeDbType);
        System.setProperty("seata.store.db.driver-class-name", storeDriverClassName);
        System.setProperty("seata.store.db.url", storeUrl);
        System.setProperty("seata.store.db.user", storeUser);
        System.setProperty("seata.store.db.password", storePassword);
    }
}

/**
* Configuration properties bound from the {@code seata} prefix.
*
* <p>The nested classes follow the {@code seata.service.*} and {@code seata.store.*}
* sections of application.yml.
* NOTE(review): nothing in this listing references this class; SeataConfig reads the
* same keys through {@code @Value} — confirm whether it is used elsewhere.
*/
@Data
@ConfigurationProperties(prefix = "seata")
public class SeataProperties {
// seata.application-id
private String applicationId;
// seata.tx-service-group
private String txServiceGroup;
// seata.service.* section; never null because it is pre-instantiated
private Service service = new Service();
// seata.store.* section; never null because it is pre-instantiated
private Store store = new Store();

/**
* The {@code seata.service} section.
*/
@Data
public static class Service {
// seata.service.vgroup-mapping
private String vgroupMapping;
// seata.service.grouplist
private String grouplist;
}

/**
* The {@code seata.store} section.
*/
@Data
public static class Store {
// seata.store.mode, defaults to "db"
private String mode = "db";
// seata.store.db.* section; never null because it is pre-instantiated
private Db db = new Db();

/**
* The {@code seata.store.db} section. Only url, user and password have no default.
*/
@Data
public static class Db {
private String datasource = "druid";
private String dbType = "mysql";
private String driverClassName = "com.mysql.cj.jdbc.Driver";
private String url;
private String user;
private String password;
}
}
}

2.2 应用配置

# application.yml
# Seata settings. The nesting matches the keys read by SeataConfig
# (seata.service.vgroup-mapping, seata.service.grouplist) and the structure of SeataProperties.
seata:
  application-id: ${spring.application.name}
  tx-service-group: my-tx-group
  service:
    vgroup-mapping: my-tx-group
    grouplist: 127.0.0.1:8091
  store:
    mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      user: root
      password: password

# Data source configuration (bound to the Druid data source via the spring.datasource prefix)
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/business?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: password
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 60000
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      validation-query: SELECT 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false

3. 分布式事务服务

3.1 订单服务

/**
 * Order service: creates and cancels orders, calling the inventory and payment services
 * inside a Seata global transaction.
 */
@Slf4j
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private IdempotentService idempotentService;

    /**
     * Creates an order, deducts inventory and creates the payment in one global transaction.
     *
     * <p>The idempotency key is built from the user id and product id; the marker is
     * written only after every step has succeeded and is kept for 300 seconds.
     *
     * @param orderRequest order request
     * @return a successful result wrapping the persisted order
     * @throws BusinessException if the request is a duplicate or any step fails; every
     *         failure is rethrown as this type with the original message appended
     */
    @GlobalTransactional(rollbackFor = Exception.class)
    public OrderResult createOrder(OrderRequest orderRequest) {
        try {
            // 1. Idempotency check
            String idempotentKey = "order:create:" + orderRequest.getUserId() + ":" + orderRequest.getProductId();
            if (!idempotentService.checkIdempotent(idempotentKey)) {
                throw new BusinessException("重复的订单请求");
            }

            // 2. Persist the order
            Order order = createOrderEntity(orderRequest);
            orderMapper.insert(order);

            // 3. Deduct inventory
            inventoryService.deductInventory(orderRequest.getProductId(), orderRequest.getQuantity());

            // 4. Create the payment
            PaymentRequest paymentRequest = PaymentRequest.builder()
                    .orderId(order.getId())
                    .userId(orderRequest.getUserId())
                    .amount(order.getTotalAmount())
                    .paymentMethod(orderRequest.getPaymentMethod())
                    .build();

            paymentService.createPayment(paymentRequest);

            // 5. Record the idempotency marker (valid for 5 minutes)
            idempotentService.setIdempotent(idempotentKey, 300);

            log.info("订单创建成功: orderId={}, userId={}", order.getId(), orderRequest.getUserId());

            return OrderResult.success(order);

        } catch (Exception e) {
            log.error("订单创建失败: userId={}, productId={}",
                    orderRequest.getUserId(), orderRequest.getProductId(), e);
            // NOTE(review): only the message is propagated; the cause is logged above.
            throw new BusinessException("订单创建失败: " + e.getMessage());
        }
    }

    /**
     * Cancels a pending order, restores its inventory and cancels its payment in one
     * global transaction.
     *
     * @param orderId order id
     * @return a successful result wrapping the cancelled order
     * @throws BusinessException if the order does not exist, is not in status
     *         {@code PENDING}, or any step fails
     */
    @GlobalTransactional(rollbackFor = Exception.class)
    public OrderResult cancelOrder(Long orderId) {
        try {
            // 1. Load the order
            Order order = orderMapper.selectById(orderId);
            if (order == null) {
                throw new BusinessException("订单不存在");
            }

            if (!"PENDING".equals(order.getStatus())) {
                throw new BusinessException("订单状态不允许取消");
            }

            // 2. Mark the order as cancelled
            order.setStatus("CANCELLED");
            order.setCancelTime(LocalDateTime.now());
            orderMapper.updateById(order);

            // 3. Restore inventory
            inventoryService.restoreInventory(order.getProductId(), order.getQuantity());

            // 4. Cancel the payment
            // NOTE(review): cancelPayment receives an order id here, but CompensationService
            // passes a payment id to the same method — confirm which one it expects.
            paymentService.cancelPayment(orderId);

            log.info("订单取消成功: orderId={}", orderId);

            return OrderResult.success(order);

        } catch (Exception e) {
            log.error("订单取消失败: orderId={}", orderId, e);
            throw new BusinessException("订单取消失败: " + e.getMessage());
        }
    }

    /**
     * Builds a new {@code PENDING} order entity from the request.
     *
     * @param orderRequest order request
     * @return the unsaved order entity
     */
    private Order createOrderEntity(OrderRequest orderRequest) {
        Order order = new Order();
        order.setUserId(orderRequest.getUserId());
        order.setProductId(orderRequest.getProductId());
        order.setQuantity(orderRequest.getQuantity());
        order.setUnitPrice(orderRequest.getUnitPrice());
        // quantity is an Integer and unitPrice a BigDecimal, so "*" does not compile;
        // multiply through BigDecimal to keep the amount exact.
        order.setTotalAmount(
                orderRequest.getUnitPrice().multiply(BigDecimal.valueOf(orderRequest.getQuantity())));
        order.setStatus("PENDING");
        order.setCreateTime(LocalDateTime.now());
        order.setUpdateTime(LocalDateTime.now());
        return order;
    }
}

/**
* Order entity mapped to the {@code orders} table.
*/
@Data
@TableName("orders")
public class Order {
// Primary key, id type AUTO
@TableId(type = IdType.AUTO)
private Long id;
private Long userId;
private Long productId;
private Integer quantity;
private BigDecimal unitPrice;
// Set by OrderService when the order is created
private BigDecimal totalAmount;
// OrderService writes "PENDING" on creation and "CANCELLED" on cancellation
private String status;
private LocalDateTime createTime;
private LocalDateTime updateTime;
// Set by OrderService when the order is cancelled
private LocalDateTime cancelTime;
}

/**
* Order creation request passed to {@code OrderService.createOrder}.
*/
@Data
public class OrderRequest {
// Together with productId, forms the idempotency key used by OrderService.createOrder
private Long userId;
private Long productId;
private Integer quantity;
private BigDecimal unitPrice;
// Forwarded unchanged to the payment request
private String paymentMethod;
}

/**
* 订单结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderResult {
private boolean success;
private Order order;
private String errorMessage;

public static OrderResult success(Order order) {
return OrderResult.builder()
.success(true)
.order(order)
.build();
}

public static OrderResult error(String errorMessage) {
return OrderResult.builder()
.success(false)
.errorMessage(errorMessage)
.build();
}
}

3.2 库存服务

/**
* 库存服务
*/
@Service
public class InventoryService {

@Autowired
private InventoryMapper inventoryMapper;

@Autowired
private IdempotentService idempotentService;

/**
* 扣减库存
* @param productId 商品ID
* @param quantity 数量
* @return 扣减结果
*/
@Transactional(rollbackFor = Exception.class)
public InventoryResult deductInventory(Long productId, Integer quantity) {
try {
// 1. 幂等性检查
String idempotentKey = "inventory:deduct:" + productId + ":" + quantity + ":" + System.currentTimeMillis();
if (!idempotentService.checkIdempotent(idempotentKey)) {
throw new BusinessException("重复的库存扣减请求");
}

// 2. 查询库存
Inventory inventory = inventoryMapper.selectById(productId);
if (inventory == null) {
throw new BusinessException("商品不存在");
}

// 3. 检查库存是否充足
if (inventory.getAvailableStock() < quantity) {
throw new BusinessException("库存不足");
}

// 4. 扣减库存
inventory.setAvailableStock(inventory.getAvailableStock() - quantity);
inventory.setReservedStock(inventory.getReservedStock() + quantity);
inventory.setUpdateTime(LocalDateTime.now());

inventoryMapper.updateById(inventory);

// 5. 设置幂等性标记
idempotentService.setIdempotent(idempotentKey, 300);

log.info("库存扣减成功: productId={}, quantity={}, remainingStock={}",
productId, quantity, inventory.getAvailableStock());

return InventoryResult.success(inventory);

} catch (Exception e) {
log.error("库存扣减失败: productId={}, quantity={}", productId, quantity, e);
throw new BusinessException("库存扣减失败: " + e.getMessage());
}
}

/**
* 恢复库存
* @param productId 商品ID
* @param quantity 数量
* @return 恢复结果
*/
@Transactional(rollbackFor = Exception.class)
public InventoryResult restoreInventory(Long productId, Integer quantity) {
try {
// 1. 查询库存
Inventory inventory = inventoryMapper.selectById(productId);
if (inventory == null) {
throw new BusinessException("商品不存在");
}

// 2. 恢复库存
inventory.setAvailableStock(inventory.getAvailableStock() + quantity);
inventory.setReservedStock(inventory.getReservedStock() - quantity);
inventory.setUpdateTime(LocalDateTime.now());

inventoryMapper.updateById(inventory);

log.info("库存恢复成功: productId={}, quantity={}, availableStock={}",
productId, quantity, inventory.getAvailableStock());

return InventoryResult.success(inventory);

} catch (Exception e) {
log.error("库存恢复失败: productId={}, quantity={}", productId, quantity, e);
throw new BusinessException("库存恢复失败: " + e.getMessage());
}
}
}

/**
* Inventory entity mapped to the {@code inventory} table.
*/
@Data
@TableName("inventory")
public class Inventory {
// Primary key, id type AUTO; a separate value from productId
@TableId(type = IdType.AUTO)
private Long id;
private Long productId;
private Integer totalStock;
// Decreased by InventoryService.deductInventory, increased by restoreInventory
private Integer availableStock;
// Increased by InventoryService.deductInventory, decreased by restoreInventory
private Integer reservedStock;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

/**
* 库存结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class InventoryResult {
private boolean success;
private Inventory inventory;
private String errorMessage;

public static InventoryResult success(Inventory inventory) {
return InventoryResult.builder()
.success(true)
.inventory(inventory)
.build();
}

public static InventoryResult error(String errorMessage) {
return InventoryResult.builder()
.success(false)
.errorMessage(errorMessage)
.build();
}
}

4. 幂等性保障机制

4.1 幂等性服务

/**
 * Idempotency service backed by Redis and a database record table.
 *
 * <p>NOTE(review): {@code checkIdempotent} and {@code setIdempotent} are separate calls,
 * so two concurrent requests with the same key can both pass the check before either
 * writes the marker.
 */
@Slf4j
@Service
public class IdempotentService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private IdempotentRecordMapper idempotentRecordMapper;

    /**
     * Checks whether an operation identified by the key may run.
     *
     * @param idempotentKey idempotency key
     * @return {@code true} if no live marker exists and the operation may run;
     *         {@code false} if a marker exists or the check itself failed
     */
    public boolean checkIdempotent(String idempotentKey) {
        try {
            // 1. Redis marker
            String cachedResult = redisTemplate.opsForValue().get(idempotentKey);
            if (cachedResult != null) {
                return false; // marker present: treat as a duplicate
            }

            // 2. Database record
            IdempotentRecord record = idempotentRecordMapper.selectByIdempotentKey(idempotentKey);
            if (record != null) {
                // A record released by removeIdempotent (status INACTIVE) must not block;
                // otherwise a removed key would stay blocked until it expires.
                boolean released = "INACTIVE".equals(record.getStatus());
                if (!released && record.getExpireTime().isAfter(LocalDateTime.now())) {
                    return false;
                }
            }

            return true; // may run

        } catch (Exception e) {
            log.error("检查幂等性失败: idempotentKey={}", idempotentKey, e);
            return false; // fail closed: do not allow execution when the check fails
        }
    }

    /**
     * Writes the idempotency marker to Redis and inserts a database record.
     * Failures are logged and not propagated.
     *
     * <p>NOTE(review): a new row is inserted on every call; whether an older row with the
     * same key can still exist depends on the table constraints and the cleanup job — verify.
     *
     * @param idempotentKey idempotency key
     * @param expireSeconds time to live in seconds
     */
    public void setIdempotent(String idempotentKey, int expireSeconds) {
        try {
            // 1. Redis marker
            redisTemplate.opsForValue().set(idempotentKey, "1", Duration.ofSeconds(expireSeconds));

            // 2. Database record; one timestamp so expireTime is exactly createTime + TTL
            LocalDateTime now = LocalDateTime.now();
            IdempotentRecord record = new IdempotentRecord();
            record.setIdempotentKey(idempotentKey);
            record.setCreateTime(now);
            record.setExpireTime(now.plusSeconds(expireSeconds));
            record.setStatus("ACTIVE");

            idempotentRecordMapper.insert(record);

            log.debug("设置幂等性标记成功: idempotentKey={}, expireSeconds={}", idempotentKey, expireSeconds);

        } catch (Exception e) {
            log.error("设置幂等性标记失败: idempotentKey={}", idempotentKey, e);
        }
    }

    /**
     * Removes the Redis marker and flags the database record as {@code INACTIVE}.
     * Failures are logged and not propagated.
     *
     * @param idempotentKey idempotency key
     */
    public void removeIdempotent(String idempotentKey) {
        try {
            // 1. Redis marker
            redisTemplate.delete(idempotentKey);

            // 2. Database record
            IdempotentRecord record = idempotentRecordMapper.selectByIdempotentKey(idempotentKey);
            if (record != null) {
                record.setStatus("INACTIVE");
                record.setUpdateTime(LocalDateTime.now());
                idempotentRecordMapper.updateById(record);
            }

            log.debug("删除幂等性标记成功: idempotentKey={}", idempotentKey);

        } catch (Exception e) {
            log.error("删除幂等性标记失败: idempotentKey={}", idempotentKey, e);
        }
    }

    /**
     * Deletes expired idempotency records from the database. Runs at a fixed rate of
     * 300000 ms (5 minutes).
     */
    @Scheduled(fixedRate = 300000)
    public void cleanExpiredRecords() {
        try {
            idempotentRecordMapper.deleteExpiredRecords(LocalDateTime.now());

            log.debug("清理过期幂等性记录完成");

        } catch (Exception e) {
            log.error("清理过期幂等性记录失败", e);
        }
    }
}

/**
* Idempotency record entity mapped to the {@code idempotent_records} table.
*/
@Data
@TableName("idempotent_records")
public class IdempotentRecord {
// Primary key, id type AUTO
@TableId(type = IdType.AUTO)
private Long id;
private String idempotentKey;
private LocalDateTime createTime;
private LocalDateTime updateTime;
// Set by IdempotentService to creation time plus the requested TTL
private LocalDateTime expireTime;
// IdempotentService writes "ACTIVE" on creation and "INACTIVE" on removal
private String status;
}

4.2 幂等性注解

/**
* Marks a method whose invocations are de-duplicated by {@code IdempotentAspect}.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {

/**
* SpEL expression that produces the idempotency key. When left empty, the aspect
* builds a default key from the class name, method name and arguments.
*/
String key() default "";

/**
* Time to live of the idempotency marker, in seconds.
*/
int expire() default 300;

/**
* Message of the exception thrown when a duplicate call is rejected.
*/
String message() default "重复请求";
}

/**
 * Aspect that enforces {@code @Idempotent}: rejects a call whose key already has a live
 * marker, and records the marker once the call has succeeded.
 */
@Slf4j
@Aspect
@Component
public class IdempotentAspect {

    @Autowired
    private IdempotentService idempotentService;

    // Created directly rather than injected: no SpelExpressionParser bean is defined in
    // this listing, so a required @Autowired on it would fail at startup.
    private final SpelExpressionParser spelParser = new SpelExpressionParser();

    /**
     * Matches methods annotated with {@code @Idempotent}.
     */
    @Pointcut("@annotation(com.example.annotation.Idempotent)")
    public void idempotentPointcut() {}

    /**
     * Wraps the annotated method with the idempotency check.
     *
     * @param joinPoint the intercepted call
     * @return the value returned by the intercepted method
     * @throws BusinessException if a live marker already exists for the key
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("idempotentPointcut()")
    public Object handleIdempotent(ProceedingJoinPoint joinPoint) throws Throwable {
        // 1. Read the annotation
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Idempotent idempotent = signature.getMethod().getAnnotation(Idempotent.class);

        // 2. Build the key
        String idempotentKey = generateIdempotentKey(joinPoint, idempotent);

        // 3. Reject duplicates
        if (!idempotentService.checkIdempotent(idempotentKey)) {
            throw new BusinessException(idempotent.message());
        }

        try {
            // 4. Run the method
            Object result = joinPoint.proceed();

            // 5. Record the marker after success
            idempotentService.setIdempotent(idempotentKey, idempotent.expire());

            return result;

        } catch (Exception e) {
            // 6. Clear any marker on failure
            // NOTE(review): this call has not written a marker yet, so this can only
            // remove one written by another request using the same key.
            idempotentService.removeIdempotent(idempotentKey);
            throw e;
        }
    }

    /**
     * Builds the key from the annotation's SpEL expression, or from the default rule when
     * no expression is given.
     */
    private String generateIdempotentKey(ProceedingJoinPoint joinPoint, Idempotent idempotent) {
        if (StringUtils.hasText(idempotent.key())) {
            return evaluateSpelExpression(joinPoint, idempotent.key());
        }
        return generateDefaultKey(joinPoint);
    }

    /**
     * Evaluates the SpEL expression with each method argument bound to a variable named
     * after its parameter. Falls back to the default key if the parameter names are
     * unavailable or evaluation fails.
     */
    private String evaluateSpelExpression(ProceedingJoinPoint joinPoint, String expression) {
        try {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            String[] paramNames = signature.getParameterNames();
            Object[] args = joinPoint.getArgs();

            // Parameter names can be null when the class was compiled without them.
            if (paramNames == null) {
                log.warn("无法获取参数名, 使用默认幂等性键: expression={}", expression);
                return generateDefaultKey(joinPoint);
            }

            // Bind arguments by parameter name
            StandardEvaluationContext context = new StandardEvaluationContext();
            for (int i = 0; i < paramNames.length; i++) {
                context.setVariable(paramNames[i], args[i]);
            }

            // Parse and evaluate
            Expression exp = spelParser.parseExpression(expression);
            Object value = exp.getValue(context);

            return String.valueOf(value);

        } catch (Exception e) {
            log.error("评估SpEL表达式失败: expression={}", expression, e);
            return generateDefaultKey(joinPoint);
        }
    }

    /**
     * Builds the default key {@code ClassName.methodName:arg1,arg2,...}, using each
     * argument's {@code toString()} and the text {@code null} for null arguments.
     */
    private String generateDefaultKey(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        String className = signature.getDeclaringType().getSimpleName();
        String methodName = signature.getName();
        Object[] args = joinPoint.getArgs();

        StringBuilder keyBuilder = new StringBuilder();
        keyBuilder.append(className).append(".").append(methodName);

        if (args.length > 0) {
            keyBuilder.append(":");
            for (Object arg : args) {
                keyBuilder.append(arg != null ? arg.toString() : "null").append(",");
            }
            keyBuilder.setLength(keyBuilder.length() - 1); // drop the trailing comma
        }

        return keyBuilder.toString();
    }
}

5. 事务补偿机制

5.1 补偿服务

/**
 * Transaction compensation service: stores compensation records and replays them in
 * reverse creation order to undo the steps of a failed transaction.
 *
 * <p>NOTE(review): compensating {@code CREATE_ORDER} calls {@code OrderService.cancelOrder},
 * which itself restores inventory and cancels the payment. If {@code DEDUCT_INVENTORY}
 * and {@code CREATE_PAYMENT} records exist for the same transaction, those steps are
 * compensated twice — confirm the intended record set.
 */
@Slf4j
@Service
public class CompensationService {

    @Autowired
    private CompensationRecordMapper compensationRecordMapper;

    @Autowired
    private OrderService orderService;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    /**
     * Stores a compensation record with status {@code PENDING}.
     * Failures are logged and not propagated.
     *
     * @param compensationRecord the record to store
     */
    public void recordCompensation(CompensationRecord compensationRecord) {
        try {
            compensationRecord.setCreateTime(LocalDateTime.now());
            compensationRecord.setStatus("PENDING");
            compensationRecordMapper.insert(compensationRecord);

            log.info("记录补偿操作: transactionId={}, operation={}",
                    compensationRecord.getTransactionId(), compensationRecord.getOperation());

        } catch (Exception e) {
            log.error("记录补偿操作失败: transactionId={}",
                    compensationRecord.getTransactionId(), e);
        }
    }

    /**
     * Runs every {@code PENDING} compensation record of a transaction, newest first.
     * Failures are logged and not propagated.
     *
     * @param transactionId transaction id
     */
    @Async
    public void executeCompensation(String transactionId) {
        try {
            // 1. Load the records
            List<CompensationRecord> records = compensationRecordMapper.selectByTransactionId(transactionId);

            // 2. Newest first, so steps are undone in reverse order
            records.sort((r1, r2) -> r2.getCreateTime().compareTo(r1.getCreateTime()));

            for (CompensationRecord record : records) {
                if ("PENDING".equals(record.getStatus())) {
                    executeCompensationOperation(record);
                }
            }

            log.info("补偿操作执行完成: transactionId={}", transactionId);

        } catch (Exception e) {
            log.error("执行补偿操作失败: transactionId={}", transactionId, e);
        }
    }

    /**
     * Runs one compensation record and persists its outcome: {@code COMPLETED} on
     * success, {@code FAILED} with the error message otherwise.
     *
     * @param record the compensation record
     */
    private void executeCompensationOperation(CompensationRecord record) {
        try {
            switch (record.getOperation()) {
                case "CREATE_ORDER":
                    compensateCreateOrder(record);
                    break;
                case "DEDUCT_INVENTORY":
                    compensateDeductInventory(record);
                    break;
                case "CREATE_PAYMENT":
                    compensateCreatePayment(record);
                    break;
                default:
                    // Nothing was compensated, so fail the record instead of letting it
                    // fall through and be marked COMPLETED.
                    throw new IllegalStateException("未知的补偿操作: " + record.getOperation());
            }

            // Mark the record as done
            record.setStatus("COMPLETED");
            record.setUpdateTime(LocalDateTime.now());
            compensationRecordMapper.updateById(record);

        } catch (Exception e) {
            log.error("执行补偿操作失败: recordId={}, operation={}",
                    record.getId(), record.getOperation(), e);

            // Mark the record as failed and keep the reason
            record.setStatus("FAILED");
            record.setErrorMessage(e.getMessage());
            record.setUpdateTime(LocalDateTime.now());
            compensationRecordMapper.updateById(record);
        }
    }

    /**
     * Undoes order creation by cancelling the order whose id is stored under
     * {@code orderId} in the record's JSON data.
     */
    private void compensateCreateOrder(CompensationRecord record) {
        try {
            // Parse the compensation data
            Map<String, Object> data = JSON.parseObject(record.getCompensationData(), Map.class);
            Long orderId = Long.valueOf(data.get("orderId").toString());

            // Cancel the order
            orderService.cancelOrder(orderId);

            log.info("补偿创建订单完成: orderId={}", orderId);

        } catch (Exception e) {
            log.error("补偿创建订单失败: recordId={}", record.getId(), e);
            throw e;
        }
    }

    /**
     * Undoes an inventory deduction by restoring the {@code quantity} of
     * {@code productId} stored in the record's JSON data.
     */
    private void compensateDeductInventory(CompensationRecord record) {
        try {
            // Parse the compensation data
            Map<String, Object> data = JSON.parseObject(record.getCompensationData(), Map.class);
            Long productId = Long.valueOf(data.get("productId").toString());
            Integer quantity = Integer.valueOf(data.get("quantity").toString());

            // Restore the inventory
            inventoryService.restoreInventory(productId, quantity);

            log.info("补偿扣减库存完成: productId={}, quantity={}", productId, quantity);

        } catch (Exception e) {
            log.error("补偿扣减库存失败: recordId={}", record.getId(), e);
            throw e;
        }
    }

    /**
     * Undoes payment creation by cancelling the payment whose id is stored under
     * {@code paymentId} in the record's JSON data.
     */
    private void compensateCreatePayment(CompensationRecord record) {
        try {
            // Parse the compensation data
            Map<String, Object> data = JSON.parseObject(record.getCompensationData(), Map.class);
            Long paymentId = Long.valueOf(data.get("paymentId").toString());

            // Cancel the payment
            // NOTE(review): OrderService.cancelOrder passes an order id to this same
            // method — confirm which identifier cancelPayment expects.
            paymentService.cancelPayment(paymentId);

            log.info("补偿创建支付完成: paymentId={}", paymentId);

        } catch (Exception e) {
            log.error("补偿创建支付失败: recordId={}", record.getId(), e);
            throw e;
        }
    }
}

/**
* Compensation record entity mapped to the {@code compensation_records} table.
*/
@Data
@TableName("compensation_records")
public class CompensationRecord {
// Primary key, id type AUTO
@TableId(type = IdType.AUTO)
private Long id;
// Groups the records that CompensationService replays together
private String transactionId;
// CompensationService handles "CREATE_ORDER", "DEDUCT_INVENTORY" and "CREATE_PAYMENT"
private String operation;
// JSON text; CompensationService reads orderId, productId/quantity or paymentId from it
private String compensationData;
// CompensationService writes "PENDING", "COMPLETED" or "FAILED"
private String status;
// Exception message stored when compensation fails
private String errorMessage;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

6. 总结

通过分布式事务与幂等保障的实现，我们构建了一个完整的分布式事务管理系统。关键特性包括：

6.1 核心优势

  1. 分布式事务: 跨服务事务管理和协调
  2. 幂等性保障: 防止重复操作和数据不一致
  3. 事务补偿: 失败事务的补偿和回滚机制
  4. 事务监控: 事务状态监控和告警
  5. 性能优化: 事务性能优化和并发控制

6.2 最佳实践

  1. 事务设计: 合理的事务边界和粒度控制
  2. 幂等性设计: 完善的幂等性保障机制
  3. 补偿机制: 可靠的事务补偿和回滚策略
  4. 监控告警: 完善的事务监控和异常处理
  5. 性能优化: 事务性能优化和并发控制

这套分布式事务方案不仅能够确保跨服务的数据一致性，还提供了完善的幂等性保障和补偿机制，是现代微服务架构的重要基础设施。