第249集 微服务链路日志追踪架构实战:分布式链路、性能监控与企业级可观测性设计
|字数总计:7.5k|阅读时长:39分钟|阅读量:
前言
微服务架构下,一个用户请求可能跨越多个服务,涉及数据库、缓存、消息队列等多个组件。链路追踪通过记录请求在分布式系统中的完整调用路径,帮助开发者快速定位性能瓶颈和故障根因。本文从架构设计到代码实现,系统梳理企业级链路追踪的完整解决方案。
一、链路追踪核心概念
1.1 基本概念
- Trace(链路):一次完整的请求调用链,包含多个Span
- Span(跨度):链路中的最小工作单元,代表一个操作
- TraceId:全局唯一标识,贯穿整个调用链
- SpanId:当前Span的唯一标识
- Parent SpanId:父级Span的标识,用于构建调用树
- Tags:Span的标签,用于分类和过滤
- Logs:Span的日志事件,记录关键信息
- Baggage:跨服务传递的上下文信息
1.2 链路追踪架构图(原文此处为架构图,文本提取时图片已丢失)
二、链路追踪实现方案
2.1 手动埋点实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
| @Component @Slf4j public class TraceContext {
private static final ThreadLocal<TraceContext> CONTEXT_HOLDER = new ThreadLocal<>();
private String traceId; private String spanId; private String parentSpanId; private long startTime; private Map<String, String> tags; private List<TraceLog> logs;
public static TraceContext getCurrent() { return CONTEXT_HOLDER.get(); }
public static void setCurrent(TraceContext context) { CONTEXT_HOLDER.set(context); }
public static void clear() { CONTEXT_HOLDER.remove(); }
public static TraceContext createTrace(String operationName) { TraceContext context = new TraceContext(); context.traceId = generateTraceId(); context.spanId = generateSpanId(); context.parentSpanId = null; context.startTime = System.currentTimeMillis(); context.tags = new HashMap<>(); context.logs = new ArrayList<>();
context.addTag("operation", operationName); context.addLog("trace.start", "Trace started");
return context; }
public TraceContext createChildSpan(String operationName) { TraceContext childContext = new TraceContext(); childContext.traceId = this.traceId; childContext.spanId = generateSpanId(); childContext.parentSpanId = this.spanId; childContext.startTime = System.currentTimeMillis(); childContext.tags = new HashMap<>(); childContext.logs = new ArrayList<>();
childContext.addTag("operation", operationName); childContext.addLog("span.start", "Span started");
return childContext; }
public void addTag(String key, String value) { if (tags == null) { tags = new HashMap<>(); } tags.put(key, value); }
public void addLog(String event, String message) { if (logs == null) { logs = new ArrayList<>(); } TraceLog log = new TraceLog(); log.setTimestamp(System.currentTimeMillis()); log.setEvent(event); log.setMessage(message); logs.add(log); }
public void finish() { long duration = System.currentTimeMillis() - startTime; addTag("duration", String.valueOf(duration)); addLog("span.finish", "Span finished in " + duration + "ms");
sendToTracingSystem(); }
private void sendToTracingSystem() { TraceData traceData = new TraceData(); traceData.setTraceId(traceId); traceData.setSpanId(spanId); traceData.setParentSpanId(parentSpanId); traceData.setStartTime(startTime); traceData.setDuration(System.currentTimeMillis() - startTime); traceData.setTags(tags); traceData.setLogs(logs);
CompletableFuture.runAsync(() -> { try { tracingService.sendTrace(traceData); } catch (Exception e) { log.error("发送链路追踪数据失败", e); } }); }
private static String generateTraceId() { return UUID.randomUUID().toString().replace("-", ""); }
private static String generateSpanId() { return Long.toHexString(System.nanoTime()); } }
|
2.2 AOP自动埋点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| @Aspect @Component @Slf4j public class TracingAspect {
@Autowired private TracingService tracingService;
@Around("@annotation(Traced)") public Object traceMethod(ProceedingJoinPoint joinPoint) throws Throwable { String className = joinPoint.getTarget().getClass().getSimpleName(); String methodName = joinPoint.getSignature().getName(); String operationName = className + "." + methodName;
TraceContext currentContext = TraceContext.getCurrent(); TraceContext spanContext;
if (currentContext == null) { spanContext = TraceContext.createTrace(operationName); } else { spanContext = currentContext.createChildSpan(operationName); }
TraceContext.setCurrent(spanContext);
try { Object[] args = joinPoint.getArgs(); for (int i = 0; i < args.length; i++) { spanContext.addTag("arg." + i, String.valueOf(args[i])); }
Object result = joinPoint.proceed();
spanContext.addTag("result", String.valueOf(result)); spanContext.addLog("method.success", "Method executed successfully");
return result;
} catch (Exception e) { spanContext.addTag("error", "true"); spanContext.addTag("error.message", e.getMessage()); spanContext.addLog("method.error", "Method execution failed: " + e.getMessage()); throw e; } finally { spanContext.finish(); TraceContext.clear(); } }
@Around("execution(* org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handle(..))") public Object traceHttpRequest(ProceedingJoinPoint joinPoint) throws Throwable { HttpServletRequest request = getCurrentRequest(); String operationName = request.getMethod() + " " + request.getRequestURI();
TraceContext traceContext = TraceContext.createTrace(operationName); traceContext.addTag("http.method", request.getMethod()); traceContext.addTag("http.url", request.getRequestURL().toString()); traceContext.addTag("http.user_agent", request.getHeader("User-Agent")); traceContext.addTag("http.remote_addr", getClientIp(request));
TraceContext.setCurrent(traceContext);
try { Object result = joinPoint.proceed(); traceContext.addTag("http.status", "200"); return result; } catch (Exception e) { traceContext.addTag("http.status", "500"); traceContext.addTag("error", "true"); throw e; } finally { traceContext.finish(); TraceContext.clear(); } }
@Around("execution(* org.springframework.jdbc.core.JdbcTemplate.*(..))") public Object traceDatabaseOperation(ProceedingJoinPoint joinPoint) throws Throwable { String methodName = joinPoint.getSignature().getName(); String operationName = "database." + methodName;
TraceContext currentContext = TraceContext.getCurrent(); TraceContext spanContext = currentContext != null ? currentContext.createChildSpan(operationName) : TraceContext.createTrace(operationName);
TraceContext.setCurrent(spanContext);
try { Object[] args = joinPoint.getArgs(); if (args.length > 0 && args[0] instanceof String) { String sql = (String) args[0]; spanContext.addTag("db.statement", sql); spanContext.addTag("db.type", "sql"); }
Object result = joinPoint.proceed(); spanContext.addLog("db.success", "Database operation completed"); return result;
} catch (Exception e) { spanContext.addTag("error", "true"); spanContext.addTag("error.message", e.getMessage()); throw e; } finally { spanContext.finish(); TraceContext.clear(); } } }
|
2.3 链路追踪服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
| @Service @Slf4j public class TracingService {
@Autowired private TraceDataRepository traceDataRepository;
@Autowired private RedisTemplate<String, Object> redisTemplate;
@Autowired private RabbitTemplate rabbitTemplate;
public void sendTrace(TraceData traceData) { try { if (!shouldSample(traceData)) { return; }
preprocessTraceData(traceData);
CompletableFuture.runAsync(() -> { try { traceDataRepository.save(traceData);
storeTraceToRedis(traceData);
sendTraceToQueue(traceData);
} catch (Exception e) { log.error("存储链路追踪数据失败: traceId={}", traceData.getTraceId(), e); } });
} catch (Exception e) { log.error("发送链路追踪数据异常: traceId={}", traceData.getTraceId(), e); } }
private boolean shouldSample(TraceData traceData) { if (traceData.getTags().containsKey("error")) { return true; }
if (traceData.getDuration() > 1000) { return true; }
return Math.random() < 0.1; }
private void preprocessTraceData(TraceData traceData) { Map<String, String> tags = traceData.getTags(); if (tags.containsKey("http.url")) { String url = tags.get("http.url"); url = url.replaceAll("password=[^&]*", "password=***"); url = url.replaceAll("token=[^&]*", "token=***"); tags.put("http.url", url); }
tags.put("service.name", getServiceName()); tags.put("service.version", getServiceVersion()); tags.put("host.name", getHostName()); tags.put("host.ip", getHostIp());
traceData.setTimestamp(System.currentTimeMillis()); }
private void storeTraceToRedis(TraceData traceData) { String key = "trace:" + traceData.getTraceId(); redisTemplate.opsForValue().set(key, traceData, Duration.ofHours(1));
String spanKey = "span:" + traceData.getSpanId(); redisTemplate.opsForValue().set(spanKey, traceData, Duration.ofHours(1)); }
private void sendTraceToQueue(TraceData traceData) { TraceMessage message = new TraceMessage(); message.setTraceId(traceData.getTraceId()); message.setSpanId(traceData.getSpanId()); message.setParentSpanId(traceData.getParentSpanId()); message.setOperationName(traceData.getTags().get("operation")); message.setStartTime(traceData.getStartTime()); message.setDuration(traceData.getDuration()); message.setTags(traceData.getTags()); message.setTimestamp(System.currentTimeMillis());
rabbitTemplate.convertAndSend("trace.exchange", "trace.data", message); }
public List<TraceData> queryTraces(TraceQuery query) { List<TraceData> traces = new ArrayList<>();
try { if (query.getTraceId() != null) { TraceData trace = (TraceData) redisTemplate.opsForValue().get("trace:" + query.getTraceId()); if (trace != null) { traces.add(trace); } }
if (traces.isEmpty()) { traces = traceDataRepository.queryTraces(query); }
return buildCompleteTrace(traces);
} catch (Exception e) { log.error("查询链路追踪数据失败: query={}", query, e); return traces; } }
private List<TraceData> buildCompleteTrace(List<TraceData> traces) { if (traces.isEmpty()) { return traces; }
String traceId = traces.get(0).getTraceId();
List<TraceData> allSpans = traceDataRepository.findByTraceId(traceId);
allSpans.sort(Comparator.comparing(TraceData::getStartTime));
return allSpans; } }
|
三、集成主流追踪系统
3.1 SkyWalking集成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Configuration @EnableSkyWalking public class SkyWalkingConfig {
@Bean public SkyWalkingTracing skyWalkingTracing() { return new SkyWalkingTracing(); } }
/**
 * Thin facade over the SkyWalking {@code ContextManager} for creating spans and attaching
 * tags, log events and exceptions to the active span.
 *
 * <p>This class is deliberately not annotated with {@code @Component}: it is registered through
 * the {@code @Bean} method {@code SkyWalkingConfig.skyWalkingTracing()}. Declaring it both ways
 * yields two bean definitions with the same name, which Spring Boot rejects at startup when
 * bean-definition overriding is disabled (its default).
 *
 * <p>NOTE(review): the {@code ContextManager} calls used here ({@code getInstance()},
 * {@code log(long, String, String)}) should be checked against the SkyWalking version in use.
 */
@Slf4j
public class SkyWalkingTracing {

    private final ContextManager contextManager;

    public SkyWalkingTracing() {
        this.contextManager = ContextManager.getInstance();
    }

    /** Opens an entry span named {@code operationName} with no incoming context carrier. */
    public void createTrace(String operationName) {
        AbstractSpan span = contextManager.createEntrySpan(operationName, null);
        describe(span, operationName);
    }

    /** Opens a local span named {@code operationName}. */
    public void createChildSpan(String operationName) {
        AbstractSpan span = contextManager.createLocalSpan(operationName);
        describe(span, operationName);
    }

    /** Applies the component and {@code operation} tag shared by every span created here. */
    private static void describe(AbstractSpan span, String operationName) {
        span.setComponent(ComponentsDefine.SPRING_MVC);
        span.tag("operation", operationName);
    }

    /** Stops the active span. */
    public void finishSpan() {
        contextManager.stopSpan();
    }

    /** Tags the active span; does nothing when no span is active. */
    public void addTag(String key, String value) {
        AbstractSpan span = contextManager.activeSpan();
        if (span != null) {
            span.tag(key, value);
        }
    }

    /** Adds a log event stamped with the current time to the active span, if any. */
    public void addLog(String event, String message) {
        AbstractSpan span = contextManager.activeSpan();
        if (span != null) {
            span.log(System.currentTimeMillis(), event, message);
        }
    }

    /** Logs {@code throwable} on the active span, if any. */
    public void recordException(Throwable throwable) {
        AbstractSpan span = contextManager.activeSpan();
        if (span != null) {
            span.log(throwable);
        }
    }
}
|
3.2 Zipkin集成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| @Configuration @EnableZipkinServer public class ZipkinConfig {
@Bean public Sender sender() { return OkHttpSender.create("http://localhost:9411/api/v2/spans"); }
@Bean public AsyncReporter<Span> spanReporter() { return AsyncReporter.create(sender()); }
@Bean public Tracing tracing() { return Tracing.newBuilder() .localServiceName("user-service") .spanReporter(spanReporter()) .sampler(Sampler.create(0.1f)) .build(); } }
/**
 * Convenience wrapper around a Brave {@code Tracer} for starting, tagging and finishing spans.
 *
 * <p>Spans returned from this class are started but not placed in scope; callers that want a
 * span to be "current" must use {@code tracer.withSpanInScope(span)} around their own work.
 */
@Component
@Slf4j
public class ZipkinTracing {

    @Autowired
    private Tracing tracing;

    /**
     * Starts a span named {@code operationName}.
     *
     * <p>Uses {@code Tracer.nextSpan()}, so the span joins the trace of the span currently in
     * scope, if there is one. No scope is opened here: a scope that is opened and closed
     * before the method returns has no effect on the caller.
     *
     * @return the started span; the caller must eventually pass it to {@link #finishSpan}
     */
    public Span createTrace(String operationName) {
        return startSpan(operationName);
    }

    /**
     * Starts a span named {@code operationName} as a child of the span currently in scope.
     *
     * @return the started span; the caller must eventually pass it to {@link #finishSpan}
     */
    public Span createChildSpan(String operationName) {
        return startSpan(operationName);
    }

    /** Builds, names, tags and starts the next span from the shared tracer. */
    private Span startSpan(String operationName) {
        Tracer tracer = tracing.tracer();
        return tracer.nextSpan()
                .name(operationName)
                .tag("operation", operationName)
                .start();
    }

    /** Finishes {@code span}; a {@code null} span is ignored. */
    public void finishSpan(Span span) {
        if (span != null) {
            span.finish();
        }
    }

    /** Tags {@code span}; a {@code null} span is ignored. */
    public void addTag(Span span, String key, String value) {
        if (span != null) {
            span.tag(key, value);
        }
    }

    /** Records {@code throwable} as the span's error; a {@code null} span is ignored. */
    public void recordException(Span span, Throwable throwable) {
        if (span != null) {
            span.error(throwable);
        }
    }
}
|
3.3 Jaeger集成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Configuration public class JaegerConfig {
@Bean public Tracer jaegerTracer() { return new Configuration("user-service") .withSampler(new Configuration.SamplerConfiguration() .withType("const") .withParam(1)) .withReporter(new Configuration.ReporterConfiguration() .withLogSpans(true) .withSender(new Configuration.SenderConfiguration() .withEndpoint("http://localhost:14268/api/traces"))) .getTracer(); } }
/**
 * Helper around the injected OpenTracing {@code Tracer} for starting, tagging and finishing
 * spans. Methods that take a span silently ignore {@code null}.
 */
@Component
@Slf4j
public class JaegerTracing {

    @Autowired
    private Tracer tracer;

    /** Starts a span named {@code operationName} carrying an {@code operation} tag. */
    public Span createTrace(String operationName) {
        return tracer.buildSpan(operationName)
                .withTag("operation", operationName)
                .start();
    }

    /** Starts a span named {@code operationName} declared as a child of {@code parentSpan}. */
    public Span createChildSpan(String operationName, Span parentSpan) {
        return tracer.buildSpan(operationName)
                .asChildOf(parentSpan)
                .withTag("operation", operationName)
                .start();
    }

    /** Finishes the span. */
    public void finishSpan(Span span) {
        if (span == null) {
            return;
        }
        span.finish();
    }

    /** Sets a string tag on the span. */
    public void addTag(Span span, String key, String value) {
        if (span == null) {
            return;
        }
        span.setTag(key, value);
    }

    /** Marks the span as failed and logs the throwable as an {@code error} event. */
    public void recordException(Span span, Throwable throwable) {
        if (span == null) {
            return;
        }
        span.setTag("error", true);
        span.log(Map.of("event", "error", "error.object", throwable));
    }
}
|
四、链路分析引擎
4.1 链路分析服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
| @Service @Slf4j public class TraceAnalysisService {
@Autowired private TraceDataRepository traceDataRepository;
@Autowired private RedisTemplate<String, Object> redisTemplate;
public TraceAnalysisResult analyzeTracePerformance(String traceId) { List<TraceData> spans = traceDataRepository.findByTraceId(traceId);
TraceAnalysisResult result = new TraceAnalysisResult(); result.setTraceId(traceId); result.setTotalSpans(spans.size()); result.setTotalDuration(calculateTotalDuration(spans)); result.setCriticalPath(findCriticalPath(spans)); result.setBottlenecks(findBottlenecks(spans)); result.setErrorSpans(findErrorSpans(spans));
return result; }
private long calculateTotalDuration(List<TraceData> spans) { if (spans.isEmpty()) { return 0; }
long minStartTime = spans.stream() .mapToLong(TraceData::getStartTime) .min() .orElse(0);
long maxEndTime = spans.stream() .mapToLong(span -> span.getStartTime() + span.getDuration()) .max() .orElse(0);
return maxEndTime - minStartTime; }
private List<TraceData> findCriticalPath(List<TraceData> spans) { Map<String, List<TraceData>> spanTree = buildSpanTree(spans);
TraceData rootSpan = spans.stream() .filter(span -> span.getParentSpanId() == null) .findFirst() .orElse(null);
if (rootSpan == null) { return new ArrayList<>(); }
return findLongestPath(rootSpan, spanTree); }
private List<TraceData> findBottlenecks(List<TraceData> spans) { return spans.stream() .filter(span -> span.getDuration() > 1000) .sorted(Comparator.comparing(TraceData::getDuration).reversed()) .limit(5) .collect(Collectors.toList()); }
private List<TraceData> findErrorSpans(List<TraceData> spans) { return spans.stream() .filter(span -> span.getTags().containsKey("error")) .collect(Collectors.toList()); }
private Map<String, List<TraceData>> buildSpanTree(List<TraceData> spans) { Map<String, List<TraceData>> tree = new HashMap<>();
for (TraceData span : spans) { String parentId = span.getParentSpanId(); if (parentId != null) { tree.computeIfAbsent(parentId, k -> new ArrayList<>()).add(span); } }
return tree; }
private List<TraceData> findLongestPath(TraceData rootSpan, Map<String, List<TraceData>> spanTree) { List<TraceData> longestPath = new ArrayList<>(); longestPath.add(rootSpan);
List<TraceData> children = spanTree.get(rootSpan.getSpanId()); if (children != null && !children.isEmpty()) { TraceData longestChild = children.stream() .max(Comparator.comparing(TraceData::getDuration)) .orElse(null);
if (longestChild != null) { longestPath.addAll(findLongestPath(longestChild, spanTree)); } }
return longestPath; }
public ServiceDependencyResult analyzeServiceDependency(String serviceName, LocalDateTime startTime, LocalDateTime endTime) { List<TraceData> traces = traceDataRepository.findByServiceAndTimeRange(serviceName, startTime, endTime);
ServiceDependencyResult result = new ServiceDependencyResult(); result.setServiceName(serviceName); result.setTotalTraces(traces.size()); result.setDependencies(extractDependencies(traces)); result.setPerformanceMetrics(calculatePerformanceMetrics(traces));
return result; }
private Map<String, DependencyInfo> extractDependencies(List<TraceData> traces) { Map<String, DependencyInfo> dependencies = new HashMap<>();
for (TraceData trace : traces) { String operation = trace.getTags().get("operation"); if (operation != null && operation.contains(".")) { String[] parts = operation.split("\\."); if (parts.length >= 2) { String targetService = parts[0];
DependencyInfo info = dependencies.computeIfAbsent(targetService, k -> new DependencyInfo()); info.setServiceName(targetService); info.setCallCount(info.getCallCount() + 1); info.setTotalDuration(info.getTotalDuration() + trace.getDuration()); info.setErrorCount(info.getErrorCount() + (trace.getTags().containsKey("error") ? 1 : 0)); } } }
return dependencies; }
private PerformanceMetrics calculatePerformanceMetrics(List<TraceData> traces) { PerformanceMetrics metrics = new PerformanceMetrics();
if (traces.isEmpty()) { return metrics; }
List<Long> durations = traces.stream() .map(TraceData::getDuration) .sorted() .collect(Collectors.toList());
metrics.setMinDuration(durations.get(0)); metrics.setMaxDuration(durations.get(durations.size() - 1)); metrics.setAvgDuration(durations.stream().mapToLong(Long::longValue).sum() / durations.size()); metrics.setP50Duration(durations.get(durations.size() / 2)); metrics.setP95Duration(durations.get((int) (durations.size() * 0.95))); metrics.setP99Duration(durations.get((int) (durations.size() * 0.99)));
return metrics; } }
|
4.2 实时监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
| @Service @Slf4j public class TraceMonitoringService {
@Autowired private RedisTemplate<String, Object> redisTemplate;
@Autowired private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "trace.monitor.queue") public void monitorTrace(TraceMessage message) { try { if (message.getTags().containsKey("error")) { handleTraceError(message); }
if (message.getDuration() > 1000) { handleSlowTrace(message); }
updateTraceStatistics(message);
} catch (Exception e) { log.error("监控链路异常: traceId={}", message.getTraceId(), e); } }
private void handleTraceError(TraceMessage message) { log.error("链路执行错误: traceId={}, spanId={}, error={}", message.getTraceId(), message.getSpanId(), message.getTags().get("error.message"));
AlertMessage alert = new AlertMessage(); alert.setType("TRACE_ERROR"); alert.setServiceName(message.getTags().get("service.name")); alert.setTraceId(message.getTraceId()); alert.setErrorMessage(message.getTags().get("error.message")); alert.setTimestamp(System.currentTimeMillis());
rabbitTemplate.convertAndSend("alert.exchange", "alert.trace.error", alert);
String errorKey = "trace:error:" + message.getTags().get("service.name"); redisTemplate.opsForValue().increment(errorKey); redisTemplate.expire(errorKey, Duration.ofHours(1)); }
private void handleSlowTrace(TraceMessage message) { log.warn("链路执行缓慢: traceId={}, spanId={}, duration={}ms", message.getTraceId(), message.getSpanId(), message.getDuration());
AlertMessage alert = new AlertMessage(); alert.setType("SLOW_TRACE"); alert.setServiceName(message.getTags().get("service.name")); alert.setTraceId(message.getTraceId()); alert.setDuration(message.getDuration()); alert.setTimestamp(System.currentTimeMillis());
rabbitTemplate.convertAndSend("alert.exchange", "alert.slow.trace", alert);
String slowKey = "trace:slow:" + message.getTags().get("service.name"); redisTemplate.opsForValue().increment(slowKey); redisTemplate.expire(slowKey, Duration.ofHours(1)); }
private void updateTraceStatistics(TraceMessage message) { String serviceName = message.getTags().get("service.name"); String operationName = message.getOperationName();
String serviceKey = "trace:service:" + serviceName; redisTemplate.opsForValue().increment(serviceKey); redisTemplate.expire(serviceKey, Duration.ofHours(1));
String operationKey = "trace:operation:" + serviceName + ":" + operationName; redisTemplate.opsForValue().increment(operationKey); redisTemplate.expire(operationKey, Duration.ofHours(1));
String durationKey = "trace:duration:" + serviceName; redisTemplate.opsForList().leftPush(durationKey, message.getDuration()); redisTemplate.opsForList().trim(durationKey, 0, 999); redisTemplate.expire(durationKey, Duration.ofHours(1)); }
public TraceStatistics getTraceStatistics(String serviceName) { TraceStatistics statistics = new TraceStatistics(); statistics.setServiceName(serviceName); statistics.setTimestamp(System.currentTimeMillis());
String serviceKey = "trace:service:" + serviceName; Object totalCount = redisTemplate.opsForValue().get(serviceKey); statistics.setTotalRequests(totalCount != null ? (Integer) totalCount : 0);
String errorKey = "trace:error:" + serviceName; Object errorCount = redisTemplate.opsForValue().get(errorKey); statistics.setErrorCount(errorCount != null ? (Integer) errorCount : 0);
String slowKey = "trace:slow:" + serviceName; Object slowCount = redisTemplate.opsForValue().get(slowKey); statistics.setSlowRequestCount(slowCount != null ? (Integer) slowCount : 0);
String durationKey = "trace:duration:" + serviceName; List<Object> durations = redisTemplate.opsForList().range(durationKey, 0, -1); if (durations != null && !durations.isEmpty()) { double avgDuration = durations.stream() .mapToDouble(d -> ((Integer) d).doubleValue()) .average() .orElse(0.0); statistics.setAvgDuration(avgDuration); }
return statistics; } }
|
五、可视化展示
5.1 链路追踪控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| @RestController @RequestMapping("/api/trace") @Slf4j public class TraceController {
@Autowired private TracingService tracingService;
@Autowired private TraceAnalysisService traceAnalysisService;
@Autowired private TraceMonitoringService traceMonitoringService;
@GetMapping("/query") public ResponseEntity<TraceQueryResult> queryTraces( @RequestParam(required = false) String traceId, @RequestParam(required = false) String serviceName, @RequestParam(required = false) String operationName, @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime, @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime, @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "20") int size) {
try { TraceQuery query = new TraceQuery(); query.setTraceId(traceId); query.setServiceName(serviceName); query.setOperationName(operationName); query.setStartTime(startTime); query.setEndTime(endTime); query.setPage(page); query.setSize(size);
List<TraceData> traces = tracingService.queryTraces(query);
TraceQueryResult result = new TraceQueryResult(); result.setTraces(traces); result.setTotal(traces.size()); result.setPage(page); result.setSize(size);
return ResponseEntity.ok(result);
} catch (Exception e) { log.error("查询链路追踪数据失败", e); return ResponseEntity.status(500).build(); } }
@GetMapping("/detail/{traceId}") public ResponseEntity<TraceDetail> getTraceDetail(@PathVariable String traceId) { try { List<TraceData> spans = tracingService.queryTraces(new TraceQuery(traceId));
if (spans.isEmpty()) { return ResponseEntity.notFound().build(); }
TraceDetail detail = new TraceDetail(); detail.setTraceId(traceId); detail.setSpans(spans); detail.setAnalysis(traceAnalysisService.analyzeTracePerformance(traceId));
return ResponseEntity.ok(detail);
} catch (Exception e) { log.error("获取链路详情失败: traceId={}", traceId, e); return ResponseEntity.status(500).build(); } }
@GetMapping("/dependency/{serviceName}") public ResponseEntity<ServiceDependencyResult> getServiceDependency( @PathVariable String serviceName, @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime, @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime) {
try { ServiceDependencyResult result = traceAnalysisService.analyzeServiceDependency( serviceName, startTime, endTime);
return ResponseEntity.ok(result);
} catch (Exception e) { log.error("获取服务依赖关系失败: serviceName={}", serviceName, e); return ResponseEntity.status(500).build(); } }
@GetMapping("/statistics/{serviceName}") public ResponseEntity<TraceStatistics> getTraceStatistics(@PathVariable String serviceName) { try { TraceStatistics statistics = traceMonitoringService.getTraceStatistics(serviceName); return ResponseEntity.ok(statistics);
} catch (Exception e) { log.error("获取统计信息失败: serviceName={}", serviceName, e); return ResponseEntity.status(500).build(); } }
@GetMapping("/topology") public ResponseEntity<TraceTopology> getTraceTopology( @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime, @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime) {
try { TraceTopology topology = traceAnalysisService.buildTraceTopology(startTime, endTime); return ResponseEntity.ok(topology);
} catch (Exception e) { log.error("获取链路拓扑图失败", e); return ResponseEntity.status(500).build(); } } }
|
5.2 前端展示组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
| class TraceVisualization { constructor(containerId) { this.container = document.getElementById(containerId); this.traceData = null; this.init(); }
init() { this.renderSearchForm(); this.renderTraceList(); this.renderTraceDetail(); }
renderSearchForm() { const searchForm = ` <div class="search-form"> <div class="form-group"> <label>TraceId:</label> <input type="text" id="traceId" placeholder="输入TraceId"> </div> <div class="form-group"> <label>服务名称:</label> <input type="text" id="serviceName" placeholder="输入服务名称"> </div> <div class="form-group"> <label>操作名称:</label> <input type="text" id="operationName" placeholder="输入操作名称"> </div> <div class="form-group"> <label>开始时间:</label> <input type="datetime-local" id="startTime"> </div> <div class="form-group"> <label>结束时间:</label> <input type="datetime-local" id="endTime"> </div> <button onclick="traceVisualization.searchTraces()">查询</button> </div> `;
this.container.innerHTML = searchForm; }
renderTraceList() { const traceList = ` <div class="trace-list"> <h3>链路列表</h3> <div id="traceListContent"></div> </div> `;
this.container.innerHTML += traceList; }
renderTraceDetail() { const traceDetail = ` <div class="trace-detail"> <h3>链路详情</h3> <div id="traceDetailContent"></div> </div> `;
this.container.innerHTML += traceDetail; }
async searchTraces() { const params = { traceId: document.getElementById('traceId').value, serviceName: document.getElementById('serviceName').value, operationName: document.getElementById('operationName').value, startTime: document.getElementById('startTime').value, endTime: document.getElementById('endTime').value };
try { const response = await fetch('/api/trace/query?' + new URLSearchParams(params)); const result = await response.json();
this.renderTraceListContent(result.traces);
} catch (error) { console.error('查询链路失败:', error); } }
renderTraceListContent(traces) { const content = document.getElementById('traceListContent');
if (traces.length === 0) { content.innerHTML = '<p>没有找到相关链路</p>'; return; }
const table = ` <table class="trace-table"> <thead> <tr> <th>TraceId</th> <th>服务名称</th> <th>操作名称</th> <th>开始时间</th> <th>耗时(ms)</th> <th>状态</th> <th>操作</th> </tr> </thead> <tbody> ${traces.map(trace => ` <tr> <td>${trace.traceId}</td> <td>${trace.tags['service.name'] || '-'}</td> <td>${trace.tags['operation'] || '-'}</td> <td>${new Date(trace.startTime).toLocaleString()}</td> <td>${trace.duration}</td> <td>${trace.tags['error'] ? '错误' : '正常'}</td> <td> <button onclick="traceVisualization.showTraceDetail('${trace.traceId}')"> 查看详情 </button> </td> </tr> `).join('')} </tbody> </table> `;
content.innerHTML = table; }
async showTraceDetail(traceId) { try { const response = await fetch(`/api/trace/detail/${traceId}`); const detail = await response.json();
this.renderTraceDetailContent(detail);
} catch (error) { console.error('获取链路详情失败:', error); } }
renderTraceDetailContent(detail) { const content = document.getElementById('traceDetailContent');
const detailHtml = ` <div class="trace-detail-content"> <h4>链路信息</h4> <p><strong>TraceId:</strong> ${detail.traceId}</p> <p><strong>总Span数:</strong> ${detail.spans.length}</p> <p><strong>总耗时:</strong> ${detail.analysis.totalDuration}ms</p>
<h4>Span列表</h4> <div class="span-list"> ${detail.spans.map(span => ` <div class="span-item"> <div class="span-header"> <span class="span-operation">${span.tags['operation'] || '-'}</span> <span class="span-duration">${span.duration}ms</span> <span class="span-status ${span.tags['error'] ? 'error' : 'success'}"> ${span.tags['error'] ? '错误' : '成功'} </span> </div> <div class="span-details"> <p><strong>SpanId:</strong> ${span.spanId}</p> <p><strong>父SpanId:</strong> ${span.parentSpanId || '-'}</p> <p><strong>开始时间:</strong> ${new Date(span.startTime).toLocaleString()}</p> ${span.tags['error'] ? `<p><strong>错误信息:</strong> ${span.tags['error.message']}</p>` : ''} </div> </div> `).join('')} </div>
<h4>性能分析</h4> <div class="performance-analysis"> <p><strong>关键路径:</strong> ${detail.analysis.criticalPath.length} 个Span</p> <p><strong>性能瓶颈:</strong> ${detail.analysis.bottlenecks.length} 个</p> <p><strong>错误Span:</strong> ${detail.analysis.errorSpans.length} 个</p> </div> </div> `;
content.innerHTML = detailHtml; } }
// Page-level instance, constructed with the string 'traceContainer'. The inline
// onclick handlers emitted by the render methods call `traceVisualization.…`
// by this exact name, so renaming this constant breaks those buttons.
const traceVisualization = new TraceVisualization('traceContainer');
|
六、性能优化
6.1 采样策略优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| @Component @Slf4j public class SamplingStrategy {
private final Map<String, Double> serviceSamplingRates = new HashMap<>(); private final Map<String, Long> serviceRequestCounts = new HashMap<>();
public boolean shouldSample(String serviceName, String operationName, Map<String, String> tags) { if (tags.containsKey("error")) { return true; }
if (tags.containsKey("duration")) { long duration = Long.parseLong(tags.get("duration")); if (duration > 1000) { return true; } }
if (isCriticalOperation(operationName)) { return true; }
double samplingRate = calculateAdaptiveSamplingRate(serviceName); return Math.random() < samplingRate; }
private double calculateAdaptiveSamplingRate(String serviceName) { long requestCount = serviceRequestCounts.getOrDefault(serviceName, 0L);
if (requestCount < 100) { return 1.0; } else if (requestCount < 1000) { return 0.5; } else if (requestCount < 10000) { return 0.1; } else { return 0.01; } }
private boolean isCriticalOperation(String operationName) { return operationName.contains("login") || operationName.contains("payment") || operationName.contains("order") || operationName.contains("user"); }
public void updateServiceRequestCount(String serviceName) { serviceRequestCounts.merge(serviceName, 1L, Long::sum); }
@Scheduled(fixedRate = 60000) public void resetServiceRequestCounts() { serviceRequestCounts.clear(); } }
|
6.2 数据压缩与存储优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| @Component @Slf4j public class TraceDataCompressor {
private final GZIPOutputStream gzipOutputStream; private final ByteArrayOutputStream byteArrayOutputStream;
public TraceDataCompressor() { this.byteArrayOutputStream = new ByteArrayOutputStream(); try { this.gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); } catch (IOException e) { throw new RuntimeException("初始化压缩器失败", e); } }
public byte[] compressTraceData(TraceData traceData) { try { String json = objectMapper.writeValueAsString(traceData);
byte[] compressed = compress(json.getBytes());
double compressionRatio = (double) compressed.length / json.getBytes().length; log.debug("数据压缩比: {:.2f}", compressionRatio);
return compressed;
} catch (Exception e) { log.error("压缩链路数据失败", e); return null; } }
public TraceData decompressTraceData(byte[] compressedData) { try { byte[] decompressed = decompress(compressedData);
return objectMapper.readValue(decompressed, TraceData.class);
} catch (Exception e) { log.error("解压缩链路数据失败", e); return null; } }
private byte[] compress(byte[] data) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) { gzos.write(data); } return baos.toByteArray(); }
private byte[] decompress(byte[] compressedData) throws IOException { ByteArrayInputStream bais = new ByteArrayInputStream(compressedData); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (GZIPInputStream gzis = new GZIPInputStream(bais)) { byte[] buffer = new byte[1024]; int len; while ((len = gzis.read(buffer)) != -1) { baos.write(buffer, 0, len); } } return baos.toByteArray(); } }
|
七、监控告警
7.1 告警规则配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# Alerting rules for the trace metrics (Prometheus rule-file layout).
groups:
  - name: trace-alerts
    rules:
      # Share of failed traces over the last 5 minutes above 5%, held for 2 minutes.
      - alert: HighErrorRate
        expr: rate(trace_error_count[5m]) / rate(trace_total_count[5m]) > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "链路错误率过高"
          description: "服务 {{ $labels.service }} 的错误率为 {{ $value | humanizePercentage }}"

      # Share of slow traces over the last 5 minutes above 10%, held for 2 minutes.
      - alert: SlowTraceRate
        expr: rate(trace_slow_count[5m]) / rate(trace_total_count[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "慢链路比例过高"
          description: "服务 {{ $labels.service }} 的慢链路比例为 {{ $value | humanizePercentage }}"

      # Trace throughput above 1000 per second, held for 1 minute.
      - alert: TraceVolumeAbnormal
        expr: rate(trace_total_count[5m]) > 1000
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "链路请求量异常"
          description: "服务 {{ $labels.service }} 的请求量为 {{ $value }} req/s"
|
7.2 告警处理服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
| @Service @Slf4j public class TraceAlertService {
@Autowired private NotificationService notificationService;
@Autowired private RedisTemplate<String, Object> redisTemplate;
@EventListener public void handleTraceAlert(TraceAlertEvent event) { try { log.warn("链路告警: type={}, service={}, message={}", event.getType(), event.getServiceName(), event.getMessage());
sendAlertNotification(event);
updateAlertStatistics(event);
if (event.getType() == AlertType.CRITICAL) { triggerAutoFix(event); }
} catch (Exception e) { log.error("处理链路告警失败", e); } }
private void sendAlertNotification(TraceAlertEvent event) { AlertNotification notification = new AlertNotification(); notification.setType(event.getType()); notification.setServiceName(event.getServiceName()); notification.setMessage(event.getMessage()); notification.setTimestamp(System.currentTimeMillis()); notification.setRecipients(getAlertRecipients(event.getServiceName()));
notificationService.sendAlert(notification); }
private void updateAlertStatistics(TraceAlertEvent event) { String key = "alert:count:" + event.getServiceName() + ":" + event.getType(); redisTemplate.opsForValue().increment(key); redisTemplate.expire(key, Duration.ofHours(24)); }
private void triggerAutoFix(TraceAlertEvent event) { if (canAutoFix(event)) { CompletableFuture.runAsync(() -> { try { executeAutoFix(event); } catch (Exception e) { log.error("自动修复失败", e); } }); } }
private boolean canAutoFix(TraceAlertEvent event) { if (event.getType() != AlertType.CRITICAL) { return false; }
String autoFixKey = "autofix:enabled:" + event.getServiceName(); Boolean enabled = (Boolean) redisTemplate.opsForValue().get(autoFixKey); return enabled != null && enabled; }
private void executeAutoFix(TraceAlertEvent event) { if (event.getMessage().contains("服务不可用")) { restartService(event.getServiceName()); }
if (event.getMessage().contains("缓存异常")) { clearCache(event.getServiceName()); }
if (event.getMessage().contains("请求量过高")) { scaleService(event.getServiceName()); } }
private void restartService(String serviceName) { log.info("自动重启服务: {}", serviceName); }
private void clearCache(String serviceName) { log.info("自动清理缓存: {}", serviceName); }
private void scaleService(String serviceName) { log.info("自动扩容服务: {}", serviceName); } }
|
八、最佳实践总结
8.1 实施建议
- 渐进式实施:从核心服务开始,逐步扩展到所有服务
- 采样策略:根据业务特点调整采样率,平衡性能与数据完整性
- 存储优化:使用压缩和分片存储,控制存储成本
- 监控告警:建立完善的监控体系,及时发现问题
8.2 性能优化
- 异步处理:链路数据收集和存储采用异步方式
- 批量处理:批量发送链路数据,减少网络开销
- 缓存策略:使用Redis缓存热点数据,提升查询性能
- 索引优化:为常用查询字段建立索引
8.3 运维建议
- 定期清理:定期清理过期的链路数据
- 容量规划:根据业务增长规划存储容量
- 备份策略:建立链路数据的备份和恢复机制
- 文档维护:维护详细的运维文档和故障处理手册
九、总结
微服务链路日志追踪是微服务架构中不可或缺的可观测性工具。通过合理的架构设计、完善的实现方案和有效的监控告警,可以构建一个稳定、高效、易用的链路追踪系统。
关键要点:
- 架构设计:清晰的数据模型和组件设计
- 实现方案:多种埋点方式和集成方案
- 性能优化:采样策略和存储优化
- 监控告警:实时监控和自动告警
- 可视化展示:直观的链路展示和分析
通过本文的实践指导,读者可以快速搭建企业级的链路追踪系统,为微服务架构的可观测性提供强有力的技术支撑。