If I hit the consumer with enough messages at once (e.g. 10,000), using the code below the channel closes and will no longer take in new deliveries. However, if I remove d.Ack and just have the body of the delivery being printed, then there's no problem.
func main() {
conn, _ := amqp.Dial(*uri)
channel, _ := conn.Channel()
deliveries, _ := channel.Consume(
*queue, //queue
"",
false,
false,
false,
false,
nil)
for d := range deliveries {
go func(amqp.Delivery) {
log.Printf("got %s", d.Body)
d.Ack(false)
}(d)
}
}
Your anonymous func is closing over d
from the for statement. Try naming the param in scope of your func.
Also you can show asynchronous channel errors like:
go func() {
log.Fatal(<-channel.NotifyClose(make(chan *amqp.Error)))
}()
@MattParker89 see RabbitMQ log, it will contain a channel exception. You are almost certainly double-acking deliveries.
Thanks so much for the help. @michaelklishin I was double-acking. I misunderstood Go anonymous functions. I got it working by breaking out a separate function and using that in the go routine. For anyone else that makes the same mistake, the error I got was Exception (406) Reason: "PRECONDITION_FAILED - unknown delivery tag 24782"
Here's the working code
func main() {
conn, _ := amqp.Dial(*uri)
channel, _ := conn.Channel()
deliveries, _ := channel.Consume(
*queue, //queue
"",
false,
false,
false,
false,
nil)
for d := range deliveries {
go doSomeWork(d)
}
}
func doSomeWork(d amqp.Delivery){
log.Printf("got %s", d.Body)
d.Ack(false)
}
Most helpful comment
Thanks so much for the help. @michaelklishin I was double-acking. I misunderstood Go anonymous functions. I got it working by breaking out a separate function and using that in the go routine. For anyone else that makes the same mistake, the error I got was Exception (406) Reason: "PRECONDITION_FAILED - unknown delivery tag 24782"
Here's the working code