87

深入理解Python中的asyncio

 4 years ago
source link: https://www.tuicool.com/articles/BRZZv2Q
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

asyncio介绍

熟悉c#的同学可能知道,在c#中可以很方便的使用 asyncawait 来实现异步编程,那么在python中应该怎么做呢,其实python也支持异步编程,一般使用 asyncio 这个库,下面介绍下什么是 asyncio :

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。 asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。 asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio中的基本概念

可以看见,使用asyncio库我们也可以在python代码中使用 asyncawait 。在 asyncio 中,有四个基本概念,分别是:

Eventloop

Eventloop 可以说是 asyncio 应用的核心,中央总控, Eventloop 实例提供了注册、取消、执行任务和回调 的方法。 简单来说,就是我们可以把一些异步函数注册到这个事件循环上,事件循环回循环执行这些函数(每次只能执行一个),如果当前正在执行的函数在等待I/O返回,那么事件循环就会暂停它的执行去执行其他函数。当某个函数完成I/O后会恢复,等到下次循环到它的时候就会继续执行。

Coroutine

协程本质就是一个函数,

import asyncio
import time

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def main():
    start = time.perf_counter()
    await asyncio.gather(a(), b())
    print(f'{main.__name__} Cost: {time.perf_counter() - start}')


if __name__ == '__main__':
    asyncio.run(main())

执行上述代码,可以看到类似这样的输出:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997

关于协程的具体介绍,可以参考我以前的文章python中的协程 不过以前的那种写法,需要使用装饰器,已经过时了。

Future

Future 是表示一个“未来”对象,类似于 javascript 中的 promise ,当异步操作结束后会把最终结果设置到这个 Future 对象上, Future 是对协程的封装。

>>> import asyncio
>>> def fun():
...     print("inner fun")
...     return 111
... 
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(None, fun) #这里没有使用await
inner fun
>>> future #可以看到,fun方法状态是pending
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
>>> future.done() # 还没有完成
False
>>> [m for m in dir(future) if not m.startswith('_')]
['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
>>> future.result() #这个时候如果直接调用result()方法会报错
Traceback (most recent call last):
  File "<input>", line 1, in <module>
asyncio.base_futures.InvalidStateError: Result is not set.
>>> async def runfun():
...     result=await future
...     print(result)
...     
>>>loop.run_until_complete(runfun()) #也可以通过 loop.run_until_complete(future) 来执行,这里只是为了演示await
111
>>> future
<Future finished result=111>
>>> future.done()
True
>>> future.result()
111

Task

Eventloop 除了支持协程,还支持注册 FutureTask 2种类型的对象,而 Future 是协程的封装, Future 对象提供了很多任务方法(如完成后的回调,取消,设置任务结果等等),但是一般情况下开发者不需要操作 Future 这种底层对象,而是直接用 Future 的子类 Task 协同的调度协程来实现并发。那么什么是 Task 呢?下面介绍下:

一个与 Future 类似的对象,可运行 Python 协程。非线程安全。 Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象, Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象完成被打包的协程将恢复执行。 事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task 、回调或执行IO操作。

下面看看用法:

>>> async def a():
...     print('Suspending a')
...     await asyncio.sleep(3)
...     print('Resuming a')
...     
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
Suspending a
Resuming a

asyncio中一些常见用法的区别

Asyncio.gather和asyncio.wait

我们在上面的代码中用到过 asyncio.gather ,其实还有另外一种用法是 asyncio.wait ,他们都可以让多个协程并发执行,那么他们有什么区别呢?下面介绍下。

>>> import asyncio
>>> async def a():
...     print('Suspending a')
...     await asyncio.sleep(3)
...     print('Resuming a')
...     return 'A'
... 
... 
... async def b():
...     print('Suspending b')
...     await asyncio.sleep(1)
...     print('Resuming b')
...     return 'B'
... 
>>> async def fun1():
...     return_value_a, return_value_b = await asyncio.gather(a(), b())
...     print(return_value_a,return_value_b)
...     
>>> asyncio.run(fun1())
Suspending a
Suspending b
Resuming b
Resuming a
A B
>>> async def fun2():
...     done,pending=await asyncio.wait([a(),b()])
...     print(done)
...     print(pending)
...     task=list(done)[0]
...     print(task)
...     print(task.result())
...     
>>> asyncio.run(fun2())
Suspending b
Suspending a
Resuming b
Resuming a
{<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
set()
<Task finished coro=<a() done, defined at <input>:1> result='A'>
A

根据上述代码,我们可以看出两者的区别:

  1. asyncio.gather 能收集协程的结果,而且会按照输入协程的顺序保存对应协程的执行结果,而 asyncio.wait 的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。
  2. asyncio.wait 支持接受一个参数 return_when ,在默认情况下, asyncio.wait 会等待全部任务完成 (return_when='ALL_COMPLETED') ,它还支持 FIRST_COMPLETED (第一个协程完成就返回)和 FIRST_EXCEPTION (出现第一个异常就返回):
    >>> async def fun2():
    ...     done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED)
    ...     print(done)
    ...     print(pending)
    ...     task=list(done)[0]
    ...     print(task)
    ...     print(task.result())
    ...     
    >>> asyncio.run(fun2())
    Suspending a
    Suspending b
    Resuming b
    {<Task finished coro=<b() done, defined at <input>:8> result='B'>}
    {<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
    <Task finished coro=<b() done, defined at <input>:8> result='B'>
    B
    一般情况下,用 asyncio.gather 就足够了。

asyncio.create_task和loop.create_task以及asyncio.ensure_future

这三种方法都可以创建 Task ,从Python3.7开始可以统一的使用更高阶的 asyncio.create_task .其实 asyncio.create_task 就是用的 loop.create_task . loop.create_task 接受的参数需要是一个协程,但是 asyncio.ensure_future 除了接受协程,还可以是 Future 对象或者 awaitable 对象:

  1. 如果参数是协程,其底层使用 loop.create_task ,返回 Task 对象
  2. 如果是 Future 对象会直接返回
  3. 如果是一个 awaitable 对象,会 await 这个对象的 __await__ 方法,再执行一次 ensure_future ,最后返回 Task 或者 Future

所以 ensure_future 方法主要就是确保这是一个 Future 对象,一般情况下直接用 asyncio.create_task 就可以了。

注册回调和执行同步代码

可以使用 add_done_callback 来添加成功回调:

def callback(future):
    print(f'Result: {future.result()}')

def callback2(future, n):
    print(f'Result: {future.result()}, N: {n}')

async def funa():
    await asyncio.sleep(1)
    return "funa"

async def main():
    task = asyncio.create_task(funa())
    task.add_done_callback(callback)
    await task
    #这样可以为callback传递参数
    task = asyncio.create_task(funa())
    task.add_done_callback(functools.partial(callback2, n=1))
    await task

if __name__ == '__main__':
    asyncio.run(main())

执行同步代码

如果有同步逻辑,想要用 asyncio 来实现并发,那么需要怎么做呢?下面看看:

def a1():
    time.sleep(1)
    return "A"

async def b1():
    await asyncio.sleep(1)
    return "B"

async def main():
    loop = asyncio.get_running_loop()
    await asyncio.gather(loop.run_in_executor(None, a1), b1())

if __name__ == '__main__':
    start = time.perf_counter()
    asyncio.run(main())
    print(f'main method Cost: {time.perf_counter() - start}')

# 输出: main method Cost: 1.0050589740000002

可以使用 run_into_executor 来将同步函数逻辑转化成一个协程,第一个参数是要传递 concurrent.futures.Executor 实例的,传递 None 会选择默认的 executor


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK