RocketMQ的push消费方式实现示例

目录
  • 引言
  • MQ消费方式
    • 1、push(推方式)
    • 2、pull(拉方式)
  • RocketMQ对于消费方式的实现
  • RocketMQ聪明地实现push的原因
  • 轮询与长轮询
    • 轮询
    • 长轮询
  • push消费方式源码探究
    • 1、消费者拉取消息控制压力源码
    • 2、MQ将请求hold住源码
    • 3、MQ收到消息响应给消费者的源码
  • 最后

引言

最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,来掰扯一下,防止过两天就忘得一干二净了。

MQ消费方式

消费方式就是指消费者如何从MQ中获取到消息,分为两种方式,push(推方式)和pull(拉方式)。

1、push(推方式)

push,顾名思义,就是推的意思。就是当MQ收到生产者产生的消息的时候,会主动将消息推送到消费者进行消费,这种模式就叫push,也就是MQ将消息推给到消费者的意思。

push模式

push这种模式的好处就是响应快,消息的实时性比较高,一旦消息MQ收到消息,那么就能立马将消息推送给消费者,消费者也就能立马收到消息进行消费。

但是这种push的模式,有个缺点就是一旦消息量比较大时,对消费者性能要求比较高,因为是消费者无法控制MQ消息的推送速度,一旦消息量大,那么消费者消费消息的压力就比较大。

2、pull(拉方式)

push是MQ主动给消费者推消息,那么pull呢?刚好跟push相反,就是消费者主动去MQ中拉取消息。

pull模式

那么pull的优缺点自然也就跟push刚好相反。因为是消费者主动去MQ中拉取消息,那么消费者可根据自身消费的情况,决定何时去拉取消息,主动权在自己手上,这样消费者的压力就会相对小点;但是缺点也很明显,那么就会实时性相对于push方式会低一些,因为你得决定拉的时间间隔。

其实想想,消费方式就跟拿快递一样,快递就是一个消息,我自己就是消费者,快递要么快递小哥主动送(push)到家,要么我自己去快递站拿(pull)。

RocketMQ对于消费方式的实现

上一节说了消费消息的两种方式push和pull,或者说算一种理念。尚大的周阳老师有一句经常说的话我比较赞同,那就是“天上飞的理念,必然有落地的实现”。所以push或者pull到底如何落地,得看具体的MQ的产品了。

而RocketMQ作为阿里开源的一款高性能、功能丰富的MQ,自然同时实现了push和pull的两种消费方式,用户可以选择在项目中使用push还是pull。

push模式的实现

pull模式的实现

但是一般情况下,项目中都是使用push的方式来消费,因为pull除了时实性差外,pull方式还得让开发人员主动去维护消息消费进度,增加额外的操作。

所以接下来就着重讲一下RocketMQ是如何实现push的逻辑。

RocketMQ聪明地实现push的原因

上文说到push模式的优点是时实性好,但是缺点就是消费者压力会比较大,所以,难道实现push模式,只能舍弃压力的控制么?

就在这时,RocketMQ大喊了一声

是的,RocketMQ对于push模式做到了实时和压力的平衡,这主要是因为RocketMQ的push模式其实算是一个“伪push”模式,真正底层的实现还是基于pull。

到这里可能有的小伙伴比较迷糊,怎么push变成“伪push”了,还是用pull实现的,到底是push还是pull?

前面我说过,push和pull只是一种理论,具体的实现看MQ。

所以RocketMQ为了兼顾两者,就选择通过消费者主动拉消息来实现push的效果,这也是为什么我称为“伪push”的原因,RocketMQ都给封装好了,让你用起来感觉是MQ主动push消息给你的。

既然底层是pull,那么RokcetMQ在实现消费者的逻辑的时候,就可以很容易实现控制压力的效果,毕竟这是“拉”方式天然自带的buff;但是如何通过pull实现push的时实的优点呢?毕竟鱼和熊掌我RokcetMQ偏要兼得。

这时这就不得不提到一种叫“长轮询”的机制。

轮询与长轮询

轮询与长轮询都属于pull的实现,都是由客户端主动给服务端发送请求,拉取数据。套到MQ中,就是都是消费者主动去MQ拉消息。

轮询

轮询是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

再拿快递举例子,轮询就好比,小明买的iphone 13 pro max快递到了,显示正在派送中,但是小明等不及了,于是就去快递站拿,但是快递还没放到快递站,但是小明的心里急啊,他忍受不了相思之苦,于是小明每隔5分钟就往快递站跑一次,问一下快递到了没,到了就拿回来。这就是轮询的意思,也就是不论有没有数据,客户端都会每隔一定时间去请求一次服务端。

来分析一下拿快递的例子的问题:

  • 每隔5分钟就往快递站跑,那不是累死个小明么。
  • 还有一个问题,假设刚跑到快递站,快递没到,就回去了,但是刚到家的时候,快递到了,于是又等了5分钟,再去快递站终于拿到快递了,但是其实快递都到了几分钟了,你还是没有第一时间拿到快递,这就造成了延迟。

从而对应到程序中,就是会产生如下问题

  • 对于消息而言,会一直产生,这就要求消费者不停地间隔一定时间去拉取消息,即使没有消息也需要去请求,就会造成大量无用的请求,白白浪费大量耗费服务器内存和宽带资源。
  • 可能造成数据的延迟

长轮询

说长轮询概念之前,先来救救小明吧,毕竟小明可不想狗带。

既然原先小明每隔5分钟跑一次,那么是不是可以换种思路,当快递还没到的时候,让小明不要回来,直接在快递站待着,当快递到的时候,才让小明拿着快递回家。这下小明就喜死了,既可以有时间刷刷某音,逛逛某东,还可以在第一时间拿到13 pro max。

所以这种可以在快递站等待的机制,就叫长轮询。

长轮询也是客户端请求服务端,如果服务端有数据,那么就立马返回,客户端再次请求;当服务端不存在数据的时候,服务端并不会给客户端响应,而是将请求给hold住,当服务端有数据的时候才会给客户端响应,返回数据。

所以长轮询可以解决如下问题

  • 解决轮询带来的频繁请求服务端但是没有的问题
  • 一旦新的数据到了,那么消费者能立马就可以获取到新的数据,所以从效果上,有点像是push的感觉。

但是长轮询也会带来服务端代码实现逻辑复杂的问题,当然相比于优点来说,都不太重要。

push消费方式源码探究

理论都讲完了,接下来就到了show me the code的时间了,来看看RocketMQ的是如何通过长轮询机制来实现压力和时实的平衡。

这里我画了一张push模式下消费者消费流程图。

消费者拉取消息的逻辑

  • ①消费者有一个后台线程,会去处理拉取消息(PullRequest)
  • ②先去判断有没有过多消息没有消费,如果有的话,那么就间隔一定时间再次从①开始执行拉取消息的逻辑
  • ③消费者没有过多消息没有消费,那么就会直接向MQ发送拉取消息的请求,有消息就返回,没有消息就hold住请求,等有新的消息到的时候才返回
  • ④消费者获取到消息之后,会去找用户自定义的消息处理逻辑的实现(MessageListener的实现)去消费消息,同时会再次拉取消息,继续从①开始执行逻辑

1、消费者拉取消息控制压力源码

当消费者准备去拉消息的时候,会先去判断当前消费者消费的压力再决定是否去拉取消息。

RocketMQ提供了两种判断消费压力逻辑,一种是基于还未消费的消息的数量的大小,还有一种是基于还未消费的消息所占内存的大小。

控制压力源码

  • 判断还未消费消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑
  • 判断还未消费消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑

总的一句话就是,当消费者消费的压力过大时,就不会去拉取消息,而是等待一定的时间再去执行拉取消息的逻辑,如果压力还是很大,就还继续等,如此循环,直到消费者的消费压力小于阈值的时候,才会真正的发送请求到MQ中拉取消息。

2、MQ将请求hold住源码

当服务端未找到消息时,就将请求进行挂起,存起来

请求hold住源码

拉取不到消息时,会调用PullRequestHoldService的suspendPullRequest方法讲请求存储起来。PullRequestHoldService是用来存储拉取请求的类。

PullRequestHoldService

suspendPullRequest方法会将请求分类,放到ManyPullRequest里,然后用一个ConcurrentHashMap进行存储

3、MQ收到消息响应给消费者的源码

NotifyMessageArrivingListener

当生产者发送的消息达到MQ的时候,MQ会回调NotifyMessageArrivingListener的arriving方法,之后就会调用PullRequestHoldService的notifyMessageArriving方法,MQ会重新处理拉取消息的逻辑,此时就能找到最新来的那条消息,从而将最新的消息通过网络返回给消费者。

notifyMessageArriving和返回消息逻辑

最后

所以从以上的分析可以看出,RocketMQ对于push的消费方式的实现是基于长轮询机制来实现的,同时平衡了时实和压力,这其实就很nice了。

最后我想说一句,其实不论是pull还是push,又或是轮询和长轮询,其实都是一种理论或者说是一种思想,不单单是MQ的东西,就比如在Nacos中,也使用了push和长轮询机制。但是这些理论在不同产品的具体实现,实现方式可能不太一样,但都是大同小异,所以当你懂了这些思想,再看其它框架的源码,其实就很容易了。

以上就是RocketMQ的push消费方式实现示例的详细内容,更多关于RocketMQ push消费方式的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ4.5.X 实现修改生产者消费者日志保存路径

    RocketMQ修改生产者消费者日志保存路径 rocket默认是将所有日志文件保存到user.home的对于win系统就是C盘了. 1.修改RocketMQ中CLientLogger.class的源码,把经过更改的源码重新打包后,去自己的maven仓库替换rocketmq-client.-4.5.X.jar. 2.对于生产者的启动类里需要配置JVM系统属性: 如果不设置logUserSlf4j为true的话,启动生产者的时候会报找不到日志配置文件的警告. 3.对于消费者,仅仅添加rq.lordi

  • 基于rocketmq的有序消费模式和并发消费模式的区别说明

    rocketmq消费者注册监听有两种模式 有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同. MessageListenerOrderly 正确消费返回 ConsumeOrderlyStatus.SUCCESS 稍后消费返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT MessageListenerConcurrently 正确消费返回 Consu

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述. 介绍之前首先抛出几个问题: 1. 要做负载均衡,首先要解决的一个问题是什么? 2. 负载均衡是Client端处理还是Broker端处理? 个人理解: 1. 要做负载均衡

  • 详解RocketMQ中的消费者启动与消费流程分析

    目录 一.简介 1.1 RocketMQ 简介 1.2 工作流程 二.消费者启动流程 2.1 实例化消费者 2.2 设置NameServer和订阅topic过程 2.2.1 添加tag 2.2.2 发送心跳至Broker 2.2.3上传过滤器类至FilterServer 2.3 注册回调实现类 2.4 消费者启动 三.pull/push 模式消费 3.1 pull模式-DefaultMQPullConsumer 3.2 push模式-DefaultMQPushConsumer 3.3 小结 四.

  • RocketMQ的push消费方式实现示例

    目录 引言 MQ消费方式 1.push(推方式) 2.pull(拉方式) RocketMQ对于消费方式的实现 RocketMQ聪明地实现push的原因 轮询与长轮询 轮询 长轮询 push消费方式源码探究 1.消费者拉取消息控制压力源码 2.MQ将请求hold住源码 3.MQ收到消息响应给消费者的源码 最后 引言 最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,

  • RocketMQ Push 消费模型示例详解

    目录 使用 DefaultMQPushConsumer 消费消息 基于长轮询机制的伪 push 实现 客户端侧发起的长轮询请求 服务端阻塞请求 客户端回调处理 客户端发起请求的底层逻辑 PullCallback 回调 总结 Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端). 使用 DefaultMQPushConsumer 消费消息 下面是使用 DefaultMQPushConsumer 消费消息的

  • java多线程之线程同步七种方式代码示例

    为何要使用同步?  java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查),     将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,     从而保证了该变量的唯一性和准确性. 1.同步方法  即有synchronized关键字修饰的方法.     由于java的每个对象都有一个内置锁,当用此关键字修饰方法时,     内置锁会保护整个方法.在调用该方法前,需要获得内置锁,否则就处于阻塞状态.     代码

  • Map集合的四种遍历方式代码示例

    很久以前写的代码,和上一个做比较吧!便于以后查看. import java.util.HashMap; import java.util.Iterator; import java.util.Map; public class TestMap { public static void main(String[] args) { Map<Integer, String> map = new HashMap<Integer, String>(); map.put(1, "a&

  • Hibernate中获取Session的两种方式代码示例

    Session:是应用程序与数据库之间的一个会话,是Hibernate运作的中心,持久层操作的基础.对象的生命周期/事务的管理/数据库的存取都与Session息息相关. Session对象是通过SessionFactory构建的,下面举个例子来介绍Hibernate两种获取session的方式. 日志,是编程中很常见的一个关注点.用户在对数据库进行操作的过程需要将这一系列操作记录,以便跟踪数据库的动态.那么一个用户在向数据库插入一条记录的时候,就要向日志文件中记录一条记录,用户的一系列操作都要在

  • vue实现跳转接口push 转场动画示例

    1.index.js 配置子路由children. import Vue from 'vue' import Router from 'vue-router' import SingerDetail from 'components/singer-detail/singer-detail' Vue.use(Router) export default new Router({ routes: [ { path: '/', redirect: '/recommend' }, { path: '/s

  • Python安全获取域管理员权限几种方式操作示例

    目录 第1种方式:利用GPP漏洞获取域管理权限 第2种方式:获取服务器明文登录密码 第3种方式:使用MS14-068漏洞进行提权 第4种方式:窃取域管理员令牌 第5种方式:进程迁移 在大多数情况下,攻击者可以通过定位域管理员所登录的服务器,利用漏洞获取服务器system权限,找到域管理的账号.进程或是身份验证令牌,从而获取域管理员权限 第1种方式:利用GPP漏洞获取域管理权限 SYSVOL是域内的共享文件夹,用来存放登录脚本.组策略脚本等信息.当域管理员通过组策略修改密码时,在脚本中引入用户密码

  • Spring原生Rpc六种的正确打开方式实现示例

    目录 前言 什么是Rpc? Spring中的Rpc 定义服务接口 调用服务代码 WEBSERVICE的RPC实现 服务提供者 服务实现 服务暴露 服务消费者 HTTP的RPC实现 服务提供者 服务实现 服务暴露 服务消费者 文末结语 前言 在java生态圈谈到Rpc,很多人可能就会想到Dubbo.Motan.Grpc等框架.但是你知道吗?作为Java编程全家桶的Spring已经内置了多种RPC的实现方式,可以直接使用.存在即合理,有些场景下其实并不需要Dubbo,Grpc等重量级的RPC组件,那

  • spring boot整合log4j2及MQ消费处理系统日志示例

    目录 前言 1.添加相关jar依赖 2.系统log4j2.xml配置 3.添加处理日志的消息监听 前言 当系统的并发比较高的时候,日志的处理输出也是一种性能的开销负担,所以,选择一个中间件来处理消费日志必不可少! 下面是spring boot整合log4j2结合spring amqp来消费处理系统日志的实例,只需要简单的三步 1.添加相关jar依赖 <dependency> <groupId>org.springframework.boot</groupId> <

  • Beego中ORM操作各类数据库连接方式详细示例

    目录 beego中各类数据库连接方式 1.1 orm使用方式 a. 注册数据库驱动程序 b.注册数据库 c. 注册模型 1.2 操作示例 a. orm连接mysql b. orm连接sqlite3 c. orm连接 postgresql 1.3非orm连接方式 a. mysql b. sqlite3 c. postgresql d. mongodb e.sqlserver f.redis beego中各类数据库连接方式 beego 框架是优秀得go REST API开发框架.下面针对beego中

随机推荐