42

calcite简单入门

 5 years ago
source link: https://www.tuicool.com/articles/j6ZVNfn
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

官网: http://calcite.apache.org/

Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。

Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。

Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。

2 架构与解析步骤

一般来说Calcite解析SQL有以下几步:

  • Parser. 此步中Calcite通过Java CC将SQL解析成未经校验的AST
  • Validate. 该步骤主要作用是校证Parser步骤中的AST是否合法,如验证SQL scheme、字段、函数等是否存在; SQL语句是否合法等. 此步完成之后就生成了RelNode树(关于RelNode树, 请参考下文)
  • Optimize. 该步骤主要的作用优化RelNode树, 并将其转化成物理执行计划。主要涉及SQL规则优化如:基于规则优化(RBO)及基于代价(CBO)优化; Optimze 这一步原则上来说是可选的, 通过Validate后的RelNode树已经可以直接转化物理执行计划,但现代的SQL解析器基本上都包括有这一步,目的是优化SQL执行计划。此步得到的结果为物理执行计划。
  • Execute,即执行阶段。此阶段主要做的是:将物理执行计划转化成可在特定的平台执行的程序。如Hive与Flink都在在此阶段将物理执行计划CodeGen生成相应的可执行代码。

2.1 查询优化

INSERT INTO tmp_node
SELECT s1.id1, s1.id2, s2.val1
FROM source1 as s1 INNER JOIN source2 AS s2
ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;

2.2 Parser解析

LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
  LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
    LogicalFilter(condition=[AND(>($2, 5), =($8, 3))])
      LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER])
        LogicalTableScan(table=[[SOURCE1]])
        LogicalTableScan(table=[[SOURCE2]])

2.3 Optimize优化

谓词下推,投影下推,关系代数定律优化

LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
  LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
      LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])
        LogicalFilter(condition=[=($4, 3)])  
          LogicalProject(ID1=[$0], ID2=[$1],      ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
            LogicalTableScan(table=[[SOURCE1]])
        LogicalFilter(condition=[>($3,5)])    
          LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
            LogicalTableScan(table=[[SOURCE2]])

3 LogicalTableScan查询

如上,节点树中的最后节点均为 LogicalTableScan ,假设我们不参与(LogicalTableScan)Calcite的查询过程,即不做SQL解析,不做优化,只要把它接入进来,实际Calcite是可以工作的,无非就是可能会有扫全表、数据全部加载到内存里等问题,所以实际中我们可能会参与全部(Translatable)或部分工作(FilterableTable),覆盖Calcite的一些执行计划或过滤条件,让它能更高效的工作。

值得一提的是,Calcite支持异构数据源查询,比如数据存在es和mysql,可以通过写sql join之类的操作,让calcite分别先从不同的数据源查询数据,然后再在内存里进行合并计算;另外,它本身提供了许多优化规则,也支持我们自定义优化规则,来优化整个查询。

3.1 ScannableTable

a simple implementation of Table, using the ScannableTable interface, that enumerates all rows directly

这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。

使用方式:实现 Enumerable scan(DataContext root); ,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。

3.2 FilterableTable

a more advanced implementation that implements FilterableTable, and can filter out rows according to simple predicates

初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。

使用方式:实现 Enumerable scan(DataContext root, List filters )

如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理 List filters ,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。

3.3 TranslatableTable

advanced implementation of Table, using TranslatableTable, that translates to relational operators using planner rules.

高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(es\mysql\drudi..)来写不同的语法查询。

当使用ScannableTable的时候,我们只需要实现函数 Enumerable scan(DataContext root); ,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数 Enumerable scan(DataContext root, List filters ); 参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现 RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable); ,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。

4 自定义数据源表接入demo

如果你的数据源不在官方的支持列表中,或者官方的支持不能满足你的需求,那么则需要自己实现源接入。

4.1 准备工作

4.1.1 maven引入

<!--calcite核心包-->
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.19.0</version>
</dependency>
<!--项目用-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<!--项目用-->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>16.0.1</version>
</dependency>

4.1.2 开发流程

calcite中,引入一个数据库通常是通过注册一个 SchemaFactory 接口实现类来实现。 SchemaFactory 中只有一个方法,就是生成 SchemaSchema 最重要的功能是获取所有 TableTable 有两个功能,一个是获取所有字段的类型,另一个是得到 Enumerable 迭代器用来读取数据。

4.1.3 配置信息

如果将你的数据源引入calcite,一般情况下是使用一个配置文件,以下是配置文件的demo。

{
  "version": "1.0",
  "defaultSchema": "TEST",
  "schemas": [
    {
      "name": "TEST",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8",
        "jdbcDriver":"com.mysql.cj.jdbc.Driver",
        "jdbcUser":"test",
        "jdbcPassword":"test"
      }
    }
  ]
}

4.2 CSV表demo

这里我们先生成一个CSV文件,后边的操作就是通过在calcite中调用SQL访问CSV中的数据。

TEST01.csv

ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR
0,first,second
1,hello,world

CsvSchemaFactory类

package com.calcite.csv;

import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;

import java.util.Map;

public class CsvSchemaFactory implements SchemaFactory {

    /**
     * parentSchema 他的父节点,一般为root
     * name     数据库的名字,它在model中定义的
     * operand  也是在mode中定义的,是Map类型,用于传入自定义参数。
     * */
    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        return new CsvSchema(String.valueOf(operand.get("dataFile")));
    }
}

CsvSchema类

package com.calcite.csv;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.Source;
import org.apache.calcite.util.Sources;

import java.net.URL;
import java.util.Map;

public class CsvSchema extends AbstractSchema {
    private Map<String, Table> tableMap;
    private String dataFile;

    public CsvSchema(String dataFile) {
        this.dataFile = dataFile;
    }

    @Override
    protected Map<String, Table> getTableMap() {
        URL url = Resources.getResource(dataFile);
        Source source = Sources.of(url);
        if (tableMap == null) {
            final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
            builder.put(this.dataFile.split("\\.")[0],new CsvTable(source));    
            // 一个数据库有多个表名,这里初始化,大小写要注意了,TEST01是表名。
            tableMap = builder.build();
        }
        return tableMap;
    }
}

CsvTable类

package com.calcite.csv;

import com.google.common.collect.Lists;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;

import java.io.*;
import java.util.List;

public class CsvTable extends AbstractTable implements ScannableTable {
    private Source source;

    public CsvTable(Source source) {
        this.source = source;
    }

	/**
	 * 获取字段类型
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;

        List<String> names = Lists.newLinkedList();
        List<RelDataType> types = Lists.newLinkedList();

        try {
            BufferedReader reader = new BufferedReader(new FileReader(source.file()));
            String line = reader.readLine();
            List<String> lines = Lists.newArrayList(line.split(","));
            lines.forEach(column -> {
                String name = column.split(":")[0];
                String type = column.split(":")[1];
                names.add(name);
                types.add(typeFactory.createSqlType(SqlTypeName.get(type)));
            });

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return typeFactory.createStructType(Pair.zip(names, types));
    }

    @Override
    public Enumerable<Object[]> scan(DataContext dataContext) {
        return new AbstractEnumerable<Object[]>() {
            @Override
            public Enumerator<Object[]> enumerator() {
                return new CsvEnumerator<>(source);
            }
        };
    }
}

CsvEnumerator类

package com.calcite.csv;

import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.util.Source;

import java.io.BufferedReader;
import java.io.IOException;

public class CsvEnumerator <E> implements Enumerator<E> {

    private E current;

    private BufferedReader br;

    public CsvEnumerator(Source source) {
        try {
            this.br = new BufferedReader(source.reader());
            this.br.readLine();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public E current() {
        return current;
    }

    @Override
    public boolean moveNext() {
        try {
            String line = br.readLine();
            if(line == null){
                return false;
            }

            current = (E)line.split(",");    // 如果是多列,这里要多个值
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 出现异常走这里
     * */
    @Override
    public void reset() {
        System.out.println("报错了兄弟,不支持此操作");
    }

    /**
     * InputStream流在这里关闭
     * */
    @Override
    public void close() {

    }
}

model.json

{
  "version": "1.0",
  "defaultSchema": "TEST_CSV",
  "schemas": [
    {
      "name": "TEST_CSV",
      "type": "custom",
      "factory": "com.calcite.csv.CsvSchemaFactory",
      "operand": {
        "dataFile": "TEST01.csv"
      }
    }
  ]
}

Main方法调用

package com.calcite;

import com.alibaba.fastjson.JSON;
import com.calcite.util.ReourceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.sql.*;
import java.util.List;
import java.util.Map;


public class Client {
    /**
     * 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
     * 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
     * operand 动态参数,ScheamFactory的create方法会接收到这里的数据
     */
    public static void main(String[] args) {
        try {

            // 用文件的方式
            //URL url = Client.class.getResource("/model.json");
            //String str = URLDecoder.decode(url.toString(), "UTF-8");
            //Properties info = new Properties();
            //info.put("model", str.replace("file:", ""));
            //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

            // 字符串方式
            String model = ReourceUtil.getResourceAsString("model.json");
            Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);

            Statement statement = connection.createStatement();

            test1(statement);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    /**
     * CSV文件读取
     * @param statement
     * @throws Exception
     */
    public static void test1(Statement statement) throws Exception {
        ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
        System.out.println(JSON.toJSONString(getData(resultSet)));
    }
    
    
    
    public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
        List<Map<String,Object>> list = Lists.newArrayList();
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnSize = metaData.getColumnCount();

        while (resultSet.next()) {
            Map<String, Object> map = Maps.newLinkedHashMap();
            for (int i = 1; i < columnSize + 1; i++) {
                map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
            }
            list.add(map);
        }
        return list;
    }
}

4.3 内存数据源与CSV数据源关联查询demo

在4.2的演示中,我们能够使用SQL查询CSV文件中的数据。接下来,我们再定义一种内存数据源,主要作用是演示两种数据源间的关联查询。

MemSchemaFactory类

package com.calcite.memory;

import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;

import java.util.Map;


public class MemSchemaFactory implements SchemaFactory {
    @Override
    public Schema create(SchemaPlus schemaPlus, String s, Map<String, Object> map) {
        return new MemSchema(map);
    }
}

MemSchema类

package com.calcite.memory;

import com.google.common.collect.ImmutableMap;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;

import java.util.Map;

public class MemSchema extends AbstractSchema {
    private Map<String, Object> map;
    private Map<String, Table> tableMap;

    public MemSchema(Map<String, Object> map) {
        this.map = map;
    }

    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
            map.forEach((key, value) -> {
                builder.put(key, new MemTable(value));
            });
            tableMap = builder.build();
        }
        return tableMap;
    }
}

MemTable类

package com.calcite.memory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.collect.Lists;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class MemTable extends AbstractTable implements ScannableTable {

    private List<Map<String, Object>> list = Lists.newLinkedList();

    public MemTable(Object list) {
        if (list instanceof List) {
            ((List)list).forEach(o -> {
                this.list.add(
                    JSON.parseObject(JSON.toJSONString(o),
                        new TypeReference<Map<String, Object>>() {},
                        Feature.OrderedField));
            });
        }
    }

    @Override
    public Enumerable<Object[]> scan(DataContext dataContext) {
        return new AbstractEnumerable<Object[]>() {
            @Override
            public Enumerator<Object[]> enumerator() {
                return new MemEnumerator<Object[]>(list);
            }
        };
    }

    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;

        List<String> names = Lists.newLinkedList();
        List<RelDataType> types = Lists.newLinkedList();

        if (list.size() != 0) {
            list.get(0).forEach((key, value) -> {
                names.add(key);
                types.add(typeFactory.createSqlType(SqlTypeName.get("VARCHAR")));
            });
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}

MemEnumerator类

package com.calcite.memory;

import com.google.common.collect.Lists;
import org.apache.calcite.linq4j.Enumerator;

import java.util.List;
import java.util.Map;

public class MemEnumerator<E> implements Enumerator<E> {

    private List<Map<String, Object>> list = Lists.newLinkedList();
    private int index = -1;
    private E e;

    public MemEnumerator(List<Map<String, Object>> list) {
        this.list = list;
    }

    @Override
    public E current() {
       return e;
    }

    @Override
    public boolean moveNext() {
        if (index+1 >= list.size()){
            return false;
        }else {
            e = (E)list.get(index+1).values().toArray();
            index++;
            return true;
        }
    }

    @Override
    public void reset() {
        index = -1;
        e = null;
    }

    @Override
    public void close() {

    }
}

model.json

{
  "version": "1.0",
  "defaultSchema": "TEST_CSV",
  "schemas": [
    {
      "name": "TEST_CSV",
      "type": "custom",
      "factory": "com.calcite.csv.CsvSchemaFactory",
      "operand": {
        "dataFile": "TEST01.csv"
      }
    },
    {
      "name": "TEST_MEM",
      "type": "custom",
      "factory": "com.calcite.memory.MemSchemaFactory",
      "operand": {
        "MEM_TABLE_1": [
          {
            "ID": 0,
            "MEM_STR": "str0"
          },
          {
            "ID": 1,
            "MEM_STR": "str1"
          },
          {
            "ID": 2,
            "MEM_STR": "str2"
          }
        ]
      }
    }
  ]
}

Main方法调用

package com.calcite;

import com.alibaba.fastjson.JSON;
import com.calcite.util.ReourceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.sql.*;
import java.util.List;
import java.util.Map;


public class Client {
    /**
     * 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
     * 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
     * operand 动态参数,ScheamFactory的create方法会接收到这里的数据
     */
    public static void main(String[] args) {
        try {

            // 用文件的方式
            //URL url = Client.class.getResource("/model.json");
            //String str = URLDecoder.decode(url.toString(), "UTF-8");
            //Properties info = new Properties();
            //info.put("model", str.replace("file:", ""));
            //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

            // 字符串方式
            String model = ReourceUtil.getResourceAsString("model.json");
            Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);

            Statement statement = connection.createStatement();

            test2(statement);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * CSV文件读取
     * @param statement
     * @throws Exception
     */
    public static void test1(Statement statement) throws Exception {
        ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
        System.out.println(JSON.toJSONString(getData(resultSet)));
    }

    /**
     * CSV文件与内存文件关联读取
     * @param statement
     * @throws Exception
     */
    public static void test2(Statement statement) throws Exception {
        ResultSet resultSet1 = statement.executeQuery("select csv1.id as cid,csv1.name1 as cname ,mem1.id as mid,mem1.mem_str as mstr from test_csv.TEST01 as csv1 left join test_mem.mem_table_1 as mem1 on csv1.id = mem1.id");
        System.out.println(JSON.toJSONString(getData(resultSet1)));
    }

    public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
        List<Map<String,Object>> list = Lists.newArrayList();
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnSize = metaData.getColumnCount();

        while (resultSet.next()) {
            Map<String, Object> map = Maps.newLinkedHashMap();
            for (int i = 1; i < columnSize + 1; i++) {

                map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
            }
            list.add(map);
        }
        return list;
    }
}

小结

calcite对于没有 高并发低延时 的多数据源间数据有着天然的优势。但需要注意的是,如果一个表中数据量特别大,大到读取速度很慢或内存无法容纳,那么务必在操作该表数据时加入尽可能多的筛选条件,如果自定义实现 LogicalTableScan ,最好也是实现 FilterableTable ,从而减少calcite在内存中操作数据行的量。

参考:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK