RabbitMq如何做到消息的可靠性投递

目录
  • 如何保证消息不丢失
  • Go 实现
    • 安装操作库
    • 发送端的确认
    • 消费端的确认

如何保证消息不丢失

在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:

消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c)。

发送端的确认

Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式

和return 退回模式。

confirm 确认模式:消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

return 退回模式:消息从 exchange–>queue 投递失败,会将消息退回给producer。

消费端的确认

消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack,有两种确认方式:自动确认和手动确认。

在编码中,关于消息的确认方式,我们需要在消费者端调用Consumer函数时,设置第三个参数:autoAck是false还是true(false表示手动,true表示自动)。

自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false),手动签收,如果出现异常,则调用d.Reject(true)让其自动重新发送消息。

Go 实现

安装操作库

安装API库

Go可以使用streadway/amqp库来操作rabbit,使用以下命令来安装:

go get github.com/streadway/amqp

封装rabbitmq

接下来我们对streadway/amqp库的内容进行一个二次封装,封装为一个rabbitmq.go文件:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ结构
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 连接服务器
func Connect(s string) *RabbitMQ {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string, name string) *RabbitMQ {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	q, e := ch.QueueDeclare(
		name,  //队列名
		false, //是否开启持久化
		true,  //不使用时删除
		false, //排他
		false, //不等待
		nil,   //参数
	)
	failOnError(e, "初始化消息队列失败!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 声明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "声明queue失败!")
}
// QueueDelete 删除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "删除queue失败!")
}
// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "无法设置QoS")
}
// NewExchange 初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string, name string, typename string) {
	//连接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交换机失败!")
}
// ExchangeDelete 删除交换机
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "删除交换机失败!")
}
// Bind 绑定消息队列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "绑定队列失败!")
	q.exchange = exchange
}
// Send 向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失败!")
	e = q.channel.Publish(
		"",    //交换
		queue, //路由键
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向队列:" + q.Name + "发送消息失败!"
	failOnError(e, msg)
}
// Publish 向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失败!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交换机发送消息失败!")
}
// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定从哪个队列中接收消息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失败!")
	return c
}
// Close 关闭队列连接
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

发送端的确认

首先初始化消息队列的时候,我们要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

func New(s string, name string) *RabbitMQ {
	conn, e := amqp.Dial(s)
	failOnError(e, "连接Rabbitmq服务器失败!")
	ch, e := conn.Channel()
	failOnError(e, "无法打开频道!")
	q, e := ch.QueueDeclare(
		name,  //队列名
		false, //是否开启持久化
		true,  //不使用时删除
		false, //排他
		false, //不等待
		nil,   //参数
	)
	failOnError(e, "初始化消息队列失败!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	// 设置为confirm模式
	mq.channel.Confirm(false)
	return mq
}

然后在封装库中创建一个函数handleConfirm()用于接收来自Borker的回复:

func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
	return q.channel.NotifyPublish(ch)
}

生产者

生产者端在向Broker发送消息的时候,我们使用一个无缓冲的通道来接收来自Broker的回复,然后创建一个协程监听这个无缓冲通道。

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定为topic类型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
	go handleConfirm(confirm)
	var i int
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
		i++
	}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
	for {
		select {
		case message := <-confirm:
			fmt.Println("接收到来自Broker的回复:", message)
		}
	}
}

运行结果:

接收到来自Broker的回复: {1 true}
接收到来自Broker的回复: {2 true}
接收到来自Broker的回复: {3 true}
接收到来自Broker的回复: {4 true}
接收到来自Broker的回复: {5 true}

消费端的确认

首先将Consume函数的第三个参数autoAck参数标记为false:

// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name,
		"",
		false, // 不自动确认消息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失败!")
	return c
}

在消费者端我们采用公平派遣模式,即队列发送消息给消费者的时候,不再采用轮询机制,而是一个消费者消费完消息之后,会调用Ack(false)函数向队列发送一个回复,队列每次会将消息优先发送给消费完消息的消费者(回复过)。

消费端限流:

实现公平派遣模式我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:

// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "无法设置QoS")
}

生产者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定为direct类型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		i = i + 1
	}
}

消费者1

消费者2在消费第三条消息的时候,假设发生了错误,我们调用d.Reject(true)函数让队列重新发送消息。

func main() {
	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消费一条消息,直到消费完才重新接收
	consumer1.Qos()
	// 队列绑定到exchange
	consumer1.Bind("exchange", "key1")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		var i int
		for d := range msgs {
			time.Sleep(time.Second * 1)
			log.Printf("Consumer1 received a message: %s", d.Body)
			// 假设消费第三条消息的时候出现了错误,我们就调用d.Reject(true),队列会重新发送消息给消费者
			if i == 2 {
				d.Reject(true)
			} else {
				// 消息消费成功之后就回复
				d.Ack(false)
			}
			i++
		}
	}()
	select {}
}

消费者2

func main() {
	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 指定一次只消费一条消息,直到消费完才重新接收
	consumer2.Qos()
	// 队列绑定到exchange
	consumer2.Bind("exchange", "key1")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			time.Sleep(time.Second * 5)
			log.Printf("Consumer2 received a message: %s", d.Body)
			// 消息消费成功之后就回复
			d.Ack(false)
		}
	}()
	select {}
}

运行结果:

# 消费者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"

# 消费者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"

到此这篇关于RabbitMq如何做到消息的可靠性投递的文章就介绍到这了,更多相关RabbitMq消息可靠性投递内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解RabbitMq如何做到消息的可靠性投递

    目录 前言 RabbitMq的投递及消费流程 提供者如何确保消息的成功投递 单条消息的同步确认 多条消息的同步确认 异步消息确认 消息的返回机制 前言 现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的. 引入消息队列可以给这个项目带来很多的好处:比如 削峰 这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请

  • RabbitMq如何做到消息的可靠性投递

    目录 如何保证消息不丢失 Go 实现 安装操作库 发送端的确认 消费端的确认 如何保证消息不丢失 在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的: 消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c). 发送端的确认 Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式 和return 退回模式. confirm 确

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • .Net使用RabbitMQ即时发消息Demo

    前言 最近项目要使用RabbitMQ,网上已经有很多优秀的文章了,百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议.必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点). 最近参考大神们的博客,自己做了一个RabbitMQ即时发消息的Demo.下面话不多说了,来一起看看详细的介绍吧. 步骤如下: 1.使用VS的NuGet安装包管理工具安装RabbitMQ.Cli

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • c# rabbitmq 简单收发消息的示例代码

    发布消息:(生产者) /// <summary> /// 发送消息 /// </summary> /// <param name="queue">队列名</param> /// <param name="message">消息内容</param> private static void PublishInfo(string queue, string message) { try { var f

  • C#利用RabbitMQ实现点对点消息传输

    消息队列模型 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者. RabbitMQ设置 RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定.本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示: RabbitMQ动态库安装 通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

随机推荐