解决线程并发redisson使用遇到的坑
线程并发redisson的坑
背景
因为业务上的一个购买需求,需要对库存进行行程保护,防止超卖的出现(我们不是电商公司),经过调研,最终选择使用Redission来进行控制。
主要因为Redission丰富的API,开源框架,已经被广泛应用于实际生产环境。
问题描述
当我们使用Ression中Lock.lock()方法之后,如果存在线程并发常见情况下,会出现如下异常:
java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: 9f178836-f7e1-44fe-a89d-2db52f399c0d thread-id: 22
问题分析
在thread-1还没有结束的时候,也就是在thread-1在获得锁但是还没有释放锁的时候, `thread-2由于被别的线程中断停止了等待从lock.tryLock的阻塞状态中返回继续执行接下来的逻辑,并且由于尝试去释放一个属于线程thread-1的锁而抛出了一个运行时异常导致该线程thread-2结束了, 然而thread-2完成了一系列操作后,线程thread-1才释放了自己的锁.
所以thread-2并没有获得锁,却执行了需要同步的内容,还尝试去释放锁。
那解决方式我们就知道了,当前线程加的锁由当前线程去解锁,也就是说当我们使用lock.unlock的时候加上线程的判断即可。
问题解决
RLock lock = redissonClient.getLock(key); if(lock.isLocked()){ // 是否还是锁定状态 if(lock.isHeldByCurrentThread()){ // 时候是当前执行线程的锁 lock.unlock(); // 释放锁 } }
redisson使用注意事项
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格,相较于暴露底层操作的Jedis,Redisson提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。
特性 & 功能:
- 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
- 程序接口调用方式采用异步执行和异步流执行两种方式
- 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
- 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
- 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
- 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
- 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等
- 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)
- 更多特性和功能,请关注官网:http://redisson.org
实现原理
redis本身是不支持上述的分布式对象和集合,Redisson是通过利用redis的特性在客户端实现了高级数据结构和特性,例如优先队列的实现,是通过客户端排序整理后再存入redis。
客户端实现,意味着当没有任何客户端在线时,这些所有的数据结构和特性都不会保留,也不会自动生效,例如过期事件的触发或原来优先队列的元素增加。
注意事项
实时性
RMap中有一个功能是可以设置键值对的过期时间的,并可以注册键值对的事件监听器
元素淘汰功能(Eviction)
Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。
目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
正如官方wiki所述,这个功能是通过后台线程定时去清理的, 所以这个是非实时的(issue-1234:on expired event is not executed in real-time.),延迟在5秒到2小时之间,因此对实时性要求比较高的场景就得自己衡量了。
由于过期时间的非实时性,所以导致过期事件的发生也是非实时的,相应的监听器可能会延迟了一会儿才收到通知,在我的测试中,ttl设置在秒级误差是比较大的,分钟级别的ttl倒还好(左侧设置值,右侧实际耗时):
1s _ 5s
3s _ 5s
4s _ 5s
5s _ 9s
6s _ 10s
10s _ 15s
1m _ 1m11s
序列化
由Redisson默认的编码器为JsonJacksonCodec,JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。
在我的情况中,我序列化了一个service,这个service已被spring托管,而且和另一个service之间也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。
为了序列化后的内容可见,所以不用redission其他自带的二进制编码器,自行实现编码器:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import org.redisson.client.codec.BaseCodec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import java.io.IOException; public class FastjsonCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in,SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }
订阅发布
Redisson对订阅发布的封装是RTopic,这也是Redisson中很多事件监听的实现原理(例如键值对的事件监听)。
使用单元测试时发现,在事件发布后,订阅方需要延时一下才能收到事件。具体原因待查。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。