浅谈Zookeeper开源客户端框架Curator

zookeepercurator

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手.

看完官方的文档之后, 发现Curator主要解决了三类问题:

1.封装ZooKeeper client与ZooKeeper server之间的连接处理;
2.提供了一套Fluent风格的操作API;
3.提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.

Curator列举的ZooKeeper使用过程中的几个问题

1.初始化连接的问题: 在client与server之间握手建立连接的过程中, 如果握手失败, 执行所有的同步方法(比如create, getData等)将抛出异常
2.自动恢复(failover)的问题: 当client与一台server的连接丢失,并试图去连接另外一台server时, client将回到初始连接模式
session过期的问题: 在极端情况下, 出现ZooKeeper session过期, 客户端需要自己去监听该状态并重新创建ZooKeeper实例 .
3.对可恢复异常的处理:当在server端创建一个有序ZNode, 而在将节点名返回给客户端时崩溃, 此时client端抛出可恢复的异常, 用户需要自己捕获这些异常并进行重试
4.使用场景的问题:Zookeeper提供了一些标准的使用场景支持, 但是ZooKeeper对这些功能的使用说明文档很少, 而且很容易用错. 在一些极端场景下如何处理, zk并没有给出详细的文档说明. 比如共享锁服务, 当服务器端创建临时顺序节点成功, 但是在客户端接收到节点名之前挂掉了, 如果不能很好的处理这种情况, 将导致死锁.

Curator主要从以下几个方面降低了zk使用的复杂性:

1.重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿).
2.连接状态监控: Curator初始化之后会一直的对zk连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理.
3.zk客户端实例管理:Curator对zk客户端到server集群连接进行管理. 并在需要的情况, 重建zk实例, 保证与zk集群的可靠连接
各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景), 这些实现都遵循了zk的较佳实践, 并考虑了各种极端情况.

Curator通过以上的处理, 让用户专注于自身的业务本身, 而无需花费更多的精力在zk本身.

Curator声称的一些亮点:

日志工具

内部采用SLF4J 来输出日志
采用驱动器(driver)机制, 允许扩展和定制日志和跟踪处理
提供了一个TracerDriver接口, 通过实现addTrace()和addCount()接口来集成用户自己的跟踪框架

和Curator相比, 另一个ZooKeeper客户端——zkClient

文档几乎没有
异常处理弱爆了(简单的抛出RuntimeException)
重试处理太难用了
没有提供各种使用场景的实现
对ZooKeeper自带客户端(ZooKeeper类)的"抱怨":
只是一个底层实现
要用需要自己写大量的代码
很容易误用
需要自己处理连接丢失, 重试等

Curator几个组成部分

1.Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法.
2.Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
3.Recipes: 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
4.Utilities:各种ZooKeeper的工具类
5.Errors: 异常处理, 连接, 恢复等.
6.Extensions: recipe扩展

Client

这是一个底层的API, 应用方基本对这个可以无视, 较好直接从Curator Framework入手,主要包括三部分:

不间断连接管理
连接重试处理
Retry Loop(循环重试)

一种典型的用法:

RetryLoop retryLoop = client.newRetryLoop();
while ( retryLoop.shouldContinue() )
{
  try
  {
    // perform your work
    ...
    // it's important to re-get the ZK instance as there may have been an error and the instance was re-created
    ZooKeeper   zk = client.getZookeeper();
    retryLoop.markComplete();
  }
  catch ( Exception e )
  {
    retryLoop.takeException(e);
  }
}

如果在操作过程中失败, 且这种失败是可重试的, 而且在允许的次数内, Curator将保证操作的最终完成.
另一种使用Callable接口的重试做法:

RetryLoop.callWithRetry(client, new Callable()
{
   @Override
   public Void call() throws Exception
   {
     // do your work here - it will get retried if needed
     return null;
   }
});

重试策略

RetryPolicy接口只有一个方法(以前版本有两个方法):

public boolean allowRetry(int retryCount, long elapsedTimeMs); 

在开始重试之前, allowRetry方法被调用, 其参数将指定当前重试次数, 和操作已消耗时间. 如果允许, 将继续重试, 否则抛出异常.

Curator内部实现的几种重试策略:

1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
2.RetryNTimes:指定较大重试次数的重试策略
3.RetryOneTime:仅重试一次
4.RetryUntilElapsed:一直重试直到达到规定的时间

Framework

是ZooKeeper Client更高的抽象API

自动连接管理: 当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明

更清晰的API: 简化了ZooKeeper原生的方法, 事件等, 提供流程的接口

CuratorFrameworkFactory类提供了两个方法, 一个工厂方法newClient, 一个构建方法build. 使用工厂方法newClient可以创建一个默认的实例, 而build构建方法可以对实例进行定制. 当CuratorFramework实例构建完成, 紧接着调用start()方法, 在应用结束的时候, 需要调用close()方法.  CuratorFramework是线程安全的. 在一个应用中可以共享同一个zk集群的CuratorFramework.

CuratorFramework API采用了连贯风格的接口(Fluent Interface). 所有的操作一律返回构建器, 当所有元素加在一起之后, 整个方法看起来就像一个完整的句子. 比如下面的操作:

client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

方法说明:

1.create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾
2.delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
3.checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾
4.getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
5.setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
6.getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
7.inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交
.

通知(Notification)

Curator的相关代码已经更新了, 里面的接口已经由ClientListener改成CuratorListener了, 而且接口中去掉了clientCloseDueToError方法. 只有一个方法:
eventReceived()            当一个后台操作完成或者指定的watch被触发时该方法被调用

UnhandledErrorListener接口用来对异常进行处理.

CuratorEvent(在以前版本为ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装, 而事件对象的内容跟事件类型相关, 下面是对应关系:

CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
WATCHED getWatchedEvent()

名称空间(Namespace)

因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下:

CuratorFramework  client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
…
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"

Recipe

Curator实现ZooKeeper的所有recipe(除了两段提交)

选举

集群领导选举(leader election)

锁服务

共享锁: 全局同步分布式锁, 同一时间两台机器只有一台能获得同一把锁.
共享读写锁: 用于分布式的读写互斥处理, 同时生成两个锁:一个读锁, 一个写锁, 读锁能被多个应用持有, 而写锁只能一个独占, 当写锁未被持有时, 多个读锁持有者可以同时进行读操作
共享信号量: 在分布式系统中的各个JVM使用同一个zk lock path, 该path将跟一个给定数量的租约(lease)相关联, 然后各个应用根据请求顺序获得对应的lease, 相对来说, 这是最公平的锁服务使用方式.
多共享锁:内部构件多个共享锁(会跟一个znode path关联), 在acquire()过程中, 执行所有共享锁的acquire()方法, 如果中间出现一个失败, 则将释放所有已require的共享锁; 执行release()方法时, 则执行内部多个共享锁的release方法(如果出现失败将忽略)

队列(Queue)

分布式队列:采用持久顺序zk node来实现FIFO队列, 如果有多个消费者, 可以使用LeaderSelector来保证队列的消费者顺序
分布式优先队列: 优先队列的分布式版本

BlockingQueueConsumer: JDK阻塞队列的分布式版本

关卡(Barrier)

分布式关卡:一堆客户端去处理一堆任务, 只有所有的客户端都执行完, 所有客户端才能继续往下处理

双分布式关卡:同时开始, 同时结束

计数器(Counter)

共享计数器:所有客户端监听同一个znode path, 并共享一个的integer计数值
分布式AtomicLong(AtomicInteger): AtomicXxx的分布式版本, 先采用乐观锁更新, 若失败再采用互斥锁更新, 可以配置重试策略来处理重试

工具类

Path Cache

Path Cache用于监听ZNode的子节点的变化, 当add, update, remove子节点时将改变Path Cache state, 同时返回所有子节点的data和state.

Curator中采用了PathChildrenCache类来处理Path Cache, 状态的变化则采用PathChildrenCacheListener来监听.

相关用法参见TestPathChildrenCache测试类

注意: 当zk server的数据发生变化, zk client会出现不一致, 这个需要通过版本号来识别这种状态的变化

Test Server

用来在测试中模拟一个本地进程内ZooKeeper Server.

Test Cluster

用来在测试中模拟一个ZooKeeper Server集群

ZKPaths工具类

提供了和ZNode相关的path处理工具方法:

1.getNodeFromPath: 根据给定path获取node name. i.e. "/one/two/three" -> "three"

2.mkdirs: 根据给定路径递归创建所有node

3.getSortedChildren: 根据给定路径, 返回一个按序列号排序的子节点列表

4.makePath: 根据给定的path和子节点名, 创建一个完整path

EnsurePath工具类

直接看例子, 具体的说就是调用多次, 只会执行一次创建节点操作.

EnsurePath    ensurePath = new EnsurePath(aFullPathToEnsure);
...
String      nodePath = aFullPathToEnsure + "/foo";
ensurePath.ensure(zk);  // first time syncs and creates if needed
zk.create(nodePath, ...);
...
ensurePath.ensure(zk);  // subsequent times are NOPs
zk.create(nodePath, ...);

Notification事件处理

Curator对ZooKeeper的事件Watcher进行了封装处理, 然后实现了一套监听机制. 提供了几个监听接口用来处理ZooKeeper连接状态的变化

当连接出现异常, 将通过ConnectionStateListener接口进行监听, 并进行相应的处理, 这些状态变化包括:

1.暂停(SUSPENDED): 当连接丢失, 将暂停所有操作, 直到连接重新建立, 如果在规定时间内无法建立连接, 将触发LOST通知
2.重连(RECONNECTED): 连接丢失, 执行重连时, 将触发该通知
3.丢失(LOST): 连接超时时, 将触发该通知

从com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我们可以知道, Curator分别将ZooKeeper的Disconnected, Expired, SyncConnected三种状态转换成上面三种状态.

总结

以上就是本文关于浅谈Zookeeper开源客户端框架Curator的全部内容,感兴趣的朋友可以参阅:zookeeper watch机制的理解、为zookeeper配置相应的acl权限、apache zookeeper使用方法实例详解等,希望对大家有所帮助。如有不足之处,欢迎留言指正。感谢朋友们对本站的支持!

(0)

相关推荐

  • 理解zookeeper选举机制

    zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和redis是相同的,即对客户端来讲每个服务器都是平等的. 这篇主要分析leader的选择机制,zookeeper提供了三种方式: LeaderElection AuthFastLeaderElection FastLeaderElection 默认的算法是FastLeaderElection,所以这篇主要分析它的选举机制. 选择机制中的概

  • apache zookeeper使用方法实例详解

    本文涉及了Apache Zookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容. 简介 Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目.Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等. Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问

  • Shell脚本实现自动安装zookeeper

    A:本脚本运行的机器,Linux RHEL6 B,C,D,...:待安装zookeeper cluster的机器, Linux RHEL6 首先在脚本运行的机器A上确定可以ssh无密码登录到待安装zk的机器B,C,D,...上,然后就可以在A上运行本脚本: 复制代码 代码如下: $ ./install_zookeeper 前提: B, C, D机器必须配置好repo,本脚本使用的是cdh5的repo, 下面的内容保存到:/etc/yum.repos.d/cloudera-cdh5.repo: 复

  • zookeeper watch机制的理解

    首先我们看看为什么添加Watch. ZooKeeper是用来协调(同步)分布式进程的服务,提供了一个简单高性能的协调内核,用户可以在此之上构建更多复杂的分布式协调功能. 多个分布式进程通过ZooKeeper提供的 API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是"共享式内存通信".Java没有直接提供某种响应式通知接口来监控某个对象状态的变化,只能要么浪费C

  • 浅谈Zookeeper开源客户端框架Curator

    zookeepercurator Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后,

  • 浅谈Node.js ORM框架Sequlize之表间关系

    Sequelize模型之间存在关联关系,这些关系代表了数据库中对应表之间的主/外键关系.基于模型关系可以实现关联表之间的连接查询.更新.删除等操作.本文将通过一个示例,介绍模型的定义,创建模型关联关系,模型与关联关系同步数据库,及关系模型的增.删.改.查操作. 数据库中的表之间存在一定的关联关系,表之间的关系基于主/外键进行关联.创建约束等.关系表中的数据分为1对1(1:1).1对多(1:M).多对多(N:M)三种关联关系. 在Sequelize中建立关联关系,通过调用模型(源模型)的belon

  • 浅谈PHP之ThinkPHP框架使用详解

    Thinkphp框架其精髓就在于实现了MVC思想,其中M为模板.V为视图.C为控制器,模板一般是公共使用类,在涉及数据库时,一般会跟数据表同名,视图会和控制器类里的方法进行名字的一一对应. 下载及配置 官网(http://www.thinkphp.cn/)下载ThinkPHP5.0,将解压文件放在网站目录下的ATP5子目录下 默认主页:http://localhost:8099/ATP5/public/index.php 如果要隐藏index.php且服务器为Apache则需要将public\.

  • 浅谈五大Python Web框架

    说到Web Framework,Ruby的世界Rails一统江湖,而Python则是一个百花齐放的世界,各种micro-framework.framework不可胜数,不完全列表见: http://wiki.python.org/moin/WebFrameworks 虽然另一大脚本语言PHP也有不少框架,但远没有Python这么夸张,也正是因为Python Web Framework(Python Web开发框架,以下简称Python框架)太多,所以在Python社区总有关于Python框架孰优

  • 浅谈Scrapy网络爬虫框架的工作原理和数据采集

    今天小编给大家详细的讲解一下Scrapy爬虫框架,希望对大家的学习有帮助. 1.Scrapy爬虫框架 Scrapy是一个使用Python编程语言编写的爬虫框架,任何人都可以根据自己的需求进行修改,并且使用起来非常的方便.它可以应用在数据采集.数据挖掘.网络异常用户检测.存储数据等方面. Scrapy使用了Twisted异步网络库来处理网络通讯.整体架构大致如下图所示. 2.由上图可知Scrapy爬虫框架主要由5个部分组成,分别是:Scrapy Engine(Scrapy引擎),Scheduler

  • 浅谈JavaScript的Polymer框架中的事件绑定

    既然是一套完整的前端框架那就一定有提供事件绑定相关的支持.其实在之前的例子中就使用过事件绑定,只是没有单独系统地介绍过而已. Polymer 的事件思想是对事件处理函数尽可能地都命名并定义到 VM 上,我觉得这个做法是在有意地把 VM 这一层隔离出来. 下面这个例子给按钮和其所在的 Shadow DOM Host 都绑定了个事件,点击按钮后两个事件都会触发. 运行 <script> var Polymer = { dom: 'shadow' }; </script> <bas

  • 浅谈Spring Boot日志框架实践

    Java应用中,日志一般分为以下5个级别: ERROR 错误信息 WARN 警告信息 INFO 一般信息 DEBUG 调试信息 TRACE 跟踪信息 Spring Boot使用Apache的Commons Logging作为内部的日志框架,其仅仅是一个日志接口,在实际应用中需要为该接口来指定相应的日志实现. SpringBt默认的日志实现是Java Util Logging,是JDK自带的日志包,此外SpringBt当然也支持Log4J.Logback这类很流行的日志实现. 统一将上面这些 日志

  • 浅谈PostgreSQL的客户端认证pg_hba.conf

    大家都知道防火墙主要是用来过滤客户端并保护服务器不被恶意访问攻击,那在pg中同样存在一个类似于防火墙的工具用来控制客户端的访问,也就是pg_hba.conf这个东东. 在initdb初始化数据文件时,默认提供pg_hba.conf. 通过配置该文件,能够指定哪些ip可以访问,哪些ip不可以访问,以及访问的资源和认证方式,该文件类似于oracle中的监听中的白名单黑名单功能,且同样可以reload在线生效. 记录可以是下面七种格式之一: local database user auth-metho

  • 浅谈Python的Django框架中的缓存控制

    关于缓存剩下的问题是数据的隐私性以及在级联缓存中数据应该在何处储存的问题. 通常用户将会面对两种缓存: 他或她自己的浏览器缓存(私有缓存)以及他或她的提供者缓存(公共缓存). 公共缓存由多个用户使用,而受其他某人的控制. 这就产生了你不想遇到的敏感数据的问题,比如说你的银行账号被存储在公众缓存中. 因此,Web 应用程序需要以某种方式告诉缓存那些数据是私有的,哪些是公共的. 解决方案是标示出某个页面缓存应当是私有的. 要在 Django 中完成此项工作,可使用 cache_control 视图修

  • 浅谈JavaScript的Polymer框架中的behaviors对象

    localStorage 应是家喻户晓的?但本地存储这个家族可远不止它.以前扯过 sessionStorage,现在还有个神奇的 CacheStorage.它用来存储 Response 对象的.也就是说用来对 HTTP ,响应做缓存的.虽然 localStorage 也能做,但是它可能更专业. CacheStorage 在浏览器上的引用名叫 caches 而不是驼峰写法的 cacheStorage,它定义在 ServiceWorker 的规范中.CacheStorage 是多个 Cache 的集

随机推荐