RocketMQ Push 消费模型示例详解

目录
  • 使用 DefaultMQPushConsumer 消费消息
  • 基于长轮询机制的伪 push 实现
    • 客户端侧发起的长轮询请求
    • 服务端阻塞请求
    • 客户端回调处理
    • 客户端发起请求的底层逻辑
    • PullCallback 回调
  • 总结

Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端)。

使用 DefaultMQPushConsumer 消费消息

下面是使用 DefaultMQPushConsumer 消费消息的官方示例代码:

// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动Consumer
consumer.start();

这里看到主要是通过 consumer 注册回调接口来处理从 Broker 中收到的消息。这种监听回调的机制很容易想到是一种观察者模式或者事件机制;对于这种 C-S 模型的架构来说,如果要做到 Server 在有新消息时立即推送给 Client,那么 Client 和 Server 之间应该是有连接存在的,Client 端开放端口来 watch Server 的推送。这里好论证,即可以查看当前 Client 端所在进程开启了什么端口即可,通过如下指令查看:

  • 1、先通过 jps 查看 Consumer Client 的进程号
➜  rocketmq-4.9.4 git:(06f2208a3) jps
10722 Jps
4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar
1766
4121 BrokerStartup
4009 NamesrvStartup
9419 PushConsumer
9692 RemoteMavenServer36

可以看到 PushConsumer 的进程号是 9419

  • 2、通过 lsof 命令查看进程端口占用
➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN
➜

这里没有看到 PushConsumer 有开启端口。同样,这里可以看看 Broker 的进程端口占用

➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN
java    4121 glmapper  137u    IPv6 0xca1142b0f200067d        0t0                 TCP *:10912 (LISTEN)
java    4121 glmapper  141u    IPv6 0xca1142b0f1fc8cfd        0t0                 TCP *:10911 (LISTEN)
java    4121 glmapper  142u    IPv6 0xca1142b0f1fc935d        0t0                 TCP *:10909 (LISTEN)

所以得到一个初步的结论是,在 Push 模式下,Consumer Client 并没有启动端口来接收 Server 的消息推送。 那么 RocketMQ 是怎么实现的?

基于长轮询机制的伪 push 实现

真正的 Push 方式,是 Server 端接收到消息后,主动把消息推送给 Client 端,这种情况一般需要 Client 和 Server 之间建立长连接。通过前面的分析,Client 既然没有开启端口用于接收 Server 的信息推送,那么只有一种可能就是 Client 自己去拉了消息,但是这种主动拉消息的方式是对于用户无感的,从使用上体验上来看,做到了和 push 一样的效果;这种机制就是“长轮询”。

为啥不用长连接方式,让 Server 主动 Push 呢?其实很好理解,对于一个提供队列服务的 Server 来说,用 Push方式主动推送有两个问题:

  • 1、会增加 Server 端的工作量,进而影响 Server 的性能
  • 2、Client 的处理能力存在差异,Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,会造成各种潜在问题

客户端侧发起的长轮询请求

下图是初始化相关资源的过程,DefaultMQPushConsumer 是面向用户使用的 API client 类,内部处理实际上是委托给 DefaultMQPushConsumerImpl 来处理的。DefaultMQPushConsumerImpl#start 时,会初始化 MQClientInstance ,MQClientInstance 初始化过程中又会初始化一堆资源,比如请求-响应的通道,开启各种各样的调度任务(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline状态的 Broker、定期发送心跳给 Broker、定期持久化所有 Consumer Offset等等),开启 pullMessageService,开启 rebalance Service 等等。大致的调用链如下图

下面这个代码片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子类)

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // 从 pullRequestQueue 中取 pullRequest
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

通过代码,可以直观的看起,pullMessageService 会一直从 pullRequestQueue 中取 pullRequest,然后执行 pullMessage 请求。实际上 MessageQueue 是和 pullRequest 一一对应的 ,pullRequest 全部存储到该 Consumer 的 pullRequestQueue 队列里面;消费者会不停的从 PullRequest 的队列里取 request 然后向broker 请求消息。

这里还有一个问题是队列取出之后什么时候放回去的?在 pullMessage 的回调方法中,如果正常得到了 broker 的响应,那么会把 PullRequest放回队列,相关代码可以从 org.apache.rocketmq.client.consumer.PullCallbackonSuccess 方法中得到答案。

服务端阻塞请求

服务端处理 pullRequest 请求的是 PullMessageProcessor,当没有消息时,则通过 PullRequestHoldService 将当前请求先 hold 住。

case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        // 如果是 LongPolling,则 hold 住
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }

PullRequestHoldService 中会将所有的 PullRequest 缓存到 pullRequestTable。PullRequestHoldService 也是一个 task,默认每次 hold 5s 然后再去检查是否有新的消息过来,如果有新的消息到来,则唤醒对应的线程来将消息返回给客户端。

// 已省略无关代码
public void run() {
    // loop
    while (!this.isStopped()) {
        // default hold 5s
        if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            this.waitForRunning(5 * 1000);
        } else {
            this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
        }
        long beginLockTimestamp = this.systemClock.now();
        // 检查是否有新的消息到达
        this.checkHoldRequest();
        long costTime = this.systemClock.now() - beginLockTimestamp;
        if (costTime > 5 * 1000) {
            log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
        }
	}
}

客户端回调处理

我们在编写 consumer 代码时,基于 push 模式是通过如下方式来监听消息的

//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

通过前面的分析,对于如何通过“长轮询”实现伪“push” 有了大概得了解;客户端通过一个定时任务不断向 Broker 发请求,Broker 在没有消息时先 hold 住一小段时间,当有新的消息时会立即将消息返回给 consumer;本节就主要探讨 consumer 在收到消息之后的处理逻辑,以及是怎么触发 MessageListener 回调执行的。

客户端发起请求的底层逻辑

以异步调用为例,代码在

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync中,截取部分代码如下:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                assert pullResult != null;
                // 成功回调
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                // 异常回调
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                 // 异常回调
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                 // 异常回调
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                    responseFuture.getCause()));
            } else {
                 // 异常回调
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});

PullCallback 回调

PullCallback 回调逻辑在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage方法中,以正常返回消息为例:

// 已省略无关代码
public void onSuccess(PullResult pullResult) {
    // 将接收到的消息 交给 consumeMessageService 处理
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume);
    // 将 pullRequest 放回 pullRequestQueue
 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

ConsumeRequest 是一个 Runnable,submitConsumeRequest 就是将返回结果丢在一个单独的线程池中去处理返回结果的。ConsumeRequest 的 run 方法中,会拿到 messageListener,然后执行 consumeMessage 方法。

总结

到此,关于 RocketMQ push 消费模型基本就探讨完了。从实现机制上来看,push 本质上并不是在建立双向通道的前提下,由 Server 主动推送给 Client 的,而是由 Client 端触发 pullRequest 请求,以长轮询的方式“伪装”的结果。从代码上来,RocketMQ 代码中使用了非常多的异步机制,如 pullRequestQueue 来解耦发送请求和等待结果,各种定时任务等等。

整体看,PushConsumer 采用了 长轮询+超时时间+Pull的模式, 这种方式带来的好处总结如下

  • 1、减少 Broker 的压力,避免由于不同 Consumer 消费能力导致 Broker 出现问题
  • 2、确保了 Consumer 不会负载过高,Consumer 在校验自己的缓存消息没有超过阈值才会去从 Broker 拉取消息,Broker 不会主动推过来
  • 3、兼顾了消息的即时性,Broker 在没有消息的时候会先 hold 一小段时间,有消息会立即唤起线程将消息返回给 Consumer
  • 4、Broker 端无效请求的次数大大降低,Broker 在没有消息时会挂起 PullRequest,而 Consumer 在未接收到Response 且未超时时,也不会重新发起 PullRequest

以上就是RocketMQ Push 消费模型示例详解的详细内容,更多关于RocketMQ Push 消费模型的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ的push消费方式实现示例

    目录 引言 MQ消费方式 1.push(推方式) 2.pull(拉方式) RocketMQ对于消费方式的实现 RocketMQ聪明地实现push的原因 轮询与长轮询 轮询 长轮询 push消费方式源码探究 1.消费者拉取消息控制压力源码 2.MQ将请求hold住源码 3.MQ收到消息响应给消费者的源码 最后 引言 最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • java开发RocketMQ生产者高可用示例详解

    目录 引言 1 消息 1.1 topic 1.2 Body 1.3 tag 1.4 key 1.5 延迟级别 2 生产者高可用 2.1 客户端保证生产者高可用 2.1.1 重试机制 2.1.2 客户端容错 2.2 Broker端保证生产者高可用 引言 前边两章说了点基础的,从这章开始,我们挖挖源码.看看RocketMQ是怎么工作的. 首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了. 我们看看消息孩子们都长啥样. 1 消息 public class Message implemen

  • rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述. 介绍之前首先抛出几个问题: 1. 要做负载均衡,首先要解决的一个问题是什么? 2. 负载均衡是Client端处理还是Broker端处理? 个人理解: 1. 要做负载均衡

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • RocketMQ Push 消费模型示例详解

    目录 使用 DefaultMQPushConsumer 消费消息 基于长轮询机制的伪 push 实现 客户端侧发起的长轮询请求 服务端阻塞请求 客户端回调处理 客户端发起请求的底层逻辑 PullCallback 回调 总结 Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端). 使用 DefaultMQPushConsumer 消费消息 下面是使用 DefaultMQPushConsumer 消费消息的

  • 20行代码简单实现koa洋葱圈模型示例详解

    目录 引言 koa中间件的使用 洋葱圈模型 洋葱圈模型的实现,koa-compose 单次调用限制 koa-compose与流程引擎 总结 引言 koa想必很多人直接或间接的都用过,其源码不知道阅读本文的你有没有看过,相当精炼,本文想具体说说koa的中间件模型,一起看看koa-compose的源码,这也是koa系列的第一篇文章,后续会更新一下koa相关的其他知识点 koa中间件的使用 先让我们启动一个koa服务 // app.js const koa = require('koa'); cons

  • Redis中事件驱动模型示例详解

    前言 Redis 是一个事件驱动的内存数据库,服务器需要处理两种类型的事件. 文件事件 时间事件 下面就会介绍这两种事件的实现原理. 文件事件 Redis 服务器通过 socket 实现与客户端(或其他redis服务器)的交互,文件事件就是服务器对 socket 操作的抽象. Redis 服务器,通过监听这些 socket 产生的文件事件并处理这些事件,实现对客户端调用的响应. Reactor Redis 基于 Reactor 模式开发了自己的事件处理器. 这里就先展开讲一讲 Reactor 模

  • RocketMQ实现随缘分BUG小功能示例详解

    目录 正文 实现过程 生产者: 正文 以前公司的产品已经上线20多年了,主要是维护,也就是改bug.每周我们Team会从Jira上拿我们可以改的bug,因为每个团队负责的业务范围不一样,我们团队只能改我们自己业务范围的.这样每周大概有20个左右的新bug,假如团队一共10个人,那么均分就是每人两个,改完下班. 但是这BUG肯定有难有简单,大家肯定都愿意改简单的,在家办公,任务量完了不就等于放假么.开始是自己给自己抢,就忒卷,是欧美项目,所以客服晚上报出来的bug多.有的哥们早上5点起来看有没有新

  • Django中的模型类设计及展示示例详解

    django中设计数据模型类是基于ORM的对象关系映射更方便的进行数据库中的数据操作. 对象关系映射 把面向对象中的类和数据库表--对应,通过操作类和对象,对数表实现数据操作,不需要写sql,由ORM框架生成 django实现了ORM框架,在项目中与数据库之间产生桥梁作用 django数据库定义模型的步骤如下: python manage.py makemigrations python mange.py migrate 在应用models.py中编写模型类,继承models.Model类 在模

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • java开发Dubbo负载均衡与集群容错示例详解

    目录 负载均衡与集群容错 Invoker 服务目录 RegistryDirectory 获取Invoker列表 监听注册中心 刷新Invoker列表 StaticDirectory 服务路由 Cluster FailoverClusterInvoker FailfastClusterInvoker FailsafeClusterInvoker FailbackClusterInvoker ForkingClusterInvoker BroadcastClusterInvoker Abstract

  • Java实现无向图的示例详解

    目录 基本概念 图的定义 无向图的定义 无向图的 API 无向图的实现方式 邻接矩阵 边的数组 邻接表数组 无向图的遍历 深度优先搜索 广度优先搜索 后记 基本概念 图的定义 一个图是由点集V={vi} 和 VV 中元素的无序对的一个集合E={ek} 所构成的二元组,记为G=(V,E),V中的元素vi叫做顶点,E中的元素 ek叫做边. 对于V中的两个点 u,v,如果边(u,v) 属于E,则称 u,v两点相邻,u,v称为边(u,v)的端点. 我们可以用m(G)=|E| 表示图G中的边数,用n(G)

  • vue+three.js实现炫酷的3D登陆页面示例详解

    目录 前言: Three.js的基础知识 关于场景 关于光源 关于相机(重要) 关于渲染器 完善效果 创建一个左上角的地球 使地球自转 创建星星 使星星运动 创建云以及运动轨迹 使云运动 完成three.js有关效果 结语 前言: 大家好,我是xx传媒严导(xx这两个字请自行脑补) . 该篇文章用到的主要技术:vue3.three.js 我们先看看成品效果: 高清大图预览(会有些慢): 座机小图预览: 废话不多说,直接进入正题 Three.js的基础知识 想象一下,在一个虚拟的3D世界中都需要什

随机推荐