Redisson分布式信号量RSemaphore的使用超详细讲解

目录
  • 一、RSemaphore的使用
  • 二、RSemaphore设置许可数量
  • 三、RSemaphore的加锁流程
  • 四、RSemaphore的解锁流程

本篇文章基于redisson-3.17.6版本源码进行分析

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置5个许可,模拟五个停车位
    rSemaphore.trySetPermits(5);
    // 创建10个线程,模拟10辆车过来停车
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入停车场...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "离开停车场...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }
    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore设置许可数量

初始化RSemaphore,需要调用trySetPermits()设置许可数量:

/**
 * 尝试设置许可数量,设置成功,返回true,否则返回false
 */
boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync():

// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断分布式信号量的key是否存在,如果不存在,才设置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String数据结构设置信号量的许可数
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 发布一条消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 设置成功,返回1
                    + "return 1;"
                    + "end;"
                    // 否则返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:

参数说明:

  • KEYS[1]: 我们指定的分布式信号量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 释放锁的channel名称,redisson_sc:{分布式信号量key},在本例中,就是redisson_sc:{semaphore}
  • ARGV[1]: 设置的许可数量

总结设置许可执行流程为:

  • get semaphore,获取到semaphore信号量的当前的值
  • 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3。(注意到,如果之前设置过了信号量,将无法再次设置,直接返回0。想要更改信号量总数可以使用addPermits方法)
  • 然后redis发布一些消息,返回1

三、RSemaphore的加锁流程

许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。

public void acquire() throws InterruptedException {
    acquire(1);
}
public void acquire(int permits) throws InterruptedException {
    // 尝试获取锁成功,直接返回
    if (tryAcquire(permits)) {
        return;
    }
    // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不断循环尝试获取许可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            entry.getLatch().acquire();
        }
    } finally {
        // 取消订阅
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:

// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 获取当前剩余的许可数量
              "local value = redis.call('get', KEYS[1]); " +
              // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通过decrby减少剩余可用许可
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1
                  "return 1; " +
              "end; " +
              // 其它情况,返回0
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;

总结加锁执行流程为:

  • get semaphore,获取到一个当前的值,比如说是3,3 > 1
  • decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 执行3次加锁后,semaphore值为0
  • 此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通过incrby增加许可数量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 发布一条消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

到此这篇关于Redisson分布式信号量RSemaphore的使用超详细讲解的文章就介绍到这了,更多相关Redisson RSemaphore内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redisson公平锁的源码解读分享

    目录 前言 公平锁 加锁 解锁 总结 前言 我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁.下面是官方原话: 它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程.所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒. 源码版本:3.17.7 这是我 fork 的分支,添加了自己理解的中文注释:https://g

  • Redisson分布式锁的源码解读分享

    目录 前言 前置知识 分布式锁的思考 Redis订阅/发布机制 Redisson 加锁 订阅 解锁 看门狗 前言 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).Redisson有一样功能是可重入的分布式锁.本文来讨论一下这个功能的特点以及源码分析. 前置知识 在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工. 分布式锁的思考 首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备

  • Redisson分布式信号量RSemaphore的使用超详细讲解

    目录 一.RSemaphore的使用 二.RSemaphore设置许可数量 三.RSemaphore的加锁流程 四.RSemaphore的解锁流程 本篇文章基于redisson-3.17.6版本源码进行分析 一.RSemaphore的使用 @Test public void testRSemaphore() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379&quo

  • Mysql超详细讲解死锁问题的理解

    目录 1.什么是死锁? 2.Mysql出现死锁的必要条件 资源独占条件 请求和保持条件 不剥夺条件 相互获取锁条件 3. Mysql经典死锁案例 3.1 建表语句 3.2 初始化相关数据 3.3 正常转账过程 3.4 死锁转账过程 3.5 死锁导致的问题 4.如何解决死锁问题? 4.1 打破请求和保持条件 4.2 打破相互获取锁条件(推荐) 5.总结 1.什么是死锁? 死锁指的是在两个或两个以上不同的进程或线程中,由于存在共同资源的竞争或进程(或线程)间的通讯而导致各个线程间相互挂起等待,如果没

  • 超详细讲解Linux C++多线程同步的方式

    目录 一.互斥锁 1.互斥锁的初始化 2.互斥锁的相关属性及分类 3,测试加锁函数 二.条件变量 1.条件变量的相关函数 1)初始化的销毁读写锁 2)以写的方式获取锁,以读的方式获取锁,释放读写锁 四.信号量 1)信号量初始化 2)信号量值的加减 3)对信号量进行清理 背景问题:在特定的应用场景下,多线程不进行同步会造成什么问题? 通过多线程模拟多窗口售票为例: #include <iostream> #include<pthread.h> #include<stdio.h&

  • Redis超详细讲解高可用主从复制基础与哨兵模式方案

    目录 高可用基础---主从复制 主从复制的原理 主从复制配置 示例 1.创建Redis实例 2.连接数据库并设置主从复制 高可用方案---哨兵模式sentinel 哨兵模式简介 哨兵工作原理 哨兵故障修复原理 sentinel.conf配置讲解 哨兵模式的优点 哨兵模式的缺点 高可用基础---主从复制 Redis的复制功能是支持将多个数据库之间进行数据同步,主数据库可以进行读写操作.当主数据库数据发生改变时会自动同步到从数据库,从数据库一般是只读的,会接收注数据库同步过来的数据. 一个主数据库可

  • SpringCloud超详细讲解Feign声明式服务调用

    目录 入门案例 @FeignClient注解详解 Feign Client的配置 Feign请求添加headers 负载均衡 (Ribbon) 容错机制 Hystrix支持 Sentinel支持 Feign开启容错机制支持后的使用方式 请求压缩feign.compression 日志级别 入门案例 在服务消费者导入依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>

  • Redis缓存实例超详细讲解

    目录 1 前言 1.1 什么是缓存 1.2 缓存的作用及成本 1.3 Redis缓存模型 2 给商户信息添加缓存 3 缓存更新策略 3.1 更新策略介绍 3.2 主动更新策略 3.3 主动更新策略练习 4 缓存穿透及其解决方案 4.1 缓存穿透的概念 4.2 解决方案及实现 5 缓存雪崩的概念及其解决方案 6 缓存击穿及解决方案 6.1 什么是缓存击穿 6.2 缓存击穿解决方法 6.2.1 互斥锁 6.2.2 逻辑过期 1 前言 1.1 什么是缓存 缓存就是数据交换的缓冲区(称作Cache [

  • Python多进程并发与同步机制超详细讲解

    目录 多进程 僵尸进程 Process类 函数方式 继承方式 同步机制 状态管理Managers 在<多线程与同步>中介绍了多线程及存在的问题,而通过使用多进程而非线程可有效地绕过全局解释器锁. 因此,通过multiprocessing模块可充分地利用多核CPU的资源. 多进程 多进程是通过multiprocessing包来实现的,multiprocessing.Process对象(和多线程的threading.Thread类似)用来创建一个进程对象: 在类UNIX平台上,需要对每个Proce

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • java反射超详细讲解

    目录 Java反射超详解✌ 1.反射基础 1.1Class类 1.2类加载 2.反射的使用 2.1Class对象的获取 2.2Constructor类及其用法 2.4Method类及其用法 Java反射超详解✌ 1.反射基础 Java反射机制是在程序的运行过程中,对于任何一个类,都能够知道它的所有属性和方法:对于任意一个对象,都能够知道它的任意属性和方法,这种动态获取信息以及动态调用对象方法的功能称为Java语言的反射机制. Java反射机制主要提供以下这几个功能: 在运行时判断任意一个对象所属

  • 超详细讲解Linux DHCP服务

    目录 一.DHCP服务(动态主机配置协议) 1.背景 2.概述 3.优点 4.DHCP报文类型 5.DHCP 的分配方式 二.安装 DHCP 服务器 1.DHCP 服务软件 2.主配置文件 三.配置步骤 1.使用 DHCP 动态的给 PC 机分配 IP 地址 ① eNSP ②虚拟机 ③验证 ④进入命令行"ipconfig"测试 一.DHCP服务(动态主机配置协议) 1.背景 1.手动设置工作量大且容易冲突 2.用DHCP可以减少工作量和避免地址冲突 2.概述 作用:为局域网内的电脑分配

随机推荐