elasticsearch集群发现zendiscovery的Ping机制分析

目录
  • zenDiscovery实现机制
    • 广播的过程
    • nodeping处理代码
    • ping请求的发送策略
  • 总结

zenDiscovery实现机制

ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群。zenDiscovery实现了两种ping机制:广播与单播。本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫。

广播的过程

首先看一下广播(MulticastZenPing),广播的原理很简单,节点启动后向网络发送广播信息,任何收到的节点只要集群名字相同都应该对此广播信息作出回应。这样该节点就获取了集群的相关信息。它定义了一个action:"internal:discovery/zen/multicast"和广播的信息头:INTERNAL_HEADER 。之前说过NettyTransport是cluster通信的基础,但是广播却没有使它。它使用了java的MulticastSocket。这里简单的介绍一下MulticastSocket的使用。它是一个UDP 机制的socket,用来进行多个数据包的广播。它可以帮到一个ip形成一个group,任何MulticastSocket都可以join进来,组内的socket发送的信息会被订阅了改组的所有机器接收到。elasticsearch对其进行了封装形成了MulticastChannel,有兴趣可以参考相关源码。

首先看一下MulticastZenPing的几个辅助内部类:

它总共定义了4个内部类,这些内部类和它一起完成广播功能。FinalizingPingCollection是一pingresponse的容器,所有的响应都用它来存储。MulticastPingResponseRequestHandler它是response处理类,类似于之前所说的nettytransportHandler,它虽然使用的不是netty,但是它也定义了一个messageReceived的方法,当收到请求时直接返回一个response。

MulticastPingResponse就不用细说了,它就是一个响应类。最后要着重说一下Receiver类,因为广播并不是使用NettyTransport,因此对于消息处理逻辑都在Receiver中。在初始化MulticastZenPing时会将receiver注册进去。

protected void doStart() throws ElasticsearchException {
        try {
            ....
            multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                    new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                    new Receiver());//将receiver注册到channel中
        } catch (Throwable t) {
          ....
        }
    }

Receiver类基础了Listener,实现了3个方法,消息经过onMessage方法区分,如果是内部ping则使用handleNodePingRequest方法处理,否则使用handleExternalPingRequest处理,区分方法很简单,就是读取信息都看它是否符合所定义的INTERNAL_HEADER 信息头。

nodeping处理代码

private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
           ....
            final DiscoveryNodes discoveryNodes = contextProvider.nodes();
            final DiscoveryNode requestingNode = requestingNodeX;
            if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                // 自身发出的ping,忽略
                return;
            }
        //只接受本集群ping
            if (!requestClusterName.equals(clusterName)) {
            ...return;
            }
            // 两个client间不需要ping
            if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
            }
        //新建一个response
            final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
            multicastPingResponse.id = id;
            multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
        //无法连接的情况
            if (!transportService.nodeConnected(requestingNode)) {
                // do the connect and send on a thread pool
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        // connect to the node if possible
                        try {
                            transportService.connectToNode(requestingNode);
                            transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                @Override
                                public void handleException(TransportException exp) {
                                    logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                }
                            });
                        } catch (Exception e) {
                            if (lifecycle.started()) {
                                logger.warn("failed to connect to requesting node {}", e, requestingNode);
                            }
                        }
                    }
                });
            } else {
                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleException(TransportException exp) {
                        if (lifecycle.started()) {
                            logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                        }
                    }
                });
            }
        }
    }

另外的一个方法是处理外部ping信息,处理过程是返回cluster的信息(这种外部ping的具体作用没有研究不是太清楚)。以上是响应MulticastZenPing的过程,收到其它节点的响应信息后它会把本节点及集群的master节点相关信息返回给广播节点。这样广播节点就获知了集群的相关信息。在MulticastZenPing类中还有一个类 MulticastPingResponseRequestHandler,它的作用是广播节点对其它节点对广播信息响应的回应,广播节点的第二次发送信息的过程。它跟其它TransportRequestHandler一样它有messageReceived方法,在启动时注册到transportserver中,只处理一类action:"internal:discovery/zen/multicast"。

ping请求的发送策略

代码如下:

public void ping(final PingListener listener, final TimeValue timeout) {
       ....
    //产生一个id
        final int id = pingIdGenerator.incrementAndGet();
        try {
            receivedResponses.put(id, new PingCollection());
            sendPingRequest(id);//第一次发送ping请求
            // 等待时间的1/2后再次发送一个请求
            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                public void onFailure(Throwable t) {
                    logger.warn("[{}] failed to send second ping request", t, id);
                    finalizePingCycle(id, listener);
                }
                @Override
                public void doRun() {
                    sendPingRequest(id);
            //再过1/2时间再次发送一个请求
                    threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                        @Override
                        public void onFailure(Throwable t) {
                            logger.warn("[{}] failed to send third ping request", t, id);
                            finalizePingCycle(id, listener);
                        }
                        @Override
                        public void doRun() {
                            // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                            PingCollection collection = receivedResponses.get(id);
                            FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                            receivedResponses.put(id, finalizingPingCollection);
                            logger.trace("[{}] sending last pings", id);
                            sendPingRequest(id);
                //最后一次发送请求,超时的1/4后
                            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                @Override
                                public void onFailure(Throwable t) {
                                    logger.warn("[{}] failed to finalize ping", t, id);
                                }
                                @Override
                                protected void doRun() throws Exception {
                                    finalizePingCycle(id, listener);
                                }
                            });
                        }
                    });
                }
            });
        } catch (Exception e) {
            logger.warn("failed to ping", e);
            finalizePingCycle(id, listener);
        }
    }

发送过程主要是调用sendPingRequest(id)方法,在该方法中会将id,信息头,版本,本地节点信息一起写入到BytesStreamOutput中然后将其进行广播,这个广播信息会被其它机器上的Receiver接收并处理,并且响应该ping请求。另外一个需要关注的是以上加说明的部分,它通过链时的定期发送请求,在等待时间内可能会发出4次请求,这种发送方式会造成大量的ping请求重复,幸好ping的资源消耗小,但是好处是可以尽可能保证在timeout这个时间段内集群的新增节点都能收到这个ping信息。在单播中也采用了该策略。

总结

广播的过程:广播使用的是jdk的MulticastSocket,在timeout时间内4次发生ping请求,ping请求包括一个id,信息头,本地节点的一些信息;这些信息在其它节点中被接收到交给Receiver处理,Receiver会将集群的master和本机的相关信息通过transport返回给广播节点。广播节点收到这些信息后会理解使用transport返回一个空的response。至此一个广播过程完成。

在节点分布在多个网段时,广播就失效了,因为广播信息不可达。这个时间就需要使用单播去ping指定的节点获取cluster的相关信息。这就是单播的用处。单播使用的是NettyTransport,它会使用跟广播一样的链式请求向指定的节点发送请求。信息的处理方式是之前所介绍的NettyTransport标准的信息处理过程。

以上就是elasticsearch集群发现zendiscovery的Ping机制分析的详细内容,更多关于elasticsearch集群发现zendiscovery Ping的资料请关注我们其它相关文章!

(0)

相关推荐

  • elasticsearch构造Client实现java客户端调用接口示例分析

    目录 client的继承关系 方法实现上 以index方法为例 execute方法代码 总结: elasticsearch通过构造一个client对外提供了一套丰富的java调用接口.总体来说client分为两类cluster信息方面的client及数据(index)方面的client.这两个大类由可以分为通用操作和admin操作两类. client的继承关系 (1.5版本,其它版本可能不一样): 通过这个继承关系图可以很清楚的了解client的实现,及功能.总共有三类即client, indi

  • elasticsearch源码分析index action实现方式

    目录 action的作用 TransportAction的类图 OperationTransportHandler的代码 primary操作的方法 总结 action的作用 上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式. 再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中

  • elasticsearch java客户端action的实现简单分析

    上一篇介绍了elasticsearch的client结构,client只是一个门面,在每个方法后面都有一个action来承接相应的功能.但是action也并非是真正的功能实现者,它只是一个代理,它的真正实现者是transportAction.本篇就对action及transportAction的实现做一个简单的分析, elasticsearch中的绝大部分操作都是通过相应的action,这些action在action包中.它的结构如下图所示: 上图是action包的部分截图,这里面对应着各个功能

  • elasticsearch集群cluster discovery可配式模块示例分析

    目录 前言 Discovery模块的概述 cluster节点探测 MasterFaultDetection的启动代码 master连接失败的逻辑 MasterPing的关键代码 前言 elasticsearch cluster实现了自己发现机制zen.Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping.本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测.其它内容会在接下来介绍. Discovery模块的概述

  • elasticsearch节点间通信的基础transport启动过程

    目录 前言 transport 启动serverBootStrap 如何连接到node 连接方法的代码 总结 前言 在前一篇中我们分析了cluster的一些元素.接下来的章节会对cluster的运作机制做详细分析.本节先分析一些transport,它是cluster间通信的基础.它有两种实现,一种是基于netty实现nettytransport,主要用于节点间的通信.另一种是localtransport,主要是用于同一个jvm上的节点通信.因为是同一个jvm上的网络模拟,localtranspo

  • elasticsearch节点的transport请求发送处理分析

    目录 transport请求的发送和处理过程 request的发送过程 request的接受过程 request和response是如何被处理 request的处理 response的处理过程 最后总结 transport请求的发送和处理过程 前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程. cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状

  • elasticsearch集群发现zendiscovery的Ping机制分析

    目录 zenDiscovery实现机制 广播的过程 nodeping处理代码 ping请求的发送策略 总结 zenDiscovery实现机制 ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群.zenDiscovery实现了两种ping机制:广播与单播.本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫. 广播的过程 首先看一下广播(MulticastZenPing),广

  • 使用docker快速部署Elasticsearch集群的方法

    本文将使用Docker容器(使用docker-compose编排)快速部署Elasticsearch 集群,可用于开发环境(单机多实例)或生产环境部署. 注意,6.x版本已经不能通过 -Epath.config 参数去指定配置文件的加载位置,文档说明: For the archive distributions, the config directory location defaults to $ES_HOME/config. The location of the >config direc

  • elasticsearch集群cluster示例详解

    目录 前言 节点DiscoveryNode 集群阻塞 clusterService接口 总结 前言 上一篇通过clusterservice对cluster做了一个简单的概述, 应该能够给大家一个初步认识.本篇将对cluster的代码组成进行详细分析,力求能够对cluster做一个更清晰的描述.cluster作为多个节点的协同工作机制,它需要节点,节点间通信,各个节点的状态及各个节点上的数据(index)状态.因此这一部分代码包括了上述的几个部分. 节点DiscoveryNode 首先是节点(Di

  • java连接ElasticSearch集群操作

    我就废话不多说了,大家还是直接看代码吧~ /* *es配置类 * */ @Configuration public class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class); @Bean public TransportClient getESClient() { //设置集群名称

  • 关于Java中配置ElasticSearch集群环境账号密码的问题

    1.修改主站点的elasticsearch.yml添加一下行: xpack.security.enabled: true 2.生成安全秘钥 切到ES安装目录,执行bin/elasticsearch-certutil ca -out config/elastic-certificates.p12 -pass “” 会在/home/elasticsearch-7.9.3/config目录生成elastic-certificates.p12 3.继续修改ES yml文件 添加以下四行: xpack.s

  • elasticsearch集群cluster主要功能详细分析

    在源码概述中我们分析过,elasticsearch源码从功能上可以分为分布式功能和数据功能,接下来这几篇会就分布式功能展开.这里首先会对cluster作简单概述,然后对cluster所涉及的主要功能详细分析. elasticsearch的集群功能代码在cluster包中,通过ClusterService接口对外暴露. cluster主要包括以下功能: 发现(Discovery),路由(routing),传送功能(transport),集群状态(clusterstates)等. 发现功能功能主要用

  • Redis集群节点通信过程/原理流程分析

    目录 简介 通信流程 Gossip消息 消息流程 消息格式 节点选择 1.选择发送消息的节点数量 2.消息数据量 其他网址 简介 本文介绍Redis的Cluster(集群)的节点通信的流程. 通信流程 在分布式存储中需要提供维护节点元数据信息的机制, 所谓元数据是指: 节点负责哪些数据, 是否出现故障等状态信息. 常见的元数据维护方式分为: 集中式和P2P方式. Redis集群采用P2P的Gossip(流言) 协议,Gossip协议工作原理就是节点彼此不断通信交换信息, 一段时间后所有的节点都会

  • docker安装ElasticSearch:7.8.0集群的详细教程

    ElasticSearch集群支持动态请求的方式搭建集群和静态配置文件搭建集群 关于集群的动态连接方式官方的文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html 前置准备工作 关于参数的官网说明: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-

  • Elasticsearches的集群搭建及数据分片过程详解

    目录 Elasticsearch高级之集群搭建,数据分片 广播方式 单播方式 选取主节点 什么是脑裂 错误识别 Elasticsearch高级之集群搭建,数据分片 es使用两种不同的方式来发现对方: 广播 单播 也可以同时使用两者,但默认的广播,单播需要已知节点列表来完成 广播方式 当es实例启动的时候,它发送了广播的ping请求到地址224.2.2.4:54328.而其他的es实例使用同样的集群名称响应了这个请求. 一般这个默认的集群名称就是上面的cluster_name对应的elastics

随机推荐