c# 理解csredis库实现分布式锁的详细流程

声明:

这里首先使用的是csredis,地址是https://github.com/2881099/csredis

该库本身已经足够完善,这里我画蛇添足一下,为了方便自己的使用。

本身csredis库已经实现了完整的加锁和去锁的逻辑,这里实现的与库本身所实现的有以下几点区别(csredis实现代码位置为:https://github.com/2881099/csredis/blob/bb6d947695770333027f3936f80052041db41b64/src/CSRedisCore/CSRedisClient.cs#L4344,有兴趣可以去了解看下)

1. 去掉了csredis的锁续租部分的功能,尽量简化

2. 将锁的token的设定交给外部,使用guid也罢,使用id也行。通过已知的token,保证了你可以在任意地方以观察者的身份释放锁。

3. 尽量不修改其key的原本值,不添加前缀,防止在观测时出现不必要的麻烦。

逻辑:

加锁就是set 一个 key ,如果key 存在的情况下则返回失败。那么典型的命令就是setnx.

一个锁显然是需要一个过期时间的,那么我们可能要用到 expire命令。

释放锁则是一个del命令

查看锁的值是需要get命令

比较常见的加锁使用的是setnx,不过由于redis支持了SETkeytokenNXEX/PXmax-lock-time(sec/millsec) (设置key token 是否不存在才set 秒数模式/毫秒数模式 秒数或毫秒数) 这种传参模式,由此,这里更加推荐使用set 命令。

如果在我们的代码端执行del 则小概率发生以下情况:

  A 申请锁set x,过期时间为t。

  经过时间t后,A恰好忙完了,A通过get命令看看token是否一致,得到结果发现一致的。

A决定发送del到redis服务器,此时A恰好网络拥堵。

redis服务器由于锁x超时,进而释放了锁x。

  此时B恰好也申请了锁x,无过期时间。

A网络恢复,del命令发送成功。

结果 B的锁被A释放了。

幸好redis支持了lua脚本。让我们得以简单的实现过期,加锁,去锁功能,而不需要自己手动timer过期。

这里要使用到eval命令执行脚本。

代码

using CSRedis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace CsRedis.Helper
{
    /// <summary>
    ///  基于csredis的简单封装
    /// </summary>
    public class CsRedisManager
    {

        private ConcurrentDictionary<string, CSRedisClient> _serviceNameWithClient;

        /// <summary>
        ///  初始化
        /// </summary>
        public void Init()
        {
            _serviceNameWithClient = new ConcurrentDictionary<string, CSRedisClient>();
        }
        ///  获取业务redis服务
        /// <param name="serviceName"></param>
        /// <returns></returns>
        public CSRedisClient GetRedisClient(string serviceName)
            CSRedisClient result = null;
            _serviceNameWithClient.TryGetValue(serviceName,out result);
            return result;
        ///  添加redis服务
        /// <param name="connectStr"></param>
        public bool AddRedisClient(string serviceName,string connectStr)
            CSRedisClient cSRedisClient = new CSRedisClient(connectStr);
            return _serviceNameWithClient.TryAdd(serviceName, cSRedisClient);
        ///  设置字符串型kv
        /// <param name="key">key</param>
        /// <param name="value">value</param>
        /// <param name="expireSecond">过期时间(秒)</param>
        /// <returns>是否成功</returns>
        public bool Set(string serviceName,string key,string value,int expireSecond=-1)
            var redisClient = GetRedisClient(serviceName);
            GetExceptionOfClient(redisClient);

            return redisClient.Set(key, value, expireSecond);

        ///  获取相应key的值
        /// <param name="key"></param>
        public string Get(string serviceName, string key)
            return redisClient.Get(key);
        ///  如果不存在则执行,存在则忽略
        /// <param name="value"></param>
        public bool SetNx(string serviceName, string key, string value)
            var res = redisClient.SetNx(key, value);
            return res;
        ///  带过期时间的setNx
        /// <param name="seconds"></param>
        public bool SetNx(string serviceName, string key, string value, int millSeconds = -1)
            var res = Set(serviceName, key, value, RedisExistence.Nx, millSeconds);
        ///  带过期时间的SetXx
        public bool SetXx(string serviceName, string key, string value, int millSeconds = -1)
            var res = Set(serviceName, key, value, RedisExistence.Xx, millSeconds);
        ///  带参数set
        /// <param name="existence"></param>
        public bool Set(string serviceName, string key, string value, RedisExistence existence, int millSeconds = -1)
            var res = redisClient.Set(key, value, millSeconds, existence);
        ///  设置生存时间
        public bool Expire(string serviceName, string key, int seconds)
            return redisClient.Expire(key, seconds);
        ///  获取剩余的生存时间(秒)
        public long Ttl(string serviceName, string key)
            return redisClient.Ttl(key);
        ///  删除del
        public long Del(string serviceName,params string[] keys)
            return redisClient.Del(keys);
        ///  执行脚本
        /// <param name="script"></param>
        /// <param name="args"></param>
        public object Eval(string serviceName, string script,string key,params object[] args)
            var res = redisClient.Eval(script, key,args);
        ///  添加共享锁
        public bool AddLock(string serviceName, string key,string token, int millSeconds = -1)
            var valRes = SetNx(serviceName, key, token, millSeconds);
            return valRes;
        ///  删除共享锁
        public bool ReleaseLock(string serviceName, string key,string token)
            var script = GetReleaseLockScript();
            var res = redisClient.Eval(script, key, token);
            if (0== (long)res)
            {
                return false;
            }
            return true;
        ///  获取键值
        /// <param name="pattern"></param>
        public string[] Keys(string serviceName, string pattern)
            var res = redisClient.Keys(pattern);
        ///  获取client发生异常
        /// <param name="client"></param>
        private void GetExceptionOfClient(CSRedisClient client)
            if (client == null)
                throw new Exception("无有效的redis服务");
        ///  lua脚本删除共享锁
        ///  解决在A申请锁 xxkey  过期的瞬间,B 申请锁xxkey,
        ///  此时恰好A执行到释放xxkey从而引起的异常释放
        private static  string GetReleaseLockScript()
            return "if redis.call(\"get\",KEYS[1]) == ARGV[1] \nthen\nreturn redis.call(\"del\", KEYS[1])\nelse\nreturn 0\nend";

    }

}

这里我把要单独执行的lua脚本单独提出来

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end

这段脚本对应的是c# 中的GetReleaseLockScript()方法中的文字。

这里我个人偷了个懒,按照道理,这里应该有个LoadScriptPath,加载脚本所在位置,调用的时候先检查脚本是否在内存中,不在则去LoadScriptPath找对应的脚本,方便不同的人协同合作。不过那个就是脚本管理器了,还要设计interface,有点偏离主题了。

下面是测试代码

using CsRedis.Helper;
using NUnit.Framework;

namespace TestProject
{
    public class Tests
    {
        [SetUp]
        public void Setup()
        {
        }
        [Test]
        public void Test1()
            CsRedisManager csRedisManager = new CsRedisManager();
            csRedisManager.Init();
            csRedisManager.AddRedisClient("TEST", "127.0.0.1:6379,password=123456, connectTimeout =1000,connectRetry=1,syncTimeout=10000,defaultDatabase=0");
            //csRedisManager.AddRedisClient("PRODUCT", "127.0.0.1:6379,password=123456, connectTimeout =1000,connectRetry=1,syncTimeout=10000,defaultDatabase=1");

            var token = "123";
            var lockKey = "LOCKKEY1";
            csRedisManager.AddLock("TEST", lockKey,token,20 * 1000);
            csRedisManager.ReleaseLock("TEST", lockKey, token);
    }
}

这里就是对于共享锁的一点简单实现,多了挺多与本次的命令无关的代码,海涵海涵

到此这篇关于c# 理解csredis实现分布式锁的文章就介绍到这了,更多相关c# 分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C#实现Redis的分布式锁

    目录 Redis实现分布式锁(悲观锁/乐观锁) Redis连接池 使用Redis的SetNX命令实现加锁, 调用方式 Redis实现分布式锁(悲观锁/乐观锁) 对锁的概念和应用场景在此就不阐述了,网上搜索有很多解释,只是我搜索到的使用C#利用Redis的SetNX命令实现的锁虽然能用,但是都不太适合我需要的场景. Redis有三个最基本属性来保证分布式锁的有效实现: 安全性: 互斥,在任何时候,只有一个客户端能持有锁. 活跃性A:没有死锁,即使客户端在持有锁的时候崩溃,最后也会有其他客户端能获得

  • C# 实现Zookeeper分布式锁的参考示例

    目录 分布式锁 Zookeeper分布式锁原理 C#实现Zookeeper分布式锁 分布式锁 互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况. 为解决服务器压力太大,响应慢的特点,分布式系统部署出现了. 简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx) 虽然

  • c# 理解csredis库实现分布式锁的详细流程

    声明: 这里首先使用的是csredis,地址是https://github.com/2881099/csredis 该库本身已经足够完善,这里我画蛇添足一下,为了方便自己的使用. 本身csredis库已经实现了完整的加锁和去锁的逻辑,这里实现的与库本身所实现的有以下几点区别(csredis实现代码位置为:https://github.com/2881099/csredis/blob/bb6d947695770333027f3936f80052041db41b64/src/CSRedisCore/

  • Redis实现分布式锁方法详细

    目录 1. 单机数据一致性 2. 分布式数据一致性 3. Redis实现分布式锁 3.1 方式一 3.2 方式二(改进方式一) 3.3 方式三(改进方式二) 3.4 方式四(改进方式三) 3.5 方式五(改进方式四) 3.6 小结 在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁. 在分布式架构中,我们同样会遇到数据共享操作问题,本文章使用Redis来解决分布式架构中的数据一致性问题. 1. 单机数据一致性 单机数据一致性架构如下图所示:多个可客户访

  • win10系统VS2019配置点云库PCL1.12.1的详细流程

    目录 0 写在最前 1 PCL的下载安装 1.1 PCL下载 1.2 PCL安装 1.3 bin文件拷贝 1.4 其他问题 2 在VS中配置PCL 2.1 前期工作 2.1 添加包含目录 2.2 添加库目录 2.3 添加依赖项 3 PCL运行验证 4 写在最后 0 写在最前 这是本人昨天在VS2019上配置PCL点云库的一些基本流程以及一些问题的总结,有些问题我的解决办法不一定是最优的,仅供大家参考,当然这里也记录了我这个小白在配置过程中踩过的一些小坑,希望能为大家带来方便. 本文参考博文:ht

  • 详细解读分布式锁原理及三种实现方式

    目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题.分布式的CAP理论告诉我们"任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项."所以,很多系统在设计之初就要对这三者做出取舍.在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证"最终一致性",只要这个最终

  • 浅谈分布式锁的几种使用方式(redis、zookeeper、数据库)

    Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 synchronized lock dblock Q:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费 分布式锁 我们需要怎么样的分布式锁? 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用

  • Java基于redis实现分布式锁

    为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布式锁都是通过各自为依据对各个请求进行上锁,解锁从而控制放行还是拒绝.redis锁是基于其提供的setnx命令. setnx当且仅当key不存在.若给定key已

  • 详解基于redis实现分布式锁

    前言 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 原理剖析 上述三种分布式锁都是通过各自为依据对各个请求进行上锁,解锁从而控制放行还是拒绝.redis锁是基于其提供的setnx命令. setnx当且仅当key不存在.若给定key已经存在,则setnx不做任何动作.setnx是一个原子

  • 关于分布式锁的三种实现方式

    目录 分布式锁实现方案 基于数据库实现分布式锁 基于Zookeeper实现分布式锁 加锁和解锁流程 利用curator实现 基于缓存实现分布式锁,以Redis为例 三种方案比较 Java中的锁主要包括synchronized锁和JUC包中的锁,这些锁都是针对单个JVM实例上的锁,对于分布式环境如果我们需要加锁就显得无能为力. 在单个JVM实例上,锁的竞争者通常是一些不同的线程,而在分布式环境中,锁的竞争者通常是一些不同的线程或者进程.如何实现在分布式环境中对一个对象进行加锁呢?答案就是分布式锁.

  • Spring Boot基于数据库如何实现简单的分布式锁

    1.简介 分布式锁的方式有很多种,通常方案有: 基于mysql数据库 基于redis 基于ZooKeeper 网上的实现方式有很多,本文主要介绍的是如果使用mysql实现简单的分布式锁,加锁流程如下图: 其实大致思想如下: 1.根据一个值来获取锁(也就是我这里的tag),如果当前不存在锁,那么在数据库插入一条记录,然后进行处理业务,当结束,释放锁(删除锁). 2.如果存在锁,判断锁是否过期,如果过期则更新锁的有效期,然后继续处理业务,当结束时,释放锁.如果没有过期,那么获取锁失败,退出. 2.数

  • 详解redis分布式锁的这些坑

    一.白话分布式 什么是分布式,用最简单的话来说,就是为了较低单个服务器的压力,将功能分布在不同的机器上面,本来一个程序员可以完成一个项目:需求->设计->编码->测试 但是项目多的时候,一个人也扛不住,这就需要不同的人进行分工合作了 这就是一个简单的分布式协同工作了: 二.分布式锁 首先看一个问题,如果说某个环节被终止或者别侵占,就会发生不可知的事情 这就会出现,设计好的或者设计的半成品会被破坏,导致后面环节出错: 这时候,我们就需要引入分布式锁的概念: 何为分布式锁 当在分布式模型下,

随机推荐