第490集如何保证消息不丢?
|字数总计:4.7k|阅读时长:20分钟|阅读量:
如何保证消息不丢?
1. 概述
1.1 消息丢失的严重性
消息丢失是消息队列使用中最严重的问题之一,可能导致业务数据不一致、订单丢失、用户操作失败等严重后果。
本文内容:
- 消息丢失场景:消息在哪些环节可能丢失
- 生产者保证:如何保证消息成功发送到MQ
- MQ保证:如何保证MQ不丢失消息
- 消费者保证:如何保证消息成功消费
- 最佳实践:消息不丢失的最佳实践
1.2 本文内容结构
本文将从以下几个方面深入探讨如何保证消息不丢:
- 消息丢失场景:消息可能丢失的三个环节
- 生产者保证:生产者如何保证消息不丢
- MQ保证:MQ如何保证消息不丢
- 消费者保证:消费者如何保证消息不丢
- 不同MQ的保证机制:RabbitMQ、Kafka、RocketMQ的可靠性机制
- 最佳实践:消息不丢失的最佳实践
2. 消息丢失的三个环节
2.1 消息丢失的完整链路
消息流转链路:
1 2 3
| 生产者 → MQ Broker → 消费者 ↓ ↓ ↓ 可能丢失 可能丢失 可能丢失
|
三个可能丢失的环节:
- 生产者到MQ:生产者发送消息到MQ时可能丢失
- MQ存储:MQ存储消息时可能丢失
- MQ到消费者:消费者消费消息时可能丢失
2.2 环节1:生产者到MQ
2.2.1 丢失场景
场景1:网络故障
场景2:MQ服务故障
场景3:发送失败未处理
示例代码(会丢失消息):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) { orderMapper.insert(order); OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); rabbitTemplate.convertAndSend("order.exchange", "order.created", event); } }
|
2.3 环节2:MQ存储
2.3.1 丢失场景
场景1:消息未持久化
场景2:磁盘故障
场景3:MQ集群故障
示例配置(会丢失消息):
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Configuration public class RabbitMQConfig { @Bean public Queue orderQueue() { return new Queue("order.queue", false); } }
|
2.4 环节3:MQ到消费者
2.4.1 丢失场景
场景1:消费者处理失败
场景2:消费者确认后处理失败
场景3:消费者宕机
示例代码(会丢失消息):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class OrderConsumer { @RabbitListener(queues = "order.queue") public void handleOrderCreated(OrderCreatedEvent event) { try { orderService.processOrder(event); } catch (Exception e) { log.error("Process order failed", e); } } }
|
3. 生产者保证消息不丢
3.1 确认机制
3.1.1 发送确认
确认机制:生产者发送消息后,等待MQ确认消息已接收。
RabbitMQ实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Configuration public class RabbitMQConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("Message sent successfully: {}", correlationData); } else { log.error("Message send failed: {}, cause: {}", correlationData, cause); handleSendFailure(correlationData); } }); return template; } private void handleSendFailure(CorrelationData correlationData) { } }
|
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) { orderMapper.insert(order); OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( "order.exchange", "order.created", event, correlationData ); } }
|
3.2 事务消息
3.2.1 事务保证
事务消息:保证消息发送和业务操作在同一事务中。
RabbitMQ事务消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
@Transactional public void createOrder(Order order) { rabbitTemplate.setChannelTransacted(true); try { orderMapper.insert(order); OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); rabbitTemplate.convertAndSend("order.exchange", "order.created", event); } catch (Exception e) { throw e; } } }
|
RocketMQ事务消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Service public class OrderService { @Autowired private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) { TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-topic", MessageBuilder.withPayload(order).build(), order ); if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { log.info("Transaction message sent successfully"); } }
@RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { Order order = (Order) arg; orderMapper.insert(order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String orderId = msg.getHeaders().get("orderId").toString(); Order order = orderMapper.selectById(Long.parseLong(orderId)); return order != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } } }
|
3.3 消息重试
3.3.1 重试机制
重试机制:发送失败时自动重试,保证消息最终发送成功。
实现示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Service public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate<String, String> redisTemplate; private static final int MAX_RETRY = 3; private static final long RETRY_INTERVAL = 1000;
public void sendWithRetry(String exchange, String routingKey, Object message) { int retryCount = 0; boolean success = false; while (retryCount < MAX_RETRY && !success) { try { rabbitTemplate.convertAndSend(exchange, routingKey, message); success = true; } catch (Exception e) { retryCount++; log.warn("Send message failed, retry {}/{}", retryCount, MAX_RETRY, e); if (retryCount < MAX_RETRY) { try { Thread.sleep(RETRY_INTERVAL * retryCount); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } else { saveFailedMessage(exchange, routingKey, message); throw new RuntimeException("Send message failed after retries", e); } } } }
private void saveFailedMessage(String exchange, String routingKey, Object message) { String key = "failed:message:" + UUID.randomUUID().toString(); FailedMessage failedMessage = new FailedMessage(); failedMessage.setExchange(exchange); failedMessage.setRoutingKey(routingKey); failedMessage.setMessage(message); failedMessage.setCreateTime(System.currentTimeMillis()); redisTemplate.opsForValue().set(key, JSON.toJSONString(failedMessage), 24, TimeUnit.HOURS); } }
|
4. MQ保证消息不丢
4.1 消息持久化
4.1.1 持久化配置
持久化:将消息持久化到磁盘,MQ重启后消息不丢失。
RabbitMQ持久化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Configuration public class RabbitMQConfig {
@Bean public Queue orderQueue() { return QueueBuilder.durable("order.queue").build(); }
@Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange("order.exchange") .durable(true) .build(); }
@Bean public Binding orderBinding() { return BindingBuilder .bind(orderQueue()) .to(orderExchange()) .with("order.created") .noargs(); } }
|
发送持久化消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) { OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); Message message = MessageBuilder .withBody(JSON.toJSONBytes(event)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); rabbitTemplate.send("order.exchange", "order.created", message); } }
|
4.2 集群高可用
4.2.1 集群配置
集群高可用:MQ集群模式,单节点故障不影响消息。
RabbitMQ集群:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| version: '3.8' services: rabbitmq1: image: rabbitmq:3.9-management hostname: rabbitmq1 environment: RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG" RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin123 ports: - "5672:5672" - "15672:15672" volumes: - rabbitmq1_data:/var/lib/rabbitmq rabbitmq2: image: rabbitmq:3.9-management hostname: rabbitmq2 environment: RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG" RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin123 ports: - "5673:5672" - "15673:15672" volumes: - rabbitmq2_data:/var/lib/rabbitmq depends_on: - rabbitmq1
volumes: rabbitmq1_data: rabbitmq2_data:
|
镜像队列(高可用):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration public class RabbitMQConfig {
@Bean public Queue orderQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-ha-policy", "all"); return QueueBuilder.durable("order.queue") .withArguments(arguments) .build(); } }
|
4.3 消息复制
4.3.1 多副本机制
消息复制:消息在多个节点复制,单节点故障不影响消息。
Kafka多副本:
1 2 3 4 5 6 7 8 9
|
default.replication.factor=3
min.insync.replicas=2
acks=all
|
Kafka生产者配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration public class KafkaConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); configProps.put(ProducerConfig.ACKS_CONFIG, "all"); configProps.put(ProducerConfig.RETRIES_CONFIG, 3); configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); return new DefaultKafkaProducerFactory<>(configProps); } }
|
5. 消费者保证消息不丢
5.1 手动确认
5.1.1 确认机制
手动确认:消费者处理完消息后手动确认,处理失败不确认,消息会重试。
RabbitMQ手动确认:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class RabbitMQConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setPrefetchCount(10); return factory; } }
|
消费者手动确认:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Component public class OrderConsumer { @RabbitListener(queues = "order.queue") public void handleOrderCreated( OrderCreatedEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { orderService.processOrder(event); channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("Process order failed", e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Nack failed", ioException); } } } }
|
5.2 幂等性处理
5.2.1 幂等保证
幂等性:同一消息处理多次,结果一致。
实现方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Component public class OrderConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @RabbitListener(queues = "order.queue") public void handleOrderCreated( OrderCreatedEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { String messageId = event.getEventId(); String key = "order:processed:" + messageId; Boolean processed = redisTemplate.opsForValue().setIfAbsent( key, "1", 24, TimeUnit.HOURS ); if (Boolean.FALSE.equals(processed)) { log.info("Message already processed: {}", messageId); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { log.error("Ack failed", e); } return; } try { orderService.processOrder(event); channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("Process order failed", e); redisTemplate.delete(key); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Nack failed", ioException); } } } }
|
5.3 死信队列
5.3.1 死信处理
死信队列:处理失败超过重试次数的消息,进入死信队列,人工处理。
配置死信队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Configuration public class RabbitMQConfig {
@Bean public Exchange dlxExchange() { return ExchangeBuilder.topicExchange("dlx.exchange") .durable(true) .build(); }
@Bean public Queue dlxQueue() { return QueueBuilder.durable("dlx.queue").build(); }
@Bean public Binding dlxBinding() { return BindingBuilder .bind(dlxQueue()) .to(dlxExchange()) .with("order.#") .noargs(); }
@Bean public Queue orderQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); arguments.put("x-dead-letter-routing-key", "order.failed"); arguments.put("x-message-ttl", 60000); return QueueBuilder.durable("order.queue") .withArguments(arguments) .build(); } }
|
死信消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component public class DeadLetterConsumer { @RabbitListener(queues = "dlx.queue") public void handleDeadLetter(OrderCreatedEvent event) { log.error("Dead letter message: {}", event); saveDeadLetterMessage(event); alertService.sendAlert("Dead letter message: " + event.getEventId()); } }
|
6. 不同MQ的可靠性机制
6.1 RabbitMQ可靠性
6.1.1 可靠性配置
RabbitMQ可靠性保证:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| @Configuration public class RabbitMQReliabilityConfig {
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { log.error("Message send failed: {}", cause); } }); template.setMandatory(true); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("Message return: {}", replyText); }); return template; }
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setPrefetchCount(10); factory.setConcurrentConsumers(5); factory.setMaxConcurrentConsumers(10); return factory; }
@Bean public Queue orderQueue() { return QueueBuilder.durable("order.queue").build(); }
@Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange("order.exchange") .durable(true) .build(); } }
|
6.2 Kafka可靠性
6.2.1 可靠性配置
Kafka可靠性保证:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Configuration public class KafkaReliabilityConfig {
@Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); configProps.put(ProducerConfig.ACKS_CONFIG, "all"); configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group"); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); return new DefaultKafkaConsumerFactory<>(configProps); } }
|
Kafka消费者手动提交:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class OrderConsumer { @KafkaListener(topics = "order-topic", groupId = "order-group") public void handleOrderCreated(OrderCreatedEvent event, Acknowledgment acknowledgment) { try { orderService.processOrder(event); acknowledgment.acknowledge(); } catch (Exception e) { log.error("Process order failed", e); } } }
|
6.3 RocketMQ可靠性
6.3.1 可靠性配置
RocketMQ可靠性保证:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class RocketMQReliabilityConfig {
@Bean public RocketMQTemplate rocketMQTemplate() { RocketMQTemplate template = new RocketMQTemplate(); template.setProducer(new DefaultMQProducer("order-producer-group")); template.getProducer().setRetryTimesWhenSendFailed(3); template.getProducer().setSendMsgTimeout(3000); return template; } }
|
RocketMQ事务消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class OrderService { @Autowired private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) { TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-topic", MessageBuilder.withPayload(order).build(), order ); if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) { throw new RuntimeException("Transaction message send failed"); } } }
|
RocketMQ消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer-group", consumeMode = ConsumeMode.ORDERLY, // 顺序消费 messageModel = MessageModel.CLUSTERING // 集群模式 ) public class OrderConsumer implements RocketMQListener<OrderCreatedEvent> { @Override public void onMessage(OrderCreatedEvent event) { try { orderService.processOrder(event); } catch (Exception e) { log.error("Process order failed", e); throw e; } } }
|
7. 最佳实践
7.1 完整方案
7.1.1 三端保证
完整方案:生产者、MQ、消费者三端都要保证消息不丢。
生产者保证:
- ✅ 开启发送确认
- ✅ 使用事务消息
- ✅ 失败重试机制
- ✅ 失败消息补偿
MQ保证:
- ✅ 消息持久化
- ✅ 队列持久化
- ✅ 交换机持久化
- ✅ 集群高可用
- ✅ 消息复制
消费者保证:
- ✅ 手动确认
- ✅ 幂等性处理
- ✅ 死信队列
- ✅ 失败重试
7.2 监控告警
7.2.1 监控指标
监控指标:
- 消息堆积:队列中消息数量
- 消费速度:每秒消费消息数
- 错误率:消息处理错误率
- 延迟:消息处理延迟
- 死信数量:死信队列消息数量
监控实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class MQMonitor { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedRate = 60000) public void monitorQueue() { QueueInformation queueInfo = rabbitTemplate.execute(channel -> { return channel.queueDeclarePassive("order.queue"); }); long messageCount = queueInfo.getMessageCount(); long consumerCount = queueInfo.getConsumerCount(); if (messageCount > 10000) { alertService.sendAlert("队列消息堆积: " + messageCount); } if (consumerCount == 0) { alertService.sendAlert("队列无消费者: order.queue"); } } }
|
7.3 测试验证
7.3.1 测试场景
测试场景:
- 生产者发送失败:模拟网络故障,验证重试机制
- MQ重启:重启MQ,验证消息不丢失
- 消费者处理失败:模拟处理异常,验证重试机制
- 消费者宕机:模拟消费者宕机,验证消息不丢失
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @SpringBootTest public class MQReliabilityTest { @Autowired private OrderService orderService; @Autowired private RabbitTemplate rabbitTemplate; @Test public void testMessageNotLost() { Order order = new Order(); order.setProductId(1L); order.setQuantity(1); orderService.createOrder(order); } }
|
8. 总结
8.1 核心要点
- 三端保证:生产者、MQ、消费者三端都要保证消息不丢
- 持久化:消息、队列、交换机都要持久化
- 确认机制:生产者发送确认,消费者手动确认
- 重试机制:失败重试,保证最终成功
- 幂等性:消费者处理要保证幂等性
8.2 关键理解
- 没有绝对不丢:只能做到99.99%不丢,不能100%保证
- 性能与可靠性平衡:可靠性越高,性能可能越低
- 监控告警重要:完善的监控是保证可靠性的基础
- 测试验证必要:通过测试验证可靠性机制
8.3 最佳实践
- 生产者:开启确认、事务消息、失败重试
- MQ:持久化、集群、消息复制
- 消费者:手动确认、幂等处理、死信队列
- 监控:实时监控、及时告警
- 测试:定期测试、验证可靠性
相关文章: