netty中pipeline的handler添加删除分析

目录
  • 添加
  • DefaultChannelPipeline.addLast(ChannelHandler... handlers)
    • checkMultiplicity(handler)重复添加验证
      • isSharable()
    • newCtx = newContext(group, filterName(name, handler), handler)
      • filterName(name, handler)
      • checkDuplicateName(name)
      • context0(name)
      • newContext(EventExecutorGroup group, String name, ChannelHandler handler)
      • new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)
      • isInbound(handler)
      • isOutbound(handler)
      • 在跟到其父类AbstractChannelHandlerContext的构造方法中
    • addLast0(newCtx)
    • callHandlerAdded0(newCtx)
      • ChannelHandlerAdapter.(ChannelHandlerContext ctx)
  • 删除

添加

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new MyServerHandler());
            }
        });
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();

分析pipeline.addLast(new MyServerHandler())中的addLast

首先通过channel拿到当前的pipline, 拿到pipeline之后再为其添加handler, 因为channel初始化默认创建的是DefualtChannelPipeline

DefaultChannelPipeline.addLast(ChannelHandler... handlers)

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }
    return this;
}

这里的handlers只有一个

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //判断handler是否被重复添加(1)
        checkMultiplicity(handler);
        //创建一个HandlerContext并添加到列表(2)
        newCtx = newContext(group, filterName(name, handler), handler);
        //添加HandlerContext(3)
        addLast0(newCtx);
        //是否已注册
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            //回调用户事件
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    //回调添加事件(4)
    callHandlerAdded0(newCtx);
    return this;
}

分为四个步骤:

  • 重复添加验证
  • 创建一个HandlerContext并添加到列表
  • 添加context
  • 回调添加事件

checkMultiplicity(handler)重复添加验证

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        //满足条件设置为true, 代表已添加
        h.added = true;
    }
}
  • 首先判断是不是ChannelHandlerAdapter类型, 因为我们自定义的handler通常会直接或者间接的继承该接口, 所以这里为true拿到handler之后转换成ChannelHandlerAdapter类型。
  • 然后进行条件判断 if (!h.isSharable() && h.added) 代表如果不是共享的handler, 并且是未添加状态, 则抛出异常。

isSharable()

public boolean isSharable() {
    Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        //如果这个类注解了Sharable.class, 说明这个类会被多个channel共享
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}
  • 首先拿到当前handlerclass对象。
  • 然后再从netty自定义的一个ThreadLocalMap对象中获取一个盛放handlerclass对象的map, 并获取其value
  • 如果value值为空, 则会判断是否被Sharable注解, 并将自身handlerclass对象和判断结果存入map对象中, 最后返回判断结果。
  • 这说明了被Sharable注解的handler是一个共享handler
  • 从这个逻辑我们可以判断, 共享对象是可以重复添加的。

回到DefaultChannelPipeline.addLast,如果是共享对象或者没有被添加, 则将ChannelHandlerAdapteradded设置为true, 代表已添加分析完了重复添加验证, 回到addLast方法中, 我们看第二步, 创建一个HandlerContext并添加到列表

newCtx = newContext(group, filterName(name, handler), handler)

newCtx = newContext(group, filterName(name, handler), handler)

首先看filterName(name, handler)方法, 这个方法是判断添加handler的name是否重复

filterName(name, handler)

首先看filterName(name, handler)方法, 这个方法是判断添加handlername是否重复

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        //没有名字创建默认名字
        return generateName(handler);
    }
    //检查名字是否重复
    checkDuplicateName(name);
    return name;
}

因为我们添加handler时候, 不一定会给handler命名, 所以这一步name有可能是null, 如果是null, 则创建一个默认的名字, 这里创建名字的方法就不分析了

checkDuplicateName(name)

private void checkDuplicateName(String name) {
    //不为空
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

继续跟进分析context0(name)方法

context0(name)

private AbstractChannelHandlerContext context0(String name) {
    //遍历pipeline
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        //发现name相同, 说明存在handler
        if (context.name().equals(name)) {
            //返回
            return context;
        }
        context = context.next;
    }
    return null;
}

这里的逻辑就是将pipeline中, 从head节点往下遍历HandlerContext, 一直遍历到tail, 如果发现名字相同则会认为重复并返回HandlerContext对象。

继续跟到newContext(group, filterName(name, handler), handler)方法中。

newContext(EventExecutorGroup group, String name, ChannelHandler handler)

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

可以看到创建了一个DefaultChannelHandlerContext对象, 构造方法的参数中, 第一个this代表当前的pipeline对象, groupnull, 所以childExecutor(group)也会返回null, namehandler的名字, handler为新添加的handler对象

new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}
  • 首先调用了父类的构造方法, 之后将handler赋值为自身handler的成员变量, HandlerConexthandler关系在此也展现了出来, 是一种组合关系
  • 父类的构造方法, 有这么两个参数:isInbound(handler), isOutbound(handler), 这两个参数意思是判断需要添加的handlerinboundHandler还是outBoundHandler

isInbound(handler)

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

这里通过是否实现ChannelInboundHandler接口来判断是否为inboundhandler

isOutbound(handler)

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

通过判断是否实现ChannelOutboundHandler接口判断是否为outboundhandler

在跟到其父类AbstractChannelHandlerContext的构造方法中

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

之前tail节点和head节点创建的时候也执行到了这里,初始化了name, pipeline, 以及标识添加的handlerinboundhanlder还是outboundhandler

回到DefaultChannelPipeline.addLast,分析完了创建HandlerContext的相关逻辑, 我们继续跟第三步, 添加HandlerContext

addLast0(newCtx)

private void addLast0(AbstractChannelHandlerContext newCtx) {
    //拿到tail节点的前置节点
    AbstractChannelHandlerContext prev = tail.prev;
    //当前节点的前置节点赋值为tail节点的前置节点
    newCtx.prev = prev;
    //当前节点的下一个节点赋值为tail节点
    newCtx.next = tail;
    //tail前置节点的下一个节点赋值为当前节点
    prev.next = newCtx;
    //tail节点的前一个节点赋值为当前节点
    tail.prev = newCtx;
}

做了一个指针的指向操作, 将新添加的handlerConext放在tail节点之前, 之前tail节点的上一个节点之后, 如果是第一次添加handler, 那么添加后的结构入下图所示

添加完handler之后, 这里会判断当前channel是否已经注册, 这部分逻辑之后再进行分析,先接着继续执行。

之后会判断当前线程线程是否为eventLoop线程, 如果不是eventLoop线程, 就将添加回调事件封装成task交给eventLoop线程执行, 否则, 直接执行添加回调事件callHandlerAdded0(newCtx)

callHandlerAdded0(newCtx)

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        /**
 		* 省略
 		* */
    }
}

分析ctx.handler().handlerAdded(ctx),其中ctx是我们新创建的HandlerContext, 通过handler()方法拿到绑定的handler, 也就是新添加的handler, 然后执行handlerAdded(ctx)方法, 如果我们没有重写这个方法, 则会执行父类的该方法。

ChannelHandlerAdapter.(ChannelHandlerContext ctx)

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // NOOP
}

没做任何操作, 也就是如果我们没有重写该方法时, 如果添加handler之后将不会做任何操作, 这里如果我们需要做一些业务逻辑, 可以通过重写该方法进行实现

删除

删除的逻辑和添加的逻辑相同,区别删除是将pipeline的双向链表的节点去掉。这里就不详细的分析。

以上就是netty中pipelinehandler添加删除分析的详细内容,更多关于netty pipeline handler的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式pipeline管道异常传播事件源码解析

    目录 传播异常事件 简单的异常处理的场景 我们跟到invokeChannelRead这个方法 我还是通过两种写法来进行剖析 跟进invokeExceptionCaught方法 跟到invokeExceptionCaught方法中 讲完了inbound事件和outbound事件的传输流程, 这一小节剖析异常事件的传输流程 传播异常事件 简单的异常处理的场景 @Override public void channelRead(ChannelHandlerContext ctx, Object msg

  • Netty分布式pipeline管道传播事件的逻辑总结分析

    目录 问题分析 首先完成了handler的添加, 但是并没有马上执行回调 回到callHandlerCallbackLater方法中 章节总结 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 问题分析 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAccep

  • Netty分布式pipeline管道Handler的添加代码跟踪解析

    目录 添加handler 我们跟到其addLast()方法中 再继续跟到addLast()方法中去 我们跟到checkMultiplicity(handler)中 跟到filterName方法中 跟到isInbound(handler)方法中 我们回到最初的addLast()方法中 我们跟进addLast0(newCtx)中 前文传送门:Netty分布式pipeline管道创建 添加handler 我们以用户代码为例进行剖析: .childHandler(new ChannelInitializ

  • Netty分布式pipeline管道Handler的删除逻辑操作

    目录 删除handler操作 我们跟到getContextPrDie这个方法中 首先要断言删除的节点不能是tail和head 回到remove(ctx)方法 上一小节我们学习了添加handler的逻辑操作, 这一小节我们学习删除handler的相关逻辑 删除handler操作 如果用户在业务逻辑中进行ctx.pipeline().remove(this)这样的写法, 或者ch.pipeline().remove(new SimpleHandler())这样的写法, 则就是对handler进行删除

  • Netty分布式pipeline管道传播outBound事件源码解析

    目录 outbound事件传输流程 这里我们同样给出两种写法 跟到其write方法中: 跟到findContextOutbound中 回到write方法: 继续跟invokeWrite0 我们跟到HeadContext的write方法中 了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难 outbound事件传输流程 在我们业务代码中, 有可能使用wirte方法往写数据: public void channelActive(ChannelHandlerC

  • netty中pipeline异常事件分析

    目录 异常处理的场景 AbstractChannelHandlerContext.invokeChannelRead AbstractChannelHandlerContext.notifyHandlerException(Throwable cause) AbstractChannelHandlerContext.invokeExceptionCaught(final Throwable cause) 两种写法 DefualtChannelPipeline.fireExceptionCaugh

  • netty服务端处理请求联合pipeline分析

    目录 两个问题 NioMessageUnsafe.read() ServerBootstrap.init(Channel channel) ChannelInitializer的继承关系 PendingHandlerAddedTask构造方法 PendingHandlerCallback构造方法 回到callHandlerCallbackLater方法 AbstractChannel.register0(ChannelPromise promise) pipeline.invokeHandler

  • netty pipeline中的inbound和outbound事件传播分析

    目录 传播inbound事件 两种写法 DefaultChannelPipeline.fireChannelRead(msg) AbstractChannelHandlerContext.invokeChannelRead(head, msg) AbstractChannelHandlerContext.invokeChannelRead(m) HeadContext的channelRead方法 AbstractChannelHandlerContext.fireChannelRead(msg)

  • netty中pipeline的handler添加删除分析

    目录 添加 DefaultChannelPipeline.addLast(ChannelHandler... handlers) checkMultiplicity(handler)重复添加验证 isSharable() newCtx = newContext(group, filterName(name, handler), handler) filterName(name, handler) checkDuplicateName(name) context0(name) newContext

  • Netty分布式pipeline传播inbound事件源码分析

    前一小结回顾:pipeline管道Handler删除 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chann

  • Android中AsyncTask与handler用法实例分析

    本文实例讲述了Android中AsyncTask与handler用法.分享给大家供大家参考,具体如下: 首先,我们得明确下一个概念,什么是UI线程.顾名思义,ui线程就是管理着用户界面的那个线程! android的ui线程操作并不是安全的,并且和用户直接进行界面交互的操作都必须在ui线程中进行才可以.这种模式叫做单线程模式. 我们在单线程模式下编程一定要注意:不要阻塞ui线程.确保只在ui线程中访问ui组件 当我们要执行一个复杂耗时的算法并且最终要将计算结果反映到ui上时,我们会发现,我们根本没

  • ASP.NET中为GridView添加删除提示框的方法

    本文实例讲述了ASP.NET中为GridView添加删除提示框的方法.分享给大家供大家参考.具体分析如下: 在GridView中我们可以直接添加一个CommandField删除列来删除某行信息.但为了避免误操作引起的误删除,在删除操作者让操作者再确认下,完后再进行删除. 首先我们给我们的GridView 添加一个模板列,如下: 以下是引用片段: <ASP:TemplateField HeaderText="Delete" ShowHeader="False"&

  • Android中使用TagFlowLayout制作动态添加删除标签

    效果图 简单的效果图(使用开源库)[FlowLayout](" https://github.com/hongyangAndroid/FlowLayout ") 步骤 导包 compile 'com.zhy:flowlayout-lib:1.0.3' <com.zhy.view.flowlayout.TagFlowLayout android:id="@+id/id_flowlayout" zhy:max_select="-1" andro

  • oracle中添加删除主键的方法

    1.创建表的同时创建主键约束 (1)无命名 create table student ( studentid int primary key not null, studentname varchar(8), age int); (2)有命名 create table students ( studentid int , studentname varchar(8), age int, constraint yy primary key(studentid)); 2.删除表中已有的主键约束 (1

  • Android中RecyclerView实现Item添加和删除的代码示例

    本文介绍了Android中RecyclerView实现Item添加和删除的代码示例,分享给大家,具体如下: 先上效果图: RecyclerView简介: RecyclerView用以下两种方式简化了数据的展示和处理: 1. 使用LayoutManager来确定每一个item的排列方式. 2. 为增加和删除项目提供默认的动画效果,也可以自定义. RecyclerView项目结构如下: Adapter:使用RecyclerView之前,你需要一个继承自RecyclerView.Adapter的适配器

随机推荐