
A Brief Analysis of the Filebeat Source Code

Source: https://niyanchun.com/filebeat-source-learning.html

2021-03-06 · Big Data · Filebeat

This article takes a light pass through the Filebeat code base. It was written as a companion to the earlier post "An analysis of why Filebeat prevents files from being deleted", but it can also be read on its own as an introduction to Filebeat's code logic. Note that this is not a comprehensive, in-depth dissection of the Filebeat source: it concentrates on how the core data flows under a common configuration, aiming to clarify the path from collection through the internal plumbing to the final send, rather than explaining every detail of the code. The main topics are:

  1. How a Filebeat instance is created and initialized;
  2. How files are traversed and collected, covering the core concepts of Crawler, Input, Harvester, and Reader;
  3. The send logic, i.e. the Publisher/Pipeline;
  4. Filebeat's ACK mechanism.

The article also contains a lot of code. To keep the listings short and the key logic prominent, most of them are trimmed; some omissions are marked and some are not. If you want the full code, set up your own source environment as described below and read along. The article assumes you are already fairly familiar with basic Filebeat usage.

Filebeat is a member of Elastic's Beats family, and all Beats live in one repository, so getting the source is just a matter of cloning the beats repo. One thing to note: early on, Go dependency management used govendor, which requires the code to sit under GOPATH's src directory, while Go 1.11 introduced Go modules as the new dependency mechanism, officially enabled as the default in 1.12 (the GO111MODULE environment variable controls whether modules are used). This transition affects how you set up a Filebeat source environment: check whether the branch you want to read has a go.mod file. If it does, it uses Go modules; otherwise it still uses govendor. With govendor you must place the code under ${GOPATH}/src/github.com/elastic and set GO111MODULE to auto or off; with Go modules there is no such requirement. As for the Go version, anything no older than what the project officially requires is fine.

This article is based on the 6.8 branch (6.8.15), whose beat dependencies still use the old govendor layout, so set up the source environment with:

mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
cd ${GOPATH}/src/github.com/elastic/beats
git checkout 6.8

If you cannot get around network restrictions, pulling the beats code from github can be very slow (mostly because the .git directory is huge), so I packaged my copy and uploaded it to Baidu Netdisk; if you really cannot clone it from github, download mine instead (click here for the download page, password: 6bro). There are two versions, one with the git history and one without; both are fine for reading the code, and the one without git is much smaller.

My Go version is 1.15.7 and my editor is vscode. Once the code is in place, run main.go in the filebeat directory at the repository root; if it runs, the environment is ready. Note: if it complains that the config file filebeat.yml cannot be found, the environment is still fine; it just means there is no such file in the directory you started Filebeat from, and you can point to one with the -c <config file> command-line flag. Also, Filebeat logs only to a file by default; add the -e flag if you want output on the terminal while debugging. With vscode, just edit .vscode/launch.json:

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch file",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${file}",
            "args": ["-e"]
        }
    ]
}

filebeat and libbeat

Anyone familiar with Beats knows about the core libbeat module, which implements much of the shared infrastructure: instance initialization, common configuration, the queue, outputs, logging, and so on. An individual Beat usually only has to implement the data collection and hand the data to libbeat as events over a channel, which greatly simplifies Beat development. Below is the official diagram of the relationship between a Beat and libbeat:

The events mentioned above are defined in libbeat/beat/event.go:

// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
    Timestamp time.Time
    Meta      common.MapStr
    Fields    common.MapStr
    Private   interface{} // for beats private use
}
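
To make the structure concrete, here is a minimal sketch of my own (not code from the repo) of how a collector could package a log line into such an event; that filebeat stores the file state in Private is confirmed later in the outlet code:

// A hypothetical helper showing how the Event struct above gets filled in.
package main

import (
    "time"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
)

func buildEvent(line, path string, offset int64) beat.Event {
    return beat.Event{
        Timestamp: time.Now(),
        Fields: common.MapStr{
            "message": line, // the log line itself
            "source":  path, // which file it came from
        },
        // Private carries collector-internal state through the pipeline;
        // filebeat uses it for the file state that is ACKed back to the
        // registrar (see the ACK section below). Storing a bare offset
        // here is a simplification for illustration.
        Private: offset,
    }
}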

As a member of the family, Filebeat follows the same pattern: it implements file collection and hands the data to libbeat's Publisher as events, and libbeat does the rest. A complete picture of the data flow therefore spans both the filebeat and libbeat directories at the repository root. To make it easier to follow, here is a simplified data-flow diagram first:

The main responsibilities of each part are:

  • Filebeat instance: creation and initialization of the Filebeat instance;
  • Crawler: iterates over the inputs in the configuration and creates and initializes the Input instances;
  • Input: iterates over the configured directories and files, creating and initializing a Harvester for each file that needs collecting;
  • Harvester: reads file content and sends it as events to the backend queue (this article uses the most common setup, the memory queue, as its example);
  • Queue/Spooler: Filebeat has two internal queues, the memory queue (Broker) and the file queue (Spooler); in the code the file queue is called the spooler, but in most places "spooler" is just a synonym for the queue rather than the file queue specifically;
  • Publisher: the send logic inside libbeat; the whole send flow is called the pipeline in the code;
  • Registrar: records the collected files and offsets and, together with the ACK mechanism, implements Filebeat's promised at-least-once delivery.

The sections below cover these modules one by one.

Instance initialization && Registrar

The first step is initializing the Filebeat instance. libbeat defines a Beater interface; a Beat that implements it can reuse all the infrastructure libbeat provides. The interface is defined as follows:

// libbeat/beat/beat.go

// Beater is the interface that must be implemented by every Beat. A Beater
// provides the main Run-loop and a Stop method to break the Run-loop.
// Instantiation and Configuration is normally provided by a Beat-`Creator`.
//
// Once the beat is fully configured, the Run() method is invoked. The
// Run()-method implements the beat its run-loop. Once the Run()-method returns,
// the beat shuts down.
//
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
    Info      Info     // beat metadata.
    Publisher Pipeline // Publisher pipeline

    SetupMLCallback SetupMLCallback // setup callback for ML job configs
    InSetupCmd      bool            // this is set to true when the `setup` command is called

    OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback
    // XXX: remove Config from public interface.
    //      It's currently used by filebeat modules to setup the Ingest Node
    //      pipeline and ML jobs.
    Config *BeatConfig // Common Beat configuration data.

    BeatConfig *common.Config // The beat's own configuration section

    Fields []byte // Data from fields.yml

    ConfigManager management.ConfigManager // config manager
}
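
Before diving into the real thing, a toy Beater helps make the contract concrete. This is a minimal sketch of my own, assuming the 6.8 beat.Pipeline interface exposes Connect() (Client, error) as other beats use it:

package main

import "github.com/elastic/beats/libbeat/beat"

// toyBeat is a hypothetical, minimal Beater; Filebeat's real implementation
// lives in filebeat/beater/filebeat.go.
type toyBeat struct {
    done chan struct{}
}

// Run blocks until Stop is called, as the Beater contract requires.
func (t *toyBeat) Run(b *beat.Beat) error {
    client, err := b.Publisher.Connect()
    if err != nil {
        return err
    }
    defer client.Close()

    // A real beat would collect data and call client.Publish(event) here.
    <-t.done
    return nil
}

// Stop is invoked at most once and breaks the Run loop.
func (t *toyBeat) Stop() {
    close(t.done)
}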

Filebeat implements this interface in filebeat/beater/filebeat.go. Let's trace how execution flows from the main function (filebeat/main.go) into filebeat.go:

// filebeat/main.go

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

The comment above main outlines how Filebeat executes and is where the earlier diagram comes from. The body of main is trivial, so here is the key call chain directly:

  1. filebeat/main.go#main
  2. filebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
  3. libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)
  4. libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)
  5. libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)
  6. libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)
  7. filebeat/beater/filebeat.go#Run

This part of the flow lives mostly in libbeat, so it is fairly generic setup: it constructs the Beater instance (here, the Filebeat instance) and eventually reaches the Run method in filebeat.go. Run performs two important operations: creating and starting the registrar and the crawler.

Filebeat's at-least-once collection is implemented through the Registrar module: the registry file records which files have been collected and the last collected offset, and it is updated only after confirmation that the data was sent successfully. The core structure:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}

The callback confirming successful delivery (implemented with a channel) is registered in the Run method (covered in detail later):

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
    ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})

The Registrar's record file defaults to <filebeat install dir>/data/registry; a sample (formatted for readability) looks like this:

[{
    "source": "/Users/allan/Desktop/temp/test-logs/test-1.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.435218+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712351738,
        "device": 16777225
    }
}, {
    "source": "/Users/allan/Desktop/temp/test-logs/test-2.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.24922+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712603538,
        "device": 16777225
    }
}]
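
Since the registry is plain JSON, it is easy to inspect programmatically. Here is a small standalone sketch; the struct below just mirrors the sample above and is not filebeat's internal file.State type, and the registry path is assumed to be the default:

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "time"
)

// registryEntry mirrors the fields visible in the registry sample above.
type registryEntry struct {
    Source    string    `json:"source"`
    Offset    int64     `json:"offset"`
    Timestamp time.Time `json:"timestamp"`
    TTL       int64     `json:"ttl"`
    Type      string    `json:"type"`
}

func main() {
    raw, err := ioutil.ReadFile("data/registry")
    if err != nil {
        panic(err)
    }
    var entries []registryEntry
    if err := json.Unmarshal(raw, &entries); err != nil {
        panic(err)
    }
    for _, e := range entries {
        fmt.Printf("%s harvested up to offset %d\n", e.Source, e.Offset)
    }
}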

Also, to guarantee that no data is lost, the modules are started during instance initialization in this order: registrar --> publisher --> spooler --> crawler/input; on shutdown they are stopped in exactly the reverse order.
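
This ordering maps naturally onto Go's defer semantics. A schematic, self-contained sketch (stand-in components, not filebeat's actual startup code):

package main

import (
    "fmt"
    "os"
    "os/signal"
)

// component stands in for registrar/publisher/spooler/crawler.
type component struct{ name string }

func start(name string) *component {
    fmt.Println("starting", name)
    return &component{name: name}
}

func (c *component) Stop() { fmt.Println("stopping", c.name) }

func main() {
    // Start order per the article: registrar -> publisher -> spooler -> crawler.
    // Deferred Stops run in reverse, so the crawler (the producer) stops first
    // and the registrar (the state writer) stops last.
    registrar := start("registrar")
    defer registrar.Stop()

    publisher := start("publisher")
    defer publisher.Stop()

    spooler := start("spooler")
    defer spooler.Stop()

    crawler := start("crawler")
    defer crawler.Stop()

    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    <-sig // wait for Ctrl-C, then unwind in reverse order
}

With that, on to the Crawler and Input.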

Input / file traversal (Crawler && Input)

Crawler

We know Filebeat collects data according to the inputs defined in the configuration file; the Crawler's job is to parse those inputs and create and start the Input instances. The core Crawler code is in filebeat/crawler/crawler.go.

The core structure is defined as follows:

type Crawler struct {
    inputs          map[uint64]*input.Runner   // holds the configured Inputs
    inputConfigs    []*common.Config
    out             channel.Factory
    wg              sync.WaitGroup
    InputsFactory   cfgfile.RunnerFactory
    ModulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatVersion     string
    beatDone        chan struct{}
}

Its Start method iterates over the inputs in the configuration, creating and starting each one:

// Start starts the crawler with all inputs
func (c *Crawler) Start(
    pipeline beat.Pipeline,
    r *registrar.Registrar,
    configInputs *common.Config,
    configModules *common.Config,
    pipelineLoaderFactory fileset.PipelineLoaderFactory,
    overwritePipelines bool,
) error {

    logp.Info("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

    // some code omitted
    logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

    return nil
}

// create and start an Input
func (c *Crawler) startInput(
    pipeline beat.Pipeline,
    config *common.Config,
    states []file.State,
) error {
    if !config.Enabled() {
        return nil
    }

    connector := channel.ConnectTo(pipeline, c.out)
    p, err := input.New(config, connector, c.beatDone, states, nil)
    if err != nil {
        return fmt.Errorf("Error in initing input: %s", err)
    }
    p.Once = c.once

    if _, ok := c.inputs[p.ID]; ok {
        return fmt.Errorf("Input with same ID already exists: %d", p.ID)
    }

    c.inputs[p.ID] = p

    p.Start()

    return nil
}

Input

Filebeat supports several Input types, such as log, redis, stdin, and docker. The code lives under filebeat/input, with each type implemented in its own subdirectory. The common Input interface is defined as follows:

// filebeat/input/input.go
// Input is the interface common to all input
type Input interface {
    Run()
    Stop()
    Wait()
}

// Start starts the input
func (p *Runner) Start() {
    p.wg.Add(1)
    logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

    // some code omitted

    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            onceWg.Done()
            p.stop()
            p.wg.Done()
        }()

        p.Run()
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run
    p.input.Run()

    // some code omitted
}

This article uses the most common type, log, as the example. p.input.Run() lands in filebeat/input/log/input.go#Run:

// Input contains the input and its config
type Input struct {
    cfg           *common.Config
    config        config
    states        *file.States
    harvesters    *harvester.Registry   // one input holds multiple harvesters
    outlet        channel.Outleter
    stateOutlet   channel.Outleter
    done          chan struct{}
    numHarvesters atomic.Uint32
    meta          map[string]string
    stopOnce      sync.Once
}


// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }
    p.scan()

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
                if !newState.FileStateOS.IsSame(state.FileStateOS) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
                }
            }
        }
    }
}

If you know Filebeat's configuration options well, this code will feel familiar: the variable names map almost one-to-one onto the config options, and much of the branching simply handles those options. The key call is p.scan():

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    // gather the files under the configured paths; the logic deciding whether a file needs harvesting lives in getFiles()
    paths := p.getFiles()

    // some code omitted

    for i := 0; i < len(paths); i++ {
        // omitted: checks for whether the file was already harvested, and up to where

        // Decides if previous state exists
        if lastState.IsEmpty() {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // start a harvester
            err := p.startHarvester(newState, 0)
            // error handling omitted
        } else {
            p.harvestExistingFile(newState, lastState)
        }
    }
}

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
    logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)

    if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
        logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
        // start a harvester to continue collecting
        err := p.startHarvester(newState, oldState.Offset)
        if err != nil {
            logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
        }
        return
    }
    // remainder omitted
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        p.numHarvesters.Dec()
        harvesterSkipped.Add(1)
        return errHarvesterLimit
    }
    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        p.numHarvesters.Dec()
        return err
    }

    err = h.Setup()
    if err != nil {
        p.numHarvesters.Dec()
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before staring harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    h.SendStateUpdate()

    if err = p.harvesters.Start(h); err != nil {
        p.numHarvesters.Dec()
    }
    return err
}

The core of scan is to walk the files under the configured paths and, for each file that needs collecting, create and start a harvester instance. In startHarvester at the end, a harvester is first created (p.createHarvester), then configured (h.Setup), and finally started (p.harvesters.Start(h)). The Harvester is covered next.

The code in filebeat/input/log/input.go implements the logic for a large number of configuration options and is worth reading carefully. If you know the input options, it reads easily, and reading it alongside the configuration reference documentation works even better.

Data collection: the Harvester

Harvester

A Harvester runs as its own goroutine; the interface is defined in filebeat/harvester/harvester.go:

// Harvester contains all methods which must be supported by each harvester
// so the registry can be used by the input
type Harvester interface {
    ID() uuid.UUID      
    Run() error
    Stop()
}

Each input type implements its own Harvester. The log Harvester lives in filebeat/input/log/harvester.go, with this core structure:

// Harvester contains all harvester related data
type Harvester struct {
    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(*util.Data) bool

    onTerminate func()
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // covered in the previous article; most of the code is omitted here, keeping only the read and the send
    for {
        // read
        message, err := h.reader.Next()

        // send
        if !h.sendEvent(data, forwarder) {
            return nil
        }
    }
}

The previous article already introduced the Harvester: its core job is to open the file, read its content according to the configuration, and send it. Both the read and the send happen in Run, as annotated above. Here we add one more key piece: reader.Reader.

The various Readers

The actual reading is carried out by a series of Readers. The Reader interface is defined in filebeat/reader/reader.go:

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
    Next() (Message, error)
}

// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
    Ts      time.Time     // timestamp the content was read
    Content []byte        // actual content read
    Bytes   int           // total number of bytes read to generate the message
    Fields  common.MapStr // optional fields that can be added by reader
}

The interface contains a single Next method; each call reads one Message. Filebeat implements many Readers, which are composed, according to the user's configuration, into a call chain that processes the raw data step by step like an assembly line: each Reader wraps the one before it. They all live under filebeat/reader and mainly include the ones below.

At the very bottom, log.go#Read is not a Filebeat reader.Reader implementation; it calls Go's underlying Read directly to fetch a requested number of bytes:

// filebeat/input/log/log.go
func (f *Log) Read(buf []byte) (int, error) {
    for {
        n, err := f.fs.Read(buf)  // calls Go's os/file.go#Read underneath
    }
}

// os/file.go#Read
// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if err := f.checkValid("read"); err != nil {
        return 0, err
    }
    n, e := f.read(b)
    return n, f.wrapErr("read", e)
}

Everything above it is a reader.Reader implementation, in order (all listings trimmed):

LineReader, which implements line-by-line reading:

// filebeat/reader/readfile/line.go
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
    reader     io.Reader
    codec      encoding.Encoding
    bufferSize int
    maxBytes   int
    nl         []byte
    inBuffer   *streambuf.Buffer
    outBuffer  *streambuf.Buffer
    inOffset   int // input buffer read offset
    byteCount  int // number of bytes decoded from input buffer into output buffer
    decoder    transform.Transformer
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
    for {
        err := r.advance()
    }
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
    // fill inBuffer until '\n' sequence has been found in input buffer
    for idx == -1 {
        // try to read more bytes into buffer   filebeat/input/log/log.go#Read
        n, err := r.reader.Read(buf)
    }
}

EncoderReader, which decodes each line from the configured encoding:

// filebeat/reader/readfile/encode.go
// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type EncoderReader struct {
    reader *LineReader
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r EncoderReader) Next() (reader.Message, error) {
    c, sz, err := r.reader.Next()
    // Creating message object
    return reader.Message{
        Ts:      time.Now(),
        Content: c,
        Bytes:   sz,
    }, err
}

JSON Reader, which handles JSON-formatted data:

// filebeat/reader/json/json.go
type JSON struct {
    reader reader.Reader
    cfg    *Config
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (reader.Message, error) {
}

StripNewline Reader, which strips trailing newline characters:

// filebeat/reader/readfile/strip_newline.go
// StripNewline reader removes the last trailing newline characters from
// read lines.
type StripNewline struct {
    reader reader.Reader
}

// Next returns the next line.
func (p *StripNewline) Next() (reader.Message, error) {
}

Timeout Reader, which handles timeouts:

// filebeat/reader/readfile/timeout.go
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
}

Multiline Reader, which merges multiple lines into one event:

// filebeat/reader/multiline/multiline.go
// MultiLine reader combining multiple line events into one multi-line event.
//
// Lines to be combined are matched by some configurable predicate using
// regular expression.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Reader struct {
    reader       reader.Reader
    pred         matcher
    flushMatcher *match.Matcher
    maxBytes     int // bytes stored in content
    maxLines     int
    separator    []byte
    last         []byte
    numLines     int
    truncated    int
    err          error // last seen error
    state        func(*Reader) (reader.Message, error)
    message      reader.Message
}

// Next returns next multi-line event.
func (mlr *Reader) Next() (reader.Message, error) {
    return mlr.state(mlr)
}

LimitReader, which caps the length of a single event:

// filebeat/reader/readfile/limit.go
// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type LimitReader struct {
    reader   reader.Reader
    maxBytes int
}

// Next returns the next line.
func (r *LimitReader) Next() (reader.Message, error) {
    message, err := r.reader.Next()
    if len(message.Content) > r.maxBytes {
        message.Content = message.Content[:r.maxBytes]
        message.AddFlagsWithKey("log.flags", "truncated")
    }
    return message, err
}
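
All of these readers share the same decorator shape: wrap an inner Reader and post-process what its Next returns. As a toy illustration of my own (not code from the repo; assumes the filebeat/reader package above plus the standard bytes package):

// upcaseReader is a hypothetical reader.Reader decorator.
type upcaseReader struct {
    inner reader.Reader // the next Reader down the chain
}

func (r *upcaseReader) Next() (reader.Message, error) {
    msg, err := r.inner.Next() // delegate to the wrapped Reader first
    if err != nil {
        return msg, err
    }
    msg.Content = bytes.ToUpper(msg.Content) // then transform the result
    return msg, nil
}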

Not all of these Readers are needed in every scenario; they are assembled according to the user's configuration when the Harvester is initialized:

// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
    err := h.open()
    if err != nil {
        return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
    }

    // newLogFileReader assembles the Reader chain according to the user's configuration
    h.reader, err = h.newLogFileReader()
    if err != nil {
        if h.source != nil {
            h.source.Close()
        }
        return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
    }

    return nil
}

When the Harvester's Run method calls h.reader.Next() in its endless loop, the Next methods of these Readers run recursively, each transforming the data in turn, and the final result is sent to the backend. A simplified assembly sketch follows.
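
A hypothetical sketch of what the assembly in newLogFileReader amounts to; the constructor names here are illustrative, not the real 6.8 functions:

func buildReaderChain(base reader.Reader, cfg config) reader.Reader {
    r := base // the line and encoding readers sit below this point
    if cfg.JSON != nil {
        r = newJSONReader(r, cfg.JSON) // hypothetical constructor
    }
    r = newStripNewlineReader(r)
    if cfg.Multiline != nil {
        r = newMultilineReader(r, cfg.Multiline) // hypothetical constructor
    }
    return newLimitReader(r, cfg.MaxBytes) // outermost: its Next() is what Run calls
}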

Send logic (inside the Harvester)

The send call chain is as follows:

  1. filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)
  2. filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)
  3. filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)
  4. filebeat/channel/util.go#OnEvent

The actual send is carried out jointly by the code in filebeat/channel/util.go and filebeat/channel/outlet.go:

filebeat/channel/util.go:

// filebeat/channel/util.go
func (o *subOutlet) OnEvent(d *util.Data) bool {

    o.mutex.Lock()
    defer o.mutex.Unlock()
    select {
    case <-o.done:
        return false
    default:
    }

    select {
    case <-o.done:
        return false
    // write the data into the channel
    case o.ch <- d:
        select {
        case <-o.done:
            return true
        // wait for the result
        case ret := <-o.res:
            return ret
        }
    }
}

// filebeat/channel/util.go
// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
    s := &subOutlet{
        done: make(chan struct{}),
        ch:   make(chan *util.Data),
        res:  make(chan bool, 1),
    }

    go func() {
        // read data from the channel and forward it via OnEvent
        for event := range s.ch {
            s.res <- out.OnEvent(event)
        }
    }()

    return s
}

filebeat/channel/outlet.go:

func (o *outlet) OnEvent(d *util.Data) bool {
    if !o.isOpen.Load() {
        return false
    }

    event := d.GetEvent()
    if d.HasState() {
        event.Private = d.GetState()
    }

    if o.wg != nil {
        o.wg.Add(1)
    }

    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish

    return o.isOpen.Load()
}

The logic here is:

  1. The data is written into the channel by case o.ch <- d in filebeat/channel/util.go#OnEvent, which then waits for the result in the inner select;
  2. Meanwhile, the goroutine started in filebeat/channel/util.go#SubOutlet reads the data from that channel and sends it via o.client.Publish(event) in filebeat/channel/outlet.go#OnEvent; once the send completes, the result is written back into the subOutlet's res channel, and the harvester's call returns.

In the previous article, the harvester that could not be shut down was stuck precisely here: o.client.Publish(event) never returned, so the harvester blocked on the subOutlet's res channel, waiting for the send result forever.

Up to this point the data has been flowing inside Filebeat's own code; once o.client.Publish(event) is called, it enters libbeat (libbeat/publisher/pipeline/client.go#Publish).
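
Stripped of the Filebeat types, the handshake above reduces to this standalone, runnable pattern:

package main

import "fmt"

func main() {
    ch := make(chan string)   // plays the role of subOutlet.ch
    res := make(chan bool, 1) // plays the role of subOutlet.res

    // Consumer side, as in SubOutlet: read events, "publish" them, report back.
    go func() {
        for ev := range ch {
            fmt.Println("publishing:", ev) // stand-in for out.OnEvent -> Publish
            res <- true                    // write the result back
        }
    }()

    ch <- "event-1" // corresponds to `case o.ch <- d:`
    ok := <-res     // corresponds to `case ret := <-o.res:`
    fmt.Println("publish acknowledged:", ok)
}

If the consumer's publish call never returns, the sender blocks forever on res, which is exactly the stuck-harvester symptom described above.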

Data sending: the Publisher

From here on, the code lives in libbeat. The complexity of this part comes mainly from the number of concepts involved and the length of the path the data travels.

Pipeline

Beats call the entire Publisher flow the pipeline. It is one of the most central pieces of libbeat; the code is in libbeat/publisher/pipeline/pipeline.go. Here is the part with the most detailed comments:

// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines, beats can
// connect to publish events to.
package pipeline

// Pipeline implementation providint all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushung, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
    beatInfo beat.Info

    logger *logp.Logger
    queue  queue.Queue
    output *outputController

    observer observer

    eventer pipelineEventer

    // wait close support
    waitCloseMode    WaitCloseMode
    waitCloseTimeout time.Duration
    waitCloser       *waitCloser

    // pipeline ack
    ackMode    pipelineACKMode
    ackActive  atomic.Bool
    ackDone    chan struct{}
    ackBuilder ackBuilder
    eventSema  *sema

    processors pipelineProcessors
}

I put together a diagram:

Now, the stages of the pipeline. Continuing from before: a Harvester holds the leftmost client instance in the diagram (all Harvesters of one Input share that client) and uses it to invoke a Producer that sends the data into a buffer, a channel whose size is hard-coded to 20. Also note that the client's publish runs the processors: if processors are defined, they execute inside the sending client's goroutine. The corresponding code (key parts only):

// filebeat/channel/outlet.go
func (o *outlet) OnEvent(d *util.Data) bool {
    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish
}

// libbeat/publisher/pipeline/client.go
func (c *client) publish(e beat.Event) {
    // if processors are configured, they run here
    if c.processors != nil {
        var err error

        event, err = c.processors.Run(event)
        publish = event != nil
        if err != nil {
            // TODO: introduce dead-letter queue?

            log.Errorf("Failed to publish event: %v", err)
        }
    }

    var published bool
    if c.canDrop {
        published = c.producer.TryPublish(pubEvent)
    } else {
        published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go
    }
}

// libbeat/publisher/queue/memqueue/produce.go
func (p *forgetfulProducer) Publish(event publisher.Event) bool {
    return p.openState.publish(p.makeRequest(event))
}
func (st *openState) publish(req pushRequest) bool {
    select {
    // push the data into the events buffer
    case st.events <- req:
        return true
    case <-st.done:
        st.events = nil
        return false
    }
}

The EventLoop then reads from the events buffer and writes into the batchBuffer, a slice whose size is governed by queue.mem.events (default 4096). The corresponding code (key parts only):

// libbeat/publisher/queue/memqueue/eventloop.go
// create the EventLoop
func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
    l := &bufferingEventLoop{
        broker:       b,
        maxEvents:    size,
        minEvents:    minEvents,
        flushTimeout: flushTimeout,
        // uses the Broker's events channel directly
        events:    b.events,
        get:       nil,
        pubCancel: b.pubCancel,
        acks:      b.acks,
    }
    l.buf = newBatchBuffer(l.minEvents)

    l.timer = time.NewTimer(flushTimeout)
    if !l.timer.Stop() {
        <-l.timer.C
    }

    return l
}

func (l *bufferingEventLoop) run() {
    var (
        broker = l.broker
    )

    for {
        select {
        case <-broker.done:
            return

        case req := <-l.events: // producer pushing new event
            l.handleInsert(&req)
        }
    }
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
    // insert writes the data into the batchBuffer
    if l.insert(req) {
        l.eventCount++
        if l.eventCount == l.maxEvents {
            // when the queue is full, set the chan to nil so writes block; the queue is restored once an ack arrives (handleACK)
            l.events = nil // stop inserting events if upper limit is reached
        }
    }
}
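
handleInsert relies on a Go idiom worth calling out: operations on a nil channel block forever, so nil'ing a channel effectively disables its select case. A runnable reduction:

package main

import "fmt"

func main() {
    events := make(chan int, 1)
    events <- 42

    for i := 0; i < 2; i++ {
        select {
        case v := <-events:
            fmt.Println("received", v)
            events = nil // queue "full": disable this case, as handleInsert does
        default:
            fmt.Println("events case disabled, nothing received")
        }
    }
}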

Once the data is in the batchBuffer, the eventConsumer reads it out in batches according to the configured rules and writes it into the workQueue, another channel. The corresponding code (key parts only):

// libbeat/publisher/pipeline/consumer.go
func (c *eventConsumer) loop(consumer queue.Consumer) {
    log := c.logger

    log.Debug("start pipeline event consumer")

    var (
        out    workQueue
        batch  *Batch
        paused = true
    )

    for {
        select {
        case <-c.done:
            log.Debug("stop pipeline event consumer")
            return
        case sig := <-c.sig:
            handleSignal(sig)
        // write the batch into the workQueue
        case out <- batch:
            batch = nil
        }
    }
}

From the workQueue, the netClientWorker module reads the batches and actually sends them by calling the output's Publish. Here the Elasticsearch output serves as the example (key parts only):

// libbeat/publisher/pipeline/output.go
func (w *netClientWorker) run() {
    for !w.closed.Load() {
        reconnectAttempts := 0

        // read batches from the workQueue
        // send loop
        for batch := range w.qu {
            if w.closed.Load() {
                if batch != nil {
                    batch.Cancelled()
                }
                return
            }
            // send to the output
            err := w.client.Publish(batch) // libbeat/outputs/backoff.go
            if err != nil {
                logp.Err("Failed to publish events: %v", err)
                // on error return to connect loop
                break
            }
        }
    }
}

// libbeat/outputs/backoff.go
func (b *backoffClient) Publish(batch publisher.Batch) error {
    err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go
    if err != nil {
        b.client.Close()
    }
    backoff.WaitOnError(b.backoff, err)
    return err
}

// libbeat/outputs/elasticsearch/client.go
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}
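
For contrast with the real client above, the output side can be reduced to a toy sketch. Batch.Events() and Batch.ACK() are the calls seen in the listing, but treating this as a complete 6.8 outputs.Client implementation is an assumption of mine:

// toyClient consumes batches the way an output worker expects, ACKing everything.
// (assumes: import "fmt" and "github.com/elastic/beats/libbeat/publisher")
type toyClient struct{}

func (toyClient) Publish(batch publisher.Batch) error {
    events := batch.Events()
    fmt.Printf("pretending to send %d events\n", len(events))
    batch.ACK() // this call is what ultimately drives the ACK flow described below
    return nil
}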

With that, the pipeline's data flow is complete. None of it is complicated, just a long chain of calls that takes time to read through. To round things off, a closer look at the pipeline's core spooler, i.e. the queue.

Queue

Filebeat provides two kinds of queue: the memory queue and the file queue. In practice the memory queue is what almost everyone uses, so only it is covered here; the file queue lives under libbeat/publisher/queue/spool if you are interested, and its core ideas match the memory queue's. The memory queue is defined under libbeat/publisher/queue/memqueue; the queue itself is defined in broker.go:

// the memory queue is called Broker in the code
type Broker struct {
    done chan struct{}

    logger logger

    bufSize int
    // buf         brokerBuffer
    // minEvents   int
    // idleTimeout time.Duration

    // api channels
    events    chan pushRequest
    requests  chan getRequest
    pubCancel chan producerCancelRequest

    // internal channels
    acks          chan int
    scheduledACKs chan chanList

    eventer queue.Eventer

    // wait group for worker shutdown
    wg          sync.WaitGroup
    waitOnClose bool
}

The event buffer mentioned earlier is the events field here. The NewBroker function in the same file also matters: it creates a Broker and defines the eventLoop interface, which has two implementations:

  • directEventLoop (queue.mem.flush.min_events = 1)
  • bufferingEventLoop (queue.mem.flush.min_events > 1)

The default value of queue.mem.flush.min_events is 2048, so the bufferingEventLoop is what gets created, and it is the only one covered here:

// libbeat/publisher/queue/memqueue/eventloop.go
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}

The batchBuffer mentioned earlier is the buf field here. Also, when reading the code, pay attention to how the word spooler is used: most of the time it refers to the queue in general, though the file queue is also called the spooler.

That completes the pipeline. At this point, everything from creating the Filebeat instance to collecting and sending data has been covered. What remains is the ACK flow, which travels in the direction opposite to the data.

The ACK flow

Filebeat implements its at-least-once delivery guarantee with the Registrar plus ACKs. The Registrar was introduced earlier; what remains is the ACK flow, which is the more involved part.

First, the Registrar logic again:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}


func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel:         // communication relies on this channel
            // after receiving confirmations from the channel, write the data to the registry file
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}

The Registrar struct defines a Channel field, and this channel is what receives the ack messages; the Run method reads from it. Now let's see how ack data gets written into that channel.

First, filebeat.go registers a global ACK callback during initialization:

// filebeat/beater/filebeat.go
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

    // Make sure all events that were published in
    registrarChannel := newRegistrarLogger(registrar)

    // register the callback for successfully delivered messages
    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }
}
// filebeat/beater/filebeat.go
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
    return &registrarLogger{
        done: make(chan struct{}),
        ch:   reg.Channel,
    }
}

// filebeat/beater/channels.go
type registrarLogger struct {
    done chan struct{}
    ch   chan<- []file.State
}
// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

// filebeat/beater/acker.go
type statefulLogger interface {
    Published(states []file.State)
}
// filebeat/beater/acker.go
func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
    return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

// the registered callback
func (a *eventACKer) ackEvents(data []interface{}) {
    stateless := 0
    states := make([]file.State, 0, len(data))
    for _, datum := range data {
        if datum == nil {
            stateless++
            continue
        }

        st, ok := datum.(file.State)
        if !ok {
            stateless++
            continue
        }

        states = append(states, st)
    }

    if len(states) > 0 {
        a.log.Debugw("stateful ack", "count", len(states))
        a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State)
    }

    if stateless > 0 {
        a.log.Debugw("stateless ack", "count", stateless)
        a.stateless.Published(stateless) //
    }
}

The callback ultimately registered is eventACKer, whose key call is a.stateful.Published. Its implementation:

// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

It writes the final ack message (the states) into l.ch, which is exactly the Registrar's channel (you can trace this through the registration code): the callback writes ack messages into the channel, the Registrar reads the states from it and writes them to the registry file, and the loop is closed, as the diagram shows:
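
A runnable reduction of that closed loop, using plain channels instead of the filebeat types:

package main

import "fmt"

func main() {
    stateCh := make(chan []string) // plays the role of Registrar.Channel
    done := make(chan struct{})

    // Registrar side: receive confirmed states and persist them.
    go func() {
        defer close(done)
        for states := range stateCh {
            fmt.Println("registrar: persisting", states)
        }
    }()

    // eventACKer side: forward ACKed states into the registrar channel.
    ackEvents := func(states []string) { stateCh <- states }

    ackEvents([]string{"/var/log/a.log@362", "/var/log/b.log@362"})
    close(stateCh)
    <-done
}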

The remaining question: where does the ack that reaches the ackEvents callback come from? Who sends it, where, when, and how? Let's reason first: since it is an ack, it ought to originate where the data is ultimately sent; once the send is confirmed externally, the ack propagates backwards, opposite to the direction the data travels, so the heart of it should be in the Publisher, that is, in libbeat's pipeline. Let's start from the pipeline's core, the (memory) queue Broker module.

// libbeat/publisher/queue/memqueue/broker.go
// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
    logger logger,
    settings Settings,
) *Broker {

    var eventLoop interface {
        run()
        processACK(chanList, int)
    }

    // create the EventLoop
    if minEvents > 1 {
        eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
    } else {
        eventLoop = newDirectEventLoop(b, sz)
    }

    b.bufSize = sz
    // create the AckLoop
    ack := newACKLoop(b, eventLoop.processACK)

    b.wg.Add(2)
    go func() {
        defer b.wg.Done()
        eventLoop.run()
    }()
    // the ack loop is started in this goroutine
    go func() {
        defer b.wg.Done()
        ack.run()
    }()
}

// libbeat/publisher/queue/memqueue/ackloop.go
func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
    l := &ackLoop{broker: b}
    l.processACK = processACK
    return l
}

NewBroker creates both the eventLoop and the ackLoop; the former moves the data (covered earlier), the latter handles the acks. The ackLoop code:

// libbeat/publisher/queue/memqueue/ackloop.go
// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
    broker *Broker
    sig    chan batchAckMsg     // channel signaling that a batch finished sending
    lst    chanList

    totalACK   uint64
    totalSched uint64

    batchesSched uint64
    batchesACKed uint64

    processACK func(chanList, int)
}

func (l *ackLoop) run() {
    for {
        select {
        case <-l.broker.done:
            return

        case acks <- acked:
            acks, acked = nil, 0

        case lst := <-l.broker.scheduledACKs:
            count, events := lst.count()
            l.lst.concat(&lst)

            l.batchesSched += uint64(count)
            l.totalSched += uint64(events)

        // wait here for the confirmation signal that a batch finished sending
        case <-l.sig:
            acked += l.handleBatchSig()
            if acked > 0 {
                acks = l.broker.acks
            }
        }
    }
}

As you can see, the ackLoop waits for the batch-sent signal (sig). There are two threads to pull on:

  1. Where does the signal come from?
  2. Once the signal is received, how does it get forwarded to the callback registered earlier?

First question: where does the signal come from? By the earlier reasoning it should be produced where the data is sent, and the pipeline analysis showed that the final send happens in the output. Sticking with the ES output, start from the final send code and work backwards:

// libbeat/outputs/elasticsearch/client.go  the final ES send code from earlier
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // note this ACK: it jumps to libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

// libbeat/publisher/pipeline/batch.go
func (b *Batch) ACK() {
    b.ctx.observer.outBatchACKed(len(b.events)) // libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {}  (metrics only)
    b.original.ACK()   // the key call: libbeat/publisher/queue/memqueue/consume.go
    releaseBatch(b)
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }

    b.report()      // the key call
}
// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) report() {
    b.ack.ch <- batchAckMsg{}      // the confirmation ACK is finally sent here
}

Bingo! In Publish, once the send completes (whether it failed or succeeded), the ACK is sent. Tracing the calls back, report is where the ack is ultimately emitted, which answers the first question: that is where the ack signal comes from. Second question: once the signal is received, how does it reach the registered callback? Look at handleBatchSig, which runs when the signal arrives:

// libbeat/publisher/queue/memqueue/ackloop.go
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {

    if count > 0 {
        if e := l.broker.eventer; e != nil {
            e.OnACK(count)
        }
        // this calls the EventLoop's processACK; with the bufferingEventLoop in use, (*bufferingEventLoop).processACK runs
        // report acks to waiting clients
        l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK
    }

}

// libbeat/publisher/queue/memqueue/eventloop.go
func (l *bufferingEventLoop) processACK(lst chanList, N int) {
    for !lst.empty() {
        // some code omitted
        // the key call: this invokes ackEvents to propagate the confirmation
        st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int)
    }
}

handleBatchSig calls the processACK method of whichever eventLoop type is active, and that method hands over to the pipeline's acker.go. The code flow:

  1. libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))
  2. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }
  3. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }
  4. libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}

Here is the last ackEvents:

func (a *gapCountACK) ackEvents(n int) {
    select {
    case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event
        a.acks = nil
    // the ack count n is written into a.acks
    case a.acks <- n: // send ack event to worker
    }
}

// gapCountACK returns event ACKs to the producer, taking account for dropped events.
// Events being dropped by processors will always be ACKed with the last batch ACKed
// by the broker. This way clients waiting for ACKs can expect all processed
// events being always ACKed.
type gapCountACK struct {
    pipeline *Pipeline

    fn func(total int, acked int)

    done chan struct{}

    drop chan struct{}
    acks chan int

    events atomic.Uint32
    lst    gapList
}

After the count is written into gapCountACK's acks, it is read out in ackLoop:

// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackLoop() {
    acks, drop := a.acks, a.drop
    closing := false
    for {
        select {
        case <-a.done:
            closing = true
            a.done = nil
            if a.events.Load() == 0 {
                // stop worker, if all events accounted for have been ACKed.
                // If new events are added after this acker won't handle them, which may
                // result in duplicates
                return
            }

        case <-a.pipeline.ackDone:
            return

        case n := <-acks:
            // key: after reading n from acks, handleACK processes it
            empty := a.handleACK(n)
            if empty && closing && a.events.Load() == 0 {
                // stop worker, if and only if all events accounted for have been ACKed
                return
            }

        case <-drop:
            // TODO: accumulate multiple drop events + flush count with timer
            a.events.Sub(1)
            a.fn(1, 0)
        }
    }
}

func (a *gapCountACK) handleACK(n int) bool {
    a.events.Sub(uint32(total))
    a.fn(total, acked) // line 326: func (a *boundGapCountACK) onACK(total, acked int) {
    return emptyLst
}

After n is read from acks, handleACK is called; the subsequent call flow is:

1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked)
2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked)
3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked) 
4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)

This finally arrives at pipelineEventCB, the structure that handles acks internally in the pipeline (I read the name as "the pipeline's event callback", and it is indeed the heart of ack handling). The key code:

// libbeat/publisher/pipeline/pipeline_ack.go
// pipelineEventCB internally handles active ACKs in the pipeline.
// It receives ACK events from the queue and the individual clients.
// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect
// events from all clients having published events in the last batch of events
// being ACKed.
// the PipelineACKHandler will be notified, once all events being ACKed
// (including dropped events) have been collected. Only one ACK-event is handled
// at a time. The pipeline global and clients ACK handler will be blocked for the time
// an ACK event is being processed.
type pipelineEventCB struct {
    done chan struct{}

    acks chan int

    // this field is the key: confirmations are written into this channel,
    // read out in the worker, and finally pushed into the Registrar's channel
    events        chan eventsDataMsg
    droppedEvents chan eventsDataMsg

    mode    pipelineACKMode
    handler beat.PipelineACKHandler
}

The events field is the key: confirmation messages are written into this channel, read out in the worker, and finally pushed into the Registrar's channel. Continuing the call chain, here is how the data is written into events:

// reportEvents sends a batch of ACKed events to the ACKer.
// The events array contains send and dropped events. The `acked` counters
// indicates the total number of events acked by the pipeline.
// That is, the number of dropped events is given by `len(events) - acked`.
// A client can report events with acked=0, iff the client has no waiting events
// in the pipeline (ACK ordering requirements)
//
// Note: the call blocks, until the ACK handler has collected all active events
//       from all clients. This ensure an ACK event being fully 'captured'
//       by the pipeline, before receiving/processing another ACK event.
//       In the meantime the queue has the chance of batching-up more ACK events,
//       such that only one ACK event is being reported to the pipeline handler
func (p *pipelineEventCB) onEvents(data []interface{}, acked int) {
    p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked})
}

func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) {
    if msg.acked == 0 {
        p.droppedEvents <- msg
    } else {
        msg.sig = make(chan struct{})
        p.events <- msg // written into the channel here; read out in (p *pipelineEventCB) worker()'s collect, then passed to reportEventsData
        <-msg.sig
    }
}

pushMsg writes the message into events. Now the reading end of the channel:

func (p *pipelineEventCB) worker() {
    defer close(p.acks)
    defer close(p.events)
    defer close(p.droppedEvents)

    for {
        select {
        case count := <-p.acks:
            // the message is read in collect
            exit := p.collect(count)
            if exit {
                return
            }

            // short circuit dropped events, but have client block until all events
            // have been processed by pipeline ack handler
        case msg := <-p.droppedEvents:
            p.reportEventsData(msg.data, msg.total)
            if msg.sig != nil {
                close(msg.sig)
            }

        case <-p.done:
            return
        }
    }
}

func (p *pipelineEventCB) collect(count int) (exit bool) {
    for acked < count {
        var msg eventsDataMsg
        select {
        // the message is read here
        case msg = <-p.events:
        case msg = <-p.droppedEvents:
        case <-p.done:
            exit = true
            return
        }
    }

    p.reportEventsData(data, total)
    return
}

func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) {
    // report ACK back to the beat
    switch p.mode {
    case countACKMode:
        p.handler.ACKCount(total)
    case eventsACKMode:
        // this calls the ACKEvents registered earlier in Filebeat
        p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{})
    case lastEventsACKMode:
        p.handler.ACKLastEvents(data)
    }
}

As you can see, once the confirmation is read from the channel, reportEventsData ends up calling the ACKEvents registered back in Filebeat: eventACKer. That answers the second question and connects back to the earlier callback. Completing the earlier diagram:

This article has taken a source-level look at Filebeat's core data flow. To keep the focus, a lot of detail was omitted: when each data structure is instantiated and started, the handling of error branches, retransmission on failure, and so on. As stated at the beginning, this is not a comprehensive deep dive into the Filebeat code, just an analysis of the core data flow under a common configuration. I rewrote this article several times and still feel it says little beyond pasting a lot of code and pointing out the call sites, things you would work out yourself after a few debugging sessions. On the other hand, knowing the overall flow and the key call sites ahead of time lets you debug with a clear map and a sense of priority, which saves a lot of time. For me the article has two payoffs: it deepened my own understanding of Filebeat, and it may save some time for others reading the source. That is enough.
