19

Rust中的协程: Future与async/await

 3 years ago
source link: https://zijiaw.github.io/posts/a7-rsfuture/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文内容来自Writing an OS in Rust博客。

多任务处理

几乎所有操作系统的基本功能都包含多任务处理,即并发执行多个任务的能力(multitasking)。例如,你可能边阅读这篇文章,边打开音乐播放器听歌。即使你只开了浏览器窗口,操作系统肯定也有很多隐藏的进程执行着各种任务(打开任务管理器可以一看究竟)。

虽然看起来所有的任务都并行执行着,但实际上单个CPU核同一时间只能执行一个任务(它并没有分身术)。为了创造这种并行的假象,操作系统快速地切换着当前CPU正在执行的任务,例如在1s中内切换了100次任务,那么100个任务在1s中内都有所进展,现代CPU通常很快,因此在人的感受下,这100个任务就好像是并行执行的一样。

对于多核CPU,多任务并行才是确实存在的。例如,一个8核CPU确实可以并行处理8个任务,然而操作系统的处理与单核类似,依然需要不停切换任务。

通常来说,有两种多任务执行的形式:

  • 协作式多任务(cooperative multitasking):当前任务需要主动放弃CPU资源,然后其他任务才可以切换执行。
  • 抢占式多任务(preemptive multitasking):操作系统可以在任意时间暂停当前任务,强制切换到其他任务执行。

抢占式多任务

抢占式多任务的关键思想是让操作系统来控制切换任务的时机。操作系统通过响应中断,在中断处理函数中执行任务切换的操作(因为在中断的时候操作系统才会重新获取CPU的控制权)。

因为任务可能在任意的时间点被打断,可能处在某个函数/算法的中间点,为了在后续恢复它的执行,操作系统必须备份该任务的所有执行状态,包括栈内容,CPU中所有寄存器的值等等。这一过程叫做上下文切换(context switch)。

由于任务的调用栈信息可能非常庞大,实际上每个任务都具有不同的栈空间(即线程),在切换的时候只需要保存记录对应位置的寄存器(包括program counter,以及栈指针)。这样可以降低上下文切换的开销,因为通常来说1秒内会有约100次上下文切换。

抢占式调度的主要优点是操作系统能够完全控制每个任务的执行时间,这样可以保证每个任务对CPU时间的占用是公平的,而不需要信任任务之间自己的协作。这非常重要,特别是我们会去执行第三方的程序(通常难以信任),也会有多个用户共享同一系统。

缺点是每个任务需要自己的栈空间。和共享栈空间相比,这会导致一个相对高的内存占用,也会限制同一时刻可以存在的任务数量(例如32位的Linux下每个线程默认分配2MB的栈空间)。另一个缺点是操作系统必须要保存所有寄存器的值,即使当前任务可能只用到了其中的一部分。

抢占式调度与线程是现代操作系统的基石,它们使得我们能够在操作系统上执行各种各样的不可信任的第三方程序。

协作式多任务

在协作式多任务下,系统依赖于每个任务自愿放弃CPU的控制,然后才能够执行别的任务。这使得任务能够自己选择更合适的暂停的时机,例如当它需要等待网络IO的时候。

协作式多任务通常用在语言层面,例如以协程,或者async/await的形式出现,而不是在操作系统层面。实现的思路是由程序员或者编译器在程序中添加yield操作,从而让当前任务放弃CPU,转而执行其他任务。

协作式多任务通常与异步编程相结合,在异步操作中,一个耗时未完成的IO操作通常会返回"not ready"状态,而不会阻塞当前任务,此时任务就可以通过yield将CPU让给其他任务,直到IO操作ready。

由于任务自己控制暂停的时机,这样就不依赖于操作系统去保存执行状态了。相对应的,任务可以自行保存其恢复执行需要的信息,这通常带来更好的性能。例如,一个任务刚好结束了一个复杂的算法,那么它可能只需要保存最终的计算结果,而不再需要那些中间值了。

编程语言实现的协作式任务甚至能够在暂停前备份一部分调用栈。例如,Rust的async/await实现会保存在一个自动生成的结构体中保存需要的局部变量。通过保存一部分相关的调用栈,所有任务最终可以共享一个调用栈,这就使得内存消耗更节省,从而几乎可以创建任意数量的协作式任务,而不用担心OOM。

缺点非常明显:任务之间的协作依赖于程序员,否则一个恶意的,或者存在bug的任务会一直执行下去,不主动让出CPU,导致其他任务饿死。因此,协作式多任务仅用于我们知晓每个任务的具体内容的时候,相对的,操作系统层面执行的第三方用户程序肯定不能用协作式调度(因为我们根本不知道这些程序的内容)。

然而,其性能和内存消耗由于抢占式多任务,因此协作式多任务通常用在单个程序中,尤其是与异步操作协同。

Rust中的async/await

Rust语言提供了以async/await为基础的协作式多任务支持。在我们阐述async/await如何工作之前,我们首先需要理解Rust中的futures和异步编程。

Futures

一个future代表着一个可能还未准备好的值。例如一个由另一个任务计算出来的整数,或者一个正在下载中的文件。future的概念可以由下面的小例子阐述。

a

这个时序图展示了一个main函数首先从文件系统中读取一个文件,然后调用函数foo。这个过程重复了两次,第一次使用同步调用read_file,另一次是异步调用async_read_file。

通过同步调用,main函数需要等待文件读取完成,才能够执行foo;而通过异步调用,文件系统直接返回一个future,文件的读取异步地在后台运行,而main函数可以进一步执行foo(此时foo和文件的读取过程是并行的)。

Rust中的future

在Rust中,future的概念由Future这个trait实现,它的定义如下:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

关联类型Output表示这个值的类型,例如async_read_file函数返回的future中,Output应当被设置成File。

poll方法用于检查该future中的值是否ready,它返回一个枚举类型Poll,其定义为:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

当值已经ready时,它被包在Ready里,否则返回Pending,表示值还在准备中。

poll方法接收两个参数:self: Pin<&mut Self>cx: &mut Context<'_>,前者表现为一个普通的&mut Self类型的引用,但是self本身被pin在固定的内存地址上(我们会在后面详细解释pin)。

变量cx: &mut Context<'_>的作用是传递一个Waker实例给异步任务。Waker用于让异步任务传递“完成”的信号。同样我们会在后面详细解释。

使用Future

现在我们了解了一些future的基本概念,但是依然不知道怎么用它。因为future表示异步任务的结果,它的值的完成时间是未知的,我们如何有效地获取它的值呢?

等待future完成

一个可能的答案是轮询等待一个future完成,就像下面的代码一样:

let future = async_read_file("foo.txt");
let file_content = loop {
    match future.poll(…) {
        Poll::Ready(value) => break value,
        Poll::Pending => {}, // do nothing
    }
}

上面的代码在一个循环中不停地轮询future(调用poll),直到返回ready。显然这么做是低效的,因为轮询会无意义地占用大量CPU时间。

一个更高效的办法可能是阻塞当前线程,直到future准备好。这两种都没有意义,因为最终的结果是异步又变成同步了,future的特性被舍弃了。

Future Combinators

future combinator是Rust中提供的一系列方法,如map,fold等,它能够链接不同的future,提供类似迭代器的使用体验。combinator不会等待future,而是经过一定的组合后返回一个新的future。

下面的例子使用combinator的思路将一个Future<Output = String>类型的future转换为Future<Output = usize>类型,计算了一个异步的string值的长度。

struct StringLen<F> {
    inner_future: F,
}
impl<F> Future for StringLen<F> where F: Future<Output = String> {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        match self.inner_future.poll(cx) {
            Poll::Ready(s) => Poll::Ready(s.len()),
            Poll::Pending => Poll::Pending,
        }
    }
}
fn string_len(string: impl Future<Output = String>)
    -> impl Future<Output = usize> {
    StringLen {
        inner_future: string,
    }
}
// Usage
fn file_len() -> impl Future<Output = usize> {
    let file_content_future = async_read_file("foo.txt");
    string_len(file_content_future)
}

上面的代码并不work,因为没有处理pinning,但是足够阐释future combinator了。它的基本思想就是通过string_len函数将一个future实例封装在另一个结构体StringLen中,而StringLen本身也是一个Future。当我们poll这个StringLen时,实际上就是在poll内部的原始future,然后进行一些额外的操作(例如计算string的长度),再返回。如果内部future没有ready,那么StringLen自然也没有ready。

FutureExt库实现了一些更泛用的combinator,包括map,then等,可以通过自定义的闭包来生成新的future。

future+combinator的优点在于,通过特定的结构体+trait来实现了异步操作,编译器可以很方便的进行优化,非常高效;缺点在于这种编程方式在某些情况下很不友好,受限于Rust严苛的类型系统和闭包传递。例如下面的代码:

fn example(min_len: usize) -> impl Future<Output = String> {
    async_read_file("foo.txt").then(move |content| {
        if content.len() < min_len {
            Either::Left(async_read_file("bar.txt").map(|s| content + &s))
        } else {
            Either::Right(future::ready(content))
        }
    })
}

在上面的代码中,我们实现如下的功能:读取foo.txt,如果其内容的长度小于min_len,则把bar.txt文件的内容加在后面再返回,否则直接返回。

可以看到有很多奇怪的东西,首先是move,因为在闭包里用到了min_len,因此必须把其所有权移入,然后我们通过Either把两个分支内的返回值包装了一下,这是因为Rust规定if-else的不同分支的返回值必须具有相同的类型,而这里map的返回值实际上是一个Map类型的结构体(虽然它实现了Future),而future::ready返回一个直接ready的Future,二者严格上不是同一类型。

这样一段简单的代码就如此复杂,可以想象在更复杂的项目中,特别是变量很多的情况下,需要很谨慎地处理变量的所有权,生命周期,以及类型,带给程序员的心智负担非常的重,代码也会变得很难读。

Async/Await模式

async/await的思想是让程序员以类似编写同步代码的方式编写异步代码,由编译器将其转换成异步代码。它基于两个关键字:async和await。async关键字可以用于函数声明,把一个同步函数变成返回Future的异步函数。

async fn foo() -> u32 {
    0
}
// the above is roughly translated by the compiler to:
fn foo() -> impl Future<Output = u32> {
    future::ready(0)
}

在async函数中,可以使用await关键字来获取异步future的值:

async fn example(min_len: usize) -> String {
    let content = async_read_file("foo.txt").await;
    if content.len() < min_len {
        content + &async_read_file("bar.txt").await
    } else {
        content
    }
}

上面的函数是前面使用combinator的example函数的async/await写法,使用await关键字,我们可以很方便的获取future内的值,而不需要写闭包。程序员写起来好像是在写同步的代码,但实际上这些都是异步执行的。

那么编译器对async函数做了什么呢?实际上整个async函数被编译成了一个状态机(实际上是在Future的poll中实现状态的转移),每次await调用都代表着一个新状态。我们以上面的example函数为例,它实际上有四个状态:

  1. start
  2. waiting on foo.txt
  3. waiting on bar.txt

每个状态都代表着函数的一个暂停位置。start和end状态表示函数开始执行的状态,和结束执行的状态。状态2表示当前正在等待操作系统加载文件foo.txt,状态3同理。如下图,编译出来的Future按照如下的状态图进行状态转换:每次poll都会根据当前的状态来尝试获取对应future的值,更新状态。红色的菱形代表着if content.len() < min_len这一分支判断。整个过程中,只有到达end状态后,poll的返回值是Ready,否则其他状态下,一直返回Pending。

a

状态的保存

为了能够从上一次poll的状态下恢复任务的执行,状态机必须在内部保存当前的状态,以及必要的局部变量,用于下一次poll。这就是编译器需要做的,Rust的编译器能够自动生成一个结构体,结构体内部保存需要的所有变量。例如,下面的代码近似的给出编译器生成的结构体:

async fn example(min_len: usize) -> String {
    let content = async_read_file("foo.txt").await;
    if content.len() < min_len {
        content + &async_read_file("bar.txt").await
    } else {
        content
    }
}
// The compiler-generated state structs:
struct StartState {
    min_len: usize,
}
struct WaitingOnFooTxtState {
    min_len: usize,
    foo_txt_future: impl Future<Output = String>,
}
struct WaitingOnBarTxtState {
    content: String,
    bar_txt_future: impl Future<Output = String>,
}
struct EndState {}

可以看到不同的状态需要保存的变量是不同的,在开始状态下由于任何代码都没有执行,只有min_len这一变量是需要保存的;WaitingOnFooTxtState状态下,多了一个foo文件代表的future;在WaitingOnBarTxtState下,由于min_len已经不再需要了,状态内变量变成了bar的future,以及foo读取出来的content变量。

现在我们可以用上面这几个状态构建一个状态机了,首先把这些状态包装在enum中:

enum ExampleStateMachine {
    Start(StartState),
    WaitingOnFooTxt(WaitingOnFooTxtState),
    WaitingOnBarTxt(WaitingOnBarTxtState),
    End(EndState),
}

现在我们可以基于这些状态编写poll函数了:

impl Future for ExampleStateMachine {
    type Output = String; // return type of `example`

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        loop {
            match self { // TODO: handle pinning
                ExampleStateMachine::Start(state) => {…}
                ExampleStateMachine::WaitingOnFooTxt(state) => {…}
                ExampleStateMachine::WaitingOnBarTxt(state) => {…}
                ExampleStateMachine::End(state) => {…}
            }
        }
    }
}

我们省略了具体的处理过程,但是上面的函数大致上就是example这个async function被编译出来的样子。注意:我们把match包裹在loop中,这是为了尽可能地推进状态的变化,即,如果其中一个状态的转移是ready的,那我们就优先进行状态转移,再返回,只有无法进行更多的状态转移后,我们才返回Pending。

现在我们分别实现四个分支下的状态转移功能,首先是Start:

ExampleStateMachine::Start(state) => {
    let foo_txt_future = async_read_file("foo.txt");
    let state = WaitingOnFooTxtState {
        min_len: state.min_len,
        foo_txt_future,
    };
    *self = ExampleStateMachine::WaitingOnFooTxt(state);
}

可以看到在Start状态下,状态转移一定会发生,因此在此分支下poll不会返回。

然后是WaitingOnFooTxt:

ExampleStateMachine::WaitingOnFooTxt(state) => {
    match state.foo_txt_future.poll(cx) {
        Poll::Pending => return Poll::Pending,
        Poll::Ready(content) => {
            // from body of `example`
            if content.len() < state.min_len {
                let bar_txt_future = async_read_file("bar.txt");
                // `.await` operation
                let state = WaitingOnBarTxtState {
                    content,
                    bar_txt_future,
                };
                *self = ExampleStateMachine::WaitingOnBarTxt(state);
            } else {
                *self = ExampleStateMachine::End(EndState));
                return Poll::Ready(content);
            }
        }
    }
}

在此状态下,我们就需要尝试读取foo.txt了,如果未读取完,则返回Pending,否则,将会进入下一段状态转移逻辑,判断文件长度是否满足条件,如果满足条件,则会转移到End状态(此时意味着整个过程结束,直接返回Ready),否则继续读取bar.txt,逻辑类似。

之后是WaitingOnBarTxt:

ExampleStateMachine::WaitingOnBarTxt(state) => {
    match state.bar_txt_future.poll(cx) {
        Poll::Pending => return Poll::Pending,
        Poll::Ready(bar_txt) => {
            *self = ExampleStateMachine::End(EndState));
            // from body of `example`
            return Poll::Ready(state.content + &bar_txt);
        }
    }
}

同样非常简单,如果读取完成,则Ready,否则Pending;至此,你会发现End状态下poll不需要做什么,实际上Rust规定了返回了Ready后的Future不可以再poll了,因此End下的poll直接panic即可:

ExampleStateMachine::End(_) => {
    panic!("poll called after Poll::Ready was returned");
}

好了,上面仅仅是简单的解释,实际上Rust的实现远比这个复杂,是基于generator来实现的状态机。最后,原先的async函数example变成了如下的一个普通函数,它返回一个状态机兼Future类型的变量。:

fn example(min_len: usize) -> ExampleStateMachine {
    ExampleStateMachine::Start(StartState {
        min_len,
    })
}

Pinning

我们已经好几次遇到pin这个类型了,现在我们来看看它的来龙去脉。

自引用结构体

如前所述,状态机把每一个状态的局部变量存放在struct中,对于example函数这样的很小的函数,没什么问题,但是当函数变得复杂,比如变量开始互相引用的时候,问题就出现了,例如下面这段代码:

async fn pin_example() -> i32 {
    let array = [1, 2, 3];
    let element = &array[2];
    async_write_file("foo.txt", element.to_string()).await;
    *element
}

上面这个函数的功能非常简单,把3写道文件foo.txt中,然后返回3;里面只有一个await,因此状态机存在3个状态,我们只关注中间的等待写文件的状态:

struct WaitingOnWriteState {
    array: [1, 2, 3],
    element: 0x1001c, // address of the last array element
}

可以看到,我们需要同时保存array和element,因为element用于返回值,而array被element引用。element是一个引用,其存储一个指针,指向0x1001c这个地址。这样一个结构体称为自引用结构体:因为它的某个成员引用了自身的另一个成员。

结构体内部的指针导致了一个非常重要的问题,我们看它的内存布局:

a

目前为止都没问题,很正常,但是当我们把这个结构体移动到别的地方,问题就出来了:

a

现在,你会发现array的位置不再是原先的,而element内部的指针确没有改变!这导致引用失效了。然而,struct的地址变化是很常见的,比如我们会把它作为参数传入另一个函数,这样它被copy到另外的函数运行栈上,地址就变化了。

可能的解法

一般来说,有三种办法解决这一指针悬空的问题:

  1. 移动时更新指针:每次移动时都主动修改指针的值。这种做法需要对编译器进行巨大的修改,而且由于指针的值是运行时动态可变的,这一操作无法在编译时确定,需要runtime动态判断引用是否是自引用,这非常损耗性能,不可取。
  2. 指针存储结构体内部的偏移,而不是绝对的内存地址:例如上面的example例,可以让element存储8,而不是那串地址,8代表着从结构体内存位置开始向后偏移8字节。这样的话内存移动后指针的值依然具有意义。然而,同样的,这依然需要在运行时动态判断当前的指针是否自引用,只有在自引用的时候才可以按照这一规则去存储,缺陷同1。
  3. 禁止移动结构体:我们直接解决导致这一问题的罪魁祸首,不让该结构体移动。这一做法的好处是完全没有性能损耗,这一规则完全通过Rust的类型系统在编译期就可以保证。缺点是需要程序员来注意“禁止move”这一限制。

众所周知,Rust的设计哲学就是保证零开销抽象,因此自然Rust选择了第三种方案,并提出了Pinning API,该API在RFC 2349中被提出。下面我们简单看一下这个API,并解释它是如何与async/await一起工作的。

使用堆上内存

堆上分配的对象具有固定的内存地址,因此我们可以用Box引用堆上的结构体,结构体内部的指针指向堆上的内存地址,这样的话由于不管怎么移动,Box指针指向的永远是堆上的对象,自然就无所谓移动与否了。

fn main() {
    let mut heap_value = Box::new(SelfReferential {
        self_ptr: 0 as *const _,
    });
    let ptr = &*heap_value as *const SelfReferential;
    heap_value.self_ptr = ptr;
    println!("heap value at: {:p}", heap_value);
    println!("internal reference: {:p}", heap_value.self_ptr);
}
struct SelfReferential {
    self_ptr: *const Self,
}

上面的代码就是一个例子,我们首先定义一个自引用的结构体SelfReferential,在main函数中,首先定义一个存储空指针的结构体,以Box引用之,而后获取该Box指向的对象的地址,将其赋值给self_ptr,现在heap_value不论怎么被复制、移动,都不会改变内部指针的有效性(因为整个对象在堆上)。

这个做法看起来无懈可击,但是依然可以通过一些特殊手段使其无效化,比如下面的代码:

let stack_value = mem::replace(&mut *heap_value, SelfReferential {
    self_ptr: 0 as *const _,
});
println!("value at: {:p}", &stack_value);
println!("internal reference: {:p}", stack_value.self_ptr);

通过调用mem::replace可以将一个堆上的对象转变为栈上对象,原先位置的对象会被替换为第二个参数,整体copy到了栈上变量stack_value中,然而此时stack_value中的self_ptr仍然指向原先的地址,这就产生错误了。因此,单纯使用堆上内存分配对象不能完全解决这一问题。

其实这个问题的本质原因就是使用Box智能指针允许我们获得内部对象的&mut引用,从而我们可以修改内部对象,调用mem::replace或者mem::swap这类的函数。因此最简单的想法就是禁止获取自引用对象的&mut引用。

Pin<Box<T>>Unpin

Pinning API提供的Pin类型和Unpintrait解决了&mut引用的问题。Unpin这一trait是自动为所有类型生成的,除非显式豁免。对于Pin类型,Rust保证无法以safe的方式获取内部对象的&mut引用。例如下面的代码可以使SelfReferential类型不再是Unpin的:

use core::marker::PhantomPinned;
struct SelfReferential {
    self_ptr: *const Self,
    _pin: PhantomPinned,
}

具体做法是在内部添加一个未实现Unpin的标记类型PhantomPinned变量(这是个0-size类型),那么根据自动trait生成规则,SelfReferential也就不会是Unpin的。这还没完,为了防止获取它的可变引用,我们还需要用Pin包装一下:

let mut heap_value = Box::pin(SelfReferential {
    self_ptr: 0 as *const _,
    _pin: PhantomPinned,
});

现在我们尝试运行一下修改版的example:

use std::mem;
use std::marker::PhantomPinned;

fn main() {
    let mut heap_value = Box::pin(SelfReferential {
        self_ptr: 0 as *const _,
        _pin: PhantomPinned,
    });
    let ptr = &*heap_value as *const SelfReferential;
    heap_value.self_ptr = ptr;
    println!("heap value at: {:p}", heap_value);
    println!("internal reference: {:p}", heap_value.self_ptr);
    
    // break it
    
    let stack_value = mem::replace(&mut *heap_value, SelfReferential {
        self_ptr: 0 as *const _,
        _pin: PhantomPinned,
    });
    println!("value at: {:p}", &stack_value);
    println!("internal reference: {:p}", stack_value.self_ptr);
}
struct SelfReferential {
    self_ptr: *const Self,
    _pin: PhantomPinned,
}

但是会编译失败,报错如下:

error[E0594]: cannot assign to data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>`
  --> src/main.rs:10:5
   |
10 |     heap_value.self_ptr = ptr;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^ cannot assign
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`

error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>` as mutable
  --> src/main.rs:16:36
   |
16 |     let stack_value = mem::replace(&mut *heap_value, SelfReferential {
   |                                    ^^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`

这两个错误都是因为目前的这个类型Pin<Box<SelfReferential>>没有实现DerefMut这一trait,无法获取其可变引用,即&mut,这恰恰是我们需要的。

但是很显然,在第十行我们只是修改了变量的一个成员,这并不会移动对象的地址,我们想要只拒绝16行调用,但是编译器无法区分二者,它只能简单的禁止一切获取&mut的操作。

我们可以通过使用unsafe方法get_unchecked_mut来规避这一问题:

// safe because modifying a field doesn't move the whole struct
unsafe {
    let mut_ref = Pin::as_mut(&mut heap_value);
    Pin::get_unchecked_mut(mut_ref).self_ptr = ptr;
}

我们首先使用Pin::as_mutPin<Box<T>>中获取Pin<&mut T>,而后通过Pin::get_unchecked_mut获取内部的可变引用。这样,初始化问题就解决了,编译器只会拒绝第16行的调用。

Stack Pinning和Pin<&mut T>

上一节我们介绍了如何通过Pin来构建安全的堆上自引用结构体,但是堆上分配的对象总是有一定性能损耗,慢于栈上分配。Rust还提供了Pin<&mut T>来指向栈上自引用结构体。

不像Pin<Box<T>>实例完全拥有变量的所有权,Pin<&mut T>只是暂时借用了变量T,这导致程序员编写的时候要格外小心,我不建议Pin栈上变量。进一步阅读可以看pin modulePin::new_checked方法。

Pinning和Future

我们之前看到Future::poll方法接收的第一个参数以Pin<&mut Self>形式给出:

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>

这种方法采用self:Pin<&mut Self>而不是普通的&mut self的原因是,从async/await创建的Future实例通常是自引用的。通过将Self包装到Pin并让编译器为从async/await生成的自引用Future豁免Unpin,可以保证Futurepoll之间不会在内存中移动,这确保所有内部引用仍然有效。

值得注意的是,在第一次poll之前move future是可以的。因为future是懒惰的,在第一次poll之前什么都不做。因此,生成的状态机的起始状态仅包含函数参数,但不包含内部引用。为了调用 poll,调用者必须先将 future 包装到 Pin 中,以确保 future 不能再在内存中移动。由于栈Pinning更难正确处理,因此我建议始终将 Box::pinPin::as_mut 结合使用。

至于如何在poll函数内调用selfpoll方法,建议阅读源码,如map的实现,以及pin文档

Executors和Wakers

现在我们了解了Future的来龙去脉,但是有一个问题还没有解决:Future本身是惰性求值的,只有当我们去poll它的时候,状态才会演进,否则一切都是停滞不前的。

如果只有一个future还好,我们只需要在上面循环poll就行,但是一个应用通常会有很多future并发执行,此时就需要一个全局的executor负责执行所有的future,直到全部ready。

Executor

一个执行器的作用就是能够把不同的future作为一个个任务分配执行,通过某种spawn方法。而后executor就可以不停地执行不同的任务,在Pending时切换,实现协作式的多任务处理,达到并发执行的效果。

许多executor的实现也充分利用了多核性能。它们创建一个线程池来更高效地利用CPU,利用工作窃取(work stealing)算法来作负载均衡。也有一些特殊的executor实现专门为嵌入式系统作了优化,降低了延迟和内存占用。

同时为了避免无意义的轮询一个Pending的Future,这些执行器还利用Rust提供的WakerAPI进行优化。

Waker

Waker的想法是通过透传一个Waker类型的变量(包在Context中),任务本身会在完成后通过Waker通知executor,这样executor只需要在被通知后再去poll这个future,而不需要不停地poll一个Pending中的future。

例如下面的代码:

async fn write_file() {
    async_write_file("foo.txt", "Hello").await;
}

这个函数异步地把"Hello"写入文件foo.txt中,由于写入硬盘比较耗时,第一次对这个future的poll调用可能会返回Pending。然而,文件系统API内部会保存通过poll传入的Waker实例,在写入完成后通知executor写入完成,从而避免无意义的poll

在这篇文章的开头,我们讨论了抢占式和协作式多任务处理。抢占式多任务依赖操作系统在运行中对任务进行强制切换,而协作多任务则要求任务定期通过yield操作主动放弃对CPU的控制。协作方法的一大优点是任务可以自己保存它们的状态,这使得上下文切换更搞笑,并使任务之间共享相同的调用栈成为可能。

现在我们可以说,future和async/await就是协作式多任务(或者说协程机制)的一种实现:

  • 每个提交给executor的future基本上就是一个协作式的任务。
  • 在future机制下没有显式的yield操作,但是future返回Pending/Ready本身就是一种对CPU控制权的让出。
    • 如果future本身不想让出CPU,它只要不在poll中返回即可。
    • 每个future都可能完全占有CPU,让其他future饿死,因此我们必须信任future。
  • Futures内部存储了它们在下一次poll调用时继续执行所需的所有状态。使用async/await,编译器会自动检测所有需要的变量并将它们存储在生成的状态机中。
    • Rust只保存最小的必要的状态值。
    • 所有future的执行都共用一个栈,不存在任务独有的栈空间。

我们看到futures和async/await的特性基本上符合协作式多任务处理模式,它们只是使用了一些不同的术语。


Recommend

  • 44
    • www.tuicool.com 6 years ago
    • Cache

    Async and Await in Rust: a full proposal

    I’m really excited to announce the culmination of much of our work over the last four months: a pair of RFCs for supporting async & await notation in Rust. This will be very impactful for Rust in the network services...

  • 28
    • blog.rust-lang.org 5 years ago
    • Cache

    Async-Await on Stable Rust

    On this coming Thursday, November 7, async-await syntax hits stable Rust, as part of the 1.39.0 release.This work has been a long time in development -- the key ideas for zero-cost futures, for example, were

  • 19
    • ferrous-systems.com 5 years ago
    • Cache

    async/await on embedded Rust

    In aprevious post we explored what needs to be done on the rustc side to bring async/await to no_std Rust. In this post we'll explore what could be done once async/await is...

  • 18
    • os.phil-opp.com 5 years ago
    • Cache

    Writing an OS in Rust: Async/Await

    In this post we explore cooperative multitasking and the async/await feature of Rust. We take a detailed look how async/await works in Rust, including the design of the Future trait, the st...

  • 11
    • blog.rust-lang.org 4 years ago
    • Cache

    Async-await on stable Rust!

    Async-await on stable Rust!Async-await on stable Rust! Nov. 7, 2019 · Niko Matsakis On this coming Thursday, November 7, async-await syntax hits stable Rust, as part of the 1.3...

  • 13

    0:00 / 35:09 ...

  • 8
    • www.youtube.com 3 years ago
    • Cache

    Crust of Rust: async/await

    LOS ANGELESCrust of Rust: async/await29,129 viewsAug 31, 2021

  • 7

    A Neophyte's Introduction to the async/await landscape in Rust The truth of the matter is, the async/await features in other programming languages seem to be more straightforward than it is in Rust. For example in JavaScript, you have...

  • 10

    How to think about `async`/`await` in Rust 2023-06-30 (This is a section of the lilos intro...

  • 5

    器→工具, 编程语言 PEP 492:使用 async 和 awa...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK