A Detailed Look at Python's queue Module

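Queues are mostly used in multithreaded applications, where several threads access shared data. For that purpose the queue provided by this module is thread-safe. Its implementation shows that it uses one mutex (threading.Lock()) and three condition variables (threading.Condition()) to guarantee thread safety.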

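For background on the mutex and condition variables used by the queue, see the companion article on synchronization locks in Python threads.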

Basic usage looks like this:

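# Python 2 names the module Queue; Python 3 renamed it to queue.
import Queue
a = [1, 2, 3]
device_que = Queue.Queue()
device_que.put(a)          # enqueue the list as a single item
device = device_que.get()  # dequeue it again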

Start with its initializer, __init__(self, maxsize=0):

def __init__(self, maxsize=0):
 self.maxsize = maxsize
 self._init(maxsize)
 # mutex must be held whenever the queue is mutating. All methods
 # that acquire mutex must release it before returning. mutex
 # is shared between the three conditions, so acquiring and
 # releasing the conditions also acquires and releases mutex.
 self.mutex = _threading.Lock()
 # Notify not_empty whenever an item is added to the queue; a
 # thread waiting to get is notified then.
 self.not_empty = _threading.Condition(self.mutex)
 # Notify not_full whenever an item is removed from the queue;
 # a thread waiting to put is notified then.
 self.not_full = _threading.Condition(self.mutex)
 # Notify all_tasks_done whenever the number of unfinished tasks
 # drops to zero; thread waiting to join() is notified to resume
 self.all_tasks_done = _threading.Condition(self.mutex)
 self.unfinished_tasks = 0

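The constructor takes one parameter with a default value, maxsize. If no length is given, that is, maxsize=0, the queue is unbounded (any value <= 0 means unbounded). If a value greater than 0 is given, the queue holds at most maxsize items.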
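
The effect of maxsize can be seen in a short sketch. It assumes Python 3, where the module is named queue rather than Queue; the variable names are illustrative only.

```python
import queue  # Python 3 name; the article's Python 2 code uses "Queue"

bounded = queue.Queue(maxsize=2)   # holds at most two items
bounded.put_nowait("a")
bounded.put_nowait("b")

try:
    bounded.put_nowait("c")        # no free slot, so this raises queue.Full
    third_put_failed = False
except queue.Full:
    third_put_failed = True

unbounded = queue.Queue()          # maxsize=0: no upper bound
for i in range(1000):
    unbounded.put_nowait(i)        # never raises Full

print(third_put_failed, unbounded.qsize())
```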

self._init(maxsize): uses Python's built-in double-ended queue, collections.deque, to store the items.
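
As a rough illustration of why a deque suits FIFO storage, the sketch below performs the same two operations that the _put and _get hooks in the full listing use (append and popleft). The variable names are made up for the example.

```python
from collections import deque

storage = deque()
storage.append("first")      # the operation Queue._put performs
storage.append("second")
oldest = storage.popleft()   # the operation Queue._get performs

print(oldest, list(storage))
```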

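The self.mutex lock: any operation that reads the queue's state (empty(), qsize(), and so on) or modifies its contents (get, put, and so on) must hold this mutex. It has two operations: acquire takes the lock and release frees it. The mutex is also shared by the three condition variables, so calling acquire or release on any of the conditions acquires or releases this same mutex.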
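
The sharing of one lock between several conditions can be checked directly. This is a minimal sketch using only the threading module; the names mirror those in __init__ but the objects are standalone.

```python
import threading

mutex = threading.Lock()
not_empty = threading.Condition(mutex)
not_full = threading.Condition(mutex)

with not_empty:                       # acquiring the condition acquires mutex
    held_inside = mutex.locked()
    # not_full wraps the same lock, so a non-blocking acquire fails here
    second_acquire = not_full.acquire(blocking=False)
held_after = mutex.locked()           # leaving the block released mutex

print(held_inside, second_acquire, held_after)
```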

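The self.not_full condition variable: when a thread removes an item from the queue, it calls self.not_full.notify() to wake one thread that is blocked waiting to add an item. The woken thread then waits to re-acquire the mutex before it puts its item.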

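The self.not_empty condition variable: after a thread adds an item to the queue, it calls self.not_empty.notify() to wake one thread that is blocked in get(). The woken thread re-acquires the mutex and then reads from the queue.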

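The self.all_tasks_done condition variable: after a consumer thread gets a task from the queue and finishes processing it, it calls task_done(). Once every task that was put into the queue has been marked done, the threads blocked in queue.join() return, which means all tasks in the queue have been processed.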
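
To make the interplay of not_empty and not_full concrete, here is a deliberately simplified sketch. TinyBoundedBuffer is a hypothetical class written for this example, not part of the standard library; it keeps only blocking put and get and leaves out timeouts and task tracking.

```python
import threading
from collections import deque

class TinyBoundedBuffer:
    """Hypothetical, simplified stand-in for Queue: blocking put/get only."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:                       # acquires self.mutex
            while len(self.items) == self.maxsize:
                self.not_full.wait()              # releases mutex while waiting
            self.items.append(item)
            self.not_empty.notify()               # wake one waiting consumer

    def get(self):
        with self.not_empty:                      # same underlying mutex
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()                # wake one waiting producer
            return item

buf = TinyBoundedBuffer(maxsize=1)
received = []

def consumer():
    for _ in range(3):
        received.append(buf.get())

t = threading.Thread(target=consumer)
t.start()
for n in (1, 2, 3):
    buf.put(n)        # blocks whenever the single slot is occupied
t.join()
print(received)
```

The while loops around wait() matter: a woken thread must re-check the condition after re-acquiring the mutex, because another thread may have changed the buffer in the meantime.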

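The queue.put(self, item, block=True, timeout=None) function:

put first acquires the mutex (through self.not_full). If the queue is not full, it adds the item and calls self.not_empty.notify() to wake one thread blocked in get(), which then waits to acquire the mutex. If the queue is full, put waits on not_full. The mutex is released at the end. The blocking, non-blocking, and timeout branches are worth reading through yourself: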

def put(self, item, block=True, timeout=None):
 """Put an item into the queue. 

 If optional args 'block' is true and 'timeout' is None (the default),
 block if necessary until a free slot is available. If 'timeout' is
 a non-negative number, it blocks at most 'timeout' seconds and raises
 the Full exception if no free slot was available within that time.
 Otherwise ('block' is false), put an item on the queue if a free slot
 is immediately available, else raise the Full exception ('timeout'
 is ignored in that case).
 """
 self.not_full.acquire()
 try:
  if self.maxsize > 0:
   if not block:
    if self._qsize() == self.maxsize:
     raise Full
   elif timeout is None:
    while self._qsize() == self.maxsize:
     self.not_full.wait()
   elif timeout < 0:
    raise ValueError("'timeout' must be a non-negative number")
   else:
    endtime = _time() + timeout
    while self._qsize() == self.maxsize:
     remaining = endtime - _time()
     if remaining <= 0.0:
      raise Full
     self.not_full.wait(remaining)
  self._put(item)
  self.unfinished_tasks += 1
  self.not_empty.notify()
 finally:
  self.not_full.release()
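
The timeout branches of put can be exercised as follows. This is a sketch assuming Python 3's queue module; the 0.2 second timeout is an arbitrary value chosen for the example.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("occupies the only slot")

start = time.monotonic()
try:
    q.put("no room", timeout=0.2)   # block=True by default; waits up to 0.2 s
    timed_out = False
except queue.Full:
    timed_out = True
waited = time.monotonic() - start

negative_rejected = False
try:
    q.put("x", timeout=-1)          # a negative timeout is rejected
except ValueError:
    negative_rejected = True

print(timed_out, negative_rejected)
```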

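The queue.get(self, block=True, timeout=None) function:

get retrieves a task from the queue and removes it. It first acquires the mutex (through self.not_empty). If the queue is empty, it waits for a producer thread to add data. After taking an item, it calls self.not_full.notify() to tell a producer thread that there is room to add an item again. Finally it releases the mutex.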

def get(self, block=True, timeout=None):
 """Remove and return an item from the queue. 

 If optional args 'block' is true and 'timeout' is None (the default),
 block if necessary until an item is available. If 'timeout' is
 a non-negative number, it blocks at most 'timeout' seconds and raises
 the Empty exception if no item was available within that time.
 Otherwise ('block' is false), return an item if one is immediately
 available, else raise the Empty exception ('timeout' is ignored
 in that case).
 """
 self.not_empty.acquire()
 try:
  if not block:
   if not self._qsize():
    raise Empty
  elif timeout is None:
   while not self._qsize():
    self.not_empty.wait()
  elif timeout < 0:
   raise ValueError("'timeout' must be a non-negative number")
  else:
   endtime = _time() + timeout
   while not self._qsize():
    remaining = endtime - _time()
    if remaining <= 0.0:
     raise Empty
    self.not_empty.wait(remaining)
  item = self._get()
  self.not_full.notify()
  return item
 finally:
  self.not_empty.release()
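
A blocking get paired with a delayed producer might look like the sketch below (Python 3 queue module assumed; the sleep only simulates work and the item name is invented).

```python
import queue
import threading
import time

q = queue.Queue()

def producer():
    time.sleep(0.1)          # simulate work before the item is ready
    q.put("job-1")

threading.Thread(target=producer).start()
item = q.get()               # blocks until the producer calls put()

try:
    q.get(timeout=0.1)       # queue is empty again: raises Empty after the timeout
    got_second = True
except queue.Empty:
    got_second = False

print(item, got_second)
```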

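queue.put_nowait(): adds a task to the queue without blocking. When the queue is full it does not wait but raises the Full exception immediately. The key point is that it calls put with block=False: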

def put_nowait(self, item):
 """Put an item into the queue without blocking. 

 Only enqueue the item if a free slot is immediately available.
 Otherwise raise the Full exception.
 """
 return self.put(item, False)

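queue.get_nowait(): gets a task from the queue without blocking. When the queue is empty it does not wait but raises the Empty exception immediately. Again the key point is block=False: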

def get_nowait(self):
  """Remove and return an item from the queue without blocking. 

  Only get an item if one is immediately available. Otherwise
  raise the Empty exception.
  """
  return self.get(False)
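
A common use of the non-blocking calls is draining whatever is currently queued. The following is a small sketch under the assumption of Python 3's queue module.

```python
import queue

q = queue.Queue()
for n in range(3):
    q.put_nowait(n)

drained = []
while True:
    try:
        drained.append(q.get_nowait())   # same as q.get(False)
    except queue.Empty:
        break                            # nothing left right now

print(drained)
```

Catching Empty is safer than testing q.empty() first, since another thread could take the last item between the test and the get.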

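queue.qsize, empty, and full return the queue's length, whether it is empty, and whether it is full, respectively. As the docstrings warn, the results are not reliable, because another thread may change the queue right after the mutex is released: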

def qsize(self):
 """Return the approximate size of the queue (not reliable!)."""
 self.mutex.acquire()
 n = self._qsize()
 self.mutex.release()
 return n 

def empty(self):
 """Return True if the queue is empty, False otherwise (not reliable!)."""
 self.mutex.acquire()
 n = not self._qsize()
 self.mutex.release()
 return n 

def full(self):
 """Return True if the queue is full, False otherwise (not reliable!)."""
 self.mutex.acquire()
 n = 0 < self.maxsize == self._qsize()
 self.mutex.release()
 return n
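
In a single thread the three methods behave predictably, which makes their meaning easy to see. This sketch assumes Python 3's queue module.

```python
import queue

q = queue.Queue(maxsize=2)
before = (q.qsize(), q.empty(), q.full())
q.put(1)
q.put(2)
after = (q.qsize(), q.empty(), q.full())

# An unbounded queue (maxsize=0) never reports itself as full.
unbounded_full = queue.Queue().full()

print(before, after, unbounded_full)
```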

queue.join() blocks until all tasks in the queue have been processed. It must be used together with queue.task_done():

def task_done(self):
 """Indicate that a formerly enqueued task is complete. 

 Used by Queue consumer threads. For each get() used to fetch a task,
 a subsequent call to task_done() tells the queue that the processing
 on the task is complete. 

 If a join() is currently blocking, it will resume when all items
 have been processed (meaning that a task_done() call was received
 for every item that had been put() into the queue). 

 Raises a ValueError if called more times than there were items
 placed in the queue.
 """
 self.all_tasks_done.acquire()
 try:
  unfinished = self.unfinished_tasks - 1
  if unfinished <= 0:
   if unfinished < 0:
    raise ValueError('task_done() called too many times')
   self.all_tasks_done.notify_all()
  self.unfinished_tasks = unfinished
 finally:
  self.all_tasks_done.release() 

def join(self):
 """Blocks until all items in the Queue have been gotten and processed. 

 The count of unfinished tasks goes up whenever an item is added to the
 queue. The count goes down whenever a consumer thread calls task_done()
 to indicate the item was retrieved and all work on it is complete. 

 When the count of unfinished tasks drops to zero, join() unblocks.
 """
 self.all_tasks_done.acquire()
 try:
  while self.unfinished_tasks:
   self.all_tasks_done.wait()
 finally:
  self.all_tasks_done.release()
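
The pairing of join() and task_done() is easiest to follow in a small worker pool. This is a sketch assuming Python 3's queue module; using None as a stop sentinel is a convention chosen for this example, not something the module requires.

```python
import queue
import threading

tasks = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        n = tasks.get()
        if n is None:            # hypothetical sentinel telling the worker to exit
            tasks.task_done()
            break
        with results_lock:
            results.append(n * n)
        tasks.task_done()        # exactly one task_done() per get()

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for n in range(5):
    tasks.put(n)

tasks.join()                     # returns once all five items are marked done
finished_count = len(results)

for _ in threads:
    tasks.put(None)              # one sentinel per worker
for t in threads:
    t.join()
print(sorted(results))
```

Because each worker appends its result before calling task_done(), every result is already present when join() returns.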

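Besides the thread-safe FIFO (first in, first out) Queue, the Queue module also provides LifoQueue, a last-in, first-out queue in which the most recently added item is retrieved first, and PriorityQueue, a queue with priorities whose entries are typically tuples of the form (priority number, data). The entry with the lowest priority number is retrieved first.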

class PriorityQueue(Queue):
 '''Variant of Queue that retrieves open entries in priority order (lowest first).

 Entries are typically tuples of the form: (priority number, data).
 ''' 

 def _init(self, maxsize):
  self.queue = [] 

 def _qsize(self, len=len):
  return len(self.queue) 

 def _put(self, item, heappush=heapq.heappush):
  heappush(self.queue, item) 

 def _get(self, heappop=heapq.heappop):
  return heappop(self.queue) 

class LifoQueue(Queue):
 '''Variant of Queue that retrieves most recently added entries first.'''

 def _init(self, maxsize):
  self.queue = [] 

 def _qsize(self, len=len):
  return len(self.queue) 

 def _put(self, item):
  self.queue.append(item) 

 def _get(self):
  return self.queue.pop() 
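
The retrieval order of the two variants can be compared side by side. This sketch assumes Python 3's queue module; the task descriptions are invented.

```python
import queue

lifo = queue.LifoQueue()
for n in (1, 2, 3):
    lifo.put(n)
lifo_order = [lifo.get() for _ in range(3)]      # newest item comes out first

pq = queue.PriorityQueue()
pq.put((2, "write report"))
pq.put((1, "fix outage"))
pq.put((3, "tidy desk"))
pq_order = [pq.get()[1] for _ in range(3)]       # lowest priority number first

print(lifo_order, pq_order)
```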

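That completes the walkthrough of the queue module. The key point is to understand how the mutex and the condition variables work together to keep the queue thread-safe.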

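Below is the complete code of the Queue class (the module-level imports and the Empty and Full exception classes are not shown):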

class Queue:
 """Create a queue object with a given maximum size. 

 If maxsize is <= 0, the queue size is infinite.
 """
 def __init__(self, maxsize=0):
  self.maxsize = maxsize
  self._init(maxsize)
  # mutex must be held whenever the queue is mutating. All methods
  # that acquire mutex must release it before returning. mutex
  # is shared between the three conditions, so acquiring and
  # releasing the conditions also acquires and releases mutex.
  self.mutex = _threading.Lock()
  # Notify not_empty whenever an item is added to the queue; a
  # thread waiting to get is notified then.
  self.not_empty = _threading.Condition(self.mutex)
  # Notify not_full whenever an item is removed from the queue;
  # a thread waiting to put is notified then.
  self.not_full = _threading.Condition(self.mutex)
  # Notify all_tasks_done whenever the number of unfinished tasks
  # drops to zero; thread waiting to join() is notified to resume
  self.all_tasks_done = _threading.Condition(self.mutex)
  self.unfinished_tasks = 0 

 def task_done(self):
  """Indicate that a formerly enqueued task is complete. 

  Used by Queue consumer threads. For each get() used to fetch a task,
  a subsequent call to task_done() tells the queue that the processing
  on the task is complete. 

  If a join() is currently blocking, it will resume when all items
  have been processed (meaning that a task_done() call was received
  for every item that had been put() into the queue). 

  Raises a ValueError if called more times than there were items
  placed in the queue.
  """
  self.all_tasks_done.acquire()
  try:
   unfinished = self.unfinished_tasks - 1
   if unfinished <= 0:
    if unfinished < 0:
     raise ValueError('task_done() called too many times')
    self.all_tasks_done.notify_all()
   self.unfinished_tasks = unfinished
  finally:
   self.all_tasks_done.release() 

 def join(self):
  """Blocks until all items in the Queue have been gotten and processed. 

  The count of unfinished tasks goes up whenever an item is added to the
  queue. The count goes down whenever a consumer thread calls task_done()
  to indicate the item was retrieved and all work on it is complete. 

  When the count of unfinished tasks drops to zero, join() unblocks.
  """
  self.all_tasks_done.acquire()
  try:
   while self.unfinished_tasks:
    self.all_tasks_done.wait()
  finally:
   self.all_tasks_done.release() 

 def qsize(self):
  """Return the approximate size of the queue (not reliable!)."""
  self.mutex.acquire()
  n = self._qsize()
  self.mutex.release()
  return n 

 def empty(self):
  """Return True if the queue is empty, False otherwise (not reliable!)."""
  self.mutex.acquire()
  n = not self._qsize()
  self.mutex.release()
  return n 

 def full(self):
  """Return True if the queue is full, False otherwise (not reliable!)."""
  self.mutex.acquire()
  n = 0 < self.maxsize == self._qsize()
  self.mutex.release()
  return n 

 def put(self, item, block=True, timeout=None):
  """Put an item into the queue. 

  If optional args 'block' is true and 'timeout' is None (the default),
  block if necessary until a free slot is available. If 'timeout' is
  a non-negative number, it blocks at most 'timeout' seconds and raises
  the Full exception if no free slot was available within that time.
  Otherwise ('block' is false), put an item on the queue if a free slot
  is immediately available, else raise the Full exception ('timeout'
  is ignored in that case).
  """
  self.not_full.acquire()
  try:
   if self.maxsize > 0:
    if not block:
     if self._qsize() == self.maxsize:
      raise Full
    elif timeout is None:
     while self._qsize() == self.maxsize:
      self.not_full.wait()
    elif timeout < 0:
     raise ValueError("'timeout' must be a non-negative number")
    else:
     endtime = _time() + timeout
     while self._qsize() == self.maxsize:
      remaining = endtime - _time()
      if remaining <= 0.0:
       raise Full
      self.not_full.wait(remaining)
   self._put(item)
   self.unfinished_tasks += 1
   self.not_empty.notify()
  finally:
   self.not_full.release() 

 def put_nowait(self, item):
  """Put an item into the queue without blocking. 

  Only enqueue the item if a free slot is immediately available.
  Otherwise raise the Full exception.
  """
  return self.put(item, False) 

 def get(self, block=True, timeout=None):
  """Remove and return an item from the queue. 

  If optional args 'block' is true and 'timeout' is None (the default),
  block if necessary until an item is available. If 'timeout' is
  a non-negative number, it blocks at most 'timeout' seconds and raises
  the Empty exception if no item was available within that time.
  Otherwise ('block' is false), return an item if one is immediately
  available, else raise the Empty exception ('timeout' is ignored
  in that case).
  """
  self.not_empty.acquire()
  try:
   if not block:
    if not self._qsize():
     raise Empty
   elif timeout is None:
    while not self._qsize():
     self.not_empty.wait()
   elif timeout < 0:
    raise ValueError("'timeout' must be a non-negative number")
   else:
    endtime = _time() + timeout
    while not self._qsize():
     remaining = endtime - _time()
     if remaining <= 0.0:
      raise Empty
     self.not_empty.wait(remaining)
   item = self._get()
   self.not_full.notify()
   return item
  finally:
   self.not_empty.release() 

 def get_nowait(self):
  """Remove and return an item from the queue without blocking. 

  Only get an item if one is immediately available. Otherwise
  raise the Empty exception.
  """
  return self.get(False) 

 # Override these methods to implement other queue organizations
 # (e.g. stack or priority queue).
 # These will only be called with appropriate locks held 

 # Initialize the queue representation
 def _init(self, maxsize):
  self.queue = deque() 

 def _qsize(self, len=len):
  return len(self.queue) 

 # Put a new item in the queue
 def _put(self, item):
  self.queue.append(item) 

 # Get an item from the queue
 def _get(self):
  return self.queue.popleft()
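
The four hook methods at the end of the listing are the extension points. As a hedged illustration, the hypothetical subclass below (MaxFirstQueue is not part of the standard library) overrides them so that get() returns the largest number first. It assumes Python 3's queue module and numeric items only.

```python
import heapq
import queue

class MaxFirstQueue(queue.Queue):
    """Hypothetical variant: get() returns the largest number first."""

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        # Store the negated value so the heap root is the maximum.
        heapq.heappush(self.queue, -item)

    def _get(self):
        return -heapq.heappop(self.queue)

mq = MaxFirstQueue()
for n in (3, 10, 7):
    mq.put(n)
max_order = [mq.get() for _ in range(3)]
print(max_order)
```

Locking, blocking, and timeouts are all inherited from Queue, since the hooks are only called while the mutex is held.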

That is all for this article. I hope it helps with your study of the queue module.
