redisson 实现分布式锁的源码解析

目录
  • redisson
  • 测试代码
  • 加锁设计
    • 锁续期设计
    • 锁的自旋重试
  • 解锁设计
    • 撤销锁续期
    • 解锁成功唤排队线程

redisson

redisson 实现分布式锁的机制如下:

依赖版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

测试代码

下面是模拟一个商品秒杀的场景,示例代码如下:

public class RedissonTest {
    public static void main(String[] args) {
        //1. 配置部分
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        config.setLockWatchdogTimeout(5000);
        Redisson redisson = (Redisson) Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        //2. 加锁
        rLock.lock();
        try {
            System.out.println("todo 逻辑处理 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                //3. 解锁
                rLock.unlock();
            }
        }
    }
}

加锁设计

rLock.lock();是加锁的核心代码,我们一起来看看调用栈

加锁的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

其实它的本质是调用一段 LUA 脚本进行加锁。

锁续期设计

锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync方法中调用 scheduleExpirationRenewal实现的。

续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。

tryAcquireAsync 代码如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 锁过期时间续期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

锁续期 scheduleExpirationRenewal代码如下:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

然后在调用 renewExpiration();执行续期逻辑

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 创建延迟任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 真正的续期,调用 LUA 脚本续期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // 如果续期成功
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

renewExpirationAsync方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

锁的自旋重试

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true) 逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 阻塞锁的超时时间,等锁过期后再尝试加锁
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其实这里就是一个间歇性自旋。 等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();

还有一个逻辑就是

CompletableFuture future = subscribe(threadId);

这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。

解锁设计

rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。

核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        // 发布解锁成功消息
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

还是 LUA 的执行方式。

撤销锁续期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 解锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // 撤销续期
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}

解锁成功唤排队线程

org.redisson.pubsub.LockPubSub#onMessage中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

到此这篇关于redisson 实现分布式锁的文章就介绍到这了,更多相关redisson 分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • redisson分布式锁的用法大全

    Redisson是Redis官方推荐的Java版的Redis客户端.它提供的功能非常多,此处我们只用它的分布式锁功能. 以springboot整合Redisson项目为例 添加springboot maven依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15

  • Redisson实现Redis分布式锁的几种方式

    目录 Redis几种架构 普通分布式锁 单机模式 哨兵模式 集群模式 总结 Redlock分布式锁 实现原理 问题合集 前几天发的一篇文章<Redlock:Redis分布式锁最牛逼的实现>,引起了一些同学的讨论,也有一些同学提出了一些疑问,这是好事儿.本文在讲解如何使用Redisson实现Redis普通分布式锁,以及Redlock算法分布式锁的几种方式的同时,也附带解答这些同学的一些疑问. Redis几种架构 Redis发展到现在,几种常见的部署架构有: 单机模式: 主从模式: 哨兵模式: 集

  • Springboot中如何使用Redisson实现分布式锁浅析

    目录 前言 1. 概述 2. Redisson 在 Springboot 中的使用 2.1 引入依赖 2.2 在 Springboot 配置中配置Redis 2.3 Demo代码 3. 综述 前言 在分布式场景下为了保证数据最终一致性.在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(lock-synchronized),使其在修改这种变量时能够线性执行消除并发修改变量.但分布式系统是多部署.多进程的,开发语言提供的并发处理API在此场景下就无能为

  • 详解Spring Cache使用Redisson分布式锁解决缓存击穿问题

    目录 1 什么是缓存击穿 2 为什么要使用分布式锁 3 什么是Redisson 4 Spring Boot集成Redisson 4.1 添加maven依赖 4.2 配置yml 4.3 配置RedissonConfig 5 使用Redisson的分布式锁解决缓存击穿 1 什么是缓存击穿 一份热点数据,它的访问量非常大.在其缓存失效的瞬间,大量请求直达存储层,导致服务崩溃. 2 为什么要使用分布式锁 在项目中,当共享资源出现竞争情况的时候,为了防止出现并发问题,我们一般会采用锁机制来控制.在单机环境

  • 详解redis分布式锁(优化redis分布式锁的过程及Redisson使用)

    目录 1. redis在实际的应用中 2.如何使用redis的功能进行实现分布式锁 2.1 redis分布式锁思想 2.1.1设计思想: 2.1.2 根据上面的设计思想进行代码实现 2.2 使用redisson进行实现分布式锁 1. redis在实际的应用中 不仅可以用来缓存数据,在分布式应用开发中,经常被用来当作分布式锁的使用,为什么要用到分布式锁呢? 在分布式的开发中,以电商库存的更新功能进行讲解,在实际的应用中相同功能的消费者是有多个的,假如多个消费者同一时刻要去消费一条数据,假如业务逻辑

  • Java redisson实现分布式锁原理详解

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

  • 使用自定义注解实现redisson分布式锁

    目录 自定义注解实现redisson分布式锁 自定义注解 aop解析注解 service中使用注解加锁使用 redisson分布式锁应用 应用场景 Redisson管理类 分布式锁 测试类 自定义注解实现redisson分布式锁 自定义注解 package com.example.demo.annotation; import java.lang.annotation.*; /** * desc: 自定义 redisson 分布式锁注解 * * @author: 邢阳 * @mail: xyde

  • SpringBoot整合Redisson实现分布式锁

    目录 一.添加依赖 二.redis配置文件 三.新建配置类 四.使用分布式锁 可重入锁 读写锁 信号量(Semaphore) 闭锁(CountDownLatch) Redisson是架设在redis基础上的一个Java驻内存数据网格(In-Memory Data Grid).充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类.使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低

  • Redisson分布式锁源码解析

    Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等.加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁. 下面我们对其加锁.解锁过程中的源码细节进行一一分析. 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws

  • redisson 实现分布式锁的源码解析

    目录 redisson 测试代码 加锁设计 锁续期设计 锁的自旋重试 解锁设计 撤销锁续期 解锁成功唤排队线程 redisson redisson 实现分布式锁的机制如下: 依赖版本 implementation 'org.redisson:redisson-spring-boot-starter:3.17.0' 测试代码 下面是模拟一个商品秒杀的场景,示例代码如下: public class RedissonTest { public static void main(String[] arg

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

  • Redisson公平锁的源码解读分享

    目录 前言 公平锁 加锁 解锁 总结 前言 我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁.下面是官方原话: 它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程.所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒. 源码版本:3.17.7 这是我 fork 的分支,添加了自己理解的中文注释:https://g

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • Netty分布式客户端处理接入事件handle源码解析

    目录 处理接入事件创建handle 我们看其RecvByteBufAllocator接口 跟进newHandle()方法中 继续回到read()方法 我们跟进reset中 前文传送门 :客户端接入流程初始化源码分析 上一小节我们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端连接事件的处理 处理接入事件创建handle 回到上一章NioEventLoop的processSelectedKey ()方法 private void processS

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式ByteBuf的分类方式源码解析

    目录 ByteBuf根据不同的分类方式 会有不同的分类结果 1.Pooled和Unpooled 2.基于直接内存的ByteBuf和基于堆内存的ByteBuf 3.safe和unsafe 上一小节简单介绍了AbstractByteBuf这个抽象类, 这一小节对其子类的分类做一个简单的介绍 ByteBuf根据不同的分类方式 会有不同的分类结果 我们首先看第一种分类方式 1.Pooled和Unpooled pooled是从一块内存里去取一段连续内存封装成byteBuf 具体标志是类名以Pooled开头

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • java底层AQS实现类kReentrantLock锁的构成及源码解析

    目录 引导语 1.类注释 2.类结构 3.构造器 4.Sync同步器 4.1.nonfairTryAcquire 4.2.tryRelease 5.FairSync公平锁 6.NonfairSync非公平锁 7.如何串起来 7.1lock加锁 7.2tryLock尝试加锁 7.3unlock释放锁 7.4Condition 8.总结 引导语 本章的描述思路是先描述清楚 ReentrantLock 的构成组件,然后使用加锁和释放锁的方法把这些组件串起来. 1.类注释 ReentrantLock 中

随机推荐