spring异步service中处理线程数限制详解

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:

@Service
public class LgtsAsyncServiceImpl {
 /** logger日志. */
 public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class);

 private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻译的队列
 private final AtomicInteger threadCnt = new AtomicInteger(0);// 当前翻译中的线程数
 private final Vector<String> existsKey = new Vector<>();// 保存已入队列的数据
 private final int maxThreadCnt = 2;// 允许同时执行的翻译线程数
 private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻译条数
 private static final String translationFrom = "zh";

 @Async
 public void saveAsync(Lgts t) {
  if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
   return;
  }
  offer(t);
  save();
  return;
 }

 private boolean offer(Lgts t) {
  String key = t.getGco() + "-" + t.getCode();
  if (!existsKey.contains(key)) {
   existsKey.add(key);
   boolean result = que.offer(t);
   // LOGGER.trace("待翻译文字[" + t.getGco() + ":" + t.getCode() + "]加入队列结果[" + result
   // + "],队列中数据总个数:" + que.size());
   return result;
  }
  return false;
 }

 @Autowired
 private LgtsService lgtsService;

 private void save() {
  int cnt = threadCnt.incrementAndGet();// 当前线程数+1
  if (cnt > maxThreadCnt) {
   // 已启动的线程大于设置的最大线程数直接丢弃
   threadCnt.decrementAndGet();// +1的线程数再-回去
   return;
  }
  GwallUser user = UserUtils.getUser();
  Thread thr = new Thread() {
   public void run() {
    long sleepTime = 30000l;
    UserUtils.setUser(user);
    boolean continueFlag = true;
    int maxContinueCnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
    int continueCnt = 0;// 连续休眠次数

    while (continueFlag) {// 队列不为空时执行
     if (Objects.isNull(que.peek())) {
      try {
       if (continueCnt > maxContinueCnt) {
        // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。
        continueFlag = false;
        continue;
       }
       // 队列为空,准备休眠
       Thread.sleep(sleepTime);
       continueCnt++;
       continue;
      } catch (InterruptedException e) {
       // 休眠失败,无需处理
       e.printStackTrace();
      }
     }
     continueCnt = 0;// 重置连续休眠次数为0

     List<Lgts> params = new ArrayList<>();
     int totalCnt = que.size();
     que.drainTo(params, NUM_OF_EVERY_TIME);
     StringBuilder utf8q = new StringBuilder();
     String code = "";
     List<Lgts> needRemove = new ArrayList<>();
     for (Lgts lgts : params) {
      if (StringUtils.isAnyBlank(code)) {
       code = lgts.getCode();
      }
      // 移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
      String key = lgts.getGco() + "-" + lgts.getCode();
      existsKey.remove(key);

      if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻译的目标语言与当前列表中的第一个不一致
       offer(lgts);// 重新将待翻译的语言放回队列
       needRemove.add(lgts);
       continue;
      }
      utf8q.append(lgts.getGco()).append("\n");
     }
     params.removeAll(needRemove);
     LOGGER.debug("队列中共" + totalCnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);
     String to = "en";
     if (StringUtils.isAnyBlank(utf8q, to)) {
      LOGGER.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。");
      continue;
     }
     Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
     if (Objects.isNull(result) || result.isEmpty()) {// 把没有获取到翻译结果的重新放回队列
      for (Lgts lgts : params) {
       offer(lgts);
      }
      LOGGER.debug("本次翻译结果为空。");
      continue;
     }
     int sucessCnt = 0, ignoreCnt = 0;
     for (Lgts lgts : params) {
      lgts.setBdcode(to);
      String gna = result.get(lgts.getGco());
      if (StringUtils.isAnyBlank(gna)) {
       offer(lgts);// 重新将待翻译的语言放回队列
       continue;
      }
      lgts.setStat(1);
      lgts.setGna(gna);
      int saveResult = lgtsService.saveIgnore(lgts);
      if (0 == saveResult) {
       ignoreCnt++;
      } else {
       sucessCnt++;
      }
     }
     LOGGER.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucessCnt + ",已存在并忽略个数:" + ignoreCnt);
    }
    threadCnt.decrementAndGet();// 运行中的线程数-1
    distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改
   }

   /**
    * 如果是最后一个线程,清空队列和existsKey中的数据
    */
   private void distory() {
    if (0 == threadCnt.get()) {
     // 最后一个线程退出时,执行清理操作
     existsKey.clear();
     que.clear();
    }
   }
  };
  thr.setDaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁
  thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));
  thr.start();// 启动插入线程
 }

 /**
  * 百度翻译
  *
  * @param utf8q
  *   待翻译的字符串,需要utf8格式的
  * @param from
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
  * @param to
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
  * @return 翻译结果
  */
 private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
  Map<String, String> result = new HashMap<>();
  String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
  if (StringUtils.isAnyBlank(baiduurlStr)) {
   LOGGER.warn("百度翻译API接口URL相关参数为空!");
   return result;
  }
  Map<String, String> params = buildParams(utf8q, from, to);
  if (params.isEmpty()) {
   return result;
  }

  String sendUrl = getUrlWithQueryString(baiduurlStr, params);
  try {
   HttpClient httpClient = new HttpClient();
   httpClient.setMethod("GET");
   String remoteResult = httpClient.pub(sendUrl, "");
   result = convertRemote(remoteResult);
  } catch (Exception e) {
   LOGGER.info("百度翻译API返回结果异常!", e);
  }
  return result;
 }

 private Map<String, String> convertRemote(String remoteResult) {
  Map<String, String> result = new HashMap<>();
  if (StringUtils.isBlank(remoteResult)) {
   return result;
  }
  JSONObject jsonObject = JSONObject.parseObject(remoteResult);
  JSONArray trans_result = jsonObject.getJSONArray("trans_result");
  if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
   return result;
  }
  for (Object object : trans_result) {
   JSONObject trans = (JSONObject) object;
   result.put(trans.getString("src"), trans.getString("dst"));
  }
  return result;
 }

 private Map<String, String> buildParams(String utf8q, String from, String to) {
  if (StringUtils.isBlank(from)) {
   from = "auto";
  }
  Map<String, String> params = new HashMap<String, String>();
  String skStr = "sk";
  String appidStr = "appid";
  if (StringUtils.isAnyBlank(skStr, appidStr)) {
   LOGGER.warn("百度翻译API接口相关参数为空!");
   return params;
  }

  params.put("q", utf8q);
  params.put("from", from);
  params.put("to", to);

  params.put("appid", appidStr);

  // 随机数
  String salt = String.valueOf(System.currentTimeMillis());
  params.put("salt", salt);

  // 签名
  String src = appidStr + utf8q + salt + skStr; // 加密前的原文
  params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());
  return params;
 }

 public static String getUrlWithQueryString(String url, Map<String, String> params) {
  if (params == null) {
   return url;
  }

  StringBuilder builder = new StringBuilder(url);
  if (url.contains("?")) {
   builder.append("&");
  } else {
   builder.append("?");
  }

  int i = 0;
  for (String key : params.keySet()) {
   String value = params.get(key);
   if (value == null) { // 过滤空的key
    continue;
   }

   if (i != 0) {
    builder.append('&');
   }

   builder.append(key);
   builder.append('=');
   builder.append(encode(value));

   i++;
  }

  return builder.toString();
 }

 /**
  * 对输入的字符串进行URL编码, 即转换为%20这种形式
  *
  * @param input
  *   原文
  * @return URL编码. 如果编码失败, 则返回原文
  */
 public static String encode(String input) {
  if (input == null) {
   return "";
  }

  try {
   return URLEncoder.encode(input, "utf-8");
  } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
  }

  return input;
 }
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • spring boot使用自定义配置的线程池执行Async异步任务

    在前面的博客中,http://www.jb51.net/article/106718.htm 我们使用了spring boot的异步操作,当时,我们使用的是默认的线程池,但是,如果我们想根据项目来定制自己的线程池了,下面就来说说,如何定制线程池! 一.增加配置属性类 package com.chhliu.springboot.async.configuration; import org.springframework.boot.context.properties.ConfigurationP

  • Spring Boot利用@Async异步调用:ThreadPoolTaskScheduler线程池的优雅关闭详解

    前言 之前分享了一篇关于Spring Boot中使用@Async来实现异步任务和线程池控制的文章:<Spring Boot使用@Async实现异步调用:自定义线程池>.由于最近身边也发现了不少异步任务没有正确处理而导致的不少问题,所以在本文就接前面内容,继续说说线程池的优雅关闭,主要针对ThreadPoolTaskScheduler线程池. 问题现象 在上篇文章的例子Chapter4-1-3中,我们定义了一个线程池,然后利用@Async注解写了3个任务,并指定了这些任务执行使用的线程池.在上文

  • 详解Spring框架下向异步线程传递HttpServletRequest参数的坑

    在spring的注解 @RequestMapping 之下可以直接获取 HttpServletRequest 来获得诸如request header等重要的请求信息: @Slf4j @RestController @RequestMapping("/test") public class TestController { private static final String HEADER = "app-version"; @RequestMapping(value

  • 浅谈Spring @Async异步线程池用法总结

    本文介绍了Spring @Async异步线程池用法总结,分享给大家,希望对大家有帮助 1. TaskExecutor spring异步线程池的接口类,其实质是Java.util.concurrent.Executor Spring 已经实现的异常线程池: 1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程. 2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作.只适用于不需要多线程的地方 3. Conc

  • Spring Boot利用@Async如何实现异步调用:自定义线程池

    前言 在之前的Spring Boot基础教程系列中,已经通过<Spring Boot中使用@Async实现异步调用>一文介绍过如何使用@Async注解来实现异步调用了.但是,对于这些异步执行的控制是我们保障自身应用健康的基本技能.本文我们就来学习一下,如果通过自定义线程池的方式来控制异步调用的并发. 本文中的例子我们可以在之前的例子基础上修改,也可以创建一个全新的Spring Boot项目来尝试. 定义线程池 第一步,先在Spring Boot主类中定义一个线程池,比如: @SpringBoo

  • spring异步service中处理线程数限制详解

    情况简介 spring项目,controller异步调用service的方法,产生大量并发. 具体业务: 前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果. 处理方式: controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库. 本文主要知识点: 多线程同时(异步)调用方法后,开启新线程,并限制线程

  • Python异步爬虫多线程与线程池示例详解

    目录 背景 异步爬虫方式 多线程,多进程(不建议) 线程池,进程池(适当使用) 单线程+异步协程(推荐) 多线程 线程池 背景 当对多个url发送请求时,只有请求完第一个url才会接着请求第二个url(requests是一个阻塞的操作),存在等待的时间,这样效率是很低的.那我们能不能在发送请求等待的时候,为其单独开启进程或者线程,继续请求下一个url,执行并行请求 异步爬虫方式 多线程,多进程(不建议) 好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步会执行 弊端:不能无限制开

  • Spring MVC项目中log4J和AOP使用详解

    前言 日志处理是每个项目当中一个非常重要的内容.没有了日志,也就失去了对系统的可控性.没有日志,系统出现任何问题,都会没有踪迹可寻,这对一个信息系统而言是非常危险的. 项目中需要将service中的类方法的调用过程,使用log4j日志记录. service中的类和方法都很多,不可能在每个类中单独添加log4j日志记录的功能,因此我们在这里使用AOP的思想进行横向切面. 以service类中的方法为切入点,通过AOP在方法调用前后使用log4j输出日志,内容包括正在调用的类和方法名. 在配置过程中

  • Java中终止线程的方法详解

    Java中终止线程的方式主要有三种: 1.使用stop()方法,已被弃用.原因是:stop()是立即终止,会导致一些数据被到处理一部分就会被终止,而用户并不知道哪些数据被处理,哪些没有被处理,产生了不完整的"残疾"数据,不符合完整性,所以被废弃.So, forget it! 2.使用volatile标志位 看一个简单的例子: 首先,实现一个Runnable接口,在其中定义volatile标志位,在run()方法中使用标志位控制程序运行 public class MyRunnable i

  • Spring Boot框架中的@Conditional注解示例详解

    目录 1. @Conditional 注解 2. Spring boot 扩展 1) @ConditionalOnClass和@ConditionalOnMissingClass注解 2) @ConditionalOnBean 和@ConditionalOnMissingBean注解 3) @ConditionalOnProperty注解 1. @Conditional 注解 @Conditional注解是Spring-context模块提供了一个注解,该注解的作用是可以根据一定的条件来使@Co

  • spring bean标签中的init-method和destroy-method详解

    目录 1 背景介绍 2 init-method 3 destroy-method 4 总结 1 背景介绍 在很多项目中,经常在xml配置文件中看到init-method 或者 destroy-method .因此整理收集下,方便以后参考和学习.可以使用 init-method 和 destroy-method 在bean 配置文件属性用于在bean初始化和销毁某些动作时.这是用来替代 InitializingBean和DisposableBean接口. init-method 用于指定bean的

  • iOS中的线程死锁实例详解

    什么是线程死锁 是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去. 线程死锁怎么发生 发生死锁的情况一般是两个对象的锁相互等待造成的. 死锁发生的条件 1.互斥条件:所谓互斥就是进程在某一时间内独占资源. 2.请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放. 3.不剥夺条件:进程已获得资源,在末使用完之前,不能强行剥夺. 4.循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系. 死锁通常是一个线程锁定了一

  • JavaScript 中的无穷数(Infinity)详解

    为了保证的可读性,本文采用意译而非直译. Infinity(无穷大)在 JS 中是一个特殊的数字,它的特性是:它比任何有限的数字都大,如果不知道 Infinity, 我们在一些运算操作遇到时,就会觉得很有意思. 现在我们来看看 JS 中的Infinity 属性,了解用例并解决一些常见的陷阱. 1.Infinity(无穷)的定义 无穷可以分为两种,正无穷和负无穷,JS 中对应的表示方式为:+Infinity(或者Infinity) 和 -Infinity. 这意味着Infinity和-Infini

  • 在Spring异步调用中传递上下文的方法

    什么是异步调用? 异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行.异步调用指,在程序在执行时,无需等待执行的返回值即可继续执行后面的代码.在我们的应用服务中,有很多业务逻辑的执行操作不需要同步返回(如发送邮件.冗余数据表等),只需要异步执行即可. 本文将介绍 Spring 应用中,如何实现异步调用.在异步调用的过程中,会出现线程上下文信息的丢失,我们该如何解决线程上下文信息的传递. Sprin

  • Spring boot注解@Async线程池实例详解

    这篇文章主要介绍了Spring boot注解@Async线程池实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法.调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行. 1. TaskExecutor Spring异步线程池的接口类,其实质是java.util.concurrent

随机推荐