聊聊RabbitMQ发布确认高级问题

目录
  • 1、发布确认高级
    • 1.1、发布确认SpringBoot版本
      • 1.1.1、确认机制方案
      • 1.1.2、代码架构图
      • 1.1.3、配置文件
      • 1.1.4、配置类
      • 1.1.5、回调接口
      • 1.1.6、生产者
      • 1.1.7、消费者
      • 1.1.8、测试结果
    • 1.2、回退消息
      • 1.2.1、Mandatory参数
      • 1.2.2、配置文件
      • 1.2.3、生产者代码
      • 1.2.4、回调接口代码
      • 1.2.5、测试结果
    • 1.3、备份交换机
      • 1.3.1、代码架构图
      • 1.3.2、配置类代码
      • 1.3.3、消费者代码
      • 1.3.4、测试结果

1、发布确认高级

1. 存在的问题

再生产环境中由于一些不明原因导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,会导致消息丢失。

1.1、发布确认SpringBoot版本

1.1.1、确认机制方案

当消息不能正常被接收的时候,我们需要将消息存放在缓存中。

1.1.2、代码架构图

1.1.3、配置文件

spring.rabbitmq.host=192.168.123.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:禁用发布确认模式,是默认值。
  • CORRELATED:发布消息成功到交换机会触发回调方方法。
  • CORRELATED:就是发布一个就确认一个。

1.1.4、配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

1.1.5、回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回调接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机接受失败后进行回调
     * 1. 保存消息的ID及相关消息
     * 2. 是否接收成功
     * 3. 接受失败的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交换机已经收到id为:{}的消息",id);
        }else{
            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);
        }
    }
}

1.1.6、生产者

import com.xiao.springbootrabbitmq.utils.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("发送得内容是:{}",message);
    }
}

1.1.7、消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到队列" + CONFIRM_QUEUE_NAME + "消息:{}",msg);
    }
}

1.1.8、测试结果

1. 第一种情况

ID1的消息正常送达,ID2的消息由于RoutingKey的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收

2. 第二种情况

我们再上一种情况下修改了ID1的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息

1.2、回退消息

1.2.1、Mandatory参数

  • 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(就是消息被交换机成功接收后,无法到达队列),那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个事件的
  • 通过设置该参数可以在消息传递过程中不可达目的地时将消息返回给生产者。

1.2.2、配置文件

spring.rabbitmq.publisher-returns=true

需要在配置文件种开启返回回调

1.2.3、生产者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);
        log.info("发送得内容是:{}",message + routingKey1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("发送得内容是:{}",message + routingKey2);
    }
}

1.2.4、回调接口代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回调接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机接受失败后进行回调
     * 1. 保存消息的ID及相关消息
     * 2. 是否接收成功
     * 3. 接受失败的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交换机已经收到id为:{}的消息",id);
        }else{
            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        Message message = returnedMessage.getMessage();
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        String replyText = returnedMessage.getReplyText();
        log.error("消息{},被交换机{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    }
}

1.2.5、测试结果

其他类的代码与上一小节案例相同

ID2的消息由于RoutingKey不可路由,但是还是被回调函数处理了。

1.3、备份交换机

1.3.1、代码架构图

这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机

1.3.2、配置类代码

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

    public static final String BACKUP_QUEUE_NAME = "backup_queue";

    public static final String WARNING_QUEUE_NAME = "warning_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    @Bean
    public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                        @Qualifier("backupQueue") Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                         @Qualifier("warningQueue") Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

1.3.3、消费者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("报警发现不可路由的消息内容为:{}",msg);
    }
}

1.3.4、测试结果

mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高

到此这篇关于RabbitMQ发布确认高级的文章就介绍到这了,更多相关RabbitMQ发布确认高级内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot整合RabbitMQ及生产全场景高级特性实战

    目录 摘要 整合 依赖与配置 生产者配置消息队列规则 生产者发布消息 消费者监听消息 摘要 整合场景含 topic 工作模式(通过 routingKey 可满足简单/工作队列/发布订阅/路由等四种工作模式)和 confirm(消息确认).return(消息返回).basicAck(消息签收).basicNack(拒绝签收).DLX(Dead Letter Exchange死信队列)实现延时/定时任务等. 整合 依赖与配置 以下内容消费者同生产者 <parent> <groupId>

  • 聊聊RabbitMQ发布确认高级问题

    目录 1.发布确认高级 1.1.发布确认SpringBoot版本 1.1.1.确认机制方案 1.1.2.代码架构图 1.1.3.配置文件 1.1.4.配置类 1.1.5.回调接口 1.1.6.生产者 1.1.7.消费者 1.1.8.测试结果 1.2.回退消息 1.2.1.Mandatory参数 1.2.2.配置文件 1.2.3.生产者代码 1.2.4.回调接口代码 1.2.5.测试结果 1.3.备份交换机 1.3.1.代码架构图 1.3.2.配置类代码 1.3.3.消费者代码 1.3.4.测试结

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • RabbitMQ消息确认机制剖析

    目录 前言 消息确认 基本流程 消息确认模式 ConfirmCallback确认模式 ReturnCallback退回模式 消息发送者确认 消息接收者确认 basicAck模式 basicNack模式 basicReject模式 测试 解决办法 消费者确认失败 总结 前言 上一章讲解了RabbitMq的三种Exchange消息发送的模式,但是在默认情况下RabbitMQ并不能保证消息是否发送成功,以及是否能被成功消费,为了保证消息在传递过程中不丢失,需要对消息进行确认机制,来提高消息的可靠性.

  • 详细聊聊RabbitMQ竟无法反序列化List问题

    目录 前言 问题重现 项目依赖 发送方 接收方 错误日志 分析问题原因 解决办法 总结 前言 最近在接到了一个需求,大概是通过RabbitMq给xx子系统同步用户数据,要提供单个同步和批量同步.内心暗喜这不简单的很嘛.三下五除二就把代码给写完了,大概长这样: public void syncUserSingle(User user) { // 省略一大堆业务代码 rabbitTemplate.convertAndSend("q_sync_user_single", user); } p

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • python操作RabbitMq的三种工作模式

    一.简介: RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一. ​ RabbitMq 应用场景广泛: 系统的高可用:日常生活当中各种商城秒杀,高流量,高并发的场景.当服务器接收到如此大量请求处理业务时,有宕机的风险.某些业务可能极其复杂,但这部分不是高时效性,不需要立即反馈给用户,我们可以将这部

  • python对RabbitMQ的简单入门使用教程

    目录 (一)RabbitMQ的简介 (二)RabbitMQ的安装 (三)python操作RabbitMQ (四)RabbitMQ简单模式 (五)RabbitMQ发布订阅模式 (六)RabbitMQ RPC模式 (七)说点啥 (八)结语 (一)RabbitMQ的简介 RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最

  • rabbitmq五种模式详解(含实现代码)

    一.五种模式详解 1.简单模式(Queue模式) 当生产端发送消息到交换机,交换机根据消息属性发送到队列,消费者监听绑定队列实现消息的接收和消费逻辑编写.简单模式下,强调的一个队列queue只被一个消费者监听消费. 1.1 结构 生产者:生成消息,发送到交换机交换机:根据消息属性,将消息发送给队列消费者:监听这个队列,发现消息后,获取消息执行消费逻辑 1.2应用场景 常见的应用场景就是一发,一接的结构 例如: 手机短信邮件单发 2.争抢模式(Work模式) 强调的也是后端队列与消费者绑定的结构

随机推荐