Episode 476: How Do You Implement Idempotency? What Are the Common Pitfalls? (Hands-On Deep Dive)
1. Overview
1.1 Why Idempotency Matters in Practice
Idempotency is a core concern in real projects. In high-concurrency, distributed, and message-queue scenarios in particular, idempotency design directly determines the correctness and stability of a system.
Challenges in real projects:
- High concurrency: large numbers of concurrent requests can trigger duplicate operations
- Network retries: unstable networks cause automatic retries
- Duplicate messages: message queues may deliver the same message more than once
- Distributed environments: idempotency must hold across multiple nodes
- Performance: the idempotency mechanism must not degrade system performance
1.2 Focus of This Article
This article examines idempotency from a hands-on perspective:
- Complex scenarios: high concurrency, distributed systems, message queues
- Performance optimization: implementing idempotency without sacrificing throughput
- Deep analysis of common pitfalls: problems encountered in real projects and their fixes
- Real project case studies: lessons distilled from production systems
- Best practices: battle-tested approaches
2. Idempotency Implementation Schemes in Depth
2.1 Unique Index (In Depth)
2.1.1 How It Works
Unique index scheme: rely on a database unique index to guarantee idempotency.
Applicable scenarios:
- Insert operations (INSERT)
- Requests that carry a natural business key (order number, payment ID, etc.)
Implementation points:
- The unique index must cover the business-level unique identifier
- Unique-index conflict exceptions must be handled (a conflict means "duplicate request", not "error")
- The performance impact of the index must be considered
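The insert-then-handle-conflict behavior can be illustrated without a database. In this minimal single-JVM sketch (all names are illustrative, not from the project above), `ConcurrentHashMap.putIfAbsent` plays the role of the unique index: it either stores the new row or returns the existing one, just like INSERT + `DuplicateKeyException` + re-select.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy for a unique-index-backed insert.
public class UniqueIndexSketch {
    private final ConcurrentMap<String, String> table = new ConcurrentHashMap<>();

    /** Returns the stored payload; repeated calls with the same orderNo return the first result. */
    public String createOrder(String orderNo, String payload) {
        // putIfAbsent is atomic: exactly one caller wins, everyone else
        // observes the winner's row, mirroring the unique-index conflict path.
        String existing = table.putIfAbsent(orderNo, payload);
        return existing != null ? existing : payload;
    }

    public static void main(String[] args) {
        UniqueIndexSketch svc = new UniqueIndexSketch();
        String first = svc.createOrder("ORDER-1", "amount=100");
        String retry = svc.createOrder("ORDER-1", "amount=999"); // retried request
        System.out.println(first.equals(retry)); // prints "true": the retry sees the original
    }
}
```

The key property to notice: the duplicate request does not fail, it converges on the first result.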
2.1.2 Implementation for High Concurrency

```java
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    public OrderResult createOrder(OrderRequest request) {
        String orderNo = generateOrderNo(request);

        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);

        try {
            orderMapper.insert(order);
            return OrderResult.success(order);
        } catch (DuplicateKeyException e) {
            // Unique-index conflict: this is a duplicate request, return the first result
            log.info("Order already exists: {}", orderNo);
            Order existingOrder = orderMapper.selectByOrderNo(orderNo);
            return OrderResult.success(existingOrder);
        } catch (Exception e) {
            log.error("Create order failed", e);
            throw new BusinessException("Failed to create order", e);
        }
    }

    private String generateOrderNo(OrderRequest request) {
        // NOTE: the order number is the idempotency key, so it must be deterministic
        // for the same logical request. A timestamp or random suffix would give every
        // retry a different key and defeat the unique index. Derive it from a
        // client-supplied request ID instead (requestId is assumed to exist on
        // OrderRequest for this sketch).
        return "ORDER" + request.getUserId() + "-" + request.getRequestId();
    }
}
```
2.1.3 Performance Optimization
Problem: under high concurrency the unique index can become a bottleneck, because every duplicate still costs a database round trip.
Optimization: put a Redis pre-check in front of the insert; the unique index remains the source of truth:
```java
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public OrderResult createOrderOptimized(OrderRequest request) {
        String orderNo = generateOrderNo(request);

        // Fast path: the Redis pre-check filters most duplicates cheaply
        String cacheKey = "order:exists:" + orderNo;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            Order existingOrder = orderMapper.selectByOrderNo(orderNo);
            return OrderResult.success(existingOrder);
        }

        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);

        try {
            orderMapper.insert(order);
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return OrderResult.success(order);
        } catch (DuplicateKeyException e) {
            // A concurrent duplicate slipped past the cache; the unique index
            // is still the authoritative arbiter
            log.info("Order already exists (concurrent): {}", orderNo);
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            Order existingOrder = orderMapper.selectByOrderNo(orderNo);
            return OrderResult.success(existingOrder);
        } catch (Exception e) {
            log.error("Create order failed", e);
            throw new BusinessException("Failed to create order", e);
        }
    }
}
```
2.2 Distributed Lock (In Depth)
2.2.1 How It Works
Distributed lock scheme: use a distributed lock so that only one request at a time can execute the operation.
Applicable scenarios:
- Update operations (UPDATE)
- Operations that must be atomic
- High-concurrency scenarios
Implementation points:
- Lock granularity must be appropriate (neither too coarse nor too fine)
- A sensible lock timeout must be set
- Failure to release the lock must be handled
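The essential shape of the scheme is "check, lock, check again, execute". A single-JVM sketch with `java.util.concurrent` (a `ReentrantLock` standing in for the Redisson lock and a `Set` for the processed-records table; all names are illustrative) shows why the pattern executes the business operation exactly once:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Stand-in for the "check -> lock -> double check -> execute" pattern.
public class LockDoubleCheckSketch {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final ReentrantLock lock = new ReentrantLock(); // stands in for RLock
    final AtomicInteger executions = new AtomicInteger();

    public void transfer(String transferId) {
        if (processed.contains(transferId)) return;      // fast path
        lock.lock();
        try {
            if (processed.contains(transferId)) return;  // double check under the lock
            executions.incrementAndGet();                // the real business operation
            processed.add(transferId);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        LockDoubleCheckSketch svc = new LockDoubleCheckSketch();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 100; i++) pool.submit(() -> svc.transfer("T-1"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(svc.executions.get()); // prints 1: all duplicates filtered
    }
}
```

Dropping either check breaks the pattern: without the fast path every request pays for the lock, and without the double check a request that waited on the lock repeats work a concurrent winner already finished.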
2.2.2 Implementation for High Concurrency

```java
@Service
public class AccountService {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private TransferRecordMapper transferRecordMapper;

    public TransferResult transfer(TransferRequest request) {
        String transferId = generateTransferId(request);

        // Fast path: an existing record means this is a duplicate request
        TransferRecord existingRecord = transferRecordMapper.selectByTransferId(transferId);
        if (existingRecord != null) {
            return TransferResult.success(existingRecord);
        }

        String lockKey = "transfer:lock:" + request.getFromAccountId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // Wait up to 100 ms for the lock; auto-release the lease after 30 s
            boolean locked = lock.tryLock(100, 30_000, TimeUnit.MILLISECONDS);
            if (!locked) {
                throw new BusinessException("System busy, please try again later");
            }

            // Double check under the lock: a concurrent duplicate may have completed
            // between the first check and lock acquisition
            existingRecord = transferRecordMapper.selectByTransferId(transferId);
            if (existingRecord != null) {
                return TransferResult.success(existingRecord);
            }

            Account fromAccount = accountMapper.selectById(request.getFromAccountId());
            Account toAccount = accountMapper.selectById(request.getToAccountId());
            if (fromAccount.getBalance().compareTo(request.getAmount()) < 0) {
                throw new BusinessException("Insufficient balance");
            }

            fromAccount.setBalance(fromAccount.getBalance().subtract(request.getAmount()));
            toAccount.setBalance(toAccount.getBalance().add(request.getAmount()));
            accountMapper.updateById(fromAccount);
            accountMapper.updateById(toAccount);

            TransferRecord record = new TransferRecord();
            record.setTransferId(transferId);
            record.setFromAccountId(request.getFromAccountId());
            record.setToAccountId(request.getToAccountId());
            record.setAmount(request.getAmount());
            record.setStatus(TransferStatus.SUCCESS);
            transferRecordMapper.insert(record);

            return TransferResult.success(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException("Failed to acquire lock", e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private String generateTransferId(TransferRequest request) {
        // The transfer ID is the idempotency key, so it must be deterministic:
        // a timestamp component would give every retry a new ID and the dedup
        // lookup above would never match. In practice also include a
        // client-supplied serial number, since two legitimate transfers can
        // share the same accounts and amount.
        return String.format("TRANSFER%d-%d-%d",
                request.getFromAccountId(),
                request.getToAccountId(),
                request.getAmount().hashCode());
    }
}
```
2.2.3 Performance Optimization
Problem: the distributed lock can itself become a performance bottleneck.
Optimization: segment locks keep the number of lock keys bounded:
```java
@Service
public class AccountService {

    public TransferResult transferOptimized(TransferRequest request) {
        String transferId = generateTransferId(request);

        TransferRecord existingRecord = transferRecordMapper.selectByTransferId(transferId);
        if (existingRecord != null) {
            return TransferResult.success(existingRecord);
        }

        // Segment lock: hash accounts into 100 buckets. This is coarser than a
        // per-account lock, trading some parallelism for a bounded set of lock keys.
        int segment = (int) (request.getFromAccountId() % 100);
        String lockKey = "transfer:lock:segment:" + segment;
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // Wait up to 50 ms for the lock; auto-release the lease after 20 s
            boolean locked = lock.tryLock(50, 20_000, TimeUnit.MILLISECONDS);
            if (!locked) {
                throw new BusinessException("System busy, please try again later");
            }
            existingRecord = transferRecordMapper.selectByTransferId(transferId);
            if (existingRecord != null) {
                return TransferResult.success(existingRecord);
            }
            return doTransfer(request, transferId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException("Failed to acquire lock", e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
```
2.3 Token Mechanism (In Depth)
2.3.1 How It Works
Token mechanism: the client first obtains a token, sends it along with the request, and the server deletes the token after validating it.
Applicable scenarios:
- Preventing duplicate form submissions and repeated button clicks
- One-shot operations initiated from the front end
Implementation points:
- The token needs a sensible expiration time
- The token must be unique
- The token must be deleted once validated
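The heart of the mechanism is that "validate" and "consume" must be one atomic step. A single-JVM sketch (a `ConcurrentHashMap` standing in for Redis; names are illustrative) shows the contract, since `Map.remove(key)` is atomic and reports whether the token was present, just like the Lua-script version used later against Redis:

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for Redis-backed one-shot tokens.
public class TokenSketch {
    private final ConcurrentMap<String, Boolean> tokens = new ConcurrentHashMap<>();

    public String generateToken() {
        String token = UUID.randomUUID().toString();
        tokens.put(token, Boolean.TRUE); // a real implementation sets a TTL in Redis
        return token;
    }

    /** First call succeeds and consumes the token; every later call fails. */
    public boolean verifyAndConsume(String token) {
        // remove() is atomic check-and-delete: no window for a second request
        // to pass validation with the same token
        return tokens.remove(token) != null;
    }

    public static void main(String[] args) {
        TokenSketch svc = new TokenSketch();
        String t = svc.generateToken();
        System.out.println(svc.verifyAndConsume(t)); // prints "true": first use
        System.out.println(svc.verifyAndConsume(t)); // prints "false": already consumed
    }
}
```

A separate `containsKey` followed by `remove` would reintroduce the race this section warns about.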
2.3.2 Complete Implementation

```java
@Service
public class TokenService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public String generateToken(String userId) {
        String token = UUID.randomUUID().toString();
        String tokenKey = "token:" + userId + ":" + token;
        redisTemplate.opsForValue().set(tokenKey, "1", 5, TimeUnit.MINUTES);
        return token;
    }

    public boolean verifyAndConsumeToken(String userId, String token) {
        String tokenKey = "token:" + userId + ":" + token;
        // NOTE: hasKey followed by delete is two round trips and not atomic;
        // section 2.3.3 closes this race with a Lua script
        Boolean exists = redisTemplate.hasKey(tokenKey);
        if (!Boolean.TRUE.equals(exists)) {
            return false;
        }
        Boolean deleted = redisTemplate.delete(tokenKey);
        return Boolean.TRUE.equals(deleted);
    }
}

@RestController
@RequestMapping("/api/order")
public class OrderController {

    @Autowired
    private TokenService tokenService;
    @Autowired
    private OrderService orderService;

    @GetMapping("/token")
    public Result<String> getToken(@RequestParam String userId) {
        return Result.success(tokenService.generateToken(userId));
    }

    @PostMapping("/create")
    public Result<Order> createOrder(@RequestBody OrderRequest request) {
        if (!tokenService.verifyAndConsumeToken(request.getUserId(), request.getToken())) {
            return Result.error("Token invalid or already used, please fetch a new one");
        }
        Order order = orderService.createOrder(request);
        return Result.success(order);
    }
}
```
2.3.3 High-Concurrency Optimization
The check-then-delete in 2.3.2 leaves a race window between the two Redis calls; a Lua script makes the whole check-and-consume atomic on the Redis side:

```java
@Service
public class TokenService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public boolean verifyAndConsumeTokenAtomic(String userId, String token) {
        String tokenKey = "token:" + userId + ":" + token;
        // DEL returns the number of keys removed, so "delete if exists" in a
        // single script tells us atomically whether this call consumed the token
        String luaScript =
                "if redis.call('exists', KEYS[1]) == 1 then " +
                "    return redis.call('del', KEYS[1]) " +
                "else " +
                "    return 0 " +
                "end";
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setScriptText(luaScript);
        script.setResultType(Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(tokenKey));
        return result != null && result > 0;
    }
}
```
2.4 State Machine (In Depth)
2.4.1 How It Works
State machine scheme: drive the operation flow through a state machine, so that a given operation is only permitted in specific states.
Applicable scenarios:
- Entities with a well-defined status flow (orders: pending, paid, shipped, and so on)
- Operations whose replay should become a no-op once the state has advanced
Implementation points:
- State transitions must be validated
- State conflicts from concurrent transitions must be handled
- State change history should be recorded
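The conditional, version-guarded UPDATE used below behaves like a compare-and-set: of any number of concurrent payers, exactly one performs the PENDING-to-PAID transition and the rest observe that it already happened. A minimal single-JVM sketch with `AtomicReference.compareAndSet` (illustrative names, not the project's classes) captures that semantics:

```java
import java.util.concurrent.atomic.AtomicReference;

// "UPDATE ... WHERE status = PENDING AND version = ?" modeled as a CAS.
public class StateMachineCasSketch {
    enum State { PENDING, PAID, SHIPPED, COMPLETED, CANCELLED }

    private final AtomicReference<State> status = new AtomicReference<>(State.PENDING);

    /** Returns true if this call performed the transition, false if it already happened. */
    public boolean pay() {
        // Succeeds for exactly one caller; a replayed payment becomes a no-op
        return status.compareAndSet(State.PENDING, State.PAID);
    }

    public State status() {
        return status.get();
    }

    public static void main(String[] args) {
        StateMachineCasSketch order = new StateMachineCasSketch();
        System.out.println(order.pay());    // prints "true": PENDING -> PAID
        System.out.println(order.pay());    // prints "false": duplicate is a no-op
        System.out.println(order.status()); // prints "PAID"
    }
}
```

The database version adds one thing the CAS alone does not give you: on failure you re-read the row to distinguish "already paid, return success" from "moved to an incompatible state, report an error".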
2.4.2 Complete Implementation

```java
public enum OrderStatus {
    PENDING(0, "Pending payment"),
    PAID(1, "Paid"),
    SHIPPED(2, "Shipped"),
    COMPLETED(3, "Completed"),
    CANCELLED(-1, "Cancelled");

    private final int code;
    private final String desc;

    OrderStatus(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /** The transition table: anything not listed here is illegal. */
    public boolean canTransitionTo(OrderStatus target) {
        switch (this) {
            case PENDING:
                return target == PAID || target == CANCELLED;
            case PAID:
                return target == SHIPPED || target == CANCELLED;
            case SHIPPED:
                return target == COMPLETED;
            case COMPLETED:
            case CANCELLED:
            default:
                return false;
        }
    }
}

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private PaymentRecordMapper paymentRecordMapper;

    @Transactional
    public PaymentResult payOrder(Long orderId, BigDecimal amount) {
        Order order = orderMapper.selectById(orderId);
        if (order == null) {
            throw new BusinessException("Order not found");
        }

        if (!order.getStatus().canTransitionTo(OrderStatus.PAID)) {
            if (order.getStatus() == OrderStatus.PAID) {
                // Duplicate payment request: return success idempotently
                return PaymentResult.success(order, "Order already paid");
            }
            throw new BusinessException("Order state does not allow payment");
        }

        if (order.getAmount().compareTo(amount) != 0) {
            throw new BusinessException("Payment amount mismatch");
        }

        // Conditional update with an optimistic version check:
        // UPDATE ... SET status = PAID WHERE id = ? AND status = PENDING AND version = ?
        int updated = orderMapper.updateStatusWithVersion(
                orderId, OrderStatus.PENDING, OrderStatus.PAID, order.getVersion());
        if (updated == 0) {
            // Lost the race: re-read to distinguish "already paid" from a conflict
            order = orderMapper.selectById(orderId);
            if (order.getStatus() == OrderStatus.PAID) {
                return PaymentResult.success(order, "Order already paid");
            }
            throw new BusinessException("Order state has changed, please refresh and retry");
        }

        PaymentRecord record = new PaymentRecord();
        record.setOrderId(orderId);
        record.setAmount(amount);
        record.setStatus(PaymentStatus.SUCCESS);
        paymentRecordMapper.insert(record);

        return PaymentResult.success(order);
    }
}
```
2.5 Dedup Table (In Depth)
2.5.1 How It Works
Dedup table scheme: maintain a dedicated deduplication table that records requests that have already been processed.
Applicable scenarios:
- Message-queue consumption and third-party callback notifications
- Any processing keyed by an external unique ID (message ID, notification ID)
Implementation points:
- The dedup table needs a unique index
- Historical data must be cleaned up periodically
- Table performance must be considered
2.5.2 Implementation for Message Queues

```java
@Entity
@Table(name = "message_dedup", indexes = {
    @Index(name = "uk_message_id", columnList = "messageId", unique = true)
})
public class MessageDedup {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String messageId;

    private String businessType;
    private String businessId;
    private Integer status;          // 0 = processing, 1 = success, 2 = failed
    private LocalDateTime createTime;
    private LocalDateTime updateTime;

    // getters and setters omitted
}

@Service
public class OrderMessageConsumer {

    @Autowired
    private MessageDedupMapper messageDedupMapper;
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-created", groupId = "order-processor")
    public void handleOrderCreated(ConsumerRecord<String, String> record) {
        String messageId = record.key();
        String message = record.value();

        // Check: has this message already been handled?
        MessageDedup dedup = messageDedupMapper.selectByMessageId(messageId);
        if (dedup != null) {
            log.info("Message already processed: {}", messageId);
            return;
        }

        // Claim the message: the unique index arbitrates between concurrent consumers
        try {
            dedup = new MessageDedup();
            dedup.setMessageId(messageId);
            dedup.setBusinessType("ORDER_CREATED");
            dedup.setStatus(0);
            dedup.setCreateTime(LocalDateTime.now());
            messageDedupMapper.insert(dedup);
        } catch (DuplicateKeyException e) {
            log.info("Message already processed (concurrent): {}", messageId);
            return;
        }

        // Process, then record the outcome
        try {
            Order order = JSON.parseObject(message, Order.class);
            orderService.processOrder(order);
            dedup.setStatus(1);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
        } catch (Exception e) {
            log.error("Process message failed: {}", messageId, e);
            dedup.setStatus(2);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
            // Rethrow so the message is redelivered. Note: since the dedup record
            // already exists, the redelivery will be skipped by the check above;
            // failed (status = 2) records need a separate retry or repair job.
            throw e;
        }
    }
}
```
2.5.3 Performance Optimization
Problem: under high concurrency the dedup table can become a bottleneck.
Optimization: put a Redis cache in front of the table so most duplicates never reach the database:
```java
@Service
public class OrderMessageConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private MessageDedupMapper messageDedupMapper;
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-created", groupId = "order-processor")
    public void handleOrderCreatedOptimized(ConsumerRecord<String, String> record) {
        String messageId = record.key();
        String message = record.value();

        // Layer 1: Redis fast path absorbs most duplicates
        String cacheKey = "message:processed:" + messageId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            log.info("Message already processed (Redis): {}", messageId);
            return;
        }

        // Layer 2: the dedup table is the source of truth
        MessageDedup dedup = messageDedupMapper.selectByMessageId(messageId);
        if (dedup != null) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return;
        }

        // Claim the message via the unique index
        try {
            dedup = new MessageDedup();
            dedup.setMessageId(messageId);
            dedup.setBusinessType("ORDER_CREATED");
            dedup.setStatus(0);
            dedup.setCreateTime(LocalDateTime.now());
            messageDedupMapper.insert(dedup);
        } catch (DuplicateKeyException e) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return;
        }

        try {
            Order order = JSON.parseObject(message, Order.class);
            orderService.processOrder(order);
            dedup.setStatus(1);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
        } catch (Exception e) {
            log.error("Process message failed: {}", messageId, e);
            dedup.setStatus(2);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
            throw e;  // rethrow so the broker redelivers for retry
        }
    }
}
```
3. Common Pitfalls in Depth
3.1 Pitfall 1: Mishandling Unique-Index Conflicts
3.1.1 The Problem
Problem: on a unique-index conflict the code throws straight away, without checking whether the conflict simply means a duplicate request.
Wrong example:
```java
// Wrong: a duplicate request trips the unique index and the caller sees an
// error instead of the original result
public void createOrder(Order order) {
    orderMapper.insert(order);
}
```
3.1.2 The Right Way

```java
public OrderResult createOrder(Order order) {
    try {
        orderMapper.insert(order);
        return OrderResult.success(order);
    } catch (DuplicateKeyException e) {
        // A conflict means "already created": return the existing order
        Order existingOrder = orderMapper.selectByOrderNo(order.getOrderNo());
        return OrderResult.success(existingOrder);
    }
}
```
3.2 Pitfall 2: Failing to Release a Distributed Lock
3.2.1 The Problem
Problem: the lock is never released, so subsequent requests cannot acquire it.
Wrong example:
```java
// Wrong: no finally block, so if doTransfer throws, the lock is never released
public void transfer(TransferRequest request) {
    RLock lock = redissonClient.getLock("transfer:lock");
    lock.lock();
    try {
        doTransfer(request);
    } catch (Exception e) {
        throw e;  // leaves the lock held on this path
    }
}
```
3.2.2 The Right Way

```java
public void transfer(TransferRequest request) {
    RLock lock = redissonClient.getLock("transfer:lock");
    try {
        lock.lock();
        doTransfer(request);
    } finally {
        // Always release in finally, and only if this thread actually holds the lock
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
```
3.3 Pitfall 3: Token Not Deleted After Validation
3.3.1 The Problem
Problem: the token is validated but never deleted, so the same token can be used repeatedly.
Wrong example:
```java
// Wrong: the token is only checked, never consumed, so it can be replayed
public boolean verifyToken(String token) {
    String tokenKey = "token:" + token;
    Boolean exists = redisTemplate.hasKey(tokenKey);
    return Boolean.TRUE.equals(exists);
}
```
3.3.2 The Right Way

```java
public boolean verifyAndConsumeToken(String token) {
    String tokenKey = "token:" + token;
    // Check and delete in one atomic Lua script
    String luaScript =
            "if redis.call('exists', KEYS[1]) == 1 then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
    DefaultRedisScript<Long> script = new DefaultRedisScript<>();
    script.setScriptText(luaScript);
    script.setResultType(Long.class);
    Long result = redisTemplate.execute(script, Collections.singletonList(tokenKey));
    return result != null && result > 0;
}
```
3.4 Pitfall 4: Incomplete State-Machine Checks
3.4.1 The Problem
Problem: only the current state is checked, without validating whether the transition itself is legal, and the unconditional update loses to concurrent writers.
Wrong example:
```java
// Wrong: the status check and the update are not atomic, and replayed
// payments get an error instead of an idempotent success
public void payOrder(Long orderId) {
    Order order = orderMapper.selectById(orderId);
    if (order.getStatus() != OrderStatus.PENDING) {
        throw new BusinessException("Invalid order state");
    }
    order.setStatus(OrderStatus.PAID);  // lost-update race with concurrent payers
    orderMapper.updateById(order);
}
```
3.4.2 The Right Way

```java
public void payOrder(Long orderId) {
    Order order = orderMapper.selectById(orderId);

    if (!order.getStatus().canTransitionTo(OrderStatus.PAID)) {
        if (order.getStatus() == OrderStatus.PAID) {
            return;  // duplicate payment: succeed idempotently
        }
        throw new BusinessException("Order state does not allow payment");
    }

    // Version-guarded conditional update: exactly one concurrent payer wins
    int updated = orderMapper.updateStatusWithVersion(
            orderId, order.getStatus(), OrderStatus.PAID, order.getVersion());
    if (updated == 0) {
        order = orderMapper.selectById(orderId);
        if (order.getStatus() == OrderStatus.PAID) {
            return;  // someone else completed the payment
        }
        throw new BusinessException("Order state has changed");
    }
}
```
3.5 Pitfall 5: Dedup Table Never Cleaned Up
3.5.1 The Problem
Problem: dedup records accumulate without bound; the table keeps growing and performance degrades.
3.5.2 The Right Way
```java
@Component
public class MessageDedupCleaner {

    @Autowired
    private MessageDedupMapper messageDedupMapper;

    // Purge dedup records older than 7 days, every night at 02:00
    @Scheduled(cron = "0 0 2 * * ?")
    public void cleanHistoryData() {
        LocalDateTime cutoffTime = LocalDateTime.now().minusDays(7);
        int deleted = messageDedupMapper.deleteByCreateTimeBefore(cutoffTime);
        log.info("Cleaned {} message dedup records", deleted);
    }
}
```
3.6 Pitfall 6: Double Check Without Mutual Exclusion
3.6.1 The Problem
Problem: under high concurrency, a double check that is not protected by a lock does not work.
Wrong example:
```java
// Wrong: without mutual exclusion, two threads can both pass the first check
// and both execute processOrderInternal
public void processOrder(Order order) {
    if (orderProcessed(order.getId())) {
        return;
    }
    processOrderInternal(order);       // two concurrent callers can both get here
    if (orderProcessed(order.getId())) {
        return;                        // too late: the work has already run twice
    }
}
```
3.6.2 The Right Way

```java
public void processOrder(Order order) {
    if (orderProcessed(order.getId())) {
        return;  // fast path
    }
    String lockKey = "order:process:lock:" + order.getId();
    RLock lock = redissonClient.getLock(lockKey);
    try {
        // Wait up to 100 ms for the lock; auto-release the lease after 30 s
        if (lock.tryLock(100, 30_000, TimeUnit.MILLISECONDS)) {
            // Double check under the lock: this is what makes it effective
            if (orderProcessed(order.getId())) {
                return;
            }
            processOrderInternal(order);
        }
        // If the lock could not be acquired, the caller should retry or fail fast
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new BusinessException("Failed to acquire lock", e);
    } finally {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
```
4. Real Project Case Studies
4.1 Case 1: High-Concurrency Payment System
4.1.1 Scenario
Requirement: a payment system that must guarantee payment idempotency under high concurrency.
Challenges:
- High concurrency (100k+ QPS)
- Network retries
- Duplicate messages
Solution:
```java
@Service
public class PaymentService {

    @Autowired
    private PaymentRecordMapper paymentRecordMapper;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    @Transactional
    public PaymentResult pay(PaymentRequest request) {
        String paymentId = generatePaymentId(request);

        // Layer 1: Redis fast path
        String cacheKey = "payment:processed:" + paymentId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            PaymentRecord record = paymentRecordMapper.selectByPaymentId(paymentId);
            return PaymentResult.success(record);
        }

        // Layer 2: database record check
        PaymentRecord existingRecord = paymentRecordMapper.selectByPaymentId(paymentId);
        if (existingRecord != null) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return PaymentResult.success(existingRecord);
        }

        // Layer 3: per-order distributed lock
        String lockKey = "payment:lock:" + request.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // Wait up to 100 ms for the lock; auto-release the lease after 30 s
            if (!lock.tryLock(100, 30_000, TimeUnit.MILLISECONDS)) {
                throw new BusinessException("System busy, please try again later");
            }

            // Double check under the lock
            existingRecord = paymentRecordMapper.selectByPaymentId(paymentId);
            if (existingRecord != null) {
                redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
                return PaymentResult.success(existingRecord);
            }

            // Layer 4: state-machine check
            Order order = orderMapper.selectById(request.getOrderId());
            if (order.getStatus() != OrderStatus.PENDING) {
                if (order.getStatus() == OrderStatus.PAID) {
                    return PaymentResult.success("Order already paid");
                }
                throw new BusinessException("Order state does not allow payment");
            }

            PaymentRecord record = doPayment(request, paymentId);

            // Layer 5: optimistic-version state transition
            int updated = orderMapper.updateStatusWithVersion(
                    request.getOrderId(), OrderStatus.PENDING, OrderStatus.PAID,
                    order.getVersion());
            if (updated == 0) {
                order = orderMapper.selectById(request.getOrderId());
                if (order.getStatus() == OrderStatus.PAID) {
                    return PaymentResult.success("Order already paid");
                }
                throw new BusinessException("Order state has changed");
            }

            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return PaymentResult.success(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException("Failed to acquire lock", e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
```
4.2 Case 2: Idempotent Message-Queue Consumption
4.2.1 Scenario
Requirement: Kafka message consumption with an idempotency guarantee.
Challenges:
- At-least-once delivery: the broker can redeliver messages after consumer restarts, rebalances, or offset-commit failures
- Processing failures are retried by rethrowing, which produces further duplicates
Solution (combining the Redis cache and dedup table from 2.5.3):
```java
@Component
public class OrderMessageConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private MessageDedupMapper messageDedupMapper;
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-created", groupId = "order-processor")
    public void handleOrderCreated(ConsumerRecord<String, String> record) {
        String messageId = record.key();
        String message = record.value();

        // Layer 1: Redis fast path
        String cacheKey = "message:processed:" + messageId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            return;
        }

        // Layer 2: dedup table as the source of truth
        MessageDedup dedup = messageDedupMapper.selectByMessageId(messageId);
        if (dedup != null) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return;
        }

        // Claim the message via the unique index
        try {
            dedup = new MessageDedup();
            dedup.setMessageId(messageId);
            dedup.setBusinessType("ORDER_CREATED");
            dedup.setStatus(0);
            dedup.setCreateTime(LocalDateTime.now());
            messageDedupMapper.insert(dedup);
        } catch (DuplicateKeyException e) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            return;
        }

        try {
            Order order = JSON.parseObject(message, Order.class);
            orderService.processOrder(order);
            dedup.setStatus(1);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
        } catch (Exception e) {
            log.error("Process message failed: {}", messageId, e);
            dedup.setStatus(2);
            dedup.setUpdateTime(LocalDateTime.now());
            messageDedupMapper.updateById(dedup);
            throw e;  // rethrow so the broker redelivers for retry
        }
    }
}
```
5. Performance Optimization Best Practices
5.1 Multi-Level Caching

```java
@Service
public class IdempotentService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // Caffeine's native Cache type (Spring's CaffeineCache wrapper does not
    // expose getIfPresent(String) directly)
    private final Cache<String, Boolean> localCache = Caffeine.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    public boolean isProcessed(String businessId) {
        // L1: in-process cache
        if (Boolean.TRUE.equals(localCache.getIfPresent(businessId))) {
            return true;
        }
        // L2: Redis
        String cacheKey = "processed:" + businessId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
            localCache.put(businessId, true);
            return true;
        }
        // L3: database, the source of truth
        boolean dbResult = checkInDatabase(businessId);
        if (dbResult) {
            redisTemplate.opsForValue().set(cacheKey, "1", 1, TimeUnit.HOURS);
            localCache.put(businessId, true);
        }
        return dbResult;
    }
}
```
5.2 Batch Processing

```java
@Service
public class BatchIdempotentService {

    public Map<String, Boolean> batchCheckProcessed(List<String> businessIds) {
        List<String> cacheKeys = businessIds.stream()
                .map(id -> "processed:" + id)
                .collect(Collectors.toList());

        // RedisTemplate has no per-key "hasKeys" operation; a single MGET serves
        // the same purpose, since a non-null value means "already processed"
        List<String> cached = redisTemplate.opsForValue().multiGet(cacheKeys);

        Map<String, Boolean> result = new HashMap<>();
        List<String> needDbCheck = new ArrayList<>();
        for (int i = 0; i < businessIds.size(); i++) {
            if (cached != null && cached.get(i) != null) {
                result.put(businessIds.get(i), true);
            } else {
                needDbCheck.add(businessIds.get(i));
            }
        }

        if (!needDbCheck.isEmpty()) {
            Map<String, Boolean> dbResults = batchCheckInDatabase(needDbCheck);
            result.putAll(dbResults);
            // Backfill the cache for ids the database confirmed
            for (String id : needDbCheck) {
                if (Boolean.TRUE.equals(dbResults.get(id))) {
                    redisTemplate.opsForValue().set("processed:" + id, "1", 1, TimeUnit.HOURS);
                }
            }
        }
        return result;
    }
}
```
6. Summary
6.1 Key Points
- Idempotency schemes: unique index, distributed lock, token mechanism, state machine, dedup table
- High-concurrency optimizations: Redis pre-checks, segment locks, batch processing, multi-level caching
- Common pitfalls: mishandled unique-index conflicts, lock-release failures, unconsumed tokens, incomplete state-machine checks, uncleaned dedup tables, double checks without mutual exclusion
- Performance: multi-level caching, batch processing, fast-path optimization
- Best practice: pick the scheme that fits the scenario, and combine several where needed
6.2 Key Insights
- Idempotency is not the enemy of performance: with sensible optimization it can be implemented at high throughput
- Combine schemes: multiple mechanisms cover each other's weaknesses
- Fast-path optimization: most duplicate requests should be intercepted on the fast path
- Double check: re-checking after acquiring the lock is mandatory
6.3 Best Practices
- High-concurrency scenarios: Redis pre-check + unique index + distributed lock
- Message-queue scenarios: dedup table + Redis cache
- Payment scenarios: token mechanism + state machine + distributed lock
- Performance: multi-level caching, batch processing, fast paths
- Fault tolerance: every scheme must account for exceptions and concurrent access