利用redis实现分布式锁,快速解决高并发时的线程安全问题

实际工作中,经常会遇到多线程并发时的类似抢购的功能,本篇描述一个简单的redis分布式锁实现的多线程抢票功能。

直接上代码。首先按照慣例,给出一个错误的示范:

我们可以看看,当20个线程一起来抢10张票的时候,会发生什么事。

package com.tiger.utils;
public class TestMutilThread {

	// 总票量
	public static int count = 10;
	public static void main(String[] args) {
		statrtMulti();
	}

	public static void statrtMulti() {
		for (int i = 1; i <= 20; i++) {
			TicketRunnable tickrunner = new TicketRunnable();
			Thread thread = new Thread(tickrunner, "Thread No: " + i);
			thread.start();
		}
	}

	public static class TicketRunnable implements Runnable {

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName() + " start "
					+ count);
			// TODO Auto-generated method stub
			// logger.info(Thread.currentThread().getName()
			// + " really start" + count);
			if (count <= 0) {
				System.out.println(Thread.currentThread().getName()
						+ " ticket sold out ! No tickets remained!" + count);
				return;
			} else {
				count = count - 1;
				System.out.println(Thread.currentThread().getName()
						+ " bought a ticket,now remaining :" + (count));
			}
		}
	}
}

测试结果,从结果可以看到,票数在不同的线程中已经出现混乱。

Thread No: 2 start 10
Thread No: 6 start 10
Thread No: 4 start 10
Thread No: 5 start 10
Thread No: 3 start 10
Thread No: 9 start 6
Thread No: 1 start 10
Thread No: 1 bought a ticket,now remaining :3
Thread No: 9 bought a ticket,now remaining :4
Thread No: 3 bought a ticket,now remaining :5
Thread No: 12 start 3
Thread No: 5 bought a ticket,now remaining :6
Thread No: 4 bought a ticket,now remaining :7
Thread No: 8 start 7
Thread No: 7 start 8
Thread No: 12 bought a ticket,now remaining :1
Thread No: 14 start 0
Thread No: 6 bought a ticket,now remaining :8
Thread No: 16 start 0
Thread No: 2 bought a ticket,now remaining :9
Thread No: 16 ticket sold out ! No tickets remained!0
Thread No: 14 ticket sold out ! No tickets remained!0
Thread No: 18 start 0
Thread No: 18 ticket sold out ! No tickets remained!0
Thread No: 7 bought a ticket,now remaining :0
Thread No: 15 start 0
Thread No: 8 bought a ticket,now remaining :1
Thread No: 13 start 2
Thread No: 19 start 0
Thread No: 11 start 3
Thread No: 11 ticket sold out ! No tickets remained!0
Thread No: 10 start 3
Thread No: 10 ticket sold out ! No tickets remained!0
Thread No: 19 ticket sold out ! No tickets remained!0
Thread No: 13 ticket sold out ! No tickets remained!0
Thread No: 20 start 0
Thread No: 20 ticket sold out ! No tickets remained!0
Thread No: 15 ticket sold out ! No tickets remained!0
Thread No: 17 start 0
Thread No: 17 ticket sold out ! No tickets remained!0

为了解决多线程时出现的混乱问题,这里給出真正的测试类!!!

真正的测试类,这里启动20个线程,来抢10张票。

RedisTemplate 是用来实现redis操作的,由spring进行集成。这里是使用到了RedisTemplate,所以我以构造器的形式在外部将RedisTemplate传入到测试类中。

MultiTestLock 是用来实现加锁的工具类。

总票数使用volatile关键字,实现多线程时变量在系统内存中的可见性,这点可以去了解下volatile关键字的作用。

TicketRunnable用于模拟抢票功能。

其中由于lock与unlock之间存在if判断,为保证线程安全,这里使用synchronized来保证。

测试类:

package com.tiger.utils;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
public class MultiConsumer {
	Logger logger=LoggerFactory.getLogger(MultiTestLock.class);
	private RedisTemplate<Serializable, Serializable> redisTemplate;
	public MultiTestLock lock;
	//总票量
	public volatile static int count = 10;

	public void statrtMulti() {
		lock = new MultiTestLock(redisTemplate);
		for (int i = 1; i <= 20; i++) {
			TicketRunnable tickrunner = new TicketRunnable();
			Thread thread = new Thread(tickrunner, "Thread No: " + i);
			thread.start();
			}
	}

	public class TicketRunnable implements Runnable {

		@Override
		public void run() {
			logger.info(Thread.currentThread().getName() + " start "
					+ count);
			// TODO Auto-generated method stub
			if (count > 0) {
//				logger.info(Thread.currentThread().getName()
//						+ " really start" + count);
				lock.lock();
				synchronized (this) {
					if(count<=0){
						logger.info(Thread.currentThread().getName()
								+ " ticket sold out ! No tickets remained!" + count);
						lock.unlock();
						return;
					}else{
						count=count-1;
						logger.info(Thread.currentThread().getName()
								+ " bought a ticket,now remaining :" + (count));
					}
				}
				lock.unlock();
			}else{
				logger.info(Thread.currentThread().getName()
						+ " ticket sold out !" + count);
			}
		}
	}

	public RedisTemplate<Serializable, Serializable> getRedisTemplate() {
		return redisTemplate;
	}

	public void setRedisTemplate(
			RedisTemplate<Serializable, Serializable> redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	public MultiConsumer(RedisTemplate<Serializable, Serializable> redisTemplate) {
		super();
		this.redisTemplate = redisTemplate;
	}
}

Lock工具类:

我们知道为保证线程安全,程序中执行的操作必须时原子的。redis后续的版本中可以使用set key同时设置expire超时时间。

想起上次去 电信翼支付 面试时,面试官问过一个问题:分布式锁如何防止死锁,问题关键在于我们在分布式中进行加锁操作时成功了,但是后续业务操作完毕执行解锁时出现失败。导致分布式锁无法释放。出现死锁,后续的加锁无法正常进行。所以这里设置expire超时时间的目的就是防止出现解锁失败的情况,这样,即使解锁失败了,分布式锁依然会在超时时间过了之后自动释放。

具体在代码中也有注释,也可以作为参考。

package com.tiger.utils;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import javax.sound.midi.MidiDevice.Info;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.script.RedisScript; 

public class MultiTestLock implements Lock {
	Logger logger=LoggerFactory.getLogger(MultiTestLock.class);
	private RedisTemplate<Serializable, Serializable> redisTemplate;
	public MultiTestLock(RedisTemplate<Serializable, Serializable> redisTemplate) {
		super();
		this.redisTemplate = redisTemplate;
	}

	@Override
	public void lock() {
		//这里使用while循环强制线程进来之后先进行抢锁操作。只有抢到锁才能进行后续操作
		while(true){
			if(tryLock()){
				try {
					//这里让线程睡500毫秒的目的是为了模拟业务耗时,确保业务结束时之前设置的值正好打到超时时间,
					//实际生产中可能有偏差,这里需要经验
					Thread.sleep(500l);
//					logger.info(Thread.currentThread().getName()+" time to awake");
					return;
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}else{
				try {
					//这里设置一个随机毫秒的sleep目的时降低while循环的频率
					Thread.sleep(new Random().nextInt(200)+100);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}

	@Override
	public boolean tryLock() {
		//这里也可以选用transactionSupport支持事务操作
		SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {
			@Override
			public Object execute(RedisOperations operations)
					throws DataAccessException {
				operations.multi();
				operations.opsForValue().setIfAbsent("secret", "answer");
				//设置超时时间要根据业务实际的可能处理时间来,是一个经验值
				operations.expire("secret", 500l, TimeUnit.MILLISECONDS);
				Object object=operations.exec();
				return object;
			}
		};
		//执行两部操作,这里会拿到一个数组值 [true,true],分别对应上述两部操作的结果,如果中途出现第一次为false则表明第一步set值出错
		List<Boolean> result=(List) redisTemplate.execute(sessionCallback);
//		logger.info(Thread.currentThread().getName()+" try lock "+ result);
		if(true==result.get(0)||"true".equals(result.get(0)+"")){
			logger.info(Thread.currentThread().getName()+" try lock success");
			return true;
		}else{
			return false;
		}
	}

	@Override
	public boolean tryLock(long arg0, TimeUnit arg1)
			throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public void unlock() {
		//unlock操作直接删除锁,如果执行完还没有达到超时时间则直接删除,让后续的线程进行继续操作。起到补刀的作用,确保锁已经超时或被删除
		SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {
			@Override
			public Object execute(RedisOperations operations)
					throws DataAccessException {
				operations.multi();
				operations.delete("secret");
				Object object=operations.exec();
				return object;
			}
		};
		Object result=redisTemplate.execute(sessionCallback);
	} 

	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub
	}

	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}

	public RedisTemplate<Serializable, Serializable> getRedisTemplate() {
		return redisTemplate;
	}

	public void setRedisTemplate(
			RedisTemplate<Serializable, Serializable> redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
}

执行结果

可以看到,票数稳步减少,后续没有抢到锁的线程余票为0,无票可抢。

tips:

这其中也出现了一个问题,redis进行多部封装操作时,系统报错:ERR EXEC without MULTI

后经过查阅发现问题出在:

在spring中,多次执行MULTI命令不会报错,因为第一次执行时,会将其内部的一个isInMulti变量设为true,后续每次执行命令是都会检查这个变量,如果为true,则不执行命令。

而多次执行EXEC命令则会报开头说的"ERR EXEC without MULTI"错误。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。如有错误或未考虑完全的地方,望不吝赐教。

(0)

相关推荐

  • redis分布式锁及会出现的问题解决

    一.redis实现分布式锁的主要原理: 1.加锁 最简单的方法是使用setnx命令.key是锁的唯一标识,按业务来决定命名.比如想要给一种商品的秒杀活动加锁,可以给key命名为 "lock_sale_商品ID" .而value设置成什么呢?我们可以姑且设置成1.加锁的伪代码如下: setnx(key,1) 当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁:当一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败. 2.解锁 有加锁就得有解锁.当得到锁的

  • redis分布式锁的问题与解决方法

    分布式锁 在分布式环境中,为了保证业务数据的正常访问,防止出现重复请求的问题,会使用分布式锁来阻拦后续请求.我们先写一段有问题的业务代码: public void doSomething(String userId){ User user=getUser(userId); if(user==null){ user.setUserName("xxxxx"); user.setUserId(userId); insert(user); return; } update(user); } 上

  • 如何利用Redis分布式锁实现控制并发操作

    redis命令解释 说道Redis的分布式锁都是通过setNx命令结合getset来实现的,在讲之前我们先了解下setNx和getset的意思,在redis官网是这样解释的 注:redis的命令都是原子操作 SETNX key value 将 key 的值设为 value ,当且仅当 key 不存在. 若给定的 key 已经存在,则 SETNX 不做任何动作. SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写. 可用版本: 1.0.0+ 时间复杂度: O(1)

  • 利用redis实现分布式锁,快速解决高并发时的线程安全问题

    实际工作中,经常会遇到多线程并发时的类似抢购的功能,本篇描述一个简单的redis分布式锁实现的多线程抢票功能. 直接上代码.首先按照慣例,给出一个错误的示范: 我们可以看看,当20个线程一起来抢10张票的时候,会发生什么事. package com.tiger.utils; public class TestMutilThread { // 总票量 public static int count = 10; public static void main(String[] args) { sta

  • Redis锁完美解决高并发秒杀问题

    目录 1 单机环境下的锁 2 分布式情况下使用Redis锁. 3 一台服务宕机,导致无法释放锁 4 给每一把锁加上过期时间 5延长锁的过期时间,解决锁失效 6 使用Redisson简化代码 场景:一家网上商城做商品限量秒杀. 1 单机环境下的锁 将商品的数量存到Redis中.每个用户抢购前都需要到Redis中查询商品数量(代替mysql数据库.不考虑事务),如果商品数量大于0,则证明商品有库存.然后我们在进行库存扣减和接下来的操作.因为多线程并发问题,我们不得不在get()方法内部使用同步代码块

  • 使用Redis解决高并发方案及思路解读

    目录 NoSQL Redis 痛点 思路 分布式锁 锁续命 扩展 结语 NoSQL Not Only SQL的简称.NoSQL是解决传统的RDBMS在应对某些问题时比较乏力而提出的. 即非关系型数据库,它们不保证关系数据的ACID特性,数据之间一般没有关联,在扩展上就非常容易实现,并且拥有较高的性能. Redis redis是nosql的典型代表,也是目前互联网公司的必用技术. redis是键值(Key-Value)存储数据库,主要会使用到哈希表.大多数时候是直接以缓存的形式被使用,使得请求不直

  • PHP利用Mysql锁解决高并发的方法

    前面写过利用文件锁来处理高并发的问题的,现在我们说另外一个处理方式,利用Mysql的锁来解决高并发的问题 先看没有利用事务的时候并发的后果 创建库存管理表 CREATE TABLE `storage` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `number` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=l

  • SpringBoot中使用redis做分布式锁的方法

    一.模拟问题 最近在公司遇到一个问题,挂号系统是做的集群,比如启动了两个相同的服务,病人挂号的时候可能会出现同号的情况,比如两个病人挂出来的号都是上午2号.这就出现了问题,由于是集群部署的,所以单纯在代码中的方法中加锁是不能解决这种情况的.下面我将模拟这种情况,用redis做分布式锁来解决这个问题. 1.新建挂号明细表 2.在idea上新建项目 下图是创建好的项目结构,上面那个parent项目是其他项目不用管它,和新建的没有关系 3.开始创建controller,service,dao(mapp

  • 如何利用Redis锁解决高并发问题详解

    redis技术的使用: redis真的是一个很好的技术,它可以很好的在一定程度上解决网站一瞬间的并发量,例如商品抢购秒杀等活动... redis之所以能解决高并发的原因是它可以直接访问内存,而以往我们用的是数据库(硬盘),提高了访问效率,解决了数据库服务器压力. 为什么redis的地位越来越高,我们为何不选择memcache,这是因为memcache只能存储字符串,而redis存储类型很丰富(例如有字符串.LIST.SET等),memcache每个值最大只能存储1M,存储资源非常有限,十分消耗内

  • 基于Redis实现分布式锁以及任务队列

    一.前言 双十一刚过不久,大家都知道在天猫.京东.苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个

  • Redis实现分布式锁的实例讲解

    在一个分布式系统中,会遇到一些需要对多个节点共享的资源加锁的情况,这个时候需要用到分布式锁.分布式锁通常保存在一个共享的存储系统中,可以被多个节点共享和访问. 锁的本质 简单来讲,锁可以用一个变量来表示.比如,在一个单机多线程的程序来说,某个资源的锁用一个 bit 的数据就可以表示.即 0 表示没有资源可以访问,1 表示资源的锁已被别的线程获取,不能访问. 获取和释放特定资源的锁,本质上就是为获取和修改这个变量的值.如果值是 0 则将其修改为 1,就完成了获取的过程,如果访问到的值不是 0,则获

  • Redis实现分布式锁的方法示例

    之前我们使用的定时任务都是只部署在了单台机器上,为了解决单点的问题,为了保证一个任务,只被一台机器执行,就需要考虑锁的问题,于是就花时间研究了这个问题.到底怎样实现一个分布式锁呢? 锁的本质就是互斥,保证任何时候能有一个客户端持有同一个锁,如果考虑使用redis来实现一个分布式锁,最简单的方案就是在实例里面创建一个键值,释放锁的时候,将键值删除.但是一个可靠完善的分布式锁需要考虑的细节比较多,我们就来看看如何写一个正确的分布式锁. 单机版分布式锁 SETNX 所以我们直接基于 redis 的 s

  • Redis实现分布式锁的几种方法总结

    Redis实现分布式锁的几种方法总结 分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁. 我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1.现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,

随机推荐