Amqp: Better reconnect support

Created on 9 Feb 2015 · 11 Comments · Source: streadway/amqp

https://github.com/streadway/amqp/blob/master/reconnect_test.go

Looking at how you are doing reconnects, the approach causes a slight issue if you are using goroutines: you have to start managing all of them yourself with something like http://stackoverflow.com/a/6807784/2049333, which can be a real mess. I just wanted to open a discussion to see whether this could be improved, or whether that is not possible.

I was thinking of something like not closing the channel when the server stops, and allowing a call such as TryReconnect on amqp.Connection that would try to reopen the connection with the current connection information.

I know that overall you don't want to support reconnecting, but doing it yourself is currently a bit of a mess, in my opinion.

All 11 comments

After a reconnect there are two concerns: establishing the topology and establishing the messaging pattern. Both are application concerns. Do you have an example of what you'd like to see?

I actually figured out a nice way to handle this. Instead of trying to bend this library to do it, I think adding an example of how you might handle failover would be beneficial. I can send you a pull request with that file shortly.

@michaeljs1990 I'm working on implementing reconnecting in my client code. Can you post that file?

@icholy Sorry, I missed this. Here is a small wrapper I wrote for this library that handles reconnect support.

This could be made a lot nicer by defining an interface and using this as the standard implementation, since the connect function is likely the only part most people will end up changing.

package consumers

import (
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

// Consumer holds all information
// about the RabbitMQ connection.
// This setup does limit a consumer
// to one exchange. This should not be
// an issue: needing to connect to multiple
// exchanges means something else is
// structured improperly.
type Consumer struct {
    conn         *amqp.Connection
    channel      *amqp.Channel
    done         chan error
    consumerTag  string // Name that consumer identifies itself to the server with
    uri          string // uri of the rabbitmq server
    exchange     string // exchange that we will bind to
    exchangeType string // topic, direct, etc...
    bindingKey   string // routing key that we are using
}

// NewConsumer returns a Consumer struct
// that has been initialized properly.
// Essentially, don't touch conn, channel, or
// done and you can create a Consumer manually.
func NewConsumer(
    consumerTag,
    uri,
    exchange,
    exchangeType,
    bindingKey string) *Consumer {
    return &Consumer{
        consumerTag:  consumerTag,
        uri:          uri,
        exchange:     exchange,
        exchangeType: exchangeType,
        bindingKey:   bindingKey,
        done:         make(chan error),
    }
}

// ReConnect is called after the NotifyClose() channel fires.
// It waits 30 seconds before trying to reconnect; any shorter delay
// will likely flood the error log while waiting for servers to come
// back online. The two parameters are passed straight through to
// AnnounceQueue, which allows greater flexibility.
func (c *Consumer) ReConnect(queueName, bindingKey string) (<-chan amqp.Delivery, error) {
    time.Sleep(30 * time.Second)

    // Without a connection there is no channel to declare a queue on,
    // so stop here instead of calling AnnounceQueue.
    if err := c.Connect(); err != nil {
        return nil, fmt.Errorf("could not connect in reconnect call: %v", err)
    }

    deliveries, err := c.AnnounceQueue(queueName, bindingKey)
    if err != nil {
        // Close the half-initialized connection so the next attempt starts clean.
        c.conn.Close()
        return nil, fmt.Errorf("could not announce queue in reconnect call: %v", err)
    }

    return deliveries, nil
}

// Connect dials the RabbitMQ server, opens a channel and declares the exchange.
func (c *Consumer) Connect() error {

    var err error

    log.Printf("dialing %q", c.uri)
    c.conn, err = amqp.Dial(c.uri)
    if err != nil {
        return fmt.Errorf("Dial: %s", err)
    }

    // Register for close notifications here, not inside the goroutine, so the
    // watcher never reads c.conn after a later reconnect has replaced it.
    // The buffer lets the library deliver the error without blocking.
    closed := c.conn.NotifyClose(make(chan *amqp.Error, 1))
    go func() {
        // Waits here for the connection to be closed. A graceful close
        // (for example our own Close call) closes the chan and yields nil,
        // so only an unexpected close triggers a reconnect.
        if amqpErr := <-closed; amqpErr != nil {
            log.Printf("closing: %s", amqpErr)
            // Let Handle know it's time to reconnect
            c.done <- errors.New("connection closed")
        }
    }()

    log.Printf("got Connection, getting Channel")
    c.channel, err = c.conn.Channel()
    if err != nil {
        c.conn.Close()
        return fmt.Errorf("Channel: %s", err)
    }

    log.Printf("got Channel, declaring Exchange (%q)", c.exchange)
    if err = c.channel.ExchangeDeclare(
        c.exchange,     // name of the exchange
        c.exchangeType, // type
        true,           // durable
        false,          // delete when complete
        false,          // internal
        false,          // noWait
        nil,            // arguments
    ); err != nil {
        c.conn.Close()
        return fmt.Errorf("Exchange Declare: %s", err)
    }

    return nil
}

// AnnounceQueue declares and binds the queue that will be listened to
// for this connection and starts consuming from it.
func (c *Consumer) AnnounceQueue(queueName, bindingKey string) (<-chan amqp.Delivery, error) {
    log.Printf("declared Exchange, declaring Queue %q", queueName)
    queue, err := c.channel.QueueDeclare(
        queueName, // name of the queue
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // noWait
        nil,       // arguments
    )

    if err != nil {
        return nil, fmt.Errorf("Queue Declare: %s", err)
    }

    log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
        queue.Name, queue.Messages, queue.Consumers, bindingKey)

    // Qos determines the number of messages the server will deliver to you
    // before it waits for you to ack them. This will slow down queue consumption
    // but gives you more certainty that all messages are being processed. As load
    // increases I would recommend raising the number of threads and processors
    // the Go process uses before changing this, although you will eventually need
    // to reach some balance between threads, procs, and Qos.
    err = c.channel.Qos(50, 0, false)
    if err != nil {
        return nil, fmt.Errorf("Error setting qos: %s", err)
    }

    if err = c.channel.QueueBind(
        queue.Name, // name of the queue
        bindingKey, // bindingKey
        c.exchange, // sourceExchange
        false,      // noWait
        nil,        // arguments
    ); err != nil {
        return nil, fmt.Errorf("Queue Bind: %s", err)
    }

    log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.consumerTag)
    deliveries, err := c.channel.Consume(
        queue.Name,    // name
        c.consumerTag, // consumerTag,
        false,         // noAck
        false,         // exclusive
        false,         // noLocal
        false,         // noWait
        nil,           // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Consume: %s", err)
    }

    return deliveries, nil
}

// Handle has all the logic to make sure your program keeps running.
// d should be a delivery channel as created when you call AnnounceQueue,
// and fn should be a function that handles the processing of deliveries.
// This should be the last thing called in main, as code under it will
// become unreachable unless put in a goroutine. The queue and routingKey
// params are redundant but allow you to have multiple queue listeners in
// main; without them you would be tied to using only one queue per connection.
func (c *Consumer) Handle(
    d <-chan amqp.Delivery,
    fn func(<-chan amqp.Delivery),
    threads int,
    queue string,
    routingKey string) {

    var err error

    for {
        // Start the workers on the current delivery channel. They return
        // on their own once d is closed by a dropped connection.
        for i := 0; i < threads; i++ {
            go fn(d)
        }

        // Block until the watcher started in Connect reports a lost connection.
        log.Printf("connection lost: %v", <-c.done)

        // Keep retrying instead of terminating the worker. ReConnect
        // already sleeps 30 seconds before each attempt.
        for {
            d, err = c.ReConnect(queue, routingKey)
            if err == nil {
                break
            }
            log.Printf("Reconnecting Error: %s", err)
        }
        log.Println("Reconnected")
    }
}

And this would be a simple program using it.

package main

import (
    "encoding/json"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "os"
    "runtime"
    "strings"

    "github.com/file/above"
    "github.com/streadway/amqp"
)

var (
    uri          = flag.String("uri", "amqp://something", "The rabbitmq endpoint")
    formURL      = flag.String("form_url", "http://localhost", "The URL that requests are sent to")
    logFile      = flag.String("log_file", "golang_worker.log", "The file where errors are logged")
    threads      = flag.Int("threads", 1, "The number of goroutines that consume deliveries")
    maxprocs     = flag.Int("max_procs", 1, "The max number of processors that your application should use")
    paymentsKey  = flag.String("payments_key", "secret", "Access key")
    exchange     = flag.String("exchange", "something", "The exchange we will be binding to")
    exchangeType = flag.String("exchange_type", "topic", "Type of exchange we are binding to (topic, direct, etc.)")
    queue        = flag.String("queue", "some.queue", "Name of the queue that you would like to connect to")
    routingKey   = flag.String("routing_key", "some.queue", "Routing key used to bind the queue to the exchange")
    workerName   = flag.String("worker_name", "worker.name", "name to identify worker by")
    verbosity    = flag.Bool("verbos", false, "Set true if you would like to log EVERYTHING")

    // Hold the consumer so our goroutine can listen to
    // its done error chan and trigger reconnects
    // if an error is ever returned
    conn *consumers.Consumer
)

func init() {
    flag.Parse()
    runtime.GOMAXPROCS(*maxprocs)
}

func main() {
    // Open a system file to start logging to
    f, err := os.OpenFile(*logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
    if err != nil {
        // Without a log file there is nowhere to log to, so stop here
        log.Fatalf("error opening file: %v", err)
    }
    defer f.Close()
    log.SetOutput(f)

    // Assign the package-level conn instead of shadowing it with :=
    conn = consumers.NewConsumer(*workerName, *uri, *exchange, *exchangeType, *routingKey)

    // The first connection must succeed; later drops are handled by Handle
    if err := conn.Connect(); err != nil {
        log.Fatalf("Error: %v", err)
    }

    deliveries, err := conn.AnnounceQueue(*queue, *routingKey)
    if err != nil {
        log.Fatalf("Error when calling AnnounceQueue(): %v", err)
    }

    conn.Handle(deliveries, handler, *threads, *queue, *routingKey)
}

func handler(deliveries <-chan amqp.Delivery) {

    for d := range deliveries {
        formData := &Data{}

        err := json.Unmarshal(d.Body, formData)
        if err != nil {
            // A body that doesn't parse never will, so reject it without
            // requeueing (it is dropped, or dead-lettered if the queue has
            // a dead-letter exchange configured)
            log.Printf("Error unmarshaling data: %s", err)
            d.Nack(false, false)
            continue
        }

        resp, err := makeRequest(formData)
        if err != nil {
            // resp is nil on error, so requeue the message and move on
            log.Printf("Error posting form data: %s", err)
            d.Nack(false, true)
            continue
        }

        body, err := ioutil.ReadAll(resp.Body)
        // Always close the body so the underlying HTTP connection is released
        resp.Body.Close()
        if err != nil {
            log.Printf("Error reading response body %s", err)
        }

        // Turn on verbose if you are having issues and would like to
        // see everything that is being consumed and everything that
        // is being reported back to you by the server.
        if *verbosity {
            log.Println("-------DEBUG--------")
            log.Println("JSON from QUEUE: ", string(d.Body))
            log.Println("Response: ", resp.StatusCode)
            log.Println("Response Body: ", string(body))
            log.Println("------END DEBUG-----")
        }

        // Only ack on 200 or 400
        if resp.StatusCode == 200 || resp.StatusCode == 400 {
            d.Ack(false)
        } else {
            d.Nack(false, true)
        }
    }
}

// Data struct
type Data struct {
    // Secret Stuff
}

func makeRequest(data *Data) (*http.Response, error) {
    // Secret Stuff
}
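The handler's ack rule (ack on 200 or 400, otherwise requeue) can be pulled out into a small pure function so it can be tested without a broker. `ackDecision` and the `decision` constants are hypothetical names; in the handler, `ack` would map to `d.Ack(false)` and `requeue` to `d.Nack(false, true)`.

```go
package main

import (
	"fmt"
	"net/http"
)

// decision is what the handler should do with a delivery.
type decision int

const (
	ack     decision = iota // d.Ack(false): done, remove from the queue
	requeue                 // d.Nack(false, true): put back for another try
)

// ackDecision mirrors the handler's rule: 200 means the request succeeded
// and 400 means retrying the same body will never succeed, so both are
// acked; anything else (for example a 5xx from an overloaded server) is
// requeued.
func ackDecision(status int) decision {
	if status == http.StatusOK || status == http.StatusBadRequest {
		return ack
	}
	return requeue
}

func main() {
	for _, s := range []int{200, 201, 400, 404, 500, 503} {
		fmt.Println(s, ackDecision(s) == ack)
	}
}
```

Written this way it is easy to see that other success codes such as 201 are requeued under this rule, which may or may not be what you want.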

I don't think reconnecting is the responsibility of this package. This package is pretty low-level and follows the AMQP protocol in its API. We should create a high-level wrapper around it.

I deal with reconnecting in my high-level library that uses amqp. See here:

https://github.com/RichardKnop/machinery/blob/master/v1/worker.go#L36

I think it should not be responsibility of this package.

So I wrote a little but handy wrapper around this issue: https://github.com/assembla/cony

I believe automatic connection recovery is excluded from this library by design. There are examples and other libraries that cover this.

@michaeljs1990 A question about your gist: since you're passing d (a <-chan amqp.Delivery) into multiple goroutines, does that mean all those goroutines share reading from d? Better yet, how would you refactor this if it were supposed to be a publisher instead of a consumer?
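On the first part of the question: in Go, when several goroutines range over the same channel, each value is received by exactly one of them, so the workers started by Handle split the deliveries between them rather than each seeing every message. A stdlib-only illustration, with ints standing in for amqp.Delivery and `consume` as a hypothetical stand-in for the handler:

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts n workers that all range over the same channel, the way
// Handle starts fn(d) several times, and counts how often each message
// was received.
func consume(n int, in <-chan int) map[int]int {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		received = make(map[int]int) // message -> number of times received
	)
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in { // each value is taken by exactly one worker
				mu.Lock()
				received[msg]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // all workers return once in is closed and drained
	return received
}

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 100; i++ {
			in <- i
		}
		close(in)
	}()
	received := consume(4, in)
	fmt.Println(len(received), "messages, each received by one worker")
}
```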

My take on the issue: https://github.com/makasim/amqpextra

The idea is to add as little abstraction as possible. In your code, instead of using *amqp.Connection you use <-chan *amqp.Connection, which returns a healthy connection. You subscribe to chan *amqp.Error to get notified when a connection is no longer healthy, and then request a new one from <-chan *amqp.Connection. That channel is closed when you explicitly call the connextra.Close() method; otherwise, it keeps trying to reconnect in the background. Example.

For anyone still interested in this, I am working on a drop-in replacement for this package with automatic redials. It's powered by this package under the hood and re-implements the same API. It also has method middleware and some other goodies.

https://github.com/peake100/rogerRabbit-go#readme
