在Node.js中的cluster怎么使用
发布时间:2023-07-26 11:30:18 所属栏目:教程 来源:转载
导读: 为大家详细介绍“Node.js中的cluster怎么使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Node.js中的cluster怎么使用”文章能帮助大家解决疑惑,下面跟着小
为大家详细介绍“Node.js中的cluster怎么使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Node.js中的cluster怎么使用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。 当初使用 cluster 时,一直好奇它是怎么做到多个子进程监听同一个端口而不冲突的,比如下面这段代码: const cluster = require('cluster') const net = require('net') const cpus = require('os').cpus() if (cluster.isPrimary) { for (let i = 0; i < cpus.length; i++) { cluster.fork() } } else { net .createServer(function (socket) { socket.on('data', function (data) { socket.write(`Reply from ${process.pid}: ` + data.toString()) }) socket.on('end', function () { console.log('Close') }) socket.write('Hello!\n') }) .listen(9999) } 该段代码通过父进程 fork 出了多个子进程,且这些子进程都监听了 9999 这个端口并能正常提供服务,这是如何做到的呢?我们来研究一下。 准备调试环境 学习 Node.js 官方提供库最好的方式当然是调试一下,所以,我们先来准备一下环境。注:本文的操作系统为 macOS Big Sur 11.6.6,其他系统请自行准备相应环境。 编译 Node.js 下载 Node.js 源码 git clone https://github.com/nodejs/node.git 然后在下面这两个地方加入断点,方便后面调试用: // lib/internal/cluster/primary.js function queryServer(worker, message) { debugger; // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; ... } // lib/internal/cluster/child.js send(message, (reply, handle) => { debugger if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) } }) 进入目录,执行 ./configure --debug make -j4 之后会生成 out/Debug/node 准备 IDE 环境 使用 vscode 调试,配置好 launch.json 就可以了(其他 IDE 类似,请自行解决): { "version": "0.2.0", "configurations": [ { "name": "Debug C++", "type": "cppdbg", "program": "/Users/youxingzhi/ayou/node/out/Debug/node", "request": "launch", "args": ["/Users/youxingzhi/ayou/node/index.js"], "stopAtEntry": false, "cwd": "${workspaceFolder}", "environment": [], "externalConsole": false, "MIMode": "lldb" }, { "name": "Debug Node", "type": "node", "runtimeExecutable": "/Users/youxingzhi/ayou/node/out/Debug/node", "request": "launch", "args": ["--expose-internals", "--nolazy"], "skipFiles": [], "program": "${workspaceFolder}/index.js" } ] } 其中第一个是用于调式 C++ 代码(需要安装 C/C++ 插件),第二个用于调式 JS 代码。接下来就可以开始调试了,我们暂时用调式 JS 代码的那个配置就好了。 Cluster 源码调试 准备好调试代码(为了调试而已,这里启动一个子进程就够了): debugger const cluster = require('cluster') const net = require('net') if (cluster.isPrimary) { debugger cluster.fork() } else { const server = net.createServer(function (socket) { socket.on('data', function (data) { socket.write(`Reply from ${process.pid}: ` + data.toString()) }) socket.on('end', function () { console.log('Close') }) socket.write('Hello!\n') }) debugger server.listen(9999) } 很明显,我们的程序可以分父进程和子进程这两部分来进行分析。 首先进入的是父进程: 执行 require('cluster') 时,会进入 lib/cluster.js 这个文件: const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary' module.exports = require(`internal/cluster/${childOrPrimary}`) 会根据当前 process.env 上是否有 NODE_UNIQUE_ID 来引入不同的模块,此时是没有的,所以会引入 internal/cluster/primary.js 这个模块: ... const cluster = new EventEmitter(); ... module.exports = cluster const handles = new SafeMap() cluster.isWorker = false cluster.isMaster = true // Deprecated alias. Must be same as isPrimary. cluster.isPrimary = true cluster.Worker = Worker cluster.workers = {} cluster.settings = {} cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR // Primary distributes connections. ... cluster.schedulingPolicy = schedulingPolicy cluster.setupPrimary = function (options) { ... } // Deprecated alias must be same as setupPrimary cluster.setupMaster = cluster.setupPrimary function setupSettingsNT(settings) { ... } function createWorkerProcess(id, env) { ... } function removeWorker(worker) { ... } function removeHandlesForWorker(worker) { ... } cluster.fork = function (env) { ... } 该模块主要是在 cluster 对象上挂载了一些属性和方法,并导出,这些后面回过头再看,我们继续往下调试。往下调试会进入 if (cluster.isPrimary) 分支,代码很简单,仅仅是 fork 出了一个新的子进程而已: // lib/internal/cluster/primary.js cluster.fork = function (env) { cluster.setupPrimary() const id = ++ids const workerProcess = createWorkerProcess(id, env) const worker = new Worker({ id: id, process: workerProcess, }) ... worker.process.on('internalMessage', internal(worker, onmessage)) process.nextTick(emitForkNT, worker) cluster.workers[worker.id] = worker return worker } cluster.setupPrimary():比较简单,初始化一些参数啥的。 createWorkerProcess(id, env): // lib/internal/cluster/primary.js function createWorkerProcess(id, env) { const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`} const execArgv = [...cluster.settings.execArgv] ... return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid, }) } 可以看到,该方法主要是通过 fork 启动了一个子进程来执行我们的 index.js,且启动子进程的时候设置了环境变量 NODE_UNIQUE_ID,这样 index.js 中 require('cluster') 的时候,引入的就是 internal/cluster/child.js 模块了。 worker.process.on('internalMessage', internal(worker, onmessage)):监听子进程传递过来的消息并处理。 接下来就进入了子进程的逻辑: 前面说了,此时引入的是 internal/cluster/child.js 模块,我们先跳过,继续往下,执行 server.listen(9999) 时实际上是调用了 Server 上的方法: // lib/net.js Server.prototype.listen = function (...args) { ... listenInCluster( this, null, options.port | 0, 4, backlog, undefined, options.exclusive ); } 可以看到,最终是调用了 listenInCluster: // lib/net.js function listenInCluster( server, address, port, addressType, backlog, fd, exclusive, flags, options ) { exclusive = !!exclusive if (cluster === undefined) cluster = require('cluster') if (cluster.isPrimary || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) return } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, backlog, ...options, } // Get the primary's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnPrimaryHandle) function listenOnPrimaryHandle(err, handle) { err = checkBindError(err, port, handle) if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port) return server.emit('error', ex) } // Reuse primary's server handle server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) } } 由于是在子进程中执行,所以最后会调用 cluster._getServer(server, serverQuery, listenOnPrimaryHandle): // lib/internal/cluster/child.js // 这里的 cb 就是上面的 listenOnPrimaryHandle cluster._getServer = function (obj, options, cb) { ... send(message, (reply, handle) => { debugger if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) } }) ... } 该函数最终会向父进程发送 queryServer 的消息,父进程处理完后会调用回调函数,回调函数中会调用 cb 即 listenOnPrimaryHandle。看来,listen 的逻辑是在父进程中进行的了。 接下来进入父进程: 父进程收到 queryServer 的消息后,最终会调用 queryServer 这个方法: // lib/internal/cluster/primary.js function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}` let handle = handles.get(key) if (handle === undefined) { let address = message.address // Find shortest path for unix sockets because of the ~100 byte limit if ( message.port < 0 && typeof address === 'string' && process.platform !== 'win32' ) { address = path.relative(process.cwd(), address) if (message.address.length < address.length) address = message.address } // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if ( schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6' ) { handle = new SharedHandle(key, address, message) } else { handle = new RoundRobinHandle(key, address, message) } handles.set(key, handle) } ... } 可以看到,这里主要是对 handle 的处理,这里的 handle 指的是调度策略,分为 SharedHandle 和 RoundRobinHandle,分别对应抢占式和轮询两种策略(文章最后补充部分有关于两者对比的例子)。 Node.js 中默认是 RoundRobinHandle 策略,可通过环境变量 NODE_CLUSTER_SCHED_POLICY 来修改,取值可以为 none(SharedHandle) 或 rr(RoundRobinHandle)。 SharedHandle 首先,我们来看一下 SharedHandle,由于我们这里是 TCP 协议,所以最后会通过 net._createServerHandle 创建一个 TCP 对象挂载在 handle 属性上(注意这里又有一个 handle,别搞混了): // lib/internal/cluster/shared_handle.js function SharedHandle(key, address, {port, addressType, fd, flags}) { this.key = key this.workers = new SafeMap() this.handle = null this.errno = 0 let rval if (addressType === 'udp4' || addressType === 'udp6') rval = dgram._createSocketHandle(address, port, addressType, fd, flags) else rval = net._createServerHandle(address, port, addressType, fd, flags) if (typeof rval === 'number') this.errno = rval else this.handle = rval } 在 createServerHandle 中除了创建 TCP 对象外,还绑定了端口和地址: // lib/net.js function createServerHandle(address, port, addressType, fd, flags) { ... } else { handle = new TCP(TCPConstants.SERVER); isTCP = true; } if (address || port || isTCP) { ... err = handle.bind6(address, port, flags); } else { err = handle.bind(address, port); } } ... return handle; } 然后,queryServer 中继续执行,会调用 add 方法,最终会将 handle 也就是 TCP 对象传递给子进程: // lib/internal/cluster/primary.js function queryServer(worker, message) { ... if (!handle.data) handle.data = message.data // Set custom server data handle.add(worker, (errno, reply, handle) => { const {data} = handles.get(key) if (errno) handles.delete(key) // Gives other workers a chance to retry. send( worker, { errno, key, ack: message.seq, data, ...reply, }, handle // TCP 对象 ) }) ... } 之后进入子进程: 子进程收到父进程对于 queryServer 的回复后,会调用 shared: // lib/internal/cluster/child.js // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function (obj, options, cb) { ... send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data) if (handle) { // Shared listen socket shared(reply, {handle, indexesKey, index}, cb) } else { // Round-robin. rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle } }) ... } shared 中最后会调用 cb 也就是 listenOnPrimaryHandle: // lib/net.js function listenOnPrimaryHandle(err, handle) { err = checkBindError(err, port, handle) if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port) return server.emit('error', ex) } // Reuse primary's server handle 这里的 server 是 index.js 中 net.createServer 返回的那个对象 server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags) } 这里会把 handle 赋值给 server._handle,这里的 server 是 index.js 中 net.createServer 返回的那个对象,并调用 server._listen2,也就是 setupListenHandle: // lib/net.js function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd) // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already') } else { ... } this[async_id_symbol] = getNewAsyncId(this._handle) this._handle.onconnection = onconnection this._handle[owner_symbol] = this // Use a backlog of 512 entries. We pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. const err = this._handle.listen(backlog || 511) if (err) { const ex = uvExceptionWithHostPort(err, 'listen', address, port) this._handle.close() this._handle = null defaultTriggerAsyncIdScope( this[async_id_symbol], process.nextTick, emitErrorNT, this, ex ) return } } 首先会执行 this._handle.onconnection = onconnection,由于客户端请求过来时会调用 this._handle(也就是 TCP 对象)上的 onconnection 方法,也就是会执行lib/net.js 中的 onconnection 方法建立连接,之后就可以通信了。为了控制篇幅,该方法就不继续往下了。 然后调用 listen 监听,注意这里参数 backlog 跟之前不同,不是表示端口,而是表示在拒绝连接之前,操作系统可以挂起的最大连接数量,也就是连接请求的排队数量。我们平时遇到的 listen EADDRINUSE: address already in use 错误就是因为这行代码返回了非 0 的错误。 如果还有其他子进程,也会同样走一遍上述的步骤,不同之处是在主进程中 queryServer 时,由于已经有 handle 了,不需要再重新创建了: function queryServer(worker, message) { debugger; // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; let handle = handles.get(key); ... } (编辑:晋中站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐