1. 高峰流量与延时任务概述

在高并发场景下,系统需要处理大量瞬时请求,同时还要管理各种延时任务。Kafka作为高吞吐量的消息队列,能够有效承接高峰流量;Redisson DelayQueue则提供了可靠的延时任务处理能力。本文将详细介绍两者的结合使用方案。

1.1 核心挑战

  1. 高峰流量: 瞬时大量请求需要快速处理
  2. 延时任务: 需要延迟执行的任务管理
  3. 任务调度: 精确的任务时间控制
  4. 系统稳定性: 保证系统在高负载下的稳定运行

1.2 技术架构

1
2
3
4
5
高峰请求 → Kafka消息队列 → 异步处理 → 业务逻辑
↓ ↓ ↓ ↓
流量削峰 → 消息缓冲 → 任务分发 → 结果处理
↓ ↓ ↓ ↓
延时任务 → Redisson DelayQueue → 定时执行 → 任务完成

2. Kafka高峰流量处理

2.1 Kafka配置优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Kafka高峰流量处理配置
*/
@Configuration
public class KafkaPeakHandlingConfig {

/**
* 高峰流量生产者配置
*/
@Bean
public ProducerFactory<String, String> peakProducerFactory() {
Map<String, Object> configProps = new HashMap<>();

// 基础配置
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// 高峰流量优化配置
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB批量大小
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5ms延迟
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // LZ4压缩
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB缓冲区
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// 可靠性配置
configProps.put(ProducerConfig.ACKS_CONFIG, "1"); // 等待leader确认
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // 重试间隔

// 超时配置
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

return new DefaultKafkaProducerFactory<>(configProps);
}

/**
* 高峰流量消费者配置
*/
@Bean
public ConsumerFactory<String, String> peakConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();

configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "peak-handling-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 高峰流量消费优化
configProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最小拉取1KB
configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最大等待500ms
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // 每次拉取1000条
configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟拉取间隔

// 自动提交配置
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

// 会话配置
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

return new DefaultKafkaConsumerFactory<>(configProps);
}

/**
* 高峰流量KafkaTemplate
*/
@Bean
public KafkaTemplate<String, String> peakKafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(peakProducerFactory());
template.setDefaultTopic("peak-traffic");
return template;
}
}

2.2 高峰流量处理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/**
* 高峰流量处理服务
*/
@Service
public class PeakTrafficHandlingService {

@Autowired
private KafkaTemplate<String, String> peakKafkaTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private static final String PEAK_TRAFFIC_TOPIC = "peak-traffic";
private static final String RATE_LIMIT_PREFIX = "rate:limit:";

/**
* 处理高峰请求
* @param request 请求数据
* @return 处理结果
*/
public PeakHandlingResult handlePeakRequest(PeakRequest request) {
try {
// 1. 限流检查
if (!checkRateLimit(request.getUserId())) {
return PeakHandlingResult.rateLimited("请求过于频繁");
}

// 2. 生成请求ID
String requestId = generateRequestId();
request.setRequestId(requestId);

// 3. 发送到Kafka异步处理
PeakMessage message = PeakMessage.builder()
.requestId(requestId)
.userId(request.getUserId())
.requestType(request.getRequestType())
.requestData(request.getRequestData())
.timestamp(System.currentTimeMillis())
.priority(request.getPriority())
.build();

// 4. 根据优先级选择分区
int partition = calculatePartition(request.getPriority());

// 5. 发送消息
ListenableFuture<SendResult<String, String>> future = peakKafkaTemplate.send(
PEAK_TRAFFIC_TOPIC,
partition,
requestId,
JSON.toJSONString(message)
);

// 6. 添加回调处理
future.addCallback(
result -> log.info("高峰请求发送成功: requestId={}", requestId),
failure -> log.error("高峰请求发送失败: requestId={}", requestId, failure)
);

// 7. 缓存请求状态
cacheRequestStatus(requestId, "PROCESSING");

return PeakHandlingResult.success(requestId, "请求已提交处理");

} catch (Exception e) {
log.error("处理高峰请求失败: userId={}", request.getUserId(), e);
return PeakHandlingResult.error("处理失败: " + e.getMessage());
}
}

/**
* 限流检查
* @param userId 用户ID
* @return 是否通过限流
*/
private boolean checkRateLimit(String userId) {
String rateLimitKey = RATE_LIMIT_PREFIX + userId;

try {
// 使用滑动窗口限流
String script =
"local key = KEYS[1] " +
"local window = tonumber(ARGV[1]) " +
"local limit = tonumber(ARGV[2]) " +
"local now = tonumber(ARGV[3]) " +
"redis.call('ZREMRANGEBYSCORE', key, 0, now - window) " +
"local count = redis.call('ZCARD', key) " +
"if count < limit then " +
" redis.call('ZADD', key, now, now) " +
" redis.call('EXPIRE', key, window) " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript,
Collections.singletonList(rateLimitKey),
String.valueOf(60000), // 1分钟窗口
String.valueOf(100), // 限制100次
String.valueOf(System.currentTimeMillis())
);

return result != null && result == 1;

} catch (Exception e) {
log.error("限流检查失败: userId={}", userId, e);
return true; // 限流失败时允许通过
}
}

/**
* 计算分区
* @param priority 优先级
* @return 分区号
*/
private int calculatePartition(int priority) {
// 根据优先级分配分区
if (priority >= 8) {
return 0; // 高优先级分区
} else if (priority >= 5) {
return 1; // 中优先级分区
} else {
return 2; // 低优先级分区
}
}

/**
* 生成请求ID
* @return 请求ID
*/
private String generateRequestId() {
return "req_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);
}

/**
* 缓存请求状态
* @param requestId 请求ID
* @param status 状态
*/
private void cacheRequestStatus(String requestId, String status) {
try {
String statusKey = "request:status:" + requestId;
RequestStatus requestStatus = new RequestStatus(requestId, status, System.currentTimeMillis());
redisTemplate.opsForValue().set(statusKey, requestStatus, Duration.ofHours(1));
} catch (Exception e) {
log.error("缓存请求状态失败: requestId={}", requestId, e);
}
}
}

/**
* 高峰请求实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PeakRequest {
private String userId;
private String requestType;
private String requestData;
private int priority;
private String requestId;
}

/**
* 高峰消息实体
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PeakMessage {
private String requestId;
private String userId;
private String requestType;
private String requestData;
private long timestamp;
private int priority;
}

/**
* 高峰处理结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PeakHandlingResult {
private boolean success;
private String requestId;
private String message;

public static PeakHandlingResult success(String requestId, String message) {
return new PeakHandlingResult(true, requestId, message);
}

public static PeakHandlingResult error(String message) {
return new PeakHandlingResult(false, null, message);
}

public static PeakHandlingResult rateLimited(String message) {
return new PeakHandlingResult(false, null, message);
}
}

/**
* 请求状态
*/
@Data
@AllArgsConstructor
public class RequestStatus {
private String requestId;
private String status;
private long timestamp;
}

2.3 高峰流量消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/**
* 高峰流量消费者
*/
@Component
public class PeakTrafficConsumer {

private static final Logger log = LoggerFactory.getLogger(PeakTrafficConsumer.class);

@Autowired
private PeakRequestProcessor peakRequestProcessor;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 消费高峰流量消息
*/
@KafkaListener(topics = "peak-traffic", groupId = "peak-handling-group")
public void handlePeakTraffic(ConsumerRecord<String, String> record) {
try {
// 1. 解析消息
PeakMessage message = JSON.parseObject(record.value(), PeakMessage.class);

log.info("收到高峰流量消息: requestId={}, userId={}, priority={}",
message.getRequestId(), message.getUserId(), message.getPriority());

// 2. 处理请求
PeakProcessResult result = peakRequestProcessor.processRequest(message);

// 3. 更新请求状态
updateRequestStatus(message.getRequestId(), result);

// 4. 手动提交offset
// 这里需要根据实际使用的Kafka客户端版本进行配置

} catch (Exception e) {
log.error("处理高峰流量消息失败: {}", record.value(), e);
}
}

/**
* 更新请求状态
* @param requestId 请求ID
* @param result 处理结果
*/
private void updateRequestStatus(String requestId, PeakProcessResult result) {
try {
String statusKey = "request:status:" + requestId;
String status = result.isSuccess() ? "COMPLETED" : "FAILED";

RequestStatus requestStatus = new RequestStatus(requestId, status, System.currentTimeMillis());
redisTemplate.opsForValue().set(statusKey, requestStatus, Duration.ofHours(1));

} catch (Exception e) {
log.error("更新请求状态失败: requestId={}", requestId, e);
}
}
}

/**
* 高峰请求处理器
*/
@Service
public class PeakRequestProcessor {

private static final Logger log = LoggerFactory.getLogger(PeakRequestProcessor.class);

@Autowired
private BusinessService businessService;

/**
* 处理请求
* @param message 消息
* @return 处理结果
*/
public PeakProcessResult processRequest(PeakMessage message) {
try {
log.info("开始处理高峰请求: requestId={}, type={}",
message.getRequestId(), message.getRequestType());

// 根据请求类型处理
switch (message.getRequestType()) {
case "ORDER":
return processOrderRequest(message);
case "PAYMENT":
return processPaymentRequest(message);
case "NOTIFICATION":
return processNotificationRequest(message);
default:
return PeakProcessResult.error("未知请求类型: " + message.getRequestType());
}

} catch (Exception e) {
log.error("处理高峰请求失败: requestId={}", message.getRequestId(), e);
return PeakProcessResult.error("处理失败: " + e.getMessage());
}
}

/**
* 处理订单请求
*/
private PeakProcessResult processOrderRequest(PeakMessage message) {
try {
// 调用业务服务处理订单
OrderResult result = businessService.processOrder(message.getRequestData());

if (result.isSuccess()) {
return PeakProcessResult.success("订单处理成功");
} else {
return PeakProcessResult.error("订单处理失败: " + result.getMessage());
}

} catch (Exception e) {
log.error("处理订单请求失败: requestId={}", message.getRequestId(), e);
return PeakProcessResult.error("订单处理异常: " + e.getMessage());
}
}

/**
* 处理支付请求
*/
private PeakProcessResult processPaymentRequest(PeakMessage message) {
try {
// 调用业务服务处理支付
PaymentResult result = businessService.processPayment(message.getRequestData());

if (result.isSuccess()) {
return PeakProcessResult.success("支付处理成功");
} else {
return PeakProcessResult.error("支付处理失败: " + result.getMessage());
}

} catch (Exception e) {
log.error("处理支付请求失败: requestId={}", message.getRequestId(), e);
return PeakProcessResult.error("支付处理异常: " + e.getMessage());
}
}

/**
* 处理通知请求
*/
private PeakProcessResult processNotificationRequest(PeakMessage message) {
try {
// 调用业务服务发送通知
NotificationResult result = businessService.sendNotification(message.getRequestData());

if (result.isSuccess()) {
return PeakProcessResult.success("通知发送成功");
} else {
return PeakProcessResult.error("通知发送失败: " + result.getMessage());
}

} catch (Exception e) {
log.error("处理通知请求失败: requestId={}", message.getRequestId(), e);
return PeakProcessResult.error("通知发送异常: " + e.getMessage());
}
}
}

/**
* 高峰处理结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PeakProcessResult {
private boolean success;
private String message;

public static PeakProcessResult success(String message) {
return new PeakProcessResult(true, message);
}

public static PeakProcessResult error(String message) {
return new PeakProcessResult(false, message);
}
}

3. Redisson DelayQueue延时任务

3.1 Redisson配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Redisson配置
*/
@Configuration
public class RedissonConfig {

@Value("${redis.host:localhost}")
private String redisHost;

@Value("${redis.port:6379}")
private int redisPort;

@Value("${redis.password:}")
private String redisPassword;

@Value("${redis.database:0}")
private int redisDatabase;

/**
* Redisson客户端配置
*/
@Bean
public RedissonClient redissonClient() {
Config config = new Config();

// 单节点配置
String address = "redis://" + redisHost + ":" + redisPort;
config.useSingleServer()
.setAddress(address)
.setPassword(StringUtils.hasText(redisPassword) ? redisPassword : null)
.setDatabase(redisDatabase)
.setConnectionPoolSize(64) // 连接池大小
.setConnectionMinimumIdleSize(10) // 最小空闲连接
.setIdleConnectionTimeout(10000) // 空闲连接超时
.setConnectTimeout(10000) // 连接超时
.setTimeout(3000) // 命令超时
.setRetryAttempts(3) // 重试次数
.setRetryInterval(1500); // 重试间隔

// 序列化配置
config.setCodec(new JsonJacksonCodec());

return Redisson.create(config);
}

/**
* 延时队列配置
*/
@Bean
public RDelayedQueue<String> delayedQueue(RedissonClient redissonClient) {
RQueue<String> queue = redissonClient.getQueue("delay-queue");
return redissonClient.getDelayedQueue(queue);
}
}

3.2 延时任务管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/**
* 延时任务管理服务
*/
@Service
public class DelayTaskManagementService {

@Autowired
private RedissonClient redissonClient;

@Autowired
private DelayTaskProcessor delayTaskProcessor;

private static final String DELAY_QUEUE_NAME = "delay-queue";
private static final String TASK_PREFIX = "delay:task:";

/**
* 添加延时任务
* @param task 任务
* @param delayTime 延时时间(毫秒)
* @return 任务ID
*/
public String addDelayTask(DelayTask task, long delayTime) {
try {
// 生成任务ID
String taskId = generateTaskId();
task.setTaskId(taskId);
task.setCreateTime(System.currentTimeMillis());
task.setExecuteTime(System.currentTimeMillis() + delayTime);

// 添加到延时队列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(
redissonClient.getQueue(DELAY_QUEUE_NAME)
);

delayedQueue.offer(JSON.toJSONString(task), delayTime, TimeUnit.MILLISECONDS);

// 缓存任务信息
cacheTaskInfo(task);

log.info("添加延时任务成功: taskId={}, delayTime={}ms, executeTime={}",
taskId, delayTime, new Date(task.getExecuteTime()));

return taskId;

} catch (Exception e) {
log.error("添加延时任务失败: task={}", task, e);
throw new RuntimeException("添加延时任务失败", e);
}
}

/**
* 添加延时任务(指定执行时间)
* @param task 任务
* @param executeTime 执行时间
* @return 任务ID
*/
public String addDelayTaskAt(DelayTask task, LocalDateTime executeTime) {
long delayTime = ChronoUnit.MILLIS.between(LocalDateTime.now(), executeTime);

if (delayTime <= 0) {
throw new IllegalArgumentException("执行时间不能早于当前时间");
}

return addDelayTask(task, delayTime);
}

/**
* 取消延时任务
* @param taskId 任务ID
* @return 是否取消成功
*/
public boolean cancelDelayTask(String taskId) {
try {
// 从缓存中获取任务信息
DelayTask task = getTaskInfo(taskId);
if (task == null) {
log.warn("任务不存在: taskId={}", taskId);
return false;
}

// 更新任务状态
task.setStatus("CANCELLED");
task.setCancelTime(System.currentTimeMillis());

// 更新缓存
cacheTaskInfo(task);

log.info("取消延时任务成功: taskId={}", taskId);
return true;

} catch (Exception e) {
log.error("取消延时任务失败: taskId={}", taskId, e);
return false;
}
}

/**
* 获取任务信息
* @param taskId 任务ID
* @return 任务信息
*/
public DelayTask getTaskInfo(String taskId) {
try {
String taskKey = TASK_PREFIX + taskId;
RBucket<String> bucket = redissonClient.getBucket(taskKey);
String taskJson = bucket.get();

if (taskJson != null) {
return JSON.parseObject(taskJson, DelayTask.class);
}

} catch (Exception e) {
log.error("获取任务信息失败: taskId={}", taskId, e);
}

return null;
}

/**
* 获取待执行任务列表
* @return 任务列表
*/
public List<DelayTask> getPendingTasks() {
List<DelayTask> pendingTasks = new ArrayList<>();

try {
// 获取所有任务键
RKeys keys = redissonClient.getKeys();
Iterable<String> taskKeys = keys.getKeysByPattern(TASK_PREFIX + "*");

for (String taskKey : taskKeys) {
RBucket<String> bucket = redissonClient.getBucket(taskKey);
String taskJson = bucket.get();

if (taskJson != null) {
DelayTask task = JSON.parseObject(taskJson, DelayTask.class);
if ("PENDING".equals(task.getStatus())) {
pendingTasks.add(task);
}
}
}

} catch (Exception e) {
log.error("获取待执行任务列表失败", e);
}

return pendingTasks;
}

/**
* 生成任务ID
* @return 任务ID
*/
private String generateTaskId() {
return "task_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);
}

/**
* 缓存任务信息
* @param task 任务
*/
private void cacheTaskInfo(DelayTask task) {
try {
String taskKey = TASK_PREFIX + task.getTaskId();
RBucket<String> bucket = redissonClient.getBucket(taskKey);
bucket.set(JSON.toJSONString(task), Duration.ofDays(7)); // 缓存7天

} catch (Exception e) {
log.error("缓存任务信息失败: taskId={}", task.getTaskId(), e);
}
}
}

/**
* 延时任务实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {
private String taskId;
private String taskType;
private String taskData;
private String status;
private long createTime;
private long executeTime;
private long cancelTime;
private int retryCount;
private String errorMessage;
}

3.3 延时任务处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/**
* 延时任务处理器
*/
@Component
public class DelayTaskProcessor {

private static final Logger log = LoggerFactory.getLogger(DelayTaskProcessor.class);

@Autowired
private DelayTaskManagementService delayTaskManagementService;

@Autowired
private BusinessService businessService;

/**
* 处理延时任务
* @param taskJson 任务JSON
*/
public void processDelayTask(String taskJson) {
try {
// 解析任务
DelayTask task = JSON.parseObject(taskJson, DelayTask.class);

log.info("开始处理延时任务: taskId={}, type={}", task.getTaskId(), task.getTaskType());

// 检查任务状态
if (!"PENDING".equals(task.getStatus())) {
log.warn("任务状态不是待执行: taskId={}, status={}", task.getTaskId(), task.getStatus());
return;
}

// 更新任务状态
updateTaskStatus(task.getTaskId(), "PROCESSING");

// 根据任务类型处理
boolean success = processTaskByType(task);

if (success) {
updateTaskStatus(task.getTaskId(), "COMPLETED");
log.info("延时任务处理完成: taskId={}", task.getTaskId());
} else {
handleTaskFailure(task);
}

} catch (Exception e) {
log.error("处理延时任务失败: taskJson={}", taskJson, e);
}
}

/**
* 根据任务类型处理
* @param task 任务
* @return 是否处理成功
*/
private boolean processTaskByType(DelayTask task) {
try {
switch (task.getTaskType()) {
case "ORDER_TIMEOUT":
return processOrderTimeoutTask(task);
case "PAYMENT_TIMEOUT":
return processPaymentTimeoutTask(task);
case "NOTIFICATION_DELAY":
return processNotificationDelayTask(task);
case "DATA_CLEANUP":
return processDataCleanupTask(task);
case "REPORT_GENERATION":
return processReportGenerationTask(task);
default:
log.warn("未知任务类型: taskId={}, type={}", task.getTaskId(), task.getTaskType());
return false;
}
} catch (Exception e) {
log.error("处理任务失败: taskId={}, type={}", task.getTaskId(), task.getTaskType(), e);
return false;
}
}

/**
* 处理订单超时任务
*/
private boolean processOrderTimeoutTask(DelayTask task) {
try {
OrderTimeoutData data = JSON.parseObject(task.getTaskData(), OrderTimeoutData.class);

// 检查订单状态
OrderStatus status = businessService.getOrderStatus(data.getOrderId());

if (status == OrderStatus.PENDING) {
// 取消订单
businessService.cancelOrder(data.getOrderId(), "订单超时自动取消");
log.info("订单超时取消成功: orderId={}", data.getOrderId());
return true;
} else {
log.info("订单状态已变更,无需处理: orderId={}, status={}", data.getOrderId(), status);
return true;
}

} catch (Exception e) {
log.error("处理订单超时任务失败: taskId={}", task.getTaskId(), e);
return false;
}
}

/**
* 处理支付超时任务
*/
private boolean processPaymentTimeoutTask(DelayTask task) {
try {
PaymentTimeoutData data = JSON.parseObject(task.getTaskData(), PaymentTimeoutData.class);

// 检查支付状态
PaymentStatus status = businessService.getPaymentStatus(data.getPaymentId());

if (status == PaymentStatus.PENDING) {
// 取消支付
businessService.cancelPayment(data.getPaymentId(), "支付超时自动取消");
log.info("支付超时取消成功: paymentId={}", data.getPaymentId());
return true;
} else {
log.info("支付状态已变更,无需处理: paymentId={}, status={}", data.getPaymentId(), status);
return true;
}

} catch (Exception e) {
log.error("处理支付超时任务失败: taskId={}", task.getTaskId(), e);
return false;
}
}

/**
* 处理延时通知任务
*/
private boolean processNotificationDelayTask(DelayTask task) {
try {
NotificationData data = JSON.parseObject(task.getTaskData(), NotificationData.class);

// 发送通知
NotificationResult result = businessService.sendNotification(data);

if (result.isSuccess()) {
log.info("延时通知发送成功: taskId={}", task.getTaskId());
return true;
} else {
log.error("延时通知发送失败: taskId={}, error={}", task.getTaskId(), result.getMessage());
return false;
}

} catch (Exception e) {
log.error("处理延时通知任务失败: taskId={}", task.getTaskId(), e);
return false;
}
}

/**
* 处理数据清理任务
*/
private boolean processDataCleanupTask(DelayTask task) {
try {
DataCleanupData data = JSON.parseObject(task.getTaskData(), DataCleanupData.class);

// 执行数据清理
CleanupResult result = businessService.cleanupData(data);

if (result.isSuccess()) {
log.info("数据清理完成: taskId={}, cleanedCount={}", task.getTaskId(), result.getCleanedCount());
return true;
} else {
log.error("数据清理失败: taskId={}, error={}", task.getTaskId(), result.getMessage());
return false;
}

} catch (Exception e) {
log.error("处理数据清理任务失败: taskId={}", task.getTaskId(), e);
return false;
}
}

/**
* 处理报表生成任务
*/
private boolean processReportGenerationTask(DelayTask task) {
try {
ReportData data = JSON.parseObject(task.getTaskData(), ReportData.class);

// 生成报表
ReportResult result = businessService.generateReport(data);

if (result.isSuccess()) {
log.info("报表生成完成: taskId={}, reportId={}", task.getTaskId(), result.getReportId());
return true;
} else {
log.error("报表生成失败: taskId={}, error={}", task.getTaskId(), result.getMessage());
return false;
}

} catch (Exception e) {
log.error("处理报表生成任务失败: taskId={}", task.getTaskId(), e);
return false;
}
}

/**
* 处理任务失败
* @param task 任务
*/
private void handleTaskFailure(DelayTask task) {
try {
// 增加重试次数
task.setRetryCount(task.getRetryCount() + 1);

if (task.getRetryCount() < 3) {
// 重试任务
long retryDelay = calculateRetryDelay(task.getRetryCount());
delayTaskManagementService.addDelayTask(task, retryDelay);

log.info("任务重试: taskId={}, retryCount={}, retryDelay={}ms",
task.getTaskId(), task.getRetryCount(), retryDelay);
} else {
// 标记为失败
updateTaskStatus(task.getTaskId(), "FAILED");
log.error("任务重试次数超限: taskId={}", task.getTaskId());
}

} catch (Exception e) {
log.error("处理任务失败异常: taskId={}", task.getTaskId(), e);
}
}

/**
* 计算重试延迟
* @param retryCount 重试次数
* @return 延迟时间(毫秒)
*/
private long calculateRetryDelay(int retryCount) {
// 指数退避策略
return (long) Math.pow(2, retryCount) * 60000; // 1分钟、2分钟、4分钟
}

/**
* 更新任务状态
* @param taskId 任务ID
* @param status 状态
*/
private void updateTaskStatus(String taskId, String status) {
try {
DelayTask task = delayTaskManagementService.getTaskInfo(taskId);
if (task != null) {
task.setStatus(status);
// 这里需要更新缓存,简化处理
}
} catch (Exception e) {
log.error("更新任务状态失败: taskId={}, status={}", taskId, status, e);
}
}
}

4. 任务调度和监控

4.1 延时任务调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 延时任务调度器
*/
@Component
public class DelayTaskScheduler {

private static final Logger log = LoggerFactory.getLogger(DelayTaskScheduler.class);

@Autowired
private RedissonClient redissonClient;

@Autowired
private DelayTaskProcessor delayTaskProcessor;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

@PostConstruct
public void startScheduler() {
// 启动延时任务调度
scheduler.scheduleWithFixedDelay(this::processDelayQueue, 0, 1, TimeUnit.SECONDS);

log.info("延时任务调度器启动成功");
}

/**
* 处理延时队列
*/
private void processDelayQueue() {
try {
RQueue<String> queue = redissonClient.getQueue("delay-queue");

// 批量处理任务
for (int i = 0; i < 100; i++) {
String taskJson = queue.poll();
if (taskJson == null) {
break;
}

// 异步处理任务
CompletableFuture.runAsync(() -> {
try {
delayTaskProcessor.processDelayTask(taskJson);
} catch (Exception e) {
log.error("处理延时任务异常: taskJson={}", taskJson, e);
}
});
}

} catch (Exception e) {
log.error("处理延时队列异常", e);
}
}

@PreDestroy
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

4.2 任务监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/**
* 任务监控服务
*/
@Service
public class TaskMonitoringService {

@Autowired
private DelayTaskManagementService delayTaskManagementService;

@Autowired
private RedissonClient redissonClient;

private final MeterRegistry meterRegistry;
private final Counter delayTaskCounter;
private final Timer delayTaskTimer;

public TaskMonitoringService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.delayTaskCounter = Counter.builder("delay.task.total")
.description("延时任务总数")
.register(meterRegistry);
this.delayTaskTimer = Timer.builder("delay.task.duration")
.description("延时任务执行时间")
.register(meterRegistry);
}

/**
* 获取任务统计信息
* @return 统计信息
*/
public TaskStatistics getTaskStatistics() {
TaskStatistics statistics = new TaskStatistics();

try {
// 获取队列长度
RQueue<String> queue = redissonClient.getQueue("delay-queue");
statistics.setQueueSize(queue.size());

// 获取待执行任务数
List<DelayTask> pendingTasks = delayTaskManagementService.getPendingTasks();
statistics.setPendingTaskCount(pendingTasks.size());

// 获取任务统计
statistics.setTotalTasks(delayTaskCounter.count());
statistics.setAverageExecutionTime(delayTaskTimer.mean(TimeUnit.MILLISECONDS));

} catch (Exception e) {
log.error("获取任务统计信息失败", e);
}

return statistics;
}

/**
* 检查任务健康状态
* @return 健康状态报告
*/
public TaskHealthReport checkTaskHealth() {
TaskHealthReport report = new TaskHealthReport();

try {
// 检查队列健康状态
RQueue<String> queue = redissonClient.getQueue("delay-queue");
int queueSize = queue.size();

if (queueSize > 10000) {
report.setQueueHealthy(false);
report.addWarning("队列积压严重: " + queueSize);
} else {
report.setQueueHealthy(true);
}

// 检查任务执行成功率
double successRate = calculateTaskSuccessRate();
report.setTaskSuccessRate(successRate);

if (successRate < 0.9) {
report.addWarning("任务执行成功率过低: " + successRate);
}

// 计算整体健康状态
boolean overallHealthy = report.isQueueHealthy() && successRate > 0.9;
report.setOverallHealthy(overallHealthy);

} catch (Exception e) {
log.error("检查任务健康状态失败", e);
report.setOverallHealthy(false);
report.addError("健康检查失败: " + e.getMessage());
}

return report;
}

/**
* 计算任务执行成功率
* @return 成功率
*/
private double calculateTaskSuccessRate() {
// 这里可以实现更复杂的成功率计算逻辑
return 0.95; // 示例值
}
}

/**
* 任务统计信息
*/
@Data
public class TaskStatistics {
private int queueSize;
private int pendingTaskCount;
private double totalTasks;
private double averageExecutionTime;
private LocalDateTime timestamp;

public TaskStatistics() {
this.timestamp = LocalDateTime.now();
}
}

/**
* 任务健康报告
*/
@Data
public class TaskHealthReport {
private boolean overallHealthy;
private boolean queueHealthy;
private double taskSuccessRate;
private List<String> warnings = new ArrayList<>();
private List<String> errors = new ArrayList<>();
private LocalDateTime checkTime;

public TaskHealthReport() {
this.checkTime = LocalDateTime.now();
}

public void addWarning(String warning) {
this.warnings.add(warning);
}

public void addError(String error) {
this.errors.add(error);
}
}

5. 实际应用示例

5.1 订单超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 订单超时处理服务
*/
@Service
public class OrderTimeoutService {

@Autowired
private DelayTaskManagementService delayTaskManagementService;

/**
* 创建订单超时任务
* @param orderId 订单ID
* @param timeoutMinutes 超时时间(分钟)
* @return 任务ID
*/
public String createOrderTimeoutTask(String orderId, int timeoutMinutes) {
try {
// 创建订单超时数据
OrderTimeoutData timeoutData = new OrderTimeoutData();
timeoutData.setOrderId(orderId);
timeoutData.setTimeoutMinutes(timeoutMinutes);

// 创建延时任务
DelayTask task = new DelayTask();
task.setTaskType("ORDER_TIMEOUT");
task.setTaskData(JSON.toJSONString(timeoutData));
task.setStatus("PENDING");

// 添加延时任务
long delayTime = timeoutMinutes * 60 * 1000L; // 转换为毫秒
String taskId = delayTaskManagementService.addDelayTask(task, delayTime);

log.info("创建订单超时任务成功: orderId={}, timeoutMinutes={}, taskId={}",
orderId, timeoutMinutes, taskId);

return taskId;

} catch (Exception e) {
log.error("创建订单超时任务失败: orderId={}, timeoutMinutes={}", orderId, timeoutMinutes, e);
throw new RuntimeException("创建订单超时任务失败", e);
}
}

/**
* 取消订单超时任务
* @param orderId 订单ID
* @return 是否取消成功
*/
public boolean cancelOrderTimeoutTask(String orderId) {
try {
// 查找相关的超时任务
List<DelayTask> pendingTasks = delayTaskManagementService.getPendingTasks();

for (DelayTask task : pendingTasks) {
if ("ORDER_TIMEOUT".equals(task.getTaskType())) {
OrderTimeoutData data = JSON.parseObject(task.getTaskData(), OrderTimeoutData.class);
if (orderId.equals(data.getOrderId())) {
return delayTaskManagementService.cancelDelayTask(task.getTaskId());
}
}
}

log.warn("未找到订单超时任务: orderId={}", orderId);
return false;

} catch (Exception e) {
log.error("取消订单超时任务失败: orderId={}", orderId, e);
return false;
}
}
}

/**
* 订单超时数据
*/
@Data
public class OrderTimeoutData {
private String orderId;
private int timeoutMinutes;
}

6. 总结

通过Kafka和Redisson DelayQueue的结合使用,我们成功构建了一个完整的高峰流量处理和延时任务管理系统。关键特性包括:

6.1 核心优势

  1. 高峰流量处理: Kafka有效承接高峰流量,提供缓冲和削峰能力
  2. 延时任务管理: Redisson DelayQueue提供可靠的延时任务处理
  3. 任务调度: 精确的任务时间控制和调度
  4. 监控告警: 完善的任务监控和健康检查
  5. 系统稳定性: 保证系统在高负载下的稳定运行

6.2 最佳实践

  1. 流量控制: 合理的限流策略和优先级处理
  2. 任务管理: 完善的任务状态跟踪和重试机制
  3. 异常处理: 完善的异常处理和错误恢复
  4. 监控告警: 实时监控任务状态和系统健康
  5. 资源管理: 合理的资源分配和性能优化

这套方案不仅能够有效处理高峰流量,还为延时任务提供了可靠的执行保障,是现代分布式系统的重要基础设施。