Android MQTT与WebSocket协议详细讲解

目录
  • MQTT
  • WebSocket
  • 总结

MQTT

MQTT是一个极其轻量级的发布/订阅消息传输协议,对于需要较小代码占用空间或网络带宽非常宝贵的远程连接非常有用

有如下特点:

  • 开放消息协议,简单易实现;
  • 发布订阅模式,一对多消息发布;
  • 基于TCP/IP网络连接,提供有序,无损,双向连接;
  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量;
  • 消息QoS支持,可靠传输保证。

添加依赖

maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

首先,连接服务器,SERVER_HOST为主机名,CLIENT_ID为客户端唯一标识,还需要用户名和密码,当然,这些一般由服务器返回。

    fun connect() {
        try {
            //MemoryPersistence设置clientId的保存形式,默认为以内存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT连接设置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次连接到服务器都以新的身份,false表示服务器会保留客户的连接记录
                isCleanSession = true
                //用户名和密码
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超时时间,单位是秒
                connectionTimeout = 30
                //会话心跳时间,单位是秒,服务器每隔30秒向客户端发送消息判断客户端是否在线
                keepAliveInterval = 30
            }
            //设置回调
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

心跳包:在空闲时间内,如果客户端没有其他任何报文发送,必须发送一个PINGREQ报文到服务器,而如果服务端在 1.5 倍心跳时间内没有收到客户端消息,则会主动断开客户端的连接,发送其遗嘱消息给所有订阅者,而服务端收到PINGREQ报文之后,立即返回PINGRESP报文给客户端。

如果两个客户端的 clientID 一样,那么服务端记录第一个客户端连接之后再收到第二个客户端连接请求,则会向第一个客户端发送 Disconnect 报文断开连接,并连接第二个客户端。

遗嘱消息:MqttConnectOptions的setWill方法可以设置遗嘱消息,客户端没有主动向服务端发起disconnect断开连接消息,但服务端检测到和客户端的连接已断开,此时服务端将该客户端设置的遗嘱消息发送出去,遗嘱Topic中不能存在通配符。

应用场景:客户端掉线之后,可以及时通知到所有订阅该遗嘱topic的客户端。

订阅消息。

    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }

MQTT是一种发布/订阅的消息协议, 通过设定的主题topic,发布者向topic发送的payload负载消息会经过服务器,转发到所有订阅该topic的订阅者。topic有两个通配符,“+” 和 “#”,与 “/” 组合使用,“+” 只能表示一级topic,“#” 可以表示任意层级,例如,订阅topic为 “guangdong/+”,发布者发布的topic可以是 guangdong,guangdong/shenzhen,但是不能是guangdong/shenzhen/nanshan,而订阅的topic如果是 “guangdong/#” 则可以收到。

Qos为服务质量等级,qos=0,表示发送方只发一次,不管是否发成功,也不管接收方是否成功接收,适用于不重要的数据传输;qos=1,表示消息至少有一次到达接收方,发送方发送消息,需要等待接收方返回应答消息,如果发送方在一定时间内未收到应答,发送方将继续发送,直到收到应答消息,删除本地消息缓存,不再发送。适用于需要收到所有消息,客户端可以处理重复消息;qos = 2:确保消息只一次到达接收方,一般我们都会选择2。

发布消息

    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

payload为负载消息,字节流类型,是 MQTT 通信传输的真实数据。retain是保留信息,服务端将保留对应topic最新的一条消息记录,保留消息的作用是每次客户端连上都会收到其topic的最后一条保留消息。

整个封装类如下:

class MQTTManager private constructor(private val context: Context) {
    private var client: MqttAsyncClient? = null
    private lateinit var options: MqttConnectOptions
    private val iMqttActionListener = object : IMqttActionListener {
        override fun onSuccess(asyncActionToken: IMqttToken?) {
            //连接成功,开始订阅
            subscribeMessage(TOPIC, 2)
        }
        override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            //连接失败
        }
    }
    companion object {
        @Volatile
        private var instance: MQTTManager? = null
        fun getInstance(context: Context): MQTTManager = instance ?: synchronized(this) {
            instance ?: MQTTManager(context).also {
                instance = it
            }
        }
    }
    /**
     * 连接服务器
     */
    fun connect() {
        try {
            //MemoryPersistence设置clientId的保存形式,默认为以内存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT连接设置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次连接到服务器都以新的身份,false表示服务器会保留客户的连接记录
                isCleanSession = true
                //用户名和密码
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超时时间,单位是秒
                connectionTimeout = 30
                //会话心跳时间,单位是秒,服务器每隔30秒向客户端发送消息判断客户端是否在线
                keepAliveInterval = 30
                //自动重连
                isAutomaticReconnect = true
            }
            //设置回调
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 订阅消息
     */
    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 发布消息
     */
    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 断开连接
     */
    fun disconnect() {
        client?.takeIf {
            it.isConnected
        }?.let {
            try {
                it.disconnect()
                instance = null
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 判断是否连接
     */
    fun isConnected() = if (client != null) client!!.isConnected else false
    inner class PushCallBack : MqttCallback {
        override fun connectionLost(cause: Throwable?) {
            //断开连接
        }
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            //接收消息回调
        }
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
            //发布消息完成后的回调
        }
    }
}

WebSocket

WebSocket是应用层的一种协议,是建立在TCP协议基础上的,主要特点就是全双工通信,允许服务器主动发送信息给客户端。

这里使用OKHTTP进行WebSocket开发。

class WebSocketManager private constructor() {
    private val client: OkHttpClient = OkHttpClient.Builder().writeTimeout(5, TimeUnit.SECONDS)
        .readTimeout(5, TimeUnit.SECONDS)
        .connectTimeout(5, TimeUnit.SECONDS)
        .build()
    private var webSocket: WebSocket? = null
    companion object {
        val instance: WebSocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { WebSocketManager() }
    }
    fun connect(url: String) {
        webSocket?.cancel()
        val request = Request.Builder().url(url).build()
        webSocket = client.newWebSocket(request, object : WebSocketListener() {
            //连接成功后回调
            override fun onOpen(webSocket: WebSocket, response: Response) {
                super.onOpen(webSocket, response)
            }
            //服务器发送消息给客户端时回调
            override fun onMessage(webSocket: WebSocket, text: String) {
                super.onMessage(webSocket, text)
            }
            //服务器发送的byte类型消息
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                super.onMessage(webSocket, bytes)
            }
            //服务器连接关闭中
            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosing(webSocket, code, reason)
            }
            //服务器连接已关闭
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosed(webSocket, code, reason)
            }
            //服务器连接失败
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                super.onFailure(webSocket, t, response)
            }
        })
    }
    //发送消息
    private fun sendMessage(message: String) {
        webSocket?.send(message)
    }
    private fun close(code: Int, reason: String) {
        webSocket?.close(code, reason)
    }
}

总结

WebSocket是一种简单的报文协议,着重解决客户端与服务端不能双向通信的问题。

MQTT是基于TCP的发布/订阅消息传输协议,客户端可以创建和订阅任意主题,并向主题发布或接收消息,此外,有许多为物联网优化的特性,比如服务质量等级Qos,层级主题,遗嘱信息等。

MQTT和WebSocket都是应用层协议,都使用TCP协议来确保可靠传输,都支持双向通信,都使用二进制编码。WebSocket更简单灵活,MQTT相对复杂,但功能强大,大家可以根据自己的使用场景来选择 。

到此这篇关于Android MQTT与WebSocket协议详细讲解的文章就介绍到这了,更多相关Android MQTT与WebSocket内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用Android WebSocket实现即时通讯功能

    最近做这个功能,分享一下.即时通讯(Instant Messaging)最重要的毫无疑问就是即时,不能有明显的延迟,要实现IM的功能其实并不难,目前有很多第三方,比如极光的JMessage,都比较容易实现.但是如果项目有特殊要求(如不能使用外网),那就得自己做了,所以我们需要使用WebSocket. WebSocket WebSocket协议就不细讲了,感兴趣的可以具体查阅资料,简而言之,它就是一个可以建立长连接的全双工(full-duplex)通信协议,允许服务器端主动发送信息给客户端. Ja

  • android利用websocket协议与服务器通信

    最近做一个项目,需求中需要服务器主动推送消息到客户端.这样的话一般的http连接就不能使用了.博主问了个朋友,向我推荐websocket协议,特此测试了一下,发现效果很好. android本身没有websocket的库,需要自己下载 ,下载地址. 客户端代码: 界面布局自己写,很简单的两个button package com.example.test; import com.example.test.R; import android.app.Activity; import android.o

  • Android使用Websocket实现聊天室

    最近的项目中要实现一个聊天的功能,类似于斗鱼TV的聊天室功能,与服务器端人商量后决定用WebSocket来做,但是在这之前我只知道Socket但是听都没有听过WebSocket,但是查看了相关的材料以后发现实现一个聊天室其实是很简单的!下面我们先来看看WebSocket. Autobahn|Android 是由Autobahn开发一个开源的Java/Android网络库,实现了WebSocket协议和Web应用程序消息传输协议来创建本地移动的WebSocket/ WAMP的客服端. WebSoc

  • Android使用WebSocket实现多人游戏

    WebSocket 是 HTML5 一种新的协议.它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯,它建立在 TCP 之上,同 HTTP 一样通过 TCP 来传输数据,但是它和 HTTP 最大不同是: WebSocket 是一种双向通信协议,在建立连接后,WebSocket 服务器和 Browser/Client Agent 都能主动的向对方发送或接收数据,就像 Socket 一样: WebSocket 需要类似 TCP 的客户端和服务器端通过握手连接,连接成功后才能

  • Android MQTT与WebSocket协议详细讲解

    目录 MQTT WebSocket 总结 MQTT MQTT是一个极其轻量级的发布/订阅消息传输协议,对于需要较小代码占用空间或网络带宽非常宝贵的远程连接非常有用 有如下特点: 开放消息协议,简单易实现: 发布订阅模式,一对多消息发布: 基于TCP/IP网络连接,提供有序,无损,双向连接: 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量: 消息QoS支持,可靠传输保证. 添加依赖 maven { url "https://repo.eclipse.org/conten

  • Android 超详细讲解fitsSystemWindows属性的使用

    对于android:fitsSystemWindows这个属性你是否感觉又熟悉又陌生呢? 熟悉是因为大概知道它可以用来实现沉浸式状态栏的效果,陌生是因为对它好像又不够了解,这个属性经常时灵时不灵的. 其实对于android:fitsSystemWindows属性我也是一知半解,包括我在写<第一行代码>的时候对这部分知识的讲解也算不上精准.但是由于当时的理解对于我来说已经够用了,所以也就没再花时间继续深入研究. 而最近因为工作的原因,我又碰上了android:fitsSystemWindows这

  • Android详细讲解谷歌推出的官方二维码扫描库

    相信二维码扫描现在大家都已经不稀奇了,几乎所有的App里都会支持这个功能. 这里我要问大家一个问题,你们都是如何在自己的App中加入二维码扫描功能的呢? 相信会有一大部分朋友说,使用的是ZXing或者ZBar这种开源库. 但是不知道大家有没有思考过,二维码功能这么常见,为什么Google却没有提供一个官方的二维码扫描库呢? 反正我是没思考过.有需求,找开源,这可能已经成了很多Android开发者的常态化思维. 但令我没想到的是,官方的二维码扫描库,它真的要来了. 由于我是Google的GDE,有

  • Android超详细讲解弹出多选框的实现

    目录 程序代码功能:点击一个按钮弹出一个多选框 在activity_main.xml布局一个button控件,大小,颜色,位置,背景可自行调节,以被用来在MainActivity.java调用其id来实现点击弹出多选框!在btn1.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) {}}大括号内放入点击btn1时间要发生的内容,因为是弹出多选框,所以用来Alter.Bu

  • Android超详细讲解组件AdapterView的使用

    目录 概述 介绍AdapterView的编程模式 Adapter ListView使用 myAdapater.java MainActivity.java activity_main.xml myAdapater.java MainActivity.java 概述 在Android应用开发中,AdapterView是一类常用且非常重要的组件.我们常见的以列表的形式显示信息的组件就是AdapterView的子类,称为Listview:我们经常以网格方式浏览图片缩略图的组件也是AdapterView

  • Android超详细讲解组件LinearLayout的使用

    目录 概述 常用XML配置属性 (1) android:orientation (2) android:gravity (3) View中继承来的属性 代码举例 概述 LinearLayout是线性布局组件,放置在其中的组件按列或者按行(就是垂直或者水平)的方式排序分布. 常用XML配置属性 (1) android:orientation 设置LinearLayout容器布局组件的方式:只能取值:horizontal(水平的),vertical(垂直的) (2) android:gravity

  • Android超详细讲解组件ScrollView的使用

    目录 概述 练习 HorizontalScrollView: 概述 ScrollView也是一个容器,它是FrameLayout的子类,它的主要作用就是将超出物理屏幕的内容显示出来,(就是滚动条效果)ScrollView提供垂直滚动,进而可将超出物理屏幕的内容显示出来. 在一般情况下,可以将一个采用垂直方式布局组件的LinearLayout作为ScrollLayout容器的子组件,同时,在LinearLayout容器中可以显示超出屏幕物理高度的内容. 练习 这么说有点抽象,然后我们现在实现完成一

  • Android四大组件之Service服务详细讲解

    目录 一.Service是什么 二.Service 的启动方式 2.1.startService 显示启动 Service启动 Service 停止 2.2.bindService 绑定启动 使用bindService()方法启动Service unbindService 停止服务 三.Service 生命周期 startService启动的生命周期 bindService启动的生命周期 上一节:Activity 简介:在Android组件中最基本也是最为常见的四大组件: Activity Se

  • Android组件化、插件化详细讲解

    目录 什么是组件化(通俗易懂) 反射的写法 反射的⽬的 关于DEX: 插件化原理:动态加载 问题⼀:未注册的组件(例如Activity)不能打开 问题⼆:资源⽂件⽆法加载 插件化有什么用? 什么是组件化(通俗易懂) 通俗易懂来讲就是,拆成多个module开发就是组件化. App的部分功能模块在打包时并不以传统⽅式打包进apk⽂件中,⽽是以另⼀种形式⼆次封装进apk内部,或者放在⽹络上适时下载,在需要的时候动态对这些功能模块进⾏加载,称之为插件化.这些单独⼆次封装的功能模块apk,就称作插件,初始

随机推荐