Describe the bug
We've encountered an issue in which acknowledged messages are redelivered one or more times after other messages are negatively acknowledged. This seems to occur when messages are produced in batches. This happens in the absence of any known broker or connection failures.
To Reproduce
I've modified the NegativeAcksTest to test for the correct behavior here: https://github.com/apache/pulsar/compare/master...gmethvin:negative-ack-duplicates
As the test demonstrates, in some configurations positively acknowledged messages are redelivered. This is similar to a situation we see in production.
Expected behavior
Only the negatively acknowledged messages should be redelivered. Positively acknowledged messages should not be redelivered, at least not in a typical situation with no failures.
We produce messages in batches, but both the APIs and the documentation suggest that acks and negative acks act at the per-message level. If negative acks act on batches, then the APIs and documentation should be changed to clearly indicate that.
I think the fundamental problem behind #5891 and #5969 is that the cursor tracks acknowledgments at the batch level, not at the message level. Hence failures can result in the whole batch being redelivered. To address the fundamental problem, we need to improve cursor tracking to work at the message level on the broker side.
Looping in @jiazhai and @codelipenghui to the discussion.
@sijie @gmethvin we could track cursors at the batch-index level, i.e. on the granularity of LedgerId:EntryId:BatchIndex. However, there are performance implications if we want to redeliver only the messages that are NOT ACKed or NACKed. Doing so would require the broker to deserialize BK entries, filter the messages in each entry, re-serialize them, and send the result to the client, which then has to deserialize it again. This would add latency and about 2x the CPU effort for serialization and deserialization. We have generally avoided entry inspection in the broker for this reason.
The question I have is: if a user really wants to get only the messages that are NOT ACKed or NACKed, can the user just turn off batching? Or, if the user wants fewer duplicates from redelivered messages/batches, can the user just tune the batch size to be smaller? Are these good enough workflows or knobs for users to satisfy these use cases?
Or is this also just a documentation/education issue? The docs are NOT very clear on what the expected behavior of ACKs or NACKs is when batching is enabled.
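To make the LedgerId:EntryId:BatchIndex granularity concrete, here is a minimal sketch of a cursor that records acknowledgments per batch index and can report which indices of an entry still need redelivery. It is a hypothetical illustration (the class and method names are invented here), not Pulsar's ManagedCursor; it also deliberately leaves out the deserialize/filter/re-serialize step whose cost is the concern above.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: acknowledgment state tracked at
// (ledgerId, entryId, batchIndex) granularity instead of per entry.
// Not Pulsar's actual cursor implementation.
final class BatchIndexCursor {

    // Identifies one BookKeeper entry, which may contain a whole batch.
    record EntryPosition(long ledgerId, long entryId) {}

    // Per-entry state: batch size and which batch indices have been acked.
    private static final class BatchState {
        final int batchSize;
        final BitSet acked;

        BatchState(int batchSize) {
            this.batchSize = batchSize;
            this.acked = new BitSet(batchSize);
        }
    }

    private final Map<EntryPosition, BatchState> states = new HashMap<>();

    /** Acks one message; returns true once every index in the entry is acked. */
    boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IllegalArgumentException("batchIndex out of range: " + batchIndex);
        }
        BatchState state = states.computeIfAbsent(
                new EntryPosition(ledgerId, entryId), p -> new BatchState(batchSize));
        state.acked.set(batchIndex);
        // When this is true the entry could be handled like a normal entry-level ack.
        return state.acked.cardinality() == state.batchSize;
    }

    /** Batch indices of the entry that would still need (re)delivery. */
    BitSet unackedIndices(long ledgerId, long entryId, int batchSize) {
        BitSet unacked = new BitSet(batchSize);
        unacked.set(0, batchSize); // start with every index pending
        BatchState state = states.get(new EntryPosition(ledgerId, entryId));
        if (state != null) {
            unacked.andNot(state.acked); // drop indices that were acked
        }
        return unacked;
    }
}
```

Tracking the state is cheap; the expensive part in this design is using it on the broker, because the broker would have to open the entry to drop the acked indices before sending it.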
@merlimat can you also chime in?
@jerrypeng there are many ways to avoid serialization and deserialization. The broker can deliver a bit of metadata along with the batches to tell the client which messages are acked. Clients can then filter out the already delivered messages during deserialization on the client side.
The ability to control cursor tracking at the message level is a requirement for transactions anyway, so we'd resolve the problem in a single place.
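A minimal sketch of the client side of this idea, assuming the broker forwards the entry unchanged together with a bitmap of acknowledged batch indices. The class name, method name, and the use of `java.util.BitSet` as the bitmap format are assumptions for illustration; in a real client the skipping would happen while unpacking the batch rather than on an already-decoded list.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch: the broker sends the batch as-is plus a bitmap of
// already-acked indices; the client skips those indices when unpacking.
final class AckAwareBatchDecoder {

    private AckAwareBatchDecoder() {}

    /** Returns the messages whose batch index is NOT set in ackedIndices. */
    static <T> List<T> deliverable(List<T> batch, BitSet ackedIndices) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (!ackedIndices.get(i)) {
                out.add(batch.get(i)); // not acked yet, so hand it to the application
            }
        }
        return out;
    }
}
```

For a batch `[m0, m1, m2, m3]` with indices 0 and 2 marked as acked, only `m1` and `m3` reach the application. The broker never inspects the entry; the cost moves to the extra bitmap and the full batch on the wire, which is the network trade-off raised in this thread.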
@sijie isn't that design just trading cpu usage for network usage? If you don't filter on the broker side and resend the whole batch to be filtered on the client side, there will be network implications especially if batches are big.
@jerrypeng yes, it is a trade-off here: whose resources should be used for this task. However, I think the core problem here isn't a resource problem so far. It is a semantic issue. Redelivering batches is mostly okay, but people don't know which messages have already been acknowledged when a batch is redelivered. The semantic issue is what most people care about first.
I agree @sijie. The main issue is that the API semantics don't match what users expect. If the negativeAcknowledge API acts on a single message, then it is very surprising behavior for all the messages in the batch to be redelivered. If, on the other hand, the negativeAcknowledge API applies to a batch, then the current behavior would be more understandable. I would argue, though, that single-message semantics are much more useful for the user.
@jerrypeng @sijie @gmethvin I agree that redelivering batches is OK in most cases. Redelivery of unacked/nacked messages is the exception, not the rule, but when it occurs it should behave in a way that makes sense to the user of the API.
It's not that hard for the client to filter out the already acked messages from the batch. It is already keeping track of this so it knows when it can ack the batch back to broker. It's just a matter of using this information to filter received messages.
That approach doesn't cover all failure cases (e.g. if the client restarts), but in the case where the application is NACKing a message, it would give reasonable behavior. Plus, it would reduce the number of duplicate messages that reach the application when using batch messages.
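A minimal sketch of the purely client-side approach, assuming the client keys its per-batch ack state by ledgerId:entryId. The class and method names are hypothetical, not the Pulsar client's internals. The same bitmap that decides when the whole batch can be acked back to the broker is reused to drop already-acked messages from a redelivered copy of that batch. Because the state is held in memory, it is lost on a client restart, which is the failure case noted above.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: reuse the client's per-batch ack tracking to filter
// redelivered batches. In-memory only, so it does not survive a restart.
final class ClientBatchTracker {

    private final Map<String, BitSet> ackedByBatch = new HashMap<>();

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    /** Records an application ack; returns true when the whole batch can be acked to the broker. */
    boolean onAck(long ledgerId, long entryId, int batchIndex, int batchSize) {
        BitSet acked = ackedByBatch.computeIfAbsent(key(ledgerId, entryId), k -> new BitSet(batchSize));
        acked.set(batchIndex);
        return acked.cardinality() == batchSize;
    }

    /** On redelivery, returns only the messages the application has not acked yet. */
    <T> List<T> onRedelivery(long ledgerId, long entryId, List<T> batch) {
        BitSet acked = ackedByBatch.getOrDefault(key(ledgerId, entryId), new BitSet());
        List<T> out = new ArrayList<>();
        // Walk the clear (un-acked) bits only, stopping at the end of the batch.
        for (int i = acked.nextClearBit(0); i < batch.size(); i = acked.nextClearBit(i + 1)) {
            out.add(batch.get(i));
        }
        return out;
    }
}
```

If the application acks index 0 of a three-message batch and NACKs index 1, the redelivered batch is reduced to the messages at indices 1 and 2, so the already-acked message is not handed to the application a second time.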
@zzzming and I are happy to work on a PR for this if everyone agrees this is the right approach.
Oh, since this is also related to the transaction implementation, Penghui and I are already writing a PIP for it. We are going to share the PIP soon.