RabbitMQ延迟队列及消息延迟推送实现详解

这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
  • 12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

消息延迟推送的实现

在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列

在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载

首先我们创建交换机和消息队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
  public static final String LAZY_QUEUE = "MQ.LazyQueue";
  public static final String LAZY_KEY = "lazy.#";

  @Bean
  public TopicExchange lazyExchange(){
    //Map<String, Object> pros = new HashMap<>();
    //设置交换机支持延迟消息推送
    //pros.put("x-delayed-message", "topic");
    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
    exchange.setDelayed(true);
    return exchange;
  }

  @Bean
  public Queue lazyQueue(){
    return new Queue(LAZY_QUEUE, true);
  }

  @Bean
  public Binding lazyBinding(){
    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
  }
}

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

    //Map<String, Object> pros = new HashMap<>();
    //设置交换机支持延迟消息推送
    //pros.put("x-delayed-message", "topic");
    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  //confirmCallback returnCallback 代码省略,请参照上一篇

  public void sendLazy(Object message){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一
    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

    //发送消息时指定 header 延迟时间
    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
        new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        //设置消息持久化
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        //message.getMessageProperties().setHeader("x-delay", "6000");
        message.getMessageProperties().setDelay(6000);
        return message;
      }
    }, correlationData);
  }
}

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
  if (delay == null || delay < 0) {
    this.headers.remove(X_DELAY);
  }
  else {
    this.headers.put(X_DELAY, delay);
  }
}

消费端进行消费

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

  @RabbitListener(queues = "MQ.LazyQueue")
  @RabbitHandler
  public void onLazyMessage(Message msg, Channel channel) throws IOException{
    long deliveryTag = msg.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, true);
    System.out.println("lazy receive " + new String(msg.getBody()));

  }

测试结果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

  @Autowired
  private MQSender mqSender;

  @Test
  public void sendLazy() throws Exception {
    String msg = "hello spring boot";

    mqSender.sendLazy(msg + ":");
  }
}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot+RabbitMq具体使用的几种姿势

    目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰.那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适.那么我们就来看看如何使用它吧. 环境准备 本案例基于springboot集成

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • JAVA获取rabbitmq消息总数过程详解

    公司使用的是rabbitMQ,需要做监控预警的job去监控rabbitMQ里面的堆积消息个数,如何使用rabbitMQ获取监控的队列里面的队列消息个数呢? 首先需要创建一个连接,配置文件注入相关的值,然后设置连接的相关信息,创建链接. 导入的包是使用: import com.rabbitmq.client @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}

  • 关于利用RabbitMQ实现延迟任务的方法详解

    开发过程中通常会碰到这样的需求: 淘宝订单业务:下单后 30min 之内没有付款,就自动取消订单. 饿了吗订餐通知:下单成功后 60s 之后给用户发送短信通知. 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之. 缓存:缓存中的对象,超过了空闲时间,从缓存中移出. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求. 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试. 这类业务的特点就是:需要延迟工作,需要进行失败重试.一种比较笨的方式是使用一个后

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • Laravel使用RabbitMQ的方法示例

    导语 RabbitMQ 想必大家都有了解,不做多介绍来.这里实现的是用 RabbitMQ 作为 Larvel 队列的驱动,替代 Redis.下面以 Laradock 中安装示例. 安装 切换到 laradock 目录,将 .env 中关于 INSTALL_AMQP 的值修改为 true docker-compose stop workspace php-fpm php-worker docker-compose build workspace php-fpm php-worker rabbitm

  • Spring Boot RabbitMQ 延迟消息实现完整版示例

    概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用 延迟消息 . 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • Spring Boot 使用 SSE 方式向前端推送数据详解

    目录 前言 服务端 SSE工具类 在Controller层创建 SSEController.java 前端代码 前言 SSE简单的来说就是服务器主动向前端推送数据的一种技术,它是单向的,也就是说前端是不能向服务器发送数据的.SSE适用于消息推送,监控等只需要服务器推送数据的场景中,下面是使用Spring Boot 来实现一个简单的模拟向前端推动进度数据,前端页面接受后展示进度条. 服务端 在Spring Boot中使用时需要注意,最好使用Spring Web 提供的SseEmitter这个类来进

  • Android、iOS和Windows Phone中的推送技术详解

    推送并不是什么新技术,这种技术在互联网时代就已经很流行了.只是随着进入移动互联网时代,推送技术显得更加重要.因为在智能手机中,推送从某种程度上,可以取代使用多年的短信,而且与短信相比,还可以向用户展示更多的信息(如图像.表格.声音等). 推送技术的实现通常会使用服务端向客户端推送消息的方式.也就是说客户端通过用户名.Key等ID注册到服务端后,在服务端就可以将消息向所有活动的客户端发送. 实际上,在很多移动操作系统中,官方都为其提供了推送方案,例如,Google的云推送.IOS.Windows

  • node.js中TCP Socket多进程间的消息推送示例详解

    前言 前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(Lua)的指定tcp socket. 一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个Event实例就能解决的状态管理问题吗?注册一个事件A用于消息传递,在socket连接时注册唯一的ID,然后在http接收到数据时,emit事件A:在监听到事件A时,在tcp server中寻找指定ID对应的socket处理该数据即可. 尽管n

  • iOS10推送教程详解

    上个月接到一个需求,做ios10的推送,意图冲击AppStore头条.瞬间抓狂,工具都还没有,于是赶紧安装xcodeBeta版,ios10Beta版,然后就开始无尽的查资料,毕竟新功能,毕竟没做过........不过还好,在发布会之前赶出来了,由于本人比较懒,拖到现在才写出来,接下来就是见证奇迹的时刻! 原理 图中,Provider是指某个iPhone软件的Push服务器,这篇文章我将使用.net作为Provider. APNS 是Apple Push Notification Service(

  • iOS12新特性之推送通知详解

    序言 众所周知,iOS中消息推送扮演了不可或缺的位置.不管是本地通知还是远程通知无时不刻的在影响着我们的用户体验,以致于在iOS10的时候苹果对推送大规模重构,独立了已 UserNotifications 和 UserNotificationsUI 两个单独的framework,可见重要性一斑.针对于WWDC18苹果又给我们带来了什么惊喜呢? 新特性 Grouped notifications 推送分组 Notification content extensions 推送内容扩展中的可交互和动态

  • Redis延迟队列和分布式延迟队列的简答实现

    最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现. 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队列. 我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图.DelayQueue实现了Delay.BlockingQue

  • PHP实现的消息实时推送功能【基于反ajax推送】

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style> .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} .content_box{text-align: cen

  • 利用Socket.io 实现消息实时推送功能

    项目背景介绍 最近在写的项目中存在着社交模块,需要实现这样的一个功能:当发生了用户被点赞.评论.关注等操作时,需要由服务器向用户实时地推送一条消息.最终完成的项目地址为:https://github.com/noiron/socket-message-push,这里将介绍一下实现的思路及部分代码. 项目的流程中存在着这样的几个对象: 用 Java 实现的后端服务器 用 Node.js 实现的消息推送服务器 用户进行操作的客户端 事件处理的流程如下: 用户进行点赞操作时,后端服务器会进行处理,并向

  • PHP实现长轮询消息实时推送功能代码实例讲解

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style> .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} .content_box{text-align: cen

随机推荐