RxJava中map和flatMap的用法区别源码解析

目录
  • 前言:
  • 作用
  • 使用方法:
    • map
    • flatMap
  • 源码分析
    • map
    • flatMap
  • 结语

前言:

RxJava中提供了大量的操作符,这大大提高了了我们的开发效率。其中最基本的两个变换操作符就是mapflatMap。而其他变换操作符的原理基本与map类似。

  • map和flatMap都是接受一个函数作为参数(Func1)并返回一个被观察者Observable
  • Func1的< I,O >I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据,只是flatMap中执行的方法的返回类型为Observable类型

作用

map对Observable发射的每一项数据应用一个函数,执行变换操作。对原始的Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射

使用方法:

通过代码来看一下两者的使用用方法:

map

Observable.just(new User("白瑞德"))
                  .map(new Function<User, String>() {
                      @Override
                      public String apply(User user) throws Throwable {
                          return user.getName();
                      }
                  })
                  .subscribe(new Consumer<String>() {
                      @Override
                      public void accept(String s) throws Throwable {
                          System.out.println(s);
                      }
                  });
<<<白瑞德

这段代码接受一个User对象,最后打印出User中的name。

flatMap

假设存在一个需求,图书馆要打印每个User借走每一本书的名字: User结构如下:

class User {
    private String       name;
    private List<String> book;
}

我们来看一下map的实现方法:

Observable.fromIterable(userList)
                  .map(new Function<User, List<String>>() {
                      @Override
                      public List<String> apply(User user) throws Throwable {
                          return user.getBook();
                      }
                  })
                  .subscribe(new Consumer<List<String>>() {
                      @Override
                      public void accept(List<String> strings) throws Throwable {
                          for (String s : strings) {
                              System.out.println(s);
                          }
                      }
                  });

可以看到,map的转换总是一对一,只能单一转换。我们不得不借助循环进行打印。 下面我们来看一下flatMap的实现方式:

Observable.fromIterable(userList)
                  .flatMap(new Function<User, ObservableSource<String>>() {
                      @Override
                      public ObservableSource<String> apply(User user) throws Throwable {
                          return Observable.fromIterable(user.getBook());
                      }
                  })
                  .subscribe(new Consumer<String>() {
                      @Override
                      public void accept(String  o) throws Throwable {
                          System.out.println(o);
                      }
                  });

flatmap既可以单一转换也可以一对多/多对多转换。flatMap使用一个指定的函数对原始Observable发射的每一项数据之行相应的变换操作,这个函数返回一个本身也发射数据的Observable,因此可以再内部再次进行事件的分发。然后flatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

源码分析

下面我们就结合源码来分析一下这两个操作符。为了降低代码阅读难道,这里只保留核心代码:

map

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    //接受一个Function实例,并返回一个ObservableMap
    return new ObservableMap<T, R>(this, mapper);
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //调用用父类构造方法,初始化父类中的downstream
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            v = mapper.apply(t);
            downstream.onNext(v);
        }
    }
}

这段代码是去掉map源码中一些校验和其它相关回调后的精简代码。接下来分析一下代码流程:

  • 当在调用map时,map接受一个匿名内部类Function的实例,并返回一个ObservableMap对象。
  • ObservableMap本质上是一个Observable,也是一个被观察者,其构造方法接受最外层的那个被Observable实例,和Function实例。ObservableMap重写了subscribeActual方法,在subscribeActual中使用新建了一个MapObserver实现了对原始Observable的观察。
  • 原始的Observable中的数据变会被发送到MapObserver的实例中。
  • MapObserver构造方法接收原始Observable的观察者actual,和Function的实例mapper
  • MapObserver在其onNext方法中调用mapperapply方法,获得该方法的返回值v apply方法就是map实例中: public String apply(User user) throws Throwable { return user.getName(); }
  • 调用downstream的onNext方法,并传入实参v。其中downstreamMapObserver父类中定义的变量,在MapObserver构造方法中super(actual);时初始化,其本身就是传入的actual,本质上就是最原始的Observable

整个流程可以概括如下: 存在一个原始的ObservableA和一个观察者ObserverA,当原始的被观察者ObservableA调用map,并传入一个匿名内部类实例化的’function‘,map新建并返回了一个被观察者ObservableB,通过subscribe让观察者ObserverA对其进行订阅。并重写subscribeActual方法,在其被订阅时创建一个新的观察者ObserverB其接受的,并用ObserverB对原始的ObservableA进行订阅观察。当原始的ObservableA发出事件,调用ObserverBonNext方法,subscribeActual接受的观察者便是最原始的观察者ObserverAObserverB变执行通过匿名内部类实例化的’function‘的apply方法得到数据v,紧接着调用原始的ObservableAonNext方法,并传入实参vObserverA观察到事件。 一句话概括:一个原始的被观察者和观察者,但是让原始的观察者去订阅一个新的观察者,当新的被观察者被订阅的时候,创建一个新的观察者去订阅原始的被观察者,并在监听的事件之后执行指定的操作后再通知原始观察者。所以这里面一共涉及到两对观察者和被观察者,map方法会创建一对新的观察者和被观察者作为原始观察者和被观察者通讯的纽带,并在其中做一些数据变换。

用图片显示流程如下:

蓝色框内就是map创建的观察者和被观察者。实际上我们的原始ObserverA并没有对ObservableA进行订阅。

flatMap

faltMapmap的基本原理类似,其代码如下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;
    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
        MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            ...
            this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
        }
        @Override
        public void onSubscribe(Disposable d) {
            downstream.onSubscribe(this);
        }
        @Override
        public void onNext(T t) {
            ObservableSource<? extends U> p;
            p = mapper.apply(t);
            subscribeInner(p);
        }
        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                p.subscribe(inner);
        }
        void drain() {
            drainLoop();
        }
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            child.onNext(o);
        }
    }
    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {
        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;
        volatile boolean done;
        volatile SimpleQueue<U> queue;
        int fusionMode;
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }
        @Override
        public void onNext(U t) {
            parent.drain();
        }
    }
}

上述代码即是faltMap精简后的源码,其中大部分代码的运作流程和前文中的map源码一致,我们继续延续上文中讲解中的观察者和被观察者。重点关注其不同的地方: faltMap返回一个新的被观察者ObservableB,重写ObservableBsubscribeActual方法在原始的观察者ObserverA对其进行订阅时新建一个观察者ObserverB对原始的ObservableA进行订阅。新的观察者ObserverB持有原始的ObserverAfaltMap接收的匿名对象实例function。当ObserverB监听到原始的被观察者ObservableA的事件时,ObserverB调用functionapply方法获得新新的被观察者ObservableC,再创建一个新的观察者ObserverCObservableC进行订阅,ObserverC持有原始的观察者ObserverA,在ObserverC观察到被观察者ObservableC的时间时,调用原始的观察者ObserverA的方法。

概括就是:faltMap方法要求调用者提供一个Observable,最原始的Observable在调用faltMap后,faltMap会创建一个新的Observable,并对原始的进行订阅。当拿到订阅后,会通过flatMap接收的函数拿到调用者传入的Observable,并用最原始的观察者对它进行订阅。这期间涉及三对观察者和被观察者,flatMap会创建一对,同时也接收一对用户创建的。flatMap创建的和Map中的作用一样,不过flatMap连接的是原始的和用户通过flatMap提供的两对观察者和被观察者。而原始的观察者最终是对用户通过flatMap提供的那个观察者进行订阅。

用图片显示流程如下:

和Map的流程很相似,只不过是需要用户再提供一对观察者和被观察者。最终实现对用户提供的被观察者进行订阅。

结语

至此,map和flatMap已基本分析完毕,其中map的代码比较简单易懂,flatMap中还涉及到大量辅助操作,文中并未涉及到其中的合并等操作,阅读起来有些困难。如果仅仅是为了了解二者的原理,可以阅读Single<T>中的代码。其中的代码量远远少于Observable中的代码量。如果对RxJava基本的模式还不了解,可以阅读 手写极简版的Rxjava

以上就是RxJava中map和flatMap的用法区别源码解析的详细内容,更多关于RxJava map flatMap区别的资料请关注我们其它相关文章!

(0)

相关推荐

  • Rxjava+Retrofit+Okhttp进行网络访问及数据解析

    目录 1,创建Android项目(Android studio)导入相关依赖 2,定义接口类 3,发出请求,回调信息 4,Rxjava 和 Retrofit的结合 前言: 在平时项目开发中Okhttp3.x.Rxjava2.x.Retrofit2.x,使用的越来越多了,需要我们不断的去学习别人的优秀开发设计程序,今天简单的了解下 1,创建Android项目(Android studio)导入相关依赖 implementation 'com.squareup.okhttp3:okhttp:3.11

  • Android RxJava异步数据处理库使用详解

    目录 观察者模式 操作符 创建Observable 转换Observable 过滤Observable 组合Observable 错误处理 Schedulers调度器-解决多线程问题 管理RxJava的生命周期 RxJava与Retrofit完成网络请求 观察者模式 四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件. 观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到. 扩展的观察者模式 当事件完成时会回调onComplete

  • Kotlin下Rxjava的基础用法及流式调用示例详解

    目录 前言 基础用法 fromXXX create interval & timer 指定线程 observeOn & subscribeOn Flowable 流式调用 背压 前言 万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的

  • Kotlin + Retrofit + RxJava简单封装使用详解

    本文介绍了Kotlin + Retrofit + RxJava简单封装使用详解,分享给大家,具体如下: 实例化Retrofit object RetrofitUtil { val CONNECT_TIME_OUT = 30//连接超时时长x秒 val READ_TIME_OUT = 30//读数据超时时长x秒 val WRITE_TIME_OUT = 30//写数据接超时时长x秒 val retrofit: Retrofit by lazy { Log.d("RetrofitUtil"

  • Kotlin结合Rxjava+Retrofit实现极简网络请求的方法

    前言 因为最近正在写的项目集成了两个网络请求框架(Volley and Retrofit)对比之下也是选择了Retrofit.既然选择那自然要让自己以后开发更加省力(就是懒).于是我在Retrofit中加入了Rxjava,这也是当下蛮流行的一个请求框架.然后又利用了Kotlin的一些新特性,使网络请求变得特别简单,代码量特别少. Kotlin镇楼 RxJava RxJava学习是一个曲折漫长的过程,但一旦掌握,妙用无穷. 通过这里了解更多:http://www.jb51.net/article/

  • Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果

    本文介绍了Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果,分享给大家,具体如下: 场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现代码 val timer:TextView = findViewById(R.id.textView) //这里的 timer 就是你要控制显示倒计时效果的 TextView val mSubscription: Subscription? = nul

  • RxJava中map和flatMap的用法区别源码解析

    目录 前言: 作用 使用方法: map flatMap 源码分析 map flatMap 结语 前言: RxJava中提供了大量的操作符,这大大提高了了我们的开发效率.其中最基本的两个变换操作符就是map和flatMap.而其他变换操作符的原理基本与map类似. map和flatMap都是接受一个函数作为参数(Func1)并返回一个被观察者Observable Func1的< I,O >I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据,只是fla

  • Python中getpass模块无回显输入源码解析

    本文主要讨论了python中getpass模块的相关内容,具体如下. getpass模块 昨天跟学弟吹牛b安利Python标准库官方文档的时候偶然发现了这个模块.仔细一看内容挺少的,只有两个主要api,就花了点时间阅读了一下源码,感觉挺实用的,在这安利给大家. getpass.getpass(prompt='Password: ', stream=None) 调用该函数可以在命令行窗口里面无回显输入密码.参数prompt代表提示字符串,默认是'Password: '.在Unix系统中,strea

  • jQuery中数据缓存$.data的用法及源码完全解析

    一.实现原理: 对于DOM元素,通过分配一个唯一的关联id把DOM元素和该DOM元素的数据缓存对象关联起来,关联id被附加到以jQuery.expando的值命名的属性上,数据存储在全局缓存对象jQuery.cache中.在读取.设置.移除数据时,将通过关联id从全局缓存对象jQuery.cache中找到关联的数据缓存对象,然后在数据缓存对象上执行读取.设置.移除操作. 对于Javascript对象,数据则直接存储在该Javascript对象的属性jQuery.expando上.在读取.设置.移

  • C++11中的智能指针shared_ptr、weak_ptr源码解析

    目录 1.前言 2.源码准备 3.智能指针概念 4.源码解析 4.1.shared_ptr解析 4.1.1.shared_ptr 4.1.2.__shared_ptr 4.1.3.__shared_count 4.1.4._Sp_counted_base 4.1.5._Sp_counted_ptr 4.1.6.shared_ptr总结 4.2.weak_ptr解析 4.2.1.weak_ptr 4.2.2.__weak_ptr 4.2.3.__weak_count 4.2.4.回过头看weak_

  • golang中cache组件的使用及groupcache源码解析

    groupcache 简介 在软件系统中使用缓存,可以降低系统响应时间,提高用户体验,降低某些系统模块的压力. groupcache是一款开源的缓存组件.与memcache与redis不同的时,groupcache不需要单独的部署,可以作为你程序的一个库来使用. 这样方便我们开发的程序部署. 本篇主要解析groupcache源码中的关键部分, lru的定义以及如何做到同一个key只加载一次. 缓存填充以及加载抑制的实现 上篇有提到load函数的实现, 缓存填充的逻辑也体现在这里. groupca

  • 浅谈Pandas中map, applymap and apply的区别

    1.apply() 当想让方程作用在一维的向量上时,可以使用apply来完成,如下所示 In [116]: frame = DataFrame(np.random.randn(4, 3), columns=list('bde'), index=['Utah', 'Ohio', 'Texas', 'Oregon']) In [117]: frame Out[117]: b d e Utah -0.029638 1.081563 1.280300 Ohio 0.647747 0.831136 -1.

  • 关于Java8中map()和flatMap()的一些事

    两个方法的背景 这两个方法看起来做着同样的事情,但实际上又有些不一样.看源码部分是这样的 package java.util.stream; map()方法 /** * @param <R> The element type of the new stream * @param mapper a <a href="package-summary.html#NonInterference" rel="external nofollow" rel=&q

  • 数据库SQL中having和where的用法区别

    "Where" 是一个约束声明,使用Where来约束来之数据库的数据,Where是在结果返回之前起作用的,且Where中不能使用聚合函数. "Having"是一个过滤声明,是在查询返回结果集以后对查询结果进行的过滤操作,在Having中可以使用聚合函数. 聚合函数,SQL基本函数,聚合函数对一组值执行计算,并返回单个值.除了 COUNT 以外,聚合函数都会忽略空值. 聚合函数经常与 SELECT 语句的 GROUP BY 子句一起使用. Transact-SQL编程

  • jquery中map函数遍历数组用法实例

    本文实例讲述了jquery中map函数遍历数组用法.分享给大家供大家参考.具体如下: <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <

  • 源码解析JDK 1.8 中的 Map.merge()

    Map 中ConcurrentHashMap是线程安全的,但不是所有操作都是,例如get()之后再put()就不是了,这时使用merge()确保没有更新会丢失. 因为Map.merge()意味着我们可以原子地执行插入或更新操作,它是线程安全的. 一.源码解析 default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { Objects.requireNonNu

随机推荐