1. Overview of Kafka Retry Queues

The Kafka retry queue is a core mechanism for handling failed message consumption. With multi-level retry queues and delayed message processing, it significantly improves a system's fault tolerance and reliability. This article walks through a complete solution covering retry queue design, delayed message processing, scheduled retry strategies, and priority queues.

1.1 Core Scenarios

  1. Multi-level retries: route messages to different retry queues based on retry count
  2. Delayed messages: implement delayed consumption and scheduled tasks
  3. Priority queues: process messages according to their priority
  4. Dynamic retry intervals: support both fixed intervals and exponential backoff
  5. Retry monitoring: monitor retry queue status in real time
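Item 4 above distinguishes fixed intervals from exponential backoff. A minimal sketch of both strategies (the class name `RetryDelays` and the cap parameter are illustrative, not part of the project code):

```java
/**
 * Illustrative helper for the two retry-interval strategies.
 * RetryDelays, baseMs and maxMs are hypothetical names, not project code.
 */
public class RetryDelays {

    /** Fixed interval: every attempt waits the same time. */
    public static long fixedDelay(long baseMs) {
        return baseMs;
    }

    /** Exponential backoff: the delay doubles per attempt, capped at maxMs. */
    public static long exponentialDelay(long baseMs, int attempt, long maxMs) {
        long delay = baseMs * (1L << Math.min(attempt, 20)); // clamp shift to avoid overflow
        return Math.min(delay, maxMs);
    }
}
```

With a 5-second base and a 5-minute cap, attempts 0, 1, 2, 3 wait 5s, 10s, 20s, 40s, until the cap takes over.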

1.2 Architecture

Message consumption fails

Fast retry queue (retry after 5 seconds)

Medium retry queue (retry after 30 seconds)

Slow retry queue (retry after 5 minutes)

Dead letter queue (manual handling)

Delayed message processing

Time wheel algorithm / Redis delayed queue

Periodic scan / expiry trigger

Re-deliver to the consumption topic

2. Maven Dependencies

<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.kafka</groupId>
<artifactId>kafka-retry-queue-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
<kafka.version>3.2.0</kafka.version>
</properties>

<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.6</version>
</dependency>

<!-- Scheduling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!-- Micrometer (metrics) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
</dependencies>
</project>

3. Configuration

# application.yml
server:
  port: 8080

spring:
  application:
    name: kafka-retry-queue-demo

  # Kafka configuration
  kafka:
    bootstrap-servers: localhost:9092

    # Producer configuration
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        max.in.flight.requests.per.connection: 5
        enable.idempotence: true

    # Consumer configuration
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: kafka-retry-queue-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 100
      properties:
        spring.json.trusted.packages: "*"

    # Listener configuration
    listener:
      ack-mode: manual
      concurrency: 3

  # Redis configuration
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

# Custom Kafka topic configuration
kafka:
  topics:
    # Main topic
    main: order-topic

    # Fast retry queue (5 seconds)
    retry-fast: order-retry-fast-topic

    # Medium retry queue (30 seconds)
    retry-medium: order-retry-medium-topic

    # Slow retry queue (5 minutes)
    retry-slow: order-retry-slow-topic

    # Dead letter queue
    dlq: order-dlq-topic

    # Delayed message queue
    delay: order-delay-topic

  # Retry configuration
  retry:
    # Fast retry
    fast:
      max-attempts: 2
      delay: 5000 # 5 seconds

    # Medium retry
    medium:
      max-attempts: 3
      delay: 30000 # 30 seconds

    # Slow retry
    slow:
      max-attempts: 3
      delay: 300000 # 5 minutes

# Monitoring configuration
management:
  endpoints:
    web:
      exposure:
        include: "*"
  metrics:
    export:
      prometheus:
        enabled: true
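The `kafka.topics.*` names above must exist in the broker. If topic auto-creation is disabled, they can be declared as beans so Spring's `KafkaAdmin` provisions them on startup. A sketch (partition and replica counts are placeholder values, not from the original project):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Declares the retry topics so KafkaAdmin creates them at startup.
 * Names must match the kafka.topics.* values in application.yml.
 */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic retryFastTopic() {
        return TopicBuilder.name("order-retry-fast-topic").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic retryMediumTopic() {
        return TopicBuilder.name("order-retry-medium-topic").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic retrySlowTopic() {
        return TopicBuilder.name("order-retry-slow-topic").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic dlqTopic() {
        return TopicBuilder.name("order-dlq-topic").partitions(3).replicas(1).build();
    }
}
```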

4. Message Model Definitions

4.1 Retry Message Entity

package com.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Retry message entity
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RetryMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Message ID (unique identifier) */
    private String messageId;

    /** Business ID */
    private String businessId;

    /** Message body (JSON) */
    private String messageBody;

    /** Message type */
    private String messageType;

    /** Priority (1-10; higher numbers mean higher priority) */
    private Integer priority;

    /** Retry count */
    private Integer retryCount;

    /** Maximum retry count */
    private Integer maxRetryCount;

    /** Current retry level: FAST, MEDIUM, or SLOW */
    private String retryLevel;

    /** Next retry time */
    private LocalDateTime nextRetryTime;

    /** Creation time */
    private LocalDateTime createTime;

    /** Last error message */
    private String lastError;

    /** Error count */
    private Integer errorCount;
}

4.2 Delay Message Entity

package com.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Delay message entity
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DelayMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Message ID */
    private String messageId;

    /** Business ID */
    private String businessId;

    /** Message body */
    private String messageBody;

    /** Target topic (the message is delivered here once the delay expires) */
    private String targetTopic;

    /** Delay time (milliseconds) */
    private Long delayTime;

    /** Execution time */
    private LocalDateTime executeTime;

    /** Creation time */
    private LocalDateTime createTime;

    /** Status: PENDING, EXECUTING, COMPLETED, or FAILED */
    private String status;
}
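`delayTime` and `executeTime` describe the same deadline in two forms and should be kept consistent. A small sketch of the derivation (`DelayTimes` is an illustrative helper, not part of the entity):

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Illustrative helper: derives executeTime from createTime + delayTime. */
public class DelayTimes {

    public static LocalDateTime executeTime(LocalDateTime createTime, long delayMs) {
        return createTime.plus(Duration.ofMillis(delayMs));
    }
}
```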

5. Multi-Level Retry Queue Implementation

5.1 Retry Queue Manager

package com.kafka.retry;

import com.kafka.model.RetryMessage;
import com.kafka.producer.RetryProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

/**
* 重试队列管理器
* @author Java实战
*/
@Slf4j
@Service
public class RetryQueueManager {

@Autowired
private RetryProducer retryProducer;

@Value("${kafka.topics.retry-fast}")
private String fastRetryTopic;

@Value("${kafka.topics.retry-medium}")
private String mediumRetryTopic;

@Value("${kafka.topics.retry-slow}")
private String slowRetryTopic;

@Value("${kafka.topics.dlq}")
private String dlqTopic;

@Value("${kafka.retry.fast.max-attempts}")
private int fastMaxAttempts;

@Value("${kafka.retry.fast.delay}")
private long fastDelay;

@Value("${kafka.retry.medium.max-attempts}")
private int mediumMaxAttempts;

@Value("${kafka.retry.medium.delay}")
private long mediumDelay;

@Value("${kafka.retry.slow.max-attempts}")
private int slowMaxAttempts;

@Value("${kafka.retry.slow.delay}")
private long slowDelay;

/**
* 处理重试逻辑
*/
public void handleRetry(RetryMessage message, Exception exception) {
try {
// 记录错误信息(null-safe:err误计数可能尚未初始化)
message.setLastError(exception.getMessage());
int errorCount = message.getErrorCount() != null ? message.getErrorCount() : 0;
message.setErrorCount(errorCount + 1);

// 获取当前重试级别
String currentLevel = message.getRetryLevel();
if (currentLevel == null) {
currentLevel = "FAST";
}

log.info("处理重试: messageId={}, businessId={}, level={}, retryCount={}",
message.getMessageId(), message.getBusinessId(), currentLevel, message.getRetryCount());

// 根据重试级别处理
switch (currentLevel) {
case "FAST":
handleFastRetry(message);
break;
case "MEDIUM":
handleMediumRetry(message);
break;
case "SLOW":
handleSlowRetry(message);
break;
default:
log.warn("未知的重试级别: {}", currentLevel);
sendToDLQ(message);
}

} catch (Exception e) {
log.error("处理重试失败: messageId={}", message.getMessageId(), e);
}
}

/**
* 处理快速重试(5秒间隔)
*/
private void handleFastRetry(RetryMessage message) {
int retryCount = message.getRetryCount() != null ? message.getRetryCount() : 0;

if (retryCount < fastMaxAttempts) {
// 继续在快速队列重试
message.setRetryCount(retryCount + 1);
message.setRetryLevel("FAST");
message.setNextRetryTime(LocalDateTime.now().plusSeconds(fastDelay / 1000));

log.info("发送到快速重试队列: messageId={}, retryCount={}/{}",
message.getMessageId(), message.getRetryCount(), fastMaxAttempts);

retryProducer.sendRetryMessage(message, fastRetryTopic, fastDelay);

} else {
// 升级到中速队列
log.info("升级到中速重试队列: messageId={}", message.getMessageId());
message.setRetryCount(0);
message.setRetryLevel("MEDIUM");
message.setNextRetryTime(LocalDateTime.now().plusSeconds(mediumDelay / 1000));

retryProducer.sendRetryMessage(message, mediumRetryTopic, mediumDelay);
}
}

/**
* 处理中速重试(30秒间隔)
*/
private void handleMediumRetry(RetryMessage message) {
int retryCount = message.getRetryCount() != null ? message.getRetryCount() : 0;

if (retryCount < mediumMaxAttempts) {
// 继续在中速队列重试
message.setRetryCount(retryCount + 1);
message.setRetryLevel("MEDIUM");
message.setNextRetryTime(LocalDateTime.now().plusSeconds(mediumDelay / 1000));

log.info("发送到中速重试队列: messageId={}, retryCount={}/{}",
message.getMessageId(), message.getRetryCount(), mediumMaxAttempts);

retryProducer.sendRetryMessage(message, mediumRetryTopic, mediumDelay);

} else {
// 升级到慢速队列
log.info("升级到慢速重试队列: messageId={}", message.getMessageId());
message.setRetryCount(0);
message.setRetryLevel("SLOW");
message.setNextRetryTime(LocalDateTime.now().plusSeconds(slowDelay / 1000));

retryProducer.sendRetryMessage(message, slowRetryTopic, slowDelay);
}
}

/**
* 处理慢速重试(5分钟间隔)
*/
private void handleSlowRetry(RetryMessage message) {
int retryCount = message.getRetryCount() != null ? message.getRetryCount() : 0;

if (retryCount < slowMaxAttempts) {
// 继续在慢速队列重试
message.setRetryCount(retryCount + 1);
message.setRetryLevel("SLOW");
message.setNextRetryTime(LocalDateTime.now().plusSeconds(slowDelay / 1000));

log.info("发送到慢速重试队列: messageId={}, retryCount={}/{}",
message.getMessageId(), message.getRetryCount(), slowMaxAttempts);

retryProducer.sendRetryMessage(message, slowRetryTopic, slowDelay);

} else {
// 发送到死信队列
log.error("重试次数已达上限,发送到死信队列: messageId={}", message.getMessageId());
sendToDLQ(message);
}
}

/**
* 发送到死信队列
*/
private void sendToDLQ(RetryMessage message) {
message.setRetryLevel("DLQ");
retryProducer.sendToDLQ(message, dlqTopic);
}
}
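The escalation logic above boils down to a pure function. This sketch hard-codes the max-attempts values from application.yml (2 / 3 / 3); `EscalationLadder` is an illustrative name, not project code:

```java
/**
 * Pure sketch of the FAST -> MEDIUM -> SLOW -> DLQ escalation ladder,
 * with the max-attempts values from application.yml (2 / 3 / 3) hard-coded.
 */
public class EscalationLadder {

    /** Returns the level the next retry should run at, given the current state. */
    public static String nextLevel(String level, int retryCount) {
        switch (level) {
            case "FAST":   return retryCount < 2 ? "FAST" : "MEDIUM";
            case "MEDIUM": return retryCount < 3 ? "MEDIUM" : "SLOW";
            case "SLOW":   return retryCount < 3 ? "SLOW" : "DLQ";
            default:       return "DLQ";
        }
    }
}
```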

5.2 Retry Message Producer

package com.kafka.producer;

import com.kafka.model.RetryMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.TimeUnit;

/**
* 重试消息生产者
* @author Java实战
*/
@Slf4j
@Service
public class RetryProducer {

@Autowired
private KafkaTemplate<String, RetryMessage> kafkaTemplate;

/**
* 发送重试消息
*/
public void sendRetryMessage(RetryMessage message, String topic, long delayMs) {
try {
log.info("发送重试消息: topic={}, messageId={}, businessId={}, delay={}ms",
topic, message.getMessageId(), message.getBusinessId(), delayMs);

// 如果需要延迟,则调度发送(生产环境应使用时间轮或Redis延迟队列,而非临时线程)
if (delayMs > 0) {
// 为了演示,这里使用简单的后台任务
scheduleDelayedMessage(message, topic, delayMs);
return;
}

// 立即发送
ListenableFuture<SendResult<String, RetryMessage>> future =
kafkaTemplate.send(topic, message.getBusinessId(), message);

future.addCallback(new ListenableFutureCallback<SendResult<String, RetryMessage>>() {
@Override
public void onSuccess(SendResult<String, RetryMessage> result) {
log.info("重试消息发送成功: messageId={}, topic={}, partition={}, offset={}",
message.getMessageId(),
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
log.error("重试消息发送失败: messageId={}, error={}",
message.getMessageId(), ex.getMessage(), ex);
}
});

} catch (Exception e) {
log.error("发送重试消息异常: messageId={}", message.getMessageId(), e);
}
}

/**
* 定时发送延迟消息
*/
private void scheduleDelayedMessage(RetryMessage message, String topic, long delayMs) {
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(delayMs);
kafkaTemplate.send(topic, message.getBusinessId(), message);
log.info("延迟消息已发送: messageId={}, delay={}ms", message.getMessageId(), delayMs);
} catch (Exception e) {
log.error("延迟消息发送失败: messageId={}", message.getMessageId(), e);
}
}).start();
}

/**
* 发送到死信队列
*/
public void sendToDLQ(RetryMessage message, String dlqTopic) {
try {
log.error("发送到死信队列: messageId={}, businessId={}, errorCount={}",
message.getMessageId(), message.getBusinessId(), message.getErrorCount());

kafkaTemplate.send(dlqTopic, message.getBusinessId(), message);

} catch (Exception e) {
log.error("发送到死信队列失败: messageId={}", message.getMessageId(), e);
}
}
}
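`scheduleDelayedMessage` spawns one thread per delayed message, which does not scale. A sketch of the same idea on a shared `ScheduledExecutorService` (`DelayedSender` is illustrative; the `send` Runnable stands in for the `kafkaTemplate.send` call):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative replacement for the thread-per-message delay: all delayed
 * sends share one scheduler thread instead of each starting a new Thread.
 */
public class DelayedSender {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs the send action after delayMs; returns a handle for cancellation. */
    public ScheduledFuture<?> sendLater(Runnable send, long delayMs) {
        return scheduler.schedule(send, delayMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```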

5.3 Retry Message Consumers

package com.kafka.consumer;

import com.kafka.model.RetryMessage;
import com.kafka.retry.RetryQueueManager;
import com.kafka.service.MessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
* 快速重试队列消费者
* @author Java实战
*/
@Slf4j
@Component
public class FastRetryConsumer {

@Autowired
private MessageProcessor messageProcessor;

@Autowired
private RetryQueueManager retryQueueManager;

/**
* 消费快速重试消息
*/
@KafkaListener(
topics = "${kafka.topics.retry-fast}",
groupId = "${spring.kafka.consumer.group-id}-retry-fast",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeFastRetry(ConsumerRecord<String, RetryMessage> record,
Acknowledgment acknowledgment) {
RetryMessage message = record.value();

try {
log.info("消费快速重试消息: messageId={}, businessId={}, retryCount={}",
message.getMessageId(), message.getBusinessId(), message.getRetryCount());

// 处理消息
messageProcessor.process(message);

// 处理成功,提交偏移量
acknowledgment.acknowledge();

log.info("快速重试消息处理成功: messageId={}", message.getMessageId());

} catch (Exception e) {
log.error("快速重试消息处理失败: messageId={}, error={}",
message.getMessageId(), e.getMessage(), e);

// 处理失败,继续重试
retryQueueManager.handleRetry(message, e);

// 提交偏移量
acknowledgment.acknowledge();
}
}
}

/**
* 中速重试队列消费者
* @author Java实战
*/
@Slf4j
@Component
class MediumRetryConsumer {

@Autowired
private MessageProcessor messageProcessor;

@Autowired
private RetryQueueManager retryQueueManager;

/**
* 消费中速重试消息
*/
@KafkaListener(
topics = "${kafka.topics.retry-medium}",
groupId = "${spring.kafka.consumer.group-id}-retry-medium",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeMediumRetry(ConsumerRecord<String, RetryMessage> record,
Acknowledgment acknowledgment) {
RetryMessage message = record.value();

try {
log.info("消费中速重试消息: messageId={}, businessId={}, retryCount={}",
message.getMessageId(), message.getBusinessId(), message.getRetryCount());

// 处理消息
messageProcessor.process(message);

// 处理成功,提交偏移量
acknowledgment.acknowledge();

log.info("中速重试消息处理成功: messageId={}", message.getMessageId());

} catch (Exception e) {
log.error("中速重试消息处理失败: messageId={}, error={}",
message.getMessageId(), e.getMessage(), e);

// 处理失败,继续重试
retryQueueManager.handleRetry(message, e);

// 提交偏移量
acknowledgment.acknowledge();
}
}
}

/**
* 慢速重试队列消费者
* @author Java实战
*/
@Slf4j
@Component
class SlowRetryConsumer {

@Autowired
private MessageProcessor messageProcessor;

@Autowired
private RetryQueueManager retryQueueManager;

/**
* 消费慢速重试消息
*/
@KafkaListener(
topics = "${kafka.topics.retry-slow}",
groupId = "${spring.kafka.consumer.group-id}-retry-slow",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeSlowRetry(ConsumerRecord<String, RetryMessage> record,
Acknowledgment acknowledgment) {
RetryMessage message = record.value();

try {
log.info("消费慢速重试消息: messageId={}, businessId={}, retryCount={}",
message.getMessageId(), message.getBusinessId(), message.getRetryCount());

// 处理消息
messageProcessor.process(message);

// 处理成功,提交偏移量
acknowledgment.acknowledge();

log.info("慢速重试消息处理成功: messageId={}", message.getMessageId());

} catch (Exception e) {
log.error("慢速重试消息处理失败: messageId={}, error={}",
message.getMessageId(), e.getMessage(), e);

// 处理失败,继续重试
retryQueueManager.handleRetry(message, e);

// 提交偏移量
acknowledgment.acknowledge();
}
}
}

6. Delayed Message Processing

6.1 Redis-Based Delayed Queue

package com.kafka.delay;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.model.DelayMessage;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
* 基于Redis的延迟队列实现
* @author Java实战
*/
@Slf4j
@Service
public class RedisDelayQueue {

@Autowired
private RedissonClient redissonClient;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

private static final String DELAY_QUEUE_NAME = "kafka:delay:queue";

private RBlockingQueue<DelayMessage> blockingQueue;
private RDelayedQueue<DelayMessage> delayedQueue;

/**
* 初始化延迟队列
*/
@PostConstruct
public void init() {
blockingQueue = redissonClient.getBlockingQueue(DELAY_QUEUE_NAME);
delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

log.info("Redis延迟队列初始化完成");

// 启动消费线程
startConsumer();
}

/**
* 添加延迟消息
*/
public void addDelayMessage(DelayMessage message) {
try {
message.setCreateTime(LocalDateTime.now());
message.setStatus("PENDING");

// 计算延迟时间
long delay = message.getDelayTime();

log.info("添加延迟消息: messageId={}, businessId={}, delay={}ms",
message.getMessageId(), message.getBusinessId(), delay);

// 添加到延迟队列
delayedQueue.offer(message, delay, TimeUnit.MILLISECONDS);

} catch (Exception e) {
log.error("添加延迟消息失败: messageId={}", message.getMessageId(), e);
}
}

/**
* 启动消费线程
*/
private void startConsumer() {
new Thread(() -> {
log.info("延迟队列消费线程启动");

while (true) {
try {
// 阻塞获取到期的消息
DelayMessage message = blockingQueue.take();

log.info("获取到期延迟消息: messageId={}, businessId={}",
message.getMessageId(), message.getBusinessId());

// 处理到期消息
processExpiredMessage(message);

} catch (InterruptedException e) {
log.warn("延迟队列消费线程被中断");
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("处理延迟消息异常", e);
}
}
}).start();
}

/**
* 处理到期消息
*/
private void processExpiredMessage(DelayMessage message) {
try {
message.setStatus("EXECUTING");

// 发送到目标Kafka主题
kafkaTemplate.send(message.getTargetTopic(),
message.getBusinessId(), message.getMessageBody());

message.setStatus("COMPLETED");

log.info("延迟消息已投递: messageId={}, targetTopic={}",
message.getMessageId(), message.getTargetTopic());

} catch (Exception e) {
message.setStatus("FAILED");
log.error("延迟消息投递失败: messageId={}", message.getMessageId(), e);
}
}
}

6.2 Time-Wheel-Based Delayed Queue

package com.kafka.delay;

import com.kafka.model.DelayMessage;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Set;
import java.util.concurrent.*;

/**
* 基于时间轮的延迟队列实现
* @author Java实战
*/
@Slf4j
@Service
public class TimingWheelDelayQueue {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 时间轮(每个槽代表1秒)
*/
private final ConcurrentHashMap<Integer, Set<DelayMessage>> timeWheel = new ConcurrentHashMap<>();

/**
* 时间轮大小(3600秒 = 1小时)
*/
private static final int WHEEL_SIZE = 3600;

/**
* 当前指针位置(生产者线程读取,调度线程推进,故声明为volatile)
*/
private volatile int currentIndex = 0;

/**
* 定时任务执行器
*/
private ScheduledExecutorService scheduler;

/**
* 任务执行线程池
*/
private ExecutorService executorService;

/**
* 初始化时间轮
*/
@PostConstruct
public void init() {
// 初始化时间轮槽位
for (int i = 0; i < WHEEL_SIZE; i++) {
timeWheel.put(i, ConcurrentHashMap.newKeySet());
}

// 创建线程池
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("timing-wheel-%d")
.build();
scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService = Executors.newFixedThreadPool(10, threadFactory);

// 启动时间轮
startTimeWheel();

log.info("时间轮延迟队列初始化完成");
}

/**
* 添加延迟消息
*/
public void addDelayMessage(DelayMessage message, long delaySeconds) {
try {
if (delaySeconds <= 0) {
// 立即执行
executeMessage(message);
return;
}

if (delaySeconds >= WHEEL_SIZE) {
// 超过时间轮范围,使用其他方案
log.warn("延迟时间超过时间轮范围: {}秒", delaySeconds);
return;
}

// 计算槽位
int slotIndex = (currentIndex + (int) delaySeconds) % WHEEL_SIZE;

log.info("添加延迟消息到时间轮: messageId={}, slotIndex={}, delay={}s",
message.getMessageId(), slotIndex, delaySeconds);

// 添加到对应槽位
timeWheel.get(slotIndex).add(message);

} catch (Exception e) {
log.error("添加延迟消息失败: messageId={}", message.getMessageId(), e);
}
}

/**
* 启动时间轮
*/
private void startTimeWheel() {
scheduler.scheduleAtFixedRate(() -> {
try {
// 获取当前槽位的消息
Set<DelayMessage> messages = timeWheel.get(currentIndex);

if (!messages.isEmpty()) {
log.info("时间轮指针移动: index={}, messages={}", currentIndex, messages.size());

// 执行当前槽位的所有消息
messages.forEach(message -> {
executorService.submit(() -> executeMessage(message));
});

// 清空当前槽位
messages.clear();
}

// 移动指针
currentIndex = (currentIndex + 1) % WHEEL_SIZE;

} catch (Exception e) {
log.error("时间轮执行异常", e);
}
}, 0, 1, TimeUnit.SECONDS);
}

/**
* 执行消息
*/
private void executeMessage(DelayMessage message) {
try {
log.info("执行延迟消息: messageId={}, businessId={}",
message.getMessageId(), message.getBusinessId());

// 发送到目标Kafka主题
kafkaTemplate.send(message.getTargetTopic(),
message.getBusinessId(), message.getMessageBody());

log.info("延迟消息已投递: messageId={}, targetTopic={}",
message.getMessageId(), message.getTargetTopic());

} catch (Exception e) {
log.error("执行延迟消息失败: messageId={}", message.getMessageId(), e);
}
}

/**
* 销毁时间轮
*/
@PreDestroy
public void destroy() {
if (scheduler != null) {
scheduler.shutdown();
}
if (executorService != null) {
executorService.shutdown();
}
log.info("时间轮延迟队列已销毁");
}
}
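The slot computation in `addDelayMessage` wraps around the ring. Isolated as a pure function (the `WheelSlots` name is illustrative):

```java
/** Illustrative helper: target slot for a given delay, with wrap-around. */
public class WheelSlots {

    public static int slotFor(int currentIndex, long delaySeconds, int wheelSize) {
        return (int) ((currentIndex + delaySeconds) % wheelSize);
    }
}
```

With a 3600-slot wheel, a 5-second delay added while the pointer sits at slot 3599 lands in slot 4, i.e. it wraps past slot 0.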

7. Priority Queue Implementation

package com.kafka.priority;

import com.kafka.model.RetryMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/**
* 优先级队列管理器
* @author Java实战
*/
@Slf4j
@Service
public class PriorityQueueManager {

@Autowired
private KafkaTemplate<String, RetryMessage> kafkaTemplate;

/**
* 优先级队列(优先级高的先处理)
*/
private final PriorityBlockingQueue<RetryMessage> priorityQueue =
new PriorityBlockingQueue<>(1000,
Comparator.comparing(RetryMessage::getPriority).reversed());

/**
* 添加消息到优先级队列
*/
public void addMessage(RetryMessage message) {
try {
// 设置默认优先级
if (message.getPriority() == null) {
message.setPriority(5);
}

log.info("添加消息到优先级队列: messageId={}, businessId={}, priority={}",
message.getMessageId(), message.getBusinessId(), message.getPriority());

priorityQueue.offer(message);

} catch (Exception e) {
log.error("添加消息到优先级队列失败: messageId={}", message.getMessageId(), e);
}
}

/**
* 批量处理优先级队列消息
*/
public void processBatch(String targetTopic, int batchSize) {
try {
int count = 0;

while (count < batchSize && !priorityQueue.isEmpty()) {
RetryMessage message = priorityQueue.poll();

if (message != null) {
log.info("处理优先级消息: messageId={}, priority={}",
message.getMessageId(), message.getPriority());

// 发送到目标主题
kafkaTemplate.send(targetTopic, message.getBusinessId(), message);

count++;
}
}

log.info("批量处理优先级消息完成: count={}", count);

} catch (Exception e) {
log.error("批量处理优先级消息失败", e);
}
}

/**
* 获取队列大小
*/
public int getQueueSize() {
return priorityQueue.size();
}
}
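The reversed comparator means higher `priority` values are polled first. A self-contained demonstration with plain integers standing in for `RetryMessage` priorities (`PriorityDemo` is illustrative):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/** Demonstrates the poll order produced by the reversed comparator. */
public class PriorityDemo {

    public static int[] drain(int... priorities) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(
                16, Comparator.<Integer>naturalOrder().reversed());
        for (int p : priorities) {
            queue.offer(p);
        }
        int[] out = new int[priorities.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = queue.poll(); // highest priority first
        }
        return out;
    }
}
```

Note that the queue in `PriorityQueueManager` is in-memory only; its contents are lost on restart, unlike messages persisted in Kafka topics.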

8. Message Processing Service

package com.kafka.service;

import com.kafka.model.RetryMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Random;

/**
* 消息处理服务
* @author Java实战
*/
@Slf4j
@Service
public class MessageProcessor {

private final Random random = new Random();

/**
* 处理消息
*/
public void process(RetryMessage message) throws Exception {
log.info("处理消息: messageId={}, businessId={}, type={}",
message.getMessageId(), message.getBusinessId(), message.getMessageType());

// 模拟业务处理
simulateBusinessLogic(message);

// 实际业务逻辑
// 例如:订单处理、支付回调、库存扣减等

log.info("消息处理完成: messageId={}", message.getMessageId());
}

/**
* 模拟业务逻辑
*/
private void simulateBusinessLogic(RetryMessage message) throws Exception {
// 模拟处理时间
Thread.sleep(100 + random.nextInt(400));

// 根据重试级别调整失败率(null-safe:retryLevel可能为空)
int failureRate;
String level = message.getRetryLevel() != null ? message.getRetryLevel() : "";
switch (level) {
case "FAST":
failureRate = 30; // 30%失败率
break;
case "MEDIUM":
failureRate = 20; // 20%失败率
break;
case "SLOW":
failureRate = 10; // 10%失败率
break;
default:
failureRate = 40;
}

// 模拟失败
if (random.nextInt(100) < failureRate) {
throw new RuntimeException("模拟业务处理失败 - " + message.getRetryLevel());
}
}
}

9. REST Controller

package com.kafka.controller;

import com.kafka.delay.RedisDelayQueue;
import com.kafka.delay.TimingWheelDelayQueue;
import com.kafka.model.DelayMessage;
import com.kafka.model.RetryMessage;
import com.kafka.priority.PriorityQueueManager;
import com.kafka.producer.RetryProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.UUID;

/**
* Kafka重试队列控制器
* @author Java实战
*/
@Slf4j
@RestController
@RequestMapping("/api/retry")
public class RetryController {

@Autowired
private RetryProducer retryProducer;

@Autowired
private RedisDelayQueue redisDelayQueue;

@Autowired
private TimingWheelDelayQueue timingWheelDelayQueue;

@Autowired
private PriorityQueueManager priorityQueueManager;

@Value("${kafka.topics.main}")
private String mainTopic;

/**
* 发送重试消息
*/
@PostMapping("/send")
public ResponseEntity<String> sendRetryMessage(@RequestBody RetryMessage message) {
try {
// 设置默认值
if (message.getMessageId() == null) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getCreateTime() == null) {
message.setCreateTime(LocalDateTime.now());
}
if (message.getRetryCount() == null) {
message.setRetryCount(0);
}
if (message.getRetryLevel() == null) {
message.setRetryLevel("FAST");
}

retryProducer.sendRetryMessage(message, mainTopic, 0);

return ResponseEntity.ok("重试消息发送成功: " + message.getMessageId());

} catch (Exception e) {
log.error("发送重试消息失败", e);
return ResponseEntity.internalServerError().body("发送失败: " + e.getMessage());
}
}

/**
* 发送延迟消息(Redis实现)
*/
@PostMapping("/delay/redis")
public ResponseEntity<String> sendDelayMessageByRedis(@RequestBody DelayMessage message) {
try {
if (message.getMessageId() == null) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getTargetTopic() == null) {
message.setTargetTopic(mainTopic);
}

redisDelayQueue.addDelayMessage(message);

return ResponseEntity.ok("延迟消息添加成功: " + message.getMessageId());

} catch (Exception e) {
log.error("发送延迟消息失败", e);
return ResponseEntity.internalServerError().body("发送失败: " + e.getMessage());
}
}

/**
* 发送延迟消息(时间轮实现)
*/
@PostMapping("/delay/timing-wheel")
public ResponseEntity<String> sendDelayMessageByTimingWheel(
@RequestBody DelayMessage message,
@RequestParam long delaySeconds) {
try {
if (message.getMessageId() == null) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getTargetTopic() == null) {
message.setTargetTopic(mainTopic);
}

timingWheelDelayQueue.addDelayMessage(message, delaySeconds);

return ResponseEntity.ok("延迟消息添加成功: " + message.getMessageId());

} catch (Exception e) {
log.error("发送延迟消息失败", e);
return ResponseEntity.internalServerError().body("发送失败: " + e.getMessage());
}
}

/**
* 添加优先级消息
*/
@PostMapping("/priority")
public ResponseEntity<String> addPriorityMessage(@RequestBody RetryMessage message) {
try {
if (message.getMessageId() == null) {
message.setMessageId(UUID.randomUUID().toString());
}

priorityQueueManager.addMessage(message);

return ResponseEntity.ok("优先级消息添加成功: " + message.getMessageId());

} catch (Exception e) {
log.error("添加优先级消息失败", e);
return ResponseEntity.internalServerError().body("添加失败: " + e.getMessage());
}
}

/**
* 批量处理优先级消息
*/
@PostMapping("/priority/process")
public ResponseEntity<String> processPriorityMessages(@RequestParam int batchSize) {
try {
priorityQueueManager.processBatch(mainTopic, batchSize);

return ResponseEntity.ok("批量处理完成");

} catch (Exception e) {
log.error("批量处理失败", e);
return ResponseEntity.internalServerError().body("处理失败: " + e.getMessage());
}
}

/**
* 获取优先级队列大小
*/
@GetMapping("/priority/size")
public ResponseEntity<Integer> getPriorityQueueSize() {
return ResponseEntity.ok(priorityQueueManager.getQueueSize());
}
}

10. Summary

Kafka retry queues are an important mechanism for guaranteeing message reliability. This article covered:

  1. Multi-level retry queues: a fast / medium / slow three-tier retry strategy
  2. Delayed message processing: two implementations, a Redis delayed queue and a time wheel
  3. Priority queues: priority-based message processing
  4. Dynamic retry strategies: fixed intervals and exponential backoff
  5. Complete code: producers, consumers, and managers

With a well-designed retry queue, a system gains both fault tolerance and flexibility in message handling.


Practical takeaways:

  • Multi-level retry queues let you apply different strategies depending on failure severity
  • Delayed message processing suits scheduled tasks and deferred execution
  • Priority queues ensure important messages are handled first
  • The time wheel algorithm fits scenarios with large volumes of delayed tasks
  • A Redis delayed queue is simple to implement and reliable

Key building blocks:

  • PriorityBlockingQueue: priority-ordered blocking queue
  • RDelayedQueue: Redisson delayed queue
  • ScheduledExecutorService: scheduled task executor
  • Time wheel algorithm: a ring buffer giving O(1) insertion
  • Exponential backoff: the retry interval grows exponentially with the retry count