Python定义一个Actor任务

问题

你想定义跟actor模式中类似“actors”角色的任务

解决方案

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实上,它天生的简单性是它如此受欢迎的重要原因之一。 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。

结合使用一个线程和一个队列可以很容易的定义actor,例如:

from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
  pass

class Actor:
  def __init__(self):
    self._mailbox = Queue()

  def send(self, msg):
    '''
    Send a message to the actor
    '''
    self._mailbox.put(msg)

  def recv(self):
    '''
    Receive an incoming message
    '''
    msg = self._mailbox.get()
    if msg is ActorExit:
      raise ActorExit()
    return msg

  def close(self):
    '''
    Close the actor, thus shutting it down
    '''
    self.send(ActorExit)

  def start(self):
    '''
    Start concurrent execution
    '''
    self._terminated = Event()
    t = Thread(target=self._bootstrap)

    t.daemon = True
    t.start()

  def _bootstrap(self):
    try:
      self.run()
    except ActorExit:
      pass
    finally:
      self._terminated.set()

  def join(self):
    self._terminated.wait()

  def run(self):
    '''
    Run method to be implemented by the user
    '''
    while True:
      msg = self.recv()

# Sample ActorTask
class PrintActor(Actor):
  def run(self):
    while True:
      msg = self.recv()
      print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

这个例子中,你使用actor实例的 send() 方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程。 close() 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。 用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。 ActorExit 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 (异常被get()方法抛出并传播出去)。

如果你放宽对于同步和异步消息发送的要求, 类actor对象还可以通过生成器来简化定义。例如:

def print_actor():
  while True:

    try:
      msg = yield   # Get a message
      print('Got:', msg)
    except GeneratorExit:
      print('Actor terminating')

# Sample use
p = print_actor()
next(p)   # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()

讨论

actor模式的魅力就在于它的简单性。 实际上,这里仅仅只有一个核心操作 send() . 甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。 例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:

class TaggedActor(Actor):
  def run(self):
    while True:
       tag, *payload = self.recv()
       getattr(self,'do_'+tag)(*payload)

  # Methods correponding to different message tags
  def do_A(self, x):
    print('Running A', x)

  def do_B(self, x, y):
    print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1))   # Invokes do_A(1)
a.send(('B', 2, 3))  # Invokes do_B(2,3)
a.close()
a.join()

作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数, 并且通过一个特殊的Result对象返回结果:

from threading import Event
class Result:
  def __init__(self):
    self._evt = Event()
    self._result = None

  def set_result(self, value):
    self._result = value

    self._evt.set()

  def result(self):
    self._evt.wait()
    return self._result

class Worker(Actor):
  def submit(self, func, *args, **kwargs):
    r = Result()
    self.send((func, args, kwargs, r))
    return r

  def run(self):
    while True:
      func, args, kwargs, r = self.recv()
      r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())

最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。 例如,一个类actor对象的 send() 方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件(比如AMQP、ZMQ等)来发送。

以上就是Python定义一个Actor任务的详细内容,更多关于Python actor任务的资料请关注我们其它相关文章!

(0)

相关推荐

  • PageFactory设计模式基于python实现

    前言 pageFactory的设计模式能在java里执行的原因是java自带了PageFactory类,而在python中没有这样的包,但是已经有人写好了pageFactory在python的包,可以拿来用 pageFactory 用于python支持的py文件 __all__ = ['cacheable', 'callable_find_by', 'property_find_by'] def cacheable_decorator(lookup): def func(self): if no

  • 详解Python的Twisted框架中reactor事件管理器的用法

    铺垫 在大量的实践中,似乎我们总是通过类似的方式来使用异步编程: 监听事件 事件发生执行对应的回调函数 回调完成(可能产生新的事件添加进监听队列) 回到1,监听事件 因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行. 上图是boost对Reactor模式的描绘,Twisted的设计就是基于

  • Python定义一个Actor任务

    问题 你想定义跟actor模式中类似"actors"角色的任务 解决方案 actor模式是一种最古老的也是最简单的并行和分布式计算解决方案. 事实上,它天生的简单性是它如此受欢迎的重要原因之一. 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务. 响应这些消息时,它可能还会给其他actor发送更进一步的消息. actor之间的通信是单向和异步的.因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知. 结合使用一个线程和一

  • Python定义一个跨越多行的字符串的多种方法小结

    方法一: >>> str1 = '''Le vent se lève, il faut tenter de vivre. 起风了,唯有努力生存. (纵有疾风起,人生不言弃.)''' >>> str1 'Le vent se lève, il faut tenter de vivre. \n起风了,唯有努力生存.\n(纵有疾风起,人生不言弃.)' 编辑的时候,引号挺对的,但是不知道为什么发布的时候,第一行的引号总是多了一些,其实应该是下面这样的: 不过感觉这种方法不够纯粹

  • Python定义一个函数的方法

    Python函数的定义 定义函数,也就是创建一个函数,可以理解为创建一个具有某些用途的工具.定义函数需要用 def 关键字实现,具体的语法格式如下: def 函数名(形参列表): //由零条到多条可执行语句组成的代码块 [return [返回值]] 其中,用 [] 括起来的为可选择部分,即可以使用,也可以省略. 此格式中,各部分参数的含义如下: 函数名:从语法角度来看,函数名只要是一个合法的标识符即可:从程序的可读性角度来看,函数名应该由一个或多个有意义的单词连缀而成,每个单词的字母全部小写,单

  • python通过定义一个类实例作为ftp回调方法

    本文实例讲述了python通过定义一个类实例作为ftp回调方法.分享给大家供大家参考.具体实现方法如下: class Writer: def __init__(self, file): self.f = open(file, "w") def __call__(self, data): self.f.write(data) self.f.write('\n') print data FILENAME = "AutoIndent.py" writer = Writer

  • 在Python中定义一个常量的方法

    大家都知道,网络上流行这么一句话 如果一个程序,JAVA需要写1000行,那PHP要写500行,而Python只要写200行就可以拉~~ 那么在Python中,如何用代码去实现一个常量呢? class MyNum(object): def __init__(self): self.__PI = 3.1415926 @property def PI(self): return self.__PI mynum = MyNum() print(mynum.PI) 这是通过私有属性去定义,通过装饰器@p

  • 如何使用Python VTK高亮显示actor

    前言: VTK,(visualizationtoolkit)是一个开放资源的免费软件系统,主要用于三维计算机图形学.图像处理和可视化.Vtk是在面向对象原理的基础上设计和实现的,它的内核是用C++构建的,包含有大约250,000行代码,2000多个类,还包含有几个转换界面,因此也可以自由的通过Java,Tcl/Tk和Python各种语言使用vtk. 主要函数介绍: NewPickedActor.GetProperty(): 通过该函数,可以设置actor的性质,如颜色.表面样式等. vtk.vt

  • Python实现一个简单的验证码程序

    老师讲完random函数,自己写的,虽然和老师示例的不那么美观,智能,但是也自己想出来的,所以记录一下,代码就需要自己不断的自己练习,实战,才能提高啊!不然就像我们这些大部分靠自学的人,何时能学会.还有就是,这次听老师的,把自己的代码添加注释,所以这次把很简单的代码都写上了注释,而且很大白话,不管有没有接触过python的,我相信仔细看了,肯定能看懂.如果看完,再自己尝试着默写出来,那就是更好到了,好了进入正题: 自己写的: __Author__ = "Zhang Peng" impo

  • 用Python编写一个简单的FUSE文件系统的教程

    如果你是我的长期读者,那么你应该知道我在寻找一个完美备份程序,最后我写了一个基于bup的我自己的加密层. 在写encbup的时候,我对仅仅恢复一个文件就必须要下载整个巨大的档案文件的做法不甚满意,但仍然希望能将EncFS和 rdiff-backup一起使用来实现可远程挂载.加密.去重.版本化备份的功能. 再次试用obnam 后(啰嗦一句:它还是慢的出奇),我注意到了它有一个mount命令.深入研究后,我发现了fuse-python和fusepy,感觉用Python写一个FUSE文件系统应该挺简单

  • Python列出一个文件夹及其子目录的所有文件

    python简介 Python是一种解释型.面向对象.动态数据类型的高级程序设计语言. Python由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年. 像Perl语言一样, Python 源代码同样遵循 GPL(GNU General Public License)协议. >>> import os >>> for i in os.walk("."): ... print i[0],"\n##"

  • 用Python设计一个经典小游戏

    本文主要介绍如何用Python设计一个经典小游戏:猜大小. 在这个游戏中,将用到前面我介绍过的所有内容:变量的使用.参数传递.函数设计.条件控制和循环等,做个整体的总结和复习. 游戏规则: 初始本金是1000元,默认赔率是1倍,赢了,获得一倍金额,输了,扣除1倍金额. 玩家选择下注,押大或押小: 输入下注金额: 摇3个骰子,11≤骰子总数≤18为大,3≤骰子总数≤10为小: 如果赢了,获得1倍金额,输了,扣除1倍金额,本金为0时,游戏结束. 程序运行结果是这样的: 现在,我们来梳理下思路. 我们

随机推荐