Rxjava功能操作符的使用方法详解

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class MainActivity extends AppCompatActivity {

  private TextView name;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    name = (TextView) findViewById(R.id.name);
    //用来调用下面的方法,监听。
    name.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View v) {

        interval();
      }
    });
  }

  //例1:Observer
  public void observer() {
    //观察者
    Observer<string> observer = new Observer<string>() {
      @Override
      public void onSubscribe(@NonNull Disposable d) {

      }
      @Override
      public void onNext(@NonNull String s) {
        //接收从被观察者中返回的数据
        System.out.println("onNext :" + s);
      }
      @Override
      public void onError(@NonNull Throwable e) {

      }
      @Override
      public void onComplete() {

      }
    };
    //被观察者
    Observable<string> observable = new Observable<string>() {
      @Override
      protected void subscribeActual(Observer<!--? super String--> observer) {
        observer.onNext("11111");
        observer.onNext("22222");
        observer.onComplete();
      }
    };
    //产生了订阅
    observable.subscribe(observer);
  }

  //例2:Flowable
  private void flowable(){
    //被观察者
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          e.onNext(i+"");
        }
      }
      //背压的策略,buffer缓冲区        观察者
      //背压一共给了五种策略
      // BUFFER、
      // DROP、打印前128个,后面的删除
      // ERROR、
      // LATEST、打印前128个和最后一个,其余删除
      // MISSING
      //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误
    }, BackpressureStrategy.BUFFER).subscribe(new Consumer<string>() {
      @Override
      public void accept(String s) throws Exception {
        System.out.println("subscribe accept"+s);
        Thread.sleep(1000);
      }
    });
  }

  //例3:线程调度器 Scheduler
  public void flowable1(){
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          //输出在哪个线程
          System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          e.onNext(i+"");
        }
      }
    },BackpressureStrategy.BUFFER)
        //被观察者一般放在子线程
        .subscribeOn(Schedulers.io())
        //观察者一般放在主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<string>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("s"+ s);
            Thread.sleep(100);
            //输出在哪个线程
            System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          }
        });
  }

  //例4:http请求网络,map转化器,fastjson解析器
  public void map1(){
    Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull final ObservableEmitter<string> e) throws Exception {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
            .url("https://qhb.2dyt.com/Bwei/login")
            .build();
        client.newCall(request).enqueue(new Callback() {
          @Override
          public void onFailure(Call call, IOException e) {

          }

          @Override
          public void onResponse(Call call, Response response) throws IOException {
            String result = response.body().string();
            e.onNext(result);
          }
        });
      }
    })
        //map转换器 flatmap(无序),concatmap(有序)
        .map(new Function<string, bean="">() {
      @Override
      public Bean apply(@NonNull String s) throws Exception {
        //用fastjson来解析数据
        return JSONObject.parseObject(s,Bean.class);
      }
    }).subscribe(new Consumer<bean>() {
      @Override
      public void accept(Bean bean) throws Exception {
        System.out.println("bean = "+ bean.toString() );
      }
    });
  }

  //常见rxjava操作符
  //例 定时发送消息
  public void interval(){
    Observable.interval(2,1, TimeUnit.SECONDS)
        .take(10)
        .subscribe(new Consumer<long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            System.out.println("aLong = " + aLong);
          }
        });
  }

  //例 zip字符串合并
  public void zip(){
    Observable observable1 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onNext("4");
        e.onComplete();

      }
    });
    Observable observable2 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("A");
        e.onNext("B");
        e.onNext("C");
        e.onNext("D");
        e.onComplete();
      }
    });

    Observable.zip(observable1, observable2, new BiFunction<string,string,string>() {
      @Override
      public String apply(@NonNull String o, @NonNull String o2) throws Exception {
        return o + o2;
      }
    }).subscribe(new Consumer<string>() {
      @Override
      public void accept(String o) throws Exception {
        System.out.println("o"+ o);
      }
    });
  }

总结

以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解、分享一个简单的java爬虫框架、Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

(0)

相关推荐

  • Java的RxJava库操作符的用法及实例讲解

    操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件.RxJava提供了很多很有用的操作符. 比如map操作符,就是用来把把一个事件转换为另一个事件的. Observable.just("Hello, world!") .map(new Func1<String, String>() { @Override public String call(String s) { r

  • Rxjava功能操作符的使用方法详解

    Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者, 被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据 下面把Rxjava常用的模型代码列出来,还有一些操作符的运用: 依赖: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' // Because RxAndroid releases are few and far between, it is recommended you also /

  • C语言 操作符#与##使用方法详解

    目录 一.# 运算符 二.## 运算符 三.小结 一.# 运算符 # 运算符用于在预处理期将宏参数转换为字符串 # 的转换作用是在预处理期完成的,因此只在宏定义中有效 编译器不知道 # 的转换作用 用法: #define STRING(x) #x printf("%s\n",STRING(Hello World!)); 下面通过一个示例感受一下: test.c: #include <stdio.h> #define STRING(x) #x int main() { pri

  • Android编程使用加速度传感器实现摇一摇功能及优化的方法详解

    本文实例讲述了Android编程使用加速度传感器实现摇一摇功能及优化的方法.分享给大家供大家参考,具体如下: 目前很多应用已经实现了摇一摇功能,这里通过讲解该功能的原理及实现回顾一下加速度传感器的使用: 1.首先获得传感器管理器的实例 sensorManager = (SensorManager) context.getSystemService(Context.SENSOR_SERVICE); 2.通过传感器管理器获得加速传感器 accelerateSensor = getSensorMana

  • CKEditor扩展插件:自动排版功能autoformat插件实现方法详解

    本文实例讲述了CKEditor扩展插件:自动排版功能autoformat插件实现方法.分享给大家供大家参考,具体如下: 1.注册插件 首先找到根目录下的ckeditor/config.js文件,打开文件如下: CKEDITOR.editorConfig = function (config) { // Define changes to default configuration here. For example: // config.language = 'fr'; // config.ui

  • Android中用Bmob实现短信验证码功能的方法详解

    这篇文章主要介绍发送验证码和校验验证码的功能,用到一个第三方平台Bmob,那Bmob是什么呢?Bmob可以开发一个云存储的移动应用软件,他提供了大量的标准的API接口,根据需要接入相关服务,开发者可以更加专注于应用的开发,让产品交付更快速,验证码功能就是其中一个. 一.跟其他第三方一样,我们开发之前要做一些准备工作. 1.首先,去官网注册一个帐号:http://www.bmob.cn/: 2.然后就可以创建应用了:具体怎么做Bmob说得很清楚了(官方操作介绍),如果你不想看,我简单说一下:点击右

  • 为Jquery EasyUI 组件加上清除功能的方法(详解)

    1.背景 在使用 EasyUI 各表单组件时,尤其是使用 ComboBox(下拉列表框).DateBox(日期输入框).DateTimeBox(日期时间输入框)这三个组件时,经常有这样的需求,下拉框或日期只允许选择.不允许手动输入,这时只要在组件选项中加入 editable:false 就可以实现,但有一个问题,就是:一旦选择了,没办法清空.经过研究,可以用一个变通的解决方案:给组件加上一个"清除"按钮,当有值是,显示按钮,点击按钮可清空值,当无值是,隐藏按钮. 2.函数定义 定义JS

  • jQuery ajax的功能实现方法详解

    jQuery的ajax方法非常好用,这么好的东西,你想拥有一个属于自己的ajax么?接下来,我们来自己做一个简单的ajax吧. 实现功能 由于jq中的ajax方法是用了内置的deferred模块,是Promise模式的一种实现,而我们这里没有讲过,所以我们就不使用这一模式啦. 我们只定义一个ajax方法,他可以简单的get,post,jsonp请求就可以啦~~ var ajax = function () { // 做一些初始化,定义一些私有函数等 return function () { //

  • Android中实现ping功能的多种方法详解

    使用java来实现ping功能. 并写入文件.为了使用java来实现ping的功能,有人推荐使用java的 Runtime.exec()方法来直接调用系统的Ping命令,也有人完成了纯Java实现Ping的程序,使用的是Java的NIO包(native io, 高效IO包).但是设备检测只是想测试一个远程主机是否可用.所以,可以使用以下三种方式来实现: 1. Jdk1.5的InetAddresss方式 自从Java 1.5,java.net包中就实现了ICMP ping的功能. 使用时应注意,如

  • Python实现画图软件功能方法详解

    概述 虽然Python的强项在人工智能,数据处理方面,但是对于日常简单的应用,Python也提供了非常友好的支持(如:Tkinter),本文主要一个简单的画图小软件,简述Python在GUI(图形用户界面)方面的应用,仅供学习分享使用,如有不足之处,还请指正. 设计思路 页面布局:主要分为上下两部分 a. 绘图区域,本例以Canvas实现 b. 下部:功能区,由按钮实现 事件监听:通过给功能按钮绑定事件,来实现不同的功能,如:绘线,绘矩形等功能. 绘图区域:监听鼠标左键的按下(开始绘图)和抬起(

  • .NET 6实现基于JWT的Identity功能方法详解

    目录 需求 目标 原理与思路 实现 引入Identity组件 添加认证服务 使用JWT认证和定义授权方式 引入认证授权中间件 添加JWT配置 增加认证用户Model 实现认证服务CreateToken方法 添加认证接口 保护API资源 验证 验证1: 验证直接访问创建TodoList接口 验证2: 获取Token 验证3: 携带Token访问创建TodoList接口 验证4: 更换Policy 一点扩展 总结 参考资料 需求 在.NET Web API开发中还有一个很重要的需求是关于身份认证和授

随机推荐