1. Overview of the Queue Consumption Backlog (LAG) Monitoring Architecture

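Queue consumption lag (LAG) is a key health indicator for a message queue system: it measures how far consumers have fallen behind producers. In offset-based queues such as Kafka, the lag of a partition is the latest offset written by producers minus the offset committed by the consumer group, and the lag of a group is the sum over its partitions. For an architect, understanding how LAG is measured, what a growing backlog affects, and how to bring it down is essential for building high-throughput, highly available messaging systems. This article examines LAG from an architect's perspective: how it is computed, how to monitor and alert on it, and how to reduce it through scaling and batch processing, and it presents an end-to-end consumption monitoring framework for enterprise applications.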
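To make the definition concrete, the following minimal sketch computes per-partition and total lag from two offset snapshots: the latest (log-end) offset of each partition and the offset committed by the consumer group. The class `LagCalculator` and the `"topic-partition"` string keys are illustrative assumptions, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper: computes consumer lag from two offset snapshots.
// Keys are "topic-partition" strings (an assumption made for this sketch).
final class LagCalculator {

    private LagCalculator() {}

    /**
     * Per-partition lag = latest (log-end) offset - committed offset.
     * Partitions without a committed offset are skipped, because their lag depends on
     * the consumer's offset-reset policy. Negative values, which can appear when the two
     * snapshots are taken at slightly different moments, are clamped to 0.
     */
    static Map<String, Long> perPartitionLag(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(e.getKey());
            if (committed == null) {
                continue;
            }
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** Total lag of the group across all partitions that have a committed offset. */
    static long totalLag(Map<String, Long> endOffsets, Map<String, Long> committedOffsets) {
        return perPartitionLag(endOffsets, committedOffsets).values().stream()
                .mapToLong(Long::longValue)
                .sum();
    }
}
```

With Kafka, the two snapshots could be obtained through the Admin client, for example `Admin#listOffsets` with `OffsetSpec.latest()` for end offsets and `Admin#listConsumerGroupOffsets` for committed offsets; that wiring is not shown here.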

1.1 LAG Monitoring Architecture Design

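┌───────────────────────────────────────────────────────────────┐
│ Producer layer                                                │
│ (message production, batch sending, performance monitoring)   │
├───────────────────────────────────────────────────────────────┤
│ Queue layer                                                   │
│ (message storage, partition management, LAG calculation)      │
├───────────────────────────────────────────────────────────────┤
│ Consumer layer                                                │
│ (message consumption, batch processing, consumer monitoring)  │
├───────────────────────────────────────────────────────────────┤
│ Monitoring layer                                              │
│ (LAG monitoring, alert notification, performance analysis)    │
├───────────────────────────────────────────────────────────────┤
│ Optimization layer                                            │
│ (dynamic scaling, load balancing, performance tuning)         │
└───────────────────────────────────────────────────────────────┘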

1.2 Key LAG Monitoring Metrics

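  1. Consumption latency: time from when a message is produced to when it is consumed; P95/P99 latency
  2. Backlog size: number of unconsumed messages; queue depth
  3. Consumption rate: messages consumed per second; consumption throughput
  4. Consumer health: consumer status, connection count, processing capacity
  5. System resources: CPU usage, memory usage, network I/O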
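These metrics are most useful in combination. A backlog count alone does not tell how long recovery will take, so a common derived indicator is the estimated time to drain: the backlog divided by the net rate at which it shrinks (consumption rate minus production rate). The sketch below uses a hypothetical `BacklogEstimator` class and assumes both rates are messages per second measured over the same window.

```java
// Illustrative sketch: derived backlog indicators built from the metrics listed above.
// Rates are assumed to be messages per second measured over the same time window.
final class BacklogEstimator {

    private BacklogEstimator() {}

    /** Net change of the backlog per second: positive means the backlog is growing. */
    static double netGrowthRate(double produceRate, double consumeRate) {
        return produceRate - consumeRate;
    }

    /**
     * Estimated seconds until the backlog is fully consumed.
     * Returns positive infinity when consumers are not faster than producers,
     * i.e. the backlog will never drain at the current rates.
     */
    static double secondsToDrain(long backlog, double produceRate, double consumeRate) {
        if (backlog <= 0) {
            return 0.0;
        }
        double drainRate = consumeRate - produceRate;
        return drainRate > 0 ? backlog / drainRate : Double.POSITIVE_INFINITY;
    }
}
```

Alerting on time to drain, or on a persistently positive growth rate, adapts to consumer speed better than a fixed message-count threshold such as the 1,000/10,000 values used by `LagMetrics` in the next section.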

2. Implementing the LAG Monitoring Framework

2.1 Core LAG Monitoring Components

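import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;

/**
* LAG monitoring framework
* Core components for monitoring queue consumption backlog (LAG)
*/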
public class LagMonitoringFramework {

/**
* LAG metrics for one partition (a single snapshot)
*/
public static class LagMetrics {
private String topic;
private int partition;
private long producerOffset;
private long consumerOffset;
private long lag;
private long timestamp;
private String consumerGroup;

public LagMetrics(String topic, int partition, long producerOffset, long consumerOffset, String consumerGroup) {
this.topic = topic;
this.partition = partition;
this.producerOffset = producerOffset;
this.consumerOffset = consumerOffset;
this.lag = producerOffset - consumerOffset;
this.timestamp = System.currentTimeMillis();
this.consumerGroup = consumerGroup;
}

// getters
public String getTopic() { return topic; }
public int getPartition() { return partition; }
public long getProducerOffset() { return producerOffset; }
public long getConsumerOffset() { return consumerOffset; }
public long getLag() { return lag; }
public long getTimestamp() { return timestamp; }
public String getConsumerGroup() { return consumerGroup; }

public double getLagPercentage() {
return producerOffset > 0 ? (double) lag / producerOffset * 100 : 0;
}

public boolean isHealthy() {
return lag < 1000; // healthy threshold (example value)
}

public boolean isWarning() {
return lag >= 1000 && lag < 10000; // warning threshold
}

public boolean isCritical() {
return lag >= 10000; // critical threshold
}
}

/**
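* LAG monitor
* Note: metrics are keyed by topic and partition only, so if several consumer groups read the
* same topic, the last update wins; add the consumer group to the key in that case.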
*/
public static class LagMonitor {
private final Map<String, Map<Integer, LagMetrics>> lagMetrics;
private final ScheduledExecutorService scheduler;
private final LagAlertManager alertManager;
private final LagAnalyzer analyzer;
private final AtomicBoolean running;

public LagMonitor(LagAlertManager alertManager, LagAnalyzer analyzer) {
this.lagMetrics = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(2);
this.alertManager = alertManager;
this.analyzer = analyzer;
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
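// Collect LAG metrics every 10 seconds.
// If a scheduled task throws, all later runs are suppressed, so tasks should catch their own exceptions.
scheduler.scheduleAtFixedRate(this::collectLagMetrics, 0, 10, TimeUnit.SECONDS);

// Analyze LAG trends every 60 seconds
scheduler.scheduleAtFixedRate(this::analyzeLagTrends, 0, 60, TimeUnit.SECONDS);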
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void updateLagMetrics(String topic, int partition, long producerOffset, long consumerOffset, String consumerGroup) {
LagMetrics metrics = new LagMetrics(topic, partition, producerOffset, consumerOffset, consumerGroup);

lagMetrics.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()).put(partition, metrics);

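// Check alert conditions
checkAlertConditions(metrics);
}

private void collectLagMetrics() {
// Placeholder: fetch the latest (log-end) and committed offsets of each partition from the broker,
// then call updateLagMetrics() for each partition
System.out.println("Collecting LAG metrics...");
}

private void analyzeLagTrends() {
// Pass the current snapshot to the analyzer, which records history and detects trends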
analyzer.analyzeTrends(lagMetrics);
}

private void checkAlertConditions(LagMetrics metrics) {
if (metrics.isCritical()) {
alertManager.sendCriticalAlert(metrics);
} else if (metrics.isWarning()) {
alertManager.sendWarningAlert(metrics);
}
}

public LagMetrics getLagMetrics(String topic, int partition) {
Map<Integer, LagMetrics> topicMetrics = lagMetrics.get(topic);
return topicMetrics != null ? topicMetrics.get(partition) : null;
}

public List<LagMetrics> getAllLagMetrics() {
return lagMetrics.values().stream()
.flatMap(map -> map.values().stream())
.collect(Collectors.toList());
}

public Map<String, Long> getTopicLagSummary() {
Map<String, Long> summary = new HashMap<>();
lagMetrics.forEach((topic, partitions) -> {
long totalLag = partitions.values().stream()
.mapToLong(LagMetrics::getLag)
.sum();
summary.put(topic, totalLag);
});
return summary;
}
}

/**
* LAG alert manager
* Dispatches alerts to all registered handlers, with a cooldown per (group, topic, partition, level)
*/
public static class LagAlertManager {
private final List<AlertHandler> alertHandlers;
private final Map<String, Long> lastAlertTime;
private final long alertCooldown;

public LagAlertManager() {
// Copy-on-write list so handlers can be registered while alerts are being dispatched
this.alertHandlers = new CopyOnWriteArrayList<>();
this.lastAlertTime = new ConcurrentHashMap<>();
this.alertCooldown = 300000; // 5-minute cooldown
}

public void addAlertHandler(AlertHandler handler) {
alertHandlers.add(handler);
}

public void sendWarningAlert(LagMetrics metrics) {
sendAlert(AlertLevel.WARNING, "LAG Warning", metrics);
}

public void sendCriticalAlert(LagMetrics metrics) {
sendAlert(AlertLevel.CRITICAL, "LAG Critical", metrics);
}

private void sendAlert(AlertLevel level, String title, LagMetrics metrics) {
// The level is part of the key, so a recent WARNING cannot suppress an escalation to CRITICAL
String alertKey = metrics.getConsumerGroup() + ":" + metrics.getTopic() + ":"
+ metrics.getPartition() + ":" + level;
if (!tryAcquireCooldown(alertKey)) {
return;
}

Alert alert = new Alert(level, title,
"Topic: " + metrics.getTopic() + ", Partition: " + metrics.getPartition() +
", Lag: " + metrics.getLag(), metrics);

for (AlertHandler handler : alertHandlers) {
handler.handleAlert(alert);
}
}

// Check and record in one synchronized step, so concurrent callers cannot both send the same alert
private synchronized boolean tryAcquireCooldown(String alertKey) {
long now = System.currentTimeMillis();
Long lastTime = lastAlertTime.get(alertKey);
if (lastTime != null && now - lastTime <= alertCooldown) {
return false;
}
lastAlertTime.put(alertKey, now);
return true;
}
}

/**
* LAG trend analyzer
*/
public static class LagAnalyzer {
private final Map<String, List<LagMetrics>> historicalData;
private final int maxHistorySize;

public LagAnalyzer(int maxHistorySize) {
this.historicalData = new ConcurrentHashMap<>();
this.maxHistorySize = maxHistorySize;
}

public void analyzeTrends(Map<String, Map<Integer, LagMetrics>> currentMetrics) {
// Append the current snapshot to each partition's history
currentMetrics.forEach((topic, partitions) -> {
partitions.forEach((partition, metrics) -> {
String key = topic + ":" + partition;
historicalData.computeIfAbsent(key, k -> new ArrayList<>()).add(metrics);

// Cap the history size
List<LagMetrics> history = historicalData.get(key);
if (history.size() > maxHistorySize) {
history.remove(0);
}
});
});

// Analyze trends
analyzeLagTrends();
}

private void analyzeLagTrends() {
historicalData.forEach((key, history) -> {
if (history.size() >= 3) {
LagTrend trend = calculateTrend(history);
if (trend.isIncreasing()) {
System.out.println("LAG increasing trend detected for " + key + ": " + trend);
}
}
});
}

private LagTrend calculateTrend(List<LagMetrics> history) {
if (history.size() < 2) {
return LagTrend.STABLE;
}

long firstLag = history.get(0).getLag();
long lastLag = history.get(history.size() - 1).getLag();

if (lastLag > firstLag * 1.2) {
return LagTrend.INCREASING;
} else if (lastLag < firstLag * 0.8) {
return LagTrend.DECREASING;
} else {
return LagTrend.STABLE;
}
}

public Map<String, LagTrend> getTrendAnalysis() {
Map<String, LagTrend> trends = new HashMap<>();
historicalData.forEach((key, history) -> {
if (history.size() >= 2) {
trends.put(key, calculateTrend(history));
}
});
return trends;
}
}

/**
* LAG trend
*/
public enum LagTrend {
INCREASING, DECREASING, STABLE;

public boolean isIncreasing() {
return this == INCREASING;
}

public boolean isDecreasing() {
return this == DECREASING;
}

public boolean isStable() {
return this == STABLE;
}
}

/**
* Alert handler interface
*/
public interface AlertHandler {
void handleAlert(Alert alert);
}

/**
* Alert level
*/
public enum AlertLevel {
INFO, WARNING, CRITICAL
}

/**
* Alert
*/
public static class Alert {
private AlertLevel level;
private String title;
private String message;
private LagMetrics metrics;
private long timestamp;

public Alert(AlertLevel level, String title, String message, LagMetrics metrics) {
this.level = level;
this.title = title;
this.message = message;
this.metrics = metrics;
this.timestamp = System.currentTimeMillis();
}

// getters
public AlertLevel getLevel() { return level; }
public String getTitle() { return title; }
public String getMessage() { return message; }
public LagMetrics getMetrics() { return metrics; }
public long getTimestamp() { return timestamp; }
}
}

/**
* Email alert handler
*/
@Component
public class EmailAlertHandler implements LagMonitoringFramework.AlertHandler {
private final EmailService emailService;
private final String[] recipients;

public EmailAlertHandler(EmailService emailService, String[] recipients) {
this.emailService = emailService;
this.recipients = recipients;
}

@Override
public void handleAlert(LagMonitoringFramework.Alert alert) {
try {
String subject = "[" + alert.getLevel() + "] " + alert.getTitle();
String body = buildEmailBody(alert);

for (String recipient : recipients) {
emailService.sendEmail(recipient, subject, body);
}

System.out.println("Email alert sent: " + alert.getTitle());
} catch (Exception e) {
System.err.println("Failed to send email alert: " + e.getMessage());
}
}

private String buildEmailBody(LagMonitoringFramework.Alert alert) {
StringBuilder body = new StringBuilder();
body.append("Alert Details:\n");
body.append("Level: ").append(alert.getLevel()).append("\n");
body.append("Title: ").append(alert.getTitle()).append("\n");
body.append("Message: ").append(alert.getMessage()).append("\n");
body.append("Timestamp: ").append(new Date(alert.getTimestamp())).append("\n");

if (alert.getMetrics() != null) {
LagMonitoringFramework.LagMetrics metrics = alert.getMetrics();
body.append("\nLAG Metrics:\n");
body.append("Topic: ").append(metrics.getTopic()).append("\n");
body.append("Partition: ").append(metrics.getPartition()).append("\n");
body.append("Producer Offset: ").append(metrics.getProducerOffset()).append("\n");
body.append("Consumer Offset: ").append(metrics.getConsumerOffset()).append("\n");
body.append("Lag: ").append(metrics.getLag()).append("\n");
body.append("Consumer Group: ").append(metrics.getConsumerGroup()).append("\n");
}

return body.toString();
}
}

/**
* DingTalk alert handler
*/
@Component
public class DingTalkAlertHandler implements LagMonitoringFramework.AlertHandler {
private final DingTalkService dingTalkService;
private final String webhookUrl;

public DingTalkAlertHandler(DingTalkService dingTalkService, String webhookUrl) {
this.dingTalkService = dingTalkService;
this.webhookUrl = webhookUrl;
}

@Override
public void handleAlert(LagMonitoringFramework.Alert alert) {
try {
DingTalkMessage message = buildDingTalkMessage(alert);
dingTalkService.sendMessage(webhookUrl, message);

System.out.println("DingTalk alert sent: " + alert.getTitle());
} catch (Exception e) {
System.err.println("Failed to send DingTalk alert: " + e.getMessage());
}
}

private DingTalkMessage buildDingTalkMessage(LagMonitoringFramework.Alert alert) {
DingTalkMessage message = new DingTalkMessage();
message.setMsgType("text");

StringBuilder content = new StringBuilder();
content.append("[").append(alert.getLevel()).append("] ").append(alert.getTitle()).append("\n");
content.append("Message: ").append(alert.getMessage()).append("\n");
content.append("Time: ").append(new Date(alert.getTimestamp())).append("\n");

if (alert.getMetrics() != null) {
LagMonitoringFramework.LagMetrics metrics = alert.getMetrics();
content.append("Topic: ").append(metrics.getTopic()).append("\n");
content.append("Partition: ").append(metrics.getPartition()).append("\n");
content.append("Lag: ").append(metrics.getLag()).append("\n");
}

message.setText(content.toString());
return message;
}
}

/**
* DingTalk message
*/
class DingTalkMessage {
private String msgType;
private String text;

// getters and setters
public String getMsgType() { return msgType; }
public void setMsgType(String msgType) { this.msgType = msgType; }
public String getText() { return text; }
public void setText(String text) { this.text = text; }
}
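In `LagMonitor`, `collectLagMetrics()` is only a placeholder. One way to fill it in, sketched below under stated assumptions, is to hide the broker behind a small interface and push one sample per partition into something shaped like `updateLagMetrics(...)`. `OffsetSource`, `LagSink`, `LagCollector` and `InMemoryOffsetSource` are hypothetical names introduced for this example; they are not part of any library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over the broker. With Kafka it could be backed by
// Admin#listOffsets(OffsetSpec.latest()) and Admin#listConsumerGroupOffsets (not shown).
interface OffsetSource {
    Map<Integer, Long> latestOffsets(String topic);                  // partition -> log-end offset
    Map<Integer, Long> committedOffsets(String group, String topic); // partition -> committed offset
}

// Same parameter order as LagMonitor.updateLagMetrics(topic, partition, producerOffset, consumerOffset, group)
interface LagSink {
    void update(String topic, int partition, long producerOffset, long consumerOffset, String group);
}

// One collection pass: what a scheduled collectLagMetrics() could run per (group, topic) pair.
final class LagCollector {
    private final OffsetSource source;
    private final LagSink sink;

    LagCollector(OffsetSource source, LagSink sink) {
        this.source = source;
        this.sink = sink;
    }

    /** Pushes one sample per partition that has a committed offset; returns how many were pushed. */
    int collect(String group, String topic) {
        Map<Integer, Long> latest = source.latestOffsets(topic);
        Map<Integer, Long> committed = source.committedOffsets(group, topic);
        int pushed = 0;
        for (Map.Entry<Integer, Long> e : latest.entrySet()) {
            Long consumerOffset = committed.get(e.getKey());
            if (consumerOffset == null) {
                continue; // no committed offset yet: lag depends on the offset-reset policy
            }
            sink.update(topic, e.getKey(), e.getValue(), consumerOffset, group);
            pushed++;
        }
        return pushed;
    }
}

// In-memory OffsetSource for tests and for trying the monitor without a broker.
final class InMemoryOffsetSource implements OffsetSource {
    private final Map<String, Map<Integer, Long>> latest = new HashMap<>();
    private final Map<String, Map<Integer, Long>> committed = new HashMap<>();

    void setLatest(String topic, int partition, long offset) {
        latest.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset);
    }

    void setCommitted(String group, String topic, int partition, long offset) {
        committed.computeIfAbsent(group + "/" + topic, k -> new HashMap<>()).put(partition, offset);
    }

    @Override
    public Map<Integer, Long> latestOffsets(String topic) {
        return latest.getOrDefault(topic, Map.of());
    }

    @Override
    public Map<Integer, Long> committedOffsets(String group, String topic) {
        return committed.getOrDefault(group + "/" + topic, Map.of());
    }
}
```

A scheduled `collectLagMetrics()` would then loop over the configured (group, topic) pairs and call `collect(...)`, with `LagSink` supplied as a method reference such as `lagMonitor::updateLagMetrics`, whose parameters line up with `update(...)`.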

2.2 Consumer Performance Monitoring

/**
* Consumer performance monitor
* Tracks per-consumer processing times and consumption throughput
*/
public class ConsumerPerformanceMonitor {
private final Map<String, ConsumerMetrics> consumerMetrics;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public ConsumerPerformanceMonitor() {
this.consumerMetrics = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(2);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Collect metrics every 5 seconds
scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 5, TimeUnit.SECONDS);

// Analyze performance every 30 seconds
scheduler.scheduleAtFixedRate(this::analyzePerformance, 0, 30, TimeUnit.SECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void recordConsumption(String consumerId, String topic, int partition, long offset, long processingTime) {
String key = consumerId + ":" + topic + ":" + partition;
ConsumerMetrics metrics = consumerMetrics.computeIfAbsent(key, k -> new ConsumerMetrics(consumerId, topic, partition));

metrics.recordConsumption(offset, processingTime);
}

private void collectMetrics() {
// Placeholder: collect system resource metrics (CPU, memory, network I/O)
System.out.println("Collecting consumer performance metrics...");
}

private void analyzePerformance() {
consumerMetrics.forEach((key, metrics) -> {
if (metrics.isPerformanceDegraded()) {
System.out.println("Performance degradation detected for " + key + ": " + metrics);
}
});
}

public ConsumerMetrics getConsumerMetrics(String consumerId, String topic, int partition) {
String key = consumerId + ":" + topic + ":" + partition;
return consumerMetrics.get(key);
}

public List<ConsumerMetrics> getAllConsumerMetrics() {
return new ArrayList<>(consumerMetrics.values());
}

public Map<String, Double> getConsumerThroughput() {
Map<String, Double> throughput = new HashMap<>();
consumerMetrics.forEach((key, metrics) -> {
throughput.put(key, metrics.getThroughput());
});
return throughput;
}
}

/**
* Consumer metrics for one (consumer, topic, partition); processing times are assumed to be milliseconds.
* Methods are synchronized because consumer threads record samples while the monitor thread reads them.
*/
public class ConsumerMetrics {
private final String consumerId;
private final String topic;
private final int partition;
private final long startTime;
private long totalMessages;
private long totalProcessingTime;
private long lastOffset;
private long lastUpdateTime;
private final List<Long> recentProcessingTimes;
private final int maxRecentSize;

public ConsumerMetrics(String consumerId, String topic, int partition) {
this.consumerId = consumerId;
this.topic = topic;
this.partition = partition;
this.startTime = System.currentTimeMillis();
this.lastUpdateTime = startTime;
this.recentProcessingTimes = new ArrayList<>();
this.maxRecentSize = 100; // sliding window of the 100 most recent samples
}

public synchronized void recordConsumption(long offset, long processingTime) {
totalMessages++;
totalProcessingTime += processingTime;
lastOffset = offset;
lastUpdateTime = System.currentTimeMillis();

// Keep only the most recent processing times for percentile calculation
recentProcessingTimes.add(processingTime);
if (recentProcessingTimes.size() > maxRecentSize) {
recentProcessingTimes.remove(0);
}
}

// Average messages per second since this object was created. Dividing by the time since the
// last message (lastUpdateTime) would not measure throughput at all.
public synchronized double getThroughput() {
long elapsedMs = System.currentTimeMillis() - startTime;
return elapsedMs > 0 ? totalMessages / (elapsedMs / 1000.0) : 0;
}

public synchronized double getAverageProcessingTime() {
return totalMessages > 0 ? (double) totalProcessingTime / totalMessages : 0;
}

public double getP95ProcessingTime() {
return getPercentile(0.95);
}

public double getP99ProcessingTime() {
return getPercentile(0.99);
}

// Nearest-rank percentile over the recent window
private synchronized double getPercentile(double p) {
if (recentProcessingTimes.isEmpty()) {
return 0;
}
List<Long> sortedTimes = new ArrayList<>(recentProcessingTimes);
Collections.sort(sortedTimes);
int index = (int) Math.ceil(sortedTimes.size() * p) - 1;
return sortedTimes.get(Math.max(0, index));
}

public boolean isPerformanceDegraded() {
// Degraded if the average exceeds 1 s or P95 exceeds 5 s (example thresholds; make them configurable)
return getAverageProcessingTime() > 1000 || getP95ProcessingTime() > 5000;
}

public synchronized boolean isStale() {
return System.currentTimeMillis() - lastUpdateTime > 60000; // stale after 1 minute without updates
}

@Override
public synchronized String toString() {
return "ConsumerMetrics{consumer=" + consumerId + ", topic=" + topic + ", partition=" + partition
+ ", messages=" + totalMessages + ", avgMs=" + getAverageProcessingTime()
+ ", p95Ms=" + getP95ProcessingTime() + "}";
}

// getters
public String getConsumerId() { return consumerId; }
public String getTopic() { return topic; }
public int getPartition() { return partition; }
public synchronized long getTotalMessages() { return totalMessages; }
public synchronized long getTotalProcessingTime() { return totalProcessingTime; }
public synchronized long getLastOffset() { return lastOffset; }
public synchronized long getLastUpdateTime() { return lastUpdateTime; }
}

/**
* Consumption rate monitor
*/
public class ConsumptionRateMonitor {
private final Map<String, RateMetrics> rateMetrics;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public ConsumptionRateMonitor() {
this.rateMetrics = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(1);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Recompute consumption rates every 10 seconds
scheduler.scheduleAtFixedRate(this::calculateRates, 0, 10, TimeUnit.SECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void recordMessageConsumed(String topic, int partition, long offset) {
String key = topic + ":" + partition;
RateMetrics metrics = rateMetrics.computeIfAbsent(key, k -> new RateMetrics(topic, partition));
metrics.recordConsumption(offset);
}

private void calculateRates() {
rateMetrics.forEach((key, metrics) -> {
metrics.calculateRate();
});
}

public RateMetrics getRateMetrics(String topic, int partition) {
String key = topic + ":" + partition;
return rateMetrics.get(key);
}

public Map<String, Double> getAllConsumptionRates() {
Map<String, Double> rates = new HashMap<>();
rateMetrics.forEach((key, metrics) -> {
rates.put(key, metrics.getCurrentRate());
});
return rates;
}
}

/**
* Consumption-rate metrics for one partition.
* recordConsumption() only records the latest offset; calculateRate(), called periodically by
* ConsumptionRateMonitor, converts the offset delta over each sampling interval into messages/second.
* Methods are synchronized because consumer threads and the monitor thread share each instance.
*/
public class RateMetrics {
private final String topic;
private final int partition;
private long lastOffset;      // latest consumed offset
private long lastTimestamp;   // time of the latest recordConsumption() call (0 = none yet)
private long sampleOffset;    // offset at the previous rate sample
private long sampleTimestamp; // time of the previous rate sample (0 = no baseline yet)
private double currentRate;
private final List<Double> rateHistory;
private final int maxHistorySize;

public RateMetrics(String topic, int partition) {
this.topic = topic;
this.partition = partition;
this.rateHistory = new ArrayList<>();
this.maxHistorySize = 60; // keep 60 samples (10 minutes at a 10-second interval)
}

public synchronized void recordConsumption(long offset) {
this.lastOffset = offset;
this.lastTimestamp = System.currentTimeMillis();
}

public synchronized void calculateRate() {
if (lastTimestamp == 0) {
return; // nothing consumed yet
}
long now = System.currentTimeMillis();
if (sampleTimestamp == 0) {
// The first sample only establishes a baseline; dividing the absolute offset by the
// elapsed time would produce a meaningless, very large rate
sampleOffset = lastOffset;
sampleTimestamp = now;
return;
}
long timeDiff = now - sampleTimestamp;
if (timeDiff <= 0) {
return;
}
currentRate = (lastOffset - sampleOffset) / (timeDiff / 1000.0);

// Record the rate history
rateHistory.add(currentRate);
if (rateHistory.size() > maxHistorySize) {
rateHistory.remove(0);
}
sampleOffset = lastOffset;
sampleTimestamp = now;
}

public synchronized double getCurrentRate() {
return currentRate;
}

public synchronized double getAverageRate() {
return rateHistory.stream().mapToDouble(Double::doubleValue).average().orElse(0);
}

public synchronized double getMaxRate() {
return rateHistory.stream().mapToDouble(Double::doubleValue).max().orElse(0);
}

public synchronized double getMinRate() {
return rateHistory.stream().mapToDouble(Double::doubleValue).min().orElse(0);
}

public synchronized boolean isRateDeclining() {
if (rateHistory.size() < 3) {
return false;
}

// True if the last three samples are strictly decreasing
int size = rateHistory.size();
double recent1 = rateHistory.get(size - 1);
double recent2 = rateHistory.get(size - 2);
double recent3 = rateHistory.get(size - 3);

return recent1 < recent2 && recent2 < recent3;
}

// getters
public String getTopic() { return topic; }
public int getPartition() { return partition; }
public synchronized long getLastOffset() { return lastOffset; }
public synchronized long getLastTimestamp() { return lastTimestamp; }
}
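`ConsumerMetrics` and `RateMetrics` above keep their sliding windows in an `ArrayList` and call `remove(0)` once the window is full, which shifts every remaining element. A fixed-capacity ring buffer makes each insert O(1). The sketch below is an illustrative alternative built around a hypothetical `LatencyWindow` class; it uses the same nearest-rank percentile definition as `getP95ProcessingTime()`.

```java
import java.util.Arrays;

// Illustrative fixed-capacity ring buffer of processing times (ms) with nearest-rank percentiles.
// Methods are synchronized because consumer threads write while a monitor thread reads.
final class LatencyWindow {
    private final long[] samples;
    private int next;   // slot that the next sample will overwrite
    private int count;  // number of valid samples, at most samples.length

    LatencyWindow(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.samples = new long[capacity];
    }

    synchronized void record(long processingTimeMs) {
        samples[next] = processingTimeMs;   // O(1): once full, the oldest slot is overwritten
        next = (next + 1) % samples.length;
        if (count < samples.length) {
            count++;
        }
    }

    /** Nearest-rank percentile for p in (0, 1]; returns 0 when nothing has been recorded. */
    synchronized long percentile(double p) {
        if (count == 0) {
            return 0;
        }
        // Until the buffer is full, the valid samples occupy indices 0..count-1
        long[] sorted = Arrays.copyOf(samples, count);
        Arrays.sort(sorted);
        int index = (int) Math.ceil(count * p) - 1;
        return sorted[Math.max(0, Math.min(index, count - 1))];
    }

    synchronized int size() {
        return count;
    }
}
```

Sorting a copy on every query is fine for a window of around 100 samples; for much larger windows a histogram-based estimator would be the usual next step.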

3. Backlog Handling Strategies

3.1 Dynamic Scaling

/**
* Dynamic scaling manager
* Periodically evaluates LAG and decides whether to add or remove consumers
*/
public class DynamicScalingManager {
private final LagMonitoringFramework.LagMonitor lagMonitor;
private final ConsumerPerformanceMonitor performanceMonitor;
private final ScalingPolicy scalingPolicy;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public DynamicScalingManager(LagMonitoringFramework.LagMonitor lagMonitor,
ConsumerPerformanceMonitor performanceMonitor,
ScalingPolicy scalingPolicy) {
this.lagMonitor = lagMonitor;
this.performanceMonitor = performanceMonitor;
this.scalingPolicy = scalingPolicy;
this.scheduler = Executors.newScheduledThreadPool(2);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Check whether scaling is needed every 30 seconds
scheduler.scheduleAtFixedRate(this::checkScalingNeeds, 0, 30, TimeUnit.SECONDS);

// Apply pending scaling actions every 60 seconds
scheduler.scheduleAtFixedRate(this::executeScaling, 0, 60, TimeUnit.SECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

private void checkScalingNeeds() {
List<LagMonitoringFramework.LagMetrics> allMetrics = lagMonitor.getAllLagMetrics();

for (LagMonitoringFramework.LagMetrics metrics : allMetrics) {
ScalingDecision decision = scalingPolicy.evaluateScaling(metrics);

if (decision.isScaleOut()) {
System.out.println("Scale out needed for " + metrics.getTopic() + ":" + metrics.getPartition() +
" - Lag: " + metrics.getLag());
} else if (decision.isScaleIn()) {
System.out.println("Scale in needed for " + metrics.getTopic() + ":" + metrics.getPartition() +
" - Lag: " + metrics.getLag());
}
}
}

private void executeScaling() {
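// Placeholder: apply the pending scaling decisions, e.g. through ConsumerManager.addConsumer()/removeConsumer()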
System.out.println("Executing scaling operations...");
}
}

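/**
* Scaling policy: maps one partition's LAG to a scale-out / scale-in decision
* (thresholds and factors are example parameters to be tuned per workload)
*/
public class ScalingPolicy {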
private final long scaleOutThreshold;
private final long scaleInThreshold;
private final int maxConsumers;
private final int minConsumers;
private final double scaleOutFactor;
private final double scaleInFactor;

public ScalingPolicy(long scaleOutThreshold, long scaleInThreshold,
int maxConsumers, int minConsumers,
double scaleOutFactor, double scaleInFactor) {
this.scaleOutThreshold = scaleOutThreshold;
this.scaleInThreshold = scaleInThreshold;
this.maxConsumers = maxConsumers;
this.minConsumers = minConsumers;
this.scaleOutFactor = scaleOutFactor;
this.scaleInFactor = scaleInFactor;
}

public ScalingDecision evaluateScaling(LagMonitoringFramework.LagMetrics metrics) {
long lag = metrics.getLag();

if (lag > scaleOutThreshold) {
return new ScalingDecision(ScalingAction.SCALE_OUT, calculateScaleOutCount(metrics));
} else if (lag < scaleInThreshold) {
return new ScalingDecision(ScalingAction.SCALE_IN, calculateScaleInCount(metrics));
} else {
return new ScalingDecision(ScalingAction.NO_ACTION, 0);
}
}

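private int calculateScaleOutCount(LagMonitoringFramework.LagMetrics metrics) {
// Add about scaleOutFactor consumers per 1,000 messages of lag; the step itself is capped at maxConsumers.
// The executor must still cap the resulting total at min(maxConsumers, partition count):
// in a Kafka consumer group, consumers beyond the number of partitions get no assignment and sit idle.
long lag = metrics.getLag();
int additionalConsumers = (int) Math.ceil(lag / 1000.0 * scaleOutFactor);
return Math.min(additionalConsumers, maxConsumers);
}

private int calculateScaleInCount(LagMonitoringFramework.LagMetrics metrics) {
// Remove a fixed step of at least one consumer; the executor must not go below minConsumers
return Math.max(1, (int) Math.ceil(scaleInFactor));
}

// Bounds exposed so the executor can enforce them when applying a decision
public int getMaxConsumers() { return maxConsumers; }
public int getMinConsumers() { return minConsumers; }
}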

/**
* Scaling decision
*/
public class ScalingDecision {
private ScalingAction action;
private int consumerCount;

public ScalingDecision(ScalingAction action, int consumerCount) {
this.action = action;
this.consumerCount = consumerCount;
}

public boolean isScaleOut() {
return action == ScalingAction.SCALE_OUT;
}

public boolean isScaleIn() {
return action == ScalingAction.SCALE_IN;
}

public boolean isNoAction() {
return action == ScalingAction.NO_ACTION;
}

// getters
public ScalingAction getAction() { return action; }
public int getConsumerCount() { return consumerCount; }
}

/**
* Scaling action
*/
public enum ScalingAction {
SCALE_OUT, SCALE_IN, NO_ACTION
}

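/**
* Consumer manager: creates, starts and stops consumer instances per topic
*/
public class ConsumerManager {
private final Map<String, List<Consumer>> consumers;
private final ConsumerFactory consumerFactory;
private final AtomicBoolean running;

public ConsumerManager(ConsumerFactory consumerFactory) {
this.consumers = new ConcurrentHashMap<>();
this.consumerFactory = consumerFactory;
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Start consumers that were registered before the manager itself was started
consumers.values().forEach(consumerList -> consumerList.forEach(consumer -> {
if (!consumer.isRunning()) {
consumer.start();
}
}));
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
consumers.values().forEach(consumerList -> consumerList.forEach(Consumer::stop));
}
}

public void addConsumer(String topic, int partition) {
Consumer consumer = consumerFactory.createConsumer(topic, partition);
// Copy-on-write list: the scaling thread mutates it while other threads iterate over it
consumers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(consumer);

if (running.get()) {
consumer.start();
}
}

public void removeConsumer(String topic, int partition) {
List<Consumer> topicConsumers = consumers.get(topic);
if (topicConsumers != null) {
// Stop removed consumers so their threads and connections are released
for (Consumer consumer : topicConsumers) {
if (consumer.getPartition() == partition && topicConsumers.remove(consumer)) {
consumer.stop();
}
}
}
}

public int getConsumerCount(String topic) {
List<Consumer> topicConsumers = consumers.get(topic);
return topicConsumers != null ? topicConsumers.size() : 0;
}

public List<Consumer> getConsumers(String topic) {
// Return a snapshot so callers cannot modify the internal list
return new ArrayList<>(consumers.getOrDefault(topic, Collections.emptyList()));
}
}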

/**
* Consumer interface
*/
public interface Consumer {
void start();
void stop();
String getTopic();
int getPartition();
boolean isRunning();
}

/**
* Consumer factory
*/
public interface ConsumerFactory {
Consumer createConsumer(String topic, int partition);
}
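`ScalingPolicy` looks at one partition's lag in isolation and does not use the current throughput or the partition count. An alternative, sketched below under explicit assumptions, sizes the whole consumer group: consumers must absorb the current production rate plus enough extra throughput to clear the backlog within a target time, and the result is clamped to the configured bounds and to the partition count (in a Kafka consumer group, consumers beyond the number of partitions stay idle). `TargetSizing`, the per-consumer throughput estimate and the drain-time target are hypothetical inputs that would have to be measured or chosen.

```java
// Illustrative sketch: size a consumer group from measured rates instead of raw lag.
// All inputs are assumptions supplied by the caller: rates in messages/second,
// perConsumerRate measured on existing consumers, drainTargetSeconds chosen by the operator.
final class TargetSizing {

    private TargetSizing() {}

    static int targetConsumers(double produceRate, long totalLag, double drainTargetSeconds,
                               double perConsumerRate, int minConsumers, int maxConsumers,
                               int partitionCount) {
        if (perConsumerRate <= 0 || drainTargetSeconds <= 0) {
            throw new IllegalArgumentException("per-consumer rate and drain target must be positive");
        }
        // Throughput needed to keep up with producers and clear the backlog within the target time
        double requiredRate = produceRate + totalLag / drainTargetSeconds;
        int needed = (int) Math.ceil(requiredRate / perConsumerRate);
        // Consumers beyond the partition count would receive no partitions, so cap there as well
        int upper = Math.min(maxConsumers, partitionCount);
        // minConsumers wins if it is configured above the upper bound
        return Math.max(minConsumers, Math.min(needed, upper));
    }
}
```

Comparing the result with the current group size gives the scale-out or scale-in step; a cooldown between scaling actions helps avoid oscillation while the group rebalances.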

3.2 Batch Processing Optimization

/**
* Batch processing optimizer
* Groups messages into per-topic batches to improve consumption efficiency
*/
public class BatchProcessingOptimizer {
private final Map<String, BatchConfig> batchConfigs;
private final Map<String, BatchProcessor> batchProcessors;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public BatchProcessingOptimizer() {
this.batchConfigs = new ConcurrentHashMap<>();
this.batchProcessors = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(2);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Every 100 ms, process the batches that are ready (runs on the scheduler thread)
scheduler.scheduleAtFixedRate(this::processBatches, 0, 100, TimeUnit.MILLISECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void configureBatch(String topic, BatchConfig config) {
batchConfigs.put(topic, config);
batchProcessors.put(topic, new BatchProcessor(config));
}

public void addMessage(String topic, Object message) {
BatchProcessor processor = batchProcessors.get(topic);
if (processor != null) {
processor.addMessage(message);
}
}

private void processBatches() {
batchProcessors.forEach((topic, processor) -> {
if (processor.shouldProcess()) {
processor.processBatch();
}
});
}

public BatchConfig getBatchConfig(String topic) {
return batchConfigs.get(topic);
}

public BatchProcessor getBatchProcessor(String topic) {
return batchProcessors.get(topic);
}
}

/**
* Batch configuration (batchTimeout is in milliseconds)
*/
public class BatchConfig {
private int batchSize;
private long batchTimeout;
private int maxRetries;
private boolean enableCompression;

public BatchConfig(int batchSize, long batchTimeout) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.maxRetries = 3;
this.enableCompression = false;
}

// getters and setters
public int getBatchSize() { return batchSize; }
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
public long getBatchTimeout() { return batchTimeout; }
public void setBatchTimeout(long batchTimeout) { this.batchTimeout = batchTimeout; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public boolean isEnableCompression() { return enableCompression; }
public void setEnableCompression(boolean enableCompression) { this.enableCompression = enableCompression; }
}

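/**
* Batch processor: flushes when the batch reaches batchSize or when the
* oldest buffered message has waited batchTimeout milliseconds
*/
public static class BatchProcessor {
private final BatchConfig config;
private final List<Object> batch;
private long batchStartTime; // guarded by lock; arrival time of the first message in the current batch
private final Object lock;

public BatchProcessor(BatchConfig config) {
this.config = config;
this.batch = new ArrayList<>();
this.batchStartTime = System.currentTimeMillis();
this.lock = new Object();
}

public void addMessage(Object message) {
synchronized (lock) {
if (batch.isEmpty()) {
// Restart the timeout window for each new batch; a start time fixed at
// construction would make every batch look expired after the first timeout
batchStartTime = System.currentTimeMillis();
}
batch.add(message);
}
}

public boolean shouldProcess() {
synchronized (lock) {
if (batch.isEmpty()) {
return false;
}
return batch.size() >= config.getBatchSize() ||
(System.currentTimeMillis() - batchStartTime) >= config.getBatchTimeout();
}
}

public void processBatch() {
List<Object> currentBatch;
synchronized (lock) {
if (batch.isEmpty()) {
return;
}
// Copy and clear under the lock so producers are not blocked during processing
currentBatch = new ArrayList<>(batch);
batch.clear();
}

try {
// Process the drained batch outside the lock
processMessages(currentBatch);
} catch (Exception e) {
System.err.println("Error processing batch: " + e.getMessage());
// Hand the failed batch to the retry path
retryBatch(currentBatch);
}
}

private void processMessages(List<Object> messages) {
// Placeholder for the actual batch-processing logic
System.out.println("Processing batch of " + messages.size() + " messages");

// Simulate processing time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void retryBatch(List<Object> messages) {
// Placeholder: a real implementation would retry up to config.getMaxRetries() times
System.out.println("Retrying batch of " + messages.size() + " messages");
}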

public int getBatchSize() {
synchronized (lock) {
return batch.size();
}
}

public boolean isEmpty() {
synchronized (lock) {
return batch.isEmpty();
}
}
}
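The size-or-timeout flush rule used by BatchProcessor can be isolated and tested on its own. The sketch below is a hypothetical helper (`SizeOrTimeBatcher` is not part of the code above); it injects the clock as a `LongSupplier` so the timeout path can be checked without sleeping, and it measures the timeout from the first message of each batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: a batch is ready when it holds maxSize items or when
 * its oldest item has waited at least maxAgeMillis (clock is injected).
 */
class SizeOrTimeBatcher<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final LongSupplier clock;
    private final List<T> buffer = new ArrayList<>();
    private long oldestTimestamp;

    SizeOrTimeBatcher(int maxSize, long maxAgeMillis, LongSupplier clock) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    synchronized void add(T item) {
        if (buffer.isEmpty()) {
            // Age is measured from the first item of the current batch
            oldestTimestamp = clock.getAsLong();
        }
        buffer.add(item);
    }

    /** Returns the drained batch if a flush is due, otherwise an empty list. */
    synchronized List<T> pollReadyBatch() {
        if (buffer.isEmpty()) {
            return List.of();
        }
        boolean full = buffer.size() >= maxSize;
        boolean expired = clock.getAsLong() - oldestTimestamp >= maxAgeMillis;
        if (!full && !expired) {
            return List.of();
        }
        List<T> ready = new ArrayList<>(buffer);
        buffer.clear();
        return ready;
    }
}
```

For example, with `maxSize = 3` and `maxAgeMillis = 1000`, two items stay buffered, a third triggers a size flush, and a lone item is flushed once it has waited 1000 ms. Returning the drained batch instead of processing it inside the lock keeps producers from blocking while a batch is handled.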

4. Performance Tuning Strategies

4.1 Consumer Performance Optimization

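/**
* Consumer performance optimizer
* Analyzes consumer metrics and produces tuning recommendations to improve consumption efficiency
*/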
public class ConsumerPerformanceOptimizer {
private final Map<String, OptimizationConfig> optimizationConfigs;
private final Map<String, PerformanceMetrics> performanceMetrics;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public ConsumerPerformanceOptimizer() {
this.optimizationConfigs = new ConcurrentHashMap<>();
this.performanceMetrics = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(2);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Start the periodic performance-optimization task (runs every 60 seconds)
scheduler.scheduleAtFixedRate(this::optimizePerformance, 0, 60, TimeUnit.SECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void configureOptimization(String topic, OptimizationConfig config) {
optimizationConfigs.put(topic, config);
}

public void recordPerformance(String topic, long processingTime, long messageSize) {
PerformanceMetrics metrics = performanceMetrics.computeIfAbsent(topic, k -> new PerformanceMetrics());
metrics.recordProcessing(processingTime, messageSize);
}

private void optimizePerformance() {
performanceMetrics.forEach((topic, metrics) -> {
OptimizationConfig config = optimizationConfigs.get(topic);
if (config != null) {
OptimizationRecommendation recommendation = analyzePerformance(metrics, config);
if (recommendation.hasRecommendations()) {
applyOptimization(topic, recommendation);
}
}
});
}

// Public so that EnterpriseLagMonitoringManager.triggerOptimization can call it
public OptimizationRecommendation analyzePerformance(PerformanceMetrics metrics, OptimizationConfig config) {
OptimizationRecommendation recommendation = new OptimizationRecommendation();

// Check average processing time
if (metrics.getAverageProcessingTime() > config.getMaxProcessingTime()) {
recommendation.addRecommendation("Increase batch size or optimize processing logic");
}

// Check throughput (messages per second)
if (metrics.getThroughput() < config.getMinThroughput()) {
recommendation.addRecommendation("Increase consumer count or optimize network");
}

// Check memory usage (heap usage ratio compared with a ratio threshold such as 0.8)
if (metrics.getMemoryUsage() > config.getMaxMemoryUsage()) {
recommendation.addRecommendation("Reduce batch size or optimize memory usage");
}

return recommendation;
}

public void applyOptimization(String topic, OptimizationRecommendation recommendation) {
System.out.println("Applying optimization for " + topic + ": " + recommendation);
// Placeholder: apply the recommendations here (e.g. adjust batch size or consumer count)
}

public OptimizationConfig getOptimizationConfig(String topic) {
return optimizationConfigs.get(topic);
}

public PerformanceMetrics getPerformanceMetrics(String topic) {
return performanceMetrics.get(topic);
}

public Map<String, PerformanceMetrics> getAllPerformanceMetrics() {
return new HashMap<>(performanceMetrics);
}
}

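/**
* Optimization configuration
* (minThroughput in messages per second; maxMemoryUsage as a heap usage ratio between 0 and 1;
* maxProcessingTime in the same unit as the recorded processing times)
*/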
public static class OptimizationConfig {
private long maxProcessingTime;
private double minThroughput;
private double maxMemoryUsage;
private int maxBatchSize;
private int minBatchSize;

public OptimizationConfig(long maxProcessingTime, double minThroughput, double maxMemoryUsage) {
this.maxProcessingTime = maxProcessingTime;
this.minThroughput = minThroughput;
this.maxMemoryUsage = maxMemoryUsage;
this.maxBatchSize = 1000;
this.minBatchSize = 10;
}

// getters and setters
public long getMaxProcessingTime() { return maxProcessingTime; }
public void setMaxProcessingTime(long maxProcessingTime) { this.maxProcessingTime = maxProcessingTime; }
public double getMinThroughput() { return minThroughput; }
public void setMinThroughput(double minThroughput) { this.minThroughput = minThroughput; }
public double getMaxMemoryUsage() { return maxMemoryUsage; }
public void setMaxMemoryUsage(double maxMemoryUsage) { this.maxMemoryUsage = maxMemoryUsage; }
public int getMaxBatchSize() { return maxBatchSize; }
public void setMaxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }
public int getMinBatchSize() { return minBatchSize; }
public void setMinBatchSize(int minBatchSize) { this.minBatchSize = minBatchSize; }
}

/**
* Performance metrics
*/
public static class PerformanceMetrics {
private final long startTime;
private long totalMessages;
private long totalProcessingTime;
private long totalMessageSize;
private long lastUpdateTime;
private final List<Long> recentProcessingTimes;
private final List<Long> recentMessageSizes;
private final int maxRecentSize;

public PerformanceMetrics() {
this.startTime = System.currentTimeMillis();
this.totalMessages = 0;
this.totalProcessingTime = 0;
this.totalMessageSize = 0;
this.lastUpdateTime = startTime;
this.recentProcessingTimes = new ArrayList<>();
this.recentMessageSizes = new ArrayList<>();
this.maxRecentSize = 100;
}

// Methods touching mutable state are synchronized: consumer threads record
// while the optimizer's scheduler thread reads
public synchronized void recordProcessing(long processingTime, long messageSize) {
this.totalMessages++;
this.totalProcessingTime += processingTime;
this.totalMessageSize += messageSize;
this.lastUpdateTime = System.currentTimeMillis();

// Keep a bounded window of the most recent samples
recentProcessingTimes.add(processingTime);
recentMessageSizes.add(messageSize);

if (recentProcessingTimes.size() > maxRecentSize) {
recentProcessingTimes.remove(0);
recentMessageSizes.remove(0);
}
}

public synchronized double getAverageProcessingTime() {
return totalMessages > 0 ? (double) totalProcessingTime / totalMessages : 0;
}

public synchronized double getThroughput() {
// Average messages per second since this object was created; measuring from
// lastUpdateTime would divide the total by the idle time since the last message
long elapsedMillis = System.currentTimeMillis() - startTime;
if (elapsedMillis > 0) {
return (double) totalMessages / (elapsedMillis / 1000.0);
}
return 0;
}

public synchronized double getAverageMessageSize() {
return totalMessages > 0 ? (double) totalMessageSize / totalMessages : 0;
}

public double getMemoryUsage() {
// JVM-wide heap usage relative to the maximum heap size (-Xmx), not a per-topic value;
// dividing by totalMemory() would measure against the currently committed heap instead
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
return (double) usedMemory / runtime.maxMemory();
}

public synchronized double getP95ProcessingTime() {
return percentile(0.95);
}

public synchronized double getP99ProcessingTime() {
return percentile(0.99);
}

// Nearest-rank percentile over the recent window; callers must hold the lock
private double percentile(double p) {
if (recentProcessingTimes.isEmpty()) {
return 0;
}

List<Long> sortedTimes = new ArrayList<>(recentProcessingTimes);
sortedTimes.sort(Long::compareTo);

int index = (int) Math.ceil(sortedTimes.size() * p) - 1;
return sortedTimes.get(Math.max(0, index));
}

// getters
public synchronized long getTotalMessages() { return totalMessages; }
public synchronized long getTotalProcessingTime() { return totalProcessingTime; }
public synchronized long getTotalMessageSize() { return totalMessageSize; }
public synchronized long getLastUpdateTime() { return lastUpdateTime; }
}

/**
* Optimization recommendations
*/
public static class OptimizationRecommendation {
private final List<String> recommendations;

public OptimizationRecommendation() {
this.recommendations = new ArrayList<>();
}

public void addRecommendation(String recommendation) {
recommendations.add(recommendation);
}

public boolean hasRecommendations() {
return !recommendations.isEmpty();
}

public List<String> getRecommendations() {
return new ArrayList<>(recommendations);
}

@Override
public String toString() {
return String.join(", ", recommendations);
}
}
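PerformanceMetrics keeps the last 100 processing times in an ArrayList and computes P95/P99 with the nearest-rank rule, index = ceil(p × n) − 1. A minimal sketch of the same idea with a bounded ArrayDeque, which evicts the oldest sample in O(1), might look like this; `LatencyWindow` is a hypothetical name used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sliding-window latency tracker: keeps the last `capacity`
 * samples and answers nearest-rank percentile queries over them.
 */
class LatencyWindow {
    private final int capacity;
    private final Deque<Long> samples = new ArrayDeque<>();

    LatencyWindow(int capacity) {
        this.capacity = capacity;
    }

    synchronized void record(long latencyMillis) {
        samples.addLast(latencyMillis);
        if (samples.size() > capacity) {
            samples.removeFirst(); // O(1) eviction, unlike ArrayList.remove(0)
        }
    }

    /** p in (0, 1], e.g. 0.95 for P95; returns 0 when no samples exist. */
    synchronized long percentile(double p) {
        if (samples.isEmpty()) {
            return 0;
        }
        List<Long> sorted = new ArrayList<>(samples);
        Collections.sort(sorted);
        // Nearest-rank: zero-based index ceil(p * n) - 1, clamped to the valid range
        int index = (int) Math.ceil(p * sorted.size()) - 1;
        return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
    }
}
```

Sorting a copy on every query costs O(n log n), which is acceptable for a window of around 100 samples; for much larger windows or frequent queries, a histogram-based estimator is a common alternative.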

4.2 Memory Optimization

/**
* Memory optimizer
* Monitors JVM heap usage and reduces batch sizes under memory pressure
*/
public class MemoryOptimizer {
private final Map<String, MemoryConfig> memoryConfigs;
private final Map<String, MemoryMetrics> memoryMetrics;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public MemoryOptimizer() {
this.memoryConfigs = new ConcurrentHashMap<>();
this.memoryMetrics = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(1);
this.running = new AtomicBoolean(false);
}

public void start() {
if (running.compareAndSet(false, true)) {
// Start the memory monitoring task (every 30 seconds)
scheduler.scheduleAtFixedRate(this::monitorMemory, 0, 30, TimeUnit.SECONDS);

// Start the memory optimization task (every 60 seconds)
scheduler.scheduleAtFixedRate(this::optimizeMemory, 0, 60, TimeUnit.SECONDS);
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
scheduler.shutdown();
}
}

public void configureMemory(String topic, MemoryConfig config) {
memoryConfigs.put(topic, config);
}

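private void monitorMemory() {
Runtime runtime = Runtime.getRuntime();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
long maxMemory = runtime.maxMemory();

double memoryUsage = (double) usedMemory / maxMemory;

System.out.println("Memory usage: " + String.format("%.2f%%", memoryUsage * 100) +
" (" + formatBytes(usedMemory) + " / " + formatBytes(maxMemory) + ")");

// Refresh the memory snapshot for every configured topic so that optimizeMemory()
// has data to act on (heap usage is JVM-wide, so every topic sees the same values)
memoryConfigs.keySet().forEach(topic ->
memoryMetrics.computeIfAbsent(topic, k -> new MemoryMetrics()).updateMetrics());

// Check memory usage
if (memoryUsage > 0.8) {
System.out.println("High memory usage detected, triggering GC");
// System.gc() is only a request; the JVM may ignore it (e.g. with -XX:+DisableExplicitGC)
System.gc();
}
}

private void optimizeMemory() {
memoryConfigs.forEach((topic, config) -> {
MemoryMetrics metrics = memoryMetrics.get(topic);
if (metrics != null && metrics.isMemoryPressure()) {
applyMemoryOptimization(topic, config);
}
});
}

private void applyMemoryOptimization(String topic, MemoryConfig config) {
System.out.println("Applying memory optimization for " + topic);

// Halve the batch size, but never go below the configured minimum
if (config.getBatchSize() > config.getMinBatchSize()) {
config.setBatchSize(Math.max(config.getMinBatchSize(), config.getBatchSize() / 2));
}

// Request a garbage collection (a hint only; no application cache is cleared here)
System.gc();
}

private String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
int exp = (int) (Math.log(bytes) / Math.log(1024));
String pre = "KMGTPE".charAt(exp - 1) + "";
return String.format("%.1f %sB", bytes / Math.pow(1024, exp), pre);
}

public MemoryConfig getMemoryConfig(String topic) {
return memoryConfigs.get(topic);
}

public MemoryMetrics getMemoryMetrics(String topic) {
return memoryMetrics.get(topic);
}

// Snapshot of all memory configurations (used by EnterpriseLagMonitoringManager.getMonitoringStatus)
public Map<String, MemoryConfig> getAllMemoryConfigs() {
return new HashMap<>(memoryConfigs);
}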
}

/**
* Memory configuration (maxMemoryUsage is in bytes)
*/
public static class MemoryConfig {
private int batchSize;
private int minBatchSize;
private long maxMemoryUsage;
private boolean enableCompression;
private boolean enableCaching;

public MemoryConfig(int batchSize, long maxMemoryUsage) {
this.batchSize = batchSize;
this.minBatchSize = Math.max(1, batchSize / 4);
this.maxMemoryUsage = maxMemoryUsage;
this.enableCompression = false;
this.enableCaching = true;
}

// getters and setters
public int getBatchSize() { return batchSize; }
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
public int getMinBatchSize() { return minBatchSize; }
public void setMinBatchSize(int minBatchSize) { this.minBatchSize = minBatchSize; }
public long getMaxMemoryUsage() { return maxMemoryUsage; }
public void setMaxMemoryUsage(long maxMemoryUsage) { this.maxMemoryUsage = maxMemoryUsage; }
public boolean isEnableCompression() { return enableCompression; }
public void setEnableCompression(boolean enableCompression) { this.enableCompression = enableCompression; }
public boolean isEnableCaching() { return enableCaching; }
public void setEnableCaching(boolean enableCaching) { this.enableCaching = enableCaching; }
}

/**
* Memory metrics (a JVM-wide heap snapshot)
*/
public static class MemoryMetrics {
private long totalMemory;
private long usedMemory;
private long freeMemory;
private long maxMemory;
private long timestamp;

public MemoryMetrics() {
updateMetrics();
}

public void updateMetrics() {
Runtime runtime = Runtime.getRuntime();
this.totalMemory = runtime.totalMemory();
this.freeMemory = runtime.freeMemory();
this.usedMemory = totalMemory - freeMemory;
this.maxMemory = runtime.maxMemory();
this.timestamp = System.currentTimeMillis();
}

public double getMemoryUsage() {
return maxMemory > 0 ? (double) usedMemory / maxMemory : 0;
}

public boolean isMemoryPressure() {
return getMemoryUsage() > 0.8;
}

public boolean isMemoryCritical() {
return getMemoryUsage() > 0.9;
}

// getters
public long getTotalMemory() { return totalMemory; }
public long getUsedMemory() { return usedMemory; }
public long getFreeMemory() { return freeMemory; }
public long getMaxMemory() { return maxMemory; }
public long getTimestamp() { return timestamp; }
}
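The memory back-off above boils down to two decisions: how full the heap is, and how far to shrink the batch. The hypothetical helper below (`HeapBackoff` is not part of the original code) reads heap usage through the standard `java.lang.management` API, whose `MemoryUsage.getMax()` returns -1 when the maximum is undefined, and clamps the halved batch size at the configured minimum.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helpers for a memory-pressure batch back-off. */
final class HeapBackoff {
    private HeapBackoff() {}

    /** Heap used relative to the maximum heap; falls back to committed heap if max is undefined. */
    static double heapUsageRatio() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long limit = heap.getMax() > 0 ? heap.getMax() : heap.getCommitted();
        return limit > 0 ? (double) heap.getUsed() / limit : 0.0;
    }

    /** Halve the batch size when usage exceeds the threshold, never below minBatchSize. */
    static int nextBatchSize(int current, int minBatchSize, double usageRatio, double threshold) {
        if (usageRatio <= threshold) {
            return current;
        }
        return Math.max(minBatchSize, current / 2);
    }
}
```

With `minBatchSize = 25` (what MemoryConfig derives from a batch size of 100), a batch of 100 shrinks to 50 under pressure and a batch of 40 stops at 25 rather than dropping to 20.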

5. Enterprise LAG Monitoring Architecture

5.1 Enterprise LAG Monitoring Manager

/**
* Enterprise LAG monitoring manager
* Integrates LAG monitoring, performance optimization and alert notification
*/
@Component
public class EnterpriseLagMonitoringManager {
private final LagMonitoringFramework.LagMonitor lagMonitor;
private final ConsumerPerformanceMonitor performanceMonitor;
private final ConsumptionRateMonitor rateMonitor;
private final DynamicScalingManager scalingManager;
private final BatchProcessingOptimizer batchOptimizer;
private final ConsumerPerformanceOptimizer performanceOptimizer;
private final MemoryOptimizer memoryOptimizer;
private final LagAlertManager alertManager;
private final MonitoringDashboard dashboard;
private final AtomicBoolean running;

public EnterpriseLagMonitoringManager() {
// Initialize the components
this.lagMonitor = new LagMonitoringFramework.LagMonitor(
new LagMonitoringFramework.LagAlertManager(),
new LagMonitoringFramework.LagAnalyzer(100)
);

this.performanceMonitor = new ConsumerPerformanceMonitor();
this.rateMonitor = new ConsumptionRateMonitor();

ScalingPolicy scalingPolicy = new ScalingPolicy(1000, 100, 10, 1, 1.5, 0.5);
this.scalingManager = new DynamicScalingManager(lagMonitor, performanceMonitor, scalingPolicy);

this.batchOptimizer = new BatchProcessingOptimizer();
this.performanceOptimizer = new ConsumerPerformanceOptimizer();
this.memoryOptimizer = new MemoryOptimizer();

this.alertManager = new LagAlertManager();
this.dashboard = new MonitoringDashboard();
this.running = new AtomicBoolean(false);

// Register alert handlers
configureAlertHandlers();

// Configure optimization strategies
configureOptimizationStrategies();
}

private void configureAlertHandlers() {
// Add the email alert handler
alertManager.addAlertHandler(new EmailAlertHandler(
new EmailService(),
new String[]{"admin@example.com", "ops@example.com"}
));

// Add the DingTalk alert handler
alertManager.addAlertHandler(new DingTalkAlertHandler(
new DingTalkService(),
"https://oapi.dingtalk.com/robot/send?access_token=xxx"
));
}

private void configureOptimizationStrategies() {
// Batch processing: (batchSize, batchTimeout in ms)
batchOptimizer.configureBatch("user-events", new BatchConfig(100, 1000));
batchOptimizer.configureBatch("order-events", new BatchConfig(50, 500));

// Performance optimization: (maxProcessingTime, minThroughput in msg/s, maxMemoryUsage as a heap ratio)
performanceOptimizer.configureOptimization("user-events",
new OptimizationConfig(1000, 100, 0.8));
performanceOptimizer.configureOptimization("order-events",
new OptimizationConfig(500, 200, 0.7));

// Memory optimization: (batchSize, maxMemoryUsage in bytes)
memoryOptimizer.configureMemory("user-events",
new MemoryConfig(100, 1024 * 1024 * 1024)); // 1GB
memoryOptimizer.configureMemory("order-events",
new MemoryConfig(50, 512 * 1024 * 1024)); // 512MB
}

public void start() {
if (running.compareAndSet(false, true)) {
// Start all components
lagMonitor.start();
performanceMonitor.start();
rateMonitor.start();
scalingManager.start();
batchOptimizer.start();
performanceOptimizer.start();
memoryOptimizer.start();
dashboard.start();

System.out.println("Enterprise LAG monitoring manager started");
}
}

public void stop() {
if (running.compareAndSet(true, false)) {
// Stop all components
lagMonitor.stop();
performanceMonitor.stop();
rateMonitor.stop();
scalingManager.stop();
batchOptimizer.stop();
performanceOptimizer.stop();
memoryOptimizer.stop();
dashboard.stop();

System.out.println("Enterprise LAG monitoring manager stopped");
}
}

/**
* Update LAG metrics for one partition
*/
public void updateLagMetrics(String topic, int partition, long producerOffset, long consumerOffset, String consumerGroup) {
lagMonitor.updateLagMetrics(topic, partition, producerOffset, consumerOffset, consumerGroup);
}

/**
* Record consumption performance
*/
public void recordConsumptionPerformance(String consumerId, String topic, int partition, long offset, long processingTime) {
performanceMonitor.recordConsumption(consumerId, topic, partition, offset, processingTime);
}

/**
* Record consumption rate
*/
public void recordConsumptionRate(String topic, int partition, long offset) {
rateMonitor.recordMessageConsumed(topic, partition, offset);
}

/**
* Get the current monitoring status
*/
public MonitoringStatus getMonitoringStatus() {
MonitoringStatus status = new MonitoringStatus();

// Collect LAG status
status.setLagSummary(lagMonitor.getTopicLagSummary());
status.setAllLagMetrics(lagMonitor.getAllLagMetrics());

// Collect performance status
status.setPerformanceMetrics(performanceMonitor.getAllConsumerMetrics());
status.setThroughputMetrics(rateMonitor.getAllConsumptionRates());

// Collect optimization status
status.setBatchConfigs(batchOptimizer.getAllBatchConfigs());
status.setPerformanceConfigs(performanceOptimizer.getAllPerformanceMetrics());
status.setMemoryConfigs(memoryOptimizer.getAllMemoryConfigs());

// Collect alert status
status.setAlertStatus(alertManager.getAlertStatus());

return status;
}

/**
* Generate a LAG report
*/
public LagReport generateLagReport() {
LagReport report = new LagReport();

// Build the LAG summary
Map<String, Long> lagSummary = lagMonitor.getTopicLagSummary();
report.setLagSummary(lagSummary);

// Build the throughput summary
Map<String, Double> throughputSummary = rateMonitor.getAllConsumptionRates();
report.setThroughputSummary(throughputSummary);

// Build the trend analysis
Map<String, LagMonitoringFramework.LagTrend> trendAnalysis =
lagMonitor.getAnalyzer().getTrendAnalysis();
report.setTrendAnalysis(trendAnalysis);

// Build optimization recommendations
List<String> optimizationRecommendations = generateOptimizationRecommendations();
report.setOptimizationRecommendations(optimizationRecommendations);

return report;
}

private List<String> generateOptimizationRecommendations() {
List<String> recommendations = new ArrayList<>();

// Recommendations based on LAG
Map<String, Long> lagSummary = lagMonitor.getTopicLagSummary();
lagSummary.forEach((topic, lag) -> {
if (lag > 10000) {
recommendations.add("Topic " + topic + " has high LAG (" + lag + "), consider scaling out consumers");
}
});

// Recommendations based on throughput
Map<String, Double> throughputSummary = rateMonitor.getAllConsumptionRates();
throughputSummary.forEach((topic, throughput) -> {
if (throughput < 10) {
recommendations.add("Topic " + topic + " has low throughput (" + throughput + "), consider optimizing processing logic");
}
});

return recommendations;
}

/**
* Configure monitoring parameters
*/
public void configureMonitoring(MonitoringConfig config) {
// Placeholder: this example only logs the configuration and does not apply it
System.out.println("Applying monitoring configuration: " + config);
}

/**
* Manually trigger optimization
*/
public void triggerOptimization(String topic) {
System.out.println("Manually triggering optimization for topic: " + topic);

// Flush the pending batch for this topic
BatchProcessor processor = batchOptimizer.getBatchProcessor(topic);
if (processor != null) {
processor.processBatch();
}

// Run performance analysis and apply any recommendations
PerformanceMetrics metrics = performanceOptimizer.getPerformanceMetrics(topic);
if (metrics != null) {
OptimizationConfig config = performanceOptimizer.getOptimizationConfig(topic);
if (config != null) {
OptimizationRecommendation recommendation =
performanceOptimizer.analyzePerformance(metrics, config);
if (recommendation.hasRecommendations()) {
performanceOptimizer.applyOptimization(topic, recommendation);
}
}
}
}
}

/**
* Monitoring status
*/
public static class MonitoringStatus {
private Map<String, Long> lagSummary;
private List<LagMonitoringFramework.LagMetrics> allLagMetrics;
private List<ConsumerMetrics> performanceMetrics;
private Map<String, Double> throughputMetrics;
private Map<String, BatchConfig> batchConfigs;
private Map<String, PerformanceMetrics> performanceConfigs;
private Map<String, MemoryConfig> memoryConfigs;
private AlertStatus alertStatus;

// getters and setters
public Map<String, Long> getLagSummary() { return lagSummary; }
public void setLagSummary(Map<String, Long> lagSummary) { this.lagSummary = lagSummary; }
public List<LagMonitoringFramework.LagMetrics> getAllLagMetrics() { return allLagMetrics; }
public void setAllLagMetrics(List<LagMonitoringFramework.LagMetrics> allLagMetrics) { this.allLagMetrics = allLagMetrics; }
public List<ConsumerMetrics> getPerformanceMetrics() { return performanceMetrics; }
public void setPerformanceMetrics(List<ConsumerMetrics> performanceMetrics) { this.performanceMetrics = performanceMetrics; }
public Map<String, Double> getThroughputMetrics() { return throughputMetrics; }
public void setThroughputMetrics(Map<String, Double> throughputMetrics) { this.throughputMetrics = throughputMetrics; }
public Map<String, BatchConfig> getBatchConfigs() { return batchConfigs; }
public void setBatchConfigs(Map<String, BatchConfig> batchConfigs) { this.batchConfigs = batchConfigs; }
public Map<String, PerformanceMetrics> getPerformanceConfigs() { return performanceConfigs; }
public void setPerformanceConfigs(Map<String, PerformanceMetrics> performanceConfigs) { this.performanceConfigs = performanceConfigs; }
public Map<String, MemoryConfig> getMemoryConfigs() { return memoryConfigs; }
public void setMemoryConfigs(Map<String, MemoryConfig> memoryConfigs) { this.memoryConfigs = memoryConfigs; }
public AlertStatus getAlertStatus() { return alertStatus; }
public void setAlertStatus(AlertStatus alertStatus) { this.alertStatus = alertStatus; }
}

/**
* LAG report
*/
public static class LagReport {
private Map<String, Long> lagSummary;
private Map<String, Double> throughputSummary;
private Map<String, LagMonitoringFramework.LagTrend> trendAnalysis;
private List<String> optimizationRecommendations;
private long reportTimestamp;

public LagReport() {
this.reportTimestamp = System.currentTimeMillis();
}

// getters and setters
public Map<String, Long> getLagSummary() { return lagSummary; }
public void setLagSummary(Map<String, Long> lagSummary) { this.lagSummary = lagSummary; }
public Map<String, Double> getThroughputSummary() { return throughputSummary; }
public void setThroughputSummary(Map<String, Double> throughputSummary) { this.throughputSummary = throughputSummary; }
public Map<String, LagMonitoringFramework.LagTrend> getTrendAnalysis() { return trendAnalysis; }
public void setTrendAnalysis(Map<String, LagMonitoringFramework.LagTrend> trendAnalysis) { this.trendAnalysis = trendAnalysis; }
public List<String> getOptimizationRecommendations() { return optimizationRecommendations; }
public void setOptimizationRecommendations(List<String> optimizationRecommendations) { this.optimizationRecommendations = optimizationRecommendations; }
public long getReportTimestamp() { return reportTimestamp; }
}

/**
* Monitoring configuration (intervals in milliseconds)
*/
public static class MonitoringConfig {
private long lagCheckInterval;
private long performanceCheckInterval;
private long alertCooldown;
private boolean enableAutoScaling;
private boolean enablePerformanceOptimization;
private boolean enableMemoryOptimization;

public MonitoringConfig() {
this.lagCheckInterval = 10000; // 10 seconds
this.performanceCheckInterval = 30000; // 30 seconds
this.alertCooldown = 300000; // 5 minutes
this.enableAutoScaling = true;
this.enablePerformanceOptimization = true;
this.enableMemoryOptimization = true;
}

// getters and setters
public long getLagCheckInterval() { return lagCheckInterval; }
public void setLagCheckInterval(long lagCheckInterval) { this.lagCheckInterval = lagCheckInterval; }
public long getPerformanceCheckInterval() { return performanceCheckInterval; }
public void setPerformanceCheckInterval(long performanceCheckInterval) { this.performanceCheckInterval = performanceCheckInterval; }
public long getAlertCooldown() { return alertCooldown; }
public void setAlertCooldown(long alertCooldown) { this.alertCooldown = alertCooldown; }
public boolean isEnableAutoScaling() { return enableAutoScaling; }
public void setEnableAutoScaling(boolean enableAutoScaling) { this.enableAutoScaling = enableAutoScaling; }
public boolean isEnablePerformanceOptimization() { return enablePerformanceOptimization; }
public void setEnablePerformanceOptimization(boolean enablePerformanceOptimization) { this.enablePerformanceOptimization = enablePerformanceOptimization; }
public boolean isEnableMemoryOptimization() { return enableMemoryOptimization; }
public void setEnableMemoryOptimization(boolean enableMemoryOptimization) { this.enableMemoryOptimization = enableMemoryOptimization; }
}

/**
* Alert status
*/
public static class AlertStatus {
private int totalAlerts;
private int warningAlerts;
private int criticalAlerts;
private long lastAlertTime;

// getters and setters
public int getTotalAlerts() { return totalAlerts; }
public void setTotalAlerts(int totalAlerts) { this.totalAlerts = totalAlerts; }
public int getWarningAlerts() { return warningAlerts; }
public void setWarningAlerts(int warningAlerts) { this.warningAlerts = warningAlerts; }
public int getCriticalAlerts() { return criticalAlerts; }
public void setCriticalAlerts(int criticalAlerts) { this.criticalAlerts = criticalAlerts; }
public long getLastAlertTime() { return lastAlertTime; }
public void setLastAlertTime(long lastAlertTime) { this.lastAlertTime = lastAlertTime; }
}

/**
* Monitoring dashboard (placeholder that only tracks its running state)
*/
public static class MonitoringDashboard {
private final AtomicBoolean running;

public MonitoringDashboard() {
this.running = new AtomicBoolean(false);
}

public void start() {
running.set(true);
System.out.println("Monitoring dashboard started");
}

public void stop() {
running.set(false);
System.out.println("Monitoring dashboard stopped");
}

public boolean isRunning() {
return running.get();
}
}
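updateLagMetrics receives a producer offset and a consumer offset per partition, but the arithmetic behind the topic-level LAG summary and the "consider scaling out consumers" recommendation is not shown. A minimal sketch, assuming Kafka-style offsets (the log-end offset is the next offset to be written, the committed offset is the next offset the group will read), follows. `LagCalculator` and all of its members are hypothetical; the consumer estimate also assumes that each partition is consumed by at most one member of a group, so consumers beyond the partition count would sit idle.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical LAG arithmetic: per-partition lag, per-topic totals, drain estimates. */
final class LagCalculator {
    /** Offsets for one partition, sampled at roughly the same moment. */
    static final class PartitionOffsets {
        final String topic;
        final int partition;
        final long logEndOffset;    // next offset the producer side will write
        final long committedOffset; // next offset the consumer group will read

        PartitionOffsets(String topic, int partition, long logEndOffset, long committedOffset) {
            this.topic = topic;
            this.partition = partition;
            this.logEndOffset = logEndOffset;
            this.committedOffset = committedOffset;
        }
    }

    private LagCalculator() {}

    /** Lag of one partition, clamped at 0 because the two offsets are read separately. */
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /** Sum of partition lags per topic, sorted by topic name. */
    static Map<String, Long> lagByTopic(List<PartitionOffsets> offsets) {
        Map<String, Long> result = new TreeMap<>();
        for (PartitionOffsets p : offsets) {
            result.merge(p.topic, partitionLag(p.logEndOffset, p.committedOffset), Long::sum);
        }
        return result;
    }

    /** Seconds to clear the backlog; +Infinity if consumers do not outpace producers. */
    static double secondsToDrain(long lag, double consumeRatePerSec, double produceRatePerSec) {
        if (lag <= 0) {
            return 0.0;
        }
        double netRate = consumeRatePerSec - produceRatePerSec;
        return netRate > 0 ? lag / netRate : Double.POSITIVE_INFINITY;
    }

    /** Consumers needed to drain `lag` within targetSeconds, capped at the partition count. */
    static int consumersNeeded(long lag, double produceRatePerSec, double perConsumerRatePerSec,
                               double targetSeconds, int partitionCount) {
        // Must keep up with new messages and also work off the backlog in time
        double requiredRate = produceRatePerSec + lag / targetSeconds;
        int needed = (int) Math.ceil(requiredRate / perConsumerRatePerSec);
        return Math.max(1, Math.min(needed, partitionCount));
    }
}
```

As a worked example: with a LAG of 60,000 messages, 500 msg/s arriving and 250 msg/s per consumer, draining within 60 s needs 500 + 60000 / 60 = 1,500 msg/s, i.e. 6 consumers, provided the topic has at least 6 partitions.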

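6. Summary

This article has examined architect-level techniques for monitoring queue consumption backlog (LAG), covering the LAG monitoring framework, consumer performance monitoring, backlog handling strategies, performance tuning, and best practices for an enterprise-grade LAG monitoring architecture.

Key technical points:

  1. LAG monitoring framework

    • LAG metric calculation, the LAG monitor, the alert manager
    • LAG analyzer, trend analysis, alert handling
    • Email alerts, DingTalk alerts, alert cooldown
  2. Consumer performance monitoring

    • Consumer performance monitoring, consumption rate monitoring
    • Performance metrics: processing time, throughput
    • P95/P99 latency, performance degradation detection
  3. Backlog handling strategies

    • Dynamic scaling, scaling policies, consumer management
    • Batch processing optimization, batch configuration, batch processor
    • Automatic and manual scaling
  4. Performance tuning strategies

    • Consumer performance optimization, optimization configuration, performance metrics
    • Memory optimization, memory configuration, memory monitoring
    • Optimization recommendations, automatic and manual optimization
  5. Enterprise architecture

    • Unified LAG monitoring management, monitoring status, LAG reports
    • Monitoring configuration, alert status, monitoring dashboard
    • Integration of multiple monitoring and optimization components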
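The trend analysis listed above can be as simple as fitting a straight line to recent LAG samples: a positive slope means the backlog is growing even while the absolute LAG is still below the alert threshold. `LagTrendEstimator` is a hypothetical least-squares sketch, assuming samples taken at a fixed interval; it is not the LagAnalyzer referenced in the code earlier.

```java
/**
 * Hypothetical trend estimator: least-squares slope of LAG samples
 * taken at a fixed interval (x = 0, 1, 2, ...).
 */
final class LagTrendEstimator {
    private LagTrendEstimator() {}

    /** Slope in messages per sampling interval; 0 for fewer than two samples. */
    static double slope(long[] lagSamples) {
        int n = lagSamples.length;
        if (n < 2) {
            return 0.0;
        }
        double meanX = (n - 1) / 2.0;
        double meanY = 0.0;
        for (long v : lagSamples) {
            meanY += v;
        }
        meanY /= n;

        // slope = sum((x - meanX) * (y - meanY)) / sum((x - meanX)^2)
        double numerator = 0.0;
        double denominator = 0.0;
        for (int i = 0; i < n; i++) {
            double dx = i - meanX;
            numerator += dx * (lagSamples[i] - meanY);
            denominator += dx * dx;
        }
        return numerator / denominator;
    }

    /** True if LAG grows by more than minGrowthPerInterval messages per interval. */
    static boolean isGrowing(long[] lagSamples, double minGrowthPerInterval) {
        return slope(lagSamples) > minGrowthPerInterval;
    }
}
```

If samples are taken every `lagCheckInterval` (10 s by default in MonitoringConfig), a slope of 100 means the backlog grows by about 600 messages per minute.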

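Architecture design principles:

  • Real-time monitoring: track queue consumption state in real time through LAG monitoring
  • Automated optimization: handle backlogs automatically through dynamic scaling and performance optimization
  • Intelligent alerting: avoid alert storms through multi-level alerts and alert cooldown
  • Performance tuning: improve consumption throughput through batch processing and memory optimization
  • Scalability: support both horizontal and vertical scaling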
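Alert cooldown, as used for intelligent alerting above, can be sketched as a per-key timestamp check: an alert for the same key (for example topic plus severity) is suppressed until the cooldown has elapsed since the last alert that was actually sent. `AlertCooldown` is a hypothetical class; the 300000 ms value in the test matches the default alertCooldown in MonitoringConfig (5 minutes), and `ConcurrentHashMap.compute` makes the check-and-update atomic per key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical per-key alert throttle: at most one alert per key
 * within each cooldown window.
 */
class AlertCooldown {
    private final long cooldownMillis;
    private final Map<String, Long> lastSent = new ConcurrentHashMap<>();

    AlertCooldown(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }

    /** Returns true if the alert should be sent now, and records the send time. */
    boolean tryAcquire(String key, long nowMillis) {
        boolean[] allowed = {false};
        // compute() runs atomically per key, so concurrent callers cannot both pass
        lastSent.compute(key, (k, last) -> {
            if (last == null || nowMillis - last >= cooldownMillis) {
                allowed[0] = true;
                return nowMillis;
            }
            return last; // still cooling down: keep the previous send time
        });
        return allowed[0];
    }
}
```

Keying on topic plus severity lets a CRITICAL alert through even while a WARNING for the same topic is cooling down; whether that is desirable depends on the escalation policy.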

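As architects, we need to understand how LAG monitoring works and the trade-offs of each optimization strategy, so that we can choose the monitoring and optimization approach that fits the business requirements. The examples in this article illustrate the role LAG monitoring plays in enterprise applications.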

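Queue consumption LAG monitoring is a core part of operating a modern message queue system: real-time monitoring, automated optimization and intelligent alerting together keep the system stable. Only with a solid understanding of how LAG monitoring works can we design truly effective monitoring and optimization solutions for message queues.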