RocketMQ源码解析topic创建机制详解

目录
  • 1. RocketMQ Topic创建机制
  • 2. 自动Topic
  • 3. 手动创建--预先创建
    • 通过界面控制台创建

1. RocketMQ Topic创建机制

以下源码基于Rocket MQ 4.7.0

RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建

autoCreateTopicEnable=true/false

下面会结合源码来深度分析一下自动创建和手动创建的过程。

2. 自动Topic

默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中

// Will be created at broker when isAutoCreateTopicEnable
 public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

自动创建开关是下BrokerConfig类中有一个私有变量:

@ImportantField
private boolean autoCreateTopicEnable = true;

这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。

在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):

{
    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。

然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

这里就说明了如何把每个Broker的系统自定义的Topic注册到NameServer。

接下来看在发送过程中如何从NameServer获取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //省略代码
        //获取路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    }

通过DefaultMQProducerImpl.tryToFindTopicPublishInfo方法获取Topic的路由信息。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //第一次从缓存中获取--肯定没有因为还没创建
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //从NameServer获取--也是没有,因为没有创建
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //第二次从这里获取
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

下面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:

 public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略代码
    if (isDefault && defaultMQProducer != null) {
            //使用默认的TBW102 Topic获取数据
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                if (topicRouteData != null) {
                    for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        //这是正常的
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
      //省略代码
    }

如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。

这里会比较一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默认值和TBW102中的值哪个更小。

 if (topicRouteData != null) {
        TopicRouteData old = this.topicRouteTable.get(topic);
        boolean changed = topicRouteDataIsChange(old, topicRouteData);
        if (!changed) {
            changed = this.isNeedUpdateTopicRouteInfo(topic);
        } else {
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
        }
 }

判断获取默认的是否存在,如果存在把当前的Topic的信息更新。

也就是把TBW102 Topic的数据更新为自动创建的数据。

 if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }
    // Update Pub info
    {
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }
    	// Update sub info
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

更新本地的缓存。这样TBW102 Topic的负载和一些默认的路由信息就会被自己创建的Topic使用。这里就是整个自动创建的过程.

总结一下就是:通过使用系统内部的一个TBW102的Topic的配置来自动创建当前用户的要创建的自定义Topic。

3. 手动创建--预先创建

手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面创建Topic。

通过界面控制台创建

项目地址: github.com/apache/rock…

TopicController主要负责Topic的管理

@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
    Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
            "clusterName or brokerName can not be all blank");
    logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
    topicService.createOrUpdate(topicCreateOrUpdateRequest);
    return true;
}

然后通过MQAdminExtImpl.createAndUpdateTopicConfig方法来创建:

    @Override
    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
    }

通过调用DefaultMQAdminExtImpl.createAndUpdateTopicConfig创建Topic

@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}

最后通过MQClientAPIImpl.createTopic创建Topic

    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
        final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

以上就是RocketMQ源码解析topic创建机制详解的详细内容,更多关于RocketMQ topic创建的资料请关注我们其它相关文章!

(0)

相关推荐

  • java开发线上事故理解RocketMQ异步精髓

    目录 引言 1 业务场景 2 线程池模式 3 本地内存 + 定时任务 4 MQ 模式 5 Agent 服务 + MQ 模式 6 总结 第一层:什么场景下需要异步 第二层:异步的外功心法 第三层:异步的本质 引言 在高并发的场景下,异步是一个极其重要的优化方向. 前段时间,生产环境发生一次事故,笔者认为事故的场景非常具备典型性 . 写这篇文章,笔者想和大家深入探讨该场景的架构优化方案.希望大家读完之后,可以对异步有更深刻的理解. 1 业务场景 老师登录教研平台,会看到课程列表,点击课程后,课程会以

  • RocketMQ之NameServer架构设计及启动关闭流程源码分析

    目录 NameServer 1.架构设计 2.核心类与配置 NamesrvController NamesrvConfig NettyServerConfig RouteInfoManager 3.启动与关闭流程 3.1.步骤一 3.2.步骤二 3.3.步骤三 NameServer 1.架构设计 消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外.RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费

  • SpringBoot整合RocketMQ的方法详解

    目录 一:Ubuntu安装RocketMQ 二:添加RocketMQ依赖 三:在application中添加RocketMQ配置 四:编写消费者,消息生产者,消息实体类(自定义) 五:测试Controller 一:Ubuntu安装RocketMQ 1.下载(在下面地址选择自己需要的版本的rocketmq) http://rocketmq.apache.org/release_notes/ 2.解压,更改配置 将下载的zip文件解压到自己需要安装的位置 在unbuntu系统下需要修改安装跟目录下的

  • RocketMQ的push消费方式实现示例

    目录 引言 MQ消费方式 1.push(推方式) 2.pull(拉方式) RocketMQ对于消费方式的实现 RocketMQ聪明地实现push的原因 轮询与长轮询 轮询 长轮询 push消费方式源码探究 1.消费者拉取消息控制压力源码 2.MQ将请求hold住源码 3.MQ收到消息响应给消费者的源码 最后 引言 最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,

  • RocketMQ NameServer 核心源码解析

    目录 带着问题 往下看 (namesrv) nameserver 启动的逻辑 nameserver 功能 nameserver 问题解答 我们在写组件的时候 怎么管理version 遍历 Field[] KVConfigManager 有什么作用 KVConfigManager 持久化 broker 是不是master判断 @ImportantField broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢? 总结 带着问题 往下看 (namesrv) 我们在写组件的时候 怎么

  • java开发RocketMQ之NameServer路由管理源码分析

    目录 1.前言 2.路由元信息 3.路由注册 3.1Broker路由注册 3.2NameServer处理路由注册 3.3路由删除 3.3.1Broker异常关闭 3.3.2Broker正常关闭 3.4路由发现 3.5总结 1.前言 NameServer主要作用是为消息消费者和消息生产者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基本信息,还要管理Broker节点,包括路由注册.路由删除等. 2.路由元信息 路由元信息主要由RouteInfoManager来进行管理,这

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • Android源码解析之属性动画详解

    前言 大家在日常开发中离不开动画,属性动画更为强大,我们不仅要知道如何使用,更要知道他的原理.这样,才能得心应手.那么,今天,就从最简单的来说,了解下属性动画的原理. ObjectAnimator .ofInt(mView,"width",100,500) .setDuration(1000) .start(); ObjectAnimator#ofInt 以这个为例,代码如下. public static ObjectAnimator ofInt(Object target, Stri

  • Netty源码解析NioEventLoop创建的构造方法

    目录 前文传送门:Netty源码分析 NioEventLoop 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的

  • php源码 fsockopen获取网页内容实例详解

    PHP fsockopen函数说明: Open Internet or Unix domain socket connection(打开套接字链接) Initiates a socket connection to the resource specified by target . fsockopen() returns a file pointer which may be used together with the other file functions (such as fgets(

  • HashMap源码中的位运算符&详解

    引言 最近在读HashMap源码的时候,发现在很多运算符替代常规运算符的现象.比如说用hash & (table.length-1) 来替代取模运算hash&(table.length):用if((e.hash & oldCap) == 0)判断扩容后元素的位置等等. 1.取模运算符%底层原理 ​总所周知,位运算&直接对二进制进行运算:而对于取模运算符%:a % b 相当于 a - a / b * b,底层实际上是除法器,究其根源也是由底层的减法和加法共同完成.所以其运行效

  • Spring-boot 2.3.x源码基于Gradle编译过程详解

    spring Boot源码编译 1. git上下拉最新版的spring Boot 下载:git clone git@github.com:spring-projects/spring-boot.git,建议下载release版本,不会出现奇奇怪怪的错误 2.修改下载源, gradle\wrapper中的配置文件 gradle-wrapper.properties distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists #d

  • vue源码解析之事件机制原理

    上一章没什么经验.直接写了组件机制.感觉涉及到的东西非常的多,不是很方便讲.今天看了下vue的关于事件的机制.有一些些体会.写出来.大家一起纠正,分享.源码都是基于最新的Vue.js v2.3.0.下面我们来看看vue中的事件机制: 老样子还是先上一段贯穿全局的代码,常见的事件机制demo都会包含在这段代码中: <div id="app"> <div id="test1" @click="click1">click1<

  • Android源码中的目录结构详解

    Android 2.1 |-- Makefile |-- bionic                        (bionic C库) |-- bootable                (启动引导相关代码) |-- build                        (存放系统编译规则及generic等基础开发包配置) |-- cts                        (Android兼容性测试套件标准) |-- dalvik                    

  • 基于Spring Boot的Environment源码理解实现分散配置详解

    前提 org.springframework.core.env.Environment是当前应用运行环境的公开接口,主要包括应用程序运行环境的两个关键方面:配置文件(profiles)和属性.Environment继承自接口PropertyResolver,而PropertyResolver提供了属性访问的相关方法.这篇文章从源码的角度分析Environment的存储容器和加载流程,然后基于源码的理解给出一个生产级别的扩展. 本文较长,请用一个舒服的姿势阅读. Environment类体系 Pr

  • Vue.js源码分析之自定义指令详解

    前言 除了核心功能默认内置的指令 (v-model 和 v-show),Vue 也允许注册自定义指令. 官网介绍的比较抽象,显得很高大上,我个人对自定义指令的理解是:当自定义指令作用在一些DOM元素或组件上时,该元素在初次渲染.插入到父节点.更新.解绑时可以执行一些特定的操作(钩子函数() 自定义指令有两种注册方式,一种是全局注册,使用Vue.directive(指令名,配置参数)注册,注册之后所有的Vue实例都可以使用,另一种是局部注册,在创建Vue实例时通过directives属性创建局部指

随机推荐