Kotlin下Rxjava的基础用法及流式调用示例详解

目录
  • 前言
  • 基础用法
    • fromXXX
    • create
    • interval & timer
    • 指定线程
    • observeOn & subscribeOn
  • Flowable
  • 流式调用
  • 背压

前言

万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的还是Android开发者,所以今天来记录下。

而所谓的响应式编程,就是一种用于应用程序异步编程的技术,他是一个通用的思想,类似与AOP,不只是在java中才有。他专注于对数据的变化做出反应,例如,有一个数据源(这里被称为生产者),一个数据目标(这里被成为消费者),然后在将消费者连接到订阅者之后,响应式编程框架负责将生产者生产的数据推送给消费者,一个可观察对象可以有任意数量的订阅者。

而对于一些思想上的框架,类似于Spring,源码上大体还是比较难的,毕竟就算是人,在思想上跨越也是有难度的,但对于RxJava来说,源码也不是很多,所以在以后会尝试介绍他的源码实现,而使用Rxjava的好处不是在于实现了什么具体的技术功能,比如使用CGLIB可以实现动态代理的技术,使用JDBC可以进行数据查询,而没有rxjava,我们的代码还可以借助Java8的Stream、CompletableFuture来实现。

而rxjava的好处在于让代码更简洁、优雅,通过他的链式调用,消除嵌套等。

在下面的例子中,我们会使用Kotlin来做示范。

基础用法

在这里,Observable 字面意思是可观察者,他表示数据源,通常,一旦订阅者开始收听,他们就会开始提供数据,而just表示仅仅,仅仅生产的数据是一个"T",即泛型类型,在这里是String。

而subscribe表示订阅,当订阅后,他会收到Observable生产的数据,来消费。

fun main() {
     Observable.just("hello rxjava").subscribe {
         println(it)
     }
}
输出:
hello rxjava

fromXXX

而上面说到,just表示仅仅,在rxjava中,不仅仅是具体的数据,还可以是Callable、Array、Future对象等,详细可以看fromXXX等方法,最终的结果由rxjava调用后如Callable的结果后,传递给订阅者。

fun main() {
    Observable.fromCallable {
        println("callable")
        "hello rxjava"
    }.subscribe {
        println(it)
    }
}

create

这个方法给我了我们手动执行的能力,即传递数据到订阅者是我们手动执行的。

fun main() {
    Observable.create<String> {
        it.onNext("hello")
        it.onError(IllegalArgumentException("错误"))
        it.onComplete()
    }.subscribe ({
        println(it)
    },{
        println(it.message)
    },{
        println("完成")
    })
}

interval & timer

还可以通过interval实现固定间隔定时。

fun main() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)
    observable.subscribe {
        println(it)
    }
    observable.subscribe {
        println(it)
        Thread.sleep(2000)
    }
    Thread.sleep(100000);
}

而timer方法则是延迟N时间后,发送数据到订阅者.

fun main() {
    val observable = Observable.timer(2, TimeUnit.SECONDS)
    observable.subscribe {
        println(it)
    }
    observable.subscribe {
        println(it)
        Thread.sleep(2000)
    }
    Thread.sleep(100000);
}

指定线程

而使用上面方法有一个好处,即生产者可以在子线程中完成,而实际消费的时候在主线程,这在Android可谓是一种福利,如下。

fun main() {
    val threadPool = Executors.newCachedThreadPool()
    val anyFuture = threadPool.submit(Callable {
        Thread.sleep(2000)
        "hello"
    })
    Observable.fromFuture(anyFuture).subscribe {
        println(it)
    }
}

而如果担心等待时间问题,可是使用第二个重载方法,指定一个超时时间,而subscribe还有两个主要参数我们没说,一个是error发生错误时回调,一个是complete完成时回调,但在发生错误后,complete是不会回调的。

fun main() {
    val threadPool = Executors.newCachedThreadPool()
    val anyFuture = threadPool.submit(Callable {
        Thread.sleep(2000)
        "hello"
    })
    Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({
        println(it)
    },{
        println("错误")
    },{
        println("完成")
    })
}

observeOn & subscribeOn

但你以为这就结束了吗,不,rxjava提供了丰富的线程切换,observeOn & subscribeOn这两个方法就是用来指定在哪里运行,Schedulers.newThread()表示在新线程,但rxjava实现的线程中,是守护线程,也就是当主线程退出后,他们也会自动退出,而在下面的例子中,如果在最后不加sleep,会导致主线程退出后,rxjava的所有线程在可能没执行完成后也将退出。

fun main() {
    Observable.create<String> {
        println(Thread.currentThread().isDaemon)
        it.onNext("hello")
    }
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .subscribe {
            println(Thread.currentThread().name)
            println(it)
        }
    Thread.sleep(10000)
}

而如果想自定义线程,也是支持的。

fun createSchedulers(): Scheduler {
    return Schedulers.from {
        thread { it.run() }
    }
}
fun main() {
    Observable.create<String> {
        it.onNext("hello")
    }
        .observeOn(createSchedulers())
        .subscribeOn(Schedulers.newThread())
        .subscribe {
            println(Thread.currentThread().name)
            println(it)
        }
}

Flowable

Flowable可以看成Observable新的实现,他支持背压,而他的API和Observable相似,在最后会介绍背压。

流式调用

我们已经熟悉了Java Stream的好处,所以在这里简单看下rxjava的实现,用法都一样,如下,创建集合"a","b","c","d"

  • map将所有item前添加字符"1"。
  • filter将b结尾的数据过滤掉。
  • skip忽略前n个数据。
fun main() {
    Flowable.fromIterable(mutableListOf("a","b","c","d"))
        .map { "1${it}" }
        .filter { !it.endsWith("b") }
        .skip(1)
        .subscribe {
            println(it)
        }
}

所以最后收到的消息将是 1c、1d

当然他提供的这类API非常之多,就不介绍了。

背压

背压指的是遇到被观察者发送的消息太快,至于它的订阅者不能及时处理数据,而我们可以提供一种告诉被观察者遇到这种情况的策略。

这种场景有个前提条件,被观察者和订阅者在不同线程。

背压策略被定义在BackpressureStrategy,有五种。

MISSING

通过create方法创建的Flowable没有指定背压策略,不会对通过OnNext发送的数据做缓存或丢弃,需要下游通过背压操作符制定策略。

ERROR

如果缓存池数据超限,则抛出异常。

BUFFER

可以无限制添加数据。

DROP

如果缓存池满了,则丢弃。

LATEST

仅保留最新的onNext值,如果下游无法跟上,则覆盖之前的值。

如下,我们使用BUFFER策略,默认的缓存池大小是128,可以通过System.setProperty("rx3.buffer-size","5")指定,而这个策略会导致只有缓存池不满的情况下,才会生产数据并发送给订阅者。

fun main() {
    System.setProperty("rx3.buffer-size","5")
    Observable.interval(1,TimeUnit.MILLISECONDS)
        .toFlowable(BackpressureStrategy.BUFFER)
        .map { User(1) }
        .observeOn(Schedulers.newThread())
        .subscribe {
            Thread.sleep(1000)
            println("hander $it")
        }
    Thread.sleep(100000)
}

而如果我们改成DROP,那么最终只有5条数据被消费,其他全部丢弃。

fun main() {
    System.setProperty("rx3.buffer-size","5")
    Observable.range(1,999)
        .toFlowable(BackpressureStrategy.DROP)
        .map { User(1) }
        .observeOn(Schedulers.newThread())
        .subscribe {
            Thread.sleep(1000)
            println("hander $it")
        }
    Thread.sleep(100000)
}

其他就不做demo了。

以上就是Kotlin下Rxjava的基础用法及流式调用示例详解的详细内容,更多关于Kotlin Rxjava的资料请关注我们其它相关文章!

(0)

相关推荐

  • Kotlin + Retrofit + RxJava简单封装使用详解

    本文介绍了Kotlin + Retrofit + RxJava简单封装使用详解,分享给大家,具体如下: 实例化Retrofit object RetrofitUtil { val CONNECT_TIME_OUT = 30//连接超时时长x秒 val READ_TIME_OUT = 30//读数据超时时长x秒 val WRITE_TIME_OUT = 30//写数据接超时时长x秒 val retrofit: Retrofit by lazy { Log.d("RetrofitUtil"

  • RxJava中map和flatMap的用法区别源码解析

    目录 前言: 作用 使用方法: map flatMap 源码分析 map flatMap 结语 前言: RxJava中提供了大量的操作符,这大大提高了了我们的开发效率.其中最基本的两个变换操作符就是map和flatMap.而其他变换操作符的原理基本与map类似. map和flatMap都是接受一个函数作为参数(Func1)并返回一个被观察者Observable Func1的< I,O >I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据,只是fla

  • Android RxJava异步数据处理库使用详解

    目录 观察者模式 操作符 创建Observable 转换Observable 过滤Observable 组合Observable 错误处理 Schedulers调度器-解决多线程问题 管理RxJava的生命周期 RxJava与Retrofit完成网络请求 观察者模式 四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件. 观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到. 扩展的观察者模式 当事件完成时会回调onComplete

  • Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果

    本文介绍了Android使用Kotlin和RxJava 2.×实现短信验证码倒计时效果,分享给大家,具体如下: 场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现代码 val timer:TextView = findViewById(R.id.textView) //这里的 timer 就是你要控制显示倒计时效果的 TextView val mSubscription: Subscription? = nul

  • Kotlin结合Rxjava+Retrofit实现极简网络请求的方法

    前言 因为最近正在写的项目集成了两个网络请求框架(Volley and Retrofit)对比之下也是选择了Retrofit.既然选择那自然要让自己以后开发更加省力(就是懒).于是我在Retrofit中加入了Rxjava,这也是当下蛮流行的一个请求框架.然后又利用了Kotlin的一些新特性,使网络请求变得特别简单,代码量特别少. Kotlin镇楼 RxJava RxJava学习是一个曲折漫长的过程,但一旦掌握,妙用无穷. 通过这里了解更多:http://www.jb51.net/article/

  • Rxjava+Retrofit+Okhttp进行网络访问及数据解析

    目录 1,创建Android项目(Android studio)导入相关依赖 2,定义接口类 3,发出请求,回调信息 4,Rxjava 和 Retrofit的结合 前言: 在平时项目开发中Okhttp3.x.Rxjava2.x.Retrofit2.x,使用的越来越多了,需要我们不断的去学习别人的优秀开发设计程序,今天简单的了解下 1,创建Android项目(Android studio)导入相关依赖 implementation 'com.squareup.okhttp3:okhttp:3.11

  • Kotlin下Rxjava的基础用法及流式调用示例详解

    目录 前言 基础用法 fromXXX create interval & timer 指定线程 observeOn & subscribeOn Flowable 流式调用 背压 前言 万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的

  • Java使用FileInputStream流读取文件示例详解

    一.File流概念 JAVA中针对文件的读写操作设置了一系列的流,其中主要有FileInputStream,FileOutputStream,FileReader,FileWriter四种最为常用的流 二.FileInputStream 1)FileInputStream概念  FileInputStream流被称为文件字节输入流,意思指对文件数据以字节的形式进行读取操作如读取图片视频等 2)构造方法 2.1)通过打开与File类对象代表的实际文件的链接来创建FileInputStream流对象

  • Go语言基础变量的声明及初始化示例详解

    目录 一.概述 二.声明变量 三.编译器推导类型的格式[一定要赋值] 四.短变量声明并初始化 五.匿名变量--没有名字的变量 六.注意 七.案例 一.概述 变量的功能是存储用户的数据 二.声明变量 Go语言的每一个变量都拥有自己的类型,必须经过声明才能开始用 变量的声明格式: var <变量名称> [变量类型] var a int //声明一个整型类型的变量,可以保存整数数值 var b string //声明一个字符串类型的变量 var c float32 //声明一个32位浮点切片类型的变

  • flutter中使用流式布局示例详解

    目录 简介 Flow和FlowDelegate Flow的应用 总结 简介 我们在开发web应用的时候,有时候为了适应浏览器大小的调整,需要动态对页面的组件进行位置的调整.这时候就会用到flow layout,也就是流式布局. 同样的,在flutter中也有流式布局,这个流式布局的名字叫做Flow.事实上,在flutter中,Flow通常是和FlowDelegate一起使用的,FlowDelegate用来设置Flow子组件的大小和位置,通过使用FlowDelegate.paintChildre可

  • 在Android环境下WebView中拦截所有请求并替换URL示例详解

    需求背景 接到这样一个需求,需要在 WebView 的所有网络请求中,在请求的url中,加上一个xxx=1的标志位. 例如 http://www.baidu.com 加上标志位就变成了 http://www.baidu.com?xxx=1 寻找解决方案 从 Android API 11 (3.0) 开始,WebView 开始在 WebViewClient 内提供了这样一条 API ,如下: public WebResourceResponse shouldInterceptRequest(Web

  • Go语言基础切片的创建及初始化示例详解

    目录 概述 语法 一.创建和初始化切片 make 字面量 二.使用切片 赋值和切片 切片增长 遍历切片 总结 总示例 示例一  两个slice是否相等 示例二 两个数字是否包含 概述 切片是一种动态数组 按需自动改变大小 与数组相比,切片的长度可以在运行时修改 语法 一.创建和初始化切片 make 使用内置函数make()创建切片: var slice []type = make([]type, len, cap) //简写: slice := make([]type, len, cap) 字面

  • Git基础学习之分支操作的示例详解

    目录 1.新建一个分支并且使分支指向指定的提交对象 2.思考 3.项目分叉历史的形成 4.分支的总结 1.新建一个分支并且使分支指向指定的提交对象 使用命令:git branch branchname commitHash. 我们现在本地库中只有一个 master 分支,并且在 master 分支有三个提交历史. 需求:创建一个 testing 分支,并且testing 分支指向 master 分支第二个版本. # 1.查看提交历史记录 L@DESKTOP-T2AI2SU MINGW64 /j/

  • RxJava和Retrofit2的统一处理单个请求示例详解

    前言 RxJava和Retrofit2用了一段时间了,写个小例子,分享出来,有什么不对的地方还请大神在评论区指正. 什么是Retrofit2 官网是这么介绍的: Retrofit adapts a Java interface to HTTP calls by using annotations on the declared methods to define how requests are made. 我翻译的可能不准确,他的大概意思是说:Retrofit 是一个 java 接口类,以注解

  • C++预定义的流对象基本示例详解

    目录 C++预定义的流对象 示例说明 总结: C++预定义的流对象 C++预定义的流对象是可用于输入和输出的数据流向对象.它们是在C++语言中内置的,可以使用标准库的iostream头文件来调用这些流对象. 在这篇文章中,我们将介绍C++预定义的流对象,并提供一些示例说明. 示例说明 cin: cin是标准输入流对象,用于从控制台读取输入. 示例: int num; cout << "Enter a number: "; cin >> num; cout <

  • C++语言io流处理基本操作教程示例详解

    目录 一.输入输出流对象 流对象常用的处理函数 流控制字符 二.字符流操作 sstream 三. 文件流流类 四.文件指针定位 一.输入输出流对象 cout:标准输出流 cerr:标准出凑  和cout(只是用于如果是错误时要输出的) cin  :   标准输入 流对象常用的处理函数 输出字符 put() 输入字符:get() 输出字符串:write() 输入字符串getline() char ch; cin.get(ch); cout << ch<<endl; cout.put(

随机推荐