当前位置: 欣欣网 > 码农

深入理解Python异步编程(中)

2024-03-21码农


上篇教程【深入理解Python异步编程(上)】中,我们深入学习了异步编程的相关概念,如阻塞/非阻塞、同步/异步、并行/并发;也学习了异步的难点与在Python中的发展历程,学习了 asyncio 的原型,知道了事件循环(Event Loop)、回调(Callback)、协程(Coroutine)、未来对象(Future)、任务(Task)之间的联系,彻底弄清楚了异步编程是什么、为什么、怎么样。此篇教程将带领大家学习如何使用 asyncio

提示:本文共2万余字,建议先走马观花看个大概,再细读重点关心的部分。当然,也欢迎从头到尾仔细阅读。

内容安排

上篇

  • 了解 异步编程及其紧密相关的概念,如阻塞/非阻塞、同步/异步、并发/并行等

  • 理解 异步编程是什么,以及异步编程的困难之处

  • 理解 为什么需要异步编程

  • 熟悉 如何从同步阻塞发展到异步非阻塞的

  • 掌握epoll + Callback + Event loop是如何工作的

  • 掌握 Python 是如何逐步从回调到生成器再到原生协程以支持异步编程的

  • 掌握 asyncio 的工作原理

  • 中篇

  • 掌握 asyncio 标准库基本使用

  • 掌握 asyncio 的事件循环

  • 掌握 协程与任务如何使用与管理(如调度与取消调度)

  • 掌握 同步原语的使用(Lock、Event、Condition、Queue)

  • 掌握 asyncio 和多进程、多线程结合使用

  • 掌握 常用的异步编程模式和最佳实践

  • 了解 Python3.6到3.12各版本的asyncio的变化

  • 下篇

  • 理解 GIL 对异步编程的影响

  • 理解 asyncio 踩坑经验

  • 理解 回调、协程、绿程(Green-Thread)、线程对比总结

  • 掌握 多进程、多线程、协程各自的适用场景

  • 了解 Gevent/libev、uvloop/libuv 与asyncio的区别和联系

  • 掌握 Python异步编程的一些指导细则

  • 掌握 asyncio 在大型应用中的可扩展性和性能优化

  • :为减轻学习负担,本教程基于 Python 3.6,历史遗留的某些旧的、或不再推荐的用法将刻意避开。所以讨论细节时,除非文中特别提及其他版本做对比,可能只适应于 3.6。

    注2 :本文撰写时是Python3.6发布不久,更新时已是Python3.12。这期间的变动主要是性能改进、提供了新的上下文管理器和任务处理方法,并不影响本系列文章的主线内容,本文「中篇」会在文末列出3.6到3.12各版本的主要变化。若因作者疏漏导致文中有版本不兼容的代码示例,恳请大家斧正。

    1 原生协程语法

    1.1 async def

    在Python 3.5之后,引入了 async/await 关键字。用它们定义的函数/方法被称为原生协程。需注意,确切讲 async def 定义的是一个 原生协程函数 ,而 调用协程函数 得到的返回值才是该函数对应的 协程 。不过在口头交流中,一般不会刻意区分这两者。

    async def read_data(db): pass# 类型为 function,原生协程函数type(read_data)# 类型为 coroutine,调用协程函数返回的才是协程coro = read_data(db)type(coro)

    要注意的是, async def 的函数内可以没有 await 语句。另外,3.5 和 3.6 有所区别:

    async def func(): yield 42 # Python 3.5 语法错误async def func(): yield 42 # Python 3.6 正确,func() 是一个异步生成器async def func(): yield from coro() # Python 3.5 语法错误async def func(): yield from coro() # Python 3.6语法错误

    1.2 await

    await 用于接受可被异步等待的 awaitable 的对象的返回值,返回值可以绑定到一个变量。例如:

    async def read_data(db): data = await db.fetch('SELECT ...') ...

    可被异步等待的 awaitable 对象有如下几种:

  • 调用原生协程函数返回的原生协程;

  • @asyncio.coroutine 装饰的生成器函数调用后返回的生成器协程;

  • 实现了 __await__ 魔术方法的对象; __await__ 方法必须返回迭代器,否则会报 TypeError 。实现了 __await__ 方法的对象也被称为 类似未来对象 (Future-like)。

  • 关键字 await 后面只能跟着 awaitable 对象,否则会报 TypeError 。而且 await 必须在 async def 的函数内使用。那么问题来了:

    async def say_hello(): print('in say_hello') return 'Hello'async def say_world(): print('in say_world') return 'World'async def say_helloworld(): print('in say_helloworld') value = await say_hello() + await say_world() return value

    按上述代码, say_hello() say_world() 都使用 await 发起调用并接收返回值。那 say_helloworld() 这个入口又该怎么调用?又如何接收到它的结果呢?

    2 协程的调用

    在【上篇】中,我们已经学习了 EventLoop、Task、Future、Coroutine 之间的联系。 Future 用来放置异步操作的返回结果, Task 用来管理协程,把 Coroutine 对象封装到 Task 中,然后启动 EventLoop 程序就开始执行了。接下来学习如何使用 asyncio 来完成这一系列操作。

    2.1 内置事件循环

    在【上篇】中,我们学习到了如何让事件循环不用关心业务逻辑,而只做事件监听的技术操作。所以 EventLoop 可以作为基础技术组件事先编写好, asyncio 就提供了对 EventLoop 的封装。

    2.1.1 事件循环策略

    Python 内置的事件循环比我们在【上篇】中自己写的强大。首先,它引入了 事件循环策略 的概念。事件循环策略是全局对象,每进程一个。 asyncio 提供了API可以程序员获取和更改每个Python进程内的事件循环策略。如此,让应用开发有了更多的选择,可以使用 asyncio 默认的策略,也可以使用第三方提供或自己编写的策略。

    asyncio.get_event_loop_policy() # 获取当前的事件循环策略asyncio.set_event_loop_policy(policy) # 设置当前的事件循环策略

    我们可能听说过 uvloop 是一个很高效的第三方事件循环框架,以上两个接口就给我们提供了利用 uvloop 代替内置事件循环的可能。

    policy = uvloop.EventLoopPolicy() # 获取 uvloop 提供的事件循环策略asyncio.set_event_loop_policy(policy) # 用uvloop的循环策略代替内置策略

    事件循环策略中,又引入了 事件循环上下文 的概念。一个事件循环的上下文,就是指一个线程,即 每个线程可以设置不同的事件循环

    2.1.2 事件循环

    asyncio 提供了如下三个接口来获取或设置事件循环:

    default_loop = asyncio.get_event_loop() # 获取当前线程的事件循环default_loop = asyncio.get_event_loop_policy().get_event_loop() # 与上一行等效asyncio.set_event_loop(loop) # 为当前线程设置事件循环asyncio.get_event_loop_policy().set_event_loop(loop) # 与上一行等效new_loop = asyncio.new_event_loop() # 根据当前事件循环策略生成一个新的事件循环new_loop = asyncio.get_event_loop_policy().new_event_loop() # 与上一行等效

    调用 get_event_loop() 得到的是默认事件循环。如果是 Unix-like 系统,会调用 asyncio.SelectorEventLoop() 得到基于 epoll kqueue 选择机制的事件循环;如果是Windows系统,会调用 asyncio.ProactorEventLoop() 得到基于 IOPC (I/O完成端口)的事件循环。

    注意 ,这里我们遇到了 asyncio 的第一个坑。每个线程可以设置不同的事件循环,但是每个进程又只能有一个事件循环策略。导致想要使用多种事件循环的情况混乱。具体现象下篇再解释。先记住 经验1 同一个Python进程中,最好只使用一种事件循环策略,且在多线程情况下只使用该策略生成的事件循环对象。

    第二个容易被忽略的坑是,使用默认策略时,主线程能够 asyncio.get_event_loop() 得到默认事件循环,但是子线程内做此操作却不行。会报 RuntimeError ,提示当前线程无事件循环。 uvloop 的策略也是这样。再记住 经验2 子线程中有异步时,需在子线程内先设置事件循环,或将主线程中获取到的循环对象传递给子线程。

    经验1 经验2 可以推导出 经验3:一个Python进程最好只有一种事件循环策略,只使用一个事件循环对象。

    上述经验在Python < 3.6.x 版本中有效,随着Python的发展也许这些经验会发生变化。

    2.1.3 协程加入事件循环调度

    在【上篇】教程中,我们在 fetch() 协程内调用了 selector 并在其上面注册了事件和对应的回调,然后再把协程封装到 Task 中, loop() 里使用了与 fetch() 内相同的 selector ,所以 loop() 启动后即可监听 fetch() 内的事件并持续运行。

    在使用 asyncio 时,没有亲自调用 selector 注册事件,如何将协程封装为 Task 并加入到 asyncio 提供的 EventLoop 上呢?

    # 获取事件循环对象loop = asyncio.get_event_loop()loop.create_task(say_helloworld())

    按上述代码,我们就将 say_helloworld() 这个协程封装为了 Task 对象,并加入了 loop 中开始调度。只等着 loop 启动, say_helloworld() 就会被执行。

    2.1.4 事件循环的启停

    asyncio 的事件循环对象提供了下列方法控制它的启停和状态检查。

  • loop.run_forever() :启动事件循环,直到 stop() 方法被调用。若先调 stop() 后调 run_forever() ,则把已经加入到 loop 中的任务全部执行完后退出退出;若先调 run_forever() 后调 stop() 则把当前已加入 loop 的任务执行完然后退出。

  • loop.run_until_complete(coro_or_future) : 启动事件循环,直到传递给它的协程执行结束或未来对象 set_result() 得到结果,并且返回协程或未来对象的结果,然后结束事件循环。故,此方法可以同时调度协程运行并得到返回值。如果在调用此方法前已经向 loop 加入了其他任务,则其他任务也会执行。

  • loop.is_running() loop 是否在运行中。

  • loop.stop() : 停止事件循环的运行,但不会立即停止,参看 run_forever() 说明。

  • loop.is_closed() : loop 是否已被关闭。

  • loop.close() : 关闭 loop ,所有已调度但未执行的任务将被强制清理,也不会等待正在执行的任务结束。这个操作是毁灭性的,需谨慎。当 loop 被关闭之后,它则不可以再次运行,若要再次调度任务,必须使用新创建的 loop 。若一个 loop 正在运行中,则无法被关闭,调 close() 是会报 RuntimeError 的。

  • 有些同学懵逼了, stop() close() 为啥要存在这两种不同的方法?因为这是两个不同的概念。 loop 就像汽车的引擎, stop() 是熄火, close() 是报废。当引擎出现故障时,需要把它报废,报废之后必须换新的才能用。如果只是正常的跑完了目标距离,可以熄火,有了新任务重新点火就跑。正在运行中的引擎是不可以强制报废的。

    2.2 放在一起执行

    现在我们终于知道了如何解决 1.2 节末尾的问题。如下代码所示:

    import asyncio# 根据当前事件循环策略获取事件循环# 得到的是 asyncio 默认的 UnixSelectorEventLooploop = asyncio.get_event_loop()...loop.create_task(say_helloworld())loop.run_forever()

    上述代码完成了 say_helloworld() 协程的调度和执行, say_hello() say_world() 也会随之被调用。但是上述代码并不能得到 say_helloworld() 的返回值,怎么才能输出它的返回值 'helloworld' 呢?

    import asyncioimport uvloop# 顺便演示替换事件循环的用法# 用 uvloop 的事件循环策略作为 asyncio 的默认策略asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 根据当前事件循环策略获取事件循环# 得到的是 uvloop.Looploop = asyncio.get_event_loop()...retval = loop.run_until_complete(say_helloworld())print(retval)

    上述代码同样完成了对 say_helloworld() 的调用,和前一种方案不同的是:事件循环被替换了,循环执行完任务以后会自动退出,接收到了协程的返回值。

    2.3 总结

    本节我们学习了如何定义原生协程,如何在协程内如何调用别的协程,如何在将协程封装为任务加入事件循环,如何启停事件循环,如何接收协程的返回值,如何替换 asyncio 的默认事件循环。还介绍了一些应该注意的采坑经验。

    到此为止,我们已经学习了利用 asyncio 库调度协程运行的基本用法。接下来,我们将学习 asyncio 提供的 Task Future 提供了哪些接口以及如何使用。以免在面对复杂问题时不知所措。

    3 Future 和 Task

    3.1 回顾 asyncio 核心原理

    在【上篇】中,我们学习了asyncio的核心运行原理,讲解了 EventLoop + Coroutine + Task + Future 是如何协同完成异步程序的调度和执行的。

    下面我们看看【上篇】异步爬虫的关键运行过程, asyncio 的框架的核心也是如此。

    asyncio 核心原理图

    如上图所示,asyncio 封装了事件循环、任务对象、未来对象。程序员用协程编写需要异步处理的业务,然后把这些协程按 asyncio 提供的接口转换为 Task 任务对象,再启动事件循环,当事件发生以后,给对应的 Future 对象设置结果,再执行到 Task 里的 step() ... 这样一直循环下去,就能执行完所有异步任务。

    聪明的读者看到此处,应该会有两个疑惑:

    1. 不是改为原生协程了吗,为何图上还写着 yield 解释生成器版本的协程?

    2. 在第2节的学习中,并没创建 Future 对象,怎么回事?

    3.1.1 无需 EventLoop 执行原生协程的魔法

    先解释第一个疑惑,为什么上述原理图中还画着生成器协程。在【上篇】教程中我们提及过,基于 async/await 语法的原生协程背后和生成器协程复用着共同的实现。来看一段代码:

    # 复用 1.2 节的 say_hello(), say_world(), say_helloworld()# 新写一个 run() 函数def run(coro): try: coro.send(None) except StopIteration as e: return e.valueresult = run(say_helloworld())print(result)

    上述代码执行后会打印出 HelloWorld ,没有 EventLoop 也可以执行协程,我们做到了。其原因就是 async/await 的背后还是生成器。可以看出,原生协程里的 return 其实是把返回值放到异常对象里,在处理异常时读出返回值。这点是不是和 Tornado 一样?

    本小节只是为了演示原生协程的背后和生成器的关系。实际编程过程中我们不要玩这个技巧。因为自己要正确处理协程的所有事情很不容易。还是乖乖地按照 asyncio 规定的接口去编程。

    3.2 Future

    Future 是用于产生表示未来执行结果对象的类。所有的异步任务的返回结果,实际都会被封装到 future 对象中。

    3.2.1 Future与Task的瓜葛

    通过【上篇】的学习,我们已经掌握了自己怎么写 Future Task 的核心部分。重新审视一下设计理念, Future 代表的是「未来的执行结果」。 Task 是为了把协程包装成任务,并驱动协程的执行,实际就是说 Task 表示「未来将要执行的异步任务」。

    若把「任务」和「未来对象」做完善,那「任务」当然需要知道自己是否还在执行、其结果是什么、如何注册或注销等等,「未来对象」也需要知道自己等待的结果好了没有,可以选择等或者不等这个结果。它们的行为很相似。从3.1节的原理图中可以看出 Task Future 的关系紧密。

    再审视一下代码, Task 的初始化方法中使用到了一个 Future 对象,这个未来对象除了代表一个 None 值,没其他用处。而且「任务」和「未来对象」那么多相似行为, Task 的主要不同只有 step() 方法。

    基于多种因素考虑, asyncio 里的 Task 是派生自 Future 的子类。3.1节第二个疑惑的答案就在此处, create_task() 的时候,实际上也创建了 future 对象。

    3.2.2 Future 未来对象

    注意要和 concurrent.futures.Future 区分开,这两者目前不完全兼容。 concurrent.futures.Future 是为多线程和多进程执行异步任务而准备的,而 asyncio.Future 是为单线程内用协程做异步编程而准备的。

    要使用 Future 对象,首先得学会创建 Future 对象。显式地创建和获取它有如下两种方式。

  • 方式1: fut = asyncio.Future(loop=None)

  • 方式2: fut = loop.create_future()

  • 方式1创建对象时可以传递 loop 参数,若不传递则为默认值 None ,将使用 asyncio.get_event_loop() 获取事件循环。方式2创建的则使用调用 create_future() loop

    显式地创建出来的 Future 对象没有和任何协程关联,只是被创建出来等待我们主动使用它。 Future 也实现了 __await__ 方法,它们是可以被 await 关键字调用的。

    两种创建未来对象的区别 在于,方式1的与其绑定的事件循环是默认用 asyncio.get_event_loop() 得到的,而方式2的则是与调用 create_future() loop 绑定。在实际使用时,根据需要选择创建方式。

    下面了解 asyncio.Future 对象具有的方法:

  • cancel() :取消 future 并执行它的回调函数。若它已经拿到结果或已被取消,返回 False ,否则,取消它并执行回调,再返回 True

  • cancelled() :若 future 已被取消则返回 True

  • done() :若 future 已经拿到结果,或抛了异常,或被取消成功都返回 True

  • result() :返 future 代表的结果。若已被取消,则抛出 CancelledError ;若还未拿到结果,则抛出 InvalidStateError ;如果既拿到结果,也有异常,则抛出异常。

  • exception() :返回在 future 上设置的异常。如果没有则为 None

  • add_done_callback(fn) :为 future 设置回调函数,这些回调函数将在 future 拿到结果后执行。 注意 ,这里只接受一个可调用的函数 fn ,当执行回调的时候,只会给这个函数传递一个参数,就是 future 自己,即 fn(self) 。如果想给回调函数传递额外参数,需借助 functools.partial

  • remove_done_callback(fn) :从 future 的回调函数列表里删除 fn 的所有实例。如果用 fn==callback True ,则该callback就会被移除。

  • set_result(result) :为 future 设置结果,并标记其状态为已完成。若 future 已经被完成,则抛出 InvalidStateError

  • set_exception(exception) :为 future 设置异常,并标记其状态为已完成。若 future 已经被完成,则抛出 InvalidStateError

  • 3.2.3 Future 的使用场景

    通过第2节的学习,我们学会了无需显式调用 Future Task 类,也能够调度协程到 EventLoop 上运行。但有些时候不够灵活,例如,我们想在某个 Future 得到结果后,不仅仅是返回结果,而且要做一个其他操作,怎么办?这时候就需要显式地使用 Future 对象。

    import asyncioloop = asyncio.get_event_loop()async def take_exam(fut): # 进行考试 await asyncio.sleep(1) fut.set_result(100) return 'Exam is completed.'def check_score(fut): score = fut.result() if score >= 60: print("Passed.") else: print("Failed.")fut = asyncio.Future()fut.add_done_callback(check_score)task = asyncio.ensure_future(take_exam(fut))retval = loop.run_until_complete(fut)print(task.result()) # Exam is completed.print(fut.result()) # 100print(retval) # 100

    在上述代码中,我们定义了 take_exam() 协程函数,接收一个 Future 对象作为参数,在协程内会进行异步非阻塞代替非阻塞I/O操作(用 sleep() 代替),当异步操作完成以后,再设置 fut 对象的值,最后该协程自己也返回一个数据。

    asyncio.ensure_future(coro_or_future) 函数会将传递给它的协程封装为 Task ,加入 EventLoop 的调度,并返回该协程的 Task 对象。故而 task.result() 是协程 return 的值。关于 ensure_future() 的更多细节在【下篇】中继续讨论。

    3.3 Task

    Task 主要用于管理需调度到 EventLoop 上执行的协程对象。它的行为与 Future 很类似,它实际也是 Future 的子类,所以 Future 具有的操作它也有。参考 3.2.1节。

    显式地创建 Task 对象有如下三种方式:

  • 方式1: task = asyncio.Task(coro, *, loop=None)

  • 方式2: task = asyncio.ensure_future(coro, *, loop=None)

  • 方式3: task = loop.create_task(coro)

  • 以上三种方式都能获取封装了 coro 协程的 task 对象,并且把 task 调度到相应的事件循环上。区别在于,方式1和方式2可以接受一个可选关键字参数 loop ,会把 task 调度到传递的那个 loop 上去,默认为 asyncio.get_event_loop() 获取的 loop 。而方式3则会将 task 调度到调用 create_task() loop 上。

    3.3.1 ensure_future() create_task() 区别

    ensure_future() 里实际调用的是 create_task() create_task() 里实际调用了 Task 来创建对象。为什么有了 create_task() 以后还需要 ensure_future() 呢?

    如果传递给这两个函数的参数是协程,它们没什么区别。如果要把 Future Future-like (实现了 __await__ 方法)的对象传递给 create_task() 就不行了,它只接受协程。 ensure_future() 在接收到 Future 对象时,什么也不做,直接返回该 Future 对象,而接收到 Future-like 对象时,需要先把 Future-like 对象包装到一个协程里,再把协程传给 create_task()

    3.3.2 Task 对 Future 的扩展

    Task Future 的子类,它除了具有 Future 所具有的方法外,还扩展了如下几个方法:

  • classmethod all_tasks(loop=None) :类方法,用于获取已调度到 loop 上的所有任务。

  • classmethod current_task(loop=None) :类方法,用于获取 loop 上正在执行的任务。

  • get_stack(*, limit=None) :获取任务对象的栈帧,若该任务对象状态已经为 done ,则返回空列表。

  • print_stack(*, limit=None, file=None) :将任务对象的栈帧输出到文件。

  • 3.3.3 Task 的使用场景

    得到 Task 对象以后,可以通过 add_done_callback() 来为它设置回调函数。所以可以通过 Task 把 Twisted 这种回调风格的框架和 asyncio 这种原生协程的框架结合起来。

    一般地,在Python 3.6及其之后,推荐尽可能使用原生协程来做异步编程。而在这种情况下,我们无需直接调用 Task 类的初始化方法来得到协程的任务对象,而是用 create_task() ensure_future()

    3.4 总结

    本节我们学习了 asyncio 库中 Future Task 的核心概念和使用方式。 Future 对象作为异步操作的结果占位符,为异步编程提供了极大的灵活性和控制能力。而 Task ,作为 Future 的子类,进一步封装了协程的调度和管理,使得异步编程更加高效和易于管理。通过深入理解这些概念,读者可以更好地掌握如何使用 asyncio 来构建高效、可读性强的异步应用。

    4 协程的调度与执行

    4.1 单个协程的调度

    在异步编程中,理解如何正确地调度和执行单个协程是很重要的。

    4.1.1 协程的定义

    在Python中,协程通过 async def 定义。这种定义创建了一个协程函数,调用这个函数会返回一个协程对象。

    async def my_coroutine(): # 协程的逻辑 ...

    4.1.2 调度单个协程

    Python 3.7及更高版本中,可以使用 asyncio.run() 来启动和调度协程。这个函数接收一个协程对象,运行它,并自动管理事件循环。

    import asyncioasync def main(): # 协程的逻辑 ...# 启动协程asyncio.run(main())

    4.1.3 协程的等待与结果获取

    在协程内部,可以使用 await 来等待另一个协程的结果。 await 会暂停当前协程的执行,直到等待的协程完成。

    async def fetch_data(): # 模拟数据获取 await asyncio.sleep(2) return "data"async def main(): data = await fetch_data() print(data)asyncio.run(main())

    4.1.4 错误处理

    协程中可能出现的异常可以使用 try...except 块来捕获和处理。

    async def might_fail(): raise Exception("出错了!")async def main(): try: await might_fail() except Exception as e: print(f"错误捕获: {e}")asyncio.run(main())

    以下是一个完整的示例,展示了从定义协程函数到启动协程,并获取结果的流程:

    import asyncioasync def compute_square(number): await asyncio.sleep(1) return number * numberasync def main(): result = await compute_square(2) print(f"结果: {result}")asyncio.run(main())

    通过以上步骤,我们展示了如何创建、调度,并执行单个协程,同时处理可能出现的错误。这为深入理解Python中的异步编程奠定了基础。

    4.2 同时执行多个协程

    前文我们已经学习了 Future Task 如何使用以及它们的区别和联系,也知道如何利用 EventLoop 执行异步任务。

    但仍有个问题,回到【上篇】中的爬虫代码,之所以利用异步编程提高了它的整体效率,是因为我们一次性创建了10个不同的抓取任务, EventLoop 同时监听这10个异步任务的 sock 对象的状态。同理,转换到 asyncio 库时,我们也需要能一次性创建多个任务,并执行。

  • 方式1: asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

  • 方式2: asyncio.as_completed(fs, *, loop=None, timeout=None)

  • 方式3: asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

  • 4.2.1 asyncio.wait

    用于等待多个未来对象或协程的执行。需注意本方法也是一个协程,需要用 await 关键字来调用或使用 EventLoop 调度协程执行的接口来执行。

    asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

  • futures : 必选参数,表示多个 Future Coroutine 对象,如果是 Coroutine 则会被封装为 Task

  • loop : 可选参数,调度 futures 执行的事件循环对象,默认为通过 get_event_loop() 获取。

  • timeout :可选参数,等待 futures 执行的超时时间(秒),默认为没有超时限制。注意,并不会抛出超时异常,只是结束 wait

  • return_when : 可选参数,表示 wait 返回的时机。可以取的值为 FIRST_COMPLETED , FIRST_EXCEPTION , ALL_COMPLETED ,分别代表「执行完一个之后」、「碰到第一个异常之后」、「所有都完成之后」。

  • asyncio.wait 返回一个包含两个元素的元组,分别为 done pending 状态的未来对象或协程集合。默认情况下pending状态的集合为空,因为没有超时限制且等待了所有任务执行完成。

    done, pending = await asyncio.wait([say_hello(), say_world()], timeout=1.5, return_when=asyncio.FIRST_COMPLETED)

    上述代码演示了如何在协程内调用 wait 等待多个异步任务的执行结果。示例中设置了1.5秒的超时时间,而且 wait 的返回时机为 FIRST_COMPLETED 第一个完成后。

    注意 :超时并非严格的指协程从头执行到尾是否超过该时间限制,某些情况下 done pending 集合中得到的结果非预期;例如协程中有计算密集型任务耗时较多,而非阻塞异步操作的时间又恰好与超时限制一致,此时最稳妥的办法是遍历这两个集合中的任务,根据 task.done() 自行判断任务的状态。当然这种极端情况很少,不过我们需要知道存在这种问题,以免遇到 done pending 集合非预期时不知所措,更详细的部分在【下篇】中探讨。

    如果要获取已经执行完成的异步任务的返回值,可以取出 done 集合中的 Task 对象,通过 task.result() 获取。

    4.2.2 asyncio.as_completed

    asyncio.as_completed(fs, *, loop=None, timeout=None) 此函数接收一组 Future Coroutine 对象,返回一个迭代器。可以接收 loop timeout 参数,默认无超时限制。

    其用法如下:

    async def main(): values = [] for f in asyncio.as_completed([say_hello(), say_world()], timeout=2): ret = await f values.append(ret) return values

    需要注意三个点,一是迭代器返回的future或task对象的顺序与传参时的顺序无关,二是超时以后会抛出 asyncio.TimeoutError 异常(与 asyncio.wait() 函数的超时不同),三是迭代器中的每个 future 对象最好都用 await 等待其结果,否则本该被异步调用的任务没有被异步调用,程序可能会有非预期行为。

    当有多个协程或未来对象需一个接一个地、顺序无关地执行,且需要设置超时限制时,本函数 as_completed() 适用。否则,直接用 for 循环迭代协程列表一个一个地 await 就可以了。

    4.2.3 asyncio.gather

    asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 本函数接收多个协程或未来对象,并返回一个 future 对象,通过 await loop 等待该 future 的结果则可得到该组异步任务的结果列表,顺序与传参时对应。当 return_exceptions False 时,则遇到异常就抛出,否则将异常作为结果返回到结果列表中,并不抛出异常。

    当有多个协程或未来对象需同时执行,也要获取其返回结果时,本函数 gather() 适用。

    用法如下:

    results = await asyncio.gather(say_hello(), say_world(), return_exceptions=True)# 或者results = loop.run_until_complete(asyncio.gather(*[say_hello(),say_world()]))

    4.3 总结

    本节我们学习了如何利用 Future Task 对象对协程进行调度和执行。在 asyncio 中没有显式的调度器,只需要通过 create_task ensure_future run_until_complete 等即可将协程加入 loop 调度执行。我们还学习了单个协程和多个协程的调度执行。

    5 同步操作

    虽然大多数时候我们都是在单线程内使用协程并发地执行异步任务,但即便是单线程内,协程的调度以及完成顺序难以预测和控制。而在某些场景下,我们需要让一组有关联的协程按顺序执行,这时候我们就需要把异步的变成同步的。

    asyncio 提供了 队列 两种机制来把多个异步任务变成同步执行的。

    5.1 锁

    顾名思义,是一种保护措施。在计算机术语中, 锁是指用来保护临界资源的机制 。临界资源是指不能被被多个程序同时访问/操作的数据(包括用数据结构来表示的文件、网络连接等任何资源)。例如存储了库存、银行账户余额的变量,它们就是临界资源,不能够被多个程序同时去修改。

    当一段程序想要访问临界资源时,必须取得保护该临界资源的锁的权限,才可以进一步操作该临界资源。 站在临界资源的角度,锁是保护它的,使临界资源在整个生命周期内保持正确性。站在异步并发程序的角度,锁是一种限制它访问临界资源的同步机制,以避免并发产生的副作用。

    单纯地讲「锁」这个术语,就是上述的概念,还比较基础和抽象。为了应付软件的复杂性和沟通的效率,在不同的情况下还会衍生出不同类型的锁,其命名也花样百出。有些叫法是从临界资源是什么格式的数据角度出发,例如「表锁、行锁、文件锁」;有些叫法是从保护临界资源不被什么行为操作而变更的角度出发,例如「读锁、写锁」;还有些叫法是从锁的实现原理上命名,例如「乐观锁、悲观锁、自旋锁」;还有的叫法则是从锁是否可以被多个程序共享或互斥性质角度,如「排他锁、共享锁、独占锁、互斥锁」;还有的称呼是描述并发程序在获取临界资源时而进入了某种特殊状态,如「死锁、活锁、饥饿」;还有的锁的实现依赖于分布式系统,则称之为「分布式锁」……

    希望大家不要对些数不清的锁望而生畏,其实锁的基本原理、基本特征、存在的意义就本节开头表述的那一句话而已,不过是针对具体场景、具体实现、或者观察角度不同而叫法不同。

    就像生活中,有「电子锁、机械锁、自行车锁、门锁、自动锁、指纹锁、U型锁、环形锁、铜锁、金锁、智能锁、愚蠢锁……」。锁是什么?锁就是锁!计算机软件中也一样。

    asyncio 为协程提供了5种具体的实现,分别是 Lock Event Condition Semaphore BoundedSemaphore 。下面介绍它们的作用与用法。

    5.1.1 Lock

    asyncio.Lock 实例化以后可以得到原始的锁对象,它只具有 locked unlocked 两种状态。刚初始化的 Lock 对象的状态是 unlocked 。可以通过 acquire() 方法将状态从 unlocked 更改为 locked ,反之,通过 release() 方法可以把 locked 改为 unlocked

    Lock 对象主要会用到以下三个方法:

  • locked() 检查锁对象是否处于 locked 状态,返回 True False

  • acquire() 加锁,把锁对象改为已加锁状态,并返回 True 。如果多个协程都调用 acquire() 方法来加锁,那么会按照调用顺序依次加锁。

  • release() 释放锁,把锁对象从 locked 改为 unlocked 状态,然后返回,以便别的协程可以加锁。如果锁对象本就处于 unlocked 状态,再调用 release() 会报 RuntimeError

  • 注意, acquire() 是一个协程,需按协程的方式调用。 Lock 对象还实现了上下文管理器,可以使用 with 语句。

    示例代码如下,请按自己的想法多做一些修改尝试然后执行观察效果:

    用生活中的场景类比,几个人(不同的任务)要吃同一口锅里的饭(共享数据),但只配了一把勺子(锁),只有拿到勺子的人可以去锅里打饭吃(访问或修改共享数据),而且一个人用完勺子(释放锁)后,该让谁(在争用锁的协程)接着用,是按先来后到的顺序。

    根据这个比喻大家还可以思考一下,更多的人、更多的锅、更多的勺子、有的饭只能被一个勺子打、有的饭可以同时被多个勺子打等这多种场景应该如何处理?同样可以借鉴到编程中来,以下给出 Lock 的使用场景:

    1. 互斥访问共享资源 :当多个协程需要访问相同的共享资源(例如共享数据结构、文件等)时, Lock 可以防止数据竞争和不一致性,确保每次只有一个协程能访问该资源。

    2. 防止资源重入 :在某些情况下,一个协程在完成前不应被再次调用。使用 Lock 可以防止这种重入,确保协程安全完成其工作。

    5.1.2 Event

    Event 事件 的实现。这种对象可以理解为事件通知器。多个协程可以利用它等待某个事件发生并获取通知。它内部有一个标志变量,初始化时为 False ,调用它的 wait() 方法后进入阻塞态,等待它的标志变为 True ,即等待事件的发生。当一个事件发生后,可以通过 set() 方法来改变标志状态为 True


    通过上述例子,可以看出 Event 的作用跟比赛用的信号弹类似,信号弹爆炸了大家都得按它行动。一颗信号弹也只能用一次。以下给出 Event 的使用场景:

    1. 协程间的信号通知 :当一个协程需要等待来自另一个协程的信号时,可以使用 Event 。例如,在一个数据处理流程中,数据生产者可以通过设置 Event 来通知数据消费者开始处理。

    2. 状态变更通知 :如果系统的某部分状态发生变化(例如配置更新、系统就绪等),并且这个状态变化需要通知给多个协程, Event 可以作为一个简单的通知机制。

    5.1.3 Condition

    Condition 条件变量 的实现,它可以让一个或多个协程一直等待某个条件达成以后再继续运行。 Conditon Event 很像,都是通过一个信号让别的协程继续运行下去,但有不同点:

    1. Event 是一次性通知所有阻塞在该事件上的协程,而 Condition 可以分别通知其中的一个或多个,而未被通知的继续等待。

    2. Condition 底层实现使用了 Lock 对象,而 Event 没有。

    关于上述第1点我们参考如下示例:

    如果仅按上述示例的用途, Condition Event 几乎没有差异,尤其是用 cond.notify_all() 的时候,效果跟 Event 一毛一样,这不是重复造轮子吗?那么 Condition 的特别之处有啥?

    源于上述第2点不同。那为啥 Condition 就得依靠锁来实现呢?其实 条件变量 设计的本意比 Event 复杂,它通常会和三种对象关联: 指示条件是否已满足的对象、串行化访问该对象的互斥对象、等待条件的条件变量

    条件变量通常和决定是否成立的共享对象关联,所以条件变量的底层实现使用了互斥锁对象 Lock 。提供了 acquire() release() 方法,当某个协程想要改动共享数据,告诉别人条件已修改,就必须使用 acquire 来取得修改权。

    Condition 对象还提供了 coroutine wait_for(predicate) 方法,参数 predicate 需要是一个直接调用就可以得到返回值的函数。当 predicate() 的返回值的布尔上下文为真时,代码将继续执行。上述例子我们还可以换成如下写法:

    本小节讲解了 asyncio 的同步机制之一:条件变量(Condition)。并展示了其三种主要使用方法,每种使用方法都适用于不同的情景,需在实践中不断体会。以下给出常用场景:

    1. 生产者-消费者 问题:在生产者和消费者模式中, Condition 可以用来确保消费者只在有数据可消费时才消费,同时允许生产者在生产新数据时通知消费者。这种情况下, Condition 用于同步生产者和消费者对共享资源(如队列)的访问。

    2. 等待多个协程完成特定任务 :在某些情况下,可能需要等待多个协程完成它们的任务,直到一个特定的条件被满足(如数据达到特定状态)。这时, Condition 可以用来阻塞一个协程,直到另一个协程改变了状态并通知该条件对象。

    3. 顺序控制 :如果有多个协程必须按特定顺序执行, Condition 可以用来控制执行流程,确保每个协程在前一个协程完成特定任务后才开始执行。

    4. 定期检查共享资源 :在某些应用中,可能需要定期检查共享资源的状态,并在状态达到特定条件时执行操作。 Condition wait_for() 方法可以在这种场景下很好地使用,它允许协程在条件满足时被唤醒并执行相应的操作。

    在这些场景中使用 Condition 的主要优势在于其能够提供比简单的 Lock Event 更复杂的同步机制,允许在复杂的协程间同步场景中精确控制执行流程。

    5.1.4 Semaphore

    Semaphore 信号量 ,原本是铁路交通系统中的一个术语,后被荷兰计算机科学家Dijkstra引入计算机科学领域。 用于控制仅支持有限个用户同时操作的共享资源 ,在计算机系统中有着广泛的应用。

    信号量对象的内部会维护一个计数值,取值范围是任意正整数,默认为1。当一个协程中对信号量进行一次 acquire() ,其计数值就减1, release() 一次则加1。当计数值减至0时,后来的协程将阻塞在 acquire() 操作上,直到有其他协程 release() 该信号量。

    当信号量对象内的计数值初始化为大于等于2的正整数,称其为 一般信号量 计数信号量 ;计数器初始化为1时,则其变动范围只有{0,1}两种可能,又被称为 二进制信号量 ,二进制信号量又被称为 互斥锁 。虽然 asyncio 中的 Lock Semaphore 分别由两个不同的类实现,但其源代码十分相似。从原理上我们已经知道,互斥锁是信号量的一个特例。

    threading 模块也提供了 Semaphore ,原理都一样,使用方法也非常相似。不再赘述。

    asyncio信号量

    上图示例演示了 Semaphore 对象的用法,只允许3个worker同时被调度执行,第四个和第五个worker必须等待前面的release()之后,有「名额」了才能执行。以下给出信号量的使用场景:

    1. 限制资源访问 :在有限资源(如数据库连接、网络带宽)的环境中,使用信号量可以限制同时访问这些资源的协程数量,防止资源过度使用。

    2. 控制并发度 :在执行并发操作时,如API调用或网络请求,信号量可以用来控制同时运行的操作数量,以避免超出服务器或API的负载能力。

    5.1.5 BoundedSemaphore

    BoundedSemaphore 称为 有界信号量 ,它和一般信号量只有在计数值管理上存在细微差别。有界信号量的计数值的初始化值就是上限,不允许越界,而 Semaphore 的计数值在初始化之后却是可以通过多次 release() 来提高上限。

    以下给出有界信号量的使用场景:

    1. 严格控制资源访问上限 :在需要严格限制同时访问某个资源的最大协程数时使用,确保资源不会因为程序错误而被过度使用。

    2. 防止程序逻辑错误 :在设计需要精确控制资源访问计数的场景时, BoundedSemaphore 可以帮助捕捉程序中的逻辑错误,如不恰当的release()调用。

    3. 资源池管理 :管理有限资源的池(例如数据库连接池),确保池中资源数量不会因错误操作而超出预设的界限。

    5.2 队列

    队列本质是一个用于 存储容器 ,它的使用模式是「 存进去——取出来 」,正是因为这样的过程,所以队列的基本职责就是「 缓冲、暂存 」。

    在此基础上,让存于其中的元素们 具有某种特征的存取顺序 ,就构成了不同类型的队列,常见的有 先进先出队列(FIFO Queue),优先级队列(Priority Queue),后进先出队列(LIFO Queue)

    由队列的 存储 实质可以看出,存储组件(如磁盘、数据库)具有的考量点它都具有,比如需考察队列的读写能力(I/O吞吐能力)、存储容量、可靠性(数据防缺损)等等。

    正因为是「 存进去——取出来 」模式,故而队列扮演的角色就是 协调数据存入方与数据取出方 ,另一个称呼是数据的 生产者和消费者 那么什么时候需要用到队列?很简单,既然是协调生产者和消费者,这两者不协调的时候就让它上!

    那具体该如何思考?之前我们讲过, 认识事物,从其时间结构和空间结构着手 ,当生产者与消费者在时间或空间上不协调时,队列就可以发挥作用。比如,生产者与消费者的工作速率不匹配,或者它们无法在同一个时间点直接交互(比如消费者数量不够),就是时间上的不协调。具体场景如高并发应用,产生请求的速率太高(QPS很高)而处理请求的程序(服务端的线程/协程数有限),所谓 削峰填谷 是也。再比如,生产者不论主动或被动地无法直接与消费者交互,这就是空间不协调,比如 程序解耦 ,跨域(跨作用域、线程、进程、网络域等等,不同级别的程序,有不同级别的队列)通信等。

    我们只有像上面这样探究,从根本性质出发,才不会被各种表象所迷惑,也不会人云亦云而遇到困难时手足无措。对事物的本质认识越深刻,掌握它越轻松,越能够应付千变万化。

    现在我们要学习的是Python标准库 asyncio 里提供的 异步队列 ,无他,不过是「 存——取 」这两个操作支持异步罢了。

    5.2.1 Queue

    asyncio.Queue 是一个搭配协程使用的 先进先出 队列,跟搭配线程使用的 quque.Queue 和搭配进程使用的 multiprocessing.Queue 很相似,只是搭配的对象不同,进而实现细节不同。

    asyncio.Queue(maxsize=0,*,loop=None) 有2个初始化参数, loop 指明调度该队列的事件循环对象, maxsize 限制队列的最大长度 。当maxsize <= 0 时,队列无限长度;当maxsize>0时,如果已经存入队列的元素数量超过限制(队列已满),则后来的元素想 put() 存入队列,则会阻塞,直到别的元素被 get() 取出留出空位。

    asyncio.Queue 的常用方法如下:

  • empty() 检查队列是否为空,为空返回True,否则返回False。

  • full() 检查队列是否已满,满了返回True,否则返回False,无限长队列只会返回False。

  • coroutine get() 从队列中取出元素,如果队列为空,则一直等到有元素可取。

  • get_nowait() 从队列中取出元素并立即返回该元素,如果队列为空则抛出 QueueEmpty 异常,而不是等待有元素可取出。

  • coroutine put(item) 向队列中存入元素,若队满则等着,直到有空位再存入。

  • put_nowait() 向队列中存入元素,若队列已满,则抛出 QueueFull 异常。

  • qsize() 返回队列中已有元素的数量,注意这个值并不准确,因为非阻塞队列在被统计时,也有元素在不断地进出。

  • coroutine join() task_done() 这两个方法是 Python 3.4.4 版才新加入的,它俩需要搭配使用。这两方法稍显鸡肋,后文分析。

  • Queue 示例代码:

    大家先忽略上述代码中第13、14、26、27行,即调用 task_done() join() 的地方, 实践中也不必写这几行代码,它们对元素的存取并无影响 。剩下的代码很容易理解,producer 不断地存入元素,只放了5个元素后退出,而 consumer 不断地取出元素进行处理,直到队列为空时退出。

    join() task_done() 的探讨(不关心可忽略) Queue 内部维护了一个名为 _unfinished_tasks 的计数变量,其初始值为0,每当调用 put() 或者 put_nowait() 时该变量的值加1,而调用 task_done() 时,该变量值减1。调用 join() 后,若该值大于0,则会阻塞着直到该值变为0,然后返回 join() 处继续往下执行。

    问题来了,名为「未完成」的计数变量其增加是因为存入元素,而较少却并不是因为其直接取出元素,通常做法是取出元素进行处理之后,调用 task_done() 来减少该值。如果对这两个方法不熟悉的同学,可能并不会将它们配对使用,就会出错,比如程序执行到 join() 处就不继续执行了,即便队列中元素已经取出完毕。

    鸡肋的另一点原因,因为队列中不定时地有元素在进进出出,并不能确切地知道 _unfinished_tasks 变量何时变为0。那么 join() 阻塞后再被唤醒的时机不好控制,如果刻意控制则会使代码逻辑变得晦涩。

    关于 join() task_done() 的讨论我们就此打住,像更深入探讨的同学欢迎留言。驹哥更推荐使用 asyncio.Event 或者其他同步机制来显式而明确地控制协程在某种条件下的切换时机。而 join() task_done() 背后的实现就是基于 asyncio.Event 的。

    5.2.2 PriorityQueue

    PriorityQueue 的对象所具有的方法跟 Queue 完全一样,只是元素取出的顺序是按优先级取出。通常用 (priority number, data) 这样的二元组表示一个元素, 注意, priority number 值越小优先级越高

    PriorityQueue 底层实现依赖于 heapq ,实际上,队列中个元素的比较是按**富比较(Rich-Compare)**的方式进行的,所以存入其中的元素不仅可以是上述二元组的形式,还可以是字符串、整数、小数、自定义的对象等各种凡是实现了 富比较 方法的对象。

    但需注意付比较的细节规则,比如 (1,2,3) 不可以和 (1,2,'3') 比较,因为他们的第三个元素的数据类型不同,整数无法与字符串直接比较。但 (1,2) 是可以和 (1,2,3) 比较的。关于 富比较 的更多细节,本微信公众号" 驹说码事 "以后再发文解释。 在彻底理解富比较之前,要存入优先级队列的元素先纯数字或二元组表示。一般情况下我们都应该这么做,让程序足够简明。

    5.2.3 LifoQueue

    LIFO(Last-In-First-Out)顾名思义它是 后进先出 队列。它所拥有的操作方法也跟 Queue 完全一样,仅仅是元素取出的顺序是越后存入队列的越先被取出。

    其底层实现依赖于内置数据类型 list put() 方法里直接调用 list.append(element) get() 方法则直接调用了 list.pop()

    5.2.4 异步编程中Queue的使用场景

    1. 任务调度 :在异步应用中,用于在生产者和消费者之间分发任务,特别是当任务生产和消费的速率不一致时。

    2. 流量控制 :当处理高并发请求时,asyncio.Queue可以作为缓冲区,帮助控制数据流,以避免过载。

    3. 数据共享 :在多个异步任务之间安全地共享数据。

    4. 异步工作流程 :用于实现复杂的异步工作流程,如管道(pipeline)模式,其中数据逐步通过一系列的处理阶段。

    5. 事件处理 :在事件驱动的应用中,用于管理和分发事件消息。

    5.3 总结

    本节内容对 asyncio 支持的同步操作的总结非常全面和详尽,介绍了 asyncio 中多种同步机制的作用和应用场景。从锁( Lock )的基本概念和用途,事件( Event )的信号通知机制,条件变量( Condition )的更复杂同步控制,到信号量( Semaphore BoundedSemaphore )的资源访问控制,以及队列( Queue , PriorityQueue , LifoQueue )在异步编程中的应用,这些内容详细描述了 asyncio 库中不同同步操作的特点和使用方法。每个部分都配以合适的示例和解释,有助于深入理解异步编程中同步机制的重要性和实际应用。

    6 回调函数的调度和执行

    即便 asyncio 提供了协程,但在有些情况下还是规避不了跟普通函数交互,即用不同函数写的回调函数。而且有些第三异步库的主要编程模式还是基于回调的,为了让这类框架要与 asyncio 兼容也促使 asyncio 提供了调度常规函数的方法。

    6.1 调度接口

    asyncio 调度回调函数有以下三种方式:

  • 方式1: EventLoop.call_soon(callback, *args)

  • 方式2: EventLoop.call_later(delay, callback, *args)

  • 方式3: EventLoop.call_at(when, callback, *args)

  • 方式1 call_soon 是立即调度 callback 函数到 EventLoop 中,将在下一次事件循环中被执行;方式2 call_later 是延迟 delay 秒后执行;方式3 call_at 是在 when 时刻执行。

    注意, asyncio 内部维护了一个单调递增时钟 EventLoop.time() ,其实就是 time.monotonic() , call_later delay 参数和 call_at when 参数都是以它为标准,而不是 time.time() 。最大的影响在于 call_at() when 参数 并不能 直接传递某个具体的时间戳。

    另外有个注意点,上述调度函数中的 *args 是传递给 callback 的参数,并不能接收关键字参数。如果要传递关键字参数时,则需使用 functools.partical() 方法封一层。

    三种方式的示例代码如下:

    上述代码中需特别留意之处是定时调用 call_at 函数,一定不能直接传递 datetime 对象的时间戳。其实 call_later 是基于 call_at 实现的,请读者自己思考下实现方案。

    6.2 周期性回调

    asyncio 本身并没有提供周期性执行回调函数的方法,不像 Tornado 提供了 PeriodicCallback 类。按 PEP 3156 的解释, asyncio 若是提供此方法是多此一举,如果是简单情况,那么不断地 call_later() 自身就好了,也可以在循环里 sleep() ,总之实现起来特别容易。如果是精度要求高的复杂情况,每种情境下边界要求又不一样,所以 asyncio 里提供通用方法也不合适。

    实际项目中往往要求很简单,精度也只需要控制在秒级,下面给出一段代码来实现类似 Tornaodo PeriodicCallback



    6.3 回调处理

    前面介绍了如何通过三种方式( call_soon() , call_later() , call_at() )去调度回调函数,下面我们介绍协程如何被用作回调函数,以及这些回调函数被调度后,又该如何控制?比如想取消怎么办?

    6.3.1 协程作回调函数

    asyncio 中,协程可以通过特定的方法被用作回调函数。这通常涉及到将协程封装成一个可以被事件循环调用的普通函数。这可以通过 asyncio.ensure_future loop.create_task 来实现。这些函数接受一个协程对象,并返回一个 Task 对象,这个 Task 对象可以被当作回调函数使用。

    示例代码如下:

    async def my_coroutine(): # 协程逻辑 ...loop = asyncio.get_event_loop()task = loop.create_task(my_coroutine())loop.call_soon(task)

    6.3.2 回调处理器

    在Python的 asyncio 模块中, Handle TimerHandle 是与事件循环调度回调相关的类:

    asyncio.Handle

    用途 :Handle是一个回调包装对象,由loop.call_soon()和loop.call_soon_threadsafe()返回。 方法

  • get_context(): 返回与句柄相关联的contextvars.Context对象。这是Python 3.12中的新功能。

  • cancel(): 取消回调。如果回调已经被取消或执行,此方法无效。

  • cancelled(): 如果回调已被取消,则返回True。这是Python 3.7中的新功能。

  • asyncio.TimerHandle

    用途 :TimerHandle是Handle的子类,由loop.call_later()和loop.call_at()返回,用于调度具有特定延迟或计划时间的回调。 方法

  • when(): 返回计划的回调时间(以浮点秒为单位)。时间是绝对时间戳,使用与loop.time()相同的时间参考。这是Python 3.7中的新功能。

  • 这俩用于管理和控制事件循环中的回调。例如,可能会使用call_later()来调度一些需要在将来某个时刻执行的操作,并使用返回的 TimerHandle 来管理这个操作(如取消或查询计划时间)。

    示例代码:

    import asyncioimport contextvars# 定义一个简单的回调函数def my_callback(name): print(f"Hello, {name}!")# 设置一个事件循环loop = asyncio.get_event_loop()# 调用 call_soon() 来计划回调的执行handle = loop.call_soon(my_callback, "Alice")# 检查回调是否被取消print(f"Cancelled: {handle.cancelled()}")# 取消回调handle.cancel()# 使用 call_later() 来延迟回调的执行timer_handle = loop.call_later(5, my_callback, "Bob")# 获取计划执行的时间print(f"Scheduled at: {timer_handle.when()}")# 取消计划的回调timer_handle.cancel()# 运行一次事件循环以使调度的回调有机会执行loop.run_until_complete(asyncio.sleep(0.1))loop.close()

    6.4 异步读文件

    在asyncio中,由于直接的文件IO操作通常不是非阻塞的,因此asyncio并没有提供直接的异步文件读取API。然而,我们可以通过使用线程池来实现异步读文件的效果。asyncio提供了loop.run_in_executor方法,该方法可以在指定的执行器(例如线程池)中运行给定的函数。

    代码示例如下:

    import asyncioasync def read_file_async(file_path): loop = asyncio.get_event_loop() with open(file_path, 'r') as file: return await loop.run_in_executor(None, file.read)# 使用协程读取文件async def main(): content = await read_file_async('path/to/your/file.txt') print(content)asyncio.run(main())

    上面只是个基本示例,仿佛看不出来这种异步读文件的用处是什么。首先,在广泛以异步编程为主导的应用中,提倡每一个操作都是一部的,避免阻塞其他协程,这对提高系统的整体效率和响应性非常重要,所以把文件操作也改为 非阻塞IO操作 。进一步,异步读取使得CPU可以在等待文件IO完成时执行其他任务,从而提高了 资源的利用率 。在处理大文件时,异步读取可以显著提高性能。在用户界面中,也可以即时响应,避免UI卡顿,提供更流畅的 用户体验

    7 asyncio和多线程和多进程搭配

    本节将学习为什么有了协程可以大幅提高程序性能的时候,还需要关心多线程和多进程?又如何结合使用?

    7.1 理解使用场景

    其一,对于I/O密集型或需要执行阻塞调用的任务,例如文件读写或者阻塞的网络通信,使用多线程可以避免阻塞整个事件循环,提高整体性能。

    其二,在处理CPU密集型任务时,例如大规模数学计算,使用多进程可以有效利用多核处理器,因为Python的全局解释器锁(GIL)限制了同一时间只有一个线程执行Python字节码。

    7.2 结合多线程使用

    在使用 asyncio 时,会遇到需要执行阻塞I/O操作或调用非异步兼容的库函数的情况。在这种情况下,将这些操作放在单独的线程中执行是一种有效的策略,以避免阻塞整个事件循环。 asyncio 提供了 loop.run_in_executor() 函数,可以线程池中执行阻塞调用。

    run_in_executor 的第一个参数为None则默认在线程池中调度,当需要自定义线程池时可以再手工创建线程池对象传入。

    示例代码:

    import asynciodef blocking_io(): # 执行阻塞I/O操作 print("start blocking I/O") time.sleep(1) print("end blocking I/O")async def main(): loop = asyncio.get_running_loop() # 在默认的线程池中运行阻塞I/O函数 await loop.run_in_executor(None, blocking_io) print("main continues")asyncio.run(main())

    结合多线程使用的注意事项

    需要确保线程安全尤其重要。多个线程可能会同时访问和修改共享资源,如全局变量,从而引发竞态条件。竞态条件可能导致数据不一致或程序行为异常。

    可以采取的措施如下:

    1. 使用锁 :利用 threading.Lock asyncio.Lock (取决于是在普通线程还是在异步协程内)来确保在任何时刻只有一个线程可以访问共享资源。在读操作远多于写操作的场景中,使用读写锁(如threading.RLock)可以提高效率,因为它允许多个读操作同时进行,只在写操作时才需要独占访问。

    2. 避免共享状态 :尽可能设计无状态的函数或使用局部变量(如threading.local),减少共享状态的使用,从而减少线程间的依赖和交互。

    3. 线程安全的数据结构 :使用线程安全的队列(如queue.Queue)或其他数据结构(collections.deque)来管理线程间的数据交换。

    7.3 结合多进程使用

    当您有CPU密集型任务时,如大量数学计算或数据处理,最好使用多进程来利用多核CPU。asyncio可以通过 concurrent.futures.ProcessPoolExecutor 与多进程结合使用,将计算密集型任务分配到单独的进程中执行。

    示例代码:

    import asyncioimport concurrent.futuresimport mathdef compute_some_thing(numbers): # 一些CPU密集型的计算 return [math.sqrt(number) for number in numbers]async def main(): numbers = range(10) loop = asyncio.get_running_loop() # 创建一个进程池执行器 with concurrent.futures.ProcessPoolExecutor() as pool: # 在进程池中运行计算密集型任务 result = await loop.run_in_executor(pool, compute_some_thing, numbers) print("Custom computation results:", result)asyncio.run(main())

    结合多进程使用的注意事项

    在结合多进程使用时,线程安全问题的影响会减少,因为每个进程都有自己独立的内存空间,进程间一般不会直接共享相同内存资源。因此,进程间通常不会直接引起竞态条件或数据不一致问题。但引入了新的挑战:

    1. 进程间通信 :使用适当的机制(如队列、管道)进行进程间通信。 multiprocessing 模块,其中的 Queue Pipe 等可以用于进程间的数据传递。

    2. 资源共享 :如果必须共享资源(如文件、数据库),确保每个进程都正确地管理其对这些资源的访问,比如该同步的需要使用锁。

    3. 同步和状态管理 :在多个进程需要协调工作时,例如在使用共享数据库或文件系统时,需要考虑合适的同步机制,比如使用文件锁或数据库事务来确保操作的原子性和一致性。

    4. 数据序列化 :在进程间传递数据时,需要进行序列化和反序列化,这可能会引入性能瓶颈,尤其是大量数据时。因此,需要优化序列化过程或减少需要传输的数据量,比如使用比JSON、XML更紧凑许多的protobuff协议。

    8 常用的异步编程模式和最佳实践

    8.1 常用编程模式

  • 生产者-消费者模式 在异步编程中,此模式非常常见,尤其适用于数据的生产和消费速率不一致的场景。它可以有效管理任务队列,提高数据处理效率。

  • 发布-订阅模式 这是事件驱动编程中的经典模式,适用于多个消费者对事件源的响应。在异步编程中,此模式有助于解耦事件的发布者和订阅者。

  • 异步回调模式 前面第6节提到过的内容,尽管Python的async/await语法减少了回调的需求,但在与一些旧的异步库或API交互时,异步回调仍然很重要。

  • 在这里有些同学可能会问:「生产者-消费者模式」和「发布-订阅模式」有什么区别?好像都会用到MQ?

    确实在现代的服务端技术中,这两种模式因为高级消息队列的功能丰富已经没有了明显的界限。但在语义明确的特定的上下文中,我们却不会混用这两个术语。生产者-消费者模式更侧重于数据流的处理,数据驱动,队列用于存储待处理的数据项,消费者必须处理自己接受到的数据;发布-订阅模式侧重于消息或事件的分发,事件/消息驱动,队列用作消息代理或事件通道,订阅者只关心自己感兴趣的主题。

    8.2 最佳实践和设计建议

    1. 模块化设计 :将不同的功能分割成独立的模块或协程,确保代码清晰和可维护。

    2. 每个协程很小 :尽量满足单一职责原则,让每个协程(或被封装的异步任务)必须尽快返回。

    3. 避免阻塞调用 :确保不在异步代码中调用阻塞操作,以避免阻塞事件循环。

    4. I/O密集型上线程池 :注意确保线程安全。详本文见第7节。

    5. CPU密集型上进程池 :注意进程间通信、资源共享和性能。详本文见第7节。

    6. 利用async/await :尽可能使用async和await关键字,以增强代码的可读性和简洁性,不再推荐其他协程方案。

    7. 异步上下文管理 :使用异步上下文管理器来处理资源的初始化和清理(with语句)。

    8. 正确管理任务 :合理使用 Task 来管理协程,并注意任务的取消和异常处理。

    9. 避免死锁 :注意异步同步原语(如锁和事件)的使用,以防止死锁。

    10. 异常和超时处理正确 :确保异常被捕获并处理,避免未处理的异常导致程序崩溃,确保程序在合适的时间已执行完毕和正确退出。

    9 Python3.6到3.12中asyncio的变化

    由于本文的绝大部分内容写成于4年多以前(这也许打破了技术圈放鸽子的世界记录),为了打消各位看官们心中的疑虑(文中的内容是否过时了?)特此整理了3.6到3.12期间asyncio的变化。我们的内容时不过时的,只是性能有提升、操作接口更简单、安全性更好了。在开发中最大的变化就是某些函数的调用,要从 loop.function() 改为 asyncio.function ,诸如此类。

    1. Python 3.6:

  • asyncio模块被正式纳入稳定API,不再是临时状态。它接收了新特性、显著的可用性和性能改进,以及大量的错误修复。

  • Python 3.7:

  • asyncio模块继续获得新特性和显著的可用性和性能改进。

  • asyncio和decimal模块开始支持并使用上下文变量(新的contextvars模块引入了对上下文变量的支持)。特别是,活动的小数上下文现在存储在一个上下文变量中,这允许异步代码中的小数操作使用正确的上下文。

  • Python 3.8

  • asyncio.run()从临时API变为稳定API,用于执行协程并在自动管理事件循环时返回结果。

  • 在Windows上,默认事件循环变为 ProactorEventLoop。

  • ProactorEventLoop 现在支持UDP和可以被 KeyboardInterrupt 打断。

  • 添加了 asyncio.Task.get_coro() 和任务命名功能。

  • 引入了支持 Happy Eyeballs 算法的 asyncio.loop.create_connection()。

  • Python 3.9

  • 移除了 reuse_address 参数的支持,因为在UDP中有重大安全隐患。

  • 新增了 shutdown_default_executor() 协程,用于关闭默认执行者。

  • 新增了 PidfdChildWatcher,一个特定于Linux的子进程监视器。

  • 引入了 asyncio.to_thread() 协程,用于在单独线程中运行IO绑定函数。

  • 更新了 asyncio.wait_for() 的取消行为。

  • 对 ssl.SSLSocket 的不兼容方法调用现在会引发 TypeError。

  • Python 3.10

  • 添加了缺失的 connect_accepted_socket() 方法。

  • 标记了asynchat, asyncore, smtpd这些模块为已弃用,并在这些模块中添加了导入时的DeprecationWarning。

  • Python 3.11

  • 新增了TaskGroup类,这是一个异步上下文管理器,它可以在退出时等待一组任务全部完成。对于新代码,推荐使用它而不是直接使用create_task()和gather()。

  • 新增了timeout(),这是一个用于设置异步操作超时的异步上下文管理器。对于新代码,推荐使用它而不是直接使用wait_for()。

  • 新增了Runner类,它公开了run()使用的机制。

  • 在asyncio库的同步原语中新增了Barrier类和相关的BrokenBarrierError异常。

  • 在asyncio.loop.create_connection()中新增了all_errors关键字参数,允许将多个连接错误作为ExceptionGroup抛出。

  • 为现有基于流的连接升级到TLS新增了asyncio.StreamWriter.start_tls()方法。

  • 向事件循环添加了原始数据报套接字函数:sock_sendto()、sock_recvfrom()和sock_recvfrom_into(),这些在SelectorEventLoop和ProactorEventLoop中都有实现。

  • 为Task添加了cancelling()和uncancel()方法,主要用于内部使用,特别是由TaskGroup使用。

  • Python 3.12

  • 提高了写入套接字的性能。asyncio现在避免了向套接字写入时不必要的复制,并在平台支持的情况下使用sendmsg()。

  • 添加了asyncio.eager_task_factory()和asyncio.create_eager_task_factory()函数,允许事件循环选择积极任务执行,使某些用例的速度提高2到5倍。

  • 在Linux上,如果os.pidfd_open()可用且功能正常,asyncio默认使用asyncio.PidfdChildWatcher而不是asyncio.ThreadedChildWatcher。

  • 事件循环现在为每个平台使用最佳可用的子进程观察器(如果支持则为asyncio.PidfdChildWatcher,否则为asyncio.ThreadedChildWatcher),因此不建议手动配置子进程观察器。

  • 在asyncio.run()中添加了loop_factory参数,允许指定自定义事件循环工厂。

  • 添加了asyncio.current_task()的C实现,速度提高了4到6倍。

  • asyncio.iscoroutine()现在对生成器返回False,因为asyncio不支持基于生成器的遗留协程。

  • asyncio.wait()和asyncio.as_completed()现在接受生成任务的生成器。

  • 从Python 3.6到3.12,asyncio的主要发展方向是增强其稳定性、可用性和性能,同时不断引入新特性。重点包括改善事件循环机制、增强异步任务管理、支持更灵活的上下文变量处理,以及提升网络编程性能。这些改进使asyncio更加高效和易用。

    10 总结

    本文精华内容主要包括Python asyncio 库的基础概念、使用方法和最佳实践。 asyncio 是Python用于编写异步并发代码的库,使用 async await 语法。主要学习点包括:

  • 原生协程语法 :使用 async def 定义原生协程函数, await 用于等待协程执行完成。

  • 协程的调用 :通过事件循环和任务( Task )调用和管理协程。

  • Future和Task Future 作为异步操作的结果占位符, Task 管理和调度协程。

  • 协程的调度与执行 :单个协程的调度、同时执行多个协程,以及相关错误处理。

  • 同步操作 :使用锁( Lock )、事件( Event )、条件变量( Condition )、信号量( Semaphore )等同步机制。

  • 队列 :使用异步队列( Queue , PriorityQueue , LifoQueue )在协程之间传递数据。

  • 回调函数的调度和执行 :通过事件循环调度回调函数,处理回调函数。

  • asyncio与多线程/多进程结合 :结合多线程处理阻塞I/O操作,多进程处理CPU密集型任务。

  • 常用异步编程模式和最佳实践 :包括生产者-消费者、发布-订阅等模式,以及异步编程的最佳实践。

  • Python3.6至3.12中asyncio的变化 asyncio 库的更新和改进。

  • 本文帮助读者理解和掌握Python异步编程的关键概念、技术和最佳实践,特别是对于需要处理I/O密集型或并发任务的应用程序开发者。

    *END*

    这里是 驹说码事,分享程序猿的码路历程

    感谢您的关注