Java常见的阻塞队列总结

Java阻塞队列

阻塞队列和普通队列主要区别在阻塞二字:

  • 阻塞添加:队列已满时,添加元素线程会阻塞,直到队列不满时才唤醒线程执行添加操作
  • 阻塞删除:队列元素为空时,删除元素线程会阻塞,直到队列不为空再执行删除操作

常见的阻塞队列有 LinkedBlockingQueue 和 ArrayBlockingQueue,其中它们都实现 BlockingQueue 接口,该接口定义了阻塞队列需实现的核心方法:

public interface BlockingQueue<E> extends Queue<E> {
	// 添加元素到队尾,成功返回true,队列满抛出异常 IllegalStateException
    boolean add(E e);
	// 添加元素到队尾,成功返回 true,队列满返回 false
    boolean offer(E e);
	// 阻塞添加
    void put(E e) throws InterruptedException;
	// 阻塞添加,包含最大等待时长
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
	// 阻塞移除队顶元素
    E take() throws InterruptedException;
	// 阻塞移除队顶元素,包含最大等待时长
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
	// 返回可以添加到队列不阻塞的最大数量
    int remainingCapacity();
	// 如果存在元素则删除,成功返回 true,失败返回 false
    boolean remove(Object o);
	// 是否包含某元素
    public boolean contains(Object o);
    // 批量移除元素并添加入指定集合
    int drainTo(Collection<? super E> c);
	// 批量移除包含最大数量
    int drainTo(Collection<? super E> c, int maxElements);
}

除了上面的方法,还有三个继承自 Queue 接口的方法常常被用到:

// 获取队列头元素,不删除,没有抛出异常 NoSuchElementException
E element();
// 获取队列头元素,不删除,没有返回 null
E peek();
// 获取并移除队列头元素,没有返回 nul
E poll();

根据具体作用,方法可以被分为以下三类:

  • 添加元素类:add() 成功返回 true,失败抛异常、offer() 成功返回 true,失败返回 false,可以定义最大等待时长、put() 阻塞方法
  • 删除元素类:remove() 成功返回 true,失败返回 false、poll() 成功返回被移除元素,为空返回 null、take() 阻塞方法
  • 查询元素类:element() 成功返回元素,否则抛出异常、peek() 返回对应元素或 null

根据方法类型又可以分为阻塞和非阻塞,其中 put()、take() 是阻塞方法,带最大等待时长的 offer() 和 poll() 也是阻塞方法,其余都是非阻塞方法,阻塞队列基于上述方法实现

ArrayBlockingQueue 基于数组实现,满足队列先进先出特性,下面我们通过一段代码初步认识:

public class ArrayBlockingQueueTest {

    ArrayBlockingQueue<TestProduct> queue = new ArrayBlockingQueue<TestProduct>(1);

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        new Thread(test.new Product()).start();
        new Thread(test.new Customer()).start();
    }

    class Product implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    queue.put(new TestProduct());
                    System.out.println("生产者创建产品等待消费者消费");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Customer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000);
                    queue.take();
                    System.out.println("消费者消费产品等待生产者创建");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class TestProduct {
    }

}

上述代码比较简单,在一个容量为1的阻塞队列中,生产者和消费者由于容量限制依次阻塞运行。

ArrayBlockingQueue 基于 ReentrantLock 锁和 Condition 等待队列实现,因此存在公平和非公平的两种模式。公平场景下所有被阻塞的线程按照阻塞顺序执行,非公平场景下,队列中的线程和恰好准备进入队列的线程竞争,谁抢到就是谁的。默认使用非公平锁,因为效率更高:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

从代码可以看出,ArrayBlockingQueue 通过一个 ReentrantLock 锁以及两个 Condition 等待队列实现,它的属性如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	// 保存数据的数组
    final Object[] items;
	// 移除元素的索引
    int takeIndex;
	// 添加元素的索引
    int putIndex;
	// 元素数量
    int count;
	// 用于并发控制的锁
    final ReentrantLock lock;
	// 不为空,用于take()操作
    private final Condition notEmpty;
	// 不满,用于put()操作
    private final Condition notFull;
	// 迭代器
    transient Itrs itrs = null;
}

从代码可以看出,ArrayBlockingQueue 使用同一个锁、移除元素和添加元素通过数组下标的方式记录,分表表示队列头和队列尾。通过两个等待队列分别阻塞 take() 和 put() 方法,下面我们直接看源码:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
	// 检查是否为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	// 判断队列是否已满
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    // 赋值保存数据
    items[putIndex] = x;
    // 循环复用空间
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 唤醒take线程
    notEmpty.signal();
}

从代码可以看出:add() 方法基于 offer() 方法实现,offer() 方法添加失败返回 false 后,add() 方法抛出异常。offer() 方法会加锁,保证线程安全,队列没满时执行入队操作,入队操作通过操作数组实现,并且通过循环复用数组空间。元素添加成功后队列不为空,调用 signal() 方法唤醒移除元素的阻塞线程,最后我们看 put() 方法:

public void put(E e) throws InterruptedException {
	// 判断不为空
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 队列满就挂起在等待队列
	    while (count == items.length)
	        notFull.await();
	    enqueue(e);
	} finally {
	    lock.unlock();
	}
}

从代码可以看出,当队列满时,当前线程会被挂起到等待队列中,直到队列不满时被唤醒执行添加操作。下面我们看删除操作:

public boolean remove(Object o) {
	// 判断是否为 NULL
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 从移除下标开始遍历到添加新元素的下标
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                // 循环判断,移除下标可能大于添加下标(添加下标二次遍历时)
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
void removeAt(final int removeIndex) {
	final Object[] items = this.items;
	// 要删除的元素正好是移除下标
	if (removeIndex == takeIndex) {
	    items[takeIndex] = null;
	    // 循环删除
	    if (++takeIndex == items.length)
	        takeIndex = 0;
	    count--;
	    if (itrs != null)
	        itrs.elementDequeued();
	} else {
	    final int putIndex = this.putIndex;
	    // 如果不是移除下标,从该下标开始到添加下标,所有元素左移一位
	    for (int i = removeIndex;;) {
	        int next = i + 1;
	        if (next == items.length)
	            next = 0;
	        if (next != putIndex) {
	        	// 向左移除
	            items[i] = items[next];
	            i = next;
	        } else {
	        	// 最后put下标置为null
	            items[i] = null;
	            this.putIndex = i;
	            break;
	        }
	    }
	    count--;
	    // 更新迭代器
	    if (itrs != null)
	        itrs.removedAt(removeIndex);
	}
	notFull.signal();
}

remove() 和 poll()、take() 不同,它可以删除指定的元素。这里需要考虑删除的元素不是移除索引指向的情况,从代码可以看出,当要删除的元素不是移除索引指向的元素时,将所有从被删除元素下标开始到添加元素下标所有元素左移一位。

public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	    return (count == 0) ? null : dequeue();
	} finally {
	    lock.unlock();
	}
}
private E dequeue() {
	final Object[] items = this.items;
	E x = (E) items[takeIndex];
	items[takeIndex] = null;
	if (++takeIndex == items.length)
	    takeIndex = 0;
	count--;
	if (itrs != null)
	    itrs.elementDequeued();
	// 移除元素后唤醒put()添加线程
	notFull.signal();
	return x;
}

相比 remove() 方法,poll() 方法简单了很多,这里不做赘述,下面我们看 take():

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 队列为空就挂起
	    while (count == 0)
	        notEmpty.await();
	    return dequeue();
	} finally {
	    lock.unlock();
	}
}

take() 方法和 put() 方法可以说基本一致,相对也比较简单,最后我们来看看两个查询方法:

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	// 直接返回移除元素下标对应的元素,也就是队列头
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return (E) items[i];
}

element() 基于 peek() 方法实现实现、当队列为空时,peek() 方法返回 null,element() 抛出异常。关于 ArrayBlockingQueue 就介绍到这里

LinkedBlockingQueue 基于链表实现,它的属性如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	// 链表节点,存储元素
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
	// 链表容量
    private final int capacity;
	// 当前元素数量
    private final AtomicInteger count = new AtomicInteger();
	// 头节点
    transient Node<E> head;
    // 尾节点
    private transient Node<E> last;
	// 删除锁
    private final ReentrantLock takeLock = new ReentrantLock();
	// 不为空等待队列
    private final Condition notEmpty = takeLock.newCondition();
	// 添加锁
    private final ReentrantLock putLock = new ReentrantLock();
	// 不满等待队列
    private final Condition notFull = putLock.newCondition();
}

从代码可以看出,元素被封装为 Node 节点保存在单向链表中,其中链表默认长度为 Integer.MAX_VALUE,因此在使用时需注意内存溢出:当添加元素速度大于删除元素速度时,队列最终会记录到大量不会用到并且无法回收的对象,导致内存溢出。

ArrayBlockingQueue 和 LinkedBlockingQueue 的主要区别在于 ReentrantLock 锁的数量和等待队列,LinkedBlockingQueue 用到两个锁和两个等待队列,也就是说添加和删除操作可以并发执行,整体效率更高。下面我们直接看代码:

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
	// 元素为空抛出异常
	if (e == null) throw new NullPointerException();
	// 获取当前队列容量
	final AtomicInteger count = this.count;
	// 队列已满时直接返回false
	if (count.get() == capacity)
	    return false;
	int c = -1;
	Node<E> node = new Node<E>(e);
	// 获取添加锁
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	try {
		// 二次判断,因为上面判断时未加锁,数据可能已更新
	    if (count.get() < capacity) {
	    	// 入队操作
	        enqueue(node);
	        // 获取还未添加元素前,队列的容量
	        c = count.getAndIncrement();
	        if (c + 1 < capacity)
	        	// 唤醒其它添加元素的线程
	            notFull.signal();
	    }
	} finally {
	    putLock.unlock();
	}
	// 如果添加前队列没有数据,也就是说现在有一条数据时
	if (c == 0)
		// 唤醒take线程
	  	signalNotEmpty();
	return c >= 0;
}
private void enqueue(Node<E> node) {
     last = last.next = node;
}
private void signalNotEmpty() {
	// 唤醒take线程前必须获取对应take锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

这里有以下几点需要我们注意:

1.LinkedBlockingQueue count 属性必须通过并发类封装,因为可能存在添加、删除两个线程并发执行,需考虑同步

2.这里需要判断两次的主要原因在于方法开始时并没有加锁,数值可能改变,因此在获取到锁后需要二次判断

3.和 ArrayBlockingQueue 不同,LinkedBlockingQueue 在队列不满时会唤醒添加线程,这样做的原因是 LinkedBlockingQueue 中添加和删除操作使用不同的锁,各自只需管好自己,还可以提高吞吐量。而 ArrayBlockingQueue 使用唯一锁,这样做会导致移除线程永远不被唤醒或添加线程永远不被唤醒,吞吐量较低

4.添加元素前队列长度为0才唤醒移除线程,因为队列长度为0时,移除线程肯定已经挂起,此时唤醒一个移除线程即可。因为移除线程和添加线程类似,都会自己唤醒自己。而 c>0 时只会有两种情况:存在移除线程在运行,如果有会递归唤醒,无须我们参与、不存在移除线程运行,此时也无须我们参与,等待调用 take()、poll() 方法即可

5.唤醒只针对 put()、take() 方法阻塞的线程,offer() 方法直接返回(不包含最大等待时长),不参与唤醒场景

下面我们来看 put() 阻塞方法的实现:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
    	// 队列满时阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入队
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

从代码可以看出,put() 方法和 offer() 方法唯一区别在于自身通过 condition 阻塞挂起到等待队列,其余基本相同。至此关于添加操作介绍完毕,下面我们看移除方法:

public boolean remove(Object o) {
	if (o == null) return false;
	// 同时加两个锁
	fullyLock();
	try {
		// 循环查找
	    for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
	        if (o.equals(p.item)) {
	            unlink(p, trail);
	            return true;
	        }
	    }
	    return false;
	} finally {
	    fullyUnlock();
	}
}
void unlink(Node<E> p, Node<E> trail) {
	// p是要溢出的节点,trail是它的前驱节点
	// 方便gc
    p.item = null;
    // 引用取消
    trail.next = p.next;
    if (last == p)
        last = trail;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

从代码可以看出,remove() 方法只会在操作前容量不满时唤醒创建线程,并不会唤醒移除线程。并且由于我们不确定要删除元素的位置,因此此时需要加两个锁,确保数据安全。

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            // 获取移除前队列的元素数量
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 移除前如果队列是满的,唤醒添加线程
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
	Node<E> h = head;
	// 获取要删除的节点
	Node<E> first = h.next;
	// 清除原来的头结点(方便gc)
	h.next = h;
	// 设置新的头结点
	head = first;
	// 获取返回值
	E x = first.item;
	// 新头结点置为空
	first.item = null;
	return x;
}

需要注意的一点,每次出队时更换 head 节点,head 节点本身不保存数据,head.next 记录下次需要出队的元素,每次出队后 head.next 变为新的 head 节点返回并置为 null

poll() 方法和上面提到的 offer() 方法基本镜像相同,这里我再不做过多赘述

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    	// 队列为空就挂起
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take() 方法和 poll() 方法类似,区别在于新增了阻塞逻辑。至此关于溢出元素方法介绍完毕,最后我们看看查询方法源码:

public LinkedBlockingQueue(int capacity) {
   if (capacity <= 0) throw new IllegalArgumentException();
   this.capacity = capacity;
   last = head = new Node<E>(null);
}
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

从代码可以看出,默认 head 和 last 头尾节点都为 null,入队时直接从 next 开始操作,也就是说 head 节点不保存数据。

最后我们来看看有最大等待时长的 offer() 方法:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	// 将时间转换成纳秒
	long nanos = unit.toNanos(timeout);
	int c = -1;
	// 获取锁
	final ReentrantLock putLock = this.putLock;
	// 获取当前队列大小
	final AtomicInteger count = this.count;
	// 可中断锁
	putLock.lockInterruptibly();
	try {
	    while (count.get() == capacity) {
	    	// 小于0说明已到达最大等待时长
	        if (nanos <= 0)
	            return false;
	        // 如果队列已满,根据等待队列阻塞等待
	        nanos = notFull.awaitNanos(nanos);
	    }
	    // 队列没满直接入队
	    enqueue(new Node<E>(e));
	    c = count.getAndIncrement();
	    if (c + 1 < capacity)
	        notFull.signal();
	} finally {
	    putLock.unlock();
	}
	if (c == 0)
	    signalNotEmpty();
	return true;
}
 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
	// 将当前线程封装为 AQS Node 类加入等待队列
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);
    //计算过期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    // 当前线程没有唤醒进入同步队列时
    while (!isOnSyncQueue(node)) {
    	// 已经等待相应时间,删除当前节点,将状态设置为已关闭从队列删除
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        // 判断是否超时
        if (nanosTimeout >= spinForTimeoutThreshold)
        	// 挂起线程
            LockSupport.parkNanos(this, nanosTimeout);
        // 判断线程状态是否被中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 重新计算剩余等待时间
        nanosTimeout = deadline - System.nanoTime();
    }
    // 被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
    	// 清理等待队列中不为Condition状态的线程
        unlinkCancelledWaiters();
    // 判断是否被中断
    if (interruptMode != 0)
    	// 抛出异常或中断线程,独占模式抛出异常,共享模式中断线程
        reportInterruptAfterWait(interruptMode);
    // 返回时差,如果成功当前时间小于最大等待时长,返回值大于0,否则返回值小于0
    return deadline - System.nanoTime();
}

从代码可以看出,包含最大等待时长的 offer()、poll() 方法通过循环判断时间是否超时的方式挂起在等待队列,达到最大等待时长还未被唤醒或没被执行就返回

ArrayBlockingQueue 和 LinkedBlockingQueue 对比:

  • 大小不同,一个有界,一个无界。ArrayBlockingQueue 必须指定初始大小,LinkedBlockingQueue 无界时可能内存溢出
  • 一个采用数组,一个采用链表,数组保存无须创建新对象,链表需创建 Node 对象
  • 锁机制不同,ArrayBlockingQueue 添加删除操作使用同一个锁,两者操作不能并发执行。LinkedBlockingQueue 添加和删除使用不同锁,添加和删除操作可并发执行,整体效率 LinkedBlockingQueue 更高

到此这篇关于Java常见的阻塞队列总结的文章就介绍到这了,更多相关Java阻塞队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java 中 阻塞队列BlockingQueue详解及实例

    java 中 阻塞队列BlockingQueue详解及实例 BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  • Java中的阻塞队列详细介绍

    Java中的阻塞队列 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空. 当队列满时,存储元素的线程会等待队列可用. 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 2.Java里的阻塞队列 JDK中提供了七个阻塞队列: ArrayBlockingQueue :一个由数组结

  • java阻塞队列实现原理及实例解析

    这篇文章主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 阻塞队列与普通队列的不同在于.当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完

  • 深入理解Java线程编程中的阻塞队列容器

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Q

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • Java wait和notifyAll实现简单的阻塞队列

    wait,会使调用的线程进入等待状态,会释放所持有的对象锁(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException) notifyAll.notify,会去唤醒应当前对象而等待的线程,(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException) 顺便也记录一下join方法,调用join方法,会使当前线程进入等待,如果没有设置等待时间,就会等待另一个线程执行完成才返回(ps:调用join方法并不一定立刻执行另

  • Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java的阻塞队列,在实现时,使用到了lock和condition,下面是对其主要方法的介绍. 首先看一下,阻塞队列中使用到的锁. /** Main lock guarding all access **/ final ReentrantLock lock;​ /** Condition for waiting takes **/ private final Condition notEmpty;​ /** Condition for waiting puts **/ private final

  • 详解java中的阻塞队列

    阻塞队列简介 阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同: 其次是一个支持阻塞操作的队列,即: 当队列满时,会阻塞执行插入操作的线程,直到队列不满. 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空. 阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁: 而对于阻塞与唤醒机制则有与锁绑定的Condition实现 应用场景:生产者消费者模式 java中的阻塞队列 java中的阻塞队列根据容量可以分为有界队列和无界队

  • Java常见的阻塞队列总结

    Java阻塞队列 阻塞队列和普通队列主要区别在阻塞二字: 阻塞添加:队列已满时,添加元素线程会阻塞,直到队列不满时才唤醒线程执行添加操作 阻塞删除:队列元素为空时,删除元素线程会阻塞,直到队列不为空再执行删除操作 常见的阻塞队列有 LinkedBlockingQueue 和 ArrayBlockingQueue,其中它们都实现 BlockingQueue 接口,该接口定义了阻塞队列需实现的核心方法: public interface BlockingQueue<E> extends Queue

  • Java实现自定义阻塞队列

    今天重温了下 java 多线程中的 notify() 方法以及 wait() 方法,一时兴起,决定通过这俩个方法,实现一个简易的自定义阻塞队列. 阻塞队列是什么,与普通队列的区别是什么? 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来.

  • 利用Java手写阻塞队列的示例代码

    目录 前言 需求分析 阻塞队列实现原理 线程阻塞和唤醒 数组循环使用 代码实现 成员变量定义 构造函数 put函数 offer函数 add函数 take函数 重写toString函数 完整代码 总结 前言 在我们平时编程的时候一个很重要的工具就是容器,在本篇文章当中主要给大家介绍阻塞队列的原理,并且在了解原理之后自己动手实现一个低配版的阻塞队列. 需求分析 在前面的两篇文章ArrayDeque(JDK双端队列)源码深度剖析和深入剖析(JDK)ArrayQueue源码当中我们仔细介绍了队列的原理,

  • Java中使用阻塞队列控制线程集实例

    队列以一种先进先出的方式管理数据.如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期的把中间结果存到阻塞队列中.而其他工作者线程把中间结果取出并在将来修改它们.队列会自动平衡负载.如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞.如果第一个线程集运行的快,那么它将等待第二个线程集赶上来. 下面的程序展示了如何使用阻塞队列来控制线程集.程序在一个目录及它的所有子目

  • Java中常用阻塞队列的问题小结

    Java常用阻塞队列 ArrayBlockingQueue 内部由一个固定长度的数组来实现阻塞队列 /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; public ArrayBlockingQue

  • Java 阻塞队列和线程池原理分析

    目录 [1]阻塞队列 一.什么是阻塞队列? 二.阻塞队列有什么用? 三.阻塞队列的简单实用 [2]Java 线程池 一.我们为什么需要Java 线程池?使用它的好处是什么? 二.Java中主要提供了哪几种线程的线程池? 三.线程类的继承关系 四.ThreadPoolExecutor参数的含义 corePoolSize 五.线程池工作流程(机制) 六.关于两种提交方法的比较 [1]阻塞队列 一.什么是阻塞队列? ① 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满. ②

  • Java阻塞队列BlockingQueue详解

    目录 队列的类型 数据结构 阻塞队列 BlockingQueue 常见的阻塞队列 BlockingQueue API ArrayBlockingQueue 源码简解 生产者消费者模式 延迟队列 DelayQueue 队列的类型 无限队列(unbounded queue) 无容量限定,只随存储变化 有限队列(bounded queue) 定义了最大容量 向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量. 使用无限阻塞队列 BlockingQueue 设计

随机推荐