Kafka主题、组、分区从基础到架构实战

1. 概述

1.1 核心概念

Kafka的核心概念包括Topic(主题)、Partition(分区)和Consumer Group(消费者组),这三个概念是理解和使用Kafka的基础。

Topic(主题)

  • 消息的分类,类似于数据库中的表
  • 一个Topic可以有多个Partition
  • 消息按顺序存储在Partition中

Partition(分区)

  • Topic的物理分区,每个Partition是一个有序的消息队列
  • 提高并发性能,支持水平扩展
  • 消息在Partition内有序

Consumer Group(消费者组)

  • 多个Consumer组成的组
  • 组内Consumer共享Topic的Partition
  • 实现负载均衡和故障转移

1.2 本文内容结构

本文将从以下几个方面全面解析Kafka主题、组、分区:

  1. Topic管理:创建、配置、查看、删除
  2. Partition详解:分区策略、分区分配、分区重平衡
  3. Consumer Group:工作原理、负载均衡、Offset管理
  4. 实战案例:不同场景下的最佳实践

2. Topic(主题)管理

2.1 Topic基础概念

2.1.1 Topic定义

Topic是Kafka中消息的逻辑分类,类似于数据库中的表。生产者将消息发送到Topic,消费者从Topic读取消息。

Topic特点

  • 一个Topic可以有多个Partition
  • 每个Partition可以有多个Replica
  • 消息在Partition内有序
  • 不同Partition之间的消息顺序不保证

2.1.2 Topic命名规范

命名建议

  • 使用小写字母和连字符
  • 避免使用特殊字符
  • 名称要有意义,便于理解

示例

1
2
3
4
device-data          # 设备数据
user-events # 用户事件
order-transactions # 订单交易
log-app # 应用日志

2.2 Topic创建

2.2.1 命令行创建

基础创建

1
2
3
4
5
6
7
8
9
10
11
# 创建Topic(使用默认配置)
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic

# 指定分区数和副本数
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic device-data \
--partitions 3 \
--replication-factor 2

完整创建(带配置)

1
2
3
4
5
6
7
8
9
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic device-data \
--partitions 10 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config segment.ms=3600000 \
--config compression.type=snappy \
--config max.message.bytes=1048576

配置说明

  • partitions:分区数
  • replication-factor:副本数
  • retention.ms:消息保留时间(毫秒)
  • segment.ms:日志段滚动时间
  • compression.type:压缩类型
  • max.message.bytes:最大消息大小

2.2.2 集群创建

集群环境创建

1
2
3
4
5
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 \
--topic device-data \
--partitions 10 \
--replication-factor 3

2.2.3 自动创建Topic

启用自动创建

1
2
# server.properties
auto.create.topics.enable=true

自动创建规则

  • 使用默认配置
  • 分区数:num.partitions(默认1)
  • 副本数:default.replication.factor(默认1)

生产环境建议:禁用自动创建,手动创建Topic并配置合适的参数。

2.3 Topic配置

2.3.1 常用配置参数

保留策略

1
2
3
4
5
# 按时间保留(7天)
--config retention.ms=604800000

# 按大小保留(10GB)
--config retention.bytes=10737418240

压缩配置

1
2
3
4
5
# 压缩类型:none, gzip, snappy, lz4, zstd
--config compression.type=snappy

# 启用日志压缩
--config cleanup.policy=compact

段配置

1
2
3
4
5
# 段大小(1GB)
--config segment.bytes=1073741824

# 段滚动时间(1小时)
--config segment.ms=3600000

消息大小

1
2
# 最大消息大小(1MB)
--config max.message.bytes=1048576

2.3.2 修改Topic配置

修改保留时间

1
2
3
4
bin/kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--topic device-data \
--add-config retention.ms=86400000

修改压缩类型

1
2
3
4
bin/kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--topic device-data \
--add-config compression.type=gzip

删除配置

1
2
3
4
bin/kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--topic device-data \
--delete-config retention.ms

2.4 Topic查看

2.4.1 查看Topic列表

1
2
3
4
5
6
7
8
# 查看所有Topic
bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092

# 使用正则表达式过滤
bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092 \
--topic device-*

2.4.2 查看Topic详情

1
2
3
4
# 查看Topic详细信息
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic device-data

输出示例

1
2
3
4
5
Topic: device-data	PartitionCount: 10	ReplicationFactor: 3	Configs: segment.ms=3600000
Topic: device-data Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: device-data Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: device-data Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
...

字段说明

  • Partition:分区编号
  • Leader:Leader副本所在的Broker ID
  • Replicas:所有副本所在的Broker ID列表
  • Isr:In-Sync Replicas,与Leader保持同步的副本列表

2.4.3 查看Topic配置

1
2
3
4
# 查看Topic所有配置
bin/kafka-configs.sh --describe \
--bootstrap-server localhost:9092 \
--topic device-data

2.5 Topic删除

2.5.1 删除Topic

1
2
3
4
# 删除Topic
bin/kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic device-data

注意事项

  • 删除是异步操作
  • 需要等待一段时间才能完全删除
  • 如果delete.topic.enable=false,删除操作会被忽略

2.5.2 启用删除功能

1
2
# server.properties
delete.topic.enable=true

2.6 Topic实战案例

2.6.1 案例1:设备数据Topic

场景:IoT设备上报数据,需要长期保存。

1
2
3
4
5
6
7
8
9
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--topic device-data \
--partitions 10 \
--replication-factor 3 \
--config retention.ms=2592000000 \
--config segment.bytes=1073741824 \
--config compression.type=snappy \
--config max.message.bytes=1048576

配置说明

  • partitions=10:支持10个Consumer并发消费
  • replication-factor=3:3个副本,保障高可用
  • retention.ms=2592000000:保留30天
  • compression.type=snappy:使用Snappy压缩,节省空间

2.6.2 案例2:日志收集Topic

场景:收集应用日志,短期保存。

1
2
3
4
5
6
7
8
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--topic log-app \
--partitions 20 \
--replication-factor 2 \
--config retention.ms=604800000 \
--config segment.ms=3600000 \
--config compression.type=gzip

配置说明

  • partitions=20:高并发日志写入
  • replication-factor=2:2个副本,节省资源
  • retention.ms=604800000:保留7天
  • compression.type=gzip:Gzip压缩,压缩率高

2.6.3 案例3:用户事件Topic

场景:用户行为事件,需要日志压缩。

1
2
3
4
5
6
7
8
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--topic user-events \
--partitions 5 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.1 \
--config segment.ms=3600000

配置说明

  • cleanup.policy=compact:启用日志压缩
  • min.cleanable.dirty.ratio=0.1:压缩阈值
  • 只保留每个key的最新值

3. Partition(分区)详解

3.1 Partition基础概念

3.1.1 Partition定义

Partition是Topic的物理分区,每个Partition是一个有序的消息队列。消息在Partition内有序,但不同Partition之间的消息顺序不保证。

Partition特点

  • 提高并发性能
  • 支持水平扩展
  • 消息在Partition内有序
  • 每个Partition可以有多个Replica

3.1.2 Partition数量选择

选择原则

  • 并发消费:Partition数 ≥ Consumer数量
  • 吞吐量:更多Partition可以提高吞吐量
  • 资源消耗:Partition过多会增加资源消耗

推荐配置

  • 小规模:3-5个Partition
  • 中规模:10-20个Partition
  • 大规模:50-100个Partition

注意事项

  • Partition数量创建后不能减少(只能增加)
  • 建议根据实际需求设置,避免过多

3.2 分区策略

3.2.1 默认分区策略

Producer发送消息时的分区选择

  1. 指定Partition:直接发送到指定Partition
  2. 指定Key:根据Key的hash值选择Partition
  3. 轮询:没有Key时,轮询选择Partition

Java代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 1. 指定Partition
ProducerRecord<String, String> record = new ProducerRecord<>(
"device-data",
0, // 指定Partition 0
"device-001",
"data"
);
producer.send(record);

// 2. 指定Key(根据Key的hash值选择Partition)
ProducerRecord<String, String> record = new ProducerRecord<>(
"device-data",
"device-001", // Key
"data"
);
producer.send(record);

// 3. 不指定Key(轮询)
ProducerRecord<String, String> record = new ProducerRecord<>(
"device-data",
null, // 无Key
"data"
);
producer.send(record);

3.2.2 自定义分区器

实现Partitioner接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class DevicePartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

if (key == null) {
// 无Key时轮询
return 0;
}

String deviceId = (String) key;

// 根据设备ID的hash值选择Partition
// 保证同一设备的数据发送到同一Partition
return Math.abs(deviceId.hashCode()) % numPartitions;
}

@Override
public void close() {
// 清理资源
}

@Override
public void configure(Map<String, ?> configs) {
// 初始化配置
}
}

使用自定义分区器

1
2
3
4
5
6
7
8
9
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 指定自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DevicePartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

3.2.3 分区策略实战

场景1:设备数据分区

需求:同一设备的数据发送到同一Partition,保证顺序。

1
2
3
4
5
6
7
// 使用设备ID作为Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"device-data",
deviceId, // 设备ID作为Key
data
);
producer.send(record);

场景2:用户事件分区

需求:同一用户的事件发送到同一Partition。

1
2
3
4
5
6
7
// 使用用户ID作为Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events",
userId, // 用户ID作为Key
eventData
);
producer.send(record);

场景3:日志分区

需求:日志按应用名称分区。

1
2
3
4
5
6
7
// 使用应用名称作为Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"log-app",
appName, // 应用名称作为Key
logData
);
producer.send(record);

3.3 分区分配

3.3.1 Consumer分区分配

Consumer Group内的分区分配

  1. Range策略:按Partition范围分配
  2. RoundRobin策略:轮询分配
  3. Sticky策略:粘性分配(尽量保持原有分配)

默认策略:Range

3.3.2 Range分配策略

分配规则

  • 按Partition编号排序
  • 按Consumer排序
  • 平均分配,余数分配给前面的Consumer

示例

  • 3个Partition(0, 1, 2)
  • 2个Consumer(C1, C2)

分配结果

  • C1:Partition 0, 1
  • C2:Partition 2

3.3.3 RoundRobin分配策略

分配规则

  • 所有Partition和Consumer排序
  • 轮询分配

示例

  • 3个Partition(0, 1, 2)
  • 2个Consumer(C1, C2)

分配结果

  • C1:Partition 0, 2
  • C2:Partition 1

3.3.4 Sticky分配策略

分配规则

  • 尽量保持原有分配
  • 减少分区重平衡

优势

  • 减少重平衡开销
  • 提高稳定性

3.3.5 配置分配策略

Java配置

1
2
3
4
5
6
7
8
9
10
11
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// 配置分配策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 或 "org.apache.kafka.clients.consumer.RangeAssignor"
// 或 "org.apache.kafka.clients.consumer.StickyAssignor"

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

多策略配置

1
2
3
4
5
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
Arrays.asList(
"org.apache.kafka.clients.consumer.StickyAssignor",
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
));

3.4 分区重平衡

3.4.1 重平衡触发条件

触发场景

  1. Consumer加入组
  2. Consumer离开组
  3. Topic的Partition数量变化
  4. Consumer订阅的Topic变化

3.4.2 重平衡过程

过程

  1. 所有Consumer停止消费
  2. 释放Partition分配
  3. 重新分配Partition
  4. 恢复消费

影响

  • 消费暂停
  • 可能重复消费
  • 性能下降

3.4.3 减少重平衡

策略

  1. 使用Sticky分配策略:减少重平衡
  2. 合理设置session.timeout.ms:避免误判
  3. 合理设置max.poll.interval.ms:避免超时
  4. 批量处理:减少处理时间

配置示例

1
2
3
4
Properties props = new Properties();
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 30秒
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5分钟
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // 批量处理

3.5 分区扩展

3.5.1 增加分区

增加Partition数量

1
2
3
4
bin/kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic device-data \
--partitions 20

注意事项

  • 只能增加,不能减少
  • 增加后需要重新分配Partition
  • 可能触发重平衡

3.5.2 分区扩展实战

场景:Topic初始3个Partition,需要扩展到10个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1. 查看当前分区数
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic device-data

# 2. 扩展到10个分区
bin/kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic device-data \
--partitions 10

# 3. 验证扩展结果
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic device-data

4. Consumer Group(消费者组)

4.1 Consumer Group基础概念

4.1.1 Consumer Group定义

Consumer Group是多个Consumer组成的组,组内Consumer共享Topic的Partition,实现负载均衡和故障转移。

Consumer Group特点

  • 组内Consumer共享Topic的Partition
  • 一个Partition只能被组内一个Consumer消费
  • 支持负载均衡
  • 支持故障转移

4.1.2 Consumer Group命名

命名建议

  • 使用有意义的名称
  • 避免使用特殊字符
  • 名称要唯一

示例

1
2
3
device-processor-group
log-processor-group
user-event-processor-group

4.2 Consumer Group工作原理

4.2.1 工作流程

流程

  1. Consumer加入Group
  2. Group Coordinator分配Partition
  3. Consumer消费分配的Partition
  4. Consumer定期提交Offset
  5. Consumer离开时,Partition重新分配

4.2.2 Group Coordinator

Coordinator

  • 管理Consumer Group
  • 分配Partition
  • 监控Consumer状态

选择

  • 根据Group ID的hash值选择Broker
  • 该Broker成为Group Coordinator

4.3 Consumer Group配置

4.3.1 基础配置

1
2
3
4
5
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

4.3.2 Offset管理

自动提交

1
2
3
// 启用自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 1秒

手动提交

1
2
3
4
5
6
// 禁用自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// 手动提交
consumer.commitSync(); // 同步提交
consumer.commitAsync(); // 异步提交

4.3.3 Offset重置

配置

1
2
3
4
5
6
7
8
// 从最早的消息开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 从最新的消息开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

// 抛出异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

4.4 Consumer Group管理

4.4.1 查看Consumer Group

1
2
3
4
5
6
7
# 查看所有Consumer Group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看指定Group的详细信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group device-processor-group \
--describe

输出示例

1
2
3
4
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LAG  CONSUMER-ID
device-processor-group device-data 0 1000 0 consumer-1-xxx
device-processor-group device-data 1 2000 0 consumer-2-xxx
device-processor-group device-data 2 1500 0 consumer-3-xxx

字段说明

  • CURRENT-OFFSET:当前消费的Offset
  • LAG:消息积压数量
  • CONSUMER-ID:Consumer ID

4.4.2 重置Offset

重置到最早

1
2
3
4
5
6
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group device-processor-group \
--topic device-data \
--reset-offsets \
--to-earliest \
--execute

重置到最新

1
2
3
4
5
6
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group device-processor-group \
--topic device-data \
--reset-offsets \
--to-latest \
--execute

重置到指定Offset

1
2
3
4
5
6
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group device-processor-group \
--topic device-data \
--reset-offsets \
--to-offset 1000 \
--execute

4.4.3 删除Consumer Group

1
2
3
4
# 删除Consumer Group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group device-processor-group \
--delete

4.5 Consumer Group实战

4.5.1 案例1:设备数据处理

场景:3个Consumer处理设备数据,10个Partition。

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("device-data"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
processDeviceData(record.value());
}

// 手动提交Offset
consumer.commitSync();
}

分配结果

  • Consumer 1:Partition 0, 1, 2, 3
  • Consumer 2:Partition 4, 5, 6
  • Consumer 3:Partition 7, 8, 9

4.5.2 案例2:日志处理

场景:5个Consumer处理日志,20个Partition。

配置

1
2
3
4
5
6
7
8
9
10
11
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("log-app"));

分配结果

  • 每个Consumer处理4个Partition

4.5.3 案例3:多Topic消费

场景:消费多个Topic。

配置

1
2
3
4
5
// 订阅多个Topic
consumer.subscribe(Arrays.asList("device-data", "user-events", "order-transactions"));

// 或者使用正则表达式
consumer.subscribe(Pattern.compile("device-.*"));

5. 综合实战案例

5.1 案例1:IoT设备监控系统

5.1.1 架构设计

1
2
3
4
5
6
7
8
9
10
11
10万台设备

Nginx(负载均衡)

Java服务(设备上报)

Kafka Topic: device-data (10个Partition, 3个副本)

Consumer Group: device-processor-group (3个Consumer)

数据库/存储

5.1.2 Topic配置

1
2
3
4
5
6
7
8
9
# 创建设备数据Topic
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--topic device-data \
--partitions 10 \
--replication-factor 3 \
--config retention.ms=2592000000 \
--config segment.bytes=1073741824 \
--config compression.type=snappy

5.1.3 Producer配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 使用设备ID作为Key,保证同一设备的数据发送到同一Partition
ProducerRecord<String, String> record = new ProducerRecord<>(
"device-data",
deviceId,
data
);
producer.send(record);

5.1.4 Consumer配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("device-data"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
processDeviceData(record.value());
}

consumer.commitSync();
}

5.1.5 分区分配

10个Partition,3个Consumer

  • Consumer 1:Partition 0, 1, 2, 3
  • Consumer 2:Partition 4, 5, 6
  • Consumer 3:Partition 7, 8, 9

5.2 案例2:日志收集系统

5.2.1 Topic配置

1
2
3
4
5
6
7
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic log-app \
--partitions 20 \
--replication-factor 2 \
--config retention.ms=604800000 \
--config compression.type=gzip

5.2.2 Producer配置

1
2
3
4
5
6
7
// 使用应用名称作为Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"log-app",
appName, // 应用名称作为Key
logData
);
producer.send(record);

5.2.3 Consumer配置

1
2
3
4
5
6
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");

5.3 案例3:用户事件系统

5.3.1 Topic配置

1
2
3
4
5
6
7
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 5 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.1

5.3.2 Producer配置

1
2
3
4
5
6
7
// 使用用户ID作为Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events",
userId, // 用户ID作为Key
eventData
);
producer.send(record);

5.3.3 Consumer配置

1
2
3
4
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-processor-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

6. 最佳实践

6.1 Topic设计

6.1.1 分区数选择

原则

  • Partition数 ≥ Consumer数量
  • 考虑未来扩展
  • 避免过多Partition

推荐

  • 小规模:3-5个
  • 中规模:10-20个
  • 大规模:50-100个

6.1.2 副本数选择

原则

  • 至少2个副本(生产环境)
  • 推荐3个副本(高可用)

配置

  • 开发环境:1个副本
  • 生产环境:3个副本

6.2 Partition设计

6.2.1 分区策略选择

场景1:需要顺序

  • 使用Key分区
  • 同一Key发送到同一Partition

场景2:不需要顺序

  • 不指定Key
  • 轮询分配

场景3:自定义分区

  • 实现Partitioner接口
  • 根据业务逻辑分区

6.2.2 Key设计

原则

  • Key要有意义
  • Key分布要均匀
  • 避免热点Partition

示例

  • 设备ID
  • 用户ID
  • 订单ID

6.3 Consumer Group设计

6.3.1 Group命名

原则

  • 名称要有意义
  • 名称要唯一
  • 避免使用特殊字符

6.3.2 Consumer数量

原则

  • Consumer数 ≤ Partition数
  • 多余的Consumer不会消费任何Partition

推荐

  • Consumer数 = Partition数(充分利用)

6.4 Offset管理

6.4.1 自动提交 vs 手动提交

自动提交

  • 简单,但可能丢失数据
  • 适合对数据丢失不敏感的场景

手动提交

  • 复杂,但数据安全
  • 适合对数据丢失敏感的场景

6.4.2 提交策略

同步提交

1
consumer.commitSync();  // 阻塞,等待提交完成

异步提交

1
consumer.commitAsync();  // 非阻塞,性能更好

推荐:处理完消息后同步提交,保障数据不丢失。


7. 监控和故障排查

7.1 监控指标

7.1.1 Topic监控

关键指标

  • 消息生产速率
  • 消息消费速率
  • 消息积压(Lag)
  • Partition数量

7.1.2 Consumer Group监控

关键指标

  • Consumer数量
  • 消费速率
  • Offset提交延迟
  • 重平衡次数

7.2 故障排查

7.2.1 消息积压

原因

  • Consumer处理慢
  • Consumer数量不足
  • 网络问题

解决

  • 增加Consumer数量
  • 优化Consumer处理逻辑
  • 检查网络连接

7.2.2 重平衡频繁

原因

  • session.timeout.ms设置过小
  • max.poll.interval.ms设置过小
  • Consumer处理时间过长

解决

  • 增加超时时间
  • 优化处理逻辑
  • 批量处理

8. 总结

8.1 核心要点

  1. Topic管理:创建、配置、查看、删除
  2. Partition详解:分区策略、分区分配、分区扩展
  3. Consumer Group:工作原理、Offset管理、负载均衡
  4. 实战案例:IoT设备监控、日志收集、用户事件

8.2 架构师建议

  1. Topic设计

    • 合理设置分区数
    • 合理设置副本数
    • 合理设置保留策略
  2. Partition设计

    • 根据业务需求选择分区策略
    • 使用Key保证顺序
    • 避免热点Partition
  3. Consumer Group设计

    • Consumer数 ≤ Partition数
    • 合理设置Offset提交策略
    • 监控消费延迟

8.3 最佳实践

  1. 标准化:统一Topic命名和配置标准
  2. 监控化:实时监控Topic和Consumer Group
  3. 文档化:维护配置文档和架构图
  4. 测试化:充分测试分区分配和重平衡

相关文章