Kafka从基础到架构实战

1. 概述

1.1 Kafka的重要性

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现已成为Apache顶级项目。Kafka具有高吞吐量、低延迟、可扩展性强等特点,广泛应用于大数据、实时流处理、日志收集、消息队列等场景。

Kafka核心特性

  • 高吞吐量:支持百万级消息/秒
  • 低延迟:毫秒级消息传递
  • 可扩展性:水平扩展,支持集群部署
  • 持久化:消息持久化到磁盘
  • 高可用:支持副本机制,保障数据安全

1.2 Kafka应用场景

典型应用场景

  • 日志收集:收集各系统日志,统一处理
  • 消息队列:解耦系统,异步处理
  • 流式处理:实时数据处理和分析
  • 事件溯源:记录系统所有事件
  • IoT设备监控:设备数据上报和处理

1.3 本文内容结构

本文将从以下几个方面全面解析Kafka:

  1. Kafka基础:核心概念、架构设计
  2. Kafka单机部署:安装、配置、验证
  3. Kafka集群部署:集群配置、高可用
  4. Kafka与Java集成:Producer、Consumer、Streams
  5. 高可用架构:副本机制、数据一致性
  6. 性能优化:参数调优、性能测试
  7. 监控告警:JMX监控、Prometheus集成
  8. 实战案例:IoT设备监控系统架构

2. Kafka基础

2.1 Kafka核心概念

2.1.1 Topic(主题)

Topic:消息的分类,类似于数据库中的表。

特点

  • 一个Topic可以有多个Partition(分区)
  • 每个Partition可以有多个Replica(副本)
  • 消息按顺序存储在Partition中

2.1.2 Partition(分区)

Partition:Topic的物理分区,每个Partition是一个有序的消息队列。

特点

  • 提高并发性能
  • 支持水平扩展
  • 消息在Partition内有序

2.1.3 Replica(副本)

Replica:Partition的副本,用于保障高可用。

类型

  • Leader Replica:处理读写请求
  • Follower Replica:同步Leader数据

2.1.4 Producer(生产者)

Producer:向Kafka发送消息的客户端。

特点

  • 可以指定发送到哪个Partition
  • 支持同步和异步发送
  • 支持消息确认机制

2.1.5 Consumer(消费者)

Consumer:从Kafka读取消息的客户端。

特点

  • 可以组成Consumer Group
  • 支持自动或手动提交offset
  • 支持从指定offset读取

2.1.6 Consumer Group(消费者组)

Consumer Group:多个Consumer组成的组。

特点

  • 组内Consumer共享Topic的Partition
  • 一个Partition只能被组内一个Consumer消费
  • 支持负载均衡和故障转移

2.1.7 Broker(代理)

Broker:Kafka服务器节点。

特点

  • 每个Broker有唯一ID
  • 可以管理多个Topic的Partition
  • 支持集群部署

2.2 Kafka架构设计

2.2.1 整体架构

1
2
3
Producer → Kafka Cluster (Brokers) → Consumer

Zookeeper

组件说明

  • Producer:消息生产者
  • Kafka Cluster:Kafka集群(多个Broker)
  • Consumer:消息消费者
  • Zookeeper:元数据管理和协调服务

2.2.2 消息存储

消息存储结构

1
2
3
4
Topic: device-data
├── Partition 0 (Leader: Broker1, Replicas: Broker1, Broker2, Broker3)
├── Partition 1 (Leader: Broker2, Replicas: Broker2, Broker3, Broker1)
└── Partition 2 (Leader: Broker3, Replicas: Broker3, Broker1, Broker2)

3. Kafka单机部署

3.1 环境准备

3.1.1 系统要求

操作系统:Linux(CentOS 7+、Ubuntu 18+)

Java环境

  • JDK 8或更高版本
  • 推荐使用OpenJDK或Oracle JDK

内存:至少2GB可用内存

磁盘:SSD推荐,至少10GB可用空间

3.1.2 安装Java

1
2
3
4
5
6
7
8
9
# CentOS/RHEL
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# Ubuntu/Debian
apt-get update
apt-get install -y openjdk-8-jdk

# 验证安装
java -version

3.1.3 安装Zookeeper

Kafka依赖Zookeeper,需要先安装Zookeeper。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载Zookeeper
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -xzf apache-zookeeper-3.6.3-bin.tar.gz
mv apache-zookeeper-3.6.3-bin zookeeper
cd zookeeper

# 创建数据目录
mkdir -p /data/zookeeper

# 配置Zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg

Zookeeper配置(conf/zoo.cfg)

1
2
3
4
5
6
7
8
# 数据目录
dataDir=/data/zookeeper

# 客户端端口
clientPort=2181

# 服务器配置(单机模式)
server.1=localhost:2888:3888

启动Zookeeper

1
2
3
4
5
# 启动Zookeeper
bin/zkServer.sh start

# 验证
bin/zkCli.sh -server localhost:2181

3.2 Kafka安装

3.2.1 下载安装

1
2
3
4
5
6
# 下载Kafka
cd /opt
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
cd kafka

3.2.2 配置文件

编辑server.properties

1
vim config/server.properties

基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Broker ID(集群中唯一)
broker.id=0

# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092

# 日志目录
log.dirs=/data/kafka-logs

# Zookeeper连接
zookeeper.connect=localhost:2181

# 日志保留时间(小时)
log.retention.hours=168

# 日志保留大小
log.retention.bytes=1073741824

# 日志段大小
log.segment.bytes=1073741824

3.2.3 启动Kafka

1
2
3
4
5
6
7
8
# 启动Kafka(后台运行)
bin/kafka-server-start.sh -daemon config/server.properties

# 验证进程
jps | grep Kafka

# 查看日志
tail -f logs/server.log

3.3 验证安装

3.3.1 创建Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建Topic
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 3 \
--topic test-topic

# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看Topic详情
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic test-topic

3.3.2 发送消息

1
2
3
4
# 启动Producer
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic

3.3.3 消费消息

1
2
3
4
5
# 启动Consumer(新终端)
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning

4. Kafka集群部署

4.1 集群架构设计

4.1.1 集群规划

3节点Kafka集群

节点 IP Broker ID 角色
kafka-1 192.168.1.101 1 Broker
kafka-2 192.168.1.102 2 Broker
kafka-3 192.168.1.103 3 Broker

Zookeeper集群

节点 IP Server ID 角色
zk-1 192.168.1.101 1 Zookeeper
zk-2 192.168.1.102 2 Zookeeper
zk-3 192.168.1.103 3 Zookeeper

4.2 Zookeeper集群配置

4.2.1 Zookeeper配置

每个节点配置(conf/zoo.cfg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 数据目录
dataDir=/data/zookeeper

# 客户端端口
clientPort=2181

# 服务器配置
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

# 其他配置
tickTime=2000
initLimit=10
syncLimit=5

创建myid文件

1
2
3
4
5
6
7
8
# 节点1
echo "1" > /data/zookeeper/myid

# 节点2
echo "2" > /data/zookeeper/myid

# 节点3
echo "3" > /data/zookeeper/myid

4.2.2 启动Zookeeper集群

1
2
3
4
5
# 每个节点启动
bin/zkServer.sh start

# 验证集群状态
bin/zkServer.sh status

4.3 Kafka集群配置

4.3.1 节点1配置(192.168.1.101)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

# 副本配置
default.replication.factor=3
min.insync.replicas=2

# 其他配置
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

4.3.2 节点2配置(192.168.1.102)

1
2
3
4
5
6
7
broker.id=2
listeners=PLAINTEXT://192.168.1.102:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
default.replication.factor=3
min.insync.replicas=2
# ... 其他配置相同

4.3.3 节点3配置(192.168.1.103)

1
2
3
4
5
6
7
broker.id=3
listeners=PLAINTEXT://192.168.1.103:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
default.replication.factor=3
min.insync.replicas=2
# ... 其他配置相同

4.3.4 启动Kafka集群

1
2
3
4
5
# 每个节点启动
bin/kafka-server-start.sh -daemon config/server.properties

# 验证集群状态
bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.101:9092

4.4 创建集群Topic

4.4.1 创建Topic

1
2
3
4
5
6
7
8
9
10
11
# 创建Topic(3个分区,3个副本)
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 \
--partitions 3 \
--topic device-data

# 查看Topic详情
bin/kafka-topics.sh --describe \
--bootstrap-server 192.168.1.101:9092 \
--topic device-data

输出示例

1
2
3
4
Topic: device-data	PartitionCount: 3	ReplicationFactor: 3	Configs: 
Topic: device-data Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: device-data Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: device-data Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

5. Kafka与Java集成

5.1 Maven依赖

5.1.1 添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

<!-- Kafka Streams (可选) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>

<!-- Log4j (日志) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>

5.2 Producer实现

5.2.1 基础Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class DeviceDataProducer {

private static final String BOOTSTRAP_SERVERS = "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092";
private static final String TOPIC = "device-data";

public static void main(String[] args) {
// Producer配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 性能优化配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小

// 创建Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

try {
// 发送消息
for (int i = 0; i < 100; i++) {
String deviceId = "device-" + (i % 10);
String data = "{\"deviceId\":\"" + deviceId + "\",\"timestamp\":" + System.currentTimeMillis() + ",\"data\":\"value" + i + "\"}";

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, deviceId, data);

// 异步发送(带回调)
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功: topic=" + metadata.topic()
+ ", partition=" + metadata.partition()
+ ", offset=" + metadata.offset());
} else {
System.err.println("消息发送失败: " + exception.getMessage());
}
}
});
}
} finally {
producer.close();
}
}
}

5.2.2 高级Producer(分区策略)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class AdvancedProducer {

public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DevicePartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息(指定分区)
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC,
0, // 指定分区
"device-001",
"data"
);
producer.send(record);

producer.close();
}
}

// 自定义分区器
class DevicePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 根据设备ID的hash值选择分区
String deviceId = (String) key;
int partitionCount = cluster.partitionCountForTopic(topic);
return Math.abs(deviceId.hashCode()) % partitionCount;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

5.3 Consumer实现

5.3.1 基础Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DeviceDataConsumer {

private static final String BOOTSTRAP_SERVERS = "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092";
private static final String TOPIC = "device-data";
private static final String GROUP_ID = "device-consumer-group";

public static void main(String[] args) {
// Consumer配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// 从最早的消息开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 创建Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅Topic
consumer.subscribe(Collections.singletonList(TOPIC));

try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
System.out.println("收到消息: topic=" + record.topic()
+ ", partition=" + record.partition()
+ ", offset=" + record.offset()
+ ", key=" + record.key()
+ ", value=" + record.value());

// 处理消息
processMessage(record.value());
}
}
} finally {
consumer.close();
}
}

private static void processMessage(String message) {
// 处理设备数据
System.out.println("处理消息: " + message);
}
}

5.3.2 高级Consumer(手动提交)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class AdvancedConsumer {

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 手动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// 每次拉取的最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record.value());
} catch (Exception e) {
System.err.println("处理消息失败: " + e.getMessage());
// 可以选择跳过或重试
}
}

// 手动提交offset
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}

5.4 Consumer Group

5.4.1 多Consumer实例

启动多个Consumer实例,组成Consumer Group

1
2
3
4
5
6
7
8
# 终端1
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer

# 终端2
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer

# 终端3
java -cp kafka-demo.jar com.example.kafka.DeviceDataConsumer

效果

  • 3个Consumer实例共享Topic的Partition
  • 每个Partition只被一个Consumer消费
  • 实现负载均衡

6. 高可用架构

6.1 副本机制

6.1.1 副本配置

创建Topic时指定副本数

1
2
3
4
5
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 \
--partitions 3 \
--topic device-data

配置说明

  • replication-factor=3:每个Partition有3个副本
  • partitions=3:Topic有3个分区
  • 总共9个副本(3个分区 × 3个副本)

6.1.2 Leader和Follower

Leader

  • 处理读写请求
  • 每个Partition只有一个Leader

Follower

  • 同步Leader数据
  • Leader故障时,Follower可以成为新Leader

6.1.3 ISR(In-Sync Replicas)

ISR:与Leader保持同步的副本集合。

配置

1
2
3
# server.properties
# 最小同步副本数
min.insync.replicas=2

6.2 数据一致性

6.2.1 Producer ACK配置

ACK级别

1
2
3
4
5
6
7
8
// ACK=0:不等待确认(最快,但可能丢失数据)
props.put(ProducerConfig.ACKS_CONFIG, "0");

// ACK=1:等待Leader确认(较快,Leader故障可能丢失数据)
props.put(ProducerConfig.ACKS_CONFIG, "1");

// ACK=all:等待所有ISR副本确认(最安全,但较慢)
props.put(ProducerConfig.ACKS_CONFIG, "all");

推荐配置

1
2
3
4
// 保障数据一致性
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

6.2.2 事务支持

启用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "device-producer-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务
producer.initTransactions();

try {
producer.beginTransaction();

// 发送多条消息
producer.send(new ProducerRecord<>(TOPIC, "key1", "value1"));
producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));

// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 回滚事务
producer.abortTransaction();
}

6.3 故障转移

6.3.1 Leader选举

自动Leader选举

  • Leader故障时,Kafka自动从ISR中选择新Leader
  • 保障服务可用性

查看Leader信息

1
2
3
bin/kafka-topics.sh --describe \
--bootstrap-server 192.168.1.101:9092 \
--topic device-data

6.3.2 副本同步

副本同步机制

  • Follower定期从Leader拉取数据
  • 保持与Leader的数据一致性

7. 性能优化

7.1 Producer优化

7.1.1 批量发送

1
2
3
4
5
6
7
8
// 批次大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB

// 等待时间
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms

// 缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB

7.1.2 压缩

1
2
// 启用压缩
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 或 "gzip", "lz4"

7.1.3 异步发送

1
2
3
4
5
6
7
// 异步发送(默认)
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 回调处理
}
});

7.2 Consumer优化

7.2.1 批量消费

1
2
3
4
5
// 每次拉取的最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

// 拉取超时时间
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5分钟

7.2.2 Fetch配置

1
2
3
4
5
// 每次拉取的最小数据量
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024"); // 1KB

// 拉取超时时间
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // 500ms

7.3 Broker优化

7.3.1 网络线程

1
2
3
4
5
6
# server.properties
# 网络线程数
num.network.threads=8

# IO线程数
num.io.threads=8

7.3.2 日志配置

1
2
3
4
5
6
7
8
# 日志段大小
log.segment.bytes=1073741824 # 1GB

# 日志保留时间
log.retention.hours=168 # 7天

# 日志保留大小
log.retention.bytes=10737418240 # 10GB

8. 监控告警

8.1 JMX监控

8.1.1 启用JMX

1
2
3
4
# 启动Kafka时启用JMX
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"

bin/kafka-server-start.sh config/server.properties

8.1.2 关键指标

Producer指标

  • record-send-rate:消息发送速率
  • record-error-rate:消息错误率
  • request-latency-avg:请求平均延迟

Consumer指标

  • records-consumed-rate:消息消费速率
  • records-lag-max:最大延迟
  • fetch-rate:拉取速率

Broker指标

  • messages-in-rate:消息接收速率
  • bytes-in-rate:字节接收速率
  • bytes-out-rate:字节发送速率
  • under-replicated-partitions:未充分复制的分区数

8.2 Prometheus集成

8.2.1 Kafka Exporter

安装Kafka Exporter

1
2
3
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.3.1/kafka_exporter-1.3.1.linux-amd64.tar.gz
tar -xzf kafka_exporter-1.3.1.linux-amd64.tar.gz
cd kafka_exporter-1.3.1.linux-amd64

启动Kafka Exporter

1
2
3
4
./kafka_exporter --kafka.server=192.168.1.101:9092 \
--kafka.server=192.168.1.102:9092 \
--kafka.server=192.168.1.103:9092 \
--web.listen-address=:9308

Prometheus配置

1
2
3
4
5
# prometheus.yml
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['192.168.1.101:9308']

9. IoT设备监控系统架构实战

9.1 架构设计

9.1.1 整体架构

1
2
3
4
5
6
7
8
9
10
11
10万台设备

Nginx(负载均衡)

Java服务(设备上报接口)

Kafka集群(消息中间件)

多台Java服务(处理Kafka事件)

数据库/存储

9.1.2 架构说明

设备层

  • 10万台IoT设备
  • 每台设备定期上报数据(如每分钟1次)

接入层

  • Nginx负载均衡
  • 分发请求到多台Java服务

服务层

  • 多台Java服务(设备上报接口)
  • 接收设备数据,发送到Kafka

消息层

  • Kafka集群(3节点)
  • 存储设备数据

处理层

  • 多台Java服务(Kafka Consumer)
  • 处理Kafka事件,写入数据库

9.2 环境准备

9.2.1 服务器规划

最低成本配置(每台服务器管理10万台设备):

服务器 配置 数量 用途
Nginx 2核4G 2台(主备) 负载均衡
Java服务(上报) 4核8G 3台 设备上报接口
Kafka集群 4核16G 3台 消息中间件
Zookeeper 2核4G 3台 元数据管理
Java服务(处理) 4核8G 3台 处理Kafka事件
数据库 8核32G 1台(主从) 数据存储

总成本:约15台服务器(可根据实际情况调整)

9.2.2 性能估算

设备上报频率

  • 10万台设备
  • 每分钟1次上报
  • 每秒约1667次请求(10万/60)

单台Java服务处理能力

  • 4核8G服务器
  • 预计每秒处理1000-2000请求
  • 3台服务器可处理3000-6000请求/秒

Kafka吞吐量

  • 3节点Kafka集群
  • 预计每秒处理10万+消息
  • 完全满足需求

9.3 Nginx负载均衡配置

9.3.1 Nginx配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# /etc/nginx/conf.d/device-api.conf

# 设备上报服务upstream
upstream device_api {
least_conn; # 最少连接
server 192.168.1.201:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.202:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.203:8080 weight=1 max_fails=3 fail_timeout=30s;
}

server {
listen 80;
server_name api.device.example.com;

# 设备上报接口
location /api/device/report {
proxy_pass http://device_api;
include proxy_params;

# 超时配置
proxy_connect_timeout 10s;
proxy_read_timeout 30s;
proxy_send_timeout 30s;

# 限流(防止单设备异常)
limit_req zone=device_limit burst=10 nodelay;
}

# 健康检查
location /health {
access_log off;
return 200 "healthy\n";
}
}

# 限流配置
limit_req_zone $binary_remote_addr zone=device_limit:10m rate=10r/s;

9.4 Java服务(设备上报)

9.4.1 Spring Boot应用

pom.xml

1
2
3
4
5
6
7
8
9
10
11
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 8080

spring:
kafka:
bootstrap-servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
batch-size: 16384
linger-ms: 10
buffer-memory: 33554432

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.device.controller;

import com.example.device.service.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/device")
public class DeviceController {

@Autowired
private DeviceService deviceService;

@PostMapping("/report")
public Response report(@RequestBody DeviceData data) {
try {
// 验证设备ID
if (!isValidDevice(data.getDeviceId())) {
return Response.error("Invalid device ID");
}

// 发送到Kafka
deviceService.sendToKafka(data);

return Response.success();
} catch (Exception e) {
return Response.error("Failed to report: " + e.getMessage());
}
}

private boolean isValidDevice(String deviceId) {
// 验证设备ID逻辑
return deviceId != null && deviceId.startsWith("device-");
}
}

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.device.service;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Properties;

@Service
public class DeviceService {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

private KafkaProducer<String, String> producer;
private static final String TOPIC = "device-data";

@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);

producer = new KafkaProducer<>(props);
}

public void sendToKafka(DeviceData data) {
// 使用设备ID作为key,保证同一设备的数据发送到同一分区
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC,
data.getDeviceId(),
data.toJson()
);

producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
}
});
}

@PreDestroy
public void destroy() {
if (producer != null) {
producer.close();
}
}
}

9.5 Kafka集群配置

9.5.1 Topic配置

1
2
3
4
5
6
7
8
# 创建设备数据Topic
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 \
--partitions 10 \
--topic device-data \
--config retention.ms=604800000 \
--config segment.ms=3600000

配置说明

  • partitions=10:10个分区,支持10个Consumer并发消费
  • replication-factor=3:3个副本,保障高可用
  • retention.ms=604800000:保留7天
  • segment.ms=3600000:1小时一个段

9.6 Java服务(处理Kafka事件)

9.6.1 Consumer服务

application.yml

1
2
3
4
5
6
7
8
9
10
spring:
kafka:
bootstrap-servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092
consumer:
group-id: device-processor-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 500

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.processor.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class DeviceProcessorService {

@KafkaListener(topics = "device-data", groupId = "device-processor-group")
public void processDeviceData(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
// 解析设备数据
DeviceData data = DeviceData.fromJson(record.value());

// 处理数据(写入数据库、计算、告警等)
processData(data);

// 手动提交offset
ack.acknowledge();
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
// 可以选择重试或记录到死信队列
}
}

private void processData(DeviceData data) {
// 1. 数据验证
validateData(data);

// 2. 写入数据库
saveToDatabase(data);

// 3. 实时计算(如统计、聚合)
calculateMetrics(data);

// 4. 告警检查
checkAlerts(data);
}

private void validateData(DeviceData data) {
// 验证数据有效性
}

private void saveToDatabase(DeviceData data) {
// 批量写入数据库
}

private void calculateMetrics(DeviceData data) {
// 实时计算指标
}

private void checkAlerts(DeviceData data) {
// 检查告警条件
}
}

9.7 数据一致性保障

9.7.1 Producer配置

1
2
3
4
5
// 保障数据一致性
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

9.7.2 Consumer配置

1
2
3
4
5
// 手动提交offset,保障数据不丢失
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// 处理完消息后手动提交
consumer.commitSync();

9.8 稳定性保障

9.8.1 服务高可用

1. Nginx主备

  • 使用Keepalived实现主备切换

2. Java服务多实例

  • 每台服务器部署多个实例
  • 使用Nginx负载均衡

3. Kafka集群

  • 3节点集群,保障高可用

9.8.2 监控告警

关键指标

  • 设备上报成功率
  • Kafka消息积压
  • 处理延迟
  • 服务可用性

告警规则

  • 设备上报失败率 > 1%
  • Kafka消息积压 > 10000
  • 处理延迟 > 5秒
  • 服务不可用

10. 总结

10.1 核心要点

  1. Kafka基础:Topic、Partition、Replica、Producer、Consumer
  2. 集群部署:Zookeeper集群、Kafka集群
  3. Java集成:Producer、Consumer、Consumer Group
  4. 高可用:副本机制、Leader选举、故障转移
  5. 性能优化:批量发送、压缩、异步处理
  6. 监控告警:JMX监控、Prometheus集成
  7. 实战案例:IoT设备监控系统架构

10.2 架构师建议

  1. 集群规划

    • 至少3节点Kafka集群
    • 副本数至少为2
    • 分区数根据并发需求设置
  2. 数据一致性

    • Producer使用acks=all
    • Consumer手动提交offset
    • 启用幂等性
  3. 性能优化

    • 批量发送和消费
    • 启用压缩
    • 合理设置分区数
  4. 监控告警

    • 监控消息积压
    • 监控处理延迟
    • 监控服务可用性

10.3 最佳实践

  1. 标准化:统一Kafka配置标准
  2. 自动化:自动化部署和监控
  3. 监控化:实时监控和告警
  4. 文档化:维护配置文档和架构图

相关文章