基于redis分布式锁实现秒杀功能

最近在项目中遇到了类似“秒杀”的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。

业务场景

所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。

一些可能的实现

刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:
1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;
2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;
3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。

上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。

那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。

何为分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。
通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。
而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。

具体的实现

先来看看一些redis的基本命令:
SETNX key value
如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
设置key的过期时间。如果key已过期,将会被自动删除。
del KEY
删除key
由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考redis官网。

需要考虑的问题

1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。
2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。
3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。
4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。
5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。

talk is cheap,show me the code

在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。

几个注解定义:

cachelock是方法级的注解,用于注解会产生并发问题的方法:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
 String lockedPrefix() default "";//redis 锁key的前缀
 long timeOut() default 2000;//轮询锁的时间
 int expireTime() default 1000;//key在redis里存在的时间,1000S
}

lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
 //不需要值
}

LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
 String field() default "";//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID

}

CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:

public class CacheLockInterceptor implements InvocationHandler{
 public static int ERROR_COUNT = 0;
 private Object proxied;

 public CacheLockInterceptor(Object proxied) {
 this.proxied = proxied;
 }

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

 CacheLock cacheLock = method.getAnnotation(CacheLock.class);
 //没有cacheLock注解,pass
 if(null == cacheLock){
  System.out.println("no cacheLock annotation");
  return method.invoke(proxied, args);
 }
 //获得方法中参数的注解
 Annotation[][] annotations = method.getParameterAnnotations();
 //根据获取到的参数注解和参数列表获得加锁的参数
 Object lockedObject = getLockedObject(annotations,args);
 String objectValue = lockedObject.toString();
 //新建一个锁
 RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
 //加锁
 boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
 if(!result){//取锁失败
  ERROR_COUNT += 1;
  throw new CacheLockException("get lock fail");

 }
 try{
  //加锁成功,执行方法
  return method.invoke(proxied, args);
 }finally{
  lock.unlock();//释放锁
 }

 }
 /**
 *
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
 private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
 if(null == args || args.length == 0){
  throw new CacheLockException("方法参数为空,没有被锁定的对象");
 }

 if(null == annotations || annotations.length == 0){
  throw new CacheLockException("没有被注解的参数");
 }
 //不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
 int index = -1;//标记参数的位置指针
 for(int i = 0;i < annotations.length;i++){
  for(int j = 0;j < annotations[i].length;j++){
  if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
   index = i;
   try {
   return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
   } catch (NoSuchFieldException | SecurityException e) {
   throw new CacheLockException("注解对象中没有该属性" + ((LockedComplexObject)annotations[i][j]).field());
   }
  }

  if(annotations[i][j] instanceof LockedObject){
   index = i;
   break;
  }
  }
  //找到第一个后直接break,不支持多参数加锁
  if(index != -1){
  break;
  }
 }

 if(index == -1){
  throw new CacheLockException("请指定被锁定参数");
 }

 return args[index];
 }
}

最关键的RedisLock类中的lock方法和unlock方法:

/**
 * 加锁
 * 使用方式为:
 * lock();
 * try{
 * executeMethod();
 * }finally{
 * unlock();
 * }
 * @param timeout timeout的时间范围内轮询锁
 * @param expire 设置锁超时时间
 * @return 成功 or 失败
 */
 public boolean lock(long timeout,int expire){
 long nanoTime = System.nanoTime();
 timeout *= MILLI_NANO_TIME;
 try {
  //在timeout的时间范围内不断轮询锁
  while (System.nanoTime() - nanoTime < timeout) {
  //锁不存在的话,设置锁并设置锁过期时间,即加锁
  if (this.redisClient.setnx(this.key, LOCKED) == 1) {
   this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
   //锁的情况下锁过期后消失,不会造成永久阻塞
   this.lock = true;
   return this.lock;
  }
  System.out.println("出现锁等待");
  //短暂休眠,避免可能的活锁
  Thread.sleep(3, RANDOM.nextInt(30));
  }
 } catch (Exception e) {
  throw new RuntimeException("locking error",e);
 }
 return false;
 }

 public void unlock() {
 try {
  if(this.lock){
  redisClient.delKey(key);//直接删除
  }
 } catch (Throwable e) {

 }
 }

上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。

先定义一个接口,接口里定义了一个秒杀方法:

public interface SeckillInterface {
/**
*现在暂时只支持在接口方法上注解
*/
 //cacheLock注解可能产生并发的方法
 @CacheLock(lockedPrefix="TEST_PREFIX")
 public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
}

上述SeckillInterface接口的实现类,即秒杀的具体实现:

public class SecKillImpl implements SeckillInterface{
 static Map<Long, Long> inventory ;
 static{
 inventory = new HashMap<>();
 inventory.put(10000001L, 10000l);
 inventory.put(10000002L, 10000l);
 }

 @Override
 public void secKill(String arg1, Long arg2) {
 //最简单的秒杀,这里仅作为demo示例
 reduceInventory(arg2);
 }
 //模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
 public Long reduceInventory(Long commodityId){
 inventory.put(commodityId,inventory.get(commodityId) - 1);
 return inventory.get(commodityId);
 }

}

模拟秒杀场景,1000个线程来争抢两个商品:

@Test
 public void testSecKill(){
 int threadCount = 1000;
 int splitPoint = 500;
 CountDownLatch endCount = new CountDownLatch(threadCount);
 CountDownLatch beginCount = new CountDownLatch(1);
 SecKillImpl testClass = new SecKillImpl();

 Thread[] threads = new Thread[threadCount];
 //起500个线程,秒杀第一个商品
 for(int i= 0;i < splitPoint;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   //等待在一个信号量上,挂起
   beginCount.await();
   //用动态代理的方式调用secKill方法
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId1);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();

 }
 //再起500个线程,秒杀第二件商品
 for(int i= splitPoint;i < threadCount;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   //等待在一个信号量上,挂起
   beginCount.await();
   //用动态代理的方式调用secKill方法
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId2);
   //testClass.testFunc("test", 10000001L);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();

 }

 long startTime = System.currentTimeMillis();
 //主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
 beginCount.countDown();

 try {
  //主线程等待结束信号量
  endCount.await();
  //观察秒杀结果是否正确
  System.out.println(SecKillImpl.inventory.get(commidityId1));
  System.out.println(SecKillImpl.inventory.get(commidityId2));
  System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
  System.out.println("total cost " + (System.currentTimeMillis() - startTime));
 } catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }
 }

在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。
这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。
通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。

小结

这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。

源码仓库

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • redis使用watch秒杀抢购实现思路
  • php+redis实现商城秒杀功能
  • yii框架redis结合php实现秒杀效果(实例代码)
  • Docker + Nodejs + Kafka + Redis + MySQL搭建简单秒杀环境
  • php结合redis实现高并发下的抢购、秒杀功能的实例
(0)

相关推荐

  • Docker + Nodejs + Kafka + Redis + MySQL搭建简单秒杀环境

    秒杀活动可以说在互联网上随处可见,从12306抢票,到聚划算抢购,我们生活的方方面面都可以看到秒杀的身影.秒杀的架构设计也是对于一个架构师架构设计能力的一次考验.本文的目的并不在于提供一个可以直接落地的设计方案,而是意在提供一个简单的方法,一个思路,使大家能够对于秒杀背后的一些设计有更感性的认识, 并且可以自己亲自动手实践一下.所有的配置及源码都在本文最后的GitHub repository中可以找到. 首先,先简单介绍下本文中会涉及到的一些组件,如下图所示: JMeter:用JMeter来模拟

  • yii框架redis结合php实现秒杀效果(实例代码)

    废话不多说了,直接给大家贴代码了,具体代码如下所示: <?php namespace backend\controllers; use Yii; use yii\web\Controller; /** * */ class GoodsController extends Controller { public $enableCsrfValidation=false; public function actionInfo() { $data=yii::$app->db->createCom

  • php结合redis实现高并发下的抢购、秒杀功能的实例

    抢购.秒杀是如今很常见的一个应用场景,主要需要解决的问题有两个: 1 高并发对数据库产生的压力 2 竞争状态下如何解决库存的正确减少("超卖"问题) 对于第一个问题,已经很容易想到用缓存来处理抢购,避免直接操作数据库,例如使用Redis. 重点在于第二个问题 常规写法: 查询出对应商品的库存,看是否大于0,然后执行生成订单等操作,但是在判断库存是否大于0处,如果在高并发下就会有问题,导致库存量出现负数 <?php $conn=mysql_connect("localho

  • redis使用watch秒杀抢购实现思路

    本文实例为大家分享了redis使用watch秒杀抢购的具体代码,供大家参考,具体内容如下 1.使用watch,采用乐观锁 2.不使用悲观锁,因为等待时间非常长,响应慢 3.不使用队列,因为并发量会让队列内存瞬间升高 代码: import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import redis.clients.jedis.Jedis; /** * redis测试抢购 * *

  • php+redis实现商城秒杀功能

    好久没来整理文章了,闲了没事写篇文章记录下php+redis实现商城秒杀功能. 1.安装redis,根据自己的php版本安装对应的redis扩展(此步骤简单的描述一下) 1.1.安装php_igbinary.dll,php_redis.dll扩展此处需要注意你的php版本如图: 1.2.php.ini文件新增extension=php_igbinary.dll;extension=php_redis.dll两处扩展 ok此处已经完成第一步redis环境搭建完成看看phpinfo 2.项目中实际使

  • 基于redis分布式锁实现秒杀功能

    最近在项目中遇到了类似"秒杀"的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓"秒杀"的基本思路. 业务场景 所谓秒杀,从业务角度看,是短时间内多个用户"争抢"资源,这里的资源在大部分秒杀场景里是商品:将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确. 一些可能的实现 刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可

  • 基于Redis分布式锁Redisson及SpringBoot集成Redisson

    目录 - 分布式锁需要具备的条件和刚需 - Redisson使用 - SpringBoot集成Redisson - 分布式锁需要具备的条件和刚需 独占性:OnlyOne,任何时刻只能有且仅有一个线程持有 高可用:若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况 防死锁:杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案 不乱抢:防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放 重入性:同一个节点的同一个线程如果获得锁之后,它也可以再次获取这

  • Redis分布式锁解决秒杀超卖问题

    目录 分布式锁应用场景 单体锁的分类 分布式锁核心逻辑 分布式锁实现的问题——死锁和解决 Redis解决删除别人锁的问题 分布式锁应用场景 秒杀环境下:订单服务从库存中心拿到库存数,如果库存总数大于0,则进行库存扣减,并创建订单订单服务负责创建订单库存服务负责扣减库存 模拟用户访问库存 多线程并发访问,出现超卖问题,线程不安全.没有保证原子性 单体锁的分类 单体应用锁指的是只能在 一个JVM 进程内有效的锁.我们把这种锁叫做单体应用锁 synchronized锁ReentrantLock锁一个

  • Go 语言下基于Redis分布式锁的实现方式

    分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 项目地址: https://github.com/Spongecaptain/redisLock 1. Go 原生的互斥锁 Go 原生的互斥锁即 sync 包下的 M

  • 基于Redis分布式锁的实现代码

    概述 目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题.分布式的CAP理论告诉我们"任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项."所以,很多系统在设计之初就要对这三者做出取舍.在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证"最终一致性",只要这

  • Python实现的redis分布式锁功能示例

    本文实例讲述了Python实现的redis分布式锁功能.分享给大家供大家参考,具体如下: #!/usr/bin/env python # coding=utf-8 import time import redis class RedisLock(object): def __init__(self, key): self.rdcon = redis.Redis(host='', port=6379, password="", db=1) self._lock = 0 self.lock

  • 基于springboot实现redis分布式锁的方法

    在公司的项目中用到了分布式锁,但只会用却不明白其中的规则 所以写一篇文章来记录 使用场景:交易服务,使用redis分布式锁,防止重复提交订单,出现超卖问题 分布式锁的实现方式 基于数据库乐观锁/悲观锁 Redis分布式锁(本文) Zookeeper分布式锁 redis是如何实现加锁的? 在redis中,有一条命令,实现锁 SETNX key value 该命令的作用是将 key 的值设为 value ,当且仅当 key 不存在.若给定的 key 已经存在,则 SETNX不做任何动作.设置成功,返

  • redis分布式锁的8大坑总结梳理

    目录 前言 1 非原子操作 2 忘了释放锁 3 释放了别人的锁 4 大量失败请求 5 锁重入问题 6 锁竞争问题 6.1 读写锁 6.2 锁分段 7 锁超时问题 8 主从复制的问题 前言 在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中. 但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对,也会引来一些意想不到的问题. 今天我们就一起聊聊redis分布式锁的一些坑,给有需要的朋友一个参考. 1 非原子操作 使用r

  • Redis分布式锁实例分析讲解

    目录 1 一人一单并发安全问题 2 分布式锁的原理和实现 2.1 什么是分布式锁 2.2 分布式锁的实现 1 一人一单并发安全问题 之前一人一单的业务使用的悲观锁,在分布式系统下,是无法生效的. 理想的情况下是这样的:一个线程成功获取互斥锁,并对查询订单并创建订单,其他线程无法干预.它的原理是会有一个锁监视器,来监听是谁获得了锁. 但是问题就出现在: 分布式系统下,有多个不同的JVM,不同的JVM的环境下,锁监听器是有多个的,就会出现有的线程在别的线程已经拿到锁的情况下,仍然可以获取的到锁. 这

  • 深入理解redis分布式锁和消息队列

    最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了. redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String loc

随机推荐