SpringBoot整合MQTT并实现异步线程调用的问题

目录
  • 为什么选择MQTT
  • 使用背景
  • 代码实现
    • 基础代码
    • 异步线程处理实现

为什么选择MQTT

MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来

先从使用MQTT需要什么开始分析:

  • 消息服务器
  • 不同应用/设备之间的频繁交互
  • 可能涉及一对多的消息传递

基于SpringBoot通过注解实现对mqtt消息处理的异步调用

使用背景

生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消费者去快速的消费.
而其中的一个方案便是使用异步线程去加速消费消息. 下面介绍下思路

我们可以在原来的mqtt工具类上面进行改装.
首先创建一个类MqttMessageListener并继承IMqttMessageListener实现messageArrived, 用于处理这些消息(业务编写)
然后改写mqtt客户端订阅的方法, 注入MqttMessageListener, 并在订阅方法中新增该参数
在然后在启动类开启异步线程, 编写一个配置类配置线程池参数并且在messageArrived加上@Async开启异步线程调用

代码实现

基础代码

指没有开启线程池的代码

MqttPushClient 主要定义了连接参数

import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author
 * @Date
 * @Description  连接至EMQ X 服务器,获取mqtt连接,发布消息
 */
@Component
public class MqttPushClient{

    private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    public static MqttClient getClient() {
        return client;
    }

    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            if (username != null) {
                options.setUserName(username);
            }
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                //设置回调类
                client.setCallback(pushCallback);
                //client.connect(options);
                IMqttToken iMqttToken = client.connectWithResult(options);
                boolean complete = iMqttToken.isComplete();
                log.info("MQTT连接"+(complete?"成功":"失败"));
                /** 订阅主题 **/
                for (String topic : topicList) {
                    log.info("连接订阅主题:{}", topic);
                    client.subscribe(topic, 0);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

PushCallback 回调类, 实现重连, 消息发送监听, 消息接收监听

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author
 * @Date
 * @Description  消息回调,处理接收的消息
 */
@Component
public class PushCallback implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttConfiguration mqttConfiguration;
    @Autowired
    private MqttTopic mqttTopic;

    @Override
    public void connectionLost(Throwable cause) {        // 连接丢失后,一般在这里面进行重连
        log.info("连接断开,正在重连");
        MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient();
        if (null != mqttPushClient) {
            mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(),
                    mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic());
            log.info("已重连");
        }

    }

    /**
     * 发送消息,消息到达后处理方法
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发送完成,messageId={},topics={}",messageId,topics.toString());
    }

    /**
     * 订阅主题接收到消息处理方法
     * @param topic
     * @param message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
         // subscribe后得到的消息会执行到这里面,这里在控制台有输出
        String messageStr = new String(message.getPayload());
        // messageDistribute.distribute(topic, messageStr);
        log.info("接收的主题:" + topic +  ";接收到的信息:" + messageStr);
        }
  }

MqttConfiguration 配置了mqtt相关参数, 并初始化连接(mqtt在这里启动)

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author
 * @Date mqtt配置及连接
 * @Description
 */
@Slf4j
@Component
@Configuration
@ConfigurationProperties(MqttConfiguration.PREFIX)
public class MqttConfiguration {

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * 指定配置文件application-local.properties中的属性名前缀
     */
    public static final String PREFIX = "std.mqtt";

    private String host;
    private String clientId;
    private String userName;
    private String password;
    private int timeout;
    private int keepAlive;
    private List<String> topic;

    public String getClientid() {
        return clientId;
    }

    public void setClientid(String clientid) {
        this.clientId = clientid;
    }

    public String getUsername() {
        return userName;
    }

    public void setUsername(String username) {
        this.userName = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepAlive;
    }

    public void setKeepalive(int keepalive) {
        this.keepAlive = keepalive;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public List<String> getTopic() {
        return topic;
    }

    public void setTopic(List<String> topic) {
        this.topic = topic;
    }

    /**
     * 连接至mqtt服务器,获取mqtt连接
     * @return
     */
    @Bean
    public MqttPushClient getMqttPushClient() {
        //连接至mqtt服务器,获取mqtt连接
        mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic);
        return mqttPushClient;
    }
}

properties.yml 配置文件
std.mqtt:
  host: tcp://x.x.x.x:1883
  username: your_username
  password: your_password
  #MQTT-连接服务器默认客户端ID
  clientid: your_clientid
  #连接超时
  timeout: 1000
  # deviceId
  deviceId: your_deviceId
  # mqtt-topic
  topic[0]: your_tpoic

TopicOperation 定义了发布订阅的方法

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @Author chy
 */
public class TopicOperation {

    private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

    /**
     * 订阅主题
     * @param topic 主题名称
     */
    public static void subscribe(String topic) {
        try {
            MqttClient client = MqttPushClient.getClient();
            if (client == null) {
                return;
            };
            client.subscribe(topic, 0);
            log.info("订阅主题:{}",topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布主题
     *
     * @param topic
     * @param pushMessage
     */
    public static void publish(String topic, String pushMessage) {

        log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);

        MqttMessage message = new MqttMessage();
        message.setQos(0);
        // 非持久化
        message.setRetained(false);
        message.setPayload(pushMessage.getBytes());
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        };
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.error("主题不存在:{}",mTopic);
        }
        try {
            mTopic.publish(message);
        } catch (Exception e) {
            log.error("mqtt发送消息异常:",e);
        }
    }

}

定义了发布和订阅的相关主题

import com.sxd.onlinereservation.exception.BusinessException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @Author
 * @Date topic名称
 * @Description
 */
@Component
public class MqttTopic {

    @Value("${std.mqtt.deviceId}")
    private String[] deviceId;

    public String getSubscribeTopic(String type){
        switch (type){

            case "appointTopic":
                return String.format("/v1/%s/service/appointTopic", deviceId[0]);
            default:
                throw new BusinessException("mqtt 订阅主题获取错误");
        }
    }

    public String getPublishTopic(String type) {
        switch (type){
                //1.0接口立即取号发布主题
            case "appointTopic":
                return String.format("/v1/%s/service/appointTopic", deviceId[1]);
            default:
                throw new BusinessException("mqtt 发布主题获取错误");
        }
    }
}

ps: 如果想要使用该工具类进行消息发送和接收看下面demo

//消息发布操作
 TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息体"));
 //消息订阅操作
  TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"), "消息体"));

异步线程处理实现

总结

  • 创建消息监听类 , 用于监听消息并进行业务处理
  • 在原来订阅时, 注入并使用第一步创建的监听类
  • 通过注解开启异步线程并配置处理方式

创建消息监听类 , 用于监听消息并进行业务处理

@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {

    @Resource
    private BusinessService businessService;
    @Autowired
    private MqttTopic mqttTopic;
    @Autowired
    private ThreeCallmachineService threeCallmachineService;
    @Autowired
    private BusinessHallService businessHallService;
    @Autowired
    private BusinessMaterialService businessMaterialService;
    @Autowired
    private BusinessWaitService businessWaitService;
    @Autowired
    private AppointmentService appointmentService;

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageStr = new String(message.getPayload());
        log.info("接收的主题:" + topic +  ";接收到的信息:" + messageStr);
        //进行 业务处理
        }
}

在原来订阅时, 注入并使用第一步创建的监听类

注入了 MqttMessageListener , 并且在订阅时加入 client.subscribe(topic, mqttMessageListener);

修改MqttPushClient (必须)

@Component
public class MqttPushClient{

    private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;
    @Autowired   //这里进行了注入操作
    private MqttMessageListener mqttMessageListener;

    private static MqttClient client;

    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    public static MqttClient getClient() {
        return client;
    }

    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            if (username != null) {
                options.setUserName(username);
            }
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                //设置回调类
                client.setCallback(pushCallback);
                //client.connect(options);
                IMqttToken iMqttToken = client.connectWithResult(options);
                boolean complete = iMqttToken.isComplete();
                log.info("MQTT连接"+(complete?"成功":"失败"));
                /** 订阅主题 **/
                for (String topic : topicList) {
                    log.info("连接订阅主题:{}", topic);
                    //client.subscribe(topic, 0);
                    client.subscribe(topic, mqttMessageListener);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

如果业务还使用了手动订阅, 则也需要在订阅的类上面注入MqttMessageListener , 并且在订阅方法中作为参数使用. 但是我们需要将方法改成非静态的, 因此在使用该方法时我们需要new该对象然后才能够调用. 但是手动订阅很少用到. 因此有无此步骤都可

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @Author chy
 * @Date
 * @Description
 */
public class TopicOperation {

    private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

	//注入MqttMessageListener
    @Autowired
    private MqttMessageListener mqttMessageListener;

    /**
     * 订阅主题
     * @param topic 主题名称
     */
    public void subscribe(String topic) {
        try {
            MqttClient client = MqttPushClient.getClient();
            if (client == null) {
                return;
            };
           //client.subscribe(topic, 0);
           //在订阅方法中作为参数使用
            client.subscribe(topic, mqttMessageListener);
            log.info("订阅主题:{}",topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布主题
     *
     * @param topic
     * @param pushMessage
     */
    public static void publish(String topic, String pushMessage) {

        log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);

        MqttMessage message = new MqttMessage();
        message.setQos(0);
        // 非持久化
        message.setRetained(false);
        message.setPayload(pushMessage.getBytes());
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        };
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.error("主题不存在:{}",mTopic);
        }
        try {
            mTopic.publish(message);
        } catch (Exception e) {
            log.error("mqtt发送消息异常:",e);
        }
    }

}

通过注解开启异步线程并配置处理方式 启动类开启 @EnableAsync(proxyTargetClass=true )

@SpringBootApplication
@MapperScan(basePackages = "com.x.x.mapper")
@EnableTransactionManagement
@EnableAsync(proxyTargetClass=true )
public class XXApplication {

    public static void main(String[] args) {
        SpringApplication.run(XXApplication.class, args);
    }

}

配置类配置线程池参数

@Slf4j
@Configuration
public class ExecutorConfig {

    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(9);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(200);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("sxd-async-service-");
        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

MqttMessageListener的实现方法messageArrived开启@Async("asyncServiceExecutor")

@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {

    @Resource
    private BusinessService businessService;
    @Autowired
    private MqttTopic mqttTopic;
    @Autowired
    private ThreeCallmachineService threeCallmachineService;
    @Autowired
    private BusinessHallService businessHallService;
    @Autowired
    private BusinessMaterialService businessMaterialService;
    @Autowired
    private BusinessWaitService businessWaitService;
    @Autowired
    private AppointmentService appointmentService;

    @Override
    @Async("asyncServiceExecutor")
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageStr = new String(message.getPayload());
        log.info("接收的主题:" + topic +  ";接收到的信息:" + messageStr);
        System.out.println("线程名称:【" + Thread.currentThread().getName() + "】");
        //进行 业务处理
        }
}

到此这篇关于SpringBoot整合MQTT并实现异步线程调用的文章就介绍到这了,更多相关SpringBoot异步线程调用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot集成mqtt的多模块项目配置详解

    前言 近期为了准备毕设,准备使用SpringBoot搭建mqtt后端,本篇主要记录了在IDEA中搭建SpringBoot mqtt的多模块项目的过程 开发工具及系统环境 IDE:IntelliJ IDEA 2020.2 操作系统:Windows 10 2004 Java Version:1.8 SpringBoot Version:2.1.17.RELEASE 项目路径 Study |----study-common # 存放公共类 |----study-mapper # mapper层 |--

  • springboot 实现mqtt物联网的示例代码

    Springboot整合mybatisPlus+mysql+druid+swaggerUI+ mqtt 整合mqtt整合druid整合mybatis-plus完整pom完整yml整合swaggerUi整合log4j MQTT 物联网系统基本架构本物联网系列 mqtt) 整合mqtt <!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • springboot集成mqtt的实践开发

    序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景.这里简单介绍一下如何在springboot中集成. maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</arti

  • SpringBoot2.0集成MQTT消息推送功能实现

    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案. 实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料.这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能.好,正式开始: 本文采用Gateway绑定的方式,网上也有介绍但不全面,还有其他采用Paho

  • SpringBoot 异步线程间传递上下文方式

    目录 异步线程间传递上下文 需求 实现 启用异步功能 配置异步 配置任务装饰器 启用多线程安全上下文无法在线程间共享问题 问题 解决方案 原理 结果 异步线程间传递上下文 需求 SpringBoot项目中,经常使用@Async来开启一个子线程来完成异步操作.主线程中的用户信息需要传递给子线程 实现 启用异步功能 在启动类里加上@EnableAsync注解 @EnableAsync @SpringBootApplication public class Application {} 配置异步 新建

  • SpringBoot+MQTT+apollo实现订阅发布功能的示例

    由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架.为了能够让项目运营起来,最终虽说是选择ActiveMQ.但在这个过程中,也是发现Apollo作为服务器也是相当不错.当然对于后者已经被apace放弃,不过今天还是和大家整理一下SpringBoot+MQTT+apollo实现订阅发布功能的全过程. 对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说.如果真的不懂私聊Damon吧,在这里就不浪费时间. 对于项目

  • SpringBoot整合MQTT并实现异步线程调用的问题

    目录 为什么选择MQTT 使用背景 代码实现 基础代码 异步线程处理实现 为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 基于SpringBoot通过注解实现对mqtt消息处理的异步调用 使用背景 生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消

  • SpringBoot整合MQTT小结汇总

    目录 前言: 一.什么是mqtt 二.主要思想 发布/订阅模式 三.MQTT重要概念 3.1 MQTT Client 3.2 MQTT Broker 3.3 MQTT Connection 3.4 MQTT主要参数 四.软件和Apollo 4.1 安装Apollo 4.2 安装Postman 4.3 安装MQTTBox 五.代码实现 5.1 配置pom.xml 5.2 配置MQTT服务器基本信息 5.3 配置读取yml文件的类MqttConfiguration 5.4  MQTT生产端的Hand

  • Springboot整合mqtt服务的示例代码

    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency> 其次在springboot 的配置yml文件,配

  • 从零开始学springboot整合feign跨服务调用的方法

    介绍 微服务横行的互联网世界, 跨服务调用显得很平凡, 我们除了采用传统的http方式接口调用, 有没有更为优雅方便的方法呢? 答案是肯定的,feign就提供了轻便的方式! 如果你的服务都注册了注册中心,比如nacos, 那么调用会显得很轻松, 只需一个注解, 带上需要调用的服务名即可,**feign + nacos**会帮你做剩余的事. 如果没有注册中心, 也无需担心, feign一样可以以传统的 ip:port 方式进行调用~ 下面,我们来实践下吧 springboot整合feign 引入依

  • SpringBoot整合Dubbo框架,实现RPC服务远程调用

    一.Dubbo框架简介 1.框架依赖 图例说明: 1)图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互. 2)图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点. 3)图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用. 4)图中只包含 RPC

  • SpringBoot使用异步线程池实现生产环境批量数据推送

    目录 前言 编写线程池配置类 编写异步服务 异步批量上报数据 总结 前言 SpringBoot使用异步线程池: 1.编写线程池配置类,自定义一个线程池: 2.定义一个异步服务: 3.使用@Async注解指向定义的线程池: 这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为.但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用

  • springboot 正确的在异步线程中使用request的示例代码

    目录 起因: 发现有人踩过坑,但是没解决 尝试寻找官方支持 尝试自己解决 还是甩给官方 解决 结论 起因: 有后端同事反馈在异步线程中获取了request中的参数,然后下一个请求是get请求的话,发现会偶尔出现参数丢失的问题. 示例代码: @GetMapping("/getParams") public String getParams(String a, int b) { return "get success"; } @PostMapping("/po

  • Springboot 整合maven插口调用maven release plugin实现一键打包功能

    maven release plugin配置 参考https://www.cnblogs.com/jiujixin/p/16003321.html 配置好pom. 整合maven-invoker使程序去执行mvn命令 1.导包 <dependency> <groupId>org.apache.maven.shared</groupId> <artifactId>maven-invoker</artifactId> <version>3

  • springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例

    目录 1.添加依赖 2.源码 3.运行测试 1.添加依赖 <dependency> <groupId>org.jetlinks</groupId> <artifactId>netty-mqtt-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId

随机推荐