Springboot整合mqtt服务的示例代码

首先在pom文件里引入mqtt的依赖配置

        <!--mqtt-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.4</version>
        </dependency>

其次在springboot 的配置yml文件,配置mqtt的服务配置

spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    username:
    password:
    topic: [/unify/test]

创建 MqttProperties配置参数类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
    private String url;
    private String clientId;
    private String username;
    private String password;
    private String[] topic;
}

创建 MqttConfiguration 配置类

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.listener.MqttSubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
    @Autowired
    private MqttProperties mqttProperties;

    public MqttConfiguration() {
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
        if (Func.isNotBlank(this.mqttProperties.getUrl())) {
            connectOptions.setUserName(this.mqttProperties.getUsername());
        }

        if (Func.isNotBlank(this.mqttProperties.getPassword())) {
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        }

        connectOptions.setKeepAliveInterval(60);
        return connectOptions;
    }

    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);
        for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) {
            mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener());
        }
        return mqttClient;
    }
}

创建 订阅事件类

import org.springframework.context.ApplicationEvent;

public class UWBMqttSubscribeEvent extends ApplicationEvent {
    private String topic;

    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }
}

创建订阅事件监听器

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;

public class MqttSubscribeListener implements IMqttMessageListener {

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        String content = new String(mqttMessage.getPayload());
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}

创建mqtt消息事件异步处理监听器

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.config.MqttProperties;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import org.springblade.ubw.service.MqttService;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;

@Configuration
public class MqttEventListener {

    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

    @Resource
    private MqttProperties mqttProperties;

    @Resource
    private MqttService mqttService;

    private String processTopic (String topic) {
        List<String> topics = Arrays.asList(mqttProperties.getTopic());
        for (String wild : topics) {
            wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(wild)) {
                return topic.replace(wild, StringPool.EMPTY);
            }
        }
        return StringPool.EMPTY;
    }

    @Async
    @EventListener(UWBMqttSubscribeEvent.class)
    public void listen (UWBMqttSubscribeEvent event) {
        String topic = processTopic(event.getTopic());
        Object source = event.getSource();
        if (Func.isEmpty(source)) {
            return;
        }
        mqttService.issue(topic,source);
//        log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source);
    }
}

创建MqttService 数据处理服务类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.area.entity.WorkArea;
import org.springblade.ubw.area.entity.WorkSite;
import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;
import org.springblade.ubw.area.entity.WorkSitePassInfo;
import org.springblade.ubw.area.service.WorkAreaService;
import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;
import org.springblade.ubw.area.service.WorkSitePassInfoService;
import org.springblade.ubw.area.service.WorkSiteService;
import org.springblade.ubw.constant.UbwConstant;
import org.springblade.ubw.history.entity.HistoryLocusInfo;
import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;
import org.springblade.ubw.history.service.HistoryLocusInfoService;
import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;
import org.springblade.ubw.loc.entity.LocStatusInfo;
import org.springblade.ubw.loc.entity.LocStatusInfoHistory;
import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;
import org.springblade.ubw.loc.service.LocStatusInfoService;
import org.springblade.ubw.msg.entity.*;
import org.springblade.ubw.msg.service.*;
import org.springblade.ubw.system.entity.*;
import org.springblade.ubw.system.service.*;
import org.springblade.ubw.system.wrapper.MqttWrapper;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class MqttService {

    private static final Logger log = LoggerFactory.getLogger(MqttService.class);

    @Resource
    private EmployeeAndDepartmentService employeeAndDepartmentService;

    @Resource
    private VehicleInfoService vehicleInfoService;

    @Resource
    private WorkSiteService workSiteService;

    @Resource
    private LocStatusInfoService locStatusInfoService;

    @Resource
    private LocStatusInfoHistoryService locStatusInfoHistoryService;

    @Resource
    private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;

    @Resource
    private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;

    @Resource
    private LocSosAlarminfoService locSosAlarminfoService;

    @Resource
    private AttendanceInfoService attendanceInfoService;

    @Resource
    private HistoryLocusInfoService historyLocusInfoService;

    @Resource
    private WorkSitePassInfoService workSitePassInfoService;

    @Resource
    private EnvironmentalMonitorInfoService environmentalMonitorInfoService;

    @Resource
    private TrAlertService trAlertService;

    @Resource
    private AddEvacuateInfoService addEvacuateInfoService;

    @Resource
    private CancelEvacuateInfoService cancelEvacuateInfoService;

    @Resource
    private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;

    @Resource
    private LinkMsgAlarmInfoService linkMsgAlarmInfoService;

    @Resource
    private LeaderEmployeeInfoService leaderEmployeeInfoService;

    @Resource
    private ElectricMsgInfoService electricMsgInfoService;

    @Resource
    private WorkAreaService workAreaService;

    @Resource
    private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;

    @Resource
    private SpecialWorksService specialWorksService;

    @Resource
    private AttendanceLocusInfoService attendanceLocusInfoService;

    @Resource
    private WorkTypeService workTypeService;

    @Resource
    private OfficePositionService officePositionService;

    @Resource
    private ClassTeamService classTeamService;

    /**
     * 方法描述: 消息分发
     *
     * @param topic
     * @param source
     * @author liwenbin
     * @date 2021年12月14日 14:14:09
     */
    public void issue(String topic,Object source){
        switch(topic){
            case UbwConstant.TOPIC_EMP :
                //人员和部门信息
                employeeAndDepartmentService.saveBatch(source);
                break;
            case UbwConstant.TOPIC_VEHICLE :
                //车辆信息
                List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo());
                vehicleInfoService.deleteAll();
                vehicleInfoService.saveBatch(vehicleInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE :
                //基站信息
                List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite());
                workSiteService.deleteAll();
                workSiteService.saveBatch(workSites);
                break;
            case UbwConstant.TOPIC_LOC_STATUS:
                //井下车辆人员实时
                List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo());
                if (Func.isEmpty(locStatusInfos)){
                    break;
                }
                locStatusInfoService.deleteAll();
                //筛选入井人员列表
                List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList());
                locStatusInfoService.saveBatch(inWellList);
                //人员历史数据入库
                List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory());
                locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
                break;
            case UbwConstant.TOPIC_LOC_OVER_TIME:
                //超时报警信息
                List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo());
                locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_OVER_AREA:
                //超员报警信息
                List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo());
                locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_SOS:
                //求救报警信息
                List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo());
                locSosAlarminfoService.saveBatch(locSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_ATTEND:
                //考勤信息
                List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo());
                attendanceInfoService.saveBatch(attendanceInfos);
                break;
            case UbwConstant.TOPIC_HISTORY_LOCUS:
                //精确轨迹信息
                List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo());
                historyLocusInfoService.saveBatch(historyLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_PASS:
                //基站经过信息
                List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo());
                workSitePassInfoService.saveBatch(workSitePassInfos);
                break;
            case UbwConstant.TOPIC_ENV_MON:
                //环境监测信息
                List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo());
                environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
                break;
            case UbwConstant.TOPIC_TR_ALERT:
                //环境监测报警信息
                List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert());
                trAlertService.saveBatch(trAlerts);
                break;
            case UbwConstant.TOPIC_ADD_EVA:
                //下发撤离信息
                List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo());
                addEvacuateInfoService.saveBatch(addEvacuateInfos);
                break;
            case UbwConstant.TOPIC_CANCEL_EVA:
                //取消撤离信息
                List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo());
                cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_NEI:
                //相邻基站关系信息
                workSiteNeighbourInfoService.deleteAll();
                List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo());
                workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
                break;
            case UbwConstant.TOPIC_LINK_MSG:
                //基站链路信息
                linkMsgAlarmInfoService.deleteAll();
                List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo());
                linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
                break;
            case UbwConstant.TOPIC_LEADER_EMP:
                //带班领导信息
                leaderEmployeeInfoService.deleteAll();
                List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo());
                leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
                break;
            case UbwConstant.TOPIC_ELE_MSG:
                //低电报警信息
                List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo());
                electricMsgInfoService.saveBatch(electricMsgInfos);
                break;
            case UbwConstant.TOPIC_WORK_AREA:
                //区域信息
                workAreaService.deleteAll();
                List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea());
                workAreaService.saveBatch(workAreas);
                break;
            case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
                //历史超时报警信息
                List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo());
                historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_SPECIAL_WORK:
                //特种人员预设线路信息
                specialWorksService.deleteAll();
                List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks());
                specialWorksService.saveBatch(specialWorks);
                break;
            case UbwConstant.TOPIC_ATTEND_LOC:
                //历史考勤轨迹信息
                List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo());
                attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_TYPE:
                //工种信息
                workTypeService.deleteAll();
                List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType());
                workTypeService.saveBatch(workTypes);
                break;
            case UbwConstant.TOPIC_OFFICE_POS:
                //职务信息
                officePositionService.deleteAll();
                List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition());
                officePositionService.saveBatch(officePositions);
                break;
            case UbwConstant.TOPIC_CLASS_TEAM:
                //班组信息
                classTeamService.deleteAll();
                List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam());
                classTeamService.saveBatch(classTeams);
                break;
            default : //可选
                break;
        }
    }
}

完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!

以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注我们其它相关文章!

(0)

相关推荐

  • springboot集成mqtt的实践开发

    序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景.这里简单介绍一下如何在springboot中集成. maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</arti

  • SpringBoot整合MQTT并实现异步线程调用的问题

    目录 为什么选择MQTT 使用背景 代码实现 基础代码 异步线程处理实现 为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 基于SpringBoot通过注解实现对mqtt消息处理的异步调用 使用背景 生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消

  • springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例

    目录 1.添加依赖 2.源码 3.运行测试 1.添加依赖 <dependency> <groupId>org.jetlinks</groupId> <artifactId>netty-mqtt-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId

  • SpringBoot整合MQTT小结汇总

    目录 前言: 一.什么是mqtt 二.主要思想 发布/订阅模式 三.MQTT重要概念 3.1 MQTT Client 3.2 MQTT Broker 3.3 MQTT Connection 3.4 MQTT主要参数 四.软件和Apollo 4.1 安装Apollo 4.2 安装Postman 4.3 安装MQTTBox 五.代码实现 5.1 配置pom.xml 5.2 配置MQTT服务器基本信息 5.3 配置读取yml文件的类MqttConfiguration 5.4  MQTT生产端的Hand

  • springboot 实现mqtt物联网的示例代码

    Springboot整合mybatisPlus+mysql+druid+swaggerUI+ mqtt 整合mqtt整合druid整合mybatis-plus完整pom完整yml整合swaggerUi整合log4j MQTT 物联网系统基本架构本物联网系列 mqtt) 整合mqtt <!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • Springboot整合mqtt服务的示例代码

    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency> 其次在springboot 的配置yml文件,配

  • springboot整合mongodb changestream的示例代码

    目录 前言 ChangeStream介绍 环境准备 Java客户端操作changestream 1.引入maven依赖 2.测试类核心代码 下面来看看具体的整合步骤 1.引入核心依赖 2.核心配置文件 3.编写实体类,映射comment集合中的字段 4.编写一个服务类 5.编写一个接口 6.接下来,只需要依次添加下面3个配置类即可 典型应用场景 数据迁移 应用监控 对接大数据应用 前言 changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变

  • Redis和springboot 整合redisUtil类的示例代码

    一.引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.在application.yml 配置redis服务器 spring: # 环境 dev|test|prod profiles: active: dev servle

  • SpringBoot整合Redis管道的示例代码

    目录 1. Redis 之管道(pipeline) 2. SpringBoot 整合 Redis 管道实例 1. Redis 之管道(pipeline) 执行一个Redis命令,Redis客户端和Redis服务器就需要执行以下步骤: 客户端发送命令到服务器: 服务器接受命令请求,执行命令,产生相应的结果: 服务器返回结果给客户端: 客户端接受命令的执行结果,并向用户展示. Redis命令所消耗的大部分时间都用在了发送命令请求和接收命令结果上面,把任意多条Redis命令请求打包在一起,然后一次性地

  • SpringBoot整合WebService服务的实现代码

    目录 为什么使用WebService? 适用场景: 不适用场景: Axis2与CXF的区别 SpringBoot使用CXF集成WebService WebService是一个SOA(面向服务的编程)的架构,它是不依赖于语言,不依赖于平台,可以实现不同的语言间的相互调用,通过Internet进行基于Http协议的网络应用间的交互. 其实WebService并不是什么神秘的东西,它就是一个可以远程调用的类,或者说是组件,把你本地的功能开放出去共别人调用. 为什么使用WebService? 简单解释一

  • springboot整合kaptcha验证码的示例代码

    前言: 关于kaptcha简介以及spring整合kaptcha,我在另一篇文章中已详细讲解,请参考:spring整合kaptcha验证码. 本文将介绍springboot整合kaptcha的两种方式. 开发工具及技术: 1.idea 2017 2.springboot 2.0.2 3.kaptcha 正式开始: 方式一:通过kaptcha.xml配置 1.用idea新建一个spring Initializr 2.添加kaptcha的依赖: <!-- kaptcha验证码 --> <de

  • SpringBoot整合minio快速入门教程(代码示例)

    分享一个快速使用springboot整合minio实现文件上传和下载的示例.前提是已经安装并运行minio服务,参考 minio快速入门文档 首先添加Minio的依赖 <dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>3.0.10</version> </dependency> 然后写一个contro

  • SpringBoot整合Hbase的实现示例

    简介 当单表数据量过大的时候,关系性数据库会出现性能瓶颈,这时候我们就可以用NoSql,比如Hbase就是一个不错的解决方案.接下来是用Spring整合Hbase的实际案例,且在最后会给出整合中可能会出现的问题,以及解决方案.这里我是用本地Windows的IDEA,与局域网的伪分布Hbase集群做的连接,其中Hbase集群包括的组件有:Jdk1.8.Hadoop2.7.6.ZooKeeper3.4.10.Hbase2.0.1,因为这里只是开发环境,所以做一个伪分布的就好,之后部署的时候再按生产环

随机推荐