解决线程并发redisson使用遇到的坑

线程并发redisson的坑

背景

因为业务上的一个购买需求,需要对库存进行行程保护,防止超卖的出现(我们不是电商公司),经过调研,最终选择使用Redission来进行控制。

主要因为Redission丰富的API,开源框架,已经被广泛应用于实际生产环境。

问题描述

当我们使用Ression中Lock.lock()方法之后,如果存在线程并发常见情况下,会出现如下异常:

java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: 9f178836-f7e1-44fe-a89d-2db52f399c0d thread-id: 22

问题分析

在thread-1还没有结束的时候,也就是在thread-1在获得锁但是还没有释放锁的时候, `thread-2由于被别的线程中断停止了等待从lock.tryLock的阻塞状态中返回继续执行接下来的逻辑,并且由于尝试去释放一个属于线程thread-1的锁而抛出了一个运行时异常导致该线程thread-2结束了, 然而thread-2完成了一系列操作后,线程thread-1才释放了自己的锁.

所以thread-2并没有获得锁,却执行了需要同步的内容,还尝试去释放锁。

那解决方式我们就知道了,当前线程加的锁由当前线程去解锁,也就是说当我们使用lock.unlock的时候加上线程的判断即可。

问题解决

 RLock lock = redissonClient.getLock(key);
    if(lock.isLocked()){ // 是否还是锁定状态
      if(lock.isHeldByCurrentThread()){ // 时候是当前执行线程的锁
        lock.unlock(); // 释放锁
      }
    }

redisson使用注意事项

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格,相较于暴露底层操作的Jedis,Redisson提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。

特性 & 功能:

  • 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
  • 程序接口调用方式采用异步执行和异步流执行两种方式
  • 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
  • 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
  • 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
  • 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
  • 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等
  • 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)
  • 更多特性和功能,请关注官网:http://redisson.org

实现原理

redis本身是不支持上述的分布式对象和集合,Redisson是通过利用redis的特性在客户端实现了高级数据结构和特性,例如优先队列的实现,是通过客户端排序整理后再存入redis。

客户端实现,意味着当没有任何客户端在线时,这些所有的数据结构和特性都不会保留,也不会自动生效,例如过期事件的触发或原来优先队列的元素增加。

注意事项

实时性

RMap中有一个功能是可以设置键值对的过期时间的,并可以注册键值对的事件监听器

元素淘汰功能(Eviction)

Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。

目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

正如官方wiki所述,这个功能是通过后台线程定时去清理的, 所以这个是非实时的(issue-1234:on expired event is not executed in real-time.),延迟在5秒到2小时之间,因此对实时性要求比较高的场景就得自己衡量了。

由于过期时间的非实时性,所以导致过期事件的发生也是非实时的,相应的监听器可能会延迟了一会儿才收到通知,在我的测试中,ttl设置在秒级误差是比较大的,分钟级别的ttl倒还好(左侧设置值,右侧实际耗时):

1s _ 5s
3s _ 5s
4s _ 5s
5s _ 9s
6s _ 10s
10s _ 15s
1m _ 1m11s

序列化

由Redisson默认的编码器为JsonJacksonCodec,JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。

在我的情况中,我序列化了一个service,这个service已被spring托管,而且和另一个service之间也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。

为了序列化后的内容可见,所以不用redission其他自带的二进制编码器,自行实现编码器:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;​
import java.io.IOException;​
public class FastjsonCodec extends BaseCodec {​
 private final Encoder encoder = in -> {
 ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
 try {
 ByteBufOutputStream os = new ByteBufOutputStream(out);
 JSON.writeJSONString(os, in,SerializerFeature.WriteClassName);
 return os.buffer();
 } catch (IOException e) {
 out.release();
 throw e;
 } catch (Exception e) {
 out.release();
 throw new IOException(e);
 }
 };
​
 private final Decoder<Object> decoder = (buf, state) ->
 JSON.parseObject(new ByteBufInputStream(buf), Object.class);
​
 @Override
 public Decoder<Object> getValueDecoder() {
 return decoder;
 }
​
 @Override
 public Encoder getValueEncoder() {
 return encoder;
 }
}

订阅发布

Redisson对订阅发布的封装是RTopic,这也是Redisson中很多事件监听的实现原理(例如键值对的事件监听)。

使用单元测试时发现,在事件发布后,订阅方需要延时一下才能收到事件。具体原因待查。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 使用redis分布式锁解决并发线程资源共享问题

    前言 众所周知, 在多线程中,因为共享全局变量,会导致资源修改结果不一致,所以需要加锁来解决这个问题,保证同一时间只有一个线程对资源进行操作 但是在分布式架构中,我们的服务可能会有n个实例,但线程锁只对同一个实例有效,就需要用到分布式锁----redis setnx 原理 修改某个资源时, 在redis中设置一个key,value根据实际情况自行决定如何表示 我们既然要通过检查key是否存在(存在表示有线程在修改资源,资源上锁,其他线程不可同时操作,若key不存在,表示资源未被线程占用,允许线程

  • 详解redis分布式锁的这些坑

    一.白话分布式 什么是分布式,用最简单的话来说,就是为了较低单个服务器的压力,将功能分布在不同的机器上面,本来一个程序员可以完成一个项目:需求->设计->编码->测试 但是项目多的时候,一个人也扛不住,这就需要不同的人进行分工合作了 这就是一个简单的分布式协同工作了: 二.分布式锁 首先看一个问题,如果说某个环节被终止或者别侵占,就会发生不可知的事情 这就会出现,设计好的或者设计的半成品会被破坏,导致后面环节出错: 这时候,我们就需要引入分布式锁的概念: 何为分布式锁 当在分布式模型下,

  • Spring使用redis遇到的问题及解决方案

    本人在spring中使用redis作为缓存时,遇到两个坑,现在记录如下,算是作为自己的备忘吧,文笔不好,望大家见谅: 一.配置文件 <!-- 加载Properties文件 --> <bean id="configurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locatio

  • 使用Redis incr解决并发问题的操作

    项目背景: 1.新增问题件工单,工单中有工单编码字段,工单编码字段的规则为 "WT"+yyyyMMdd+0000001. 2.每天的工单生成量是30W,所以会存在并发问题 解决思路: 1.首先乐观的认为redis不会宕机,对应的缓存不会被清除(除非人为操作,人为操作会有独立的补救办法) 2.将工单编码存到缓存中(redis),其值只存"WT"+yyyyMMdd后面的数字部分: 对应的key为:key标识+yyyyMMdd,即每天一个key 3.每次生成工单编码时,先

  • 解决线程并发redisson使用遇到的坑

    线程并发redisson的坑 背景 因为业务上的一个购买需求,需要对库存进行行程保护,防止超卖的出现(我们不是电商公司),经过调研,最终选择使用Redission来进行控制. 主要因为Redission丰富的API,开源框架,已经被广泛应用于实际生产环境. 问题描述 当我们使用Ression中Lock.lock()方法之后,如果存在线程并发常见情况下,会出现如下异常: java.lang.IllegalMonitorStateException: attempt to unlock lock,

  • 解决spring集成redisson踩过的坑

    目录 spring集成redisson踩过的坑 第一坑就是版本兼容问题 第二个坑是设置密码问题 spring整合redisson配置 配置方式 单节点配置standalone 哨兵配置sentinel 集群配置cluster 主从部署方式(master/slave) spring集成redisson踩过的坑 我用spring的xml集成一直报错,所以只能选择注解方式: @Configuration public class RedissionConfig { Logger log = Logge

  • Redis锁完美解决高并发秒杀问题

    目录 1 单机环境下的锁 2 分布式情况下使用Redis锁. 3 一台服务宕机,导致无法释放锁 4 给每一把锁加上过期时间 5延长锁的过期时间,解决锁失效 6 使用Redisson简化代码 场景:一家网上商城做商品限量秒杀. 1 单机环境下的锁 将商品的数量存到Redis中.每个用户抢购前都需要到Redis中查询商品数量(代替mysql数据库.不考虑事务),如果商品数量大于0,则证明商品有库存.然后我们在进行库存扣减和接下来的操作.因为多线程并发问题,我们不得不在get()方法内部使用同步代码块

  • C#解决SQlite并发异常问题的方法(使用读写锁)

    本文实例讲述了C#解决SQlite并发异常问题的方法.分享给大家供大家参考,具体如下: 使用C#访问sqlite时,常会遇到多线程并发导致SQLITE数据库损坏的问题. SQLite是文件级别的数据库,其锁也是文件级别的:多个线程可以同时读,但是同时只能有一个线程写.Android提供了SqliteOpenHelper类,加入Java的锁机制以便调用.但在C#中未提供类似功能. 作者利用读写锁(ReaderWriterLock),达到了多线程安全访问的目标. using System; usin

  • PHP解决高并发的优化方案实例

    我们通常衡量一个Web系统的吞吐率的指标是QPS(Query Per Second,每秒处理请求数),解决每秒数万次的高并发场景,这个指标非常关键.举个例子,我们假设处理一个业务请求平均响应时间为100ms,同时,系统内有20台Apache的Web服务器,配置MaxClients为500个(表示Apache的最大连接数目). 那么,我们的Web系统的理论峰值QPS为(理想化的计算方式): 20*500/0.1 = 100000 (10万QPS) 咦?我们的系统似乎很强大,1秒钟可以处理完10万的

  • JDK源码之线程并发协调神器CountDownLatch和CyclicBarrier详解

    目录 引言 CountDownLatch 使用场景 底层实现原理 初始化 计数器递减 阻塞线程 CyclicBarrier 使用场景 底层实现原理 初始化 阻塞等待 总结 引言 那么在程序的世界中是如何对这种协调关系进行描述的呢?今天就和大家聊聊Java大神Doug Lea在并发包中如何通过CountDownLatch和CyclicBarrier实现任务协调的代码描述. CountDownLatch 我相信大家都知道好代码的一个重要特性就是代码中类.变量等的命名可以做到顾名思义,也就是说看到命名

  • C#使用读写锁解决多线程并发问题

    一.简介 在开发程序的过程中,难免少不了写入错误日志这个关键功能.实现这个功能,可以选择使用第三方日志插件,也可以选择使用数据库,还可以自己写个简单的方法把错误信息记录到日志文件.现在我们来讲下最后一种方法: 在选择最后一种方法实现的时候,若对文件操作与线程同步不熟悉,问题就有可能出现了,因为同一个文件并不允许多个线程同时写入,否则会提示“文件正在由另一进程使用,因此该进程无法访问此文件”.这是文件的并发写入问题,就需要用到线程同步.而微软也给线程同步提供了一些相关的类可以达到这样的目的,本文使

  • 使用Redis解决高并发方案及思路解读

    目录 NoSQL Redis 痛点 思路 分布式锁 锁续命 扩展 结语 NoSQL Not Only SQL的简称.NoSQL是解决传统的RDBMS在应对某些问题时比较乏力而提出的. 即非关系型数据库,它们不保证关系数据的ACID特性,数据之间一般没有关联,在扩展上就非常容易实现,并且拥有较高的性能. Redis redis是nosql的典型代表,也是目前互联网公司的必用技术. redis是键值(Key-Value)存储数据库,主要会使用到哈希表.大多数时候是直接以缓存的形式被使用,使得请求不直

  • 如何利用Redis锁解决高并发问题详解

    redis技术的使用: redis真的是一个很好的技术,它可以很好的在一定程度上解决网站一瞬间的并发量,例如商品抢购秒杀等活动... redis之所以能解决高并发的原因是它可以直接访问内存,而以往我们用的是数据库(硬盘),提高了访问效率,解决了数据库服务器压力. 为什么redis的地位越来越高,我们为何不选择memcache,这是因为memcache只能存储字符串,而redis存储类型很丰富(例如有字符串.LIST.SET等),memcache每个值最大只能存储1M,存储资源非常有限,十分消耗内

  • 基于Django的乐观锁与悲观锁解决订单并发问题详解

    前言 订单并发这个问题我想大家都是有一定认识的,这里我说一下我的一些浅见,我会尽可能的让大家了解如何解决这类问题. 在解释如何解决订单并发问题之前,需要先了解一下什么是数据库的事务.(我用的是mysql数据库,这里以mysql为例) 1)     事务概念 一组mysql语句,要么执行,要么全不不执行.  2)  mysql事务隔离级别 Read Committed(读取提交内容) 如果是Django2.0以下的版本,需要去修改到这个隔离级别,不然乐观锁操作时无法读取已经被修改的数据 Repea

随机推荐