详解用RxJava实现事件总线(Event Bus)

目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
  private static volatile RxBus defaultInstance;

  private final Subject<Object, Object> bus;
  // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
  public RxBus() {
   bus = new SerializedSubject<>(PublishSubject.create());
  }
  // 单例RxBus
  public static RxBus getDefault() {
    if (defaultInstance == null) {
      synchronized (RxBus.class) {
        if (defaultInstance == null) {
          defaultInstance = new RxBus();
        }
      }
    }
    return defaultInstance ;
  }
  // 发送一个新的事件
  public void post (Object o) {
    bus.onNext(o);
  }
  // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
  public <T> Observable<T> toObservable (Class<T> eventType) {
    return bus.ofType(eventType);
//    这里感谢小鄧子的提醒: ofType = filter + cast
//    return bus.filter(new Func1<Object, Boolean>() {
//      @Override
//      public Boolean call(Object o) {
//        return eventType.isInstance(o);
//      }
//    }) .cast(eventType);
  }
}

注:

1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子  的提醒)

public final <R> Observable<R> ofType(final Class<R> klass) {
  return filter(new Func1<T, Boolean>() {
    @Override
    public final Boolean call(T t) {
      return klass.isInstance(t);
    }
  }).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。

cast操作符可以将一个Observable转换成指定类型的Observable。

分析:

RxBus工作流程图

1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。

先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {
  long id;
  String name;
  public UserEvent(long id,String name) {
    this.id= id;
    this.name= name;
  }
  public long getId() {
    return id;
  }
  public String getName() {
    return name;
  }
}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
    .subscribe(new Action1<UserEvent>() {
        @Override
        public void call(UserEvent userEvent) {
          long id = userEvent.getId();
          String name = userEvent.getName();
          ...
        }
      },
    new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        // TODO: 处理异常
      }
    });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {
  super.onDestroy();
  if(!rxSubscription.isUnsubscribed()) {
    rxSubscription.unsubscribe();
  }
}

这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,也许可以考虑替换掉EventBus或Otto,减小项目体积。

RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 简单谈谈RxJava和多线程并发

    前言 相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的. RxJava与并发 首先让我们来看一段RxJava协议的原文: Observables must issue notifications to observers serially (not in parallel). They may issue these notificat

  • RxJava入门之介绍与基本运用

    前言 因为这个RxJava内容不算少,而且应用场景非常广,所以这个关于RxJava的文章我们会陆续更新,今天就来先来个入门RxJava吧 初识RxJava 什么是Rx 很多教程在讲解RxJava的时候,上来就介绍了什么是RxJava.这里我先说一下什么是Rx,Rx就是ReactiveX,官方定义是: Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序 看到这个定义我只能呵呵,稍微通俗点说是这样的: Rx是微软.NET的一个响应式扩展.Rx借助可观测的序

  • Android中通过RxJava进行响应式程序设计的入门指南

    错误处理 到目前为止,我们都没怎么介绍onComplete()和onError()函数.这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了). 下面的代码展示了怎么使用这两个函数: Observable.just("Hello, world!") .map(s -> potentialException(s)) .map(s -> anotherPotentialException(s)) .subscribe(new Subscrib

  • Android性能优化之利用Rxlifecycle解决RxJava内存泄漏详解

    前言: 其实RxJava引起的内存泄漏是我无意中发现了,本来是想了解Retrofit与RxJava相结合中是如何通过适配器模式解决的,结果却发现了RxJava是会引起内存泄漏的,所有想着查找一下资料学习一下如何解决RxJava引起的内存泄漏,就查到了利用Rxlifecycle开源框架可以解决,今天周末就来学习一下如何使用Rxlifecycle. 引用泄漏的背景: RxJava作为一种响应式编程框架,是目前编程界网红,可谓是家喻户晓,其简洁的编码风格.易用易读的链式方法调用.强大的异步支持等使得R

  • RxJava入门指南及其在Android开发中的使用示例

    RxJava的GitHub主页,部署部分就没什么好说的了~ https://github.com/ReactiveX/RxJava 基础 RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者).Observables发出一系列事件,Subscribers处理这些事件.这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据...) 一个Observable可以发出零个或者多个事件,知道结束或者出错.每发出一个事件,就会调用它的Su

  • RxJava 1升级到RxJava 2过程中踩过的一些“坑”

    RxJava2介绍 RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 RxJava1 是不能同时存在于一个项目中的,逼不得已我得把自己所有框架中使用 RxJava 的地方以及 App 中使用 RxJava 的地方都升级到最新版本.所以我整理并记录了一些已经填好的坑.分享出来供大家参考学习,下面来看看详细的介绍: 填坑记录 1. RxJava1 跟 RxJava2 不能共存 如果,在同一个mo

  • Android中的Retrofit+OkHttp+RxJava缓存架构使用

    RxJava如何与Retrofit结合 先扔出build.gradle文件的内容 dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) testCompile 'junit:junit:4.12' compile 'com.android.support:appcompat-v7:23.2.0' compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxand

  • RxJava2.x实现定时器的实例代码

    前言 由于现在网络层已经升级到RxJava2.x相关的了,所以需要做些调整.虽然RxJava1.x和RxJava2.x同属RxJava系列,但由于RxJava2.x部分代码的重写,导致RxJava2.x与RxJava1.x已是两个不同的版本,RxJava2.x在性能上更优,尤其在背压支持上.当然,此篇重点不在Rx版本上的区别,有兴趣的同学可以自行研究.当然,2.x之于1.x的区别之一是2.x中已经没有 Subscription mSubscription, Observable.create()

  • 详解用RxJava实现事件总线(Event Bus)

    目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式. 不多说,上代码 /** * RxBus * Created by YoKeyword on 2015/6/17. */ public class RxBus { private static volatile RxBus defaultInstance; private final Subject<Object, Object> bu

  • JavaScript实现事件总线(Event Bus)的方法详解

    目录 介绍 原理 分析 进阶 1. 如何在发送消息时传递参数 2. 订阅后如何取消订阅 3. 如何只订阅一次 4. 如何清除某个事件或者所有事件 5. TypeScript 版本 6. 单例模式 总结 介绍 Event Bus 事件总线,通常作为多个模块间的通信机制,相当于一个事件管理中心,一个模块发送消息,其它模块接受消息,就达到了通信的作用. 比如,Vue 组件间的数据传递可以使用一个 Event Bus 来通信,也可以用作微内核插件系统中的插件和核心通信. 原理 Event Bus 本质上

  • 详解pygame捕获键盘事件的两种方式

    方式1:在pygame中使用pygame.event.get()方法捕获键盘事件,使用这个方式捕获的键盘事件必须要是按下再弹起才算一次. 示例示例: for event in pygame.event.get(): # 捕获键盘事件 if event.type == pygame.QUIT: # 判断按键类型 print("按下了退出按键") 方式2:在pygame中可以使用pygame.key.get_pressed()来返回所有按键元组,通过判断键盘常量,可以在元组中判断出那个键被

  • 一文详解JS中的事件循环机制

    目录 前言 1.JavaScript是单线程的 2.同步和异步 3.事件循环 前言 我们知道JavaScript 是单线程的编程语言,只能同一时间内做一件事,按顺序来处理事件,但是在遇到异步事件的时候,js线程并没有阻塞,还会继续执行,这又是为什么呢?本文来总结一下js 的事件循环机制. 1.JavaScript是单线程的 JavaScript 是一种单线程的编程语言,只有一个调用栈,决定了它在同一时间只能做一件事.在代码执行的时候,通过将不同函数的执行上下文压入执行栈中来保证代码的有序执行.在

  • 详解SpringBoot实现ApplicationEvent事件的监听与发布

    目录 新建SpringBoot项目 实现代码 pom.xml Application.java TaskPoolConfig.java EmailDto.java SendEmailEvent.java SendEmailListener.java SendEmailService.java SendEmailServiceImpl.java IndexController.java 通过发布订阅模式实现数据的异步处理,比如异步处理邮件发送 新建SpringBoot项目 项目结构 .├── po

  • 详解vue中v-on事件监听指令的基本用法

    一.本节说明 我们在开发过程中经常需要监听用户的输入,比如:用户的点击事件.拖拽事件.键盘事件等等.这就需要用到我们下面要学习的内容v-on指令. 我们通过一个简单的计数器的例子,来讲解v-on指令的使用. 二. 怎么做 定义数据counter,用于表示计数器数字,初始值设置为0 v-on:click 表示当发生点击事件的时候,触发等号里面的表达式或者函数 表达式counter++和counter--分别实现计数器数值的加1和减1操作 语法糖:我们可以将v-on:click简写为@click 三

  • 实例详解JS中的事件循环机制

    目录 一.前言 二.宏.微任务 三.Tick 执行顺序 四.案例详解 1.掺杂setTimeout 2.掺杂微任务,此处主要是Promise.then 3.掺杂async/await 一.前言 之前我们把react相关钩子函数大致介绍了一遍,这一系列完结之后我莫名感到空虚,不知道接下来应该更新有关哪方面的文章.最近想了想,打算先回归一遍JS基础,把一些比较重要的基础知识点回顾一下,然后继续撸框架(可能是源码.也可能补全下全家桶).不积跬步无以至千里,万丈高楼咱们先从JS的事件循环机制开始吧,废话

  • 详解python多线程、锁、event事件机制的简单使用

    线程和进程 1.线程共享创建它的进程的地址空间,进程有自己的地址空间 2.线程可以访问进程所有的数据,线程可以相互访问 3.线程之间的数据是独立的 4.子进程复制线程的数据 5.子进程启动后是独立的 ,父进程只能杀掉子进程,而不能进行数据交换 6.修改线程中的数据,都是会影响其他的线程,而对于进程的更改,不会影响子进程 threading.Thread Thread 是threading模块中最重要的类之一,可以使用它来创建线程.有两种方式来创建线程:一种是通过继承Thread类,重写它的run

  • 详解python tkinter教程-事件绑定

    一个Tkinter主要跑在mainloop进程里.Events可能来自多个地方,比如按键,鼠标,或是系统事件. Tkinter提供了丰富的方法来处理这些事件.对于每一个控件Widget,你都可以为其绑定方法function. widget.bind(event,handler) 如果相应的event发生了,就会调用handler处理事件.举个例子: 捕获鼠标点击事件: from Tkinter import * root = Tk() def callback(event): print "cl

  • 详解Vue 方法与事件处理器

    方法与事件处理器 方法处理器 可以用 v-on 指令监听 DOM 事件: <div id="example"> <button v-on:click="greet">Greet</button> </div> 我们绑定了一个单击事件处理器到一个方法 greet.下面在 Vue 实例中定义这个方法: var vm = new Vue({ el: '#example', data: { name: 'Vue.js' },

随机推荐