1. Overview of a TB-Scale Log Processing Architecture

In the big-data era, enterprises generate terabytes of log data every day, and processing that volume efficiently is a genuine engineering challenge. This article walks through how to design, implement, and optimize a Java log processing architecture that can sustain TB-scale logs, covering the complete pipeline: collection, storage, analysis, and search.

1.1 Core Challenges

  1. Massive volume: terabytes of new log data per day
  2. Real-time processing: logs must be analyzed and acted on as they arrive
  3. Storage efficiency: logs must be stored and compressed economically
  4. Search performance: queries over the log corpus must stay fast
  5. Scalability: the system must scale horizontally as log volume grows

1.2 Technical Architecture

Application logs  → Log collection  → Message queue  → Log processing     → Storage engine
        ↓                 ↓                ↓                  ↓                    ↓
Multi-source logs → Flume/Filebeat  → Kafka          → Storm/Spark        → Elasticsearch
        ↓                 ↓                ↓                  ↓                    ↓
System logs       → Log parsing     → Data cleansing → Real-time analysis → Visualization

2. Log Collection Architecture

2.1 Maven Dependencies

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Data Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Elasticsearch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

    <!-- Logging backends. Note: logback-classic and log4j-core are both
         declared here; in a real project keep only one SLF4J backend on
         the classpath to avoid binding conflicts. -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.19.0</version>
    </dependency>

    <!-- Apache Commons Compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.21</version>
    </dependency>

    <!-- MyBatis Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.2</version>
    </dependency>
</dependencies>

2.2 Application Configuration

# application.yml
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: gzip
    consumer:
      group-id: log-processing-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      max-poll-records: 1000

  elasticsearch:
    uris: http://localhost:9200
    connection-timeout: 5000
    read-timeout: 30000

  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000

# Custom log-processing settings
log-processing:
  batch-size: 1000
  flush-interval: 5000
  compression-enabled: true
  retention-days: 30
  index-prefix: "logs"
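
The log-processing block above is a custom namespace rather than a built-in Spring one, so it needs an explicit binding class. A minimal sketch, assuming the property names from the YAML above (the class name is illustrative):

/**
 * Binds the custom log-processing.* properties from application.yml.
 */
@Component
@ConfigurationProperties(prefix = "log-processing")
@Data
public class LogProcessingProperties {

    private int batchSize = 1000;         // entries per bulk write
    private long flushInterval = 5000;    // flush period in milliseconds
    private boolean compressionEnabled = true;
    private int retentionDays = 30;       // days before old indices are dropped
    private String indexPrefix = "logs";  // prefix for the daily indices
}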

3. Log Collection Service

3.1 Log Entity Classes

/**
 * Log entity.
 * Defines the basic structure of a log record.
 */
@Data
public class LogEntry {

    private String id;                    // log ID
    private String timestamp;             // timestamp
    private String level;                 // log level
    private String logger;                // logger name
    private String message;               // log message
    private String thread;                // thread name
    private String application;           // application name
    private String host;                  // hostname
    private String ip;                    // IP address
    private String traceId;               // trace ID
    private String spanId;                // span ID
    private Map<String, Object> tags;     // tags
    private Map<String, Object> fields;   // extracted structured fields
    private String rawMessage;            // raw message
    private String source;                // log source
    private String processTime;           // processing time (set by LogProcessingService)
}

/**
 * Log statistics entity.
 * Used for aggregate log analysis.
 */
@Data
public class LogStatistics {

    private String application;            // application name
    private String level;                  // log level
    private Long count;                    // total count
    private Date startTime;                // start of the window
    private Date endTime;                  // end of the window
    private String host;                   // hostname
    private Map<String, Long> levelCount;  // count per log level
}

3.2 Collection Service Implementation

/**
 * Log collection service.
 * Collects and pre-processes logs from multiple sources.
 */
@Slf4j
@Service
public class LogCollectionService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Collect a single application log entry.
     */
    public void collectApplicationLog(LogEntry logEntry) {
        try {
            // 1. Enrich the entry with defaults (id, timestamp, host, ...)
            enrichLogEntry(logEntry);

            // 2. Validate the entry
            if (!validateLogEntry(logEntry)) {
                log.warn("Log entry failed validation: {}", logEntry.getMessage());
                return;
            }

            // 3. Publish to Kafka
            sendToKafka(logEntry);

            // 4. Update statistics
            updateLogStatistics(logEntry);

            log.debug("Collected application log: application={}, level={}",
                    logEntry.getApplication(), logEntry.getLevel());

        } catch (Exception e) {
            log.error("Failed to collect application log: {}", e.getMessage(), e);
        }
    }

    /**
     * Collect system logs from local files.
     * Runs every 10 seconds; requires @EnableScheduling on a configuration class.
     */
    @Scheduled(fixedRate = 10000)
    public void collectSystemLogs() {
        try {
            // 1. Read the system log files
            List<String> logLines = readSystemLogFiles();

            // 2. Parse each line into a LogEntry
            for (String logLine : logLines) {
                LogEntry logEntry = parseSystemLogLine(logLine);
                if (logEntry != null) {
                    collectApplicationLog(logEntry);
                }
            }

        } catch (Exception e) {
            log.error("Failed to collect system logs: {}", e.getMessage(), e);
        }
    }

    /**
     * Collect logs in batches for better throughput.
     */
    public void batchCollectLogs(List<LogEntry> logEntries) {
        if (logEntries.isEmpty()) {
            return;
        }

        try {
            // 1. Validate in bulk
            List<LogEntry> validLogs = logEntries.stream()
                    .filter(this::validateLogEntry)
                    .collect(Collectors.toList());

            // 2. Publish to Kafka in bulk
            batchSendToKafka(validLogs);

            // 3. Update statistics in bulk
            batchUpdateStatistics(validLogs);

            log.info("Batch-collected logs: total={}, valid={}",
                    logEntries.size(), validLogs.size());

        } catch (Exception e) {
            log.error("Failed to batch-collect logs: {}", e.getMessage(), e);
        }
    }

    /**
     * Fill in default values for any missing fields.
     */
    private void enrichLogEntry(LogEntry logEntry) {
        if (StringUtils.isEmpty(logEntry.getId())) {
            logEntry.setId(UUID.randomUUID().toString());
        }

        if (StringUtils.isEmpty(logEntry.getTimestamp())) {
            logEntry.setTimestamp(Instant.now().toString());
        }

        if (StringUtils.isEmpty(logEntry.getHost())) {
            logEntry.setHost(getHostName());
        }

        if (StringUtils.isEmpty(logEntry.getIp())) {
            logEntry.setIp(getLocalIpAddress());
        }

        if (StringUtils.isEmpty(logEntry.getApplication())) {
            logEntry.setApplication(getApplicationName());
        }
    }

    /**
     * Validate a log entry.
     */
    private boolean validateLogEntry(LogEntry logEntry) {
        // Required fields
        if (StringUtils.isEmpty(logEntry.getMessage()) ||
                StringUtils.isEmpty(logEntry.getLevel()) ||
                StringUtils.isEmpty(logEntry.getTimestamp())) {
            return false;
        }

        // Known log level
        return isValidLogLevel(logEntry.getLevel());
    }

    /**
     * Check whether the log level is one of the supported values.
     */
    private boolean isValidLogLevel(String level) {
        return Arrays.asList("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL")
                .contains(level.toUpperCase());
    }

    /**
     * Publish a single entry to Kafka.
     * Topics are partitioned per application: logs-<application>.
     */
    private void sendToKafka(LogEntry logEntry) {
        String topic = "logs-" + logEntry.getApplication().toLowerCase();
        String key = logEntry.getId();
        String value = JSON.toJSONString(logEntry);

        kafkaTemplate.send(topic, key, value);
    }

    /**
     * Publish entries to Kafka grouped by application.
     * The producer batches sends internally (see batch-size/buffer-memory in
     * application.yml), so per-record sends still benefit from batching.
     */
    private void batchSendToKafka(List<LogEntry> logEntries) {
        Map<String, List<LogEntry>> groupedLogs = logEntries.stream()
                .collect(Collectors.groupingBy(LogEntry::getApplication));

        for (Map.Entry<String, List<LogEntry>> entry : groupedLogs.entrySet()) {
            String topic = "logs-" + entry.getKey().toLowerCase();
            for (LogEntry logEntry : entry.getValue()) {
                kafkaTemplate.send(topic, logEntry.getId(), JSON.toJSONString(logEntry));
            }
        }
    }

    /**
     * Increment the per-application, per-level counter in Redis.
     */
    private void updateLogStatistics(LogEntry logEntry) {
        String key = "log:stats:" + logEntry.getApplication() + ":" + logEntry.getLevel();
        redisTemplate.opsForValue().increment(key);

        // Expire counters after 24 hours
        redisTemplate.expire(key, Duration.ofHours(24));
    }

    /**
     * Update counters in bulk: aggregate locally first, then write once per key.
     */
    private void batchUpdateStatistics(List<LogEntry> logEntries) {
        Map<String, Long> stats = new HashMap<>();

        for (LogEntry logEntry : logEntries) {
            String key = logEntry.getApplication() + ":" + logEntry.getLevel();
            stats.merge(key, 1L, Long::sum);
        }

        for (Map.Entry<String, Long> entry : stats.entrySet()) {
            String redisKey = "log:stats:" + entry.getKey();
            redisTemplate.opsForValue().increment(redisKey, entry.getValue());
            redisTemplate.expire(redisKey, Duration.ofHours(24));
        }
    }

    /**
     * Read the local system log files.
     * Note: for brevity this re-reads whole files on every run; a production
     * collector (like Filebeat) tracks file offsets and reads only new lines.
     */
    private List<String> readSystemLogFiles() {
        List<String> logLines = new ArrayList<>();

        // Common system log locations
        String[] logFiles = {
                "/var/log/syslog",
                "/var/log/messages",
                "/var/log/kern.log"
        };

        for (String logFile : logFiles) {
            try {
                Path path = Paths.get(logFile);
                if (Files.exists(path)) {
                    logLines.addAll(Files.readAllLines(path, StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                log.warn("Failed to read log file: {}", logFile);
            }
        }

        return logLines;
    }

    /**
     * Parse one syslog-style line.
     * Deliberately simplistic; real deployments should use a proper parser
     * (e.g. Grok patterns).
     */
    private LogEntry parseSystemLogLine(String logLine) {
        try {
            String[] parts = logLine.split("\\s+", 5);
            if (parts.length >= 5) {
                LogEntry logEntry = new LogEntry();
                logEntry.setTimestamp(parts[0] + " " + parts[1] + " " + parts[2]);
                logEntry.setHost(parts[3]);
                logEntry.setMessage(parts[4]);
                logEntry.setLevel("INFO");
                logEntry.setSource("SYSTEM");
                return logEntry;
            }
        } catch (Exception e) {
            log.warn("Failed to parse system log line: {}", logLine);
        }

        return null;
    }

    /**
     * Local hostname, falling back to "unknown".
     */
    private String getHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    /**
     * Local IP address, falling back to loopback.
     */
    private String getLocalIpAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }

    /**
     * Application name from the system properties, falling back to "unknown".
     */
    private String getApplicationName() {
        return System.getProperty("spring.application.name", "unknown");
    }
}
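
To give applications a way into this pipeline over HTTP, the collection service could be fronted by a small REST controller. This endpoint is an illustration rather than part of the original design; the paths, DTO shape, and response codes are assumptions:

/**
 * Hypothetical ingestion endpoint exposing LogCollectionService over HTTP.
 */
@RestController
@RequestMapping("/api/logs")
public class LogIngestController {

    @Autowired
    private LogCollectionService logCollectionService;

    /** Single-entry ingestion. */
    @PostMapping
    public ResponseEntity<Void> ingest(@RequestBody LogEntry logEntry) {
        logCollectionService.collectApplicationLog(logEntry);
        return ResponseEntity.accepted().build();
    }

    /** Batch ingestion, preferred for high-volume clients. */
    @PostMapping("/batch")
    public ResponseEntity<Void> ingestBatch(@RequestBody List<LogEntry> logEntries) {
        logCollectionService.batchCollectLogs(logEntries);
        return ResponseEntity.accepted().build();
    }
}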

4. Log Processing Service

4.1 Processing Service Implementation

/**
 * Log processing service.
 * Consumes logs from Kafka, cleans and standardizes them, stores them in
 * Elasticsearch, and feeds them into real-time analysis.
 */
@Slf4j
@Service
public class LogProcessingService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private LogAnalysisService logAnalysisService;

    /**
     * Process a single log record from Kafka.
     * Regex topic subscriptions must use topicPattern, not topics.
     */
    @KafkaListener(topicPattern = "logs-.*", groupId = "log-processing-group")
    public void processLogData(String logData) {
        try {
            // 1. Parse the JSON payload
            LogEntry logEntry = JSON.parseObject(logData, LogEntry.class);

            // 2. Clean and standardize the entry
            LogEntry cleanedLog = cleanAndStandardizeLog(logEntry);

            // 3. Store in Elasticsearch
            saveToElasticsearch(cleanedLog);

            // 4. Run real-time analysis
            performRealTimeAnalysis(cleanedLog);

            // 5. Update the recent-logs cache
            updateLogCache(cleanedLog);

            log.debug("Processed log record: id={}, level={}",
                    cleanedLog.getId(), cleanedLog.getLevel());

        } catch (Exception e) {
            log.error("Failed to process log record: {}", e.getMessage(), e);
        }
    }

    /**
     * Process log records in batches.
     * This listener needs its own consumer group (otherwise it would split
     * partitions with the single-record listener above) and a batch-enabled
     * container factory -- see the KafkaBatchConfig sketch after this class.
     */
    @KafkaListener(topicPattern = "logs-.*", groupId = "log-processing-batch-group",
            containerFactory = "batchFactory")
    public void batchProcessLogData(List<String> logDataList) {
        if (logDataList.isEmpty()) {
            return;
        }

        try {
            // 1. Parse all payloads
            List<LogEntry> logEntries = logDataList.stream()
                    .map(data -> JSON.parseObject(data, LogEntry.class))
                    .collect(Collectors.toList());

            // 2. Clean and standardize
            List<LogEntry> cleanedLogs = logEntries.stream()
                    .map(this::cleanAndStandardizeLog)
                    .collect(Collectors.toList());

            // 3. Bulk-store in Elasticsearch
            batchSaveToElasticsearch(cleanedLogs);

            // 4. Bulk real-time analysis
            batchPerformRealTimeAnalysis(cleanedLogs);

            log.info("Batch-processed log records: count={}", cleanedLogs.size());

        } catch (Exception e) {
            log.error("Failed to batch-process log records: {}", e.getMessage(), e);
        }
    }

    /**
     * Clean and standardize a log entry.
     */
    private LogEntry cleanAndStandardizeLog(LogEntry logEntry) {
        // 1. Trim the message
        if (logEntry.getMessage() != null) {
            logEntry.setMessage(logEntry.getMessage().trim());
        }

        // 2. Normalize the timestamp
        if (logEntry.getTimestamp() != null) {
            logEntry.setTimestamp(standardizeTimestamp(logEntry.getTimestamp()));
        }

        // 3. Normalize the log level
        if (logEntry.getLevel() != null) {
            logEntry.setLevel(logEntry.getLevel().toUpperCase());
        }

        // 4. Extract structured information
        extractStructuredInfo(logEntry);

        // 5. Record the processing time
        logEntry.setProcessTime(Instant.now().toString());

        return logEntry;
    }

    /**
     * Normalize a timestamp.
     * Tries an ISO instant first, then a few common local-date-time patterns;
     * falls back to the current time if nothing matches.
     */
    private String standardizeTimestamp(String timestamp) {
        // ISO instant, e.g. 2024-01-01T00:00:00Z
        try {
            return Instant.parse(timestamp).toString();
        } catch (DateTimeParseException ignored) {
            // fall through to the local-date-time patterns
        }

        DateTimeFormatter[] formatters = {
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
        };

        for (DateTimeFormatter formatter : formatters) {
            try {
                return LocalDateTime.parse(timestamp, formatter)
                        .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            } catch (DateTimeParseException e) {
                // try the next pattern
            }
        }

        // Nothing matched: use the current time
        return Instant.now().toString();
    }

    /**
     * Extract structured information from the message text.
     */
    private void extractStructuredInfo(LogEntry logEntry) {
        String message = logEntry.getMessage();
        if (message == null) {
            return;
        }

        // Extract an IP address
        Pattern ipPattern = Pattern.compile("\\b(?:[0-9]{1,3}\\.){3}[0-9]{1,3}\\b");
        Matcher ipMatcher = ipPattern.matcher(message);
        if (ipMatcher.find()) {
            logEntry.setIp(ipMatcher.group());
        }

        // Extract a URL
        Pattern urlPattern = Pattern.compile("https?://[\\w\\-._~:/?#\\[\\]@!$&'()*+,;=]+");
        Matcher urlMatcher = urlPattern.matcher(message);
        if (urlMatcher.find()) {
            if (logEntry.getFields() == null) {
                logEntry.setFields(new HashMap<>());
            }
            logEntry.getFields().put("url", urlMatcher.group());
        }

        // Extract an error code (naive: matches any 3-4 digit number)
        Pattern errorCodePattern = Pattern.compile("\\b(\\d{3,4})\\b");
        Matcher errorCodeMatcher = errorCodePattern.matcher(message);
        if (errorCodeMatcher.find()) {
            if (logEntry.getFields() == null) {
                logEntry.setFields(new HashMap<>());
            }
            logEntry.getFields().put("errorCode", errorCodeMatcher.group());
        }
    }

    /**
     * Save a single entry to Elasticsearch.
     */
    private void saveToElasticsearch(LogEntry logEntry) {
        try {
            // Indices are sharded by day
            String indexName = createIndexName(logEntry.getTimestamp());

            elasticsearchTemplate.save(logEntry, IndexCoordinates.of(indexName));

        } catch (Exception e) {
            log.error("Failed to save to Elasticsearch: {}", e.getMessage(), e);
        }
    }

    /**
     * Save entries to Elasticsearch in bulk, grouped by daily index.
     */
    private void batchSaveToElasticsearch(List<LogEntry> logEntries) {
        try {
            Map<String, List<LogEntry>> groupedLogs = logEntries.stream()
                    .collect(Collectors.groupingBy(e -> createIndexName(e.getTimestamp())));

            for (Map.Entry<String, List<LogEntry>> entry : groupedLogs.entrySet()) {
                elasticsearchTemplate.save(entry.getValue(), IndexCoordinates.of(entry.getKey()));
            }

        } catch (Exception e) {
            log.error("Failed to batch-save to Elasticsearch: {}", e.getMessage(), e);
        }
    }

    /**
     * Build the daily index name from a timestamp.
     */
    private String createIndexName(String timestamp) {
        try {
            LocalDateTime dateTime = LocalDateTime.parse(timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return "logs-" + dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        } catch (Exception e) {
            // Fall back to today's index if the timestamp is not ISO local date-time
            return "logs-" + LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        }
    }

    /**
     * Route a single entry to the matching analyzers.
     */
    private void performRealTimeAnalysis(LogEntry logEntry) {
        // 1. Error analysis
        if ("ERROR".equals(logEntry.getLevel()) || "FATAL".equals(logEntry.getLevel())) {
            logAnalysisService.analyzeErrorLog(logEntry);
        }

        // 2. Performance analysis
        if (logEntry.getMessage().contains("performance") ||
                logEntry.getMessage().contains("duration")) {
            logAnalysisService.analyzePerformanceLog(logEntry);
        }

        // 3. Security analysis
        if (logEntry.getMessage().contains("security") ||
                logEntry.getMessage().contains("attack")) {
            logAnalysisService.analyzeSecurityLog(logEntry);
        }
    }

    /**
     * Route a batch of entries to the analyzers, grouped by type.
     * Delegates to the single-entry analyzers in LogAnalysisService.
     */
    private void batchPerformRealTimeAnalysis(List<LogEntry> logEntries) {
        Map<String, List<LogEntry>> groupedLogs = logEntries.stream()
                .collect(Collectors.groupingBy(entry -> {
                    if ("ERROR".equals(entry.getLevel()) || "FATAL".equals(entry.getLevel())) {
                        return "ERROR";
                    } else if (entry.getMessage().contains("performance")) {
                        return "PERFORMANCE";
                    } else if (entry.getMessage().contains("security")) {
                        return "SECURITY";
                    } else {
                        return "INFO";
                    }
                }));

        for (Map.Entry<String, List<LogEntry>> entry : groupedLogs.entrySet()) {
            switch (entry.getKey()) {
                case "ERROR":
                    entry.getValue().forEach(logAnalysisService::analyzeErrorLog);
                    break;
                case "PERFORMANCE":
                    entry.getValue().forEach(logAnalysisService::analyzePerformanceLog);
                    break;
                case "SECURITY":
                    entry.getValue().forEach(logAnalysisService::analyzeSecurityLog);
                    break;
            }
        }
    }

    /**
     * Push the entry onto a capped per-application list of recent logs.
     */
    private void updateLogCache(LogEntry logEntry) {
        try {
            String cacheKey = "recent:logs:" + logEntry.getApplication();
            redisTemplate.opsForList().leftPush(cacheKey, JSON.toJSONString(logEntry));

            // Keep only the latest 100 entries
            redisTemplate.opsForList().trim(cacheKey, 0, 99);

            redisTemplate.expire(cacheKey, Duration.ofHours(1));

        } catch (Exception e) {
            log.warn("Failed to update log cache: {}", e.getMessage());
        }
    }
}
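
The batch listener above only receives a List payload if its container factory has batch mode switched on, which nothing in the YAML configuration does by itself. A minimal configuration sketch, assuming the bean name batchFactory referenced by the listener:

@Configuration
public class KafkaBatchConfig {

    /**
     * Container factory for batch consumption. A @KafkaListener that declares
     * a List payload must point at a factory like this via containerFactory.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Deliver the whole polled batch (up to max-poll-records) per listener call
        factory.setBatchListener(true);
        return factory;
    }
}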

5. Log Analysis Service

5.1 Analysis Service Implementation

/**
 * Log analysis service.
 * Provides error, performance, and security analysis over incoming logs.
 * ErrorInfo, PerformanceMetrics, and SecurityThreat are simple @Data DTOs
 * carrying the fields set below (their definitions are omitted here).
 */
@Slf4j
@Service
public class LogAnalysisService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private AlertService alertService;

    /**
     * Analyze an error log.
     */
    public void analyzeErrorLog(LogEntry logEntry) {
        try {
            // 1. Extract the error details
            ErrorInfo errorInfo = extractErrorInfo(logEntry);

            // 2. Is this a new (not recently seen) error?
            if (isNewError(errorInfo)) {
                // 3. Send an alert
                alertService.sendErrorAlert(errorInfo);

                // 4. Record error statistics
                recordErrorStatistics(errorInfo);
            }

            // 5. Update the error trend counters
            updateErrorTrend(errorInfo);

        } catch (Exception e) {
            log.error("Failed to analyze error log: {}", e.getMessage(), e);
        }
    }

    /**
     * Analyze a performance log.
     */
    public void analyzePerformanceLog(LogEntry logEntry) {
        try {
            // 1. Extract performance metrics
            PerformanceMetrics metrics = extractPerformanceMetrics(logEntry);

            // 2. Check the thresholds
            if (isPerformanceThresholdExceeded(metrics)) {
                // 3. Send a performance alert
                alertService.sendPerformanceAlert(metrics);
            }

            // 4. Update performance statistics
            updatePerformanceStatistics(metrics);

        } catch (Exception e) {
            log.error("Failed to analyze performance log: {}", e.getMessage(), e);
        }
    }

    /**
     * Analyze a security log.
     */
    public void analyzeSecurityLog(LogEntry logEntry) {
        try {
            // 1. Detect security threats
            SecurityThreat threat = detectSecurityThreat(logEntry);

            if (threat != null) {
                // 2. Send a security alert
                alertService.sendSecurityAlert(threat);

                // 3. Record the security event
                recordSecurityEvent(threat);
            }

        } catch (Exception e) {
            log.error("Failed to analyze security log: {}", e.getMessage(), e);
        }
    }

    /**
     * Extract error details from a log entry.
     */
    private ErrorInfo extractErrorInfo(LogEntry logEntry) {
        ErrorInfo errorInfo = new ErrorInfo();
        errorInfo.setId(logEntry.getId());
        errorInfo.setMessage(logEntry.getMessage());
        errorInfo.setLevel(logEntry.getLevel());
        errorInfo.setApplication(logEntry.getApplication());
        errorInfo.setHost(logEntry.getHost());
        errorInfo.setTimestamp(logEntry.getTimestamp());

        // Exception type, if a stack trace is present
        if (logEntry.getMessage().contains("Exception")) {
            errorInfo.setExceptionType(extractExceptionType(logEntry.getMessage()));
        }

        // Error code extracted earlier by LogProcessingService
        if (logEntry.getFields() != null && logEntry.getFields().containsKey("errorCode")) {
            errorInfo.setErrorCode(logEntry.getFields().get("errorCode").toString());
        }

        return errorInfo;
    }

    /**
     * Extract the exception class name from a message.
     */
    private String extractExceptionType(String message) {
        Pattern pattern = Pattern.compile("(\\w+\\.\\w+Exception)");
        Matcher matcher = pattern.matcher(message);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return "UnknownException";
    }

    /**
     * Check whether this error was already alerted recently.
     * Deduplicates alerts: at most one per error type per 5 minutes.
     */
    private boolean isNewError(ErrorInfo errorInfo) {
        String errorKey = errorInfo.getApplication() + ":" + errorInfo.getExceptionType();
        String cacheKey = "error:cache:" + errorKey;

        Boolean exists = redisTemplate.hasKey(cacheKey);
        if (exists == null || !exists) {
            redisTemplate.opsForValue().set(cacheKey, "1", Duration.ofMinutes(5));
            return true;
        }

        return false;
    }

    /**
     * Extract performance metrics from a log message.
     */
    private PerformanceMetrics extractPerformanceMetrics(LogEntry logEntry) {
        PerformanceMetrics metrics = new PerformanceMetrics();
        metrics.setId(logEntry.getId());
        metrics.setApplication(logEntry.getApplication());
        metrics.setHost(logEntry.getHost());
        metrics.setTimestamp(logEntry.getTimestamp());

        String message = logEntry.getMessage();

        // Response time, e.g. "responseTime=1234"
        Pattern responseTimePattern = Pattern.compile("responseTime[=:]\\s*(\\d+)");
        Matcher responseTimeMatcher = responseTimePattern.matcher(message);
        if (responseTimeMatcher.find()) {
            metrics.setResponseTime(Long.parseLong(responseTimeMatcher.group(1)));
        }

        // Memory usage, e.g. "memory=75"
        Pattern memoryPattern = Pattern.compile("memory[=:]\\s*(\\d+)");
        Matcher memoryMatcher = memoryPattern.matcher(message);
        if (memoryMatcher.find()) {
            metrics.setMemoryUsage(Long.parseLong(memoryMatcher.group(1)));
        }

        // CPU usage, e.g. "cpu=85"
        Pattern cpuPattern = Pattern.compile("cpu[=:]\\s*(\\d+)");
        Matcher cpuMatcher = cpuPattern.matcher(message);
        if (cpuMatcher.find()) {
            metrics.setCpuUsage(Long.parseLong(cpuMatcher.group(1)));
        }

        return metrics;
    }

    /**
     * Check metrics against fixed thresholds.
     */
    private boolean isPerformanceThresholdExceeded(PerformanceMetrics metrics) {
        // Response time above 1 second
        if (metrics.getResponseTime() != null && metrics.getResponseTime() > 1000) {
            return true;
        }

        // Memory usage above 80%
        if (metrics.getMemoryUsage() != null && metrics.getMemoryUsage() > 80) {
            return true;
        }

        // CPU usage above 90%
        if (metrics.getCpuUsage() != null && metrics.getCpuUsage() > 90) {
            return true;
        }

        return false;
    }

    /**
     * Detect security threats via simple keyword heuristics.
     * Real deployments should use proper rules or anomaly detection.
     */
    private SecurityThreat detectSecurityThreat(LogEntry logEntry) {
        String message = logEntry.getMessage().toLowerCase();

        // SQL injection
        if (message.contains("sql") && message.contains("injection")) {
            SecurityThreat threat = new SecurityThreat();
            threat.setType("SQL_INJECTION");
            threat.setSeverity("HIGH");
            threat.setDescription("Possible SQL injection attack detected");
            threat.setSource(logEntry.getIp());
            threat.setTimestamp(logEntry.getTimestamp());
            return threat;
        }

        // XSS
        if (message.contains("xss") || message.contains("script")) {
            SecurityThreat threat = new SecurityThreat();
            threat.setType("XSS");
            threat.setSeverity("MEDIUM");
            threat.setDescription("Possible XSS attack detected");
            threat.setSource(logEntry.getIp());
            threat.setTimestamp(logEntry.getTimestamp());
            return threat;
        }

        // Brute force: more than 5 failed logins from one IP within 10 minutes
        if (message.contains("failed") && message.contains("login")) {
            String key = "failed:login:" + logEntry.getIp();
            Long count = redisTemplate.opsForValue().increment(key);
            redisTemplate.expire(key, Duration.ofMinutes(10));

            if (count != null && count > 5) {
                SecurityThreat threat = new SecurityThreat();
                threat.setType("BRUTE_FORCE");
                threat.setSeverity("HIGH");
                threat.setDescription("Possible brute-force attack detected");
                threat.setSource(logEntry.getIp());
                threat.setTimestamp(logEntry.getTimestamp());
                return threat;
            }
        }

        return null;
    }

    /**
     * Increment the per-application, per-exception error counter.
     */
    private void recordErrorStatistics(ErrorInfo errorInfo) {
        String key = "error:stats:" + errorInfo.getApplication() + ":" + errorInfo.getExceptionType();
        redisTemplate.opsForValue().increment(key);
        redisTemplate.expire(key, Duration.ofDays(1));
    }

    /**
     * Update the hourly error-trend counter.
     */
    private void updateErrorTrend(ErrorInfo errorInfo) {
        String key = "error:trend:" + errorInfo.getApplication() + ":" +
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
        redisTemplate.opsForValue().increment(key);
        redisTemplate.expire(key, Duration.ofHours(2));
    }

    /**
     * Keep a rolling window of the latest 100 performance samples.
     */
    private void updatePerformanceStatistics(PerformanceMetrics metrics) {
        String key = "performance:stats:" + metrics.getApplication();

        Map<String, Object> stats = new HashMap<>();
        stats.put("responseTime", metrics.getResponseTime());
        stats.put("memoryUsage", metrics.getMemoryUsage());
        stats.put("cpuUsage", metrics.getCpuUsage());
        stats.put("timestamp", metrics.getTimestamp());

        redisTemplate.opsForList().leftPush(key, JSON.toJSONString(stats));
        redisTemplate.opsForList().trim(key, 0, 99);
        redisTemplate.expire(key, Duration.ofHours(1));
    }

    /**
     * Keep a rolling window of the latest 1000 security events per type.
     */
    private void recordSecurityEvent(SecurityThreat threat) {
        String key = "security:events:" + threat.getType();

        Map<String, Object> event = new HashMap<>();
        event.put("threat", threat);
        event.put("timestamp", Instant.now().toString());

        redisTemplate.opsForList().leftPush(key, JSON.toJSONString(event));
        redisTemplate.opsForList().trim(key, 0, 999);
        redisTemplate.expire(key, Duration.ofDays(7));
    }
}
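
The AlertService injected above is referenced but never defined in this article. A minimal interface sketch mirroring only the calls the analysis service makes (delivery channels such as email, SMS, or webhooks are left to the implementation):

/**
 * Alerting contract used by LogAnalysisService; an assumption inferred
 * from the calls above, not an API defined by the original article.
 */
public interface AlertService {

    void sendErrorAlert(ErrorInfo errorInfo);

    void sendPerformanceAlert(PerformanceMetrics metrics);

    void sendSecurityAlert(SecurityThreat threat);
}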

6. Log Search Service

6.1 Search Service Implementation

/**
 * Log search service.
 * Provides efficient log retrieval on top of Elasticsearch.
 * Written against the Spring Data Elasticsearch 4.2.x API
 * (SearchHits, NativeSearchQuery).
 */
@Slf4j
@Service
public class LogSearchService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Search logs by the given criteria.
     */
    public SearchHits<LogEntry> searchLogs(LogSearchRequest request) {
        // 1. Build the search query
        NativeSearchQuery query = buildSearchQuery(request);

        // 2. Execute it across the daily log indices
        SearchHits<LogEntry> result = elasticsearchTemplate.search(
                query, LogEntry.class, IndexCoordinates.of("logs-*"));

        // 3. Cache the result
        cacheSearchResult(request, result);

        return result;
    }

    /**
     * Build the search query from the request criteria.
     */
    private NativeSearchQuery buildSearchQuery(LogSearchRequest request) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // Filter by application name
        if (StringUtils.isNotEmpty(request.getApplication())) {
            boolQuery.must(QueryBuilders.termQuery("application", request.getApplication()));
        }

        // Filter by log level
        if (StringUtils.isNotEmpty(request.getLevel())) {
            boolQuery.must(QueryBuilders.termQuery("level", request.getLevel()));
        }

        // Filter by host
        if (StringUtils.isNotEmpty(request.getHost())) {
            boolQuery.must(QueryBuilders.termQuery("host", request.getHost()));
        }

        // Filter by time range
        if (request.getStartTime() != null && request.getEndTime() != null) {
            boolQuery.must(QueryBuilders.rangeQuery("timestamp")
                    .gte(request.getStartTime())
                    .lte(request.getEndTime()));
        }

        // Full-text keyword search
        if (StringUtils.isNotEmpty(request.getKeyword())) {
            boolQuery.must(QueryBuilders.multiMatchQuery(request.getKeyword(),
                    "message", "logger", "thread"));
        }

        // Assemble the query with paging and newest-first ordering
        return new NativeSearchQueryBuilder()
                .withQuery(boolQuery)
                .withPageable(PageRequest.of(request.getPage(), request.getSize()))
                .withSort(SortBuilders.fieldSort("timestamp").order(SortOrder.DESC))
                .build();
    }

    /**
     * Cache a search result for 10 minutes, keyed by a hash of the request.
     * In practice the SearchHits should be mapped to a serializable DTO
     * before caching; this sketch stores it as-is for brevity.
     */
    private void cacheSearchResult(LogSearchRequest request, SearchHits<LogEntry> result) {
        try {
            String cacheKey = "search:result:" + DigestUtils.md5Hex(JSON.toJSONString(request));
            redisTemplate.opsForValue().set(cacheKey, result, Duration.ofMinutes(10));
        } catch (Exception e) {
            log.warn("Failed to cache search result: {}", e.getMessage());
        }
    }

    /**
     * Aggregate log counts per level for one application and time range.
     */
    public LogStatistics getLogStatistics(String application, Date startTime, Date endTime) {
        try {
            // 1. Build a terms aggregation over the level field
            NativeSearchQuery query = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.boolQuery()
                            .must(QueryBuilders.termQuery("application", application))
                            .must(QueryBuilders.rangeQuery("timestamp")
                                    .gte(startTime)
                                    .lte(endTime)))
                    .addAggregation(AggregationBuilders.terms("level_count")
                            .field("level")
                            .size(10))
                    .build();

            // 2. Execute the query
            SearchHits<LogEntry> result = elasticsearchTemplate.search(
                    query, LogEntry.class, IndexCoordinates.of("logs-*"));

            // 3. Assemble the statistics object
            LogStatistics statistics = new LogStatistics();
            statistics.setApplication(application);
            statistics.setStartTime(startTime);
            statistics.setEndTime(endTime);

            // 4. Read the aggregation buckets
            // (on Spring Data Elasticsearch 4.3+ the result must be unwrapped
            // via ElasticsearchAggregations first)
            Aggregations aggregations = result.getAggregations();
            ParsedStringTerms levelCountAgg = aggregations.get("level_count");
            Map<String, Long> levelCount = new HashMap<>();
            for (Terms.Bucket bucket : levelCountAgg.getBuckets()) {
                levelCount.put(bucket.getKeyAsString(), bucket.getDocCount());
            }
            statistics.setLevelCount(levelCount);

            return statistics;

        } catch (Exception e) {
            log.error("Failed to compute log statistics: {}", e.getMessage(), e);
            return new LogStatistics();
        }
    }

    /**
     * Return the most recent cached logs for an application
     * (populated by LogProcessingService.updateLogCache).
     */
    public List<LogEntry> getRealTimeLogs(String application) {
        try {
            String cacheKey = "recent:logs:" + application;
            List<Object> cachedLogs = redisTemplate.opsForList().range(cacheKey, 0, 99);

            if (cachedLogs == null) {
                return new ArrayList<>();
            }

            return cachedLogs.stream()
                    .map(item -> JSON.parseObject(item.toString(), LogEntry.class))
                    .collect(Collectors.toList());

        } catch (Exception e) {
            log.error("Failed to fetch real-time logs: {}", e.getMessage(), e);
            return new ArrayList<>();
        }
    }
}
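
LogSearchRequest is likewise not defined in the original listing; a minimal DTO sketch inferred from the getters used by buildSearchQuery (the field names and defaults are assumptions):

/**
 * Search criteria DTO, inferred from the getters used by LogSearchService.
 */
@Data
public class LogSearchRequest {

    private String application;  // filter: application name
    private String level;        // filter: log level
    private String host;         // filter: hostname
    private Date startTime;      // filter: range start
    private Date endTime;        // filter: range end
    private String keyword;      // full-text keyword over message/logger/thread
    private int page = 0;        // zero-based page index
    private int size = 50;       // page size
}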

7. Summary

This article has walked through the design of a Java log processing architecture for TB-scale logs, covering:

7.1 Core Technical Points

  1. Log collection: multi-source collection with batch processing
  2. Log processing: data cleansing, normalization, and structuring
  3. Log storage: Elasticsearch storage with index optimization
  4. Log analysis: real-time analysis, error detection, and performance monitoring
  5. Log search: efficient retrieval and statistical aggregation

7.2 Architectural Advantages

  1. High performance: batching and asynchronous processing raise throughput
  2. High availability: the message queue buffers data so none is lost on downstream failure
  3. Scalability: horizontal scaling absorbs growing data volumes
  4. Real-time behavior: analysis and alerting happen as logs arrive

7.3 Best Practices

  1. Collection: unify the log format and collect in batches
  2. Processing: cleanse the data and store it in structured form
  3. Analysis and alerting: analyze in real time and deduplicate alerts
  4. Search: tune indices and apply caching strategies

With this architecture in place, you can build a log processing system that sustains TB-scale volumes and meets the collection, storage, analysis, and search needs of large-scale log data.