Kafka from Basics to Architecture in Practice

1. Overview

1.1 Why Kafka Matters

Apache Kafka is a distributed streaming platform, originally developed at LinkedIn and now a top-level Apache project. Kafka offers high throughput, low latency, and strong scalability, and is widely used for big data, real-time stream processing, log collection, and message queuing.
Kafka's core features:

- High throughput: millions of messages per second
- Low latency: millisecond-level delivery
- Scalability: horizontal scaling with cluster deployment
- Durability: messages are persisted to disk
- High availability: replication protects data
1.2 Typical Use Cases

- Log collection: gather logs from all systems for unified processing
- Message queue: decouple systems and process asynchronously
- Stream processing: real-time data processing and analytics
- Event sourcing: record every event in the system
- IoT device monitoring: device data ingestion and processing
1.3 What This Article Covers

This article walks through Kafka from the following angles:

- Kafka basics: core concepts and architecture
- Single-node deployment: installation, configuration, verification
- Cluster deployment: cluster configuration and high availability
- Java integration: Producer, Consumer, Streams
- High-availability architecture: replication and data consistency
- Performance tuning: parameter tuning and benchmarking
- Monitoring and alerting: JMX monitoring and Prometheus integration
- Hands-on case: an IoT device monitoring system
2. Kafka Basics

2.1 Core Concepts

2.1.1 Topic

A Topic is a category of messages, similar to a table in a database.

Characteristics:

- A Topic can have multiple Partitions
- Each Partition can have multiple Replicas
- Messages are stored in order within a Partition
2.1.2 Partition

A Partition is a physical shard of a Topic; each Partition is an ordered message log.

Characteristics:

- Raises concurrency and throughput
- Enables horizontal scaling
- Messages are ordered within a Partition (not across the whole Topic)
2.1.3 Replica

A Replica is a copy of a Partition, used to guarantee high availability.

Types:

- Leader replica: serves read and write requests
- Follower replicas: replicate data from the leader
2.1.4 Producer

A Producer is a client that publishes messages to Kafka.

Characteristics:

- Can target a specific Partition
- Supports synchronous and asynchronous sends
- Supports acknowledgment (acks) semantics
2.1.5 Consumer

A Consumer is a client that reads messages from Kafka.

Characteristics:

- Can join a Consumer Group
- Supports automatic or manual offset commits
- Can start reading from a given offset (see the sketch below)
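Reading from a chosen offset is easy to miss in the API, so here is a minimal sketch, assuming a local broker and the test-topic created later in Section 3.3; the group id, partition number, and offset are placeholder values:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SeekFromOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() takes partitions directly, bypassing group rebalancing
            TopicPartition tp = new TopicPartition("test-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 42L); // start reading at offset 42 (placeholder)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```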
2.1.6 Consumer Group

A Consumer Group is a set of Consumers cooperating under one group id.

Characteristics:

- Consumers in a group share the Topic's Partitions
- Each Partition is consumed by at most one Consumer within the group
- Provides load balancing and failover
2.1.7 Broker

A Broker is a Kafka server node.

Characteristics:

- Each Broker has a unique id
- A Broker hosts Partitions from many Topics
- Brokers form a cluster
2.2 Kafka Architecture

2.2.1 Overall Architecture

```
Producer → Kafka Cluster (Brokers) → Consumer
                   ↓
               Zookeeper
```
Components:

- Producer: publishes messages
- Kafka Cluster: one or more Brokers
- Consumer: consumes messages
- Zookeeper: metadata management and coordination
2.2.2 Message Storage

Storage layout:

```
Topic: device-data
├── Partition 0 (Leader: Broker1, Replicas: Broker1, Broker2, Broker3)
├── Partition 1 (Leader: Broker2, Replicas: Broker2, Broker3, Broker1)
└── Partition 2 (Leader: Broker3, Replicas: Broker3, Broker1, Broker2)
```
3. Single-Node Deployment

3.1 Environment

3.1.1 System Requirements

- OS: Linux (CentOS 7+, Ubuntu 18+)
- Java: JDK 8 or later (OpenJDK or Oracle JDK recommended)
- Memory: at least 2 GB available
- Disk: SSD recommended, at least 10 GB free
3.1.2 Install Java

```bash
# CentOS
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# Ubuntu/Debian
apt-get update
apt-get install -y openjdk-8-jdk

# Verify
java -version
```
3.1.3 Install Zookeeper

Kafka 2.8.0 as deployed here depends on Zookeeper, so install it first.

```bash
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -xzf apache-zookeeper-3.6.3-bin.tar.gz
mv apache-zookeeper-3.6.3-bin zookeeper
cd zookeeper
mkdir -p /data/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
```
Zookeeper configuration (conf/zoo.cfg):

```properties
dataDir=/data/zookeeper
clientPort=2181
server.1=localhost:2888:3888
```
Start Zookeeper:

```bash
bin/zkServer.sh start

# Verify with the CLI client
bin/zkCli.sh -server localhost:2181
```
3.2 Installing Kafka

3.2.1 Download and Unpack

```bash
cd /opt
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
cd kafka
```
3.2.2 Configuration

Edit server.properties:

```bash
vim config/server.properties
```
Base configuration:

```properties
# Unique id of this broker
broker.id=0
# Listener address
listeners=PLAINTEXT://0.0.0.0:9092
# Message log directory
log.dirs=/data/kafka-logs
# Zookeeper connection
zookeeper.connect=localhost:2181
# Retention: 7 days or 1 GB per partition, whichever is hit first
log.retention.hours=168
log.retention.bytes=1073741824
# Segment file size: 1 GB
log.segment.bytes=1073741824
```
3.2.3 Start Kafka

```bash
bin/kafka-server-start.sh -daemon config/server.properties

# Verify the process is up
jps | grep Kafka

# Watch the server log
tail -f logs/server.log
```
3.3 Verifying the Installation

3.3.1 Create a Topic

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 3 \
  --topic test-topic

# List topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe the topic
bin/kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic test-topic
```
3.3.2 Produce Messages

```bash
bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic
```
3.3.3 Consume Messages

```bash
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic \
  --from-beginning
```
4. Cluster Deployment

4.1 Cluster Design

4.1.1 Cluster Planning

A 3-node Kafka cluster:

| Node | IP | Broker ID | Role |
| --- | --- | --- | --- |
| kafka-1 | 192.168.1.101 | 1 | Broker |
| kafka-2 | 192.168.1.102 | 2 | Broker |
| kafka-3 | 192.168.1.103 | 3 | Broker |
Zookeeper cluster:

| Node | IP | Server ID | Role |
| --- | --- | --- | --- |
| zk-1 | 192.168.1.101 | 1 | Zookeeper |
| zk-2 | 192.168.1.102 | 2 | Zookeeper |
| zk-3 | 192.168.1.103 | 3 | Zookeeper |
4.2 Zookeeper Cluster

4.2.1 Zookeeper Configuration

On every node (conf/zoo.cfg):

```properties
dataDir=/data/zookeeper
clientPort=2181
# Quorum members: <host>:<peer-port>:<election-port>
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
tickTime=2000
initLimit=10
syncLimit=5
```
Create the myid file (each node gets its own id):

```bash
# On zk-1 (192.168.1.101)
echo "1" > /data/zookeeper/myid
# On zk-2 (192.168.1.102)
echo "2" > /data/zookeeper/myid
# On zk-3 (192.168.1.103)
echo "3" > /data/zookeeper/myid
```
4.2.2 Start the Zookeeper Cluster

```bash
# On every node
bin/zkServer.sh start

# Check each node's role (leader/follower)
bin/zkServer.sh status
```
4.3 Kafka Cluster Configuration

4.3.1 Node 1 (192.168.1.101)

```properties
broker.id=1
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

# Replication defaults
default.replication.factor=3
min.insync.replicas=2

# Thread and socket tuning
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
```
4.3.2 Node 2 (192.168.1.102) — same as node 1 except:

```properties
broker.id=2
listeners=PLAINTEXT://192.168.1.102:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
default.replication.factor=3
min.insync.replicas=2
```
4.3.3 Node 3 (192.168.1.103) — same pattern again:

```properties
broker.id=3
listeners=PLAINTEXT://192.168.1.103:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
default.replication.factor=3
min.insync.replicas=2
```
4.3.4 Start the Kafka Cluster

```bash
# On every node
bin/kafka-server-start.sh -daemon config/server.properties

# Verify connectivity
bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.101:9092
```
4.4 Creating a Cluster Topic

4.4.1 Create the Topic

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.1.101:9092 \
  --replication-factor 3 \
  --partitions 3 \
  --topic device-data

bin/kafka-topics.sh --describe \
  --bootstrap-server 192.168.1.101:9092 \
  --topic device-data
```
Sample output:

```
Topic: device-data  PartitionCount: 3  ReplicationFactor: 3  Configs:
    Topic: device-data  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
    Topic: device-data  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
    Topic: device-data  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
```
5. Kafka and Java

5.1 Maven Dependencies

5.1.1 Add Dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
```
5.2 Producer

5.2.1 Basic Producer

```java
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DeviceDataProducer {

    private static final String BOOTSTRAP_SERVERS =
            "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092";
    private static final String TOPIC = "device-data";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas before acknowledging
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batching: up to 16 KB per batch, wait at most 10 ms to fill a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                String deviceId = "device-" + (i % 10);
                String data = "{\"deviceId\":\"" + deviceId
                        + "\",\"timestamp\":" + System.currentTimeMillis()
                        + ",\"data\":\"value" + i + "\"}";

                // Keying by deviceId routes all messages from one device
                // to the same partition
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, deviceId, data);

                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("Sent: topic=" + metadata.topic()
                                    + ", partition=" + metadata.partition()
                                    + ", offset=" + metadata.offset());
                        } else {
                            System.err.println("Send failed: " + exception.getMessage());
                        }
                    }
                });
            }
        } finally {
            producer.close();
        }
    }
}
```
5.2.2 Advanced Producer (custom partitioning)

```java
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Map;
import java.util.Properties;

public class AdvancedProducer {

    private static final String BOOTSTRAP_SERVERS =
            "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092";
    private static final String TOPIC = "device-data";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Plug in the custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DevicePartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Do not pass an explicit partition here, or the partitioner is bypassed
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, "device-001", "data");
        producer.send(record);
        producer.close();
    }
}

class DevicePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String deviceId = (String) key;
        int partitionCount = cluster.partitionCountForTopic(topic);
        // floorMod avoids the negative result Math.abs(Integer.MIN_VALUE) would give
        return Math.floorMod(deviceId.hashCode(), partitionCount);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```
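Note that Kafka's default partitioner already hashes the record key (with murmur2) to pick a partition, so keyed messages from one device stay in one partition without any custom code. A custom Partitioner is only worth it when you need a different placement rule, for example pinning certain device ranges to dedicated partitions.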
5.3 Consumer

5.3.1 Basic Consumer

```java
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DeviceDataConsumer {

    private static final String BOOTSTRAP_SERVERS =
            "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092";
    private static final String TOPIC = "device-data";
    private static final String GROUP_ID = "device-consumer-group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Auto-commit offsets every second
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Start from the earliest offset when the group has no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received: topic=" + record.topic()
                            + ", partition=" + record.partition()
                            + ", offset=" + record.offset()
                            + ", key=" + record.key()
                            + ", value=" + record.value());
                    processMessage(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(String message) {
        System.out.println("Processing: " + message);
    }
}
```
5.3.2 Advanced Consumer (manual commits)

```java
public class AdvancedConsumer {

    // BOOTSTRAP_SERVERS, GROUP_ID, TOPIC and processMessage as in DeviceDataConsumer

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit; offsets are committed only after processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processMessage(record.value());
                    } catch (Exception e) {
                        System.err.println("Processing failed: " + e.getMessage());
                    }
                }
                // Synchronous commit after the whole batch is processed
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
```
5.4 Consumer Group

5.4.1 Multiple Consumer Instances

Start several Consumer instances with the same group id to form a Consumer Group:

```bash
# Instance 1
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer
# Instance 2
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer
# Instance 3
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer
```
Effect:

- The 3 Consumer instances share the Topic's Partitions
- Each Partition is consumed by exactly one instance
- Load is balanced across instances, with automatic failover; the sketch below shows how to watch partitions move between members
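To see the load balancing and failover happen, you can attach a rebalance listener. This is a sketch meant to replace the subscribe() call in DeviceDataConsumer above; imports for ConsumerRebalanceListener, TopicPartition, and Collection are assumed:

```java
consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away in a rebalance;
        // a good place to commit offsets for work already done
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after this instance receives its new share of partitions
        System.out.println("Assigned: " + partitions);
    }
});
```

Start and stop instances while producing, and the printed assignments show partitions migrating between group members.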
6. High-Availability Architecture

6.1 Replication

6.1.1 Replica Configuration

Set the replica count when creating a Topic:

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.1.101:9092 \
  --replication-factor 3 \
  --partitions 3 \
  --topic device-data
```
What this means:

- replication-factor=3: each Partition has 3 replicas
- partitions=3: the Topic has 3 Partitions
- 9 replicas in total (3 partitions × 3 replicas)
6.1.2 Leader and Follower

Leader:

- Serves read and write requests
- Each Partition has exactly one Leader

Follower:

- Replicates data from the Leader
- Can be promoted to Leader if the current Leader fails
6.1.3 ISR (In-Sync Replicas)

The ISR is the set of replicas that are caught up with the Leader; an acks=all write is acknowledged once every ISR member has the message.

Configuration:
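A minimal sketch of the two broker settings that govern ISR behavior; the replica.lag.time.max.ms value shown is its Kafka 2.x default, and both values should be tuned to your environment:

```properties
# A follower that has not caught up with the leader for longer than this
# is removed from the ISR (default 30000 ms in Kafka 2.x)
replica.lag.time.max.ms=30000
# Minimum number of in-sync replicas required for acks=all writes to succeed
min.insync.replicas=2
```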
6.2 Data Consistency

6.2.1 Producer acks Setting

acks levels:

```java
// acks=0: fire-and-forget; no broker acknowledgment (fastest, can lose data)
props.put(ProducerConfig.ACKS_CONFIG, "0");

// acks=1: the leader acknowledges after its local write (can lose data on failover)
props.put(ProducerConfig.ACKS_CONFIG, "1");

// acks=all: the leader waits for every in-sync replica (strongest durability)
props.put(ProducerConfig.ACKS_CONFIG, "all");
```
Recommended configuration:

```java
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// One in-flight request preserves ordering across retries;
// with idempotence enabled, values up to 5 are also order-safe
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
```
6.2.2 Transactions

Enabling transactions:

```java
Properties props = new Properties();
// ... bootstrap.servers and serializers configured as in Section 5.2.1 ...
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "device-producer-1");
// Transactions require the idempotent producer
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>(TOPIC, "key1", "value1"));
    producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
```
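Transactions only pay off end to end if consumers skip aborted data. The consumer default is read_uncommitted, so the reading side needs one extra setting:

```java
// Only deliver messages from committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
```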
6.3 Failover

6.3.1 Leader Election

Automatic Leader election:

- When a Leader fails, Kafka automatically elects a new Leader from the ISR
- This keeps the Partition available for reads and writes
Inspect Leader placement:

```bash
bin/kafka-topics.sh --describe \
  --bootstrap-server 192.168.1.101:9092 \
  --topic device-data
```
6.3.2 Replica Synchronization

How replicas stay in sync:

- Followers continuously fetch data from the Leader
- This keeps them consistent with the Leader; followers that fall too far behind drop out of the ISR (see the check below)
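A quick way to spot lagging replicas: kafka-topics.sh can filter for partitions whose ISR is smaller than the replica set:

```bash
# List partitions that are not fully replicated
bin/kafka-topics.sh --describe \
  --bootstrap-server 192.168.1.101:9092 \
  --under-replicated-partitions
```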
7. Performance Tuning

7.1 Producer

7.1.1 Batching

```java
// Up to 16 KB per batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// Wait up to 10 ms for a batch to fill before sending
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// 32 MB of buffer memory for unsent records
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
```
7.1.2 Compression

```java
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
```

snappy trades a lower compression ratio for low CPU cost; gzip compresses harder but costs more CPU. Compression is applied per batch, so it combines well with the batching settings above.
7.1.3 Asynchronous Sends

```java
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Handle the result without blocking the send loop
    }
});
```
7.2 Consumer

7.2.1 Batch Consumption

```java
// Up to 500 records per poll()
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// Max time between polls before the consumer is kicked from the group
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
```
7.2.2 Fetch Settings

```java
// The broker waits until at least 1 KB is available...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
// ...or until 500 ms have passed, whichever comes first
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
```
7.3 Broker

7.3.1 Network and I/O Threads

```properties
num.network.threads=8
num.io.threads=8
```
7.3.2 Log Settings

```properties
log.segment.bytes=1073741824      # 1 GB per segment
log.retention.hours=168           # keep 7 days
log.retention.bytes=10737418240   # or 10 GB per partition, whichever hits first
```
8. Monitoring and Alerting

8.1 JMX Monitoring

8.1.1 Enable JMX

```bash
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=9999"
bin/kafka-server-start.sh config/server.properties
```
8.1.2 Key Metrics

Producer metrics:

- record-send-rate: message send rate
- record-error-rate: send error rate
- request-latency-avg: average request latency
Consumer metrics:

- records-consumed-rate: consumption rate
- records-lag-max: maximum consumer lag
- fetch-rate: fetch request rate
Broker metrics:

- messages-in-rate: incoming message rate
- bytes-in-rate: incoming byte rate
- bytes-out-rate: outgoing byte rate
- under-replicated-partitions: partitions whose ISR is smaller than the replica set (see the probe sketch below)
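The broker metrics above can also be read programmatically over JMX once the port from Section 8.1.1 is open. A minimal sketch, assuming JMX is reachable on 192.168.1.101:9999; the MBean shown is the standard kafka.server BrokerTopicMetrics bean behind messages-in-rate:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaJmxProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://192.168.1.101:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Broker-wide incoming message rate
            ObjectName name = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = mbs.getAttribute(name, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-minute rate): " + rate);
        }
    }
}
```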
8.2 Prometheus Integration

8.2.1 Kafka Exporter

Install kafka_exporter:

```bash
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.3.1/kafka_exporter-1.3.1.linux-amd64.tar.gz
tar -xzf kafka_exporter-1.3.1.linux-amd64.tar.gz
cd kafka_exporter-1.3.1.linux-amd64
```
Run kafka_exporter:

```bash
./kafka_exporter --kafka.server=192.168.1.101:9092 \
  --kafka.server=192.168.1.102:9092 \
  --kafka.server=192.168.1.103:9092 \
  --web.listen-address=:9308
```
Prometheus scrape configuration:

```yaml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['192.168.1.101:9308']
```
9. Hands-On: IoT Device Monitoring System

9.1 Architecture

9.1.1 Overall Architecture

```
100,000 devices
      ↓
Nginx (load balancing)
      ↓
Java services (device report API)
      ↓
Kafka cluster (message middleware)
      ↓
Java services (Kafka event processors)
      ↓
Database / storage
```
9.1.2 Layer by Layer

Device layer:

- 100,000 IoT devices
- Each device reports periodically (e.g. once per minute)

Access layer:

- Nginx load balancer (active/standby pair) spreading requests across the report services

Service layer:

- Multiple Java services exposing the device report API
- They validate device data and publish it to Kafka

Messaging layer:

- 3-node Kafka cluster buffering device messages and decoupling ingestion from processing

Processing layer:

- Multiple Java services acting as Kafka Consumers
- They process events and write results to the database
9.2 Environment

9.2.1 Server Planning

Lowest-cost layout for managing a fleet of 100,000 devices:

| Server | Spec | Count | Purpose |
| --- | --- | --- | --- |
| Nginx | 2 cores / 4 GB | 2 (active/standby) | Load balancing |
| Java services (report) | 4 cores / 8 GB | 3 | Device report API |
| Kafka cluster | 4 cores / 16 GB | 3 | Message middleware |
| Zookeeper | 2 cores / 4 GB | 3 | Metadata management |
| Java services (processing) | 4 cores / 8 GB | 3 | Kafka event processing |
| Database | 8 cores / 32 GB | 1 (primary/replica) | Data storage |
Total: roughly 15 servers (adjust to actual load).
9.2.2 Capacity Estimates

Device report rate:

- 100,000 devices
- 1 report per minute per device
- ≈ 1,667 requests/second (100,000 / 60)
Per-instance capacity of the report service:

- 4 cores / 8 GB per server
- Estimated 1,000-2,000 requests/second per instance
- 3 instances handle 3,000-6,000 requests/second, comfortably above the ~1,667 rps load
Kafka throughput:

- 3-node Kafka cluster
- Estimated 100,000+ messages/second
- Far above what this workload needs
9.3 Nginx Load Balancing

9.3.1 Nginx Configuration

```nginx
# The rate-limit zone must be declared in the http context
limit_req_zone $binary_remote_addr zone=device_limit:10m rate=10r/s;

upstream device_api {
    least_conn;
    server 192.168.1.201:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.202:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.203:8080 weight=1 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;
    server_name api.device.example.com;

    location /api/device/report {
        proxy_pass http://device_api;
        # proxy_params ships with Debian/Ubuntu nginx packages
        include proxy_params;
        proxy_connect_timeout 10s;
        proxy_read_timeout 30s;
        proxy_send_timeout 30s;
        # Per-client rate limit with a small burst allowance
        limit_req zone=device_limit burst=10 nodelay;
    }

    location /health {
        access_log off;
        return 200 "healthy\n";
    }
}
```
9.4 Java Service (Device Reports)

9.4.1 Spring Boot Application

pom.xml:

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
```
application.yml:

```yaml
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 10
      buffer-memory: 33554432
```
Controller:

```java
package com.example.device.controller;

import com.example.device.service.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

// DeviceData and Response are application DTOs; DeviceData is sketched below
@RestController
@RequestMapping("/api/device")
public class DeviceController {

    @Autowired
    private DeviceService deviceService;

    @PostMapping("/report")
    public Response report(@RequestBody DeviceData data) {
        try {
            // Validate the device id before accepting the report
            if (!isValidDevice(data.getDeviceId())) {
                return Response.error("Invalid device ID");
            }
            deviceService.sendToKafka(data);
            return Response.success();
        } catch (Exception e) {
            return Response.error("Failed to report: " + e.getMessage());
        }
    }

    private boolean isValidDevice(String deviceId) {
        return deviceId != null && deviceId.startsWith("device-");
    }
}
```
Service:

```java
package com.example.device.service;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Properties;

@Service
public class DeviceService {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    private KafkaProducer<String, String> producer;

    private static final String TOPIC = "device-data";

    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        producer = new KafkaProducer<>(props);
    }

    public void sendToKafka(DeviceData data) {
        // Key by device id so each device maps to a stable partition
        ProducerRecord<String, String> record = new ProducerRecord<>(
                TOPIC, data.getDeviceId(), data.toJson());
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            }
        });
    }

    @PreDestroy
    public void destroy() {
        if (producer != null) {
            producer.close();
        }
    }
}
```
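The DeviceData DTO referenced above is app-specific; a minimal sketch follows. The field names mirror the JSON built in Section 5.2.1 and are assumptions, and production code would use Jackson or Gson rather than hand-rolled JSON:

```java
package com.example.device.service;

public class DeviceData {
    private String deviceId;
    private long timestamp;
    private String data;

    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public void setData(String data) { this.data = data; }

    // Matches the payload shape produced in Section 5.2.1
    public String toJson() {
        return "{\"deviceId\":\"" + deviceId + "\",\"timestamp\":" + timestamp
                + ",\"data\":\"" + data + "\"}";
    }
}
```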
9.5 Kafka Cluster Setup

9.5.1 Topic Configuration

```bash
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.1.101:9092 \
  --replication-factor 3 \
  --partitions 10 \
  --topic device-data \
  --config retention.ms=604800000 \
  --config segment.ms=3600000
```

What this means:

- partitions=10: 10 Partitions, so up to 10 Consumers in a group can read in parallel
- replication-factor=3: 3 replicas for high availability
- retention.ms=604800000: keep messages for 7 days
- segment.ms=3600000: roll a new segment every hour
9.6 Java Service (Kafka Event Processing)

9.6.1 Consumer Service

application.yml:

```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
    consumer:
      group-id: device-processor-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 500
    listener:
      # Required for the Acknowledgment parameter in the listener below
      ack-mode: manual
```
Service:

```java
package com.example.processor.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class DeviceProcessorService {

    @KafkaListener(topics = "device-data", groupId = "device-processor-group")
    public void processDeviceData(ConsumerRecord<String, String> record,
                                  Acknowledgment ack) {
        try {
            DeviceData data = DeviceData.fromJson(record.value());
            processData(data);
            // Commit the offset only after successful processing
            ack.acknowledge();
        } catch (Exception e) {
            // Not acknowledged: the record can be redelivered
            System.err.println("Processing failed: " + e.getMessage());
        }
    }

    private void processData(DeviceData data) {
        validateData(data);
        saveToDatabase(data);
        calculateMetrics(data);
        checkAlerts(data);
    }

    private void validateData(DeviceData data) { /* schema/range checks */ }

    private void saveToDatabase(DeviceData data) { /* persist the record */ }

    private void calculateMetrics(DeviceData data) { /* rolling statistics */ }

    private void checkAlerts(DeviceData data) { /* threshold alerts */ }
}
```
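Note that @KafkaListener and Acknowledgment come from Spring for Apache Kafka, which the pom.xml in Section 9.4.1 does not pull in; the processor service presumably also needs the dependency below (version managed by Spring Boot's BOM):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```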
9.7 Data Consistency Guarantees

9.7.1 Producer Settings

```java
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// Idempotence deduplicates retried sends on the broker side
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
```
9.7.2 Consumer Settings

```java
// Manual commits: commit only after processing succeeds
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// ... process the batch, then:
consumer.commitSync();
```
9.8 Stability Guarantees

9.8.1 Service High Availability

1. Nginx active/standby: two Nginx nodes so the entry point is not a single point of failure
2. Multiple Java service instances: 3 report instances behind the load balancer, 3 processor instances in one consumer group
3. Kafka cluster: 3 brokers with replication-factor=3 and min.insync.replicas=2
9.8.2 Monitoring and Alerting

Key metrics:

- Device report success rate
- Kafka message backlog (consumer lag)
- Processing latency
- Service availability
Alert rules:

- Device report failure rate > 1%
- Kafka backlog > 10,000 messages (see the Prometheus sketch below)
- Processing latency > 5 seconds
- Any service instance unavailable
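As a sketch, the backlog rule can be expressed against kafka_exporter's consumer-group lag metric; the metric name is as exposed by danielqsj/kafka_exporter and the threshold comes from the rules above:

```yaml
groups:
  - name: kafka-alerts
    rules:
      - alert: KafkaConsumerLagHigh
        expr: sum(kafka_consumergroup_lag) by (consumergroup, topic) > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Group {{ $labels.consumergroup }} lag on {{ $labels.topic }} exceeds 10000"
```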
10. Summary

10.1 Key Takeaways

- Kafka basics: Topic, Partition, Replica, Producer, Consumer
- Cluster deployment: Zookeeper cluster plus Kafka cluster
- Java integration: Producer, Consumer, Consumer Group
- High availability: replication, Leader election, failover
- Performance: batching, compression, asynchronous sends
- Monitoring: JMX metrics and Prometheus integration
- Case study: IoT device monitoring architecture
10.2 Advice for Architects

Cluster planning:

- At least a 3-node Kafka cluster
- Replication factor of at least 2
- Choose partition counts from the required consumer parallelism

Data consistency:

- Producers use acks=all
- Consumers commit offsets manually
- Enable idempotence

Performance:

- Tune batching, compression, and fetch sizes against real traffic

Monitoring:

- Watch consumer lag and under-replicated partitions, and alert on both
10.3 Best Practices

- Standardize: one Kafka configuration standard across teams
- Automate: automated deployment and monitoring
- Monitor: real-time metrics and alerting
- Document: keep configuration docs and architecture diagrams current