谈谈RxJava2中的异常及处理方法

前言

众所周知,RxJava2 中当链式调用中抛出异常时,如果没有对应的 Consumer 去处理异常,则这个异常会被抛出到虚拟机中去,Android 上的直接表现就是 crash,程序崩溃。

订阅方式

说异常处理前咱们先来看一下 RxJava2 中 Observable 订阅方法 subscribe() 我们常用的几种订阅方式:

// 1
subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext)
// 3
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
// 4
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
// 5
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
// 6
void subscribe(Observer<? super T> observer)

无参和以 Consumer为参数的几种方法内部都是以默认参数补齐的方式最终调用第 5 个方法,而方法 5 内部通过 LambdaObserver 将参数包装成 Observer 再调用第 6 个方法

 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
 Action onComplete, Consumer<? super Disposable> onSubscribe) {
 ObjectHelper.requireNonNull(onNext, "onNext is null");
 ObjectHelper.requireNonNull(onError, "onError is null");
 ObjectHelper.requireNonNull(onComplete, "onComplete is null");
 ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

 subscribe(ls);

 return ls;
 }

所以使用 Consumer 参数方式和  Observer 参数方式进行订阅除了观察回调来源不一样其他没有任何差别。但就是因为这种差别,在异常情况发生时的处理结果上也会产生差别

异常处理

我们分别进行一下几种方式模拟异常:

1、Observer onNext 中抛出异常(切换线程)

 apiService.newJsonKeyData()
  .doOnSubscribe { t -> compositeDisposable.add(t) }
  .compose(RxScheduler.sync()) // 封装的线程切换
  .subscribe(object : Observer<List<ZooData>> {
  override fun onComplete() {

  }

  override fun onSubscribe(d: Disposable) {

  }

  override fun onNext(t: List<ZooData>) {
  throw RuntimeException("runtime exception")
  }

  override fun onError(e: Throwable) {
  Log.d("error", e.message)
  }

  })

结果:不会触发 onError,App 崩溃

2、Observer onNext 中抛出异常(未切换线程)

  Observable.create<String> {
   it.onNext("ssss")
   }
    .subscribe(object : Observer<String> {
    override fun onComplete() {

    }

    override fun onSubscribe(d: Disposable) {

    }

    override fun onNext(t: String) {
     Log.d("result::", t)
     throw RuntimeException("run llllll")
    }

    override fun onError(e: Throwable) {
     Log.e("sss", "sss", e)
    }

    })

结果:会触发 onError,App 未崩溃

3、Observer map 操作符中抛出异常

  apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .map {
   throw RuntimeException("runtime exception")
   }
   .compose(RxScheduler.sync())
   .subscribe(object : Observer<List<ZooData>> {
   override fun onComplete() {

   }

   override fun onSubscribe(d: Disposable) {

   }

   override fun onNext(t: List<ZooData>) {

   }

   override fun onError(e: Throwable) {
    Log.d("error", e.message)
   }

   })

结果:会触发 Observer 的 onError,App 未崩溃

4、Consumer onNext 中抛出异常

  apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe({
   throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
   }, {
   Log.d("Error", it.message)
   })

结果 A:有 errorConsumer 触发 errorConsumer,App 未崩溃

 apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe {
   throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
   }

结果 B:无 errorConsumer,App 崩溃

那么为什么会出现这些不同情况呢?我们从源码中去一探究竟。

Consumer 订阅方式的崩溃与不崩溃

subscribe() 传入 consumer 类型参数最终在 Observable 中会将传入的参数转换为 LambdaObserver 再调用 subscribe(lambdaObserver)进行订阅。展开  LambdaObserver:(主要看 onNext 和 onError 方法中的处理)

		.
		.
		.
		 @Override
 public void onNext(T t) {
 if (!isDisposed()) {
  try {
  onNext.accept(t);
  } catch (Throwable e) {
  Exceptions.throwIfFatal(e);
  get().dispose();
  onError(e);
  }
 }
 }

 @Override
 public void onError(Throwable t) {
 if (!isDisposed()) {
  lazySet(DisposableHelper.DISPOSED);
  try {
  onError.accept(t);
  } catch (Throwable e) {
  Exceptions.throwIfFatal(e);
  RxJavaPlugins.onError(new CompositeException(t, e));
  }
 } else {
  RxJavaPlugins.onError(t);
 }
 }
		.
		.
		.

onNext 中调用了对应 consumer 的 apply() 方法,并且进行了 try catch。因此我们在 consumer 中进行的工作抛出异常会被捕获触发 LambdaObserver 的 onError。再看 onError 中,如果订阅未取消且 errorConsumer 的 apply() 执行无异常则能正常走完事件流,否则会调用 RxJavaPlugins.onError(t)。看到这里应该就能明白了,当订阅时未传入 errorConsumer时 Observable 会指定 OnErrorMissingConsumer 为默认的 errorConsumer,发生异常时抛出 OnErrorNotImplementedException。

RxJavaPlugins.onError(t)

上面分析,发现异常最终会流向 RxJavaPlugins.onError(t)。这个方法为 RxJava2 提供的一个全局的静态方法。

 public static void onError(@NonNull Throwable error) {
 Consumer<? super Throwable> f = errorHandler;

 if (error == null) {
  error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
 } else {
  if (!isBug(error)) {
  error = new UndeliverableException(error);
  }
 }

 if (f != null) {
  try {
  f.accept(error);
  return;
  } catch (Throwable e) {
  // Exceptions.throwIfFatal(e); TODO decide
  e.printStackTrace(); // NOPMD
  uncaught(e);
  }
 }

 error.printStackTrace(); // NOPMD
 uncaught(error);
 }

查看其源码发现,当 errorHandler 不为空时异常将由其消耗掉,为空或者消耗过程产生新的异常则 RxJava 会将异常抛给虚拟机(可能导致程序崩溃)。 errorHandler本身是一个 Consumer 对象,我们可以通过如下方式配置他:

 RxJavaPlugins.setErrorHandler(object : Consumer1<Throwable> {
 override fun accept(t: Throwable?) {
  TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
 }

 })

数据操作符中抛出异常

以 map 操作符为例,map 操作符实际上 RxJava 是将事件流 hook 了另一个新的 Observable ObservableMap

 @CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
 ObjectHelper.requireNonNull(mapper, "mapper is null");
 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
 }

进入 ObservableMap 类,发现内部订阅了一个内部静态类 MapObserver,重点看 MapObserver  的 onNext 方法

 public void onNext(T t) {
  if (done) {
  return;
  }

  if (sourceMode != NONE) {
  downstream.onNext(null);
  return;
  }

  U v;

  try {
  v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
  } catch (Throwable ex) {
  fail(ex);
  return;
  }
  downstream.onNext(v);
 }

onNext 中 try catch 了 mapper.apply(),这个 apply 执行的就是我们在操作符中实现的 function 方法。因此在 map 之类数据变换操作符中产生异常能够自身捕获并发送给最终的 Observer。如果此时的订阅对象中能消耗掉异常则事件流正常走 onError() 结束,如果订阅方式为上以节中的 consumer,则崩溃情况为上一节中的分析结果。

Observer 的 onNext 中抛出异常

上述的方式 1 为一次网络请求,里面涉及到线程的切换。方式 2 为直接 create 一个 Observable 对象,不涉及线程切换,其结果为线程切换后,观察者 Observer 的 onNext() 方法中抛出异常无法触发 onError(),程序崩溃。

未切换线程的 Observable.create

查看 create() 方法源码,发现内部创建了一个 ObservableCreate 对象,在调用订阅时会触发 subscribeActual()  方法。在  subscribeActual() 中再调用我们 create 时传入的 ObservableOnSubscribe 对象的 subscribe() 方法来触发事件流。

 @Override
 protected void subscribeActual(Observer<? super T> observer) {

		// 对我们的观察者使用 CreateEmitter 进行包装,内部的触发方法是相对应的
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);

 try {
			// source 为 create 时创建的 ObservableOnSubscribe 匿名内部接口实现类
  source.subscribe(parent);
 } catch (Throwable ex) {
  Exceptions.throwIfFatal(ex);
  parent.onError(ex);
 }
 }

上述代码中的订阅过程是使用 try catch 今夕包裹的。订阅及订阅触发后发送的事件流都在一个线程,所以能够捕获整个事件流中的异常。(PS : 大家可以尝试下使用  observeOn() 切换事件发送线程。会发现异常不能再捕获,程序崩溃)

涉及线程变换时的异常处理

Retrofit 进行网络请求返回的 Observable 对象实质上是 RxJava2CallAdapter 中生成的 BodyObservable,期内部的 onNext 是没有进行异常捕获的。其实这里是否捕获并不是程序崩溃的根本原因,因为进行网络请求,必然是涉及到线程切换的。就算此处 try catch 处理了,也并不能捕获到事件流下游的异常。

 @Override public void onNext(Response<R> response) {
 if (response.isSuccessful()) {
 observer.onNext(response.body());
 } else {
 terminated = true;
 Throwable t = new HttpException(response);
 try {
  observer.onError(t);
 } catch (Throwable inner) {
  Exceptions.throwIfFatal(inner);
  RxJavaPlugins.onError(new CompositeException(t, inner));
 }
 }
 }

以我们在最终的 Observer 的 onNext 抛出异常为例,要捕获这次异常那么必须在最终的调用线程中去进行捕获。即 .observeOn(AndroidSchedulers.mainThread()) 切换过来的 Android 主线程。与其他操作符一样,线程切换时产生了一组新的订阅关系,RxJava 内部会创建一个新的观察对象 ObservableObserveOn。

 @Override
 public void onNext(T t) {
  if (done) {
  return;
  }

  if (sourceMode != QueueDisposable.ASYNC) {
  queue.offer(t);
  }
  schedule();
 }
		.
		.
		.
		void schedule() {
  if (getAndIncrement() == 0) {
  worker.schedule(this); // 执行 ObservableObserveOn 的 run 方法
  }
 }
		.
		.
		.
	 @Override
 public void run() {
  if (outputFused) {
  drainFused();
  } else {
  drainNormal();
  }
 }
	

而执行任务的 worker 即为对应线程 Scheduler 的对应实现子类所创建的 Worker,以 AndroidSchedulers.mainThread() 为例,Scheduler 实现类为 HandlerScheduler,其对应 Worker 为 HandlerWorker,最终任务交给 ScheduledRunnable 来执行。

 private static final class ScheduledRunnable implements Runnable, Disposable {
 private final Handler handler;
 private final Runnable delegate;

 private volatile boolean disposed; // Tracked solely for isDisposed().

 ScheduledRunnable(Handler handler, Runnable delegate) {
  this.handler = handler;
  this.delegate = delegate;
 }

 @Override
 public void run() {
  try {
  delegate.run();
  } catch (Throwable t) {
  RxJavaPlugins.onError(t);
  }
 }

 @Override
 public void dispose() {
  handler.removeCallbacks(this);
  disposed = true;
 }

 @Override
 public boolean isDisposed() {
  return disposed;
 }
 }

会发现,run 中 进行了 try catch。但 catch 内消化异常使用的是全局异常处理 RxJavaPlugins.onError(t);,而不是某一个观察者的 onError。所以在经过切换线程操作符后,观察者 onNext 中抛出的异常,onError 无法捕获。

处理方案

既然知道了问题所在,那么处理问题的方案也就十分清晰了。

1、注册全局的异常处理

 RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
  override fun accept(t: Throwable?) {
  // do something
  }

 })

2、Consumer 作为观察者时,不完全确定没有异常一定要添加异常处理 Consumer

 apiService.stringData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe(Consumer<Boolean>{ }, Consumer<Throwable> { })

3、Observer 可以创建一个 BaseObaerver 将 onNext 内部进行 try catch 人为的流转到 onError 中,项目中的观察这都使用这个 BaseObserver 的子类。

 @Override
 public void onNext(T t) {
 try {
  onSuccess(t);
 } catch (Exception e) {
  onError(e);
 }
 data = t;
 success = true;
 }

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解RxJava2 Retrofit2 网络框架简洁轻便封装

    前言 RxJava2.Retrofit2火了有一段时间了,前段时间给公司的项目引入了这方面相关的技术,在此记录一下相关封装的思路. 需求 封装之前要先明白需要满足哪些需求. RxJava2衔接Retrofit2 Retrofit2网络框架异常的统一处理 兼容fastjson(可选) RxJava2内存泄漏的处理 异步请求加入Loading Dialog 依赖 implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' implementation

  • Rxjava2_Flowable_Sqlite_Android数据库访问实例

    一.使用Rxjava访问数据库的优点: 1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程 2.随时订阅和取消订阅,而不必再使用回调函数 3.对读取的数据用rxjava进行过滤,流式处理 4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架 (有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果, 同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘) 二.接下来之关注实现过程: 本次实现用rx

  • Android 使用 RxJava2 实现倒计时功能的示例代码

    倒计时功能被广泛运用在 App 启动页.短信验证码倒计时等,通常做法是起一个Handler ,在子线程里完成倒计时,如今这一做法有了替代品 -- RxJava ,RxJava是被行内一致认可的第三方开源库,我们可以使用RxJava实现倒计时功能. 示例图: 示例代码: 导入必要的库文件(Android支持库和Reactivex系列支持库) implementation 'com.android.support:appcompat-v7:27.1.0' implementation 'com.an

  • RxJava2.x+ReTrofit2.x多线程下载文件的示例代码

    写在前面: 接到公司需求:要做一个apk升级的功能,原理其实很简单,百度也一大堆例子,可大部分都是用框架,要么就是HttpURLConnection,实在是不想这么干.正好看了两天的RxJava2.x+ReTrofit2.x,据说这俩框架是目前最火的异步请求框架了.固本文使用RxJava2.x+ReTrofit2.x实现多线程下载文件的功能. 如果对RxJava2.x+ReTrofit2.x不太了解的请先去看相关的文档. 大神至此请无视. 思路分析: 思路及其简洁明了,主要分为以下四步 1.获取

  • RxJava2和Retrofit2封装教程(整洁、简单、实用)

    前言 RxJava2与Retrofit2是老搭档了,之前写了一篇<RxJava和Retrofit2的统一处理单个请求>,是用的Rxjava1.0,本次使用Rxjava2.0与Retrofit2进行封装,一样整洁.简单.实用.Rxjava2相比Rxjava1优化和改动不少了东西,网上有很多大神写的文章,这里就不粘贴复制了.封装的过程有什么问题.疑问,请在下方留言. 下面话不多说了,来一起看看详细的介绍吧 封装教程如下: 核心网络请求: package com.lin.netrequestdemo

  • 基于RxJava2实现的简单图片爬虫的方法

    今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片.刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序.它可以用于抓取单张图片.多张图片.某个网页下的所有图片.多个网页下的所有图片. github地址:https://github.com/fengzhizi715/PicCrawler 这个爬虫使用了HttpClient.RxJava2以及Java 8的一些特性.它支持一些简单的定制,比如定制User-A

  • 基于Retrofit2+RxJava2实现Android App自动更新

    本文实例为大家分享了Retrofit2 RxJava2实现Android App自动更新,具体内容如下 功能解析 自动更新可以说已经是App的标配了,很多第三方平台也都支持这个功能,最近手头上的项目需要加入这个App自动更新,考虑到项目里有用到Retrofit2和RxJava2,于是打算使用它俩自己实现这个功能. 分析App自动更新,可以分为以下三个功能点: 1.APK文件的下载 2.下载进度的实时更新显示 3.下载完成后的自动安装 其中比较难的一点是下载进度的实时更新显示,更难的是如何优雅的进

  • RxJava2.x实现定时器的实例代码

    前言 由于现在网络层已经升级到RxJava2.x相关的了,所以需要做些调整.虽然RxJava1.x和RxJava2.x同属RxJava系列,但由于RxJava2.x部分代码的重写,导致RxJava2.x与RxJava1.x已是两个不同的版本,RxJava2.x在性能上更优,尤其在背压支持上.当然,此篇重点不在Rx版本上的区别,有兴趣的同学可以自行研究.当然,2.x之于1.x的区别之一是2.x中已经没有 Subscription mSubscription, Observable.create()

  • RxJava2配置及使用详解

    RxJava2.0是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致. 依赖: compile 'io.reactivex.rxjava2:rxjava:2.0.1' compile 'io.reactivex.rxjava2:rxandroid:2.0.1' 简单使用: //观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误 Observable.create(new O

  • Android 用RxBinding与RxJava2实现短信验证码倒计时功能

    场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现与功能都不难,这次用 RxBinding,RxJava2 的方法去实现.并实现了手动.自动停止倒计时,防止多次点击. 功能动态图 要使用 RxBinding.RxJava2 先添加 Gradle 配置: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxj

随机推荐