基于golang的简单分布式延时队列服务的实现

一、引言

背景

我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈。但有时也会遇到非实时的任务,比如确定的时间点发布重要公告。或者需要在用户做了一件事情的X分钟/Y小时后,EG:

“PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励”

对其特定动作,比如通知、发券等等。一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延时队列服务。

名词解释

topic_list队列:每一个来的延时请求都应该又一个延时主题参考kafka,在逻辑上划分出一个队列出来每个业务分开处理;

topic_info队列:每一个队列topic都存在一个新的队列里,每次扫描topic信息检测新的topic建立与销毁管理服务协程数量;

offset:当前消费的进度;

new_offset:新消费的进度,预备更迭offset;

topic_offset_lock:分布式锁。

二、设计目标

 功能清单

1、延时信息添加接口基于http调用

2、拥有存储队列特性,可保存近3天内的队列消费数据

3、提供消费功能

4、延时通知

性能指标

预计接口的调用量:单秒单类任务数3500,多秒单类任务数1300

压测结果:

简单压测

wrk写入qps:259.3s 写入9000条记录 单线程 无并发

触发性能/准确率:单秒1000,在测试机无延长。单秒3000时,偶尔出现1-2秒延迟。受内存和cpu影响。

三、系统设计

交互流程

时序图

本设计基于http接口调用,当向topic存在的队列中添加消息的时候,消息会被添加到相应topic队列的末尾储存,当添加到不存在的相应topic队列时,首先建立新topic队列,当定时器触发的时候或者分布式锁,抢到锁的实例先获得相应队列的offset,设置新offset,就可以释放锁了让给其他实例争抢,弹出队列头一定数量元素,然后拿到offset段的实例去存储中拿详细信息,在协程中处理,主要协程等待下次触发。然后添加协程去监控触发。

模块划分

1、队列存储模块

1·delay下的delay.base模块,主要负责接收写请求,将队列信息写入存储,不负责backend逻辑,调用存储模块

2、backend模块。delay下的delay.backend模块,负责时间触发扫描对应的topic队列,调用存储模块,主要负责访问读取存储模块,调用callback模块

1·扫描topic添加groutine

2·扫描topic_list消费信息

3·扫描topic_list如果一定时间没有消费到则关闭groutine

3、callback模块,主要负责发送已经到时间的数据,向相应服务通知

3、存储模块

1·分布式锁模块,系统多机部署,保证每次消费的唯一性,对每次topic消费的offset段进行上锁offset到new_offset段单机独享

2·topic管理列表,管理topic数量控制协程数

3·topic_list,消息队列

4·topic_info,消息实体,可能需要回调中会携带一些信息统一处理

4、唯一号生成模块。

五、缓存设计

目前使用全缓存模式

key设计:

topic管理list key: XX:DELAY_TOPIC_LIST type:list

topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s(根据topic分key) type:zset

topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s(根据topic分key) type:hash

topic_offset key: XX:DELAY_TOPIC_OFFSET-%s(根据topic分key) type:string

topic_lock key: xx:DELAY_TOPIC_RELOAD_LOCK-%s(根据topic分key) type:string

六、接口设计

delay.task.addv1 (延时队列添加v1)

请求示例

curl -d
'{
  "topic": "xxx", 								// 业务topic
  "timing_moment": ,							    // 单位秒,要定时时刻
  "content": "{}"								// 消息体,json串
}'
'http://127.0.0.1:xxxx/delay/task/add'

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功",
  "task_id":112345465765
}

pull回调方式返回(v2不再支持)

请求示例

curl -d
'{
  "topic": "xxxx", 								// 业务topic
  "task_id":1324568798765							// taskid,选填,有则返回特定消息
}'
'http://127.0.0.1:xxxx/delay/task/pull'

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功"
  "content":"{"\xxx"\}"
}

delay.task.addv2 (延时队列添加v2)

请求示例

curl -d
'{
  "topic": "xxx", 						// 业务topic
  "timing_moment": ,						// 单位秒,要定时时刻
  "content": "{                        // 消息内容(json string)
	"sn":"message.call",                  // 服务发现名字(或为配置服务名)
	"url":"/ev/tp/xxxx",                  // 回调url
	"xxx":"xxx"                       // 其他字段
  }"
}'
'http://127.0.0.1:xxxx/delay/task/add'

示例

curl -d '{
  "topic":"xxxx_push",
  "content":"{
    "uid":"111111",
    "sn":"other.server",
    "url":"/xxxx/callback",
    "msg_type":"gift",
  }",
  "timing_moment":1565700615
}'
http://127.0.0.1:xxxx/delay/task/add

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功",
  "task_id":112345465765
}

七、MQ设计(v2不再支持)

关于kafka消费方式返回:

topic: delay_base_push

固定返回格式
{
  "topic": "xxxx",								// 业务topic
  "content": "{}"								// 单条生产消息content
}

八、其他设计

唯一号设计

调用存储模块,利用redis的自增结合逻辑生成唯一号具体逻辑如下:

func (c *CacheManager) OperGenTaskid() (uint64, error) {
	now := time.Now().Unix()
	key := c.getDelayTaskIdKey()
	reply, err := c.DelayRds.Do("INCR", key)
	if err != nil {
		log.Errorf("genTaskid INCR key:%s, error:%s", key, err)
		return 0, err
	}
	version := reply.(int64)
	if version == 1 {
    //默认认为1秒能创建100个任务
		c.DelayRds.Expire(key, time.Duration(100)*time.Second)
	}
	incrNum := version % 10000
	taskId := (uint64(now)*10000 + uint64(incrNum))
	log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId)
	return taskId, nil
}

分布式锁设计

func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) {
	key := c.getDelayTopicReloadLockKey(topic)
	reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2)
	if err != nil {
		log.Errorf("SetDelayTopicLock SETNX key:%s, cal:%v, error:%s", key, "lock", err)
		return false, err
	}
	if reply == nil {
		return false, nil
	}
	log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%d", topic, false)
	return true, nil
}

九、设计考虑

健壮性

熔断策略:

这版设计中有很多不足之处,当redis不可访问时,请求将大量积压给机器或者实例带来压力,导致其他服务不可用,所以采取降级策略(降级策略也有不足);在请求redis时加入重试,当重试次数多于报警次数,会记录一个原子操作atomic.StoreInt32(&stopFlag,1),其中stopFlag为一个全局的变量,在atomic.LoadInt32(&stopFlag)后,stopFlag的值为1则暂时不请求redis,同时记录当前时间,加入定时器,熔断器分为三个级别,开,关,半开,当定时器结束后stopFlag=2第二个定时将为半开状态计时,有概率访问redis,当成功次数到达阈值stopFlag=0,否则stopFlag=1继续计时

不足

1、调用time定时

通常golang 写循环执行的定时任务大概用三种实现方式:

1、time.Sleep方法:

for {
  time.Sleep(time.Second)
  fmt.Println("test")
}

2、time.Tick函数:

t1:=time.Tick(3*time.Second)
for {
  select {
  case <-t1:
    fmt.Println("test")
  }
}

3、其中Tick定时任务,也可以先使用time.Ticker函数获取Ticker结构体,然后进行阻塞监听信息,这种方式可以手动选择停止定时任务,在停止任务时,减少对内存的浪费。

t:=time.NewTicker(time.Second)
for {
  select {
  case <-t.C:
    fmt.Println("test")
    t.Stop()
  }
}

在最开始以为sleep是单独处理直接停掉了这个协程,所以第一版用的也是sleep,但是在收集资料后发现这几种方式都创建了timer,并加入了定时任务处理协程。实际上这两个函数产生的timer都放入了同一个timer堆(golang时间轮),都在定时任务处理协程中等待被处理。Tick,Sleep,time.After函数都使用的timer结构体,都会被放在同一个协程中统一处理,这样看起来使用Tick,Sleep并没有什么区别。实际上是有区别的,本文不是讨论golang定时执行任务time.sleep和time.tick的优劣,以后会在后续文章进行探讨。使用channel阻塞协程完成定时任务比较灵活,可以结合select设置超时时间以及默认执行方法,而且可以设置timer的主动关闭,所以,建议使用time.Tick完成定时任务。

2、存储模块问题

目前是全缓存,没有DB参与,首先redis(codis)的高可用是个问题,在熔断之后采取“不作为”的判断也是有问题的,所以对未来展望,首先是:

1·单机的数据结构使用多时间轮。为了减少数据的路程,将load数据的过程异步加载到机器,减少网络io所造成的时间损耗。同时也是减少对redis的依赖

2·引入ZooKeeper或者添加集群备份,leader。保证集群中至少有两台机器load一个topic的数据,leader可以协调消费保证高可用

到此这篇关于基于golang的简单分布式延时队列服务的实现的文章就介绍到这了,更多相关golang 分布式延时队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • 基于golang的简单分布式延时队列服务的实现

    一.引言 背景 我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈.但有时也会遇到非实时的任务,比如确定的时间点发布重要公告.或者需要在用户做了一件事情的X分钟/Y小时后,EG: "PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励" 对其特定动作,比如通知.发券等等.一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延

  • Redisson 分布式延时队列 RedissonDelayedQueue 运行流程

    目录 前言 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 总结 前言 因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 RedissonDelayedQueue,为了搞清楚内部运行流程,特记录下来. 总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程. 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 基本使用 发送延迟消息代码如下,发送了一条延

  • golang实现简单的udp协议服务端与客户端示例

    本文实例讲述了golang实现简单的udp协议服务端与客户端.分享给大家供大家参考,具体如下: 其实udp没有什么服务端和客户端的概念了,只是一个发一个收而已,只是这样较方便能识别和理解. 服务端: 复制代码 代码如下: package main import (     "fmt"     "net" ) func main() {     // 创建监听     socket, err := net.ListenUDP("udp4", &am

  • 一口气说出Java 6种延时队列的实现方法(面试官也得服)

    五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败.所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如. 知耻而后勇,这不逼着自己又学起来了,个人比较喜欢一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该分享什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给大家整理分享一道面试题:"如何实现延时队列?". 下边会介绍

  • golang实现简单的tcp数据传输

    目录 前言 首先介绍什么是Tcp协议 什么是可靠数据传输? TCP的快速重传机制 简单介绍TCP连接的三次握手和四次挥手 三次握手 四次挥手 golang实现简单的tcp连接建立 服务端 完整代码 客户端 完整代码 前言 通过golang实现Tcp的连接与信息传输 本文主要介绍Tcp协议以及如何使用golang来建立一个简单的tcp连接服务,并且实现信息的传输. 首先介绍什么是Tcp协议 Tcp协议是传输层的一个可靠数据传输协议,Tcp协议有以下几个特点: 点对点的发送:一个发送方,一个接收方

  • 基于Redis实现延时队列的优化方案小结

    目录 一.延时队列的应用 二.延时队列的实现 三.总结 一.延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小时内未充值,向用户推送充值未完成提醒. 在用户最近一次阅读行为2小时后,向用户推送继续阅读提醒. 在用户新注册或退出应用N分钟后,向用户推送合适的推荐消息. … 上述场景的共同特征就是在某事件触发后延迟一定时间后再执行特定任务,若事件触发时间点可知,则上述逻辑也可等价于在

  • 基于Golang实现延迟队列(DelayQueue)

    目录 背景 原理 堆 随机删除 重置元素到期时间 Golang实现 数据结构 实现原理 添加元素 阻塞获取元素 Channel方式阻塞读取 性能测试 总结 背景 延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n)). 由于以上性质,延迟队列一般可以用于以下场景(定时任务.延迟任务): 缓存:用户淘汰过期元素 通知:在指定时间通

  • docker 基于golang镜像构建 ssh服务的方法

    下面给大家介绍下docker 基于golang镜像构建 ssh服务的代码,具体内容如下所示: # golang:latest镜像 FROM ee23292e2826 # 作者 MAINTAINER dechao@qq.com # 添加Golang环境变量 ENV GOPROXY https://goproxy.cn,direct ENV GO111MODULE on # 配置apt-get源 ADD sources.list /etc/apt/ # 更新apt-get源 安装ssh服务 修改ro

  • Golang中优秀的消息队列NSQ基础安装及使用详解

    前言 NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息.NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障.故障容错.高可用性以及能够保证消息的可靠传递的特征,是一个成熟的.已在大规模生成环境下应用的产品. 背景介绍 在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下: 但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

随机推荐