Java中多线程Reactor模式的实现

目录
  • 1、 主服务器
  • 2、IO请求handler+线程池
  • 3、客户端

多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。

让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度

1、 主服务器

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class MultiThreadEchoServerReactor {
    ServerSocketChannel serverSocket;
    AtomicInteger next = new AtomicInteger(0);
    Selector bossSelector = null;
    Reactor bossReactor = null;
    //selectors集合,引入多个selector选择器
    //多个选择器可以更好的提高通道的并发量
    Selector[] workSelectors = new Selector[2];
    //引入多个子反应器
    //如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。
    //每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量
    Reactor[] workReactors = null;

    MultiThreadEchoServerReactor() throws IOException {

        bossSelector = Selector.open();
        //初始化多个selector选择器
        workSelectors[0] = Selector.open();
        workSelectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //第一个selector,负责监控新连接事件
        SelectionKey sk =
                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //附加新连接处理handler处理器到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());

        //处理新连接的反应器
        bossReactor = new Reactor(bossSelector);

        //第一个子反应器,一子反应器负责一个选择器
        Reactor subReactor1 = new Reactor(workSelectors[0]);
        //第二个子反应器,一子反应器负责一个选择器
        Reactor subReactor2 = new Reactor(workSelectors[1]);
        workReactors = new Reactor[]{subReactor1, subReactor2};
    }

    private void startService() {
        new Thread(bossReactor).start();
        // 一子反应器对应一条线程
        new Thread(workReactors[0]).start();
        new Thread(workReactors[1]).start();
    }

    //反应器
    class Reactor implements Runnable {
        //每条线程负责一个选择器的查询
        final Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    //单位为毫秒
                    //每隔一秒列出选择器感应列表
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    if (null == selectedKeys || selectedKeys.size() == 0) {
                        //如果列表中的通道注册事件没有发生那就继续执行
                        continue;
                    }
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    //清楚掉已经处理过的感应事件,防止重复处理
                    selectedKeys.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            //调用之前attach绑定到选择键的handler处理器对象
            if (handler != null) {
                handler.run();
            }
        }
    }

    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
        try {
                SocketChannel channel = serverSocket.accept();
                Logger.info("接收到一个新的连接");

                if (channel != null) {
                    int index = next.get();
                    Logger.info("选择器的编号:" + index);
                    Selector selector = workSelectors[index];
                    new MultiThreadEchoHandler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == workSelectors.length) {
                next.set(0);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startService();
    }

}

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。

这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。

 //第一个selector,负责监控新连接事件
        SelectionKey sk =
                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //附加新连接处理handler处理器到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。

                   int index = next.get();
                   Logger.info("选择器的编号:" + index);
                   Selector selector = workSelectors[index];
                   new MultiThreadEchoHandler(selector, channel);

2、IO请求handler+线程池

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        channel.configureBlocking(false);

        //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss
        selector.wakeup();

        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);
        //将本Handler作为sk选择键的附件,方便事件dispatch
        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);

        //唤醒选择,是的OP_READ生效
        selector.wakeup();
        Logger.info("新的连接 注册完成");

    }

    public void run() {
        //异步任务,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);

                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }
}

3、客户端

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Dateutil;
import com.crazymakercircle.util.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * create by 尼恩 @ 疯狂创客圈
 **/
public class EchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、获取通道(channel)
        SocketChannel socketChannel = SocketChannel.open(address);
        Logger.info("客户端连接成功");
        // 2、切换成非阻塞模式
        socketChannel.configureBlocking(false);
        //不断的自旋、等待连接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
        Logger.tcfo("客户端启动成功!");

        //启动接受线程
        Processer processer = new Processer(socketChannel);
        new Thread(processer).start();

    }

    static class Processer implements Runnable {
        final Selector selector;
        final SocketChannel channel;

        Processer(SocketChannel channel) throws IOException {
            //Reactor初始化
            selector = Selector.open();
            this.channel = channel;
            channel.register(selector,
                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

                            Scanner scanner = new Scanner(System.in);
                            Logger.tcfo("请输入发送内容:");
                            if (scanner.hasNext()) {
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                String next = scanner.next();
                                buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
                                buffer.flip();
                                // 操作三:发送数据
                                socketChannel.write(buffer);
                                buffer.clear();
                            }

                        }
                        if (sk.isReadable()) {
                            // 若选择键的IO事件是“可读”事件,读取数据
                            SocketChannel socketChannel = (SocketChannel) sk.channel();

                            //读取数据
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int length = 0;
                            while ((length = socketChannel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
                                byteBuffer.clear();
                            }

                        }
                        //处理结束了, 这里不能关闭select key,需要重复使用
                        //selectionKey.cancel();
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new EchoClient().start();
    }
}

到此这篇关于Java中多线程Reactor模式的实现的文章就介绍到这了,更多相关Java 多线程Reactor内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java Reactor反应器模式使用方法详解

    Reactor反应器模式 到目前为止,高性能网络编程都绕不开反应器模式.很多著名的服务器软件或者中间件都是基于反应器模式实现的,如Nginx.Redis.Netty. 反应器模式是高性能网络编程的必知.必会的模式. Reactor简介 反应器模式由Reactor反应器线程.Handlers处理器两大角色组成: (1)Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器. (2)Handlers处理器的职责:非阻塞的执行业务处理逻辑. 从上面的反应器模式定义,看不出这

  • 简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接: 2)作为NIO客户端,向服务端发起TCP连接: 3)读取通信对端的请求或者应答消息: 4)向通信对端发送消息请求或者应答消息. Reactor单线程模型示意图如下所示: Reactor单线程模型 由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处

  • Java反应式框架Reactor中的Mono和Flux

    1. 前言 最近写关于响应式编程的东西有点多,很多同学反映对Flux和Mono这两个Reactor中的概念有点懵逼.但是目前Java响应式编程中我们对这两个对象的接触又最多,诸如Spring WebFlux.RSocket.R2DBC.我开始也对这两个对象头疼,所以今天我们就简单来探讨一下它们. 2. 响应流的特点 要搞清楚这两个概念,必须说一下响应流规范.它是响应式编程的基石.他具有以下特点: 响应流必须是无阻塞的.响应流必须是一个数据流.它必须可以异步执行.并且它也应该能够处理背压. 背压是

  • Java中多线程Reactor模式的实现

    目录 1. 主服务器 2.IO请求handler+线程池 3.客户端 多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件. 让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求.这样一来连接请求与IO请求分开执行提高通道的并发量.同时

  • 老生常谈java中的Future模式

    jdk1.7.0_79 本文实际上是对上文<简单谈谈ThreadPoolExecutor线程池之submit方法>的一个延续或者一个补充.在上文中提到的submit方法里出现了FutureTask,这不得不停止脚步将方向转向Java的Future模式. Future是并发编程中的一种设计模式,对于多线程来说,线程A需要等待线程B的结果,它没必要一直等待B,可以先拿到一个未来的Future,等B有了结果后再取真实的结果. ExecutorService executor = Executors.

  • 详解Java中多线程异常捕获Runnable的实现

    详解Java中多线程异常捕获Runnable的实现 1.背景: Java 多线程异常不向主线程抛,自己处理,外部捕获不了异常.所以要实现主线程对子线程异常的捕获. 2.工具: 实现Runnable接口的LayerInitTask类,ThreadException类,线程安全的Vector 3.思路: 向LayerInitTask中传入Vector,记录异常情况,外部遍历,判断,抛出异常. 4.代码: package step5.exception; import java.util.Vector

  • java 中多线程生产者消费者问题详细介绍

    java 中多线程生产者消费者问题 前言: 一般面试喜欢问些线程的问题,较基础的问题无非就是死锁,生产者消费者问题,线程同步等等,在前面的文章有写过死锁,这里就说下多生产多消费的问题了 import java.util.concurrent.locks.*; class BoundedBuffer { final Lock lock = new ReentrantLock();//对象锁 final Condition notFull = lock.newCondition(); //生产者监视

  • Java 中责任链模式实现的三种方式

    责任链模式 责任链模式的定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止.这里就不再过多的介绍什么是责任链模式,主要来说说java中如何编写.主要从下面3个框架中的代码中介绍. servlet中的filter dubbo中的filter mybatis中的plugin 这3个框架在实现责任链方式不尽相同. servlet中的Filter servlet中分别定义了一个 Filter和Filter

  • Java中多线程与并发_volatile关键字的深入理解

    一.volatile关键字 volatile是JVM提供的一种轻量级的同步机制,特性: 1.保证内存可见性 2.不保证原子性 3.防止指令重排序 二.JMM(Java Memory Model) Java内存模型中规定了所有的变量都存储在主内存中(如虚拟机物理内存中的一部分),每条线程还有自己的工作内存(如CPU中的高速缓存),线程的工作内存中保存了该线程使用到的变量到主内存的副本拷贝,线程对变量的所有操作(读取.赋值)都必须在工作内存中进行,而不能直接读写主内存中的变量.不同线程之间无法直接访

  • Java中多线程下载图片并压缩能提高效率吗

    目录 前言 实现思路 实测 前言 需求 导出Excel:本身以为是一个简单得导出,但是每行得记录文件中有一列为图片url,需要下载所有记录行对应得图片,然后压缩整个文件夹. 这里只做4.5.得代码讲解描述,其它也没什么好说得,话不多说上代码. 实现思路 多线程实现使用了线程池,Jdk1.8并发包下的CompletableFuture 第一步:得到基础数值 // 线程数 Integer threadNum = 10; // 每条线程需要处理的图片数 int dataNum = imageInfoV

  • java中多线程与线程池的基本使用方法

    目录 前言 继承Thread 实现Runnale接口 Callable 线程池 常见的4种线程池. 总结 前言 在java中,如果每个请求到达就创建一个新线程,开销是相当大的.在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源.如果在一个jvm里创建太多的线程,可能会使系统由于过度消耗内存或"切换过度"而导致系统资源不足.为了防止资源不足,服务器应用程

  • java中多线程的超详细介绍

    1.线程概述 几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每个运行中的程序就是一个进程.当一个程序运行时,内部可能包含了多个顺序执行流,每个顺序执行流就是一个线程. 2.线程与进程 进程概述: 几乎所有的操作系统都支持进程的概念,所有运行中的任务通常对应一个进程( Process).当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位. 进程特征: 1.独立性:进程是系统中独立存在的实体,

  • Java中的代理模式详解及实例代码

    java 代理模式详解 前言: 在某些情况下,一个客户不想或者不能直接引用一个对象,此时可以通过一个称之为"代理"的第三者来实现间接引用.代理对象可以在客户端和目标对象之间起到 中介的作用,并且可以通过代理对象去掉客户不能看到 的内容和服务或者添加客户需要的额外服务. 简单来说代理模式就是通过一个代理对象去访问一个实际对象,并且可以像装饰模式一样给对象添加一些功能. 静态代理 所谓静态代理即在程序运行前代理类就已经存在,也就是说我们编写代码的时候就已经把代理类的代码写好了,而动态代理则

随机推荐