
Rust Dataframe Updates

Update 01: 04-04-2020

I started working on an experimental Rust Dataframe in January 2019, but due to work commitments I've been unable to work on it regularly. The project is still in its infancy, but it is not abandoned. I'm writing this update mostly for people who might be interested in helping out, or in following along with the experiment.

I haven't had a chance to post updates on the experiment, but now that we're home-bound due to the lockdown in South Africa, this is a good chance to do so.

Overall Goal

In the past 3 years, most of my work has involved me bringing my own infra, which is often my laptop. I use Apache Spark (PySpark) and Pandas regularly, and I also write Rust a lot. I started thinking about what a non-distributed dataframe library could look like in Rust, mainly to be my daily driver for my kind of work.

My current goals are to create a dataframe library that:

  • can work on my laptop, and be as fast as Spark or Dask
  • I can use in Rust and in Python (and other languages) for exploratory purposes

Design

Memory Format

The dataframe is built on top of Apache Arrow, which is an in-memory columnar data format. There are ample resources online that explain what Arrow is and does, so allow me not to digress.

Arrow is optimised to avoid common performance pitfalls like data (de)serialisation, and data is laid out in a compute-optimised structure. Using Arrow also makes it easier for us to interoperate with other libraries in future (e.g. converting to/from Pandas dataframes, writing UDFs in Python/JavaScript, etc).

An Arrow equivalent of a table is the RecordBatch. A RecordBatch has columns made up of equal-length Arrays, and a schema describing those columns.

#[derive(Clone)]
pub struct RecordBatch {
    // the names and data types of the columns
    schema: Arc<Schema>,
    // equal-length Arrow arrays, one per field in the schema
    columns: Vec<Arc<Array>>,
}

In turn, our Dataframe is a collection of RecordBatches, broken down into Columns which contain chunks of arrays. I have adapted this from the Arrow C++ library, as this structure makes it easier to operate on a single column across all record batches without needing to iterate over the batches of the dataframe.
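
As a rough sketch of that layout (the struct and field names here are illustrative, not the exact ones in the repository):

use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};

// A column is a named, logically contiguous sequence of values, stored
// physically as one Arrow array chunk per record batch.
pub struct Column {
    field: Field,
    chunks: Vec<ArrayRef>,
}

// The dataframe keeps a schema plus one chunked column per field, so a
// kernel can run over a single column's chunks without iterating over
// whole record batches.
pub struct DataFrame {
    schema: Arc<Schema>,
    columns: Vec<Column>,
}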

IO Capabilities

Arrow has decent IO support to get one started, and the Rust implementation supports:

| Data Source | Read          | Write            |
|-------------|---------------|------------------|
| csv         | yes           | yes              |
| json        | yes (limited) | no               |
| parquet     | yes           | no (in progress) |
| Arrow IPC*  | yes           | yes              |

*Arrow IPC (Inter-Process Communication) is a streaming and file format that allows different Arrow implementations to share data with each other.
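
As an example of what this IO support looks like from the Rust side, reading a CSV file into record batches goes roughly like this. This uses the arrow crate's csv reader; exact builder method names have changed between arrow versions, so treat it as a sketch:

use std::fs::File;

use arrow::csv::ReaderBuilder;
use arrow::record_batch::RecordBatch;

fn read_csv(path: &str) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // Infer the schema from the first 100 records, then read 1024-row batches
    let reader = ReaderBuilder::new()
        .has_header(true)
        .infer_schema(Some(100))
        .with_batch_size(1024)
        .build(file)?;
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    Ok(batches)
}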

When more IO formats/connections are supported, we get to benefit from them. I've also spent a bit of time working on SQL support (with PostgreSQL as a start). I recently completed a PostgreSQL binary reader, and I plan on benchmarking it against a row-based approach. If it's performant enough, I'd like to split it out into its own crate/library, and add more SQL variants.

I recently needed to extract data from MongoDB to CSV, so I wrote a stand-alone MongoDB connector. The connector uses the aggregation framework to pull batches of documents, which it then converts to Arrow data. The Arrow data can then be converted to other formats which Arrow supports, such as Parquet or CSV. I haven't added schema inference and write support, but they're on my TODO list. If I end up not using it on the dataframe, other people could still benefit from such a library.
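
The conversion itself is done column by column with Arrow's array builders: append each document's fields to per-column builders, then finish them into a RecordBatch. Here is a simplified sketch, with a made-up Document struct standing in for the BSON documents, and builder signatures as they were in the arrow versions of the time:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Stand-in for a decoded document; the real connector reads BSON
// documents from an aggregation cursor.
struct Document {
    name: String,
    age: i64,
}

fn docs_to_batch(docs: &[Document]) -> Result<RecordBatch, ArrowError> {
    let mut names = StringBuilder::new(docs.len());
    let mut ages = Int64Builder::new(docs.len());

    // Append each document's fields to the per-column builders
    for doc in docs {
        names.append_value(&doc.name)?;
        ages.append_value(doc.age)?;
    }

    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(names.finish()) as ArrayRef,
        Arc::new(ages.finish()) as ArrayRef,
    ];
    RecordBatch::try_new(schema, columns)
}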

Compute

The Rust implementation has some basic compute support, and there's also DataFusion, which is a distributed SQL engine built on top of Arrow. In our dataframe, we will leverage Arrow's compute capabilities as much as possible, which means contributing compute functions (kernels) upstream to Arrow if the greater community would benefit from them.

Arrow currently supports a lot of the primitive building blocks, such as:

  • some aggregations
  • primitive arithmetic (add, mul, div, sub)
  • boolean comparisons (and, or, not)
  • casting between compatible data types
  • comparisons (gt, lt, eq, neq, etc.)
  • filtering, limits (slicing)
  • sorting (I'm still working on this)

These building blocks allow one to perform a lot of compute when stitched together. We still have a long way to go though, as we don't yet support transformations like joins and some complex aggregates (DataFusion has aggregate support; here I mean in Arrow itself).
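
To make the stitching concrete, here is a rough sketch that adds two columns, compares them, and filters the result using the existing kernels. The module paths and signatures are from the arrow compute kernels around that time, so they may have moved since:

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::kernels::arithmetic::add;
use arrow::compute::kernels::comparison::gt;
use arrow::compute::kernels::filter::filter;
use arrow::error::ArrowError;

// Keep the element-wise sum of `a` and `b`, but only for rows where a > b
fn filtered_sum(a: &Int64Array, b: &Int64Array) -> Result<ArrayRef, ArrowError> {
    let sum = add(a, b)?;  // arithmetic kernel
    let mask = gt(a, b)?;  // comparison kernel, yields a BooleanArray
    filter(&sum, &mask)    // filter kernel applies the boolean mask
}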

My goals for compute in the dataframe are to support most of what Apache Spark supports. It might not be all functions, but I want to cover the various categories (scalar, array functions, aggregations and window functions). I haven't thought of how user-defined functions could work, but Andy Grove recently submitted a pull request for UDF support in DataFusion, and there is ongoing discussion of whether this could live in Arrow.

Lazy vs Eager Evaluation

When I started the experiment, I wanted to create a POC to see if it's feasible to create an Arrow-backed dataframe in Rust. This used eager evaluation (expressions were calculated immediately, without any planning or optimisation). After that, I started exploring how to support lazy evaluation.

The repository is currently an in-flux mess because of that, but I see light at the end of the tunnel.

The idea is to buffer operations, and only perform calculations when we need to display or save data (the same way deferred computation libraries work). This is the really fun part, because of the possibilities for optimising computations.

I recently started a draft of this optimisation, but I'll write more about this in a separate update.

There is an abstraction (so far called a 'LazyFrame' lol) which represents an output dataframe, and the expressions being applied on it.

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LazyFrame {
    id: String,
    pub(crate) expression: Expression,
    output: Dataset,
}

Expressions are the types of data operations being performed, and we currently have 3:

pub enum Expression {
    // read data from a source
    Read(Computation),
    // apply computations to the output of another expression
    Compute(Box<Expression>, Computation),
    // join the outputs of two expressions on some criteria
    Join(Box<Expression>, Box<Expression>, JoinCriteria, Dataset),
}

Expression::Compute in turn handles most of the computations, and has the inputs, transformations, and the output dataframe structure:

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Computation {
    // the input dataset(s)
    pub(crate) input: Vec<Dataset>,
    // the transformations applied to the inputs
    pub(crate) transformations: Vec<Transformation>,
    // the structure of the resulting dataframe
    pub(crate) output: Dataset,
}
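
Evaluation then becomes a recursive walk over the expression tree, triggered only when a terminal action (display, save) needs the data. A hypothetical sketch of that walk follows; read_input, apply_transformations and join_frames are stand-in helpers, not the real implementation:

use arrow::record_batch::RecordBatch;

// Stand-in helpers so the sketch is self-contained; the real logic lives
// in the dataframe crate.
fn read_input(computation: &Computation) -> Vec<RecordBatch> {
    unimplemented!()
}
fn apply_transformations(
    batches: Vec<RecordBatch>,
    transformations: &[Transformation],
) -> Vec<RecordBatch> {
    unimplemented!()
}
fn join_frames(
    left: Vec<RecordBatch>,
    right: Vec<RecordBatch>,
    criteria: &JoinCriteria,
) -> Vec<RecordBatch> {
    unimplemented!()
}

// Nothing is computed until a terminal action calls this
fn evaluate(expr: &Expression) -> Vec<RecordBatch> {
    match expr {
        // leaf node: read from a data source
        Expression::Read(computation) => read_input(computation),
        // apply the buffered transformations to the evaluated input
        Expression::Compute(input, computation) => {
            let batches = evaluate(input);
            apply_transformations(batches, &computation.transformations)
        }
        // evaluate both sides, then join them on the criteria
        Expression::Join(left, right, criteria, _output) => {
            join_frames(evaluate(left), evaluate(right), criteria)
        }
    }
}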

There's still a bit of duplication here and there, but I am cleaning up the interface as I go along. Everything's also quite verbose under the hood, but I plan on creating a simplified higher-level API. In some of the earlier discussions around a Rust dataframe, there was a bit of opposition to stringy APIs, but they're very convenient. It's better to type dataframe.filter('a > 3') than to try to construct that using the building blocks of the API.

I ultimately plan on making lazy evaluation the default.

How will bindings work?

Rust doesn't have a runtime, and lazily evaluated compute tends to be reproducible without maintaining state. I'm thinking that the Rust library would be stateless, and that a Python/JS binding would keep track of the LazyFrame and its expressions (such as "read this SQL table, then perform these computations"), only invoking the transformations when we need to display or save data.

This would make the dataframe usable in Jupyter notebooks and equivalents.

Performance

The short answer is "I don't know how it performs yet". I'll write benchmarks in the coming weeks/months, especially as we support more functionality.

Can the Library be Used?

If you'd like to experiment, then yes; otherwise there are Rust-based alternatives that will be more stable and move at a faster pace. Check out:

Other Interesting Tidbits

Arrow has a subproject called Gandiva, an LLVM-based analytical expression compiler (see the intro blog post). The summary of it is that it allows more optimised compute on Arrow data, as opposed to hand-rolling optimisations.

It's written in C++ but there's interest in creating Rust bindings, so perhaps we could use it for compute in future.

