Java中使用阻塞队列控制线程集实例

队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。

java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。

生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。

我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。

注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。

代码如下:

import java.io.*; 
import java.util.*; 
import java.util.concurrent.*;

public class BlockingQueueTest 

   public static void main(String[] args) 
   { 
      Scanner in = new Scanner(System.in); 
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): "); 
      String directory = in.nextLine(); 
      System.out.print("Enter keyword (e.g. volatile): "); 
      String keyword = in.nextLine();

final int FILE_QUEUE_SIZE = 10; 
      final int SEARCH_THREADS = 100;

BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory)); 
      new Thread(enumerator).start(); 
      for (int i = 1; i <= SEARCH_THREADS; i++) 
         new Thread(new SearchTask(queue, keyword)).start(); 
   } 
}

/** 
 * This task enumerates all files in a directory and its subdirectories. 
 */
class FileEnumerationTask implements Runnable 

   /** 
    * Constructs a FileEnumerationTask. 
    * @param queue the blocking queue to which the enumerated files are added 
    * @param startingDirectory the directory in which to start the enumeration 
    */
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) 
   { 
      this.queue = queue; 
      this.startingDirectory = startingDirectory; 
   }

public void run() 
   { 
      try
      { 
         enumerate(startingDirectory); 
         queue.put(DUMMY); 
      } 
      catch (InterruptedException e) 
      { 
      } 
   }

/** 
    * Recursively enumerates all files in a given directory and its subdirectories 
    * @param directory the directory in which to start 
    */
   public void enumerate(File directory) throws InterruptedException 
   { 
      File[] files = directory.listFiles(); 
      for (File file : files) 
      { 
         if (file.isDirectory()) enumerate(file); 
         else queue.put(file); 
      } 
   }

public static File DUMMY = new File("");

private BlockingQueue<File> queue; 
   private File startingDirectory; 
}

/** 
 * This task searches files for a given keyword. 
 */
class SearchTask implements Runnable 

   /** 
    * Constructs a SearchTask. 
    * @param queue the queue from which to take files 
    * @param keyword the keyword to look for 
    */
   public SearchTask(BlockingQueue<File> queue, String keyword) 
   { 
      this.queue = queue; 
      this.keyword = keyword; 
   }

public void run() 
   { 
      try
      { 
         boolean done = false; 
         while (!done) 
         { 
            File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 
            else search(file);             
         } 
      } 
      catch (IOException e) 
      { 
         e.printStackTrace(); 
      } 
      catch (InterruptedException e) 
      { 
      }       
   }

/** 
    * Searches a file for a given keyword and prints all matching lines. 
    * @param file the file to search 
    */
   public void search(File file) throws IOException 
   { 
      Scanner in = new Scanner(new FileInputStream(file)); 
      int lineNumber = 0; 
      while (in.hasNextLine()) 
      { 
         lineNumber++; 
         String line = in.nextLine().trim(); 
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line); 
      } 
      in.close(); 
   }

private BlockingQueue<File> queue; 
   private String keyword; 
}

(0)

相关推荐

  • 深入理解Java线程编程中的阻塞队列容器

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Q

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • 详解Java阻塞队列(BlockingQueue)的实现原理

    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. BlockingQueue 的操作方法 BlockingQueue 具有 4 组不同的方法用于插入.移除以及对队列中的元素进行检查.如果

  • java 中 阻塞队列BlockingQueue详解及实例

    java 中 阻塞队列BlockingQueue详解及实例 BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • 剖析Java中阻塞队列的实现原理及应用场景

    我们平时使用的一些常见队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动

  • Java中的阻塞队列详细介绍

    Java中的阻塞队列 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空. 当队列满时,存储元素的线程会等待队列可用. 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 2.Java里的阻塞队列 JDK中提供了七个阻塞队列: ArrayBlockingQueue :一个由数组结

  • Java中使用阻塞队列控制线程集实例

    队列以一种先进先出的方式管理数据.如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期的把中间结果存到阻塞队列中.而其他工作者线程把中间结果取出并在将来修改它们.队列会自动平衡负载.如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞.如果第一个线程集运行的快,那么它将等待第二个线程集赶上来. 下面的程序展示了如何使用阻塞队列来控制线程集.程序在一个目录及它的所有子目

  • Java使用阻塞队列控制线程通信的方法实例详解

    本文实例讲述了Java使用阻塞队列控制线程通信的方法.分享给大家供大家参考,具体如下: 一 点睛 阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产.一个线程消费的场景: 负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值.队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费.同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入. BlockingQueue的核心方法:

  • 详解java中的阻塞队列

    阻塞队列简介 阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同: 其次是一个支持阻塞操作的队列,即: 当队列满时,会阻塞执行插入操作的线程,直到队列不满. 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空. 阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁: 而对于阻塞与唤醒机制则有与锁绑定的Condition实现 应用场景:生产者消费者模式 java中的阻塞队列 java中的阻塞队列根据容量可以分为有界队列和无界队

  • Java中常用阻塞队列的问题小结

    Java常用阻塞队列 ArrayBlockingQueue 内部由一个固定长度的数组来实现阻塞队列 /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; public ArrayBlockingQue

  • iOS应用程序中通过dispatch队列控制线程执行的方法

    GCD编程的核心就是dispatch队列,dispatch block的执行最终都会放进某个队列中去进行,它类似NSOperationQueue但更复杂也更强大,并且可以嵌套使用.所以说,结合block实现的GCD,把函数闭包(Closure)的特性发挥得淋漓尽致. dispatch队列的生成可以有这几种方式: 1. dispatch_queue_t queue = dispatch_queue_create("com.dispatch.serial", DISPATCH_QUEUE_

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • java阻塞队列实现原理及实例解析

    这篇文章主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 阻塞队列与普通队列的不同在于.当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完

  • java中常见的6种线程池示例详解

    之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的六种线程池如下 线程池名称 描述 FixedThreadPool 核心线程数与最大线程数相同 SingleThreadExecutor 一个线程的线程池 CachedThreadPool 核心线程为0,最大线程数为Integer. MAX_VALUE ScheduledThreadPool 指定核心线程数的定时

随机推荐