elasticsearch节点的transport请求发送处理分析

目录
  • transport请求的发送和处理过程
    • request的发送过程
    • request的接受过程
  • request和response是如何被处理
    • request的处理
    • response的处理过程
  • 最后总结

transport请求的发送和处理过程

前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。

cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。

所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。

request的发送过程

代码在nettytransport中如下所示:

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        //参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
     Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
        if (compress) {
            options.withCompress(true);
        }
        byte status = 0;
     //设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
    status = TransportStatus.setRequest(status);
     ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
        boolean addedReleaseListener = false;
        try {
            bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
            StreamOutput stream = bStream;
            // only compress if asked, and, the request is not bytes, since then only
            // the header part is compressed, and the "body" can't be extracted as compressed
            if (options.compress() && (!(request instanceof BytesTransportRequest))) {
                status = TransportStatus.setCompress(status);
                stream = CompressorFactory.defaultCompressor().streamOutput(stream);
            }
            stream = new HandlesStreamOutput(stream);
            // we pick the smallest of the 2, to support both backward and forward compatibility
            // note, this is the only place we need to do this, since from here on, we use the serialized version
            // as the version to use also when the node receiving this request will send the response with
            Version version = Version.smallest(this.version, node.version());
            stream.setVersion(version);
            stream.writeString(transportServiceAdapter.action(action, version));
            ReleasableBytesReference bytes;
            ChannelBuffer buffer;
            // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
            // that create paged channel buffers, but its tricky to know when to do it (where this option is
            // more explicit).
            if (request instanceof BytesTransportRequest) {
                BytesTransportRequest bRequest = (BytesTransportRequest) request;
                assert node.version().equals(bRequest.version());
                bRequest.writeThin(stream);
                stream.close();
                bytes = bStream.bytes();
                ChannelBuffer headerBuffer = bytes.toChannelBuffer();
                ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
                buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
            } else {
                request.writeTo(stream);
                stream.close();
                bytes = bStream.bytes();
                buffer = bytes.toChannelBuffer();
            }
            NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
            ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
            ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
            future.addListener(listener);//添加listener
            addedReleaseListener = true;
            transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
        } finally {
            if (!addedReleaseListener) {
                Releasables.close(bStream.bytes());
            }
        }
    }

以上就是request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里transportRequest是一个抽象类,它继承了TransportMessage同时实现了streamable接口。cluster中对它的实现非常多,各个功能都有相应的request,这里就不一一列举,后面的代码分析中会时常涉及。

request的接受过程

request发送只是transport的一部分功能,有发送就要有接收,这样transport的功能才完整。接下来就是对接收过程的分析。上一篇中简单介绍过netty的使用,message的处理是通过MessageHandler处理,因此nettyTransport的信息处理逻辑都在MessageChannelHandler的messageReceived()方法中,代码如下所示:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Transports.assertTransportThread();
        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {//非buffer之间返回
            ctx.sendUpstream(e);
            return;
        }
     //解析message头
        ChannelBuffer buffer = (ChannelBuffer) m;
        int size = buffer.getInt(buffer.readerIndex() - 4);
        transportServiceAdapter.received(size + 6);
        // we have additional bytes to read, outside of the header
        boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
        int markedReaderIndex = buffer.readerIndex();
        int expectedIndexReader = markedReaderIndex + size;
        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
        // buffer, or in the cumlation buffer, which is cleaned each time
        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
      //读取信息头中的几个重要元数据
        long requestId = buffer.readLong();
        byte status = buffer.readByte();
        Version version = Version.fromId(buffer.readInt());
        StreamInput wrappedStream;
      …………
        if (TransportStatus.isRequest(status)) {//处理请求
            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                } else {
                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                }
                buffer.readerIndex(expectedIndexReader);
            }
        } else {//处理响应
            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
            // ignore if its null, the adapter logs it
            if (handler != null) {
                if (TransportStatus.isError(status)) {
                    handlerResponseError(wrappedStream, handler);
                } else {
                    handleResponse(ctx.getChannel(), wrappedStream, handler);
                }
            } else {
                // if its null, skip those bytes
                buffer.readerIndex(markedReaderIndex + size);
            }
          …………
        wrappedStream.close();
    }

以上就是信息处理逻辑,这个方法基础自netty的SimpleChannelUpstreamHandler类。作为MessageHandler会在client和server启动时加入到handler链中,在信息到达后netty会自动调用handler链依次处理。这是netty的内容,就不详细说明,请参考netty文档。

request和response是如何被处理

request的处理

代码如下所示:

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString();//读出action的名字
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//获取处理该信息的handler
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);//使用该handler处理信息。
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }

几个关键部分在代码中进行了标注。这里仍旧不能看到请求是如何处理的。因为cluster中的请求各种各样,如ping,discovery,index等等,因此不可能使用同一种处理方式。因此request最终又被提交给handler处理。每个功能请求都实现了自己的handler,当请求被提交给handler时会做对应的处理。这里再说一下transportServiceAdapter,消息的处理都是通过它适配转发完成。request的完整处理流程是:messageReceived()方法收到信息判断是request会将其转发到transportServiceAdapter的handler方法,handler方法查找对应的requesthandler,使用将信息转发给该handler进行处理。这里就不举例说明,在后面的discover分析中我们会看到发现,ping等请求的处理过程。

response的处理过程

response通过handleResponse方法进行处理,代码如下:

protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
        final TransportResponse response = handler.newInstance();
        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        response.remoteAddress();
        try {
            response.readFrom(buffer);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        try {
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.handleResponse(response);//转发给对应的handler
            } else {
                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
            }
        } catch (Throwable e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

response的处理过程跟request很类似。每个request都会对应一个handler和一个response的处理handler,会在时候的时候注册到transportService中。请求到达时根据action名称获取到handler处理request,根据requestId获取对应的response handler进行响应。

最后总结

nettyTransport的信息处理过程:信息通过request方法发送到目标节点,目标节点的messagehandler会受到该信息,确定是request还是response,将他们分别转发给transportServiceAdapter,TransportServiceAdapter会查询到对应的handler,信息最终会被转发给对应的handler处理并反馈。

对于nettyTransport信息发送的分析就到这里,在下一篇的cluster discovery分析中,我们会看到信息发送及处理的具体过程,希望大家以后多多支持我们!

(0)

相关推荐

  • elasticsearch节点间通信的基础transport启动过程

    目录 前言 transport 启动serverBootStrap 如何连接到node 连接方法的代码 总结 前言 在前一篇中我们分析了cluster的一些元素.接下来的章节会对cluster的运作机制做详细分析.本节先分析一些transport,它是cluster间通信的基础.它有两种实现,一种是基于netty实现nettytransport,主要用于节点间的通信.另一种是localtransport,主要是用于同一个jvm上的节点通信.因为是同一个jvm上的网络模拟,localtranspo

  • elasticsearch集群cluster示例详解

    目录 前言 节点DiscoveryNode 集群阻塞 clusterService接口 总结 前言 上一篇通过clusterservice对cluster做了一个简单的概述, 应该能够给大家一个初步认识.本篇将对cluster的代码组成进行详细分析,力求能够对cluster做一个更清晰的描述.cluster作为多个节点的协同工作机制,它需要节点,节点间通信,各个节点的状态及各个节点上的数据(index)状态.因此这一部分代码包括了上述的几个部分. 节点DiscoveryNode 首先是节点(Di

  • elasticsearch分布式及数据的功能源码分析

    从功能上说,可以分为两部分,分布式功能和数据功能.分布式功能主要是节点集群及集群附属功能如restful借口.集群性能检测功能等,数据功能主要是索引和搜索.代码上这些功能并不是完全独立,而是由相互交叉部分.当然分布式功能是为数据功能服务,数据功能肯定也难以完全独立于分布式功能. 它的源码有以下几个特点: 模块化: 每个功能都以模块化的方式实现,最后以一个借口向外暴露,最终通过guice(google轻量级DI框架)进行管理.整个系统有30多个模块(version1.5). 接口解耦: es代码中

  • elasticsearch集群cluster主要功能详细分析

    在源码概述中我们分析过,elasticsearch源码从功能上可以分为分布式功能和数据功能,接下来这几篇会就分布式功能展开.这里首先会对cluster作简单概述,然后对cluster所涉及的主要功能详细分析. elasticsearch的集群功能代码在cluster包中,通过ClusterService接口对外暴露. cluster主要包括以下功能: 发现(Discovery),路由(routing),传送功能(transport),集群状态(clusterstates)等. 发现功能功能主要用

  • elasticsearch通过guice注入Node组装启动过程

    目录 elasticsearch启动过程 首先看一下node的初始化 启动各个模块的过程 插件的加载过程 elasticsearch启动过程 elasticsearch的启动过程是根据配置和环境组装需要的模块并启动的过程.这一过程就是通过guice注入各个功能模块并启动这些模块,从而得到一个功能完整的node.正如之前所说elasticsearch的模块化特点,它的各个功能都是独立实现,然后实现通过guice对外提供. 首先简单的说一下guice,它是google的一个轻量级依赖注入框架.它的作

  • elasticsearch节点的transport请求发送处理分析

    目录 transport请求的发送和处理过程 request的发送过程 request的接受过程 request和response是如何被处理 request的处理 response的处理过程 最后总结 transport请求的发送和处理过程 前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程. cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状

  • elasticsearch java客户端action的实现简单分析

    上一篇介绍了elasticsearch的client结构,client只是一个门面,在每个方法后面都有一个action来承接相应的功能.但是action也并非是真正的功能实现者,它只是一个代理,它的真正实现者是transportAction.本篇就对action及transportAction的实现做一个简单的分析, elasticsearch中的绝大部分操作都是通过相应的action,这些action在action包中.它的结构如下图所示: 上图是action包的部分截图,这里面对应着各个功能

  • CSRF跨站请求伪造漏洞分析与防御

    目录 CSRF 漏洞原理 漏洞危害 防御绕过 漏洞利用 防御措施 总结 CSRF 现在的网站都有利用CSRF令牌来防止CSRF,就是在请求包的字段加一个csrf的值,防止csrf,要想利用该漏洞,要和xss组合起来,利用xss获得该csrf值,在构造的请求中将csrf值加进去,就可以绕过csrf防御,利用该漏洞. 该漏洞与xss的区别:xss是通过执行恶意脚本,获取到用户的cookie等信息,再利用cookie等信息进行绕过登录限制,做一些用户可以做的事. 而csrf是伪造请求,让用户自己执行攻

  • java模拟post请求发送json的例子

    java模拟post请求发送json,用两种方式实现,第一种是HttpURLConnection发送post请求,第二种是使用httpclient模拟post请求, 方法一: package main.utils; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; public class HttpUtilTest { Log log = new Log(this.getClass());//初始化

  • Go语言Web编程实现Get和Post请求发送与解析的方法详解

    本文实例讲述了Go语言Web编程实现Get和Post请求发送与解析的方法.分享给大家供大家参考,具体如下: 这是一篇入门文章,通过一个简单的例子介绍Golang的Web编程主要用到的技术. 文章结构包括: 1. Client-Get 请求 2. Client-Post 请求 3. Server 处理 Get 和 Post 数据 在数据的封装中,我们部分采用了json,因而本文也涉及到Golang中json的编码和解码. 一.Client-Get 复制代码 代码如下: package main i

  • node作为中间服务层如何发送请求(发送请求的实现方法详解)

    GET请求: var http = require('http'); var qs = require('querystring'); var data = { a: 123, time: new Date().getTime()};//这是需要提交的数据 var content = qs.stringify(data); var options = { hostname: '127.0.0.1', port: 10086, path: '/pay/pay_callback?' + conten

  • Ajax请求发送成功但不进success的解决方法

    1.情况描述:ajax发送成功,后台也成功响应请求,并返回了json数据,通过chrome监听请求也可以看到响应的json数据,但是就是不进success方法,反而跑到error方法中了 前端: $.ajax({ type : "get", data : {'dbId':node.dbId,'viewId':node.id,'date':new Date()}, url : "${ctp}/ViewOperate/ShowViewSql", dataType : &

  • PostMan post请求发送Json数据的方法

    很多同学都习惯了使用发送get请求以及POST请求发送表单数据,但是如何使用postman post请求发送json数据呢. 第一步:在post请求的header里边设置发送数据的类型 设置发送数据类型为json 第二部:填写发送的json数据 选中raw,并粘贴要发送的json数据.send,就可以看到我们返回的数据了. 以上这篇PostMan post请求发送Json数据的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: postman

  • Java struts2请求源码分析案例详解

    Struts2是Struts社区和WebWork社区的共同成果,我们甚至可以说,Struts2是WebWork的升级版,他采用的正是WebWork的核心,所以,Struts2并不是一个不成熟的产品,相反,构建在WebWork基础之上的Struts2是一个运行稳定.性能优异.设计成熟的WEB框架. 我这里的struts2源码是从官网下载的一个最新的struts-2.3.15.1-src.zip,将其解压即可.里面的目录页文件非常的多,我们只需要定位到struts-2.3.15.1\src\core

  • Vue-cli中post请求发送Json格式数据方式

    目录 post请求发送Json格式数据 举个例子 解决post请求无法携带数据问题 post请求发送Json格式数据 这里就不详细说明了 举个例子 var param = new URLSearchParams();         param.append("productId",this.$route.params.id)         this.axios({             url:"http://39.106.44.63:10086/loadAllProd

随机推荐