Netty分布式高性能工具类FastThreadLocal和Recycler分析

目录
  • 概述
  • 第一节:FastThreadLocal的使用和创建
    • 首先我们看一个最简单的demo
    • 跟到nextVariableIndex方法中
    • 我们首先剖析slowGet()方法
    • 我们跟进fastGet
    • 回到FastThreadLocal的get方法中
    • 在我们的demo中对应这个方法

前文传送门:Netty分布式Future与Promise执行回调相关逻辑剖析

概述

FastThreadLocal我们在剖析堆外内存分配的时候简单介绍过, 它类似于JDK的ThreadLocal, 也是用于在多线程条件下, 保证统一线程的对象共享, 只是netty中定义的FastThreadLocal, 性能要高于jdk的ThreadLocal, 具体原因会在之后的小节进行剖析

Recyler我们应该也不会太陌生, 因为在之前章节中, 有好多地方使用了Recyler

Recyler是netty实现的一个轻量级对象回收站, 很多对象在使用完毕之后, 并没有直接交给gc去处理, 而是通过对象回收站将对象回收, 目的是为了对象重用和减少gc压力

比如ByteBuf对象的回收, 因为ByteBuf对象在netty中会频繁创建, 并且会占用比较大的内存空间, 所以使用完毕后会通过对象回收站的方式进行回收, 已达到资源重用的目的

这一章就对FastThreadLocal和Recyler两个并发工具类进行分析

第一节:FastThreadLocal的使用和创建

首先我们看一个最简单的demo

public class FastThreadLocalDemo {
    final class FastThreadLocalTest extends FastThreadLocal<Object>{
        @Override
        protected Object initialValue() throws Exception {
            return new Object();
        }
    }
    private final FastThreadLocalTest fastThreadLocalTest;
    public FastThreadLocalDemo(){
        fastThreadLocalTest = new FastThreadLocalTest();
    }
    public static void main(String[] args){
        FastThreadLocalDemo fastThreadLocalDemo = new FastThreadLocalDemo();
        new Thread(new Runnable() {
            @Override
            public void run() {
                Object obj  = fastThreadLocalDemo.fastThreadLocalTest.get();
                try {
                    for (int i=0;i<10;i++){
                        fastThreadLocalDemo.fastThreadLocalTest.set(new Object());
                        Thread.sleep(1000);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Object obj  = fastThreadLocalDemo.fastThreadLocalTest.get();
                    for (int i=0;i<10;i++){
                        System.out.println(obj == fastThreadLocalDemo.fastThreadLocalTest.get());
                        Thread.sleep(1000);
                    }
                }catch (Exception e){
                }
            }
        }).start();
    }
}

这里首先声明一个内部类FastThreadLocalTest继承FastThreadLocal, 并重写initialValue方法, initialValue方法就是用来初始化线程共享对象的

然后声明一个成员变量fastThreadLocalTest, 类型就是内部类FastThreadLocalTest

在构造方法中初始化fastThreadLocalTest

main方法中创建当前类FastThreadLocalDemo的对象fastThreadLocalDemo

然后启动两个线程, 每个线程通过fastThreadLocalDemo.fastThreadLocalTest.get()的方式拿到线程共享对象, 因为fastThreadLocalDemo是相同的, 所以fastThreadLocalTest对象也是同一个, 同一个对象在不同线程中进行get()

第一个线程循环通过set方法修改共享对象的值

第二个线程则循环判断并输出fastThreadLocalTest.get()出来的对象和第一次get出来的对象是否相等

这里输出结果都true, 说明其他线程虽然不断修改共享对象的值, 但都不影响当前线程共享对象的值

这样就实现了线程共享的对象的功能

根据上述示例, 我们剖析FastThreadLocal的创建

首先跟到FastThreadLocal的构造方法中:

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}

这里的index, 代表FastThreadLocal对象的一个下标, 每创建一个FastThreadLocal, 都会有一个唯一的自增的下标

跟到nextVariableIndex方法中

public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    if (index &lt; 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}

这里只是获取nextIndex通过getAndIncrement()进行原子自增, 创建第一个FastThreadLocal对象时, nextIndex为0, 创建第二个FastThreadLocal对象时nextIndex为1, 以此类推, 第n次nextIndex为n-1, 如图所示

8-1-1

我们回到demo中, 我们看线程中的这一句:

Object obj = fastThreadLocalDemo.fastThreadLocalTest.get();

这里调用了FastThreadLocal对象的get方法, 作用是创建一个线程共享对象

我们跟到get方法中:

public final V get() {
    return get(InternalThreadLocalMap.get());
}

这里调用了一个重载的get方法, 参数中通过InternalThreadLocalMap的get方法获取了一个InternalThreadLocalMap对象

我们跟到InternalThreadLocalMap的get方法中, 分析其实如何获取InternalThreadLocalMap对象的

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

这里首先拿到当前线程, 然后判断当前线程是否为FastThreadLocalThread线程, 通常NioEventLoop线程都是FastThreadLocalThread, 用于线程则不是FastThreadLocalThread

在这里, 如果FastThreadLocalThread线程, 则调用fastGet方法获取InternalThreadLocalMap, 从名字上我们能知道, 这是一种效率极高的获取方式

如果不是FastThreadLocalThread线程, 则调用slowGet方式获取InternalThreadLocalMap, 同样根据名字, 我们知道这是一种效率不太高的获取方式

我们的demo并不是eventLoop线程, 所以这里会走到slowGet()方法中

我们首先剖析slowGet()方法

private static InternalThreadLocalMap slowGet() {
    ThreadLocal&lt;InternalThreadLocalMap&gt; slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

首先通过UnpaddedInternalThreadLocalMap.slowThreadLocalMap拿到一个ThreadLocal对象slowThreadLocalMap, slowThreadLocalMap是UnpaddedInternalThreadLocalMap类的一个静态属性, 类型是ThreadLocal类型

这里的ThreadLocal是jdk的ThreadLocal

然后通过slowThreadLocalMap对象的get方法, 获取一个InternalThreadLocalMap

如果第一次获取, InternalThreadLocalMap有可能是null, 所以在if块中, new了一个InternalThreadLocalMap对象, 并设置在ThreadLocal对象中

因为netty实现的FastThreadLocal要比jdk的ThreadLocal要快, 所以这里的方法叫slowGet

回到InternalThreadLocalMap的get方法:

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

我们继续剖析fastGet方法, 通常EventLoop线程FastThreadLocalThread线程, 所以EventLoop线程执行到这一步的时候会调用fastGet方法

我们跟进fastGet

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

首先FastThreadLocalThread对象直接通过threadLocalMap拿到threadLocalMap对象

如果threadLocalMap为null, 则创建一个InternalThreadLocalMap对象设置到FastThreadLocalThread的成员变量中

这里我们可以知道FastThreadLocalThread对象中维护了一个InternalThreadLocalMap类型的成员变量, 可以直接通过threadLocalMap()方法获取该变量的值, 也就是InternalThreadLocalMap

我们跟到InternalThreadLocalMap的构造方法中:

private InternalThreadLocalMap() {
    super(newIndexedVariableTable());
}

这里调用了父类的构造方法, 传入一个newIndexedVariableTable()

我们跟到newIndexedVariableTable()中:

private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
}

这里创建一个长度为32的数组, 并为数组中的每一个对象设置为UNSET, UNSET是一个Object的对象, 表示该下标的值没有被设置

回到InternalThreadLocalMap的构造方法, 再看其父类的构造方法:

UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
    this.indexedVariables = indexedVariables;
}

这里初始化了一个数组类型的成员变量indexedVariables, 就是newIndexedVariableTable返回object的数组

这里我们可以知道, 每个InternalThreadLocalMap对象中都维护了一个Object类型的数组, 那么这个数组有什么作用呢?我们继续往下剖析

回到FastThreadLocal的get方法中

public final V get() {
    return get(InternalThreadLocalMap.get());
}

我们剖析完了InternalThreadLocalMap.get()的相关逻辑, 再继续看重载的get方法:

public final V get(InternalThreadLocalMap threadLocalMap) {
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    return initialize(threadLocalMap);
}

首先看这一步:

Object v = threadLocalMap.indexedVariable(index);

这一步是拿到当前index下标的object, 其实也就是拿到每个FastThreadLocal对象的绑定的线程共享对象

index是我们刚才分析过, 是每一个FastThreadLocal的唯一下标

我们跟到indexedVariable方法中:

public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index &lt; lookup.length? lookup[index] : UNSET;
}

这里首先拿到indexedVariables, 我们刚才分析过, indexedVariables是InternalThreadLocalMap对象中维护的数组, 初始大小是32

然后再return中判断当前index是不是小于当前数组的长度, 如果小于则获取当前下标index的数组元素, 否则返回UNSET代表没有设置的对象

这里我们可以分析到, 其实每一个FastThreadLocal对象中所绑定的线程共享对象, 是存放在threadLocalMap对象中的一个对象数组的中的, 数组中的元素的下标其实就是对应着FastThreadLocal中的index属性, 对应关系如图所示

8-1-2

回到FastThreadLocal重载的get方法:

public final V get(InternalThreadLocalMap threadLocalMap) {
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    return initialize(threadLocalMap);
}

根据以上逻辑, 我们知道, 第一次获取对象v是只能获取到UNSET对象, 因为该对象并没有保存在threadLocalMap中的数组indexedVariables中, 所以第一次获取在if判断中为false, 会走到initialize方法中

跟到initialize方法中:

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

这里首先调用的initialValue方法, 这里的initialValue实际上走的是FastThreadLocal子类的重写initialValue方法

在我们的demo中对应这个方法

@Override
protected Object initialValue() throws Exception {
    return new Object();
}

通过这个方法会创建一个线程共享对象

然后通过threadLocalMap对象的setIndexedVariable方法将创建的线程共享对象设置到threadLocalMap中维护的数组中, 参数为FastThreadLocal和创建的对象本身

跟到setIndexedVariable方法中:

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index &lt; lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

这里首先判断FastThreadLocal对象的index是否超过数组indexedVariables的长度, 如果没有超过, 则直接通过下标设置新创建的线程共享对象, 通过这个操作, 下次获取该对象的时候就可以直接通过数组下标进行取出

如果index超过了数组indexedVariables的长度, 则通过expandIndexedVariableTableAndSet方法将数组扩容, 并且根据index的通过数组下标的方式将线程共享对象设置到数组indexedVariables中

以上就是线程共享对象的创建和获取的过程,更多关于Netty分布式工具类FastThreadLocal和Recycler的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • Netty分布式Future与Promise执行回调相关逻辑剖析

    目录 Future和Promise执行回调 首先我们看一段写在handler中的业务代码 这里关注newPromise()方法, 跟进去 我们继续跟write方法 跟进tryFailure方法 跟到addMessage方法中 最后跟到AbstractUnsafe的flush方法 我们跟到remove()方法中 再跟到trySuccess方法中 我们看用户代码 跟到addListener0方法中 回到addListener0方法中 跟到isDone方法中 跟到notifyListeners()方法

  • Netty分布式高性能工具类同线程下回收对象解析

    目录 同线程回收对象 回顾第三小节的demo中的main方法 我们跟进recycle方法 然后获取当前size 同线程回收对象 上一小节剖析了从recycler中获取一个对象, 这一小节分析在创建和回收是同线程的前提下, recycler是如何进行回收的 回顾第三小节的demo中的main方法 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = REC

  • Netty分布式高性能工具类recycler的使用及创建

    目录 recycler的使用 这里看一个示例 在Recycler的类的源码中, 我们看到这一段逻辑 跟到Stack的构造方法中 继续跟重载的构造方法 我们再回到Stack的构造方法中 前文传送门:Netty分布式FastThreadLocal的set方法实现逻辑剖析 recycler的使用 这一小节开始学习recycler相关的知识, recycler是netty实现的一个轻量级对象回收站, 在netty中, recycler的使用也是相当之频繁的 recycler作用是保证了对象的循环利用, 

  • Netty分布式FastThreadLocal的set方法实现逻辑剖析

    目录 FastThreadLocal的set方法实现 线程set对象 我们跟到setIndexedVariable中 我们跟进removeIndexedVariable方法 上一小节我们学习了FastThreadLocal的创建和get方法的实现逻辑, 这一小节学习FastThreadLocal的set方法的实现逻辑 FastThreadLocal的set方法实现 set方法, 其实就是修改线程共享对象, 作用域只是当前线程, 我们回顾根据上一小节demo中, 其中一个线程set对象的过程: 线

  • Netty分布式从recycler对象回收站获取对象过程剖析

    前文传送门:Netty分布式高性能工具类recycler的使用及创建 从对象回收站中获取对象 我们回顾上一小节demo的main方法中 从回收站获取对象 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = RECYCLER.get(); user2.recycle(); System.out.println(user1==user2); } 这个通过R

  • Netty分布式高性能工具类FastThreadLocal和Recycler分析

    目录 概述 第一节:FastThreadLocal的使用和创建 首先我们看一个最简单的demo 跟到nextVariableIndex方法中 我们首先剖析slowGet()方法 我们跟进fastGet 回到FastThreadLocal的get方法中 在我们的demo中对应这个方法 前文传送门:Netty分布式Future与Promise执行回调相关逻辑剖析 概述 FastThreadLocal我们在剖析堆外内存分配的时候简单介绍过, 它类似于JDK的ThreadLocal, 也是用于在多线程条

  • Netty分布式高性能工具类异线程下回收对象解析

    目录 异线程回收对象 跟到pushLater方法中 跟到allocate方法中 回到pushLater方法中 简单看下link的类的定义 回到pushLater方法中 前文传送门:Netty分布式高性能工具类同线程下回收对象解析 异线程回收对象 就是创建对象和回收对象不在同一条线程的情况下, 对象回收的逻辑 我们之前小节简单介绍过, 异线程回收对象, 是不会放在当前线程的stack中的, 而是放在一个WeakOrderQueue的数据结构中, 回顾我们之前的一个图: 8-6-1 相关的逻辑, 我

  • Netty分布式客户端接入流程初始化源码分析

    目录 前文概述: 第一节:初始化NioSockectChannelConfig 创建channel 跟到其父类DefaultChannelConfig的构造方法中 再回到AdaptiveRecvByteBufAllocator的构造方法中 继续跟到ChannelMetadata的构造方法中 回到DefaultChannelConfig的构造方法 前文概述: 之前的章节学习了server启动以及eventLoop相关的逻辑, eventLoop轮询到客户端接入事件之后是如何处理的?这一章我们循序渐

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • Netty分布式ByteBuf使用directArena分配缓冲区过程解析

    目录 directArena分配缓冲区 回到newDirectBuffer中 我们跟到newByteBuf方法中 跟到reuse方法中 跟到allocate方法中 1.首先在缓存上进行分配 2.如果在缓存上分配不成功, 则实际分配一块内存 上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程 directArena分配缓冲区 回到newDirectBuffer中 protected

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    目录 前言 PooledByteBufAllocator分配逻辑 逻辑简述 我们回到newDirectBuffer中 有关缓存列表, 我们循序渐进的往下看 我们在static块中看其初始化过程 我们再次跟到initialValue方法中 我们跟到createSubPageCaches这个方法中 最后并保存其类型 前言 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByt

随机推荐