Slicing up temporal aggregates with Materialize

This post is also available on the Materialize blog.

Materialize computes and maintains SQL queries as your underlying data change. This makes it especially well-suited to tracking the current state of various SQL queries and aggregates!

But, what if you want to root around in the past? Maybe you want to compare today's numbers to yesterday's numbers. Maybe you want to scrub through the past, moving windows around looking for the most interesting moments where exciting things happened!

Today, we'll build up one way to use Materialize to explore historical temporal data. As is often the case, we'll write things in vanilla SQL, but take advantage of Materialize's unique performance to build surprisingly reactive applications. By the end, we'll have a few queries that taken together allow you to interactively browse aggregates for arbitrary historical ranges.


Temporal data, and queries

You've probably got some data with timestamps in it.

I'm going to use a fairly common data set of NYC taxi rides. You are welcome to grab it too, but it is fairly large. Feel free to grab a small subset, or just follow along for now.

Each record in this data represents a ride, and has a pick up and drop off time. Let's take the drop off time as the "event time" for now, as this is presumably when the data (including fare, distance, etc) was finalized.

This SQL query is commonly used on this data, which is representative of some time-slice-y aggregations:

SELECT
    passenger_count,  -- a key
    MIN(fare_amount), -- some aggregate
    MAX(fare_amount)  -- some aggregate
FROM
    tripdata          -- much data
GROUP BY
    passenger_count   -- that key again

This query determines for each passenger_count (number of folks in the taxi) the minimum and maximum fares paid. The query aggregates across all of the data, because it is meant to exercise analytic systems and that's what's going to do the most work. But, this isn't always (or even usually) what folks want.

Let's imagine instead that what you want is to subset the data by some time interval.

SELECT
    passenger_count,
    MIN(fare_amount),
    MAX(fare_amount)
FROM
    tripdata
WHERE --             /--your arguments-\
    drop_off BETWEEN <TIME1> AND <TIME2>
GROUP BY
    passenger_count

This query does the same aggregation, but over a restricted amount of data that may be more meaningful to you. Perhaps the data corresponds to some month you are investigating. Perhaps you are scrubbing around through time looking for the moments of greatest disparity. In any case, you aren't interested in watching just the one aggregate across all of the data.

A first approach: That Query

That query up there works fine. Materialize can compute and efficiently maintain it for you.

The problem is that you might want to change TIME1 or TIME2 (or both!). That would make it a brand new query, and Materialize would need to start from scratch. That's probably not the experience you were hoping for.

Ideally, you could supply pairs (TIME1, TIME2) as data, and get results streamed out at you. You would get to keep the query the same, and just interactively change arguments to it. For bonus points, you (and others) could add arguments, and get results to multiple queries at the same time.

Let's commit to a specific approach: a table with schema:

CREATE TABLE requests (key int8, time1 TIMESTAMP, time2 TIMESTAMP);

Here the key identifies a request, and the time1 and time2 columns are the request's parameters. You (and others) can change this relation, adding new requests and removing those no longer of interest. We'd like to build up a view in Materialize that accepts these live input changes, and provides interactive, always up-to-date outputs.

For example, we'll want to set something up so that when you do

INSERT INTO requests VALUES (1234, '2019-12-29 21:53:00'::timestamp, '2019-12-31 11:03:00'::timestamp);

you get something out the other end that looks like

  key | passenger_count |  min |  max
------+-----------------+------+------
 1234 |                 |    0 |   92
 1234 |               0 |  -23 |  340
 1234 |               1 | -275 |  709
 1234 |               2 |  -87 |  260
 1234 |               3 |  -55 |  470
 1234 |               4 |  -52 |  220
 1234 |               5 |  -52 |  215
 1234 |               6 |   -4 |  143
 1234 |               7 |    7 |    7
 1234 |               8 |   85 |   85

The data are messed up, with absent passenger_count and negative fares, I agree. This is how you know it is real data.

A second approach: (Lateral) Joins

If you've been here on the blog before, you may have a hunch that LATERAL joins are a candidate solution. If not, check out this sweet blog post on lateral joins. It turns out that we can write a query that lateral joins requests with our SELECT up above to produce the results we want:

-- Lateral join `requests` and the parameterized query.
CREATE MATERIALIZED VIEW results AS
SELECT *
FROM
    requests,
    LATERAL (
        SELECT
            passenger_count,
            MIN(fare_amount),
            MAX(fare_amount)
        FROM
            tripdata
        WHERE --            /-from requests-\
            drop_off BETWEEN TIME1 AND TIME2
        GROUP BY
            passenger_count
    );

The LATERAL keyword exposes the columns of requests to the subquery that follows. In particular, it allows correlation with the TIME1 and TIME2 columns of requests. The result is a correlated subquery that produces independent results for each pair of bindings.

This is syntactically and semantically great! It does exactly what we want, and is very concise.

Unfortunately, it is also very inefficient.

The problem is that Materialize cannot discern any common structure between the subqueries. The implementation will cross-join requests and tripdata, and then perform the reduction. Each new row in requests will effectively prompt as much computation as a from-scratch query.

While it was easy to write, this view will not result in interactive responses to new queries.

Lateral joins work great when the parameter bindings narrow the computation, for example when they populate equality constraints. When the parameter bindings are used in other (e.g. inequality, for Materialize) constraints, it is much less obvious how to share the computation and state of the subqueries.
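
As a rough sketch of the kind of parameterization that does share work, imagine a hypothetical daily_requests table whose parameter is a single day. Because the parameter appears only in an equality constraint, each request only needs to consult the trips from its own day, rather than re-reading the whole collection.

-- Hypothetical: requests that each name one whole day.
CREATE TABLE daily_requests (key int8, day TIMESTAMP);

-- Sketch: the parameter is used only in an equality constraint,
-- so each request narrows the subquery to a single day of trips.
CREATE MATERIALIZED VIEW daily_results AS
SELECT *
FROM
    daily_requests,
    LATERAL (
        SELECT
            passenger_count,
            MIN(fare_amount),
            MAX(fare_amount)
        FROM
            tripdata
        WHERE
            date_trunc('day', drop_off) = daily_requests.day
        GROUP BY
            passenger_count
    );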

A third approach: Time Slicing

While the naive lateral join could not identify commonality across the subqueries for different requests, that commonality does exist. We just need to figure out how to express it to Materialize.

To see it, imagine all TIME1 and TIME2 bindings were cleanly on a day boundary. We could first perform the aggregation above for each day.

-- Reduce down to per-day aggregates
CREATE VIEW daily_aggregates AS
SELECT
    passenger_count,
    date_trunc('day', drop_off) as time,
    MIN(fare_amount) as min_fare_amount,
    MAX(fare_amount) as max_fare_amount
FROM
    tripdata
GROUP BY
    passenger_count,
    date_trunc('day', drop_off);

From this reduced data, each of our requests could then pick up and stitch together their days of interest. The reduction down to days is common across all requests, although each then has its own unique work to do assembling the aggregates. It's not immediately clear how to do that assembly, but we'll get there. It turns out it is much less work than re-reading the whole tripdata collection.

The restriction to day-aligned request intervals is a pretty big one. What if the request times aren't aligned to days but to hours instead?

-- Reduce down to per-hour aggregates
CREATE VIEW hourly_aggregates AS
SELECT
    passenger_count,
    date_trunc('hour', drop_off) as time,
    MIN(fare_amount) as min_fare_amount,
    MAX(fare_amount) as max_fare_amount
FROM
    tripdata
GROUP BY
    passenger_count,
    date_trunc('hour', drop_off);

Now we have aggregates at the granularity of hours. We can do minutes too!
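
A minutely version would presumably follow the same shape, truncating to the minute instead:

-- Reduce down to per-minute aggregates (same shape as the daily and hourly views).
CREATE VIEW minutely_aggregates AS
SELECT
    passenger_count,
    date_trunc('minute', drop_off) as time,
    MIN(fare_amount) as min_fare_amount,
    MAX(fare_amount) as max_fare_amount
FROM
    tripdata
GROUP BY
    passenger_count,
    date_trunc('minute', drop_off);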

You may think we've made life harder because there are so many more aggregates to put back together. There are 24 times as many hours as there are days, and 1,440 times as many minutes as days. That is a lot more work to do than when we had to point out the days of interest. Expressing a request for a week-long interval would require 10,080 minutes as input.

However, no one said you had to use only minutes. Or only hours. You can cover most of your hypothetical week with daily aggregates, at least six of them. You can then grab a few hourly aggregates at each end, and a few minutely aggregates if you need those too.

Let's spell this out with an example, as it will be important to be clear. Let's say your request times are

|   key |               time1 |               time2 |
|------:|--------------------:|--------------------:|
| 12345 | 2019-12-29 21:53:00 | 2019-12-31 11:03:00 |

If we want to collect aggregates that cover the span from time1 to time2, we can do that with the following intervals. Notice that all of these intervals are either a minute, an hour, or a day long.

|   key |               time1 |               time2 |
|------:|--------------------:|--------------------:|
| 12345 | 2019-12-29 21:53:00 | 2019-12-29 21:54:00 |
| 12345 | 2019-12-29 21:54:00 | 2019-12-29 21:55:00 |
| 12345 | 2019-12-29 21:55:00 | 2019-12-29 21:56:00 |
| 12345 | 2019-12-29 21:56:00 | 2019-12-29 21:57:00 |
| 12345 | 2019-12-29 21:57:00 | 2019-12-29 21:58:00 |
| 12345 | 2019-12-29 21:58:00 | 2019-12-29 21:59:00 |
| 12345 | 2019-12-29 21:59:00 | 2019-12-29 22:00:00 |
| 12345 | 2019-12-29 22:00:00 | 2019-12-29 23:00:00 |
| 12345 | 2019-12-29 23:00:00 | 2019-12-30 00:00:00 |
| 12345 | 2019-12-30 00:00:00 | 2019-12-31 00:00:00 |
| 12345 | 2019-12-31 00:00:00 | 2019-12-31 01:00:00 |
  ... approximately 10 hours later ...
| 12345 | 2019-12-31 11:00:00 | 2019-12-31 11:01:00 |
| 12345 | 2019-12-31 11:01:00 | 2019-12-31 11:02:00 |
| 12345 | 2019-12-31 11:02:00 | 2019-12-31 11:03:00 |

It turns out that we chose a relatively concise request interval for this example. In general, you might need as many as ~23-ish hours and ~59-ish minutes on each end of the interval. But, this is not nearly as intractable a number of intervals as if done minute-by-minute.
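
To make the savings concrete: the example interval above spans 37 hours and 10 minutes, or 2,230 minutes, but the mixed decomposition covers it with 7 + 2 + 1 + 11 + 3 = 24 intervals (seven minutes, two hours, one day, eleven hours, and three more minutes).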


CONCERN: That sure is a lot of input to provide to requests, isn't it? It seems like it would be tedious to do, and easy to get wrong. Absolutely! We'll write some queries in the appendix that automatically produce these intervals for you!


For now, let's imagine we have your request input in this more expansive representation. How might you get your aggregate results out? What SQL queries do we need to write to make that happen?

First, we need to take our daily, hourly, and minutely aggregates and turn them into intervals. This is no more complicated than the following (shown for days, but the same structure works for hours and minutes):

-- Reframe daily aggregates using an interval.
CREATE VIEW daily_intervals AS
SELECT
    passenger_count,
    time as time1,
    time + INTERVAL '1 day' as time2,
    min_fare_amount,
    max_fare_amount
FROM
    daily_aggregates;

We now have the data written down as a key (passenger_count), an interval (time1 and time2), and the aggregate values (min_fare_amount and max_fare_amount). We can repeat this for hours and minutes.
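
The hourly version, for example, would presumably look like this (and the minutely one is analogous):

-- Reframe hourly aggregates using an interval.
CREATE VIEW hourly_intervals AS
SELECT
    passenger_count,
    time as time1,
    time + INTERVAL '1 hour' as time2,
    min_fare_amount,
    max_fare_amount
FROM
    hourly_aggregates;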

The reason we convert to intervals is so that we can put all records in the same collection.

-- Homogenous collection of aggregates by intervals.
CREATE VIEW all_intervals AS
SELECT * FROM daily_intervals UNION ALL
SELECT * FROM hourly_intervals UNION ALL
SELECT * FROM minutely_intervals;

We can safely use UNION ALL instead of UNION because all the measurements are distinct: they are distinct in each input, and intervals from different inputs have different widths. It turns out UNION ALL is much more efficient than UNION, because we don't need to do the work of deduplication.

Now, we can just join the expanded requests and all_intervals, and aggregate out the various time intervals to get accumulated results.

-- Select out intervals of interest and aggregate.
CREATE MATERIALIZED VIEW results AS
SELECT
    key,
    passenger_count,
    MIN(min_fare_amount),
    MAX(max_fare_amount)
FROM
    requests, -- IMPORTANT: as days, hours, and minutes
    all_intervals
WHERE
    requests.time1 = all_intervals.time1 AND
    requests.time2 = all_intervals.time2
GROUP BY
    key,
    passenger_count;

This view will collect the relevant intervals, and apply the reduction functions to the aggregates from each of the intervals. However, each request starts from relatively few bits of pre-aggregated data, rather than reconsidering the entire collection.

Trying it out

We've just defined a materialized view, and Materialize will keep this up to date as the input data change. Let's see just how interactive it is.

We'll open up another Materialize session and use the handy TAIL command.

materialize=> COPY (TAIL results) TO STDOUT;
1609383407362   1       1234            0       92
1609383407362   1       1234    0       -23     340
1609383407362   1       1234    1       -275    709
1609383407362   1       1234    2       -87     260
1609383407362   1       1234    3       -55     470
1609383407362   1       1234    4       -52     220
1609383407362   1       1234    5       -52     215
1609383407362   1       1234    6       -4      143
1609383407362   1       1234    7       7       7
1609383407362   1       1234    8       85      85
NOTICE:  TAIL waiting for more data
NOTICE:  TAIL waiting for more data
NOTICE:  TAIL waiting for more data
...

This gives us a consistent snapshot of the current results (our test request we INSERTed up above). The snapshot is then followed by a live stream of timestamped updates, each describing a new consistent state. As it happens, there are no updates because I am currently changing neither requests nor tripdata.

As soon as we type into our first shell something like (note: new times):

INSERT INTO requests VALUES (123, '2019-12-23 21:53:00'::timestamp, '2019-12-29 11:03:00'::timestamp);

we should see some new results appear in the TAIL output.

...
NOTICE:  TAIL waiting for more data
1609383413088   1       123             -40     108
1609383413088   1       123     0       -78     400
1609383413088   1       123     1       -275    743
1609383413088   1       123     2       -222    499
1609383413088   1       123     3       -80     499
1609383413088   1       123     4       -150    499
1609383413088   1       123     5       -52     231
1609383413088   1       123     6       -65     168
1609383413088   1       123     7       4       74
1609383413088   1       123     8       80      85
1609383413088   1       123     9       9       70
NOTICE:  TAIL waiting for more data
NOTICE:  TAIL waiting for more data
...

I don't have anything quantitative for you about how long this took, other than "apparently immediately". There was no perceptible amount of time between insertion and reporting the new results. This makes sense, as we just had to track down tens of records and accumulate them. We didn't even have to build a new dataflow, as the results materialized view is already up and running.

Should the underlying tripdata collection change, each of the registered queries will have updates pushed at them. In this case, the input data come pre-sliced by time, so loading the data is the main way to change it.

Thoughtful comments

We've seen a fairly reproducible pattern for slicing out intervals of time from aggregations. Our example used relatively few keys, and only did minimum and maximum aggregations. However, it should be clear-ish that the approach generalizes, to more keys and other aggregations.
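
As a rough illustration of that generalization, any aggregate that can be re-assembled from per-interval partial results slots into the same pattern. For example, a hypothetical extension of the per-day view might also track sums and counts, which the finishing reduction can simply add up:

-- Hypothetical extension: also track per-day totals and counts.
CREATE VIEW daily_aggregates_ext AS
SELECT
    passenger_count,
    date_trunc('day', drop_off) as time,
    MIN(fare_amount) as min_fare_amount,
    MAX(fare_amount) as max_fare_amount,
    SUM(fare_amount) as sum_fare_amount,
    COUNT(*) as count_rides
FROM
    tripdata
GROUP BY
    passenger_count,
    date_trunc('day', drop_off);

In the finishing reduction, SUM(sum_fare_amount) and SUM(count_rides) accumulate across intervals, and an average falls out as their ratio. Aggregates that cannot be re-assembled from partial results (a median, say) don't decompose this way.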

What I like about this example is that we've implemented, using SQL, a reactive computation with interesting performance properties. We used our knowledge and understanding of computer science, and were able to do something better as a result. While it can be great to blast out the first SQL that you can think of, Materialize responds well to more precise direction.

If you'd like to take this example out for a spin, go and grab a copy of Materialize now!

Appendix: All the views you need to write

I wrote all the views down just to make sure that they worked. It took a fair bit of SQL that isn't especially pleasant to read. But I wanted to make sure you had access to it, to reproduce this if you wanted!

There are three classes of things to do:

Converting request intervals to minutes, hours, and days

We asked the user to provide pre-sliced intervals. That seems error-prone. Surely we can do that for them?

Indeed we can, but my version is pretty terrible to read. Perhaps it can be improved.

Let's imagine that we have a relation requests with (key, time1, time2) triples, with no requirement that they be aligned to days, hours, or minutes. We need to peel out some minutes near time1, then some hours, then some days, then some hours, and then some minutes ending at time2.

The logic to do this isn't impossible, or even that hard, just wordy. Here's what I wrote for the "minutes near time1":

-- Try out each of the 60 minutes near `time1`;
-- accept those that lie entirely between `time1` and `time2`.
CREATE VIEW time1_minutes AS
SELECT
    key,
    date_trunc('hour', requests.time1) + x * INTERVAL '1 minute' as time1,
    date_trunc('hour', requests.time1) + (x + 1) * INTERVAL '1 minute' as time2
FROM
    requests,
    generate_series(0, 59) x
WHERE
    -- The entire interval must lie between `time1` and `time2`.
    requests.time1 <= date_trunc('hour', requests.time1) + x * INTERVAL '1 minute' AND
    requests.time2 >= date_trunc('hour', requests.time1) + (x + 1) * INTERVAL '1 minute';

In case you read SQL as well as I do, what's going on here is that we pull out the hour of time1, and try out the 60 one-minute intervals after it. Each interval is kept only if it starts after time1, and ends before time2.

It's actually not that complicated, computationally (it is a flat_map in Materialize, which maintains no state). The logic generalizes to hours, days, etc., and can be used on the way back down if you round from time2 instead of time1.
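
For instance, the minutes ending at time2 might look like the following sketch, which mirrors the view above but truncates time2 instead:

-- Try out each of the 60 minutes in the hour containing `time2`;
-- accept those that lie entirely between `time1` and `time2`.
CREATE VIEW time2_minutes AS
SELECT
    key,
    date_trunc('hour', requests.time2) + x * INTERVAL '1 minute' as time1,
    date_trunc('hour', requests.time2) + (x + 1) * INTERVAL '1 minute' as time2
FROM
    requests,
    generate_series(0, 59) x
WHERE
    -- The entire interval must lie between `time1` and `time2`.
    requests.time1 <= date_trunc('hour', requests.time2) + x * INTERVAL '1 minute' AND
    requests.time2 >= date_trunc('hour', requests.time2) + (x + 1) * INTERVAL '1 minute';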

You'll then need to take all of these intervals and union them together:

-- union together derived "aligned" intervals.
CREATE VIEW request_intervals AS
SELECT * FROM time1_minutes UNION
SELECT * FROM time1_hours UNION
SELECT * FROM days UNION
SELECT * FROM time2_hours UNION
SELECT * FROM time2_minutes;

You'll notice I've invented days here. I'll leave that as homework for you. It's also worth stressing that I used UNION rather than UNION ALL. There can be repetition of these intervals if e.g. time1 and time2 are within the same day (or hour).

Grouping and aggregating data by minute, hour, and day

The input data need to be grouped into intervals by minute, hour, and day.

I did that by first dropping each record into each of the types of interval. For example, here are the daily intervals. Notice that there is no aggregation yet.

-- Drop the data into multiple windows
CREATE VIEW daily_intervals AS
SELECT
    passenger_count,
    date_trunc('day', drop_off) as time1,
    date_trunc('day', drop_off) + INTERVAL '1 day' as time2,
    fare_amount
FROM
    tripdata;

We can do the same thing for hours and minutes. Once we've produced those views too, we can aggregate them up.

-- Union contributions to each interval, and aggregate.
CREATE VIEW all_intervals AS
SELECT
    passenger_count,
    time1,
    time2,
    MIN(fare_amount) as min_fare_amount,
    MAX(fare_amount) as max_fare_amount
FROM (
    SELECT * FROM daily_intervals UNION ALL
    SELECT * FROM hourly_intervals UNION ALL
    SELECT * FROM minutely_intervals
) unioned
GROUP BY
    passenger_count,
    time1,
    time2;

I've used UNION ALL here because it is more efficient than UNION, and no records will be duplicated across the inputs because the intervals have different widths.

Finally, I knew that I wanted access to this information by (time1, time2). I intentionally left the input views unmaterialized until this point, and only now create an appropriate index. This is the point at which your computer will start chugging away, reading data and splitting it off into various intervals and aggregates.

-- Index `all_intervals` by `(time1, time2)`.
CREATE INDEX all_by_intervals ON all_intervals (time1, time2);

Join queries and input data, and reduce

Finally, we need to join the request_intervals collection (of multiple aligned intervals) with the all_intervals collection (of input data contributions). This will select out the partial aggregations that contribute to each of the queries. Then we need a finishing aggregation to reduce those partial aggregates to one value per request.

-- Join queries and input data, and finish the reduction.
CREATE MATERIALIZED VIEW results AS
SELECT
    key,
    passenger_count,
    MIN(min_fare_amount),
    MAX(max_fare_amount)
FROM
    request_intervals,
    all_intervals
WHERE
    request_intervals.time1 = all_intervals.time1 AND
    request_intervals.time2 = all_intervals.time2
GROUP BY
    key,
    passenger_count;
