C# 实现Zookeeper分布式锁的参考示例

目录
  •   分布式锁 
  •   Zookeeper分布式锁原理
  •   C#实现Zookeeper分布式锁

  分布式锁 

  互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况。

  为解决服务器压力太大,响应慢的特点,分布式系统部署出现了。

  简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx)

  虽然分布式解决了服务器压力的问题,但也带来了新的问题。

  比如,我们有一个下单统计的功能,当完成下单后,需要执行统计功能,而在高访问的情况下,可能有两个下单请求(A和B)同时完成,然后一起执行了统计功能,这样可能导致的结果就是A请求未将B请求数据统计在内,而B请求可能也未将A请求数据统计在内,这样就造成了数据的统计错,这个问题的产生的根本原因就是统计功能的并发导致的,如果是单点部署的系统,我们简单的使用一个锁操作就能完成了,但是在分布式环境下,A和B请求可能同时运行在两个服务器中,普通的锁就不能起到效果了,这个时候就要使用分布式锁了。

  Zookeeper分布式锁原理

  分布式锁的实现发放有多种,简单的,我们可以使用数据库表去实现它,也可以使用redis去实现它,这里要使用的Zookeeper去实现分布式锁

  Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制,监听机制很简单,就是我们可以给znode添加一个监听器,当znode节点状态发生改变时(如:数据内容改变,节点被删除),会通知到监听器。

  前面几节介绍过znode有三种类型  

  PERSISTENT:持久节点,即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  EPHEMERAL:临时节点,客户端是连接状态时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。临时节点不允许有子节点。临时节点在leader选举中起着重要作用。
  SEQUENTIAL:顺序节点,可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径,顺序节点在锁定和同步中起重要作用。

  其中,顺序节点,可以是持久的或临时的,而临时节点有个特点,就是它属于创建它的那个会话,当会话断开,临时节点就会自动删除,如果在临时节点上注册了监听器,那么监听器就会收到通知,如果临时节点有了时间顺序,那我们为实现分布式锁就又有一个想法:

  假如在Zookeeper中有一个znode节点/Locker

  1、当client1连接Zookeeper时,先判断/Locker节点是否存在子节点,如果没有子节点,那么会在/Locker节点下创建一个临时顺序的znode节点,假如是/client1,表示client1获取了锁状态,client1可以继续执行。

  2、当client2连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client1节点上注册一个监听器(watcher1),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client2。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher1)中的。

  3、当client3连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client2节点上注册一个监听器(watcher2),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client3。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher2)中的。

  以此类推。

  4、当client1执行完操作了,断开Zookeeper的连接,因为/client1是临时顺序节点,于是将会自动删除,而client2已经往/client1节点中注册了一个监听器(watcher1),于是watcher1将会受到通知,而watcher1又会释放client2的阻塞状态。于是client2获取锁状态,继续执行。

  5、当client2执行完操作了,断开Zookeeper的连接,因为/client2是临时顺序节点,于是将会自动删除,而client3已经往/client2节点中注册了一个监听器(watcher2),于是watcher2将会受到通知,而watcher2又会释放client3的阻塞状态。于是client3获取锁状态,继续执行。

  以此类推。

  这样,不管分布式环境中有几台服务器,都可以保证程序的排队似的执行了。

  C#实现Zookeeper分布式锁

  上一节有封装过一个ZookeeperHelper的辅助类(Zookeeper基础教程(四):C#连接使用Zookeeper),使用这个辅助类实现了一个ZookeeperLocker类: 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AspNetCore.ZookeeperConsole
{
    /// <summary>
    /// 基于Zookeeper的分布式锁
    /// </summary>
    public class ZookeeperLocker : IDisposable
    {
        /// <summary>
        /// 单点锁
        /// </summary>
        static object locker = new object();
        /// <summary>
        /// Zookeeper集群地址
        /// </summary>
        string[] address;
        /// <summary>
        /// Zookeeper操作辅助类
        /// </summary>
        ZookeeperHelper zookeeperHelper;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="lockerPath">分布式锁的根路径</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address)
        {
        }
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="lockerPath">分布式锁的根路径</param>
        /// <param name="sessionTimeout">回话过期时间</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address)
        {
            this.address = address.ToArray();

            zookeeperHelper = new ZookeeperHelper(address, lockerPath);
            if (sessionTimeout > 0)
            {
                zookeeperHelper.SessionTimeout = sessionTimeout;
            }
            if (!zookeeperHelper.Connect())
            {
                throw new Exception("connect failed:" + string.Join(",", address));
            }
            lock (locker)
            {
                if (!zookeeperHelper.Exists())//根节点不存在则创建
                {
                    zookeeperHelper.SetData("", "", true);
                }
            }
        }
        /// <summary>
        /// 生成一个锁
        /// </summary>
        /// <returns>返回锁名</returns>
        public string CreateLock()
        {
            var path = Guid.NewGuid().ToString().Replace("-", "");
            while (zookeeperHelper.Exists(path))
            {
                path = Guid.NewGuid().ToString().Replace("-", "");
            }
            return CreateLock(path);
        }
        /// <summary>
        /// 使用指定的路径名称设置锁
        /// </summary>
        /// <param name="path">锁名,不能包含路径分隔符(/)</param>
        /// <returns>返回锁名</returns>
        public string CreateLock(string path)
        {
            if (path.Contains("/"))
            {
                throw new ArgumentException("invalid path");
            }
            return zookeeperHelper.SetData(path, "", false, true);
        }
        /// <summary>
        /// 获取锁
        /// </summary>
        /// <param name="path">锁名</param>
        /// <returns>如果获得锁返回true,否则一直等待</returns>
        public bool Lock(string path)
        {
            return LockAsync(path).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 获取锁
        /// </summary>
        /// <param name="path">锁名</param>
        /// <param name="millisecondsTimeout">超时时间,单位:毫秒</param>
        /// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns>
        public bool Lock(string path, int millisecondsTimeout)
        {
            return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 异步获取锁等等
        /// </summary>
        /// <param name="path">锁名</param>
        /// <returns>如果获得锁返回true,否则一直等待</returns>
        public async Task<bool> LockAsync(string path)
        {
            return await LockAsync(path, System.Threading.Timeout.Infinite);
        }
        /// <summary>
        /// 异步获取锁等等
        /// </summary>
        /// <param name="path">锁名</param>
        /// <param name="millisecondsTimeout">超时时间,单位:毫秒</param>
        /// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns>
        public async Task<bool> LockAsync(string path, int millisecondsTimeout)
        {
            var array = await zookeeperHelper.GetChildrenAsync("", true);
            if (array != null && array.Length > 0)
            {
                var first = array.FirstOrDefault();
                if (first == path)//正好是优先级最高的,则获得锁
                {
                    return true;
                }

                var index = array.ToList().IndexOf(path);
                if (index > 0)
                {
                    //否则添加监听
                    var are = new AutoResetEvent(false);
                    var watcher = new NodeWatcher();
                    watcher.NodeDeleted += (ze) =>
                    {
                        are.Set();
                    };
                    if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//监听顺序节点中的前一个节点
                    {
                        if (!are.WaitOne(millisecondsTimeout))
                        {
                            return false;
                        }
                    }

                    are.Dispose();
                }
                else
                {
                    throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}");
                }
            }
            return true;
        }
        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            zookeeperHelper.Dispose();
        }
    }
}

  现在写个程序可以模拟一下

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace AspNetCore.ZookeeperConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            //Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开
            string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" };
            //会话超时时间,单位毫秒
            int sessionTimeOut = 10000;
            //锁节点根路径
            string lockerPath = "/Locker";

            for (var i = 0; i < 10; i++)
            {
                string client = "client" + i;
                //多线程模拟并发
                new Thread(() =>
                {
                    using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address))
                    {
                        string path = zookeeperLocker.CreateLock();
                        if (zookeeperLocker.Lock(path))
                        {
                            //模拟处理过程
                            Console.WriteLine($"【{client}】获得锁:{DateTime.Now}");
                            Thread.Sleep(3000);
                            Console.WriteLine($"【{client}】处理完成:{DateTime.Now}");
                        }
                        else
                        {
                            Console.WriteLine($"【{client}】获得锁失败:{DateTime.Now}");
                        }
                    }
                }).Start();
            }

            Console.ReadKey();
        }
    }
}

  运行结果如下:

   

  可以发现,锁功能是实现了的

  如果程序运行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2

  或者直接抛出异常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”

  只需要适当调整sessionTimeOut时间即可

以上就是C# 实现Zookeeper分布式锁的参考示例的详细内容,更多关于C# 实现Zookeeper分布式锁的资料请关注我们其它相关文章!

(0)

相关推荐

  • ZooKeeper 实现分布式锁的方法示例

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅.负载均衡.分布式协调/通知.集群管理.Master 选举.分布式锁等功能. 节点 在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock).每个 Znode 上都会保存自己的数

  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

  • 如何操作Redis和zookeeper实现分布式锁

    如何操作Redis和zookeeper实现分布式锁 在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 1.分布式锁 分布式锁一般用在分布

  • zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 二.架构介绍 在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图 解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1.node_2.node_3是locker这个持久节点下面的临时顺序节点.client_1.client_2.client_n表示多个客户端,Service表示需要互斥访问的共享

  • 分布式锁为什么要选择Zookeeper而不是Redis?看完这篇你就明白了

    在分布式的应用中,为了防止单点故障,保障高可用,通常会采用主从结构,当主节点挂掉后,从节点可以代替主节点提供服务. Redis通过复制 + sentinel哨兵来实现主从模式. Zookeeper通过replicated mode复制模式来实现主从模式. 单从结构上看,Redis和Zookeeper都是主从架构,那Zookeeper的优势是什么?为什么要选择Zookeeper?难道只是因为Zookeeper是目录结构,Redis是K-V结构吗? 同步机制的不同 Redis Redis在给从节点同

  • 分析ZooKeeper分布式锁的实现

    目录 一.分布式锁方案比较 二.ZooKeeper实现分布式锁 2.1.方案一 2.2.方案二 一.分布式锁方案比较 方案 实现思路 优点 缺点 利用 MySQL 的实现方案 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 实现简单 性能差,无法适应高并发场景:容易出现死锁的情况:无法优雅的实现阻塞式锁 利用 Redis 的实现方案 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 性能好 实现相对复杂,有可能出现死锁:无法优雅的实现阻塞式锁 利用 ZooKeeper

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • 浅谈分布式锁的几种使用方式(redis、zookeeper、数据库)

    Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 synchronized lock dblock Q:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 分布式锁 我们需要怎么样的分布式锁? 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用

  • C# 实现Zookeeper分布式锁的参考示例

    目录 分布式锁 Zookeeper分布式锁原理 C#实现Zookeeper分布式锁 分布式锁 互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况. 为解决服务器压力太大,响应慢的特点,分布式系统部署出现了. 简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx) 虽然

  • Java ZooKeeper分布式锁实现图解

    什么是分布式锁 1.在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题. 2.但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境(多台机器),跨JVM之间已经无法通过多线程的锁解决同步问题.那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题--这就是分布式锁.(多节点从分布式组件中获取锁) 例如以下实例: 各种抢票软件客户端通过zoo

  • InterProcessMutex实现zookeeper分布式锁原理

    原理简介: zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁. zookeeper节点图分析 InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的. InterProcessMutex实现的InterProcessLo

  • Redis实现分布式锁的方法示例

    之前我们使用的定时任务都是只部署在了单台机器上,为了解决单点的问题,为了保证一个任务,只被一台机器执行,就需要考虑锁的问题,于是就花时间研究了这个问题.到底怎样实现一个分布式锁呢? 锁的本质就是互斥,保证任何时候能有一个客户端持有同一个锁,如果考虑使用redis来实现一个分布式锁,最简单的方案就是在实例里面创建一个键值,释放锁的时候,将键值删除.但是一个可靠完善的分布式锁需要考虑的细节比较多,我们就来看看如何写一个正确的分布式锁. 单机版分布式锁 SETNX 所以我们直接基于 redis 的 s

  • SpringBoot集成Redisson实现分布式锁的方法示例

    上篇 <SpringBoot 集成 redis 分布式锁优化>对死锁的问题进行了优化,今天介绍的是 redis 官方推荐使用的 Redisson ,Redisson 架设在 redis 基础上的 Java 驻内存数据网格(In-Memory Data Grid),基于NIO的 Netty 框架上,利用了 redis 键值数据库.功能非常强大,解决了很多分布式架构中的问题. Github的wiki地址: https://github.com/redisson/redisson/wiki 官方文档

  • SpringBoot整合分布式锁redisson的示例代码

    目录 1.导入maven坐标 2.redisson配置类(如果redis没有密码就不需要private String password) 3.创建redisson的bean 4.测试,入队 5.测试,出队 6.分布式锁 1.导入maven坐标 <!-- 用redisson作为所有分布式锁,分布式对象等功能框架--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson&

  • springboot+zookeeper实现分布式锁的示例代码

    目录 依赖 本地封装 配置 测试代码 JMeter测试 InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理的功能 zookeeper分布式锁的特点:1.分布式 2.公平锁 3.可重入 依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId>

  • 基于Zookeeper实现分布式锁详解

    目录 1.什么是Zookeeper? 2.Zookeeper节点类型 3.Zookeeper环境搭建 4.Zookeeper基本使用 5.Zookeeper应用场景 6.Zookeeper分布式锁 7.公平式Zookeeper分布式锁 8.zookeeper和Redis锁对比? 1.什么是Zookeeper? Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件. 引用官网的图例: 特征: zookeeper的数据机构是一种节点树的数据结构,zNo

随机推荐