Kafka Blocking Queue Architecture in Practice: High Throughput, Zero Loss, Reliability Guarantees, and an Enterprise-Grade Message Queue Solution

1. Blocking Queue Overview

1.1 What Is a Blocking Queue

Blocking queue characteristics:
Blocking behavior (a minimal sketch follows this list):
- Consumers block when the queue is empty
- Producers block when the queue is full
- Automatic flow control and backpressure

Kafka as a blocking queue:
- High throughput: 100,000+ msg/s
- Zero loss: persistence and replication
- Ordering: messages are ordered within a partition
- Reliability: data is persisted to disk

Typical use cases:
- Asynchronous task queues
- Event-driven architectures
- Data pipelines
- Asynchronous log collection
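
Before turning to Kafka, it helps to see the blocking contract in isolation. The sketch below uses Java's standard LinkedBlockingQueue purely for illustration (the class name BlockingQueueDemo is hypothetical and not part of this article's code): put blocks when the bounded queue is full and take blocks when it is empty, which is exactly the flow-control behavior listed above.

// BlockingQueueDemo.java - illustrative only
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // Bounded queue with capacity 2: the producer blocks on the 3rd put
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put("task-" + i); // blocks while the queue is full (backpressure)
                    System.out.println("produced task-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    Thread.sleep(200);          // slow consumer
                    String task = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}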

1.2 Comparison with Traditional Queues

Queue comparison:
Redis Queue:
- Pros: simple, low latency
- Cons: memory-bound, weak durability guarantees for queue data

RabbitMQ:
- Pros: feature-rich, flexible routing
- Cons: lower throughput, can become a performance bottleneck

Kafka:
- Pros: high throughput, zero loss, horizontally scalable
- Cons: complex configuration, higher operational cost

Selection guidelines:
- High throughput + zero loss: Kafka
- Feature set first: RabbitMQ
- Simple scenarios: Redis

2. High-Throughput Design

2.1 High-Throughput Producer Configuration

// HighThroughputProducer.java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HighThroughputProducer {

    private final KafkaProducer<String, String> producer;
    private final ThreadPoolExecutor executor;

    public HighThroughputProducer() {
        // 1. Performance-oriented configuration
        Properties props = new Properties();

        // Basic settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // High-throughput settings
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // only the leader must acknowledge
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Batching settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32 KB per batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compress batches

        // Buffer settings
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB send buffer

        // Request size
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); // 10 MB

        // Connection settings
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);

        this.producer = new KafkaProducer<>(props);

        // 2. Thread pool for parallel submission
        this.executor = new ThreadPoolExecutor(
                20,  // core threads
                50,  // max threads
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy() // blocks the caller when the pool is saturated
        );
    }

    /**
     * Asynchronous send for maximum throughput.
     */
    public void sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // Fire-and-forget with an error callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            }
        });
    }

    /**
     * Batch send through the internal thread pool.
     */
    public void sendBatch(String topic, List<String> messages) {
        for (String message : messages) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);

            // CallerRunsPolicy applies backpressure when the pool is saturated
            executor.submit(() -> producer.send(record));
        }
    }

    /**
     * Parallel send using a parallel stream.
     */
    public void sendParallel(String topic, List<String> messages) {
        messages.parallelStream().forEach(message -> {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record);
        });
    }

    /**
     * Synchronous send. Note: with acks=1 this is still not a zero-loss
     * guarantee; see ZeroLossProducer in section 3.1.
     */
    public void sendSync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        try {
            // Block until the broker acknowledges the write
            producer.send(record).get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Send failed", e);
        }
    }

    public void flush() {
        producer.flush();
    }

    public void close() {
        producer.close();
        executor.shutdown();
    }
}
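
A brief usage sketch of the producer above. The class name, the "orders" topic, and the message payloads are illustrative assumptions, not part of the article's original code.

// ProducerUsageExample.java - illustrative only
package com.example.kafka;

import java.util.Arrays;

public class ProducerUsageExample {

    public static void main(String[] args) {
        HighThroughputProducer producer = new HighThroughputProducer();

        // Single asynchronous send
        producer.sendAsync("orders", "order-1001", "{\"amount\": 99.9}");

        // Batch send through the internal thread pool
        producer.sendBatch("orders", Arrays.asList("msg-1", "msg-2", "msg-3"));

        // Make sure buffered records are actually delivered before shutdown
        producer.flush();
        producer.close();
    }
}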

2.2 High-Throughput Consumer Configuration

// HighThroughputConsumer.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class HighThroughputConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executor;
    private final LinkedBlockingQueue<ConsumerRecord<String, String>> queue;

    public HighThroughputConsumer() {
        // 1. High-throughput configuration
        Properties props = new Properties();

        // Basic settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // High-throughput settings
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // fetch at least 1 KB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // wait at most 500 ms
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760); // 10 MB per partition
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // up to 1000 records per poll

        // Auto-commit (throughput first; see ZeroLossConsumer for manual commits)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // Session and heartbeat
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

        // Max interval between polls before the consumer is considered dead
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        this.consumer = new KafkaConsumer<>(props);

        // 2. Worker thread pool
        this.executor = Executors.newFixedThreadPool(20);

        // 3. Local buffer queue
        this.queue = new LinkedBlockingQueue<>(10000);
    }

    // Exposed for the KafkaBlockingQueue wrapper in section 4.1
    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    /**
     * Batch consumption.
     */
    public void consumeBatch(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            // Poll a batch of records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            if (records.isEmpty()) {
                continue;
            }

            // Process the batch
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record);
            }

            // Asynchronous commit (redundant while auto-commit is enabled, but harmless)
            consumer.commitAsync();
        }
    }

    /**
     * Parallel consumption.
     */
    public void consumeParallel(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            if (records.isEmpty()) {
                continue;
            }

            // Hand each record to the worker pool
            records.forEach(record -> executor.submit(() -> processMessage(record)));
        }
    }

    /**
     * Consumption with a local buffering queue.
     */
    public void consumeWithQueue(String topic) {
        // Poller thread: poll records into the local queue
        Thread producerThread = new Thread(() -> {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        queue.put(record); // blocks when the local queue is full (backpressure)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });

        // Worker thread: take records from the queue and process them
        Thread consumerThread = new Thread(() -> {
            while (true) {
                try {
                    ConsumerRecord<String, String> record = queue.take(); // blocks when empty
                    processMessage(record);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        producerThread.start();
        consumerThread.start();
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // Business logic
        System.out.println("Processing: " + record.value());
    }
}

2.3 Topic Configuration Tuning

#!/bin/bash
# create_optimized_topic.sh - create a throughput-optimized topic

echo "=== Creating high-throughput topic ==="

# Partition count: size according to target throughput
PARTITIONS=10

# Replication factor: for durability
REPLICATION_FACTOR=3

# Create the topic
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic high-throughput-topic \
  --partitions $PARTITIONS \
  --replication-factor $REPLICATION_FACTOR \
  --config min.insync.replicas=2 \
  --config cleanup.policy=delete \
  --config retention.ms=604800000 \
  --config segment.ms=3600000 \
  --config compression.type=snappy

echo "Topic created"

# Inspect the topic configuration
kafka-topics --describe \
  --bootstrap-server localhost:9092 \
  --topic high-throughput-topic
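
The same topic can also be created programmatically with Kafka's AdminClient. The sketch below mirrors the settings in the script above; the class name TopicCreator, its package, and the localhost:9092 bootstrap address are assumptions for illustration.

// TopicCreator.java - illustrative only
package com.example.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TopicCreator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Same settings as create_optimized_topic.sh
            Map<String, String> configs = new HashMap<>();
            configs.put("min.insync.replicas", "2");
            configs.put("cleanup.policy", "delete");
            configs.put("retention.ms", "604800000"); // 7 days
            configs.put("segment.ms", "3600000");     // 1 hour
            configs.put("compression.type", "snappy");

            NewTopic topic = new NewTopic("high-throughput-topic", 10, (short) 3)
                    .configs(configs);

            // Block until the broker confirms topic creation
            adminClient.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic created");
        }
    }
}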

3. Zero-Loss Guarantees

3.1 Zero-Loss Producer Configuration

// ZeroLossProducer.java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ZeroLossProducer {

    private final KafkaProducer<String, String> producer;

    public ZeroLossProducer() {
        Properties props = new Properties();

        // Basic settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Zero-loss settings
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // retry indefinitely
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // preserve ordering

        // Idempotence and transactions
        // Note: once transactional.id is set and initTransactions() is called,
        // every send must run inside beginTransaction()/commitTransaction();
        // drop the transactional.id line if only sendSync/sendWithCallback are needed.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-producer");

        // Compression
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Batching
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // longer linger to build larger batches

        this.producer = new KafkaProducer<>(props);

        // Initialize transactions
        producer.initTransactions();
    }

    /**
     * Synchronous send - zero loss.
     */
    public void sendSync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        try {
            // Block until all in-sync replicas have acknowledged the write
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);

            System.out.println("Send succeeded: " + metadata);

        } catch (Exception e) {
            System.err.println("Send failed: " + e.getMessage());
            throw new RuntimeException("Message send failed", e);
        }
    }

    /**
     * Transactional send.
     */
    public void sendTransactional(String topic, String key, String value) {
        try {
            // Begin the transaction
            producer.beginTransaction();

            // Send the message
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record);

            // Commit the transaction
            producer.commitTransaction();

        } catch (Exception e) {
            // Roll back the transaction
            producer.abortTransaction();
            throw new RuntimeException("Transactional send failed", e);
        }
    }

    /**
     * Send with callback.
     */
    public void sendWithCallback(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
                // Route to the failure handler
                handleFailedMessage(record);
            } else {
                System.out.println("Send succeeded: offset=" + metadata.offset());
            }
        });
    }

    /**
     * Handle a failed message.
     */
    private void handleFailedMessage(ProducerRecord<String, String> record) {
        // 1. Persist to a database
        // 2. Publish to a dead-letter queue
        // 3. Trigger an alert
        System.out.println("Failed message: " + record.value());
    }

    public void flush() {
        producer.flush();
    }

    public void close() {
        producer.close();
    }
}

3.2 Zero-Loss Consumer Configuration

// ZeroLossConsumer.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ZeroLossConsumer {

    private final KafkaConsumer<String, String> consumer;

    public ZeroLossConsumer() {
        Properties props = new Properties();

        // Basic settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zero-loss-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Zero-loss settings
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 100 records per poll
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5-minute processing window

        // Isolation level: only read committed transactional records
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // Offset reset policy
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Zero-loss consumption (at-least-once).
     */
    public void consumeZeroLoss(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

                if (records.isEmpty()) {
                    continue;
                }

                // Process records one by one
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Business processing
                        processMessage(record);

                        // Commit exactly this record's offset after successful processing
                        consumer.commitSync(Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));

                    } catch (Exception e) {
                        System.err.println("Processing failed: " + e.getMessage());
                        // Offset not committed; in production, seek back to record.offset()
                        // or route to a retry/dead-letter topic, otherwise commits for later
                        // records in this batch would skip past the failed one.
                    }
                }

            } catch (Exception e) {
                System.err.println("Consumer error: " + e.getMessage());
                // Keep consuming instead of terminating the loop
            }
        }
    }

    /**
     * Effectively-once consumption (at-least-once delivery plus idempotent processing).
     */
    public void consumeExactlyOnce(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                try {
                    // 1. Idempotency check
                    if (isProcessed(record.key())) {
                        continue;
                    }

                    // 2. Process the message
                    processMessage(record);

                    // 3. Mark as processed
                    markAsProcessed(record.key());

                } catch (Exception e) {
                    System.err.println("Processing failed: " + e.getMessage());
                }
            }

            // Commit offsets for the whole batch
            consumer.commitSync();
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // Business processing
        System.out.println("Processing: " + record.value());
    }

    private boolean isProcessed(String key) {
        // Check whether this key has already been processed
        return false;
    }

    private void markAsProcessed(String key) {
        // Record this key as processed
    }
}
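
The isProcessed/markAsProcessed stubs above need a real idempotency store before the effectively-once path works. The sketch below is a deliberately simplified in-memory version; the class name ProcessedKeyStore is an assumption, and a production setup would typically back this with a database table or a Redis set keyed by a business ID so the dedup state survives restarts and is shared across consumer instances.

// ProcessedKeyStore.java - simplified in-memory sketch
package com.example.kafka;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ProcessedKeyStore {

    // Thread-safe set of keys that have already been processed
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Returns true if the key has already been processed. */
    public boolean isProcessed(String key) {
        return key != null && processedKeys.contains(key);
    }

    /** Marks the key as processed. */
    public void markAsProcessed(String key) {
        if (key != null) {
            processedKeys.add(key);
        }
    }
}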

4. Blocking Queue Implementation

4.1 Blocking Queue Wrapper

// KafkaBlockingQueue.java
package com.example.kafka.queue;

import com.example.kafka.HighThroughputConsumer;
import com.example.kafka.HighThroughputProducer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class KafkaBlockingQueue<V> {

    private static final Logger logger = LoggerFactory.getLogger(KafkaBlockingQueue.class);

    private final HighThroughputProducer producer;
    private final HighThroughputConsumer consumer;
    private final String topic;
    private final BlockingQueue<ConsumerRecord<String, String>> localQueue;

    public KafkaBlockingQueue(String topic) {
        this.topic = topic;
        this.producer = new HighThroughputProducer();
        this.consumer = new HighThroughputConsumer();
        this.localQueue = new LinkedBlockingQueue<>(10000);

        // Start the background polling thread
        startConsumer();
    }

    /**
     * Background thread that polls Kafka and buffers records locally.
     */
    private void startConsumer() {
        Thread consumerThread = new Thread(() -> {
            consumer.getConsumer().subscribe(Collections.singletonList(topic));
            while (true) {
                try {
                    ConsumerRecords<String, String> records =
                            consumer.getConsumer().poll(Duration.ofSeconds(1));

                    for (ConsumerRecord<String, String> record : records) {
                        localQueue.put(record); // blocks when the local buffer is full
                    }
                } catch (InterruptedException e) {
                    logger.warn("Polling thread interrupted, stopping");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    /**
     * Enqueue. Note: this sends asynchronously; backpressure comes from the
     * producer's buffer rather than from a true blocking put.
     */
    public void put(V value) throws InterruptedException {
        producer.sendAsync(topic, value.toString(), value.toString());
    }

    /**
     * Blocking dequeue.
     */
    @SuppressWarnings("unchecked")
    public V take() throws InterruptedException {
        ConsumerRecord<String, String> record = localQueue.take();
        return (V) record.value();
    }

    /**
     * Dequeue with timeout.
     */
    @SuppressWarnings("unchecked")
    public V poll(long timeout, TimeUnit unit) throws InterruptedException {
        ConsumerRecord<String, String> record = localQueue.poll(timeout, unit);

        if (record == null) {
            return null;
        }

        return (V) record.value();
    }

    /**
     * Non-blocking dequeue.
     */
    @SuppressWarnings("unchecked")
    public V poll() {
        ConsumerRecord<String, String> record = localQueue.poll();

        if (record == null) {
            return null;
        }

        return (V) record.value();
    }

    /**
     * Size of the local buffer.
     */
    public int size() {
        return localQueue.size();
    }

    /**
     * Whether the local buffer is empty.
     */
    public boolean isEmpty() {
        return localQueue.isEmpty();
    }
}

4.2 Usage Example

// BlockingQueueExample.java
package com.example.kafka;

import com.example.kafka.queue.KafkaBlockingQueue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueExample {

    public static void main(String[] args) throws InterruptedException {
        // Create the Kafka-backed blocking queue
        KafkaBlockingQueue<String> queue = new KafkaBlockingQueue<>("my-queue");

        // Producers
        ExecutorService producerExecutor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 1000; i++) {
            final int index = i;
            producerExecutor.submit(() -> {
                try {
                    queue.put("message-" + index);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Consumers
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            consumerExecutor.submit(() -> {
                while (true) {
                    try {
                        String message = queue.take();
                        System.out.println("Consumed: " + message);
                        // Process the message
                        processMessage(message);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }

        Thread.sleep(60000);
        producerExecutor.shutdown();
        consumerExecutor.shutdown();
    }

    private static void processMessage(String message) {
        // Business processing
    }
}

5. Performance Testing

5.1 Throughput Test

// ThroughputTest.java
package com.example.kafka.test;

import com.example.kafka.HighThroughputProducer;

import java.util.concurrent.atomic.AtomicLong;

public class ThroughputTest {

    public void testProducerThroughput() throws InterruptedException {
        HighThroughputProducer producer = new HighThroughputProducer();

        int messageCount = 100000;
        AtomicLong submittedCount = new AtomicLong(0);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            producer.sendAsync("test-topic", String.valueOf(index), "message-" + index);
            submittedCount.incrementAndGet(); // counts submitted sends; delivery errors surface in the async callback
        }

        // Wait until all buffered records have actually been sent
        producer.flush();

        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;

        double throughput = (double) messageCount / (duration / 1000.0);

        System.out.println("Messages: " + messageCount);
        System.out.println("Elapsed: " + duration + "ms");
        System.out.println("Throughput: " + throughput + " msg/s");
        System.out.println("Submitted: " + submittedCount.get());
    }

    public void testConsumerThroughput() {
        // Consumer throughput test (see the sketch after this class)
    }
}
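
The consumer-side test is left as a stub above. One possible sketch is shown below: it polls a topic for a fixed window with a dedicated test group and reports records per second. The class name, the test-topic name, the group id, and the 30-second window are illustrative assumptions.

// ConsumerThroughputTest.java - illustrative sketch
package com.example.kafka.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerThroughputTest {

    public void testConsumerThroughput() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "throughput-test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

        long consumed = 0;
        long windowMs = 30_000; // measure for 30 seconds

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < windowMs) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                consumed += records.count();
            }

            long elapsed = System.currentTimeMillis() - start;
            double throughput = consumed / (elapsed / 1000.0);

            System.out.println("Records consumed: " + consumed);
            System.out.println("Elapsed: " + elapsed + "ms");
            System.out.println("Throughput: " + throughput + " msg/s");
        }
    }
}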

6. Summary

6.1 Key Points

Key points for high throughput + zero loss:
Producer:
- acks=1: high throughput
- acks=all: zero loss
- Batching and compression
- Asynchronous sends

Consumer:
- Batch and parallel consumption
- Local queue buffering
- Manual offset commits
- Effectively-once processing with idempotent handlers

Topic configuration:
- Appropriate partition count
- Replication
- Compression codec
- Retention policy

6.2 Best Practices

  1. Throughput first: acks=1, asynchronous sends, batching
  2. Reliability first: acks=all, synchronous sends, transactions
  3. Balanced approach: moderate batching and compression
  4. Monitoring and alerting: track throughput, latency, and error rates
  5. Performance testing: load-test and tune before going to production

Following this approach, you can build a Kafka-backed blocking queue that combines high throughput with zero message loss.