34

Data Source V2 聚合下推

 5 years ago
source link: http://www.ibm.com/developerworks/cn/analytics/library/apache-spark-data-source-v2-aggregate-push-down/index.html?ca=drs-&%3Butm_source=tuicool&%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

背景和动机

当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了 Spark 查询的执行速度,但根据数据源的不同,它们仅支持将部分功能下推到数据源并执行。我们正在开展一个提供通用数据源下推 API 的项目,作为该项目的一部分,本博客将介绍我们有关聚合下推的工作。我们开放了 Spark jira 22390 来解决此问题,设计文档已在 jira 中。

过滤器下推实现

在 SQL 语句中,过滤器通常用于选择满足给定条件的行。在 Spark 中,可以使用以下实现将过滤器下推到数据源层:

  1. 逻辑计划过滤器包含在 Catalyst Expression 中。
  2. 一个 Catalyst Expression 被转换为 数据源过滤器
  3. 如果该 Catalyst Expression 无法转换为数据源 过滤器 ,或者不受数据源支持,那么将在 Spark 层上处理它。
  4. 否则,会将它下推到数据源层。

聚合下推

SQL 中经常使用聚合函数利用一组输入值来计算单个结果。最常用的聚合函数是 AVG、COUNT、MAX、MIN 和 SUM。如果 SQL 语句中的聚合得到与 Spark 具有相同语义的数据源的支持,那么可以将这些聚合下推到数据源级别,以提升性能。性能提升主要表现在两个领域:

  • Spark 与数据源之间的网络 IO 显著减少。
  • 由于索引的存在,数据源中的聚合计算速度变得更快。

聚合通常与过滤器结合使用。例如:

Select sum(i) From T Where i > 3 Group by j Having sum(i) > 10

下图展示了如何使用 Spark Data Source v2 下推上述过滤器并进行聚合。

nAr2Yvr.png!web

nAr2Yvr.png!web

DSV:数据源视图

AGG:聚合

FilterPD: 过滤器下推

AGGPD:聚合下推

下推聚合和无下推聚合的逻辑/物理计划

让我们看看前一个 SQL 语句对于下推和非下推情况的逻辑和物理计划。在 Spark 中,前面的 SQL 语句可以编写为 df.filter('i > 3).groupBy('j).agg(sum($"i")).where(sum('i) > 10) 在聚合下推前,只有过滤器 i> 3 被下推到数据源。但聚合函数 sum (i) 和聚合过滤器 sum(i) > 10 仍在 Spark 层。下面给出了经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
Project [j#1, sum(i)#10L]
+- Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- Aggregate [j#1], [j#1, sum(cast(i#0 as bigint)) AS sum(i)#10L,
     sum(cast(i#0 as bigint)) AS sum(cast(i#0 as bigint))#21L]
      +- DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int,
       j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]


== Physical Plan ==
*Project [j#1, sum(i)#10L]
+- *Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- *HashAggregate(keys=[j#1], functions=[sum(cast(i#0 as bigint))],
     output=[j#1, sum(i)#10L, sum(cast(i#0 as bigint))#21L])
       +- Exchange hashpartitioning(j#1, 5)
      +- *HashAggregate(keys=[j#1], functions=[partial_sum(cast(i#0 as
          bigint))], output=[j#1, sum#24L])
      +- DataSourceV2Scan(source=AdvancedDataSourceV2,
      schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]

聚合下推后,它具有以下经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

== Physical Plan ==
DataSourceV2Scan(source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

使用 TPCDS 1TB 设置的性能结果

在具有大量聚合的工作负载中,此功能的早期原型表现出很大的改进。下面是一些结果:

测试 1(含 group by、完全下推、无 partition)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk

无聚合下推:782.287 秒

聚合下推:250.331 秒

性能提升:约 3 倍的提升

测试 2(无 group by、完全下推、无 partition)

SELECT avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost) FROM store_sales

无聚合下推:2219.104 秒

聚合下推:839.664 秒

性能提升:约 2.6 倍的提升

测试 3(含 group by、完全下推、partition 列与 group by 列相同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_date_sk

无聚合下推:588.918 秒

聚合下推:296.763 秒

性能提升:约 2 倍的提升

测试 4(含 group by、完全下推、partition 列与 group by 列不同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_time_sk

无聚合下推:344.509 秒

聚合下推:225.186 秒

性能提升:约 1.5 倍的提升

本文翻译自 : Data Source V2 aggregate push down (2018-10-23)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK