基于Golang实现延迟队列(DelayQueue)

目录
  • 背景
  • 原理
    • 随机删除
    • 重置元素到期时间
  • Golang实现
    • 数据结构
    • 实现原理
    • 添加元素
    • 阻塞获取元素
    • Channel方式阻塞读取
    • 性能测试
  • 总结

背景

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。

由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):

  • 缓存:用户淘汰过期元素
  • 通知:在指定时间通知用户,比如会议开始前30分钟
  • 订单:30分钟未支付取消订单
  • 超时:服务器自动断开太长时间没有心跳的连接

其实在Golang中是自带定时器的,也就是time.After()time.AfterFunc()等函数,它们的性能也是非常好的,随着Golang版本升级还会优化。但是对于某些场景来说确实不够方便,比如缓存场景我们需要能够支持随机删除定时器,随机重置过期时间,更加灵活的删除一小批过期元素。而且像Kafka的时间轮算法(TimeWheel)里面也用到了延迟队列,因此还是有必要了解下如何实现延迟队列。

原理

延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。

随机删除

有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:

  • 元素添加删除标记字段:堆中每个元素都添加一个删除标记字段,并把这个元素的地址返回给用户,用户就可以标记元素的这个字段为true,这样元素到达堆顶时如果判断到这个字段为true就会被清除,而延迟队列里的元素逻辑上是一定会到达堆顶的(因为时间会流逝)。这是一种懒删除的方式。
  • 元素添加堆中下标字段(或用map记录下标):堆中每个元素都添加一个堆中下标字段,并把这个元素的地址返回给用户,这样我们就可以通过这个元素里面记录的下标快速定位元素在堆中的位置,从而删除元素。详细可以看文章如何实现一个支持O(log(n))随机删除元素的堆。

重置元素到期时间

如果需要重置延迟队列里面元素的到期时间,则必须知道元素在堆中的下标,因为重置到期时间之后必须对堆进行调整,因此只能是元素添加堆中下标字段

Golang实现

这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。

代码地址

数据结构

主要的结构可以看到就是一个heap,Entry是每个元素在堆中的表示,Value是具体的元素值,Expired是为了堆中元素根据到期时间排序。

mutex是一个互斥锁,主要是保证操作并发安全。

wakeup是一个缓冲区长度为1的通道,通过它实现添加元素的时候唤醒等待队列不为空或者有更小到期时间元素加入的协程。(重点)

type Entry[T any] struct {
	Value   T
	Expired time.Time // 到期时间
}

// 延迟队列
type DelayQueue[T any] struct {
	h      *heap.Heap[*Entry[T]]
	mutex  sync.Mutex    // 保证并发安全
	wakeup chan struct{} // 唤醒通道
}

// 创建延迟队列
func New[T any]() *DelayQueue[T] {
	return &DelayQueue[T]{
		h: heap.New(nil, func(e1, e2 *Entry[T]) bool {
			return e1.Expired.Before(e2.Expired)
		}),
		wakeup: make(chan struct{}, 1),
	}
}

实现原理

阻塞获取元素的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期、队列不为空或者有更小到期时间元素加入。

其中元素到期协程在阻塞获取元素时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空和有更小到期时间元素加入则需要另外一个协程在添加元素时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。

添加元素

一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。

因为我们Take()操作,既阻塞获取元素操作,在不满足条件时会去等待wakeup通道,但是等待通道前必须释放锁,否则Push()无法写入新元素去满足条件队列不为空和有更小到期时间元素加入。而从释放锁后到开始读取wakeup通道这段时间是没有锁保护的,如果Push()在这期间插入新元素,为了保证通道不阻塞同时又能通知到Take()协程,我们的通道的长度需要是1,同时使用select+default保证在通道里面已经有元素的时候不阻塞Push()协程。

// 添加延迟元素到队列
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	entry := &Entry[T]{
		Value:   value,
		Expired: time.Now().Add(delay),
	}
	q.h.Push(entry)
	// 唤醒等待的协程
	// 这里表示新添加的元素到期时间是最早的,或者原来队列为空
	// 因此必须唤醒等待的协程,因为可以拿到更早到期的元素
	if q.h.Peek() == entry {
		select {
		case q.wakeup <- struct{}{}:
		default:
		}
	}
}

阻塞获取元素

这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。否则等待直到超时或者元素到期或者有新的元素到达。

这里在解锁之前会清空wakeup通道,这样可以保证下面读取的wakeup通道里的元素肯定是新加入的。

// 等待直到有元素到期
// 或者ctx被关闭
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var expired *time.Timer
		q.mutex.Lock()
		// 有元素
		if !q.h.Empty() {
			// 获取元素
			entry := q.h.Peek()
			if time.Now().After(entry.Expired) {
				q.h.Pop()
				q.mutex.Unlock()
				return entry.Value, true
			}
			// 到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器
			expired = time.NewTimer(time.Until(entry.Expired))
		}
		// 避免被之前的元素假唤醒
		select {
		case <-q.wakeup:
		default:
		}
		q.mutex.Unlock()

		// 不为空,需要同时等待元素到期
                // 并且除非expired到期,否则都需要关闭expired避免泄露
		if expired != nil {
			select {
			case <-q.wakeup: // 新的更快到期元素
				expired.Stop()
			case <-expired.C: // 首元素到期
			case <-ctx.Done(): // 被关闭
				expired.Stop()
				var t T
				return t, false
			}
		} else {
			select {
			case <-q.wakeup: // 新的更快到期元素
			case <-ctx.Done(): // 被关闭
				var t T
				return t, false
			}
		}
	}
}

Channel方式阻塞读取

Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。

// 返回一个通道,输出到期元素
// size是通道缓存大小
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		for {
			entry, ok := q.Take(ctx)
			if !ok {
				return
			}
			out <- entry
		}
	}()
	return out
}

使用方式

for entry := range q.Channel(context.Background(), 10) {
    // do something
}

性能测试

这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。

func BenchmarkPushAndTake(b *testing.B) {
	q := New[int]()
	b.ResetTimer()

        // 添加元素
	for i := 0; i < b.N; i++ {
		q.Push(i, time.Duration(i))
	}

        // 等待全部元素到期
	b.StopTimer()
	time.Sleep(time.Duration(b.N))
	b.StartTimer()

        // 获取元素
	for i := 0; i < b.N; i++ {
		_, ok := q.Take(context.Background())
		if !ok {
			b.Errorf("want %v, but %v", true, ok)
		}
	}
}

测试结果:

Benchmark-8      2331534               476.8 ns/op            76 B/op          1 allocs/op

总结

堆实现的延迟队列是一种实现起来比较简单的定时器(当然阻塞读取Take()是比较复杂的),由于时间复杂度是O(log(n)),因此可以满足定时任务数量不是特别多的场景。堆实现的延迟队列也是可以随机删除元素的,可以根据具体任务选择是否实现。如果对定时器性能要求比较敏感的话可以选择使用时间轮实现定时器,它可以在O(1)的时间复杂度添加和删除一个定时器,不过实现起来比较复杂(挖个坑,下篇文章实现)。

到此这篇关于基于Golang实现延迟队列(DelayQueue)的文章就介绍到这了,更多相关Golang延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Golang实现基于Redis的可靠延迟队列

    目录 前言 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 前言 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造. 本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用. 使用有序集合结构实现延时队列的方法已经广为

  • Go+Redis实现延迟队列实操

    目录 前言 简单的实现 定义消息 Push Consume 存在的问题 多消费者实现 定义消息 Push Consume 存在的问题 总结 前言 延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等等. 一般我们实现延迟消息都需要依赖底层的有序结构,比如堆,而Redis刚好提供了zset这种数据类型,它的底层实现是哈希表+跳表,也是一种有序的结构,所以这篇文章主要是使用Go+Redis来实现延迟队列. 当然R

  • 基于Golang实现延迟队列(DelayQueue)

    目录 背景 原理 堆 随机删除 重置元素到期时间 Golang实现 数据结构 实现原理 添加元素 阻塞获取元素 Channel方式阻塞读取 性能测试 总结 背景 延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n)). 由于以上性质,延迟队列一般可以用于以下场景(定时任务.延迟任务): 缓存:用户淘汰过期元素 通知:在指定时间通

  • 详解Java线程池队列中的延迟队列DelayQueue

    目录 DelayQueue延迟队列 DelayQueue使用场景 DelayQueue属性 DelayQueue构造方法 实现Delayed接口使用示例 DelayQueue总结 在阻塞队里中,除了对元素进行增加和删除外,我们可以把元素的删除做一个延迟的处理,即使用DelayQueue的方法.本文就来和大家聊聊Java线程池队列中的DelayQueue—延迟队列 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQ

  • 基于Redis延迟队列的实现代码

    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户. 2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单. 解决方案分析 方案一: 采用通过定时任务采用数据库/非关系型数据库轮询方案. 优点: 1. 实现简单,对于项目前期这样是最容易的解决方案. 缺点: 1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询. 2. 服务资源浪费,因为轮询需

  • Redis实现延迟队列的全流程详解

    目录 1.前言 1.1.什么是延迟队列 1.2.应用场景 1.3.为什么要使用延迟队列 2.Redis sorted set 3.Redis 过期键监听回调 4.Quartz定时任务 5.DelayQueue 延迟队列 6.RabbitMQ 延时队列 7.时间轮 1.前言 1.1.什么是延迟队列 延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理.从某种意义上来讲

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • 基于golang的简单分布式延时队列服务的实现

    一.引言 背景 我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈.但有时也会遇到非实时的任务,比如确定的时间点发布重要公告.或者需要在用户做了一件事情的X分钟/Y小时后,EG: "PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励" 对其特定动作,比如通知.发券等等.一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延

  • java并发中DelayQueue延迟队列原理剖析

    介绍 DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作. 源码分析 DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的Priority

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

随机推荐