Amqp: Channels do not have a per-operation timeout

Created on 13 Jul 2017 · 22 Comments · Source: streadway/amqp

Hi,

I think I've found a bug with Connection.Channel(): sometimes the call is blocked indefinitely.

My scenario:
I've got a queue in a vhost. I start to consume on this queue.
Then I delete the vhost from the RabbitMQ web interface. Obviously, the channel is closed.

In my code, I retry in an infinite loop to open a new channel, then to consume on this channel.
Consume() returns an error:
Exception (403) Reason: "ACCESS_REFUSED - access to queue 'xxxx' in vhost 'xxxx' refused for user 'guest'" (it's expected)
After a few seconds, my program is blocked indefinitely.

I think that the RabbitMQ server has closed the connection from its side, because I don't see the connection in the web interface anymore.
Nothing relevant in the logs.

Here is the stack trace for the goroutine:

goroutine 34 [select]:
github.com/streadway/amqp.(*Channel).call(0xc42025b8c0, 0xa43020, 0xc4202690b0, 0xc42044bd70, 0x1, 0x1, 0x1, 0xc4203074a0)
    github.com/streadway/amqp/channel.go:176 +0x198
github.com/streadway/amqp.(*Channel).open(0xc42025b8c0, 0xc42025b8c0, 0x0)
    github.com/streadway/amqp/channel.go:165 +0xb3
github.com/streadway/amqp.(*Connection).openChannel(0xc420295a40, 0xc420024600, 0x77e7a0, 0x8a2480)
    github.com/streadway/amqp/connection.go:624 +0x55
github.com/streadway/amqp.(*Connection).Channel(0xc420295a40, 0xa42820, 0xc42033b4c0, 0xc420295a40)
    github.com/streadway/amqp/connection.go:646 +0x2b

More info:

  • Go 1.8.3 / 1.9 beta2
  • Latest version of streadway/amqp
  • RabbitMQ 3.6.10

RabbitMQ 3.6.x will NOT close connections to a vhost that is deleted (3.7.0 will). So I'd expect something similar since the client isn't notified unless it attempts to perform an operation.
How exactly it would run into an error remains to be seen.

Please post your code or a small snippet that can be used to reproduce.
A traffic capture would help immensely.

This should be a non-issue with 3.7.0 since clients will observe a server-initiated connection closure.

The client is blocked in a select from <-ch.errors, which in this case will not get any notifications.

func (ch *Channel) call(req message, res ...message) error {
    if err := ch.send(req); err != nil {
        return err
    }

    if req.wait() {
        select {
        case e, ok := <-ch.errors:

Other clients (Java, .NET, Ruby) have a timeout for such RPC operations. @gerhard how do you think we should introduce a timeout here?

Could we add a third clause to the select?

I will submit code later.

I would go with:

func (ch *Channel) call(req message, res ...message) error {
    if err := ch.send(req); err != nil {
        return err
    }

    if req.wait() {
        select {
        case e, ok := <-ch.errors:
            if ok {
                return e
            }
            return ErrClosed

        // If there is no response within a sensible time period, consider the channel closed
        case <-time.After(time.Second * 30):
            return ErrClosed

@pierrre would you be interested in submitting a PR that uses @gerhard's idea above? I can suggest a sensible timeout: the default heartbeat timeout used by this library + 1-5 seconds. In other clients it varies from 30 to 60 seconds (and the default heartbeat interval is 60 seconds), but ultimately it's the heartbeat mechanism that should dictate it, so I think we will bump it in other clients eventually.

Agreed with @michaelklishin to:

  • set the timeout to the heartbeat interval
  • maybe introduce ErrChannelOperationTimeout or OperationTimeout. In Ruby & Java we use TimeoutException

Hello! I have almost the same problem, but it does not seem to be connected with vhost deletion.
This is my environment: RabbitMQ 3.6.10, Erlang 19.3, topic exchange.

I want to publish a message, and almost every time I call connection.Channel() the program hangs in github.com/streadway/amqp/channel.go:176 +0x44a
The bug does not reproduce reliably; please investigate. Maybe line 176 is not the place that needs to be fixed and the error is somewhere else.

By the way, with RabbitMQ 3.6.6 and Erlang 19.2 the driver works fine.

@shilkin asking others to investigate is not how open source software works. We've explained what this specific issue is about and how other clients address it. Virtual host deletion is only one specific scenario. There's even a snippet of roughly how the same can be done in this client. Feel free to submit a PR if this is a pressing issue for you.

This was discussed on our maintainer call today with @gerhard and @streadway. We agreed that

  • a temporary solution would be a timeout enforced as in @gerhard's snippet above
  • eventually context is the right way to address this

Whether there should be one configurable timeout for both heartbeats and channel operations or two timeouts (that are related to each other in a way) is TBD. FWIW several other clients use separate timeouts.

@michaelklishin Thank you for reply. I'll try to figure out what goes wrong.

I've got the same issue in production. Is there an estimate of when this issue will be closed?

@coyle when someone contributes a fix and we accept it. There is a snippet above that demonstrates roughly what should be done (or at least what several other clients do). Feel free to turn that into a pull request and test it with your workload.

Hi,
I created a PR for this (essentially adding a timeout for the channel RPC operations). However, I am now wondering if there could be other side effects if this is done in isolation.

If we exit the select mentioned above because of a timeout and the response arrives right after that point, the goroutine invoking channel:dispatch() will hang until a select is done on channel:rpc again (by a different invocation, which would then get the older response). So I think there's probably more that needs to be done here. I'll look into the code again. In the meantime, if you have any comments or suggestions on this, please do share.

@ponumapathy thank you for looking into this. Other clients basically discard late responses (not necessarily gracefully). I'm not necessarily experienced enough to judge what's the best way to do this in Go for a project not using context.

@streadway and I discussed this at today's maintainer hangout. The snippet in https://github.com/streadway/amqp/issues/278#issuecomment-315144664 looks good but needs an explicit timer disposal, otherwise there will be a subtle memory leak on the hot code path.

I will take a look into putting together a PR.

Sure. I was thinking of something along these lines (this is in my fork currently; I can create a PR if required).
(I'm having trouble with the formatting when I paste the diff here, so I formatted it to be as readable as possible.)

--- a/channel.go
+++ b/channel.go
@@ -9,7 +9,6 @@ import (
     "reflect"
     "sync"
     "sync/atomic"
-    "time"
 )

@@ -30,7 +29,6 @@ type Channel struct {
     m sync.Mutex // struct field mutex
     confirmM sync.Mutex // publisher confirms state mutex
     notifyM sync.RWMutex
-    rpcM sync.Mutex

@@ -170,11 +168,6 @@ func (ch *Channel) open() error {
 // Performs a request/response call for when the message is not NoWait and is
 // specified as Synchronous.
 func (ch *Channel) call(req message, res ...message) error {
-    if req.wait() {
-        ch.rpcM.Lock()
-        defer ch.rpcM.Unlock()
-    }
-
     if err := ch.send(req); err != nil {
         return err
     }
@@ -204,15 +197,6 @@ func (ch *Channel) call(req message, res ...message) error {
             // error on the Connection. This indicates we have already been
             // shutdown and if were waiting, will have returned from the errors chan.
             return ErrClosed
-        case <-time.After(ch.connection.Config.Heartbeat):
-            ch.transition((*Channel).recvMethod)
-            // drain any stale response that might have arrived before the state transition happened
-            select {
-            case <-ch.rpc:
-            case <-time.After(time.Millisecond):
-            }
-
-            return ErrChannelOpTimeout
         }
     }

Addressed by #304.

Thanks! 👍

Hi, I just wanted to mention some observations on our setup post this fix.

Since the default heartbeat value in this client is 10 seconds, I see that channel operations are more sensitive to intermittent network lag or errors, and to periods when the RabbitMQ server is busy.

Specifically, we see channel timeouts at times when we are creating a large number of channels (we are trying to optimize this by using a pool of reusable channels for publishing, instead of creating and closing a channel for every publish).

I just wanted to bring this up (though it's not something that happens frequently) since this behavior is new after this fix. I am wondering if we should use a separate, higher timeout value for this purpose.

The other side of the argument would be for the client to set an appropriate heartbeat value for the use case/environment (which is what we are going to do now, btw). If that's the way forward, the documentation should probably call that out explicitly.

Please do let me know your thoughts on this.

Hi,
I was reviewing my fix that got merged, in the context of the behavior I mentioned above, and I think the merged fix has the same issue as the originally discussed fix - https://github.com/streadway/amqp/issues/278#issuecomment-335657483.

In the fix that got merged, I wrongly assumed that the RPC response frames would be of type 'header' instead of 'method'. So, in the case of a timeout, a state transition was made to expect only a 'method' frame on that channel (which would have the effect of dropping header frames if they arrive later than that). However, the RPC response frames are also 'method' frames, so that code is essentially a no-op in that regard.

I am wondering if we should revert the merge, given that the originally discussed timer leak would be present and, with the default timeout value being 10s, this can potentially show up more often.

Sorry about this, btw. I guess it's good that I realized this now rather than later, though. :)

Hi, is there any progress on this problem? I'm stuck in the same situation.
