浅谈Springboot整合RocketMQ使用心得

一、阿里云官网---帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class MqConfig {
  /**
   * 启动测试之前请替换如下 XXX 为您的配置
   */
  public static final String PUBLIC_TOPIC = "test";//公网测试
  public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
  public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";

  public static final String ACCESS_KEY = "123";
  public static final String SECRET_KEY = "123";
  public static final String TAG = "";
  public static final String THREAD_NUM = "25";//消费端线程数
  /**
   * ONSADDR 请根据不同Region进行配置
   * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
   * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
   */
  public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

ONSADDR 阿里云用 公有云生产,测试用公网

不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建TOPIC

2、生产者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
     init-method="start" destroy-method="shutdown">
    <property name="properties">
      <map>
        <entry key="ProducerId" value="" /> <!-- PID,请替换 -->
        <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,请替换 -->
        <entry key="SecretKey" value="" /> <!-- SECRET_KEY,请替换 -->
        <!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置
         公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
        <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
      </map>
    </property>
  </bean>
</beans>

启动方式1,在使用类的全局里设置:

//初始化生产者
  private ApplicationContext ctx;
  private ProducerBean producer;

  @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭
  private boolean producerConfigEnabled;

  @PostConstruct
  public void init(){
    if (true == producerConfigEnabled) {
      ctx = new ClassPathXmlApplicationContext("producer.xml");
      producer = (ProducerBean) ctx.getBean("producer");
    }
  }

PS:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullGC,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown

方式2:配置类(不需要xml)

@Configuration
public class ProducerBeanConfig {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  private ProducerBean producerBean;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  @Bean
  public ProducerBean oneProducer() {
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);

    producerBean.setProperties(properties);
    return producerBean;
  }
}

PS:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3

方式3:(不需要xml)

@Component
public class ProducerBeanSingleTon {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  private static Producer producer;

  private static class SingletonHolder {
    private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
  }

  private ProducerBeanSingleTon (){}

  public static final ProducerBeanSingleTon getInstance() {
    return SingletonHolder.INSTANCE;
  }

  @PostConstruct
  public void init(){
    // producer 实例配置初始化
    Properties properties = new Properties();
    //您在控制台创建的Producer ID
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    //设置发送超时时间,单位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
    // 设置 TCP 接入域名(此处以公共云生产环境为例)
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
    producer = ONSFactory.createProducer(properties);
    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
    producer.start();
  }

  public Producer getProducer(){
    return producer;
  }
}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

consumerConfig.enabled = true

producerConfig.enabled = true #方式1:

scheduling.enabled = false

#方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E
openservices.ons.producerBean.producerId = pid
openservices.ons.producerBean.accessKey =
openservices.ons.producerBean.secretKey = 

openservices.ons.producerBean.ONSAddr = 公网、杭州公有云生产

方式1投递消息代码:

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

方式2投递消息代码:(可以每发1000个启动/关闭一次)

   producerBean.start();
try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

   producerBean.shutdown();

方式3:投递消息

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;

   } else {
     logger.warn("DoubleElevenMidService.sendResult is null.........");
   }
   } catch (Exception e) {
     logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e);
     Thread.sleep(1000);//如果有异常,休眠1秒
   }

发送消息的代码一定要捕获异常,不然会重复发送。

这里的TOPIC用自己创建的,elevenMessage是要发送的内容,我这里是自己建的对象

3、消费者

配置启动类:

@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {

  private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());

  @Bean
  public Consumer consumerFactory(){//不同消费者 这里不能重名
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
    //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new对应的监听器
    consumer.start();
    logger.info("ConsumerConfig start success.");

    return consumer;

  }
}

CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置

创建消息监听器类,消费消息:

@Component
public class MessageListener implements MessageListener {
  private Logger logger = LoggerFactory.getLogger("remind");

  protected static ElevenReposity elevenReposity;
  @Resource
  public void setElevenReposity(ElevenReposity elevenReposity){
    MessageListener .elevenReposity=elevenReposity;
  }

  @Override
  public Action consume(Message message, ConsumeContext consumeContext) {

    if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息 json转换报错
      try {

      byte[] body = message.getBody();
      String res = new String(body);

      //res 是生产者传过来的消息内容

        //业务代码

      }else{
        logger.warn("!");
      }

      } catch (Exception e) {
        logger.error("MessageListener.consume error:" + e.getMessage(), e);
      }

      logger.info("MessageListener.Receive message”);
      //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
      return Action.CommitMessage;
    }else{
      logger.warn();
      return Action.ReconsumeLater;
    }

  }

注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息

三、注意事项

1、发送的消息体 最大为256KB

2、消息最多存在3天

3、消费端默认线程数是20

4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s

5、本地测试或启动的时候,把ONSADDR换成公网,不然报错无法启动

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • springboot整合H2内存数据库实现单元测试与数据库无关性
  • Docker 部署 SpringBoot 项目整合 Redis 镜像做访问计数示例代码
  • Spring Boot2.0整合ES5实现文章内容搜索实战
  • 浅谈Spring Boot 整合ActiveMQ的过程
  • 详解Springboot整合Dubbo之代码集成和发布
  • springboot整合mybatis将sql打印到日志的实例详解
  • Springboot整合Dubbo教程之项目创建和环境搭建
  • springboot整合rabbitmq的示例代码
  • Spring + Spring Boot + MyBatis + MongoDB的整合教程
(0)

相关推荐

  • 浅谈Spring Boot 整合ActiveMQ的过程

    RabbitMQ是比较常用的AMQP实现,这篇文章是一个简单的Spring boot整合RabbitMQ的教程. 安装ActiveMQ服务器,(也可以不安装,如果不安装,会使用内存mq) 构建Spring boot项目,增加依赖项,只需要添加这一项即可 <!-- 添加acitivemq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • 详解Springboot整合Dubbo之代码集成和发布

    本文介绍了Springboot整合Dubbo之代码集成和发布,分享给大家,具体如下: 1. boot-dubbo-api相关 打开boot-dubbo-api项目,正在src/main/java下创建一个包,并创建你需要dubbo暴露的接口TestService.java,并创建一个实体类用于测试User.java.如下图所示: 创建文件和包结构 User.java package com.boot.domain; import lombok.Data; import java.io.Seria

  • Spring Boot2.0整合ES5实现文章内容搜索实战

    一.文章内容搜索思路 上一篇讲了在怎么在 Spring Boot 2.0 上整合 ES 5 ,这一篇聊聊具体实战.简单讲下如何实现文章.问答这些内容搜索的具体实现.实现思路很简单: 基于「短语匹配」并设置最小匹配权重值 哪来的短语,利用 IK 分词器分词 基于 Fiter 实现筛选 基于 Pageable 实现分页排序 这里直接调用搜索的话,容易搜出不尽人意的东西.因为内容搜索关注内容的连接性.所以这里处理方法比较 low ,希望多交流一起实现更好的搜索方法.就是通过分词得到很多短语,然后利用短

  • Docker 部署 SpringBoot 项目整合 Redis 镜像做访问计数示例代码

    最终效果如下 大概就几个步骤 1.安装 Docker CE 2.运行 Redis 镜像 3.Java 环境准备 4.项目准备 5.编写 Dockerfile 6.发布项目 7.测试服务 环境准备 系统:Ubuntu 17.04 x64 Docker 17.12.0-ce IP:45.32.31.101 一.安装 Docker CE 国内不建议使用:"脚本进行安装",会下载安装很慢,使用步骤 1 安装,看下面的链接:常规安装方式 1.常规安装方式 Ubuntu 17.04 x64 安装

  • Spring + Spring Boot + MyBatis + MongoDB的整合教程

    前言 我之前是学Spring MVC的,后面听同学说Spring Boot挺好用,极力推荐我学这个鬼.一开始,在网上找Spring Boot的学习资料,他们博文写得不是说不好,而是不太详细. 我就在想我要自己写一篇尽可能详细的文章出来,下面话不多说了,来一看看详细的介绍吧. 技术栈 Spring Spring Boot MyBatis MongoDB MySQL 设计模式 MVC 功能 注册(用户完成注册后是默认未激活的,程序有个定时器在检测没有激活的用户,然后发一次邮件提醒用户激活) 登录 发

  • springboot整合H2内存数据库实现单元测试与数据库无关性

    一.新建spring boot工程 新建工程的时候,需要加入JPA,H2依赖 二.工程结构 pom文件依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:s

  • springboot整合mybatis将sql打印到日志的实例详解

    在前台请求数据的时候,sql语句一直都是打印到控制台的,有一个想法就是想让它打印到日志里,该如何做呢? 见下面的mybatis配置文件: <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • Springboot整合Dubbo教程之项目创建和环境搭建

    本文介绍了Springboot整合Dubbo教程之项目创建和环境搭建,分享给大家,具体如下: 1. 使用IDEA新建一个Maven项目 新建项目 选择Maven后,点击next下一步 选择项目类型 配置项目的Maven坐标 设置项目名称和保存位置 修改项目的pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • 浅谈springboot项目中定时任务如何优雅退出

    在一个springboot项目中需要跑定时任务处理批数据时,突然有个Kill命令或者一个Ctrl+C的命令,此时我们需要当批数据处理完毕后才允许定时任务关闭,也就是当定时任务结束时才允许Kill命令生效. 启动类 启动类上我们获取到相应的上下文,捕捉相应命令.在这里插入代码片 @SpringBootApplication /**指定mapper对应包的路径*/ @MapperScan("com.youlanw.kz.dao") /**开启计划任务*/ @EnableScheduling

  • 浅谈springboot @Repository与@Mapper的区别

    目录 1.@Repository 2.@Mapper 3.区别 相同点: 不同点: 4.解决使用@mapper接口时,注入mapper爆红问题 今天在用springboot整合mybatis时,mapper接口上用的注解是以前学spring时用的@Repository注解,可一运行,就出现了错误. Error starting ApplicationContext. To display the conditions report re-run your application with 'de

  • 浅谈springboot 属性定义

    本文介绍了浅谈springboot 属性定义,分享给大家.具体如下: 简单属性自定义 一般属性可以定义在通用的配置文件application.properties里面 # 自定义属性 boot.userName = yuxi 如何获取呢? 按照spring的获取方式就可以了,很简单 @Value(value = "${boot.userName}") private String userName; 复杂属性自定义 在配置里配置属性 # 复杂属性 test.id=1 test.name

  • 浅谈SpringBoot处理url中的参数的注解

    1.介绍几种如何处理url中的参数的注解 @PathVaribale 获取url中的数据 @RequestParam 获取请求参数的值 @GetMapping 组合注解,是 @RequestMapping(method = RequestMethod.GET) 的缩写 (1)PathVaribale 获取url中的数据 看一个例子,如果我们需要获取Url=localhost:8080/hello/id中的id值,实现代码如下: @RestController public class Hello

  • 浅谈spring-boot的单元测试中,@Before不被执行的原因

    我们先来看下笔者的单元测试的依赖版本: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from reposi

  • 浅谈SpringBoot主流读取配置文件三种方式

    读取配置SpringBoot配置文件三种方式 一.利用Bean注解中的Value(${})注解 @Data @Component public class ApplicationProperty { @Value("${application.name}") private String name; } 该方式可以自动读取当前配置文件appliation.yml  或者application.properties中的配置值 区别在于读取yml文件时候支持中文编码,peoperties需

  • 浅谈springboot中tk.mapper代码生成器的用法说明

    问:什么是tk.mapper? 答:这是一个通用的mapper框架,相当于把mybatis的常用数据库操作方法封装了一下,它实现了jpa的规范,简单的查询更新和插入操作都可以直接使用其自带的方法,无需写额外的代码. 而且它还有根据实体的不为空的字段插入和更新的方法,这个是非常好用的哈. 而且它的集成非常简单和方便,下面我来演示下使用它怎么自动生成代码. pom中引入依赖,这里引入tk.mybatis.mapper的版本依赖是因为在mapper-spring-boot-starter的新版本中没有

  • 浅谈springboot内置tomcat和外部独立部署tomcat的区别

    前两天,我去面了个试,面试官问了我个问题,独立部署的tomcat跟springboot内置的tomcat有什么区别,为什么存在要禁掉springboot的tomcat然后将项目部署到独立的tomcat当中? 我就想,不都一个样?独立部署的tomcat可以配置优化?禁AJP,开多线程,开nio?而且springboot内置的tomcat多方便,部署上服务器写个java脚本运行即可.现在考虑下有什么条件能优于内置tomcat的. 1.tomcat的优化配置多线程?内置的也可以配置多线程 server

  • 浅谈SpringBoot项目打成war和jar的区别

    首先给大家来讲一个我们遇到的一个奇怪的问题: 1.我的一个springboot项目,用mvn install打包成jar,换一台有jdk的机器就直接可以用java -jar 项目名.jar的方式运行,没任何问题,为什么这里不需要tomcat也可以运行了? 2.然后我打包成war放进tomcat运行,发现端口号变成tomcat默认的8080(我在server.port中设置端口8090)项目名称也必须加上了. 也就是说我在原来的机器的IDEA中运行,项目接口地址为 ip:8090/listall,

随机推荐