深入理解Python中的asyncio
source link: https://www.tuicool.com/articles/BRZZv2Q
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
asyncio介绍
熟悉c#的同学可能知道,在c#中可以很方便的使用 async
和 await
来实现异步编程,那么在python中应该怎么做呢,其实python也支持异步编程,一般使用 asyncio
这个库,下面介绍下什么是 asyncio
:
asyncio
是用来编写 并发 代码的库,使用 async/await
语法。 asyncio
被用作多个提供高性能 Python
异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。 asyncio
往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
asyncio中的基本概念
可以看见,使用asyncio库我们也可以在python代码中使用 async
和 await
。在 asyncio
中,有四个基本概念,分别是:
Eventloop
Eventloop
可以说是 asyncio
应用的核心,中央总控, Eventloop
实例提供了注册、取消、执行任务和回调 的方法。
简单来说,就是我们可以把一些异步函数注册到这个事件循环上,事件循环回循环执行这些函数(每次只能执行一个),如果当前正在执行的函数在等待I/O返回,那么事件循环就会暂停它的执行去执行其他函数。当某个函数完成I/O后会恢复,等到下次循环到它的时候就会继续执行。
Coroutine
协程本质就是一个函数,
import asyncio import time async def a(): print('Suspending a') await asyncio.sleep(3) print('Resuming a') async def b(): print('Suspending b') await asyncio.sleep(1) print('Resuming b') async def main(): start = time.perf_counter() await asyncio.gather(a(), b()) print(f'{main.__name__} Cost: {time.perf_counter() - start}') if __name__ == '__main__': asyncio.run(main())
执行上述代码,可以看到类似这样的输出:
Suspending a Suspending b Resuming b Resuming a main Cost: 3.0023356619999997
关于协程的具体介绍,可以参考我以前的文章python中的协程 不过以前的那种写法,需要使用装饰器,已经过时了。
Future
Future
是表示一个“未来”对象,类似于 javascript
中的 promise
,当异步操作结束后会把最终结果设置到这个 Future
对象上, Future
是对协程的封装。
>>> import asyncio >>> def fun(): ... print("inner fun") ... return 111 ... >>> loop = asyncio.get_event_loop() >>> future = loop.run_in_executor(None, fun) #这里没有使用await inner fun >>> future #可以看到,fun方法状态是pending <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]> >>> future.done() # 还没有完成 False >>> [m for m in dir(future) if not m.startswith('_')] ['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result'] >>> future.result() #这个时候如果直接调用result()方法会报错 Traceback (most recent call last): File "<input>", line 1, in <module> asyncio.base_futures.InvalidStateError: Result is not set. >>> async def runfun(): ... result=await future ... print(result) ... >>>loop.run_until_complete(runfun()) #也可以通过 loop.run_until_complete(future) 来执行,这里只是为了演示await 111 >>> future <Future finished result=111> >>> future.done() True >>> future.result() 111
Task
Eventloop
除了支持协程,还支持注册 Future
和 Task
2种类型的对象,而 Future
是协程的封装, Future
对象提供了很多任务方法(如完成后的回调,取消,设置任务结果等等),但是一般情况下开发者不需要操作 Future
这种底层对象,而是直接用 Future
的子类 Task
协同的调度协程来实现并发。那么什么是 Task
呢?下面介绍下:
一个与 Future
类似的对象,可运行 Python
协程。非线程安全。 Task
对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future
对象, Task
对象会挂起该协程的执行并等待该 Future
对象完成。当该 Future
对象完成被打包的协程将恢复执行。
事件循环使用协同日程调度: 一个事件循环每次运行一个 Task
对象。而一个 Task
对象会等待一个 Future
对象完成,该事件循环会运行其他 Task
、回调或执行IO操作。
下面看看用法:
>>> async def a(): ... print('Suspending a') ... await asyncio.sleep(3) ... print('Resuming a') ... >>> task = asyncio.ensure_future(a()) >>> loop.run_until_complete(task) Suspending a Resuming a
asyncio中一些常见用法的区别
Asyncio.gather和asyncio.wait
我们在上面的代码中用到过 asyncio.gather
,其实还有另外一种用法是 asyncio.wait
,他们都可以让多个协程并发执行,那么他们有什么区别呢?下面介绍下。
>>> import asyncio >>> async def a(): ... print('Suspending a') ... await asyncio.sleep(3) ... print('Resuming a') ... return 'A' ... ... ... async def b(): ... print('Suspending b') ... await asyncio.sleep(1) ... print('Resuming b') ... return 'B' ... >>> async def fun1(): ... return_value_a, return_value_b = await asyncio.gather(a(), b()) ... print(return_value_a,return_value_b) ... >>> asyncio.run(fun1()) Suspending a Suspending b Resuming b Resuming a A B >>> async def fun2(): ... done,pending=await asyncio.wait([a(),b()]) ... print(done) ... print(pending) ... task=list(done)[0] ... print(task) ... print(task.result()) ... >>> asyncio.run(fun2()) Suspending b Suspending a Resuming b Resuming a {<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>} set() <Task finished coro=<a() done, defined at <input>:1> result='A'> A
根据上述代码,我们可以看出两者的区别:
-
asyncio.gather
能收集协程的结果,而且会按照输入协程的顺序保存对应协程的执行结果,而asyncio.wait
的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。 -
asyncio.wait
支持接受一个参数return_when
,在默认情况下,asyncio.wait
会等待全部任务完成(return_when='ALL_COMPLETED')
,它还支持FIRST_COMPLETED
(第一个协程完成就返回)和FIRST_EXCEPTION
(出现第一个异常就返回):>>> async def fun2(): ... done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED) ... print(done) ... print(pending) ... task=list(done)[0] ... print(task) ... print(task.result()) ... >>> asyncio.run(fun2()) Suspending a Suspending b Resuming b {<Task finished coro=<b() done, defined at <input>:8> result='B'>} {<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>} <Task finished coro=<b() done, defined at <input>:8> result='B'> B
一般情况下,用asyncio.gather
就足够了。
asyncio.create_task和loop.create_task以及asyncio.ensure_future
这三种方法都可以创建 Task
,从Python3.7开始可以统一的使用更高阶的 asyncio.create_task
.其实 asyncio.create_task
就是用的 loop.create_task
. loop.create_task
接受的参数需要是一个协程,但是 asyncio.ensure_future
除了接受协程,还可以是 Future
对象或者 awaitable
对象:
-
如果参数是协程,其底层使用
loop.create_task
,返回Task
对象 -
如果是
Future
对象会直接返回 -
如果是一个
awaitable
对象,会await
这个对象的__await__
方法,再执行一次ensure_future
,最后返回Task
或者Future
。
所以 ensure_future
方法主要就是确保这是一个 Future
对象,一般情况下直接用 asyncio.create_task
就可以了。
注册回调和执行同步代码
可以使用 add_done_callback
来添加成功回调:
def callback(future): print(f'Result: {future.result()}') def callback2(future, n): print(f'Result: {future.result()}, N: {n}') async def funa(): await asyncio.sleep(1) return "funa" async def main(): task = asyncio.create_task(funa()) task.add_done_callback(callback) await task #这样可以为callback传递参数 task = asyncio.create_task(funa()) task.add_done_callback(functools.partial(callback2, n=1)) await task if __name__ == '__main__': asyncio.run(main())
执行同步代码
如果有同步逻辑,想要用 asyncio
来实现并发,那么需要怎么做呢?下面看看:
def a1(): time.sleep(1) return "A" async def b1(): await asyncio.sleep(1) return "B" async def main(): loop = asyncio.get_running_loop() await asyncio.gather(loop.run_in_executor(None, a1), b1()) if __name__ == '__main__': start = time.perf_counter() asyncio.run(main()) print(f'main method Cost: {time.perf_counter() - start}') # 输出: main method Cost: 1.0050589740000002
可以使用 run_into_executor
来将同步函数逻辑转化成一个协程,第一个参数是要传递 concurrent.futures.Executor
实例的,传递 None
会选择默认的 executor
。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK