2

开源分布式查询引擎Presto

 2 years ago
source link: https://www.biaodianfu.com/presto.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Presto是什么?

Presto是Facebook开源的MPP(Massive Parallel Processing)SQL引擎,其理念来源于一个叫Volcano的并行数据库,该数据库提出了一个并行执行SQL的模型,它被设计为用来专门进行高速、实时的数据分析。Presto是一个SQL计算引擎,分离计算层和存储层,其不存储数据,通过Connector SPI实现对各种数据源(Storage)的访问。

Hadoop提供了大数据存储与计算的一整套解决方案,但是它采用的是MapReduce计算框架,只适合离线和批量计算,无法满足快速实时的Ad-Hoc(即席分析)查询计算的性能要求。Hive使用MapReduce作为底层计算框架,是专为批处理设计的。但随着数据越来越多,使用Hive进行一个简单的数据查询可能要花费几分到几小时,显然不能满足交互式查询的需求。与Hive比较:

上图显示了MapReduce与Presto的执行过程的不同点,MR每个操作要么需要写磁盘,要么需要等待前一个stage全部完成才开始执行,而Presto将SQL转换为多个stage,每个stage又由多个tasks执行,每个tasks又将分为多个split。所有的task是并行的方式进行允许,stage之间数据是以pipeline形式流式的执行,数据之间的传输也是通过网络以Memory-to-Memory的形式进行,没有磁盘io操作。这也是Presto性能比Hive快很多倍的决定性原因。

Presto沿用了通用的Master-Slave架构,一个Coordinator,多个Worker。Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行;Worker节点负责实际执行查询任务。Presto提供了一套Connector接口,用于读取元信息和原始数据,Presto 内置有多种数据源,如 Hive、MySQL、Kudu、Kafka 等。同时,Presto 的扩展机制允许自定义 Connector,从而实现对定制数据源的查询。假如配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点通过Hive Connector与HDFS交互,读取原始数据。

Presto的优点:

  • Ad-hoc,期望查询时间秒级或几分钟
  • 比Hive快10倍
  • 支持多数据源,如Hive、Kafka、MySQL、MonogoDB、Redis、JMX等,也可自己实现Connector
  • Client Protocol: HTTP+JSON, support various languages(Python, Ruby, PHP, Node.js Java)
  • 支持JDBC/ODBC连接
  • ANSI SQL,支持窗口函数,join,聚合,复杂查询等

Presto的缺点:

  • No fault tolerance;当一个Query分发到多个Worker去执行时,当有一个Worker因为各种原因查询失败,那么Master会感知到,整个Query也就查询失败了,而Presto并没有重试机制,所以需要用户方实现重试机制。
  • Memory Limitations for aggregations, huge joins;比如多表join需要很大的内存,由于Presto是纯内存计算,所以当内存不够时,Presto并不会将结果dump到磁盘上,所以查询也就失败了,但最新版本的Presto已支持写磁盘操作。
  • MPP(Massively Parallel Processing )架构;这个并不能说其是一个缺点,因为MPP架构就是解决大量数据分析而产生的,但是其缺点也很明显,假如我们访问的是Hive数据源,如果其中一台Worke由于load问题,数据处理很慢,那么整个查询都会受到影响,因为上游需要等待上游结果。

Presto的架构

Presto查询引擎是一个Master-Slave的主从架构,Coordinator是主,worker是从。一个presto集群,由一个Coordinator节点,一个Discovery Server节点(通常内嵌于Coordinator节点中),多个Worker节点组成。其中,Coordinator负责接收查询请求、解析SQL语句、生成执行计划、任务调度给Worker节点执行、worker管理;Worker节点是工作节点,负责实际执行查询任务Task。

Worker节点启动后向Discovery Server服务注册;Coordinator从Discovery Server获得可以正常工作的Worker节点。

查询执行过程

整体查询流程为:

  • Client使用HTTP协议发送一个query请求。
  • 通过Discovery Server发现可用的Server。
  • Coordinator构建查询计划(Connector插件提供Metadata)
  • Coordinator向workers发送任务
  • Worker通过Connector插件读取数据
  • Worker在内存里执行任务(Worker是纯内存型计算引擎)
  • Worker将数据返回给Coordinator,之后再Response Client

SQL执行流程:

当Coordinator收到一个Query,其SQL执行流程如上图所示。SQL通过Anltr3解析为AST(抽象语法树),然后通过Connector获取原始数据的Metadata信息,这里会有一些优化,比如缓存Metadata信息等,根据Metadata信息生成逻辑计划,然后会依次生成分发计划和执行计划,在执行计划里需要去Discovery里获取可用的node列表,然后根据一定的策略,将这些计划分发到指定的Worker机器上,Worker机器再分别执行。

Presto包含三类角色,coordinator,discovery,worker。coordinator负责query的解析和调度。discovery负责集群的心跳和角色管理。worker负责执行计算。

  • presto-cli提交的查询,实际上是一个http POST请求。查询请求发送到coordinator后,经过词法解析和语法解析,生成抽象语法树,描述查询的执行。
  • 执行计划编译器,会根据抽象语法树,层层展开,把语法树所表示的结构,转化成由单个操作所组成的树状的执行结构,称为逻辑执行计划。
  • 原始的逻辑执行计划,直接表示用户所期望的操作,未必是性能最优的,在经过一系列性能优化和转写,以及分布式处理后,形成最终的逻辑执行计划。这时的逻辑执行计划,已经包含了map-reduce操作,以及跨机器传输中间计算结果操作。
  • scheduler从数据的meta上获取数据的分布,构造split,配合逻辑执行计划,把对应的执行计划调度到对应的worker上。
  • 在worker上,逻辑执行计划生成物理执行计划,根据逻辑执行计划,会生成执行的字节码,以及operator列表。operator交由执行驱动来完成计算。

抽象语法树

由语法解析器根据SQL,解析生成的树状结构,描述SQL的执行过程。 在下文中,以SQL select avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10为例来描述。

抽象语法树数以Query为单位来描述查询,分层次表示不同层的子查询。每一层查询查询包含了几个关键因素:select, from,where,group by,having,order by,limit。其中,from可以是一个子查询,也可以是一张表。

一个典型的抽象语法树:

query.png

生成逻辑执行计划

抽象语法树树,描述的最原始的用户需求。抽象语法树描述的信息,执行效率上不是最优,执行操作也过于复杂。需要把抽象语法树转化成执行计划。执行计划分成两类,一类是逻辑执行计划,一类是物理执行计划。逻辑执行计划,以树状结构来描述执行,每个节点是最简单的操作。物理执行计划,根据逻辑执行计划生成字节码,交由驱动执行。

转写成逻辑执行计划的过程,包括转写和优化。把抽象语法树转写成由简单操作组成的结点树,然后把树中所有聚合计算节点转写成map-reduce形式。并且在map-reduce节点中间插入Exchange节点。然后,进行一系列优化,把一些能提前加速计算的节点下推,能合并的节点合并。

最后逻辑执行计划按照Exchange节点做划分,分成不同的段(fragament),表示不同阶段的的执行计划。在调度时,按照fragment调度。

SELECT avg(response_size) as a , client_address
FROM localfile.logs.http_request_log
GROUP BY client_address
ORDER BY a DESC
LIMIT 10
SELECT avg(response_size) as a , client_address 
FROM localfile.logs.http_request_log
GROUP BY client_address
ORDER BY a DESC
LIMIT 10

以上SQL的执行逻辑:

sql-execute.png

从执行计划中可以看到,agg节点都是分成partial和final两步。

调度执行计划到机器上

调度涉及到两个问题,第一,某个fragment分配由哪些机器执行;第二,某个fragment的计算结果如何输出到下游fragment。

在调度时,需要为每一个fragment指定分配到哪些机器上。从调度上划分,fragment分三种类型

  • 一类是source类型由原始数据的存储位置决定fragment调度机器,有多少个source节点呢?connector会根据数据的meta,决定需要读取多少个split(分片) ,对于每一个source节点,分配一个split到一台机器上,如果在配置中指定了network-topology=flat,则尽量选择split所在的机器。
  • 一类是FIXED类型,主要用于纯计算节点,从集群中选择一台或多台机器分配给某个fragment。一般只有最终输出节点分配一个机器,中间的计算结果都要分配多台机器。分配的机器数由配置hash_partition_count决定。选择机器的方式是随机选择。
  • 一类是SINGLE类型,只有一台机器,主要用于汇总结果,随机选择一台机器。

对于计算结果输出,根据下游节点的机器个数,也有多种方式,

  • 如果下游节点有多台机器,例如group by的中间结果,会按照group by的key计算hash,按照hash值选择一个下游机器输出。对于非group by的计算,会随机选择或者round robin。
  • 如果下游节点只有一台机器,会输出到这台机器上。

以下图为例,fragment 2是source类型fragment,有三个split,所以分配了三台机器。因为这一层计算是group by 聚合计算,所以输出时按照group by的key计算hash,选择下游的某台机器输出。

Fragment.png

调度之前的任务,都在coordinator完成,调度完成后,之后任务发送到worker上执行。

生成物理执行计划

逻辑执行计划fragment发送到机器上后,由结点树形式转写成operator list,根据逻辑代码动态编译生成字节码。动态生成字节码,主要是利用编译原理:

  • 根据数据列的类型,直接调用对用的函数,以减少分支跳转语句。

这些手段会更好的利用CPU的流水线。

执行驱动

物理执行计划构造生成的Operator list,交给Driver执行。具体计算哪些数据,由加载的Split决定。

Operator list 以串联形式处理数据,前一个operator的结果作为下一个结果的输入,对于source类型的operator,每一次调用都会获取一份新的数据;对于Aggregate的operator,只有之前所有的operator都finish之后,才能获取输出结果。

stage.png

聚合计算

生成的执行计划中,聚合计算都拆分成了两步,分别是Map、Reduce。

聚合计算的Operator有两类,分别是AggregationOperator和HashAggregationOperator。

AggregationOperator对所有行进行计算,并且把结果更新到一个位置。HashAggregationOperator使用某一列的hash值作为hash表的key,key相同的行才会把结果保存在一起,用于group by类的计算。

聚合计算都是要按照Map-Reduce的形式执行。

聚合计算所提供的函数,都要提供四个接口,分别有两个输入,两个输出:

  • 接受原始数据的输入
  • 接受中间结果的输入
  • 输出中间结果
  • 输出最终结果。

1+3 构成了Map操作 2+4构成了Reduce操作。

以Avg为例:

  • Map阶段输入1,2,3,4
  • Map截断输出10,4 分别代表Sum和Count
  • Reduce输入10,4
  • Reduce输出最终平均值5

我们改造了Presto系统,使得Presto能够提供缓存功能,就是在MapReduce中间加了一层计算,接受中间结果输入和中间结果输出。

函数

函数分为两类,分别是Scaler函数和Aggregate函数

  • Scaler函数提供数据的转换处理,不保存状态,一个输入产生一个输出。
  • Aggregate函数提供数据的聚合处理,利用已有状态+输入,产生新的状态。

Presto采取三层表结构:

  • catalog 对应某一类数据源,例如hive的数据,或mysql的数据
  • schema 对应mysql中的数据库
  • table 对应mysql中的表
table.png

Presto的存储单元包括:

  • Page: 多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
  • Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。

不同类型的block:

  • array类型block,应用于固定宽度的类型,例如int,long,double。block由两部分组成
    • boolean valueIsNull[]表示每一行是否有值。
    • T values[] 每一行的具体值。
  • 可变宽度的block,应用于string类数据,由三部分信息组成
    • Slice : 所有行的数据拼接起来的字符串。
    • int offsets[] :每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
    • boolean valueIsNull[] 表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
  • 固定宽度的string类型的block,所有行的数据拼接成一长串Slice,每一行的长度固定。
  • 字典block:对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:
    • 字典,可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
    • int ids[] 表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。

了解了presto的数据模型,就可以给presto编写插件,来对接自己的存储系统。presto提供了一套connector接口,从自定义存储中读取元数据,以及列存储数据。先看connector的基本概念:

  • ConnectorMetadata: 管理表的元数据,表的元数据,partition等信息。在处理请求时,需要获取元信息,以便确认读取的数据的位置。Presto会传入filter条件,以便减少读取的数据的范围。元信息可以从磁盘上读取,也可以缓存在内存中。
  • ConnectorSplit: 一个IO Task处理的数据的集合,是调度的单元。一个split可以对应一个partition,或多个partition。
  • SplitManager : 根据表的meta,构造split。
  • SlsPageSource : 根据split的信息以及要读取的列信息,从磁盘上读取0个或多个page,供计算引擎计算。

插件能够帮助开发者添加这些功能:

  • 对接自己的存储系统。
  • 添加自定义数据类型。
  • 添加自定义处理函数。
  • 自定义权限控制。
  • 自定义资源控制。
  • 添加query事件处理逻辑。

Presto提供了一个简单的connector : local file connector ,可用于参考如何实现自己的connector。不过local file connector中使用的遍历数据的单元是cursor,即一行数据,而不是一个page。 hive 的connector中实现了三种类型,parquet connector, orc connector, rc file connector。

connecter.png

Presto是一款内存计算型的引擎,所以对于内存管理必须做到精细,才能保证query有序、顺利的执行,部分发生饿死、死锁等情况。

内存池

Presto采用逻辑的内存池,来管理不同类型的内存需求。

Presto把整个内存划分成三个内存池,分别是System Pool ,Reserved Pool, General Pool。

pool.jpg
  • System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  • Reserved Pool和General Pool 是用来分配query运行时内存的。
  • 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  • General则享有除了System Pool和General Pool之外的其他内存空间。

为什么要使用内存池

System Pool用于系统使用的内存,例如机器之间传递数据,在内存中会维护buffer,这部分内存挂载system名下。

那么,为什么需要保留区内存呢?并且保留区内存正好等于query在机器上使用的最大内存?

如果没有Reserved Pool, 那么当query非常多,并且把内存空间几乎快要占完的时候,某一个内存消耗比较大的query开始运行。但是这时候已经没有内存空间可供这个query运行了,这个query一直处于挂起状态,等待可用的内存。 但是其他的小内存query跑完后,又有新的小内存query加进来。由于小内存query占用内存小,很容易找到可用内存。 这种情况下,大内存query就一直挂起直到饿死。

所以为了防止出现这种饿死的情况,必须预留出来一块空间,共大内存query运行。 预留的空间大小等于query允许使用的最大内存。Presto每秒钟,挑出来一个内存占用最大的query,允许它使用reserved pool,避免一直没有可用内存供该query运行。

内存管理

cache.jpg

Presto内存管理,分两部分:

  • query内存管理。query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。如果query的汇总内存超过一定大小,则强制终止该query。
  • 机器内存管理。coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。

当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。

内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。那么问题来了,如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行。原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task,一直得不到运行,导致该query无法结束。

参考链接:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK