26

Golang处理大数据时使用高效的Pipeline(流水线)执行模型 - 个人文章 - SegmentFault...

 4 years ago
source link: https://segmentfault.com/a/1190000014788594?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

并发是件技术活

Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。

想象这样的应用情景:

  1. 从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);
  2. 根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;
  3. 调用NLP服务(自然语言处理),处理每条评论;
  4. 将处理结果写入数据库C(ElasticSearch)。

由于应用中遇到的各种问题,归纳出这些需求:
需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。
需求二:每个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽可能占用更多资源以提高ETL性能)。例如,步骤(1)-(4)分别设置并发数1、4、8、2。

这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。

你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。

可重用的Pipeline模块

为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。

package main

import "sync"

func HasClosed(c <-chan struct{}) bool {
    select {
    case <-c: return true
    default: return false
    }
}

type SyncFlag interface{
    Wait()
    Chan() <-chan struct{}
    Done() bool
}

func NewSyncFlag() (done func(), flag SyncFlag) {
    f := &syncFlag{
        c : make(chan struct{}),
    }
    return f.done, f
}

type syncFlag struct {
    once sync.Once
    c chan struct{}
}

func (f *syncFlag) done() {
    f.once.Do(func(){
        close(f.c)
    })
}

func (f *syncFlag) Wait() {
    <-f.c
}

func (f *syncFlag) Chan() <-chan struct{} {
    return f.c
}

func (f *syncFlag) Done() bool {
    return HasClosed(f.c)
}

type pipelineThread struct {
    sigs []chan struct{}
    chanExit chan struct{}
    interrupt SyncFlag
    setInterrupt func()
    err error
}

func newPipelineThread(l int) *pipelineThread {
    p := &pipelineThread{
        sigs : make([]chan struct{}, l),
        chanExit : make(chan struct{}),
    }
    p.setInterrupt, p.interrupt = NewSyncFlag()

    for i := range p.sigs {
        p.sigs[i] = make(chan struct{})
    }
    return p
}

type Pipeline struct {
    mtx sync.Mutex
    workerChans []chan struct{}
    prevThd *pipelineThread
}

//创建流水线,参数个数是每个任务的子过程数,每个参数对应子过程的并发度。
func NewPipeline(workers ...int) *Pipeline {
    if len(workers) < 1 { panic("NewPipeline need aleast one argument") }

    workersChan := make([]chan struct{}, len(workers))
    for i := range workersChan {
        workersChan[i] = make(chan struct{}, workers[i])
    }

    prevThd := newPipelineThread(len(workers))
    for _,sig := range prevThd.sigs {
        close(sig)
    }
    close(prevThd.chanExit)

    return &Pipeline{
        workerChans : workersChan,
        prevThd : prevThd,
    }
}

//往流水线推入一个任务。如果第一个步骤的并发数达到设定上限,这个函数会堵塞等待。
//如果流水线中有其它任务失败(返回非nil),任务不被执行,函数返回false。
func (p *Pipeline) Async(works ...func()error) bool {
    if len(works) != len(p.workerChans) {
        panic("Async: arguments number not matched to NewPipeline(...)")
    }

    p.mtx.Lock()
    if p.prevThd.interrupt.Done() {
        p.mtx.Unlock()
        return false
    }
    prevThd := p.prevThd
    thisThd := newPipelineThread(len(p.workerChans))
    p.prevThd = thisThd
    p.mtx.Unlock()

    lock := func(idx int) bool {
        select {
        case <-prevThd.interrupt.Chan(): return false
        case <-prevThd.sigs[idx]: //wait for signal
        }
        select {
        case <-prevThd.interrupt.Chan(): return false
        case p.workerChans[idx]<-struct{}{}: //get lock
        }
        return true
    }
    if !lock(0) {
        thisThd.setInterrupt()
        <-prevThd.chanExit
        thisThd.err = prevThd.err
        close(thisThd.chanExit)
        return false
    }
    go func() { //watch interrupt of previous thread
        select {
        case <-prevThd.interrupt.Chan():
            thisThd.setInterrupt()
        case <-thisThd.chanExit:
        }
    }()
    go func() {
        var err error
        for i,work := range works {
            close(thisThd.sigs[i]) //signal next thread
            if work != nil {
                err = work()
            }
            if err != nil || (i+1 < len(works) && !lock(i+1)) {
                thisThd.setInterrupt()
                break
            }
            <-p.workerChans[i] //release lock
        }

        <-prevThd.chanExit
        if prevThd.interrupt.Done() {
            thisThd.setInterrupt()
        }
        if prevThd.err != nil {
            thisThd.err = prevThd.err
        } else {
            thisThd.err = err
        }
        close(thisThd.chanExit)
    }()
    return true
}

//等待流水线中所有任务执行完毕或失败,返回第一个错误,如果无错误则返回nil。
func (p *Pipeline) Wait() error {
    p.mtx.Lock()
    lastThd := p.prevThd
    p.mtx.Unlock()
    <-lastThd.chanExit
    return lastThd.err
}

使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:

package main

import "log"

func main() {
    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加载100条数据,并修改变量checkpoint
        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //这里有个Golang著名的坑。
        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //处理完毕
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

图片描述

每个方格表示一批数据,黄色表示正在执行所属工序,白色表示已经完成工序但堵塞等待中。

Pipeline的工作方式:

  1. Pipeline分别控制每一个工序的并发数。

    • 如图:(4)的并发数已满,<14>(3)已经完成并堵塞等待(继续占有(3)的并发数),直到<12>(4)完成。
  2. 如果第一个工序的并发数已满,Async会堵塞,直到有线程第一个工序完成。

    • 如图:循环体内的<25>正在等待<21>(2)进入下一个工序。
  3. 每个线程的每个工序的调度,不早于上一个线程同一个工序的调度。

    • 如图:<22>(2)早于<21>(2)完成,<22>须堵塞等待,直到<21>(2)完成。
  4. 如果某个线程的某个工序处理失败(例如数据库故障),那之后的线程都会中止执行,下一次调用Async返回false,pipeline.Wait()返回第一个错误,整个流水线作业可控中断。

    • 例如:<12>(4)失败,那<13>、<14>……无论正在执行到哪一个工序,都不会进入下一个工序而中断。<11>不会受到影响,会一直执行完毕。Wait()等待全部完成或中止,返回<12>(4)的错误。
  5. 无法避免中断过程中有checkpoint后的数据写入。下次重启程序将重新写入、覆盖这些数据。

    • 例如:<12>(4)失败、<13>(4)执行成功(已写入数据),那<12>(5)和<13>(5)都不会被执行,checkpoint的最新状态是<11>写入的,下次重启程序将从<12>开始,<13>的数据会再次写入,所以写入应该按照记录ID作覆盖写入。

Pipeline解决了这些问题:

  1. 控制每个工序的并发数;
  2. 控制整体并发数,不会因为in fly数据太多无限占用内存。
  3. 任何工序出现故障(数据库操作失败),整个流水线可控中断,不会漏处理任何一批记录,也不会导致太多的重新执行。你也可以随时Ctrl+C、微调代码、重启程序,所有事情都会继续有序执行。
  4. 任何工序发生堵塞(例如数据库缓慢),整个流水线都会慢下来等待,不会强行加塞。
  5. 你可以随意修改每个工序的并发数,直到找到最佳值。

用channel在上下游间传递数据是件笨拙的事

如果你刚开始学习Golang,你一定觉得channel这东西好棒。但当你理所当然地用一堆channel来串联一条流水线,就是把自己逼疯的开始。实际上Golang有更棒的东西,我不知道那叫什么,反正你可以在func开启一个goroutine的时候,里面调用外面的变量。

package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0 ; i < 10 ; i++ {
        my_var := i * 10
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(time.Second)
            fmt.Println(my_var)
        }()
    }
    wg.Wait()
}

程序会在启动1秒后不按顺序输出0、10、20、…… 90。Runtime创建了10个my_var,每个goroutine各有一个,所以每个goroutine输出不一样的值。

看起来很简单的东西,实际上是Golang的独有特性,涉及到Go runtime的机制,其他语言不得不定义一个对象来解决类似的问题。当我从C++转Go开发时就惊讶:还有这种操作?

上面的Pipeline模块利用了这个特性,它根本不需要任何channel来传递数据,使用者在一个在循环体内定义一个变量来储存一整批的数据,在异步的goroutine中读取、修改这些数据。在goroutine间用channel传递数据的思路转变为:每一批数据由一个goroutine处理,多个gouroutine竞争各个工序的并发数。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK