基于Java在netty中实现线程和CPU绑定

目录
  • 简介
  • 引入affinity
  • AffinityThreadFactory
  • 在netty中使用AffinityThreadFactory
  • 总结

简介

使用java thread affinity库我们可以将线程绑定到特定的CPU或者CPU核上,通过减少线程在CPU之间的切换,从而提升线程执行的效率。

虽然netty已经够优秀了,但是谁不想更加优秀一点呢?于是一个想法产生了,那就是能不能把affinity库用在netty中呢?

答案是肯定的,一起来看看吧。

引入affinity

affinity是以jar包的形式提供出去的,目前最新的正式版本是3.20.0,所以我们需要这样引入:

<!-- https://mvnrepository.com/artifact/net.openhft/affinity -->
<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>affinity</artifactId>
    <version>3.20.0</version>
</dependency>

引入affinity之后,会在项目的依赖库中添加一个affinity的lib包,这样我们就可以在netty中愉快的使用affinity了。

AffinityThreadFactory

有了affinity,怎么把affinity引入到netty中呢?

我们知道affinity是用来控制线程的,也就是说affinity是跟线程有关的。而netty中跟线程有关的就是EventLoopGroup,先看一下netty中EventLoopGroup的基本用法,这里以NioEventLoopGroup为例,NioEventLoopGroup有很多构造函数的参数,其中一种是传入一个ThreadFactory:

    public NioEventLoopGroup(ThreadFactory threadFactory) {
        this(0, threadFactory, SelectorProvider.provider());
    }

这个构造函数表示NioEventLoopGroup中使用的线程都是由threadFactory创建而来的。这样以来我们就找到了netty和affinity的对应关系。只需要构造affinity的ThreadFactory即可。

刚好affinity中有一个AffinityThreadFactory类,专门用来创建affinity对应的线程。

接下来我们来详细了解一下AffinityThreadFactory。

AffinityThreadFactory可以根据提供的不同AffinityStrategy来创建对应的线程。AffinityStrategy表示的是线程之间的关系。

在affinity中,有5种线程关系,分别是:

  • SAME_CORE - 线程会运行在同一个CPU core中。
  • SAME_SOCKET - 线程会运行在同一个CPU socket中,但是不在同一个core上。
  • DIFFERENT_SOCKET - 线程会运行在不同的socket中。
  • DIFFERENT_CORE - 线程会运行在不同的core上。
  • ANY - 只要是可用的CPU资源都可以。

这些关系是通过AffinityStrategy中的matches方法来实现的:

boolean matches(int cpuId, int cpuId2);

matches传入两个参数,分别是传入的两个cpuId。

我们以SAME_CORE为例来看看这个mathes方法到底是怎么工作的:

    SAME_CORE {
        @Override
        public boolean matches(int cpuId, int cpuId2) {
            CpuLayout cpuLayout = AffinityLock.cpuLayout();
            return cpuLayout.socketId(cpuId) == cpuLayout.socketId(cpuId2) &&
                    cpuLayout.coreId(cpuId) == cpuLayout.coreId(cpuId2);
        }
    }

可以看到它的逻辑是先获取当前CPU的layout,CpuLayout中包含了cpu个数,sockets个数,每个sockets的cpu核数等基本信息。

并且提供了三个方法根据给定的cpuId返回对应的socket、core和thread信息:

    int socketId(int cpuId);
    int coreId(int cpuId);
    int threadId(int cpuId);

matches方法就是根据传入的cpuId找到对应的socket,core信息进行比较,从而生成了5中不同的策略。

先看一下AffinityThreadFactory的构造函数:

    public AffinityThreadFactory(String name, boolean daemon, @NotNull AffinityStrategy... strategies) {
        this.name = name;
        this.daemon = daemon;
        this.strategies = strategies.length == 0 ? new AffinityStrategy[]{AffinityStrategies.ANY} : strategies;
    }

可以传入thread的name前缀,和是否是守护线程,最后如果strategies不传的话,默认使用的是AffinityStrategies.ANY策略,也就是说为线程分配任何可以绑定的CPU。

接下来看下这个ThreadFactory是怎么创建新线程的:

public synchronized Thread newThread(@NotNull final Runnable r) {
        String name2 = id <= 1 ? name : (name + '-' + id);
        id++;
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try (AffinityLock ignored = acquireLockBasedOnLast()) {
                    r.run();
                }
            }
        }, name2);
        t.setDaemon(daemon);
        return t;
    }
    private synchronized AffinityLock acquireLockBasedOnLast() {
        AffinityLock al = lastAffinityLock == null ? AffinityLock.acquireLock() : lastAffinityLock.acquireLock(strategies);
        if (al.cpuId() >= 0)
            lastAffinityLock = al;
        return al;
    }

从上面的代码可以看出,创建的新线程会以传入的name为前缀,后面添加1,2,3,4这种后缀。并且根据传入的是否是守护线程的标记,将调用对应线程的setDaemon方法。

重点是Thread内部运行的Runnable内容,在run方法内部,首先调用acquireLockBasedOnLast方法获取lock,在获得lock的前提下运行对应的线程方法,这样就会将当前运行的Thread和CPU进行绑定。

从acquireLockBasedOnLast方法中,我们可以看出AffinityLock实际上是一个链式结构,每次请求的时候都调用的是lastAffinityLock的acquireLock方法,如果获取到lock,则将lastAffinityLock进行替换,用来进行下一个lock的获取。

有了AffinityThreadFactory,我们只需要在netty的使用中传入AffinityThreadFactory即可。

在netty中使用AffinityThreadFactory

上面讲到了要在netty中使用affinity,可以将AffinityThreadFactory传入EventLoopGroup中。对于netty server来说可以有两个EventLoopGroup,分别是acceptorGroup和workerGroup,在下面的例子中我们将AffinityThreadFactory传入workerGroup,这样后续work中分配的线程都会遵循AffinityThreadFactory中配置的AffinityStrategies策略,来获得对应的CPU:

//建立两个EventloopGroup用来处理连接和消息
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(acceptorThreads);
        //创建AffinityThreadFactory
        ThreadFactory threadFactory = new AffinityThreadFactory("affinityWorker", AffinityStrategies.DIFFERENT_CORE,AffinityStrategies.DIFFERENT_SOCKET,AffinityStrategies.ANY);
        //将AffinityThreadFactory加入workerGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads,threadFactory);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new AffinityServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并开始接收连接
            ChannelFuture f = b.bind(port).sync();

            // 等待server socket关闭
            f.channel().closeFuture().sync();
        } finally {
            //关闭group
            workerGroup.shutdownGracefully();
            acceptorGroup.shutdownGracefully();
        }

为了获取更好的性能,Affinity还可以对CPU进行隔离,被隔离的CPU只允许执行本应用的线程,从而获得更好的性能。

要使用这个特性需要用到linux的isolcpus。这个功能主要是将一个或多个CPU独立出来,用来执行特定的Affinity任务。

isolcpus命令后面可以接CPU的ID,或者可以修改/boot/grub/grub.conf文件,添加要隔离的CPU信息如下:

isolcpus=3,4,5

总结

affinity可以对线程进行极致管控,对性能要求严格的朋友可以试试,但是在使用过程中需要选择合适的AffinityStrategies,否则可能会得不到想要的结果。

到此这篇关于基于Java在netty中实现线程和CPU绑定的文章就介绍到这了,更多相关 线程和CPU绑定内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java实战之用springboot+netty实现简单的一对一聊天

    一.引入pom <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4

  • Java搭建简单Netty开发环境入门教程

    下面就是准备Netty的jar包了,如果你会maven的话自然是使用maven最为方便了.只需要在pom文件中导入以下几行 <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1

  • Java基于Netty实现Http server的实战

    目录 HTTP协议基础知识 Netty的http协议栈 基于Netty实现httpserver HTTP协议基础知识 HTTP(超文本传输协议,英文:HyperText Transfer Protocol,缩写:HTTP)是基于TCP/IP协议的应用层的协议,常用于分布式.协作式和超媒体信息系统的应用层协议. http协议的主要特点:(1)支持CS(客户端/服务器)模式.(2)使用简单,指定URL并携带必要的参数即可.(3)灵活.传输非常多类型的数据对象,通过Content-Type指定即可.(

  • Java游戏服务器系列之Netty相关知识总结

    一.简介 Java的底层API逐渐复杂,而开发者面对的开发场景需求也在逐渐增大.如果直接针对底层API进行编程,无疑是耗时耗力的.这时就催生了极多的编程框架,这些框架隐藏了API实现的复杂细节,以最简洁的方式给开发人员提供功能的实现接口.Netty就是一款针对于网络链接的框架,他的出现让服务器开发人员更加的集中关注于更多逻辑的实现,而不为了实现更好更多更稳定的链接而头疼.Netty的核心功能基于NIO实现. 二.Netty的应用场景 几乎适用于所有的长短链接场景,由于Java应用的广泛性,几乎所

  • Java设计模式之责任链模式的概念、实现以及netty中的责任链模式

    本文先介绍了责任链模式的概念及简单实现.再贴了netty中对责任链的实现.最后总结了一点点思考. 1.概念相关 1.1.概念 责任链模式为请求创建了一个接收者对象的链,每个接收者都包含对另一个接收者的引用.如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,沿着这条链传递请求,直到有对象处理它为止. 1.2.解决了什么: 客户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递,所以职责链将请求的发送者和请求的处理者解耦了. 1.3.场景: 1.有多个对象可以处理同一

  • Java基于websocket协议与netty实时视频弹幕交互实现

    目录 摘要 1 技术选型 1.1 netty 1.2 WebSocket 1.3 为什么做这样的技术选型. 2 实现思路 2.1 服务架构 3 实现效果 3.1 视频展示 4 代码实现 4.1 项目结构 4.2 Java服务端 4.3 网页客户端实现 5 小结 摘要 2021年了,还有不支持弹幕的视频网站吗,现在各种弹幕玩法层出不穷,抽奖,ppt都上弹幕玩法了,不整个弹幕都说不过去了,今天笔者就抽空做了一个实时视频弹幕交互功能的实现,不得不说这样的形式为看视频看直播,讲义PPT,抽奖等形式增加了

  • 基于Java在netty中实现线程和CPU绑定

    目录 简介 引入affinity AffinityThreadFactory 在netty中使用AffinityThreadFactory 总结 简介 使用java thread affinity库我们可以将线程绑定到特定的CPU或者CPU核上,通过减少线程在CPU之间的切换,从而提升线程执行的效率. 虽然netty已经够优秀了,但是谁不想更加优秀一点呢?于是一个想法产生了,那就是能不能把affinity库用在netty中呢? 答案是肯定的,一起来看看吧. 引入affinity affinity

  • Java多线程编程中synchronized线程同步的教程

    0.关于线程同步 (1)为什么需要同步多线程? 线程的同步是指让多个运行的线程在一起良好地协作,达到让多线程按要求合理地占用释放资源.我们采用Java中的同步代码块和同步方法达到这样的目的.比如这样的解决多线程无固定序执行的问题: public class TwoThreadTest { public static void main(String[] args) { Thread th1= new MyThread1(); Thread th2= new MyThread2(); th1.st

  • 详解Java多线程编程中的线程同步方法

    1.多线程的同步: 1.1.同步机制: 在多线程中,可能有多个线程试图访问一个有限的资源,必须预防这种情况的发生.所以引入了同步机制:在线程使用一个资源时为其加锁,这样其他的线程便不能访问那个资源了,直到解锁后才可以访问. 1.2.共享成员变量的例子: 成员变量与局部变量: 成员变量: 如果一个变量是成员变量,那么多个线程对同一个对象的成员变量进行操作,这多个线程是共享一个成员变量的. 局部变量: 如果一个变量是局部变量,那么多个线程对同一个对象进行操作,每个线程都会有一个该局部变量的拷贝.他们

  • java导出数据库中Excel表格数据的方法

    本篇文章基于java把数据库中的数据以Excel的方式导出,欢迎各位大神吐槽: 1.基于maven jar包引入如下: <dependency> <groupId>net.sourceforge.jexcelapi</groupId> <artifactId>jxl</artifactId> <version>2.6.12</version> </dependency> 2.首先创建数据库对应的实体类VO :U

  • 基于Java子线程中的异常处理方法(通用)

    在普通的单线程程序中,捕获异常只需要通过try ... catch ... finally ...代码块就可以了.那么,在并发情况下,比如在父线程中启动了子线程,如何在父线程中捕获来自子线程的异常,从而进行相应的处理呢? 常见错误 也许有人会觉得,很简单嘛,直接在父线程启动子线程的地方try ... catch一把就可以了,其实这是不对的. 原因分析 让我们回忆一下Runnable接口的run方法的完整签名,因为没有标识throws语句,所以方法是不会抛出checked异常的.至于Runtime

  • 基于Spring中的线程池和定时任务功能解析

    1.功能介绍 Spring框架提供了线程池和定时任务执行的抽象接口:TaskExecutor和TaskScheduler来支持异步执行任务和定时执行任务功能.同时使用框架自己定义的抽象接口来屏蔽掉底层JDK版本间以及Java EE中的线程池和定时任务处理的差异. 另外Spring还支持集成JDK内部的定时器Timer和Quartz Scheduler框架. 2.线程池的抽象:TaskExecutor TaskExecutor涉及到的相关类图如下: TaskExecutor接口源代码如下所示: p

  • 基于Java中的数值和集合详解

    数组array和集合的区别: (1) 数值是大小固定的,同一数组只能存放一样的数据. (2) java集合可以存放不固定的一组数据 (3) 若程序事不知道究竟需要多少对象,需要在空间不足时自动扩增容量,则需要使用容器类库,array不适用 数组转换为集合: Arrays.asList(数组) 示例: int[] arr = {1,3,4,6,6}; Arrays.asList(arr); for(int i=0;i<arr.length;i++){ System.out.println(arr[

  • 基于java线程安全问题及原理性分析

    1.什么是线程安全问题? 从某个线程开始访问到访问结束的整个过程,如果有一个访问对象被其他线程修改,那么对于当前线程而言就发生了线程安全问题:如果在整个访问过程中,无一对象被其他线程修改,就是线程安全的. 2.线程安全问题产生的根本原因 首先是多线程环境,即同时存在有多个操作者,单线程环境不存在线程安全问题.在单线程环境下,任何操作包括修改操作都是操作者自己发出的,操作者发出操作时不仅有明确的目的,而且意识到操作的影响. 多个操作者(线程)必须操作同一个对象,只有多个操作者同时操作一个对象,行为

  • 基于java 线程的几种状态(详解)

    线程可以有六种状态: 1.New(新创建) 2.Runnable(可运行)(运行) 3.Blocked(被阻塞) 4.Waiting(等待) 5.Timed waiting(计时等待) 6.Terminated(被终止) 新创建线程: 当用new操作符创建一个新线程时,如new Thread(r),该线程还没有开始运行,它的当前状态为new,在线程运行之前还有一些基础工作要做. 可运行线程: 一旦线程调用start方法,线程处于runnable状态.在这个状态下的线程可能正在运行也可能没有运行(

  • 基于java中集合的概念(详解)

    1.集合是储存对象的,长度可变,可以封装不同的对象 2.迭代器: 其实就是取出元素的方式(只能判断,取出,移除,无法增加) 就是把取出方式定义在集合内部,这样取出方式就可以直接访问集合内部的元素,那么取出方式就被定义成了内部类. 二每一个容器的数据结构不同,所以取出的动作细节也不一样.但是都有共性内容判断和取出,那么可以将共性提取,这些内部类都符合一个规则Iterator Iterator it = list.iterator(); while(it.hasNext()){ System.out

随机推荐