16

使用Calcite解析Sql做维表关联(一)

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA%3D%3D&%3Bmid=2247484197&%3Bidx=1&%3Bsn=aa82a19a007c553a03a682ce981d11ca
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

VJrYVzM.gif

点击箭头处 “蓝色字” ,关注我们哦!!

维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐、规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算直接通过ETL方式加载到Hive表中,然后通过sql方式关联查询即可,但是对于实时计算中Flink、SparkStreaming的表都是抽象的、虚拟的表,那么就没法使用加载方式完成。透过维表服务系列里面讲到的维表关联都是使用编码方式完成,使用Map或者AsyncIO方式完成,但是这种硬编码方式开发效率很低,特别是在实时数仓里面,我们希望能够使用跟离线一样sql方式完成维表关联操作。

在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:

select * from orders o join gdsInfo g on o.gdsId=g.gdsId

orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认

sql解析流程:

1. 识别出流表与维表

3. select 临时表

现在使用calcite解析这条语句

public class ParseDemo {


public static void main(String[] args) {

//假设gdsInfo就是维表

String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";


SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build();

SqlParser sqlParser = SqlParser.create(sql, config);

SqlSelect sqlSelect = null;

try {

sqlSelect = (SqlSelect) sqlParser.parseStmt();

} catch (Exception e) {

e.printStackTrace();

}


SqlNode sqlFrom = sqlSelect.getFrom();

boolean isSideJoin = false;

String leftTable = "";

String rightTable = "";

String newName = ""; //临时表

SqlJoin sqlJoin = null;

//解析join

if (sqlFrom.getKind() == SqlKind.JOIN) {

sqlJoin = (SqlJoin) sqlFrom;

SqlNode left = sqlJoin.getLeft();

SqlNode right = sqlJoin.getRight();

isSideJoin = true;

leftTable = paserTableName(left);

rightTable = paserTableName(right);

}

//生成新的select

if (isSideJoin) {

newName = leftTable + "_" + rightTable;

SqlParserPos pos = new SqlParserPos(0, 0);

SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos);

sqlSelect.setFrom(sqlIdentifier);

}

}

//解析表

private static String paserTableName(SqlNode tbl) {

if (tbl.getKind() == SqlKind.AS) {

SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl;

return sqlBasicCall.operands[1].toString();

}

return ((SqlIdentifier) tbl).toString();

}

}

那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。

sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。

源表定义:

CREATE TABLE orders(

orderId varchar,

gdsId varchar,

orderTime varchar

)WITH(

type = 'kafka',

kafka.bootstrap.servers = 'localhost:9092',

kafka.topic = 'topic1',

kafka.group.id = 'gId1',

sourcedatatype ='json'

);

维表定义:

CREATE TABLE gdsInfo(

gdsId varchar,

gdsName varchar,

price double

)WITH(

type='mysql',

url='jdbc:mysql://localhost:3306/paul',

userName='root',

password='123456',

tableName='gdsInfo',

cache = 'LRU',

isSideTable='true'

);

现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:

(?i)create\s+table\s+(\S+)\s*\((.+)\)\s*with\s*\((.+)\)

?i表示后面的匹配忽略大小写,\s+ 表示匹配多个空格,\S+表示匹配多个字符,.+ 表示匹配任意字符。

定义一个table类:

class TableInfo{

private String tableName; // 表名称

private Map<String,String> fieldsInfo; //字段名称->类型

private Properties props; //表属性

private boolean isSideTable; //是否为维表

}

public class ParseCreate {


public static final String REG_CREATE="(?i)create\\s+table\\s+(\\S+)\\s*\\((.+)\\)\\s*with\\s*\\((.+)\\)";


public static void main(String[] args) {


String createSql="CREATE TABLE orders(" + " orderId varchar," + " gdsId varchar,"

+ " orderTime varchar" + " )WITH(" + " type = 'kafka',"

+ " kafka.bootstrap.servers = 'localhost:9092'," + " kafka.topic = 'topic1',"

+ " kafka.group.id = 'gId1'," + " sourcedatatype ='json'" + " );";

Pattern pattern=Pattern.compile(REG_CREATE);


TableInfo tableInfo=new TableInfo();

Matcher matcher=pattern.matcher(createSql);

if(matcher.find()){

tableInfo.setTableName(matcher.group(1));

String fieldsStr=matcher.group(2);

String propsStr=matcher.group(3);

tableInfo.setFieldsInfo(parseFiles(fieldsStr));

tableInfo.setProps(parseProps(propsStr));

if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){

tableInfo.setSideTable(true);

}

}


}


public static Map<String,String> parseFiles(String fieldsStr){

Map<String,String> fieldsInfo=new HashMap<>();

String[] fieldsArray=fieldsStr.split(",");

for(String field: fieldsArray){

String[] fieldInfo=field.trim().split(" ");

fieldsInfo.put(fieldInfo[0],fieldInfo[1]);

}

return fieldsInfo;

}


public static Properties parseProps(String propsStr){

Properties props=new Properties();

String[] propsArray=propsStr.split(",");

for(String prop: propsArray){

String[] propInfo=prop.trim().split("=");

props.setProperty(propInfo[0],propInfo[1]);

}

return props;

}


}

至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。

—END—

36VN3az.jpg!web

关注回复 Flink

获取更多系列

原创不易,好看,就点个"在看"


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK