springboot内置tomcat之NIO处理流程一览

目录
  • 前言
  • tomcat组件
    • Acceptor组件
    • Poller
  • 总结
    • 大致流程为
    • 相较于BIO模型的tomcat,NIO的优势分析

前言

springboot内置的tomcat目前默认是基于NIO来实现的,本文介绍下tomcat接受请求的一些组件及组件之间的关联

tomcat组件

本文只介绍NIO中tomcat的组件

我们直接看NIO的核心类NioEndpoint的startInternal方法

Acceptor组件

public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());
            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }
            initializeConnectionLatch();
            // Start poller threads
            // 核心代码1
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
			// 核心代码2
            startAcceptorThreads();
        }
    }

看核心代码1的位置构造了一个Poller数组,Poller是一个实现了Runnable的类,并且启动了该线程类,

getPollerThreadCount()方法返回了2和当前物理机CPU内核数的最小值,即创建的数组最大值为2

接下来看核心代码2startAcceptorThreads()

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

创建了多个Acceptor类,Acceptor也是实现了Runnable的线程类,创建个数默认是1

然后我们看下Acceptor启动后做了什么,我们直接看run方法

  public void run() {
        ......
                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 核心代码1
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    ......
                }
                // Successful accept, reset the error delay
                errorDelay = 0;
                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 核心代码2
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
               ......
    }

核心代码1很明显是一个阻塞模型,即接受客户端连接的,当没有客户端连接时会处于阻塞,这里可以看到默认情况下tomcat在nio模式下只有一个Acceptor线程类来接受连接

然后看核心代码2

 protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            // 核心代码
            getPoller0().register(channel);
        } catch (Throwable t) {
           ......
        }
        return true;
    }

我们看核心代码getPoller0().register(channel)

public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        // 核心代码
        addEvent(r);
    }

看addEvent

  private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }

events的定义

private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();

这里可以看到封装了一个PollerEvent 并且扔到了一个队列里面,然后当前类就结束了

由此可得Acceptor的作用就是接受客户端连接,并且把连接封装起来扔到了一个队列中

Poller

我们前面已经创建并且启动了多个Poller线程类,默认的数量是小于等于2的。

然后我们看下Poller类做了什么,同样我们看run方法

  @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                	// 核心代码1
                    hasEvents = events();

            .......

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 核心代码2
                    processKey(sk, attachment);
                }
            }//while
            //process timeouts
            timeout(keyCount,hasEvents);
        }//while
        getStopLatch().countDown();
    }

先看核心代码1 hasEvents = events()

 public boolean events() {
        boolean result = false;
        PollerEvent pe = null;
        for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
            result = true;
            try {
            	// 核心代码
                pe.run();
                pe.reset();
                if (running && !paused) {
                    eventCache.push(pe);
                }
            } catch ( Throwable x ) {
                log.error("",x);
            }
        }
        return result;
    }

核心代码run

@Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
            	// 核心代码,注册到selector轮训器
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            ......
        }
    }

可以看出大概的意思就是从刚才我们放进去的队列events里面取数据放到了eventCache里面,eventCache的定义SynchronizedStack eventCache,当取到数据后返回true,这个时候就会进入核心代码2处的processKey(sk, attachment),也就是开始处理请求了

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
        try {
            if ( close ) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                        	// 核心代码
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                      ......
    }

这里就可以看到我们的NIO的模型了,也就是多路复用的模型,轮询来判断key的状态,当key是可读或者可写时执行processSocket

  public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // 核心代码
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

这里就是核心代码了,可以看到getExecutor()方法,获取线程池,这个线程池是在初始化tomcat时提前初始化好的,默认情况下核心线程是10,最大线程是200。线程池的配置可以根据我们自己配置来设置大小。

这里拿到线程池然后包装了一个SocketProcessorBase线程类扔到线程池里面取执行

从这里可以看到Poller的功能就是从前面的队列里面获取连接然后包装成SocketProcessorBase之后扔到线程池里面去执行,SocketProcessorBase才是最终真正处理请求的

总结

根据上面的分析我们已经可以看到tomcat的执行流程了,这里盗用网上的一张比较好的图

大致流程为

1、创建一个Acceptor线程来接收用户连接,接收到之后扔到events queue队列里面,默认情况下只有一个线程来接收

2、创建Poller线程,数量小于等于2,Poller对象是NIO的核心,在Poller中,维护了一个Selector对象;当Poller从队列中取出socket后,注册到该Selector中;然后通过遍历Selector,找出其中可读的socket,然后扔到线程池中处理相应请求,这就是典型的NIO多路复用模型

3、扔到线程池中的SocketProcessorBase处理请求

相较于BIO模型的tomcat,NIO的优势分析

1、BIO中的流程应该是接收到请求之后直接把请求扔给线程池去做处理,在这个情况下一个连接即需要一个线程来处理,线程既需要读取数据还需要处理请求,线程占用时间长,很容易达到最大线程

2、NIO的流程的不同点在于Poller类采用了多路复用模型,即Poller类只有检查到可读或者可写的连接时才把当前连接扔给线程池来处理,这样的好处是大大节省了连接还不能读写时的处理时间(如读取请求数据),也就是说NIO“读取socket并交给Worker中的线程”这个过程是非阻塞的,当socket在等待下一个请求或等待释放时,并不会占用工作线程,因此Tomcat可以同时处理的socket数目远大于最大线程数,并发性能大大提高。

以上就是我对于tomcat中nio处理模型的一些理解。希望能给大家一个参考,也希望大家多多支持我们

(0)

相关推荐

  • SpringBoot内置tomcat启动原理详解

    前言 不得不说SpringBoot的开发者是在为大众程序猿谋福利,把大家都惯成了懒汉,xml不配置了,连tomcat也懒的配置了,典型的一键启动系统,那么tomcat在springboot是怎么启动的呢? 内置tomcat 开发阶段对我们来说使用内置的tomcat是非常够用了,当然也可以使用jetty. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo

  • SpringBoot中如何启动Tomcat流程

    前面在一篇文章中介绍了 Spring 中的一些重要的 context.有一些在此文中提到的 context,可以参看上篇文章. SpringBoot 项目之所以部署简单,其很大一部分原因就是因为不用自己折腾 Tomcat 相关配置,因为其本身内置了各种 Servlet 容器.一直好奇: SpringBoot 是怎么通过简单运行一个 main 函数,就能将容器启动起来,并将自身部署到其上 .此文想梳理清楚这个问题. 我们从SpringBoot的启动入口中分析: Context 创建 // Crea

  • SpringBoot应用启动内置Tomcat的过程源码分析

    Connector启动过程 Connector是Tomcat提供的类. // 通过此 Connector 开始处理请求 @Override protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPortWithOffset() < 0) { throw new LifecycleException(sm.getString( "coyote

  • 浅谈SpringBoot内嵌Tomcat的实现原理解析

    一.序言 使用SpringBoot经常会使用内嵌的tomcat做为项目的启动容器,本文将从源码的角度出发,剖析SpringBoot内嵌Tomcat的实现原理,讨论Tomcat何时创建.何时启动以及怎么启动. 二.引入Tomcat组件 导入依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId&

  • Spring Boot如何优化内嵌的Tomcat示例详解

    前言 本文主要给大家介绍了关于Spring Boot优化内嵌Tomcat的相关内容,分享出来供大家参考学习,下面话不多说了,来一看看详细的介绍吧. Spring Boot测试版本 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.3.RELEASE&l

  • springboot内置tomcat之NIO处理流程一览

    目录 前言 tomcat组件 Acceptor组件 Poller 总结 大致流程为 相较于BIO模型的tomcat,NIO的优势分析 前言 springboot内置的tomcat目前默认是基于NIO来实现的,本文介绍下tomcat接受请求的一些组件及组件之间的关联 tomcat组件 本文只介绍NIO中tomcat的组件 我们直接看NIO的核心类NioEndpoint的startInternal方法 Acceptor组件 public void startInternal() throws Exc

  • 浅谈springboot内置tomcat和外部独立部署tomcat的区别

    前两天,我去面了个试,面试官问了我个问题,独立部署的tomcat跟springboot内置的tomcat有什么区别,为什么存在要禁掉springboot的tomcat然后将项目部署到独立的tomcat当中? 我就想,不都一个样?独立部署的tomcat可以配置优化?禁AJP,开多线程,开nio?而且springboot内置的tomcat多方便,部署上服务器写个java脚本运行即可.现在考虑下有什么条件能优于内置tomcat的. 1.tomcat的优化配置多线程?内置的也可以配置多线程 server

  • springboot内置tomcat调优并发线程数解析

    目录 前言 参数 线程池核心线程数 线程池最大线程数 请求最大连接数 accept-count tomcat线程池处理机制 总结 前言 本文解析springboot内置tomcat调优并发线程数的一些参数,并结合源码进行分析 参数 线程池核心线程数 server.tomcat.min-spare-threads:该参数为tomcat处理业务的核心线程数大小,默认值为10 线程池最大线程数 server.tomcat.max-threads:该参数为tomcat处理业务的最大线程数大小,默认值为2

  • SpringBoot内置tomcat调优测试优化

    问题 怎么配置springBoot 内置tomcat,才能使得自己的服务效率更高呢? 基础配置 Spring Boot 能支持的最大并发量主要看其对Tomcat的设置,可以在配置文件中对其进行更改.我们可以看到默认设置中,Tomcat的最大线程数是200,最大连接数是10000. 这个不同SpringBoot 版本可能有所细微差别.本文测试基于Springboot 2.0.7.RELEASE 默认配置 /** * Maximum amount of worker threads. */ priv

  • Springboot内置tomcat配置虚拟路径过程解析

    在Springboot中默认的静态资源路径有:classpath:/METAINF/resources/,classpath:/resources/,classpath:/static/,classpath:/public/,从这里可以看出这里的静态资源路径都是在classpath中(也就是在项目路径下指定的这几个文件夹) 试想这样一种情况:一个网站有文件上传文件的功能,如果被上传的文件放在上述的那些文件夹中会有怎样的后果? 网站数据与程序代码不能有效分离: 当项目被打包成一个.jar文件部署时

  • Springboot 使用内置tomcat禁止不安全HTTP的方法

    Springboot 内置tomcat禁止不安全HTTP方法 1.在tomcat的web.xml中可以配置如下内容 让tomcat禁止不安全的HTTP方法 <security-constraint> <web-resource-collection> <url-pattern>/*</url-pattern> <http-method>PUT</http-method> <http-method>DELETE</ht

  • 解决SpringBoot内嵌Tomcat并发容量的问题

    一.SpringBoot内嵌Tomcat默认配置与优化 在做一个关于秒杀系统的模块,进行Jmeter压测性能的时候发现tomcat并发上不去,深入原因找到可供优化的地方,力求最大性能. 发现并发容器问题 对单接口进行6000线程压测,每个线程请求5次,线程在5秒内创建完毕,当进行一半的时候,已经出现了请求响应时间过大及其错误率达到了43%.这个并发容量对于配置比较好点的服务器相对来说有点弱. 深入SpringBoot底层了解原因 在SpringBoot官方文档中提到了关于元数据的配置 可以看到,

  • SpringBoot如何取消内置Tomcat启动并改用外接Tomcat

    这篇文章主要介绍了SpringBoot如何取消内置Tomcat启动并改用外接Tomcat,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1,修改pom.xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- 去除内嵌t

随机推荐