第458集Kafka主题组分区从基础到架构实战 | 字数总计: 5.1k | 阅读时长: 22分钟 | 阅读量:
Kafka主题、组、分区从基础到架构实战 1. 概述 1.1 核心概念 Kafka的核心概念 包括Topic(主题)、Partition(分区)和Consumer Group(消费者组),这三个概念是理解和使用Kafka的基础。
Topic(主题) :
消息的分类,类似于数据库中的表
一个Topic可以有多个Partition
消息按顺序存储在Partition中
Partition(分区) :
Topic的物理分区,每个Partition是一个有序的消息队列
提高并发性能,支持水平扩展
消息在Partition内有序
Consumer Group(消费者组) :
多个Consumer组成的组
组内Consumer共享Topic的Partition
实现负载均衡和故障转移
1.2 本文内容结构 本文将从以下几个方面全面解析Kafka主题、组、分区:
Topic管理 :创建、配置、查看、删除
Partition详解 :分区策略、分区分配、分区重平衡
Consumer Group :工作原理、负载均衡、Offset管理
实战案例 :不同场景下的最佳实践
2. Topic(主题)管理 2.1 Topic基础概念 2.1.1 Topic定义 Topic 是Kafka中消息的逻辑分类,类似于数据库中的表。生产者将消息发送到Topic,消费者从Topic读取消息。
Topic特点 :
一个Topic可以有多个Partition
每个Partition可以有多个Replica
消息在Partition内有序
不同Partition之间的消息顺序不保证
2.1.2 Topic命名规范 命名建议 :
使用小写字母和连字符
避免使用特殊字符
名称要有意义,便于理解
示例 :
1 2 3 4 device-data # 设备数据 user-events # 用户事件 order-transactions # 订单交易 log-app # 应用日志
2.2 Topic创建 2.2.1 命令行创建 基础创建 :
1 2 3 4 5 6 7 8 9 10 11 bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic my-topic bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic device-data \ --partitions 3 \ --replication-factor 2
完整创建(带配置) :
1 2 3 4 5 6 7 8 9 bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic device-data \ --partitions 10 \ --replication-factor 3 \ --config retention.ms=604800000 \ --config segment.ms=3600000 \ --config compression.type=snappy \ --config max.message.bytes=1048576
配置说明 :
partitions:分区数
replication-factor:副本数
retention.ms:消息保留时间(毫秒)
segment.ms:日志段滚动时间
compression.type:压缩类型
max.message.bytes:最大消息大小
2.2.2 集群创建 集群环境创建 :
1 2 3 4 5 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 \ --topic device-data \ --partitions 10 \ --replication-factor 3
2.2.3 自动创建Topic 启用自动创建 :
1 2 auto.create.topics.enable =true
自动创建规则 :
使用默认配置
分区数:num.partitions(默认1)
副本数:default.replication.factor(默认1)
生产环境建议 :禁用自动创建,手动创建Topic并配置合适的参数。
2.3 Topic配置 2.3.1 常用配置参数 保留策略 :
1 2 3 4 5 --config retention.ms=604800000 --config retention.bytes=10737418240
压缩配置 :
1 2 3 4 5 --config compression.type=snappy --config cleanup.policy=compact
段配置 :
1 2 3 4 5 --config segment.bytes=1073741824 --config segment.ms=3600000
消息大小 :
1 2 --config max.message.bytes=1048576
2.3.2 修改Topic配置 修改保留时间 :
1 2 3 4 bin/kafka-configs.sh --alter \ --bootstrap-server localhost:9092 \ --topic device-data \ --add-config retention.ms=86400000
修改压缩类型 :
1 2 3 4 bin/kafka-configs.sh --alter \ --bootstrap-server localhost:9092 \ --topic device-data \ --add-config compression.type=gzip
删除配置 :
1 2 3 4 bin/kafka-configs.sh --alter \ --bootstrap-server localhost:9092 \ --topic device-data \ --delete-config retention.ms
2.4 Topic查看 2.4.1 查看Topic列表 1 2 3 4 5 6 7 8 bin/kafka-topics.sh --list \ --bootstrap-server localhost:9092 bin/kafka-topics.sh --list \ --bootstrap-server localhost:9092 \ --topic device-*
2.4.2 查看Topic详情 1 2 3 4 bin/kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic device-data
输出示例 :
1 2 3 4 5 Topic: device-data PartitionCount: 10 ReplicationFactor: 3 Configs: segment.ms=3600000 Topic: device-data Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: device-data Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: device-data Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 ...
字段说明 :
Partition:分区编号
Leader:Leader副本所在的Broker ID
Replicas:所有副本所在的Broker ID列表
Isr:In-Sync Replicas,与Leader保持同步的副本列表
2.4.3 查看Topic配置 1 2 3 4 bin/kafka-configs.sh --describe \ --bootstrap-server localhost:9092 \ --topic device-data
2.5 Topic删除 2.5.1 删除Topic 1 2 3 4 bin/kafka-topics.sh --delete \ --bootstrap-server localhost:9092 \ --topic device-data
注意事项 :
删除是异步操作
需要等待一段时间才能完全删除
如果delete.topic.enable=false,删除操作会被忽略
2.5.2 启用删除功能 1 2 delete.topic.enable =true
2.6 Topic实战案例 2.6.1 案例1:设备数据Topic 场景 :IoT设备上报数据,需要长期保存。
1 2 3 4 5 6 7 8 9 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.1.101:9092 \ --topic device-data \ --partitions 10 \ --replication-factor 3 \ --config retention.ms=2592000000 \ --config segment.bytes=1073741824 \ --config compression.type=snappy \ --config max.message.bytes=1048576
配置说明 :
partitions=10:支持10个Consumer并发消费
replication-factor=3:3个副本,保障高可用
retention.ms=2592000000:保留30天
compression.type=snappy:使用Snappy压缩,节省空间
2.6.2 案例2:日志收集Topic 场景 :收集应用日志,短期保存。
1 2 3 4 5 6 7 8 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.1.101:9092 \ --topic log-app \ --partitions 20 \ --replication-factor 2 \ --config retention.ms=604800000 \ --config segment.ms=3600000 \ --config compression.type=gzip
配置说明 :
partitions=20:高并发日志写入
replication-factor=2:2个副本,节省资源
retention.ms=604800000:保留7天
compression.type=gzip:Gzip压缩,压缩率高
2.6.3 案例3:用户事件Topic 场景 :用户行为事件,需要日志压缩。
1 2 3 4 5 6 7 8 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.1.101:9092 \ --topic user-events \ --partitions 5 \ --replication-factor 3 \ --config cleanup.policy=compact \ --config min.cleanable.dirty.ratio=0.1 \ --config segment.ms=3600000
配置说明 :
cleanup.policy=compact:启用日志压缩
min.cleanable.dirty.ratio=0.1:压缩阈值
只保留每个key的最新值
3. Partition(分区)详解 3.1 Partition基础概念 3.1.1 Partition定义 Partition 是Topic的物理分区,每个Partition是一个有序的消息队列。消息在Partition内有序,但不同Partition之间的消息顺序不保证。
Partition特点 :
提高并发性能
支持水平扩展
消息在Partition内有序
每个Partition可以有多个Replica
3.1.2 Partition数量选择 选择原则 :
并发消费 :Partition数 ≥ Consumer数量
吞吐量 :更多Partition可以提高吞吐量
资源消耗 :Partition过多会增加资源消耗
推荐配置 :
小规模:3-5个Partition
中规模:10-20个Partition
大规模:50-100个Partition
注意事项 :
Partition数量创建后不能减少(只能增加)
建议根据实际需求设置,避免过多
3.2 分区策略 3.2.1 默认分区策略 Producer发送消息时的分区选择 :
指定Partition :直接发送到指定Partition
指定Key :根据Key的hash值选择Partition
轮询 :没有Key时,轮询选择Partition
Java代码示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ProducerRecord<String, String> record = new ProducerRecord <>( "device-data" , 0 , "device-001" , "data" ); producer.send(record); ProducerRecord<String, String> record = new ProducerRecord <>( "device-data" , "device-001" , "data" ); producer.send(record); ProducerRecord<String, String> record = new ProducerRecord <>( "device-data" , null , "data" ); producer.send(record);
3.2.2 自定义分区器 实现Partitioner接口 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.example.kafka;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.PartitionInfo;import java.util.List;import java.util.Map;public class DevicePartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (key == null ) { return 0 ; } String deviceId = (String) key; return Math.abs(deviceId.hashCode()) % numPartitions; } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
使用自定义分区器 :
1 2 3 4 5 6 7 8 9 Properties props = new Properties ();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DevicePartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer <>(props);
3.2.3 分区策略实战 场景1:设备数据分区
需求 :同一设备的数据发送到同一Partition,保证顺序。
1 2 3 4 5 6 7 ProducerRecord<String, String> record = new ProducerRecord <>( "device-data" , deviceId, data ); producer.send(record);
场景2:用户事件分区
需求 :同一用户的事件发送到同一Partition。
1 2 3 4 5 6 7 ProducerRecord<String, String> record = new ProducerRecord <>( "user-events" , userId, eventData ); producer.send(record);
场景3:日志分区
需求 :日志按应用名称分区。
1 2 3 4 5 6 7 ProducerRecord<String, String> record = new ProducerRecord <>( "log-app" , appName, logData ); producer.send(record);
3.3 分区分配 3.3.1 Consumer分区分配 Consumer Group内的分区分配 :
Range策略 :按Partition范围分配
RoundRobin策略 :轮询分配
Sticky策略 :粘性分配(尽量保持原有分配)
默认策略 :Range
3.3.2 Range分配策略 分配规则 :
按Partition编号排序
按Consumer排序
平均分配,余数分配给前面的Consumer
示例 :
3个Partition(0, 1, 2)
2个Consumer(C1, C2)
分配结果 :
C1:Partition 0, 1
C2:Partition 2
3.3.3 RoundRobin分配策略 分配规则 :
所有Partition和Consumer排序
轮询分配
示例 :
3个Partition(0, 1, 2)
2个Consumer(C1, C2)
分配结果 :
C1:Partition 0, 2
C2:Partition 1
3.3.4 Sticky分配策略 分配规则 :
优势 :
3.3.5 配置分配策略 Java配置 :
1 2 3 4 5 6 7 8 9 10 11 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group" ); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props);
多策略配置 :
1 2 3 4 5 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList( "org.apache.kafka.clients.consumer.StickyAssignor" , "org.apache.kafka.clients.consumer.RoundRobinAssignor" ));
3.4 分区重平衡 3.4.1 重平衡触发条件 触发场景 :
Consumer加入组
Consumer离开组
Topic的Partition数量变化
Consumer订阅的Topic变化
3.4.2 重平衡过程 过程 :
所有Consumer停止消费
释放Partition分配
重新分配Partition
恢复消费
影响 :
3.4.3 减少重平衡 策略 :
使用Sticky分配策略 :减少重平衡
合理设置session.timeout.ms :避免误判
合理设置max.poll.interval.ms :避免超时
批量处理 :减少处理时间
配置示例 :
1 2 3 4 Properties props = new Properties ();props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000" ); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000" ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500" );
3.5 分区扩展 3.5.1 增加分区 增加Partition数量 :
1 2 3 4 bin/kafka-topics.sh --alter \ --bootstrap-server localhost:9092 \ --topic device-data \ --partitions 20
注意事项 :
只能增加,不能减少
增加后需要重新分配Partition
可能触发重平衡
3.5.2 分区扩展实战 场景 :Topic初始3个Partition,需要扩展到10个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 bin/kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic device-data bin/kafka-topics.sh --alter \ --bootstrap-server localhost:9092 \ --topic device-data \ --partitions 10 bin/kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic device-data
4. Consumer Group(消费者组) 4.1 Consumer Group基础概念 4.1.1 Consumer Group定义 Consumer Group 是多个Consumer组成的组,组内Consumer共享Topic的Partition,实现负载均衡和故障转移。
Consumer Group特点 :
组内Consumer共享Topic的Partition
一个Partition只能被组内一个Consumer消费
支持负载均衡
支持故障转移
4.1.2 Consumer Group命名 命名建议 :
示例 :
1 2 3 device-processor-group log-processor-group user-event-processor-group
4.2 Consumer Group工作原理 4.2.1 工作流程 流程 :
Consumer加入Group
Group Coordinator分配Partition
Consumer消费分配的Partition
Consumer定期提交Offset
Consumer离开时,Partition重新分配
4.2.2 Group Coordinator Coordinator :
管理Consumer Group
分配Partition
监控Consumer状态
选择 :
根据Group ID的hash值选择Broker
该Broker成为Group Coordinator
4.3 Consumer Group配置 4.3.1 基础配置 1 2 3 4 5 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group" ); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
4.3.2 Offset管理 自动提交 :
1 2 3 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true" ); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000" );
手动提交 :
1 2 3 4 5 6 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" ); consumer.commitSync(); consumer.commitAsync();
4.3.3 Offset重置 配置 :
1 2 3 4 5 6 7 8 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest" ); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none" );
4.4 Consumer Group管理 4.4.1 查看Consumer Group 1 2 3 4 5 6 7 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group device-processor-group \ --describe
输出示例 :
1 2 3 4 GROUP TOPIC PARTITION CURRENT-OFFSET LAG CONSUMER-ID device-processor-group device-data 0 1000 0 consumer-1-xxx device-processor-group device-data 1 2000 0 consumer-2-xxx device-processor-group device-data 2 1500 0 consumer-3-xxx
字段说明 :
CURRENT-OFFSET:当前消费的Offset
LAG:消息积压数量
CONSUMER-ID:Consumer ID
4.4.2 重置Offset 重置到最早 :
1 2 3 4 5 6 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group device-processor-group \ --topic device-data \ --reset-offsets \ --to-earliest \ --execute
重置到最新 :
1 2 3 4 5 6 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group device-processor-group \ --topic device-data \ --reset-offsets \ --to-latest \ --execute
重置到指定Offset :
1 2 3 4 5 6 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group device-processor-group \ --topic device-data \ --reset-offsets \ --to-offset 1000 \ --execute
4.4.3 删除Consumer Group 1 2 3 4 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group device-processor-group \ --delete
4.5 Consumer Group实战 4.5.1 案例1:设备数据处理 场景 :3个Consumer处理设备数据,10个Partition。
配置 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group" ); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Collections.singletonList("device-data" )); while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100 )); for (ConsumerRecord<String, String> record : records) { processDeviceData(record.value()); } consumer.commitSync(); }
分配结果 :
Consumer 1:Partition 0, 1, 2, 3
Consumer 2:Partition 4, 5, 6
Consumer 3:Partition 7, 8, 9
4.5.2 案例2:日志处理 场景 :5个Consumer处理日志,20个Partition。
配置 :
1 2 3 4 5 6 7 8 9 10 11 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group" ); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true" ); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000" ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Collections.singletonList("log-app" ));
分配结果 :
4.5.3 案例3:多Topic消费 场景 :消费多个Topic。
配置 :
1 2 3 4 5 consumer.subscribe(Arrays.asList("device-data" , "user-events" , "order-transactions" )); consumer.subscribe(Pattern.compile("device-.*" ));
5. 综合实战案例 5.1 案例1:IoT设备监控系统 5.1.1 架构设计 1 2 3 4 5 6 7 8 9 10 11 10万台设备 ↓ Nginx(负载均衡) ↓ Java服务(设备上报) ↓ Kafka Topic: device-data (10个Partition, 3个副本) ↓ Consumer Group: device-processor-group (3个Consumer) ↓ 数据库/存储
5.1.2 Topic配置 1 2 3 4 5 6 7 8 9 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.1.101:9092 \ --topic device-data \ --partitions 10 \ --replication-factor 3 \ --config retention.ms=2592000000 \ --config segment.bytes=1073741824 \ --config compression.type=snappy
5.1.3 Producer配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Properties props = new Properties ();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092" ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all" ); props.put(ProducerConfig.RETRIES_CONFIG, 3 ); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 ); props.put(ProducerConfig.LINGER_MS_CONFIG, 10 ); KafkaProducer<String, String> producer = new KafkaProducer <>(props); ProducerRecord<String, String> record = new ProducerRecord <>( "device-data" , deviceId, data ); producer.send(record);
5.1.4 Consumer配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-processor-group" ); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500" ); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Collections.singletonList("device-data" )); while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100 )); for (ConsumerRecord<String, String> record : records) { processDeviceData(record.value()); } consumer.commitSync(); }
5.1.5 分区分配 10个Partition,3个Consumer :
Consumer 1:Partition 0, 1, 2, 3
Consumer 2:Partition 4, 5, 6
Consumer 3:Partition 7, 8, 9
5.2 案例2:日志收集系统 5.2.1 Topic配置 1 2 3 4 5 6 7 bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic log-app \ --partitions 20 \ --replication-factor 2 \ --config retention.ms=604800000 \ --config compression.type=gzip
5.2.2 Producer配置 1 2 3 4 5 6 7 ProducerRecord<String, String> record = new ProducerRecord <>( "log-app" , appName, logData ); producer.send(record);
5.2.3 Consumer配置 1 2 3 4 5 6 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group" ); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true" ); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000" ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000" );
5.3 案例3:用户事件系统 5.3.1 Topic配置 1 2 3 4 5 6 7 bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic user-events \ --partitions 5 \ --replication-factor 3 \ --config cleanup.policy=compact \ --config min.cleanable.dirty.ratio=0.1
5.3.2 Producer配置 1 2 3 4 5 6 7 ProducerRecord<String, String> record = new ProducerRecord <>( "user-events" , userId, eventData ); producer.send(record);
5.3.3 Consumer配置 1 2 3 4 Properties props = new Properties ();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-processor-group" ); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" );
6. 最佳实践 6.1 Topic设计 6.1.1 分区数选择 原则 :
Partition数 ≥ Consumer数量
考虑未来扩展
避免过多Partition
推荐 :
小规模:3-5个
中规模:10-20个
大规模:50-100个
6.1.2 副本数选择 原则 :
配置 :
6.2 Partition设计 6.2.1 分区策略选择 场景1:需要顺序 :
使用Key分区
同一Key发送到同一Partition
场景2:不需要顺序 :
场景3:自定义分区 :
6.2.2 Key设计 原则 :
Key要有意义
Key分布要均匀
避免热点Partition
示例 :
6.3 Consumer Group设计 6.3.1 Group命名 原则 :
6.3.2 Consumer数量 原则 :
Consumer数 ≤ Partition数
多余的Consumer不会消费任何Partition
推荐 :
Consumer数 = Partition数(充分利用)
6.4 Offset管理 6.4.1 自动提交 vs 手动提交 自动提交 :
手动提交 :
6.4.2 提交策略 同步提交 :
异步提交 :
推荐 :处理完消息后同步提交,保障数据不丢失。
7. 监控和故障排查 7.1 监控指标 7.1.1 Topic监控 关键指标 :
消息生产速率
消息消费速率
消息积压(Lag)
Partition数量
7.1.2 Consumer Group监控 关键指标 :
Consumer数量
消费速率
Offset提交延迟
重平衡次数
7.2 故障排查 7.2.1 消息积压 原因 :
Consumer处理慢
Consumer数量不足
网络问题
解决 :
增加Consumer数量
优化Consumer处理逻辑
检查网络连接
7.2.2 重平衡频繁 原因 :
session.timeout.ms设置过小
max.poll.interval.ms设置过小
Consumer处理时间过长
解决 :
8. 总结 8.1 核心要点
Topic管理 :创建、配置、查看、删除
Partition详解 :分区策略、分区分配、分区扩展
Consumer Group :工作原理、Offset管理、负载均衡
实战案例 :IoT设备监控、日志收集、用户事件
8.2 架构师建议
Topic设计 :
Partition设计 :
根据业务需求选择分区策略
使用Key保证顺序
避免热点Partition
Consumer Group设计 :
Consumer数 ≤ Partition数
合理设置Offset提交策略
监控消费延迟
8.3 最佳实践
标准化 :统一Topic命名和配置标准
监控化 :实时监控Topic和Consumer Group
文档化 :维护配置文档和架构图
测试化 :充分测试分区分配和重平衡
相关文章 :