Netty分布式Future与Promise执行回调相关逻辑剖析

目录
  • Future和Promise执行回调
    • 首先我们看一段写在handler中的业务代码
    • 这里关注newPromise()方法, 跟进去
    • 我们继续跟write方法
    • 跟进tryFailure方法
    • 跟到addMessage方法中
    • 最后跟到AbstractUnsafe的flush方法
    • 我们跟到remove()方法中
    • 再跟到trySuccess方法中
    • 我们看用户代码
    • 跟到addListener0方法中
    • 回到addListener0方法中
    • 跟到isDone方法中
    • 跟到notifyListeners()方法中
  • 章节小结

Future和Promise执行回调

Netty中的Future, 其实类似于jdk的Future, 用于异步获取执行结果

Promise则相当于一个被观察者, 其中promise对象会一直跟随着channel的读写事件, 并跟踪着事件状态, 然后执行相应的回调

这种设计思路也就是java设计模式的观察者模式

首先我们看一段写在handler中的业务代码

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ChannelFuture future = ctx.writeAndFlush("test data");
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()){
                System.out.println("写出成功");
            }else{
                System.out.println("写出失败");
            }
        }
    });
}

熟悉netty的小伙伴估计对这段代码并不陌生, 首先调用writeAndFlush方法将数据写出, 然后返回的future进行添加Listener, 并且重写回调函数

这里举一个最简单的示例, 在回调函数中判断future的状态成功与否, 成功的话就打印"写出成功", 否则节打印"写出失败"

这里如果写在handler中通常是NioEventLoop线程执行的, 在future返回之后才会执行添加listener的操作, 如果在用户线程中writeAndFlush是异步执行的, 在添加监听的时候有可能写出操作没有执行完毕, 等写出操作执行完毕之后才会执行回调

以上逻辑在代码中如何体现的呢?我们首先跟到writeAndFlush的方法中去

这里会走到AbstractChannelHandlerContext中的writeAndFlush方法中:

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

这里的逻辑之前剖析过, 想必大家并不陌生

这里关注newPromise()方法, 跟进去

public ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel(), executor());
}

这里直接创建了DefaultChannelPromise这个对象并传入了当前channel和当前channel绑定NioEventLoop对象

在DefaultChannelPromise构造方法中, 也会将channel和NioEventLoop对象绑定在自身成员变量中

回到writeAndFlush方法继续跟

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (!validatePromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        return promise;
    }
    write(msg, true, promise);
    return promise;
}

这里的逻辑也不陌生, 注意这里最后返回了promise, 其实就是我们上一步创建DefaultChannelPromise对象

DefaultChannelPromise实现了ChannelFuture接口, 所以方法如果返回该对象可以被ChannelFuture类型接收

我们继续跟write方法

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

这里的逻辑我们同样不陌生, 如果nioEventLoop线程, 我们继续调invokeWriteAndFlush方法, 如果不是nioEventLoop线程则将writeAndFlush事件封装成task, 交给eventLoop线程异步

这里如果是异步执行, 则到这一步之后, 我们的业务代码中, writeAndFlush就会返回并添加监听, 有关添加监听的逻辑稍后分析

走到这里, 无论同步异步, 都会执行到invokeWriteAndFlush方法:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

这里也是我们熟悉的逻辑, 我们看到在invokeWrite0方法中传入了我们刚才创建的DefaultChannelPromise

后续逻辑想必大家都比较熟悉, 通过事件传播, 最终会调用head节点的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里最终调用unsafe的write方法, 并传入了promise对象

跟到AbstractUnsafe的write方法中:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    //负责缓冲写进来的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //插入写队列
    outboundBuffer.addMessage(msg, size, promise);
}

这里的逻辑之前小节也剖析过, 这里我们首先关注两个部分, 首先看在catch中safeSetFailure这步

因为是catch块, 说明发生了异常, 写到缓冲区不成功, safeSetFailure就是设置写出失败的状态

我们跟到safeSetFailure方法中:

protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
    if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
        logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
    }
}

这里看if判断, 首先我们的promise是DefaultChannelPromise, 所以!(promise instanceof VoidChannelPromise)为true

重点分析promise.tryFailure(cause), 这里是设置失败状态, 这里会调用DefaultPromise的tryFailure方法

跟进tryFailure方法

public boolean tryFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return true;
    }
    return false;
}

再跟到setFailure0(cause)中:

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

这里在if块中的cas操作, 会将参数objResult的值设置到DefaultPromise的成员变量result中, 表示当前操作为异常状态

回到tryFailure方法:

这里关注notifyListeners()这个方法, 这个方法是执行添加监听的回调函数, 当writeAndFlush和addListener是异步执行的时候, 这里有可能添加已经添加, 所以通过这个方法可以调用添加监听后的回调

如果writeAndFlush和addListener是同步执行的时候, 也就是都在NioEventLoop线程中执行的时候, 那么走到这里addListener还没执行, 所以这里不能回调添加监听的回调函数, 那么回调是什么时候执行的呢?我们在剖析addListener步骤的时候会给大家分析

具体执行回调我们再讲解添加监听的时候进行剖析

以上就是记录异常状态的大概逻辑

回到AbstractUnsafe的write方法:

我们再关注这一步:

outboundBuffer.addMessage(msg, size, promise);

跟到addMessage方法中

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    //代码省略
}

我们只需要关注包装Entry的newInstance方法, 该方法传入promise对象

跟到newInstance中:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    Entry entry = RECYCLER.get();
    entry.msg = msg;
    entry.pendingSize = size;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

这里将promise设置到Entry的成员变量中了, 也就是说, 每个Entry都关联了唯一的一个promise

我们回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

我们刚才分析了write操作中promise的传递以及状态设置的大概过程, 我们继续看在flush中promise的操作过程

这里invokeFlush0()并没有传入promise对象, 是因为我们刚才分析过, promise对象会绑定在缓冲区中entry的成员变量中, 可以通过其成员变量拿到promise对象

invokeFlush0()我们之前也分析过, 通过事件传递, 最终会调用HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

最后跟到AbstractUnsafe的flush方法

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

这块逻辑之前已分析过, 继续看flush0方法:

protected void flush0() {
    //代码省略
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        //代码省略
    } finally {
        inFlush0 = false;
    }
}

篇幅原因我们省略大段代码

我们继续跟进doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;
    boolean setOpWrite = false;
    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            clearOpWrite();
            return;
        }
        if (msg instanceof ByteBuf) {
            //代码省略
            boolean done = false;
            //代码省略
            if (done) {
                //移除当前对象
                in.remove();
            } else {
                break;
            }
        } else if (msg instanceof FileRegion) {
            //代码省略
        } else {
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

这里也省略了大段代码, 我们重点关注in.remove()这里, 之前介绍过, 如果done为true, 说明刷新事件已完成, 则移除当前entry节点

我们跟到remove()方法中

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    removeEntry(e);
    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    e.recycle();
    return true;
}

这里我们看这一步:

ChannelPromise promise = e.promise;

之前我们剖析promise对象会绑定在entry中, 而这步就是从entry中获取promise对象

等remove操作完成, 会执行到这一步:

safeSuccess(promise);

这一步正好和我们刚才分析的safeSetFailure相反, 这里是设置成功状态

跟到safeSuccess方法中:

private static void safeSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise)) {
        PromiseNotificationUtil.trySuccess(promise, null, logger);
    }
}

再跟到trySuccess方法中

public static &lt;V&gt; void trySuccess(Promise&lt;? super V&gt; p, V result, InternalLogger logger) {
    if (!p.trySuccess(result) &amp;&amp; logger != null) {
        //代码省略
    }
}

这里再继续跟if中的trySuccess方法, 最后会走到DefaultPromise的trySuccess方法:

public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

这里跟到setSuccess0方法中:

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

这里的逻辑我们刚才剖析过了, 这里参数传入一个信号SUCCESS, 表示设置成功状

再继续跟setValue方法:

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

同样, 在if判断中, 通过cas操作将参数传入的SUCCESS对象赋值到DefaultPromise的属性result中, 我们看这个属性:

private volatile Object result;

这里是Object类型, 也就是可以赋值成任何类型

SUCCESS是一个Signal类型的对象, 这里我们可以简单理解成一种状态, SUCCESS表示一种成功的状态

通过上述cas操作, result的值将赋值成SUCCESS

我们回到trySuccess方法:

public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

设置完成功状态之后, 则会通过notifyListeners()执行监听中的回调

我们看用户代码

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ChannelFuture future = ctx.writeAndFlush("test data");
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()){
                System.out.println("写出成功");
            }else{
                System.out.println("写出失败");
            }
        }
    });
}

在回调中会判断future.isSuccess(), promise设置为成功状态这里会返回true, 从而打印写出成功"

跟到isSuccess方法中, 这里会调用DefaultPromise的isSuccess方法:

public boolean isSuccess() {
    Object result = this.result;
    return result != null &amp;&amp; result != UNCANCELLABLE &amp;&amp; !(result instanceof CauseHolder);
}

我们看到首先会拿到result对象, 然后判断result不为空, 并且不是UNCANCELLABLE, 并且不属于CauseHolder对象

我们刚才分析如果promise设置为成功装载, 则result为SUCCESS, 所以这里条件成立, 可以执行 if (future.isSuccess()) 中if块的逻辑

和设置错误状态的逻辑一样, 这里也有同样的问题, 如果writeAndFlush是和addListener是异步操作, 那么执行到回调的时候, 可能addListener已经添加完成, 所以可以正常的执行回调

那么如果writeAndFlush是和addListener是同步操作, writeAndFlush在执行回调的时候, addListener并没有执行, 所以无法执行回调方法, 那么回调方法是如何执行的呢, 我们看addListener这个方法:

addListener传入ChannelFutureListener对象, 并重写了operationComplete方法, 也就是执行回调的方法

这里会执行到DefaultChannelPromise的addListener方法, 跟进去

public ChannelPromise addListener(GenericFutureListener&lt;? extends Future&lt;? super Void&gt;&gt; listener) {
    super.addListener(listener);
    return this;
}

跟到父类的addListener中:

public Promise&lt;V&gt; addListener(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

这里通过addListener0方法添加listener, 因为添加listener有可能会在不同的线程中操作, 比如用户线程和NioEventLoop线程, 为了防止并发问题, 这里简单粗暴的加了个synchronized关键字

跟到addListener0方法中

private void addListener0(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners, listener);
    }
}

如果是第一次添加listener, 则成员变量listeners为null, 这样就把参数传入的GenericFutureListener赋值到成员变量listeners

如果是第二次添加listener, listeners不为空, 会走到else if判断, 因为第一次添加的listener是GenericFutureListener类型, 并不是DefaultFutureListeners类型, 所以else if判断返回false, 进入到else块中

else块中, 通过new的方式创建一个DefaultFutureListeners对象并赋值到成员变量listeners中

DefaultFutureListeners的构造方法中, 第一个参数传入DefaultPromise中的成员变量listeners, 也就是第一次添加的GenericFutureListener对象, 第二个参数为第二次添加的GenericFutureListener对象, 这里通过两个GenericFutureListener对象包装成一个DefaultFutureListeners对象

我们看listeners的定义:

private Object listeners;

这里是个Object类型, 所以可以保存任何类型的对象

再看DefaultFutureListeners的构造方法:

DefaultFutureListeners(
        GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; first, GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; second) {
    listeners = new GenericFutureListener[2];
    //第0个
    listeners[0] = first;
    //第1个
    listeners[1] = second;
    size = 2;
    //代码省略
}

在DefaultFutureListeners类中也定义了一个成员变量listeners, 类型为GenericFutureListener数组

构造方法中初始化listeners这个数组, 并且数组中第一个值赋值为我们第一次添加的GenericFutureListener, 第二个赋值为我们第二次添加的GenericFutureListener

回到addListener0方法中

private void addListener0(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners, listener);
    }
}

经过两次添加listener, 属性listeners的值就变成了DefaultFutureListeners类型的对象, 如果第三次添加listener, 则会走到else if块中, DefaultFutureListeners对象通过调用add方法继续添加listener

跟到add方法中:

public void add(GenericFutureListener&lt;? extends Future&lt;?&gt;&gt; l) {
    GenericFutureListener&lt;? extends Future&lt;?&gt;&gt;[] listeners = this.listeners;
    final int size = this.size;
    if (size == listeners.length) {
        this.listeners = listeners = Arrays.copyOf(listeners, size &lt;&lt; 1);
    }
    listeners[size] = l;
    this.size = size + 1;
    //代码省略
}

这里的逻辑也比较简单, 就是为当前的数组对象listeners中追加新的GenericFutureListener对象, 如果listeners容量不足则进行扩容操作

根据以上逻辑, 就完成了listener的添加逻辑

那么再看我们刚才遗留的问题, 如果writeAndFlush和addListener是同步进行的, writeAndFlush执行回调时还没有addListener还没有执行回调, 那么回调是如何执行的呢?

回到DefaultPromise的addListener中:

public Promise&lt;V&gt; addListener(GenericFutureListener&lt;? extends Future&lt;? super V&gt;&gt; listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

我们分析完了addListener0方法, 再往下看

这个会有if判断isDone(), isDone方法, 就是程序执行到这一步的时候, 判断刷新事件是否执行完成

跟到isDone方法中

public boolean isDone() {
    return isDone0(result);
}

继续跟isDone0, 这里传入了成员变量result

private static boolean isDone0(Object result) {
    return result != null &amp;&amp; result != UNCANCELLABLE;
}

这里判断result不为null并且不为UNCANCELLABLE, 则就表示完成

因为成功的状态是SUCCESS, 所以flush成功这里会返回true

回到 addListener中:

如果执行完成, 就通过notifyListeners()方法执行回调, 这也解释刚才的问题, 在同步操作中, writeAndFlush在执行回调时并没有添加listener, 所以添加listener的时候会判断writeAndFlush的执行状态, 如果状态时完成, 则会这里执行回调

同样, 在异步操作中, 走到这里writeAndFlush可能还没完成, 所以这里不会执行回调, 由writeAndFlush执行回调

所以, 无论writeAndFlush和addListener谁先完成, 都可以执行到回调方法

跟到notifyListeners()方法中

private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

这里首先判断是否是eventLoop线程, 如果是eventLoop线程则执行if块中的逻辑, 如果不是eventLoop线程, 则把执行回调的逻辑封装成task丢到EventLoop的任务队列中异步执行

我们重点关注notifyListenersNow()方法, 跟进去:

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener&lt;? extends Future&lt;V&gt;&gt;) listeners);
        }
        //代码省略
    }
}

在无限for循环中, 首先首先判断listeners是不是DefaultFutureListeners类型, 根据我们之前的逻辑, 如果只添加了一个listener, 则listeners是GenericFutureListener类型

通常在添加的时候只会添加一个listener, 所以我们跟到else块中的notifyListener0方法:

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

我们看到, 这里执行了GenericFutureListener的中我们重写的回调函数operationComplete

以上就是执行回调的相关逻辑

章节小结

这一章讲解了有关write和flush的相关逻辑, 并分析了有关添加监听和异步写数据的相关步骤

经过学习, 同学们应该掌握如下知识:

write操作是如何将ByteBuf添加到发送缓冲区的

flush操作是如何将ByteBuf写出到chanel中的

抽象编码器MessageToByteEncoder中如何定义了编码器的骨架逻辑

writeAndFlush和addListener在同步和异步操作中是如何执行回调的

更多关于Netty分布式Future和Promise执行回调的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式编码器写buffer队列逻辑剖析

    目录 写buffer队列 我们跟到AbstractUnsafe的write方法中 回到write方法中 我们跟到setUnwritable(invokeLater)方法中 前文传送门:抽象编码器MessageToByteEncoder 写buffer队列 之前的小节我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandl

  • Netty分布式抽象编码器MessageToByteEncoder逻辑分析

    目录 MessageToByteEncoder 首先看MessageToByteEncoder的类声明 跟到allocateBuffer方法中 前文回顾:Netty分布式编码器及写数据事件处理 MessageToByteEncoder 同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现 解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递wr

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Netty分布式Future与Promise执行回调相关逻辑剖析

    目录 Future和Promise执行回调 首先我们看一段写在handler中的业务代码 这里关注newPromise()方法, 跟进去 我们继续跟write方法 跟进tryFailure方法 跟到addMessage方法中 最后跟到AbstractUnsafe的flush方法 我们跟到remove()方法中 再跟到trySuccess方法中 我们看用户代码 跟到addListener0方法中 回到addListener0方法中 跟到isDone方法中 跟到notifyListeners()方法

  • Netty分布式解码器读取数据不完整的逻辑剖析

    目录 概述 第一节: ByteToMessageDecoder 我们看他的定义 我们看其channelRead方法 我们看cumulator属性 我们回到channRead方法中 概述 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • Netty分布式从recycler对象回收站获取对象过程剖析

    前文传送门:Netty分布式高性能工具类recycler的使用及创建 从对象回收站中获取对象 我们回顾上一小节demo的main方法中 从回收站获取对象 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = RECYCLER.get(); user2.recycle(); System.out.println(user1==user2); } 这个通过R

  • Netty分布式高性能工具类FastThreadLocal和Recycler分析

    目录 概述 第一节:FastThreadLocal的使用和创建 首先我们看一个最简单的demo 跟到nextVariableIndex方法中 我们首先剖析slowGet()方法 我们跟进fastGet 回到FastThreadLocal的get方法中 在我们的demo中对应这个方法 前文传送门:Netty分布式Future与Promise执行回调相关逻辑剖析 概述 FastThreadLocal我们在剖析堆外内存分配的时候简单介绍过, 它类似于JDK的ThreadLocal, 也是用于在多线程条

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

  • Netty分布式pipeline管道传播事件的逻辑总结分析

    目录 问题分析 首先完成了handler的添加, 但是并没有马上执行回调 回到callHandlerCallbackLater方法中 章节总结 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 问题分析 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAccep

  • Netty分布式pipeline管道Handler的删除逻辑操作

    目录 删除handler操作 我们跟到getContextPrDie这个方法中 首先要断言删除的节点不能是tail和head 回到remove(ctx)方法 上一小节我们学习了添加handler的逻辑操作, 这一小节我们学习删除handler的相关逻辑 删除handler操作 如果用户在业务逻辑中进行ctx.pipeline().remove(this)这样的写法, 或者ch.pipeline().remove(new SimpleHandler())这样的写法, 则就是对handler进行删除

  • Netty分布式pipeline管道Handler的添加代码跟踪解析

    目录 添加handler 我们跟到其addLast()方法中 再继续跟到addLast()方法中去 我们跟到checkMultiplicity(handler)中 跟到filterName方法中 跟到isInbound(handler)方法中 我们回到最初的addLast()方法中 我们跟进addLast0(newCtx)中 前文传送门:Netty分布式pipeline管道创建 添加handler 我们以用户代码为例进行剖析: .childHandler(new ChannelInitializ

随机推荐