浅谈Python响应式类库RxPy

一、基本概念

Reactive X中有几个核心的概念,先来简单介绍一下。

1.1、Observable和Observer(可观察对象和观察者)

首先是Observable和Observer,它们分别是可观察对象和观察者。Observable可以理解为一个异步的数据源,会发送一系列的值。Observer则类似于消费者,需要先订阅Observable,然后才可以接收到其发射的值。可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。

1.2、Operator(操作符)

另外一个非常重要的概念就是操作符了。操作符作用于Observable的数据流上,可以对其施加各种各样的操作。更重要的是,操作符还可以链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。

1.3、Single(单例)

在RxJava和其变体中,还有一个比较特殊的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比较熟悉,那么单例想必也很熟悉。

1.4、Subject(主体)

主体这个概念非常特殊,它既是Observable又是Observer。正是因为这个特点,所以Subject可以订阅其他Observable,也可以将发射对象给其他Observer。在某些场景中,Subject会有很大的作用。

1.5、Scheduler(调度器)

默认情况下Reactive X只运行在当前线程下,但是如果有需要的话,也可以用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,可以处理多线程场景下的各种要求。

1.6、Observer和Observable

先来看看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个操作符,可以根据给定的参数创建一个新的Observable。创建之后,就可以订阅Observable,三个回调方法在对应的时机执行。一旦Observer订阅了Observable,就会接收到后续Observable发射的各项值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

这个例子看起来好像很简单,并且看起来没什么用。但是当你了解了Rx的一些核心概念,就会理解到这是一个多么强大的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你熟悉的话,就可以利用这个特性做很多事情。

1.7、操作符

在RxPy中另一个非常重要的概念就是操作符了,甚至可以说操作符就是最重要的一个概念了。几乎所有的功能都可以通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的关键了。操作符之间也可以用pipe函数连接起来,构成复杂的操作链。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符,可以完成各种各样的功能。我们来简单看看其中一些常用的操作符。如果你熟悉Java8的流类库或者其他函数式编程类库的话,应该对这些操作符感到非常亲切。

1.8、创建型操作符

首先是创建Observable的操作符,列举了一些比较常用的创建型操作符。

1.9、过滤型操作符

过滤型操作符的主要作用是对Observable进行筛选和过滤。

1.10、转换型操作符

1.11、算术操作符

1.12、Subject

Subject是一种特殊的对象,它既是Observer又是Observable。不过这个对象一般不太常用,但是假如某些用途还是很有用的。所以还是要介绍一下。下面的代码,因为订阅的时候第一个值已经发射出去了,所以只会打印订阅之后才发射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同时是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外还有几个特殊的Subject,下面来介绍一下。

1.13、ReplaySubject

ReplaySubject是一个特殊的Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它可以用来当做缓存来使用。ReplaySubject还可以接受一个bufferSize参数,指定可以缓存的最近数据数,默认情况下是全部。

下面的代码和上面的代码几乎完全一样,但是因为使用了ReplaySubject,所以所有的值都会被打印。当然大家也可以试试把订阅语句放到其他位置,看看输出是否会产生变化。

# ReplaySubject会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

1.14、BehaviorSubject

BehaviorSubject是一个特殊的Subject,它只会记录最近一次发射的值。而且在创建它的时候,必须指定一个初始值,所有订阅它的对象都可以接收到这个初始值。当然如果订阅的晚了,这个初始值同样会被后面发射的值覆盖,这一点要注意。

# BehaviorSubject会缓存上次发射的值,除非Observable已经关闭
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

1.15、AsyncSubject

AsyncSubject是一个特殊的Subject,顾名思义它是一个异步的Subject,它只会在Observer完成的时候发射数据,而且只会发射最后一个数据。因此下面的代码仅仅会输出4.假如注释掉最后一行co_completed调用,那么什么也不会输出。

# AsyncSubject会缓存上次发射的值,而且仅会在Observable关闭后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

1.16、Scheduler

虽然RxPy算是异步的框架,但是其实它默认还是运行在单个线程之上的,因此如果使用了某些会阻碍线程运行的操作,那么程序就会卡死。当然针对这些情况,我们就可以使用其他的Scheduler来调度任务,保证程序能够高效运行。

下面的例子创建了一个ThreadPoolScheduler,它是基于线程池的调度器。两个Observable用subscribe_on方法指定了调度器,因此它们会使用不同的线程来工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random

def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value

pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你观察过各个操作符的API的话,可以发现大部分操作符都支持可选的Scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先使用这个调度器;其次的话,会使用subscribe方法上指定的调度器;如果以上都没有指定的话,就会使用默认的调度器。

二、应用场景

好了,介绍了一些Reactive X的知识之后,下面来看看如何来使用Reactive X。在很多应用场景下,都可以利用Reactive X来抽象数据处理,把概念简单化。

2.1、防止重复发送

很多情况下我们都需要控制事件的发生间隔,比如有一个按钮不小心按了好几次,只希望第一次按钮生效。这种情况下可以使用debounce操作符,它会过滤Observable,小于指定时间间隔的数据会被过滤掉。debounce操作符会等待一段时间,直到过了间隔时间,才会发射最后一次的数据。如果想要过滤后面的数据,发送第一次的数据,则要使用throttle_first操作符。

下面的代码可以比较好的演示这个操作符,快速按回车键发送数据,注意观察按键和数据显示之间的关系,还可以把throttle_first操作符换成debounce操作符,然后再看看输出会发生什么变化,还可以完全注释掉pipe中的操作符,再看看输出会有什么变化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符,仅在时间间隔之外的可以发射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

2.2、操作数据流

如果需要对一些数据进行操作,那么同样有一大堆操作符可以满足需求。当然这部分功能并不是Reactive X独有的,如果你对Java 8的流类库有所了解,会发现这两者这方面的功能几乎是完全一样的。

下面是个简单的例子,将两个数据源结合起来,然后找出来其中所有的偶数。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一个利用reduce的简单例子,求1-100的整数和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))

以上就是浅谈Python响应式类库RxPy的详细内容,更多关于Python响应式类库RxPy的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python调用系统命令os.system()和os.popen()的实现

    作为一门脚本语言,写脚本时执行系统命令可以说很常见了,python提供了相关的模块和方法. os模块提供了访问操作系统服务的功能,由于涉及到操作系统,它包含的内容比较多,这里只说system和popen方法. >>> import os >>> dir(os) ['DirEntry', 'F_OK', 'MutableMapping', 'O_APPEND', 'O_BINARY', 'O_CREAT', 'O_EXCL', 'O_NOINHERIT', 'O_RAND

  • Python wordcloud库安装方法总结

    碰到有关于"词云"的概念,那就一定要用到本章教学库--wordcloud,这是第三方的库,主要是用于词云的展示,基本的单位也是以词云为主,利用它的功能,我们可以实现过滤文本信息,这样,就可以直观的观察到我们所需要的信息内容,因此,根据技能上的应用,在实际操作中还是非常常见的,下面来看下安装操作. 安装命令: pip install wordcloud 导入包: from wordcloud import WordCloud 常见方法: 1.加载文本及输出 w = wordcloud.W

  • Python列表元素删除和remove()方法详解

    删除列表中元素的方法有三种: 1. del命令 使用del命令能够删除列表中指定位置上的元素,也可以删除整个列表. 2. pop( )方法 使用列表的pop()方法能够删除并返回列表指定位置(默认为最后一个位置)的元素. 3. remove方法 使用列表的remove()方法能够删除列表中首次出现的指定元素,如果列表中不存在该元素则抛出异常.有的时候可能需要删除列表中某一大量重复的数据,我们很容易就会想到列表的remove()方法,例如: x=[1,2,1,2,1,2,1,2] y=[1,1,2

  • python中pivot()函数基础知识点

    不同于以往为大家介绍的函数使用,我们利用pivot函数可以实现的方式,就是用来重塑数据使用的,在python的使用上并不常见,但是如果需要利用这种功能,基本上能够被我们选择调用的函数,pivot函数一定是榜上有名,下面我们就围绕着该函数,给大家做详细的内容讲解,一起来看下吧. 函数语法: pivot() 参数: Index.columns需要注意的是前者是可选参数,后者是必选参数. 使用实例: import pandas as pd df=pd.read_csv("user_label_part

  • 关于python中remove的一些坑小结

    前几天,使用python时遇到这么一个需求,删除一个列表中值为1的元素.我寻思着使用remove方法,但是remove方法只会删除第一个,于是我使用for循环去删除.代码和运行结果如下: 当时这个结果让我很懵逼,为什么1没有被删除完?查了资料发现,是for循环捣的鬼.因为for循环实际是循环的列表下标(索引),同时由于列表的可变性,每一次删除一个元素,列表的长度就会发生变化,元素的索引也会发生变化.这里来具体分析一下这段代码: 第一次循环,循环索引为0,此时索引为0的元素是1,满足条件,因此my

  • python 使用xlsxwriter循环向excel中插入数据和图片的操作

    写入Excel中后有显示第一列客户款号总库存这些,开始写在第12行第一列开始写入,一行写入5个,然后再隔12行,再写入下边的数据,图片需要对应客户款号在Excel写入图片,类似下面的格式 import xlsxwriter import os #以空字符填充缺失值,不然写入数据会报错 data.fillna('',inplace=True) #创建一个新Excel文件并添加一个工作表. workbook = xlsxwriter.Workbook('images.xlsx') worksheet

  • python opencv实现直线检测并测出倾斜角度(附源码+注释)

    由于学习需要,我想要检测出图片中的直线,并且得到这些直线的角度.于是我在网上搜了好多直线检测的代码,但是没有搜到附有计算直线倾斜角度的代码,所以我花了一点时间,自己写了一份直线检测并测出倾斜角度的代码,希望能够帮助到大家! 注:这份代码只能够检测简单结构图片的直线,复杂结构的图片还需要设置合理的参数 下面展示 源码. import cv2 import numpy as np def line_detect(image): # 将图片转换为HSV hsv = cv2.cvtColor(image

  • 浅谈Python响应式类库RxPy

    一.基本概念 Reactive X中有几个核心的概念,先来简单介绍一下. 1.1.Observable和Observer(可观察对象和观察者) 首先是Observable和Observer,它们分别是可观察对象和观察者.Observable可以理解为一个异步的数据源,会发送一系列的值.Observer则类似于消费者,需要先订阅Observable,然后才可以接收到其发射的值.可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体. 1.2.Operator(操作符) 另外一个非常重要

  • 浅谈Java响应式系统

    初识响应式系统 ReactiveX的本质就是Observer+Iterator+函数编程+异步.是一个事件驱动的,异步的,可观察的序列. 使用RxJava可以将异步的回调改写成为链式调用.在代码上看起来非常简洁明了.当然JDK也提供了CompletionStage提供了类似的解决回调的功能. Rxjava只是一个java的基本库,如果我们想要构建响应式的服务器,响应式的web,响应式的数据访问,甚至是响应式的微服务,又该如何处理呢? 这个时候我了解到了Vert.x.Vert.x就是用来构建Rea

  • 浅谈Rx响应式编程

    目录 一.Observable 二.高阶函数 三.快递盒模型 3.1.快递盒模型1:fromEvent 3.2.快递盒模型2:interval 四.高阶快递盒 五.销毁快递盒 5.1.销毁快递盒--取消订阅 5.2.销毁高阶快递盒 六.补充 七.后记 一.Observable Observable从字面翻译来说叫做"可观察者",换言之就是某种"数据源"或者"事件源",这种数据源具有可被观察的能力,这个和你主动去捞数据有本质区别.用一个形象的比喻就

  • 浅谈Vue响应式(数组变异方法)

    前言 很多初使用Vue的同学会发现,在改变数组的值的时候,值确实是改变了,但是视图却无动于衷,果然是因为数组太高冷了吗? 查看官方文档才发现,不是女神太高冷,而是你没用对方法. 看来想让女神自己动,关键得用对方法.虽然在官方文档中已经给出了方法,但是在下实在好奇的紧,想要解锁更多姿势的话,那就必须先要深入女神的心,于是乎才有了去探索Vue响应式原理的想法.(如果你愿意一层一层地剥开我的心.你会发现,你会讶异-- 沉迷于鬼哭狼嚎 无法自拔QAQ). 前排提示,Vue的响应式原理主要是使用了ES5的

  • 浅谈Spring5 响应式编程

    近年来,响应式编程在开发者社区和客户中很受欢迎,由于其以声明的方式构建应用程序的能力,而不是强制,形成更加敏感和有弹性的应用.Spring 5 将反应系统纳入其核心框架的事实已经显示出向声明式编程的范式转变. 响应式编程管理数据生产者和消费者之间的异步数据流,它们需要以流畅的方式对数据进行响应.所以,响应式编程都是异步和事件驱动的流畅应用程序,需要少量的线程进行缩放. 响应式编程很难构建基于线程的架构,由于在基于共享可变状态.线程和锁的应用程序扩展过程中涉及到高度复杂性. 在响应式编程的上下文中

  • 浅谈Python xlwings 读取Excel文件的正确姿势

    使用Python加载最新的Excel读取类库xlwings可以说是Excel数据处理的利器,但使用起来还是有一些注意事项,否则高大上的Python会跑的比老旧的VBA还要慢. 这里我们对比一下,用几种不同的方法,从一个Excel表格中读取一万行数据,然后计算结果,看看他们的耗时. 1. 处理要求: 一个Excel表格中包含了3万条记录,其中B,C两个列记录了某些计算值,读取前一万行记录,将这两个列的差值进行计算,然后汇总得出差的和. 文件是这个样子:Book300s.xlsx . 2. 处理方式

  • 浅谈Python基础之I/O模型

    一.I/O模型 IO在计算机中指Input/Output,也就是输入和输出.由于程序和运行时数据是在内存中驻留,由CPU这个超快的计算核心来执行,涉及到数据交换的地方,通常是磁盘.网络等,就需要IO接口. 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别? 这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blockin

  • 浅谈python中的面向对象和类的基本语法

    当我发现要写python的面向对象的时候,我是踌躇满面,坐立不安呀.我一直在想:这个坑应该怎么爬?因为python中关于面向对象的内容很多,如果要讲透,最好是用面向对象的思想重新学一遍前面的内容.这个坑是如此之大,犹豫再三,还是只捡一下重要的内容来讲吧,不足的内容只能靠大家自己去补充了. 惯例声明一下,我使用的版本是 python2.7,版本之间可能存在差异. 好,在开讲之前,我们先思考一个问题,看代码: 为什么我只创建是为 a 赋值,就可以使用一些我没写过的方法? 可能会有小伙伴说:因为 a

  • 浅谈python中的数字类型与处理工具

    python中的数字类型工具 python中为更高级的工作提供很多高级数字编程支持和对象,其中数字类型的完整工具包括: 1.整数与浮点型, 2.复数, 3.固定精度十进制数, 4.有理分数, 5.集合, 6.布尔类型 7.无穷的整数精度 8.各种数字内置函数及模块. 基本数字类型 python中提供了两种基本类型:整数(正整数金额负整数)和浮点数(注:带有小数部分的数字),其中python中我们可以使用多种进制的整数.并且整数可以用有无穷精度. 整数的表现形式以十进制数字字符串写法出现,浮点数带

  • 浅谈Python Opencv中gamma变换的使用详解

    伽马变换就是用来图像增强,其提升了暗部细节,简单来说就是通过非线性变换,让图像从暴光强度的线性响应变得更接近人眼感受的响应,即将漂白(相机曝光)或过暗(曝光不足)的图片,进行矫正. 伽马变换的基本形式如下: 大于1时,对图像的灰度分布直方图具有拉伸作用(使灰度向高灰度值延展),而小于1时,对图像的灰度分布直方图具有收缩作用(是使灰度向低灰度值方向靠拢). #分道计算每个通道的直方图 img0 = cv2.imread('12.jpg') hist_b = cv2.calcHist([img0],

随机推荐