Spring Boot MQTT Too many publishes in progress错误的解决方案

目录
  • 前言
  • 原因分析
  • 源码分析
    • MQTT的Push消息到缓存中时序图
      • MqttPahoMessageHandler的publish方法
      • MqttAsyncClient的publish方法
      • ClientComms的internalSend方法
      • ClientState的send方法
    • 异步发送消息时序图
    • ClientComms的conncect方法
    • ConnectBG的run方法
    • CommsSender的run方法
    • CommsSender的notifySent方法
    • 小结
  • 解决方案
    • 方案1:发送消息时设置为Qos=1
    • 方案2: 修改maxInflight的默认值,例如将其修改为50
    • 方案3:将消息配置为多客户端模式
    • 方案4:升级mqtt版本为1.2.1
  • 总结

前言

最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。

项目中采用的是Spring Boot2.0集成的MQTT引入的版本为1.2.0,消息发送用的是MessagingGateway的方式实现,不熟悉的朋友可以查看这篇文章Spring boot 集成 MQTT详情

原因分析

出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义

0 最多一次的传输

发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。

1 至少一次的传输

发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.

2 只有一次的传输

Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。

项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:

当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。

源码分析

关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。

MQTT的Push消息到缓存中时序图

MqttPahoMessageHandler的publish方法

说明: checkConection检查连接后,在发送消息。

MqttAsyncClient的publish方法

ClientComms的internalSend方法

ClientState的send方法

MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0

SaveToken的源码实现如下:

通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。

异步发送消息时序图

说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。

ClientComms的conncect方法

ConnectBG的run方法

CommsSender的run方法

1.从clientState中获取消息 2.通过消息id去hashtable中获取缓存消息 3.消息不为空,执行消息发送 4.调用notifySent方法删除消息,且actualInFlight执行递减操作。

CommsSender的notifySent方法

小结

在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。

//存放待发送消息的Vector数组
volatile private Vector pendingMessages;

解决方案

方案1:发送消息时设置为Qos=1

此方案虽然可以解决此问题,但存在如下的缺点:

  • 网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。
  • 发送消息需要进行消息确认,网络资源消耗过大。

方案2: 修改maxInflight的默认值,例如将其修改为50

```
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setMaxInflight(50);
```

此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。

方案3:将消息配置为多客户端模式

由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。

方案4:升级mqtt版本为1.2.1

在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。

引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。

解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。

mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

总结

本文通过定位MQTT错误,详细的讲解了MQtt消息的发送流程,解决的方案虽然有多种,我们需要结合实际的业务情况来解决问题,关于MQtt如果还有其他疑问,可以随时反馈,大家共同学习,共同进步。

到此这篇关于Spring Boot MQTT Too many publishes in progress错误的解决方案的文章就介绍到这了,更多相关Spring Boot内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 如何在Spring Boot中使用MQTT

    为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 根据上面列举的这三点,我们大概可以了解到, MQTT最适合的场景是消息做为系统的重要组成部分,且参与着系统关键业务逻辑的情形 MQTT, 启动! 既然决定使用它,我们首先要研究的是如何让MQTT正常工作,毕竟它不是简单的在maven里加入

  • SpringBoot集成mqtt的多模块项目配置详解

    前言 近期为了准备毕设,准备使用SpringBoot搭建mqtt后端,本篇主要记录了在IDEA中搭建SpringBoot mqtt的多模块项目的过程 开发工具及系统环境 IDE:IntelliJ IDEA 2020.2 操作系统:Windows 10 2004 Java Version:1.8 SpringBoot Version:2.1.17.RELEASE 项目路径 Study |----study-common # 存放公共类 |----study-mapper # mapper层 |--

  • springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例

    目录 1.添加依赖 2.源码 3.运行测试 1.添加依赖 <dependency> <groupId>org.jetlinks</groupId> <artifactId>netty-mqtt-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId

  • SpringBoot整合MQTT并实现异步线程调用的问题

    目录 为什么选择MQTT 使用背景 代码实现 基础代码 异步线程处理实现 为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每一位刚接触的同行们可以最快的应用起来 先从使用MQTT需要什么开始分析: 消息服务器 不同应用/设备之间的频繁交互 可能涉及一对多的消息传递 基于SpringBoot通过注解实现对mqtt消息处理的异步调用 使用背景 生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消

  • springboot集成mqtt的实践开发

    序 MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景.这里简单介绍一下如何在springboot中集成. maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</arti

  • SpringBoot+MQTT+apollo实现订阅发布功能的示例

    由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架.为了能够让项目运营起来,最终虽说是选择ActiveMQ.但在这个过程中,也是发现Apollo作为服务器也是相当不错.当然对于后者已经被apace放弃,不过今天还是和大家整理一下SpringBoot+MQTT+apollo实现订阅发布功能的全过程. 对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说.如果真的不懂私聊Damon吧,在这里就不浪费时间. 对于项目

  • springboot 实现mqtt物联网的示例代码

    Springboot整合mybatisPlus+mysql+druid+swaggerUI+ mqtt 整合mqtt整合druid整合mybatis-plus完整pom完整yml整合swaggerUi整合log4j MQTT 物联网系统基本架构本物联网系列 mqtt) 整合mqtt <!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • SpringBoot2.0集成MQTT消息推送功能实现

    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案. 实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料.这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能.好,正式开始: 本文采用Gateway绑定的方式,网上也有介绍但不全面,还有其他采用Paho

  • Spring boot 集成 MQTT详情

    目录 一.简介 二.主要特性 三.集成步骤 1.引入相关jar包 2.核心配置类 3.网关配置 4.编写测试类 5.yml配置信息 一.简介 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务.目前在物联网.小型设备.移动应用等方面有较广泛的应用. 二.主要特性 (1)使用发布/订阅消

  • Springboot整合mqtt服务的示例代码

    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency> 其次在springboot 的配置yml文件,配

随机推荐