417 lines
12 KiB
JavaScript
417 lines
12 KiB
JavaScript
var Promise = require('./Promise');
|
|
var WorkerHandler = require('./WorkerHandler');
|
|
var environment = require('./environment');
|
|
var DebugPortAllocator = require('./debug-port-allocator');
|
|
var DEBUG_PORT_ALLOCATOR = new DebugPortAllocator();
|
|
/**
 * A pool that owns a list of workers and a queue of tasks waiting for one.
 *
 * Can be invoked as `new Pool(script, options)` or `new Pool(options)`: when
 * the first argument is not a string it is taken to be the options object and
 * no worker script is set.
 *
 * @param {String} [script]              Optional worker script
 * @param {WorkerPoolOptions} [options]  See docs
 * @constructor
 */
function Pool(script, options) {
  var scriptGiven = typeof script === 'string';
  // a non-string first argument is interpreted as the options object
  var opts = (scriptGiven ? options : script) || {};

  this.script = scriptGiven ? (script || null) : null;

  this.workers = [];  // all workers currently owned by the pool
  this.tasks = [];    // tasks waiting for a free worker

  this.forkArgs = opts.forkArgs || [];
  this.forkOpts = opts.forkOpts || {};
  this.debugPortStart = opts.debugPortStart || 43210;
  this.nodeWorker = opts.nodeWorker;
  this.workerType = opts.workerType || opts.nodeWorker || 'auto';
  this.maxQueueSize = opts.maxQueueSize || Infinity;

  // upper bound on the number of workers
  if ('maxWorkers' in opts) {
    validateMaxWorkers(opts.maxWorkers);
    this.maxWorkers = opts.maxWorkers;
  } else {
    // default: one less than the reported cpu count (4 assumed when
    // unknown), but never fewer than one worker
    this.maxWorkers = Math.max((environment.cpus || 4) - 1, 1);
  }

  // lower bound on the number of workers (only set when configured)
  if ('minWorkers' in opts) {
    if (opts.minWorkers === 'max') {
      this.minWorkers = this.maxWorkers;
    } else {
      validateMinWorkers(opts.minWorkers);
      this.minWorkers = opts.minWorkers;
      // a minimum above the maximum raises the maximum to match
      this.maxWorkers = Math.max(this.minWorkers, this.maxWorkers);
    }
    this._ensureMinWorkers();
  }

  this._boundNext = this._next.bind(this);

  if (this.workerType === 'thread') {
    WorkerHandler.ensureWorkerThreads();
  }
}
|
|
|
|
|
|
/**
 * Execute a function on a worker.
 *
 * Example usage:
 *
 *   var pool = new Pool()
 *
 *   // call a function available on the worker
 *   pool.exec('fibonacci', [6])
 *
 *   // offload a function
 *   function add(a, b) {
 *     return a + b
 *   };
 *   pool.exec(add, [2, 4])
 *       .then(function (result) {
 *         console.log(result); // outputs 6
 *       })
 *       .catch(function(error) {
 *         console.log(error);
 *       });
 *
 * @param {String | Function}  method   Function name or function.
 *                                      If `method` is a string, the corresponding
 *                                      method on the worker will be executed
 *                                      If `method` is a Function, the function
 *                                      will be stringified and executed via the
 *                                      workers built-in function `run(fn, args)`.
 * @param {Array} [params]  Function arguments applied when calling the function
 * @param {ExecOptions} [options]  Options object; stored on the queued task
 *                                 and handed to the worker together with it,
 *                                 for string and function methods alike.
 * @return {Promise.<*, Error>} result
 * @throws {TypeError} when `params` is given but is not an array, or when
 *                     `method` is neither a string nor a function
 * @throws {Error}     when the queue already holds `maxQueueSize` tasks
 */
Pool.prototype.exec = function (method, params, options) {
  // validate type of arguments
  if (params && !Array.isArray(params)) {
    throw new TypeError('Array expected as argument "params"');
  }

  if (typeof method === 'function') {
    // send the stringified function and its arguments to the worker's `run`
    // method; forward `options` so they are not lost for offloaded functions
    return this.exec('run', [String(method), params], options);
  }

  if (typeof method !== 'string') {
    throw new TypeError('Function or string expected as argument "method"');
  }

  // refuse the task before allocating anything for it
  if (this.tasks.length >= this.maxQueueSize) {
    throw new Error('Max queue size of ' + this.maxQueueSize + ' reached');
  }

  var resolver = Promise.defer();

  // add a new task to the queue
  var tasks = this.tasks;
  var task = {
    method: method,
    params: params,
    resolver: resolver,
    timeout: null,
    options: options
  };
  tasks.push(task);

  // replace the timeout method of the Promise with our own,
  // which starts the timer as soon as the task is actually started
  var originalTimeout = resolver.promise.timeout;
  resolver.promise.timeout = function timeout (delay) {
    if (tasks.indexOf(task) !== -1) {
      // task is still queued -> remember the delay, the timer is started
      // once the task is handed to a worker
      task.timeout = delay;
      return resolver.promise;
    }

    // task is already being executed -> start timer immediately
    return originalTimeout.call(resolver.promise, delay);
  };

  // trigger task execution
  this._next();

  return resolver.promise;
};
|
|
|
|
/**
 * Build a proxy object for the worker. The worker is asked for the names of
 * its methods (via the `methods` call) and the resulting object gets one
 * function per name; each of those forwards its arguments to `pool.exec` and
 * therefore returns a promise.
 *
 * @return {Promise.<Object, Error>} proxy
 * @throws {Error} when called with any argument
 */
Pool.prototype.proxy = function () {
  if (arguments.length > 0) {
    throw new Error('No arguments expected');
  }

  var self = this;

  // turn the list of method names into an object of forwarding functions
  function buildProxy(names) {
    return names.reduce(function (proxy, name) {
      proxy[name] = function () {
        var args = Array.prototype.slice.call(arguments);
        return self.exec(name, args);
      };
      return proxy;
    }, {});
  }

  return this.exec('methods').then(buildProxy);
};
|
|
|
|
/**
|
|
* Creates new array with the results of calling a provided callback function
|
|
* on every element in this array.
|
|
* @param {Array} array
|
|
* @param {function} callback Function taking two arguments:
|
|
* `callback(currentValue, index)`
|
|
* @return {Promise.<Array>} Returns a promise which resolves with an Array
|
|
* containing the results of the callback function
|
|
* executed for each of the array elements.
|
|
*/
|
|
/* TODO: implement map
|
|
Pool.prototype.map = function (array, callback) {
|
|
};
|
|
*/
|
|
|
|
/**
 * Take the first task off the queue and hand it to an available worker.
 * Does nothing when the queue is empty or when `_getWorker()` yields no
 * worker. A task whose promise is no longer pending is dropped and the next
 * one is tried right away.
 * @protected
 */
Pool.prototype._next = function () {
  if (this.tasks.length === 0) {
    return; // nothing queued
  }

  // find an available worker (this may create a new one)
  var worker = this._getWorker();
  if (!worker) {
    return; // every worker is busy and the pool is at its maximum
  }

  var pool = this;
  var task = this.tasks.shift();

  if (!task.resolver.promise.pending) {
    // the task was already settled (e.g. cancelled) while queued: skip it
    // and continue with the next task in the queue
    pool._next();
    return;
  }

  // send the request to the worker
  var running = worker.exec(task.method, task.params, task.resolver, task.options)
    .then(pool._boundNext)
    .catch(function () {
      // if the worker crashed and terminated, remove it from the pool
      if (worker.terminated) {
        return pool._removeWorker(worker);
      }
    })
    .then(function () {
      pool._next(); // trigger next task in the queue
    });

  // a timeout requested while the task was still queued starts counting now
  if (typeof task.timeout === 'number') {
    running.timeout(task.timeout);
  }
};
|
|
|
|
/**
 * Look up a worker that can take a task. The first worker reporting
 * `busy() === false` is returned. When all are busy and the pool holds fewer
 * than `maxWorkers` workers, a new worker is created, added to the pool and
 * returned. Otherwise the result is null.
 *
 * @return {WorkerHandler | null} worker
 * @private
 */
Pool.prototype._getWorker = function() {
  var pooled = this.workers;

  // prefer a worker that already exists and is idle
  var idx = 0;
  while (idx < pooled.length) {
    var candidate = pooled[idx];
    if (candidate.busy() === false) {
      return candidate;
    }
    idx += 1;
  }

  // everybody is busy: grow the pool while there is room left
  if (pooled.length < this.maxWorkers) {
    var fresh = this._createWorkerHandler();
    pooled.push(fresh);
    return fresh;
  }

  return null;
};
|
|
|
|
/**
 * Remove a worker from the pool: release its debug port, drop it from the
 * worker list, top the pool back up to `minWorkers`, and finally ask the
 * worker to terminate.
 * @param {WorkerHandler} worker
 * @return {Promise<WorkerHandler>} resolves with the worker once its
 *                                  terminate callback reports success and
 *                                  rejects with the error it reports otherwise
 * @protected
 */
Pool.prototype._removeWorker = function(worker) {
  DEBUG_PORT_ALLOCATOR.releasePort(worker.debugPort);

  // take the worker out of the list synchronously, before the (callback
  // based) termination below has completed
  this._removeWorkerFromList(worker);

  // if minWorkers is set, spin up replacements for the removed worker
  this._ensureMinWorkers();

  // terminate the worker (non-forced) and expose the outcome as a promise
  return new Promise(function (resolve, reject) {
    worker.terminate(false, function (err) {
      if (err) {
        reject(err);
        return;
      }
      resolve(worker);
    });
  });
};
|
|
|
|
/**
 * Drop a worker from the pool's list of workers. Does nothing when the
 * worker is not in the list; the worker itself is not terminated here.
 * @param {WorkerHandler} worker
 * @protected
 */
Pool.prototype._removeWorkerFromList = function(worker) {
  var position = this.workers.indexOf(worker);
  if (position === -1) {
    return; // not (or no longer) part of this pool
  }
  this.workers.splice(position, 1);
};
|
|
|
|
/**
 * Close all active workers. Tasks still waiting in the queue are rejected
 * with a 'Pool terminated' error and the queue is emptied; tasks currently
 * being executed will be finished first unless `force` is set.
 *
 * Once a worker has terminated, its debug port is released again and the
 * worker is removed from the pool's worker list.
 *
 * @param {boolean} [force=false]   If false (default), the workers are terminated
 *                                  after finishing all tasks currently in
 *                                  progress. If true, the workers will be
 *                                  terminated immediately.
 * @param {number} [timeout]        If provided and non-zero, worker termination promise will be rejected
 *                                  after timeout if worker process has not been terminated.
 * @return {Promise.<void, Error>}
 */
Pool.prototype.terminate = function (force, timeout) {
  var me = this;

  // cancel any pending tasks
  this.tasks.forEach(function (task) {
    task.resolver.reject(new Error('Pool terminated'));
  });
  this.tasks.length = 0;

  // NOTE(review): like the original, this relies on terminateAndNotify()
  // resolving with the worker itself - confirm against WorkerHandler.
  var removeWorker = function (worker) {
    // Hand the debug port back to the shared allocator, as _removeWorker
    // does, so terminated pools do not keep their ports reserved. Only do so
    // while the worker is still listed: if it has been removed in the
    // meantime, its port was already released at that point.
    if (me.workers.indexOf(worker) !== -1) {
      DEBUG_PORT_ALLOCATOR.releasePort(worker.debugPort);
    }
    me._removeWorkerFromList(worker);
  };

  // iterate over a copy: removeWorker mutates this.workers
  var promises = this.workers.slice().map(function (worker) {
    return worker.terminateAndNotify(force, timeout)
      .then(removeWorker);
  });

  return Promise.all(promises);
};
|
|
|
|
/**
 * Report a snapshot of the pool: how many workers exist, how many of them
 * report being busy, and how many tasks are still queued. `activeTasks` is
 * reported as the number of busy workers.
 * @return {{totalWorkers: number, busyWorkers: number, idleWorkers: number, pendingTasks: number, activeTasks: number}} Returns an object with statistics
 */
Pool.prototype.stats = function () {
  var workerCount = this.workers.length;

  // count the workers that are currently occupied
  var busyCount = 0;
  this.workers.forEach(function (worker) {
    if (worker.busy()) {
      busyCount += 1;
    }
  });

  return {
    totalWorkers: workerCount,
    busyWorkers: busyCount,
    idleWorkers: workerCount - busyCount,

    pendingTasks: this.tasks.length,
    activeTasks: busyCount
  };
};
|
|
|
|
/**
 * Create workers until the pool holds at least `minWorkers` of them.
 * Does nothing when `minWorkers` is unset or zero.
 * @protected
 */
Pool.prototype._ensureMinWorkers = function() {
  if (!this.minWorkers) {
    return;
  }

  // number of workers still missing to reach the configured minimum
  var missing = this.minWorkers - this.workers.length;
  while (missing > 0) {
    this.workers.push(this._createWorkerHandler());
    missing -= 1;
  }
};
|
|
|
|
/**
 * Construct a WorkerHandler for this pool's script, handing over the pool's
 * fork arguments, fork options and worker type, plus a debug port obtained
 * from the shared allocator (searching from `debugPortStart`).
 * @return {WorkerHandler}
 * @private
 */
Pool.prototype._createWorkerHandler = function () {
  var handlerOptions = {
    forkArgs: this.forkArgs,
    forkOpts: this.forkOpts,
    debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart),
    workerType: this.workerType
  };

  return new WorkerHandler(this.script, handlerOptions);
};
|
|
|
|
/**
 * Check that the maxWorkers option is an integer number >= 1.
 * Returns nothing; an invalid value is reported by throwing.
 * @param {*} maxWorkers
 * @throws {TypeError} when maxWorkers is not an integer number >= 1
 */
function validateMaxWorkers(maxWorkers) {
  var acceptable = isNumber(maxWorkers) && isInteger(maxWorkers) && maxWorkers >= 1;

  if (!acceptable) {
    throw new TypeError('Option maxWorkers must be an integer number >= 1');
  }
}
|
|
|
|
/**
 * Check that the minWorkers option is an integer number >= 0.
 * Returns nothing; an invalid value is reported by throwing.
 * @param {*} minWorkers
 * @throws {TypeError} when minWorkers is not an integer number >= 0
 */
function validateMinWorkers(minWorkers) {
  var acceptable = isNumber(minWorkers) && isInteger(minWorkers) && minWorkers >= 0;

  if (!acceptable) {
    throw new TypeError('Option minWorkers must be an integer number >= 0');
  }
}
|
|
|
|
/**
 * Tell whether a value has the primitive type `number`
 * (this includes NaN and the infinities).
 * @param {*} value
 * @returns {boolean} returns true when value is a number
 */
function isNumber(value) {
  var kind = typeof value;
  return kind === 'number';
}
|
|
|
|
/**
 * Tell whether a number has no fractional part, i.e. whether rounding it
 * leaves it unchanged.
 * @param {number} value
 * @returns {boolean} Returns true if value is an integer
 */
function isInteger(value) {
  var rounded = Math.round(value);
  // loose comparison is kept on purpose so the result for non-number input
  // stays exactly as before
  return rounded == value;
}
|
|
|
|
module.exports = Pool;
|