19

TinySQL学习笔记之SelectionExec

 3 years ago
source link: https://studygolang.com/articles/30718
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文以中的代码对应的 tinysql 项目版本号为 df75611

SelectionExec 是tinysql第五章第一节中要补全的执行器组件,希望能够通过本文的总结来说明tinysql执行器中的一些概念。

接口

作为执行器组件, SelectionExec 实现了 Executor 接口, 其定义位于 tinysql/executor/executor.go L136

// Executor is the physical implementation of a algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a "Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
    base() *baseExecutor
    Open(context.Context) error
    Next(ctx context.Context, req *chunk.Chunk) error
    Close() error
    Schema() *expression.Schema
}

每一个 Executor 的实现都是从 baseExecutor 扩展出来的,一般都会继承其中的 base / Schema 方法,前者是将 baseExecutor 返回,而后者返回的是表结构,设计了哪些列和键。

在火山模型中,主要需要使用 Open / Next / Close 三个方法。代码上最基本的逻辑是,当上层 ExecutorNext 方法被调用时,被调用的 Executor 通过调用下层 ExecutorNext 方法返回的 Chunk ,经过一定的处理来构建本层的返回。简单的这么说比较宽泛,所以希望通过分享我自己在写proj时阅读代码的个人理解,帮助大家理解 Executor 的串联。

结构体

SelectionExec 的定义位于 tinysql/executor/executor.go L345

// SelectionExec represents a filter executor.
type SelectionExec struct {
    baseExecutor                           // 基础结构

    batched     bool                       // 是否以批处理的形式返回结果
    filters     []expression.Expression    // 过滤器表达式列表
    selected    []bool                     // 过滤结果buffer 
    inputIter   *chunk.Iterator4Chunk      // 迭代器
    inputRow    chunk.Row                  // 迭代当前行
    childResult *chunk.Chunk               // 下层Executor返回的结果buffer
}

其中 baseExecutor 的定义位于 tinysql/executor/executor.go L55

type baseExecutor struct {
    ctx           sessionctx.Context        // 执行上下文
    id            fmt.Stringer              // 标识
    schema        *expression.Schema        // 表结构
    initCap       int                       // Chunk初始容量
    maxChunkSize  int                       // 返回Chunk的最大尺寸
    children      []Executor                // 下层Executor
    retFieldTypes []*types.FieldType        // 返回的列信息
}

方法

下面根据 SelectionExecExecutor 接口的实现,来说明,主要说明的是

base

实现非常简单,就是直接继承 baseExecutorbase方法

// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {
    return e
}

Schema

这里实现也是直接继承了 baseExecutorSchema方法

// Schema returns the current baseExecutor's schema. If it is nil, then create and return a new one.
func (e *baseExecutor) Schema() *expression.Schema {
    if e.schema == nil {
        return expression.NewSchema()
    }
    return e.schema
}

Open

SelectionExec 对Open方法进行了 重写 , 本质上Open方法是进行了初始化操作

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
  // 调用baseExecutor的初始化,类似super
    if err := e.baseExecutor.Open(ctx); err != nil { 
        return err
    }
  // 注意这里这newFirstChunk的名字具有一定的迷惑性,实际上根据下层Executor的属性来构建chunk
    e.childResult = newFirstChunk(e.children[0]) 
  // 判断是否可以filters是否可以向量化执行
  // 其实就是检查是否所有的filter都可以向量化, 只有所有filter都可以向量化,才可以进行批执行
    e.batched = expression.Vectorizable(e.filters)
    if e.batched {
    // 如果可以进行批执行的话,构建一个bool切片作为buffer,来保存过滤器的选择情况
    // 在这里初始化好了这块空间,只要之后没有发生切片的resize,那么始终使用的是这块空间
    // 减轻内存分配和GC的压力
        e.selected = make([]bool, 0, chunk.InitialCapacity)
    }
  // 注意这里仅仅是完成了iterator和chunk的绑定,此时chunk中没有数据,iterator也没有意义
    e.inputIter = chunk.NewIterator4Chunk(e.childResult)
  // 这里就是指向了一个空Row
    e.inputRow = e.inputIter.End()
    return nil
}

baseExecutor实现 如下

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
  // 本质上就是遍历所有位于下层的Executor调用一遍Open
  // 注意一点就是,位于下层的Executor会先于当前Executor被初始化
    for _, child := range e.children {
        err := child.Open(ctx)
        if err != nil {
            return err
        }
    }
    return nil
}

Close

SelectionExec 对Close方法进行了 重写 , 本质上Close方法是进行了资源释放的作用

// Close implements plannercore.Plan Close interface.
func (e *SelectionExec) Close() error {
  // 清空两个buffer
    e.childResult = nil
    e.selected = nil
    return e.baseExecutor.Close()
}

baseExecutor实现 如下

// Close closes all executors and release all resources.
func (e *baseExecutor) Close() error {
    var firstErr error
  // 与Open时相似,就是直接调用一遍下层Executor,
    for _, src := range e.children {
        if err := src.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

Next

SelectionExec 的Next方法同样进行了 重写 ,这里也是需要我们进行填充的部分,但是这段代码内外两层循环乍一看上去有一些令人费解。

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
    req.GrowAndReset(e.maxChunkSize)

    if !e.batched {
        return e.unBatchedNext(ctx, req)
    }

    /*
        Exit the loop when:
            1. the `req` chunk` is full.
            2. there is no further results from child.
            3. meets any error.
     */
    for {
        // Fill in the `req` util it is full or the `inputIter` is fully processed.
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
            // Your code here.
        }
        err := Next(ctx, e.children[0], e.childResult)
        if err != nil {
            return err
        }
        // no more data.
        if e.childResult.NumRows() == 0 {
            return nil
        }
        /* Your code here.
           Process and filter the child result using `expression.VectorizedFilter`.
         */
    }
}

根据课程文档的建议是要求我们通过阅读 unBatchedNext方法 来学习函数功能,该函数就是单行处理我们要实现的逻辑的单行处理版本,并且该函数与我们要实现的部分的代码结构类似,通过理解这部分代码,也能帮助我们知道批处理时大概的处理流程。

// unBatchedNext filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
  // 外循环, 本质上是更新下层结果集
    for {
    // 内循环,本质是不断迭代遍历下层结果集,直到返回一个被选中的行时,插入要返回的chunk,并
    //
    // 注意第一次调用Next的时,由于`e.inputRow`是一个空Row,`e.inputIter.End()`同样也会返回一个空Row
    // 因此第一次调用Next的时候并不会先进入内循环的逻辑
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
            selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
            if err != nil {
                return err
            }
            if selected {
                chk.AppendRow(e.inputRow)
                e.inputRow = e.inputIter.Next()
                return nil
            }
        }
    
    /* 这里才是第一次执行时的实际开始位置 */
    
    // Next函数的本质是将调用下层children[0]的Next方法,并将调用结果更新到childResult当中
        err := Next(ctx, e.children[0], e.childResult)
        if err != nil {
            return err
        }
    // 将inputRow定位到更新后的chunk第一行
        e.inputRow = e.inputIter.Begin()
        // 如果已经没有数据了,那就直接返回nil
        if e.childResult.NumRows() == 0 {
            return nil
        }
    }
}

那么根据这块流程的理解,那么我们就可以大致补充出代码

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
  // 这里由Callee对结果集chunk进行重置
  // 在批处理时,会返回maxChunkSize限定大小的结果集
    req.GrowAndReset(e.maxChunkSize)

    if !e.batched {
        return e.unBatchedNext(ctx, req)
    }

    /*
        Exit the loop when:
            1. the `req` chunk` is full.
            2. there is no further results from child.
            3. meets any error.
    */
    for {
        // Fill in the `req` util it is full or the `inputIter` is fully processed.
        for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
            /* Your code here. */
      // 根据过滤结果buffer中的数据判断当前行是否被选中,如果被选中了则添加到结果集中
            if e.selected[e.inputRow.Idx()] {
                req.AppendRow(e.inputRow)
        if req.IsFull() { // 如果结果集被填满了,那么需要将inputRow未被检索的第一行,并返回
               e.inputRow = e.inputIter.Next()
                    return nil
                }
            }
      
        }
    // 这里是调用volcano模型处在下层的子语句的Next方法, 并赋值到当前的childResult中,更新下层结果集内容
        err := Next(ctx, e.children[0], e.childResult) 
        if err != nil {
            return err
        }
        // no more data.
        if e.childResult.NumRows() == 0 {
            return nil
        }
        /* Your code here.
           Process and filter the child result using `expression.VectorizedFilter`.
        */
        // 这里主要是重复利用selected所申请的空间, 注意一定要赋值e.selected, 进行同步改变
        e.inputRow = e.inputIter.Begin()
    // selectd保存使用向量filters过滤后的结果
        e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected) 
        if err != nil {
            return nil
        }
    }
}

那么大家可能有一个疑问就是为什么需要使用内外两层循环来实现,这里按照我的个人理解做一定的解释

其实这里代码结构设计的非常精彩, 整个结构看起来就像是一个变速器。

外循环的作用是充分利用当前在 e.inputRow / e.inputIter / e.selected / e.childResult 中的缓存, 因为上下层的chunk可能是有一定尺寸差距的

内循环的作用是进行结果集的附加, 会有两种退出内层循环的情况

  • 一旦结果集被填充满, 就退出函数, 完成了当前的处理,当前还没处理的信息仍然保留在 e.inputRow / e.inputIter / e.selected / e.childResult 中, 下一次调用Next的时候就可以接着从 e.inputRow 开始
  • 如果遍历完了下层的结果集, 当前层的结果集仍然还没有被填满, 那么就会从调用 Next 更新下层的结果集, 然后在根据filter进行筛选, 将下层结果集中filter过滤后的结果保存到 e.selected 里面, 然后从头开始

下面完整的描述一下调用Next的实际执行流程, 因为实际的执行流程会和代码自上而下的顺序不一样, 这也是这段代码结构精妙的地方

  1. 首先要说明的是 SelectionExec 在调用 Next 之前是需要调用 Open 进行初始化的, 那么 Open 需要关注的是, Open 中进行的仅仅是状态的初始化, 并没有执行实质的计算, ( e.childResult 使用了 newFirstChunk 的时候只是进行了字段/容量/大小的初始化, 并没有进行内容填充), e.childResult 是空的, e.inputItere.inputRow 应该也是空的, 需要在后续步骤中进行初始化.
  2. 第1次调用SelectionExec.Next

    1. 注意的循环条件 e.inputRow != e.inputIter.End() 此时是不成立的, 二者都是空的Row结构体, 所以第一次调用Next的时候完全不进入内循环
    2. 调用Next将下层数据加载到 e.childResult 当中, 进行一些检查
    3. 更新 e.inputRow 使之对应 e.inputIter 的第一个数据
    4. 使用 expression.VectorizedFilter 根据 e.filters 的条件将下层结果集数据的根据过滤器的过滤结果存放到 e.selected
    5. 回到外循环开头往下执行, 那么我们就直接进入了内循环, 在 e.inputRow != e.inputIter.End() 此时已经成立了, 所以可以进入内循环
    6. 在内循环中, 需要判断结果集是否已经被填满

      • 如果没有被填满, 那么就根据筛选结果, 考虑是否将遍历到的行放到结果集中, 当遍历结束时, 步骤b开始继续往下执行
      • 如果已经被填满, 那么就直接返回. 在下层结果集中遍历的状态保存在 e.inputRow / e.inputIter 中, filter过滤的结果放在 e.selected 中, 等待下一次Next调用的时候再调用
  3. 第n次调用 SelectionExec.Next

    1. 如果上一次调用 Next 时还有下层结果集的数据没有遍历完,那么当时的遍历状态仍然保留在 e.inputRow / e.inputIter / e.selected / e.childResult 中, 那么可以直接从2.5开始,进入内循环
    2. 如果上一次调用 Next 时刚好下层结果集的数据也遍历完了,那么 e.inputRow 就会是一个空Row, 从2.1开始往下执行,也就是重新加载下层数据。

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK