RabbitMQ 如何解决消息幂等性的问题

前言

关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解。

1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
  • 如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)

配置:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

4. 如何解决消费者幂等性问题

防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

伪代码:

生产者核心代码:

请求头设置消息id(messageId)

@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //请求头设置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消费者核心代码:

@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 获取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
  //从redis中获取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已经消费
   return; //结束
  }
  System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 获取email参数
  String email = jsonObject.getString("email");
  // 请求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因为网络原因,造成无法访问,继续重试
   throw new Exception("调用接口失败!");
  }
  System.out.println("执行结束....");
  //① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
 }

5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

2.消费者增加代码:

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack
channel.basicAck(deliveryTag, false);手动签收
//邮件队列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手动ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手动签收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保证幂等性,数据一致性

mq的作用主要是用来解耦,削峰,异步,

增加MQ,系统的复杂性也会增加很多,

也会带来其他的问题,比如MQ挂了怎么办,怎么保持数据的幂等性

幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少,

也就是数据一致性问题。

下面是MQ丢失的3种情况

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • RabbitMQ 的七种队列模式和应用场景

    七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人 工作队列模式(Work queues) 在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理 应用场景:一个订单的处理需要10s,有多个订单可以同时放到消息队列

  • RabbitMQ交换机与Springboot整合的简单实现

    RabbitMQ-交换机 1.交换机是干什么的? 消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面.Worker会从队列中获取未被读取的数据处理. 1.交换机的种类 RabbitMQ包含四种不同的交换机类型: Direct exchange:直连交换机,转发消息到routigKey指定的队列 Fanout exchange:扇形交换机,转发消息到所有绑定队列(速度最快) Topic exchange:主题交换机,按规则转发消息(最灵活) He

  • RabbitMQ的基础知识

    目录 RabbitMQ 1.对MQ的介绍 2.RabbitMQ的六种模式 及工作原理 3.hello world队列 4.工作队列模式 5.消息应答机制 自动应答 手动应答 消息自动进行重新入队 6.RabbitMQ的持久化,不公平分发及预取值 7.发布确认 8.交换机 <1>交换机的认识 <2>交换机具体介绍 9.死信队列 <1>认识死信队列 <2>死信实战 RabbitMQ 1.对MQ的介绍 1.说明是MQ MQ(message queue),从字面意思

  • RabbitMQ 如何解决消息幂等性的问题

    前言 关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费.使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解. 1. RabbitMQ自动重试机制 消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理? 使用重试机制,RabbitMQ默认开启重试机制. 实现原理: @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务 如果Aop使用异常通知拦截获取到异常后,自动实现补

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • 详解RabbitMq如何做到消息的可靠性投递

    目录 前言 RabbitMq的投递及消费流程 提供者如何确保消息的成功投递 单条消息的同步确认 多条消息的同步确认 异步消息确认 消息的返回机制 前言 现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的. 引入消息队列可以给这个项目带来很多的好处:比如 削峰 这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请

  • .Net使用RabbitMQ即时发消息Demo

    前言 最近项目要使用RabbitMQ,网上已经有很多优秀的文章了,百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议.必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点). 最近参考大神们的博客,自己做了一个RabbitMQ即时发消息的Demo.下面话不多说了,来一起看看详细的介绍吧. 步骤如下: 1.使用VS的NuGet安装包管理工具安装RabbitMQ.Cli

  • c# rabbitmq 简单收发消息的示例代码

    发布消息:(生产者) /// <summary> /// 发送消息 /// </summary> /// <param name="queue">队列名</param> /// <param name="message">消息内容</param> private static void PublishInfo(string queue, string message) { try { var f

  • C#利用RabbitMQ实现点对点消息传输

    消息队列模型 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者. RabbitMQ设置 RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定.本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示: RabbitMQ动态库安装 通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • 分布式面试消息队列解决消息重复保证消息顺序

    目录 引言 1.面试官: 那你有考虑过消息重复问题怎么解决吗? 2.面试官: 在多集群消息架构中,如果消费端要求接收到的消息是有序的,怎么解决消息顺序消费问题? 3.面试官: 那如何做到topic不分区,能举例说明一下吗? 总结 引言 我在<项目中为什么要使用消息队列>中列举了两个使用消息队列的例子. (1)收银系统,确认收款成功,通过MQ通知给物流系统发货. (2)消费积分,用户每消费一笔给用户增加一定积分,京东豆,信用卡积分,2020年如果还没倒闭的电商平台中,可以100%的确定订单系统和

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

随机推荐