ZooKeeper入门教程三分布式锁实现及完整运行源码

目录
  • 1.0版本
  • 2.0版本
  • LockSample类
    • 构造方法
    • 获取锁实现
      • createLock()
      • attemptLock()
    • 释放锁实现
  • TicketSeller类
    • sell()
    • sellTicketWithLock()
    • 测试入口
    • 测试方法
  • 代码清单如下:
    • 1、LockSample
    • 2、TicketSeller

ZooKeeper入门教程一简介与核心概念

ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用

1.0版本

首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。

  • 所有客户端争相创建此节点,但只有一个客户端创建成功。
  • 创建成功代表获取锁成功,此客户端执行业务逻辑
  • 未创建成功的客户端,监听/exlusive_lock变更
  • 获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放
  • 锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。

我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。

上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:

1、锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。

2、羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。

为了解决上面的问题,我们重新设计。

2.0版本

我们在2.0版本中,让每个客户端在/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:

  • 每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
  • 客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
  • 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
  • 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
  • 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。

接下来我们看看代码如何实现。

LockSample类

此类是分布式锁类,实现了2个分布式锁的相关方法:

1、获取锁

2、释放锁

主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:

private ZooKeeper zkClient;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;

定义了zkClient,用来操作zookeeper。

锁的根路径,及自增节点的前缀。此处生产环境应该由客户端传入。

当前锁的路径。

构造方法

public LockSample() throws IOException {
    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if(event.getState()== Event.KeeperState.Disconnected){
                System.out.println("失去连接");

            }
        }
    });
}

创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。

获取锁实现

暴露出来的获取锁的方法为acquireLock(),逻辑很简单:

public  void acquireLock() throws InterruptedException, KeeperException {
    //创建锁节点
    createLock();
    //尝试获取锁
    attemptLock();
}

首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。

createLock()

先判断锁的根节点/Locks是否存在,不存在的话创建。然后在/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。

代码如下:

private void createLock() throws KeeperException, InterruptedException {
    //如果根节点不存在,则创建根节点
    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
    if (stat == null) {
        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // 创建EPHEMERAL_SEQUENTIAL类型节点
    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
    this.lockPath=lockPath;
}

attemptLock()

这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:

private void attemptLock() throws KeeperException, InterruptedException {
    // 获取Lock所有子节点,按照节点序号排序
    List<String> lockPaths = null;
    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
    Collections.sort(lockPaths);
    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    // 如果lockPath是序号最小的节点,则获取锁
    if (index == 0) {
        System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
        return ;
    } else {
        // lockPath不是序号最小的节点,监听前一个节点
        String preLockPath = lockPaths.get(index - 1);

        Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

        // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
        if (stat == null) {
            attemptLock();
        } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
            System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
            synchronized (watcher) {
                watcher.wait();
            }
            attemptLock();
        }
    }
}

注意这一行代码

Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程。

watcher定义代码如下:

private Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.getPath() + " 前锁释放");
        synchronized (this) {
            notifyAll();
        }
    }
};

watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。

释放锁实现

释放锁原语实现很简单,参照releaseLock()方法。代码如下:

public void releaseLock() throws KeeperException, InterruptedException {
    zkClient.delete(lockPath, -1);
    zkClient.close();
    System.out.println(" 锁释放:" + lockPath);
}

关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。

我们创建一个TicketSeller类,作为客户端来使用分布式锁。

TicketSeller类

sell()

不带锁的业务逻辑方法,代码如下:

private void sell(){
    System.out.println("售票开始");
    // 线程随机休眠数毫秒,模拟现实中的费时操作
    int sleepMillis = (int) (Math.random() * 2000);
    try {
        //代表复杂逻辑执行了一段时间
        Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("售票结束");
}

仅是为了演示,sleep了一段时间。

sellTicketWithLock()

此方法中,加锁后执行业务逻辑,代码如下:

public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
    LockSample lock = new LockSample();
    lock.acquireLock();
    sell();
    lock.releaseLock();
}

测试入口

接下来我们写一个main函数做测试:

public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    TicketSeller ticketSeller = new TicketSeller();
    for(int i=0;i<1000;i++){
        ticketSeller.sellTicketWithLock();
    }
}

main函数中我们循环调用ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。

测试方法

1、先启动一个java程序运行,可以看到日志输出如下:

main 锁创建: /Locks/Lock_0000000391
main 锁获得, lockPath: /Locks/Lock_0000000391
售票开始
售票结束
 锁释放:/Locks/Lock_0000000391
main 锁创建: /Locks/Lock_0000000392
main 锁获得, lockPath: /Locks/Lock_0000000392
售票开始
售票结束
 锁释放:/Locks/Lock_0000000392
main 锁创建: /Locks/Lock_0000000393
main 锁获得, lockPath: /Locks/Lock_0000000393
售票开始
售票结束
 锁释放:/Locks/Lock_0000000393

可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。

2、我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:

程序1:

main 锁获得, lockPath: /Locks/Lock_0000000471
售票开始
售票结束
 锁释放:/Locks/Lock_0000000471
main 锁创建: /Locks/Lock_0000000473
 等待前锁释放,prelocakPath:Lock_0000000472
/Locks/Lock_0000000472 前锁释放
main 锁获得, lockPath: /Locks/Lock_0000000473
售票开始
售票结束
 锁释放:/Locks/Lock_0000000473

可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。

我们再看程序2的日志:

main 锁获得, lockPath: /Locks/Lock_0000000472
售票开始
售票结束
 锁释放:/Locks/Lock_0000000472
main 锁创建: /Locks/Lock_0000000474
 等待前锁释放,prelocakPath:Lock_0000000473
/Locks/Lock_0000000473 前锁释放
main 锁获得, lockPath: /Locks/Lock_0000000474
售票开始
售票结束
 锁释放:/Locks/Lock_0000000474

可以看到,确实是进程2获取了Lock_0000000472。

zookeeper实现分布式锁就先讲到这。注意代码只做演示用,并不适合生产环境使用。

代码清单如下:

1、LockSample

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class LockSample {

    //ZooKeeper配置信息
    private ZooKeeper zkClient;
    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;

    // 监控lockPath的前一个节点的watcher
    private Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getPath() + " 前锁释放");
            synchronized (this) {
                notifyAll();
            }

        }
    };

    public LockSample() throws IOException {
        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()== Event.KeeperState.Disconnected){
                    System.out.println("失去连接");

                }
            }
        });
    }

    //获取锁的原语实现.
    public  void acquireLock() throws InterruptedException, KeeperException {
        //创建锁节点
        createLock();
        //尝试获取锁
        attemptLock();
    }

    //创建锁的原语实现。在lock节点下创建该线程的锁节点
    private void createLock() throws KeeperException, InterruptedException {
        //如果根节点不存在,则创建根节点
        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 创建EPHEMERAL_SEQUENTIAL类型节点
        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
        this.lockPath=lockPath;
    }

    private void attemptLock() throws KeeperException, InterruptedException {
        // 获取Lock所有子节点,按照节点序号排序
        List<String> lockPaths = null;

        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);

        Collections.sort(lockPaths);

        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));

        // 如果lockPath是序号最小的节点,则获取锁
        if (index == 0) {
            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
            return ;
        } else {
            // lockPath不是序号最小的节点,监控前一个节点
            String preLockPath = lockPaths.get(index - 1);

            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

            // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
            if (stat == null) {
                attemptLock();
            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
                System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }
    }

    //释放锁的原语实现
    public void releaseLock() throws KeeperException, InterruptedException {
        zkClient.delete(lockPath, -1);
        zkClient.close();
        System.out.println(" 锁释放:" + lockPath);
    }

}

2、TicketSeller

import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class TicketSeller {
    private void sell(){
        System.out.println("售票开始");
        // 线程随机休眠数毫秒,模拟现实中的费时操作
        int sleepMillis = (int) (Math.random() * 2000);
        try {
            //代表复杂逻辑执行了一段时间
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("售票结束");
    }

    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
        LockSample lock = new LockSample();
        lock.acquireLock();
        sell();
        lock.releaseLock();
    }

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        TicketSeller ticketSeller = new TicketSeller();
        for(int i=0;i<1000;i++){
            ticketSeller.sellTicketWithLock();

        }
    }
}

以上就是ZooKeeper入门教程三分布式锁实现及完整运行源码的详细内容,更多关于ZooKeeper分布式锁实现源码的资料请关注我们其它相关文章!

(0)

相关推荐

  • 微服务架构之服务注册与发现功能详解

    目录 微服务的注册与发现 1.服务注册 2.服务发现 3.注册中心 4.现下的主流注册中心 4.1 Eureka 4.1.1 介绍 4.1.2 整体架构 4.1.3 接入Spring Cloud 4.2 ZooKeeper 4.2.1 介绍 4.2.2 整体架构 4.2.3 接入Dubbo生态 4.3 Consul 4.3.1 介绍 4.3.2 整体架构 4.3.3 生态对接 4.4 总结对比 详解微服务架构及其演进史 微服务全景架构全面瓦解 微服务架构拆分策略详解 微服务的注册与发现 我们前面

  • ZooKeeper框架教程Curator分布式锁实现及源码分析

    目录 如何使用InterProcessMutex 实现思路 代码实现概述 InterProcessMutex源码分析 实现接口 属性 构造方法 方法 获得锁 释放锁 LockInternals源码分析 获取锁 释放锁 总结 ZooKeeper入门教程一简介与核心概念 ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用 ZooKeeper入门教程三分布式锁实现及完整运行源码 上一篇文章中,我们使用zookeeper的java api实现了分布式排他锁. Curator中有着更为标准.规

  • zookeeper服务优化的一些建议

    1.快照文件和事务日志文件分别挂在不同磁盘.zoo.cfg文件中,dataDir是存放快照数据的,dataLogDir是存放事务日志的.zookeeper更新操作过程:先写事务日志,再写内存,周期性落到磁盘(刷新内存到快照文件).事务日志的对写请求的性能影响很大,保证dataLogDir所在磁盘性能良好.没有竞争者. 2. 默认jvm没有配置Xmx.Xms等信息,可以在conf目录下创建java.env文件(内存堆空间一定要小于机器内存,避免使用swap) export JVMFLAGS="-X

  • ZooKeeper开发实际应用案例实战

    目录 项目背景介绍 面临问题 如何解决 代码讲解 数据服务器 检索服务器 总结 附:完整代码 数据服务端代码 检索服务端代码 项目背景介绍 首先给大家介绍一下本文描述项目的情况.这是一个检索网站,它让你能在几千万份复杂文档数据中检索出你所需要的文档数据.为了加快检索速度,项目的数据分布在100台机器的内存里,我们称之为数据服务器.除了数据,这100台机器上均部署着检索程序.这些server之外,还有数台给前端提供接口的搜索server,这些机器属一个集群,我们称之为检索服务器.当搜索请求过来时,

  • ZooKeeper入门教程一简介与核心概念

    目录 1.ZooKeeper介绍与核心概念 1.1 简介 1.2分布式系统面临的问题 1.通过网络进行信息共享 2.通过共享存储 1.3 ZooKeeper如何解决分布式系统面临的问题 1.4 zookeeper概念介绍 1.4.1 znode 1.4.2 观察与通知 1.4.3 版本 1.4.4 法定人数 1.4.5 会话 1.4.6 会话状态和生命周期 回顾总结 本章是后续学习的基石,只有充分理解了分布式系统的概念和面临的问题,以及ZooKeeper内部的概念,才能懂得ZooKeeper是如

  • zookeeper概述图文详解

    1.1 概述 分布式系统:分布式系统指由很多台计算机组成的一个整体!这个整体一致对外,并且处理同一请求!系统对内透明,对外不透明!内部的每台计算机,都可以相互通信,例如使用RPC/REST或者是WebService!客户端向一个分布式系统发送的一次请求到接受到响应,有可能会经历多台计算机! Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目,多用作为集群提供服务的中间件! Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责

  • ZooKeeper的安装及部署教程

    一.Zookeeper介绍 •是一个针对大型分布式系统的可靠协调系统: •提供的功能包括:配置维护.名字服务.分布式同步.组服务等: •目标就是封装好复杂易出错的关键职务,将简单易用的接口和性能高效.功能稳定的系统提供给用户: •Zookeeper已经成为Hadoop生态系统中的基础组件. 二.Zookeeper特点 •最终一致性:为客户端展示同一视图,这是Zookeeper最重要的性能: •可靠性:如果消息被一台服务器接受,那么它将被所有的服务器接受: •原子性:更新只能成功或失败,没有中间状

  • ZooKeeper入门教程三分布式锁实现及完整运行源码

    目录 1.0版本 2.0版本 LockSample类 构造方法 获取锁实现 createLock() attemptLock() 释放锁实现 TicketSeller类 sell() sellTicketWithLock() 测试入口 测试方法 代码清单如下: 1.LockSample 2.TicketSeller ZooKeeper入门教程一简介与核心概念 ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用 1.0版本 首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

  • 零基础易语言入门教程(三)之了解控制台程序

    易语言简介: 易语言是一门以中文作为程序代码编程语言.以"易"著称.创始人为吴涛.早期版本的名字为E语言.易语言最早的版本的发布可追溯至2000年9月11日.创造易语言的初衷是进行用中文来编写程序的实践.从2000年至今,易语言已经发展到一定的规模,功能上.用户数量上都十分可观. 易语言和其它编程语言一样都有后台程序,它也不一定必须是窗口程序的了,下面小编带大家了解易语言的控制台程序. 方法和步骤如下所示: 1.延时命令: 首先学习一个第一个命令,该命令可将其脚本界面延时.1000毫秒

  • Bootstrap零基础入门教程(三)

    什么是 Bootstrap? Bootstrap 是一个用于快速开发 Web 应用程序和网站的前端框架.Bootstrap 是基于 HTML.CSS.JAVASCRIPT 的. 历史 Bootstrap 是由 Twitter 的 Mark Otto 和 Jacob Thornton 开发的.Bootstrap 是 2011 年八月在 GitHub 上发布的开源产品. 写到这里,这篇从零开始学Bootstrap(3)我想写以下几个内容: 1. 基于我对Bootstrap的理解,做一个小小的总结.

  • BootStrap入门教程(三)之响应式原理

    相关阅读: BootStrap入门教程(一)之可视化布局 BootStrap入门教程(二)之固定的内置样式 Bootstrap网格系统(Grid System) 响应式网格系统随着屏幕或视口(viewport)尺寸的增加,系统会自动分为最多12列. 工作原理 · 行必须放置在 .container class 内,以便获得适当的对齐(alignment)和内边距(padding). · 使用行来创建列的水平组. · 内容应该放置在列内,且唯有列可以是行的直接子元素. · 预定义的网格类,比如 .

  • Netty分布式客户端处理接入事件handle源码解析

    目录 处理接入事件创建handle 我们看其RecvByteBufAllocator接口 跟进newHandle()方法中 继续回到read()方法 我们跟进reset中 前文传送门 :客户端接入流程初始化源码分析 上一小节我们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端连接事件的处理 处理接入事件创建handle 回到上一章NioEventLoop的processSelectedKey ()方法 private void processS

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • 网页游戏开发入门教程三(简单程序应用)

    网页游戏开发入门教程二(游戏模式+系统)http://www.jb51.net/article/20724.htm 一.选择开发语言 后台:java .net php 前台:flex javascript ajax 数据库:mysql mssql 用哪种组合,真的不重要.重要的是时间和成本.复杂的地方在数据的交互和完善,而不在技术或效果的实现.往往遇到一些问题.比如地图如何编?人物移动如何实现?其实这些问题从技术上实现都比较容易.难在实现后,数据如何交互.没有解决数据交互的问题,实现这些技术点的

随机推荐