如何保证消息不丢?

1. 概述

1.1 消息丢失的严重性

消息丢失是消息队列使用中最严重的问题之一,可能导致业务数据不一致、订单丢失、用户操作失败等严重后果。

本文内容

  • 消息丢失场景:消息在哪些环节可能丢失
  • 生产者保证:如何保证消息成功发送到MQ
  • MQ保证:如何保证MQ不丢失消息
  • 消费者保证:如何保证消息成功消费
  • 最佳实践:消息不丢失的最佳实践

1.2 本文内容结构

本文将从以下几个方面深入探讨如何保证消息不丢:

  1. 消息丢失场景:消息可能丢失的三个环节
  2. 生产者保证:生产者如何保证消息不丢
  3. MQ保证:MQ如何保证消息不丢
  4. 消费者保证:消费者如何保证消息不丢
  5. 不同MQ的保证机制:RabbitMQ、Kafka、RocketMQ的可靠性机制
  6. 最佳实践:消息不丢失的最佳实践

2. 消息丢失的三个环节

2.1 消息丢失的完整链路

消息流转链路

1
2
3
生产者 → MQ Broker → 消费者
↓ ↓ ↓
可能丢失 可能丢失 可能丢失

三个可能丢失的环节

  1. 生产者到MQ:生产者发送消息到MQ时可能丢失
  2. MQ存储:MQ存储消息时可能丢失
  3. MQ到消费者:消费者消费消息时可能丢失

2.2 环节1:生产者到MQ

2.2.1 丢失场景

场景1:网络故障

  • 生产者发送消息时网络中断
  • 消息未到达MQ就丢失

场景2:MQ服务故障

  • MQ服务崩溃,消息未持久化
  • 消息在内存中丢失

场景3:发送失败未处理

  • 生产者发送失败,但未重试
  • 消息直接丢失

示例代码(会丢失消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息(可能丢失)
*/
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);

// 2. 发送消息(没有确认机制)
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());

// 问题:如果发送失败,消息丢失
rabbitTemplate.convertAndSend("order.exchange", "order.created", event);

// 问题:
// 1. 没有确认消息是否成功发送
// 2. 发送失败没有重试
// 3. 消息可能丢失
}
}

2.3 环节2:MQ存储

2.3.1 丢失场景

场景1:消息未持久化

  • MQ将消息存在内存中
  • MQ重启,消息丢失

场景2:磁盘故障

  • 消息已持久化到磁盘
  • 磁盘故障,消息丢失

场景3:MQ集群故障

  • 单机MQ故障
  • 消息未同步到其他节点

示例配置(会丢失消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RabbitMQConfig {

@Bean
public Queue orderQueue() {
// 问题:队列不持久化
return new Queue("order.queue", false); // durable = false

// 问题:
// 1. 队列不持久化,MQ重启后队列消失
// 2. 消息不持久化,MQ重启后消息丢失
}
}

2.4 环节3:MQ到消费者

2.4.1 丢失场景

场景1:消费者处理失败

  • 消费者处理消息时异常
  • 消息被丢弃,未重试

场景2:消费者确认后处理失败

  • 消费者确认消息后,处理失败
  • 消息已确认,无法重试

场景3:消费者宕机

  • 消费者处理消息时宕机
  • 消息未确认,但可能丢失

示例代码(会丢失消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderConsumer {

@RabbitListener(queues = "order.queue")
public void handleOrderCreated(OrderCreatedEvent event) {
try {
// 处理消息
orderService.processOrder(event);

// 问题:自动确认模式
// 如果处理失败,消息已确认,无法重试

} catch (Exception e) {
// 问题:异常被捕获,消息已确认
// 消息丢失,无法重试
log.error("Process order failed", e);
}
}
}

3. 生产者保证消息不丢

3.1 确认机制

3.1.1 发送确认

确认机制:生产者发送消息后,等待MQ确认消息已接收。

RabbitMQ实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class RabbitMQConfig {

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);

// 1. 开启发送确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message sent successfully: {}", correlationData);
} else {
log.error("Message send failed: {}, cause: {}", correlationData, cause);
// 发送失败,需要重试
handleSendFailure(correlationData);
}
});

return template;
}

private void handleSendFailure(CorrelationData correlationData) {
// 重试逻辑
// 1. 记录失败消息
// 2. 定时重试
// 3. 超过最大重试次数,告警
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息(保证不丢)
*/
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);

// 2. 发送消息(带确认)
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());

CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());

// 发送消息,等待确认
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
event,
correlationData
);

// 注意:这里是异步确认,实际项目中需要同步等待确认
// 或者使用事务消息
}
}

3.2 事务消息

3.2.1 事务保证

事务消息:保证消息发送和业务操作在同一事务中。

RabbitMQ事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息(事务保证)
*/
@Transactional
public void createOrder(Order order) {
// 1. 开启事务
rabbitTemplate.setChannelTransacted(true);

try {
// 2. 保存订单
orderMapper.insert(order);

// 3. 发送消息
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());
rabbitTemplate.convertAndSend("order.exchange", "order.created", event);

// 4. 提交事务(订单和消息同时提交)
// 如果任何一步失败,都会回滚

} catch (Exception e) {
// 事务回滚,订单和消息都不会提交
throw e;
}
}
}

RocketMQ事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Service
public class OrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 发送事务消息(RocketMQ)
*/
public void createOrder(Order order) {
// 1. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(order).build(),
order // 本地事务参数
);

if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("Transaction message sent successfully");
}
}

/**
* 执行本地事务
*/
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
// 执行本地事务:保存订单
orderMapper.insert(order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
String orderId = msg.getHeaders().get("orderId").toString();
Order order = orderMapper.selectById(Long.parseLong(orderId));
return order != null ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
}

3.3 消息重试

3.3.1 重试机制

重试机制:发送失败时自动重试,保证消息最终发送成功。

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Service
public class MessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final int MAX_RETRY = 3;
private static final long RETRY_INTERVAL = 1000; // 1秒

/**
* 发送消息(带重试)
*/
public void sendWithRetry(String exchange, String routingKey, Object message) {
int retryCount = 0;
boolean success = false;

while (retryCount < MAX_RETRY && !success) {
try {
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message);

// 等待确认(简化处理,实际应该异步确认)
success = true;

} catch (Exception e) {
retryCount++;
log.warn("Send message failed, retry {}/{}", retryCount, MAX_RETRY, e);

if (retryCount < MAX_RETRY) {
// 等待后重试
try {
Thread.sleep(RETRY_INTERVAL * retryCount); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} else {
// 超过最大重试次数,记录到数据库或Redis,后续补偿
saveFailedMessage(exchange, routingKey, message);
throw new RuntimeException("Send message failed after retries", e);
}
}
}
}

/**
* 保存失败消息,后续补偿
*/
private void saveFailedMessage(String exchange, String routingKey, Object message) {
String key = "failed:message:" + UUID.randomUUID().toString();
FailedMessage failedMessage = new FailedMessage();
failedMessage.setExchange(exchange);
failedMessage.setRoutingKey(routingKey);
failedMessage.setMessage(message);
failedMessage.setCreateTime(System.currentTimeMillis());

redisTemplate.opsForValue().set(key, JSON.toJSONString(failedMessage), 24, TimeUnit.HOURS);

// 或者保存到数据库,定时任务补偿
}
}

4. MQ保证消息不丢

4.1 消息持久化

4.1.1 持久化配置

持久化:将消息持久化到磁盘,MQ重启后消息不丢失。

RabbitMQ持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Configuration
public class RabbitMQConfig {

/**
* 持久化队列
*/
@Bean
public Queue orderQueue() {
// durable = true:队列持久化
return QueueBuilder.durable("order.queue").build();
}

/**
* 持久化交换机
*/
@Bean
public Exchange orderExchange() {
// durable = true:交换机持久化
return ExchangeBuilder.topicExchange("order.exchange")
.durable(true)
.build();
}

/**
* 绑定关系
*/
@Bean
public Binding orderBinding() {
return BindingBuilder
.bind(orderQueue())
.to(orderExchange())
.with("order.created")
.noargs();
}
}

发送持久化消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送持久化消息
*/
public void createOrder(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());

// 设置消息持久化
Message message = MessageBuilder
.withBody(JSON.toJSONBytes(event))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();

rabbitTemplate.send("order.exchange", "order.created", message);
}
}

4.2 集群高可用

4.2.1 集群配置

集群高可用:MQ集群模式,单节点故障不影响消息。

RabbitMQ集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# docker-compose.yml
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3.9-management
hostname: rabbitmq1
environment:
RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq1_data:/var/lib/rabbitmq

rabbitmq2:
image: rabbitmq:3.9-management
hostname: rabbitmq2
environment:
RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
ports:
- "5673:5672"
- "15673:15672"
volumes:
- rabbitmq2_data:/var/lib/rabbitmq
depends_on:
- rabbitmq1

volumes:
rabbitmq1_data:
rabbitmq2_data:

镜像队列(高可用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RabbitMQConfig {

/**
* 镜像队列(高可用)
*/
@Bean
public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置镜像策略:所有节点都镜像
arguments.put("x-ha-policy", "all");

return QueueBuilder.durable("order.queue")
.withArguments(arguments)
.build();
}
}

4.3 消息复制

4.3.1 多副本机制

消息复制:消息在多个节点复制,单节点故障不影响消息。

Kafka多副本

1
2
3
4
5
6
7
8
9
# server.properties
# 副本因子:每个分区3个副本
default.replication.factor=3

# 最小同步副本:至少2个副本同步成功才认为写入成功
min.insync.replicas=2

# 生产者acks:等待所有副本确认
acks=all

Kafka生产者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class KafkaConfig {

@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

// 关键配置:保证消息不丢
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证顺序
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性

return new DefaultKafkaProducerFactory<>(configProps);
}
}

5. 消费者保证消息不丢

5.1 手动确认

5.1.1 确认机制

手动确认:消费者处理完消息后手动确认,处理失败不确认,消息会重试。

RabbitMQ手动确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitMQConfig {

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);

// 关键:手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 设置预取数量(一次最多拉取多少条消息)
factory.setPrefetchCount(10);

return factory;
}
}

消费者手动确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class OrderConsumer {

@RabbitListener(queues = "order.queue")
public void handleOrderCreated(
OrderCreatedEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

try {
// 1. 处理消息
orderService.processOrder(event);

// 2. 处理成功,手动确认
channel.basicAck(deliveryTag, false);

} catch (Exception e) {
log.error("Process order failed", e);

// 3. 处理失败,拒绝消息
// requeue = true:重新入队,会重试
// requeue = false:不重新入队,进入死信队列
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Nack failed", ioException);
}
}
}
}

5.2 幂等性处理

5.2.1 幂等保证

幂等性:同一消息处理多次,结果一致。

实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Component
public class OrderConsumer {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@RabbitListener(queues = "order.queue")
public void handleOrderCreated(
OrderCreatedEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

String messageId = event.getEventId();
String key = "order:processed:" + messageId;

// 1. 检查是否已处理(幂等性)
Boolean processed = redisTemplate.opsForValue().setIfAbsent(
key,
"1",
24,
TimeUnit.HOURS
);

if (Boolean.FALSE.equals(processed)) {
// 已处理,直接确认
log.info("Message already processed: {}", messageId);
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
log.error("Ack failed", e);
}
return;
}

try {
// 2. 处理消息
orderService.processOrder(event);

// 3. 处理成功,确认消息
channel.basicAck(deliveryTag, false);

} catch (Exception e) {
log.error("Process order failed", e);

// 4. 处理失败,删除标记,拒绝消息(会重试)
redisTemplate.delete(key);

try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Nack failed", ioException);
}
}
}
}

5.3 死信队列

5.3.1 死信处理

死信队列:处理失败超过重试次数的消息,进入死信队列,人工处理。

配置死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class RabbitMQConfig {

/**
* 死信交换机
*/
@Bean
public Exchange dlxExchange() {
return ExchangeBuilder.topicExchange("dlx.exchange")
.durable(true)
.build();
}

/**
* 死信队列
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("dlx.queue").build();
}

/**
* 绑定死信队列
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder
.bind(dlxQueue())
.to(dlxExchange())
.with("order.#")
.noargs();
}

/**
* 业务队列(配置死信)
*/
@Bean
public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", "order.failed");
// 设置消息TTL(可选)
arguments.put("x-message-ttl", 60000); // 60秒

return QueueBuilder.durable("order.queue")
.withArguments(arguments)
.build();
}
}

死信消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class DeadLetterConsumer {

@RabbitListener(queues = "dlx.queue")
public void handleDeadLetter(OrderCreatedEvent event) {
log.error("Dead letter message: {}", event);

// 1. 记录到数据库
saveDeadLetterMessage(event);

// 2. 发送告警
alertService.sendAlert("Dead letter message: " + event.getEventId());

// 3. 人工处理或自动补偿
// ...
}
}

6. 不同MQ的可靠性机制

6.1 RabbitMQ可靠性

6.1.1 可靠性配置

RabbitMQ可靠性保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Configuration
public class RabbitMQReliabilityConfig {

/**
* 生产者配置(保证不丢)
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);

// 1. 开启发送确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("Message send failed: {}", cause);
// 重试或记录
}
});

// 2. 开启消息返回(路由失败时)
template.setMandatory(true);
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("Message return: {}", replyText);
// 处理路由失败的消息
});

return template;
}

/**
* 消费者配置(保证不丢)
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);

// 1. 手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 2. 设置预取数量
factory.setPrefetchCount(10);

// 3. 设置并发消费者
factory.setConcurrentConsumers(5);
factory.setMaxConcurrentConsumers(10);

return factory;
}

/**
* 持久化队列
*/
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue").build();
}

/**
* 持久化交换机
*/
@Bean
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange("order.exchange")
.durable(true)
.build();
}
}

6.2 Kafka可靠性

6.2.1 可靠性配置

Kafka可靠性保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class KafkaReliabilityConfig {

/**
* 生产者配置(保证不丢)
*/
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

// 关键配置:保证消息不丢
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证顺序
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩

return new DefaultKafkaProducerFactory<>(configProps);
}

/**
* 消费者配置(保证不丢)
*/
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

// 关键配置:保证消息不丢
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早开始消费
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 每次拉取100条

return new DefaultKafkaConsumerFactory<>(configProps);
}
}

Kafka消费者手动提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderConsumer {

@KafkaListener(topics = "order-topic", groupId = "order-group")
public void handleOrderCreated(OrderCreatedEvent event, Acknowledgment acknowledgment) {
try {
// 1. 处理消息
orderService.processOrder(event);

// 2. 处理成功,手动提交
acknowledgment.acknowledge();

} catch (Exception e) {
log.error("Process order failed", e);
// 不提交,消息会重新消费
// 或者记录到数据库,后续补偿
}
}
}

6.3 RocketMQ可靠性

6.3.1 可靠性配置

RocketMQ可靠性保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RocketMQReliabilityConfig {

/**
* 生产者配置(保证不丢)
*/
@Bean
public RocketMQTemplate rocketMQTemplate() {
RocketMQTemplate template = new RocketMQTemplate();

// 关键配置:保证消息不丢
template.setProducer(new DefaultMQProducer("order-producer-group"));
template.getProducer().setRetryTimesWhenSendFailed(3); // 重试3次
template.getProducer().setSendMsgTimeout(3000); // 超时3秒

return template;
}
}

RocketMQ事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class OrderService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 发送事务消息(保证不丢)
*/
public void createOrder(Order order) {
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(order).build(),
order
);

if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
throw new RuntimeException("Transaction message send failed");
}
}
}

RocketMQ消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费
messageModel = MessageModel.CLUSTERING // 集群模式
)
public class OrderConsumer implements RocketMQListener<OrderCreatedEvent> {

@Override
public void onMessage(OrderCreatedEvent event) {
try {
// 处理消息
orderService.processOrder(event);

// 处理成功,自动确认(RocketMQ自动确认)

} catch (Exception e) {
log.error("Process order failed", e);
// 抛出异常,消息会重试
throw e;
}
}
}

7. 最佳实践

7.1 完整方案

7.1.1 三端保证

完整方案:生产者、MQ、消费者三端都要保证消息不丢。

生产者保证

  1. ✅ 开启发送确认
  2. ✅ 使用事务消息
  3. ✅ 失败重试机制
  4. ✅ 失败消息补偿

MQ保证

  1. ✅ 消息持久化
  2. ✅ 队列持久化
  3. ✅ 交换机持久化
  4. ✅ 集群高可用
  5. ✅ 消息复制

消费者保证

  1. ✅ 手动确认
  2. ✅ 幂等性处理
  3. ✅ 死信队列
  4. ✅ 失败重试

7.2 监控告警

7.2.1 监控指标

监控指标

  1. 消息堆积:队列中消息数量
  2. 消费速度:每秒消费消息数
  3. 错误率:消息处理错误率
  4. 延迟:消息处理延迟
  5. 死信数量:死信队列消息数量

监控实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public class MQMonitor {

@Autowired
private RabbitTemplate rabbitTemplate;

@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorQueue() {
// 监控队列消息数量
QueueInformation queueInfo = rabbitTemplate.execute(channel -> {
return channel.queueDeclarePassive("order.queue");
});

long messageCount = queueInfo.getMessageCount();
long consumerCount = queueInfo.getConsumerCount();

// 告警规则
if (messageCount > 10000) {
alertService.sendAlert("队列消息堆积: " + messageCount);
}

if (consumerCount == 0) {
alertService.sendAlert("队列无消费者: order.queue");
}
}
}

7.3 测试验证

7.3.1 测试场景

测试场景

  1. 生产者发送失败:模拟网络故障,验证重试机制
  2. MQ重启:重启MQ,验证消息不丢失
  3. 消费者处理失败:模拟处理异常,验证重试机制
  4. 消费者宕机:模拟消费者宕机,验证消息不丢失

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SpringBootTest
public class MQReliabilityTest {

@Autowired
private OrderService orderService;

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testMessageNotLost() {
// 1. 创建订单
Order order = new Order();
order.setProductId(1L);
order.setQuantity(1);

orderService.createOrder(order);

// 2. 验证消息已发送
// ...

// 3. 模拟MQ重启
// ...

// 4. 验证消息不丢失
// ...
}
}

8. 总结

8.1 核心要点

  1. 三端保证:生产者、MQ、消费者三端都要保证消息不丢
  2. 持久化:消息、队列、交换机都要持久化
  3. 确认机制:生产者发送确认,消费者手动确认
  4. 重试机制:失败重试,保证最终成功
  5. 幂等性:消费者处理要保证幂等性

8.2 关键理解

  1. 没有绝对不丢:只能做到99.99%不丢,不能100%保证
  2. 性能与可靠性平衡:可靠性越高,性能可能越低
  3. 监控告警重要:完善的监控是保证可靠性的基础
  4. 测试验证必要:通过测试验证可靠性机制

8.3 最佳实践

  1. 生产者:开启确认、事务消息、失败重试
  2. MQ:持久化、集群、消息复制
  3. 消费者:手动确认、幂等处理、死信队列
  4. 监控:实时监控、及时告警
  5. 测试:定期测试、验证可靠性

相关文章