37

Presto 系列:Presto 基本介绍

 3 years ago
source link: https://mp.weixin.qq.com/s/owUt0pds0dN52_clm4foCA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Y7Rr2uA.jpg!mobileCLANNAD

前言

Presto是一款Facebook开源的MPP架构的OLAP查询引擎,可针对不同数据源执行大容量数据集的一款分布式SQL执行引擎。因为工作中接触到Presto,研究它对理解SQL Parser、常见算子的实现(如SQL中table scan,join,aggregation)、资源管理与调度、查询优化(如向量化执行、动态代码生成)、大数据下各个组件为何适用不同场景等等都有帮助。我希望通过这个系列可以了解一条SQL在大数据场景下该如何高效执行。233酱准备不定时持续更新这个系列,本文主要从Presto的使用举例,Presto的应用场景、Presto的基本概念三个部分来初步介绍Presto。

Presto的使用举例

比如说,你想对存储在不同数据源中的数据,如HDFS、Mysql、HBase等通过一个SQL做查询分析,那么只需要把每一个数据源当成是Presto的Connector,对应实现Presto SPI暴露出的Connector API就可以了。

hbase 和 es 的Join查询举例

Presto官方版 Presto社区版 已经支持了很多Connector,社区版略胜一筹。至于两者有何区别,吃瓜群众可以前往文末参考资料[4]。简而言之,都主要由Facebook那帮大佬核心维护。社区版更新更为频繁,但高版本需要JDK11才能支持;官方版JDK8就行,官方版的Star数是社区版的10倍左右,选哪个就一目了然了吧。

Presto的应用场景

Presto是为了处理TB/PB级别的数据查询和分析,它是OLAP(Online Analytical Processing)领域的一个计算引擎。参考资料[1]提到了Presto在Facebook中的使用场景有:

报表和大盘查询

做过报表和大盘的小伙伴应该对这个场景下复杂的SQL有所了解。这个场景下的使用用户是Facebook内部或外部人员,通常要求: 高QPS,低时延(<1s)

Adhoc分析

Ad hoc是拉丁文「for this purpose」的意思,Adhoc query的查询特点是 海量、实时、灵活 。数据量如PB级别以上,时延秒-分钟级别,灵活性举例子如下:

var adhoclQuery = "SELECT * FROM table WHERE id = " + myId;

var sqlQuery = "SELECT * FROM table WHERE id = 1";

adhoclQuery的结果取决于参数“myId”的值,它的结果不能被预计算。sqlQuery的结果每次执行可认为都一样,它的结果可以被预计算。

典型应用场景如:用户趋势分析,产品市场洞察等。主要用户是内部数据分析人员。

批处理

批处理通常是指更大数据量的一个分析,可容忍高时延(小时-天级别)。Presto是为了低时延而设计的,它属于内存型的MPP架构。并不适合类似Spark那样的长时间离线跑批。参考资料[1]的视频中分析了两者架构的区别,Presto跑批的限制。这里我截几张PPT帮助大家理解:
两者的架构区别:

veY3iy.png!mobile

Presto跑批的限制原因:

eYBzeiR.png!mobile

Presto跑批的条件:

qEV7zej.png!mobile

所以他们提供了Presto on Spark方案,这样做的好处是可以统一用户使用的SQL方言差异,UDF差异。

nQJZ3y3.png!mobile

当然,业界除了Facebook还有公司把Presto跑在Spark上来跑批吗?我没有搜到相关信息。

Presto的基本概念

前面主要谈了Presto的使用场景,下面简要从 Presto的架构和基本术语上介绍Presto。

Presto架构

Presto的架构图如下:

uimA3ua.png!mobile

Presto集群包含1个Coordinator节点和1-多个Worker节点。

Coordinator节点 负责接受客户端请求、解析SQL语句、生成并优化分布式逻辑执行计划、将计划中的任务调度到Worker节点上,并跟踪Worker节点和任务的执行状态。

Worker节点 负责任务的执行,接受Coordinator节点的调度。

从中我们可以粗略看出一条SQL在Presto中的执行过程为:

1).Client发送一个SQL语句到Coordinator节点
2).Coordinator节点把请求放到队列中,解析和分析其中的SQL语句;生成并且优化分布式逻辑执行计划。
3).Coordinator节点会把这个Plan分解为任务,由多个Worker分布式执行。

要想了解具体的SQL执行过程,我们得先介绍下Presto的基本概念,也为下篇介绍「Presto为什么是OLAP领域的实时计算引擎」的文章作准备>_<

基本术语

我们很容易知道 statements queries 的意思。作为一个使用者我们也应该熟悉 stages、 splits 这些概念使Presto尽可能高效执行queries;作为一个Presto管理员,应该理解 stages 是如何映射为 tasks 的,包含 drivers 集合 的 task 是如何处理数据的。以下将从一般到具体介绍Presto的基本术语。

Server Types

Presto包含两种类型的服务端节点:coordinators 和 workers。

Coordinator

Presto中的Coordinator节点负责解析SQL语句,生成并优化物理执行计划,管理Presto worker节点。它是Presto运行的“大脑”。它也是客户端提交SQL语句的节点。每个运行的Presto集群包含1个Coordinator节点和1-多个Worker节点。一个服务示例可同时担任这两种节点角色。

Coordinator节点一直跟踪每个Worker节点的状态和协调查询计划的执行。Coordinator生成一个物理执行计划模型,它包含一系列的stages,而stages会转化为一系列的任务跑在workers节点上。

Coordinator通过REST API和workers 、 clients通信。

Worker

Worker节点负责执行tasks,处理数据。它从connectors中捞取数据,并且Worker节点之间可交换中间数据。coordinator节点负责合并workers的结果,并且返回结果给Client。

当一个Worker节点开始工作后,它会把自己注册到coordinator的注册服务上,从而使Coordinator节点可将task调度到自己执行。

Workers和其他Workers、coordinators之间都是通过REST API通信。

Data Sources

诸如connector, catalog, schema, and table这些术语,都是和Presto的模型中:一种特定的数据源有关。

Connector

connector是Presto中的一个数据源,可以是Hive、Mysql、Elasticsearch、HBase等。你可以把connector认为是一种数据库驱动,只要实现Presto   SPI 中暴露的相关接口,就可以接入一种Connector。

Presto自带一些connectors:如 JMX System connector用来获取system tables的, Hive connector, TPCH connector 用来性能测试用的,等等。

每一个catalog和一个特定的connector关联。每一个catalog配置文件中有一个 connector.name 属性,它是被catalog manager用来为一个给定的catalog创建一个connector。一个catalog可以使用相同的connector获取类似数据库的两个实例。

Catalog

Presto catalog包含schemas和通过Connector持有的数据源引用。比如:你可以配置一个ES catalog,就可以通过ES Connector提供从ES中获取数据。

#elasticsearch.properties

connector.name=elasticsearch
elasticsearch.host=es host
elasticsearch.port=9200
elasticsearch.default-schema-name=es

当你在Presto上执行SQL时,你就在运行1-多个catalogs.在Presto上定位一张表,是通过一个catalog的全限定名确定的,如 hive.test_data.test 代表在 hive catalog, test_data schema 下的一张 test table.

Catalogs属性文件是存储在Presto配置目录的,默认是Presto主安装文件下的etc目录下。

Schema

Schemas是一种组织tables的方式。一个catalog和一个catalog定义了一个可被查询的table集合。对于MySQL这种关系型数据库,Presto的schema是和MySQL中的schema相同的概念。对于其他类型的connector,如ES, Presto的schema是用来组织一些表到特定的schema中,从而使底层的数据源能够在Presto层面说得通。

Table

table是一组无序的Row集合,Row是一组有类型的column集合。和关系型数据库中的概念一样,table的映射是由connector中定义的。

Query Execution Model

Presto执行SQL语句,并且转化为执行计划,在由coordinator 和 workers组成的分布式集群上运行。

Statement

Presto执行兼容ANSI标准的SQL。这些SQL statements包含子句,表达式,条件。
Presto把Statement 和 Query区分开是因为:在Presto中,statements是指Client提交上来的SQL语句,如:

SELECT * FROM table WHERE id = 1

query是指Presto执行statement时,生成的一个物理执行计划,并且之后分布式的在一系列workers上执行它。

Query

当Presto解析一个statement时,它会把statement转化为一个query,并且创建一个分布式的执行计划,然后转化为一系列的有关联性的stages运行在Presto workers上。当你在Presto获取一个query的信息时,得到的是每个参与执行的组件的一个当前结果快照。

statement可认为是Client提交上来的SQL语句,query指的是执行一个statement有关的配置和组件实例信息。query围绕着stages, tasks, splits, connectors,其他组件和数据源一起工作,以产生最终结果。

Stage

当Presto执行一个query时,它会把执行分为一个有层次结构关系的stages.比如SQL语句:

7zA3aqe.png!mobile

会先转化为逻辑执行计划:

yyYnM3B.png!mobile

然后会转化为实现这个分布式逻辑执行计划的一个层次结构的stage:

mMJvQf6.png!mobile

这个层次结构的stages可以理解为一个一个树。每个query都有一个root stage负责其他stages的输出结果聚合。stages是coordinator节点用来生成一个分布式查询计划的模型,但是stages它们自己并不跑在Presto workers节点上。

Task

一个stage是由一系列的tasks分布式运行在Presto workers上。

在Presto架构中,task是“work horse”。因为分布式查询计划被分解为一系列stage,然后被转换为task,这些task随后执行或被进一步split。一个task有输入和输出,就像一个stage可以有一系列的tasks并行执行一样,一个task可以由一系列的drivers并行执行。

Split

Split是较大数据集的一个分片。分布式查询计划的最低级别的stage(如上图中的Stage3/Stage4)通过来自connectors得到的splits集合获取输入数据,更高级别的中间Stage(如上图中的Stage2/Stage1)从下一层stage中获取输入数据。

当Presto调度一个query时,coordinator节点会查询连接器的SPI接口获得一个表可用的所有split集合。coordinator跟踪哪些机器正在运行哪些task,以及哪些任务正在处理哪些split。

Driver

Task包含一个或多个并行的driver。driver对数据进行操作,并结合operators产生输出,然后结果由一个task聚合,然后传递到另一个stage的另一个task。driver是一系列operators实例,或者您可以将driver看作内存中的operator的物理集合。它是Presto体系结构中并行的最低级别。一个driver有一个输入和一个输出。

Operator

一个operator消费、转换和生产数据。例如,一个table scan operator从一个connector中获取数据并生产出可由其他operator消费的数据,一个filter operator通过对输入数据应用谓词(过滤条件)并生成一个子集。

Exchange

Exchanges为一个query的不同stage在Presto节点之间传递数据。task使用一个exchange client,生产数据到一个output buffer中,并且消费其他task产生的数据。

参考资料

[1].https://databricks.com/session_na20/presto-on-apache-spark-a-tale-of-two-computation-engines

[2].https://research.fb.com/wp-content/uploads/2019/03/Presto-SQL-on-Everything.pdf

[3].https://prestodb.io/docs/current/overview/concepts.html

[4].https://zhuanlan.zhihu.com/p/55628236


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK