8

[译]构建你自己的block_on

 4 years ago
source link: https://colobu.com/2020/01/30/build-your-own-block-on/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

原文: Build your own block_on()

如果你想搞清楚 future crate中的 block_on 是如何工作的,那么今天就让我们写一个自己的 block_on 函数。

这篇博文的灵感来自两个crate: wakefulextremewakeful 设计了一种从函数中创建 Walker 的简单方法,而 extreme 则是 block_on() 的及其简洁的实现。

我们的实现目标将与 extreme 略有不同。与其追求零依赖和最少的代码行数,不如追求一个安全高效但仍然非常简单的实现。

我们将使用的依赖项是 pin-utils , crossbeam , 和 async-task

函数签名

block_on 的签名如下。使用 future 作为参数,在当前线程中运行它(如果future是pending状态则阻塞),然后返回它的输出:

fn block_on<F: Future>(future: F) -> F::Output {
    todo!()
}

现在让我们实现 todo!() 部分。

初次尝试

注意 Futurepoll 方法的第一个参数是 pinned future ,所以首先我们需要pin住这个future。有一个简单方法可以安全的实现,就是使用 Box::pin() 。我们最好pin这个future在栈上,而不是堆上。

不幸的是,pin future到栈上的唯一的安全办法就是使用 pin-utils crate: pin_utils::pin_mut!(future);

pin_mut 宏把 future 从类型 F 的变量转换成 Pin<&mut F> 的变量。

下一步我们需要实现当这个future被唤醒后的处理逻辑。在我们的场景下,唤醒应该简单的解锁运行这个future的线程。

构造一个 Walker 很麻烦 - 只需看看 extreme 的实现就知道了。 extreme 是手工构建 Walker 的最简单的实现, 还包括那么多的 raw pointer,那么多的非安全代码... 现在先让我们跳过这一部分,留个空格以后填。

let waker = todo!();

最后,让我们从 Walker 中创建一个task context, 在一个循环中轮询这个future。如果它已经完成,则返回输出结果,如果它是 pending 状态,则则色当前线程:

let cx = &mut Context::from_waker(&waker);
loop {
    match future.as_mut().poll(cx) {
        Poll::Ready(output) => return output,
        Poll::Pending => thread::park(),
    }
}

如果你对 Context 感到困惑,那么只需理解它就是一个 Walker 的包装器 - 直此而已。当设计Rust中的 async/await 的时候,我们并不确定除了 Walker 之外是否传递给 poll 其它东西是否有用,因为我们设计了这个包装器,可以传递更多的信息。

不管怎样...我们快完成任务了。让我们回到walker的构建,开始完成填空 todo!()

如果你仔细想想, Walker 真的是一个仔细优化的奇幻的 Arc<dyn Fn() + Send + Sync> 版本, wake() 调用这个函数。换言之, Walker 是一个回调,当future继续执行的时候就会被调用。

既然 Walker 很难去构建, sagebind 提出了 waker_fn() , 一个直接的把任意函数转换成 Walker 的方式。我借用 waker_fn() ,把它放在我的crate async-task 中。

在我们的 block_on 实现中,回调只需解锁运行当前future的线程:

let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());

太简单了,比摆弄 RawWakerRawWakerVTable 好太多了。

内部实现上, waker_fn() 创建一个 Arc<impl Fn() + Send + Sync> ,通过非安全代码把它转换成 Walker ,就像我们在 extreme 中看到的那样。

现在让我们列出 block_on 的完整实现:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let thread = thread::current();
    let waker = async_task::waker_fn(move || thread.unpark());

    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

如果你想运行代码,你可以下载代码 v1.rs

parking的问题

但是,先别忙着庆祝,这里有个问题。如果future中的用户代码使用到了 park/unpark API,它可能“偷走”回调中的unpark通知,查看这个 issue 以了解更详细的情况。

一个可能的解决方案就是使用不同于 std::thread 的park/unpark api。这种方案下future内部的代码不会干扰唤醒机能。

crossbeam 有一个类似的park/unpark机制,而且它可以让我们创建任意多个 packer ,而不是每个线程一个。让我们在 block_on 每次调用的时候都创建一个:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = async_task::waker_fn(move || unparker.unpark());

    let cx = &mut Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => parker.park(),
        }
    }
}

好啦,问题解决。

如果你想运行代码,可以执行文件 v2.rs

通过cache优化

创建 WalkerParker 并不是没有代价,创建对象都是有花费的,太不幸了,如何提升?

既然每次调用 block_on 都需要创建 WalkerParker ,为什么我们不在thread-local storage中缓存它们呢?这样调用 block_on() 时线程可以重用相同的对象:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: (Parker, Waker) = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            (parker, waker)
        };
    }

    CACHE.with(|(parker, waker)| {
        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

如果future可以很快执行,那么这个小小的改变可以使 block_on 显著地提高性能。

v3.rs 代码。

递归怎么办?

我们完成了么? 嗯...还差最后一项。

如果在 block_on 中的future的代码中再递归调用 block_on 会怎样?我们可以允许递归调用或者禁止递归。

如果我们允许递归,我们需要确保 block_on 的递归调用不会共享相同的 ParkerWalker ,否则没有办法区分哪个 block_on 需要唤醒。

futures crate的 block_on 在递归调用的时候会panic。我对允许还是禁止递归调用没有强烈的倾向性 - 它们都有理。 但是既然我们在模仿 futures 版本,就让我们禁止吧。

为了探测递归调用,我们需要引入另一个thread-local变量,指示我们是否已经在 block_on 中还是不在,如果一个mutable borrow已经active则panic:

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);

    thread_local! {
        static CACHE: RefCell<(Parker, Waker)> = {
            let parker = Parker::new();
            let unparker = parker.unparker().clone();
            let waker = async_task::waker_fn(move || unparker.unpark());
            RefCell::new((parker, waker))
        };
    }

    CACHE.with(|cache| {
        let (parker, waker) = &mut *cache.try_borrow_mut().ok()
            .expect("recursive `block_on`");

        let cx = &mut Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(cx) {
                Poll::Ready(output) => return output,
                Poll::Pending => parker.park(),
            }
        }
    })
}

现在,我保证,我们的 block_on() 已经实现好了。最终版本的 block_on() 是正确的,健壮的,并且效率也高。

看代码 v4.rs

benchmark

效率高不高拉出来溜溜。让我们和 futures 中实现做比较。

首先让我们写一个辅助future类型,它会yield多次然后完成:

struct Yields(u32);

impl Future for Yields {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

例如yield 10次:

#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
    b.iter(|| block_on(Yields(10)));
}

让我们测试三次,分别yield 0次、10次、50次,分别使用我们自己实现的 block_onfutures 中的 block_on 。你可以在 yield.rs 找到全部代码。

以下是我机器上的运行结果:

test custom_block_on_0_yields   ... bench:           3 ns/iter (+/- 0)
test custom_block_on_10_yields  ... bench:         130 ns/iter (+/- 12)
test custom_block_on_50_yields  ... bench:         638 ns/iter (+/- 20)
test futures_block_on_0_yields  ... bench:          10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench:         236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench:       1,139 ns/iter (+/- 30)

结果显示我们的实现在这个场景下2到3倍的快。

还不错。

结论

Rust异步编程可能让人害怕,因为它包含太多机器相关的东西: Future trait、pinning、Context类型、Walker以及它们的朋友 RawWakerRawWakerVTable 、async和await、非安全的代码、raw pointer等等。

但问题是,很多丑陋的东西并不重要 - 它们只是无聊的样板,你可以使用 pin-utils , async-taskcrossbeam 等。

事实上,这次我们成功地使用几十行代码就构建了一个健壮高效的 block_on() ,无需理解大多数样板文件。在另一篇博文中,我们将构建一个真正的executor。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK