Android开发MQTT协议的模型及通信浅析

目录
  • 前言
  • 什么是MQTT协议
  • MQTT协议的模型
  • 开发MQTT通信
    • 1. 处理客户端和服务端
      • (1)服务端开发
      • (2)客户端开发
    • 2. 客户端开发
  • Paho的mqtt的BUG
  • 总结

前言

为什么要讲MQTT协议?因为现在越来越多的领域会使用到这个协议,无论是做M2M,还是做Iot,或是想实现推送功能,MQTT都是一个不错的选择。

什么是MQTT协议

MQTT协议又称为消息队列要测传输协议,他是一种基于发布/订阅范式的消息协议,并且它是一种基于TCP/IP协议族的应用层协议。

可以看出的它的特点:轻量、简单、基于发布/订阅范式、基于TCP/IP、是一种应用层协议。

如果还是不明白,我们可以简单拿它和我们常用的http协议做个比较。

HTTP协议 MQTT协议
基于TCP或UDP 基于TCP
基于 请求/响应 模型 基于 发布/订阅 模型
http1.x是传数据包 传输二进制数据

MQTT协议的模型

我们得知道它是一个怎样的模型才好去了解它的一个工作方式。比如说HTTP协议简单分为两个角色,一个Client代表客户端,一个Server代表服务端。

而MQTT简单来看分为3个角色,publisher表示发布者,subscriber表示订阅者,它们两个都是Client,所以任何一个Client客户端既能充当publisher,也能充当subscriber。还有一个角色是broker表示代理,它是Server服务端。可以看出MQTT也是基于C/S的通信架构,只不过分为3种角色。

如果理解了这个模型之后,你就会有个疑问,发布和订阅什么呢?这就需要引入一个新的东西叫主题topic(如果不理解主题这个概念的话也没关系,后面用代码就很容易理解主题是什么)

所以它的工作流程就是:

  • subscriber订阅者连接broker代理,并订阅主题topic
  • publisher发布者连接broker代理(当然如何订阅者和发布者是同一个Client的话就不需要重复连接),并发布消息到相应的主题
  • broker代理会把消息发给对应订阅的主题的subscriber订阅者

开发MQTT通信

1. 处理客户端和服务端

前面我们说了MQTT是继续C/S的结构,那我们就需要有一个客户端和一个服务端。

(1)服务端开发

很不幸我是开发前端的,后台的开发我并不熟悉,所以这里的演示中我选择用云服务EMQX,想尝试的朋友可以上这个网页去部署自己的云服务,流程很简单 cloud.emqx.com/ ,免费试用14天。

(2)客户端开发

因为我是做Android开发的,所以这里我用Android来举例子。正常来说可以在TCP的基础上开发,自己去封装,但我这只是浅谈,所以我用第三方框架进行演示,用Paho的mqtt

2. 客户端开发

先导入Paho的mqtt

dependencies {
    ......
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

在manifest中注册Paho的MqttService

<application
    android:allowBackup="true"
    android:icon="@mipmap/ic_launcher"
    android:label="@string/app_name"
    android:roundIcon="@mipmap/ic_launcher_round"
    android:supportsRtl="true"
    android:theme="@style/Theme.MyApplication">
    <activity
        android:name=".MainActivity"
        android:exported="true">
        <intent-filter>
            <action android:name="android.intent.action.MAIN" />
            <category android:name="android.intent.category.LAUNCHER" />
        </intent-filter>
    </activity>
    <service android:name="org.eclipse.paho.android.service.MqttService"/>
    <service android:name=".MqttActionService"/>
</application>

我这边为了用一个项目来演示Mqtt通信,所有把MainActivity当成publisher发布者,把MqttActionService当成subscriber订阅者。

所以整体的流程是这样的,我们先开启MqttActionService,然后在MqttActionService中进行连接和订阅。再在MainActivity进行连接和发送消息。

先把Mqtt的Client给封装起来(我这里防止有些朋友看不懂Kotlin,我就用了Java,后面不重要的地方我直接用Kotlin,一般也比较容易看懂)。

public class MyMqttClient {
    private MqttAndroidClient mClient;
    private MqttConnectOptions mOptions;
    private OnMqttConnectListener mOnMqttConnectListener;
    private final String mClientId;
    private MqttCallbackExtended mExtended = new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            if (mOnMqttConnectListener != null){
                mOnMqttConnectListener.onConnectComplete(serverURI);
            }
        }
        @Override
        public void connectionLost(Throwable cause) {
            if (mOnMqttConnectListener != null){
                mOnMqttConnectListener.onConnectFailure(cause);
            }
        }
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    };
    private IMqttActionListener mConnectAction = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
        }
        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            if (mOnMqttConnectListener != null){
                mOnMqttConnectListener.onConnectFailure(exception);
            }
            exception.printStackTrace();
        }
    };
    private IMqttMessageListener messageListener = new IMqttMessageListener() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            if (mOnMqttConnectListener != null){
                mOnMqttConnectListener.onMessageArrived(topic, message);
            }
        }
    };
    public MyMqttClient(Context context){
        this(context, null);
    }
    public MyMqttClient(Context context, String clientId){
        if (!TextUtils.isEmpty(clientId)) {
            this.mClientId = clientId;
        }else {
            this.mClientId = MqttConfig.clientId;
        }
        init(context);
    }
    public void init(Context context){
        mClient = new MqttAndroidClient(context, MqttConfig.mqttUrl, mClientId);
        mClient.setCallback(mExtended);
        mOptions = new MqttConnectOptions();
        mOptions.setConnectionTimeout(4000);
        mOptions.setKeepAliveInterval(30);
        mOptions.setUserName(MqttConfig.username);
        mOptions.setPassword(MqttConfig.password.toCharArray());
    }
    public void setOnMqttConnectListener(OnMqttConnectListener onMqttConnectListener) {
        this.mOnMqttConnectListener = onMqttConnectListener;
    }
    /**
     *  连接
     */
    public void connect(){
        try {
            if (!mClient.isConnected()){
                mClient.connect(mOptions, null, mConnectAction);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     *  订阅
     */
    public void subscribeToTopic(String mTopic){
        this.subscribeToTopic(mTopic, 0);
    }
    public void subscribeToTopic(String mTopic, int qos){
        try {
            mClient.subscribe(mTopic, qos, null,null, messageListener);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     *  发送消息
     */
    public void sendMessage(String mTopic, byte[] data){
        try {
            MqttMessage message = new MqttMessage();
            message.setPayload(data);
            mClient.publish(mTopic, message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public void onDestroy(){
        try {
            mClient.disconnect();
            mExtended = null;
            mConnectAction = null;
            messageListener = null;
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     *  提供给外层的回调,更方便进行使用
     */
    public interface OnMqttConnectListener{
        void onConnectComplete(String serverURI);
        void onConnectFailure(Throwable e);
        void onMessageArrived(String topic, MqttMessage message);
    }
}

当中有些配置我直接抽出来

public interface MqttConfig {
    String mqttUrl = "tcp://r0c36017.cn-shenzhen.emqx.cloud:11005";
    String clientId = "deployment-r0c36017";
    String username = "yeshuaishizhenshuai";
    String password = "123456";
    String oneTopic = "kylin/topic/one";
}

可以讲一下这些参数

(1) mqttUrl: 连接代理的连接,可以看到我上面云服务那张截图里面的“连接地址”和“连接端口” (2) clientId: 客户端ID,无论是subscriber还是publisher都属于客户端,这个在上面说过,所以都有一个对应的ID标识他们是属于哪个客户端。我下面的Demo中MqttActionService用的ClienId是deployment-r0c36017,MainActivity用的ClienId是deployment-r0c36018,不同的,所以是两个客户端。 (3) username和password: 这两个参数都是一个标识,会和后台记录,如果你没有的话,那你就连不上代理,也就是连不上服务端。 (4) oneTopic: 就是主题,你订阅和发送消息都要对应是哪个主题。

然后subscriber连接并订阅主题

class MqttActionService : Service() {
    private var mqttClient : MyMqttClient ?= null
    override fun onCreate() {
        super.onCreate()
        mqttClient = MyMqttClient(this)
        mqttClient?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{
            override fun onConnectComplete(serverURI: String?) {
                mqttClient?.subscribeToTopic(MqttConfig.oneTopic)
            }
            override fun onConnectFailure(e: Throwable?) {
            }
            override fun onMessageArrived(topic: String?, message: MqttMessage?) {
                val h = Handler(Looper.getMainLooper())
                h.post {
                    Toast.makeText(this@MqttActionService.applicationContext, message.toString(), Toast.LENGTH_SHORT).show();
                }
            }
        })
    }
    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        val handler = Handler()
        handler.postDelayed({ mqttClient?.connect() }, 1000)
        return START_STICKY
    }
    override fun onBind(intent: Intent?): IBinder? {
        return null
    }
    override fun onDestroy() {
        super.onDestroy()
        mqttClient?.onDestroy()
    }
}

然后publisher连接并发送消息

class MainActivity : AppCompatActivity() {
    private var clinet : MyMqttClient ?= null
    private var isConnect = false
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        init()
        val btn : Button = findViewById(R.id.btn_connect)
        val send : Button = findViewById(R.id.btn_send)
        val open : Button = findViewById(R.id.open)
        open.setOnClickListener {
            val intent = Intent()
            intent.setClass(this, MqttActionService::class.java)
            startService(intent)
        }
        btn.setOnClickListener {
            clinet?.connect()
        }
        send.setOnClickListener {
            clinet?.sendMessage(MqttConfig.oneTopic, "你干嘛啊~哎呦~".toByteArray())
        }
    }
    private fun init(){
        clinet = MyMqttClient(this, "deployment-r0c36018")
        clinet?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{
            override fun onConnectComplete(serverURI: String?) {
                isConnect = true
            }
            override fun onConnectFailure(e: Throwable?) {
                e?.printStackTrace()
                isConnect = false
            }
            override fun onMessageArrived(topic: String?, message: MqttMessage?) {
            }
        })
    }
}

我这定了3个按钮,第一个按钮open会跳转Service然后subscriber连接并订阅主题,第二个按钮btn会连接代理,第三个按钮send发送消息。看MqttActionService的代码可以看出,我这里发送消息后,会弹出Toast。

Paho的mqtt的BUG

这库我也是第一次用,我们那用的都是自己撸的(这边肯定没法放上来),然后我用的时候发现一个问题。我想给Service去开一条进程去处理订阅的操作的,这样能更真实的去模拟,结果就在连接时出问题了

经检查,连接的context的进程要和org.eclipse.paho.android.service.MqttService的进程一致。我去看他源码是怎么回事。

发现它内部的Binder竟然做了强转,这里因为不是代理而会出现报错。如果使用这个库的话就小心点你要做的夸进程的操作。

总结

今天只是浅谈一些MQTT的一些原理和流程,其实还有更深的功能,比如Qos啊这些还没说,我觉得一次说太多可能会让第一次接触的人混乱。先简单的了解MQTT是什么,主要使用的场景,内部的原理大致是怎样的。当了解这些之后再去深入的看,会能够更好的去理解。

以上就是Android开发MQTT协议的模型及通信浅析的详细内容,更多关于Android MQTT协议模型通信的资料请关注我们其它相关文章!

(0)

相关推荐

  • Android端使用Modbus协议的简单方法

    目录 1.导入Modbus4Android的库 2.基于 TCP/IP 的 Modbus Master/客户端 2.1.初始化ModbusMaster 2.2.读写保持寄存器 2.3.读写线圈 2.4 回收Master 3.基于 TCP/IP 的 Modbus Salve/服务端 4.总结 点我了解Modbus协议 Modbus用于设备之间的通信,同样在正常App开发中用的也很少 1.导入Modbus4Android的库 gayhub地址:github.com/zgkxzx/Modb- 使用Ja

  • android利用websocket协议与服务器通信

    最近做一个项目,需求中需要服务器主动推送消息到客户端.这样的话一般的http连接就不能使用了.博主问了个朋友,向我推荐websocket协议,特此测试了一下,发现效果很好. android本身没有websocket的库,需要自己下载 ,下载地址. 客户端代码: 界面布局自己写,很简单的两个button package com.example.test; import com.example.test.R; import android.app.Activity; import android.o

  • Android开发之音视频协议介绍

    目录 什么是视频文件 什么是264 了解音视频协议有啥用? 两大电信联盟 ITU-T ISO ITU-T 视频编码发展历程 H.26X系列(由ITU[国际电传视讯联盟]主导) 其他音视频协议 Google(VP8/VP9) Microsoft (VC-1) 国产自主标准: AVS/AVS+/AVS2 什么是视频文件 一般是指以某种格式封装了音视频数据的文件 常见的音频格式:mp3.wma.avi.rm.rmvb.flv.mpg.mov.mkv等. 常见的视频格式:rmvb.rm.wmv.avi.

  • Android MQTT与WebSocket协议详细讲解

    目录 MQTT WebSocket 总结 MQTT MQTT是一个极其轻量级的发布/订阅消息传输协议,对于需要较小代码占用空间或网络带宽非常宝贵的远程连接非常有用 有如下特点: 开放消息协议,简单易实现: 发布订阅模式,一对多消息发布: 基于TCP/IP网络连接,提供有序,无损,双向连接: 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量: 消息QoS支持,可靠传输保证. 添加依赖 maven { url "https://repo.eclipse.org/conten

  • 详解Android登陆界面用户协议解决方案

    先上一张图来看要实现的东西 用户协议.png 一般来说每个app都有这个用户协议阅读相关的功能,之前做的都是一个协议,也都是单行的,完全没有复杂度,可以一个checkbox加上一个textview来搞定,那么像图上这种复杂的该怎们实现呢. 来看他有神们不同,有那些难点 1,选中框被文字包裹,单纯的checkbox和textview无法实现,因为选中框会在文字左方 2,协议文件有很多,不定项,文件是服务器返回的,而且每个文件中间都会有一个颜色不一样的点隔开 3,为每个文件都有点击事件,点击文件会产

  • Android开发MQTT协议的模型及通信浅析

    目录 前言 什么是MQTT协议 MQTT协议的模型 开发MQTT通信 1. 处理客户端和服务端 (1)服务端开发 (2)客户端开发 2. 客户端开发 Paho的mqtt的BUG 总结 前言 为什么要讲MQTT协议?因为现在越来越多的领域会使用到这个协议,无论是做M2M,还是做Iot,或是想实现推送功能,MQTT都是一个不错的选择. 什么是MQTT协议 MQTT协议又称为消息队列要测传输协议,他是一种基于发布/订阅范式的消息协议,并且它是一种基于TCP/IP协议族的应用层协议. 可以看出的它的特点

  • Android开发使用Messenger及Handler进行通信的方法示例

    本文实例讲述了Android开发使用Messenger及Handler进行通信的方法.分享给大家供大家参考,具体如下: 1. 客户端service public class MessageService extends Service { private static final int MSG_SET_VALUE = 0x110; private Handler mHandler = new Handler() { public void handleMessage(Message msgFr

  • Android使用http协议与服务器通信的实例

    网上介绍Android上http通信的文章很多,不过大部分只给出了实现代码的片段,一些注意事项和如何设计一个合理的类用来处理所有的http请求以及返回结果,一般都不会提及.因此,自己对此做了些总结,给出了我的一个解决方案. 首先,需要明确一下http通信流程,Android目前提供两种http通信方式,HttpURLConnection和HttpClient,HttpURLConnection多用于发送或接收流式数据,因此比较适合上传/下载文件,HttpClient相对来讲更大更全能,但是速度相

  • Android开发中的MVC设计模式浅析

    Android开发中的MVC设计模式的理解 1. Android系统中分层的理解: (1).在Android的软件开发工作中,应用程序的开发人员主要是应用Android Application Framework层封装好的Api进行快速开发. (2).在Android框架的四个层次中,下层为上层服务,上层需要下层的支持,上层需要调用下层的服务. (3).这种分层的方式带来极大的稳定性.灵活性和可扩展性,使得不同层的开发人员可以按照规范专心特定层的开发. (4). Android的官方建议应用程序

  • 计算机网络编程MQTT协议基础原理详解

    目录 什么是 MQTT 协议 MQTT 基础 发布 - 订阅模式 可拓展性 消息过滤 基于主题的过滤 基于内容的过滤 基于类型的过滤 MQTT 与消息队列的区别 MQTT 重要概念 MQTT client MQTT broker MQTT Connection 消息报文 CONNECT CONNACK 消息类型 发布 订阅 确认消息 退订 确认退订 聊聊 Topic 通配符 单级通配符 多级通配符 之前有位读者给我留言说想要了解一下什么是 MQTT 协议,顺便还把我夸了一把,有点不好意思啦. 那

  • Android开发组件化架构设计原理到实战

    目录 为什么需要组件化 组件化和模块化 模块化架构 组件化架构 组件化带来的优势 组件化需解决的问题 资源冲突解决 AndroidManifest 独立调试 单工程方案 多工程方案 页面跳转 Arouter 实现组件间方法调用 组件化的消息通信方式选择 广播 事件总线 Application生命周期分发 为什么需要组件化 小项目是不需要组件化的.当一个项目有数十个人开发,编译项目要花费10分钟,修改一个bug就可能会影响到其他业务,小小的改动就需要进行回归测试,如果是这种项目,那么我们需要进行组

  • Android开发OkHttp执行流程源码分析

    目录 前言 介绍 执行流程 OkHttpClient client.newCall(request): RealCall.enqueue() Dispatcher.enqueue() Interceptor RetryAndFollowUpInterceptor BridgeInterceptor CacheInterceptor 前言 OkHttp 是一套处理 HTTP 网络请求的依赖库,由 Square 公司设计研发并开源,目前可以在 Java 和 Kotlin 中使用. 对于 Androi

  • 深入解读Android开发中Activity的生命周期

    什么是Activity        首先,Activity是Android系统中的四大组件之一,可以用于显示View.Activity是一个与用记交互的系统模块,几乎所有的Activity都是和用户进行交互的,但是如果这样就能说Activity主要是用来显示View就不太正确了. 在深入了解Activity之前,我们先要知道一下MVC设计模式,在JAVAEE 中MVC设计模式已经很经典了,而且分的也比较清晰了,但是在Android中,好多人对MVC在Android开发中的应用不是很清楚,下面我

随机推荐