详解Zookeeper基础知识

目录
  • 1. 简介
  • 2. 数据模型
    • 2.1 模型结构
    • 2.2 模型的特点
    • 2.3 节点分类
      • 2.3.1 Persistent
      • 2.3.2 Persistent Sequential
      • 2.3.3 Ephemeral
      • 2.3.4 Ephemeral Sequential
  • 3. 安装
    • 3.1 官方
    • 3.2 docker
    • 3.3 docker-compose
    • 3.4 配置信息
  • 4. 基础命令
    • 4.1 创建会话
    • 4.2 ls
    • 4.3 create
    • 4.4 get
    • 4.5 stat
    • 4.6 set
    • 4.7 delete
    • 4.8 quit
  • 5. 节点的监听机制
  • 6. quick start
    • 6.1 项目结构
    • 6.2 引入依赖
    • 6.3 application.
    • 6.4 CuratorClientProperties
    • 6.5 CuratorClientConfig
    • 6.6 CuratorClient
    • 6.7 AppController
  • 7. ZAB 协议介绍

1. 简介

zookeeper是一个开源的分布式协调服务, 提供分布式数据一致性解决方案,分布式应用程序可以实现数据统一配置管理、统一命名服务、分布式锁、集群管理等功能.

ZooKeeper主要服务于分布式系统,使用分布式系统就无法避免对节点管理的问题(需要实时感知节点的状态、对节点进行统一管理等等),而由于这些问题处理起来可能相对麻烦和提高了系统的复杂性,ZooKeeper作为一个能够通用解决这些问题的中间件就应运而生了。

2. 数据模型

2.1 模型结构

ZooKeeper 提供的命名空间很像标准文件系统的命名空间。名称是由斜杠 (/) 分隔的一系列路径元素。ZooKeeper 命名空间中的每个节点都由路径标识。

与典型的为存储而设计的文件系统不同,ZooKeeper 数据保存在内存中,这意味着 ZooKeeper 可以实现高吞吐量和低延迟。

2.2 模型的特点

  • 每个子目录如/app1都被称作一个znode(节点)。这个znode是被它所在的路径唯一标识。
  • znode可以有子节点目录,并且每个znode可以存储数据。
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据。每次 znode 的数据更改时,版本号都会增加。例如,每当客户端检索数据时,它也会收到数据的版本。
  • 每个znode都有一个访问控制列表 (ACL),它限制了谁可以做什么。
  • znode可以被监控,包括这个目录中存储的数据的修改,子节点目录变化等,一旦变化可以通知设置监控的客户端。

⚠️注意:当使用zkCli.sh 创建会话时,节点的监听事件只能被触发一次。

2.3 节点分类

2.3.1 Persistent

持久节点:节点被创建后,就一直存在,除非客户端主动删除这个节点。

2.3.2 Persistent Sequential

持久顺序节点:有序的持久节点。在zk中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。例如:

$ create -s /app1 # 创建 /app1 节点
Created /app10000000000 # 创建成功后的节点名称为 /app10000000000 

2.3.3 Ephemeral

临时节点:和持久节点不同的是,临时节点的生命周期和客户端会话绑定且临时节点下面不能创建子节点。如果客户端会话失效,那么这个节点就会自动被清除掉。

注意:客户端失效临时节点会被清除,但如果是断开链接,临时节点并不会立马被清除。

“立马”:在会话超时持续时间内未从客户端收到任何心跳信号之后,zk服务器将删除该会话的临时节点。但如果正常关闭会话,临时节点会立马被清除。

# 1. 创建会话
$ ./zkCli.sh
# 2. 创建临时节点 /app3
$ create -e /app3
Created /app3
# 3. ctrl + c 关闭会话
# 4. 紧接着再次创建会话
$ ./zkCli.sh
# 5. 查看节点内容 发现临时节点依旧存在
$ ls /
[app10000000000, app3, zookeeper]
# 6. 等待几秒,再次查看 发现临时节点消失
$ ls /
[app10000000000, zookeeper]

# 7. 再次创建临时节点 /app3
$ create -e /app3
Created /app3
# 8. 正常关闭会话
$ quit
# 9. 再次创建会话 临时节点消失
$ ./zkCli.sh
$ ls /
[app10000000000, zookeeper]

2.3.4 Ephemeral Sequential

临时顺序节点:有序的临时节点。创建es节点时,zk会维护一份时序,会记录每个节点的顺序。例如:

$ create -s -e /app2 # 创建临时有序节点 /app2
Created /app20000000001 # 创建成功后的节点名称为 app20000000001

3. 安装

3.1 官方

官方地址:https://zookeeper.apache.org/releases.html

3.2 docker

$ docker run --name zookeeper --restart always -d zookeeper

3.3 docker-compose

⚠️注意:执行脚本前,需先将配置文件挂载到宿主机上。

version: '3.1'
services:
  zk:
    image: zookeeper
    restart: always
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - ./data:/data
      - ./logs:/datalog
      - ./conf/zoo.cfg:/conf/zoo.cfg

3.4 配置信息

# 数据存储目录
dataDir=/data
# 日志存储目录
dataLogDir=/datalog
# 集群模式下 节点之间的心跳时间2s(每2s进行一次心跳检测)
tickTime=2000
# 集群初始化时 节点之间同步次数。(5 * 2s = 10s 10s内未初始化成功则初始化失败)
initLimit=5
# 集群模式下 同步数据次数。 (2 * 2s = 4s 4s内未同步则超时失败)
syncLimit=2
# 数据快照保留个数
autopurge.snapRetainCount=3
# 数据快照清除时间间隔(单位为小时) 0:不清除 ,1:1小时
autopurge.purgeInterval=0
# 最大的客户端链接数
maxClientCnxns=60

4. 基础命令

不同版本之前的命令会有所差异,本章是基于zk:3.7版本。官方地址

4.1 创建会话

首先执行命令,创建新的会话,进入终端。

# 进入到zk的安装目录的bin目录下执行
$ ./zkCli.sh
# 如果zk不是在本机 则可以使用server参数指定链接地址
$ ./zkCli.sh -server 127.0.0.1:2181

4.2 ls

ls [-s] [-w] [-R] path:ls 命令用于查看某个路径下目录列表。

  • -s:查看此znode状态信息。
  • -w:监听此znode目录变化。
  • -R:递归查看
$ ls / # 查看根目录
[app, app10000000000, zookeeper]
$ ls /app
[]
$ ls -s /app
[app01, app02]
cZxid = 0x1b
ctime = Sun Aug 29 13:07:24 UTC 2021
mZxid = 0x1b
mtime = Sun Aug 29 13:07:24 UTC 2021
pZxid = 0x2e
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2

监听znode目录变化:

4.3 create

create [-s] [-e] [-c] [-t ttl] path [data] [acl]:create 用于创建znode。

  • -s:创建顺序节点。
  • -e:创建临时节点。
  • -c:创建一个容器节点,当容器中的最后一个子节点被删除时,容器也随之消失。
  • -t:设置节点存活时间(毫秒)。

⚠️注意:如果要设置超时时间需在配置文件中激活配置:zookeeper.extendedTypesEnabled=true

  • data:设置的数据。
  • acl:访问控制配置。
$ ls /
[app, app10000000000, zookeeper]
# 创建持久节点 /app3
$ create /app3 zhangtieniu
Created /app3
# 创建有序的持久节点 /app4
$ create -s /app4 zhangsan
Created /app40000000011
# 创建临时节点 /app5
$ create -e /app5 linshi
Created /app5
# 创建临时有序节点 /app6
$ create -s -e /app6 linshiyouxu
Created /app60000000013
$ ls /
[app, app10000000000, app3, app40000000011, app5, app60000000013, zookeeper]

4.4 get

get [-s] [-w] path: 命令用于获取节点数据和状态信息。

  • -s:查看此znode状态信息。
  • -w:监听此znode值变化。
$ get /app3 # 查看/app3 znode内容
zhangtieniu

监听znod值变化:

4.5 stat

stat [-w] path:命令用于查看节点状态信息。

-w:监听节点状态变化。

$ stat /app3
cZxid = 0x34 # 创建节点时的事务id
ctime = Sun Aug 29 14:31:52 UTC 2021 # 创建时间
mZxid = 0x34 # 创建的版本号
mtime = Sun Aug 29 14:31:52 UTC 2021 # 修改时间
pZxid = 0x34 # 子节点列表最后一次被修改的事务id
cversion = 0 # 节点版本号
dataVersion = 0 # 数据版本号
aclVersion = 0 # 权限版本号
ephemeralOwner = 0x0 # 临时拥有者
dataLength = 11 # 节点值数据长度
numChildren = 0 # 子节点数量

4.6 set

set [-s] [-v version] path data: 命令用于修改节点存储的数据。

-s:查看设置成功后的状态信息。

-v:指定设置值的版本号,该值只能为该节点的最新版本。可以使用其实现乐观锁。

$ set /app wangmazi
$ set -s /app lisi
cZxid = 0x1b
ctime = Sun Aug 29 13:07:24 UTC 2021
mZxid = 0x43
mtime = Sun Aug 29 14:50:52 UTC 2021
pZxid = 0x3f
cversion = 4
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 4

4.7 delete

delete [-v version] path:删除指定节点。

⚠️注意:当删除的节点有子节点时,不能使用该命令。需使用deleteall命令删除。

-v:删除指定版本。与set同理

$ delete /app3

4.8 quit

关闭会话。

5. 节点的监听机制

zk的监听机制,可以使客户端可以监听znode节点的变化,znode节点的变化出发相应的事件,然后清除对该节点的检测。

⚠️注意:在这里再强调一次,当使用zkCli.sh 创建会话时,对znode的监听只能触发一次。但在使用java客户端链接时,可以一直触发。

$ ls -w /path # 监听节点目录的变化
$ get -w /path # 监听节点数据的变化

6. quick start

6.1 项目结构

.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── ldx
        │           └── zookeeper
        │               ├── ZookeeperApplication.java # 启动类
        │               ├── config
        │               │   ├── CuratorClientConfig.java # zk配置类
        │               │   └── CuratorClientProperties.java # zk 配置属性文件
        │               ├── controller
        │               │   └── AppController.java # zk 测试文件
        │               └── util
        │                   └── CuratorClient.java # zk工具类
        └── resources
            └── application.yaml # 服务配置文件

6.2 引入依赖

Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。

curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ldx</groupId>
    <artifactId>zookeeper</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
       <!-- client 操作工具包 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

6.3 application.

server:
  port: 8080
# curator配置
curator-client:
  # 连接字符串
  connection-string: localhost:2181
  # 根节点
  namespace: ldx
  # 节点数据编码
  charset: utf8
  # session超时时间
  session-timeout-ms: 60000
  # 连接超时时间
  connection-timeout-ms: 15000
  # 关闭连接超时时间
  max-close-wait-ms: 1000
  # 默认数据
  default-data: ""
  # 当半数以上zookeeper服务出现故障仍然提供读服务
  can-be-read-only: false
  # 自动创建父节点
  use-container-parents-if-available: true
  # 重试策略,默认使用BoundedExponentialBackoffRetry
  retry:
    max-sleep-time-ms: 10000
    base-sleep-time-ms: 1000
    max-retries: 3
  # 认证信息
  #auth:
    #scheme: digest
    # auth: username:password

6.4 CuratorClientProperties

package com.ldx.zookeeper.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * zk 属性配置类
 *
 * @author ludangxin
 * @date 2021/8/31
 */
@Data
@ConfigurationProperties(prefix = "curator-client")
public class CuratorClientProperties {
    /**
     * 连接地址
     */
    private String connectionString;
    /**
     * 命名空间
     */
    private String namespace;
    /**
     * 字符集
     */
    private String charset = "utf8";
    /**
     * 会话超时时间 毫秒
     */
    private int sessionTimeoutMs = 60000;
    /**
     * 连接超时时间 毫秒
     */
    private int connectionTimeoutMs = 15000;
    /**
     * 最大关闭等待时间 毫秒
     */
    private int maxCloseWaitMs = 1000;
    /**
     * 默认数据
     */
    private String defaultData = "";
    /**
     * 当半数以上zookeeper服务出现故障仍然提供读服务
     */
    private boolean canBeReadOnly = false;
    /**
     * 自动创建父节点
     */
    private boolean useContainerParentsIfAvailable = true;
    /**
     * 线程池名称
     */
    private String threadFactoryClassName;
    private Retry retry = new Retry();
    private Auth auth = new Auth();

    @Data
    public static class Retry {
        private int maxSleepTimeMs = 10000;
        private int baseSleepTimeMs = 1000;
        private int maxRetries = 3;
    }

    @Data
    public static class Auth {
        private String scheme = "digest";
        private String auth;
    }

}

6.5 CuratorClientConfig

package com.ldx.zookeeper.config;

import com.ldx.zookeeper.util.CuratorClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.ThreadFactory;

/**
 * zk 配置类
 *
 * @author ludangxin
 * @date 2021/8/31
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(CuratorClientProperties.class)
public class CuratorClientConfig {
    @Bean
    public EnsembleProvider ensembleProvider(CuratorClientProperties curatorClientProperties) {
        return new FixedEnsembleProvider(curatorClientProperties.getConnectionString());
    }

    @Bean
    public RetryPolicy retryPolicy(CuratorClientProperties curatorClientProperties) {
        CuratorClientProperties.Retry retry = curatorClientProperties.getRetry();
        return new BoundedExponentialBackoffRetry(retry.getBaseSleepTimeMs(), retry.getMaxSleepTimeMs(), retry.getMaxRetries());
    }

    @Bean
    public CompressionProvider compressionProvider() {
        return new GzipCompressionProvider();
    }

    @Bean
    public ZookeeperFactory zookeeperFactory() {
        return new DefaultZookeeperFactory();
    }

    @Bean
    public ACLProvider aclProvider() {
        return new ACLProvider() {
            @Override
            public List<ACL> getDefaultAcl() {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
            @Override
            public List<ACL> getAclForPath(String path) {
                return ZooDefs.Ids.CREATOR_ALL_ACL;
            }
        };
    }

    @Bean
    @SneakyThrows
    public CuratorFrameworkFactory.Builder builder(EnsembleProvider ensembleProvider,
                                                   RetryPolicy retryPolicy,
                                                   CompressionProvider compressionProvider,
                                                   ZookeeperFactory zookeeperFactory,
                                                   ACLProvider aclProvider,
                                                   CuratorClientProperties curatorClientProperties) {
        String charset = curatorClientProperties.getCharset();
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .ensembleProvider(ensembleProvider)
                .retryPolicy(retryPolicy)
                .compressionProvider(compressionProvider)
                .zookeeperFactory(zookeeperFactory)
                .namespace(curatorClientProperties.getNamespace())
                .sessionTimeoutMs(curatorClientProperties.getSessionTimeoutMs())
                .connectionTimeoutMs(curatorClientProperties.getConnectionTimeoutMs())
                .maxCloseWaitMs(curatorClientProperties.getMaxCloseWaitMs())
                .defaultData(curatorClientProperties.getDefaultData().getBytes(Charset.forName(charset)))
                .canBeReadOnly(curatorClientProperties.isCanBeReadOnly());
        if (!curatorClientProperties.isUseContainerParentsIfAvailable()) {
            builder.dontUseContainerParents();
        }
        CuratorClientProperties.Auth auth = curatorClientProperties.getAuth();
        if (StringUtils.isNotBlank(auth.getAuth())) {
            builder.authorization(auth.getScheme(), auth.getAuth().getBytes(Charset.forName(charset)));
            builder.aclProvider(aclProvider);
        }
        String threadFactoryClassName = curatorClientProperties.getThreadFactoryClassName();
        if (StringUtils.isNotBlank(threadFactoryClassName)) {
            try {
                Class<?> cls = Class.forName(threadFactoryClassName);
                ThreadFactory threadFactory = (ThreadFactory) cls.newInstance();
                builder.threadFactory(threadFactory);
            } catch (Exception e) {
                log.error("init CuratorClient error", e);
            }
        }
        return builder;
    }

    @Bean(initMethod = "init", destroyMethod = "stop")
    public CuratorClient curatorClient(CuratorFrameworkFactory.Builder builder) {
        return new CuratorClient(builder);
    }

}

6.6 CuratorClient

package com.ldx.zookeeper.util;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.zookeeper.CreateMode;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * zookeeper工具类
 *
 * @author ludangxin
 * @date 2021/8/30
 */
@Slf4j
public class CuratorClient {
    /**
     * 默认的字符编码集
     */
    private static final String DEFAULT_CHARSET = "utf8";
    /**
     * 客户端
     */
    private final CuratorFramework client;
    /**
     * 字符集
     */
    private String charset = DEFAULT_CHARSET;

    @SneakyThrows
    public CuratorClient(CuratorFrameworkFactory.Builder builder) {
        client = builder.build();
    }

    @SneakyThrows
    public CuratorClient(CuratorFrameworkFactory.Builder builder, String charset) {
        client = builder.build();
        this.charset = charset;
    }

    public void init() {
        client.start();
        client.getConnectionStateListenable().addListener((client, state) -> {
            if (state==ConnectionState.LOST) {
                // 连接丢失
                log.info("lost session with zookeeper");
            } else if (state==ConnectionState.CONNECTED) {
                // 连接新建
                log.info("connected with zookeeper");
            } else if (state==ConnectionState.RECONNECTED) {
                // 重新连接
                log.info("reconnected with zookeeper");
            }
        });
    }

    /**
     * 关闭会话
     */
    public void stop() {
        log.info("zookeeper session close");
        client.close();
    }

    /**
     * 创建节点
     *
     * @param mode     节点类型
     *   1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
     *   2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
     *   3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
     *   4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
     * @param path     节点名称
     * @param nodeData 节点数据
     */
    @SneakyThrows
    public void createNode(CreateMode mode, String path, String nodeData) {
       // 使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
       client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes(Charset.forName(charset)));
    }

    /**
     * 创建节点
     *
     * @param mode 节点类型
     *   1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
     *   2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
     *   3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
     *   4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
     * @param path 节点名称
     */
    @SneakyThrows
    public void createNode(CreateMode mode, String path) {
       // 使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
       client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
    }

    /**
     * 删除节点数据
     *
     * @param path 节点名称
     */
    @SneakyThrows
    public void deleteNode(final String path) {
       deleteNode(path, true);
    }

    /**
     * 删除节点数据
     *
     * @param path 节点名称
     * @param deleteChildre 是否删除子节点
     */
    @SneakyThrows
    public void deleteNode(final String path, Boolean deleteChildre) {
       if (deleteChildre) {
           // guaranteed()删除一个节点,强制保证删除,
           // 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
           client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
       } else {
           client.delete().guaranteed().forPath(path);
       }
    }

    /**
     * 设置指定节点的数据
     *
     * @param path 节点名称
     * @param data 节点数据
     */
    @SneakyThrows
    public void setNodeData(String path, String data) {
       client.setData().forPath(path, data.getBytes(Charset.forName(charset)));
    }

    /**
     * 获取指定节点的数据
     *
     * @param path 节点名称
     * @return 节点数据
     */
    @SneakyThrows
    public String getNodeData(String path) {
       return new String(client.getData().forPath(path), Charset.forName(charset));
    }

    /**
     * 获取数据时先同步
     *
     * @param path 节点名称
     * @return 节点数据
     */
    public String synNodeData(String path) {
        client.sync();
        return getNodeData(path);
    }

    /**
     * 判断节点是否存在
     *
     * @param path 节点名称
     * @return true 节点存在,false 节点不存在
     */
    @SneakyThrows
    public boolean isExistNode(final String path) {
        client.sync();
        return Objects.nonNull(client.checkExists().forPath(path));
    }

    /**
     * 获取节点的子节点
     *
     * @param path 节点名称
     * @return 子节点集合
     */
    @SneakyThrows
    public List<String> getChildren(String path) {
        return client.getChildren().forPath(path);
    }

    /**
     * 创建排他锁
     *
     * @param path 节点名称
     * @return 排他锁
     */
    public InterProcessSemaphoreMutex getSemaphoreMutexLock(String path) {
       return new InterProcessSemaphoreMutex(client, path);
    }

    /**
     * 创建可重入排他锁
     *
     * @param path 节点名称
     * @return 可重入排他锁
     */
    public InterProcessMutex getMutexLock(String path) {
       return new InterProcessMutex(client, path);
    }

    /**
     * 创建一组可重入排他锁
     *
     * @param paths 节点名称集合
     * @return 锁容器
     */
    public InterProcessMultiLock getMultiMutexLock(List<String> paths) {
        return new InterProcessMultiLock(client, paths);
    }

    /**
     * 创建一组任意类型的锁
     *
     * @param locks 锁集合
     * @return 锁容器
     */
    public InterProcessMultiLock getMultiLock(List<InterProcessLock> locks) {
        return new InterProcessMultiLock(locks);
    }

    /**
     * 加锁
     *
     * @param lock 分布式锁对象
     */
    @SneakyThrows
    public void acquire(InterProcessLock lock) {
       lock.acquire();
    }

    /**
     * 加锁
     *
     * @param lock 分布式锁对象
     * @param time 等待时间
     * @param unit 时间单位
     */
    @SneakyThrows
    public void acquire(InterProcessLock lock, long time, TimeUnit unit) {
       lock.acquire(time, unit);
    }

    /**
     * 释放锁
     *
     * @param lock 分布式锁对象
     */
    @SneakyThrows
    public void release(InterProcessLock lock) {
       lock.release();
    }

    /**
     * 检查是否当前jvm的线程获取了锁
     *
     * @param lock 分布式锁对象
     * @return true/false
     */
    public boolean isAcquiredInThisProcess(InterProcessLock lock) {
        return lock.isAcquiredInThisProcess();
    }

    /**
     * 获取读写锁
     *
     * @param path 节点名称
     * @return 读写锁
     */
    public InterProcessReadWriteLock getReadWriteLock(String path) {
        return new InterProcessReadWriteLock(client, path);
    }

    /**
     * 监听数据节点的变化情况
     *
     * @param path 节点名称
     * @param listener 监听器
     * @return 监听节点的TreeCache实例
     */
    @SneakyThrows
    public CuratorCache watch(String path, CuratorCacheListener listener) {
        CuratorCache curatorCache = CuratorCache.builder(client, path).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
        return curatorCache;
    }

    /**
     * 监听数据节点的变化情况
     *
     * @param path 节点名称
     * @param listener 监听器
     * @return 监听节点的TreeCache实例
     */
    public CuratorCache watch(String path, CuratorCacheListener listener, Executor executor) {
        CuratorCache curatorCache = CuratorCache.builder(client, path).build();
        curatorCache.listenable().addListener(listener, executor);
        curatorCache.start();
        return curatorCache;
    }

    /**
     * 取消监听节点
     *
     * @param path 节点名称
     * @param listener 监听器
     */
    public void unwatch(String path, CuratorCacheListener listener) {
        CuratorCache curatorCache = CuratorCache.builder(client, path).build();
        curatorCache.listenable().removeListener(listener);
    }

}

6.7 AppController

package com.ldx.zookeeper.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ldx.zookeeper.util.CuratorClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.web.bind.annotation.*;

/**
 * demo
 *
 * @author ludangxin
 * @date 2021/8/30
 */
@Slf4j
@RestController
@RequestMapping("app")
@RequiredArgsConstructor
public class AppController {

   private final CuratorClient curatorClient;

   @GetMapping("{appName}")
   public String getData(@PathVariable String appName) {
      return curatorClient.getNodeData(setPrefix(appName));
   }

   @PostMapping("{appName}")
   public String addApp(@PathVariable String appName, @RequestParam String data) {
      curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName), data);
      return "ok";
   }

   @PostMapping("{appName}/{childName}")
   public String addAppChild(@PathVariable String appName, @PathVariable String childName, @RequestParam String data) {
      curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName).concat(setPrefix(childName)), data);
      return "ok";
   }

   @PutMapping("{appName}")
   public String setData(@PathVariable String appName, String data) {
      curatorClient.setNodeData(setPrefix(appName), data);
      return "ok";
   }

   @PutMapping("{appName}/{childName}")
   public String setData(@PathVariable String appName, @PathVariable String childName, String data) {
      curatorClient.setNodeData(setPrefix(appName).concat(setPrefix(childName)), data);
      return "ok";
   }

   @DeleteMapping("{appName}")
   public String delApp(@PathVariable String appName) {
      curatorClient.deleteNode(setPrefix(appName));
      return "ok";
   }

   @PostMapping("{appName}/watch/dir")
   public String watchAppDir(@PathVariable String appName) {
      CuratorCacheListener listener = CuratorCacheListener.builder().forDeletes(obj -> {
         String path = obj.getPath();
         String data = new String(obj.getData());
         Stat stat = obj.getStat();
         log.info("节点:{} 被删除,节点数据:{},节点状态:\\{version:{},createTime:{}\\}", path, data,
            stat.getVersion(), stat.getCtime());
      }).build();
      curatorClient.watch(setPrefix(appName), listener);
      return "ok";
   }

   @PostMapping("{appName}/watch/data")
   public String watchAppData(@PathVariable String appName) {
      ObjectMapper mapper = new ObjectMapper();
      CuratorCacheListener listener = CuratorCacheListener.builder().forChanges((oldNode, newNode) -> {
         try {
            String path = oldNode.getPath();
            log.info("节点:{} 被修改,修改前:{} ", path, mapper.writeValueAsString(oldNode));
            log.info("节点:{} 被修改,修改后:{} ", path, mapper.writeValueAsString(newNode));
         } catch(JsonProcessingException e) {
            e.printStackTrace();
         }
      }).build();
      curatorClient.watch(setPrefix(appName), listener);
      return "ok";
   }

   private String setPrefix(String appName) {
      String prefix = "/";
      if(!appName.startsWith(prefix)) {
         appName = prefix.concat(appName);
      }
      return appName;
   }
}

7. ZAB 协议介绍

ZAB(ZooKeeper Atomic Broadcast 原子广播) 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。 在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。

ZAB 协议两种基本的模式:崩溃恢复和消息广播

ZAB协议包括两种基本的模式,分别是 崩溃恢复和消息广播。当整个服务框架在启动过程中,或是当 Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB 协议就会进人恢复模式并选举产生新的Leader服务器。当选举产生了新的 Leader 服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致。

当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进人消息广播模式了。 当一台同样遵守ZAB协议的服务器启动后加人到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加人的服务器就会自觉地进人数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。正如上文介绍中所说的,ZooKeeper设计成只允许唯一的一个Leader服务器来进行事务请求的处理。Leader服务器在接收到客户端的事务请求后,会生成对应的事务提案并发起一轮广播协议;而如果集群中的其他机器接收到客户端的事务请求,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。

以上就是详解Zookeeper基础知识的详细内容,更多关于Zookeeper基础的资料请关注我们其它相关文章!

(0)

相关推荐

  • 如何使用Java操作Zookeeper

    简介 Java操作Zookeeper有很多种方式,如zookeeper.zkclient.curator等等,下面介绍下使用zkclient的方式操作Zookeeper. Maven依赖: <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.11</version> </dependency&g

  • C#如何连接使用Zookeeper

    Zookeeper作为分布式的服务框架,虽然是java写的,但是强大的C#也可以连接使用. C#要连接使用Zookeeper,需要借助第三方插件,而现在主要有两个插件可供使用,分别是ZooKeeperNetEx和Zookeeper.Net Zookeeper.Net好像是是Apache官方提供的,但是5年没更新了,也就是说他依赖于.net framework,因此无法在.net core项目中使用 ZooKeeperNetEx是从java改过来的,因此里面的一些习惯是java风格的,但是好像有人

  • 如何用python 操作zookeeper

    ZooKeeper 简介 ZooKeeper 是一个分布式的.开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等.ZooKeeper 支持大部分开发语言,除了某些特定的功能只支持 Java 和 C.python 通过 kazoo 可以实现操作 ZooKeeper . 一.安装 这个简单,使用 pip 命令安装 pip3

  • 分析ZooKeeper分布式锁的实现

    目录 一.分布式锁方案比较 二.ZooKeeper实现分布式锁 2.1.方案一 2.2.方案二 一.分布式锁方案比较 方案 实现思路 优点 缺点 利用 MySQL 的实现方案 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 实现简单 性能差,无法适应高并发场景:容易出现死锁的情况:无法优雅的实现阻塞式锁 利用 Redis 的实现方案 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 性能好 实现相对复杂,有可能出现死锁:无法优雅的实现阻塞式锁 利用 ZooKeeper

  • 详解Zookeeper基础知识

    目录 1. 简介 2. 数据模型 2.1 模型结构 2.2 模型的特点 2.3 节点分类 2.3.1 Persistent 2.3.2 Persistent Sequential 2.3.3 Ephemeral 2.3.4 Ephemeral Sequential 3. 安装 3.1 官方 3.2 docker 3.3 docker-compose 3.4 配置信息 4. 基础命令 4.1 创建会话 4.2 ls 4.3 create 4.4 get 4.5 stat 4.6 set 4.7 d

  • 详解Java基础知识——JDBC

    JDBC Java DataBase Connectivity,java数据库连接,为了降低操作数据的难度,java提供jdbc,按照java面向对象特点,对操作进行了很多封装. JDBC提供了很多接口,然后不同数据库厂商去实现这个接口,到底底层如何去实现,不同的数据库不一样,不同的数据库厂商需要提供接口实现类(驱动类.驱动程序 Driver.驱动) 我们连接不同的数据库,我们只需要使用不同的驱动即可. J:Java:提供访问数据库的规范(接口), DBC:接口的实现,厂商去实现这个接口. JD

  • 详解JavaScript基础知识(JSON、Function对象、原型、引用类型)

    1.JSON 概念:JavaScript 对象表示法(JavaScript Object Notation),是一种轻量级的数据交换格式 特点:易于程序员编写和查看:易于计算机解析和生成 数据结构:Object对象格式   { key: value , key: value } Array数组格式   [ value , value ] Eg:  var json = ' { "   " : ''  '' } '    (内部双引号,外部单引号) * JSON - 支持独立的"

  • 详解Java进阶知识注解

    一.注解的概念 1.注解官方解释 注解 叫元数据,一种代码级别的说明,它是JDK1.5及以后版本引入的一个特性,与类.接口.枚举在同一个层次,它可以声明在包.类.字段.局部变量.方法参数等的前面,用来对这些元素进行说明.注释. 注解的作用分类 编写文档:通过代码里表示的元数据生成文档[生成doc文档] 代码分析:通过代码里表示的元数据进行分析[使用反射] 编译检查:通过代码里表示的元数据让编译器能够实现基本的编译检查[Override] 注解按照运行机制分类 源码注解:注解只在源码中存在,编译成

  • 详解Sql基础语法

    1.创建数据库 create  database 数据库名称 2.删除数据库 drop database 数据库名称 3.备份sql server 创建备份数据的device use master exec sp_addumpdevice '名称','新的名称','路径' 开始备份 backup database pubs to 新的名称 4.创建表 create table 表名(列名1 类型,列名2 类型) 5.根据已有表创建新表 create table 新表名称 like 旧表名称 cr

  • C++中的STL中map用法详解(零基础入门)

    目录 一.什么是 map ? 二.map的定义 2.1 头文件 2.2 定义 2.3 方法 三.实例讲解 3.1 增加数据 3.2 删除数据 3.3 修改数据 3.4 查找数据 3.5 遍历元素 3.6 其它方法 四.总结 map 在编程中是经常使用的一个容器,本文来讲解一下 STL 中的 map,赶紧来看下吧! 一.什么是 map ? map 是具有唯一键值对的容器,通常使用红黑树实现. map 中的键值对是 key value 的形式,比如:每个身份证号对应一个人名(反过来不成立哦!),其中

  • C语言 详解字符串基础

    目录 一.字符串的概念 二.字符数组与字符串 三.字符串字面量的秘密 四.字符串的长度 五.小结 一.字符串的概念 字符串是有序字符的集合 字符串是程序中的基本元素之一 C 语言中没有字符串的概念 C 语言中通过特殊的字符数组模拟字符串 C 语言中的字符串是以 ‘\0’ 结尾的字符数组 二.字符数组与字符串 在C语言中,双引号引用的单个或多个字符是—种特殊的字面量 存储于程序的全局只读存诸区 本质为字符数组,编译器自动在结尾加上 ‘\0' 字符 下面看一段字符数组与字符串的代码: #includ

  • 详解JVM基础之字节码的增强技术

    目录 字节码增强技术 ASM Javassist 运行时类的重载 问题引出 Instrument JVMTI & Agent & Attach API 使用场景 总结 字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础.字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术.接下来,我们将从最直接操纵字节码的实现方式开始深入进行剖析 ASM 对于需要手动操纵字节码的需求,可以使用ASM,它可以直接生产 .class字节码文件,也

  • JS表格组件神器bootstrap table详解(基础版)

    一.Bootstrap Table的引入 关于Bootstrap Table的引入,一般来说还是两种方法: 1.直接下载源码,添加到项目里面来. 由于Bootstrap Table是Bootstrap的一个组件,所以它是依赖Bootstrap的,我们首先需要添加Bootstrap的引用. 2.使用我们神奇的Nuget 打开Nuget,搜索这两个包 Bootstrap已经是最新的3.3.5了,我们直接安装即可. 而Bootstrap Table的版本竟然是0.4,这也太坑爹了.所以博主建议Boot

  • 详解C++基础——类继承中方法重载

    一.前言 在上一篇C++基础博文中讨论了C++最基本的代码重用特性--类继承,派生类可以在继承基类元素的同时,添加新的成员和方法.但是没有考虑一种情况:派生类继承下来的方法的实现细节并不一定适合派生类的需求,此时派生类需要重载集成方法. 二.重载方法及虚函数 我们讨论<C++ Primer Plus>中的如下场景:银行记录客户信息,包括客户姓名.当前余额.客户这一类别当然能够创建客户对象.存款.取款以及显示信息.银行需要特殊记录具有透支权限的客户,因此这一类别的客户要额外记录透支上限.透支贷款

随机推荐