基于ZooKeeper实现队列源码

实现原理

先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

应用场景

Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。

详细代码

ZookeeperClient工具类

package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {
 private static String connectionString = "localhost:2181";
 private static int sessionTimeout = 10000;
 public static ZooKeeper getInstance() throws IOException, InterruptedException {
 //--------------------------------------------------------------
 // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
 // 这里等Zookeeper的连接完成才返回实例
 //--------------------------------------------------------------
 final CountDownLatch connectedSignal = new CountDownLatch(1);
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
   if (event.getState() == Event.KeeperState.SyncConnected) {
   connectedSignal.countDown();
   } else if (event.getState() == Event.KeeperState.Expired) {
   }
  }
  });
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
 return zk;
 }
 public static int getSessionTimeout() {
 return sessionTimeout;
 }
 public static void setSessionTimeout(int sessionTimeout) {
 ZooKeeperClient.sessionTimeout = sessionTimeout;
 }
}

ZooKeeperQueue

package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Created by Allen on 2016/12/22.
 */
public class ZooKeeperQueue {
 private ZooKeeper zk;
 private int sessionTimeout;
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
 private static String QUEUE_ROOT = "/QUEUE";
 private String queueName;
 private String queuePath;
 private Object mutex = new Object();
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
 this.queueName = queueName;
 this.queuePath = QUEUE_ROOT + "/" + queueName;
 this.zk = ZooKeeperClient.getInstance();
 this.sessionTimeout = zk.getSessionTimeout();
 //----------------------------------------------------
 // 确保队列根目录/QUEUE和当前队列的目录的存在
 //----------------------------------------------------
 ensureExists(QUEUE_ROOT);
 ensureExists(queuePath);
 }
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
 List<String> nodes = null;
 byte[] returnVal = null;
 Stat stat = null;
 do {
  synchronized (mutex) {
  nodes = zk.getChildren(queuePath, new ProduceWatcher());
  //----------------------------------------------------
  // 如果没有消息节点,等待生产者的通知
  //----------------------------------------------------
  if (nodes == null || nodes.size() == 0) {
   mutex.wait();
  } else {
   SortedSet<String> sortedNode = new TreeSet<String>();
   for (String node : nodes) {
   sortedNode.add(queuePath + "/" + node);
   }
   //----------------------------------------------------
   // 消费队列里序列号最小的消息
   //----------------------------------------------------
   String first = sortedNode.first();
   returnVal = zk.getData(first, false, stat);
   zk.delete(first, -1);
   System.out.print(Thread.currentThread().getName() + " ");
   System.out.print("consume a message from queue:" + first);
   System.out.println(", message data is: " + new String(returnVal,"UTF-8"));
   return returnVal;
  }
  }
 } while (true);
 }
 class ProduceWatcher implements Watcher {
 @Override
 public void process(WatchedEvent event) {
  //----------------------------------------------------
  // 生产一条消息成功后通知一个等待线程
  //----------------------------------------------------
  synchronized (mutex) {
  mutex.notify();
  }
 }
 }
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
 //----------------------------------------------------
 // 确保当前队列目录存在
 // example: /QUEUE/queueName
 //----------------------------------------------------
 ensureExists(queuePath);
 String node = zk.create(queuePath + "/", data,
  ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.PERSISTENT_SEQUENTIAL);
 System.out.print(Thread.currentThread().getName() + " ");
 System.out.print("produce a message to queue:" + node);
 System.out.println(" , message data is: " + new String(data,"UTF-8"));
 }
 public void ensureExists(String path) {
 try {
  Stat stat = zk.exists(path, false);
  if (stat == null) {
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
 } catch (KeeperException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 }
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
 String queueName = "test";
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);
 for (int i = 0; i < 10; i++) {
  new Thread(new Runnable() {
  @Override
  public void run() {
   try {
   queue.consume();
   System.out.println("--------------------------------------------------------");
   System.out.println();
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }).start();
 }
 new Thread(new Runnable() {
  @Override
  public void run() {
  for (int i = 0; i < 10; i++) {
   try {
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
   queue.produce(("massive" + i).getBytes());
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }
 },"Produce-thread").start();
 }
}

测试

运行main方法,本机器的某次输出结果

Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9 

总结

以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。

感谢朋友们对本站的支持!

(0)

相关推荐

  • 为zookeeper配置相应的acl权限

    Zookeeper使用ACL来控制访问Znode,ACL的实现和UNIX的实现非常相似:它采用权限位来控制那些操作被允许,那些操作被禁止.但是和标准的UNIX权限不同的是,Znode没有限制用户(user,即文件的所有者),组(group)和其他(world).Zookeepr是没有所有者的概念的. 每个ZNode的ACL是独立的,且子节点不会继承父节点的ACL.例如:Znode /app对于ip为172.16.16.1只有只读权限,而/app/status是world可读,那么任何人都可以获取

  • 理解zookeeper选举机制

    zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和redis是相同的,即对客户端来讲每个服务器都是平等的. 这篇主要分析leader的选择机制,zookeeper提供了三种方式: LeaderElection AuthFastLeaderElection FastLeaderElection 默认的算法是FastLeaderElection,所以这篇主要分析它的选举机制. 选择机制中的概

  • zookeeper watch机制的理解

    首先我们看看为什么添加Watch. ZooKeeper是用来协调(同步)分布式进程的服务,提供了一个简单高性能的协调内核,用户可以在此之上构建更多复杂的分布式协调功能. 多个分布式进程通过ZooKeeper提供的 API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是"共享式内存通信".Java没有直接提供某种响应式通知接口来监控某个对象状态的变化,只能要么浪费C

  • apache zookeeper使用方法实例详解

    本文涉及了Apache Zookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容. 简介 Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目.Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等. Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问

  • 基于Zookeeper的使用详解

    更多内容请查看zookeeper官网 Zookper: 一种分布式应用的协作服务 Zookper是一种分布式的,开源的,应用于分布式应用的协作服务.它提供了一些简单的操作,使得分布式应用可以基于这些接口实现诸如同步.配置维护和分集群或者命名的服务.Zookper很容易编程接入,它使用了一个和文件树结构相似的数据模型.可以使用Java或者C来进行编程接入. 众所周知,分布式的系统协作服务很难有让人满意的产品.这些协作服务产品很容易陷入一些诸如竞争选择条件或者死锁的陷阱中.Zookper的目的就是将

  • 基于ZooKeeper实现队列源码

    实现原理 先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费.此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可.由于创建的节点是持久化的,所以不必担心队列消息的丢失问题. 队列(Queue) 分布式队列是通用的数据结构,为了在 Zookee

  • JDK数组阻塞队列源码深入分析总结

    目录 前言 阻塞队列的功能 数组阻塞队列设计 数组的循环使用 字段设计 构造函数 put函数 take函数 offer函数 add函数 poll函数 总结 前言 在前面一篇文章从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实现了一个低配版的数组阻塞队列.在这篇文章当中我们将仔细介绍JDK具体是如何实现数组阻塞队列的. 阻塞队列的功能 而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是

  • 基于ArrayList常用方法的源码全面解析

    我相信几乎所有的同学在大大小小的笔试.面试过程中都会被问及ArrayList与LinkedList之间的异同点.稍有准备的人这些问题早已烂熟于心,前者基于数组实现,后者基于链表实现:前者随机方法速度快删除和插入指定位置速度慢,后者随机访问速度慢删除和插入指定位置速度快:两者都是线程不安全的:列表与数组之间的区别等等. 列表与数组之间很大的一个区别就是:数组在其初始化就需要给它确定大小不能动态扩容,而列表则可以动态扩容.ArrayList是基于数组实现的,那么它是如何实现的动态扩容呢? 对于Arr

  • 找到一款不错的基于AJAX留言板源码(PHP版、ASP版)提供下载了

    一.说明 大家好,现将51AJAX的留言板源码放出,有PHP和ASP两个版本. PHP版基于AJAX+PHP4.3+MySql 4.1+Dojo 0.3,ASP版基于AJAX+ASP+Access+Dojo 0.3. 压缩包中已包含了Dojo框架的主文件dojo.js,无需再下载Dojo包: 要下载完整的Dojo框架,请点击这里下载:http://download.dojotoolkit.org/release-0.3.1/dojo-0.3.1-ajax.zip. 关于Dojo的更多信息,请点击

  • 浅谈Vuejs中nextTick()异步更新队列源码解析

    vue官网关于此解释说明如下: vue2.0里面的深入响应式原理的异步更新队列 官网说明如下: 只要观察到数据变化,Vue 将开启一个队列,并缓冲在同一事件循环中发生的所有数据改变.如果同一个 watcher 被多次触发,只会一次推入到队列中.这种在缓冲时去除重复数据对于避免不必要的计算和 DOM 操作上非常重要.然后,在下一个的事件循环"tick"中,Vue 刷新队列并执行实际(已去重的)工作.Vue 在内部尝试对异步队列使用原生的 Promise.then 和 MutationOb

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • java.lang.Void类源码解析

    在一次源码查看ThreadGroup的时候,看到一段代码,为以下: /* * @throws NullPointerException if the parent argument is {@code null} * @throws SecurityException if the current thread cannot create a * thread in the specified thread group. */ private static Void checkParentAcc

  • ActionScript 3.0中用XMLSocket与服务器通讯程序(源码)

    复制代码 代码如下: // // CXMLSocket.as // // // Written by Leezhm, 20th Oct, 2008 // Contact : Leezhm@luxoom.cn // package { import flash.events.DataEvent; import flash.events.Event; import flash.events.IEventDispatcher; import flash.events.IOErrorEvent; imp

  • Spring源码解析容器初始化构造方法

    目录 前言 构造方法 前言 Spring框架被广泛应用于我们的日常工作中,但是很长时间以来我都是只会使用,不懂它的作用原理.通过最近一段时间的阅读源码,个人发现通过阅读源码,能够帮助我们了解Spring的设计理念,并且对Java编程中的一些设计模式更加熟悉,所以记录一下自己对Spring源码的理解. 在开始进行源码学习前,首先再回顾一下三种Spring编程风格: 基于Schema,即通过xml标签的配置方式 基于Annotation的注解技术,使用@Component等注解配置bean 基于Ja

  • Spring源码分析容器启动流程

    目录 前言 源码解析 1.初始化流程 流程分析 核心代码剖析 2.刷新流程 流程分析 核心代码剖析 前言 本文基于 Spring 的 5.1.6.RELEASE 版本 Spring的启动流程可以归纳为三个步骤: 1.初始化Spring容器,注册内置的BeanPostProcessor的BeanDefinition到容器中 2.将配置类的BeanDefinition注册到容器中 3.调用refresh()方法刷新容器 Spring Framework 是 Java 语言中影响最为深远的框架之一,其

随机推荐