SpringBoot集成MQTT示例详解

目录
  • 引言
  • MQTT
    • 特点
  • Apache-Apollo
    • 下载
    • 配置与启动
  • SpringBoot2的开发
    • 添加依赖
    • 自定义配置
    • 配置MQTT发布和订阅
    • 消息发布器
    • 发送消息
    • 入口类

引言

特别提醒: 文中提到的MQTT服务器Apache-Apollo,现在已经不维护。但是客户端的写法是通用的。目前我常用的是RabbitMQ加mqtt插件。

MQTT

MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。

特点

MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
  • 对负载内容屏蔽的消息传输;
  • 使用TCP/IP提供网络连接;
  • 有三种消息发布服务质量;

至多一次:消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

至少一次:确保消息到达,但消息重复可能会发生。

只有一次:确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
  • 使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

Apache-Apollo

Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多种协议。

原理:服务器端创建一个唯一订阅号,发送者可以向这个订阅号中发东西,然后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器其实是一个消息中转站。

下载

下载地址:http://archive.apache.org/dist/activemq/activemq-apollo/

配置与启动

  • 需要安装JDK环境
  • 在命令行模式下进入bin,执行apollo create mybroker d:\apache-apollo\broker,创建一个名为mybroker虚拟主机(Virtual Host)。需要特别注意的是,生成的目录就是以后真正启动程序的位置。
  • 在命令行模式下进入d:\apache-apollo\broker\bin,执行apollo-broker run,也可以用apollo-broker-service.exe配置服务。
  • 访问http://127.0.0.1:61680打开web管理界面。(密码查看broker/etc/users.properties)
  • 启动端口,看cmd输出。

SpringBoot2的开发

添加依赖

<!--
  spring-boot版本 2.1.0.RELEASE
-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定义配置

# src/main/resources/config/mqtt.properties
##################
#  MQTT 配置
##################
# 用户名
mqtt.username=admin
# 密码
mqtt.password=password
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
#  MQTT 生产者
##################
# 连接服务器默认客户端ID
mqtt.producer.clientId=mqttProducer
# 默认的推送主题,实际可在调用接口时指定
mqtt.producer.defaultTopic=topic1
##################
#  MQTT 消费者
##################
# 连接服务器默认客户端ID
mqtt.consumer.clientId=mqttConsumer
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.consumer.defaultTopic=topic1

配置MQTT发布和订阅

import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT配置,生产者
 *
 * @author BBF
 */
@Configuration
public class MqttConfig {
  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
  private static final byte[] WILL_DATA;
  static {
    WILL_DATA = "offline".getBytes();
  }
  /**
   * 订阅的bean名称
   */
  public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  /**
   * 发布的bean名称
   */
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  @Value("${mqtt.username}")
  private String username;
  @Value("${mqtt.password}")
  private String password;
  @Value("${mqtt.url}")
  private String url;
  @Value("${mqtt.producer.clientId}")
  private String producerClientId;
  @Value("${mqtt.producer.defaultTopic}")
  private String producerDefaultTopic;
  @Value("${mqtt.consumer.clientId}")
  private String consumerClientId;
  @Value("${mqtt.consumer.defaultTopic}")
  private String consumerDefaultTopic;
  /**
   * MQTT连接器选项
   *
   * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
   */
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
    // 这里设置为true表示每次连接到服务器都以新的身份连接
    options.setCleanSession(true);
    // 设置连接的用户名
    options.setUserName(username);
    // 设置连接的密码
    options.setPassword(password.toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(10);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
    options.setKeepAliveInterval(20);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }
  /**
   * MQTT客户端
   *
   * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }
  /**
   * MQTT信息通道(生产者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息处理器(生产者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        producerClientId,
        mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(producerDefaultTopic);
    return messageHandler;
  }
  /**
   * MQTT消息订阅绑定(消费者)
   *
   * @return {@link org.springframework.integration.core.MessageProducer}
   */
  @Bean
  public MessageProducer inbound() {
    // 可以同时消费(订阅)多个Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(
            consumerClientId, mqttClientFactory(),
            StringUtils.split(consumerDefaultTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
  }
  /**
   * MQTT信息通道(消费者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_IN)
  public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息处理器(消费者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        LOGGER.error("===================={}============", message.getPayload());
      }
    };
  }
}

消息发布器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * MQTT生产者消息发送接口
 * <p>MessagingGateway要指定生产者的通道名称</p>
 * @author BBF
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
  /**
   * 发送信息到MQTT服务器
   *
   * @param data 发送的文本
   */
  void sendToMqtt(String data);
  /**
   * 发送信息到MQTT服务器
   *
   * @param topic 主题
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);
  /**
   * 发送信息到MQTT服务器
   *
   * @param topic 主题
   * @param qos 对消息处理的几种机制。
 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。

   * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。

   * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
}

发送消息

/**
 * MQTT消息发送
 *
 * @author BBF
 */
@Controller
@RequestMapping(value = "/")
public class MqttController {
  /**
   * 注入发送MQTT的Bean
   */
  @Resource
  private IMqttSender iMqttSender;
  /**
   * 发送MQTT消息
   * @param message 消息内容
   * @return 返回
   */
  @ResponseBody
  @GetMapping(value = "/mqtt", produces ="text/html")
  public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
    iMqttSender.sendToMqtt(message);
    return new ResponseEntity<>("OK", HttpStatus.OK);
  }
}

入口类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;
/**
 * SpringBoot 入口类
 *
 * @author BBF
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
    HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

代码

https://gitee.com/bbfbbf/mqtt-test

以上就是SpringBoot集成MQTT示例详解的详细内容,更多关于SpringBoot集成MQTT的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot整合mqtt服务的示例代码

    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency> 其次在springboot 的配置yml文件,配

  • SpringBoot集成mqtt的多模块项目配置详解

    前言 近期为了准备毕设,准备使用SpringBoot搭建mqtt后端,本篇主要记录了在IDEA中搭建SpringBoot mqtt的多模块项目的过程 开发工具及系统环境 IDE:IntelliJ IDEA 2020.2 操作系统:Windows 10 2004 Java Version:1.8 SpringBoot Version:2.1.17.RELEASE 项目路径 Study |----study-common # 存放公共类 |----study-mapper # mapper层 |--

  • SpringBoot整合MQTT并实现异步线程调用的问题

    目录 为什么选择MQTT 使用背景 代码实现 基础代码 异步线程处理实现 为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 基于SpringBoot通过注解实现对mqtt消息处理的异步调用 使用背景 生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消

  • springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例

    目录 1.添加依赖 2.源码 3.运行测试 1.添加依赖 <dependency> <groupId>org.jetlinks</groupId> <artifactId>netty-mqtt-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId

  • SpringBoot2.0集成MQTT消息推送功能实现

    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案. 实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料.这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能.好,正式开始: 本文采用Gateway绑定的方式,网上也有介绍但不全面,还有其他采用Paho

  • SpringBoot集成MQTT示例详解

    目录 引言 MQTT 特点 Apache-Apollo 下载 配置与启动 SpringBoot2的开发 添加依赖 自定义配置 配置MQTT发布和订阅 消息发布器 发送消息 入口类 引言 特别提醒: 文中提到的MQTT服务器Apache-Apollo,现在已经不维护.但是客户端的写法是通用的.目前我常用的是RabbitMQ加mqtt插件. MQTT MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议.它工作在 TCP/IP协议族上,是为硬件

  • Springboot - Fat Jar示例详解

    目录 导读 JAR 是什么 JAR简介 JAR结构 包结构 描述文件MANIFEST.MF FatJar有什么不同 什么是FatJar? SpringBoot FatJar解决方案 spring-boot-maven-plugin打包过程 打包结果 启动时的类加载原理 启动的整个流程 参考资料 导读 Spring Boot应用可以使用spring-boot-maven-plugin快速打包,构建一个可执行jar.Spring Boot内嵌容器,通过java -jar命令便可以直接启动应用. 虽然

  • springboot集成mybatisplus实例详解

    集成mybatisplus后,简单的CRUD就不用写了,如果没有特别的sql,就可以不用mapper的xml文件的. 目录 pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-insta

  • springboot bootcdn使用示例详解

    应用:直接使用bootcdn提供的静态资源,不需要本地存储 bootcdn 官网:https://www.bootcdn.cn/ staticfile cdn官网: http://www.staticfile.org/ 常用静态资源 # layui.js https://cdn.bootcdn.net/ajax/libs/layui/2.6.8/layui.js https://cdn.bootcdn.net/ajax/libs/layui/2.6.8/layui.min.js # layui.

  • Springboot集成restTemplate过程详解

    一restTemplate简介 restTemplate底层是基于HttpURLConnection实现的restful风格的接口调用,类似于webservice,rpc远程调用,但其工作模式更加轻量级,方便于rest请求之间的调用,完成数据之间的交互,在springCloud之中也有一席之地.大致调用过程如下图 二restTemplate常用方法列表 forObeject跟forEntity有什么区别呢?主要的区别是forEntity的功能更加强大一些,其返回值是一个ResponseEntit

  • SpringBoot框架集成ElasticSearch实现过程示例详解

    目录 依赖 与SpringBoot集成 配置类 实体类 测试例子 RestHighLevelClient直接操作 索引操作 文档操作 检索操作 依赖 SpringBoot版本:2.4.2 <dependencies> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <opti

  • SpringBoot集成POI实现Excel导入导出的示例详解

    目录 知识准备 什么是POI POI中基础概念 实现案例 Pom依赖 导出Excel 导入Excel 示例源码 知识准备 需要了解POI工具,以及POI对Excel中的对象的封装对应关系. 什么是POI Apache POI 是用Java编写的免费开源的跨平台的 Java API,Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能.POI为“Poor Obfuscation Implementation”的首字母缩写,意为“简洁版的模糊实现”. A

  • SpringBoot集成本地缓存性能之王Caffeine示例详解

    目录 引言 Spring Cache 是什么 集成 Caffeine 核心原理 引言 使用缓存的目的就是提高性能,今天码哥带大家实践运用 spring-boot-starter-cache 抽象的缓存组件去集成本地缓存性能之王 Caffeine. 大家需要注意的是:in-memeory 缓存只适合在单体应用,不适合与分布式环境. 分布式环境的情况下需要将缓存修改同步到每个节点,需要一个同步机制保证每个节点缓存数据最终一致. Spring Cache 是什么 不使用 Spring Cache 抽象

  • springboot2.2 集成 activity6实现请假完整流程示例详解

    新手学习记录.写在springboot test 示例  示例代码地址看结尾.后面有带页面的示例. SpringBoot Test无页面简单示例 员工请假流程 员工发起申请,附带请假信息(请假几天) 单位领导审批,如果通过,交付经理审批,不通过,重新申请 经理审批,如果请假天数不超过三天,经理1审批 如果请假天数在3-5天,经理3审批 超过5天,经理2审批 经理审批通过,流程结束,经理审批不通过,员工重新申请 流程图 代码 activiti.cfg.xml为必须文件且数据库连接正确,否则Proc

随机推荐