RxJava+Retrofit+OkHttp实现多文件下载之断点续传

背景

断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

效果

实现

下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

1.创建service接口

和以前一样,先写接口

注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)

 /*断点续传下载接口*/
  @Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
  @GET
  Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);

2.复写ResponseBody

和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调

/**
 * 自定义进度的body
 * @author wzg
 */
public class DownloadResponseBody extends ResponseBody {
  private ResponseBody responseBody;
  private DownloadProgressListener progressListener;
  private BufferedSource bufferedSource;

  public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
    this.responseBody = responseBody;
    this.progressListener = progressListener;
  }

  @Override
  public BufferedSource source() {
    if (bufferedSource == null) {
      bufferedSource = Okio.buffer(source(responseBody.source()));
    }
    return bufferedSource;
  }

  private Source source(Source source) {
    return new ForwardingSource(source) {
      long totalBytesRead = 0L;
      @Override
      public long read(Buffer sink, long byteCount) throws IOException {
        long bytesRead = super.read(sink, byteCount);
        // read() returns the number of bytes read, or -1 if this source is exhausted.
        totalBytesRead += bytesRead != -1 ? bytesRead : 0;
        if (null != progressListener) {
          progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
        }
        return bytesRead;
      }
    };
  }
}

3.自定义进度回调接口

/**
 * 成功回调处理
 * Created by WZG on 2016/10/20.
 */
public interface DownloadProgressListener {
  /**
   * 下载进度
   * @param read
   * @param count
   * @param done
   */
  void update(long read, long count, boolean done);
}

4.复写Interceptor

复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody

/**
 * 成功回调处理
 * Created by WZG on 2016/10/20.
 */
public class DownloadInterceptor implements Interceptor {

  private DownloadProgressListener listener;

  public DownloadInterceptor(DownloadProgressListener listener) {
    this.listener = listener;
  }

  @Override
  public Response intercept(Chain chain) throws IOException {
    Response originalResponse = chain.proceed(chain.request());

    return originalResponse.newBuilder()
        .body(new DownloadResponseBody(originalResponse.body(), listener))
        .build();
  }
}

5.封装请求downinfo数据

这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据

public class DownInfo {
  /*存储位置*/
  private String savePath;
  /*下载url*/
  private String url;
  /*基础url*/
  private String baseUrl;
  /*文件总长度*/
  private long countLength;
  /*下载长度*/
  private long readLength;
  /*下载唯一的HttpService*/
  private HttpService service;
  /*回调监听*/
  private HttpProgressOnNextListener listener;
  /*超时设置*/
  private int DEFAULT_TIMEOUT = 6;
  /*下载状态*/
  private DownState state;
  }

6.DownState状态封装

很简单,和大多数封装框架一样

public enum DownState {
  START,
  DOWN,
  PAUSE,
  STOP,
  ERROR,
  FINISH,
}

7.请求HttpProgressOnNextListener回调封装类

注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听

通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活

/**
 * 下载过程中的回调处理
 * Created by WZG on 2016/10/20.
 */
public abstract class HttpProgressOnNextListener<T> {
  /**
   * 成功后回调方法
   * @param t
   */
  public abstract void onNext(T t);

  /**
   * 开始下载
   */
  public abstract void onStart();

  /**
   * 完成下载
   */
  public abstract void onComplete();

  /**
   * 下载进度
   * @param readLength
   * @param countLength
   */
  public abstract void updateProgress(long readLength, long countLength);

  /**
   * 失败或者错误方法
   * 主动调用,更加灵活
   * @param e
   */
   public void onError(Throwable e){

   }

  /**
   * 暂停下载
   */
  public void onPuase(){

  }

  /**
   * 停止下载销毁
   */
  public void onStop(){

  }
}

8.封装回调Subscriber

准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中

  1. sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果
  2. 传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态
  3. 通过RxAndroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)
  4. update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)
/**
 * 用于在Http请求开始时,自动显示一个ProgressDialog
 * 在Http请求结束是,关闭ProgressDialog
 * 调用者自己对请求数据进行处理
 * Created by WZG on 2016/7/16.
 */
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
  //弱引用结果回调
  private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
  /*下载数据*/
  private DownInfo downInfo;

  public ProgressDownSubscriber(DownInfo downInfo) {
    this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
    this.downInfo=downInfo;
  }

  /**
   * 订阅开始时调用
   * 显示ProgressDialog
   */
  @Override
  public void onStart() {
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onStart();
    }
    downInfo.setState(DownState.START);
  }

  /**
   * 完成,隐藏ProgressDialog
   */
  @Override
  public void onCompleted() {
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onComplete();
    }
    downInfo.setState(DownState.FINISH);
  }

  /**
   * 对错误进行统一处理
   * 隐藏ProgressDialog
   *
   * @param e
   */
  @Override
  public void onError(Throwable e) {
    /*停止下载*/
    HttpDownManager.getInstance().stopDown(downInfo);
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onError(e);
    }
    downInfo.setState(DownState.ERROR);
  }

  /**
   * 将onNext方法中的返回结果交给Activity或Fragment自己处理
   *
   * @param t 创建Subscriber时的泛型类型
   */
  @Override
  public void onNext(T t) {
    if (mSubscriberOnNextListener.get() != null) {
      mSubscriberOnNextListener.get().onNext(t);
    }
  }

  @Override
  public void update(long read, long count, boolean done) {
    if(downInfo.getCountLength()>count){
      read=downInfo.getCountLength()-count+read;
    }else{
      downInfo.setCountLength(count);
    }
    downInfo.setReadLength(read);
    if (mSubscriberOnNextListener.get() != null) {
      /*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
      rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
           /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
          if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
          downInfo.setState(DownState.DOWN);
          mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
        }
      });
    }
  }

}

9.下载管理类封装HttpDownManager

单利获取

 /**
   * 获取单例
   * @return
   */
  public static HttpDownManager getInstance() {
    if (INSTANCE == null) {
      synchronized (HttpDownManager.class) {
        if (INSTANCE == null) {
          INSTANCE = new HttpDownManager();
        }
      }
    }
    return INSTANCE;
  }

因为单利所以需要记录正在下载的数据和回到sub

 /*回调sub队列*/
  private HashMap<String,ProgressDownSubscriber> subMap;
  /*单利对象*/
  private volatile static HttpDownManager INSTANCE;

  private HttpDownManager(){
    downInfos=new HashSet<>();
    subMap=new HashMap<>();
  }

开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)

 /**
   * 开始下载
   */
  public void startDown(DownInfo info){
    /*正在下载不处理*/
    if(info==null||subMap.get(info.getUrl())!=null){
      return;
    }
    /*添加回调处理类*/
    ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
    /*记录回调sub*/
    subMap.put(info.getUrl(),subscriber);
    /*获取service,多次请求公用一个sercie*/
    HttpService httpService;
    if(downInfos.contains(info)){
      httpService=info.getService();
    }else{
      DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
      OkHttpClient.Builder builder = new OkHttpClient.Builder();
      //手动创建一个OkHttpClient并设置超时时间
      builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
      builder.addInterceptor(interceptor);

      Retrofit retrofit = new Retrofit.Builder()
          .client(builder.build())
          .addConverterFactory(GsonConverterFactory.create())
          .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
          .baseUrl(info.getBaseUrl())
          .build();
      httpService= retrofit.create(HttpService.class);
      info.setService(httpService);
    }
    /*得到rx对象-上一次下載的位置開始下載*/
    httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
        /*指定线程*/
        .subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
          /*失败后的retry配置*/
        .retryWhen(new RetryWhenNetworkException())
        /*读取下载写入文件*/
        .map(new Func1<ResponseBody, DownInfo>() {
          @Override
          public DownInfo call(ResponseBody responseBody) {
            try {
              writeCache(responseBody,new File(info.getSavePath()),info);
            } catch (IOException e) {
              /*失败抛出异常*/
              throw new HttpTimeException(e.getMessage());
            }
            return info;
          }
        })
        /*回调线程*/
        .observeOn(AndroidSchedulers.mainThread())
        /*数据回调*/
        .subscribe(subscriber);

  }

写入文件

注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度

  /**
   * 写入文件
   * @param file
   * @param info
   * @throws IOException
   */
  public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
    if (!file.getParentFile().exists())
      file.getParentFile().mkdirs();
    long allLength;
    if (info.getCountLength()==0){
      allLength=responseBody.contentLength();
    }else{
      allLength=info.getCountLength();
    }
      FileChannel channelOut = null;
      RandomAccessFile randomAccessFile = null;
      randomAccessFile = new RandomAccessFile(file, "rwd");
      channelOut = randomAccessFile.getChannel();
      MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
          info.getReadLength(),allLength-info.getReadLength());
      byte[] buffer = new byte[1024*8];
      int len;
      int record = 0;
      while ((len = responseBody.byteStream().read(buffer)) != -1) {
        mappedBuffer.put(buffer, 0, len);
        record += len;
      }
      responseBody.byteStream().close();
        if (channelOut != null) {
          channelOut.close();
        }
        if (randomAccessFile != null) {
          randomAccessFile.close();
        }
  }

停止下载

调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)

/**
   * 停止下载
   */
  public void stopDown(DownInfo info){
    if(info==null)return;
    info.setState(DownState.STOP);
    info.getListener().onStop();
    if(subMap.containsKey(info.getUrl())) {
      ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
      subscriber.unsubscribe();
      subMap.remove(info.getUrl());
    }
    /*同步数据库*/
  }

暂停下载

原理和停止下载原理一样

 /**
   * 暂停下载
   * @param info
   */
  public void pause(DownInfo info){
    if(info==null)return;
    info.setState(DownState.PAUSE);
    info.getListener().onPuase();
    if(subMap.containsKey(info.getUrl())){
      ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
      subscriber.unsubscribe();
      subMap.remove(info.getUrl());
    }
    /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
  }

暂停全部和停止全部下载任务

/**
   * 停止全部下载
   */
  public void stopAllDown(){
    for (DownInfo downInfo : downInfos) {
      stopDown(downInfo);
    }
    subMap.clear();
    downInfos.clear();
  }

  /**
   * 暂停全部下载
   */
  public void pauseAll(){
    for (DownInfo downInfo : downInfos) {
      pause(downInfo);
    }
    subMap.clear();
    downInfos.clear();
  }

整合代码HttpDownManager

同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)

补充

有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

只需要替换DbUtil的方法即可

总结

到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

1.Retrofit+Rxjava+okhttp基本使用方法
    2.统一处理请求数据格式
    3.统一的ProgressDialog和回调Subscriber处理
    4.取消http请求
    5.预处理http请求
    6.返回数据的统一判断
    7.失败后的retry封装处理
    8.RxLifecycle管理生命周期,防止泄露
    9.文件上传和文件下载(支持多文件断点续传)

源码:传送门-全部封装源码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • RxJava+Retrofit+OkHttp实现文件上传

    背景 在实际运用中上传是一个必不可少的功能,所以我们在封装二的基础上加入上传的功能,同时需要附带上传进度! 效果 实现 1.定义service接口 注意:Multipart是指定大文件上传过程中的标示,一般上传图片的过程中我们需要附带信息,所以我们需要用到@part指定传递的数值,MultipartBody.Part是指定传递的文件: /*上传文件*/ @Multipart @POST("AppYuFaKu/uploadHeadImg") Observable<BaseResul

  • 浅谈RxJava+Retrofit+OkHttp 封装使用

    背景 之前学习完Retrofit+Rxjava之后写了一篇关于封装的博客,发出后受到大家的关注以及使用,由于不断的完善之前的项目,所以决定把最新的项目封装过程讲解出来,供大家查看! Retrofit介绍: Retrofit和okHttp师出同门,也是Square的开源库,它是一个类型安全的网络请求库,Retrofit简化了网络请求流程,基于OkHtttp做了封装,解耦的更彻底:比方说通过注解来配置请求参数,通过工厂来生成CallAdapter,Converter,你可以使用不同的请求适配器(Ca

  • 深入浅出RxJava+Retrofit+OkHttp网络请求

    浅谈RxJava+Retrofit+OkHttp 封装使用之前发出后收到很多朋友的关注,原本只是自己学习后的一些经验总结,但是有同学运用到实战当中,这让我很惶恐,所有后续一直更新了很多次版本,有些地方难免有所变动导致之前的博客有所出入,正好最近受到掘金邀请内测博客,所以决定重新写一版,按照最后迭代完成的封装详细的讲述一遍,欢迎大家关注! 注意:由于本章的特殊性,后续文章比较长而且复杂,涉及内容也很多,所以大家准备好茶水,前方高能预警. 简介: Retrofit: Retrofit是Square

  • Android中的Retrofit+OkHttp+RxJava缓存架构使用

    RxJava如何与Retrofit结合 先扔出build.gradle文件的内容 dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) testCompile 'junit:junit:4.12' compile 'com.android.support:appcompat-v7:23.2.0' compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxand

  • RxJava+Retrofit+OkHttp实现多文件下载之断点续传

    背景 断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全! 效果 实现 下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理 1.创建service接口 和以前一样,先写接口 注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写:下载地址需要通过@url动态指定(不适固定的),@head标签是

  • Rxjava+Retrofit+Okhttp进行网络访问及数据解析

    目录 1,创建Android项目(Android studio)导入相关依赖 2,定义接口类 3,发出请求,回调信息 4,Rxjava 和 Retrofit的结合 前言: 在平时项目开发中Okhttp3.x.Rxjava2.x.Retrofit2.x,使用的越来越多了,需要我们不断的去学习别人的优秀开发设计程序,今天简单的了解下 1,创建Android项目(Android studio)导入相关依赖 implementation 'com.squareup.okhttp3:okhttp:3.11

  • 基于Ok+Rxjava+retrofit实现断点续传下载

    本文为大家分享了实现断点续传下载的具体代码,供大家参考,具体内容如下 1.基于Ok+Rxjava实现断点续传下载 2.基于Ok+Rxjava+Retrofit实现断点续传下载 上一篇博客中介绍了基于Ok+Rxjava实现断点续传下载,这一篇给大家介绍下基于Ok+Rxjava+Retrofit实现断点续传下载,demo下载地址,效果图跟上一篇图片一样,哈哈 说下我的大致思路吧(跟上一篇略有不同):根据文件下载url按照自己定义的规则生成文件名,判断本地同路径下是否存在此文件,如果存在,文件大小与服

  • Rxjava+Retrofit+MVP实现购物车功能

    本文实例为大家分享了Rxjava Retrofit实现购物车的具体代码,供大家参考,具体内容如下 效果图: 1.依赖 annotationProcessor 'com.jakewharton:butterknife-compiler:8.8.1' compile 'com.jakewharton:butterknife:8.8.1' compile 'com.android.support:recyclerview-v7:26.0.0-alpha1' compile 'com.squareup.

  • RxJava+Retrofit+Mvp实现购物车

    本文实例为大家分享了RxJava Retrofit实现购物车展示的具体代码,供大家参考,具体内容如下 先给大家展示一下效果图 框架结构: 1.项目框架:MVP,图片加载用Fresco,网络请求用OKhttp+Retrofit实现(自己封装,加单例模式), 2.完成购物车数据添加(如果接口无数据,可用接口工具添加数据), 3.自定义view实现加减按钮,每次点击加减,item中的总数及总价要做出相应的改变. 4.当数量为1时,点击减号,数量不变,吐司提示用户最小数量为1. 5.底部总数及总价为所有

  • RxJava+Retrofit实现网络请求封装的方法

    简要介绍 Retrofit是当前应用非常广泛的网络请求框架,通常结合RxJava来进行网络请求,本文将展示一个采用RxJava+Retrofit的网络请求demo. 集成步骤 1.app工程的build.gradle中添加依赖 //retrofit2 implementation 'com.google.code.gson:gson:2.8.5' implementation 'com.squareup.retrofit2:retrofit:2.5.0' implementation 'com.

  • Retrofit + OkHttp缓存处理的示例代码

    通过缓存处理可以有效降低服务器的负荷,加快APP界面加载速度,提升用户体验.Retrofit + OkHttp缓存处理流程是这样的,请求响应之后会在data/data/packageName/cache下建立一个response文件夹,保存缓存数据,后续请求时若无网络,则直接读取缓存内容,若有网络则从网络获取最新数据并缓存. 1.设置缓存路径,大小及添加缓存拦截器 //设置缓存路径 File httpCacheDirectory = new File(CommonApplication.getI

随机推荐