浅谈java实现redis的发布订阅(简单易懂)

redis的应用场景实在太多了,现在介绍一下它的几大特性之一   发布订阅(pub/sub)。

特性介绍:

什么是redis的发布订阅(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。

同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,  Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。

如果没听懂上述的专业解释,没关系,其实我也没太听懂。

简单来讲,这里面还有个channel的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,你就会接受到银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是sub(订阅者),而银行就是pub(发布者)。

项目运用场景:

一直都认为你会一样技术之前,都必须先明白这样一种技术在哪些地方会被用到,不能盲目的学东西。

看到发布订阅的特性,用来做一个简单的实时聊天系统再适合不过了。这是其中之一,当然这样的东西,我们开发中很少涉及到。再举一个常用的,在我们的分布式架构中,常常会遇到读写分离的场景,在写入的过程中,就可以使用redis发布订阅,使得写入值及时发布到各个读的程序中,就保证数据的完整一致性。再比如,在一个博客网站中,有100个粉丝订阅了你,当你发布新文章,就可以推送消息给粉丝们拉。总之场景很多,需要去挖掘。。

回顾java如何操作redis:

redis是一种缓存数据库,它也是C/S的结构,也就是客户端和服务端,一般来说,在java中,我们通常使用 jedis(客户端)去操作redis(服务端),这其中操作的时候,两者之间肯定要建立连接,就像数据库链接一样,在关系型数据库中,我们一般都维护一个连接池,以达到链接的复用,来省去建立连接和关闭连接的时间。所以在jedis中,同样也存在一个jedispool(jedis连接池)的概念,我们都是从池中去取连接使用。

上代码:

想使用jedis先引入依赖

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

建立一个Publisher (发布者)

public class Publisher extends Thread{

  private final JedisPool jedisPool;

  public Publisher(JedisPool jedisPool) {
    this.jedisPool = jedisPool;
  }

  @Override
  public void run() {
    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    Jedis jedis = jedisPool.getResource();  //连接池中取出一个连接
    while (true) {
      String line = null;
      try {
        line = reader.readLine();
        if (!"quit".equals(line)) {
          jedis.publish("mychannel", line);  //从 mychannel 的频道上推送消息
        } else {
          break;
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

再建立一个订阅者

public class Subscriber extends JedisPubSub {
  public Subscriber(){}
  @Override
  public void onMessage(String channel, String message) {    //收到消息会调用
    System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
  }
  @Override
  public void onSubscribe(String channel, int subscribedChannels) {  //订阅了频道会调用
    System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
        channel, subscribedChannels));
  }
  @Override
  public void onUnsubscribe(String channel, int subscribedChannels) {  //取消订阅 会调用
    System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
        channel, subscribedChannels));

  }
}

这里订阅者需要继承JedisPubSub,来重写它的三个方法。用途 注释上已经写了,很简单。

我们这里只是定义了一个订阅者,下面去订阅频道。

public class SubThread extends Thread {
  private final JedisPool jedisPool;
  private final Subscriber subscriber = new Subscriber();

  private final String channel = "mychannel";

  public SubThread(JedisPool jedisPool) {
    super("SubThread");
    this.jedisPool = jedisPool;
  }

  @Override
  public void run() {
    System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
    Jedis jedis = null;
    try {
      jedis = jedisPool.getResource();  //取出一个连接
      jedis.subscribe(subscriber, channel);  //通过subscribe 的api去订阅,入参是订阅者和频道名
    } catch (Exception e) {
      System.out.println(String.format("subsrcibe channel error, %s", e));
    } finally {
      if (jedis != null) {
        jedis.close();
      }
    }
  }
}

最后,再写一个测试类去跑一下。键盘输入消息,订阅者就会触发onMessage方法

public class PubSubDemo {
  public static void main( String[] args )
  {
    // 连接redis服务端
    JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);

    System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));

    SubThread subThread = new SubThread(jedisPool); //订阅者
    subThread.start();

    Publisher publisher = new Publisher(jedisPool);  //发布者
    publisher.start();
  }
}

看打印结果

附上代码地址  https://github.com/fangyong1421/redis

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • redis发布订阅_动力节点Java学院整理
  • redis发布和订阅_动力节点Java学院整理
(0)

相关推荐

  • redis发布和订阅_动力节点Java学院整理

    Redis 的 pub sub实现了邮件系统,发送者(在 Redis 术语中被称为发布者)发送的邮件,而接收器(用户)接收它们.由该消息传送的链路被称为信道. Redis客户端可以订阅任何数目的通道. 例子 以下举例说明如何发布用户的概念工作.在下面的例子给出一个客户端订阅一个通道名为redisChat redis 127.0.0.1:6379> SUBSCRIBE redisChat Reading messages... (press Ctrl-C to quit) 1) "subsc

  • redis发布订阅_动力节点Java学院整理

    其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果博主发表了文章,那么100个人就会同时收到通知邮件,除了这个场景还能找到其他场景么,当然有啦,你想想,如果你要在内存里面做一个读写分离的程序,为了维持数据的完整性,你是不是需要保证在写入的时候,也要分发到各个读内存的程序中呢?所以说场景还是很多的,在于你的挖掘~~~ 下面还是从基本命令入手: 一:命令简介 从redis手册上面可以看到,其实"发布.订阅"

  • 浅谈java实现redis的发布订阅(简单易懂)

    redis的应用场景实在太多了,现在介绍一下它的几大特性之一   发布订阅(pub/sub). 特性介绍: 什么是redis的发布订阅(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能.基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件:发布者(如服务器)可将订阅者感兴趣的事件随时通知

  • 浅谈java如何实现Redis的LRU缓存机制

    目录 LRU概述 使用LinkedHashMap实现 使用LinkedHashMap简单方法实现 双链表+hashmap LRU概述 最近使用的放在前面,最近没用的放在后面,如果来了一个新的数,此时内存满了,就需要把旧的数淘汰,那为了方便移动数据,肯定就得使用链表类似的数据结构,再加上要判断这条数据是不是最新的或者最旧的那么应该也要使用hashmap等key-value形式的数据结构. 使用LinkedHashMap实现 package thread; import java.util.Link

  • 浅谈Java开发架构之领域驱动设计DDD落地

    目录 一.前言 二.开发目标 三.服务架构 3.1.应用层{application} 3.2.领域层{domain} 3.3.基础层{infrastructrue} 3.4.接口层{interfaces} 四.开发环境 五.代码示例 六.综上总结 一.前言 整个过程大概是这样的,开发团队和领域专家一起通过 通用语言(Ubiquitous Language)去理解和消化领域知识,从领域知识中提取和划分为一个一个的子领域(核心子域,通用子域,支撑子域),并在子领域上建立模型,再重复以上步骤,这样周而

  • 浅谈MySQL与redis缓存的同步方案

    本文介绍MySQL与Redis缓存的同步的两种方案 方案1:通过MySQL自动同步刷新Redis,MySQL触发器+UDF函数实现 方案2:解析MySQL的binlog实现,将数据库中的数据同步到Redis 一.方案1(UDF) 场景分析:当我们对MySQL数据库进行数据操作时,同时将相应的数据同步到Redis中,同步到Redis之后,查询的操作就从Redis中查找 过程大致如下: 在MySQL中对要操作的数据设置触发器Trigger,监听操作 客户端(NodeServer)向MySQL中写入数

  • 浅谈Java 将图片打包到jar中的路径问题

    Eclipse使用导出Jar包后打开加载不了图像? 出现这种问题的原因大多是因为路径的问题,往往是在项目中运行正常,但是一旦打包成Jar后就不能正常显示了,下面总结一下解决此类问题的方法: 一.通过使用外部资源文件的方式解决 把项目打包生成Jar后,再新建一文件夹,把项目中所有使用的图片文件和生成Jar文件放在该目录下,只要代码对图片文件的引用路径正确,如:ImageIcon image=new ImageIcon(SwingResourceManager.getImage("icons/log

  • 浅谈Java中是否直接可以使用enum进行传输

    背景 我们在进行传输的时候 会有一些状态值,如Status为1代表删除,为0代表失败或者怎么样的.只传输一个)0或者1过去给第三方(此处不包括给前端),如果没有契约第三方会不认识你这个是什么意思,那我们在平时写业务逻辑的时候使用枚举很轻易就知道了什么状态什么值.所以我们在构建DTO对象的时候里面放一个枚举来表示. 首先在阿里的规范里是这样说的: [强制]二方库里可以定义枚举类型,参数可以使用枚举类型,但是接口返回值不允许使用枚举类型或者包含枚举类型的 POJO 对象. 那到底为啥不能用呢? 枚举

  • 浅谈Java分布式架构下如何实现分布式锁

    01分布式锁运用场景 互联网秒杀,抢优惠卷,接口幂等性校验.咱们以互联网秒杀为例. @RestController @Slf4j publicclassIndexController{ @Autowired privateRedissonredission; @Autowired privateStringRedisTemplatestringRedisTemplate; @RequestMapping("/deduct_stock") publicStringdeductStock(

  • 浅谈JAVA版本号的问题 Java版本号与JDk版本

    初学Java时便一直疑惑Java版本号到底是如何命名的?时常在网上看到Java5.Java6.Java7.Java8 (到今天已经到了Java12了,2019.4.5) 这一类 "Java X" 的Java版本名称,同时又会看到诸如JDK1.5.JDK1.6这中 "JDK1.X" 的JDK叫法. 一直以来都在纠结Java以及JDK的规范版本名称到底是如何,直到最近在几本书上看到了相关的解释才有点明白, 现总结在这里: 首先1996年发布了最初版本Java1.0(此前

  • 浅谈java接口的幂等性及解决方案

    目录 一.什么情况下需要幂等 二.幂等性解决方案 2.1 token机制(令牌) 2.2 各种锁机制 2.3 各种唯一约束 2.4 防重表 2.5 全局请求唯一id 总结 一.什么情况下需要幂等 用户多次点击按钮 用户页面回退再次提交 微服务相互调用,由于网络问题,导致请求失败,feign触发重试机制 二.幂等性解决方案 2.1 token机制(令牌) 在加载页面详情时候,服务器会顺便生成一个token一起返回给前端,服务端同时也在Redis中保存这个token数据,前端并不展示这个token,

  • 浅谈Java中Unicode的编码和实现

    Unicode的编码和实现 大概来说,Unicode编码系统可分为编码方式和实现方式两个层次. 编码方式 字符是抽象的最小文本单位.它没有固定的形状(可能是一个字形),而且没有值."A"是一个字符,"€"也是一个字符.字符集是字符的集合.编码字符集是一个字符集,它为每一个字符分配一个唯一数字. Unicode 最初设计是作为一种固定宽度的 16 位字符编码.也就是每个字符占用2个字节.这样理论上一共最多可以表示216(即65536)个字符.上述16位统一码字符构成基

随机推荐