node 利用进程通信实现Cluster共享内存

Node.js的标准API没有提供进程共享内存,然而通过IPC接口的send方法和对message事件的监听,就可以实现一个多进程之间的协同机制,通过通信来操作共享内存。

##IPC的基本用法:

// worker进程 发送消息
process.send(‘读取共享内存');

// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
   // 有worker进程建立,即开始监听message事件
   worker.on(‘message', function(data) {
     // 处理来自worker的请求
     // 回传结果
     worker.send(‘result')
   });
});

在Node.js中,通过send和on(‘message', callback)实现的IPC通信有几个特点。首先,master和worker之间可以互相通信,而各个worker之间不能直接通信,但是worker之间可以通过master转发实现间接通信。另外,通过send方法传递的数据,会先被JSON.stringify处理后再传递,接收后会再用JSON.parse解析。所以Buffer对象传递后会变成数组,而function则无法直接传递。反过来说,就是可以直接传递除了buffer和function之外的所有数据类型(已经很强大了,而且buffer和function也可以用变通的方法实现传递)。

基于以上特点,我们可以设计一个通过IPC来共享内存的方案:

1、worker进程作为共享内存的使用者,并不直接操作共享内存,而是通过send方法通知master进程进行写入(set)或者读取(get)操作。

2、master进程初始化一个Object对象作为共享内存,并根据worker发来的message,对Object的键值进行读写。

3、由于要使用跨进程通信,所以worker发起的set和get都是异步操作,master根据请求进行实际读写操作,然后将结果返回给worker(即把结果数据send给worker)。

##数据格式

为了实现进程间异步的读写功能,需要对通信数据的格式做一点规范。

首先是worker的请求数据:

requestMessage = {
  isSharedMemoryMessage: true, // 表示这是一次共享内存的操作通信
  method: ‘set', // or ‘get' 操作的方法
  id: cluster.worker.id, // 发起操作的进程(在一些特殊场景下,用于保证master可以回信)
  uuid: uuid, // 此次操作的(用于注册/调用回调函数)
  key: key, // 要操作的键
  value: value // 键对应的值(写入)
}

master在接到数据后,会根据method执行相应操作,然后根据requestMessage.id将结果数据发给对应的worker,数据格式如下:

responseMessage = {
  isSharedMemoryMessage: true, // 标记这是一次共享内存通信
  uuid: requestMessage.uuid, // 此次操作的唯一标示
  value: value // 返回值。get操作为key对应的值,set操作为成功或失败
}

规范数据格式的意义在于,master在接收到请求后,能够将处理结果发送给对应的worker,而worker在接到回传的结果后,能够调用此次通信对应的callback,从而实现协同。

规范数据格式后,接下来要做的就是设计两套代码,分别用于master进程和worker进程,监听通信并处理通信数据,实现共享内存的功能。

##User类

User类的实例在worker进程中工作,负责发送操作共享内存的请求,并监听master的回信。

var User = function() {
  var self = this;
  self.__uuid__ = 0;

  // 缓存回调函数
  self.__getCallbacks__ = {};

  // 接收每次操作请求的回信
  process.on('message', function(data) {

    if (!data.isSharedMemoryMessage) return;
    // 通过uuid找到相应的回调函数
    var cb = self.__getCallbacks__[data.uuid];
    if (cb && typeof cb == 'function') {
      cb(data.value)
    }
    // 卸载回调函数
    self.__getCallbacks__[data.uuid] = undefined;
  });
};

// 处理操作
User.prototype.handle = function(method, key, value, callback) {

  var self = this;
  var uuid = self.__uuid__++;

  process.send({
    isSharedMemoryMessage: true,
    method: method,
    id: cluster.worker.id,
    uuid: uuid,
    key: key,
    value: value
  });

  // 注册回调函数
  self.__getCallbacks__[uuid] = callback;

};

User.prototype.set = function(key, value, callback) {
  this.handle('set', key, value, callback);
};

User.prototype.get = function(key, callback) {
  this.handle('get', key, null, callback);
};

##Manager类

Manager类的实例在master进程中工作,用于初始化一个Object作为共享内存,并根据User实例的请求,在共享内存中增加键值对,或者读取键值,然后将结果发送回去。

var Manager = function() {

  var self = this;

  // 初始化共享内存
  self.__sharedMemory__ = {};

  // 监听并处理来自worker的请求
  cluster.on('online', function(worker) {
    worker.on('message', function(data) {
      // isSharedMemoryMessage是操作共享内存的通信标记
      if (!data.isSharedMemoryMessage) return;
      self.handle(data);
    });
  });
};

Manager.prototype.handle = function(data) {
  var self = this;
  var value = this[data.method](data);

  var msg = {
    // 标记这是一次共享内存通信
    isSharedMemoryMessage: true,
    // 此次操作的唯一标示
    uuid: data.uuid,
    // 返回值
    value: value
  };

  cluster.workers[data.id].send(msg);
};

// set操作返回ok表示成功
Manager.prototype.set = function(data) {
  this.__sharedMemory__[data.key] = data.value;
  return 'OK';
};

// get操作返回key对应的值
Manager.prototype.get = function(data) {
  return this.__sharedMemory__[data.key];
};

##使用方法

if (cluster.isMaster) {

  // 初始化Manager的实例
  var sharedMemoryManager = new Manager();

  // fork第一个worker
  cluster.fork();

  // 1秒后fork第二个worker
  setTimeout(function() {
    cluster.fork();
  }, 1000);

} else {

  // 初始化User类的实例
  var sharedMemoryUser = new User();

  if (cluster.worker.id == 1) {
    // 第一个worker向共享内存写入一组数据,用a标记
    sharedMemoryUser.set('a', [0, 1, 2, 3]);
  }

  if (cluster.worker.id == 2) {
    // 第二个worker从共享内存读取a的值
    sharedMemoryUser.get('a', function(data) {
      console.log(data); // => [0, 1, 2, 3]
    });
  }

}

以上就是一个通过IPC通信实现的多进程共享内存功能,需要注意的是,这种方法是直接在master进程的内存里缓存数据,必须注意内存的使用情况,这里可以考虑加入一些简单的淘汰策略,优化内存的使用。另外,如果单次读写的数据比较大,IPC通信的耗时也会相应增加。

完整代码:https://github.com/x6doooo/sharedmemory

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Node.js中多进程模块Cluster的介绍与使用

    前言 我们都知道nodejs最大的特点就是单进程.无阻塞运行,并且是异步事件驱动的.Nodejs的这些特性能够很好的解决一些问题,例如在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.既然Node.js采用单进程.单线程模式,那么在如今多核硬件流行的环境中,单核性能出色的Nodejs如何利用多核CPU呢?创始人Ryan Dahl建议,运行多个Nodejs进程,利用某些通信机制来协调各项任务.目前,已

  • Nodejs中解决cluster模块的多进程如何共享数据问题

    前述 nodejs在v0.6.x之后增加了一个模块cluster用于实现多进程,利用child_process模块来创建和管理进程,增加程序在多核CPU机器上的性能表现.本文将介绍利用cluster模块创建的多线程如何共享数据的问题. 进程间数据共享 首先举个简单的例子,代码如下: var cluster = require('cluster'); var data = 0;//这里定义数据不会被所有进程共享,各个进程有各自的内存区域 if (cluster.isMaster) { //主进程

  • node.js中cluster的使用教程

    本文主要给大家介绍了关于node.js中cluster使用的相关教程,分享出来供大家参考学习,下面来看看详细的介绍: 一.使用NODE中cluster利用多核CPU var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; if (cluster.isMaster) { // 创建工作进程 for (var i = 0; i < numCPUs;

  • 使用cluster 将自己的Node服务器扩展为多线程服务器

    用nodejs的朋友都有了解,node是单线程的,也就是说跑在8核CPU上,只能使用一个核的算力. 单线程一直是node的一个诟病,但随着0.6版本中引入cluster之后,这个情况则得到了改变,开发人员可以依靠cluster很轻松的将自己的Node服务器扩展为多线程服务器了. 什么是Cluster cluster是node提供的一个多线程库,用户可以使用它来创建多个线程,线程之间共享一个监听端口,当有外部请求这个端口时,cluster会将请求转发到随机线程里.因为每个node线程都会占用几十兆

  • Node学习记录之cluster模块

    在如今机器的CPU都是多核的背景下,Node的单线程设计已经没法更充分的"压榨"机器性能了.所以从v0.8开始,Node新增了一个内置模块--"cluster",故名思议,它可以通过一个父进程管理一坨子进程的方式来实现集群的功能. var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; // 获取CPU的个数 if

  • node.js使用cluster实现多进程

    首先郑重声明: nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! 重要的事情说3遍. 因为nodeJS天生自带buff, 所以从一出生就受到 万千 粉丝的追捧(俺,也是它的死忠). 但是,傻逼php 竟然嘲笑 我大NodeJS 的性能. 说不稳定,不可靠,只能利用单核CPU. 辣鸡 nodeJS. 艹!艹!艹! 搞mo shi~ 但,大哥就是大哥,nodeJS在v0.8 的时候就已经加入了cluster

  • node 利用进程通信实现Cluster共享内存

    Node.js的标准API没有提供进程共享内存,然而通过IPC接口的send方法和对message事件的监听,就可以实现一个多进程之间的协同机制,通过通信来操作共享内存. ##IPC的基本用法: // worker进程 发送消息 process.send('读取共享内存'); // master进程 接收消息 -> 处理 -> 发送回信 cluster.on('online', function (worker) { // 有worker进程建立,即开始监听message事件 worker.o

  • 详解两个Node.js进程是如何通信

    目录 前言 不同电脑上的两个 Node.js 进程间通信 使用 TCP 套接字 使用 HTTP 协议 同一台电脑上两个 Node.js 进程间通信 使用内置 IPC 通道 使用自定义管道 总结 前言 两个 Node.js 进程之间如何进行通信呢?这里要分两种场景: 不同电脑上的两个 Node.js 进程间通信 同一台电脑上两个 Node.js 进程间通信 对于第一种场景,通常使用 TCP 或 HTTP 进行通信,而对于第二种场景,又分为两种子场景: Node.js 进程和自己创建的 Node.j

  • Linux进程通信(IPC)方式简介

    进程间通信的目的 数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几兆字节之间.共享数据:多个进程想要操作共享数据,一个进程对共享数据的修改,别的进程应该立刻看到.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程).资源共享:多个进程之间共享同样的资源.为了作到这一点,需要内核提供锁和同步机制.进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并

  • Linux之进程间通信(共享内存【mmap实现+系统V】)

    目录 共享内存 mmap()及其相关的系统调用 mmap() munmap() 共享内存的使用 命令管理共享内存 总结 共享内存 共享内存可以说是最有用的进程间通信方式,也是最快的IPC形式,两个不同的进程A.B共享内存的意思就是:同一块物理内存被映射到进程A.B各自的进程地址空间,进程A可以同时看到进程B对共享内存中数据的更新,反之亦然. 由于个多个进程共享同一块内存区域,必然需要某种同步机制.互斥锁和信号量都可以. 好处: 效率高,进程可以直接读写内存,而不需要复制任何数据,而管道.消息队列

  • Linux共享内存实现机制的详解

    Linux共享内存实现机制的详解 内存共享: 两个不同进程A.B共享内存的意思是,同一块物理内存被映射到进程A.B各自的进程地址空间.进程A可以即时看到进程B对共享内存中数据的更新,反之亦然.由于多个进程共享同一块内存区域,必然需要某种同步机制,互斥锁和信号量都可以. 效率: 采用共享内存通信的一个显而易见的好处是效率高,因为进程可以直接读写内存,而不需要任何数据的拷贝.对于像管道和消息队列等通信方式,则需要在内核和用户空间进行四次的数据拷贝,而共享内存则只拷贝两次数据[1]: 一次从输入文件到

  • 详解Linux进程间通信——使用共享内存

    一.什么是共享内存 顾名思义,共享内存就是允许两个不相关的进程访问同一个逻辑内存.共享内存是在两个正在运行的进程之间共享和传递数据的一种非常有效的方式.不同进程之间共享的内存通常安排为同一段物理内存.进程可以将同一段共享内存连接到它们自己的地址空间中,所有进程都可以访问共享内存中的地址,就好像它们是由用C语言函数malloc分配的内存一样.而如果某个进程向共享内存写入数据,所做的改动将立即影响到可以访问同一段共享内存的任何其他进程. 特别提醒:共享内存并未提供同步机制,也就是说,在第一个进程结束

  • nginx共享内存机制详解

    nginx的共享内存,是其能够实现高性能的主要原因之一,而其主要是用于对文件的缓存.本文首先会讲解共享内存的使用方式,然后会讲解nginx是如何实现共享内存的管理的. 1. 使用示例 nginx声明共享内存的指令为: proxy_cache_path /Users/Mike/nginx-cache levels=1:2 keys_zone=one:10m max_size=10g inactive=60m use_temp_path=off; 这里只是声明的一个名称为one,最大可用内存为10g

  • 详解Android Ashmem匿名共享内存

    目录 1. 简述 2. 创建 MemoryFile 和 数据写入 3. 将文件描述符传递到其他进程 4. 在其他进程接收 FileDescriptor 并读取数据 1. 简述 Android 的 匿名共享内存(Ashmem) 基于 Linux 的共享内存,都是在临时文件系统(tmpfs)上创建虚拟文件,再映射到不同的进程.它可以让多个进程操作同一块内存区域,并且除了物理内存限制,没有其他大小限制.相对于 Linux 的共享内存,Ashmem 对内存的管理更加精细化,并且添加了互斥锁.Java 层

  • C++共享内存删除的陷阱

    文章转自微信公众号:CPP开发前沿 当进程结束使用共享内存区时,要通过函数 shmdt 断开与共享内存区的连接.该函数声明在 sys/shm.h 中,其原型如下: int shmdt(const void *shmaddr); 参数 shmaddr 是 shmat 函数的返回值. 进程脱离共享内存区后,数据结构 shmid_ds 中的 shm_nattch 就会减 1 .但是共享段内存依然存在,只有 shm_attch 为 0 后,即没有任何进程再使用该共享内存区,共享内存区才在内核中被删除.一

  • PHP进程通信基础之信号量与共享内存通信

    由于进程之间谁先执行并不确定,这取决于内核的进程调度算法,其中比较复杂.由此有可能多进程在相同的时间内同时访问共享内存,从而造成不可预料的错误.信号量这个名字起的令人莫名其妙,但是看其英文原意,就十分容易理解. semaphore 英[ˈseməfɔ:(r)] vt. 发出信号,打旗语; 类似于指挥官的作用. 下面我们看下一个伪代码信号量的使用. 1.创建信号量唯一标识符 $ftok = ftok(__FILE__, 'a'); 2.创建信号量资源ID $sem_resouce_id = sem

随机推荐