
Increasing the level of parallelism in DataFusion 4.0

 3 years ago
source link: https://medium.com/@danilheres/increasing-the-level-of-parallelism-in-datafusion-4-0-d2a15b5a2093


DataFusion is a next-generation data processing engine written in the Rust programming language. Its design around the in-memory Arrow data format makes DataFusion a scalable and flexible engine that excels both at low-latency, in-memory analytics for interactive data science and BI workloads, and at queries and transformations on data lakes.

With the recently donated Ballista, one of the project's goals is to scale this engine to terabyte and even petabyte workloads.

In DataFusion 4.0 we are bringing some improvements to the level of parallelism you can expect from your queries, with two main features: automatic repartitioning and a partitioned hash join implementation.

Automatic Repartitioning

DataFusion supports parallelism to benefit from the number of cores in a CPU. To achieve this, the engine splits a query into parts that can each run on a different thread, accelerating query execution.

Currently, the main source of parallelism in DataFusion is at the partition level. DataFusion uses the Tokio runtime to schedule tasks and run them on different threads. It also supports out-of-core processing to minimize the amount of memory needed, by streaming and processing the data in small batches (8192 rows by default).
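The micro-batch model can be illustrated with a small sketch (this is a simplification for illustration, not DataFusion's actual code): rows are processed in fixed-size chunks rather than all at once.

```rust
// Simplified sketch of the micro-batch model: a partition's rows are
// processed in fixed-size chunks (8192 rows by default in DataFusion)
// instead of being materialized in one large allocation.
const BATCH_SIZE: usize = 8192;

fn into_batches(rows: Vec<u64>) -> Vec<Vec<u64>> {
    rows.chunks(BATCH_SIZE).map(|c| c.to_vec()).collect()
}

fn main() {
    let rows: Vec<u64> = (0..20_000).collect();
    let batches = into_batches(rows);
    // 20_000 rows split into batches of 8192, 8192 and 3616 rows.
    println!("{:?}", batches.iter().map(|b| b.len()).collect::<Vec<_>>());
}
```

Each operator then works on one batch at a time, which keeps memory usage bounded regardless of the total input size.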

To benefit from this parallelism, you can split your data into multiple partitions by saving it as multiple Parquet files, or by loading it into memory as multiple partitions using MemTable::load.

If you have only a single file, or if the number of partitions in a query is reduced by e.g. a hash aggregate or an ORDER BY, it can also help to repartition intermediate results. This splits the data into multiple partitions again, so the query can benefit from running on more cores.

As this is very cumbersome to do manually, DataFusion 4.0 automatically repartitions the data based on the available level of concurrency by introducing a Repartition optimization rule. Whenever the plan has a lower degree of concurrency than configured (or detected from the number of CPU cores), it will insert a Repartition node. In the TPC-H benchmarks, we observe speedups of up to 6x for data that is loaded as a single partition (as measured on a modern 8-core laptop).
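Conceptually, a round-robin repartition deals the batches of a narrow plan out across the configured number of output partitions. The sketch below is a hand-rolled illustration of that idea, not the actual Repartition operator:

```rust
// Sketch of round-robin repartitioning: batches from one input partition
// are dealt out across `n` output partitions so that downstream operators
// can run on more cores. Illustration only, not DataFusion's operator.
fn round_robin<T>(batches: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, batch) in batches.into_iter().enumerate() {
        out[i % n].push(batch);
    }
    out
}

fn main() {
    // 10 batches in a single partition become ~evenly spread over 4 partitions.
    let parts = round_robin((0..10).collect::<Vec<u32>>(), 4);
    println!("{:?}", parts.iter().map(|p| p.len()).collect::<Vec<_>>());
    // -> [3, 3, 2, 2]
}
```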

As DataFusion uses micro-batches to stream and group data together (vectorized execution), repartitioning in DataFusion is extremely cheap. On a single node, the only thing it does is change some references to the batches; it does not copy the data itself!
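The "only references move" point can be illustrated with Rust's `Arc` (a sketch: Arrow record batches hold their buffers behind reference-counted handles, so handing a batch to another partition moves a pointer, not the data):

```rust
use std::sync::Arc;

// Sketch: repartitioning moves cheap reference-counted handles, not data.
// Cloning an Arc bumps a counter; the underlying buffer is never copied.
fn main() {
    let batch: Arc<Vec<u64>> = Arc::new((0..8192).collect());
    let moved_to_other_partition = Arc::clone(&batch);
    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&batch, &moved_to_other_partition));
    println!("refcount = {}", Arc::strong_count(&batch)); // 2
}
```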

Partitioned Hash Join a.k.a. Grace Join

DataFusion has a very fast hash join implementation. One of the performance improvements in version 4.0 is the introduction of vectorized hashing, speeding up some queries by more than 40%.
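The idea behind vectorized hashing is to hash an entire column of values in one tight loop, instead of hashing row by row with per-row dispatch. The sketch below uses a toy FNV-1a hash to show the columnar shape; DataFusion's actual implementation hashes Arrow arrays and differs in detail:

```rust
// Toy illustration of vectorized (columnar) hashing: one pass over a whole
// column produces all hash values at once. The FNV-1a constants are real;
// the rest is a sketch, not DataFusion's implementation.
fn hash_column(values: &[u64]) -> Vec<u64> {
    values
        .iter()
        .map(|&v| {
            let mut h: u64 = 0xcbf29ce484222325; // FNV offset basis
            for byte in v.to_le_bytes() {
                h ^= byte as u64;
                h = h.wrapping_mul(0x100000001b3); // FNV prime
            }
            h
        })
        .collect()
}

fn main() {
    let hashes = hash_column(&[1, 2, 3]);
    assert_eq!(hashes.len(), 3);
    // Hashing is deterministic: equal inputs hash to equal values.
    assert_eq!(hash_column(&[42])[0], hash_column(&[42])[0]);
}
```

Processing a whole column at a time keeps the loop branch-free and cache-friendly, which is where the speedup comes from.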

Before version 4.0, the default join algorithm was a classic hash join implementation similar to the broadcast join in Spark: it first collects the entire left side of the join (that is, all files/partitions) into memory and creates an in-memory index on the join columns using a hash table. It then processes the right-side partitions against this table.

To scale the implementation, DataFusion 4.0 introduces the use of a partitioned hash join. The algorithm will partition both sides of a join based on the hashed key values, and divides them into n partitions using the modulo function. The number of partitions is equal to the configured concurrency, which defaults to the number of CPU cores.

This has two important benefits:

  • It enables more concurrency, as the hash map for the left side can be built in multiple threads, one per created partition. This can lead to improvements of more than 40%, depending on the query.
  • In Ballista, it allows processing bigger data sets, as the data can be split over multiple machines.
Illustration of the GRACE hash join (image available at the source link)
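The partitioning step described above can be sketched as follows (a hedged illustration: for simplicity the key value stands in for its hash, while a real implementation hashes the join columns first). Because both sides are routed by the same `hash(key) % n` rule, rows with equal keys always land in the same partition and each partition pair can be joined independently:

```rust
// Sketch of hash partitioning for a partitioned (GRACE-style) hash join:
// rows are assigned to partitions by hash(key) % n. Matching keys on the
// left and right side always map to the same partition, so each partition
// pair can be joined on its own thread (or machine, in Ballista).
fn partition_of(key: u64, n: usize) -> usize {
    // Toy "hash": the key itself. Real code hashes the join columns first.
    (key % n as u64) as usize
}

fn partition_rows(keys: &[u64], n: usize) -> Vec<Vec<u64>> {
    let mut parts: Vec<Vec<u64>> = (0..n).map(|_| Vec::new()).collect();
    for &k in keys {
        parts[partition_of(k, n)].push(k);
    }
    parts
}

fn main() {
    let n = 8; // e.g. configured concurrency = number of cores
    let left = [10u64, 11, 12, 13];
    let right = [12u64, 13, 14];
    let left_parts = partition_rows(&left, n);
    let right_parts = partition_rows(&right, n);
    // Key 12 appears on both sides and lands in the same partition of each,
    // so that partition pair can be joined without looking at the others.
    let p = partition_of(12, n);
    assert!(left_parts[p].contains(&12));
    assert!(right_parts[p].contains(&12));
}
```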

This algorithm will not always be faster than collecting the left side into one partition: if the left side of the join is very small (and the right side big), it can be faster not to repartition the left and right sides at all. In DataFusion 4.0 the partitioned hash join is enabled by default, as it leads to increased performance in many situations, but it can be disabled at a global level. In future versions of DataFusion, an optimization could be added that chooses between the two options based on statistics.

Other notable DataFusion features in the new release

There were lots of other notable features contributed as well, too many to list here.

Some highlights include:

  • Predicate pushdown using Parquet statistics
  • Addition of catalog functionality
  • UNION ALL and CTE support

And much more — check out the blog post here

Conclusion

With these additional features, DataFusion runs faster and makes better use of the available hardware. They also lay the groundwork for distributed data processing with Ballista. With features like those mentioned in this article, DataFusion is becoming a really powerful engine capable of speeding up many analytical workloads.

Lots of improvements to DataFusion are in the works and on the roadmap, some of which are currently being worked on by the DataFusion community:

  • Faster Parquet reader; see for example the experimental parquet2 repository: https://github.com/jorgecarleitao/parquet2
  • Improvements to hash aggregates, such as using vectorized hashing and using hash partitioning for aggregates with high cardinality
  • More optimization rules, such as those that benefit from the knowledge of the partitioning of the data at different stages in a query
  • Improvements to core Arrow operations, such as using more SIMD instructions that the compiler can produce through auto-vectorization
