在Node.js下运用MQTT协议实现即时通讯及离线推送的方法

前言

前些日子了解到mqtt这样一个协议,可以在web上达到即时通讯的效果,但网上并不能很方便地找到一篇目前版本的在node下正确实现这个协议的博客。

自己捣鼓了一段时间,理解不深刻,但也算是基本能够达到使用目的。

本文尚未对离线消息的接收顺序进行处理。

代码

服务端: server.js

//服务端引入中间件mosca
let mosca = require('mosca')
let settings = {
 port: 5112
}
let server = new mosca.Server(settings)
server.on('ready', function(){
  console.log('Mosca server is up and running at port 5112');
})
server.on('published', function(packet, client) {
 console.log('Published', packet.payload)
})

server.on('clientDisconnected', function(client){
 console.log('disconnected: ', client.id)
})

推送端: pub.js

//客户端引入mqtt
let mqtt = require('mqtt');

let client = mqtt.connect('mqtt://localhost', {
 port: 5112,
 clientId: 'cli_pub',
})

let num = 0;
setInterval(function (){
 client.publish('test',
 'Hello mqtt ' + (++num),
 {qos:1},
 () => console.log(num));
}, 1000)

订阅端: sub.js

let mqtt = require('mqtt')

let client = mqtt.connect('mqtt://localhost', {
 port: 5112,
 clientId: 'cli_sub',
})

client.subscribe('test',{qos:1})

client.on('message', function (topic, message) {
 console.log('received message: ', message.toString())
})

server运行后,先启动pub,再启动sub,即可在sub中接收到推送过来的消息序列

至此实现了简单的即时推送

离线推送相关配置及简要介绍

离线配置-服务端:

要实现消息的离线推送,必然需要一个存储临时数据的部件

此处用到的是mongo,当然可以根据需要选择其他的存储工具

server.js中的settings需更改为:

let settings = {
 port: 5112,
 persistence:{  //增加了此项
  factory: mosca.persistence.Mongo,
  url: "mongodb://localhost:27017/mosca"
 }
}

factory: 引入mosca对特定存储工具的一些处理方法

url: 其中的 27017 为mongo所监听的端口号,mosca为存储相关数据的数据库

值得一提的是:配置好mongo的环境后,不需要提前在mongo中手动创建,若数据库不存在会自动生成,而且mosca会为你作好其他一切基本事项 (即:若只想临时体验下效果,甚至可以暂时把mongo放一边 )

在mongo中,可以看到自动新添了db: mosca及其下的collection(相当于关系型数据库中的表/关系)

离线配置-客户端:

pub.js和sub.js中的client中都可以改为:

let client = mqtt.connect('mqtt://localhost', {
 port: 5112,
 clientId: 'cli_**',
 clean: false//增加了此项
})
  • clientId: 区分客户端的识别码
  • clean: 此处决定了客户端在服务端的session是否会被清除,默认为true,为实现离线推送,我们需要将其保留
  • clean及上文中的persistence为实现离线推送的关键配置

mqtt.connect()会返回一个mqttClient对象,包含了:reconnect(), subscribe(), publish()等一系列方法。

本文中发送端接收端被分为了pub.js和sub.js两个独立文件,仅仅为了方便在不同控制台中观察效果
一个client可以既为推送端,又为订阅端

至此,所有代码已完成

其他介绍:

client.subscribe():
为本客户端订阅一个话题,所有订阅此话题的用户都会收到在此话题下推送的信息

//client.subscribe(topic,opts)
client.subscribe('test',{qos:1})

opts中的qos为通信机制,控制发送端与接收端的互锁程度

上文中的其中一个collection: subscriptions即记录各用户话题订阅情况

用户cli_sub及cli2_sub订阅了话题test:

(新增一个cli2_pub,下文有用)

注:

重复执行脚本sub.js实际上对topic进行了重复订阅

实际编码时,应避免topic的重复订阅,即使重复订阅并不影响实现效果

client.publish():

向指定topic发送数据

message为Buffer或String格式,可以通过序列化或转json实现对复杂数据对象的传送

//client.publish(topic, message, opts, callback)
let num = 0;
setInterval(function (){
 client.publish('test',
 'Hello mqtt ' + (++num),
 {qos:1},
 () => console.log(num));
}, 1000)

参数不再赘述

此处用一个定时器定时在 topic: test 下发送'Hello mqtt 1,2,3..'

用回调函数实时打印一下发送的num:

当订阅者处于离线状态时,可以在collection: packets中查看到临时数据的存储情况:

mosca把每一条推送消息为所有订阅用户都生成了独立的记录,用同一个messageId进行关联

当其中一个用户(cli2_sub)上线时,获取到其对应的数据,

而后数据库中相应记录便会被删除

此时仅有cli_sub用户的数据

当cli2_sub上线接收消息后,packets中记录将被清空

client.on():

即在client上触发的事件,此处只列举消息接收事件

//client.on(event, callback)
client.on('message', function (topic, message) {
 console.log('received message: ', message.toString())
})

处理为简单地打印到控制台

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Node.js 进程平滑离场剖析小结

    使用 Node.js 搭建 HTTP Server 已是司空见惯的事.在生产环境中,Node 进程平滑重启直接关系到服务的可靠性,它的重要性不容我们忽视.既然是平滑重启,就涉及到新旧进程的接替过渡: 首先,保证新进程平滑入场 其次,保证旧进程平滑离场 本文主要谈论下,在新旧进程接替过渡期间,如何保证旧进程平滑离场.那怎样的离场才算平滑的呢? 如何定义平滑离场 以进程离场作为时间分割点,我们可以把请求分为两类:增量请求和存量请求. 在进程离场前,停止接收新的(增量)请求 在进程离场前,保证未完成的

  • Docker使用编写dockerfile启动node.js应用

    编写 Dockerfile 以 express 自动创建的目录为例,目录结构如下: ├── /bin │ └── www ├── /node_modules ├── /public ├── /routes ├── /views ├── package-lock.json ├── package.json ├── ecosystem.config.js ├── app.js └── Dockerfile 在项目目录下新建 Dockerfile 文件 FROM node:10.15 MAINTAIN

  • 快速了解Node中的Stream流是什么

    Stream Buffer 的工作原理 Data 是一块大数据 他被分为很多个小数据 每块小数据都被存储在内存中的 Buffer 中 接着 Buffer 不断接收小数据 同时一旦 Buffer 接收的小数据填满了就会被消费 填满的 Buffer 也被称为一个 Chunk 所有 Chunk 组合而成的才是那块 Data 大数据 Stream 的分类 Read Stream Write Stream Duplex Transform Duplex 实际上就是有两个 Buffer 一个处理 ReadS

  • 详解在Node.js中发起HTTP请求的5种方法

    创建HTTP请求使现代编程语言的核心功能之一,也是很多程序员在接触到新的开发环境时最先遇到的技术之一.在Node.js中有相当多的解决方案,其中有语言内置功能,也有开源社区贡献的开发库.下面咱们来看一下比较流行的几种方式. 在开始之前,请先在自己的计算机上安装最新版的node.js和npm. HTTP - 标准库 首先是标准库中默认的 HTTP 模块.这个模块无需安装依赖外部即可使用,做到了真正的即插即用.缺点是与其他解决方案相比,用起来不是那么友好. 下面的代码将向NASA的API发送一个 G

  • node.js微信小程序配置消息推送的实现

    在开发微信小程序时,有一个消息推送,它的解释是这样的. 消息推送具体的内容是下面的这个网址   https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html,他介绍的也还可以,就是我这里换成了node代码. 消息推送 启用并设置消息推送配置后,用户发给小程序的消息以及开发者需要的事件推送,都将被微信转发至该服务器地址中. 在微信小程序的首页开发里面,开发设置中,微信的官网中,

  • 详解基于node.js的脚手架工具开发经历

    前言 我们团队的前端项目是基于一套内部的后台框架进行开发的,这套框架是基于vue和ElementUI进行了一些定制化包装,并加入了一些自己团队设计的模块,可以进一步简化后台页面的开发工作. 这套框架拆分为基础组件模块,用户权限模块,数据图表模块三个模块,后台业务层的开发至少要基于基础组件模块,可以根据具体需要加入用户权限模块或者数据图表模块.尽管vue提供了一些脚手架工具vue-cli,但由于我们的项目是基于多页面的配置进行开发和打包,与vue-cli生成的项目结构和配置有些不一样,所以创建项目

  • 详解Node.js amqplib 连接 Rabbit MQ最佳实践

    客户端设置connection_name 在建立连接时,设置connection_name属性,可以在RabbitMQ Managerment 中查看到连接来自那个实例. amqp.connect(rabbitMqAddress, { clientProperties: { connection_name: 'your host name' } }) 队列属性autoDelete durable 如无必要,建议将队列设置成自动删除,这个在TCP连接断开后,队列会自动删除.另外也不要使用持久化队列

  • Node.js EventEmmitter事件监听器用法实例分析

    本文实例讲述了Node.js EventEmmitter事件监听器用法.分享给大家供大家参考,具体如下: Node.js 所有的异步 I/O 操作在完成时都会发送一个事件到事件队列. events 模块只提供了一个对象: events.EventEmitter.EventEmitter 的核心就是事件触发与事件监听器功能的封装. 该模块已被node.js默认引,不需要使用require()显示引入. EventEmitter 对象如果在实例化时发生错误,会触发 'error' 事件.当添加新的监

  • PostgreSQL Node.js实现函数计算方法示例

    前言 由于工作需要,设计到了阿里云的弹性计算,这里便记录下来 技术栈 node.js postgresql nodemailer controller +  services 编写postgresql lib 不管异常还是正常都返回resolve,在resolve中处理结果,通过success字段去处理 const { Pool } = require('pg'); const config = require('../config/default.js'); const { database:

  • 从零搭建docker+jenkins+node.js自动化部署环境的方法

    本次案例基于CentOS 7系统 适合有一定docker使用经验的人阅读 适合有一定linux命令使用经验的人阅读 1.docker部分 1.1.docker简介 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化.容器是完全使用沙箱机制,相互之间不会有任何接口 1.2.docker架构 简单的说,docker就是一个轻量级的linux系统.Docker 容器通过 Docker 镜像来创建.

随机推荐