Redisson 加锁解锁的实现

目录
  • 分布式锁使用
  • getLock
  • tryLock
  • unLock
  • 总结

分布式锁使用

对于 redisson 分布式锁的使用很简单:

1、调用 getLock 函数获取锁操作对象;
2、调用 tryLock 函数进行加锁;
3、调用 unlock 函数进行解锁;

注意 unlock 操作需要放到 finally 代码段中,保证锁可以被释放。

private void sumLock() {
    lock = redissonClient.getLock("sum-lock");

    boolean b = lock.tryLock();
    if (!b) {
        log.info("获取不到锁");
        return;
    }

    try {
        for (int j = 0; j < 20000; j++) {
            ++sum;
        }
    } finally {
        lock.unlock();
    }

}

getLock

getLock 实例化 RedissonLock,相当于 Lock lock = new ReentrantLock() 操作;

public RLock getLock(String name) {
    // 实例化 RedissonLock,参数为指令执行器和锁名称
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    // 命令执行器,用于执行lua脚本
    this.commandExecutor = commandExecutor;
    // 连接管理器的ID
    this.id = commandExecutor.getConnectionManager().getId();
    // 锁续期时间(看门狗),锁默认续期时间是 30s。
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = this.id + ":" + name;
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

tryLock

@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

@Override
public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
}

@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    return tryAcquireOnceAsync(-1, -1, null, threadId);
}

这里是一系列的调用,可以直接跳过,直接进入到 tryAcquireOnceAsync 函数,看看 tryAcquireOnceAsync 函数的处理逻辑。

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 由于我们调用tryLocak没有传递任何参数,leaseTime默认为-1,不走判断
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    
    // 调用获取锁 枷锁的主要逻辑在这里
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 如果发生异常那么直接放回了
        if (e != null) {
            return;
        }

        // 锁续期
        if (ttlRemaining) {
            scheduleExpirationRenewal(threadId);
        }
    });
    // 返回结果
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 将时间转化为毫秒
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 执行脚本
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Redisson 中存储锁的数据类型结构采用的的是 hash,Key 为锁名称,VALUE的属性是 Redisson 客户端ID和线程ID组合而成的字符串,值是锁的重入次数,采用 hash 计数实现锁的重入性。

该函数主要执行 lua 脚本,脚本的逻辑为:

1、redis.call(‘exists’, KEYS[1]) == 0 用于判断锁是否存在,等于 0 说明不存在,表明此时没有客户端持有锁,此客户端获取锁成功;走步骤 2,否则走步骤 4;
2、设置锁,并且对锁进行 +1 操作,标识获取锁的次数;
3、为锁设置过期时间,成功返回 nil;
4、redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1 判断锁是否本客户端持有,等于1说明是,此时是再次获取锁(重入),走步骤 5,否则走 7;
5、对锁进行 +1 操作,标识获取锁的次数;
6、为锁设置过期时间,成功返回 nil;
7、如果1和4的判断都不满足,那么返回锁的的剩余时间;

unLock

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // 释放锁的逻辑主要这里
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

这里是一系列的调用,可以直接跳过,直接进入到 unlockInnerAsync 函数,看看 unlockInnerAsync 函数的处理逻辑。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

该函数主要执行 lua 脚本,脚本的逻辑为:

1、redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0 用于判断锁是否为当前客户端持有,等于 0 说明不是直接返回 nil,否则说明是,走步骤 2;
2、对锁进行 -1 操作,并且获取其计数 counter;
3、判断 counter > 0,如果大于 0 说明该客户端多次获取锁,对锁进行续期并且返回 0,因为此时业务还没有执行完毕,否则走步骤 4;
4、如果count 小于等于 0 则删除锁,发送释放锁的消息,返回 1;
5、如果以上逻辑都不满足,那么直接返回nil;

总结

redisson 加锁解锁:

1、redisson 使用 lua 脚本保证命令执行的原子性;
2、redisson 使用 redis 的 hash 数据结构类型来存储锁信息,使用 锁名称作为 hash 名称,使用“客户端ID:线程ID”作为键,使用重入次数作为值;
3、每次获取锁,会对值进行 +1 操作,并且设置过期时间;
4、每次释放锁,会对值进行 -1 操作,如果没有减少为 0,则继续设置锁的超时时间,否则删除锁,并且发送释放锁的消息。

到此这篇关于Redisson 加锁解锁的实现的文章就介绍到这了,更多相关Redisson 加锁解锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redisson如何解决Redis分布式锁提前释放问题

    目录 前言: 一.问题描述: 二.原因分析: 三.解决方案: 1.思考: 2.Redisson简单配置: 3.使用样例: 四.源码分析 1.lock加锁操作 2.unlock解锁操作 总结: 相关参考: 前言: 在分布式场景下,相信你或多或少需要使用分布式锁来访问临界资源,或者控制耗时操作的并发性. 当然,实现分布式锁的方案也比较多,比如数据库.redis.zk 等等.本文主要结合一个线上案例,讲解 redis 分布式锁的相关实现. 一.问题描述: 某天线上出现了数据重复处理问题,经排查后发现,

  • redis中RedissonLock如何实现等待锁的

    目录 前言 问题 方案 tryLock unlockInnerAsync 思考 前言 经常会有到这样的需求,就是在一个查询接口,第一次查询的时候,如果没有查询到就要执行初始化方法,初始化数据出来,之后的查询就可以直接查询库里的数据了.这样设计的目的是,如果需要初始化的数据特别大,无法再一次调用方法里处理完,或者说数据并不是每条都需要初始化,这种情况下,优先查询的数据优先初始化. 问题 这种方案随之而来就会引发一个问题.查询接口众所周知是个自然幂等的,不需要我们额外去做幂等处理.但是在方案中,这个

  • Redisson分布式锁源码解析

    Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等.加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁. 下面我们对其加锁.解锁过程中的源码细节进行一一分析. 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws

  • 详解redis分布式锁(优化redis分布式锁的过程及Redisson使用)

    目录 1. redis在实际的应用中 2.如何使用redis的功能进行实现分布式锁 2.1 redis分布式锁思想 2.1.1设计思想: 2.1.2 根据上面的设计思想进行代码实现 2.2 使用redisson进行实现分布式锁 1. redis在实际的应用中 不仅可以用来缓存数据,在分布式应用开发中,经常被用来当作分布式锁的使用,为什么要用到分布式锁呢? 在分布式的开发中,以电商库存的更新功能进行讲解,在实际的应用中相同功能的消费者是有多个的,假如多个消费者同一时刻要去消费一条数据,假如业务逻辑

  • redis分布式锁RedissonLock的实现细节解析

    redis分布式锁RedissonLock 简单使用 String key = "key-lock"; RLock lock = redisson.getLock(key); lock.lock(); try { // TODO } catch (Exception e){ log.error(e.getMessage(), e); } finally { lock.unlock(); } String key = "key-tryLock"; long maxWa

  • redisson实现分布式锁原理

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

  • redisson分布式锁的用法大全

    Redisson是Redis官方推荐的Java版的Redis客户端.它提供的功能非常多,此处我们只用它的分布式锁功能. 以springboot整合Redisson项目为例 添加springboot maven依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15

  • Redisson实现Redis分布式锁的几种方式

    目录 Redis几种架构 普通分布式锁 单机模式 哨兵模式 集群模式 总结 Redlock分布式锁 实现原理 问题合集 前几天发的一篇文章<Redlock:Redis分布式锁最牛逼的实现>,引起了一些同学的讨论,也有一些同学提出了一些疑问,这是好事儿.本文在讲解如何使用Redisson实现Redis普通分布式锁,以及Redlock算法分布式锁的几种方式的同时,也附带解答这些同学的一些疑问. Redis几种架构 Redis发展到现在,几种常见的部署架构有: 单机模式: 主从模式: 哨兵模式: 集

  • Redisson 加锁解锁的实现

    目录 分布式锁使用 getLock tryLock unLock 总结 分布式锁使用 对于 redisson 分布式锁的使用很简单: 1.调用 getLock 函数获取锁操作对象:2.调用 tryLock 函数进行加锁:3.调用 unlock 函数进行解锁: 注意 unlock 操作需要放到 finally 代码段中,保证锁可以被释放. private void sumLock() { lock = redissonClient.getLock("sum-lock"); boolean

  • Oracle 11g用户修改密码及加锁解锁功能实例代码

    1.运行 cmd.exe: 2.输入 sqlplus / as sysdba,以系统管理员(sysdba)身份连接数据库,进行数据库管理操作. 3.连接成功后执行 alter user identityName identified by password; -–修改密码 alter user identityName account unlock; -–解锁 alter user identityName account lock; -–加锁 identityName:需要修改的用户: pas

  • redisson特性及优雅实现示例

    目录 redisson的几大特性 可重入性 阻塞能力 续约 初始化timer的代码 redisson的几大特性 相信看了这个标题的同学,对这个问题以已经非常不陌生了,信手拈来redisson的几大特性: 可重入性 [多个业务线同一时刻n条扣款,如果用setnx,我怎么监控的加锁解锁线程?死锁不得发生呐?] redisson使用hash结构,业务名称作为key,uuid+线程id作为field,加锁次数作为value,这不就解决上述问题了吗 阻塞能力 [加锁有两个策略,一是互斥,二是阻塞.互斥比如

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

  • Redisson公平锁的源码解读分享

    目录 前言 公平锁 加锁 解锁 总结 前言 我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁.下面是官方原话: 它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程.所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒. 源码版本:3.17.7 这是我 fork 的分支,添加了自己理解的中文注释:https://g

  • C++开发:为什么多线程读写shared_ptr要加锁的详细介绍

    我在<Linux 多线程服务端编程:使用 muduo C++ 网络库>第 1.9 节"再论 shared_ptr 的线程安全"中写道: (shared_ptr)的引用计数本身是安全且无锁的,但对象的读写则不是,因为 shared_ptr 有两个数据成员,读写操作不能原子化.根据文档(http://www.boost.org/doc/libs/release/libs/smart_ptr/shared_ptr.htm#ThreadSafety), shared_ptr 的线程

  • 基于springboot实现redis分布式锁的方法

    在公司的项目中用到了分布式锁,但只会用却不明白其中的规则 所以写一篇文章来记录 使用场景:交易服务,使用redis分布式锁,防止重复提交订单,出现超卖问题 分布式锁的实现方式 基于数据库乐观锁/悲观锁 Redis分布式锁(本文) Zookeeper分布式锁 redis是如何实现加锁的? 在redis中,有一条命令,实现锁 SETNX key value 该命令的作用是将 key 的值设为 value ,当且仅当 key 不存在.若给定的 key 已经存在,则 SETNX不做任何动作.设置成功,返

  • Redis如何实现分布式锁详解

    一.前言 在Java的并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题.通常,我们以synchronized .Lock来使用它. 但是Java中的锁,只能保证在同一个JVM进程内中执行.如果在分布式集群环境下,就需要分布式锁了. 通常的分布式锁的实现方式有redis,zookeeper,但是一般我们的程序中都会用到redis,用redis做分布式锁,也能够降低成本. 二.实现原理 2.1 加锁 加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间. 在

  • 详解Redis分布式锁的原理与实现

    目录 前言 使用场景 为什么要使用分布式锁 如何使用分布式锁 流程图 分布式锁的状态 分布式锁的特点 分布式锁的实现方式(以redis分布式锁实现为例) 总结 前言 在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁.在分布式架构中,我们同样会遇到数据共享操作问题,此时,我们就需要分布式锁来解决问题,下面我们一起聊聊使用redis来实现分布式锁. 使用场景 库存超卖 比如 5个笔记本 A 看 准备买3个 B 买2个 C 4个 一下单 3+2+4 =9

  • 如何使用注解方式实现 Redis 分布式锁

    目录 引入 Redisson 初始化 Redisson 编写 Redisson 分布式锁工具类 声明注解 @Lock 注解解析类 引入 Redisson <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.14.1</version> </depend

随机推荐