17

异步代码的几种写法 | Rust学习笔记

 3 years ago
source link: https://blog.51cto.com/14915984/2538928
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

作者:谢敬伟,江湖人称“刀哥”,20年IT老兵,数据通信网络专家,电信网络架构师,目前任Netwarps开发总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。

Rust 历史不长,仍然处于快速发展的历程中。关于异步编程的模式,现在已经发展到 async/await 协程的高级阶段。大概是因为 async/await 出现的时间还不长,所以现有大多数的开源项目并不是或不是纯粹使用 async/await 来书写的,而是前前后后有多种的写法。这样的状况给 Rust 的学习带来了一些的难度。在这里,我们来捋一捋异步代码的几种写法。

mio

最原始的方式是使用 mio 进行开发。 mio 是一个底层异步 I/O 库,提供非阻塞方式的 API ,具有很高的性能。实际上 mio 是对于操作系统 epoll/kqueue/IOCP 的封装。在 C/C++ 中我们使用 libevent 之类的库, mio 可以理解为对应的 Rust 版本。基于 mio 的代码大致如下:

loop {
     // Poll Mio for events, blocking until we get an event.
     poll.poll(&mut events, None)?;

     // Process each event.
     for event in events.iter() {
         if event.is_writable() {
             // socket可写,开始发送数据
         }

         if event.is_readable() {
             // socket可读,开始接收数据
         }
         // socket 关闭,退出循环
         return Ok(());
     }
 }

总的来说,这是完全基于异步事件通知的写法,和 C/C++ 区别不是很大,异步代码对于程序员是一个挑战,当代码逻辑越来越复杂,添加新功能或是解决已有问题的难度也越来越大。

另外, mio 实现的是一个单线程事件循环,虽然可以处理成千上万路的 I/O 操作,但没有多线程的能力,需要自己扩充。

Future Poll

为了更好地规范异步的逻辑, Rust 抽象出 Future 表示尚未发生的事物。这些 Future 可以用很多方式組合成一个更复杂的复合 Future 来代表一系列的事件。 Future 需要程序主动去 poll (轮询)才能获取到最终的结果,每一次轮询的结果可能是 Ready 或者 Pending

运行库提供 ExecutorReactor 来执行 Future ,也就是调用 Futurepoll 方法循环执行一系列就绪的 Future ,当 Future 返回 Pending 的时候,会将 Future 转移到 Reactor 上等待唤醒。 Reactor 被用来负责唤醒之前无法完成的 Future 。事实上, tokioReactor 是基于 mio 实现的,而 async-std/smol 则是封装了 epoll/kqueue/IOCP ,提供类似的功能。

手动实现 Future 是一件相对繁琐的工作,主要的问题在于异步模式本身的特性。例如,接收网络数据,无法臆测每次轮询会收到多少字节的数据,往往需要开辟一段接收缓冲区容纳数据,协议解码也需要一个状态机拼包向上层提交;发送网络数据存在相似问题,发送数据时底层未就绪,则缓冲发送数据,待下次轮询时,需要首先检查并处理发送缓冲区。另外还有一些值得注意的地方,如果手动实现的 Future 返回 Pending ,则必须自己实现唤醒机制,也就是需要将 cx 克隆一份记下来,然后在适当的时侯调用 cx.wake() 。因为网络相关的功能往往是分层的,因此手动的 Poll 循环也会是层层堆叠的,这时候,返回值 Poll::Ready(T) 就有学问了。泛型T可能包裹各种不同的数据, Option<T>Result<T,E> ,或者两者的组合。因为最外层还有一个 Poll<T> ,所有这时候的 match 语句写起来会非常臃肿,粘贴复制写很多代码,完成的功能却非常有限,而且由于这些代码很相似,大大增加了出错的可能性。

标准库中仅仅定义了 Future ,更多的相关功能需要引用 futures-rs 类库,里面定义了一系列有关异步的操作,包括 StreamSinkAsyncReadAsyncWrite 等基础 Trait ,以及对应实现了大量方便操作的组合子的 Ext Trait ,特别用途的 fusedBoxTry 系列的扩展,诸如 join!select!pin_mut! 等一系列的宏。理论上,不使用这些扩展也能写出代码,只不过那样的代码很可能篇幅会长的可怕。值得一提的是,除了一些可以简化代码的过程宏之外,扩展 Trait 提供的组合子也会让代码精简不少。比如 Future::and_then 可以让代码写成链式调用的方式; Sink::send 包装了 Sink 发送三步骤 poll_ready/start_send/poll_flush ,使用 .await 一行代码直接就可以完成发送。因此,很多 poll 方式的代码实际上是准确地说是混合式的,其中也使用了不少 async 代码块。

总之,搞清楚 Future 相关的这些内容是需要花费不少时间,更不用说用它们来写代码了。不过,即便是使用 async/await 这种更高级原语,也是有必要了解底层的工作原理和实现机制,所谓知其然知其所以然。

async/await

使用 async/await 可以将异步的代码写得类似同步的过程,更加符合人体工程学。因为 async 被翻译为一个 Future 状态机,原先在 poll 方式中需要处理的与 Pending 相关的状态现在都由 async 生成的状态机自动完成,因此大大减轻了程序员的心智负担。

如前所述,底层的 Futures 提供了很多方便的组合子扩展 Future ,使用起来很简洁,可以极大地简化代码。例如,上文提到过的 Sink::send 包装了发送缓冲区的实现和异步发送的三个步骤; AsyncRead::read_exact 实现了读取指定字节数的功能,在处理网络协议解析时可以避免手写一个拼包状态机; AsyncWrite::write_all 实现了发送全部数据以及发送缓冲,等等。正是在这些底层功能的支持下, async/await 成为了更高级的书写异步代码的方式。也许会有少许担心,这样所谓“高级”会不会在性能上有很大损失?笔者个人不这么认为。自动实现的状态机也许未必比程序员手动完成的性能更差。状态机编程对于任何人,即便是一个有经验的程序员都是不小挑战。蹩脚的状态机实现不仅可能有性能问题,更大的风险来自于实现上的漏洞,以及维护上的困难。代码写出来更多是给别人看的,完成同样的功能,简洁的代码更有可能是更高质量的代码。

以下例子是固定长度分割的报文接收过程,使用 async/await 是很简单的。如果实现为一个 Stream/poll_next ,代码会复杂很多。

/// convenient method for reading a whole frame
    pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> {
        let mut len = [0; 4];
        let _ = self.inner.read_exact(&mut len).await?;  // inner socket, 支持 AsyncRead

        let n = u32::from_be_bytes(len) as usize;
        if n > self.max_frame_len {
            let msg = format!(
                "data length {} exceeds allowed maximum {}",
                n, self.max_frame_len
            );
            return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg));
        }

        let mut frame = vec![0; n];
        self.inner.read_exact(&mut frame).await?;

        Ok(frame)
    }

最后,完全使用 async/await 写代码目前还有几个问题:

  • async trait

    当前 Trait 不支持 async fn ,无法直接用 Trait 来抽象异步方法。暂时解决办法是使用三方库 async-trait 。如下:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
    async fn run(&self);
}

async_trait 将代码转换为一个返回 Pin&lt;Box&lt;dyn Future + Send + 'async&gt;&gt; 的同步方法。因为装箱和动态派发的原因,性能上会有少许损失。

  • 异步析构

    当前 drop 方法必须是同步调用,不能使用 await 语法。当一个 I/O 对象越过生命周期被析构,往往在关闭底层句柄之前,还需要完成某些 I/O 操作。比如,通知网络对端连接已经关闭。在同步代码中,我们只需要在 drop() 中置入这些操作,但是在异步代码中,无法在 drop() 中做类似的事情。

解决办法,总是在异步 I/O 对象越过生命周期之前显式地执行关闭动作,或是,实现一个类似 GC 的功能,专门负责清理工作。

展望

笔者在学习 Rust 过程中,主要关注网络相关的并发编程。因为之前有在 Go 版本的 ipfs/libp2p 上的开发经验,故而学习研究了 rust-libp2p 以及 nervos tentaclerust-libp2pParity 实现的准官方版本,但是这个项目的代码及其难懂,过于强调使用泛型参数的抽象,导致代码可读性非常差。请教了代码作者,他承认代码可能有些复杂,但也强调都是有原因的... nervos tentacle 的实现在协议上不够完整,特别是与标准 libp2p 并不兼容。两个项目共有的特点是主要用 poll 的方式写代码,逻辑上都是状态机的嵌套。

因此,笔者试图完全使用 async/await 方式重构 libp2p ,参考 rust-libp2p 的实现,代码协程化,向上层提供纯粹的异步接口,争取在 API 层面的体验接近 go-libp2p ,这是推广 Rust 协程机制的一个尝试,同时也是个人的一个学习的过程。目前刚刚起步,仅完成了 secioyamux 部分,待合适时机开源,期望更多 Rust 爱好者共同来开发完善。

深圳星链网科科技有限公司(Netwarps),专注于互联网安全存储领域技术的研发与应用,是先进的安全存储基础设施提供商,主要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操作系统(BOS)。

微信公众号:Netwarps


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK