Golang中优秀的消息队列NSQ基础安装及使用详解

前言

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

背景介绍

在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下:

但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个文件服务是不科学的,因为它会占用大量的流量,这时候我们就要考虑单独使用一个文件服务器了,所以我们调整一下架构:

通过nginx这个高性能代理服务器,有效的处理不同用户请求的诉求:

然后接下来某一天,随着用户越来越多,你发现服务器的负载越来越高,这个时候,就要考虑去分离业务服务和数据库服务了,设计再次修改:

接下来又过了一些日子,新的问题又出现了,你们公司的业务越做越大,用户越来越多,有用户投诉,收到的响应延迟很大甚至出现了无响应的情况,你检查了一下服务器,发现每天峰值的时候,有一小部分数据是经常要使用的,频繁的从数据库读出这部分数据,占用了大量的数据库资源,然后我们的设计又要改了:

把一些常用的数据储存到缓存中,遵循国际惯例的二八原则(80%的请求读取20%的数据),这样一来,数据库的压力负担自然可以减少很多。

当你觉得高枕无忧的时候,突然老板一时兴起,我们来搞一个秒杀活动吧,这时候你一定会绞尽脑汁解决不能多买多卖的问题(一瞬间高并发的常见问题),好吧,终于轮到我们消息队列出场了(当然消息队列不是唯一的解决方案),我们先把设计贴出来:

好的,所有的请求都先注入消息队列,然后立刻给予请求响应,因为注入消息队列实际上向内存写入数据的过程,所以响应的速度会非常的快,然后后面的业务服务器再根据消息的队列的内容逐个读出(相当于异步读取),所以可以大大削减数据库的压力,避免多买多卖的问题。当然后面随着业务的扩大,还有很多问题要解决,比如使用负载均衡搭建多个业务服务器,使用分布式部署数据库,搭载高可用架构等,但是今天只是仅仅为了引出消息队列它的应用场景,所以我们就不往下讨论了。

正文

打开 https://nsq.io/deployment/installing.html 下载对应的nsq版本,我下载的是linux最新稳定版

下载解压之后,在/usr/下建立一个目录,接着把解压文件夹/bin/下面的文件全部拷贝进去,最后在/etc/profile添加引用路径,这样就可以直接使用命令启动nsq服务了,我的配置如下

我们先介绍一下几个必要服务的作用

nsqlookupd:

主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态,启动命令如下:

nsqd:

负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客户端,一个用来提供http的接口 ,启动命令如下:

nsqadmin:

nsqadmin是一个web管理界面 启动方式如下:

启动之后,通过 http://10.10.6.147:4171/ 可以访问这个管理页面, 默认使用4171端口

我们先来说明一下这个后台里面的一些内容,因为我们的NSQ所使用的是经典的pub/sub模式(发布/订阅,典型的生产者/消费者模式),我们可以先发布一个主题到NSQ,然后所有订阅的服务器就会异步的从这里读取主题的内容:

Topic(左上角):发布的主题名字

NSQd Host:Nsq主机服务地址

Channel:消息通道

NSQd Host:Nsq主机服务地址

Depth:消息积压量

In-flight:已经投递但是还未消费掉的消息

Deferred:没有消费掉的延时消息

Messages:服务器启动之后,总共接收到的消息量

Connections:通道里面客户端的订阅数

TimeOut:超时时间内没有被响应的消息数

Memory + Disk:储存在内存和硬盘中总共的消息数

----------------------------------------华丽的分割线----------------------------------------

接着,我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息

使用官方提供的下载地址:

go get github.com/nsqio/go-nsq

先创建一个主题,并且发布100条消息:

package main
import (
 "github.com/nsqio/go-nsq"
 "fmt"
)

var (
 //nsqd的地址,使用了tcp监听的端口
 tcpNsqdAddrr = "10.10.6.147:4150"
)

func main() {
 //初始化配置
 config := nsq.NewConfig()
 for i := 0; i < 100; i++ {
 //创建100个生产者
 tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
 if err != nil {
 fmt.Println(err)
 }
 //主题
 topic := "Insert"
 //主题内容
 tCommand := "new data!"
 //发布消息
 err = tPro.Publish(topic, []byte(tCommand))
 if err != nil {
 fmt.Println(err)
 }
 }
}

接下来我们看看admin的显示内容:

我们可以看到Nsqd接收到了100条信息,100条信息都储存在内存中,没有被消化。

现在没有任何服务订阅了我们的主题,所以主题的消息都没有被消化,那我们创建一个消费者去订阅我们的主题:

package main
import (
 "github.com/nsqio/go-nsq"
 "fmt"
 "sync"
 "time"
)

var (
 //nsqd的地址,使用了tcp监听的端口
 tcpNsqdAddrr = "10.10.6.147:4150"
)

//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
 //消息数
 msqCount int64
 //标识ID
 nsqHandlerID string
}

//实现HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
 //没收到一条消息+1
 s.msqCount++
 //打印输出信息和ID
 fmt.Println(s.msqCount,s.nsqHandlerID)
 //打印消息的一些基本信息
 fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s \n", time.Unix(0 , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
 return nil
}
func main() {

 //初始化配置
 config := nsq.NewConfig()
 //创造消费者,参数一时订阅的主题,参数二是使用的通道
 com, err := nsq.NewConsumer("Insert", "channel1", config)
 if err != nil {
 fmt.Println(err)
 }
 //添加处理回调
 com.AddHandler(&NsqHandler{nsqHandlerID:"One"})
 //连接对应的nsqd
 err = com.ConnectToNSQD(tcpNsqdAddrr)
 if err != nil {
 fmt.Println(err)
 }

 //只是为了不结束此进程,这里没有意义
 var wg = &sync.WaitGroup{}
 wg.Add(1)
 wg.Wait()
 /*
 result:
 msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
 98 One
 msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
 99 One
 msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
 100 One
 msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
 */
}

这里可以看到,之前挤压的100条信息,都被我们的订阅者消化掉了,也就是读取了

所以我们的订阅者(可以有多个)如果提前订阅主题的话,只要对应的主题有发布新内容,就可以马上异步读取。

完结

消息队列的应用场景还有很多,Nsq这里也只是先介绍了一下入门知识,有兴趣的朋友可以继续深入了解。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。如有错误或未考虑完全的地方,望不吝赐教。

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • Golang中基础的命令行模块urfave/cli的用法说明

    前言 相信只要部署过线上服务,都知道启动参数一定是必不可少的,当你在不同的网络.硬件.软件环境下去启动一个服务的时候,总会有一些启动参数是不确定的,这时候就需要通过命令行模块去解析这些参数,urfave/cli是Golang中一个简单实用的命令行工具. 安装 通过 go get github.com/urfave/cli 命令即可完成安装. 正文 使用了urfave/cli之后,你的程序就会变成一个命令行程序,以下就是通过urfave/cli创建的一个最简单的命令行程序,它设定了一些基础的信息,

  • 聊聊Golang中很好用的viper配置模块

    前言 viper 支持Yaml.Json. TOML.HCL 等格式,读取非常的方便. 安装 go get github.com/spf13/viper 如果提示找不到golang.org/x/text/这个库,是因为golang.org/x/text/这个库在GitHub上托管的路径不一致. 解决办法: 可以从https://github.com/golang/text下载源码下来,然后到$GOPATH/src下面创建golang.org/x/文件夹(已存在的忽略),把压缩包的文件解压到gol

  • golang 将[]byte转成16进制的实现

    将[]byte转成16进制 import "crypto/md5" import "fmt" sign := md5.Sum([]byte("date string")) signStr := fmt.Sprintf("%x", sign) //将[]byte转成16进制 补充:golang []byte存储存储的16进制转10进制 项目中有用[]byte存储16进制需要转到10进制,如果用系统自带的函数处理,需要先将[]by

  • Golang使用第三方包viper读取yaml配置信息操作

    Golang有很多第三方包,其中的 viper 支持读取多种配置文件信息.本文只是做一个小小demo,用来学习入门用的. 1.安装 go get github.com/spf13/viper 2.编写一个yaml的配置文件,config.yaml database: host: 127.0.0.1 user: root dbname: test pwd: 123456 3.编写学习脚本main.go,读取config.yaml配置信息 package main import ( "fmt&quo

  • Golang中优秀的消息队列NSQ基础安装及使用详解

    前言 NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息.NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障.故障容错.高可用性以及能够保证消息的可靠传递的特征,是一个成熟的.已在大规模生成环境下应用的产品. 背景介绍 在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下: 但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个

  • golang中for循环遍历channel时需要注意的问题详解

    前言 for循环是Go语言唯一的循环结构,最近在做一个基于RabbitMQ的应用,由于官方的qos没有golang的版本,所以出了一点问题. 问题代码如下: _, ch, err := component.NewRabbitMQ() if err != nil { panic(err) } if err := ch.Qos(10, 0, true); err != nil { panic(err) } msgs, err := ch.Consume("push", "&quo

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • Java基础之代码死循环详解

    一.前言 代码死循环这个话题,个人觉得还是挺有趣的.因为只要是开发人员,必定会踩过这个坑.如果真的没踩过,只能说明你代码写少了,或者是真正的大神. 尽管很多时候,我们在极力避免这类问题的发生,但很多时候,死循环却悄咪咪的来了,坑你于无形之中.我敢保证,如果你读完这篇文章,一定会对代码死循环有一些新的认识,学到一些非常实用的经验,少走一些弯路. 二.死循环的危害 我们先来一起了解一下,代码死循环到底有哪些危害? 程序进入假死状态, 当某个请求导致的死循环,该请求将会在很大的一段时间内,都无法获取接

  • Kotlin协程的基础与使用示例详解

    目录 一.协程概述 1.概念 2.特点 3.原理 1)续体传递 2)状态机 二.协程基础 1.协程的上下文 2.协程的作用域 3.协程调度器 4.协程的启动模式 5.协程的生命周期 1)协程状态的转换 2)状态标识的变化 三.协程使用 1.协程的启动 1)runBlocking方法 2)launch方法 3)async方法 4)suspend关键字 5)withContext方法 6)suspend方法 2.协程间通信 1)Channel 2)Channel的容量 3)produce方法与act

  • PHP队列场景以及实现代码实例详解

    为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作.在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证.比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败. 那么我们如何利用mysql实现分布式数据库的事务呢? mysql是从5.0开始支持分布式事务 这里先声

  • springboot整合websocket最基础入门使用教程详解

    项目最终的文件结构 1 添加maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <

  • Java基础之集合框架详解

    一.前言 本节学习到的内容有以下5类,不分先后顺序: 集合Collection体系结构 List子类 与集合结合使用的迭代器对象 集合与数组的区别? 常见的一般数据结构整理 二.集合的由来? Collection List ArrayList Vector LinkedList Set hashSet treeSet 在集合没有出现之前,使用对象数组来存储对象,但是,对象数组的长度一旦确定,则不可以发生变化,所以我们希望存在一个容器就像StringBuffer一样存储字符串,同时依据传入的值的个

  • Python基础之hashlib模块详解

    一.hashlib简介 1.什么叫hash: hash是一种算法(不同的hash算法只是复杂度不一样)(3.x里代替了md5模块和sha模块,主要提供 SHA1, SHA224, SHA256, SHA384, SHA512 ,MD5 算法),该算法接受传入的内容,经过运算得到一串hash值 2.hash值的特点是(hash值/产品有三大特性:): 只要传入的内容一样,得到的hash值必然一样=====>要用明文传输密码文件完整性校验 不能由hash值返解成内容=======>把密码做成has

  • Java数据结构之图的基础概念和数据模型详解

    目录 图的实际应用 图的定义及分类 图的相关术语 图的存储结构 邻接矩阵 邻接表 图的实现 图的API设计 代码实现 图的实际应用 在现实生活中,有许多应用场景会包含很多点以及点点之间的连接,而这些应用场景我们都可以用即将要学习的图这种数据结构去解决. 地图: 我们生活中经常使用的地图,基本上是由城市以及连接城市的道路组成,如果我们把城市看做是一个一个的点,把道路看做是一条一条的连接,那么地图就是我们将要学习的图这种数据结构. 图的定义及分类 定义: 图是由一组顶点和一组能够将两个顶点相连的边组

随机推荐