apache zookeeper使用方法实例详解

本文涉及了Apache Zookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容。

简介

Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目。Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。 Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问题上,你可以使用 Zookeeper 提供的现成(off-the-shelf)服务来实现分布式系统的配置管理,组管理,Leader 选举等功能。

英文原文地址:http://zookeeper.apache.org/doc/current/javaExample.html

一个简单的 Zookeeper Watch 客户端

为了介绍 Zookeeper Java API 的基本用法,本文将带你如何一步一步实现一个功能简单的  Zookeeper 客户端。该 Zookeeper 客户端会监视一个你指定 Zookeeper 节点 Znode, 当被监视的节点发生变化时,客户端会启动或者停止某一程序。

基本要求

该客户端具备四个基本要求:

(1)客户端所带参数:

(2)Zookeeper 服务地址。

(3)被监视的 Znode 节点名称。

(4)可执行程序及其所带的参数

客户端会获取被监视 Znode 节点的数据并启动你所指定的可执行程序。如果被监视的 Znode 节点发生改变,客户端重新获取其内容并再次启动你所指定的可执行程序。如果被监视的 Znode 节点消失,客户端会杀死可执行程序。

程序设计

一般而言,Zookeeper 应用程序分为两部分,其中一部分维护与服务器端的连接,另外一部分监视 Znode 节点的数据。在本程序中,Executor 类负责维护 Zookeeper 连接,DataMonitor 类监视 Zookeeper 目录树中的数据, 同时,Executor 包含了主线程和程序主要的执行逻辑,它负责少量的用户交互,以及与可执行程序的交互,该可执行程序接受你向它传入的参数,并且会根据被监视的 Znode 节点的状态变化停止或重启。

Executor类

Executor 对象是本例程最基本的“容器”,它包括Zookeeper 对象和DataMonitor对象。

public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }

回忆一下 Executor 的任务是根据 Zookeeper 中 Znode 节点状态改变所触发的事件来启动和停止你在命令行指定的可执行程序, 在上面的代码你可以看到,Executor 类在其构造函数中实例化 Zookeeper 对象时,将其自身的引用作为 Watch 参数传递给 Zookeeper 的构造函数,同时它也将其自身的引用作为 DataMonitorListener 参数传递给 DataMonitor 的构造函数。Executor 本身实现了以下接口:

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...

Watcher 接口是在ZooKeeper Java API 中定义的。 ZooKeeper 用它来与“容器”(此处“容器”与上面的 Executor 类相似)进行通信,Watcher 只支持一个方法,即process(), ZooKeeper 用该函数来处理主线程可能感兴趣的事件,例如 Zookeeper 连接或会话的状态,本例中的“容器” Executor只是简单地把事件向下传递给 DataMonitor,具体如何处理事件是由 DataMonitor 决定的。本文只是简单地描述了如何使用 Watcher,通常情况下,Executor 或 与 Executor 类似的对象拥有 与Zookeeper 服务端的连接,但它可以将事件传递给其他对象,并有其它的对象处理该事件。

public void process(WatchedEvent event) {
    dm.process(event);
  }

DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一个自定义的接口,可以说是专门为本程序设计的。DataMonitor 对象使用该接口和“容器”(即 Executor 类)进行通信,DataMonitorListener 接口如下:

public interface DataMonitorListener {
  /**
  * The existence status of the node has changed.
  */
  void exists(byte data[]);
  /**
  * The ZooKeeper session is no longer valid.
  *
  * @param rc
  * the ZooKeeper reason code
  */
  void closing(int rc);
}

该接口在 DataMonitor 中定义,Executor 类实现该接口,当 Executor.exists() 被调用的时候,Executor 决定是否启动或停止事先指定的应用程序(回忆一下前文所说的,当 Znode 消失时 Zookeeper 客户端会杀死该可执行程序)。

当 Executor.closing() 被调用的时候,Executor 会根据 Zookeeper 连接永久性地消失来决定是否关闭自己。

你或许已经猜到,DataMonitor 对象根据 Zookeeper 状态变化来调用这些方法吧?

以下是 Executor 类中实现 DataMonitorListener.exists() DataMonitorListener.closing()的代码:

public void exists( byte[] data ) {
  if (data == null) {
    if (child != null) {
      System.out.println("Killing process");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      }
    }
    child = null;
  } else {
    if (child != null) {
      System.out.println("Stopping child");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
    }
    try {
      FileOutputStream fos = new FileOutputStream(filename);
      fos.write(data);
      fos.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    try {
      System.out.println("Starting child");
      child = Runtime.getRuntime().exec(exec);
      new StreamWriter(child.getInputStream(), System.out);
      new StreamWriter(child.getErrorStream(), System.err);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
public void closing(int rc) {
  synchronized (this) {
    notifyAll();
  }
}

DataMonitor 类

DataMonitor 类是本程序 Zookeeper 逻辑的核心, 它差不多是异步的,并由事件驱动的。DataMonitor 构造函数如下:

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
    DataMonitorListener listener) {
  this.zk = zk;
  this.znode = znode;
  this.chainedWatcher = chainedWatcher;
  this.listener = listener;
  // Get things started by checking if the node exists. We are going
  // to be completely event driven
  zk.exists(znode, true, this, null);
}

调用 ZooKeeper.exists() 检查指定的 Znode 是否存在,并设置监视,传递自身引用作为回调对象,在某种意义上,在 watch 触发时就会引起真实的处理流程。

ZooKeeper.exists() 操作在服务器端完成时,ZooKeeper API 会在客户端调用 completion callback

public void processResult(int rc, String path, Object ctx, Stat stat) {
  boolean exists;
  switch (rc) {
  case Code.Ok:
    exists = true;
    break;
  case Code.NoNode:
    exists = false;
    break;
  case Code.SessionExpired:
  case Code.NoAuth:
    dead = true;
    listener.closing(rc);
    return;
  default:
    // Retry errors
    zk.exists(znode, true, this, null);
    return;
  }
  byte b[] = null;
  if (exists) {
    try {
      b = zk.getData(znode, false, null);
    } catch (KeeperException e) {
      // We don't need to worry about recovering now. The watch
      // callbacks will kick off any exception handling
      e.printStackTrace();
    } catch (InterruptedException e) {
      return;
    }
  }
  if ((b == null && b != prevData)
      || (b != null && !Arrays.equals(prevData, b))) {
    listener.exists(b);
    prevData = b;
  }
}

上述代码首先检查 Znode 是否存在,以及其他重大的不可恢复的错误。如果文件(或者Znode)存在,它将从 Znode 获取数据,如果状态发生变化再调用 Executor 的 exists() 回调函数。注意,getData 函数本省必须要做任何的异常处理,因为本身就有监视可以处理任何错误:如果节点在调用 ZooKeeper.getData() 之前被删除,ZooKeeper.exists() 就会触发回调函数,如果存在通信错误,在连接上的监视会在该连接重建之前触发相应的事件,同时引发相应的处理。

最后,DataMonitor 处理监视事件的代码如下:

public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with
        // server and any watches triggered while the client was
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }

如果客户端 Zookeeper 程序在会话失效时(Expired event)重新建立了通信信道(SyncConnected event) ,所有的会话监视会自动和服务器进行重连, (Zookeeper 3.0.0以上版本会重置之前设置的监视). 更多编程指南请参见 ZooKeeper Watches 。 当 DataMonitor 获得了指定 Znode 的事件后,它将调用 ZooKeeper.exists() 来决定究竟发生了什么。

完整的程序:

Executor.java:

/**
 * A simple example program to use DataMonitor to start and
 * stop executables based on a znode. The program watches the
 * specified znode and saves the data that corresponds to the
 * znode in the filesystem. It also starts the specified program
 * with the specified arguments when the znode exists and kills
 * the program if the znode goes away.
 */
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
  implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
  String znode;
  DataMonitor dm;
  ZooKeeper zk;
  String filename;
  String exec[];
  Process child;
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  /**
   * @param args
   */
  public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  /***************************************************************************
   * We do process any events ourselves, we just need to forward them on.
   *
   * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
   */
  public void process(WatchedEvent event) {
    dm.process(event);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }
  public void closing(int rc) {
    synchronized (this) {
      notifyAll();
    }
  }
  static class StreamWriter extends Thread {
    OutputStream os;
    InputStream is;
    StreamWriter(InputStream is, OutputStream os) {
      this.is = is;
      this.os = os;
      start();
    }
    public void run() {
      byte b[] = new byte[80];
      int rc;
      try {
        while ((rc = is.read(b)) > 0) {
          os.write(b, 0, rc);
        }
      } catch (IOException e) {
      }
    }
  }
  public void exists(byte[] data) {
    if (data == null) {
      if (child != null) {
        System.out.println("Killing process");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
        }
      }
      child = null;
    } else {
      if (child != null) {
        System.out.println("Stopping child");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      try {
        FileOutputStream fos = new FileOutputStream(filename);
        fos.write(data);
        fos.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
      try {
        System.out.println("Starting child");
        child = Runtime.getRuntime().exec(exec);
        new StreamWriter(child.getInputStream(), System.out);
        new StreamWriter(child.getErrorStream(), System.err);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

DataMonitor.java:

/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
  ZooKeeper zk;
  String znode;
  Watcher chainedWatcher;
  boolean dead;
  DataMonitorListener listener;
  byte prevData[];
  public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
      DataMonitorListener listener) {
    this.zk = zk;
    this.znode = znode;
    this.chainedWatcher = chainedWatcher;
    this.listener = listener;
    // Get things started by checking if the node exists. We are going
    // to be completely event driven
    zk.exists(znode, true, this, null);
  }
  /**
   * Other classes use the DataMonitor by implementing this method
   */
  public interface DataMonitorListener {
    /**
     * The existence status of the node has changed.
     */
    void exists(byte data[]);
    /**
     * The ZooKeeper session is no longer valid.
     *
     * @param rc
     *        the ZooKeeper reason code
     */
    void closing(int rc);
  }
  public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with
        // server and any watches triggered while the client was
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }
  public void processResult(int rc, String path, Object ctx, Stat stat) {
    boolean exists;
    switch (rc) {
    case Code.Ok:
      exists = true;
      break;
    case Code.NoNode:
      exists = false;
      break;
    case Code.SessionExpired:
    case Code.NoAuth:
      dead = true;
      listener.closing(rc);
      return;
    default:
      // Retry errors
      zk.exists(znode, true, this, null);
      return;
    }
    byte b[] = null;
    if (exists) {
      try {
        b = zk.getData(znode, false, null);
      } catch (KeeperException e) {
        // We don't need to worry about recovering now. The watch
        // callbacks will kick off any exception handling
        e.printStackTrace();
      } catch (InterruptedException e) {
        return;
      }
    }
    if ((b == null && b != prevData)
        || (b != null && !Arrays.equals(prevData, b))) {
      listener.exists(b);
      prevData = b;
    }
  }
}

总结

本文关于Apache Zookeeper使用方法实例详解的介绍就到这里,希望对大家有所帮助。如果有什么问题可以留言,小编会及时回复大家的,感谢大家对我们网站的支持!

(0)

相关推荐

  • 使用curator实现zookeeper锁服务的示例分享

    复制代码 代码如下: import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit; import com.netflix.curator.RetryPolicy;import com.netflix.curator.framework.

  • 理解zookeeper选举机制

    zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和redis是相同的,即对客户端来讲每个服务器都是平等的. 这篇主要分析leader的选择机制,zookeeper提供了三种方式: LeaderElection AuthFastLeaderElection FastLeaderElection 默认的算法是FastLeaderElection,所以这篇主要分析它的选举机制. 选择机制中的概

  • 源码阅读之storm操作zookeeper-cluster.clj

    storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中). backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState. clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterStat

  • Shell脚本实现自动安装zookeeper

    A:本脚本运行的机器,Linux RHEL6 B,C,D,...:待安装zookeeper cluster的机器, Linux RHEL6 首先在脚本运行的机器A上确定可以ssh无密码登录到待安装zk的机器B,C,D,...上,然后就可以在A上运行本脚本: 复制代码 代码如下: $ ./install_zookeeper 前提: B, C, D机器必须配置好repo,本脚本使用的是cdh5的repo, 下面的内容保存到:/etc/yum.repos.d/cloudera-cdh5.repo: 复

  • apache zookeeper使用方法实例详解

    本文涉及了Apache Zookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容. 简介 Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目.Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等. Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问

  • 如何在Linux操作系统下安装Apache服务的方法实例详解

    链接下载: 操作环境 VMware虚拟机中CentOS 7.6 SecureCRT Xftp(Xmanager) 需求分析 使用Apache服务实现访问http 操作步骤 1.挂载光盘 [root@localhost ~]# mount /dev/cdrom /mnt 查看是否挂载 [root@localhost ~]# df -Th 2.从源码包编译安装程序 (编译安装) [root@localhost Packages]# yum -y install gcc gcc-c++ make 3.

  • Android 实现夜间模式的快速简单方法实例详解

    ChangeMode 项目地址:ChangeMode Implementation of night mode for Android. 用最简单的方式实现夜间模式,支持ListView.RecyclerView. Preview Usage xml android:background="?attr/zzbackground" app:backgroundAttr="zzbackground"//如果当前页面要立即刷新,这里传入属性名称 比如 R.attr.zzb

  • C#调用Java方法实例详解

    C#可以直接引用C++的DLL和转换JAVA写好的程序.最近由于工作原因接触这方面比较多,根据实际需求,我们通过一个具体例子把一个JAVA方法转换成可以由C#直接调用的DLL C#调用c++ C#调用C++的例子网上很多,以一个C++的具体方法为例. C++代码 // 获取一帧图像数据 MVSMARTCAMCTRL_API int __stdcall MV_SC_GetOneFrame(IN void* handle, IN OUT unsigned char *pData , IN unsig

  • Springmvc ajax跨域请求处理方法实例详解

    上次给一个网站写网站  前后端分离 最后跪在ajax跨域上面了  自己在网上找了个方法  亲试可用  记录一下 写一个类  继承HandlerInterceptorAdapter package com.util; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.springframework.web.servlet.handler.Ha

  • Linux traceroute命令使用方法实例详解

    Linux traceroute命令使用方法实例详解 一.默认使用的是UDP协议(30000以上端口) 二.使用TCP协议 -T -p 三.使用ICMP协议 -I 四.实战 [root@localhost hping-master]# whereis traceroute traceroute: /usr/bin/traceroute /usr/share/man/man8/traceroute.8.gz [root@localhost hping-master]# [root@localhos

  • Oracle表中重复数据去重的方法实例详解

    Oracle表中重复数据去重的方法实例详解 我们在项目中肯定会遇到一种情况,就是表中没有主键 有重复数据 或者有主键 但是部分字段有重复数据 而我们需要过滤掉重复数据 下面是一种解决方法 delete from mytest ms where rowid in (select aa.rid from (select rowid as rid, row_number() over(partition by s.name order by s.id) as nu from mytest s) aa

  • Vue.js框架路由使用方法实例详解

    Vue.js框架路由使用方法实例详解 html代码: <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name='viewport' content='width=device-width,initial-

  • C#动态创建button按钮的方法实例详解

    C#动态创建button按钮的方法实例详解 C#编程中经常需要动态创建,本文主要介绍C#动态创建button按钮的方法,涉及C#按钮属性动态设置的相关技巧,以供借鉴参考.具体实现方法如下: 例子: using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.T

  • IOS自带Email的两种方法实例详解

    IOS自带Email的两种方法实例详解 IOS系统框架提供的两种发送Email的方法:openURL 和 MFMailComposeViewController.借助这两个方法,我们可以轻松的在应用里加入如用户反馈这类需要发送邮件的功能. 1.openURL 使用openURL调用系统邮箱客户端是我们在IOS3.0以下实现发邮件功能的主要手段.我们可以通过设置url里的相关参数来指定邮件的内容,不过其缺点很明显,这样的过程会导致程序暂时退出.下面是使用openURL来发邮件的一个小例子: #pr

随机推荐