解决RabbitMq消息队列Qos Prefetch消息堵塞问题

mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。

ConnectionFactory:创建connection的工厂类

Connection: 简单理解为socket

Channel:和mq交互的接口,定义queue、exchange和绑定queue、exhange等接口都是它。

接下来就是和mq的交互类

exchange:简单地看成路由,类型不是重点,看看官网即可

queue:客户端监听的是queue,而不是exchange,但是使用queue的前提要先将exchange和queue绑定。用过java queue工具类应该很容易上手,queue分为写和读,各自可以有自己频率,写得快读得慢,容易堵塞;写得慢读得快又容易造成消费者的空闲。

Prefetc:一个重要却容易被忽略的指标,也是这次遇到的问题。

prefetch与消息投递

prefetch是指单一消费者最多能消费的unacked messages数目。

如何理解呢?

mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。

更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

我遇到的问题是一个粗心的程序员,在编写代码的时候,他对某些消息处理方式是这样的

  if (success) {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    } else {
                        logger.error("######### The message is not delete from queue : {}", body);
                    }

首先他讲ack机制设置为手动的,然后他的理解是如果处理成功的消息,就ack给MQ,期望MQ就可以删除完成的数据。不然,保留数据再次被处理。

这里的误区就是就是对ack的理解,失败的时候,如果需要让程序继续处理,应该使用basicNack,并告诉mq将消息再次放入队列

    if (success) {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        } else {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                        }

对于客户端意外宕机的情况,没有ack服务器确实不会删除掉数据,但是consumer重启以后,对于服务器就是一个新的消费者了,也就是它的缓冲区又被重置为原来的n-prefetch,所以这个问题被粗心的小哥想当然地测试通过了。

prefetch的大小应该为多少

这篇文章给了很好的建议,我简单地说一下我的理解。

理想状况下,计算MQ SERVER 从缓冲区中拿到消息并推送到消费端,加上消费端处理完ack消息到MQ server,的时间,假设为100ms,其中消费端处理业务话费了10ms。

这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能毛姑姑的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。

所以如果你保证所有的消息都ack了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。一开始建议采用或者,使用一个线程池来处理这些业务逻辑。

以上就是解决RabbitMqQosPrefetch消息堵塞问题的详细内容,更多关于RabbitMqQosPrefetch消息堵塞的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot+RabbitMQ+Redis实现商品秒杀的示例代码

    目录 业务分析 创建表 功能实现 1.用户校验 2.下单 3.减少库存 4.支付 总结 业务分析 一般而言,商品秒杀大概可以拆分成以下几步: 用户校验 校验是否多次抢单,保证每个商品每个用户只能秒杀一次 下单 订单信息进入消息队列,等待消费 减少库存 消费订单消息,减少商品库存,增加订单记录 付款 十五分钟内完成支付,修改支付状态 创建表 goods_info 商品库存表 列 说明 id 主键(uuid) goods_name 商品名称 goods_stock 商品库存 package com.

  • RabbitMQ 如何解决消息幂等性的问题

    前言 关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费.使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解. 1. RabbitMQ自动重试机制 消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理? 使用重试机制,RabbitMQ默认开启重试机制. 实现原理: @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务 如果Aop使用异常通知拦截获取到异常后,自动实现补

  • rabbitmq(中间消息代理)在python中的使用详解

    在之前的有关线程,进程的博客中,我们介绍了它们各自在同一个程序中的通信方法.但是不同程序,甚至不同编程语言所写的应用软件之间的通信,以前所介绍的线程.进程队列便不再适用了:此种情况便只能使用socket编程了,然而不同程序之间的通信便不再像线程进程之间的那么简单了,要考虑多种情况(比如其中一方断线另一方如何处理:消息群发,多个程序之间的通信等等),如果每遇到一次程序间的通信,便要根据不同情况编写不同的socket,还要维护.完善这个socket这会使得编程人员的工作量大大增加,也使得程序更易崩溃

  • Java编程rabbitMQ实现消息的收发

    java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.Actio

  • springboot2.0集成rabbitmq的示例代码

    安装rabbitmq 简介: rabbitmq即一个消息队列,主要用来实现应用程序的异步和解耦,消息缓冲,消息分发的作用. 由于rabbitmq依赖于erlang语言,所以先安装erlang: 添加erlang solutions源 $ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm $ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm $ su

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • 解决RabbitMq消息队列Qos Prefetch消息堵塞问题

    mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习. ConnectionFactory:创建connection的工厂类 Connection: 简单理解为socket Channel:和mq交互的接口,定义queue.exchange和绑定queue.exhange等接口都是它. 接下来就是和mq的交互类 exchange:简单地看成路由,类型不是重点,看看官网即可 queue:客户端监听的是queue,而不是exchange,但是使用queue的

  • Python中线程的MQ消息队列实现以及消息队列的优点解析

    "消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

  • RocketMQ消息队列实现随机消息发送当做七夕礼物

    目录 正文 1 下载并启动RocketMQ 1.1 首先启动name server 1.2 然后启动Broker 2 生产者 3 消费者 正文 都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办. 我给媳妇写首诗,哈哈 我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看.你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫.当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈. 这

  • php基于Redis消息队列实现的消息推送的方法

    基本知识点 重点用到了以下命令实现我们的消息推送 brpop 阻塞模式 从队列右边获取值之后删除 brpoplpush 从队列A的右边取值之后删除,从左侧放置到队列B中 逻辑分析 在普通的任务脚本中写入push_queue队列要发送消息的目标,并为目标设置一个要推送的内容,永不过期 RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,主要防止程序崩溃造成推送失败 RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • 详解消息队列及RabbitMQ部署和使用

    目录 什么是消息队列 为什么需要消息队列 常见的消息队列 ActiveMQ RabbitMQ ZeroMQ Kafka RocketMQ RabbitMQ 的部署和使用 Python 编写生产者 Python 编写消费者 最后的话 什么是消息队列 消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费.仅此而已. 为什么需要消息队列 消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业

  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

    目录 一.前言 二.autoAck参数的讨论 三.rabbitmq队列持久化操作 四.2019.11.04问题补充 五.2019.11.07消息的持久化 六.2022.02.09增加队列持久化说明 结语 一.前言 Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer); 在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basic

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

  • Spring Boot 使用 Disruptor 做内部高性能消息队列

    目录 Disruptor介绍 Disruptor 的核心概念 Ring Buffer Sequence Disruptor Sequencer Sequence Barrier Wait Strategy Event EventProcessor EventHandler Producer 案例-demo 总结 工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq.Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录. D

随机推荐