
High-throughput stream processing in Rust

Source: https://noz.ai/hash-pipeline/

After the last post, I hope you’re excited to dive into a new blockchain architecture.

Let’s begin with the basics: stream processing. We’re going to talk about what this is, the channel libraries you might use, and tuning performance using idle and blocked measurements.

This post won’t be specific to blockchains. Regardless of how you arrived, I hope you’ll still find it interesting.

How to understand stream processing [0]

Streams

Most developers are familiar with the concept of streams from Unix pipes:

ls | grep myfile

It doesn’t matter how many results there are from ls. grep will handily process them line by line.

Streams don’t need to end. You can grep a log file while it’s still being appended:

tail -f log.txt | grep abc

This is stream processing.

Stream processing: consuming messages from a continuous flow of data and producing outputs in real time.
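
To make this concrete in Rust, here's a minimal sketch (my illustration, not code from the post) that consumes stdin as a stream and filters it line by line, much like grep:

use std::io::{self, BufRead};

fn main() {
    // Consume stdin as an unbounded stream, handling each line as it arrives.
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let line = line.expect("failed to read line");
        if line.contains("abc") {
            println!("{}", line);
        }
    }
}

Piping `tail -f log.txt` into this program behaves like the grep example above: it never needs to see the whole input to make progress.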

At tech companies of all sizes, you’ll find streams used for analytics, events, and more [1]. These are generally distributed systems.

However, it’s less common to find an application or server whose code is solely architected around processing a single high-volume data stream. The application domain is more varied. Instead, you’ll see event buses and Rx mixed with background tasks, async/await, state mutexes, direct calls to other components, dedicated threads, and other architectures. This is true even for high-performance code like web servers and game engines [4].

So from that perspective, it makes sense that the original Bitcoin node, being like a server, would not be based around streams. Famously, it uses a lock called cs_main [5] that protects the global state:

[Image: the cs_main lock in the Bitcoin Core source]

Nevertheless, there are domains that do use streams heavily on a single machine, including video processing and high-frequency trading. These are some of the most data-intensive applications I know, and they use hardware resources to their fullest. It’s here that we find inspiration for our new blockchain architecture, because what really is a blockchain but a stream of transactions and metadata requiring processing? (a topic for another post)

You can rent AWS instances today with 100+ CPU cores, 100s of GBs of memory, multiple GPUs, and 100 Gbps bandwidth. There’s no need for a distributed system for a node, yet [6].

So let’s jump into stream processing in Rust.

The example

A song to set the mood

Let’s write a program to calculate the SHA512 and BLAKE3 [7] hashes of 1 billion numbers. This example is a little contrived, so you may imagine these numbers represent transactions, analytics events, or price signals. The hashing represents any transformation on those inputs.

Here is the single-threaded solution:

use sha2::{Digest, Sha512};
use std::time::Instant;

const N: usize = 1_000_000_000;

fn main() {
    let start = Instant::now();
    for i in 0..N {
        let preimage = (i as u64).to_le_bytes();
        Sha512::digest(&preimage);
        blake3::hash(&preimage);
    }
    println!("{:?}", start.elapsed());
}
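
If you want to follow along, the program only needs the sha2 and blake3 crates. A minimal Cargo.toml might look like this (the version numbers are my assumption, not from the post):

[dependencies]
sha2 = "0.10"   # provides Sha512 and the Digest trait
blake3 = "1"    # provides blake3::hash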

When I run this in release mode on a Digital Ocean instance with a dedicated CPU and 16 cores (not that the core count matters yet), it takes just over 6 minutes.

367.215521566s

Channels

Now let’s rewrite this to use stream processing. Instead of performing the hashing in a single loop, we’re going to set up a pipeline of threads that perform the hashing in parallel and collect the results. In theory, we should expect a speed-up.

Local streams that send data between two threads are called channels. You can read more about them in the Rust documentation, or watch the excellent Crust of Rust episode on the topic.

Our new program will spawn four threads. A generator thread will produce numbers and send them to two different hashing threads simultaneously. The hashing threads will read these numbers, hash them separately, and then send their outputs to a results thread. Here’s what it will look like:

[Figure: the four-thread pipeline: a generator feeding the SHA512 and BLAKE3 hashing threads, which both feed the result thread]

We’ll use the standard library’s mpsc channels to send and receive data. mpsc stands for multi-producer, single-consumer: you can send data into the channel from multiple threads, but there is only ever one pipe out. We won’t be using the multi-producer feature here, but it’s important to know about.
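
To see the multi-producer part in isolation, here’s a minimal sketch (my illustration): the sender half can be cloned, while the single receiver collects everything.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel(10);
    let tx2 = tx.clone();

    // Two producer threads send into the same channel...
    thread::spawn(move || tx.send("from thread 1").unwrap());
    thread::spawn(move || tx2.send("from thread 2").unwrap());

    // ...but there is only ever one consumer pulling data out.
    for _ in 0..2 {
        println!("{}", rx.recv().unwrap());
    }
}

Our pipeline only needs a single producer per channel, though.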

It’s still a rather simple program:

use sha2::{Digest, Sha512};
use std::sync::mpsc;
use std::thread;
use std::time::Instant;

const N: usize = 1_000_000_000;

fn main() {
    let start = Instant::now();

    let (generator_to_sha512_tx, generator_to_sha512_rx) = mpsc::sync_channel(1_000_000);
    let (generator_to_blake3_tx, generator_to_blake3_rx) = mpsc::sync_channel(1_000_000);
    let (sha512_to_result_tx, sha512_to_result_rx) = mpsc::sync_channel(1_000_000);
    let (blake3_to_result_tx, blake3_to_result_rx) = mpsc::sync_channel(1_000_000);

    // Generator
    thread::spawn(move || {
        for i in 0..N {
            let preimage = (i as u64).to_le_bytes();
            generator_to_sha512_tx.send(preimage.clone()).unwrap();
            generator_to_blake3_tx.send(preimage).unwrap();
        }
    });

    // Sha512
    thread::spawn(move || {
        while let Ok(preimage) = generator_to_sha512_rx.recv() {
            let hash = Sha512::digest(&preimage);
            sha512_to_result_tx.send(hash).ok();
        }
    });

    // Blake3
    thread::spawn(move || {
        while let Ok(preimage) = generator_to_blake3_rx.recv() {
            let hash = blake3::hash(&preimage);
            blake3_to_result_tx.send(hash).ok();
        }
    });

    // Result
    let result_thread = thread::spawn(move || {
        for _ in 0..N {
            sha512_to_result_rx.recv().unwrap();
            blake3_to_result_rx.recv().unwrap();
        }
    });

    result_thread.join().unwrap();

    println!("{:?}", start.elapsed());
}

Let’s take it for a spin:

786.193241554s

Uh oh. The new version with channels took twice as long. What’s going on?

Circular buffers

You could run this through a flamegraph [8], but let me save you the time.

All channel libraries have overhead, however small, and the benefits of parallelization have to outweigh that cost. The bottleneck in this case is the channel send() and recv() methods. The standard library’s mpsc channels are relatively slow, and there are faster alternatives such as crossbeam-channel.
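
For instance, crossbeam-channel’s bounded channels are nearly a drop-in replacement for mpsc::sync_channel. A minimal sketch (my illustration using the crossbeam-channel crate, not code from the post):

use crossbeam_channel::bounded;
use std::thread;

fn main() {
    // bounded(capacity) behaves like mpsc::sync_channel(capacity).
    let (tx, rx) = bounded(1_000_000);

    thread::spawn(move || {
        for i in 0..1000u64 {
            tx.send(i.to_le_bytes()).unwrap();
        }
    });

    // recv() returns Err once the sender is dropped, ending the loop.
    while let Ok(preimage) = rx.recv() {
        blake3::hash(&preimage);
    }
}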

We profiled 4 different channel libraries for the example above. These were the results:

[Figure: benchmark results for the four channel libraries]

ringbuf and rtrb are clearly the fastest. Why? They’re both lock-free ring buffers that act as single-producer, single-consumer queues: there is only one pipe into the queue and one pipe out. This has less overhead than a multi-producer queue.

These libraries are non-blocking, too. When the queue is full, an attempted push returns an error rather than blocking, and the same is true when popping from an empty queue. To use these ring buffer libraries, then, I added spin loops that keep retrying whenever a channel would otherwise block. As it turns out, this is also what high-frequency trading architectures do [9].
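
To illustrate the API, here is a minimal sketch (my example, not the post’s) of rtrb’s non-blocking push and pop:

fn main() {
    // Split a bounded ring buffer into its producer and consumer halves.
    let (mut producer, mut consumer) = rtrb::RingBuffer::new(2);

    producer.push(1).unwrap();
    producer.push(2).unwrap();

    // The queue is full: push fails immediately, handing the value back.
    assert!(producer.push(3).is_err());

    assert_eq!(consumer.pop().unwrap(), 1);
    assert_eq!(consumer.pop().unwrap(), 2);

    // The queue is empty: pop fails immediately instead of blocking.
    assert!(consumer.pop().is_err());
}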

I also found that adding very short sleeps while waiting improved overall performance. This is likely due to CPU throttling, which kicks in when core usage sits at 100% or temperatures climb too high.

Here are the new pop() and push(value) helpers that retry:

fn push<T>(tx: &mut rtrb::Producer<T>, mut value: T) {
    loop {
        match tx.push(value) {
            Ok(_) => break,
            Err(rtrb::PushError::Full(v)) => value = v,
        }
        thread::sleep(Duration::from_millis(1));
    }
}

fn pop<T>(rx: &mut rtrb::Consumer<T>) -> T {
    loop {
        if let Ok(value) = rx.pop() {
            return value;
        }
        thread::sleep(Duration::from_millis(1));
    }
}

Our new performance is:

306.509178724s

We’re officially faster than before, but not by much.

Now let’s kick parallelization up another gear.

More parallelization

We currently create two threads for hashing, one for SHA512 and the other for BLAKE3. Whichever is the slower of the two will be our bottleneck. To prove this, I re-ran the original single-threaded example with only SHA512 hashing. The result was:

282.74808151s

This is pretty close to the performance of the parallelized example. It means that most of the overall hashing time is spent on SHA512.

So what if we created more threads and hashed multiple numbers at the same time? Let’s try it. We’ll create 2 SHA512 hash threads and 2 BLAKE3 hash threads to start.

To visualize:

[Figure: the pipeline with two SHA512 threads and two BLAKE3 threads, each with its own input and output queues]

Each thread will have its own input and output queues. We’ll send generated numbers to the threads in round-robin order and read the results out in the same order. This guarantees that the order of the stream is preserved in the results thread. If ordering is less important, or if message processing times are more variable, other dispatching mechanisms may be better.

Here’s what round-robin dispatch looks like:

let mut blake3_channel = 0;
for i in 0..N {
    // ...
    push(&mut generator_to_blake3_tx[blake3_channel], preimage);
    blake3_channel = (blake3_channel + 1) % NUM_BLAKE3_HASHERS;
}

The new code is more complex but you’ll get it:

use sha2::{Digest, Sha512};
use std::thread;
use std::time::{Duration, Instant};

const N: usize = 1_000_000_000;

const NUM_SHA512_HASHERS: usize = 2;
const NUM_BLAKE3_HASHERS: usize = 2;

fn main() {
    let start = Instant::now();

    let (mut generator_to_sha512_tx, mut generator_to_sha512_rx) =
        ring_buffers(NUM_SHA512_HASHERS, 1_000_000);
    let (mut generator_to_blake3_tx, mut generator_to_blake3_rx) =
        ring_buffers(NUM_BLAKE3_HASHERS, 1_000_000);
    let (mut sha512_to_result_tx, mut sha512_to_result_rx) =
        ring_buffers(NUM_SHA512_HASHERS, 1_000_000);
    let (mut blake3_to_result_tx, mut blake3_to_result_rx) =
        ring_buffers(NUM_BLAKE3_HASHERS, 1_000_000);

    // Generator
    thread::spawn(move || {
        let mut sha512_channel = 0;
        let mut blake3_channel = 0;
        for i in 0..N {
            let preimage = (i as u64).to_le_bytes();
            push(
                &mut generator_to_sha512_tx[sha512_channel],
                preimage.clone(),
            );
            push(&mut generator_to_blake3_tx[blake3_channel], preimage);
            sha512_channel = (sha512_channel + 1) % NUM_SHA512_HASHERS;
            blake3_channel = (blake3_channel + 1) % NUM_BLAKE3_HASHERS;
        }
    });

    // Sha512
    for _ in 0..NUM_SHA512_HASHERS {
        let mut rx = generator_to_sha512_rx.remove(0);
        let mut tx = sha512_to_result_tx.remove(0);
        thread::spawn(move || loop {
            let preimage = pop(&mut rx);
            let hash = Sha512::digest(&preimage);
            push(&mut tx, hash);
        });
    }

    // Blake3
    for _ in 0..NUM_BLAKE3_HASHERS {
        let mut rx = generator_to_blake3_rx.remove(0);
        let mut tx = blake3_to_result_tx.remove(0);
        thread::spawn(move || loop {
            let preimage = pop(&mut rx);
            let hash = blake3::hash(&preimage);
            push(&mut tx, hash);
        });
    }

    // Result
    let result_thread = thread::spawn(move || {
        let mut sha512_channel = 0;
        let mut blake3_channel = 0;
        for _ in 0..N {
            pop(&mut sha512_to_result_rx[sha512_channel]);
            pop(&mut blake3_to_result_rx[blake3_channel]);
            sha512_channel = (sha512_channel + 1) % NUM_SHA512_HASHERS;
            blake3_channel = (blake3_channel + 1) % NUM_BLAKE3_HASHERS;
        }
    });

    result_thread.join().unwrap();

    println!("{:?}", start.elapsed());
}

fn ring_buffers<T>(
    count: usize,
    capacity: usize,
) -> (Vec<rtrb::Producer<T>>, Vec<rtrb::Consumer<T>>) {
    (0..count).map(|_| rtrb::RingBuffer::new(capacity)).unzip()
}

fn push<T>(tx: &mut rtrb::Producer<T>, mut value: T) {
    loop {
        match tx.push(value) {
            Ok(_) => break,
            Err(rtrb::PushError::Full(v)) => value = v,
        }
        thread::sleep(Duration::from_millis(1));
    }
}

fn pop<T>(rx: &mut rtrb::Consumer<T>) -> T {
    loop {
        if let Ok(value) = rx.pop() {
            return value;
        }
        thread::sleep(Duration::from_millis(1));
    }
}

How’s the performance now?

156.493931684s

Getting better.

Idle and blocked time

How many threads should we have for each hash function? In a more complex system, this can be quite difficult to determine, and you may even require it to be dynamic.

A technique I’ve found helpful in streams is to measure time spent idle and blocked over some time window.

Idle time: time spent waiting on an empty queue to receive a message.
Blocked time: time spent waiting on a full queue to send an output.

Idle time is when we are spinning during a pop(), and blocked time is when we are spinning during a push(). I’ve modified these two functions to track how much time is spent waiting. This code uses atomics, which have little overhead.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn push<T>(tx: &mut rtrb::Producer<T>, mut value: T, blocked: &Arc<AtomicU64>) {
    loop {
        match tx.push(value) {
            Ok(_) => break,
            Err(rtrb::PushError::Full(v)) => value = v,
        }
        // Record how long we sleep while the queue is full.
        let start = Instant::now();
        thread::sleep(Duration::from_millis(10));
        blocked.fetch_add(start.elapsed().as_millis() as u64, Ordering::Relaxed);
    }
}
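
The post only shows push; the matching pop presumably tracks idle time the same way. Here is my reconstruction, following the same pattern:

fn pop<T>(rx: &mut rtrb::Consumer<T>, idle: &Arc<AtomicU64>) -> T {
    loop {
        if let Ok(value) = rx.pop() {
            return value;
        }
        // Record how long we sleep while the queue is empty.
        let start = Instant::now();
        thread::sleep(Duration::from_millis(10));
        idle.fetch_add(start.elapsed().as_millis() as u64, Ordering::Relaxed);
    }
}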

I’ve also created a new stats thread to print out these timings. Let’s see the results:

generator: %idle=0 %blocked=76
sha512_0: %idle=0 %blocked=0
sha512_1: %idle=0 %blocked=0
blake3_0: %idle=21 %blocked=26
blake3_1: %idle=21 %blocked=26
result: %idle=89 %blocked=0

We see that the sha512 threads are neither idle nor blocked. That means they are active 100% of the time, and we can speed up the system by increasing the number of SHA512 threads.
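
For reference, a stats thread along these lines might look like the sketch below. This is my reconstruction, not the post’s code: it assumes each worker shares its name and its idle/blocked counters, and it reuses the thread, Duration, Arc, AtomicU64, and Ordering imports from the snippets above.

// Hypothetical sketch: print each worker's idle/blocked percentages per window.
fn spawn_stats_thread(counters: Vec<(String, Arc<AtomicU64>, Arc<AtomicU64>)>) {
    thread::spawn(move || loop {
        const WINDOW_MS: u64 = 10_000;
        thread::sleep(Duration::from_millis(WINDOW_MS));
        for (name, idle, blocked) in &counters {
            // Reset the counters each window and report percentages.
            let idle_ms = idle.swap(0, Ordering::Relaxed);
            let blocked_ms = blocked.swap(0, Ordering::Relaxed);
            println!(
                "{}: %idle={} %blocked={}",
                name,
                idle_ms * 100 / WINDOW_MS,
                blocked_ms * 100 / WINDOW_MS
            );
        }
    });
}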

Note: An observer effect, akin to the Heisenberg uncertainty principle, may arise when the act of measuring a system changes its performance. If you run into this, check out the coarsetime library. Oftentimes, approximate timing measurements are good enough.

On our Digital Ocean instance, with a little trial and error, I found the optimal counts to be 8 SHA512 threads and 4 BLAKE3 threads.

58.277055839s

Less than 1/6th of our original time.

Conclusion

We covered what streams are, libraries to use them in Rust, a round-robin pattern for parallelization, and how to measure stream timing.

There are many other details we didn’t discuss. In a real system, you should consider pinning threads to CPU cores, or perhaps using a form of green threads, to reduce context switches. Also, when processing a stream you’ll often need to allocate memory for the results. Memory allocations are expensive, so in a future post we’ll go over some strategies for that.
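
As an example of pinning, the core_affinity crate (my suggestion; the post doesn’t name a library) can bind each worker thread to its own core:

use core_affinity::CoreId;
use std::thread;

fn main() {
    // Enumerate the available cores and pin one worker thread to each.
    let core_ids = core_affinity::get_core_ids().expect("failed to get core IDs");
    let handles: Vec<_> = core_ids
        .into_iter()
        .map(|core_id: CoreId| {
            thread::spawn(move || {
                // Pin the current thread; its work now stays on this core.
                core_affinity::set_for_current(core_id);
                // ... run a pipeline stage here ...
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}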

The repository for this project is here: https://github.com/nozcat/hash-pipeline

You can follow me on Twitter @overclockednoz.

Notes

0. Some folks may prefer I Love Lucy’s Chocolate Factory scene for understanding.

1. See where Kafka is used.

2. See AWS’s What is a modern data streaming architecture? and A16z’s Emerging Architectures for Modern Data Infrastructure.

3. The C10K problem is now the C10M problem. Modern servers like nginx and node abstract away concurrency complexity. The SEDA architecture has an interesting history.

4. Multi-threading in game engines. UE5 has the same architecture as UE4.

5. Read more about Bitcoin’s cs_main. It’s a known scaling bottleneck.

6. BSV’s Teranode and Koinos’s node both use microservices.

7. Blake3 is pretty cool.

8. Flamegraphs are one of the most useful profiling tools to learn.

9. Also see Cache invalidation really is one of the hardest problems in computer science.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK