基于java中BlockingQueue的使用介绍
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些Java的终极粉丝,总是号称性能已经不必C++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等。索性就认真研究一番,他们给我的一项说明就是,在线程之间投递消息,用java已经封装好的BlockingQueue,就足够用了。
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
代码如下:
//默认包
import java.util.concurrent.*;
import base.MyRunnable;
public class Test
{
public static void main(String[] args)
{
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
java.lang.Runnable r = new MyRunnable(queue);
Thread t = new Thread(r);
t.start();
while(true)
{
try
{
while(true)
{
for(int i =0;i < 10000;i++)
{
queue.offer(i);
}
}
}
catch ( Exception e)
{
e.printStackTrace();
}
}
}
}
//需要添加的包
package base;
import java.lang.Runnable;
import java.util.concurrent.*;
import java.util.*;
public class MyRunnable implements Runnable
{
public MyRunnable(BlockingQueue<Integer> queue)
{
this.queue = queue;
}
public void run()
{
Date d = new Date();
long starttime = d.getTime();
System.err.println(starttime);
int count = 0;
while(true)
{
try
{
Integer i = this.queue.poll();
if(i != null)
{
count ++;
}
if(count == 100000)
{
Date e = new Date();
long endtime = e.getTime();
System.err.println(count);
System.err.println(endtime);
System.err.print(endtime - starttime);
break;
}
}
catch (Exception e)
{
}
}
}
private BlockingQueue<Integer> queue;
}
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下BlockingQueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
代码如下:
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
代码如下:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}