elasticsearch集群cluster discovery可配式模块示例分析

目录
  • 前言
  • Discovery模块的概述
  • cluster节点探测
  • MasterFaultDetection的启动代码
    • master连接失败的逻辑
  • MasterPing的关键代码

前言

elasticsearch cluster实现了自己发现机制zen。Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping。本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测。其它内容会在接下来介绍。

Discovery模块的概述

discovery是可配式模块,官方支持亚马逊的Azure discovery,Google Compute Engine,EC2 Discovery三种发现机制,根据插件规则完全可以自己实现其它的发现机制。整个模块通过实现guice的DiscoveryModule对外提供模块的注册和启动, 默认使用zen discovery。发现模块对外接口为DiscoveryService,它的方法如下所示:

它本质上是discovery的一个代理,所有的功能最终都是由所绑定的discovery所实现的。节点启动时通过DiscoveryModule获取DiscoveryService,然后启动DiscoveryService,DiscoveryService启动绑定的Discovery,整个功能模块就完成了加载和启动。这也是elasticsearch所有模块的实现方式,通过module对外提供绑定和获取,通过service接口对外提供模块的功能,在后面的分析中会经常遇到。

cluster节点探测

接下来分析cluster的一个重要功能就是节点探测。cluster中不能没有master节点,因此集群中所有节点都要周期探测master节点,一旦无法检测到,将会进行master选举。同时作为master,对于节点变动也要时刻关注,因此它需要周期性探测集群中所有节点,确保及时剔除已经宕机的节点。这种相互间的心跳检测就是cluster的faultdetection。下图是faultdetection的继承关系:

有两种实现方式,分别是master探测集群中其它节点和其它节点对master节点的探测。

FaultDetection只要一个抽象方法handleTransportDisconnect,该方法在内部类FDConnectionListener中被调用。在elasticsearch中大量使用了listener的异步方式,异步可以极大提升系统性能。它的代码如下所示:

private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

faultdetection启动时会注册相应的FDConnetionListener,当探测到节点丢失,会通过onNodeDisconnected方法回调对于的handleTransportDisconnect进行处理。

MasterFaultDetection的启动代码

private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        // 尝试连接master节点
        try {
            transportService.connectToNode(masterNode);
        } catch (final Exception e) {
            // 连接失败通知masterNode失败
            notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
            return;
        }
    //关闭之前的masterping,重启新的masterping
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();
        // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
    }

代码有有详细注释,就不再过多解释。

master连接失败的逻辑

代码如下:

private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
            //通知所有listener master丢失
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, reason);
                    }
                }
            });
            stop("master failure, " + reason);
        }
    }

在ZenDiscovery中实现了listener.onMasterFailure接口。会进行master丢失的相关处理,在后面再分析。

MasterPing的关键代码

private class MasterPinger implements Runnable {
        private volatile boolean running = true;
        public void stop() {
            this.running = false;
        }
        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
   final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
            final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
                        @Override
                        public MasterPingResponseResponse newInstance() {
                            return new MasterPingResponseResponse();
                        }
                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {
                            if (!running) {
                                return;
                            }
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // 启动新的ping周期
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                            }
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            if (!running) {
                                return;
                            }
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                        handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NoLongerMasterException) {
                                        logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {
                                        logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, "not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                        logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                        notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                        return;
                                    }
                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                    if (retryCount >= pingRetryCount) {
                                        logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                         // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }
            );
        }
    }

MasterPing是一个线程,在innerStart的方法中没有设定周期启动masterping,但是masterping需要周期进行,这个秘密就在run 方法中,如果ping成功就会重启一个新的ping。这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。

ping的方式跟之前一样是也是通过transport发送一个masterpingrequest,进行一个连接。节点收到该请求后,如果已不再是master会抛出NotMasterException,状态更新出差会抛出其它异常,异常会通过。否则会正常响应notifyMasterFailure方法处理跟启动逻辑一样。

对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)方法处理。masterfaultDetection对该方法的实现如下:

protected void handleTransportDisconnect(DiscoveryNode node) {
    //这里需要同步
        synchronized (masterNodeMutex) {
        //master 已经换成其它节点,就没必要再连接
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
            //尝试再次连接
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
            //连接成功启动新的masterping
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                } catch (Exception e) {
            //连接出现异常,启动master节点丢失通知
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                }
            } else {
          //不需要重连,通知master丢失。
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, "transport disconnected");
            }
        }
    }

这就是masterfaultDetection的整个流程:启动中如果master丢失则通知节点丢失,否则在一定延迟(3s)后启动masterping,masterping线程尝试连接master节点,如果master节点网络失联,尝试再次连接。master节点收到masterpingrequest后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应是异常则启动master丢失通知,否则此次ping结束。在一定延迟后启动新的masterping线程。

NodeFaultDetection的逻辑跟实现上跟MasterFualtDetetion相似,区别主要在于ping异常处理上。当某个节点出现异常或者没有响应时,会启动节点丢失机制,只是受到通知后的处理逻辑不通。就不再详细分析,有兴趣可以参考具体代码,希望大家以后多多支持我们!

(0)

相关推荐

  • elasticsearch构造Client实现java客户端调用接口示例分析

    目录 client的继承关系 方法实现上 以index方法为例 execute方法代码 总结: elasticsearch通过构造一个client对外提供了一套丰富的java调用接口.总体来说client分为两类cluster信息方面的client及数据(index)方面的client.这两个大类由可以分为通用操作和admin操作两类. client的继承关系 (1.5版本,其它版本可能不一样): 通过这个继承关系图可以很清楚的了解client的实现,及功能.总共有三类即client, indi

  • elasticsearch节点的transport请求发送处理分析

    目录 transport请求的发送和处理过程 request的发送过程 request的接受过程 request和response是如何被处理 request的处理 response的处理过程 最后总结 transport请求的发送和处理过程 前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程. cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状

  • elasticsearch集群发现zendiscovery的Ping机制分析

    目录 zenDiscovery实现机制 广播的过程 nodeping处理代码 ping请求的发送策略 总结 zenDiscovery实现机制 ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群.zenDiscovery实现了两种ping机制:广播与单播.本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫. 广播的过程 首先看一下广播(MulticastZenPing),广

  • elasticsearch节点间通信的基础transport启动过程

    目录 前言 transport 启动serverBootStrap 如何连接到node 连接方法的代码 总结 前言 在前一篇中我们分析了cluster的一些元素.接下来的章节会对cluster的运作机制做详细分析.本节先分析一些transport,它是cluster间通信的基础.它有两种实现,一种是基于netty实现nettytransport,主要用于节点间的通信.另一种是localtransport,主要是用于同一个jvm上的节点通信.因为是同一个jvm上的网络模拟,localtranspo

  • elasticsearch java客户端action的实现简单分析

    上一篇介绍了elasticsearch的client结构,client只是一个门面,在每个方法后面都有一个action来承接相应的功能.但是action也并非是真正的功能实现者,它只是一个代理,它的真正实现者是transportAction.本篇就对action及transportAction的实现做一个简单的分析, elasticsearch中的绝大部分操作都是通过相应的action,这些action在action包中.它的结构如下图所示: 上图是action包的部分截图,这里面对应着各个功能

  • elasticsearch源码分析index action实现方式

    目录 action的作用 TransportAction的类图 OperationTransportHandler的代码 primary操作的方法 总结 action的作用 上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式. 再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中

  • elasticsearch集群cluster discovery可配式模块示例分析

    目录 前言 Discovery模块的概述 cluster节点探测 MasterFaultDetection的启动代码 master连接失败的逻辑 MasterPing的关键代码 前言 elasticsearch cluster实现了自己发现机制zen.Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping.本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测.其它内容会在接下来介绍. Discovery模块的概述

  • elasticsearch集群cluster主要功能详细分析

    在源码概述中我们分析过,elasticsearch源码从功能上可以分为分布式功能和数据功能,接下来这几篇会就分布式功能展开.这里首先会对cluster作简单概述,然后对cluster所涉及的主要功能详细分析. elasticsearch的集群功能代码在cluster包中,通过ClusterService接口对外暴露. cluster主要包括以下功能: 发现(Discovery),路由(routing),传送功能(transport),集群状态(clusterstates)等. 发现功能功能主要用

  • elasticsearch集群cluster示例详解

    目录 前言 节点DiscoveryNode 集群阻塞 clusterService接口 总结 前言 上一篇通过clusterservice对cluster做了一个简单的概述, 应该能够给大家一个初步认识.本篇将对cluster的代码组成进行详细分析,力求能够对cluster做一个更清晰的描述.cluster作为多个节点的协同工作机制,它需要节点,节点间通信,各个节点的状态及各个节点上的数据(index)状态.因此这一部分代码包括了上述的几个部分. 节点DiscoveryNode 首先是节点(Di

  • 使用docker快速部署Elasticsearch集群的方法

    本文将使用Docker容器(使用docker-compose编排)快速部署Elasticsearch 集群,可用于开发环境(单机多实例)或生产环境部署. 注意,6.x版本已经不能通过 -Epath.config 参数去指定配置文件的加载位置,文档说明: For the archive distributions, the config directory location defaults to $ES_HOME/config. The location of the >config direc

  • java连接ElasticSearch集群操作

    我就废话不多说了,大家还是直接看代码吧~ /* *es配置类 * */ @Configuration public class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class); @Bean public TransportClient getESClient() { //设置集群名称

  • 关于Java中配置ElasticSearch集群环境账号密码的问题

    1.修改主站点的elasticsearch.yml添加一下行: xpack.security.enabled: true 2.生成安全秘钥 切到ES安装目录,执行bin/elasticsearch-certutil ca -out config/elastic-certificates.p12 -pass “” 会在/home/elasticsearch-7.9.3/config目录生成elastic-certificates.p12 3.继续修改ES yml文件 添加以下四行: xpack.s

  • 使用Lvs+Nginx集群搭建高并发架构的实现示例

    目录 1. Lvs介绍 2. Lvs 负载均衡模式 2.1 NAT 2.2 TUN 2.3 DR模式 3. Lvs DR模式配置 3.1 Vip配置 3.2 LVS集群管理工具安装 3.3 地址解析协议 3.4 集群配置 高并发站点不仅要考虑网站后端服务的稳定,还需要考虑服务能否接入巨大流量.承受巨大流量,如下图: 1:流量接入,可以采用Lvs+Nginx集群,这种方式能接入的QPS能高达数百万 2:通过Lvs实现Nginx集群,Nginx+Tomcat实现后端服务集群,完成了从接入层流量处理到

  • Windows Server 2003 下配置 MySQL 集群(Cluster)教程

    MySQL 群集是 MySQL 适合于分布式计算环境的高可用.高冗余版本.它采用了 NDB Cluster 存储引擎,允许在 1 个群集中运行多个 MySQL 服务器.在 MySQL 5.0 及以上的二进制版本中,以及与最新的 Linux 版本兼容的 RPM 包中提供了该存储引擎. MySQL 群集是一种技术,该技术允许在无共享的系统中部署"内存中"和"磁盘中"数据库的 Cluster .通过无共享体系结构,系统能够使用廉价的硬件,而且对软硬件无特殊要求.此外,由于

  • ElasticSearch学习之多条件组合查询验证及示例分析

    目录 多条件组合查询 bool constant_score 查询验证 & 分析 验证 分析 排序 默认排序 自定义排序 tips 单字段排序 多字段 scroll分页 初始化快照 & 快照保存10分钟 根据快照ID滚动查询 多条件组合查询 bool es中使用bool来控制多条件查询,bool查询支持以下参数: must:被查询的数据必须满足当前条件 mush_not:被查询的数据必须不满足当前条件 should:被查询的数据应该满足当前条件.should查询被用于修正查询结果的评分.需

随机推荐