Redisson分布式锁的源码解读分享

目录
  • 前言
  • 前置知识
  • 分布式锁的思考
  • Redis订阅/发布机制
  • Redisson
    • 加锁
    • 订阅
    • 解锁
    • 看门狗

前言

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。Redisson有一样功能是可重入的分布式锁。本文来讨论一下这个功能的特点以及源码分析。

前置知识

在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工。

分布式锁的思考

首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备哪些功能?

  • 互斥(这是一个锁最基本的功能)
  • 锁失效机制(也就是可以设置锁定时长,防止死锁)
  • 高性能、高可用
  • 阻塞、非阻塞
  • 可重入、公平锁
  • 。。。

可见,实现一个分布式锁,需要考虑的东西有很多。那么,如果用Redis来实现分布式锁呢?如果只需要具备上面说的1、2点功能,要怎么写?(ps:我就不写了,自己想去)

Redis订阅/发布机制

Redisson中用到了Redis的订阅/发布机制,下面简单介绍下。

简单来说就是如果client2 、 client5 和 client1 订阅了 channel1,当有消息发布到 channel1 的时候,client2 、 client5 和 client1 都会收到这个消息。

图片来自 菜鸟教程-Redis发布订阅

Redisson

源码版本:3.17.7

下面以Redisson官方的可重入同步锁例子为入口,解读下源码。

RLock lock = redisson.getLock("anyLock");
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

加锁

我用时序图来表示加锁和订阅的过程。时序图中括号后面的c1、c2代表client1,client2

当线程2获取了锁但还没释放锁时,如果线程1去获取锁,会阻塞等待,直到线程2解锁,通过Redis的发布订阅机制唤醒线程1,再次去获取锁。

加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),对应着就是RedissonLock#tryLock

/**
 * 获取锁
 * @param waitTime  尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
 * @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
 * @param unit 时间单位
 * @return 获取锁成功返回true,失败返回false
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();// 当前时间
    long threadId = Thread.currentThread().getId();// 当前线程id

    // 尝试加锁,加锁成功返回null,失败返回锁的剩余超时时间
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // 获取锁成功
    if (ttl == null) {
        return true;
    }

    // time小于0代表此时已经超过获取锁的等待时间,直接返回false
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 没看懂这个方法,里面里面空运行,有知道的大神还请不吝赐教
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                // 出现异常,取消订阅
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 判断是否超时(超过了waitTime)
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        while (true) {
            // 再次获取锁,成功则返回
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 阻塞等待信号量唤醒或者超时,接收到订阅时唤醒
            // 使用的是Semaphore#tryAcquire()
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 因为是同步操作,所以无论加锁成功或失败,都取消订阅
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

先看一下整体逻辑:

1.尝试加锁,成功直接返回true

2.判断超时

3.订阅

4.判断超时

5.循环 ( 尝试获取锁 → 判断超时 → 阻塞等待 )

tryLock方法看着很长,但是有很多代码都是重复的,本小节重点说一下尝试加锁的方法tryAcquire

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        // 调用lua脚本,尝试加锁
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 这里的if、else的区别就在于,如果没有设置leaseTime,就使用默认的internalLockLeaseTime(默认30秒)
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        // 如果ttlRemaining为空,也就是tryLockInnerAsync方法中的lua执行结果返回空,证明获取锁成功
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 如果没有设置锁的持有时间(leaseTime),则启动看门狗,定时给锁续期,防止业务逻辑未执行完成锁就过期了
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

在tryAcquireAsync方法中,主要分为两段逻辑:

1.调用lua脚本加锁:tryLockInnerAsync

2.看门狗:scheduleExpirationRenewal

看门狗在后面讲,本小节重点还是在加锁

// RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

Redisson使用了 Hash 结构来表示一个锁,这样 Hash 里面的 key 为线程id,value 为锁的次数。这样巧妙地解决了可重入锁的问题。

下面我们来分析下这段 lua 脚本的逻辑(下面说的threadId都是指变量,不是说key就叫’threadId’):

  • 如果锁(hash结构)不存在,则创建,并添加一个键值对 (threadId : 1),并设置锁的过期时间
  • 如果锁存在,则将键值对 threadId 对应的值 + 1,并设置锁的过期时间
  • 如果不如何1,2点,则返回锁的剩余过期时间

订阅

让我们把视线重新回到RedissonLock#tryLock中,当经过一些尝试获取锁,超时判断之后,代码来到while循环中。这个while循环是个死循环,只有成功获取锁或者超时,才会退出。一般死循环的设计中,都会有阻塞等待的代码,否则如果循环中的逻辑短时间拿不到结果,会造成资源抢占和浪费。阻塞代码就是下面这段

if (ttl >= 0 && ttl < time) {
    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一个Semaphore信号量对象,这是jdk的内置对象,Semaphore#tryAcquire表示阻塞并等待唤醒。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe。订阅方法的逻辑也不少,咱们直接讲其最终调用的处理方法

// LockPubSub#onMessage
protected void onMessage(RedissonLockEntry value, Long message) {
    // 普通的解锁走的是这个
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // 这里就是唤醒信号量的地方
        value.getLatch().release();
    // 这个是读写锁用的
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }

        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

value.getLatch().release() 也就是Semaphore#release ,会唤醒Semaphore#tryAcquire阻塞的线程

解锁

上面我们聊了加锁,本小节来聊下解锁。调用路径如下

// RedissonLock#unlock
// RedissonBaseLock#unlockAsync(long threadId)
public RFuture<Void> unlockAsync(long threadId) {
    // 调用lua解锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // 取消看门狗
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

解锁的逻辑不复杂,调用lua脚本解锁以及取消看门狗。看门狗晚点说,先说下lua解锁

// RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

老规矩,分析下这段lua:

  • 如果锁不存在,返回null
  • 锁的值减1,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间
  • 如果锁的值小于等于0,这说明可以真正解锁了,删除锁并通过发布订阅机制发布解锁消息

从 lua 中可以看到,解锁时会发布消息到 channel 中,加锁方法RedissonLock#tryLock中有相对应的订阅操作。

看门狗

试想一个场景:程序执行需要10秒,程序执行完成才去解锁,而锁的存活时间只有5秒,也就是程序执行到一半的时候锁就可以被其他程序获取了,这显然不合适。那么怎么解决呢?

方式一:锁永远存在,直到解锁。不设置存活时间。

这种方法的弊端在于,如果程序没解锁就挂了,锁就成了死锁

方式二:依然设置锁存活时间,但是监控程序的执行,如果程序还没有执行完成,则定期给锁续期。

方式二就是Redisson的看门狗机制。看门狗只有在没有显示指定锁的持有时间(leaseTime)时才会生效。

// RedissonLock#tryAcquireAsync
// RedissonBaseLock#scheduleExpirationRenewal
protected void scheduleExpirationRenewal(long threadId) {
    // 创建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 看门狗的具体逻辑
            renewExpiration();
        } finally {
            // 如果线程被中断了,就取消看门狗
            if (Thread.currentThread().isInterrupted()) {
                // 取消看门狗
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

scheduleExpirationRenewal 方法处理了ExpirationEntry和如果出现异常则取消看门狗,具体看门狗逻辑在 renewExpiration 方法中

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    // 创建延时任务,延时时间是internalLockLeaseTime / 3
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // lua脚本判断,如果锁存在,则续期并返回true,不存在则返回false
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // 锁续期成功,则再启动一个延时任务,继续监测
                    renewExpiration();
                } else {
                    // 取消看门狗
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

Timeout 是一个延时任务,延时 internalLockLeaseTime / 3 时间执行。任务的内容主要是通过 renewExpirationAsync 方法对锁进行续期,如果续期失败(解锁了、锁到期等),则取消看门狗,如果续期成功,则递归 renewExpiration 方法,继续创建延时任务。

internalLockLeaseTime 也就是 lockWatchdogTimeout 参数,默认是 30 秒。

以上就是Redisson分布式锁的源码解读分享的详细内容,更多关于Redisson分布式锁的资料请关注我们其它相关文章!

(0)

相关推荐

  • Redisson分布式锁源码解析

    Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等.加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁. 下面我们对其加锁.解锁过程中的源码细节进行一一分析. 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws

  • redisson 实现分布式锁的源码解析

    目录 redisson 测试代码 加锁设计 锁续期设计 锁的自旋重试 解锁设计 撤销锁续期 解锁成功唤排队线程 redisson redisson 实现分布式锁的机制如下: 依赖版本 implementation 'org.redisson:redisson-spring-boot-starter:3.17.0' 测试代码 下面是模拟一个商品秒杀的场景,示例代码如下: public class RedissonTest { public static void main(String[] arg

  • Java redisson实现分布式锁原理详解

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

  • Java使用Redisson分布式锁实现原理

    1. 基本用法 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency> Config config = new Config(); config.useClusterServers() .setScanInterval(2000) /

  • 使用自定义注解实现redisson分布式锁

    目录 自定义注解实现redisson分布式锁 自定义注解 aop解析注解 service中使用注解加锁使用 redisson分布式锁应用 应用场景 Redisson管理类 分布式锁 测试类 自定义注解实现redisson分布式锁 自定义注解 package com.example.demo.annotation; import java.lang.annotation.*; /** * desc: 自定义 redisson 分布式锁注解 * * @author: 邢阳 * @mail: xyde

  • 关于分布式锁(Redisson)的原理分析

    目录 1.分布式锁场景 1.1 案例1 1.2 案例2-使用synchronized 实现单机锁 1.3 案例3-使用redis的SETNX实现分布式锁 1.4 案例4-加入过期时间 1.5 案例5-使用唯一id作为锁的value值 1.6 案例6-Redisson分布式锁 1.分布式锁场景 互联网秒杀 抢优惠卷 接口幂等性校验 1.1 案例1 如下代码模拟了下单减库存的场景,我们分析下在高并发场景下会存在什么问题 package com.wangcp.redisson; import org.

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

  • Redisson公平锁的源码解读分享

    目录 前言 公平锁 加锁 解锁 总结 前言 我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁.下面是官方原话: 它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程.所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒. 源码版本:3.17.7 这是我 fork 的分支,添加了自己理解的中文注释:https://g

  • redisson分布式锁的用法大全

    Redisson是Redis官方推荐的Java版的Redis客户端.它提供的功能非常多,此处我们只用它的分布式锁功能. 以springboot整合Redisson项目为例 添加springboot maven依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15

  • Ajax::prototype 源码解读

    AJAX之旅(1):由prototype_1.3.1进入javascript殿堂-类的初探  还是决定冠上ajax的头衔,毕竟很多人会用这个关键词搜索.虽然我认为这只是个炒作的概念,不过不得不承认ajax叫起来要方便多了.所以ajax的意思我就不详细解释了. 写这个教程的起因很简单:经过一段时间的ajax学习,有一些体会,并且越发认识到ajax技术的强大,所以决定记录下来,顺便也是对自己思路的整理.有关这个教程的后续,请关注http://www.x2design.net 前几年,javascri

  • jQuery源码解读之addClass()方法分析

    本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 复制代码 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass的插件方法. */     addClass: function( value ) {         var classes, elem, cur, clazz, j, finalV

  • jQuery源码解读之hasClass()方法分析

    本文较为详细的分析了jQuery源码解读之hasClass()方法.分享给大家供大家参考.具体分析如下: 复制代码 代码如下: jQuery.fn.extend({     hasClass: function( selector ) { //将要检查的类名selector赋值给className, l为选择器选择的当前要检查的jQuery对象数组的长度.         var className = " " + selector + " ",          

  • jQuery源码解读之removeAttr()方法分析

    本文较为详细的分析了jQuery源码解读之removeAttr()方法.分享给大家供大家参考.具体分析如下: 扩展jQuery原型对象的方法: 复制代码 代码如下: jQuery.fn.extend({ //name,传入要DOM元素要移除的属性名.     removeAttr: function( name ) { //使用jQuery.fn对象,即jQuery原型对象的each方法遍历当前选择器选择的jQuery对象数组,并返回该jQuery对象以便链式调用.         return

  • jQuery源码解读之removeClass()方法分析

    本文较为详细的分析了jQuery源码解读之removeClass()方法.分享给大家供大家参考.具体分析如下: removeClass()方法和addClass()差别不大.这就来看看: 复制代码 代码如下: jQuery.fn.extend({     removeClass: function( value ) {         var classes, elem, cur, clazz, j, finalValue,             i = 0,             len

  • jQuery源码解读之extend()与工具方法、实例方法详解

    本文实例讲述了jQuery源码解读之extend()与工具方法.实例方法.分享给大家供大家参考,具体如下: 使用jQuery的时候会发现,jQuery中有的函数是这样使用的: $.get(); $.post(); $.getJSON(); 有些函数是这样使用的: $('div').css(); $('ul').find('li'); 有些函数是这样使用的: $('li').each(callback); $.each(lis,callback); 这里涉及到两个概念:工具方法与实例方法.通常我们

随机推荐