关于dubbo 自定义线程池的问题

目录
  • 初识dubbo
    • 一、什么是dubbo?
    • 二、为什么要用dubbo
  • 前言
  • dubbo线程池
  • dubbo线程池说明
  • 自定义线程池代码实现步骤

初识dubbo

一、什么是dubbo?

Dubbo是阿里巴巴开源的基于 Java 的高性能 RPC(一种远程调用) 分布式服务框架(SOA),致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案,其实就是一种远程服务调用的分布式框架

二、为什么要用dubbo

在互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进,所以就出现了dubbo

单一应用框架:当网站流量很小时,只需一个应用,将所有功能都部署在一起。

垂直应用框架:当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。

分布式服务架构:当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。

流动计算架构:当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。

前言

在日常开发中,线程池几乎涉及到了所有的开发框架,或者一些中间件,像我们熟悉的JDK线程池,druid连接线程池等等,线程池的使用,大大降低了人工维护线程的成本,而且提升了线程资源在使用中的效率;

dubbo线程池

dubbo也不例外,默认情况下,当我们在说到dubbo线程池的时候,通常是指服务提供者一端的线程池,其常用配置参数如下:

spring.dubbo.protocol.threads = 2000
spring.dubbo.protocol.threadpool = cached
spring.dubbo.protocol.dispatcher = message

对应到dubbo的配置文件中如下:

 <dubbo:protocol name="dubbo" threadpool="cached" dispatcher="message" threads="50" port="20880"/>

dubbo线程池说明

dubbo在使用的时候,都是通过创建真实的业务线程来进行操作的。已知的线程池模型主要有2个,固定大小线程池和带缓存的线程池;

  • fix线程池,即固定大小线程池。也是dubbo默认的使用方式,默认情况下,不做任何配置,线程池最大的线程个数为200个,并且是没有任何等待队列的。所以这种线程池,在极端情况下,可能会存在问题,比如某个应用执行某个大批量操作时,可能因为线程堵塞造成其他应用无法调用的情况;
  • cached线程池,非固定大小线程池,当线程不足的时候,会自动创建新线程。这种类型的线程池的问题在于,如果有较高的TPS过来的时候,如果请求的dubbo接口比较耗时,未能及时响应,则会连续不断的创建新线程,则对系统的CPU以及负载都是巨大的压力,甚至可能造成系统宕机的风险; dubbo 自定义线程池

在真实的使用过程中,大多数开发人员是忽略这个配置的,也就是说通常情况下默认是使用fix模式的,如果对于那种TPS比较高,或者dubbo接口中执行的核心业务逻辑比较耗时,并且系统要应对的并发也是居高不下的场景下,fix模式最终因为线程数创建不足而产生错误;

在这种情况下,出了错误之后,通常来说也是无感知的,怎么能快速定位因线程创建不足而导致的问题呢?这就需要一种机制,能够监控dubbo线程池,对运行过程中线程池的状况进行监控,当核心指标达到告警阈值时,及时给出预警,通知开发或运维人员快速做出响应和调整;

这就需要自定义线程池来解决这个问题

自定义线程池代码实现步骤

1、自定义一个maven模块并添加核心依赖

<dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.0</version>
        </dependency>

2、自定义线程池类

自定义一个类,继承FixedThreadPool 类,并实现Runnable接口,即该类本身作为一个线程;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.*;

public class WatchingPool extends FixedThreadPool implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(WatchingPool.class);

    // 线程池预警值【可以根据实际情况动态调整大小】
    private static final double ALARM_PERCENT = 0.70;

    private final Map<URL, ThreadPoolExecutor> theadPoolMap = new ConcurrentHashMap<>();

    public WatchingPool() {
        // 创建一个定时任务,每3秒执行一次【可以根据实际情况动态调整参数】
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
    }

    @Override
    public Executor getExecutor(URL url) {
        // 重写父类getExecutor, 如果executor是ThreadPoolExecutor,则放入theadPoolMap中
        Executor executor = super.getExecutor(url);
        if (executor instanceof ThreadPoolExecutor) {
            theadPoolMap.put(url, (ThreadPoolExecutor) executor);
        }

        return executor;

    public void run() {
        for (Map.Entry<URL, ThreadPoolExecutor> entry : theadPoolMap.entrySet()) {
            URL url = entry.getKey();
            ThreadPoolExecutor threadPoolExecutor = entry.getValue();
            // 获取正在活动的线程数
            int activeCount = threadPoolExecutor.getActiveCount();
            // 获取总的线程数  (继承的FixedThreadPool , 所以这里获取核心的线程数就是总的线程数)
            int corePoolSize = threadPoolExecutor.getCorePoolSize();

            double percent = activeCount / (corePoolSize * 1.0);

            LOGGER.info("线程池状态:{}/{},: {}%", activeCount, corePoolSize, percent*100);
            if (percent > ALARM_PERCENT) {
                LOGGER.error("超出警戒线 : host:{}, 当前使用量 {}%, URL:{}", url.getHost(), percent*100, url);
            }

}

该类的逻辑,主要是设置了一个最大的阈值,当超出这个阈值时候,打印出相关的信息,实际应用中,可以集成发短信或邮件进行告警;

3、将上面的类配置到META-INF目录中

4、在生产端工程中引入上面的模块

<dependency>
            <groupId>com.congge</groupId>
            <artifactId>common-pool</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

5、在生产端配置文件中,指定自定义线程池

<dubbo:protocol name="dubbo" threadpool="watchingPool" threads="50" port="20880"/>

6、改造消费端的启动类,使用1000个线程不断调用接口

import com.congge.service.HelloService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;

public class ConsumerMain {
    public static void main(String[] args) throws Exception {

        ApplicationContext ac = new ClassPathXmlApplicationContext("spring-consumer.xml");
        HelloService service = (HelloService) ac.getBean("helloService");
        while (true){
            for(int i=0;i<1000;i++){
                Thread.sleep(6);
                new Thread( () ->{
                    String hello = service.hello("Hello Provider");
                    System.out.println(hello);
                }).start();
            }
        }
    }
}

为了看到效果,我们将配置文件这里的线程数调整到了50个,下面启动生产端和消费端的代码,观察控制台输出日志(先启动生产端,再启动消费端)

生产端已就绪

启动消费端,不断发起了调用

这时再次回到生产端控制台,通过输出日志可以看到整个线程池经历了线程数不断往上增长的过程,直到最后达到了警戒线;

到此这篇关于dubbo 自定义线程池的文章就介绍到这了,更多相关dubbo 自定义线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录

    目录 事件背景 问题定位 解决问题 文末结语 事件背景 系统相关使用人员反馈系统故障,日志显示从ams系统服务提示dubbo处理线程不足,具体异常信息如下: 问题定位 从上图可知,dubbo的处理线程池满了,默认200个线程,活动线程也是200个.这个现象非常不正常,我们的应用并发还没有到这个程度能同时占用200个线程处理请求.然后去读了下dubbo源码,发现dubbo也认为这种情况不正常,然后帮我们记录了应用的线程堆栈信息,这个非常赞.代码如下: 上面这段代码,在线程池不够用时,会每隔十分钟输

  • 关于dubbo 自定义线程池的问题

    目录 初识dubbo 一.什么是dubbo? 二.为什么要用dubbo 前言 dubbo线程池 dubbo线程池说明 自定义线程池代码实现步骤 初识dubbo 一.什么是dubbo? Dubbo是阿里巴巴开源的基于 Java 的高性能 RPC(一种远程调用) 分布式服务框架(SOA),致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案,其实就是一种远程服务调用的分布式框架 二.为什么要用dubbo 在互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式

  • Java自定义线程池的实现示例

    目录 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 二.JDK线程池工具类. 三.业界知名自定义线程池扩展使用. 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 1.继承Thread类,(Thread类实现Runnable接口),来个类图加深印象. 2.实现Runnable接口实现无返回值.实现run()方法,啥时候run,黑话了. 3.实现Callable接口重写call()+FutureTask获取. public class CustomThread {

  • java多线程学习笔记之自定义线程池

    当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueu

  • Android编程自定义线程池与用法示例

    本文实例讲述了Android编程自定义线程池与用法.分享给大家供大家参考,具体如下: 一.概述: 1.因为线程池是固定不变的,所以使用了单例模式 2.定义了两个线程池,长的与短的,分别用于不同的地方.因为使用了单例模式,所以定义两个. 3.定义了两个方法,执行的与取消的 二.代码: /** * @描述 线程管理池 * @项目名称 App_Shop * @包名 com.android.shop.manager * @类名 ThreadManager * @author chenlin * @dat

  • Spring Boot利用@Async如何实现异步调用:自定义线程池

    前言 在之前的Spring Boot基础教程系列中,已经通过<Spring Boot中使用@Async实现异步调用>一文介绍过如何使用@Async注解来实现异步调用了.但是,对于这些异步执行的控制是我们保障自身应用健康的基本技能.本文我们就来学习一下,如果通过自定义线程池的方式来控制异步调用的并发. 本文中的例子我们可以在之前的例子基础上修改,也可以创建一个全新的Spring Boot项目来尝试. 定义线程池 第一步,先在Spring Boot主类中定义一个线程池,比如: @SpringBoo

  • python自定义线程池控制线程数量的示例

    1.自定义线程池 import threading import Queue import time queue = Queue.Queue() def put_data_in_queue(): for i in xrange(10): queue.put(i) class MyThread(threading.Thread): def run(self): while not queue.empty(): sleep_times = queue.get() time.sleep(sleep_t

  • Python自定义线程池实现方法分析

    本文实例讲述了Python自定义线程池实现方法.分享给大家供大家参考,具体如下: 关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程.但多线程处理IO密集的任务效率还是可以杠杠的. 我实现的这个线程池其实是根据银角的思路来实现的. 主要思路: 任务获取和执行: 1.任务加入队列,等待线程来获取并执行. 2.按需生成线程,每个线程循环取任务. 线程销毁: 1.获取任务是终止符时,线程停止. 2.线程池close()时,向任务队列加入和已生成线程等量的

  • Java8并行流中自定义线程池操作示例

    本文实例讲述了Java8并行流中自定义线程池操作.分享给大家供大家参考,具体如下: 1.概览 java8引入了流的概念,流是作为一种对数据执行大量操作的有效方式.并行流可以被包含于支持并发的环境中.这些流可以提高执行性能-以牺牲多线程的开销为代价 在这篇短文中,我们将看一下 Stream API的最大限制,同时看一下如何让并行流和线程池实例(ThreadPool instance)一起工作. 2.并行流Parallel Stream 我们先以一个简单的例子来开始-在任一个Collection类型

  • JAVA 自定义线程池的最大线程数设置方法

    一:CPU密集型: 定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务.该类型的任务需要进行大量的计算,主要消耗CPU资源.  这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数. 特点:    01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用    02:针对单台机

  • Java 自定义线程池和线程总数控制操作

    1 概述 池化是常见的思想,线程池是非常典型的池化的实现,<Java并发编程实战>也大篇幅去讲解了Java中的线程池.本文实现一个简单的线程池. 2 核心类 [1]接口定义 public interface IThreadPool<Job extends Runnable> { /** * 关闭线程池 */ public void shutAlldown(); /** * 执行任务 * * @param job 任务 */ public void execute(Job job);

随机推荐