第475集幂等怎么做?有哪些常见坑? | 字数总计: 5.9k | 阅读时长: 24分钟 | 阅读量:
幂等怎么做?有哪些常见坑? 1. 概述 1.1 幂等性的重要性 幂等性 是分布式系统设计的核心原则之一,保证同一个操作执行多次和执行一次的效果相同。
幂等性的意义 :
防止重复操作 :避免重复提交、重复支付等问题
保证数据一致性 :避免数据重复或错误
提高系统可靠性 :网络重试、消息重复等场景下保证正确性
1.2 幂等性的定义 幂等性(Idempotency) :同一个操作执行一次和执行多次的效果相同。
数学定义 :f(f(x)) = f(x)
HTTP幂等性 :
GET :幂等
PUT :幂等
DELETE :幂等
POST :不幂等
PATCH :不幂等
1.3 本文内容结构 本文将从以下几个方面全面解析幂等性:
幂等性概述 :定义、重要性、分类
幂等性实现方式 :唯一索引、分布式锁、Token机制、状态机等
常见场景 :接口幂等、消息幂等、定时任务幂等
常见坑和注意事项 :常见问题和解决方案
实战案例 :实际项目中的幂等性实现
2. 幂等性概述 2.1 什么是幂等性 2.1.1 定义 幂等性 :同一个操作执行一次和执行多次的效果相同。
特点 :
执行多次 = 执行一次
不会产生副作用
结果可预测
2.1.2 示例 幂等操作 :
查询操作 :SELECT * FROM users WHERE id = 1(多次查询结果相同)
删除操作 :DELETE FROM users WHERE id = 1(多次删除结果相同)
更新操作 :UPDATE users SET name = ‘Alice’ WHERE id = 1(多次更新结果相同)
非幂等操作 :
创建操作 :INSERT INTO users (name) VALUES (‘Alice’)(多次创建会产生多条记录)
累加操作 :UPDATE users SET balance = balance + 100 WHERE id = 1(多次执行余额会累加)
2.2 为什么需要幂等性 2.2.1 网络重试 场景 :网络不稳定,请求可能重复发送。
问题 :如果不保证幂等性,重复请求会导致重复操作。
示例 :
用户点击支付按钮,网络延迟导致重复提交
如果不保证幂等性,可能重复扣款
2.2.2 消息重复 场景 :消息队列可能重复投递消息。
问题 :如果不保证幂等性,重复消息会导致重复处理。
示例 :
Kafka消息可能重复消费
如果不保证幂等性,可能重复处理订单
2.2.3 定时任务重复执行 场景 :定时任务可能因为系统故障重复执行。
问题 :如果不保证幂等性,重复执行会导致数据错误。
示例 :
定时任务计算用户积分
如果不保证幂等性,可能重复计算积分
2.3 幂等性分类 2.3.1 接口幂等 接口幂等 :HTTP接口的幂等性。
实现方式 :
2.3.2 消息幂等 消息幂等 :消息消费的幂等性。
实现方式 :
2.3.3 定时任务幂等 定时任务幂等 :定时任务的幂等性。
实现方式 :
3. 幂等性实现方式 3.1 唯一索引 3.1.1 原理 唯一索引 :在数据库层面保证唯一性。
特点 :
3.1.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Entity @Table(name = "orders", uniqueConstraints = { @UniqueConstraint(columnNames = {"order_no"}) }) public class Order { @Id private Long id; @Column(name = "order_no", unique = true, nullable = false) private String orderNo; private BigDecimal amount; private Integer status; } @Service public class OrderService { @Autowired private OrderMapper orderMapper; public void createOrder (Order order) { try { orderMapper.insert(order); } catch (DuplicateKeyException e) { log.info("Order already exists: {}" , order.getOrderNo()); return ; } } }
3.1.3 优缺点 优点 :
缺点 :
只能保证唯一性,不能处理业务逻辑
异常处理需要小心
3.2 分布式锁 3.2.1 原理 分布式锁 :使用分布式锁保证同一时间只有一个请求能执行。
特点 :
保证同一时间只有一个请求执行
适合高并发场景
需要额外的锁服务
3.2.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service public class OrderService { @Autowired private RedissonClient redissonClient; @Autowired private OrderMapper orderMapper; public void createOrder (Order order) { String lockKey = "order:lock:" + order.getOrderNo(); RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(3 , 10 , TimeUnit.SECONDS); if (!locked) { throw new BusinessException ("系统繁忙,请稍后再试" ); } Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo()); if (existingOrder != null ) { log.info("Order already exists: {}" , order.getOrderNo()); return ; } orderMapper.insert(order); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException ("获取锁失败" , e); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
3.2.3 优缺点 优点 :
保证同一时间只有一个请求执行
适合高并发场景
可以处理复杂的业务逻辑
缺点 :
需要额外的锁服务(Redis、ZooKeeper等)
性能相对较低
需要处理锁超时问题
3.3 Token机制 3.3.1 原理 Token机制 :客户端请求前先获取Token,请求时携带Token,服务端验证Token是否已使用。
流程 :
客户端请求获取Token
服务端生成Token并存储(Redis)
客户端请求时携带Token
服务端验证Token,如果已使用则拒绝,否则处理并标记Token已使用
3.3.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 @Service public class TokenService { @Autowired private RedisTemplate<String, String> redisTemplate; private static final String TOKEN_PREFIX = "token:" ; private static final long TOKEN_EXPIRE_TIME = 300 ; public String generateToken () { String token = UUID.randomUUID().toString(); String key = TOKEN_PREFIX + token; redisTemplate.opsForValue().set(key, "0" , TOKEN_EXPIRE_TIME, TimeUnit.SECONDS); return token; } public boolean validateToken (String token) { String key = TOKEN_PREFIX + token; String value = redisTemplate.opsForValue().get(key); if (value == null ) { return false ; } if ("1" .equals(value)) { return false ; } redisTemplate.opsForValue().set(key, "1" , TOKEN_EXPIRE_TIME, TimeUnit.SECONDS); return true ; } } @RestController @RequestMapping("/api/order") public class OrderController { @Autowired private TokenService tokenService; @Autowired private OrderService orderService; @GetMapping("/token") public Result<String> getToken () { String token = tokenService.generateToken(); return Result.success(token); } @PostMapping("/create") public Result<String> createOrder (@RequestHeader("X-Token") String token, @RequestBody Order order) { if (!tokenService.validateToken(token)) { return Result.error("Token无效或已使用" ); } orderService.createOrder(order); return Result.success("订单创建成功" ); } }
3.3.3 优缺点 优点 :
缺点 :
需要额外的Token获取步骤
Token需要存储(Redis)
需要处理Token过期问题
3.4 状态机 3.4.1 原理 状态机 :通过状态机保证操作的幂等性。
特点 :
只有特定状态才能执行特定操作
状态转换是确定的
适合复杂业务逻辑
3.4.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public enum OrderStatus { PENDING(0 , "待支付" ), PAID(1 , "已支付" ), SHIPPED(2 , "已发货" ), COMPLETED(3 , "已完成" ), CANCELLED(4 , "已取消" ); private final int code; private final String desc; OrderStatus(int code, String desc) { this .code = code; this .desc = desc; } public int getCode () { return code; } public String getDesc () { return desc; } public boolean canTransitionTo (OrderStatus target) { switch (this ) { case PENDING: return target == PAID || target == CANCELLED; case PAID: return target == SHIPPED || target == CANCELLED; case SHIPPED: return target == COMPLETED; case COMPLETED: case CANCELLED: return false ; default : return false ; } } } @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private RedissonClient redissonClient; public void payOrder (Long orderId) { String lockKey = "order:lock:" + orderId; RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(3 , 10 , TimeUnit.SECONDS); if (!locked) { throw new BusinessException ("系统繁忙,请稍后再试" ); } Order order = orderMapper.selectById(orderId); if (order == null ) { throw new BusinessException ("订单不存在" ); } OrderStatus currentStatus = OrderStatus.fromCode(order.getStatus()); if (currentStatus == OrderStatus.PAID) { log.info("Order already paid: {}" , orderId); return ; } if (!currentStatus.canTransitionTo(OrderStatus.PAID)) { throw new BusinessException ("订单状态不允许支付" ); } order.setStatus(OrderStatus.PAID.getCode()); orderMapper.updateById(order); paymentService.processPayment(order); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException ("获取锁失败" , e); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
3.4.3 优缺点 优点 :
适合复杂业务逻辑
状态转换清晰
可以防止非法状态转换
缺点 :
实现相对复杂
需要定义状态机
需要处理状态转换异常
3.5 去重表 3.5.1 原理 去重表 :使用去重表记录已处理的消息或请求。
特点 :
3.5.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Entity @Table(name = "message_deduplication", uniqueConstraints = { @UniqueConstraint(columnNames = {"message_id"}) }) public class MessageDeduplication { @Id private Long id; @Column(name = "message_id", unique = true, nullable = false) private String messageId; @Column(name = "status") private Integer status; @Column(name = "create_time") private LocalDateTime createTime; @Column(name = "update_time") private LocalDateTime updateTime; } @Service public class OrderEventConsumer { @Autowired private MessageDeduplicationMapper deduplicationMapper; @Autowired private OrderService orderService; @KafkaListener(topics = "order-created", groupId = "order-processor") public void handleOrderCreated (ConsumerRecord<String, String> record) { String messageId = record.headers().lastHeader("message-id" ) != null ? new String (record.headers().lastHeader("message-id" ).value()) : record.key(); if (messageId == null ) { messageId = UUID.randomUUID().toString(); } try { MessageDeduplication deduplication = new MessageDeduplication (); deduplication.setMessageId(messageId); deduplication.setStatus(0 ); deduplication.setCreateTime(LocalDateTime.now()); deduplicationMapper.insert(deduplication); } catch (DuplicateKeyException e) { log.info("Message already processed: {}" , messageId); return ; } try { Order order = JSON.parseObject(record.value(), Order.class); orderService.processOrder(order); MessageDeduplication deduplication = deduplicationMapper.selectByMessageId(messageId); deduplication.setStatus(1 ); deduplication.setUpdateTime(LocalDateTime.now()); deduplicationMapper.updateById(deduplication); } catch (Exception e) { MessageDeduplication deduplication = deduplicationMapper.selectByMessageId(messageId); deduplication.setStatus(2 ); deduplication.setUpdateTime(LocalDateTime.now()); deduplicationMapper.updateById(deduplication); throw e; } } }
3.5.3 优缺点 优点 :
缺点 :
3.6 乐观锁 3.6.1 原理 乐观锁 :使用版本号或时间戳保证数据一致性。
特点 :
3.6.2 实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Entity public class Order { @Id private Long id; private String orderNo; private BigDecimal amount; private Integer status; @Version private Integer version; } @Service public class OrderService { @Autowired private OrderMapper orderMapper; public void updateOrder (Order order) { Order existingOrder = orderMapper.selectById(order.getId()); if (existingOrder == null ) { throw new BusinessException ("订单不存在" ); } order.setVersion(existingOrder.getVersion()); int updated = orderMapper.updateByIdWithVersion(order); if (updated == 0 ) { log.warn("Order version conflict: {}" , order.getId()); throw new BusinessException ("订单已被修改,请刷新后重试" ); } } } @Mapper public interface OrderMapper extends BaseMapper <Order> { @Update("UPDATE orders SET amount = #{amount}, status = #{status}, version = version + 1 " + "WHERE id = #{id} AND version = #{version}") int updateByIdWithVersion (Order order) ; }
3.6.3 优缺点 优点 :
缺点 :
需要处理版本冲突
可能需要重试
不适合高并发写场景
4. 常见场景 4.1 接口幂等 4.1.1 创建订单 场景 :用户点击创建订单按钮,可能重复提交。
解决方案 :使用唯一索引 + 分布式锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Service public class OrderService { @Autowired private RedissonClient redissonClient; @Autowired private OrderMapper orderMapper; public void createOrder (Order order) { String lockKey = "order:lock:" + order.getOrderNo(); RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(3 , 10 , TimeUnit.SECONDS); if (!locked) { throw new BusinessException ("系统繁忙,请稍后再试" ); } Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo()); if (existingOrder != null ) { return ; } orderMapper.insert(order); } catch (DuplicateKeyException e) { log.info("Order already exists: {}" , order.getOrderNo()); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
4.1.2 支付接口 场景 :支付接口可能被重复调用。
解决方案 :使用状态机 + 分布式锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Service public class PaymentService { public void pay (Long orderId, BigDecimal amount) { String lockKey = "payment:lock:" + orderId; RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(3 , 10 , TimeUnit.SECONDS); if (!locked) { throw new BusinessException ("系统繁忙,请稍后再试" ); } Order order = orderMapper.selectById(orderId); if (order == null ) { throw new BusinessException ("订单不存在" ); } if (order.getStatus() == OrderStatus.PAID.getCode()) { return ; } if (order.getStatus() != OrderStatus.PENDING.getCode()) { throw new BusinessException ("订单状态不允许支付" ); } processPayment(order, amount); order.setStatus(OrderStatus.PAID.getCode()); orderMapper.updateById(order); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
4.2 消息幂等 4.2.1 Kafka消息消费 场景 :Kafka消息可能重复消费。
解决方案 :使用去重表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Component public class OrderEventConsumer { @Autowired private MessageDeduplicationMapper deduplicationMapper; @KafkaListener(topics = "order-created", groupId = "order-processor") public void handleOrderCreated (ConsumerRecord<String, String> record) { String messageId = getMessageId(record); try { MessageDeduplication deduplication = new MessageDeduplication (); deduplication.setMessageId(messageId); deduplication.setStatus(0 ); deduplication.setCreateTime(LocalDateTime.now()); deduplicationMapper.insert(deduplication); } catch (DuplicateKeyException e) { log.info("Message already processed: {}" , messageId); return ; } try { Order order = JSON.parseObject(record.value(), Order.class); orderService.processOrder(order); updateDeduplicationStatus(messageId, 1 ); } catch (Exception e) { updateDeduplicationStatus(messageId, 2 ); throw e; } } }
4.3 定时任务幂等 4.3.1 定时计算积分 场景 :定时任务计算用户积分,可能重复执行。
解决方案 :使用分布式锁 + 时间窗口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Component public class PointsCalculationTask { @Autowired private RedissonClient redissonClient; @Scheduled(cron = "0 0 1 * * ?") public void calculatePoints () { String lockKey = "points:calculation:lock:" + LocalDate.now(); RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(0 , 30 , TimeUnit.MINUTES); if (!locked) { log.info("Points calculation already running" ); return ; } calculateUserPoints(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
5. 常见坑和注意事项 5.1 常见坑 5.1.1 唯一索引的坑 问题 :使用唯一索引时,如果业务逻辑失败,但数据已插入,会导致后续重试失败。
示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Service public class OrderService { public void createOrder (Order order) { try { orderMapper.insert(order); inventoryService.deductStock(order.getSkuId(), order.getQuantity()); } catch (DuplicateKeyException e) { } } }
解决方案 :使用事务 + 状态标记。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Service public class OrderService { @Transactional public void createOrder (Order order) { Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo()); if (existingOrder != null ) { if (existingOrder.getStatus() == OrderStatus.COMPLETED.getCode()) { return ; } else if (existingOrder.getStatus() == OrderStatus.FAILED.getCode()) { order = existingOrder; } else { return ; } } else { order.setStatus(OrderStatus.PROCESSING.getCode()); orderMapper.insert(order); } try { inventoryService.deductStock(order.getSkuId(), order.getQuantity()); order.setStatus(OrderStatus.COMPLETED.getCode()); orderMapper.updateById(order); } catch (Exception e) { order.setStatus(OrderStatus.FAILED.getCode()); orderMapper.updateById(order); throw e; } } }
5.1.2 分布式锁的坑 问题1 :锁超时时间设置不合理。
错误示例 :
1 2 lock.tryLock(3 , 5 , TimeUnit.SECONDS);
正确示例 :
1 2 lock.tryLock(3 , 30 , TimeUnit.SECONDS);
问题2 :锁释放时机不对。
错误示例 :
1 2 3 4 5 6 7 try { lock.tryLock(3 , 10 , TimeUnit.SECONDS); lock.unlock(); } catch (Exception e) { }
正确示例 :
1 2 3 4 5 6 7 8 try { lock.tryLock(3 , 10 , TimeUnit.SECONDS); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } }
5.1.3 Token机制的坑 问题 :Token可能被多次使用。
错误示例 :
1 2 3 4 5 if (redisTemplate.hasKey(tokenKey)) { redisTemplate.delete(tokenKey); }
正确示例 :
1 2 3 4 5 6 7 String value = redisTemplate.opsForValue().getAndSet(tokenKey, "1" );if (value == null || "1" .equals(value)) { return false ; }
5.1.4 状态机的坑 问题 :状态检查不完整。
错误示例 :
1 2 3 4 5 6 if (order.getStatus() == OrderStatus.PAID.getCode()) { return ; } order.setStatus(OrderStatus.PAID.getCode());
正确示例 :
1 2 3 4 5 6 7 8 9 10 OrderStatus currentStatus = OrderStatus.fromCode(order.getStatus());if (currentStatus == OrderStatus.PAID) { return ; } if (!currentStatus.canTransitionTo(OrderStatus.PAID)) { throw new BusinessException ("订单状态不允许支付" ); } order.setStatus(OrderStatus.PAID.getCode());
5.2 注意事项 5.2.1 幂等键的选择 原则 :
唯一性 :保证全局唯一
业务意义 :有业务含义
稳定性 :不会变化
示例 :
订单号 :order_no(唯一,有业务意义)
支付流水号 :payment_no(唯一,有业务意义)
用户ID + 时间戳 :user_id + timestamp(唯一,但时间戳可能变化)
5.2.2 幂等性的粒度 原则 :
接口级别 :整个接口幂等
操作级别 :单个操作幂等
数据级别 :单条数据幂等
示例 :
创建订单接口 :整个接口幂等
支付操作 :单个支付操作幂等
订单数据 :单条订单数据幂等
5.2.3 幂等性的时效性 原则 :
永久幂等 :操作永远幂等(如查询、删除)
临时幂等 :操作在一定时间内幂等(如Token机制)
状态幂等 :操作在特定状态下幂等(如状态机)
示例 :
查询订单 :永久幂等
Token机制 :5分钟内幂等
支付操作 :订单未支付时幂等
6. 实战案例 6.1 案例1:支付系统(状态机 + 分布式锁) 6.1.1 场景 需求 :支付系统,保证支付操作的幂等性。
实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 @Service public class PaymentService { @Autowired private RedissonClient redissonClient; @Autowired private PaymentRecordMapper paymentRecordMapper; public void pay (Long orderId, BigDecimal amount, String paymentNo) { String lockKey = "payment:lock:" + paymentNo; RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(3 , 30 , TimeUnit.SECONDS); if (!locked) { throw new BusinessException ("系统繁忙,请稍后再试" ); } PaymentRecord existingRecord = paymentRecordMapper.selectByPaymentNo(paymentNo); if (existingRecord != null ) { if (existingRecord.getStatus() == PaymentStatus.SUCCESS.getCode()) { log.info("Payment already success: {}" , paymentNo); return ; } else if (existingRecord.getStatus() == PaymentStatus.PROCESSING.getCode()) { log.info("Payment processing: {}" , paymentNo); return ; } } PaymentRecord record = new PaymentRecord (); record.setOrderId(orderId); record.setPaymentNo(paymentNo); record.setAmount(amount); record.setStatus(PaymentStatus.PROCESSING.getCode()); paymentRecordMapper.insert(record); try { PaymentResult result = paymentGateway.pay(orderId, amount); record.setStatus(result.isSuccess() ? PaymentStatus.SUCCESS.getCode() : PaymentStatus.FAILED.getCode()); record.setTransactionId(result.getTransactionId()); paymentRecordMapper.updateById(record); } catch (Exception e) { record.setStatus(PaymentStatus.FAILED.getCode()); record.setFailureReason(e.getMessage()); paymentRecordMapper.updateById(record); throw e; } } catch (DuplicateKeyException e) { log.info("Payment record already exists: {}" , paymentNo); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
6.2 案例2:消息消费(去重表) 6.2.1 场景 需求 :Kafka消息消费,保证消息处理的幂等性。
实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @Component public class OrderEventConsumer { @Autowired private MessageDeduplicationMapper deduplicationMapper; @Autowired private OrderService orderService; @KafkaListener(topics = "order-created", groupId = "order-processor") public void handleOrderCreated (ConsumerRecord<String, String> record) { String messageId = getMessageId(record); if (messageId == null ) { messageId = generateMessageId(record); } MessageDeduplication existing = deduplicationMapper.selectByMessageId(messageId); if (existing != null ) { if (existing.getStatus() == 1 ) { log.info("Message already processed: {}" , messageId); return ; } else if (existing.getStatus() == 0 ) { if (existing.getCreateTime().plusMinutes(10 ).isAfter(LocalDateTime.now())) { log.warn("Message processing timeout: {}" , messageId); return ; } } } MessageDeduplication deduplication = new MessageDeduplication (); deduplication.setMessageId(messageId); deduplication.setStatus(0 ); deduplication.setCreateTime(LocalDateTime.now()); try { deduplicationMapper.insert(deduplication); } catch (DuplicateKeyException e) { log.info("Message already processed: {}" , messageId); return ; } try { Order order = JSON.parseObject(record.value(), Order.class); orderService.processOrder(order); deduplication.setStatus(1 ); deduplication.setUpdateTime(LocalDateTime.now()); deduplicationMapper.updateById(deduplication); } catch (Exception e) { deduplication.setStatus(2 ); deduplication.setUpdateTime(LocalDateTime.now()); deduplicationMapper.updateById(deduplication); throw e; } } }
6.3 案例3:定时任务(分布式锁 + 时间窗口) 6.3.1 场景 需求 :定时任务计算用户积分,保证幂等性。
实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Component public class PointsCalculationTask { @Autowired private RedissonClient redissonClient; @Autowired private UserPointsService userPointsService; @Scheduled(cron = "0 0 1 * * ?") public void calculatePoints () { LocalDate today = LocalDate.now(); String lockKey = "points:calculation:lock:" + today; RLock lock = redissonClient.getLock(lockKey); try { boolean locked = lock.tryLock(0 , 30 , TimeUnit.MINUTES); if (!locked) { log.info("Points calculation already running for date: {}" , today); return ; } if (userPointsService.isCalculated(today)) { log.info("Points already calculated for date: {}" , today); return ; } userPointsService.calculatePoints(today); userPointsService.markCalculated(today); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("Points calculation interrupted" , e); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
7. 总结 7.1 核心要点
幂等性定义 :同一个操作执行一次和执行多次的效果相同
实现方式 :唯一索引、分布式锁、Token机制、状态机、去重表、乐观锁
常见场景 :接口幂等、消息幂等、定时任务幂等
常见坑 :唯一索引的坑、分布式锁的坑、Token机制的坑、状态机的坑
注意事项 :幂等键的选择、幂等性的粒度、幂等性的时效性
7.2 关键理解
幂等性是分布式系统的基础 :必须保证关键操作的幂等性
选择合适的实现方式 :根据场景选择最合适的实现方式
注意常见坑 :避免常见的实现错误
测试幂等性 :充分测试幂等性的实现
7.3 最佳实践
接口幂等 :使用唯一索引 + 分布式锁 + 状态机
消息幂等 :使用去重表 + 状态检查
定时任务幂等 :使用分布式锁 + 时间窗口
幂等键选择 :选择唯一、有业务意义、稳定的键
异常处理 :正确处理异常,保证幂等性
相关文章 :