Combining Axum, Hyper, Tonic, and Tower for hybrid web/gRPC apps: Part 2

 2 years ago
source link: https://www.fpcomplete.com/blog/axum-hyper-tonic-tower-part2/

This is the second of four posts in a series on combining web and gRPC services into a single service using Tower, Hyper, Axum, and Tonic.

I recommend checking out the first post in the series if you haven't already.

Quick recap

  • Tower provides a Service trait, which is basically an asynchronous function from requests to responses
  • Service is parameterized on the request type, and has an associated type for Response
  • It also has an associated Error type, and an associated Future type
  • Service allows async behavior in both checking whether the service is ready to accept a request, and for handling the request
  • A web application ends up having two sets of async request/response behavior
    • Inner: a service that accepts HTTP requests and returns HTTP responses
    • Outer: a service that accepts the incoming network connections and returns an inner service
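
To make the recap concrete, here is a minimal sketch of the inner/outer split using only the standard library. The Service trait below is a simplified local stand-in for Tower's, and Echo, EchoFactory, the u16 "connection info", and the now helper are all hypothetical names invented for this illustration.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified local stand-in for Tower's `Service` trait (not the real crate).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

// Inner service: a fake "HTTP request" (a String) in, a fake response out.
pub struct Echo {
    peer_port: u16,
}

impl Service<String> for Echo {
    type Response = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        ready(Ok(format!("port {} sent: {}", self.peer_port, req)))
    }
}

// Outer service: connection info (here just a port number) in, an `Echo` out.
pub struct EchoFactory;

impl Service<u16> for EchoFactory {
    type Response = Echo;
    type Error = Infallible;
    type Future = Ready<Result<Echo, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, peer_port: u16) -> Self::Future {
        ready(Ok(Echo { peer_port }))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a future exactly once; sufficient here because every future in this
// sketch is immediately ready. A real server would use an async runtime.
pub fn now<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

// One "connection" arrives, the factory builds an app, the app answers a request.
pub fn demo() -> String {
    let mut factory = EchoFactory;
    let mut app = now(factory.call(4000)).unwrap();
    now(app.call("ping".to_string())).unwrap()
}
```

Calling demo() walks through both layers once: the factory turns connection info into an Echo, and the Echo turns a request into a response.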

With that in mind, let's look at Hyper.

Services in Hyper

Now that we've got Tower under our belts a bit, it's time to dive into the specific world of Hyper. Much of what we saw above will apply directly to Hyper. But Hyper has a few additional curveballs to deal with:

  • Both the Request and Response types are parameterized over the representation of the request/response bodies
  • There are a bunch of additional traits and type parameters in the public API, some not appearing in the docs at all, and many that are unclear

In place of the run function we had in our previous fake server example, Hyper follows a builder pattern for initializing HTTP servers. After providing configuration values, you create an active Server value from your Builder with the serve method. Just to get it out of the way now, this is the type signature of serve from the public docs:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

That's a lot of requirements, and not all of them are clear from the docs. Hopefully we can bring some clarity to this. But for now, let's start off with something simpler: the "Hello world" example from the Hyper homepage:

use std::{convert::Infallible, net::SocketAddr};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};

async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new("Hello, World!".into()))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle))
    });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

This follows the same pattern we established above:

  • handle is an async function from a Request to a Response. Its error type is Infallible, so it can never actually fail.
    • Both Request and Response are parameterized with Body, a default HTTP body representation.
  • handle gets wrapped up in service_fn to produce a Service<Request<Body>>. This is like app_fn above.
  • We use make_service_fn, like app_factory_fn above, to produce the Service<&AddrStream> (we'll get to that &AddrStream shortly).
    • We don't care about the &AddrStream value, so we ignore it
    • The return value from the function inside make_service_fn must be a Future, so we wrap with async
    • The output of that Future must be a Result, so we wrap with an Ok
    • We need to help the compiler out a bit and provide a type annotation of Infallible, otherwise it won't know the type of the Ok(service_fn(handle)) expression

Using this level of abstraction for writing a normal web app is painful for (at least) three different reasons:

  • Managing all of these Service pieces manually is a pain
  • There's very little in the way of high-level helpers, like "parse the request body as a JSON value"
  • Any kind of mistake in your types may lead to very large, non-local error messages that are difficult to diagnose

So we'll be more than happy to move on from Hyper to Axum a bit later. But for now, let's continue exploring things at the Hyper layer.

Bypassing service_fn and make_service_fn

What I found most helpful when trying to grok Hyper was implementing a simple app without service_fn and make_service_fn. So let's go through that ourselves here. We're going to create a simple counter app (I'm nothing if not predictable). We'll need two different data types: one for the "app factory", and one for the app itself. Let's start with the app itself:

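// Imports used by DemoApp, DemoAppFactory, and main below
use std::convert::Infallible;
use std::future::Ready;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;

use hyper::server::conn::AddrStream;
use hyper::service::Service;
use hyper::{Body, Request, Response, Server};

struct DemoApp {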
    counter: Arc<AtomicUsize>,
}

impl Service<Request<Body>> for DemoApp {
    type Response = Response<Body>;
    type Error = hyper::http::Error;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: Request<Body>) -> Self::Future {
        let counter = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let res = Response::builder()
            .status(200)
            .header("Content-Type", "text/plain; charset=utf-8")
            .body(format!("Counter is at: {}", counter).into());
        std::future::ready(res)
    }
}

This implementation uses the std::future::Ready struct to create a Future which is immediately ready. In other words, our application doesn't perform any async actions. I've set the Error associated type to hyper::http::Error. This error would be generated if, for example, you provided invalid strings to the header method call, such as non-ASCII characters. As we've seen multiple times, poll_ready just advertises that it's always ready to handle another request.

The implementation of DemoAppFactory isn't terribly different:

struct DemoAppFactory {
    counter: Arc<AtomicUsize>,
}

impl Service<&AddrStream> for DemoAppFactory {
    type Response = DemoApp;
    type Error = Infallible;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, conn: &AddrStream) -> Self::Future {
        println!("Accepting a new connection from {:?}", conn);
        std::future::ready(Ok(DemoApp {
            counter: self.counter.clone()
        }))
    }
}

We have a different parameter to Service, this time &AddrStream. I did initially find the naming here confusing. In Tower, a Service takes some Request. And with our DemoApp, the Request it takes is a Hyper Request<Body>. But in the case of DemoAppFactory, the Request it's taking is a &AddrStream. Keep in mind that a Service is really just a generalization of fallible, async functions from input to output. The input may be a Request<Body>, or may be a &AddrStream, or something else entirely.

Similarly, the "response" here isn't an HTTP response, but a DemoApp. I again find it easier to use the terms "input" and "output" to avoid the name overloading of request and response.

Finally, our main function looks much the same as the original from the "Hello world" example:

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let factory = DemoAppFactory {
        counter: Arc::new(AtomicUsize::new(0)),
    };

    let server = Server::bind(&addr).serve(factory);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

If you're looking to extend your understanding here, I'd recommend extending this example to perform some async actions within the app. How would you modify Future? If you use a trait object, how exactly do you pin?
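
One possible answer to that exercise, sketched with only the standard library: switch the Future associated type to a boxed, pinned trait object and build it with Box::pin around an async block. The Service trait here is a simplified local stand-in for Tower's, and AsyncCounter and the now helper are hypothetical names. Treat this as an illustration of the pinning pattern rather than the only way to do it.

```rust
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified local stand-in for Tower's `Service` trait (not the real crate).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

pub struct AsyncCounter {
    counter: Arc<AtomicUsize>,
}

impl Service<()> for AsyncCounter {
    type Response = String;
    type Error = Infallible;
    // An `async` block has a type we cannot name, so it is erased behind a
    // trait object and pinned on the heap.
    type Future = Pin<Box<dyn Future<Output = Result<String, Infallible>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: ()) -> Self::Future {
        // Clone the handle so the future owns its data and borrows nothing
        // from `self`.
        let counter = self.counter.clone();
        Box::pin(async move {
            // A real app would `.await` a database call or similar here.
            let n = counter.fetch_add(1, Ordering::SeqCst);
            Ok::<_, Infallible>(format!("Counter is at: {}", n))
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls once; enough here because the async block never actually suspends.
pub fn now<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

// Two requests against the same app: the second sees the incremented counter.
pub fn demo() -> String {
    let mut app = AsyncCounter { counter: Arc::new(AtomicUsize::new(0)) };
    let _first = now(app.call(())).unwrap();
    now(app.call(())).unwrap()
}
```

The trade-off is one heap allocation per request in exchange for being able to write ordinary async code inside call.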

But now it's time to take a dive into a topic I've avoided for a while.

Understanding the traits

Let's refresh our memory from above on the signature of serve:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

Up until preparing this blog post, I had never tried to take a deep dive into understanding all of these bounds. So this will be an adventure for us all! (And perhaps it should end up with some documentation PRs by me...) Let's start off with the type variables. Altogether, we have four: two on the impl block itself, and two on this method:

  • I represents the incoming stream of connections.
  • E represents the executor.
  • S is the service we're going to run. Using our terminology from above, this would be the "app factory." Using Tower/Hyper terminology, this is the "make service."
  • B is the choice of response body the service returns (the "app", not the "app factory", using nomenclature above).

I: Accept

I needs to implement the Accept trait, which represents the ability to accept a new connection from some source. The only implementation out of the box is for AddrIncoming, which can be created from a SocketAddr. And in fact, that's exactly what Server::bind does.

Accept has two associated types. Error must be something that can be converted into an error object, that is, Into<Box<dyn StdError + Send + Sync>>. This is the requirement of (almost?) every associated error type we look at, so from now on I'll just skip over them. We need to be able to convert whatever error happened into a uniform representation.

The Conn associated type represents an individual connection. In the case of AddrIncoming, the associated type is AddrStream. This type must implement AsyncRead and AsyncWrite for communication, Send and 'static so it can be sent to different threads, and Unpin. The requirement for Unpin bubbles up from deeper in the stack, and I honestly don't know what drives it.

S: MakeServiceRef

MakeServiceRef is one of those traits that doesn't appear in the public documentation. This seems to be intentional. Reading the source:

Just a sort-of "trait alias" of MakeService, not to be implemented by anyone, only used as bounds.

Were you confused as to why we were receiving a reference with &AddrStream? This is the trait that powers that transformation. Overall, the trait bound S: MakeServiceRef<I::Conn, Body, ResBody = B> means:

  • S must be a Service
  • S will accept input of type &I::Conn
  • It will in turn produce a new Service as output
  • That new service will accept Request<Body> as input, and produce Response<ResBody> as output
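
The "trait alias" idea can be sketched with a blanket impl. The code below is a std-only approximation, not Hyper's actual definition: the Service trait is a cut-down local stand-in (poll_ready is omitted for brevity), MakeServiceRef here has far fewer bounds than the real one, and Conn, Greeter, GreeterFactory, accept, and now are hypothetical names.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Cut-down local stand-in for Tower's `Service` (no `poll_ready` here).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}

// The "alias": nobody implements this by hand; it is only used as a bound.
pub trait MakeServiceRef<Target> {
    type Service;
    type Error;
    type Future: Future<Output = Result<Self::Service, Self::Error>>;

    fn make_service_ref(&mut self, target: &Target) -> Self::Future;
}

// Blanket impl: anything that is a `Service<&Target>` for every lifetime
// automatically counts as a `MakeServiceRef<Target>`.
impl<T, Target, S, E, F> MakeServiceRef<Target> for T
where
    T: for<'a> Service<&'a Target, Response = S, Error = E, Future = F>,
    F: Future<Output = Result<S, E>>,
{
    type Service = S;
    type Error = E;
    type Future = F;

    fn make_service_ref(&mut self, target: &Target) -> F {
        self.call(target)
    }
}

// Hypothetical connection type and app, for illustration only.
pub struct Conn {
    pub peer: String,
}

pub struct Greeter {
    pub greeting: String,
}

pub struct GreeterFactory;

// The factory is written against `&Conn`, which is why a reference shows up.
impl<'a> Service<&'a Conn> for GreeterFactory {
    type Response = Greeter;
    type Error = Infallible;
    type Future = Ready<Result<Greeter, Infallible>>;

    fn call(&mut self, conn: &'a Conn) -> Self::Future {
        ready(Ok(Greeter { greeting: format!("hello, {}", conn.peer) }))
    }
}

// A server-like function that only knows about the alias bound.
pub fn accept<M: MakeServiceRef<Conn>>(make: &mut M, conn: &Conn) -> M::Future {
    make.make_service_ref(conn)
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls once; every future in this sketch is immediately ready.
pub fn now<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

pub fn demo() -> String {
    let conn = Conn { peer: "127.0.0.1:4000".to_string() };
    let mut factory = GreeterFactory;
    now(accept(&mut factory, &conn)).unwrap().greeting
}
```

The point of the pattern is that users only ever implement Service<&Conn>, while the server's own signatures can talk about the friendlier-looking MakeServiceRef<Conn> bound.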

And while we're talking about it: that ResBody has the restriction that it must implement HttpBody. As you might guess, the Body struct mentioned above implements HttpBody. There are a number of other implementations too. When we get to Tonic and gRPC, we'll see that there are, in fact, other response bodies we have to deal with.

NewSvcExec and ConnStreamExec

The default value for the E parameter is Exec, which does not appear in the generated docs. But of course you can find it in the source. The concept of Exec is to specify how tasks are spawned off. By default, it leverages tokio::spawn.

I'm not entirely certain of how all of this plays out, but I believe the two traits in the heading allow for different handling of spawning for the connection service (app factory) versus the request service (app).
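
The general idea of making "how tasks get spawned" a type parameter can be sketched without Hyper at all. The code below does not reproduce NewSvcExec or ConnStreamExec. It defines a hypothetical local Executor trait, a ThreadExecutor that runs each task on its own OS thread (standing in for a runtime's spawn function), and a handle_connections function that is generic over the executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical local trait: "something that can run a future in the background".
pub trait Executor<Fut> {
    fn execute(&self, fut: Fut);
}

// Wakes a parked thread; used by the tiny `block_on` below.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal blocking driver: poll, and park the thread while the future is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// One OS thread per task.
pub struct ThreadExecutor;

impl<Fut> Executor<Fut> for ThreadExecutor
where
    Fut: Future<Output = ()> + Send + 'static,
{
    fn execute(&self, fut: Fut) {
        thread::spawn(move || block_on(fut));
    }
}

pub type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

// Server-like code that is generic over the spawning strategy.
pub fn handle_connections<E>(exec: &E, conns: Vec<u32>) -> Vec<u32>
where
    E: Executor<Task>,
{
    let (tx, rx) = mpsc::channel();
    for id in conns {
        let tx = tx.clone();
        // Each "connection" becomes its own background task.
        let task: Task = Box::pin(async move {
            tx.send(id * 2).unwrap();
        });
        exec.execute(task);
    }
    // Drop the original sender so `rx.iter()` ends once every task is done.
    drop(tx);
    let mut results: Vec<u32> = rx.iter().collect();
    // Tasks may finish in any order, so sort for a stable result.
    results.sort();
    results
}

pub fn demo() -> Vec<u32> {
    handle_connections(&ThreadExecutor, vec![1, 2, 3])
}
```

Swapping ThreadExecutor for a different implementation changes where the work runs without touching handle_connections, which is roughly the flexibility the E parameter buys.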

Using Axum

Axum is the new web framework that kicked off this whole blog post. Instead of dealing directly with Hyper like we did above, let's reimplement our counter web service using Axum. We'll be using axum = "0.2". The crate docs provide a great overview of Axum, and I'm not going to try to replicate that information here. Instead, here's my rewritten code. We'll analyze a few key pieces below:

use axum::extract::Extension;
use axum::handler::get;
use axum::{AddExtensionLayer, Router};
use hyper::{HeaderMap, Server, StatusCode};
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Clone, Default)]
struct AppState {
    counter: Arc<AtomicUsize>,
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let app = Router::new()
        .route("/", get(home))
        .layer(AddExtensionLayer::new(AppState::default()));

    let server = Server::bind(&addr).serve(app.into_make_service());

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

async fn home(state: Extension<AppState>) -> (StatusCode, HeaderMap, String) {
    let counter = state
        .counter
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap());
    let body = format!("Counter is at: {}", counter);
    (StatusCode::OK, headers, body)
}

The first thing I'd like to get out of the way is this whole AddExtensionLayer/Extension bit. This is how we're managing shared state within our application. It's not directly relevant to our overall analysis of Tower and Hyper, so I'll make do with a link to the docs demonstrating how this works. Interestingly, you may notice that this implementation relies on middleware, which does in fact leverage Tower, so it's not completely separate.

Anyway, back to our point at hand. Within our main function, we're now using this Router concept to build up our application:

let app = Router::new()
    .route("/", get(home))
    .layer(AddExtensionLayer::new(AppState::default()));

This says, essentially, "please call the home function when you receive a request for /, and add a middleware that does that whole extension thing." The home function uses an extractor to get the AppState, and returns a value of type (StatusCode, HeaderMap, String) to represent the response. In Axum, any implementation of the appropriately named IntoResponse trait can be returned from handler functions.
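
To show why returning a tuple from a handler can work at all, here is a std-only sketch of the pattern. It is not Axum's code: the Response struct, this IntoResponse trait, the respond function, and the simplified home handler are all hypothetical stand-ins, with a u16 in place of StatusCode and a Vec of pairs in place of HeaderMap.

```rust
// Hypothetical, simplified response type for this sketch.
#[derive(Debug, PartialEq)]
pub struct Response {
    pub status: u16,
    pub headers: Vec<(String, String)>,
    pub body: String,
}

// Local stand-in for the idea behind Axum's `IntoResponse`.
pub trait IntoResponse {
    fn into_response(self) -> Response;
}

// A bare string becomes a 200 response with no extra headers.
impl IntoResponse for String {
    fn into_response(self) -> Response {
        Response { status: 200, headers: Vec::new(), body: self }
    }
}

// A (status, headers, body) tuple builds on whatever the body already produces.
impl<T: IntoResponse> IntoResponse for (u16, Vec<(String, String)>, T) {
    fn into_response(self) -> Response {
        let (status, headers, body) = self;
        let mut res = body.into_response();
        res.status = status;
        res.headers.extend(headers);
        res
    }
}

// The framework side: accept any handler whose return type is convertible.
pub fn respond<H, R>(handler: H) -> Response
where
    H: FnOnce() -> R,
    R: IntoResponse,
{
    handler().into_response()
}

// A handler shaped like `home` above, minus the async and the extractor.
pub fn home() -> (u16, Vec<(String, String)>, String) {
    let headers = vec![(
        "Content-Type".to_string(),
        "text/plain; charset=utf-8".to_string(),
    )];
    (200, headers, "Counter is at: 0".to_string())
}
```

With this in place, both respond(home) and respond(|| "plain".to_string()) type-check, because each return type has an IntoResponse impl.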

Anyway, our app value is now a Router. But a Router cannot be directly run by Hyper. Instead, we need to convert it into a MakeService (a.k.a. an app factory). Fortunately, that's easy: we call app.into_make_service(). Let's look at that method's signature:

impl<S> Router<S> {
    pub fn into_make_service(self) -> IntoMakeService<S>
    where
        S: Clone;
}

And going down the rabbit hole a bit further:

pub struct IntoMakeService<S> { /* fields omitted */ }

impl<S: Clone, T> Service<T> for IntoMakeService<S> {
    type Response = S;
    type Error = Infallible;
    // other stuff omitted
}

The type Router<S> is a value that can produce a service of type S. IntoMakeService<S> will take some kind of connection info, T, and produce that service S asynchronously. And since Error is Infallible, we know it can't fail. But as much as we say "asynchronously", looking at the implementation of Service for IntoMakeService, we see a familiar pattern:

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
}

fn call(&mut self, _target: T) -> Self::Future {
    future::MakeRouteServiceFuture {
        future: ready(Ok(self.service.clone())),
    }
}

Also, notice how that T value for connection info doesn't actually have any bounds or other information. IntoMakeService just throws away the connection information. (If you need it for some reason, see into_make_service_with_connect_info.) In other words:

  • Router<S> is a type that lets us add routes and middleware layers
  • You can convert a Router<S> into an IntoMakeService<S>
  • But IntoMakeService<S> is really just a fancy wrapper around an S to appease the Hyper requirements around app factories
  • So the real workhorse here is just S
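
The "fancy wrapper" point can be demonstrated with a std-only sketch. The Service trait is a simplified local stand-in for Tower's, the IntoMakeService below is a re-creation for illustration rather than Axum's source, and App, hit, and now are hypothetical names.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified local stand-in for Tower's `Service` trait (not the real crate).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

// Illustrative re-creation of the wrapper: it just holds an `S`.
pub struct IntoMakeService<S> {
    service: S,
}

// `T` has no bounds at all: whatever the connection info is, it gets ignored.
impl<S: Clone, T> Service<T> for IntoMakeService<S> {
    type Response = S;
    type Error = Infallible;
    type Future = Ready<Result<S, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _target: T) -> Self::Future {
        ready(Ok(self.service.clone()))
    }
}

// Hypothetical app: clones share one counter through the `Arc`.
#[derive(Clone, Default)]
pub struct App {
    counter: Arc<AtomicUsize>,
}

impl App {
    pub fn hit(&self) -> usize {
        self.counter.fetch_add(1, Ordering::SeqCst)
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls once; every future in this sketch is immediately ready.
pub fn now<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

// Two "connections" with unrelated info types still get clones of the same app.
pub fn demo() -> (usize, usize) {
    let mut make = IntoMakeService { service: App::default() };
    let first = now(make.call("conn-1")).unwrap();
    let second = now(make.call(42u8)).unwrap();
    (first.hit(), second.hit())
}
```

Because each connection receives a clone, any state that should be shared across connections has to live behind something like an Arc, which is exactly what the counter does here.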

So where does that S type come from? It's built up by all the route and layer calls you make. For example, check out the get function's signature:

pub fn get<H, B, T>(handler: H) -> OnMethod<H, B, T, EmptyRouter>
where
    H: Handler<B, T>,

pub struct OnMethod<H, B, T, F> { /* fields omitted */ }

impl<H, B, T, F> Service<Request<B>> for OnMethod<H, B, T, F>
where
    H: Handler<B, T>,
    F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone,
    B: Send + 'static,
{
    type Response = Response<BoxBody>;
    type Error = Infallible;
    // and more stuff
}

get returns an OnMethod value. And OnMethod is a Service that takes a Request<B> and returns a Response<BoxBody>. There's some funny business at play regarding the representations of bodies, which we'll eventually dive into a bit more. But with our newfound understanding of Tower and Hyper, the types at play here are no longer inscrutable. In fact, they may even be scrutable!

And one final note on the example above. Axum works directly with a lot of the Hyper machinery. And that includes the Server type. While the axum crate reexports many things from Hyper, you can use those types directly from Hyper instead if so desired. In other words, Axum is pretty close to the underlying libraries, simply providing some convenience on top. It's one of the reasons I'm pretty excited to get a bit deeper into my experiments with Axum.

So to sum up at this point:

  • Tower provides an abstraction for asynchronous functions from input to output, which may fail. This is called a service.
  • HTTP servers have two levels of services. The lower level is a service from HTTP requests to HTTP responses. The upper level is a service from connection information to the lower level service.
  • Hyper has a lot of additional traits floating around, some visible, some invisible, which allow for more generality, and also make things a bit more complicated to understand.
  • Axum sits on top of Hyper and provides an easier to use interface for many common cases. It does this by providing the same kind of services that Hyper is expecting to see. And it seems to be doing a bunch of fancy footwork around HTTP body representations.

Next step on our journey: let's look at another library for building Hyper services. We'll follow up on this in our next post.
