RocketMQ消息发送流程源码剖析

目录
  • 正文
  • 读源码
    • 1 调用defaultMQProducerImpl.send()
    • 2 设置过期时间
    • 3 执行defaultMQProducerImpl.sendDefaultImpl()方法
  • sendDefaultImpl是发送消息的核心方法。
    • 1 两个校验
    • 2 获取topic路由信息
    • 3 计算重试次数
    • 4 执行队列选择方法
    • 5 发送消息

正文

就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的故事就这么展开了。

这节我们研究研究,消息的发送流程。也就是说,消息孩子从进门到坐到message queue座位上都经历了啥。

父母把消息孩子送到码头之后,门口的门童defaultMQProducerImpl.send()接过孩子,进入到MQ房子内部,然后引导孩子进入Broker候船大厅内的message queue座位上就坐。这就是消息发送的流程了。

而且孩子在刚被门童接到之后,就被规定了能在候船大厅待多久,默认是3秒。也就是说,要是再小房子内等了三秒没走,就离开吧,你怕是没想明白自己来干啥的。这就是消息的超时时间。

读源码

1 调用defaultMQProducerImpl.send()

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

2 设置过期时间

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

3 执行defaultMQProducerImpl.sendDefaultImpl()方法

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

这里看看这几个参数,

  • communicationMode 是通信模式,同步异步还是单向
  • sendCallback 是针对异步模式的,异步模式需要设置发送完成后的回调。

sendDefaultImpl是发送消息的核心方法。

这里消息孩子进到第一个卡口,先要检查送孩子来的家长是否还能联系上,若是能联系到,就继续。要是联系不到,这孩子岂不是被抛弃了,不敢接不敢接,送到孤儿院吧。

然后需要检查消息孩子了,首先是检查孩子还在不在,别扔个衣服跑了。
然后看看孩子指定的这个topic,不能说我想去内个topic哈,必须是实实在在的名字。而且上头也规定了,这个topic的名字也不能太长,也不能包含特殊字符。已有的一些领导定过的也不能用哈。
接下来就是检查孩子的body了,之前说body就是孩子的技能,首先,技能为空,不行不行,啥都不会是不行的。再者太长也不行,你唱首歌两年,这没法玩。

检查message不为null

检查topic

  • topic不能为空
  • topic不能太长
  • 不能包含特殊字符

检查话题的名字是否被系统已占用

检查body

  • 检查是否为空
  • 检查长度是否过长,最大为4MB 这样

下边我们看看sendDefaultImpl这个方法。给他拆成一段一段的看。

1 两个校验

//校验生产者服务是ok的,可以联系到的
this.makeSureStateOK();
//校验消息的参数
Validators.checkMessage(msg, this.defaultMQProducer);
  • 第一个检查,检查生产者服务是否是正常工作的,若是不正常工作,就抛出异常。
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}
  • 第二个检查,检查消息本身是否为空,检查topic,检查消息的body
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // 这里校验Topic的时候,校验了不能为空,长度和特殊字符
    Validators.checkTopic(msg.getTopic());
    //这里则校验了一些不允许使用的topic名字
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body不为空
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    // body长度不为0
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    // body 长度不能过长
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2 获取topic路由信息

嗯,这里孩子终于通过了检查,服务人员开始带着他去找自己指定的topic区域,指定是自己指定,划分还是工作人员划分的。咱总得知道这个topic区域在哪吧。

先去缓存笔记里找,有没有这个区域的信息,若是没有这个topic,就新建一个,然后更新到缓存笔记里边。若有topic但是不知道在哪,就找name server大脑去申请这个topic在哪的信息。

执行tryToFindTopicPublishInfo方法去获取Topic的路由信息,若是不存在就新建,若是有topic但是缓存中没有路由信息,则通过name server获取路由信息。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //获取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //不存在
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //新建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //包含路由信息就直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //不包含路由信息则向name server申请,修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3 计算重试次数

这就是计算消息孩子可以尝试去找地方坐几次,没坐上,欸,我又来了,没坐上,欸,我又来了。

这行代码就是计算重试次数的,根据communicationMode传入的值,同步异步还是单向的来决定重试次数是几次。 很明显,若是同步的,就会尝试三次。若是异步的或者单向的就只发送一次。

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

4 执行队列选择方法

我们之前说了,Broker类似于候船大厅,为了均分压力,每次都要进与上次不同的候船大厅。

执行selectOneMessageQueue方法通过Queue将消息发送到与上次不同的一个Broker。也可以通过 sendLatencyFaultEnable判断是否启用延迟容错开关

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

5 发送消息

这就是走过巷道坐到属于自己的座位上了

然后就通过sendKernelImpl发送消息了,这是发送消息的核心方法。会准备通信层的入参,并将请求发送给通信层,内部实现是基于Netty的。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

以上就是RocketMQ消息发送流程源码剖析的详细内容,更多关于RocketMQ消息发送流程的资料请关注我们其它相关文章!

(0)

相关推荐

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • RocketMQ之NameServer架构设计及启动关闭流程源码分析

    目录 NameServer 1.架构设计 2.核心类与配置 NamesrvController NamesrvConfig NettyServerConfig RouteInfoManager 3.启动与关闭流程 3.1.步骤一 3.2.步骤二 3.3.步骤三 NameServer 1.架构设计 消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外.RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费

  • SpringSecurity 默认表单登录页展示流程源码

    SpringSecurity 默认表单登录页展示流程源码 本篇主要讲解 SpringSecurity提供的默认表单登录页 它是如何展示的的流程, 涉及 1.FilterSecurityInterceptor, 2.ExceptionTranslationFilc,xmccmc,ter , 3.DefaultLoginPageGeneratingFilter 过滤器, 并且简单介绍了 AccessDecisionManager 投票机制  1.准备工作(体验SpringSecurity默认表单认证

  • 浅析Spring Security登录验证流程源码

    一.登录认证基于过滤器链 Spring Security的登录验证流程核心就是过滤器链.当一个请求到达时按照过滤器链的顺序依次进行处理,通过所有过滤器链的验证,就可以访问API接口了. SpringSecurity提供了多种登录认证的方式,由多种Filter过滤器来实现,比如: BasicAuthenticationFilter实现的是HttpBasic模式的登录认证 UsernamePasswordAuthenticationFilter实现用户名密码的登录认证 RememberMeAuthe

  • 详解Android布局加载流程源码

    一.首先看布局层次 看这么几张图 我们会发现DecorView里面包裹的内容可能会随着不同的情况而变化,但是在Decor之前的层次关系都是固定的.即Activity包裹PhoneWindow,PhoneWindow包裹DecorView.接下来我们首先看一下三者分别是如何创建的. 二.Activity是如何创建的 首先看到入口类ActivityThread的performLaunchActivity方法: private Activity performLaunchActivity(Activi

  • Spring Security 实现用户名密码登录流程源码详解

    目录 引言 探究 登录流程 校验 用户信息保存 引言 你在服务端的安全管理使用了 Spring Security,用户登录成功之后,Spring Security 帮你把用户信息保存在 Session 里,但是具体保存在哪里,要是不深究你可能就不知道, 这带来了一个问题,如果用户在前端操作修改了当前用户信息,在不重新登录的情况下,如何获取到最新的用户信息? 探究 无处不在的 Authentication 玩过 Spring Security 的小伙伴都知道,在 Spring Security 中

  • SpringMVC请求流程源码解析

    目录 一.SpringMVC使用 1.工程创建 2.工程配置 3.启动工程 二.SpringMVC启动过程 1.父容器启动过程 2.子容器启动过程(SpringMvc容器) 3.九大组件的初始化 1.处理器映射器的初始化 2.处理器适配器的初始化 4.拦截器的初始化 三.SpringMVC请求过程 1.请求流程图 2.业务描述 一.SpringMVC使用 1.工程创建 创建maven工程. 添加java.resources目录. 引入Spring-webmvc 依赖. <dependency>

  • Dubbo3的Spring适配原理与初始化流程源码解析

    目录 引言 Spring Context Initialization FactoryBean BeanDefinition 初始化bean 解决依赖 解决属性 Dubbo Spring的一些问题及解决办法 Dubbo spring 2.7 初始化过程 Dubbo spring 3的初始化过程 属性占位符解决失败 ReferenceBean被过早初始化问题 Reference注解可能出现@Autowire注入失败的问题 引言 Dubbo 国内影响力最大的开源框架之一,非常适合构建大规模微服务集群

  • Kubernetes kubectl中Pod创建流程源码解析

    目录 确立目标 先写一个Pod的Yaml 部署Pod 查询Pod kubectl create 的调用逻辑 Main Match Command Create RunCreate Summary 确立目标 从创建pod的全流程入手,了解各组件的工作内容,组件主要包括以下 kubectl kube-apiserver kube-scheduler kube-controller kubelet 理解各个组件之间的相互协作,目前是kubectl 先写一个Pod的Yaml apiVersion: v1

  • Spring Transaction事务实现流程源码解析

    目录 一.基于xml形式开启Transaction 1. 创建数据库user 2. 创建一个maven 项目 3. 通过xml形式配置事务 1) 创建Spring命名空间 2) 开启事务配置 3) 创建UserService类 4. 测试事务 1) 抛出RuntimeException 2) 注释掉RuntimeException 二.事务开启入口TxNamespaceHandler AnnotationDrivenBeanDefinitionParser 三.AOP驱动事务 Transacti

随机推荐