GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

目录
  • TTL
  • 死信队列
  • 延迟队列
  • Go实现延迟队列

TTL

TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以设置两种过期时间:

  • 对消息设置过期时间。
  • 对整个队列(Queue)设置过期时间。

如何设置

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都设置了过期时间,以时间短的为准。

在streadway/amqp库提供的API中设置TTL

设置队列过期时间:

QueueDeclare函数的最后一个参数是一个amqp.Table类型,它的声明是这样的: type Table map[string]interface{},其实是一个可以用于设置队列属性的map。

// 设置Queue ttl为5s
args := amqp.Table{"x-message-ttl": 5000}
q, e := ch.QueueDeclare(
		name,  //队列名
		false,
		true,
		false,
		false,
		args,   //设置Queue ttl为5s
	)

设置消息过期时间:

e = q.channel.Publish(
		"",
		queue,
		false,
		false,
		amqp.Publishing{
			// 设置当前发送消息的过期时间为3s
			Expiration: "3000",
			ReplyTo:    q.Name,
			Body:       []byte(str),
})

死信队列

当一个队列中存在死信时,RabbitMQ会把消息发送给DLX(死信交换机),进而被路由到另一个队列中,这个队列就叫做死信队列。

死信就是指没有被消费者消费成功的消息,一条消息变成死信有三种情况:

  • 如果给消息队列设置了最大容量x-max-length,队列已经满了,后续再进来的消息会溢出,无法被队列接收就会变成死信。
  • 消息接收时被拒绝会变成死信,例如调用Reject()函数,并设置requeuefalse
  • 如果给消息队列设置了消息的过期时间x-message-ttl,或者发送消息时设置了当前消息的过期时间,当消息在队列中的存活时间大于过期时间时,就会变成死信。

如何将死信发送给DLX

为队列设置参数即可,将要发送死信的队列配置以下两个参数:

x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]

下面是死信队列的工作流程:

延迟队列

延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务。

比如十分钟内未支付则取消订单,原先这个功能我们可以使用定时器来实现,即每隔一段时间去数据库对比未支付订单的当前时间与订单创建时间。但是定时器的时长难以确定,太长会导致订单失效时间出现误差,太短则会增大数据库压力。

实现

在RabbitMQ中没有提供延迟队列的功能,但是我们可以使用:TTL+死信队列组合的方式来实现延迟队列的效果。

下面是实现延迟队列的流程图:

Go实现延迟队列

创建一个死信交换机

再创建一个死信队列

将死信队列绑定至死信交换机

创建一个正常队列,并指定消息过期后被发往的死信交换机

生产者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)
	// 发送消息到queue.normal队列中
	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
		Body:       []byte(body),
		Expiration: "10000", // 设置TTL为10秒
	})
	defer conn.Close()
	defer ch.Close()
}

消费者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	//监听queue.dlx队列
	msgs, _ := ch.Consume(
		"queue.dlx",
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	for d := range msgs {
		fmt.Printf("receive: %s\n", d.Body) // 收到消息,业务处理
	}
}

流程说明

生产者生产一条消息,然后指定消息的TTL为10s,接着将消息发给普通队列,消息在普通队列中过期后被发往死信交换机,死信交换机将这条消息路由给延迟队列。消费者一直在监听到延迟队列中的死信后,开始消费。

到此这篇关于GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解的文章就介绍到这了,更多相关GoLang RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang gin 监听rabbitmq队列无限消费的案例代码

    golang gin 监听rabbitmq队列无限消费 连接rabbitmq package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func

  • Golang rabbitMQ生产者消费者实现示例

    目录 消费者 生产者 消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { fmt.Println("%s: %s", msg, err) } } // 只能在安装 rabbitmq 的服务器上操作 func main() { conn, err := amqp.

  • GoLang RabbitMQ实现六种工作模式示例

    目录 六种工作模式介绍 1.简单(Simple)模式 2.工作队列(Work Queue)模式 3.发布/订阅(Pub/Sub)模式 4.路由(Routing)模式 5.通配符(Tpoic)模式 Go语言的实现 安装操作库 简单(Simple)模式 工作队列(Work Queue)模式 发布/订阅(Pub/Sub)模式 路由(Routing)模式 通配符(Tpoic)模式 六种工作模式介绍 1.简单(Simple)模式 P:生产者,也就是要发送消息的程序. C:消费者:消息的接收者,会一直等待消

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • 如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机制:2.Dead Letter Exchanges(DLX)死信队列.下面将具体描述实现原理以及实现代 延迟

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • Redis实现延迟队列的全流程详解

    目录 1.前言 1.1.什么是延迟队列 1.2.应用场景 1.3.为什么要使用延迟队列 2.Redis sorted set 3.Redis 过期键监听回调 4.Quartz定时任务 5.DelayQueue 延迟队列 6.RabbitMQ 延时队列 7.时间轮 1.前言 1.1.什么是延迟队列 延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理.从某种意义上来讲

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • 基于Redis延迟队列的实现代码

    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户. 2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单. 解决方案分析 方案一: 采用通过定时任务采用数据库/非关系型数据库轮询方案. 优点: 1. 实现简单,对于项目前期这样是最容易的解决方案. 缺点: 1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询. 2. 服务资源浪费,因为轮询需

随机推荐