如何在Spring Boot中使用MQTT

为什么选择MQTT

MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来

先从使用MQTT需要什么开始分析:

  • 消息服务器
  • 不同应用/设备之间的频繁交互
  • 可能涉及一对多的消息传递

根据上面列举的这三点,我们大概可以了解到, MQTT最适合的场景是消息做为系统的重要组成部分,且参与着系统关键业务逻辑的情形

MQTT, 启动!

既然决定使用它,我们首先要研究的是如何让MQTT正常工作,毕竟它不是简单的在maven里加入个依赖就完事的

我们总共需要干如下两件事:

  • 下载EMQX消息服务器, 作为broker
  • 在maven中引入依赖
<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-mqtt</artifactId> 
    <version>5.3.2.RELEASE</version> 
</dependency>

完成上面两步后, 启动EMQX服务器, 正式进入我们的MQTT旅途

使用方式

在Spring Boot中使用MQTT的代码, 笔者总结了如下两种方式:

  • 使用spring-integration的消息通道概念
  • 使用传统的Client客户端概念

第一种会产生一定程度的心智负担,但在笔者成功搭配(抄袭+造轮子)自动注册后, 比后者要方便许多

在介绍具体代码之前, 我们先简单整理下使用中最常见的概念:

  • 主题: MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行业务逻辑处理, 主题是消息的通道
  • 生产者: MQTT消息的发送者, 他们向主题发送消息
  • 消费者: MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息
  • broker: 消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现

其实, MQTT的使用流程就是: 生产者给主题发消息->broker进行消息的传递->订阅该主题的消费者拿到消息并进行相应的业务逻辑

Client模式

本模式和传统的数据库链接,Redis链接基本一致,有开发经验的小伙伴们可以很轻松的驾驭,我们需要考虑的就是如果创建对应的工厂,是单例模式,还是原型,亦或是造个池子呢?

我们使用单例模式来进行本次的介绍

创建工厂类

首先, 我们创造一个工厂(就不承认设计模式中毒)

public class MqttFactory {  

    private static MqttProperties configuration;  

    private static MqttClient client;  

    /**
    *   获取客户端实例
    *   单例模式, 存在则返回, 不存在则初始化
    */
    public static MqttClient getInstance() {   
        if (client == null) {     
            init();   
        }   
        return client; 
    }  

    /**
    *   初始化客户端
    */
    public static void init() {   
        try {     
            client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());     
            // MQTT配置对象
            MqttConnectOptions options = new MqttConnectOptions();     
            // 设置自动重连, 其它具体参数可以查看MqttConnectOptions
            options.setAutomaticReconnect(true);     
            if (!client.isConnected()) {       
            client.connect(options);     
            }   
        } catch (MqttException e) {     
            LOGGER.error(String.format("MQTT: 连接消息服务器[%s]失败", configuration.getAddress()));   
        } 
    }

}

关于MQTT的具体配置可以查看MqttConnectOptions, 在这里就不做说明了

多嘴一句, 文档永远比某些博客给力!!!

创建工具类

接下来, 我们创建MqttUtil, 用于消息的发送以及主题的订阅

public class MqttUtil {  

    /**
    *   发送消息
    *   @param topic 主题
    *   @param data 消息内容
    */
    public static void send(String topic, Object data) {   
        // 获取客户端实例
        MqttClient client = MqttFactory.getInstance();   
        ObjectMapper mapper = new ObjectMapper();   
        try {
            // 转换消息为json字符串
            String json = mapper.writeValueAsString(data);     
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));   
        } catch (JsonProcessingException e) {     
            LOGGER.error(String.format("MQTT: 主题[%s]发送消息转换json失败", topic));   
        } catch (MqttException e) {     
            LOGGER.error(String.format("MQTT: 主题[%s]发送消息失败", topic));   
        } 
    }

    /**
    * 订阅主题
    * @param topic 主题
    * @param listener 消息监听处理器
    */
    public static void subscribe(String topic, IMqttMessageListener listener) { 
        MqttClient client = MqttFactory.getInstance(); 
        try {   
            client.subscribe(topic, listener); 
        } catch (MqttException e) {   
            LOGGER.error(String.format("MQTT: 订阅主题[%s]失败", topic)); 
        }
    }

}

相信小伙伴们注意到了IMqttMessageListener这个东西, 我们只需要创建一个监听类, 实现IMqttMessageListener接口, 就可以处理消息啦, 代码如下:

public class MessageListener implements IMqttMessageListener {  

    /**
    * 处理消息
    * @param topic 主题
    * @param mqttMessage 消息
    */
    @Override 
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {   
        LOGGER.info(String.format("MQTT: 订阅主题[%s]发来消息[%s]", topic, new String(mqttMessage.getPayload()))); 
    }

    public static void main(String[] args) { 
        //订阅主题test01, 使用MessageListener来处理它的消息
        MqttUtil.subscribe("test01", new MessageListener());
    }

}

无论是发送还是订阅,是不是都很好理解?

舒服的事情结束后, 带来的是无尽的折磨和空虚, 来吧, 让我们挑战下心智负担大的第二种模式!

Spring Integration

什么是Spring Integration?对不起,我不知道,我也不想知道

为什么使用Spring Integration?因为它真的很好维护

网上大部分教程都是针对Spring Integration的, 可能是我第一次接触, 千篇一律看的我莫名其妙, 所以我选择放弃了他们, 选择了大神的自动配置方式,并在其基础上,针对心智负担进行了相应的调整

还记得我们之前讨论过的概念吗?主题/生产者/消费者

在Spring Integration中,我们新加入一些概念, 并把之前的进行微调:

  • 通道: 消息传输和接受的管道, 每一条消息都是通过它钻进钻出
  • 客户端工厂: 用于创建MQTT客户端, 和模式一中的类似
  • 消息适配器: 用于接收MQTT消息, 进行转换, 但不参与业务逻辑
  • 入站通道: 搭配消息适配器, 消息进入站台的通道
  • 出站通道: 搭配客户端工厂, 消息发出站台的通道
  • 主题: 还是主题, 它不变
  • 生产者: 拥有出站通道的家伙
  • 消费者: 拥有入站通道的家伙

如果能渐渐理解上面定义的话, 这种模式的流程其实可以变成这样:

  • 生产者: 创建指定客户端工厂的出站通道->发送消息
  • 消费者: 创建指定消息适配器的入站通道->接收消息->进入消息拦截器->业务逻辑

其实在笔者看来, 这符合Spring Boot的理念, 约定优于配置

代码已挪入公司私服, 待后续个人私服配置好后再补充笔记

总结

MQTT作为消息服务, 能够满足我们大部分的开发需求, 但还有一些遗留问题笔者还没进行过深入思考和实践:

  • 如何利用qos机制保证数据不会丢失
  • 消息的队列和排序
  • 集群模式下的应用

以上就是如何在Spring Boot中使用MQTT的详细内容,更多关于在Spring Boot中使用MQTT的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot2.0集成MQTT消息推送功能实现

    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案. 实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料.这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能.好,正式开始: 本文采用Gateway绑定的方式,网上也有介绍但不全面,还有其他采用Paho

  • springboot 实现mqtt物联网的示例代码

    Springboot整合mybatisPlus+mysql+druid+swaggerUI+ mqtt 整合mqtt整合druid整合mybatis-plus完整pom完整yml整合swaggerUi整合log4j MQTT 物联网系统基本架构本物联网系列 mqtt) 整合mqtt <!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • SpringBoot集成mqtt的多模块项目配置详解

    前言 近期为了准备毕设,准备使用SpringBoot搭建mqtt后端,本篇主要记录了在IDEA中搭建SpringBoot mqtt的多模块项目的过程 开发工具及系统环境 IDE:IntelliJ IDEA 2020.2 操作系统:Windows 10 2004 Java Version:1.8 SpringBoot Version:2.1.17.RELEASE 项目路径 Study |----study-common # 存放公共类 |----study-mapper # mapper层 |--

  • springboot集成mqtt的实践开发

    序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景.这里简单介绍一下如何在springboot中集成. maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</arti

  • SpringBoot+MQTT+apollo实现订阅发布功能的示例

    由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架.为了能够让项目运营起来,最终虽说是选择ActiveMQ.但在这个过程中,也是发现Apollo作为服务器也是相当不错.当然对于后者已经被apace放弃,不过今天还是和大家整理一下SpringBoot+MQTT+apollo实现订阅发布功能的全过程. 对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说.如果真的不懂私聊Damon吧,在这里就不浪费时间. 对于项目

  • 如何在Spring Boot中使用MQTT

    为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 根据上面列举的这三点,我们大概可以了解到, MQTT最适合的场景是消息做为系统的重要组成部分,且参与着系统关键业务逻辑的情形 MQTT, 启动! 既然决定使用它,我们首先要研究的是如何让MQTT正常工作,毕竟它不是简单的在maven里加入

  • 详解如何在spring boot中使用spring security防止CSRF攻击

    CSRF是什么? CSRF(Cross-site request forgery),中文名称:跨站请求伪造,也被称为:one click attack/session riding,缩写为:CSRF/XSRF.  CSRF可以做什么? 你这可以这么理解CSRF攻击:攻击者盗用了你的身份,以你的名义发送恶意请求.CSRF能够做的事情包括:以你名义发送邮件,发消息,盗取你的账号,甚至于购买商品,虚拟货币转账......造成的问题包括:个人隐私泄露以及财产安全. CSRF漏洞现状 CSRF这种攻击方式

  • 图解如何在Spring Boot中使用JSP页面

    一.创建webapp目录 在src/main下创建webapp目录,用于存放jsp文件.这就是一个普通的目录,无需执行Mark Directory As 二.创建jsp 1.指定web资源目录 在spring boot工程中若要创建jsp文件,一般是需要在src/main下创建webapp目录,然后在该目录下创建jsp文件.但通过Alt + Insert发现没有创建jsp文件的选项.此时,需要打开Project Structrue窗口,将webapp目录指定为web资源目录,然后才可以创建jsp

  • 如何在spring boot中进行参数校验示例详解

    上文我们讨论了spring-boot如何去获取前端传递过来的参数,那传递过来总不能直接使用,需要对这些参数进行校验,符合程序的要求才会进行下一步的处理,所以本篇文章我们主要讨论spring-boot中如何进行参数校验. lombok使用介绍 在介绍参数校验之前,先来了解一下lombok的使用,因为在接下来的实例中或有不少的对象创建,但是又不想写那么多的getter和setter,所以先介绍一下这个很强大的工具的使用. Lombok 是一个可以通过简单的注解形式来帮助我们简化消除一些必须有但显得很

  • 在Spring Boot中如何使用数据缓存

    在实际开发中,对于要反复读写的数据,最好的处理方式是将之在内存中缓存一份,频繁的数据库访问会造成程序效率低下,同时内存的读写速度本身就要强于硬盘.Spring在这一方面给我们提供了诸多的处理手段,而Spring Boot又将这些处理方式进一步简化,接下来我们就来看看如何在Spring Boot中解决数据缓存问题. 创建Project并添加数据库驱动 Spring Boot的创建方式还是和我们前文提到的创建方式一样,不同的是这里选择添加的依赖不同,这里我们添加Web.Cache和JPA依赖,如下图

  • Spring boot中mongodb的使用

    MongoDB是最早热门非关系数据库的之一,使用也比较普遍,一般会用做离线数据分析来使用,放到内网的居多.由于很多公司使用了云服务,服务器默认都开放了外网地址,导致前一阵子大批 MongoDB 因配置漏洞被攻击,数据被删,引起了人们的注意,感兴趣的可以看看这篇文章:场屠戮MongoDB的盛宴反思:超33000个数据库遭遇入侵勒索,同时也说明了很多公司生产中大量使用mongodb. mongodb简介 MongoDB(来自于英文单词"Humongous",中文含义为"庞大&qu

  • 详解在Spring Boot中使用Https

    本文介绍如何在Spring Boot中,使用Https提供服务,并将Http请求自动重定向到Https. Https证书 巧妇难为无米之炊,开始的开始,要先取得Https证书.你可以向证书机构申请证书,也可以自己制作根证书. 创建Web配置类 在代码中创建一个使用了Configuration注解的类,就像下面这段代码一样: @Configuration public class WebConfig { //Bean 定义... } 配置Https 在配置类中添加EmbeddedServletCo

  • 详解Spring Boot中使用@Scheduled创建定时任务

    我们在编写Spring Boot应用中经常会遇到这样的场景,比如:我需要定时地发送一些短信.邮件之类的操作,也可能会定时地检查和监控一些标志.参数等. 创建定时任务 在Spring Boot中编写定时任务是非常简单的事,下面通过实例介绍如何在Spring Boot中创建定时任务,实现每过5秒输出一下当前时间. 在Spring Boot的主类中加入@EnableScheduling注解,启用定时任务的配置 @SpringBootApplication @EnableScheduling publi

  • 详解在Spring Boot中使用数据库事务

    我们在前面已经分别介绍了如何在spring Boot中使用JPA以及如何在Spring Boot中输出REST资源.那么关于数据库访问还有一个核心操作那就是事务的处理了,前面两篇博客小伙伴们已经见识到Spring Boot带给我们的巨大便利了,其实不用猜,我们也知道Spring Boot在数据库事务处理问题上也给我们带来惊喜,OK,废话不多说,就来看看如何在Spring Boot中使用事务吧. OK,那我们开始今天愉快的coding旅程吧! 创建Project并添加数据库依赖 这个没啥好说的,不

  • Spring Boot中优雅的获取yml文件工具类

    如何在spring boot中优雅的获取.yml文件工具类呢 代码如下: package com.common.base.utils.base; import com.common.base.generator.ResourceManager; import org.yaml.snakeyaml.Yaml; import java.io.InputStream; import java.util.HashMap; import java.util.Map; /** * yml文件工具类 */ p

随机推荐