Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用

目录
  • 前记
  • 1.Asyncio的入口
  • 2.两种Coroutine调用方法的区别
  • 3.Task与Future
  • 3.1.Future
  • 3.2.Task
  • 4.总结

前记

上一遍文章《Python中Async语法协程的实现》介绍了Python是如何以生成器来实现协程的以及Python Asyncio通过Future和Task的封装来实现协程的调度,而在Python Asyncio之中Coroutines, Tasks和Future都属于可等待对象,在使用的Asyncio的过程中,经常涉及到三者的转换和调度,开发者容易在概念和作用上犯迷糊,本文主要阐述的是三者之间的关系以及他们的作用。

1.Asyncio的入口

协程是线程中的一种特例,协程的入口和切换都是靠事件循环来调度的,在新版的Python中协程的入口是Asyncio.run,当程序运行到Asyncio.run后,可以简单的理解为程序由线程模式切换为协程模式(只是方便理解,对于计算机而言,并没有这样区分),

以下是一个最小的协程例子代码:

import asyncio
async def main():
    await asyncio.sleep(0)
asyncio.run(main())

在这段代码中,main函数和asyncio.sleep都属于Coroutine,main是通过asyncio.run进行调用的,接下来程序也进入一个协程模式,asyncio.run的核心调用是Runner.run,它的代码如下:

class Runner:
    ...
    def run(self, coro, *, context=None):
        """Run a coroutine inside the embedded event loop."""
        # 省略代码
        ...

        # 把coroutine转为task
        task = self._loop.create_task(coro, context=context)

        # 省略代码
        ...

        try:
            # 如果传入的是Future或者coroutine,也会专为task
            return self._loop.run_until_complete(task)
        except exceptions.CancelledError:

        # 省略代码
        ...

这段代码中删去了部分其它功能和初始化的代码,可以看到这段函数的主要功能是通过loop.create_task方法把一个Coroutine对象转为一个Task对象,然后通过loop.run_until_complete等待这个Task运行结束。

可以看到,Asycnio并不会直接去调度Coroutine,而是把它转为Task再进行调度,这是因为在Asyncio中事件循环的最小调度对象就是Task。不过在Asyncio中并不是所有的Coroutine的调用都会先被转为Task对象再等待,比如示例代码中的asyncio.sleep,由于它是在main函数中直接await的,所以它不会被进行转换,而是直接等待,通过调用工具分析展示的图如下:

在这个图示中,从main函数到asyncio.sleep函数中没有明显的loop.create_task等把Coroutine转为Task调用,这里之所以不用进行转换的原因不是做了一些特殊优化,而是本因如此, 这个await asyncio.sleep函数实际上还是会被main这个Coroutine转换成的Task继续调度到。

2.两种Coroutine调用方法的区别

在了解Task的调度原理之前,还是先回到最初的调用示例,看看直接用Task调用和直接用Coroutine调用的区别是什么。

如下代码,我们显示的执行一个Coroutine转为Task的操作再等待,那么代码会变成下面这样:

import asyncio
async def main():
    await asyncio.create_task(asyncio.sleep(0))
asyncio.run(main())

这样的代码看起来跟最初的调用示例很像,没啥区别,但是如果进行一些改变,比如增加一些休眠时间和Coroutine的调用,就能看出Task对象的作用了,现在编写两份文件,

他们的代码如下:

# demo_coro.py
import asyncio
import time
async def main():
    await asyncio.sleep(1)
    await asyncio.sleep(2)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 3.0028765201568604
# demo_task.py
import asyncio
import time
async def main():
    task_1 = asyncio.create_task(asyncio.sleep(1))
    task_2 = asyncio.create_task(asyncio.sleep(2))
    await task_1
    await task_2

s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 2.0027475357055664

其中demo_coro.py进行了两次await调用,程序的运行总时长为3秒,而demo_task.py则是先把两个Coroutine对象转为Task对象,然后再进行两次await调用,程序的运行总时长为2秒。可以发现,demo_task.py的运行时长近似于其中运行最久的Task对象时长,而demo_coro.py的运行时长则是近似于两个Coroutine对象的总运行时长。

之所以会是这样的结果,是因为直接awaitCoroutine对象时,这段程序会一直等待,直到Coroutine对象执行完毕再继续往下走,而Task对象的不同之处就是在创建的那一刻,就已经把自己注册到事件循环之中等待被安排运行了,然后返回一个task对象供开发者等待,由于asyncio.sleep是一个纯IO类型的调用,所以在这个程序中,两个asyncio.sleepCoroutine被转为Task从而实现了并发调用。

3.Task与Future

上述的代码之所以通过Task能实现并发调用,是因为Task中出现了一些与事件循环交互的函数,正是这些函数架起了Coroutine并发调用的可能, 不过Task是Future的一个子对象,所以在了解Task之前,需要先了解Future。

3.1.Future

与Coroutine只有让步和接收结果不同的是Future除了让步和接收结果功能外,它还是一个只会被动进行事件调用且带有状态的容器,它在初始化时就是Pending状态,这时可以被取消,被设置结果和设置异常。而在被设定对应的操作后,Future会被转化到一个不可逆的对应状态,并通过loop.call_sonn来调用所有注册到本身上的回调函数,同时它带有__iter____await__方法使其可以被awaityield from调用,它的主要代码如下:

class Future:
    ...
    def set_result(self, result):
        """设置结果,并安排下一个调用"""
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()
    def set_exception(self, exception):
        """设置异常,并安排下一个调用"""
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self.__schedule_callbacks()
        self.__log_traceback = True
    def __await__(self):
        """设置为blocking,并接受await或者yield from调用"""
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.

单看这段代码是很难理解为什么下面这个future被调用set_result后就能继续往下走:

async def demo(future: asyncio.Future):
    await future
    print("aha")

这是因为Future跟Coroutine一样,没有主动调度的能力,只能通过Task和事件循环联手被调度。

3.2.Task

Task是Future的子类,除了继承了Future的所有方法,它还多了两个重要的方法__step__wakeup,通过这两个方法赋予了Task调度能力,这是Coroutine和Future没有的,Task的涉及到调度的主要代码如下(说明见注释):

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.
    _log_destroy_pending = True
    def __init__(self, coro, *, loop=None, name=None, context=None):
        super().__init__(loop=loop)
        # 省略部分初始化代码
        ...

        # 托管的coroutine
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context
        # 通过loop.call_sonn,在Task初始化后马上就通知事件循环在下次有空的时候执行自己的__step函数
        self._loop.call_soon(self.__step, context=self._context)
    def __step(self, exc=None):
        coro = self._coro
        # 方便asyncio自省
        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # 通过send预激托管的coroutine
                # 这时候只会得到coroutine yield回来的数据或者收到一个StopIteration的异常
                # 对于Future或者Task返回的是Self
                result = coro.send(None)
            else:
                # 发送异常给coroutine
                result = coro.throw(exc)
        except StopIteration as exc:
            # StopIteration代表Coroutine运行完毕
            if self._must_cancel:
                # coroutine在停止之前被执行了取消操作,则需要显示的执行取消操作
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                # 把运行完毕的值发送到结果值中
                super().set_result(exc.value)
        # 省略其它异常封装
        ...
        else:
            # 如果没有异常抛出
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # 通过Future代码可以判断,如果带有_asyncio_future_blocking属性,则代表当前result是Future或者是Task
                # 意味着这个Task里面裹着另外一个的Future或者Task
                # 省略Future判断
                ...

                if blocking:
                    # 代表这这个Future或者Task处于卡住的状态,
                    # 此时的Task放弃了自己对事件循环的控制权,等待这个卡住的Future或者Task执行完成时唤醒一下自己
                    result._asyncio_future_blocking = False
                    result.add_done_callback(self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel(msg=self._cancel_message):
                            self._must_cancel = False
                else:
                    # 不能被await两次
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # 放弃了对事件循环的控制权,代表自己托管的coroutine可能有个coroutine在运行,接下来会把控制权交给他和事件循环
                # 当前的coroutine里面即使没有Future或者Task,但是子Future可能有
                self._loop.call_soon(self.__step, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # 其它Task和Future完成后会调用到该函数,接下来进行一些处理
        try:
            # 回收Future的状态,如果Future发生了异常,则把异常传回给自己
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Task并不需要自己托管的Future的结果值,而且如下注释,这样能使调度变得更快
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.

这份源码的Task对象中的__setp方法比较长,经过精简后可以发现他主要做的工作有三个:

  • 1.通过send或者throw来驱动Coroutine进行下一步
  • 2.通过给被自己托管的Future或者Task添加回调来获得完成的通知并重新获取控制权
  • 3.通过loop.call_soon来让步,把控制权交给事件循环

单通过源码分析可能很难明白, 以下是以两种Coroutine的代码为例子,简单的阐述Task与事件循环调度的过程,首先是demo_coro,这个例子中只有一个Task:

# demo_coro.py
import asyncio
import time
async def main():
    await asyncio.sleep(1)
    await asyncio.sleep(2)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 3.0028765201568604

这个例子中第一步是把main转为一个Task,然后调用到了对应的__step方法,这时候__step方法会会调用main()这个Coroutine的send(None)方法。
之后整个程序的逻辑会直接转到main函数中的await asyncio.sleep(1)这个Coroutine中,await asyncio.sleep(1)会先生成一个Future对象,并通过loop.call_at告诉事件循环在1秒后激活这个Future对象,然后把对象返回。这时候逻辑会重新回到Task的__step方法中,__step发现send调用得到的是一个Future对象,所以就在这个Future添加一个回调,让Future完成的时候来激活自己,然后放弃了对事件循环的控制权。接着就是事件循环在一秒后激活了这个Future对象,这时程序逻辑就会执行到Future的回调,也就是Task的__wakeup方法,于是Task的__step又被调用到了,而这次遇到的是后面的await asyncio.sleep(2),于是又走了一遍上面的流程。当两个asyncio.sleep都执行完成后,Task的__step方法里在对Coroutine发送一个send(None)后就捕获到了StopIteration异常,这时候Task就会通过set_result设置结果,并结束自己的调度流程。

可以看到demo_core.py中只有一个Task在负责和事件循环一起调度,事件循环的开始一定是一个Task,并通过Task来调起一个Coroutine,通过__step方法把后续的Future,Task,Coroutine都当成一条链来运行,而demo_task.py则不一样了,它有两个Task,代码如下:

# demo_task.py
import asyncio
import time
async def main():
    task_1 = asyncio.create_task(asyncio.sleep(1))
    task_2 = asyncio.create_task(asyncio.sleep(2))
    await task_1
    await task_2
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 2.0027475357055664

这个例子中第一步还是跟demo_coro一样,但跳转到main函数后就开始有区别了,首先在这函数中创建了task1和task2两个Task,他们分别都会通过__step方法中的send激活对应的asyncio.sleepCoroutine,然后等待对应的Future来通知自己已经完成了。而对于创建了这两个Task的main Task来说,通过main函数的awati task_1await task_2来获取到他们的“控制权“。首先是通过await task_1语句,main Task中的__step方法里在调用send后得到的是task_1对应的Future,这时候就可以为这个Future添加一个回调,让他完成时通知自己,自己再走下一步,对于task_2也是如此。 直到最后两个task都执行完成,main Task也捕获到了StopIteration异常,通过set_result设置结果,并结束自己的调度流程。

可以看到demo_task.pydemo_coro.py有个明显的区别在于main Task在运行的生命周期中创建了两个Task,并通过await托管了两个Task,同时两个Task又能实现两个协程的并发,所以可以发现事件循环运行期间,当前协程的并发数永远小于事件循环中注册的Task数量。此外,如果在main Task中如果没有显式的进行await,那么子Task就会逃逸,不受main Task管理,如下:

# demo_task.py
import asyncio
import time
def mutli_task():
    task_1 = asyncio.create_task(asyncio.sleep(1))
    task_2 = asyncio.create_task(asyncio.sleep(2))
async def main():
    mutli_task()
    await asyncio.sleep(1.5)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 1.5027475357055664 

在这段代码中,main Task在执行到mutli_task时,会创建出两个task,但是在__step中的coro.send(None)调用得到的结果却是await asyncio.sleep(1.5)返回的Future,所以main Task只能调用到这个Future的add_don_callback来装载自己的__wakeup方法,最终导致到main Task只能托管到await asyncio.sleep(1.5)的Future,而mutli_task创建的task则逃逸了,成为另一条链的顶点Task。

不过这个程序的事件循环只管理到了main Task所以事件循环会一直运行,直到main Task运行结束的时候才退出,这时程序会跟着一起退出,所以程序的运行时间只有1.5秒左右。
此外由于另外的Task也是注册到这个事件循环上面,所以事件循环会帮忙把task_1执行完毕,而task_2定义的休眠时间是2秒,程序退出之前事件循环会发现有个Task尚未执行完毕,于是会对这个Task进行清理并打印一条警报。

4.总结

在深入了Task,Future的源码了解后,了解了Task和Future在Asyncio的作用,同时也发现Task和Future都跟loop有一定的耦合,而loop也可以通过一定的方法来创建Task和Future,所以如果要真正的理解到Asyncio的调度原理,还需要更进入一步,通过Asyncio的源码来了解整个Asyncio的设计。

到此这篇关于Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用的文章就介绍到这了,更多相关Python Asyncio 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python协程与 asyncio 库详情

    目录 1.asyncio 异步 I/O 库 异步函数的定义 事件循环 event_loop 创建 task 回调返回值 循环事件关闭 2.本节爬虫项目 前言: python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async 和 await 关键字,用于定义原生协程. 1.asyncio 异步 I/O 库 python 中的 asyncio 库提供了管理事件.协程.任务和线程的

  • Python asyncio的一个坑

    我们先从一个常见的Python编程错误开始说起,我已经见过非常多的程序员犯过这种错误了: def do_not_raise(user_defined_logic): try: user_defined_logic() except: logger.warning("User defined logic raises an exception", exc_info=True) # ignore 这段代码的错误之处在哪里呢? 我们从Python的异常结构开始说起.Python中的异常基类有

  • Python使用signal定时结束AsyncIOScheduler任务的问题

    在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了) 原调试代码如下: from datetime import datetime, timedelta import aiohttp from apscheduler.schedulers.asyncio im

  • 浅谈Python协程asyncio

    一.协程 官方描述; 协程是子例程的更一般形式. 子例程可以在某一点进入并在另一点退出. 协程则可以在许多不同的点上进入.退出和恢复. 它们可通过 async def 语句来实现. 参见 PEP 492. 协程不是计算机内部提供的,不像进程.线程,由电脑本身提供,它是由程序员人为创造的, 实现函数异步执行. 协程(Coroutine),也可以被称为微线程,是一种用户太内的上下文切换技术,其实就是通过一个线程实现代码块相互切换执行.看上去像子程序,但执行过程中,在子程序内部可中断,然后转而执行别的

  • Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用

    目录 前记 1.Asyncio的入口 2.两种Coroutine调用方法的区别 3.Task与Future 3.1.Future 3.2.Task 4.总结 前记 上一遍文章<Python中Async语法协程的实现>介绍了Python是如何以生成器来实现协程的以及Python Asyncio通过Future和Task的封装来实现协程的调度,而在Python Asyncio之中Coroutines, Tasks和Future都属于可等待对象,在使用的Asyncio的过程中,经常涉及到三者的转换和

  • Python Asyncio调度原理详情

    目录 前言 1.基本介绍 2.EventLoop的调度实现 3.网络IO事件的处理 前言 在文章<Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用>中介绍了Python的可等待对象作用,特别是Task对象在启动的时候可以自我驱动,但是一个Task对象只能驱动一条执行链,如果要多条链执行(并发),还是需要EventLoop来安排驱动,接下来将通过Python.Asyncio库的源码来了解EventLoop是如何运作的. 1.基本介绍 Python

  • Python Asyncio库之asyncio.task常用函数详解

    目录 前记 0.基础 1.休眠--asyncio.sleep 2.屏蔽取消--asyncio.shield 3.超时--asyncio.wait_for 4.简单的等待--wait 5.迭代可等待对象的完成--asyncio.as_completed 前记 Asyncio在经过一段时间的发展以及获取Curio等第三方库的经验来提供更多的功能,目前高级功能也基本完善,但是相对于其他语言,Python的Asyncio高级功能还是不够的,但好在Asyncio的低级API也比较完善,开发者可以通过参考A

  • Python 异步之在 Asyncio中如何运行阻塞任务详解

    目录 正文 1. 阻塞任务 2. 如何运行阻塞任务 3. 实例 正文 阻塞任务是阻止当前线程继续进行的任务. 如果在 asyncio 程序中执行阻塞任务,它会停止整个事件循环,从而阻止任何其他协程继续进行. 我们可以通过 asyncio.to_thread() 和 loop.run_in_executor() 函数在 asyncio 程序中异步运行阻塞调用. 1. 阻塞任务 asyncio的重点是异步编程和非阻塞IO.然而,我们经常需要在 asyncio 应用程序中执行阻塞函数调用. 这可能有很

  • Python  Asyncio模块实现的生产消费者模型的方法

    asyncio的关键字说明 event_loop事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 coroutine协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用. task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 future:代表将来执行或没有执行的任务结果.它和task上没有本质上的区

  • 在Python 3中缓存Exception对象会造成什么后果?

    目录 ​​Python 3有一个不太容易被注意到的改进:异常对象现在有了一个新的属性__traceback__.这个属性自动保存了traceback列表,当每次这个异常被重新raise出来的时候,会自动在__traceback__中追加一条记录.这个功能对于异步编程来说非常有帮助:在另一个线程或者协程中抛出的异常,被捕获.传输到其他地方,再重新抛出来的时候,不仅最初的traceback得以保留,每次被重新抛出的记录也都会保留下来,这样异常的traceback就可以提供很细致的信息. 说完了好处,

  • Python asyncio常用函数使用详解

    目录 协程的定义 协程的运行 多个协程运行 关于loop.close() 回调 事件循环 协程的定义 需要使用 async def 语句 协程可以做哪些事: 1.等待一个future结果 2.等待另一个协程(产生一个结果或引发一个异常) 3.产生一个结果给正在等它的协程 4.引发一个异常给正在等它的协程 协程的运行 调用协程函数,协程不会开始运行,只是返回一个协程对象 要让协程对象运行有两种方式: 1.在另一个已经运行的协程中用await等待它 2.通过ensure_future函数计划它的执行

  • Python Asyncio 库之同步原语常用函数详解

    目录 前记 0.基础 1.Lock 2.Event 4.Condition 5.Semaphore 前记 Asyncio的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现. 但是由于协程的特性,在大部分业务代码中并不需要去考虑资源竞争的出现,导致Asyncio同步原语被使用的频率比较低,但是如果想基于Asyncio编写框架则需要学习同步原语的使用. 0.基础 同步原语都是适用于某些条件下对某个资源的争夺,在代码中大部分的资源都是属于一个代码块,而Python对于代码块的管理

  • python asyncio 协程库的使用

    asyncio 是 python 力推多年的携程库,与其 线程库 相得益彰,更轻量,并且协程可以访问同一进程中的变量,不需要进程间通信来传递数据,所以使用起来非常顺手. asyncio 官方文档写的非常简练和有效,半小时内可以学习和测试完,下面为我的一段 HelloWrold,感觉可以更快速的帮你认识 协程 . 定义协程 import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay)

随机推荐