用Python生成器实现微线程编程的教程

微线程领域(至少在 Python 中)一直都是 Stackless Python 才能涉及的特殊增强部分。关于 Stackless 的话题以及最近它经历的变化,可能本身就值得开辟一个专栏了。但其中简单的道理就是,在“新的 Stackless”下,延续(continuation)显然是不合时宜的,但微线程还是这个项目 存在的理由。这一点很复杂……

刚开始,我们还是先来回顾一些内容。那么,什么是微线程呢? 微线程基本上可以说是只需要很少的内部资源就可以运行的进程 ― 并且是在 Python 解释器的单个实例中(在公共内存空间中,等等)运行的进程。有了微线程,我们就可能在目前中等性能的 PC 机上运行数以万计的并行进程,还可以每秒钟几十万次地在上下文之间切换。对 fork() 的调用或标准的 OS 线程调用根本不能达到这个程度!甚至所谓的“轻量级”线程库中的线程也比这里提出的微线程“重”好几个数量级。

我在本专栏中介绍的轻便线程的含义与 OS 线程的含义有一点不同。就这点而言,它们与 Stackless 所提供的也不尽相同。在很多方面,轻便线程比大多数变体都简单得多;大多数关于信号、锁定及诸如此类的问题都不存在了。简单性的代价就是,我提出了一种“协作多线程”的形式;我觉得在标准 Python 框架中加入抢占并不可行(至少在非 Stackless 的 Python 2.2 中 — 没有人知道 __future__ 会带来什么)。

轻便线程在某种意义上会令人回想起较早的 Windows 和 MacOS 版本的协作多任务(不过是在单个应用程序中)。然而,在另一种意义上,轻便线程只不过是在程序中表达流的另一种方式;轻便线程所做的一切(至少在原则上)都可以用“真正庞大的 if/elif 块”技术来完成(蛮干的程序员的黔驴之计)。

一种用简单的生成器模拟协同程序的机制。这个机制的核心部分非常简单。 scheduler() 函数中包装了一组生成器对象,这个函数控制将控制流委托给合适的分支的过程。这些协同程序并不是 真正的协同程序,因为它们只控制到 scheduler() 函数和来自该函数的分支。不过出于实用的目的,您可以用非常少的额外代码来完成同样的事情。 scheduler() 就是类似于下面这样的代码:
清单 1. 模拟协同程序的 Scheduler()

def scheduler(gendct, start):
 global cargo
 coroutine = start
 while 1:
  (coroutine, cargo) = gendct[coroutine].next()

关于这个包装器要注意的一点是,每个生成器/协同程序都会生成一个包含它的预期分支目标的元组。生成器/协同程序基本上都在 GOTO 目标处退出。为了方便起见,我还让生成器生成了一个标准的 cargo 容器,作为形式化在协同程序之间传送的数据的方法 — 不过您也可以只用已经达成一致的全局变量或回调 setter/getter 函数来传送数据。Raymond Hettinger 撰写了一个 Python 增强倡议(Python Enhancement Proposal,PEP),旨在使传送的数据能被更好地封装;可能今后的 Python 将包括这个倡议。

新的调度程序

对于轻便线程来说,它们的需求与协同程序的需求稍有不同。不过我们还是可以在它的核心处使用 scheduler() 函数。不同之处在于,调度程序本身应该决定分支目标,而不是从生成器/协同程序接收分支目标。下面让我向您展示一个完整的测试程序和样本:
清单 2. microthreads.py 示例脚本

from

   __future__ 

  import

   generators

  import

   sys, time
threads = []
TOTALSWITCHES = 

  10**6
NUMTHREADS = 

  10**5def

   null_factory():

  def

   empty():

  while1: 

  yield

   None

  return

   empty()

  def

   quitter():

  for

   n 

  in

   xrange(TOTALSWITCHES/NUMTHREADS):

  yield

   None

  def

   scheduler():

  global

   threads

  try

  :

  while1:

  for

   thread 

  in

   threads: thread.next()

  except

   StopIteration:

  passif

   __name__ == 

  "__main__"

  :

  for

   i 

  in

   range(NUMTHREADS):
  threads.append(null_factory())
 threads.append(quitter())
 starttime = time.clock()
 scheduler()

  print"TOTAL TIME: "

  , time.clock()-starttime

  print"TOTAL SWITCHES:"

  , TOTALSWITCHES

  print"TOTAL THREADS: "

  , NUMTHREADS

这大概就是您能够选择的最简单的轻便线程调度程序了。每个线程都按固定顺序进入,而且每个线程都有同样的优先级。接下来,让我们来看看如何处理细节问题。和前面部分所讲的协同程序一样,编写轻便线程时应该遵守一些约定。

处理细节

大多数情况下,轻便线程的生成器都应该包括在 while 1: 循环中。这里设置调度程序的方法将导致在其中一个线程停止时整个调度程序停止。这在某种意义上“健壮性”不如 OS 线程 ― 不过在 scheduler() 的循环 内捕获异常不会比在循环外需要更多的机器资源。而且,我们可以从 threads 列表删除线程,而不必终止(由它本身或其它线程终止)。我们其实并没有提供让删除更加容易的详细方法;不过比较常用的扩展方法可能是将线程存储在字典或某种其它的结构中,而不是列表中。

该示例说明了最后终止调度程序循环的一种合理的方法。 quitter() 是一种特殊的生成器/线程,它监视某种条件(在本示例中只是一个上下文切换的计数),并在条件满足时抛出 StopIteration (本示例中不捕获其它异常)。请注意,在终止之后,其它所有生成器还是完整的,如果需要,还可以在今后恢复(在微线程调度程序或其它程序中)。显然,如果需要,您可以 delete 这些生成器/线程。

这里讨论的示例使用了特殊的无意义线程。它们什么也不做,而且以一种可能性最小的形式实现这一点。我们这样建立该示例是为了说明一点 ― 轻便线程的内在开销是非常低的。在一台比较老的只有 64 MB 内存的 Windows 98 Pentium II 膝上型电脑上创建 100,000 个轻便线程是轻而易举的(如果达到了一百万个线程,就会出现长时间的磁盘“猛转”)。请用 OS 线程试试看! 而且,在这个比较慢的 366 MHz 芯片上可以在大约 10 秒内执行一百万次上下文切换(所涉及的线程数对耗时并无重大影响)。显然,真正的轻便线程应该 做一些事情,而这将根据任务使用更多的资源。不过线程本身却赢得了“轻便”的名声。

切换开销

在轻便线程之间切换开销很小,但还不是完全没有开销。为了测试这种情况,我构建了一个执行 某种工作(不过大约是您在线程中按道理可以完成的最少量)的示例。因为线程调度程序 真的等同于“执行 A,接着执行 B,然后执行 C,等等”的指令,所以要在主函数中创建一个完全并行的情况也不困难。
清单 3. overhead.py 示例脚本

from

   __future__ 

  import

   generators

  import

   time
TIMES = 100000

  def

   stringops():

  for

   n 

  in

   xrange(TIMES):
  s = 

  "Mary had a little lamb"

  s = s.upper()
  s = 

  "Mary had a little lamb"

  s = s.lower()
  s = 

  "Mary had a little lamb"

  s = s.replace('a','A')

  def

   scheduler():

  for

   n 

  in

   xrange(TIMES):

  for

   thread 

  in

   threads: thread.next()

  def

   upper():

  while1:
  s = 

  "Mary had a little lamb"

  s = s.upper()

  yield

   None

  def

   lower():

  while1:
  s = 

  "Mary had a little lamb"

  s = s.lower()

  yield

   None

  def

   replace():

  while1:
  s = 

  "Mary had a little lamb"

  s = s.replace(

  'a'

  ,

  'A'

  )

  yield

   None

  if

   __name__==

  '__main__':

 start = time.clock()
 stringops()
 looptime = time.clock()-start

  print"LOOP TIME:"

  , looptime

  global

   threads
 threads.append(upper())
 threads.append(lower())
 threads.append(replace())
 start = time.clock()
 scheduler()
 threadtime = time.clock()-start

  print"THREAD TIME:"

  , threadtime

结果表明,在直接循环的版本运行一次的时间内,轻便线程的版本运行了两次还多一点点 ― 也就相当于在上面提到的机器上,轻便线程运行了不到 3 秒,而直接循环运行了超过 6 秒。显然,如果每个工作单元都相当于单个字符串方法调用的两倍、十倍或一百倍,那么所花费的线程开销比例就相应地更小了。

设计线程

轻便线程可以(而且通常应该)比单独的概念性操作规模更大。无论是何种线程,都是用来表示描述一个特定 任务或 活动所需的流上下文的量。但是,任务花费的时间/大小可能比我们希望在单独线程上下文中使用的要多/大。抢占将自动处理这种问题,不需要应用程序开发者作出任何特定干涉。不幸的是,轻便线程用户需要注意“好好地处理”其它线程。

至少,轻便线程应该设计得足够周全,在完成概念性操作时应该能够 yield 。调度程序将回到这里以进行下一步。举例来说:
清单 4. 伪码友好的轻便线程

def nicethread():
    while 1:
        ...operation A...
        yield None
        ...operation B...
        yield None
        ...operation C...
        yield None

多数情况下,好的设计将比在基本操作之间的边界 yield 更多次。虽然如此,通常在概念上“基本”的东西都涉及对一个大集合的循环。如果情况如此(根据循环体耗费时间的程度),在循环体中加入一到两个 yield (可能在特定数量的循环迭代执行过后再次发生)可能会有所帮助。和优先权线程的情况不同,一个行为不良的轻便线程会获取无限量的独占处理器时间。

调度的其它部分

迄今为止,上面的示例只展示了形式最基本的几个线程调度程序。可能实现的还有很多(这个问题与设计一个好的生成器/线程没什么关系)。让我来顺便向您展示几个传送中可能出现的增强。
更好的线程管理

一个简单的 threads 列表就可以使添加调度程序要处理的生成器/线程非常容易。但是这种数据结构并不能使删除或暂挂不再相关的线程变得容易。字典或类可能是线程管理中更好的数据结构。下面是一个快捷的示例,这个类能够(几乎能够)顺便访问示例中的 threads 列表:
清单 5. 线程管理的 Python 类示例

class

     ThreadPool: 

    """Enhanced threads list as class
  threads = ThreadPool()
  threads.append(threadfunc) # not generator object
  if threads.query(num) <<has some property>>:
    threads.remove(num)
  """def

     __init__(self):
    self.threadlist = []
    self.threaddict = {}
    self.avail = 

    1def

     __getitem__(self, n):

    return

     self.threadlist[n]

    def

     append(self, threadfunc, docstring=None):

    # Argument is the generator func, not the gen object
# Every threadfunc should contain a docstring

    docstring = docstring 

    or

     threadfunc.__doc__
    self.threaddict[self.avail] = (docstring, threadfunc())
    self.avail += 

    1
    self.threadlist = [p[

    1] 

    for

     p 

    in

     self.threaddict.values()]

    return

     self.avail-

    1# return the threadIDdef

     remove(self, threadID):

    del

     self.threaddict[threadID]
    self.threadlist = [p[

    1] 

    for

     p 

    in

     self.threaddict.values()]

    def

     query(self, threadID):

    "

    Information on thread, 

    if

     it exists (otherwise None)

    return

     self.threaddict.get(threadID,[None])[0]

您可以实现更多内容,而这是个好的起点。
线程优先级

在简单的示例中,所有线程都获得调度程序同等的关注。至少有两种普通方法可以实现调优程度更好的线程优先级系统。一个优先级系统可以只对“高优先级”线程投入比低优先级线程更多的注意力。我们可以用一种直接的方式实现它,就是创建一个新类 PriorityThreadPool(ThreadPool) ,这个类在线程迭代期间更频繁地返回更重要的线程。最简单的方法可能会在 .__getitem__() 方法中连续多次返回某些线程。那么,高优先级线程就可能接收到两个,或多个,或一百个连续的“时间片”,而不只是原来的一个。这里的一个(非常弱的)“实时”变量最多可能返回散落在线程列表中各处的重要线程的多个副本。这将增加服务于高优先级线程的实际频率,而不只是它们受到的所有关注。

在纯 Python 中使用更复杂的线程优先级方法可能不是很容易(不过它是使用某种第三方特定于 OS/处理器的库来实现的)。调度程序不是只给高优先级线程一个时间片的整型数,它还可以测量每个轻便线程中实际花费的时间,然后动态调整线程调度,使其对等待处理的线程更加“公平”(也许公平性和线程优先级是相关的)。不幸的是,Python 的 time.clock() 和它的系列都不是精度足够高的计时器,不足以使这种方式有效。另一方面,没有什么可以阻止“多时间片”方法中处理不足的线程去动态提高它自己的优先级。
将微线程和协作程序结合在一起

为了创建一个轻便线程(微线程)调度程序,我删除了协作程序逻辑“please branch to here”。这样做其实并不必要。示例中的轻便线程生成的通常都是 None ,而不是跳转目标。我们完全可以把这两个概念结合在一起:如果协同程序/线程生成了跳转目标,调度程序就可以跳转到被请求的地方(也许,除非被线程优先级覆盖)。然而,如果协同程序/线程只生成 None ,调度程序就可以自己决定下一步要关注哪个适当的线程。决定(以及编写)一个任意的跳转究竟会如何与线性线程队列交互将涉及到不少工作,不过这些工作中没有什么特别神秘的地方。

快速而廉价 — 为什么不喜欢它呢?

微线程模式(或者“轻便线程”)基本上可以归结为 Python 中流控制的另一种奇怪的风格。本专栏的前面几个部分已经谈到了另外几种风格。有各种控制机制的引人之处在于,它让开发者将代码功能性隔离在其逻辑组件内,并最大化代码的上下文相关性。

其实,要实现做任何可能做到的事的 可能性并不复杂(只要用一个“loop”和一个“if”就可以了)。对于轻易地分解为很多细小的“代理”、“服务器”或“进程”的一类问题来说,轻便线程可能是表达应用程序的底层“业务逻辑”的最清楚的模型。当然,轻便线程与一些大家更熟知的流机制相比速度可能非常快,就这点而言并无大碍。

(0)

相关推荐

  • python迭代器与生成器详解

    例子 老规矩,先上一个代码: def add(s, x): return s + x def gen(): for i in range(4): yield i base = gen() for n in [1, 10]: base = (add(i, n) for i in base) print list(base) 这个东西输出可以脑补一下, 结果是[20,21,22,23], 而不是[10, 11, 12, 13]. 当时纠结了半天,一直没搞懂,后来齐老师稍微指点了一下, 突然想明白了-

  • 举例讲解Python中的迭代器、生成器与列表解析用法

    迭代器:初探 上一章曾经提到过,其实for循环是可用于任何可迭代的对象上的.实际上,对Python中所有会从左至右扫描对象的迭代工具而言都是如此,这些迭代工具包括了for循环.列表解析.in成员关系测试以及map内置函数等. "可迭代对象"的概念在Python中是相当新颖的,基本这就是序列观念的通用化:如果对象时实际保存的序列,或者可以再迭代工具环境中一次产生一个结果的对象,那就看做是可迭代的. >>文件迭代器 作为内置数据类型的文件也是可迭代的,它有一个名为__next_

  • Python使用设计模式中的责任链模式与迭代器模式的示例

    责任链模式 责任链模式:将能处理请求的对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理请求为止,避免请求的发送者和接收者之间的耦合关系. #encoding=utf-8 # #by panda #职责连模式 def printInfo(info): print unicode(info, 'utf-8').encode('gbk') #抽象职责类 class Manager(): successor = None name = '' def __init__(self, name):

  • python生成器表达式和列表解析

    绝大多数情况下,遍历一个集合都是为了对元素应用某个动作或是进行筛选.如果看过本文的第二部分,你应该还记得有内建函数map和filter提供了这些功能,但Python仍然为这些操作提供了语言级的支持. (x+1 for x in lst) #生成器表达式,返回迭代器.外部的括号可在用于参数时省略. [x+1 for x in lst] #列表解析,返回list 如你所见,生成器表达式和列表解析(注:这里的翻译有很多种,比如列表展开.列表推导等等,指的是同一个意思)的区别很小,所以人们提到这个特性时

  • Python中的列表生成式与生成器学习教程

    列表生成式 即创建列表的方式,最笨的方法就是写循环逐个生成,前面也介绍过可以使用range()函数来生成,不过只能生成线性列表,下面看看更为高级的生成方式: >>> [x * x for x in range(1, 11)] [1, 4, 9, 16, 25, 36, 49, 64, 81, 100] 写列表生成式时,把要生成的元素x * x放到前面,后面跟for循环,就可以把list创建出来,十分有用,多写几次,很快就可以熟悉这种语法. 你甚至可以在后面加上if判断: >>

  • Python的迭代器和生成器

    先说迭代器,对于string.list.dict.tuple等这类容器对象,使用for循环遍历是很方便的.在后台for语句对容器对象调用iter()函数,iter()是python的内置函数.iter()会返回一个定义了next()方法的迭代器对象,它在容器中逐个访问容器内元素,next()也是python的内置函数.在没有后续元素时,next()会抛出一个StopIteration异常,通知for语句循环结束.比如: >>> s = 'abc' >>> it = it

  • 浅谈Python中列表生成式和生成器的区别

    列表生成式语法: [x*x for x in range(0,10)] //列表生成式,这里是中括号 //结果 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] (x*x for x in range(0,10)) //生成器, 这里是小括号 //结果 <generator object <genexpr> at 0x7f0b072e6140> 二者的区别很明显: 一个直接返回了表达式的结果列表, 而另一个是一个对象,该对象包含了对表达式结果的计算引用, 通

  • 用Python生成器实现微线程编程的教程

    微线程领域(至少在 Python 中)一直都是 Stackless Python 才能涉及的特殊增强部分.关于 Stackless 的话题以及最近它经历的变化,可能本身就值得开辟一个专栏了.但其中简单的道理就是,在"新的 Stackless"下,延续(continuation)显然是不合时宜的,但微线程还是这个项目 存在的理由.这一点很复杂-- 刚开始,我们还是先来回顾一些内容.那么,什么是微线程呢? 微线程基本上可以说是只需要很少的内部资源就可以运行的进程 ― 并且是在 Python

  • 用Python进行基础的函数式编程的教程

    许多函数式文章讲述的是组合,流水线和高阶函数这样的抽象函数式技术.本文不同,它展示了人们每天编写的命令式,非函数式代码示例,以及将这些示例转换为函数式风格. 文章的第一部分将一些短小的数据转换循环重写成函数式的maps和reduces.第二部分选取长一点的循环,把他们分解成单元,然后把每个单元改成函数式的.第三部分选取一个很长的连续数据转换循环,然后把它分解成函数式流水线. 示例都是用Python写的,因为很多人觉得Python易读.为了证明函数式技术对许多语言来说都相同,许多示例避免使用Pyt

  • 在Python下进行UDP网络编程的教程

    TCP是建立可靠连接,并且通信双方都可以以流的形式发送数据.相对TCP,UDP则是面向无连接的协议. 使用UDP协议时,不需要建立连接,只需要知道对方的IP地址和端口号,就可以直接发数据包.但是,能不能到达就不知道了. 虽然用UDP传输数据不可靠,但它的优点是和TCP比,速度快,对于不要求可靠到达的数据,就可以使用UDP协议. 我们来看看如何通过UDP协议传输数据.和TCP类似,使用UDP的通信双方也分为客户端和服务器.服务器首先需要绑定端口: s = socket.socket(socket.

  • Python并发编程实例教程之线程的玩法

    目录 一.线程基础以及守护进程 二.线程锁(互斥锁) 三.线程锁(递归锁) 四.死锁 五.队列 六.相关面试题 七.判断数据是否安全 八.进程池 & 线程池 总结 一.线程基础以及守护进程 线程是CPU调度的最小单位 全局解释器锁 全局解释器锁GIL(global interpreter lock) 全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准. 全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行. GIL锁每执行700条指

  • Python Socket编程入门教程

    这是用来快速学习 Python Socket 套接字编程的指南和教程.Python 的 Socket 编程跟 C 语言很像. Python 官方关于 Socket 的函数请看 http://docs.python.org/library/socket.html 基本上,Socket 是任何一种计算机网络通讯中最基础的内容.例如当你在浏览器地址栏中输入 www.jb51.net 时,你会打开一个套接字,然后连接到 www.jb51.net 并读取响应的页面然后然后显示出来.而其他一些聊天客户端如

  • python 专题九 Mysql数据库编程基础知识

    在Python网络爬虫中,通常是通过TXT纯文本方式存储,其实也是可以存储在数据库中的:同时在WAMP(Windows.Apache.MySQL.PHP或Python)开发网站中,也可以通过Python构建网页的,所以这篇文章主要讲述Python调用MySQL数据库相关编程知识.从以下几个方面进行讲解: 1.配置MySLQ 2.SQL语句基础知识 3.Python操作MySQL基础知识 4.Python调用MySQL示例 一. 配置MySQL 首先下载mysql-5.0.96-winx64,安装

  • Python 生成器,迭代,yield关键字,send()传参给yield语句操作示例

    本文实例讲述了Python 生成器,迭代,yield关键字,send()传参给yield语句操作.分享给大家供大家参考,具体如下: demo.py(生成器,yield关键字): # 生成器是一个特殊的迭代器.可以用for...in遍历. # 带有yield关键字的函数,不再是一个函数,而是一个生成器模板.调用该模板会返回一个生成器对象. def create_num(all_num): a, b = 0, 1 current_num = 0 while current_num < all_num

  • python生成器/yield协程/gevent写简单的图片下载器功能示例

    本文实例讲述了python生成器/yield协程/gevent写简单的图片下载器功能.分享给大家供大家参考,具体如下: 1.生成器: '''第二种生成器''' # 函数只有有yield存在就是生成器 def test(i): while True: i += 1 res = yield i print(res) i += 1 return res def main(): t = test(1) # 创建生成器对象 print(next(t)) # next第一次执行从上到下,yield是终点 p

  • 一文搞懂Python中的进程,线程和协程

    目录 1.什么是并发编程 2.进程与多进程 3.线程与多线程 4.协程与多协程 5.总结 1.什么是并发编程 并发编程是实现多任务协同处理,改善系统性能的方式.Python中实现并发编程主要依靠 进程(Process):进程是计算机中的程序关于某数据集合的一次运行实例,是操作系统进行资源分配的最小单位 线程(Thread):线程被包含在进程之中,是操作系统进行程序调度执行的最小单位 协程(Coroutine):协程是用户态执行的轻量级编程模型,由单一线程内部发出控制信号进行调度 直接上一张图看看

  • Python快速实现一个线程池的示例代码

    目录 楔子 Future 对象 提交函数自动创建 Future 对象 future.set_result 到底干了什么事情 提交多个函数 使用 map 来提交多个函数 按照顺序等待执行 取消一个函数的执行 函数执行时出现异常 等待所有函数执行完毕 小结 楔子 当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程.但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置. 比如我们实现一个有 10

随机推荐