Filebeat 原理及使用
source link: https://hulining.github.io/2021/04/03/filebeat-details-and-configuration/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Filebeat 工作原理
如官方文档所描述的那样,filebeat 包含两个主要的组件,inputs
和 harvesters
.
harvesters 组件
harvester
负责读取单个文件的内容。它逐行读取每个文件,将内容发送到 output
. 对于每个文件,filebeat 都会启动一个 harvester, 负责打开和关闭文件。默认情况下 filebeat 会使文件处于打开状态,直到达到 close_inactive
配置的未读取到内容的超时时长.
需要注意的是:如果 harvester 在读取文件过程中,对该文件删除或重命名,filebeat 会继续读取该文件,直到 harvester 关闭为止.
inputs 组件
input
负责管理 harvesters 并查找所有可读取的输入源文件.
如果 input 类型为 log
, 则 input 将查找与定义的 glob 路径匹配的所有文件,并为每个文件启动 harvester. 每个 intput 都在其自己的 goroutine 中运行。示例如下:
filebeat.inputs:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log
input 支持多种类型,详见 input types
filebeat 如何保存文件状态
filebeat 会保存每个文件的读取状态,并将相关状态刷新到磁盘中的 registry file
(注册表文件) 中,用于记住 harvester 正在读取的文件的偏移量,并确保发送了所有的日志内容。重新启动 filebeat 时,会使用注册表文件中的数据来重建读取状态,并且 harvester 会在上一次读取到的位置继续进行读取.
由于在 harvester 读取文件过程中,可以重命名或移动文件,因此文件名和路径不足以标识文件。对于每个文件,filebeat 都存储唯一的标识符已检测文件是否被读取过
如果每天创建大量新文件,则可能会发现注册表文件变得太大。有关可以设置为解决此问题的配置选项的详细信息,请参阅注册表文件太大相关内容。主要是使用 clean_removed
和 clean_inactive
配置,分别对被忽略的旧文件和删除旧文件的注册表信息进行删除.
filebeat 如何保证事件至少一次被传递
filebeat 之所以能够实现此行为,是因为它在注册表文件中存储了每个事件的传递状态。在定义的 output 无法访问且尚未确认所有事件的情况下,filebeat 将继续尝试发送事件,直到 output 确认它已经收到事件为止.
重新启动 filebeat 时,将再次发送之前发送到 output 但在未确认的所有事件。这样可以确保每个事件至少发送一次,但是最终可能会将重复的事件发送到 output. 您可以通过 shutdown_timeout
配置项设置 filebeat 在关闭之前等待的时间.
filebeat 配置
filebeat 配置文件默认为 filebeat.yml
. 官方给出了不同平台下 filebeat 配置文件的位置。详见官方文档
filebeat 支持对 input 等配置进行拆分,并以文件形式包含到主配置文件中
- 可以使用
filebeat.config.inputs
将 inputs 相关配置拆分后,包含到主配置文件中.
filebeat.config.inputs:
enabled: true
path: inputs.d/*.yml
- 可以使用
filebeat.config.modules
将 modules 相关配置拆分后,包含到主配置文件中.
filebeat.config.modules:
enabled: true
path: ${path.config}/modules.d/*.yml
inputs
inputs 指定了 filebeat 如何查找和处理输入的数据。配置块为 filebeat.inputs
.
inputs 支持多种输入类型,常用的主要如下:
log
: 从日志文件中读取内容
inputs 包含一些通用的配置,主要如下:
通用配置 | 描述 | 默认值 |
---|---|---|
enabled | 是否启用该 input | true |
tags | filebeat 为每个事件添加的标签字段列表,主要用于过滤操作 | - |
fields | 添加额外的字段列表到 output. 添加的额外字段默认会在 fields 字段下 |
- |
fields_under_root | 将额外添加的字段直接添加到 output 中,而不在 fields 字段下 |
false |
processors | 应用于输入数据的处理器列表 | - |
keep_null | 是否将值为 null 字段在 output 文档中发布 | false |
index | 设置来自此输入事件的索引 | 示例: %{[agent.name]}-myindex-%{+ yyyy.MM.dd} 替换为 filebeat-myindex-2019.11.01 |
log
类型的 input 用于从日志文件中读取行,它包含如下参数:
path
: input 读取文件的路径列表recursive_glob.enabled
: 是否将**
扩展为递归 glob 匹配。最多支持 8 级。默认为 true
以下为
log
,container
类型共有的参数
encoding
: 用于读取字符数据的文件编码。如:plain
,utf-8
,gbk
.exclude_lines
: 删除与列表中正则表达式匹配的所有行include_lines
: 仅保留与列表中正则表达式匹配的所有行harvester_buffer_size
: harvester 读取文件时的缓冲区大小,单位字节。默认为 16384 (16KB)json
: 解码结构为 JSON 消息的日志.exclude_files
: 忽略列表中正则表达式匹配的文件ingore_older
: 忽略在指定时间之前修改的文件close_inactive
: 若在指定时间内文件内容未更新,则关闭文件句柄。一般设置为比日志文件更新频率大的值。默认值为 5m.close_renamed
: 是否在重命名文件后,关闭文件处理程序。默认为 falseclose_removed
: 是否在文件删除后,关闭文件处理程序。默认为 falseclose_eof
: 是否在达到文件末尾时,立即关闭文件。默认为 falseclose_timeout
: 在指定的超时时间后,harvester 关闭文件。若文件仍在更新,可以在scan_frequency
后在次启动新的 harvester. 对于释放从磁盘上删除的文件特别有用。默认为 0, 表示禁用clean_inactive
: 若在指定时间内文件内容未更新,则删除注册表中文件的状态信息clean_removed
: 是否在重命名文件后,则删除注册表中文件的状态信息。默认为 true. 若文件在段时间内消息并再次出现,则将从头开始读取文件内容scan_frequency
: 指定每隔多久扫描一次 path 路径中新文件的频率。默认为 10sscan.sort
: 对扫描到的文件按指定字段进行排序。可选值为使用modtime
修改文件和filename
文件名称进行排序。默认为空值scan.order
: 对扫描到的文件进行升序或降序排序。可选值为asc
升序和desc
降序.tail_files
: 是否从文件末尾开始进行读取文件,默认值为 falsesymlinks
: 是否除常规文件外还读取符号链接文件。默认为 false. 如果符号链接与源文件都在 path 路径下,则可能会发送重复的数据。在处理 Kubernetes 日志文件时很有用.harvester_limit
: 限制一个 input 并行启动 harvester 的数量。默认为 0, 表示没有限制.
container
container
类型的 input 用于从容器日志文件中读取日志文件,它包含如下参数:
stream
: 仅从指定的数据流中读取,可选值为all
,stdout
或stderr
. 默认值为all
.format
: 使用给定格式读取日志文件.auto
,docker
或cri
. 默认为auto
, 自动检测.
其余参数见 log
kafka
kafka
类型的 input 用于从 Kafka 集群中读取日志文件,主要支持的 kafka 集群版本为 0.11 - 2.1.0
. 它包含如下参数:
hosts
: kafka 集群主机列表topics
: 从指定 kafka 主题中读取消息group_id
: kafka 消费者组的 idclient_id
: kafka 客户端 id. 可选参数version
: kafka 协议版本。默认为1.0.0
initial_offset
: 从主题读取的初始位置。可选值为oldest
或newest
. 默认为oldest
.connect_backoff
: 出现错误后,重新连接 kafka 集群的等待时间。默认为 30s.consume_backoff
: 读取失败后,重新读取数据的等待时间。默认为 2s.max_wait_time
: 读取时等待最小输入字节数的时间。默认值为 250ms.wait_close
: 关闭时,等待传输消息并确认的时间.isolation_level
: 设置 kafka 的组隔离级别。默认为read_uncommitted
read_uncommitted
: 返回消息通道中的所有消息read_committed
: 隐藏作为中止的事务的一部分的消息
fatch
: 从 kafka 获取数据的相关设置.min
,default
,max
分别设置获取数据的最小,默认,最大 bytes 数。默认分别为1,1MB,0.
rebalance
: kafka 负载均衡设置strategy
: 负载均衡策略.range
遍历或roundrobin
轮询。默认为range
timeout
: 负载均衡的超时时间。默认为 60smax_retries
: 负载均衡失败后重启多少次。默认为 4retry_backoff
: 负载均衡失败后要等待多长时间。默认为 2s.
redis
redis
类型的 input 用于从 redis 慢日志中读取数据。它包含如下参数:
hosts
: redis 服务列表password
: 连接 redis 的密码scan_frequency
: 从 redis 慢日志中读取数据的频率。默认为 10s.timeout
: 等待 redis 响应的超时时间。默认为 1s.network
: redis 连接的网络类型。可选值为tcp
,tcp4
,tcp6
,unix
. 默认值为tcp
.maxconn
: redis 客户端的最大连接数。默认为 10.
syslog
syslog
类型的 input 用于从 TCP,UDP,Unix 套接字中读取事件。它包含如下参数:
参数 | 描述 | protocol.udp |
protocol.tcp |
protocol.unix |
---|---|---|---|---|
max_message_size |
接收数据的最大大小 | 10KiB | 20MiB | 20MiB |
host |
监听事件流的主机和端口 | - | - | 不支持 |
path |
unix 套节点地址 | 不支持 | 不支持 | - |
socket_type |
unix 套接字类型 | 不支持 | 不支持 | stream 或 datagram , 默认为 stream |
group |
unix 套接字的属组 | 不支持 | 不支持 | 运行 filebeat 程序的用户属组 |
mode |
unix 套接字的文件模式 | 不支持 | 不支持 | 默认为 0755 |
read_buffer |
套接字上读取缓冲区的大小 | - | - | - |
timeout |
套接字的读写超时时间 | - | - | - |
frame |
指定用于拆分传入事件的帧 | 不支持 | - | - |
line_delimiter |
指定分割传入事件的字符 | 不支持 | \n | \n |
max_connections |
指定任意时间点的最大连接数 | 不支持 | - | - |
timeout |
不活动的远程连接关闭之前的秒数 | 不支持 | 300s | 300s |
ssl |
ssl 参数配置 | 不支持 | - | - |
stdin
redis
类型的 input 用于从 stdin 中读取事件,它主要用于测试.
output
您可以通过设置 filebeat.yml
配置文件的 output
部分中的参数,将 filebeat 中的事件写入到指定 output.filebeat 只能定义一个 output.
elasticsearch
elasticsearch
类型的 output 将事件输出到 elasticsearch. 它包含以下参数:
api_key
: 使用 api_key 保护与 elasticsearch 的通信.username
: elasticsearch 用户名password
: elasticsearch 密码parameters
: HTTP 参数字典protocol
: elasticsearch 协议名称,http
或https
. 默认为http
path
: HTTP API 调用之前的 HTTP 路径前缀.headers
: 自定义 HTTP 请求头
以下为
elasticsearch
,logstash
类型共有的参数
enabled
: 是否启用此输出,默认为true
hosts
: elasticsearch 的主机列表.compression_level
: 事件的 gzip 压缩级别,1-9
压缩级别逐级递增。默认为 0, 表示不压缩escape_html
: 是否转义 html 为字符串。默认为 falseworker
: 向每个 elasticsearch 实例发送事件的工作程序数,最好在负载均衡模式下使用。默认为 1.proxy_url
: 连接 elasticsearch 的代理 URLindex
: 写入事件事的索引名称。默认值为filebeat-%{[agent.version]}-%{+yyyy.MM.dd}
indices
: 索引选择器规则的数组。支持条件判断等复杂条件index
: 索引名称mappings
: 一个字典,使用 index 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此索引。见如下示例
# when 条件判断示例
output.elasticsearch:
hosts: ["http://localhost:9200"]
indices:
- index: "warning-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
message: "WARN"
- index: "error-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
message: "ERR"
# mapping,default 示例
output.elasticsearch:
hosts: ["http://localhost:9200"]
indices:
- index: "%{[fields.log_type]}"
mappings:
critical: "sev1"
normal: "sev2"
default: "sev3"
bulk_max_size
: 单个 elasticsearch 批量 API 索引请求中要批量处理的最大事件数。默认为 50.backoff.init
: 网络错误后尝试重新连接到 elasticsearch 的等待时长.backoff.max
: 网络错误后尝试重新连接到 elasticsearch 的最大等待时长。默认为 60stimeout
: elasticsearch 请求的超时时长,默认为 90s.ssl
: SSL 配置相关
logstash
logstash
类型的 output 将事件输出到 logstash. 它包含以下参数:
其余参数见 elasticsearch
loadbalance
: 是否启用多个 logstash 的负载均衡。默认为 false, 随机分发到主机上ttl
: filebeat 与 logstash 连接的生存时间,之后将重新建立连接。默认为 0, 表示禁用此功能
kafka
kafka
类型的 output 将事件输出到 kafka. 它包含以下参数:
enabled
: 是否启用此 outputhosts
: kafka 集群主机列表topic
: 从指定 kafka 主题中读取消息topics
: topic 选择器规则的数组。支持条件判断等复杂条件topic
: topic 名称mappings
: 一个字典,使用 topic 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此 topic. 见如下示例
output.kafka:
hosts: ["localhost:9092"]
topic: "logs-%{[agent.version]}"
topics:
- topic: "critical-%{[agent.version]}"
when.contains:
message: "CRITICAL"
- topic: "error-%{[agent.version]}"
when.contains:
message: "ERR"
version
: kafka 协议版本。默认为1.0.0
username
: 连接 kafka 的用户名password
: 连接 kafka 的密码client_id
: kafka 客户端 id. 可选参数worker
: 向每个 kafka 实例发送事件的工作程序数,最好在负载均衡模式下使用。默认为 1.codec
: 编码格式。默认为 jsonmetadata
: kafka 元数据信息设置.bulk_max_size
: 单个 kafka 请求中要批量处理的最大事件数。默认值为 2048.bulk_flush_frequency
: 向 kafka 发送请求之前需要等待的时间。默认为 0. 表示没有延迟.timeout
: kafka 响应超时时间。默认值为 30schannel_buffer_size
: kafka 在 output 管道中缓冲的消息数.keep_alive
: 活动连接的保持活动时长。默认为 0. 表示禁用连接.compression
: 压缩编码方式。可选值为none
,snappy
,lz4
或gzip
. 默认值为gzip
.compression_level
: 压缩级别,0-9 依次升高。默认为 4.max_message_bytes
: json 消息的最大大小。默认为 100 万字节
redis
redis
类型的 output 用于将事件输出到 Redis list 或 channel 中。此插件与 logstash 的 redis 输入插件兼容。它包含如下参数:
enabled
: 是否启用此插件hosts
: redis 服务列表password
: 连接 redis 的密码db
: 连接 redis 的数据库。默认为 0.key
: 发送事件到 redis 的 list 或 channel 的 key 的名称。若为空,则使用index
设置的值.index
: 添加到事件元数据中的索引名称,以供 logstash 使用。默认值为filebeat
.keys
: key 选择器的规则的数组。支持条件判断等复杂条件key
: key 名称mappings
: 一个字典,使用 key 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此 key. 见如下示例
output.redis:
hosts: ["localhost"]
key: "default_list"
keys:
- key: "info_list" # send to info_list if `message` field contains INFO
when.contains:
message: "INFO"
- key: "debug_list" # send to debug_list if `message` field contains DEBUG
when.contains:
message: "DEBUG"
- key: "%{[fields.list]}"
mappings:
http: "frontend_list"
nginx: "frontend_list"
mysql: "backend_list"
datatype
: 用于发布事件的 redis 数据类型。可选值为list
(使用rpush
命令) 或channel
(使用publish
命令). 默认为list
.codec
: 发送到 redis 数据的编码格式。默认为json
worker
: redis 连接的网络类型。可选值为tcp
,tcp4
,tcp6
,unix
. 默认值为tcp
.loadbalance
: redis 客户端的最大连接数。默认为 10.timeout
: 连接 redis 的超时时间。默认为 5s.backoff.init
: 网络错误后尝试重新连接到 redis 的等待时长。默认为 1s.backoff.max
: 网络错误后尝试重新连接到 redis 的最大等待时长。默认为 60s.max_retries
: 设置重试次数bulk_max_size
: 单个 redis 请求或 pipeline 批量发送数据最大数据大小。默认为 2048.
file
类型的 output 用于将事件输出到文件中。每个事件采用 json 格式,主要用于测试。它包含如下参数:
enabled
: 是否启用此插件。默认为 truepath
: 文件位置。必选参数filename
: 生成文件名称。默认设置为 Beat 的名称。如filebeat
生成文件将是filebeat.1
rotate_every_kb
: 每个文件的最大大小,以 kb 为单位。超过此大小后,文件会自动滚动。默认值为 1024 kb.number_of_files
:path
下保存的最大文件数。最早的文件将被删除。默认为 7.permissions
: 指定创建文件的权限。默认值为0600
.codec
: 输出事件的编码格式。默认为 json.
console
console
类型的 output 用于将事件输出到标准输出。每个事件采用 json 格式,主要用于测试。它包含如下参数:
pretty
: 是否以漂亮格式输出。默认为 falseenabled
: 是否启用此插件。默认为 truebulk_max_size
: 输出缓冲区的最大事件数。默认为 1024.codec
: 输出的编码格式
Logging
配置文件 filebeat.yml
的 logging
部分包含用于配置日志输出的选项。日志系统可将日志写入 syslog 或滚动日志文件中。如果未显式配置,则使用滚动文件输出日志。它主要包含如下配置项:
logging.to_stderr
: 如果为 true, 则将所有日志写入到标准输出中。与使用-e
选项相同.logging.to_syslog
: 如果为 true, 则将所有日志写入到 syslog 中.logging.to_files
: 如果为 true, 则将所有日志写入到滚动日志文件中,在日志文件大小到达一定大小后进行滚动logging.level
: 设置日志级别。可选值为debug
,info
,warning
,error
. 默认值为info
logging.metrics.enabled
: 是否定期记录其内部数据指标.logging.metrics.period
: 日志记录内部指标的时间间隔。默认为 30s.logging.files.path
: 日志文件写入的目录。默认值为logs
logging.files.name
: 日志的文件名,默认值为filebeat
.logging.files.rotateeverybytes
: 日志文件的最大大小。默认值为 10485760 (10 MB).logging.files.keepfiles
: 日志保存的文件个数。默认值为 7.logging.files.permissions
: 滚动日志时要使用的权限掩码。默认值为 0600.logging.files.interval
: 除了基于大小的滚动外,filebeat 日志还支持基于日期的文件滚动。默认是禁用的logging.files.rotateonstartup
: 如果日志文件启动时已存在,则立即滚动并写入新的文件中.logging.json
: 是否以 json 格式写入文件。默认为 false.
HTTP endpoint
http.enabled
: 是否启用 HTTP endpoint. 默认为 falsehttp.host
: 绑定地址,支持主机名,IP 地址,Unix 套接字。默认为localhost
http.port
: http endpoint 监听端口。默认为5066
# /stats 暴露了内部指标,如下
curl -XGET 'localhost:5066/stats?pretty'
{
"beat": {
# ...
}
}
filebeat.reference.yml
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK