Redisson公平锁的源码解读分享

目录
  • 前言
  • 公平锁
  • 加锁
  • 解锁
  • 总结

前言

我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁。下面是官方原话:

它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

源码版本:3.17.7

这是我 fork 的分支,添加了自己理解的中文注释:https://github.com/xiaoguyu/redisson

公平锁

先上官方例子:

RLock fairLock = redisson.getFairLock("anyLock");
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

因为在Redisson中,公平锁和普通可重入锁的逻辑大体上一样,我在上一篇文章都介绍了,这里就不再赘述。下面开始介绍合理逻辑。

加锁

加锁的 lua 脚本在 RedissonFairLock#tryLockInnerAsync方法中

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        long wait = threadWaitTime;
        if (waitTime > 0) {
            wait = unit.toMillis(waitTime);
        }

        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
            ......
        }

        if (command == RedisCommands.EVAL_LONG) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    // remove stale threads
                    "while true do " +  // list为空,证明没有人排队,退出循环
                        "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                        "if firstThreadId2 == false then " +
                            "break;" +
                        "end;" +
                        // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据
                        "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                        "if timeout <= tonumber(ARGV[4]) then " +
                            // remove the item from the queue and timeout set
                            // NOTE we do not alter any other timeout
                            "redis.call('zrem', KEYS[3], firstThreadId2);" +
                            "redis.call('lpop', KEYS[2]);" +
                        "else " +
                            "break;" +
                        "end;" +
                    "end;" +

                    // check if the lock can be acquired now
                    // 检查是否可以获取锁。如果hash和list都不存在,或者线程队列的第一个是当前线程,则可以获取锁
                    "if (redis.call('exists', KEYS[1]) == 0) " +
                        "and ((redis.call('exists', KEYS[2]) == 0) " +
                            "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

                        // remove this thread from the queue and timeout set
                        // 都获取锁了,当然要从线程队列和时间队列中移除
                        "redis.call('lpop', KEYS[2]);" +
                        "redis.call('zrem', KEYS[3], ARGV[2]);" +

                        // decrease timeouts for all waiting in the queue
                        // 刷新时间集合中的时间
                        "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                        "for i = 1, #keys, 1 do " +
                            "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                        "end;" +

                        // acquire the lock and set the TTL for the lease
                        // 和公平锁的设置一样,值加1并且设置过期时间
                        "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +

                    // check if the lock is already held, and this is a re-entry
                    // 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理
                    "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +

                    // the lock cannot be acquired
                    // check if the thread is already in the queue
                    // 时间集合中有值,证明线程已经在队列中,不需要往后执行逻辑了
                    "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
                    "if timeout ~= false then " +
                        // the real timeout is the timeout of the prior thread
                        // in the queue, but this is approximately correct, and
                        // avoids having to traverse the queue
                        // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
                        // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
                        "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
                    "end;" +

                    // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
                    // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
                    // threadWaitTime
                    "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
                    "local ttl;" +
                    // 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)
                    "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                        "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
                    "else " +
                        // 否则直接获取锁的存活时间
                        "ttl = redis.call('pttl', KEYS[1]);" +
                    "end;" +
                    // 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳
                    "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
                    // 如果添加到时间集合成功,则同时添加线程集合
                    "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                        "redis.call('rpush', KEYS[2], ARGV[2]);" +
                    "end;" +
                    "return ttl;",
                    Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
                    unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
        }

        throw new IllegalArgumentException();
    }

公平锁总共用了Redis的三种数据类型,对应着 lua 脚本里面的keys1、2、3的参数:

KEYS[1]

锁的名字,使用 Hash 数据类型,是可重入锁的基础,结构为 {”threadId1”: 1, “thread2”: 1},key为线程id,value是锁的次数

KEYS[2]

线程队列的名字,使用 List 数据类型,结构为 [ “threadId1”, “threadId2” ],按顺序存放需要获取锁的线程的id

KEYS[3]

时间队列的名字,使用 sorted set 数据类型,结构为 {”threadId2”:123, “threadId1”:190},key为线程id,value为获取锁的超时时间戳

我下面会用 锁、线程队列、时间队列 来表示这3个数据结构,需要注意下我的表述。

同样的,介绍下参数:

  • ARGV[1]:leaseTime 锁的持有时间
  • ARGV[2]:线程id(描述不太准确,暂时按这样理解)
  • ARGV[3]:waitTime 尝试获取锁的最大等待时间
  • ARGV[4]:currentTime 当前时间戳

接下来,我们一段一段分析 lua 脚本,首先看最开始的 while 循环

"while true do " +  // list为空,证明没有人排队,退出循环
    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
    "if firstThreadId2 == false then " +
        "break;" +
    "end;" +
    // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据
    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
    "if timeout <= tonumber(ARGV[4]) then " +
        // 从时间队列和线程队列中移除
        "redis.call('zrem', KEYS[3], firstThreadId2);" +
        "redis.call('lpop', KEYS[2]);" +
    "else " +
        "break;" +
    "end;" +
"end;" +

具体的逻辑我在注释中写的很清楚了,看的时候记住 KEYS[2]、KEYS[3] 对应着线程队列和时间队列接口。主要注意的是,线程队列只有当一个线程持有锁,另一个线程获取不到锁时,才会有值(前面有人才排队,没人排什么队)。接着看第二段

// 检查是否可以获取锁。当锁不存在,并且线程队列不存在或者线程队列第一位是当前线程,则可以获取锁
"if (redis.call('exists', KEYS[1]) == 0) " +
    "and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

    // remove this thread from the queue and timeout set
    // 都获取锁了,当然要从线程队列和时间队列中移除
    "redis.call('lpop', KEYS[2]);" +
    "redis.call('zrem', KEYS[3], ARGV[2]);" +

    // decrease timeouts for all waiting in the queue
    // 刷新时间队列中的时间
    "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
    "for i = 1, #keys, 1 do " +
        "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
    "end;" +

    // acquire the lock and set the TTL for the lease
    // 和公平锁的设置一样,值加1并且设置过期时间
    "redis.call('hset', KEYS[1], ARGV[2], 1);" +
    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
    "return nil;" +
"end;" +

翻译翻译就是,锁不存在(别人没有持有锁)并且线程队列不存在或者线程队列第一位是当前线程(不用排队或者自己排第一)才能获得锁。因为时间队列中存放的是各个线程等待锁的超时时间戳,所以每次都需要刷新下。继续下一段逻辑

// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
    "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
    "return nil;" +
"end;" +

这是可重入锁的处理,继续下一段

// 时间队列中有值,证明线程已经在队列中,不需要往后执行逻辑了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
    // the real timeout is the timeout of the prior thread
    // in the queue, but this is approximately correct, and
    // avoids having to traverse the queue
    // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
    // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
    "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +

举例子:线程1持有锁,线程2尝试第一次获取锁(不进入这段if),线程2第二次获取锁(进入了这段if)。继续下一段

"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
    "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
    // 否则直接获取锁的存活时间
    "ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果添加到时间集合成功,则同时添加线程集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
    "redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",

ttl 这段的获取逻辑,翻译翻译就是,如果前面有人排队,就以前面的超时时间为准,如果没人排队,就拿锁的超时时间。获取到 ttl ,就对添加到线程集合和时间集合。

以上就是公平锁的加锁 lua 脚本的全部逻辑。讲的有点乱,但是只要能搞清楚keys1、2、3对应着哪种数据类型,理解整个逻辑应该问题不大。

解锁

解锁的核心 lua 脚本是下面这段RedissonFairLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // remove stale threads
            "while true do "  // 线程队列为空,证明没有人排队,退出循环
            + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
            + "if firstThreadId2 == false then "
                + "break;"
            + "end; "
            // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据
            + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
            + "if timeout <= tonumber(ARGV[4]) then "
                + "redis.call('zrem', KEYS[3], firstThreadId2); "
                + "redis.call('lpop', KEYS[2]); "
            + "else "
                + "break;"
            + "end; "
          + "end;"
            // 如果锁不存在,则通过订阅发布机制通知下一个等待中的线程
          + "if (redis.call('exists', KEYS[1]) == 0) then " +
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
                "if nextThreadId ~= false then " +
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; " +
            "end;" +
            // 如果当前线程已经不存在锁里面,直接返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 可重入锁处理逻辑,对当前线程的锁次数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                // 锁次数仍然大于0,则刷新锁的存活时间
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "end; " +

            // 删除锁
            "redis.call('del', KEYS[1]); " +
            // 订阅发布机制通知下一个等待中的线程
            "local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
            "if nextThreadId ~= false then " +
                "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
            "end; " +
            "return 1; ",
            Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}

算了,不想写了,看注释吧。

总结

本文介绍了Redisson的公平锁,逻辑大体上和普通可重入锁一致,核心在于 lua 脚本,运用了Redis的3种数据类型。

到此这篇关于Redisson公平锁的源码解读分享的文章就介绍到这了,更多相关Redisson公平锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redisson分布式信号量RSemaphore的使用超详细讲解

    目录 一.RSemaphore的使用 二.RSemaphore设置许可数量 三.RSemaphore的加锁流程 四.RSemaphore的解锁流程 本篇文章基于redisson-3.17.6版本源码进行分析 一.RSemaphore的使用 @Test public void testRSemaphore() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379&quo

  • Redisson如何解决redis分布式锁过期时间到了业务没执行完问题

    目录 面试问题 问题分析 如何回答 一.写在前面 二.Redisson实现Redis分布式锁的底层原理 (1)加锁机制 (2)锁互斥机制 (3)watch dog自动延期机制 (4)可重入加锁机制 (5)释放锁机制 (6)上述Redis分布式锁的缺点 总结 面试问题 Redis锁的过期时间小于业务的执行时间该如何续期? 问题分析 首先如果你之前用Redis的分布式锁的姿势正确,并且看过相应的官方文档的话,这个问题So easy.我们来看 很多同学在用分布式锁时,都是直接百度搜索找一个Redis分

  • Springboot基于Redisson实现Redis分布式可重入锁源码解析

    目录 一.前言 二.为什么使用Redisson 1.我们打开官网 2.我们可以看到官方让我们去使用其他 3.打开官方推荐 4.找到文档 三.Springboot整合Redisson 1.导入依赖 2.以官网为例查看如何配置 3.编写配置类 4.官网测试加锁例子 5.根据官网简单Controller接口编写 6.测试 四.lock.lock()源码分析 1.打开RedissonLock实现类 2.找到实现方法 3.按住Ctrl进去lock方法 4.进去尝试获取锁方法 5.查看tryLockInne

  • Redisson可重入锁解锁逻辑详细讲解

    目录 主动释放 自动释放 本篇文章基于redisson-3.17.6版本源码进行分析 相比较Redisson可重入锁的加锁逻辑,释放锁的逻辑就相对简单一些.释放锁分为主动释放和自动释放两种方式. 主动释放 我们查看org.redisson.RedissonLock#unlock()方法: public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e)

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

  • Redisson RedLock红锁加锁实现过程及原理

    目录 一.主从redis架构中分布式锁存在的问题 二.红锁算法原理 三.红锁算法的使用 四.红锁加锁流程 五.RedLock算法问题 六.总结 本篇文章基于redisson-3.17.6版本源码进行分析 一.主从redis架构中分布式锁存在的问题 1.线程A从主redis中请求一个分布式锁,获取锁成功: 2.从redis准备从主redis同步锁相关信息时,主redis突然发生宕机,锁丢失了: 3.触发从redis升级为新的主redis: 4.线程B从继任主redis的从redis上申请一个分布式

  • Redisson公平锁的源码解读分享

    目录 前言 公平锁 加锁 解锁 总结 前言 我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁.下面是官方原话: 它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程.所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒. 源码版本:3.17.7 这是我 fork 的分支,添加了自己理解的中文注释:https://g

  • Ajax::prototype 源码解读

    AJAX之旅(1):由prototype_1.3.1进入javascript殿堂-类的初探  还是决定冠上ajax的头衔,毕竟很多人会用这个关键词搜索.虽然我认为这只是个炒作的概念,不过不得不承认ajax叫起来要方便多了.所以ajax的意思我就不详细解释了. 写这个教程的起因很简单:经过一段时间的ajax学习,有一些体会,并且越发认识到ajax技术的强大,所以决定记录下来,顺便也是对自己思路的整理.有关这个教程的后续,请关注http://www.x2design.net 前几年,javascri

  • jQuery源码解读之addClass()方法分析

    本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 复制代码 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass的插件方法. */     addClass: function( value ) {         var classes, elem, cur, clazz, j, finalV

  • jQuery源码解读之hasClass()方法分析

    本文较为详细的分析了jQuery源码解读之hasClass()方法.分享给大家供大家参考.具体分析如下: 复制代码 代码如下: jQuery.fn.extend({     hasClass: function( selector ) { //将要检查的类名selector赋值给className, l为选择器选择的当前要检查的jQuery对象数组的长度.         var className = " " + selector + " ",          

  • jQuery源码解读之removeAttr()方法分析

    本文较为详细的分析了jQuery源码解读之removeAttr()方法.分享给大家供大家参考.具体分析如下: 扩展jQuery原型对象的方法: 复制代码 代码如下: jQuery.fn.extend({ //name,传入要DOM元素要移除的属性名.     removeAttr: function( name ) { //使用jQuery.fn对象,即jQuery原型对象的each方法遍历当前选择器选择的jQuery对象数组,并返回该jQuery对象以便链式调用.         return

  • jQuery源码解读之removeClass()方法分析

    本文较为详细的分析了jQuery源码解读之removeClass()方法.分享给大家供大家参考.具体分析如下: removeClass()方法和addClass()差别不大.这就来看看: 复制代码 代码如下: jQuery.fn.extend({     removeClass: function( value ) {         var classes, elem, cur, clazz, j, finalValue,             i = 0,             len

  • jQuery源码解读之extend()与工具方法、实例方法详解

    本文实例讲述了jQuery源码解读之extend()与工具方法.实例方法.分享给大家供大家参考,具体如下: 使用jQuery的时候会发现,jQuery中有的函数是这样使用的: $.get(); $.post(); $.getJSON(); 有些函数是这样使用的: $('div').css(); $('ul').find('li'); 有些函数是这样使用的: $('li').each(callback); $.each(lis,callback); 这里涉及到两个概念:工具方法与实例方法.通常我们

  • CI框架源码解读之利用Hook.php文件完成功能扩展的方法

    本文实例讲述了CI框架源码解读之利用Hook.php文件完成功能扩展的方法.分享给大家供大家参考,具体如下: 看了hook.php的源码,就知道CI使用hook来进行扩展的原理了. hook的基本知识 http://codeigniter.org.cn/user_guide/general/hooks.html CI中hook的使用经历了一个:开启hook,定义hook,调用hook,执行hook的过程. 手册中已经告知了开启.定义.调用的方法.那么hook的实现原理是啥呢. <?php if

  • 基于线程池的工作原理与源码解读

    随着cpu核数越来越多,不可避免的利用多线程技术以充分利用其计算能力.所以,多线程技术是服务端开发人员必须掌握的技术. 线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以就引入了线程池技术,避免频繁的线程创建和销毁. 在Java用有一个Executors工具类,可以为我们创建一个线程池,其本质就是new了一个ThreadPoolExecutor对象.线程池几乎也是面试必考问题.本节结合源代码,说说ThreadExecutor的工作原理 一.线程池创建 先看一下ThreadPoolExec

随机推荐