解决SpringBoot整合RocketMQ遇到的坑

应用场景

在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。

引入依赖

 <!-- RocketMq Spring Boot Starter-->
 <dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
  </dependency>

消费者代码

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费到的数据为:"+s);
    }
}

问题排查

RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。

@Override
    public void afterSingletonsInstantiated() {
    	// 获取所有所有使用了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
        	// 循环注册容器
            beans.forEach(this::registerContainer);
        }
    }
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
		// 校验当前bean是否实现了RocketMQListener接口
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
		// 获取bean上的annotation
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
		// 解析group及topic,可支持表达式
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
		// 注册bean的,调用createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
        container.setRocketMQMessageListener(annotation);
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // 此处已经根据表达式将数据取出
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 此处将SelectorExpression的数据覆盖成了表达式
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }

问题解决

因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。

/**
 * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()){
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                if(Objects.nonNull(entry)){
                    memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                }
            }
        }
    }
}

初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • 解决springboot整合druid遇到的坑

    springboot整合druid的坑 项目环境 springboot 2.1.6.RELEASE jdk 1.8 pom.xml配置 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance&

  • springboot整合spring-data-redis遇到的坑

    描述 使用springboot整合redis,使用默认的序列化配置,然后使用redis-client去查询时查询不到相应的key. 使用工具发现,key的前面多了\xAC\xED\x00\x05t\x00!这样一个串. 而且value也是不能直观可见的. 问题所在 使用springdataredis,默认情况下是使用org.springframework.data.redis.serializer.JdkSerializationRedisSerializer这个类来做序列化. org.spri

  • 解决SpringBoot整合Mybatis扫描不到Mapper的问题

    闲来无事,想学学springboot,开始搭建一个项目,但是一直显示mapper扫描不到的错误: "Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'userController': Unsa

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot 整合 RocketMQ 收发消息的配置过程

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • SpringBoot整合RocketMQ的方法详解

    目录 一:Ubuntu安装RocketMQ 二:添加RocketMQ依赖 三:在application中添加RocketMQ配置 四:编写消费者,消息生产者,消息实体类(自定义) 五:测试Controller 一:Ubuntu安装RocketMQ 1.下载(在下面地址选择自己需要的版本的rocketmq) http://rocketmq.apache.org/release_notes/ 2.解压,更改配置 将下载的zip文件解压到自己需要安装的位置 在unbuntu系统下需要修改安装跟目录下的

随机推荐