在 Python 中按需处理数据,第 3 部分: 协程和 asyncio

 5 years ago
source link: http://www.ibm.com/developerworks/cn/analytics/library/ba-on-demand-data-python-3/index.html?ca=drs-&%3Butm_source=tuicool&%3Butm_medium=referral
在 Python 中按需处理数据,第 3 部分

协程和 asyncio


Uche Ogbuji

2018 年 7 月 25 日发布


在本系列的第 1 部分中,您了解了 Python 迭代器;在第 2 部分中,您了解了 itertools。在这一部分,将了解一种称为协程(Coroutines)的特殊的生成器函数。您还将了解另一个功能强大但十分复杂的标准库模块: asyncio

想象您走进一家小餐馆。餐馆里有 3 张桌子,且仅有一位服务员。您知道将会发生什么。服务员为您递上菜单,然后回来取走您的订单。在厨师准备好您订购的食物后,服务员会将其端给您。用完餐后,服务员会将账单带给您,并在您准备好付款时回到您的桌边。


如果只有一桌客人,从客人走进餐馆到离开餐馆平均需要 1 小时的时间;如果还有另一桌客人,可能需要花费 1 小时 10 分钟的时间;如果其他两桌都有客人,则会花费 1 小时 20 分钟的时间。这种情况不算太糟糕。




我在这个类比上花这么多时间的原因是,它阐明了一项最重要的技术,开发人员应该适当学习这项技术,以便编写能够使用以输入/输出 (I/O) 为基础的数据库、网络和其他这类资源的可扩展应用程序。现实世界的餐馆之所以使用异步流程,是因为如果不这样做,它们就不会有吸引力,也不会有竞争力。理想情况下,实际程序倾向于使用异步流程,但开发人员需要利用正确的工具、库、技能和实践才能实现异步流程。本教程是教程系列的第 3 部分,将介绍如何在 Python 中这么做。

我想说的是,Python 支持异步编程的工具纷繁复杂,有许多不同的方法来实现异步编程。这些工具中有些是最近才添加的,有些辅助功能尚处于试验阶段。不过,这个主题很重要,值得坚持。我会通过这些工具中的一个实用子集,有意识地引导您熟悉基本概念,然后您就可以自行探索其他方法。



可以多次进出、每次暂停并恢复执行的函数被称为协程。生成器只是一种简化的协程。Python 有多种类型的协程,但本教程的重点是设计用来支持异步编程的协程类型。让我们回到餐馆的类比上。每桌的菜单/下单/用餐/结账/付款步骤是一个单独的协程,但服务员会暂停并重新关注每桌的客人,以便所有 3 个协程能同时运行(可能处于流程的不同阶段)。服务员训练有素的大脑充当着处理这些并行协程的调度程序。




利用 Python 的可读性几乎与伪代码一样的事实,下面给出了服务员协程的一个实际实现。

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

此函数是使用 async def 而不只是使用 def 定义的。这会将它标记为异步协程函数。顺便提一下,还有一些异步协程 生成器 函数,其主体中的某处有一个 yield 语句,但这些都是特殊情况,超出了本教程系列的讨论范畴。老实说,Python 3 中众多的函数/生成器/协程类型让人眼花缭乱,但本教程系列会忽略一些可能情况,仅提供一个简单的入门途径。

serve_table 的主体中包含一系列 await 语句。这会从调用的协程函数创建一个协程对象并调用此对象,同时将控制权转交给其他任何准备运行的协程。这相当于餐馆服务员启动了一个流程,比如让厨师开始准备食物,同时检查是否有任何其他桌的客人需要关注。

对任务的这种处理发生在训练有素的服务员的大脑中,在 Python 中,类似情况称为事件循环。我们稍后再介绍事件循环。


让我们看看 serve_table 调用的其他协程的实现。

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

这些函数使用一个睡眠计时器来模拟如何花时间做一些处理。 random.randrange 函数提供了一个整数范围,用于从中随机挑选一个整数。 asyncio.sleep 函数是一个特殊的协程,它会在给定的秒数内暂停操作。当然,在此睡眠期间,事件循环可以自由运行其他任何准备好的协程。像往常一样,您可以使用 await 关键字调用此函数。

此时顺便提一下,您只能在异步协程函数(比如使用 async def 定义的函数)的主体中使用 await 关键字。在其他任何地方使用 await 都是一个语法错误。

请注意, get_order 协程返回了一个值。此值是在调用者的 await 语句中传回的。


我之前提到过事件循环。您需要使用一些特殊的设置代码来进入异步模式,创建一个事件循环来调度和管理各个协程,就像您将它们编码为协作任务一样。 asyncio 协程也可简化称为任务。当某个协程使用 await 将控制权转交给另一个协程时,它实际上是将控制权转交回事件循环。事件循环就像服务员训练有素的大脑。


#Create coroutines for three tables
gathered_coroutines = asyncio.gather(

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
#We're done with the event loop

特殊协程 asyncio.gather 采用一个或多个其他协程,安排它们全部运行,且在所有集合协程都运行完才算完成。这里使用它在事件循环中运行 3 个桌子的协程,我们已经先使用 asyncio.get_event_loop 获得了事件循环。下一行代码运行了给定的协程,直至它完成。由于向该协程传递了收集的 3 个协程,因此在所有 3 个协程都完成后它才结束运行。当然,每个 serve_table 协程都会调用其他协程,比如使用 await 调用 get_menusget_order ,然后使用事件循环来调度这些协程。


清单 1. serve_tables.py 是完整的程序

import random
import asyncio

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
#We're done with the event loop


Welcome.Please sit at table 1 Here are your menus
Welcome.Please sit at table 2 Here are your menus
Table 1 what will you be having today? 
Welcome.Please sit at table 3 Here are your menus
Table 3 what will you be having today? 
Table 2 what will you be having today? 
   [Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
   [Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
   [Order ready from kitchen:  Special of the day ]
Table 2 here is your meal: Special of the day
Table 3 here is your check
Table 1 here is your check
Thanks for visiting us! (table 3 )
Thanks for visiting us! (table 1 )
Table 2 here is your check
Thanks for visiting us! (table 2 )


另请注意,该程序并不总是完全从 1 号桌开始,然后是 2 号桌、3 号桌。 asyncio.gather 协程会调度您为它提供的协程,但没有特定的顺序。

这里要注意的主要事情是协作式多任务的流程。研究上面的完整清单,同时运行并调整代码,直到您完全了解协程如何释放和重获控制权。有时,所有 3 个 serve_table 协程对象都在调用其他某个协程,所有协程都在等待睡眠延迟。在这几秒内,您看不到任何输出。在这些时刻,事件循环会耐心地检查每个协程,查看它们何时可以恢复执行。


我提到了如何获得在运行清单 1 中程序获得的输出之间的延迟。显示某种进度指示器会更加方便用户的使用。您可以使用协作式多任务的强大功能来实现此目的。下面的协程函数每秒显示一个点两次,以此作为进度指示器。

async def progress_indicator(delay, loop): 
    while True: 
            await asyncio.sleep(delay)
        except asyncio.CancelledError: 
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1: 

此函数采用了两个参数:打印点的最短延迟和事件循环对象。例如,在清单 1 底部附近创建的就是这种对象。为了确保一组受控协程之间能够保持协作,您需要将一个循环对象传递给许多 asyncio 。在本例中,将事件循环传递给了 asyncio.Task.all_tasks ,然后,后者返回所有在该事件循环中调度的任务(即协程)的列表,包括那些已完成的任务。为了仅获得未完成的任务,请使用 task.done 进一步筛选该列表。

假设您利用此函数创建了一个协程对象,并传入 0.5 作为延迟。它会直接进入一个无限循环,就像您可能还记得的之前教程中的无限生成器那样。然后,它会调用睡眠延迟,但在外部实体取消该协程(可以通过多种方式)时,会出现异常。在这些情况下,协程会中断并抛出 asyncio.CancelledError 异常,导致我们跳出无限循环。

通常,协程恢复正常后,它会输出一个点,然后检查其他所有协程是否在正常运行。如果 progress_indicator 是唯一剩下的协程,它会跳出无限循环。

清单 2.为了使用 progress_indicator 协程而更新的完整清单

import random
import asyncio

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def progress_indicator(delay, loop): 
    while True: 
            await asyncio.sleep(delay)
        except asyncio.CancelledError: 
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1: 

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    progress_indicator(0.5, loop)

#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
#We're done with the event loop

请注意,现在,在创建协程集合之前,事件循环已经出现了。这是因为必须将此循环传递给 progress_indicator ,正如您在要收集的协程列表中看到的那样。


.Welcome.Please sit at table 3 Here are your menus
..Welcome.Please sit at table 2 Here are your menus
Welcome.Please sit at table 1 Here are your menus
........Table 3 what will you be having today? 
......Table 1 what will you be having today? 
....Table 2 what will you be having today? 
..........[Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
..............[Order ready from kitchen:  Fish & Chips ]
Table 2 here is your meal: Fish & Chips
......[Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
..............................Table 3 here is your check
Thanks for visiting us! (table 3 )
..........Table 2 here is your check
..Thanks for visiting us! (table 2 )
......................Table 1 here is your check
..........Thanks for visiting us! (table 1 )



如果您在 Python 中实现过多线程或多处理,您可能想知道它们与这种 asyncio 协作式多任务方法有何不同。主要区别在于,在 asyncio 方法中,不会实际尝试让两个协程在同一时间做一些事情,就像餐馆服务员无法在为 3 号桌上餐的同时为 1 号桌提供菜单一样。 asyncio 事件循环所做的是利用任务内的自然停机时间,允许协程在有工作要做时执行工作,但在其他协程空闲时将控制权转交给它们。


一般来说, asyncio 事件循环最适合经常联网的程序,或大量查询数据库和类似对象的程序。等待远程服务器或数据库响应请求或查询时,是将控制权释放给事件循环的理想时刻。在过去,程序员倾向于在这种情况下使用线程,但与多线程相比, asyncio 事件循环是一种更加清晰和灵活的编程方式。一个难点是,要充分发挥 asyncio 事件循环的优势,需要在 asyncio 协程中对网络和数据库 API 进行编码。幸运的是,现在许多 Python 第三方库已实现了对 asyncio 的充分利用。

不过,有时您可能会遇到这样的情况:您想要使用 asyncio ,但需要使用不支持 asyncio 的库。换言之,您需要从异步代码中调用同步代码,而不破坏多线程。可以使用在单独的线程或流程中运行同步代码的 asyncio 执行器实现此目的。我提到这一点是因为您可能想了解这一点,但更多的细节超出了这些教程的讨论范畴。


随着您越来越精通 asyncio ,您会了解到与该技术相关的其他外来概念,包括令人印象深刻的“future”。您还会了解到,协程可通过不同方式将控制权释放给事件循环,包括 async with ,如果您使用的是 Python 3.6 或更高版本,还包括 async for 。由于本教程系列要求的最低版本是 Python 3.5,所以我不会讨论后者,但在下一篇教程中,您将学习 async with ,以及其他很酷的技术。

