How to reestablish connection after a failure? #153

Closed
mlmarius opened this issue May 11, 2015 · 8 comments

Comments

@mlmarius

Hi. When I close the connection from the RabbitMQ side, I get this:

[Error] Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - Closed via management plugin"
at Object.accept (/home/liviu/socketcluster/web_push/node_modules/amqplib/lib/connection.js:89:32)
at Connection.mainAccept [as accept] (/home/liviu/socketcluster/web_push/node_modules/amqplib/lib/connection.js:62:33)
at Socket.go (/home/liviu/socketcluster/web_push/node_modules/amqplib/lib/connection.js:465:48)
at Socket.emit (events.js:92:17)
at emitReadable_ (_stream_readable.js:426:10)
at emitReadable (_stream_readable.js:422:5)
at readableAddChunk (_stream_readable.js:165:9)
at Socket.Readable.push (_stream_readable.js:127:10)
at TCP.onread (net.js:528:21)

How do I make it retry the connection after 10 seconds instead of erroring out? I am using the code from the topic receive example.

@nini-os

nini-os commented May 11, 2015

There is no such option in this library. You can listen for the connection's "close" event like this:

conn.on('close', function connectionClose() {
        log('Connection closed');
        retryConnection();
    });

You can then implement a function like retryConnection() that uses setTimeout to call amqp.connect and connect again.
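
A minimal sketch of the retryConnection() idea, not a definitive implementation. The names `makeRetrier` and `connectFn` are hypothetical: `connectFn` stands in for a function that calls amqp.connect(url) and returns its promise. The timer argument is injectable only so the sketch can be exercised without a broker.

```javascript
// Hypothetical helper: builds a retryConnection() that re-runs connectFn after a delay.
// connectFn is assumed to return a promise, as amqp.connect(url) does.
function makeRetrier(connectFn, delayMs, timer) {
  timer = timer || setTimeout; // injectable so the sketch can be tested without waiting
  var pending = false;         // guard: overlapping events schedule only one retry

  function retryConnection() {
    if (pending) {
      return false;            // a retry is already scheduled
    }
    pending = true;
    timer(function () {
      pending = false;
      connectFn().then(null, function (err) {
        // Connecting failed again: log it and schedule another attempt.
        console.log('connection failed', err && err.message);
        retryConnection();
      });
    }, delayMs);
    return true;
  }

  return retryConnection;
}
```

Inside the success handler of the connect call, this would be wired up as `conn.on('close', retryConnection)`. The `pending` flag is there in case more than one event fires for the same failure.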

@squaremo
Collaborator

As @chiarishow says, there's presently no reconnection feature in amqplib. See #25 for (inconclusive) discussion.

@mlmarius
Author

I've made a test script and it appears to work up to a point. If I stop the RabbitMQ server completely, it schedules the reconnect, but then I get an unhandled exception. Where should I handle this, or is it an event? This is my setup:

RabbitListener.prototype.connect = function(){
        var me = this;
        try{
                amqp.connect(this.addr_).then(function(conn) {
                        conn.on('error', me.reconnect.bind(me));
                        return conn.createChannel().then(function(chan){
                                console.log('channel created');
                                return chan;
                        });
                }).then(function(chan){
                        var exchange = chan.assertExchange('topic1', 'topic', {durable: true});
                        queue = exchange.then(function(){
                                return chan.assertQueue('', {exclusive: true});
                        }).then(function(queue){
                                return chan.bindQueue(queue.queue,'topic1','t1');
                        }).then(function(boundQueue){
                                return chan.consume(boundQueue.queue,function(msg){
                                        console.log(msg.content.toString());
                                },{noAck: true});
                        });
                });
        }
        catch(error){
                console.log('caught exception');
        }

};
RabbitListener.prototype.reconnect = function(){
        var reconnectTimeout = 10000;
        var me = this;
        console.log('Scheduling reconnect in '+(reconnectTimeout/1000)+'s');
        setTimeout(function(){
                console.log('now attempting reconnect ...');
                me.connect();
        }, reconnectTimeout);
}

I was trying to catch any exception, but nothing is caught. This is the error I get when my script exits:

Potentially unhandled rejection [1] Error: connect ECONNREFUSED
    at errnoException (net.js:904:11)
    at Object.afterConnect [as oncomplete] (net.js:895:19)

@nini-os

nini-os commented May 12, 2015

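try/catch doesn't work here, because amqp.connect() reports a failed connection asynchronously by rejecting the promise it returns, not by throwing.

From the API documentation:
The returned promise, or supplied callback, will either be resolved with an object representing an open connection, or rejected with a sympathetically-worded error.

So you must modify your code to pass a rejection handler: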

amqp.connect(this.addr_).then(function(conn) {
                        conn.on('error', me.reconnect.bind(me));
                        return conn.createChannel().then(function(chan){
                                console.log('channel created');
                                return chan;
                        });
                }, function connectionFailed(err) {
                                console.log('connection failed', err);
                                me.reconnect();
                })

@mlmarius
Author

thanks.

@jgato

jgato commented Jul 16, 2015

@mlmarius I have an issue with your code. I am also working on some kind of "simple" auto-reconnection with amqplib.

If you launch this code with the RabbitMQ server down, you will detect the error (and begin the reconnect process). But at the same time the promise chain continues and executes the ".then(function(chan){...})" step:

RabbitListener.prototype.connect = function(){
        var me = this;
                amqp.connect(this.addr_).then(function(conn) {
                        conn.on('error', me.reconnect.bind(me));
                        return conn.createChannel().then(function(chan){
                                console.log('channel created');
                                return chan;
                        });
                }).then(function(chan){
                    // once a rejection handler is passed to the first then(),
                    // this part always executes, whether or not the connection
                    // succeeded (chan is undefined on failure)

                });
};

Don't you get errors like "cannot call method assertExchange of undefined"?
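
The behaviour described here comes from promises in general, not from amqplib: a rejection handler passed as the second argument to then() that returns normally puts the chain back on the fulfilled path, so the next then() runs with undefined. The sketch below assumes a hypothetical `fakeConnect` that always rejects, standing in for amqp.connect() with the broker down, and compares it with one possible alternative, a single catch at the end of the chain.

```javascript
// fakeConnect is a hypothetical stand-in for amqp.connect() when the broker is down.
function fakeConnect() {
  return Promise.reject(new Error('connect ECONNREFUSED'));
}

// Pattern from the thread: the failure is handled mid-chain, so the chain recovers
// and the next step still runs, receiving undefined instead of a channel.
function handledMidChain(log) {
  return fakeConnect().then(function (conn) {
    return 'channel';
  }, function connectionFailed(err) {
    log.push('connection failed');
    // nothing is returned here, so the chain continues with undefined
  }).then(function (chan) {
    log.push('next step got ' + chan);
    return log;
  });
}

// Alternative: one catch at the end; the intermediate step is skipped on failure.
function handledAtEnd(log) {
  return fakeConnect().then(function (conn) {
    return 'channel';
  }).then(function (chan) {
    log.push('next step got ' + chan);
  }).catch(function (err) {
    log.push('connection failed');
  }).then(function () {
    return log;
  });
}
```

With the second shape, the reconnect call would go inside the final catch, and the channel setup steps never see an undefined channel.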

I am trying to write a class that manages the connection automatically (and later reconnects with your last list of queues). That way you invoke connect once, and then you can publish/subscribe without worrying about whether the channel is ready.

For the moment, it is just a draft:

var rabbit = require('amqplib');

var queues = ["DemoChannel-48688754-queu"];

var connectQueues = function(chan){

    for (var i = 0; i < queues.length; i++) {
        chan.consume(queues[i], function (msg) {
            if (msg !== null) {
                console.log(msg.content.toString());
                chan.ack(msg);
            }
        }, {
            exclusive: true
        });
    }
}
RabbitListener.prototype.connect = function () {
    var me = this;
    return new Promise(function (resolve, reject) {


        rabbit.connect('amqp://user:passwrod@localhost').then(function (conn) {

            console.log("connected to the server");
            conn.on('error', me.reconnect.bind(me));
            return conn.createChannel().then(function (chan) {
                console.log('channel created, lets connect exchanges');
                connectQueues(chan);
                resolve(chan);
            });
        }, function connectionFailed(err) {
            console.log('connection failed', err);
            me.reconnect();
        }).catch(function (error){
            console.log(error);
        });
    });

};
RabbitListener.prototype.reconnect = function () {
    var reconnectTimeout = 1000;
    var me = this;
    console.log('Scheduling reconnect in ' + (reconnectTimeout / 1000) + 's');
    setTimeout(function () {
        console.log('now attempting reconnect ...');
        me.connect();
    }, reconnectTimeout);
}

function RabbitListener() {

};

module.exports = RabbitListener;

I will add some methods to add/remove queues and callbacks for them.
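
One possible shape for the add/remove methods mentioned above, offered as a sketch under stated assumptions and not as part of the draft. `QueueRegistry` and its method names are hypothetical. It only assumes that the channel object has consume(queue, callback) and ack(msg), as the draft already uses, and that the queues already exist on the broker.

```javascript
// Hypothetical registry of queue name -> message callback,
// replayed onto every freshly created channel after a (re)connect.
function QueueRegistry() {
  this.handlers = {};
}

QueueRegistry.prototype.add = function (queue, onMessage) {
  this.handlers[queue] = onMessage;
};

QueueRegistry.prototype.remove = function (queue) {
  delete this.handlers[queue];
};

// Call with the new channel each time createChannel() succeeds.
QueueRegistry.prototype.subscribeAll = function (chan) {
  var handlers = this.handlers;
  return Promise.all(Object.keys(handlers).map(function (queue) {
    var onMessage = handlers[queue]; // capture now, in case it is removed later
    return chan.consume(queue, function (msg) {
      if (msg !== null) {            // same null check as in the draft
        onMessage(msg);
        chan.ack(msg);
      }
    });
  }));
};
```

In the draft, `connectQueues(chan)` could then be replaced by a call such as `registry.subscribeAll(chan)`, so that every reconnect re-subscribes whatever is registered at that moment.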

@haydursek

@jgato I am looking into doing something similar, so I wanted to know if your impl also covers channels being closed and reconnect logic.

@jgato

jgato commented Dec 19, 2016

@haydursek I fear that I abandoned that implementation without going into much detail :(

5 participants