elasticsearch的zenDiscovery和master选举机制原理分析

目录
  • 前言
  • join的代码
  • findMaster方法
  • 总结

前言

上一篇通过 ElectMasterService源码,分析了master选举的原理的大部分内容:master候选节点ID排序保证选举一致性及通过设置最小可见候选节点数目避免brain split。节点排序后选举只能保证局部一致性,如果发生节点接收到了错误的集群状态就会选举出错误的master,因此必须有其它措施来保证选举的一致性。这就是上一篇所提到的第二点:被选举的数量达到一定的数目同时自己也选举自己,这个节点才能成为master。这一点体现在zenDiscovery中,本篇将结合节点的发现过程进一步介绍master选举机制。

节点启动后首先启动join线程,join线程会寻找cluster的master节点,如果集群之前已经启动,并且运行良好,则试图连接集群的master节点,加入集群。否则(集群正在启动)选举master节点,如果自己被选为master,则向集群中其它节点发送一个集群状态更新的task,如果master是其它节点则试图加入该集群。

join的代码

private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
     //一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            masterNode = findMaster();
        }
      //有可能自己会被选举为master(集群启动,或者加入时正在选举)
      if (clusterService.localNode().equals(masterNode)) {
      //如果本身是master,则需要向其它所有节点发送集群状态更新
            clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) {
            //选举时错误的,之前的master状态良好,则不更新状态,仍旧使用之前状态。
                    if (currentState.nodes().masterNode() != null) {
                       return currentState;
                    }
                    DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
                    // update the fact that we are the master...
                    ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
                    currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
                    // eagerly run reroute to remove dead nodes from routing table
                    RoutingAllocation.Result result = allocationService.reroute(currentState);
                    return ClusterState.builder(currentState).routingResult(result).build();
                }
                @Override
                public void onFailure(String source, Throwable t) {
                    logger.error("unexpected failure during [{}]", t, source);
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    if (newState.nodes().localNodeMaster()) {
                        // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                        joinThreadControl.markThreadAsDone(currentThread);
                        nodesFD.updateNodesAndPing(newState); // start the nodes FD
                    } else {
                        // if we're not a master it means another node published a cluster state while we were pinging
                        // make sure we go through another pinging round and actively join it
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                    sendInitialStateEventIfNeeded();
                    long count = clusterJoinsCounter.incrementAndGet();
                    logger.trace("cluster joins counter set to [{}] (elected as master)", count);
                }
            });
        } else {
            // 找到的节点不是我,试图连接该master
            final boolean success = joinElectedMaster(masterNode);
            // finalize join through the cluster state update thread
            final DiscoveryNode finalMasterNode = masterNode;
            clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    if (!success) {
                        // failed to join. Try again...
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }
                    if (currentState.getNodes().masterNode() == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }
                    if (!currentState.getNodes().masterNode().equals(finalMasterNode)) {
                        return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                    }
                    // Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
                    // when the first cluster state arrives.
                    joinThreadControl.markThreadAsDone(currentThread);
                    return currentState;
                }
                @Override
                public void onFailure(String source, @Nullable Throwable t) {
                    logger.error("unexpected error while trying to finalize cluster join", t);
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            });
        }
    }

以上就是join的过程。zenDiscovery在启动时会启动一个join线程,这个线程调用了该方法。同时在节点离开,master丢失等情况下也会重启这一线程仍然运行join方法。

findMaster方法

这个方法体现了master选举的机制。代码如下:

private DiscoveryNode findMaster() {
      //ping集群中的节点
        ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
        if (fullPingResponses == null) {return null;
        }// 过滤所得到的ping响应,虑除client节点,单纯的data节点
        List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
        for (ZenPing.PingResponse pingResponse : fullPingResponses) {
            DiscoveryNode node = pingResponse.node();
            if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
                // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
            } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
                // filter out data node that is not also master
            } else {
                pingResponses.add(pingResponse);
            }
        }
       final DiscoveryNode localNode = clusterService.localNode();
        List<DiscoveryNode> pingMasters = newArrayList();
     //获取所有ping响应中的master节点,如果master节点是节点本身则过滤掉。pingMasters列表结果要么为空(本节点是master)要么是同一个节点(出现不同节点则集群出现了问题
不过没关系,后面会进行选举)
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.master() != null) {
                if (!localNode.equals(pingResponse.master())) {
                    pingMasters.add(pingResponse.master());
                }
            }
        }
        // nodes discovered during pinging
        Set<DiscoveryNode> activeNodes = Sets.newHashSet();
        // nodes discovered who has previously been part of the cluster and do not ping for the very first time
        Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
        Version minimumPingVersion = localNode.version();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        activeNodes.add(pingResponse.node());
        minimumPingVersion = Version.smallest(pingResponse.node().version(), minimumPingVersion);
        if (pingResponse.hasJoinedOnce() != null && pingResponse.hasJoinedOnce()) {
          joinedOnceActiveNodes.add(pingResponse.node());
        }
    }
//本节点暂时是master也要加入候选节点进行选举
        if (localNode.masterNode()) {
            activeNodes.add(localNode);
            long joinsCounter = clusterJoinsCounter.get();
            if (joinsCounter > 0) {
                logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
                joinedOnceActiveNodes.add(localNode);
            }
        }
      //pingMasters为空,则本节点是master节点,
    if (pingMasters.isEmpty()) {
            if (electMaster.hasEnoughMasterNodes(activeNodes)) {//保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果
          本次选举节点仍旧是自己,那么本节点才能成为master。这里就体现了master选举的第二条原则。
                DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
                if (master != null) {
                    return master;
                }
                return electMaster.electMaster(activeNodes);
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.trace("not enough master nodes [{}]", activeNodes);
                return null;
            }
        } else {
        //pingMasters不为空(pingMasters列表中应该都是同一个节点),本节点没有被选举为master,那就接受之前的选举。
            return electMaster.electMaster(pingMasters);
        }
    }

上面的重点部分都做了标注,就不再分析。除了findMaster方法,还有一个方法也体现了master选举,那就是handleMasterGone。下面是它的部分代码,提交master丢失task部分,

clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
       @Override
            public ClusterState execute(ClusterState currentState) {
                //获取到当前集群状态下的所有节点
                DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes())
                        // make sure the old master node, which has failed, is not part of the nodes we publish
                        .remove(masterNode.id())
                        .masterNodeId(null).build();
          //rejoin过程仍然是重复findMaster过程
          if (rejoin) {
                    return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
                }
          //无法达到选举数量,进行findMaster过程
                if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
                    return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
                }
          //在当前集群状态下,如果候选节点数量达到预期数量,那么选举出来的节点一定是同一个节点,因为所有的节点看到的集群states是一致的
                final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master
                final DiscoveryNode localNode = currentState.nodes().localNode();
              ....
            }

从以上的代码可以看到master选举节点的应用场景,无论是findMaster还是handlemasterGone,他们都保证了选举一致性。那就是所选节点数量必须要达到一定的数量,否则不能认为选举成功,进入等待环境。如果当前节点被其它节点选举为master,仍然要进行选举一次以保证选举的一致性。这样在保证了选举数量同时对候选节点排序从而保证选举的一致性。

发现和加入集群是zenDiscovery的主要功能,当然它还有一些其它功能,如处理节点离开(handleLeaveRequest),处理master发送的最小clustersates(handleNewClusterStateFromMaster)等功能。这里就不一一介绍,有兴趣请参考相关源码。

总结

本节结合zenDiscovery,分析了master选举的另外一部分内容。同时zenDiscovery是节点发现集群功能的集合,它主要功能是发现(选举)出集群的master节点,并试图加入集群。同时如果 本机是master还会处理节点的离开和节点丢失,如果不是master则会处理来自master的节点状态更新。

以上就是elasticsearch的zenDiscovery和master选举机制原理分析的详细内容,更多关于elasticsearch的zenDiscovery和master选举机制的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot集成ElasticSearch的示例代码

    目录 一.Elasticseach介绍 1.简单介绍 2.对比关系: 3.详细说明: 4.查出数据的解释 二.SpringBoot集成Elasticseach 1.引入依赖 2.添加配置 3.创建pojo类与索引对应 4.SpringData封装了基础的增删改查,自定义增删改查 5.测试方法--增删改查 一.Elasticseach介绍 1.简单介绍 官网:开源搜索:Elasticsearch.ELK Stack 和 Kibana 的开发者 | Elastic https://www.elast

  • java 通过聚合查询实现elasticsearch的group by后的数量

    通过聚合查询获取group by 后的数量 /** * 获取key的个数 * * @param key 要group by的字段名 * @param index 索引名称 * @return id的个数 */ public static int getKeyCount(String key, String index) { int count = 0; TransportClient client = null; try { client = connectionPool.getConnecti

  • Springboot集成Elasticsearch的步骤与相关功能

    目录 集成配置步骤 步骤1:加入 Maven 相关依赖 步骤2:配置 elasticsearch 的主机和端口 步骤3:配置 Elaseticsearch 客户端 步骤4:创建文档实体 步骤5:创建 controller,service, dao 层 相关功能实现 1. 添加文档  2. 修改文档  3. 根据ID查询文档  4. 根据ID删除文档  5. 查询所有文档  6. 条件查询(单个条件)  7. 条件查询(多条件)  8. 分页查询(降序)  9. 分页查询(升序)  10. 分页查

  • elasticsearch的灵魂唯一master选举机制原理分析

    master作为cluster的灵魂必须要有,还必须要唯一,否则集群就出大问题了.因此master选举在cluster分析中尤为重要.对于这个问题我将分两篇来分析.第一篇也就是本篇,首先会简单说一说mater选举的一些算法,及elasticsearch的选举原理.第二篇也就是下一篇,会结合zenDiscovery代码为仔细分析elasticsearch的master选举的实现. 简单来说master的作用跟单个jvm中的同步关键字synchronized相同,集群中多节点协调工作必须要保证数据的

  • Elasticsearch映射字段数据类型及管理

    目录 Elasticsearch映射管理 一 映射介绍 1.1 字段数据类型 1.2 映射参数 二 创建索引 三 查看索引 Elasticsearch映射管理 在Elasticsearch 6.0.0或更高版本中创建的索引只包含一个mapping type. 在5.x中使用multiple mapping types创建的索引将继续像以前一样在Elasticsearch 6.x中运行. Mapping types将在Elasticsearch 7.0.0中完全删除 一 映射介绍 在创建索引的时候

  • elasticsearch的zenDiscovery和master选举机制原理分析

    目录 前言 join的代码 findMaster方法 总结 前言 上一篇通过 ElectMasterService源码,分析了master选举的原理的大部分内容:master候选节点ID排序保证选举一致性及通过设置最小可见候选节点数目避免brain split.节点排序后选举只能保证局部一致性,如果发生节点接收到了错误的集群状态就会选举出错误的master,因此必须有其它措施来保证选举的一致性.这就是上一篇所提到的第二点:被选举的数量达到一定的数目同时自己也选举自己,这个节点才能成为master

  • PHP进阶学习之类的自动加载机制原理分析

    本文实例讲述了PHP类的自动加载机制.分享给大家供大家参考,具体如下: 前言 我们在常见的PHP的主流框架中通常写好一个类只需写好相应的命名空间或直接实例化类就可以实现类的使用.而不需要使用原生的方式把类文件一个个用require.include引入包含进来,这归功于PHP的类自动加载机制,也是本文讨论的要点. 一.概念 在PHP代码中,不需要显式地使用文件路径将类库文件包含进来,便可使用该文件中定义的类库,这种技术称作自动加载. 在使用类或者定义了命名空间的类时,只需要直接实例化使用,PHP机

  • Python语法垃圾回收机制原理解析

    一 引入 解释器在执行到定义变量的语法时,会申请内存空间来存放变量的值,而内存的容量是有限的,这就涉及到变量值所占用内存空间的回收问题,当一个变量值没有用了(简称垃圾)就应该将其占用的内存给回收掉,那什么样的变量值是没有用的呢? 由于变量名是访问到变量值的唯一方式,所以当一个变量值不再关联任何变量名时,我们就无法再访问到该变量值了,该变量值就是没有用的,就应该被当成一个垃圾回收. 毫无疑问,内存空间的申请与回收是非常耗费精力的事情,而且存在很大的危险性,稍有不慎就有可能引发内存溢出问题,好在Cp

  • JAVA中实现原生的 socket 通信机制原理

    本文介绍了JAVA中实现原生的 socket 通信机制原理,分享给大家,具体如下: 当前环境 jdk == 1.8 知识点 socket 的连接处理 IO 输入.输出流的处理 请求数据格式处理 请求模型优化 场景 今天,和大家聊一下 JAVA 中的 socket 通信问题.这里采用最简单的一请求一响应模型为例,假设我们现在需要向 baidu 站点进行通信.我们用 JAVA 原生的 socket 该如何实现. 建立 socket 连接 首先,我们需要建立 socket 连接(核心代码) impor

  • PHP反射机制原理与用法详解

    本文实例讲述了PHP反射机制原理与用法.分享给大家供大家参考,具体如下: 反射 面向对象编程中对象被赋予了自省的能力,而这个自省的过程就是反射. 反射,直观理解就是根据到达地找到出发地和来源.比如,一个光秃秃的对象,我们可以仅仅通过这个对象就能知道它所属的类.拥有哪些方法. 反射是指在PHP运行状态中,扩展分析PHP程序,导出或提出关于类.方法.属性.参数等的详细信息,包括注释.这种动态获取信息以及动态调用对象方法的功能称为反射API. 如何使用反射API <?php class person{

  • 理解zookeeper选举机制

    zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和redis是相同的,即对客户端来讲每个服务器都是平等的. 这篇主要分析leader的选择机制,zookeeper提供了三种方式: LeaderElection AuthFastLeaderElection FastLeaderElection 默认的算法是FastLeaderElection,所以这篇主要分析它的选举机制. 选择机制中的概

  • PHP面向对象自动加载机制原理与用法分析

    本文实例讲述了PHP面向对象自动加载机制原理与用法.分享给大家供大家参考,具体如下: 在学习PHP的面向对象的时候,会知道很多"语法糖",也就是魔术方法.有一个加自动加载的魔术方法,叫:__autoload(); 先看一段代码 <?php function __autoload($classname) { $filename = "./". $classname .".php"; include_once($filename); } new

  • javascript垃圾收集机制的原理分析

    前面的话 javascript具有自动垃圾收集机制,执行环境会负责管理代码执行过程中使用的内存.在编写javascript程序时,开发人员不用再关心内存使用问题,所需内存的分配以及无用内存的回收完全实现了自动管理.下面将详细介绍javascript的垃圾收集机制 原理 垃圾收集机制的原理很简单:找出那些不再继续使用的变量,然后释放其占用的内存,垃圾收集器会按照固定的时间间隔,或代码执行中预定的收集时间,周期性地执行这一操作 局部变量只在函数执行的过程中存在.而在这个过程中,会为局部变量在栈(或堆

  • Android6.0 消息机制原理解析

    消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出.如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理:如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来.在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务.在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程:还有一

随机推荐