详细解读tornado协程(coroutine)原理

tornado中的协程是如何工作的

协程定义

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 —— [ 维基百科 ]

我们在平常编程中,更习惯使用的是子例程(subroutine),通俗的叫法是函数,或者过程。子例程,往往只有一个入口(函数调用,实参通过传给形参开始执行),一个出口(函数return,执行完毕,或者引发异常,将控制权转移给调用者)。但协程是子例程基础上,一种更加宽泛定义的计算机程序模块(子例程可以看做协程的特例),它可以有多个入口点,允许从一个入口点,执行到下一个入口点之前暂停,保存执行状态,等到合适的时机恢复执行状态,从下一个入口点重新开始执行,这也是协程应该具有的能力。

定义

协程代码块

一个入口点和下一个入口点(或者退出点)中的代码。

协程模块

由n个入口点代码,和n个协程代码块组成。第一个入口点通常是一个函 数入口点。其组织形式如:函数入口点->协程代码块->入口点->协程代码块…,入口点和代码块相间。

线性模块

一个同步函数的函数体是线性执行的。也就是说一个模块中的每一行代码,相继执行,一个模块在执行中,如果还没有执行完毕,不会去执行其他模块的代码。称这样的代码模块为线性模块。

一个协程模块,如果只含有单一入口点和单一协程代码块(假设这个协程代码块全是同步代码),当然这个协程模块是一个线性执行模块,但是如果含有多个入口点和多个协程代码块,那么就不是一个线性模块。那么执行一个协程模块过程实际是分散的(不同的时间段,执行不同的协程代码块,协程代码块的执行时间段,彼此不相交),但也是顺序的(后一个协程代码块在前一个协程代码块执行结束后才执行)。两个属于同一协程模块的相继协程代码块执行的中间时间间隙,可能有很多其他协程模块的协程代码片段在执行。

生成器和yield语义

谈到协程,必须要说说python语义中的生成器(generator)。

在pep255中提到了”simple generator”和”yield语句”(此时还不是”yield表达式”)的实现。一个basic idea,提供一种函数,能够返回中间结果给调用者,然后维护函数的局部状态,以便函数当离开后,也能恢复执行。

prep255中举了一个简单的例子,生成斐波那契数列:

 def fib():
   a, b = 0, 1
   while 1:
    yield b
    a, b = b, a+b

a,b初始化为0,1。当yield b被执行,1被返回给调用者。当fib恢复执行,a,变成了1,b也是1,然后将1返回给调用者,如此循环。generator是一种非常自然的编程方式,因为对于fib来说,它的功能不变,都还是不断生成下一个斐波那契数。而对于fib的调用者来说,fib像一个列表的迭代器,不断迭代,可以获取下一个斐波那契数。

def caller():
  for num in fib():
    print num

生成器是一个含有yield表达式的函数,此时该函数叫生成器。一个生成器永远是异步的,即使生成器模块中含有阻塞代码。因为调用一个生成器,生成器的参数会绑定到生成器,结果返回是一个生成器对象,它的类型是types.GeneratorType,不会去执行生成器主模块中的代码。

每次调用一个GeneratorType对象的next方法,生成器函数执行到下一个yield语句或者,或者碰到一个return语句,或者执行到生成器函数结束。

在pep342中,对Generator进一步加强,增加了GeneratorType的send方法,和yield表达式语义。yield表达式,可以作为等号右边的表达式。如果对Generator调用send(None)方法,生成器函数会从开始一直执行到yield表达式。那么下一次对Generator调用send(argument),Generator恢复执行。那么可以在生成器函数体内获得这个argument,这个argument将会作为yield表达式的返回值。

从上面可以看到,Generator已经具备协程的一些能力。如:能够暂停执行,保存状态;能够恢复执行;能够异步执行。

但是此时Generator还不是一个协程。一个真正的协程能够控制代码什么时候继续执行。而一个Generator执行遇到一个yield表达式 或者语句,会将执行控制权转移给调用者。

However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 —— [ 维基百科 ]

在维基百科中提到,可以实现一个顶级的调度子例程,将执行控制权转移回Generator,从而让它继续执行。在tornado中,ioLoop就是这样的顶级调度子例程,每个协程模块通过,函数装饰器coroutine和ioLoop进行通信,从而ioLoop可以在协程模块执行暂停后,在合适的时机重新调度协程模块执行。

不过,接下来还不能介绍coroutine和ioLoop,在介绍这两者之前,先得明白tornado中在协程环境中一个非常重要的类Future.

Future类

Future类位于tornado源码的concurrent模块中。Future类的完整代码,请查看tornado的源码。在这里截取一部分代码作为分析之用

class Future(object):
  def done(self):
    return self._done

  def result(self, timeout=None):
    self._clear_tb_log()
    if self._result is not None:
      return self._result
    if self._exc_info is not None:
      raise_exc_info(self._exc_info)
    self._check_done()
    return self._result

  def add_done_callback(self, fn):
    if self._done:
      fn(self)
    else:
      self._callbacks.append(fn)

  def set_result(self, result):
    self._result = result
    self._set_done()

  def _set_done(self):
    self._done = True
    for cb in self._callbacks:
      try:
        cb(self)
      except Exception:
        app_log.exception('exception calling callback %r for %r',
                 cb, self)
    self._callbacks = None

Future类重要成员函数:

def done(self):

Future的_result成员是否被设置

def result(self, timeout=None):

获取Future对象的结果

def add_done_callback(self, fn):

添加一个回调函数fn给Future对象。如果这个Future对象已经done,则直接执行fn,否则将fn加入到Future类的一个成员列表中保存。

def _set_done(self):

一个内部函数,主要是遍历列表,逐个调用列表中的callback函数,也就是前面add_done_calback加如来的。

def set_result(self, result):

给Future对象设置result,并且调用_set_done。也就是说,当Future对象获得result后,所有add_done_callback加入的回调函数就会执行。

Future封装了异步操作的结果。实际是它类似于在网页html前端中,图片异步加载的占位符,但加载后最终也是一个完整的图片。Future也是同样用处,tornado使用它,最终希望它被set_result,并且调用一些回调函数。Future对象实际是coroutine函数装饰器和IOLoop的沟通使者,有着非常重要的作用。

IOLoop类

tornado框架的底层核心类,位于tornado的ioloop模块。功能方面类似win32窗口的消息循环。每个窗口可以绑定一个窗口过程。窗口过程主要是一个消息循环在执行。消息循环主要任务是利用PeekMessage系统调用,从消息队列中取出各种类型的消息,判断消息的类型,然后交给特定的消息handler进行执行。

tornado中的IOLoop与此相比具有很大的相似性,在协程运行环境中担任着协程调度器的角色, 和win32的消息循环本质上都是一种事件循环,等待事件,然后运行对应的事件处理器(handler)。不过IOLoop主要调度处理的是IO事件(如读,写,错误)。除此之外,还能调度callback和timeout事件。

在本博文中,我们暂时只关注callback事件,因为这个与协程调度的相关性最大。

def add_future(self, future, callback):
  assert is_future(future)
  callback = stack_context.wrap(callback)
  future.add_done_callback(
    lambda future: self.add_callback(callback, future))

add_future函数在基类IOLoop中实现,函数参数是一个Future对象和一个callback函数。当Future对象被set_result,执行一个回调函数,是个lambda函数,在lambda函数中调用IOLoop的add_callback函数。将add_future的参数callback加入到IOLoop的统一调度中,让callback在IOLoop下一次迭代中执行。

def add_callback(self, callback, *args, **kwargs):
  with self._callback_lock:
    if self._closing:
      raise RuntimeError("IOLoop is closing")
    list_empty = not self._callbacks
    self._callbacks.append(functools.partial(
      stack_context.wrap(callback), *args, **kwargs))
    if list_empty and thread.get_ident() != self._thread_ident:
      self._waker.wake()

add_callback函数主要在IOLoop的子类PollIOLoop中实现。也很容易理解。

将传入的callback函数,利用偏函数进行包装,将所有callback真正运行时需要的参数,都绑定到生成的偏函数中,实际上就是找个地方把callback运行时需要的参数保存起来。将包装好的偏函数加入到回调函数列表。当IOLoop下一次迭代运行的时候,遍历callback函数列表,运行偏函数的时候,就不再需要传入参数执行,效果等同于用实参运行callback。

IOLoop对象调用start函数,会运行event loop。在event loop中,首先遍历callback列表,执行回调函数,然后遍历timeout列表,执行timeoutCallback。最后才执行ioHandler。

coroutine函数装饰器

函数装饰器本质是一个函数,我们称这个函数为装饰器函数。装饰器函数签名含有一个 函数对象(可调用对象callable)参数,返回的结果是一个装饰器内部定义的一个新函数对象。如果返回的函数对象被调用,装饰器函数的参数(函数对象)也会被调用。不过,会在这个参数(装饰器函数参数)调用前做一些事情,或者在这个参数调用后做一些事情。实际上做的这些事情,就是利用内部自定义的函数对象对参数(原函数)的一些装饰(额外操作)

当一个函数被装饰器装饰。那么以后调用这个函数(此函数已经非彼函数)的时候,实际上调用的是装饰器函数返回的内部函数对象。理解tornado中coroutine修饰的函数如何执行,主要是 理解coroutine这个装饰器函数内部定义的新函数对象所做的那些事儿。

def coroutine(func, replace_callback=True):
  return _make_coroutine_wrapper(func, replace_callback=True)

class Runner(object):
  def __init__(self, gen, result_future, first_yielded):
    self.gen = gen
    self.result_future = result_future
    self.future = _null_future
    self.yield_point = None
    self.pending_callbacks = None
    self.results = None
    self.running = False
    self.finished = False
    self.had_exception = False
    self.io_loop = IOLoop.current()
    self.stack_context_deactivate = None
    if self.handle_yield(first_yielded):
      self.run()

  def run(self):
    if self.running or self.finished:
      return
    try:
      self.running = True
      while True:
        future = self.future
        if not future.done():
          return
        self.future = None
        try:
          try:
            value = future.result()
          except Exception:
            self.had_exception = True
            yielded = self.gen.throw(*sys.exc_info())
          else:
            yielded = self.gen.send(value)
        except (StopIteration, Return) as e:
          self.finished = True
          self.future = _null_future
          self.result_future.set_result(getattr(e, 'value', None))
          self.result_future = None
          return
        except Exception:
          self.finished = True
          self.future = _null_future
          self.result_future.set_exc_info(sys.exc_info())
          self.result_future = None
          return
        if not self.handle_yield(yielded):
          return
    finally:
      self.running = False

  def handle_yield(self, yielded):

    try:
      self.future = convert_yielded(yielded)
    except BadYieldError:
      self.future = TracebackFuture()
      self.future.set_exc_info(sys.exc_info())

    if not self.future.done() or self.future is moment:
      self.io_loop.add_future(
        self.future, lambda f: self.run())
      return False
    return True

以上的代码其实都对源码进行了一些调整。但函数调用进入到Runner的构造函数的时候,也就是说Generator的第一次执行已经完毕。那么接下来,调用的是,handle_yield,对第一次Generator执行的返回结果进行处理。当然返回的结果可能是多种类型。可能是一个Future对象,list,dict,或者其他类型对象,或者普通类型。通过convert_yield,self.future保存的是一个Future对象的引用(第一次Generator执行返回的结果)。此时如果self.future还没被set_result。对为self.future绑定一个done_callback(lambda f: self.run()),加入到self.io_loop中。

在前文说到。ioloop的add_future函数中,实际上是只有当参数future,在某个地方调用了set_result, 才在执行done_callback时,将参数callback加入到IOLoop中调度。换句话说。Runner类中,self.run要等到self.future在某个代码块被set_result,IOLoop才有可能在下一次迭代的时候执行它,从而调度协程继续恢复执行。而在self.run函数中,我们可以看到将会通过Generator的send函数,恢复执行下一个协程代码块。所以关键的问题是我们需要明白Runner类中self.future,在什么时候被set_result?

从这里我们可以看到Future类的重要作用。future.set_result起到的作用是:

发送一个信号,告诉IOLoop去调度暂停的协程继续执行。

我们结合下面的代码例子就可以明白协程调度的整个流程是如何进行的了。

import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future

@coroutine
def asyn_sum(a, b):
  print("begin calculate:sum %d+%d"%(a,b))
  future = Future()

  def callback(a, b):
    print("calculating the sum of %d+%d:"%(a,b))
    future.set_result(a+b)
  tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)

  result = yield future

  print("after yielded")
  print("the %d+%d=%d"%(a, b, result))

def main():
  asyn_sum(2,3)
  tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
  main()

实际的运行场景是:一个协程(asyn_sum)遇到yield表达式被暂停执行后,IOLoop调用另外一个代码段(asyn_sum中的回调函数callback)执行,而在callback中,刚好可以访问到属于被暂停协程(asyn_sum)中的future对象(也就是Runner对象中的self.future的引用),callback中将future调用set_result,那么这个暂停的协程(asyn_sum)在IOLoop下一次迭代调度回调函数时中,被恢复执行。

总结

tornado中的协程实现基于python语言的Generator并且结合一个全局的调度器IOLoop,Generator通过函数装饰器coroutine和IOLoop进行通信。IOLoop并没有直接控制能力,调度恢复被暂停的协程继续执行。future对象在协程中被yield。协程暂停,IOLoop调度另外一个代码模块执行,而在这个执行的代码模块中刚好,可以访问这个future对象,将其set_result,结果通过IOLoop间接恢复暂停协程执行。不同执行代码模块中,共享future对象,彼此合作,协程调度得顺利执行。

从这种意义上来说,future对象,像window中的Event内核对象的作用。window中的event用于线程中同步。而协程中的yield future相当于WaitForSingleObject(event_object), 而future.set_result(result)。相当于SetEvent(event_object)。而future和Event的不同点在于,协程借future来恢复执行,而线程借Event来进行线程间同步。

以上就是本文关于详细解读tornado协程(coroutine)原理的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

您可能感兴趣的文章:

  • Tornado协程在python2.7如何返回值(实现方法)
  • 简单介绍Python的Tornado框架中的协程异步实现原理
(0)

相关推荐

  • 简单介绍Python的Tornado框架中的协程异步实现原理

    Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, Tornado 的协程是基于 Python 的生成器实现的, 所以首先来回顾下生成器. 生成器 Python 的生成器可以保存执行状态 并在下次调用的时候恢复, 通过在函数体内使用 yield 关键字 来创建一个生成器, 通过内置函数 next 或生

  • Tornado协程在python2.7如何返回值(实现方法)

    错误写法 class RemoteHandler(web.RequestHandler): @gen.coroutine def get(self): response = httpclient('http://www.baidu.com') self.write(response.body) @gen.coroutine def httpClient(url): result = yield httpclient.AsyncHTTPClient().fetch(url) return resu

  • 详细解读tornado协程(coroutine)原理

    tornado中的协程是如何工作的 协程定义 Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.. -- [ 维基百科 ] 我们在平常编程中,更习惯使用的是子

  • Python 异步协程函数原理及实例详解

    这篇文章主要介绍了Python 异步协程函数原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一. asyncio 1.python3.4开始引入标准库之中,内置对异步io的支持 2.asyncio本身是一个消息循环 3.步骤: (1)创建消息循环 (2)把协程导入 (3)关闭 4.举例: import threading # 引入异步io包 import asyncio # 使用协程 @ asyncio.coroutine def

  • Kotlin协程launch原理详解

    目录 正文 launch使用 launch原理 CoroutineStart中找invoke方法 startCoroutineCancellable逻辑 小结 正文 launch我们经常用,今天来看看它是什么原理. 建议: 食用本篇文章之前记得先食用Kotlin协程之createCoroutine和startCoroutine launch使用 launch我们应该很熟悉了,随便举个例子: fun main() { val coroutineScope = CoroutineScope(Job(

  • Kotlin协程Dispatchers原理示例详解

    目录 前置知识 demo startCoroutineCancellable intercepted()函数 DefaultScheduler中找dispatch函数 Runnable传入 Worker线程执行逻辑 小结 前置知识 Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中.所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行. 之

  • python 如何引入协程和原理分析

    相关概念 并发:指一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行.比如说在一秒内cpu切换了100个进程,就可以认为cpu的并发是100. 并行:值任意时刻点上,有多个程序同时运行在cpu上,可以理解为多个cpu,每个cpu独立运行自己程序,互不干扰.并行数量和cpu数量是一致的. 我们平时常说的高并发而不是高并行,是因为cpu的数量是有限的,不可以增加. 形象的理解:cpu对应一个人,程序对应喝茶,人要喝茶需要四个步骤(可以对应程序需要开启四个线程):1烧

  • Kotlin协程概念原理与使用万字梳理

    目录 一.协程概述 1.概念 2.特点 3.原理 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 三.协程使用 1.协程的启动 2.协程间通信 3.多路复用 4.序列生成器 5.协程异步流 6.全局上下文 一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要申请额外的堆栈空间:协程的切换发生在

  • Python并发编程协程(Coroutine)之Gevent详解

    Gevent官网文档地址:http://www.gevent.org/contents.html 基本概念 我们通常所说的协程Coroutine其实是corporateroutine的缩写,直接翻译为协同的例程,一般我们都简称为协程. 在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程. 进程和协程 下面对比一下进程和协程的相同点和不同点: 相同点: 我们都可以把他们看做是一种执行流,执行流可以挂起,并且后面可以在你挂起的地方恢复执行,这实际上都可以看做是con

  • python 协程 gevent原理与用法分析

    本文实例讲述了python 协程 gevent原理与用法.分享给大家供大家参考,具体如下: gevent greenlet已经实现了协程,但是这个还的人工切换,是不是觉得太麻烦了,不要捉急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent 其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络.文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行. 由于IO操

  • C++20中的协程(Coroutine)的实现

    C++20中的协程(Coroutine) 从2017年开始, 协程(Coroutine)的概念就开始被建议加入C++20的标准中了,并已经开始有人对C++20协程的提案进行了介绍.1事实上,协程的概念在很早就出现了,甚至其他语言(JS,Python,C#等)早就已经支持了协程. 可见,协程并不是C++所特有的概念. 那么,什么是协程? 简单来说,协程就是一种特殊的函数,它可以在函数执行到某个地方的时候暂停执行,返回给调用者或恢复者(可以有一个返回值),并允许随后从暂停的地方恢复继续执行.注意,这

随机推荐