基于Java实现多线程下载并允许断点续传

完整代码:https://github.com/iyuanyb/Downloader

多线程下载及断点续传的实现是使用 HTTP/1.1 引入的 Range 请求参数,可以访问Web资源的指定区间的内容。虽然实现了多线程及断点续传,但还有很多不完善的地方。

包含四个类:

Downloader: 主类,负责分配任务给各个子线程,及检测进度DownloadFile: 表示要下载的哪个文件,为了能写输入到文件的指定位置,使用 RandomAccessFile 类操作文件,多个线程写同一个文件需要保证线程安全,这里直接调用 getChannel 方法,获取一个文件通道,FileChannel是线程安全的。DownloadTask: 实际执行下载的线程,获取 [lowerBound, upperBound] 区间的数据,当下载过程中出现异常时要通知其他线程(使用 AtomicBoolean),结束下载Logger: 实时记录下载进度,以便续传时知道从哪开始。感觉这里做的比较差,为了能实时写出日志及方便地使用Properties类的load/store方法格式化输入输出,每次都是打开后再关闭。

演示:

随便找一个文件下载:

强行结束程序并重新运行:

日志文件:

断点续传的关键是记录各个线程的下载进度,这里细节比较多,花了很久。只需要记录每个线程请求的Range区间极客,每次成功写数据到文件时,就更新一次下载区间。下面是下载完成后的日志内容。

代码:

Downloader.java

package downloader;

import java.io.*;
import java.net.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

public class Downloader {
  private static final int DEFAULT_THREAD_COUNT = 4; // 默认线程数量
  private AtomicBoolean canceled; // 取消状态,如果有一个子线程出现异常,则取消整个下载任务
  private DownloadFile file; // 下载的文件对象
  private String storageLocation;
  private final int threadCount; // 线程数量
  private long fileSize; // 文件大小
  private final String url;
  private long beginTime; // 开始时间
  private Logger logger;

  public Downloader(String url) {
    this(url, DEFAULT_THREAD_COUNT);
  }

  public Downloader(String url, int threadCount) {
    this.url = url;
    this.threadCount = threadCount;
    this.canceled = new AtomicBoolean(false);
    this.storageLocation = url.substring(url.lastIndexOf('/')+1);
    this.logger = new Logger(storageLocation + ".log", url, threadCount);
  }

  public void start() {
    boolean reStart = Files.exists(Path.of(storageLocation + ".log"));
    if (reStart) {
      logger = new Logger(storageLocation + ".log");
      System.out.printf("* 继续上次下载进度[已下载:%.2fMB]:%s\n", logger.getWroteSize() / 1014.0 / 1024, url);
    } else {
      System.out.println("* 开始下载:" + url);
    }
    if (-1 == (this.fileSize = getFileSize()))
      return;
    System.out.printf("* 文件大小:%.2fMB\n", fileSize / 1024.0 / 1024);

    this.beginTime = System.currentTimeMillis();
    try {
      this.file = new DownloadFile(storageLocation, fileSize, logger);
      if (reStart) {
        file.setWroteSize(logger.getWroteSize());
      }
      // 分配线程下载
      dispatcher(reStart);
      // 循环打印进度
      printDownloadProgress();
    } catch (IOException e) {
      System.err.println("x 创建文件失败[" + e.getMessage() + "]");
    }
  }

  /**
   * 分配器,决定每个线程下载哪个区间的数据
   */
  private void dispatcher(boolean reStart) {
    long blockSize = fileSize / threadCount; // 每个线程要下载的数据量
    long lowerBound = 0, upperBound = 0;
    long[][] bounds = null;
    int threadID = 0;
    if (reStart) {
      bounds = logger.getBounds();
    }
    for (int i = 0; i < threadCount; i++) {
      if (reStart) {
        threadID = (int)(bounds[i][0]);
        lowerBound = bounds[i][1];
        upperBound = bounds[i][2];
      } else {
        threadID = i;
        lowerBound = i * blockSize;
        // fileSize-1 !!!!! fu.ck,找了一下午的错
        upperBound = (i == threadCount - 1) ? fileSize-1 : lowerBound + blockSize;
      }
      new DownloadTask(url, lowerBound, upperBound, file, canceled, threadID).start();
    }
  }

  /**
   * 循环打印进度,直到下载完毕,或任务被取消
   */
  private void printDownloadProgress() {
    long downloadedSize = file.getWroteSize();
    int i = 0;
    long lastSize = 0; // 三秒前的下载量
    while (!canceled.get() && downloadedSize < fileSize) {
      if (i++ % 4 == 3) { // 每3秒打印一次
        System.out.printf("下载进度:%.2f%%, 已下载:%.2fMB,当前速度:%.2fMB/s\n",
            downloadedSize / (double)fileSize * 100 ,
            downloadedSize / 1024.0 / 1024,
            (downloadedSize - lastSize) / 1024.0 / 1024 / 3);
        lastSize = downloadedSize;
        i = 0;
      }
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {}
      downloadedSize = file.getWroteSize();
    }
    file.close();
    if (canceled.get()) {
      try {
        Files.delete(Path.of(storageLocation));
      } catch (IOException ignore) {
      }
      System.err.println("x 下载失败,任务已取消");
    } else {
      System.out.println("* 下载成功,本次用时"+ (System.currentTimeMillis() - beginTime) / 1000 +"秒");
    }
  }

  /**
   * @return 要下载的文件的尺寸
   */
  private long getFileSize() {
    if (fileSize != 0) {
      return fileSize;
    }
    HttpURLConnection conn = null;
    try {
      conn = (HttpURLConnection)new URL(url).openConnection();
      conn.setConnectTimeout(3000);
      conn.setRequestMethod("HEAD");
      conn.connect();
      System.out.println("* 连接服务器成功");
    } catch (MalformedURLException e) {
      throw new RuntimeException("URL错误");
    } catch (IOException e) {
      System.err.println("x 连接服务器失败["+ e.getMessage() +"]");
      return -1;
    }
    return conn.getContentLengthLong();
  }

  public static void main(String[] args) throws IOException {
    new Downloader("http://js.xiazaicc.com//down2/ucliulanqi_downcc.zip").start();
  }
}

DownloadTask.java

package downloader;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicBoolean;

class DownloadTask extends Thread {
  private final String url;
  private long lowerBound; // 下载的文件区间
  private long upperBound;
  private AtomicBoolean canceled;
  private DownloadFile downloadFile;
  private int threadId;

  DownloadTask(String url, long lowerBound, long upperBound, DownloadFile downloadFile,
            AtomicBoolean canceled, int threadID) {
    this.url = url;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
    this.canceled = canceled;
    this.downloadFile = downloadFile;
    this.threadId = threadID;
  }

  @Override
  public void run() {
    ReadableByteChannel input = null;
    try {
      ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 2); // 2MB
      input = connect();
      System.out.println("* [线程" + threadId + "]连接成功,开始下载...");

      int len;
      while (!canceled.get() && lowerBound <= upperBound) {
        buffer.clear();
        len = input.read(buffer);
        downloadFile.write(lowerBound, buffer, threadId, upperBound);
        lowerBound += len;
      }
      if (!canceled.get()) {
        System.out.println("* [线程" + threadId + "]下载完成" + ": " + lowerBound + "-" + upperBound);
      }
    } catch (IOException e) {
      canceled.set(true);
      System.err.println("x [线程" + threadId + "]遇到错误[" + e.getMessage() + "],结束下载");
    } finally {
      if (input != null) {
        try {
          input.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  /**
   * 连接WEB服务器,并返回一个数据通道
   * @return 返回通道
   * @throws IOException 网络连接错误
   */
  private ReadableByteChannel connect() throws IOException {
    HttpURLConnection conn = (HttpURLConnection)new URL(url).openConnection();
    conn.setConnectTimeout(3000);
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);
//    System.out.println("thread_"+ threadId +": " + lowerBound + "-" + upperBound);
    conn.connect();

    int statusCode = conn.getResponseCode();
    if (HttpURLConnection.HTTP_PARTIAL != statusCode) {
      conn.disconnect();
      throw new IOException("状态码错误:" + statusCode);
    }

    return Channels.newChannel(conn.getInputStream());
  }
}

DownloadFile.java

package downloader;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicLong;

class DownloadFile {
  private final RandomAccessFile file;
  private final FileChannel channel; // 线程安全类
  private AtomicLong wroteSize; // 已写入的长度
  private Logger logger;

  DownloadFile(String fileName, long fileSize, Logger logger) throws IOException {
    this.wroteSize = new AtomicLong(0);
    this.logger = logger;
    this.file = new RandomAccessFile(fileName, "rw");
    file.setLength(fileSize);
    channel = file.getChannel();
  }

  /**
   * 写数据
   * @param offset 写偏移量
   * @param buffer 数据
   * @throws IOException 写数据出现异常
   */
  void write(long offset, ByteBuffer buffer, int threadID, long upperBound) throws IOException {
    buffer.flip();
    int length = buffer.limit();
    while (buffer.hasRemaining()) {
      channel.write(buffer, offset);
    }
    wroteSize.addAndGet(length);
    logger.updateLog(threadID, length, offset + length, upperBound); // 更新日志
  }

  /**
   * @return 已经下载的数据量,为了知道何时结束整个任务,以及统计信息
   */
  long getWroteSize() {
    return wroteSize.get();
  }

  // 继续下载时调用
  void setWroteSize(long wroteSize) {
    this.wroteSize.set(wroteSize);
  }

  void close() {
    try {
      file.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Logger.java

package downloader;

import java.io.*;
import java.util.Properties;

class Logger {
  private String logFileName; // 下载的文件的名字
  private Properties log;

   /**
   * 重新开始下载时,使用该构造函数
   * @param logFileName
   */
  Logger(String logFileName) {
    this.logFileName = logFileName;
    log = new Properties();
    FileInputStream fin = null;
    try {
      log.load(new FileInputStream(logFileName));
    } catch (IOException ignore) {
    } finally {
      try {
        fin.close();
      } catch (Exception ignore) {}
    }
  }

  Logger(String logFileName, String url, int threadCount) {
    this.logFileName = logFileName;
    this.log = new Properties();
    log.put("url", url);
    log.put("wroteSize", "0");
    log.put("threadCount", String.valueOf(threadCount));
    for (int i = 0; i < threadCount; i++) {
      log.put("thread_" + i, "0-0");
    }
  }

  synchronized void updateLog(int threadID, long length, long lowerBound, long upperBound) {
    log.put("thread_"+threadID, lowerBound + "-" + upperBound);
    log.put("wroteSize", String.valueOf(length + Long.parseLong(log.getProperty("wroteSize"))));

    FileOutputStream file = null;
    try {
      file = new FileOutputStream(logFileName); // 每次写时都清空文件
      log.store(file, null);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (file != null) {
        try {
          file.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  /**
   * 获取区间信息
   *   ret[i][0] = threadID, ret[i][1] = lowerBoundID, ret[i][2] = upperBoundID
   * @return
   */
  long[][] getBounds() {
    long[][] bounds = new long[Integer.parseInt(log.get("threadCount").toString())][3];
    int[] index = {0};
    log.forEach((k, v) -> {
      String key = k.toString();
      if (key.startsWith("thread_")) {
        String[] interval = v.toString().split("-");
        bounds[index[0]][0] = Long.parseLong(key.substring(key.indexOf("_") + 1));
        bounds[index[0]][1] = Long.parseLong(interval[0]);
        bounds[index[0]++][2] = Long.parseLong(interval[1]);
      }
    });
    return bounds;
  }
  long getWroteSize() {
    return Long.parseLong(log.getProperty("wroteSize"));
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java多线程文件分片下载实现的示例代码

    多线程下载介绍 多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据组装成完整的数据文件,这样便大大加快了下载效率.常见的下载器,迅雷,QQ旋风等都采用了这种技术. 分片下载 所谓分片下载就是要利用多线程的优势,将要下载的文件一块一块的分配到各个线程中去下载,这样就极大的提高了下载速度. 技术难点 并不能说是什么难点,只能说没接触过不知道罢了. 1.如何请

  • Java多线程生产者消费者模式实现过程解析

    单生产者与单消费者 示例: public class ProduceConsume { public static void main(String[] args) { String lock = new String(""); Produce produce = new Produce(lock); Consume consume = new Consume(lock); new Thread(() -> { while (true) { produce.setValue();

  • java多线程关键字final和static详解

    这篇文章主要介绍了java多线程关键字final和static详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 final关键字 1.final关键字在单线程中的特点: 1)final修饰的静态成员:必须在进行显示初始化或静态代码块赋值,并且仅能赋值一次. 2)final修饰的类成员变量,可以在三个地方进行赋值:显示初始化.构造代码块和构造方法,并且仅能赋值一次. 3)final修饰的局部变量,必须在使用之前进行显示初始化(并不一定要在定义是

  • Java多线程并发生产者消费者设计模式实例解析

    一.两个线程一个生产者一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个. 涉及问题 同步问题:如何保证同一资源被多个线程并发访问时的完整性.常用的同步方法是采用标记或加锁机制. wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制. wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行. not

  • Java多线程状态及方法实例解析

    这篇文章主要介绍了Java多线程状态及方法实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 首先介绍线程的五种状态: 新生态:New Thread() 就绪态:准备抢CPU时间片 运行态:抢到了CPU时间片 阻塞态:放弃已经抢到的CPU时间片,且暂时不参与争抢 死亡态:Run运行完了之后 接下来介绍三种方法:线程的阻塞,线程的优先级设置,线程的礼让 public class MutliThreadDemo4 { public static

  • Java模拟多线程实现抢票代码实例

    这篇文章主要介绍了Java模拟多线程实现抢票,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 实现100张票抢购的demo 这里需要一个变量,来保存100张 局部变量: 定义在方法内,方法运行存在,方法运行结束销毁,无法保存一个持久化数据!!! 成员变量: 保存在类对象内,创建对象之后存在,对象不销毁成员变量也不会被内存收回.因为 在每一个类对象中,都存在一个对应的成员变量,这些成员变量不是同一个数据.不是 共享资源,不合适!!! 静态成员变量:

  • Java多线程 线程状态原理详解

    这篇文章主要介绍了Java多线程 线程状态原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java.lang.Thread.State枚举定义了6种线程状态. NEW: 尚未启动(start)的线程的线程状态 RUNNABLE: 运行状态,但线程可能正在JVM中执行,也可能在等待CPU调度 BLOCKED: 线程阻塞,等待监视器锁以进入同步代码块/方法 WAITING: 等待状态.使用以下不带超时的方式时会进入:Object.wait.

  • Java多线程模拟电影售票过程

    用多线程模拟电影售票过程(Java实训) 实训目的: 多线程的实现.线程同步 实训要求: 总票数和售票窗口数由键盘输入,用每个线程处理一个窗口的售票. Test.java package program5; import java.util.Scanner; public class Test { public static void main(String[] args) { // TODO Auto-generated method stub Scanner sc=new Scanner(S

  • 基于Java实现多线程下载并允许断点续传

    完整代码:https://github.com/iyuanyb/Downloader 多线程下载及断点续传的实现是使用 HTTP/1.1 引入的 Range 请求参数,可以访问Web资源的指定区间的内容.虽然实现了多线程及断点续传,但还有很多不完善的地方. 包含四个类: Downloader: 主类,负责分配任务给各个子线程,及检测进度DownloadFile: 表示要下载的哪个文件,为了能写输入到文件的指定位置,使用 RandomAccessFile 类操作文件,多个线程写同一个文件需要保证线

  • Java实现多线程下载和断点续传

    java的多线程下载能够明显提升下载的速度,平时我们用的迅雷软件之所以能够下载那么快,就是使用了多线程:当用户在下载的过程中,有断电或断网的可能,当用户再次点击下载时,应该让用户接着原来的进度进行下载,这可以节约用户的流量,所以要用到断点续传的功能.下面是通过Java代码实现多线程下载和断点续传的详细代码. 1.创建一个类,用于文件的下载 package com.edu.thread;   import java.io.BufferedReader; import java.io.File; i

  • Java中多线程下载图片并压缩能提高效率吗

    目录 前言 实现思路 实测 前言 需求 导出Excel:本身以为是一个简单得导出,但是每行得记录文件中有一列为图片url,需要下载所有记录行对应得图片,然后压缩整个文件夹. 这里只做4.5.得代码讲解描述,其它也没什么好说得,话不多说上代码. 实现思路 多线程实现使用了线程池,Jdk1.8并发包下的CompletableFuture 第一步:得到基础数值 // 线程数 Integer threadNum = 10; // 每条线程需要处理的图片数 int dataNum = imageInfoV

  • android实现多线程下载文件(支持暂停、取消、断点续传)

    多线程下载文件(支持暂停.取消.断点续传) 多线程同时下载文件即:在同一时间内通过多个线程对同一个请求地址发起多个请求,将需要下载的数据分割成多个部分,同时下载,每个线程只负责下载其中的一部分,最后将每一个线程下载的部分组装起来即可. 涉及的知识及问题 请求的数据如何分段 分段完成后如何下载和下载完成后如何组装到一起 暂停下载和继续下载的实现(wait().notifyAll().synchronized的使用) 取消下载和断点续传的实现 一.请求的数据如何分段 首先通过HttpURLConne

  • Java基于Socket实现HTTP下载客户端

    没有借助任何第三方库,完全基于JAVA Socket实现一个最小化的HTTP文件下载客户端.完整的演示如何通过Socket实现下载文件的HTTP请求(request header)发送如何从Socket中接受HTTP响应(Response header, Response body)报文并解析与保存文件内容.如何通过SwingWork实现UI刷新,实时显示下载进度. 首先看一下UI部分: [添加下载]按钮: 点击弹出URL输入框,用户Copy要下载文件URL到输入框以后,点击[OK]按钮即开始

  • Java多线程下载文件实例详解

    本文实例为大家分享了Java多线程下载文件的具体代码,供大家参考,具体内容如下 import java.io.File; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; public class MulThreadDownload { public static void main(String[] args)

  • 基于java下载中getContentLength()一直为-1的一些思路

    如果Content Length 在头文件中没有描述 暂时还没有解决方案 如果Content Long在头文件中有描述 方案一: 伪装成浏览器 conn.setRequestProperty("User-Agent", " Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36")

  • java多线程下载实例详解

    本文实例讲述了java多线程下载.分享给大家供大家参考,具体如下: 使用多线程下载文件可以更快完成文件的下载,多线程下载文件之所以快,是因为其抢占的服务器资源多.如:假设服务器同时最多服务100个用户,在服务器中一条线程对应一个用户,100条线程在计算机中并非并发执行,而是由CPU划分时间片轮流执行,如果A应用使用了99条线程下载文件,那么相当于占用了99个用户的资源,假设一秒内CPU分配给每条线程的平均执行时间是10ms,A应用在服务器中一秒内就得到了990ms的执行时间,而其他应用在一秒内只

  • Java多线程下载的实现方法

    复制代码 代码如下: package cn.me.test; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; /** * 多线程下载 * 1:使用RandomAccessFile在任意的位置写入数据. * 2:需要计算第一个线程下载的数据量,可以平均分配.如果不够平均时, *    则直接最后一个线程处理相对较少

  • C#实现支持断点续传多线程下载客户端工具类

    复制代码 代码如下: /* .Net/C#: 实现支持断点续传多线程下载的 Http Web 客户端工具类 (C# DIY HttpWebClient) * Reflector 了一下 System.Net.WebClient ,改写或增加了若干: * DownLoad.Upload 相关方法! * DownLoad 相关改动较大! * 增加了 DataReceive.ExceptionOccurrs 事件! * 了解服务器端与客户端交互的 HTTP 协议参阅: * 使文件下载的自定义连接支持

随机推荐