Netty分布式获取异线程释放对象源码剖析

目录
  • 获取异线程释放对象
    • 在介绍之前我们首先看Stack类中的两个属性
    • 我们跟到pop方法中
    • 继续跟到scavengeSome方法中
    • 我们继续分析transfer方法
    • 接着我们我们关注一个细节
    • 我们跟到reclaimSpace方法
  • 章节小结

前文传送门:异线程下回收对象

获取异线程释放对象

上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收

如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前stack关联的WeakOrderQueue进行取出, 这也是这一小写要分析的, 获取异线程释放的对象

在介绍之前我们首先看Stack类中的两个属性

private WeakOrderQueue cursor, prev;
private volatile WeakOrderQueue head;

这里都是指向WeakOrderQueue的指针, 其中head我们上一小节分析过, 指向最近创建的和stack关联WeakOrderQueue, 也就是头结点

cursor代表的是寻找的当前WeakOrderQueue, pre则是cursor上一个节点, 如图所示:

8-7-1

我们从获取对象的入口方法, handle的get开始分析

public final T get() {
    if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    Stack<T> stack = threadLocal.get();
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}

这块逻辑我们并不陌上, stack对象通过pop弹出一个handle

我们跟到pop方法中

DefaultHandle<T> pop() {
    int size = this.size;
    if (size == 0) {
        if (!scavenge()) {
            return null;
        }
        size = this.size;
    }
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    this.size = size;
    return ret;
}

这里我们重点关注, 如果size为空, 也就是当前tack为空的情况下, 会走到scavenge方法, 这个方法, 就是从WeakOrderQueue获取对象的方法

跟进scavenge方法

boolean scavenge() {
    if (scavengeSome()) {
        return true;
    }
    prev = null;
    cursor = head;
    return false;
}

scavengeSome方法表示已经回收到了对象, 则直接返回, 如果没有回收到对象, 则将prev和cursor两个指针进行重置

继续跟到scavengeSome方法中

boolean scavengeSome() {
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        cursor = head;
        if (cursor == null) {
            return false;
        }
    }
    boolean success = false;
    WeakOrderQueue prev = this.prev;
    do {
        if (cursor.transfer(this)) {
            success = true;
            break;
        }
        WeakOrderQueue next = cursor.next;
        if (cursor.owner.get() == null) {
            if (cursor.hasFinalData()) {
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }
            if (prev != null) {
                prev.next = next;
            }
        } else {
            prev = cursor;
        }
        cursor = next;
    } while (cursor != null && !success);
    this.prev = prev;
    this.cursor = cursor;
    return success;
}

首先拿到cursor指针, cursor指针代表要回收的WeakOrderQueue

如果cursor为空, 则让其指向头节点, 如果头节点也空, 说明当前stack没有与其关联的WeakOrderQueue, 则返回false

通过一个布尔值success标记回收状态

然后拿到pre指针, 也就是cursor的上一个节点, 之后进入一个do-while循环

do-while循环的终止条件是, 如果没有遍历到最后一个节点并且回收的状态为false, 这里我们可以分析到再循环体里, 是不管遍历与stack关联的WeakOrderQueue, 直到弹出对象为止

跟到do-while循环中:

首先cursor指针会调用transfer方法, 该方法表示从当前指针指向的WeakOrderQueue中将元素放入到当前stack中, 如果取出成功则将success设置为true并跳出循环, transfer我们稍后分析, 我们继续往下看

如果没有获得元素, 则会通过next属性拿到下一个WeakOrderQueue, 然后会进入一个判断 if (cursor.owner.get() == null)

owner属性我们上一小节提到过, 就是与当前WeakOrderQueue关联的一个线程, get方法就是获得关联的线程对象, 如果这个对象为null说明该线程不存在, 则进入if块, 也就是一些清理的工作

if块中又进入一个判断 if (cursor.hasFinalData()) , 这里表示当前的WeakOrderQueue中是否还有数据, 如果有数据则通过for循环将数据通过transfer方法传输到当前stack中, 传输成功的, 将success标记为true

transfer方法是将WeakOrderQueue中一个link中的handle往stack进行传输, 有关link的相关内容, 我们上一小节也进行过分析

所以这里通过for循环将每个link的中的数据传输到stack中

继续往下看, 如果pre节点不为空, 则通过 prev.next = next 将cursor节点进行释放, 也就是pre的下一个节点指向cursor的下一个节点

继续往下看else块中的 prev = cursor

这里表示如果当前线程还在, 则将prev赋值为cursor, 代表prev后移一个节点

最后通过cursor = next将cursor后移一位, 然后再继续进行循环

循环结束之后, 将stack的prev和cursor属性进行保存

我们跟到transfer方法中, 分析如何将WeakOrderQueue中的handle传输到stack中:

boolean transfer(Stack<?> dst) {
    Link head = this.head;
    if (head == null) {
        return false;
    }
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        this.head = head = head.next;
    }
    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }
    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;
    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }
    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle element = srcElems[i];
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            srcElems[i] = null;
            if (dst.dropHandle(element)) {
                continue;
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
        }
        if (srcEnd == LINK_CAPACITY && head.next != null) {
            reclaimSpace(LINK_CAPACITY);
            this.head = head.next;
        }
        head.readIndex = srcEnd;
        if (dst.size == newDstSize) {
            return false;
        }
        dst.size = newDstSize;
        return true;
    } else {
        return false;
    }
}

8-7-2

我们上一小节分析过, WeakOrderQueue是由多个link组成, 每个link通过链表的方式进行关联, 其中head属性指向第一个link, tail属性指向最后一个link

在每个link中有多个handle

在link中维护了一个读指针readIndex, 标记着读取link中handle的位置

我们继续分析transfer方法

首先获取头结点, 并判断头结点是否为空, 如果头结点为空, 说明当前WeakOrderQueue并没有link, 返回false

if (head.readIndex == LINK_CAPACITY) 这里判断读指针是否为16, 因为link中元素最大数量就是16, 如果读指针为16, 说明当前link中的数据都被取走了

接着判断 head.next == null , 表示是否还有下一个link, 如果没有下一个link, 则说明当前WeakOrderQueue没有元素了, 则返回false

8-7-3

继续往下看, 拿到head节点的读指针和head中元素的数量, 接着计算可以传输元素的大小, 如果大小为0, 则返回false

8-7-4

接着, 拿到当前stack的大小, 当前stack大小加上可以传输的大小表示stack中所需要的容量

if (expectedCapacity > dst.elements.length) 表示如果需要的容量大于当前stack中所维护的数组的大小, 则将stack中维护的数组进行扩容, 进入if块中

扩容之后会返回actualCapacity, 表示扩容之后的大小

再看 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd) 这步

srcEnd表示可以从Link中取的最后一个元素的下标

srcStart + actualCapacity - dstSize 这里我们进行一个拆分, actualCapacity - dstSize表示扩容后大大小-原stack的大小, 也就是最多能往stack中传输多少元素

读指针+可以往stack传输的数量, 可以表示往stack中传输的最后一个下标, 这里的下标和srcEnd中取一个较小的值, 也就是既不能超过stack的容量, 也不能造成当前link中下标越界

继续往下看

int newDstSize = dstSize 表示初始化stack的下标, 表示stack中从这个下标开始添加数据

然后判断 srcStart != srcEnd , 表示能不能同link中获取内容, 如果不能, 则返回false, 如果可以, 则进入if块中

接着拿到当前link的数组elements和stack中的数组elements

然后通过for循环, 通过数组下标的方式不断的将当前link中的数据放入到stack中

for循环中首先拿到link的第i个元素

接着我们我们关注一个细节

if (element.recycleId == 0) {
    element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
    throw new IllegalStateException("recycled already");
}

这里 element.recycleId == 0 表示对象没有被回收过, 如果没有被回收过, 则赋值为lastRecycledId, 我们前面分析过lastRecycledId是WeakOrderQueue中的唯一下标, 通过赋值标记element被回收过

然后继续判断 element.recycleId != element.lastRecycledId , 这表示该对象被回收过, 但是回收的recycleId却不是最后一次回收lastRecycledId, 这是一种异常情况, 表示一个对象在不同的地方被回收过两次, 这种情况则抛出异常

接着将link的第i个元素设置为null

继续往下看:

if (dst.dropHandle(element)) {
    continue;
}

这里表示控制回收站回收的频率, 之前的小节我们分析过, 这里不再赘述

element.stack = dst 表示将handle的stack属性设置到当前stack

dstElems[newDstSize ++] = element 这里通过数组的下标的方式将link中的handle赋值到stack的数组中

继续往下看:

if (srcEnd == LINK_CAPACITY && head.next != null) {
    reclaimSpace(LINK_CAPACITY);
    this.head = head.next;
}

这里的if表循环结束后, 如果link中的数据已经回收完毕, 并且还有下一个节点则会进到reclaimSpace方法

我们跟到reclaimSpace方法

private void reclaimSpace(int space) {
    assert space >= 0;
    availableSharedCapacity.addAndGet(space);
}

这里将availableSharedCapacity加上16, 表示WeakOrderQueue还可以继续插入link

继续看transfer方法

this.head = head.next 表示将head节点后移一个元素

head.readIndex = srcEnd 表示将读指针指向srcEnd, 下一次读取可以从srcEnd开始

if (dst.size == newDstSize) 表示没有向stack传输任何对象, 则返回false

否则就通过 dst.size = newDstSize 更新stack的大小为newDstSize, 并返回true

以上就是从link中往stack中传输数据的过程

章节小结

这一章主要讲解了两个性能优化工具了FastThreadLocal和Recycler

FastThreadLocal和jdk的ThreadLocal功能类似, 只是性能更快, 通过FastTreadLocalThread中的threadLocalMap对象, 通过数组下标的方式进行保存和获取对象

Recycler是一个轻量级的对象回收站, 用于对象重用, 避免了对象的频繁创建和减轻gc的压力

Recycler同线程回收对象是通过一个线程共享的stack实现的, 将对象包装成handle并存入stack中

Reclyer异线程回收对象是将handle存入一个与stack关联的WeakOrderQueue中, 同一个stack中关联的不同WeakOrderQueue由不同的线程创建

从Recycler获取对象时stack中有值, 则可以直接从stack中获取

如果stack中没有值则通过stack关联的WeakOrderQueue中获取

以上就是Netty分布式获取异线程释放对象源码剖析的详细内容,更多关于Netty分布式获取异线程释放对象的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式Future与Promise执行回调相关逻辑剖析

    目录 Future和Promise执行回调 首先我们看一段写在handler中的业务代码 这里关注newPromise()方法, 跟进去 我们继续跟write方法 跟进tryFailure方法 跟到addMessage方法中 最后跟到AbstractUnsafe的flush方法 我们跟到remove()方法中 再跟到trySuccess方法中 我们看用户代码 跟到addListener0方法中 回到addListener0方法中 跟到isDone方法中 跟到notifyListeners()方法

  • Netty分布式FastThreadLocal的set方法实现逻辑剖析

    目录 FastThreadLocal的set方法实现 线程set对象 我们跟到setIndexedVariable中 我们跟进removeIndexedVariable方法 上一小节我们学习了FastThreadLocal的创建和get方法的实现逻辑, 这一小节学习FastThreadLocal的set方法的实现逻辑 FastThreadLocal的set方法实现 set方法, 其实就是修改线程共享对象, 作用域只是当前线程, 我们回顾根据上一小节demo中, 其中一个线程set对象的过程: 线

  • Netty分布式高性能工具类recycler的使用及创建

    目录 recycler的使用 这里看一个示例 在Recycler的类的源码中, 我们看到这一段逻辑 跟到Stack的构造方法中 继续跟重载的构造方法 我们再回到Stack的构造方法中 前文传送门:Netty分布式FastThreadLocal的set方法实现逻辑剖析 recycler的使用 这一小节开始学习recycler相关的知识, recycler是netty实现的一个轻量级对象回收站, 在netty中, recycler的使用也是相当之频繁的 recycler作用是保证了对象的循环利用, 

  • Netty分布式从recycler对象回收站获取对象过程剖析

    前文传送门:Netty分布式高性能工具类recycler的使用及创建 从对象回收站中获取对象 我们回顾上一小节demo的main方法中 从回收站获取对象 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = RECYCLER.get(); user2.recycle(); System.out.println(user1==user2); } 这个通过R

  • Netty分布式高性能工具类FastThreadLocal和Recycler分析

    目录 概述 第一节:FastThreadLocal的使用和创建 首先我们看一个最简单的demo 跟到nextVariableIndex方法中 我们首先剖析slowGet()方法 我们跟进fastGet 回到FastThreadLocal的get方法中 在我们的demo中对应这个方法 前文传送门:Netty分布式Future与Promise执行回调相关逻辑剖析 概述 FastThreadLocal我们在剖析堆外内存分配的时候简单介绍过, 它类似于JDK的ThreadLocal, 也是用于在多线程条

  • Netty分布式高性能工具类同线程下回收对象解析

    目录 同线程回收对象 回顾第三小节的demo中的main方法 我们跟进recycle方法 然后获取当前size 同线程回收对象 上一小节剖析了从recycler中获取一个对象, 这一小节分析在创建和回收是同线程的前提下, recycler是如何进行回收的 回顾第三小节的demo中的main方法 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = REC

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • Netty分布式客户端处理接入事件handle源码解析

    目录 处理接入事件创建handle 我们看其RecvByteBufAllocator接口 跟进newHandle()方法中 继续回到read()方法 我们跟进reset中 前文传送门 :客户端接入流程初始化源码分析 上一小节我们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端连接事件的处理 处理接入事件创建handle 回到上一章NioEventLoop的processSelectedKey ()方法 private void processS

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Netty启动流程服务端channel初始化源码分析

    目录 服务端channel初始化 回顾上一小节initAndRegister()方法 init(Channel)方法 前文传送门 Netty分布式server启动流程 服务端channel初始化 回顾上一小节initAndRegister()方法 final ChannelFuture initAndRegister() { Channel channel = null; try { //创建channel channel = channelFactory.newChannel(); //初始化

  • ZooKeeper入门教程三分布式锁实现及完整运行源码

    目录 1.0版本 2.0版本 LockSample类 构造方法 获取锁实现 createLock() attemptLock() 释放锁实现 TicketSeller类 sell() sellTicketWithLock() 测试入口 测试方法 代码清单如下: 1.LockSample 2.TicketSeller ZooKeeper入门教程一简介与核心概念 ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用 1.0版本 首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

  • 游戏服务器中的Netty应用以及源码剖析

    目录 一.Reactor模式和Netty线程模型 1. BIO模型 2. NIO模型 3. Reacor模型 ①. 单Reacor单线程模型 ②. 单Reactor多线程模型 ③. 主从Reactor多线程模型 ④. 部分源码分析 二.select/poll和epoll 1.概念 2.jdk提供selector 3.Netty提供的Epoll封装 4.Netty相关类图 5.配置Netty为EpollEventLoop 三.Netty相关参数 1.SO_KEEPALIVE 2.SO_REUSEA

  • Java线程变量ThreadLocal源码分析

    1.ThreadLocal 线程变量,和当前线程绑定的,只保存当前线程的变量,对于其他线程是隔离的,是访问不到里面的数据的. 2.在Looper中使用到了ThreadLocal,创建了一个Looper是保存到了ThreadLocal中. //这里用到了泛型,ThreadLocal中只保存Looper对象. static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>(); private static

  • Java线程池ThreadPoolExecutor源码深入分析

    1.线程池Executors的简单使用 1)创建一个线程的线程池. Executors.newSingleThreadExecutor(); //创建的源码 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new Linke

  • C#线程倒计时器源码分享

    本文实例为大家分享了C#线程倒计时器源码,供大家参考,具体内容如下 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Windows.Forms; namespace ListZZBG { class TimeHeleper { Thread thread; private TimeSpan time;

随机推荐