第311集:ELK/EFK日志采集分析架构师实战

在企业级分布式系统中,日志管理是运维和故障排查的核心环节。ELK(Elasticsearch + Logstash + Kibana)和EFK(Elasticsearch + Fluentd + Kibana)是目前最主流的日志采集、存储和分析解决方案。本文将从架构师的角度,深入解析完整的日志系统设计与高可用部署。


一、ELK/EFK架构核心组件

1.1 Elasticsearch:分布式搜索与分析引擎

核心特性:

  • 分布式架构:天然支持集群部署,水平扩展能力强
  • 全文搜索:基于Lucene构建,支持复杂的全文检索
  • 实时索引:近实时的数据索引和搜索能力
  • RESTful API:丰富的API支持各种操作
  • 灵活的文档结构:JSON格式存储,无需预定义schema

架构设计要点:

集群配置示例:
cluster.name: production-logs
node.name: node-1
node.roles: [master, data, ingest]
network.host: 0.0.0.0
discovery.seed_hosts: ["10.0.1.10", "10.0.1.11", "10.0.1.12"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

索引分片策略:
number_of_shards: 5 # 主分片数,一旦创建不可修改
number_of_replicas: 2 # 副本分片数,可动态调整
refresh_interval: 30s # 索引刷新间隔
max_result_window: 50000 # 最大结果窗口

1.2 Logstash:日志采集与数据管道

核心功能:

  • 数据采集:从多种数据源采集日志
  • 数据转换:使用丰富的Filter插件处理数据
  • 数据输出:将处理后的数据发送到各种目标

典型Pipeline配置:

# logstash.conf
input {
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
    codec => json
  }

  kafka {
    bootstrap_servers => "10.0.1.10:9092"
    topics => ["application-logs"]
    consumer_threads => 4
  }
}

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
  }

  date {
    # timestamp由TIMESTAMP_ISO8601捕获,直接按ISO8601解析
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }

  mutate {
    convert => { "response_time" => "float" }
    rename => { "client_ip" => "ip" }
    remove_field => [ "host" ]
  }

  geoip {
    source => "ip"
    target => "geoip"
  }

  useragent {
    source => "user_agent"
    target => "ua"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.0.1.10:9200", "http://10.0.1.11:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
    template_name => "app-logs-template"
  }

  stdout {
    codec => rubydebug
  }
}

1.3 Fluentd:轻量级数据采集器

核心特性:

  • 轻量级:资源占用小,适合作为轻量级日志代理
  • 插件丰富:超过500个社区插件
  • 高可靠性:内置缓冲和重试机制
  • 配置简单:通过标签路由实现灵活配置

Fluentd配置示例:

<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx_access.log.pos
  tag nginx.access
  format nginx
</source>

<filter nginx.access>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment "production"
  </record>
</filter>

<match nginx.access>
  @type elasticsearch
  host 10.0.1.10
  port 9200
  index_name nginx-access
  include_tag_key true
  tag_key @log_name
  flush_interval 10s
  retry_limit 5
  retry_wait 1s
</match>

1.4 Kibana:数据可视化平台

核心功能:

  • Discover:交互式数据探索
  • Dashboard:自定义可视化面板
  • Dev Tools:Elasticsearch API调试
  • Monitoring:集群监控和告警
  • Machine Learning:异常检测和预测分析

二、ELK/EFK架构选型对比

2.1 ELK vs EFK对比

特性       | ELK(Logstash)             | EFK(Fluentd)
资源占用   | 高(基于JVM,需1GB+内存)  | 低(C/Ruby实现,几十MB内存)
性能       | 功能强大,但较重           | 轻量级,性能优秀
插件生态   | 丰富的官方和社区插件       | 超过500个社区插件
配置复杂度 | 配置灵活但复杂             | 配置简单清晰
适用场景   | 重量级日志处理             | 轻量级日志采集,容器化环境
缓冲机制   | 内置内存队列               | 内存/文件Buffer,可靠性高

2.2 混合架构(推荐)

在实际生产环境中,推荐采用混合架构:

  • 应用层:使用轻量级Fluentd/Filebeat采集
  • 缓冲层:使用Kafka/Redis缓存
  • 处理层:使用Logstash进行复杂的数据转换
  • 存储层:Elasticsearch集群存储
  • 展示层:Kibana可视化

三、Elasticsearch集群架构设计

3.1 节点角色规划

Elasticsearch节点可以有多种角色组合:

1. 专用Master节点

node.name: master-01
node.roles: [ master ]
# 注意:7.9+使用node.roles后,不能再混用node.master/node.data等旧配置
  • 职责:集群管理、元数据管理、索引创建/删除
  • 配置:至少3个节点(保证仲裁)

2. 专用Data节点

node.name: data-01
node.roles: [ data ]
  • 职责:数据存储、CRUD操作
  • 配置:根据数据量和查询性能需求决定

3. Coordinating节点

node.name: coordinating-01
node.roles: [ ]   # 空角色列表即纯协调节点
  • 职责:请求分发、结果聚合
  • 配置:在查询负载高时可配置

4. Ingest节点

node.name: ingest-01
node.roles: [ ingest ]
  • 职责:数据预处理、pipeline执行
  • 配置:在数据写入前需要复杂处理时配置

3.2 分片设计策略

分片数量规划:

# 单个分片建议不超过50GB
# 分片数量 = 数据节点数 * 每个节点承载的分片数
# 例如:10个节点 * 1个分片/节点 = 10个分片

PUT /logs-2024.01
{
  "settings": {
    "number_of_shards": 10,
    "number_of_replicas": 2,
    "index.refresh_interval": "30s",
    "index.max_result_window": 50000
  }
}

分片策略考虑因素:

  • 数据量增长:单个索引不超过50GB
  • 查询性能:分片过多会增加协调开销
  • 集群规模:分片数应与节点数匹配
  • 副本策略:生产环境至少2个副本
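上述分片规划可以用一小段Python做粗略估算。以下是示意性脚本(非官方工具),假设单分片上限50GB、按天创建索引:

```python
import math

def plan_shards(daily_gb: float, retention_days: int,
                max_shard_gb: float = 50.0) -> dict:
    """根据日增数据量与保留天数,粗略估算按天索引的主分片配置。"""
    # 单个(按天)索引的主分片数:日数据量除以单分片上限,向上取整
    shards_per_index = max(1, math.ceil(daily_gb / max_shard_gb))
    # 保留期内集群中同时存在的主分片总数(不含副本)
    total_primary = shards_per_index * retention_days
    return {
        "number_of_shards": shards_per_index,
        "total_primary_shards": total_primary,
    }

# 例如:每天300GB日志,保留30天
plan = plan_shards(daily_gb=300, retention_days=30)
```

估算出的总分片数再对照节点数检查:若远超"节点数 * 每节点可承载分片数",应加大单分片容量或改用按周索引。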

3.3 高可用与容灾设计

集群配置示例:

# elasticsearch.yml
cluster.name: production-cluster
node.name: node-${HOSTNAME}
node.roles: [master, data, ingest]

network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

discovery.seed_hosts: ["10.0.1.10:9300", "10.0.1.11:9300", "10.0.1.12:9300"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

# 防脑裂:ES 7.x起由集群基于master候选节点自动计算仲裁数,
# discovery.zen.minimum_master_nodes 已废弃,无需手动配置

索引模板与生命周期管理:

{
  "index_patterns": ["logs-*"],
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 2,
    "index.lifecycle.name": "log-lifecycle-policy",
    "index.routing.allocation.require.datacenter": "hot"
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "message": { "type": "text", "analyzer": "ik_max_word" },
      "fields": {
        "properties": {
          "customer": { "type": "keyword" }
        }
      }
    }
  }
}

四、数据采集架构设计

4.1 多层采集架构

1. 应用层采集

// Node.js应用集成winston日志库
const winston = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

const esTransport = new ElasticsearchTransport({
  level: 'info',
  clientOpts: {
    node: 'http://10.0.1.10:9200'
  },
  index: 'app-logs',
  indexPrefix: 'myapp',
  source: 'myapp',
  transformer: (logData) => {
    return {
      ...logData.meta,
      severity: logData.level,
      message: logData.message,
      timestamp: new Date()
    };
  }
});

const logger = winston.createLogger({
  transports: [esTransport]
});

2. 系统层采集

# Filebeat配置
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/*.log
    fields:
      log_type: nginx_access
    fields_under_root: true

output.kafka:
  hosts: ["10.0.1.20:9092"]
  topic: "nginx-logs"
  partition.round_robin:
    reachable_only: false

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  # Filebeat没有grok解析器,轻量的结构化解析可用dissect,
  # 复杂解析应交给下游的Logstash或ES Ingest Pipeline
  - dissect:
      tokenizer: '%{remote_ip} - %{user_name} [%{time_local}] "%{request}" %{status} %{body_bytes_sent}'
      field: "message"
      target_prefix: ""

3. 容器层采集

# Docker日志驱动配置(docker-compose.yml)
version: '3'
services:
  fluentd:
    image: fluent/fluentd:v1.16
    volumes:
      - ./fluentd.conf:/fluentd/etc/fluentd.conf
    logging:
      driver: "fluentd"
      options:
        fluentd-address: localhost:24224
        tag: "docker.{{.Name}}"

# Kubernetes部署Fluentd(ConfigMap)
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
data:
  fluent.conf: |
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    <match kubernetes.**>
      @type elasticsearch
      host 10.0.1.10
      port 9200
      index_name k8s-logs
      logstash_format true
    </match>

4.2 缓冲队列设计

使用Kafka作为缓冲队列:

# Kafka配置(Strimzi Operator)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: log-kafka
spec:
  kafka:
    version: 3.5.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      default.replication.factor: 3
      min.insync.replicas: 2
      log.retention.hours: 168
      log.segment.bytes: 1073741824
    storage:
      type: persistent-claim
      size: 500Gi

使用Redis作为缓冲:

# Python异步日志缓冲(注意:await要求使用redis-py的asyncio客户端)
import asyncio
import json

import redis.asyncio as redis

class RedisLogBuffer:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )

    async def push_log(self, index, log_data):
        """推送日志到Redis队列"""
        key = f"logbuffer:{index}"
        await self.redis_client.lpush(key, json.dumps(log_data))

    async def pop_logs(self, index, count=1000):
        """批量弹出日志(redis-py 4.x起rpop支持count参数)"""
        key = f"logbuffer:{index}"
        logs = await self.redis_client.rpop(key, count)
        return [json.loads(log) for log in logs or []]
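无论用Redis还是Kafka做缓冲,消费端通常按固定批次聚合后再交给bulk API一次性写入。下面用标准库模拟这一攒批逻辑(示意性代码,省略了真实的Redis/ES客户端):

```python
from collections import deque
from typing import Iterator, List

def drain_in_batches(buffer: deque, batch_size: int = 1000) -> Iterator[List[dict]]:
    """从本地缓冲中按批次取出日志,直到缓冲取空。"""
    while buffer:
        batch = []
        while buffer and len(batch) < batch_size:
            batch.append(buffer.popleft())
        yield batch  # 每个batch交给bulk API一次性写入

# 模拟缓冲中积压了2500条日志
buffer = deque({"seq": i} for i in range(2500))
batches = list(drain_in_batches(buffer, batch_size=1000))
```

批次大小需要与下游ES的bulk吞吐能力匹配:批次过小浪费请求开销,过大容易触发写入拒绝。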

五、索引设计最佳实践

5.1 索引命名策略

时间序列索引:

# 按天创建索引
index-name-2024.01.01
index-name-2024.01.02
index-name-2024.01.03

# 按周创建索引
index-name-2024-w01
index-name-2024-w02

# 模板配置(带template包装的写法属于7.8+的可组合索引模板API)
PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 5,
      "number_of_replicas": 1
    }
  }
}

5.2 字段映射优化

{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "status_code": {
        "type": "short"
      },
      "duration": {
        "type": "float"
      },
      "ip": {
        "type": "ip"
      },
      "geoip": {
        "properties": {
          "location": {
            "type": "geo_point"
          },
          "country": {
            "type": "keyword"
          }
        }
      },
      "user_agent": {
        "type": "text",
        "analyzer": "standard"
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}

字段类型选择建议:

  • keyword:精确匹配、聚合、排序
  • text:全文搜索,会被分词
  • long:大整数值
  • date:日期时间
  • boolean:布尔值
  • geo_point:地理位置
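按上述规则,可以从一条样例日志粗略推断一份基础mapping。以下是示意性的推断逻辑(生产环境仍应显式定义mapping,而非依赖自动推断):

```python
def infer_mapping(sample: dict) -> dict:
    """根据样例文档的Python类型,推断一份基础的ES字段mapping。"""
    type_map = {int: "long", float: "float"}
    props = {}
    for field, value in sample.items():
        if isinstance(value, bool):  # bool是int的子类,必须先判断
            props[field] = {"type": "boolean"}
        elif type(value) in type_map:
            props[field] = {"type": type_map[type(value)]}
        elif field == "@timestamp" or field.endswith("_at"):
            props[field] = {"type": "date"}
        else:
            # 短字符串按keyword(精确匹配/聚合),长文本按text(分词检索)
            props[field] = {"type": "keyword" if len(str(value)) < 64 else "text"}
    return {"mappings": {"properties": props}}

mapping = infer_mapping({"@timestamp": "2024-01-01T00:00:00", "level": "INFO",
                         "duration": 12.5, "status_code": 200, "success": True})
```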

5.3 索引别名管理

# 创建别名
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "logs-2024.01.01",
        "alias": "logs-current"
      }
    }
  ]
}

# 滚动别名(remove与add在同一请求中原子执行)
POST _aliases
{
  "actions": [
    { "remove": { "index": "logs-2024.01.01", "alias": "logs-current" } },
    { "add":    { "index": "logs-2024.01.02", "alias": "logs-current" } }
  ]
}

# 查询时使用别名
curl -X GET "localhost:9200/logs-current/_search"

六、数据采集实战

6.1 Nginx日志采集配置

Grok模式解析:

# Logstash配置
filter {
  grok {
    match => {
      "message" => "%{NGINX_ACCESS}"
    }
    patterns_dir => ["/etc/logstash/patterns"]
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }

  geoip {
    source => "clientip"
    target => "geoip"
  }

  useragent {
    source => "agent"
    target => "ua"
  }
}

# patterns/nginx
NGINX_ACCESS %{IPORHOST:clientip} - %{USER:user} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URIREF:referrer}|-)"|%{QS:referrer}) %{QS:agent}
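Grok本质上是带命名捕获组的正则表达式。下面用Python正则验证等价的解析逻辑,字段名与上面pattern中的clientip/verb/response等对应(正则为简化版示意,不覆盖referrer/agent):

```python
import re

# 简化版Nginx access日志正则(对应grok的命名字段)
NGINX_RE = re.compile(
    r'(?P<clientip>\S+) - (?P<user>\S+) \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d+) (?P<bytes>\d+|-)'
)

line = ('192.168.1.10 - alice [01/Jan/2024:12:00:00 +0800] '
        '"GET /api/users HTTP/1.1" 200 1234')
fields = NGINX_RE.match(line).groupdict()  # 解析为字段字典
```

解析失败(match返回None)在生产中对应grok的_grokparsefailure标签,应单独路由排查,而不是静默丢弃。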

6.2 Java应用日志采集

Log4j2配置:

<Configuration>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m%n"/>
        </Console>

        <Gelf name="Gelf" host="udp:10.0.1.10" port="12201">
            <Field name="timestamp" pattern="%d{UNIX_MILLIS}"/>
            <Field name="level" pattern="%level"/>
            <Field name="simpleClassName" pattern="%c{1}"/>
            <Field name="className" pattern="%c"/>
            <Field name="server" pattern="%host"/>
            <Field name="method" pattern="%M"/>
        </Gelf>
    </Appenders>

    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Gelf"/>
        </Root>
    </Loggers>
</Configuration>

6.3 Python应用日志采集

import json
import logging

from pythonjsonlogger import jsonlogger

class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_client, index_name):
        super().__init__()
        self.es_client = es_client
        self.index_name = index_name

    def emit(self, record):
        log_entry = self.format(record)
        try:
            self.es_client.index(
                index=self.index_name,
                body=json.loads(log_entry)
            )
        except Exception:
            self.handleError(record)

# 使用示例
from elasticsearch import Elasticsearch

es_client = Elasticsearch(['http://10.0.1.10:9200'])

handler = ElasticsearchHandler(es_client, 'python-logs')
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(name)s %(levelname)s %(message)s'
)
handler.setFormatter(formatter)

logger = logging.getLogger('myapp')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

七、性能优化策略

7.1 Elasticsearch性能调优

# jvm.options(注释须单独成行)
# 初始堆内存与最大堆内存保持一致,不超过物理内存的50%
-Xms4g
-Xmx4g
# 使用G1垃圾回收器
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+DisableExplicitGC

# elasticsearch.yml
thread_pool.write.size: 16   # 写入线程池(ES 6.x起bulk线程池已并入write)
thread_pool.search.size: 16  # 搜索线程池

# 索引设置
"settings": {
  "index.translog.durability": "async",
  "index.translog.sync_interval": "30s",
  "index.number_of_replicas": 1,    # 批量导入时可临时设为0,导入完成后再调高
  "index.refresh_interval": "30s",  # 降低刷新频率
  "index.max_result_window": 10000  # 限制结果窗口大小
}

7.2 Logstash性能优化

# 多Pipeline并行处理(pipelines.yml)
- pipeline.id: nginx
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
  pipeline.batch.delay: 50

- pipeline.id: application
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 8
  pipeline.batch.size: 2000

# JVM调优(/etc/logstash/jvm.options)
-Xms2g
-Xmx2g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50

7.3 查询优化

// 使用filter替代query:filter可被缓存且不计算相关性得分
GET /logs/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "status": "error" } },
        { "range": { "@timestamp": { "gte": "now-1h" } } }
      ]
    }
  }
}

// 避免深度分页:使用search_after
GET /logs/_search
{
  "size": 100,
  "sort": [
    { "@timestamp": "asc" },
    { "_id": "asc" }
  ],
  "search_after": [ "2024-01-01T00:00:00", "doc_id_123" ]
}

// 使用scroll API遍历大数据集
GET /logs/_search?scroll=1m
{
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
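search_after的要点是:用上一页最后一条文档的排序值作为下一页的游标,每次查询代价恒定,不像from/size那样随页数线性增长。下面用内存数据模拟这一翻页过程(示意性代码,fake_search代表一次带sort的ES查询):

```python
# 模拟一份按(@timestamp, _id)排好序的结果集
DOCS = sorted(
    [{"@timestamp": f"2024-01-01T00:00:{i:02d}", "_id": f"doc-{i:03d}"}
     for i in range(25)],
    key=lambda d: (d["@timestamp"], d["_id"]),
)

def fake_search(size, search_after=None):
    """模拟一次带sort的查询:返回游标之后的size条文档。"""
    docs = DOCS
    if search_after is not None:
        docs = [d for d in docs if (d["@timestamp"], d["_id"]) > tuple(search_after)]
    return docs[:size]

def scan_all(page_size=10):
    """用search_after游标遍历全部文档,避免深度分页。"""
    cursor, collected = None, []
    while True:
        page = fake_search(page_size, cursor)
        if not page:
            break
        collected.extend(page)
        last = page[-1]
        cursor = [last["@timestamp"], last["_id"]]  # 下一页的游标
    return collected

results = scan_all(page_size=10)
```

排序键中加入_id(或其他唯一字段)是为了保证游标的唯一性,否则相同时间戳的文档可能被跳过或重复。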

八、监控与告警

8.1 Elasticsearch集群监控

from elasticsearch import Elasticsearch
import smtplib
from email.mime.text import MIMEText

class ElasticsearchMonitor:
    def __init__(self, es_host):
        self.es = Elasticsearch([es_host])

    def check_cluster_health(self):
        health = self.es.cluster.health()

        if health['status'] == 'red':
            self.send_alert(f"集群状态异常: {health['status']}")

        if health['number_of_pending_tasks'] > 100:
            self.send_alert(f"待处理任务过多: {health['number_of_pending_tasks']}")

        return health

    def check_disk_usage(self):
        nodes = self.es.nodes.stats()

        for node_id, node_info in nodes['nodes'].items():
            disk_total = node_info['fs']['total']['total_in_bytes']
            disk_available = node_info['fs']['total']['available_in_bytes']
            usage_percent = (1 - disk_available / disk_total) * 100

            if usage_percent > 85:
                self.send_alert(
                    f"节点 {node_id} 磁盘使用率: {usage_percent:.2f}%"
                )

    def send_alert(self, message):
        # 发送邮件告警
        msg = MIMEText(message)
        msg['Subject'] = 'Elasticsearch集群告警'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'admin@example.com'

        smtp = smtplib.SMTP('smtp.example.com')
        smtp.send_message(msg)
        smtp.quit()

# 定时监控
import schedule
import time

monitor = ElasticsearchMonitor('http://10.0.1.10:9200')

schedule.every(5).minutes.do(monitor.check_cluster_health)
schedule.every(10).minutes.do(monitor.check_disk_usage)

while True:
    schedule.run_pending()
    time.sleep(1)

8.2 Kibana监控Dashboard

在Kibana中创建监控Dashboard,监控关键指标:

  • 集群健康状态
  • 索引写入/查询速率
  • 节点CPU/内存/磁盘使用
  • 搜索延迟和吞吐量
  • 错误率和失败查询

九、安全配置

9.1 X-Pack安全配置

# elasticsearch.yml
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

# 创建内置用户
bin/elasticsearch-setup-passwords interactive

9.2 Nginx反向代理配置

upstream elasticsearch {
    server 10.0.1.10:9200;
    server 10.0.1.11:9200;
    server 10.0.1.12:9200;
}

server {
    listen 80;
    server_name es.example.com;

    location / {
        proxy_pass http://elasticsearch;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # 限制访问
        allow 10.0.0.0/8;
        deny all;
    }
}

十、部署架构实践

10.1 Docker Compose部署

version: '3.7'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    networks:
      - elk

  logstash:
    image: docker.elastic.co/logstash/logstash:8.10.0
    volumes:
      - ./logstash/config:/usr/share/logstash/config
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
      - "9600:9600"
    depends_on:
      - elasticsearch
    networks:
      - elk

  kibana:
    image: docker.elastic.co/kibana/kibana:8.10.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
    networks:
      - elk

volumes:
  es-data:

networks:
  elk:
    driver: bridge

10.2 Kubernetes部署

# Elasticsearch StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch
spec:
  serviceName: elasticsearch
  replicas: 3
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
        - name: elasticsearch
          image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
          ports:
            - containerPort: 9200
            - containerPort: 9300
          env:
            - name: cluster.name
              value: "k8s-logs"
            - name: node.roles
              value: "master,data,ingest"
            - name: ES_JAVA_OPTS
              value: "-Xms2g -Xmx2g"
          resources:
            requests:
              memory: 4Gi
              cpu: 2000m
            limits:
              memory: 4Gi
              cpu: 2000m
          volumeMounts:
            - name: data
              mountPath: /usr/share/elasticsearch/data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: fast-ssd
        resources:
          requests:
            storage: 500Gi

十一、故障排查与维护

11.1 常见问题排查

问题1:集群变红(Red)

# 检查未分配的分片
GET /_cat/shards?v

# 手动分配主分片(accept_data_loss会放弃该分片上未恢复的数据,慎用)
POST /_cluster/reroute
{
  "commands": [
    {
      "allocate_primary": {
        "index": "logs-2024.01.01",
        "shard": 0,
        "node": "node-01",
        "accept_data_loss": true
      }
    }
  ]
}

问题2:磁盘空间不足

# 查看磁盘使用情况
GET /_cat/nodes?v&h=name,disk.used,disk.avail,disk.total,disk.percent

# 清理旧索引
DELETE /logs-2023.*

# 降低副本数
PUT /logs-*/_settings
{
  "number_of_replicas": 0
}

问题3:查询超时

# 增加超时时间
GET /logs/_search?timeout=30s
{
  "query": { "match_all": {} }
}

# 使用更高效的查询方式
GET /logs/_search
{
  "size": 0,
  "query": {
    "bool": {
      "should": [
        { "term": { "status": "error" } }
      ],
      "minimum_should_match": 1
    }
  }
}

11.2 索引生命周期管理

// 创建ILM策略
PUT _ilm/policy/log-lifecycle-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "7d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 1
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

// 应用ILM策略到索引模板
PUT _template/logs-template
{
  "index_patterns": ["logs-*"],
  "settings": {
    "index.lifecycle.name": "log-lifecycle-policy",
    "index.lifecycle.rollover_alias": "logs"
  }
}

十二、总结与最佳实践

12.1 架构设计原则

  1. 高可用优先:至少3个Master节点,多副本策略
  2. 容量规划:提前规划数据量和集群规模
  3. 分片策略:合理的分片数量,单个分片不超过50GB
  4. 索引模板:标准化索引模板,便于管理
  5. 生命周期:自动化的索引生命周期管理

12.2 性能优化要点

  1. 硬件配置:使用SSD存储,充足的内存
  2. JVM调优:合理配置堆内存,避免swap
  3. 批量写入:使用bulk API,合理设置批次大小
  4. 查询优化:使用过滤器替代查询,避免深度分页
  5. 缓存利用:利用Elasticsearch的Query Cache和Field Data Cache
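第3点中,bulk API要求NDJSON格式的请求体:每条文档前一行action元数据,整个body以换行符结尾。拼装逻辑可以这样示意(index名与文档字段均为假设):

```python
import json

def build_bulk_body(index: str, docs: list) -> str:
    """把文档列表拼装成 _bulk API 要求的NDJSON请求体。"""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))  # action行
        lines.append(json.dumps(doc, ensure_ascii=False))       # 文档行
    return "\n".join(lines) + "\n"  # bulk body必须以换行符结尾

body = build_bulk_body("app-logs", [{"level": "INFO", "msg": "started"},
                                    {"level": "ERROR", "msg": "timeout"}])
```

实际生产中建议直接使用官方客户端的bulk辅助方法(如Python客户端的elasticsearch.helpers.bulk),它会自动分批并处理部分失败重试。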

12.3 安全与监控

  1. 认证授权:启用X-Pack安全模块
  2. 网络隔离:使用防火墙和VPN
  3. 数据加密:启用传输层加密
  4. 监控告警:实时监控集群健康状态
  5. 定期备份:定期快照备份

12.4 技术栈选择建议

  • 小规模场景(日志量<100GB/天):ELK/EFK三节点
  • 中规模场景(日志量100GB-1TB/天):Elasticsearch集群 + Kafka缓冲
  • 大规模场景(日志量>1TB/天):多集群分离、索引生命周期管理

ELK/EFK日志系统为现代分布式系统提供了强大、灵活、可扩展的日志管理解决方案。掌握其架构设计与实战部署,是架构师的必备技能。通过合理的架构设计、性能优化和运维监控,可以构建出高效稳定的企业级日志系统。


关键词: ELK、EFK、Elasticsearch、Logstash、Fluentd、Kibana、日志采集、日志分析、架构设计、高可用部署、性能优化、容器化部署、Kubernetes