buffered-worker-pool.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. /**
  2. * A wrapper around a third-party child process worker pool implementation.
  3. * Used by {@link module:buffered-runner}.
  4. * @private
  5. * @module buffered-worker-pool
  6. */
  7. 'use strict';
  8. /**
  9. * @typedef {import('workerpool').WorkerPoolOptions} WorkerPoolOptions
  10. * @typedef {import('../types.d.ts').MochaOptions} MochaOptions
  11. * @typedef {import('../types.d.ts').SerializedWorkerResult} SerializedWorkerResult
  12. */
  13. const serializeJavascript = require('serialize-javascript');
  14. const workerpool = require('workerpool');
  15. const {deserialize} = require('./serializer');
  16. const debug = require('debug')('mocha:parallel:buffered-worker-pool');
  17. const {createInvalidArgumentTypeError} = require('../errors');
  18. const WORKER_PATH = require.resolve('./worker.js');
  19. /**
  20. * A mapping of Mocha `Options` objects to serialized values.
  21. *
  22. * This is helpful because we tend to same the same options over and over
  23. * over IPC.
  24. * @type {WeakMap<MochaOptions,string>}
  25. */
  26. let optionsCache = new WeakMap();
  27. /**
  28. * These options are passed into the [workerpool](https://npm.im/workerpool) module.
  29. * @type {Partial<WorkerPoolOptions>}
  30. */
  31. const WORKER_POOL_DEFAULT_OPTS = {
  32. // use child processes, not worker threads!
  33. workerType: 'process',
  34. // ensure the same flags sent to `node` for this `mocha` invocation are passed
  35. // along to children
  36. forkOpts: {execArgv: process.execArgv},
  37. maxWorkers: workerpool.cpus - 1
  38. };
  39. /**
  40. * A wrapper around a third-party worker pool implementation.
  41. * @private
  42. */
  43. class BufferedWorkerPool {
  44. /**
  45. * Creates an underlying worker pool instance; determines max worker count
  46. * @param {Partial<WorkerPoolOptions>} [opts] - Options
  47. */
  48. constructor(opts = {}) {
  49. const maxWorkers = Math.max(
  50. 1,
  51. typeof opts.maxWorkers === 'undefined'
  52. ? WORKER_POOL_DEFAULT_OPTS.maxWorkers
  53. : opts.maxWorkers
  54. );
  55. /* istanbul ignore next */
  56. if (workerpool.cpus < 2) {
  57. // TODO: decide whether we should warn
  58. debug(
  59. 'not enough CPU cores available to run multiple jobs; avoid --parallel on this machine'
  60. );
  61. } else if (maxWorkers >= workerpool.cpus) {
  62. // TODO: decide whether we should warn
  63. debug(
  64. '%d concurrent job(s) requested, but only %d core(s) available',
  65. maxWorkers,
  66. workerpool.cpus
  67. );
  68. }
  69. /* istanbul ignore next */
  70. debug(
  71. 'run(): starting worker pool of max size %d, using node args: %s',
  72. maxWorkers,
  73. process.execArgv.join(' ')
  74. );
  75. let counter = 0;
  76. const onCreateWorker = ({forkOpts}) => {
  77. return {
  78. forkOpts: {
  79. ...forkOpts,
  80. // adds an incremental id to all workers, which can be useful to allocate resources for each process
  81. env: {...process.env, MOCHA_WORKER_ID: counter++}
  82. }
  83. };
  84. };
  85. this.options = {
  86. ...WORKER_POOL_DEFAULT_OPTS,
  87. ...opts,
  88. maxWorkers,
  89. onCreateWorker
  90. };
  91. this._pool = workerpool.pool(WORKER_PATH, this.options);
  92. }
  93. /**
  94. * Terminates all workers in the pool.
  95. * @param {boolean} [force] - Whether to force-kill workers. By default, lets workers finish their current task before termination.
  96. * @private
  97. * @returns {Promise<void>}
  98. */
  99. async terminate(force = false) {
  100. /* istanbul ignore next */
  101. debug('terminate(): terminating with force = %s', force);
  102. return this._pool.terminate(force);
  103. }
  104. /**
  105. * Adds a test file run to the worker pool queue for execution by a worker process.
  106. *
  107. * Handles serialization/deserialization.
  108. *
  109. * @param {string} filepath - Filepath of test
  110. * @param {MochaOptions} [options] - Options for Mocha instance
  111. * @private
  112. * @returns {Promise<SerializedWorkerResult>}
  113. */
  114. async run(filepath, options = {}) {
  115. if (!filepath || typeof filepath !== 'string') {
  116. throw createInvalidArgumentTypeError(
  117. 'Expected a non-empty filepath',
  118. 'filepath',
  119. 'string'
  120. );
  121. }
  122. const serializedOptions = BufferedWorkerPool.serializeOptions(options);
  123. const result = await this._pool.exec('run', [filepath, serializedOptions]);
  124. return deserialize(result);
  125. }
  126. /**
  127. * Returns stats about the state of the worker processes in the pool.
  128. *
  129. * Used for debugging.
  130. *
  131. * @private
  132. */
  133. stats() {
  134. return this._pool.stats();
  135. }
  136. /**
  137. * Instantiates a {@link WorkerPool}.
  138. * @private
  139. */
  140. static create(...args) {
  141. return new BufferedWorkerPool(...args);
  142. }
  143. /**
  144. * Given Mocha options object `opts`, serialize into a format suitable for
  145. * transmission over IPC.
  146. *
  147. * @param {MochaOptions} [opts] - Mocha options
  148. * @private
  149. * @returns {string} Serialized options
  150. */
  151. static serializeOptions(opts = {}) {
  152. if (!optionsCache.has(opts)) {
  153. const serialized = serializeJavascript(opts, {
  154. unsafe: true, // this means we don't care about XSS
  155. ignoreFunction: true // do not serialize functions
  156. });
  157. optionsCache.set(opts, serialized);
  158. /* istanbul ignore next */
  159. debug(
  160. 'serializeOptions(): serialized options %O to: %s',
  161. opts,
  162. serialized
  163. );
  164. }
  165. return optionsCache.get(opts);
  166. }
  167. /**
  168. * Resets internal cache of serialized options objects.
  169. *
  170. * For testing/debugging
  171. * @private
  172. */
  173. static resetOptionsCache() {
  174. optionsCache = new WeakMap();
  175. }
  176. }
  177. exports.BufferedWorkerPool = BufferedWorkerPool;