ELK实践之采集并分析Nginx与应用程序日志
# 前言
ELK 是一个开源软件栈,由 Elasticsearch、Logstash 和 Kibana 三个主要组件组成,用于处理和可视化大规模的日志数据。
- Elasticsearch:Elasticsearch 是一个实时的分布式搜索和分析引擎。它基于 Apache Lucene 引擎构建,具有快速的搜索、分析和聚合能力。Elasticsearch 可以处理海量数据,并提供强大的全文搜索、结构化搜索、地理位置搜索等功能。它支持水平扩展和高可用性,并具有 RESTful API,使其易于集成和使用。
- Logstash:Logstash 是一个开源的数据收集和处理工具。它能够从各种来源(如文件、数据库、消息队列等)收集和处理日志数据,并将其转换为可索引的格式,如 JSON。Logstash 提供丰富的插件生态系统,可以进行数据过滤、转换和增强,以适应不同的数据源和需求。
- Kibana:Kibana 是一个用于数据可视化和分析的开源工具。它与 Elasticsearch 紧密集成,提供了一个直观的 Web 界面,用于创建交互式的仪表板、图表和可视化报表。Kibana 可以实时地检索和可视化 Elasticsearch 中的数据,帮助用户理解和分析日志数据,发现趋势、异常和关联关系。
ELK 被广泛用于日志管理和分析领域。它可以帮助组织和企业实时监控系统的运行状况、故障排查、安全分析、业务洞察等。通过将 Logstash 用于数据收集和处理,将数据存储在 Elasticsearch 中,并使用 Kibana 进行数据可视化,用户可以轻松地探索和分析日志数据,提取有价值的信息,并进行故障诊断、性能优化和决策制定等工作。
关于Elasticsearch,可以参考我以前的文章: Elasticsearch实战。
本文主要从实操的角度介绍一下ELK收集Nginx日志和应用程序日志,并对日志进行分析的过程。
# ELK安装
实操之前,需要先安装Elasticsearch (opens new window)和Kibana (opens new window)。
# 采集Nginx日志
我们需要用到Filebeat,它和 Logstash 都是用于日志数据收集和传输的工具,但它们在功能和设计上有一些区别。
Filebeat 是 Elastic 公司提供的一款轻量级日志数据收集器。它专注于实时读取和传输日志文件,将日志数据发送到指定的目标,通常是通过网络将数据发送给中央日志存储或分析系统。Filebeat 部署简单、资源消耗低,并具有高效的数据传输能力。它支持多种日志文件格式,能够监视文件的变化并实时发送新的日志事件。Filebeat 本身不具备数据处理和转换的能力,它的主要职责是快速、可靠地将日志数据传输到下游处理工具,如 Elasticsearch 或 Logstash。
Logstash 是 Elastic 公司提供的一个功能强大的数据收集、处理和传输工具。它是一个可扩展的数据管道,支持从多种数据源(包括文件、数据库、消息队列等)收集数据,进行丰富的数据处理和转换,然后将数据发送到多个目标,如 Elasticsearch、文件、消息队列等。Logstash 的数据处理能力非常丰富,可以进行数据过滤、解析、转换、增强等操作,还支持插件机制,可以通过插件扩展其功能。Logstash 提供了多种输入和输出插件,使其与各种数据源和目标兼容。
# 1. filebeat采集Nginx日志
先创建/etc/filebeat/filebeat.yml
文件,内容如下:
filebeat.inputs:
- type: log
paths:
- /usr/local/nginx/logs/access.log
fields:
input_type_source: nginx
output.logstash:
hosts: ["172.28.0.1:5044"]
假如你的Nginx在/usr/local/nginx/
目录下,那么你需要将/usr/local/nginx/
目录挂载到容器中。
启动filebeat容器:
docker run -d --name filebeat-nginx -v /usr/local/nginx/:/usr/local/nginx/ -v /etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml docker.elastic.co/beats/filebeat:7.3.1 filebeat -e -d "publish"
这样我们把Nginx的日志采集到了Logstash中,接下来我们需要配置Logstash。
# 2. Logstash配置
首先,我们需要创建一个/etc/logstash/pipeline/nginx.conf
文件,内容如下:
input {
beats {
port => "5044"
}
}
filter {
// 使用正则表达式模式匹配来解析日志中的字段。匹配到的字段会被添加到事件中,并从事件中移除原始的message字段。
grok {
match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{NUMBER:request_time}\" \"%{NUMBER:upstream_response_time}?\" \"%{DATA:http_host}\" \"%{DATA:request_uri}\" %{NUMBER:status} %{NUMBER:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" \"%{WORD:request_method}\" \"%{GREEDYDATA:request_body}\""}
remove_field => "message"
}
// 用于添加和转换字段
mutate {
// 添加自定义字段,这里是添加了一个字段,值为@timestamp
add_field => { "read_timestamp" => "%{@timestamp}" }
convert => {
"request_time" => "float"
"upstream_response_time" => "float"
"status" => "integer"
"body_bytes_sent" => "integer"
}
}
// 用于解析http_user_agent字段,获取用户代理信息
useragent {
source => "http_user_agent"
target => "nginx_http_user_agent"
}
// 会把time_local字段的值转换为@timestamp的值
date {
match => [ "time_local", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "time_local"
}
// 用于通过客户端IP地址获取地理位置信息
geoip {
source => "remote_addr"
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
user => "elastic"
password => "elastic"
index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
}
}
创建/etc/logstash/logstash.yml
文件,内容如下:
http.host: "0.0.0.0"
# 禁用了 X-Pack Monitoring,你可以通过配置 xpack.monitoring.enabled: true 来启用它
xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
#xpack.monitoring.elasticsearch.username: elastic
#xpack.monitoring.elasticsearch.password: elastic
# 批处理大小
pipeline.batch.size: 1000
# 批处理延时(ms)
pipeline.batch.delay: 500
启动Logstash容器:
docker run --name logstash --restart=on-failure:3 --net=host -d -v /etc/logstash/pipeline/:/usr/share/logstash/pipeline/ -v /etc/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml docker.elastic.co/logstash/logstash:7.3.1
我们创建了一个 Logstash Pipeline,在 Logstash 中,Pipeline(管道)是数据处理的核心概念。它定义了一系列的数据处理步骤,用于接收、处理和转发数据。每个 Pipeline 包含一组插件,按照定义的顺序依次处理数据。
它通常包含以下三个阶段:
- 输入阶段(Input Stage):在输入阶段,Logstash 从指定的数据源接收数据。数据源可以是各种来源,如文件、消息队列、网络服务等。Logstash 提供了多种输入插件,用于接收不同类型的数据。输入插件负责接收数据,并将其传递到下一个阶段。
- 过滤阶段(Filter Stage):在过滤阶段,Logstash 对输入的数据进行处理、过滤和转换。您可以使用各种过滤器插件,如 grok、mutate、date 等,对数据进行解析、添加字段、修改字段值、删除字段等操作。通过配置不同的过滤器插件和定义适当的过滤规则,您可以根据需求对数据进行结构化处理和准备,使其适应后续的操作和存储需求。
- 输出阶段(Output Stage):在输出阶段,经过过滤处理后的数据被发送到指定的目标。目标可以是 Elasticsearch、文件、消息队列、数据库等。Logstash 提供了多种输出插件,用于将数据发送到不同的目标。输出插件负责将数据传递给指定的目标,并完成数据的最终存储或转发。
Logstash Pipeline 的配置文件通常以 .conf
后缀名保存,其中定义了输入插件、过滤器插件和输出插件的配置以及它们的顺序。在运行 Logstash 时,它会加载配置文件,并按照定义的顺序逐步处理数据。
通过定义和配置 Logstash Pipeline,您可以实现灵活的数据处理和转发流程,将数据从不同的来源收集、加工和分发到指定的目标,满足数据集成、清洗、转换和分析的需求。
在上面的例子中,我们定义了一个输入插件,用于接收来自 Filebeat 的数据。然后,我们使用 grok 插件对日志进行解析,使用 mutate 插件对字段进行添加和转换,使用 useragent 插件对用户代理信息进行解析,使用 date 插件对时间进行转换,使用 geoip 插件对客户端 IP 地址进行解析。最后,我们使用 elasticsearch 插件将数据发送到 Elasticsearch 中。
注意:需要开启Elasticsearch的自动创建索引功能,否则logstash会报错。
如果要修改logstash的jvm配置,可以通过-v /etc/logstash/jvm.options:/usr/share/logstash/config/jvm.options
挂载配置文件。前提是你需要先创建/etc/logstash/jvm.options
文件。
# gork介绍
在 Logstash 中,Grok 是一种用于日志解析和结构化的强大模式匹配工具。它允许您根据自定义的模式来解析和提取非结构化的日志数据,并将其转换为结构化的字段。Grok 通过匹配文本模式,将日志中的各个部分解析为具有特定含义的字段,使得日志数据更易于搜索、过滤和分析。
Grok 使用一种简洁而强大的语法来定义模式,其中包含预定义的模式和正则表达式。您可以使用预定义的模式(如数字、IP 地址、日期等)或编写自定义的模式来匹配日志中的特定部分。Grok 的模式匹配基于正则表达式,通过定义模式和命名捕获组,可以将匹配到的文本部分提取为字段。
示例中用到的gork模式:
%{IPORHOST:remote_addr}
:匹配并提取 IP 地址或主机名,将其作为remote_addr
字段。%{DATA:remote_user}
:匹配并提取任意非空白字符序列,将其作为remote_user
字段。%{HTTPDATE:time_local}
:匹配并提取 HTTP 格式的日期和时间,将其作为time_local
字段。%{NUMBER:request_time}
:匹配并提取数字,将其作为request_time
字段。%{NUMBER:upstream_response_time}?
:匹配并提取数字,将其作为upstream_response_time
字段。这里的问号表示该字段是可选的,因为有时候会不存在这个值。%{DATA:http_host}
:匹配并提取任意非空白字符序列,将其作为http_host
字段。注意:因为logstash会自动收集host信息,所以这里一定不能配置为host字段,否则将会被覆盖。%{DATA:request_uri}
:匹配并提取任意非空白字符序列,将其作为request_uri
字段。%{NUMBER:status}
:匹配并提取数字,将其作为status
字段。%{NUMBER:body_bytes_sent}
:匹配并提取数字,将其作为body_bytes_sent
字段。%{DATA:http_referer}
:匹配并提取任意非空白字符序列,将其作为http_referer
字段。%{DATA:http_user_agent}
:匹配并提取任意非空白字符序列,将其作为http_user_agent
字段。%{WORD:request_method}
:匹配并提取由字母组成的单词,将其作为request_method
字段。%{GREEDYDATA:request_body}
:匹配并提取任意字符序列,将其作为request_body
字段。
其他常用的gork模式:
%{PATTERN:fieldname}
:将匹配的文本部分提取为名为fieldname
的字段。%{NUMBER:bytes:int}
:将匹配的数字提取为整数类型的字段bytes
。%{TIMESTAMP_ISO8601:timestamp}
:将匹配的 ISO 8601 格式的时间戳提取为字段timestamp
。%{USERNAME:username}
:匹配并提取用户名,将其作为 username 字段。%{HOSTNAME:hostname}
:匹配并提取主机名,将其作为 hostname 字段。%{EMAILADDRESS:email}
:匹配并提取电子邮件地址,将其作为 email 字段。%{URI:uri}
:匹配并提取 URI(统一资源标识符),将其作为 uri 字段。
前面的nginx pipeline配置的gork,对应的nginx日志的格式如下:
log_format main escape=json '$remote_addr - $remote_user [$time_local] "$request_time" "$upstream_response_time" "$host" "$request_uri" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$request_method" "$request_body" ';
nginx日志输出:
59.11.219.0 - [29/May/2023:17:30:35 +0800] "0.002" "0.002" "m.x.cn" "/apis/test?Id=29493" 200 8 "https://m.x.cn/auction/live/29493?s=11f231ff" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) HeadlessChrome/99.0.4844.51 Safari/537.36" "GET" ""
42.80.16.1 - [29/May/2023:17:30:35 +0800] "0.002" "0.002" "m.x.cn" "/apis/live/test?Id=29567" 200 8 "https://m.x.cn/auction/live/29567?s=5a9ac8c2" "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148" "GET" ""
45.13.18.1 - [29/May/2023:17:37:52 +0800] "0.000" "0.000" "m.x.cn" "/apis/code" 403 37 "" "LangShen" "POST" "{\"mobile\":\"13700000000\",\"zoneCode\":\"86\"}"
你可以使用[Grok Constructor](https://grokconstructor.appspot.com/do/match 验证)来验证你的gork是否正确。
更多gork信息,参考 Grok filter plugin (opens new window)
# 重复数据处理
在采集Nginx日志的时候,发现导入到ES的数据存在重复。可以通过Logstash的fingerprint插件来解决问题,需要在配置文件的filter部分增加如下配置:
filter{
...
# 指纹插件
fingerprint {
source => "message"
target => "[@metadata][fingerprint]"
method => "MD5" # 签名方法,可以是MD5、SHA1
}
}
然后在output的elasticsearch增加document_id配置:
output {
elasticsearch {
...
document_id => "%{[@metadata][fingerprint]}" # 使用指纹作为文档的唯一标识符
}
}
# 采集应用日志
# 1. 定义Logstash Pipeline
首先,我们需要创建一个/etc/logstash/pipeline/app.conf
文件,内容如下:
input {
tcp {
port => 5045
codec => json_lines
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
user => "elastic"
password => "elastic"
index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
}
}
这个pipeline非常简单,它从TCP端口5055
接收JSON格式的日志,然后将日志输出到Elasticsearch中。
# 采集Java应用日志
要将Logback的日志输出发送到Logstash,可以通过使用Logstash的Logback插件来实现。以下是配置步骤:
- 在项目的依赖管理中添加Logstash Logback插件的依赖。示例中使用Maven:
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.3</version>
</dependency>
- 在Logback的配置文件中进行相应的配置,以将日志输出发送给Logstash。示例配置如下:
<configuration>
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>172.28.0.1:5045</destination>
<encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<root level="INFO">
<appender-ref ref="LOGSTASH" />
</root>
</configuration>
在配置中,your-logstash-host
和your-logstash-port
需要替换为实际的Logstash主机和端口。
- 重启Logstash,确保pipeline起作用。
如果你觉得这样的配置太麻烦,可以使用logstash-logging-spring-boot-starter (opens new window)来简化配置。
# Kibana的使用
进入管理页面,需要先创建index templates,以nginx为例,配置如下:
创建完成后,启动logstash,可以看到index已经创建成功了,如下图所示:
创建索引模式:
进入Kibana的首页,点击左侧的Discover
,可以看到nginx日志信息,如下图所示:
还可以为index templates创建index lifecycle policy(生命周期策略),以nginx为例,配置如下:
表示当index的日期超过30天时,就会被删除。
还需要把index lifecycle policy(生命周期策略)关联到对应的index template上,操作policy,将策略 “logstash-nginx-lifecycle” 添加到索引模板 “logstash-log-nginx” 上。
你还可以创建Dashboard,将多个图表组合在一起,以便更好地展示数据。参考:在Kibana中可视化NGINX访问日志 (opens new window)
注意:最好先在Kibana中创建index template,这样新建的index就会自动应用index template的配置。
# 到期之后,索引并没有自动删除的问题
实践中发现,索引到了生命周期删除阶段,并没有被实际删除,这应该是因为Kibana的Bug,它请求的时候没有定义action。
请参考 Elastic生命周期策略索引不删除的问题 (opens new window) 来解决此问题。
# Discover补充说明
如果日志搜索的结果和你期望的不符,你可以在右上角的”检查“看到完整的请求,比如:
可以看到这里默认按照时间倒序排序。你可以点击“Time”字段切换为正序排序。
默认情况下,text字段会使用standard分词器进行分词。你也可以在索引模式处对某个字段进行单独设置。
比如对message
字段设置了IK分词器,不过因为Discover中默认是有排序的(且无法更改),可能结果并不符合你的期望。你可以在开发工具中进行手动查询,但这很不方便。
GET /logstash-applog-*/_search
{
"size": 500,
"query": {
"match": {
"message": "全失败"
}
}
}
不管是standard分词还是IK分词,在Discover中都无法实现精确匹配,因为KQL默认使用了Terms查询 (opens new window),而Terms查询是不支持分词的,所以无法实现精确匹配。
即使你在Discover中使用Lucene查询,比如直接输入Query DSL:
"match": {
"message": "全失败"
}
这样也不行,因为排序会覆盖match搜索的评分,可能也不符合期望。
有一种变通的方式,就是将message
字段设置为wildcard
类型,然后使用wildcard查询,比如:
message : *全失败*
这样的话我们就能得到精确匹配。
# 使用Kafka作为中间件
如果日志的数据量很大,可以考虑使用Kafka作为中间件,将日志数据先缓存到Kafka中,然后再由Logstash从Kafka中读取数据,这样可以有效地减轻Logstash的压力。
filebeat的output配置修改如下:
output.kafka:
hosts: ["192.168.0.1:9092"]
topic: 'nginx'
前提是你已经安装了Kafka,参考:Kafka安装
logstash的input配置修改如下:
input {
kafka {
type => "kafka"
bootstrap_servers => "192.168.0.1:9092"
topics => "nginx"
group_id => "logstash"
consumer_threads => 2
}
}
具体参考:亿级 ELK 日志平台构建实践 (opens new window) 和 使用Logstash从Kafka到Elasticsearch (opens new window)
# 其他资料
- 日志的艺术 (opens new window) 补充:文中提到超大型集群和面向分析的场景,其实Doris 2.0开始支持倒排索引 (opens new window),可以考虑把Clickhouse换成Doris。
# 总结
本文从实操角度介绍了如何使用ELK搭建日志平台,主要讲解了采集nginx日志和Java应用日志的方法,以及如何使用Kibana进行可视化展示。希望对你有所帮助。
祝你变得更强!