RxJava2 线程调度的方法

subscribeOn和observeOn负责线程切换,同时某些操作符也默认指定了线程.

我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.

subscribeOn

Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

 @Override
  public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    //调用下游的Observer的onSubscribe方法
    observer.onSubscribe(parent);
    //通过SubscribeTask执行了上游Observable的subscribeActual方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }

scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask本身是Runnable的实现类.看一下其run方法.

    @Override
    public void run() {
      //上游的Observable.subscribe方法被切换到了新的线程
      source.subscribe(parent);
    }

首先可以得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.

如果多次调用subscribeOn切换线程,会有什么效果?

由下往上,每次调用subscribeOn,都会导致上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeActual的执行线程.如果操作符有默认执行线程怎么办?

操作符默认线程

如果是创建型操作符,处于最上游,那么subscribeOn的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.
如果是其它操作符,会是怎样的?

以操作符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserver

 @Override
    public void onNext(T t) {
      downstream.onNext(t);
      //超时计时
      startTimeout(idx + 1);
    }

    void startTimeout(long nextIndex) {
      //交给操作符默认的线程执行
      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
      }
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }
//TimeoutTask.java
static final class TimeoutTask implements Runnable {

    @Override
    public void run() {
      parent.onTimeout(idx);
    }
  }

可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onError方法..操作符默认线程对下游的observer造成什么影响要做具体对待.

observeOn

observeOn对应ObservableObserveOnObserveOnObserver.

 //ObservableObserveOn.java
 @Override
  protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
    } else {
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
  }
 //ObserveOnObserver.java
  @Override
    public void onSubscribe(Disposable d) {
      if (DisposableHelper.validate(this.upstream, d)) {
        if (d instanceof QueueDisposable) {
          if (m == QueueDisposable.SYNC) {
          //执行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
           //执行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            return;
          }
        }
         //执行下游Observer的onSubscribe方法
        downstream.onSubscribe(this);
      }
    }
    @Override
    public void onNext(T t) {
     //省略
      schedule();
    }
    @Override
    public void onError(Throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getAndIncrement() == 0) {
      /*
      ObserveOnObserver是Runnable的实现类.交给线程池执行
      */
        worker.schedule(this);
      }
    }

    void drainNormal() {
      final Observer<? super T> a = downstream;
      for (;;) {
        for (;;) {
          T v;
          try {
            v = q.poll();
          } catch (Throwable ex) {
            a.onError(ex);
            return;
          }
          //执行下游Observer的onNext方法
          a.onNext(v);
        }
      }
    }

    void drainFused() {
      for (;;) {
        if (!delayError && d && ex != null) {
          //执行下游Observer的onError方法
          downstream.onError(error);
          return;
        }
        downstream.onNext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //执行下游Observer的onError方法
            downstream.onError(ex);
          } else {
            //执行下游Observer的onComplete方法
            downstream.onComplete();
          }
          return;
        }
      }
    }
    //执行线程任务
    @Override
    public void run() {
      if (outputFused) {
        drainFused();
      } else {
        drainNormal();
      }
    }

从上面可以看出ObservableObserveOn在其subscribeActual方法中并没有切换上游Observable的subscribe方法的执行线程.但是ObserveOnObserver在其onNext,onError和onComplete中通过schedule()方法将下游Observer的各个方法切换到了新的线程.

得出结论: observeOn负责切换的是下游Observer的各个方法的执行线程

如果下游多次通过observeOn切换线程,会有什么效果?

每次切换都会对其下游造成影响,直到遇到下一个observeOn为止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不同.
遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.所以onSubscribe方法无论如何都是在主线程执行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {

          }
        })

我们要看的是方法accept的执行线程.

通过源码找到对应的DisposableLambdaObserver.

 @Override
  public void onSubscribe(Disposable d) {
  //在这里调用了accept方法.
      onSubscribe.accept(d);
  }

这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.

在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.

doFinally

对应ObservableDoFinally和DoFinallyObserver

 //DoFinallyObserver.java
 @Override
    public void onError(Throwable t) {
      runFinally();
    }

    @Override
    public void onComplete() {
      runFinally();
    }

    @Override
    public void dispose() {
      runFinally();
    }

     void runFinally() {
       onFinally.run();
    }

可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.

doOnError

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

和自身对应的observer.onError所在线程保持一致.

doOnNext

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onNext(T t) {
        onNext.accept(t);
    }

和自身对应的observer.onNext所在线程保持一致.

操作符对应方法参数的执行线程

包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.

总结:

subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • RxJava2配置及使用详解

    RxJava2.0是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致. 依赖: compile 'io.reactivex.rxjava2:rxjava:2.0.1' compile 'io.reactivex.rxjava2:rxandroid:2.0.1' 简单使用: //观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误 Observable.create(new O

  • RxJava2和Retrofit2封装教程(整洁、简单、实用)

    前言 RxJava2与Retrofit2是老搭档了,之前写了一篇<RxJava和Retrofit2的统一处理单个请求>,是用的Rxjava1.0,本次使用Rxjava2.0与Retrofit2进行封装,一样整洁.简单.实用.Rxjava2相比Rxjava1优化和改动不少了东西,网上有很多大神写的文章,这里就不粘贴复制了.封装的过程有什么问题.疑问,请在下方留言. 下面话不多说了,来一起看看详细的介绍吧 封装教程如下: 核心网络请求: package com.lin.netrequestdemo

  • Rxjava2_Flowable_Sqlite_Android数据库访问实例

    一.使用Rxjava访问数据库的优点: 1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程 2.随时订阅和取消订阅,而不必再使用回调函数 3.对读取的数据用rxjava进行过滤,流式处理 4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架 (有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果, 同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘) 二.接下来之关注实现过程: 本次实现用rx

  • RxJava2.x+ReTrofit2.x多线程下载文件的示例代码

    写在前面: 接到公司需求:要做一个apk升级的功能,原理其实很简单,百度也一大堆例子,可大部分都是用框架,要么就是HttpURLConnection,实在是不想这么干.正好看了两天的RxJava2.x+ReTrofit2.x,据说这俩框架是目前最火的异步请求框架了.固本文使用RxJava2.x+ReTrofit2.x实现多线程下载文件的功能. 如果对RxJava2.x+ReTrofit2.x不太了解的请先去看相关的文档. 大神至此请无视. 思路分析: 思路及其简洁明了,主要分为以下四步 1.获取

  • 基于Retrofit2+RxJava2实现Android App自动更新

    本文实例为大家分享了Retrofit2 RxJava2实现Android App自动更新,具体内容如下 功能解析 自动更新可以说已经是App的标配了,很多第三方平台也都支持这个功能,最近手头上的项目需要加入这个App自动更新,考虑到项目里有用到Retrofit2和RxJava2,于是打算使用它俩自己实现这个功能. 分析App自动更新,可以分为以下三个功能点: 1.APK文件的下载 2.下载进度的实时更新显示 3.下载完成后的自动安装 其中比较难的一点是下载进度的实时更新显示,更难的是如何优雅的进

  • RxJava2 线程调度的方法

    subscribeOn和observeOn负责线程切换,同时某些操作符也默认指定了线程. 我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程. subscribeOn Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象. 主要看一下ObservableSubscribeOn的subscribeActual方法. @Override public void subscribeActual(final Observer<? s

  • iOS NSThread和NSOperation的基本使用详解

    NSThread适合简单的耗时任务的执行,它有两种执行方法 - (void)oneClick{ [NSThread detachNewThreadSelector:@selector(doSomething:) toTarget:self withObject:@"oneClick"]; } -(void)doSomething:(NSString*) str{ NSLog(@"%@",str); } - (void)twoClick{ NSThread* myTh

  • C# 中如何使用Thread

    线程是进程中的最小执行单元,多线程是指在给定时间内拥有多个线程的能力,并且可以调度它们从而在某一时刻处理多个操作,微软的 .Net Framework 提供了 Thread 来帮助我们完成多线程开发. Thread 编程 要想使用 Thread,需要在程序中引用 System.Threading 命名空间,然后再提供一个供线程调度的方法,这个方法是通过 Thread 中的 ThreadStart 委托代理的,下面的代码展示了如何创建线程. Thread t = new Thread(new Th

  • 基于RxJava2实现的简单图片爬虫的方法

    今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片.刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序.它可以用于抓取单张图片.多张图片.某个网页下的所有图片.多个网页下的所有图片. github地址:https://github.com/fengzhizi715/PicCrawler 这个爬虫使用了HttpClient.RxJava2以及Java 8的一些特性.它支持一些简单的定制,比如定制User-A

  • 谈谈RxJava2中的异常及处理方法

    前言 众所周知,RxJava2 中当链式调用中抛出异常时,如果没有对应的 Consumer 去处理异常,则这个异常会被抛出到虚拟机中去,Android 上的直接表现就是 crash,程序崩溃. 订阅方式 说异常处理前咱们先来看一下 RxJava2 中 Observable 订阅方法 subscribe() 我们常用的几种订阅方式: // 1 subscribe() // 2 Disposable subscribe(Consumer<? super T> onNext) // 3 Dispos

  • Rxjava功能操作符的使用方法详解

    Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者, 被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据 下面把Rxjava常用的模型代码列出来,还有一些操作符的运用: 依赖: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' // Because RxAndroid releases are few and far between, it is recommended you also /

  • java 多线程的几种实现方法总结

    java 多线程的几种实现方法总结 1.多线程有几种实现方法?同步有几种实现方法? 多线程有两种实现方法,分别是继承Thread类与实现Runnable接口 同步的实现方面有两种,分别是synchronized,wait与notify wait():使一个线程处于等待状态,并且释放所持有的对象的lock. sleep():使一个正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要捕捉InterruptedException异常. notify():唤醒一个处于等待状态的线程,注意的是在调用此

  • Android 用RxBinding与RxJava2实现短信验证码倒计时功能

    场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现与功能都不难,这次用 RxBinding,RxJava2 的方法去实现.并实现了手动.自动停止倒计时,防止多次点击. 功能动态图 要使用 RxBinding.RxJava2 先添加 Gradle 配置: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxj

随机推荐