Episode 189: Kafka Cluster Architecture Design and High-Availability Deployment
1. Kafka Cluster Architecture Overview
Apache Kafka is a distributed streaming platform known for high throughput, low latency, and high availability. Cluster architecture design is the core of building a high-performance messaging system: broker nodes, topic partitions, and the replication strategy all need to be planned deliberately. This article walks through the design principles of a Kafka cluster architecture, deployment options, and best practices.
1.1 Kafka Core Components
Broker: a Kafka server node responsible for storing and serving messages
Topic: a named message category used to classify and organize messages
Partition: the unit of physical division within a topic
Replica: a copy of a partition that guarantees data reliability
Producer: a client that publishes messages to a topic (a minimal producer/consumer sketch follows this list)
Consumer: a client that reads messages from a topic
Consumer Group: a group of consumers that share partitions for load balancing and failover
ZooKeeper: the coordination service that manages cluster metadata
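To make the relationship between these components concrete, here is a minimal end-to-end sketch: one producer writes a keyed message to a topic, and one consumer joins a consumer group and reads it back. The broker address, topic name, and group id are illustrative assumptions, not values mandated by the cluster described later.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaQuickStart {
    public static void main(String[] args) {
        String bootstrap = "kafka-broker-1:9092"; // assumed broker address
        String topic = "user-events";             // topic from the example cluster

        // Producer: publishes a keyed message to the topic
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrap);
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>(topic, "user-1", "login"));
        }

        // Consumer: joins a consumer group and reads from the topic's partitions
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrap);
        consumerProps.put("group.id", "quickstart-group"); // hypothetical group id
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
```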
1.2 Kafka Architecture Characteristics
Distributed: supports multi-broker cluster deployment
High throughput: processes messages at the scale of millions
Low latency: millisecond-level message delivery
High availability: the replication mechanism guarantees data reliability
Scalable: supports horizontal scaling and dynamic capacity expansion
Durable: messages are persisted to disk
1.3 Advantages of the Cluster Architecture
Fault tolerance: a single-node failure does not take the overall service down
Load balancing: messages are distributed across multiple partitions
Horizontal scalability: broker nodes can be added dynamically
Data safety: multiple replicas prevent data loss
Performance: parallel processing across partitions improves overall throughput
2. Kafka Cluster Architecture Design
2.1 Cluster Topology

```yaml
kafka-cluster:
  brokers:
    - id: 1
      host: kafka-broker-1
      port: 9092
      rack: rack-1
    - id: 2
      host: kafka-broker-2
      port: 9092
      rack: rack-2
    - id: 3
      host: kafka-broker-3
      port: 9092
      rack: rack-3
  zookeeper:
    - host: zookeeper-1
      port: 2181
    - host: zookeeper-2
      port: 2181
    - host: zookeeper-3
      port: 2181
  topics:
    - name: user-events
      partitions: 12
      replication-factor: 3
    - name: order-events
      partitions: 8
      replication-factor: 3
```
2.2 Broker Configuration Tuning

```properties
# Broker identity and network
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-broker-1:9092
log.dirs=/opt/kafka/logs
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Topic defaults and log retention
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Replication and durability
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
compression.type=snappy
log.cleanup.policy=delete
replica.fetch.max.bytes=1048576
message.max.bytes=1000000
replica.lag.time.max.ms=10000
```
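Once these settings are rolled out, it can be worth reading back what a broker actually reports. The following is a small sketch, assuming an AdminClient pointed at the cluster and a broker with broker.id 1, that queries a few of the settings above via describeConfigs; it is an illustration rather than part of the original configuration.

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092"); // assumed address
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Broker configs are addressed by the broker.id as a string
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            DescribeConfigsResult result = adminClient.describeConfigs(Collections.singleton(broker));
            Config config = result.all().get(30, TimeUnit.SECONDS).get(broker);
            for (String key : new String[]{"default.replication.factor", "min.insync.replicas",
                    "unclean.leader.election.enable", "log.retention.hours"}) {
                System.out.println(key + " = " + config.get(key).value());
            }
        }
    }
}
```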
2.3 Topic Partitioning Strategy

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.stereotype.Service;

@Service
public class TopicPartitionStrategy {

    private final AdminClient adminClient;

    public TopicPartitionStrategy(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // Create a topic with the given partition count and replication factor
    public void createTopic(String topicName, int partitions, int replicationFactor) {
        try {
            NewTopic newTopic = new NewTopic(topicName, partitions, (short) replicationFactor);
            CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
            result.all().get(30, TimeUnit.SECONDS);
            System.out.println("Topic " + topicName + " created");
        } catch (Exception e) {
            throw new RuntimeException("Failed to create topic", e);
        }
    }

    // Increase the partition count of an existing topic (partitions can only grow)
    public void increasePartitions(String topicName, int newPartitionCount) {
        try {
            Map<String, NewPartitions> partitions = new HashMap<>();
            partitions.put(topicName, NewPartitions.increaseTo(newPartitionCount));
            CreatePartitionsResult result = adminClient.createPartitions(partitions);
            result.all().get(30, TimeUnit.SECONDS);
            System.out.println("Topic " + topicName + " partition count increased to " + newPartitionCount);
        } catch (Exception e) {
            throw new RuntimeException("Failed to increase partitions", e);
        }
    }

    public TopicDescription getTopicInfo(String topicName) {
        try {
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName));
            return result.topicNameValues().get(topicName).get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to describe topic", e);
        }
    }

    // Partition count sized as expected throughput / per-partition throughput, rounded up
    public int calculatePartitionCount(int expectedThroughput, int partitionThroughput) {
        return Math.max(1, (expectedThroughput + partitionThroughput - 1) / partitionThroughput);
    }

    // Key-based partition selection; a random partition is used when no key is provided
    public int selectPartition(String key, int partitionCount) {
        if (key == null) {
            return ThreadLocalRandom.current().nextInt(partitionCount);
        }
        // floorMod avoids a negative index when hashCode() is Integer.MIN_VALUE
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```
3. High-Availability Deployment
3.1 Docker Compose Deployment

```yaml
version: '3.8'

services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
    volumes:
      - zookeeper-1-data:/var/lib/zookeeper/data
      - zookeeper-1-logs:/var/lib/zookeeper/log
    networks:
      - kafka-network

  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - "2182:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
    volumes:
      - zookeeper-2-data:/var/lib/zookeeper/data
      - zookeeper-2-logs:/var/lib/zookeeper/log
    networks:
      - kafka-network

  zookeeper-3:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - "2183:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
    volumes:
      - zookeeper-3-data:/var/lib/zookeeper/data
      - zookeeper-3-logs:/var/lib/zookeeper/log
    networks:
      - kafka-network

  kafka-broker-1:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka-broker-1
    container_name: kafka-broker-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - kafka-broker-1-data:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3

  kafka-broker-2:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka-broker-2
    container_name: kafka-broker-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:29092,PLAINTEXT_HOST://localhost:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - kafka-broker-2-data:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3

  kafka-broker-3:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka-broker-3
    container_name: kafka-broker-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:29092,PLAINTEXT_HOST://localhost:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - kafka-broker-3-data:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3

volumes:
  zookeeper-1-data:
  zookeeper-1-logs:
  zookeeper-2-data:
  zookeeper-2-logs:
  zookeeper-3-data:
  zookeeper-3-logs:
  kafka-broker-1-data:
  kafka-broker-2-data:
  kafka-broker-3-data:

networks:
  kafka-network:
    driver: bridge
```
3.2 Kubernetes Deployment

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
data:
  server.properties: |
    broker.id=${KAFKA_BROKER_ID}
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://${KAFKA_ADVERTISED_HOSTNAME}:9092
    zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}
    log.dirs=/var/lib/kafka/data
    num.partitions=3
    default.replication.factor=3
    min.insync.replicas=2
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    auto.create.topics.enable=false
    compression.type=snappy
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.0.0
          ports:
            - containerPort: 9092
          env:
            # Caveat: metadata.name resolves to the pod name (e.g. "kafka-0"), not a number;
            # in practice the numeric broker.id must be derived from the StatefulSet ordinal.
            - name: KAFKA_BROKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_ADVERTISED_HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181"
          volumeMounts:
            - name: kafka-data
              mountPath: /var/lib/kafka/data
            - name: kafka-config
              mountPath: /etc/kafka/server.properties
              subPath: server.properties
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            exec:
              command:
                - kafka-broker-api-versions
                - --bootstrap-server
                - localhost:9092
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            exec:
              command:
                - kafka-broker-api-versions
                - --bootstrap-server
                - localhost:9092
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        # ConfigMap volume backing the server.properties subPath mount above
        - name: kafka-config
          configMap:
            name: kafka-config
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  ports:
    - port: 9092
      targetPort: 9092
  selector:
    app: kafka
  clusterIP: None
```
3.3 Production Deployment Script

```bash
#!/bin/bash

# Deploy a Kafka cluster: ZooKeeper ensemble first, then the brokers, then default topics
deploy_kafka_cluster() {
    local cluster_name=$1
    local broker_count=$2
    local zookeeper_count=$3

    echo "Deploying Kafka cluster: $cluster_name"
    echo "Broker count: $broker_count"
    echo "ZooKeeper count: $zookeeper_count"

    deploy_zookeeper_cluster "$zookeeper_count"
    wait_for_zookeeper
    deploy_kafka_brokers "$broker_count"
    wait_for_kafka
    create_default_topics

    echo "Kafka cluster deployment finished"
}

deploy_zookeeper_cluster() {
    local count=$1
    for i in $(seq 1 "$count"); do
        echo "Deploying ZooKeeper node $i"
        cat > zookeeper-$i.properties << EOF
dataDir=/var/lib/zookeeper/data
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.1=zookeeper-1:2888:3888
server.2=zookeeper-2:2888:3888
server.3=zookeeper-3:2888:3888
EOF
        docker run -d \
            --name zookeeper-$i \
            --network kafka-network \
            -p 218$i:2181 \
            -v zookeeper-$i-data:/var/lib/zookeeper/data \
            -v "$(pwd)/zookeeper-$i.properties":/opt/zookeeper/conf/zoo.cfg \
            confluentinc/cp-zookeeper:7.0.0
    done
}

deploy_kafka_brokers() {
    local count=$1
    for i in $(seq 1 "$count"); do
        echo "Deploying Kafka broker $i"
        cat > server-$i.properties << EOF
broker.id=$i
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-broker-$i:9092
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
log.dirs=/var/lib/kafka/data
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
auto.create.topics.enable=false
compression.type=snappy
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
        docker run -d \
            --name kafka-broker-$i \
            --network kafka-network \
            -p 909$i:9092 \
            -v kafka-broker-$i-data:/var/lib/kafka/data \
            -v "$(pwd)/server-$i.properties":/etc/kafka/server.properties \
            confluentinc/cp-kafka:7.0.0 \
            kafka-server-start /etc/kafka/server.properties
    done
}

wait_for_zookeeper() {
    echo "Waiting for ZooKeeper to start..."
    for i in {1..30}; do
        if docker exec zookeeper-1 zkServer.sh status > /dev/null 2>&1; then
            echo "ZooKeeper is up"
            return 0
        fi
        sleep 2
    done
    echo "Timed out waiting for ZooKeeper"
    exit 1
}

wait_for_kafka() {
    echo "Waiting for Kafka to start..."
    for i in {1..30}; do
        if docker exec kafka-broker-1 kafka-broker-api-versions --bootstrap-server localhost:9092 > /dev/null 2>&1; then
            echo "Kafka is up"
            return 0
        fi
        sleep 2
    done
    echo "Timed out waiting for Kafka"
    exit 1
}

create_default_topics() {
    echo "Creating default topics..."
    docker exec kafka-broker-1 kafka-topics --create \
        --bootstrap-server localhost:9092 \
        --topic user-events \
        --partitions 12 \
        --replication-factor 3
    docker exec kafka-broker-1 kafka-topics --create \
        --bootstrap-server localhost:9092 \
        --topic order-events \
        --partitions 8 \
        --replication-factor 3
    echo "Default topics created"
}

# Minimal status/stop/start helpers for the commands dispatched in main;
# the container names assume the 3-node layout deployed above.
check_cluster_status() {
    docker ps --format "table {{.Names}}\t{{.Status}}" | grep -E "zookeeper-|kafka-broker-"
}

stop_cluster() {
    for name in kafka-broker-1 kafka-broker-2 kafka-broker-3 zookeeper-1 zookeeper-2 zookeeper-3; do
        docker stop "$name" 2>/dev/null
    done
}

start_cluster() {
    for name in zookeeper-1 zookeeper-2 zookeeper-3 kafka-broker-1 kafka-broker-2 kafka-broker-3; do
        docker start "$name" 2>/dev/null
    done
}

main() {
    case $1 in
        "deploy")
            deploy_kafka_cluster "$2" "$3" "$4"
            ;;
        "status")
            check_cluster_status
            ;;
        "stop")
            stop_cluster
            ;;
        "start")
            start_cluster
            ;;
        *)
            echo "Usage: $0 {deploy|status|stop|start} [args...]"
            exit 1
            ;;
    esac
}

main "$@"
```
4. Performance Tuning
4.1 Producer Tuning

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerOptimization {

    private final KafkaProducer<String, String> producer;

    public KafkaProducerOptimization() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Batching and compression for throughput
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // acks=1 trades durability for latency; use "all" when min.insync.replicas must be honored
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // Memory and socket buffers
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);

        this.producer = new KafkaProducer<>(props);
    }

    // Asynchronous send with a completion callback
    public void sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                System.out.println("Sent: " + metadata.topic() + "-"
                        + metadata.partition() + "-" + metadata.offset());
            }
        });
    }

    // Send a batch of records and flush once at the end
    public void sendBatch(String topic, List<ProducerRecord<String, String>> records) {
        for (ProducerRecord<String, String> record : records) {
            producer.send(record);
        }
        producer.flush();
    }

    // Transactional send. Note: this only works on a producer configured with
    // transactional.id and enable.idempotence=true (which implies acks=all), and
    // initTransactions() must be called once before the first transaction; the
    // throughput-oriented producer configured above does not meet those requirements.
    public void sendTransactional(String topic, String key, String value) {
        producer.beginTransaction();
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record);
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            throw new RuntimeException("Transactional send failed", e);
        }
    }
}
```
4.2 Consumer Tuning

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerOptimization {

    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerOptimization() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Fetch tuning
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);

        // Group membership timeouts
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

        // Manual offset commits (auto.commit.interval.ms is ignored when auto commit is off)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        this.consumer = new KafkaConsumer<>(props);
    }

    // Poll, hand the whole batch to the processor, then commit synchronously
    public void consumeBatch(String topic, Consumer<List<ConsumerRecord<String, String>>> processor) {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    recordList.add(record);
                }
                processor.accept(recordList);
                consumer.commitSync();
            }
        }
    }

    // Process records one by one, committing after each poll
    public void consumeStream(String topic, Consumer<ConsumerRecord<String, String>> processor) {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                processor.accept(record);
            }
            consumer.commitSync();
        }
    }

    // Read a single partition with manual assignment (bypasses consumer group rebalancing)
    public void consumePartition(String topic, int partition, Consumer<ConsumerRecord<String, String>> processor) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(Collections.singletonList(topicPartition));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                processor.accept(record);
            }
            consumer.commitSync();
        }
    }
}
```
4.3 Cluster-Level Broker Tuning

```properties
# Network and I/O threads
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log flush and retention checks
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms=2000
log.retention.check.interval.ms=300000

# Replica fetcher tuning
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536

# Log layout and cleanup
compression.type=snappy
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.index.interval.bytes=4096

# ISR lag thresholds
replica.lag.time.max.ms=10000
# replica.lag.max.messages was removed in Kafka 0.9; on modern brokers replica.lag.time.max.ms is the effective setting
replica.lag.max.messages=4000
replica.fetch.min.bytes=1
replica.fetch.max.wait.ms=500
```
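Several broker settings in this family are dynamic configs that can be changed at runtime through the AdminClient instead of editing server.properties and restarting. The sketch below is illustrative only: the target broker id is assumed to be 1, and not every property listed above is dynamically changeable, so treat it as a pattern rather than a recipe.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicBrokerTuning {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092"); // assumed address
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Target a single broker by its broker.id (assumed to be "1")
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            // Thread-pool sizes are per-broker dynamic configs in recent Kafka versions
            Collection<AlterConfigOp> ops = Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("num.io.threads", "16"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("num.network.threads", "8"), AlterConfigOp.OpType.SET));
            adminClient.incrementalAlterConfigs(Collections.singletonMap(broker, ops))
                    .all().get(30, TimeUnit.SECONDS);
            System.out.println("Dynamic broker config update applied");
        }
    }
}
```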
5. Monitoring and Operations
5.1 Cluster Monitoring

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.springframework.stereotype.Service;

@Service
public class KafkaClusterMonitor {

    private final AdminClient adminClient;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public KafkaClusterMonitor(AdminClient adminClient) {
        this.adminClient = adminClient;
        // Poll cluster state every 30 seconds
        scheduler.scheduleAtFixedRate(this::monitorCluster, 0, 30, TimeUnit.SECONDS);
    }

    private void monitorCluster() {
        try {
            monitorBrokers();
            monitorTopics();
            monitorConsumerGroups();
        } catch (Exception e) {
            System.err.println("Cluster monitoring failed: " + e.getMessage());
        }
    }

    private void monitorBrokers() {
        try {
            DescribeClusterResult result = adminClient.describeCluster();
            Collection<Node> nodes = result.nodes().get(30, TimeUnit.SECONDS);
            System.out.println("=== Broker status ===");
            for (Node node : nodes) {
                System.out.println("Broker " + node.id() + ": " + node.host() + ":" + node.port());
            }
        } catch (Exception e) {
            System.err.println("Broker monitoring failed: " + e.getMessage());
        }
    }

    private void monitorTopics() {
        try {
            ListTopicsResult result = adminClient.listTopics();
            Set<String> topics = result.names().get(30, TimeUnit.SECONDS);
            System.out.println("=== Topic status ===");
            for (String topic : topics) {
                DescribeTopicsResult describeResult =
                        adminClient.describeTopics(Collections.singletonList(topic));
                TopicDescription description =
                        describeResult.topicNameValues().get(topic).get(30, TimeUnit.SECONDS);
                System.out.println("Topic: " + topic + ", partitions: " + description.partitions().size());
            }
        } catch (Exception e) {
            System.err.println("Topic monitoring failed: " + e.getMessage());
        }
    }

    private void monitorConsumerGroups() {
        try {
            ListConsumerGroupsResult result = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> groups = result.valid().get(30, TimeUnit.SECONDS);
            System.out.println("=== Consumer group status ===");
            for (ConsumerGroupListing group : groups) {
                System.out.println("Consumer group: " + group.groupId() + ", state: " + group.state());
            }
        } catch (Exception e) {
            System.err.println("Consumer group monitoring failed: " + e.getMessage());
        }
    }
}
```
5.2 Performance Metrics

```java
import java.time.Duration;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

@Component
public class KafkaPerformanceMonitor {

    private final MeterRegistry meterRegistry;
    private final Timer producerTimer;
    private final Timer consumerTimer;
    private final Counter messageCounter;
    private final Gauge lagGauge;

    public KafkaPerformanceMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.producerTimer = Timer.builder("kafka.producer.latency")
                .register(meterRegistry);
        this.consumerTimer = Timer.builder("kafka.consumer.latency")
                .register(meterRegistry);
        this.messageCounter = Counter.builder("kafka.messages.processed")
                .register(meterRegistry);
        // Gauge.builder takes the state object and a value function; register() takes only the registry
        this.lagGauge = Gauge.builder("kafka.consumer.lag", this, KafkaPerformanceMonitor::getConsumerLag)
                .register(meterRegistry);
    }

    public void recordProducerLatency(Duration duration) {
        producerTimer.record(duration);
    }

    public void recordConsumerLatency(Duration duration) {
        consumerTimer.record(duration);
    }

    public void incrementMessageCount() {
        messageCounter.increment();
    }

    private double getConsumerLag() {
        // Placeholder value; see the lag calculation sketch below for one way to compute it
        return 0.0;
    }
}
```
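getConsumerLag() above is only a stub. One way to compute a real value, sketched here under the assumption that an AdminClient and the consumer group id are available, is to compare the group's committed offsets with each partition's latest offset and sum the differences; the result could then back the kafka.consumer.lag gauge registered above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagCalculator {

    private final AdminClient adminClient;

    public ConsumerLagCalculator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // Total lag of a consumer group: sum over partitions of (log end offset - committed offset)
    public long totalLag(String groupId) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get(30, TimeUnit.SECONDS);

        // Ask the brokers for the latest offset of every partition the group has committed to
        Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
        committed.keySet().forEach(tp -> latestSpec.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                adminClient.listOffsets(latestSpec).all().get(30, TimeUnit.SECONDS);

        long lag = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            long endOffset = latest.get(entry.getKey()).offset();
            lag += Math.max(0, endOffset - entry.getValue().offset());
        }
        return lag;
    }
}
```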
5.3 Alert Configuration

```yaml
kafka:
  alerts:
    broker-down:
      enabled: true
      threshold: 1
      message: "Kafka broker node down"
    topic-lag:
      enabled: true
      threshold: 10000
      message: "Topic consumer lag too high"
    partition-unavailable:
      enabled: true
      threshold: 1
      message: "Partition unavailable"
    consumer-group-lag:
      enabled: true
      threshold: 5000
      message: "Consumer group lag too high"
```
6. Fault Handling and Recovery
6.1 Handling Common Failures

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.stereotype.Service;

@Service
public class KafkaFaultHandler {

    private final AdminClient adminClient;

    public KafkaFaultHandler(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // If the broker is no longer part of the cluster, move its partitions to the surviving brokers
    public void handleBrokerFailure(int brokerId) {
        System.out.println("Handling failure of broker " + brokerId);
        try {
            DescribeClusterResult result = adminClient.describeCluster();
            Collection<Node> nodes = result.nodes().get(30, TimeUnit.SECONDS);
            boolean brokerExists = nodes.stream()
                    .anyMatch(node -> node.id() == brokerId);
            if (!brokerExists) {
                System.out.println("Broker " + brokerId + " is offline");
                reassignPartitions(brokerId);
            }
        } catch (Exception e) {
            System.err.println("Failed to handle broker failure: " + e.getMessage());
        }
    }

    private void reassignPartitions(int brokerId) {
        try {
            ListTopicsResult topicsResult = adminClient.listTopics();
            Set<String> topics = topicsResult.names().get(30, TimeUnit.SECONDS);
            for (String topic : topics) {
                reassignTopicPartitions(topic, brokerId);
            }
        } catch (Exception e) {
            System.err.println("Partition reassignment failed: " + e.getMessage());
        }
    }

    // Remove the failed broker from each affected partition's replica set
    private void reassignTopicPartitions(String topic, int brokerId) {
        try {
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic));
            TopicDescription description = result.topicNameValues().get(topic).get(30, TimeUnit.SECONDS);
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
            for (TopicPartitionInfo partitionInfo : description.partitions()) {
                if (partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId)) {
                    List<Integer> newReplicas = partitionInfo.replicas().stream()
                            .filter(node -> node.id() != brokerId)
                            .map(Node::id)
                            .collect(Collectors.toList());
                    reassignments.put(new TopicPartition(topic, partitionInfo.partition()),
                            Optional.of(new NewPartitionReassignment(newReplicas)));
                }
            }
            if (!reassignments.isEmpty()) {
                AlterPartitionReassignmentsResult reassignResult =
                        adminClient.alterPartitionReassignments(reassignments);
                reassignResult.all().get(30, TimeUnit.SECONDS);
                System.out.println("Partition reassignment for topic " + topic + " completed");
            }
        } catch (Exception e) {
            System.err.println("Failed to reassign topic partitions: " + e.getMessage());
        }
    }

    public void handlePartitionUnavailable(String topic, int partition) {
        System.out.println("Handling unavailable partition: " + topic + "-" + partition);
        try {
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic));
            TopicDescription description = result.topicNameValues().get(topic).get(30, TimeUnit.SECONDS);
            TopicPartitionInfo partitionInfo = description.partitions().stream()
                    .filter(p -> p.partition() == partition)
                    .findFirst()
                    .orElse(null);
            if (partitionInfo != null && partitionInfo.isr().isEmpty()) {
                System.out.println("Partition " + topic + "-" + partition + " has an empty ISR");
                recoverPartition(topic, partition);
            }
        } catch (Exception e) {
            System.err.println("Failed to handle unavailable partition: " + e.getMessage());
        }
    }

    private void recoverPartition(String topic, int partition) {
        try {
            // Placeholder: actual recovery might trigger a preferred leader election or,
            // as a last resort, an unclean leader election
            System.out.println("Attempting to recover partition: " + topic + "-" + partition);
        } catch (Exception e) {
            System.err.println("Partition recovery failed: " + e.getMessage());
        }
    }
}
```
6.2 Data Recovery Strategy

```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.stereotype.Service;

@Service
public class KafkaDataRecovery {

    private final AdminClient adminClient;

    public KafkaDataRecovery(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    // Find partitions whose ISR has shrunk below the full replica set and recover them
    public void recoverTopicData(String topic) {
        System.out.println("Recovering data for topic: " + topic);
        try {
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic));
            TopicDescription description = result.topicNameValues().get(topic).get(30, TimeUnit.SECONDS);
            for (TopicPartitionInfo partitionInfo : description.partitions()) {
                if (partitionInfo.isr().size() < partitionInfo.replicas().size()) {
                    System.out.println("Partition " + partitionInfo.partition() + " needs recovery");
                    recoverPartition(topic, partitionInfo.partition());
                }
            }
        } catch (Exception e) {
            System.err.println("Topic data recovery failed: " + e.getMessage());
        }
    }

    private void recoverPartition(String topic, int partition) {
        try {
            // Placeholder: recovery typically means waiting for the lagging replica to
            // catch up, or reassigning the partition to a healthy broker
            System.out.println("Recovering partition: " + topic + "-" + partition);
        } catch (Exception e) {
            System.err.println("Partition recovery failed: " + e.getMessage());
        }
    }

    // Placeholder backup hook; the export itself could be done with MirrorMaker or an external dump tool
    public void backupTopicData(String topic, String backupPath) {
        System.out.println("Backing up topic " + topic + " to " + backupPath);
        try {
            System.out.println("Topic backup finished");
        } catch (Exception e) {
            System.err.println("Topic backup failed: " + e.getMessage());
        }
    }

    // Placeholder restore hook; restoring means replaying the backed-up data into the topic
    public void restoreTopicData(String topic, String backupPath) {
        System.out.println("Restoring topic " + topic + " from " + backupPath);
        try {
            System.out.println("Topic restore finished");
        } catch (Exception e) {
            System.err.println("Topic restore failed: " + e.getMessage());
        }
    }
}
```
7. Best Practices Summary
7.1 Cluster Design Principles
High availability: at least 3 broker nodes and a replication factor of 3
Load balancing: distribute partitions evenly across brokers
Fault tolerance: a single-point failure must not interrupt service
Scalability: support adding broker nodes dynamically
Performance: tune configuration parameters to match the workload
7.2 Deployment Best Practices
Hardware: use SSD storage with sufficient memory and CPU
Network: use at least gigabit networking and avoid network bottlenecks
Storage: configure log directories and retention policies appropriately
Monitoring: build a complete monitoring and alerting system
Backup: back up important data regularly
7.3 Operations Best Practices
Regular monitoring: watch cluster state and performance metrics
Prompt response: detect and handle failures quickly
Capacity planning: plan cluster capacity ahead of growth
Version upgrades: roll out upgrades cautiously
Security hardening: strengthen cluster access control and protection
With a well-designed cluster architecture and deployment plan, you can build a stable, high-performance Kafka cluster that meets the needs of enterprise applications.