24

[译]异步代码中的阻塞操作

 4 years ago
source link: https://colobu.com/2020/01/28/blocking-inside-async-code/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Stjepan Glavina是Rust流行的库 Crossbeam 的作者,最近一年专注于 async-std 的开发。他最近写了两篇关于rust异步编程的文章,我翻译成中文,学习一下。

本篇原文是: Blocking inside async code

以下是翻译:

大家好,很久没写博文了,这次回来感觉真好。首先带来一个好消息。在Crossbeam上花了两年的时间后,2019年我把我的焦点放在了运行时异步编程研究上(比如 async-stdtokio )。尤其是我想让异步运行时(async runtimes)更有效、更健壮,同时也更简单。

在这篇文章中,我想谈谈所有的运行时都面临的一个有趣的问题:从异步代码中调用阻塞函数。

Async(异步) 和 Sync(同步)

我们最终在stable rust版本中加上了 async/await ,现在准备重写所有的同步代码,让它们变成异步的。我们应该这么做吗?我不知道。

sync库和async库的差异愈来越大,比如 stdasync-std 库。看起来两者很相近,除了一个有阻塞函数(sync),另一个有非阻塞函数(async)。类似的还有 surfattohttpc 库,它们都是http client库,一个是sync库,一个是async库。现在所有的新库的开发者都面临一个简单的抉择:应该提供一个sync库还是async库,还是两者都提供?

目前重复的API看起来很不幸,但是我个人很乐观,相信最终我们会找出一个办法出来。在任何情况下,我们都需要找到尽可能无缝地集成sync和async代码的方法。

从同步到异步,再到同步

rust的 main 函数是同步的,所以为了进入异步世界,我们需要显式的去做。使用 async-std ,我们可以通过调用 block_on() 函数进入异步世界:

use async_std::task;

// This is sync code.
fn main() {
    task::block_on(foo());
}

// This is async code.
async fn foo() {}

如果要从异步世界进入同步世界,可以在异步代码中调用同步代码:

// This is async code.
async fn foo() {
    bar();
}

// This is sync code.
fn bar() {}

所以为了从异步进入同步,我们不需要做任何额外的设置-只需调用同步函数就这么简单,除了...额...我们需要仔细关注需要花费很长时间的同步函数。孔子说在异步世界调用同步函数一定要三思。

阻塞影响并发

异步运行时的一个假设就是每次对 future 轮询的时候,它能很快返回 Ready 状态或者 Pending 状态。在异步代码中长时间阻塞是异步编程的一个很大的禁忌,一定要避免发生。

为了说明为什么,下面使用 surf 并发获取40个网页:

use async_std::task;
use std::time::Instant;

// Fetch the HTML contents of a web page.
async fn get(url: &str) -> String {
    surf::get(url).recv_string().await.unwrap()
}

fn main() {
    task::block_on(async {
        let start = Instant::now();
        let mut tasks = Vec::new();

        // Fetch the list of contributors for the first 40 minor Rust releases.
        for i in 0..40 {
            let url = format!("https://thanks.rust-lang.org/rust/1.{}.0/", i);

            // Spawn a task fetching the list.
            tasks.push(task::spawn(async move {
                let html = get(&url).await;

                // Display the number of contributors to this Rust release.
                for line in html.lines() {
                    if line.contains("individuals") {
                        println!("{}", line.trim());
                    }
                }
            }))
        }

        // Wait for all tasks to complete.
        for t in tasks {
            t.await;
        }

        // Display elapsed time.
        dbg!(start.elapsed());
    });
}

这个程序在我的机器上需要1.5秒就可以完成。注意因为 get 函数是异步的,所以获取40个页面是并发执行的。

现在让我们把 get 改成阻塞方式。我们使用 attohttpc 代替 surf ,它们比较类似,除了提供一个同步的接口:

async fn get(url: &str) -> String {
    attohttpc::get(url).send().unwrap().text().unwrap()
}

不出所料,这个程序现在效率更低,需要3秒钟完成。我的计算机有8个逻辑内核,这意味着异步std执行器生成8个工作线程,因此我们一次只能获取8个web页面。

这个例子的教训是:阻塞会损害并发性。很重要的一点是,我们不要在异步代码内部阻塞,否则执行器将无法执行有用的工作-相反,它只会浪费时间阻塞。

阻塞无处不在

通过上面的我们看到了异步代码中的阻塞是如何影响性能的。当然,这个例子有点”做作“,因为您只需要使用 surf 而不是 attohttpc ,问题就解决了。但坏消息是,阻塞是不易察觉的:它无处不在,你甚至没有意识到!

考虑 标准输入标准输出 。很明显,读取标准输入块时,不应该在异步代码中使用 std::io::Stdin 。但是如果你看到 println! 你会皱眉头吗!我敢打赌,我们大部分时间都假设打印到标准输出不会阻塞,而它确实是阻塞的。

如果你想知道什么场景下 println!() 会阻塞,可以假想我们在shell中执行 program1 | program2 ,这样 program1 的输出就通过管道传输到 program2 中。如果 program2 读取输入的速度非常慢,那么 program1 将不得不在打印某些内容并且管道已满时阻塞。

密集的计算也会导致阻塞。假设我们通过调用 v.sort() 对一个非常大的 Vec 进行排序。如果排序需要一秒钟左右的时间来完成,我们应该考虑将该计算从异步执行器中移除。

有时甚至有一些程序员不太小心会掉进的“陷阱”。例如,假设我们使用 Rayon 在异步代码中调用 v.par_sort() ,人们可能会天真地认为这是可以的,因为排序发生在Rayon的执行器中,而事实是异步执行器仍然会阻塞以等待Rayon的结果。

但性能下降并不是唯一的问题。如果异步执行器的每一个线程都陷在读取标准输入之类的事情上,那么整个程序也有可能陷入 死锁状态 ,无法继续执行!

最后,值得一提的是,即使是简单的内存访问也可能被阻塞!例如,考虑驻留在旋转磁盘上的swap memory。如果线程正在访问磁盘上的swap memory,它将不得不阻塞,直到该页从物理磁盘中取出并移到主内存中。

所以阻塞是非常普遍的,很难从异步代码中完全分离出来。我相信我们必须接受这样一个事实:不管我们如何小心地消除阻塞,阻塞总是存在于异步代码中。

可能的解决方案

当我们预期在异步代码中阻塞时,我们应该考虑将阻塞逻辑移动到不同的线程池中,这样执行器就可以继续运行而不必等待它。像 async stdtokio 这样的运行时提供了 spawn_blocking() 函数来帮助解决这个问题。

为了演示如何使用该函数,让我们看看 fs::read_to_string() 是如何在 async std 中实现的:

async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
    let path = path.as_ref().to_owned();
    spawn_blocking(move || std::fs::read_to_string(path)).await
}

函数 spawn_blocking() 将闭包生成到专用于运行阻塞函数的特殊线程池中。然后,异步执行器不必阻塞闭包的结果,而是异步 await 返回的 JoinHandle 的结果。

注意,我们不能将对path的引用传递到闭包中,因为在同步版本完成之前,可能会取消 read_to_string() 函数。不幸的是,将路径传递到闭包的唯一方法是克隆它。这有点低效,也有点笨重。

幸运的是,Tokio有一种运行阻塞函数的替代方法:它可以就地执行闭包,并告诉当前线程停止作为异步执行器的一部分,并将该职责移交给一个新线程。在某种程度上,它与 spawn_blocking() 相反——我们没有将闭包发送到新线程并继续事件循环,而是将事件循环发送到新线程并继续运行闭包。

这是 block_in_place() 实现异步 read_to_string() 的方式:

async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
    block_in_place(|| std::fs::read_to_string(path))
}

注意我们不必再clone path,这是因为在内部 sync read_to_string() 完成之前你不可能取消外部的 async read_to_string()

虽然 spawn_blocking()block_in_place() 都解决了异步执行器陷入阻塞代码的问题,但它们之间有一个重要的区别。注意 spawn_blocking() 实际上是一个异步函数,因为它返回一个可以等待的 future ,而 block_in_place() 只是一个普通的同步函数。

通过例子看看有什么区别:

let (s1, s2) = futures::join!(read_to_string("foo.txt"), read_to_string("bar.txt"));

如果 read_to_string() 是通过 spawn_blocking() 实现的,那么这两个文件可以并行的读取,而如果是通过 block_in_place() 实现的,那么这两个文件是串行读取的,一个读完才读下一个。

结论

关键结论是:

  • 在异步代码中阻塞将使性能受损,甚至导致死锁。
  • 我们需要使用 spawn_blocking()block_in_place() 隔离程序的阻塞部分。
  • 阻塞无处不在,很难完全隔离它。

此外,有时甚至很难说什么代码是阻塞的,什么代码不是阻塞的。如果一个函数需要1秒来完成,我们可能会认为它是阻塞的。但如果需要1毫秒呢?好吧,取决于特定的用例-有时我们应该考虑阻塞,有时我们不应该。这完全取决于你的场景。

阻塞是可怕的,我们需要防御地将它与异步代码隔离开来。但是我们只能做这么多,阻塞仍然不可避免地会潜入到我们的异步代码中。这听起来可能是一个令人悲伤和失望的状况,但我很乐观。我相信有比 spawn_blocking()block_in_place() 更好的解决方案,我将在下面的博客文章中讨论。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK