在Java中操作Zookeeper的示例代码详解

依赖

 <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
  </dependency>

连接到zkServer

//连接字符串,zkServer的ip、port,如果是集群逗号分隔
  String connectStr = "192.168.1.9:2181";

  //zookeeper就是一个zkCli
  ZooKeeper zooKeeper = null;

  try {
     //初始次数为1。后面要在内部类中使用,三种写法:1、写成外部类成员变量,不用加final;2、作为函数局部变量,放在try外面,写成final;3、写在try中,不加final
     CountDownLatch countDownLatch = new CountDownLatch(1);
    //超时时间ms,监听器
    zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //如果状态变成已连接
        if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
          System.out.println("连接成功");
          //次数-1
          countDownLatch.countDown();
        }
      }
    });
    //等待,次数为0时才会继续往下执行。等待监听器监听到连接成功,才能操作zk
    countDownLatch.await();
  } catch (IOException | InterruptedException e) {
    e.printStackTrace();
  }

  //...操作zk。后面的demo都是写在此处的

  //关闭连接
  try {
    zooKeeper.close();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

检测节点是否存在

// 检测节点是否存在

  // 同步方式
  Stat exists = null;
  try {
    //如果存在,返回节点状态Stat;如果不存在,返回null。第二个参数是watch
    exists = zooKeeper.exists("/mall",false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  if (exists==null){
    System.out.println("节点不存在");
  }
  else {
    System.out.println("节点存在");
  }

  //异步回调
  zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() {
    //第二个是path znode路径,第三个是ctx 后面传入实参,第四个是znode的状态
    public void processResult(int i, String s, Object o, Stat stat) {
      //如果节点不存在,返回的stat是null
      if (stat==null){
        System.out.println("节点不存在");
      }
      else{
        System.out.println("节点存在");
      }
    }
  // 传入ctx,Object类型
  },null);

操作后,服务端会返回处理结果,返回void、null也算处理结果。

同步指的是当前线程阻塞,等待服务端返回数据,收到返回的数据才继续往下执行;

异步回调指的是,把对结果(返回的数据)的处理写在回调函数中,当前线程不等待返回的数据,继续往下执行,收到返回的数据时自动调用回调函数来处理。

如果处理返回数据的代码之后的操作,不依赖返回数据、对返回数据的处理,那么可以把返回数据的处理写成回调函数。

创建节点

//创建节点

  //同步方式
  try {
    //数据要写成byte[],不携带数据写成null;默认acl权限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一个是节点类型,P是永久,E是临时,S是有序
    zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println("已创建节点/mall");
    //如果节点已存在,会抛出异常
  } catch (KeeperException | InterruptedException e) {     System.out.println("创建节点/mall失败,请检查节点是否已存在");
    e.printStackTrace();
  }

  //异步回调
  zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){
    //第二个path,第三个ctx,第四个节点状态
    public void processResult(int i, String s, Object o, String s1, Stat stat) {
      //回调方式不抛出异常,返回的stat是创建节点的状态,如果节点已存在,返回的stat是null
      if (stat==null){
        System.out.println("创建节点/mall失败,请检查节点是否已存在");
      }
      else {
        System.out.println("节点创建成功");
      }
    }
    //ctx实参
  },null);

删除节点

//删除节点

  //同步方式
  try {
    //第二个参数是版本号,-1表示可以是任何版本
    zooKeeper.delete("/mall1",-1);
    System.out.println("成功删除节点/mall");
  } catch (InterruptedException | KeeperException e) {
    System.out.println("删除节点/mall失败");
    e.printStackTrace();
  }

  //异步回调
  zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() {
    //第二个是path,第三个是ctx
    public void processResult(int i, String s, Object o) {

    }
  //
  //ctx实参
  },null);

delete()只能删除没有子节点的znode,如果该znode有子节点会抛出异常。

没有提供递归删除子节点的方法,如果要删除带有子节点的znode,需要自己实现递归删除。可以先getChildren()获取子节点列表,遍历列表依次删除子节点,再删除父节点。

获取子节点列表

//获取子节点列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"]

  //同步方式
  List<String> children = null;
  try {
    //第二个参数是watch
    children = zooKeeper.getChildren("/mall", false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println("子节点列表:" + children);

  //异步
  zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() {
    //第二个起依次是:path、ctx、返回的子节点列表
    public void processResult(int i, String s, Object o, List<String> list) {
      System.out.println("子节点列表:" + list);
    }
  //ctx实参
  }, null);

只获取子节点,不获取孙节点。

watch都是:可以写boolean,要添加监听就写true,不监听写false;也可以写Watcher对象,new一个Watcher对象表示要监听,null表示不监听。

获取节点数据

//获取节点数据,返回byte[]

  //同步方式
  byte[] data = null;
  try {
    //第二个参数是watch,第三个是stat
    data = zooKeeper.getData("/mall", false, null);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  //调用new String()时要判断data是否为null,如果是null会抛NPE
  if (data==null){
    System.out.println("该节点没有数据");
  }
  else{
    System.out.println("节点数据:"+new String(data));
  }

  //异步回调
  zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() {
    //第二个起依次是:path、ctx、返回的节点数据、节点状态
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
      //不必判断bytes是否是null,如果节点没有数据,不会调用回调函数;执行到此,说明bytes不是null
      System.out.println("节点数据:" + new String(bytes) );
    }
    //ctx实参
  }, null);

设置|修改节点数据

//设置|更新节点据

  //同步方式
  try {
    //最后一个参数是版本号
    zooKeeper.setData("/mall", "1234".getBytes(), -1);
    System.out.println("设置节点数据成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("设置节点数据失败");
    e.printStackTrace();
  }

  //异步回调
  zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() {
    //第二个是path,第三个是ctx
    public void processResult(int i, String s, Object o, Stat stat) {

    }
  // ctx
  },null);

设置acl权限

//设置acl权限

  //第一个参数指定权限,第二个是Id对象
  ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd"));

  List<ACL> aclList = new ArrayList<>();
  aclList.add(acl);

  //如果List中只有一个ACL对象,也可以这样写
  //List<ACL> aclList = Collections.singletonList(auth);

  //验证权限,需写在设置权限之前。如果之前没有设置权限,也需要先验证本次即将设置的用户
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  //方式一 setAcl
  try {
    //第二个参数是List<ACL>,第三个参数是版本号
    zooKeeper.setACL("/mall", aclList, -1);
    System.out.println("设置权限成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("设置权限失败");
    e.printStackTrace();
  }

  //方式二 在创建节点时设置权限
  try {
    zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT);
    System.out.println("已创建节点并设置权限");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

设置权限之后,连接zkServer进行操作时,都需要先验证用户。

此处未写对应的异步回调。

查看acl权限

//查看acl权限

  //设置权限之后,以后操作时需要先验证用户,一次session中验证一次即可
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  //同步方式
  try {
    List<ACL> aclList = zooKeeper.getACL("/mall", null);
    System.out.println("acl权限:"+aclList);
  } catch (KeeperException | InterruptedException e) {
    System.out.println("获取acl权限失败");
    e.printStackTrace();
  }

  //异步回调
  zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() {
    //第二个起:path、ctx、获取到的List<ACL>、节点状态
    public void processResult(int i, String s, Object o, List<ACL> list, Stat stat) {
      //就算没有手动设置acl权限,默认也是有值的
      System.out.println("acl权限:"+list);
    }
  //ctx实参
  },null);

添加监听器

//添加监听 方式一
  try {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    zooKeeper.getData("/mall", new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //watcher会监听该节点所有的事件,不管发生什么事件都会调用process()来处理,需要先判断一下事件类型
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("节点数据改变了");
          //会一直监听,如果只监听一次数据改变,将下面这句代码取消注释即可
          //countDownLatch.countDown();
        }
      }
    }, null);
    //默认watcher是一次性的,如果要一直监听,需要借助CountDownLatch
    countDownLatch.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

ZooKeeper类的exists()、getData()、getChildren()方法都具有添加监听的功能,用法类似。

watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)

getType是获取事件类型,getState是获取连接状态。

上面这种方式,会递归监听子孙节点,子孙节点的数据改变也算NodeDataChanged,子孙节点的创建|删除也算NodeCreated|NodeDeleted。

//添加监听 方式二
   try {
    CountDownLatch countDownLatch1 = new CountDownLatch(1);
    zooKeeper.addWatch("/mall", new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("节点数据改变了");
          //如果只监听一次数据改变,将下面这句代码注释掉
          //countDownLatch1.countDown();
        }
      }
    //监听模式,PERSISTENT是不监听子孙节点,PERSISTENT_RECURSIVE是递归监听子孙节点
    }, AddWatchMode.PERSISTENT_RECURSIVE);
    countDownLatch1.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

countDownLatch1.await();要阻塞线程,最好启动一条新线程来监听。

只有设置了监听的zkCli,该节点发生事件时才会收到zkServer的通知。

watch只保存在zkServer的内存中(zk依赖jdk,运行在jvm上,堆中的session对象),不持久化到硬盘,就是说设置的监听只在本次会话期间有效,zkCli关闭连接,zkServer在指定时间后(默认连续没有收到10个心跳),zkServer会自动删除相关session,watcher丢失。

移除监听

//移除监听 方式一
  try {
    zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT);
    System.out.println("已移除监听");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

就是上面添加监听的哪些方法,watch|watcher参数,如果是boolean类型,设置为false即可关闭监听;如果是Watcher类型,可以设置null覆盖掉之前设置的监听。

//移除监听 方式二
  try {
    //第二个参数是Watcher,原来添加的那个Watcher监听对象,不能是null
    //第三个参数指定要移除监听的哪部分,Any是移除整个监听,Data是移除对数据的监听,Children是移除对子节点的递归监听
    //最后一个参数指定未连接到zkServe时,是否移除本地监听部分
    zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true);
  } catch (InterruptedException | KeeperException e) {
    e.printStackTrace();
  }

监听由2部分组成,一部分在zkServer上,事件发生时通知对应的zkCli;一部分在zkCli,收到zkServer的通知时做出一些处理。

最后一个参数指定未连接到zkServer,是否移除本地(zkCli)监听部分,true——移除,false——不移除。

比如说没有连接到zkServer,移除本地监听,10个心跳内连上了zkServer,zkServer的监听部分仍在,发生事件时仍会通知此zkCli,但zkCli本地监听已经移除了,对通知不会做出处理。

第一种方式会移除整个监听,不需要传入监听对象watcher;

第二种方式功能更全,可以指定移除监听的哪个部分,但需要传入watcher对象,添加监听时要用一个变量来保存watcher对象。

到此这篇关于在Java中操作Zookeeper的示例代码详解的文章就介绍到这了,更多相关Java操作Zookeeper内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java操作Zookeeper原理及过程详解

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅.负载均衡.命名服务.分布式协调/通知.集群管理.Master 选举.分布式锁和分布式队列等功能. Zookeeper 一个最常用的使用场景就是用于担任服务生产者和服务消费者的注册中心. 服务生产者将自己提供的服务注册到Zookeeper中心,服务的消费者在进行服务调用的时候先到Zookeeper中查找服务,获取到服务生产者的详细信息之后,再去调用服务生产者的内容与数据.如

  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

  • java 中 zookeeper简单使用

    一.zookeeper的基本原理 数据模型,如下: ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode.每个ZNode都可以通过其路径唯一标识,比如上图中第三层的第一个ZNode,它的路径是/app1/c1.在每个ZNode上可存储少量数据(默认是1M, 可以通过配置修改,通常不建议在ZNode上存储大量的数据),这个特性非常有用.另外,每个ZNode上还存储了其Acl信息,这里需要注意,虽说ZNode的树形结构跟Unix文件系统很类似,

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • windows下zookeeper配置java环境变量的方法

    先找到文件 zookeeper的bin目录下编辑zkEnv.cmd 修改如下图 如果你的系统有多个JAVA_HOME类型的系统变量  则需要在该文件中指定用哪一个,并且还要修改windows的jdk环境变量 总结 以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持.如果你想了解更多相关内容请查看下面相关链接

  • 在Java中操作Zookeeper的示例代码详解

    依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> 连接到zkServer //连接字符串,zkServer的ip.port,如果是集群逗号分隔 String connectStr = "192.

  • Java的静态类型检查示例代码详解

    关于静态类型检查和动态类型检查的解释: 静态类型检查:基于程序的源代码来验证类型安全的过程: 动态类型检查:在程序运行期间验证类型安全的过程: Java使用静态类型检查在编译期间分析程序,确保没有类型错误.基本的思想是不要让类型错误在运行期间发生. 在各色各样的编程语言中,总共存在着两个类型检查机制:静态类型检查和动态类型检查. 静态类型检查是指通过对应用程序的源码进行分析,在编译期间就保证程序的类型安全. 动态类型检查是在程序的运行过程中,验证程序的类型安全.在Java中,编译期间使用静态类型

  • Java中EnumMap代替序数索引代码详解

    本文研究的主要是Java中EnumMap代替序数索引的相关内容,具体介绍如下. 学习笔记<Effective Java 中文版 第2版> 经常会碰到使用Enum的ordinal方法来索引枚举类型. public class Herb { public enum Type { ANNUAL, PERENNIAL, BIENNIAL }; private final String name; private final Type type; Herb(String name, Type type)

  • Vue中key的作用示例代码详解

    Vue中key的作用 key的特殊attribute主要用在Vue的虚拟DOM算法,在新旧Nodes对比时辨识VNodes.如果不使用key,Vue会使用一种最大限度减少动态元素并且尽可能的尝试就地修改.复用相同类型元素的算法,而使用key时,它会基于key的变化重新排列元素顺序,并且会移除key不存在的元素.此外有相同父元素的子元素必须有独特的key,重复的key会造成渲染错误. 描述 首先是官方文档的描述,当Vue正在更新使用v-for渲染的元素列表时,它默认使用就地更新的策略,如果数据项的

  • java中的前++和后++的区别示例代码详解

    java中的前加加++和后加加++,有很多人搞的很晕,不太明白!今天我举几个例子说明下前++和后++的区别! 其实大家只要记住一句话就可以了,前++是先自加再使用而后++是先使用再自加! 前++和后++总结:其实大家只要记住一句话就可以了,前++是先自加再使用而后++是先使用再自加! 请大家看下面的例子就明白了! public class Test { public static void main(String[] args) { //测试,前加加和后加加 //前++和后++总结:其实大家只要

  • 在Java中使用Jwt的示例代码

    JWT 特点 JWT 默认是不加密,但也是可以加密的.生成原始 Token 以后,可以用密钥再加密一次. JWT 不加密的情况下,不能将秘密数据写入 JWT. JWT 不仅可以用于认证,也可以用于交换信息.有效使用 JWT,可以降低服务器查询数据库的次数. JWT 的最大缺点是,由于服务器不保存 session 状态,因此无法在使用过程中废止某个 token,或者更改 token 的权限.也就是说,一旦 JWT 签发了,在到期之前就会始终有效,除非服务器部署额外的逻辑. JWT 本身包含了认证信

  • Java 使用 FFmpeg 处理视频文件示例代码详解

    目前在公司做一个小东西,里面用到了 FFmpeg 简单处理音视频,感觉功能特别强大,在做之前我写了一个小例子,现在记录一下分享给大家,希望大家遇到这个问题知道解决方案. FFmpeg是一套可以用来记录.转换数字音频.视频,并能将其转化为流的开源计算机程序.采用LGPL或GPL许可证.它提供了录制.转换以及流化音视频的完整解决方案.它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的. FFmpeg在Linux平

  • Java 添加Word目录的2种方法示例代码详解

    目录是一种能够快速.有效地帮助读者了解文档或书籍主要内容的方式.在Word中,插入目录首先需要设置相应段落的大纲级别,根据大纲级别来生成目录表.本文中生成目录分2种情况来进行: 1.文档没有设置大纲级别,生成目录前需要手动设置 2.文档已设置大纲级别,通过域代码生成目录 使用工具: •Free Spire.Doc for Java 2.0.0 (免费版) •IntelliJ IDEA 工具获取途径1:通过官网下载jar文件包,解压并导入jar文件到IDEA程序. 工具获取途径2:通过Maven仓

  • android 限制某个操作每天只能操作指定的次数(示例代码详解)

    最近有个需求,要求启动页的拦截页每天只能显示3次,超过三次就显示别的页面,然后到第二天才可以再次显示,利用SharePreferences保存天数和每天的次数,大概是思路是:判断 如果是同一天,就去拿保存的次数,当次数小于3才执弹出拦截页,然后,每次弹出,次数就加1,并且保存次数和当天的时间:如果不是同一天,就把次数赋值为1,并且把当天赋值给最后访问的时间,然后保存当前的次数.具体实现如下: package com.example.demo1.test; import android.suppo

  • 封装一下vue中的axios示例代码详解

    在vue项目中,和后台交互获取数据这块,我们通常使用的是axios库,它是基于promise的http库,可运行在浏览器端和node.js中.他有很多优秀的特性,例如拦截请求和响应.取消请求.转换json.客户端防御cSRF等.所以我们的尤大大也是果断放弃了对其官方库vue-resource的维护,直接推荐我们使用axios库.如果还对axios不了解的,可以移步axios文档. 安装 npm install axios; // 安装axios 好了,下面开始今天的正文. 此次封装用以解决: (

随机推荐