
Writing AWS Lambda Functions with Bastion

source link: https://blog.bastion.rs/2020/02/01/writing-aws-lambda-functions-with-bastion.html

In this post, we will look at how AWS Lambda functions can be implemented using Bastion. AWS Lambda is a well-known serverless platform, used for tasks ranging from infrastructure management and serving APIs to data processing and orchestration.

Let’s start! We are going to use the Bastion AWS Lambda example from our showcase repository.

First, let’s clone the Bastion showcase repository:

$ git clone git@github.com:bastion-rs/showcase.git

Change into the directory containing our example lambda:

$ cd showcase/bastion-aws-lambda

We use the Serverless Framework to compile and configure our lambda. The serverless.yml file in this directory holds our lambda’s configuration. To install Serverless and the serverless-rust plugin, which adds Rust support, run:

$ npm i -g serverless
$ npm i -D serverless-rust

Now we have installed both dependencies for compiling and configuring our lambda. Our lambda takes a list of sites as input, hands each site to one of a pool of concurrently running worker actors that make GET requests, and collects the body strings as they come back. The main code for the lambda is in page_fetcher/src/main.rs. Let’s take a look at the lambda code and walk through it:

/// This is the JSON payload we expect to be passed to us by the client accessing our lambda.
#[derive(Deserialize, Debug)]
struct InputPayload {
    sites: Vec<String>
}

This is our input payload: the list of sites to request when the lambda is triggered.

/// This is the JSON payload we will return back to the client if the request was successful.
#[derive(Serialize, Debug)]
struct OutputPayload {
    status: String
}

Once we have processed the sites and received their bodies, we return the lambda’s status. Think of the sites as internal applications in your cluster whose endpoints we trigger; instead of public sites, you can use your cluster’s internal names.

Message Handler (Dispatcher)

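// Assumed imports (exact paths depend on the crate versions in Cargo.toml):
use bastion::prelude::*;
use futures::channel::mpsc::unbounded;
use lambda_runtime::{error::HandlerError, Context};
use log::info;

fn dispatcher(
    payload: InputPayload,
    _c: Context,
) -> Result<OutputPayload, HandlerError> {
    // Channel used by the Bastion side to signal completion to this handler.
    let (p, mut c) = unbounded::<bool>();

    Bastion::children(|children: Children| {
        children.with_exec(move |_ctx: BastionContext| {
            // This closure may be called more than once (e.g. on restart),
            // so it clones what it needs instead of moving it out.
            let sites = payload.sites.clone();
            let workers = worker_pool(payload.sites.len());
            let p = p.clone();

            async move {
                info!("Dispatching started");

                for (worker, site) in workers.elems().iter().zip(sites) {
                    info!("Site sent for processing!");
                    let answer = worker.ask_anonymously(site).unwrap();
                    // Wait for this worker's reply (the fetched body) before
                    // sending the next site; the body itself is discarded here.
                    let _ = answer.await.unwrap();
                }

                // Tell the handler that every site has been processed.
                let _ = p.unbounded_send(true);

                Ok(())
            }
        })
    }).unwrap();

    // Spin until the completion signal arrives: try_next() returns Err while
    // the channel is empty and Ok once a message arrives or it is closed.
    while c.try_next().is_err() {
        std::thread::yield_now();
    }

    Ok(OutputPayload { status: "OK".into() })
}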

Let’s break this dispatcher down. Unfortunately, the Lambda runtime doesn’t let us write a lambda application without its own runtime, so we need a dispatcher function that is registered with the underlying runtime. The dispatcher receives each request from that runtime and hands it over to Bastion for processing.

We create an unbounded channel on which the handler receives a completion signal once requests have been made to all given sites and a response has come back from each of them:

let (p, mut c) = unbounded::<bool>();
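The dispatcher runs inside the synchronous Lambda handler, so it has to block until the Bastion side reports completion. Below is a minimal sketch of that handoff, using std::sync::mpsc and a plain thread as stand-ins for the article’s unbounded channel and children group (so this is not Bastion’s API); run_and_wait is a hypothetical name. Unlike polling with try_next, recv() parks the thread until the signal arrives, and it returns an error if the sender is dropped without signalling, for example because the worker panicked.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the dispatcher: runs `work` on a background
/// thread (playing the Bastion children group) and blocks until that thread
/// signals completion over a channel.
fn run_and_wait<F>(work: F) -> Result<(), String>
where
    F: FnOnce() + Send + 'static,
{
    // Producer `p` and consumer `c`, named as in the article.
    let (p, c) = mpsc::channel::<bool>();

    thread::spawn(move || {
        work();
        // Signal completion; ignore the error if nobody is listening anymore.
        let _ = p.send(true);
    });

    // recv() blocks without spinning. If `work` panics, the thread unwinds,
    // `p` is dropped without sending, and recv() returns Err.
    c.recv()
        .map(|_| ())
        .map_err(|_| "worker exited without signalling completion".to_string())
}
```

Calling run_and_wait with a closure that fetches the sites returns Ok(()) once that closure finishes, or Err if it panicked.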

We then create a children group that sets up the workers and fans the requests out to them, so that each worker runs on its own:

let workers = worker_pool(payload.sites.len());

This line sets up one worker per site using the worker_pool function; we explain that function later.

async move {
    info!("Dispatching started");

    for (worker, site) in workers.elems().iter().zip(sites) {
        info!("Site sent for processing!");
        let answer = worker.ask_anonymously(site).unwrap();
        // Or use the returned body
        let _ = answer.await.unwrap();
    }

    let _ = p.unbounded_send(true);

    Ok(())
}

We pair each worker with a site and send the site to that worker with ask_anonymously. Here we need to know the difference between ask and tell. The tell method works like a fire-and-forget messaging system: a message passed with tell doesn’t expect a response from the receiving actor, while ask does. In Bastion we have ask and ask_anonymously; the latter can be used across BastionContexts. What we call a context is basically a manager for a branch of the hierarchy, which allows granular micro-concurrency between children, child and supervisors. ask is used within the same context, which lets us directly identify children inside the same hierarchy. We use ask_anonymously here because the workers and the dispatcher are separate branches.
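To make the ask/tell distinction concrete, here is a minimal model of a single actor built on std threads and channels. It is not Bastion’s implementation: Msg, ActorRef, spawn_actor, tell and ask are hypothetical names. A tell only enqueues a message; an ask also carries a reply channel, and the caller waits on the receiving end for the answer, roughly as the dispatcher awaits the Answer returned by ask_anonymously.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Messages our toy actor understands.
enum Msg {
    /// Fire-and-forget: no reply expected.
    Tell(String),
    /// Request/response: the actor answers on the enclosed channel.
    Ask(String, Sender<String>),
}

/// Hypothetical handle to a running actor (its mailbox).
struct ActorRef {
    mailbox: Sender<Msg>,
}

impl ActorRef {
    /// Like `tell`: enqueue and return immediately.
    fn tell(&self, text: &str) {
        let _ = self.mailbox.send(Msg::Tell(text.to_string()));
    }

    /// Like `ask`: enqueue with a reply channel and hand back the receiving end.
    fn ask(&self, text: &str) -> Receiver<String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        let _ = self.mailbox.send(Msg::Ask(text.to_string(), reply_tx));
        reply_rx
    }
}

/// Spawns an actor that records told messages and answers asks with the
/// upper-cased text plus the number of tells seen so far.
fn spawn_actor() -> ActorRef {
    let (tx, rx) = mpsc::channel::<Msg>();
    thread::spawn(move || {
        let mut log: Vec<String> = Vec::new();
        // The loop ends once every sender (ActorRef) has been dropped.
        for msg in rx {
            match msg {
                Msg::Tell(text) => log.push(text),
                Msg::Ask(text, reply) => {
                    let answer = format!("{} (after {} tells)", text.to_uppercase(), log.len());
                    let _ = reply.send(answer);
                }
            }
        }
    });
    ActorRef { mailbox: tx }
}
```

With this model, actor.tell("a") returns immediately, while actor.ask("site").recv() blocks until the actor replies; because the mailbox is a single FIFO queue, that reply already reflects the earlier tells.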

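Our workers send the body back. Even though we don’t use it, we await each worker’s response inside the for loop before sending the next site. This keeps the responses in the same order as the sites, at the cost of handling one site at a time; if we didn’t wait inside the loop, the responses could arrive in a different order. In Bastion, message ordering is left to the user’s code.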
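The trade-off above can be sketched with plain threads. This is a model, not Bastion code: worker_pool here is a hypothetical std-only analogue of the article’s function (one thread and mailbox per site), and the GET request is replaced by a fake body. ask_in_order mirrors the article’s loop and returns bodies in input order, one request at a time; ask_all_then_collect sends every request before waiting, so the work overlaps but the bodies come back in completion order.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// A request: the site plus the channel the (fake) body goes back on.
type Job = (String, Sender<String>);

/// Hypothetical pool: `pool_size` worker threads, each with its own mailbox.
fn worker_pool(pool_size: usize) -> Vec<Sender<Job>> {
    (0..pool_size)
        .map(|_| {
            let (tx, rx) = mpsc::channel::<Job>();
            thread::spawn(move || {
                for (site, reply) in rx {
                    // Placeholder for the HTTP GET.
                    let _ = reply.send(format!("body of {}", site));
                }
            });
            tx
        })
        .collect()
}

/// Mirrors the article's loop: ask, then wait for the answer before the next ask.
fn ask_in_order(sites: &[&str]) -> Vec<String> {
    let workers = worker_pool(sites.len());
    let mut bodies = Vec::new();
    for (worker, site) in workers.iter().zip(sites) {
        let (reply_tx, reply_rx) = mpsc::channel();
        worker.send((site.to_string(), reply_tx)).unwrap();
        bodies.push(reply_rx.recv().unwrap());
    }
    bodies
}

/// Fan-out/fan-in: send every request first, then collect replies as they arrive.
fn ask_all_then_collect(sites: &[&str]) -> Vec<String> {
    let workers = worker_pool(sites.len());
    let (reply_tx, reply_rx) = mpsc::channel();
    for (worker, site) in workers.iter().zip(sites) {
        worker.send((site.to_string(), reply_tx.clone())).unwrap();
    }
    // Drop our own sender so the iterator ends after the last reply.
    drop(reply_tx);
    reply_rx.into_iter().collect()
}
```

With sites ["a", "b", "c"], ask_in_order always yields the bodies for a, b and c in that order, whereas ask_all_then_collect yields the same three bodies in whatever order the workers finish.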

With the dispatcher explained, let’s take a look at the workers:

Worker Actors

fn worker_pool(pool_size: usize) -> ChildrenRef {
    Bastion::children(|children: Children| {
        children
            .with_redundancy(pool_size)
            .with_exec(move |ctx: BastionContext| {
                async move {
                    info!("Worker started!");

                    // Start receiving work
                    loop {
                        msg! { ctx.recv().await?,
                            site: String =!> {
                                info!("Received site: {}!", site);
                                let body = surf::get(site.clone()).recv_string().await.unwrap();
                                warn!("Site: {} Body: {}", site, body);
                                let _ = answer!(ctx, body);
                            };
                            _: _ => ();
                        }
                    }
                }
            })
    })
    .expect("Couldn't start a new children group.")
}

We pass the given pool_size as the redundancy, so the group starts pool_size identical workers:

children
    .with_redundancy(pool_size)
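with_redundancy(pool_size) starts pool_size children that all run the same exec closure. That is why the closure must be callable more than once, and why the dispatcher clones payload.sites inside its closure instead of moving it out. The sketch below models only that idea with threads: with_redundancy here is a hypothetical free function, not Bastion’s builder method, and demo shows each child cloning the captured data.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical analogue of `with_redundancy(n).with_exec(f)`:
/// runs the same `exec` closure once per child, each on its own thread.
fn with_redundancy<F, T>(n: usize, exec: F) -> Vec<JoinHandle<T>>
where
    F: Fn(usize) -> T + Send + Sync + 'static,
    T: Send + 'static,
{
    // One closure shared by all children, so it must be `Fn`, not `FnOnce`.
    let exec = Arc::new(exec);
    (0..n)
        .map(|child_id| {
            let exec = Arc::clone(&exec);
            thread::spawn(move || (*exec)(child_id))
        })
        .collect()
}

/// Example: three children share one exec closure; each call clones `sites`.
fn demo() -> Vec<(usize, usize)> {
    let sites = vec!["https://bastion.rs".to_string(), "https://docs.rs/".to_string()];
    let handles = with_redundancy(3, move |id| {
        let my_sites = sites.clone(); // owned copy for this child
        (id, my_sites.len())
    });
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Moving sites out of the closure instead of cloning it would make the closure FnOnce, and it could then only start a single child.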

Then we give the actors their body:

.with_exec(move |ctx: BastionContext| {
    async move {
        info!("Worker started!");

        // Start receiving work
        loop {
            msg! { ctx.recv().await?,
                site: String =!> {
                    info!("Received site: {}!", site);
                    let body = surf::get(site.clone()).recv_string().await.unwrap();
                    warn!("Site: {} Body: {}", site, body);
                    let _ = answer!(ctx, body);
                };
                _: _ => ();
            }
        }
    }
})

In this body you may have noticed that we receive messages with the msg! macro, which has an interesting syntax. While designing Bastion, we thought about tell, ask and broadcast scenarios between actors. The =!> arrow corresponds to asked messages: since we asked our workers to fetch a site, the site: String =!> arm of the mailbox receives it, and the catch-all _: _ => (); arm ignores any other message.

Oh yes: we use surf to issue the HTTP call, and we return the response body to the asker with our answer! macro.
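For intuition about what the ask arm does, here is a rough std-only analogue (not how the msg! macro is implemented): a message travels as a Box<dyn Any + Send>, the arm is a downcast to String, and only an asked message carries a reply channel to answer on, much like answer! replies to the asker. Envelope and handle are hypothetical names, and the sketch leaves out tells and broadcasts.

```rust
use std::any::Any;
use std::sync::mpsc::Sender;

/// Hypothetical envelope: the payload plus an optional reply channel
/// (present only for asked messages).
struct Envelope {
    payload: Box<dyn Any + Send>,
    reply: Option<Sender<String>>,
}

/// Dispatches on the payload's type and on whether it was asked,
/// loosely mirroring `site: String =!> { ... }` and `_: _ => ();`.
fn handle(env: Envelope) -> &'static str {
    match (env.payload.downcast::<String>(), env.reply) {
        // Asked String: answer it (the article's worker fetches the site here).
        (Ok(site), Some(reply)) => {
            let _ = reply.send(format!("body of {}", site));
            "answered"
        }
        // Any other type, or a message that wasn't asked: ignore it.
        _ => "ignored",
    }
}
```

Sending an Envelope whose payload is a String and whose reply is Some(sender) gets an answer on that sender; a u32 payload, or a String without a reply channel, falls through to the catch-all arm.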

Application’s Main

Those are the functions that do the processing. Now it’s time to look at how the application is protected against crashes, starting from our main entry point:

#[fort::root]
async fn main(_: BastionContext) -> Result<(), ()> {
    let _ = simple_logger::init_with_level(log::Level::Info);
    lambda!(dispatcher);

    Ok(())
}

Fort is a proc-macro attribute that adds fault tolerance to the main function by wrapping it in Bastion. It passes you the root context of the Bastion system as an argument, from which you can build further hierarchy, and runs your application inside a panic-handling context. The rest of main simply initializes the logging system and registers the lambda handler.
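Fort’s point is that a panic shouldn’t take the whole application down. Bastion achieves this with supervisors that restart failed children; the sketch below only imitates the restart idea with std::panic::catch_unwind, and supervise with its max_restarts limit is a hypothetical helper, not part of Fort or Bastion.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical supervisor: runs `child`, and if it panics, runs it again,
/// up to `max_restarts` extra times. Returns the first successful result.
fn supervise<F, T>(max_restarts: usize, mut child: F) -> Result<T, String>
where
    F: FnMut() -> T,
{
    for attempt in 1..=max_restarts + 1 {
        // AssertUnwindSafe: we promise not to rely on state a panic left half-updated.
        match panic::catch_unwind(AssertUnwindSafe(|| child())) {
            Ok(value) => return Ok(value),
            Err(_) => eprintln!("child panicked on attempt {}", attempt),
        }
    }
    Err(format!("child failed {} times, giving up", max_restarts + 1))
}
```

For example, supervise(3, work) keeps re-running a panicking work closure up to three extra times before returning Err. A real supervisor also decides which state a restarted child starts from, which this loop ignores.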

That’s all there is to the code. For more information, take a look at our documentation.

Testing locally

To test locally, run:

$ serverless invoke local -f page_fetcher -d \
    '{
      "sites": [
        "https://bastion.rs",
        "https://blog.bastion.rs",
        "http://google.com",
        "https://docs.rs/",
        "https://crates.io/",
        "https://twitter.com/",
        "https://news.ycombinator.com/",
        "http://play.rust-lang.org/",
        "http://catb.org/jargon/html/hates.html"
      ]
    }'

Then wait for the container build to finish, the artefact to be extracted, and the function to run inside the container. You should then see the lambda’s logs.

Conclusion

In this post, we brought Bastion to the AWS Lambda runtime. For your own applications, feel free to explore more advanced methods, use message passing with the different approaches mentioned above, and design your own hierarchy and concurrency nodes. Don’t forget to share your suggestions and ideas with us: join our Discord server, or open an issue, feature request or support ticket on our GitHub.

