5

Understanding Rust futures by going way too deep

 2 years ago
source link: https://fasterthanli.me/articles/understanding-rust-futures-by-going-way-too-deep
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Understanding Rust futures by going way too deep

Jul 25, 2021 ยท 107 minute read ยท rust ยท async

So! Rust futures! Easy peasy lemon squeezy. Until it's not. So let's do the easy thing, and then instead of waiting for the hard thing to sneak up on us, we'll go for it intentionally.

Cool bear's hot tip

That's all-around solid life advice.

Choo choo here comes the easy part ๐Ÿš‚๐Ÿ’จ

We make a new project:

$ cargo new waytoodeep Created binary (application) `waytoodeep` package

We install cargo-edit in case we don't have it yet, so we can just cargo add later:

$ cargo install cargo-edit Updating crates.io index Downloaded cargo-edit v0.7.0 Downloaded 1 crate (57.6 KB) in 0.47s Ignored package `cargo-edit v0.7.0` is already installed, use --force to override

..but we already have it.

Yeah, because it's really convenient. Readers just get confused because subcommands like cargo new, cargo build, cargo test, cargo run etc. are built into cargo, but cargo add isn't.

Ah right! In fact I see there's a bunch of these, like cargo-hack, cargo-udeps, cargo-expand... the list goes on.

Then we pick an async runtime, because those futures won't poll themselves... and we'll pick tokio for no reason other than: that's what I've been using a bunch these past few months.

$ cargo add [email protected] --features full Updating 'https://github.com/rust-lang/crates.io-index' index Adding tokio v1.9.0 to dependencies with features: ["full"]

Then we change up our main so it uses a default tokio executor (cargo new generated one for us, but it's not adequate here):

// in `src/main.rs` #[tokio::main] async fn main() { println!("Hello from a (so far completely unnecessary) async runtime"); }

$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.01s Running `target/debug/waytoodeep` Hello from a (so far completely unnecessary) async runtime

Cool!

But let's add some other nice things I just like to have in my projects.

First, for error handling - we're writing an app, we're going to get a bunch of different types from different libraries, it'd be neat if we could have one type to unify them all.

eyre gives us that (just like anyhow)!

And since I like pretty colors I'll use color-eyre

$ cargo add [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding color-eyre v0.5.11 to dependencies

Now we need to install color-eyre as the default panic handler, and I snuck in some environment variable modification so we get backtraces by default.

use color_eyre::Report; #[tokio::main] async fn main() -> Result<(), Report> { setup()?; println!("Hello from a (so far completely unnecessary) async runtime"); Ok(()) } fn setup() -> Result<(), Report> { if std::env::var("RUST_LIB_BACKTRACE").is_err() { std::env::set_var("RUST_LIB_BACKTRACE", "1") } color_eyre::install()?; Ok(()) }

$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.02s Running `target/debug/waytoodeep` Hello from a (so far completely unnecessary) async runtime

Okay good! Now if we have an error from somewhere, we'll see the full stack trace, like so:

color-eyre.59b0b0e3bc02bbd3.jpg

And finally, because I like my logs to be structured, let's add tracing and to print them with nice colors in the terminal, let's add tracing-subscriber.

$ cargo add [email protected] [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding tracing v0.1.26 to dependencies Adding tracing-subscriber v0.2.19 to dependencies

We already have a setup function so we'll just install tracing-subscriber in there.. and we'll change that println! to an info!. Also, again, some environment variable manipulation so that if nothing is set, we default to the info log level for all crates.

use color_eyre::Report; use tracing::info; use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Hello from a comfy nest we've made for ourselves"); Ok(()) } fn setup() -> Result<(), Report> { if std::env::var("RUST_LIB_BACKTRACE").is_err() { std::env::set_var("RUST_LIB_BACKTRACE", "1") } color_eyre::install()?; if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "info") } tracing_subscriber::fmt::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); Ok(()) }

$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.02s Running `target/debug/waytoodeep` Jul 25 17:03:46.993 INFO waytoodeep: Hello from a comfy nest we've made for ourselves

Alright, we're ready to do something useful!

Doing something useful

When deciding which article to read during their coffee break, people usually open several websites at the exact same moment, and read whichever article loads first.

And that's a fact. You can quote me on that because, well, who's going to go and verify that? That sounds like a lot of work. Just trust me on this.

So let's write a program that does exactly that.

Oh boy, we're gonna need more crates aren't we.

You guessed it! Let's bring in reqwest - although I don't love its API, it'll work nicely with the rest of our stack here.

Also we'll tell reqwest to use rustls because screw OpenSSL, that's why.

$ cargo add [email protected] --no-default-features --features rustls-tls Updating 'https://github.com/rust-lang/crates.io-index' index Adding reqwest v0.11.4 to dependencies with features: ["rustls-tls"]

We're ready to make a request!

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Hello from a comfy nest we've made for ourselves"); let client = Client::new(); let url = "https://fasterthanli.me"; // this will turn non-200 HTTP status codes into rust errors, // so the first `?` propagates "we had a connection problem" and // the second `?` propagates "we had a chat with the server and they // were not pleased" let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) }

And off we go!

$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 3.05s Running `target/debug/waytoodeep` Jul 25 17:12:32.276 INFO waytoodeep: Hello from a comfy nest we've made for ourselves Jul 25 17:12:32.409 INFO waytoodeep: Got a response! url=https://fasterthanli.me content_type=Some("text/html; charset=utf-8")

And this is what I mean by structured logging. Well, part of it anyway. In that line here:

info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");

We have a message, Got a response!, then a tag named url whose value is the Display-formatting of the binding named url, and a tag named content_type, whose value is the Debug-formatting of the expression res.headers().get("content-type").

Easy peasy! name = %value for Display, name = ?value, for Debug, and if both name and value have the same... name, we can use the short forms %value and ?value.

Of course there's also spans, which are great, and to me the whole point of this is you can then send them to APM platforms like Datadog or Honeycomb or whoever, but this isn't an article about tracing.

Just to illustrate though, if we install a JSON tracing subscriber instead, this is what we get:

$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 3.09s Running `target/debug/waytoodeep` {"timestamp":"Jul 25 17:17:21.531","level":"INFO","fields":{"message":"Hello from a comfy nest we've made for ourselves"},"target":"waytoodeep"} {"timestamp":"Jul 25 17:17:21.709","level":"INFO","fields":{"message":"Got a response!","url":"https://fasterthanli.me","content_type":"Some(\"text/html; charset=utf-8\")"},"target":"waytoodeep"}

Which should be enough to pique your interest.

To peak your interest?

Either. Both. Both is good.

Fetching two things

Okay, now let's fetch two things!

These two things:

pub const URL_1: &str = "https://fasterthanli.me/articles/whats-in-the-box"; pub const URL_2: &str = "https://fasterthanli.me/series/advent-of-code-2020/part-13";

...so that it's a fair comparison. Both these articles are hosted on my own website, and it's definitely not a marketing scheme, instead it's so that the fetch time is comparable and there's a chance one will finish fetching before the other (and that will change randomly over time).

Uh-huh, sure. If that's what you need to tell yourself.

We'll make a quick function to fetch a thing:

async fn fetch_thing(client: &Client, url: &str) -> Result<(), Report> { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) }

And use it:

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Hello from a comfy nest we've made for ourselves"); let client = Client::new(); fetch_thing(&client, URL_1); fetch_thing(&client, URL_2); Ok(()) }

And then run it:

$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) warning: unused implementer of `Future` that must be used --> src/main.rs:15:5 | 15 | fetch_thing(&client, URL_1); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_must_use)]` on by default = note: futures do nothing unless you `.await` or poll them warning: unused implementer of `Future` that must be used --> src/main.rs:16:5 | 16 | fetch_thing(&client, URL_2); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: futures do nothing unless you `.await` or poll them warning: 2 warnings emitted Finished dev [unoptimized + debuginfo] target(s) in 3.01s Running `target/debug/waytoodeep` Jul 25 17:26:31.571 INFO waytoodeep: Hello from a comfy nest we've made for ourselves

Huh, weird, nothing happened.

long sigh amos ffs you ignored the yellow squigglies and the very noisy Rust warnings about those futures not being polled just to prove a point, I get it I get it now go fix it.

Okay yeesh sure I'll fix it.

fetch_thing(&client, URL_1).await?; fetch_thing(&client, URL_2).await?;
$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 3.17s Running `target/debug/waytoodeep` Jul 25 17:27:29.768 INFO waytoodeep: Hello from a comfy nest we've made for ourselves Jul 25 17:27:29.891 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 17:27:29.974 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

So, uh yeah, lesson zero: futures don't do anything unless they're polled.

And that's because futures are just state, pretty much! In fact, let's make one:

// in `src/main.rs` mod dumb;

// in `src/dumb.rs` use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tracing::info; pub struct DumbFuture {} impl Future for DumbFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { info!("Hello from a dumb future!"); Poll::Ready(()) } }

// back in `src/main.rs` #[tokio::main] async fn main() -> Result<(), Report> { setup()?; let fut = dumb::DumbFuture {}; Ok(()) }

There. That's pretty much what we did when we didn't .await.

Running it does nothing other than print warnings:

$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) warning: unused variable: `fut` --> src/main.rs:14:9 | 14 | let fut = dumb::DumbFuture {}; | ^^^ help: if this is intentional, prefix it with an underscore: `_fut` | = note: `#[warn(unused_variables)]` on by default warning: 1 warning emitted Finished dev [unoptimized + debuginfo] target(s) in 2.11s Running `target/debug/waytoodeep`

Because how could it? We're literally just building a struct. A zero-sized struct at that!

If we .await it though... then we're asking the runtime to run its event loop until such time as the future is polled and it finally returns Poll::Ready, which ours does immediately:

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Building that dumb future..."); let fut = dumb::DumbFuture {}; info!("Awaiting that dumb future..."); fut.await; info!("Done awaiting that dumb future"); Ok(()) }

$ cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 2.34s Running `target/debug/waytoodeep` Jul 25 17:37:09.261 INFO waytoodeep: Building that dumb future... Jul 25 17:37:09.261 INFO waytoodeep: Awaiting that dumb future... Jul 25 17:37:09.261 INFO waytoodeep::dumb: Hello from a dumb future! Jul 25 17:37:09.262 INFO waytoodeep: Done awaiting that dumb future

And that's a bit different from, say, ECMAScript promises, which can do some amount of work even if they're not awaited at all.

But nope, Rust futures are just dumb boring state machines, and you can see the machinery if you cause trouble on purpose:

// in `src/dumb.rs` impl Future for DumbFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { panic!("Oh heck no"); } }

$ RUST_BACKTRACE=1 cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 2.28s Running `target/debug/waytoodeep` Jul 25 17:41:18.956 INFO waytoodeep: Building that dumb future... Jul 25 17:41:18.956 INFO waytoodeep: Awaiting that dumb future... The application panicked (crashed). Message: Oh heck no Location: src/dumb.rs:14 โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” BACKTRACE โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” โ‹ฎ 6 frames hidden โ‹ฎ 7: <waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll::h4a44780628f4c5f0 at /home/amos/ftl/waytoodeep/src/dumb.rs:14 8: waytoodeep::main::{{closure}}::h36de5a1f1f2a5c5b at /home/amos/ftl/waytoodeep/src/main.rs:17 9: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll::h20a96e082c7a581e at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80 10: tokio::park::thread::CachedParkThread::block_on::{{closure}}::hdf98cb3c7fdf3de4 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263 11: tokio::coop::with_budget::{{closure}}::h6a86a24a246e220f at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:106 12: std::thread::local::LocalKey<T>::try_with::h2ce0ac27c85965b6 at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:376 13: std::thread::local::LocalKey<T>::with::hc449f38c9f65fb53 at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:352 14: tokio::coop::with_budget::h5db157bd1e95e0e8 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:99 15: tokio::coop::budget::h7b57383f1255ac24 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:76 16: tokio::park::thread::CachedParkThread::block_on::hece399485213b91c at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263 17: tokio::runtime::enter::Enter::block_on::h89e9882e539e82d3 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/enter.rs:151 18: tokio::runtime::thread_pool::ThreadPool::block_on::h1a0186470c00ba70 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/thread_pool/mod.rs:71 19: tokio::runtime::Runtime::block_on::h7c21d6989b86d606 at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/mod.rs:452 20: waytoodeep::main::hb4dd5ffd46a5c032 at /home/amos/ftl/waytoodeep/src/main.rs:20 21: core::ops::function::FnOnce::call_once::hc1fcc87431f77d25 at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227 โ‹ฎ 11 frames hidden โ‹ฎ Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering. Run with RUST_BACKTRACE=full to include source snippets.

This is much nicer with colors so I hope you're following along at home, but we can see our actual main function at frame 20, then going up, we can see Runtime::block_on, a thread pool thingy, some parked threads, thread-local stuff (the other TLS), a generated future (frame 9 and 8, which is basically what our async fn main ended up being), and finally our DumbFuture's poll method (frame 7).

Frames 6 through 1 are just panic machinery, again wholly out of scope for this article.

But please step up, dear spectator, and move your arms around the contraption to make sure that there's not trickery going on, no hidden wires, no..

What in the world are you going on about

...there's no "special handling" for async stacktraces is what I'm saying. Here, sure, we're panicking, that's a Rust-only thing, the OS never even knows we nearly avoided a catastrophe.

But we can make a much bigger mess, if we're willing to use unsafe:

impl Future for DumbFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { unsafe { *(0xF00D as *mut u64) = 0x0; } unreachable!(); // pinky promise } }

And then no amount of panic handling will save us:

$ RUST_BACKTRACE=1 cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 2.18s Running `target/debug/waytoodeep` Jul 25 17:46:53.926 INFO waytoodeep: Building that dumb future... Jul 25 17:46:53.926 INFO waytoodeep: Awaiting that dumb future... zsh: segmentation fault (core dumped) RUST_BACKTRACE=1 cargo run

However, GDB can!

$ cargo build && gdb --quiet --args ./target/debug/waytoodeep Finished dev [unoptimized + debuginfo] target(s) in 0.04s Reading symbols from ./target/debug/waytoodeep... warning: Missing auto-load script at offset 0 in section .debug_gdb_scripts of file /home/amos/ftl/waytoodeep/target/debug/waytoodeep. Use `info auto-load python-scripts [REGEXP]' to list them. (gdb) r Starting program: /home/amos/ftl/waytoodeep/target/debug/waytoodeep [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". [New Thread 0x7ffff7c28700 (LWP 129418)] [New Thread 0x7ffff7a27700 (LWP 129419)] [New Thread 0x7ffff7826700 (LWP 129420)] [New Thread 0x7ffff7625700 (LWP 129421)] [New Thread 0x7ffff7424700 (LWP 129422)] [New Thread 0x7ffff7223700 (LWP 129423)] [New Thread 0x7ffff7022700 (LWP 129424)] [New Thread 0x7ffff6e1e700 (LWP 129425)] [New Thread 0x7ffff6c1a700 (LWP 129426)] [New Thread 0x7ffff6a16700 (LWP 129427)] [New Thread 0x7ffff6812700 (LWP 129428)] [New Thread 0x7ffff660e700 (LWP 129429)] [New Thread 0x7ffff640a700 (LWP 129430)] [New Thread 0x7ffff6206700 (LWP 129431)] [New Thread 0x7ffff6002700 (LWP 129432)] Jul 25 17:47:13.278 INFO waytoodeep: Building that dumb future... Jul 25 17:47:13.279 INFO waytoodeep: Awaiting that dumb future... Thread 1 "waytoodeep" received signal SIGSEGV, Segmentation fault. <waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll (self=..., _cx=0x7fffffffd690) at src/dumb.rs:15 15 *(0xF00D as *mut u64) = 0x0; (gdb) bt #0 <waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll (self=..., _cx=0x7fffffffd690) at src/dumb.rs:15 #1 0x00005555555ab3a3 in waytoodeep::main::{{closure}} () at src/main.rs:17 #2 0x00005555555adb29 in <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (self=..., cx=0x7fffffffd690) at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80 #3 0x00005555555adaa0 in tokio::park::thread::CachedParkThread::block_on::{{closure}} () at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263 #4 0x00005555555b1742 in tokio::coop::with_budget::{{closure}} (cell=0x7ffff7c2c412) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:106 #5 0x00005555555a9f58 in std::thread::local::LocalKey<T>::try_with (self=0x555555925fc0, f=...) at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:376 #6 0x00005555555a9e3d in std::thread::local::LocalKey<T>::with (self=0x555555925fc0, f=...) at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:352 #7 0x00005555555ad7c8 in tokio::coop::with_budget (budget=..., f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:99 #8 tokio::coop::budget (f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:76 #9 tokio::park::thread::CachedParkThread::block_on (self=0x7fffffffd7a0, f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263 #10 0x00005555555abcc9 in tokio::runtime::enter::Enter::block_on (self=0x7fffffffd7f0, f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/enter.rs:151 #11 0x00005555555acf2e in tokio::runtime::thread_pool::ThreadPool::block_on (self=0x7fffffffd908, future=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/thread_pool/mod.rs:71 #12 0x00005555555b0dfd in tokio::runtime::Runtime::block_on (self=0x7fffffffd900, future=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/mod.rs:452 #13 0x00005555555aa807 in waytoodeep::main () at src/main.rs:20 (gdb)

Again, you're missing out on the pretty colors, here's a peek:

Which is just wonderful.

But back to actually useful code, let's remove all traces of our dumb future for now (ie. the mod dumb and the src/dumb.rs file), and do it with a fetch future instead:

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Building that fetch future..."); let client = Client::new(); let fut = fetch_thing(&client, URL_1); info!("Awaiting that fetch future..."); fut.await?; info!("Done awaiting that fetch future"); Ok(()) }

$ RUST_BACKTRACE=1 cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 2.99s Running `target/debug/waytoodeep` Jul 25 17:51:49.281 INFO waytoodeep: Building that fetch future... Jul 25 17:51:49.282 INFO waytoodeep: Awaiting that fetch future... Jul 25 17:51:49.437 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 17:51:49.438 INFO waytoodeep: Done awaiting that fetch future

Okay! Same idea.

There's two ways to think about our function here. There's the syntactic-sugar-coated way: that it's an "async fn":

async fn fetch_thing(client: &Client, url: &str) -> Result<(), Report> { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) }

And then there's the crunchy-core way: that it's a regular "fn" that just happens to return a future:

use std::future::Future; fn fetch_thing<'a>( client: &'a Client, url: &'a str, ) -> impl Future<Output = Result<(), Report>> + 'a { async move { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) } }

And since it borrows from both client and url, the Future can only live as long as they both do, which is why I've named a lifetime 'a above, and the return type is "something that implements Future (with that Output) and also lives for 'a".

And the whole async move {} block is also just "building state" - it evaluates to a type that implements Future.

We just can't name it.

We can get a description of it, at best:

fn type_name_of<T>(_: &T) -> &'static str { std::any::type_name::<T>() } // in main #[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Building that fetch future..."); let client = Client::new(); let fut = fetch_thing(&client, URL_1); info!( type_name = type_name_of(&fut), "That fetch future has a type.." ); info!("Awaiting that fetch future..."); fut.await?; info!("Done awaiting that fetch future"); Ok(()) }

$ cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.05s Running `target/debug/waytoodeep` Jul 25 18:00:39.774 INFO waytoodeep: Building that fetch future... Jul 25 18:00:39.775 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>" Jul 25 18:00:39.775 INFO waytoodeep: Awaiting that fetch future... Jul 25 18:00:39.882 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:00:39.882 INFO waytoodeep: Done awaiting that fetch future

...but yeah, it's a type generated by the compiler because we're using the async syntax. We can't name it in the sense that we can't declare a binding of that type, or write a function that accepts only that type.

And to really convince ourselves that our future is not doing any work until we actually poll it, well, we can turn on debug logging for reqwest:

$ RUST_LOG=info,reqwest=debug cargo run Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 3.07s Running `target/debug/waytoodeep` Jul 25 18:05:07.384 INFO waytoodeep: Building that fetch future... Jul 25 18:05:07.385 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>" Jul 25 18:05:07.385 INFO waytoodeep: Awaiting that fetch future... Jul 25 18:05:07.385 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:05:07.503 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box Jul 25 18:05:07.503 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:05:07.503 INFO waytoodeep: Done awaiting that fetch future

Or even for every crate, so we can hear from hyper and h2:

$ RUST_LOG=debug cargo run Finished dev [unoptimized + debuginfo] target(s) in 0.04s Running `target/debug/waytoodeep` Jul 25 18:05:59.973 INFO waytoodeep: Building that fetch future... Jul 25 18:05:59.973 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>" Jul 25 18:05:59.973 INFO waytoodeep: Awaiting that fetch future... Jul 25 18:05:59.974 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:05:59.974 DEBUG hyper::client::connect::dns: resolving host="fasterthanli.me" Jul 25 18:05:59.989 DEBUG hyper::client::connect::http: connecting to 172.67.196.144:443 Jul 25 18:06:00.000 DEBUG hyper::client::connect::http: connected to 172.67.196.144:443 Jul 25 18:06:00.000 DEBUG rustls::client::hs: No cached session for DNSNameRef("fasterthanli.me") Jul 25 18:06:00.000 DEBUG rustls::client::hs: Not resuming any session Jul 25 18:06:00.016 DEBUG rustls::client::hs: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256 Jul 25 18:06:00.016 DEBUG rustls::client::tls13: Not resuming Jul 25 18:06:00.017 DEBUG rustls::client::tls13: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])] Jul 25 18:06:00.017 DEBUG rustls::client::hs: ALPN protocol is Some(b"h2") Jul 25 18:06:00.018 DEBUG h2::client: binding client connection Jul 25 18:06:00.018 DEBUG h2::client: client connection bound Jul 25 18:06:00.018 DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 } Jul 25 18:06:00.019 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 5177345 } Jul 25 18:06:00.019 DEBUG hyper::client::pool: pooling idle connection for ("https", fasterthanli.me) Jul 25 18:06:00.020 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) } Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: rustls::client::tls13: Ticket saved Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: rustls::client::tls13: Ticket saved Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Settings { flags: (0x0), max_concurrent_streams: 256, initial_window_size: 65536, max_frame_size: 16777215 } Jul 25 18:06:00.030 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Settings { flags: (0x1: ACK) } Jul 25 18:06:00.030 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 2147418112 } Jul 25 18:06:00.041 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Settings { flags: (0x1: ACK) } Jul 25 18:06:00.041 DEBUG Connection{peer=Client}: h2::proto::settings: received settings ACK; applying Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 } Jul 25 18:06:00.120 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) } Jul 25 18:06:00.120 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) } Jul 25 18:06:00.121 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box Jul 25 18:06:00.121 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:06:00.121 INFO waytoodeep: Done awaiting that fetch future Jul 25 18:06:00.121 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) } Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Reset { stream_id: StreamId(1), error_code: CANCEL } Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) } Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::proto::connection: Connection::poll; connection error error=NO_ERROR Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: rustls::session: Sending warning alert CloseNotify

Oh look, we are using rustls! And TLS 1.3!

TLS 1.3? You mean the thing I made a video about?

Oh that's right, I forgot you make videos now.

...let's say "now and then".

So that should be enough to convince you, unless you really only trust what the kernel says (and even then...), so let's ask what strace thinks just to be extra sure.

And also add a one-second sleep before we await the future, just to be extra sure:

use tokio::time::sleep; use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Report> { setup()?; info!("Building that fetch future..."); let client = Client::new(); let fut = fetch_thing(&client, URL_1); info!("Sleeping for a bit..."); sleep(Duration::from_secs(1)).await; info!("Awaiting that fetch future..."); fut.await?; info!("Done awaiting that fetch future"); Ok(()) }

$ cargo build && strace -e 'connect' ./target/debug/waytoodeep Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) Finished dev [unoptimized + debuginfo] target(s) in 3.13s Jul 25 18:09:36.595 INFO waytoodeep: Building that fetch future... Jul 25 18:09:36.596 INFO waytoodeep: Sleeping for a bit... Jul 25 18:09:37.599 INFO waytoodeep: Awaiting that fetch future... connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress) Jul 25 18:09:37.720 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:09:37.721 INFO waytoodeep: Done awaiting that fetch future +++ exited with 0 +++

Mh again colors make that a lot more readable, I just love colors as long as I don't have to pick them. Here's how it looks for me:

And because tracing-subscriber's default formatter shows timestamps, you can see it is sleeping for one second (and 3 milliseconds), and only when we await it does it connect to whichever CDN node is serving that article today.

Okay, so! Let's try fetching two things again:

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); let fut1 = fetch_thing(&client, URL_1); let fut2 = fetch_thing(&client, URL_2); fut1.await?; fut2.await?; Ok(()) }

And still look at some debug logs, but less of them:

$ RUST_LOG=info,reqwest=debug cargo run --quiet Jul 25 18:31:47.396 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:31:47.536 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box Jul 25 18:31:47.537 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:31:47.627 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13 Jul 25 18:31:47.627 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

Okay! Interesting. So from what I'm seeing here, reqwest re-uses the same connection for both requests. I say that because I only see one reqwest::connect log line.

Let's do a quick strace check:

$ cargo build --quiet && strace -e 'connect' ./target/debug/waytoodeep > /dev/null connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("172.67.196.144")}, 16) = -1 EINPROGRESS (Operation now in progress) +++ exited with 0 +++

Yup sure enough, only one connection.

But still, we're waiting for the first request to be done before making the second request. The first one took... 536-396 = 140 milliseconds, and the second one took 627-537 = 90 milliseconds!

Roughly.

Ah, but aren't we running a debug build of our application right now?

That is true. I'm fairly sure our problem is IO-bound though, not CPU-bound.

There's definitely some overhead associated with a debug build of that app, but I doubt it has a significant impact on the latency here. Let's check anyway:

(mind the --release)

$ RUST_LOG=info,reqwest=debug cargo run --quiet --release Jul 25 18:34:59.211 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:34:59.343 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box Jul 25 18:34:59.343 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:34:59.427 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13 Jul 25 18:34:59.427 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

Okay here we got 343-211 = 132ms, then 427-343 = 84ms.

A difference of a few milliseconds like that, could just as well be caused by the neighbor clicking on another YouTube video, resulting in a BURST of radio waves, which resulted in collisions (there's no air traffic controller for 802.11, it's a free-for-all!) and retransmissions and would explain slight latency changes like that.

Or another million reasons. That's why we don't do those measurements like that.

But back to the important thing.

It's waiting for the first one to finish

It is! It is waiting for the first one. So how do we make our program do both requests at the same time?

Well, there's a bunch of ways!

For example, we could spawn the futures on an executor, and then sleep for a second. That would be enough, right? One second?

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); let fut1 = fetch_thing(&client, URL_1); tokio::spawn(fut1); let fut2 = fetch_thing(&client, URL_2); tokio::spawn(fut2); tokio::time::sleep(Duration::from_secs(1)).await; Ok(()) }

$ RUST_LOG=info,reqwest=debug cargo run --quiet --release error[E0597]: `client` does not live long enough --> src/main.rs:17:28 | 17 | let fut1 = fetch_thing(&client, URL_1); | ------------^^^^^^^-------- | | | | | borrowed value does not live long enough | argument requires that `client` is borrowed for `'static` ... 25 | } | - `client` dropped here while still borrowed error: aborting due to previous error For more information about this error, try `rustc --explain E0597`. error: could not compile `waytoodeep` To learn more, run the command again with --verbose.

Ah, unless we can't.

And we can't because...

Oooh ooh I got this one! So by "spawning the future on the executor", we hand our future off to the executor, right? We transfer ownership of it and everything?

Correct.

And then even if we don't await it, the future we just spawned is just part of "what the executor needs to do", so it's being polled even if we return from main.

But if we return from main the whole program ex-

Yeah okay sure, here it's main but it could be any function. The point is we could return from our function, from which the future is borrowing some data, and that makes the borrow checker very sad.

Right!

And it makes me very, very happy, because it means we cannot accidentally access some data after it's been deallocated. The old UAF.

But it is getting in the way of this example.

So... we have to find another way. What if the future returned by fetch_thing was 'static? What if it didn't borrow anything?

Currently it looks like this:

use std::future::Future; fn fetch_thing<'a>( client: &'a Client, url: &'a str, ) -> impl Future<Output = Result<(), Report>> + 'a { async move { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) } }

Well it used to look like an async fn but we just had to go ahead and eschew the nice syntax for the sake of gaining some understanding.

Which is fortunate because really what we want is something like this:

fn fetch_thing<'a>( client: &'a Client, url: &'a str, // ๐Ÿ‘‡ ) -> impl Future<Output = Result<(), Report>> + 'static {}

But hm since we currently borrow from client and url, we have to solve these.

url is the easy one, since we're using those consts:

pub const URL_1: &str = "https://fasterthanli.me/articles/whats-in-the-box"; pub const URL_2: &str = "https://fasterthanli.me/series/advent-of-code-2020/part-13";

...they're already 'static. So we can just require url to be 'static as well.

fn fetch_thing<'a>( client: &'a Client, // ๐Ÿ‘‡ url: &'static str, ) -> impl Future<Output = Result<(), Report>> + 'static {}

Alright! One lifetime down, one to go.

Well, we could require client itself to live for static. And since it's a reference to a Client, that means the Client itself must live for static.

fn fetch_thing( // ๐Ÿ‘‡ client: &'static Client, url: &'static str, ) -> impl Future<Output = Result<(), Report>> + 'static {}

And since it's owned by main, uhh, we can, we can... we can leak it.

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); let leaked_client = Box::leak(Box::new(client)); let fut1 = fetch_thing(leaked_client, URL_1); let fut2 = fetch_thing(leaked_client, URL_2); tokio::spawn(fut1); tokio::spawn(fut2); tokio::time::sleep(Duration::from_secs(1)).await; Ok(()) }

There! No lifetime problems.

Just leak everything. See? You don't need C!

$ RUST_LOG=info,reqwest=debug cargo run --quiet --release Jul 25 18:54:53.614 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:54:53.614 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/ Jul 25 18:54:53.708 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box Jul 25 18:54:53.708 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:54:53.733 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13 Jul 25 18:54:53.733 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

Iiiiiinteresting.

So our two requests are definitely going out concurrently, we know because we know making a request from my laptop to my website takes between 80ms and 140ms, but in the logs we can see a ~25ms interval between both responses coming in.

We can also see that reqwest, which has a connection pooling mechanism, is immediately creating two connections, probably because by the time we fire off the second request, the first request's connection is not done establishing yet.

And that means strace should see...

$ cargo build --quiet --release && strace -e 'connect' ./target/release/waytoodeep Jul 25 18:58:16.425 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 18:58:16.443 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") +++ exited with 0 +++

...two connect calls! I knew it!

Amos I hate to break it to you.

There's zero connect calls there.

How efficient. With Rust you don't even need to establish TCP connections to make HTTP/2 requests. Truly revolutionary.

Okay uh that can't be right. Maybe it's doing it in another thread? And maybe strace only traces the main thread by default?

Ah, there, -f should trace all "children processes", and as everyone knows, on Linux threads are just processes in a trenchcoat (or is it the other way around), so, off we go:

strace: Process 154612 attached strace: Process 154613 attached strace: Process 154614 attached strace: Process 154615 attached strace: Process 154616 attached strace: Process 154617 attached strace: Process 154618 attached strace: Process 154619 attached strace: Process 154620 attached strace: Process 154621 attached strace: Process 154622 attached strace: Process 154623 attached strace: Process 154624 attached strace: Process 154625 attached strace: Process 154626 attached strace: Process 154627 attached strace: Process 154628 attached [pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory) [pid 154628] connect(10, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory) [pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory) [pid 154628] connect(9, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0 [pid 154627] connect(10, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0 [pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable) [pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3031::ac43:c490", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable) [pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154627] connect(9, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("104.21.92.169")}, 16) = 0 [pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154627] connect(9, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("172.67.196.144")}, 16) = 0 [pid 154628] connect(10, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable) [pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154628] connect(10, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3031::ac43:c490", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable) [pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154628] connect(10, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("104.21.92.169")}, 16) = 0 [pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 [pid 154628] connect(10, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("172.67.196.144")}, 16) = 0 [pid 154625] connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress) [pid 154626] connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress) Jul 25 19:00:53.862 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 19:00:53.880 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") [pid 154628] +++ exited with 0 +++ [pid 154627] +++ exited with 0 +++ [pid 154618] +++ exited with 0 +++ [pid 154614] +++ exited with 0 +++ [pid 154612] +++ exited with 0 +++ [pid 154619] +++ exited with 0 +++ [pid 154617] +++ exited with 0 +++ [pid 154613] +++ exited with 0 +++ [pid 154615] +++ exited with 0 +++ [pid 154623] +++ exited with 0 +++ [pid 154616] +++ exited with 0 +++ [pid 154624] +++ exited with 0 +++ [pid 154621] +++ exited with 0 +++ [pid 154622] +++ exited with 0 +++ [pid 154626] +++ exited with 0 +++ [pid 154620] +++ exited with 0 +++ [pid 154625] +++ exited with 0 +++ +++ exited with 0 +++shell

Wowee that's a whole bunch of connects.

So first it tries to connect to nscd because apparently we still live in the 90s:

[pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)

...thankfully my system doesn't have it, so it moves on to whatever /etc/resolv.conf says to use to make DNS lookups...

[pid 154628] connect(9, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0

And then it finally gets a bunch of results like 172.67.196.144 and 104.21.92.169, which are Cloudflare IP addresses, and also some IPv6 stuff which doesn't work because I forcibly disabled IPv6 in my weird "HyperV VM on Windows 11" setup:

[pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)

And finally it decides to use the IPv4 address 104.21.92.169 for both requests, and we can see here that those are non-blocking connects because instead of returning 0 it returns -1 which means "I'm doing it, I'm doing it, check back later".

[pid 154625] connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress) [pid 154626] connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)

Okay! So we have two connects. If we ignore DNS.

We also have a bunch of threads.

Is this how async Rust works? We just have a bunch of threads? And that's how it can do work "in the background"?

Before we answer that question, let's change our code to actually wait for both futures to be done, instead of waiting for an arbitrary one second.

#[tokio::main] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); let leaked_client = Box::leak(Box::new(client)); let fut1 = fetch_thing(leaked_client, URL_1); let fut2 = fetch_thing(leaked_client, URL_2); let handle1 = tokio::spawn(fut1); let handle2 = tokio::spawn(fut2); handle1.await.unwrap()?; handle2.await.unwrap()?; Ok(()) }

Wait, aren't we back to the point where we wait for the first request to be complete, and then we fire off the second request?

Well, no! See, if we run it a bunch of times:

$ RUST_LOG=info cargo run --quiet --release Jul 25 19:11:07.934 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 19:11:07.958 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") $ RUST_LOG=info cargo run --quiet --release Jul 25 19:11:08.676 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 19:11:08.680 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") $ RUST_LOG=info cargo run --quiet --release Jul 25 19:11:09.325 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 19:11:09.338 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") $ RUST_LOG=info cargo run --quiet --release Jul 25 19:11:10.134 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8") Jul 25 19:11:10.144 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")

...well, "box" wins most of the time (it does have a headstart), but "advent" comes in first sometimes! Which is exactly what I was looking for.

Right. So it's doing things in parallel. Because it has threads.

No. But don't trust me, let's take a look.

It's not because of threads

Let's run our little program in GDB, mostly because I still haven't acquired LLDB muscle memory, which I'm sure will show up any day now, through no effort of my own.

$ cargo build --quiet && gdb --quiet --args ./target/debug/waytoodeep Reading symbols from ./target/debug/waytoodeep... warning: Missing auto-load script at offset 0 in section .debug_gdb_scripts of file /home/amos/ftl/waytoodeep/target/debug/waytoodeep. Use `info auto-load python-scripts [REGEXP]' to list them. (gdb)

There, good.

Now before we launch the program we're gonna set up a breakpoint. Did I say breakpoint? I meant catchpoint. I don't know the name of all the functions that are involved in making an HTTP/2 request, but I know the name of the syscall used to connect somewhere, and so, that's what we're going to break on. Or catch.

(gdb) catch syscall connect Catchpoint 1 (syscall 'connect' [42])

Now we're good to go!

$ Starting program: /home/amos/ftl/waytoodeep/target/debug/waytoodeep [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". [New Thread 0x7ffff7c28700 (LWP 158945)] [New Thread 0x7ffff7a27700 (LWP 158946)] [New Thread 0x7fffef826700 (LWP 158947)] [New Thread 0x7ffff7826700 (LWP 158948)] [New Thread 0x7ffff7625700 (LWP 158949)] [New Thread 0x7ffff7424700 (LWP 158950)] [New Thread 0x7ffff7223700 (LWP 158951)] [New Thread 0x7ffff701f700 (LWP 158952)] [New Thread 0x7ffff6e1e700 (LWP 158953)] [New Thread 0x7ffff6c1a700 (LWP 158954)] [New Thread 0x7ffff6a16700 (LWP 158955)] [New Thread 0x7ffff680f700 (LWP 158956)] [New Thread 0x7ffff660e700 (LWP 158957)] [New Thread 0x7ffff640a700 (LWP 158958)] [New Thread 0x7ffff6206700 (LWP 158959)] [New Thread 0x7ffff5f4b700 (LWP 158960)] [New Thread 0x7ffff5d4a700 (LWP 158961)] [Switching to Thread 0x7ffff5f4b700 (LWP 158960)] Thread 17 "tokio-runtime-w" hit Catchpoint 1 (call to syscall connect), 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=..., len=len@entry=110) at ../sysdeps/unix/sysv/linux/connect.c:26 26 ../sysdeps/unix/sysv/linux/connect.c: No such file or directory. (gdb)

Alright cool, that was fast! So we stopped in "Thread 17", which is named "tokio-runtime-w", because I guess all the other letters were taken already.

...the "w" is for "worker".

I don't know! Why do they truncate stuff like that?

Is this your first day at Unix?

Ok so, Thread 17, what are the other threads doing?

(gdb) info threads Id Target Id Frame 1 Thread 0x7ffff7c2c6c0 (LWP 158941) "waytoodeep" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 2 Thread 0x7ffff7c28700 (LWP 158945) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 3 Thread 0x7ffff7a27700 (LWP 158946) "tokio-runtime-w" 0x00007ffff7d4f5ce in epoll_wait (epfd=3, events=0x555556338b60, maxevents=1024, timeout=-1) at ../sysdeps/unix/sysv/linux/epoll_wait.c:30 4 Thread 0x7fffef826700 (LWP 158947) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 5 Thread 0x7ffff7826700 (LWP 158948) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 6 Thread 0x7ffff7625700 (LWP 158949) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 7 Thread 0x7ffff7424700 (LWP 158950) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 8 Thread 0x7ffff7223700 (LWP 158951) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 9 Thread 0x7ffff701f700 (LWP 158952) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 10 Thread 0x7ffff6e1e700 (LWP 158953) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 11 Thread 0x7ffff6c1a700 (LWP 158954) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 12 Thread 0x7ffff6a16700 (LWP 158955) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 13 Thread 0x7ffff680f700 (LWP 158956) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 14 Thread 0x7ffff660e700 (LWP 158957) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 15 Thread 0x7ffff640a700 (LWP 158958) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 16 Thread 0x7ffff6206700 (LWP 158959) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 * 17 Thread 0x7ffff5f4b700 (LWP 158960) "tokio-runtime-w" 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=..., len=len@entry=110) at ../sysdeps/unix/sysv/linux/connect.c:26 18 Thread 0x7ffff5d4a700 (LWP 158961) "tokio-runtime-w" 0x00007ffff7d48a46 in __GI___mmap64 (offset=0, fd=-1, flags=16418, prot=0, len=134217728, addr=0x0) at ../sysdeps/unix/sysv/linux/mmap64.c:59

Could we maybe get one more stackframe?

(gdb) thread apply all backtrace 2 Thread 18 (Thread 0x7ffff5d4a700 (LWP 158961)): #0 0x00007ffff7d48a46 in __GI___mmap64 (offset=0, fd=-1, flags=16418, prot=0, len=134217728, addr=0x0) at ../sysdeps/unix/sysv/linux/mmap64.c:59 #1 __GI___mmap64 (addr=addr@entry=0x0, len=len@entry=134217728, prot=prot@entry=0, flags=flags@entry=16418, fd=fd@entry=-1, offset=offset@entry=0) at ../sysdeps/unix/sysv/linux/mmap64.c:47 (More stack frames follow...) Thread 17 (Thread 0x7ffff5f4b700 (LWP 158960)): #0 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=..., len=len@entry=110) at ../sysdeps/unix/sysv/linux/connect.c:26 #1 0x00007ffff7d8b713 in open_socket (type=type@entry=GETFDHST, key=key@entry=0x7ffff7de5ccb "hosts", keylen=keylen@entry=6) at nscd_helper.c:185 (More stack frames follow...) Thread 16 (Thread 0x7ffff6206700 (LWP 158959)): #0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 #1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff6206498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112 (More stack frames follow...) Thread 15 (Thread 0x7ffff640a700 (LWP 158958)): #0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 #1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff640a498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112 (More stack frames follow...) Thread 14 (Thread 0x7ffff660e700 (LWP 158957)): #0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 #1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff660e498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112 (More stack frames follow...) Thread 13 (Thread 0x7ffff680f700 (LWP 158956)): #0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38 #1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff680f498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112 (More stack frames follow...)

Ah. They're mostly just parked. Which means they're idle. More accurately, they're waiting for work to do.

We can also look at all those threads from htop, I mean we already know they're there, but I just think htop is neat. Thanks Hisham!

So uh, I can't help but notice we have a bunch of threads, and we also have a bunch of cores. Maybe it's creating one thread per core?

Ah, yup. And then some blocking threads for good measure, because, as we've seen in the strace output above, it's making some blocking connect calls to do DNS lookups (well, glibc is doing that), so that all runs outside of the worker threads so it doesn't block the other tasks.

Okay yeah!

So it is threads. It can do multiple things at the same time because it has many threads.

Well bear, I'm scrolling the docs here and it says there's a single-threaded executor.

Yeah but if we use that then it'll do one request at a time! I mean it has to, right? Because it's doing all of that thanks to threads?

Here's the thing bear, I'm not so sure... let's try it out.

// ๐Ÿ‘‡ #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { // (same as before) }

So now if we run it...

$ RUST_LOG=info cargo run --quiet --release Jul 25 19:50:15.977 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 19:50:15.994 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

Huh! Those responses are...

17 milliseconds apart.

Yeah that's not enough time to make a full request.

So it does make the requests in parallel?

...concurrently.

So then it must use threads.

Oh what is it with bears and threads. Or am I thinking of cats.

No matter, let's make sure we only have one thread.

AhAH!

Yeah okay but those are the blocking threads. It's all because of DNS. You can see that there's no longer a zillion (15) worker threads:

(The reason there used to be 15 worker threads, by the way, is that I leave one core for the host operating system to do its thing so that even if my Linux VM is firing on all cylinders).

If we took DNS out of the equation then we'd see it actually is using just the one thread, which I guess we're gonna do because I can feel you're still a little skeptical.

Interlude: let's not leak memory

But before we do: it really bothers me that we're leaking the reqwest Client.

Instead of doing that, we can make it atomically-reference-counted, so that it lives as long as either task is still alive.

It's a pretty easy change:

// ๐Ÿ‘‡ Atomically Reference Counted = Arc use std::sync::Arc; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; // ๐Ÿ‘‡ there we go let client = Arc::new(Client::new()); // ๐Ÿ‘‡ let fut1 = fetch_thing(client.clone(), URL_1); // (cloning it only increases the reference count) let fut2 = fetch_thing(client.clone(), URL_2); let handle1 = tokio::spawn(fut1); let handle2 = tokio::spawn(fut2); handle1.await.unwrap()?; handle2.await.unwrap()?; Ok(()) } #[allow(clippy::manual_async_fn)] fn fetch_thing( // ๐Ÿ‘‡ now taking this, we have shared ownership of it client: Arc<Client>, url: &'static str, ) -> impl Future<Output = Result<(), Report>> + 'static { async move { // luckily this ๐Ÿ‘‡ only requires `&self` let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) } }

There. I feel much better. We're no longer leaking dozens of bytes in our program that never runs for more than a couple seconds. All is right with the world.

Amos you're not gonna believe this, I looked at the definition of reqwest's Client and it's just this:

#[derive(Clone)] pub struct Client { inner: Arc<ClientRef>, }

Ah, well, turns out it's already reference-counted, so we can just take a Client:

#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; // ๐Ÿ‘‡ let client = Client::new(); // ๐Ÿ‘‡ let fut1 = fetch_thing(client.clone(), URL_1); // no need to clone a second time let fut2 = fetch_thing(client, URL_2); let handle1 = tokio::spawn(fut1); let handle2 = tokio::spawn(fut2); handle1.await.unwrap()?; handle2.await.unwrap()?; Ok(()) } #[allow(clippy::manual_async_fn)] fn fetch_thing( // ๐Ÿ‘‡ client: Client, url: &'static str, ) -> impl Future<Output = Result<(), Report>> + 'static { async move { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) } }

There.

Oh and for reference, the much simpler "async fn" version of that works just as well:

async fn fetch_thing(client: Client, url: &str) -> Result<(), Report> { let res = client.get(url).send().await?.error_for_status()?; info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!"); Ok(()) }

We don't even need to specifically request that url is borrowed for 'static. If it happens to be 'static, then the resulting Future will be too. If it's not, then it won't.

So for example, this breaks:

#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); // this is a `String`, owned by main let url1 = String::from(URL_1); // we're borrowing from main ๐Ÿ‘‡ let fut1 = fetch_thing(client.clone(), &url1); let fut2 = fetch_thing(client, URL_2); let handle1 = tokio::spawn(fut1); let handle2 = tokio::spawn(fut2); handle1.await.unwrap()?; handle2.await.unwrap()?; Ok(()) }

$ cargo check Checking waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep) error[E0597]: `url1` does not live long enough --> src/main.rs:18:44 | 18 | let fut1 = fetch_thing(client.clone(), &url1); | ----------------------------^^^^^- | | | | | borrowed value does not live long enough | argument requires that `url1` is borrowed for `'static` ... 28 | } | - `url1` dropped here while still borrowed

Very cool.

I mean, yes, until you change one bit of code and suddenly the whole Future isn't Send anymore, and you really need it to be Send, which is what that whole ordeal, I mean, article, was about.

Before we go any further, I also wanted to mention that, well, spawning two futures and immediately waiting for both of them to be done is sorta silly to do with tokio::spawn, here we could just as well reach for FuturesUnordered.

$ cargo add [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding futures v0.3.16 to dependencies

use futures::{stream::FuturesUnordered, StreamExt}; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; let client = Client::new(); let mut group = vec![ fetch_thing(client.clone(), URL_1), fetch_thing(client, URL_2), ] .into_iter() .collect::<FuturesUnordered<_>>(); while let Some(item) = group.next().await { // propagate errors item?; } Ok(()) }

And with that solution, we can await an arbitrary number of futures, and they're still polled concurrently:

$ RUST_LOG=info cargo run --quiet --release Jul 25 20:12:37.208 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8") Jul 25 20:12:37.227 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")

Just... 19 milliseconds apart - those are concurrent for sure.

Let's get rid of DNS altogether

Okay so let's forget about reqwest for a moment.

HTTP isn't that hard, we can just speak it ourselves. All we need is TCP.

use std::net::SocketAddr; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, }; async fn fetch_thing(name: &str) -> Result<(), Report> { // look mom, no DNS! let addr: SocketAddr = ([1, 1, 1, 1], 80).into(); let mut socket = TcpStream::connect(addr).await?; // we're writing straight to the socket, there's no buffering // so no need to flush socket.write_all(b"GET / HTTP/1.1\r\n").await?; socket.write_all(b"Host: 1.1.1.1\r\n").await?; socket.write_all(b"User-Agent: cool-bear\r\n").await?; socket.write_all(b"Connection: close\r\n").await?; socket.write_all(b"\r\n").await?; let mut response = String::with_capacity(256); socket.read_to_string(&mut response).await?; let status = response.lines().next().unwrap_or_default(); info!(%status, %name, "Got response!"); // dropping the socket will close the connection Ok(()) }

Running it "works":

$ RUST_LOG=info cargo run --quiet --release Jul 25 20:24:05.158 INFO waytoodeep: Got response! status=HTTP/1.1 301 Moved Permanently name=second Jul 25 20:24:05.159 INFO waytoodeep: Got response! status=HTTP/1.1 301 Moved Permanently name=first

(Oh look, second won the race!)

And there's no filthy filthy DNS getting in the way anymore.

Of course http://1.1.1.1 redirects us to the HTTPS version of the page, and it's not technically that hard to just use rustls to speak TLS, but the article is getting long and I don't think we should be...

chanting TLS, TLS, TLS!

Ahhhhhh alright.

$ cargo add [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding tokio-rustls v0.22.0 to dependencies $ cargo add [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding webpki v0.21.4 to dependencies $ cargo add [email protected] Updating 'https://github.com/rust-lang/crates.io-index' index Adding webpki-roots v0.21.1 to dependencies

And while we're at it:

$ cargo rm reqwest Removing reqwest from dependencies

use std::sync::Arc; use webpki::DNSNameRef; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; async fn fetch_thing(name: &str) -> Result<(), Report> { // look out it's port 443 now let addr: SocketAddr = ([1, 1, 1, 1], 443).into(); let socket = TcpStream::connect(addr).await?; // establish a TLS session... let connector: TlsConnector = { let mut config = ClientConfig::new(); config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); Arc::new(config).into() }; // we have to use the proper DNS name now ๐Ÿ‘‡ let dnsname = DNSNameRef::try_from_ascii_str("one.one.one.one")?; let mut socket = connector.connect(dnsname, socket).await?; // we're writing straight to the socket, there's no buffering // so no need to flush socket.write_all(b"GET / HTTP/1.1\r\n").await?; // ๐Ÿ‘‡ socket.write_all(b"Host: one.one.one.one\r\n").await?; socket.write_all(b"User-Agent: cool-bear\r\n").await?; socket.write_all(b"Connection: close\r\n").await?; socket.write_all(b"\r\n").await?; let mut response = String::with_capacity(256); socket.read_to_string(&mut response).await?; let status = response.lines().next().unwrap_or_default(); info!(%status, %name, "Got response!"); // dropping the socket will close the connection Ok(()) }

$ RUST_LOG=info cargo run --quiet --release Jul 25 20:31:32.627 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:31:32.658 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first

There! Now it's a 200. You happy?

Very.

Now can we talk about the thing the article is about?

You mean we haven't talked about it yet?

I mean, the goal was to understand Rust futures and I guess we've done some good progress.

But let's consider the following scenario: we want to perform two requests concurrently, either of which can fail, and we want to stop as soon as either request fails, or when both requests succeed.

tokio's try_join macro

As it turns out, there's a macro for that!

#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; let res = tokio::try_join!(fetch_thing("first"), fetch_thing("second"),)?; info!(?res, "All done!"); Ok(()) }

Which does exactly what we wanted!

$ RUST_LOG=info cargo run --quiet --release Jul 25 20:44:52.150 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 20:44:52.165 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:44:52.165 INFO waytoodeep: All done! res=((), ())

Again, quick check: the responses are coming in 15ms apart - they're definitely being sent concurrently.

try_join! does the awaiting for us, and it's result-aware. If everything goes well, we end up with a tuple of the contents of the Ok variant of each of the future's results (in order).

So, we can have our futures return something:

// ๐Ÿ‘‡ async fn fetch_thing(name: &str) -> Result<&str, Report> { // (omitted) // ๐Ÿ‘‡ Ok(name) }

To convince ourselves that they're returned in order, no matter who wins the race:

$ RUST_LOG=info cargo run --quiet --release Jul 25 20:47:56.967 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:47:56.967 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 20:47:56.967 INFO waytoodeep: All done! res=("first", "second") $ RUST_LOG=info cargo run --quiet --release Jul 25 20:47:57.933 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 20:47:57.935 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:47:57.935 INFO waytoodeep: All done! res=("first", "second") $ RUST_LOG=info cargo run --quiet --release Jul 25 20:47:58.942 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:47:58.946 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 20:47:58.946 INFO waytoodeep: All done! res=("first", "second")

Alright cool. Where do we go from there?

Well, first, now that we're DNS-free, we can put to rest the notion that things are happening "at the same time" thanks to threads.

Because, well, if we run our program under strace, asking it to track childrens with -f (the f is for "follow children" btw):

$ cargo build --quiet --release && strace -f -e 'connect' ./target/release/waytoodeep connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("1.1.1.1")}, 16) = -1 EINPROGRESS (Operation now in progress) connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("1.1.1.1")}, 16) = -1 EINPROGRESS (Operation now in progress) Jul 25 20:51:54.004 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 20:51:54.013 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 20:51:54.015 INFO waytoodeep: All done! res=("first", "second") +++ exited with 0 +++

...we see two connect calls as expected, but no threads whatsoever. And in that run, the responses came back 9 milliseconds apart! Which is less than my ping to 1.1.1.1:

$ ping -c 1 1.1.1.1 PING 1.1.1.1 (1.1.1.1) 56(84) bytes of data. 64 bytes from 1.1.1.1: icmp_seq=1 ttl=57 time=13.7 ms --- 1.1.1.1 ping statistics --- 1 packets transmitted, 1 received, 0% packet loss, time 0ms rtt min/avg/max/mdev = 13.748/13.748/13.748/0.000 ms

Okay, okay, I'm convinced. It's not because of the threads. It's because of some event loop that the executor has. It's making non-blocking syscalls and uhh..

...and it's subscribing to events related to the resources it's managing, so it knows when a socket is ready to read from / write to, for example.

Ah, that makes sense. I mean, in theory. In practice I uhh.. so futures are a bunch of state... and ye shall await them, and uhh where does the subscribing happen?

Well, let's try making our own try_join - as a function, and only for exactly two futures. And we'll see what happens.

Ah, we've made our own future before, how bad could it be?

Pretty bad as it turns out

Let's start simple! So we want a function that takes two futures and returns a single future.

// in `src/main.rs` mod tj;

// in `src/tj.rs` use std::future::Future; pub fn try_join<A, B>(a: A, b: B) -> impl Future<Output = ()> where A: Future, B: Future, { todo!("implement me!"); }

Mh. It shouldn't return the empty tuple, it should return a tuple of... the successful results. Or the first error that came in.

So we have to add a few more generic type parameters: one for the error type (we'll assume both futures have the same error type), and one for the type of the Ok variant of each future.

pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { todo!("implement me!"); }

Okay! It's a mouthful, but I think we got everything we need.

Note that we're using "impl Trait" syntax so we don't have to make our "try join future" type public. It really doesn't matter but it'll save us a few pub keywords, and my fingers are getting tired of typing. Oh so tired.

So, let's make that type!

It's gonna need to hold A, and B, and be aware of the AR, BR and E types, so, yeah, hope you're hungry for generic type parameter salad.

struct TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { a: A, b: B, }

And now we can return that from try_join:

pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { // so simple! TryJoin { a, b } }

Which I think illustrates very nicely the fact that creating the future is just building state. It doesn't do any work.

Of course this doesn't compile because TryJoin doesn't implement Future yet.

But not to worry! rust-analyzer can help us generate the missing bits:

use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { type Output = Result<(AR, BR), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { todo!() } }

And here's how we'd use it, if we'd actually finished it:

#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?; info!(?res, "All done!"); Ok(()) }

But right now it just panics:

$ RUST_LOG=info cargo run --quiet --release The application panicked (crashed). Message: not yet implemented Location: src/tj.rs:32 Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it. Run with RUST_BACKTRACE=full to include source snippets.

So I guess we need to implement it!

Well, let's try polling at least one of our futures with it.

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let a = self.a.poll(cx); todo!() }

$ RUST_LOG=info cargo run --quiet --release error[E0599]: no method named `poll` found for type parameter `A` in the current scope --> src/tj.rs:32:24 | 32 | let a = self.a.poll(cx); | ^^^^ method not found in `A` | ::: /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:100:8 | 100 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; | ---- the method is available for `Pin<&mut A>` here | help: consider wrapping the receiver expression with the appropriate type | 32 | let a = Pin::new(&mut self.a).poll(cx); | ^^^^^^^^^^^^^ ^

Ah! Good start, good start.

Well I went into pinning in excruciating detail here, so let's just keep it short.

Methods usually have a receiver like that:

struct MyType { fn do_thing(&self) { println!("my value is {}", self.value) } }

Which is really just short for:

struct MyType { fn do_thing(self: &Self) { println!("my value is {}", self.value) } }

Where Self is MyType because we're in an impl MyType block.

Good? Well, there's a lot of other possible receiver types, and Pin<&mut Self> is one of them:

struct MyType { fn do_thing(self: Pin<&mut Self>) { // good luck!1 } }

And what this means is that MyType must be pinned somewhere - ie. it is guaranteed not to move. Unless it implements Unpin, and then it can be unpinned, moved, and re-pinned again.

For the rest of this article, we won't assume that our futures A and B are Unpin, which means we'll never move them ourselves (only drop them).

You can tell we don't require A and B to be Unpin because we didn't add a specific where clause to require it. If we did, we'd have an additional trait bound like this:

struct TryJoin<A, B, AR, BR, E> where // ๐Ÿ‘‡ A: Future<Output = Result<AR, E>> + Unpin, B: Future<Output = Result<BR, E>> + Unpin, {}

But we don't, so we can't assume A or B are Unpin.

So! Now our problem really is just pin projection.

We're holding a Pin<&mut TryJoin<A, B, ...>> and we want to be holding a Pin<&mut A> (because that's what we need to poll A).

In another situation, I would be reaching for something like the pin-project crate, or perhaps pin-project-lite, but the direction we're going in will make using pin-project really awkward so today, we unsafe instead.

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; let a = a.poll(cx); todo!() }

That compiles. But we're using unsafe, which means that the compiler has officially stopped caring checking our work. We must enforce some invariants ourselves, by being vewy vewy careful and having peers review our work, and still getting it wrong occasionally but them's the breaks.

So now, we're able to poll a, which is fantastic. It returns either Poll::Ready(Result<AR, E>), if it's done, or Poll::Pending if it'll be done later.

We can match that:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; match a.poll(cx) { Poll::Pending => { info!("A is pending..."); return Poll::Pending; } Poll::Ready(res) => match res { Ok(_) => info!("A is ready!"), Err(e) => return Poll::Ready(Err(e)), }, } todo!() }

Here we log "A is pending" until A becomes ready. This might take a few turns: after all, we are doing nontrivial stuff. We're establishing a TCP connection, then a TLS session on top of that, then doing a bunch of separate writes, then finally reading everything until EOF (end of file).

And indeed, if we run it:

$ RUST_LOG=info cargo run --quiet --release Jul 25 22:54:14.227 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.239 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.239 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.252 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.252 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.514 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.523 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.523 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.530 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.530 INFO waytoodeep::tj: A is pending... Jul 25 22:54:14.530 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 22:54:14.530 INFO waytoodeep::tj: A is ready! The application panicked (crashed). Message: not yet implemented Location: src/tj.rs:46 Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it. Run with RUST_BACKTRACE=full to include source snippets.

We see that it does take a few turns.

Note that that code only returns Poll::Ready if A errors out, because we want to gather the results of both A and B.

So let's do the same with B:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; match a.poll(cx) { Poll::Pending => { info!("A is pending..."); return Poll::Pending; } Poll::Ready(res) => match res { Ok(_) => info!("A is ready!"), Err(e) => return Poll::Ready(Err(e)), }, } let b = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; match b.poll(cx) { Poll::Pending => { info!("B is pending..."); return Poll::Pending; } Poll::Ready(res) => match res { Ok(_) => info!("B is ready!"), Err(e) => return Poll::Ready(Err(e)), }, } todo!() }

And.. whoops:

RUST_LOG=info cargo run --quiet --release error[E0382]: use of moved value: `self` --> src/tj.rs:46:26 | 33 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ---- move occurs because `self` has type `Pin<&mut TryJoin<A, B, AR, BR, E>>`, which does not implement the `Copy` trait 34 | let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; | ------------------------------------- `self` moved due to this method call ... 46 | let b = unsafe { self.map_unchecked_mut(|this| &mut this.a) }; | ^^^^ value used here after move | note: this function takes ownership of the receiver `self`, which moves `self` --> /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:776:43 | 776 | pub unsafe fn map_unchecked_mut<U, F>(self, func: F) -> Pin<&'a mut U> | ^^^^

Right. map_unchecked_mut consumed self.

No worries though, we can use .as_mut():

// ๐Ÿ‘‡ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // ๐Ÿ‘‡ let a = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.a) }; match a.poll(cx) { Poll::Pending => { info!("A is pending..."); return Poll::Pending; } Poll::Ready(res) => match res { Ok(_) => info!("A is ready!"), Err(e) => return Poll::Ready(Err(e)), }, } // ๐Ÿ‘‡ let b = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.a) }; match b.poll(cx) { Poll::Pending => { info!("B is pending..."); return Poll::Pending; } Poll::Ready(res) => match res { Ok(_) => info!("B is ready!"), Err(e) => return Poll::Ready(Err(e)), }, } todo!() }

But this still doesn't work:

$ RUST_LOG=info cargo run --quiet --release (cut) Jul 25 22:57:07.913 INFO waytoodeep::tj: A is pending... Jul 25 22:57:07.913 INFO waytoodeep::tj: A is pending... Jul 25 22:57:07.913 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 22:57:07.913 INFO waytoodeep::tj: A is ready! The application panicked (crashed). Message: `async fn` resumed after completion Location: src/main.rs:24 Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it. Run with RUST_BACKTRACE=full to include source snippets.

See, as soon as a Future returns Poll::Ready, we should not poll it anymore. And why would we? It's already given us its output. And if the output is non-Copy, it might only be able to give it to us once.

So, we need to 1) keep track that A is done, and 2) store its output somewhere.

Well, we can just add a couple fields to our struct!

struct TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { a: A, b: B, // ๐Ÿ‘‡ a_res: Option<AR>, b_res: Option<BR>, }

Let's not forget to initialize them:

pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { TryJoin { a, b, // ๐Ÿ‘‡ a_res: None, b_res: None, } }

And now the plan is:

  • if a_res is Some, then we don't need to poll a because it already finished
  • same for b_res and b

Alright let's do it. Also, because we're already using unsafe code, and so we're already in charge of maintaining the invariants, I'm going to make an executive decision and pin-project both a and b in one fell swoop, like so:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() }; let (a, b) = unsafe { ( Pin::new_unchecked(&mut this.a), Pin::new_unchecked(&mut this.b), ) }; if this.a_res.is_none() { match a.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(res) => match res { Ok(x) => this.a_res = Some(x), Err(e) => return Poll::Ready(Err(e)), }, } } if this.b_res.is_none() { match b.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(res) => match res { Ok(x) => this.b_res = Some(x), Err(e) => return Poll::Ready(Err(e)), }, } } todo!() }

Alright, this should at least give a and b the opportunity to complete before we panic:

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:11:03.851 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:11:04.380 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second The application panicked (crashed). Message: not yet implemented Location: src/tj.rs:69 Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it. Run with RUST_BACKTRACE=full to include source snippets.

Splendid! Now all we have to do is extract both results and return those.

// instead of the `todo!()`: if let (Some(_), Some(_)) = (&this.a_res, &this.b_res) { let a = this.a_res.take().unwrap(); let b = this.b_res.take().unwrap(); Poll::Ready(Ok((a, b))) } else { Poll::Pending }

And that works:

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:13:32.497 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:13:32.829 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:13:32.829 INFO waytoodeep: All done! res=("first", "second")

...but it's not a try_join implementation. What we're doing is exactly the same as this:

// (pseudo-code, buncha things are missing) async fn try_join(a: A, b: B) { let a = self.a.await?; let b = self.b.await?; Ok((a, b)) }

ie. it's sequential. Remember, just because tokio's executor might use a bunch of threads doesn't automatically mean things are happening at the same time. Earlier, we had to use tokio::spawn, or UnorderedFutures, or try_join! to get that to happen.

So let's review... what happens when we poll a?

if this.a_res.is_none() { match a.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(res) => match res { Ok(x) => this.a_res = Some(x), Err(e) => return Poll::Ready(Err(e)), }, } }

Mhh if it's pending we return pending, and so... ah, that's it already. We should not return if a is pending. Because what if b is already ready with an error?

What if, for example, we invoke our try_join like this:

info!("Joining..."); let res = tj::try_join( async move { sleep(Duration::from_millis(2000)).await; Ok(()) }, async move { sleep(Duration::from_millis(10)).await; Err::<(), Report>(color_eyre::eyre::eyre!("uh oh")) }, ) .await;

...then a takes 2 seconds to get ready, whereas b would return an error within 10 milliseconds, if only we polled it!

Alas, we do not:

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:19:26.972 INFO waytoodeep: Joining... Jul 25 23:19:28.990 INFO waytoodeep: All done! res=Err( 0: uh oh Location: src/main.rs:28 (cut)

(Look at the timestamps)

The whole point of try_join is that it fails early: as soon as any of the promises returns Result::Err.

So we must poll a and b at the same time. Well... not strictly at the same time. We must poll them concurrently, every time our TryJoin future is polled, until the give an output.

That's a relatively easy fix - just don't return Poll::Pending when either future returns Poll::Pending!

Also, I'm tired of typing Poll::Ready and Poll<T> implements From<T>, so we can golf our way down using .into():

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() }; let (a, b) = unsafe { ( Pin::new_unchecked(&mut this.a), Pin::new_unchecked(&mut this.b), ) }; if this.a_res.is_none() { if let Poll::Ready(res) = a.poll(cx) { match res { Ok(x) => this.a_res = Some(x), Err(e) => return Err(e).into(), } } } if this.b_res.is_none() { if let Poll::Ready(res) = b.poll(cx) { match res { Ok(x) => this.b_res = Some(x), Err(e) => return Err(e).into(), } } } if let (Some(_), Some(_)) = (&this.a_res, &this.b_res) { let a = this.a_res.take().unwrap(); let b = this.b_res.take().unwrap(); Ok((a, b)).into() } else { Poll::Pending } }

There! And that...

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:22:40.238 INFO waytoodeep: Joining... Jul 25 23:22:40.253 INFO waytoodeep: All done! res=Err( 0: uh oh Location: src/main.rs:28 (cut)

...works! By which I mean it fails. As expected. Expected failure is success. Which sounds pessimistic but what fault of mine is it that they're always right eventually.

And if we go back to the way we used to invoke try_join:

#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; info!("Joining..."); let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?; info!(?res, "All done!"); Ok(()) }

We can see that the race is back on: sometimes first finishes first, sometimes it finishes second:

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:25:25.925 INFO waytoodeep: Joining... Jul 25 23:25:26.224 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:25:26.236 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:25:26.236 INFO waytoodeep: All done! res=("first", "second") $ RUST_LOG=info cargo run --quiet --release Jul 25 23:25:26.937 INFO waytoodeep: Joining... Jul 25 23:25:27.237 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:25:27.242 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:25:27.242 INFO waytoodeep: All done! res=("first", "second") $ RUST_LOG=info cargo run --quiet --release Jul 25 23:25:27.865 INFO waytoodeep: Joining... Jul 25 23:25:28.164 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:25:28.818 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:25:28.818 INFO waytoodeep: All done! res=("first", "second") $ RUST_LOG=info cargo run --quiet --release Jul 25 23:25:30.153 INFO waytoodeep: Joining... Jul 25 23:25:31.477 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:25:31.496 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:25:31.496 INFO waytoodeep: All done! res=("first", "second")

...and yet the results are in the correct order.

Alright, we did it!

We can do better

And lucky for us, worse is better.

See, this type bothers me:

struct TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { a: A, b: B, a_res: Option<AR>, b_res: Option<BR>, }

We don't need to have a_res until a has completed. And once it has completed, and a_res contains its result, we don't need a anymore.

In fact, it's stronger than that, we should never touch a anymore.

It almost sounds like we should have either A, or AR, but never both...

Mhhh. Mhhhhhhh.

SUM TYPES!

ahem sorry: sum types. This is a job for sum types.

It sure is, but which ones? Hahaha

...Amos

So! Sum types. Rust enums. These are exactly what we want. Let's make a type called State, and it'll have two variants: one for when it's still a future, and another for when it's a result. Easy!

enum State<F, T, E> where F: Future<Output = Result<T, E>>, { Future(F), Ok(T), }

Ah, this is gonna be great.

Let's give our TryJoin struct two of these:

struct TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { a: State<A, AR, E>, b: State<B, BR, E>, }

The symmetry! Beautiful.

And initialize them properly:

pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { TryJoin { a: State::Future(a), b: State::Future(b), } }

Cool cool cool. Now we just have to tune our poll method a little, so uh we have a Pin<&mut Self>, which we violently turn into a &mut Self...

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() };

...which is fine, it's fine because we pinky promise to uphold the invariants, in this case it means we don't move what's inside of a State::Future.

And then if a is a State::Future we poll it, and if it's ready we propagate the error, or store the result for later...

if let State::Future(a) = &mut this.a { let a = unsafe { Pin::new_unchecked(a) }; if let Poll::Ready(res) = a.poll(cx) { match res { Ok(t) => this.a = State::Ok(t), Err(e) => return Err(e).into(), } } }

And we do the same with b...

// you can figure that one out, I believe in you

And then if they're both State::Ok, we're done! Otherwise we return Poll::Pending, so we uh:

match (this.a, this.b) { (State::Ok(a), State::Ok(b)) => Ok((a, b)).into(), _ => Poll::Pending, }

Ah, so nice.

Except it doesn't compile:

$ RUST_LOG=info cargo run --quiet --release error[E0507]: cannot move out of `this.a` which is behind a mutable reference --> src/tj.rs:65:16 | 65 | match (this.a, this.b) { | ^^^^^^ move occurs because `this.a` has type `State<A, AR, E>`, which does not implement the `Copy` trait error[E0507]: cannot move out of `this.b` which is behind a mutable reference --> src/tj.rs:65:24 | 65 | match (this.a, this.b) { | ^^^^^^ move occurs because `this.b` has type `State<B, BR, E>`, which does not implement the `Copy` trait error: aborting due to 2 previous errors For more information about this error, try `rustc --explain E0507`. error: could not compile `waytoodeep` To learn more, run the command again with --verbose.

Because uhh... all we have is a &mut Self. Not a Self.

We don't own ourselves. We merely borrow ourselves.

That's.. that's deep.

So, yeah, we cannot move things out of our members, because there's really nothing that prevents someone from polling us again. And in that case, we should panic.

We should?

We should panic!.

Of course things would be easier if we had a .take() method just like Option<T> has. Where it returns whatever the Option had, replacing it with None.

But we don't really have a None. We have State::Future, and State::Ok, but no "neutral" state.

So let's make one!

enum State<F, T, E> where F: Future<Output = Result<T, E>>, { Future(F), Ok(T), Gone, }

And now, we can replace both this.a and this.b with State::Gone... and whatever is returned, we own! So we can move out of it.

But also... we need to pattern match again.

Like so:

match (&this.a, &this.b) { (State::Ok(_), State::Ok(_)) => { let a = match std::mem::replace(&mut this.a, State::Gone) { State::Ok(t) => t, _ => unreachable!(), }; let b = match std::mem::replace(&mut this.b, State::Gone) { State::Ok(t) => t, _ => unreachable!(), }; Ok((a, b)).into() } _ => Poll::Pending, }

Which honestly... I've seen worse code. It's just not very DRY.

It does work great though!

$ RUST_LOG=info cargo run --quiet --release Jul 25 23:52:24.097 INFO waytoodeep: Joining... Jul 25 23:52:25.050 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 25 23:52:25.061 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 25 23:52:25.061 INFO waytoodeep: All done! res=("first", "second")

Look at that, 11 milliseconds apart!

Deeper?

Something again bothers me about this code:

struct TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { a: State<A, AR, E>, b: State<B, BR, E>, }

because now a and b are tri-state: Future, Ok, or Gone.

What if only one of a or b is Gone? That state makes no sense!

If it happens, we would currently just return Poll::Pending forever, which isn't great - that's a deadlock.

Really what we want is... two enums. In fact we want the whole TryJoin type to be an enum.

enum TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { Polling { a: State<A, AR, E>, b: State<B, BR, E>, }, Done, }

There. Initialize it like thaaaat:

pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { TryJoin::Polling { a: State::Future(a), b: State::Future(b), } }

And then, surprise! Poll<T> implements the Try trait, so we can use ? with it, so our final code is actually pretty short and sweet:

impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { type Output = Result<(AR, BR), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() }; let (a, b) = match this { Self::Polling { a, b } => (a, b), Self::Done => panic!("TryJoin future polled after completion"), }; if let State::Future(fut) = a { if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) { *a = State::Ok(res?); } } if let State::Future(fut) = b { if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) { *b = State::Ok(res?); } } match (a, b) { (State::Ok(_), State::Ok(_)) => match std::mem::replace(this, Self::Done) { Self::Polling { a: State::Ok(a), b: State::Ok(b), } => Ok((a, b)).into(), _ => unreachable!(), }, _ => Poll::Pending, } } }

Now I know what you're thinking.

I know what I'm thinking!! Isn't Pin<&mut T> precisely for preventing things like std::mem::swap and std::mem::replace? Those move things around in memory! Which is sehr verboten!

Well, my furry friend. It's only verboten if we promised not to move it. But in this case, we only move self / this after we're done polling both futures.

Past that point, we never use those futures again, pinned or unpinned. And we never promised the result themselves were going to be pinned!

Mh. Mhhhhhhhh.

We just have to make up our mind on whether something's going to be "always pin" or "never pin", and then we can't lose might write code that turns out to be sound.

In our case, TryJoin::Polling(State::Future(_)) is always pinned, and everything else isn't.

Sure, we make a quick trip from Pin<&mut Self> to &mut Self and back to Pin<&mut A>, but as long as we don't move ourselves in-between, it's all good.

If we used std::mem::replace or std::mem::swap while we're still holding the futures, then that would be unsound. But we're not. So we're fine. I think. I'm fairly sure. I'm sure people will write in if it's not.

That's it

Gaze upon our work, and rejoice:

// in `src/tj.rs` use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { TryJoin::Polling { a: State::Future(a), b: State::Future(b), } } enum State<F, T, E> where F: Future<Output = Result<T, E>>, { Future(F), Ok(T), } enum TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { Polling { a: State<A, AR, E>, b: State<B, BR, E>, }, Done, } impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E> where A: Future<Output = Result<AR, E>>, B: Future<Output = Result<BR, E>>, { type Output = Result<(AR, BR), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() }; let (a, b) = match this { Self::Polling { a, b } => (a, b), Self::Done => panic!("TryJoin future polled after completion"), }; if let State::Future(fut) = a { if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) { *a = State::Ok(res?); } } if let State::Future(fut) = b { if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) { *b = State::Ok(res?); } } match (a, b) { (State::Ok(_), State::Ok(_)) => match std::mem::replace(this, Self::Done) { Self::Polling { a: State::Ok(a), b: State::Ok(b), } => Ok((a, b)).into(), _ => unreachable!(), }, _ => Poll::Pending, } } }

And our little HTTPS client:

// in `src/main.rs` use color_eyre::Report; use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, }; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; use tracing::info; use tracing_subscriber::EnvFilter; use webpki::DNSNameRef; mod tj; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Report> { setup()?; info!("Joining..."); let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?; info!(?res, "All done!"); Ok(()) } #[allow(dead_code)] async fn fetch_thing(name: &str) -> Result<&str, Report> { // look out it's port 443 now let addr: SocketAddr = ([1, 1, 1, 1], 443).into(); let socket = TcpStream::connect(addr).await?; // establish a TLS session... let connector: TlsConnector = { let mut config = ClientConfig::new(); config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); Arc::new(config).into() }; let dnsname = DNSNameRef::try_from_ascii_str("one.one.one.one")?; let mut socket = connector.connect(dnsname, socket).await?; // we're writing straight to the socket, there's no buffering // so no need to flush socket.write_all(b"GET / HTTP/1.1\r\n").await?; socket.write_all(b"Host: one.one.one.one\r\n").await?; socket.write_all(b"User-Agent: cool-bear\r\n").await?; socket.write_all(b"Connection: close\r\n").await?; socket.write_all(b"\r\n").await?; let mut response = String::with_capacity(256); socket.read_to_string(&mut response).await?; let status = response.lines().next().unwrap_or_default(); info!(%status, %name, "Got response!"); // dropping the socket will close the connection Ok(name) } fn setup() -> Result<(), Report> { if std::env::var("RUST_LIB_BACKTRACE").is_err() { std::env::set_var("RUST_LIB_BACKTRACE", "1") } color_eyre::install()?; if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "info") } tracing_subscriber::fmt::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); Ok(()) }

And it works.

$ RUST_LOG=info cargo run --quiet --release Jul 26 00:08:13.399 INFO waytoodeep: Joining... Jul 26 00:08:13.707 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first Jul 26 00:08:13.709 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second Jul 26 00:08:13.710 INFO waytoodeep: All done! res=("first", "second")

Two milliseconds apart! That has to be a new record.

Well I hope you enjoyed the article, and that hopefully it solidified your understanding of Rust futures. I've tried maintaining a healthy 50/50 mix of "of course you know that" (and if you don't you can search for it yourself) and "you've heard of that but let me show you ten different ways you can actually see it for yourself instead of vaguely being aware that's how things normally go".

So that, y'know, the article eventually ends. But if you're puzzled by some of the things you now know the keywords to look for them. Knowing the keywords is half the expertise.

Until next time, take care!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK