springboot+rabbitmq实现指定消费者才能消费的方法

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费?

程序里有一个应用场景使用到了rabbitmq——当财务确认收到企业的打款金额后,系统会把企业订单生成用户付款单。由于订单记录数据量大,改为通过mq来异步实现。即财务确认收款操作后,将企业订单数据放入mq,另一端监听mq消息队列,将收到的企业订单加工转换成用户付款单,并做持久化。

本地开发环境与测试环境共用一套rabbitmq。当项目部署到测试环境后,QA测试过程中,总是“莫名其妙”的发现所保存的用户付款单数据有问题。

当然,首先要排查程序,检查Consumer的数据处理的逻辑是否有bug。单元测试后,发现并不存在测试环境的bug。

原来,消息队列被“非正常”消费了!

Q: 什么情况?

A: 几个伙伴一起参与的项目,大家总是要调试自己的程序的。而如果碰巧本地程序监听到消息队列里有消息,那么,消息就被本地程序消费掉了。问题正是出现在这里!————团队开发,大家并不会及时检出git上最新的程序版本。如果本地的程序版本不是最新的正确的版本,势必会出现bug。

那么,怎么办?

每次你改了逻辑,告诉大家获取最新?

不现实,约定的东西往往不奏效的。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 或者说,如何实现消息的定向消费呢?

只要肯琢磨,办法总比困难多!百思可得解!

我们知道,rabbitmq手动ack模式。这还不够,因为我们怎么让consumer来决定是否消费呢? 所以,我们需要一个标识————producer设定一个标识,consumer如果匹配这个标识,则消费,否则予以reject放回消息队列。

通过查看spring-rabbit/spring-amqp的代码,发现可以在spring-amqp里的MessageProperties上做文章。生产者与消费者每次消息传输都会携带一个MessageProperties,通常我们是不指定的,走MessageProperties的默认设置值。

我的策略:MessageProperties有一个属性叫AppId。我们程序所部署的测试机器就一台,即消息Producer和消息Consumer在一台机器上。那么,我就可以利用机器的IP来识别消息。只有Producer与Consumer的IP匹配,才消费消息。程序员本机IP与测试服务器IP不一样,就会拒绝接收消息,会把消息重新放回消息队列,等待测试服务器的Consumer消费。

话不多说,上代码吧,

生产者代码:

package com.sboot.mq;

import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;

public class MQProducerTest extends BaseTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 1; i <= 5; i++) {
            MessageProperties messageProperties = new MessageProperties();
            String ip = InetAddress.getLocalHost().getHostAddress();
            messageProperties.setAppId(ip);
//            messageProperties.setUserId(String.valueOf(i));
            MessageConverter messageConverter = new SimpleMessageConverter();
            String msg = UUID.randomUUID().toString();
//            System.out.println(msg);
            Message message1 = messageConverter.toMessage(msg, messageProperties);
            rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
            System.out.println("入队完成");
            Thread.sleep(500L);
        }
    }
}

消费者手动ACK,要实现ChannelAwareMessageListener接口,感知rabbitmq.client.Channel实例,调用channel的basicAck、basicReject等方法:

package com.sboot.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.net.InetAddress;

@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {

    @RabbitHandler
    @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());

        long tag = message.getMessageProperties().getDeliveryTag();
        String appId = message.getMessageProperties().getAppId();
        log.info("{}-{}, 消息出队", tag, appId);
        String receiveMsg = "";
        try {
            //核对标识,决定是否消费消息
            String ip = InetAddress.getLocalHost().getHostAddress();
            if (!ip.equals(appId)) {
                log.info("这不是我需要的消息。放回队列。{}", receiveMsg);
//                channel.basicNack(tag, false, true);
                channel.basicReject(tag, true);
//                channel.basicRecover(true);
                return;
            }

            MessageConverter messageConverter = new SimpleMessageConverter();
            receiveMsg = String.valueOf(messageConverter.fromMessage(message));
            。。。。在这里消费消息
            log.info("success " + receiveMsg);
            channel.basicAck(tag, false);

        } catch (Exception e) {
            log.error("receive message has an error, ", e);
            channel.basicNack(tag, false, true);
        }
    }

}

说明一下依赖的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解没有ackMode。

解决本案问题过程中的花絮:

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

@RabbitListener的ackMode的值见枚举org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自动消费 autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手动消费,Consumer端必须显式调用ack或nack)AUTO --

设置了手动消费,上文消费端的deliveryTag会是不同的long值。自动消费的deliveryTag是重复的1和2这样的。并且,自动消费时,如果要使用channel的ack/nack,会报异常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

到此这篇关于springboot+rabbitmq实现指定消费者才能消费的文章就介绍到这了,更多相关springboot rabbitmq消费内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • SpringBoot整合RabbitMQ 手动应答(简单demo)

    版本说明 JDK 1.8 RabbitMQ 3.7.15 Erlang 22.0 SpringBoot 2.3.3.RELEASE // TODO 2021年1月8日 整理CentOS安装RabbitMQ流程 1. 在RabbitMQ的Web管理界面,创建test队列 参数的含义 durability:是否持久化(重启或宕机后消息依然保存) durable 持久 transient 暂时 新建maven项目. 2. pom.xml <?xml version="1.0" enco

  • springboot+rabbitmq实现指定消费者才能消费的方法

    如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 程序里有一个应用场景使用到了rabbitmq--当财务确认收到企业的打款金额后,系统会把企业订单生成用户付款单.由于订单记录数据量大,改为通过mq来异步实现.即财务确认收款操作后,将企业订单数据放入mq,另一端监听mq消息队列,将收到的企业订单加工转换成用户付款单,并做持久化. 本地开发环境与测试环境共用一套rabbitmq.当项目部署到测试环境后,QA测试过程中,总是"莫名其妙"的发现所保存的用户付

  • Springboot配置suffix指定mvc视图的后缀方法

    Springboot配置suffix指定mvc视图后缀 如下所示: spring: #配置MVC视图后缀 mvc: view: suffix: ".html" 配置指定后缀之后 访问welcome.html页面时只需要写"welcome"即可. @Controller public class demoController { @GetMapping("/a") public String demo(){ return "welcome

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

  • SpringBoot+RabbitMQ+Redis实现商品秒杀的示例代码

    目录 业务分析 创建表 功能实现 1.用户校验 2.下单 3.减少库存 4.支付 总结 业务分析 一般而言,商品秒杀大概可以拆分成以下几步: 用户校验 校验是否多次抢单,保证每个商品每个用户只能秒杀一次 下单 订单信息进入消息队列,等待消费 减少库存 消费订单消息,减少商品库存,增加订单记录 付款 十五分钟内完成支付,修改支付状态 创建表 goods_info 商品库存表 列 说明 id 主键(uuid) goods_name 商品名称 goods_stock 商品库存 package com.

  • SpringBoot+RabbitMQ 实现死信队列的示例

    前言 死信:无法被消费的消息,称为死信. 如果死信一直留在队列中,会导致一直被消费,却从不消费成功. 所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-letter-exchange). 死信的几种来源: 消息 TTL 过期(time to live,存活时间,可以用在限时支付消息) 队列达到最大长度(队列满了,无法路由到该队列) 消息被拒绝( basic.reject / basic.nack ),并且 requeue = false 环境准备配置 准备 MQ 的队列和环境

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • SpringBoot+RabbitMQ实现消息可靠传输详解

    目录 环境配置 消息丢失分析 生产阶段 生产端模拟消息丢失 RabbitMQ 消费端 环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送. 1.添加 maven 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <depen

  • springboot +rabbitmq+redis实现秒杀示例

    目录 实现说明 1.工具准备 2.数据表 3.pom 4.代码结构 5.配置config 6.订单业务层 7.redis实现层 8.mq实现层 9.redis模拟初始化库存量 10.controller控制层 11.测试 12.测试结果 实现说明 这里的核心在于如何在大并发的情况下保证数据库能扛得住压力,因为大并发的瓶颈在于数据库.如果用户的请求直接从前端传到数据库,显然,数据库是无法承受几十万上百万甚至上千万的并发量的.因此,我们能做的只能是减少对数据库的访问.例如,前端发出了100万个请求,

随机推荐