spring与disruptor集成的简单示例

disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor

BaseQueueHelper.java

/**
 * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
 *
 * 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
 *
 * @author xielongwang
 * @create 2018-01-18 下午3:49
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {

  /**
   * 记录所有的队列,系统退出时统一清理资源
   */
  private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor 对象
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * initQueue
   */
  private List<D> initQueue = new ArrayList<D>();

  /**
   * 队列大小
   *
   * @return 队列长度,必须是2的幂
   */
  protected abstract int getQueueSize();

  /**
   * 事件工厂
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();

  /**
   * 事件消费者
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();

  /**
   * 初始化
   */
  public void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();

    //初始化数据发布
    for (D data : initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
        @Override
        public void translateTo(E event, long sequence, D data) {
          event.setValue(data);
        }
      }, data);
    }

    //加入资源清理钩子
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }

  /**
   * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
   * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();

  /**
   * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
      @Override
      public void translateTo(E event, long sequence, D data) {
        event.setValue(data);
      }
    }, data);
  }

  /**
   * 关闭队列
   */
  public void shutdown() {
    disruptor.shutdown();
  }
}

EventFactory.java

/**
 * @author xielongwang
 * @create 2018-01-18 下午6:24
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

public class MyHandlerException implements ExceptionHandler {

  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

  /*
   * (non-Javadoc) 运行过程中发生时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    ex.printStackTrace();
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  }

  /*
   * (non-Javadoc) 启动时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }

  /*
   * (non-Javadoc) 关闭时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}

SeriesData.java (代表应用A发送给应用B的消息)

public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }

  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }

  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  @Override
  public String toString() {
    return "SeriesData{" +
        "deviceInfoStr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java

public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

SeriesDataEventHandler.java

public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;

  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
    }
    //业务处理
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java

@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;

  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }

  @Override
  protected com.lmax.disruptor.EventFactory eventFactory() {
    return new EventFactory();
  }

  @Override
  protected WorkHandler[] getHandler() {
    int size = seriesDataEventHandler.size();
    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
    return paramEventHandlers;
  }

  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

public abstract class ValueWrapper<T> {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }

  public T getValue() {
    return value;
  }

  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java

@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多实例几个消费者
public class DisruptorConfig {

  /**
   * smsParamEventHandler1
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler2
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler3
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler4
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler5
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}

测试

  //注入SeriesDataEventQueueHelper消息生产者
  @Autowired
  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
  public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
    long startTime1 = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
      logger.info("receiver data is empty !");
      return new DataResponseVo<String>(400, "failed");
    }
    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
    long startTime2 = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
    return new DataResponseVo<String>(200, "success");
  }

应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • spring与disruptor集成的简单示例

    disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor BaseQueueHelper.java /** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布. * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源

  • SpringCloudAlibaba整合Feign实现远程HTTP调用的简单示例

    目录 前言 环境 简单示例 Feign 的组成和支持的配置项 Feign 的组成 Feign 支持的配置项 Feign 的日志 Feign 的日志级别 自定义配置 Feign 的日志级别 全局配置 Feign 的日志级别 Feign 日志级别配置方式总结 项目源码 前言 Feign是Netflix开源的声明式HTTP客户端,致力于让编写http client更加简单,Feign可以通过声明接口自动构造请求的目标地址完成请求 环境 Spring Cloud Hoxton.SR9 + Spring

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • 使用Spring Boot集成FastDFS的示例代码

    这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包配置 我们使用Spring Boot最新版本1.5.9.jdk使用1.8.tomcat8.0. <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</artifactId> <version>

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • spring boot的拦截器简单使用示例代码

    1.spring boot拦截器默认有: HandlerInterceptorAdapter AbstractHandlerMapping UserRoleAuthorizationInterceptor LocaleChangeInterceptor ThemeChangeInterceptor 其中 LocaleChangeInterceptor 和 ThemeChangeInterceptor 比较常用. 2.实现自定义拦截器只需要3步: 1).创建我们自己的拦截器类并实现 Handler

  • Java Spring开发环境搭建及简单入门示例教程

    本文实例讲述了Java Spring开发环境搭建及简单入门示例.分享给大家供大家参考,具体如下: 前言 虽然之前用过Spring,但是今天试着去搭建依然遇到了困难,而且上网找教程,很多写的是在web里使用Spring MVC的示例,官方文档里的getting start一开始就讲原理去了(可能打开的方法不对).没办法,好不容易实验成功了,记下来免得自己以后麻烦. 添加依赖包 进入spring官网,切换到projects下点击 spring framework.官网上写的是以maven依赖的形式写

  • Spring Boot2.x集成JPA快速开发的示例代码

     什么是JPA 一种规范,并非ORM框架,也就是ORM上统一的规范 spring-boot-starter-data-jpa 是Spring Boot的项目,包含了spring-data-jpa和一些其他依赖用于Spring Boot项目 spring-data-jpa 是Spring Data的项目,就是本体,用于任何项目 解决 为了执行简单查询分页,编写太多重复代码 基于JPA的数据访问层的增强支持 用了之后可以做什么,为什么要用?如下代码解释 实体类 package com.example

  • Springboot WebFlux集成Spring Security实现JWT认证的示例

    1 简介 在之前的文章<Springboot集成Spring Security实现JWT认证>讲解了如何在传统的Web项目中整合Spring Security和JWT,今天我们讲解如何在响应式WebFlux项目中整合.二者大体是相同的,主要区别在于Reactive WebFlux与传统Web的区别. 2 项目整合 引入必要的依赖: <dependency> <groupId>org.springframework.boot</groupId> <art

  • Spring 5.0集成log4j2日志管理的示例代码

    在使用Spring框架的时候,我们可以很方便的配置log4j来进行日志管理. Spring 5.0发布一段时间了,最近将项目从Spring 4.3升级到Spring 5.0,Spring 4.3集成log4j所用的类org.springframework.web.util.Log4jConfigListener在Spring 5.0版本已经删除,而且log4j 1.x版已经不再更新.我们将log4j-1.x升级为log4j-2.x 先引入log4j 2的三个jar包 log4j-api-2.10

随机推荐