Springboot整合Redis实现超卖问题还原和流程分析(分布式锁)

目录
  • 超卖简单代码
  • 超卖问题
    • 单服务器单应用情况下
    • 设置synchronized
  • Redis实现分布式锁
    • 通过超时间解决上述问题
    • 通过key设置值匹配的方式解决形同虚设问题
  • 最终版

超卖简单代码

写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题。

Redis中存储一项数据信息,请求对应接口,获取商品数量信息;
商品数量信息如果大于0,则扣减1,重新存储Redis中;
运行代码测试问题。

/**
 * Redis数据库操作,超卖问题模拟
 * @author
 *
 */
@RestController
public class RedisController {

	// 引入String类型redis操作模板
	@Autowired
	private StringRedisTemplate stringRedisTemplate;

	// 测试数据设置接口
	@RequestMapping("/setStock")
	public String setStock() {
		stringRedisTemplate.opsForValue().set("stock", "100");
		return "ok";
	}

	// 模拟商品超卖代码
	@RequestMapping("/deductStock")
	public String deductStock() {
		// 获取Redis数据库中的商品数量
		Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
		// 减库存
		if(stock > 0) {
			int realStock = stock -1;
			stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
			System.out.println("商品扣减成功,剩余商品:"+realStock);
		}else {
			System.out.println("库存不足.....");
		}
		return "end";
	}
}

超卖问题

单服务器单应用情况下

在单应用模式下,使用jmeter压测。

测试结果:

每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件

既然线程和线程之间,数据处理不一致,能否使用synchronized加锁测试?

设置synchronized

依旧还是先测试单服务器

    // 模拟商品超卖代码,
	// 设置synchronized同步锁
	@RequestMapping("/deductStock1")
	public String deductStock1() {
		synchronized (this) {
			// 获取Redis数据库中的商品数量
			Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
			// 减库存
			if(stock > 0) {
				int realStock = stock -1;
				stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("商品扣减成功,剩余商品:"+realStock);
			}else {
				System.out.println("库存不足.....");
			}
		}
		return "end";
	}

数量100

重新压测,得到的日志信息如下所示:

在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!

但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?

假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,

依然会出现类似的超卖问题:

synchronized只是针对单一服务器JVM进行加锁,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!


Redis实现分布式锁

在Redis中存在一条命令setnx (set if not exists)

setnx key value
如果不存在key,则可以设置成功;否则设置失败。

修改处理接口,增加key

// 模拟商品超卖代码
	@RequestMapping("/deductStock2")
	public String deductStock2() {
		// 创建一个key,保存至redis
		String key = "lock";
		// setnx
		// 由于redis是一个单线程,执行命令采取“队列”形式排队!
		// 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。
		boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");
		// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
		if (!result) {
			// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
			return "err";
		}

		// 获取Redis数据库中的商品数量
		Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
		// 减库存
		if(stock > 0) {
			int realStock = stock -1;
			stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
			System.out.println("商品扣减成功,剩余商品:"+realStock);
		}else {
			System.out.println("库存不足.....");
		}

        // 程序执行完成,则删除这个key
		stringRedisTemplate.delete(key);

		return "end";
	}

1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。
2、当创建成功后,执行抢购逻辑。
3、抢购逻辑执行完成后,删除数据库中对应的setnxkey。让其他请求能够设置并操作。

这种逻辑来说比之前单一使用syn合理的多,但是如果执行抢购操作中出现了异常,导致这个key无法被删除。以至于其他处理请求,一直无法拿到key,程序逻辑死锁!

可以采取try … finally进行操作

/**
	 * 模拟商品超卖代码 设置
	 *
	 * @return
	 */
	@RequestMapping("/deductStock3")
	public String deductStock3() {
		// 创建一个key,保存至redis
		String key = "lock";
		// setnx
		// 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败
		boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");
		// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
		if (!result) {
			// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
			return "err";
		}

		try {
			// 获取Redis数据库中的商品数量
			Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
			// 减库存
			if (stock > 0) {
				int realStock = stock - 1;
				stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("商品扣减成功,剩余商品:" + realStock);
			} else {
				System.out.println("库存不足.....");
			}
		} finally {
			// 程序执行完成,则删除这个key
			// 放置于finally中,保证即使上述逻辑出问题,也能del掉
			stringRedisTemplate.delete(key);
		}

		return "end";
	}

这个逻辑相比上面其他的逻辑来说,显得更加的严谨。

但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机,导致本该执行finally中的语句未成功执行完成!!同样出现key一直存在,导致死锁

通过超时间解决上述问题

在设置成功setnx后,以及抢购代码逻辑执行前,增加key的限时。

/**
	 * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;<br>
	 * 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;<br>
	 * 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;<br>
	 * 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。
	 *
	 * @return
	 */
	@RequestMapping("/deductStock4")
	public String deductStock4() {
		// 创建一个key,保存至redis
		String key = "lock";
		// setnx
		// 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败
		//boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");

		//让设置key和设置key的有效时间都可以同时执行
		boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS);

		// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
		if (!result) {
			// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
			return "err";
		}
		// 设置key有效时间
		//stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);

		try {
			// 获取Redis数据库中的商品数量
			Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
			// 减库存
			if (stock > 0) {
				int realStock = stock - 1;
				stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("商品扣减成功,剩余商品:" + realStock);
			} else {
				System.out.println("库存不足.....");
			}
		} finally {
			// 程序执行完成,则删除这个key
			// 放置于finally中,保证即使上述逻辑出问题,也能del掉
			stringRedisTemplate.delete(key);
		}

		return "end";
	}

但是上述代码的逻辑中依旧会有问题:

如果处理逻辑中,出现超时问题。
当逻辑执行时,时间超过设定key有效时间,此时会出现什么问题?

从上图可以清楚的发现问题:
如果一个请求执行时间超过了key的有效时间。
新的请求执行过来时,必然可以拿到key并设置时间;
此时的redis中保存的key并不是请求1的key,而是别的请求设置的。
当请求1执行完成后,此处删除key,删除的是别的请求设置的key!

依然出现了key形同虚设的问题!如果失效一直存在,超卖问题依旧不会解决。

通过key设置值匹配的方式解决形同虚设问题

既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!

修改上述代码如下所示:

/**
	 * 模拟商品超卖代码 <br>
	 * 解决`deductStock6`中,key形同虚设的问题。
	 *
	 * @return
	 */
	@RequestMapping("/deductStock5")
	public String deductStock5() {
		// 创建一个key,保存至redis
		String key = "lock";
		String lock_value = UUID.randomUUID().toString();
		// setnx
		//让设置key和设置key的有效时间都可以同时执行
		boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS);
		// 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false
		if (!result) {
			// 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!
			return "err";
		}
		try {
			// 获取Redis数据库中的商品数量
			Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
			// 减库存
			if (stock > 0) {
				int realStock = stock - 1;
				stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("商品扣减成功,剩余商品:" + realStock);
			} else {
				System.out.println("库存不足.....");
			}
		} finally {
			// 程序执行完成,则删除这个key
			// 放置于finally中,保证即使上述逻辑出问题,也能del掉

			// 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除
			if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) {
				stringRedisTemplate.delete(key);
			}
		}
		return "end";
	}

由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁


最终版

我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁

@Component
public class RedisLock {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final long acquireTimeout = 10*1000;    // 获取锁之前的超时时间(获取锁的等待重试时间)
    private final int timeOut = 20;   // 获取锁之后的超时时间(防止死锁)

    @Autowired
    private StringRedisTemplate stringRedisTemplate;  // 引入String类型redis操作模板

    /**
     * 获取分布式锁
     * @return 锁标识
     */
    public boolean getRedisLock(String lockName,String lockValue) {
        // 1.计算获取锁的时间
        Long endTime = System.currentTimeMillis() + acquireTimeout;
        // 2.尝试获取锁
        while (System.currentTimeMillis() < endTime) {
            //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行
            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS);
            if (result) {
                return true;
            }
        }
        return false;
    }

    /**
     * 释放分布式锁
     * @param lockName 锁名称
     * @param lockValue 锁值
     */
    public void unRedisLock(String lockName,String lockValue) {
        if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) {
            stringRedisTemplate.delete(lockName);
        }
    }
}
@RestController
public class RedisController {

	// 引入String类型redis操作模板
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	@Autowired
	private RedisLock redisLock;

	@RequestMapping("/setStock")
	public String setStock() {
		stringRedisTemplate.opsForValue().set("stock", "100");
		return "ok";
	}

	@RequestMapping("/deductStock")
	public String deductStock() {
		// 创建一个key,保存至redis
		String key = "lock";
		String lock_value = UUID.randomUUID().toString();
		try {
			boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁
			if (redisLock)
			{
				// 获取Redis数据库中的商品数量
				Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
				// 减库存
				if (stock > 0) {
					int realStock = stock - 1;
					stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
					System.out.println("商品扣减成功,剩余商品:" + realStock);
				} else {
					System.out.println("库存不足.....");
				}
			}
		} finally {
			redisLock.unRedisLock(key,lock_value);   //释放锁
		}
		return "end";
	}
}

可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束



实际上这个最终版依然存在3个问题

1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。GC操作执行完成后(暂停恢复后),执行del操作,但是此时的key还在当前加锁的key么?

2、问题如图所示

到此这篇关于Springboot整合Redis实现超卖问题还原和分析的文章就介绍到这了,更多相关Springboot整合Redis内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot整合redis进行数据操作(推荐)

    redis是一种常见的nosql,日常开发中,我们使用它的频率比较高,因为它的多种数据接口,很多场景中我们都可以用到,并且redis对分布式这块做的非常好. springboot整合redis比较简单,并且使用redistemplate可以让我们更加方便的对数据进行操作. 1.添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starte

  • springboot整合redis集群过程解析

    简介 在springboot使用搭建好的redis集群 添加redis和连接池依赖 <!--redis连接池 start--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <!--redis连接池 end--> <!--redis start-

  • SpringBoot 整合Redis 数据库的方法

    Redis简介 Redis(官网: https://redis.io )是一个基于内存的日志型可持久化的缓存数据库,保存形式为key-value格式,Redis完全免费开源,它使用ANSI C语言编写.与其他的key - value缓存产品一样,Redis具有以下三个特点. • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用: • Redis不仅支持简单的key-value类型数据,同时还提供字符串.链表.集合.有序集合和哈希等数据结构的存储: • R

  • SpringBoot 整合 Lettuce Redis的实现方法

    SpringBoot 是为了简化 Spring 应用的创建.运行.调试.部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程 Spring Boot 除了支持常见的ORM框架外,更是对常用的中间件提供了非常好封装,随着Spring Boot2.x的到来,支持的组件越来越丰富,也越来越成熟,其中对Redis的支持不仅仅是丰富了它的API,更是替换掉底层Jedis的依赖,取而代之换成了Le

  • springboot与redis的简单整合实例

    前言 Redis是一个缓存.消息代理和功能丰富的键值存储.StringBoot提供了基本的自动配置.本文记录一下springboot与redis的简单整合实例 官方文档:https://docs.spring.io/spring-boot/docs/2.1.0.RELEASE/reference/htmlsingle/ 前期准备 首先我们要有一个Redis服务,由于我没有Linux环境,为了方便搭建项目,直接在Windows下安装,参考这篇博客:Windows下安装Redis服务 安装步骤:一直

  • Springboot整合Redis实现超卖问题还原和流程分析(分布式锁)

    目录 超卖简单代码 超卖问题 单服务器单应用情况下 设置synchronized Redis实现分布式锁 通过超时间解决上述问题 通过key设置值匹配的方式解决形同虚设问题 最终版 超卖简单代码 写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题. Redis中存储一项数据信息,请求对应接口,获取商品数量信息: 商品数量信息如果大于0,则扣减1,重新存储Redis中: 运行代码测试问题. /** * Redis数据库操作,超卖问题模拟 * @author * */ @Res

  • SpringBoot整合Redis实现常用功能超详细过程

    目录 1 登陆功能 1.1 基于Session实现登录流程 1.1.1 session共享问题 1.2 Redis替代Session 1.2.1.设计key的结构 1.2.2.设计key的具体细节 1.2.3.整体访问流程 2 缓存功能 2.1 什么是缓存? 2.1.1 为什么要使用缓存 2.1.2 如何使用缓存 2.2.使用缓存 2.2.1 .缓存模型和思路 2.3 缓存更新策略 2.3.1 .数据库缓存不一致解决方案: 2.3.2 .数据库和缓存不一致采用什么方案 2.4 缓存穿透问题的解决

  • SpringBoot整合Redis、ApachSolr和SpringSession的示例

    本文介绍了SpringBoot整合Redis.ApachSolr和SpringSession,分享给大家,具体如下: 一.简介 SpringBoot自从问世以来,以其方便的配置受到了广大开发者的青睐.它提供了各种starter简化很多繁琐的配置.SpringBoot整合Druid.Mybatis已经司空见惯,在这里就不详细介绍了.今天我们要介绍的是使用SpringBoot整合Redis.ApacheSolr和SpringSession. 二.SpringBoot整合Redis Redis是大家比

  • SpringBoot整合Redis正确的实现分布式锁的示例代码

    前言 最近在做分块上传的业务,使用到了Redis来维护上传过程中的分块编号. 每上传完成一个分块就获取一下文件的分块集合,加入新上传的编号,手动接口测试下是没有问题的,前端通过并发上传调用就出现问题了,并发的get再set,就会存在覆盖写现象,导致最后的分块数据不对,不能触发分块合并请求. 遇到并发二话不说先上锁,针对执行代码块加了一个JVM锁之后问题就解决了. 仔细一想还是不太对,项目是分布式部署的,做了负载均衡,一个节点的代码被锁住了,请求轮询到其他节点还是可以进行覆盖写,并没有解决到问题啊

  • Springboot整合Redis最简单例子分享

    1. 编写目的 最简单的例子,Springboot整合Redis. 2. 详细过程 pom 文件添加依赖 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spr

  • SpringBoot整合Redis的示例

    redis是最常用的缓存数据库,常用于存储用户登录token.临时数据.定时相关数据等. redis是单线程的,所以redis的操作是原子性的,这样可以保证不会出现并发问题. redis基于内存,速度非常快,据测试,redis读的速度是110000次/s,写的速度是81000次/s 本节介绍SpringBoot引入redis,以及使用RedisTemplate来操作redis数据. 采用SpringBoot 2.1.9.RELEASE,对应示例代码在:https://github.com/lao

  • SpringBoot整合Redis的步骤

    1.添加配置文件: Redis.properties # 配置单台redis服务器 redis.host=192.168.126.129 ip地址 redis.port=6379 端口号 设置配置类: RedisConfig 2.将对象转化为Json格式入门案例 API: MAPPER.writeValueAsString(itemDesc); 3.将对象转化为Json格式格式优化 4.AOP实现Redis缓存 AOP实现Redis @cacheFind()注解 实现策略: A.需要自定义注解C

  • springboot整合redis修改分区的操作流程

    springboot整合redis修改分区 问题由来 最近使用springboot整合redis,一个系统动态数据源连接不同数据库,缓存使用的redis,那么就需要将不同数据库的数据缓存到redis不同的分区,也就是不同的库中. 老版解决 这里的老版指的是2.0之前的,我使用的1.5.9是ok的. redis的配置类这里就不贴了,网上很多. 1.使用JedisConnectionFactory修改 @Autowired JedisConnectionFactory jedisConnection

随机推荐