解决springboot集成rocketmq关于tag的坑

springboot集成rocketmq关于tag的坑

新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收。

1.客户端发送代码

此处回调方法里有些业务不用关注,只关心发送方法

@Component
public class RocketMqHelper {
    Logger logger = LoggerFactory.getLogger(RocketMqHelper.class);
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    public void send(ReqMsg msg){
        rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(),
        msg.getMsg(),
        new SendCallback(){
            @Override
            public void onSuccess(SendResult sendResult) {
                logger.debug("msgid:{} 发送成功" , sendResult.getMsgId());
                logger.debug("发送mq成功后要执行的service:
                          {}",msg.getMsg().getSendAfterMethod());
                IsaveSendAfterMqLog saveSendAfterMqLog =
                SpringUtils.getBean(msg.getMsg().getSendAfterMethod());
                saveSendAfterMqLog.saveSendAfterMqLog(new
                               SendAfterLog(msg.getMsg(),sendResult,"0"));
            }

            @Override
            public void onException(Throwable throwable) {
                logger.error("mq发送异常!{}",throwable.toString());
                logger.debug("发送mq失败后执行的service:
                               {}",msg.getMsg().getSendAfterMethod());
                //异常描述截取500 length入库
                msg.getMsg().putUserProperty("exceptionDesc",throwable.toString());
                IsaveSendAfterMqLog saveSendAfterMqLog =
                SpringUtils.getBean(msg.getMsg().getSendAfterMethod());
                saveSendAfterMqLog.saveSendAfterMqLog(new
                                     SendAfterLog(msg.getMsg(),"1"));
            }
        });
    }
}

2.服务端监听消息

@Service
@RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression="${rocketmq.tags}")
public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> {
    Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class);
    @Override
    public void onMessage(CbiMsg message) {
        String msgBody = new String(message.getBody());
        String serviceName = message.getTags();
        logger.info("本次消费服务名称:{}",serviceName);
        AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName);
        saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message,
        Constants.CONSUME_SUCCESS));//默认消费成功
    }
}

@RocketMQMessageListener这个注解里selectorExpression默认是*,接收topic下全部消息。想动态对tags进行配置。于是利用springboot获取yml配置。写死的时候没有问题,但是改成$表达式配置后怎么都收不到消息,经排查居然是selectorExpression这个不支持配置,会原封的按表达式进入MQ容器初始化。然而注解里面的topic,comsumerGroup都可以正常拿到配置值。

翻源码发现问题所在,项目启动时,在ListenerContainerConfiguration在这个类里初始化mq容器时,对配置进行赋值

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new
                                    DefaultRocketMQListenerContainer();
        container.setNameServer(rocketMQProperties.getNameServer());
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders
                                                  (annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        return container;
    }

topic和comsumerGroup都在springboot环境里获取配置值了,唯独selectorExpression这个没有,直接默认注解里的。下面的问题就是需要自己在项目启动,springboot容器起来,但是rocketmq容器未起的时候,动态去改注解里配置的值。然后让Rocketmq启动。

**
 *  因为RocketMQMessageListener不提供动态配置功能
 *  springboot初始化后rocket容器初始化前利用反射动态改变
 *  RocketMQMessageListener注解selectorExpression的值
 *
 *
 */
@Component
public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean {

    @Value("${rocketmq.consumer.tags}")
    private String tags;

    @Override
    public void afterPropertiesSet() throws Exception {
        RocketMQMessageListener annoTable =
         CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class);
        // 获取代理处理器
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable);
        // 获取私有 memberValues 属性
        Field f = invocationHandler.getClass().getDeclaredField("memberValues");
        f.setAccessible(true);
        // 获取实例的属性map
        Map<String, Object> memberValues = (Map<String, Object>)
        f.get(invocationHandler);
        // 修改属性值
        memberValues.put("selectorExpression", tags);
    }
}

问题解决。。

SpringBoot集成RocketMQ及报错处理

项目场景:

说明:springBoot集成RocketMQ开发

环境:阿里云+Centos8+RocketMQ+SpringBoot+Docker

启动:docker start rmqserver rmqbroker[因为RocketMQ安装在Docket容器中,所以这样启动]

服务器broker.conf配置信息:

brokerIP1=外网ip
namesrvAddr=外网ip:9876
brokerName=broker_tanhua
autoCreateTopicEnable=true

说明:

1.brokerIP1 当前broker监听的IP

2.Broker是RocketMq的核心,负责消息的传递(提供者=》消费者)以及消息的持久化存储,消息的HA机制以及服务器过滤功能。

3.autoCreateTopicEnable:自动创建Topic路由

问题一描述:

我第一次配置时,broker.conf配置文件中没有配置autoCreateTopicEnable,因此在程序运行时会提示没有路由信息:No route info of this topic: tanhua-sso-login

我发送消息路由名字是tanhua-sso-login

错误信息:

No route info of this topic: tanhua-sso-login

错误信息截图:我没有截图网上找了一个,差不多

解决方式:

我当时也在网上找了很多,有在启动时添加自动创建的也有说防火墙开启的原因,但是我感觉会这个的话应该都知道关防火墙。

在启动时添加自动创建可能也好使,但是我没试过,因为我在搜索时发现问题统一指向说没有自动创建,因此我想的是直接在配置文件中进行修改,然后重启

解决方式:

在broker.conf配置文件中添加如下配置:

autoCreateTopicEnable=true

SpringBoot集成信息:

application.properties:

# RocketMQ相关配置
rocketmq.nameServer=外网IP:9876
rocketmq.producer.group=tanhua
rocketmq.producer.send-message-timeout= 6000

【注】:这里配置的开通没有spring,我之前加spring怎么也连接不上

pom.xml:

<!--RocketMQ相关-->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.0.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>4.5.1</version>
            </dependency>

问题二描述:

我在修改上面的错误后,紧接着又报

错误信息:

RemotingTooMuchRequestException: sendDefaultImpl call timeout

错误信息截图:也是没有截图网上找了一个,差不多

思路:错误信息中提示call timeout,timeout一般想到到时连接或响应超时,因此在网上找到的是在发送MQ时出错,网上解决方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot连接MQ时的配置设置

解决方案:添加rocketmq.producer.send-message-timeout= 6000

说明:给大一点发送信息超时时间。

说明:同时在SpringBoot集成RoctetMQ配置中没有sendMsgTimeout因此用rocketmq=>输入'.'=>输入sendtimeout=>查看有哪些关于这个的配置。

完整配置:

# RocketMQ相关配置
rocketmq.nameServer=外网IP:9876
rocketmq.producer.group=tanhua
rocketmq.producer.send-message-timeout= 6000

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • Springboot RocketMq实现过程详解

    首先,在虚拟机上安装rocketmq和rocketMq可视化控制,安装不做描述. 1.pom.xml文件添加依赖 mq的版本与连接的rocketmq版本保持一致 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </depende

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • 解决springboot集成swagger碰到的坑(报404)

    一:项目使用springboot集成swagger进行调试 配置swagger非常简单,主要有三步: 1.添加swagger依赖 <!-- 引入 swagger等相关依赖 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.6.1</version> <

  • 解决SpringBoot集成Eureka导致返回结果由json变为xml的问题

    SpringBoot集成Eureka导致返回结果由json变为xml 解决方案 在请求的Mapping上加上 produces = { "application/json;charset=UTF-8" } 例如: @GetMapping(value = "/user-instance", produces = { "application/json;charset=UTF-8" }) 以下是json和xml @GetMapping(value =

  • 解决spring集成redisson踩过的坑

    目录 spring集成redisson踩过的坑 第一坑就是版本兼容问题 第二个坑是设置密码问题 spring整合redisson配置 配置方式 单节点配置standalone 哨兵配置sentinel 集群配置cluster 主从部署方式(master/slave) spring集成redisson踩过的坑 我用spring的xml集成一直报错,所以只能选择注解方式: @Configuration public class RedissionConfig { Logger log = Logge

  • 如何解决SpringBoot集成百度UEditor图片上传后直接访问404

    SpringBoot项目上传图片一般是上传至远程服务器存储,开发过程中可能会上传至当前项目的某个静态目录中,此时就会遇到这个问题,文件在上传之后直接访问并不能被访问到,必须重新加载项目. 首先分析一下原因: 我们知道,如果使用类似 /upload/image/1.jpg 这种格式进行图片的访问的时候,SpringBoot读取的并不是本项目中直接的静态目录,而是在进行编译的时候生成target目录下的文件,如下图所示: 那么问题就来了,我们在运行的过程中上传一个图片的话,并不能重新加载当前这个项目

  • 解决SpringBoot配置文件application.yml遇到的坑

    目录 配置文件application.yml遇到的坑 1.第一个坑,原代码 解决办法 2.第二个坑,原代码参见下图 解决办法 配置文件application.yml的注意事项 这类似于 还有一种配置是properties文件配置 配置文件application.yml遇到的坑 1.第一个坑,原代码 username:root password:123456 项目启动报以下异常: Caused by: org.yaml.snakeyaml.scanner.ScannerException: whi

  • 解决springboot 2.x集成log4j2调试日志无法关闭的问题

    springboot2.x集成log4j2时,始终无法关闭log4j2自身的日志输出 已经做了如下配置: 在log4j2.xml的配置文件中,配置configuration的status属性为OFF: 确认系统所有地方无配置log4j2.debug: 如上配置都无法解决问题,只能从源码着手一探究竟. 从log4j2-api包中,找到StatusLogger,其设置日志输出level的代码如下: private StatusLogger(final String name,final Messag

  • 详解SpringBoot集成jsp(附源码)+遇到的坑

    本文介绍了SpringBoot集成jsp(附源码)+遇到的坑 ,分享给大家 1.大体步骤 (1)创建Maven web project: (2)在pom.xml文件添加依赖: (3)配置application.properties支持jsp (4)编写测试Controller (5)编写JSP页面 (6)编写启动类App.java 2.新建SpringInitialzr 3.pom文件 <dependencies> <dependency> <groupId>org.s

随机推荐