var events = require('events');
var crypto = require('crypto');
var util = require('util');
var dbg = require('debug');
var utils = require('./utils');
module.exports = (function() {
var debugChannel = dbg('mikronode:channel');
var debugChannelData = dbg('mikronode:channel:data');
var _ = require('private-parts').createKey();
/**
* writeCallback
* @callback mikronode.Channel.writeCallback
* @param {Channel}
*/
/**
* Emitted when a command has finished successfully.
* @event mikronode.Channel#event:done
* @property {(string|string[])} data - The data returned by the channel
* @property {Channel} channel - The channel originating the event Fatal event.
*/
/**
* Emitted when a non-recoverable error has occurred on the socket. No further commands
* can be processed on any channel.
* @event mikronode.Channel#event:error
* @property {error} error - The error object
* @property {Channel} channel - The channel originating the event
*/
/**
* Emitted when a socket has been idle too long.
* @event mikronode.Channel#event:timeout
* @property {string} message - 'Socket Timeout'
* @property {boolean} socketStillOpen - If true, communications can continue
* @property {Channel} channel - The channel originating the event
*/
/**
* Emitted when the channel is closed either by an explicit call to
* {@link mikronode.Channel#close} or when the channel is closed automatically via
* {@link mikronode.Channel#closeOnDone}
* @event mikronode.Channel#event:close
* @property {Channel} channel - The channel originating the event
*/
/**
* Emitted when a command has failed. Subsequent commands may succeed.
* @event mikronode.Channel#event:trap
* @property {mikronode.Trap} trap - The trap object
*/
/**
* Channel (should not be instantiated directly)
* @exports mikronode.Channel
* @implements {EventEmitter}
* @class
* @param {number} id
* @param {mikronode.Connection} conn
* @fires mikronode.Channel#event:done
* @fires mikronode.Channel#event:trap
* @fires mikronode.Channel#event:error
* @fires mikronode.Channel#event:timeout
* @fires {mikronode.Channel#event:close}
*/
function Channel(id, conn) {
debugChannel('Opening channel: ' + id);
/**
* Channel ID
* @public
* @readonly
* @instance
* @member {number} id
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'id', id, false, _);
/**
* Connection
* @private
* @readonly
* @instance
* @member {mikronode.Connection} connection
* @memberof mikronode.Channel
*/
_(this).connection = conn;
/**
* @public
* @readonly
* @instance
* @member {boolean} running
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'running', false, false, _);
/**
* @public
* @readonly
* @instance
* @member {boolean} closing
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'closing', false, false, _);
/**
* @public
* @readonly
* @instance
* @member {boolean} closed
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'closed', false, false, _);
/**
* Clear event listeners on done
* @public
* @instance
* @member {boolean} clearEvents
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'clearEvents', false, true, _);
/**
* Save each line received in a buffer and pass the entire buffer to the done event.
* Otherwise the done event will not get all the lines, only the last line. This is
* handy when following trailing output from a listen command, where the data could
* be endless.
* @public
* @instance
* @member {boolean} saveBuffer
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'saveBuffer', true, true, _);
/**
* Close channel on done
* @public
* @instance
* @member {boolean} closeOnDone
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'closeOnDone', false, true, _);
/**
* @public
* @readonly
* @instance
* @member {string[]} lastCommand
* @memberof mikronode.Channel
*/
utils.createProperty(this, 'lastCommand', [], false, _, true);
/**
* @private
* @instance
* @member {mikronode.Channel.writeCallback} writeCallback
* @memberof mikronode.Channel
*/
_(this).writeCallback = null;
/**
* @private
* @instance
* @member {array} packet
* @memberof mikronode.Channel
*/
_(this).packet = [];
/**
* @private
* @instance
* @member {array} commands
* @memberof mikronode.Channel
*/
_(this).commands = [];
/**
* @private
* @instance
* @member {array} buffer
* @memberof mikronode.Channel
*/
_(this).buffer = [];
/* We want connection errors to propogate down to
* the channel so they can be caught by a channel promise
*/
var _this = this;
/* A 'error' event is thrown by Socket
* and are non-recoverable so we force close the channel.
*/
_(this).errorListener = utils.makeListener(this.errorListener, this);
conn.once('error', _(this).errorListener);
/* A 'timeout' event is thrown by Socket
* but they are recoverable. If Connection has closed
* the Socket, we'll close the channel. Otherwise, just
* notify receivers and let them decide what to do.
*/
_(this).timeoutListener = utils.makeListener(this.timeoutListener, this);
conn.on('timeout', _(this).timeoutListener);
}
util.inherits(Channel, events.EventEmitter);
Channel.prototype.errorListener = function errorListener(err) {
debugChannel('Channel %s caught Connection Error: %o', _(this).id, _(this).connection);
this.emit('error', err, this);
this.close(true);
};
Channel.prototype.timeoutListener = function timeoutListener(message, socketStillOpen) {
debugChannel('Channel %s caught Timeout', _(this).id);
this.emit('timeout', message, socketStillOpen, this);
if (!socketStillOpen) {
this.close(true);
}
};
/**
* Writes data to the channel
* @param {(string|string[])} data - Can be a single string with the command and
* optional parameters separated by '\n' or an array of strings with the
* command in the first position and the parameters in the rest.
* @param {(object|string[])} [parameters] - If the first parameter is a command
* string, this object will be treated as the parameters for the command.
* <p>
* It can be an array or strings...
*
* <pre>
* ['name=value','name=value'...]
* </pre>
*
* or an Object...
*
* <pre>
* {'name': 'value', 'name': 'value'...}
* </pre>
*
* @param {mikronode.Channel.writeCallback} [writeCallback] - This will be called just
* before write actually writes the data to the connection.
*/
Channel.prototype.write = function write(d, parameters, writeCallback) {
if (_(this).closing) {
return;
}
if (d) {
if (typeof (d) === 'string') {
d = d.split("\n");
}
if (typeof parameters !== 'function') {
if (Array.isArray(parameters)) {
Array.prototype.push.apply(d, parameters);
} else if (parameters instanceof Object) {
Object.keys(parameters).forEach(function(k) {
d.push(k + '=' + parameters[k]);
});
}
} else if (writeCallback === undefined) {
writeCallback = parameters;
}
if (Array.isArray(d) && d.length) {
_(this).buffer = _(this).buffer.concat(d);
} else {
return;
}
} else {
debugChannel('Channel %s write: empty arg.', _(this).id);
}
if (_(this).running) {
_(this).lastCommand = _(this).buffer;
if (debugChannelData.enabled) {
debugChannelData('Channel %s running: pushing command %o', _(this).id, _(this).lastCommand);
} else {
debugChannel('Channel %s running: pushing command', _(this).id);
}
_(this).commands.push([ _(this).buffer, writeCallback ]);
_(this).buffer = [];
} else {
_(this).lastCommand = _(this).buffer;
var b = _(this).buffer;
_(this).running = true;
_(this).saveBuffer = true;
_(this).buffer = [];
b.push('.tag=' + _(this).id);
if (writeCallback) {
writeCallback(this);
}
if (debugChannelData.enabled) {
debugChannelData('Channel %s idle: writing %o', _(this).id, _(this).lastCommand);
} else {
debugChannel('Channel %s idle: writing', _(this).id);
}
_(this).connection._write(b); // Send command.
}
};
/**
* Called when connection gets 'done'
* @private
* @param {(string|string[])} data
*/
Channel.prototype._done = function _done(data, trap) {
if (trap) {
debugChannel('Channel %s trap: %o', _(this).id, trap);
this.emit('trap', trap, this);
} else {
var p = _(this).packet;
_(this).packet = [];
if (!p.length) {
p = [ data ];
} else if (p[p.length - 1] !== data) {
p.push(data);
}
if (debugChannelData.enabled) {
debugChannelData('Channel %s done: %o', _(this).id, p);
} else {
debugChannel('Channel %s done', _(this).id);
}
this.emit('done', p, this);
}
if (_(this).clearEvents) {
this.removeAllListeners('done');
this.removeAllListeners('data');
this.removeAllListeners('read');
}
_(this).running = false;
if (_(this).commands.length) {
var c = _(this).commands.shift();
var cl = _(this).closing;
_(this).closing = false;
debugChannel('Channel %s more commands', _(this).id);
this.write(c[0], {}, c[1]);
_(this).closing = cl;
} else if (_(this).closing || _(this).closeOnDone) {
this.close();
}
};
/**
* Called when connection gets 'data'
* @private
* @param {(string|string[])} data
*/
Channel.prototype._data = function _data(data) {
if (debugChannelData.enabled) {
debugChannelData('Channel %s data: %o', _(this).id, data);
} else {
debugChannel('Channel %s data', _(this).id);
}
if (_(this).saveBuffer) {
_(this).packet.push(data);
}
this.emit('data', [ data ], this);
this.emit('read', [ data ], this);
};
/**
* Closes the channel This will close the connection if
* {@link mikronode.Connection#closeOnDone} was set and this was the last channel to
* close.
* @public
* @param {boolean} force - Force close even of there are other commands pending.
* Otherwise mark the channel as 'closing' which will prevent new commands
* from being started but will let queued ones finish.
*/
Channel.prototype.close = function close(force) { // Close _(this) channel.
_(this).closing = true;
if (_(this).closed || (!force && (_(this).commands.length || _(this).running))) {
debugChannel('Channel %s closing deferred', _(this).id);
return;
}
debugChannel('Channel %s closing. Forced: %s', _(this).id, force ? 'true' : 'false');
if (_(this).running) {
try {
debugChannel('Channel %s sending cancel', _(this).id);
_(this).connection._write([ '/cancel', '=tag=' + _(this).id ]);
} catch (err) {
debugChannel('Error sending /cancel', err.stack);
}
}
_(this).connection.removeListener('error', _(this).errorListener);
_(this).connection.removeListener('timeout', _(this).timeoutListener);
debugChannel('Channel %s closed', _(this).id);
_(this).closed = true;
this.emit('close', this);
this.removeAllListeners();
};
/**
* Calls {@link mikronode.Channel#close}(false)
* @public
*/
Channel.prototype.finalize = function finalize() {
debugChannel('Channel %s finalize', _(this).id);
if (!_(this).closing) {
this.close();
}
};
return Channel;
})();