springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步

最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI。不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的。

于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习!

这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,其实整体的内容比较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。

可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:

  • 消息生产者 - > rabbitmq服务器(消息发送失败)
  • rabbitmq服务器自身故障导致消息丢失
  • 消息消费者 - > rabbitmq服务(消费消息失败)

所以说能不使用中间件就尽量不要用,如果为了用而用只会徒增烦恼。开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议你用消息确认机制。

下边我们先来实现springboot + rabbitmq消息确认机制,再对遇到的问题做具体分析。

一、准备环境

1、引入 rabbitmq 依赖包

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、修改 application.properties 配置

配置中需要开启 发送端消费端 的消息确认。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

3、定义 Exchange 和 Queue

定义交换机 confirmTestExchange 和队列 confirm_test_queue ,并将队列绑定在交换机上。

@Configuration
public class QueueConfig {

 @Bean(name = "confirmTestQueue")
 public Queue confirmTestQueue() {
 return new Queue("confirm_test_queue", true, false, false);
 }

 @Bean(name = "confirmTestExchange")
 public FanoutExchange confirmTestExchange() {
 return new FanoutExchange("confirmTestExchange");
 }

 @Bean
 public Binding confirmTestFanoutExchangeAndQueue(
 @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
 @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
 return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
 }
}

rabbitmq 的消息确认分为两部分:发送消息确认 和 消息接收确认。

二、消息发送确认

发送消息确认:用来确认生产者 producer 将消息发送到 brokerbroker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producerrabbitmq broker有一个 confirmCallback 确认模式。

消息从 exchangequeue 投递失败有一个 returnCallback 退回模式。

我们可以利用这两个Callback来确保消的100%送达。

1、 ConfirmCallback确认模式

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {

 if (!ack) {
 log.error("消息发送异常!");
 } else {
 log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
 }
 }
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback

2、 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
 }
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

@Autowired
 private RabbitTemplate rabbitTemplate;

 @Autowired
 private ConfirmCallbackService confirmCallbackService;

 @Autowired
 private ReturnCallbackService returnCallbackService;

 public void sendMessage(String exchange, String routingKey, Object msg) {

 /**
 * 确保消息发送失败后可以重新返回到队列中
 * 注意:yml需要配置 publisher-returns: true
 */
 rabbitTemplate.setMandatory(true);

 /**
 * 消费者确认收到消息后,手动ack回执回调处理
 */
 rabbitTemplate.setConfirmCallback(confirmCallbackService);

 /**
 * 消息投递到队列失败回调处理
 */
 rabbitTemplate.setReturnCallback(returnCallbackService);

 /**
 * 发送消息
 */
 rabbitTemplate.convertAndSend(exchange, routingKey, msg,
 message -> {
  message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  return message;
 },
 new CorrelationData(UUID.randomUUID().toString()));
 }

三、消息接收确认

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

 @RabbitHandler
 public void processHandler(String msg, Channel channel, Message message) throws IOException {

 try {
 log.info("小富收到消息:{}", msg);

 //TODO 具体业务

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 } catch (Exception e) {

 if (message.getMessageProperties().getRedelivered()) {

 log.error("消息已重复处理失败,拒绝再次接收...");

 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
 } else {

 log.error("消息即将再次返回队列处理...");

 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
 }
 }
 }
}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

四、测试

发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。

用抓包工具Wireshark 观察一下rabbitmq amqp协议交互的变化,也多了 ack 的过程。

五、踩坑日志

1、不消息确认

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。

2、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler
 public void processHandler(String msg, Channel channel, Message message) throws IOException {

 try {
 log.info("消费者 2 号收到:{}", msg);

 int a = 1 / 0;

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 } catch (Exception e) {

 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
 }
 }

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

而且rabbitmq management 只有一条未被确认的消息。

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
  message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
  JSON.toJSONBytes(msg));

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

demoGitHub 地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm

总结

到此这篇关于springboot + rabbitmq 如何实现消息确认机制(踩坑经验)的文章就介绍到这了,更多相关springboot rabbitmq 消息确认机制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot实现rabbitmq的队列初始化和绑定

    配置文件,在rabbit中自动建立exchange,queue和绑定它们的关系 代码里初始化exchange 代码里初始化queue 代码里绑定exchange,queue和routekey 配置文件,直接声明vhost 代码里初始化exchange /** * rabbitMq里初始化exchange. * * @return */ @Bean public TopicExchange crmExchange() { return new TopicExchange(EXCHANGE); }

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • springboot2.0+elasticsearch5.5+rabbitmq搭建搜索服务的坑

    前一阵子准备为项目搭建一个简单的搜索服务,虽然业务数据库mongodb提供了文本搜索的支持,但是在大量文档需要通过关键词进行定位时,es明显更加适合去作为一个搜索引擎(虽然我们之前大部分使用到了ELK那套分析和可视化的特性).Elasticsearch建立在Lucene之上并且支持极其快速的查询和丰富的查询语法,偶尔也可以作为一个轻量级的NoSQL.但是对复杂查询和聚合操作的能力并不是很强. 本篇不会提及如何搭建一个简单搜索服务,而是记录一下大约一周工作时间内遇见的几个坑.. 为什么选择elas

  • springboot集成rabbitMQ之对象传输的方法

    rabbitMQ的安装方法网上有很多教程,这里就不重复了. 在springboot上使用rabbitMQ传输字符串和对象,本文所给出的例子是在两个不同的项目之间进行对象和和字符串的传输. rabbitMQ的依赖(在两个项目中一样的配置): <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • springboot+RabbitMQ+InfluxDB+Grafara监控实践

    本文需要有相关spring boot 或spring cloud 相关微服务框架的基础,如果您具备相关基础可以很容易的实现下述过程!!!!!!! 希望 本文的所说对需要的您有所帮助 从这里我们开始进入闲聊阶段. 大家都知道 spring boot整合了很多很多的第三方框架,我们这里就简单讨论和使用 性能监控和JVM监控相关的东西.其他的本文不讨论虽然有些关联,所以开篇有说需要有相关spring boot框架基础说了这么多废话,下面真正进入主题. 这里首先给大家看下整体的数据流程图,其中两条主线一

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • RabbitMQ消息确认机制剖析

    目录 前言 消息确认 基本流程 消息确认模式 ConfirmCallback确认模式 ReturnCallback退回模式 消息发送者确认 消息接收者确认 basicAck模式 basicNack模式 basicReject模式 测试 解决办法 消费者确认失败 总结 前言 上一章讲解了RabbitMq的三种Exchange消息发送的模式,但是在默认情况下RabbitMQ并不能保证消息是否发送成功,以及是否能被成功消费,为了保证消息在传递过程中不丢失,需要对消息进行确认机制,来提高消息的可靠性.

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • python变量赋值机制踩坑记录

    目录 1.可变类型赋值 2.不可变类型赋值 3.自定义类型变量赋值 先说结论: 变量赋值属于浅拷贝(关于深拷贝和浅拷贝的区别可以自己了解下).故如果是可变类型变量(如a是list类型,a=b)赋值,修改a会牵连到b:如果是不可变类型(如int)的赋值,则修改任意变量不会传递. 1. 可变类型赋值 可以看出,对于可变类型赋值,变量始终指向同一块地址. 2. 不可变类型赋值 对于不可变类型变量的赋值,刚开始是指向同一块地址,但修改任意变量,则修改的变量指向另外一块地址,不会影响另外一个变量. 那么问

  • SpringBoot 集成 Jasypt 对数据库加密以及踩坑的记录分享

    前言 密码安全是非常重要的,因此我们在代码中往往需要对密码进行加密,以此保证密码的安全 加依赖 <!-- jasypt --> <dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot-starter</artifactId> <version>3.0.3</version> </depe

  • vue项目使用高德地图的定位及关键字搜索功能的实例代码(踩坑经验)

    1.首先在index.html引入高德地图的秘钥.如图: 注意:如果使用关键字搜索功能要加上 plugin=AMap.Autocomplete,AMap.PlaceSearch,否则功能无法使用,并会报错 2. 定位功能,代码如下: const map = new AMap.Map(this.$refs.container, { resizeEnable: true }) // 创建Map实例 const options = { 'showButton': true, // 是否显示定位按钮 '

  • 浅谈golang fasthttp踩坑经验

    一个简单的系统,结构如下: 我们的服务A接受外部的http请求,然后通过golang的fasthttp将请求转发给服务B,流程非常简单.线上运行一段时间之后,发现服务B完全不再接收任何请求,查看服务A的日志,发现大量的如下错误 从错误原因看是因为连接被占满导致的.进入服务A的容器中(服务A和服务B都是通过docker启动的),通过netstat -anlp查看,发现有大量的tpc连接,处于ESTABLISH.我们采用的是长连接的方式,此时心里非常疑惑:1. fasthttp是能够复用连接的,为什

随机推荐