1. Overview of Consistency Challenges

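Guaranteeing data consistency is one of the core challenges in distributed systems. In e-commerce in particular, consistency between order creation and inventory deduction directly determines whether the business behaves correctly. This article describes a consistency scheme built on Kafka transactions, idempotent design, and a dedup table.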

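1.1 Core Problems

  1. Distributed transactions: data consistency across multiple services
  2. Duplicate messages: network retries cause the same message to be consumed more than once
  3. Idempotency: a repeated operation must produce the same result as a single one
  4. Eventual consistency: data converges to a consistent state across the distributed environment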

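1.2 Technical Architecture

User request → Order service → Kafka transactional message → Inventory service
↓ ↓ ↓ ↓
Idempotency key check → Dedup table record → Transaction coordinator → Inventory deduction
↓ ↓ ↓ ↓
Order creation → Message send → Message consumption → Inventory update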

2. Kafka Transaction Configuration

2.1 Producer Transaction Configuration

@Configuration
public class KafkaTransactionConfig {

/**
* 事务生产者配置
*/
@Bean
public ProducerFactory<String, String> transactionProducerFactory() {
Map<String, Object> configProps = new HashMap<>();

// Basic settings
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// Transaction settings
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-producer");
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer, required for transactions
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // keep retrying until delivery.timeout.ms expires
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must not exceed 5 when idempotence is on

// Throughput tuning
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

return new DefaultKafkaProducerFactory<>(configProps);
}

/**
* 事务KafkaTemplate
*/
@Bean
public KafkaTemplate<String, String> transactionKafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(transactionProducerFactory());
template.setDefaultTopic("order-events");
return template;
}

/**
* 消费者配置
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();

configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 事务消费配置
configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读已提交的消息
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return new DefaultKafkaConsumerFactory<>(configProps);
}
}
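
The producer settings above depend on each other: a transactional producer needs idempotence, and idempotence in turn needs `acks=all` and at most 5 in-flight requests. The sketch below restates the same keys as plain `java.util.Properties` and checks those rules. `TxConfigSketch`, `firstProblem`, and the per-instance suffix on `transactional.id` are hypothetical names chosen for illustration; the suffix reflects the assumption that each producer instance should use a distinct transactional ID so that instances do not fence each other.

```java
import java.util.Properties;

/** Hypothetical helper: a plain-Properties view of the settings from section 2.1. */
final class TxConfigSketch {

    /** Producer settings; instanceId is an assumed per-instance suffix. */
    static Properties producerProps(String instanceId) {
        Properties p = new Properties();
        p.setProperty("transactional.id", "order-transaction-producer-" + instanceId);
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        return p;
    }

    /** Consumer settings that make aborted transactions invisible to the reader. */
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("isolation.level", "read_committed");
        p.setProperty("enable.auto.commit", "false");
        return p;
    }

    /** Returns null when the producer settings are consistent, else the first problem found. */
    static String firstProblem(Properties producer) {
        boolean transactional = producer.getProperty("transactional.id") != null;
        boolean idempotent = Boolean.parseBoolean(producer.getProperty("enable.idempotence", "false"));
        if (transactional && !idempotent) {
            return "transactions require enable.idempotence=true";
        }
        String acks = producer.getProperty("acks", "all");
        if (idempotent && !("all".equals(acks) || "-1".equals(acks))) {
            return "idempotence requires acks=all";
        }
        int inFlight = Integer.parseInt(
                producer.getProperty("max.in.flight.requests.per.connection", "5"));
        if (idempotent && inFlight > 5) {
            return "idempotence requires at most 5 in-flight requests";
        }
        return null;
    }
}
```

Running a check like this at startup is one way to catch a later edit that, for example, lowers `acks` for throughput and silently breaks the transactional guarantees.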

2.2 Transaction Manager

@Slf4j // Lombok; supplies the `log` field used below
@Component
public class KafkaTransactionManager {

@Autowired
private KafkaTemplate<String, String> transactionKafkaTemplate;

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

/**
* Runs the order flow inside a local Kafka transaction.
* The two Kafka sends commit or abort together. The database write in
* createOrder is not covered by the Kafka transaction; it commits through
* OrderService's own transaction, so retries rely on the idempotency key.
*/
public void executeOrderTransaction(OrderRequest orderRequest) {
try {
// 1. Begin a local Kafka transaction. executeInTransaction begins, commits,
// and aborts the transaction itself, so no @Transactional is needed here.
transactionKafkaTemplate.executeInTransaction(operations -> {

// 2. Create the order
Order order = orderService.createOrder(orderRequest);

// 3. Build the inventory deduction message; carry the idempotency key
// so the inventory service can deduplicate redeliveries
InventoryDeductionMessage message = InventoryDeductionMessage.builder()
.orderId(order.getId())
.productId(orderRequest.getProductId())
.quantity(orderRequest.getQuantity())
.timestamp(System.currentTimeMillis())
.idempotencyKey(orderRequest.getIdempotencyKey())
.build();

// 4. Send the message within the transaction
operations.send("inventory-deduction", order.getId().toString(), message.toJson());

// 5. Send the order-created event
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderId(order.getId())
.userId(orderRequest.getUserId())
.productId(orderRequest.getProductId())
.quantity(orderRequest.getQuantity())
.amount(orderRequest.getAmount())
.timestamp(System.currentTimeMillis())
.build();

operations.send("order-events", order.getId().toString(), event.toJson());

return order;
});

} catch (Exception e) {
log.error("Order transaction failed: orderRequest={}", orderRequest, e);
throw new TransactionException("Order creation failed", e);
}
}

/**
* Idempotency check
*/
public boolean checkIdempotency(String idempotencyKey) {
// Check whether the idempotency key already exists
return orderService.isIdempotencyKeyExists(idempotencyKey);
}

/**
* Records an idempotency key
*/
public void recordIdempotencyKey(String idempotencyKey, Long orderId) {
orderService.recordIdempotencyKey(idempotencyKey, orderId);
}
}
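
A Kafka transaction makes the records sent within it atomic, but the database insert performed by `orderService.createOrder` commits separately, so a crash between the two can leave an order without its deduction message. One common way to close that gap is the transactional outbox pattern: the order row and the outgoing message are written in the same database transaction, and a relay publishes the message afterwards. This is not the approach the listing above takes. The sketch below is an in-memory illustration of the idea only, and `OutboxSketch` with all its members is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a database holding an orders table and an outbox table. */
final class OutboxSketch {
    private final List<String> orders = new ArrayList<>();
    private final List<String> outbox = new ArrayList<>();
    private final List<String> published = new ArrayList<>();

    /** Simulates one local DB transaction: both rows are written, or neither is. */
    synchronized boolean createOrder(String orderId, boolean failBeforeCommit) {
        List<String> stagedOrders = new ArrayList<>(orders);
        List<String> stagedOutbox = new ArrayList<>(outbox);
        stagedOrders.add(orderId);
        stagedOutbox.add("inventory-deduction:" + orderId);
        if (failBeforeCommit) {
            return false; // rollback: the staged copies are simply discarded
        }
        orders.clear();
        orders.addAll(stagedOrders);
        outbox.clear();
        outbox.addAll(stagedOutbox);
        return true;
    }

    /** Relay step: moves committed outbox rows to the simulated broker. */
    synchronized int relay() {
        int moved = outbox.size();
        published.addAll(outbox);
        outbox.clear();
        return moved;
    }

    synchronized List<String> published() {
        return new ArrayList<>(published);
    }

    synchronized int orderCount() {
        return orders.size();
    }
}
```

Because the relay can crash after publishing but before clearing the outbox row, a real relay delivers at least once, which is why the consumer-side idempotency described in this article is still needed.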

3. Order Service Implementation

3.1 Order Entity and DTOs

/**
* 订单实体
*/
@Entity
@Table(name = "orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "order_no", unique = true, nullable = false)
private String orderNo; // 订单号

@Column(name = "user_id", nullable = false)
private Long userId; // 用户ID

@Column(name = "product_id", nullable = false)
private Long productId; // 商品ID

@Column(name = "quantity", nullable = false)
private Integer quantity; // 数量

@Column(name = "amount", nullable = false)
private BigDecimal amount; // 金额

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private OrderStatus status; // 订单状态

@Column(name = "idempotency_key", unique = true)
private String idempotencyKey; // 幂等键

@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime; // 创建时间

@Column(name = "updated_time")
private LocalDateTime updatedTime; // 更新时间

@Version
private Long version; // 乐观锁版本号
}

/**
* 订单状态枚举
*/
public enum OrderStatus {
PENDING, // 待处理
CONFIRMED, // 已确认
PAID, // 已支付
SHIPPED, // 已发货
DELIVERED, // 已送达
CANCELLED, // 已取消
REFUNDED // 已退款
}

/**
* 订单请求DTO
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderRequest {

@NotNull(message = "用户ID不能为空")
private Long userId;

@NotNull(message = "商品ID不能为空")
private Long productId;

@NotNull(message = "数量不能为空")
@Min(value = 1, message = "数量必须大于0")
private Integer quantity;

@NotNull(message = "金额不能为空")
@DecimalMin(value = "0.01", message = "金额必须大于0")
private BigDecimal amount;

private String idempotencyKey; // 幂等键

private String remark; // 备注
}

3.2 Order Service Implementation

@Slf4j // Lombok; supplies the `log` field used below
@Service
@Transactional
public class OrderService {

@Autowired
private OrderRepository orderRepository;

@Autowired
private IdempotencyService idempotencyService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 创建订单 - 支持幂等性
*/
public Order createOrder(OrderRequest orderRequest) {
// 1. 幂等性检查
if (StringUtils.hasText(orderRequest.getIdempotencyKey())) {
Order existingOrder = getOrderByIdempotencyKey(orderRequest.getIdempotencyKey());
if (existingOrder != null) {
log.info("订单已存在,返回现有订单: idempotencyKey={}, orderId={}",
orderRequest.getIdempotencyKey(), existingOrder.getId());
return existingOrder;
}
}

// 2. 生成订单号
String orderNo = generateOrderNo();

// 3. 创建订单实体
Order order = Order.builder()
.orderNo(orderNo)
.userId(orderRequest.getUserId())
.productId(orderRequest.getProductId())
.quantity(orderRequest.getQuantity())
.amount(orderRequest.getAmount())
.status(OrderStatus.PENDING)
.idempotencyKey(orderRequest.getIdempotencyKey())
.createdTime(LocalDateTime.now())
.build();

// 4. 保存订单
order = orderRepository.save(order);

// 5. 记录幂等键
if (StringUtils.hasText(orderRequest.getIdempotencyKey())) {
idempotencyService.recordIdempotencyKey(
orderRequest.getIdempotencyKey(),
order.getId()
);
}

// 6. 缓存订单信息
cacheOrderInfo(order);

log.info("订单创建成功: orderId={}, orderNo={}", order.getId(), order.getOrderNo());
return order;
}

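/**
* Looks up an order by its idempotency key
*/
public Order getOrderByIdempotencyKey(String idempotencyKey) {
return orderRepository.findByIdempotencyKey(idempotencyKey);
}

/**
* Looks up an order by ID; returns null when it does not exist.
* The order status consumer and the order controller call this method.
*/
public Order getOrderById(Long orderId) {
return orderRepository.findById(orderId).orElse(null);
}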

/**
* 检查幂等键是否存在
*/
public boolean isIdempotencyKeyExists(String idempotencyKey) {
return orderRepository.existsByIdempotencyKey(idempotencyKey);
}

/**
* 记录幂等键
*/
public void recordIdempotencyKey(String idempotencyKey, Long orderId) {
idempotencyService.recordIdempotencyKey(idempotencyKey, orderId);
}

/**
* 更新订单状态
*/
public void updateOrderStatus(Long orderId, OrderStatus status) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException("订单不存在: " + orderId));

order.setStatus(status);
order.setUpdatedTime(LocalDateTime.now());
orderRepository.save(order);

// 更新缓存
cacheOrderInfo(order);

log.info("订单状态更新: orderId={}, status={}", orderId, status);
}

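/**
* Generates an order number
*/
private String generateOrderNo() {
// Format: ORD + epoch millis + 4-digit zero-padded random suffix.
// The suffix only lowers the collision probability; the unique constraint
// on order_no remains the actual guarantee.
String timestamp = String.valueOf(System.currentTimeMillis());
int suffix = java.util.concurrent.ThreadLocalRandom.current().nextInt(10_000);
return "ORD" + timestamp + String.format("%04d", suffix);
}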

/**
* 缓存订单信息
*/
private void cacheOrderInfo(Order order) {
String cacheKey = "order:info:" + order.getId();
redisTemplate.opsForValue().set(cacheKey, order, Duration.ofHours(24));
}
}
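
A timestamp plus a random suffix can still collide when many orders arrive in the same millisecond. As an alternative, the sketch below combines a formatted timestamp, a node identifier, and a per-process sequence, which makes numbers unique as long as one node issues fewer than 10,000 per millisecond. `OrderNoSketch`, the `nodeId` parameter, and the number layout are assumptions made for illustration, not the format the article's service uses.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical order-number generator: ORD + timestamp + node id + sequence. */
final class OrderNoSketch {
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    private final AtomicLong sequence = new AtomicLong();
    private final String nodeId; // assumed to be unique per service instance

    OrderNoSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Builds the next number; the clock value is passed in so the result is testable. */
    String next(LocalDateTime now) {
        long seq = sequence.getAndIncrement() % 10_000;
        return "ORD" + TIMESTAMP.format(now) + nodeId
                + String.format(Locale.ROOT, "%04d", seq);
    }
}
```

In production code the caller would pass `LocalDateTime.now()`; the unique constraint on `order_no` should stay in place either way as the final guard.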

3.3 Idempotency Service

@Slf4j // Lombok; supplies the `log` field used below
@Service
public class IdempotencyService {

@Autowired
private IdempotencyRepository idempotencyRepository;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* Records an idempotency key
*/
public void recordIdempotencyKey(String idempotencyKey, Long orderId) {
// 1. Check the Redis cache first (hasKey returns a nullable Boolean)
String cacheKey = "idempotency:" + idempotencyKey;
if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
log.warn("Idempotency key already exists: {}", idempotencyKey);
return;
}

// 2. Check the database
if (idempotencyRepository.existsByIdempotencyKey(idempotencyKey)) {
log.warn("Idempotency key already exists in the database: {}", idempotencyKey);
return;
}

// 3. Build the idempotency record
IdempotencyRecord record = IdempotencyRecord.builder()
.idempotencyKey(idempotencyKey)
.orderId(orderId)
.createdTime(LocalDateTime.now())
.build();

// 4. Save to the database; the unique constraint on idempotency_key is the
// final guard if two requests pass the checks above at the same time
idempotencyRepository.save(record);

// 5. Cache in Redis
redisTemplate.opsForValue().set(cacheKey, orderId, Duration.ofHours(24));

log.info("Idempotency key recorded: idempotencyKey={}, orderId={}", idempotencyKey, orderId);
}

/**
* Checks whether an idempotency key exists
*/
public boolean isIdempotencyKeyExists(String idempotencyKey) {
// 1. Check the Redis cache first (hasKey returns a nullable Boolean)
String cacheKey = "idempotency:" + idempotencyKey;
if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
return true;
}

// 2. Fall back to the database
return idempotencyRepository.existsByIdempotencyKey(idempotencyKey);
}

/**
* 根据幂等键获取订单ID
*/
public Long getOrderIdByIdempotencyKey(String idempotencyKey) {
// 1. 先检查Redis缓存
String cacheKey = "idempotency:" + idempotencyKey;
Object cachedOrderId = redisTemplate.opsForValue().get(cacheKey);
if (cachedOrderId != null) {
return Long.valueOf(cachedOrderId.toString());
}

// 2. 查询数据库
IdempotencyRecord record = idempotencyRepository.findByIdempotencyKey(idempotencyKey);
if (record != null) {
// 3. 缓存到Redis
redisTemplate.opsForValue().set(cacheKey, record.getOrderId(), Duration.ofHours(24));
return record.getOrderId();
}

return null;
}
}

/**
* 幂等性记录实体
*/
@Entity
@Table(name = "idempotency_records")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class IdempotencyRecord {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "idempotency_key", unique = true, nullable = false)
private String idempotencyKey; // 幂等键

@Column(name = "order_id", nullable = false)
private Long orderId; // 关联订单ID

@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime; // 创建时间
}
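
The service above checks for a key and then inserts it, which leaves a window where two concurrent requests both pass the check. A dedup table is safer when the insert itself is the check: whoever inserts first owns the key, and everyone else reads the owner back. The sketch below models that with `ConcurrentMap.putIfAbsent`, which plays the role of an insert against a unique index. `DedupTableSketch` and `claim` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a table with a unique index on idempotency_key. */
final class DedupTableSketch {
    // idempotency key -> order ID that owns it
    private final ConcurrentMap<String, Long> rows = new ConcurrentHashMap<>();

    /**
     * Atomically claims the key for orderId.
     * Returns the owning order ID: the caller's own when the claim succeeded,
     * or the earlier order's when the key was already taken.
     */
    long claim(String idempotencyKey, long orderId) {
        Long existing = rows.putIfAbsent(idempotencyKey, orderId);
        return existing == null ? orderId : existing;
    }

    /** Returns the owner of the key, or null when nobody has claimed it. */
    Long ownerOf(String idempotencyKey) {
        return rows.get(idempotencyKey);
    }
}
```

With a relational database the equivalent move is to attempt the insert and treat a unique-constraint violation as "already processed", rather than querying first.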

4. Inventory Service Implementation

4.1 Inventory Entity and Messages

/**
* 库存实体
*/
@Entity
@Table(name = "inventory")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Inventory {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "product_id", unique = true, nullable = false)
private Long productId; // 商品ID

@Column(name = "total_stock", nullable = false)
private Integer totalStock; // 总库存

@Column(name = "available_stock", nullable = false)
private Integer availableStock; // 可用库存

@Column(name = "reserved_stock", nullable = false)
private Integer reservedStock; // 预留库存

@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime; // 创建时间

@Column(name = "updated_time")
private LocalDateTime updatedTime; // 更新时间

@Version
private Long version; // 乐观锁版本号
}

/**
* 库存扣减消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class InventoryDeductionMessage {

private Long orderId; // 订单ID
private Long productId; // 商品ID
private Integer quantity; // 扣减数量
private Long timestamp; // 时间戳
private String idempotencyKey; // 幂等键

public String toJson() {
return JSON.toJSONString(this);
}

public static InventoryDeductionMessage fromJson(String json) {
return JSON.parseObject(json, InventoryDeductionMessage.class);
}
}

/**
* 库存操作记录
*/
@Entity
@Table(name = "inventory_operations")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class InventoryOperation {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "order_id", nullable = false)
private Long orderId; // 订单ID

@Column(name = "product_id", nullable = false)
private Long productId; // 商品ID

@Column(name = "operation_type", nullable = false)
private String operationType; // 操作类型:DEDUCT, RESTORE

@Column(name = "quantity", nullable = false)
private Integer quantity; // 操作数量

@Column(name = "idempotency_key", unique = true)
private String idempotencyKey; // 幂等键

@Column(name = "status", nullable = false)
private String status; // 状态:SUCCESS, FAILED

@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime; // 创建时间
}

4.2 Inventory Service Implementation

@Slf4j // Lombok; supplies the `log` field used below
@Service
@Transactional
public class InventoryService {

@Autowired
private InventoryRepository inventoryRepository;

@Autowired
private InventoryOperationRepository operationRepository;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 扣减库存 - 支持幂等性
*/
public boolean deductInventory(InventoryDeductionMessage message) {
// 1. 幂等性检查
if (StringUtils.hasText(message.getIdempotencyKey())) {
InventoryOperation existingOperation = operationRepository
.findByIdempotencyKey(message.getIdempotencyKey());
if (existingOperation != null) {
log.info("库存操作已存在,返回现有结果: idempotencyKey={}, status={}",
message.getIdempotencyKey(), existingOperation.getStatus());
return "SUCCESS".equals(existingOperation.getStatus());
}
}

try {
// 2. 获取库存信息
Inventory inventory = inventoryRepository.findByProductId(message.getProductId())
.orElseThrow(() -> new InventoryNotFoundException("商品库存不存在: " + message.getProductId()));

// 3. 检查库存是否充足
if (inventory.getAvailableStock() < message.getQuantity()) {
log.warn("库存不足: productId={}, available={}, required={}",
message.getProductId(), inventory.getAvailableStock(), message.getQuantity());

// 记录失败操作
recordInventoryOperation(message, "FAILED");
return false;
}

// 4. 使用乐观锁扣减库存
int updatedRows = inventoryRepository.deductStock(
message.getProductId(),
message.getQuantity(),
inventory.getVersion()
);

if (updatedRows == 0) {
log.warn("库存扣减失败,可能被其他操作修改: productId={}", message.getProductId());
recordInventoryOperation(message, "FAILED");
return false;
}

// 5. 记录成功操作
recordInventoryOperation(message, "SUCCESS");

// 6. 更新缓存
updateInventoryCache(message.getProductId());

log.info("库存扣减成功: productId={}, quantity={}, orderId={}",
message.getProductId(), message.getQuantity(), message.getOrderId());

return true;

} catch (Exception e) {
log.error("库存扣减异常: message={}", message, e);
recordInventoryOperation(message, "FAILED");
return false;
}
}

/**
* 恢复库存
*/
public boolean restoreInventory(Long orderId, Long productId, Integer quantity) {
try {
// 1. 获取库存信息
Inventory inventory = inventoryRepository.findByProductId(productId)
.orElseThrow(() -> new InventoryNotFoundException("商品库存不存在: " + productId));

// 2. 恢复库存
int updatedRows = inventoryRepository.restoreStock(
productId,
quantity,
inventory.getVersion()
);

if (updatedRows == 0) {
log.warn("库存恢复失败: productId={}", productId);
return false;
}

// 3. 更新缓存
updateInventoryCache(productId);

log.info("库存恢复成功: productId={}, quantity={}, orderId={}",
productId, quantity, orderId);

return true;

} catch (Exception e) {
log.error("库存恢复异常: orderId={}, productId={}, quantity={}",
orderId, productId, quantity, e);
return false;
}
}

/**
* 记录库存操作
*/
private void recordInventoryOperation(InventoryDeductionMessage message, String status) {
InventoryOperation operation = InventoryOperation.builder()
.orderId(message.getOrderId())
.productId(message.getProductId())
.operationType("DEDUCT")
.quantity(message.getQuantity())
.idempotencyKey(message.getIdempotencyKey())
.status(status)
.createdTime(LocalDateTime.now())
.build();

operationRepository.save(operation);
}

/**
* 更新库存缓存
*/
private void updateInventoryCache(Long productId) {
Inventory inventory = inventoryRepository.findByProductId(productId).orElse(null);
if (inventory != null) {
String cacheKey = "inventory:" + productId;
redisTemplate.opsForValue().set(cacheKey, inventory, Duration.ofMinutes(30));
}
}

/**
* 获取库存信息
*/
public Inventory getInventory(Long productId) {
// 1. 先检查缓存
String cacheKey = "inventory:" + productId;
Object cachedInventory = redisTemplate.opsForValue().get(cacheKey);
if (cachedInventory != null) {
return (Inventory) cachedInventory;
}

// 2. 查询数据库
Inventory inventory = inventoryRepository.findByProductId(productId).orElse(null);
if (inventory != null) {
// 3. 缓存到Redis
redisTemplate.opsForValue().set(cacheKey, inventory, Duration.ofMinutes(30));
}

return inventory;
}
}
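
The optimistic-lock step in `deductInventory` treats a version conflict the same way as insufficient stock: both are recorded as `FAILED`. A conflict is transient, though, and usually worth retrying against the fresh version. The sketch below shows that distinction with an in-memory compare-and-set that stands in for an `UPDATE ... WHERE version = ?` statement. `OptimisticStockSketch`, its `Result` enum, and the retry bound are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** In-memory sketch of version-checked stock deduction (all names hypothetical). */
final class OptimisticStockSketch {
    enum Result { OK, INSUFFICIENT_STOCK, CONFLICT }

    /** Immutable snapshot of one inventory row. */
    static final class Row {
        final int available;
        final long version;

        Row(int available, long version) {
            this.available = available;
            this.version = version;
        }
    }

    private final AtomicReference<Row> row;

    OptimisticStockSketch(int initialStock) {
        this.row = new AtomicReference<>(new Row(initialStock, 0L));
    }

    /** Tries up to maxAttempts times; CONFLICT means every attempt lost the race. */
    Result deduct(int quantity, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Row current = row.get(); // re-read the latest version on each attempt
            if (current.available < quantity) {
                return Result.INSUFFICIENT_STOCK; // permanent: retrying will not help
            }
            Row next = new Row(current.available - quantity, current.version + 1);
            if (row.compareAndSet(current, next)) {
                return Result.OK; // the version matched, so the update applied
            }
        }
        return Result.CONFLICT; // transient: the caller may retry later
    }

    int available() {
        return row.get().available;
    }

    long version() {
        return row.get().version;
    }
}
```

Keeping the two failure kinds apart lets a consumer cancel the order only for `INSUFFICIENT_STOCK` and redeliver the message for `CONFLICT`.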

5. Kafka Consumer Implementation

5.1 Inventory Deduction Consumer

@Slf4j // Lombok; supplies the `log` field used below
@Component
public class InventoryDeductionConsumer {

@Autowired
private InventoryService inventoryService;

@Autowired
private OrderService orderService;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 消费库存扣减消息
*/
@KafkaListener(topics = "inventory-deduction", groupId = "inventory-consumer-group")
public void handleInventoryDeduction(ConsumerRecord<String, String> record) {
try {
// 1. 解析消息
InventoryDeductionMessage message = InventoryDeductionMessage.fromJson(record.value());
log.info("收到库存扣减消息: orderId={}, productId={}, quantity={}",
message.getOrderId(), message.getProductId(), message.getQuantity());

// 2. 执行库存扣减
boolean success = inventoryService.deductInventory(message);

// 3. 发送结果消息
InventoryDeductionResult result = InventoryDeductionResult.builder()
.orderId(message.getOrderId())
.productId(message.getProductId())
.quantity(message.getQuantity())
.success(success)
.timestamp(System.currentTimeMillis())
.build();

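// 4. Publish to the result topic. A KafkaTemplate backed by a transactional
// producer factory rejects send() outside a transaction, so wrap the send.
kafkaTemplate.executeInTransaction(ops -> ops.send("inventory-deduction-result",
message.getOrderId().toString(),
result.toJson()));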

// 5. 更新订单状态
if (success) {
orderService.updateOrderStatus(message.getOrderId(), OrderStatus.CONFIRMED);
} else {
orderService.updateOrderStatus(message.getOrderId(), OrderStatus.CANCELLED);
}

} catch (Exception e) {
log.error("处理库存扣减消息失败: {}", record.value(), e);

// Publish a failure result, inside a transaction as a transactional template requires
InventoryDeductionResult result = InventoryDeductionResult.builder()
.orderId(Long.valueOf(record.key()))
.success(false)
.error(e.getMessage())
.timestamp(System.currentTimeMillis())
.build();

kafkaTemplate.executeInTransaction(ops -> ops.send("inventory-deduction-result",
record.key(),
result.toJson()));
}
}
}

/**
* 库存扣减结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class InventoryDeductionResult {

private Long orderId; // 订单ID
private Long productId; // 商品ID
private Integer quantity; // 数量
private boolean success; // 是否成功
private String error; // 错误信息
private Long timestamp; // 时间戳

public String toJson() {
return JSON.toJSONString(this);
}
}
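
The consumer sets the order status directly from each message, so a duplicate or late message could move a cancelled order back to `CONFIRMED`. One guard is to validate every status change against a table of allowed transitions before applying it. The sketch below shows such a table for the statuses defined in section 3.1. The specific transitions are assumptions made for illustration and should be replaced with the real business rules; `OrderStateSketch` is a hypothetical name.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical transition table for order statuses. */
final class OrderStateSketch {
    enum Status { PENDING, CONFIRMED, PAID, SHIPPED, DELIVERED, CANCELLED, REFUNDED }

    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        // Assumed rules: adjust to the actual order lifecycle.
        ALLOWED.put(Status.PENDING, EnumSet.of(Status.CONFIRMED, Status.CANCELLED));
        ALLOWED.put(Status.CONFIRMED, EnumSet.of(Status.PAID, Status.CANCELLED));
        ALLOWED.put(Status.PAID, EnumSet.of(Status.SHIPPED, Status.REFUNDED));
        ALLOWED.put(Status.SHIPPED, EnumSet.of(Status.DELIVERED));
        ALLOWED.put(Status.DELIVERED, EnumSet.of(Status.REFUNDED));
        ALLOWED.put(Status.CANCELLED, EnumSet.noneOf(Status.class)); // terminal
        ALLOWED.put(Status.REFUNDED, EnumSet.noneOf(Status.class));  // terminal
    }

    /** Returns true when moving from one status to the other is permitted. */
    static boolean canTransition(Status from, Status to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

A status update that fails this check can be logged and skipped, which makes replayed messages harmless.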

5.2 Order Status Consumer

@Slf4j // Lombok; supplies the `log` field used below
@Component
public class OrderStatusConsumer {

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

/**
* 消费订单状态变更消息
*/
@KafkaListener(topics = "order-status-changes", groupId = "order-status-consumer-group")
public void handleOrderStatusChange(ConsumerRecord<String, String> record) {
try {
OrderStatusChangeMessage message = JSON.parseObject(record.value(), OrderStatusChangeMessage.class);

log.info("收到订单状态变更消息: orderId={}, oldStatus={}, newStatus={}",
message.getOrderId(), message.getOldStatus(), message.getNewStatus());

// 处理订单取消 - 恢复库存
if (OrderStatus.CANCELLED.name().equals(message.getNewStatus())) {
restoreInventoryForCancelledOrder(message.getOrderId());
}

} catch (Exception e) {
log.error("处理订单状态变更消息失败: {}", record.value(), e);
}
}

/**
* 为取消的订单恢复库存
*/
private void restoreInventoryForCancelledOrder(Long orderId) {
try {
// 1. 获取订单信息
Order order = orderService.getOrderById(orderId);
if (order == null) {
log.warn("订单不存在: orderId={}", orderId);
return;
}

// 2. 恢复库存
boolean success = inventoryService.restoreInventory(
orderId,
order.getProductId(),
order.getQuantity()
);

if (success) {
log.info("订单取消库存恢复成功: orderId={}, productId={}, quantity={}",
orderId, order.getProductId(), order.getQuantity());
} else {
log.warn("订单取消库存恢复失败: orderId={}", orderId);
}

} catch (Exception e) {
log.error("订单取消库存恢复异常: orderId={}", orderId, e);
}
}
}
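
Unlike deduction, the restore path has no idempotency check, so a redelivered cancellation message would add the stock back twice. It would also restore stock for an order whose deduction never succeeded. One way to handle both is an operation ledger keyed by operation type and order ID. The sketch below is an in-memory illustration; `InventoryLedgerSketch` and the `TYPE:orderId` key format are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory ledger that applies each deduct and restore at most once per order. */
final class InventoryLedgerSketch {
    private int available;
    // "TYPE:orderId" -> whether that operation succeeded
    private final Map<String, Boolean> operations = new HashMap<>();

    InventoryLedgerSketch(int initialStock) {
        this.available = initialStock;
    }

    /** Deducts once per order; a replay returns the recorded outcome unchanged. */
    synchronized boolean deduct(long orderId, int quantity) {
        String key = "DEDUCT:" + orderId;
        Boolean previous = operations.get(key);
        if (previous != null) {
            return previous;
        }
        boolean ok = available >= quantity;
        if (ok) {
            available -= quantity;
        }
        operations.put(key, ok);
        return ok;
    }

    /** Restores only after a successful deduct, and only once per order. */
    synchronized boolean restore(long orderId, int quantity) {
        if (!Boolean.TRUE.equals(operations.get("DEDUCT:" + orderId))) {
            return false; // nothing was deducted, so there is nothing to give back
        }
        if (operations.putIfAbsent("RESTORE:" + orderId, Boolean.TRUE) != null) {
            return false; // already restored by an earlier delivery
        }
        available += quantity;
        return true;
    }

    synchronized int available() {
        return available;
    }
}
```

In the article's schema the same effect could come from writing a `RESTORE` row to `inventory_operations` with its own unique idempotency key inside the restoring transaction.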

6. Controller Implementation

6.1 Order Controller

@Slf4j // Lombok; supplies the `log` field used below
@RestController
@RequestMapping("/api/orders")
@Validated
public class OrderController {

@Autowired
private KafkaTransactionManager transactionManager;

@Autowired
private OrderService orderService;

// Used by cancelOrder to publish status-change events
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 创建订单
*/
@PostMapping
public ResponseEntity<ApiResponse<Order>> createOrder(@Valid @RequestBody OrderRequest orderRequest) {
try {
// 1. 生成幂等键(如果未提供)
if (!StringUtils.hasText(orderRequest.getIdempotencyKey())) {
orderRequest.setIdempotencyKey(generateIdempotencyKey(orderRequest));
}

// 2. 幂等性检查
if (transactionManager.checkIdempotency(orderRequest.getIdempotencyKey())) {
Order existingOrder = orderService.getOrderByIdempotencyKey(orderRequest.getIdempotencyKey());
return ResponseEntity.ok(ApiResponse.success(existingOrder, "订单已存在"));
}

// 3. 执行分布式事务
transactionManager.executeOrderTransaction(orderRequest);

// 4. 获取创建的订单
Order order = orderService.getOrderByIdempotencyKey(orderRequest.getIdempotencyKey());

return ResponseEntity.ok(ApiResponse.success(order, "订单创建成功"));

} catch (Exception e) {
log.error("订单创建失败: orderRequest={}", orderRequest, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("订单创建失败: " + e.getMessage()));
}
}

/**
* 查询订单
*/
@GetMapping("/{orderId}")
public ResponseEntity<ApiResponse<Order>> getOrder(@PathVariable Long orderId) {
try {
Order order = orderService.getOrderById(orderId);
if (order == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND)
.body(ApiResponse.error("订单不存在"));
}

return ResponseEntity.ok(ApiResponse.success(order));

} catch (Exception e) {
log.error("查询订单失败: orderId={}", orderId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("查询订单失败: " + e.getMessage()));
}
}

/**
* 取消订单
*/
@PostMapping("/{orderId}/cancel")
public ResponseEntity<ApiResponse<Void>> cancelOrder(@PathVariable Long orderId) {
try {
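// 1. Load the order so the event carries its real previous status
Order order = orderService.getOrderById(orderId);
if (order == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND)
.body(ApiResponse.error("Order not found"));
}
String oldStatus = order.getStatus().name();

// 2. Update the order status
orderService.updateOrderStatus(orderId, OrderStatus.CANCELLED);

// 3. Publish the cancellation event, inside a transaction as a transactional template requires
OrderStatusChangeMessage message = OrderStatusChangeMessage.builder()
.orderId(orderId)
.oldStatus(oldStatus)
.newStatus(OrderStatus.CANCELLED.name())
.timestamp(System.currentTimeMillis())
.build();

kafkaTemplate.executeInTransaction(ops ->
ops.send("order-status-changes", orderId.toString(), message.toJson()));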

return ResponseEntity.ok(ApiResponse.success(null, "订单取消成功"));

} catch (Exception e) {
log.error("取消订单失败: orderId={}", orderId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("取消订单失败: " + e.getMessage()));
}
}

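/**
* Generates a fallback idempotency key.
* A server-generated key is new on every call, so it cannot deduplicate a
* client retry; only a key supplied by the client does that.
*/
private String generateIdempotencyKey(OrderRequest orderRequest) {
// Format: userId_productId_quantity_UUID (a UUID avoids same-millisecond collisions)
return orderRequest.getUserId() + "_" +
orderRequest.getProductId() + "_" +
orderRequest.getQuantity() + "_" +
java.util.UUID.randomUUID();
}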
}
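
For the controller's idempotency check to catch a retried request, the key has to come out the same on every attempt. A minimal sketch of one option follows: derive the key deterministically from a client-supplied request ID plus the request fields, so a retry with the same ID and payload maps to the same key. `IdempotencyKeySketch`, `derive`, and the `clientRequestId` parameter are assumptions for illustration; the article's `OrderRequest` has no such field.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper that derives a stable idempotency key from request data. */
final class IdempotencyKeySketch {

    /** Returns a 64-character lowercase hex SHA-256 digest of the canonical request. */
    static String derive(String clientRequestId, long userId, long productId, int quantity) {
        String canonical = clientRequestId + "|" + userId + "|" + productId + "|" + quantity;
        try {
            byte[] hash = MessageDigest.getInstance("SHA-256")
                    .digest(canonical.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

Including the payload fields in the digest means a client that reuses a request ID with different contents gets a different key instead of silently receiving the earlier order.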

7. Testing

7.1 Unit Tests

@SpringBootTest
@Transactional
class OrderServiceTest {

@Autowired
private OrderService orderService;

@Autowired
private InventoryService inventoryService;

// Used by testInventoryDeduction to seed stock
@Autowired
private InventoryRepository inventoryRepository;

@Test
void testCreateOrderWithIdempotency() {
// 1. 准备测试数据
OrderRequest orderRequest = OrderRequest.builder()
.userId(1L)
.productId(100L)
.quantity(2)
.amount(new BigDecimal("100.00"))
.idempotencyKey("test-key-001")
.build();

// 2. 第一次创建订单
Order order1 = orderService.createOrder(orderRequest);
assertNotNull(order1);
assertEquals("test-key-001", order1.getIdempotencyKey());

// 3. 第二次创建相同订单(幂等性测试)
Order order2 = orderService.createOrder(orderRequest);
assertNotNull(order2);
assertEquals(order1.getId(), order2.getId()); // 应该是同一个订单
}

@Test
void testInventoryDeduction() {
// 1. 准备库存数据
Inventory inventory = Inventory.builder()
.productId(100L)
.totalStock(100)
.availableStock(100)
.reservedStock(0)
.createdTime(LocalDateTime.now())
.build();
inventoryRepository.save(inventory);

// 2. 准备扣减消息
InventoryDeductionMessage message = InventoryDeductionMessage.builder()
.orderId(1L)
.productId(100L)
.quantity(10)
.idempotencyKey("deduct-test-001")
.build();

// 3. 执行库存扣减
boolean success = inventoryService.deductInventory(message);
assertTrue(success);

// 4. 验证库存变化
Inventory updatedInventory = inventoryService.getInventory(100L);
assertEquals(90, updatedInventory.getAvailableStock());

// 5. 重复扣减测试(幂等性)
boolean success2 = inventoryService.deductInventory(message);
assertTrue(success2); // 幂等性应该返回成功
}
}

7.2 Integration Tests

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=localhost:9092",
"spring.datasource.url=jdbc:h2:mem:testdb"
})
class OrderIntegrationTest {

@Autowired
private TestRestTemplate restTemplate;

@Autowired
private KafkaTransactionManager transactionManager;

@Test
void testOrderCreationFlow() {
// 1. 准备订单请求
OrderRequest orderRequest = OrderRequest.builder()
.userId(1L)
.productId(100L)
.quantity(2)
.amount(new BigDecimal("100.00"))
.idempotencyKey("integration-test-001")
.build();

// 2. 发送创建订单请求
ResponseEntity<ApiResponse> response = restTemplate.postForEntity(
"/api/orders",
orderRequest,
ApiResponse.class
);

// 3. 验证响应
assertEquals(HttpStatus.OK, response.getStatusCode());
assertNotNull(response.getBody());
assertTrue(response.getBody().isSuccess());

// 4. 重复请求测试(幂等性)
ResponseEntity<ApiResponse> response2 = restTemplate.postForEntity(
"/api/orders",
orderRequest,
ApiResponse.class
);

assertEquals(HttpStatus.OK, response2.getStatusCode());
// 应该返回相同的订单
}
}

8. Monitoring and Alerting

8.1 Monitoring Metrics

@Component
public class ConsistencyMonitor {

private final MeterRegistry meterRegistry;
private final Counter orderCreatedCounter;
private final Counter inventoryDeductionCounter;
private final Timer transactionDurationTimer;
// Backing value for the gauge (java.util.concurrent.atomic.AtomicInteger)
private final AtomicInteger activeTransactions = new AtomicInteger();
private final Gauge activeTransactionsGauge;

public ConsistencyMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.orderCreatedCounter = Counter.builder("orders.created.total")
.description("Total orders created")
.register(meterRegistry);
this.inventoryDeductionCounter = Counter.builder("inventory.deduction.total")
.description("Total inventory deductions")
.register(meterRegistry);
this.transactionDurationTimer = Timer.builder("transaction.duration")
.description("Transaction execution time")
.register(meterRegistry);
// The final gauge field was never assigned; the metric name here is illustrative
this.activeTransactionsGauge = Gauge.builder("transactions.active", activeTransactions, AtomicInteger::get)
.description("Transactions currently in flight")
.register(meterRegistry);
}

public void recordOrderCreated() {
orderCreatedCounter.increment();
}

public void recordInventoryDeduction() {
inventoryDeductionCounter.increment();
}

public void recordTransactionDuration(Duration duration) {
transactionDurationTimer.record(duration);
}
}

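9. Summary

Combining Kafka transactions, idempotent design, and a dedup table gives a workable scheme for keeping orders and inventory consistent in a distributed system. Its key characteristics are:

9.1 Technical Advantages

  1. Transactional messaging: a Kafka transaction commits or aborts all messages sent within it atomically; database writes sit outside that transaction and are protected by idempotency keys
  2. Idempotency: a repeated operation produces no additional side effects
  3. Eventual consistency: services converge through messages exchanged over Kafka
  4. Failure recovery: errors are handled explicitly and failed sends are retried

9.2 Implementation Notes

  1. Idempotency key design: a globally unique key identifies each logical operation
  2. Dedup table: records operations that have already run so they are not processed twice
  3. Optimistic locking: a version number guards concurrent updates
  4. Monitoring and alerting: metrics on order creation, inventory deduction, and transaction duration track system health

This scheme fits e-commerce workloads and can also serve as a reference for consistency in other distributed systems.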