RxJava中多种场景的实现总结

一、推迟执行动作

可以使用timer+map方法实现.代码如下:

Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }

二、推迟发送执行的结果

这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的.

这种场景可以使用Observable.zip来实现.

zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

对于各个observable相同位置的数据,需要相互等待,也就说,第一个observable第一个位置的数据产生后,要等待第二个observable第一个位置的数据产生,等各个Observable相同位置的数据都产生后,才能按指定规则进行组合.这真是我们要利用的.

zip有很多种声明,但大致上是一样的,就是传入几个observable,然后指定一个规则,对每个observable对应位置的数据进行处理,产生一个新的数据, 下面是其中一个最简单的:

 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);

用zip实现推送发送执行结果如下:

 Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
         ,Observable.just(doSomething()), (x,y)->y)
   .subscribe(System.out::println));

三、使用defer在指定线程里执行某种动作

如下面的代码,虽然我们指定了线程的运行方式,但是doSomething()这个函数还是在当前代码调用的线程中执行的.

 Observable.just(doSomething())
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .subscribe(v->Utils.printlnWithThread(v.toString()););

通常我们采用下面的方法达到目的:

 Observable.create(s->{s.onNext(doSomething());})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{
     Utils.printlnWithThread(v.toString());
  });

但其实我们采用defer也能达到相同的目的.

关于defer

defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。

声明:

 public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

defer的Func0里的Observable是在订阅(subscribe)的时候才创建的.

作用:

Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

也就说observable是在订阅的时候才创建的.

上面的问题用defer实现:

 Observable.defer(()->Observable.just(doSomething()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
  });

四、使用compose不要打断链式结构

我们经常看到下面的代码:

 Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());

上面的代码中,subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一样的, 如果我们打算把它统一在某一个地方实现, 我们可以这么写:

 private static <T> Observable<T> applySchedulers(Observable<T> observable) {
  return observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

但是这样每次我们需要调用上面的方法, 大致会像下面这样,最外面是一个函数,等于打破了链接结构:

 applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
   @Override public Data call(Data data) {
   return manipulate(data);
   }
  })
 ).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
  doSomething(data);
  }
 });

可以使用compose操作符达到不打破链接结构的目的.

compose的申明如下:

 public Observable compose(Transformer<? super T, ? extends R> transformer);

它的入参是一个Transformer接口,输出是一个Observable. 而Transformer实际上就是一个Func1<Observable<T>, Observable<R>> ,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable.

简单的说,compose可以通过指定的转化方式(输入参数transformer),将原来的observable转化为另外一种Observable.

通过compose, 采用下面方式指定线程方式:

 private static <T> Transformer<T, T> applySchedulers() {
   return new Transformer<T, T>() {
    @Override
    public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
       .observeOn(Schedulers.computation());
    }
   };
  }

 Observable.just(doSomething()).compose(applySchedulers())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
   });

函数applySchedulers可以使用lambda表达式进一步简化为下面为:

 private static <T> Transformer<T, T> applySchedulers() {
  return observable->observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

五、按优先级使用不同的执行结果

上面这个标题估计没表达清楚我想表达的场景. 其实我想表达的场景类似于平常的获取网络数据场景:如果缓存有,从缓存获取,如果没有,再从网络获取.

这里要求,如果缓存有,不会做从网络获取数据的动作.

这个可以采用concat+first实现.

concat将几个Observable合并成一个Observable,返回最终的一个Observable. 而那些数据就像从一个Observable发出来一样. 参数可以是多个Observable,也可以是包含Observalbe的Iterator.

新的observable内的数据排列按原来concat里的observable顺序排列,即新结果内的数据是按原来的顺序排序的.

下面是上述需求的实现:

 Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
    .subscribe(v->System.out.println("result:"+v));
 //从缓存获取数据
 private static Observable<String> getDataFromCache(){
  return Observable.create(s -> {
   //dosomething to get data
   int value = new Random().nextInt();
   value = value%2;
   if (value!=0){
    s.onNext("data from cache:"+value); //产生数据
   }
   //s.onError(new Throwable("none"));
   s.onCompleted();
  }
    );
 }
 //从网络获取数据
 private static Observable<String> getDataFromNetwork(){
  return Observable.create(s -> {
   for (int i = 0; i < 10; i++) {
    Utils.println("obs2 generate "+i);
    s.onNext("data from network:" + i); //产生数据
   }
   s.onCompleted();
  }
    );
 }

上面的实现,如果getDataFromCache有数据, getDataFromNetwork这里的代码是不会执行的, 这正是我们想要的.

上面实现有几个需要注意:

1、有可能从两个地方都获取不到数据, 这种场景下使用first会抛出异常NoSuchElementException,如果是这样的场景,需要用firstOrDefault替换上面的first.

2、上面getDataFromCache()里,如果没有数据,我们直接调用onCompleted,如果不调用onCompleted,而是调用onError,则上述采用concat是得不到任何结果的.因为concat在收到任何一个error,合并就会停止.所以,如果要用onError, 则需要用concatDelayError替代concat.concatDelayError会先忽略error,将error推迟到最后在处理.

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。

(0)

相关推荐

  • 简单谈谈RxJava和多线程并发

    前言 相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的. RxJava与并发 首先让我们来看一段RxJava协议的原文: Observables must issue notifications to observers serially (not in parallel). They may issue these notificat

  • RxJava入门之介绍与基本运用

    前言 因为这个RxJava内容不算少,而且应用场景非常广,所以这个关于RxJava的文章我们会陆续更新,今天就来先来个入门RxJava吧 初识RxJava 什么是Rx 很多教程在讲解RxJava的时候,上来就介绍了什么是RxJava.这里我先说一下什么是Rx,Rx就是ReactiveX,官方定义是: Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序 看到这个定义我只能呵呵,稍微通俗点说是这样的: Rx是微软.NET的一个响应式扩展.Rx借助可观测的序

  • Java的RxJava库操作符的用法及实例讲解

    操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件.RxJava提供了很多很有用的操作符. 比如map操作符,就是用来把把一个事件转换为另一个事件的. Observable.just("Hello, world!") .map(new Func1<String, String>() { @Override public String call(String s) { r

  • RxJava入门指南及其在Android开发中的使用示例

    RxJava的GitHub主页,部署部分就没什么好说的了~ https://github.com/ReactiveX/RxJava 基础 RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者).Observables发出一系列事件,Subscribers处理这些事件.这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据...) 一个Observable可以发出零个或者多个事件,知道结束或者出错.每发出一个事件,就会调用它的Su

  • RxJava中多种场景的实现总结

    一.推迟执行动作 可以使用timer+map方法实现.代码如下: Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{ return doSomething(); }).subscribe(System.out::println); } 二.推迟发送执行的结果 这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的. 这种场景可以使用Observable.zip来实现. zip操作符将多个Observable发射

  • SpringBoot多种场景传参模式

    目录 web技术 多种传参方式 传统参数传递 复杂对象映射 数组和集合类型参数 Restful风格传参 web技术 我们知道常见的web技术也就是网站开发,分为静态网站,和动态网站,动态网站技术常见的有三种,分别是 jsp java web,asp c# web,php web但是它们对应请求request,响应response 都是一样的我们用java web开发动态网站用的mvc框架就是,springmvc,当然我们现在用的是springboot 它只是对spirng全家桶的一个整合框架,他

  • Vue实现组件间通信的几种方式(多种场景)

    目录 1.Props 父 >>> 子  (Props) 子 >>> 父 ($emit) 2.Bus事件总线 3.Vuex状态管理库 4.Router 5.缓存 以下是我在开发中用到过的vue组件之间的通信方式,不同的场景使用不同的方式,基本满足所有开发场景中的通信需求,从最简单的事例着手,讲述如何使用,话不多说直接开始,满满的干货,建议看完. 1.Props 父 >>> 子  (Props) 一个组件里面引入另外一个组件,此时构成了一种"父子

  • 浅谈spring使用策略模式实现多种场景登录方式

     @Autowired注解可以帮我们自动注入我们想要的 Bean. 如果只是简单使用@Autowired会遇到spring IOC容器中一个接口有多个实现的情况,spring无法识别具体的实现类,如果不是策略模式,我们可以进行具体的指定@Qualifier和@primary来避免bean冲突的情况.但在策略模式中是不行的. 而除了这个基本功能之外, @Autowired 还有更加强大的功能,还可以注入指定类型的数组,List/Set 集合,甚至还可以是 Map 对象. 为每个具体的实现类添加了一

  • JS中多种方式创建对象详解

    1.内置对象创建 var girl=new Object(); girl.name='hxl'; console.log(typeof girl); 2.工厂模式,寄生构造函数模式 function Person(name){ var p=new Object();//内部进行实例化 p.name=name; p.say=function(){ console.log('my name is '+ p.name); } return p;//注:一定要返回 } var girl=Person('

  • 详解ListView中多种item的实现方式

    大家都知道在实际开发时,对ListView的使用比较频繁,其表现也非常复杂.本文将通过实例介绍ListView中多种item的实现方式,下面来一起看看吧. 使用ListView一般步骤: 设置显示的ListView,设置显示的每一项item的view布局文件 设置每个item显示的数据 将数据显示的View中,继承BaseAdapter,重写getCount() , getItemId() , getItem() , getView()这个四个方法: 如果实现ListView的多种类型item的

  • 使用RxJava中遇到的一些”坑“

    前言 大家越用RxJava,越觉得它好用,所以不知不觉地发现代码里到处都是RxJava的身影.然而,RxJava也不是银弹,其中仍然有很多问题需要解决.这里,我简单地总结一下自己遇到的一些"坑",内容上可能会比较松散. 一.考虑主线程的切换 RxJava中一个常用的使用方法是--在其他线程中做处理,然后切换到UI线程中去更新页面.其中,线程切换就是使用了observeOn().后台下载文件,前台显示下载进度就可以使用这种方式完成.然而,实践发现这其中有坑.如果文件比较大,而下载包的粒度

  • 关于Yii中模型场景的一些简单介绍

    前言 在Yii中模型字段验证有一个场景的概念,可以在不同的场景下设置不同的验证规则,在Yii中的场景默认为default,简单实现如下 下面我以用户表,表中字段为user_name,password 简单规则如下 public function rules() { return [ [['user_name', 'password'], 'required'], [['user_name', 'password'], 'string', 'max' => 255], ]; } 一: 如果我们需要

  • Java各种锁在工作中使用场景和细节经验总结

    目录 1.synchronized 1.1.共享资源初始化 2.CountDownLatch 2.1.场景 2.2.实现 3.总结 1.synchronized synchronized 是可重入的排它锁,和 ReentrantLock 锁功能相似,任何使用 synchronized 的地方,几乎都可以使用 ReentrantLock 来代替,两者最大的相似点就是:可重入 + 排它锁,两者的区别主要有这些: ReentrantLock 的功能更加丰富,比如提供了 Condition,可以打断的加

  • Redis中秒杀场景下超时与超卖问题的解决方案

    目录 超时 1.redis连接超时原因 2.解决方法 超卖 1.秒杀超卖现象 2.解决方案 (1)利用乐观锁淘汰用户,解决超卖问题 (2).使用reids的 watch + multi + setnx 指令实现 在开发过程中高并发问题是很棘手的一个问题(对于博主这样的小菜鸡来说),当我们学习redis之前,知道redis是单线程运行的所以任务不会出现线程不安全问题.当我们在linux中使用ab来模拟高并发秒杀时可能会遇到两种问题,“超时和超卖”. 超时 1.redis连接超时原因 (1)虚拟机中

随机推荐