InterProcessMutex实现zookeeper分布式锁原理

原理简介:

zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁

zookeeper节点图分析

InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的。

InterProcessMutex实现的InterProcessLock接口,InterProcessLock主要规范了如下几个方法:

// 获取互斥锁
public void acquire() throws Exception;
// 在给定的时间内获取互斥锁
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 释放锁处理
public void release() throws Exception;
// 如果此JVM中的线程获取了互斥锁,则返回true
boolean isAcquiredInThisProcess();

接下来我们看看InterProcessMutex中的实现,它究竟有哪些属性,以及实现细节

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    // LockInternals是真正实现操作zookeeper的类,它内部包含连接zookeeper客户端的CuratorFramework
    // LockInternals的具体实现后面我会讲到
    private final LockInternals internals;
  	// basePath是锁的根结点,所有的临时有序的节点都是basePath的子节点,
    private final String basePath;
    //
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
	// LockData封装了请求对应的线程(owningThread)、锁的重入的次数(lockCount)、线程对应的临时节点(lockPath)
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
      	// 原子性的
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    private static final String LOCK_NAME = "lock-";
    // 获取互斥锁,阻塞【InterProcessLock的实现】
    @Override
    public void acquire() throws Exception {
      	// 获取锁,一直等待
        if ( !internalLock(-1, null) ) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    // 获取互斥锁,指定时间time【InterProcessLock的实现】
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }
    // 当前线程是否占用锁中【InterProcessLock的实现】
    @Override
    public boolean isAcquiredInThisProcess() {
        return (threadData.size() > 0);
    }
    //如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如果线程已多次调用acquire,当此方法返回时,互斥锁仍将保留 【InterProcessLock的实现】
    @Override
    public void release() throws Exception {
        Thread currentThread = Thread.currentThread(); //当前线程
        LockData lockData = threadData.get(currentThread); //线程对应的锁信息
        if ( lockData == null ) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
      	// 因为获取到的锁是可重入的,对lockCount进行减1,lockCount=0时才是真正释放锁
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 ) {
            return;
        }
        if ( newLockCount < 0 ) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
          	// 到这里时lockCount=0,具体释放锁的操作交给LockInternals中的releaseLock方法实现
            internals.releaseLock(lockData.lockPath);
        }
        finally {
            threadData.remove(currentThread);
        }
    }
  	// 获取basePath根结点下的所有临时节点的有序集合
  	public Collection<String> getParticipantNodes() throws Exception {
        return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
    }

  	boolean isOwnedByCurrentThread() {
        LockData lockData = threadData.get(Thread.currentThread());
        return (lockData != null) && (lockData.lockCount.get() > 0);
    }
  	protected String getLockPath() {
        LockData lockData = threadData.get(Thread.currentThread());
        return lockData != null ? lockData.lockPath : null;
    }
  	// acquire()中调用的internalLock()方法
  	private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null ) {
            // 如果当前线程已经获取到了锁,那么将重入次数lockCount+1,返回true
            lockData.lockCount.incrementAndGet();
            return true;
        }
      	// attemptLock方法是获取锁的真正实现,lockPath是当前线程成功在basePath下创建的节点,若lockPath不为空代表成功获取到锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null ) {
          	// lockPath封装到当前线程对应的锁信息中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
}

接下来我们看看InterProcessMutex中使用的LockInternals类的实现细节

public class LockInternals {
    private final CuratorFramework                  client; // 连接zookeeper的客户端
    private final String                            path;	// 等于basePath,InterProcessMutex中传进来的
    private final String                            basePath; // 根结点
    private final LockInternalsDriver               driver; // 操作zookeeper节点的driver
    private final String                            lockName; // "lock-"
    private final AtomicReference<RevocationSpec>   revocable = new AtomicReference<RevocationSpec>(null);

    private final CuratorWatcher                    revocableWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent event) throws Exception {
            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
                checkRevocableWatcher(event.getPath());
            }
        }
    };
    // 监听节点的监听器,若被监听的节点有动静,则唤醒 notifyFromWatcher()=>notifyAll();
   	private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            notifyFromWatcher();
        }
    };
   	private volatile int    maxLeases;
  	// 获取basePath的子节点,排序后的
  	public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
    {
        List<String> children = client.getChildren().forPath(basePath);
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort
        (
            sortedList,
            new Comparator<String>()
            {
                @Override
                public int compare(String lhs, String rhs)
                {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
  	// 尝试获取锁【internalLock=>attemptLock】
  	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {	// 开始时间
        final long      startMillis = System.currentTimeMillis();
      	// 记录等待时间
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
      	// 重试次数
        int             retryCount = 0;
      	// 当前节点
        String          ourPath = null;
      	// 是否获取到锁的标志
        boolean         hasTheLock = false;
      	// 是否放弃获取到标志
        boolean         isDone = false;
      	// 不停尝试获取
        while ( !isDone )
        {
            isDone = true;

            try
            {	// 创建当前线程对应的节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
              	// internalLockLoop中获取
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {	// 是否可再次尝试
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
		// 获取到锁后,返回当前线程对应创建的节点路径
        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
  	// 循环获取【attemptLock=>internalLockLoop】
  	private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false; // 是否拥有分布式锁
        boolean     doDelete = false;	// 是否需要删除当前节点
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			// 循环尝试获取锁
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {	// 得到basePath下排序后的临时子节点
                List<String>        children = getSortedChildren();
              	// 获取之前创建的当前线程对应的子节点
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
				// 判断是否获取到锁,没有就返回监听路径
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
              	// 成功获取到
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {	// 没有获取到锁,监听前一个临时顺序节点
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
					// 上一个临时顺序节点如果被删除,会唤醒当前线程继续竞争锁
                          client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                              	// 获取锁超时
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
              	// 因为获取锁超时,所以删除之前创建的临时子节点
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

  	private void deleteOurPath(String ourPath) throws Exception {
        try
        {
          	// 删除
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

 }

StandardLockInternalsDriver implements LockInternalsDriver

	// 前面internalLockLoop方法中driver.getsTheLock执行的方法
	@Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
    	// 获取子节点在临时顺序节点列表中的位置
        int             ourIndex = children.indexOf(sequenceNodeName);
        // 检验子节点在临时顺序节点列表中是否有效
        validateOurIndex(sequenceNodeName, ourIndex);
        // 若当前子节点的位置<maxLeases,代表可获取锁【maxLeases默认=1,若ourIndex=0,代笔自己位置最小】
        boolean         getsTheLock = ourIndex < maxLeases;
        // getsTheLock=true,则不需要监听前maxLeases的节点【maxLeases默认=1,代表监听前面最靠近自己的节点】
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

用InterProcessMutex在自己业务实现分布式锁,请点击此链接阅读点我

到此这篇关于InterProcessMutex实现zookeeper分布式锁原理的文章就介绍到这了,更多相关InterProcessMutex实现zookeeper分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java ZooKeeper分布式锁实现图解

    什么是分布式锁 1.在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题. 2.但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境(多台机器),跨JVM之间已经无法通过多线程的锁解决同步问题.那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题--这就是分布式锁.(多节点从分布式组件中获取锁) 例如以下实例: 各种抢票软件客户端通过zoo

  • 分布式锁为什么要选择Zookeeper而不是Redis?看完这篇你就明白了

    在分布式的应用中,为了防止单点故障,保障高可用,通常会采用主从结构,当主节点挂掉后,从节点可以代替主节点提供服务. Redis通过复制 + sentinel哨兵来实现主从模式. Zookeeper通过replicated mode复制模式来实现主从模式. 单从结构上看,Redis和Zookeeper都是主从架构,那Zookeeper的优势是什么?为什么要选择Zookeeper?难道只是因为Zookeeper是目录结构,Redis是K-V结构吗? 同步机制的不同 Redis Redis在给从节点同

  • zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 二.架构介绍 在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图 解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1.node_2.node_3是locker这个持久节点下面的临时顺序节点.client_1.client_2.client_n表示多个客户端,Service表示需要互斥访问的共享

  • ZooKeeper 实现分布式锁的方法示例

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅.负载均衡.分布式协调/通知.集群管理.Master 选举.分布式锁等功能. 节点 在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock).每个 Znode 上都会保存自己的数

  • 基于Zookeeper实现分布式锁详解

    目录 1.什么是Zookeeper? 2.Zookeeper节点类型 3.Zookeeper环境搭建 4.Zookeeper基本使用 5.Zookeeper应用场景 6.Zookeeper分布式锁 7.公平式Zookeeper分布式锁 8.zookeeper和Redis锁对比? 1.什么是Zookeeper? Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件. 引用官网的图例: 特征: zookeeper的数据机构是一种节点树的数据结构,zNo

  • 分析ZooKeeper分布式锁的实现

    目录 一.分布式锁方案比较 二.ZooKeeper实现分布式锁 2.1.方案一 2.2.方案二 一.分布式锁方案比较 方案 实现思路 优点 缺点 利用 MySQL 的实现方案 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 实现简单 性能差,无法适应高并发场景:容易出现死锁的情况:无法优雅的实现阻塞式锁 利用 Redis 的实现方案 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 性能好 实现相对复杂,有可能出现死锁:无法优雅的实现阻塞式锁 利用 ZooKeeper

  • ZooKeeper框架教程Curator分布式锁实现及源码分析

    目录 如何使用InterProcessMutex 实现思路 代码实现概述 InterProcessMutex源码分析 实现接口 属性 构造方法 方法 获得锁 释放锁 LockInternals源码分析 获取锁 释放锁 总结 ZooKeeper入门教程一简介与核心概念 ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用 ZooKeeper入门教程三分布式锁实现及完整运行源码 上一篇文章中,我们使用zookeeper的java api实现了分布式排他锁. Curator中有着更为标准.规

  • InterProcessMutex实现zookeeper分布式锁原理

    原理简介: zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁. zookeeper节点图分析 InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的. InterProcessMutex实现的InterProcessLo

  • C# 实现Zookeeper分布式锁的参考示例

    目录 分布式锁 Zookeeper分布式锁原理 C#实现Zookeeper分布式锁 分布式锁 互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况. 为解决服务器压力太大,响应慢的特点,分布式系统部署出现了. 简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx) 虽然

  • 详细解读分布式锁原理及三种实现方式

    目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题.分布式的CAP理论告诉我们"任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项."所以,很多系统在设计之初就要对这三者做出取舍.在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证"最终一致性",只要这个最终

  • 深入浅出探索Java分布式锁原理

    目录 什么是分布式锁?它能干什么? 分布式锁实现方案 基于数据库的分布式锁实现方案 实现原理 方案分析 基于Redis的分布式锁实现方案 基于sentnx命令的实现原理 方案分析 基于Redisson实现 RedLock 方案分析 基于Zookeeper的分布式锁实现方案 实现原理 方案分析 分布式锁方案到底选哪个? 总结 什么是分布式锁?它能干什么? 相信大家对于Java提供的synchronized关键字以及Lock锁都不陌生,在实际的项目中大家都使用过.如下图所示,在同一个JVM进程中,T

  • Spring Boot 实现Redis分布式锁原理

    目录 分布式锁实现 引入jar包 封装工具类 模拟秒杀扣减库存 测试代码 方案优化 问题1:扣减库存逻辑无法保证原子性, 问题2:如果加锁失败,则会直接访问,无法重入锁 总结 分布式锁实现 引入jar包 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclus

  • redisson实现分布式锁原理

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

  • Java redisson实现分布式锁原理详解

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

随机推荐