1. Kafka迁移概述

Kafka消息队列迁移是运维工作中的重要环节,涉及数据迁移、集群管理、性能优化和故障处理等多个方面。本文将详细介绍Kafka消息队列迁移与运维的实战经验,包括数据迁移策略、集群管理、性能优化、故障处理的完整解决方案。

1.1 核心功能

  1. 数据迁移: 不同Kafka集群间的数据迁移
  2. 集群管理: Kafka集群的部署、监控和管理
  3. 性能优化: 消息队列性能调优和监控
  4. 故障处理: 常见故障的诊断和处理
  5. 运维自动化: 自动化运维脚本和工具

1.2 技术架构

1
2
3
源Kafka → 数据导出 → 数据转换 → 目标Kafka → 数据验证
↓ ↓ ↓ ↓ ↓
备份策略 → 迁移工具 → 格式转换 → 集群部署 → 一致性检查

2. 环境准备

2.1 系统要求检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/bin/bash
# check_kafka_env.sh - Kafka环境检查脚本
# @author 运维实战

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 检查系统版本
check_os_version() {
log "检查系统版本..."

if [ -f /etc/os-release ]; then
. /etc/os-release
log "操作系统: $NAME $VERSION"
else
log "无法确定操作系统版本"
fi
}

# 检查Java版本
check_java_version() {
log "检查Java版本..."

JAVA_VERSION=$(java -version 2>&1 | head -1 | cut -d'"' -f2)
log "Java版本: $JAVA_VERSION"

# 检查Java版本是否符合要求
if [[ $JAVA_VERSION == 1.8* ]] || [[ $JAVA_VERSION == 11* ]] || [[ $JAVA_VERSION == 17* ]]; then
log "Java版本符合要求"
else
log "Java版本不符合要求,需要Java 8+"
exit 1
fi
}

# 检查Kafka版本
check_kafka_version() {
log "检查Kafka版本..."

# 检查源Kafka版本
SOURCE_VERSION=$(kafka-topics.sh --bootstrap-server source-kafka-server:9092 --version 2>/dev/null | head -1)
log "源Kafka版本: $SOURCE_VERSION"

# 检查目标Kafka版本
TARGET_VERSION=$(kafka-topics.sh --bootstrap-server target-kafka-server:9092 --version 2>/dev/null | head -1)
log "目标Kafka版本: $TARGET_VERSION"
}

# 检查Kafka连接
check_kafka_connection() {
log "检查Kafka连接..."

# 检查源Kafka连接
if kafka-topics.sh --bootstrap-server source-kafka-server:9092 --list > /dev/null 2>&1; then
log "源Kafka连接正常"
else
log "源Kafka连接失败"
return 1
fi

# 检查目标Kafka连接
if kafka-topics.sh --bootstrap-server target-kafka-server:9092 --list > /dev/null 2>&1; then
log "目标Kafka连接正常"
else
log "目标Kafka连接失败"
return 1
fi
}

# 检查Topic信息
check_topic_info() {
log "检查Topic信息..."

# 获取源Kafka的Topic列表
SOURCE_TOPICS=$(kafka-topics.sh --bootstrap-server source-kafka-server:9092 --list)
log "源Kafka Topics: $SOURCE_TOPICS"

# 获取目标Kafka的Topic列表
TARGET_TOPICS=$(kafka-topics.sh --bootstrap-server target-kafka-server:9092 --list)
log "目标Kafka Topics: $TARGET_TOPICS"
}

# 检查磁盘空间
check_disk_space() {
log "检查磁盘空间..."

df -h | grep -E "(/$|/var|/tmp|/home)" | while read line; do
log "磁盘使用: $line"
done
}

# 主函数
main() {
log "开始Kafka环境检查..."

check_os_version
check_java_version
check_kafka_version

if check_kafka_connection; then
check_topic_info
else
log "Kafka连接检查失败"
exit 1
fi

check_disk_space

log "Kafka环境检查完成"
}

# 执行主函数
main "$@"

3. 数据迁移脚本

3.1 Kafka数据迁移脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/bin/bash
# kafka_migration.sh - Kafka数据迁移脚本
# @author 运维实战

# 配置参数
SOURCE_BOOTSTRAP_SERVERS="source-kafka-server:9092"
TARGET_BOOTSTRAP_SERVERS="target-kafka-server:9092"
MIGRATION_TOPICS="topic1,topic2,topic3"
BACKUP_DIR="/backup/kafka"
LOG_FILE="/var/log/kafka_migration.log"
CONSUMER_GROUP="migration-consumer-group"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 创建备份目录
create_backup_dir() {
log "创建备份目录..."

BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_PATH="$BACKUP_DIR/$BACKUP_DATE"

mkdir -p $BACKUP_PATH

if [ $? -eq 0 ]; then
log "备份目录创建成功: $BACKUP_PATH"
else
log "备份目录创建失败"
exit 1
fi
}

# 导出Topic配置
export_topic_config() {
log "导出Topic配置..."

TOPIC_CONFIG_FILE="$BACKUP_PATH/topic_config.json"

# 获取Topic配置
kafka-topics.sh --bootstrap-server $SOURCE_BOOTSTRAP_SERVERS --describe > $TOPIC_CONFIG_FILE

if [ $? -eq 0 ]; then
log "Topic配置导出成功: $TOPIC_CONFIG_FILE"
else
log "Topic配置导出失败"
exit 1
fi
}

# 创建目标Topic
create_target_topics() {
log "创建目标Topic..."

# 解析Topic列表
IFS=',' read -ra TOPICS <<< "$MIGRATION_TOPICS"

for topic in "${TOPICS[@]}"; do
log "创建目标Topic: $topic"

# 获取源Topic的配置
SOURCE_CONFIG=$(kafka-topics.sh --bootstrap-server $SOURCE_BOOTSTRAP_SERVERS --describe --topic $topic)

# 提取分区数和副本数
PARTITIONS=$(echo $SOURCE_CONFIG | grep -o "PartitionCount: [0-9]*" | cut -d' ' -f2)
REPLICATION_FACTOR=$(echo $SOURCE_CONFIG | grep -o "ReplicationFactor: [0-9]*" | cut -d' ' -f2)

log "Topic $topic 配置: 分区数=$PARTITIONS, 副本数=$REPLICATION_FACTOR"

# 创建目标Topic
kafka-topics.sh --bootstrap-server $TARGET_BOOTSTRAP_SERVERS \
--create \
--topic $topic \
--partitions $PARTITIONS \
--replication-factor $REPLICATION_FACTOR

if [ $? -eq 0 ]; then
log "目标Topic创建成功: $topic"
else
log "目标Topic创建失败: $topic"
exit 1
fi
done
}

# 执行数据迁移
execute_data_migration() {
log "执行数据迁移..."

# 解析Topic列表
IFS=',' read -ra TOPICS <<< "$MIGRATION_TOPICS"

for topic in "${TOPICS[@]}"; do
log "开始迁移Topic: $topic"

# 使用kafka-mirror-maker进行数据迁移
kafka-mirror-maker.sh \
--consumer.config /etc/kafka/consumer.properties \
--producer.config /etc/kafka/producer.properties \
--whitelist $topic \
--num.streams 4 \
--offset.commit.interval.ms 10000

if [ $? -eq 0 ]; then
log "Topic $topic 迁移成功"
else
log "Topic $topic 迁移失败"
exit 1
fi
done
}

# 验证数据迁移
verify_migration() {
log "验证数据迁移..."

# 解析Topic列表
IFS=',' read -ra TOPICS <<< "$MIGRATION_TOPICS"

for topic in "${TOPICS[@]}"; do
log "验证Topic: $topic"

# 比较消息数量
SOURCE_COUNT=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server $SOURCE_BOOTSTRAP_SERVERS \
--topic $topic \
--time -1 | awk -F: '{sum += $3} END {print sum}')

TARGET_COUNT=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server $TARGET_BOOTSTRAP_SERVERS \
--topic $topic \
--time -1 | awk -F: '{sum += $3} END {print sum}')

log "Topic $topic 消息数量 - 源: $SOURCE_COUNT, 目标: $TARGET_COUNT"

if [ $SOURCE_COUNT -eq $TARGET_COUNT ]; then
log "Topic $topic 消息数量验证通过"
else
log "Topic $topic 消息数量验证失败"
exit 1
fi
done
}

# 生成迁移报告
generate_migration_report() {
log "生成迁移报告..."

REPORT_FILE="$BACKUP_PATH/migration_report.txt"

echo "Kafka数据迁移报告" > $REPORT_FILE
echo "迁移时间: $(date)" >> $REPORT_FILE
echo "源Kafka: $SOURCE_BOOTSTRAP_SERVERS" >> $REPORT_FILE
echo "目标Kafka: $TARGET_BOOTSTRAP_SERVERS" >> $REPORT_FILE
echo "迁移Topics: $MIGRATION_TOPICS" >> $REPORT_FILE
echo "================================" >> $REPORT_FILE

# 添加Topic信息
echo "Topic信息:" >> $REPORT_FILE
kafka-topics.sh --bootstrap-server $TARGET_BOOTSTRAP_SERVERS --describe >> $REPORT_FILE

log "迁移报告生成: $REPORT_FILE"
}

# 主函数
main() {
log "开始Kafka数据迁移..."

# 创建备份目录
create_backup_dir

# 导出Topic配置
export_topic_config

# 创建目标Topic
create_target_topics

# 执行数据迁移
execute_data_migration

# 验证数据迁移
verify_migration

# 生成迁移报告
generate_migration_report

log "Kafka数据迁移完成"
}

# 执行主函数
main "$@"

4. 集群管理脚本

4.1 Kafka集群部署脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/bin/bash
# kafka_cluster_deploy.sh - Kafka集群部署脚本
# @author 运维实战

# 集群配置
KAFKA_SERVERS=(
"192.168.1.10"
"192.168.1.11"
"192.168.1.12"
)

ZOOKEEPER_SERVERS=(
"192.168.1.10:2181"
"192.168.1.11:2181"
"192.168.1.12:2181"
)

KAFKA_VERSION="2.8.0"
KAFKA_USER="kafka"
KAFKA_HOME="/opt/kafka"
CLUSTER_CONFIG_DIR="/etc/kafka/cluster"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 安装Kafka
install_kafka() {
log "安装Kafka $KAFKA_VERSION..."

# 下载Kafka源码
cd /tmp
wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz
tar xzf kafka_2.13-${KAFKA_VERSION}.tgz
mv kafka_2.13-${KAFKA_VERSION} $KAFKA_HOME

if [ $? -eq 0 ]; then
log "Kafka安装成功"
else
log "Kafka安装失败"
exit 1
fi
}

# 创建Kafka用户
create_kafka_user() {
log "创建Kafka用户..."

if ! id $KAFKA_USER &>/dev/null; then
useradd -r -s /bin/false $KAFKA_USER
log "Kafka用户创建成功"
else
log "Kafka用户已存在"
fi
}

# 创建目录结构
create_directories() {
log "创建目录结构..."

# 创建Kafka主目录
mkdir -p $KAFKA_HOME/{bin,conf,logs,data}

# 创建集群配置目录
mkdir -p $CLUSTER_CONFIG_DIR

# 设置权限
chown -R $KAFKA_USER:$KAFKA_USER $KAFKA_HOME
chown -R $KAFKA_USER:$KAFKA_USER $CLUSTER_CONFIG_DIR

log "目录结构创建完成"
}

# 生成ZooKeeper配置文件
generate_zookeeper_config() {
log "生成ZooKeeper配置文件..."

for i in "${!KAFKA_SERVERS[@]}"; do
server=${KAFKA_SERVERS[$i]}
server_id=$((i + 1))

# 生成ZooKeeper配置文件
cat > $CLUSTER_CONFIG_DIR/zookeeper-${server_id}.properties << EOF
# ZooKeeper配置文件
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

# 集群配置
server.1=192.168.1.10:2888:3888
server.2=192.168.1.11:2888:3888
server.3=192.168.1.12:2888:3888

# 日志配置
4lw.commands.whitelist=*
EOF

log "ZooKeeper配置文件生成: zookeeper-${server_id}.properties"
done
}

# 生成Kafka配置文件
generate_kafka_config() {
log "生成Kafka配置文件..."

for i in "${!KAFKA_SERVERS[@]}"; do
server=${KAFKA_SERVERS[$i]}
server_id=$((i + 1))

# 生成Kafka配置文件
cat > $CLUSTER_CONFIG_DIR/server-${server_id}.properties << EOF
# Kafka配置文件
broker.id=${server_id}
listeners=PLAINTEXT://${server}:9092
advertised.listeners=PLAINTEXT://${server}:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

log "Kafka配置文件生成: server-${server_id}.properties"
done
}

# 启动ZooKeeper
start_zookeeper() {
log "启动ZooKeeper..."

for i in "${!KAFKA_SERVERS[@]}"; do
server=${KAFKA_SERVERS[$i]}
server_id=$((i + 1))

# 创建ZooKeeper数据目录
mkdir -p /var/lib/zookeeper
echo $server_id > /var/lib/zookeeper/myid

# 启动ZooKeeper
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $CLUSTER_CONFIG_DIR/zookeeper-${server_id}.properties

if [ $? -eq 0 ]; then
log "ZooKeeper启动成功: $server"
else
log "ZooKeeper启动失败: $server"
exit 1
fi
done
}

# 启动Kafka
start_kafka() {
log "启动Kafka..."

# 等待ZooKeeper启动
sleep 10

for i in "${!KAFKA_SERVERS[@]}"; do
server=${KAFKA_SERVERS[$i]}
server_id=$((i + 1))

# 创建Kafka日志目录
mkdir -p /var/lib/kafka-logs

# 启动Kafka
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $CLUSTER_CONFIG_DIR/server-${server_id}.properties

if [ $? -eq 0 ]; then
log "Kafka启动成功: $server"
else
log "Kafka启动失败: $server"
exit 1
fi
done
}

# 验证集群
verify_cluster() {
log "验证Kafka集群..."

# 等待Kafka启动
sleep 15

# 检查集群状态
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server 192.168.1.10:9092 --list

# 检查集群信息
$KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.10:9092

log "集群验证完成"
}

# 主函数
main() {
log "开始Kafka集群部署..."

# 安装Kafka
install_kafka

# 创建Kafka用户
create_kafka_user

# 创建目录结构
create_directories

# 生成配置文件
generate_zookeeper_config
generate_kafka_config

# 启动服务
start_zookeeper
start_kafka

# 验证集群
verify_cluster

log "Kafka集群部署完成"
}

# 执行主函数
main "$@"

5. 性能优化脚本

5.1 Kafka性能调优脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/bin/bash
# kafka_performance_tune.sh - Kafka性能调优脚本
# @author 运维实战

# 配置参数
KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
LOG_FILE="/var/log/kafka_performance_tune.log"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 分析Kafka性能
analyze_performance() {
log "分析Kafka性能..."

# 获取性能指标
kafka-log-dirs.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --describe

# 获取Topic信息
kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --describe

# 获取消费者组信息
kafka-consumer-groups.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
}

# 优化Topic配置
optimize_topic_config() {
log "优化Topic配置..."

# 获取所有Topic
TOPICS=$(kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list)

for topic in $TOPICS; do
log "优化Topic: $topic"

# 设置Topic配置
kafka-configs.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
--entity-type topics \
--entity-name $topic \
--alter \
--add-config retention.ms=604800000,segment.ms=3600000,compression.type=lz4

if [ $? -eq 0 ]; then
log "Topic $topic 配置优化成功"
else
log "Topic $topic 配置优化失败"
fi
done
}

# 优化消费者组配置
optimize_consumer_config() {
log "优化消费者组配置..."

# 获取所有消费者组
CONSUMER_GROUPS=$(kafka-consumer-groups.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list)

for group in $CONSUMER_GROUPS; do
log "优化消费者组: $group"

# 设置消费者组配置
kafka-configs.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
--entity-type clients \
--entity-name $group \
--alter \
--add-config consumer.request.timeout.ms=30000,consumer.fetch.min.bytes=1,consumer.fetch.max.wait.ms=500

if [ $? -eq 0 ]; then
log "消费者组 $group 配置优化成功"
else
log "消费者组 $group 配置优化失败"
fi
done
}

# 优化生产者配置
optimize_producer_config() {
log "优化生产者配置..."

# 设置生产者配置
kafka-configs.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
--entity-type clients \
--entity-name default \
--alter \
--add-config producer.batch.size=16384,producer.linger.ms=5,producer.compression.type=lz4

if [ $? -eq 0 ]; then
log "生产者配置优化成功"
else
log "生产者配置优化失败"
fi
}

# 优化Broker配置
optimize_broker_config() {
log "优化Broker配置..."

# 设置Broker配置
kafka-configs.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
--entity-type brokers \
--entity-name 1 \
--alter \
--add-config num.network.threads=8,num.io.threads=16,socket.send.buffer.bytes=102400,socket.receive.buffer.bytes=102400

if [ $? -eq 0 ]; then
log "Broker配置优化成功"
else
log "Broker配置优化失败"
fi
}

# 监控性能指标
monitor_performance() {
log "监控性能指标..."

# 监控Topic性能
kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --describe | grep -E "(Topic|PartitionCount|ReplicationFactor)"

# 监控消费者组性能
kafka-consumer-groups.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --describe --all-groups

# 监控Broker性能
kafka-broker-api-versions.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS
}

# 主函数
main() {
log "开始Kafka性能调优..."

# 分析性能
analyze_performance

# 优化Topic配置
optimize_topic_config

# 优化消费者组配置
optimize_consumer_config

# 优化生产者配置
optimize_producer_config

# 优化Broker配置
optimize_broker_config

# 监控性能指标
monitor_performance

log "Kafka性能调优完成"
}

# 执行主函数
main "$@"

6. 故障处理脚本

6.1 Kafka故障诊断脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/bin/bash
# kafka_troubleshoot.sh - Kafka故障诊断脚本
# @author 运维实战

# 配置参数
KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
ZOOKEEPER_SERVERS="localhost:2181"
LOG_FILE="/var/log/kafka_troubleshoot.log"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 检查Kafka服务状态
check_kafka_service() {
log "检查Kafka服务状态..."

# 检查Kafka进程
KAFKA_PID=$(ps aux | grep kafka | grep -v grep | awk '{print $2}')

if [ -n "$KAFKA_PID" ]; then
log "Kafka进程运行中: PID $KAFKA_PID"
return 0
else
log "Kafka进程未运行"
return 1
fi
}

# 检查ZooKeeper服务状态
check_zookeeper_service() {
log "检查ZooKeeper服务状态..."

# 检查ZooKeeper进程
ZOOKEEPER_PID=$(ps aux | grep zookeeper | grep -v grep | awk '{print $2}')

if [ -n "$ZOOKEEPER_PID" ]; then
log "ZooKeeper进程运行中: PID $ZOOKEEPER_PID"
return 0
else
log "ZooKeeper进程未运行"
return 1
fi
}

# 检查Kafka连接
check_kafka_connection() {
log "检查Kafka连接..."

if kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list > /dev/null 2>&1; then
log "Kafka连接正常"
return 0
else
log "Kafka连接失败"
return 1
fi
}

# 检查ZooKeeper连接
check_zookeeper_connection() {
log "检查ZooKeeper连接..."

if echo "ruok" | nc $ZOOKEEPER_SERVERS > /dev/null 2>&1; then
log "ZooKeeper连接正常"
return 0
else
log "ZooKeeper连接失败"
return 1
fi
}

# 检查Kafka日志
check_kafka_logs() {
log "检查Kafka日志..."

# 查找Kafka日志文件
KAFKA_LOG_DIR="/opt/kafka/logs"

if [ -d "$KAFKA_LOG_DIR" ]; then
log "Kafka日志目录: $KAFKA_LOG_DIR"

# 检查最近的错误
find $KAFKA_LOG_DIR -name "*.log" -exec tail -50 {} \; | grep -i error
else
log "Kafka日志目录不存在: $KAFKA_LOG_DIR"
fi
}

# 检查系统资源
check_system_resources() {
log "检查系统资源..."

# 检查内存使用
free -h

# 检查磁盘空间
df -h

# 检查CPU使用
top -bn1 | grep "Cpu(s)"

# 检查I/O使用
iostat -x 1 1
}

# 检查网络连接
check_network() {
log "检查网络连接..."

# 检查端口监听
netstat -tlnp | grep -E "(9092|2181)"

# 检查防火墙
iptables -L | grep -E "(9092|2181)"
}

# 检查Topic状态
check_topic_status() {
log "检查Topic状态..."

# 检查Topic列表
kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list

# 检查Topic详情
kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --describe

# 检查消费者组
kafka-consumer-groups.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
}

# 修复常见问题
fix_common_issues() {
log "修复常见问题..."

# 检查并修复权限问题
KAFKA_DATA_DIR="/var/lib/kafka-logs"

if [ -d "$KAFKA_DATA_DIR" ]; then
chown -R kafka:kafka $KAFKA_DATA_DIR
chmod 755 $KAFKA_DATA_DIR
log "修复Kafka数据目录权限"
fi

# 检查并修复ZooKeeper数据目录权限
ZOOKEEPER_DATA_DIR="/var/lib/zookeeper"

if [ -d "$ZOOKEEPER_DATA_DIR" ]; then
chown -R kafka:kafka $ZOOKEEPER_DATA_DIR
chmod 755 $ZOOKEEPER_DATA_DIR
log "修复ZooKeeper数据目录权限"
fi
}

# 重启Kafka服务
restart_kafka_service() {
log "重启Kafka服务..."

# 停止Kafka服务
systemctl stop kafka

# 停止ZooKeeper服务
systemctl stop zookeeper

# 等待服务停止
sleep 5

# 启动ZooKeeper服务
systemctl start zookeeper

# 等待ZooKeeper启动
sleep 10

# 启动Kafka服务
systemctl start kafka

# 检查服务状态
if systemctl is-active --quiet kafka && systemctl is-active --quiet zookeeper; then
log "Kafka服务重启成功"
return 0
else
log "Kafka服务重启失败"
return 1
fi
}

# 主函数
main() {
log "开始Kafka故障诊断..."

# 检查Kafka服务状态
if ! check_kafka_service; then
log "Kafka服务未运行,尝试启动..."
systemctl start kafka
sleep 5
fi

# 检查ZooKeeper服务状态
if ! check_zookeeper_service; then
log "ZooKeeper服务未运行,尝试启动..."
systemctl start zookeeper
sleep 5
fi

# 检查Kafka连接
if ! check_kafka_connection; then
log "Kafka连接失败,检查配置..."
check_kafka_logs
check_system_resources
check_network

# 修复常见问题
fix_common_issues

# 重启服务
restart_kafka_service
else
log "Kafka服务运行正常"
check_topic_status
fi

log "Kafka故障诊断完成"
}

# 执行主函数
main "$@"

7. 总结

Kafka消息队列迁移与运维是运维工作中的重要组成部分。通过本文的详细介绍,我们了解了:

  1. 数据迁移: 使用kafka-mirror-maker进行数据迁移
  2. 集群管理: Kafka集群的部署、监控和管理
  3. 性能优化: 消息队列性能调优和监控
  4. 故障处理: 常见故障的诊断和处理
  5. 运维自动化: 脚本化运维提高效率

通过合理的运维策略和工具,可以确保Kafka消息队列的稳定运行和高性能。


运维实战要点:

  • 数据迁移前做好备份,确保数据安全
  • 集群部署时注意网络配置和防火墙设置
  • 定期监控集群状态,及时发现问题
  • 性能调优需要根据实际业务场景进行
  • 故障处理要有完整的诊断流程

代码注解说明:

  • 日志函数: 统一日志格式,便于问题追踪
  • 错误处理: 完善的错误检查和异常处理
  • 配置管理: 灵活的配置参数管理
  • 监控告警: 实时监控和告警机制
  • 自动化运维: 脚本化运维提高效率