Java使用Redisson分布式锁实现原理

1. 基本用法

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.8.2</version>
</dependency>
Config config = new Config();
config.useClusterServers()
  .setScanInterval(2000) // cluster state scan interval in milliseconds
  .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
  .addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("anyLock");

lock.lock();

try {
  ...
} finally {
  lock.unlock();
}

针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的

Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法

更多请参见https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

2. 加锁

可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义

代码如下:

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

最后两个参数分别是keys和params

实际调用是这样的:

单独将调用的那一段摘出来看

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
         "if (redis.call('exists', KEYS[1]) == 0) then " +
           "redis.call('hset', KEYS[1], ARGV[2], 1); " +
           "redis.call('pexpire', KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
           "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
           "redis.call('pexpire', KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "return redis.call('pttl', KEYS[1]);",
          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,这段脚本的意思是

  1、判断有没有一个叫“abc”的key

  2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回“abc”的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

3. 解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
      "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; " +
      "end;" +
      "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
        "return nil;" +
      "end; " +
      "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
      "if (counter > 0) then " +
        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
        "return 0; " +
      "else " +
        "redis.call('del', KEYS[1]); " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; "+
      "end; " +
      "return nil;",
      Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

我们还是假设name=abc,假设线程ID是Thread-1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面脚本的意思是:

  1、判断是否存在一个叫“abc”的key

  2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

  3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

  4、若字段不存在,返回空,若字段存在,则字段值减1

  5、若减完以后,字段值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

4. 等待

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }

  //  订阅
  RFuture<RedissonLockEntry> future = subscribe(threadId);
  commandExecutor.syncSubscription(future);

  try {
    while (true) {
      ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }

      // waiting for message
      if (ttl >= 0) {
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
        getEntry(threadId).getLatch().acquire();
      }
    }
  } finally {
    unsubscribe(future, threadId);
  }
//    get(lockAsync(leaseTime, unit));
}

protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
  return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
  PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

5. 小结

6. 其它相关

《基于Redis的分布式锁的简单实现》

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java编程redisson实现分布式锁代码示例

    最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题. 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getL

  • 详解Java如何实现基于Redis的分布式锁

    前言 单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式.其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了. 我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现.这个Lock提供了5个lock方法的变体,可以

  • java基于jedisLock—redis分布式锁实现示例代码

    分布式锁是啥? 单机锁的概念:我们正常跑的单机项目(也就是在tomcat下跑一个项目不配置集群)想要在高并发的时候加锁很容易就可以搞定,java提供了很多的机制例如:synchronized.volatile.ReentrantLock等锁的机制. 为啥需要分布式锁:当我们的项目比较庞大的时候,单机版的项目已经不能满足吞吐量的需求了,需要对项目做负载均衡,有可能还需要对项目进行解耦拆分成不同的服务,那么肯定是做成分布式的项目,分布式的项目因为是不同的程序控制,所以使用java提供的锁并不能完全保

  • redis中使用java脚本实现分布式锁

    redis被大量用在分布式的环境中,自然而然分布式环境下的锁如何解决,立马成为一个问题.例如我们当前的手游项目,服务器端是按业务模块划分服务器的,有应用服,战斗服等,但是这两个vm都有可能同时改变玩家的属性,这如果在同一个vm下面,就很容易加锁,但如果在分布式环境下就没那么容易了,当然利用redis现有的功能也有解决办法,比如redis的脚本. redis在2.6以后的版本中增加了Lua脚本的功能,可以通过eval命令,直接在RedisServer环境中执行Lua脚本,并且可以在Lua脚本中调用

  • java语言描述Redis分布式锁的正确实现方式

    分布式锁一般有三种实现方式:1.数据库乐观锁:2.基于Redis的分布式锁:3.基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁. 不会发生死锁.即使有一个客户端在持有锁的期间

  • Java Redis分布式锁的正确实现方式详解

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁. 不会发生死锁.即使有一个客户端在

  • Java使用Redisson分布式锁实现原理

    1. 基本用法 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency> Config config = new Config(); config.useClusterServers() .setScanInterval(2000) /

  • Java实现redis分布式锁的三种方式

    目录 一.引入原因 二.分布式锁实现过程中的问题 问题一:异常导致锁没有释放 问题二:获取锁与设置过期时间操作不是原子性的 问题三:锁过期之后被别的线程重新获取与释放 问题四:锁的释放不是原子性的 问题五:其他的问题? 三.具体实现 1. RedisTemplate 2. RedisLockRegistry 3. 使用redisson实现分布式锁 一.引入原因 在分布式服务中,常常有如定时任务.库存更新这样的场景. 在定时任务中,如果不使用quartz这样的分布式定时工具,只是简单的使用定时器来

  • Redisson分布式锁之加解锁详解

    目录 引言 锁的可重入性 加锁 锁续命 释放锁 引言 2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问道项目中关于分布式锁的应用,后涉及Redisson实现分布式锁的原理,答不上来. 锁的可重入性 我们都知道,Java中synchronized和lock都支持可重入,synchronized的锁关联一个线程持有者和一个计数器.当一个线程请求成功后,JVM会记下持有锁的线程,并将计数器计为1.此时其他线程请求该锁,则必须等待.而该持

  • Redisson分布式锁源码解析

    Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等.加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁. 下面我们对其加锁.解锁过程中的源码细节进行一一分析. 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws

  • 基于redis实现分布式锁的原理与方法

    前言 系统的不断扩大,分布式锁是最基本的保障.与单机的多线程不一样的是,分布式跨多个机器.线程的共享变量无法跨机器. 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 今天我们介绍通过redis实现分布式锁.实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布

  • redisson分布式锁的用法大全

    Redisson是Redis官方推荐的Java版的Redis客户端.它提供的功能非常多,此处我们只用它的分布式锁功能. 以springboot整合Redisson项目为例 添加springboot maven依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15

  • 使用自定义注解实现redisson分布式锁

    目录 自定义注解实现redisson分布式锁 自定义注解 aop解析注解 service中使用注解加锁使用 redisson分布式锁应用 应用场景 Redisson管理类 分布式锁 测试类 自定义注解实现redisson分布式锁 自定义注解 package com.example.demo.annotation; import java.lang.annotation.*; /** * desc: 自定义 redisson 分布式锁注解 * * @author: 邢阳 * @mail: xyde

  • 详解Spring Cache使用Redisson分布式锁解决缓存击穿问题

    目录 1 什么是缓存击穿 2 为什么要使用分布式锁 3 什么是Redisson 4 Spring Boot集成Redisson 4.1 添加maven依赖 4.2 配置yml 4.3 配置RedissonConfig 5 使用Redisson的分布式锁解决缓存击穿 1 什么是缓存击穿 一份热点数据,它的访问量非常大.在其缓存失效的瞬间,大量请求直达存储层,导致服务崩溃. 2 为什么要使用分布式锁 在项目中,当共享资源出现竞争情况的时候,为了防止出现并发问题,我们一般会采用锁机制来控制.在单机环境

  • 详解Redis分布式锁的原理与实现

    目录 前言 使用场景 为什么要使用分布式锁 如何使用分布式锁 流程图 分布式锁的状态 分布式锁的特点 分布式锁的实现方式(以redis分布式锁实现为例) 总结 前言 在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁.在分布式架构中,我们同样会遇到数据共享操作问题,此时,我们就需要分布式锁来解决问题,下面我们一起聊聊使用redis来实现分布式锁. 使用场景 库存超卖 比如 5个笔记本 A 看 准备买3个 B 买2个 C 4个 一下单 3+2+4 =9

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

随机推荐