Rust异步浅谈 | 无双
source link: https://leaxoy.github.io/2020/03/rust-async-runtime/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
这篇文章主要描述了Rust中异步的原理与相关的实现,Rust异步也是在最近的版本(1.39)中才稳定下来。希望可以通过这边文章在提高自己认知的情况下,也可以给读者带来一些解惑。(来自于本人被Rust异步毒打的一些经验之谈).
阅读这篇文章需要对操作系统,IO多路复用,以及一些数据结构有一定的概念。
老生常谈,几乎所有的语言中异步相关的解释都是统一的:线程切换开销大,且资源浪费(主要集中在内存上),这篇文章假定读者已对这些情况已知晓。
Future
Future
字面的意思就是未来发生的事情,在程序中则代表了一系列暂时没有结果的运算子,Future
需要程序主动去poll
(轮询)才能获取到最终的结果,每一次轮询的结果可能是Ready
或者Pending
。
当Ready
的时候,证明当前Future
已完成,代码逻辑可以向下执行;当Pending
的时候,代表当前Future
并未执行完成,代码不能向下执行,看到这里就要问了,那什么时候才能向下执行呢,这里的关键在于Runtime
中的Executor
需要不停的去执行Future
的poll
操作,直至Future
返回Ready
可以向下执行为止。等等,熟悉Linux
的同学可能要说了,怎么感觉和Epoll
模型是非常的相似呢,没错,这确实非常相像(但是依然有些许不同,Future
可以避免空的轮询),看样子优秀的设计在哪里都可以看到类似的身影。为了实现Rust声称的高性能与零开销抽象,这里做了一些优化,下面一一讲述。
Future结构
|
|
Future
的定义非常简单,Output
代表了Future
返回的值的类型,而poll
方法是执行Future
的关键,poll
方法可以返回一个Poll
类型,Poll
类型是一个Enum
,包装了Ready
和Pending
两种状态。
Context
Context
提供了对Future
进行调度的功能。目前Context
作为一个结构体,有一个核心成员Waker
,用来唤醒绑定的Future
. 未来不排除在Context
添加新的字段。
|
|
|
|
Runtime
Runtime
由两部分组成,Executor
和Reactor
。
Executor
为执行器,没有任何阻塞的等待,循环执行一系列就绪的Future
,当Future
返回pending
的时候,会将Future
转移到Reactor
上等待进一步的唤醒。
Reactor
为反应器(唤醒器),轮询并唤醒挂载的事件,并执行对应的wake
方法,通常来说,wake
会将Future
的状态变更为就绪,同时将Future
放到Executor
的队列中等待执行。
下面的序列图大概简单的描绘了Future
在Executor
和Reactor
之间来回转移的流程与状态变化。
|
|
rust-future
上面说明了一个简单的Future
的执行,如果是一个比较复杂的Future
的话,比如中间会有多次IO
操作的话,那么流程是怎么样的呢?看下面一段代码:(仅仅作为demo,不代表可以直接使用)
|
|
对应的执行流程为:
|
|
rust-future-complex
上面的这些例子系统中只展示了一个
Future
的执行情况,真实的生产环境中,可能有数十万的Future
同时在执行,Executor
和Reactor
的调度模型要更复杂一些。
一句话概括Runtime
,Future
不能马上返回值的时候,会被交给Reactor
,Future
的值准备就绪后,调用wake
传递给Executor
执行,反复执行,直至整个Future
返回Ready
。
Executor
通常来说,Executor
的实现可以是单线程与线程池两个版本,两种实现间各有优劣,单线程少了数据的竞争,但是吞吐量却容易达到瓶颈,线程池的实现可以提高吞吐量,但是却要处理数据的竞争冲突。下面我们以async-std
来分析基于线程池的实现:
|
|
这里做了大量的简化,整个Executor是一个线程池,每个线程都在不断的寻找可执行的task,然后执行,然后再找下一个task,再执行,永远重复。
从上面的main_loop中可以看到,cpu并不是一直毫无意义的空转,中间会有一些策略来优化cpu的使用。
Reactor
Reactor
作为反应器,上面同时挂载了成千上万个待唤醒的事件, 这里使用了mio
统一封装了操作系统的多路复用API
。在Linux
中使用的是Epoll
,在Mac
中使用的则是Kqueue
,具体的实现在此不多说。
在Future的基础上,出现了AsyncRead/AsyncWrite/AsyncSeek
等抽象来描述IO操作,在执行对应的Read/Write/Seek
操作时,如果底层的数据尚未准备好,会把所在的Future注册至Reactor。Reactor的流程如下:
|
|
Reactor
会不断的poll
就绪的事件,然后依次唤醒绑定在事件上的waker
,waker
唤醒的时候会把对应的task
移动到Executor
的就绪队列上安排执行。
结合
Executor
的运作原理不难发现,Executor
肯定不会poll
到未就绪的task
,因为只有就绪的任务才会被Reactor
放到Executor
的执行队列中,Executor
的资源利用率再一次被提高,这就是整个异步体系的高明之处。
Stream
Future
是异步开发中最基础的概念了,如果说Future
代表了一次性的异步值,那么Stream
则代表了一系列的异步值。Future
是1,Stream
是0,1或者N。 签名如下:
|
|
Stream
对应了同步原语中的Iterator
的概念,回忆一下,是不是连签名都是如此的相像呢!
|
|
Stream
用来抽象源源不断的数据源,当然也可以断(当 poll
到 None
的时候)。可以用来抽象 Websocket Connection
读取端,在Websokcet
中,服务端源源不断的接受客户端的值并处理,直至客户端断开连接。更进一步的抽象,MQ
中的Consumer
, Tcp
中接收方,都可以看作是一个Stream
, 因此Stream
的抽象对异步编程意义非凡。
思考: 除了上面的几种情况,还有什么可以抽象成
Stream
模型呢?
有了代表一次性的异步值Future
, 也有了代表可重复的异步值的Stream
, 因此,需要有一个代表一次或多次的异步值的通道,也就是接下来的Sink
。通常来说, Sink
可以来抽象网络连接的写入端,消息队列中的 Producer
。
|
|
在Sink的上层,我们可以封装 send
以及 send_all
等方法,用来抽象对应的 Future
与 Stream
.
Timer
很多情况下,我们希望可以延时执行一些操作,比如定时发送邮件,每隔一段时间生成一次报表。我们首先想到不就是sleep
一段时间就行了,下面的代码:
|
|
是不是很机智呢!😂😂😂! 遗憾的是,我们写完这段代码,提交后,还没上线,估计就要滚蛋了。因此,我们想要的是一个不阻塞当前线程的定时器,定时器到期自动唤醒并执行之后的操作。
不同于Tcp/Udp/Uds
,mio
没有提供对Timer
的封装。
通常来说,对定时器的处理要么是时间轮,要么堆,要么红黑树(时间复杂度更为平均O(log n)
)。时间轮比较典型的案例就是在Kafka
中的使用了,Go Runtime
用的则是堆,红黑树和堆的实现大致相同。
- 时间轮算法可以想象做钟表,每一格存储了到期的定时器,因此时间轮的最小精度为每一格所代表的时间(因此时间轮算法不适合用于对精度要求高的场景)。如果定时器的时间超过时间轮所能表示的时间怎么办呢,也简单,可以通过两种方式来优化。
- 多级时间轮来优化,可以想象,在钟表上,秒针每走一圈,分针走一格,同理分针走一圈,时针走一格,因此多级时间轮中,第一级的时间最为精确,第二级次之,第三级再次之…, 超过某一级时间轮所能表示的事件后,将定时器放到下一级时间轮中。
- 超过时间轮所能表示的时间范围后,对时间取余,插入到余数所在的格子中,这样一来,每个格子中存放的定时器需要加入轮数的记录,用来表明还差多少轮才能执行。每个格子中在插入新的定时器时,可以使用堆来堆定时器进行排序。
- 堆定时器(红黑树定时器)
使用最小堆来维护所有的定时器。一个工作线程不断的从堆里面寻找最近的定时器,如果定时器的时间比当前时间小,则唤醒该定时器对应的task,如果未达到设定的时间,则进行Thread::park(deadline-now)
操作,让出当前cpu一段时间。
目前futures-timer的实现为全剧唯一的一个堆。存在可优化空间, 比如
Go 1.14
的实现,把定时器提交到当前worker thread的本地堆里面,用来避免锁竞争,提高性能。
上面定义了实现异步的最基本概念,Future
, Stream
以及Sink
。
但是很多情况下,我们直接使用它们来构建我们的应用是非常困难的,例如:多个互为竞争关系的Future
,我们只需其中任意一个Future
返回即可,能想到的做法是,我们不断的遍历所有的Future
,直到某一个返回Ready
:
|
|
我们可以把上面的逻辑给包装一下,提供一个名为select!(futures...)
的宏,select
便可作为一个组合子而存在。类似的组合子还有很多,比如join(futures...)
,等待所有Future
完成。
更多的可以参考futures-util
.
Async/Await
上面所有的概念共同组成了Rust
的异步生态,那么现在想象一下,如何获取一个Future
运行的结果呢。一个可能的做法如下:
|
|
如果每次都要用户这么做的话,将会是多么痛苦的一件事儿呀,还不如用注册回调函数来实现异步呢!
有没有更精炼的方式来获取Future
的值呢,这就是async/await
出现的原因了。本质上来说,async/await
就是上面代码段的一个语法糖,是用户使用起来更加的自然。上面的代码可以替换成:
|
|
是不是有非常大的简化呢!
虽然上面提到了各种各样的概念,但是仔细捋一下,便会发现整个异步可以分为三层:
Future/Stream/Sink
,Reactor/Executor
直接作用于前面的三种类型。此层是为底层,一般用户很少接触,库的开发者接触较多。- 组合子层,为了提供更为复杂的操作,诞生了一系列的异步组合子,使得异步变得更利于使用,用户会使用这些组合子来完成各种各样的逻辑。
async/await
,准确的说,这层远没有上面两层来的重要,但是依然不可或缺,这层使得异步的开发变得轻而易举。
注意的地方
- 不要在任何异步函数中执行任何阻塞操作,不仅仅是
thread::sleep
, 还有标准库的Tcp/Udp
, 以及sync
中的channel
,Mutex
,RWLock
都不应该继续使用,除非你知道你在干什么!替换为async-std
与futures
中实现的版本。 - 如非必要,不要自己尝试去实现
Future
,自己实现的没有触发wake
操作的话,将永远不会唤醒,取而代之,用已经实现好的Future
进行组合。 - 使用
async/await
代替所有需要异步等待的点,这将会极大的简化你的代码。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK