parallel-buffered-runner.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. /**
  2. * A test Runner that uses a {@link module:buffered-worker-pool}.
  3. * @module parallel-buffered-runner
  4. * @private
  5. */
  6. 'use strict';
  7. /**
  8. * @typedef {import('../types.d.ts').FileRunner} FileRunner
  9. * @typedef {import('../types.d.ts').RunnerOptions} RunnerOptions
  10. * @typedef {import('../types.d.ts').SerializedWorkerResult} SerializedWorkerResult
  11. * @typedef {import('../types.d.ts').SigIntListener} SigIntListener
  12. */
  13. const Runner = require('../runner');
  14. const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
  15. const debug = require('debug')('mocha:parallel:parallel-buffered-runner');
  16. const {BufferedWorkerPool} = require('./buffered-worker-pool');
  17. const {setInterval, clearInterval} = global;
  18. const {createMap, constants} = require('../utils');
  19. const {MOCHA_ID_PROP_NAME} = constants;
  20. const {createFatalError} = require('../errors');
  21. const DEFAULT_WORKER_REPORTER = require.resolve(
  22. './reporters/parallel-buffered'
  23. );
  /**
   * Names of options which are stripped from the options object before it is
   * serialized and handed to the workers.
   */
  const DENY_OPTIONS = [
    'globalSetup', 'globalTeardown',
    'parallel', 'p',
    'jobs', 'j'
  ];
  /**
   * Writes a one-line summary of the worker pool's current load (busy, total,
   * idle workers and queued tasks) to the debug log.
   * @param {BufferedWorkerPool} pool - Worker pool whose `stats()` are reported
   */
  /* istanbul ignore next */
  const debugStats = pool => {
    const poolStats = pool.stats();
    const template = '%d/%d busy workers; %d idle; %d tasks queued';
    debug(
      template,
      poolStats.busyWorkers,
      poolStats.totalWorkers,
      poolStats.idleWorkers,
      poolStats.pendingTasks
    );
  };
  /**
   * Period, in milliseconds, between worker pool stat dumps in debug mode
   */
  const DEBUG_STATS_INTERVAL = 5000;

  // Names of the states the runner can be in, listed roughly in lifecycle order.
  const IDLE = 'IDLE';
  const RUNNING = 'RUNNING';
  const BAILING = 'BAILING';
  const BAILED = 'BAILED';
  const ABORTING = 'ABORTING';
  const ABORTED = 'ABORTED';
  const COMPLETE = 'COMPLETE';

  /**
   * Transition table: each state name maps to the set of state names that may
   * directly follow it. An empty set marks a terminal state.
   */
  const states = createMap({
    [IDLE]: new Set([RUNNING, ABORTING]),
    [RUNNING]: new Set([COMPLETE, BAILING, ABORTING]),
    [COMPLETE]: new Set(),
    [ABORTED]: new Set(),
    [ABORTING]: new Set([ABORTED]),
    [BAILING]: new Set([BAILED, ABORTING]),
    [BAILED]: new Set([COMPLETE, ABORTING])
  });
  /**
   * This `Runner` delegates tests runs to worker threads. Does not execute any
   * {@link Runnable}s by itself!
   *
   * Progress is tracked in the `_state` property, whose setter only accepts
   * the transitions listed in the `states` table and throws on any other.
   * @public
   */
  class ParallelBufferedRunner extends Runner {
    constructor(...args) {
      super(...args);

      // `state` lives in this closure so it can only be changed through the
      // validating setter below.
      let state = IDLE;
      Object.defineProperty(this, '_state', {
        get() {
          return state;
        },
        set(newState) {
          if (states[state].has(newState)) {
            state = newState;
          } else {
            throw new Error(`invalid state transition: ${state} => ${newState}`);
          }
        }
      });

      this._workerReporter = DEFAULT_WORKER_REPORTER;
      this._linkPartialObjects = false;
      // maps an object's unique ID to the single instance handed to listeners
      this._linkedObjectMap = new Map();

      this.once(EVENT_RUN_END, () => {
        this._state = COMPLETE;
      });
    }

    /**
     * Returns a mapping function to enqueue a file in the worker pool and return results of its execution.
     * @param {BufferedWorkerPool} pool - Worker pool
     * @param {RunnerOptions} options - Mocha options
     * @returns {FileRunner} Mapping function
     * @private
     */
    _createFileRunner(pool, options) {
      /**
       * Emits event and sets `BAILING` state, if necessary.
       * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
       * @param {number} failureCount - Failure count
       */
      const emitEvent = (event, failureCount) => {
        this.emit(event.eventName, event.data, event.error);
        if (
          this._state !== BAILING &&
          event.data &&
          event.data._bail &&
          (failureCount || event.error)
        ) {
          debug('run(): nonzero failure count & found bail flag');
          // we need to let the events complete for this file, as the worker
          // should run any cleanup hooks
          this._state = BAILING;
        }
      };

      /**
       * Given an event, recursively find any objects in its data that have ID's, and create object references to already-seen objects.
       *
       * An event whose `data` is absent (or not an object) has nothing to link
       * and is left untouched.
       * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
       * @throws If `event.data` is an object lacking an ID
       */
      const linkEvent = event => {
        const stack = [{parent: event, prop: 'data'}];
        while (stack.length) {
          const {parent, prop} = stack.pop();
          const obj = parent[prop];
          if (!obj || typeof obj !== 'object') {
            // no payload to link; previously this fell through to
            // `Object.keys(undefined)` and threw a `TypeError`
            continue;
          }
          const id = obj[MOCHA_ID_PROP_NAME];
          if (!id) {
            throw createFatalError(
              'Object missing ID received in event data',
              obj
            );
          }
          // reuse the instance already seen for this ID, refreshing it with
          // the incoming properties; otherwise the incoming object becomes
          // the canonical instance
          const newObj = this._linkedObjectMap.has(id)
            ? Object.assign(this._linkedObjectMap.get(id), obj)
            : obj;
          this._linkedObjectMap.set(id, newObj);
          parent[prop] = newObj;
          // only values supplied by the incoming object are followed
          Object.keys(newObj).forEach(key => {
            const value = obj[key];
            if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
              stack.push({parent: newObj, prop: key});
            }
          });
        }
      };

      return async file => {
        debug('run(): enqueueing test file %s', file);
        try {
          const {failureCount, events} = await pool.run(file, options);
          if (this._state === BAILED) {
            // short-circuit after a graceful bail. if this happens,
            // some other worker has bailed.
            // TODO: determine if this is the desired behavior, or if we
            // should report the events of this run anyway.
            return;
          }
          debug(
            'run(): completed run of file %s; %d failures / %d events',
            file,
            failureCount,
            events.length
          );
          this.failures += failureCount; // can this ever be non-numeric?

          // sampled once, so a listener toggling the flag mid-file does not
          // change how the remainder of this file's events are handled
          const shouldLink = this._linkPartialObjects;
          let event = events.shift();
          while (event) {
            if (shouldLink) {
              linkEvent(event);
            }
            emitEvent(event, failureCount);
            event = events.shift();
          }

          if (this._state === BAILING) {
            debug('run(): terminating pool due to "bail" flag');
            this._state = BAILED;
            await pool.terminate();
          }
        } catch (err) {
          if (this._state === BAILED || this._state === ABORTING) {
            debug(
              'run(): worker pool terminated with intent; skipping file %s',
              file
            );
          } else {
            // this is an uncaught exception
            debug('run(): encountered uncaught exception: %O', err);
            if (this.allowUncaught) {
              // still have to clean up
              this._state = ABORTING;
              await pool.terminate(true);
            }
            throw err;
          }
        } finally {
          debug('run(): done running file %s', file);
        }
      };
    }

    /**
     * Listen on `Process.SIGINT`; terminate pool if caught.
     * Returns the listener for later call to `process.removeListener()`.
     * @param {BufferedWorkerPool} pool - Worker pool
     * @returns {SigIntListener} Listener
     * @private
     */
    _bindSigIntListener(pool) {
      const sigIntListener = async () => {
        debug('run(): caught a SIGINT');
        this._state = ABORTING;

        try {
          debug('run(): force-terminating worker pool');
          await pool.terminate(true);
        } catch (err) {
          console.error(
            `Error while attempting to force-terminate worker pool: ${err}`
          );
          process.exitCode = 1;
        } finally {
          // the listener was registered with `once`, so it is gone by now and
          // re-sending the signal is not handled by it a second time
          process.nextTick(() => {
            debug('run(): imminent death');
            this._state = ABORTED;
            process.kill(process.pid, 'SIGINT');
          });
        }
      };

      process.once('SIGINT', sigIntListener);

      return sigIntListener;
    }

    /**
     * Runs Mocha tests by creating a thread pool, then delegating work to the
     * worker threads.
     *
     * Each worker receives one file, and as workers become available, they take a
     * file from the queue and run it. The worker thread execution is treated like
     * an RPC--it returns a `Promise` containing serialized information about the
     * run. The information is processed as it's received, and emitted to a
     * {@link Reporter}, which is likely listening for these events.
     *
     * @param {Function} callback - Called with an exit code corresponding to
     * number of test failures.
     * @param {RunnerOptions} [opts] - options
     * @returns {ParallelBufferedRunner} This instance; the run itself proceeds
     * asynchronously.
     */
    run(callback, {files, options = {}} = {}) {
      /**
       * Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
       */
      let sigIntListener;

      // assign the reporter the worker will use, which will be different than the
      // main process' reporter. this also makes `options` a private copy, so the
      // deletions below never touch the caller's object.
      options = {...options, reporter: this._workerReporter};

      // This function should _not_ return a `Promise`; its parent (`Runner#run`)
      // returns this instance, so this should do the same. However, we want to make
      // use of `async`/`await`, so we use this IIFE.
      (async () => {
        /**
         * This is an interval that outputs stats about the worker pool every so often
         */
        let debugInterval;

        /**
         * @type {BufferedWorkerPool}
         */
        let pool;

        try {
          pool = BufferedWorkerPool.create({maxWorkers: options.jobs});

          sigIntListener = this._bindSigIntListener(pool);

          /* istanbul ignore next */
          debugInterval = setInterval(
            () => debugStats(pool),
            DEBUG_STATS_INTERVAL
          ).unref();

          // this is set for uncaught exception handling in `Runner#uncaught`
          // TODO: `Runner` should be using a state machine instead.
          this.started = true;
          this._state = RUNNING;

          this.emit(EVENT_RUN_BEGIN);

          // strip what must not be serialized for the workers; `jobs` has
          // already been consumed by the pool above
          DENY_OPTIONS.forEach(opt => {
            delete options[opt];
          });

          const results = await Promise.allSettled(
            files.map(this._createFileRunner(pool, options))
          );

          // note that pool may already be terminated due to --bail
          await pool.terminate();

          results
            .filter(({status}) => status === 'rejected')
            .forEach(({reason}) => {
              if (this.allowUncaught) {
                // yep, just the first one.
                throw reason;
              }
              // "rejected" will correspond to uncaught exceptions.
              // unlike the serial runner, the parallel runner can always recover.
              this.uncaught(reason);
            });

          if (this._state === ABORTING) {
            return;
          }

          this.emit(EVENT_RUN_END);
          debug('run(): completing with failure count %d', this.failures);
          callback(this.failures);
        } catch (err) {
          // this `nextTick` takes us out of the `Promise` scope, so the
          // exception will not be caught and returned as a rejected `Promise`,
          // which would lead to an `unhandledRejection` event.
          process.nextTick(() => {
            debug('run(): re-throwing uncaught exception');
            throw err;
          });
        } finally {
          clearInterval(debugInterval);
          // the listener is only bound once the pool exists; if pool creation
          // threw, there is nothing to remove, and `removeListener` itself
          // throws when given a non-function
          if (sigIntListener) {
            process.removeListener('SIGINT', sigIntListener);
          }
        }
      })();

      return this;
    }

    /**
     * Toggle partial object linking behavior; used for building object references from
     * unique ID's.
     * @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable
     * @returns {Runner}
     * @chainable
     * @public
     * @example
     * // this reporter needs proper object references when run in parallel mode
     * class MyReporter {
     *   constructor(runner) {
     *     runner.linkPartialObjects(true)
     *       .on(EVENT_SUITE_BEGIN, suite => {
     *         // this Suite may be the same object...
     *       })
     *       .on(EVENT_TEST_BEGIN, test => {
     *         // ...as the `test.parent` property
     *       });
     *   }
     * }
     */
    linkPartialObjects(value) {
      this._linkPartialObjects = Boolean(value);
      return super.linkPartialObjects(value);
    }

    /**
     * If this class is the `Runner` in use, then this is going to return `true`.
     *
     * For use by reporters.
     * @returns {true}
     * @public
     */
    isParallelMode() {
      return true;
    }

    /**
     * Configures an alternate reporter for worker processes to use. Subclasses
     * using worker processes should implement this.
     * @public
     * @param {string} reporter - Absolute path to alternate reporter for worker processes to use
     * @returns {Runner}
     * @chainable
     */
    workerReporter(reporter) {
      this._workerReporter = reporter;
      return this;
    }
  }

  module.exports = ParallelBufferedRunner;