elasticsearch源码分析index action实现方式

目录
  • action的作用
  • TransportAction的类图
  • OperationTransportHandler的代码
  • primary操作的方法
  • 总结

action的作用

上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。

再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中会有一个实例(IndexAction INSTANCE = new IndexAction())这个实例用于client绑定对应的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),绑定过程发送在ActionModuel中。

另外在Action类中还会定义一个action的名字(String NAME = "indices:data/write/index")这个名字用于TransportService绑定对于的handle,用于处理NettyTransport接收到的信息。TransportAction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点能否处理,如果能够处理则调用相关的方法处理得到结果返回,否则将通过NettyTransport转发该请求到对应的node进行处理。所有的Transport的结构都是这种类型。

TransportAction的类图

首先看一下TransportAction的类图,所的Transport*action都继承自于它。

它主要由两个方法execute和doExecute,execute方法有两种实现,第一种实现需要自行添加actionListener。最终的逻辑都在doExecute方法中,这个方法在各个功能模块中实现。以下是TransportIndexAction的继承关系:

实现上由于功能划分的原因,TransportIndexAction直接继承自TranspShardReplicationOperationAction,这个抽象类中的方法是所有需要操作shard副本的功能action的父,因此它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三个子类。

OperationTransportHandler的代码

如下所示:

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
//继承自BaseTransportRequestHanlder
………………
        @Override
        public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
            // no need to have a threaded listener since we just send back a response
            request.listenerThreaded(false);
            // if we have a local operation, execute it on a thread since we don't spawn
            request.operationThreaded(true);
      //调用Transport的execute方法,通过channel返回结果
            execute(request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse(result);
                    } catch (Throwable e) {
                        onFailure(e);
                    }
                }
                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable e1) {
                        logger.warn("Failed to send response for " + actionName, e1);
                    }
                }
            });
        }

看过NettyTransport请求发送和处理的同学一定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求通过NettyTransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,可以看到它调用execute方法来处理。它的注册时在TransportShardReplicationOperationAction构造函数中完成的。

知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的实现方式跟前者完全一样,对应的action名称加了一个“[r]”,它的作用是处理需要在副本上进行的操作,代码如下所示:

class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
……………………
        @Override
        public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
            try {
                shardOperationOnReplica(request);
            } catch (Throwable t) {
                failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
                throw t;
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

可以看到代码结构非常像,只是调用了副本操作的方法shardOperationOnReplica,这个方法在这TransportShardReplicationOperationAction中是抽象的,它的实现在各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。

分析完这两个handle是不是对于action的处理过程有了一定的眉目了呢?但是这才是冰山一角,这两个Handler是用来接收来自其它节点的请求,如果请求的正好是本节点该如何处理呢?这些逻辑都在AsyncShardOperationAction类中。首先看一下它的内部结构:

因为TransportShardReplicationOperationAction的所有子类都是对索引的修改,会引起数据不一致,因此它的操作流程都是现在primaryShard上操作然后是Replicashard上操作。代码如下所示:

protected void doStart() throws ElasticsearchException {
            try {
          //检查是否有阻塞
                ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
          //检测是否是创建索引
                if (resolveIndex()) {
                    internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
                } else {
                    internalRequest.concreteIndex(internalRequest.request().index());
                }
                // check if we need to execute, and if not, return
                if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
                    return;
                }
          //再次检测是否有阻塞
                blockException = checkRequestBlock(observer.observedState(), internalRequest);
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
                shardIt = shards(observer.observedState(), internalRequest);
            } catch (Throwable e) {
                listener.onFailure(e);
                return;
            }
        //查找primaryShard
            boolean foundPrimary = false;
            ShardRouting shardX;
            while ((shardX = shardIt.nextOrNull()) != null) {
                final ShardRouting shard = shardX;
                // we only deal with primary shardIt here...
                if (!shard.primary()) {
                    continue;
                }
                if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
                    logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                    return;
                }
                if (!primaryOperationStarted.compareAndSet(false, true)) {
                    return;
                }
                foundPrimary = true;
          //primaryShard就在本地,直接进行相关操作
                if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                    try {
                        if (internalRequest.request().operationThreaded()) {
                            internalRequest.request().beforeLocalFork();
                            threadPool.executor(executor).execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        performOnPrimary(shard.id(), shard);
                                    } catch (Throwable t) {
                                        listener.onFailure(t);
                                    }
                                }
                            });
                        } else {
                            performOnPrimary(shard.id(), shard);
                        }
                    } catch (Throwable t) {
                        listener.onFailure(t);
                    }
                } else {//primaryShard在其它节点上,将请求通过truansport发送到对应的节点。
                    DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
                        @Override
                        public Response newInstance() {
                            return newResponseInstance();
                        }
                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                        @Override
                        public void handleResponse(Response response) {
                            listener.onResponse(response);
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                            if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                    retryPrimaryException(exp)) {
                                primaryOperationStarted.set(false);
                                internalRequest.request().setCanHaveDuplicates();
                                // we already marked it as started when we executed it (removed the listener) so pass false
                                // to re-add to the cluster listener
                                logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                                retry(exp);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                    });
                }
                break;
            }
            ………………
        }

这就是对应请求的处理过程。

primary操作的方法

void performOnPrimary(int primaryShardId, final ShardRouting shard) {
           ……
                PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
                performReplicas(response);
            …………
        }

以上就是performOnPrimary方法的部分代码,首先调用外部类的shardOperationOnPrimary方法,该方法实现在各个子类中,在TransportIndexAction中的实现如下所示:

@Override
    protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
        final IndexRequest request = shardRequest.request;
        // 查看是否需要routing
        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) {
            if (request.routing() == null) {
                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
            }
        }
      //调用indexserice执行对应的index操作
        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        long version;
        boolean created;
        try {
            Engine.IndexingOperation op;
            if (request.opType() == IndexRequest.OpType.INDEX) {
                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
                if (index.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
                }
                indexShard.index(index);
                version = index.version();
                op = index;
                created = index.created();
            } else {
                Engine.Create create = indexShard.prepareCreate(sourceToParse,
                        request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
                if (create.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
                }
                indexShard.create(create);
                version = create.version();
                op = create;
                created = true;
            }
            if (request.refresh()) {
                try {
                    indexShard.refresh("refresh_flag_index");
                } catch (Throwable e) {
                    // ignore
                }
            }
            // update the version on the request, so it will be used for the replicas
            request.version(version);
            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
            assert request.versionType().validateVersionForWrites(request.version());
            IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
            return new PrimaryResponse<>(shardRequest.request, response, op);
        } catch (WriteFailureException e) {
            if (e.getMappingTypeToUpdate() != null) {
                DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
                if (docMapper != null) {
                    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                }
            }
            throw e.getCause();
        }
    }

上面的代码就是index的执行过程,这一过程涉及到index的底层操作,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操作。副本可能有多个,因此首先调用了performReplicas方法,在这个方法中首先开始监听集群的状态,然后便利所有的副本进行处理,如果是异步则加入一个listener,否则同步执行返回结果。最后调用performReplica,在该方法中调用外部类的抽象方法shardOperationOnReplica。 这一过程比较简单,这里就不再贴代码,有兴趣可以参考相关源码。

总结

这里以TransportIndexAction为例分析了tansportaction的结构层次。它在TransportAction直接还有一层那就是TransportShardReplicationOperationAction,这个类是actionsupport包中的一个,这个包把所有的子操作方法做了进一步的抽象,抽象出几个大类放到了这里,所有其它子功能很多都继承自这。这个包会在后面有详细分析。

以上就是elasticsearch源码分析index action实现方式的详细内容,更多关于elasticsearch源码分析index action的资料请关注我们其它相关文章!

(0)

相关推荐

  • elasticsearch节点间通信的基础transport启动过程

    目录 前言 transport 启动serverBootStrap 如何连接到node 连接方法的代码 总结 前言 在前一篇中我们分析了cluster的一些元素.接下来的章节会对cluster的运作机制做详细分析.本节先分析一些transport,它是cluster间通信的基础.它有两种实现,一种是基于netty实现nettytransport,主要用于节点间的通信.另一种是localtransport,主要是用于同一个jvm上的节点通信.因为是同一个jvm上的网络模拟,localtranspo

  • elasticsearch构造Client实现java客户端调用接口示例分析

    目录 client的继承关系 方法实现上 以index方法为例 execute方法代码 总结: elasticsearch通过构造一个client对外提供了一套丰富的java调用接口.总体来说client分为两类cluster信息方面的client及数据(index)方面的client.这两个大类由可以分为通用操作和admin操作两类. client的继承关系 (1.5版本,其它版本可能不一样): 通过这个继承关系图可以很清楚的了解client的实现,及功能.总共有三类即client, indi

  • elasticsearch集群cluster discovery可配式模块示例分析

    目录 前言 Discovery模块的概述 cluster节点探测 MasterFaultDetection的启动代码 master连接失败的逻辑 MasterPing的关键代码 前言 elasticsearch cluster实现了自己发现机制zen.Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping.本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测.其它内容会在接下来介绍. Discovery模块的概述

  • elasticsearch集群发现zendiscovery的Ping机制分析

    目录 zenDiscovery实现机制 广播的过程 nodeping处理代码 ping请求的发送策略 总结 zenDiscovery实现机制 ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群.zenDiscovery实现了两种ping机制:广播与单播.本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫. 广播的过程 首先看一下广播(MulticastZenPing),广

  • elasticsearch节点的transport请求发送处理分析

    目录 transport请求的发送和处理过程 request的发送过程 request的接受过程 request和response是如何被处理 request的处理 response的处理过程 最后总结 transport请求的发送和处理过程 前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程. cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状

  • elasticsearch java客户端action的实现简单分析

    上一篇介绍了elasticsearch的client结构,client只是一个门面,在每个方法后面都有一个action来承接相应的功能.但是action也并非是真正的功能实现者,它只是一个代理,它的真正实现者是transportAction.本篇就对action及transportAction的实现做一个简单的分析, elasticsearch中的绝大部分操作都是通过相应的action,这些action在action包中.它的结构如下图所示: 上图是action包的部分截图,这里面对应着各个功能

  • elasticsearch源码分析index action实现方式

    目录 action的作用 TransportAction的类图 OperationTransportHandler的代码 primary操作的方法 总结 action的作用 上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式. 再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中

  • elasticsearch元数据构建metadata及routing类源码分析

    目录 metadata部分 元数据部分主要包括 shardRouting,继承关系 总结 metadata部分 虽然在刚开始源码概述时把代码分为分布式和数据两部分,但是它们的界限并不明显.之前这几篇可以说是这两部分的衔接.我们在快速接近数据(index)部分.本篇分析一下之前分析cluster遗留下的问题:Metadata与routing,虽然这两部分的代码在cluster中,但是却直接和index相关. metadata部分主要是和索引相关的一些元数据构建和操作. 元数据部分主要包括 别名元数

  • Java编程中ArrayList源码分析

    之前看过一句话,说的特别好.有人问阅读源码有什么用?学习别人实现某个功能的设计思路,提高自己的编程水平. 是的,大家都实现一个功能,不同的人有不同的设计思路,有的人用一万行代码,有的人用五千行.有的人代码运行需要的几十秒,有的人只需要的几秒..下面进入正题了. 本文的主要内容: · 详细注释了ArrayList的实现,基于JDK 1.8 . ·迭代器SubList部分未详细解释,会放到其他源码解读里面.此处重点关注ArrayList本身实现. ·没有采用标准的注释,并适当调整了代码的缩进以方便介

  • Postgres中UPDATE更新语句源码分析

    目录 PG中UPDATE源码分析 整体流程分析 解析部分——生成语法解析树UpdateStmt 解析部分——生成查询树Query 优化器——生成执行计划 执行器 事务 总结 PG中UPDATE源码分析 本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本. 整体流程分析 以update dtea set id = 1;这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程. SQL流程如下: parse

  • jQuery源码分析-03构造jQuery对象-工具函数

    作者:nuysoft/高云 QQ:47214707 EMail:nuysoft@gmail.com 声明:本文为原创文章,如需转载,请注明来源并保留原文链接. 读读写写,不对的地方请告诉我,多多交流共同进步,本章的的PDF等本章写完了发布. jQuery源码分析系列的目录请查看 http://nuysoft.iteye.com/blog/1177451,想系统的好好写写,目前还是从我感兴趣的部分开始,如果大家有对哪个模块感兴趣的,建议优先分析的,可以告诉我,一起学习. 3.4 其他静态工具函数

  • jQuery 1.9.1源码分析系列(十五)之动画处理

    首先需要有队列(queue)的基本知识.见上一章. 相关教程:jQuery下的动画处理总结: http://www.jb51.net/article/42000.htm jQuery 1.9.1源码分析系列(十五)动画处理之缓动动画核心Tween  :http://www.jb51.net/article/75821.htm a.动画入口jQuery.fn.animate函数执行流程详解 ------------------------------------------------------

  • Java BufferedWriter BufferedReader 源码分析

    一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输出流.他的功能是为传入的底层字符输出流提供缓存功能.同样当使用底层字符输出流向目的地中写入字符或者字符数组时.每写入一次就要打开一次到目的地的连接.这样频繁的访问不断效率底下.也有可能会对存储介质造成一定的破坏.比如当我们向磁盘中不断的写入字节时.夸张一点.将一个非常大单位是G的字节数据写入到磁盘的指定文件中的.没写入一个字节就要打开一次到这个磁盘的通道.这个结果无疑是恐怖的.而当我们使用Buffered

  • Java String源码分析并介绍Sting 为什么不可变

    Java String源码分析 什么是不可变对象? 众所周知, 在Java中, String类是不可变的.那么到底什么是不可变的对象呢? 可以这样认为:如果一个对象,在它创建完成之后,不能再改变它的状态,那么这个对象就是不可变的.不能改变状态的意思是,不能改变对象内的成员变量,包括基本数据类型的值不能改变,引用类型的变量不能指向其他的对象,引用类型指向的对象的状态也不能改变. 区分对象和对象的引用 对于Java初学者, 对于String是不可变对象总是存有疑惑.看下面代码: String s =

  • Java并发系列之ConcurrentHashMap源码分析

    我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别.Java为我们提供了一个现成的哈希结构,那就是HashMap类,在前面的文章中我曾经介绍过HashMap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的.为此,Java为我们提供了另外一个HashTable类,它对于多线程同步的处理非常简单粗暴,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁.这种方法虽然简单,但导致了一个问题,那就是在同一时间

  • 详解Vue-Router源码分析路由实现原理

    深入Vue-Router源码分析路由实现原理 使用Vue开发SPA应用,离不开vue-router,那么vue和vue-router是如何协作运行的呢,下面从使用的角度,大白话帮大家一步步梳理下vue-router的整个实现流程. 到发文时使用的版本是: - vue (v2.5.0) - vue-router (v3.0.1) 一.vue-router 源码结构 github 地址:https://github.com/vuejs/vue-router components下是两个组件<rout

随机推荐