探索Java I/O 模型的演进

相关概念

同步和异步

描述的是用户线程与内核的交互方式:

  • 同步是指用户线程发起 I/O 请求后需要等待或者轮询内核 I/O 操作完成后才能继续执行;
  • 异步是指用户线程发起 I/O 请求后仍继续执行,当内核 I/O 操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞和非阻塞

描述的是用户线程调用内核 I/O 操作的方式:

  • 阻塞是指 I/O 操作需要彻底完成后才返回到用户空间;
  • 非阻塞是指 I/O 操作被调用后立即返回给用户一个状态值,无需等到 I/O 操作彻底完成。

一个 I/O 操作其实分成了两个步骤:发起 I/O 请求和实际的 I/O 操作。 阻塞 I/O 和非阻塞 I/O 的区别在于第一步,发起 I/O 请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞 I/O ,如果不阻塞,那么就是非阻塞 I/O 。 同步 I/O 和异步 I/O 的区别就在于第二个步骤是否阻塞,如果实际的 I/O 读写阻塞请求进程,那么就是同步 I/O 。

Unix I/O 模型

Unix 下共有五种 I/O 模型:

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 复用(select 和 poll)
  • 信号驱动 I/O(SIGIO)
  • 异步 I/O(POSIX 的 aio_系列函数)

阻塞 I/O

请求无法立即完成则保持阻塞。

阶段1:等待数据就绪。网络 I/O 的情况就是等待远端数据陆续抵达;磁盘I/O的情况就是等待磁盘数据从磁盘上读取到内核态内存中。
阶段2:数据从内核拷贝到进程。出于系统安全,用户态的程序没有权限直接读取内核态内存,因此内核负责把内核态内存中的数据拷贝一份到用户态内存中。

非阻塞 I/O

  • socket 设置为 NONBLOCK(非阻塞)就是告诉内核,当所请求的 I/O 操作无法完成时,不要将进程睡眠,而是返回一个错误码(EWOULDBLOCK) ,这样请求就不会阻塞
  • I/O 操作函数将不断的测试数据是否已经准备好,如果没有准备好,继续测试,直到数据准备好为止。整个 I/O 请求的过程中,虽然用户线程每次发起 I/O 请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的 CPU 的资源
  • 数据准备好了,从内核拷贝到用户空间。

一般很少直接使用这种模型,而是在其他 I/O 模型中使用非阻塞 I/O 这一特性。这种方式对单个 I/O 请求意义不大,但给 I/O 多路复用铺平了道路.

I/O 复用(异步阻塞 I/O)

I/O 多路复用会用到 select 或者 poll 函数,这两个函数也会使进程阻塞,但是和阻塞 I/O 所不同的的,这两个函数可以同时阻塞多个 I/O 操作。而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数。

从流程上来看,使用 select 函数进行 I/O 请求和同步阻塞模型没有太大的区别,甚至还多了添加监视 socket,以及调用 select 函数的额外操作,效率更差。但是,使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 I/O 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 I/O 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

I/O 多路复用模型使用了 Reactor 设计模式实现了这一机制。

调用 select / poll 该方法由一个用户态线程负责轮询多个 socket,直到某个阶段1的数据就绪,再通知实际的用户线程执行阶段2的拷贝。 通过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现了阶段一的异步化

信号驱动 I/O(SIGIO)

首先我们允许 socket 进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。

异步 I/O

调用 aio_read 函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。

异步 I/O 模型使用了 Proactor 设计模式实现了这一机制。

告知内核,当整个过程(包括阶段1和阶段2)全部完成时,通知应用程序来读数据.

几种 I/O 模型的比较

前四种模型的区别是阶段1不相同,阶段2基本相同,都是将数据从内核拷贝到调用者的缓冲区。而异步 I/O 的两个阶段都不同于前四个模型。

同步 I/O 操作引起请求进程阻塞,直到 I/O 操作完成。异步 I/O 操作不引起请求进程阻塞。

常见 Java I/O 模型

在了解了 UNIX 的 I/O 模型之后,其实 Java 的 I/O 模型也是类似。

“阻塞I/O”模式

在上一节 Socket 章节中的 EchoServer 就是一个简单的阻塞 I/O 例子,服务器启动后,等待客户端连接。在客户端连接服务器后,服务器就阻塞读写取数据流。

EchoServer 代码:

public class EchoServer {
public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws IOException {

int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
try (
ServerSocket serverSocket =
new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port "
+ port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

改进为“阻塞I/O+多线程”模式

使用多线程来支持多个客户端来访问服务器。

主线程 MultiThreadEchoServer.java

public class MultiThreadEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// MultiThread
new Thread(new EchoServerHandler(clientSocket)).start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

处理器类 EchoServerHandler.java

public class EchoServerHandler implements Runnable {
private Socket clientSocket;
public EchoServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}

存在问题:每次接收到新的连接都要新建一个线程,处理完成后销毁线程,代价大。当有大量地短连接出现时,性能比较低。

改进为“阻塞I/O+线程池”模式

针对上面多线程的模型中,出现的线程重复创建、销毁带来的开销,可以采用线程池来优化。每次接收到新连接后从池中取一个空闲线程进行处理,处理完成后再放回池中,重用线程避免了频率地创建和销毁线程带来的开销。

主线程 ThreadPoolEchoServer.java

public class ThreadPoolEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// Thread Pool
threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}

存在问题:在大量短连接的场景中性能会有提升,因为不用每次都创建和销毁线程,而是重用连接池中的线程。但在大量长连接的场景中,因为线程被连接长期占用,不需要频繁地创建和销毁线程,因而没有什么优势。

虽然这种方法可以适用于小到中度规模的客户端的并发数,如果连接数超过 100,000或更多,那么性能将很不理想。

改进为“非阻塞I/O”模式

“阻塞I/O+线程池”网络模型虽然比”阻塞I/O+多线程”网络模型在性能方面有提升,但这两种模型都存在一个共同的问题:读和写操作都是同步阻塞的,面对大并发(持续大量连接同时请求)的场景,需要消耗大量的线程来维持连接。CPU 在大量的线程之间频繁切换,性能损耗很大。一旦单机的连接超过1万,甚至达到几万的时候,服务器的性能会急剧下降。

而 NIO 的 Selector 却很好地解决了这个问题,用主线程(一个线程或者是 CPU 个数的线程)保持住所有的连接,管理和读取客户端连接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少量的线程就可以处理大量连接的请求。

Java NIO 由以下几个核心部分组成:

  • Channel
  • Buffer
  • Selector

要使用 Selector,得向 Selector 注册 Channel,然后调用它的 select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

主线程 NonBlokingEchoServer.java

public class NonBlokingEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException ex) {
ex.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}

改进为“异步I/O”模式

Java SE 7 版本之后,引入了异步 I/O (NIO.2) 的支持,为构建高性能的网络应用提供了一个利器。

主线程 AsyncEchoServer.java

public class AsyncEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
// create asynchronous server socket channel bound to the default group
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) {
if (asynchronousServerSocketChannel.isOpen()) {
// set some options
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// bind the server socket channel to local address
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
// display a waiting message while ... waiting clients
System.out.println("Waiting for connections ...");
while (true) {
Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
.accept();
try {
final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
.get();
Callable<String> worker = new Callable<String>() {
@Override
public String call() throws Exception {
String host = asynchronousSocketChannel.getRemoteAddress().toString();
System.out.println("Incoming connection from: " + host);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// transmitting data
while (asynchronousSocketChannel.read(buffer).get() != -1) {
buffer.flip();
asynchronousSocketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
asynchronousSocketChannel.close();
System.out.println(host + " was successfully served!");
return host;
}
};
taskExecutor.submit(worker);
} catch (InterruptedException | ExecutionException ex) {
System.err.println(ex);
System.err.println("\n Server is shutting down ...");
// this will make the executor accept no new threads
// and finish all existing threads in the queue
taskExecutor.shutdown();
// wait until all threads are finished
while (!taskExecutor.isTerminated()) {
}
break;
}
}
} else {
System.out.println("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException ex) {
System.err.println(ex);
}
}
}

源码

本章例子的源码,可以在 https://github.com/waylau/essential-java中 com.waylau.essentialjava.net.echo 包下找到。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java I/O 操作及优化详细介绍

    概要: 流是一组有顺序的,有起点和终点的字节集合,是对数据传输的总称或抽象.即数据在两设备间的传输称为流,流的本质是数据传输,根据数据传输特性将流抽象为各种类,方便更直观的进行数据操作. Java I/O I/O,即 Input/Output(输入/输出) 的简称.就 I/O 而言,概念上有 5 种模型:blocking I/O,nonblocking I/O,I/O multiplexing (select and poll),signal driven I/O (SIGIO),asynchr

  • 基于Java回顾之I/O的使用详解

    工作后,使用的技术随着项目的变化而变化,时而C#,时而Java,当然还有其他一些零碎的技术.总体而言,C#的使用时间要更长一些,其次是Java.我本身对语言没有什么倾向性,能干活的语言,就是好语言.而且从面向对象的角度来看,我觉得C#和Java对我来说,没什么区别. 这篇文章主要回顾Java中和I/O操作相关的内容,I/O也是编程语言的一个基础特性,Java中的I/O分为两种类型,一种是顺序读取,一种是随机读取. 我们先来看顺序读取,有两种方式可以进行顺序读取,一种是InputStream/Ou

  • Java 非阻塞I/O使用方法

    绝大部分知识与实例来自O'REILLY的<Java网络编程>(Java Network Programming,Fourth Edition,by Elliotte Rusty Harold(O'REILLY)). 非阻塞I/O简介 非阻塞I/O(NIO)是处理高并发的一种手段.在高并发的情况下,创建和回收线程以及在线程间切换的开销变得不容忽视,此时就可以使用非阻塞I/O技术.这种技术的核心思想是每次选取一个准备好的连接,尽快地填充这个连接所能管理的尽可能多的数据,然后转向下一个准备好的连接.

  • java基础知识I/O流使用详解

    "流"概念源于UNIX中的管道(pipe)的概念.在UNIX中,管道是一条不间断的字节流,用来实现程序或进程间的通信,或读写外围设备.外部文件等,它屏蔽了实际的I/O设备中处理数据的细节.   一个流,必有源端和目的端,它们可以是计算机内存的某些区域,也可以是磁盘文件,甚至可以是Internet上的某个URL. 流的方向是重要的,根据流的方向,流可以分为两类:输入流和输出流.其实输入/输出是想对于内存来说的.实际上,流的源端和目的端可简单地看成是字节的生产者和消费者,对于输入流,可不必

  • Java I/O技术之文件操作详解

    在java程序设计中,I/O操作是通过java.io包中的类和接口来实现的,因此,我们第一步要做的就是import这个包. java.io提供了一个File类,这是类很容易让人产生误会,它表示的是一个文件名或者目录名,而不是文件本身,所以通过这个类没法对文件里面的数据进行操作.File类提供了一序列对文件操作的功能:删除文件,创建目录,查询文件大小等等.要想对文件数据进行操作那就需要流对象了,在这里就暂时不做介绍. 下面通过一个叫做FileExtension类来对File类中的各种操作进行封装,

  • 探索Java I/O 模型的演进

    相关概念 同步和异步 描述的是用户线程与内核的交互方式: 同步是指用户线程发起 I/O 请求后需要等待或者轮询内核 I/O 操作完成后才能继续执行: 异步是指用户线程发起 I/O 请求后仍继续执行,当内核 I/O 操作完成后会通知用户线程,或者调用用户线程注册的回调函数. 阻塞和非阻塞 描述的是用户线程调用内核 I/O 操作的方式: 阻塞是指 I/O 操作需要彻底完成后才返回到用户空间: 非阻塞是指 I/O 操作被调用后立即返回给用户一个状态值,无需等到 I/O 操作彻底完成. 一个 I/O 操

  • 深入浅出探索Java分布式锁原理

    目录 什么是分布式锁?它能干什么? 分布式锁实现方案 基于数据库的分布式锁实现方案 实现原理 方案分析 基于Redis的分布式锁实现方案 基于sentnx命令的实现原理 方案分析 基于Redisson实现 RedLock 方案分析 基于Zookeeper的分布式锁实现方案 实现原理 方案分析 分布式锁方案到底选哪个? 总结 什么是分布式锁?它能干什么? 相信大家对于Java提供的synchronized关键字以及Lock锁都不陌生,在实际的项目中大家都使用过.如下图所示,在同一个JVM进程中,T

  • Android进阶之从IO到NIO的模型机制演进

    目录 引言 1 Basic IO模型 1.1 RandomAccessFile的缓冲区和BufferedInputStream缓冲区的区别 1.2 Basic IO模型底层原理 2 NIO模型 3 OKIO 引言 其实IO操作相较于服务端,客户端做的并不多,基本的场景就是读写文件的时候会使用到InputStream或者OutputStream,然而客户端能做的就是发起一个读写的指令,真正的操作是内核层通过ioctl指令执行读写操作,因为每次的IO操作都涉及到了线程的操作,因此会有性能上的损耗,那

  • 深入探索Java常量池

    Java的常量池通常分为两种:静态常量池和运行时常量池 静态常量池:class文件中的常量池,class文件中的常量池包括了字符串(数字)字面值,类和方法的信息,占用了class文件的大部分空间. 运行时常量池:JVM在完成加载类之后将class文件中常量池载入到内存中,并保存在方法区中.平时我们所讲的常量池就是指方法区中的运行时常量池.其相对于CLass文件常量池的另外一个重要特征是具备动态性,Java语言并不要求常量一定只有编译期才能产生,也就是并非预置入CLass文件中常量池的内容才能进入

  • 详解Java的内存模型

    JVM的内存模型 Java "一次运行,到处编译" 的真面目 说JVM内存模型之前,先聊一个老生常谈的问题,为什么Java可以 "一次编译,到处运行",这个话题最直接的答案就是,因为Java有JVM啊,解释这个答案之前,我想先回顾一下一个语言被编译的过程: 一般编程语言的编译过程大抵就是,编译--连接--执行,这里的编译就是,把我们写的源代码,根据语义语法进行翻译,形成目标代码,即汇编码.再由汇编程序翻译成机器语言(可以理解为直接运行于硬件上的01语言):然后进行连

  • Java并发内存模型详情

    目录 1.Java内存模型 2.硬件内存架构 3.实际执行 3.1 共享对象可见性 3.2 竞争条件 Java是一门支持多线程执行的语言,要编写正确的并发程序,了解Java内存模型是重要前提.而了解硬件内存模型有助于理解程序的执行. 本文主要整理以下内容 Java内存模型 硬件内存架构 共享对象可见性 竞争条件 1.Java内存模型 Java内存模型最新修订是在Java5. JSR-176 罗列了 J2SE5.0 相关发布特性,包含其中的 JSR-133(JavaTM内存模型与线程规范),jav

  • Redis缓存IO模型的演进教程示例精讲

    目录 前言 事件模型 通信 copy数据的开销 数据怎么知道发给哪个socket socket的数据怎么通知程序来取 Reactor IO多路复用器 select epoll epoll是怎么做到的? 单线程到多线程的演进 单线程 异步线程 多线程 多线程的作用点? 多线程的原理 前言 redis作为应用最广泛的nosql数据库之一,大大小小也经历过很多次升级.在4.0版本之前,单线程+IO多路复用使得redis的性能已经达到一个非常高的高度了.作者也说过,之所以设计成单线程是因为redis的瓶

  • 探索Java中private方法添加@Transactional事务未生效原因

    现在产品期望用户创建和保存逻辑分离:把User实例的创建和保存逻辑拆到两个方法分别进行.然后,把事务的注解 @Transactional 加在保存数据库的方法上. @Service public class StudentService { @Autowired private StudentMapper studentMapper; @Autowired private StudentService studentService; public void saveStudent(String

  • 一文探索Java文件读写更高效方式

    目录 背景 场景分析 场景1:小文件单文件压缩 方式1:网上流传(流传在坊间的神话,其实是带刺的玫瑰) 方式2:使用缓冲区 方式3:使用通道 方式4:使用mmp 场景2:大文件单文件压缩 场景3:大文件多文件压缩 分析结论 背后机密 1.网上流传方法 2.使用缓冲区 3.使用通道 4.使用mmp 背景 最近在探秘kafka为什么如此快?其背后的秘诀又是什么? 怀着好奇之心,开始像剥洋葱 一样逐层内嵌.一步步揭晓kafka能够吊打mq的真因.了解之后不得不说kafka:yyds. 了解到顺序存盘的

  • java自定义线程模型处理方法分享

    看过我之前文章的园友可能知道我是做游戏开发,我的很多思路和出发点是按照游戏思路来处理的,所以和web的话可能会有冲突,不相符合. 来说说为啥我要自定义线程模型呢? 按照我做的mmorpg或者mmoarpg游戏划分,线程被划分为,主线程,全局同步线程,聊天线程,组队线程,地图线程,以及地图消息分发派送线程等: 一些列,都需要根据我的划分,以及数据流向做控制. 游戏服务器,主要要做的事情,肯定是接受玩家的 命令请求 -> 相应的操作 -> 返回结果: 在服务器端所有的消息都会注册到消息管理器里,然

随机推荐