Amqp: Can find no way to determine delivery tag on a publish

Created on 17 Jul 2017 · 9 Comments · Source: streadway/amqp

I need to publish multiple messages concurrently. I can't wait for each publish to be confirmed before allowing any other publish to occur. But the Publish method doesn't return any information about what the delivery tag will be, so I can't set up a listener for just that tag while other publishes continue and get their own tags.

It seems like I have to duplicate the delivery tag calculation myself, and hope that I locked correctly and that my tag matches the one the library is using.

Am I missing something?

Thanks, Rich K.

All 9 comments

There is no response for basic.publish in the protocol. Publisher confirms arrive asynchronously and, in this client, are handled via a Go channel.

Publisher confirms have a separate test suite, too.

Lastly, other clients (Java, .NET, Bunny, …) expose the counter that is incremented after publishing on a channel that has confirms enabled. The only thing that complicates it, as the docs mention, is when multiple delivery tags are confirmed at once. That's probably why this client presents a "confirms API" and not just a synchronised counter.

I'm using channel.NotifyPublish(), channel.Confirm().

The problem is when I do channel.Publish() I don't know what the delivery tag will be. So I can't tell when that published message is confirmed in the NotifyPublish channel.

The examples only show doing Publish sequentially, i.e. Publish, wait for confirm, then go to next Publish.

My code can't do that. It needs to Publish and move on, without waiting for confirmation before any other Publish can occur. It will then handle a message that couldn't be published later, from the notify channel.

The client cannot know what the next delivery tag will be unless it keeps track of them.
Delivery tags grow monotonically, starting from 1, and acks for multiple publishes are the only complication to that.
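
To make the "multiple" complication concrete: in AMQP 0-9-1, a basic.ack or basic.nack with the multiple flag set covers every outstanding delivery tag up to and including the one it carries. Below is a minimal sketch of that bookkeeping. The `outstanding` type and its `settle` method are hypothetical names used for illustration; they are not part of streadway/amqp.

```go
package main

import (
	"fmt"
	"sort"
)

// outstanding holds delivery tags that were published but not yet confirmed.
// Hypothetical helper for illustration, not a streadway/amqp type.
type outstanding map[uint64]struct{}

// settle removes the tags covered by one confirmation frame and returns them
// in ascending order. With multiple set, every outstanding tag up to and
// including tag is covered; otherwise only tag itself.
func (o outstanding) settle(tag uint64, multiple bool) []uint64 {
	var done []uint64
	for t := range o {
		if t == tag || (multiple && t < tag) {
			done = append(done, t)
		}
	}
	for _, t := range done {
		delete(o, t)
	}
	sort.Slice(done, func(i, j int) bool { return done[i] < done[j] })
	return done
}

func main() {
	o := outstanding{}
	// Tags start at 1 and grow by one per publish on a confirming channel.
	for tag := uint64(1); tag <= 5; tag++ {
		o[tag] = struct{}{}
	}
	fmt.Println(o.settle(2, false)) // single ack for tag 2
	fmt.Println(o.settle(4, true))  // multiple ack covers tags 1, 3 and 4
	fmt.Println(len(o))             // tag 5 is still outstanding
}
```

As noted in this thread, the client already does this kind of resequencing internally, so code that reads from the notification channel should not need to repeat it.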

This client has an abstraction that handles acks for multiple publishes and resequencing for you: the confirms type. It is used internally; your code simply subscribes for notifications over a channel.

While examples and tests for this client often use an acknowledgement channel of size 1, that does not have to be the case. You can initialise a counter, publish a batch of size N, increment the counter by N, then wait for an ack or nack with delivery tag N to arrive or for the channel/connection to be lost.
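
The batch approach described above can be sketched roughly as follows. To keep the example runnable without a broker, `confirmation` is a local stand-in that mirrors the DeliveryTag and Ack fields of amqp.Confirmation, and a loop plays the part of the broker. `waitForBatch` is a hypothetical helper. In real code the channel would be the one registered with NotifyPublish, with its buffer sized to the batch.

```go
package main

import "fmt"

// confirmation mirrors the DeliveryTag and Ack fields of amqp.Confirmation
// so that this sketch runs without a broker.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// waitForBatch reads confirmations until the one tagged last arrives.
// It returns the nacked tags, and false if the channel was closed first
// (standing in for a lost channel or connection).
func waitForBatch(confirms <-chan confirmation, last uint64) ([]uint64, bool) {
	var nacked []uint64
	for c := range confirms {
		if !c.Ack {
			nacked = append(nacked, c.DeliveryTag)
		}
		if c.DeliveryTag == last {
			return nacked, true
		}
	}
	return nacked, false
}

func main() {
	const n = 4
	confirms := make(chan confirmation, n) // buffer sized to the batch
	var counter uint64                     // tags used so far on this channel

	// Stand-in for publishing n messages; the fake broker nacks the third.
	for i := 1; i <= n; i++ {
		confirms <- confirmation{DeliveryTag: counter + uint64(i), Ack: i != 3}
	}
	counter += n

	nacked, ok := waitForBatch(confirms, counter)
	fmt.Println(nacked, ok) // prints "[3] true"
}
```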

This client could abstract this further. I'm not in a position to say how likely it is that higher-level abstractions would be accepted, but if you have specific ideas, feel free to submit a PR to demonstrate them :)

That is basically what I'm doing. I'm reproducing the delivery tag calculation (which streadway/amqp already does internally; it just doesn't return the result). So I have to take a lock around the publish, calculate the new delivery tag for that publish, and then release the lock. Then other publishes can occur. I then have one routine which processes the confirms as they come back in and notifies the correct process waiting on each individual delivery tag.
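
For anyone who needs the same workaround, here is a minimal sketch of that pattern. Everything in it is hypothetical and not part of streadway/amqp: `tracker`, `publish` and `dispatch` are illustrative names, `confirmation` is a local stand-in for amqp.Confirmation, and the `send` callback stands in for the real Publish call. It also assumes that a failed publish does not consume a delivery tag, which is worth checking against the library source.

```go
package main

import (
	"fmt"
	"sync"
)

// confirmation is a local stand-in for the DeliveryTag and Ack fields of
// amqp.Confirmation, so the sketch runs without a broker.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// tracker is a hypothetical helper (not part of streadway/amqp) that mirrors
// the library's delivery tag counter and routes confirmations to waiters.
type tracker struct {
	mu      sync.Mutex
	next    uint64 // tag the next successful publish will receive
	waiters map[uint64]chan bool
}

func newTracker() *tracker {
	return &tracker{next: 1, waiters: make(map[uint64]chan bool)}
}

// publish calls send while holding the lock, so tags are assigned in the same
// order the messages are sent. Assumption: a failed send consumes no tag.
func (t *tracker) publish(send func() error) (uint64, <-chan bool, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if err := send(); err != nil {
		return 0, nil, err
	}
	tag := t.next
	t.next++
	done := make(chan bool, 1) // buffered so dispatch never blocks on a waiter
	t.waiters[tag] = done
	return tag, done, nil
}

// dispatch is the single routine that reads confirmations and notifies the
// waiter registered for each delivery tag.
func (t *tracker) dispatch(confirms <-chan confirmation) {
	for c := range confirms {
		t.mu.Lock()
		done := t.waiters[c.DeliveryTag]
		delete(t.waiters, c.DeliveryTag)
		t.mu.Unlock()
		if done != nil {
			done <- c.Ack
		}
	}
}

func main() {
	t := newTracker()
	confirms := make(chan confirmation, 3)
	go t.dispatch(confirms)

	var brokerTag uint64 // fake broker state, only touched under t.mu via send
	send := func() error {
		brokerTag++
		confirms <- confirmation{DeliveryTag: brokerTag, Ack: brokerTag != 2}
		return nil
	}

	var wg sync.WaitGroup
	var acked [4]bool // indexed by delivery tag; each goroutine writes one slot
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tag, done, err := t.publish(send)
			if err != nil {
				return
			}
			acked[tag] = <-done // each publisher waits only on its own tag
		}()
	}
	wg.Wait()
	fmt.Println(acked[1:]) // prints "[true false true]"
}
```

Because the waiter is registered before the lock is released, the dispatcher cannot look up a tag before its waiter exists, even if the confirmation arrives immediately.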

Unfortunately, this package does not use versioning or branches, so some API changes are not an option, but perhaps some of the confirms fields can be made public. Feel free to submit a PR and we'll see what the maintainers decide.

I also face the same issue. Is there any chance that we can get this small feature in the future? https://github.com/streadway/amqp/issues/476

While examples and tests for this client often use an acknowledgement channel of size 1, that does not have to be the case. You can initialise a counter, publish a batch of size N, increment the counter by N, then wait for an ack or nack with delivery tag N to arrive or for the channel/connection to be lost.

The concern is what happens if the application restarts: it will lose the counter.
