使用go实现一个超级mini的消息队列的示例代码

目录
  • 前言
  • 目的
  • 设计
    • 协议
    • 队列
    • broker
    • 删除消息
    • 生产者
    • 消费者
    • 启动
  • 总结

前言

趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意换用了go。没错,是零基础上手go,顺便可以学学go。

前置知识:

  1. go基本语法
  2. 消息队列概念,也就三个:生产者、消费者、队列

目的

  • 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢
  • 使用双向链表数据结构作为队列
  • 有多个topic可供生产者生成消息和消费者消费消息
  • 支持生产者并发写
  • 支持消费者读,且ok后,从队列删除
  • 消息不丢失(持久化)
  • 高性能(先这样想)

设计

整体架构

协议

通讯协议底层使用tcp,mq是基于tcp自定义了一个协议,协议如下

type Msg struct {
   Id int64
   TopicLen int64
   Topic string
   // 1-consumer 2-producer 3-comsumer-ack 4-error
   MsgType int64 // 消息类型
   Len int64 // 消息长度
   Payload []byte // 消息
}

Payload使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。Msg承载着生产者生产的消息,消费者消费的消息,ACK、和错误消息,前两者会有负载,而后两者负载和长度都为空

协议的编解码处理,就是对字节的处理,接下来有从字节转为Msg,和从Msg转为字节两个函数

func BytesToMsg(reader io.Reader) Msg {

   m := Msg{}
   var buf [128]byte
   n, err := reader.Read(buf[:])
   if err != nil {
      fmt.Println("read failed, err:", err)
   }
   fmt.Println("read bytes:", n)
   // id
   buff := bytes.NewBuffer(buf[0:8])
   binary.Read(buff, binary.LittleEndian, &m.Id)
   // topiclen
   buff = bytes.NewBuffer(buf[8:16])
   binary.Read(buff, binary.LittleEndian, &m.TopicLen)
   // topic
   msgLastIndex := 16 + m.TopicLen
   m.Topic = string(buf[16: msgLastIndex])
   // msgtype
   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
   binary.Read(buff, binary.LittleEndian, &m.MsgType)

   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
   binary.Read(buff, binary.LittleEndian, &m.Len)

   if m.Len <= 0 {
      return m
   }

   m.Payload = buf[msgLastIndex + 16:]
   return m
}

func MsgToBytes(msg Msg) []byte {
   msg.TopicLen = int64(len([]byte(msg.Topic)))
   msg.Len = int64(len([]byte(msg.Payload)))

   var data []byte
   buf := bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Id)
   data = append(data, buf.Bytes()...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.TopicLen)
   data = append(data, buf.Bytes()...)

   data = append(data, []byte(msg.Topic)...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.MsgType)
   data = append(data, buf.Bytes()...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Len)
   data = append(data, buf.Bytes()...)
   data = append(data, []byte(msg.Payload)...)

   return data
}

队列

使用container/list,实现先入先出,生产者在队尾写,消费者在队头读取

package broker

import (
   "container/list"
   "sync"
)

type Queue struct {
   len int
   data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
   queue.data.PushBack(msg)
   queue.len = queue.data.Len()
}

func (queue *Queue) poll() Msg{
   if queue.len == 0 {
      return Msg{}
   }
   msg := queue.data.Front()
   return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
   lock.Lock()
   for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
      if msg.Value.(Msg).Id == id {
         queue.data.Remove(msg)
         queue.len = queue.data.Len()
         break
      }
   }
   lock.Unlock()
}

方法offer往队列里插入数据,poll从队列头读取数据素,delete根据消息ID从队列删除数据。这里使用Queue结构体对List进行封装,其实是有必要的,List作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作
delete操作是在消费者消费成功且发送ACK后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加锁(em,加锁是否就一定会影响对性能有较大的影响呢)

broker

broker作为服务器角色,负责接收连接,接收和响应请求

package broker

import (
   "bufio"
   "net"
   "os"
   "sync"
   "time"
)

var topics = sync.Map{}

func handleErr(conn net.Conn)  {
   defer func() {
      if err := recover(); err != nil {
         println(err.(string))
         conn.Write(MsgToBytes(Msg{MsgType: 4}))
      }
   }()
}

func Process(conn net.Conn) {
   handleErr(conn)
   reader := bufio.NewReader(conn)
   msg := BytesToMsg(reader)
   queue, ok := topics.Load(msg.Topic)
   var res Msg
   if msg.MsgType == 1 {
      // comsumer
      if queue == nil || queue.(*Queue).len == 0{
         return
      }
      msg = queue.(*Queue).poll()
      msg.MsgType = 1
      res = msg
   } else if msg.MsgType == 2 {
      // producer
      if ! ok {
         queue = &Queue{}
         queue.(*Queue).data.Init()
         topics.Store(msg.Topic, queue)
      }
      queue.(*Queue).offer(msg)
      res = Msg{Id: msg.Id, MsgType: 2}
   } else if msg.MsgType == 3 {
      // consumer ack
      if queue == nil {
         return
      }
      queue.(*Queue).delete(msg.Id)

   }
   conn.Write(MsgToBytes(res))

}

MsgType等于1时,直接消费消息;MsgType等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic下;MsgType等于3时,代表消费者成功消费,可以

删除消息

我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic对应的队列里的消息,按协议格式进行序列化,当broker启动时,从文件恢复
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想Kafka和Redis的持久化),这里表示简单实现就好:定时器定时保存

func Save()  {
   ticker := time.NewTicker(60)
   for {
      select {
      case <-ticker.C:
         topics.Range(func(key, value interface{}) bool {
            if value == nil {
               return false
            }
            file, _ := os.Open(key.(string))
            if file == nil {
               file, _ = os.Create(key.(string))
            }
            for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
               file.Write(MsgToBytes(msg.Value.(Msg)))
            }
            _ := file.Close()
            return false
         })
      default:
         time.Sleep(1)
      }
   }
}

有一个问题是,当上面的delete操作时,这里的file文件需不需要跟着delete掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题
下面是启动逻辑

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func main()  {
   listen, err := net.Listen("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("listen failed, err:", err)
      return
   }
   go broker.Save()
   for {
      conn, err := listen.Accept()
      if err != nil {
         fmt.Print("accept failed, err:", err)
         continue
      }
      go broker.Process(conn)

   }
}

生产者

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func produce() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()

   msg := broker.Msg{Id: 1102, Topic: "topic-test",  MsgType: 2,  Payload: []byte("我")}
   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Print("write failed, err:", err)
   }

   fmt.Print(n)
}

消费者

package main

import (
   "awesomeProject/broker"
   "bytes"
   "fmt"
   "net"
)

func comsume() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()

   msg := broker.Msg{Topic: "topic-test",  MsgType: 1}

   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("n", n)

   var res [128]byte
   conn.Read(res[:])
   buf := bytes.NewBuffer(res[:])
   receMsg := broker.BytesToMsg(buf)
   fmt.Print(receMsg)

   // ack
   conn, _ = net.Dial("tcp", "127.0.0.1:12345")
   l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
   if e != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("l:", l)
}

消费者这里ack时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn读取数据,直到结束。思考一下,像RabbitMQ的ack就有自动和手工的ack,如果是手工的ack,必然需要一个新的连接,因为不知道客户端什么时候发送ack,自动的话,当然可以使用同一个连接,but这里就简单创建一条新连接吧

启动

先启动broker,再启动producer,然后启动comsumer,OK,能跑,能实现发送消息到队列,从队列消费消息

总结

整体虽然简单,但毕竟是使用go实现的,就是看似一顿操作猛如虎,实质慌如狗。第一时间就被go的gopath和go mod困扰住,后面语法的使用,比如指针,传值传引用等,最头疼的就是类型转换,作为一个javer,使用go进行类型转换,着实被狠狠得虐了一番。

到此这篇关于使用go实现一个超级mini的消息队列的示例代码的文章就介绍到这了,更多相关go mini消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们! 

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • Golang中优秀的消息队列NSQ基础安装及使用详解

    前言 NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息.NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障.故障容错.高可用性以及能够保证消息的可靠传递的特征,是一个成熟的.已在大规模生成环境下应用的产品. 背景介绍 在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下: 但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个

  • 使用go实现一个超级mini的消息队列的示例代码

    目录 前言 目的 设计 协议 队列 broker 删除消息 生产者 消费者 启动 总结 前言 趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了.自己是个javer,这次实现,特意换用了go.没错,是零基础上手go,顺便可以学学go. 前置知识: go基本语法 消息队列概念,也就三个:生产者.消费者.队列 目的 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢 使用双向链表数据结构作为队列 有多个topic可供生产者生成消息和消费者消费消息 支持生产者并发写

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

  • C#调用RabbitMQ实现消息队列的示例代码

    前言 我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的. 那么,接收数据的方式是什么呢?自然是端口监听啦. 那消息队列是什么就很好解释了? 它就是端口监听,接到数据后,将数据排列起来. 那这件事,我们不用中间件能做吗? 当然能做啦,写个T

  • java多线程消息队列的实现代码

    本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记 1.定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象.也就是说,它不依赖类特定的实例,被类的所有实例共享. private static List<Queue> queueCache = new LinkedList<Queue>(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. private Integer

  • Python实现RabbitMQ6种消息模型的示例代码

    RabbitMQ与Redis对比 ​ RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

  • SpringBoot+Netty+WebSocket实现消息发送的示例代码

    一.导入Netty依赖 <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> 二.搭建websocket服务器 @Component public class WebSocketServer { /** * 主线程池 */

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • Android未读消息拖动气泡示例代码详解(附源码)

    前言 拖动清除未读消息可以说在很多应用中都很常见,也被用户广泛接受.本文是一个可以供参考的Demo,希望能有帮助. 提示:以下是本篇文章正文内容,下面案例可供参考 最终效果图及思路 实现关键: 气泡中间的两条边,分别是以ab,cd为数据点,G为控制点的贝塞尔曲线. 步骤: 绘制圆背景以及文本:连接情况绘制贝塞尔曲线:另外端点绘制一个圆 关键代码 1.定义,初始化等 状态:静止.连接.分离.消失 在onSizeChanged中初始化状态,固定气泡以及可动气泡的圆心 代码如下(示例): @Overr

  • PHP实现RabbitMQ消息列队的示例代码

    目录 业务场景 1.首先部署好thinkphp6框架 2.安装workerman扩展 3.生产者 4.消费者 5.整体测试 业务场景 项目公司是主php做开发的,框架为thinkphp.众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序.首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理.所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务. 当rabbitM

随机推荐