详解Java ES多节点任务的高效分发与收集实现

目录
  • 一、概述
  • 二、请求分发的简单思路
  • 三、es中search的多节点分发收集
    • 3.1、多节点响应结果处理
    • 3.2、异步提交请求实现

一、概述

我们知道,当我们对es发起search请求或其他操作时,往往都是随机选择一个coordinator发起请求。而这请求,可能是该节点能处理,也可能是该节点不能处理的,也可能是需要多节点共同处理的,可以说是情况比较复杂。

所以,coordinator的重要工作是,做请求分发与结果收集。那么,如何高性能和安全准确地实现这一功能则至关重要。

二、请求分发的简单思路

我们这里所说的请求分发,一般是针对多个网络节点而言的。那么,如何将请求发往多节点,并在最终将结果合并起来呢?

同步请求各节点,当第一个节点响应后,再向第二个节点发起请求,以此类推,直到所有节点请求完成,然后再将结果聚合起来。就完成了需求了,不费吹灰之力。简单不?

无脑处理自有无脑处理的缺点。依次请求各节点,无法很好利用系统的分布式特点,变并行为串行了,好不厉害。另外,对于当前请求,当其未处理完成这所有节点的分发收集工作时,当前线程将会一直被占用。从而,下游请求将无法再接入,从而将你了并发能力,使其与线程池大小同日而语了。这可不好。

我们依次想办法优化下。

首先,我们可以将串行分发请求变成并行分发,即可以使用多线程,向多节点发起请求,当某线程处理完成时,就返回结果。使用类似于CountDownLatch的同步工具,保证所有节点都处理完成后,再由外单主线程进行结果合并操作。

以上优化,看起来不错,避免了同步的性能问题。但是,当有某个节点响应非常慢时,它将阻塞后续节点的工作,从而使整个请求变慢,从而同样变成线程池的大小即是并发能力的瓶颈。可以说,治标不治本。

再来,继续优化。我们可以释放掉主线程的持有,让每个分发线程处理完成当前任务时,都去检查任务队列,是否已完成。如果未完成则忽略,如果已完成,则启动合并任务。

看起来不错,已经有完全并发样子了。但还能不能再优化?各节点的分发,同样是同步请求,虽然处理简单,但在这server响应期间,该线程仍是无法被使用的,如果类似请求过多,则必然是不小的消耗。如果能将单节点的请求,能够做到异步处理,那样岂不完美?但这恐怕不好做吧!不过,终归是一个不错的想法了。

三、es中search的多节点分发收集

我们以search的分发收集为出发点,观看es如何办成这件。原因是search在es中最为普遍与经典,虽说不得每个地方实现都一样,但至少参考意义还是有的。故以search为切入点。search的框架工作流程,我们之前已经研究过,本节就直接以核心开始讲解,它是在 TransportSearchAction.executeRequest() 中的。

// org.elasticsearch.action.search.TransportSearchAction#executeRequest
    private void executeRequest(Task task, SearchRequest searchRequest,
                                SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
            new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            final ClusterState clusterState = clusterService.state();
            final SearchContextId searchContext;
            final Map<String, OriginalIndices> remoteClusterIndices;
            if (searchRequest.pointInTimeBuilder() != null) {
                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
            } else {
                searchContext = null;
                remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
                    searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
            }
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            if (remoteClusterIndices.isEmpty()) {
                executeLocalSearch(
                    task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
            } else {
                // 多节点数据请求
                if (shouldMinimizeRoundtrips(searchRequest)) {
                    // 通过 parentTaskId 关联所有子任务
                    final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
                    ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
                        searchService.aggReduceContextBuilder(searchRequest),
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(
                            task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    // 直接分发多shard请求到各节点
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                // 当所有节点都响应后,再做后续逻辑处理,即此处的后置监听
                                final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
                                    getRemoteClusterNodeLookup(searchShardsResponses);
                                final Map<String, AliasFilter> remoteAliasFilters;
                                final List<SearchShardIterator> remoteShardIterators;
                                if (searchContext != null) {
                                    remoteAliasFilters = searchContext.aliasFilter();
                                    remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
                                        searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
                                } else {
                                    remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
                                    remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
                                        remoteAliasFilters);
                                }
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                // 至于后续搜索实现如何,不在此间
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
                                    clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
                                    searchContext, searchAsyncActionProvider);
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }

可以看到,es的search功能,会被划分为几种类型,有点会走集群分发,而有的则不需要。我们自然是希望走集群分发的,所以,只需看 collectSearchShards() 即可。这里面其实就是对多个集群节点的依次请求,当然还有结果收集。

// org.elasticsearch.action.search.TransportSearchAction#collectSearchShards
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
                                Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
                                ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
    // 使用该计数器进行结果控制
    final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
    final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
    final AtomicReference<Exception> exceptions = new AtomicReference<>();
    // 迭代各节点,依次发送请求
    for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
        final String clusterAlias = entry.getKey();
        boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
        Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
        final String[] indices = entry.getValue().indices();
        ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
            .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
        // 向集群中 clusterAlias 异步发起请求处理 search
        clusterClient.admin().cluster().searchShards(searchShardsRequest,
            new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
                clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
                @Override
                void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    // 每次单节点响应时,将结果存放到 searchShardsResponses 中
                    searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
                }

                @Override
                Map<String, ClusterSearchShardsResponse> createFinalResponse() {
                    // 所有节点都返回时,将结果集返回
                    return searchShardsResponses;
                }
            }
        );
    }
}
// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#searchShards
@Override
public void searchShards(final ClusterSearchShardsRequest request, final ActionListener<ClusterSearchShardsResponse> listener) {
    // 发起请求 indices:admin/shards/search_shards, 其对应处理器为 TransportClusterSearchShardsAction
    execute(ClusterSearchShardsAction.INSTANCE, request, listener);
}

以上是es向集群中多节点发起请求的过程,其重点在于所有的请求都是异步请求,即向各节点发送完成请求后,当前线程即为断开状态。这就体现了无阻塞的能力了,以listner形式进行处理后续业务。这对于发送自然没有问题,但如何进行结果收集呢?实际上就是通过listner来处理的。在远程节点响应后,listener.onResponse()将被调用。

3.1、多节点响应结果处理

这是我们本文讨论的重点。前面我们看到es已经异步发送请求出去了(且不论其如何发送),所以如何收集结果也很关键。而es中的做法则很简单,使用一个 ConcurrentHashMap 收集每个结果,一个CountDown标识是否已处理完成。

// org.elasticsearch.action.search.TransportSearchAction.CCSActionListener#CCSActionListener
CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
                    AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
    this.clusterAlias = clusterAlias;
    this.skipUnavailable = skipUnavailable;
    this.countDown = countDown;
    this.skippedClusters = skippedClusters;
    this.exceptions = exceptions;
    this.originalListener = originalListener;
}

// 成功时的响应
@Override
public final void onResponse(Response response) {
    // inner响应为将结果放入 searchShardsResponses 中
    innerOnResponse(response);
    // maybeFinish 则进行结果是否完成判定,如果完成,则调用回调方法,构造结果
    maybeFinish();
}

private void maybeFinish() {
    // 使用一个 AtomicInteger 进行控制
    if (countDown.countDown()) {
        Exception exception = exceptions.get();
        if (exception == null) {
            FinalResponse response;
            try {
                // 创建响应结果,此处 search 即为 searchShardsResponses
                response = createFinalResponse();
            } catch(Exception e) {
                originalListener.onFailure(e);
                return;
            }
            // 成功响应回调,实现结果收集后的其他业务处理
            originalListener.onResponse(response);
        } else {
            originalListener.onFailure(exceptions.get());
        }
    }
}
// CountDown 实现比较简单,只有最后一个返回true, 其他皆为false, 即实现了 At Most Once 语义
/**
* Decrements the count-down and returns <code>true</code> iff this call
* reached zero otherwise <code>false</code>
*/
public boolean countDown() {
    assert originalCount > 0;
    for (;;) {
        final int current = countDown.get();
        assert current >= 0;
        if (current == 0) {
            return false;
        }
        if (countDown.compareAndSet(current, current - 1)) {
            return current == 1;
        }
    }
}

可见,ES中的结果收集,是以一个 AtomicInteger 实现的CountDown来处理的,当所有节点都响应时,就处理最终结果,否则将每个节点的数据放入ConcurrentHashMap中暂存起来。

而通过一个Client通用的异步调用框架,实现多节点的异步提交。整个节点响应以 CCSActionListener 作为接收者。可以说是比较简洁的了,好像也没有我们前面讨论的复杂性。因为:大道至简。

3.2、异步提交请求实现

我们知道,如果本地想实现异步提交请求,只需使用另一个线程或者线程池技术,即可实现。而对于远程Client的异步提交,则还需要借助于外部工具了。此处借助于Netty的channel.write()实现,节点响应时再回调回来,从而恢复上下文。整个过程,没有一点阻塞同步,从而达到了高效的处理能力,当然还有其他的一些异常处理,自不必说。

具体样例大致如下:因最终的处理器是以 TransportClusterSearchShardsAction 进行处理的,所以直接转到 TransportClusterSearchShardsAction。

// org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction
public class TransportClusterSearchShardsAction extends
    TransportMasterNodeReadAction<ClusterSearchShardsRequest, ClusterSearchShardsResponse> {

    private final IndicesService indicesService;

    @Inject
    public TransportClusterSearchShardsAction(TransportService transportService, ClusterService clusterService,
                                              IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
                                              IndexNameExpressionResolver indexNameExpressionResolver) {
        super(ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters,
            ClusterSearchShardsRequest::new, indexNameExpressionResolver, ClusterSearchShardsResponse::new, ThreadPool.Names.SAME);
        this.indicesService = indicesService;
    }

    @Override
    protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, ClusterState state) {
        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
                indexNameExpressionResolver.concreteIndexNames(state, request));
    }

    @Override
    protected void masterOperation(final ClusterSearchShardsRequest request, final ClusterState state,
                                   final ActionListener<ClusterSearchShardsResponse> listener) {
        ClusterState clusterState = clusterService.state();
        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
        Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(state, request.routing(), request.indices());
        Map<String, AliasFilter> indicesAndFilters = new HashMap<>();
        Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
        for (String index : concreteIndices) {
            final AliasFilter aliasFilter = indicesService.buildAliasFilter(clusterState, index, indicesAndAliases);
            final String[] aliases = indexNameExpressionResolver.indexAliases(clusterState, index, aliasMetadata -> true, true,
                indicesAndAliases);
            indicesAndFilters.put(index, new AliasFilter(aliasFilter.getQueryBuilder(), aliases));
        }

        Set<String> nodeIds = new HashSet<>();
        GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
            .searchShards(clusterState, concreteIndices, routingMap, request.preference());
        ShardRouting shard;
        ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
        int currentGroup = 0;
        for (ShardIterator shardIt : groupShardsIterator) {
            ShardId shardId = shardIt.shardId();
            ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
            int currentShard = 0;
            shardIt.reset();
            while ((shard = shardIt.nextOrNull()) != null) {
                shardRoutings[currentShard++] = shard;
                nodeIds.add(shard.currentNodeId());
            }
            groupResponses[currentGroup++] = new ClusterSearchShardsGroup(shardId, shardRoutings);
        }
        DiscoveryNode[] nodes = new DiscoveryNode[nodeIds.size()];
        int currentNode = 0;
        for (String nodeId : nodeIds) {
            nodes[currentNode++] = clusterState.getNodes().get(nodeId);
        }
        listener.onResponse(new ClusterSearchShardsResponse(groupResponses, nodes, indicesAndFilters));
    }
}
// doExecute 在父类中完成
// org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
    ClusterState state = clusterService.state();
    logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
    if (task != null) {
        request.setParentTask(clusterService.localNode().getId(), task.getId());
    }
    new AsyncSingleAction(task, request, listener).doStart(state);
}

// org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
    this.task = task;
    this.request = request;
    this.listener = listener;
    this.startTime = threadPool.relativeTimeInMillis();
}

protected void doStart(ClusterState clusterState) {
    try {
        final DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
            // check for block, if blocked, retry, else, execute locally
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                } else {
                    logger.debug("can't execute due to a cluster block, retrying", blockException);
                    // 重试处理
                    retry(clusterState, blockException, newState -> {
                        try {
                            ClusterBlockException newException = checkBlock(request, newState);
                            return (newException == null || !newException.retryable());
                        } catch (Exception e) {
                            // accept state as block will be rechecked by doStart() and listener.onFailure() then called
                            logger.trace("exception occurred during cluster block checking, accepting state", e);
                            return true;
                        }
                    });
                }
            } else {
                ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
                    if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
                        logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
                            "stepped down before publishing action [{}], scheduling a retry", actionName), t);
                        retryOnMasterChange(clusterState, t);
                    } else {
                        delegatedListener.onFailure(t);
                    }
                });
                // 本地节点执行结果,直接以异步线程处理即可
                threadPool.executor(executor)
                    .execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
            }
        } else {
            if (nodes.getMasterNode() == null) {
                logger.debug("no known master node, scheduling a retry");
                retryOnMasterChange(clusterState, null);
            } else {
                DiscoveryNode masterNode = nodes.getMasterNode();
                final String actionName = getMasterActionName(masterNode);
                // 发送到master节点,以netty作为通讯工具,完成后回调 当前listner
                transportService.sendRequest(masterNode, actionName, request,
                    new ActionListenerResponseHandler<Response>(listener, responseReader) {
                        @Override
                        public void handleException(final TransportException exp) {
                            Throwable cause = exp.unwrapCause();
                            if (cause instanceof ConnectTransportException ||
                                (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
                                // we want to retry here a bit to see if a new master is elected
                                logger.debug("connection exception while trying to forward request with action name [{}] to " +
                                        "master node [{}], scheduling a retry. Error: [{}]",
                                    actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                                retryOnMasterChange(clusterState, cause);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                });
            }
        }
    } catch (Exception e) {
        listener.onFailure(e);
    }
}

可见,es中确实有两种异步的提交方式,一种是当前节点就是执行节点,直接使用线程池提交;另一种是远程节点则起网络调用,最终如何实现异步且往下看。

// org.elasticsearch.transport.TransportService#sendRequest
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                            final TransportRequest request,
                                                            final TransportRequestOptions options,
                                                            TransportResponseHandler<T> handler) {
    final Transport.Connection connection;
    try {
        // 假设不是本节点,则获取远程的一个 connection, channel
        connection = getConnection(node);
    } catch (final NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
        return;
    }
    sendRequest(connection, action, request, options, handler);
}
// org.elasticsearch.transport.TransportService#getConnection
/**
    * Returns either a real transport connection or a local node connection if we are using the local node optimization.
    * @throws NodeNotConnectedException if the given node is not connected
    */
public Transport.Connection getConnection(DiscoveryNode node) {
    if (isLocalNode(node)) {
        return localNodeConnection;
    } else {
        return connectionManager.getConnection(node);
    }
}

// org.elasticsearch.transport.TransportService#sendRequest
/**
    * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
    *
    * @param connection the connection to send the request on
    * @param action     the name of the action
    * @param request    the request
    * @param options    the options for this request
    * @param handler    the response handler
    * @param <T>        the type of the transport response
    */
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                            final TransportRequest request,
                                                            final TransportRequestOptions options,
                                                            final TransportResponseHandler<T> handler) {
    try {
        final TransportResponseHandler<T> delegate;
        if (request.getParentTask().isSet()) {
            // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
            // child task on the target node of the remote cluster.
            //  ----> a parent task on the local cluster
            //        |
            //         ----> a proxy task on the proxy node on the remote cluster
            //               |
            //                ----> an actual child task on the target node on the remote cluster
            // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
            // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
            // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
            final Transport.Connection unwrappedConn = unwrapConnection(connection);
            final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
            delegate = new TransportResponseHandler<T>() {
                @Override
                public void handleResponse(T response) {
                    unregisterChildNode.close();
                    handler.handleResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    unregisterChildNode.close();
                    handler.handleException(exp);
                }

                @Override
                public String executor() {
                    return handler.executor();
                }

                @Override
                public T read(StreamInput in) throws IOException {
                    return handler.read(in);
                }

                @Override
                public String toString() {
                    return getClass().getName() + "/[" + action + "]:" + handler.toString();
                }
            };
        } else {
            delegate = handler;
        }
        asyncSender.sendRequest(connection, action, request, options, delegate);
    } catch (final Exception ex) {
        // the caller might not handle this so we invoke the handler
        final TransportException te;
        if (ex instanceof TransportException) {
            te = (TransportException) ex;
        } else {
            te = new TransportException("failure to send", ex);
        }
        handler.handleException(te);
    }
}

// org.elasticsearch.transport.TransportService#sendRequestInternal
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {
    if (connection == null) {
        throw new IllegalStateException("can't send request to a null connection");
    }
    DiscoveryNode node = connection.getNode();

    Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
    ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
    // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
    final TimeoutHandler timeoutHandler;
    if (options.timeout() != null) {
        timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
        responseHandler.setTimeoutHandler(timeoutHandler);
    } else {
        timeoutHandler = null;
    }
    try {
        if (lifecycle.stoppedOrClosed()) {
            /*
                * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                * caller. It will only notify if toStop hasn't done the work yet.
                */
            throw new NodeClosedException(localNode);
        }
        if (timeoutHandler != null) {
            assert options.timeout() != null;
            timeoutHandler.scheduleTimeout(options.timeout());
        }
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
    } catch (final Exception e) {
        // usually happen either because we failed to connect to the node
        // or because we failed serializing the message
        final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
        // If holderToNotify == null then handler has already been taken care of.
        if (contextToNotify != null) {
            if (timeoutHandler != null) {
                timeoutHandler.cancel();
            }
            // callback that an exception happened, but on a different thread since we don't
            // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
            // thread on a best effort basis though.
            final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
            final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
            threadPool.executor(executor).execute(new AbstractRunnable() {
                @Override
                public void onRejection(Exception e) {
                    // if we get rejected during node shutdown we don't wanna bubble it up
                    logger.debug(
                        () -> new ParameterizedMessage(
                            "failed to notify response handler on rejection, action: {}",
                            contextToNotify.action()),
                        e);
                }
                @Override
                public void onFailure(Exception e) {
                    logger.warn(
                        () -> new ParameterizedMessage(
                            "failed to notify response handler on exception, action: {}",
                            contextToNotify.action()),
                        e);
                }
                @Override
                protected void doRun() throws Exception {
                    contextToNotify.handler().handleException(sendRequestException);
                }
            });
        } else {
            logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
        }
    }
}
// org.elasticsearch.transport.RemoteConnectionManager.ProxyConnection#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws IOException, TransportException {
    connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
        TransportActionProxy.wrapRequest(targetNode, request), options);
}
// org.elasticsearch.transport.TcpTransport.NodeChannels#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws IOException, TransportException {
    if (isClosing.get()) {
        throw new NodeNotConnectedException(node, "connection already closed");
    }
    TcpChannel channel = channel(options.type());
    outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
}
// org.elasticsearch.transport.OutboundHandler#sendRequest
/**
    * Sends the request to the given channel. This method should be used to send {@link TransportRequest}
    * objects back to the caller.
    */
void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
                    final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
                    final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
    Version version = Version.min(this.version, channelVersion);
    OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
        requestId, isHandshake, compressRequest);
    ActionListener<Void> listener = ActionListener.wrap(() ->
        messageListener.onRequestSent(node, requestId, action, request, options));
    sendMessage(channel, message, listener);
}
// org.elasticsearch.transport.OutboundHandler#sendMessage
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
    MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
    SendContext sendContext = new SendContext(channel, serializer, listener, serializer);
    internalSend(channel, sendContext);
}
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
    channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
    BytesReference reference = sendContext.get();
    // stash thread context so that channel event loop is not polluted by thread context
    try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
        channel.sendMessage(reference, sendContext);
    } catch (RuntimeException ex) {
        sendContext.onFailure(ex);
        CloseableChannel.closeChannel(channel);
        throw ex;
    }
}
// org.elasticsearch.transport.netty4.Netty4TcpChannel#sendMessage
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
    // netty 发送数据,异步回调,完成异步请求
    channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));

    if (channel.eventLoop().isShutdown()) {
        listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
    }
}

简单说,就是依托于netty的pipeline机制以及eventLoop实现远程异步请求,至于具体实现如何,请参考之前文章或各网文。

以上就是详解Java ES多节点任务的高效分发与收集实现的详细内容,更多关于Java ES多节点任务的高效分发 收集实现的资料请关注我们其它相关文章!

(0)

相关推荐

  • Elasticsearch工具cerebro的安装与使用教程

    Cerebro是Elasticsearch 版本5.x 以前插件 Elasticsearch Kopf 的演变,可以通过图形界面查看分片分配和执行常见的索引操作.完全开源,需要依赖 Java 1.8 或更高版本才能运行. 1. 工具信息 项目 说明 类型 开源 项目地址 https://github.com/lmenezes/cerebro/ 开发框架 Scala, Play Framework, AngularJS , Bootstrap 当前稳定版本 v0.9.3 (2020.12.27)

  • Elasticsearch使用常见问题解决方案

    一.和redis一起使用会造成netty启动冲突问题,所以需要在***Application入口文件中添加方法: @PostConstruct public void init() { // see Netty4Utils.setAvailableProcessors() System.setProperty("es.set.netty.runtime.available.processors", "false"); } 二.NoNodeAvailableExcep

  • docker安装ElasticSearch:7.8.0集群的详细教程

    ElasticSearch集群支持动态请求的方式搭建集群和静态配置文件搭建集群 关于集群的动态连接方式官方的文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html 前置准备工作 关于参数的官网说明: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-

  • Java调用elasticsearch本地代码的操作方法

    虽然Java虚拟机为开发人员屏蔽了底层的实现细节,使得开发人员不用考虑底层操作系统的差异性.不过在某些应用程序中,还是免不了要直接与底层操作系统上的原生代码进行交互.今天我们就来看一下Java对本地调用提供的支持. 一.为什么要进行本地调用 1.基于性能的考虑 Java语言从其运行速度上来说,在大多数方面是慢于底层操作系统上原生的C和C++等语言的.这主要是由于Java虚拟机这个中间层次的存在.如果完全用Java语言实现的性能无法达到程序的预期要求,可以选择把部分重要且耗时的代码用C或C++来实

  • springboot中使用ElasticSearch的详细教程

    新建项目 新建一个springboot项目springboot_es用于本次与ElasticSearch的整合,如下图 引入依赖 修改我们的pom.xml,加入spring-boot-starter-data-elasticsearch <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</

  • elasticsearch+logstash并使用java代码实现日志检索

    为了项目日志不被泄露,数据展示不采用Kibana 1.环境准备 1.1 创建普通用户 #创建用户 useradd querylog #设置密码 passwd queylog #授权sudo权限 查找sudoers文件位置 whereis sudoers #修改文件为可编辑 chmod -v u+w /etc/sudoers #编辑文件 vi /etc/sudoers #收回权限 chmod -v u-w /etc/sudoers #第一次使用sudo会有提示 We trust you have

  • Java elasticsearch安装以及部署教程

    1 概述 1.1 Elastic Stack的核心 The Elastic Stack,包括ElasticSearch,Kibana,Beats和Logstash(也称为ELK Stack) 能够安全可靠的获取任何来源,任何格式的数据,然后实时的对诗句进行搜索,分析和可视化 Elasticsearch,简称ES,ES是一个开源的高扩展的分布式全文搜索引擎 是整个Elastic Stack技术栈的核心,它可以近乎实时的存储,检索数据 1.2 Elasticsearch And Solr(对比图)

  • 使用elasticsearch定时删除索引数据

    1.有的时候我们在使用ES 由于资源有限或业务需求,我们只想保存最近一段时间的数据,所以有必要做定时删除数据. 2.编写脚本 vim del_es_by_day.sh #!/bin/bash #定时删除elasticsearch索引 #author menard 2019-3-25 date=`date -d "-7 days" "+%Y.%m.%d"` /usr/bin/curl -v --user elastic:password -XDELETE "

  • 使用postman操作ElasticSearch的方法

    Postman是一款功能强大的网页调试与发送网页HTTP请求的Chrome插件 Postman背景介绍 用户在开发或者调试网络程序或者是网页B/S模式的程序的时候是需要一些方法来跟踪网页请求的,用户可以使用一些网络的监视工具比如著名的Firebug等网页调试工具.今天给大家介绍的这款网页调试工具不仅可以调试简单的css.html.脚本等简单的网页基本信息,它还可以发送几乎所有类型的HTTP请求!Postman在发送网络HTTP请求方面可以说是Chrome插件类产品中的代表产品之一. Postma

  • 关于注解式的分布式Elasticsearch的封装案例

    原生的Rest Level Client不好用,构建检索等很多重复操作. 对bboss-elasticsearch进行了部分增强:通过注解配合实体类进行自动构建索引和自动刷入文档,复杂的业务检索需要自己在xml中写Dsl.用法与mybatis-plus如出一辙. 依赖 <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> &

随机推荐