幂等怎么做?有哪些常见坑?

1. 概述

1.1 幂等性的重要性

幂等性是分布式系统设计的核心原则之一,保证同一个操作执行多次和执行一次的效果相同。

幂等性的意义

  • 防止重复操作:避免重复提交、重复支付等问题
  • 保证数据一致性:避免数据重复或错误
  • 提高系统可靠性:网络重试、消息重复等场景下保证正确性

1.2 幂等性的定义

幂等性(Idempotency):同一个操作执行一次和执行多次的效果相同。

数学定义:f(f(x)) = f(x)

HTTP幂等性

  • GET:幂等
  • PUT:幂等
  • DELETE:幂等
  • POST:不幂等
  • PATCH:不幂等

1.3 本文内容结构

本文将从以下几个方面全面解析幂等性:

  1. 幂等性概述:定义、重要性、分类
  2. 幂等性实现方式:唯一索引、分布式锁、Token机制、状态机等
  3. 常见场景:接口幂等、消息幂等、定时任务幂等
  4. 常见坑和注意事项:常见问题和解决方案
  5. 实战案例:实际项目中的幂等性实现

2. 幂等性概述

2.1 什么是幂等性

2.1.1 定义

幂等性:同一个操作执行一次和执行多次的效果相同。

特点

  • 执行多次 = 执行一次
  • 不会产生副作用
  • 结果可预测

2.1.2 示例

幂等操作

  • 查询操作:SELECT * FROM users WHERE id = 1(多次查询结果相同)
  • 删除操作:DELETE FROM users WHERE id = 1(多次删除结果相同)
  • 更新操作:UPDATE users SET name = ‘Alice’ WHERE id = 1(多次更新结果相同)

非幂等操作

  • 创建操作:INSERT INTO users (name) VALUES (‘Alice’)(多次创建会产生多条记录)
  • 累加操作:UPDATE users SET balance = balance + 100 WHERE id = 1(多次执行余额会累加)

2.2 为什么需要幂等性

2.2.1 网络重试

场景:网络不稳定,请求可能重复发送。

问题:如果不保证幂等性,重复请求会导致重复操作。

示例

  • 用户点击支付按钮,网络延迟导致重复提交
  • 如果不保证幂等性,可能重复扣款

2.2.2 消息重复

场景:消息队列可能重复投递消息。

问题:如果不保证幂等性,重复消息会导致重复处理。

示例

  • Kafka消息可能重复消费
  • 如果不保证幂等性,可能重复处理订单

2.2.3 定时任务重复执行

场景:定时任务可能因为系统故障重复执行。

问题:如果不保证幂等性,重复执行会导致数据错误。

示例

  • 定时任务计算用户积分
  • 如果不保证幂等性,可能重复计算积分

2.3 幂等性分类

2.3.1 接口幂等

接口幂等:HTTP接口的幂等性。

实现方式

  • 唯一索引
  • 分布式锁
  • Token机制
  • 状态机

2.3.2 消息幂等

消息幂等:消息消费的幂等性。

实现方式

  • 去重表
  • 唯一索引
  • 状态检查

2.3.3 定时任务幂等

定时任务幂等:定时任务的幂等性。

实现方式

  • 分布式锁
  • 状态标记
  • 时间窗口

3. 幂等性实现方式

3.1 唯一索引

3.1.1 原理

唯一索引:在数据库层面保证唯一性。

特点

  • 数据库层面保证
  • 性能较高
  • 实现简单

3.1.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Entity
@Table(name = "orders", uniqueConstraints = {
@UniqueConstraint(columnNames = {"order_no"})
})
public class Order {
@Id
private Long id;

@Column(name = "order_no", unique = true, nullable = false)
private String orderNo;

private BigDecimal amount;
private Integer status;
}

@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

public void createOrder(Order order) {
try {
// 插入订单,如果订单号已存在,会抛出唯一约束异常
orderMapper.insert(order);
} catch (DuplicateKeyException e) {
// 订单已存在,直接返回(幂等)
log.info("Order already exists: {}", order.getOrderNo());
return;
}
}
}

3.1.3 优缺点

优点

  • 实现简单
  • 数据库层面保证
  • 性能较高

缺点

  • 只能保证唯一性,不能处理业务逻辑
  • 异常处理需要小心

3.2 分布式锁

3.2.1 原理

分布式锁:使用分布式锁保证同一时间只有一个请求能执行。

特点

  • 保证同一时间只有一个请求执行
  • 适合高并发场景
  • 需要额外的锁服务

3.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
public class OrderService {

@Autowired
private RedissonClient redissonClient;

@Autowired
private OrderMapper orderMapper;

public void createOrder(Order order) {
// 使用订单号作为锁的key
String lockKey = "order:lock:" + order.getOrderNo();
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试加锁,最多等待3秒,锁定10秒
boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 检查订单是否已存在
Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
if (existingOrder != null) {
// 订单已存在,直接返回(幂等)
log.info("Order already exists: {}", order.getOrderNo());
return;
}

// 创建订单
orderMapper.insert(order);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("获取锁失败", e);
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

3.2.3 优缺点

优点

  • 保证同一时间只有一个请求执行
  • 适合高并发场景
  • 可以处理复杂的业务逻辑

缺点

  • 需要额外的锁服务(Redis、ZooKeeper等)
  • 性能相对较低
  • 需要处理锁超时问题

3.3 Token机制

3.3.1 原理

Token机制:客户端请求前先获取Token,请求时携带Token,服务端验证Token是否已使用。

流程

  1. 客户端请求获取Token
  2. 服务端生成Token并存储(Redis)
  3. 客户端请求时携带Token
  4. 服务端验证Token,如果已使用则拒绝,否则处理并标记Token已使用

3.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Service
public class TokenService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final String TOKEN_PREFIX = "token:";
private static final long TOKEN_EXPIRE_TIME = 300; // 5分钟

/**
* 生成Token
*/
public String generateToken() {
String token = UUID.randomUUID().toString();
String key = TOKEN_PREFIX + token;

// 存储Token,设置过期时间
redisTemplate.opsForValue().set(key, "0", TOKEN_EXPIRE_TIME, TimeUnit.SECONDS);

return token;
}

/**
* 验证Token
*/
public boolean validateToken(String token) {
String key = TOKEN_PREFIX + token;

// 检查Token是否存在
String value = redisTemplate.opsForValue().get(key);
if (value == null) {
// Token不存在或已过期
return false;
}

// 检查Token是否已使用
if ("1".equals(value)) {
// Token已使用
return false;
}

// 标记Token已使用
redisTemplate.opsForValue().set(key, "1", TOKEN_EXPIRE_TIME, TimeUnit.SECONDS);

return true;
}
}

@RestController
@RequestMapping("/api/order")
public class OrderController {

@Autowired
private TokenService tokenService;

@Autowired
private OrderService orderService;

/**
* 获取Token
*/
@GetMapping("/token")
public Result<String> getToken() {
String token = tokenService.generateToken();
return Result.success(token);
}

/**
* 创建订单(需要Token)
*/
@PostMapping("/create")
public Result<String> createOrder(@RequestHeader("X-Token") String token,
@RequestBody Order order) {
// 验证Token
if (!tokenService.validateToken(token)) {
return Result.error("Token无效或已使用");
}

// 创建订单
orderService.createOrder(order);

return Result.success("订单创建成功");
}
}

3.3.3 优缺点

优点

  • 实现简单
  • 适合前端防重复提交
  • 不需要额外的锁服务

缺点

  • 需要额外的Token获取步骤
  • Token需要存储(Redis)
  • 需要处理Token过期问题

3.4 状态机

3.4.1 原理

状态机:通过状态机保证操作的幂等性。

特点

  • 只有特定状态才能执行特定操作
  • 状态转换是确定的
  • 适合复杂业务逻辑

3.4.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public enum OrderStatus {
PENDING(0, "待支付"),
PAID(1, "已支付"),
SHIPPED(2, "已发货"),
COMPLETED(3, "已完成"),
CANCELLED(4, "已取消");

private final int code;
private final String desc;

OrderStatus(int code, String desc) {
this.code = code;
this.desc = desc;
}

public int getCode() {
return code;
}

public String getDesc() {
return desc;
}

/**
* 检查状态转换是否合法
*/
public boolean canTransitionTo(OrderStatus target) {
switch (this) {
case PENDING:
return target == PAID || target == CANCELLED;
case PAID:
return target == SHIPPED || target == CANCELLED;
case SHIPPED:
return target == COMPLETED;
case COMPLETED:
case CANCELLED:
return false; // 终态,不能转换
default:
return false;
}
}
}

@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private RedissonClient redissonClient;

/**
* 支付订单(幂等)
*/
public void payOrder(Long orderId) {
String lockKey = "order:lock:" + orderId;
RLock lock = redissonClient.getLock(lockKey);

try {
boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 查询订单
Order order = orderMapper.selectById(orderId);
if (order == null) {
throw new BusinessException("订单不存在");
}

// 检查状态(幂等)
OrderStatus currentStatus = OrderStatus.fromCode(order.getStatus());
if (currentStatus == OrderStatus.PAID) {
// 订单已支付,直接返回(幂等)
log.info("Order already paid: {}", orderId);
return;
}

// 检查状态转换是否合法
if (!currentStatus.canTransitionTo(OrderStatus.PAID)) {
throw new BusinessException("订单状态不允许支付");
}

// 更新订单状态
order.setStatus(OrderStatus.PAID.getCode());
orderMapper.updateById(order);

// 执行支付逻辑
paymentService.processPayment(order);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("获取锁失败", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

3.4.3 优缺点

优点

  • 适合复杂业务逻辑
  • 状态转换清晰
  • 可以防止非法状态转换

缺点

  • 实现相对复杂
  • 需要定义状态机
  • 需要处理状态转换异常

3.5 去重表

3.5.1 原理

去重表:使用去重表记录已处理的消息或请求。

特点

  • 适合消息幂等
  • 实现简单
  • 需要额外的表

3.5.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Entity
@Table(name = "message_deduplication", uniqueConstraints = {
@UniqueConstraint(columnNames = {"message_id"})
})
public class MessageDeduplication {
@Id
private Long id;

@Column(name = "message_id", unique = true, nullable = false)
private String messageId;

@Column(name = "status")
private Integer status; // 0-处理中, 1-已处理, 2-处理失败

@Column(name = "create_time")
private LocalDateTime createTime;

@Column(name = "update_time")
private LocalDateTime updateTime;
}

@Service
public class OrderEventConsumer {

@Autowired
private MessageDeduplicationMapper deduplicationMapper;

@Autowired
private OrderService orderService;

@KafkaListener(topics = "order-created", groupId = "order-processor")
public void handleOrderCreated(ConsumerRecord<String, String> record) {
String messageId = record.headers().lastHeader("message-id") != null
? new String(record.headers().lastHeader("message-id").value())
: record.key();

if (messageId == null) {
messageId = UUID.randomUUID().toString();
}

try {
// 插入去重记录
MessageDeduplication deduplication = new MessageDeduplication();
deduplication.setMessageId(messageId);
deduplication.setStatus(0); // 处理中
deduplication.setCreateTime(LocalDateTime.now());
deduplicationMapper.insert(deduplication);
} catch (DuplicateKeyException e) {
// 消息已处理,直接返回(幂等)
log.info("Message already processed: {}", messageId);
return;
}

try {
// 处理消息
Order order = JSON.parseObject(record.value(), Order.class);
orderService.processOrder(order);

// 更新状态为已处理
MessageDeduplication deduplication = deduplicationMapper.selectByMessageId(messageId);
deduplication.setStatus(1); // 已处理
deduplication.setUpdateTime(LocalDateTime.now());
deduplicationMapper.updateById(deduplication);
} catch (Exception e) {
// 处理失败,更新状态
MessageDeduplication deduplication = deduplicationMapper.selectByMessageId(messageId);
deduplication.setStatus(2); // 处理失败
deduplication.setUpdateTime(LocalDateTime.now());
deduplicationMapper.updateById(deduplication);

throw e;
}
}
}

3.5.3 优缺点

优点

  • 实现简单
  • 适合消息幂等
  • 可以记录处理状态

缺点

  • 需要额外的表
  • 需要定期清理历史数据
  • 可能影响性能

3.6 乐观锁

3.6.1 原理

乐观锁:使用版本号或时间戳保证数据一致性。

特点

  • 适合读多写少场景
  • 性能较高
  • 需要处理冲突

3.6.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Entity
public class Order {
@Id
private Long id;

private String orderNo;
private BigDecimal amount;
private Integer status;

@Version
private Integer version; // 乐观锁版本号
}

@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

/**
* 更新订单(幂等)
*/
public void updateOrder(Order order) {
// 查询订单(带版本号)
Order existingOrder = orderMapper.selectById(order.getId());
if (existingOrder == null) {
throw new BusinessException("订单不存在");
}

// 设置版本号
order.setVersion(existingOrder.getVersion());

// 更新订单(带版本号检查)
int updated = orderMapper.updateByIdWithVersion(order);
if (updated == 0) {
// 版本号冲突,说明订单已被其他请求修改
// 可以重新查询并重试,或者直接返回(幂等)
log.warn("Order version conflict: {}", order.getId());
throw new BusinessException("订单已被修改,请刷新后重试");
}
}
}

@Mapper
public interface OrderMapper extends BaseMapper<Order> {

/**
* 带版本号的更新
*/
@Update("UPDATE orders SET amount = #{amount}, status = #{status}, version = version + 1 " +
"WHERE id = #{id} AND version = #{version}")
int updateByIdWithVersion(Order order);
}

3.6.3 优缺点

优点

  • 性能较高
  • 适合读多写少场景
  • 不需要额外的锁

缺点

  • 需要处理版本冲突
  • 可能需要重试
  • 不适合高并发写场景

4. 常见场景

4.1 接口幂等

4.1.1 创建订单

场景:用户点击创建订单按钮,可能重复提交。

解决方案:使用唯一索引 + 分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class OrderService {

@Autowired
private RedissonClient redissonClient;

@Autowired
private OrderMapper orderMapper;

public void createOrder(Order order) {
String lockKey = "order:lock:" + order.getOrderNo();
RLock lock = redissonClient.getLock(lockKey);

try {
boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 检查订单是否已存在
Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
if (existingOrder != null) {
// 订单已存在,直接返回(幂等)
return;
}

// 创建订单
orderMapper.insert(order);
} catch (DuplicateKeyException e) {
// 唯一索引冲突,订单已存在(幂等)
log.info("Order already exists: {}", order.getOrderNo());
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

4.1.2 支付接口

场景:支付接口可能被重复调用。

解决方案:使用状态机 + 分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class PaymentService {

public void pay(Long orderId, BigDecimal amount) {
String lockKey = "payment:lock:" + orderId;
RLock lock = redissonClient.getLock(lockKey);

try {
boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 查询订单
Order order = orderMapper.selectById(orderId);
if (order == null) {
throw new BusinessException("订单不存在");
}

// 检查订单状态(幂等)
if (order.getStatus() == OrderStatus.PAID.getCode()) {
// 订单已支付,直接返回(幂等)
return;
}

// 检查状态转换
if (order.getStatus() != OrderStatus.PENDING.getCode()) {
throw new BusinessException("订单状态不允许支付");
}

// 执行支付
processPayment(order, amount);

// 更新订单状态
order.setStatus(OrderStatus.PAID.getCode());
orderMapper.updateById(order);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

4.2 消息幂等

4.2.1 Kafka消息消费

场景:Kafka消息可能重复消费。

解决方案:使用去重表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
public class OrderEventConsumer {

@Autowired
private MessageDeduplicationMapper deduplicationMapper;

@KafkaListener(topics = "order-created", groupId = "order-processor")
public void handleOrderCreated(ConsumerRecord<String, String> record) {
String messageId = getMessageId(record);

try {
// 插入去重记录
MessageDeduplication deduplication = new MessageDeduplication();
deduplication.setMessageId(messageId);
deduplication.setStatus(0);
deduplication.setCreateTime(LocalDateTime.now());
deduplicationMapper.insert(deduplication);
} catch (DuplicateKeyException e) {
// 消息已处理,直接返回(幂等)
log.info("Message already processed: {}", messageId);
return;
}

try {
// 处理消息
Order order = JSON.parseObject(record.value(), Order.class);
orderService.processOrder(order);

// 更新状态
updateDeduplicationStatus(messageId, 1);
} catch (Exception e) {
updateDeduplicationStatus(messageId, 2);
throw e;
}
}
}

4.3 定时任务幂等

4.3.1 定时计算积分

场景:定时任务计算用户积分,可能重复执行。

解决方案:使用分布式锁 + 时间窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class PointsCalculationTask {

@Autowired
private RedissonClient redissonClient;

@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
public void calculatePoints() {
String lockKey = "points:calculation:lock:" + LocalDate.now();
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试加锁,如果已锁定说明其他实例正在执行
boolean locked = lock.tryLock(0, 30, TimeUnit.MINUTES);
if (!locked) {
log.info("Points calculation already running");
return;
}

// 执行计算
calculateUserPoints();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

5. 常见坑和注意事项

5.1 常见坑

5.1.1 唯一索引的坑

问题:使用唯一索引时,如果业务逻辑失败,但数据已插入,会导致后续重试失败。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class OrderService {

public void createOrder(Order order) {
try {
// 插入订单
orderMapper.insert(order);

// 后续业务逻辑可能失败
inventoryService.deductStock(order.getSkuId(), order.getQuantity());
// 如果这里失败,订单已创建,但库存未扣减
} catch (DuplicateKeyException e) {
// 如果重试,这里会捕获唯一索引异常
// 但订单可能处于不一致状态
}
}
}

解决方案:使用事务 + 状态标记。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Service
public class OrderService {

@Transactional
public void createOrder(Order order) {
// 检查订单是否已存在
Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
if (existingOrder != null) {
// 检查订单状态
if (existingOrder.getStatus() == OrderStatus.COMPLETED.getCode()) {
// 订单已完成,直接返回(幂等)
return;
} else if (existingOrder.getStatus() == OrderStatus.FAILED.getCode()) {
// 订单失败,可以重试
order = existingOrder;
} else {
// 订单处理中,直接返回(幂等)
return;
}
} else {
// 创建订单,状态为处理中
order.setStatus(OrderStatus.PROCESSING.getCode());
orderMapper.insert(order);
}

try {
// 执行业务逻辑
inventoryService.deductStock(order.getSkuId(), order.getQuantity());

// 更新订单状态为已完成
order.setStatus(OrderStatus.COMPLETED.getCode());
orderMapper.updateById(order);
} catch (Exception e) {
// 更新订单状态为失败
order.setStatus(OrderStatus.FAILED.getCode());
orderMapper.updateById(order);
throw e;
}
}
}

5.1.2 分布式锁的坑

问题1:锁超时时间设置不合理。

错误示例

1
2
// 锁超时时间太短,业务逻辑可能还没执行完就释放了
lock.tryLock(3, 5, TimeUnit.SECONDS);

正确示例

1
2
// 根据业务逻辑的执行时间设置合理的超时时间
lock.tryLock(3, 30, TimeUnit.SECONDS);

问题2:锁释放时机不对。

错误示例

1
2
3
4
5
6
7
try {
lock.tryLock(3, 10, TimeUnit.SECONDS);
// 业务逻辑
lock.unlock(); // 如果业务逻辑抛出异常,锁可能不会释放
} catch (Exception e) {
// 异常处理
}

正确示例

1
2
3
4
5
6
7
8
try {
lock.tryLock(3, 10, TimeUnit.SECONDS);
// 业务逻辑
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock(); // 确保锁一定会释放
}
}

5.1.3 Token机制的坑

问题:Token可能被多次使用。

错误示例

1
2
3
4
5
// 检查Token是否存在
if (redisTemplate.hasKey(tokenKey)) {
// 处理业务逻辑
redisTemplate.delete(tokenKey); // 如果这里失败,Token可能被再次使用
}

正确示例

1
2
3
4
5
6
7
// 使用原子操作
String value = redisTemplate.opsForValue().getAndSet(tokenKey, "1");
if (value == null || "1".equals(value)) {
// Token不存在或已使用
return false;
}
// 处理业务逻辑

5.1.4 状态机的坑

问题:状态检查不完整。

错误示例

1
2
3
4
5
6
// 只检查当前状态,没有检查状态转换
if (order.getStatus() == OrderStatus.PAID.getCode()) {
return; // 幂等
}
// 更新状态
order.setStatus(OrderStatus.PAID.getCode());

正确示例

1
2
3
4
5
6
7
8
9
10
// 检查状态和状态转换
OrderStatus currentStatus = OrderStatus.fromCode(order.getStatus());
if (currentStatus == OrderStatus.PAID) {
return; // 幂等
}
if (!currentStatus.canTransitionTo(OrderStatus.PAID)) {
throw new BusinessException("订单状态不允许支付");
}
// 更新状态
order.setStatus(OrderStatus.PAID.getCode());

5.2 注意事项

5.2.1 幂等键的选择

原则

  • 唯一性:保证全局唯一
  • 业务意义:有业务含义
  • 稳定性:不会变化

示例

  • 订单号:order_no(唯一,有业务意义)
  • 支付流水号:payment_no(唯一,有业务意义)
  • 用户ID + 时间戳:user_id + timestamp(唯一,但时间戳可能变化)

5.2.2 幂等性的粒度

原则

  • 接口级别:整个接口幂等
  • 操作级别:单个操作幂等
  • 数据级别:单条数据幂等

示例

  • 创建订单接口:整个接口幂等
  • 支付操作:单个支付操作幂等
  • 订单数据:单条订单数据幂等

5.2.3 幂等性的时效性

原则

  • 永久幂等:操作永远幂等(如查询、删除)
  • 临时幂等:操作在一定时间内幂等(如Token机制)
  • 状态幂等:操作在特定状态下幂等(如状态机)

示例

  • 查询订单:永久幂等
  • Token机制:5分钟内幂等
  • 支付操作:订单未支付时幂等

6. 实战案例

6.1 案例1:支付系统(状态机 + 分布式锁)

6.1.1 场景

需求:支付系统,保证支付操作的幂等性。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Service
public class PaymentService {

@Autowired
private RedissonClient redissonClient;

@Autowired
private PaymentRecordMapper paymentRecordMapper;

public void pay(Long orderId, BigDecimal amount, String paymentNo) {
String lockKey = "payment:lock:" + paymentNo;
RLock lock = redissonClient.getLock(lockKey);

try {
boolean locked = lock.tryLock(3, 30, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 检查支付记录是否已存在
PaymentRecord existingRecord = paymentRecordMapper.selectByPaymentNo(paymentNo);
if (existingRecord != null) {
// 检查支付状态
if (existingRecord.getStatus() == PaymentStatus.SUCCESS.getCode()) {
// 支付成功,直接返回(幂等)
log.info("Payment already success: {}", paymentNo);
return;
} else if (existingRecord.getStatus() == PaymentStatus.PROCESSING.getCode()) {
// 支付处理中,直接返回(幂等)
log.info("Payment processing: {}", paymentNo);
return;
}
}

// 创建支付记录
PaymentRecord record = new PaymentRecord();
record.setOrderId(orderId);
record.setPaymentNo(paymentNo);
record.setAmount(amount);
record.setStatus(PaymentStatus.PROCESSING.getCode());
paymentRecordMapper.insert(record);

try {
// 调用支付接口
PaymentResult result = paymentGateway.pay(orderId, amount);

// 更新支付记录
record.setStatus(result.isSuccess()
? PaymentStatus.SUCCESS.getCode()
: PaymentStatus.FAILED.getCode());
record.setTransactionId(result.getTransactionId());
paymentRecordMapper.updateById(record);
} catch (Exception e) {
// 更新支付记录为失败
record.setStatus(PaymentStatus.FAILED.getCode());
record.setFailureReason(e.getMessage());
paymentRecordMapper.updateById(record);
throw e;
}
} catch (DuplicateKeyException e) {
// 支付记录已存在,直接返回(幂等)
log.info("Payment record already exists: {}", paymentNo);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

6.2 案例2:消息消费(去重表)

6.2.1 场景

需求:Kafka消息消费,保证消息处理的幂等性。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Component
public class OrderEventConsumer {

@Autowired
private MessageDeduplicationMapper deduplicationMapper;

@Autowired
private OrderService orderService;

@KafkaListener(topics = "order-created", groupId = "order-processor")
public void handleOrderCreated(ConsumerRecord<String, String> record) {
// 获取消息ID
String messageId = getMessageId(record);
if (messageId == null) {
messageId = generateMessageId(record);
}

// 检查消息是否已处理
MessageDeduplication existing = deduplicationMapper.selectByMessageId(messageId);
if (existing != null) {
if (existing.getStatus() == 1) {
// 消息已处理,直接返回(幂等)
log.info("Message already processed: {}", messageId);
return;
} else if (existing.getStatus() == 0) {
// 消息处理中,可能上次处理失败,可以重试
// 但需要设置超时时间,避免无限重试
if (existing.getCreateTime().plusMinutes(10).isAfter(LocalDateTime.now())) {
log.warn("Message processing timeout: {}", messageId);
return;
}
}
}

// 插入去重记录
MessageDeduplication deduplication = new MessageDeduplication();
deduplication.setMessageId(messageId);
deduplication.setStatus(0); // 处理中
deduplication.setCreateTime(LocalDateTime.now());

try {
deduplicationMapper.insert(deduplication);
} catch (DuplicateKeyException e) {
// 消息已处理,直接返回(幂等)
log.info("Message already processed: {}", messageId);
return;
}

try {
// 处理消息
Order order = JSON.parseObject(record.value(), Order.class);
orderService.processOrder(order);

// 更新状态为已处理
deduplication.setStatus(1);
deduplication.setUpdateTime(LocalDateTime.now());
deduplicationMapper.updateById(deduplication);
} catch (Exception e) {
// 处理失败,更新状态
deduplication.setStatus(2);
deduplication.setUpdateTime(LocalDateTime.now());
deduplicationMapper.updateById(deduplication);

// 可以发送到死信队列或重试队列
throw e;
}
}
}

6.3 案例3:定时任务(分布式锁 + 时间窗口)

6.3.1 场景

需求:定时任务计算用户积分,保证幂等性。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Component
public class PointsCalculationTask {

@Autowired
private RedissonClient redissonClient;

@Autowired
private UserPointsService userPointsService;

@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
public void calculatePoints() {
LocalDate today = LocalDate.now();
String lockKey = "points:calculation:lock:" + today;
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试加锁,如果已锁定说明其他实例正在执行
boolean locked = lock.tryLock(0, 30, TimeUnit.MINUTES);
if (!locked) {
log.info("Points calculation already running for date: {}", today);
return;
}

// 检查今天是否已计算
if (userPointsService.isCalculated(today)) {
log.info("Points already calculated for date: {}", today);
return;
}

// 执行计算
userPointsService.calculatePoints(today);

// 标记已计算
userPointsService.markCalculated(today);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Points calculation interrupted", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

7. 总结

7.1 核心要点

  1. 幂等性定义:同一个操作执行一次和执行多次的效果相同
  2. 实现方式:唯一索引、分布式锁、Token机制、状态机、去重表、乐观锁
  3. 常见场景:接口幂等、消息幂等、定时任务幂等
  4. 常见坑:唯一索引的坑、分布式锁的坑、Token机制的坑、状态机的坑
  5. 注意事项:幂等键的选择、幂等性的粒度、幂等性的时效性

7.2 关键理解

  1. 幂等性是分布式系统的基础:必须保证关键操作的幂等性
  2. 选择合适的实现方式:根据场景选择最合适的实现方式
  3. 注意常见坑:避免常见的实现错误
  4. 测试幂等性:充分测试幂等性的实现

7.3 最佳实践

  1. 接口幂等:使用唯一索引 + 分布式锁 + 状态机
  2. 消息幂等:使用去重表 + 状态检查
  3. 定时任务幂等:使用分布式锁 + 时间窗口
  4. 幂等键选择:选择唯一、有业务意义、稳定的键
  5. 异常处理:正确处理异常,保证幂等性

相关文章