How Do You Build Observability?

1. Overview

1.1 Why Observability Matters

Observability is a core capability for operating modern distributed systems. Built on the three pillars of logs, metrics, and traces, it provides end-to-end visibility into how a system is behaving and enables fast problem localization.


1.2 Structure of This Article

This article explores observability from the following angles:

  1. Log management: collecting, aggregating, analyzing, and storing logs
  2. Metrics monitoring: collecting, storing, visualizing, and analyzing metrics
  3. Tracing: distributed tracing, call chain analysis, performance analysis
  4. Alerting: alert rules, notifications, alert handling
  5. Observability platform: building a unified observability platform
  6. Case study: a hands-on observability build-out

2. Log Management

2.1 Log Collection

2.1.1 Log Collection Architecture

// Log collection architecture
public class LogCollection {

    // 1. Application log output
    @RestController
    public class LogController {

        private static final Logger logger = LoggerFactory.getLogger(LogController.class);

        @GetMapping("/api/test")
        public String test() {
            // Generate one request ID and reuse it in every log line of this request
            String requestId = UUID.randomUUID().toString();
            long startTime = System.currentTimeMillis();

            // Structured logging (kv() is StructuredArguments.kv from logstash-logback-encoder)
            logger.info("Request processing started",
                    kv("requestId", requestId),
                    kv("path", "/api/test"),
                    kv("method", "GET")
            );

            try {
                // Business logic
                return "success";
            } catch (Exception e) {
                // Error log
                logger.error("Request processing failed",
                        kv("requestId", requestId),
                        kv("error", e.getMessage()),
                        kv("stackTrace", ExceptionUtils.getStackTrace(e))
                );
                throw e;
            } finally {
                logger.info("Request processing completed",
                        kv("requestId", requestId),
                        kv("duration", System.currentTimeMillis() - startTime)
                );
            }
        }
    }

    // 2. Logging configuration (Logback)
    // logback-spring.xml
    /*
    <configuration>
        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>

        <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>logs/application.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>logs/application.%d{yyyy-MM-dd}.log</fileNamePattern>
                <maxHistory>30</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>

        <appender name="JSON" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>logs/application.json</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>logs/application.%d{yyyy-MM-dd}.json</fileNamePattern>
            </rollingPolicy>
            <encoder class="net.logstash.logback.encoder.LogstashEncoder">
                <includeContext>true</includeContext>
                <includeCallerData>true</includeCallerData>
            </encoder>
        </appender>

        <root level="INFO">
            <appender-ref ref="CONSOLE" />
            <appender-ref ref="FILE" />
            <appender-ref ref="JSON" />
        </root>
    </configuration>
    */
}
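The kv() structured arguments and the LogstashEncoder used above come from the logstash-logback-encoder library, which has to be on the classpath. A minimal sketch of the Maven dependency and the static import (the version number is illustrative):

<!-- pom.xml: JSON log encoder and kv() structured arguments (version is illustrative) -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.4</version>
</dependency>

// Static import used by the kv(...) calls in LogController
import static net.logstash.logback.argument.StructuredArguments.kv;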

2.2 Log Aggregation

2.2.1 Log Aggregation Approach

# ELK Stack (Elasticsearch + Logstash + Kibana)
# docker-compose.yml
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:7.17.0
    volumes:
      - ./logstash/config:/usr/share/logstash/config
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  es_data:

# Filebeat configuration
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/application/*.log
    fields:
      service: user-service
      environment: production
    fields_under_root: true
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

output.logstash:
  hosts: ["logstash:5044"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_docker_metadata: ~
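The docker-compose file above mounts ./logstash/pipeline, but no pipeline definition is shown. Below is a minimal Logstash pipeline sketch that receives Beats input and writes to Elasticsearch; the JSON filter and the index naming pattern are assumptions for JSON-encoded logs:

# logstash/pipeline/logstash.conf (minimal sketch)
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON log lines produced by the LogstashEncoder (assumes JSON logs)
  json {
    source => "message"
    skip_on_invalid_json => true
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"
  }
}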

2.3 Log Analysis

2.3.1 Log Analysis in Practice

// 日志分析工具
public class LogAnalysis {

// 1. 日志查询(Kibana DSL)
/*
GET /logs-*/_search
{
"query": {
"bool": {
"must": [
{ "match": { "level": "ERROR" } },
{ "range": { "@timestamp": { "gte": "now-1h" } } }
]
}
},
"aggs": {
"errors_by_service": {
"terms": { "field": "service.keyword" },
"aggs": {
"error_count": { "value_count": { "field": "_id" } }
}
}
}
}
*/

// 2. 日志分析服务
@Service
public class LogAnalysisService {

@Autowired
private ElasticsearchClient elasticsearchClient;

// 错误日志统计
public Map<String, Long> getErrorLogStats(String timeRange) {
SearchRequest searchRequest = SearchRequest.of(s -> s
.index("logs-*")
.query(q -> q
.bool(b -> b
.must(m -> m.match(t -> t.field("level").query("ERROR")))
.must(m -> m.range(r -> r
.field("@timestamp")
.gte(JsonData.of("now-" + timeRange))
))
)
)
.aggregations("errors_by_service", a -> a
.terms(t -> t.field("service.keyword"))
.aggregations("error_count", a2 -> a2
.valueCount(v -> v.field("_id"))
)
)
);

SearchResponse<Object> response = elasticsearchClient.search(searchRequest, Object.class);
// 处理结果
return processResponse(response);
}

// 日志搜索
public List<LogEntry> searchLogs(LogSearchRequest request) {
SearchRequest searchRequest = SearchRequest.of(s -> s
.index("logs-*")
.query(q -> buildQuery(request))
.sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
.from(request.getFrom())
.size(request.getSize())
);

SearchResponse<LogEntry> response = elasticsearchClient.search(searchRequest, LogEntry.class);
return response.hits().hits().stream()
.map(hit -> hit.source())
.collect(Collectors.toList());
}
}
}
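The storage and retention side of log management is not covered by the code above. On the ELK stack this is typically handled with an index lifecycle management (ILM) policy; the sketch below rolls indices over daily or at 50 GB and deletes them after 30 days (the thresholds are assumptions, tune them to your log volume and retention requirements):

PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_age": "1d", "max_size": "50gb" }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}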

3. Metrics Monitoring

3.1 Metrics Collection

3.1.1 Metrics Collection Approach

// Prometheus指标采集
public class MetricsCollection {

// 1. Prometheus客户端
@Component
public class PrometheusMetrics {

// Counter(计数器)
private final Counter requestCounter = Counter.build()
.name("http_requests_total")
.help("Total HTTP requests")
.labelNames("method", "path", "status")
.register();

// Gauge(仪表盘)
private final Gauge activeConnections = Gauge.build()
.name("active_connections")
.help("Active connections")
.register();

// Histogram(直方图)
private final Histogram requestDuration = Histogram.build()
.name("http_request_duration_seconds")
.help("HTTP request duration")
.labelNames("method", "path")
.buckets(0.1, 0.5, 1.0, 2.0, 5.0)
.register();

// Summary(摘要)
private final Summary responseSize = Summary.build()
.name("http_response_size_bytes")
.help("HTTP response size")
.labelNames("method", "path")
.quantile(0.5, 0.05)
.quantile(0.95, 0.01)
.quantile(0.99, 0.001)
.register();

// 记录指标
public void recordRequest(String method, String path, int status, double duration) {
requestCounter.labels(method, path, String.valueOf(status)).inc();
requestDuration.labels(method, path).observe(duration);
}
}

// 2. Spring Boot Actuator集成
@Configuration
public class ActuatorConfig {

@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags(
"application", "user-service",
"environment", "production"
);
}
}

// 3. 自定义指标
@Component
public class CustomMetrics {

private final MeterRegistry meterRegistry;

public CustomMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

// 业务指标
public void recordBusinessMetric(String businessType, double value) {
meterRegistry.counter("business_metric_total",
"type", businessType).increment(value);
}

// JVM指标
public void recordJvmMetrics() {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();

meterRegistry.gauge("jvm_memory_used_bytes",
Tags.of("area", "heap"),
heapUsage.getUsed());

meterRegistry.gauge("jvm_memory_max_bytes",
Tags.of("area", "heap"),
heapUsage.getMax());
}
}
}
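For Prometheus to scrape these metrics over /actuator/prometheus (as configured in the next section), the endpoint has to be exposed. A minimal application.yml sketch, assuming Spring Boot Actuator and micrometer-registry-prometheus are on the classpath:

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus
  metrics:
    tags:
      application: user-service
      environment: production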

3.2 Metrics Storage and Querying

3.2.1 Prometheus Storage

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'user-service'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:8080']
        labels:
          service: 'user-service'
          environment: 'production'

  - job_name: 'order-service'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:8081']
        labels:
          service: 'order-service'
          environment: 'production'

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

# Example PromQL queries
# 1. Request rate
rate(http_requests_total[5m])

# 2. Error rate
rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m])

# 3. P95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# 4. Process CPU usage (percent of one core); for host-level CPU, use node_cpu_seconds_total{mode="idle"} from node_exporter
avg(rate(process_cpu_seconds_total[5m])) * 100

# 5. Resident memory as a share of the process's virtual memory
(process_resident_memory_bytes / process_virtual_memory_bytes) * 100

# 6. Service availability
up{job="user-service"}

# 7. Business metrics
sum(rate(business_metric_total[5m])) by (type)
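Expensive expressions such as the P95 quantile can be precomputed with recording rules so that dashboards and alerts query cheap, pre-aggregated series. A sketch (rule names follow the common level:metric:operation convention; the file would be listed under rule_files alongside alerts.yml):

# recording-rules.yml
groups:
  - name: http_recording_rules
    interval: 30s
    rules:
      - record: service:http_requests:error_ratio_5m
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m])
      - record: service:http_request_duration_seconds:p95_5m
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))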

3.3 Metrics Visualization

3.3.1 Visualization with Grafana

// Grafana dashboard configuration
{
  "dashboard": {
    "title": "Application Monitoring Dashboard",
    "panels": [
      {
        "title": "Request rate",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{path}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error rate",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m])",
            "legendFormat": "Error rate"
          }
        ],
        "type": "graph"
      },
      {
        "title": "P95 latency",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P95 latency"
          }
        ],
        "type": "graph"
      },
      {
        "title": "CPU usage",
        "targets": [
          {
            "expr": "avg(rate(process_cpu_seconds_total[5m])) * 100",
            "legendFormat": "Process CPU (% of one core)"
          }
        ],
        "type": "graph"
      }
    ]
  }
}
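Rather than creating data sources by hand, Grafana can be provisioned from files. A datasource provisioning sketch; the URL assumes Prometheus is reachable as the prometheus service on port 9090:

# grafana/provisioning/datasources/prometheus.yml
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true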

4. Tracing

4.1 Distributed Tracing

4.1.1 Tracing Implementation

// 链路追踪实现
public class DistributedTracing {

// 1. Spring Cloud Sleuth集成
@SpringBootApplication
public class TracingApplication {

public static void main(String[] args) {
SpringApplication.run(TracingApplication.class, args);
}

// Sleuth配置
@Bean
public Sampler alwaysSampler() {
return Sampler.ALWAYS_SAMPLE;
}
}

// 2. 手动创建Span
@Service
public class TracingService {

@Autowired
private Tracer tracer;

public String processRequest(String requestId) {
// 创建Span
Span span = tracer.nextSpan()
.name("process-request")
.tag("requestId", requestId)
.start();

try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// 业务逻辑
String result = doProcess(requestId);

// 添加事件
span.event("request-processed");
span.tag("result", result);

return result;
} catch (Exception e) {
span.tag("error", "true");
span.tag("error.message", e.getMessage());
throw e;
} finally {
span.end();
}
}

private String doProcess(String requestId) {
// 业务处理
return "success";
}
}

// 3. HTTP请求追踪
@RestController
public class TracingController {

@Autowired
private RestTemplate restTemplate;

@Autowired
private Tracer tracer;

@GetMapping("/api/call")
public String callService() {
Span span = tracer.nextSpan().name("call-service").start();

try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// 自动注入追踪头
String result = restTemplate.getForObject("http://other-service/api/data", String.class);
return result;
} finally {
span.end();
}
}
}

// 4. 异步追踪
@Service
public class AsyncTracingService {

@Autowired
private Tracer tracer;

@Async
public CompletableFuture<String> asyncProcess(String data) {
Span span = tracer.nextSpan().name("async-process").start();

try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// 异步处理
String result = processData(data);
return CompletableFuture.completedFuture(result);
} finally {
span.end();
}
}

private String processData(String data) {
return "processed: " + data;
}
}
}
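With Spring Cloud Sleuth, the sampler and the Zipkin endpoint are usually driven by configuration rather than code. A property sketch, assuming Sleuth 2.x/3.x with spring-cloud-sleuth-zipkin on the classpath (a 100% sampling probability is fine for testing but is normally lowered in production):

# application.yml
spring:
  application:
    name: user-service
  sleuth:
    sampler:
      probability: 1.0   # sample every request; lower this in production
  zipkin:
    base-url: http://zipkin:9411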

4.2 Call Chain Analysis

4.2.1 Analyzing Call Chains

// 调用链分析
public class TraceAnalysis {

// 1. Zipkin集成
@Configuration
public class ZipkinConfig {

@Bean
// Reporter that ships spans to Zipkin over HTTP (zipkin2 reporter API)
public AsyncReporter<zipkin2.Span> spanReporter() {
return AsyncReporter.create(
OkHttpSender.create("http://zipkin:9411/api/v2/spans")
);
}

// Brave tracing component (replaces the legacy Brave.Builder API)
@Bean
public Tracing tracing() {
return Tracing.newBuilder()
.localServiceName("user-service")
.spanReporter(spanReporter())
.sampler(Sampler.create(1.0f))
.build();
}
}

// 2. 调用链查询
@Service
public class TraceQueryService {

@Autowired
private ZipkinClient zipkinClient;

// 查询调用链
public List<Trace> queryTraces(TraceQueryRequest request) {
QueryRequest queryRequest = QueryRequest.newBuilder()
.serviceName(request.getServiceName())
.spanName(request.getSpanName())
.lookback(request.getLookback())
.limit(request.getLimit())
.build();

return zipkinClient.getTraces(queryRequest);
}

// 查询服务依赖
public List<DependencyLink> getDependencies(String endTs, String lookback) {
return zipkinClient.getDependencies(endTs, lookback);
}

// 查询Span详情
public Span getSpan(String traceId, String spanId) {
return zipkinClient.getSpan(traceId, spanId);
}
}

// 3. 性能分析
@Service
public class PerformanceAnalysis {

@Autowired
private TraceQueryService traceQueryService;

// 分析慢请求
public List<SlowRequest> analyzeSlowRequests(String serviceName, long threshold) {
List<Trace> traces = traceQueryService.queryTraces(
TraceQueryRequest.builder()
.serviceName(serviceName)
.lookback(3600000) // 1小时
.build()
);

return traces.stream()
.flatMap(trace -> trace.getSpans().stream())
.filter(span -> span.getDuration() > threshold)
.map(span -> SlowRequest.builder()
.traceId(span.getTraceId())
.spanId(span.getId())
.duration(span.getDuration())
.operation(span.getName())
.build())
.collect(Collectors.toList());
}

// 分析服务依赖
public ServiceDependencyGraph buildDependencyGraph() {
List<DependencyLink> dependencies = traceQueryService.getDependencies(
String.valueOf(System.currentTimeMillis()),
"3600000"
);

return ServiceDependencyGraph.builder()
.dependencies(dependencies)
.build();
}
}
}
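The configuration above reports spans to http://zipkin:9411. For local testing, Zipkin can be run from its official image with in-memory storage (not suitable for production volumes):

# docker-compose.yml (Zipkin only, in-memory storage)
version: '3.8'
services:
  zipkin:
    image: openzipkin/zipkin
    ports:
      - "9411:9411"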

5. Alerting

5.1 Alert Rules

5.1.1 Alert Rule Configuration

# Prometheus alerting rules
# alerts.yml
groups:
  - name: application_alerts
    interval: 30s
    rules:
      # Error-rate alert
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
          service: "{{ $labels.service }}"
        annotations:
          summary: "High service error rate"
          description: "Error rate of service {{ $labels.service }} is above 5%, current value: {{ $value }}"

      # Response-time alert
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
          service: "{{ $labels.service }}"
        annotations:
          summary: "Slow service responses"
          description: "P95 latency of service {{ $labels.service }} is above 2 seconds, current value: {{ $value }}s"

      # Service-down alert
      - alert: ServiceDown
        expr: up{job="user-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service unavailable"
          description: "Service {{ $labels.job }} is down"

      # CPU usage alert (process CPU as a percentage of one core)
      - alert: HighCPUUsage
        expr: avg(rate(process_cpu_seconds_total[5m])) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is above 80%, current value: {{ $value }}%"

      # Memory usage alert
      - alert: HighMemoryUsage
        expr: (process_resident_memory_bytes / process_virtual_memory_bytes) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 80%, current value: {{ $value }}%"

      # Business-metric alert
      - alert: BusinessMetricAnomaly
        expr: sum(rate(business_metric_total[5m])) by (type) < 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Business metric anomaly"
          description: "Business metric {{ $labels.type }} is below threshold, current value: {{ $value }}"

5.2 Alert Notification

5.2.1 Notification Configuration

# Alertmanager configuration
# alertmanager.yml
global:
  resolve_timeout: 5m
  slack_api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical-alerts'
      continue: true
    - match:
        severity: warning
      receiver: 'warning-alerts'

receivers:
  - name: 'default'
    webhook_configs:
      - url: 'http://alert-handler:8080/alerts'
        send_resolved: true

  - name: 'critical-alerts'
    slack_configs:
      - channel: '#critical-alerts'
        title: '{{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
    email_configs:
      - to: 'oncall@example.com'
        headers:
          Subject: 'Critical Alert: {{ .GroupLabels.alertname }}'
    webhook_configs:
      - url: 'http://alert-handler:8080/alerts/critical'
        send_resolved: true

  - name: 'warning-alerts'
    slack_configs:
      - channel: '#warning-alerts'
        title: '{{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'

// Alert handling service (receives Alertmanager webhooks)
@RestController
@RequestMapping("/alerts")
public class AlertHandler {

@Autowired
private AlertService alertService;

@Autowired
private NotificationService notificationService;

// 接收告警
@PostMapping
public ResponseEntity<Void> handleAlert(@RequestBody Alert alert) {
// 保存告警
alertService.saveAlert(alert);

// 发送通知
notificationService.sendNotification(alert);

// 触发自动化处理
if (alert.getSeverity() == Severity.CRITICAL) {
alertService.triggerAutoRemediation(alert);
}

return ResponseEntity.ok().build();
}

// 告警处理
@PostMapping("/{alertId}/resolve")
public ResponseEntity<Void> resolveAlert(@PathVariable String alertId) {
alertService.resolveAlert(alertId);
return ResponseEntity.ok().build();
}
}

@Service
public class NotificationService {

// 发送通知
public void sendNotification(Alert alert) {
// 1. 发送邮件
sendEmail(alert);

// 2. 发送短信
if (alert.getSeverity() == Severity.CRITICAL) {
sendSms(alert);
}

// 3. 发送钉钉/企业微信
sendDingTalk(alert);

// 4. 发送Slack
sendSlack(alert);
}

private void sendEmail(Alert alert) {
// 邮件发送逻辑
}

private void sendSms(Alert alert) {
// 短信发送逻辑
}

private void sendDingTalk(Alert alert) {
// 钉钉通知逻辑
}

private void sendSlack(Alert alert) {
// Slack通知逻辑
}
}
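The notification channels above are left as stubs. As an illustration, here is a possible body for sendDingTalk that posts a text message to a DingTalk robot webhook with RestTemplate; the webhook URL is a placeholder and getSummary() is an assumed accessor on the Alert class:

// Sketch of a DingTalk webhook notification (uses org.springframework.web.client.RestTemplate)
private void sendDingTalk(Alert alert) {
    // Placeholder webhook URL; the access token comes from the DingTalk robot settings
    String webhook = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN";

    // DingTalk text-message payload: {"msgtype":"text","text":{"content":"..."}}
    Map<String, Object> payload = Map.of(
            "msgtype", "text",
            "text", Map.of("content", "[" + alert.getSeverity() + "] " + alert.getSummary())
    );

    new RestTemplate().postForEntity(webhook, payload, String.class);
}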

5.3 Alert Handling

5.3.1 Alert Handling Workflow

// 告警处理流程
@Service
public class AlertProcessingService {

@Autowired
private AlertRepository alertRepository;

@Autowired
private RemediationService remediationService;

// 告警处理流程
@Transactional
public void processAlert(Alert alert) {
// 1. 告警去重
if (isDuplicate(alert)) {
updateExistingAlert(alert);
return;
}

// 2. 告警分级
Severity severity = classifySeverity(alert);
alert.setSeverity(severity);

// 3. 保存告警
alertRepository.save(alert);

// 4. 告警路由
routeAlert(alert);

// 5. 自动化处理
if (canAutoRemediate(alert)) {
remediationService.autoRemediate(alert);
}
}

// 告警去重
private boolean isDuplicate(Alert alert) {
return alertRepository.existsByFingerprint(alert.getFingerprint());
}

// 告警分级
private Severity classifySeverity(Alert alert) {
// 根据规则分类
if (alert.getLabels().containsKey("severity")) {
return Severity.valueOf(alert.getLabels().get("severity"));
}

// 默认分级逻辑
if (alert.getAnnotations().containsKey("critical")) {
return Severity.CRITICAL;
}

return Severity.WARNING;
}

// 告警路由
private void routeAlert(Alert alert) {
// 根据告警类型路由到不同的处理组
String route = determineRoute(alert);
alert.setRoute(route);
}

// 自动化处理
private boolean canAutoRemediate(Alert alert) {
// 判断是否可以自动处理
return alert.getSeverity() == Severity.CRITICAL
&& alert.getType() == AlertType.AUTO_REMEDIABLE;
}
}

@Service
public class RemediationService {

// 自动化修复
public void autoRemediate(Alert alert) {
switch (alert.getType()) {
case HIGH_CPU:
scaleOut(alert);
break;
case HIGH_MEMORY:
restartService(alert);
break;
case SERVICE_DOWN:
restartService(alert);
break;
default:
// 无法自动处理
break;
}
}

// 扩容
private void scaleOut(Alert alert) {
// 扩容逻辑
}

// 重启服务
private void restartService(Alert alert) {
// 重启服务逻辑
}
}
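Deduplication above keys on alert.getFingerprint(). Recent Alertmanager versions include a fingerprint field in their webhook payload that can be stored directly; if one has to be derived instead, a sketch based on the alert name plus its sorted labels (getName() and getLabels() are assumed accessors):

// Derive a stable fingerprint from the alert name and its sorted labels
private String fingerprint(Alert alert) {
    String canonical = alert.getName() + "|" + new java.util.TreeMap<>(alert.getLabels());
    return java.util.UUID.nameUUIDFromBytes(
            canonical.getBytes(java.nio.charset.StandardCharsets.UTF_8)).toString();
}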

6. Observability Platform

6.1 A Unified Observability Platform

6.1.1 Platform Architecture

// 统一可观测性平台
@SpringBootApplication
public class ObservabilityPlatform {

public static void main(String[] args) {
SpringApplication.run(ObservabilityPlatform.class, args);
}

// 统一查询接口
@RestController
@RequestMapping("/api/observability")
public class ObservabilityController {

@Autowired
private LogService logService;

@Autowired
private MetricsService metricsService;

@Autowired
private TraceService traceService;

// 统一查询
@PostMapping("/query")
public ObservabilityResult query(@RequestBody ObservabilityQuery query) {
ObservabilityResult result = new ObservabilityResult();

// 查询日志
if (query.isIncludeLogs()) {
result.setLogs(logService.searchLogs(query.getLogQuery()));
}

// 查询指标
if (query.isIncludeMetrics()) {
result.setMetrics(metricsService.queryMetrics(query.getMetricsQuery()));
}

// 查询链路
if (query.isIncludeTraces()) {
result.setTraces(traceService.queryTraces(query.getTraceQuery()));
}

return result;
}

// 关联查询
@GetMapping("/correlate")
public CorrelationResult correlate(
@RequestParam String traceId,
@RequestParam(required = false) String spanId
) {
// 根据TraceId关联日志、指标、链路
Trace trace = traceService.getTrace(traceId);
List<LogEntry> logs = logService.searchLogsByTraceId(traceId);
List<Metric> metrics = metricsService.queryMetricsByTraceId(traceId);

return CorrelationResult.builder()
.trace(trace)
.logs(logs)
.metrics(metrics)
.build();
}
}

// 数据关联服务
@Service
public class CorrelationService {

// 关联TraceId
public void correlateByTraceId(String traceId) {
// 在日志中添加TraceId
// 在指标中添加TraceId
// 在链路中关联日志和指标
}

// 关联RequestId
public void correlateByRequestId(String requestId) {
// 通过RequestId关联所有数据
}
}
}

6.2 Data Correlation

6.2.1 Data Correlation Implementation

// 数据关联实现
public class DataCorrelation {

// 1. TraceId注入
@Component
public class TraceIdInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 从请求头获取TraceId
String traceId = request.getHeader("X-Trace-Id");
if (traceId == null) {
traceId = UUID.randomUUID().toString();
}

// 设置到MDC
MDC.put("traceId", traceId);
MDC.put("requestId", UUID.randomUUID().toString());

// 添加到响应头
response.setHeader("X-Trace-Id", traceId);

return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
MDC.clear();
}
}

// 2. 日志关联
@Component
public class LogCorrelation {

// 日志格式包含TraceId
// logback-spring.xml
/*
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%X{traceId}] [%X{requestId}] %-5level %logger{36} - %msg%n</pattern>
*/
}

// 3. 指标关联
@Component
public class MetricsCorrelation {

@Autowired
private MeterRegistry meterRegistry;

// Tag metrics with the TraceId (caution: per-trace label values create very high cardinality; use sparingly or prefer exemplars)
public void recordMetricWithTraceId(String metricName, String traceId, double value) {
meterRegistry.counter(metricName, "traceId", traceId).increment(value);
}
}

// 4. 关联查询
@Service
public class CorrelationQueryService {

@Autowired
private ElasticsearchClient elasticsearchClient;

@Autowired
private PrometheusClient prometheusClient;

@Autowired
private ZipkinClient zipkinClient;

// 根据TraceId关联查询
public CorrelationResult queryByTraceId(String traceId) {
// 查询日志
List<LogEntry> logs = searchLogsByTraceId(traceId);

// 查询指标
List<Metric> metrics = queryMetricsByTraceId(traceId);

// 查询链路
Trace trace = zipkinClient.getTrace(traceId);

return CorrelationResult.builder()
.traceId(traceId)
.logs(logs)
.metrics(metrics)
.trace(trace)
.build();
}

private List<LogEntry> searchLogsByTraceId(String traceId) {
SearchRequest searchRequest = SearchRequest.of(s -> s
.index("logs-*")
.query(q -> q.match(m -> m.field("traceId").query(traceId)))
);

SearchResponse<LogEntry> response = elasticsearchClient.search(searchRequest, LogEntry.class);
return response.hits().hits().stream()
.map(hit -> hit.source())
.collect(Collectors.toList());
}

private List<Metric> queryMetricsByTraceId(String traceId) {
// 查询Prometheus指标
String query = String.format("metrics_total{traceId=\"%s\"}", traceId);
return prometheusClient.query(query);
}
}
}
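For the TraceIdInterceptor above to run, it has to be registered with Spring MVC. A registration sketch via WebMvcConfigurer (assumes the interceptor is a standalone @Component rather than a nested class):

// Register the trace-ID interceptor for all request paths
@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Autowired
    private TraceIdInterceptor traceIdInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(traceIdInterceptor).addPathPatterns("/**");
    }
}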

7. Case Study

7.1 End-to-End Observability Setup

7.1.1 An End-to-End Example

// 完整可观测性建设案例
@SpringBootApplication
public class CompleteObservabilitySetup {

public static void main(String[] args) {
SpringApplication app = new SpringApplication(CompleteObservabilitySetup.class);

// 启用可观测性
app.setAdditionalProfiles("observability");

ConfigurableApplicationContext context = app.run(args);
}

// 可观测性配置
@Configuration
@Profile("observability")
public class ObservabilityConfig {

// 1. 日志配置
@Bean
public LoggingSystemCustomizer loggingSystemCustomizer() {
return loggingSystem -> {
// 配置结构化日志
// 配置日志输出到文件
// 配置日志聚合
};
}

// 2. 指标配置
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCustomizer() {
return registry -> {
registry.config().commonTags(
"application", "user-service",
"environment", "production"
);
};
}

// 3. 链路追踪配置
@Bean
public Sampler traceSampler() {
// 采样率配置
return Sampler.create(1.0f);
}

// 4. 健康检查配置
@Bean
public HealthIndicator customHealthIndicator() {
return new CustomHealthIndicator();
}
}

// 可观测性拦截器
@Component
public class ObservabilityInterceptor implements HandlerInterceptor {

@Autowired
private MeterRegistry meterRegistry;

@Autowired
private Tracer tracer;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 创建Span
Span span = tracer.nextSpan()
.name("http-request")
.tag("method", request.getMethod())
.tag("path", request.getRequestURI())
.start();

request.setAttribute("span", span);
request.setAttribute("startTime", System.currentTimeMillis());

return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
Span span = (Span) request.getAttribute("span");
Long startTime = (Long) request.getAttribute("startTime");

if (span != null) {
long duration = System.currentTimeMillis() - startTime;

// 记录指标
meterRegistry.counter("http_requests_total",
"method", request.getMethod(),
"path", request.getRequestURI(),
"status", String.valueOf(response.getStatus())
).increment();

meterRegistry.timer("http_request_duration_seconds",
"method", request.getMethod(),
"path", request.getRequestURI()
).record(duration, TimeUnit.MILLISECONDS);

// 记录Span
span.tag("status", String.valueOf(response.getStatus()));
span.tag("duration", String.valueOf(duration));

if (ex != null) {
span.tag("error", "true");
span.tag("error.message", ex.getMessage());
}

span.end();
}
}
}

// 可观测性服务
@Service
public class ObservabilityService {

@Autowired
private LogService logService;

@Autowired
private MetricsService metricsService;

@Autowired
private TraceService traceService;

@Autowired
private AlertService alertService;

// 统一查询
public ObservabilityDashboard getDashboard(String serviceName, String timeRange) {
// 查询日志
LogStats logStats = logService.getLogStats(serviceName, timeRange);

// 查询指标
MetricsStats metricsStats = metricsService.getMetricsStats(serviceName, timeRange);

// 查询链路
TraceStats traceStats = traceService.getTraceStats(serviceName, timeRange);

// 查询告警
List<Alert> alerts = alertService.getActiveAlerts(serviceName);

return ObservabilityDashboard.builder()
.serviceName(serviceName)
.timeRange(timeRange)
.logStats(logStats)
.metricsStats(metricsStats)
.traceStats(traceStats)
.alerts(alerts)
.build();
}
}
}
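To run the pieces of this case study together, the backing services can be wired up with Docker Compose. A sketch of the observability stack (image tags and mounted config paths are assumptions; the ELK services from section 2.2 can be added alongside):

# docker-compose.yml (observability stack sketch)
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:v2.45.0
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/alerts.yml:/etc/prometheus/alerts.yml
    ports:
      - "9090:9090"

  alertmanager:
    image: prom/alertmanager:v0.26.0
    volumes:
      - ./alertmanager/alertmanager.yml:/etc/alertmanager/alertmanager.yml
    ports:
      - "9093:9093"

  grafana:
    image: grafana/grafana:10.2.0
    volumes:
      - ./grafana/provisioning:/etc/grafana/provisioning
    ports:
      - "3000:3000"

  zipkin:
    image: openzipkin/zipkin
    ports:
      - "9411:9411"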

8. Summary

8.1 Key Points

  1. Log management: the full pipeline of log collection, aggregation, analysis, and storage
  2. Metrics monitoring: the full pipeline of metric collection, storage, visualization, and analysis
  3. Tracing: distributed tracing, call chain analysis, and performance analysis
  4. Alerting: alert rules, notification, and handling
  5. Observability platform: building a unified observability platform
  6. Data correlation: correlating logs, metrics, and traces

8.2 Key Takeaways

  1. Three pillars: logs, metrics, and traces are the three pillars of observability
  2. Data correlation: correlate data through identifiers such as TraceId and RequestId
  3. Alerting strategy: well-designed alert rules and notification policies
  4. Platform unification: a unified observability platform provides one-stop access
  5. Automation: automated alert handling and problem localization

8.3 Best Practices

  1. Structured logging: structured logs are easier to analyze and query
  2. Metric standardization: consistent metric naming and label conventions
  3. Sampling strategy: a sensible sampling strategy for tracing
  4. Alert deduplication: avoid alert storms
  5. Data correlation: tie data together with identifiers such as TraceId
  6. Visualization: intuitive dashboards
  7. Automation: automated alert handling and problem localization
  8. Continuous improvement: keep refining the observability stack
