32

使用Calcite解析Sql做维表关联(二)

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA%3D%3D&%3Bmid=2247484202&%3Bidx=1&%3Bsn=fb5ba179c7b108c30144d2f7045ccffc
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

VJrYVzM.gif

点击箭头处 “蓝色字” ,关注我们哦!!

继上一篇中 使用Calcite解析Sql做维表关联(一) 介绍了建表语句解析方式以及使用calcite解析解析流表join维表方法,这一篇将会介绍如何使用代码去实现将sql变为可执行的代码。

实现流程分析:

  1. 注册表

    根据对create语句解析的结果:表名称、字段信息、表属性,注册成为相应的源表、结果表;

  2. join 拆解

    使用calcite 解析后得到两个部分join部分、insert部分,join部分得到的流表先转换为流,然后根据维表配置的属性(维表来源、查询方式等)选择不同的维表关联策略,得到一个关联之后的流,最后将这个流注册为一张表;对于insert部分就比较简单,insert部分的select的表直接更换为关联之后的流表,然后执行即可。

经过以上分析之后,接下来看下具体的实现。

注册表

注册表包括源表、结果表。实时处理的数据源通常是kafka,针对不同的数据格式需要制定不同的反序列化方式,以json格式为例,如何将kafka的数据反序列化,将流转换为表,通常流的数据类型为Pojo、Tuple、Row等,为了能够通用化选择Row类型;结果表通常是mysql、hbase、es等,需要定义AppendStreamTableSink或者RetractStreamTableSink。

//以json格式为例

public class JsonDeserilization implements DeserializationSchema<Row> {


private Map<String,String> fields; //fieldName->fieldType

private RowTypeInfo rowTypeInfo;

private TypeInformation<?>[] typeInformations;

private String[] fieldNames;

//传入的参数解析create语句得到

public JsonDeserilization(String[] fieldNames,TypeInformation<?>[] typeInformations){

this.fieldNames=fieldNames;

this.typeInformations=typeInformations;

this.rowTypeInfo=new RowTypeInfo(typeInformations,fieldNames);

}


@Override public Row deserialize(byte[] message) throws IOException {

String msg=new String(message);

Row row=new Row(fieldNames.length);

JSONObject jsonObject=JSONObject.parseObject(msg);

for(int i=0;i<fieldNames.length;i++){

if(typeInformations[i].getTypeClass()==String.class){

row.setField(i,jsonObject.getString(fieldNames[i]));

}

if(typeInformations[i].getTypeClass()==Integer.class){

row.setField(i,jsonObject.getInteger(fieldNames[i]));

}

}

return row;

}

......

}

注册表:

//kafka json 类型

public static void registerSourceTable(TableInfo tableInfo,StreamTableEnvironment tblEnv){


Properties props=tableInfo.getProps();

String tableName=tableInfo.getTableName();

StreamExecutionEnvironment env=tblEnv.execEnv();

if("kafka".equals(props.getProperty("type"))) {

Properties kafkaPros = new Properties();

props.forEach((k, v) -> {

if (k.toString().startsWith("kafka.")) {

kafkaPros.setProperty(k.toString().replace("kafka.", ""), v.toString());

}

});

String topic = props.getProperty("kafka.topic");

FlinkKafkaConsumer<Row> consumer011 = new FlinkKafkaConsumer<Row>(topic,

new JsonDeserilization(tableInfo.getFieldNames(), tableInfo.getFieldTypes()), kafkaPros);

DataStream<Row> ds = env.addSource(consumer011);

tblEnv.registerDataStreamInternal(tableName, ds);

}

}


public static void registerSinkTable(TableInfo tableInfo,StreamTableEnvironment tblEnv){

Properties props=tableInfo.getProps();

String tableName=tableInfo.getTableName();

if("console".equals(props.getProperty("type"))){

ConsoleTableSink consoleTableSink=new ConsoleTableSink();

tblEnv.registerTableSink(tableName,tableInfo.getFieldNames(),tableInfo.getFieldTypes(),consoleTableSink);

}

}

ConsoleTableSink 实现了RetractStreamTableSink <Row> ,直接将数据原样输出到控制台。

Join实现

得到解析后的SqlJoin节点,获取源表、维表信息,首先将源表转换为流:

SqlJoin sqlJoin=(SqlJoin)sqlNode1;


String leftTableName=parseTableName(sqlJoin.getLeft()); //表名称

String rightTableName=parseTableName(sqlJoin.getRight());

TableInfo leftTableInfo=tableInfoMap.get(leftTableName);//表信息

TableInfo rightTable=tableInfoMap.get(rightTableName);


String leftAlias=paserAliasTableName(sqlJoin.getLeft()); //别名

String rightAlias=paserAliasTableName(sqlJoin.getRight());


Table leftTable=tblEnv.sqlQuery("select * from " + leftTableName);

DataStream leftStream = tblEnv.toAppendStream(leftTable,Row.class); //转换后的流

接下来将流表与维表进行关联查询,根据维表根据设置的不同属性:同步/异步查询、cache/nocache方式、查询不同的外部存储等,需要实现不同的查询方式。以异步查询mysql为例分析:需要根据维表定义的字段、join的关联条件解析生成一条sql语句,根据流入数据解析出sql的查询条件值,然后查询得到对应的维表值,将流入数据与查询得到的维表数据拼接起来输出到下游:

public class MySqlAsyncFunction extends RichAsyncFunction<Row,Row> {


private Connection connection;

private String sqlTemplate;


private String url;

private String username;

private String password;

private String tableName;


private int idx; //条件值在流入数据的位置

private int inLength; //流入字段数

private int outLength; //输出字段数

private int sideLength; //维表查询字段数


public MySqlAsyncFunction(SqlJoin sqlJoin,TableInfo sideTableInfo,TableInfo leftTableInfo){


Properties props=sideTableInfo.getProps();

this.url=props.getProperty("url");

this.username=props.getProperty("username");

this.password=props.getProperty("password");

this.tableName=props.getProperty("tableName");


String rightField=parseCondition(sqlJoin,false);

genSqlTemplate(sideTableInfo,rightField);



String leftField=parseCondition(sqlJoin,true);

for (int i = leftTableInfo.getFieldNames().length - 1; i >= 0; i--) {

if(leftField.equals(leftTableInfo.getFieldNames()[i])){

this.idx=i;

break;

}

}

inLength=leftTableInfo.getFieldNames().length;

sideLength=sideTableInfo.getFieldNames().length;

outLength=inLength+sideLength;

}


@Override public void open(Configuration parameters) throws Exception {

super.open(parameters);

this.connection= DriverManager.getConnection(url,username,password);

}

//这里还是一个同步查询,没有使用异步方式,需要使用一部mysql客户端

@Override public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {


String v=(String)input.getField(idx); //获取条件值

PreparedStatement preparedStatement=connection.prepareStatement(sqlTemplate);

preparedStatement.setString(1,v);

ResultSet rs=preparedStatement.executeQuery();

boolean isJoin=false;

while (rs.next()){

isJoin=true;

Row row=new Row(outLength);

for(int i=0;i<input.getArity();i++){

row.setField(i,input.getField(i));

}

//直接将维表数据补齐在流数据后面

for(int i=0;i<sideLength;i++){

row.setField(inLength+i,rs.getObject(i+1));

}

resultFuture.complete(Collections.singletonList(row));

}

if(!isJoin) resultFuture.complete(null);

}

//解析on 条件的左右表字段名称, 这里只解析了一个关联条件

private String parseCondition(SqlJoin sqlJoin,boolean isLeft){

SqlNode condition=sqlJoin.getCondition();

SqlBasicCall sqlBasicCall=(SqlBasicCall)condition;

String name=SqlExec.paserAliasTableName(isLeft?sqlJoin.getLeft():sqlJoin.getRight());

SqlIdentifier sqlIdentifier1=(SqlIdentifier)sqlBasicCall.operands[0];

if(name.equals(sqlIdentifier1.names.get(0))){

return sqlIdentifier1.names.get(1);

}


SqlIdentifier sqlIdentifier2=(SqlIdentifier)sqlBasicCall.operands[1];

if(name.equals(sqlIdentifier2.names.get(0))){

return sqlIdentifier2.names.get(1);

}

return null;

}


//查询sql

private void genSqlTemplate(TableInfo tableInfo,String condition){

StringBuilder sql=new StringBuilder();

StringBuilder selects=new StringBuilder();

sql.append("select ");

for(String field : tableInfo.getFieldNames()){

selects.append(field);

selects.append(",");

}

sql.append(selects.substring(0,selects.lastIndexOf(",")));

sql.append(" from ").append(this.tableName);

if(condition!=null)

sql.append(" where ").append(condition).append("=?");

this.sqlTemplate=sql.toString();

}

}

到目前为止获取了join之后的结果数据,但是有一点需要考虑,流表定义字段名称与维表定义的字段名称可能会相同,那么在将流转换为表时就存在相同的字段,因此需要对相同的字段重命名:

TableInfo leftTableInfo=tableInfoMap.get(leftTableName);

TableInfo rightTable=tableInfoMap.get(rightTableName);


List<String> newFields=new ArrayList<>(); //join之后流的字段名称

List<TypeInformation> newTypes=new ArrayList<>(); //join之后流的数据类型


//需要做字段解析 原始表名-原始字段名称-新字段名称

HashBasedTable hashBasedTable=HashBasedTable.create();

int i=0;

for(String field:leftTable.getSchema().getFieldNames()){

hashBasedTable.put(leftAlias,field,field);

newFields.add(field);

newTypes.add(leftTable.getSchema().getFieldType(i).get());

i++;

}

i=0;

for(String field:rightTable.getFieldNames()){

String newField=field;

if(hashBasedTable.containsColumn(field)){

newField=field+"0";

}

hashBasedTable.put(rightAlias,field,newField);

newFields.add(newField);

newTypes.add(rightTable.getFieldTypes()[i]);

i++;

}


String newTableNameAlias=leftAlias+"_"+rightAlias;

String newTableName=leftTableName+"_"+rightTableName;

hashBasedTableMap.put(newTableNameAlias,hashBasedTable);

//outType 表示关联之后的流数据类型

RowTypeInfo outType=new RowTypeInfo(newTypes.toArray(new TypeInformation[]{}),newFields.toArray(new String[]{}));

DataStream dsOut=AsyncDataStream.unorderedWait(leftStream,new MySqlAsyncFunction(sqlJoin,rightTable,leftTableInfo),10,

TimeUnit.SECONDS);

dsOut.getTransformation().setOutputType(outType);

tblEnv.registerDataStream(newTableName,dsOut); //将join之后的流注册成为表

同样也需要对insert部分的select字段根据上面得到新的字段名称hashBasedTable进行替换:

SqlInsert sqlInsert=(SqlInsert)sqlNode1;

SqlSelect source=(SqlSelect)sqlInsert.getSource();


SqlBasicCall sqlBasicCall=(SqlBasicCall) source.getFrom();

String newAlias=sqlBasicCall.operands[1].toString(); //新表的别名

HashBasedTable hashBasedTable=hashBasedTableMap.get(newAlias);


SqlNodeList sqlNodeList=source.getSelectList();

int i=0;

for(SqlNode x: sqlNodeList.getList()){

SqlIdentifier sqlIdentifier=(SqlIdentifier)x;

String tableAlias=sqlIdentifier.names.get(0);

String field=sqlIdentifier.names.get(1); String newFieldName=hashBasedTable.get(tableAlias,field).toString();

sqlIdentifier=sqlIdentifier.setName(0,newAlias);//替换为新的表别名称

sqlIdentifier=sqlIdentifier.setName(1,newFieldName);//替换为新的字段名称

sqlNodeList.set(i,sqlIdentifier);

i++;

}


tblEnv.sqlUpdate(sqlInsert.toString()); //执行insert 语句

总结

以上提供了流表join维表的sql实现思路以及部分demo代码的参考,但是其远远达不到工程上的要求,在实际使用中需要要考虑更多的因素:复杂嵌套的sql、时间语义支持、自定义函数支持等。推荐一个开源项flinkStreamSql, 地址为: https://github.com/DTStack/flinkStreamSQL , 丰富的语义支持、不同类型的源插件支持等。

—END—

36VN3az.jpg!web

关注回复 Flink

获取更多系列

原创不易,好看,就点个"在看"


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK