第298集RocketMQ集群搭建架构实战:分布式消息、高可用部署与企业级消息中间件解决方案
|字数总计:9.8k|阅读时长:49分钟|阅读量:
前言
RocketMQ集群搭建是企业级分布式消息中间件建设的核心环节之一,直接影响系统的消息处理能力和数据可靠性。借助高可用的集群部署策略与完善的性能调优机制,可以构建稳定可靠的分布式消息队列系统,保障企业级应用的高并发处理能力。本文从集群架构设计到部署实施、从基础原理到企业级实践,系统梳理RocketMQ集群搭建的完整解决方案。
一、RocketMQ集群架构设计
1.1 集群整体架构
1.2 集群核心组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
@Component public class RocketMQClusterManager {
@Autowired private NameServerManager nameServerManager;
@Autowired private BrokerManager brokerManager;
@Autowired private TopicManager topicManager;
@Autowired private ClusterMonitorService clusterMonitorService;
@Autowired private SecurityManager securityManager;
public void initializeCluster() { try { startNameServerCluster();
startBrokerCluster();
createSystemTopics();
configureSecurity();
startMonitoringService();
log.info("RocketMQ集群初始化完成");
} catch (Exception e) { log.error("RocketMQ集群初始化失败", e); throw new ClusterInitializationException("集群初始化失败", e); } }
public void startCluster() { try { checkClusterStatus();
nameServerManager.startAllNameServers();
brokerManager.startAllBrokers();
validateClusterHealth();
clusterMonitorService.startMonitoring();
log.info("RocketMQ集群启动成功");
} catch (Exception e) { log.error("RocketMQ集群启动失败", e); throw new ClusterStartException("集群启动失败", e); } }
public void stopCluster() { try { clusterMonitorService.stopMonitoring();
brokerManager.stopAllBrokers();
nameServerManager.stopAllNameServers();
log.info("RocketMQ集群停止成功");
} catch (Exception e) { log.error("RocketMQ集群停止失败", e); } }
private void startNameServerCluster() { log.info("启动NameServer集群"); }
private void startBrokerCluster() { log.info("启动Broker集群"); }
private void createSystemTopics() { log.info("创建系统Topic"); }
private void configureSecurity() { log.info("配置安全认证"); }
private void startMonitoringService() { log.info("启动监控服务"); } }
|
二、NameServer集群管理
2.1 NameServer管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
@Service public class NameServerManager {
@Autowired private NameServerConfigService nameServerConfigService;
@Autowired private NameServerDeploymentService nameServerDeploymentService;
@Autowired private NameServerHealthChecker nameServerHealthChecker;
private final Map<String, NameServerNode> nameServerNodes;
public NameServerManager() { this.nameServerNodes = new ConcurrentHashMap<>(); }
public void startAllNameServers() { try { List<NameServerConfig> nameServerConfigs = nameServerConfigService.getAllNameServerConfigs();
for (NameServerConfig config : nameServerConfigs) { startNameServer(config); }
waitForNameServersToStart();
validateNameServerClusterState();
log.info("所有NameServer启动完成,数量: {}", nameServerConfigs.size());
} catch (Exception e) { log.error("NameServer启动失败", e); throw new NameServerStartException("NameServer启动失败", e); } }
public void stopAllNameServers() { try { for (NameServerNode nameServerNode : nameServerNodes.values()) { stopNameServer(nameServerNode); }
waitForNameServersToStop();
log.info("所有NameServer停止完成");
} catch (Exception e) { log.error("NameServer停止失败", e); } }
private void startNameServer(NameServerConfig config) { try { NameServerNode nameServerNode = createNameServerNode(config);
nameServerDeploymentService.deployNameServer(nameServerNode);
nameServerDeploymentService.startNameServerProcess(nameServerNode);
waitForNameServerToStart(nameServerNode);
validateNameServerHealth(nameServerNode);
nameServerNodes.put(nameServerNode.getNameServerId(), nameServerNode);
log.info("NameServer启动成功: {}", nameServerNode.getNameServerId());
} catch (Exception e) { log.error("NameServer启动失败: {}", config.getNameServerId(), e); throw new NameServerStartException("NameServer启动失败", e); } }
private void stopNameServer(NameServerNode nameServerNode) { try { nameServerDeploymentService.stopNameServerProcess(nameServerNode);
waitForNameServerToStop(nameServerNode);
nameServerDeploymentService.cleanupNameServerResources(nameServerNode);
nameServerNodes.remove(nameServerNode.getNameServerId());
log.info("NameServer停止成功: {}", nameServerNode.getNameServerId());
} catch (Exception e) { log.error("NameServer停止失败: {}", nameServerNode.getNameServerId(), e); } }
private NameServerNode createNameServerNode(NameServerConfig config) { NameServerNode nameServerNode = new NameServerNode(); nameServerNode.setNameServerId(config.getNameServerId()); nameServerNode.setHost(config.getHost()); nameServerNode.setPort(config.getPort()); nameServerNode.setConfig(config); nameServerNode.setStatus(NameServerStatus.STARTING); nameServerNode.setStartTime(System.currentTimeMillis());
return nameServerNode; }
private void waitForNameServerToStart(NameServerNode nameServerNode) { int maxWaitTime = 30000; int checkInterval = 1000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { if (nameServerHealthChecker.isNameServerHealthy(nameServerNode)) { nameServerNode.setStatus(NameServerStatus.RUNNING); return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new NameServerStartException("等待NameServer启动被中断", e); } }
throw new NameServerStartException("NameServer启动超时: " + nameServerNode.getNameServerId()); }
private void waitForNameServerToStop(NameServerNode nameServerNode) { int maxWaitTime = 30000; int checkInterval = 1000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { if (!nameServerHealthChecker.isNameServerRunning(nameServerNode)) { nameServerNode.setStatus(NameServerStatus.STOPPED); return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new NameServerStopException("等待NameServer停止被中断", e); } }
throw new NameServerStopException("NameServer停止超时: " + nameServerNode.getNameServerId()); }
private void validateNameServerHealth(NameServerNode nameServerNode) { try { NameServerHealthStatus healthStatus = nameServerHealthChecker.checkNameServerHealth(nameServerNode);
if (!healthStatus.isHealthy()) { throw new NameServerHealthException("NameServer健康检查失败: " + nameServerNode.getNameServerId()); }
nameServerNode.setHealthStatus(healthStatus);
} catch (Exception e) { log.error("NameServer健康检查失败: {}", nameServerNode.getNameServerId(), e); throw new NameServerHealthException("NameServer健康检查失败", e); } }
private void waitForNameServersToStart() { int maxWaitTime = 120000; int checkInterval = 5000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { boolean allNameServersRunning = nameServerNodes.values().stream() .allMatch(nameServer -> nameServer.getStatus() == NameServerStatus.RUNNING);
if (allNameServersRunning) { return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new NameServerStartException("等待所有NameServer启动被中断", e); } }
throw new NameServerStartException("等待所有NameServer启动超时"); }
private void waitForNameServersToStop() { int maxWaitTime = 120000; int checkInterval = 5000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { boolean allNameServersStopped = nameServerNodes.values().stream() .allMatch(nameServer -> nameServer.getStatus() == NameServerStatus.STOPPED);
if (allNameServersStopped) { return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new NameServerStopException("等待所有NameServer停止被中断", e); } }
throw new NameServerStopException("等待所有NameServer停止超时"); }
private void validateNameServerClusterState() { try { if (nameServerNodes.size() < 2) { log.warn("NameServer集群节点数量不足,建议至少2个节点"); }
for (NameServerNode nameServerNode : nameServerNodes.values()) { if (nameServerNode.getHealthStatus() == null || !nameServerNode.getHealthStatus().isHealthy()) { throw new NameServerClusterValidationException("NameServer集群状态验证失败,NameServer不健康: " + nameServerNode.getNameServerId()); } }
log.info("NameServer集群状态验证通过");
} catch (Exception e) { log.error("NameServer集群状态验证失败", e); throw new NameServerClusterValidationException("NameServer集群状态验证失败", e); } } }
|
2.2 NameServer配置管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
@Service public class NameServerConfigService {
@Autowired private ConfigurationRepository configRepository;
public List<NameServerConfig> getAllNameServerConfigs() { return configRepository.findAllNameServerConfigs(); }
public NameServerConfig getNameServerConfig(String nameServerId) { return configRepository.findNameServerConfigById(nameServerId) .orElseThrow(() -> new ConfigNotFoundException("NameServer配置不存在: " + nameServerId)); }
public void saveNameServerConfig(NameServerConfig config) { try { validateNameServerConfig(config);
configRepository.saveNameServerConfig(config);
log.info("NameServer配置保存成功: {}", config.getNameServerId());
} catch (Exception e) { log.error("NameServer配置保存失败", e); throw new ConfigSaveException("NameServer配置保存失败", e); } }
public void updateNameServerConfig(String nameServerId, NameServerConfig config) { try { if (!configRepository.existsNameServerConfig(nameServerId)) { throw new ConfigNotFoundException("NameServer配置不存在: " + nameServerId); }
validateNameServerConfig(config);
config.setNameServerId(nameServerId); configRepository.saveNameServerConfig(config);
log.info("NameServer配置更新成功: {}", nameServerId);
} catch (Exception e) { log.error("NameServer配置更新失败", e); throw new ConfigUpdateException("NameServer配置更新失败", e); } }
public void deleteNameServerConfig(String nameServerId) { try { if (!configRepository.existsNameServerConfig(nameServerId)) { throw new ConfigNotFoundException("NameServer配置不存在: " + nameServerId); }
configRepository.deleteNameServerConfig(nameServerId);
log.info("NameServer配置删除成功: {}", nameServerId);
} catch (Exception e) { log.error("NameServer配置删除失败", e); throw new ConfigDeleteException("NameServer配置删除失败", e); } }
public String generateNameServerConfigFile(NameServerConfig config) { StringBuilder configContent = new StringBuilder();
configContent.append("# RocketMQ NameServer Configuration\n"); configContent.append("listenPort=").append(config.getPort()).append("\n"); configContent.append("serverWorkerThreads=").append(config.getServerWorkerThreads()).append("\n"); configContent.append("serverCallbackExecutorThreads=").append(config.getServerCallbackExecutorThreads()).append("\n"); configContent.append("serverSelectorThreads=").append(config.getServerSelectorThreads()).append("\n"); configContent.append("serverOnewaySemaphoreValue=").append(config.getServerOnewaySemaphoreValue()).append("\n"); configContent.append("serverAsyncSemaphoreValue=").append(config.getServerAsyncSemaphoreValue()).append("\n");
configContent.append("serverChannelMaxIdleTimeSeconds=").append(config.getServerChannelMaxIdleTimeSeconds()).append("\n"); configContent.append("serverSocketSndBufSize=").append(config.getServerSocketSndBufSize()).append("\n"); configContent.append("serverSocketRcvBufSize=").append(config.getServerSocketRcvBufSize()).append("\n"); configContent.append("serverPooledByteBufAllocatorEnable=").append(config.isServerPooledByteBufAllocatorEnable()).append("\n");
configContent.append("rocketmqHome=").append(config.getRocketmqHome()).append("\n"); configContent.append("namesrvAddr=").append(config.getNamesrvAddr()).append("\n"); configContent.append("rocketmqHome=").append(config.getRocketmqHome()).append("\n"); configContent.append("kvConfigPath=").append(config.getKvConfigPath()).append("\n"); configContent.append("configStorePath=").append(config.getConfigStorePath()).append("\n");
configContent.append("serverChannelMaxIdleTimeSeconds=").append(config.getServerChannelMaxIdleTimeSeconds()).append("\n"); configContent.append("serverSocketSndBufSize=").append(config.getServerSocketSndBufSize()).append("\n"); configContent.append("serverSocketRcvBufSize=").append(config.getServerSocketRcvBufSize()).append("\n");
return configContent.toString(); }
private void validateNameServerConfig(NameServerConfig config) { if (config.getNameServerId() == null || config.getNameServerId().isEmpty()) { throw new ConfigValidationException("NameServer ID不能为空"); }
if (config.getHost() == null || config.getHost().isEmpty()) { throw new ConfigValidationException("NameServer主机地址不能为空"); }
if (config.getPort() <= 0 || config.getPort() > 65535) { throw new ConfigValidationException("NameServer端口必须在1-65535之间"); }
if (config.getRocketmqHome() == null || config.getRocketmqHome().isEmpty()) { throw new ConfigValidationException("RocketMQ Home目录不能为空"); }
if (config.getKvConfigPath() == null || config.getKvConfigPath().isEmpty()) { throw new ConfigValidationException("KV配置路径不能为空"); }
if (config.getConfigStorePath() == null || config.getConfigStorePath().isEmpty()) { throw new ConfigValidationException("配置存储路径不能为空"); } } }
|
三、Broker集群管理
3.1 Broker管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
|
@Service public class BrokerManager {
@Autowired private BrokerConfigService brokerConfigService;
@Autowired private BrokerDeploymentService brokerDeploymentService;
@Autowired private BrokerHealthChecker brokerHealthChecker;
private final Map<String, BrokerNode> brokerNodes;
public BrokerManager() { this.brokerNodes = new ConcurrentHashMap<>(); }
public void startAllBrokers() { try { List<BrokerConfig> brokerConfigs = brokerConfigService.getAllBrokerConfigs();
for (BrokerConfig config : brokerConfigs) { startBroker(config); }
waitForBrokersToStart();
validateBrokerClusterState();
log.info("所有Broker启动完成,数量: {}", brokerConfigs.size());
} catch (Exception e) { log.error("Broker启动失败", e); throw new BrokerStartException("Broker启动失败", e); } }
public void stopAllBrokers() { try { for (BrokerNode brokerNode : brokerNodes.values()) { stopBroker(brokerNode); }
waitForBrokersToStop();
log.info("所有Broker停止完成");
} catch (Exception e) { log.error("Broker停止失败", e); } }
private void startBroker(BrokerConfig config) { try { BrokerNode brokerNode = createBrokerNode(config);
brokerDeploymentService.deployBroker(brokerNode);
brokerDeploymentService.startBrokerProcess(brokerNode);
waitForBrokerToStart(brokerNode);
validateBrokerHealth(brokerNode);
brokerNodes.put(brokerNode.getBrokerId(), brokerNode);
log.info("Broker启动成功: {}", brokerNode.getBrokerId());
} catch (Exception e) { log.error("Broker启动失败: {}", config.getBrokerId(), e); throw new BrokerStartException("Broker启动失败", e); } }
private void stopBroker(BrokerNode brokerNode) { try { brokerDeploymentService.stopBrokerProcess(brokerNode);
waitForBrokerToStop(brokerNode);
brokerDeploymentService.cleanupBrokerResources(brokerNode);
brokerNodes.remove(brokerNode.getBrokerId());
log.info("Broker停止成功: {}", brokerNode.getBrokerId());
} catch (Exception e) { log.error("Broker停止失败: {}", brokerNode.getBrokerId(), e); } }
private BrokerNode createBrokerNode(BrokerConfig config) { BrokerNode brokerNode = new BrokerNode(); brokerNode.setBrokerId(config.getBrokerId()); brokerNode.setBrokerName(config.getBrokerName()); brokerNode.setHost(config.getHost()); brokerNode.setPort(config.getPort()); brokerNode.setBrokerRole(config.getBrokerRole()); brokerNode.setBrokerClusterName(config.getBrokerClusterName()); brokerNode.setConfig(config); brokerNode.setStatus(BrokerStatus.STARTING); brokerNode.setStartTime(System.currentTimeMillis());
return brokerNode; }
private void waitForBrokerToStart(BrokerNode brokerNode) { int maxWaitTime = 60000; int checkInterval = 2000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { if (brokerHealthChecker.isBrokerHealthy(brokerNode)) { brokerNode.setStatus(BrokerStatus.RUNNING); return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BrokerStartException("等待Broker启动被中断", e); } }
throw new BrokerStartException("Broker启动超时: " + brokerNode.getBrokerId()); }
private void waitForBrokerToStop(BrokerNode brokerNode) { int maxWaitTime = 60000; int checkInterval = 2000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { if (!brokerHealthChecker.isBrokerRunning(brokerNode)) { brokerNode.setStatus(BrokerStatus.STOPPED); return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BrokerStopException("等待Broker停止被中断", e); } }
throw new BrokerStopException("Broker停止超时: " + brokerNode.getBrokerId()); }
private void validateBrokerHealth(BrokerNode brokerNode) { try { BrokerHealthStatus healthStatus = brokerHealthChecker.checkBrokerHealth(brokerNode);
if (!healthStatus.isHealthy()) { throw new BrokerHealthException("Broker健康检查失败: " + brokerNode.getBrokerId()); }
brokerNode.setHealthStatus(healthStatus);
} catch (Exception e) { log.error("Broker健康检查失败: {}", brokerNode.getBrokerId(), e); throw new BrokerHealthException("Broker健康检查失败", e); } }
private void waitForBrokersToStart() { int maxWaitTime = 300000; int checkInterval = 10000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { boolean allBrokersRunning = brokerNodes.values().stream() .allMatch(broker -> broker.getStatus() == BrokerStatus.RUNNING);
if (allBrokersRunning) { return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BrokerStartException("等待所有Broker启动被中断", e); } }
throw new BrokerStartException("等待所有Broker启动超时"); }
private void waitForBrokersToStop() { int maxWaitTime = 300000; int checkInterval = 10000; int waitedTime = 0;
while (waitedTime < maxWaitTime) { try { boolean allBrokersStopped = brokerNodes.values().stream() .allMatch(broker -> broker.getStatus() == BrokerStatus.STOPPED);
if (allBrokersStopped) { return; }
Thread.sleep(checkInterval); waitedTime += checkInterval;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BrokerStopException("等待所有Broker停止被中断", e); } }
throw new BrokerStopException("等待所有Broker停止超时"); }
private void validateBrokerClusterState() { try { if (brokerNodes.size() < 2) { log.warn("Broker集群节点数量不足,建议至少2个节点"); }
for (BrokerNode brokerNode : brokerNodes.values()) { if (brokerNode.getHealthStatus() == null || !brokerNode.getHealthStatus().isHealthy()) { throw new BrokerClusterValidationException("Broker集群状态验证失败,Broker不健康: " + brokerNode.getBrokerId()); } }
log.info("Broker集群状态验证通过");
} catch (Exception e) { log.error("Broker集群状态验证失败", e); throw new BrokerClusterValidationException("Broker集群状态验证失败", e); } } }
|
3.2 Broker配置管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
@Service public class BrokerConfigService {
@Autowired private ConfigurationRepository configRepository;
public List<BrokerConfig> getAllBrokerConfigs() { return configRepository.findAllBrokerConfigs(); }
public BrokerConfig getBrokerConfig(String brokerId) { return configRepository.findBrokerConfigById(brokerId) .orElseThrow(() -> new ConfigNotFoundException("Broker配置不存在: " + brokerId)); }
public void saveBrokerConfig(BrokerConfig config) { try { validateBrokerConfig(config);
configRepository.saveBrokerConfig(config);
log.info("Broker配置保存成功: {}", config.getBrokerId());
} catch (Exception e) { log.error("Broker配置保存失败", e); throw new ConfigSaveException("Broker配置保存失败", e); } }
public void updateBrokerConfig(String brokerId, BrokerConfig config) { try { if (!configRepository.existsBrokerConfig(brokerId)) { throw new ConfigNotFoundException("Broker配置不存在: " + brokerId); }
validateBrokerConfig(config);
config.setBrokerId(brokerId); configRepository.saveBrokerConfig(config);
log.info("Broker配置更新成功: {}", brokerId);
} catch (Exception e) { log.error("Broker配置更新失败", e); throw new ConfigUpdateException("Broker配置更新失败", e); } }
public void deleteBrokerConfig(String brokerId) { try { if (!configRepository.existsBrokerConfig(brokerId)) { throw new ConfigNotFoundException("Broker配置不存在: " + brokerId); }
configRepository.deleteBrokerConfig(brokerId);
log.info("Broker配置删除成功: {}", brokerId);
} catch (Exception e) { log.error("Broker配置删除失败", e); throw new ConfigDeleteException("Broker配置删除失败", e); } }
public String generateBrokerConfigFile(BrokerConfig config) { StringBuilder configContent = new StringBuilder();
configContent.append("# RocketMQ Broker Configuration\n"); configContent.append("brokerClusterName=").append(config.getBrokerClusterName()).append("\n"); configContent.append("brokerName=").append(config.getBrokerName()).append("\n"); configContent.append("brokerId=").append(config.getBrokerId()).append("\n"); configContent.append("brokerRole=").append(config.getBrokerRole()).append("\n"); configContent.append("namesrvAddr=").append(config.getNamesrvAddr()).append("\n"); configContent.append("listenPort=").append(config.getListenPort()).append("\n");
configContent.append("storePathRootDir=").append(config.getStorePathRootDir()).append("\n"); configContent.append("storePathCommitLog=").append(config.getStorePathCommitLog()).append("\n"); configContent.append("storePathConsumeQueue=").append(config.getStorePathConsumeQueue()).append("\n"); configContent.append("storePathIndex=").append(config.getStorePathIndex()).append("\n"); configContent.append("storePathDLedger=").append(config.getStorePathDLedger()).append("\n");
configContent.append("mapedFileSizeCommitLog=").append(config.getMapedFileSizeCommitLog()).append("\n"); configContent.append("mapedFileSizeConsumeQueue=").append(config.getMapedFileSizeConsumeQueue()).append("\n"); configContent.append("maxTransferBytesOnMessageInMemory=").append(config.getMaxTransferBytesOnMessageInMemory()).append("\n"); configContent.append("maxTransferCountOnMessageInMemory=").append(config.getMaxTransferCountOnMessageInMemory()).append("\n"); configContent.append("maxTransferBytesOnMessageInDisk=").append(config.getMaxTransferBytesOnMessageInDisk()).append("\n"); configContent.append("maxTransferCountOnMessageInDisk=").append(config.getMaxTransferCountOnMessageInDisk()).append("\n");
configContent.append("messageDelayLevel=").append(config.getMessageDelayLevel()).append("\n"); configContent.append("messageMaxSize=").append(config.getMessageMaxSize()).append("\n"); configContent.append("messageMaxSize=").append(config.getMessageMaxSize()).append("\n");
configContent.append("flushDiskType=").append(config.getFlushDiskType()).append("\n"); configContent.append("flushCommitLogLeastPages=").append(config.getFlushCommitLogLeastPages()).append("\n"); configContent.append("flushCommitLogThoroughInterval=").append(config.getFlushCommitLogThoroughInterval()).append("\n"); configContent.append("flushConsumeQueueLeastPages=").append(config.getFlushConsumeQueueLeastPages()).append("\n"); configContent.append("flushConsumeQueueThoroughInterval=").append(config.getFlushConsumeQueueThoroughInterval()).append("\n");
configContent.append("listenPort=").append(config.getListenPort()).append("\n"); configContent.append("serverWorkerThreads=").append(config.getServerWorkerThreads()).append("\n"); configContent.append("serverCallbackExecutorThreads=").append(config.getServerCallbackExecutorThreads()).append("\n"); configContent.append("serverSelectorThreads=").append(config.getServerSelectorThreads()).append("\n"); configContent.append("serverOnewaySemaphoreValue=").append(config.getServerOnewaySemaphoreValue()).append("\n"); configContent.append("serverAsyncSemaphoreValue=").append(config.getServerAsyncSemaphoreValue()).append("\n");
configContent.append("sendMessageThreadPoolNums=").append(config.getSendMessageThreadPoolNums()).append("\n"); configContent.append("pullMessageThreadPoolNums=").append(config.getPullMessageThreadPoolNums()).append("\n"); configContent.append("queryMessageThreadPoolNums=").append(config.getQueryMessageThreadPoolNums()).append("\n"); configContent.append("adminBrokerThreadPoolNums=").append(config.getAdminBrokerThreadPoolNums()).append("\n"); configContent.append("clientManageThreadPoolNums=").append(config.getClientManageThreadPoolNums()).append("\n"); configContent.append("consumerManageThreadPoolNums=").append(config.getConsumerManageThreadPoolNums()).append("\n");
return configContent.toString(); }
private void validateBrokerConfig(BrokerConfig config) { if (config.getBrokerId() == null || config.getBrokerId().isEmpty()) { throw new ConfigValidationException("Broker ID不能为空"); }
if (config.getBrokerName() == null || config.getBrokerName().isEmpty()) { throw new ConfigValidationException("Broker名称不能为空"); }
if (config.getBrokerClusterName() == null || config.getBrokerClusterName().isEmpty()) { throw new ConfigValidationException("Broker集群名称不能为空"); }
if (config.getHost() == null || config.getHost().isEmpty()) { throw new ConfigValidationException("Broker主机地址不能为空"); }
if (config.getListenPort() <= 0 || config.getListenPort() > 65535) { throw new ConfigValidationException("Broker监听端口必须在1-65535之间"); }
if (config.getNamesrvAddr() == null || config.getNamesrvAddr().isEmpty()) { throw new ConfigValidationException("NameServer地址不能为空"); }
if (config.getStorePathRootDir() == null || config.getStorePathRootDir().isEmpty()) { throw new ConfigValidationException("存储根目录不能为空"); }
if (config.getBrokerRole() == null) { throw new ConfigValidationException("Broker角色不能为空"); }
if (!isValidBrokerRole(config.getBrokerRole())) { throw new ConfigValidationException("Broker角色无效: " + config.getBrokerRole()); } }
private boolean isValidBrokerRole(String brokerRole) { return "ASYNC_MASTER".equals(brokerRole) || "SYNC_MASTER".equals(brokerRole) || "SLAVE".equals(brokerRole); } }
|
四、Topic管理
4.1 Topic管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
@Service public class TopicManager {
@Autowired private RocketMQAdminClient rocketMQAdminClient;
@Autowired private TopicConfigService topicConfigService;
@Autowired private QueueManager queueManager;
/**
 * Creates a topic through the admin client, then persists its configuration.
 *
 * <p>Validation errors and an already-existing topic are reported the same way as admin or
 * persistence failures: wrapped in {@link TopicCreationException}.
 * NOTE(review): a {@code null} argument makes the catch block itself throw a
 * NullPointerException when it logs the topic name.
 *
 * @param topicConfig configuration of the topic to create
 * @throws TopicCreationException on any failure
 */
public void createTopic(TopicConfig topicConfig) {
    try {
        validateTopicConfig(topicConfig);
        boolean alreadyPresent = topicExists(topicConfig.getTopicName());
        if (alreadyPresent) {
            throw new TopicExistsException("Topic已存在: " + topicConfig.getTopicName());
        }

        CreateTopicRequest createRequest = new CreateTopicRequest();
        createRequest.setTopic(topicConfig.getTopicName());
        createRequest.setDefaultTopicQueueNums(topicConfig.getDefaultTopicQueueNums());
        createRequest.setPerm(topicConfig.getPerm());
        createRequest.setTopicFilterType(topicConfig.getTopicFilterType());
        createRequest.setTopicSysFlag(topicConfig.getTopicSysFlag());
        createRequest.setOrder(topicConfig.isOrder());

        rocketMQAdminClient.createTopic(createRequest);
        topicConfigService.saveTopicConfig(topicConfig);
        log.info("Topic创建成功: {}", topicConfig.getTopicName());
    } catch (Exception failure) {
        log.error("Topic创建失败: {}", topicConfig.getTopicName(), failure);
        throw new TopicCreationException("Topic创建失败", failure);
    }
}
/**
 * Deletes a topic through the admin client, then removes its stored configuration.
 *
 * <p>Existence is checked with {@code topicExists}, which returns false when the lookup itself
 * fails. A lookup failure is therefore also reported as "Topic不存在".
 *
 * @param topicName name of the topic to delete
 * @throws TopicDeletionException on any failure, including a missing topic
 */
public void deleteTopic(String topicName) {
    try {
        boolean present = topicExists(topicName);
        if (!present) {
            throw new TopicNotFoundException("Topic不存在: " + topicName);
        }

        DeleteTopicRequest deleteRequest = new DeleteTopicRequest();
        deleteRequest.setTopic(topicName);

        rocketMQAdminClient.deleteTopic(deleteRequest);
        topicConfigService.deleteTopicConfig(topicName);
        log.info("Topic删除成功: {}", topicName);
    } catch (Exception failure) {
        log.error("Topic删除失败: {}", topicName, failure);
        throw new TopicDeletionException("Topic删除失败", failure);
    }
}
/**
 * Pushes new settings for an existing topic to the admin client, then updates the stored
 * configuration. The request's topic name comes from {@code topicName}, not from
 * {@code topicConfig}.
 *
 * <p>NOTE(review): unlike {@code createTopic}, {@code topicConfig} is not passed through
 * {@code validateTopicConfig} here.
 *
 * @param topicName   name of the topic to update
 * @param topicConfig new queue count, permission, filter type, sys flag and order flag
 * @throws TopicConfigUpdateException on any failure, including a missing topic
 */
public void updateTopicConfig(String topicName, TopicConfig topicConfig) {
    try {
        boolean present = topicExists(topicName);
        if (!present) {
            throw new TopicNotFoundException("Topic不存在: " + topicName);
        }

        UpdateTopicRequest updateRequest = new UpdateTopicRequest();
        updateRequest.setTopic(topicName);
        updateRequest.setDefaultTopicQueueNums(topicConfig.getDefaultTopicQueueNums());
        updateRequest.setPerm(topicConfig.getPerm());
        updateRequest.setTopicFilterType(topicConfig.getTopicFilterType());
        updateRequest.setTopicSysFlag(topicConfig.getTopicSysFlag());
        updateRequest.setOrder(topicConfig.isOrder());

        rocketMQAdminClient.updateTopic(updateRequest);
        topicConfigService.updateTopicConfig(topicName, topicConfig);
        log.info("Topic配置更新成功: {}", topicName);
    } catch (Exception failure) {
        log.error("Topic配置更新失败: {}", topicName, failure);
        throw new TopicConfigUpdateException("Topic配置更新失败", failure);
    }
}
/**
 * Returns the route information (queue data, broker data, filter servers) of one topic.
 *
 * @param topicName topic to look up
 * @return a new {@link TopicInfo} filled from the topic's route
 * @throws TopicInfoException if the topic is not listed by the cluster or the route lookup
 *         fails (the underlying cause is attached)
 */
public TopicInfo getTopicInfo(String topicName) {
    try {
        if (!topicExists(topicName)) {
            throw new TopicNotFoundException("Topic不存在: " + topicName);
        }
        return loadTopicInfo(topicName);
    } catch (Exception e) {
        log.error("获取Topic信息失败: {}", topicName, e);
        throw new TopicInfoException("获取Topic信息失败", e);
    }
}

/**
 * Returns route information for every topic the cluster lists. Topics whose route lookup
 * fails are logged and skipped, so the result may be partial.
 *
 * @throws TopicListException if the topic list itself cannot be fetched
 */
public List<TopicInfo> getAllTopics() {
    try {
        List<String> topicNames = rocketMQAdminClient.getAllTopicNames();
        List<TopicInfo> topicInfos = new ArrayList<>(topicNames.size());
        for (String topicName : topicNames) {
            try {
                // The names were just listed by the cluster, so bypass getTopicInfo's existence
                // check: it re-downloads the whole topic list on every call, which made this
                // loop transfer O(n^2) names.
                topicInfos.add(loadTopicInfo(topicName));
            } catch (Exception e) {
                log.warn("获取Topic信息失败: {}", topicName, e);
            }
        }
        return topicInfos;
    } catch (Exception e) {
        log.error("获取所有Topic列表失败", e);
        throw new TopicListException("获取所有Topic列表失败", e);
    }
}

/** Fetches the route of {@code topicName} and copies it into a new {@link TopicInfo}. */
private TopicInfo loadTopicInfo(String topicName) throws Exception {
    TopicRouteInfo topicRouteInfo = rocketMQAdminClient.getTopicRouteInfo(topicName);
    TopicInfo topicInfo = new TopicInfo();
    topicInfo.setTopicName(topicName);
    topicInfo.setQueueDatas(topicRouteInfo.getQueueDatas());
    topicInfo.setBrokerDatas(topicRouteInfo.getBrokerDatas());
    topicInfo.setFilterServerTable(topicRouteInfo.getFilterServerTable());
    return topicInfo;
}
/**
 * Reports whether the cluster's topic list contains {@code topicName}.
 *
 * <p>Any failure while listing topics is logged and reported as {@code false}, so callers
 * cannot tell "absent" apart from "lookup failed".
 *
 * @param topicName topic to look for
 * @return {@code true} if the topic is listed, {@code false} otherwise or on error
 */
public boolean topicExists(String topicName) {
    try {
        return rocketMQAdminClient.getAllTopicNames().contains(topicName);
    } catch (Exception lookupFailure) {
        log.error("检查Topic是否存在失败: {}", topicName, lookupFailure);
        return false;
    }
}
/**
 * Validates a topic configuration before it is sent to the cluster.
 *
 * @param topicConfig configuration to check
 * @throws TopicConfigValidationException if the config is null, the name is missing or
 *         malformed, the default queue count is not positive, or the perm is missing or
 *         rejected by {@code isValidPerm}
 */
private void validateTopicConfig(TopicConfig topicConfig) {
    // Report a null config as a validation error rather than an incidental NullPointerException.
    if (topicConfig == null) {
        throw new TopicConfigValidationException("Topic配置不能为空");
    }
    String topicName = topicConfig.getTopicName();
    if (topicName == null || topicName.isEmpty()) {
        throw new TopicConfigValidationException("Topic名称不能为空");
    }
    if (topicConfig.getDefaultTopicQueueNums() <= 0) {
        throw new TopicConfigValidationException("默认队列数必须大于0");
    }
    if (topicConfig.getPerm() == null) {
        throw new TopicConfigValidationException("Topic权限不能为空");
    }
    if (!isValidTopicName(topicName)) {
        throw new TopicConfigValidationException("Topic名称格式不正确");
    }
    if (!isValidPerm(topicConfig.getPerm())) {
        throw new TopicConfigValidationException("Topic权限无效: " + topicConfig.getPerm());
    }
}
/**
 * Characters allowed in a user topic name. RocketMQ's own topic validator accepts letters,
 * digits, '-' and '_' (plus '%' and '|' for internal topics) and rejects '.', so '.' is not
 * allowed here either.
 */
private static final java.util.regex.Pattern TOPIC_NAME_PATTERN =
        java.util.regex.Pattern.compile("^[a-zA-Z0-9_-]+$");

/** Maximum topic-name length enforced by RocketMQ's TopicValidator. */
private static final int TOPIC_NAME_MAX_LENGTH = 127;

/**
 * Checks that a topic name is non-null, at most {@value #TOPIC_NAME_MAX_LENGTH} characters
 * long and made only of letters, digits, '-' and '_'.
 *
 * @param topicName candidate name (may be null)
 * @return {@code true} if the name is acceptable
 */
private boolean isValidTopicName(String topicName) {
    return topicName != null
            && topicName.length() <= TOPIC_NAME_MAX_LENGTH
            && TOPIC_NAME_PATTERN.matcher(topicName).matches();
}
/**
 * Accepts only the perm values 2, 4 and 6. Under RocketMQ's PermName bit constants these are
 * write-only, read-only, and read + write.
 *
 * @param perm permission value to check
 * @return {@code true} for 2, 4 or 6; {@code false} for anything else
 */
private boolean isValidPerm(int perm) {
    switch (perm) {
        case 2:
        case 4:
        case 6:
            return true;
        default:
            return false;
    }
}
}
|
4.2 队列管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
@Service public class QueueManager {
@Autowired private RocketMQAdminClient rocketMQAdminClient;
@Autowired private BrokerManager brokerManager;
public void increaseQueueNums(String topicName, int newQueueNums) { try { if (!topicExists(topicName)) { throw new TopicNotFoundException("Topic不存在: " + topicName); }
int currentQueueNums = getCurrentQueueNums(topicName);
if (newQueueNums <= currentQueueNums) { throw new QueueValidationException("新队列数必须大于当前队列数"); }
UpdateTopicRequest request = new UpdateTopicRequest(); request.setTopic(topicName); request.setDefaultTopicQueueNums(newQueueNums);
rocketMQAdminClient.updateTopic(request);
log.info("Topic队列数增加成功: {} -> {}", topicName, newQueueNums);
} catch (Exception e) { log.error("增加Topic队列数失败: {}", topicName, e); throw new QueueIncreaseException("增加Topic队列数失败", e); } }
public void reassignQueues(String topicName, Map<String, List<Integer>> brokerQueues) { try { if (!topicExists(topicName)) { throw new TopicNotFoundException("Topic不存在: " + topicName); }
validateQueueAssignment(topicName, brokerQueues);
ReassignQueueRequest request = new ReassignQueueRequest(); request.setTopic(topicName); request.setBrokerQueues(brokerQueues);
rocketMQAdminClient.reassignQueue(request);
log.info("Topic队列重分配成功: {}", topicName);
} catch (Exception e) { log.error("队列重分配失败: {}", topicName, e); throw new QueueReassignmentException("队列重分配失败", e); } }
public List<QueueInfo> getQueueInfo(String topicName) { try { if (!topicExists(topicName)) { throw new TopicNotFoundException("Topic不存在: " + topicName); }
TopicRouteInfo topicRouteInfo = rocketMQAdminClient.getTopicRouteInfo(topicName);
List<QueueInfo> queueInfos = new ArrayList<>(); for (QueueData queueData : topicRouteInfo.getQueueDatas()) { QueueInfo queueInfo = new QueueInfo(); queueInfo.setTopicName(topicName); queueInfo.setBrokerName(queueData.getBrokerName()); queueInfo.setReadQueueNums(queueData.getReadQueueNums()); queueInfo.setWriteQueueNums(queueData.getWriteQueueNums()); queueInfo.setPerm(queueData.getPerm()); queueInfo.setTopicSysFlag(queueData.getTopicSysFlag());
queueInfos.add(queueInfo); }
return queueInfos;
} catch (Exception e) { log.error("获取队列信息失败: {}", topicName, e); throw new QueueInfoException("获取队列信息失败", e); } }
public QueueHealthStatus checkQueueHealth(String topicName) { try { List<QueueInfo> queueInfos = getQueueInfo(topicName);
QueueHealthStatus healthStatus = new QueueHealthStatus(); healthStatus.setTopicName(topicName); healthStatus.setTotalQueues(queueInfos.size());
int healthyQueues = 0; int unbalancedQueues = 0; int readonlyQueues = 0;
for (QueueInfo queueInfo : queueInfos) { int readQueueNums = queueInfo.getReadQueueNums(); int writeQueueNums = queueInfo.getWriteQueueNums();
if (readQueueNums != writeQueueNums) { unbalancedQueues++; }
int perm = queueInfo.getPerm(); if (perm == 2) { readonlyQueues++; }
if (readQueueNums == writeQueueNums && perm == 6) { healthyQueues++; } }
healthStatus.setHealthyQueues(healthyQueues); healthStatus.setUnbalancedQueues(unbalancedQueues); healthStatus.setReadonlyQueues(readonlyQueues);
double healthRate = (double) healthyQueues / queueInfos.size(); healthStatus.setHealthRate(healthRate);
healthStatus.setHealthy(healthRate >= 0.8);
return healthStatus;
} catch (Exception e) { log.error("检查队列健康状态失败: {}", topicName, e); throw new QueueHealthException("检查队列健康状态失败", e); } }
private int getCurrentQueueNums(String topicName) { try { TopicRouteInfo topicRouteInfo = rocketMQAdminClient.getTopicRouteInfo(topicName); return topicRouteInfo.getQueueDatas().stream() .mapToInt(QueueData::getWriteQueueNums) .sum();
} catch (Exception e) { log.error("获取当前队列数失败: {}", topicName, e); throw new QueueInfoException("获取当前队列数失败", e); } }
private void validateQueueAssignment(String topicName, Map<String, List<Integer>> brokerQueues) { for (String brokerName : brokerQueues.keySet()) { if (!brokerManager.isBrokerExists(brokerName)) { throw new QueueValidationException("Broker不存在: " + brokerName); } }
for (Map.Entry<String, List<Integer>> entry : brokerQueues.entrySet()) { List<Integer> queues = entry.getValue();
if (queues.isEmpty()) { throw new QueueValidationException("Broker队列不能为空: " + entry.getKey()); }
Set<Integer> queueSet = new HashSet<>(queues); if (queueSet.size() != queues.size()) { throw new QueueValidationException("Broker队列不能重复: " + entry.getKey()); } } }
private boolean topicExists(String topicName) { try { List<String> topicNames = rocketMQAdminClient.getAllTopicNames(); return topicNames.contains(topicName);
} catch (Exception e) { log.error("检查Topic是否存在失败: {}", topicName, e); return false; } } }
|
五、集群监控与管理
5.1 集群监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
|
@Service public class ClusterMonitorService {
@Autowired private RocketMQAdminClient rocketMQAdminClient;
@Autowired private JMXMetricsCollector jmxMetricsCollector;
@Autowired private ClusterHealthAnalyzer clusterHealthAnalyzer;
@Autowired private AlertService alertService;
private final ScheduledExecutorService monitorScheduler;
public ClusterMonitorService() { this.monitorScheduler = Executors.newScheduledThreadPool(5); }
public void startMonitoring() { monitorScheduler.scheduleAtFixedRate( this::monitorCluster, 0, 60, TimeUnit.SECONDS );
log.info("RocketMQ集群监控启动成功"); }
public void stopMonitoring() { try { monitorScheduler.shutdown(); if (!monitorScheduler.awaitTermination(30, TimeUnit.SECONDS)) { monitorScheduler.shutdownNow(); }
log.info("RocketMQ集群监控停止成功");
} catch (Exception e) { log.error("RocketMQ集群监控停止失败", e); } }
private void monitorCluster() { try { ClusterMetrics metrics = collectClusterMetrics();
ClusterHealthStatus healthStatus = clusterHealthAnalyzer.analyzeHealth(metrics);
if (!healthStatus.isHealthy()) { handleUnhealthyCluster(healthStatus); }
recordMonitoringResult(metrics, healthStatus);
} catch (Exception e) { log.error("集群监控失败", e); } }
private ClusterMetrics collectClusterMetrics() { ClusterMetrics metrics = new ClusterMetrics();
try { collectBasicMetrics(metrics);
collectPerformanceMetrics(metrics);
collectStorageMetrics(metrics);
collectJMXMetrics(metrics);
} catch (Exception e) { log.error("集群指标收集失败", e); }
return metrics; }
private void collectBasicMetrics(ClusterMetrics metrics) { try { ClusterInfo clusterInfo = rocketMQAdminClient.getClusterInfo();
metrics.setClusterName(clusterInfo.getClusterName()); metrics.setBrokerCount(clusterInfo.getBrokerCount()); metrics.setNameServerCount(clusterInfo.getNameServerCount());
List<String> topicNames = rocketMQAdminClient.getAllTopicNames(); metrics.setTopicCount(topicNames.size());
} catch (Exception e) { log.error("基础指标收集失败", e); } }
private void collectPerformanceMetrics(ClusterMetrics metrics) { try { Map<String, Object> jmxMetrics = jmxMetricsCollector.collectPerformanceMetrics();
Double messagesPerSecond = (Double) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=msgPutNums"); metrics.setMessagesPerSecond(messagesPerSecond != null ? messagesPerSecond : 0.0);
Double bytesPerSecond = (Double) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=msgPutSize"); metrics.setBytesPerSecond(bytesPerSecond != null ? bytesPerSecond : 0.0);
Double messagesOutPerSecond = (Double) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=msgGetNums"); metrics.setMessagesOutPerSecond(messagesOutPerSecond != null ? messagesOutPerSecond : 0.0);
Double bytesOutPerSecond = (Double) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=msgGetSize"); metrics.setBytesOutPerSecond(bytesOutPerSecond != null ? bytesOutPerSecond : 0.0);
} catch (Exception e) { log.error("性能指标收集失败", e); } }
private void collectStorageMetrics(ClusterMetrics metrics) { try { Map<String, Object> jmxMetrics = jmxMetricsCollector.collectStorageMetrics();
Long storageSize = (Long) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=storageSize"); metrics.setStorageSize(storageSize != null ? storageSize : 0L);
Long queueCount = (Long) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=queueCount"); metrics.setQueueCount(queueCount != null ? queueCount : 0L);
Long messageCount = (Long) jmxMetrics.get("RocketMQ.Broker:type=BrokerStats,name=messageCount"); metrics.setMessageCount(messageCount != null ? messageCount : 0L);
} catch (Exception e) { log.error("存储指标收集失败", e); } }
private void collectJMXMetrics(ClusterMetrics metrics) { try { Map<String, Object> jmxMetrics = jmxMetricsCollector.collectAllMetrics();
metrics.setJmxMetrics(jmxMetrics);
} catch (Exception e) { log.error("JMX指标收集失败", e); } }
private void handleUnhealthyCluster(ClusterHealthStatus healthStatus) { try { sendClusterAlert(healthStatus);
recordClusterIssue(healthStatus);
} catch (Exception e) { log.error("不健康集群处理失败", e); } }
private void sendClusterAlert(ClusterHealthStatus healthStatus) { ClusterAlert alert = new ClusterAlert(); alert.setAlertType(AlertType.CLUSTER_UNHEALTHY); alert.setSeverity(healthStatus.getSeverity()); alert.setMessage("RocketMQ集群状态异常"); alert.setHealthStatus(healthStatus); alert.setTimestamp(System.currentTimeMillis());
alertService.sendAlert(alert); }
private void recordMonitoringResult(ClusterMetrics metrics, ClusterHealthStatus healthStatus) { ClusterMonitoringResult result = new ClusterMonitoringResult(); result.setTimestamp(System.currentTimeMillis()); result.setMetrics(metrics); result.setHealthStatus(healthStatus);
} }
|
六、企业级RocketMQ集群方案
6.1 集群配置管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
@Service public class RocketMQClusterConfigService {
@Autowired private ConfigurationRepository configRepository;
public RocketMQClusterConfig getClusterConfig(String clusterId) { return configRepository.findClusterConfigById(clusterId) .orElseThrow(() -> new ConfigNotFoundException("集群配置不存在: " + clusterId)); }
public void saveClusterConfig(RocketMQClusterConfig config) { try { validateClusterConfig(config);
configRepository.saveClusterConfig(config);
log.info("集群配置保存成功: {}", config.getClusterId());
} catch (Exception e) { log.error("集群配置保存失败", e); throw new ConfigSaveException("集群配置保存失败", e); } }
public void updateClusterConfig(String clusterId, RocketMQClusterConfig config) { try { if (!configRepository.existsClusterConfig(clusterId)) { throw new ConfigNotFoundException("集群配置不存在: " + clusterId); }
validateClusterConfig(config);
config.setClusterId(clusterId); configRepository.saveClusterConfig(config);
log.info("集群配置更新成功: {}", clusterId);
} catch (Exception e) { log.error("集群配置更新失败", e); throw new ConfigUpdateException("集群配置更新失败", e); } }
public void deleteClusterConfig(String clusterId) { try { if (!configRepository.existsClusterConfig(clusterId)) { throw new ConfigNotFoundException("集群配置不存在: " + clusterId); }
configRepository.deleteClusterConfig(clusterId);
log.info("集群配置删除成功: {}", clusterId);
} catch (Exception e) { log.error("集群配置删除失败", e); throw new ConfigDeleteException("集群配置删除失败", e); } }
public List<RocketMQClusterConfig> getAllClusterConfigs() { return configRepository.findAllClusterConfigs(); }
private void validateClusterConfig(RocketMQClusterConfig config) { if (config.getClusterId() == null || config.getClusterId().isEmpty()) { throw new ConfigValidationException("集群ID不能为空"); }
if (config.getClusterName() == null || config.getClusterName().isEmpty()) { throw new ConfigValidationException("集群名称不能为空"); }
if (config.getNameServerConfigs() == null || config.getNameServerConfigs().isEmpty()) { throw new ConfigValidationException("NameServer配置不能为空"); }
if (config.getBrokerConfigs() == null || config.getBrokerConfigs().isEmpty()) { throw new ConfigValidationException("Broker配置不能为空"); }
if (config.getNameServerConfigs().size() < 2) { throw new ConfigValidationException("NameServer集群至少需要2个节点"); }
if (config.getBrokerConfigs().size() < 2) { throw new ConfigValidationException("Broker集群至少需要2个节点"); }
for (NameServerConfig nameServerConfig : config.getNameServerConfigs()) { validateNameServerConfig(nameServerConfig); }
for (BrokerConfig brokerConfig : config.getBrokerConfigs()) { validateBrokerConfig(brokerConfig); } }
private void validateNameServerConfig(NameServerConfig nameServerConfig) { if (nameServerConfig.getNameServerId() == null || nameServerConfig.getNameServerId().isEmpty()) { throw new ConfigValidationException("NameServer ID不能为空"); }
if (nameServerConfig.getHost() == null || nameServerConfig.getHost().isEmpty()) { throw new ConfigValidationException("NameServer主机地址不能为空"); }
if (nameServerConfig.getPort() <= 0 || nameServerConfig.getPort() > 65535) { throw new ConfigValidationException("NameServer端口必须在1-65535之间"); } }
private void validateBrokerConfig(BrokerConfig brokerConfig) { if (brokerConfig.getBrokerId() == null || brokerConfig.getBrokerId().isEmpty()) { throw new ConfigValidationException("Broker ID不能为空"); }
if (brokerConfig.getBrokerName() == null || brokerConfig.getBrokerName().isEmpty()) { throw new ConfigValidationException("Broker名称不能为空"); }
if (brokerConfig.getHost() == null || brokerConfig.getHost().isEmpty()) { throw new ConfigValidationException("Broker主机地址不能为空"); }
if (brokerConfig.getListenPort() <= 0 || brokerConfig.getListenPort() > 65535) { throw new ConfigValidationException("Broker监听端口必须在1-65535之间"); } } }
|
6.2 集群部署脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
@Service public class RocketMQClusterDeploymentScriptGenerator {
public String generateClusterDeploymentScript(RocketMQClusterConfig clusterConfig) { StringBuilder script = new StringBuilder();
try { generateScriptHeader(script);
generateEnvironmentVariables(script, clusterConfig);
generateNameServerStartScript(script, clusterConfig.getNameServerConfigs());
generateBrokerStartScript(script, clusterConfig.getBrokerConfigs());
generateHealthCheckScript(script, clusterConfig);
generateStopScript(script, clusterConfig);
generateMonitoringScript(script, clusterConfig);
} catch (Exception e) { log.error("集群部署脚本生成失败", e); throw new ScriptGenerationException("集群部署脚本生成失败", e); }
return script.toString(); }
private void generateScriptHeader(StringBuilder script) { script.append("#!/bin/bash\n"); script.append("# RocketMQ Cluster Deployment Script\n"); script.append("# Generated by RocketMQ Cluster Manager\n"); script.append("# Date: ").append(new Date()).append("\n"); script.append("\n"); script.append("set -e\n"); script.append("\n"); }
private void generateEnvironmentVariables(StringBuilder script, RocketMQClusterConfig clusterConfig) { script.append("# Environment Variables\n"); script.append("export ROCKETMQ_HOME=").append(clusterConfig.getRocketmqHome()).append("\n"); script.append("export CLUSTER_ID=").append(clusterConfig.getClusterId()).append("\n"); script.append("export CLUSTER_NAME=").append(clusterConfig.getClusterName()).append("\n"); script.append("\n"); }
private void generateNameServerStartScript(StringBuilder script, List<NameServerConfig> nameServerConfigs) { script.append("# Start NameServer Cluster\n"); script.append("start_nameserver_cluster() {\n"); script.append(" echo \"Starting NameServer Cluster...\"\n");
for (NameServerConfig config : nameServerConfigs) { script.append(" echo \"Starting NameServer ").append(config.getNameServerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append(" \"nohup $ROCKETMQ_HOME/bin/mqnamesrv > $ROCKETMQ_HOME/logs/namesrv.log 2>&1 &\"\n"); }
script.append(" echo \"NameServer Cluster started\"\n"); script.append("}\n"); script.append("\n"); }
private void generateBrokerStartScript(StringBuilder script, List<BrokerConfig> brokerConfigs) { script.append("# Start Broker Cluster\n"); script.append("start_broker_cluster() {\n"); script.append(" echo \"Starting Broker Cluster...\"\n");
for (BrokerConfig config : brokerConfigs) { script.append(" echo \"Starting Broker ").append(config.getBrokerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append(" \"nohup $ROCKETMQ_HOME/bin/mqbroker -n ").append(config.getNamesrvAddr()).append(" -c $ROCKETMQ_HOME/conf/broker.conf > $ROCKETMQ_HOME/logs/broker.log 2>&1 &\"\n"); }
script.append(" echo \"Broker Cluster started\"\n"); script.append("}\n"); script.append("\n"); }
private void generateHealthCheckScript(StringBuilder script, RocketMQClusterConfig clusterConfig) { script.append("# Health Check\n"); script.append("health_check() {\n"); script.append(" echo \"Checking cluster health...\"\n");
for (NameServerConfig config : clusterConfig.getNameServerConfigs()) { script.append(" echo \"Checking NameServer ").append(config.getNameServerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append " \"ps aux | grep mqnamesrv | grep -v grep\"\n"); }
for (BrokerConfig config : clusterConfig.getBrokerConfigs()) { script.append(" echo \"Checking Broker ").append(config.getBrokerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append " \"ps aux | grep mqbroker | grep -v grep\"\n"); }
script.append(" echo \"Cluster health check completed\"\n"); script.append("}\n"); script.append("\n"); }
private void generateStopScript(StringBuilder script, RocketMQClusterConfig clusterConfig) { script.append("# Stop RocketMQ Cluster\n"); script.append("stop_rocketmq_cluster() {\n"); script.append(" echo \"Stopping RocketMQ Cluster...\"\n");
for (BrokerConfig config : clusterConfig.getBrokerConfigs()) { script.append(" echo \"Stopping Broker ").append(config.getBrokerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append " \"$ROCKETMQ_HOME/bin/mqshutdown broker\"\n"); }
for (NameServerConfig config : clusterConfig.getNameServerConfigs()) { script.append(" echo \"Stopping NameServer ").append(config.getNameServerId()).append("...\"\n"); script.append(" ssh ").append(config.getHost()).append " \"$ROCKETMQ_HOME/bin/mqshutdown namesrv\"\n"); }
script.append(" echo \"RocketMQ Cluster stopped\"\n"); script.append("}\n"); script.append("\n"); }
private void generateMonitoringScript(StringBuilder script, RocketMQClusterConfig clusterConfig) { script.append("# Monitoring\n"); script.append("monitor_cluster() {\n"); script.append(" echo \"Monitoring cluster...\"\n"); script.append(" $ROCKETMQ_HOME/bin/mqadmin topicList -n ").append(clusterConfig.getNameServerConfigs().get(0).getHost()).append(":").append(clusterConfig.getNameServerConfigs().get(0).getPort()).append("\n"); script.append(" $ROCKETMQ_HOME/bin/mqadmin clusterList -n ").append(clusterConfig.getNameServerConfigs().get(0).getHost()).append(":").append(clusterConfig.getNameServerConfigs().get(0).getPort()).append("\n"); script.append("}\n"); script.append("\n"); } }
|
七、最佳实践与总结
7.1 RocketMQ集群搭建最佳实践
集群规划策略
- 至少2个NameServer节点确保高可用
- 合理规划Broker Master/Slave架构
- 考虑网络拓扑和机架感知
性能优化策略
- 优化JVM参数配置
- 调整网络和存储参数
- 合理设置消息存储策略
监控告警体系
- 建立完善的监控指标
- 设置合理的告警阈值
- 实现自动化运维
安全认证机制
- 启用ACL访问控制,按最小权限为生产者、消费者和管理员分配账号
- 限制NameServer、Broker及管理端口的网络访问范围
7.2 架构师级RocketMQ运维技能
集群管理能力
- 深入理解RocketMQ架构
- 掌握集群扩展和收缩
- 管理Topic和队列
性能调优能力
- 优化生产者和消费者性能
- 调整Broker参数
- 优化网络和存储
故障处理能力
- 掌握Broker主从切换与故障恢复流程
- 能够定位消息堆积、发送失败等常见问题
监控运维能力
- 建立集群、Topic、消费组多层级监控
- 熟练使用mqadmin等运维工具
7.3 持续改进建议
集群优化
- 定期评估容量,调整Broker与队列规划
自动化程度提升
- 将部署、扩容和巡检流程脚本化
知识积累
- 沉淀故障案例与运维手册
总结
RocketMQ集群搭建是企业级分布式消息中间件的核心能力,通过高可用的集群部署策略、完善的性能调优机制和系统化的监控管理,能够构建稳定可靠的分布式消息队列系统,保障企业级应用的高并发处理能力。本文从集群架构设计到部署实施,从基础原理到企业级实践,系统梳理了RocketMQ集群搭建的完整解决方案。
关键要点:
- 集群架构设计:高可用的NameServer和Broker集群架构
- 部署实施策略:NameServer管理、Broker管理、配置管理
- Topic管理方案:Topic创建、队列管理、健康检查
- 监控管理体系:集群监控、JMX指标收集、告警机制
- 企业级实践:配置管理、部署脚本、持续改进
通过深入理解这些技术要点,架构师能够设计出完善的RocketMQ集群系统,提升消息队列的稳定性和可靠性,确保企业级应用的高可用性。