Go+Kafka实现延迟消息的实现示例

目录
  • 前言
  • 原理
  • 简单的实现
    • 生产者
    • 延迟服务
    • 消费者
  • 改进点
    • 通用的延迟服务
    • 生产者负责延迟服务
  • 总结

前言

延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等。

这篇文章主要是使用Go+Kafka实现延迟消息。

使用了sarama客户端。

原理

Kafka实现延迟消息分为下面三步:

  • 生产者把消息发送到延迟队列
  • 延迟服务把延迟队列里超过延迟时间的消息写入真实队列
  • 消费者消费真实队列里的消息

简单的实现

生产者

生产者只是把消息发送到延迟队列

msg := &sarama.ProducerMessage{
   Topic: kafka_delay_queue_test.DelayTopic,
   Value: sarama.ByteEncoder("test" + strconv.Itoa(i)),
}
if _, _, err := producer.SendMessage(msg); err != nil {
   log.Println(err)
}

延迟服务

延迟服务会订阅延迟队列的消息,并把超时消息发送到真实队列

if err = consumerGroup.Consume(context.Background(),
   []string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
   break
}
type Consumer struct {
   producer sarama.SyncProducer
   delay    time.Duration
}

func NewConsumer(producer sarama.SyncProducer, delay time.Duration) *Consumer {
   return &Consumer{
      producer: producer,
      delay:    delay,
   }
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      // 如果消息已经超时,把消息发送到真实队列
      now := time.Now()
      if now.Sub(message.Timestamp) >= c.delay {
         _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
            Topic: kafka_delay_queue_test.RealTopic,
            Key:   sarama.ByteEncoder(message.Key),
            Value: sarama.ByteEncoder(message.Value),
         })
         if err == nil {
            session.MarkMessage(message, "")
         }
         continue
      }
      // 否则休眠一秒
      time.Sleep(time.Second)
      return nil
   }
   return nil
}

消费者

消费者只是订阅真实队列并消费消息

if err = consumerGroup.Consume(context.Background(),
   []string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
   break
}
type Consumer struct{}

func NewConsumer() *Consumer {
   return &Consumer{}
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      fmt.Println("收到消息:", message.Value, message.Timestamp)
      session.MarkMessage(message, "")
   }
   return nil
}

改进点

通用的延迟服务

可以把延迟服务封装成一个通用的服务,这样生产者可以直接把消息发送给延迟服务,让延迟服务去处理剩下的逻辑。

延迟服务可以提供多个延时等级,比如5s、10s、30s、1m、5m、10m、1h、2h等,类似于RocketMQ。

生产者负责延迟服务

也可以让生产者负责延迟服务,让生产者自己把延迟队列里面的消息发送到真实队列。

下面是一个简单的实现:

// KafkaDelayQueueProducer 延迟队列生产者,包含了生产者和延迟服务
type KafkaDelayQueueProducer struct {
   producer   sarama.SyncProducer // 生产者
   delayTopic string              // 延迟服务主题
}

// NewKafkaDelayQueueProducer 创建延迟队列生产者
// producer 生产者
// delayServiceConsumerGroup 延迟服务消费者
// delayTime 延迟时间
// delayTopic 延迟服务主题
// realTopic 真实队列主题
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
   delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer {
   // 启动延迟服务
   consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
   go func() {
      for {
         if err := delayServiceConsumerGroup.Consume(context.Background(),
            []string{delayTopic}, consumer); err != nil {
            break
         }
      }
   }()
   return &KafkaDelayQueueProducer{
      producer:   producer,
      delayTopic: delayTopic,
   }
}

// SendMessage 发送消息
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
   msg.Topic = q.delayTopic
   return q.producer.SendMessage(msg)
}

// DelayServiceConsumer 延迟服务消费者
type DelayServiceConsumer struct {
   producer  sarama.SyncProducer
   delay     time.Duration
   realTopic string
}

func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
   realTopic string) *DelayServiceConsumer {
   return &DelayServiceConsumer{
      producer:  producer,
      delay:     delay,
      realTopic: realTopic,
   }
}

func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,
   claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      // 如果消息已经超时,把消息发送到真实队列
      now := time.Now()
      if now.Sub(message.Timestamp) >= c.delay {
         _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
            Topic: c.realTopic,
            Key:   sarama.ByteEncoder(message.Key),
            Value: sarama.ByteEncoder(message.Value),
         })
         if err == nil {
            session.MarkMessage(message, "")
         }
         continue
      }
      // 否则休眠一秒
      time.Sleep(time.Second)
      return nil
   }
   return nil
}

func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
   return nil
}

func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
   return nil
}

总结

使用中间队列+轮询可以很容易的在Kafka实现延迟消息,如果需要一个通用的延迟队列也可以实现一个通用的延迟服务,也可以让消费者负责延迟服务的功能。

完整代码:

到此这篇关于Go+Kafka实现延迟消息的实现示例的文章就介绍到这了,更多相关Go Kafka延迟消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • GO语言延迟函数defer用法分析

    本文实例讲述了GO语言延迟函数defer用法.分享给大家供大家参考.具体分析如下: defer 在声明时不会立即执行,而是在函数 return 后,再按照 FILO (先进后出)的原则依次执行每一个 defer,一般用于异常处理.释放资源.清理数据.记录日志等.这有点像面向对象语言的析构函数,优雅又简洁,是 Golang 的亮点之一. 代码1:了解 defer 的执行顺序 复制代码 代码如下: package main import "fmt" func fn(n int) int {

  • Golang学习笔记之延迟函数(defer)的使用小结

    golang的defer优雅又简洁, 是golang的亮点之一.defer在声明时不会立即执行,而是在函数return后,再按照先进后出的原则依次执行每个defer,一般用于释放资源.清理数据.记录日志.异常处理等. 关键字defer于注册延迟调用.这些调用直到 ret 前才被执行,通常用于释放资源或错误处理. 一.当defer被声明时,其参数就会被实时解析 func a() { i := 0 defer fmt.Println(i) //输出0,因为i此时就是0 i++ defer fmt.P

  • Go语言中的延迟函数defer示例详解

    前言 大家都知道go语言的defer功能很强大,对于资源管理非常方便,但是如果没用好,也会有陷阱哦.Go 语言中延迟函数 defer 充当着 try...catch 的重任,使用起来也非常简便,然而在实际应用中,很多 gopher 并没有真正搞明白 defer.return.返回值.panic 之间的执行顺序,从而掉进坑中,今天我们就来揭开它的神秘面纱!话不多说了,来一起看看详细的介绍吧. 先来运行下面两段代码: A. 匿名返回值的情况 package main import ( "fmt&qu

  • go-zero 如何应对海量定时/延迟任务

    一个系统中存在着大量的调度任务,同时调度任务存在时间的滞后性,而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费cpu的资源而且很低效. 本文来介绍 go-zero 中 延迟操作,它可能让开发者调度多个任务时,只需关注具体的业务执行函数和执行时间「立即或者延迟」.而 延迟操作,通常可以采用两个方案: Timer:定时器维护一个优先队列,到时间点执行,然后把需要执行的 task 存储在 map 中collection 中的 timingWheel ,维护一个存放任务组的数组

  • Golang之defer 延迟调用操作

    前言 defer语句被用于预定对一个函数的调用.我们把这类被defer语句调用的函数称为延迟函数.而defer 延迟语句在其他编程语言里好像没有见到.应该是属于 Go 语言里的独有的关键字.但用法类似于面向对象编程语言 Java 和 C# 的 finally 语句块. 下面对defer进行介绍. defer特性 1. 关键字 defer 用于注册延迟调用. 2. 这些调用直到 return 前才被执.因此,可以用来做资源清理. 3. 多个defer语句,按先进后出的方式执行. 1.延迟调用 用法

  • Go+Kafka实现延迟消息的实现示例

    目录 前言 原理 简单的实现 生产者 延迟服务 消费者 改进点 通用的延迟服务 生产者负责延迟服务 总结 前言 延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等. 这篇文章主要是使用Go+Kafka实现延迟消息. 使用了sarama客户端. 原理 Kafka实现延迟消息分为下面三步: 生产者把消息发送到延迟队列 延迟服务把延迟队列里超过延迟时间的消息写入真实队列 消费者消费真实队列里的消息 简单的实现 生产者 生产者只是把消息发送到延迟队列 msg :

  • Java Kafka实现延迟队列的示例代码

    目录 基于kafka如何实现延迟队列 完善细节 Java代码实现 还需要做什么 kafka作为一个使用广泛的消息队列,很多人都不会陌生,但当你在网上搜索“kafka 延迟队列”,出现的都是一些讲解时间轮或者只是提供了一些思路,并没有一份真实可用的代码实现,今天我们就来打破这个现象,提供一份可运行的代码,抛砖引玉,吸引更多的大神来分享. 基于kafka如何实现延迟队列 想要解决一个问题,我们需要先分解问题.kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻收到的,因此我们需

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Java 指定微信好友自动发送消息的实现示例

    Backgroud 原理很简单:robot类模拟键盘输入,快捷键打开微信,搜索好友,把发送内容发送到粘贴板实现. 程序源码 package com.cloudansys.test; import java.awt.*; import java.awt.datatransfer.Clipboard; import java.awt.datatransfer.StringSelection; import java.awt.datatransfer.Transferable; import java

  • 使用go实现一个超级mini的消息队列的示例代码

    目录 前言 目的 设计 协议 队列 broker 删除消息 生产者 消费者 启动 总结 前言 趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了.自己是个javer,这次实现,特意换用了go.没错,是零基础上手go,顺便可以学学go. 前置知识: go基本语法 消息队列概念,也就三个:生产者.消费者.队列 目的 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢 使用双向链表数据结构作为队列 有多个topic可供生产者生成消息和消费者消费消息 支持生产者并发写

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • Python日志无延迟实时写入的示例

    我在用python生成日志时,发现无论怎么flush(),文件内容总是不能实时写入,导致程序意外中断时一无所获. 以下是查到的解决方案(亲测可行): open 函数中有一个bufferin的参数,默认是-1,如果设置为0是,就是无缓冲模式. 但是用二进制模式打开这个文件,并且把要写入的信息转换byte -like如下. with open("test.txt",'wb',buffering=0) as f: #wb是写模式加二进制模式 f.write(b"hello!&quo

随机推荐