2

深入理解asyncio(二)

 3 years ago
source link: https://www.dongwm.com/post/understand-asyncio-2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Asyncio.gather vs asyncio.wait

在上篇文章已经看到多次用asyncio.gather了,还有另外一个用法是asyncio.wait,他们都可以让多个协程并发执行。那为什么提供 2 个方法呢?他们有什么区别,适用场景是怎么样的呢?其实我之前也是有点困惑,直到我读了 asyncio 的源码。我们先看 2 个协程的例子:

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')
    return 'A'


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'

在 IPython 里面用 gather 执行一下:

In : return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a

In : return_value_a, return_value_b
Out: ('A', 'B')

Ok,asyncio.gather方法的名字说明了它的用途,gather 的意思是「搜集」,也就是能够收集协程的结果,而且要注意,它会按输入协程的顺序保存的对应协程的执行结果。

接着我们说asyncio.await,先执行一下:

In : done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a

In : done
Out:
{<Task finished coro=<a() done, defined at <ipython-input-5-5ee142734d16>:1> result='A'>,
 <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>}

In : pending
Out: set()

In : task = list(done)[0]

In : task
Out: <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>

In : task.result()
Out: 'B'

asyncio.wait的返回值有 2 项,第一项表示完成的任务列表 (done),第二项表示等待 (Future) 完成的任务列表 (pending),每个任务都是一个 Task 实例,由于这 2 个任务都已经完成,所以可以执行task.result()获得协程返回值。

Ok, 说到这里,我总结下它俩的区别的第一层区别:

  1. asyncio.gather 封装的 Task 全程黑盒,只告诉你协程结果。
  2. asyncio.wait 会返回封装的 Task (包含已完成和挂起的任务),如果你关注协程执行结果你需要从对应 Task 实例里面用 result 方法自己拿。

为什么说「第一层区别」,asyncio.wait看名字可以理解为「等待」,所以返回值的第二项是 pending 列表,但是看上面的例子,pending 是空集合,那么在什么情况下,pending 里面不为空呢?这就是第二层区别:asyncio.wait支持选择返回的时机。

asyncio.wait支持一个接收参数return_when,在默认情况下,asyncio.wait会等待全部任务完成 (return_when='ALL_COMPLETED'),它还支持 FIRST_COMPLETED(第一个协程完成就返回)和 FIRST_EXCEPTION(出现第一个异常就返回):

In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
Suspending a
Suspending b
Resuming b

In : done
Out: {<Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>}

In : pending
Out: {<Task pending coro=<a() running at <ipython-input-5-5ee142734d16>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108065e58>()]>>}

看到了吧,这次只有协程 b 完成了,协程 a 还是 pending 状态。

在大部分情况下,用 asyncio.gather 是足够的,如果你有特殊需求,可以选择 asyncio.wait,举 2 个例子:

  1. 需要拿到封装好的 Task,以便取消或者添加成功回调等
  2. 业务上需要 FIRST_COMPLETED/FIRST_EXCEPTION 即返回的

asyncio.create_task vs loop.create_task vs asyncio.ensure_future

创建一个 Task 一共有 3 种方法,如这小节的标题。在上篇文章我说过,从 Python 3.7 开始可以统一的使用更高阶的asyncio.create_task。其实asyncio.create_task就是用的loop.create_task

def create_task(coro):
    loop = events.get_running_loop()
    return loop.create_task(coro)

loop.create_task接受的参数需要是一个协程,但是asyncio.ensure_future除了接受协程,还可以是 Future 对象或者 awaitable 对象:

  1. 如果参数是协程,其实底层还是用的 loop.create_task,返回 Task 对象
  2. 如果是 Future 对象会直接返回
  3. 如果是一个 awaitable 对象会 await 这个对象的__await__方法,再执行一次 ensure_future,最后返回 Task 或者 Future

所以就像ensure_future名字说的,确保这个是一个 Future 对象:Task 是 Future 子类,前面说过一般情况下开发者不需要自己创建 Future

其实前面说的asyncio.waitasyncio.gather里面都用了asyncio.ensure_future。对于绝大多数场景要并发执行的是协程,所以直接用asyncio.create_task就足够了~

shield

接着说asyncio.shield,用它可以屏蔽取消操作。一直到这里,我们还没有见识过 Task 的取消。看一个例子:

In : loop = asyncio.get_event_loop()

In : task1 = loop.create_task(a())

In : task2 = loop.create_task(b())

In : task1.cancel()
Out: True

In : await asyncio.gather(task1, task2)
Suspending a
Suspending b
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
cell_name in async-def-wrapper()

CancelledError:

在上面的例子中,task1 被取消了后再用asyncio.gather收集结果,直接抛 CancelledError 错误了。这里有个细节,gather 支持return_exceptions参数:

In : await asyncio.gather(task1, task2, return_exceptions=True)
Out: [concurrent.futures._base.CancelledError(), 'B']

可以看到,task2 依然会执行完成,但是 task1 的返回值是一个 CancelledError 错误,也就是任务被取消了。如果一个创建后就不希望被任何情况取消,可以使用asyncio.shield保护任务能顺利完成:

In : task1 = asyncio.shield(a())

In : task2 = loop.create_task(b())

In : ts = asyncio.gather(task1, task2, return_exceptions=True)

In : task1.cancel()
Out: True

In : await ts
Suspending a
Suspending b
Resuming a
Resuming b
Out: [concurrent.futures._base.CancelledError(), 'B']

可以看到虽然结果是一个 CancelledError 错误,但是看输出能确认协程实际上是执行了的。

注:此处之前有一个理解错误,已经在 深入 asyncio.shield 中重新解释和理解,推荐阅读。

asynccontextmanager

如果你了解 Python,之前可能听过或者用过 contextmanager ,一个上下文管理器。通过一个计时的例子就理解它的作用:

from contextlib import contextmanager


async def a():
    await asyncio.sleep(3)
    return 'A'


async def b():
    await asyncio.sleep(1)
    return 'B'


async def s1():
    return await asyncio.gather(a(), b())


@contextmanager
def timed(func):
    start = time.perf_counter()
    yield asyncio.run(func())
    print(f'Cost: {time.perf_counter() - start}')

timed 函数用了 contextmanager 装饰器,把协程的运行结果 yield 出来,执行结束后还计算了耗时:

In : from contextmanager import *

In : with timed(s1) as rv:
...:     print(f'Result: {rv}')
...:
Result: ['A', 'B']
Cost: 3.0052654459999992

大家先体会一下。在 Python 3.7 添加了 asynccontextmanager,也就是异步版本的 contextmanager,适合异步函数的执行,上例可以这么改:

@asynccontextmanager
async def async_timed(func):
    start = time.perf_counter()
    yield await func()
    print(f'Cost: {time.perf_counter() - start}')


async def main():
    async with async_timed(s1) as rv:
        print(f'Result: {rv}')

In : asyncio.run(main())
Result: ['A', 'B']
Cost: 3.00414147500004

async 版本的 with 要用async with,另外要注意yield await func()这句,相当于 yield +await func()

PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源码注释,可以看延伸阅读链接 2,另外延伸阅读链接 3 包含的 PR 中相关的测试代码部分也能帮助你理解

本文代码可以在 mp 项目 找到


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK