58

在 Python 中按需处理数据,第 3 部分: 协程和 asyncio

 5 years ago
source link: http://www.ibm.com/developerworks/cn/analytics/library/ba-on-demand-data-python-3/index.html?ca=drs-&%3Butm_source=tuicool&%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在 Python 中按需处理数据,第 3 部分

协程和 asyncio

学习如何通过输入/输出处理显著提高软件性能

Uche Ogbuji

2018 年 7 月 25 日发布

系列内容:

此内容是该系列 # 部分中的第 # 部分: 在 Python 中按需处理数据,第 3 部分

https://www.ibm.com/developerworks/cn/library/?series_title_by=**auto**

敬请期待该系列的后续内容。

此内容是该系列的一部分: 在 Python 中按需处理数据,第 3 部分

敬请期待该系列的后续内容。

在本系列的第 1 部分中,您了解了 Python 迭代器;在第 2 部分中,您了解了 itertools。在这一部分,将了解一种称为协程(Coroutines)的特殊的生成器函数。您还将了解另一个功能强大但十分复杂的标准库模块: asyncio

想象您走进一家小餐馆。餐馆里有 3 张桌子,且仅有一位服务员。您知道将会发生什么。服务员为您递上菜单,然后回来取走您的订单。在厨师准备好您订购的食物后,服务员会将其端给您。用完餐后,服务员会将账单带给您,并在您准备好付款时回到您的桌边。

其他桌的客人正在愉快地用餐,因为当您考虑点什么菜时,或者在厨师准备您的食物时,抑或在您用餐时,服务员可以对其他桌的客人执行这些步骤。当多个桌子的客人同时需要服务员的关注时,您可能需要等几分钟,但等待时间不会太长。

如果只有一桌客人,从客人走进餐馆到离开餐馆平均需要 1 小时的时间;如果还有另一桌客人,可能需要花费 1 小时 10 分钟的时间;如果其他两桌都有客人,则会花费 1 小时 20 分钟的时间。这种情况不算太糟糕。

现在假设您走进了餐馆,但服务员只关注一桌客人,直到他们完成所有的步骤。您甚至可能需要等待两个小时后才开始用餐,因为服务员率先处理了其他桌客人的用餐。这不会是一家受欢迎的餐馆。

同步与异步

令人惊讶的是,我们编写的许多计算机代码的工作方式类似于这家效率极低的餐馆。在计算机术语中,不受欢迎的餐馆情形称为串行操作,服务员的操作称为同步操作。我们习惯的餐馆情形是,服务员可以同时关注不同桌子的客人,满足他们的需求,这种情形称为并行操作,服务员的操作称为异步操作。

我在这个类比上花这么多时间的原因是,它阐明了一项最重要的技术,开发人员应该适当学习这项技术,以便编写能够使用以输入/输出 (I/O) 为基础的数据库、网络和其他这类资源的可扩展应用程序。现实世界的餐馆之所以使用异步流程,是因为如果不这样做,它们就不会有吸引力,也不会有竞争力。理想情况下,实际程序倾向于使用异步流程,但开发人员需要利用正确的工具、库、技能和实践才能实现异步流程。本教程是教程系列的第 3 部分,将介绍如何在 Python 中这么做。

我想说的是,Python 支持异步编程的工具纷繁复杂,有许多不同的方法来实现异步编程。这些工具中有些是最近才添加的,有些辅助功能尚处于试验阶段。不过,这个主题很重要,值得坚持。我会通过这些工具中的一个实用子集,有意识地引导您熟悉基本概念,然后您就可以自行探索其他方法。

协程

您在前面的教程中了解了生成器函数,以及它们与常规函数的区别。当调用者调用常规函数时,流程是从顶部开始的,并根据函数的逻辑从某个位置退出。借助生成器,调用者可以多次进出单个函数,暂停并恢复其执行。

可以多次进出、每次暂停并恢复执行的函数被称为协程。生成器只是一种简化的协程。Python 有多种类型的协程,但本教程的重点是设计用来支持异步编程的协程类型。让我们回到餐馆的类比上。每桌的菜单/下单/用餐/结账/付款步骤是一个单独的协程,但服务员会暂停并重新关注每桌的客人,以便所有 3 个协程能同时运行(可能处于流程的不同阶段)。服务员训练有素的大脑充当着处理这些并行协程的调度程序。

在同步餐馆中,所有操作都是常规函数。在客人到达时,进入函数一次;在客人离开时,退出函数一次。一次只能运行一个这样的函数,所以客人可能需要等上两个小时才能开始自己的用餐体验。

在异步餐馆中,会在客人到达时首次进入协程函数,创建一个协程对象,并在客人离开时最终退出函数,此时不再需要协程对象。但是,在服务员提供菜单后,他们可以暂停这桌客人的协程对象,检查是否需要关注其他任何桌子的客人。在任何指定桌子的客人执行下单、结账等步骤后,服务员也会执行同样的操作。

餐馆服务员代码

利用 Python 的可读性几乎与伪代码一样的事实,下面给出了服务员协程的一个实际实现。

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

此函数是使用 async def 而不只是使用 def 定义的。这会将它标记为异步协程函数。顺便提一下,还有一些异步协程 生成器 函数,其主体中的某处有一个 yield 语句,但这些都是特殊情况,超出了本教程系列的讨论范畴。老实说,Python 3 中众多的函数/生成器/协程类型让人眼花缭乱,但本教程系列会忽略一些可能情况,仅提供一个简单的入门途径。

serve_table 的主体中包含一系列 await 语句。这会从调用的协程函数创建一个协程对象并调用此对象,同时将控制权转交给其他任何准备运行的协程。这相当于餐馆服务员启动了一个流程,比如让厨师开始准备食物,同时检查是否有任何其他桌的客人需要关注。

对任务的这种处理发生在训练有素的服务员的大脑中,在 Python 中,类似情况称为事件循环。我们稍后再介绍事件循环。

更多协程

让我们看看 serve_table 调用的其他协程的实现。

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

这些函数使用一个睡眠计时器来模拟如何花时间做一些处理。 random.randrange 函数提供了一个整数范围,用于从中随机挑选一个整数。 asyncio.sleep 函数是一个特殊的协程,它会在给定的秒数内暂停操作。当然,在此睡眠期间,事件循环可以自由运行其他任何准备好的协程。像往常一样,您可以使用 await 关键字调用此函数。

此时顺便提一下,您只能在异步协程函数(比如使用 async def 定义的函数)的主体中使用 await 关键字。在其他任何地方使用 await 都是一个语法错误。

请注意, get_order 协程返回了一个值。此值是在调用者的 await 语句中传回的。

综合所有部分:事件循环

我之前提到过事件循环。您需要使用一些特殊的设置代码来进入异步模式,创建一个事件循环来调度和管理各个协程,就像您将它们编码为协作任务一样。 asyncio 协程也可简化称为任务。当某个协程使用 await 将控制权转交给另一个协程时,它实际上是将控制权转交回事件循环。事件循环就像服务员训练有素的大脑。

以下是运行我们目前为止定义的餐馆服务员协程的代码。

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

特殊协程 asyncio.gather 采用一个或多个其他协程,安排它们全部运行,且在所有集合协程都运行完才算完成。这里使用它在事件循环中运行 3 个桌子的协程,我们已经先使用 asyncio.get_event_loop 获得了事件循环。下一行代码运行了给定的协程,直至它完成。由于向该协程传递了收集的 3 个协程,因此在所有 3 个协程都完成后它才结束运行。当然,每个 serve_table 协程都会调用其他协程,比如使用 await 调用 get_menusget_order ,然后使用事件循环来调度这些协程。

完整程序

清单 1. serve_tables.py 是完整的程序

import random
import asyncio

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

这是运行此程序的输出示例。

Welcome.Please sit at table 1 Here are your menus
Welcome.Please sit at table 2 Here are your menus
Table 1 what will you be having today? 
Welcome.Please sit at table 3 Here are your menus
Table 3 what will you be having today? 
Table 2 what will you be having today? 
   [Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
   [Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
   [Order ready from kitchen:  Special of the day ]
Table 2 here is your meal: Special of the day
Table 3 here is your check
Table 1 here is your check
Thanks for visiting us! (table 3 )
Thanks for visiting us! (table 1 )
Table 2 here is your check
Thanks for visiting us! (table 2 )

请注意,大部分行之间都会有几秒的延迟。这是各个协程中的睡眠延迟,它模拟了在餐馆中处理事务所花费的时间。时间被压缩,程序中的一秒表示餐馆中的一分钟。由于睡眠延迟时长是随机的,所以每次运行程序时,消息的出现顺序都是不同的。

另请注意,该程序并不总是完全从 1 号桌开始,然后是 2 号桌、3 号桌。 asyncio.gather 协程会调度您为它提供的协程,但没有特定的顺序。

这里要注意的主要事情是协作式多任务的流程。研究上面的完整清单,同时运行并调整代码,直到您完全了解协程如何释放和重获控制权。有时,所有 3 个 serve_table 协程对象都在调用其他某个协程,所有协程都在等待睡眠延迟。在这几秒内,您看不到任何输出。在这些时刻,事件循环会耐心地检查每个协程,查看它们何时可以恢复执行。

添加协程

我提到了如何获得在运行清单 1 中程序获得的输出之间的延迟。显示某种进度指示器会更加方便用户的使用。您可以使用协作式多任务的强大功能来实现此目的。下面的协程函数每秒显示一个点两次,以此作为进度指示器。

async def progress_indicator(delay, loop): 
    while True: 
        try: 
            await asyncio.sleep(delay)
        except asyncio.CancelledError: 
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1: 
            break

此函数采用了两个参数:打印点的最短延迟和事件循环对象。例如,在清单 1 底部附近创建的就是这种对象。为了确保一组受控协程之间能够保持协作,您需要将一个循环对象传递给许多 asyncio 。在本例中,将事件循环传递给了 asyncio.Task.all_tasks ,然后,后者返回所有在该事件循环中调度的任务(即协程)的列表,包括那些已完成的任务。为了仅获得未完成的任务,请使用 task.done 进一步筛选该列表。

假设您利用此函数创建了一个协程对象,并传入 0.5 作为延迟。它会直接进入一个无限循环,就像您可能还记得的之前教程中的无限生成器那样。然后,它会调用睡眠延迟,但在外部实体取消该协程(可以通过多种方式)时,会出现异常。在这些情况下,协程会中断并抛出 asyncio.CancelledError 异常,导致我们跳出无限循环。

通常,协程恢复正常后,它会输出一个点,然后检查其他所有协程是否在正常运行。如果 progress_indicator 是唯一剩下的协程,它会跳出无限循环。

清单 2.为了使用 progress_indicator 协程而更新的完整清单

import random
import asyncio

async def get_menus(): 
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order): 
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat(): 
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment(): 
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def progress_indicator(delay, loop): 
    while True: 
        try: 
            await asyncio.sleep(delay)
        except asyncio.CancelledError: 
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1: 
            break

async def serve_table(table_number): 
    await get_menus()
    print('Welcome.Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3),
    progress_indicator(0.5, loop)
)

#This is the entry from synchronous to asynchronous code.It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

请注意,现在,在创建协程集合之前,事件循环已经出现了。这是因为必须将此循环传递给 progress_indicator ,正如您在要收集的协程列表中看到的那样。

下面的输出来自一个样本运行:

.Welcome.Please sit at table 3 Here are your menus
..Welcome.Please sit at table 2 Here are your menus
Welcome.Please sit at table 1 Here are your menus
........Table 3 what will you be having today? 
......Table 1 what will you be having today? 
....Table 2 what will you be having today? 
..........[Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
..............[Order ready from kitchen:  Fish & Chips ]
Table 2 here is your meal: Fish & Chips
......[Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
..............................Table 3 here is your check
Thanks for visiting us! (table 3 )
..........Table 2 here is your check
..Thanks for visiting us! (table 2 )
......................Table 1 here is your check
..........Thanks for visiting us! (table 1 )
.

进度指示器的点会有规律地出现,约半秒出现一个。

这是何种类型的多任务?

如果您在 Python 中实现过多线程或多处理,您可能想知道它们与这种 asyncio 协作式多任务方法有何不同。主要区别在于,在 asyncio 方法中,不会实际尝试让两个协程在同一时间做一些事情,就像餐馆服务员无法在为 3 号桌上餐的同时为 1 号桌提供菜单一样。 asyncio 事件循环所做的是利用任务内的自然停机时间,允许协程在有工作要做时执行工作,但在其他协程空闲时将控制权转交给它们。

协程无法控制它再次运行的时间,此方法称为协作式多任务是有原因的。如果某个协程花了太长时间而没有将控制权转交回事件循环,它会阻塞一切操作,导致不必要的延迟,而且您将失去多任务的优势。这意味着您首先必须确保您的程序适合用这种方式实现,然后必须小心地编写程序代码,将它分解为会在恰当时机相互释放控制权的协程。这可能比听起来还要复杂,因为您可能无意识地从某个协程调用常规函数,这会花费很长时间,而且问题不会很明显。

一般来说, asyncio 事件循环最适合经常联网的程序,或大量查询数据库和类似对象的程序。等待远程服务器或数据库响应请求或查询时,是将控制权释放给事件循环的理想时刻。在过去,程序员倾向于在这种情况下使用线程,但与多线程相比, asyncio 事件循环是一种更加清晰和灵活的编程方式。一个难点是,要充分发挥 asyncio 事件循环的优势,需要在 asyncio 协程中对网络和数据库 API 进行编码。幸运的是,现在许多 Python 第三方库已实现了对 asyncio 的充分利用。

不过,有时您可能会遇到这样的情况:您想要使用 asyncio ,但需要使用不支持 asyncio 的库。换言之,您需要从异步代码中调用同步代码,而不破坏多线程。可以使用在单独的线程或流程中运行同步代码的 asyncio 执行器实现此目的。我提到这一点是因为您可能想了解这一点,但更多的细节超出了这些教程的讨论范畴。

结束语

随着您越来越精通 asyncio ,您会了解到与该技术相关的其他外来概念,包括令人印象深刻的“future”。您还会了解到,协程可通过不同方式将控制权释放给事件循环,包括 async with ,如果您使用的是 Python 3.6 或更高版本,还包括 async for 。由于本教程系列要求的最低版本是 Python 3.5,所以我不会讨论后者,但在下一篇教程中,您将学习 async with ,以及其他很酷的技术。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK