Mango Cache缓存管理库TinyLFU源码解析
目录
- 介绍
- 整体架构
- 初始化流程
- 读流程
- 写流程
- 事件处理机制
- 主流程
- write
- 清理工作
- 缓存管理
- 什么是LRU?
- 什么是SLRU?
- 什么是TinyLFU?
- mango Cache中的TinyLFU
- counter
- counter的初始化
- counter的使用
- lruCache
- slruCache
- filter
- TinyLFU的初始化
- TinyLFU写入
- TinyLFU访问
- 增加entry的访问次数
- 估计entry访问次数
- 总结
介绍
据官方所述,mango Cache是对Guava Cache基于go的部分实现,同时mangoCache参考了Caffeine以及go-tinylfu.
支持以下缓存管理策略:
- LRU
- Segmented LRU(默认)
- TinyLFU(实验性)
本文将从源码对其进行解析,重点将放在loadingCache(一种可以自定义如何载入新内存的cache)上面.
整体架构
mango Cache的主体功能由localCache结构体(local.go)实现,
type localCache struct { cache cache // 真正存放数据的地方 expireAfterAccess time.Duration //用户自定义的过期以及刷新时间 expireAfterWrite time.Duration refreshAfterWrite time.Duration policyName string //缓存管理策略 onInsertion Func //钩子函数 onRemoval Func loader LoaderFunc //自定义加载和重载函数 reloader Reloader stats StatsCounter //状态管理器 cap int //缓存容量 // 访问时的缓存管理策略 accessQueue policy // 写入时的缓存管理策略,只有在expireAfterWrite或refreshAfterWrite被设置以后才启用 writeQueue policy // 用于processEntries的事件队列 events chan entryEvent // 记录距离上一次写期间发生的读次数,用于判断是否进行清扫 readCount int32 // 用于关闭由localCache创建的协程 closing int32 closeWG sync.WaitGroup }
真正存储数据的结构体是cache(policy.go),所有缓存数据都存储在其中
type cache struct { size int64 // 缓存大小 segs [segmentCount]sync.Map // 分片sync.map, map[key]*entry }
entry是存放一个缓存数据的单元
type entry struct { hash uint64 //key的hash值 accessTime int64 writeTime int64 invalidated int32 //是否有效 loading int32 //是否处于载入中状态 key Key value atomic.Value // 存储的值 // 下面的属性只被缓存管理策略操作,所以不需要原子性的访问 accessList *list.Element //此entry目前所在的访问链表中的位置 writeList *list.Element //此entry目前所在的写入链表中的位置 listID uint8 //此entry目前所在的缓存段(SLRU) }
localCache使用事件队列来管理并发读写事件,事件队列是一个chan,其中流转着entryEvent(即数据以及对应的缓存事件),localCache通过processEntries协程来处理各种读写事件.之后将详细讲解mango Cache是如何进行读写操作的.
localCache的write、access等操作底层是通过操作cache、accessQueue以及writeQueue从而实现的,而localCache还会负责清扫过期数据的工作.
前面提到了mango Cache支持诸如LRU、SLRU以及TinyLFU等缓存管理策略,其在localCache中即为accessQueue以及writeQueue,负责对缓存的淘汰准入等操作.
初始化流程
mango Cache提供了两种cache---普通Cache以及LoadingCache,这两者都是接口,而localCache实现了这两个接口,由于LoadingCache继承了普通Cache,因此本文只讲解LoadingCache.
func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache { c := newLocalCache() c.loader = loader for _, opt := range options { opt(c) } c.init() return c }
NewLoadingCache函数先初始化一个LoadingCache,然后根据用户传入的自定义载入函数和一些配置来初始化LoadingCache.配置包括注册插入或者删除时触发的钩子函数以及过期时间等等.然后调用localCache.init.
func (c *localCache) init() { c.accessQueue = newPolicy(c.policyName) c.accessQueue.init(&c.cache, c.cap) if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 { c.writeQueue = &recencyQueue{} } else { c.writeQueue = discardingQueue{} } c.writeQueue.init(&c.cache, c.cap) c.events = make(chan entryEvent, chanBufSize) c.closeWG.Add(1) go c.processEntries() }
localCache.init会根据用户传入的缓存管理策略名字来初始化accessQueue然后根据是否有写过期和写刷新配置来决定是否初始化写入队列.接着创建事件队列并开启事件处理协程.到此为止,cache启动完成.
读流程
LoadingCache的Get操作可以通过key获取缓存值,其流程为:
先从主缓存中查询entry
若未查询到entry,则记录miss并且调用用户自定义的load方法加载缓存值并返回
若查询到entry,先检查是否过期
- 若过期且没有设置loader则直接向事件处理协程发送eventDelete
- 若过期但设置了loader,则异步更新entry值
若没有过期则更新访问时间并向事件处理协程发送eventAccess然后记录命中
最后返回entry
func (c *localCache) Get(k Key) (Value, error) { en := c.cache.get(k, sum(k)) //计算key的hash并查询该key if en == nil { c.stats.RecordMisses(1) return c.load(k) } // 检查entry是否需要更新 now := currentTime() if c.isExpired(en, now) { if c.loader == nil { //如果没有设置加载器则直接删除该entry c.sendEvent(eventDelete, en) } else { //对于loadingCache,我们不删除这个entry //而是把它暂时留在缓存中,所以用户依旧可以读取到旧的缓存值 c.setEntryAccessTime(en, now) c.refreshAsync(en) } c.stats.RecordMisses(1) } else { c.setEntryAccessTime(en, now) c.sendEvent(eventAccess, en) c.stats.RecordHits(1) } return en.getValue(), nil }
需要注意一下这里的refreshAsync函数:
func (c *localCache) refreshAsync(en *entry) bool { if en.setLoading(true) { // 如果此entry没有在加载 if c.reloader == nil { go c.refresh(en) } else { c.reload(en) } return true } return false }
如果没有用户设置的重载器,就异步执行refresh,refresh函数实际上就是对entry进行加载.
而如果有重载器那么就同步执行用户自定义的reload函数.
写流程
loadingCache的Put操作与Get操作类似,流程如下:
先去主缓存查询key是否存在,若查询到对应的entry,那么直接更新entry
若没有查询到对应的entry,说明其不存在,因此根据key,value初始化一个新entry
如果缓存容量足够,则让主缓存存储该entry,此时会再次检查主存中是否有该entry(解决并发问题)
- 若cen不为空,说明主缓存中已经存在该entry,直接修改该entry即可
- 若cen为空,说明主缓存中还不存在该entry,那么就会在主缓存中存储该entry
最后向事件处理协程发送eventWrite事件
func (c *localCache) Put(k Key, v Value) { h := sum(k) en := c.cache.get(k, h) now := currentTime() if en == nil { en = newEntry(k, v, h) c.setEntryWriteTime(en, now) c.setEntryAccessTime(en, now) // 直接将新value添加进缓存(在缓存容量足够的时候) if c.cap == 0 || c.cache.len() < c.cap { cen := c.cache.getOrSet(en) if cen != nil { cen.setValue(v) c.setEntryWriteTime(cen, now) en = cen } } } else { // 更新entry en.setValue(v) c.setEntryWriteTime(en, now) } c.sendEvent(eventWrite, en) } //当entry在缓存中存在,则返回该entry,否则存储该entry func (c *cache) getOrSet(v *entry) *entry { seg := c.segment(v.hash) en, ok := seg.LoadOrStore(v.key, v) if ok { return en.(*entry) } atomic.AddInt64(&c.size, 1) return nil }
事件处理机制
主流程
mango Cache通过entryEvent chan以及processEntries协程来处理并发读写事务
缓存事件一共四个,分别为写入、访问、删除以及关闭.每个业务协程通过向localCache的events chan发送entryEvent通知事件处理协程,进而实现并发读写.
而processEntries协程内,会不断从events chan内取出entryEvent并执行对应的操作,在write、access以及delete操作后会执行清理工作(具体清扫工作由expireEntries函数执行)
type event uint8 const ( eventWrite event = iota eventAccess eventDelete eventClose ) type entryEvent struct { entry *entry event event } func (c *localCache) processEntries() { defer c.closeWG.Done() for e := range c.events { switch e.event { case eventWrite: //写入事务 c.write(e.entry) c.postWriteCleanup() //清理操作 case eventAccess: //访问事务 c.access(e.entry) c.postReadCleanup() //清理操作 case eventDelete: if e.entry == nil { //InvalidateAll函数中使用 c.removeAll() } else { c.remove(e.entry) //移除单个entry } c.postReadCleanup() //清理操作 case eventClose: if c.reloader != nil { // 停止所有refresh工作 c.reloader.Close() } c.removeAll() return } } }
write
由于事件处理机制对于access、delete和write的操作类似,因此这里只讲解较为复杂的write操作:
首先通过调用底层访问队列以及写入队列的write方法
触发用户自定义的钩子函数
如果write方法返回值不为空,说明有entry被驱逐,
因此需要从写入队列将其删除,同时记录驱逐并触发用户自定义的钩子函数
func (c *localCache) write(en *entry) { ren := c.accessQueue.write(en) c.writeQueue.write(en) if c.onInsertion != nil { c.onInsertion(en.key, en.getValue()) } if ren != nil { //有entry被驱逐出了缓存 c.writeQueue.remove(ren) c.stats.RecordEviction() if c.onRemoval != nil { c.onRemoval(ren.key, ren.getValue()) } } }
后面将详细讲述底层访问队列以及缓存管理是如何实现的.
清理工作
前面讲到过每次进行完write、access以及delete操作后会执行清理工作.具体地,write操作会触发postWriteCleanup而access和delete操作会触发postReadCleanup.
postReadCleanup会根据当前距离上一次写的read操作次数是否达到清理工作阈值来决定是否清理,这个阈值是64,也就是说每隔64次read操作就会触发一次清理工作
而postWriteCleanup将在每一次write操作之后触发
真正的清理工作由expireEntries函数完成,它一次清理工作最多只会清理16个entry避免了对事件处理的长时间阻塞.
func (c *localCache) postReadCleanup() { if atomic.AddInt32(&c.readCount, 1) > drainThreshold { atomic.StoreInt32(&c.readCount, 0) c.expireEntries() } } // 在添加完entry以后再执行 func (c *localCache) postWriteCleanup() { atomic.StoreInt32(&c.readCount, 0) c.expireEntries() } //清理工作函数 func (c *localCache) expireEntries() { remain := drainMax now := currentTime() if c.expireAfterAccess > 0 { expiry := now.Add(-c.expireAfterAccess).UnixNano() c.accessQueue.iterate(func(en *entry) bool { if remain == 0 || en.getAccessTime() >= expiry { // 找到了第一个没有过期的entry或者清理entry数足够了 return false } c.remove(en) c.stats.RecordEviction() remain-- return remain > 0 }) } if remain > 0 && c.expireAfterWrite > 0 { ... } if remain > 0 && c.loader != nil && c.refreshAfterWrite > 0 { ... } }
缓存管理
localCache在初始化过程中也初始化了缓存管理策略,由于localCache的writeQueue默认使用LRU缓存淘汰策略,而accessQueue支持LRU、SLRU以及TinyLFU三种缓存淘汰策略,本节将着重讲解accessQueue.
什么是LRU?
LRU是Least Recently Used的缩写,即最近最少使用.在mango Cache中LRU依靠go SDK自带的List(双向链表)实现,新缓存条目会被插入List头部,如果List内元素达到容量上限则删除List尾部的元素,此元素也正是最近最少使用的元素.LRU可以使得主缓存内的缓存条目永远是最近被访问的,不是最近访问的元素将被淘汰.
如果一个不是经常使用的数据,偶尔或者周期性的被使用,那么该数据会被加到LRU链表头部,而这种不经常使用的数据,放在链表头部,占用了空间;一直等到LRU淘汰完,才会被剔除链表;如果这种数据一次性过多,那么链表数据都是这种无用的数据,从而会导致缓存命中率低下,影响系统性能.
什么是SLRU?
Segmented LRU(SLRU)是一种LRU的改进,主要把在一个时间窗口内命中至少2次的记录和命中1次的单独存放,这样就可以把短期内较频繁的缓存元素区分开来.具体做法上,SLRU包含2个固定尺寸的LRU,一个叫Probation段(A1),一个叫Protection段(A2).新记录总是插入到A1中,当A1的记录被再次访问,就把它移到A2,当A2满了需要驱逐记录时,会把驱逐记录插入到A1中.
在mango Cache的实现中,Protection段与Probation段大小比为4:1.
SLRU是mango Cache的默认缓存管理策略
什么是TinyLFU?
TinyLFU是一种空间利用率很高的新的数据结构,可以在较大访问量的场景下近似的替代LFU的数据统计部分(meta-data),其采用类似布隆过滤器的位计数器来实现对每个key的访问次数的粗略统计.(由于哈希冲突的存在,位计数器的结果将偏大)
对于较为静态的访问分布,各种的缓存管理策略的差异是微不足道的,而对于偏倚较大的负载场景,TinyLFU体现出了较大的优势.即便是使用了TinyLFU准入策略进行优化的LRU也获得了较大的提升.
如果想要详细了解TinyLFU请阅读论文TinyLFU: A Highly Efficient Cache Admission Policy 以及一种简略实现讲解LRU、LFU、TinyLFU缓存算法
mango Cache中的TinyLFU
mango Cache的实现中,实际上是实现了改进版的TinyLFU也就是Window-TinyLFU,这个缓存数据结构也在论文中有讲解,简而言之就是缓存由window Cache、doorKeeper(布隆过滤器)、counter以及SLRU组成.主缓存使用SLRU驱逐策略和TinyLFU准入策略,window Cache仅使用LRU驱逐策略无准入策略.
主缓存中的SLRU策略的A1和A2区域被静态划分开来,80%空间被分配给热点元素(A2),被驱逐者则从20%的非热项(A1)中选取.任何到来的元素总是允许进入window Cache, window Cache的淘汰元素有机会进入主缓存,如果在经过TinyLFU准入策略检验后被主缓存接受,那么该元素将进入主缓存,此时Window-TinyLFU的淘汰者就是主缓存的淘汰者(从A1段选出),否则该元素将被window Cache淘汰.
type tinyLFU struct { filter bloomFilter // doorkeeper counter countMinSketch // 4bit计数器 additions int //目前采样数量 samples int //采样窗口大小 lru lruCache //window Cache slru slruCache //主缓存 }
接下来本文将详细讲述mango Cache中tinyLFU的具体实现
counter
counter是实现TinyLFU的重点,对于访问次数的粗略统计就是通过此数据结构实现的.
const sketchDepth = 4 //hash次数 type countMinSketch struct { counters []uint64 mask uint32 }
可以看到, counter由一个uint64类型的数组和一个mask组成, 因为我们使用的是4次hash的4bit计数器,因此数组的每个元素实际上是4个计数器,可以通过位操作来实现访问.
counter的初始化
func (c *countMinSketch) init(width int) { size := nextPowerOfTwo(uint32(width)) >> 2 //size = (width * 4 * 4) bits / 64 if size < 1 { size = 1 } c.mask = size - 1 if len(c.counters) == int(size) { c.clear() } else { c.counters = make([]uint64, size) } }
我们通过传入的width确定需要的计数器数量(size大小)
counter的使用
counter最关键的操作就是按hash增加计数器值以及根据hash估算该hash的计数器值:
func (c *countMinSketch) add(h uint64) { h1, h2 := uint32(h), uint32(h>>32) for i := uint32(0); i < sketchDepth; i++ { //这里使用折叠法求得hash值 idx, off := c.position(h1 + i*h2) c.inc(idx, (16*i)+off) } } // 根据给定hash值返回对应计数器中最小的那个计数器值 func (c *countMinSketch) estimate(h uint64) uint8 { h1, h2 := uint32(h), uint32(h>>32) var min uint8 = 0xFF for i := uint32(0); i < sketchDepth; i++ { idx, off := c.position(h1 + i*h2) count := c.val(idx, (16*i)+off) if count < min { min = count } } return min }
里面的position、val以及inc函数都是对应的位操作,有兴趣的话可以进一步研究下.
需要注意的是estimate函数用于求给定hash对应计数器中最小计数器的值,这是因为有哈希碰撞的情况,因此计数器的值始终会大于等于真实的访问次数,因此这里采用多次hash取其中最小值的方法来减少误差.
同时,为了保证计数器的时效性,counter实现了新鲜度机制,将在一定条件下触发reset:
//将每个计数器的值减半 func (c *countMinSketch) reset() { for i, v := range c.counters { if v != 0 { c.counters[i] = (v >> 1) & 0x7777777777777777 } } }
lruCache
lruCache是LRU的实现,在mango Cache中是一个链表.
type lruCache struct { cache *cache cap int ls list.List } func (l *lruCache) init(c *cache, cap int) { l.cache = c l.cap = cap l.ls.Init() }
slruCache
slruCache是SLRU的实现,在mango Cache中由两个链表组成.
const ( protectedRatio = 0.8 ) type slruCache struct { cache *cache probationCap int probationLs list.List protectedCap int protectedLs list.List } func (l *slruCache) init(c *cache, cap int) { l.cache = c l.protectedCap = int(float64(cap) * protectedRatio) l.probationCap = cap - l.protectedCap l.probationLs.Init() l.protectedLs.Init() }
slruCache中,值得注意的点是它的写入策略:
func (l *slruCache) write(en *entry) *entry { if en.accessList != nil { // entry已存在,直接修改该entry l.markAccess(en) return nil } // 尝试将新entry加入probation段 cen := l.cache.getOrSet(en) if cen == nil { en.listID = probationSegment //将其加入probation段 en.accessList = l.probationLs.PushFront(en) } else { // 该entry已存在,直接修改它的值 cen.setValue(en.getValue()) cen.setWriteTime(en.getWriteTime()) if cen.accessList == nil { //该entry已载入缓存但是还没有注册到SLRU管理的链表中 cen.listID = probationSegment cen.accessList = l.probationLs.PushFront(cen) } else { l.markAccess(cen) } } // probation段超出容量但list并没有超出容量 if l.probationCap > 0 && l.probationLs.Len() > l.probationCap && l.length() > (l.probationCap+l.protectedCap) { // 移除并返回probation段尾部的entry en = getEntry(l.probationLs.Back()) return l.remove(en) } return nil } //记录访问情况 func (l *slruCache) markAccess(en *entry) { if en.listID == protectedSegment { // entry位于在protected段 l.protectedLs.MoveToFront(en.accessList) return } // 若entry位于probation段,则将其提升至protected段 en.listID = protectedSegment l.probationLs.Remove(en.accessList) en.accessList = l.protectedLs.PushFront(en) if l.protectedCap > 0 && l.protectedLs.Len() > l.protectedCap { // protected段超出容量限制,则淘汰其尾部的entry将其加入probation段 en = getEntry(l.protectedLs.Back()) en.listID = probationSegment l.protectedLs.Remove(en.accessList) en.accessList = l.probationLs.PushFront(en) } }
这样一来,就实现了SLRU的缓存管理策略.
filter
filter是一个布隆过滤器,这里不展开讲解其实现细节,只需要了解布隆过滤器可以通过key的hash来确定这个key是否在filter中,并且其只占用非常小的内存.如果想要详细了解布隆过滤器,可以参考这篇文章:布隆过滤器
type bloomFilter struct { numHashes uint32 // 每个元素使用的hash数量 bitsMask uint32 // 位向量大小 bits []uint64 // 过滤器位向量 } //根据给定的插入元素数量以及假阳性率初始化布隆过滤器 func (f *bloomFilter) init(ins int, fpp float64) { ln2 := math.Log(2.0) factor := -math.Log(fpp) / (ln2 * ln2) numBits := nextPowerOfTwo(uint32(float64(ins) * factor)) if numBits == 0 { numBits = 1 } f.bitsMask = numBits - 1 if ins == 0 { f.numHashes = 1 } else { f.numHashes = uint32(ln2 * float64(numBits) / float64(ins)) } size := int(numBits+63) / 64 if len(f.bits) != size { f.bits = make([]uint64, size) } else { f.reset() } }
TinyLFU的初始化
前面介绍了TinyLFU的各个组件,接下来将详细讲解TinyLFU是如何进行缓存管理的.
const ( //entry所处的缓存段 admissionWindow uint8 = iota probationSegment protectedSegment ) const ( samplesMultiplier = 8 //采样窗口大小乘数 insertionsMultiplier = 2 //doorkeeper大小乘数 countersMultiplier = 1 //计数器大小乘数 falsePositiveProbability = 0.1 //假阳性率 admissionRatio = 0.01 //window Cache占比 ) func (l *tinyLFU) init(c *cache, cap int) { if cap > 0 { // 只在容量有上限时开启doorkeeper l.samples = samplesMultiplier * cap //采样窗口大小 l.filter.init(insertionsMultiplier*cap, falsePositiveProbability) //doorkeeper初始化 l.counter.init(countersMultiplier * cap) //计数器初始化 } lruCap := int(float64(cap) * admissionRatio) l.lru.init(c, lruCap) //window Cache初始化 l.slru.init(c, cap-lruCap) //SLRU主存初始化 }
TinyLFU写入
TinyLFU的写入流程如下
首先写入window Cache
如果window Cache出现被淘汰的candidate,则将从SLRU中选取一个victim(如果SLRU未满,则不会产生victim,此时直接将candidate插入SLRU)
在计数器中查询candidate和victim的访问次数
- 若candidate的访问次数较大,则将其插入SLRU,淘汰victim
- 否则将candidate淘汰
根据此流程,我们可以发现被插入SLRU的entry的访问次数一定是较大的,而我们通过计数器实现了对entry访问次数的保存,这样就结合了LRU以及LFU的优点并且没有占用过多的内存,这正是TinyLFU最大的优势所在.
func (l *tinyLFU) write(en *entry) *entry { if l.lru.cap <= 0 { //若容量无限,则直接对SLRU写入 return l.slru.write(en) } l.increase(en.hash) //entry访问次数+1 candidate := l.lru.write(en) //window Cache中被淘汰的entry if candidate == nil { return nil } victim := l.slru.victim() //SLRU中下一个将被淘汰的entry if victim == nil { return l.slru.write(candidate) } candidateFreq := l.estimate(candidate.hash) victimFreq := l.estimate(victim.hash) if candidateFreq > victimFreq { return l.slru.write(candidate) } return candidate }
TinyLFU访问
TinyLFU的访问很简单,只是根据entry所在的缓存段分别调用对应的access函数实现访问
func (l *tinyLFU) access(en *entry) { l.increase(en.hash) if en.listID == admissionWindow { l.lru.access(en) } else { l.slru.access(en) } }
增加entry的访问次数
write函数中通过increase可以对entry的访问次数+1,下面分析一下increase函数:
func (l *tinyLFU) increase(h uint64) { if l.samples <= 0 { return } l.additions++ if l.additions >= l.samples { l.filter.reset() l.counter.reset() l.additions = 0 } if l.filter.put(h) { l.counter.add(h) } }
这里会判断已采样数量是否超出采样窗口,若超出了则会重置doorkeeper和计数器.如果entry在doorkeeper中存在,那么filter.put(h)将返回true,此时会调用counter.add(h)来增加entry的访问次数.
估计entry访问次数
write函数中通过estimate可以查询entry的访问次数,下面分析一下estimate函数:
func (l *tinyLFU) estimate(h uint64) uint8 { freq := l.counter.estimate(h) if l.filter.contains(h) { freq++ } return freq }
已经在前面的章节中介绍过counter.estimate(),其根据hash值返回对应元素的计数器中最小的值以减少哈希碰撞的影响.这里需要注意的是l.filter.contains(h),它将在doorkeeper中查找该hash值是否存在,若存在需要将估计值+1(因为doorkeeper中的值也算访问次数)
总结
Mango Cache中还有统计模块来记录缓存的各方面运行状态(命中率,缓存驱逐等等),由于不是核心内容,因此就不在本文进行赘述了.
Mango Cache最值得学习的点就是事件处理机制和TinyLFU缓存管理策略,希望读者可以好好体会在并发状态下,如何实现安全且高效的缓存操作并借助TinyLFU实现较高的缓存命中率.
以上就是Mango Cache缓存管理库TinyLFU源码解析的详细内容,更多关于Mango Cache TinyLFU的资料请关注我们其它相关文章!