java线程池实现批量下载文件

本文实例为大家分享了java线程池实现批量下载文件的具体代码,供大家参考,具体内容如下

1 创建线程池

package com.cheng.webb.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadUtil {

 /**
   * 创建批量下载线程池
   *
   * @param threadSize 下载线程数
   * @return ExecutorService
   */
  public static ExecutorService buildDownloadBatchThreadPool(int threadSize) {
    int keepAlive = 0;
    String prefix = "download-batch";
    ThreadFactory factory = ThreadUtil.buildThreadFactory(prefix);

    return new ThreadPoolExecutor(threadSize,
        threadSize,
        keepAlive,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(threadSize),
        factory);
  }

  /**
   * 创建自定义线程工厂
   *
   * @param prefix 名称前缀
   * @return ThreadFactory
   */
  public static ThreadFactory buildThreadFactory(String prefix) {
    return new CustomThreadFactory(prefix);
  }

  /**
   * 自定义线程工厂
   */
  public static class CustomThreadFactory implements ThreadFactory {

    private String threadNamePrefix;

    private AtomicInteger counter = new AtomicInteger(1);

    /**
     * 自定义线程工厂
     *
     * @param threadNamePrefix 工厂名称前缀
     */
    CustomThreadFactory(String threadNamePrefix) {
      this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
      String threadName = threadNamePrefix + "-t" + counter.getAndIncrement();
      return new Thread(r, threadName);
    }
  }

}

2 批量下载文件

package com.cheng.webb.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 文件下载类
 *
 * @author shucheng
 * @creation 2019年1月30日下午4:41:32
 */
public class DownloadUtil {

 private static Logger logger = LoggerFactory.getLogger(DownloadUtil.class);

 /**
 * 下载线程数
 */
 private static final int DOWNLOAD_THREAD_NUM = 14;

 /**
 * 下载线程池
 */
 private static ExecutorService downloadExecutorService = ThreadUtil
  .buildDownloadBatchThreadPool(DOWNLOAD_THREAD_NUM);

 /**
 * 文件下载
 *
 * @param fileUrl
 *      文件url,如:<code>https://img3.doubanio.com//view//photo//s_ratio_poster//public//p2369390663.webp</code>
 * @param path
 *      存放路径,如: /opt/img/douban/my.webp
 */
 public static void download(String fileUrl, String path) {
 // 判断存储文件夹是否已经存在或者创建成功
 if (!createFolderIfNotExists(path)) {
  logger.error("We can't create folder:{}", getFolder(path));
  return;
 }

 InputStream in = null;
 FileOutputStream out = null;
 try {
  URL url = new URL(fileUrl);
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  conn.setRequestMethod("GET");
  // 2s
  conn.setConnectTimeout(10000);
  in = conn.getInputStream();

  out = new FileOutputStream(path);

  int len;
  byte[] arr = new byte[1024 * 1000];
  while (-1 != (len = in.read(arr))) {
  out.write(arr, 0, len);
  }
  out.flush();
  conn.disconnect();
 } catch (Exception e) {
  logger.error("Fail to download: {} by {}", fileUrl, e.getMessage());
 } finally {
  try {
  if (null != out) {
   out.close();
  }
  if (null != in) {
   in.close();
  }
  } catch (Exception e) {
  // do nothing
  }
 }
 }

 /**
 * 创建文件夹,如果文件夹已经存在或者创建成功返回true
 *
 * @param path
 *      路径
 * @return boolean
 */
 private static boolean createFolderIfNotExists(String path) {
 String folderName = getFolder(path);
 if (folderName.equals(path)) {
  return true;
 }
 File folder = new File(getFolder(path));
 if (!folder.exists()) {
  synchronized (DownloadUtil.class) {
  if (!folder.exists()) {
   return folder.mkdirs();
  }
  }
 }
 return true;
 }

 /**
 * 获取文件夹
 *
 * @param path
 *      文件路径
 * @return String
 */
 private static String getFolder(String path) {
 int index = path.lastIndexOf("/");
 return -1 != index ? path.substring(0, index) : path;
 }

 /**
 * 下载资源
 * <p>
 * issue: 线程池创建过多
 * <p>
 * 最大批量下载为5,请知悉
 *
 * @param resourceMap
 *      资源map, key为资源下载url,value为资源存储位置
 */
 public static void batch(Map<String, String> resourceMap) {
 if (resourceMap == null || resourceMap.isEmpty()) {
  return;
 }

 try {
  List<String> keys = new ArrayList<>(resourceMap.keySet());
  int size = keys.size();
  int pageNum = getPageNum(size);
  for (int index = 0; index < pageNum; index++) {
  int start = index * DOWNLOAD_THREAD_NUM;
  int last = getLastNum(size, start + DOWNLOAD_THREAD_NUM);

  final CountDownLatch latch = new CountDownLatch(last - start);
  // 获取列表子集
  List<String> urlList = keys.subList(start, last);
  for (String url : urlList) {
   // 提交任务
   Runnable task = new DownloadWorker(latch, url, resourceMap.get(url));
   downloadExecutorService.submit(task);
  }
  latch.await();
  }
 } catch (Exception e) {
  logger.error("{}", e);
 }
 logger.info("Download resource map is all done");
 }

 /**
 * 获取最后一个元素
 *
 * @param size
 *      列表长度
 * @param index
 *      下标
 * @return int
 */
 private static int getLastNum(int size, int index) {
 return index > size ? size : index;
 }

 /**
 * 获取划分页面数量
 *
 * @param size
 *      列表长度
 * @return int
 */
 private static int getPageNum(int size) {
 int tmp = size / DOWNLOAD_THREAD_NUM;
 return size % DOWNLOAD_THREAD_NUM == 0 ? tmp : tmp + 1;
 }

 /**
 * 下载线程
 */
 static class DownloadWorker implements Runnable {

 private CountDownLatch latch;

 private String url;
 private String path;

 DownloadWorker(CountDownLatch latch, String url, String path) {
  this.latch = latch;
  this.url = url;
  this.path = path;
 }

 @Override
 public void run() {
  logger.debug("Start batch:[{}] into: [{}]", url, path);
  DownloadUtil.download(url, path);
  logger.debug("Download:[{}] into: [{}] is done", url, path);
  latch.countDown();
 }
 }

}

3 测试批量下载文件

package com.cheng.webb.thread;

import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import com.alibaba.fastjson.JSON;

public class DownLoadTest {
 String json = "{\r\n"
  + " \"http://www.xxx.com/111/123.mp4\":\"myFile/111/123.mp4\",\r\n"
  + " \"http://www.xxx.com/111/124.mp4\":\"myFile/111/124.mp4\",\r\n"
  + " \"http://www.xxx.com/111/125.mp4\":\"myFile/111/125.mp4\"\r\n"
  + "}";

 @SuppressWarnings("unchecked")
 @Test
 public void test() {
 Map<String, String> map = new HashMap<>();
 Map<String, String> resMap = JSON.parseObject(json, map.getClass());

 int times = 1;
 for (int index = 0; index < times; index++) {
  DownloadUtil.batch(resMap);
 }
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java实现FTP批量大文件上传下载篇1

    本文介绍了在Java中,如何使用Java现有的可用的库来编写FTP客户端代码,并开发成Applet控件,做成基于Web的批量.大文件的上传下载控件.文章在比较了一系列FTP客户库的基础上,就其中一个比较通用且功能较强的j-ftp类库,对一些比较常见的功能如进度条.断点续传.内外网的映射.在Applet中回调JavaScript函数等问题进行详细的阐述及代码实现,希望通过此文起到一个抛砖引玉的作用. 一.引子 笔者在实施一个项目过程中出现了一种基于Web的文件上传下载需求.在全省(或全国)各地的用

  • java实现批量下载 多文件打包成zip格式下载

    本文实例为大家分享了java实现批量下载的具体代码,供大家参考,具体内容如下 现在的需求的: 根据产品族.产品类型,下载该产品族.产品类型下面的pic包: pic包是zip压缩文件: t_product表: 这些包以blob形式存在另一张表中: t_imagefile表: 现在要做的是:将接入网.OLT下面的两个包downloadPIC:MA5800系列-pic.zip 和 MA5900-pic.rar一起打包成zip压缩文件下载下来: 代码: ProductController.java: /

  • Java实现FTP批量大文件上传下载篇2

    接着上一篇进行学习java文件上传下载1. 五.断点续传 对于熟用QQ的程序员,QQ的断点续传功能应该是印象很深刻的.因为它很实用也很方面.因此,在我们的上传下载过程中,很实现了断点续传的功能. 其实断点续传的原理很简单,就在上传的过程中,先去服务上进行查找,是否存在此文件,如果存在些文件,则比较服务器上文件的大小与本地文件的大小,如果服务器上的文件比本地的要小,则认为此文件上传过程中应该可以进行断点续传. 在实现的过程中,RandomAccessFile类变得很有用.此类的实例支持对随机存取文

  • java后台批量下载文件并压缩成zip下载的方法

    本文实例为大家分享了java后台批量下载文件并压缩成zip下载的具体代码,供大家参考,具体内容如下 因项目需要,将服务器上的图片文件压缩打包zip,下载到本地桌面. 首先,前端js: function doQueryPic() { var picsDate = $("#picsDate").val(); var piceDate = $("#piceDate").val(); var picInst = $("#pic_inst").combot

  • JAVA SFTP文件上传、下载及批量下载实例

    1.jsch官方API查看地址(附件为需要的jar) http://www.jcraft.com/jsch/ 2.jsch简介 JSch(Java Secure Channel)是一个SSH2的纯Java实现.它允许你连接到一个SSH服务器,并且可以使用端口转发,X11转发,文件传输等,当然你也可以集成它的功能到你自己的应用程序. SFTP(Secure File Transfer Protocol)安全文件传送协议.可以为传输文件提供一种安全的加密方法.SFTP 为 SSH的一部份,是一种传输

  • Java 批量文件压缩导出并下载到本地示例代码

    主要用的是org.apache.tools.zip.ZipOutputStream  这个zip流,这里以Execl为例子. 思路首先把zip流写入到http响应输出流中,再把excel的流写入zip流中(这里可以不用生成文件再打包,只需把execl模板读出写好数据输出到zip流中,并为每次的流设置文件名) 例如:在项目webapp下execl文件中 存在1.xls,2.xls,3.xls文件 1.Controller @RequestMapping(value = "/exportAll&qu

  • Java实现批量下载选中文件功能

    1.在action中定义变量 private List<String> downLoadPaths = new ArrayList<String>();//存储选中文件的下载地址 private OutputStream res; private ZipOutputStream zos; private String outPath; private String lessionIdStr;// 选中文件ID拼接的字符串 private String fileName; //浏览器

  • javaweb文件打包批量下载代码

    本文实例为大家分享了javaweb文件打包批量下载,供大家参考,具体内容如下 // 批量下载未批改作业 @RequestMapping(value = "/downloadAllHomework", method = RequestMethod.GET) public void downloadAllHomework(HttpSession httpSession, HttpServletRequest request, HttpServletResponse response, St

  • java线程池实现批量下载文件

    本文实例为大家分享了java线程池实现批量下载文件的具体代码,供大家参考,具体内容如下 1 创建线程池 package com.cheng.webb.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.Thr

  • 使用java实现http多线程断点下载文件(一)

    基本原理:利用URLConnection获取要下载文件的长度.头部等相关信息,并设置响应的头部信息.并且通过URLConnection获取输入流,将文件分成指定的块,每一块单独开辟一个线程完成数据的读取.写入.通过输入流读取下载文件的信息,然后将读取的信息用RandomAccessFile随机写入到本地文件中.同时,每个线程写入的数据都文件指针也就是写入数据的长度,需要保存在一个临时文件中.这样当本次下载没有完成的时候,下次下载的时候就从这个文件中读取上一次下载的文件长度,然后继续接着上一次的位

  • 四种Java线程池用法解析

    本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } } ).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限

  • 一种类似JAVA线程池的C++线程池实现方法

    什么是线程池 线程池(thread pool)是一种线程使用模式.线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着管理器分配可并发执行的任务.这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性.线程池不仅能够保证内核的充分利用,还能防止过分调度. 线程池的实现 线程池在JAVA平台上已经有成熟的实现方式,本文介绍参考JAVA线程池实现方式实现的C++线程池类库. 该类库代码已上传至github仓库中,下载地址为:ht

  • Handler实现线程之间的通信下载文件动态更新进度条

    1. 原理 每一个线程对应一个消息队列MessageQueue,实现线程之间的通信,可通过Handler对象将数据装进Message中,再将消息加入消息队列,而后线程会依次处理消息队列中的消息. 2. Message 初始化:一般使用Message.obtain()方法获取一个消息对象,该方法会检查Message对象池中是否存在可重复利用的对象,若无,才会new一个新对象. what:相当于Message的标识符,区别于其它消息. arg1.arg2:int类型,可传递整数. obj:objec

  • JAVA线程池原理实例详解

    本文实例讲述了JAVA线程池原理.分享给大家供大家参考,具体如下: 线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQu

  • Java线程池用法实战案例分析

    本文实例讲述了Java线程池用法.分享给大家供大家参考,具体如下: 一 使用newSingleThreadExecutor创建一个只包含一个线程的线程池 1 代码 import java.util.concurrent.*; public class executorDemo { public static void main( String[] args ) { ExecutorService executor = Executors.newSingleThreadExecutor(); ex

  • Java线程池的应用实例分析

    本文实例讲述了Java线程池的应用.分享给大家供大家参考,具体如下: 一 使用Future与Callable来计算斐波那契数列 1 代码 import java.util.concurrent.*; public class FutureCallableDemo { static long fibonacci(long n) { if (n == 1 ||n == 2) return 1; else return fibonacci(n - 1) + fibonacci(n - 2); } pu

  • 教你如何监控 Java 线程池运行状态的操作(必看)

    之前写过一篇 Java 线程池的使用介绍文章<线程池全面解析>,全面介绍了什么是线程池.线程池核心类.线程池工作流程.线程池分类.拒绝策略.及如何提交与关闭线程池等. 但在实际开发过程中,在线程池使用过程中可能会遇到各方面的故障,如线程池阻塞,无法提交新任务等. 如果你想监控某一个线程池的执行状态,线程池执行类 ThreadPoolExecutor 也给出了相关的 API, 能实时获取线程池的当前活动线程数.正在排队中的线程数.已经执行完成的线程数.总线程数等. 总线程数 = 排队线程数 +

随机推荐