RocketMQ之NameServer架构设计及启动关闭流程源码分析

目录
  • NameServer
    • 1.架构设计
    • 2.核心类与配置
      • NamesrvController
      • NamesrvConfig
      • NettyServerConfig
      • RouteInfoManager
  • 3.启动与关闭流程
    • 3.1.步骤一
    • 3.2.步骤二
    • 3.3.步骤三

NameServer

1.架构设计

消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外。RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费者(Consumer)订阅所需要的主题,消息服务器根据订阅信息(路由信息)将消息推送至消息消费者(Push模式)或者消息消费者主动向消息服务器进行拉取(Pull模式),从而实现消息生产者与消息消费者之间解耦。

为了避免消息服务器单点故障而导致的系统瘫痪,消息服务器常常会集群分布,部署多台服务器共同处理消息并且承担消息的存储,消息生产者如何知道要将消息发送至哪台服务器和消息消费者如何知道要从哪台消息服务器进行消息的拉取等等问题,都要由NameServer来处理,其实NameServer充当的角色与Zookeeper十分相似。

Broker消息服务器启动时,需要向NameServer集群进行信息注册,消息生产者Producer发送消息之前主动向NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选出一台服务器进行消息的发送。NameServer与每台Broker保持长连接,并每隔30s对Broker存活状态进行检测,如果检测到Broker宕机并且长时间没有进行连接重试,则会将该Broker从路由注册表中删除,以此保证Broker集群的高可用,但是路由变化不会立马对生产者进行通知,需要Producer一段时间之后重新向NameServer进行获取并更新路由信息。这也是NameServer与Zookeeper的不同,NameServer这样的设计降低了整个NameServer实现的复杂度,整个NameServer代码实现不超过一千行,简单而高效!

以下是NameServer整个项目预览:

可以看到NameServer主要有以下几个作用:

配置信息管理

请求处理

路由信息管理

2.核心类与配置

NamesrvController

NameserController 是 NameServer 模块的核心控制类。

private final NamesrvConfig namesrvConfig;//主要指定 nameserver 的相关配置属性
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));//NameServer定时任务执行线程池-->每隔10s扫描broker,对存活的Broker信息进行维护并且打印KVConfig
private final KVConfigManager kvConfigManager;//读取或变更NameServer的配置属性,加载 NamesrvConfig中配置到内存
private final RouteInfoManager routeInfoManager;//NameServer 数据的载体,记录 Broker、Topic 等信息。

private final NettyServerConfig nettyServerConfig;//与网络通讯相关的配置
private RemotingServer remotingServer;//网络通信服务
private ExecutorService remotingExecutor;//网络通信服务

NamesrvConfig

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;

rocketmqHome:rocketmq主目录

kvConfigPath:NameServer存储KV配置属性的持久化路径

configStorePath:nameServer默认配置文件路径

orderMessageEnable:是否支持顺序消息

NettyServerConfig

private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;

listenPort:NameServer监听端口,该值默认会被初始化为9876
serverWorkerThreads:Netty业务线程池线程个数
serverCallbackExecutorThreads:Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。
serverSelectorThreads:IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
serverOnewaySemaphoreValue:send oneway消息请求;
serverAsyncSemaphoreValue:异步消息发送最大并发数;
serverChannelMaxIdleTimeSeconds :网络连接最大的空闲时间,默认120s。
serverSocketSndBufSize:网络socket发送端缓冲区大小。
serverSocketRcvBufSize: 网络socket接收端缓存区大小。
serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存;
useEpollNativeSelector:是否启用Epoll IO模型。

RouteInfoManager

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BROKER_CHANNEL_EXPIRED_TIME:NameServer与Broker空闲连接时长,在2 minNameServer之内没有收到Broker的心跳包,则NameServer会关闭与该Broker的连接并删除Broker的路由信息。

lock:读写锁,用来保护以下用于存储关键信息的非线程安全容器HashMap。

topicQueueTable:用于存储主题与队列的映射关系,记录一个主题topic的队列分布在哪些Broker上。以下是QueueData属性值:

private String brokerName;			//broker名称
private int readQueueNums;			//读队列个数
private int writeQueueNums;			//写队列个数
private int perm;				   //操作权限
private int topicSysFlag;			//同步复制还是异步复制的标识

brokerAddrTable:用于记录所有Broker信息。以下是BrokerData属性值:

private String cluster;				//当前Broker所属集群
private String brokerName;			//Broker名称
//BrokerId=0表示主节点,BrokerId>0表示从节点
//记录BrokerId与对应节点地址的映射信息
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

clusterAddrTable:用于记录Broker集群信息

brokerLiveTable:用于记录活跃状态的Broker,NameServer每隔10s对所有Broker进行扫描,如果有Broker宕机,会将该Broker从该表中删去,以此维护可用的Broker列表信息。以下是BrokerLiveInfo的属性值:

private long lastUpdateTimestamp;		//上次发送心跳包的时间戳
private DataVersion dataVersion;		//记录数据版本信息
private Channel channel;
private String haServerAddr;			//Master节点地址

3.启动与关闭流程

NameServer启动时序图:

启动类:org.apache.rocketmq.namesrv.NamesrvStartup.java

3.1.步骤一

解析配置文件,填充NamesrvConfigNettyServerConfig并创建NamesrvController

启动类:

public static void main(String[] args) {
    main0(args);
}
public static NamesrvController main0(String[] args) {

    try {
        //创建NamesrvController的入口
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

NamesrvController#createNamesrvController:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //....
    //创建namesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //创建nettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();

    //设置默认端口9876
    nettyServerConfig.setListenPort(9876);
    //-c 指定属性配置文件的位置
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    //-p 属性名=属性值
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    //将启动参数填充到namesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    //如果未指定'ROCKETMQ_HOME'环境变量
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
	//....
    //打印配置信息日志
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    //根据namesrvConfig和nettyServerConfig创建NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // 将配置存入controller.configuration以防止配置丢失
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

3.2.步骤二

根据配置创建好NamesrvController之后,对其进行初始化:

//NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller) throws Exception {

    //进行简单的检查
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    //controller初始化
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    //....
    controller.start();		//开启远程服务-this.remotingServer.start();

    return controller;
}
//NamesrvController#initialize
public boolean initialize() {

    	//加载配置管理器
        this.kvConfigManager.load();
		//创建Netty远程服务
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
		//创建远程服务线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        //注册线程池
        this.registerProcessor();
        //定时任务线程池--->每隔十秒扫描活跃状态异常的Broker信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            /**
             * 对Not Active Broker 进行扫描
             */
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //定时任务线程池--->每隔十秒打印KVConfig信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
       //....
        return true;
    }

3.3.步骤三

在JVM进程关闭之前,先将线程池关闭,及时释放资源。

public static NamesrvController start(final NamesrvController controller) throws Exception {
	//....
    //JVM进程关闭之前,将线程池关闭,资源释放
    Runtime.getRuntime().addShutdownHook/*注册JVM钩子函数*/(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    //....
}

以上仅供个人学习使用,如有不足请指正!

以上就是RocketMQ之NameServer架构设计及启动关闭流程源码分析的详细内容,更多关于RocketMQ之NameServer架构设计及启动关闭的资料请关注我们其它相关文章!

(0)

相关推荐

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • RocketMQ重试机制及消息幂代码实例解析

    这篇文章主要介绍了RocketMQ重试机制及消息幂代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好

  • 微服务架构设计RocketMQ基础及环境整合

    目录 概述&选型 单机安装配置 双机主从高可用搭建 启动多个NameServer 和 Broker 重要参数说明 可视化管理平台 SpringBoot整合RocketMQ 引入组件rocketmq-spring-boot-starter 依赖 修改application.yml,添加RocketMQ相关配置 编写消息生产者 MessageProduce 编写消息消费者 MessageConsumer 编写单元测试发送消息 测试 概述&选型 消息队列作为高并发系统的核心组件之一,能够帮助业务

  • Springboot RocketMq实现过程详解

    首先,在虚拟机上安装rocketmq和rocketMq可视化控制,安装不做描述. 1.pom.xml文件添加依赖 mq的版本与连接的rocketmq版本保持一致 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </depende

  • RocketMQ之NameServer架构设计及启动关闭流程源码分析

    目录 NameServer 1.架构设计 2.核心类与配置 NamesrvController NamesrvConfig NettyServerConfig RouteInfoManager 3.启动与关闭流程 3.1.步骤一 3.2.步骤二 3.3.步骤三 NameServer 1.架构设计 消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外.RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费

  • Spring启动过程源码分析及简介

    目录 1.BeanDefinition 2.beanFactory 3.BeanDefinitionReader 4.ClassPathBeanDefinitionScanner 5.ConditionEvaluator 6.Aware 本文是通过AnnotationConfigApplicationContext读取配置类来一步一步去了解Spring的启动过程. 在看源码之前,我们要知道某些类的作用,这样更方便后续的了解. 1.BeanDefinition BeanDefinition就是Be

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • Spring MVC 启动过程源码分析详解

    今天小编尝试从源码层面上对Spring mvc的初始化过程进行分析,一起揭开Spring mvc的真实面纱,也许我们都已经学会使用spring mvc,或者说对spring mvc的原理在理论上已经能倒背如流.在开始之前,这可能需要你掌握Java EE的一些基本知识,比如说我们要先学会Java EE 的Servlet技术规范,因为Spring mvc框架实现,底层是遵循Servlet规范的. 在开始源码分析之前,我们可能需要一个简单的案例工程,不慌,小编已经安排好了: 样例工程下载地址 : ht

  • SpringBoot应用启动流程源码解析

    前言 Springboot应用在启动的时候分为两步:首先生成 SpringApplication 对象 ,运行 SpringApplication 的 run 方法,下面一一看一下每一步具体都干了什么 public static ConfigurableApplicationContext run(Class<?>[] primarySources, String[] args) { return new SpringApplication(primarySources).run(args);

  • SpringBoot应用启动内置Tomcat的过程源码分析

    Connector启动过程 Connector是Tomcat提供的类. // 通过此 Connector 开始处理请求 @Override protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPortWithOffset() < 0) { throw new LifecycleException(sm.getString( "coyote

  • Android10 启动之SystemServer源码分析

    目录 正文 createSystemContext startBootstrapServices startCoreServices startOtherServices 正文 上一篇文章: # Android 10 启动分析之Zygote篇 (三) 紧接着上一篇文章的内容,我们从这篇文章开始来分析一下 SystemServer. system_server 进程承载着整个framework的核心服务,例如创建 ActivityManagerService.PowerManagerService

  • java开发微服务架构设计消息队列的水有多深

    目录 消息队列的作用 消息队列的设计难题 处理并发和顺序消息 处理重复消息 编写幂等消息处理器 跟踪消息并丢弃重复消息 处理事务性消息 使用数据库表作为消息队列 使用事务日志发布事件 RocketMQ事务消息解决方案 很多人在做架构设计时往往会"过度设计",简单问题复杂化,上来就引一堆中间件,我想大概原因主要有下面两点: 为了秀(学)技术而架构 我们常说技术是为业务服务的,不能为了技术而技术,为了秀技术引入一堆复杂架构这是要不得的. 考虑问题不全面,或者说广度不够,不知道如何简单化 举

  • java开发RocketMQ之NameServer路由管理源码分析

    目录 1.前言 2.路由元信息 3.路由注册 3.1Broker路由注册 3.2NameServer处理路由注册 3.3路由删除 3.3.1Broker异常关闭 3.3.2Broker正常关闭 3.4路由发现 3.5总结 1.前言 NameServer主要作用是为消息消费者和消息生产者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基本信息,还要管理Broker节点,包括路由注册.路由删除等. 2.路由元信息 路由元信息主要由RouteInfoManager来进行管理,这

随机推荐