1. Kafka Message Queue Overview

Kafka is a distributed streaming platform known for high throughput, low latency, and horizontal scalability. It is widely used for big-data processing, real-time data pipelines, and event-driven architectures. This article walks through a complete solution covering Kafka configuration, producer and consumer implementations, message partitioning, cluster management, and monitoring with alerting.

1.1 Core Features

  1. Message production: producers publish records to Kafka
  2. Message consumption: consumers read records from Kafka
  3. Partitioning: message partitioning and load balancing
  4. Cluster management: deploying and administering Kafka clusters
  5. Monitoring and alerting: cluster health metrics and alerts

1.2 Technical Architecture

Producer       →  Kafka cluster  →  Consumer
    ↓                  ↓                ↓
Send message   →  Store message  →  Process message
    ↓                  ↓                ↓
Partitioning   →  Replication    →  Consumer groups
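Partitioning is what ties the three columns above together: records with the same key always land in the same partition, which preserves per-key ordering. Kafka's default partitioner hashes the key with murmur2; the sketch below illustrates only the idea, using Java's own `hashCode` to stay dependency-free (it is not Kafka's actual algorithm).

```java
// Simplified illustration of key-based partition selection.
// Kafka's DefaultPartitioner computes murmur2(keyBytes) % numPartitions;
// String.hashCode() stands in here so the sketch needs no Kafka jars.
public class PartitionSketch {

    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask off the sign bit so negative hash codes still map into range
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping is a pure function of the key, the same key deterministically selects the same partition for a fixed partition count; note that changing the partition count reshuffles keys, which is why topics are usually created with their target partition count up front.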

2. Kafka Configuration

2.1 Kafka Configuration Class

/**
 * Kafka configuration class.
 * An AdminClient/KafkaAdmin bean is assumed to be configured elsewhere.
 */
@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.group-id}")
    private String groupId;

    @Value("${kafka.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.session-timeout}")
    private int sessionTimeout;

    /**
     * Kafka configuration properties.
     */
    @Bean
    public KafkaProperties kafkaProperties() {
        return KafkaProperties.builder()
                .bootstrapServers(bootstrapServers)
                .groupId(groupId)
                .autoOffsetReset(autoOffsetReset)
                .enableAutoCommit(enableAutoCommit)
                .sessionTimeout(sessionTimeout)
                .build();
    }

    /**
     * Producer configuration.
     * Payloads are pre-serialized to JSON strings by the producer service,
     * so StringSerializer is used for values; JsonSerializer would
     * double-encode the already-serialized string.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration.
     * Listeners receive the raw JSON string and deserialize it themselves
     * with ObjectMapper, so StringDeserializer is used for values.
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    /**
     * Default (record-mode) listener container factory.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);

        return factory;
    }

    /**
     * Batch listener container factory. Batch mode must not be enabled on the
     * default factory above, because record-mode @KafkaListener methods that
     * take a single message would then fail to bind.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);

        return factory;
    }
}

/**
 * Kafka configuration properties.
 * Named after, but distinct from, Spring Boot's auto-configured
 * KafkaProperties; consider renaming it to avoid import confusion.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KafkaProperties {
    private String bootstrapServers;
    private String groupId;
    private String autoOffsetReset;
    private boolean enableAutoCommit;
    private int sessionTimeout;

    // Producer settings (@Builder.Default keeps the field initializers
    // when instances are created through the builder)
    @Builder.Default
    private int retries = 3;
    @Builder.Default
    private int batchSize = 16384;
    @Builder.Default
    private int lingerMs = 1;
    @Builder.Default
    private int bufferMemory = 33554432;

    // Consumer settings
    @Builder.Default
    private int maxPollRecords = 500;
    @Builder.Default
    private int maxPollIntervalMs = 300000;
    @Builder.Default
    private int concurrency = 3;
    @Builder.Default
    private int pollTimeout = 3000;
}

2.2 Application Configuration

# application.yml
kafka:
  bootstrap-servers: localhost:9092
  group-id: my-group
  auto-offset-reset: earliest
  enable-auto-commit: false
  session-timeout: 30000

# Message queue settings
message:
  queue:
    topics:
      - user-events
      - order-events
      - payment-events
    partitions: 3
    replication-factor: 1
    producer:
      retries: 3
      batch-size: 16384
      linger-ms: 1
    consumer:
      max-poll-records: 500
      max-poll-interval-ms: 300000
      concurrency: 3

3. Kafka Producer

3.1 Producer Service

/**
 * Kafka producer service.
 * Note: in spring-kafka 2.x, send() returns ListenableFuture;
 * spring-kafka 3.x returns CompletableFuture instead.
 */
@Slf4j
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate,
                                ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Send a message.
     * @param topic topic name
     * @param message payload
     * @return send result
     */
    public SendResult<String, Object> sendMessage(String topic, Object message) {
        try {
            // 1. Serialize the payload to JSON
            String messageJson = objectMapper.writeValueAsString(message);

            // 2. Send the record
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, messageJson);

            // 3. Block for the result with a bounded wait
            SendResult<String, Object> result = future.get(10, TimeUnit.SECONDS);

            log.info("Message sent: topic={}, partition={}, offset={}",
                    topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());

            return result;

        } catch (Exception e) {
            log.error("Failed to send message: topic={}, message={}", topic, message, e);
            throw new BusinessException("Failed to send message", e);
        }
    }

    /**
     * Send a keyed message.
     * @param topic topic name
     * @param key record key
     * @param message payload
     * @return send result
     */
    public SendResult<String, Object> sendMessage(String topic, String key, Object message) {
        try {
            // 1. Serialize the payload to JSON
            String messageJson = objectMapper.writeValueAsString(message);

            // 2. Send the record; the key determines the partition
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, key, messageJson);

            // 3. Block for the result
            SendResult<String, Object> result = future.get(10, TimeUnit.SECONDS);

            log.info("Keyed message sent: topic={}, key={}, partition={}, offset={}",
                    topic, key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());

            return result;

        } catch (Exception e) {
            log.error("Failed to send keyed message: topic={}, key={}, message={}", topic, key, message, e);
            throw new BusinessException("Failed to send keyed message", e);
        }
    }

    /**
     * Send a message to a specific partition.
     * @param topic topic name
     * @param partition target partition
     * @param key record key
     * @param message payload
     * @return send result
     */
    public SendResult<String, Object> sendMessage(String topic, int partition, String key, Object message) {
        try {
            // 1. Serialize the payload to JSON
            String messageJson = objectMapper.writeValueAsString(message);

            // 2. Send the record to the given partition
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, messageJson);

            // 3. Block for the result
            SendResult<String, Object> result = future.get(10, TimeUnit.SECONDS);

            log.info("Partitioned message sent: topic={}, partition={}, key={}, offset={}",
                    topic, partition, key, result.getRecordMetadata().offset());

            return result;

        } catch (Exception e) {
            log.error("Failed to send partitioned message: topic={}, partition={}, key={}, message={}",
                    topic, partition, key, message, e);
            throw new BusinessException("Failed to send partitioned message", e);
        }
    }

    /**
     * Send a message asynchronously.
     * @param topic topic name
     * @param message payload
     * @param callback completion callback
     */
    public void sendMessageAsync(String topic, Object message,
                                 ListenableFutureCallback<SendResult<String, Object>> callback) {
        try {
            // 1. Serialize the payload to JSON
            String messageJson = objectMapper.writeValueAsString(message);

            // 2. Send without blocking
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, messageJson);

            // 3. Attach the callback
            future.addCallback(callback);

            log.debug("Message submitted asynchronously: topic={}", topic);

        } catch (Exception e) {
            log.error("Async send failed: topic={}, message={}", topic, message, e);
            if (callback != null) {
                callback.onFailure(new RuntimeException("Async send failed", e));
            }
        }
    }

    /**
     * Send messages in bulk.
     * @param topic topic name
     * @param messages payload list
     * @return list of send results (failed sends are skipped)
     */
    public List<SendResult<String, Object>> sendBatchMessages(String topic, List<Object> messages) {
        try {
            List<SendResult<String, Object>> results = new ArrayList<>();

            for (Object message : messages) {
                try {
                    SendResult<String, Object> result = sendMessage(topic, message);
                    results.add(result);
                } catch (Exception e) {
                    log.error("Bulk send failed for one message: topic={}, message={}", topic, message, e);
                    // Keep sending the remaining messages
                }
            }

            log.info("Bulk send finished: topic={}, total={}, success={}",
                    topic, messages.size(), results.size());

            return results;

        } catch (Exception e) {
            log.error("Bulk send failed: topic={}", topic, e);
            throw new BusinessException("Bulk send failed", e);
        }
    }

    /**
     * Send a transactional message.
     * Requires a transaction-enabled producer factory (set a
     * transactionIdPrefix on DefaultKafkaProducerFactory) and a
     * KafkaTransactionManager; @Transactional alone is not sufficient.
     * @param topic topic name
     * @param message payload
     * @return send result
     */
    @Transactional
    public SendResult<String, Object> sendTransactionalMessage(String topic, Object message) {
        try {
            // 1. Serialize the payload to JSON
            String messageJson = objectMapper.writeValueAsString(message);

            // 2. Send inside the transaction
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, messageJson);

            // 3. Block for the result
            SendResult<String, Object> result = future.get(10, TimeUnit.SECONDS);

            log.info("Transactional message sent: topic={}, partition={}, offset={}",
                    topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());

            return result;

        } catch (Exception e) {
            log.error("Failed to send transactional message: topic={}, message={}", topic, message, e);
            throw new BusinessException("Failed to send transactional message", e);
        }
    }
}

/**
 * Generic message envelope.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage {
    private String id;
    private String type;
    private Object data;
    private LocalDateTime timestamp;
    private String source;
    private Map<String, Object> headers;
}

/**
 * User event message.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserEventMessage {
    private String userId;
    private String eventType;
    private String eventData;
    private LocalDateTime eventTime;
    private String source;
}

/**
 * Order event message.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEventMessage {
    private String orderId;
    private String userId;
    private String eventType;
    private String eventData;
    private LocalDateTime eventTime;
    private String source;
}
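The producer already relies on the Kafka client's internal retries (`RETRIES_CONFIG`), but application-level retries around a blocking send are a common complement. The sketch below is a generic retry wrapper with exponential backoff, of the kind one might put around `sendMessage()`; the delay values and the `Supplier`-based shape are illustrative, not part of the service above.

```java
import java.util.function.Supplier;

// Generic retry helper with exponential backoff. Delays are milliseconds;
// baseDelayMs and the 1-second cap are illustrative values.
public class RetrySketch {

    /** Exponential backoff: delay doubles per attempt (1-based), capped at maxDelayMs. */
    public static long backoffDelayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs << (attempt - 1);
        return Math.min(delay, maxDelayMs);
    }

    /** Run the action, retrying on RuntimeException up to maxAttempts times. */
    public static <T> T withRetries(Supplier<T> action, int maxAttempts, long baseDelayMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        // Back off before the next attempt, capped at one second
                        Thread.sleep(backoffDelayMs(attempt, baseDelayMs, 1000));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("retry interrupted", ie);
                    }
                }
            }
        }
        throw last;
    }
}
```

Note that retrying a send can reorder or duplicate records; enabling idempotence on the producer (`enable.idempotence=true`) mitigates duplicates from the client's own retries.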

4. Kafka Consumer

4.1 Consumer Service

/**
 * Kafka consumer service.
 */
@Slf4j
@Service
public class KafkaConsumerService {

    private final ObjectMapper objectMapper;

    public KafkaConsumerService(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Consume user events.
     * (KafkaHeaders.RECEIVED_PARTITION_ID was renamed RECEIVED_PARTITION in spring-kafka 3.x.)
     * @param message raw message
     * @param partition source partition
     * @param offset record offset
     */
    @KafkaListener(topics = "user-events", groupId = "user-events-group")
    public void consumeUserEvents(String message,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                  @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            // 1. Deserialize the message
            UserEventMessage userEvent = objectMapper.readValue(message, UserEventMessage.class);

            // 2. Process the user event
            processUserEvent(userEvent);

            log.info("User event consumed: partition={}, offset={}, userId={}, eventType={}",
                    partition, offset, userEvent.getUserId(), userEvent.getEventType());

        } catch (Exception e) {
            log.error("Failed to consume user event: partition={}, offset={}, message={}",
                    partition, offset, message, e);
            // Retry or dead-letter-queue handling could be added here
        }
    }

    /**
     * Consume order events.
     * @param message raw message
     * @param partition source partition
     * @param offset record offset
     */
    @KafkaListener(topics = "order-events", groupId = "order-events-group")
    public void consumeOrderEvents(String message,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                   @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            // 1. Deserialize the message
            OrderEventMessage orderEvent = objectMapper.readValue(message, OrderEventMessage.class);

            // 2. Process the order event
            processOrderEvent(orderEvent);

            log.info("Order event consumed: partition={}, offset={}, orderId={}, eventType={}",
                    partition, offset, orderEvent.getOrderId(), orderEvent.getEventType());

        } catch (Exception e) {
            log.error("Failed to consume order event: partition={}, offset={}, message={}",
                    partition, offset, message, e);
            // Retry or dead-letter-queue handling could be added here
        }
    }

    /**
     * Consume messages in batches. This listener needs a batch-enabled
     * container factory; with a record-mode factory the List parameter
     * would not bind.
     * @param messages batch of raw messages
     */
    @KafkaListener(topics = "batch-events", groupId = "batch-events-group",
            containerFactory = "batchKafkaListenerContainerFactory")
    public void consumeBatchMessages(List<String> messages) {
        try {
            log.info("Batch consumption started: count={}", messages.size());

            for (String message : messages) {
                try {
                    // 1. Deserialize the message
                    KafkaMessage kafkaMessage = objectMapper.readValue(message, KafkaMessage.class);

                    // 2. Process it
                    processMessage(kafkaMessage);

                } catch (Exception e) {
                    log.error("Batch consumption failed for one message: message={}", message, e);
                    // Keep processing the remaining messages
                }
            }

            log.info("Batch consumption finished: count={}", messages.size());

        } catch (Exception e) {
            log.error("Batch consumption failed", e);
        }
    }

    /**
     * Dispatch a user event to its handler.
     * @param userEvent user event
     */
    private void processUserEvent(UserEventMessage userEvent) {
        try {
            switch (userEvent.getEventType()) {
                case "USER_REGISTER":
                    handleUserRegister(userEvent);
                    break;
                case "USER_LOGIN":
                    handleUserLogin(userEvent);
                    break;
                case "USER_LOGOUT":
                    handleUserLogout(userEvent);
                    break;
                case "USER_UPDATE":
                    handleUserUpdate(userEvent);
                    break;
                default:
                    log.warn("Unknown user event type: {}", userEvent.getEventType());
            }
        } catch (Exception e) {
            log.error("Failed to process user event: userId={}, eventType={}",
                    userEvent.getUserId(), userEvent.getEventType(), e);
        }
    }

    /**
     * Dispatch an order event to its handler.
     * @param orderEvent order event
     */
    private void processOrderEvent(OrderEventMessage orderEvent) {
        try {
            switch (orderEvent.getEventType()) {
                case "ORDER_CREATED":
                    handleOrderCreated(orderEvent);
                    break;
                case "ORDER_PAID":
                    handleOrderPaid(orderEvent);
                    break;
                case "ORDER_SHIPPED":
                    handleOrderShipped(orderEvent);
                    break;
                case "ORDER_DELIVERED":
                    handleOrderDelivered(orderEvent);
                    break;
                case "ORDER_CANCELLED":
                    handleOrderCancelled(orderEvent);
                    break;
                default:
                    log.warn("Unknown order event type: {}", orderEvent.getEventType());
            }
        } catch (Exception e) {
            log.error("Failed to process order event: orderId={}, eventType={}",
                    orderEvent.getOrderId(), orderEvent.getEventType(), e);
        }
    }

    /**
     * Process a generic message.
     * @param kafkaMessage message envelope
     */
    private void processMessage(KafkaMessage kafkaMessage) {
        try {
            log.debug("Processing message: id={}, type={}", kafkaMessage.getId(), kafkaMessage.getType());

            // Concrete message handling goes here

        } catch (Exception e) {
            log.error("Failed to process message: id={}, type={}", kafkaMessage.getId(), kafkaMessage.getType(), e);
        }
    }

    /** Handle user-registration events. */
    private void handleUserRegister(UserEventMessage userEvent) {
        log.info("Handling user-registration event: userId={}", userEvent.getUserId());
        // Registration handling goes here
    }

    /** Handle user-login events. */
    private void handleUserLogin(UserEventMessage userEvent) {
        log.info("Handling user-login event: userId={}", userEvent.getUserId());
        // Login handling goes here
    }

    /** Handle user-logout events. */
    private void handleUserLogout(UserEventMessage userEvent) {
        log.info("Handling user-logout event: userId={}", userEvent.getUserId());
        // Logout handling goes here
    }

    /** Handle user-update events. */
    private void handleUserUpdate(UserEventMessage userEvent) {
        log.info("Handling user-update event: userId={}", userEvent.getUserId());
        // Update handling goes here
    }

    /** Handle order-created events. */
    private void handleOrderCreated(OrderEventMessage orderEvent) {
        log.info("Handling order-created event: orderId={}", orderEvent.getOrderId());
        // Order-created handling goes here
    }

    /** Handle order-paid events. */
    private void handleOrderPaid(OrderEventMessage orderEvent) {
        log.info("Handling order-paid event: orderId={}", orderEvent.getOrderId());
        // Order-paid handling goes here
    }

    /** Handle order-shipped events. */
    private void handleOrderShipped(OrderEventMessage orderEvent) {
        log.info("Handling order-shipped event: orderId={}", orderEvent.getOrderId());
        // Order-shipped handling goes here
    }

    /** Handle order-delivered events. */
    private void handleOrderDelivered(OrderEventMessage orderEvent) {
        log.info("Handling order-delivered event: orderId={}", orderEvent.getOrderId());
        // Order-delivered handling goes here
    }

    /** Handle order-cancelled events. */
    private void handleOrderCancelled(OrderEventMessage orderEvent) {
        log.info("Handling order-cancelled event: orderId={}", orderEvent.getOrderId());
        // Order-cancelled handling goes here
    }
}
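The switch-based dispatch in the consumer can also be expressed as a lookup table, which keeps handler registration in one place and makes the "unknown event type" path explicit. The sketch below uses plain strings for event names and payloads purely for illustration; in the real service the handlers would take the event objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Table-driven event dispatch, as an alternative to a switch statement.
// Event names match the listener above; the String payload is a stand-in.
public class EventDispatchSketch {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private String lastHandled = "";

    public EventDispatchSketch() {
        // Registering handlers in one place replaces the switch cases
        handlers.put("USER_REGISTER", data -> lastHandled = "register:" + data);
        handlers.put("USER_LOGIN", data -> lastHandled = "login:" + data);
    }

    /** Returns true when a handler was found for the event type. */
    public boolean dispatch(String eventType, String data) {
        Consumer<String> handler = handlers.get(eventType);
        if (handler == null) {
            return false; // unknown event type: log-and-skip in the real service
        }
        handler.accept(data);
        return true;
    }

    public String lastHandled() {
        return lastHandled;
    }
}
```

Adding a new event type then becomes a one-line registration rather than a new switch case.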

5. Kafka Cluster Management

5.1 Cluster Management Service

/**
 * Kafka cluster management service.
 */
@Slf4j
@Service
public class KafkaClusterService {

    private final AdminClient adminClient;
    private final KafkaProperties kafkaProperties;

    public KafkaClusterService(AdminClient adminClient, KafkaProperties kafkaProperties) {
        this.adminClient = adminClient;
        this.kafkaProperties = kafkaProperties;
    }

    /**
     * Create a topic.
     * @param topicName topic name
     * @param partitions partition count
     * @param replicationFactor replication factor
     * @return creation result
     */
    public CreateTopicResult createTopic(String topicName, int partitions, short replicationFactor) {
        try {
            // 1. Build the topic definition
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

            // 2. Create the topic
            CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));

            // 3. Wait for creation to complete
            result.all().get(30, TimeUnit.SECONDS);

            log.info("Topic created: topic={}, partitions={}, replicationFactor={}",
                    topicName, partitions, replicationFactor);

            return CreateTopicResult.success(topicName);

        } catch (Exception e) {
            log.error("Failed to create topic: topic={}, partitions={}, replicationFactor={}",
                    topicName, partitions, replicationFactor, e);
            return CreateTopicResult.error("Failed to create topic: " + e.getMessage());
        }
    }

    /**
     * Delete a topic.
     * @param topicName topic name
     * @return deletion result
     */
    public DeleteTopicResult deleteTopic(String topicName) {
        try {
            // 1. Delete the topic
            DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName));

            // 2. Wait for deletion to complete
            result.all().get(30, TimeUnit.SECONDS);

            log.info("Topic deleted: topic={}", topicName);

            return DeleteTopicResult.success(topicName);

        } catch (Exception e) {
            log.error("Failed to delete topic: topic={}", topicName, e);
            return DeleteTopicResult.error("Failed to delete topic: " + e.getMessage());
        }
    }

    /**
     * List topics.
     * @return topic list
     */
    public List<TopicInfo> getTopics() {
        try {
            // 1. Fetch the topic names
            ListTopicsResult result = adminClient.listTopics();
            Set<String> topicNames = result.names().get(10, TimeUnit.SECONDS);

            // 2. Fetch topic details
            DescribeTopicsResult describeResult = adminClient.describeTopics(topicNames);
            Map<String, TopicDescription> topicDescriptions = describeResult.all().get(10, TimeUnit.SECONDS);

            // 3. Build the topic info list
            List<TopicInfo> topics = new ArrayList<>();
            for (Map.Entry<String, TopicDescription> entry : topicDescriptions.entrySet()) {
                TopicInfo topicInfo = TopicInfo.builder()
                        .name(entry.getKey())
                        .partitions(entry.getValue().partitions().size())
                        .replicationFactor(entry.getValue().partitions().get(0).replicas().size())
                        .build();
                topics.add(topicInfo);
            }

            log.debug("Topics listed: count={}", topics.size());

            return topics;

        } catch (Exception e) {
            log.error("Failed to list topics", e);
            throw new BusinessException("Failed to list topics", e);
        }
    }

    /**
     * Get details for a single topic.
     * @param topicName topic name
     * @return topic details
     */
    public TopicDetailInfo getTopicDetail(String topicName) {
        try {
            // 1. Describe the topic
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName));
            TopicDescription topicDescription = result.all().get(10, TimeUnit.SECONDS).get(topicName);

            if (topicDescription == null) {
                throw new BusinessException("Topic does not exist");
            }

            // 2. Build the detail object
            TopicDetailInfo topicDetail = TopicDetailInfo.builder()
                    .name(topicDescription.name())
                    .partitions(topicDescription.partitions().size())
                    .replicationFactor(topicDescription.partitions().get(0).replicas().size())
                    .partitionDetails(topicDescription.partitions())
                    .build();

            log.debug("Topic details fetched: topic={}", topicName);

            return topicDetail;

        } catch (Exception e) {
            log.error("Failed to fetch topic details: topic={}", topicName, e);
            throw new BusinessException("Failed to fetch topic details", e);
        }
    }

    /**
     * Get cluster information.
     * @return cluster info
     */
    public ClusterInfo getClusterInfo() {
        try {
            // 1. DescribeClusterResult exposes clusterId/controller/nodes as
            // separate futures; there is no single cluster() accessor
            DescribeClusterResult result = adminClient.describeCluster();
            String clusterId = result.clusterId().get(10, TimeUnit.SECONDS);
            Node controller = result.controller().get(10, TimeUnit.SECONDS);
            Collection<Node> nodes = result.nodes().get(10, TimeUnit.SECONDS);

            // 2. The topic count comes from listTopics(); it is not part of
            // describeCluster()
            int topicCount = adminClient.listTopics().names().get(10, TimeUnit.SECONDS).size();

            // 3. Build the cluster info
            ClusterInfo clusterInfo = ClusterInfo.builder()
                    .clusterId(clusterId)
                    .controller(controller.toString())
                    .nodes(nodes.size())
                    .topics(topicCount)
                    .build();

            log.debug("Cluster info fetched: clusterId={}, nodes={}",
                    clusterInfo.getClusterId(), clusterInfo.getNodes());

            return clusterInfo;

        } catch (Exception e) {
            log.error("Failed to fetch cluster info", e);
            throw new BusinessException("Failed to fetch cluster info", e);
        }
    }

    /**
     * List consumer groups.
     * @return consumer group info list
     */
    public List<ConsumerGroupInfo> getConsumerGroups() {
        try {
            // 1. Fetch the consumer group listings
            ListConsumerGroupsResult result = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> consumerGroups = result.all().get(10, TimeUnit.SECONDS);

            // 2. Build the info list
            List<ConsumerGroupInfo> consumerGroupInfos = new ArrayList<>();
            for (ConsumerGroupListing group : consumerGroups) {
                ConsumerGroupInfo groupInfo = ConsumerGroupInfo.builder()
                        .groupId(group.groupId())
                        // state() is an Optional; it is only populated when the
                        // broker reports the group state
                        .state(group.state().map(Enum::toString).orElse("UNKNOWN"))
                        .build();
                consumerGroupInfos.add(groupInfo);
            }

            log.debug("Consumer groups listed: count={}", consumerGroupInfos.size());

            return consumerGroupInfos;

        } catch (Exception e) {
            log.error("Failed to list consumer groups", e);
            throw new BusinessException("Failed to list consumer groups", e);
        }
    }
}

/**
 * Topic creation result.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CreateTopicResult {
    private boolean success;
    private String topicName;
    private String message;

    public static CreateTopicResult success(String topicName) {
        return CreateTopicResult.builder()
                .success(true)
                .topicName(topicName)
                .build();
    }

    public static CreateTopicResult error(String message) {
        return CreateTopicResult.builder()
                .success(false)
                .message(message)
                .build();
    }
}

/**
 * Topic deletion result.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeleteTopicResult {
    private boolean success;
    private String topicName;
    private String message;

    public static DeleteTopicResult success(String topicName) {
        return DeleteTopicResult.builder()
                .success(true)
                .topicName(topicName)
                .build();
    }

    public static DeleteTopicResult error(String message) {
        return DeleteTopicResult.builder()
                .success(false)
                .message(message)
                .build();
    }
}

/**
 * Topic summary.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TopicInfo {
    private String name;
    private int partitions;
    private int replicationFactor;
}

/**
 * Topic details.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TopicDetailInfo {
    private String name;
    private int partitions;
    private int replicationFactor;
    private List<TopicPartitionInfo> partitionDetails;
}

/**
 * Cluster information.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClusterInfo {
    private String clusterId;
    private String controller;
    private int nodes;
    private int topics;
}

/**
 * Consumer group information.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumerGroupInfo {
    private String groupId;
    private String state;
}
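When a topic is created with a given partition count and replication factor, the brokers spread the replicas across the cluster. The sketch below illustrates the wrap-around placement with a plain round-robin scheme; Kafka's real assignment additionally staggers the starting broker per partition and is rack-aware, so treat this as a teaching model only.

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin replica placement sketch: for each partition, the first
// broker id is the leader and the rest are followers.
public class ReplicaAssignmentSketch {

    /** Returns, for each partition, the ordered list of broker ids hosting a replica. */
    public static List<List<Integer>> assign(int partitions, int brokers, int replicationFactor) {
        if (replicationFactor > brokers) {
            throw new IllegalArgumentException("replication factor cannot exceed broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % brokers); // wrap around the broker ring
            }
            assignment.add(replicas);
        }
        return assignment;
    }
}
```

For 3 partitions on 3 brokers with replication factor 2, this yields one leader per broker with followers shifted by one, which is why replication factor can never exceed the broker count.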

6. Kafka Monitoring

6.1 Monitoring Service

/**
 * Kafka monitoring service.
 */
@Slf4j
@Service
public class KafkaMonitorService {

    private final KafkaClusterService clusterService;

    // Micrometer gauges must be registered once and observe mutable state;
    // re-registering inside the scheduled methods would accumulate duplicate meters.
    private final AtomicInteger clusterNodes = new AtomicInteger();
    private final AtomicInteger clusterTopics = new AtomicInteger();
    private final AtomicInteger topicCount = new AtomicInteger();
    private final AtomicInteger consumerGroupCount = new AtomicInteger();

    public KafkaMonitorService(KafkaClusterService clusterService, MeterRegistry meterRegistry) {
        this.clusterService = clusterService;
        Gauge.builder("kafka.cluster.nodes", clusterNodes, AtomicInteger::get)
                .description("Number of nodes in the Kafka cluster")
                .register(meterRegistry);
        Gauge.builder("kafka.cluster.topics", clusterTopics, AtomicInteger::get)
                .description("Number of topics in the Kafka cluster")
                .register(meterRegistry);
        Gauge.builder("kafka.topics.count", topicCount, AtomicInteger::get)
                .description("Number of Kafka topics")
                .register(meterRegistry);
        Gauge.builder("kafka.consumer.groups.count", consumerGroupCount, AtomicInteger::get)
                .description("Number of Kafka consumer groups")
                .register(meterRegistry);
    }

    /**
     * Monitor cluster status.
     */
    @Scheduled(fixedRate = 60000) // once a minute
    public void monitorClusterStatus() {
        try {
            // 1. Fetch the cluster info
            ClusterInfo clusterInfo = clusterService.getClusterInfo();

            // 2. Update the gauge state
            clusterNodes.set(clusterInfo.getNodes());
            clusterTopics.set(clusterInfo.getTopics());

            log.debug("Cluster status refreshed: nodes={}, topics={}",
                    clusterInfo.getNodes(), clusterInfo.getTopics());

        } catch (Exception e) {
            log.error("Cluster status monitoring failed", e);
        }
    }

    /**
     * Monitor topic status.
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void monitorTopicStatus() {
        try {
            // 1. Fetch the topic list
            List<TopicInfo> topics = clusterService.getTopics();

            // 2. Update the gauge state
            topicCount.set(topics.size());

            log.debug("Topic status refreshed: count={}", topics.size());

        } catch (Exception e) {
            log.error("Topic status monitoring failed", e);
        }
    }

    /**
     * Monitor consumer-group status.
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void monitorConsumerGroupStatus() {
        try {
            // 1. Fetch the consumer groups
            List<ConsumerGroupInfo> consumerGroups = clusterService.getConsumerGroups();

            // 2. Update the gauge state
            consumerGroupCount.set(consumerGroups.size());

            log.debug("Consumer-group status refreshed: count={}", consumerGroups.size());

        } catch (Exception e) {
            log.error("Consumer-group status monitoring failed", e);
        }
    }

    /**
     * Collect current metrics.
     * @return metrics map
     */
    public Map<String, Object> getMonitorMetrics() {
        try {
            Map<String, Object> metrics = new HashMap<>();

            // 1. Cluster info
            metrics.put("cluster", clusterService.getClusterInfo());

            // 2. Topic info
            metrics.put("topics", clusterService.getTopics());

            // 3. Consumer-group info
            metrics.put("consumerGroups", clusterService.getConsumerGroups());

            return metrics;

        } catch (Exception e) {
            log.error("Failed to collect metrics", e);
            throw new BusinessException("Failed to collect metrics", e);
        }
    }
}
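The monitoring service records metrics but does not alert on them. A minimal alerting layer is typically a set of threshold checks over the collected values; the sketch below shows the idea with two illustrative thresholds (expected broker count and a consumer-lag limit), neither of which is part of the service above.

```java
import java.util.ArrayList;
import java.util.List;

// Threshold-based alert sketch: compare observed values against limits
// and return human-readable alert strings (empty list = all healthy).
public class AlertSketch {

    public static List<String> checkThresholds(int liveNodes, int expectedNodes,
                                               long consumerLag, long maxLag) {
        List<String> alerts = new ArrayList<>();
        if (liveNodes < expectedNodes) {
            alerts.add("cluster: only " + liveNodes + "/" + expectedNodes + " brokers are up");
        }
        if (consumerLag > maxLag) {
            alerts.add("consumer: lag " + consumerLag + " exceeds limit " + maxLag);
        }
        return alerts;
    }
}
```

In a real deployment these checks are usually delegated to Prometheus alerting rules over the exported gauges rather than hand-rolled, but the logic is the same.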

7. Kafka Controller

7.1 REST Controller

/**
 * Kafka REST controller.
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducerService producerService;
    private final KafkaClusterService clusterService;
    private final KafkaMonitorService monitorService;

    public KafkaController(KafkaProducerService producerService,
                           KafkaClusterService clusterService,
                           KafkaMonitorService monitorService) {
        this.producerService = producerService;
        this.clusterService = clusterService;
        this.monitorService = monitorService;
    }

    /**
     * Send a message.
     */
    @PostMapping("/send")
    public ResponseEntity<Map<String, Object>> sendMessage(
            @RequestParam String topic,
            @RequestBody Object message) {
        try {
            SendResult<String, Object> result = producerService.sendMessage(topic, message);

            Map<String, Object> response = new HashMap<>();
            response.put("success", true);
            response.put("partition", result.getRecordMetadata().partition());
            response.put("offset", result.getRecordMetadata().offset());
            response.put("message", "Message sent");

            return ResponseEntity.ok(response);

        } catch (Exception e) {
            log.error("Failed to send message: topic={}", topic, e);

            Map<String, Object> response = new HashMap<>();
            response.put("success", false);
            response.put("message", "Failed to send message: " + e.getMessage());

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
        }
    }

    /**
     * Create a topic.
     */
    @PostMapping("/topic/create")
    public ResponseEntity<Map<String, Object>> createTopic(
            @RequestParam String topicName,
            @RequestParam int partitions,
            @RequestParam short replicationFactor) {
        try {
            CreateTopicResult result = clusterService.createTopic(topicName, partitions, replicationFactor);

            Map<String, Object> response = new HashMap<>();
            response.put("success", result.isSuccess());
            response.put("topicName", result.getTopicName());
            response.put("message", result.getMessage());

            return ResponseEntity.ok(response);

        } catch (Exception e) {
            log.error("Failed to create topic: topicName={}", topicName, e);

            Map<String, Object> response = new HashMap<>();
            response.put("success", false);
            response.put("message", "Failed to create topic: " + e.getMessage());

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
        }
    }

    /**
     * List topics.
     */
    @GetMapping("/topics")
    public ResponseEntity<Map<String, Object>> getTopics() {
        try {
            List<TopicInfo> topics = clusterService.getTopics();

            Map<String, Object> response = new HashMap<>();
            response.put("success", true);
            response.put("topics", topics);

            return ResponseEntity.ok(response);

        } catch (Exception e) {
            log.error("Failed to list topics", e);

            Map<String, Object> response = new HashMap<>();
            response.put("success", false);
            response.put("message", "Failed to list topics: " + e.getMessage());

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
        }
    }

    /**
     * Get monitoring metrics.
     */
    @GetMapping("/metrics")
    public ResponseEntity<Map<String, Object>> getMetrics() {
        try {
            Map<String, Object> metrics = monitorService.getMonitorMetrics();

            Map<String, Object> response = new HashMap<>();
            response.put("success", true);
            response.put("metrics", metrics);

            return ResponseEntity.ok(response);

        } catch (Exception e) {
            log.error("Failed to fetch metrics", e);

            Map<String, Object> response = new HashMap<>();
            response.put("success", false);
            response.put("message", "Failed to fetch metrics: " + e.getMessage());

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
        }
    }
}

8. Summary

With the services above we have built a complete Kafka-based messaging system. Its key capabilities are:

8.1 Core Capabilities

  1. Message production: producers publish records to Kafka
  2. Message consumption: consumers read records from Kafka
  3. Partitioning: message partitioning and load balancing
  4. Cluster management: deploying and administering Kafka clusters
  5. Monitoring and alerting: cluster health metrics and alerts

8.2 Best Practices

  1. Partitioning strategy: choose keys and partition counts for balanced load
  2. Consumer groups: use consumer groups for parallel, coordinated consumption
  3. Serialization: keep message serialization and deserialization efficient and consistent on both sides
  4. Error handling: build in robust error handling with retries and dead-letter queues
  5. Monitoring and alerting: export comprehensive metrics and alert on them

This Kafka setup delivers high-throughput message processing and also covers cluster management and monitoring, making it a solid foundation for modern distributed systems.