java中并发Queue种类与各自API特点以及使用场景说明

一 先说下队列

队列是一种数据结构.它有两个基本操作:在队列尾部加入一个元素,和从队列头部移除一个元素(注意不要弄混队列的头部和尾部)

就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.

在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。

如果第一个线程集运行得比第二个慢,则第二个 线程集在等待结果时就会阻塞。如果第一个线程集运行得快,那么它将等待第二个线程集赶上来.

说白了,就是先进先出,线程安全!

java中并发队列都是在java.util.concurrent并发包下的,Queue接口与List、Set同一级别,都是继承了Collection接口,最近学习了java中的并发Queue的所有子类应用场景,这里记录分享一下:

1.1 这里可以先用wait与notify(脑忒fai) 模拟一下队列的增删数据,简单了解一下队列:

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 模拟队列增删数据
 * @author houzheng
 */
public class MyQueue {
    //元素集合
    private LinkedList<Object> list=new LinkedList<Object>();
    //计数器(同步),判断集合元素数量
    private AtomicInteger count=new AtomicInteger();
    //集合上限与下限,final必须指定初值
    private final int minSize=0;
    private final int maxSize;
    //构造器指定最大值
    public MyQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    //初始化对象,用于加锁,也可直接用this
    private Object lock=new Object();
    //put方法:往集合中添加元素,如果集合元素已满,则此线程阻塞,直到有空间再继续
    public void put(Object obj){
        synchronized (lock) {
            while(count.get()==this.maxSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();}
            }
            list.add(obj);
            //计数器加一
            count.incrementAndGet();
            System.out.println("放入元素:"+obj);
            //唤醒另一个线程,(处理极端情况:集合一开始就是空,此时take线程会一直等待)
            lock.notify();
        }
    }
    //take方法:从元素中取数据,如果集合为空,则线程阻塞,直到集合不为空再继续
    public Object take(){
        Object result=null;
        synchronized(lock){
            while(count.get()==this.minSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();}
            }
            //移除第一个
            result=list.removeFirst();
            //计数器减一
            count.decrementAndGet();
            System.out.println("拿走元素:"+result);
            //唤醒另一个线程,(处理极端情况:集合一开始就是满的,此时put线程会一直等待)
            lock.notify();
        }
        return result;
    }
    public int getSize(){
        return this.count.get();
    }
    public static void main(String[] args) {
        //创建集合容器
        MyQueue queue=new MyQueue(5);
        queue.put("1");
        queue.put("2");
        queue.put("3");
        queue.put("4");
        queue.put("5");
        System.out.println("当前容器长度为:"+queue.getSize());
        Thread t1=new Thread(()->{
            queue.put("6");
            queue.put("7");
        },"t1");
        Thread t2=new Thread(()->{
            Object take1 = queue.take();
            Object take2 = queue.take();
        },"t2");
        //测试极端情况,两秒钟后再执行另一个线程
        t1.start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.start();
    }
}

这里用线程通信的方式简单模拟了队列的进出,那么接下来就正式进入java的并发队列:

二 并发Queue

JDK中并发队列提供了两种实现,一种是高性能队列ConcurrentLinkedQueue,一种是阻塞队列BlockingQueue,两种都继承自Queue:

1ConcurrentLinkedQueue

这是一个使用于高并发场景的队列(额,各位看这块博客的小朋友,最好对线程基础比较熟悉再来看,当然我也在拼命学习啦,哈哈哈),主要是无锁的方式,他的想能要比BlockingQueue好

是基于链接节点的无界线程安全队列,先进先出,不允许有null元素,废话不多说,上demo:

这种queue比较简单,没什么好说的,和ArrayList一样用就可以,关键是BlockingQUeue

2BlockingQueue

blockingQueue主要有5中实现,我感觉都挺有意思的,其中几种还比较常用就都学习了下,这里都介绍下:

2.1ArrayBlockingQueue

@Test
public void test02() throws Exception{
    //必须指定队列长度
    ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2);
    abq.add("a");
    //add :添加元素,如果BlockingQueue可以容纳,则返回true,否则抛异常,支持添加集合
    System.out.println(abq.offer("b"));//容量如果不够,返回false
    //offer: 如果可能的话,添加元素,即如果BlockingQueue可以容纳,则返回true,否则返回false,支持设置超时时间
    //设置超时,如果超过时间就不添加,返回false,
    abq.offer("d", 2, TimeUnit.SECONDS);// 添加的元素,时长,单位
    //put 添加元素,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
    abq.put("d");//会一直等待
    //poll 取走头部元素,若不能立即取出,则可以等time参数规定的时间,取不到时返回null,支持设置超时时间
    abq.poll();
    abq.poll(2,TimeUnit.SECONDS);//两秒取不到返回null
    //take()  取走头部元素,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
    abq.take();
    //取出头部元素,但不删除
    abq.element();
    //drainTo()
    //一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
    List list=new ArrayList();
    abq.drainTo(list,2);//将队列中两个元素取到list中,取走后队列中就没有取走的元素
    System.out.println(list); //[a,b]
    System.out.println(abq);  //[]
}

2.2 LinkedBlockingQueue

@Test
public void test03(){
    LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定
    lbq.add("a");
    lbq.add("b");
    lbq.add("c");
    //API与ArrayBlockingQueue相同
    //是否包含
    System.out.println(lbq.contains("a"));
    //移除头部元素或者指定元素  remove("a")
    System.out.println(lbq.remove());
    //转数组
    Object[] array = lbq.toArray();
    //element 取出头部元素,但不删除
    System.out.println(lbq.element());
    System.out.println(lbq.element());
    System.out.println(lbq.element());
}

2.3 SynchronousQueue

public static void main(String[] args) {
    SynchronousQueue<String> sq=new SynchronousQueue<String>();
    // iterator() 永远返回空,因为里面没东西。
    // peek() 永远返回null
    /**
     * isEmpty()永远是true。
     * remainingCapacity() 永远是0。
     * remove()和removeAll() 永远是false。
     */
    new Thread(()->{
        try {
            //取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
            System.out.println(sq.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }).start();
    new Thread(()->{
        try {
            //offer() 往queue里放一个element后立即返回,
            //如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false
            //true ,上面take线程一直在等,
            ////下面刚offer进去就被拿走了,返回true,如果offer线程先执行,则返回false
            System.out.println(sq.offer("b"));

        } catch (Exception e) {
            e.printStackTrace();
        }

    }).start();
    new Thread(()->{
        try {
            //往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走
            sq.put("a");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}

2.4 PriorityBlockingQueue

@Test
public void test04() throws Exception{
    //队列里元素必须实现Comparable接口,用来决定优先级
    PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>();
    pbq.add("b");
    pbq.add("g");
    pbq.add("a");
    pbq.add("c");
    //获取的时候会根据优先级取元素,插入的时候不会排序,节省性能
    //System.out.println(pbq.take());//a,获取时会排序,按优先级获取
    System.out.println(pbq.toString());//如果前面没有取值,直接syso也不会排序
    Iterator<String> iterator = pbq.iterator();
    while(iterator.hasNext()){
        System.out.println(iterator.next());
    }
}
@Test
public void test05(){
    PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>();
    Person p2=new Person("姚振",20);
    Person p1=new Person("侯征",24);
    Person p3=new Person("何毅",18);
    Person p4=new Person("李世彪",22);
    pbq.add(p1);
    pbq.add(p2);
    pbq.add(p3);
    pbq.add(p4);
    System.out.println(pbq);//没有按优先级排序
    try {
        //只要take获取元素就会按照优先级排序,获取一次就全部排好序了,后面就会按优先级迭代
        pbq.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //按年龄排好了序
    for (Iterator iterator = pbq.iterator(); iterator.hasNext();) {
        Person person = (Person) iterator.next();
        System.out.println(person);
    }
}

2.5 最后说一下DelayQueue ,这里用个网上很经典的例子,网吧上网计时

网民实体queue中元素

//网民
public class Netizen implements Delayed {
    //身份证
private String ID;
//名字
private String name;
//上网截止时间
private long playTime;
//比较优先级,时间最短的优先
@Override
public int compareTo(Delayed o) {
    Netizen netizen=(Netizen) o;
    return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0;
}
public Netizen(String iD, String name, long playTime) {
    ID = iD;
    this.name = name;
    this.playTime = playTime;
}
//获取上网时长,即延时时长
@Override
public long getDelay(TimeUnit unit) {
    //上网截止时间减去现在当前时间=时长
    return this.playTime-System.currentTimeMillis();
}

网吧类:

//网吧
public class InternetBar implements Runnable {
    //网民队列,使用延时队列
private DelayQueue<Netizen> dq=new DelayQueue<Netizen>();
//上网
public void startPlay(String id,String name,Integer money){
    //截止时间= 钱数*时间+当前时间(1块钱1秒)
    Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis());
    System.out.println(name+"开始上网计费......");
    dq.add(netizen);
}
//时间到下机
public void endTime(Netizen netizen){
    System.out.println(netizen.getName()+"余额用完,下机");
}
@Override
public void run() {
    //线程,监控每个网民上网时长
    while(true){
        try {
            //除非时间到.否则会一直等待,直到取出这个元素为止
            Netizen netizen=dq.take();
            endTime(netizen);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public static void main(String[] args) {
    //新建一个网吧
    InternetBar internetBar=new InternetBar();
    //来了三个网民上网
    internetBar.startPlay("001","侯征",3);
    internetBar.startPlay("002","姚振",7);
    internetBar.startPlay("003","何毅",5);
        Thread t1=new Thread(internetBar);
        t1.start();
    }
}

这样就可以完美实现业务需求了

结果

,

这块东西比较深,还需要不断加强学习实践才行!!

Java中的Queue和自定义堆栈

Queue:单向

- 队列通常 FIFO (先进先出)

- 优先级队列和堆栈 LIFO (后进先出)

package com.bjsxt.others.que;
import java.util.ArrayDeque;
import java.util.Queue;
/**
 * 使用队列模拟银行存款业务
 * @author Administrator
 *
 */
public class Demo01 {

 /**
  * @param args
  */
 public static void main(String[] args) {
  Queue<Request> que =new ArrayDeque<Request>();
  //模拟排队情况
  for(int i=0;i<10;i++){
   final int num =i;
   que.offer(new Request(){
//应用匿名内部类对象只能访问 final 修饰的变量
    @Override
    public void deposit() {
     System.out.println("第"+num+"个人,办理存款业务,存款额度为:"+(Math.random()*10000));
    }

   });
  }
  dealWith(que);
 }
 //处理业务
 public static void dealWith(Queue<Request> que){
  Request req =null;
  while(null!=(req=que.poll())){
   req.deposit();
  }
 }
}
interface Request{
 //存款
 void deposit();
}

自定义堆栈

package com.bjsxt.others.que;
import java.util.ArrayDeque;
import java.util.Deque;
/**
 * 使用队列实现自定义堆栈
 * 1、弹
 * 2、压
 * 3、获取头
 * @author Administrator
 *
 * @param <E>
 */
public class MyStack<E> {
 //容器
 private Deque<E> container =new ArrayDeque<E>();
 //容量
 private int cap;
 public MyStack(int cap) {
  super();
  this.cap = cap;
 }

 //压栈
 public boolean push(E e){
  if(container.size()+1>cap){
   return false;
  }
  return container.offerLast(e);
 }
 //弹栈
 public E pop(){
  return container.pollLast();
 }
 //获取
 public E peek(){
  return container.peekLast();
 }

 public int size(){
  return this.container.size();
 }
}

package com.bjsxt.others.que;
//测试自定义堆栈
public class Demo02 {
 /**
  * @param args
  */
 public static void main(String[] args) {
  MyStack<String> backHistory =new MyStack<String>(3);
  backHistory.push("www.baidu.com");
  backHistory.push("www.google.com");
  backHistory.push("www.sina.com");
  backHistory.push("www.bjsxt.cn");
  System.out.println("大小:"+backHistory.size());

  //遍历
  String item=null;
  while(null!=(item=backHistory.pop())){
   System.out.println(item);
  }
 }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java 通过API操作GraphQL

    GraphQL可以通过Java的API来实现数据的查询,通过特定的SDL查询语句,获取特定的查询数据.相当于后端作为提供数据源的"数据库",前端根据定义的SDL语句查询需要的数据,将查询数据的控制权交给前端,提高后端接口的通用性和灵活性 引入依赖 <dependency> <groupId>com.graphql-java</groupId> <artifactId>graphql-java</artifactId> <

  • Java8新特性之Stream API详解

    一.前言 StreamAPI在Java8版本中使用,关注的是对数据的筛选.查找.存储等 它可以做的事情有:过滤.排序.映射.归约 二.使用流程 Stream实例化中间操作(过滤.排序.映射.规约)终止操作(匹配查找.归约.收集) 三.案例演示 public class EmployeeData { public static List<Employee> getEmployees(){ List<Employee> list = new ArrayList<>(); l

  • Java之api网关断言及过滤器案例讲解

    目录 一.什么是api网关? 二.常见的api网关 三.使用步骤 1.Spring Cloud Gateway 2.优缺点 3.传统的过滤器 4.使用gateway 4.1module 4.2添加pom依赖 4.3yaml配置 4.4主程序开启注解@EnableDiscoveryClient 四.执行流程 五.断言 5.1: 自定义断言 5.2: 过滤器 一.什么是api网关? 所谓的API网关,就是指后台系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一 路由服务,一些与业务本身功能

  • Java环境下高德地图Api的使用方式

    Java高德地图Api的使用 使用高德经纬度获取地址信息 一些准备用到的常量 /** * 高德地图请求秘钥 */ private static final String KEY = "密钥,可以去高德地图免费申请"; /** * 返回值类型 */ private static final String OUTPUT = "JSON"; /** * 根据地名获取高德经纬度Api */ private static final String GET_LNG_LAT_UR

  • Java使用elasticsearch基础API使用案例讲解

    1.依赖 我用的是 springboot 2.2.5.RELEASE 版本,这里只贴出主要依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> 2.配置+测试源码 这里根据elasticsearch服务端的地址进行

  • Java通过百度API实现图片车牌号识别

    本代码功能是通过调用百度API实现的,所有你需要去百度API官网申请下你的API Key 以及Secret Key才能使用它的功能哦! 拟采用百度AI实现该功能(http://ai.baidu.com/docs#/OCR-API/5116ac95) 根据百度的文档描述,初步明确需要的几个参数为: 1.应用的API Key 2.应用的Secret Key 3.access_token 4.图片数据 首先导入maven依赖 <dependency> <groupId>com.baidu

  • elasticSearch-api的具体操作步骤讲解

    使用步骤 1.环境准备 用的是windows版,自行下载 链接: 下载地址 2.针对索引操作 这里是kibana上操作的(也可以用postman操作): #创建索引,指定文档id PUT /test1/type1/1 { "name":"张三", "age":30 } #创建索引规则(类似数据库建表) PUT /test2 { "mappings": { "properties": { "name

  • Java API如何实现向Hive批量导入数据

    Java API实现向Hive批量导入数据 Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中. package com.enn.idcard; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; impor

  • java中并发Queue种类与各自API特点以及使用场景说明

    一 先说下队列 队列是一种数据结构.它有两个基本操作:在队列尾部加入一个元素,和从队列头部移除一个元素(注意不要弄混队列的头部和尾部) 就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞. 在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线程把中间结果取出并在将来修改它们.队列会自动平衡负载. 如果第一个线程集运行得比第二个慢,则第二个 线程集在等待

  • Java中队列Queue和Deque的区别与代码实例

    目录 一.Queue和Deque 二.api对比 三.代码实例 1.queue 2.deque 总结 一.Queue和Deque Queue以及Deque都是继承于Collection,Deque是Queue的子接口. Queue是FIFO的单向队列,Deque是双向队列. Queue有一个直接子类PriorityQueue,而Deque中直接子类有两个:LinkedList以及ArrayDeque. PriorityQueue的底层数据结构是数组,而无边界的形容,那么指明了PriorityQu

  • 浅谈Java中的Queue家族

    Queue接口 先看下Queue的继承关系和其中定义的方法: Queue继承自Collection,Collection继承自Iterable. Queue有三类主要的方法,我们用个表格来看一下他们的区别: 方法类型 方法名称 方法名称 区别 Insert add offer 两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常.offer在添加失败的时候会返回false.所以对那些有固定长度的Queue,优先使用offer方法.

  • 浅谈Java中BIO、NIO和AIO的区别和应用场景

    最近一直在准备面试,为了使自己的Java水平更上一个档次,拜读了李林峰老师的<Netty权威指南>,了解了Java关于IO的发展和最新的技术,真是受益匪浅,现在把我总结的关于BIO.NIO和AIO的区别和应用场景概述一遍. 在此之前,先弄清几个概念: 1.同步:使用同步IO时,Java自己处理IO读写. 2.异步:使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS,完成后OS通知Java处理(回调). 3.阻塞:使用阻塞IO时,Java调用会一直阻塞到读写完成

  • 详解Java中的锁Lock和synchronized

    一.Lock接口 1.Lock接口和synchronized内置锁 a)synchronized:Java提供的内置锁机制,Java中的每个对象都可以用作一个实现同步的锁(内置锁或者监视器Monitor),线程在进入同步代码块之前需要或者这把锁,在退出同步代码块会释放锁.而synchronized这种内置锁实际上是互斥的,即没把锁最多只能由一个线程持有. b)Lock接口:Lock接口提供了与synchronized相似的同步功能,和synchronized(隐式的获取和释放锁,主要体现在线程进

  • Java中notify和notifyAll的区别及何时使用

    提几个问题,从问题中去了解去学习: 他们之间有啥区别? 如果我使用notify(),将通知哪个线程? 我怎么知道有多少线程在等待,所以我可以使用notifyAll()? 如何调用notify()? 什么是这些线程等待被通知等? 我给点建议:建议使用jdk8里的lock包 java.util.concurrent.locks下的Condition 他可以支持唤醒指定的线程. 他只是一个接口 具体实现类是在AbstractQueuedSynchronizer 也就是AQS框架里的 你可以自己继承他

  • Java中四种引用详解

    目录 强引用 软引用 弱引用 虚引用 总结 java 中的 4 种引用方式,适用于不同的场景,重点需要理解虚引用,结合文字和代码 强引用 被强引用的对象,不会被垃圾回收器回收,JVM 宁愿抛出 OOM 也不会去回收被强引用的对象: M m = new M(); 软引用 当堆空间够用时,GC 不会对软引用的对象进行回收,当堆空间不足以分配新的空间时,触发 GC 就会对这部分对象进行回收,通常用在缓存等领域.将缓存对象使用软引用,空间不足的时候释放这部分空间,需要再次使用的时候,重新从 DB 中加载

  • Java中同步与并发用法分析

    本文较为详细的分析了Java中同步与并发的用法.分享给大家供大家参考.具体分析如下: 1.同步容器类包括两部分:vector和hashtable 另一类是同步包装类,由Collections.synchronizedXXX创建.同步容器对容器的所有状态进行串行访问,从而实现线程安全. 它们存在如下问题: a) 对于符合操作,需要额外的锁保护.比如迭代,缺少则添加等条件运算. b) toString,hashCode,equals都会间接的调用迭代,都需要注意并发.   2.java5.0中的并发

  • java中栈和队列的实现和API的用法(详解)

    在java中要实现栈和队列,需要用到java集合的相关知识,特别是Stack.LinkedList等相关集合类型. 一.栈的实现 栈的实现,有两个方法:一个是用java本身的集合类型Stack类型:另一个是借用LinkedList来间接实现Stack. 1.Stack实现 直接用Stack来实现非常方便,常用的api函数如下: boolean        isEmpty() // 判断当前栈是否为空 synchronized E        peek() //获得当前栈顶元素 synchro

  • 了解java中的Clojure如何抽象并发性和共享状态

    前言 在所有 Java 下一代语言中,Clojure 拥有最激进的并发性机制和功能.Groovy 和 Scala 都为并发性提供了改善的抽象和语法糖的一种组合,而 Clojure 坚持了它始终在 JVM 上提供独一无二的行为的强硬立场.在本期 Java 下一代 中,我将介绍 Clojure 中众多并发性选项的一部分.首先是为 Clojure 中易变的引用提供支撑的基础抽象:epochal 时间模型. Epochal 事件模型 或许 Clojure 与其他语言最显著的区别与易变的状态和值 密切相关

随机推荐