elasticsearch数据信息索引操作action support示例分析

目录
  • 抽象类分析
    • doExecute方法
    • performOperation代码
    • master的相关操作
  • 总结

抽象类分析

Action这一部分主要是数据(索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotThread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

doExecute方法

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doExecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doExecute方法调用该方法进行。TransportBroadcastOperationAction就属于前者,它实现了内部操作器AsyncBroadcastAction。TransportCountAction继承于它,它doExecute方法如下所示:

@Override
    protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
        request.nowInMillis = System.currentTimeMillis();
        super.doExecute(request, listener);
    }

调用父类的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的实现如下所示:

@Override
    protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {
        new AsyncBroadcastAction(request, listener).start();
    }

可以看到它初始化了AsyncBroadcastAction并启动。AsyncBroadcastAction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而IndexExistAction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

public void start() {
      //没有shards
            if (shardsIts.size() == 0) {
                // no shards
                try {
                    listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
                } catch (Throwable e) {
                    listener.onFailure(e);
                }
                return;
            }
            request.beforeStart();
            // count the local operations, and perform the non local ones
            int shardIndex = -1;
       //遍历对每个shards进行操作
            for (final ShardIterator shardIt : shardsIts) {
                shardIndex++;
                final ShardRouting shard = shardIt.nextOrNull();
                if (shard != null) {
                    performOperation(shardIt, shard, shardIndex);
                } else {
                    // really, no shards active in this group
                    onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                }
            }
        }

start方法就是遍历所有shards,如果shard存在则执行performOperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardOperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onOperation操作,遍历该shard的其它副本看能否找到可以操作的shard。

performOperation代码

如下所示:

protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
            if (shard == null) {//shard 为null抛出异常
                // no more active shards... (we should not really get here, just safety)
                onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
            } else {
                try {
                    final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                    if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地执行shardOperation方法,并通过onOperation方法封装结果
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    onOperation(shard, shardIndex, shardOperation(shardRequest));
                                } catch (Throwable e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            }
                        });
                    } else {//不是本地shard,发送到对应节点。
                        DiscoveryNode node = nodes.get(shard.currentNodeId());
                        if (node == null) {
                            // no node connected, act as failure
                            onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                        } else {
                            transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {
                                @Override
                                public ShardResponse newInstance() {
                                    return newShardResponse();
                                }
                                @Override
                                public String executor() {
                                    return ThreadPool.Names.SAME;
                                }
                                @Override
                                public void handleResponse(ShardResponse response) {
                                    onOperation(shard, shardIndex, response);
                                }
                                @Override
                                public void handleException(TransportException e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            });
                        }
                    }
                } catch (Throwable e) {
                    onOperation(shard, shardIt, shardIndex, e);
                }
            }
        }

方法shardOperation在countTransportAction的实现如下所示:

@Override
    protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());
    //构造查询context
        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
        SearchContext context = new DefaultSearchContext(0,
                new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
                shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
                scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
        SearchContext.setCurrent(context);
        try {
            // TODO: min score should move to be "null" as a value that is not initialized...
            if (request.minScore() != -1) {
                context.minimumScore(request.minScore());
            }
            BytesReference source = request.querySource();
            if (source != null &amp;&amp; source.length() &gt; 0) {
                try {
                    QueryParseContext.setTypes(request.types());
                    context.parsedQuery(indexService.queryParserService().parseQuery(source));
                } finally {
                    QueryParseContext.removeTypes();
                }
            }
            final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
            boolean terminatedEarly = false;
            context.preProcess();
            try {
                long count;
                if (hasTerminateAfterCount) {//调用lucene的封装接口执行查询并返回结果
                    final Lucene.EarlyTerminatingCollector countCollector =
                            Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
                    terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
                    count = countCollector.count();
                } else {
                    count = Lucene.count(context.searcher(), context.query());
                }
                return new ShardCountResponse(request.shardId(), count, terminatedEarly);
            } catch (Exception e) {
                throw new QueryPhaseExecutionException(context, "failed to execute count", e);
            }
        } finally {
            // this will also release the index searcher
            context.close();
            SearchContext.removeCurrent();
        }
    }

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

master的相关操作

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考TransportMasterNodeOperationAction原码。

总结

本篇概括说了support action,并以countTransportAction为例说明了support Action中的异步操作器实现,最后简单的分析了master的同步操作。因为这里涉及到很多action不可能一一分析,有兴趣可以参考对应的代码。而且这里有以下index部分的内容,所以没有更深入的分析。在后面分析完index的相关功能后,会挑出几个重要的action做详细分析。

以上就是elasticsearch数据信息索引操作action support示例分析的详细内容,更多关于elasticsearch数据信息索引操作action support的资料请关注我们其它相关文章!

(0)

相关推荐

  • elasticsearch索引index之engine读写控制结构实现

    目录 engine的实现结构 Engine类的方法: 如index方法的实现: 总结 engine的实现结构 elasticsearch对于索引中的数据操作如读写get等接口都封装在engine中,同时engine还封装了索引的读写控制,如流量.错误处理等.engine是离lucene最近的一部分. engine的实现结构如下所示: engine接口有三个实现类,主要逻辑都在InternalEngine中. ShadowEngine之实现了engine接口的部分读方法,主要用于对于索引的读操作.

  • elasticsearch索引index之Translog数据功能分析

    目录 跟大多数分布式系统一样,es也通过临时写入写操作来保证数据安全.因为lucene索引过程中,数据会首先据缓存在内存中直到达到一个量(文档数或是占用空间大小)才会写入到磁盘.这就会带来一个风险,如果在写入磁盘前系统崩溃,那么这些缓存数据就会丢失.es通过translog解决了这个问题,每次写操作都会写入一个临时文件translog中,这样如果系统需要恢复数据可以从translog中读取.本篇就主要分析translog的结构及写入方式. 这一部分主要包括两部分translog和tanslogF

  • elasticsearch源码分析index action实现方式

    目录 action的作用 TransportAction的类图 OperationTransportHandler的代码 primary操作的方法 总结 action的作用 上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式. 再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中

  • elasticsearch索引index数据功能源码示例

    从本篇开始,对elasticsearch的介绍将进入数据功能部分(index),这一部分包括索引的创建,管理,数据索引及搜索等相关功能.对于这一部分的介绍,首先对各个功能模块的分析,然后详细分析数据索引和搜索的整个流程. 这一部分从代码包结构上可以分为:index, indices及lucene(common)几个部分.index包中的代码主要是各个功能对应于lucene的底层操作,它们的操作对象是index的shard,是elasticsearch对lucene各个功能的扩展和封装.indic

  • Elasticsearch索引的分片分配Recovery使用讲解

    目录 什么是recovery? 减少集群full restart造成的数据来回拷贝 减少主副本之间的数据复制 特大热索引为何恢复慢 什么是recovery? 在elasticsearch中,recovery指的是一个索引的分片分配到另外一个节点的过程,一般在快照恢复.索引复制分片的变更.节点故障或重启时发生,由于master节点保存整个集群相关的状态信息,因此可以判断哪些分片需要再分配及分配到哪个节点,例如: 如果某个主分片在,而复制分片所在的节点挂掉了,那么master需要另行选择一个可用节点

  • Elasticsearch Recovery索引分片分配详解

    目录 基础知识点 减少集群Full Restart造成的数据来回拷贝 减少主副本之间的数据复制 特大热索引为何恢复慢 其他Recovery相关的专家级设置 基础知识点 在Eleasticsearch中recovery指的就是一个索引的分片分配到另外一个节点的过程:一般在快照恢复.索引副本数变更.节点故障.节点重启时发生.由于master保存整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点,例如: 如果某个shard主分片在,副分片所在结点挂了,那么选择另外一个可用

  • elasticsearch数据信息索引操作action support示例分析

    目录 抽象类分析 doExecute方法 performOperation代码 master的相关操作 总结 抽象类分析 Action这一部分主要是数据(索引)的操作和部分集群信息操作. 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求.如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点.action support部分是对action的抽象,所有的具体action都继承了support action

  • 解析linq to xml操作XML的示例分析

    .Net中的System.Xml.Linq命名空间提供了linq to xml的支持.这个命名空间中的XDocument,XElement以及XText,XAttribute提供了读写xml文档的关键方法.1. 使用linq to xml写xml:使用XDocument的构造函数可以构造一个Xml文档对象:使用XElement对象可以构造一个xml节点元素,使用XAttribute构造函数可以构造元素的属性:使用XText构造函数可以构造节点内的文本.如下实例代码: 复制代码 代码如下: cla

  • ElasticSearch学习之多条件组合查询验证及示例分析

    目录 多条件组合查询 bool constant_score 查询验证 & 分析 验证 分析 排序 默认排序 自定义排序 tips 单字段排序 多字段 scroll分页 初始化快照 & 快照保存10分钟 根据快照ID滚动查询 多条件组合查询 bool es中使用bool来控制多条件查询,bool查询支持以下参数: must:被查询的数据必须满足当前条件 mush_not:被查询的数据必须不满足当前条件 should:被查询的数据应该满足当前条件.should查询被用于修正查询结果的评分.需

  • 使用Python操作Elasticsearch数据索引的教程

    Elasticsearch是一个分布式.Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于: 轻量级:安装启动方便,下载文件之后一条命令就可以启动: Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构: 多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置: 分布式:Solr Cloud的配置比较

  • PHP框架Laravel中使用UUID实现数据分表操作示例

    本文实例讲述了PHP框架Laravel中使用UUID实现数据分表操作.分享给大家供大家参考,具体如下: UUID UUID是指在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的. 说的简单点,它就是通过一个规则(如:业务标识号+年月日+当日自增数字格式化)生成的一个具有唯一性的辨识资讯.用于关联我们的一些额数据和资讯. 实例 之前在做一个项目的时候用到这个东西,现在我就用Laravel框架写一个简单的demo 前端form表单 <!DOCTYPE html> <html&g

  • python elasticsearch从创建索引到写入数据的全过程

    python elasticsearch从创建索引到写入数据 创建索引 from elasticsearch import Elasticsearch es = Elasticsearch('192.168.1.1:9200') mappings = { "mappings": { "type_doc_test": { #type_doc_test为doc_type "properties": { "id": { "

  • Elasticsearch文档索引基本操作增删改查示例

    接口幂等性 接口幂等性:数学概念,多次请求,相当于一次请求 get,put,delete都是幂等性的接口 post 存在幂等性的问题 前端速度很快,点了两次,会生成两个订单 用户在访问新增页面时(提交订单)--->接口返回一个唯一id,提交订单,携带唯一id过来,后端判断这个唯一id是否被用过--->没用过,创建订单 你在项目中碰到的问题和如何解决(项目收获)下订单,经常重复订单,点得快,幂等性问题,如何解决的 倒排索引 1.es介绍10个点 2.安装 -jdk :java开发环境 官网下载e

  • Elasticsearch之倒排索引及索引操作

    目录 倒排索引 一 倒排索引是什么 二 举例 三 倒排索引待解决的问题 索引操作 一 索引初始化 二 查询索引配置 三 更新索引 四 删除索引 倒排索引 一 倒排索引是什么 倒排索引源于实际应用中需要根据属性的值来查找记录,这种索引表中的每一个项都包括一个属性值和具有该属性值的各记录的地址.由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而成为倒排索引.带有倒排索引的文件我们称之为倒排索引文件,简称倒排文件 二 举例 例如有如下三个文件: 文件A:通过Python django搭建网

  • 使用elasticsearch定时删除索引数据

    1.有的时候我们在使用ES 由于资源有限或业务需求,我们只想保存最近一段时间的数据,所以有必要做定时删除数据. 2.编写脚本 vim del_es_by_day.sh #!/bin/bash #定时删除elasticsearch索引 #author menard 2019-3-25 date=`date -d "-7 days" "+%Y.%m.%d"` /usr/bin/curl -v --user elastic:password -XDELETE "

  • thinkPHP5框架实现多数据库连接,跨数据连接查询操作示例

    本文实例讲述了thinkPHP5框架实现多数据库连接,跨数据连接查询操作.分享给大家供大家参考,具体如下: 1. 多数据库连接 方法1:在需要连接其他数据库的地方,使用Db::connect()方法动态连接数据库,方法参数为数据库配置的数组或字符串例如: 字符串参数: Db::connect('mysql://root:1234@127.0.0.1:3306/thinkphp#utf8'); 配置数组参数: Db::connect([ // 数据库类型 'type' => 'mysql', //

随机推荐