巧用Redis实现分布式锁详细介绍

目录
  • 前言
  • 手写Redis分布式锁
  • Redisson
    • lock()
    • lock(long leaseTime, TimeUnit unit)
    • tryLock(long waitTime, long leaseTime, TimeUnit unit)
  • RedLock红锁
  • 总结

前言

无论是synchronized还是Lock,都运行在线程级别上,必须运行在同一个JVM中。如果竞争资源的进程不在同一个JVM中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问。

分布式锁的实现一般有三种方式,使用MySql数据库行锁,基于Redis的分布式锁,以及基于Zookeeper的分布式锁。本文中我们重点看一下Redis如何实现分布式锁。

首先,看一下用于实现分布式锁的两个Redis基础命令:

setnx key value

这里的setnx,是"set if Not eXists"的缩写,表示当指定的key值不存在时,为key设定值为value。如果key存在,则设定失败。

setex key timeout value

setex命令为指定的key设置值及其过期时间(以秒为单位)。如果key已经存在,setex命令将会替换旧的值。

基于这两个指令,我们能够实现:

使用setnx 命令,保证同一时刻只有一个线程能够获取到锁使用setex 命令,保证锁会超期释放,从而不因一个线程长期占有一个锁而导致死锁。

这里将两个命令结合在一起使用的原因是,在正常情况下,如果只使用setnx 命令,使用完成后使用delete命令删除锁进行释放,不存在什么问题。但是如果获取分布式锁的线程在运行中挂掉了,那么锁将不被释放。如果使用setex 设置了过期时间,即使线程挂掉,也可以自动进行锁的释放。

手写Redis分布式锁

接下来,我们基于Redis+Spring手写实现一个分布式锁。首先配置Jedis连接池:

@Configuration
public class Config {
    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMinIdle(1);
        jedisPoolConfig.setMaxWaitMillis(2000);
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestOnReturn(true);
        JedisPool jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
        return  jedisPool;
    }
}

实现RedisLock分布式锁:

public class RedisLock implements Lock {
    @Autowired
    JedisPool jedisPool;

    private static final String key = "lock";
    private ThreadLocal<String> threadLocal = new ThreadLocal<>();

    @Override
    public void lock() {
        boolean b = tryLock();
        if (b) {
            return;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (Exception e) {
            e.printStackTrace();
        }
        lock();//递归调用
    }

    @Override
    public boolean tryLock() {
        SetParams setParams = new SetParams();
        setParams.ex(10);
        setParams.nx();
        String s = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        String lock = resource.set(key, s, setParams);
        resource.close();
        if ("OK".equals(lock)) {
            threadLocal.set(s);
            return true;
        }
        return false;
    }

    //解锁判断锁是不是自己加的
    @Override
    public void unlock(){
        //调用lua脚本解锁
        String script="if redis.call(\"get\",KEYS[1]==ARGV[1] then\n"+
                "   return redis.call(\"del\",KEYS[1])\n"+
                "else\n"+
                "   return 0\n"+
                "end";
        Jedis resource = jedisPool.getResource();
        Object eval=resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));
        if (Integer.valueOf(eval.toString())==0){
            resource.close();
            throw new RuntimeException("解锁失败");
        }
        /*
        *不写成下面这种也是因为不是原子操作,和ex、nx相同
        String s = resource.get(key);
        if (threadLocal.get().equals(s)){
            resource.del(key);
        }
        */
        resource.close();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

简单对上面代码中需要注意的地方做一解释:

加锁过程中,使用SetParams 同时设置nx和ex的值,保证原子操作通过ThreadLocal保存key对应的value,通过value来判断锁是否当前线程自己加的,避免线程错乱解锁释放锁的过程中,使用lua脚本进行删除,保证Redis在执行此脚本时不执行其他操作,从而保证操作的原子性

但是,这段手写的代码可能会存在一个问题,就是不能保证业务逻辑一定能被执行完成,因为设置了锁的过期时间可能导致过期。

Redisson

基于上面存在的问题,我们可以使用Redisson分布式可重入锁。Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。

引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.7</version>
</dependency>

配置RedissonClient,然后我们对常用方法进行测试。

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient= Redisson.create(config);
        return redissonClient;
    }
}

lock()

先写一个测试接口:

@GetMapping("/lock")
public String test() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock();
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    lock.unlock();
    System.out.println(Thread.currentThread().getName()+ " release lock");

   return "locked";
}

进行测试,同时发送两个请求,redisson锁生效:

lock(long leaseTime, TimeUnit unit)

Redisson可以给lock()方法提供leaseTime参数来指定加锁的时间,超过这个时间后锁可以自动释放。测试接口:

@GetMapping("/lock2")
public String test2() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock(10,TimeUnit.SECONDS);
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+ " release lock");
    return "locked";
}

运行结果:

可以看出,在第一个线程还没有执行完成时,就释放了redisson锁,第二个线程进入后,两个线程可以同时执行被锁住的代码逻辑。这样可以实现无需调用unlock方法手动解锁。

tryLock(long waitTime, long leaseTime, TimeUnit unit)

tryLock方法会尝试加锁,最多等待waitTime秒,上锁以后过leaseTime秒自动解锁;如果没有等待时间,锁不住直接返回false。

@GetMapping("/lock3")
public String test3() {
    RLock lock = redissonClient.getLock("lock");
    try {
        boolean res = lock.tryLock(5, 30, TimeUnit.SECONDS);
        if (res){
            try{
                System.out.println(Thread.currentThread().getName()+" 获取到锁,返回true");
                System.out.println("do something");
                TimeUnit.SECONDS.sleep(20);
            }finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName()+" 释放锁");
            }
        }else {
            System.out.println(Thread.currentThread().getName()+" 未获取到锁,返回false");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "lock";
}

运行结果:

可见在第一个线程获得锁后,第二个线程超过等待时间仍未获得锁,返回false放弃获得锁的过程。

除了以上单机Redisson锁以外,还支持我们之前提到过的哨兵模式和集群模式,只需要改变Config的配置即可。以集群模式为例:

@Bean
public RedissonClient redissonClient(){
    Config config=new Config();
    config.useClusterServers().addNodeAddress("redis://172.20.5.170:7000")
        .addNodeAddress("redis://172.20.5.170:7001")
        .addNodeAddress("redis://172.20.5.170:7002")
        .addNodeAddress("redis://172.20.5.170:7003")
        .addNodeAddress("redis://172.20.5.170:7004")
        .addNodeAddress("redis://172.20.5.170:7005");
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
}

RedLock红锁

下面介绍一下Redisson红锁RedissonRedLock,该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RedissonRedLock针对的多个Redis节点,这多个节点可以是集群,也可以不是集群。当我们使用RedissonRedLock时,只要在大部分节点上加锁成功就算成功。看一下使用:

@GetMapping("/testRedLock")
public void testRedLock() {
    Config config1 = new Config();
    config1.useSingleServer().setAddress("redis://172.20.5.170:6379");
    RedissonClient redissonClient1 = Redisson.create(config1);

    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://172.20.5.170:6380");
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://172.20.5.170:6381");
    RedissonClient redissonClient3 = Redisson.create(config3);

    String resourceName = "REDLOCK";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);

    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    try {
        isLock = redLock.tryLock(5, 30, TimeUnit.SECONDS);
        if (isLock) {
            System.out.println("do something");
            TimeUnit.SECONDS.sleep(20);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

相对于单Redis节点来说,RedissonRedLock的优点在于防止了单节点故障造成整个服务停止运行的情况;并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。使用RedissonRedLock,性能方面会比单节点Redis分布式锁差一些,但可用性比普通锁高很多。

总结

到此这篇关于巧用Redis实现分布式锁详细介绍的文章就介绍到这了,更多相关Redis分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redis实现分布式锁的实例讲解

    在一个分布式系统中,会遇到一些需要对多个节点共享的资源加锁的情况,这个时候需要用到分布式锁.分布式锁通常保存在一个共享的存储系统中,可以被多个节点共享和访问. 锁的本质 简单来讲,锁可以用一个变量来表示.比如,在一个单机多线程的程序来说,某个资源的锁用一个 bit 的数据就可以表示.即 0 表示没有资源可以访问,1 表示资源的锁已被别的线程获取,不能访问. 获取和释放特定资源的锁,本质上就是为获取和修改这个变量的值.如果值是 0 则将其修改为 1,就完成了获取的过程,如果访问到的值不是 0,则获

  • Redis分布式锁详细介绍

    目录 分布式锁 redis实现分布式锁的原理 死锁问题 超时问题 锁误放问题 可重入性 Redlock 分布式锁 在单进程应用中,当一段代码同一时间内只能由一个线程执行时, 多线程下可能会出错,例如两个线程同时对一个数字做累加,两个线程同时拿到了该数字,例如40,一个线程加了10,一个线程加了20,正确结果应该是70, 但由于两个线程在自己的内存中一个算出的是50,一个算出的是60,此时二者都将自己的结果往该数字原本的地方写(保存), 这时候,肯定会有一个线程的值会被覆盖,因为读取->计算->

  • redis分布式锁解决表单重复提交的问题

    假如用户的网速慢,用户点击提交按钮,却因为网速慢,而没有跳转到新的页面,这时的用户会再次点击提交按钮,举个例子:用户点击订单页面,当点击提交按钮的时候,也许因为网速的原因,没有跳转到新的页面,这时的用户会再次点击提交按钮,如果没有经过处理的话,这时用户就会生成两份订单,类似于这种场景都叫重复提交. 使用redis的setnx和getset命令解决表单重复提交的问题. 1.引入redis依赖和aop依赖 <dependency> <groupId>org.springframewor

  • 带你轻松掌握Redis分布式锁

    目录 1. 什么是分布式锁 2. 分布式锁该具备的特性 3. 基于数据库做分布式锁 4. 基于Redis做分布式锁 4.1 超时问题 4.2 可重入锁 4.3 集群环境的缺陷 4.4 Redlock 目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题. 基于 CAP理论,任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项. 我们为

  • 巧用Redis实现分布式锁详细介绍

    目录 前言 手写Redis分布式锁 Redisson lock() lock(long leaseTime, TimeUnit unit) tryLock(long waitTime, long leaseTime, TimeUnit unit) RedLock红锁 总结 前言 无论是synchronized还是Lock,都运行在线程级别上,必须运行在同一个JVM中.如果竞争资源的进程不在同一个JVM中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问. 分布式锁的实现一般

  • Golang分布式锁详细介绍

    目录 进程内加锁 trylock 基于redis的setnx 基于zk 基于etcd redlock 如何选择 在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区.为什么需要加锁呢?可以看看下段代码: package main import ( "sync" ) // 全局变量 var counter int func main() { var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go f

  • Redis实现分布式锁方法详细

    目录 1. 单机数据一致性 2. 分布式数据一致性 3. Redis实现分布式锁 3.1 方式一 3.2 方式二(改进方式一) 3.3 方式三(改进方式二) 3.4 方式四(改进方式三) 3.5 方式五(改进方式四) 3.6 小结 在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁. 在分布式架构中,我们同样会遇到数据共享操作问题,本文章使用Redis来解决分布式架构中的数据一致性问题. 1. 单机数据一致性 单机数据一致性架构如下图所示:多个可客户访

  • python如何使用Redis构建分布式锁

    这篇文章主要介绍了python如何使用Redis构建分布式锁,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在实际应用场景中,我们可能有多个worker,可能在一台机器,也可能分布在不同的机器,但只有一个worker可以同时持有一把锁,这个时候我们就需要用到分布式锁了. 这里推荐python的实现库,Redlock-py(Python 实现). 正常情况下,worker获得锁后,处理自己的任务,完成后自动释放持有的锁,是不是感觉有点熟悉,很容易

  • Redis实现分布式锁的五种方法详解

    目录 1. 单机数据一致性 2. 分布式数据一致性 3. Redis实现分布式锁 3.1 方式一 3.2 方式二(改进方式一) 3.3 方式三(改进方式二) 3.4 方式四(改进方式三) 3.5 方式五(改进方式四) 3.6 小结 在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁. 在分布式架构中,我们同样会遇到数据共享操作问题,本文章使用Redis来解决分布式架构中的数据一致性问题. 1. 单机数据一致性 单机数据一致性架构如下图所示:多个可客户访

  • Redis实现分布式锁详解

    目录 一.前言 为什么需要分布式锁? 二.基于redis实现分布式锁 为什么redis可以实现分布式锁? 如何实现? 锁的获取 锁的释放 三.如何避免死锁?锁的过期时间如何设置? 避免死锁 锁过期处理 释放其他服务的锁如何处理呢? 那么redis宕机了呢? 四.RedLock 什么是RedLock? 实现流程 分布式系统中的NPC问题 个人思考 五.基于zookeeper实现分布式锁 什么是zookeeper(zk)? zookeeper节点介绍 zookeeper分布式锁的实现 六.zooke

  • SpringBoot使用Redis实现分布式锁

    前言 在单机应用时代,我们对一个共享的对象进行多线程访问的时候,使用java的synchronized关键字或者ReentrantLock类对操作的对象加锁就可以解决对象的线程安全问题. 分布式应用时代这个方法却行不通了,我们的应用可能被部署到多台机器上,运行在不同的JVM里,一个对象可能同时存在多台机器的内存中,怎样使共享对象同时只被一个线程处理就成了一个问题. 在分布式系统中为了保证一个对象在高并发的情况下只能被一个线程使用,我们需要一种跨JVM的互斥机制来控制共享资源的访问,此时就需要用到

  • 基于redis实现分布式锁的原理与方法

    前言 系统的不断扩大,分布式锁是最基本的保障.与单机的多线程不一样的是,分布式跨多个机器.线程的共享变量无法跨机器. 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 今天我们介绍通过redis实现分布式锁.实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布

  • SpringBoot中使用redis做分布式锁的方法

    一.模拟问题 最近在公司遇到一个问题,挂号系统是做的集群,比如启动了两个相同的服务,病人挂号的时候可能会出现同号的情况,比如两个病人挂出来的号都是上午2号.这就出现了问题,由于是集群部署的,所以单纯在代码中的方法中加锁是不能解决这种情况的.下面我将模拟这种情况,用redis做分布式锁来解决这个问题. 1.新建挂号明细表 2.在idea上新建项目 下图是创建好的项目结构,上面那个parent项目是其他项目不用管它,和新建的没有关系 3.开始创建controller,service,dao(mapp

随机推荐