SpringBoot整合RabbitMQ消息队列的完整步骤

SpringBoot整合RabbitMQ

主要实现RabbitMQ以下三种消息队列:

  • 简单消息队列(演示direct模式)
  • 基于RabbitMQ特性的延时消息队列
  • 基于RabbitMQ相关插件的延时消息队列

公共资源

1. 引入pom依赖

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置yml文件

基于上篇《RabbitMQ安装与配置》实现的情况下,进行基础配置。

spring:
  rabbitmq:
    host: 121.5.168.31
    port: 5672    # 默认可省略
    virtual-host: /*** # 虚拟主机
    username: *** # 用户名
    password: *** # 用户密码
    # 开启投递成功回调 P -> Exchange
    publisher-confirm-type: correlated
    # 开启投递消息到队列失败回调 Exchange -> Queue
    publisher-returns: true
    # 开启手动ACK确认模式 Queue -> C
    listener:
      simple:
        acknowledge-mode: manual # 代表手动ACK确认
        # 一些基本参数的设置
        concurrency: 3
        prefetch: 15
        retry:
          enabled: true
          max-attempts: 5
        max-concurrency: 10

3. 公共Constants类

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 15:28
 */

public class Constants {

    /**
     * 第一个配置Queue,Exchange,Key(非注解方式)
     */
    public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
    public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
    public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";

    /**
     * 第二个配置Queue,Exchange,Key(注解方式)
     */
    public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
    public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
    public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";

    //************************************延时消息队列配置信息**************************
    /**
     * 延时队列信息配置
     */
    public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
    public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
    public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";

    /**
     * 死信队列
     */
    public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
    public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
    public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";

    //**************************************延时消息队列配置信息(插件版)******************************
    /**
     * 新延时队列信息配置
     */
    public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
    public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
    public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";

}

简单消息队列(direct模式)

4. RabbitTemplate模板配置

主要定义消息投递Exchange成功回调函数和消息从Exchange投递到消息队列失败的回调函数。

package com.topsun.rabbit;

import com.sun.org.apache.xpath.internal.operations.Bool;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 14:17
 */
@Configuration
public class RabbitConfig {

    private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(Boolean.TRUE);
        // 设置序列化机制
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // 消息由投递到Exchange中时触发的回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                logger.info("消息发送到Exchange情况反馈:唯一标识:correlationData={},消息确认:ack={},原因:cause={}",
                        correlationData, ack, cause)
        );
        // 消息由Exchange发送到Queue时失败触发的回调
        rabbitTemplate.setReturnsCallback((returnedMessage) -> {
            // 如果是插件形式实现的延时队列,则直接返回
            // 原因: 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列,从而实现延时队列的操作
            if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
                return;
            }
            logger.warn("消息由Exchange发送到Queue时失败:message={},replyCode={},replyText={},exchange={},rountingKey={}",
                    returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
                    returnedMessage.getExchange(), returnedMessage.getRoutingKey());
        });
        return rabbitTemplate;
    }

    //*******************************************直接配置绑定关系*****************************************
    /**
     * 声明队列
     *
     * @return
     */
    @Bean
    public Queue horseQueue() {
        return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
    }

    /**
     * 声明指定模式交换机
     *
     * @return
     */
    @Bean
    public DirectExchange horseExchange() {
        return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 绑定交换机,队列,路由Key
     *
     * @return
     */
    @Bean
    public Binding horseBinding() {
        return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
    }

}

5. 定义消息监听器

基于 @RabbitListenerzi注解,实现自定义消息监听器。主要有两种实现方式:

  • 如果在配置类中声明了Queue、Excehange以及他们直接的绑定,这里直接指定队列进行消息监听
  • 如果前面什么也没做,这里可以直接用注解的方式进行绑定实现消息监听
package com.topsun.rabbit;

import com.rabbitmq.client.Channel;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 14:58
 */

@Component
public class MsgListener {

    private static Logger logger = LoggerFactory.getLogger(MsgListener.class);

    /**
     * 配置类中已经完成绑定,这里直接根据队列值接收
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
    public void customListener(Message message, Channel channel, String msg) {
        // 获取每条消息唯一标识(用于手动ACK确认)
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> customListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

    /**
     * 根据注解的形式进行绑定接收
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
            exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),
            key = {Constants.HORSE_ANNOTATION_KEY}
    ))
    public void annotationListener(Message message, Channel channel, String msg) {
        // 获取每条消息唯一标识(用于手动ACK确认)
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> annotationListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

}

6. 测试接口

这里发送100条消息:

  • 奇数条到非注解方式的消息监听器
  • 偶数条到注解式消息监听器
@GetMapping("/rabbit")
    public void sendMsg() {
        for (int i = 1; i <= 100; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            if (i % 2 == 1) {
                rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
            } else {
                rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
            }
        }
    }

结果:自行测试过,非常成功:smile::smile::smile:

延时消息队列

原理:生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

7. 配置绑定相关信息

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/24 14:22
 */

@Configuration
public class DelayRabbitConfig {

    private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);

    /**
     * 声明延时队列交换机
     *
     * @return
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 声明死信队列交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 声明延时队列 延时10s(单位:ms),并将延时队列绑定到对应的死信交换机和路由Key
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
        // x-message-ttl  声明队列的TTL(过期时间)
        // 可以在这里直接写死,也可以进行动态的设置(推荐动态设置)
        // args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
    }

    /**
     * 延时队列绑定管理
     *
     * @return
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
    }

    /**
     * 死信队列绑定管理
     *
     * @return
     */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
    }

    //**********************************延时消息队列配置信息(插件版)************************************

    @Bean
    public Queue pluginQueue() {
        return new Queue(Constants.HORSE_PLUGIN_QUEUE);
    }

   /**
     * 设置延时队列的交换机,必须是 CustomExchange 类型交换机
     * 参数必须,不能改变
     * @return
     */
    @Bean
    public CustomExchange customPluginExchange() {
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-delayed-type", "direct");
        return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
    }

    @Bean
    public Binding pluginBinding() {
        return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
    }

}

8. 定义延时监听器

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/24 14:51
 */
@Component
public class DelayMsgListener {

    private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);

    /**
     * 监听死信队列
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
    public void consumeDeadListener(Message message, Channel channel, String msg) {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> consumeDeadListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

    /**
     * 监听延时队列(插件版)
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
    public void consumePluginListener(Message message, Channel channel, String msg) {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> consumePluginListener" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

}

9. 测试接口

   // 基于特性的延时队列
	@GetMapping("/delay/rabbit")
    public void delayMsg(@RequestParam("expire") Long expire) {
        for (int i = 1; i <= 10; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            // 这里可以动态的设置过期时间
            rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
                    message -> {
                        message.getMessageProperties().setExpiration(String.valueOf(expire));
                        return message;
                    },
                    new CorrelationData(String.valueOf(i)));
        }
    }

	// 基于插件的延时队列
    @GetMapping("/delay/plugin")
    public void delayPluginMsg(@RequestParam("expire") Integer expire) {
        for (int i = 1; i <= 10; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            // 动态设置过期时间
            rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setDelay(expire);
                return message;
            }, new CorrelationData(String.valueOf(i)));

        }
    }

结果:你懂的:scream_cat::scream_cat::scream_cat:

RabbitMQ的基础使用演示到此结束。

总结

到此这篇关于SpringBoot整合RabbitMQ消息队列的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springBoot整合rabbitMQ的方法详解

    引入pom <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • Spring Boot整合RabbitMQ开发实战详解

    这篇文章主要讲基本的整合.先把代码跑起来,再说什么高级特性. RabbitMQ 中的一些术语 如果你打开 RabbitMQ web 控制台,你会发现其中有一个 Exhanges 不好理解.下面简单说明一下. 交换器(Exchange) 交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingKey)把消息投递到对应的队列.(明白这个概念很重要,后面的代码里面充分体现了这一点) 队列(Queue) 队列很好理解,就不用解释了. 绑定(Binding) 交换器怎么知道把这条

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • SpringBoot整合RabbitMQ 手动应答(简单demo)

    版本说明 JDK 1.8 RabbitMQ 3.7.15 Erlang 22.0 SpringBoot 2.3.3.RELEASE // TODO 2021年1月8日 整理CentOS安装RabbitMQ流程 1. 在RabbitMQ的Web管理界面,创建test队列 参数的含义 durability:是否持久化(重启或宕机后消息依然保存) durable 持久 transient 暂时 新建maven项目. 2. pom.xml <?xml version="1.0" enco

  • Springboot 整合RabbitMq(用心看完这一篇就够了)

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是关于rabbitMq的安装,就不介绍了) 在安装完rabbitMq后,输入http://ip:15672/ ,是可以看到一个简单后台管理界面的. 在这个界面里面我们可以做些什么? 可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等. 以上

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • SpringBoot整合rockerMQ消息队列详解

    目录 Springboot整合RockerMQ 使用总结 消费模式 生产者组和消费者组 生产者投递消息的三种方式 如何保证消息不丢失 顺序消息 分布式事务 Springboot整合RockerMQ 1.maven依赖 <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • Springboot整合Active消息队列

    简单理解: Active是Apache公司旗下的一个消息总线,ActiveMQ是一个开源兼容Java Message Service(JMS) 面向消息的中件间. 是一个提供松耦合的应用程序架构. 主要用来在服务与服务之间进行异步通信的. 一.搭建步骤     1.相应jar包 <!-- 整合消息队列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifa

  • springboot整合quartz定时任务框架的完整步骤

    目录 Spring整合Quartz pom文件 对应的properties 文件 配置类 自定义任务类:ScheduledTask 获取spring中bean的工具类:SpringContextUtil 定时任务服务接口:QuartzService QuartzService实现类:QuartzServiceImpl ScheduledTaskRunner类 任务实体类:QuartzTask 任务service层 service实现类 任务controller 数据表 具体使用 具体效果 总结

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • SpringBoot整合RabbitMQ实现交换机与队列的绑定

    目录 简介 配置方法概述 法1:配置类(简洁方法)(推荐) 法2:配置类(繁琐方法)(不推荐) 法3:使用方配置(不推荐) 法4:MQ服务端网页(不推荐) 简介 本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列. 配置方法概述 交换机 下边两种方式等价. ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build(); new TopicExchange(EXCHANGE_T

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

随机推荐