浅析Disruptor高性能线程消息传递并发框架

目录
  • 前言碎语
  • 核心概念?
    • 实践Disruptor
  • 文末结语

前言碎语

Disruptor是英国LMAX公司开源的高性能的线程间传递消息的并发框架,和jdk中的BlockingQueue非常类似,但是性能却是BlockingQueue不能比拟的,下面是官方给出的一分测试报告,可以直观的看出两者的性能区别:

Disruptor 项目地址:https://github.com/LMAX-Exchange/disruptor

核心概念?

这么性能炸裂的框架肯定要把玩一番,试用前,我们先了解下disruptor的主要的概念,然后结合楼主的weblog项目(之前使用的BlockingQueue),来实践下

RingBuffer:环形的缓冲区,消息事件信息的载体。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

Event:定义生产者和消费者之间进行交换的数据类型。

EventFactory:创建事件的工厂类接口,由用户实现,提供具体的事件

EventHandler:事件处理接口,由用户实现,用于处理事件。

目前为止,我们了解以上核心内容即可,更多的详情,可以移步wiki文档:https://github.com/LMAX-Exchange/disruptor

核心架构图:

实践Disruptor

改造boot-websocket-log项目,这是一个典型的生产者消费者模式的实例。然后将BlockingQueue替换成Disruptor,完成功能,有兴趣的可以对比下。

第一步,定义事件类型

/**
 * Created by kl on 2018/8/24.
 * Content :进程日志事件内容载体
 */
public class LoggerEvent {
    private LoggerMessage log;
    public LoggerMessage getLog() {
        return log;
    }
    public void setLog(LoggerMessage log) {
        this.log = log;
    }
}

第二步,定义事件工厂

/**
 * Created by kl on 2018/8/24.
 * Content :进程日志事件工厂类
 */
public class LoggerEventFactory implements EventFactory{
    @Override
    public LoggerEvent newInstance() {
        return new LoggerEvent();
    }
}

第三步,定义数据处理器

/**
 * Created by kl on 2018/8/24.
 * Content :进程日志事件处理器
 */
@Component
public class LoggerEventHandler implements EventHandler{
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    @Override
    public void onEvent(LoggerEvent stringEvent, long l, boolean b) {
        messagingTemplate.convertAndSend("/topic/pullLogger",stringEvent.getLog());
    }
}

第四步,创建Disruptor实操类,定义事件发布方法,发布事件

/**
 * Created by kl on 2018/8/24.
 * Content :Disruptor 环形队列
 */
@Component
public class LoggerDisruptorQueue {
    private Executor executor = Executors.newCachedThreadPool();
    // The factory for the event
    private LoggerEventFactory factory = new LoggerEventFactory();
    private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory();
    // Specify the size of the ring buffer, must be power of 2.
    private int bufferSize = 2 * 1024;
    // Construct the Disruptor
    private Disruptordisruptor = new Disruptor<>(factory, bufferSize, executor);;
    private DisruptorfileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);;
    private static  RingBufferringBuffer;
    private static  RingBufferfileLoggerEventRingBuffer;
    @Autowired
    LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) {
        disruptor.handleEventsWith(eventHandler);
        fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler);
        this.ringBuffer = disruptor.getRingBuffer();
        this.fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer();
        disruptor.start();
        fileLoggerEventDisruptor.start();
    }
    public static void publishEvent(LoggerMessage log) {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.setLog(log);  // Fill with data
        } finally {
            ringBuffer.publish(sequence);
        }
    }
    public static void publishEvent(String log) {
        if(fileLoggerEventRingBuffer == null) return;
        long sequence = fileLoggerEventRingBuffer.next();  // Grab the next sequence
        try {
            FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.setLog(log);  // Fill with data
        } finally {
            fileLoggerEventRingBuffer.publish(sequence);
        }
    }
}

文末结语

以上四步已经完成了Disruptor的使用,启动项目后就会不断的发布日志事件,处理器会将事件内容通过websocket传送到前端页面上展示,

boot-websocket-log项目地址:https://gitee.com/kailing/boot-websocket-log

Disruptor是高性能的进程内线程间的数据交换框架,特别适合日志类的处理。Disruptor也是从https://github.com/alipay/sofa-tracer了解到的,这是蚂蚁金服 团队开源的分布式链路追踪项目,其中日志处理部分就是使用了Disruptor。

以上就是浅析Disruptor高性能线程消息传递并发框架的详细内容,更多关于Disruptor线程消息传递并发框架的资料请关注我们其它相关文章!

(0)

相关推荐

  • 从实战角度详解Disruptor高性能队列

    目录 一.背景 二.Java内置队列 三.ArrayBlockingQueue的问题 1.加锁 a.关于锁和CAS b.锁 c.原子变量 2.伪共享 a.什么是共享 b.缓存行 c.什么是伪共享 四.Disruptor的设计方案 1.一个生产者 2.多个生产者 a.读数据 b.写数据 五.总结 六.性能 七.等待策略 生产者的等待策略 消费者的等待策略 八.Log4j 2应用场景 1.性能差异 一.背景 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的

  • Java多线程之Disruptor入门

    一.Disruptor简介 Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级).基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注.2011年,企业应用软件专家Martin Fowler专门撰写长文介绍.同年它还获得了Oracle官方的Duke大奖.目前,包括Apache Storm.Camel.Log4j 2在内的很多知名项

  • 从log4j2到Disruptor详解

    目录 log4j2异步日志简要回顾 Disruptor在log4j2中的应用 异步日志Disruptor启动 异步日志Disruptor写入 架构及流程 Disruptor为什么这么快? Log4j2为什么这么快? log4j2实现原理可查看://www.jb51.net/article/232602.htm 文章同样基于log4j-2.7版本,disruptor-3.3.6 相信看过log4j2的源码后大家应该明白为什么第二代日志性能会提升那么多,这其中最大的功臣莫过于Disruptor并发编

  • spring与disruptor集成的简单示例

    disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor BaseQueueHelper.java /** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布. * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源

  • 浅析Disruptor高性能线程消息传递并发框架

    目录 前言碎语 核心概念? 实践Disruptor 文末结语 前言碎语 Disruptor是英国LMAX公司开源的高性能的线程间传递消息的并发框架,和jdk中的BlockingQueue非常类似,但是性能却是BlockingQueue不能比拟的,下面是官方给出的一分测试报告,可以直观的看出两者的性能区别: Disruptor 项目地址:https://github.com/LMAX-Exchange/disruptor 核心概念? 这么性能炸裂的框架肯定要把玩一番,试用前,我们先了解下disru

  • 浅析java中常用的定时任务框架-单体

    目录 一.阅读收获 二.本章源码下载 三.Timer+TimerTask 四.ScheduledExecutorService 五.Spring Task 5.1 单线程串行执行-@Scheduled 5.2 多线程并发运行-@Scheduled+配置定时器的程池(推荐) 5.3 多线程并发执行-@Scheduled+@Async+配置异步线程池 5.4 @Scheduled参数解析 六.Quartz 6.1. 创建任务类 6.2. 配置任务描述和触发器 一.阅读收获 1. 了解常用的单体应用定

  • Java线程的并发工具类实现原理解析

    目录 一.fork/join 1. Fork-Join原理 2. 工作窃取 3. 代码实现 二.CountDownLatch 三.CyclicBarrier 四.Semaphore 五.Exchange 六.Callable.Future.FutureTask 在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应

  • 剖析Fork join并发框架工作窃取算法

    目录 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的介绍 使用Fork/Join框架 Fork/Join框架的异常处理 Fork/Join框架的实现原理 Fork/Join源码剖析与算法解析 与ThreadPool的区别 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,F

  • Java线程池并发执行多个任务方式

    目录 Java线程池并发执行多个任务 Java线程池的正确使用 1.Executors存在什么问题 2.Executors为什么存在缺陷 3. 线程池参数详解 4. 线程池正确打开方式 Java线程池并发执行多个任务 Java在语言层面提供了多线程的支持,线程池能够避免频繁的线程创建和销毁的开销,因此很多时候在项目当中我们是使用的线程池去完成多线程的任务. Java提供了Executors 框架提供了一些基础的组件能够轻松的完成多线程异步的操作,Executors提供了一系列的静态工厂方法能够获

  • 基于线程、并发的基本概念(详解)

    什么是线程? 提到"线程"总免不了要和"进程"做比较,而我认为在Java并发编程中混淆的不是"线程"和"进程"的区别,而是"任务(Task)".进程是表示资源分配的基本单位.而线程则是进程中执行运算的最小单位,即执行处理机调度的基本单位.关于"线程"和"进程"的区别耳熟能详,说来说去就一句话:通常来讲一个程序有一个进程,而一个进程可以有多个线程. 但是"任务

  • Golang WorkerPool线程池并发模式示例详解

    目录 正文 处理CVS文件记录 获取测试数据 线程池耗时差异 正文 Worker Pools 线程池是一种并发模式.该模式中维护了固定数量的多个工作器,这些工作器等待着管理者分配可并发执行的任务.该模式避免了短时间任务创建和销毁线程的代价. 在 golang 中,我们使用 goroutine 和 channel 来构建这种模式.工作器 worker 由一个 goroutine 定义,该 goroutine 通过 channel 获取数据. 处理CVS文件记录 接下来让我们通过一个例子,来进一步理

  • 浅析Tomcat使用线程池配置高并发连接

    目录 Tomcat使用线程池配置高并发连接 1:配置executor属性 2:配置Connector 一.Tomcat内存优化 1.JAVA_OPTS参数说明 二.Tomcat并发优化 1.Tomcat连接相关参数 1.参数说明 2.Tomcat中的配置示例 2.调整连接器connector的并发处理能力 1.参数说明 2.Tomcat中的配置示例 3.Tomcat缓存优化 1.参数说明 2.Tomcat中的配置示例 4.参考配置 1.旧有的配置 2.更改后的配置 Tomcat使用线程池配置高并

  • Java并发之线程池Executor框架的深入理解

    线程池 无限制的创建线程 若采用"为每个任务分配一个线程"的方式会存在一些缺陷,尤其是当需要创建大量线程时: 线程生命周期的开销非常高 资源消耗 稳定性 引入线程池 任务是一组逻辑工作单元,线程则是使任务异步执行的机制.当存在大量并发任务时,创建.销毁线程需要很大的开销,运用线程池可以大大减小开销. Executor框架 说明: Executor 执行器接口,该接口定义执行Runnable任务的方式. ExecutorService 该接口定义提供对Executor的服务. Sched

随机推荐