1. RocketMQ概述

RocketMQ是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高可用的特点。本文将详细介绍RocketMQ消息队列的搭建与开发运维实战,包括集群部署、消息发送、消费处理、性能优化、故障处理的完整解决方案。

1.1 核心功能

  1. 消息发送: 支持同步、异步、单向发送
  2. 消息消费: 支持集群消费和广播消费
  3. 集群管理: NameServer集群、Broker集群管理
  4. 性能优化: 消息存储、网络传输优化
  5. 运维监控: 集群监控、性能监控、故障处理

1.2 技术架构

1
2
3
Producer → NameServer → Broker → Consumer
↓ ↓ ↓ ↓
消息发送 → 路由发现 → 消息存储 → 消息消费

2. 环境准备

2.1 系统要求检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/bin/bash
# check_rocketmq_env.sh - RocketMQ环境检查脚本
# @author 运维实战

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 检查系统版本
check_os_version() {
log "检查系统版本..."

if [ -f /etc/os-release ]; then
. /etc/os-release
log "操作系统: $NAME $VERSION"
else
log "无法确定操作系统版本"
fi
}

# 检查Java版本
check_java_version() {
log "检查Java版本..."

JAVA_VERSION=$(java -version 2>&1 | head -1 | cut -d'"' -f2)
log "Java版本: $JAVA_VERSION"

# 检查Java版本是否符合要求
if [[ $JAVA_VERSION == 1.8* ]] || [[ $JAVA_VERSION == 11* ]] || [[ $JAVA_VERSION == 17* ]]; then
log "Java版本符合要求"
else
log "Java版本不符合要求,需要Java 8+"
exit 1
fi
}

# 检查内存使用情况
check_memory() {
log "检查内存使用情况..."

TOTAL_MEM=$(free -h | grep Mem | awk '{print $2}')
USED_MEM=$(free -h | grep Mem | awk '{print $3}')
AVAIL_MEM=$(free -h | grep Mem | awk '{print $7}')

log "总内存: $TOTAL_MEM"
log "已使用: $USED_MEM"
log "可用内存: $AVAIL_MEM"
}

# 检查磁盘空间
check_disk_space() {
log "检查磁盘空间..."

df -h | grep -E "(/$|/var|/tmp|/home)" | while read line; do
log "磁盘使用: $line"
done
}

# 检查网络连接
check_network() {
log "检查网络连接..."

# 检查端口是否被占用
netstat -tlnp | grep -E "(9876|10911|10912|10909)"

# 检查防火墙状态
if command -v ufw > /dev/null 2>&1; then
ufw status
elif command -v firewall-cmd > /dev/null 2>&1; then
firewall-cmd --list-all
else
iptables -L
fi
}

# 主函数
main() {
log "开始RocketMQ环境检查..."

check_os_version
check_java_version
check_memory
check_disk_space
check_network

log "RocketMQ环境检查完成"
}

# 执行主函数
main "$@"

3. RocketMQ集群部署

3.1 集群部署脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/bin/bash
# rocketmq_cluster_deploy.sh - RocketMQ集群部署脚本
# @author 运维实战

# 集群配置
NAMESERVER_SERVERS=(
"192.168.1.10"
"192.168.1.11"
"192.168.1.12"
)

BROKER_SERVERS=(
"192.168.1.10"
"192.168.1.11"
"192.168.1.12"
)

ROCKETMQ_VERSION="4.9.4"
ROCKETMQ_USER="rocketmq"
ROCKETMQ_HOME="/opt/rocketmq"
CLUSTER_CONFIG_DIR="/etc/rocketmq/cluster"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 安装RocketMQ
install_rocketmq() {
log "安装RocketMQ $ROCKETMQ_VERSION..."

# 下载RocketMQ源码
cd /tmp
wget https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip
unzip rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip
mv rocketmq-all-${ROCKETMQ_VERSION}-bin-release $ROCKETMQ_HOME

if [ $? -eq 0 ]; then
log "RocketMQ安装成功"
else
log "RocketMQ安装失败"
exit 1
fi
}

# 创建RocketMQ用户
create_rocketmq_user() {
log "创建RocketMQ用户..."

if ! id $ROCKETMQ_USER &>/dev/null; then
useradd -r -s /bin/false $ROCKETMQ_USER
log "RocketMQ用户创建成功"
else
log "RocketMQ用户已存在"
fi
}

# 创建目录结构
create_directories() {
log "创建目录结构..."

# 创建RocketMQ主目录
mkdir -p $ROCKETMQ_HOME/{bin,conf,logs,store}

# 创建集群配置目录
mkdir -p $CLUSTER_CONFIG_DIR

# 设置权限
chown -R $ROCKETMQ_USER:$ROCKETMQ_USER $ROCKETMQ_HOME
chown -R $ROCKETMQ_USER:$ROCKETMQ_USER $CLUSTER_CONFIG_DIR

log "目录结构创建完成"
}

# 生成NameServer配置文件
generate_nameserver_config() {
log "生成NameServer配置文件..."

for i in "${!NAMESERVER_SERVERS[@]}"; do
server=${NAMESERVER_SERVERS[$i]}
server_id=$((i + 1))

# 生成NameServer配置文件
cat > $CLUSTER_CONFIG_DIR/nameserver-${server_id}.properties << EOF
# NameServer配置文件
rocketmqHome=$ROCKETMQ_HOME
listenPort=9876
serverIP=${server}
EOF

log "NameServer配置文件生成: nameserver-${server_id}.properties"
done
}

# 生成Broker配置文件
generate_broker_config() {
log "生成Broker配置文件..."

for i in "${!BROKER_SERVERS[@]}"; do
server=${BROKER_SERVERS[$i]}
server_id=$((i + 1))

# 生成Broker配置文件
cat > $CLUSTER_CONFIG_DIR/broker-${server_id}.properties << EOF
# Broker配置文件
brokerClusterName=DefaultCluster
brokerName=broker-${server_id}
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
storePathRootDir=$ROCKETMQ_HOME/store
storePathCommitLog=$ROCKETMQ_HOME/store/commitlog
storePathConsumeQueue=$ROCKETMQ_HOME/store/consumequeue
storePathIndex=$ROCKETMQ_HOME/store/index
maxMessageSize=4194304
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
EOF

log "Broker配置文件生成: broker-${server_id}.properties"
done
}

# 启动NameServer
start_nameserver() {
log "启动NameServer..."

for i in "${!NAMESERVER_SERVERS[@]}"; do
server=${NAMESERVER_SERVERS[$i]}
server_id=$((i + 1))

# 启动NameServer
nohup $ROCKETMQ_HOME/bin/mqnamesrv -c $CLUSTER_CONFIG_DIR/nameserver-${server_id}.properties > $ROCKETMQ_HOME/logs/namesrv-${server_id}.log 2>&1 &

if [ $? -eq 0 ]; then
log "NameServer启动成功: $server"
else
log "NameServer启动失败: $server"
exit 1
fi
done
}

# 启动Broker
start_broker() {
log "启动Broker..."

# 等待NameServer启动
sleep 10

for i in "${!BROKER_SERVERS[@]}"; do
server=${BROKER_SERVERS[$i]}
server_id=$((i + 1))

# 启动Broker
nohup $ROCKETMQ_HOME/bin/mqbroker -c $CLUSTER_CONFIG_DIR/broker-${server_id}.properties > $ROCKETMQ_HOME/logs/broker-${server_id}.log 2>&1 &

if [ $? -eq 0 ]; then
log "Broker启动成功: $server"
else
log "Broker启动失败: $server"
exit 1
fi
done
}

# 验证集群
verify_cluster() {
log "验证RocketMQ集群..."

# 等待服务启动
sleep 15

# 检查NameServer状态
for server in "${NAMESERVER_SERVERS[@]}"; do
if nc -z $server 9876; then
log "NameServer状态正常: $server:9876"
else
log "NameServer状态异常: $server:9876"
fi
done

# 检查Broker状态
for server in "${BROKER_SERVERS[@]}"; do
if nc -z $server 10911; then
log "Broker状态正常: $server:10911"
else
log "Broker状态异常: $server:10911"
fi
done

# 检查集群信息
$ROCKETMQ_HOME/bin/mqadmin clusterList -n 192.168.1.10:9876

log "集群验证完成"
}

# 主函数
main() {
log "开始RocketMQ集群部署..."

# 安装RocketMQ
install_rocketmq

# 创建RocketMQ用户
create_rocketmq_user

# 创建目录结构
create_directories

# 生成配置文件
generate_nameserver_config
generate_broker_config

# 启动服务
start_nameserver
start_broker

# 验证集群
verify_cluster

log "RocketMQ集群部署完成"
}

# 执行主函数
main "$@"

4. Java开发实战

4.1 Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.rocketmq</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
<rocketmq.version>4.9.4</rocketmq.version>
</properties>

<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

<!-- RocketMQ Client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

4.2 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# application.yml
server:
port: 8080

spring:
application:
name: rocketmq-demo

# RocketMQ配置
rocketmq:
name-server: 192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
producer:
group: demo-producer-group
send-message-timeout: 3000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
retry-another-broker-when-not-store-ok: false
consumer:
group: demo-consumer-group
consume-timeout: 15000
max-reconsume-times: 3
consume-message-batch-max-size: 1
pull-batch-size: 32
pull-interval: 0
pull-threshold-for-queue: 1000
pull-threshold-size-for-queue: 100
pull-threshold-for-topic: -1
pull-threshold-size-for-topic: -1

4.3 消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package com.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
* 消息生产者
* @author 运维实战
*/
@Slf4j
@Component
public class MessageProducer {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 发送同步消息
*/
public void sendSyncMessage(String topic, String message) {
try {
log.info("发送同步消息: topic={}, message={}", topic, message);

// 发送同步消息
rocketMQTemplate.syncSend(topic, message);

log.info("同步消息发送成功: topic={}", topic);

} catch (Exception e) {
log.error("同步消息发送失败: topic={}", topic, e);
throw new RuntimeException("同步消息发送失败", e);
}
}

/**
* 发送异步消息
*/
public void sendAsyncMessage(String topic, String message) {
try {
log.info("发送异步消息: topic={}, message={}", topic, message);

// 发送异步消息
rocketMQTemplate.asyncSend(topic, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("异步消息发送成功: topic={}, msgId={}", topic, sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
log.error("异步消息发送失败: topic={}", topic, e);
}
});

} catch (Exception e) {
log.error("异步消息发送失败: topic={}", topic, e);
throw new RuntimeException("异步消息发送失败", e);
}
}

/**
* 发送单向消息
*/
public void sendOneWayMessage(String topic, String message) {
try {
log.info("发送单向消息: topic={}, message={}", topic, message);

// 发送单向消息
rocketMQTemplate.sendOneWay(topic, message);

log.info("单向消息发送成功: topic={}", topic);

} catch (Exception e) {
log.error("单向消息发送失败: topic={}", topic, e);
throw new RuntimeException("单向消息发送失败", e);
}
}

/**
* 发送延迟消息
*/
public void sendDelayMessage(String topic, String message, int delayLevel) {
try {
log.info("发送延迟消息: topic={}, message={}, delayLevel={}", topic, message, delayLevel);

// 发送延迟消息
rocketMQTemplate.syncSend(topic, message, 3000, delayLevel);

log.info("延迟消息发送成功: topic={}, delayLevel={}", topic, delayLevel);

} catch (Exception e) {
log.error("延迟消息发送失败: topic={}", topic, e);
throw new RuntimeException("延迟消息发送失败", e);
}
}

/**
* 发送顺序消息
*/
public void sendOrderMessage(String topic, String message, String orderKey) {
try {
log.info("发送顺序消息: topic={}, message={}, orderKey={}", topic, message, orderKey);

// 发送顺序消息
rocketMQTemplate.syncSendOrderly(topic, message, orderKey);

log.info("顺序消息发送成功: topic={}, orderKey={}", topic, orderKey);

} catch (Exception e) {
log.error("顺序消息发送失败: topic={}", topic, e);
throw new RuntimeException("顺序消息发送失败", e);
}
}

/**
* 发送事务消息
*/
public void sendTransactionMessage(String topic, String message) {
try {
log.info("发送事务消息: topic={}, message={}", topic, message);

// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(topic, message, null);

log.info("事务消息发送成功: topic={}", topic);

} catch (Exception e) {
log.error("事务消息发送失败: topic={}", topic, e);
throw new RuntimeException("事务消息发送失败", e);
}
}

/**
* 发送批量消息
*/
public void sendBatchMessage(String topic, String[] messages) {
try {
log.info("发送批量消息: topic={}, count={}", topic, messages.length);

// 发送批量消息
for (String message : messages) {
rocketMQTemplate.syncSend(topic, message);
}

log.info("批量消息发送成功: topic={}, count={}", topic, messages.length);

} catch (Exception e) {
log.error("批量消息发送失败: topic={}", topic, e);
throw new RuntimeException("批量消息发送失败", e);
}
}

/**
* 发送带标签的消息
*/
public void sendTagMessage(String topic, String tag, String message) {
try {
log.info("发送带标签的消息: topic={}, tag={}, message={}", topic, tag, message);

// 发送带标签的消息
rocketMQTemplate.syncSend(topic + ":" + tag, message);

log.info("带标签的消息发送成功: topic={}, tag={}", topic, tag);

} catch (Exception e) {
log.error("带标签的消息发送失败: topic={}", topic, e);
throw new RuntimeException("带标签的消息发送失败", e);
}
}
}

4.4 消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package com.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
* 消息消费者
* @author 运维实战
*/
@Slf4j
@Component
public class MessageConsumer {

/**
* 消费普通消息
*/
@RocketMQMessageListener(
topic = "demo-topic",
consumerGroup = "demo-consumer-group"
)
public class DemoMessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
try {
log.info("消费普通消息: {}", message);

// 处理业务逻辑
processMessage(message);

log.info("普通消息处理完成: {}", message);

} catch (Exception e) {
log.error("普通消息处理失败: {}", message, e);
throw new RuntimeException("普通消息处理失败", e);
}
}
}

/**
* 消费延迟消息
*/
@RocketMQMessageListener(
topic = "delay-topic",
consumerGroup = "delay-consumer-group"
)
public class DelayMessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
try {
log.info("消费延迟消息: {}", message);

// 处理延迟消息
processDelayMessage(message);

log.info("延迟消息处理完成: {}", message);

} catch (Exception e) {
log.error("延迟消息处理失败: {}", message, e);
throw new RuntimeException("延迟消息处理失败", e);
}
}
}

/**
* 消费顺序消息
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING,
messageModel = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING,
consumeOrderly = true
)
public class OrderMessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
try {
log.info("消费顺序消息: {}", message);

// 处理顺序消息
processOrderMessage(message);

log.info("顺序消息处理完成: {}", message);

} catch (Exception e) {
log.error("顺序消息处理失败: {}", message, e);
throw new RuntimeException("顺序消息处理失败", e);
}
}
}

/**
* 消费事务消息
*/
@RocketMQMessageListener(
topic = "transaction-topic",
consumerGroup = "transaction-consumer-group"
)
public class TransactionMessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
try {
log.info("消费事务消息: {}", message);

// 处理事务消息
processTransactionMessage(message);

log.info("事务消息处理完成: {}", message);

} catch (Exception e) {
log.error("事务消息处理失败: {}", message, e);
throw new RuntimeException("事务消息处理失败", e);
}
}
}

/**
* 消费带标签的消息
*/
@RocketMQMessageListener(
topic = "tag-topic",
consumerGroup = "tag-consumer-group",
selectorExpression = "tag1 || tag2"
)
public class TagMessageConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
try {
log.info("消费带标签的消息: {}", message);

// 处理带标签的消息
processTagMessage(message);

log.info("带标签的消息处理完成: {}", message);

} catch (Exception e) {
log.error("带标签的消息处理失败: {}", message, e);
throw new RuntimeException("带标签的消息处理失败", e);
}
}
}

/**
* 处理普通消息
*/
private void processMessage(String message) {
// 模拟业务处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("普通消息处理完成: {}", message);
}

/**
* 处理延迟消息
*/
private void processDelayMessage(String message) {
// 模拟延迟消息处理
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("延迟消息处理完成: {}", message);
}

/**
* 处理顺序消息
*/
private void processOrderMessage(String message) {
// 模拟顺序消息处理
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("顺序消息处理完成: {}", message);
}

/**
* 处理事务消息
*/
private void processTransactionMessage(String message) {
// 模拟事务消息处理
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("事务消息处理完成: {}", message);
}

/**
* 处理带标签的消息
*/
private void processTagMessage(String message) {
// 模拟带标签的消息处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("带标签的消息处理完成: {}", message);
}
}

4.5 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package com.rocketmq.controller;

import com.rocketmq.producer.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
* RocketMQ控制器
* @author 运维实战
*/
@Slf4j
@RestController
@RequestMapping("/api/rocketmq")
public class RocketMQController {

@Autowired
private MessageProducer messageProducer;

/**
* 发送同步消息
*/
@PostMapping("/send/sync")
public ResponseEntity<Map<String, Object>> sendSyncMessage(
@RequestParam String topic,
@RequestParam String message) {
try {
messageProducer.sendSyncMessage(topic, message);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "同步消息发送成功");
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送同步消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送异步消息
*/
@PostMapping("/send/async")
public ResponseEntity<Map<String, Object>> sendAsyncMessage(
@RequestParam String topic,
@RequestParam String message) {
try {
messageProducer.sendAsyncMessage(topic, message);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "异步消息发送成功");
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送异步消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送单向消息
*/
@PostMapping("/send/oneway")
public ResponseEntity<Map<String, Object>> sendOneWayMessage(
@RequestParam String topic,
@RequestParam String message) {
try {
messageProducer.sendOneWayMessage(topic, message);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "单向消息发送成功");
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送单向消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送延迟消息
*/
@PostMapping("/send/delay")
public ResponseEntity<Map<String, Object>> sendDelayMessage(
@RequestParam String topic,
@RequestParam String message,
@RequestParam(defaultValue = "1") int delayLevel) {
try {
messageProducer.sendDelayMessage(topic, message, delayLevel);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "延迟消息发送成功");
result.put("delayLevel", delayLevel);
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送延迟消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送顺序消息
*/
@PostMapping("/send/order")
public ResponseEntity<Map<String, Object>> sendOrderMessage(
@RequestParam String topic,
@RequestParam String message,
@RequestParam String orderKey) {
try {
messageProducer.sendOrderMessage(topic, message, orderKey);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "顺序消息发送成功");
result.put("orderKey", orderKey);
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送顺序消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送事务消息
*/
@PostMapping("/send/transaction")
public ResponseEntity<Map<String, Object>> sendTransactionMessage(
@RequestParam String topic,
@RequestParam String message) {
try {
messageProducer.sendTransactionMessage(topic, message);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "事务消息发送成功");
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送事务消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送批量消息
*/
@PostMapping("/send/batch")
public ResponseEntity<Map<String, Object>> sendBatchMessage(
@RequestParam String topic,
@RequestBody String[] messages) {
try {
messageProducer.sendBatchMessage(topic, messages);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "批量消息发送成功");
result.put("count", messages.length);
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送批量消息失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 发送带标签的消息
*/
@PostMapping("/send/tag")
public ResponseEntity<Map<String, Object>> sendTagMessage(
@RequestParam String topic,
@RequestParam String tag,
@RequestParam String message) {
try {
messageProducer.sendTagMessage(topic, tag, message);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "带标签的消息发送成功");
result.put("tag", tag);
result.put("timestamp", LocalDateTime.now());

return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("发送带标签的消息失败", e);
return ResponseEntity.internalServerError().build();
}
}
}

5. 性能优化脚本

5.1 性能调优脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/bin/bash
# rocketmq_performance_tune.sh - RocketMQ性能调优脚本
# @author 运维实战

# 配置参数
ROCKETMQ_HOME="/opt/rocketmq"
NAMESERVER_ADDR="192.168.1.10:9876"
LOG_FILE="/var/log/rocketmq_performance_tune.log"

# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 分析RocketMQ性能
analyze_performance() {
log "分析RocketMQ性能..."

# 获取集群信息
$ROCKETMQ_HOME/bin/mqadmin clusterList -n $NAMESERVER_ADDR

# 获取Topic信息
$ROCKETMQ_HOME/bin/mqadmin topicList -n $NAMESERVER_ADDR

# 获取Broker信息
$ROCKETMQ_HOME/bin/mqadmin brokerStatus -n $NAMESERVER_ADDR -b 192.168.1.10:10911
}

# 优化Broker配置
optimize_broker_config() {
log "优化Broker配置..."

# 设置Broker配置
$ROCKETMQ_HOME/bin/mqadmin updateBrokerConfig -n $NAMESERVER_ADDR -b 192.168.1.10:10911 -k maxMessageSize -v 4194304
$ROCKETMQ_HOME/bin/mqadmin updateBrokerConfig -n $NAMESERVER_ADDR -b 192.168.1.10:10911 -k flushDiskType -v ASYNC_FLUSH
$ROCKETMQ_HOME/bin/mqadmin updateBrokerConfig -n $NAMESERVER_ADDR -b 192.168.1.10:10911 -k brokerRole -v ASYNC_MASTER

log "Broker配置优化完成"
}

# 优化Topic配置
optimize_topic_config() {
log "优化Topic配置..."

# 获取所有Topic
TOPICS=$($ROCKETMQ_HOME/bin/mqadmin topicList -n $NAMESERVER_ADDR | grep -v "Topic" | grep -v "Total")

for topic in $TOPICS; do
log "优化Topic: $topic"

# 设置Topic配置
$ROCKETMQ_HOME/bin/mqadmin updateTopic -n $NAMESERVER_ADDR -t $topic -c DefaultCluster -r 3 -w 3

if [ $? -eq 0 ]; then
log "Topic $topic 配置优化成功"
else
log "Topic $topic 配置优化失败"
fi
done
}

# 优化消费者组配置
optimize_consumer_config() {
log "优化消费者组配置..."

# 获取所有消费者组
CONSUMER_GROUPS=$($ROCKETMQ_HOME/bin/mqadmin consumerProgress -n $NAMESERVER_ADDR | grep "Consumer Group" | awk '{print $3}')

for group in $CONSUMER_GROUPS; do
log "优化消费者组: $group"

# 设置消费者组配置
$ROCKETMQ_HOME/bin/mqadmin updateSubGroup -n $NAMESERVER_ADDR -g $group -c DefaultCluster

if [ $? -eq 0 ]; then
log "消费者组 $group 配置优化成功"
else
log "消费者组 $group 配置优化失败"
fi
done
}

# 监控性能指标
monitor_performance() {
log "监控性能指标..."

# 监控集群状态
$ROCKETMQ_HOME/bin/mqadmin clusterList -n $NAMESERVER_ADDR

# 监控Topic状态
$ROCKETMQ_HOME/bin/mqadmin topicStatus -n $NAMESERVER_ADDR -t demo-topic

# 监控消费者组状态
$ROCKETMQ_HOME/bin/mqadmin consumerProgress -n $NAMESERVER_ADDR

# 监控Broker状态
$ROCKETMQ_HOME/bin/mqadmin brokerStatus -n $NAMESERVER_ADDR -b 192.168.1.10:10911
}

# 主函数
main() {
log "开始RocketMQ性能调优..."

# 分析性能
analyze_performance

# 优化Broker配置
optimize_broker_config

# 优化Topic配置
optimize_topic_config

# 优化消费者组配置
optimize_consumer_config

# 监控性能指标
monitor_performance

log "RocketMQ性能调优完成"
}

# 执行主函数
main "$@"

6. 总结

RocketMQ消息队列搭建与开发运维是运维工作中的重要组成部分。通过本文的详细介绍,我们了解了:

  1. 集群部署: RocketMQ集群的部署、配置和管理
  2. 消息发送: 支持同步、异步、单向、延迟、顺序、事务消息
  3. 消息消费: 支持集群消费和广播消费
  4. 性能优化: 系统性的性能调优策略
  5. 运维监控: 集群监控、性能监控、故障处理

通过合理的运维策略和工具,可以确保RocketMQ消息队列的稳定运行和高性能。


运维实战要点:

  • 集群部署时注意网络配置和防火墙设置
  • 定期监控集群状态,及时发现问题
  • 性能调优需要根据实际业务场景进行
  • 故障处理要有完整的诊断流程
  • 消息发送和消费要有完善的错误处理

代码注解说明:

  • 日志函数: 统一日志格式,便于问题追踪
  • 错误处理: 完善的错误检查和异常处理
  • 配置管理: 灵活的配置参数管理
  • 监控告警: 实时监控和告警机制
  • 自动化运维: 脚本化运维提高效率