Hello guys, we are observing a weird behavior from message pullers:
We are using Azure Queues to transfer persistent asynchronous messages between grains and worker roles in the system.
Queues are heterogeneous, meaning that unrelated messages travel on the same (few) queues. Messages are dequeued in batches and delivered to different listener grains.
Not all the code triggered by messages is properly re-entrant. Some messages can be retried if they fail, but are guaranteed to fail (throw) if the message re-enters after it has succeeded in a previous attempt.
Sometimes, especially when the system is under load, we observe messages re-entering in an infinite loop. Queue sizes quickly grow to abnormal levels (2k and above).
This behavior won't stop until we manually delete the messages in the queue.
Observing the logs, we notice that messages of all kinds are re-entering, including messages that had already been processed successfully.
This behavior is not constant; we have managed to trigger it before, but cannot trigger it at will. We have a theory about the cause, although it is not confirmed yet. It goes like this:
1. The system is under load, meaning that many messages enter the queue simultaneously.
2. One of the messages fails (throws) for whatever reason.
3. Instead of repeating only the failed message, the puller repeats that message along with a number of other messages that were in the queue at about the same time as the failed one. This includes messages that have already been processed.
4. One of the messages is not re-entrant, meaning that from now on it will fail every time it is dequeued. This takes us back to the situation in item 2, starting a loop.
5. More messages join the queue, aggravating the situation and adding more poisonous messages to the queue in the long run.
We are aware that we need to change our code to manage re-entrance properly everywhere. However, even in that case, the infinite loop, as well as the repeated redelivery of successful messages, does not seem to be appropriate behavior.
Are we missing something? Configuration, design pattern, circuit-breaker?
Thanks
@luciobemquerer we experienced something like that in our system as well, but unfortunately we didn't manage to find out the exact conditions for when/why it happens. In the end, we decided to switch to another stream provider implementation.
Thanks for letting us know about the problems you're seeing.
Some questions/comments:
Messages are dequeued in batches and delivered to different listener grains
Does this mean multiple application messages are packed inside a single azure queue message?
Some messages can be retried if they fail, but are guaranteed to fail (throw) if the message re-enters after it has succeeded in a previous attempt.
If an OnNextAsync throws, the provider will attempt to redeliver the message for a configured period of time, 1 minute by default. This behavior is intended to allow more resilience in case of transient failures. If the application does not want the message redelivered, it should swallow the exception (not return it to Orleans streaming). Since the azure queue stream provider does not delete messages from the queue until a set has been delivered, this retry logic, depending on the azure queue visibility timeout, can lead to messages expiring (becoming visible again in the queue).
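For what it's worth, here is a minimal consumer sketch of the "swallow the exception" advice. `OrderEvent`, `ProcessEvent`, and `DuplicateProcessingException` are hypothetical application names; only the `IAsyncObserver<T>` members are Orleans APIs:

```csharp
using System;
using System.Threading.Tasks;
using Orleans.Streams;

// Hypothetical application types, shown only to illustrate the pattern.
public class OrderEvent { }
public class DuplicateProcessingException : Exception { }

public class OrderEventObserver : IAsyncObserver<OrderEvent>
{
    public async Task OnNextAsync(OrderEvent item, StreamSequenceToken token = null)
    {
        try
        {
            await ProcessEvent(item);
        }
        catch (DuplicateProcessingException)
        {
            // Already handled in an earlier attempt: swallow the exception so
            // Orleans streaming does NOT schedule a redelivery of this event.
        }
        // Any other exception propagates, which asks the provider to retry
        // delivery (for up to MaxEventDeliveryTime).
    }

    public Task OnCompletedAsync() => Task.CompletedTask;

    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;

    private Task ProcessEvent(OrderEvent item) => Task.CompletedTask; // placeholder
}
```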
Instead of repeating only the failed message, the puller will repeat that message along with a number of other messages that were in the queue at about the same time as the failed one.
In the pulling agent, messages are placed in a message cache for delivery and delivered to consumers from the cache. Once a message has been delivered to all of its consumers (or fails after retries), it can be purged from the cache. The cache purge, however, operates on blocks of messages, not single messages. A block of messages is purged after all of the messages in it have been delivered (or failed after retries). When a block of messages is purged from the cache, all of the messages in it that were successfully delivered are deleted from the queue. If (due to system load, failure retry of multiple events, slow processing, ...) a batch of messages is not purged in a timely manner (before the expiration time), then azure queue will redeliver the messages.
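As a mental model of that purge rule, here is a toy sketch (purely illustrative, not the actual Orleans code; the type names are made up):

```csharp
using System.Collections.Generic;
using System.Linq;

// A cached message in this toy model: Finished means delivered or failed
// after retries; Succeeded means delivered to all consumers.
public class CachedMessage
{
    public string QueueMessageId;
    public bool Finished;
    public bool Succeeded;
}

public static class PurgeModel
{
    // Blocks are purged strictly in arrival order, and a block becomes
    // eligible only once every message in it has finished. Returns the ids
    // of the messages that should now be deleted from the azure queue
    // (the successes only); failures are simply dropped from the cache, and
    // azure queue makes them visible again once the visibility timeout elapses.
    public static IEnumerable<string> Purge(Queue<List<CachedMessage>> blocks)
    {
        while (blocks.Count > 0 && blocks.Peek().All(m => m.Finished))
            foreach (var message in blocks.Dequeue())
                if (message.Succeeded)
                    yield return message.QueueMessageId;
    }
}
```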
For the azure queue stream provider to work well under load, you may need to be deliberate about some of the configuration settings:
"VisibilityTimeout", this sets the azure queue message expiration. If a message is in the cache longer than this time, azure queue will redeliver it.
"CacheSize", this is the max number of messages that will be cached for each queue.
"MaxEventDeliveryTime", this controls how long the retry logic will spend redelivering a failed message.
For instance, if a cache is configured to hold 1000 messages, and under load 10% of them may fail repeatedly, and the max delivery time is 1 minute, that's 100 minutes of retries, likely well over the visibility timeout, meaning most messages will be expired and azure queue will resend them, causing the type of death spiral you are seeing.
I suggest preventing the application from throwing exceptions in OnNextAsync unless it wants the messages redelivered. I understand this is not always possible, as detecting poison messages or errors due to duplicate processing is not always easy, but to the degree you can do this, it will cut down on retry attempts.
I also suggest considering, under load, how many messages you expect to process per queue, the processing time of those messages, and how many delivery failures you'll hit. The VisibilityTimeout needs to be greater than the time it takes to process the events in the cache, including the retry attempts.
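For illustration, a 1.x-era registration of the provider with these knobs made explicit might look roughly like this; the property keys mirror the setting names used in this thread, so verify the exact key spellings and defaults against your Orleans version:

```csharp
using System.Collections.Generic;
using Orleans.Providers.Streams.AzureQueue;
using Orleans.Runtime.Configuration;

public static class StreamConfig
{
    public static void ConfigureAzureQueueStreams(ClusterConfiguration config)
    {
        config.Globals.RegisterStreamProvider<AzureQueueStreamProvider>(
            "AzureQueueProvider",
            new Dictionary<string, string>
            {
                // Keys as named in this thread; check them against your version.
                { "DataConnectionString", "UseDevelopmentStorage=true" },
                { "VisibilityTimeout", "00:30:00" },     // > worst-case cache residency
                { "CacheSize", "800" },                  // ~ messages/sec per queue
                { "MaxEventDeliveryTime", "00:01:00" },  // retry window per failed message
                { "GetQueueMessagesTimerPeriod", "00:00:03" }
            });
    }
}
```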
@jason-bragg , I agree with all your comments about properly tuned configuration.
Still, unrelated to configuration, you wrote:
The cache purge, however, operates on blocks of messages, not single messages. A block of messages is purged after all of the messages in it have been delivered (or failed after retries).
If I remember the code correctly (and I might be wrong, so correct me please if I am), what the above means is:
The application could have published 10 independent msgs into the AQ (publishing on an AQ stream). These were not related in any way from the application's perspective and were enqueued as separate msgs. But in the pulling agent/cache implementation we put them together in buckets/blocks/batches (we even give them numbers, for some weird reason I never understood - AQ msgs are not ordered by definition). If any of the msgs in the batch/block fails to be processed while the rest succeeded, we will still not delete the successful msgs and will retry to redeliver the whole batch of msgs to the app, just because one of them was bad.
Am I correct?
This sounds very suboptimal to say the least, wrong actually if being fully honest. One bad apple spoils the barrel? Internal implementation details leaking into application semantics?
In particular, in their situation, they have one poison msg. It is OK to try to redeliver it X (maybe even infinite) times (it would be much better to give up after some time and jail it), but punishing other unrelated msgs, even once, does not sound right.
That reminded me of the pretty brutal punishment techniques in the Genghis Khan army:
https://en.wikipedia.org/wiki/Mongol_military_tactics_and_organization#Training_and_discipline
"If one soldier ran from danger in battle, he and his nine comrades from the same arban would face the death penalty together."
@gabikliot agree. This is what we are thinking about (Lucio is in Drawboard) and I'm trying to wrap my head around how we can work with such an implementation in a way that doesn't kill our entire app from time to time. And how much code we need to verify and re-check to ensure that it's safe for the given model (so far - a lot).
I'll get back here to answer some questions about current configuration and details over the weekend, but I think we are mostly running on defaults - haven't tuned much really.
Also correct me if I'm wrong - if we configure the batch size to be 1, so we "narrow our barrel to a single apple" - does this mean that only one batch is retrieved from each queue per GetQueueMessagesTimerPeriod? I.e. if our poll frequency is 3 seconds - the puller will try to read BatchSize messages from its queue only once per 3 seconds, right?
Upd: quickly checked the config - as I said, we are running on defaults (we have good faith that Orleans defaults are better than the ones we can come up with :) ), and only adjusted GetQueueMessagesTimerPeriod to be 3 seconds. And with the latest changes (after this issue) we are splitting our streams into reasonably smaller sets - we are using 2-4 queues per logical domain stream, but they will still carry messages of more than one type. Given that we are using a stream set per deployment - this creates around 30 queues per deployment in the test configuration. Cleaning them manually is not a pleasant task, so we probably won't go over more than 50 queues per deployment on prod.
I don't think you can configure this batch. It is something internal. And no, the batch size is unrelated to GetQueueMessagesTimerPeriod and how fast we read. The msgs will be retrieved back to back, as long as the queue is non-empty. When it becomes empty, we wait GetQueueMessagesTimerPeriod until the next attempt.
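In pseudocode terms, that read behavior is roughly the following (an illustrative model only, not the actual agent code; `IMessageQueue` and `IMessageCache` are hypothetical stand-ins):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the azure queue and the pulling agent's cache.
public interface IMessageQueue
{
    Task<IReadOnlyList<object>> GetMessagesAsync(int maxCount);
}

public interface IMessageCache
{
    void Add(IReadOnlyList<object> messages);
}

public static class ReadLoopModel
{
    public static async Task ReadLoop(IMessageQueue queue, IMessageCache cache,
                                      TimeSpan getQueueMessagesTimerPeriod)
    {
        while (true)
        {
            var messages = await queue.GetMessagesAsync(maxCount: 32);
            if (messages.Count > 0)
                cache.Add(messages);  // queue non-empty: keep reading back to back
            else
                await Task.Delay(getQueueMessagesTimerPeriod); // empty: wait out the timer period
        }
    }
}
```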
The batch I was talking about is an internal data structure in the cache, part of how msgs are stored.
Let's first validate with @jason-bragg whether what I wrote is indeed correct.
If yes, then I think the path should be: you don't have to figure out how to work with this model. The model should be fixed, as I consider this a bug. Until it is fixed, we may be able to suggest a temporary workaround.
But let's not jump the gun; let's figure out if this is indeed what is going on.
In my comment I referred to a 'block' of messages. I'll use terms from the code instead in case anyone wants to dig in further. For the SimpleQueueCache, which is what the azure queue stream provider uses, we keep all messages read from the queue in CacheBuckets (buckets). These buckets are what I was referring to as 'blocks' of messages. These buckets contain 10 messages. These messages can be for different streams and can be delivered independently. If any delivery failure occurs for any of the messages, only that message will be retried, so the bucketing of messages does not, in general, affect delivery. However, all messages in a bucket are kept in the cache until the entire bucket's messages are delivered (or fail delivery after retries). Once all the messages are delivered, the entire bucket will be purged from the cache and all the messages in it that were successfully delivered will be deleted from the azure queue. Buckets are also only purged in order. This means that retry attempts of messages in previous buckets can delay the deletion of messages that have already been delivered.
As an example, consider a cache of 400 messages, a MaxEventDeliveryTime of 1 second, and a VisibilityTimeout of 30 seconds.
This would mean that there are 40 buckets of messages in the cache.
Now imagine that we're so unfortunate as to get one poison message on the same stream per bucket. That message would take 1 second to fail delivery with retry, per bucket, so the oldest bucket in the cache would be in the cache no less than 40 seconds.
This is over the VisibilityTimeout, so all of the messages in that bucket would be reintroduced by azure queue, even if the other 90% of the messages were delivered in a timely manner with no issues.
If this continued indefinitely, all messages would be repeated indefinitely.
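Checking that arithmetic (using the numbers assumed in the example above):

```csharp
using System;

public static class BucketMath
{
    public static void Main()
    {
        const int cacheSize = 400;
        const int bucketSize = 10;
        int buckets = cacheSize / bucketSize;                 // 40 buckets
        var retryPerPoisonMessage = TimeSpan.FromSeconds(1);  // MaxEventDeliveryTime
        var visibilityTimeout = TimeSpan.FromSeconds(30);

        // One poison message per bucket, retried in order: the oldest bucket
        // cannot purge for at least buckets * retry-window seconds.
        var oldestBucketResidency =
            TimeSpan.FromTicks(retryPerPoisonMessage.Ticks * buckets); // 40 s

        // True: residency exceeds the visibility timeout, so azure queue
        // re-surfaces the whole bucket and the spiral begins.
        Console.WriteLine(oldestBucketResidency > visibilityTimeout);
    }
}
```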
There is a batching behavior, wherein the failure to process a single event in a batch will trigger the entire batch to be redelivered, but azure queue messages are not 'batched', so that logic is probably not related. I suspect @gabikliot may have confused the cache message 'bucketing' with the message batching that occurs in some stream providers, which is easy to do, but they are independent.
This sounds very suboptimal to say the least.
It 'should' not be required for service engineers to fine-tune the azure queue stream provider for their environment in the way I am suggesting above. This is minimally a shortcoming in the feature; more accurately, a bug. However, my first goal is to get @centur's service running, even if it takes some custom configuration and/or hacks, but a consideration of how to fix this such that service engineers don't encounter this in the future is also needed.
Side note: GetQueueMessagesTimerPeriod, shouldn't really alter this behavior much.
how we can work with such an implementation in a way that doesn't kill our entire app from time to time.
While it's a brute-force solution, setting a very large VisibilityTimeout (5-30 minutes?) should mitigate this, assuming that does not have other negative side effects. Azure queue supports a timeout of up to 2 hours, so there is a fair amount of room here.
Thank you @jason-bragg.
I suspect @gabikliot may have confused the cache message 'bucketing' with the message batching that occurs in some stream providers, which is easy to do, but they are independent.
No, I don't think I confused it. I specifically referred to the bucketing in the cache, and described exactly the behavior that you have now confirmed. There is indeed much more batching and all kinds of bucketing going on in other places and layers, but I was referring to this one.
more accurately, a bug.
Agree.
Also agree on the value of a temporary mitigation while a proper full solution is devised.
but a consideration of how to fix this such that service engineers don't encounter this in the future is also needed.
Do you need my help trying to figure out how to solve that "shortcoming/bug" in the simplest way? I can dig in and put some thought into it, if you indeed need/want my help.
@gabikliot I'd very much appreciate your assistance on this, as I don't currently have the time to invest in a 'good' solution. I could come up with a quick fix, but I'd far prefer we do it right, if time allows.
Thanks!
Before diving into fixing this specific issue, I'm wondering what you think about using the same pulling agent logic for ordered recoverable streams like EH and unordered queues like azure storage? Many of the issues in the azure queue stream provider were not originally there. They were introduced over time while adding support for the more complex logic needed for ordered recoverable streams. I've been wondering if unordered queues like azure queue can use a simpler, more streamlined pulling agent. Thoughts?
I see your point about different agents. I would say that at this point it's totally up to you. If you feel it will make it easier to maintain this way, go ahead. I had a pretty strict preference to avoid significant code duplication. But this does sometimes introduce unnecessary abstractions and complexity. So in retrospect, maybe it was a wrong decision.
In particular, when we started we envisioned multiple stream providers, but in fact we have only 2, AQ and EH (or do we have more?). How would it look if there were more than 2? Would there be ordered and un-ordered PersistentStreamProvider base classes, with ordered and un-ordered agents?
I will try to poke around and come up with ideas.
Thanks for the insightful conversation here. Correct me if I'm wrong:
1. The described behaviour is a bug and will be addressed in the future version of persistent streams pullers.
2. The main mitigation we can apply now is to increase the VisibilityInterval from defaults to some high values (10mins ? 30 mins ?).
3. Is there any valid reason to also fine tune cache size and other mentioned parameters?
Also I have another question: based on the description of how the message cache works - this means that if we have a poison message - it will never ever be deleted from the azure queue, because failing delivery to the listener after MaxEventDeliveryTime means the message will not be considered successful, and only successful messages are deleted from the queue when the cache bucket is purged. Is this correct?
Is it possible to have access to internal information (the azure queue message has a dequeue counter on it, IIRC) in the context of a message (as delivered to a recipient grain), so the grain can decide whether to handle the input or give up immediately because the dequeue counter exceeds some reasonable limit?
I recall that I'd checked the Orleans code one day but couldn't find any obvious way to pass that information down as part of the strongly typed object. Maybe there is a less explicit way to pass it down?
Or maybe there is counter-based retry logic that would allow us to sack such poisonous messages eventually, say after 5 azure dequeues?
Thank you guys for all the information above. The discussion is spot on about the problem we are observing, and definitely helpful for us to move on. I was travelling for the last couple of days, so I'm catching up now.
--
Answering the question from @jason-bragg:
Does this mean multiple application messages are packed inside a single azure queue message?
No, each azure queue message contains a single application message. When I mentioned batches, I was referring to Azure Queues dequeuing multiple messages at once. After reading this thread, it is quite clear that the situation is related to the pulling agent cache, so I reckon that sentence can be ignored.
--
I would like your help to clarify a few points:
I gather the current understanding is that successful messages are being repeated because the VisibilityTimeout of the Azure Queue is shorter than the time it takes for the Agent to call back the queue to report that the message succeeded or failed. Please correct me if I'm wrong, but I reckon this would not prevent successful messages from being deleted from the Azure Queue at some point, before the Agent moves to the next "batch", right? So the expected behaviour would be for a successful message to repeat at most X times, where X is the number of Agents working in parallel? I ask because I observed some successful and re-entrant messages repeat thousands of times, as if they were never cleared from the Azure Queue until we did it manually.
The suggestion to increase the VisibilityTimeout aims to allow more time for the Agent to process/retry the whole Cache before messages reappear on Azure Queue, thus preventing other agents from pulling the messages again. I wonder if we can achieve the same effect by reducing the CacheSize, and what would be the reason to prefer one solution over the other?
Suppose we have a message that takes too long to process inside the grain, and it reaches the timeout after 30 seconds. How would this time affect the Cache purging routine? Would this 30 second timespan count towards the VisibilityTimeout of current Buckets?
Suppose we have a poison message, that is, it will throw every time it is delivered. Will this message ever be cleared from the Queue? Is there any sort of circuit-break?
@centur
The described behaviour is a bug and will be addressed in the future version of persistent streams pullers
Yes. I'll add an issue.
The main mitigation we can apply now is to increase the VisibilityInterval from defaults to some high values (10mins ? 30 mins ?)
Correct. Though this assumes our theory on the problem is correct. If this mitigation does not have the expected effect, further investigation will be needed.
Is there any valid reason to also fine tune cache size and other mentioned parameters?
Only as needed for your scenario. I attempted to provide enough context for you and your team to make those decisions. If you need more details, please feel free to ask.
This means that if we have a poison message - it will never ever be deleted from the azure queue.
Correct.
I recall that I'd checked the Orleans code one day but couldn't find any obvious way to pass that information down as part of the strongly typed object.
In 1.4 we added the ability for users to have direct access to the queue data. This was added to allow services to provide their own serializers and control the data format (in the queue) themselves, but I suspect you can hook into this logic to get the dequeue counter. Please take a look at IAzureQueueDataAdapter and its implementations. I suspect it will allow you to solve the poison event problem.
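A rough sketch of hooking in through the data adapter. The interface shape below follows my reading of the 1.4-era `IAzureQueueDataAdapter`, so treat the exact signatures as assumptions to verify; `IDequeueCountAware` and the wrapping approach are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage.Queue;
using Orleans.Providers.Streams.AzureQueue;
using Orleans.Streams;

// Hypothetical opt-in marker for application events that want to see the
// azure queue dequeue counter.
public interface IDequeueCountAware
{
    int DequeueCount { set; }
}

public class DequeueCountDataAdapter : IAzureQueueDataAdapter
{
    private readonly IAzureQueueDataAdapter inner;

    public DequeueCountDataAdapter(IAzureQueueDataAdapter inner)
    {
        this.inner = inner; // wrap whichever adapter you use today
    }

    public CloudQueueMessage ToCloudQueueMessage<T>(
        Guid streamGuid, string streamNamespace,
        IEnumerable<T> events, Dictionary<string, object> requestContext)
    {
        return inner.ToCloudQueueMessage(streamGuid, streamNamespace, events, requestContext);
    }

    public IBatchContainer FromCloudQueueMessage(CloudQueueMessage cloudMsg, long sequenceId)
    {
        var batch = inner.FromCloudQueueMessage(cloudMsg, sequenceId);
        // Stamp the azure queue dequeue counter onto events that opt in, so a
        // grain can give up once the counter exceeds some reasonable limit.
        foreach (var tuple in batch.GetEvents<IDequeueCountAware>())
            tuple.Item1.DequeueCount = cloudMsg.DequeueCount;
        return batch;
    }
}
```

A grain receiving such an event could then drop it (without throwing) once `DequeueCount` passes its limit, instead of triggering another retry cycle.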
We should consider adding a configurable value for max dequeue count in the azure queue stream provider.
@luciobemquerer
So the expected behaviour would be for a successful message to repeat at most X times, where X is the number of Agents working in parallel?
Each agent reads from a single azure queue, so there should not be any 'parallel' agents. I understand azure queue supports multiple readers, but that is not the pattern we use. Each queue has a single reader.
I wonder if we can achieve the same effect by reducing the CacheSize
That can work as well. I attempted to describe the three settings that affect this so you and your team could tune your service in a way that works well with your load and usage patterns. I'm far less comfortable telling you how to mitigate this than providing the details you need to make informed decisions. At the end of the day, you and your team will always know more about what is right for your service than I do.
... by reducing the CacheSize, and what would be the reason to prefer one solution over the other?
A good starting point would be a cache size equal to the number of messages you expect to process per second per queue. So a service processing 8000 messages per second with 10 queues would be 800.
A smaller cache will affect the number of messages you can process at any point in time.
If your events are not really associated with a stream and can be processed in parallel, then a cache size less than the expected load you can process can starve your service.
For instance: if one expects an event to take 2ms of work to process (including grain call time), with 1 queue per silo and 8 cores (per silo) available for event processing, a cache size of less than 4000 would risk starving the service. This is not entirely true; 400 is probably more accurate, as the 4000 figure assumes the cache is refilled only once per second, but I'm avoiding getting into a read-frequency discussion.
The above is silly math, but it illustrates the basic idea. If you have load tests, I'd suggest trying a smaller cache and seeing how it affects your service, with math like this used just to provide initial order-of-magnitude expectations.
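The same silly math in code form (all numbers are the illustrative ones above):

```csharp
using System;

public static class CacheSizing
{
    public static void Main()
    {
        double perEventMs = 2.0;   // processing time per event, incl. grain call
        int coresPerSilo = 8;      // cores available for event processing
        int queuesPerSilo = 1;

        // Sustainable throughput per silo, and (with one queue) per queue:
        double eventsPerSecond = coresPerSilo * (1000.0 / perEventMs);

        // If the cache were refilled only once per second, a cache smaller
        // than this would leave the consumers idle between refills.
        Console.WriteLine(eventsPerSecond / queuesPerSilo); // 4000
    }
}
```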
Suppose we have a message that takes too long to process inside the grain, and it reaches the timeout after 30 seconds. How would this time affect the Cache purging routine? Would this 30 second timespan count towards the VisibilityTimeout of current Buckets?
Yes. VisibilityTimeout must be less than the difference between the time the message was dequeued (and put into the cache for delivery) and the time it was released from the cache after event delivery (or failure). The processing time (including the 30 second timeout of grain calls) is included in this time. Since blocks are only purged in order, the processing time of any previous block can affect the purge time of any block after it.
poison message
See my previous post to @centur. The azure queue stream provider does not detect or handle poison messages.
Thank you very much, @jason-bragg. I'll set up some load tests and update you guys if I have any new information.
@gabikliot
I had a pretty strict preference to avoid significant code duplication.
I advocated for the current approach as well, believing that if we built the stream infrastructure for the stricter requirements of ordered recoverable data, it would not be hard to relax those requirements, but that has not turned out to be as easy as I'd anticipated.
when we started we envisioned multiple stream providers, but in fact we have only 2, AQ and EH.
For the most part yes. Strictly speaking we also have one for SQS and a couple of test stream providers, GeneratorStreamProvider and MemoryStreamProvider, but the primary queues we maintain are AQ and EH.
Would there be ordered and un-ordered PersistentStreamProvider base classes, with ordered and un-ordered agents?
Unclear, but this is how I was thinking of categorizing the behaviors. The biggest difference seems to be our treatment of unordered queues, which do not support recovery, versus ordered queues, which support limited recoverability (via checkpoints and message caching).
Yes. VisibilityTimeout must be less than the difference between the time the message was dequeued (and put into the cache for delivery) and the time it was released from the cache after event delivery (or failure). The processing time (including the 30 second timeout of grain calls) is included in this time. Since blocks are only purged in order, the processing time of any previous block can affect the purge time of any block after it.
Umm, now I'm a bit confused about what should be less than or greater than what.
From what I read, there are 2 timespans that affect the processing:
VisibilityTimeout - the time for which a message stays invisible in the Azure Queue after it has been dequeued.
MaxEventDeliveryTime - the period that defines for how long some Orleans class (I'm avoiding "pulling agent" as I suspect that the PA actually just fetches the messages on the queue-to-cache path, and cache-to-grain is handled by another class) will try to deliver a message from the cache to the subscribed grains. This time includes all the bits in an example scenario: deliver to subscriberFoo and subscrBar; subscriberFoo completes; subscrBar times out; retry subscrBar. If subscrBar manages to process before MaxEventDeliveryTime ends - mark this message as processed in the cache; if not - mark it as failed.
By the end of the cache->grain delivery window, the infrastructure purges the cache (bucket). So if the message in the cache is marked as processed - it tells the Azure Queue to kill the still-invisible message (assuming VisibilityTimeout > MaxEventDeliveryTime) and we are fine here. If the message failed - the infrastructure does nothing with the AQ reference, and the message reappears automatically after VisibilityTimeout and will be retried (infinitely, since there is no poison msg detection).
This sounds like a logical way to go, with the least surprises.
In the quote above you said that VisibilityTimeout < MaxEventDeliveryTime, which means that the message reappears in the Azure Queue automatically before the Orleans infrastructure exhausts the time dedicated to deliver, process, and sweep the message on the cache->grain route. Am I reading this correctly?
If yes - this sounds wrong to me - a re-surfaced message in AQ will be in 2 cache buckets - the old one and the new one - and the old bucket's AQ msg reference will be broken, although cache->grain delivery is not yet completed and still MAY succeed. So when the old bucket is eventually purged - the msg may be marked as processed, but the reference to it is broken (because the message already reappeared in AQ) - so the attempt to delete it (because it was eventually delivered successfully) will fail with an Azure Queue exception. And I think I've seen these a few times before in our logs (but never managed to find out why they happened, as they are very random and not very repeatable)...
So if the above description is correct - the safest way is to have VisibilityTimeout > MaxEventDeliveryTime, not the other way around. Thoughts ?
I misspoke. Brain failed.
VisibilityTimeout must be _greater_ than the difference between the time the message was dequeued (and put into the cache for delivery) and the time it was released from the cache after event delivery (or failure).
I was trying to figure out how to make it clear, and restructured the sentence without switching the 'lesser' to 'greater'... then didn't catch it in proofreading. Sorry for the confusion.
As far as VisibilityTimeout > MaxEventDeliveryTime, yes, this is the minimal safe value, but please keep in mind that blocks are purged in order from the oldest block in the cache, so the time in the cache may be as much as the time to deliver every message before it in the cache, plus its own delivery time.
If one needs a small visibility timeout, the cache size will need to be tightened, possibly at a cost in read rate. If a high read rate is more important than the visibility timeout, a larger cache with a long VisibilityTimeout would probably work better. This is very general, and stream topology is also important, so vetting these in load tests for a specific service is advisable.
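One way to sanity-check a candidate configuration against this guidance (a hypothetical helper; the residency model is the rough one from this thread, not an exact formula):

```csharp
using System;

public static class StreamSettingsCheck
{
    // Rough worst-case cache residency: every message ahead in the cache takes
    // its typical delivery time, and failureRate of them also burn the full
    // retry window. Blocks purge in order, so the last message can wait on all
    // of that.
    public static bool LooksSafe(
        TimeSpan visibilityTimeout,
        TimeSpan maxEventDeliveryTime,
        int cacheSize,
        TimeSpan typicalDeliveryTime,
        double failureRate)
    {
        long worstCaseTicks = (long)(cacheSize *
            (typicalDeliveryTime.Ticks + failureRate * maxEventDeliveryTime.Ticks));
        return visibilityTimeout > TimeSpan.FromTicks(worstCaseTicks);
    }
}
```

With the numbers from the earlier example (a cache of 1000 messages, 10% failures, and a 1 minute MaxEventDeliveryTime), this returns false for any visibility timeout under roughly 100 minutes.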