Spring boot 集成 MQTT详情

目录
  • 一、简介
  • 二、主要特性
  • 三、集成步骤
    • 1.引入相关jar包
    • 2.核心配置类
    • 3.网关配置
    • 4.编写测试类
    • 5.yml配置信息

一、简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。目前在物联网、小型设备、移动应用等方面有较广泛的应用。

二、主要特性

  • (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • (2)对负载内容屏蔽的消息传输。
  • (3)使用TCP/IP提供网络连接。
  • (4)有三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

  • (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。
  • (6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

Testament:遗嘱机制,功能类似于Last Will。

三、集成步骤

1.引入相关jar包

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>  

2.核心配置类

@Configuration
public class MqttConfig
{
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * 连接器
     * @return
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions()
    {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间,默认30秒
        mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut());
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        mqttConnectOptions.setAutomaticReconnect(true);
        // 设置连接的用户名
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        // 设置连接的密码
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        //服务器地址
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    /***
     * MQTT客户端
     * @return
     */
    @Bean("mqttClientFactory")
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /*-----------------     消息生产者的配置       ---------------------*/
    /**
     * MQTT生产端发布处理器
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory());
        messageHandler.setAsync(true);
        return messageHandler;
    }
    /**
     *  MQTT生产端发布通道
     * @return
     */
    @Bean("mqttOutboundChannel")
    public MessageChannel mqttOutboundChannel()
    {
        return new DirectChannel();
    }
    /*-----------------   消息消费者的配置       ---------------------*/

    /**
     * MQTT消费端订阅通道
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = "mqttInboundChannel")
    public MessageChannel mqttInboundChannel() {
      return new DirectChannel();
    }

    /**
     * MQTT消费端连接配置
     *
     * @param channel {@link org.springframework.messaging.MessageChannel}
     * @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
     * @return {@link org.springframework.integration.core.MessageProducer}
     */
    @Bean
    public MessageProducer inbound(
        @Qualifier("mqttInboundChannel") MessageChannel channel,
        @Qualifier("mqttClientFactory") MqttPahoClientFactory factory) {
      MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test");
      adapter.setCompletionTimeout(30000);
      adapter.setConverter(new DefaultPahoMessageConverter());
      // 0 至多一次,数据可能丢失
      // 1 至少一次,数据可能重复
      // 2 只有一次,且仅有一次,最耗性能
      adapter.setQos(1);
      // 设置订阅通道
      adapter.setOutputChannel(channel);
      return adapter;
    }
}
@ConfigurationProperties("mqtt")
@Component
public class MqttProperties implements Serializable
{
    private static final long serialVersionUID = -1425980007744001158L;
    private String url;
    private String username;
    private String password;
    private int keepAlive;
    private int connectionTimeOut;
    private String producerClientId;
    private String producerQos;
    private String consumerClientId;
    private String consumerQos;
    private String consumerTopic;
    private int completionTimeout;
    private String defaultTopic;
    //get、set方法省略
  }

3.网关配置

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway
{
   void sendToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic);
}

4.编写测试类

  @Autowired
  private MqttGateway mqttGateway;
  @RequestMapping("/sendTest")
  public String sendMqttTest(String msg)
  {
      mqttGateway.send("test",msg);
      return "OK";
  }

5.yml配置信息

mqtt:
  url: tcp://localhost:1883
  username: test
  password: test1234
  keep-alive: 30
  connection-timeout: 3000
  producerClientId:  test-producer
  producerQos: 1
  consumerClientId: test-consumer
  consumerQos: 1
  deafultTopic : test

到此这篇关于Spring boot 集成 MQTT详情的文章就介绍到这了,更多相关Spring boot 集成 MQTT内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot集成mqtt的实践开发

    序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景.这里简单介绍一下如何在springboot中集成. maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</arti

  • SpringBoot2.0集成MQTT消息推送功能实现

    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案. 实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料.这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能.好,正式开始: 本文采用Gateway绑定的方式,网上也有介绍但不全面,还有其他采用Paho

  • SpringBoot集成mqtt的多模块项目配置详解

    前言 近期为了准备毕设,准备使用SpringBoot搭建mqtt后端,本篇主要记录了在IDEA中搭建SpringBoot mqtt的多模块项目的过程 开发工具及系统环境 IDE:IntelliJ IDEA 2020.2 操作系统:Windows 10 2004 Java Version:1.8 SpringBoot Version:2.1.17.RELEASE 项目路径 Study |----study-common # 存放公共类 |----study-mapper # mapper层 |--

  • Spring boot 集成 MQTT详情

    目录 一.简介 二.主要特性 三.集成步骤 1.引入相关jar包 2.核心配置类 3.网关配置 4.编写测试类 5.yml配置信息 一.简介 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务.目前在物联网.小型设备.移动应用等方面有较广泛的应用. 二.主要特性 (1)使用发布/订阅消

  • Spring Boot集成Swagger2项目实战

    一.Swagger简介 上一篇文章中我们介绍了Spring Boot对Restful的支持,这篇文章我们继续讨论这个话题,不过,我们这里不再讨论Restful API如何实现,而是讨论Restful API文档的维护问题. 在日常的工作中,我们往往需要给前端(WEB端.IOS.Android)或者第三方提供接口,这个时候我们就需要给他们提供一份详细的API说明文档.但维护一份详细的文档可不是一件简单的事情.首先,编写一份详细的文档本身就是一件很费时费力的事情,另一方面,由于代码和文档是分离的,所

  • spring boot集成p6spy的最佳实践

    目录 前言 p6spy-spring-boot-starter快速集成 第一步:导入依赖 第二步:配置application.properties 配置智能提示 兼容原生所有配置项 前言 P6Spy是一个框架,它可以无缝地拦截和记录数据库活动,而无需更改现有应用程序的代码.一般我们使用的比较多的是使用p6spy打印我们最后执行的sql语句.常用的数据框架也会自带打印sql的功能,比如jpa,mybatis等,但是一般都会有缺陷,比如打印的sql是不带执行参数拼接的sql,这种sql不完整,不具有

  • Spring Boot集成MyBatis访问数据库的方法

    基于spring boot开发的微服务应用,与MyBatis如何集成? 集成方法 可行的方法有: 1.基于XML或者Java Config,构建必需的对象,配置MyBatis. 2.使用MyBatis官方提供的组件,实现MyBatis的集成. 方法一 建议参考如下文章,完成集成的验证. MyBatis学习 之 一.MyBatis简介与配置MyBatis+Spring+MySql 基于Spring + Spring MVC + Mybatis 高性能web构建 spring与mybatis三种整合

  • Spring Boot 集成Mybatis实现主从(多数据源)分离方案示例

    本文将介绍使用Spring Boot集成Mybatis并实现主从库分离的实现(同样适用于多数据源).延续之前的Spring Boot 集成MyBatis.项目还将集成分页插件PageHelper.通用Mapper以及Druid. 新建一个Maven项目,最终项目结构如下: 多数据源注入到sqlSessionFactory POM增加如下依赖: <!--JSON--> <dependency> <groupId>com.fasterxml.jackson.core<

  • Spring Boot集成Redis实现缓存机制(从零开始学Spring Boot)

    本文章牵涉到的技术点比较多:spring Data JPA.Redis.Spring MVC,Spirng Cache,所以在看这篇文章的时候,需要对以上这些技术点有一定的了解或者也可以先看看这篇文章,针对文章中实际的技术点在进一步了解(注意,您需要自己下载Redis Server到您的本地,所以确保您本地的Redis可用,这里还使用了MySQL数据库,当然你也可以内存数据库进行测试).这篇文章会提供对应的Eclipse代码示例,具体大体的分如下几个步骤: (1)新建Java Maven Pro

  • 详解Spring Boot集成MyBatis(注解方式)

    MyBatis是支持定制化SQL.存储过程以及高级映射的优秀的持久层框架,避免了几乎所有的JDBC代码和手动设置参数以及获取结果集.spring Boot是能支持快速创建Spring应用的Java框架.本文通过一个例子来学习Spring Boot如何集成MyBatis,而且过程中不需要XML配置. 创建数据库 本文的例子使用MySQL数据库,首先创建一个用户表,执行sql语句如下: CREATE TABLE IF NOT EXISTS user ( `id` INT(10) NOT NULL A

  • 详解spring Boot 集成 Thymeleaf模板引擎实例

    今天学习了spring boot 集成Thymeleaf模板引擎.发现Thymeleaf功能确实很强大.记录于此,供自己以后使用. Thymeleaf: Thymeleaf是一个java类库,他是一个xml/xhtml/html5的模板引擎,可以作为mvc的web应用的view层. Thymeleaf还提供了额外的模块与Spring MVC集成,所以我们可以使用Thymeleaf完全替代jsp. spring Boot 通过org.springframework.boot.autoconfigu

  • Spring Boot 集成MyBatis 教程详解

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 在集成MyBatis前,我们先配置一个druid数据源. Spring Boot 系列 1.Spring Boot 入门 2.Spring Boot 属性配置

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

随机推荐