RabbitMQ 的消息持久化与 Spring AMQP 的实现详解

前言

要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化。如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可。

  • 交换器必须是持久化。
  • 队列必须是持久化的。
  • 消息必须是持久化的。

原生的实现方式

原生的 RabbitMQ 客户端需要完成三个步骤。

第一步,交换器的持久化。

// 参数1 exchange :交换器名
// 参数2 type :交换器类型
// 参数3 durable :是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

第二步,队列的持久化。

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

第三步,消息的持久化。

// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 参数4 body : 消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Spring AMQP 的实现方式

Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。

其中,交换器的持久化配置如下。

// 参数1 name :交互器名
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
new TopicExchange(name, durable, autoDelete)

此外,还需要再配置队列的持久化。

// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
new Queue(name, durable, exclusive, autoDelete);

至此,RabbitMQ 的消息持久化配置完毕。

那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。

一般情况下,我们会通过这种方式发送消息。

rabbitTemplate.convertAndSend(exchange, routeKey, message);

其中,调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。

@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
 convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

接着,用调用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
  send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
 }

此时,最关键的方法出现了,它是 convertMessageIfNecessary(final Object object)。

protected Message convertMessageIfNecessary(final Object object) {
 if (object instanceof Message) {
  return (Message) object;
 }
 return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

其中,关键的是 MessageProperties 类,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。

public class MessageProperties implements Serializable {
 public MessageProperties() {
  this.deliveryMode = DEFAULT_DELIVERY_MODE;
  this.priority = DEFAULT_PRIORITY;
 }
 static {
  DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
  DEFAULT_PRIORITY = Integer.valueOf(0);
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • rabbitmq结合spring实现消息队列优先级的方法

    1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便).需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是r

  • JAVA获取rabbitmq消息总数过程详解

    公司使用的是rabbitMQ,需要做监控预警的job去监控rabbitMQ里面的堆积消息个数,如何使用rabbitMQ获取监控的队列里面的队列消息个数呢? 首先需要创建一个连接,配置文件注入相关的值,然后设置连接的相关信息,创建链接. 导入的包是使用: import com.rabbitmq.client @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}

  • PHP+RabbitMQ实现消息队列的完整代码

    前言 为什么使用RabbitMq而不是ActiveMq或者RocketMq? 首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级).至于ActiveMq,貌似问题较多.RabbitMq对各种语言的支持较好,所以选择RabbitMq. 先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异. php扩展地址: http://pecl.php.net/package/amq

  • Java编程rabbitMQ实现消息的收发

    java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.Actio

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • Spring学习笔记3之消息队列(rabbitmq)发送邮件功能

    rabbitmq简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ. 本节的内容是用户注册时,将邮

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • RabbitMQ 的消息持久化与 Spring AMQP 的实现详解

    前言 要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化.如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可. 交换器必须是持久化. 队列必须是持久化的. 消息必须是持久化的. 原生的实现方式 原生的 RabbitMQ 客户端需要完成三个步骤. 第一步,交换器的持久化. // 参数1 exchange :交换器名 // 参数2 type :交换器类型 // 参数3 durable :是否持久化 channel.exchangeDeclare(EXCHANG

  • rabbitmq(中间消息代理)在python中的使用详解

    在之前的有关线程,进程的博客中,我们介绍了它们各自在同一个程序中的通信方法.但是不同程序,甚至不同编程语言所写的应用软件之间的通信,以前所介绍的线程.进程队列便不再适用了:此种情况便只能使用socket编程了,然而不同程序之间的通信便不再像线程进程之间的那么简单了,要考虑多种情况(比如其中一方断线另一方如何处理:消息群发,多个程序之间的通信等等),如果每遇到一次程序间的通信,便要根据不同情况编写不同的socket,还要维护.完善这个socket这会使得编程人员的工作量大大增加,也使得程序更易崩溃

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • Spring @Transactional工作原理详解

    本文将深入研究Spring的事务管理.主要介绍@Transactional在底层是如何工作的.之后的文章将介绍: propagation(事务传播)和isolation(隔离性)等属性的使用 事务使用的陷阱有哪些以及如何避免 JPA和事务管理 很重要的一点是JPA本身并不提供任何类型的声明式事务管理.如果在依赖注入容器之外使用JPA,事务处理必须由开发人员编程实现. UserTransaction utx = entityManager.getTransaction(); try{ utx.be

  • Java Spring JdbcTemplate基本使用详解

    目录 JdbcTemplate概述 JdbcTemplate开发步骤 JdbcTemplate快速入门 Spring产生JdbcTemplate对象 JdbcTemplate概述 它是spring框架中提供的一个对象,是对原始繁琐的Jdbc API对象的简单封装.spring框架为我们提供了很多的操作模板类.例如:操作关系型数据的JdbcTemplate和HibernateTemplate,操作nosql数据库的RedisTemplate,操作消息队列的JmsTemplate等等. JdbcTe

  • JSP 中Spring Bean 的作用域详解

    JSP 中Spring Bean 的作用域详解 Bean元素有一个scope属性,用于定义Bean的作用域,该属性有如下五个值: 1>singleton: 单例模式,在整个spring IOC容器中,单例模式作用域的Bean都将只生成一个实例.一般Spring容器默认Bean的作用域为singleton 2>prototype: 与singleton相反, 每次通过容器的getBean()方法获取该作用域下的Bean时都将产生一个新的Bean实例 3>request: 对于同一次Http

  • java中Spring Security的实例详解

    java中Spring Security的实例详解 spring security是一个多方面的安全认证框架,提供了基于JavaEE规范的完整的安全认证解决方案.并且可以很好与目前主流的认证框架(如CAS,中央授权系统)集成.使用spring security的初衷是解决不同用户登录不同应用程序的权限问题,说到权限包括两部分:认证和授权.认证是告诉系统你是谁,授权是指知道你是谁后是否有权限访问系统(授权后一般会在服务端创建一个token,之后用这个token进行后续行为的交互). spring

  • Spring Java-based容器配置详解

    装Java-based的配置 使用 @Import 注解 跟在Spring XML文件中使用<import>元素添加模块化的配置类似,@Import注解允许你加载其他配置类中的@Bean定义: @Configuration public class ConfigA { @Bean public A a() { return new A(); } } @Configuration @Import(ConfigA.class) public class ConfigB { @Bean public

  • 微信小程序 消息推送php服务器验证实例详解

    微信小程序 消息推送php服务器验证实例详解 微信文档(靠下有个"接入指引"):https://mp.weixin.qq.com/debug/wxadoc/dev/api/custommsg/callback_help.html 设置页面("设置">>"开发设置"): https://mp.weixin.qq.com/wxopen/initprofile?action=home&lang=zh_CN 1.设置服务器域名 比如:

  • Spring AOP 基于注解详解及实例代码

    Spring AOP  基于注解详解及实例代码 1.启用spring对@AspectJ注解的支持: <beans xmlns:aop="http://www.springframework.org/schema/aop"...> <!--启动支持--> <aop:aspectj-autoproxy /> </beans> 也可以配置AnnotationAwareAspectJAutoProxyCreator Bean来启动Spring对@

随机推荐