Java多线程之Disruptor入门

一、Disruptor简介

Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

二、浅聊Disruptor的核心


  

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

三、Disruptor使用

3.1 pom.xml

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.3</version>
        </dependency>

3.2 事件Event

Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。

public class LongEvent {
	private long value;

	public void set(long value) {
		this.value = value;
	}

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

3.3 EventFactory

实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
	public LongEvent newInstance() {
		return new LongEvent();
	}
}

3.4 EventHandler

实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
		System.out.println("Event: " + event + ", sequence: " + sequence);
	}
}

3.5 使用Disruptor原始API发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 * 定义一个生产者,往Disruptor中投递消息
 */
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 定位到下一个可存放的位置
        long sequence = ringBuffer.next();
        try {
            // 拿到该位置的event
            LongEvent event = ringBuffer.get(sequence);
            // 设置event的值
            event.set(byteBuffer.getLong(0));
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        // 定义event工厂
        LongEventFactory factory = new LongEventFactory();
        // ringBuffer长度
        int bufferSize = 1024;
        // 构造一个Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
        // 绑定handler
        disruptor.handleEventsWith(new LongEventHandler());

        // 启动Disruptor
        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (long i = 0; true; i++) {
            byteBuffer.clear();
            byteBuffer.putLong(i);
            // 投递消息
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

3.6 使用Translators发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducerUsingTranslator {
    private RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
            longEvent.set(byteBuffer.getLong(0));
        }
    };

    public void onData(ByteBuffer byteBuffer) {
        ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

/**
 * @author ZhangSheng
 * @date 2021-4-26 14:23
 */
public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new LongEventHandler());

        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);

        for (long i = 0L; true; i++) {
            byteBuffer.putLong(0, i);
            // 发布
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

到此这篇关于Java多线程之Disruptor入门的文章就介绍到这了,更多相关Java Disruptor入门内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java实战之敏感词过滤器

    一.导包 本文的敏感词过滤器用在SpringBoot项目中,因此,首先需要在pom.xml文件中导入如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.spring

  • JavaScript使用canvas绘制坐标和线

    本文实例为大家分享了JavaScript使用canvas绘制坐标和线的具体代码,供大家参考,具体内容如下 具体代码如下: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>在指定位置画多个点</title> <style> canvas{ border: 1px dashed gray; } &

  • java基础之NIO介绍及使用

    一.NIO java.nio全称java non-blocking IO,是指jdk1.4 及以上版本里提供的新api(New IO) ,为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络. 二.三大组件 NIO三大组件:Channel.Buffer.Selector 1.Channel 和Buffer Channel是一个对象,可以通过它读取和写入数据.拿 NIO 与原来的 I/O 做个比较,通道就像是流,而且他们面向缓冲区(Buffer)的

  • Java实战之用Spring开发条形码和验证码

    一.条形码 代码如下: import javax.swing.*; import java.awt.*; import java.awt.image.BufferedImage; import java.util.Random; public class Text extends JFrame { private static final int WIDTH=300;//窗口的宽度 private static final int HEIGHT=400;//窗口的高度 private stati

  • Java基础之List内元素的排序性能对比

    一.概述 在日常开发中,获取一批数据后,可能需要跟据一定规则对这批数据进行排序操作.在JAVA中,动态数组ArrayList经常被用来存储数据,因此如何高效对ArrayList中元素进行排序,形成符合条件的数据集是日常开发必须要考虑的问题.本文将分析常用ArrayList排序的几种方式,包括集合框架提供的Collections.sort方法.实现Comparable接口.以及JAVA 8 stream流中提供的排序方法,同时对比同一条件不同数据集大小的排序性能. 二.按条件排序几种方案及性能对比

  • spring与disruptor集成的简单示例

    disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor BaseQueueHelper.java /** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布. * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源

  • java sleep()和wait()的区别点总结

    1.区别说明 wait()是Object的方法,sleep()是Thread的方法. wait()必须采用同步方法,不需要sleep()方法. 线程在同步方法中执行sleep()方法,不释放monitor锁,wait()方法释放monitor锁. 短暂休眠后,sleep()方法会主动退出阻塞,而wait()方法需要在没有指定wait时间的情况下被其他线程中断才能退出阻塞. 2.实例 import java.text.SimpleDateFormat; import java.util.Date;

  • Java实战之吃货联盟订餐系统

    一.前言 刚刚学完java基础入门,主要学习Java基本环境搭建,定义变量.选择结构,循环结构.数组等.本阶段结束尝试接触此次吃货联盟订餐系统,第一次敲这么长的代码,后面多见识见识应该就好了,多加努力啊! 吃货联盟订餐系统包含的功能:我要订餐.查看餐袋.签收订单.删除订单.我要点赞.退出系统 二.基本业务分析 我要订餐 1.输入订餐人姓名 2.将所有的菜品进行展示 3.输入要订的菜品序号 4.输入要订的菜品份数 5.输入配送时间 6.输入配送地址 7.显示订餐信息 查看餐袋 1.直接展示所有的订

  • JavaScript控制台的更多功能

    概述 你可能在JavaScript项目中都用了console.log.这是一种查看变量的值或程序运行中发生的事情的便捷方法.但是JavaScriptconsole 对象还有许多其他的功能,可以在处理项目时提供帮助.本文将会介绍一些我的最爱,希望你在工作时记得使用它们! 请注意,此处的例子适用于在浏览器中运行的 JavaScript.这与在 Node.js中运行的 JavaScript 相似,但是在 Node.js中的行为可能略有不同. console.log 在进入其他选项之前,让我们先回顾一下

  • Java实战之城市多音字处理

    一.需求 对城市名称转化为拼音的时候,当遇到多音字城市的时候,转化拼音就不是我们想要的了. 使用  pinyin4j 无法直接解决这个问题. .网上有很多维护多音字信息的,觉得麻烦. 如: 长沙 ====>"zhangsha" 厦门===>"shamen" 重庆===>"zhongqing" 二.导入 jpinyin 版本自选 <!-- https://mvnrepository.com/artifact/com.gith

随机推荐