Retrofit+Rxjava下载文件进度的实现

前言

最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看。

准备工作

本文采用Dagger2,Retrofit,RxJava。

compile'com.squareup.retrofit2:retrofit:2.0.2'
compile'com.squareup.retrofit2:converter-gson:2.0.2'
compile'com.squareup.retrofit2:adapter-rxjava:2.0.2'
//dagger2
compile'com.google.dagger:dagger:2.6'
apt'com.google.dagger:dagger-compiler:2.6'
//RxJava
compile'io.reactivex:rxandroid:1.2.0'
compile'io.reactivex:rxjava:1.1.5'
compile'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造ResponseBody

okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody。

public class ProgressResponseBody extends ResponseBody {
 private ResponseBody responseBody;
 private BufferedSource bufferedSource;
 public ProgressResponseBody(ResponseBody responseBody) {
 this.responseBody = responseBody;
 }
 @Override
 public MediaType contentType() {
 return responseBody.contentType();
 }
 @Override
 public long contentLength() {
 return responseBody.contentLength();
 }
 @Override
 public BufferedSource source() {
 if (bufferedSource == null) {
  bufferedSource = Okio.buffer(source(responseBody.source()));
 }
 return bufferedSource;
 }
 private Source source(Source source) {
 return new ForwardingSource(source) {
  long bytesReaded = 0;
  @Override
  public long read(Buffer sink, long byteCount) throws IOException {
  long bytesRead = super.read(sink, byteCount);
  bytesReaded += bytesRead == -1 ? 0 : bytesRead;
  //实时发送当前已读取的字节和总字节
  RxBus.getInstance().post(new FileLoadEvent(contentLength(), bytesReaded));
  return bytesRead;
  }
 };
 }
}

呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象。

FileLoadEvent

FileLoadEvent很简单,包含了当前已加载进度和文件总大小。

public class FileLoadEvent {
 long total;
 long bytesLoaded;
 public long getBytesLoaded() {
 return bytesLoaded;
 }
 public long getTotal() {
 return total;
 }
 public FileLoadEvent(long total, long bytesLoaded) {
 this.total = total;
 this.bytesLoaded = bytesLoaded;
 }
}

RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情。

public class RxBus {
 private static volatile RxBus mInstance;
 private SerializedSubject<Object, Object> mSubject;
 private HashMap<String, CompositeSubscription> mSubscriptionMap;
 /**
 * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
 * Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,
 * 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。
 */
 private RxBus() {
 mSubject = new SerializedSubject<>(PublishSubject.create());
 }
 /**
 * 单例 双重锁
 * @return
 */
 public static RxBus getInstance() {
 if (mInstance == null) {
  synchronized (RxBus.class) {
  if (mInstance == null) {
   mInstance = new RxBus();
  }
  }
 }
 return mInstance;
 }
 /**
 * 发送一个新的事件
 * @param o
 */
 public void post(Object o) {
 mSubject.onNext(o);
 }
 /**
 * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
 * @param type
 * @param <T>
 * @return
 */
 public <T> Observable<T> tObservable(final Class<T> type) {
 //ofType操作符只发射指定类型的数据,其内部就是filter+cast
 return mSubject.ofType(type);
 }
 public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
 return tObservable(type)
  .onBackpressureBuffer()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(next, error);
 }
 public void addSubscription(Object o, Subscription subscription) {
 if (mSubscriptionMap == null) {
  mSubscriptionMap = new HashMap<>();
 }
 String key = o.getClass().getName();
 if (mSubscriptionMap.get(key) != null) {
  mSubscriptionMap.get(key).add(subscription);
 } else {
  CompositeSubscription compositeSubscription = new CompositeSubscription();
  compositeSubscription.add(subscription);
  mSubscriptionMap.put(key, compositeSubscription);
  // Log.e("air", "addSubscription:订阅成功 " );
 }
 }
 public void unSubscribe(Object o) {
 if (mSubscriptionMap == null) {
  return;
 }
 String key = o.getClass().getName();
 if (!mSubscriptionMap.containsKey(key)) {
  return;
 }
 if (mSubscriptionMap.get(key) != null) {
  mSubscriptionMap.get(key).unsubscribe();
 }
 mSubscriptionMap.remove(key);
 //Log.e("air", "unSubscribe: 取消订阅" );
 }
}

FileCallBack

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了。

public abstract class FileCallBack<T> {
 private String destFileDir;
 private String destFileName;
 public FileCallBack(String destFileDir, String destFileName) {
 this.destFileDir = destFileDir;
 this.destFileName = destFileName;
 subscribeLoadProgress();
 }
 public abstract void onSuccess(T t);
 public abstract void progress(long progress, long total);
 public abstract void onStart();
 public abstract void onCompleted();
 public abstract void onError(Throwable e);
 public void saveFile(ResponseBody body) {
 InputStream is = null;
 byte[] buf = new byte[2048];
 int len;
 FileOutputStream fos = null;
 try {
  is = body.byteStream();
  File dir = new File(destFileDir);
  if (!dir.exists()) {
  dir.mkdirs();
  }
  File file = new File(dir, destFileName);
  fos = new FileOutputStream(file);
  while ((len = is.read(buf)) != -1) {
  fos.write(buf, 0, len);
  }
  fos.flush();
  unsubscribe();
  //onCompleted();
 } catch (FileNotFoundException e) {
  e.printStackTrace();
 } catch (IOException e) {
  e.printStackTrace();
 } finally {
  try {
  if (is != null) is.close();
  if (fos != null) fos.close();
  } catch (IOException e) {
  Log.e("saveFile", e.getMessage());
  }
 }
 }
 /**
 * 订阅加载的进度条
 */
 public void subscribeLoadProgress() {
 Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent.class, new Action1<FileLoadEvent>() {
  @Override
  public void call(FileLoadEvent fileLoadEvent) {
  progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());
  }
 }, new Action1<Throwable>() {
  @Override
  public void call(Throwable throwable) {
  //TODO 对异常的处理
  }
 });
 RxBus.getInstance().addSubscription(this, subscription);
 }
 /**
 * 取消订阅,防止内存泄漏
 */
 public void unsubscribe() {
 RxBus.getInstance().unSubscribe(this);
 }
}

开始下载

使用自己的ProgressResponseBody

通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度。

public class ProgressInterceptor implements Interceptor {
 @Override
 public Response intercept(Chain chain) throws IOException {
 Response originalResponse = chain.proceed(chain.request());
 return originalResponse.newBuilder()
  .body(new ProgressResponseBody(originalResponse.body()))
  .build();
 }
}

构建Retrofit

@Module
public class ApiModule {
 @Provides
 @Singleton
 public OkHttpClient provideClient() {
 OkHttpClient client = new OkHttpClient.Builder()
  .addInterceptor(new ProgressInterceptor())
  .build();
 return client;
 }
 @Provides
 @Singleton
 public Retrofit provideRetrofit(OkHttpClient client){
 Retrofit retrofit = new Retrofit.Builder()
  .client(client)
  .baseUrl(Constant.HOST)
  .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
  .addConverterFactory(GsonConverterFactory.create())
  .build();
 return retrofit;
 }
 @Provides
 @Singleton
 public ApiInfo provideApiInfo(Retrofit retrofit){
 return retrofit.create(ApiInfo.class);
 }
 @Provides
 @Singleton
 public ApiManager provideApiManager(Application application, ApiInfo apiInfo){
 return new ApiManager(application,apiInfo);
 }
}

请求接口

public interface ApiInfo {
 @Streaming
 @GET
 Observable<ResponseBody> download(@Url String url);
}

执行请求

public void load(String url, final FileCallBack<ResponseBody> callBack){
 apiInfo.download(url)
  .subscribeOn(Schedulers.io())//请求网络 在调度者的io线程
  .observeOn(Schedulers.io()) //指定线程保存文件
  .doOnNext(new Action1<ResponseBody>() {
   @Override
   public void call(ResponseBody body) {
   callBack.saveFile(body);
   }
  })
  .observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui
  .subscribe(new FileSubscriber<ResponseBody>(application,callBack));
 }

在presenter层中执行网络请求。

通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作。

public void load(String url){
 String fileName = "app.apk";
 String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();
 Log.e(TAG, "load: "+fileStoreDir.toString() );
 FileCallBack<ResponseBody> callBack = new FileCallBack<ResponseBody>(fileStoreDir,fileName) {
  @Override
  public void onSuccess(final ResponseBody responseBody) {
  Toast.makeText(App.getInstance(),"下载文件成功",Toast.LENGTH_SHORT).show();
  }
  @Override
  public void progress(long progress, long total) {
  iHomeView.update(total,progress);
  }
  @Override
  public void onStart() {
  iHomeView.showLoading();
  }
  @Override
  public void onCompleted() {
  iHomeView.hideLoading();
  }
  @Override
  public void onError(Throwable e) {
  //TODO: 对异常的一些处理
  e.printStackTrace();
  }
 };
 apiManager.load(url, callBack);
 }

踩到的坑。

依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊。

保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程。

总结

看似代码很多,其实过程并不复杂:

在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象。

FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新。

在下载保存文件完成后,取消订阅,防止内存泄漏。

Demo地址:https://github.com/AirMiya/DownloadDemo

(0)

相关推荐

  • RxJava2.x+ReTrofit2.x多线程下载文件的示例代码

    写在前面: 接到公司需求:要做一个apk升级的功能,原理其实很简单,百度也一大堆例子,可大部分都是用框架,要么就是HttpURLConnection,实在是不想这么干.正好看了两天的RxJava2.x+ReTrofit2.x,据说这俩框架是目前最火的异步请求框架了.固本文使用RxJava2.x+ReTrofit2.x实现多线程下载文件的功能. 如果对RxJava2.x+ReTrofit2.x不太了解的请先去看相关的文档. 大神至此请无视. 思路分析: 思路及其简洁明了,主要分为以下四步 1.获取

  • Android中的Retrofit+OkHttp+RxJava缓存架构使用

    RxJava如何与Retrofit结合 先扔出build.gradle文件的内容 dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) testCompile 'junit:junit:4.12' compile 'com.android.support:appcompat-v7:23.2.0' compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxand

  • rxjava+retrofit实现多图上传实例代码

    在看了网上多篇rxjava和retrofit的文章后,大概有了一个初步的认识,刚好要做一个多图上传的功能,就拿它开刀吧.下面的内容将基于之前实现方式和使用rxjava实现之间的异同展开,初次写笔记不喜就喷. 普通版多图上传 由于目前手机照片动辄几M的大小,如果不做处理就直接上传,我就笑笑不说话(给个眼神你自己体会).所以,上传分为两步:对图片进行压缩和请求上传.下面请看伪代码(PS:自己不会写后台,项目后台不能拿来用,所以只能给伪代码了) //图片集合 List<String> imgs =

  • Retrofit Rxjava实现图片下载、保存并展示实例

    首先我们看一下Retrofit常规的用法,在不使用Rxjava的情况下,我们默认返回的是Call. public interface ServiceApi { //下载文件 @GET Call<ResponseBody> downloadPicFromNet(@Url String fileUrl); } 但是如果我们要配合Rxjava使用,那么就要按照如下方式来重新定义我们的方法: @GET Observable<ResponseBody> downloadPicFromNet(

  • Android Retrofit和Rxjava的网络请求

    Android  Retrofit和Rxjava的网络请求 去年的时候好多公司就已经使用Rxjava和Retrofit了,最近自自己学习了一下,感觉真的是很好用,让自己的网络请求变得更简单了,而且封装性极强. 首先做一下准备工作,导入需要引用的文件 compile 'com.android.support:appcompat-v7:25.1.0' testCompile 'junit:junit:4.12' compile 'io.reactivex:rxjava:1.1.0' compile

  • Retrofit+Rxjava下载文件进度的实现

    前言 最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义.点我查看. 准备工作 本文采用Dagger2,Retrofit,RxJava. compile'com.squareup.retrofit2:retrofit:2.0.2' compile'com.squareup.retrofit2:con

  • 基于Retrofit+Rxjava实现带进度显示的下载文件

    本文实例为大家分享了Retrofit Rxjava实现下载文件的具体代码,供大家参考,具体内容如下 本文采用 :retrofit + rxjava 1.引入: //rxJava compile 'io.reactivex:rxjava:latest.release' compile 'io.reactivex:rxandroid:latest.release' //network - squareup compile 'com.squareup.retrofit2:retrofit:latest

  • Retrofit+RxJava实现带进度下载文件

    Retrofit+RxJava已经是目前市场上最主流的网络框架,使用它进行平常的网络请求异常轻松,之前也用Retrofit做过上传文件和下载文件,但发现:使用Retrofit做下载默认是不支持进度回调的,但产品大大要求下载文件时显示下载进度,那就不得不深究下了. 接下来我们一起封装,使用Retrofit+RxJava实现带进度下载文件. github:JsDownload 先来看看UML图: 大家可能还不太清楚具体是怎么处理的,别急,我们一步步来: 1.添依赖是必须的啦 compile 'io.

  • Retrofit+Rxjava实现文件上传和下载功能

    Retrofit简介: 在Android API4.4之后,Google官方使用了square公司推出的okHttp替换了HttpClient的请求方式.后来square公司又推出了基于okHttp的网络请求框架:Retrofit. 什么是 RxJava? RxJava 是一个响应式编程框架,采用观察者设计模式.所以自然少不了 Observable 和 Subscriber 这两个东东了. RxJava 是一个开源项目,地址:https://github.com/ReactiveX/RxJava

  • Retrofit+RxJava实现带进度条的文件下载

    项目中需要使用到更新版本,因此研究了一下Retrofit的下载文件,和进度条效果,其间也遇到了一些坑,写出来加深一下记忆,也为别的同学提供一下思路. 先说一下版本控制吧,通用做法基本上是通过接口获取服务器存储的app版本号,与应用的版本号进行比较,版本较低就去更新,先看一下如何获取应用版本号吧 PackageManager packageManager = mActivity.getPackageManager(); PackageInfo packageInfo = null; try { p

  • Python 给下载文件显示进度条和下载时间的实现

    大家在下载文件时能够显示下载进度和时间非常好,其实实现它方法很简单,这里我写了个进度条的模块,其中还附带上了运行时间也就是下载时间了. 该模块调用了三个库: 1.os 2.requests 3.time 话不多说,先上代码!!!. # 进度条模块 def progressbar(url,path): if not os.path.exists(path): # 看是否有该文件夹,没有则创建文件夹 os.mkdir(path) start = time.time() #下载开始时间 respons

  • 使用Retrofit下载文件并实现进度监听的示例

    1.前言 最近要做一个带进度条下载文件的功能,网上看了一圈,发现好多都是基于 OkHttpClient 添加拦截器来实现的,个人觉得略显复杂,所以还是采用最简单的方法来实现:基于文件写入来进行进度的监听. 2.实现步骤 2.1 设计监听接口 根据需求设计一下接口: public interface DownloadListener { void onStart();//下载开始 void onProgress(int progress);//下载进度 void onFinish(String p

  • C# Winform下载文件并显示进度条的实现代码

    方法一: 效果如下图所示: 代码如下: using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; namespace WinShowDown { public partial class F

  • python下载文件时显示下载进度的方法

    本文实例讲述了python下载文件时显示下载进度的方法.分享给大家供大家参考.具体分析如下: 将这段代码放入你的脚本中,类似:urllib.urlretrieve(getFile, saveFile, reporthook=report) 第三个参数如下面的函数定义report,urlretrieve下载文件时会实时回调report函数,显示下载进度 def report(count, blockSize, totalSize): percent = int(count*blockSize*10

随机推荐