Java中多线程Reactor模式的实现
目录
- 1、 主服务器
- 2、IO请求handler+线程池
- 3、客户端
多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。
让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度
1、 主服务器
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; class MultiThreadEchoServerReactor { ServerSocketChannel serverSocket; AtomicInteger next = new AtomicInteger(0); Selector bossSelector = null; Reactor bossReactor = null; //selectors集合,引入多个selector选择器 //多个选择器可以更好的提高通道的并发量 Selector[] workSelectors = new Selector[2]; //引入多个子反应器 //如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。 //每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量 Reactor[] workReactors = null; MultiThreadEchoServerReactor() throws IOException { bossSelector = Selector.open(); //初始化多个selector选择器 workSelectors[0] = Selector.open(); workSelectors[1] = Selector.open(); serverSocket = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); serverSocket.socket().bind(address); //非阻塞 serverSocket.configureBlocking(false); //第一个selector,负责监控新连接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新连接处理handler处理器到SelectionKey(选择键) sk.attach(new AcceptorHandler()); //处理新连接的反应器 bossReactor = new Reactor(bossSelector); //第一个子反应器,一子反应器负责一个选择器 Reactor subReactor1 = new Reactor(workSelectors[0]); //第二个子反应器,一子反应器负责一个选择器 Reactor subReactor2 = new Reactor(workSelectors[1]); workReactors = new Reactor[]{subReactor1, subReactor2}; } private void startService() { new Thread(bossReactor).start(); // 一子反应器对应一条线程 new Thread(workReactors[0]).start(); new Thread(workReactors[1]).start(); } //反应器 class Reactor implements Runnable { //每条线程负责一个选择器的查询 final Selector selector; public Reactor(Selector selector) { this.selector = selector; } public void run() { try { while (!Thread.interrupted()) { //单位为毫秒 //每隔一秒列出选择器感应列表 selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (null == selectedKeys || selectedKeys.size() == 0) { //如果列表中的通道注册事件没有发生那就继续执行 continue; } Iterator<SelectionKey> it = selectedKeys.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 SelectionKey sk = it.next(); dispatch(sk); } //清楚掉已经处理过的感应事件,防止重复处理 selectedKeys.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } void dispatch(SelectionKey sk) { Runnable handler = (Runnable) sk.attachment(); //调用之前attach绑定到选择键的handler处理器对象 if (handler != null) { handler.run(); } } } // Handler:新连接处理器 class AcceptorHandler implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); Logger.info("接收到一个新的连接"); if (channel != null) { int index = next.get(); Logger.info("选择器的编号:" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } if (next.incrementAndGet() == workSelectors.length) { next.set(0); } } } public static void main(String[] args) throws IOException { MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor(); server.startService(); } }
按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。
这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。
//第一个selector,负责监控新连接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新连接处理handler处理器到SelectionKey(选择键) sk.attach(new AcceptorHandler());
但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。
int index = next.get(); Logger.info("选择器的编号:" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel);
2、IO请求handler+线程池
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MultiThreadEchoHandler implements Runnable { final SocketChannel channel; final SelectionKey sk; final ByteBuffer byteBuffer = ByteBuffer.allocate(1024); static final int RECIEVING = 0, SENDING = 1; int state = RECIEVING; //引入线程池 static ExecutorService pool = Executors.newFixedThreadPool(4); MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException { channel = c; channel.configureBlocking(false); //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss selector.wakeup(); //仅仅取得选择键,后设置感兴趣的IO事件 sk = channel.register(selector, 0); //将本Handler作为sk选择键的附件,方便事件dispatch sk.attach(this); //向sk选择键注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); //唤醒选择,是的OP_READ生效 selector.wakeup(); Logger.info("新的连接 注册完成"); } public void run() { //异步任务,在独立的线程池中执行 pool.execute(new AsyncTask()); } //异步任务,不在Reactor线程中执行 public synchronized void asyncRun() { try { if (state == SENDING) { //写入通道 channel.write(byteBuffer); //写完后,准备开始从通道读,byteBuffer切换成写模式 byteBuffer.clear(); //写完后,注册read就绪事件 sk.interestOps(SelectionKey.OP_READ); //写完后,进入接收的状态 state = RECIEVING; } else if (state == RECIEVING) { //从通道读 int length = 0; while ((length = channel.read(byteBuffer)) > 0) { Logger.info(new String(byteBuffer.array(), 0, length)); } //读完后,准备开始写入通道,byteBuffer切换成读模式 byteBuffer.flip(); //读完后,注册write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); //读完后,进入发送的状态 state = SENDING; } //处理结束了, 这里不能关闭select key,需要重复使用 //sk.cancel(); } catch (IOException ex) { ex.printStackTrace(); } } //异步任务的内部类 class AsyncTask implements Runnable { public void run() { MultiThreadEchoHandler.this.asyncRun(); } } }
3、客户端
在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Dateutil; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * create by 尼恩 @ 疯狂创客圈 **/ public class EchoClient { public void start() throws IOException { InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); // 1、获取通道(channel) SocketChannel socketChannel = SocketChannel.open(address); Logger.info("客户端连接成功"); // 2、切换成非阻塞模式 socketChannel.configureBlocking(false); //不断的自旋、等待连接完成,或者做一些其他的事情 while (!socketChannel.finishConnect()) { } Logger.tcfo("客户端启动成功!"); //启动接受线程 Processer processer = new Processer(socketChannel); new Thread(processer).start(); } static class Processer implements Runnable { final Selector selector; final SocketChannel channel; Processer(SocketChannel channel) throws IOException { //Reactor初始化 selector = Selector.open(); this.channel = channel; channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); if (sk.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); Scanner scanner = new Scanner(System.in); Logger.tcfo("请输入发送内容:"); if (scanner.hasNext()) { SocketChannel socketChannel = (SocketChannel) sk.channel(); String next = scanner.next(); buffer.put((Dateutil.getNow() + " >>" + next).getBytes()); buffer.flip(); // 操作三:发送数据 socketChannel.write(buffer); buffer.clear(); } } if (sk.isReadable()) { // 若选择键的IO事件是“可读”事件,读取数据 SocketChannel socketChannel = (SocketChannel) sk.channel(); //读取数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0; while ((length = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); Logger.info("server echo:" + new String(byteBuffer.array(), 0, length)); byteBuffer.clear(); } } //处理结束了, 这里不能关闭select key,需要重复使用 //selectionKey.cancel(); } selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new EchoClient().start(); } }
到此这篇关于Java中多线程Reactor模式的实现的文章就介绍到这了,更多相关Java 多线程Reactor内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!