RocketMQ Namesrv架构工作原理详解

目录
  • 1 概念
  • 2 核心数据结构和API
    • 2.1 Namesrv的核心数据结构
    • 2.2 Namesrv的API
  • 3 Namesrv架构
    • 3.1组件
    • 3.2 Namesrv四个功能模块

1 概念

Namesrv的作用是保存元数据提高Broker的可用性

Namesrv的主要功能是临时存储管理Topic路由信息,各个Namesrv节点之间是不通信无状态的,互相不知道对方的存在。

当Broker,生产者,消费者启动的时候,会轮询全部的Namesrv节点,获取路由信息。

2 核心数据结构和API

2.1 Namesrv的核心数据结构

Namesrv中保存的信息是Topic的路由信息,Topic的路由决定了Topic的信息发送给哪些Broker,或者从哪些Broker获取消息。

路由数据结构的实现代码都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //Broker存活的时间周期,默认120秒
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //保存Topic和队列的路由信息
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    //Broker名字和Broker信息的对应信息
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    //集群和Broker的对应关系
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //在线的Broker地址和Broker信息的对应关系
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //过滤服务器消息
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    private final BatchUnRegisterService unRegisterService;
    private final NamesrvController namesrvController;
    private final NamesrvConfig namesrvConfig;

2.2 Namesrv的API

Namesrv的的API在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor中,根据方法名很容易判断出来方法的作用。

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return this.queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        //Broker注册自身信息到Namesrv
        return this.registerBroker(ctx, request);
    case RequestCode.UNREGISTER_BROKER:
        //Broker取消注册自身信息到Namesrv
        return this.unregisterBroker(ctx, request);
    case RequestCode.BROKER_HEARTBEAT:
        return this.brokerHeartbeat(ctx, request);
    case RequestCode.GET_BROKER_MEMBER_GROUP:
        return this.getBrokerMemberGroup(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
        return this.addWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return this.getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return this.deleteTopicInNamesrv(ctx, request);
    case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
        return this.registerTopicToNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    case RequestCode.GET_CLIENT_CONFIG:
        return this.getClientConfigs(ctx, request);
    default:
        String error = " request type " + request.getCode() + " not supported";
        return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}

3 Namesrv架构

下图是一个消息的常规流转过程,生产者,消费者,Broker通过与Namesrv交换信息来实现自己的功能。

3.1组件

  • Broker

Broker在启动的时候,将自己的元数据信息,上报给Namesrv,这部分信息也就是Topic路由。

这里的元数据包含Broker本身的元数据和该Broker中Topic的信息。

  • 生产者

生产者只关注Topic路由,从namesrv获取到Topic路由后就可以知道这个Topic的消息存放到了哪些Broker中。

  • 消费者

消费者也只关注Topic路由,从namesrv获取到获取到Topic路由之后,才能知道自己订阅的Topic的Broker地址,从而获取消息。

3.2 Namesrv四个功能模块

  • Topic功能管理模块

这是Namesrv最核心的模块,Topic路由决定,Topic的数据会保存在哪些Broker上。Broker启动的时候,会将自身的信息注册到Namesrv中,以供消费者和生产者获取。生产者和消费者与Namesrv之间会有心跳通信,从而获取最新的Broker信息。

  • Remoting通信模块

这个模块是基于Netty的网络通信封装,担任各个组件之间的网络通信任务。

  • 定时任务模块

定时任务模块包括:定时扫描宕机的Broker,定时打印KV配置,定时扫描超时请求。

  • KV管理模块

Namesrv维护了一个全局的KV配置魔窟啊,方便全局配置。

以上就是RocketMQ Namesrv架构工作原理详解的详细内容,更多关于RocketMQ Namesrv架构的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • SpringCloud+RocketMQ实现分布式事务的实践

    目录 一.RocketMQ的分布式事务结构和说明 二.搭建RocketMQ 三.事务场景,然后准备工程,运行代码 随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的.所以我们来一步一步的实现基于RocketMQ的分布式事务.接下来,我们将要做的主题写出来. RocketMQ的分布式事务结构和说明 搭建RocketMQ步骤 事务场景,然后准备工程,运行代码 一.RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构.采用半消息机制实现分

  • RocketMQ4.5.2 修改mqnamesrv 和 mqbroker的日志路径操作

    此解决方案是针对window的,因为日志默认保存路径在C盘,linux忽略. 学习RocketMQ过程中,总是出现 com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small. 这

  • RocketMQ实现随缘分BUG小功能示例详解

    目录 正文 实现过程 生产者: 正文 以前公司的产品已经上线20多年了,主要是维护,也就是改bug.每周我们Team会从Jira上拿我们可以改的bug,因为每个团队负责的业务范围不一样,我们团队只能改我们自己业务范围的.这样每周大概有20个左右的新bug,假如团队一共10个人,那么均分就是每人两个,改完下班. 但是这BUG肯定有难有简单,大家肯定都愿意改简单的,在家办公,任务量完了不就等于放假么.开始是自己给自己抢,就忒卷,是欧美项目,所以客服晚上报出来的bug多.有的哥们早上5点起来看有没有新

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • RocketMQ Namesrv架构工作原理详解

    目录 1 概念 2 核心数据结构和API 2.1 Namesrv的核心数据结构 2.2 Namesrv的API 3 Namesrv架构 3.1组件 3.2 Namesrv四个功能模块 1 概念 Namesrv的作用是保存元数据,提高Broker的可用性. Namesrv的主要功能是临时存储,管理Topic路由信息,各个Namesrv节点之间是不通信,无状态的,互相不知道对方的存在. 当Broker,生产者,消费者启动的时候,会轮询全部的Namesrv节点,获取路由信息. 2 核心数据结构和API

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • Spring @Transactional工作原理详解

    本文将深入研究Spring的事务管理.主要介绍@Transactional在底层是如何工作的.之后的文章将介绍: propagation(事务传播)和isolation(隔离性)等属性的使用 事务使用的陷阱有哪些以及如何避免 JPA和事务管理 很重要的一点是JPA本身并不提供任何类型的声明式事务管理.如果在依赖注入容器之外使用JPA,事务处理必须由开发人员编程实现. UserTransaction utx = entityManager.getTransaction(); try{ utx.be

  • Javascript对象及Proxy工作原理详解

    正文 这一章其实算是javascript的科普文章,其实这本书的读者一般都不会是入门者,因此按道理说应该不需要再科普才对.但是作者依旧安排了这一章,证明就是这一章内容与我们以为的对象不一样. Javascript中一切皆对象 这一句话大家应该耳熟能详,对于常规的字面量对象,和new出来的对象,大家应该都能分辨 const str = '' const str2 = new String() const obj = {} const obj2 = Object.create() 但是根据ECMA,

  • PHP底层运行机制与工作原理详解

    最近搭建服务器,突然感觉lamp之间到底是怎么工作的,或者是怎么联系起来?平时只是写程序,重来没有思考过他们之间的工作原理: PHP底层工作原理 图1 php结构 从图上可以看出,php从下到上是一个4层体系 ①Zend引擎 Zend整体用纯c实现,是php的内核部分,它将php代码翻译(词法.语法解析等一系列编译过程)为可执行opcode的处理并实现相应的处理方法.实现了基本的数据结构(如hashtable.oo).内存分配及管理.提供了相应的api方法供外部调用,是一切的核心,所有的外围功能

  • Servlet生命周期与工作原理详解

    本文为大家分享了Servlet生命周期与工作原理,供大家参考,具体内容如下 Servlet生命周期分为三个阶段: 1.初始化阶段  调用init()方法 2.响应客户请求阶段 调用service()方法 3.终止阶段 调用destroy()方法 Servlet初始化阶段: 在下列时刻Servlet容器装载Servlet: 1.Servlet容器启动时自动装载某些Servlet,实现它只需要在web.XML文件中的<Servlet></Servlet>之间添加如下代码: <lo

  • java HashMap 的工作原理详解

    HashMap的工作原理是近年来常见的Java面试题.几乎每个Java程序员都知道HashMap,都知道哪里要用HashMap,知道Hashtable和HashMap之间的区别,那么为何这道面试题如此特殊呢?是因为这道题考察的深度很深.这题经常出现在高级或中高级面试中.投资银行更喜欢问这个问题,甚至会要求你实现HashMap来考察你的编程能力.ConcurrentHashMap和其它同步集合的引入让这道题变得更加复杂.让我们开始探索的旅程吧! 先来些简单的问题 "你用过HashMap吗?&quo

  • Web程序工作原理详解

    1.Web程序工作原理 (1)Web一词的含义 Network:[计算机]电脑网络,网 Web:[计算机]万维网(WorldWideWeb),互联网(Internet) Web程序,顾名思义,即工作在Web上的程序. (2)单机程序工作原理 单机,即不连接到其他计算机的计算机,不在网络中.例如:两单机A.B,只在A上安装有程序X,若要在B上得到X的运行结果,则必须在B上安装一遍X,然后运行.若B类的计算机比较多,则需要逐一安装运行.它们之间不能直接进行通信和协作.如图1所示. (3)客户机/服务

  • AngularJS 工作原理详解

    个人觉得,要很好的理解AngularJS的运行机制,才能尽可能避免掉到坑里面去.在这篇文章中,我将根据网上的资料和自己的理解对AngularJS的在启动后,每一步都做了些什么,做一个比较清楚详细的解析.      首先上一小段代码(index.html),结合代码我们来看看,angular一步一步都做了些什么. <!doctype html> <html ng-app> <head> <script src="angular.js">&l

  • Docker 网络工作原理详解

    Docker 网络工作原理 当Docker server也就是docker daemon启动时,会自动创建一个名字是docker0的bridge,每当docker创建一个Container时,会在主机上面创建一个名字是veth*的ethernet 端口,并把这个eth*加入到docker0的bridge,在container中会自动创建一个名字是eth0的ethernet端口,这个eth0和veth*会形成一个类似管道的对,一一对应. 配置DNS docker是如何分配每个container的h

随机推荐