前言

Kafka集群搭建是企业级消息中间件的核心能力之一,直接影响系统的消息吞吐能力和数据可靠性。通过高可用的集群部署策略和完善的性能调优机制,可以构建稳定可靠的消息队列系统,支撑企业级应用的高并发场景。本文从集群架构设计到部署实施,从基础原理到企业级实践,系统梳理Kafka集群搭建的完整解决方案。

一、Kafka集群架构设计

1.1 集群整体架构

一个典型的Kafka集群由多台Broker、一个Zookeeper集群以及若干Producer和Consumer组成:Broker负责消息的持久化存储与读写服务;Zookeeper负责保存集群元数据并完成Controller选举;Producer将消息写入Topic的各个分区;Consumer以消费组为单位并行消费。为保证高可用,生产环境通常至少部署3个Broker,并为Topic设置多副本。
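一个典型的三Broker集群拓扑示意如下(节点数与命名仅为示例):

Producer ──▶ [ Broker-1   Broker-2   Broker-3 ] ──▶ Consumer Group
                          │
              Zookeeper集群(元数据管理 / Controller选举)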

1.2 集群核心组件

下面的KafkaClusterManager示意了集群管理的核心组件,以及集群初始化、启动和停止的总体流程:

/**
 * Kafka集群核心组件
 */
@Component
public class KafkaClusterManager {

    @Autowired
    private BrokerManager brokerManager;

    @Autowired
    private TopicManager topicManager;

    @Autowired
    private ConsumerGroupManager consumerGroupManager;

    @Autowired
    private ClusterMonitorService clusterMonitorService;

    @Autowired
    private SecurityManager securityManager;

    /**
     * 初始化Kafka集群
     */
    public void initializeCluster() {
        try {
            // 1. 初始化Zookeeper连接
            initializeZookeeperConnection();

            // 2. 启动Broker节点
            startBrokerNodes();

            // 3. 创建系统Topic
            createSystemTopics();

            // 4. 配置安全认证
            configureSecurity();

            // 5. 启动监控服务
            startMonitoringService();

            log.info("Kafka集群初始化完成");

        } catch (Exception e) {
            log.error("Kafka集群初始化失败", e);
            throw new ClusterInitializationException("集群初始化失败", e);
        }
    }

    /**
     * 启动Kafka集群
     */
    public void startCluster() {
        try {
            // 1. 检查集群状态
            checkClusterStatus();

            // 2. 启动所有Broker
            brokerManager.startAllBrokers();

            // 3. 验证集群健康状态
            validateClusterHealth();

            // 4. 启动监控
            clusterMonitorService.startMonitoring();

            log.info("Kafka集群启动成功");

        } catch (Exception e) {
            log.error("Kafka集群启动失败", e);
            throw new ClusterStartException("集群启动失败", e);
        }
    }

    /**
     * 停止Kafka集群
     */
    public void stopCluster() {
        try {
            // 1. 停止监控
            clusterMonitorService.stopMonitoring();

            // 2. 停止所有Broker
            brokerManager.stopAllBrokers();

            // 3. 关闭Zookeeper连接
            closeZookeeperConnection();

            log.info("Kafka集群停止成功");

        } catch (Exception e) {
            log.error("Kafka集群停止失败", e);
        }
    }

    /**
     * 初始化Zookeeper连接
     */
    private void initializeZookeeperConnection() {
        // 实现Zookeeper连接初始化逻辑
        log.info("初始化Zookeeper连接");
    }

    /**
     * 启动Broker节点
     */
    private void startBrokerNodes() {
        // 实现Broker节点启动逻辑
        log.info("启动Broker节点");
    }

    /**
     * 创建系统Topic
     */
    private void createSystemTopics() {
        // 实现系统Topic创建逻辑
        log.info("创建系统Topic");
    }

    /**
     * 配置安全认证
     */
    private void configureSecurity() {
        // 实现安全认证配置逻辑
        log.info("配置安全认证");
    }

    /**
     * 启动监控服务
     */
    private void startMonitoringService() {
        // 实现监控服务启动逻辑
        log.info("启动监控服务");
    }

    /**
     * 检查集群状态(startCluster中引用,原文缺失,此处补充同风格的占位实现)
     */
    private void checkClusterStatus() {
        log.info("检查集群状态");
    }

    /**
     * 验证集群健康状态(占位实现)
     */
    private void validateClusterHealth() {
        log.info("验证集群健康状态");
    }

    /**
     * 关闭Zookeeper连接(占位实现)
     */
    private void closeZookeeperConnection() {
        log.info("关闭Zookeeper连接");
    }
}

二、Kafka集群部署实施

2.1 Broker节点管理

BrokerManager统一负责Broker节点的启动、停止、健康检查与注册管理:

/**
 * Kafka Broker节点管理器
 */
@Service
public class BrokerManager {

    @Autowired
    private BrokerConfigService brokerConfigService;

    @Autowired
    private BrokerDeploymentService brokerDeploymentService;

    @Autowired
    private BrokerHealthChecker brokerHealthChecker;

    private final Map<String, BrokerNode> brokerNodes;

    public BrokerManager() {
        this.brokerNodes = new ConcurrentHashMap<>();
    }

    /**
     * 启动所有Broker
     */
    public void startAllBrokers() {
        try {
            // 1. 获取Broker配置
            List<BrokerConfig> brokerConfigs = brokerConfigService.getAllBrokerConfigs();

            // 2. 启动每个Broker
            for (BrokerConfig config : brokerConfigs) {
                startBroker(config);
            }

            // 3. 等待所有Broker启动完成
            waitForBrokersToStart();

            // 4. 验证集群状态
            validateClusterState();

            log.info("所有Broker启动完成,数量: {}", brokerConfigs.size());

        } catch (Exception e) {
            log.error("Broker启动失败", e);
            throw new BrokerStartException("Broker启动失败", e);
        }
    }

    /**
     * 停止所有Broker
     */
    public void stopAllBrokers() {
        try {
            // 1. 优雅关闭所有Broker
            for (BrokerNode brokerNode : brokerNodes.values()) {
                stopBroker(brokerNode);
            }

            // 2. 等待所有Broker停止完成
            waitForBrokersToStop();

            log.info("所有Broker停止完成");

        } catch (Exception e) {
            log.error("Broker停止失败", e);
        }
    }

    /**
     * 获取所有Broker节点(后文JMXMetricsCollector中引用,原文未给出,按语义补充)
     */
    public Collection<BrokerNode> getAllBrokers() {
        return brokerNodes.values();
    }

    /**
     * 判断Broker是否存在(后文PartitionManager校验副本时引用,按语义补充;
     * 注册表以字符串形式的brokerId作为key)
     */
    public boolean isBrokerExists(int brokerId) {
        return brokerNodes.containsKey(String.valueOf(brokerId));
    }

    /**
     * 启动单个Broker
     */
    private void startBroker(BrokerConfig config) {
        try {
            // 1. 创建Broker节点
            BrokerNode brokerNode = createBrokerNode(config);

            // 2. 部署Broker
            brokerDeploymentService.deployBroker(brokerNode);

            // 3. 启动Broker进程
            brokerDeploymentService.startBrokerProcess(brokerNode);

            // 4. 等待Broker启动
            waitForBrokerToStart(brokerNode);

            // 5. 验证Broker健康状态
            validateBrokerHealth(brokerNode);

            // 6. 注册Broker节点
            brokerNodes.put(brokerNode.getBrokerId(), brokerNode);

            log.info("Broker启动成功: {}", brokerNode.getBrokerId());

        } catch (Exception e) {
            log.error("Broker启动失败: {}", config.getBrokerId(), e);
            throw new BrokerStartException("Broker启动失败", e);
        }
    }

    /**
     * 停止单个Broker
     */
    private void stopBroker(BrokerNode brokerNode) {
        try {
            // 1. 优雅关闭Broker进程
            brokerDeploymentService.stopBrokerProcess(brokerNode);

            // 2. 等待Broker停止
            waitForBrokerToStop(brokerNode);

            // 3. 清理Broker资源
            brokerDeploymentService.cleanupBrokerResources(brokerNode);

            // 4. 注销Broker节点
            brokerNodes.remove(brokerNode.getBrokerId());

            log.info("Broker停止成功: {}", brokerNode.getBrokerId());

        } catch (Exception e) {
            log.error("Broker停止失败: {}", brokerNode.getBrokerId(), e);
        }
    }

    /**
     * 创建Broker节点
     */
    private BrokerNode createBrokerNode(BrokerConfig config) {
        BrokerNode brokerNode = new BrokerNode();
        brokerNode.setBrokerId(config.getBrokerId());
        brokerNode.setHost(config.getHost());
        brokerNode.setPort(config.getPort());
        brokerNode.setLogDir(config.getLogDir());
        brokerNode.setConfig(config);
        brokerNode.setStatus(BrokerStatus.STARTING);
        brokerNode.setStartTime(System.currentTimeMillis());

        return brokerNode;
    }

    /**
     * 等待Broker启动
     */
    private void waitForBrokerToStart(BrokerNode brokerNode) {
        int maxWaitTime = 30000; // 30秒
        int checkInterval = 1000; // 1秒
        int waitedTime = 0;

        while (waitedTime < maxWaitTime) {
            try {
                if (brokerHealthChecker.isBrokerHealthy(brokerNode)) {
                    brokerNode.setStatus(BrokerStatus.RUNNING);
                    return;
                }

                Thread.sleep(checkInterval);
                waitedTime += checkInterval;

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BrokerStartException("等待Broker启动被中断", e);
            }
        }

        throw new BrokerStartException("Broker启动超时: " + brokerNode.getBrokerId());
    }

    /**
     * 等待Broker停止
     */
    private void waitForBrokerToStop(BrokerNode brokerNode) {
        int maxWaitTime = 30000; // 30秒
        int checkInterval = 1000; // 1秒
        int waitedTime = 0;

        while (waitedTime < maxWaitTime) {
            try {
                if (!brokerHealthChecker.isBrokerRunning(brokerNode)) {
                    brokerNode.setStatus(BrokerStatus.STOPPED);
                    return;
                }

                Thread.sleep(checkInterval);
                waitedTime += checkInterval;

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BrokerStopException("等待Broker停止被中断", e);
            }
        }

        throw new BrokerStopException("Broker停止超时: " + brokerNode.getBrokerId());
    }

    /**
     * 验证Broker健康状态
     */
    private void validateBrokerHealth(BrokerNode brokerNode) {
        try {
            BrokerHealthStatus healthStatus = brokerHealthChecker.checkBrokerHealth(brokerNode);

            if (!healthStatus.isHealthy()) {
                throw new BrokerHealthException("Broker健康检查失败: " + brokerNode.getBrokerId());
            }

            brokerNode.setHealthStatus(healthStatus);

        } catch (Exception e) {
            log.error("Broker健康检查失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerHealthException("Broker健康检查失败", e);
        }
    }

    /**
     * 等待所有Broker启动完成
     */
    private void waitForBrokersToStart() {
        int maxWaitTime = 120000; // 2分钟
        int checkInterval = 5000; // 5秒
        int waitedTime = 0;

        while (waitedTime < maxWaitTime) {
            try {
                boolean allBrokersRunning = brokerNodes.values().stream()
                        .allMatch(broker -> broker.getStatus() == BrokerStatus.RUNNING);

                if (allBrokersRunning) {
                    return;
                }

                Thread.sleep(checkInterval);
                waitedTime += checkInterval;

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BrokerStartException("等待所有Broker启动被中断", e);
            }
        }

        throw new BrokerStartException("等待所有Broker启动超时");
    }

    /**
     * 等待所有Broker停止完成
     */
    private void waitForBrokersToStop() {
        int maxWaitTime = 120000; // 2分钟
        int checkInterval = 5000; // 5秒
        int waitedTime = 0;

        while (waitedTime < maxWaitTime) {
            try {
                boolean allBrokersStopped = brokerNodes.values().stream()
                        .allMatch(broker -> broker.getStatus() == BrokerStatus.STOPPED);

                if (allBrokersStopped) {
                    return;
                }

                Thread.sleep(checkInterval);
                waitedTime += checkInterval;

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BrokerStopException("等待所有Broker停止被中断", e);
            }
        }

        throw new BrokerStopException("等待所有Broker停止超时");
    }

    /**
     * 验证集群状态
     */
    private void validateClusterState() {
        try {
            // 检查集群是否形成
            if (brokerNodes.size() < 3) {
                log.warn("集群节点数量不足,建议至少3个节点");
            }

            // 检查所有Broker是否健康
            for (BrokerNode brokerNode : brokerNodes.values()) {
                if (brokerNode.getHealthStatus() == null || !brokerNode.getHealthStatus().isHealthy()) {
                    throw new ClusterValidationException("集群状态验证失败,Broker不健康: " + brokerNode.getBrokerId());
                }
            }

            log.info("集群状态验证通过");

        } catch (Exception e) {
            log.error("集群状态验证失败", e);
            throw new ClusterValidationException("集群状态验证失败", e);
        }
    }
}

2.2 Broker配置管理

BrokerConfigService负责Broker配置的增删改查、参数校验以及server.properties文件内容的生成:

/**
 * Kafka Broker配置服务
 */
@Service
public class BrokerConfigService {

    @Autowired
    private ConfigurationRepository configRepository;

    /**
     * 获取所有Broker配置
     */
    public List<BrokerConfig> getAllBrokerConfigs() {
        return configRepository.findAllBrokerConfigs();
    }

    /**
     * 获取Broker配置
     */
    public BrokerConfig getBrokerConfig(String brokerId) {
        return configRepository.findBrokerConfigById(brokerId)
                .orElseThrow(() -> new ConfigNotFoundException("Broker配置不存在: " + brokerId));
    }

    /**
     * 保存Broker配置
     */
    public void saveBrokerConfig(BrokerConfig config) {
        try {
            // 验证配置
            validateBrokerConfig(config);

            // 保存配置
            configRepository.saveBrokerConfig(config);

            log.info("Broker配置保存成功: {}", config.getBrokerId());

        } catch (Exception e) {
            log.error("Broker配置保存失败", e);
            throw new ConfigSaveException("Broker配置保存失败", e);
        }
    }

    /**
     * 更新Broker配置
     */
    public void updateBrokerConfig(String brokerId, BrokerConfig config) {
        try {
            // 检查配置是否存在
            if (!configRepository.existsBrokerConfig(brokerId)) {
                throw new ConfigNotFoundException("Broker配置不存在: " + brokerId);
            }

            // 验证配置
            validateBrokerConfig(config);

            // 更新配置
            config.setBrokerId(brokerId);
            configRepository.saveBrokerConfig(config);

            log.info("Broker配置更新成功: {}", brokerId);

        } catch (Exception e) {
            log.error("Broker配置更新失败", e);
            throw new ConfigUpdateException("Broker配置更新失败", e);
        }
    }

    /**
     * 删除Broker配置
     */
    public void deleteBrokerConfig(String brokerId) {
        try {
            if (!configRepository.existsBrokerConfig(brokerId)) {
                throw new ConfigNotFoundException("Broker配置不存在: " + brokerId);
            }

            configRepository.deleteBrokerConfig(brokerId);

            log.info("Broker配置删除成功: {}", brokerId);

        } catch (Exception e) {
            log.error("Broker配置删除失败", e);
            throw new ConfigDeleteException("Broker配置删除失败", e);
        }
    }

    /**
     * 生成Broker配置文件
     */
    public String generateBrokerConfigFile(BrokerConfig config) {
        StringBuilder configContent = new StringBuilder();

        // 基础配置
        configContent.append("# Kafka Broker Configuration\n");
        configContent.append("broker.id=").append(config.getBrokerId()).append("\n");
        configContent.append("listeners=PLAINTEXT://").append(config.getHost()).append(":").append(config.getPort()).append("\n");
        configContent.append("advertised.listeners=PLAINTEXT://").append(config.getHost()).append(":").append(config.getPort()).append("\n");

        // 日志配置
        configContent.append("log.dirs=").append(config.getLogDir()).append("\n");
        configContent.append("num.network.threads=").append(config.getNumNetworkThreads()).append("\n");
        configContent.append("num.io.threads=").append(config.getNumIoThreads()).append("\n");
        configContent.append("socket.send.buffer.bytes=").append(config.getSocketSendBufferBytes()).append("\n");
        configContent.append("socket.receive.buffer.bytes=").append(config.getSocketReceiveBufferBytes()).append("\n");
        configContent.append("socket.request.max.bytes=").append(config.getSocketRequestMaxBytes()).append("\n");

        // 日志保留配置
        configContent.append("log.retention.hours=").append(config.getLogRetentionHours()).append("\n");
        configContent.append("log.retention.bytes=").append(config.getLogRetentionBytes()).append("\n");
        configContent.append("log.segment.bytes=").append(config.getLogSegmentBytes()).append("\n");
        configContent.append("log.retention.check.interval.ms=").append(config.getLogRetentionCheckIntervalMs()).append("\n");

        // 副本配置
        configContent.append("default.replication.factor=").append(config.getDefaultReplicationFactor()).append("\n");
        configContent.append("min.insync.replicas=").append(config.getMinInsyncReplicas()).append("\n");

        // Zookeeper配置
        configContent.append("zookeeper.connect=").append(config.getZookeeperConnect()).append("\n");
        configContent.append("zookeeper.connection.timeout.ms=").append(config.getZookeeperConnectionTimeoutMs()).append("\n");

        // 性能配置
        configContent.append("num.partitions=").append(config.getNumPartitions()).append("\n");
        configContent.append("offsets.topic.replication.factor=").append(config.getOffsetsTopicReplicationFactor()).append("\n");
        configContent.append("transaction.state.log.replication.factor=").append(config.getTransactionStateLogReplicationFactor()).append("\n");
        configContent.append("transaction.state.log.min.isr=").append(config.getTransactionStateLogMinIsr()).append("\n");

        // 安全配置
        if (config.isSecurityEnabled()) {
            configContent.append("security.inter.broker.protocol=").append(config.getSecurityInterBrokerProtocol()).append("\n");
            configContent.append("sasl.mechanism.inter.broker.protocol=").append(config.getSaslMechanismInterBrokerProtocol()).append("\n");
            configContent.append("sasl.enabled.mechanisms=").append(config.getSaslEnabledMechanisms()).append("\n");
        }

        return configContent.toString();
    }

    /**
     * 验证Broker配置
     */
    private void validateBrokerConfig(BrokerConfig config) {
        if (config.getBrokerId() == null || config.getBrokerId().isEmpty()) {
            throw new ConfigValidationException("Broker ID不能为空");
        }

        if (config.getHost() == null || config.getHost().isEmpty()) {
            throw new ConfigValidationException("Broker主机地址不能为空");
        }

        if (config.getPort() <= 0 || config.getPort() > 65535) {
            throw new ConfigValidationException("Broker端口必须在1-65535之间");
        }

        if (config.getLogDir() == null || config.getLogDir().isEmpty()) {
            throw new ConfigValidationException("日志目录不能为空");
        }

        if (config.getZookeeperConnect() == null || config.getZookeeperConnect().isEmpty()) {
            throw new ConfigValidationException("Zookeeper连接地址不能为空");
        }

        if (config.getDefaultReplicationFactor() < 1) {
            throw new ConfigValidationException("默认副本因子必须大于0");
        }

        if (config.getMinInsyncReplicas() < 1) {
            throw new ConfigValidationException("最小同步副本数必须大于0");
        }

        if (config.getMinInsyncReplicas() > config.getDefaultReplicationFactor()) {
            throw new ConfigValidationException("最小同步副本数不能大于默认副本因子");
        }
    }
}
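按照上述generateBrokerConfigFile的拼接逻辑,单个Broker最终生成的server.properties大致如下(主机、端口与各参数均为示例值,实际取自BrokerConfig):

# Kafka Broker Configuration(示例输出,非真实环境配置)
broker.id=1
listeners=PLAINTEXT://192.168.1.101:9092
advertised.listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/data/kafka/logs
num.network.threads=8
num.io.threads=16
log.retention.hours=168
log.segment.bytes=1073741824
default.replication.factor=3
min.insync.replicas=2
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
zookeeper.connection.timeout.ms=18000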

2.3 Broker部署服务

BrokerDeploymentService负责Broker的目录初始化、配置文件与启停脚本生成、进程启停和资源清理:

/**
 * Kafka Broker部署服务
 */
@Service
public class BrokerDeploymentService {

    @Autowired
    private FileSystemService fileSystemService;

    @Autowired
    private ProcessManager processManager;

    @Autowired
    private NetworkService networkService;

    /**
     * 部署Broker
     */
    public void deployBroker(BrokerNode brokerNode) {
        try {
            // 1. 创建Broker目录
            createBrokerDirectories(brokerNode);

            // 2. 生成配置文件
            generateBrokerConfigFile(brokerNode);

            // 3. 创建启动脚本
            createBrokerStartScript(brokerNode);

            // 4. 创建停止脚本
            createBrokerStopScript(brokerNode);

            // 5. 设置文件权限
            setBrokerFilePermissions(brokerNode);

            log.info("Broker部署完成: {}", brokerNode.getBrokerId());

        } catch (Exception e) {
            log.error("Broker部署失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("Broker部署失败", e);
        }
    }

    /**
     * 启动Broker进程
     */
    public void startBrokerProcess(BrokerNode brokerNode) {
        try {
            // 1. 检查端口是否可用
            if (!networkService.isPortAvailable(brokerNode.getHost(), brokerNode.getPort())) {
                throw new BrokerStartException("端口不可用: " + brokerNode.getPort());
            }

            // 2. 启动Broker进程
            Process process = processManager.startProcess(
                    brokerNode.getStartScriptPath(),
                    brokerNode.getConfigFilePath(),
                    brokerNode.getLogFilePath()
            );

            // 3. 设置进程信息
            brokerNode.setProcess(process);
            brokerNode.setPid(process.pid());

            log.info("Broker进程启动成功: {}, PID: {}", brokerNode.getBrokerId(), process.pid());

        } catch (Exception e) {
            log.error("Broker进程启动失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerStartException("Broker进程启动失败", e);
        }
    }

    /**
     * 停止Broker进程
     */
    public void stopBrokerProcess(BrokerNode brokerNode) {
        try {
            if (brokerNode.getProcess() != null && brokerNode.getProcess().isAlive()) {
                // 1. 优雅关闭进程
                brokerNode.getProcess().destroy();

                // 2. 等待进程结束
                boolean terminated = brokerNode.getProcess().waitFor(30, TimeUnit.SECONDS);

                if (!terminated) {
                    // 3. 强制关闭进程
                    brokerNode.getProcess().destroyForcibly();
                    brokerNode.getProcess().waitFor(10, TimeUnit.SECONDS);
                }

                log.info("Broker进程停止成功: {}", brokerNode.getBrokerId());
            }

        } catch (Exception e) {
            log.error("Broker进程停止失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerStopException("Broker进程停止失败", e);
        }
    }

    /**
     * 清理Broker资源
     */
    public void cleanupBrokerResources(BrokerNode brokerNode) {
        try {
            // 1. 清理临时文件
            fileSystemService.cleanupTempFiles(brokerNode.getTempDir());

            // 2. 清理日志文件
            fileSystemService.cleanupLogFiles(brokerNode.getLogDir());

            // 3. 清理配置文件
            fileSystemService.cleanupConfigFiles(brokerNode.getConfigDir());

            log.info("Broker资源清理完成: {}", brokerNode.getBrokerId());

        } catch (Exception e) {
            log.error("Broker资源清理失败: {}", brokerNode.getBrokerId(), e);
        }
    }

    /**
     * 创建Broker目录
     */
    private void createBrokerDirectories(BrokerNode brokerNode) {
        try {
            // 创建基础目录
            fileSystemService.createDirectory(brokerNode.getBaseDir());
            fileSystemService.createDirectory(brokerNode.getConfigDir());
            fileSystemService.createDirectory(brokerNode.getLogDir());
            fileSystemService.createDirectory(brokerNode.getTempDir());
            fileSystemService.createDirectory(brokerNode.getScriptDir());

            // 创建日志子目录
            fileSystemService.createDirectory(brokerNode.getLogDir() + "/kafka-logs");
            fileSystemService.createDirectory(brokerNode.getLogDir() + "/kafka-logs-0");
            fileSystemService.createDirectory(brokerNode.getLogDir() + "/kafka-logs-1");
            fileSystemService.createDirectory(brokerNode.getLogDir() + "/kafka-logs-2");

        } catch (Exception e) {
            log.error("创建Broker目录失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("创建Broker目录失败", e);
        }
    }

    /**
     * 生成Broker配置文件
     */
    private void generateBrokerConfigFile(BrokerNode brokerNode) {
        try {
            BrokerConfig config = brokerNode.getConfig();
            String configContent = generateBrokerConfigContent(config);

            String configFilePath = brokerNode.getConfigDir() + "/server.properties";
            fileSystemService.writeFile(configFilePath, configContent);

            brokerNode.setConfigFilePath(configFilePath);

        } catch (Exception e) {
            log.error("生成Broker配置文件失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("生成Broker配置文件失败", e);
        }
    }

    /**
     * 创建Broker启动脚本
     */
    private void createBrokerStartScript(BrokerNode brokerNode) {
        try {
            String startScriptContent = generateStartScriptContent(brokerNode);

            String startScriptPath = brokerNode.getScriptDir() + "/start-kafka.sh";
            fileSystemService.writeFile(startScriptPath, startScriptContent);

            brokerNode.setStartScriptPath(startScriptPath);

        } catch (Exception e) {
            log.error("创建Broker启动脚本失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("创建Broker启动脚本失败", e);
        }
    }

    /**
     * 创建Broker停止脚本
     */
    private void createBrokerStopScript(BrokerNode brokerNode) {
        try {
            String stopScriptContent = generateStopScriptContent(brokerNode);

            String stopScriptPath = brokerNode.getScriptDir() + "/stop-kafka.sh";
            fileSystemService.writeFile(stopScriptPath, stopScriptContent);

            brokerNode.setStopScriptPath(stopScriptPath);

        } catch (Exception e) {
            log.error("创建Broker停止脚本失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("创建Broker停止脚本失败", e);
        }
    }

    /**
     * 设置Broker文件权限
     */
    private void setBrokerFilePermissions(BrokerNode brokerNode) {
        try {
            // 设置脚本文件执行权限
            fileSystemService.setExecutablePermission(brokerNode.getStartScriptPath());
            fileSystemService.setExecutablePermission(brokerNode.getStopScriptPath());

            // 设置配置文件读取权限
            fileSystemService.setReadablePermission(brokerNode.getConfigFilePath());

        } catch (Exception e) {
            log.error("设置Broker文件权限失败: {}", brokerNode.getBrokerId(), e);
            throw new BrokerDeploymentException("设置Broker文件权限失败", e);
        }
    }

    /**
     * 生成Broker配置内容
     */
    private String generateBrokerConfigContent(BrokerConfig config) {
        StringBuilder content = new StringBuilder();

        content.append("# Kafka Broker Configuration\n");
        content.append("broker.id=").append(config.getBrokerId()).append("\n");
        content.append("listeners=PLAINTEXT://").append(config.getHost()).append(":").append(config.getPort()).append("\n");
        content.append("advertised.listeners=PLAINTEXT://").append(config.getHost()).append(":").append(config.getPort()).append("\n");
        content.append("log.dirs=").append(config.getLogDir()).append("\n");
        content.append("num.network.threads=").append(config.getNumNetworkThreads()).append("\n");
        content.append("num.io.threads=").append(config.getNumIoThreads()).append("\n");
        content.append("socket.send.buffer.bytes=").append(config.getSocketSendBufferBytes()).append("\n");
        content.append("socket.receive.buffer.bytes=").append(config.getSocketReceiveBufferBytes()).append("\n");
        content.append("socket.request.max.bytes=").append(config.getSocketRequestMaxBytes()).append("\n");
        content.append("log.retention.hours=").append(config.getLogRetentionHours()).append("\n");
        content.append("log.retention.bytes=").append(config.getLogRetentionBytes()).append("\n");
        content.append("log.segment.bytes=").append(config.getLogSegmentBytes()).append("\n");
        content.append("log.retention.check.interval.ms=").append(config.getLogRetentionCheckIntervalMs()).append("\n");
        content.append("default.replication.factor=").append(config.getDefaultReplicationFactor()).append("\n");
        content.append("min.insync.replicas=").append(config.getMinInsyncReplicas()).append("\n");
        content.append("zookeeper.connect=").append(config.getZookeeperConnect()).append("\n");
        content.append("zookeeper.connection.timeout.ms=").append(config.getZookeeperConnectionTimeoutMs()).append("\n");
        content.append("num.partitions=").append(config.getNumPartitions()).append("\n");
        content.append("offsets.topic.replication.factor=").append(config.getOffsetsTopicReplicationFactor()).append("\n");
        content.append("transaction.state.log.replication.factor=").append(config.getTransactionStateLogReplicationFactor()).append("\n");
        content.append("transaction.state.log.min.isr=").append(config.getTransactionStateLogMinIsr()).append("\n");

        return content.toString();
    }

    /**
     * 生成启动脚本内容
     */
    private String generateStartScriptContent(BrokerNode brokerNode) {
        StringBuilder content = new StringBuilder();

        content.append("#!/bin/bash\n");
        content.append("# Kafka Broker Start Script\n");
        content.append("export KAFKA_HOME=").append(brokerNode.getKafkaHome()).append("\n");
        content.append("export KAFKA_LOG_DIR=").append(brokerNode.getLogDir()).append("\n");
        content.append("export KAFKA_CONFIG_DIR=").append(brokerNode.getConfigDir()).append("\n");
        content.append("\n");
        content.append("echo \"Starting Kafka Broker ").append(brokerNode.getBrokerId()).append("...\"\n");
        content.append("nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_CONFIG_DIR/server.properties > $KAFKA_LOG_DIR/kafka.out 2>&1 &\n");
        content.append("echo \"Kafka Broker ").append(brokerNode.getBrokerId()).append(" started\"\n");

        return content.toString();
    }

    /**
     * 生成停止脚本内容
     */
    private String generateStopScriptContent(BrokerNode brokerNode) {
        StringBuilder content = new StringBuilder();

        content.append("#!/bin/bash\n");
        content.append("# Kafka Broker Stop Script\n");
        content.append("export KAFKA_HOME=").append(brokerNode.getKafkaHome()).append("\n");
        content.append("\n");
        content.append("echo \"Stopping Kafka Broker ").append(brokerNode.getBrokerId()).append("...\"\n");
        content.append("$KAFKA_HOME/bin/kafka-server-stop.sh\n");
        content.append("echo \"Kafka Broker ").append(brokerNode.getBrokerId()).append(" stopped\"\n");

        return content.toString();
    }
}

三、Topic管理

3.1 Topic管理器

TopicManager基于AdminClient封装了Topic的创建、删除、配置变更与信息查询:

/**
 * Kafka Topic管理器
 */
@Service
public class TopicManager {

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    @Autowired
    private TopicConfigService topicConfigService;

    @Autowired
    private PartitionManager partitionManager;

    /**
     * 创建Topic
     */
    public void createTopic(TopicConfig topicConfig) {
        try {
            // 1. 验证Topic配置
            validateTopicConfig(topicConfig);

            // 2. 检查Topic是否已存在
            if (topicExists(topicConfig.getTopicName())) {
                throw new TopicExistsException("Topic已存在: " + topicConfig.getTopicName());
            }

            // 3. 创建Topic(NewTopic构造函数要求副本因子为short类型)
            NewTopic newTopic = new NewTopic(
                    topicConfig.getTopicName(),
                    topicConfig.getNumPartitions(),
                    (short) topicConfig.getReplicationFactor()
            );

            // 4. 设置Topic配置
            Map<String, String> configs = topicConfig.getConfigs();
            if (configs != null && !configs.isEmpty()) {
                newTopic.configs(configs);
            }

            // 5. 执行创建
            CreateTopicsResult result = kafkaAdminClient.createTopics(Collections.singletonList(newTopic));

            // 6. 等待创建完成
            result.all().get(30, TimeUnit.SECONDS);

            // 7. 保存Topic配置
            topicConfigService.saveTopicConfig(topicConfig);

            log.info("Topic创建成功: {}", topicConfig.getTopicName());

        } catch (Exception e) {
            log.error("Topic创建失败: {}", topicConfig.getTopicName(), e);
            throw new TopicCreationException("Topic创建失败", e);
        }
    }

    /**
     * 删除Topic
     */
    public void deleteTopic(String topicName) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 删除Topic
            DeleteTopicsResult result = kafkaAdminClient.deleteTopics(Collections.singletonList(topicName));

            // 3. 等待删除完成
            result.all().get(30, TimeUnit.SECONDS);

            // 4. 删除Topic配置
            topicConfigService.deleteTopicConfig(topicName);

            log.info("Topic删除成功: {}", topicName);

        } catch (Exception e) {
            log.error("Topic删除失败: {}", topicName, e);
            throw new TopicDeletionException("Topic删除失败", e);
        }
    }

    /**
     * 更新Topic配置
     */
    public void updateTopicConfig(String topicName, Map<String, String> configs) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 更新Topic配置(注:alterConfigs在新版客户端中已被incrementalAlterConfigs取代,
            // 此处保留原写法以兼容旧版本)
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            Config config = new Config(configs.entrySet().stream()
                    .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList()));

            AlterConfigsResult result = kafkaAdminClient.alterConfigs(
                    Collections.singletonMap(configResource, config)
            );

            // 3. 等待更新完成
            result.all().get(30, TimeUnit.SECONDS);

            // 4. 更新Topic配置
            topicConfigService.updateTopicConfig(topicName, configs);

            log.info("Topic配置更新成功: {}", topicName);

        } catch (Exception e) {
            log.error("Topic配置更新失败: {}", topicName, e);
            throw new TopicConfigUpdateException("Topic配置更新失败", e);
        }
    }

    /**
     * 获取Topic信息
     */
    public TopicInfo getTopicInfo(String topicName) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 获取Topic详细信息
            DescribeTopicsResult result = kafkaAdminClient.describeTopics(Collections.singletonList(topicName));
            TopicDescription topicDescription = result.all().get().get(topicName);

            // 3. 获取Topic配置
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            DescribeConfigsResult configResult = kafkaAdminClient.describeConfigs(Collections.singletonList(configResource));
            Config config = configResult.all().get().get(configResource);

            // 4. 构建Topic信息
            TopicInfo topicInfo = new TopicInfo();
            topicInfo.setTopicName(topicName);
            topicInfo.setPartitions(topicDescription.partitions().size());
            topicInfo.setReplicationFactor(topicDescription.partitions().get(0).replicas().size());
            topicInfo.setConfigs(config.entries().stream()
                    .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)));

            return topicInfo;

        } catch (Exception e) {
            log.error("获取Topic信息失败: {}", topicName, e);
            throw new TopicInfoException("获取Topic信息失败", e);
        }
    }

    /**
     * 获取所有Topic列表
     */
    public List<TopicInfo> getAllTopics() {
        try {
            // 1. 获取所有Topic名称
            ListTopicsResult result = kafkaAdminClient.listTopics();
            Set<String> topicNames = result.names().get();

            // 2. 获取每个Topic的详细信息
            List<TopicInfo> topicInfos = new ArrayList<>();
            for (String topicName : topicNames) {
                try {
                    TopicInfo topicInfo = getTopicInfo(topicName);
                    topicInfos.add(topicInfo);
                } catch (Exception e) {
                    log.warn("获取Topic信息失败: {}", topicName, e);
                }
            }

            return topicInfos;

        } catch (Exception e) {
            log.error("获取所有Topic列表失败", e);
            throw new TopicListException("获取所有Topic列表失败", e);
        }
    }

    /**
     * 检查Topic是否存在
     */
    public boolean topicExists(String topicName) {
        try {
            ListTopicsResult result = kafkaAdminClient.listTopics();
            Set<String> topicNames = result.names().get();
            return topicNames.contains(topicName);

        } catch (Exception e) {
            log.error("检查Topic是否存在失败: {}", topicName, e);
            return false;
        }
    }

    /**
     * 验证Topic配置
     */
    private void validateTopicConfig(TopicConfig topicConfig) {
        if (topicConfig.getTopicName() == null || topicConfig.getTopicName().isEmpty()) {
            throw new TopicConfigValidationException("Topic名称不能为空");
        }

        if (topicConfig.getNumPartitions() <= 0) {
            throw new TopicConfigValidationException("分区数必须大于0");
        }

        if (topicConfig.getReplicationFactor() <= 0) {
            throw new TopicConfigValidationException("副本因子必须大于0");
        }

        if (topicConfig.getReplicationFactor() > getMaxReplicationFactor()) {
            throw new TopicConfigValidationException("副本因子不能大于最大副本因子");
        }

        // 验证Topic名称格式
        if (!isValidTopicName(topicConfig.getTopicName())) {
            throw new TopicConfigValidationException("Topic名称格式不正确");
        }
    }

    /**
     * 获取最大副本因子
     */
    private int getMaxReplicationFactor() {
        // 实现获取最大副本因子的逻辑(副本因子不能超过集群Broker数量)
        return 3; // 默认最大副本因子
    }

    /**
     * 验证Topic名称格式
     */
    private boolean isValidTopicName(String topicName) {
        // Topic名称只能包含字母、数字、点、下划线和连字符
        return topicName.matches("^[a-zA-Z0-9._-]+$");
    }
}
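TopicManager中注入的KafkaAdminClient需要预先注册为Spring Bean。下面是一个最小化的配置示例(bootstrap地址为假设值;AdminClient.create返回的实例即为KafkaAdminClient,注入时也可以统一声明为AdminClient接口):

@Configuration
public class KafkaAdminConfig {

    @Bean
    public AdminClient kafkaAdminClient() {
        Properties props = new Properties();
        // 集群入口地址为示例值,按实际环境修改
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        return AdminClient.create(props);
    }
}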

3.2 分区管理

PartitionManager负责分区扩容、副本重分配与分区健康检查:

/**
 * Kafka分区管理器
 */
@Service
public class PartitionManager {

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    @Autowired
    private BrokerManager brokerManager;

    /**
     * 增加Topic分区数
     */
    public void increasePartitions(String topicName, int newPartitionCount) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 获取当前分区数
            int currentPartitionCount = getCurrentPartitionCount(topicName);

            // 3. 验证新分区数(Kafka只支持增加分区,不支持减少)
            if (newPartitionCount <= currentPartitionCount) {
                throw new PartitionValidationException("新分区数必须大于当前分区数");
            }

            // 4. 创建分区变更请求
            Map<String, NewPartitions> partitionsMap = new HashMap<>();
            partitionsMap.put(topicName, NewPartitions.increaseTo(newPartitionCount));

            // 5. 执行分区变更
            CreatePartitionsResult result = kafkaAdminClient.createPartitions(partitionsMap);

            // 6. 等待变更完成
            result.all().get(30, TimeUnit.SECONDS);

            log.info("Topic分区数增加成功: {} -> {}", topicName, newPartitionCount);

        } catch (Exception e) {
            log.error("增加Topic分区数失败: {}", topicName, e);
            throw new PartitionIncreaseException("增加Topic分区数失败", e);
        }
    }

    /**
     * 重新分配分区
     */
    public void reassignPartitions(String topicName, Map<Integer, List<Integer>> partitionReplicas) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 验证分区分配
            validatePartitionReplicas(topicName, partitionReplicas);

            // 3. 创建分区重分配请求
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();

            for (Map.Entry<Integer, List<Integer>> entry : partitionReplicas.entrySet()) {
                int partition = entry.getKey();
                List<Integer> replicas = entry.getValue();

                TopicPartition topicPartition = new TopicPartition(topicName, partition);
                NewPartitionReassignment reassignment = new NewPartitionReassignment(replicas);
                reassignments.put(topicPartition, Optional.of(reassignment));
            }

            // 4. 执行分区重分配
            AlterPartitionReassignmentsResult result = kafkaAdminClient.alterPartitionReassignments(reassignments);

            // 5. 等待重分配完成
            result.all().get(60, TimeUnit.SECONDS);

            log.info("Topic分区重分配成功: {}", topicName);

        } catch (Exception e) {
            log.error("分区重分配失败: {}", topicName, e);
            throw new PartitionReassignmentException("分区重分配失败", e);
        }
    }

    /**
     * 获取分区信息
     */
    public List<PartitionInfo> getPartitionInfo(String topicName) {
        try {
            // 1. 检查Topic是否存在
            if (!topicExists(topicName)) {
                throw new TopicNotFoundException("Topic不存在: " + topicName);
            }

            // 2. 获取Topic详细信息
            DescribeTopicsResult result = kafkaAdminClient.describeTopics(Collections.singletonList(topicName));
            TopicDescription topicDescription = result.all().get().get(topicName);

            // 3. 构建分区信息
            List<PartitionInfo> partitionInfos = new ArrayList<>();
            for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
                PartitionInfo info = new PartitionInfo();
                info.setTopicName(topicName);
                info.setPartition(partitionInfo.partition());
                // 分区无主时leader()为null,统一用-1表示,避免空指针
                info.setLeader(partitionInfo.leader() != null ? partitionInfo.leader().id() : -1);
                info.setReplicas(partitionInfo.replicas().stream()
                        .map(Node::id)
                        .collect(Collectors.toList()));
                info.setIsr(partitionInfo.isr().stream()
                        .map(Node::id)
                        .collect(Collectors.toList()));

                partitionInfos.add(info);
            }

            return partitionInfos;

        } catch (Exception e) {
            log.error("获取分区信息失败: {}", topicName, e);
            throw new PartitionInfoException("获取分区信息失败", e);
        }
    }

    /**
     * 检查分区健康状态
     */
    public PartitionHealthStatus checkPartitionHealth(String topicName) {
        try {
            // 1. 获取分区信息
            List<PartitionInfo> partitionInfos = getPartitionInfo(topicName);

            // 2. 分析分区健康状态
            PartitionHealthStatus healthStatus = new PartitionHealthStatus();
            healthStatus.setTopicName(topicName);
            healthStatus.setTotalPartitions(partitionInfos.size());

            int healthyPartitions = 0;
            int underReplicatedPartitions = 0;
            int leaderlessPartitions = 0;

            for (PartitionInfo partitionInfo : partitionInfos) {
                // 检查副本数(ISR小于期望副本数即为欠副本)
                int expectedReplicas = partitionInfo.getReplicas().size();
                int actualReplicas = partitionInfo.getIsr().size();

                if (actualReplicas < expectedReplicas) {
                    underReplicatedPartitions++;
                }

                // 检查Leader
                if (partitionInfo.getLeader() == -1) {
                    leaderlessPartitions++;
                }

                // 检查分区是否健康
                if (actualReplicas >= expectedReplicas && partitionInfo.getLeader() != -1) {
                    healthyPartitions++;
                }
            }

            healthStatus.setHealthyPartitions(healthyPartitions);
            healthStatus.setUnderReplicatedPartitions(underReplicatedPartitions);
            healthStatus.setLeaderlessPartitions(leaderlessPartitions);

            // 计算健康率
            double healthRate = (double) healthyPartitions / partitionInfos.size();
            healthStatus.setHealthRate(healthRate);

            // 判断整体健康状态
            healthStatus.setHealthy(healthRate >= 0.8); // 80%以上分区健康

            return healthStatus;

        } catch (Exception e) {
            log.error("检查分区健康状态失败: {}", topicName, e);
            throw new PartitionHealthException("检查分区健康状态失败", e);
        }
    }

    /**
     * 获取当前分区数
     */
    private int getCurrentPartitionCount(String topicName) {
        try {
            DescribeTopicsResult result = kafkaAdminClient.describeTopics(Collections.singletonList(topicName));
            TopicDescription topicDescription = result.all().get().get(topicName);
            return topicDescription.partitions().size();

        } catch (Exception e) {
            log.error("获取当前分区数失败: {}", topicName, e);
            throw new PartitionInfoException("获取当前分区数失败", e);
        }
    }

    /**
     * 验证分区副本分配
     */
    private void validatePartitionReplicas(String topicName, Map<Integer, List<Integer>> partitionReplicas) {
        // 1. 检查分区是否存在
        List<PartitionInfo> currentPartitions = getPartitionInfo(topicName);
        Set<Integer> existingPartitions = currentPartitions.stream()
                .map(PartitionInfo::getPartition)
                .collect(Collectors.toSet());

        for (Integer partition : partitionReplicas.keySet()) {
            if (!existingPartitions.contains(partition)) {
                throw new PartitionValidationException("分区不存在: " + partition);
            }
        }

        // 2. 检查副本数
        for (Map.Entry<Integer, List<Integer>> entry : partitionReplicas.entrySet()) {
            List<Integer> replicas = entry.getValue();

            if (replicas.isEmpty()) {
                throw new PartitionValidationException("分区副本不能为空: " + entry.getKey());
            }

            // 检查副本是否重复
            Set<Integer> replicaSet = new HashSet<>(replicas);
            if (replicaSet.size() != replicas.size()) {
                throw new PartitionValidationException("分区副本不能重复: " + entry.getKey());
            }

            // 检查副本是否有效
            for (Integer replica : replicas) {
                if (!brokerManager.isBrokerExists(replica)) {
                    throw new PartitionValidationException("副本Broker不存在: " + replica);
                }
            }
        }
    }

    /**
     * 检查Topic是否存在
     */
    private boolean topicExists(String topicName) {
        try {
            ListTopicsResult result = kafkaAdminClient.listTopics();
            Set<String> topicNames = result.names().get();
            return topicNames.contains(topicName);

        } catch (Exception e) {
            log.error("检查Topic是否存在失败: {}", topicName, e);
            return false;
        }
    }
}
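分区扩容与查看也可以直接使用Kafka自带的命令行工具完成,便于脚本化或应急运维(bootstrap地址与Topic名为示例值):

# 将Topic分区数扩容到6(分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --alter --topic order-events --partitions 6

# 查看分区、副本与ISR分布
bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --describe --topic order-events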

四、集群监控与管理

4.1 集群监控服务

ClusterMonitorService定时采集集群指标、分析健康状态并触发告警:

/**
 * Kafka集群监控服务
 */
@Service
public class ClusterMonitorService {

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    @Autowired
    private JMXMetricsCollector jmxMetricsCollector;

    @Autowired
    private ClusterHealthAnalyzer clusterHealthAnalyzer;

    @Autowired
    private AlertService alertService;

    private final ScheduledExecutorService monitorScheduler;

    public ClusterMonitorService() {
        this.monitorScheduler = Executors.newScheduledThreadPool(5);
    }

    /**
     * 启动集群监控
     */
    public void startMonitoring() {
        // 启动定期监控任务
        monitorScheduler.scheduleAtFixedRate(
                this::monitorCluster,
                0,
                60, // 1分钟
                TimeUnit.SECONDS
        );

        log.info("Kafka集群监控启动成功");
    }

    /**
     * 停止集群监控
     */
    public void stopMonitoring() {
        try {
            monitorScheduler.shutdown();
            if (!monitorScheduler.awaitTermination(30, TimeUnit.SECONDS)) {
                monitorScheduler.shutdownNow();
            }

            log.info("Kafka集群监控停止成功");

        } catch (Exception e) {
            log.error("Kafka集群监控停止失败", e);
        }
    }

    /**
     * 监控集群
     */
    private void monitorCluster() {
        try {
            // 1. 收集集群指标
            ClusterMetrics metrics = collectClusterMetrics();

            // 2. 分析集群健康状态
            ClusterHealthStatus healthStatus = clusterHealthAnalyzer.analyzeHealth(metrics);

            // 3. 检查集群状态
            if (!healthStatus.isHealthy()) {
                handleUnhealthyCluster(healthStatus);
            }

            // 4. 记录监控结果
            recordMonitoringResult(metrics, healthStatus);

        } catch (Exception e) {
            log.error("集群监控失败", e);
        }
    }

    /**
     * 收集集群指标
     */
    private ClusterMetrics collectClusterMetrics() {
        ClusterMetrics metrics = new ClusterMetrics();

        try {
            // 1. 收集基础指标
            collectBasicMetrics(metrics);

            // 2. 收集性能指标
            collectPerformanceMetrics(metrics);

            // 3. 收集存储指标
            collectStorageMetrics(metrics);

            // 4. 收集JMX指标
            collectJMXMetrics(metrics);

        } catch (Exception e) {
            log.error("集群指标收集失败", e);
        }

        return metrics;
    }

    /**
     * 收集基础指标
     */
    private void collectBasicMetrics(ClusterMetrics metrics) {
        try {
            // 获取集群信息(DescribeClusterResult按future分别暴露clusterId、nodes和controller,
            // 原文的result.cluster()并非AdminClient的API,此处改为正确写法)
            DescribeClusterResult result = kafkaAdminClient.describeCluster();

            metrics.setClusterId(result.clusterId().get());
            metrics.setBrokerCount(result.nodes().get().size());
            metrics.setControllerId(result.controller().get().id());

            // 获取Topic信息
            ListTopicsResult topicsResult = kafkaAdminClient.listTopics();
            Set<String> topicNames = topicsResult.names().get();
            metrics.setTopicCount(topicNames.size());

        } catch (Exception e) {
            log.error("基础指标收集失败", e);
        }
    }

    /**
     * 收集性能指标
     */
    private void collectPerformanceMetrics(ClusterMetrics metrics) {
        try {
            // 收集JMX性能指标
            Map<String, Object> jmxMetrics = jmxMetricsCollector.collectPerformanceMetrics();

            // 消息生产速率
            Double messagesPerSecond = (Double) jmxMetrics.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            metrics.setMessagesPerSecond(messagesPerSecond != null ? messagesPerSecond : 0.0);

            // 字节生产速率
            Double bytesPerSecond = (Double) jmxMetrics.get("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
            metrics.setBytesPerSecond(bytesPerSecond != null ? bytesPerSecond : 0.0);

            // 消息消费速率
            Double messagesOutPerSecond = (Double) jmxMetrics.get("kafka.server:type=BrokerTopicMetrics,name=MessagesOutPerSec");
            metrics.setMessagesOutPerSecond(messagesOutPerSecond != null ? messagesOutPerSecond : 0.0);

            // 字节消费速率
            Double bytesOutPerSecond = (Double) jmxMetrics.get("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec");
            metrics.setBytesOutPerSecond(bytesOutPerSecond != null ? bytesOutPerSecond : 0.0);

        } catch (Exception e) {
            log.error("性能指标收集失败", e);
        }
    }

    /**
     * 收集存储指标
     */
    private void collectStorageMetrics(ClusterMetrics metrics) {
        try {
            // 收集JMX存储指标(JMX返回的数值类型可能是Integer或Long,统一按Number处理)
            Map<String, Object> jmxMetrics = jmxMetricsCollector.collectStorageMetrics();

            // 日志大小
            Number logSize = (Number) jmxMetrics.get("kafka.log:type=Log,name=Size");
            metrics.setLogSize(logSize != null ? logSize.longValue() : 0L);

            // 日志数量
            Number logCount = (Number) jmxMetrics.get("kafka.log:type=Log,name=LogCount");
            metrics.setLogCount(logCount != null ? logCount.longValue() : 0L);

            // 分区数量
            Number partitionCount = (Number) jmxMetrics.get("kafka.server:type=ReplicaManager,name=PartitionCount");
            metrics.setPartitionCount(partitionCount != null ? partitionCount.longValue() : 0L);

        } catch (Exception e) {
            log.error("存储指标收集失败", e);
        }
    }

    /**
     * 收集JMX指标
     */
    private void collectJMXMetrics(ClusterMetrics metrics) {
        try {
            // 收集JMX指标
            Map<String, Object> jmxMetrics = jmxMetricsCollector.collectAllMetrics();

            // 设置JMX指标
            metrics.setJmxMetrics(jmxMetrics);

        } catch (Exception e) {
            log.error("JMX指标收集失败", e);
        }
    }

    /**
     * 处理不健康集群
     */
    private void handleUnhealthyCluster(ClusterHealthStatus healthStatus) {
        try {
            // 1. 发送集群告警
            sendClusterAlert(healthStatus);

            // 2. 记录集群问题
            recordClusterIssue(healthStatus);

        } catch (Exception e) {
            log.error("不健康集群处理失败", e);
        }
    }

    /**
     * 发送集群告警
     */
    private void sendClusterAlert(ClusterHealthStatus healthStatus) {
        ClusterAlert alert = new ClusterAlert();
        alert.setAlertType(AlertType.CLUSTER_UNHEALTHY);
        alert.setSeverity(healthStatus.getSeverity());
        alert.setMessage("Kafka集群状态异常");
        alert.setHealthStatus(healthStatus);
        alert.setTimestamp(System.currentTimeMillis());

        // 发送告警
        alertService.sendAlert(alert);
    }

    /**
     * 记录监控结果
     */
    private void recordMonitoringResult(ClusterMetrics metrics, ClusterHealthStatus healthStatus) {
        ClusterMonitoringResult result = new ClusterMonitoringResult();
        result.setTimestamp(System.currentTimeMillis());
        result.setMetrics(metrics);
        result.setHealthStatus(healthStatus);

        // 存储监控结果
        // monitoringResultStorage.store(result);
    }
}

4.2 JMX指标收集器

JMXMetricsCollector通过JMX远程连接各Broker,采集性能、存储、网络和JVM指标:

/**
 * Kafka JMX指标收集器
 */
@Component
public class JMXMetricsCollector {

    @Autowired
    private BrokerManager brokerManager;

    private final Map<String, MBeanServerConnection> mbeanConnections;

    public JMXMetricsCollector() {
        this.mbeanConnections = new ConcurrentHashMap<>();
    }

    /**
     * 收集所有JMX指标
     */
    public Map<String, Object> collectAllMetrics() {
        Map<String, Object> allMetrics = new HashMap<>();

        try {
            // 1. 收集性能指标
            Map<String, Object> performanceMetrics = collectPerformanceMetrics();
            allMetrics.putAll(performanceMetrics);

            // 2. 收集存储指标
            Map<String, Object> storageMetrics = collectStorageMetrics();
            allMetrics.putAll(storageMetrics);

            // 3. 收集网络指标
            Map<String, Object> networkMetrics = collectNetworkMetrics();
            allMetrics.putAll(networkMetrics);

            // 4. 收集JVM指标
            Map<String, Object> jvmMetrics = collectJVMMetrics();
            allMetrics.putAll(jvmMetrics);

        } catch (Exception e) {
            log.error("JMX指标收集失败", e);
        }

        return allMetrics;
    }

    /**
     * 收集性能指标
     */
    public Map<String, Object> collectPerformanceMetrics() {
        Map<String, Object> metrics = new HashMap<>();

        try {
            // 注意:示例中多个Broker共用同一key,后写入的会覆盖前者,
            // 生产环境应按brokerId区分或做聚合
            for (BrokerNode brokerNode : brokerManager.getAllBrokers()) {
                MBeanServerConnection connection = getMBeanConnection(brokerNode);

                // 消息生产速率
                Object messagesInPerSec = connection.getAttribute(
                        new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"),
                        "OneMinuteRate"
                );
                metrics.put("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", messagesInPerSec);

                // 字节生产速率
                Object bytesInPerSec = connection.getAttribute(
                        new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"),
                        "OneMinuteRate"
                );
                metrics.put("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec", bytesInPerSec);

                // 消息消费速率
                Object messagesOutPerSec = connection.getAttribute(
                        new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesOutPerSec"),
                        "OneMinuteRate"
                );
                metrics.put("kafka.server:type=BrokerTopicMetrics,name=MessagesOutPerSec", messagesOutPerSec);

                // 字节消费速率
                Object bytesOutPerSec = connection.getAttribute(
                        new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec"),
                        "OneMinuteRate"
                );
                metrics.put("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec", bytesOutPerSec);
            }

        } catch (Exception e) {
            log.error("性能指标收集失败", e);
        }

        return metrics;
    }

    /**
     * 收集存储指标
     */
    public Map<String, Object> collectStorageMetrics() {
        Map<String, Object> metrics = new HashMap<>();

        try {
            for (BrokerNode brokerNode : brokerManager.getAllBrokers()) {
                MBeanServerConnection connection = getMBeanConnection(brokerNode);

                // 日志大小:kafka.log:type=Log,name=Size是按topic/partition维度的一组MBean,
                // 不能直接按固定名取属性,这里按模式查询后汇总为Broker总量,
                // 并以MBean个数近似日志(分区)数量
                Set<ObjectName> logSizeNames = connection.queryNames(
                        new ObjectName("kafka.log:type=Log,name=Size,*"), null);
                long totalLogSize = 0L;
                for (ObjectName name : logSizeNames) {
                    totalLogSize += ((Number) connection.getAttribute(name, "Value")).longValue();
                }
                metrics.put("kafka.log:type=Log,name=Size", totalLogSize);
                metrics.put("kafka.log:type=Log,name=LogCount", (long) logSizeNames.size());

                // 分区数量
                Object partitionCount = connection.getAttribute(
                        new ObjectName("kafka.server:type=ReplicaManager,name=PartitionCount"),
                        "Value"
                );
                metrics.put("kafka.server:type=ReplicaManager,name=PartitionCount", partitionCount);
            }

        } catch (Exception e) {
            log.error("存储指标收集失败", e);
        }

        return metrics;
    }

    /**
     * 收集网络指标
     */
    public Map<String, Object> collectNetworkMetrics() {
        Map<String, Object> metrics = new HashMap<>();

        try {
            for (BrokerNode brokerNode : brokerManager.getAllBrokers()) {
                MBeanServerConnection connection = getMBeanConnection(brokerNode);

                // RequestMetrics按request类型(部分版本还带version标签)区分,
                // 精确名称随Kafka版本变化,这里以Produce请求为例按模式查询
                for (ObjectName name : connection.queryNames(new ObjectName(
                        "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,*"), null)) {
                    metrics.put("kafka.network:type=RequestMetrics,name=RequestsPerSec",
                            connection.getAttribute(name, "OneMinuteRate"));
                    break;
                }

                // 请求耗时:内置指标为TotalTimeMs(原文的RequestLatencyMs并非标准指标名)
                for (ObjectName name : connection.queryNames(new ObjectName(
                        "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce,*"), null)) {
                    metrics.put("kafka.network:type=RequestMetrics,name=RequestLatencyMs",
                            connection.getAttribute(name, "Mean"));
                    break;
                }
            }

        } catch (Exception e) {
            log.error("网络指标收集失败", e);
        }

        return metrics;
    }

    /**
     * 收集JVM指标
     */
    public Map<String, Object> collectJVMMetrics() {
        Map<String, Object> metrics = new HashMap<>();

        try {
            for (BrokerNode brokerNode : brokerManager.getAllBrokers()) {
                MBeanServerConnection connection = getMBeanConnection(brokerNode);

                // 堆内存使用
                Object heapMemoryUsage = connection.getAttribute(
                        new ObjectName("java.lang:type=Memory"),
                        "HeapMemoryUsage"
                );
                metrics.put("java.lang:type=Memory", heapMemoryUsage);

                // GC时间:GarbageCollector是按收集器命名的一组MBean,
                // 不能直接对通配名取属性,需要先按模式查询再累加
                long totalGcTime = 0L;
                for (ObjectName gcName : connection.queryNames(
                        new ObjectName("java.lang:type=GarbageCollector,*"), null)) {
                    totalGcTime += ((Number) connection.getAttribute(gcName, "CollectionTime")).longValue();
                }
                metrics.put("java.lang:type=GarbageCollector,name=*", totalGcTime);

                // 线程数
                Object threadCount = connection.getAttribute(
                        new ObjectName("java.lang:type=Threading"),
                        "ThreadCount"
                );
                metrics.put("java.lang:type=Threading", threadCount);
            }

        } catch (Exception e) {
            log.error("JVM指标收集失败", e);
        }

        return metrics;
    }

    /**
     * 获取MBean连接
     */
    private MBeanServerConnection getMBeanConnection(BrokerNode brokerNode) {
        String brokerId = brokerNode.getBrokerId();

        if (!mbeanConnections.containsKey(brokerId)) {
            try {
                // 创建JMX连接
                JMXServiceURL jmxUrl = new JMXServiceURL(
                        "service:jmx:rmi:///jndi/rmi://" + brokerNode.getHost() + ":" + brokerNode.getJmxPort() + "/jmxrmi"
                );

                JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl);
                MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

                mbeanConnections.put(brokerId, connection);

            } catch (Exception e) {
                log.error("创建JMX连接失败: {}", brokerId, e);
                throw new JMXConnectionException("创建JMX连接失败", e);
            }
        }

        return mbeanConnections.get(brokerId);
    }
}
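需要注意,JMX采集的前提是Broker进程开启了远程JMX。Kafka的启动脚本会读取JMX_PORT环境变量并自动附加JMX参数,一种常见做法是在启动Broker前导出该变量(端口为示例值):

export JMX_PORT=9999
bin/kafka-server-start.sh -daemon config/server.properties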

五、企业级Kafka集群方案

5.1 集群配置管理

KafkaClusterConfigService管理集群级配置,并对Broker与Zookeeper配置做统一校验:

/**
 * Kafka集群配置管理服务
 */
@Service
public class KafkaClusterConfigService {

    @Autowired
    private ConfigurationRepository configRepository;

    /**
     * 获取集群配置
     */
    public KafkaClusterConfig getClusterConfig(String clusterId) {
        return configRepository.findClusterConfigById(clusterId)
                .orElseThrow(() -> new ConfigNotFoundException("集群配置不存在: " + clusterId));
    }

    /**
     * 保存集群配置
     */
    public void saveClusterConfig(KafkaClusterConfig config) {
        try {
            // 验证配置
            validateClusterConfig(config);

            // 保存配置
            configRepository.saveClusterConfig(config);

            log.info("集群配置保存成功: {}", config.getClusterId());

        } catch (Exception e) {
            log.error("集群配置保存失败", e);
            throw new ConfigSaveException("集群配置保存失败", e);
        }
    }

    /**
     * 更新集群配置
     */
    public void updateClusterConfig(String clusterId, KafkaClusterConfig config) {
        try {
            // 检查配置是否存在
            if (!configRepository.existsClusterConfig(clusterId)) {
                throw new ConfigNotFoundException("集群配置不存在: " + clusterId);
            }

            // 验证配置
            validateClusterConfig(config);

            // 更新配置
            config.setClusterId(clusterId);
            configRepository.saveClusterConfig(config);

            log.info("集群配置更新成功: {}", clusterId);

        } catch (Exception e) {
            log.error("集群配置更新失败", e);
            throw new ConfigUpdateException("集群配置更新失败", e);
        }
    }

    /**
     * 删除集群配置
     */
    public void deleteClusterConfig(String clusterId) {
        try {
            if (!configRepository.existsClusterConfig(clusterId)) {
                throw new ConfigNotFoundException("集群配置不存在: " + clusterId);
            }

            configRepository.deleteClusterConfig(clusterId);

            log.info("集群配置删除成功: {}", clusterId);

        } catch (Exception e) {
            log.error("集群配置删除失败", e);
            throw new ConfigDeleteException("集群配置删除失败", e);
        }
    }

    /**
     * 获取所有集群配置
     */
    public List<KafkaClusterConfig> getAllClusterConfigs() {
        return configRepository.findAllClusterConfigs();
    }

    /**
     * 验证集群配置
     */
    private void validateClusterConfig(KafkaClusterConfig config) {
        if (config.getClusterId() == null || config.getClusterId().isEmpty()) {
            throw new ConfigValidationException("集群ID不能为空");
        }

        if (config.getClusterName() == null || config.getClusterName().isEmpty()) {
            throw new ConfigValidationException("集群名称不能为空");
        }

        if (config.getBrokerConfigs() == null || config.getBrokerConfigs().isEmpty()) {
            throw new ConfigValidationException("Broker配置不能为空");
        }

        if (config.getZookeeperConfig() == null) {
            throw new ConfigValidationException("Zookeeper配置不能为空");
        }

        if (config.getBrokerConfigs().size() < 3) {
            throw new ConfigValidationException("集群至少需要3个Broker节点");
        }

        // 验证Broker配置
        for (BrokerConfig brokerConfig : config.getBrokerConfigs()) {
            validateBrokerConfig(brokerConfig);
        }

        // 验证Zookeeper配置
        validateZookeeperConfig(config.getZookeeperConfig());
    }

    /**
     * 验证Broker配置
     */
    private void validateBrokerConfig(BrokerConfig brokerConfig) {
        if (brokerConfig.getBrokerId() == null || brokerConfig.getBrokerId().isEmpty()) {
            throw new ConfigValidationException("Broker ID不能为空");
        }

        if (brokerConfig.getHost() == null || brokerConfig.getHost().isEmpty()) {
            throw new ConfigValidationException("Broker主机地址不能为空");
        }

        if (brokerConfig.getPort() <= 0 || brokerConfig.getPort() > 65535) {
            throw new ConfigValidationException("Broker端口必须在1-65535之间");
        }

        if (brokerConfig.getLogDir() == null || brokerConfig.getLogDir().isEmpty()) {
            throw new ConfigValidationException("日志目录不能为空");
        }
    }

    /**
     * 验证Zookeeper配置
     */
    private void validateZookeeperConfig(ZookeeperConfig zookeeperConfig) {
        if (zookeeperConfig.getConnectString() == null || zookeeperConfig.getConnectString().isEmpty()) {
            throw new ConfigValidationException("Zookeeper连接字符串不能为空");
        }

        if (zookeeperConfig.getSessionTimeoutMs() <= 0) {
            throw new ConfigValidationException("Zookeeper会话超时时间必须大于0");
        }

        if (zookeeperConfig.getConnectionTimeoutMs() <= 0) {
            throw new ConfigValidationException("Zookeeper连接超时时间必须大于0");
        }
    }
}

5.2 集群部署脚本

KafkaClusterDeploymentScriptGenerator根据集群配置生成一键部署用的Shell脚本:

/**
 * Kafka集群部署脚本生成器
 */
@Service
public class KafkaClusterDeploymentScriptGenerator {

    /**
     * 生成集群部署脚本
     */
    public String generateClusterDeploymentScript(KafkaClusterConfig clusterConfig) {
        StringBuilder script = new StringBuilder();

        try {
            // 1. 生成脚本头部
            generateScriptHeader(script);

            // 2. 生成环境变量
            generateEnvironmentVariables(script, clusterConfig);

            // 3. 生成Zookeeper启动脚本
            generateZookeeperStartScript(script, clusterConfig.getZookeeperConfig());

            // 4. 生成Kafka启动脚本
            generateKafkaStartScript(script, clusterConfig);

            // 5. 生成健康检查脚本
            generateHealthCheckScript(script, clusterConfig);

            // 6. 生成停止脚本
            generateStopScript(script, clusterConfig);

            // 7. 生成监控脚本
            generateMonitoringScript(script, clusterConfig);

        } catch (Exception e) {
            log.error("集群部署脚本生成失败", e);
            throw new ScriptGenerationException("集群部署脚本生成失败", e);
        }

        return script.toString();
    }

    /**
     * 生成脚本头部
     */
    private void generateScriptHeader(StringBuilder script) {
        script.append("#!/bin/bash\n");
        script.append("# Kafka Cluster Deployment Script\n");
        script.append("# Generated by Kafka Cluster Manager\n");
        script.append("# Date: ").append(new Date()).append("\n");
        script.append("\n");
        script.append("set -e\n");
        script.append("\n");
    }

    /**
     * 生成环境变量
     */
    private void generateEnvironmentVariables(StringBuilder script, KafkaClusterConfig clusterConfig) {
        script.append("# Environment Variables\n");
        script.append("export KAFKA_HOME=").append(clusterConfig.getKafkaHome()).append("\n");
        script.append("export ZOOKEEPER_HOME=").append(clusterConfig.getZookeeperConfig().getZookeeperHome()).append("\n");
        script.append("export CLUSTER_ID=").append(clusterConfig.getClusterId()).append("\n");
        script.append("export CLUSTER_NAME=").append(clusterConfig.getClusterName()).append("\n");
        script.append("\n");
    }

    /**
     * 生成Zookeeper启动脚本
     */
    private void generateZookeeperStartScript(StringBuilder script, ZookeeperConfig zookeeperConfig) {
        script.append("# Start Zookeeper\n");
        script.append("start_zookeeper() {\n");
        script.append("    echo \"Starting Zookeeper...\"\n");
        script.append("    nohup $ZOOKEEPER_HOME/bin/zkServer.sh start > /dev/null 2>&1 &\n");
        script.append("    echo \"Zookeeper started\"\n");
        script.append("}\n");
        script.append("\n");
    }

    /**
     * 生成Kafka启动脚本
     */
    private void generateKafkaStartScript(StringBuilder script, KafkaClusterConfig clusterConfig) {
        script.append("# Start Kafka Cluster\n");
        script.append("start_kafka_cluster() {\n");
        script.append("    echo \"Starting Kafka Cluster...\"\n");

        for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
            script.append("    echo \"Starting Broker ").append(brokerConfig.getBrokerId()).append("...\"\n");
            script.append("    ssh ").append(brokerConfig.getHost()).append(" \"$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties\" &\n");
        }

        script.append("    echo \"Kafka Cluster started\"\n");
        script.append("}\n");
        script.append("\n");
    }

    /**
     * 生成健康检查脚本
     */
    private void generateHealthCheckScript(StringBuilder script, KafkaClusterConfig clusterConfig) {
        script.append("# Health Check\n");
        script.append("health_check() {\n");
        script.append("    echo \"Checking cluster health...\"\n");

        for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
            script.append("    echo \"Checking Broker ").append(brokerConfig.getBrokerId()).append("...\"\n");
            script.append("    ssh ").append(brokerConfig.getHost()).append(" \"$KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server ").append(brokerConfig.getHost()).append(":").append(brokerConfig.getPort()).append("\"\n");
        }

        script.append("    echo \"Cluster health check completed\"\n");
        script.append("}\n");
        script.append("\n");
    }

    /**
     * 生成停止脚本
     */
    private void generateStopScript(StringBuilder script, KafkaClusterConfig clusterConfig) {
        script.append("# Stop Kafka Cluster\n");
        script.append("stop_kafka_cluster() {\n");
        script.append("    echo \"Stopping Kafka Cluster...\"\n");

        for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
            script.append("    echo \"Stopping Broker ").append(brokerConfig.getBrokerId()).append("...\"\n");
            script.append("    ssh ").append(brokerConfig.getHost()).append(" \"$KAFKA_HOME/bin/kafka-server-stop.sh\"\n");
        }

        script.append("    echo \"Stopping Zookeeper...\"\n");
        script.append("    $ZOOKEEPER_HOME/bin/zkServer.sh stop\n");
        script.append("    echo \"Kafka Cluster stopped\"\n");
        script.append("}\n");
        script.append("\n");
    }

    /**
     * 生成监控脚本
     */
    private void generateMonitoringScript(StringBuilder script, KafkaClusterConfig clusterConfig) {
        script.append("# Monitoring\n");
        script.append("monitor_cluster() {\n");
        script.append("    echo \"Monitoring cluster...\"\n");
        script.append("    $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server ").append(clusterConfig.getBrokerConfigs().get(0).getHost()).append(":").append(clusterConfig.getBrokerConfigs().get(0).getPort()).append("\n");
        script.append("    $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server ").append(clusterConfig.getBrokerConfigs().get(0).getHost()).append(":").append(clusterConfig.getBrokerConfigs().get(0).getPort()).append(" --list\n");
        script.append("}\n");
        script.append("\n");
    }
}
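需要注意,上述生成器只输出了函数定义。要让脚本可以直接执行,还需要在脚本末尾追加调用入口,例如:

# 部署入口:依次启动Zookeeper、Kafka集群并做健康检查
start_zookeeper
sleep 5
start_kafka_cluster
health_check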

六、最佳实践与总结

6.1 Kafka集群搭建最佳实践

  1. 集群规划策略

    • 至少3个Broker节点确保高可用
    • 合理规划Topic分区和副本
    • 考虑网络拓扑和机架感知
  2. 性能优化策略

    • 优化JVM参数配置
    • 调整网络和存储参数
    • 合理设置日志保留策略
  3. 监控告警体系

    • 建立完善的监控指标
    • 设置合理的告警阈值
    • 实现自动化运维
  4. 安全认证机制

    • 启用SASL认证
    • 配置SSL加密
    • 设置ACL权限控制
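结合上述实践,下面给出一份常见的生产环境Broker参数基线,可作为调优起点(均为示例值,应结合硬件规格与压测结果调整):

# 可靠性:3副本、至少2个同步副本,并禁止非ISR副本当选Leader
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# 日志保留:保留7天,单个日志段1GB
log.retention.hours=168
log.segment.bytes=1073741824

# 线程与网络(按CPU核数与磁盘数调整)
num.network.threads=8
num.io.threads=16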

6.2 架构师级Kafka运维技能

  1. 集群管理能力

    • 深入理解Kafka架构
    • 掌握集群扩展和收缩
    • 管理Topic和分区
  2. 性能调优能力

    • 优化生产者和消费者性能
    • 调整Broker参数
    • 优化网络和存储
  3. 故障处理能力

    • 快速定位问题
    • 制定修复方案
    • 预防潜在问题
  4. 监控运维能力

    • 建立监控体系
    • 实现自动化运维
    • 持续优化改进

6.3 持续改进建议

  1. 集群优化

    • 持续优化集群配置
    • 改进监控策略
    • 提升运维效率
  2. 自动化程度提升

    • 实现更多自动化操作
    • 优化部署流程
    • 降低人工操作成本与风险
  3. 知识积累

    • 建立运维知识库
    • 总结最佳实践
    • 形成标准化流程

总结

Kafka集群搭建是企业级消息中间件的核心能力,通过高可用的集群部署策略、完善的性能调优机制和系统化的监控管理,能够构建稳定可靠的消息队列系统,保障企业级应用的高并发处理能力。本文从集群架构设计到部署实施,从基础原理到企业级实践,系统梳理了Kafka集群搭建的完整解决方案。

关键要点:

  1. 集群架构设计:高可用的集群架构和组件设计
  2. 部署实施策略:Broker管理、配置管理、部署服务
  3. Topic管理方案:Topic创建、分区管理、健康检查
  4. 监控管理体系:集群监控、JMX指标收集、告警机制
  5. 企业级实践:配置管理、部署脚本、持续改进

通过深入理解这些技术要点,架构师能够设计出完善的Kafka集群系统,提升消息队列的稳定性和可靠性,确保企业级应用的高可用性。