6

Apache Calcite 框架 50 倍性能优化实践

 3 years ago
source link: https://objcoding.com/2021/01/17/calcite/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

某天临时被当成壮丁拉去参加一个非常牛逼的应用监控平台(后续会开源),然后大佬就给我派了一个任务,要将项目中的查询性能优化 50 倍以上,大佬对我如此地寄予厚望,我怎么能让大佬失望呢(虽然我内心瑟瑟发抖)?于是我就开始了这段性能优化之旅。

初步认识 Calcite

项目使用 Calcite 框架作为查询引擎,之前一直没停过这玩意是啥,而且网上资料特别少,又是体现我学习能力的时候了,在着手排查性能问题前,我花了非常多时间在了解 Calcite 实现原理上面。

1、Calcite 简介

Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。
Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。
Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。

2、Calcite 执行流程

NramYvi.png!mobile

1)解析 SQL,目的是为了将 SQL 转换成 AST 抽象语法数,Calcite 有一个专门的对象 SqlNode 表示;

2)语法检查,用数据库的元数据信息进行语法验证;

3)逻辑优化,根据前面生成的逻辑计划按照相应的规则(Rule)进行优化;

4)SQL 执行,按照执行计划执行。

3、Calcite 相关对象

  • RelNode:

关系表达式, 主要有 TableScan, Project, Sort, Join 等。如果 SQL 为查询的话,所有关系达式都可以在 SqlSelect中找到, 如 where 和 having 对应的 Filter, selectList 对应 Project, orderBy、offset、fetch 对应着 Sort, From 对应着 TableScan/Join 等等, 示便 Sql 最后会生成如下 RelNode 树。

Debug 源码得到的 RelNode 对象长这样:

aQrM7n.png!mobile

  • RexNode:

行表达式, 如 RexLiteral(常量), RexCall(函数), RexInputRef (输入引用) 等,举个例子:

SELECT LOCATION as LOCATION,MERGE2(VALUE2) as VALUE2 
FROM transaction 
WHERE REPORTTIME >=1594887720000 AND REPORTTIME <=1594891320000 AND APPID = 'test-api'  AND GROUP2 IN ('DubboService','URL') AND METRICKEY IN ('$$TOTAL') GROUP BY LOCATION

RexCall

<=($1, 1595496539000)

RexInputRef

$1

RexLiteral

1595496539000:BIGINT

下面根据官方资料的描述,总结 Calcite 的三种查询模式:

1)ScannableTable

这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。

使用方式:实现 Enumerable scan(DataContext root); ,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。

2)FilterableTable

初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。

使用方式:实现 Enumerable scan(DataContext root, List filters )

如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理 List filters ,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。

3)TranslatableTable

高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(esmysqldrudi..)来写不同的语法查询。

当使用ScannableTable的时候,我们只需要实现函数 Enumerable scan(DataContext root); ,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数 Enumerable scan(DataContext root, List filters ); 参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现 RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable); ,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。

由于我对 Calcite 还没有一个更加深入的了解(但是并不阻碍我排查问题,而且 Calcite 这玩意对我来说太复杂了),因此 Calcite 更加复杂的概念我在这里就不继续啰嗦了,比如关系代数的基本知识、查询优化器等等,排查问题并不需要了解那么深入,而且项目中只是使用了 Calcite 一小部分功能。

使用 Calcite 实现一个简单的数据库

需要做如下几步:

  1. 编写 model.json
  2. 自定义 SchemaFactory
  3. 自定义 Schema(像一个“没有存储层的databse”一样,Calcite不会去了解任何文件格式)
  4. 自定义Table
  5. 自定义 Enumerator

demo url: https://github.com/objcoding/calcite-demo

耗时排查

我在项目中使用了 FilterableTable 模式,Cacite 在解析 Sql 时耗时非常大,然后通过调试,我发现每个请求都占据了两个位置:

org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable

bUV7jmz.png!mobile

Cacite 在这个地方通过设置缓存大小来优化缓存设置。

org.apache.calcite.interpreter.JaninoRexCompiler#baz

bMnemqJ.png!mobile

但是不会缓存该位置,并且每次都会使用新的表达式字符串通过反射创建它。

我使用 JProfile 工具对线程耗时的地方进行定位:

eUVZfq3.png!mobile

Calcite 会在这个地方会调用反射根据不同的 Sql 动态生成不同的表达式,Debug 获取的表达式如下:

UfAFbeI.png!mobile

Calcite 为什么会有这种机制呢?我们先从 Bindable 对象讲起:

在 EnumerableRel(RelNode,我们可以通过 TranslatableTable 自定义 FilterRel、JoinRel、AggregateRel)的每个算子的 implement 方法中会将一些算子(Group、join、sort、function)要实现的算法写成 Linq4j 的表达式,然后通过这些 Linq4j 表达式生成 Java Class。通过 JavaRowFormat 格式化)

calcite 会将 sql 生成的 linq4j 表达式生成可执行的 Java 代码( Bindable 类): org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable

Calcite 会调用 Janino 编译器动态编译这个 java 类,并且实例化这个类的一个对象,然后将其封装到 CalciteSignature 对象中。

调用 executorQuery 查询方法并创建 CalciteResultSet 的时候会调用 Bindable 对象的 bind 方法,这个方法返回一个 Eumerable 对象:

org.apache.calcite.avatica.AvaticaResultSet#execute

RZjYzuy.png!mobile

org.apache.calcite.jdbc.CalcitePrepare.CalciteSignature#enumerable

AZ7JVfN.png!mobile

将 Enumerable 赋值给 CalciteResultSet 的 cursor 成员变量。

在执行真正的数据库查询时,获得实际的 CalciteResultSet,最终会调用:

org.apache.calcite.avatica.AvaticaResultSet#next

mYVnUfi.png!mobile

以下是根据 SQL 动态生成的 linq4j 表达式:

public static class Record2_0 implements java.io.Serializable {
  public Object f0;
  public boolean f1;
  public Record2_0() {}
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Record2_0)) {
      return false;
    }
    return java.util.Objects.equals(this.f0, ((Record2_0) o).f0) && this.f1 == ((Record2_0) o).f1;
  }

  public int hashCode() {
    int h = 0;
    h = org.apache.calcite.runtime.Utilities.hash(h, this.f0);
    h = org.apache.calcite.runtime.Utilities.hash(h, this.f1);
    return h;
  }

  public int compareTo(Record2_0 that) {
    int c;
    c = org.apache.calcite.runtime.Utilities.compare(this.f1, that.f1);
    if (c != 0) {
      return c;
    }
    return 0;
  }

  public String toString() {
    return "{f0=" + this.f0 + ", f1=" + this.f1 + "}";
  }

}

public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
  final org.apache.calcite.rel.RelNode v1stashed = (org.apache.calcite.rel.RelNode) root.get("v1stashed");
  final org.apache.calcite.interpreter.Interpreter interpreter = new org.apache.calcite.interpreter.Interpreter(
    root,
    v1stashed);
  java.util.List accumulatorAdders = new java.util.LinkedList();
  accumulatorAdders.add(new org.apache.calcite.linq4j.function.Function2() {
    public Record2_0 apply(Record2_0 acc, Object[] in) {
      final Object inp9_ = in[9];
      if (inp9_ != null) {
        acc.f1 = true;
        acc.f0 = com.zto.zcat.store.api.query.Merge2Fun.add(acc.f0, inp9_);
      }
      return acc;
    }
    public Record2_0 apply(Object acc, Object in) {
      return apply(
        (Record2_0) acc,
        (Object[]) in);
    }
  }
  );
  org.apache.calcite.adapter.enumerable.AggregateLambdaFactory lambdaFactory = new org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory(
    new org.apache.calcite.linq4j.function.Function0() {
      public Object apply() {
        Object a0s0;
        boolean a0s1;
        a0s1 = false;
        a0s0 = com.zto.zcat.store.api.query.Merge2Fun.init();
        Record2_0 record0;
        record0 = new Record2_0();
        record0.f0 = a0s0;
        record0.f1 = a0s1;
        return record0;
      }
    }
,
    accumulatorAdders);
  return org.apache.calcite.linq4j.Linq4j.singletonEnumerable(interpreter.aggregate(lambdaFactory.accumulatorInitializer().apply(), lambdaFactory.accumulatorAdder(), lambdaFactory.singleGroupResultSelector(new org.apache.calcite.linq4j.function.Function1() {
      public Object apply(Record2_0 acc) {
        return acc.f1 ? com.zto.zcat.store.api.query.Merge2Fun.result(acc.f0) : (Object) null;
      }
      public Object apply(Object acc) {
        return apply(
          (Record2_0) acc);
      }
    }
    )));
}


public Class getElementType() {
  return java.lang.Object.class;
}

Enumerator是Linq风格的迭代器,它有4个方法:

  1. current
  2. moveNext
  3. reset
  4. close

current 返回游标所指的当前记录,需要注意的是current并不会改变游标的位置,这一点和iterator是不同的,在iterator相对应的是next方法,每一次调用都会将游标移动到下一条记录,current则不会,Enumerator是在调用moveNext方法时才会移动游标。moveNext方法将游标指向下一条记录,并获取当前记录供current方法调用,如果没有下一条记录则返回false。

CsvEnumerator是读取 csv 文件的迭代器,它还得需要一个RowConverter,因为csv中都是String类型,使用RowConverter转化成相应的类型。在moreNext方法中,有Stream和谓词下推filter部分的实现,在本文只关注如下几行代码:

总结执行顺序:

1、executeQuery 方法:

1)根据算子 linq4j 表达式子生成 Bindable 执行对象,如果有设置缓存,则会将对像存储到缓存中;

2)生成 CalciteResultSet 时会调用 Bindable#bind 方法返回一个 Enumerable 对象;

2、getData 方法:调用 ResultSet#next 方法最终会嗲用 Enumerable#moveNext

一图理解 Bindable 在 calcite 中的作用:

NbUFVb.png!mobile

发现 Bindable 缓存会持续增加,说明 Bindable 类内容不一致:

a6Zju2I.png!mobile

也说明了 calcite 会根据不同的 SQL 动态生成 linq4j 表达式。

性能优化

以上排查结果可知,在 Calcite 内容会频繁使用 JaninoRexCompiler 使用反射动态生成表达式,由于项目中的 sql 格式相对固定,因此我们是否可以自定义一个 Compiler,然后替换 JaninoRexCompiler ?

我将使用 JaninoRexCompiler 的相关类复制出来,实现一个自定义的 Interpreter.ScalarCompiler,然后在这个地方 org.apache.calcite.interpreter.Interpreter.CompilerImpl#CompilerImpl 替换 JaninoRexCompiler。

关于自定义 Interpreter.ScalarCompiler 的具体思路过程,我记录在这里了:

https://issues.apache.org/jira/browse/CALCITE-4144

经过反复调试,发现性能提上了 50 倍以上!

bquemy3.png!mobile

再次使用 JProfiler 查看,发现 Calcite 查询过程耗时已经大大降低了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK