深入多线程之:深入生产者、消费者队列分析

上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。

我们使用数组来跟踪线程。

Thread[] _workers;

通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务。

每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程。例如:


代码如下:

public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

上次我们使用的是一个字符串来代表任务,这次我们使用Action委托,它的定义如下:

Public delegate void Action();

为了表示一系列的任务,我们使用Queue<T> 集合,例如:

Queue<Action> _itemQ = new Queue<Action>();

在我们调用生产(EnqueueItem)和消费(Consume)方法前,还是完整的看一看代码吧:


代码如下:

class PCQueue
    {
        readonly object _locker = new object();
        Thread[] _workers;
        Queue<Action> _itemQ = new Queue<Action>(); //保存任务的队列
        public PCQueue(int workerCount)
        {
            _workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Consume)).Start();
        }

public void Shutdown(bool waitForWorkers)
        {
           //为每一个线程插入一个null item,可以是每一个worker 退出
            foreach (Thread worker in _workers)
                EnqueueItem(null);

//等待所有的线程退出。
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待队列中的线程
            }
        }

void Consume()
        {
            while (true)
            {
                Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item = _itemQ.Dequeue();
                }

if (item == null) return; //退出的信号
                item();
            }
        }
    }

我们可以有一个退出策略,插入一个null item作为consumer退出的信号。如果我们想要快速的退出,可以使用一个独立的”cancel” 标记,因为我们支持多个consumers,所以我们必须为每一个consumer插入一个null item。

下面是Main方法。使用两个consumer线程,然后让这两个consumers执行10个委托。


代码如下:

public static void Main()
        {
            PCQueue q = new PCQueue(2);
            Console.WriteLine("Enqueuing 10 items...");

for (int i = 0; i < 10; i++)
            {
                int itemNumber = i;
                q.EnqueueItem(() =>
                    {
                        Thread.Sleep(1000); //模拟耗时的工作
                        Console.WriteLine(" Task " + itemNumber);
                    });
            }

q.Shutdown(true); //等待关闭
            Console.WriteLine();
            Console.WriteLine("Workers complete!");
        }

下面让我们细致的看一看EnqueueItem方法:


代码如下:

public void EnqueueItem(Action item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); //通知等待队列中的线程
            }
        }

因为我们的队列_itemQ被多线程环境使用,因此在对_itemQ进行读取的时候需要加锁lock.

因为我们插入了一个新的任务,我们必须修改阻塞条件,也就是调用pulse方法,来唤醒调用了wait方法的线程。

出于对效率的考虑,当插入一个Item的时候使用Pulse来代替PulseAll方法,因为大部分时候每一个Item只需要一个consumer来执行。如果你有一个冰淇淋,你不可能叫30个睡眠的孩子都起来吃它,同样,对于一个item,同时唤醒30个consumers一点好处都没有。

让我们再看看Consumer方法。

我们希望当没什么事情做的时候,线程阻塞就可以了,换句话说,队列中没有item的时候,线程就应该阻塞。因此我们的阻塞条件是_itemQ.Count ==0;


代码如下:

Action item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item = _itemQ.Dequeue();
                }

if (item == null) return; //退出的信号
                item();

while循环退出的时候也意味着_itemQ 至少有一个item。我们必须在释放锁之前调用你哦个dequeue方法来获取item,考虑下下面的代码:


代码如下:

lock (_locker)
                {
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                }
                //现在在这里可能被抢占,_itemQ可能被修改
                lock (_locker)
                {
                    item = _itemQ.Dequeue();
                }

在item被Dequeued后,我们就应该立即释放锁了,如果我们在执行task的时候,一直持有锁,则会没有必要的阻塞其他线程来获取任务。

Wait Timeouts

在调用Wait方法的时候可以传递一个毫秒或Timespan的时间来设置超时。如果Wait超时了,那么Wait方法就会返回false。

带有超时功能的Wait方法的主要步骤:

释放锁。
    阻塞 直到 pulsed 或者超时。
    重新获取锁。

超时就好像CLR 在超时到了的时候自动的调用了 pulse方法一样。

下面是使用超时的Wait的主要代码:

lock(_locker)

while(<阻塞条件>)

Monitor.Wait(_locker,<超时时间>);

Monitor.Wait 方法返回一个bool值来代表是调用了pulse还是已经超时了。

如果是true: 代表调用了pulse。

如果是false:代表超时了。

这对记录日志很有用。

(0)

相关推荐

  • 深入多线程之:深入生产者、消费者队列分析

    上次我们使用AutoResetEvent实现了一个生产/消费者队列.这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行. 我们使用数组来跟踪线程. Thread[] _workers; 通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务. 每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程.例如: 复制代码 代码如下: public PCQueue(int workerCoun

  • ruby线程实现生产者消费者问题示例(队列Queue实现线程同步)

    Ruby线程实现经典的生产者消费者问题,用ruby中的Queue类实现线程同步问题. 复制代码 代码如下: require "thread"  puts "ProAndCon" queue = Queue.new    #用队列Queue实现线程同步 producer = Thread.new do      10.times do |i|          sleep rand(i) # 让线程睡眠一段时间          queue << i   

  • Java多线程之Disruptor入门

    一.Disruptor简介 Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级).基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注.2011年,企业应用软件专家Martin Fowler专门撰写长文介绍.同年它还获得了Oracle官方的Duke大奖.目前,包括Apache Storm.Camel.Log4j 2在内的很多知名项

  • 浅谈生产者消费者模型(Linux系统下的两种实现方法)

    生产者消费者问题是同步问题中的一种常见情况,借用一下维基百科的话 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例.该问题描述了两个共享固定大小缓冲区的线程--即所谓的"生产者"和"消费者"--在实际运行时会发生的问题.生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程.与此同时,消费者也在缓冲区消耗这些数据.该问题的关键就

  • java并发学习之BlockingQueue实现生产者消费者详解

    1.介绍 阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. JDK7提供了以下7个阻塞队列: ArrayBlockingQueue :由数组结构组成的有界阻塞队列. LinkedBloc

  • 生产者消费者模型ThreadLocal原理及实例详解

    1.生产者消费者模型作用和示例如下: 1)通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率 ,这是生产者消费者模型最重要的作用 2)解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约 备注:对于生产者消费者模型的理解将在并发队列BlockingQueue章节进行说明,本章不做详细介绍. package threadLearning.productCustomerModel; /* wait/notify 机制

  • golang 并发编程之生产者消费者详解

    golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势 学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,来简单介绍一下 golang 的并发编程 go 并发语法 协程 go 协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会

  • 理解生产者消费者模型及在Python编程中的运用实例

    什么是生产者消费者模型 在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产 生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型.结构图如下: 生产者消费者模型的优点: 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,

  • 详解Python 模拟实现生产者消费者模式的实例

    详解Python 模拟实现生产者消费者模式的实例 散仙使用python3.4模拟实现的一个生产者与消费者的例子,用到的知识有线程,队列,循环等,源码如下: Python代码 import queue import time import threading import random q=queue.Queue(5) #生产者 def pr(): name=threading.current_thread().getName() print(name+"线程启动......") for

  • Queue 实现生产者消费者模型(实例讲解)

    Python中,队列是线程间最常用的交换数据的形式. Python Queue模块有三种队列及构造函数: 1.Python Queue模块的FIFO队列先进先出. class Queue.Queue(maxsize) 2.LIFO类似于堆,即先进后出. class Queue.LifoQueue(maxsize) 3.还有一种是优先级队列级别越低越先出来. class Queue.PriorityQueue(maxsize) 此包中的常用方法(q = Queue.Queue()): q.qsiz

随机推荐