Java多线程之Disruptor入门
一、Disruptor简介
Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
二、浅聊Disruptor的核心
Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。
三、Disruptor使用
3.1 pom.xml
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.3</version> </dependency>
3.2 事件Event
Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。
public class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }
3.3 EventFactory
实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
3.4 EventHandler
实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event + ", sequence: " + sequence); } }
3.5 使用Disruptor原始API发布消息
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; /** * 定义一个生产者,往Disruptor中投递消息 */ public class LongEventProducer { private RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 定位到下一个可存放的位置 long sequence = ringBuffer.next(); try { // 拿到该位置的event LongEvent event = ringBuffer.get(sequence); // 设置event的值 event.set(byteBuffer.getLong(0)); } finally { // 发布 ringBuffer.publish(sequence); } } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import java.nio.ByteBuffer; import java.util.concurrent.Executors; public class TestMain { public static void main(String[] args) throws InterruptedException { // 定义event工厂 LongEventFactory factory = new LongEventFactory(); // ringBuffer长度 int bufferSize = 1024; // 构造一个Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory()); // 绑定handler disruptor.handleEventsWith(new LongEventHandler()); // 启动Disruptor disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0; true; i++) { byteBuffer.clear(); byteBuffer.putLong(i); // 投递消息 producer.onData(byteBuffer); Thread.sleep(1000); } } }
3.6 使用Translators发布消息
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; public class LongEventProducerUsingTranslator { private RingBuffer<LongEvent> ringBuffer; public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) { longEvent.set(byteBuffer.getLong(0)); } }; public void onData(ByteBuffer byteBuffer) { ringBuffer.publishEvent(TRANSLATOR, byteBuffer); } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import java.nio.ByteBuffer; /** * @author ZhangSheng * @date 2021-4-26 14:23 */ public class TestMain { public static void main(String[] args) throws InterruptedException { LongEventFactory factory = new LongEventFactory(); int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0L; true; i++) { byteBuffer.putLong(0, i); // 发布 producer.onData(byteBuffer); Thread.sleep(1000); } } }
到此这篇关于Java多线程之Disruptor入门的文章就介绍到这了,更多相关Java Disruptor入门内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!