详解SpringBoot整合RabbitMQ如何实现消息确认

目录
  • 简介
  • 生产者消息确认
    • 介绍
    • 流程
    • 配置
    • ConfirmCallback
    • ReturnCallback
    • 注册ConfirmCallback和ReturnCallback
  • 消费者消息确认
    • 介绍
    • 手动确认三种方式

简介

本文介绍SpringBoot整合RabbitMQ如何进行消息的确认。

生产者消息确认

介绍

发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递。

如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。

流程

  • 如果消息没有到达exchange,则confirm回调,ack=false
  • 如果消息到达exchange,则confirm回调,ack=true
  • exchange到queue成功,则不回调return
  • exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,这样消息就丢了)

配置

application.yml

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
}

correlationData:对象内部有id (消息的唯一性)和Message。

(若ack为false,则Message不为null,可将Message数据 重新投递;若ack是true,则correlationData为null)

ack:消息投递到exchange 的状态,true表示成功。

cause:表示投递失败的原因。 (若ack为false,则cause不为null;若ack是true,则cause为null)

给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。

public void send(String dataId, String exchangeName, String rountingKey, String message){
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(dataId);

    rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}

public String receive(String queueName){
    return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}

2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。

若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

注意:需要rabbitTemplate.setMandatory(true);

当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。

代码:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

注册ConfirmCallback和ReturnCallback

整合后的写法

package com.example.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    // 下边这样写也可以
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

消费者消息确认

介绍

确认方式 简介 详述
auto(默认) 根据消息消费的情况,智能判定 若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去
若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。
若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。
none mq发出消息后直接确认消息  
manual 消费端手动确认消息 消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。
如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。

只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

消息确认三种方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.direct.acknowledge-mode=manual

手动确认三种方式

basicAck,basicNack,basicReject

basicAck

含义

表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。

函数原型

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag

  • 消息投递序号
  • 每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple

  • 是否批量确认
  • 值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

实例

@RabbitHandler
public void process(String content, Channel channel, Message message){
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

basicNack

含义

表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

函数原型

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。

basicReject

含义

拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

函数原型

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。

以上就是详解SpringBoot整合RabbitMQ如何实现消息确认的详细内容,更多关于SpringBoot RabbitMQ消息确认的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • springboot中rabbitmq实现消息可靠性机制详解

    1. 生产者模块通过publisher confirm机制实现消息可靠性 1.1 生产者模块导入rabbitmq相关依赖 <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-

  • SpringBoot+RabbitMQ实现消息可靠传输详解

    目录 环境配置 消息丢失分析 生产阶段 生产端模拟消息丢失 RabbitMQ 消费端 环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送. 1.添加 maven 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <depen

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • 详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目.这里对学习Springboot的一些知识总结记录一下.如果你也在学习SpringBoot,可以关注我,一起学习,一起进步. ActiveMQ简介 1.ActiveMQ简介 Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件:由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. 2.ActiveMQ下载 下载地址:htt

  • 详解springboot整合ueditor踩过的坑

    有一天老板突然找我让我改富文本(一脸懵逼,不过也不能推啊默默地接下了),大家都知道现在的富文本视频功能都是只有上传链接的没有从本地上传这一说(就连现在的csdn的也是)于是我找了好多个,最终发现百度的ueditor可以. 经过几天的日夜,甚至牺牲了周末休息时间开始翻阅资料... 废话不多说,开始教程: 第一步: 去ue官网下载他的源码 第二步: 解压下载的源码(下载可能会慢,好像需要翻墙下载) 然后打开项目把源码拖进项目的resources/static中去 第三步 就是重点了 由于spring

  • 详解SpringBoot整合MyBatis详细教程

    1. 导入依赖 首先新建一个springboot项目,勾选组件时勾选Spring Web.JDBC API.MySQL Driver 然后导入以下整合依赖 <!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter --> <dependency> <groupId>org.mybatis.spring.boot</groupId> &

  • 详解springboot整合ehcache实现缓存机制

    EhCache 是一个纯Java的进程内缓存框架,具有快速.精干等特点,是Hibernate中默认的CacheProvider. ehcache提供了多种缓存策略,主要分为内存和磁盘两级,所以无需担心容量问题. spring-boot是一个快速的集成框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置. 由于spring-boot无需任何样板化的配置文件,所以spring-boot集成一些其他框架时会有略微的

  • 详解Springboot整合Dubbo之代码集成和发布

    本文介绍了Springboot整合Dubbo之代码集成和发布,分享给大家,具体如下: 1. boot-dubbo-api相关 打开boot-dubbo-api项目,正在src/main/java下创建一个包,并创建你需要dubbo暴露的接口TestService.java,并创建一个实体类用于测试User.java.如下图所示: 创建文件和包结构 User.java package com.boot.domain; import lombok.Data; import java.io.Seria

  • 详解springboot整合Listener的两种方式

    1.通过注解 编写启动类 package cn.bl; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; @SpringBootApplication @ServletCompo

  • 详解springboot整合mongodb

    这篇文章主要介绍springboot如何整合MongoDB. 准备工作 安装 MongoDB jdk 1.8 maven 3.0 idea 环境依赖 在pom文件引入spring-boot-starter-data-mongodb依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifa

  • springBoot整合rabbitMQ的方法详解

    引入pom <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

随机推荐