Android RxJava异步数据处理库使用详解

目录
  • 观察者模式
  • 操作符
    • 创建Observable
    • 转换Observable
    • 过滤Observable
    • 组合Observable
    • 错误处理
    • Schedulers调度器-解决多线程问题
  • 管理RxJava的生命周期
    • RxJava与Retrofit完成网络请求

观察者模式

四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件。

观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。

扩展的观察者模式

当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError(),onError()和onComplete()只会回调一个。

引入依赖

implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

        //创建被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        });
        //创建观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        };
        //订阅事件
        observable.subscribe(observer);

操作符

创建Observable

create:用于创建Observable

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

just:创建一个Observable并自动调用onNext发射数据,just中传递的参数将直接在Observer的onNext方法中接收到

        Observable.just("Uncle Xing").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

interval:创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。

        Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull Long aLong) {
                Log.i(tag, "count:" + aLong); //这里是非主线程,会隔1s打印出0,1,2,3....
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

timer:创建一个Observable,它在一个特定延迟后发射一个值

        Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Long aLong) {
                Log.i(tag, "count:" + aLong);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

转换Observable

map:对数据进行变换后,可以返回任意值,对数据的变换是1对1进行的。

        Observable.just(666).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return integer.toString();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "map:" + s);
            }
        });

flatMap:对数据变换后,返回ObservableSource对象,可以对数据进行一对多,多对多的变换。

        Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Throwable {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "accept:" + s);
            }
        });

buffer:把Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值

        Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                Log.i(tag, integers.toString());
            }
        });

Log会分两次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]

过滤Observable

distinct:去掉重复数据

        Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.i(tag, "distinct:" + integer);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

elementAt:取出指定位置的数据

        Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onSuccess(@NonNull Integer integer) {
                Log.i(tag, "onSuccess:" + integer);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

filter:对数据进行指定规则的过滤

        Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return integer > 1;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "filter:" + integer);
            }
        });

组合Observable

zip:通过一个函数将多个Observable的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "zip:" + integer);
            }
        });

注意:当其中一个Observable发送数据结束或异常,另外一个也停止发送,所以这里只会打印出11,22,33

merge:合并多个Observable的发射物

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "merge:" + integer);//会打印出10,20,30,1,2,3
            }
        });

错误处理

  • onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止
  • onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列

Schedulers调度器-解决多线程问题

  1. io():用于I/O操作;
  2. computation():计算工作默认的调度器;
  3. immediate():立即执行,允许立即在当前线程执行你指定的工作;
  4. newThread():创建新线程;
  5. trampoline():顺序处理,按需处理队列,并运行队列的每一个任务。

AndroidSchedulers:RxAndroid提供在Android平台的调度器,指定观察者在主线程。

SubscribeOn用于每个Observable对象,ObserveOn用于每个Observer对象

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(100);
                emitter.onComplete();
                Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    }
                    @Override
                    public void onNext(@NonNull Integer integer) {
                        Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main
                    }
                    @Override
                    public void onError(@NonNull Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

管理RxJava的生命周期

在使用RxJava的时候,如果没有及时解除订阅,在退出Activity的时候,异步线程还在执行,对Activity的引用还在,此时就会产生内存泄露问题。

可使用RxLifecycle,传送门

引入依赖

implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'

让你的Activity继承RxAppCompatActivity,Fragment继承RxFragment,其余类似,然后使用bindUntilEvent或者bindToLifecycle

        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindUntilEvent(ActivityEvent.DESTROY)) //当前Activity执行到onDestroy时,Observable取消订阅
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });
        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindToLifecycle())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });

使用bindToLifecycle:

如果Observable在onCreate执行,那么当执行到onDestroy时取消订阅。

如果Observable在onStart执行,那么当执行到onStop时取消订阅。

如果Observable在onResume执行,那么当执行到onPause时取消订阅。

RxJava与Retrofit完成网络请求

public interface MyService {
    @GET("gallery/{imageType}/response")
    Observable<List<String>> getImages(@Path("imageType") String imageType);
}
        Retrofit retrofit = new Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(BASE_URL)
                .build();
        MyService service = retrofit.create(MyService.class);
        service.getImages("banner")
                .compose(bindToLifecycle())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Throwable {
                        //todo
                    }
                });

到此这篇关于Android RxJava异步数据处理库使用详解的文章就介绍到这了,更多相关Android RxJava异步数据处理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Rxjava+Retrofit+Okhttp进行网络访问及数据解析

    目录 1,创建Android项目(Android studio)导入相关依赖 2,定义接口类 3,发出请求,回调信息 4,Rxjava 和 Retrofit的结合 前言: 在平时项目开发中Okhttp3.x.Rxjava2.x.Retrofit2.x,使用的越来越多了,需要我们不断的去学习别人的优秀开发设计程序,今天简单的了解下 1,创建Android项目(Android studio)导入相关依赖 implementation 'com.squareup.okhttp3:okhttp:3.11

  • RxJava中map和flatMap的用法区别源码解析

    目录 前言: 作用 使用方法: map flatMap 源码分析 map flatMap 结语 前言: RxJava中提供了大量的操作符,这大大提高了了我们的开发效率.其中最基本的两个变换操作符就是map和flatMap.而其他变换操作符的原理基本与map类似. map和flatMap都是接受一个函数作为参数(Func1)并返回一个被观察者Observable Func1的< I,O >I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据,只是fla

  • Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果

    本文介绍了Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果,分享给大家,具体如下: 场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现代码 val timer:TextView = findViewById(R.id.textView) //这里的 timer 就是你要控制显示倒计时效果的 TextView val mSubscription: Subscription? = nul

  • Kotlin结合Rxjava+Retrofit实现极简网络请求的方法

    前言 因为最近正在写的项目集成了两个网络请求框架(Volley and Retrofit)对比之下也是选择了Retrofit.既然选择那自然要让自己以后开发更加省力(就是懒).于是我在Retrofit中加入了Rxjava,这也是当下蛮流行的一个请求框架.然后又利用了Kotlin的一些新特性,使网络请求变得特别简单,代码量特别少. Kotlin镇楼 RxJava RxJava学习是一个曲折漫长的过程,但一旦掌握,妙用无穷. 通过这里了解更多:http://www.jb51.net/article/

  • Kotlin + Retrofit + RxJava简单封装使用详解

    本文介绍了Kotlin + Retrofit + RxJava简单封装使用详解,分享给大家,具体如下: 实例化Retrofit object RetrofitUtil { val CONNECT_TIME_OUT = 30//连接超时时长x秒 val READ_TIME_OUT = 30//读数据超时时长x秒 val WRITE_TIME_OUT = 30//写数据接超时时长x秒 val retrofit: Retrofit by lazy { Log.d("RetrofitUtil"

  • Kotlin下Rxjava的基础用法及流式调用示例详解

    目录 前言 基础用法 fromXXX create interval & timer 指定线程 observeOn & subscribeOn Flowable 流式调用 背压 前言 万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的

  • Android RxJava异步数据处理库使用详解

    目录 观察者模式 操作符 创建Observable 转换Observable 过滤Observable 组合Observable 错误处理 Schedulers调度器-解决多线程问题 管理RxJava的生命周期 RxJava与Retrofit完成网络请求 观察者模式 四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件. 观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到. 扩展的观察者模式 当事件完成时会回调onComplete

  • Android JetPack组件的支持库Databinding详解

    目录 简介 启用databinding 布局xml variable (变量标签) data (数据标签) @{}表达式 绑定普通数据 绑定可观察数据 对单个变量的绑定-fields 对集合的绑定-collections 绑定对象-objects 绑定LiveData 双向绑定 简介 DataBinding 是 Google 在 Jetpack 中推出的一款数据绑定的支持库,利用该库可以实现在页面组件中直接绑定应用程序的数据源.使其维护起来更加方便,架构更明确简介. DataBinding 唯一

  • Android 中RxPermissions 的使用方法详解

    Android 中RxPermissions 的使用方法详解 以请求拍照.读取位置权限为例 module的build.gradle: compile 'com.tbruyelle.rxpermissions2:rxpermissions:0.9.4@aar' compile 'io.reactivex.rxjava2:rxjava:2.0.5' AndroidManifest.xml: <uses-permission android:name="android.permission.AC

  • Android VideoCache视频缓存的方法详解

    Android VideoCache视频缓存的方法详解 项目中遇到视频播放,需要加载网络url,不可能每次都进行网络加载,当然了,就需要用到我们的缓存机制 AndroidVideoCache AndroidVideoCache是一个视频/音频缓存库,利用本地代理实现了边下边播,使用起来非常简单. HttpProxyCacheServer是主要类,是一个代理服务器,可以配置缓存文件的数量.缓存文件的大小.缓存文件的目录和缓存文件命名算法,文件缓存均基于LRU算法,利用Builder来配置: //配

  • Android 网络请求框架Volley实例详解

    Android 网络请求框架Volley实例详解 首先上效果图 Logcat日志信息on Reponse Volley特别适合数据量不大但是通信频繁的场景,像文件上传下载不适合! 首先第一步 用到的RequetQueue RequestQueue.Java RequestQueue请求队列首先得先说一下,ReuqestQueue是如何对请求进行管理的...RequestQueue是对所有的请求进行保存...然后通过自身的start()方法开启一个CacheDispatcher线程用于缓存调度,开

  • Android架构组件Room的使用详解

    Room其实就是一个orm,抽象了SQLite的使用,但是它作为Android的亲儿子orm,并且原生支持LiveData和Rxjava嵌套使用,学习一下还是不错的. Room有3个主要组件 Database :数据库 Entity : 代表数据库一个表结构 Dao : 包含访问数据库的方法 简单使用 添加Google Maven仓库 allprojects { repositories { jcenter() google() } } 添加依赖 dependencies { // Room i

  • Android属性动画之ValueAnimator代码详解

    属性动画通过改变一个对象的属性值来进行动画,属性动画包含了以下几个特性: 1.持续时间(Duration) 主要用来定义动画的持续时间,默认值为300ms. 2.时间插值器(Time interpolation) 指定时间变化的百分比,就是当前流逝时间除以指定的持续时间,这个可以自定义,继承Interpolator,重写getInterpolation方法. 3.重复次数和行为(Repeat count and behavior) 指定动画的执行次数和动画的重复模式 4.动画集(Animator

  • Android+OpenCV4.2.0环境配置详解(Android studio)

    仅是个人记录,希望能对有需要的给予一些小小的帮助 首先我们肯定是要去到OpenCV的官网下载对应的SDK,并解压得到文件夹(opencv-4.2.0-android-sdk) 其次是NDK环境搭建(双击shift,输入sdk,找到sdk manager,将下面红色框框勾选安装) 创建项目,我选用的是(并不是只有这一选择) 导入Module File->New->Import Module 路径选择**\opencv-4.2.0-android-sdk\OpenCV-android-sdk\sd

  • Springboot任务之异步任务的使用详解

    一.SpringBoot--异步任务 1.1 什么是同步和异步 同步是阻塞模式,异步是非阻塞模式. 同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会-直等待下去,知道收到返回信息才继续执行下去 异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态.当有消息返回式系统会通知进程进行处理,这样可以提高执行的效率. 1.2 Java模拟一个异步请求(线程休眠) AsyncService.java package com.tian.async

  • Android AS创建自定义布局案例详解

    先创建一个title.xml <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:tools="http://schemas.android.com/tools" android:layout_width="match_parent" android:layout_height="match_parent" andr

随机推荐