199 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| var EventEmitter = require('events').EventEmitter;
 | |
| var util = require('util');
 | |
| 
 | |
// Fallback task timeout in milliseconds, used whenever no positive timeout is supplied.
var DEFAULT_TIMEOUT = 3000;
// Starting value of a queue's task id counter.
var INIT_ID = 0;
// Name of the event emitted when a queue is closed gracefully.
var EVENT_CLOSED = 'closed';
// Name of the event emitted when a queue enters the drained state.
var EVENT_DRAINED = 'drained';
 | |
| 
 | |
/**
 * Sequential task queue. Tasks are executed one at a time, in the order
 * they were pushed. Inherits from EventEmitter.
 *
 * @param {Number} timeout queue-wide task timeout in milliseconds; any value
 *                         that is not a positive number falls back to DEFAULT_TIMEOUT
 * @class
 * @constructor
 */
var SeqQueue = function(timeout) {
	EventEmitter.call(this);

	// Only a truthy, positive value overrides the default.
	this.timeout = (timeout && timeout > 0) ? timeout : DEFAULT_TIMEOUT;

	// A new queue starts idle, with no pending tasks and the initial task id.
	this.status = SeqQueueManager.STATUS_IDLE;
	this.curId = INIT_ID;
	this.queue = [];
};
util.inherits(SeqQueue, EventEmitter);
 | |
| 
 | |
/**
 * Append a task to the queue and start processing if the queue is idle.
 *
 * @param {Function} fn the task; it is invoked with an object exposing `done()`,
 *                      which must be called to let the queue move on
 * @param {Function} ontimeout optional callback invoked if the task times out
 * @param {Number} timeout timeout for this task in milliseconds; the queue-wide
 *                         timeout is used when this is not a positive number
 * @returns {Boolean} true if the task was accepted, false if the queue is
 *                    neither idle nor busy (i.e. closed or drained)
 * @throws {Error} if the queue accepts tasks but fn is not a function
 */
SeqQueue.prototype.push = function(fn, ontimeout, timeout) {
	var acceptsTasks = this.status === SeqQueueManager.STATUS_IDLE ||
		this.status === SeqQueueManager.STATUS_BUSY;
	if(!acceptsTasks) {
		// closed or drained queues silently reject new work
		return false;
	}

	if(typeof fn !== 'function') {
		throw new Error('fn should be a function.');
	}

	this.queue.push({fn: fn, ontimeout: ontimeout, timeout: timeout});

	if(this.status !== SeqQueueManager.STATUS_IDLE) {
		// a task is already in flight; this one runs when its turn comes
		return true;
	}

	// Idle queue: mark it busy and kick off processing on the next tick.
	this.status = SeqQueueManager.STATUS_BUSY;
	var queue = this;
	process.nextTick(function() {
		queue._next(queue.curId);
	});
	return true;
};
 | |
| 
 | |
/**
 * Close the queue so that it accepts no further tasks.
 *
 * With `force` the queue is drained immediately: the pending task timer is
 * cancelled and 'drained' is emitted. Without `force` the queue switches to
 * the closed state, emits 'closed', and emits 'drained' once the remaining
 * tasks have been processed.
 *
 * Calls on a queue that is already closed or drained are ignored.
 *
 * @param {Boolean} force if true close the queue immediately, otherwise
 *                        execute the remaining tasks in the queue first
 */
SeqQueue.prototype.close = function(force) {
	var wasIdle = this.status === SeqQueueManager.STATUS_IDLE;
	if(!wasIdle && this.status !== SeqQueueManager.STATUS_BUSY) {
		//ignore invalid status
		return;
	}

	if(force) {
		this.status = SeqQueueManager.STATUS_DRAINED;
		if(this.timerId) {
			clearTimeout(this.timerId);
			this.timerId = undefined;
		}
		this.emit(EVENT_DRAINED);
		return;
	}

	this.status = SeqQueueManager.STATUS_CLOSED;
	this.emit(EVENT_CLOSED);

	if(wasIdle) {
		// An idle queue has no task in flight, so nothing would ever call
		// _next() again and the queue would stay closed forever. Schedule one
		// pass so it can observe the empty queue and move on to drained.
		var self = this;
		process.nextTick(function() {
			self._next(self.curId);
		});
	}
};
 | |
| 
 | |
/**
 * Run the next task in the queue, or update the queue status when no task
 * is left.
 *
 * The call is ignored unless `tid` matches the current task id and the queue
 * is busy or closed; this discards stale calls coming from tasks that already
 * timed out or called `done()` more than once.
 *
 * Emits 'timeout' (with the task) when a task exceeds its timeout, 'error'
 * (with the error and the task) when a task throws synchronously, and
 * 'drained' when a closed queue runs out of tasks.
 *
 * @param {String|Number} tid last executed task id
 * @api private
 */
SeqQueue.prototype._next = function(tid) {
	var runnable = this.status === SeqQueueManager.STATUS_BUSY ||
		this.status === SeqQueueManager.STATUS_CLOSED;
	if(tid !== this.curId || !runnable) {
		//ignore invalid next call
		return;
	}

	// The previous task is finished; its timeout timer is no longer needed.
	if(this.timerId) {
		clearTimeout(this.timerId);
		this.timerId = undefined;
	}

	var task = this.queue.shift();
	if(!task) {
		if(this.status === SeqQueueManager.STATUS_BUSY) {
			this.status = SeqQueueManager.STATUS_IDLE;
			this.curId++;	//modify curId to invalidate timeout task
		} else {
			// closed queue with nothing left to run
			this.status = SeqQueueManager.STATUS_DRAINED;
			this.emit(EVENT_DRAINED);
		}
		return;
	}

	var self = this;
	task.id = ++this.curId;

	// Move on to the following task on the next tick. The task id is passed
	// along so that a late or duplicate call is ignored by the guard above.
	var advance = function() {
		process.nextTick(function() {
			self._next(task.id);
		});
	};

	var timeout = task.timeout > 0 ? task.timeout : this.timeout;
	timeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;
	this.timerId = setTimeout(function() {
		advance();
		self.emit('timeout', task);
		// Only call the handler when it is actually callable, so a stray
		// non-function value cannot throw inside the timer callback.
		if(typeof task.ontimeout === 'function') {
			task.ontimeout();
		}
	}, timeout);

	try {
		task.fn({
			done: function() {
				// true only if this task is still the current one
				var res = task.id === self.curId;
				advance();
				return res;
			}
		});
	} catch(err) {
		// Schedule the next task before emitting: emit('error') throws when
		// there is no listener (or the listener itself throws), and the queue
		// must not be left waiting for the task timeout in that case.
		advance();
		self.emit('error', err, task);
	}
};
 | |
| 
 | |
/**
 * Queue manager: the object exported by this module. It carries the queue
 * status constants and the queue factory.
 *
 * @module
 */
var SeqQueueManager = module.exports;

/**
 * Status "idle": no task is running and new tasks are accepted.
 *
 * @const
 * @type {Number}
 * @memberOf SeqQueueManager
 */
SeqQueueManager.STATUS_IDLE = 0;

/**
 * Status "busy": the queue is currently processing tasks; new tasks are
 * still accepted.
 *
 * @const
 * @type {Number}
 * @memberOf SeqQueueManager
 */
SeqQueueManager.STATUS_BUSY = 1;

/**
 * Status "closed": the queue no longer accepts tasks and is working
 * through the ones that remain.
 *
 * @const
 * @type {Number}
 * @memberOf SeqQueueManager
 */
SeqQueueManager.STATUS_CLOSED = 2;

/**
 * Status "drained": the queue has stopped for good and is ready to be
 * destroyed.
 *
 * @const
 * @type {Number}
 * @memberOf SeqQueueManager
 */
SeqQueueManager.STATUS_DRAINED = 3;
 | |
| 
 | |
/**
 * Factory for sequential queues.
 *
 * @param  {Number} timeout queue-wide task timeout passed to the new queue
 * @return {Object}         the newly created SeqQueue instance
 * @memberOf SeqQueueManager
 */
SeqQueueManager.createQueue = function(timeout) {
	var queue = new SeqQueue(timeout);
	return queue;
};