Skip to content

Flexible connections for sentinel support #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 52 additions & 12 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,17 @@ function RedisClient(stream, options) {

this.old_state = null;

var self = this;
if(this.stream){
this.initialize_stream_listeners();
}

events.EventEmitter.call(this);
}
util.inherits(RedisClient, events.EventEmitter);
exports.RedisClient = RedisClient;

RedisClient.prototype.initialize_stream_listeners = function () {
var self = this;
this.stream.on("connect", function () {
self.on_connect();
});
Expand All @@ -86,23 +95,28 @@ function RedisClient(stream, options) {
self.on_error(msg.message);
});

this.stream.on("close", function () {
self.connection_gone("close");
this.stream.on("close", function (had_error) {
if(had_error !== true){
// Only in case the error event wasn't emitted earlier, to prevent duplicate call to connection_gone
self.connection_gone("close");
}
});

this.stream.on("end", function () {
self.connection_gone("end");
});

if(this.options.socket_timeout){
this.stream.setTimeout(this.options.socket_timeout, function () {
self.stream.destroy();
});
}

this.stream.on("drain", function () {
self.should_buffer = false;
self.emit("drain");
});

events.EventEmitter.call(this);
}
util.inherits(RedisClient, events.EventEmitter);
exports.RedisClient = RedisClient;
};

RedisClient.prototype.initialize_retry_vars = function () {
this.retry_timer = null;
Expand All @@ -114,6 +128,11 @@ RedisClient.prototype.initialize_retry_vars = function () {

// flush offline_queue and command_queue, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (message) {

if (this.options.disable_flush) {
return;
}

var command_obj;
while (this.offline_queue.length > 0) {
command_obj = this.offline_queue.shift();
Expand Down Expand Up @@ -479,6 +498,18 @@ RedisClient.prototype.connection_gone = function (why) {
}, this.retry_delay);
};

RedisClient.prototype.forceReconnectionAttempt = function (){
if(!this.stream){
this.stream = net.createConnection(this.port, this.host);
this.initialize_stream_listeners();
return;
}

clearTimeout(this.retry_timer);
this.initialize_retry_vars();
this.connection_gone();
};

RedisClient.prototype.on_data = function (data) {
if (exports.debug_mode) {
console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
Expand Down Expand Up @@ -1127,11 +1158,20 @@ RedisClient.prototype.eval = RedisClient.prototype.EVAL = function () {


exports.createClient = function (port_arg, host_arg, options) {
var port = port_arg || default_port,
host = host_arg || default_host,
redis_client, net_client;
var redis_client;
var net_client;

net_client = net.createConnection(port, host);
var port = port_arg,
host = host_arg;

options = options || {};

if(options.allowNoSocket !== true || (port_arg !== null && host_arg !== null)){
host = host || default_host;
port = port || default_port;

net_client = net.createConnection(port, host);
}

redis_client = new RedisClient(net_client, options);

Expand Down