1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package com.kafka.producer;
import com.kafka.model.RetryMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j @Service public class RetryProducer { @Autowired private KafkaTemplate<String, RetryMessage> kafkaTemplate;
public void sendRetryMessage(RetryMessage message, String topic, long delayMs) { try { log.info("发送重试消息: topic={}, messageId={}, businessId={}, delay={}ms", topic, message.getMessageId(), message.getBusinessId(), delayMs); if (delayMs > 0) { scheduleDelayedMessage(message, topic, delayMs); return; } ListenableFuture<SendResult<String, RetryMessage>> future = kafkaTemplate.send(topic, message.getBusinessId(), message); future.addCallback(new ListenableFutureCallback<SendResult<String, RetryMessage>>() { @Override public void onSuccess(SendResult<String, RetryMessage> result) { log.info("重试消息发送成功: messageId={}, topic={}, partition={}, offset={}", message.getMessageId(), result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { log.error("重试消息发送失败: messageId={}, error={}", message.getMessageId(), ex.getMessage(), ex); } }); } catch (Exception e) { log.error("发送重试消息异常: messageId={}", message.getMessageId(), e); } }
private void scheduleDelayedMessage(RetryMessage message, String topic, long delayMs) { new Thread(() -> { try { TimeUnit.MILLISECONDS.sleep(delayMs); kafkaTemplate.send(topic, message.getBusinessId(), message); log.info("延迟消息已发送: messageId={}, delay={}ms", message.getMessageId(), delayMs); } catch (Exception e) { log.error("延迟消息发送失败: messageId={}", message.getMessageId(), e); } }).start(); }
public void sendToDLQ(RetryMessage message, String dlqTopic) { try { log.error("发送到死信队列: messageId={}, businessId={}, errorCount={}", message.getMessageId(), message.getBusinessId(), message.getErrorCount()); kafkaTemplate.send(dlqTopic, message.getBusinessId(), message); } catch (Exception e) { log.error("发送到死信队列失败: messageId={}", message.getMessageId(), e); } } }
|