| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- /**
- * A test Runner that uses a {@link module:buffered-worker-pool}.
- * @module parallel-buffered-runner
- * @private
- */
- 'use strict';
- /**
- * @typedef {import('../types.d.ts').FileRunner} FileRunner
- * @typedef {import('../types.d.ts').RunnerOptions} RunnerOptions
- * @typedef {import('../types.d.ts').SerializedWorkerResult} SerializedWorkerResult
- * @typedef {import('../types.d.ts').SigIntListener} SigIntListener
- */
- const Runner = require('../runner');
- const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
- const debug = require('debug')('mocha:parallel:parallel-buffered-runner');
- const {BufferedWorkerPool} = require('./buffered-worker-pool');
- const {setInterval, clearInterval} = global;
- const {createMap, constants} = require('../utils');
- const {MOCHA_ID_PROP_NAME} = constants;
- const {createFatalError} = require('../errors');
- const DEFAULT_WORKER_REPORTER = require.resolve(
- './reporters/parallel-buffered'
- );
- /**
- * List of options to _not_ serialize for transmission to workers
- */
- const DENY_OPTIONS = [
- 'globalSetup',
- 'globalTeardown',
- 'parallel',
- 'p',
- 'jobs',
- 'j'
- ];
- /**
- * Outputs a debug statement with worker stats
- * @param {BufferedWorkerPool} pool - Worker pool
- */
- /* istanbul ignore next */
- const debugStats = pool => {
- const {totalWorkers, busyWorkers, idleWorkers, pendingTasks} = pool.stats();
- debug(
- '%d/%d busy workers; %d idle; %d tasks queued',
- busyWorkers,
- totalWorkers,
- idleWorkers,
- pendingTasks
- );
- };
- /**
- * The interval at which we will display stats for worker processes in debug mode
- */
- const DEBUG_STATS_INTERVAL = 5000;
- const ABORTED = 'ABORTED';
- const IDLE = 'IDLE';
- const ABORTING = 'ABORTING';
- const RUNNING = 'RUNNING';
- const BAILING = 'BAILING';
- const BAILED = 'BAILED';
- const COMPLETE = 'COMPLETE';
- const states = createMap({
- [IDLE]: new Set([RUNNING, ABORTING]),
- [RUNNING]: new Set([COMPLETE, BAILING, ABORTING]),
- [COMPLETE]: new Set(),
- [ABORTED]: new Set(),
- [ABORTING]: new Set([ABORTED]),
- [BAILING]: new Set([BAILED, ABORTING]),
- [BAILED]: new Set([COMPLETE, ABORTING])
- });
- /**
- * This `Runner` delegates tests runs to worker threads. Does not execute any
- * {@link Runnable}s by itself!
- * @public
- */
- class ParallelBufferedRunner extends Runner {
- constructor(...args) {
- super(...args);
- let state = IDLE;
- Object.defineProperty(this, '_state', {
- get() {
- return state;
- },
- set(newState) {
- if (states[state].has(newState)) {
- state = newState;
- } else {
- throw new Error(`invalid state transition: ${state} => ${newState}`);
- }
- }
- });
- this._workerReporter = DEFAULT_WORKER_REPORTER;
- this._linkPartialObjects = false;
- this._linkedObjectMap = new Map();
- this.once(Runner.constants.EVENT_RUN_END, () => {
- this._state = COMPLETE;
- });
- }
- /**
- * Returns a mapping function to enqueue a file in the worker pool and return results of its execution.
- * @param {BufferedWorkerPool} pool - Worker pool
- * @param {RunnerOptions} options - Mocha options
- * @returns {FileRunner} Mapping function
- * @private
- */
- _createFileRunner(pool, options) {
- /**
- * Emits event and sets `BAILING` state, if necessary.
- * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
- * @param {number} failureCount - Failure count
- */
- const emitEvent = (event, failureCount) => {
- this.emit(event.eventName, event.data, event.error);
- if (
- this._state !== BAILING &&
- event.data &&
- event.data._bail &&
- (failureCount || event.error)
- ) {
- debug('run(): nonzero failure count & found bail flag');
- // we need to let the events complete for this file, as the worker
- // should run any cleanup hooks
- this._state = BAILING;
- }
- };
- /**
- * Given an event, recursively find any objects in its data that have ID's, and create object references to already-seen objects.
- * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
- */
- const linkEvent = event => {
- const stack = [{parent: event, prop: 'data'}];
- while (stack.length) {
- const {parent, prop} = stack.pop();
- const obj = parent[prop];
- let newObj;
- if (obj && typeof obj === 'object') {
- if (obj[MOCHA_ID_PROP_NAME]) {
- const id = obj[MOCHA_ID_PROP_NAME];
- newObj = this._linkedObjectMap.has(id)
- ? Object.assign(this._linkedObjectMap.get(id), obj)
- : obj;
- this._linkedObjectMap.set(id, newObj);
- parent[prop] = newObj;
- } else {
- throw createFatalError(
- 'Object missing ID received in event data',
- obj
- );
- }
- }
- Object.keys(newObj).forEach(key => {
- const value = obj[key];
- if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
- stack.push({obj: value, parent: newObj, prop: key});
- }
- });
- }
- };
- return async file => {
- debug('run(): enqueueing test file %s', file);
- try {
- const {failureCount, events} = await pool.run(file, options);
- if (this._state === BAILED) {
- // short-circuit after a graceful bail. if this happens,
- // some other worker has bailed.
- // TODO: determine if this is the desired behavior, or if we
- // should report the events of this run anyway.
- return;
- }
- debug(
- 'run(): completed run of file %s; %d failures / %d events',
- file,
- failureCount,
- events.length
- );
- this.failures += failureCount; // can this ever be non-numeric?
- let event = events.shift();
- if (this._linkPartialObjects) {
- while (event) {
- linkEvent(event);
- emitEvent(event, failureCount);
- event = events.shift();
- }
- } else {
- while (event) {
- emitEvent(event, failureCount);
- event = events.shift();
- }
- }
- if (this._state === BAILING) {
- debug('run(): terminating pool due to "bail" flag');
- this._state = BAILED;
- await pool.terminate();
- }
- } catch (err) {
- if (this._state === BAILED || this._state === ABORTING) {
- debug(
- 'run(): worker pool terminated with intent; skipping file %s',
- file
- );
- } else {
- // this is an uncaught exception
- debug('run(): encountered uncaught exception: %O', err);
- if (this.allowUncaught) {
- // still have to clean up
- this._state = ABORTING;
- await pool.terminate(true);
- }
- throw err;
- }
- } finally {
- debug('run(): done running file %s', file);
- }
- };
- }
- /**
- * Listen on `Process.SIGINT`; terminate pool if caught.
- * Returns the listener for later call to `process.removeListener()`.
- * @param {BufferedWorkerPool} pool - Worker pool
- * @returns {SigIntListener} Listener
- * @private
- */
- _bindSigIntListener(pool) {
- const sigIntListener = async () => {
- debug('run(): caught a SIGINT');
- this._state = ABORTING;
- try {
- debug('run(): force-terminating worker pool');
- await pool.terminate(true);
- } catch (err) {
- console.error(
- `Error while attempting to force-terminate worker pool: ${err}`
- );
- process.exitCode = 1;
- } finally {
- process.nextTick(() => {
- debug('run(): imminent death');
- this._state = ABORTED;
- process.kill(process.pid, 'SIGINT');
- });
- }
- };
- process.once('SIGINT', sigIntListener);
- return sigIntListener;
- }
- /**
- * Runs Mocha tests by creating a thread pool, then delegating work to the
- * worker threads.
- *
- * Each worker receives one file, and as workers become available, they take a
- * file from the queue and run it. The worker thread execution is treated like
- * an RPC--it returns a `Promise` containing serialized information about the
- * run. The information is processed as it's received, and emitted to a
- * {@link Reporter}, which is likely listening for these events.
- *
- * @param {Function} callback - Called with an exit code corresponding to
- * number of test failures.
- * @param {RunnerOptions} [opts] - options
- */
- run(callback, {files, options = {}} = {}) {
- /**
- * Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
- */
- let sigIntListener;
- // assign the reporter the worker will use, which will be different than the
- // main process' reporter
- options = {...options, reporter: this._workerReporter};
- // This function should _not_ return a `Promise`; its parent (`Runner#run`)
- // returns this instance, so this should do the same. However, we want to make
- // use of `async`/`await`, so we use this IIFE.
- (async () => {
- /**
- * This is an interval that outputs stats about the worker pool every so often
- */
- let debugInterval;
- /**
- * @type {BufferedWorkerPool}
- */
- let pool;
- try {
- pool = BufferedWorkerPool.create({maxWorkers: options.jobs});
- sigIntListener = this._bindSigIntListener(pool);
- /* istanbul ignore next */
- debugInterval = setInterval(
- () => debugStats(pool),
- DEBUG_STATS_INTERVAL
- ).unref();
- // this is set for uncaught exception handling in `Runner#uncaught`
- // TODO: `Runner` should be using a state machine instead.
- this.started = true;
- this._state = RUNNING;
- this.emit(EVENT_RUN_BEGIN);
- options = {...options};
- DENY_OPTIONS.forEach(opt => {
- delete options[opt];
- });
- const results = await Promise.allSettled(
- files.map(this._createFileRunner(pool, options))
- );
- // note that pool may already be terminated due to --bail
- await pool.terminate();
- results
- .filter(({status}) => status === 'rejected')
- .forEach(({reason}) => {
- if (this.allowUncaught) {
- // yep, just the first one.
- throw reason;
- }
- // "rejected" will correspond to uncaught exceptions.
- // unlike the serial runner, the parallel runner can always recover.
- this.uncaught(reason);
- });
- if (this._state === ABORTING) {
- return;
- }
- this.emit(EVENT_RUN_END);
- debug('run(): completing with failure count %d', this.failures);
- callback(this.failures);
- } catch (err) {
- // this `nextTick` takes us out of the `Promise` scope, so the
- // exception will not be caught and returned as a rejected `Promise`,
- // which would lead to an `unhandledRejection` event.
- process.nextTick(() => {
- debug('run(): re-throwing uncaught exception');
- throw err;
- });
- } finally {
- clearInterval(debugInterval);
- process.removeListener('SIGINT', sigIntListener);
- }
- })();
- return this;
- }
- /**
- * Toggle partial object linking behavior; used for building object references from
- * unique ID's.
- * @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable
- * @returns {Runner}
- * @chainable
- * @public
- * @example
- * // this reporter needs proper object references when run in parallel mode
- * class MyReporter() {
- * constructor(runner) {
- * runner.linkPartialObjects(true)
- * .on(EVENT_SUITE_BEGIN, suite => {
- * // this Suite may be the same object...
- * })
- * .on(EVENT_TEST_BEGIN, test => {
- * // ...as the `test.parent` property
- * });
- * }
- * }
- */
- linkPartialObjects(value) {
- this._linkPartialObjects = Boolean(value);
- return super.linkPartialObjects(value);
- }
- /**
- * If this class is the `Runner` in use, then this is going to return `true`.
- *
- * For use by reporters.
- * @returns {true}
- * @public
- */
- isParallelMode() {
- return true;
- }
- /**
- * Configures an alternate reporter for worker processes to use. Subclasses
- * using worker processes should implement this.
- * @public
- * @param {string} path - Absolute path to alternate reporter for worker processes to use
- * @returns {Runner}
- * @throws When in serial mode
- * @chainable
- */
- workerReporter(reporter) {
- this._workerReporter = reporter;
- return this;
- }
- }
- module.exports = ParallelBufferedRunner;
|