SpringBoot+RabbitMQ方式收发消息的实现示例

本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~

交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效。

祝有好收获,先赞后看,快乐无限。

本文代码:  

https://gitee.com/he-erduo/spring-boot-learning-demo

https://github.com/he-erduo/spring-boot-learning-demo

1. 环境配置

第一节我们先来搞一下环境的配置,上一篇中我们已经引入了自动配置的包,我们既然使用了自动配置的方式,那RabbitMQ的连接信息我们直接放在配置文件中就行了,就像我们需要用到JDBC连接的时候去配置一下DataSource一样。

如图所示,我们只需要指明一下连接的IP+端口号和用户名密码就行了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。

主要我们需要看一下手动确认消息的配置,需要配置成manual才是手动确认,日后还会有其他的配置项,眼下我们配置这一个就可以了。

接下来我们要配置一个Queue,上一篇中我们往一个名叫erduo的队列中发送消息,当时是我们手动定义的此队列,这里我们也需要手动配置,声明一个Bean就可以了。

@Configuration
public class RabbitmqConfig {
  @Bean
  public Queue erduo() {
    // 其三个参数:durable exclusive autoDelete
    // 一般只设置一下持久化即可
    return new Queue("erduo",true);
  }
} 

就这么简单声明一下就可以了,当然了RabbitMQ毕竟是一个独立的组件,如果你在RabbitMQ中通过其他方式已经创建过一个名叫erduo的队列了,你这里也可以不声明,这里起到的一个效果就是如果你没有这个队列,会按照你声明的方式帮你创建这个队列。

配置完环境之后,我们就可以以SpringBoot的方式来编写生产者和消费者了。

2. 生产者与RabbitTemplate

和上一篇的节奏一样,我们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate。

听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒服,什么东西都给你封装一遍,让你用起来更方便更顺手。

RabbitTemplate实现了标准AmqpTemplate接口,功能大致可以分为发送消息和接受消息。

我们这里是在生产者中来用,主要就是使用它的发送消息功能:send和convertAndSend方法。

// 发送消息到默认的Exchange,使用默认的routing key
void send(Message message) throws AmqpException;
// 使用指定的routing key发送消息到默认的exchange
void send(String routingKey, Message message) throws AmqpException;
// 使用指定的routing key发送消息到指定的exchange
void send(String exchange, String routingKey, Message message) throws AmqpException; 

send方法是发送byte数组的数据的模式,这里代表消息内容的对象是Message对象,它的构造方法就是传入byte数组数据,所以我们需要把我们的数据转成byte数组然后构造成一个Message对象再进行发送。

// Object类型,可以传入POJO
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

convertAndSend方法是可以传入POJO对象作为参数,底层是有一个MessageConverter帮我们自动将数据转换成byte类型或String或序列化类型。

所以这里支持的传入对象也只有三种:byte类型,String类型和实现了Serializable接口的POJO。

介绍完了,我们可以看一下代码:

@Slf4j
@Component("rabbitProduce")
public class RabbitProduce {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  public void send() {
    String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();
    System.out.println("Message content : " + message);
    // 指定消息类型
    MessageProperties props = MessagePropertiesBuilder.newInstance()
        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
    rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
    System.out.println("消息发送完毕。");
  }
  public void convertAndSend() {
    User user = new User();
    System.out.println("Message content : " + user);
    rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);
    System.out.println("消息发送完毕。");
  }
} 

这里我特意写明了两个例子,一个用来测试send,另一个用来测试convertAndSend。

send方法里我们看下来和之前的代码是几乎一样的,定义一个消息,然后直接send,但是这个构造消息的构造方法可能比我们想的要多一个参数,我们原来说的只要把数据转成二进制数组放进去即可,现在看来还要多放一个参数了。

MessageProperties,是的我们需要多放一个MessageProperties对象,从他的名字我们也可以看出它的功能就是附带一些参数,但是某些参数是少不了的,不带不行。

比如我的代码这里就是设置了一下消息的类型,消息的类型有很多种可以是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的就是文本类型,指定类型是必须的,也可以为我们拿到消息之后要将消息转换成什么样的对象提供一个参考。

convertAndSend方法就要简单太多,这里我放了一个User对象拿来测试用,直接指定队列然后放入这个对象即可。

Tips:User必须实现Serializable接口,不然的话调用此方法的时候会抛出IllegalArgumentException异常。

代码完成之后我们就可以调用了,这里我写一个测试类进行调用:

@SpringBootTest
public class RabbitProduceTest {
  @Autowired
  private RabbitProduce rabbitProduce;
  @Test
  public void sendSimpleMessage() {
    rabbitProduce.send();
    rabbitProduce.convertAndSend();
  }
} 

效果如下图~

同时在控制台使用命令rabbitmqctl.bat list_queues查看队列-erduo现在的情况:

如此一来,我们的生产者测试就算完成了,现在消息队列里两条消息了,而且消息类型肯定不一样,一个是我们设置的文本类型,一个是自动设置的序列化类型。

3. 消费者与RabbitListener

既然队列里面已经有消息了,接下来我们就要看我们该如何通过新的方式拿到消息并消费与确认了。

消费者这里我们要用到@RabbitListener来帮我们拿到指定队列消息,它的用法很简单也很复杂,我们可以先来说简单的方式,直接放到方法上,指定监听的队列就行了。

@Slf4j
@Component("rabbitConsumer")
public class RabbitConsumer {
  @RabbitListener(queues = Producer.QUEUE_NAME)
  public void onMessage(Message message, Channel channel) throws Exception {
    System.out.println("Message content : " + message);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("消息已确认");
  }
} 

这段代码就代表onMessage方法会处理erduo(Producer.QUEUE_NAME是常量字符串"erduo")队列中的消息。

我们可以看到这个方法里面有两个参数,Message和Channel,如果用不到Channel可以不写此参数,但是Message消息一定是要的,它代表了消息本身。

我们可以想想,我们的程序从RabbitMQ之中拉回一条条消息之后,要以怎么样的方式展示给我们呢?

没错,就是封装为一个个Message对象,这里面放入了一条消息的所有信息,数据结构是什么样一会我一run你就能看到了。

同时这里我们使用Channel做一个消息确认的操作,这里的DeliveryTag代表的是这个消息在队列中的序号,这个信息存放在MessageProperties中。

4. SpringBoot 启动!

编写完生产者和消费者,同时已经运行过生产者往消息队列里面放了两条信息,接下来我们可以直接启动消息,查看消费情况:

在我红色框线标记的地方可以看到,因为我们有了消费者所以项目启动后先和RabbitMQ建立了一个连接进行监听队列。

随后就开始消费我们队列中的两条消息:

第一条信息是contentType=text/plain类型,所以直接就在控制台上打印出了具体内容。

第二条信息是contentType=application/x-java-serialized-object,在打印的时候只打印了一个内存地址+字节大小。

不管怎么说,数据我们是拿到了,也就是代表我们的消费是没有问题的,同时也都进行了消息确认操作,从数据上看,整个消息可以分为两部分:body和MessageProperties。

我们可以单独使用一个注解拿到这个body的内容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, Channel channel) throws Exception {
  System.out.println("Message content : " + body);
 } 

也可以单独使用一个注解拿到MessageProperties的headers属性,headers属性在截图里也可以看到,只不过是个空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception {
  System.out.println("Message content : " + body);
  System.out.println("Message headers : " + headers);
} 

这两个注解都算是扩展知识,我还是更喜欢直接拿到全部,全都要!!!

上面我们已经完成了消息的发送与消费,整个过程我们可以再次回想一下,一切都和我画的这张图上一样的轨迹:

只不过我们一直没有指定Exchage一直使用的默认路由,希望大家好好记住这张图。

5. @RabbitListener与@RabbitHandler

下面再来补一些知识点,有关@RabbitListener与@RabbitHandler。

@RabbitListener上面我们已经简单的进行了使用,稍微扩展一下它其实是可以监听多个队列的,就像这样:

@RabbitListener(queues = { "queue1", "queue2" })
public void onMessage(Message message, Channel channel) throws Exception {
  System.out.println("Message content : " + message);
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false)
   System.out.println("消息已确认");
} 

还有一些其他的特性如绑定之类的,这里不再赘述因为太硬编码了一般用不上。

下面来说说这节要主要讲的一个特性:@RabbitListener和@RabbitHandler的搭配使用。

前面我们没有提到,@RabbitListener注解其实是可以注解在类上的,这个注解在类上标志着这个类监听某个队列或某些队列。

这两个注解的搭配使用就要让@RabbitListener注解在类上,然后用@RabbitHandler注解在方法上,根据方法参数的不同自动识别并去消费,写个例子给大家看一看更直观一些。

@Slf4j
@Component("rabbitConsumer")
@RabbitListener(queues = Producer.QUEUE_NAME)
public class RabbitConsumer {
  @RabbitHandler
  public void onMessage(@Payload String message){
    System.out.println("Message content : " + message);
  }
  @RabbitHandler
  public void onMessage(@Payload User user) {
    System.out.println("Message content : " + user);
  }
} 

大家可以看看这个例子,我们先用@RabbitListener监听erduo队列中的消息,然后使用@RabbitHandler注解了两个方法。

第一个方法的body类型是String类型,这就代表着这个方法只能处理文本类型的消息。  第二个方法的body类型是User类型,这就代表着这个方法只能处理序列化类型且为User类型的消息。

这两个方法正好对应着我们第二节中测试类会发送的两种消息,所以我们往RabbitMQ中发送两条测试消息,用来测试这段代码,看看效果:

都在控制台上如常打印了,如果@RabbitHandler注解的方法中没有一个的类型可以和你消息的类型对的上,比如消息都是byte数组类型,这里没有对应的方法去接收,系统就会在控制台不断的报错,如果你出现这个情况就证明你类型写的不正确。

假设你的erduo队列中会出现三种类型的消息:byte,文本和序列化,那你就必须要有对应的处理这三种消息的方法,不然消息发过来的时候就会因为无法正确转换而报错。

而且使用了@RabbitHandler注解之后就不能再和之前一样使用Message做接收类型。

@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
  System.out.println("Message content : " + message);
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  System.out.println("消息已确认");
} 

这样写的话会报类型转换异常的,所以二者选其一。

同时上文我的@RabbitHandler没有进行消息确认,大家可以自己试一下进行消息确认。

6. 消息的序列化转换

通过上文我们已经知道,能被自动转换的对象只有byte[]、String、java序列化对象(实现了Serializable接口的对象),但是并不是所有的Java对象都会去实现Serializable接口,而且序列化的过程中使用的是JDK自带的序列化方法,效率低下。

所以我们更普遍的做法是:使用Jackson先将数据转换成JSON格式发送给RabbitMQ,再接收消息的时候再用Jackson将数据反序列化出来。

这样做可以完美解决上面的痛点:消息对象既不必再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界应该是最好的了)。

默认的消息转换方案是消息转换顶层接口-MessageConverter的一个子类:SimpleMessageConverter,我们如果要换到另一个消息转换器只需要替换掉这个转换器就行了。

上图是MessageConverter结构树的结构树,可以看到除了SimpleMessageConverter之外还有一个Jackson2JsonMessageConverter,我们只需要将它定义为Bean,就可以直接使用这个转换器了。

@Bean
  public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter(jacksonObjectMapper);
  } 

这样就可以了,这里的jacksonObjectMapper可以不传入,但是默认的ObjectMapper方案对JDK8的时间日期序列化会不太友好,具体可以参考我的上一篇文章:从LocalDateTime序列化探讨全局一致性序列化,总的来说就是定义了自己的ObjectMapper。

同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的队列:

@Bean
public Queue erduoJson() {
  // 其三个参数:durable exclusive autoDelete
  // 一般只设置一下持久化即可
  return new Queue("erduo_json",true);
} 

如此之后就可以进行测试了,先是生产者代码:

public void sendObject() {
    Client client = new Client();
    System.out.println("Message content : " + client);
    rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);
    System.out.println("消息发送完毕。");
   } 

我又重新定义了一个Client对象,它和之前测试使用的User对象成员变量都是一样的,不一样的是它没有实现Serializable接口。

同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer,用于测试JSON序列化的相关消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json";

所以这段代码是将Client对象作为消息发送到"erduo_json"队列中去,随后我们在测试类中run一下进行一次发送。

紧着是消费者代码:

@Slf4j
@Component("rabbitJsonConsumer")
@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)
public class RabbitJsonConsumer {
  public static final String JSON_QUEUE = "erduo_json";
  @RabbitHandler
  public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception {
    System.out.println("Message content : " + client);
    System.out.println("Message headers : " + headers);
    channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    System.out.println("消息已确认");
  }
} 

有了上文的经验之后,这段代码理解起来也是很简单了吧,同时给出了上一节没写的如何在@RabbitHandler模式下进行消息签收。

我们直接来看看效果:

在打印的Headers里面,往后翻可以看到contentType=application/json,这个contentType是表明了消息的类型,这里正是说明我们新的消息转换器生效了,将所有消息都转换成了JSON类型。

后记

这两篇讲完了RabbitMQ的基本收发消息,包括手动配置和自动配置的两种方式,这些大家仔细研读之后应该会对RabbitMQ收发消息没什么疑问了~

不过我们一直以来发消息时都是使用默认的交换机,下篇将会讲述一下RabbitMQ的几种交换机类型,以及其使用方式。

到此这篇关于SpringBoot+RabbitMQ方式收发消息的实现示例的文章就介绍到这了,更多相关SpringBoot RabbitMQ 收发消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • c# rabbitmq 简单收发消息的示例代码

    发布消息:(生产者) /// <summary> /// 发送消息 /// </summary> /// <param name="queue">队列名</param> /// <param name="message">消息内容</param> private static void PublishInfo(string queue, string message) { try { var f

  • SpringBoot+Netty+WebSocket实现消息发送的示例代码

    一.导入Netty依赖 <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> 二.搭建websocket服务器 @Component public class WebSocketServer { /** * 主线程池 */

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot 整合 RocketMQ 收发消息的配置过程

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • SpringBoot+RabbitMQ+Redis实现商品秒杀的示例代码

    目录 业务分析 创建表 功能实现 1.用户校验 2.下单 3.减少库存 4.支付 总结 业务分析 一般而言,商品秒杀大概可以拆分成以下几步: 用户校验 校验是否多次抢单,保证每个商品每个用户只能秒杀一次 下单 订单信息进入消息队列,等待消费 减少库存 消费订单消息,减少商品库存,增加订单记录 付款 十五分钟内完成支付,修改支付状态 创建表 goods_info 商品库存表 列 说明 id 主键(uuid) goods_name 商品名称 goods_stock 商品库存 package com.

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • SpringBoot+WebSocket+Netty实现消息推送的示例代码

    上一篇文章讲了Netty的理论基础,这一篇讲一下Netty在项目中的应用场景之一:消息推送功能,可以满足给所有用户推送,也可以满足给指定某一个用户推送消息,创建的是SpringBoot项目,后台服务端使用Netty技术,前端页面使用WebSocket技术. 大概实现思路: 前端使用webSocket与服务端创建连接的时候,将用户ID传给服务端 服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中 如果需要给所有用户发送消息,直接执行channel组的writ

随机推荐