4

How to implement HTTP Long Polling in Rust

 2 years ago
source link: https://kerkour.com/blog/rust-http-long-polling/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

How to implement HTTP Long Polling in Rust

Tue, Sep 14, 2021

We will use the new web framework developed by tokio’s team: axum. Its performance and simplicity are unparalleled in the Rust world. Also, please note that porting this code to another web framework is easy.

We will implement a simple chat server, as chat is the textbook application that benefits the most from long polling.

There are 3 tricks to make this implementation efficient, so stay attentive ;)

The Chat Service

The Chat Service is an object that encapsulates all our business logic. To keep the example simple, we will only make database calls.

Here is our first trick: In order to enable message ordering, we don’t use a UUIDv4. Instead, we use a ULID that we convert to a UUID so there is no problem to serialize / deserialize it: Uuid = Ulid::new().into()

chat.rs

impl ChatService {
    pub fn new(db: DB) -> Self {
        ChatService { db }
    }

    pub async fn create_message(&self, body: String) -> Result<Message, Error> {
        if body.len() > 10_000 {
            return Err(Error::InvalidArgument("Message is too large".to_string()));
        }

        let created_at = chrono::Utc::now();
        let id: Uuid = Ulid::new().into();

        let query = "INSERT INTO messages
            (id, created_at, body)
            VALUES ($1, $2, $3)";

        sqlx::query(query)
            .bind(id)
            .bind(created_at)
            .bind(&body)
            .execute(&self.db)
            .await?;

        Ok(Message {
            id,
            created_at,
            body,
        })
    }

Here is our second trick: notice the after.unwrap_or(Uuid::nil()) which return a “zero” UUID (00000000-0000-0000-0000-000000000000). With WHERE id > $1 it allows us to return all the messages if after is None.

It’s useful to rehydrate the whole state of a client, for example.

    pub async fn find_messages(&self, after: Option<Uuid>) -> Result<Vec<Message>, Error> {
        let query = "SELECT *
            FROM messages
            WHERE id > $1";

        let messages: Vec<Message> = sqlx::query_as::<_, Message>(query)
            .bind(after.unwrap_or(Uuid::nil()))
            .fetch_all(&self.db)
            .await?;

        Ok(messages)
    }
}

The Web Server

Next, the boilerplate to run the web server.

Thanks to .layer(AddExtensionLayer::new(ctx)), ServerContext is injected into all the routes so we can call ChatService’s methods.

struct ServerContext {
    chat_service: chat::ChatService,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    std::env::set_var("RUST_LOG", "rust_long_polling=info");
    env_logger::init();

    let database_url = std::env::var("DATABASE_URL")
        .map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;

    let db = db::connect(&database_url).await?;
    db::migrate(&db).await?;

    let chat_service = chat::ChatService::new(db);
    let ctx = Arc::new(ServerContext::new(chat_service));

    let app = Router::new()
        .route(
            "/messages",
            get(handler_find_messages).post(handler_create_message),
        )
        .or(handler_404.into_service())
        .layer(AddExtensionLayer::new(ctx));

    log::info!("Starting server on 0.0.0.0:8080");
    axum::Server::bind(
        &"0.0.0.0:8080"
            .parse()
            .expect("parsing server's bind address"),
    )
    .serve(app.into_make_service())
    .await
    .expect("running server");

    Ok(())
}

Long Polling

Finally, our third trick: long polling is a simple loop with tokio::time::sleep.

By using tokio::time::sleep, an active connection will barely use any resources when waiting.

If new data is found, we immediately return with the new data. Else, we wait one more second.

After 10 seconds, we return empty data.

main.rs

async fn handler_find_messages(
    Extension(ctx): Extension<Arc<ServerContext>>,
    query_params: Query<FindMessagesQueryParameters>,
) -> Result<Json<Vec<Message>>, Error> {
    let sleep_for = Duration::from_secs(1);

    // long polling: 10 secs
    for _ in 0..10u64 {
        let messages = ctx.chat_service.find_messages(query_params.after).await?;
        if messages.len() != 0 {
            return Ok(messages.into());
        }

        tokio::time::sleep(sleep_for).await;
    }

    // return an empty response
    Ok(Vec::new().into())
}

The code is on GitHub

As usual, you can find the code on GitHub: github.com/skerkour/kerkour.com (please don’t forget to star the repo 🙏).

Want to learn how to craft more advanced web applications in Rust (such as a WebAssembly frontend and a JSON API backend)? Take a look at my book: Black Hat Rust. All early-access supporters get a special discount and awesome bonuses: https://academy.kerkour.com/black-hat-rust?coupon=BLOG.

Warning: this offer is limited in time!

Join the private club where I share exclusive tips and stories about programming, hacking and entrepreneurship. 1 message / week.
I hate spam even more than you do. I'll never share your email, and you can unsubscribe at any time.
black_hat_rust_cover.svg

Want to learn Rust and offensive security? Take a look at my book Black Hat Rust. All early-access supporters get a special discount and awesome bonuses: https://academy.kerkour.com/black-hat-rust?coupon=BLOG.
Warning: this offer is limited in time!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK