
Building Scalable Streaming Pipelines for Near Real-Time Features

Source: https://eng.uber.com/building-scalable-streaming-pipelines/

Background

Uber is committed to providing reliable services to customers across our global markets. To achieve this, we rely heavily on machine learning (ML) to make informed decisions, such as forecasting and surge pricing. As a result, the real-time streaming pipelines that generate the data and features for ML have become more popular and important.

At Uber, we leverage Apache Flink to build real-time streaming pipelines, and we have built platforms like Gairos and AthenaX to simplify their development. However, many challenges remain, such as scalability, due to either the complexity of the computation or the amount of real-time data to be processed.

In this article, we use the pipelines that generate demand and supply features as an example to introduce some of the challenges we faced and how we solved them. In particular, we explain how we tune real-time pipelines with our performance tuning framework.

Architecture

The figure below shows the high-level architecture: Streaming Pipelines in Apache Flink are responsible for the feature computation and ingestion. For the rest of the article, we will discuss these pipelines in detail.

Figure 1: Simplified Overview of the Architecture

Feature Computation

This section details how to aggregate raw events, such as demand and supply events, by their geospatial and temporal dimensions, as well as by global product type (UberX, etc.), for any given hexagon (see Uber's H3 hexagonal grid system). The simplified computation algorithm is as follows:

  1. Count the number of raw events from distinct riders and drivers by hexagon and global product type in a 1-minute window
  2. Apply the Kring Smooth (discussed later) to multiple rings, up to ring-20, on the 1-minute window
  3. Aggregate the smoothed values of each ring on multiple sliding window sizes up to 32 minutes

In total, one real-time pipeline generates 54 features for a hexagon each minute, using the combination of 9 rings (0, 1, 2, 3, 4, 5, 10, 15, 20), and 6 window sizes (1, 2, 4, 8, 16, 32).

Next, we discuss step 2 of the algorithm:

Kring Smooth

The Kring Smooth process calculates the geospatial aggregation by broadcasting the event counts of a hexagon to its Kring neighbours. In other words, the feature value of a hexagon for a particular ring takes into account the event counts from all hexagons within that ring.

In order to calculate the feature value aggregated on the ring R for a given hexagon H, the equation is:

f(H, R) = ( Σ_{i=0..R} Σ_{j=1..Num(i)} f(N_ij, 0) ) / ( Σ_{i=0..R} Num(i) )

Where:

  • Num(i) is the number of hexagons of ring i (ring 0 contains only H itself, so Num(0) = 1)
  • N_ij is the jth hexagon of ring i
  • f(H, 0) is the number of events originating from hexagon H

Let's check the following example (Figure 2) to see how to compute the values of 3 features, for ring 0, ring 1, and ring 2 of hexagon A, following the equation:

Num(0) = 1

Num(1) = 6

Num(2) = 12

f(A, 0) = 1

f(A, 1) = (f(A, 0) + f(B1, 0) + f(B2, 0)) / (Num(0) + Num(1)) = (1 + 2 + 1) / 7 = 4 / 7

f(A, 2) = (f(A, 0) + f(B1, 0) + f(B2, 0) + f(C1, 0) + f(C2, 0) + f(C3, 0)) / (Num(0) + Num(1) + Num(2)) = (1 + 2 + 1 + 3 + 2 + 1) / (1 + 6 + 12) = 10 / 19

Figure 2: Ring 0, 1, 2 of Hexagon A

The pipeline follows the equation to calculate the values of features for multiple ring sizes, up to 20. 
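
To make the equation concrete, here is a minimal sketch of the Kring Smooth computation in plain Java (independent of Flink). The ring-neighbor lists and the raw per-hexagon counts are assumed inputs, e.g. produced by an H3 k-ring lookup and the 1-minute counting step:

```java
import java.util.List;
import java.util.Map;

public final class KringSmooth {

    /**
     * Computes f(H, R): the event count of hexagon H smoothed over rings 0..R.
     *
     * @param rings     rings.get(i) holds the hexagon IDs of ring i around H
     *                  (ring 0 contains only H itself), e.g. from an H3 k-ring lookup
     * @param rawCounts raw 1-minute event counts per hexagon, i.e. f(., 0); absent means 0
     * @param r         the ring size R to smooth over
     */
    public static double smooth(List<List<Long>> rings, Map<Long, Integer> rawCounts, int r) {
        long totalEvents = 0;   // numerator: sum of f(N_ij, 0) over rings 0..R
        long totalHexagons = 0; // denominator: sum of Num(i) over rings 0..R
        for (int i = 0; i <= r; i++) {
            for (long hexagon : rings.get(i)) {
                totalEvents += rawCounts.getOrDefault(hexagon, 0);
            }
            totalHexagons += rings.get(i).size(); // Num(i)
        }
        return (double) totalEvents / totalHexagons;
    }
}
```

With the counts from Figure 2 (rings of sizes 1, 6, and 12; raw counts of 1 for A, 2 and 1 for B1/B2, and 3, 2, 1 for C1/C2/C3), this returns 4/7 for R = 1 and 10/19 for R = 2, matching the example above.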

Temporal Aggregation

After the Kring Smooth completes for a one-minute window, step 3 of the algorithm is to further aggregate the smoothed event counts on larger windows, up to 32 minutes. In order to calculate the aggregation on a larger window for a given hexagon H, the equation is:

q(H, T, W) = ( Σ_{i=0..W-1} q(H, T + i, 1) ) / W

Where:

  • T is the start timestamp of a window
  • W is the window size in minutes
  • q(H, T, 1) is the smoothed event count from the Kring Smooth for the 1-minute window starting at T (T + i denotes the timestamp i minutes after T)

Figure 3 below demonstrates how to compute the feature value of hexagon A for a 2-minute window:

  • The smoothed event counts from Kring Smooth for the 1-minute windows W1 and W2 are 1.0 and 3.0, emitted at T0 + 1min and T0 + 2min respectively
  • The feature value for the 2-minute window (T0, T0 + 2min) is (1.0 + 3.0) / 2 = 2.0, obtained by applying the equation above to the smoothed event counts that fall into that time range
Figure 3: Aggregation of a 2-minute Window for Hexagon A
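
Expressed as code, the temporal aggregation is simply the average of the W consecutive 1-minute smoothed counts for a hexagon; a minimal sketch in plain Java (the counts are assumed to be ordered by window start time):

```java
/**
 * Computes q(H, T, W): the average of the W consecutive 1-minute smoothed counts
 * q(H, T, 1), q(H, T + 1, 1), ..., q(H, T + W - 1, 1) for one hexagon.
 */
public static double temporalAggregate(double[] oneMinuteSmoothedCounts) {
    double sum = 0;
    for (double count : oneMinuteSmoothedCounts) {
        sum += count;
    }
    // Example from Figure 3: {1.0, 3.0} for a 2-minute window gives (1.0 + 3.0) / 2 = 2.0
    return sum / oneMinuteSmoothedCounts.length;
}
```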

Streaming Implementation and Optimization

This section uses the demand pipeline as the example to illustrate how to implement the feature computation algorithm with Apache Kafka and Apache Flink, and how to tune the real-time pipeline.

Logical Jobs Topology

Figure 4 below illustrates the logical DAG of the streaming pipeline that calculates the demand features. All windows larger than 1 minute are sliding windows that slide by 1 minute, which means a single input event can be included in up to 63 windows (32 + 16 + 8 + 4 + 2 + 1).

Figure 4: Logical DAG of Demand Pipeline

The table below lists the functionalities for major operators in the logical DAG:

Kring Smooth on 1-Min: This operator applies the Kring Smooth algorithm on the 1-minute window.

2-Min, 4-Min, 8-Min, 16-Min, 32-Min: These operators aggregate over windows of the corresponding size, sliding by 1 minute, using the smoothed demands returned from Kring Smooth.

Merge Windows: This operator collects the aggregated results from all upstream windows (1, 2, 4, 8, 16, 32), then packs them into a single record for persistence.

For example, at 1:32am, the operator will emit a record including the demand features for time windows:

  • 1-min (1:31 – 1:32)
  • 2-min (1:30 – 1:32)
  • 4-min (1:28 – 1:32)
  • 8-min (1:24 – 1:32)
  • 16-min (1:16 – 1:32)
  • 32-min (1:00 – 1:32)

Table 1: Logical Operators of Demand Pipeline
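
As a rough illustration of how these operators map onto Flink's DataStream API (not Uber's production code), the window wiring could look like the sketch below; the event types, key selectors, and aggregate functions are hypothetical placeholders, and the cross-hexagon broadcast inside the Kring Smooth operator is elided:

```java
// Sketch only: DemandEvent, SmoothedDemand, key selectors and aggregate functions are placeholders.
DataStream<SmoothedDemand> smoothed = demandEvents
    .keyBy(e -> e.hexagonAndProductKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new KringSmoothAggregate());          // steps 1 and 2 on the 1-minute window

// Step 3: one sliding window operator per size, each sliding by 1 minute.
DataStream<WindowedFeature> fourMinuteFeatures = smoothed
    .keyBy(s -> s.hexagonAndProductKey())
    .window(SlidingEventTimeWindows.of(Time.minutes(4), Time.minutes(1)))
    .aggregate(new AverageSmoothedDemand());
// ... the same pattern applies for 2, 8, 16 and 32 minutes; a final "Merge Windows" operator
// then combines the per-window outputs into one record per hexagon and minute.
```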

The Streaming Pipeline’s Data Volume

This section lists the data volumes for the demand pipeline:

  • Average input rate of the Kafka topics: 120k/s
  • Count of hexagons: 5M
  • Count of cities: 1,500
  • Average and maximum count of hexagons per city: 4K and 76K
  • Average count of demand events per hexagon per minute: 45
  • Hexagon count within ring-20: 1,261

It's clear that the pipeline has a high data volume, intensive computation, and large states to manage. The first version was built exactly as the logical DAG, and it could not run stably (as shown in the dashboards below), due to issues including backpressure and OOM. Considering that we are targeting near-real-time latency (less than 5 minutes), there was a real challenge ahead of us to build a stable, working pipeline.

Memory Monitor:

Figure 5: Dashboard of the Used Memory

Lagging Monitor:

Figure 6: Dashboard of the Lagging

How to Optimize

This section discusses how to tune this streaming pipeline. At Uber, we have developed a performance tuning framework for streaming pipelines, as well as an end-to-end integration test framework. Dedicated integration tests are developed before kicking off the actual tuning, allowing us to refactor or optimize a streaming pipeline with confidence that it will still generate correct results, similar to how unit tests protect us from regressions. These integration tests proved extremely valuable over the whole optimization process.

Next, we will introduce the performance tuning framework.

Performance Tuning Framework

As shown in the inner triangle of the figure below, our framework focuses on 3 areas: Network, CPU, and Memory, measured and monitored with the metrics served by Uber’s uMonitor system. The vertices of the outer pentagon indicate the major domains that could be explored for optimization.

Figure 7: the Framework of Performance Tuning

The table below briefly explains the techniques and potential impacts of each domain:

Parallelism (impacts: CPU, Memory)

Controls the count of containers/workers for a streaming job.

Use Cases:

  • If a job's input rate is 16k/s and each container can process at most 2k/s, then the parallelism should be at least 8 (16k / 2k); for the same job, if the Kafka topic has fewer than 8 partitions, it makes sense to bump up the topic's partition count.

Partition (impacts: Network, Memory)

Controls how messages are grouped by key and transferred between the upstream and downstream operators. Partition is one of the most important domains, and it impacts all areas: messages have to be serialized/deserialized, which takes a significant share of CPU and may trigger more garbage collector runs due to the objects created at deserialization.

Use Cases:

  • Be cautious with skewed partitions, which can create hotspots, slow processing down, or cause lagging for a streaming pipeline

Remote Call (impacts: Network)

Controls how an operator of a streaming pipeline interacts with external services or sinks.

Use Cases:

  • If the remote call is the bottleneck, consider adding a Redis cache or an in-memory cache
  • A Redis cache also involves a remote call, which can itself become a bottleneck when retrieving a cached item with a large data size
  • Consider making the remote calls asynchronous

Algorithm (impacts: CPU)

It's reasonable to assume that the impact of algorithms is greatest on the CPU.

Use Cases:

  • Consider sampling the events if the pipeline's window holds a large volume of them, e.g., sampling events for the calculation of percentiles

Garbage Collector (impacts: Memory)

Use Cases:

  • Reuse the message or object when passing messages between the upstream and downstream operators if possible, to avoid unnecessary object creation/removal
  • Be cautious with unnecessary boxing/unboxing, which can add up when processing high-volume data
  • Have pipeline-specific memory management, using pre-allocated memory

Table 2: The Domains of Performance Tuning
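
As a concrete example of the parallelism use case above, the sizing boils down to a simple calculation that is then applied to the Flink job; a sketch, where the rates are the illustrative numbers from the table and env is an assumed StreamExecutionEnvironment:

```java
// Sketch: derive a minimum parallelism from the observed rates in the use case above.
long inputRatePerSecond = 16_000;        // job input rate
long perContainerThroughput = 2_000;     // measured maximum rate a single container can process
int minParallelism = (int) Math.ceil((double) inputRatePerSecond / perContainerThroughput); // = 8

env.setParallelism(minParallelism);      // env: assumed StreamExecutionEnvironment
// The source Kafka topic should have at least this many partitions,
// otherwise some source subtasks will have no partition to read from.
```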

Next, we discuss how to optimize the pipeline.

Optimizations

We have applied many optimizations to the streaming pipeline, and some optimization techniques impact multiple of the areas described above. One particular technique, the Customized Sliding Window, has a significant impact on all 3 areas, so it gets a dedicated section below, as does storage.

Network Optimization

The major optimization techniques are listed in the table below:

Fields exclusion (Algorithm)

Raw Kafka messages have many fields that are not used by the streaming pipeline, so we have a mapper that filters out the unused fields at the very beginning of the job DAG.

Key encoding / decoding (Algorithm)

The global product type UUID is used as part of the output key. We encode the UUID (128 bits) as a byte (8 bits) via an internal encoding, and only convert it back to a UUID when writing the output to the sink, which reduces both the memory footprint and the network payload.

Dedup (Algorithm)

We introduced a 1-minute window that keeps only one demand event per distinct rider before the Kring Smooth. The dedup, implemented with a reduce function, significantly reduced the message rate for the expensive Kring Smooth calculation, down to 8k/s. The trade-off is that the dedup window adds one more minute to the latency, which is mitigated with other techniques.

Field Type Selection (Algorithm)

We refactored the algorithm so that the intermediate computation value can use Integer rather than Double as its data type, which further reduces the message size from 451 bytes to 237 bytes.

Merge Window Output (Remote Call)

We had 2 options when writing the output into the sink:

  • Let the windows of all supported sizes (1-32 min) write directly into the sink
  • Introduce an extra window that merges these windows' output into a single record first, then write into the sink

We chose the latter, because the former option would produce up to 2M writes/s to the sink, which simply could not handle that ingestion rate.

Table 3: Techniques of Network Optimization 

As detailed above, the key improvement was to have both fewer and smaller messages.
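
For instance, the dedup technique above can be expressed as a 1-minute window keyed by rider with a reduce function that keeps a single event; a minimal sketch, where the DemandEvent type, the input stream, and the key selector are illustrative placeholders:

```java
// Sketch: keep at most one demand event per rider per minute before the expensive Kring Smooth.
DataStream<DemandEvent> deduped = demandEvents          // demandEvents: assumed input stream
    .keyBy(e -> e.riderUuid())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((first, second) -> first);                  // any one event per rider and minute is enough
```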

Memory Optimization

The techniques for memory are listed in the table below:

Object Reuse (Garbage Collector)

By default, Flink does not reuse objects when passing messages between the upstream and downstream operators. We enabled object reuse where possible to avoid message cloning.

Increase Containers (Partition)

Due to the large data volume and states (each container can consume around 6 GB of memory), we use 128 containers, each with one vcore, for the streaming pipeline.

Key encoding / decoding (Algorithm)

The global product type UUID is used as the key of the output. We encode the UUID (128 bits) as a byte (8 bits) via an internal encoding, and convert it back to a UUID before writing the output to the sink, which reduces the memory size.

Fields projection (Algorithm)

Raw Kafka messages have many fields that are not used by the streaming pipeline, so we have a mapper that filters out the unused fields at the entry point of the job DAG, reducing the messages' memory load on computation.

Field Type Selection (Algorithm)

We refactored the algorithm so the intermediate computation value uses Integer rather than Double as its data type, reducing the message size from 451 to 237 bytes.

Table 4: Techniques of Memory Optimization

Note: some of these techniques are also listed under the network optimizations above.
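
Enabling object reuse, the first technique in the table, is a single setting on the execution environment; the caveat is that downstream operators must then neither mutate nor hold on to incoming records:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Pass the same record instance between chained operators instead of deep-copying it.
// Safe only when operators do not mutate or buffer the objects they receive.
env.getConfig().enableObjectReuse();
```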

CPU Optimization

The techniques applied for CPU optimization are listed below:

Message in Tuple (CPU)

Flink provides the Tuple type, which is more efficient than a POJO at serialization because fields are accessed directly without reflection. We chose Tuple for the messages passed between operators.

Avoid boxing / unboxing (CPU)

The streaming pipeline needs to call an internal library (H3) to retrieve the neighbours of a given hexagon. The API returns an array of Long, leading to unnecessary boxing/unboxing during the computation. We added another API that returns an array of the primitive type instead.

Cache for API (Remote Call)

We enabled an in-memory cache to improve performance when converting the global product type, as the result returned from the remote API rarely changes.

Hexagon Index Type (Algorithm)

We converted the hexagon data type from the default String to Long, which reduced the window aggregation function's time by 50%.

Kring Smooth (Algorithm)

We re-implemented the Kring Smooth algorithm to make it more efficient.

Table 5: Techniques of CPU Optimization
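
As an illustration of the first two techniques above, intermediate messages can be modeled as Flink tuples and the hexagon handling can stay on primitive longs; the field layout, the example values, and the primitive-returning helper below are assumptions, not the actual internal API:

```java
long hexagonIndex = 0x8928308280fffffL;   // example H3 cell index kept as a primitive long
byte encodedProductType = 3;              // 128-bit product UUID encoded into a single byte
int eventCount = 45;

// Tuple serialization avoids the reflection overhead of generic POJO serialization.
Tuple3<Long, Byte, Integer> message = Tuple3.of(hexagonIndex, encodedProductType, eventCount);

// Keep hot loops on primitives so the per-hexagon iteration does not box/unbox Long values.
long[] neighbors = kRingPrimitive(hexagonIndex, 20);   // assumed primitive-returning helper
```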

Customized Sliding Window

The pipeline still could not run smoothly with just the tunings above, because it needs to aggregate on several sliding windows (2, 4, 8, 16, 32). The window aggregation has the following overheads, due to the need to partition the events by a key:

  • Serialization/deserialization when passing messages from the upstream operators to the window operators
  • Message Transfer over network
  • Object being created at deserialization
  • State management and metadata required by window management, such as the window trigger

These overheads add significant pressure on the Garbage Collector, CPU, and Network. To make things worse, a sliding window requires more state than a tumbling or fixed-size window, because one event needs to be kept in a series of sliding windows. Take a 4-minute sliding window as an example: an event that occurred at 2021-01-01T01:15:01Z will be kept in the following 4-minute windows:

  • 2021-01-01T01:12:00Z ~ 2021-01-01T01:16:00Z
  • 2021-01-01T01:13:00Z ~ 2021-01-01T01:17:00Z
  • 2021-01-01T01:14:00Z ~ 2021-01-01T01:18:00Z
  • 2021-01-01T01:15:00Z ~ 2021-01-01T01:19:00Z

Due to the fan-out effect of the sliding windows, the pipeline is under a lot of pressure from state management. To resolve these issues, we manually implemented the sliding window logic with a FlatMap operator, with the following features:

  • With object reuse enabled, the events from the upstream operators are passed along and reused, which avoids the repartitioning and its related costs
  • States are managed in memory, so in effect each event only has one copy of data 
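
A heavily simplified sketch of such a customized sliding window is shown below; SmoothedDemand and WindowedFeatures are hypothetical types, and stale-slot cleanup plus checkpointing, which the production operator needs, are omitted:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch: emulate the 1..32-minute sliding windows with one FlatMap and in-memory state.
public class CustomSlidingWindow extends RichFlatMapFunction<SmoothedDemand, WindowedFeatures> {

    private static final int MAX_WINDOW_MINUTES = 32;
    private static final int[] WINDOW_SIZES = {1, 2, 4, 8, 16, 32};

    // Ring buffer of the latest 32 one-minute smoothed counts per (hexagon, product) key.
    private transient Map<Long, double[]> buffers;

    @Override
    public void open(Configuration parameters) {
        buffers = new HashMap<>();
    }

    @Override
    public void flatMap(SmoothedDemand in, Collector<WindowedFeatures> out) {
        double[] buffer = buffers.computeIfAbsent(in.key(), k -> new double[MAX_WINDOW_MINUTES]);
        int slot = (int) (in.epochMinute() % MAX_WINDOW_MINUTES);
        buffer[slot] = in.smoothedCount();   // each event is stored exactly once

        // Derive all window sizes from the single buffer instead of keeping 63 window copies.
        WindowedFeatures features = new WindowedFeatures(in.key(), in.epochMinute());
        for (int size : WINDOW_SIZES) {
            features.put(size, average(buffer, in.epochMinute(), size));
        }
        out.collect(features);
    }

    private static double average(double[] buffer, long epochMinute, int size) {
        double sum = 0;
        for (int i = 0; i < size; i++) {
            sum += buffer[(int) ((epochMinute - i) % MAX_WINDOW_MINUTES)];
        }
        return sum / size;
    }
}
```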

We have the following estimation with respect to the maximum required memory to keep the states in memory:

Total Memory = Count(Hexagon) * Count(Product) * Max(window size) * sizeof(event)
             = 3M * 6 * 32 * 237 bytes
             ≈ 136 GB

With the parallelism of 128, the memory per container is around 1G, which is manageable. In production, the actual memory is far below the maximum, because not all hexagons have events for a time range. 

The efficiency gain from this customized sliding window is remarkable, and we have successfully reused this operator for more than 5 different use cases that require aggregations over multiple large sliding windows.

Final Job DAG After Tuning

Figure 8: Final DAG of Demand Pipeline

After optimization, we end up with a simpler job DAG, where the customized sliding window has replaced the larger window operators.

The pipeline has been running reliably, as shown in the following 24-hour dashboards:

Lagging Monitor:

Figure 9: Dashboard Showing the Lagging After Optimization

Container Memory Monitor:

Figure 10: Dashboard Showing Memory Usage After Optimization

Storing

To make it easier for us to maintain the pipelines and re-use the sinks, we have further refactored the pipeline DAG by separating the sink operator into a dedicated publisher job in Flink, and connecting the computation and publisher jobs with Kafka. This section focuses on the details of this publisher job.

The model being served looks up the demand and supply information based on geography, time, and product. We selected Docstore (Uber's in-house KV store solution) as our storage.

We started with a Docstore cluster that is shared by many use cases.

Here are the results when inserting one row per API call: write QPS peaks at around 13k, but most of the time it is on the order of hundreds.

Figure 11: Write QPS is not stable with one row per API call

Batching

We tried writing these rows in batches to see whether it would improve throughput. To increase the batching efficiency, we partitioned the data based on the shard number in Docstore. However, the write QPS was actually lower after batching was applied. After digging deeper, we found it was because the cardinality of one dimension of the metrics emitted by the streaming job was too large. After changing that dimension to a constant string instead of a random UUID, the write QPS reached about 16k.

Before writing to Docstore, we first write the data to a Kafka topic. After disabling this Kafka sink, we saw around a 10% increase in write QPS.

Write QPS doubled to 34k after we changed the per-shard batch size to 50. We also tried batch sizes of 100 and 200: with batch size 100, write QPS increased to 37k, while batch size 200 made little further difference (~1k).

In the following table, we list QPS under different configurations: 

Write QPS by configuration:

  • First version: 150
  • After fixing the metrics cardinality problem: 16k
  • After disabling writing to the Kafka topic: 17.6k
  • Batch size 50: 34k
  • Batch size 100: 37k
  • Batch size 200: 38k

Table 6: Throughput under different batch sizes
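
The batching itself is conceptually simple: group the rows by their Docstore shard and flush fixed-size batches per shard. A hedged sketch follows, where DocstoreClient, Row, and shardOf are hypothetical stand-ins for the internal API:

```java
// Sketch with a hypothetical client API: group rows by shard, then write in batches of 50.
static final int BATCH_SIZE = 50;

void flush(List<Row> pendingRows, DocstoreClient client) {
    Map<Integer, List<Row>> rowsByShard = new HashMap<>();
    for (Row row : pendingRows) {
        rowsByShard.computeIfAbsent(shardOf(row.partitionKey()), s -> new ArrayList<>()).add(row);
    }
    for (List<Row> shardRows : rowsByShard.values()) {
        for (int i = 0; i < shardRows.size(); i += BATCH_SIZE) {
            List<Row> batch = shardRows.subList(i, Math.min(i + BATCH_SIZE, shardRows.size()));
            client.insertBatch(batch);   // hypothetical batched write, one call per shard batch
        }
    }
}
```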

Parallelism

Flink job parallelism is another parameter we tuned to improve the QPS. 

After updating the parallelism of the publisher job to 256 (with batch size 200), the write QPS was around 75k, more than double. With parallelism 1024, the QPS reached 112k, but we already saw a lot of timeout errors. After changing the batch size to 50, write QPS was around 120k.

Write QPS by job parallelism and batch size:

  • Parallelism 256, batch size 200: 75k
  • Parallelism 1024, batch size 200: 112k
  • Parallelism 1024, batch size 50: 120k

Table 7: Throughput under different job parallelisms

Thread Pool

We also tried using a thread pool in the Flink job to increase the write QPS, with the following results:

Write QPS by thread pool size, job parallelism, and batch size:

  • Thread pool 32, parallelism 128, batch size 50: 64k
  • Thread pool 128, parallelism 128, batch size 50: 62k
  • Thread pool 128, parallelism 256, batch size 50: 80k
  • Thread pool 16, parallelism 256, batch size 50: 120k

Table 8: Throughput under different thread pool sizes

With a thread pool size of 16, the peak QPS is around 120k, but it is not very stable.
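
Conceptually, the thread pool lets each sink subtask keep several batched writes in flight at once. A rough sketch follows (DocstoreClient and Row are hypothetical; error handling and delivery guarantees are omitted):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: issue batched writes asynchronously from each sink subtask via a small thread pool.
public class BatchingDocstoreSink extends RichSinkFunction<List<Row>> {

    private transient ExecutorService pool;
    private transient DocstoreClient client;   // hypothetical client, created in open()

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(16);   // thread pool size from Table 8
        client = DocstoreClient.create();          // hypothetical factory
    }

    @Override
    public void invoke(List<Row> batch, Context context) {
        // Fire-and-forget for brevity; a real sink must track failures and apply backpressure.
        pool.submit(() -> client.insertBatch(batch));
    }

    @Override
    public void close() throws Exception {
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);   // drain in-flight writes on shutdown
    }
}
```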

After trying every optimization we could think of on the shared cluster, we still could not reach the write QPS we wanted, so we asked for a dedicated cluster to test with.

Partitioner Tuning

We removed the Docstore sink and kept just the FlatMap. With the call to the partitioner also removed, 64 containers could handle an input message rate of over 200k/s without lagging.

We added the custom partition strategy before the FlatMap.


With 384 containers, lagging was around 12 minutes, and the partitioner latency varied from 0.2ms to 5ms. Increasing to 512 containers brought the lagging down to 3 minutes. Later we found that the 0.2ms per partitioner call was the bottleneck, so we added a local cache for the partitioner calls to the FlatMap. After 20 minutes, the cache hit rate was close to the input message rate.
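
The local cache can be as simple as a per-subtask map from hexagon to shard, memoizing the expensive partitioner call; a sketch, with callPartitioner standing in for the real partitioner client:

```java
// Sketch: memoize the partitioner lookup per FlatMap subtask.
private final Map<Long, Integer> shardCache = new HashMap<>();

private int shardFor(long hexagonIndex) {
    // At ~0.2 ms per partitioner call, the calls alone become the bottleneck at this input
    // rate; a cache hit avoids the call entirely once the hexagon has been seen.
    return shardCache.computeIfAbsent(hexagonIndex, this::callPartitioner);   // assumed client call
}
```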

However, lagging kept increasing:

Figure 12: Job lagging keeps increasing

Backpressure was at the custom partition stage.

Figure 13: Topology of the job, with backpressure at the custom partition stage

Updating the parallelism to 128 effectively removed any lag from the pipeline. Each data center can write 300k QPS without any problem.

Data Size

We tried 3 different schemas to see the difference in data size. The first uses one column for each (ring size, time bucket, supply/demand) tuple. The second uses one map for demand and one map for supply. The third groups 7 hexagons at granularity level 9 into one row.


With 6 days of data, the data sizes are as follows:

Data size by schema (no compression vs. with compression):

  • driver_pricing: 96.3 TB vs. 36.8 TB
  • driver_pricing_map: 105.6 TB vs. 43.6 TB
  • driver_pricing_map_group: 328 TB vs. 132.6 TB

Table 9: Compression under different data schemas

After enabling the compression, we are seeing around 60% disk savings on all 3 tables.

Serving

During testing, we found some latency issues: P99 latency was around 150ms, which is unacceptable for our pricing workflow. Through debugging, we found that each partition key has many rows (around 6k), which means the database engine needs to scan at least 6k rows and then apply the filtering passed in the query. As the partition key's size grows, it causes periodic spikes of 200ms or so. Since a TTL is also set for this table, we deployed a hot patch in the query layer to restrict the result to only rows that have not expired, and then apply the filtering passed in the query. This reduced the scan on the underlying engine, and P99 latency dropped to 10ms.

Figure 14: Serving latency drops from 150ms to 10ms after the optimization

Conclusion

Powering machine learning models with near real-time features can be quite challenging, due to computation logic complexity, write throughput, serving SLA, etc. In this blog, we introduced some of the problems that we faced and our solutions to them, in the hope of aiding our peers in similar use cases.

If you are interested in joining the Marketplace Intelligence team, tackling challenges for large scale streaming (e.g. Flink, Apache Samza), OLAP (e.g. Elasticsearch®) and ML (e.g. ETD prediction), please apply to join our team!

Apache®, Apache Cassandra®, Apache Flink, Apache Kafka, Apache Pinot, Apache Samza, Cassandra®, Flink, Samza, and Kafka are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks. Elasticsearch is a registered trademark of Elasticsearch BV.

