Spring Boot系列教程之死信队列详解

前言

在说死信队列之前,我们先介绍下为什么需要用死信队列。

如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可。

ack机制和requeue-rejected属性

我们还是基于上篇《Spring Boot系列——7步集成RabbitMQ》的demo代码来说。

在项目springboot-demo我们看到application.yaml文件部分配置内容如下

...

listener:
 type: simple
 simple:
  acknowledge-mode: auto
  concurrency: 5
  default-requeue-rejected: true
  max-concurrency: 100
...

其中

acknowledge-mode

该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto。

none意味着没有任何的应答会被发送。

manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。

auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。

default-requeue-rejected

该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true。

我一开始对于这个属性有个误解,我以为rejected是表示拒绝,所以将requeue-rejected连起来是拒绝重新放回队列,后来查了资料明白这个属性的功能才想起来rejected是个形容词,其表示的应该是被拒绝的消息

所以如果该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列。

下面我们看看acknowledge-mode参数和default-requeue-rejected参数使用不同的组合方式,RabbitMQ是如何处理消息的。

代码依然使用springboot-demo中的RabbitApplicationTests发送消息,使用Receiver类监听demo-queue队列的消息。

对于Receiver类添加了一行代码,该代码模拟抛出异常

@Component
public class Receiver {

 @RabbitListener(queues = "demo_queue")
 public void created(String message) {
  System.out.println("orignal message: " + message);
  int i = 1/0;
 }
}

acknowledge-mode=none, default-requeue-rejected=false

该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。通过在RabbitMQ管理页面也没有看到重新放回队列的消息

acknowledge-mode=none, default-requeue-rejected=true

同样该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。而且即使default-requeue-rejected配置为true因为没有确认所以也没有看到重新放回队列的消息

acknowledge-mode=manual, default-requeue-rejected=false

该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,个人理解是因为没有收到ack,所以消息又回到了队列中。

acknowledge-mode=manual, default-requeue-rejected=true

该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,所以消息被重新放入到队列中了,并且在控制台发现还抛出了异常(这块不是很清楚,default-requeue-rejected设置true和false带来的不同效果,有了解的麻烦下方留言指教)。

acknowledge-mode=auto, default-requeue-rejected=false

该配置采用自动确认,从结果来看,是自动确认了。

从控制台打印的结果可以看出Receiver方法执行了3次,分别是前面两条放回队列的消息以及这次发送的消息,所以3条消息都消费了。

同时因为default-requeue-rejected设置为false,所以即使消费抛出异常,也没有将消息放回队列。

acknowledge-mode=auto, default-requeue-rejected=true

该配置同样采用自动确认,从结果看出,没有抛出异常(这块也不是很理解),且因为default-requeue-rejected设置为true,所以消息重新回到队列。

综上罗列这么多情况只为说明有些情况下,如果消息消费出错,因为配置问题导致消息丢失了。这在很多情况下是要命的,比如用户支付的订单号,如果因为抛异常等原因直接丢失是很要命的。

所以,我们需要有一个确保机制,能够保证即使失败的消息也能保存下来,这时候死信队列就排上用场了。

死信队列

死信队列的整个设计思路是这样的

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

下面我们通过网上的一个简单的死信队列的实现看看如何使用死信队列。

@Bean("deadLetterExchange")
 public Exchange deadLetterExchange() {
  return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
 }

 @Bean("deadLetterQueue")
 public Queue deadLetterQueue() {
  Map<String, Object> args = new HashMap<>(2);
//  x-dead-letter-exchange 声明 死信交换机
  args.put("x-dead-letter-exchange", "DL_EXCHANGE");
//  x-dead-letter-routing-key 声明 死信路由键
  args.put("x-dead-letter-routing-key", "KEY_R");
  return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
 }

 @Bean("redirectQueue")
 public Queue redirectQueue() {
  return QueueBuilder.durable("REDIRECT_QUEUE").build();
 }

 /**
  * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
  *
  * @return the binding
  */
 @Bean
 public Binding deadLetterBinding() {
  return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

 }

 /**
  * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
  *
  * @return the binding
  */
 @Bean
 public Binding redirectBinding() {
  return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
 }

注意

声明了一个direct模式的exchange。

声明了一个死信队列deadLetterQueue,该队列配置了一些属性x-dead-letter-exchange表明死信交换机,x-dead-letter-routing-key表明死信路由键,因为是direct模式,所以需要设置这个路由键。

声明了一个替补队列redirectQueue,变成死信的消息最终就是存放在这个队列的。

声明绑定关系,分别是死信队列以及替补队列和交换机的绑定。

那么如何模拟生成一个死信消息呢,可以在发送到DL_QUEUE的消息在10秒后失效,然后转发到替补队列中,代码实现如下

public void sendMsg(String content) {
  CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  MessagePostProcessor messagePostProcessor = message -> {
   MessageProperties messageProperties = message.getMessageProperties();
//   设置编码
   messageProperties.setContentEncoding("utf-8");
//   设置过期时间10*1000毫秒
   messageProperties.setExpiration("5000");
   return message;
  };
  rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);
 }

执行结果如下

消息首先进入DL_QUEUE,5秒后失效,被转发到REDIRECT_QUEUE中。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • springboot实现rabbitmq的队列初始化和绑定

    配置文件,在rabbit中自动建立exchange,queue和绑定它们的关系 代码里初始化exchange 代码里初始化queue 代码里绑定exchange,queue和routekey 配置文件,直接声明vhost 代码里初始化exchange /** * rabbitMq里初始化exchange. * * @return */ @Bean public TopicExchange crmExchange() { return new TopicExchange(EXCHANGE); }

  • Spring Boot系列教程之死信队列详解

    前言 在说死信队列之前,我们先介绍下为什么需要用死信队列. 如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可. ack机制和requeue-rejected属性 我们还是基于上篇<Spring Boot系列--7步集成RabbitMQ>的demo代码来说. 在项目springboot-demo我们看到application.yaml文件部分配置内容如下 ... listener: type: simple simple: acknowledge-mode: aut

  • Spring Boot实战教程之自动配置详解

    前言 大家应该都有所了解,随着Ruby.Groovy等动态语言的流行,相比较之下Java的开发显得格外笨重.繁多的配置.低下的开发效率.复杂的部署流程以及第三方技术集成难度大等问题一直被人们所诟病.随着Spring家族中的新星Spring Boot的诞生,这些问题都在逐渐被解决. 个人觉得Spring Boot中最重要的两个优势就是可以使用starter简化依赖配置和Spring的自动配置.下面这篇文章将给大家详细介绍Spring Boot自动配置的相关内容,话不多说,来一起看看详细的介绍. 使

  • spring boot Slf4j日志框架的体系结构详解

    目录 前言 一.五花八门的日志工具包 1.1. 日志框架 1.2.日志门面 1.3日志门面存在的意义 二.日志框架选型 三.日志级别 四.常见术语概念解析 总结 前言 刚刚接触到java log日志的同学可能会被各种日志框架吓到,包括各种日志框架之间的jar总是发生冲突,另很多小伙伴头疼不已.那我们本篇的内容就是将各种java 日志框架发展过程,以及他们之间的关系,以及如何选型来介绍给大家. 一.五花八门的日志工具包 1.1. 日志框架 JDK java.util.logging 包:java.

  • Spring boot注解@Async线程池实例详解

    这篇文章主要介绍了Spring boot注解@Async线程池实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法.调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行. 1. TaskExecutor Spring异步线程池的接口类,其实质是java.util.concurrent

  • Spring Boot实现数据访问计数器方案详解

    目录 1.数据访问计数器 2.代码实现 2.1.方案说明 2.2.代码 2.3.调用 1.数据访问计数器   在Spring Boot项目中,有时需要数据访问计数器.大致有下列三种情形: 1)纯计数:如登录的密码错误计数,超过门限N次,则表示计数器满,此时可进行下一步处理,如锁定该账户. 2)时间滑动窗口:设窗口宽度为T,如果窗口中尾帧时间与首帧时间差大于T,则表示计数器满.   例如使用redis缓存时,使用key查询redis中数据,如果有此key数据,则返回对象数据:如无此key数据,则查

  • spring boot中的properties参数配置详解

    application.properties application.properties是spring boot默认的配置文件,spring boot默认会在以下两个路径搜索并加载这个文件 src\main\resources src\main\resources\config 配置系统参数 在application.properties中可配置一些系统参数,spring boot会自动加载这个参数到相应的功能,如下 #端口,默认为8080 server.port=80 #访问路径,默认为/

  • spring boot application properties配置实例代码详解

    废话不多说了,直接给大家贴代码了,具体代码如下所示: # =================================================================== # COMMON SPRING BOOT PROPERTIES # # This sample file is provided as a guideline. Do NOT copy it in its # entirety to your own application. ^^^ # ========

  • spring boot微服务自定义starter原理详解

    这篇文章主要介绍了spring boot微服务自定义starter原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 使用spring boot开发微服务后,工程的数量大大增加(一定要按照领域来切,不要一个中间件客户端包一个),让各个jar从开发和运行时自包含成了一个重要的内容之一.spring boot starter就可以用来解决该问题(没事启动时别依赖于applicationContext.getBean获取bean进行处理,依赖关系

  • Spring Boot读取resources目录文件方法详解

    这篇文章主要介绍了Spring Boot读取resources目录文件方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在Java编码过程中,我们常常希望读取项目内的配置文件,按照Maven的习惯,这些文件一般放在项目的src/main/resources下,因此,合同协议PDF模板.Excel格式的统计报表等模板的存放位置是resources/template/test.pdf,下面提供两种读取方式,它们分别在windows和Linux

  • Spring Boot自定义错误视图的方法详解

    Spring Boot缺省错误视图解析器 Web应用在处理请求的过程中发生错误是非常常见的情况,SpringBoot中为我们实现了一个错误视图解析器(DefaultErrorViewResolver).它基于一些常见的约定,尝试根据HTTP错误状态码解析出错误处理视图.它会在目录/error下针对提供的HTTP错误状态码搜索模板或者静态资源,比如,给定了HTTP状态码404,它会尝试搜索如下模板或者静态资源: /<templates>/error/404.<ext> - 这里<

随机推荐