详解Java如何实现基于Redis的分布式锁

前言

单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了。

我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

public interface Lock {

 /**
 * 阻塞性的获取锁, 不响应中断
 */
 void lock;

 /**
 * 阻塞性的获取锁, 响应中断
 *
 * @throws InterruptedException
 */
 void lockInterruptibly throws InterruptedException;

 /**
 * 尝试获取锁, 获取不到立即返回, 不阻塞
 */
 boolean tryLock;

 /**
 * 超时自动返回的阻塞性的获取锁, 不响应中断
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁
  *
 */
 boolean tryLock(long time, TimeUnit unit);

 /**
 * 超时自动返回的阻塞性的获取锁, 响应中断
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
 * @throws InterruptedException 在尝试获取锁的当前线程被中断
 */
 boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;

 /**
 * 释放锁
 */
 void unlock;

}

看其抽象实现:

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

/**
 * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
 *
 * @author lixiaohui
 *
 */
public abstract class AbstractLock implements Lock {

 /**
 * <pre>
 * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,
 * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性
 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
 * </pre>
 */
 protected volatile boolean locked;

 /**
 * 当前jvm内持有该锁的线程(if have one)
 */
 private Thread exclusiveOwnerThread;

 public void lock {
 try {
 lock(false, 0, null, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 }

 public void lockInterruptibly throws InterruptedException {
 lock(false, 0, null, true);
 }

 public boolean tryLock(long time, TimeUnit unit) {
 try {
 return lock(true, time, unit, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 return false;
 }

 public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
 return lock(true, time, unit, true);
 }

 public void unlock {
 // TODO 检查当前线程是否持有锁
 if (Thread.currentThread != getExclusiveOwnerThread) {
 throw new IllegalMonitorStateException("current thread does not hold the lock");
 }

 unlock0;
 setExclusiveOwnerThread(null);
 }

 protected void setExclusiveOwnerThread(Thread thread) {
 exclusiveOwnerThread = thread;
 }

 protected final Thread getExclusiveOwnerThread {
 return exclusiveOwnerThread;
 }

 protected abstract void unlock0;

 /**
 * 阻塞式获取锁的实现
 *
 * @param useTimeout
 * @param time
 * @param unit
 * @param interrupt 是否响应中断
 * @return
 * @throws InterruptedException
 */
 protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;

}

基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;

/**
 * <pre>
 * 基于Redis的SETNX操作实现的分布式锁
 *
 * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞
 *
 * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a>
 * </pre>
 *
 * @author lixiaohui
 *
 */
public class RedisBasedDistributedLock extends AbstractLock {

 private Jedis jedis;

 // 锁的名字
 protected String lockKey;

 // 锁的有效时长(毫秒)
 protected long lockExpires;

 public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
 this.jedis = jedis;
 this.lockKey = lockKey;
 this.lockExpires = lockExpires;
 }

 // 阻塞式获取锁的实现
 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
 if (interrupt) {
 checkInterruption;
 }

 long start = System.currentTimeMillis;
 long timeout = unit.toMillis(time); // if !useTimeout, then it's useless

 while (useTimeout ? isTimeout(start, timeout) : true) {
 if (interrupt) {
 checkInterruption;
 }

 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);

 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }

 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
 // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了
 if (oldValue != null && isTimeExpired(oldValue)) {
  // TODO 成功获取到锁, 设置相关标识
  locked = true;
  setExclusiveOwnerThread(Thread.currentThread);
  return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }
 }
 return false;
 }

 public boolean tryLock {
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);

 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }

 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
 // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
 if (oldValue != null && isTimeExpired(oldValue)) {
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }

 return false;
 }

 /**
 * Queries if this lock is held by any thread.
 *
 * @return {@code true} if any thread holds this lock and
  *   {@code false} otherwise
 */
 public boolean isLocked {
 if (locked) {
 return true;
 } else {
 String value = jedis.get(lockKey);
 // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
 // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
 // 不是同步控制, 它只是一种锁状态的报告.
 return !isTimeExpired(value);
 }
 }

 @Override
 protected void unlock0 {
 // TODO 判断锁是否过期
 String value = jedis.get(lockKey);
 if (!isTimeExpired(value)) {
 doUnlock;
 }
 }

 private void checkInterruption throws InterruptedException {
 if(Thread.currentThread.isInterrupted) {
 throw new InterruptedException;
 }
 }

 private boolean isTimeExpired(String value) {
 return Long.parseLong(value) < System.currentTimeMillis;
 }

 private boolean isTimeout(long start, long timeout) {
 return start + timeout > System.currentTimeMillis;
 }

 private void doUnlock {
 jedis.del(lockKey);
 }

}

如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)unlock0方法即可(所谓抽象嘛)

测试

模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:

package cc.lixiaohui.lock;

import java.math.BigInteger;
import java.util.concurrent.TimeUnit;

/**
 * 模拟ID生成
 * @author lixiaohui
 *
 */
public class IDGenerator {

 private static BigInteger id = BigInteger.valueOf(0);

 private final Lock lock;

 private static final BigInteger INCREMENT = BigInteger.valueOf(1);

 public IDGenerator(Lock lock) {
 this.lock = lock;
 }

 public String getAndIncrement {
 if (lock.tryLock(3, TimeUnit.SECONDS)) {
 try {
 // TODO 这里获取到锁, 访问临界区资源
 return getAndIncrement0;
 } finally {
 lock.unlock;
 }
 }
 return null;
 //return getAndIncrement0;
 }

 private String getAndIncrement0 {
 String s = id.toString;
 id = id.add(INCREMENT);
 return s;
 }
}

测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该IDset中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

package cc.lixiaohui.DistributedLock.DistributedLock;

import java.util.HashSet;
import java.util.Set;

import org.junit.Test;

import redis.clients.jedis.Jedis;
import cc.lixiaohui.lock.IDGenerator;
import cc.lixiaohui.lock.Lock;
import cc.lixiaohui.lock.RedisBasedDistributedLock;

public class IDGeneratorTest {

 private static Set<String> generatedIds = new HashSet<String>;

 private static final String LOCK_KEY = "lock.lock";
 private static final long LOCK_EXPIRE = 5 * 1000;

 @Test
 public void test throws InterruptedException {
 Jedis jedis1 = new Jedis("localhost", 6379);
 Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g1 = new IDGenerator(lock1);
 IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");

 Jedis jedis2 = new Jedis("localhost", 6379);
 Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g2 = new IDGenerator(lock2);
 IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");

 Thread t1 = new Thread(consume1);
 Thread t2 = new Thread(consume2);
 t1.start;
 t2.start;

 Thread.sleep(20 * 1000); //让两个线程跑20秒

 IDConsumeMission.stop;

 t1.join;
 t2.join;
 }

 static String time {
 return String.valueOf(System.currentTimeMillis / 1000);
 }

 static class IDConsumeMission implements Runnable {

 private IDGenerator idGenerator;

 private String name;

 private static volatile boolean stop;

 public IDConsumeMission(IDGenerator idGenerator, String name) {
 this.idGenerator = idGenerator;
 this.name = name;
 }

 public static void stop {
 stop = true;
 }

 public void run {
 System.out.println(time + ": consume " + name + " start ");
 while (!stop) {
 String id = idGenerator.getAndIncrement;
 if(generatedIds.contains(id)) {
  System.out.println(time + ": duplicate id generated, id = " + id);
  stop = true;
  continue;
 } 

 generatedIds.add(id);
 System.out.println(time + ": consume " + name + " add id = " + id);
 }
 System.out.println(time + ": consume " + name + " done ");
 }

 }

}

说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

测试结果

跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

IDGererator没有加锁(即IDGereratorgetAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

这个1秒都不到:

这个也1秒都不到:

结束语

好了,以上就是Java实现基于Redis的分布式锁的全部内容,各位如果发现问题希望能指正,希望这篇文章能对大家的学习和工作带来一定的帮助,如果有疑问可以留言交流。

(0)

相关推荐

  • Java编程redisson实现分布式锁代码示例

    最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题. 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getL

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • Java分布式锁的三种实现方案

    方案一:数据库乐观锁 乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0. 异常实现流程 -- 可能会发生的异常情况 -- 线程1查询,当前left_count为1,则有记录 select * from t_bonus

  • 详解Java如何实现基于Redis的分布式锁

    前言 单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式.其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了. 我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现.这个Lock提供了5个lock方法的变体,可以

  • 详解基于redis实现分布式锁

    前言 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 原理剖析 上述三种分布式锁都是通过各自为依据对各个请求进行上锁,解锁从而控制放行还是拒绝.redis锁是基于其提供的setnx命令. setnx当且仅当key不存在.若给定key已经存在,则setnx不做任何动作.setnx是一个原子

  • php基于redis的分布式锁实例详解

    在使用分布式锁进行互斥资源访问时候,我们很多方案是采用redis的实现. 固然,redis的单节点锁在极端情况也是有问题的,假设你的业务允许偶尔的失效,使用单节点的redis锁方案就足够了,简单而且效率高. redis锁失效的情况: 客户端1从master节点获取了锁 master宕机了,存储锁的key还没来得及同步到slave节点上 slave升级为master 客户端2从新的master上获取到同一个资源的锁 于是,客户端1和客户端2同事持有了同一个资源的锁,锁的安全性被打破. 如果我们不考

  • Java基于redis实现分布式锁

    为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布式锁都是通过各自为依据对各个请求进行上锁,解锁从而控制放行还是拒绝.redis锁是基于其提供的setnx命令. setnx当且仅当key不存在.若给定key已

  • 详解Java编译优化之循环展开和粗化锁

    循环展开和粗化锁 我们先来回顾一下什么是循环展开. 循环展开就是说,像下面的循环遍历的例子: for (int i = 0; i < 1000; i++) { x += 0x51; } 因为每次循环都需要做跳转操作,所以为了提升效率,上面的代码其实可以被优化为下面的: for (int i = 0; i < 250; i++) { x += 0x144; //0x51 * 4 } 注意上面我们使用的是16进制数字,至于为什么要使用16进制呢?这是为了方便我们在后面的assembly代码中快速找

  • 详解Java ReentrantLock可重入,可打断,锁超时的实现原理

    目录 概述 可重入 可打断 锁超时 概述 前面讲解了ReentrantLock加锁和解锁的原理实现,但是没有阐述它的可重入.可打断以及超时获取锁失败的原理,本文就重点讲解这三种情况.建议大家先看下这篇文章了解下ReentrantLock加锁的基本原理,图解Java ReentrantLock公平锁和非公平锁的实现. 可重入 可重入是指一个线程如果获取了锁,那么它就是锁的主人,那么它可以再次获取这把锁,这种就是理解为重入,简而言之,可以重复获取同一把锁,不会造成阻塞,举个例子如下: @Test p

  • 详解如何在springcloud分布式系统中实现分布式锁

    目录 一.简介 二.redis命令介绍 三.实现思路 四.编码实现 五.注意点 六.参考资料 最近在看分布式锁的资料,看了 Josial L的<Redis in Action>的分布式锁的章节.实现思路是利用springcloud结合redis实现分布式锁. 注意:这篇文章有问题,请看这一篇https://www.jb51.net/article/228819.htm 一.简介 一般来说,对数据进行加锁时,程序先通过acquire获取锁来对数据进行排他访问,然后对数据进行一些列的操作,最后需要

  • 基于Redis实现分布式锁的方法(lua脚本版)

    1.前言 在Java中,我们通过锁来避免由于竞争而造成的数据不一致问题.通常我们使用synchronized .Lock来实现.但是Java中的锁只能保证在同一个JVM进程内中可用,在跨JVM进程,例如分布式系统上则不可靠了. 2.分布式锁 分布式锁,是一种思想,它的实现方式有很多,如基于数据库实现.基于缓存(Redis等)实现.基于Zookeeper实现等等.为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件 互斥性:在任意时刻,只有一个客户端能持有锁. 不会发生死锁:即使客户端

  • Java注解如何基于Redission实现分布式锁

    这篇文章主要介绍了Java注解如何基于Redission实现分布式锁,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.定义注解类 @Target({ ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { //锁名称 String lockName() default ""; /

  • 基于redis实现分布式锁的原理与方法

    前言 系统的不断扩大,分布式锁是最基本的保障.与单机的多线程不一样的是,分布式跨多个机器.线程的共享变量无法跨机器. 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 今天我们介绍通过redis实现分布式锁.实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布

随机推荐