Python实现简单多线程任务队列

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):

def gradient_descent():
  # the gradient descent code
  plotly.write(X, Y)

一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。

一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。

那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。

from threading import Thread
import Queue
import time

class TaskQueue(Queue.Queue):

首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。

def __init__(self, num_workers=1):
  Queue.Queue.__init__(self)
  self.num_workers = num_workers
  self.start_workers()

初始化的时候,我们可以不用考虑工作线程的数量。

def add_task(self, task, *args, **kwargs):
  args = args or ()
  kwargs = kwargs or {}
  self.put((task, args, kwargs))

我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。

def start_workers(self):
  for i in range(self.num_workers):
    t = Thread(target=self.worker)
    t.daemon = True
    t.start()

我们为每个 worker 创建一个线程,然后在后台删除。

下面是 worker 函数的代码:

def worker(self):
  while True:
    tupl = self.get()
    item, args, kwargs = self.get()
    item(*args, **kwargs)
    self.task_done()

worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:

我们可以通过下面的代码测试:

def blokkah(*args, **kwargs):
  time.sleep(5)
  print “Blokkah mofo!”

q = TaskQueue(num_workers=5)

for item in range(1):
  q.add_task(blokkah)

q.join() # wait for all the tasks to finish.

print “All done!”

Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。

def gradient_descent():
  # the gradient descent code
  queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。

from threading import Thread
import Queue
import time

class TaskQueue(Queue.Queue):

def __init__(self, num_workers=1):
Queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()

def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))

def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()

def worker(self):
while True:
tupl = self.get()
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()

def tests():
def blokkah(*args, **kwargs):
time.sleep(5)
print "Blokkah mofo!"

q = TaskQueue(num_workers=5)

for item in range(10):
q.add_task(blokkah)

q.join() # block until all tasks are done
print "All done!"

if __name__ == "__main__":
tests()
(0)

相关推荐

  • python实现堆栈与队列的方法

    本文实例讲述了python实现堆栈与队列的方法.分享给大家供大家参考.具体分析如下: 1.python实现堆栈,可先将Stack类写入文件stack.py,在其它程序文件中使用from stack import Stack,然后就可以使用堆栈了. stack.py的程序: 复制代码 代码如下: class Stack():      def __init__(self,size):          self.size=size;          self.stack=[];         

  • python数据结构之二叉树的遍历实例

    遍历方案   从二叉树的递归定义可知,一棵非空的二叉树由根结点及左.右子树这三个基本部分组成.因此,在任一给定结点上,可以按某种次序执行三个操作:   1).访问结点本身(N)   2).遍历该结点的左子树(L)   3).遍历该结点的右子树(R) 有次序:   NLR.LNR.LRN 遍历的命名 根据访问结点操作发生位置命名:NLR:前序遍历(PreorderTraversal亦称(先序遍历))  --访问结点的操作发生在遍历其左右子树之前.LNR:中序遍历(InorderTraversal)

  • python数据结构之二叉树的建立实例

    先建立二叉树节点,有一个data数据域,left,right 两个指针域 复制代码 代码如下: # -*- coding: utf - 8 - *- class TreeNode(object): def __init__(self, left=0, right=0, data=0):        self.left = left        self.right = right        self.data = data 复制代码 代码如下: class BTree(object):

  • python二叉树遍历的实现方法

    复制代码 代码如下: #!/usr/bin/python# -*- coding: utf-8 -*- class TreeNode(object):    def __init__(self,data=0,left=0,right=0):        self.data = data        self.left = left        self.right = right class BTree(object):    def __init__(self,root=0):     

  • Python二叉搜索树与双向链表转换实现方法

    本文实例讲述了Python二叉搜索树与双向链表实现方法.分享给大家供大家参考,具体如下: # encoding=utf8 ''' 题目:输入一棵二叉搜索树,将该二叉搜索树转换成一个排序的双向链表. 要求不能创建任何新的结点,只能调整树中结点指针的指向. ''' class BinaryTreeNode(): def __init__(self, value, left = None, right = None): self.value = value self.left = left self.

  • Python算法之栈(stack)的实现

    本文以实例形式展示了Python算法中栈(stack)的实现,对于学习数据结构域算法有一定的参考借鉴价值.具体内容如下: 1.栈stack通常的操作: Stack() 建立一个空的栈对象 push() 把一个元素添加到栈的最顶层 pop() 删除栈最顶层的元素,并返回这个元素 peek()  返回最顶层的元素,并不删除它 isEmpty()  判断栈是否为空 size()  返回栈中元素的个数 2.简单案例以及操作结果: Stack Operation Stack Contents Return

  • Python3中多线程编程的队列运作示例

    Python3,开一个线程,间隔1秒把一个递增的数字写入队列,再开一个线程,从队列中取出数字并打印到终端 #! /usr/bin/env python3 import time import threading import queue # 一个线程,间隔一定的时间,把一个递增的数字写入队列 # 生产者 class Producer(threading.Thread): def __init__(self, work_queue): super().__init__() # 必须调用 self.

  • 栈和队列数据结构的基本概念及其相关的Python实现

    先来回顾一下栈和队列的基本概念: 相同点:从"数据结构"的角度看,它们都是线性结构,即数据元素之间的关系相同. 不同点:栈(Stack)是限定只能在表的一端进行插入和删除操作的线性表. 队列(Queue)是限定只能在表的一端进行插入和在另一端进行删除操作的线性表.它们是完全不同的数据类型.除了它们各自的基本操作集不同外,主要区别是对插入和删除操作的"限定". 栈必须按"后进先出"的规则进行操作:比如说,小学老师批改学生的作业,如果不打乱作业本的顺

  • Python实现栈的方法

    本文实例讲述了Python实现栈的方法.分享给大家供大家参考.具体实现方法如下: #!/usr/bin/env python #定义一个列表来模拟栈 stack = [] #进栈,调用列表的append()函数加到列表的末尾,strip()没有参数是去掉首尾的空格 def pushit(): stack.append(raw_input('Enter new string: ').strip()) #出栈,用到了pop()函数 def popit(): if len(stack) == 0: p

  • Python编程实现双链表,栈,队列及二叉树的方法示例

    本文实例讲述了Python编程实现双链表,栈,队列及二叉树的方法.分享给大家供大家参考,具体如下: 1.双链表 class Node(object): def __init__(self, value=None): self._prev = None self.data = value self._next = None def __str__(self): return "Node(%s)"%self.data class DoubleLinkedList(object): def

  • python双向链表实现实例代码

    示意图: python双向链表实现代码: 复制代码 代码如下: #!/usr/bin/python# -*- coding: utf-8 -*- class Node(object):    def __init__(self,val,p=0):        self.data = val        self.next = p        self.prev = p class LinkList(object):    def __init__(self):        self.he

随机推荐