轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 分布式

  • 代码质量管理

  • 基础

  • 操作系统

  • 计算机网络

  • AI

  • 编程范式

  • 安全

  • 中间件

    • Tomcat体系介绍及应用
    • Redis实践应用
    • Elasticsearch实战
    • 理解MySQL
    • RabbitMQ介绍与应用
    • Kafka实战
    • ELK实践之采集并分析Nginx与应用程序日志
      • 一、前言
      • 二、环境准备
      • 三、采集 Nginx 日志
        • 1、配置 Filebeat 采集 Nginx 日志
        • 2、配置 Logstash 处理日志
        • 3、理解 Logstash Pipeline
        • 4、深入理解 Grok 模式
        • 5、避免重复数据
      • 四、采集应用日志
        • 1、配置应用日志 Pipeline
        • 2、Java 应用日志配置
        • 2.1、添加依赖
        • 2.2、配置 Logback
      • 五、使用 Kibana 分析日志
        • 1、创建索引模式
        • 2、查看日志数据
        • 3、设置数据生命周期
        • 4、创建可视化图表
        • 5、常见问题及解决方案
        • 5.1、生命周期策略不生效
        • 5.2、搜索结果不符合预期
        • 5.3、中文搜索优化
      • 六、高并发场景优化
        • 1、架构调整
        • 2、配置 Kafka 中间件
        • 2.1、修改 Filebeat 配置
        • 2.2、修改 Logstash 配置
      • 七、延伸阅读
      • 八、总结
  • 心得

  • 架构
  • 中间件
轩辕李
2023-05-29
目录

ELK实践之采集并分析Nginx与应用程序日志

# 一、前言

ELK 是由 Elasticsearch、Logstash 和 Kibana 三个组件构成的开源日志分析栈,在企业级日志管理中应用广泛。简单来说:

  • Elasticsearch:分布式搜索引擎,负责存储和索引日志数据
  • Logstash:数据处理管道,负责收集、解析和转换日志
  • Kibana:可视化界面,提供数据查询和图表展示

现在很多公司的运维体系都离不开ELK,特别是微服务架构下,日志分散在各个服务中,通过ELK可以统一收集和分析,大大提升了问题排查效率。

关于 Elasticsearch 的详细介绍,可以参考我之前写的文章:Elasticsearch实战。

本文主要从实践角度出发,手把手教你如何用 ELK 收集 Nginx 访问日志和 Java 应用日志,并进行有效的分析和监控。

# 二、环境准备

在开始之前,我们需要先部署好 ELK 的基础环境。这里推荐使用 Docker 方式,简单快捷:

# 拉取镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.3.1
docker pull docker.elastic.co/kibana/kibana:7.3.1
docker pull docker.elastic.co/logstash/logstash:7.3.1

具体的安装和配置步骤可以参考官方文档:

  • Elasticsearch 安装指南 (opens new window)
  • Kibana 安装指南 (opens new window)

安装好基础环境后,我们就可以开始日志收集的实战了。

# 三、采集 Nginx 日志

要收集 Nginx 日志,我们会用到 Filebeat 这个轻量级的日志收集器。可能有人会问,为什么不直接用 Logstash 呢?

简单来说,两者分工不同:

  • Filebeat:专门负责日志文件的采集和传输,资源占用小,部署简单
  • Logstash:主要负责数据的解析、过滤和转换,功能强大但相对重一些

这样的架构设计让整个系统更加灵活和高效,Filebeat 在各个服务器上收集日志,然后统一发送给 Logstash 进行处理。

# 1、配置 Filebeat 采集 Nginx 日志

首先创建 Filebeat 的配置文件 /etc/filebeat/filebeat.yml:

filebeat.inputs:
- type: log
  paths:
    - /usr/local/nginx/logs/access.log  
  fields:
     input_type_source: nginx
output.logstash:
  hosts: ["172.28.0.1:5044"]

这个配置很简单:

  • paths 指定要监控的日志文件路径
  • fields 添加自定义字段,用于后续的分类处理
  • output.logstash 指定 Logstash 的地址

接下来启动 Filebeat 容器:

docker run -d --name filebeat-nginx \
  -v /usr/local/nginx/:/usr/local/nginx/ \
  -v /etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
  docker.elastic.co/beats/filebeat:7.3.1 filebeat -e -d "publish"

注意:这里假设你的 Nginx 安装在 /usr/local/nginx/ 目录,需要将整个目录挂载到容器中,让 Filebeat 能访问到日志文件。

现在 Nginx 的日志就会被 Filebeat 收集并发送到 Logstash 了,下一步我们需要配置 Logstash 来处理这些日志。

# 2、配置 Logstash 处理日志

Logstash 的核心是数据处理管道(Pipeline),它包含三个阶段:输入、过滤、输出。我们来创建一个处理 Nginx 日志的配置文件 /etc/logstash/pipeline/nginx.conf:

input {
    beats {
        port => "5044"
    }
}

filter {
    # 使用 grok 插件解析 Nginx 日志格式
    grok {
        match => { 
            "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{NUMBER:request_time}\" \"%{NUMBER:upstream_response_time}?\" \"%{DATA:http_host}\" \"%{DATA:request_uri}\" %{NUMBER:status} %{NUMBER:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" \"%{WORD:request_method}\" \"%{GREEDYDATA:request_body}\"" 
        }
        remove_field => "message"
    }
    
    # 字段类型转换和添加
    mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
        convert => {
            "request_time" => "float"
            "upstream_response_time" => "float"
            "status" => "integer"
            "body_bytes_sent" => "integer"
        }
    }
    
    # 解析 User-Agent 信息
    useragent {
        source => "http_user_agent"
        target => "nginx_http_user_agent"
    }
    
    # 时间字段处理
    date {
        match => [ "time_local", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "time_local"
    }
    
    # IP 地理位置解析
    geoip {
        source => "remote_addr"
    }
}

output {
    elasticsearch {
        hosts => [ "elasticsearch:9200" ]
        user => "elastic"
        password => "elastic"
        index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
    }
}

接下来创建 Logstash 的主配置文件 /etc/logstash/logstash.yml:

http.host: "0.0.0.0"
# 禁用监控功能(可选)
xpack.monitoring.enabled: false
# 批处理配置
pipeline.batch.size: 1000
pipeline.batch.delay: 500

最后启动 Logstash 容器:

docker run --name logstash --restart=on-failure:3 --net=host -d \
  -v /etc/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  -v /etc/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml \
  docker.elastic.co/logstash/logstash:7.3.1

重要提醒:确保 Elasticsearch 开启了自动创建索引功能,否则 Logstash 会报错无法写入数据。

# 3、理解 Logstash Pipeline

上面的配置体现了 Logstash Pipeline 的三个核心阶段:

  1. Input(输入):接收来自 Filebeat 的数据
  2. Filter(过滤):解析和处理日志数据
    • grok:解析 Nginx 日志格式,提取各个字段
    • mutate:字段类型转换和添加自定义字段
    • useragent:解析浏览器信息
    • date:时间格式处理
    • geoip:根据 IP 获取地理位置
  3. Output(输出):将处理后的数据发送到 Elasticsearch

这样一个完整的数据处理流程就建立起来了,原本无结构的日志文本被解析成了结构化的数据,方便后续的搜索和分析。

性能调优:如果需要调整 JVM 内存等参数,可以通过挂载 jvm.options 文件来实现:

-v /etc/logstash/jvm.options:/usr/share/logstash/config/jvm.options

# 4、深入理解 Grok 模式

Grok 是 Logstash 中最重要的插件之一,它能把无结构的日志文本解析成结构化的字段。说白了,就是用正则表达式的升级版来提取你想要的信息。

Grok 的语法很简单:%{PATTERN:field_name},其中:

  • PATTERN 是预定义的匹配模式
  • field_name 是提取后的字段名

示例中用到的 Grok 模式:

  • %{IPORHOST:remote_addr}:提取客户端 IP 地址
  • %{HTTPDATE:time_local}:提取访问时间,格式如 29/May/2023:17:30:35 +0800
  • %{NUMBER:request_time}:提取请求处理时间(秒)
  • %{NUMBER:upstream_response_time}?:提取后端响应时间,? 表示可选
  • %{DATA:http_host}:提取访问的域名(注意不能用 host,会被系统字段覆盖)
  • %{DATA:request_uri}:提取请求的 URI 路径
  • %{NUMBER:status}:提取 HTTP 状态码
  • %{WORD:request_method}:提取请求方法(GET、POST 等)
  • %{GREEDYDATA:request_body}:提取请求体内容

常用的 Grok 模式还有:

  • %{TIMESTAMP_ISO8601:timestamp}:ISO 8601 时间格式
  • %{USERNAME:username}:用户名格式
  • %{EMAILADDRESS:email}:邮箱地址格式
  • %{URI:uri}:URI 格式

对应的 Nginx 日志格式配置:

log_format main escape=json '$remote_addr - $remote_user [$time_local] "$request_time" "$upstream_response_time" "$host" "$request_uri" '
                            '$status $body_bytes_sent "$http_referer" '
                            '"$http_user_agent" "$request_method" "$request_body" ';

实际的日志输出示例:

59.11.219.0 -  [29/May/2023:17:30:35 +0800] "0.002" "0.002" "m.x.cn" "/apis/test?Id=29493" 200 8 "https://m.x.cn/auction/live/29493?s=11f231ff" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) HeadlessChrome/99.0.4844.51 Safari/537.36" "GET" "" 

调试工具:可以使用 Grok Constructor (opens new window) 在线验证你的 Grok 模式是否正确。

官方文档:更多信息请参考 Grok filter plugin (opens new window)

# 5、避免重复数据

在实际使用中,可能会遇到日志重复采集的问题,比如 Filebeat 重启后重新读取日志文件。这时可以用 fingerprint 插件来去重:

filter {
    # ... 其他配置

    # 生成日志指纹,用于去重
    fingerprint {
        source => "message"
        target => "[@metadata][fingerprint]"
        method => "MD5"
    }
}

然后在 output 中使用指纹作为文档 ID:

output {
    elasticsearch {
        hosts => [ "elasticsearch:9200" ]
        user => "elastic"
        password => "elastic"
        document_id => "%{[@metadata][fingerprint]}"
        index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
    }
}

这样相同内容的日志会有相同的文档 ID,Elasticsearch 会自动覆盖重复的文档,实现去重效果。

# 四、采集应用日志

相比 Nginx 访问日志,应用程序日志的采集更加直接。我们可以让应用直接通过网络将日志发送给 Logstash,省去了文件读取的步骤。

# 1、配置应用日志 Pipeline

创建应用日志的配置文件 /etc/logstash/pipeline/app.conf:

input {
    tcp {
        port => 5045
        codec => json_lines
    }
}

output {
    elasticsearch {
        hosts => [ "elasticsearch:9200" ]
        user => "elastic"
        password => "elastic"
        index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
    }
}

这个配置很简洁:

  • Input:监听 5045 端口,接收 JSON 格式的日志
  • Output:直接发送到 Elasticsearch

由于应用日志通常已经是结构化的 JSON 格式,所以不需要复杂的 Filter 处理。

# 2、Java 应用日志配置

对于 Java 应用,我们可以使用 Logback 的 Logstash 插件,直接将日志通过网络发送到 Logstash。

# 2.1、添加依赖

在 pom.xml 中添加 Logstash Logback 插件:

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.3</version>
</dependency>

# 2.2、配置 Logback

在 logback-spring.xml 中配置:

<configuration>
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>172.28.0.1:5045</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>
    
    <root level="INFO">
        <appender-ref ref="LOGSTASH" />
    </root>
</configuration>

这样配置后,应用的所有日志就会自动发送到 Logstash 了。

简化配置:如果觉得手动配置麻烦,可以使用 logstash-logging-spring-boot-starter (opens new window) 来简化整个过程。

# 五、使用 Kibana 分析日志

数据收集完成后,就可以在 Kibana 中进行查看和分析了。

# 1、创建索引模式

首先进入 Kibana 管理页面,创建索引模式(Index Pattern)。这里以 Nginx 日志为例:

  1. 进入 Management → Index Patterns
  2. 点击 Create index pattern
  3. 输入索引模式,比如 logstash-log-nginx-*
  4. 选择时间字段 @timestamp

索引模式创建

# 2、查看日志数据

创建好索引模式后,点击左侧的 Discover,就能看到实时的日志数据了:

日志查看界面

在这里你可以:

  • 搜索特定的日志内容
  • 按时间范围筛选
  • 查看各个字段的统计信息
  • 添加或移除显示的字段

# 3、设置数据生命周期

为了避免磁盘空间被大量日志占满,建议设置索引生命周期策略:

  1. 进入 Management → Index Lifecycle Policies
  2. 创建新的策略,比如设置 30 天后自动删除

生命周期策略

然后将这个策略关联到对应的索引模板上。

# 4、创建可视化图表

你还可以创建各种图表来分析访问趋势、状态码分布、热门页面等。推荐参考:在 Kibana 中可视化 NGINX 访问日志 (opens new window)

# 5、常见问题及解决方案

# 5.1、生命周期策略不生效

有时候会发现设置的生命周期策略不生效,索引到期后没有被删除。这通常是 Kibana 的一个已知问题。

解决方案:可以参考 Elastic 生命周期策略索引不删除的问题 (opens new window) 的解决方法。

# 5.2、搜索结果不符合预期

在 Discover 中搜索时,如果结果不符合预期,可以:

  1. 点击右上角的"检查"查看完整的 ES 查询
  2. 注意默认是按时间倒序排列的
  3. 可以点击 "Time" 字段切换排序方式

检查查询

# 5.3、中文搜索优化

对于中文日志,如果需要更好的搜索体验,可以:

  1. 为 message 字段配置 IK 分词器
  2. 或者使用 wildcard 类型,支持通配符搜索:
    message : *关键词*
    

这样可以实现更精确的中文搜索匹配。

# 六、高并发场景优化

当日志量比较大的时候,直接让 Filebeat 发送到 Logstash 可能会造成压力。这时可以引入 Kafka 作为消息中间件,实现削峰填谷。

# 1、架构调整

原本的架构:Filebeat → Logstash → Elasticsearch

优化后的架构:Filebeat → Kafka → Logstash → Elasticsearch

# 2、配置 Kafka 中间件

# 2.1、修改 Filebeat 配置

output.kafka:
  hosts: ["192.168.0.1:9092"]
  topic: 'nginx'

# 2.2、修改 Logstash 配置

input {
    kafka {
        type => "kafka"
        bootstrap_servers => "192.168.0.1:9092"
        topics => "nginx"
        group_id => "logstash"
        consumer_threads => 2
    }
}

这样的好处是:

  • 缓冲机制:Kafka 可以暂存大量日志,避免 Logstash 处理不过来
  • 水平扩展:可以启动多个 Logstash 实例并行处理
  • 数据可靠性:即使 Logstash 宕机,日志也不会丢失

扩展阅读:

  • 亿级 ELK 日志平台构建实践 (opens new window)
  • 使用 Logstash 从 Kafka 到 Elasticsearch (opens new window)

# 七、延伸阅读

如果你对日志系统设计感兴趣,推荐阅读 日志的艺术 (opens new window)。

对于超大规模的日志分析场景,除了 ELK,也可以考虑使用 Apache Doris,它从 2.0 版本开始支持倒排索引 (opens new window),在某些场景下可能比传统的 ClickHouse 方案更合适。

# 八、总结

通过本文的实践,我们完整地搭建了一个 ELK 日志分析系统,涵盖了:

  • Nginx 访问日志的采集和解析
  • Java 应用日志的收集配置
  • Kibana 中的数据查看和分析
  • 高并发场景下的架构优化

ELK 栈确实是目前最主流的日志分析解决方案,掌握了这套技能,无论是日常运维还是故障排查,都会轻松很多。

祝你变得更强!

编辑 (opens new window)
#ELK
上次更新: 2025/08/16
Kafka实战
探讨编程的本质

← Kafka实战 探讨编程的本质→

最近更新
01
AI时代的编程心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code实战之供应商切换工具
08-18
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式