详解通过源码解析Node.js中cluster模块的主要功能实现

众所周知,Node.js中的JavaScript代码执行在单线程中,非常脆弱,一旦出现了未捕获的异常,那么整个应用就会崩溃。这在许多场景下,尤其是web应用中,是无法忍受的。通常的解决方案,便是使用Node.js中自带的cluster模块,以master-worker模式启动多个应用实例。然而大家在享受cluster模块带来的福祉的同时,不少人也开始好奇:

  1. 为什么我的应用代码中明明有app.listen(port);,但cluter模块在多次fork这份代码时,却没有报端口已被占用?
  2. Master是如何将接收的请求传递至worker中进行处理然后响应的?

让我们从Node.js项目的lib/cluster.js中的代码里,来一勘究竟。

问题一

为了得到这个问题的解答,我们先从worker进程的初始化看起,master进程在fork工作进程时,会为其附上环境变量NODE_UNIQUE_ID,是一个从零开始的递增数:

// lib/cluster.js
// ...

function createWorkerProcess(id, env) {
 // ...
 workerEnv.NODE_UNIQUE_ID = '' + id;

 // ...
 return fork(cluster.settings.exec, cluster.settings.args, {
  env: workerEnv,
  silent: cluster.settings.silent,
  execArgv: execArgv,
  gid: cluster.settings.gid,
  uid: cluster.settings.uid
 });
}

随后Node.js在初始化时,会根据该环境变量,来判断该进程是否为cluster模块fork出的工作进程,若是,则执行workerInit()函数来初始化环境,否则执行masterInit()函数。

在workerInit()函数中,定义了cluster._getServer方法,这个方法在任何net.Server实例的listen方法中,会被调用:

// lib/net.js
// ...

function listen(self, address, port, addressType, backlog, fd, exclusive) {
 exclusive = !!exclusive;

 if (!cluster) cluster = require('cluster');

 if (cluster.isMaster || exclusive) {
  self._listen2(address, port, addressType, backlog, fd);
  return;
 }

 cluster._getServer(self, {
  address: address,
  port: port,
  addressType: addressType,
  fd: fd,
  flags: 0
 }, cb);

 function cb(err, handle) {
  // ...

  self._handle = handle;
  self._listen2(address, port, addressType, backlog, fd);
 }
}

你可能已经猜到,问题一的答案,就在这个cluster._getServer函数的代码中。它主要干了两件事:

  1. 向master进程注册该worker,若master进程是第一次接收到监听此端口/描述符下的worker,则起一个内部TCP服务器,来承担监听该端口/描述符的职责,随后在master中记录下该worker。
  2. Hack掉worker进程中的net.Server实例的listen方法里监听端口/描述符的部分,使其不再承担该职责。

对于第一件事,由于master在接收,传递请求给worker时,会符合一定的负载均衡规则(在非Windows平台下默认为轮询),这些逻辑被封装在RoundRobinHandle类中。故,初始化内部TCP服务器等操作也在此处:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
 // ...
 this.handles = [];
 this.handle = null;
 this.server = net.createServer(assert.fail);

 if (fd >= 0)
  this.server.listen({ fd: fd });
 else if (port >= 0)
  this.server.listen(port, address);
 else
  this.server.listen(address); // UNIX socket path.

 /// ...
}

对于第二件事,由于net.Server实例的listen方法,最终会调用自身_handle属性下listen方法来完成监听动作,故在代码中修改之:

// lib/cluster.js
// ...

function rr(message, cb) {
 // ...
 // 此处的listen函数不再做任何监听动作
 function listen(backlog) {
  return 0;
 }

 function close() {
  // ...
 }
 function ref() {}
 function unref() {}

 var handle = {
  close: close,
  listen: listen,
  ref: ref,
  unref: unref,
 };
 // ...
 handles[key] = handle;
 cb(0, handle); // 传入这个cb中的handle将会被赋值给net.Server实例中的_handle属性
}

// lib/net.js
// ...
function listen(self, address, port, addressType, backlog, fd, exclusive) {
 // ...

 if (cluster.isMaster || exclusive) {
  self._listen2(address, port, addressType, backlog, fd);
  return; // 仅在worker环境下改变
 }

 cluster._getServer(self, {
  address: address,
  port: port,
  addressType: addressType,
  fd: fd,
  flags: 0
 }, cb);

 function cb(err, handle) {
  // ...
  self._handle = handle;
  // ...
 }
}

至此,第一个问题便已豁然开朗了,总结下:

  1. 端口仅由master进程中的内部TCP服务器监听了一次。
  2. 不会出现端口被重复监听报错,是由于,worker进程中,最后执行监听端口操作的方法,已被cluster模块主动hack。

问题二

解决了问题一,问题二的解决就明朗轻松许多了。通过问题一我们已得知,监听端口的是master进程中创建的内部TCP服务器,所以第二个问题的解决,着手点就是该内部TCP服务器接手连接时,执行的操作。Cluster模块的做法是,监听该内部TCP服务器的connection事件,在监听器函数里,有负载均衡地挑选出一个worker,向其发送newconn内部消息(消息体对象中包含cmd: 'NODE_CLUSTER'属性)以及一个客户端句柄(即connection事件处理函数的第二个参数),相关代码如下:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
 // ...
 this.server = net.createServer(assert.fail);
 // ...

 var self = this;
 this.server.once('listening', function() {
  // ...
  self.handle.onconnection = self.distribute.bind(self);
 });
}

RoundRobinHandle.prototype.distribute = function(err, handle) {
 this.handles.push(handle);
 var worker = this.free.shift();
 if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
 // ...
 var message = { act: 'newconn', key: this.key };
 var self = this;
 sendHelper(worker.process, message, handle, function(reply) {
  // ...
 });
};

Worker进程在接收到了newconn内部消息后,根据传递过来的句柄,调用实际的业务逻辑处理并返回:

// lib/cluster.js
// ...

// 该方法会在Node.js初始化时由 src/node.js 调用
cluster._setupWorker = function() {
 // ...
 process.on('internalMessage', internal(worker, onmessage));

 // ...
 function onmessage(message, handle) {
  if (message.act === 'newconn')
   onconnection(message, handle);
  // ...
 }
};

function onconnection(message, handle) {
 // ...
 var accepted = server !== undefined;
 // ...
 if (accepted) server.onconnection(0, handle);
}

至此,问题二也得到了解决,也总结一下:

  1. 所有请求先同一经过内部TCP服务器。
  2. 在内部TCP服务器的请求处理逻辑中,有负载均衡地挑选出一个worker进程,将其发送一个newconn内部消息,随消息发送客户端句柄。
  3. Worker进程接收到此内部消息,根据客户端句柄创建net.Socket实例,执行具体业务逻辑,返回。

最后

Node.js中的cluster模块除了上述提到的功能外,其实还提供了非常丰富的API供master和worker进程之前通信,对于不同的操作系统平台,也提供了不同的默认行为。本文仅挑选了一条功能线进行了分析阐述。如果大家有闲,非常推荐完整领略一下cluster模块的代码实现。

参考:

https://github.com/nodejs/node/blob/master/lib/cluster.js
https://github.com/nodejs/node/blob/master/lib/net.js

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • node.js中cluster的使用教程

    本文主要给大家介绍了关于node.js中cluster使用的相关教程,分享出来供大家参考学习,下面来看看详细的介绍: 一.使用NODE中cluster利用多核CPU var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; if (cluster.isMaster) { // 创建工作进程 for (var i = 0; i < numCPUs;

  • Node.js中多进程模块Cluster的介绍与使用

    前言 我们都知道nodejs最大的特点就是单进程.无阻塞运行,并且是异步事件驱动的.Nodejs的这些特性能够很好的解决一些问题,例如在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.既然Node.js采用单进程.单线程模式,那么在如今多核硬件流行的环境中,单核性能出色的Nodejs如何利用多核CPU呢?创始人Ryan Dahl建议,运行多个Nodejs进程,利用某些通信机制来协调各项任务.目前,已

  • 深入剖析Node.js cluster模块

    cluster模块概览 node实例是单线程作业的.在服务端编程中,通常会创建多个node实例来处理客户端的请求,以此提升系统的吞吐率.对这样多个node实例,我们称之为cluster(集群). 借助node的cluster模块,开发者可以在几乎不修改原有项目代码的前提下,获得集群服务带来的好处. 集群有以下两种常见的实现方案,而node自带的cluster模块,采用了方案二. 方案一:多个node实例+多个端口 集群内的node实例,各自监听不同的端口,再由反向代理实现请求到多个端口的分发.

  • 浅谈node中的cluster集群

    结论 虽然平常通过设置为CPU进程数的工作进程,但是可以超过这个数,并且并不是主进程先创建 if (cluster.isMaster) { // 循环 fork 任务 CPU i5-7300HQ 四核四进程 for (let i = 0; i < 6; i++) { cluster.fork() } console.log(chalk.green(`主进程运行在${process.pid}`)) } else { app.listen(1314) // export app 一个 Koa 服务器

  • node 利用进程通信实现Cluster共享内存

    Node.js的标准API没有提供进程共享内存,然而通过IPC接口的send方法和对message事件的监听,就可以实现一个多进程之间的协同机制,通过通信来操作共享内存. ##IPC的基本用法: // worker进程 发送消息 process.send('读取共享内存'); // master进程 接收消息 -> 处理 -> 发送回信 cluster.on('online', function (worker) { // 有worker进程建立,即开始监听message事件 worker.o

  • Nodejs中解决cluster模块的多进程如何共享数据问题

    前述 nodejs在v0.6.x之后增加了一个模块cluster用于实现多进程,利用child_process模块来创建和管理进程,增加程序在多核CPU机器上的性能表现.本文将介绍利用cluster模块创建的多线程如何共享数据的问题. 进程间数据共享 首先举个简单的例子,代码如下: var cluster = require('cluster'); var data = 0;//这里定义数据不会被所有进程共享,各个进程有各自的内存区域 if (cluster.isMaster) { //主进程

  • node.js使用cluster实现多进程

    首先郑重声明: nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! 重要的事情说3遍. 因为nodeJS天生自带buff, 所以从一出生就受到 万千 粉丝的追捧(俺,也是它的死忠). 但是,傻逼php 竟然嘲笑 我大NodeJS 的性能. 说不稳定,不可靠,只能利用单核CPU. 辣鸡 nodeJS. 艹!艹!艹! 搞mo shi~ 但,大哥就是大哥,nodeJS在v0.8 的时候就已经加入了cluster

  • 使用cluster 将自己的Node服务器扩展为多线程服务器

    用nodejs的朋友都有了解,node是单线程的,也就是说跑在8核CPU上,只能使用一个核的算力. 单线程一直是node的一个诟病,但随着0.6版本中引入cluster之后,这个情况则得到了改变,开发人员可以依靠cluster很轻松的将自己的Node服务器扩展为多线程服务器了. 什么是Cluster cluster是node提供的一个多线程库,用户可以使用它来创建多个线程,线程之间共享一个监听端口,当有外部请求这个端口时,cluster会将请求转发到随机线程里.因为每个node线程都会占用几十兆

  • Node学习记录之cluster模块

    在如今机器的CPU都是多核的背景下,Node的单线程设计已经没法更充分的"压榨"机器性能了.所以从v0.8开始,Node新增了一个内置模块--"cluster",故名思议,它可以通过一个父进程管理一坨子进程的方式来实现集群的功能. var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; // 获取CPU的个数 if

  • Node.js中的cluster模块深入解读

    预备知识 在如今机器的CPU都是多核的背景下,Node的单线程设计已经没法更充分的"压榨"机器性能了.所以从v0.8开始,Node新增了一个内置模块--"cluster",故名思议,它可以通过一个父进程管理一坨子进程的方式来实现集群的功能. 学习cluster之前,需要了解process相关的知识,如果不了解的话建议先阅读process模块.child_process模块. cluster借助child_process模块的fork()方法来创建子进程,通过fork

随机推荐