

A First Look at the Flink Table API & SQL, Using the Blink Planner
source link: http://www.cnblogs.com/ipoo/p/13168165.html

Overview
- Flink provides the Table API and SQL for unified stream and batch processing.
- The Table API is a language-integrated query API for Scala and Java. It allows queries to be composed from relational operators (such as select, filter, and join) in a very intuitive way.
- Flink's SQL support is based on Apache Calcite, which implements the SQL standard. A query specified in either interface has the same semantics and produces the same result whether the input is a batch input (DataSet) or a streaming input (DataStream).

The Table API and SQL are not yet feature-complete and are under active development; check the official documentation for the current level of support.
Usage
A multi-table join example
pom dependencies
Flink version: 1.9.3
```xml
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
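The dependencies above reference the `${flink.version}` and `${scala.binary.version}` Maven properties, which are not shown. A plausible `<properties>` block, assuming values that match the stated Flink 1.9.3 and the `_2.11` artifact suffixes, would be:

```xml
<properties>
    <!-- Assumed values inferred from the text above; adjust to your build -->
    <flink.version>1.9.3</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
```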
Simulating a real-time stream
```java
import lombok.Data;

// POJO carried on the stream: an id plus a season label
@Data
public class Product {
    public Integer id;
    public String seasonType;
}
```
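Lombok's `@Data` annotation generates the getters, setters, `equals`/`hashCode`, and `toString` at compile time. If Lombok is not on the classpath, a plain-Java equivalent of the accessors this example actually uses would look like the sketch below (named `ProductPlain` here only to avoid clashing with the Lombok version):

```java
// Plain-Java equivalent of the Lombok-generated accessors used in this example.
public class ProductPlain {
    public Integer id;
    public String seasonType;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getSeasonType() { return seasonType; }
    public void setSeasonType(String seasonType) { this.seasonType = seasonType; }
}
```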
Custom source
```java
import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction<Product> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Product> ctx) throws Exception {
        while (isRunning) {
            // Emit one record per second
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    // Build a Product with a random id in [0, 100) and a random season
    private Product generateProduct() {
        int i = new Random().nextInt(100);
        ArrayList<String> list = new ArrayList<>();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }

    @Override
    public void cancel() {
        // Flag is checked by run(); without this the source never stops
        isRunning = false;
    }
}
```
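The random picks inside `generateProduct()` are plain JDK logic and can be sanity-checked in isolation, without a Flink runtime. A small hypothetical sketch mirroring those two picks:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SeasonPickDemo {
    static final List<String> SEASONS = Arrays.asList("spring", "summer", "autumn", "winter");

    // Mirrors generateProduct(): id drawn from [0, 100)
    public static int randomId(Random rnd) {
        return rnd.nextInt(100);
    }

    // Mirrors generateProduct(): one of the four season labels
    public static String randomSeason(Random rnd) {
        return SEASONS.get(rnd.nextInt(4));
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        System.out.println(randomId(rnd) + " " + randomSeason(rnd));
    }
}
```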
Main program
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TableStremingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the Blink planner in streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator<Product> source = bsEnv.addSource(new ProductStremingSource())
                .map(new MapFunction<Product, Product>() {
                    @Override
                    public Product map(Product value) throws Exception {
                        return value;
                    }
                });

        // Split the stream by id parity using side outputs
        final OutputTag<Product> even = new OutputTag<Product>("even") {};
        final OutputTag<Product> odd = new OutputTag<Product>("odd") {};

        SingleOutputStreamOperator<Product> sideOutputData = source.process(new ProcessFunction<Product, Product>() {
            @Override
            public void processElement(Product value, Context ctx, Collector<Product> out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even, value);
                } else {
                    ctx.output(odd, value);
                }
            }
        });

        DataStream<Product> evenStream = sideOutputData.getSideOutput(even);
        DataStream<Product> oddStream = sideOutputData.getSideOutput(odd);

        // Register the two side outputs as tables: evenTable and oddTable
        bsTableEnv.registerDataStream("evenTable", evenStream, "seasonType,id");
        bsTableEnv.registerDataStream("oddTable", oddStream, "seasonType,id");

        // Run SQL and get the result as a Table
        Table queryTable = bsTableEnv.sqlQuery(
                "select a.id, a.seasonType, b.id, b.seasonType "
                + "from evenTable as a join oddTable as b on a.seasonType = b.seasonType");
        queryTable.printSchema();

        // Convert the Table back to a (retract) stream and print it
        DataStream<Tuple2<Boolean, Tuple4<Integer, String, Integer, String>>> dataStream =
                bsTableEnv.toRetractStream(queryTable,
                        TypeInformation.of(new TypeHint<Tuple4<Integer, String, Integer, String>>() {}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}
```
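The side-output step boils down to partitioning elements by `id % 2`. That predicate can be sketched and checked with plain JDK collections, no cluster required (hypothetical standalone demo):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParitySplitDemo {
    // Same predicate the ProcessFunction applies to each element:
    // returns [evenIds, oddIds]
    public static List<List<Integer>> split(List<Integer> ids) {
        List<Integer> even = new ArrayList<>();
        List<Integer> odd = new ArrayList<>();
        for (int id : ids) {
            if (id % 2 == 0) {
                even.add(id);
            } else {
                odd.add(id);
            }
        }
        return Arrays.asList(even, odd);
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = split(Arrays.asList(4, 7, 10, 13));
        System.out.println(parts.get(0)); // [4, 10]
        System.out.println(parts.get(1)); // [7, 13]
    }
}
```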
Printed result
Rows whose seasonType values match are joined and output.
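`toRetractStream` wraps every result row in a `Tuple2<Boolean, row>`: `true` means the row is added to the result, `false` means a previously emitted row is withdrawn. For a plain inner join on two append-only streams the flag is typically always `true`; retractions appear once updating results are involved. A plain-Java sketch (hypothetical data, no Flink required) of how a consumer would materialize such a stream:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class RetractDemo {
    // Apply (flag, row) messages in order: true = add row, false = retract it
    public static List<String> materialize(List<Map.Entry<Boolean, String>> messages) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) {
                result.add(m.getValue());
            } else {
                result.remove(m.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same shape as the Tuple2<Boolean, Tuple4<...>> records printed above
        List<Map.Entry<Boolean, String>> messages = Arrays.asList(
                new SimpleEntry<>(true,  "(2,spring,7,spring)"),
                new SimpleEntry<>(true,  "(4,summer,9,summer)"),
                new SimpleEntry<>(false, "(2,spring,7,spring)")  // earlier row withdrawn
        );
        System.out.println(materialize(messages)); // [(4,summer,9,summer)]
    }
}
```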
Summary
This was a brief introduction to the Flink Table API & SQL, with a working example that joins two tables.
More articles: www.ipooli.com
