关于golang监听rabbitmq消息队列任务断线自动重连接的问题

golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

代码实现一:

消费者:

package main
import (
    "fmt"
    "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */
func (t *RecvPro) Consumer(dataByte []byte) error {
    //time.Sleep(500*time.Microsecond)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    return nil
//消息已经消费3次 失败了 请进行处理
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(err)
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送钉钉消息通知主人")
func main() {
    t := &RecvPro{}
    //rabbitmq.Recv(rabbitmq.QueueExchange{
    //    "a_test_0001",
    //    "",
    //    "amqp://guest:guest@192.168.2.232:5672/",
    //},t,5)
    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
     */
    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },t,4)
    if(err != nil){
        fmt.Println(err)
    }

rabbitmq代码

package rabbitmq

import (
    "errors"
    "strconv"
    "time"
    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
    MsgContent() string
}
type RetryProducer interface {
// 定义接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction(error , []byte)  error
// 定义RabbitMQ对象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 队列名称
    RoutingKey  string            // key名称
    ExchangeName string           // 交换机名称
    ExchangeType string           // 交换机类型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
// 定义队列交换机对象
type QueueExchange struct {
    QuName  string           // 队列名称
    RtKey   string           // key值
    ExName  string           // 交换机名称
    ExType  string           // 交换机类型
    Dns     string              //链接地址
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 赋值给RabbitMQ对象
    if err != nil {
        fmt.Printf("rbmq链接失败  :%s \n", err)
    }
    return
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
    err = r.connection.Close()
    if err != nil{
        fmt.Printf("关闭mq链接失败  :%s \n", err)
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
        fmt.Printf("MQ打开管道失败:%s \n", err)
    return err
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
func (mq *RabbitMQ) sendMsg (body string) (err error)  {
    err = mq.MqOpenChannel()
    ch := mq.Channel
        log.Printf("Channel err  :%s \n", err)
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
        log.Printf("QueueDeclare err :%s \n", err)
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            log.Printf("QueueBind err :%s \n", err)
    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
    }else{
            "",     // exchange
            mq.QueueName, // routing key
/*
发送延时消息
 */
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
    err =mq.MqOpenChannel()
            return
    if ttl <= 0{
        return errors.New("发送延时消息,ttl参数是必须的")
    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = mq.RoutingKey
    table["x-dead-letter-exchange"] = mq.ExchangeName
    table["x-message-ttl"] = ttl*1000
    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    ttlstring := strconv.FormatInt(ttl,10)
    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)
        return
    if routingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
    header := make(map[string]interface{},1)
    header["retry_nums"] = 0
    var ttl_exchange string
    var ttl_routkey string
    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
        ttl_exchange = ""
    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = routingKey
        ttl_routkey = queueName
    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
    err :=mq.MqOpenChannel()
    //原始路由key
    oldRoutingKey := args[0]
    //原始交换机名
    oldExchangeName := args[1]
    table["x-dead-letter-routing-key"] = oldRoutingKey
    if oldExchangeName != "" {
        table["x-dead-letter-exchange"] = oldExchangeName
        mq.ExchangeName = ""
        table["x-dead-letter-exchange"] = ""
    table["x-message-ttl"] = int64(20000)
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
    header["retry_nums"] = retry_nums + int32(1)
        ttl_routkey = mq.RoutingKey
        ttl_routkey = mq.QueueName
    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
        fmt.Printf("MQ任务发送失败:%s \n", err)
// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
    // 获取消费通道,确保rabbitMQ一个一个发送消息
    err =  ch.Qos(1, 0, false)
    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
        log.Printf("Consume err :%s \n", err)
    for msg := range msgList {
        retry_nums,ok := msg.Headers["retry_nums"].(int32)
        if(!ok){
            retry_nums = int32(0)
        // 处理数据
        err := receiver.Consumer(msg.Body)
        if err!=nil {
            //消息处理失败 进入延时尝试机制
            if retry_nums < 3{
                fmt.Println(string(msg.Body))
                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")
                retry_msg(msg.Body,retry_nums,QueueExchange{
                        mq.QueueName,
                        mq.RoutingKey,
                        mq.ExchangeName,
                        mq.ExchangeType,
                        mq.dns,
                    })
            }else{
                //消息失败 入库db
                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")
                receiver.FailAction(err,msg.Body)
            }
            err = msg.Ack(true)
            if err != nil {
                fmt.Printf("确认消息未完成异常:%s \n", err)
        }else {
            // 确认消息,必须为false
                fmt.Printf("消息消费ack失败 err :%s \n", err)
//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
    //原始队列名称 交换机名称
    oldQName := queueExchange.QuName
    oldExchangeName := queueExchange.ExName
    oldRoutingKey := queueExchange.RtKey
    if oldRoutingKey == "" || oldExchangeName == ""{
        oldRoutingKey = oldQName
    if queueExchange.QuName != "" {
        queueExchange.QuName = queueExchange.QuName + "_retry_3";
    if queueExchange.RtKey != "" {
        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
        queueExchange.RtKey = queueExchange.QuName + "_retry_3";
//fmt.Printf("%+v",queueExchange)
    mq := NewMq(queueExchange)
    _ = mq.MqConnect()
    defer func(){
        _ = mq.CloseMqConnect()
    }()
    //fmt.Printf("%+v",queueExchange)
    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
func Send(queueExchange QueueExchange,msg string) (err error){
    err = mq.MqConnect()
        mq.CloseMqConnect()
    err = mq.sendMsg(msg)
//发送延时消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
    err = mq.sendDelayMsg(msg,ttl)
runNums  开启并发执行任务数量
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
    //链接rabbitMQ
    if(err != nil){
    //rbmq断开链接后 协程退出释放信号
    taskQuit:= make(chan struct{}, 1)
    //尝试链接rbmq
    tryToLinkC := make(chan struct{}, 1)
    //开始执行任务
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);
    //如果rbmq断开连接后 尝试重新建立链接
    var tryToLink = func() {
        for {
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            time.Sleep(time.Second * 10)
    for{
        select {
        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
             go tryToLink()
            <-tryToLinkC //建立链接成功后 重新开启协程执行任务
            fmt.Println("重新开启新的协程执行任务")
            go Recv2(mq,receiver,taskQuit);
        time.Sleep(time.Millisecond*100)
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
        }()
        // 验证链接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
        mq.ListenReceiver(receiver)
type retryPro struct {
    msgContent   string

实现重连方式很多,下面实现方式比较简单

1.Recv方法创建ampq链接

2.启动协程开始执行任务 

MqOpenChannel 打开一个channel通道处理amqp消息

拿到消息 处理任务

  3,协程中捕获异常发送消息到taskQuit <- struct{}{}

  4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功

  5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*
runNums  开启并发执行任务数量
 */
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
    mq := NewMq(queueExchange)
    //链接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }
    //rbmq断开链接后 协程退出释放信号
    taskQuit:= make(chan struct{}, 1)
    //尝试链接rbmq
    tryToLinkC := make(chan struct{}, 1)
    //开始执行任务
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);

    //如果rbmq断开连接后 尝试重新建立链接
    var tryToLink = func() {
        for {
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            }
            time.Sleep(time.Second * 10)
        }
    for{
        select {
        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
             go tryToLink()
            <-tryToLinkC //建立链接成功后 重新开启协程执行任务
            fmt.Println("重新开启新的协程执行任务")
            go Recv2(mq,receiver,taskQuit);
        time.Sleep(time.Millisecond*100)
}
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
            return
        }()
        // 验证链接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
        mq.ListenReceiver(receiver)

到此这篇关于golang监听rabbitmq消息队列任务断线自动重连接的文章就介绍到这了,更多相关golang rabbitmq断线自动重连内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • Golang中优秀的消息队列NSQ基础安装及使用详解

    前言 NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息.NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障.故障容错.高可用性以及能够保证消息的可靠传递的特征,是一个成熟的.已在大规模生成环境下应用的产品. 背景介绍 在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下: 但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • golang gin 监听rabbitmq队列无限消费的案例代码

    golang gin 监听rabbitmq队列无限消费 连接rabbitmq package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • RabbitMQ消息队列实现延迟任务示例

    目录 一.序言 1.实现原理 2.组件选型 二.方案设计 (一)服务器 (二)生产者 (三)消费者 三.SpringBoot实现 (一)生产者 (二)消费者 (三)通用工具包 一.序言 延迟任务应用广泛,延迟任务典型应用场景有订单超时自动取消:支付回调重试.其中订单超时取消具有幂等性属性,无需考虑重复消费问题:支付回调重试需要考虑重复消费问题. 延迟任务具有如下特点:在未来的某个时间点执行:一般仅执行一次. 1.实现原理 生产者将带有延迟信息的消息发送到RabbitMQ交换机中,等待延迟时间结束

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • Golang监听日志文件并发送到kafka中

    目录 前言 涉及的golang库和可视化工具: 工作的流程 环境准备 代码分层 关键的代码 main.go kafka.go tail.go 前言 日志收集项目的准备中,本文主要讲的是利用golang的tail库,监听日志文件的变动,将日志信息发送到kafka中. 涉及的golang库和可视化工具: go-ini,sarama,tail其中: go-ini:用于读取配置文件,统一管理配置项,有利于后其的维护 sarama:是一个go操作kafka的客户端.目前我用于向kefka发送消息 tail

  • 使用PHP访问RabbitMQ消息队列的方法示例

    本文实例讲述了使用PHP访问RabbitMQ消息队列的方法.分享给大家供大家参考,具体如下: 扩展安装 PHP访问RabbitMQ实际使用的是AMQP协议,所以我们只要安装epel库中的php-pecl-amqp这个包即可 rpm -ivh http://mirror.neu.edu.cn/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm yum install php-pecl-amqp 交换建立 <?php $connection = new

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

随机推荐