Kafka Cluster Partition Architecture: Load Balancing for Tens of Millions of Devices in Practice

1. Architecture Overview

With tens of millions of connected devices, a Kafka cluster must handle massive message volumes while remaining highly available, performant, and scalable. This article walks through how to build a Kafka cluster that supports tens of millions of devices, covering architecture design, partitioning strategy, load balancing, and the Java client implementation.

1.1 Business Scenario

  • Device scale: tens of millions of IoT devices online concurrently
  • Message volume: millions of messages per second
  • Server cluster: multiple identically configured servers
  • Load balancing: each server carries an even share of the load
  • Availability target: 99.99%

1.2 Architecture Diagram

Kafka cluster architecture for tens of millions of devices
├── Device Layer
│   ├── IoT device 1 (tens of millions in total)
│   ├── IoT device 2
│   └── IoT device N
├── Load Balancer Layer
│   ├── Nginx/HAProxy
│   ├── Device-ID hash routing
│   └── Health checks
├── Application Service Layer
│   ├── App service 1 (multiple instances)
│   ├── App service 2 (multiple instances)
│   └── App service N (multiple instances)
│       ├── Service instance 1
│       ├── Service instance 2
│       └── Service instance N
├── Kafka Cluster Layer
│   ├── Broker 1
│   ├── Broker 2
│   ├── Broker 3
│   ├── Broker N
│   ├── Topic: device-events
│   │   ├── Partition 0 (Leader: Broker 1)
│   │   ├── Partition 1 (Leader: Broker 2)
│   │   ├── Partition 2 (Leader: Broker 3)
│   │   └── Partition N (Leader: Broker N)
│   └── Topic: device-status
└── Consumer Layer
    ├── Consumer Group 1
    ├── Consumer Group 2
    └── Consumer Group N

2. Kafka Cluster Configuration

2.1 Cluster Sizing

For a fleet of tens of millions of devices, a reasonable starting configuration is:

Cluster:
  Brokers: 6-12 (scale with message volume)
  Replication factor: 3 (for high availability)
  Partitions: sized dynamically from device count and throughput
Per broker:
  CPU: 16+ cores
  Memory: 32 GB+
  Disk: 1 TB+ SSD
  Network: 10 GbE

2.2 Core Broker Configuration

# server.properties - Kafka broker configuration

# Unique broker ID
broker.id=1

# Listener addresses
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-broker1:9092

# Log directories
log.dirs=/data/kafka-logs

# Default partition count for auto-created topics
num.partitions=12

# Default replication factor
default.replication.factor=3

# Minimum in-sync replicas
min.insync.replicas=2

# Message retention (hours)
log.retention.hours=168

# Log segment size (1 GB)
log.segment.bytes=1073741824

# Compression type
compression.type=lz4

# Network threads
num.network.threads=8

# I/O threads
num.io.threads=16

# Socket send buffer
socket.send.buffer.bytes=102400

# Socket receive buffer
socket.receive.buffer.bytes=102400

# Maximum socket request size
socket.request.max.bytes=104857600

# Replica fetcher threads
num.replica.fetchers=4

# Maximum wait time for replica fetch requests (ms)
replica.fetch.wait.max.ms=500

# Transaction state log replication factor
transaction.state.log.replication.factor=3

# Transaction state log minimum ISR
transaction.state.log.min.isr=2

2.3 ZooKeeper Cluster Configuration

# zookeeper.properties

# Data directory
dataDir=/data/zookeeper

# Client port
clientPort=2181

# Ensemble members
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

# Maximum client connections per host
maxClientCnxns=1000

# Basic time unit (ms) and ensemble timeouts
tickTime=2000
initLimit=10
syncLimit=5

3. Partition Strategy Design

3.1 Calculating the Partition Count

/**
 * Partition count formula
 *
 * partitions = (messages per second / per-partition throughput) * redundancy factor
 *
 * Assumptions:
 * - One partition handles ~100,000 messages/s
 * - Ten million devices produce ~1,000,000 messages/s
 * - Redundancy factor: 1.5
 *
 * partitions = (1000000 / 100000) * 1.5 = 15
 *
 * Recommendation: round the partition count up to a multiple of the broker count.
 */
public class PartitionCalculator {

    /**
     * Calculate the recommended partition count.
     *
     * @param deviceCount       number of devices
     * @param messagesPerSecond messages produced per second
     * @param brokerCount       number of brokers
     * @return recommended partition count
     */
    public static int calculatePartitions(
            long deviceCount,
            long messagesPerSecond,
            int brokerCount) {

        // Per-partition throughput (messages/s)
        long partitionCapacity = 100_000;

        // Base partition count from throughput
        int basePartitions = (int) Math.ceil(
                (double) messagesPerSecond / partitionCapacity
        );

        // Redundancy factor for headroom
        double redundancyFactor = 1.5;

        // Recommended partition count
        int recommendedPartitions = (int) Math.ceil(
                basePartitions * redundancyFactor
        );

        // Round up to a multiple of the broker count
        int adjustedPartitions = ((recommendedPartitions + brokerCount - 1)
                / brokerCount) * brokerCount;

        return Math.max(adjustedPartitions, brokerCount);
    }
}
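
Plugging in the numbers from the comment above (one million messages per second across six brokers) yields 18 partitions after rounding up to a multiple of the broker count. A minimal sketch, assuming PartitionCalculator is in the same package:

public class PartitionCalculatorDemo {
    public static void main(String[] args) {
        // 10M devices, ~1M messages/s, 6 brokers
        int partitions = PartitionCalculator.calculatePartitions(
                10_000_000L, 1_000_000L, 6);
        // ceil(1_000_000 / 100_000) = 10, * 1.5 = 15, rounded up to 18
        System.out.println("Recommended partitions: " + partitions); // 18
    }
}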

3.2 Custom Partitioner

package com.architecture.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Device-ID hash partitioner.
 *
 * Strategy: hash on the device ID so that all messages from the same device
 * land on the same partition.
 * Benefits:
 * 1. Preserves per-device message ordering
 * 2. Spreads load evenly across partitions
 * 3. Makes it easy to consume on a per-device basis
 */
public class DeviceIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // No key: fall back to hashing the value, or pick a random partition
            if (valueBytes == null) {
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        }

        // Extract the device ID from the key
        String deviceId = extractDeviceId(key);

        if (deviceId == null) {
            // Could not extract a device ID: hash the raw key bytes
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        // Hash the device ID to choose the partition
        return Utils.toPositive(
                Utils.murmur2(deviceId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
    }

    /**
     * Extract the device ID from the record key.
     */
    private String extractDeviceId(Object key) {
        if (key instanceof String) {
            // Assumed key format: device-{deviceId}
            String keyStr = (String) key;
            if (keyStr.startsWith("device-")) {
                return keyStr.substring("device-".length());
            }
            return keyStr;
        }

        // If the key is a custom object, read its deviceId field
        if (key instanceof DeviceMessage) {
            return ((DeviceMessage) key).getDeviceId();
        }

        return null;
    }

    @Override
    public void close() {
        // Release resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Read configuration if needed
    }
}

/**
 * Device message object.
 */
class DeviceMessage {
    private String deviceId;
    private String messageType;
    private Object payload;

    // getters and setters
    public String getDeviceId() {
        return deviceId;
    }
}

3.3 Dynamic Partition Creation

package com.architecture.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Kafka topic management utility.
 */
public class KafkaTopicManager {

    private final AdminClient adminClient;

    public KafkaTopicManager(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        adminClient = AdminClient.create(props);
    }

    /**
     * Create a topic, or grow its partition count if it already exists.
     */
    public void createTopicWithPartitions(
            String topicName,
            int partitionCount,
            short replicationFactor) throws ExecutionException, InterruptedException {

        // If the topic already exists, expand its partitions instead
        if (topicExists(topicName)) {
            expandPartitions(topicName, partitionCount);
            return;
        }

        // Create a new topic
        NewTopic newTopic = new NewTopic(topicName, partitionCount, replicationFactor);

        // Topic-level configuration
        Map<String, String> configs = new HashMap<>();
        configs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        configs.put(TopicConfig.RETENTION_MS_CONFIG, "604800000");     // 7 days
        configs.put(TopicConfig.SEGMENT_MS_CONFIG, "86400000");        // 1 day
        configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10485760"); // 10 MB

        newTopic.configs(configs);

        CreateTopicsResult result = adminClient.createTopics(
                Collections.singletonList(newTopic)
        );

        result.all().get();
        System.out.println("Topic created: " + topicName);
    }

    /**
     * Expand the partition count. Kafka only allows increasing partitions,
     * so newPartitionCount must be greater than the current count.
     */
    private void expandPartitions(String topicName, int newPartitionCount)
            throws ExecutionException, InterruptedException {

        Map<String, NewPartitions> newPartitions = new HashMap<>();
        newPartitions.put(topicName, NewPartitions.increaseTo(newPartitionCount));

        adminClient.createPartitions(newPartitions).all().get();
        System.out.println("Topic partitions expanded: " + topicName);
    }

    /**
     * Check whether a topic exists.
     */
    private boolean topicExists(String topicName)
            throws ExecutionException, InterruptedException {

        Set<String> topics = adminClient.listTopics().names().get();
        return topics.contains(topicName);
    }

    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}
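
A minimal usage sketch that sizes the topic with the calculator from section 3.1 and then creates it. The bootstrap address and topic name are illustrative, and the sketch assumes PartitionCalculator and KafkaTopicManager are both visible on the classpath:

public class TopicSetupDemo {
    public static void main(String[] args) throws Exception {
        // 18 partitions for the example numbers (1M msg/s, 6 brokers)
        int partitions = PartitionCalculator.calculatePartitions(
                10_000_000L, 1_000_000L, 6);

        KafkaTopicManager manager =
                new KafkaTopicManager("kafka-broker1:9092"); // illustrative address
        try {
            manager.createTopicWithPartitions("device-events", partitions, (short) 3);
        } finally {
            manager.close();
        }
    }
}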

4. Load Balancing

4.1 Application-Layer Load Balancing

package com.architecture.kafka.loadbalance;

import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Load-balancing strategies.
 */
@Component
public class LoadBalancer {

    private final AtomicInteger roundRobinCounter = new AtomicInteger(0);

    /**
     * Round-robin load balancing.
     */
    public String roundRobin(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("Server list is empty");
        }
        // floorMod keeps the index non-negative even after counter overflow
        int index = Math.floorMod(roundRobinCounter.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    /**
     * Random load balancing.
     */
    public String random(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("Server list is empty");
        }
        int index = ThreadLocalRandom.current().nextInt(servers.size());
        return servers.get(index);
    }

    /**
     * Device-ID hash load balancing.
     * Guarantees that the same device is always routed to the same server.
     */
    public String hash(String deviceId, List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("Server list is empty");
        }
        // floorMod avoids the negative index that Math.abs(Integer.MIN_VALUE) would produce
        int index = Math.floorMod(deviceId.hashCode(), servers.size());
        return servers.get(index);
    }

    /**
     * Weighted round-robin.
     */
    public String weightedRoundRobin(List<ServerWeight> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("Server list is empty");
        }
        int totalWeight = servers.stream()
                .mapToInt(ServerWeight::getWeight)
                .sum();
        if (totalWeight <= 0) {
            return servers.get(0).getServer();
        }

        int currentWeight = Math.floorMod(roundRobinCounter.getAndIncrement(), totalWeight);
        int weightSum = 0;

        for (ServerWeight server : servers) {
            weightSum += server.getWeight();
            if (currentWeight < weightSum) {
                return server.getServer();
            }
        }

        return servers.get(0).getServer();
    }
}

/**
 * Server weight.
 */
class ServerWeight {
    private final String server;
    private final int weight;

    public ServerWeight(String server, int weight) {
        this.server = server;
        this.weight = weight;
    }

    public String getServer() {
        return server;
    }

    public int getWeight() {
        return weight;
    }
}
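
A short usage sketch (the server addresses are illustrative) showing how the hash strategy pins a device to one application server while round-robin spreads stateless requests:

import com.architecture.kafka.loadbalance.LoadBalancer;

import java.util.Arrays;
import java.util.List;

public class LoadBalancerDemo {
    public static void main(String[] args) {
        LoadBalancer lb = new LoadBalancer();
        List<String> servers = Arrays.asList("app-1:8080", "app-2:8080", "app-3:8080");

        // The same device ID always maps to the same server
        String target = lb.hash("device-1234567", servers);
        System.out.println("device-1234567 -> " + target);

        // Round-robin rotates through the list for stateless traffic
        System.out.println(lb.roundRobin(servers));
        System.out.println(lb.roundRobin(servers));
    }
}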

4.2 Kafka Producer Load-Balancing Configuration

package com.architecture.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.architecture.kafka.partitioner.DeviceIdPartitioner;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * High-performance Kafka producer.
 */
public class HighPerformanceKafkaProducer {

    private final KafkaProducer<String, String> producer;

    public HighPerformanceKafkaProducer(String bootstrapServers) {
        Properties props = new Properties();

        // Broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // Custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                DeviceIdPartitioner.class.getName());

        // Acknowledgement level; idempotence (enabled below) requires acks=all
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Retries
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Batching
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);     // wait up to 10 ms to fill a batch

        // Buffer memory
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MB

        // Compression
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // Maximum request size
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); // 10 MB

        // Request timeout
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Metadata refresh interval
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);

        // Idle connection timeout
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);

        // Maximum time send() may block
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);

        // Enable idempotence
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Transactional ID (only when using transactions)
        // props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "device-producer-1");

        producer = new KafkaProducer<>(props);
    }

    /**
     * Send a message asynchronously.
     */
    public Future<RecordMetadata> sendAsync(String topic, String deviceId, String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                "device-" + deviceId, // key: device ID
                message
        );

        return producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
                // Log the failure; retry or persist for later delivery
            } else {
                System.out.println("Send succeeded: topic=" + metadata.topic()
                        + ", partition=" + metadata.partition()
                        + ", offset=" + metadata.offset());
            }
        });
    }

    /**
     * Send a message synchronously.
     */
    public RecordMetadata sendSync(String topic, String deviceId, String message)
            throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                "device-" + deviceId,
                message
        );

        return producer.send(record).get();
    }

    /**
     * Send a batch of messages (DeviceMessage from section 3.2 is assumed
     * to be accessible from this package).
     */
    public void sendBatch(String topic, List<DeviceMessage> messages) {
        for (DeviceMessage msg : messages) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    topic,
                    "device-" + msg.getDeviceId(),
                    msg.getPayload().toString()
            );
            producer.send(record);
        }
        producer.flush(); // make sure every buffered message has been sent
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
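
A minimal usage sketch; the broker address, topic, and JSON payloads are illustrative:

import org.apache.kafka.clients.producer.RecordMetadata;

import com.architecture.kafka.producer.HighPerformanceKafkaProducer;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        HighPerformanceKafkaProducer producer =
                new HighPerformanceKafkaProducer("kafka-broker1:9092");
        try {
            // Fire-and-forget style with a callback for logging
            producer.sendAsync("device-events", "1234567", "{\"temperature\": 23.5}");

            // Blocking send when the caller needs the partition/offset
            RecordMetadata meta =
                    producer.sendSync("device-events", "1234567", "{\"status\": \"online\"}");
            System.out.println("Written to partition " + meta.partition());
        } finally {
            producer.close();
        }
    }
}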

5. High-Concurrency Consumption

5.1 Consumer Group Configuration

package com.architecture.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * High-concurrency Kafka consumer.
 */
public class HighConcurrencyKafkaConsumer {

    private final KafkaConsumer<String, String> consumer;

    public HighConcurrencyKafkaConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();

        // Broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Consumer group ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Disable auto-commit; offsets are committed manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Auto-commit interval (only relevant when auto-commit is enabled)
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // Offset reset policy: earliest, latest, or none
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Session timeout
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        // Heartbeat interval
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

        // Maximum records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        // Maximum interval between polls
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        // Partition assignment strategies (in order of preference)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RangeAssignor," +
                "org.apache.kafka.clients.consumer.RoundRobinAssignor," +
                "org.apache.kafka.clients.consumer.StickyAssignor");

        // Minimum bytes per fetch
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);

        // Maximum wait per fetch
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        // Maximum bytes per fetch
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50 MB

        // Maximum bytes per partition per fetch
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB

        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribe to a topic.
     */
    public void subscribe(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
    }

    /**
     * Poll and process messages.
     */
    public void consume() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                try {
                    // Process the message
                    processMessage(record);
                } catch (Exception e) {
                    System.err.println("Message processing failed: " + e.getMessage());
                    // Record the failure; optionally forward to a dead-letter queue
                }
            }

            // Commit offsets manually once the batch has been processed
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    }

    /**
     * Process a single message.
     */
    private void processMessage(ConsumerRecord<String, String> record) {
        String deviceId = record.key() == null ? "" : record.key().replace("device-", "");
        String message = record.value();

        System.out.println("Processing - Device: " + deviceId
                + ", Partition: " + record.partition()
                + ", Offset: " + record.offset()
                + ", Message: " + message);

        // Business logic goes here
        // ...
    }

    public void close() {
        if (consumer != null) {
            consumer.close();
        }
    }
}

5.2 Multi-Threaded Consumption

package com.architecture.kafka.consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Multi-threaded Kafka consumer.
 *
 * Each thread owns its own KafkaConsumer instance, and all threads join the
 * same consumer group so the topic's partitions are divided among them.
 */
public class MultiThreadKafkaConsumer {

    private final ExecutorService executorService;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final int threadCount;

    public MultiThreadKafkaConsumer(int threadCount) {
        this.threadCount = threadCount;
        this.executorService = Executors.newFixedThreadPool(threadCount);
    }

    /**
     * Start the consumer threads.
     */
    public void start(String bootstrapServers, String groupId, String topic) {
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                // Same groupId for every thread: Kafka assigns each member
                // a disjoint subset of the partitions
                HighConcurrencyKafkaConsumer consumer =
                        new HighConcurrencyKafkaConsumer(bootstrapServers, groupId);
                consumer.subscribe(topic);

                try {
                    while (running.get()) {
                        try {
                            consumer.consume(); // blocks in its poll loop until an error occurs
                        } catch (Exception e) {
                            System.err.println("Consumer thread error: " + e.getMessage());
                        }
                    }
                } finally {
                    consumer.close();
                }
            });
        }
    }

    public void stop() {
        running.set(false);
        executorService.shutdown();
    }
}
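
A minimal sketch of wiring it up (broker address, group, topic, and thread count are illustrative). The thread count should not exceed the topic's partition count, since extra members of a consumer group sit idle:

public class MultiThreadConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 6 consumer threads for a topic with at least 6 partitions
        MultiThreadKafkaConsumer consumers = new MultiThreadKafkaConsumer(6);
        consumers.start("kafka-broker1:9092", "device-consumer-group", "device-events");

        // Shut down cleanly when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(consumers::stop));
        Thread.currentThread().join(); // keep the main thread alive
    }
}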

6. Spring Boot Integration

6.1 Configuration File

# application.yml
spring:
  kafka:
    bootstrap-servers: kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: lz4
      properties:
        linger.ms: 10
    consumer:
      group-id: device-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 500
      properties:
        max.poll.interval.ms: 300000
    listener:
      ack-mode: manual   # required for the Acknowledgment-based listener in 6.3

6.2 Producer Service

package com.architecture.kafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;

@Service
public class DeviceMessageProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DeviceMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Send a device message.
     */
    public void sendDeviceMessage(String deviceId, String message) {
        String topic = "device-events";
        String key = "device-" + deviceId;

        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, key, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Send succeeded: " + result.getRecordMetadata());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }

    /**
     * Send a batch of device messages (DeviceMessage as defined in section 3.2).
     */
    public void sendBatch(List<DeviceMessage> messages) {
        for (DeviceMessage msg : messages) {
            sendDeviceMessage(msg.getDeviceId(), msg.getPayload().toString());
        }
    }
}

6.3 Consumer Service

package com.architecture.kafka.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class DeviceMessageConsumerService {

    @KafkaListener(
            topics = "device-events",
            groupId = "device-consumer-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeDeviceMessage(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {

        try {
            // Process the message
            processMessage(message, topic, partition, offset);

            // Acknowledge (commit the offset) manually
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // Retry here or forward to a dead-letter topic
        }
    }

    private void processMessage(String message, String topic, int partition, long offset) {
        // Business logic goes here
        System.out.println("Consumed - Topic: " + topic
                + ", Partition: " + partition
                + ", Offset: " + offset
                + ", Message: " + message);
    }
}

7. Performance Tuning

7.1 Producer Tuning

// 1. Batch more aggressively
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);     // wait up to 20 ms

// 2. Compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // or "snappy"

// 3. Asynchronous sends
producer.send(record, callback); // non-blocking; handle errors in the callback

// 4. Larger send buffer
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB

// 5. Connection reuse
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);

7.2 Consumer Tuning

// 1. Fetch more records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

// 2. Multi-threaded consumption
// Run several consumer instances in one group; each handles a subset of partitions

// 3. Asynchronous offset commits
consumer.commitAsync();

// 4. Batch processing: collect a batch of records, then handle them together
List<ConsumerRecord<String, String>> batch = new ArrayList<>();

7.3 Partition Assignment Tuning

// Use StickyAssignor to minimize partition movement during rebalances
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.StickyAssignor");

8. Monitoring and Operations

8.1 Monitoring Metrics

package com.architecture.kafka.monitor;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;

/**
 * Collects Kafka client metrics.
 */
public class KafkaMetricsCollector {

    /**
     * Collect producer metrics.
     */
    public void collectProducerMetrics(KafkaProducer<?, ?> producer) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();

        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName metricName = entry.getKey();
            Metric metric = entry.getValue();

            // Key metrics
            if (metricName.name().contains("record-send-rate")) {
                System.out.println("Send rate: " + metric.metricValue());
            }
            if (metricName.name().contains("record-error-rate")) {
                System.out.println("Error rate: " + metric.metricValue());
            }
            if (metricName.name().contains("request-latency")) {
                System.out.println("Request latency: " + metric.metricValue());
            }
        }
    }

    /**
     * Collect consumer metrics.
     */
    public void collectConsumerMetrics(KafkaConsumer<?, ?> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();

        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName metricName = entry.getKey();
            Metric metric = entry.getValue();

            // Key metrics
            if (metricName.name().contains("records-consumed-rate")) {
                System.out.println("Consume rate: " + metric.metricValue());
            }
            if (metricName.name().contains("records-lag")) {
                System.out.println("Consumer lag: " + metric.metricValue());
            }
        }
    }
}

8.2 Alerting Configuration

# Alerting rules
alerts:
  - name: kafka_lag_high
    condition: consumer_lag > 100000
    action: send_notification

  - name: kafka_producer_error_rate
    condition: producer_error_rate > 0.01
    action: send_notification

  - name: kafka_broker_down
    condition: broker_status != "up"
    action: send_notification
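
The consumer_lag value used above can be derived with the AdminClient by comparing each partition's committed offset against its latest offset. A minimal sketch, assuming Kafka clients 2.5+ (for AdminClient.listOffsets); the group ID comes from the caller and the helper class name is illustrative:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.stream.Collectors;

public class ConsumerLagChecker {

    /** Total lag of a consumer group across all partitions with committed offsets. */
    public static long totalLag(AdminClient admin, String groupId) throws Exception {
        // Committed offsets per partition for the group
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId)
                     .partitionsToOffsetAndMetadata().get();

        // Latest (log-end) offsets for the same partitions
        Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                admin.listOffsets(request).all().get();

        // Lag = log-end offset - committed offset, summed over partitions
        return committed.entrySet().stream()
                .filter(e -> e.getValue() != null) // skip partitions with no committed offset
                .mapToLong(e -> latest.get(e.getKey()).offset() - e.getValue().offset())
                .sum();
    }
}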

9. Summary

9.1 Key Architecture Points

  1. Partitioning: hash on the device ID to preserve ordering and balance load
  2. Cluster size: 6-12 brokers with a replication factor of 3 for high availability
  3. Load balancing: at both the application layer and the device-routing layer
  4. Consumption: multiple consumer groups with multi-threaded consumers
  5. Performance: batching, compression, and asynchronous processing

9.2 Best Practices

  1. Partition count ≈ 2-3 × the broker count
  2. Replication factor of at least 3
  3. Use the device ID as the message key
  4. Enable message compression (lz4 or snappy)
  5. Commit offsets manually to keep processing and offsets consistent
  6. Monitor key metrics and alert promptly
  7. Use StickyAssignor to reduce rebalancing cost

9.3 Scalability

  • Scale out: add brokers and partitions
  • Scale up: upgrade individual machines
  • Dynamic adjustment: grow the partition count as load increases
