| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- /**
- * A wrapper around a third-party child process worker pool implementation.
- * Used by {@link module:buffered-runner}.
- * @private
- * @module buffered-worker-pool
- */
- 'use strict';
- /**
- * @typedef {import('workerpool').WorkerPoolOptions} WorkerPoolOptions
- * @typedef {import('../types.d.ts').MochaOptions} MochaOptions
- * @typedef {import('../types.d.ts').SerializedWorkerResult} SerializedWorkerResult
- */
- const serializeJavascript = require('serialize-javascript');
- const workerpool = require('workerpool');
- const {deserialize} = require('./serializer');
- const debug = require('debug')('mocha:parallel:buffered-worker-pool');
- const {createInvalidArgumentTypeError} = require('../errors');
- const WORKER_PATH = require.resolve('./worker.js');
- /**
- * A mapping of Mocha `Options` objects to serialized values.
- *
- * This is helpful because we tend to same the same options over and over
- * over IPC.
- * @type {WeakMap<MochaOptions,string>}
- */
- let optionsCache = new WeakMap();
- /**
- * These options are passed into the [workerpool](https://npm.im/workerpool) module.
- * @type {Partial<WorkerPoolOptions>}
- */
- const WORKER_POOL_DEFAULT_OPTS = {
- // use child processes, not worker threads!
- workerType: 'process',
- // ensure the same flags sent to `node` for this `mocha` invocation are passed
- // along to children
- forkOpts: {execArgv: process.execArgv},
- maxWorkers: workerpool.cpus - 1
- };
- /**
- * A wrapper around a third-party worker pool implementation.
- * @private
- */
- class BufferedWorkerPool {
- /**
- * Creates an underlying worker pool instance; determines max worker count
- * @param {Partial<WorkerPoolOptions>} [opts] - Options
- */
- constructor(opts = {}) {
- const maxWorkers = Math.max(
- 1,
- typeof opts.maxWorkers === 'undefined'
- ? WORKER_POOL_DEFAULT_OPTS.maxWorkers
- : opts.maxWorkers
- );
- /* istanbul ignore next */
- if (workerpool.cpus < 2) {
- // TODO: decide whether we should warn
- debug(
- 'not enough CPU cores available to run multiple jobs; avoid --parallel on this machine'
- );
- } else if (maxWorkers >= workerpool.cpus) {
- // TODO: decide whether we should warn
- debug(
- '%d concurrent job(s) requested, but only %d core(s) available',
- maxWorkers,
- workerpool.cpus
- );
- }
- /* istanbul ignore next */
- debug(
- 'run(): starting worker pool of max size %d, using node args: %s',
- maxWorkers,
- process.execArgv.join(' ')
- );
- let counter = 0;
- const onCreateWorker = ({forkOpts}) => {
- return {
- forkOpts: {
- ...forkOpts,
- // adds an incremental id to all workers, which can be useful to allocate resources for each process
- env: {...process.env, MOCHA_WORKER_ID: counter++}
- }
- };
- };
- this.options = {
- ...WORKER_POOL_DEFAULT_OPTS,
- ...opts,
- maxWorkers,
- onCreateWorker
- };
- this._pool = workerpool.pool(WORKER_PATH, this.options);
- }
- /**
- * Terminates all workers in the pool.
- * @param {boolean} [force] - Whether to force-kill workers. By default, lets workers finish their current task before termination.
- * @private
- * @returns {Promise<void>}
- */
- async terminate(force = false) {
- /* istanbul ignore next */
- debug('terminate(): terminating with force = %s', force);
- return this._pool.terminate(force);
- }
- /**
- * Adds a test file run to the worker pool queue for execution by a worker process.
- *
- * Handles serialization/deserialization.
- *
- * @param {string} filepath - Filepath of test
- * @param {MochaOptions} [options] - Options for Mocha instance
- * @private
- * @returns {Promise<SerializedWorkerResult>}
- */
- async run(filepath, options = {}) {
- if (!filepath || typeof filepath !== 'string') {
- throw createInvalidArgumentTypeError(
- 'Expected a non-empty filepath',
- 'filepath',
- 'string'
- );
- }
- const serializedOptions = BufferedWorkerPool.serializeOptions(options);
- const result = await this._pool.exec('run', [filepath, serializedOptions]);
- return deserialize(result);
- }
- /**
- * Returns stats about the state of the worker processes in the pool.
- *
- * Used for debugging.
- *
- * @private
- */
- stats() {
- return this._pool.stats();
- }
- /**
- * Instantiates a {@link WorkerPool}.
- * @private
- */
- static create(...args) {
- return new BufferedWorkerPool(...args);
- }
- /**
- * Given Mocha options object `opts`, serialize into a format suitable for
- * transmission over IPC.
- *
- * @param {MochaOptions} [opts] - Mocha options
- * @private
- * @returns {string} Serialized options
- */
- static serializeOptions(opts = {}) {
- if (!optionsCache.has(opts)) {
- const serialized = serializeJavascript(opts, {
- unsafe: true, // this means we don't care about XSS
- ignoreFunction: true // do not serialize functions
- });
- optionsCache.set(opts, serialized);
- /* istanbul ignore next */
- debug(
- 'serializeOptions(): serialized options %O to: %s',
- opts,
- serialized
- );
- }
- return optionsCache.get(opts);
- }
- /**
- * Resets internal cache of serialized options objects.
- *
- * For testing/debugging
- * @private
- */
- static resetOptionsCache() {
- optionsCache = new WeakMap();
- }
- }
- exports.BufferedWorkerPool = BufferedWorkerPool;
|