Android Rxjava3 使用场景详解

目录
  • 一、Rxjava使用场景
    • 1、多任务嵌套回调
    • 2、多任务合并处理
    • 3、轮询
    • 4、其他小场景
      • 1)倒计时
      • 2)打字机效果
  • 二、结合Rxbinding的使用场景
    • 1、点击事件防抖
    • 2、输入搜索优化
    • 3、联合判断
  • 三、防泄漏
    • 1、Observable.unsubscribeOn
    • 2、disposable.dispose
    • 3、CompositeDisposable
  • 参考了以下文章,表示感谢:

一、Rxjava使用场景

为了模拟实际场景,从wanandroid网站找了二个接口,如下:(对Wanandroid表示感谢!)

public interface ApiServer {

    /**
     * 接口一
     * 获取文章列表
     * @return
     */
    @GET("article/list/1/json")
    Observable<BaseResponse<ArticleListResp>> getArticleList();

    /**
     * 接口二
     * 获取热词
     * @return
     */
    @GET("hotkey/json")
    Observable<BaseResponse<List<HotKeyResp.DataBean>>> getHotKey();

}

1、多任务嵌套回调

场景:比如调用接口一有回调后才能调用接口二,如果接口一调用失败不再调用接口二。下面是二种写法。

写法一,代码如下:

//为了看清楚代码,没有使用lambda简化
//接口一
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
//接口二
Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
Observable.just(articleList)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(new Function<Observable<BaseResponse<ArticleListResp>>, Observable<BaseResponse<List<HotKeyResp.DataBean>>>>() {
            @Override
            public Observable<BaseResponse<List<HotKeyResp.DataBean>>> apply(Observable<BaseResponse<ArticleListResp>> baseResponseObservable) throws Throwable {
               //处理第一个请求返回的数据
                if(baseResponseObservable!=null) mTv.setText(baseResponseObservable.blockingSingle().toString());
                return hotKey;   //发起第二次网络请求
            }
        }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Observable<BaseResponse<List<HotKeyResp.DataBean>>>>() {
            @Override
            public void accept(Observable<BaseResponse<List<HotKeyResp.DataBean>>> baseResponseObservable) throws Throwable {
                //处理第二次网络请求的结果
                if(baseResponseObservable!=null) mTvTwo.setText(baseResponseObservable.blockingSingle().toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
                //异常的处理:比如Dialog的Dismiss,缺省页展示等
                //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,如果多个请求同理
                //但是请求成功的还是能正常处理
                LogUtil.e(throwable.toString());
            }
        });

写法二,代码如下:

        //为了看清楚代码,没有使用lambda简化
        //接口一
        Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
        //接口二
        Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
        //请求第一个
        articleList.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<BaseResponse<ArticleListResp>>() {
                    @Override
                    public void accept(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
                        //处理第一个网络请求的结果
                        if(articleListRespBaseResponse!=null) mTv.setText(articleListRespBaseResponse.toString());
                    }
                }).observeOn(Schedulers.io())
                .flatMap(new Function<BaseResponse<ArticleListResp>, ObservableSource<BaseResponse<List<HotKeyResp.DataBean>>>>() {
                    @Override
                    public ObservableSource<BaseResponse<List<HotKeyResp.DataBean>>> apply(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
                        return hotKey;   //将第一个网络请求转换为第二个网络请求
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<BaseResponse<List<HotKeyResp.DataBean>>>() {
                    @Override
                    public void accept(BaseResponse<List<HotKeyResp.DataBean>> listBaseResponse) throws Throwable {
                        //处理第二次网络请求的结果
                        if(listBaseResponse!=null) mTvTwo.setText(listBaseResponse.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                        //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,多个请求同理
                        //但是在异常前面已经成功的网络请求还是能正常处理
                        //异常的处理:比如Dialog的Dismiss,缺省页展示等
                        LogUtil.e(throwable.toString());
                    }
                });

注意异常处理和线程切换,其他细节代码和注释比较详细。

2、多任务合并处理

场景:接口一和接口二返回数据后一起处理。
代码如下:

private void zipRequest() {
    //为了看清楚代码,没有使用lambda简化
    //接口一
    Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
    //接口二
    Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
    Observable.zip(articleList, hotKey, this::combiNotification)  //传入方法定义合并规则
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull String msg) {
                    if(!TextUtils.isEmpty(msg)){
                        mTv.setText(msg);
                    }
                }

                @Override
                public void onError(@NonNull Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

}

//合并的规则,以及定义合并的返回值
public String combiNotification(BaseResponse<ArticleListResp> articleListRespBaseResponse, BaseResponse<List<HotKeyResp.DataBean>> hotkeyResponse) {
    //比如这里取二个接口数据toString返回
    if (articleListRespBaseResponse != null && hotkeyResponse != null) {
        return articleListRespBaseResponse.toString() + hotkeyResponse.toString();
    }
    return null;
}

3、轮询

场景一:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,每次轮询必须等上一次轮询有结果后才能开始下一次轮询。

特别注意repeatWhen操作符,只有在repeatWhen的Function方法中发射onNext事件,重复(repeat)才能触发,发射onError或者onComplite都会结束重复(repeat),基于这一点,通过flatMap操作符将事件转化为延迟一定时间的onNext事件,就达到了延时轮询的目的。至于onNext事件发射的什么不重要。

延伸:retryWhen的Function方法发射onError事件才会重试(retry)。

takeUntil操作符可以定义一定的条件,当达到条件时自动结束整个事件的目的,事件结束时会回调subscribe。

代码如下:

/**
 * 轮询
 * @param pollingTimes 轮询的次数
 */
private void timedPolling(int pollingTimes) {
    AtomicInteger times = new AtomicInteger();
    Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
    articleList.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Throwable {
            return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {  //转换事件
                @Override
                public ObservableSource<?> apply(Object o) throws Throwable {
                    //这里发射延时的onNext事件,触发repeat动作,发射的0不会回调到下面的subscribe
                    return Observable.just(0).delay(2, TimeUnit.SECONDS);
                }
            });
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            //takeUntil定义了二个结束条件:前面是达到了轮询的次数,后面是网络请求返回了成功,当然也可以写成代码块做其他的返回判断
            .takeUntil(response -> times.incrementAndGet() >= pollingTimes || response.getErrorCode() == 0)
            .subscribe(new Observer<BaseResponse<ArticleListResp>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull BaseResponse<ArticleListResp> articleListRespBaseResponse) {

                }

                @Override
                public void onError(@NonNull Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

如果想改成不限制次数的也比较简单。

场景二:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,这里的轮询不关心上次请求的结果。
代码如下:

/**
 * 轮询一定的次数
 * @param pollTimes 轮询次数
 */
private void timedPolling(int pollTimes) {
    //网络请求
    Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
    //返回值用于取消轮询
    mSubscribe = Observable.intervalRange(0, pollTimes, 0, 2000, TimeUnit.MILLISECONDS)
            .flatMap(new Function<Long, ObservableSource<BaseResponse<ArticleListResp>>>() {
                @Override
                public ObservableSource<BaseResponse<ArticleListResp>> apply(Long aLong) throws Throwable {
                    return articleList;  //转换事件
                }
            }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
                @Override
                public void accept(BaseResponse<ArticleListResp> listBaseResponse) throws Throwable {
                    //如果满足了退出轮询的条件,可以调用下面的方法退出轮询
                    //mSubscribe.dispose();
                }
            });
}

思路是定时发射事件,然后将事件转化为网络请求。同理可以写出不限次数的轮询。

场景三:不限次数轮询(间隔一定的时间),不关心上次请求的结果。

假如接口返回的code为0时需要取消轮询,代码如下:

Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
//返回值用于取消轮询
mSubscribe = Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
        .flatMap(new Function<Long, ObservableSource<BaseResponse<ArticleListResp>>>() {
            @Override
            public ObservableSource<BaseResponse<ArticleListResp>> apply(Long aLong) throws Throwable {
                return articleList;
            }
        })
        .takeUntil(response -> response.getErrorCode() == 0)  //使用takeUntil自动取消发射
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
            @Override
            public void accept(BaseResponse<ArticleListResp> articleListRespBaseResponse) throws Throwable {
                //处理回调
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
               //处理异常
            }
        });

如果是其他取消条件,也可以写在代码块里:

.takeUntil(response -> {
    //处理接口数据,然后判断是返回true还是false,true:停止发射,false:继续发射
    return false;
})  //使用takeUntil自动取消发射

不管何种轮询,注意在OnDestroy中取消。

4、其他小场景

1)倒计时

验证码的倒计时功能,代码如下:

/**
 * 倒计时
 * @param countDownSeconds 倒计时的秒数
 */
private void countDown(int countDownSeconds) {
    Observable.intervalRange(0, countDownSeconds, 0, 1000, TimeUnit.MILLISECONDS)
            .map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Throwable {
                    return (countDownSeconds - aLong) + "s后重新获取";
                }
            }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    mTv.setEnabled(false);
                }

                @Override
                public void onNext(@NonNull String s) {
                    mTv.setText(s);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    mTv.setEnabled(true);
                    mTv.setText("获取验证码");
                }

                @Override
                public void onComplete() {
                    mTv.setText("获取验证码");
                    mTv.setEnabled(true);
                }
            });
}

效果

2)打字机效果

几行代码实现打字机效果:

@RequiresApi(api = Build.VERSION_CODES.M)  //6.0
public class DaziView extends View {
    private TextPaint mTextPaint;
    private StaticLayout mStaticLayout;

    public DaziView(Context context) {
        super(context,null);
    }

    public DaziView(Context context, @Nullable AttributeSet attrs) {
        super(context, attrs);
        initTextPaint();
    }

    /**
     * 初始化画笔
     */
    private void initTextPaint() {
        mTextPaint = new TextPaint(Paint.ANTI_ALIAS_FLAG);
        mTextPaint.setTextSize(48);
        mTextPaint.setColor(Color.parseColor("#000000"));
    }

    /**
     * 绘制
     * @param content
     */
  public void drawText(String content){
        if(!TextUtils.isEmpty(content)){
            Observable.intervalRange(0,content.length()+1,0,150, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Throwable {
                //动态改变文本长度
                    mStaticLayout = StaticLayout.Builder.obtain(content, 0, aLong.intValue(), mTextPaint, getWidth())
                            .build();
                    invalidate();
                }
            });
        }
  }

    @Override
    protected void onDraw(Canvas canvas) {
        super.onDraw(canvas);
        //绘制文本
        mStaticLayout.draw(canvas);
    }
}

文本

<string name="dazi_content">\u3000\u3000你好,这是一个打字机,这是一个打字机这是一个打字机这是一个打字机。\n\u3000\u3000换行空格继续打印。</string>

二、结合Rxbinding的使用场景

RxBinding 提供的绑定能够将任何 Android View 事件转换为 Observable。

一旦将 View 事件转换为 Observable ,将发射数据流形式的 UI 事件,我们就可以订阅这个数据流,这与订阅其他 Observable 方式相同。

引入下面的库:

implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'

1、点击事件防抖

点击事件的写法:

RxView.clicks(button)   //button为控件
        .subscribe(new Consumer<Unit>() {
            @Override
            public void accept(Unit unit) throws Throwable {
                //点击事件
            }
        });

长点击事件的写法:

RxView.longClicks(button)
        .subscribe(new Consumer<Unit>() {
            @Override
            public void accept(Unit unit) throws Throwable {
                //长点击自动响应,不需要等放开手指
            }
        });

点击防抖事件的写法:

RxView.clicks(button)
        .throttleFirst(1000, TimeUnit.MILLISECONDS)   //一秒以内第一次点击事件有效
        .subscribe(new Consumer<Unit>() {
            @Override
            public void accept(Unit unit) throws Throwable {
                //点击事件
            }
        });

2、输入搜索优化

RxTextView.textChanges(editText)  //传入EditText控件
        .debounce(1000,TimeUnit.MILLISECONDS)  //一秒内没有新的事件时,取最后一次事件发射
        .skip(1)    //跳过第一次EditText的空内容
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<CharSequence>() {
            @Override
            public void accept(CharSequence charSequence) throws Throwable {
                //EditText的内容
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {

            }
        });

3、联合判断

combineLatest 操作符将多个 Observable 发射的事件组装起来,然后再发射组装后的新事件。

Observable<CharSequence> observableEdittext = RxTextView.textChanges(editText).skip(1);
 Observable<CharSequence> observableEdittextTwo =   RxTextView.textChanges(editText_two).skip(1);

 Observable.combineLatest(observableEdittext, observableEdittextTwo, new BiFunction<CharSequence, CharSequence, Boolean>() {
     @Override
     public Boolean apply(CharSequence charSequence, CharSequence charSequence2) throws Throwable {
         if(!TextUtils.isEmpty(charSequence)&&!TextUtils.isEmpty(charSequence2)){
             return true;
         }
         return false;
     }
 }).subscribe(new Consumer<Boolean>() {
     @Override
     public void accept(Boolean aBoolean) throws Throwable {
             //TODO 其他处理
     }
 });

三、防泄漏

1、Observable.unsubscribeOn

Observable<Integer> just = Observable.just(0);
just.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io());  //取消事件,防止泄漏

2、disposable.dispose

这个比较常用。

3、CompositeDisposable

对订阅事件统一管理

CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposableOne);
compositeDisposable.add(disposableTwo);
compositeDisposable.clear();

参考了以下文章,表示感谢:

最适合使用 RxJava 处理的四种场景

Android RxJava应用:网络请求轮询(有条件)

Rxjava3文档级教程三: 实战演练

到此这篇关于Android Rxjava3 使用场景详解的文章就介绍到这了,更多相关Android Rxjava3 使用场景内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • android使用RxJava实现预加载

    在上一篇文章中介绍了使用非RxJava环境下,使用Handler机制SyncBarrier的特性实现预加载功能的方法. 在RxJava的环境下使用BehaviorSubject的特性来实现也是很方便的. BehaviorSubject内部会缓存消息流中最近的一个消息, 在后续有Subscriber订阅时,会直接将缓存的消息发送给Subscriber. RxPreLoader.java封装如下: import android.support.annotation.NonNull; import j

  • Android上传多张图片的实例代码(RxJava异步分发)

    学习RxJava有一段时间了,一直在考虑怎么使用,如何在项目中合理运用它.在android很多项目中,都会存在图片上传,下面我介绍如何用Rxjava异步上传多张图片. 一,用到的框架 compile 'top.zibin:Luban:1.0.9'//图片压缩 compile 'org.xutils:xutils:3.3.34'//网络请求 compile 'io.reactivex:rxandroid:1.1.0'//rxandroid compile 'io.reactivex:rxjava:

  • Android中通过RxJava进行响应式程序设计的入门指南

    错误处理 到目前为止,我们都没怎么介绍onComplete()和onError()函数.这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了). 下面的代码展示了怎么使用这两个函数: Observable.just("Hello, world!") .map(s -> potentialException(s)) .map(s -> anotherPotentialException(s)) .subscribe(new Subscrib

  • Android中的Retrofit+OkHttp+RxJava缓存架构使用

    RxJava如何与Retrofit结合 先扔出build.gradle文件的内容 dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) testCompile 'junit:junit:4.12' compile 'com.android.support:appcompat-v7:23.2.0' compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxand

  • android使用Rxjava实现倒计时功能

    一般我们在开发时,常会遇到使用倒计时的场景,以前一般会使用thread+handler来实现,而强大的Rxjava横空出世后,使这一切变得简单了.我们可以在子线程中直接使用发射器每融1S发出一个时间,在主线程中接收更新ui,在等倒计时结束恢复界面,下面给出在用户注册时获取验证码的,倒计时使用的代码demo.具体调用方法如下: /** * 点击获取验证码,10S倒计时,利用Rxjava进行线程切换 * @param view */ public void getSureCode(View view

  • Android RxJava创建操作符Timer的方法

    本文实例为大家分享了Android RxJava创建操作符Timer的具体代码,供大家参考,具体内容如下 之前有写过Android实现倒计时之使用CountDownTimer,除了CountDownTimer,开发中我们也会用到handler,例如 mHandler.sendEmptyMessageDelayed(1, 10*1000); private Handler mHandler = new Handler() { @Override public void handleMessage(

  • Android中用RxJava和ViewPager实现轮播图

    前言 很多人要实现轮播图都会想到使用ViewPager + Handler来完成轮播图的效果.但是在RxJava快速发展的情况下,已经可以使用RxJava来代替Handler完成这样任务了. 下面我们就来介绍如何实现RxJava+ViewPager的轮播图. 效果图如下 ViewPager的操作 说到ViwePager应该大家都不陌生,它可以结合普通的View也可以结合Fragment一起使用.在此我也就不对它的使用方法进行过多的介绍了.直接开始介绍轮播的方法. 常见的轮播操作 private

  • Android中Rxjava实现三级缓存的两种方式

    本文正如标题所说的用rxjava实现数据的三级缓存分别为内存,磁盘,网络,刚好最近在看Android源码设计模式解析与实战(受里面的ImageLoader的设计启发). 我把代码放到了我的hot项目中,github地址 源码下载地址:Rxjava_jb51.rar 1.使用concat()和first()的操作符. 2.使用BehaviorSubject. 先说BehaviorSubject的实现方法,废话不多说直接上代码, /** * Created by wukewei on 16/6/20

  • Android RxJava创建操作符Interval

    Interval操作符:用于创建Observable,跟TimerTask类似,用于周期性发送信息,是一个可以指定线程的TimerTask 首先添加类库 // RxAndroid compile 'io.reactivex:rxandroid:1.2.1' // RxJava compile 'io.reactivex:rxjava:1.2.4' (1) 间隔时间3秒发送一次 private Subscription subscribe; private void start() { if (s

  • Android性能优化之利用Rxlifecycle解决RxJava内存泄漏详解

    前言: 其实RxJava引起的内存泄漏是我无意中发现了,本来是想了解Retrofit与RxJava相结合中是如何通过适配器模式解决的,结果却发现了RxJava是会引起内存泄漏的,所有想着查找一下资料学习一下如何解决RxJava引起的内存泄漏,就查到了利用Rxlifecycle开源框架可以解决,今天周末就来学习一下如何使用Rxlifecycle. 引用泄漏的背景: RxJava作为一种响应式编程框架,是目前编程界网红,可谓是家喻户晓,其简洁的编码风格.易用易读的链式方法调用.强大的异步支持等使得R

随机推荐