node.js中TCP Socket多进程间的消息推送示例详解

前言

前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(Lua)的指定tcp socket。

一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个Event实例就能解决的状态管理问题吗?注册一个事件A用于消息传递,在socket连接时注册唯一的ID,然后在http接收到数据时,emit事件A;在监听到事件A时,在tcp server中寻找指定ID对应的socket处理该数据即可。

尽管node.js在高并发方面有不错的性能,但是单个tcp server实例的承载能力有限,为避免服务器过载,node.js 单进程的内存有上限(默认2G),能容纳的长连接客户端数不多。但随着业务的扩大,我们需要考虑多机集群部署,客户端可以连接到任一节点,并发送消息。如何做到多节点的同时推送,我们需要建立一套多节点之间的消息分发/订阅架构。常用的第三方消息管理库有 RabbitMQ和Redis等。在这里,我用的是Redis的订阅发布服务。

redis.io有一个比较成熟的redis消息中转库socket.io-redis (本地下载)。但我们项目中异构后台用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs实现并不难,就手写了。

redis在该项目中主要起到一个消息分发中心(publish/subscribe)的作用。当http请求的支付数据发送过来时,则通过redis的publish功能往所有的channel推送消息,这样所有订阅该channel的socket server就能收到回调,然后推送到指定客户端。在应用层看跟Event事件消息的处理差不多。

const redis = require("redis"),
 redisClient = redis.createClient,
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 监听频道的消息回调
sub.on('message', function(channel, message) {
 switch (channle){
  case PAY_MQ_CHANNEL:
   console.log('notification received:', message);

   // 广播消息到指定socket

   break;
 }
});
// 订阅频道
sub.subscribe(PAY_MQ_CHANNEL);

// 当接收到支付数据时,推送频道消息
pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

由于redis的sub/pub的channel订阅数有上限,所以建议一类消息使用一个channel,一个channel下使用map、set或数组来存储订阅时的回调函数,在接收到订阅消息时遍历执行回调函数。

下面是我封装好的Redis组件(RedisMQProxy.js):

/*
 * redis 订阅/发布
 */
const _ = require('lodash'),
 redis = require("redis"),
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG);

let SubListenerFuns = {}; // channel的回调函数列表

let RedisMQProxy = {

 // 订阅channel
 on(channel, cb, errorCb, once = false) {
  sub.subscribe(channel); // 订阅channel消息

  // 将回调函数存放数组中
  SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  SubListenerFuns[channel].push({
   once, cb, errorCb
  });
 },

 // 监听一次性的channel回调函数
 once(channel, cb, errorCb) {
  this.on(channel, cb, errorCb, true);
 },

 // 发送channel消息
 emit(channel, message) {
  if(!_.isString(message)) {
   message = JSON.stringify(message);
  }
  pub.publish(channel, message);
 },

 // 移除channel上的监听函数
 removeListener(channel, func) {
  let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  for(let i = 0, l = channelHandlers.length; i < l; i++) {
   let handler = channelHandlers[i] || {};
   let cb = handler.cb;
   if(func && func == cb) {
    channelHandlers.splice(i, 1);
    return false;
   }
  }
 }
};

RedisMQProxy.SubListeners = SubListenerFuns;

pub.on('error', onError);
sub.on('error', onError);

// 监听redis的订阅消息
sub.on("message", function(channel, message) {
 // 遍历执行channel的回调函数
 try {
  message = JSON.parse(message);
 } catch(e) {}
 broadcastToChannel(channel, message);
});

// 广播消息到指定频道
function broadcastToChannel(channel, message, isError) {
 let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
 for(let i = 0, l = channelHandlers.length; i < l; i++) {
  let handler = channelHandlers[i] || {};
  let isOnce = handler.once || false;
  let func = handler.cb;
  let errorFunc = handler.errorCb;

  _.isFunction(func) && func(message);
  isError && _.isFunction(errorFunc) && errorFunc(message);

  isOnce && channelHandlers.splice(i, 1); // 移除一次性监听的函数
 }
}

function broadcastToAllChannels(message, isError) {
 for(let channel in SubListenerFuns) {
  broadcastToChannel(channel, message, isError);
 }
}

function onError(err) {
 err = err || {};
 err.msg = err.msg || 'redis sub/pub fail';

 // 通知所有channel执行错误回调函数
 broadcastToAllChannels(err, true);
}

module.exports = RedisMQProxy;

在使用时就可以比较方便地调用了:

const RedisMQProxy = require('./RedisMQProxy'),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 订阅channel
RedisMQ.on(PAY_MQ_CHANNEL, function(message) {
 console.log('notification received:', message);
 // 广播消息到指定socket
 // ...
});

// 订阅一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL, function(message) {
 // ...
});

// 当接收到支付数据时,推送频道消息
RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

目前该项目已经健康运行了一个多月。由于socket server的多进程间消息推送依赖于redis的消息中转,而Redis使用的是单进程,未能充分利用CPU。当业务膨胀的时候,redis就要考虑分布集群了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • node.js使用cluster实现多进程

    首先郑重声明: nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! 重要的事情说3遍. 因为nodeJS天生自带buff, 所以从一出生就受到 万千 粉丝的追捧(俺,也是它的死忠). 但是,傻逼php 竟然嘲笑 我大NodeJS 的性能. 说不稳定,不可靠,只能利用单核CPU. 辣鸡 nodeJS. 艹!艹!艹! 搞mo shi~ 但,大哥就是大哥,nodeJS在v0.8 的时候就已经加入了cluster

  • Node.js中child_process实现多进程

    复制代码 代码如下: var http = require('http'); function fib (n) {     if (n < 2) {         return 1;     } else {         return fib(n - 2) + fib(n - 1);     } } var server = http.createServer(function (req, res) {     var num = parseInt(req.url.substring(1)

  • nodejs基础之多进程实例详解

    本文实例讲述了nodejs基础之多进程.分享给大家供大家参考,具体如下: Node.js 多进程 我们都知道 Node.js 是以单线程的模式运行的,但它使用的是事件驱动来处理并发,这样有助于我们在多核 cpu 的系统上创建多个子进程,从而提高性能. 每个子进程总是带有三个流对象:child.stdin, child.stdout 和child.stderr.他们可能会共享父进程的 stdio 流,或者也可以是独立的被导流的流对象. Node 提供了 child_process 模块来创建子进程

  • Node.js中多进程模块Cluster的介绍与使用

    前言 我们都知道nodejs最大的特点就是单进程.无阻塞运行,并且是异步事件驱动的.Nodejs的这些特性能够很好的解决一些问题,例如在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.既然Node.js采用单进程.单线程模式,那么在如今多核硬件流行的环境中,单核性能出色的Nodejs如何利用多核CPU呢?创始人Ryan Dahl建议,运行多个Nodejs进程,利用某些通信机制来协调各项任务.目前,已

  • Nodejs中解决cluster模块的多进程如何共享数据问题

    前述 nodejs在v0.6.x之后增加了一个模块cluster用于实现多进程,利用child_process模块来创建和管理进程,增加程序在多核CPU机器上的性能表现.本文将介绍利用cluster模块创建的多线程如何共享数据的问题. 进程间数据共享 首先举个简单的例子,代码如下: var cluster = require('cluster'); var data = 0;//这里定义数据不会被所有进程共享,各个进程有各自的内存区域 if (cluster.isMaster) { //主进程

  • 深入理解NodeJS 多进程和集群

    进程和线程 "进程" 是计算机系统进行资源分配和调度的基本单位,我们可以理解为计算机每开启一个任务就会创建至少一个进程来处理,有时会创建多个,如 Chrome 浏览器的选项卡,其目的是为了防止一个进程挂掉而应用停止工作,而 "线程" 是程序执行流的最小单元,NodeJS 默认是单进程.单线程的,我们将这个进程称为主进程,也可以通过 child_process 模块创建子进程实现多进程,我们称这些子进程为 "工作进程",并且归主进程管理,进程之间默

  • node.js中TCP Socket多进程间的消息推送示例详解

    前言 前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(Lua)的指定tcp socket. 一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个Event实例就能解决的状态管理问题吗?注册一个事件A用于消息传递,在socket连接时注册唯一的ID,然后在http接收到数据时,emit事件A:在监听到事件A时,在tcp server中寻找指定ID对应的socket处理该数据即可. 尽管n

  • JS中原始值和引用值的储存方式示例详解

    在ECMAscript中,变量可以存放两种类型的值,即原始值和引用值 原始值指的是代表原始数据类型的值,也叫基本数据类型,包括:Number.Stirng.Boolean.Null.Underfined 引用值指的是复合数据类型的值,包括:Object.Function.Array.Date.RegExp 根据数据类型不同,有的变量储存在栈中,有的储存在堆中.具体区别如下: 原始变量及他们的值储存在栈中,当把一个原始变量传递给另一个原始变量时,是把一个栈房间的东西复制到另一个栈房间,且这两个原始

  • node.js中的socket.io的广播消息

    在多个客户端与服务器端建立连接后,socket.io()服务器具有一个sockets属性,属性值为所有与客户端建立连接的socket对象.可以利用该对象的send方法或emit方法向所有客户端广播消息. io.sockets.send("user commected); io.socket.emit("login",names); 案例 server.js代码: 复制代码 代码如下: var express=require("express"); var

  • Node.js中使用socket创建私聊和公聊聊天室

    先给大家展示效果图: 在上篇文章给大家介绍使用Angular和Nodejs.socket.io搭建聊天室及多人聊天室,本文继续介绍Node.js中使用socket创建私聊和公聊聊天室,具体详情请看下文吧. nodejs的应用中,关于socket应该是比较出彩的了,socket.io在github上有几万人的star,它的成功应该是不输于express的,为了方便了解整个socket.io的使用. 例子请点击http://chat.lovewebgames.com/ 源码下载https://git

  • node.js中使用socket.io制作命名空间

    如果开发者想在一个特定的应用程序中完全控制消息与事件的发送,只需要使用一个默认的"/"命名空间就足够了.但是如果开发者需要将应用程序作为第三方服务提供给其他应用程序,则需要为一个用于与客户端连接的socket端口定义一个独立的命名空间. io.of(namespace) 制作两个命名空间 chat和news然后在客户端相互发送信息. 复制代码 代码如下: var express=require("express"); var http=require("h

  • Node.js基础入门之path模块,url模块,http模块使用详解

    目录 path模块 1. path模块示例 2. path模块其他方法 url模块 1. 旧的解析方法 2. 新的解析方法 http模块 1. 什么是HTTP协议? 2. HTTP协议约束的细节 3. HTTP请求响应过程 4. http模块get方法 经过前面四天的学习,对Node.js已经有了一个基础的认识,今天继续学习Node.js网络通信编程相关内容,并稍加整理加以分享,如有不足之处,还请指正. path模块 Node.js中,提供了一个path模块,在这个模块中,提供了许多实用的,可被

  • python TCP Socket的粘包和分包的处理详解

    概述 在进行TCP Socket开发时,都需要处理数据包粘包和分包的情况.本文详细讲解解决该问题的步骤.使用的语言是Python.实际上解决该问题很简单,在应用层下,定义一个协议:消息头部+消息长度+消息正文即可. 那什么是粘包和分包呢? 关于分包和粘包 粘包:发送方发送两个字符串"hello"+"world",接收方却一次性接收到了"helloworld". 分包:发送方发送字符串"helloworld",接收方却接收到了两

  • Node.js基础入门之模块与npm包管理器使用详解

    目录 require函数 模块分类 第三方模块 1. 安装第三方模块 2. 引入第三方模块 3. 示例测试 系统模块 require注意事项 exports导出对象 1. exports示例 2. exports注意事项 module模块对象 package.json包描述文件 1. 什么是package.json ? 2. 如何创建package.json文件? NPM基础 1. 常用npm命令 2. npm 示例 cnpm基础 1. 什么是cnpm ? 2. 使用cnpm 控制台输出 1.

  • JS中正则表达式只有3种匹配模式(没有单行模式)详解

    JS正则表达式对象模式仅有如下三种:  g (全文查找出现的所有 pattern) i (忽略大小写) m (多行查找) 即没有单行匹配模式,Singleline(单行模式):更改.的含义,使它与每一个字符匹配(包括换行符\n). 如java中 String regex = "(?s)(?<=interface).{0,500}(shutdown)";---------"."表示在一行. 但可以采用[\d\D]或[\w\W]或[\s\S]或(.|\s)*?来解

  • js中获取URL参数的共用方法getRequest()方法实例详解

    下面通过一段代码给大家介绍js中获取URL参数的共用方法getRequest()方法,具体代码如下所示: getRequest : function() { var url = location.search; //获取url中"?"符后的字串 var theRequest = new Object(); if (url.indexOf("?") != -1) { var str = url.substr(1); strs = str.split("&am

随机推荐