1. Kafka Dead Letter Queue Overview

In a distributed system, message consumption failures are routine. A dead letter queue (DLQ) is the standard mechanism for handling messages that cannot be consumed. This article walks through a complete solution for Kafka dead letter queues: design and implementation, message retry strategy, exception handling, and monitoring/alerting.

1.1 Core Scenarios

  1. Consumption failure: an exception is thrown while processing a message
  2. Retry mechanism: failed messages are retried automatically
  3. Dead-lettering: after repeated retry failures, the message moves to the dead letter queue
  4. Exception tracking: error details are recorded for troubleshooting
  5. Message recovery: messages can be pulled back from the dead letter queue and reprocessed

1.2 Architecture

Producer → Main Topic → Consumer
        ↓ consumption fails
Retry queue (Retry Topic)
        ↓ 3 retries exhausted
Dead letter queue (DLQ Topic)
        ↓
Manual intervention / automated recovery

2. Maven Dependency Configuration

<!-- pom.xml -->
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafka</groupId>
    <artifactId>kafka-dlq-demo</artifactId>
    <version>1.0.0</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
    </parent>

    <properties>
        <java.version>11</java.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Kafka Clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- JSON processing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Redis (records retry counts / processed-message keys) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- MySQL (stores dead letter messages) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- Micrometer (metrics) -->
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3. Kafka Configuration

3.1 application.yml

# application.yml
server:
  port: 8080

spring:
  application:
    name: kafka-dlq-demo

  # Kafka settings
  kafka:
    bootstrap-servers: localhost:9092

    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        max.in.flight.requests.per.connection: 5
        enable.idempotence: true

    # Consumer settings
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: kafka-dlq-demo-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 100
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

    # Listener settings
    listener:
      ack-mode: manual
      concurrency: 3
      poll-timeout: 3000

  # Redis (records retry counts / processed-message keys)
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

  # Database (stores dead letter messages)
  datasource:
    url: jdbc:mysql://localhost:3306/kafka_dlq?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        format_sql: true

# Custom settings
kafka:
  topics:
    # Main topic
    main: order-topic
    # Retry topic
    retry: order-retry-topic
    # Dead letter topic
    dlq: order-dlq-topic

  retry:
    # Maximum retry attempts
    max-attempts: 3
    # Initial backoff interval (ms)
    backoff-interval: 5000
    # Backoff multiplier
    backoff-multiplier: 2.0
    # Maximum backoff interval (ms)
    max-backoff-interval: 60000

# Monitoring
management:
  endpoints:
    web:
      exposure:
        include: "*"
  metrics:
    export:
      prometheus:
        enabled: true
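The `kafka.retry.*` parameters above define an exponential backoff schedule: 5 s, 10 s, 20 s, and so on, capped at 60 s. A minimal standalone sketch of that calculation (the constants mirror the configuration values; the class name is illustrative):

```java
public class BackoffDemo {
    // Values mirror the kafka.retry.* settings above
    static final long BACKOFF_INTERVAL = 5000L;      // initial delay (ms)
    static final double BACKOFF_MULTIPLIER = 2.0;    // growth factor
    static final long MAX_BACKOFF_INTERVAL = 60000L; // cap (ms)

    // Delay before the Nth retry attempt (retryCount >= 1):
    // backoffInterval * multiplier^(retryCount - 1), capped
    static long calculateBackoff(int retryCount) {
        long delay = BACKOFF_INTERVAL;
        for (int i = 1; i < retryCount; i++) {
            delay = (long) (delay * BACKOFF_MULTIPLIER);
            if (delay > MAX_BACKOFF_INTERVAL) {
                return MAX_BACKOFF_INTERVAL;
            }
        }
        return delay;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + " -> " + calculateBackoff(attempt) + " ms");
        }
    }
}
```

With these settings, attempts 1 through 5 wait 5000, 10000, 20000, 40000, and 60000 ms respectively; the cap keeps a long retry chain from stalling the consumer indefinitely.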

3.2 Kafka Configuration Class

package com.kafka.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka configuration class
 * @author Java实战
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topics.main}")
    private String mainTopic;

    @Value("${kafka.topics.retry}")
    private String retryTopic;

    @Value("${kafka.topics.dlq}")
    private String dlqTopic;

    /**
     * Kafka admin client
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * Main topic: 3 partitions, 2 replicas.
     * Note: replicas(2) requires at least two brokers; use replicas(1) on a
     * single-broker dev setup. Log compaction is deliberately not enabled:
     * a compacted topic keeps only the latest record per key, which would
     * silently drop earlier events for the same orderId.
     */
    @Bean
    public NewTopic mainTopic() {
        return TopicBuilder.name(mainTopic)
                .partitions(3)
                .replicas(2)
                .build();
    }

    /**
     * Retry topic: 3 partitions, 2 replicas
     */
    @Bean
    public NewTopic retryTopic() {
        return TopicBuilder.name(retryTopic)
                .partitions(3)
                .replicas(2)
                .build();
    }

    /**
     * Dead letter topic: 3 partitions, 2 replicas
     */
    @Bean
    public NewTopic dlqTopic() {
        return TopicBuilder.name(dlqTopic)
                .partitions(3)
                .replicas(2)
                .build();
    }
}
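As an aside: the hand-rolled retry/DLQ plumbing built in the rest of this article is instructive, but Spring Kafka (2.8+, which Boot 2.7 pulls in) also ships this behavior out of the box. A configuration sketch, not wired into the rest of the example (bean name and backoff values are illustrative):

```java
// Configuration sketch: Spring Kafka's built-in retry + DLQ support.
// DeadLetterPublishingRecoverer republishes a failed record (by default to
// "<originalTopic>.DLT"), and DefaultErrorHandler drives the retries.
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // 3 retries, 5 s apart, then the record goes to the dead letter topic
    return new DefaultErrorHandler(recoverer, new FixedBackOff(5000L, 3L));
}
```

The handler is then set on the listener container factory via `factory.setCommonErrorHandler(...)`. The custom approach in this article remains useful when you need cross-restart retry counts, custom topic names, or database persistence of dead letters.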

4. Message Model

4.1 Message Entity

package com.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Order message entity
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Order ID
     */
    private String orderId;

    /**
     * User ID
     */
    private String userId;

    /**
     * Order amount (Double for brevity here; prefer BigDecimal for real monetary values)
     */
    private Double amount;

    /**
     * Order status
     */
    private String status;

    /**
     * Creation time
     */
    private LocalDateTime createTime;

    /**
     * Message ID (used for idempotency)
     */
    private String messageId;

    /**
     * Retry count
     */
    private Integer retryCount;

    /**
     * Last error message
     */
    private String lastError;
}

4.2 Dead Letter Message Entity

package com.kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

/**
 * Dead letter message entity (database)
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "dead_letter_message", indexes = {
        @Index(name = "idx_topic_key", columnList = "topic,message_key"),
        @Index(name = "idx_create_time", columnList = "create_time")
})
public class DeadLetterMessage {

    /**
     * Primary key
     */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /**
     * Topic name
     */
    @Column(nullable = false, length = 100)
    private String topic;

    /**
     * Partition. An explicit column name is required because
     * "partition" is a reserved word in MySQL 8.
     */
    @Column(name = "kafka_partition", nullable = false)
    private Integer partition;

    /**
     * Offset. "offset" is likewise a reserved word in MySQL 8.
     */
    @Column(name = "message_offset", nullable = false)
    private Long offset;

    /**
     * Message key
     */
    @Column(name = "message_key", length = 200)
    private String messageKey;

    /**
     * Message value
     */
    @Column(nullable = false, columnDefinition = "TEXT")
    private String messageValue;

    /**
     * Retry count
     */
    @Column(nullable = false)
    private Integer retryCount;

    /**
     * Error message
     */
    @Column(columnDefinition = "TEXT")
    private String errorMessage;

    /**
     * Error stack trace
     */
    @Column(columnDefinition = "TEXT")
    private String errorStack;

    /**
     * Creation time
     */
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;

    /**
     * Update time
     */
    @Column(name = "update_time")
    private LocalDateTime updateTime;

    /**
     * Processing status: PENDING, PROCESSING, RESOLVED, IGNORED
     */
    @Column(nullable = false, length = 20)
    private String status;
}

5. Message Producer

package com.kafka.producer;

import com.kafka.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.LocalDateTime;
import java.util.UUID;

/**
 * Kafka message producer
 * @author Java实战
 */
@Slf4j
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    @Value("${kafka.topics.main}")
    private String mainTopic;

    /**
     * Send an order message
     */
    public void sendOrderMessage(OrderMessage message) {
        try {
            // Assign a message ID (used for idempotency)
            if (message.getMessageId() == null) {
                message.setMessageId(UUID.randomUUID().toString());
            }

            // Set the creation time
            if (message.getCreateTime() == null) {
                message.setCreateTime(LocalDateTime.now());
            }

            // Initialize the retry count
            if (message.getRetryCount() == null) {
                message.setRetryCount(0);
            }

            log.info("Sending order message: orderId={}, messageId={}",
                    message.getOrderId(), message.getMessageId());

            // Send asynchronously
            ListenableFuture<SendResult<String, OrderMessage>> future =
                    kafkaTemplate.send(mainTopic, message.getOrderId(), message);

            // Attach a callback
            future.addCallback(new ListenableFutureCallback<SendResult<String, OrderMessage>>() {
                @Override
                public void onSuccess(SendResult<String, OrderMessage> result) {
                    log.info("Message sent: orderId={}, topic={}, partition={}, offset={}",
                            message.getOrderId(),
                            result.getRecordMetadata().topic(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    log.error("Message send failed: orderId={}, error={}",
                            message.getOrderId(), ex.getMessage(), ex);
                }
            });

        } catch (Exception e) {
            log.error("Exception while sending order message: orderId={}", message.getOrderId(), e);
            throw new RuntimeException("Failed to send message", e);
        }
    }

    /**
     * Send a message to the retry topic
     */
    public void sendRetryMessage(OrderMessage message, String retryTopic) {
        try {
            // Increment the retry count
            message.setRetryCount(message.getRetryCount() + 1);

            log.info("Sending retry message: orderId={}, retryCount={}, topic={}",
                    message.getOrderId(), message.getRetryCount(), retryTopic);

            kafkaTemplate.send(retryTopic, message.getOrderId(), message);

        } catch (Exception e) {
            log.error("Failed to send retry message: orderId={}", message.getOrderId(), e);
        }
    }

    /**
     * Send a message to the dead letter topic
     */
    public void sendDeadLetterMessage(OrderMessage message, String dlqTopic) {
        try {
            log.error("Message entering dead letter queue: orderId={}, retryCount={}, lastError={}",
                    message.getOrderId(), message.getRetryCount(), message.getLastError());

            kafkaTemplate.send(dlqTopic, message.getOrderId(), message);

        } catch (Exception e) {
            log.error("Failed to send dead letter message: orderId={}", message.getOrderId(), e);
        }
    }
}

6. Message Consumers

6.1 Main Topic Consumer

package com.kafka.consumer;

import com.kafka.model.OrderMessage;
import com.kafka.service.OrderService;
import com.kafka.service.RetryService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Order topic consumer
 * @author Java实战
 */
@Slf4j
@Component
public class OrderConsumer {

    @Autowired
    private OrderService orderService;

    @Autowired
    private RetryService retryService;

    @Value("${kafka.topics.main}")
    private String mainTopic;

    @Value("${kafka.topics.retry}")
    private String retryTopic;

    @Value("${kafka.retry.max-attempts}")
    private int maxRetryAttempts;

    /**
     * Consume order messages.
     * Manual acknowledgment: the offset is committed only after processing completes.
     */
    @KafkaListener(
            topics = "${kafka.topics.main}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeOrder(ConsumerRecord<String, OrderMessage> record,
                             Acknowledgment acknowledgment) {
        OrderMessage message = record.value();

        try {
            log.info("Consuming order message: topic={}, partition={}, offset={}, orderId={}, messageId={}",
                    record.topic(), record.partition(), record.offset(),
                    message.getOrderId(), message.getMessageId());

            // Idempotency check: skip if already processed
            if (orderService.isProcessed(message.getMessageId())) {
                log.warn("Message already processed, skipping: messageId={}", message.getMessageId());
                acknowledgment.acknowledge();
                return;
            }

            // Execute the order business logic
            orderService.processOrder(message);

            // Mark the message as processed
            orderService.markAsProcessed(message.getMessageId());

            // Manually commit the offset
            acknowledgment.acknowledge();

            log.info("Order message processed: orderId={}", message.getOrderId());

        } catch (Exception e) {
            log.error("Order message processing failed: orderId={}, error={}",
                    message.getOrderId(), e.getMessage(), e);

            // Processing failed: route to retry/DLQ
            handleFailure(record, message, e, acknowledgment);
        }
    }

    /**
     * Handle a consumption failure
     */
    private void handleFailure(ConsumerRecord<String, OrderMessage> record,
                               OrderMessage message,
                               Exception exception,
                               Acknowledgment acknowledgment) {
        try {
            // Record the error
            message.setLastError(exception.getMessage());

            // Check the retry count
            int currentRetryCount = message.getRetryCount() != null ? message.getRetryCount() : 0;

            if (currentRetryCount < maxRetryAttempts) {
                // Route to the retry queue
                log.info("Message entering retry queue: orderId={}, retryCount={}/{}",
                        message.getOrderId(), currentRetryCount + 1, maxRetryAttempts);

                retryService.sendToRetryQueue(message, record.topic(),
                        record.partition(), record.offset(), exception);

            } else {
                // Maximum retries exceeded: route to the dead letter queue
                log.error("Max retries exceeded, message entering DLQ: orderId={}, retryCount={}",
                        message.getOrderId(), currentRetryCount);

                retryService.sendToDeadLetterQueue(message, record.topic(),
                        record.partition(), record.offset(), exception);
            }

            // Commit the offset (the message has been rerouted, so avoid reconsuming it here)
            acknowledgment.acknowledge();

        } catch (Exception e) {
            log.error("Exception while handling failed message: orderId={}", message.getOrderId(), e);
            // Do not commit the offset; the record will be redelivered
        }
    }
}

6.2 Retry Queue Consumer

package com.kafka.consumer;

import com.kafka.model.OrderMessage;
import com.kafka.service.OrderService;
import com.kafka.service.RetryService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * Retry queue consumer
 * @author Java实战
 */
@Slf4j
@Component
public class RetryConsumer {

    @Autowired
    private OrderService orderService;

    @Autowired
    private RetryService retryService;

    @Value("${kafka.retry.max-attempts}")
    private int maxRetryAttempts;

    @Value("${kafka.retry.backoff-interval}")
    private long backoffInterval;

    @Value("${kafka.retry.backoff-multiplier}")
    private double backoffMultiplier;

    @Value("${kafka.retry.max-backoff-interval}")
    private long maxBackoffInterval;

    /**
     * Consume retry messages with a delayed-retry strategy.
     * Caveat: Thread.sleep blocks the listener thread, so the maximum delay
     * must stay well below max.poll.interval.ms (default 5 minutes) or the
     * consumer is evicted from the group. Production systems often use
     * tiered retry topics (e.g. retry-5s, retry-30s) instead of sleeping.
     */
    @KafkaListener(
            topics = "${kafka.topics.retry}",
            groupId = "${spring.kafka.consumer.group-id}-retry",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeRetry(ConsumerRecord<String, OrderMessage> record,
                             Acknowledgment acknowledgment) {
        OrderMessage message = record.value();

        try {
            log.info("Consuming retry message: topic={}, partition={}, offset={}, orderId={}, retryCount={}",
                    record.topic(), record.partition(), record.offset(),
                    message.getOrderId(), message.getRetryCount());

            // Compute the delay (exponential backoff)
            long delay = calculateBackoff(message.getRetryCount());
            log.info("Delaying retry: orderId={}, delay={}ms", message.getOrderId(), delay);

            // Wait before retrying
            TimeUnit.MILLISECONDS.sleep(delay);

            // Idempotency check: skip if already processed
            if (orderService.isProcessed(message.getMessageId())) {
                log.warn("Retry message already processed, skipping: messageId={}", message.getMessageId());
                acknowledgment.acknowledge();
                return;
            }

            // Re-run the order business logic
            orderService.processOrder(message);

            // Mark the message as processed
            orderService.markAsProcessed(message.getMessageId());

            // Manually commit the offset
            acknowledgment.acknowledge();

            log.info("Retry message processed: orderId={}, retryCount={}",
                    message.getOrderId(), message.getRetryCount());

        } catch (Exception e) {
            log.error("Retry message processing failed: orderId={}, retryCount={}, error={}",
                    message.getOrderId(), message.getRetryCount(), e.getMessage(), e);

            // Processing failed: retry again or route to the DLQ
            handleRetryFailure(record, message, e, acknowledgment);
        }
    }

    /**
     * Compute the backoff delay (exponential):
     * delay = backoffInterval * multiplier^(retryCount - 1), capped at maxBackoffInterval
     */
    private long calculateBackoff(int retryCount) {
        long delay = backoffInterval;

        for (int i = 1; i < retryCount; i++) {
            delay = (long) (delay * backoffMultiplier);
            if (delay > maxBackoffInterval) {
                delay = maxBackoffInterval;
                break;
            }
        }

        return delay;
    }

    /**
     * Handle a retry failure
     */
    private void handleRetryFailure(ConsumerRecord<String, OrderMessage> record,
                                    OrderMessage message,
                                    Exception exception,
                                    Acknowledgment acknowledgment) {
        try {
            // Record the error
            message.setLastError(exception.getMessage());

            int currentRetryCount = message.getRetryCount();

            if (currentRetryCount < maxRetryAttempts) {
                // Keep retrying
                log.info("Message retrying again: orderId={}, retryCount={}/{}",
                        message.getOrderId(), currentRetryCount + 1, maxRetryAttempts);

                retryService.sendToRetryQueue(message, record.topic(),
                        record.partition(), record.offset(), exception);

            } else {
                // Maximum retries exceeded: route to the dead letter queue
                log.error("Retry message exceeded max retries, entering DLQ: orderId={}, retryCount={}",
                        message.getOrderId(), currentRetryCount);

                retryService.sendToDeadLetterQueue(message, record.topic(),
                        record.partition(), record.offset(), exception);
            }

            // Commit the offset
            acknowledgment.acknowledge();

        } catch (Exception e) {
            log.error("Exception while handling failed retry message: orderId={}", message.getOrderId(), e);
        }
    }
}

6.3 Dead Letter Queue Consumer

package com.kafka.consumer;

import com.kafka.model.OrderMessage;
import com.kafka.service.DeadLetterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Dead letter queue consumer
 * @author Java实战
 */
@Slf4j
@Component
public class DeadLetterConsumer {

    @Autowired
    private DeadLetterService deadLetterService;

    /**
     * Consume dead letter messages.
     * Persists each dead letter to the database so it can be handled manually.
     */
    @KafkaListener(
            topics = "${kafka.topics.dlq}",
            groupId = "${spring.kafka.consumer.group-id}-dlq",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeDeadLetter(ConsumerRecord<String, OrderMessage> record,
                                  Acknowledgment acknowledgment) {
        OrderMessage message = record.value();

        try {
            log.error("Consuming dead letter message: topic={}, partition={}, offset={}, orderId={}, retryCount={}, error={}",
                    record.topic(), record.partition(), record.offset(),
                    message.getOrderId(), message.getRetryCount(), message.getLastError());

            // Persist the dead letter to the database
            deadLetterService.saveDeadLetterMessage(
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    message
            );

            // Send an alert notification
            deadLetterService.sendAlert(message);

            // Manually commit the offset
            acknowledgment.acknowledge();

            log.info("Dead letter message saved: orderId={}", message.getOrderId());

        } catch (Exception e) {
            log.error("Dead letter processing failed: orderId={}", message.getOrderId(), e);
            // Commit the offset even on failure to avoid an infinite redelivery loop
            acknowledgment.acknowledge();
        }
    }
}

7. Business Service Implementation

7.1 Order Service

package com.kafka.service;

import com.kafka.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Order business service
 * @author Java实战
 */
@Slf4j
@Service
public class OrderService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String PROCESSED_KEY_PREFIX = "kafka:processed:";
    private static final long PROCESSED_KEY_TTL = 7; // days
    private final Random random = new Random();

    /**
     * Process the order business logic
     */
    public void processOrder(OrderMessage message) {
        log.info("Processing order: orderId={}, userId={}, amount={}",
                message.getOrderId(), message.getUserId(), message.getAmount());

        try {
            // Simulated processing
            simulateBusinessLogic(message);

            // Real business logic would go here, for example:
            // 1. Validate the order
            // 2. Deduct inventory
            // 3. Create a payment order
            // 4. Update the order status
            // 5. Send notifications

            log.info("Order processed: orderId={}", message.getOrderId());

        } catch (Exception e) {
            log.error("Order processing failed: orderId={}", message.getOrderId(), e);
            throw new RuntimeException("Order processing failed: " + e.getMessage(), e);
        }
    }

    /**
     * Simulate business logic, randomly throwing exceptions
     * to exercise the retry path
     */
    private void simulateBusinessLogic(OrderMessage message) throws Exception {
        // Simulate processing time
        Thread.sleep(100 + random.nextInt(400));

        // ~30% chance of a generic failure
        if (random.nextInt(100) < 30) {
            throw new RuntimeException("Simulated business failure");
        }

        // Additionally simulate a few specific exception types
        int errorType = random.nextInt(100);
        if (errorType < 5) {
            throw new IllegalArgumentException("Parameter validation failed");
        } else if (errorType < 10) {
            throw new NullPointerException("Null pointer exception");
        } else if (errorType < 15) {
            throw new IllegalStateException("Illegal state");
        }
    }

    /**
     * Check whether a message has already been processed (idempotency)
     */
    public boolean isProcessed(String messageId) {
        String key = PROCESSED_KEY_PREFIX + messageId;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * Mark a message as processed
     */
    public void markAsProcessed(String messageId) {
        String key = PROCESSED_KEY_PREFIX + messageId;
        redisTemplate.opsForValue().set(key, "1", PROCESSED_KEY_TTL, TimeUnit.DAYS);
    }
}
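One caveat about the check-then-mark pattern above: two consumers can both pass `isProcessed` before either calls `markAsProcessed`, so the window between the two calls allows duplicate processing. An atomic claim closes it; with Spring Data Redis that would be `redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, unit)` (Redis `SET NX`). A minimal sketch of the idea, with a `ConcurrentHashMap` standing in for Redis (class and method names are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;

public class IdempotencyDemo {
    // Stand-in for Redis; in OrderService this would be
    // redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, TimeUnit.DAYS)
    private final ConcurrentHashMap<String, Boolean> processed = new ConcurrentHashMap<>();

    /** Returns true exactly once per messageId: the caller that wins processes the message. */
    public boolean tryClaim(String messageId) {
        // putIfAbsent is atomic: only the first claimant sees null
        return processed.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        IdempotencyDemo demo = new IdempotencyDemo();
        System.out.println(demo.tryClaim("msg-1")); // first claim succeeds
        System.out.println(demo.tryClaim("msg-1")); // duplicate is rejected
    }
}
```

With the atomic variant, a consumer claims the message ID before processing and releases (deletes) the key if processing fails, so a crashed claim can be retried after the TTL.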

7.2 Retry Service

package com.kafka.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.model.OrderMessage;
import com.kafka.producer.OrderProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * Retry service
 * @author Java实战
 */
@Slf4j
@Service
public class RetryService {

    @Autowired
    private OrderProducer orderProducer;

    @Autowired
    private ObjectMapper objectMapper;

    @Value("${kafka.topics.retry}")
    private String retryTopic;

    @Value("${kafka.topics.dlq}")
    private String dlqTopic;

    /**
     * Send a message to the retry queue
     */
    public void sendToRetryQueue(OrderMessage message, String originalTopic,
                                 int partition, long offset, Exception exception) {
        try {
            log.info("Sending message to retry queue: orderId={}, originalTopic={}, partition={}, offset={}",
                    message.getOrderId(), originalTopic, partition, offset);

            // Record the error
            message.setLastError(exception.getMessage());

            // Send to the retry topic
            orderProducer.sendRetryMessage(message, retryTopic);

            log.info("Message sent to retry queue: orderId={}, retryCount={}",
                    message.getOrderId(), message.getRetryCount());

        } catch (Exception e) {
            log.error("Failed to send message to retry queue: orderId={}", message.getOrderId(), e);
        }
    }

    /**
     * Send a message to the dead letter queue
     */
    public void sendToDeadLetterQueue(OrderMessage message, String originalTopic,
                                      int partition, long offset, Exception exception) {
        try {
            log.error("Sending message to DLQ: orderId={}, originalTopic={}, partition={}, offset={}, retryCount={}",
                    message.getOrderId(), originalTopic, partition, offset, message.getRetryCount());

            // Record the full stack trace
            message.setLastError(getFullStackTrace(exception));

            // Send to the dead letter topic
            orderProducer.sendDeadLetterMessage(message, dlqTopic);

            log.error("Message sent to DLQ: orderId={}", message.getOrderId());

        } catch (Exception e) {
            log.error("Failed to send message to DLQ: orderId={}", message.getOrderId(), e);
        }
    }

    /**
     * Capture the full exception stack trace as a string
     */
    private String getFullStackTrace(Exception exception) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        exception.printStackTrace(pw);
        return sw.toString();
    }
}

7.3 Dead Letter Service

package com.kafka.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.entity.DeadLetterMessage;
import com.kafka.model.OrderMessage;
import com.kafka.repository.DeadLetterRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.LocalDateTime;
import java.util.List;

/**
 * Dead letter service
 * @author Java实战
 */
@Slf4j
@Service
public class DeadLetterService {

    @Autowired
    private DeadLetterRepository deadLetterRepository;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Persist a dead letter message
     */
    @Transactional
    public void saveDeadLetterMessage(String topic, int partition, long offset,
                                      String messageKey, OrderMessage message) {
        try {
            log.info("Saving dead letter message: topic={}, partition={}, offset={}, orderId={}",
                    topic, partition, offset, message.getOrderId());

            // Serialize the message to JSON
            String messageValue = objectMapper.writeValueAsString(message);

            // Build the dead letter entity; by this point lastError already
            // carries the full stack trace captured by RetryService
            DeadLetterMessage deadLetter = DeadLetterMessage.builder()
                    .topic(topic)
                    .partition(partition)
                    .offset(offset)
                    .messageKey(messageKey)
                    .messageValue(messageValue)
                    .retryCount(message.getRetryCount())
                    .errorMessage(message.getLastError())
                    .errorStack(message.getLastError())
                    .createTime(LocalDateTime.now())
                    .updateTime(LocalDateTime.now())
                    .status("PENDING")
                    .build();

            // Save to the database
            deadLetterRepository.save(deadLetter);

            log.info("Dead letter saved: orderId={}, id={}", message.getOrderId(), deadLetter.getId());

        } catch (Exception e) {
            log.error("Failed to save dead letter: orderId={}", message.getOrderId(), e);
            throw new RuntimeException("Failed to save dead letter message", e);
        }
    }

    /**
     * Send an alert notification
     */
    public void sendAlert(OrderMessage message) {
        try {
            log.error("Dead letter alert: orderId={}, retryCount={}, error={}",
                    message.getOrderId(), message.getRetryCount(), message.getLastError());

            // Integrate DingTalk, email, SMS, etc. here, for example:
            // dingtalkService.sendAlert("Dead letter alert", message);
            // emailService.sendAlert("dead-letter@example.com", message);

        } catch (Exception e) {
            log.error("Failed to send dead letter alert: orderId={}", message.getOrderId(), e);
        }
    }

    /**
     * List pending dead letter messages
     */
    public List<DeadLetterMessage> getPendingMessages() {
        return deadLetterRepository.findByStatus("PENDING");
    }

    /**
     * Reprocess a dead letter message
     */
    @Transactional
    public void reprocessMessage(Long id) {
        try {
            DeadLetterMessage deadLetter = deadLetterRepository.findById(id)
                    .orElseThrow(() -> new IllegalArgumentException("Dead letter not found: " + id));

            // Mark as processing
            deadLetter.setStatus("PROCESSING");
            deadLetter.setUpdateTime(LocalDateTime.now());
            deadLetterRepository.save(deadLetter);

            log.info("Reprocessing dead letter: id={}, messageKey={}", id, deadLetter.getMessageKey());

            // Deserialize the message
            OrderMessage message = objectMapper.readValue(
                    deadLetter.getMessageValue(), OrderMessage.class);

            // Reset the retry count before republishing to the main topic
            message.setRetryCount(0);
            message.setLastError(null);

            // Republish to the main topic here:
            // orderProducer.sendOrderMessage(message);

            // Mark as resolved
            deadLetter.setStatus("RESOLVED");
            deadLetter.setUpdateTime(LocalDateTime.now());
            deadLetterRepository.save(deadLetter);

            log.info("Dead letter reprocessed: id={}", id);

        } catch (Exception e) {
            log.error("Failed to reprocess dead letter: id={}", id, e);
            throw new RuntimeException("Failed to reprocess dead letter message", e);
        }
    }

    /**
     * Ignore a dead letter message
     */
    @Transactional
    public void ignoreMessage(Long id) {
        try {
            DeadLetterMessage deadLetter = deadLetterRepository.findById(id)
                    .orElseThrow(() -> new IllegalArgumentException("Dead letter not found: " + id));

            deadLetter.setStatus("IGNORED");
            deadLetter.setUpdateTime(LocalDateTime.now());
            deadLetterRepository.save(deadLetter);

            log.info("Dead letter ignored: id={}", id);

        } catch (Exception e) {
            log.error("Failed to ignore dead letter: id={}", id, e);
            throw new RuntimeException("Failed to ignore dead letter message", e);
        }
    }
}

8. Data Access Layer

package com.kafka.repository;

import com.kafka.entity.DeadLetterMessage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.List;

/**
 * Dead letter message repository
 * @author Java实战
 */
@Repository
public interface DeadLetterRepository extends JpaRepository<DeadLetterMessage, Long> {

    /**
     * Find dead letters by status
     */
    List<DeadLetterMessage> findByStatus(String status);

    /**
     * Find dead letters by topic and message key
     */
    List<DeadLetterMessage> findByTopicAndMessageKey(String topic, String messageKey);

    /**
     * Find dead letters within a time range
     */
    List<DeadLetterMessage> findByCreateTimeBetween(LocalDateTime startTime, LocalDateTime endTime);

    /**
     * Count dead letters by status
     */
    long countByStatus(String status);
}

9. Controller

package com.kafka.controller;

import com.kafka.entity.DeadLetterMessage;
import com.kafka.model.OrderMessage;
import com.kafka.producer.OrderProducer;
import com.kafka.service.DeadLetterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

/**
 * Kafka dead letter queue controller
 * @author Java实战
 */
@Slf4j
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    @Autowired
    private OrderProducer orderProducer;

    @Autowired
    private DeadLetterService deadLetterService;

    /**
     * Send a test message
     */
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody OrderMessage message) {
        try {
            // Fill in defaults
            if (message.getMessageId() == null) {
                message.setMessageId(UUID.randomUUID().toString());
            }
            if (message.getCreateTime() == null) {
                message.setCreateTime(LocalDateTime.now());
            }
            if (message.getRetryCount() == null) {
                message.setRetryCount(0);
            }

            orderProducer.sendOrderMessage(message);

            return ResponseEntity.ok("Message sent: " + message.getMessageId());

        } catch (Exception e) {
            log.error("Failed to send message", e);
            return ResponseEntity.internalServerError().body("Failed to send message: " + e.getMessage());
        }
    }

    /**
     * List dead letter messages
     */
    @GetMapping("/dead-letters")
    public ResponseEntity<List<DeadLetterMessage>> getDeadLetters() {
        try {
            List<DeadLetterMessage> messages = deadLetterService.getPendingMessages();
            return ResponseEntity.ok(messages);

        } catch (Exception e) {
            log.error("Failed to query dead letters", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Reprocess a dead letter message
     */
    @PostMapping("/dead-letters/{id}/reprocess")
    public ResponseEntity<String> reprocessDeadLetter(@PathVariable Long id) {
        try {
            deadLetterService.reprocessMessage(id);
            return ResponseEntity.ok("Dead letter reprocessed");

        } catch (Exception e) {
            log.error("Failed to reprocess dead letter", e);
            return ResponseEntity.internalServerError().body("Reprocess failed: " + e.getMessage());
        }
    }

    /**
     * Ignore a dead letter message
     */
    @PostMapping("/dead-letters/{id}/ignore")
    public ResponseEntity<String> ignoreDeadLetter(@PathVariable Long id) {
        try {
            deadLetterService.ignoreMessage(id);
            return ResponseEntity.ok("Dead letter ignored");

        } catch (Exception e) {
            log.error("Failed to ignore dead letter", e);
            return ResponseEntity.internalServerError().body("Ignore failed: " + e.getMessage());
        }
    }
}

10. Summary

A dead letter queue is the key mechanism for handling consumption failures in Kafka. This article covered:

  1. DLQ design: a three-tier main/retry/dead-letter topic architecture
  2. Retry mechanism: exponential backoff with a capped maximum retry count
  3. Idempotency: processed-message IDs recorded in Redis to prevent duplicate consumption
  4. Exception handling: thorough exception capture and logging
  5. Dead letter persistence: failed messages stored in the database for manual handling
  6. Monitoring and alerting: dead letters tracked in real time with alert notifications

A well-designed dead letter queue markedly improves a system's reliability and fault tolerance and ensures that no message is silently lost.


Key implementation takeaways:

  • Use manual acknowledgment so offsets are committed only after processing completes
  • Implement idempotency to avoid duplicate consumption
  • Pace retries with an exponential backoff strategy
  • Persist dead letters for tracking and recovery
  • Log and handle exceptions thoroughly

Annotation notes:

  • @KafkaListener: Kafka message listener annotation
  • Acknowledgment: manual offset commit
  • @Transactional: transaction management for data consistency
  • ConsumerRecord: the consumed Kafka record, including full message metadata
  • Exponential backoff: delay = backoffInterval * backoffMultiplier ^ (retryCount - 1), capped at maxBackoffInterval