RocketMQ NameServer 核心源码解析

目录
  • 带着问题 往下看 (namesrv)
  • nameserver 启动的逻辑
    • nameserver 功能
  • nameserver 问题解答
    • 我们在写组件的时候 怎么管理version
  • 遍历 Field[]
  • KVConfigManager 有什么作用
  • KVConfigManager 持久化
  • broker 是不是master判断
  • @ImportantField
  • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?
  • 总结

带着问题 往下看 (namesrv)

  • 我们在写组件的时候 怎么管理version
  • 如果现在让你 维护一个 各个jar包公用的属性
  • System.exit(-1); 0 -1 -2 各种数都是干什么的,什么时候 用哪个
  • 环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?
  • 我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改
  • 大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?
  • 如果我们想 自己设置使用的log 组件,怎么办
  • 遍历 Field[] 的时候 想跳过 static的属性 怎么写代码?
  • 多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写
  • KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?
  • KVConfigManager 怎么保证的 持久化?
  • 怎么在 并发操作的时候 保证数据的安全性?
  • 方法的参数 使用final 有什么用?
  • 怎么判断的broker 是不是master
  • netty 怎么让nameserver 通知broker 信息的。
  • nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改
  • Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?
  • @ImportantField 干什么的? 什么时候 使用
  • 在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?
  • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

请思考下 写写你的答案 再往下看

nameserver 启动的逻辑

nameserver 功能

  • 管理broker 集群
  • 属于注册中心 业务端 和nameserver 进行连接,获取broker地址
  • 负责维护broker 连接/心跳/监控

nameserver 问题解答

我们在写组件的时候 怎么管理version

一方面是 在父类的 pom.xml 通过 进行 控制版本,然后 业务端通过

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.xxx</groupId>
            <artifactId>xxx</artifactId>
            <version>4.0.0-SNAPSHOT</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
     </dependencies>
</dependencyManagement>

这是第一个 ,第二个 是 rocketmq 这种 在common 包下面 新建一个 MQVersion 管理版本

这里会有一个问题,那这个版本管理 我用在哪里啊,不用不行么?

  • 为了方便测试,测试的时候 可能因为版本有差异 导致的问题。指定version 就没有这个问题了 2 broker 操作也是,(其实一句话 为了之后的版本兼容)比如
    if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
    result.setCode(ResponseCode.SYSTEM_ERROR);
    result.setRemark("the client does not support this feature. version="
        + MQVersion.getVersionDesc(version));
    log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
    return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
 }

如果现在让你 维护一个 各个jar包公用的属性

1 在common包搞一个 公共的实体类 随时用随时取呗,大不了就一个map 然后就put get

2 System.setProperty 底层就是全局 map 进行put get

extends Hashtable<Object,Object>

环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?

设置 rocketmq.home.dir=xxx

我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改

nettyServerConfig.setListenPort(9876);

代码指定 的netty 监听端口,一般情况不改

大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

这是先把commandLine 转变为 Properties 对象,然后调用 namesrvConfig 反射方法 赋值

如果我们想 自己设置使用的log 组件,怎么办

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

遍历 Field[]

遍历 Field[]的时候 想跳过 static的属性 怎么写代码?

 (field.getModifiers() & 0x00000008) != 0 如果为true 就是 static false为 非static

多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写

for (Entry<Object, Object> next : from.entrySet()) {
    Object fromObj = next.getValue(), toObj = to.get(next.getKey());
    if (toObj != null && !toObj.equals(fromObj)) {
        log.info("Replace, key: {}, value: {} -> {}", next.getKey(), toObj, fromObj);
    }
    to.put(next.getKey(), fromObj);
}

因为 可能同时操作这个对象 导致 数据不一致 ,所以要加上 读写锁的 写锁

KVConfigManager 有什么作用

KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?

是 kv 配置的管理器,主要是

HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable

以后写map 也要像这种方式 写注释。

//读取的是 ./namesrv/kvConfig.json
kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

行吧 ,现在还不知道 这些kv的作用,先看看怎么存储的,到用的时候 我们接上,先知道 kv 存储在KVConfigManager类 configTable 属性中

putKVConfig 使用的 ReentrantReadWriteLock 的写锁 保证数据一致性,如果map的key 存在了,不会进行覆盖,而是 跳过。

KVConfigManager 持久化

KVConfigManager 怎么保证的 持久化?

执行过 上面的 那些方法,执行 persist ,加读锁,如下图

怎么在 并发操作的时候 保证数据的安全性?

一方面 是 不可变类,其中返回属性的时候 要进行copy 简单来说 就是我通过get 方法出去的 对象 是 copy的对象,而不是 原来的对象,防止 外面通过引用 修改 属性值,把我们的对象 属性 进行修改。

方法的参数 使用final 有什么用?

  • 确保,不会也不能对于参数进行修改,保证了调用发起方数据的安全;
  • 避免在方法体中修改参数,引起不必要的错误
  • 程序员工作不是一个人的工作,你设置为final,别人将来维护的时候一看就知道这个变量不能修改,而不需要去记忆这个是不能变化的值,是常量。这个是代码规范。

broker 是不是master判断

怎么判断的broker 是不是master

//0 == brokerId
MixAll.MASTER_ID == brokerId

这个其实可以 抽出来一个公共的方法, 方便之后的修改

netty 怎么让nameserver 通知broker 信息的。

netty 保存的 channel 到时候用了 直接从map 获取 然后发送消息

nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改

BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2

static final 写死的,如果 最后一次心跳时间 + 2分钟 都小于System.currentTimeMillis() 执行删除操作

  • 关闭 netty channel
  • brokerLiveTable 删除对应的实例

这是一个定时任务 从项目 启动5s之后 ,每10s执行一次,说明 对broker的感知 会有些许 延迟。(最大也就 20s,一般10s以内感知)

Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?

当程序正常退出,系统调用 System.exit方法或虚拟机被关闭时才会执行添加的shutdownHook线程。其中shutdownHook是一个已初始化但并不有启动的线程,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以可通过这些钩子在jvm关闭的时候进行内存清理、资源回收等工作。

@ImportantField

@ImportantField 干什么的? 什么时候 使用

最后的true 代表 是否只打印关键属性,写@ImportantField的 就一定会打,没有这个注解的就不打印了

MixAll.printObjectProperties(console, brokerConfig, true);

在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?

isolateLogEnable 改为 true

if (brokerConfig.isIsolateLogEnable()) {
    System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
}
if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
    System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
}

broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

无论是 -p 还是 -m 都是print 输出,本来就是希望打印日志,然后进程停止。

opt = new Option("p", "printConfigItem", false, "Print all config item");
opt = new Option("m", "printImportantConfig", false, "Print important config item");

总结

这些 只是 namestr 的NamesrvController 初始化,更多关于RocketMQ NameServer 解析的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ的push消费方式实现示例

    目录 引言 MQ消费方式 1.push(推方式) 2.pull(拉方式) RocketMQ对于消费方式的实现 RocketMQ聪明地实现push的原因 轮询与长轮询 轮询 长轮询 push消费方式源码探究 1.消费者拉取消息控制压力源码 2.MQ将请求hold住源码 3.MQ收到消息响应给消费者的源码 最后 引言 最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • java开发RocketMQ之NameServer路由管理源码分析

    目录 1.前言 2.路由元信息 3.路由注册 3.1Broker路由注册 3.2NameServer处理路由注册 3.3路由删除 3.3.1Broker异常关闭 3.3.2Broker正常关闭 3.4路由发现 3.5总结 1.前言 NameServer主要作用是为消息消费者和消息生产者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基本信息,还要管理Broker节点,包括路由注册.路由删除等. 2.路由元信息 路由元信息主要由RouteInfoManager来进行管理,这

  • SpringBoot整合RocketMQ的方法详解

    目录 一:Ubuntu安装RocketMQ 二:添加RocketMQ依赖 三:在application中添加RocketMQ配置 四:编写消费者,消息生产者,消息实体类(自定义) 五:测试Controller 一:Ubuntu安装RocketMQ 1.下载(在下面地址选择自己需要的版本的rocketmq) http://rocketmq.apache.org/release_notes/ 2.解压,更改配置 将下载的zip文件解压到自己需要安装的位置 在unbuntu系统下需要修改安装跟目录下的

  • RocketMQ之NameServer架构设计及启动关闭流程源码分析

    目录 NameServer 1.架构设计 2.核心类与配置 NamesrvController NamesrvConfig NettyServerConfig RouteInfoManager 3.启动与关闭流程 3.1.步骤一 3.2.步骤二 3.3.步骤三 NameServer 1.架构设计 消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外.RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费

  • java开发线上事故理解RocketMQ异步精髓

    目录 引言 1 业务场景 2 线程池模式 3 本地内存 + 定时任务 4 MQ 模式 5 Agent 服务 + MQ 模式 6 总结 第一层:什么场景下需要异步 第二层:异步的外功心法 第三层:异步的本质 引言 在高并发的场景下,异步是一个极其重要的优化方向. 前段时间,生产环境发生一次事故,笔者认为事故的场景非常具备典型性 . 写这篇文章,笔者想和大家深入探讨该场景的架构优化方案.希望大家读完之后,可以对异步有更深刻的理解. 1 业务场景 老师登录教研平台,会看到课程列表,点击课程后,课程会以

  • RocketMQ NameServer 核心源码解析

    目录 带着问题 往下看 (namesrv) nameserver 启动的逻辑 nameserver 功能 nameserver 问题解答 我们在写组件的时候 怎么管理version 遍历 Field[] KVConfigManager 有什么作用 KVConfigManager 持久化 broker 是不是master判断 @ImportantField broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢? 总结 带着问题 往下看 (namesrv) 我们在写组件的时候 怎么

  • MyBatis核心源码深度剖析SQL语句执行过程

    目录 1 SQL语句的执行过程介绍 2 SQL执行的入口分析 2.1 为Mapper接口创建代理对象 2.2 执行代理逻辑 3 查询语句的执行过程分析 3.1 selectOne方法分析 3.2 sql获取 3.3 参数设置 3.4 SQL执行和结果集的封装 4 更新语句的执行过程分析 4.1 sqlsession增删改方法分析 4.2 sql获取 4.3 参数设置 4.4 SQL执行 5 小结 1 SQL语句的执行过程介绍 MyBatis核心执行组件: 2 SQL执行的入口分析 2.1 为Ma

  • Java SpringBoot核心源码详解

    目录 SpringBoot源码主线分析 1.SpringBoot启动的入口 2.run方法 3.SpringApplication构造器 4.run方法 总结 SpringBoot源码主线分析 我们要分析一个框架的源码不可能通过一篇文章就搞定的,本文我们就来分析下SpringBoot源码中的主线流程.先掌握SpringBoot项目启动的核心操作,然后我们再深入每一个具体的实现细节,注:本系列源码都以SpringBoot2.2.5.RELEASE版本来讲解 1.SpringBoot启动的入口 当我

  • 详解ArrayBlockQueue源码解析

    今天要讲的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的线程安全的有界的阻塞队列,一看到Array,第一反应:这货肯定和数组有关,既然是数组,那自然是有界的了,我们先来看看ArrayBlockQueue的基本使用方法,然后再看看ArrayBlockQueue的源码. ArrayBlockQueue基本使用 public static void main(String[] args) throws InterruptedException { ArrayBlocki

  • Spring源码解析之事务传播特性

    一.使用方式 可以采用Transactional,配置propagation即可. 打开org.springframework.transaction.annotation.Transactional可见默认传播特性是REQUIRED. /** * The transaction propagation type. * <p>Defaults to {@link Propagation#REQUIRED}. * @see org.springframework.transaction.inte

  • Spring源码解析之编程式事务

    一.前言 在Spring中,事务有两种实现方式: 编程式事务管理: 编程式事务管理使用TransactionTemplate可实现更细粒度的事务控制.声明式事务管理: 基于Spring AOP实现.其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务. 声明式事务管理不需要入侵代码,通过@Transactional就可以进行事务操作,更快捷而且简单(尤其是配合spring boot自动配置,可以说是精简至极!),且大部分业务都可

  • Spring AOP实现声明式事务机制源码解析

    目录 一.声明式全局事务 二.源码 三.小结: 一.声明式全局事务 在Seata示例工程中,能看到@GlobalTransactional,如下方法示例: @GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " +

  • 浅析Alibaba Nacos注册中心源码剖析

    Nacos&Ribbon&Feign核心微服务架构图 架构原理 微服务系统在启动时将自己注册到服务注册中心,同时外发布 Http 接口供其它系统调用(一般都是基于Spring MVC) 服务消费者基于 Feign 调用服务提供者对外发布的接口,先对调用的本地接口加上注解@FeignClient,Feign会针对加了该注解的接口生成动态代理,服务消费者针对 Feign 生成的动态代理去调用方法时,会在底层生成Http协议格式的请求,类似 /stock/deduct?productId=100

  • jq源码解析之绑在$,jQuery上面的方法(实例讲解)

    1.当我们用$符号直接调用的方法.在jQuery内部是如何封装的呢?有没有好奇心? // jQuery.extend 的方法 是绑定在 $ 上面的. jQuery.extend( { //expando 用于决定当前页面的唯一性. /\D/ 非数字.其实就是去掉小数点. expando: "jQuery" + ( version + Math.random() ).replace( /\D/g, "" ), // Assume jQuery is ready wit

随机推荐