Node.js进程管理中,如何深入理解其内部机制与长尾效应?

2026-04-06 20:301阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3113个文字,预计阅读时间需要13分钟。

Node.js进程管理中,如何深入理解其内部机制与长尾效应?

Node.js基于V8引擎,V8中的JavaScript是单线程运行的。这里的单线程指的是运行JavaScript代码时是单线程的,而不是指Node.js启动时只有一个线程。Node.js还有其他线程,例如用于执行异步IO操作的线程。

众所周知Node基于V8,而在V8中JavaScript是单线程运行的,这里的单线程不是指Node启动的时候就只有一个线程,而是说运行JavaScript代码是在单线程上,Node还有其他线程,比如进行异步IO操作的IO线程。这种单线程模型带来的好处就是系统调度过程中不会频繁进行上下文切换,提升了单核CPU的利用率。

但是这种做法有个缺陷,就是我们无法利用服务器CPU多核的性能,一个Node进程只能利用一个CPU。而且单线程模式下一旦代码崩溃就是整个程序崩溃。通常解决方案就是使用Node的cluster模块,通过master-worker模式启用多个进程实例。下面我们详细讲述下,Node如何使用多进程模型利用多核CPU,以及自带的cluster模块具体的工作原理。

如何创建子进程

node提供了child_process模块用来进行子进程的创建,该模块一共有四个方法用来创建子进程。

const { spawn, exec, execFile, fork } = require('child_process') spawn(command[, args][, options]) exec(command[, options][, callback]) execFile(file[, args][, options][, callback]) fork(modulePath[, args][, options])

spawn

首先认识一下spawn方法,下面是Node文档的官方实例。

const { spawn } = require('child_process'); const child = spawn('ls', ['-lh', '/home']); child.on('close', (code) => { console.log(`子进程退出码:$[code]`); }); const { stdin, stdout, stderr } = child stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); stderr.on('data', (data) => { console.log(`stderr: ${data}`); });

通过spawn创建的子进程,继承自EventEmitter,所以可以在上面进行事件(discounterrorclosemessage)的监听。同时子进程具有三个输入输出流:stdin、stdout、stderr,通过这三个流,可以实时获取子进程的输入输出和错误信息。

这个方法的最终实现基于libuv,这里不再展开讨论,感兴趣可以查看源码。

// 调用libuv的api,初始化一个进程 int err = uv_spawn(env->event_loop(), &wrap->process_, &options);

exec/execFile

之所以把这两个放到一起,是因为exec最后调用的就是execFile方法。唯一的区别是,exec中调用的normalizeExecArgs方法会将opts的shell属性默认设置为true。

exports.exec = function exec(/* command , options, callback */) { const opts = normalizeExecArgs.apply(null, arguments); return exports.execFile(opts.file, opts.options, opts.callback); }; function normalizeExecArgs(command, options, callback) { options = { ...options }; options.shell = typeof options.shell === 'string' ? options.shell : true; return { options }; }

在execFile中,最终调用的是spawn方法。

exports.execFile = function execFile(file /* , args, options, callback */) { let args = []; let callback; let options; var child = spawn(file, args, { // ... some options }); return child; }

exec会将spawn的输入输出流转换成String,默认使用UTF-8的编码,然后传递给回调函数,使用回调方式在node中较为熟悉,比流更容易操作,所以我们能使用exec方法执行一些shell命令,然后在回调中获取返回值。有点需要注意,这里的buffer是有最大缓存区的,如果超出会直接被kill掉,可用通过maxBuffer属性进行配置(默认: 200*1024)。

const { exec } = require('child_process'); exec('ls -lh /home', (error, stdout, stderr) => { console.log(`stdout: ${stdout}`); console.log(`stderr: ${stderr}`); });

fork

fork最后也是调用spawn来创建子进程,但是fork是spawn的一种特殊情况,用于衍生新的 Node.js 进程,会产生一个新的V8实例,所以执行fork方法时需要指定一个js文件。

exports.fork = function fork(modulePath /* , args, options */) { // ... options.shell = false; return spawn(options.execPath, args, options); };

通过fork创建子进程之后,父子进程直接会创建一个IPC(进程间通信)通道,方便父子进程直接通信,在js层使用 process.send(message)process.on('message', msg => {}) 进行通信。而在底层,实现进程间通信的方式有很多,Node的进程间通信基于libuv实现,不同操作系统实现方式不一致。在*unix系统中采用Unix Domain Socket方式实现,Windows中使用命名管道的方式实现。

常见进程间通信方式:消息队列、共享内存、pipe、信号量、套接字

下面是一个父子进程通信的实例。

parent.js

const path = require('path') const { fork } = require('child_process') const child = fork(path.join(__dirname, 'child.js')) child.on('message', msg => { console.log('message from child', msg) }); child.send('hello child, I\'m master')

child.js

process.on('message', msg => { console.log('message from master:', msg) }); let counter = 0 setInterval(() => { process.send({ child: true, counter: counter++ }) }, 1000);

小结

其实可以看到,这些方法都是对spawn方法的复用,然后spawn方法底层调用了libuv进行进程的管理,具体可以看下图。

利用fork实现master-worker模型

首先来看看,如果我们在child.js中启动一个node; # 这里要和最上面upstream后的应用名一致,可以自定义 } } }

小结

如果我们自己用Node原生来实现一个多进程模型,存在这样或者那样的问题,虽然最终我们借助了nginx达到了这个目的,但是使用nginx的话,我们需要另外维护一套nginx的配置,而且如果有一个Node服务挂了,nginx并不知道,还是会将请求转发到那个端口。

cluster模块

除了用nginx做反向代理,node本身也提供了一个cluster模块,用于多核CPU环境下多进程的负载均衡。cluster模块创建子进程本质上是通过child_procee.fork,利用该模块可以很容易的创建共享同一端口的子进程服务器。

上手指南

有了这个模块,你会感觉实现Node的单机集群是多么容易的一件事情。下面看看官方实例,短短的十几行代码就实现了一个多进程的Node服务,且自带负载均衡。

const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // 判断是否为主进程 console.log(`主进程 ${process.pid} 正在运行`); // 衍生工作进程。 for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`工作进程 ${worker.process.pid} 已退出`); }); } else { // 子进程进行服务器创建 // 工作进程可以共享任何 TCP 连接。 // 在本例子中,共享的是一个 HTTP 服务器。 http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`工作进程 ${process.pid} 已启动`); }

cluster模块源码分析

首先看代码,通过isMaster来判断是否为主进程,如果是主进程进行fork操作,子进程创建服务器。这里cluster进行fork操作时,执行的是当前文件。cluster.fork最终调用的child_process.fork,且第一个参数为process.argv.slice(2),在fork子进程之后,会对其internalMessage事件进行监听,这个后面会提到,具体代码如下:

const { fork } = require('child_process'); cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; const workerProcess = createWorkerProcess(id, env); const worker = new Worker({ id: id, process: workerProcess }); // 监听子进程的消息 worker.process.on('internalMessage', internal(worker, onmessage)); // ... }; // 配置master进程 cluster.setupMaster = function(options) { cluster.settings = { args: process.argv.slice(2), exec: process.argv[1], execArgv: process.execArgv, silent: false, ...cluster.settings, ...options }; }; // 创建子进程 function createWorkerProcess(id, env) { return fork(cluster.settings.exec, cluster.settings.args, { // some options }); }

子进程端口监听问题

这里会有一个问题,子进程全部都在监听同一个端口,我们之前已经试验过,服务监听同一个端口会出现端口占用的问题,那么cluster模块如何保证端口不冲突的呢? 查阅源码发现,http模块的createServer继承自net模块。

util.inherits(Server, net.Server);

而在net模块中,listen方法会调用listenInCluster方法,listenInCluster判断当前是否为master进程。

lib/net.js

Server.prototype.listen = function(...args) { // ... if (typeof options.port === 'number' || typeof options.port === 'string') { // 如果listen方法只传入了端口号,最后会走到这里 listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); return this; } // ... }; function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { if (cluster === undefined) cluster = require('cluster'); if (cluster.isMaster) { // 如果是主进程则启动一个服务 // 但是主进程没有调用过listen方法,所以没有走这里一步 server._listen2(address, port, addressType, backlog, fd, flags); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // 子进程获取主进程服务的句柄 cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { server._handle = handle; // 重写handle,对listen方法进行了hack server._listen2(address, port, addressType, backlog, fd, flags); } }

看上面代码可以知道,真正启动服务的方法为server._listen2。在_listen2方法中,最终调用的是_handle下的listen方法。

function setupListenHandle(address, port, addressType, backlog, fd, flags) { // ... this._handle.onconnection = onconnection; var err = this._handle.listen(backlog || 511); // ... } Server.prototype._listen2 = setupListenHandle; // legacy alias

那么cluster._getServer方法到底做了什么呢?

搜寻它的源码,首先向master进程发送了一个消息,消息类型为queryServer

// child.js cluster._getServer = function(obj, options, cb) { // ... const message = { act: 'queryServer', index, data: null, ...options }; // 发送消息到master进程,消息类型为 queryServer send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); // ... };

这里的rr方法,对前面提到的_handle.listen进行了hack,所有子进程的listen其实是不起作用的。

function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { // listen方法直接返回0,不再进行端口监听 return 0; } function close() { send({ act: 'close', key }); } function getsockname(out) { return 0; } const handle = { close, listen, ref: noop, unref: noop }; handles.set(key, handle); // 根据key将工作进程的 handle 进行缓存 cb(0, handle); } // 这里的cb回调就是前面_getServer方法传入的。 参考之前net模块的listen方法 function listenOnMasterHandle(err, handle) { server._handle = handle; // 重写handle,对listen方法进行了hack // 该方法调用后,会对handle绑定一个 onconnection 方法,最后会进行调用 server._listen2(address, port, addressType, backlog, fd, flags); }

主进程与子进程通信

那么到底在哪里对端口进行了监听呢?

前面提到过,fork子进程的时候,对子进程进行了internalMessage事件的监听。

worker.process.on('internalMessage', internal(worker, onmessage));

子进程向master进程发送消息,一般使用process.send方法,会被监听的message事件所接收。这里是因为发送的message指定了cmd: 'NODE_CLUSTER',只要cmd字段以NODE_开头,这样消息就会认为是内部通信,被internalMessage事件所接收。

Node.js进程管理中,如何深入理解其内部机制与长尾效应?

// child.js function send(message, cb) { return sendHelper(process, message, null, cb); } // utils.js function sendHelper(proc, message, handle, cb) { if (!proc.connected) return false; // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js message = { cmd: 'NODE_CLUSTER', ...message, seq }; if (typeof cb === 'function') callbacks.set(seq, cb); seq += 1; return proc.send(message, handle); }

master进程接收到消息后,根据act的类型开始执行不同的方法,这里act为queryServer。queryServer方法会构造一个key,如果这个key(规则主要为地址+端口+文件描述符)之前不存在,则对RoundRobinHandle构造函数进行了实例化,RoundRobinHandle构造函数中启动了一个TCP服务,并对之前指定的端口进行了监听。

// master.js const handles = new Map(); function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); else if (message.act === 'queryServer') queryServer(worker, message); // other act logic } function queryServer(worker, message) { // ... const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; var handle = handles.get(key); // 如果之前没有对该key进行实例化,则进行实例化 if (handle === undefined) { let address = message.address; // const RoundRobinHandle = require('internal/cluster/round_robin_handle'); var constructor = RoundRobinHandle; handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); handles.set(key, handle); } // ... } // internal/cluster/round_robin_handle function RoundRobinHandle(key, address, port, addressType, fd, flags) { this.server = net.createServer(assert.fail); // 这里启动一个TCP服务器 this.server.listen({ port, host }); // TCP服务器启动时的事件 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); }); // ... }

可以看到TCP服务启动后,立马对connection事件进行了监听,会调用RoundRobinHandle的distribute方法。

// RoundRobinHandle this.handle.onconnection = (err, handle) => this.distribute(err, handle); // distribute 对工作进程进行分发 RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); // 存入TCP服务的句柄 const worker = this.free.shift(); // 取出第一个工作进程 if (worker) this.handoff(worker); // 切换到工作进程 }; RoundRobinHandle.prototype.handoff = function(worker) { const handle = this.handles.shift(); // 获取TCP服务句柄 if (handle === undefined) { this.free.push(worker); // 将该工作进程重新放入队列中 return; } const message = { act: 'newconn', key: this.key }; // 向工作进程发送一个类型为 newconn 的消息以及TCP服务的句柄 sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // 工作进程不能正常运行,启动下一个 this.handoff(worker); }); };

在子进程中也有对内部消息进行监听,在cluster/child.js中,有个cluster._setupWorker方法,该方法会对内部消息监听,该方法的在lib/internal/bootstrap/node.js中调用,这个文件是每次启动node命令后,由C++模块调用的。

链接

function startup() { // ... startExecution(); } function startExecution() { // ... prepareUserCodeExecution(); } function prepareUserCodeExecution() { if (process.argv[1] && process.env.NODE_UNIQUE_ID) { const cluster = NativeModule.require('cluster'); cluster._setupWorker(); delete process.env.NODE_UNIQUE_ID; } } startup()

下面看看_setupWorker方法做了什么。

cluster._setupWorker = function() { // ... process.on('internalMessage', internal(worker, onmessage)); function onmessage(message, handle) { // 如果act为 newconn 调用onconnection方法 if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; function onconnection(message, handle) { const key = message.key; const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); // 调用net中的onconnection方法 }

最后子进程获取到客户端句柄后,调用net模块的onconnection,对Socket进行实例化,后面就与其他http请求的逻辑一致了,不再细讲。

至此,cluster模块的逻辑就走通了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持自由互联。

本文共计3113个文字,预计阅读时间需要13分钟。

Node.js进程管理中,如何深入理解其内部机制与长尾效应?

Node.js基于V8引擎,V8中的JavaScript是单线程运行的。这里的单线程指的是运行JavaScript代码时是单线程的,而不是指Node.js启动时只有一个线程。Node.js还有其他线程,例如用于执行异步IO操作的线程。

众所周知Node基于V8,而在V8中JavaScript是单线程运行的,这里的单线程不是指Node启动的时候就只有一个线程,而是说运行JavaScript代码是在单线程上,Node还有其他线程,比如进行异步IO操作的IO线程。这种单线程模型带来的好处就是系统调度过程中不会频繁进行上下文切换,提升了单核CPU的利用率。

但是这种做法有个缺陷,就是我们无法利用服务器CPU多核的性能,一个Node进程只能利用一个CPU。而且单线程模式下一旦代码崩溃就是整个程序崩溃。通常解决方案就是使用Node的cluster模块,通过master-worker模式启用多个进程实例。下面我们详细讲述下,Node如何使用多进程模型利用多核CPU,以及自带的cluster模块具体的工作原理。

如何创建子进程

node提供了child_process模块用来进行子进程的创建,该模块一共有四个方法用来创建子进程。

const { spawn, exec, execFile, fork } = require('child_process') spawn(command[, args][, options]) exec(command[, options][, callback]) execFile(file[, args][, options][, callback]) fork(modulePath[, args][, options])

spawn

首先认识一下spawn方法,下面是Node文档的官方实例。

const { spawn } = require('child_process'); const child = spawn('ls', ['-lh', '/home']); child.on('close', (code) => { console.log(`子进程退出码:$[code]`); }); const { stdin, stdout, stderr } = child stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); stderr.on('data', (data) => { console.log(`stderr: ${data}`); });

通过spawn创建的子进程,继承自EventEmitter,所以可以在上面进行事件(discounterrorclosemessage)的监听。同时子进程具有三个输入输出流:stdin、stdout、stderr,通过这三个流,可以实时获取子进程的输入输出和错误信息。

这个方法的最终实现基于libuv,这里不再展开讨论,感兴趣可以查看源码。

// 调用libuv的api,初始化一个进程 int err = uv_spawn(env->event_loop(), &wrap->process_, &options);

exec/execFile

之所以把这两个放到一起,是因为exec最后调用的就是execFile方法。唯一的区别是,exec中调用的normalizeExecArgs方法会将opts的shell属性默认设置为true。

exports.exec = function exec(/* command , options, callback */) { const opts = normalizeExecArgs.apply(null, arguments); return exports.execFile(opts.file, opts.options, opts.callback); }; function normalizeExecArgs(command, options, callback) { options = { ...options }; options.shell = typeof options.shell === 'string' ? options.shell : true; return { options }; }

在execFile中,最终调用的是spawn方法。

exports.execFile = function execFile(file /* , args, options, callback */) { let args = []; let callback; let options; var child = spawn(file, args, { // ... some options }); return child; }

exec会将spawn的输入输出流转换成String,默认使用UTF-8的编码,然后传递给回调函数,使用回调方式在node中较为熟悉,比流更容易操作,所以我们能使用exec方法执行一些shell命令,然后在回调中获取返回值。有点需要注意,这里的buffer是有最大缓存区的,如果超出会直接被kill掉,可用通过maxBuffer属性进行配置(默认: 200*1024)。

const { exec } = require('child_process'); exec('ls -lh /home', (error, stdout, stderr) => { console.log(`stdout: ${stdout}`); console.log(`stderr: ${stderr}`); });

fork

fork最后也是调用spawn来创建子进程,但是fork是spawn的一种特殊情况,用于衍生新的 Node.js 进程,会产生一个新的V8实例,所以执行fork方法时需要指定一个js文件。

exports.fork = function fork(modulePath /* , args, options */) { // ... options.shell = false; return spawn(options.execPath, args, options); };

通过fork创建子进程之后,父子进程直接会创建一个IPC(进程间通信)通道,方便父子进程直接通信,在js层使用 process.send(message)process.on('message', msg => {}) 进行通信。而在底层,实现进程间通信的方式有很多,Node的进程间通信基于libuv实现,不同操作系统实现方式不一致。在*unix系统中采用Unix Domain Socket方式实现,Windows中使用命名管道的方式实现。

常见进程间通信方式:消息队列、共享内存、pipe、信号量、套接字

下面是一个父子进程通信的实例。

parent.js

const path = require('path') const { fork } = require('child_process') const child = fork(path.join(__dirname, 'child.js')) child.on('message', msg => { console.log('message from child', msg) }); child.send('hello child, I\'m master')

child.js

process.on('message', msg => { console.log('message from master:', msg) }); let counter = 0 setInterval(() => { process.send({ child: true, counter: counter++ }) }, 1000);

小结

其实可以看到,这些方法都是对spawn方法的复用,然后spawn方法底层调用了libuv进行进程的管理,具体可以看下图。

利用fork实现master-worker模型

首先来看看,如果我们在child.js中启动一个node; # 这里要和最上面upstream后的应用名一致,可以自定义 } } }

小结

如果我们自己用Node原生来实现一个多进程模型,存在这样或者那样的问题,虽然最终我们借助了nginx达到了这个目的,但是使用nginx的话,我们需要另外维护一套nginx的配置,而且如果有一个Node服务挂了,nginx并不知道,还是会将请求转发到那个端口。

cluster模块

除了用nginx做反向代理,node本身也提供了一个cluster模块,用于多核CPU环境下多进程的负载均衡。cluster模块创建子进程本质上是通过child_procee.fork,利用该模块可以很容易的创建共享同一端口的子进程服务器。

上手指南

有了这个模块,你会感觉实现Node的单机集群是多么容易的一件事情。下面看看官方实例,短短的十几行代码就实现了一个多进程的Node服务,且自带负载均衡。

const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // 判断是否为主进程 console.log(`主进程 ${process.pid} 正在运行`); // 衍生工作进程。 for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`工作进程 ${worker.process.pid} 已退出`); }); } else { // 子进程进行服务器创建 // 工作进程可以共享任何 TCP 连接。 // 在本例子中,共享的是一个 HTTP 服务器。 http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`工作进程 ${process.pid} 已启动`); }

cluster模块源码分析

首先看代码,通过isMaster来判断是否为主进程,如果是主进程进行fork操作,子进程创建服务器。这里cluster进行fork操作时,执行的是当前文件。cluster.fork最终调用的child_process.fork,且第一个参数为process.argv.slice(2),在fork子进程之后,会对其internalMessage事件进行监听,这个后面会提到,具体代码如下:

const { fork } = require('child_process'); cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; const workerProcess = createWorkerProcess(id, env); const worker = new Worker({ id: id, process: workerProcess }); // 监听子进程的消息 worker.process.on('internalMessage', internal(worker, onmessage)); // ... }; // 配置master进程 cluster.setupMaster = function(options) { cluster.settings = { args: process.argv.slice(2), exec: process.argv[1], execArgv: process.execArgv, silent: false, ...cluster.settings, ...options }; }; // 创建子进程 function createWorkerProcess(id, env) { return fork(cluster.settings.exec, cluster.settings.args, { // some options }); }

子进程端口监听问题

这里会有一个问题,子进程全部都在监听同一个端口,我们之前已经试验过,服务监听同一个端口会出现端口占用的问题,那么cluster模块如何保证端口不冲突的呢? 查阅源码发现,http模块的createServer继承自net模块。

util.inherits(Server, net.Server);

而在net模块中,listen方法会调用listenInCluster方法,listenInCluster判断当前是否为master进程。

lib/net.js

Server.prototype.listen = function(...args) { // ... if (typeof options.port === 'number' || typeof options.port === 'string') { // 如果listen方法只传入了端口号,最后会走到这里 listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); return this; } // ... }; function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { if (cluster === undefined) cluster = require('cluster'); if (cluster.isMaster) { // 如果是主进程则启动一个服务 // 但是主进程没有调用过listen方法,所以没有走这里一步 server._listen2(address, port, addressType, backlog, fd, flags); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // 子进程获取主进程服务的句柄 cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { server._handle = handle; // 重写handle,对listen方法进行了hack server._listen2(address, port, addressType, backlog, fd, flags); } }

看上面代码可以知道,真正启动服务的方法为server._listen2。在_listen2方法中,最终调用的是_handle下的listen方法。

function setupListenHandle(address, port, addressType, backlog, fd, flags) { // ... this._handle.onconnection = onconnection; var err = this._handle.listen(backlog || 511); // ... } Server.prototype._listen2 = setupListenHandle; // legacy alias

那么cluster._getServer方法到底做了什么呢?

搜寻它的源码,首先向master进程发送了一个消息,消息类型为queryServer

// child.js cluster._getServer = function(obj, options, cb) { // ... const message = { act: 'queryServer', index, data: null, ...options }; // 发送消息到master进程,消息类型为 queryServer send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); // ... };

这里的rr方法,对前面提到的_handle.listen进行了hack,所有子进程的listen其实是不起作用的。

function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { // listen方法直接返回0,不再进行端口监听 return 0; } function close() { send({ act: 'close', key }); } function getsockname(out) { return 0; } const handle = { close, listen, ref: noop, unref: noop }; handles.set(key, handle); // 根据key将工作进程的 handle 进行缓存 cb(0, handle); } // 这里的cb回调就是前面_getServer方法传入的。 参考之前net模块的listen方法 function listenOnMasterHandle(err, handle) { server._handle = handle; // 重写handle,对listen方法进行了hack // 该方法调用后,会对handle绑定一个 onconnection 方法,最后会进行调用 server._listen2(address, port, addressType, backlog, fd, flags); }

主进程与子进程通信

那么到底在哪里对端口进行了监听呢?

前面提到过,fork子进程的时候,对子进程进行了internalMessage事件的监听。

worker.process.on('internalMessage', internal(worker, onmessage));

子进程向master进程发送消息,一般使用process.send方法,会被监听的message事件所接收。这里是因为发送的message指定了cmd: 'NODE_CLUSTER',只要cmd字段以NODE_开头,这样消息就会认为是内部通信,被internalMessage事件所接收。

Node.js进程管理中,如何深入理解其内部机制与长尾效应?

// child.js function send(message, cb) { return sendHelper(process, message, null, cb); } // utils.js function sendHelper(proc, message, handle, cb) { if (!proc.connected) return false; // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js message = { cmd: 'NODE_CLUSTER', ...message, seq }; if (typeof cb === 'function') callbacks.set(seq, cb); seq += 1; return proc.send(message, handle); }

master进程接收到消息后,根据act的类型开始执行不同的方法,这里act为queryServer。queryServer方法会构造一个key,如果这个key(规则主要为地址+端口+文件描述符)之前不存在,则对RoundRobinHandle构造函数进行了实例化,RoundRobinHandle构造函数中启动了一个TCP服务,并对之前指定的端口进行了监听。

// master.js const handles = new Map(); function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); else if (message.act === 'queryServer') queryServer(worker, message); // other act logic } function queryServer(worker, message) { // ... const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; var handle = handles.get(key); // 如果之前没有对该key进行实例化,则进行实例化 if (handle === undefined) { let address = message.address; // const RoundRobinHandle = require('internal/cluster/round_robin_handle'); var constructor = RoundRobinHandle; handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); handles.set(key, handle); } // ... } // internal/cluster/round_robin_handle function RoundRobinHandle(key, address, port, addressType, fd, flags) { this.server = net.createServer(assert.fail); // 这里启动一个TCP服务器 this.server.listen({ port, host }); // TCP服务器启动时的事件 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); }); // ... }

可以看到TCP服务启动后,立马对connection事件进行了监听,会调用RoundRobinHandle的distribute方法。

// RoundRobinHandle this.handle.onconnection = (err, handle) => this.distribute(err, handle); // distribute 对工作进程进行分发 RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); // 存入TCP服务的句柄 const worker = this.free.shift(); // 取出第一个工作进程 if (worker) this.handoff(worker); // 切换到工作进程 }; RoundRobinHandle.prototype.handoff = function(worker) { const handle = this.handles.shift(); // 获取TCP服务句柄 if (handle === undefined) { this.free.push(worker); // 将该工作进程重新放入队列中 return; } const message = { act: 'newconn', key: this.key }; // 向工作进程发送一个类型为 newconn 的消息以及TCP服务的句柄 sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // 工作进程不能正常运行,启动下一个 this.handoff(worker); }); };

在子进程中也有对内部消息进行监听,在cluster/child.js中,有个cluster._setupWorker方法,该方法会对内部消息监听,该方法的在lib/internal/bootstrap/node.js中调用,这个文件是每次启动node命令后,由C++模块调用的。

链接

function startup() { // ... startExecution(); } function startExecution() { // ... prepareUserCodeExecution(); } function prepareUserCodeExecution() { if (process.argv[1] && process.env.NODE_UNIQUE_ID) { const cluster = NativeModule.require('cluster'); cluster._setupWorker(); delete process.env.NODE_UNIQUE_ID; } } startup()

下面看看_setupWorker方法做了什么。

cluster._setupWorker = function() { // ... process.on('internalMessage', internal(worker, onmessage)); function onmessage(message, handle) { // 如果act为 newconn 调用onconnection方法 if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; function onconnection(message, handle) { const key = message.key; const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); // 调用net中的onconnection方法 }

最后子进程获取到客户端句柄后,调用net模块的onconnection,对Socket进行实例化,后面就与其他http请求的逻辑一致了,不再细讲。

至此,cluster模块的逻辑就走通了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持自由互联。