ZooKeeper 实现分布式锁的方法示例

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。

节点

在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。

Znode 又分为以下四种类型:

类型 描述
持久节点 节点创建后,会一直存在,不会因客户端会话失效而删除
持久顺序节点 基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名
临时节点 客户端会话失效或连接关闭后,该节点会被自动删除
临时顺序节点 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名

锁原理

ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。

而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。

如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。

代码测试

请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分

以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。

创建 .NET Core 控制台程序 Nuget

安装 ZooKeeperNetEx.Recipes

创建 ZooKeeper Client

private const int CONNECTION_TIMEOUT = 50000;
private const string CONNECTION_STRING = "127.0.0.1:2181";
private ZooKeeper CreateClient()
{
	var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
	Stopwatch sw = new Stopwatch();
	sw.Start();
	while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
	{
		var state = zooKeeper.getState();
		if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING)
		{
			break;
		}
	}
	sw.Stop();
	return zooKeeper;
}

class NullWatcher : Watcher
  {
    public static readonly NullWatcher Instance = new NullWatcher();
    private NullWatcher() { }
    public override Task process(WatchedEvent @event)
    {
      return Task.CompletedTask;
    }
  }

添加 Lock 方法

/// <summary>
/// 加锁
/// </summary>
/// <param name="key">加锁的节点名</param>
/// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param>
/// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param>
/// <returns></returns>
public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
{
	// 获取 ZooKeeper Client
	ZooKeeper keeper = CreateClient();
	// 指定锁节点
	WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);

	var lockCallback = new LockCallback(() =>
	{
		lockAcquiredAction.Invoke();
		writeLock.unlock();
	}, lockReleasedAction);
	// 绑定锁获取和释放的监听对象
	writeLock.setLockListener(lockCallback);
	// 获取锁(获取失败时会监听上一个临时节点)
	await writeLock.Lock();
}

class LockCallback : LockListener
{
	private readonly Action _lockAcquiredAction;
	private readonly Action _lockReleasedAction;

	public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
	{
		_lockAcquiredAction = lockAcquiredAction;
		_lockReleasedAction = lockReleasedAction;
	}

	/// <summary>
	/// 获取锁成功回调
	/// </summary>
	/// <returns></returns>
	public Task lockAcquired()
	{
		_lockAcquiredAction?.Invoke();
		return Task.FromResult(0);
	}

	/// <summary>
	/// 释放锁成功回调
	/// </summary>
	/// <returns></returns>
	public Task lockReleased()
	{
		_lockReleasedAction?.Invoke();
		return Task.FromResult(0);
	}
}

多线程模拟测试

static async Task RunAsync()
{
	Parallel.For(1, 10, async (i) =>
	{
		await new ZooKeeprDistributedLock().Lock("locks", () =>
		{
			Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
			Thread.Sleep(1000); // 业务逻辑...
		}, () =>
		{
			Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
			Console.WriteLine("-------------------------------");
		});
	});
	await Task.CompletedTask;
}

虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 浅谈分布式锁的几种使用方式(redis、zookeeper、数据库)

    Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 synchronized lock dblock Q:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 分布式锁 我们需要怎么样的分布式锁? 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用

  • 如何操作Redis和zookeeper实现分布式锁

    如何操作Redis和zookeeper实现分布式锁 在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 1.分布式锁 分布式锁一般用在分布

  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

  • zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 二.架构介绍 在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图 解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1.node_2.node_3是locker这个持久节点下面的临时顺序节点.client_1.client_2.client_n表示多个客户端,Service表示需要互斥访问的共享

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • ZooKeeper 实现分布式锁的方法示例

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅.负载均衡.分布式协调/通知.集群管理.Master 选举.分布式锁等功能. 节点 在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock).每个 Znode 上都会保存自己的数

  • Redis实现分布式锁的方法示例

    之前我们使用的定时任务都是只部署在了单台机器上,为了解决单点的问题,为了保证一个任务,只被一台机器执行,就需要考虑锁的问题,于是就花时间研究了这个问题.到底怎样实现一个分布式锁呢? 锁的本质就是互斥,保证任何时候能有一个客户端持有同一个锁,如果考虑使用redis来实现一个分布式锁,最简单的方案就是在实例里面创建一个键值,释放锁的时候,将键值删除.但是一个可靠完善的分布式锁需要考虑的细节比较多,我们就来看看如何写一个正确的分布式锁. 单机版分布式锁 SETNX 所以我们直接基于 redis 的 s

  • SpringBoot集成Redisson实现分布式锁的方法示例

    上篇 <SpringBoot 集成 redis 分布式锁优化>对死锁的问题进行了优化,今天介绍的是 redis 官方推荐使用的 Redisson ,Redisson 架设在 redis 基础上的 Java 驻内存数据网格(In-Memory Data Grid),基于NIO的 Netty 框架上,利用了 redis 键值数据库.功能非常强大,解决了很多分布式架构中的问题. Github的wiki地址: https://github.com/redisson/redisson/wiki 官方文档

  • C# 实现Zookeeper分布式锁的参考示例

    目录 分布式锁 Zookeeper分布式锁原理 C#实现Zookeeper分布式锁 分布式锁 互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况. 为解决服务器压力太大,响应慢的特点,分布式系统部署出现了. 简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx) 虽然

  • springboot+zookeeper实现分布式锁的示例代码

    目录 依赖 本地封装 配置 测试代码 JMeter测试 InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理的功能 zookeeper分布式锁的特点:1.分布式 2.公平锁 3.可重入 依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId>

  • mysql居然还能实现分布式锁的方法

    前言 之前的文章中通过电商场景中秒杀的例子和大家分享了单体架构中锁的使用方式,但是现在很多应用系统都是相当庞大的,很多应用系统都是微服务的架构体系,那么在这种跨jvm的场景下,我们又该如何去解决并发. 单体应用锁的局限性 在进入实战之前简单和大家粗略聊一下互联网系统中的架构演进. 在互联网系统发展之初,消耗资源比较小,用户量也比较小,我们只部署一个tomcat应用就可以满足需求.一个tomcat我们可以看做是一个jvm的进程,当大量的请求并发到达系统时,所有的请求都落在这唯一的一个tomcat上

  • 基于Redis实现分布式锁的方法(lua脚本版)

    1.前言 在Java中,我们通过锁来避免由于竞争而造成的数据不一致问题.通常我们使用synchronized .Lock来实现.但是Java中的锁只能保证在同一个JVM进程内中可用,在跨JVM进程,例如分布式系统上则不可靠了. 2.分布式锁 分布式锁,是一种思想,它的实现方式有很多,如基于数据库实现.基于缓存(Redis等)实现.基于Zookeeper实现等等.为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件 互斥性:在任意时刻,只有一个客户端能持有锁. 不会发生死锁:即使客户端

随机推荐