
async/await on embedded Rust

source link: https://ferrous-systems.com/blog/async-on-embedded/

In a previous post we explored what needs to be done on the rustc side to bring async/await to no_std Rust.

In this post we'll explore what could be done once async/await is available in no_std land and why we think async/await is a big deal for embedded development.

From blocking to non-blocking

A blocking blinky program looks like this:

use cortex_m_rt::entry;
use embedded_hal::blocking::delay::DelayMs as _; // `delay_ms` trait
use hal::{Led, Timer};

#[entry]
fn main() -> ! {
    let mut led = Led::new();
    let mut timer = Timer::new();

    loop {
        led.on();
        timer.delay_ms(1_000);
        led.off();
        timer.delay_ms(1_000);
    }
}

The program turns an LED on for one second and then turns it off for the next second, repeating these two steps over and over.

The interesting part here is timer.delay_ms. The trait documentation indicates that the function must "pause execution for n milliseconds" but neither the documentation nor the signature specifies how the pause should be implemented. An implementation could provide the pause in one of two ways:

  • By busy waiting, that is by continuously polling the state of the Timer to see if the desired time has elapsed (this approach is bad for power-conscious applications),

  • Or by sleeping, that is by setting an interrupt to fire at some point in the future and then putting the device in a low-power mode (e.g. by stopping the CPU) until the interrupt fires.

Which behavior gets implemented is up to the author of the Timer abstraction.
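
To make the difference concrete, here's a minimal sketch of the two strategies. The start, expired and enable_interrupt methods are hypothetical stand-ins for whatever the device's timer peripheral actually exposes:

use cortex_m::asm;

impl Timer {
    /// Busy-waiting implementation: spins until the deadline has passed
    fn delay_ms_busy(&mut self, ms: u32) {
        self.start(ms);
        // continuously polls the timer state; simple, but keeps the
        // CPU at full power the whole time
        while !self.expired() {}
    }

    /// Sleeping implementation: stops the CPU until the timer
    /// interrupt fires
    fn delay_ms_sleep(&mut self, ms: u32) {
        self.enable_interrupt();
        self.start(ms);
        while !self.expired() {
            // Wait For Interrupt: enters a low-power mode until an
            // interrupt (here, the timer's) wakes the CPU up
            asm::wfi();
        }
    }
}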

The Timer abstraction has a device-specific implementation so there will be several different implementations of it, at least one per device family (nrf52::Timer, stm32::Timer, etc.). This means that you could run into one behavior or the other depending on which chip you pick for your application.

An async/await blinky program may look like this:

use async_cortex_m::task; // <- async runtime
use core::time::Duration;
use cortex_m_rt::entry;
use hal::{Led, Timer};

#[entry]
fn main() -> ! {
    let mut led = Led::new();
    let mut timer = Timer::new();

    // `block_on` runs the future (`async` block) to completion
    task::block_on(async {
        loop {
            led.on();
            timer.wait(Duration::from_secs(1)).await;
            // ^ suspends the task for one second
            led.off();
            timer.wait(Duration::from_secs(1)).await;
        }
    })
}

The two don't look much different in terms of code and, to an external observer, the programs will appear to do the same thing, but their actual semantics are quite different. async/await code uses futures under the hood. A future represents an asynchronous computation and comes with a contract that specifies its runtime characteristics. In particular, the "futures should not be polled in a tight loop" (paraphrased) part indicates that timer.wait should not result in continuously polling the state of the Timer (i.e. busy waiting).

How often the future is polled and whether the device is put in sleep mode when no future can make progress is up to the runtime, or executor, used to run the future. A runtime that continuously polls (all or some) futures can result in a more responsive application whereas a runtime that puts the device in deep sleep mode when no future can make progress will sacrifice responsiveness in favor of improved power savings.
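
For example, the main loop of a power-conscious executor might look like this sketch, where ready_queue is a hypothetical queue of tasks whose wakers have been invoked:

loop {
    // poll every task that was woken since the last pass
    while let Some(task) = ready_queue.pop() {
        task.poll();
    }

    // at this point no future can make progress: sleep until an
    // interrupt wakes the device up (a latency-oriented executor
    // could keep polling here instead)
    cortex_m::asm::wfi();
}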

The main takeaway here is that the application author now gets to pick the desired runtime characteristics by picking, or building, the right async runtime. On the other hand, the authors of HAL abstractions like Timer now have to write abstractions that are flexible enough to work with different async runtimes; they can no longer implement busy-waiting APIs as these go against the contract of the Future API.
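
Concretely, a runtime-agnostic Timer::wait could return a hand-written future along these lines; expired and register_waker are hypothetical methods backed by the timer peripheral and its interrupt handler:

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

pub struct Wait<'a> {
    timer: &'a mut Timer,
}

impl Future for Wait<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // OK because `Wait` only holds a reference, so it's `Unpin`
        let this = self.get_mut();

        if this.timer.expired() {
            Poll::Ready(())
        } else {
            // hand our waker to the timer interrupt handler; the
            // executor won't poll this task again until the handler
            // invokes the waker, so no busy waiting is involved
            this.timer.register_waker(cx.waker());
            Poll::Pending
        }
    }
}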

Multitasking

Async/await is a building block for multitasking. Most async executors will expose a concept of tasks, a future that will be run to completion, and allow running them concurrently (for example, see async_std::task::spawn).

no_std lacks multitasking primitives. std::thread is implemented on top of OS threads, which are only available in hosted environments like Linux. Embedded no_std applications are usually bare-metal applications with no OS underneath. Given these conditions, async/await tasks could become the missing "standard" multitasking primitive in bare-metal no_std land; after all, the task module is already in core.
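
Indeed, everything needed to describe futures and tasks already compiles without std:

// all of this lives in `core`; no OS or `std` required
use core::future::Future;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};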

To give you a feel for async/await based multitasking here's an application that uses the task::spawn API to spawn an additional task onto the executor.

use async_cortex_m::task;
use core::time::Duration;
use cortex_m_rt::entry;
use hal::{Led, Timer, serial};

#[entry]
fn main() -> ! {
    let mut led = Led::new();
    let mut timer = Timer::new();

    // heartbeat task
    task::spawn(async move {
        loop {
            led.on();
            timer.wait(Duration::from_millis(500)).await;
            led.off();
            timer.wait(Duration::from_millis(500)).await;
        }
    });

    // opens the serial port; returns transmit and receive handles
    let (mut tx, mut rx): (Tx, Rx) = serial::open();
    
    // echo task: sends back all incoming bytes
    task::block_on(async {
        loop {
            let mut buf = [0];
            rx.read(&mut buf).await;
            // ^ suspends the task until enough data has been received
            tx.write(&buf).await;
        }
    })
}

// where Tx and Rx have the following async API
impl Tx {
    /// Sends *all* `bytes` over the serial interface
    pub async fn write(&mut self, bytes: &[u8]) { /* .. */ }
}

impl Rx {
    /// *Completely* fills the given `buffer` with bytes received
    /// over the serial interface
    pub async fn read(&mut self, buffer: &mut [u8]) { /* .. */ }
}

Here, the previous blinky program has been converted into a "heartbeat" task that visually indicates that the program is making progress and has not locked up (due to an unhandled exception or some software bug). An "echo" task is run concurrently; this second task reads data from the serial interface and sends it back without altering it.

Threads

Another multitasking option commonly used in embedded systems, especially in C firmware (see FreeRTOS, Zephyr, etc.), is threads, as in OS-like threads where each thread gets its own separate call stack. As most microcontrollers are single-core systems, using threads does not improve parallelism or throughput, but threads are a concurrency model most programmers are familiar with, so they are a commonly offered multitasking option.

Using threads increases the risk of stack overflows, however. Microcontrollers don't have much RAM available. As more threads are spawned, each thread gets a smaller chunk of the available RAM to place its call stack. If a thread does too many nested function calls or uses too many local variables it can overflow its assigned stack space and overwrite the call stack of another thread, resulting in memory corruption.

Threading runtimes will usually use some runtime mechanism, like the Memory Protection Unit (MPU) in ARM Cortex-M devices, to catch and prevent these stack overflows. Not all microcontroller architectures, e.g. ARM Cortex-M0 and MSP430, have an MPU or an equivalent mechanism, so threads are inherently memory-unsafe to use on those architectures.

There are techniques, like flipping the layout of program memory, that can be used to protect against stack overflows on devices with no MPU, but these techniques don't work if there's more than one call stack.

We are bringing threads into the discussion because an async runtime could be implemented without using, or even implementing, threads by running all tasks cooperatively and on the same call stack. Having a single call stack reduces the chances of stack overflows and at the same time lets us use the stack overflow protection mechanism described in the previous paragraph.

Sharing data

Readers familiar with the std::thread API know that (explicitly) sharing data between threads requires some form of synchronization like sync::Mutex or sync::RwLock, which will usually come wrapped in an Arc. These wrappers are needed to make the data thread-safe to access.

Interestingly, if one is dealing with an async runtime that runs all tasks cooperatively ("on the same thread") then these synchronization wrappers are not needed, and types with interior mutability, i.e. types that allow mutation through a shared reference (&T), like Cell and RefCell are sufficient to safely share data between tasks.

For example, to extend the previous program to have the serial interface control the state of the blinking LED we can share a boolean ( Cell<bool> ) between the two tasks.

#![deny(unsafe_code)]

use core::cell::Cell;
use core::time::Duration;

use async_cortex_m::task;
use cortex_m_rt::entry;
use hal::{Led, Timer, serial};

#[entry]
fn main() -> ! {
    // the state of the LED: off (`false`) or blinking (`true`)
    static mut STATE: Cell<bool> = Cell::new(true);

    let state: &'static Cell<bool> = STATE;
    let mut led = Led::new();
    let mut timer = Timer::new();

    // the future argument must satisfy the bound `: 'static` but
    // no `: Send` bound is required. `Cell<T>` is *not* `Sync` so
    // `&Cell<T>` is *not* `Send`
    task::spawn(async move {
        // `state: &'static Cell<_>` gets moved into the async block

        loop {
            if state.get() {
                led.on();
            }
            timer.wait(Duration::from_millis(500)).await;
            led.off();
            timer.wait(Duration::from_millis(500)).await;
        }
    });

    let (mut tx, mut rx) = serial::open();
    task::block_on(async move {
        // `state: &'static Cell<_>` gets moved into the async block

        loop {
            let mut buf = [0];
            rx.read(&mut buf).await;
            
            // toggles the state of the LED
            if buf[0] == b't' {
               state.set(!state.get());
            }

            tx.write(&buf).await;
        }
    })
}

Perhaps the most surprising part of the above snippet, if you are not familiar with the cortex_m_rt crate, is that the static mut variable is safe to access and that its type changes from T to &'static mut T. This is a feature of the cortex_m_rt::entry macro / attribute: it performs this transformation in its expansion. The reason this is safe is that main cannot be (safely) called from software (calling main() will not compile); instead it is called exactly once by the hardware (the reset handler).
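
In a simplified sketch, the expansion looks roughly like this (the actual macro output is more involved):

#[export_name = "main"]
fn __main() -> ! {
    static mut STATE: Cell<bool> = Cell::new(true);

    // re-bind the `static mut` as a `&'static mut` reference; this is
    // sound because this function runs exactly once (called by the
    // reset handler), so no aliasing `&mut` can ever be created
    let STATE: &'static mut Cell<bool> = unsafe { &mut STATE };

    // .. the rest of the user's `main` body ..
}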

There are other ways to obtain a static reference (&'static T) that one could have used here, like Box::leak. It would also have been OK to send an Rc<Cell<bool>> to each task. These two options require a #[global_allocator] and the unstable #[alloc_error_handler] feature.
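
For instance, with a global allocator set up, the Box::leak alternative is a one-liner:

use alloc::boxed::Box;

// leak a heap allocation to obtain a `'static` reference
let state: &'static Cell<bool> = Box::leak(Box::new(Cell::new(true)));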

Channels

Sometimes inter-task communication is better expressed using channels rather than explicit shared state. An asynchronous runtime will usually re-export some asynchronous SPSC (Single-Producer Single-Consumer) or MPMC (Multiple-Producer Multiple-Consumer) channel as part of its API.

Modifying the previous program to have the serial task and the LED task talk through a channel would look like this:

use async_cortex_m::{
    Channel, // MPMC channel
    task,
};
use cortex_m_rt::entry;
use hal::{Led, serial};

#[entry]
fn main() -> ! {
    static mut CHANNEL: Channel<u8> = Channel::new();

    let channel: &'static Channel<u8> = CHANNEL;
    let mut led = Led::new();

    task::spawn(async move {
        loop {
            let byte = channel.recv().await;
            // ^ suspends the task while the channel is empty

            if byte == b'0' {
                led.off();
            } else if byte == b'1' {
                led.on();
            } else {
                // unknown command
            }
        }
    });

    let (mut tx, mut rx) = serial::open();
    task::block_on(async move {
        loop {
            let mut buf = [0];
            rx.read(&mut buf).await;

            // the input controls the state of the LED
            channel.send(buf[0]).await; 
            // ^ will suspend the task if the channel is full

            tx.write(&buf).await;
        }
    })
}

Mutex

Even though Cell and RefCell can be used to share data between tasks there's still room for an async Mutex abstraction. The following contrived example will help us visualize the need for it:

use async_cortex_m::{task, Mutex, MutexGuard};
use cortex_m_rt::entry;

#[entry]
fn main() -> ! {
    static mut MUTEX: Mutex<i32> = Mutex::new(0);

    let mutex: &'static Mutex<i32> = MUTEX;
    task::spawn(async move {
        println!("A: before lock");

        let lock: MutexGuard = mutex.lock().await;

        println!("A: mutex contains the value {}", *lock);

        loop {
            println!("A: yield");
            task::r#yield().await;
        }
    });

    let mut lock: MutexGuard = mutex.try_lock().unwrap();
    task::block_on(async {
        println!("B: yield");

        // suspend the task / yield control
        task::r#yield().await;

        println!("B: after yield");

        *lock += 1;

        drop(lock); // release the lock

        println!("B: released the lock");

        loop {
            println!("B: yield");
            task::r#yield().await;
        }
    })
}

This program prints:

B: yield
A: before lock
B: after yield
B: released the lock
B: yield
A: mutex contains the value 1
A: yield
(..)

The key points here are that (a) one task can hold the lock (MutexGuard) across a suspension point; in the example the suspension point is an explicit task::r#yield call, but every .await is a potential suspension point; and (b) contention on Mutex::lock suspends the calling task until the task currently holding the lock releases it.

If you replace the async::Mutex with a plain RefCell (and the lock().await calls with non-async borrow_mut() calls) you'll get a panic at the contention point:

B: yield
A: before lock
panicked at 'already borrowed: BorrowMutError'

Holding a RefMut (what RefCell::borrow_mut returns) across a suspension point is likely to be wrong and may result in a panic at runtime. On the other hand, if you are using an async::Mutex but the MutexGuard never lives across a suspension point then chances are you could be using a (cheaper) RefCell instead of the async::Mutex.
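
For reference, the surface of an async Mutex looks roughly like this sketch; the exact signatures are up to the runtime, though try_lock returning an Option matches the usage above:

impl<T> Mutex<T> {
    pub const fn new(value: T) -> Self { /* .. */ }

    /// Resolves once the lock has been acquired. Under contention the
    /// calling *task* is suspended (no spinning, no panicking) until
    /// the current holder drops its `MutexGuard`
    pub async fn lock(&self) -> MutexGuard<'_, T> { /* .. */ }

    /// Non-async variant: returns `None` if the lock is currently held
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> { /* .. */ }
}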

async::Mutex is particularly useful when the inner type has an async API. Let's see how async::Mutex could be used to communicate with two I2C devices connected to the same I2C bus.

I2C

Inter-Integrated Circuit (I2C or "I squared C") is a bi-directional communication protocol widely used in embedded systems. The protocol allows a host to communicate with many devices that are connected to the same bus (all of them share two electrical lines plus ground).

The key points of the protocol are:

  • The host drives the communication regardless of the direction of the data (host to device, or the other way around)

  • Each device has an address that the host must use to select the device it will communicate with

  • Devices cannot communicate with each other or start communication with the host

  • A special pair of electrical signals, START and STOP, is used to delimit data transfers.

  • The address of the device must be sent after the START condition.

We can summarize the I2C protocol from the point of view of the host using the following async API:

// I2C bus (host side) 
pub struct I2c { /* .. */ }

impl I2c {
    /// Sends `bytes` to the device with the specified address
    ///
    /// Events: START - ADDR - (H -> D) - STOP 
    ///
    /// `(H -> D)` denotes data being sent from the Host to the Device
    pub async fn write(
        &mut self,
        addr: u8,
        bytes: &[u8],
    ) -> Result<(), I2cError> { /* .. */ }

    /// Fills the given buffer with data from the device with the
    /// specified address 
    /// 
    /// Events: START - ADDR - (D -> H) - STOP 
    ///
    /// `(D -> H)` denotes data being sent from the Device to the Host
    pub async fn read(
        &mut self,
        addr: u8,
        buf: &mut [u8],
    ) -> Result<(), I2cError> { /* .. */ }

    /// `write` followed by `read` in a single transaction (without an
    /// intermediate STOP)
    /// 
    /// Events:
    /// START - ADDR - (H -> D) - reSTART - ADDR - (D -> H) - STOP
    ///
    /// `reSTART` denotes a "repeated START"
    pub async fn write_then_read(
        &mut self,
        addr: u8,
        tx_buf: &[u8],
        rx_buf: &mut [u8],
    ) -> Result<(), I2cError> { /* .. */ }
}

Common I2C devices include sensors like accelerometers, temperature sensors, gas (air quality) sensors, etc.; and external peripherals like real-time clocks, IO port expanders, etc. Let's use the SCD30 gas sensor and the DS3231 real-time clock to show how to write asynchronous driver APIs.

To read data from an I2C device one will usually use the write_then_read API to first send (write transaction) the address of the register (on the I2C device) that one wants to read and then receive (read transaction) the contents of that register. Some I2C devices take a command instead of an address in the write phase. There's not much difference between the two; they are just a sequence of bytes sent on the bus.

The DS3231 represents its data and state as registers. An API to retrieve the current date and time would look like this:

use chrono::NaiveDateTime;

/// DS3231 I2C driver
pub struct Ds3231 {
    i2c: I2c,
}

// I2C address of this device
const ADDRESS: u8 = 0x68;

impl Ds3231 {
    pub fn new(i2c: I2c) -> Self {
        Self { i2c }
    }

    /// Returns the current date and time
    pub async fn get_datetime(
        &mut self,
    ) -> Result<NaiveDateTime, Error> {
        let mut buf = [0; 7];

        // reads 7 registers starting at register address 0x00
        self.i2c.write_then_read(ADDRESS, &[0x00], &mut buf).await?;
        
        Ok(bytes2datetime(&buf)?)
    }
}

fn bytes2datetime(
   bytes: &[u8],
) -> Result<NaiveDateTime, InvalidDateError> {
    // ..
}

The SCD30 uses commands instead of registers. An API to retrieve the last measurement of the sensor would look like this:

/// SCD30 I2C driver
pub struct Scd30 {
    i2c: I2c,
}

// I2C address of this device
const ADDRESS: u8 = 0b110_0001;

// A command encoded as 2 bytes
const READ_CMD: [u8; 2] = [0x03, 0x00];

impl Scd30 {
    pub fn new(i2c: I2c) -> Self {
        Self { i2c }
    }

    /// Returns the last sensor measurement
    pub async fn get_measurement(
        &mut self,
    ) -> Result<Measurement, Error> {
        let mut buf = [0; 18];

        // the data sheet indicates there must be a STOP condition
        // between the write and the read; this is why
        // `write_then_read` is not used here
        self.i2c.write(ADDRESS, &READ_CMD).await?;
        self.i2c.read(ADDRESS, &mut buf).await?;
        
        Ok(bytes2measurement(&buf)?)
    }
}

fn bytes2measurement(
   bytes: &[u8],
) -> Result<Measurement, CrcError> {
    // ..
}

pub struct Measurement {
    /// CO2 concentration in parts per million (0 - 40,000 ppm)
    pub co2: f32,

    /// Relative humidity (0 - 100%)
    pub humidity: f32,
    
    /// Temperature in Celsius (-40 - 70 C)
    pub temperature: f32,
}

These async APIs work fine on their own but won't let you use the same I2c instance to talk to both devices because each abstraction takes I2c by value.

Sharing the I2C bus

To make the Ds3231 and Scd30 drivers work with a shared I2c we can change the implementation to use a shared async::Mutex<I2c> .

The updated Ds3231 driver would look like this:

pub struct Ds3231<'a> {
    i2c: &'a Mutex<I2c>, // <-
}

impl<'a> Ds3231<'a> {
    pub fn new(i2c: &'a Mutex<I2c>) -> Self {
        Self { i2c }
    }

    pub async fn get_datetime(
        &mut self,
    ) -> Result<NaiveDateTime, Error> {
        let mut buf = [0; 7];

        {   // this block has exclusive access to the I2C bus
            let mut i2c = self.i2c.lock().await;
            i2c.write_then_read(ADDRESS, &[0x00], &mut buf).await?;
            drop(i2c); 
        }   // ^ releases the I2C bus
        
        Ok(bytes2datetime(&buf)?)
    }
}

The updated Scd30 driver would look like this:

pub struct Scd30<'a> {
    i2c: &'a Mutex<I2c>, // <-
}

impl<'a> Scd30<'a> {
    pub fn new(i2c: &'a Mutex<I2c>) -> Self {
        Self { i2c }
    }

    pub async fn get_measurement(
        &mut self,
    ) -> Result<Measurement, Error> {
        let mut buf = [0; 18];

        {
            let mut i2c = self.i2c.lock().await;
            i2c.write(ADDRESS, &READ_CMD).await?;

            // no other I2C transaction will occur between these
            // two function calls because we have exclusive access
            // to the I2C bus

            i2c.read(ADDRESS, &mut buf).await?;
            drop(i2c); // release the I2C bus
        }
        
        Ok(bytes2measurement(&buf)?)
    }
}

(Small digression: under a multi-threaded executor it's important to hold the MutexGuard for the span of both write.await and read.await; not doing so could lead to another task stealing the I2C bus to communicate with a different device, which may be problematic as not all I2C devices correctly handle this scenario. This, however, is a discussion to be had when trying to make this driver generic so we won't delve into it right now.)

Using the two drivers, from different tasks, with the same I2C bus would look like this:

use async_cortex_m::{Mutex, task};
use cortex_m_rt::entry;
use hal::I2c;

#[entry]
fn main() -> ! {
    static mut M: Option<Mutex<I2c>> = None;
    
    let i2c = I2c::new();
    let m: &'static Mutex<I2c> = M.get_or_insert(Mutex::new(i2c));

    let mut scd30 = Scd30::new(m);
    task::spawn(async move {
        loop {
             // .. other async things ..
             let m = scd30
                 .get_measurement()
                 .await
                 .unwrap_or_else(handle_error);
             // .. other async things ..
        }
    });
    
    let mut ds3231 = Ds3231::new(m);
    task::block_on(async {
        loop {
             // .. other async things ..
             let datetime = ds3231
                 .get_datetime()
                 .await
                 .unwrap_or_else(handle_error);
             // .. other async things ..
        }
    })
}

(Small digression: under a single-threaded executor and certain circumstances the above program can run into resource starvation where one task consecutively locks the I2C bus denying the other task access to it. An interesting topic that we won't expand on in this blog post.)

None of this is "in theory"

At Ferrous Systems we have been building a proof of concept executor for the Cortex-M architecture (though it has very few architecture-specific bits so it should be fairly portable to other architectures). All the snippets presented in this blog post are fragments of fully working examples that you can find in this repository. The most complete example uses the async::Mutex<I2c> pattern to build an interactive serial console (see below) that lets you access an I2C real-time clock and an I2C gas sensor connected to the same I2C bus.

> help
Commands:
help              displays this text
date              display the current date and time
sensors           displays the gas sensor data
set date %Y-%m-%d changes the date
set time %H:%M:%S changes the time
> sensors
CO2: 652ppm
T: 26C
RH: 23%
> set time 18:49:30
> date
2020-02-28 18:49:32

The executor and examples use no unstable features (not even #[alloc_error_handler]) but depend on this pull request of ours that makes async/await work on no_std. As that PR does not add an unstable feature but rather changes the implementation details of an existing stable feature, the change will immediately ride the release train towards stable once it lands.

There's still lots of work to do in the area of asynchronous embedded Rust. This post only roughly covers some of the APIs that application and driver authors will deal with, and there's plenty of work to do before asynchronous HALs can prosper. Namely, asynchronous versions of the existing embedded-hal traits need to be developed; and community consensus, and documentation, about how to port existing blocking HALs to async/await also needs to be built.

We see plenty of potential in async/await for embedded. It can become the go-to multitasking solution for applications that don't have hard real-time requirements. In particular, the concept of tunable async runtimes seems well suited to these applications, where one application may need to be highly energy efficient (e.g. battery powered) while the next may need to be highly responsive.

Ferrous Systems GmbH is a Rust consultancy based in Berlin. Interested in leveraging async/await for your next embedded Rust project or using Rust in your next embedded project? We do development and consulting! Want to learn how to effectively use Rust's async/await feature or get started with (embedded) Rust development? We also do trainings! Contact us.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK