java线程阻塞中断与LockSupport使用介绍

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。
在介绍之前,先抛几个问题。
Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
LockSupport.park()和unpark(),与object.wait()和notify()的区别?
LockSupport.park(Object blocker)传递的blocker对象做什么用?
LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
public void interrupt() : 执行线程interrupt事件
public boolean isInterrupted() : 检查当前线程是否处于interrupt
public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()
理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:
 
最佳实践
IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。
Don't swallow interrupts (别吃掉Interrupt,一般是两种处理: 继续throw InterruptedException异常。 另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。


代码如下:

public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}

代码如下:

public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}

Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)


代码如下:

public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 发起中断
}<SPAN style="WHITE-SPACE: normal"> </SPAN>

代码如下:

public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 发起中断
}<SPAN style="WHITE-SPACE: normal"> </SPAN>

注册Interrupt处理事件(非正常用法)
一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。
来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。


代码如下:

interface InterruptAble { // 定义可中断的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
interruptor.interrupt();
}
// 执行业务代码
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}

代码如下:

interface InterruptAble { // 定义可中断的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
interruptor.interrupt();
}
// 执行业务代码
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}

代码说明,几个取巧的点:
位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。
位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。
位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。
使用:


代码如下:

class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 以前的Interrupt检查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先让Read执行3秒
Thread.sleep(3000);
// 发出interrupt中断
t.interrupt();
}

代码如下:

class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 以前的Interrupt检查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先让Read执行3秒
Thread.sleep(3000);
// 发出interrupt中断
t.interrupt();
}

jdk源码介绍:
1. sun提供的钩子可以查看System的相关代码, line : 1125


代码如下:

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});

代码如下:

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});

2. Thread.interrupt()


代码如下:

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回调钩子
return;
}
}
interrupt0();
}

代码如下:

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回调钩子
return;
}
}
interrupt0();
}

更多
更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
最后来解答一下之前的几个问题:
问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。


代码如下:

if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();

代码如下:

if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。
问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。
问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?
答:
1. 面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
2. 实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.
问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?
答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.


代码如下:

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker属性的值
}

代码如下:

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker属性的值
}

具体LockSupport的javadoc描述也比较清楚,可以看下:
 
问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:
 
相关测试代码


代码如下:

package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 尝试去park 自己线程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个LockSupport.unPark(),会有响应
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 尝试去park 自己线程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 读取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}

代码如下:

package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 尝试sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 尝试去park 自己线程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个LockSupport.unPark(),会有响应
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 尝试去park 自己线程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 启动读取线程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 读取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}

最后
发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

(0)

相关推荐

  • Java多线程基础——Lock类

    之前已经说道,JVM提供了synchronized关键字来实现对变量的同步访问以及用wait和notify来实现线程间通信.在jdk1.5以后,JAVA提供了Lock类来实现和synchronized一样的功能,并且还提供了Condition来显示线程间通信. Lock类是Java类来提供的功能,丰富的api使得Lock类的同步功能比synchronized的同步更强大.本文章的所有代码均在Lock类例子的代码 本文主要介绍一下内容: Lock类 Lock类其他功能 Condition类 Con

  • java集合框架 arrayblockingqueue应用分析

    Queue ------------ 1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 7.LinkedBlockingDeque, (

  • 详解Java多线程编程中互斥锁ReentrantLock类的用法

    0.关于互斥锁 所谓互斥锁, 指的是一次最多只能有一个线程持有的锁. 在jdk1.5之前, 我们通常使用synchronized机制控制多个线程对共享资源的访问. 而现在, Lock提供了比synchronized机制更广泛的锁定操作, Lock和synchronized机制的主要区别: synchronized机制提供了对与每个对象相关的隐式监视器锁的访问, 并强制所有锁获取和释放均要出现在一个块结构中, 当获取了多个锁时, 它们必须以相反的顺序释放. synchronized机制对锁的释放是

  • java线程并发blockingqueue类使用示例

    如果BlockingQueue是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有新的空间才会被唤醒继续操作. BlockingQueue提供的方法主要有: add(anObject): 把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则抛出IllegalStateException异常. offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue

  • Java多线程编程之读写锁ReadWriteLock用法实例

    读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可.如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁:如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁.总之,读的时候上读锁,写的时候上写锁! 三个线程读数据,三个线程写数据示例: 可以同时读,读的时候不能写,不能同时写,写的时候不能读. 读的时候上读锁,读完解锁:写的时候上写锁,写完解锁. 注意finally解锁. package com.ljq.test.th

  • Java并发编程之显示锁ReentrantLock和ReadWriteLock读写锁

    在Java5.0之前,只有synchronized(内置锁)和volatile. Java5.0后引入了显示锁ReentrantLock. ReentrantLock概况 ReentrantLock是可重入的锁,它不同于内置锁, 它在每次使用都需要显示的加锁和解锁, 而且提供了更高级的特性:公平锁, 定时锁, 有条件锁, 可轮询锁, 可中断锁. 可以有效避免死锁的活跃性问题.ReentrantLock实现了 Lock接口: 复制代码 代码如下: public interface Lock {  

  • java多线程并发中使用Lockers类将多线程共享资源锁定

    复制代码 代码如下: package com.yao; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.c

  • 深入Synchronized和java.util.concurrent.locks.Lock的区别详解

    主要相同点:Lock能完成Synchronized所实现的所有功能.主要不同点:Lock有比Synchronized更精确的线程予以和更好的性能.Synchronized会自动释放锁,但是Lock一定要求程序员手工释放,并且必须在finally从句中释放.synchronized 修饰方法时 表示同一个对象在不同的线程中 表现为同步队列如果实例化不同的对象 那么synchronized就不会出现同步效果了.1.对象的锁 所有对象都自动含有单一的锁. JVM负责跟踪对象被加锁的次数.如果一个对象被

  • 基于java中BlockingQueue的使用介绍

    最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些Java的终极粉丝,总是号称性能已经不必C++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等.索性就认真研究一番,他们给我的一项说明就是,在线程之间投递消息,用java已经封装好的BlockingQueue,就足够用了. 既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试: 复制代码 代码如下: //默认包 import java.util.concurrent.*; import base.MyRunna

  • java线程阻塞中断与LockSupport使用介绍

    上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现. 在介绍之前,先抛几个问题. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程需要关注

  • 线程阻塞唤醒工具 LockSupport使用详解

    目录 LockSupport 简介 回顾 synchronized 和 Lock LockSupport 和 synchronized 和 Lock 的阻塞方式对比 LockSupport 的使用 LockSupport 注意事项 许可证提前发放 许可证不会累计 LockSupport 底层实现 结语 LockSupport 简介 LockSupport 是 Java 并发编程中一个非常重要的组件,我们熟知的并发组件 Lock.线程池.CountDownLatch 等都是基于 AQS 实现的,而

  • Java线程阻塞方法sleep()与wait()的全面讲解

    一.前期基础知识储备 sleep()和wait()方法都是Java中造成线程阻塞的方法.感兴趣的读者可以参见笔者之前的文章<Java中什么方法导致线程阻塞>,里面详细讲述了为什么Java要造成线程阻塞和Java中造成线程阻塞的几种方法. (1)线程的生命周期 这是笔者在谷歌图片中找到的一张简单描述线程生命周期的图片,可以看到,一个线程正常的生命周期中会经历"创建""就绪""运行""阻塞""运行"

  • java线程池详解及代码介绍

    目录 一.线程池简介 二.四种常见的线程池详解 三.缓冲队列BlockingQueue和自定义线程池ThreadPoolExecutor 总结 一.线程池简介 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池,使用线程池可以很好的提高性能,线程池在系统启动时既创建大量空闲的线程,程序将一个任务传给线程池.线程池就会启动一条线程来执行这个任务,执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 线程池的工作机制 在线程池的编程模式下,任务是提交给整个

  • 深入Java线程中断的本质与编程原则的概述

    在历史上,Java试图提供过抢占式限制中断,但问题多多,例如前文介绍的已被废弃的Thread.stop.Thread.suspend和 Thread.resume等.另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率. 如今,Java的线程调度不提供抢占式中断,而采用协作式的中断.其实,协作式的中断,原理很简单,就是轮询某个表示中断的标记,我们在任何普通代码的中都可以实现. 例如下面的代码:    volatile bool isInter

  • Java线程中断的本质深入理解

    一.Java中断的现象 首先,看看Thread类里的几个方法: public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外). public boolean isInterrupted() 测试线程是否已经中断.线程的中断状态 不受该方法的影响. public void in

  • Java线程同步问题--哲学家就餐

    目录 1.场景 2.解决方案 方法一:限制吃饭的哲学家人数 方法二:找到一个左撇子哲学家 1.场景 有五位沉默的哲学家围坐在一张圆桌旁,他们一生都在吃东西和思考. 有五只筷子供他们使用,哲学家需要双手拿到一双筷子之后才能吃饭:吃完后会将筷子放下继续思考. 那么现在有一个问题,我们需要想出一种方案,如何保证哲学家们可以交替吃饭和思考,而不会被饿死. 上面这个问题是由Dijkstra提出的一个经典的线程同步问题. 2.解决方案 我们在开始想如何解决问题之前,可以先将这个场景通过代码还原,在程序中进行

  • java线程之死锁

    目录 一.什么是死锁 二.死锁产生的原因 三.死锁演示 1.synchronized 2.lock 四.如何查看死锁 1.使用jps命令找到运行程序的pid 2.jstack查看栈信息 一.什么是死锁 死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程. 二.死锁产生的原因 1.互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某

  • 简单聊一聊Java线程池ThreadPoolExecutor

    目录 简介 参数说明 如何创建线程池 拒绝策略 总结 简介 ThreadPoolExecutor是一个实现ExecutorService接口的线程池,ExecutorService是主要用来处理多线程任务的一个接口,通常比较简单是用法是由Executors工厂类去创建. 线程池主要解决了两个不同的问题: 在执行大量异步任务时,为了能够提高性能,通常会减少每个任务的调用开销. 提供了一系列多线程任务的管理方法,便于多任务执行时合理分配资源以及一些异常情况的处理.每个ThreadPoolExecut

  • 如何实现Java线程安全问题

    这篇文章主要介绍了如何实现Java线程安全问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 解决线程安全问题的第一种方案:使用同步代码块 格式: synchronized(锁对象) { 可能会出现线程安全问题的代码(访问了共享数据的代码) } 注意:代码块中的锁对象,可以是任意对象,但必须保证多个线程之间使用的是同一个 锁对象的作用是把同步代码块锁住,同一时间只能让一个线程在同步代码块中执行 package com.fgy.demo02; /

随机推荐