26

Filebeat实现剖析

 4 years ago
source link: http://www.cyhone.com/articles/analysis-of-filebeat/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Filebeat是使用Golang实现的轻量型日志采集器,也是Elasticsearch stack里面的一员。本质上是一个agent,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。

Filebeat的可靠性很强,可以保证日志At least once的上报,同时也考虑了日志搜集中的各类问题,例如日志断点续读、文件名更改、日志Truncated等。

Filebeat并不依赖于ElasticSearch,可以单独存在。我们可以单独使用Filebeat进行日志的上报和搜集。filebeat内置了常用的Output组件, 例如kafka、ElasticSearch、redis等,出于调试考虑,也可以输出到console和file。我们可以利用现有的Output组件,将日志进行上报。

当然,我们也可以自定义Output组件,让Filebeat将日志转发到我们想要的地方。

filebeat其实是 elastic/beats 的一员,除了filebeat外,还有HeartBeat、PacketBeat。这些beat的实现都是基于libbeat框架。

整体架构

下图是Filebeat官方提供的架构图:

2eeMzyR.png!web

除了图中提到的各个组件,整个filebeat主要包含以下重要组件:

  1. Crawler:负责管理和启动各个Input
  2. Input:负责管理和解析输入源的信息,以及为每个文件启动Harvester。可由配置文件指定输入源信息。
  3. Harvester: Harvester负责读取一个文件的信息。
  4. Pipeline: 负责管理缓存、Harvester的信息写入以及Output的消费等,是Filebeat最核心的组件。
  5. Output: 输出源,可由配置文件指定输出源信息。
  6. Registrar:管理记录每个文件处理状态,包括偏移量、文件名等信息。当Filebeat启动时,会从Registrar恢复文件处理状态。

filebeat的整个生命周期,几个组件共同协作,完成了日志从采集到上报的整个过程。

日志采集流程

Filebeat不仅支持普通文本日志的作为输入源,还内置支持了redis的慢查询日志、stdin、tcp和udp等作为输入源。

本文只分析下普通文本日志的处理方式,对于普通文本日志,可以按照以下配置方式,指定log的输入源信息。

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

其中Input也可以指定多个, 每个Input下的Log也可以指定多个。

filebeat启动时会开启Crawler,对于配置中的每条Input,Crawler都会启动一个Input进行处理,代码如下所示:

func (c *Crawler) Start(...){
    ...
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }
    ...
}

由于指定的paths可以配置多个,而且可以是Glob类型,因此Filebeat将会匹配到多个配置文件。

Input对于每个匹配到的文件,都会开启一个Harvester进行逐行读取,每个Harvester都工作在自己的的goroutine中。

Harvester的工作流程非常简单,就是逐行读取文件,并更新该文件暂时在Input中的文件偏移量(注意,并不是Registrar中的偏移量),读取完成则结束流程。

同时,我们需要考虑到,日志型的数据其实是在不断增长和变化的:

  1. 会有新的日志在不断产生
  2. 可能一个日志文件对应的Harvester退出后,又再次有了内容更新。

为了解决这两个情况,filebeat采用了Input定时扫描的方式。代码如下,可以看出,Input扫描的频率是由用户指定的 scan_frequency 配置来决定的( 默认10s扫描一次)。

func (p *Runner) Run() {
	p.input.Run()

	if p.Once {
		return
	}

	for {
		select {
		case <-p.done:
			logp.Info("input ticker stopped")
			return
		case <-time.After(p.config.ScanFrequency): // 定时扫描
			logp.Debug("input", "Run input")
			p.input.Run()
		}
	}
}

此外,如果用户启动时指定了 --once 选项,则扫描只会进行一次,就退出了。

日志定时扫描及异常处理

我们之前讲到Registrar会记录每个文件的状态,当Filebeat启动时,从Registrar启动时,会从Registrar恢复文件处理状态。

其实在filebeat运行过程中,Input组件也记录了文件状态。不一样的是,Registrar是持久化存储,而Input中的文件状态仅表示当前文件的读取偏移量,且修改时不会同步到磁盘中。

每次,Filebeat刚启动时,Input都会载入Registrar中记录的文件状态,作为初始状态。Input中的状态有两个非常重要:

  1. offset: 代表文件当前读取的offset,从Registrar中初始化。Harvest读取文件后,会同时修改offset。
  2. finished: 代表该文件对应的Harvester是否已经结束,Harvester开始时置为false,结束时置为False。

对于每次定时扫描到的文件,概括来说,会有三种大的情况:

  1. Input找不到该文件状态的记录, 说明是新增文件,则开启一个Harvester,从头开始解析该文件
  2. 如果可以找到文件状态,且finished等于false。这个说明已经有了一个Harvester在处理了,这种情况直接忽略就好了。
  3. 如果可以找到文件状态,且finished等于true。说明之前有Harvester处理过,但已经处理结束了。

对于这种第三种情况,我们需要考虑到一些异常情况,Filebeat是这么处理的:

  1. 如果offset大于当前文件大小:说明文件被Truncate过,此时按做一个新文件处理,直接从头开始解析该文件
  2. 如果offset小于当前文件大小,说明文件内容有新增,则从上次offset处继续读即可。

对于第二种情况,Filebeat似乎有一个逻辑上的问题: 如果文件被Truncate过,后来又新增了数据,且文件大小也比之前offset大,那么Filebeat是检查不出来这个问题的。

除此之外,一个比较有意思的点是,Filebeat甚至可以处理文件名修改的问题。即使一个日志的文件名被修改过,Filebeat重启后,也能找到该文件,从上次读过的地方继续读。

这是因为Filebeat除了在Registrar存储了文件名,还存储了文件的唯一标识。对于Linux来说,这个文件的唯一标识就是该文件的inode ID + device ID。

至此,我们可以清楚的知道,Filebeat是如何采集日志文件,同时做到监听日志文件的更新和修改。而日志采集过程,Harvest会将数据写到Pipeline中。我们接下来看下数据是如何写入到Pipeline中的。

Pipeline的写入

Haveseter会将数据写入缓存中,而另一方面Output会从缓存将数据读走。整个生产消费的过程都是由Pipeline进行调度的,而整个调度过程也非常复杂。

此外,Filebeat的缓存目前分为memqueue和spool。memqueue顾名思义就是内存缓存,spool则是将数据缓存到磁盘中。本文将基于memqueue讲解整个调度过程。

我们首先看下Haveseter是如何将数据写入缓存中的,如下图所示:

nQfAbeB.png!web

Harvester通过pipeline提供的pipelineClient将数据写入到pipeline中,Haveseter会将读到的数据会包装成一个Event结构体,再递交给pipeline。

在Filebeat的实现中,pipelineClient并不直接操作缓存,而是将event先写入一个events channel中。

同时,有一个eventloop组件,会监听events channel的事件到来,等event到达时,eventloop会将其放入缓存中。

当缓存满的时候,eventloop直接移除对该channel的监听。

每次event ACK或者取消后,缓存不再满了,则eventloop会重新监听events channel。

以上是Pipeline的写入过程,此时event已被写入到了缓存中。

但是Output是如何从缓存中拿到event数据的?

Pipeline的消费过程

整个消费的过程非常复杂,数据会在多个channel之间传递流转,如下图所示:

UnqmyaI.png!web

首先再介绍两个角色:

  1. consumer: pipeline在创建的时候,会同时创建一个consumer。consumer负责从缓存中取数据
  2. client worker:负责接收consumer传来的数据,并调用Output的Publish函数进行上报。

与producer类似,consumer也不直接操作缓存,而是会向get channel中写入消费请求。

consumer本身是个后台loop的过程,这个消费请求会不断进行。

eventloop监听get channel, 拿到之后会从缓存中取数据。并将数据写入到resp channel中。

consumer从resp channel中拿到event数据后,又会将其写入到workQueue。

workQueue也是个channel。client worker会监听该channel上的数据到来,将数据交给Output client进行Publish上报。

而且,Output收到的是Batch Events,即会一次收到一批Events。BatchSize由各个Output自行决定。

至此,消息已经递交给了Output组件。

Ack机制

filebeat之所以可以保证日志可以at least once的上报,就是基于其Ack机制。

简单来说,Ack机制就是,当Output Publish成功之后会调用ACK,最终Registrar会收到ACK,并修改偏移量。

而且, Registrar只会在Output调用batch的相关信号时,才改变文件偏移量。其中Batch对外提供了这些信号:

type Batch interface {
	Events() []Event

	// signals
	ACK()
	Drop()
	Retry()
	RetryEvents(events []Event)
	Cancelled()
	CancelledEvents(events []Event)
}

Output在Publish之后,无论失败,必须调用这些函数中的其中一个。

以下是Output Publish成功后调用Ack的流程:

vyiAFzj.png!web

可以看到其中起核心作用的组件是Ackloop。AckLoop中有一个ackChanList,其中每一个ackChan,对应于转发给Output的一个Batch。

每次新建一个Batch,同时会建立一个ackChan,该ackChan会被append到ackChanList中。

而AckLoop每次只监听处于ackChanList最头部的ackChan。

当Batch被Output调用Ack后,AckLoop会收到对应ackChan上的事件,并将其最终转发给Registrar。同时,ackChanList将会pop头部的ackChan,继续监听接下来的Ack事件。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK