golang gin 监听rabbitmq队列无限消费的案例代码

golang gin 监听rabbitmq队列无限消费

连接rabbitmq

package database

import (
	"github.com/streadway/amqp"
	"log"
	"reflect"
	"yy-data-processing/common/config"
)

var RabbitConn *amqp.Connection
var RabbitChannel *amqp.Channel

func InitRabbitmq() {
	var err error
	RabbitConn, err = amqp.Dial(config.Config.RabbitUrl)
	if err != nil {
		log.Println("连接RabbitMQ失败")
		panic(err)
	}
	RabbitChannel, err = RabbitConn.Channel()
	if err != nil {
		log.Println("获取RabbitMQ channel失败")
		panic(err)
	}
}

// 0表示channel未关闭,1表示channel已关闭
func CheckRabbitClosed(ch amqp.Channel) int64 {
	d := reflect.ValueOf(ch)
	i := d.FieldByName("closed").Int()
	return i
}

创建生产者

package service

import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Producer() {
	// 声明队列,没有则创建
	// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)
	declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil)
	if err != nil {
		log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err)
		panic(err)
	}

	request := model.Request{}
	marshal, _ := json.Marshal(request )
	// exchange、routing key、mandatory、immediate
	err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(marshal),
	})
	if err != nil {
		log.Printf("生产者发送消息失败, error: %v", err)
	} else {
		log.Println("生产者发送消息成功")
	}
}

创建消费者

package service

import (
	"encoding/json"
	"log"
	"os"
	"strings"
	"sync"
	"time"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/model"
)

func Consumer() {
	// 声明队列,没有则创建
	// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)
	_, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil)
	if err != nil {
		log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err)
		panic(err)
	}

	// 队列名称、consumer、auto-ack、是否独享
	// deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取
	deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil)
	if err != nil {
		log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err)
	} else {
		log.Println("从消费队列获取任务成功")
	}

    // 阻塞住
	for {
		select {
		case message := <-deliveries:
			closed := database.CheckRabbitClosed(*database.RabbitChannel)
			if closed == 1 { // channel 已关闭,重连一下
				database.InitRabbitmq()
			} else {
				msgData := string(message.Body)
				request := model.Request{}
				err := json.Unmarshal([]byte(msgData), &request)
				if err != nil {
					log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err)
				} else {
					// TODO...
                    // 处理逻辑

				}
			}
		}
	}
}

main方法协程调用

package main

import (
	"log"
	"yy-data-processing/common/config"
	"yy-data-processing/common/database"
	"yy-data-processing/router"
	"yy-data-processing/service"
)

func main() {
	// 初始化路由
	routers := router.InitRouters()

	// 初始化RabbitMQ
	database.InitRabbitmq()
	go service.Producer()
	go service.Consumer()

	port := config.Config.Port
	if err := routers.Run(":" + port); err != nil {
		log.Printf("启动服务失败: ", err)
	}

}

到此这篇关于golang gin 监听rabbitmq队列无限消费的文章就介绍到这了,更多相关golang监听rabbitmq内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

  • Golang rabbitMQ生产者消费者实现示例

    目录 消费者 生产者 消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { fmt.Println("%s: %s", msg, err) } } // 只能在安装 rabbitmq 的服务器上操作 func main() { conn, err := amqp.

  • GoLang RabbitMQ实现六种工作模式示例

    目录 六种工作模式介绍 1.简单(Simple)模式 2.工作队列(Work Queue)模式 3.发布/订阅(Pub/Sub)模式 4.路由(Routing)模式 5.通配符(Tpoic)模式 Go语言的实现 安装操作库 简单(Simple)模式 工作队列(Work Queue)模式 发布/订阅(Pub/Sub)模式 路由(Routing)模式 通配符(Tpoic)模式 六种工作模式介绍 1.简单(Simple)模式 P:生产者,也就是要发送消息的程序. C:消费者:消息的接收者,会一直等待消

  • golang gin 监听rabbitmq队列无限消费的案例代码

    golang gin 监听rabbitmq队列无限消费 连接rabbitmq package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func

  • Android App内监听截图加二维码功能代码

    Android截屏功能是一个常用的功能,可以方便的用来分享或者发送给好友,本文介绍了如何实现app内截屏监控功能,当发现用户在我们的app内进行了截屏操作时,进行对图片的二次操作,例如添加二维码,公司logo等一系列*. 项目地址 测试截图: 截屏原理 android系统并没有提供截屏通知相关的API,需要我们自己利用系统能提供的相关特性变通实现.Android系统有一个媒体数据库,每拍一张照片,或使用系统截屏截取一张图片,都会把这张图片的详细信息加入到这个媒体数据库,并发出内容改变通知,我们可

  • Android 监听屏幕是否锁屏的实例代码

    今天,简单讲讲如何监听手机屏幕是否锁屏. 实现方法: 1)通过BroadcastReceiver接收广播Intent.ACTION_SCREEN_ON和Intent.ACTION_SCREEN_OFF可以判断屏幕状态是否锁屏,但是只有屏幕状态发生改变时才会发出广播: 2)如果要在屏幕状态发生改变之前就想获取屏幕状态,可以通过反射机制调用PowerManager的isScreenOn方法 . 具体实现,见代码: 直接上代码: 1.定义一个接收广播的类 package com.app.lib; im

  • android 监听SD卡文件变化的实现代码

    Android系统API提供了FileObserver抽象类(Linux的INotify机制)来监听系统/sdcard中的文件或文件夹,FileObserver类能对sdcard中的文件及文件夹的打开.创建.移动和删除操作进行监控.下面看看代码实现: (1)创建目录监听器: import android.os.FileObserver; import android.util.Log; /** * SD卡中的目录创建监听器. * * @author mayingcai */ public cla

  • 在as中监听自定义事件并处理事件的实例代码

    场景描述:点击一张图片,响应事件.必须在AS中,去监听事件,并处理事件. 1 自定义了一个事件,如下: 复制代码 代码如下: package bridge { import flash.events.Event; import mx.events.FlexEvent; public class MyEvent extends Event { public static const myclick:String="myclick"; public function MyEvent(typ

  • Android 广播监听网络状态详解及实例代码

    Android 广播监听网络状态 我们在做多线程下载的时候,或者是在加载h5界面的时候,常常会遇到网络状态不好或者断网的时候,在这或者当我们的应用程序启动没有退出的时候,我们就需要对网络状态监听加以判断. 这时候,我们一般情况下,两种方式进行处理. 第一: 开启服务. 第二:发送广播的形式. 建议采用方法二. 源代码如下: 广播: /** * 有网络的广播 */ BroadcastReceiver connectionReceiver = new BroadcastReceiver() { @O

  • Native.js获取监听开关等操作Android蓝牙设备实例代码

    Native.js开启关闭蓝牙 var main = plus.android.runtimeMainActivity(); var Context = plus.android.importClass("android.content.Context"); var BManager = main.getSystemService(Context.BLUETOOTH_SERVICE); plus.android.importClass(BManager);//引入相关的method函数

  • 基于python实现监听Rabbitmq系统日志代码示例

    介绍 rabbitmq默认有7个交换机,其中amq.rabbitmq.log为系统日志的交换机,这个日志为topic类型,会有三个等级的(routing_key)的日志发送到这个交换机上. 代码如下 #!/usr/bin/env python # -*- coding: utf-8 -*- import pika # ########################### 订阅者 ########################### credentials = pika.PlainCreden

  • Golang基于文件魔数判断文件类型的案例代码

    目录 查找位置 文件类型 实现基础函数 类型判断函数 测试代码 总结 本文介绍基于魔数判断文件类型,涉及文件查找读取内容.文件魔数.字节比较,最后还介绍函数参数的知识. 查找位置 File.Seek()函数可以设置偏移位置,为下一次读或写确定偏移量,具体起点有whence确定:0标识相对文件开始位置.1相对当前位置.2相对文件结尾.函数返回新的位置及错误.请看下面示例: package main import ( "os" "fmt" "log"

随机推荐