Amqp: Channel is not thread safe

Created on 7 Oct 2014 · 17 Comments · Source: streadway/amqp

The Channel.shutdown(*error) function sets the me.send function while holding a lock. However, the Ack, Nack, and Reject functions do not take the same lock.

Is the Channel supposed to be thread safe? A simple fix would be to add locking to the methods that are missing it, but I am not sure if there is something else behind this.

All 17 comments

Channel and Connection are intended to be thread safe. Thanks for finding this. Indeed the simple fix would be to hold the lock.

This might not be such a simple fix. It seems that the call function also uses send and does not take the "m" mutex. However, the Confirm method holds the "m" lock while it invokes call. Thus one cannot simply add locking to the call method.

The only place where the send method seems to change is when the channel is closed. Thus errors caused by this problem seem unlikely if the channel is only used from one goroutine.

Would it make sense to just declare Channel not thread safe and remove the locking? It would make the internal workings of the library faster in general and less error prone (in this rare case).

I personally saw massive performance improvements by using a channel per goroutine. I allocated a channel, then passed it into my function, which was spawned as a goroutine. If the connection or channel had an error, I'd just clean up all the goroutines and re-spawn them.

This approach made my code much cleaner than having to juggle whether the connection and/or channel were still valid.

Removing the mutex and saying that channels are not thread safe would surely increase performance for my use case.
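A minimal sketch of the channel-per-goroutine layout described above. To stay self-contained it uses a hypothetical fakeChannel type and a hypothetical runWorkers helper instead of the real library; in real code the open function would wrap Connection.Channel(). The point it illustrates is that each channel has exactly one owning goroutine, so the channel itself needs no locking.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeChannel is a hypothetical stand-in for an AMQP channel.
// It deliberately has no locking of its own.
type fakeChannel struct {
	id        int
	published []string
}

// Publish records a message body; it is only safe from one goroutine.
func (c *fakeChannel) Publish(body string) {
	c.published = append(c.published, body)
}

// runWorkers opens one channel per worker via open and starts one
// goroutine per channel, so no channel is shared between goroutines.
func runWorkers(n int, open func(id int) *fakeChannel) []*fakeChannel {
	chans := make([]*fakeChannel, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		chans[i] = open(i)
		wg.Add(1)
		go func(ch *fakeChannel) {
			defer wg.Done()
			for j := 0; j < 3; j++ {
				ch.Publish(fmt.Sprintf("worker %d message %d", ch.id, j))
			}
		}(chans[i])
	}
	wg.Wait() // after this, reading the channels from here is safe
	return chans
}

func main() {
	chans := runWorkers(4, func(id int) *fakeChannel {
		return &fakeChannel{id: id}
	})
	for _, ch := range chans {
		fmt.Println(ch.id, len(ch.published))
	}
}
```

On a connection or channel error, the same structure makes recovery simple: stop all workers, reconnect, and call the helper again with fresh channels.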

So, does ACKing messages (that are received via the same delivery channel) from different goroutines cause any problems?

@cenkalti yeah, i'm running into that problem.

@CrackerJackMack would you mind sharing some code around what you're doing to get around this? I was hoping that I could spawn a goroutine for each amqp.Delivery received through the channel, but when I tried to Ack them, everything blew up.

@bmorton just create a Go chan for the ack messages, and in each spawned goroutine "push" the ack message to this chan, which is read by the goroutine responsible for handling the AMQP channel.

Something like this (I pulled it out from my sleeve so, probably won't compile):

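ackChan := make(chan uint64) // delivery tags that are ready to be acked
for {
    select {
    case m := <-fromAmqp:
        // handle processes m, then sends m.DeliveryTag on ackChan
        go handle(m, ackChan)
    case tag := <-ackChan:
        // only this goroutine touches the AMQP channel; ack just this one tag
        amqpChannel.Ack(tag, false)
    }
}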
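The same idea as a runnable sketch that does not depend on a broker. The acker interface, recordingAcker, and consume are hypothetical names introduced here; the interface only mirrors the shape of the library's Channel.Ack(tag uint64, multiple bool) error. Workers run concurrently, but every Ack happens on the single goroutine that owns the channel.

```go
package main

import (
	"fmt"
	"sync"
)

// acker is a hypothetical interface with the same shape as
// Channel.Ack(tag uint64, multiple bool) error.
type acker interface {
	Ack(tag uint64, multiple bool) error
}

// recordingAcker records acked tags without any locking, so it is
// only correct when Ack is called from a single goroutine.
type recordingAcker struct{ acked []uint64 }

func (r *recordingAcker) Ack(tag uint64, multiple bool) error {
	r.acked = append(r.acked, tag)
	return nil
}

// consume starts one worker per delivery tag and performs every Ack
// on the calling goroutine.
func consume(ch acker, deliveries []uint64) error {
	// Buffered so workers never block, even if consume returns early.
	ackChan := make(chan uint64, len(deliveries))
	var wg sync.WaitGroup
	for _, tag := range deliveries {
		wg.Add(1)
		go func(tag uint64) {
			defer wg.Done()
			// ... process the message here ...
			ackChan <- tag
		}(tag)
	}
	go func() {
		wg.Wait()
		close(ackChan) // lets the range loop below finish
	}()
	for tag := range ackChan {
		if err := ch.Ack(tag, false); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recordingAcker{}
	if err := consume(r, []uint64{1, 2, 3, 4, 5}); err != nil {
		fmt.Println("ack failed:", err)
		return
	}
	fmt.Println("acked", len(r.acked), "deliveries")
}
```

Tags arrive in whatever order the workers finish, which is why each one is acked individually with multiple set to false.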

So, should this be solved by improving synchronization and remove raceconditions or remove partial synchronization?

I'll give this some time today and tomorrow to fix.

Actually, I think part of my problem was misunderstanding the parameter that Ack took. I was calling delivery.Ack(true) thinking that was what I needed to do, but it turns out that was acking messages that were running in other goroutines. I changed that to delivery.Ack(false) and I think that fixed my issue.

Some sample code:

func main() {

  // ...

  log.Printf("Waiting for messages...")
  for {
    d, ok := <-c.Deliveries
    if !ok {
      break
    }
    go processMessage(d)
  }

  // ...

}

func processMessage(d amqp.Delivery) {
  log.Printf("[%v] %q", d.DeliveryTag, d.Body)
  d.Ack(false)
}
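The effect of the multiple flag discussed above can be illustrated with a small model of the outstanding delivery tags on one channel. This is not library code: the unacked type and its ack method are hypothetical and only mimic the AMQP rule that multiple=true acknowledges every outstanding delivery up to and including the given tag.

```go
package main

import "fmt"

// unacked models the delivery tags a broker is still waiting on for
// one channel. It is an illustration only.
type unacked map[uint64]bool

// ack removes tag from the set. When multiple is true it also removes
// every outstanding tag that is less than or equal to tag.
func (u unacked) ack(tag uint64, multiple bool) {
	if !multiple {
		delete(u, tag)
		return
	}
	for t := range u {
		if t <= tag {
			delete(u, t)
		}
	}
}

func main() {
	a := unacked{1: true, 2: true, 3: true}
	a.ack(2, false)
	fmt.Println(len(a)) // 2: tags 1 and 3 are still outstanding

	b := unacked{1: true, 2: true, 3: true}
	b.ack(2, true)
	fmt.Println(len(b)) // 1: only tag 3 is still outstanding
}
```

With one goroutine per delivery, Ack(true) on tag 2 also settles tag 1 even if its goroutine is still working, and the later Ack for tag 1 then refers to a delivery the broker no longer tracks.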

@bmorton you can't ack deliveries on a different channel. In your code example all goroutines use the same channel: were you running into "double acknowledgements"?

Yeah, I was ack'ing things before the goroutine started processing some messages which was giving me some empty messages which became panics at some point.

Double acking should result in a [RabbitMQ] channel error. I'm guessing this leads the deliveries [Go] channel to produce nils? We have some conflicting terminology here :)

@pnuz3n update: I've put together an Ack race in https://github.com/streadway/amqp/compare/ack-race-119 but am failing to detect the race with go test -cpu 8 -race. I'll try getting a shutdown() in there somehow.

The race is now in the ack-race-119 branch. I don't see a simple fix due to Connection's reentrant call from Connection.send back to Channel.shutdown so I can't hold a lock on Channel.send's mutation.

I will need to sleep on this. Suggestions welcome for any approaches.

  • Combined sendOpen and sendClosed methods into one (send).
  • Added new state shutdowned bool (sorry for weird name, couldn't come up with better one)

Assigning functions to variables is cool, but personally I don't like this feature. As a side effect, making send a regular method has also fixed the weird calling form me.send(me, ...).

What do you think? Is it a good solution?
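For readers following along, a minimal sketch of the shape this proposal describes, under the assumption that it looks roughly like the following. The channel type, its fields, and errClosed are simplified, hypothetical stand-ins and not the library's actual code: send becomes a regular method that checks a closed flag under the mutex instead of being a function-valued field that shutdown swaps out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical error value for this sketch.
var errClosed = errors.New("channel closed")

// channel is a simplified stand-in for the library's Channel type.
type channel struct {
	m      sync.Mutex
	closed bool     // plays the role of the "shutdowned" flag
	sent   []string // frames accepted while the channel was open
}

// send checks the closed state under the lock on every call, so there
// is no function-valued field for shutdown to mutate.
func (c *channel) send(frame string) error {
	c.m.Lock()
	defer c.m.Unlock()
	if c.closed {
		return errClosed
	}
	c.sent = append(c.sent, frame)
	return nil
}

// shutdown flips the flag under the same lock that send takes.
func (c *channel) shutdown() {
	c.m.Lock()
	c.closed = true
	c.m.Unlock()
}

func main() {
	c := &channel{}
	fmt.Println(c.send("basic.ack")) // <nil>
	c.shutdown()
	fmt.Println(c.send("basic.ack")) // channel closed
}
```

This sketch does not model the reentrant call from Connection.send back into Channel.shutdown mentioned earlier in the thread. Go's sync.Mutex is not reentrant, so if send held the lock while the connection called shutdown on the same goroutine, that path would deadlock.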

This issue seems to be very broad. Discussing concurrency hazard safety without specific scenarios is going to be very time consuming at best. Most clients do not support concurrent publishing on a shared channel: this is not unreasonable.

If there are specific examples that are still not covered over 2 years later, please file new, more focussed issues. Thank you.
