go-micro集成RabbitMQ实战和原理详解

目录
  • Broker的核心功能
    • 发布
    • 订阅
  • go-micro集成RabbitMQ实战
    • 启动一个RabbitMQ
    • 编写收发函数
    • 编写主体代码
  • go-micro集成RabbitMQ的处理流程
  • 填的几个坑
    • 不能接收其它框架发布的消息
    • RabbitMQ重启后订阅者和发布者无限阻塞

在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ、Kafka、Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理。

Broker的核心功能

Broker的核心功能是Publish和Subscribe,也就是发布和订阅。它们的定义是:

Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

发布

发布第一个参数是topic(主题),用于标识某类消息。

发布的数据是通过Message承载的,其包括消息头和消息体,定义如下:

type Message struct {
	Header map[string]string
	Body   []byte
}

消息头是map,也就是一组KV(键值对)。

消息体是字节数组,在发送和接收时需要开发者进行编码和解码的处理。

订阅

订阅的第一个参数也是topic(主题),用于过滤出要接收的消息。

订阅的数据是通过Handler处理的,Handler是一个函数,其定义如下:

type Handler func(Event) error

其中的参数Event是一个接口,需要具体的Broker来实现,其定义如下:

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}
  • Topic() 用于获取当前消息的topic,也是发布者发送时的topic。
  • Message() 用于获取消息体,也是发布者发送时的Message,其中包括Header和Body。
  • Ack() 用于通知Broker消息已经收到了,Broker可以删除消息了,可用来保证消息至少被消费一次。
  • Error() 用于获取Broker处理消息过成功的错误。

开发者订阅数据时,需要实现Handler这个函数,接收Event的实例,提取数据进行处理,根据不同的Broker,可能还需要调用Ack(),处理出现错误时,返回error。

go-micro集成RabbitMQ实战

大概了解了Broker的定义之后,再来看下如何使用go-micro收发RabbitMQ消息。

启动一个RabbitMQ

如果你已经有一个RabbitMQ服务器,请跳过这个步骤。

这里介绍一个使用docker快速启动RabbitMQ的方法,当然前提是你得安装了docker。

执行如下命令启动一个rabbitmq的docker容器:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq

然后进入容器进行一些设置:

docker exec -it rabbitmq1 /bin/bash

启动管理工具、禁用指标采集(会导致某些API500错误):

rabbitmq-plugins enable rabbitmq_management

cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

最后重启容器:

docker restart rabbitmq1

最后浏览器中输入 http://127.0.0.0:15672 即可访问,默认用户名和密码都是 guest 。

编写收发函数

为了方便演示,先来定义发布消息和接收消息的函数。其中发布函数使用了go-micro提供的Event类型,还有其它类型也可以提供Publish的功能,这里发送的数据格式是Json字符串。接收消息的函数名称可以随意取,但是参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也可以是绑定到某个类型的。

// 定义一个发布消息的函数:每隔1秒发布一条消息
func loopPublish(event micro.Event) {
	for {
		time.Sleep(time.Duration(1) * time.Second)

		curUnix := strconv.FormatInt(time.Now().Unix(), 10)
		msg := "{\"Id\":" + curUnix + ",\"Name\":\"张三\"}"
		event.Publish(context.TODO(), msg)
	}
}

// 定义一个接收消息的函数:将收到的消息打印出来
func handle(ctx context.Context, msg interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New(fmt.Sprint(r))
			log.Println(err)
		}
	}()

	b, err := json.Marshal(msg)
	if err != nil {
		log.Println(err)
		return
	}

	log.Println(string(b))
	return
}

编写主体代码

这里先给出代码,里面提供了一些注释,后边还会有详细介绍。

func main() {
	// RabbitMQ的连接参数
	rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName := "amq.topic"
	subcribeTopic := "test"
	queueName := "rabbitmqdemo_test"

	// 默认是application/protobuf,这里演示用的是Json,所以要改下
	server.DefaultContentType = "application/json"

	// 创建 RabbitMQ Broker
	b := rabbitmq.NewBroker(
		broker.Addrs(rabbitmqUrl),           // RabbitMQ访问地址,含VHost
		rabbitmq.ExchangeName(exchangeName), // 交换机的名称
		rabbitmq.DurableExchange(),          // 消息在Exchange中时会进行持久化处理
		rabbitmq.PrefetchCount(1),           // 同时消费的最大消息数量
	)

	// 创建Service,内部会初始化一些东西,必须在NewSubscribeOptions前边
	service := micro.NewService(
		micro.Broker(b),
	)
	service.Init()

	// 初始化订阅上下文:这里不是必需的,订阅会有默认值
	subOpts := broker.NewSubscribeOptions(
		rabbitmq.DurableQueue(),   // 队列持久化,消费者断开连接后,消息仍然保存到队列中
		rabbitmq.RequeueOnError(), // 消息处理函数返回error时,消息再次入队列
		rabbitmq.AckOnSuccess(),   // 消息处理函数没有error返回时,go-micro发送Ack给RabbitMQ
	)

	// 注册订阅
	micro.RegisterSubscriber(
		subcribeTopic,    // 订阅的Topic
		service.Server(), // 注册到的rpcServer
		handle,           // 消息处理函数
		server.SubscriberContext(subOpts.Context), // 订阅上下文,也可以使用默认的
		server.SubscriberQueue(queueName),         // 队列名称
	)

	// 发布事件消息
	event := micro.NewEvent(subcribeTopic, service.Client())
	go loopPublish(event)

	log.Println("Service is running ...")
	if err := service.Run(); err != nil {
		log.Println(err)
	}
}

主要逻辑是:

1、先创建一个RabbitMQ Broker,它实现了标准的Broker接口。其中主要的参数是RabbitMQ的访问地址和RabbitMQ交换机,PrefetchCount是订阅者(或称为消费者)使用的。

2、然后通过 NewService 创建go-micro服务,并将broker设置进去。这里边会初始化很多东西,最核心的是创建一个rpcServer,并将rpcServer和这个broker绑定起来。

3、然后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的功能:一是如果RabbitMQ上还不存在这个队列时创建队列,并订阅指定topic的消息;二是定义go-micro程序从这个RabbitMQ队列接收数据的处理方式。

这里详细看下订阅的参数:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic:go-micro使用的是Topic模式,发布者发送消息的时候要指定一个topic,订阅者根据需要只接收某个或某几个topic的消息;
  • s:消息从RabbitMQ接收后会进入这个Server进行处理,它是NewService的时候内部创建的;
  • h:使用了上一步创建的接收消息的函数 handle,Server中的方法会调用这个函数;
  • opts 是订阅的一些选项,这里需要指定RabbitMQ队列的名称;另外SubscriberContext定义了订阅的一些行为,这里DurableQueue设置RabbitMQ订阅消息的持久化方式,一般我们都希望消息不丢失,这个设置的作用是即使程序与RabbitMQ的连接断开,消息也会保存在RabbitMQ队列中;AckOnSuccess和RequeueOnError定义了程序处理消息出现错误时的行为,如果handle返回error,消息会重新返回RabbitMQ,然后再投递给程序。

4、然后这里为了演示,通过NewEvent创建了一个Event,通过它每隔一秒发送1条消息。

5、最后通过service.Run()把这个程序启动起来。

辛苦写了半天,看一下这个程序的运行效果:

注意一般发布者和订阅者是在不同的程序中,这里只是为了方便演示,才把他们放在一个程序中。所以如果只是发布消息,就不需要订阅的代码,如果只是订阅,也不需要发布消息的代码,大家使用的时候根据需要自己裁剪吧。

go-micro集成RabbitMQ的处理流程

这个部分来看一下消息在go-micro和RabbitMQ中是怎么流转的,我画了一个示意图:

这个图有点复杂,这里详细讲解下。

首先分成三块:RabbitMQ、消息发布部分、消息接收部分,这里用不同的颜色进行了区分。

  • RabbitMQ不是本文的重点,就把它看成一个整体就行了。
  • 消息发布部分:从生产者程序调用Event.Publish开始,然后调用Client.Publish,到这里为止,都是在go-micro的核心模块中进行处理;然后再调用Broker.Publish,这里的Broker是RabbitMQ插件的Broker实例,从这里开始进入了RabbiitMQ插件部分,然后再依次通过RabbitMQ Connection的Publish方法、RabbitMQ Channle的Publish方法,最终发送到RabbitMQ中。
  • 消息接收部分:Service.Run内部会调用rpcServer.Start,这个方法内部会调用Broker.Subscribe,这个方法是RabbitMQ插件中定义的,它会读取RegisterSubscriber时的一些RabbitMQ队列设置,然后再依次传递到RabbitMQ Connection的Consume方法、RabbitMQ Channel的ConsumeQueue方法,最终连接到RabbitMQ,并在RabbitMQ上设置好要订阅的队列;这些方法还会返回一个类型为amqp.Delivery的Go Channel,Broker.Subscribe不断的从这个Go Channel中读取数据,然后再发送到调用Broker.Subscribe时传入的一个消息处理方法中,这里就是rpcServer.HandleEvnet,消息经过一些处理后再进入rpcServer内部的路由处理模块,这里就是route.ProcessMessage,这个方法内部会根据当前消息的topic查找RegisterSubscriber时注册的订阅,并最终调用到当时注册的用于接收消息的函数。

这个处理过程还可以划分为业务部分、核心模块部分和插件部分。

  • 首先创建一个插件的Broker实现,把它注册到核心模块的rpcServer中;
  • 消息的发送从业务部分进入核心模块部分,再进入具体实现Broker的插件部分;
  • 消息的接收则首先进入插件部分,然后再流转到核心模块部分,再流转到业务部分。

从上边的图中可以看到消息都需要经过这个RabbitMQ插件进行处理,实际上可以只使用这个插件,就能实现消息的发送和接收。这个演示代码我已经提交到了Github,有兴趣的同学可以在文末获取Github仓库的地址。

从上边这些划分中,我们可以理解到设计者的整体设计思路,把握关键节点,用好用对,出现问题时可以快速定位。

填的几个坑

不能接收其它框架发布的消息

这个是因为route.ProcessMessage查找订阅时使用了go-micro专用的一个头信息:

// get the subscribers by topic
	subs, ok := router.subscribers[msg.Topic()]

这个msg.Topic返回的是如下实例中的topic字段:

	rpcMsg := &rpcMessage{
		topic:       msg.Header["Micro-Topic"],
		contentType: ct,
		payload:     &raw.Frame{Data: msg.Body},
		codec:       cf,
		header:      msg.Header,
		body:        msg.Body,
	}

其它框架不会有这么一个头信息,除非专门适配go-micro。

因为使用RabbitMQ的场景下,整个开发都是围绕RabbitMQ做的,而且go-micro的处理逻辑没有考虑RabbitMQ订阅可以使用通配符的情况,发布消息的Topic、接收消息的Topic与Micro-Topic的值匹配时都是按照是否相等的原则处理的,因此可以用RabbitMQ消息自带的topic来设置这个消息头。rabbitmq.rbroker.Subscribe 中接收到消息后,就可以进行这个设置:

// Messages sent from other frameworks to rabbitmq do not have this header.
		// The 'RoutingKey' in the message can be used as this header.
		// Then the message can be transfered to the subscriber which bind this topic.
		msgTopic := header["Micro-Topic"]
		if msgTopic == "" {
			header["Micro-Topic"] = msg.RoutingKey
		}

这样go-micro开发的消费者程序就能接收其它框架发布的消息了,其它框架无需适配。

RabbitMQ重启后订阅者和发布者无限阻塞

go-micro的RabbitMQ插件底层使用另一个库:github.com/streadway/amqp

对于发布者,RabbitMQ断开连接时amqp库会通过Go Channel同步通知go-micro,然后go-micro可以发起重新连接。问题出现在这个同步通知上,go-micro的RabbitMQ插件设置了接收连接和通道的关闭通知,但是只处理了一个通知就去重新连接了,这就导致有一个Go Channel一直阻塞,而这个阻塞会导致某个锁不能释放,这个锁又是Publish时候需要的,因此导致发布者无限阻塞。解决办法就是外层增加一个循环,等所有的通知都收到了,再去做重新连接。

对于订阅者,RabbitMQ断开连接时,它会一直阻塞在某个Go Channel上,直到它返回一个值,这个值代表连接已经重新建立,订阅者可以重建消费通道。问题也是出现在这个阻塞的Go Channel上,因为这个Go Channel在每次收到amqp的关闭通知时会重新赋值,而订阅者等待的Go Channel可能是之前的旧值,永远也不会返回,订阅者也就无限阻塞了。解决办法呢,就是在select时增加一个time.After,让等待的Go Channel有机会更新到新值。

代码就不贴了,有兴趣的可以到Github中去看:https://github.com/go-micro/plugins/commit/9f64710807221f3cc649ba4fe05f75b07c66c00c

关于这两个问题的修改已经合并到官方仓库中,大家去get最新的代码就可以了。

这两个坑填了,基本上就能满足我的需要了。当然可能还有其它的坑,比如go-micro的RabbitMQ插件好像没有发布者确认的功能,这个要实现,还得好好想想怎么改。

好了,以上就是本文的主要内容。

老规矩,代码已经上传到Github,欢迎访问:https://github.com/bosima/go-demo/tree/main/go-micro-broker-rabbitmq

到此这篇关于go-micro集成RabbitMQ实战和原理详解的文章就介绍到这了,更多相关go-micro集成RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • Golang rabbitMQ生产者消费者实现示例

    目录 消费者 生产者 消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { fmt.Println("%s: %s", msg, err) } } // 只能在安装 rabbitmq 的服务器上操作 func main() { conn, err := amqp.

  • go-micro集成RabbitMQ实战和原理详解

    目录 Broker的核心功能 发布 订阅 go-micro集成RabbitMQ实战 启动一个RabbitMQ 编写收发函数 编写主体代码 go-micro集成RabbitMQ的处理流程 填的几个坑 不能接收其它框架发布的消息 RabbitMQ重启后订阅者和发布者无限阻塞 在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ.Kafka.Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理. Broker的核

  • 基于RabbitMQ的简单应用(详解)

    虽然后台使用了读写分离技术,能够在一定程度上抗击高并发,但是如果并发量特别巨大时,主数据库不能同时处理高并发的请求,这时数据库容易宕机. 问题: 现在的问题是如何既能保证数据库正常运行,又能实现用户数据的入库操作? 解决方案: 引入rabbitMQ技术: 说明: 当数据库的访问压力过载时,这时会将过载以后的数据先保存到rabbitMQ中.其中的数据结构是队列的形式,先进先出.这时数据库从队列中取数据执行.一直到队列中的数据全部操作完成为止. RabbitMQ就是消息的中间件. RabbitMQ介

  • Spring事件Application Event原理详解

    这篇文章主要介绍了Spring 事件Application Event原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Spring 的事件(Application Event)为 Bean 与 Bean 之间的消息通信提供了支持.当一个 Bean 处理完一个任务之后,希望另一个 Bean 知道并能做相应的处理,这时我们就需要让另一个 Bean 监听当前 Bean 所发送的事件.(观察者模式) Spring 的事件需要遵循以下流程: 自定

  • SpringBoot内置tomcat启动原理详解

    前言 不得不说SpringBoot的开发者是在为大众程序猿谋福利,把大家都惯成了懒汉,xml不配置了,连tomcat也懒的配置了,典型的一键启动系统,那么tomcat在springboot是怎么启动的呢? 内置tomcat 开发阶段对我们来说使用内置的tomcat是非常够用了,当然也可以使用jetty. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo

  • Python集成学习之Blending算法详解

    一.前言 普通机器学习:从训练数据中学习一个假设. 集成方法:试图构建一组假设并将它们组合起来,集成学习是一种机器学习范式,多个学习器被训练来解决同一个问题. 集成方法分类为: Bagging(并行训练):随机森林 Boosting(串行训练):Adaboost; GBDT; XgBoost Stacking: Blending: 或者分类为串行集成方法和并行集成方法 1.串行模型:通过基础模型之间的依赖,给错误分类样本一个较大的权重来提升模型的性能. 2.并行模型的原理:利用基础模型的独立性,

  • Springboot整合实现邮件发送的原理详解

    目录 开发前准备 基础知识 进阶知识 加入依赖 配置邮件 测试邮件发送 通常在实际项目中,也有其他很多地方会用到邮件发送,比如通过邮件注册账户/找回密码,通过邮件发送订阅信息等等.SpringBoot集成邮件服务非常简单,通过简单的学习即可快速掌握邮件业务类的核心逻辑和企业邮件的日常服务 开发前准备 首先注册发件邮箱并设置客户端授权码,这里以QQ 免费邮箱为例,其他的邮箱的配置也大同小异. 登录 QQ 邮箱,点击设置->账户,开启IMAP/SMTP服务,并生成授权码. 基础知识 电子邮件需要在邮

  • Python学习之直方图均衡化原理详解

    目录 1.点算子 2.线性灰度变换 3.直方图均衡化 4.代码实战 1.点算子 点算子是两个像素灰度值间的映射关系,属于像素的逐点运算,相邻像素不参与运算.点算子是最简单的图像处理手段,如:亮度调整.对比度调整.颜色变换.直方图均衡化等等. 2.线性灰度变换 线性灰度变换表达为: 其中rk.sk分别为输入.输出点像素灰度值. ▲图2.1 线性灰度变换 当a>1时,输出图像像素灰度范围扩大,图像对比度增强,当a<1时反之.这是因为人眼不易区分相近的灰度值,因此若图像灰度值范围较小,观感上细节不够

  • OpenCV实现图像背景虚化效果原理详解

    目录 0 写在前面 1 小孔成像 2 光学成像 3 虚化效果 4 代码实战 0 写在前面 相信用过相机的同学都知道虚化特效,这是一种使焦点聚集在拍摄主题上,让背景变得朦胧的效果,例如本文最后实现的背景虚化效果 相机虚化特效背后的原理是什么?和计算机视觉有什么关系?本文带你研究这些问题. 1 小孔成像 小学我们就知道,没有光就不存在图像,为了产生图像,场景必须有一个或多个.直接或间接的光源. 如图所示,光照主要分为三类: 散射 直接光照 漫反射 在获得光源后,将产生从物体到检测平面的光线. 由于从

  • Python进阶之import导入机制原理详解

    目录 前言 1. Module组成 1.1 Module 内置全局变量 2. 包package 2.1 实战案例 3.sys.modules.命名空间 3.1 sys.modules 3.2 命名空间 4. 导入 4.1 绝对导入 4.2 相对导入 4.3 单独导入包 5. import运行机制 5.1 标准import,顶部导入 5.2 嵌套import 前言 在Python中,一个.py文件代表一个Module.在Module中可以是任何的符合Python文件格式的Python脚本.了解Mo

  • Spring AOP的实现原理详解及实例

    Spring AOP的实现原理详解及实例 spring 实现AOP是依赖JDK动态代理和CGLIB代理实现的. 以下是JDK动态代理和CGLIB代理简单介绍 JDK动态代理:其代理对象必须是某个接口的实现,它是通过在运行期间创建一个接口的实现类来完成对目标对象的代理. CGLIB代理:实现原理类似于JDK动态代理,只是它在运行期间生成的代理对象是针对目标类扩展的子类.CGLIB是高效的代码生成包,底层是依靠ASM(开源的Java字节码编辑类库)操作字节码实现的,性能比JDK强. 在Spring中

随机推荐