Java RabbitMQ消息队列详解常见问题

目录
  • 消息堆积
  • 保证消息不丢失
  • 死信队列
  • 延迟队列
  • RabbitMQ消息幂等问题
    • RabbitMQ消息自动重试机制
    • 合理的选择重试机制
    • 消费者开启手动ack模式
    • rabbitMQ如何解决消息幂等问题
  • RabbitMQ解决分布式事务问题
    • 基于RabbitMQ解决分布式事务的思路

消息堆积

消息堆积的产生场景:

  • 生产者产生的消息速度大于消费者消费的速度。解决:增加消费者的数量或速度。
  • 没有消费者进行消费的时候。解决:死信队列、设置消息有效期。相当于对我们的消息设置有效期,在规定的时间内如果没有消费的话,自动过期,过期的时候会执行客户端回调监听的方法将消息存放到数据库表记录,后期实现补偿。

保证消息不丢失

1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。

2、MQ服务器端应该将消息持久化到硬盘

3、消费者使用手动ack机制确认消息消费成功

如果MQ服务器容量满了怎么办?

使用死信队列将消息存到数据库中去,后期补偿消费。

死信队列

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生背景:

  • 消息投递到MQ中存放 消息已经过期
  • 队列达到最大的长度 (队列容器已经满了)生产者拒绝接收消息
  • 消费者消费多次消息失败,就会转移存放到死信队列中

代码案例:

maven依赖

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml配置

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

队列配置类

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信队列消费者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通队列消费者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

后台队列管理页面如下:

部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。

延迟队列

订单30分钟未支付,系统自动超时关闭的实现方案。

基于任务调度实现,效率是非常低。

基于redis过期key实现,key失效时会回调客户端一个方法。

用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;缺点:非常冗余,会在表中存放一个冗余字段。

基于mq的延迟队列(最佳方案)rabbitmq情况下。

原理:在我们下单的时候,往mq投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否已经支付。

实现逻辑:

主要使用死信队列来实现。

想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制

当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿。因为重试失败次数之后,队列会自动删除这个消息。

消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。

如果消费多次还是失败的情况下:

1、自动删除该消息;(消息可能丢失)

解决办法:

如果充实多次还是失败的情况下,最终存放到死信队列;

采用表日志记,消费失败错误日志的日志记录,后期人工自动对该消息实现补偿。

合理的选择重试机制

消费者获取消息后,调用第三方接口(HTTP请求),但是调用第三方接口失败呢?是否需要重试 ?

答:有时是因为网络异常调用失败,应该需要重试几次。

消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

答:不需要重试,代码异常需要重新修改代码发布项目。

消费者开启手动ack模式

第一步、springboot项目配置需要开启ack模式

acknowledge-mode: manual

第二步、消费者Java代码

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解决消息幂等问题

什么是消息幂等性?MQ消费者如何保证幂等性?

产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现

解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。

消费者代码逻辑:

RabbitMQ解决分布式事务问题

分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。

解决分布式事务核心思想:数据最终一致性。

分布式领域中名词:

强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;

强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。

弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。

最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。

基于RabbitMQ解决分布式事务的思路

基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)

  • 确认我们的生产者消息一定要投递到MQ中(消息确认机制)投递失败 就继续重试
  • 消费者采用手动ack的形式确认消息实现消费 注意幂等性问题,消费失败的情况下,mq自动帮消费者重试。
  • 保证我们的生产者第一事务先执行,如果执行失败采用补单队列(给生产者自己事务补充,确保生产者第一事务执行完成【数据最终一致性】)。

解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。

到此这篇关于Java RabbitMQ详解常见问题的解决的文章就介绍到这了,更多相关Java RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • Java RabbitMQ的工作队列与消息应答详解

    目录 WorkQueues 1.轮询分发消息 1.1抽取工具类 1.2编写两个工作线程 1.3编写生产者 1.4运行测试 1.5异常情况 2.消息应答 2.1自动应答 2.2手动应答 2.3消息自动重新入队 2.4手动应答测试 2.4.1生产者代码 2.4.2消费者代码 2.4.3测试 总结 Work Queues 工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • Java搭建RabbitMq消息中间件过程详解

    这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 前言 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列. 名词 exchange: 交换机 routingkey: 路由key queue:队列 控制台端口:15672 exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过ro

  • Java Rabbitmq中四种集群架构的区别详解

    目录 主备模式 远程模式 镜像模式 多活模式 Federation插件 总结 Rabbitmq 四种集群架构 1. 主备模式 2. 远程模式3. 镜像模式  4. 多活模式 主备模式 主备模式: warren 兔子窝 一个主.一个备方案 主节点如果挂了 从节点提供服务 和Activemq 利用zk 做主/备一样 主备模式 ----------------------->HaProxy 配置 listen rabbitmq_cluster bind 0.0.0.0:5682 # 配置tcp 模式

  • JAVA获取rabbitmq消息总数过程详解

    公司使用的是rabbitMQ,需要做监控预警的job去监控rabbitMQ里面的堆积消息个数,如何使用rabbitMQ获取监控的队列里面的队列消息个数呢? 首先需要创建一个连接,配置文件注入相关的值,然后设置连接的相关信息,创建链接. 导入的包是使用: import com.rabbitmq.client @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}

  • java中RabbitMQ高级应用

    目录 1.消息可靠性投递 1.1.确认模式 1.2.退回模式 1.3.确认机制 2.消费端限流 3.消息过期时间 4.死信队列 4.1.死信概念 4.2.延迟队列 1.消息可靠性投递  在使用 RabbitMQ 的时候,生产者在进行消息投递的时候如果想知道消息是否成功的投递到对应的交换机和队列中,有两种方式可以用来控制消息投递的可靠性模式 .  由上图的整个消息的投递过程来看,生产者的消息进入到中间件中会首先到达交换机,然后再从交换机传递到队列中去,也就是分为两步走战略.那么消息的丢失情况也就是

  • Java RabbitMQ的TTL和DLX全面精解

    目录 RabbitMQ的TTL 1.TTL概述 2.设置消息有效期 2.1.通过队列设置有效期 2.2.通过发送消息时设置有效期 3.设置队列有效期(不常用,仅作了解) RabbitMQ的DLX 1.DLX是什么 2.DLX有什么用 3.DLX使用方式 本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机.死信队列) RabbitMQ的TTL 1.TTL概述 RabbitMQ的TTL全称为Time-To-

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

  • SpringBoot整合rockerMQ消息队列详解

    目录 Springboot整合RockerMQ 使用总结 消费模式 生产者组和消费者组 生产者投递消息的三种方式 如何保证消息不丢失 顺序消息 分布式事务 Springboot整合RockerMQ 1.maven依赖 <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • MSMQ微软消息队列详解

    一.引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ).COM+.Asp.net Web服务..NET Remoting等微软现有的分布式系统技术.利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA).可以认为,WCF是对之前现有的分布式技术(指的是MSMQ..NET Remoting和Web 服务等技术

  • Java中消息队列任务的平滑关闭详解

    前言 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景. 本文主要给大家介绍的是关于Java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1.问题背

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • Java队列篇之实现数组模拟队列及可复用环形队列详解

    队列简介 队列是一个有序列表,可以用数组或是链表来实现. 遵循先入先出的原则.即先存入队列的数据,先取出,后存入的后取出. 示意图:(使用数组模拟队列示意图) 有两个分别指向头部和尾部的"指针". 数组模拟队列(无法复用) 1.实现思路 队列本身是有序列表,若使用数组的结构来存储队列的数据,则队列数组的声明如下图,其中maxSize是该队列的最大容量. 因为队列的输出.输入是分别从前后端来处理,因此需要两个变量front及rear分别记录队列前后端的下标,front会随着数据输出而改变

  • Java多线程案例之阻塞队列详解

    目录 一.阻塞队列介绍 1.1阻塞队列特性 1.2阻塞队列的优点 二.生产者消费者模型 2.1阻塞队列对生产者的优化 三.标准库中的阻塞队列 3.1Java提供阻塞队列实现的标准类 3.2Blockingqueue基本使用 四.阻塞队列实现 4.1阻塞队列的代码实现 4.2阻塞队列搭配生产者与消费者的代码实现 一.阻塞队列介绍 1.1阻塞队列特性 阻塞队列特性: 一.安全性 二.产生阻塞效果 阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构, 并且具有

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • spring boot中使用RabbitMQ routing路由详解

    在上一个教程中我们创建了一个扇形(fanout)交换器.我们能把消息已广播的形式传递给多个消费者. 要做什么?Routing 路由 在这个教程中,添加一个新的特性,我们可以只订阅消息的一部分.例如,将只连接我们感兴趣的颜色("orange", "black", "green"),并且把消息全部打印在控制台上. 绑定 交换器和队列是一种绑定关系.简单的理解为:队列对来自这个交换器中的信息感兴趣. 绑定可以加上一个额外的参数routingKey.Sp

随机推荐