Orleans: Talk about the stream (Feedback on the stream and QueueCacheMissException)

Created on 27 Sep 2015  路  54Comments  路  Source: dotnet/orleans

Continuing the theme https://github.com/dotnet/orleans/issues/844 at the request of @gabikliot, I give feedback.
For starters, I would like to say I could be wrong in my statements because I may not know some of the nuances in the script and the principles of implementation or maybe I do not fully understand the work scenario.

Scenario example: lets says you subscribed at offset X, then produced event with sequence X, X was delivered to consumer, now it maybe be deleted from the cache, now you subscribe again to X from a different consumer, then you will get cache miss exception, rightfully, for the 2nd subscription.

Overall I completely agree with this scenario, but there are a number of issues.
During testing I noticed that if you receive an error QueueCacheMissException I lose data flow (queue), based on the described scenario it is logical, because there is a false consumer that does not process messages, and all messages go nowhere
But is it right? How can I recover this data? Why not leave in the stream and in the queue until the consumer who will be able to handle them? As the message can be considered processed if it has not been processed?

About the subscription:
Of course one the consumer needs to sign only once.
But I faced a problem, what if multiple consumers subscribed to the stream, or rather how to understand what the current consumer I don't have a subscription again?
While I always work with only one consumer because to me it is important to follow FIFO
So I want to show my subscription option with one consumer within the solution of the problem which I described https://github.com/dotnet/orleans/issues/844
Would welcome comments on the code and the answer to the question how to track the subscription of current consumer if you know what subscriptionHandles.Count > 1 ?

Consumer

``` C#
public async override Task OnActivateAsync()
{
var streamProvider = GetStreamProvider(PROVIDER_NAME);
_stream = streamProvider.GetStream(Guid.Parse(GUID_STREAM), this.GetPrimaryKeyString());
var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
subscriptionHandles.ForEach(async x => await x.ResumeAsync(OnNextAsync));
}

    public async Task SubscribeAsync()
    {
        var subscriptionHandles = await _stream.GetAllSubscriptionHandles();
        if (subscriptionHandles.IsNullOrEmpty())
            await _stream.SubscribeAsync(OnNextAsync);
    }

Producer

``` C#
        private async Task<IAsyncStream<string>> GetStream(string consumerName)
        {
            var streamConsumerGrain = GrainFactory.GetGrain<IConsumerGrain>(consumerName);
            await streamConsumerGrain.SubscribeAsync();
            var streamProvider = GetStreamProvider(PROVIDER_NAME);
            return streamProvider.GetStream<string>(Guid.Parse(GUID_STREAM), consumerName);
        }
P2 question

All 54 comments

you are missing await in subscriptionHandles.ForEach(x => x.ResumeAsync(OnNextAsync));

of course, I forgot to copy, corrected

"if you receive an error QueueCacheMissException I lose data flow"
This was a bug which was also addressed in #849, though I admit we should have more testing around this to verify that. The intended behavior is as follows:

1) Consumer subscribes to stream at sequence token t.
2) If t is in cache, start delivering events from t.
3) If t is newer than items in cache, no data is sent until t is added to the cache, then deliver from t.
4) If t is older than the items in the cache, send the consumer will get a cache miss exception on the OnErrorAsync call, then continue delivering from the most recent data in the cache.

This means if the consumer does nothing with the OnError notification of the cache miss, the consumer may loose data, but the consumer has the opportunity to decide what to do: Unsubscribe? Re-subscribe at the next interesting token? Just continue processing?

Do I understand correctly that I can't subscribe again? I definitely need to re-subscribe (ResumeAsync)?
If Yes, then how can I among all subscribers (.GetAllSubscriptionHandles()) select (current) to re-subscribe it (ResumeAsync)?

"Do I understand correctly that I can't subscribe again?"
Subscribing again will create a new subscription. This is probably not the behavior you want.

"I definitely need to re-subscribe (ResumeAsync)?"
It's not clear to me if you mean resume after grain deactivation, or after a cache miss exception. I'll assume you mean after cache miss.

To be clear, you "shouldn't" need to re-subscribe (resume) just because a cache miss occurs. You "can" but should not have to. If you don't re-subscribe on the error, you will miss all of the events from the point in the stream the consumer was at, to the most recent data. If you do re-subscribe, you may avoid losing some data, but you'll need to know where to resume from, which can be non-trivial.

It sound like you may also be struggling with matching an onError call to the subscription handle that you would use for any unsubscribe call. For this I would suggest writing your own implementation of IAsyncObserver that contains subscription details and subscription state. The SubscribeAsync extension functions you are using are utilities to handle simple cases. They are not likely to be sufficient to handle more complex scenarios, such as multiple subscriptions. They 'can' be used for this if you really want to avoid implementing your own observers though by doing something like this.

subscriptionHandles.ForEach(async x => await x.ResumeAsync(OnNextAsync, ex => OnErrorAsync(x,ex)));

private Task OnErrorAsync(StreamSubscriptionHandle handle, Exception ex)
{
...
}

This provides the error handler the subscription handle the error came from.

"It's not clear to me if you mean resume after grain deactivation, or after a cache miss exception. I'll assume you mean after cache miss."
I meant both options

As I touched on in my previous post. If there is some sort of state associated with a subscription (aggregates, routes, ..?) you'll need to persist this state associated with the subscription to re-associate it when rehydrating the grain and resuming the subscription. Serializable implementations of IAsyncObservable would be the method I'd suggest, but there are other options.

When storing stream state, I'd avoid serializing the IAsyncStream or the subscription handle, even though they are both serializable and intended to be used that way. The reason I discourage this is that these objects are still in flux and we don't have backwards compatibility built in yet. Instead, I'd suggest storing the stream guid and namespace, then matching those with the related data in the handle to associate the correct handle during grain activation.

When it comes to passing a sequence token to a resume, at grain activation or on cache miss, I would also avoid this if you can. The issue here is that there are two types of PersistenStreamProviders.

  • The first is non-rewindable. These use a persistent queues which have no ordering or reliability guarantees. An example of this is AzureQueueStreamProvider, which you seem to be modeling off of. For these stream providers, the tokens are volatile and only carry meaning during delivery. That is, if you restart the silo and try to use a previously delivered token to subscribe to a particular place in the stream, the operation will likely fail or not act as you'd intended, because that token may not be valid any more. These are mainly best effort stream providers that use a persistent queue to help reduce data loss. When subscribing or resuming to streams from this type of stream provider, providing null tokens is the correct action most of the time.
  • The second type of Persistent stream providers are rewindable. These use queues that support ordering and reliability guarantees, like kafka, or event hub. With these kinds of stream providers, one can persist the sequence token with the grain state, and use it to resume the subscription, because these queues provide the ability to redeliver data (to a degree) from reliable locations that can be encoded into the tokens. Unfortunately, we do not yet have stream providers of this type in Orleans. All implementations of these types of stream providers have been customer specific.

From the above description of persistent stream providers, what you've been building more resembles a non-rewindable stream provider. These stream providers allow for producers to place data in the queue and for processers to process it, but do not have a strong recoverability story. Applications using this type of stream provider tend to focus on fast best effort stream processing, identifying when data loss occurs to successfully processing the data that is not lost.

Some things that would be helpful for us to understand when advising:
How important is the data you're processing? Mission critical, or is some data loss acceptable?
How important is it that these data be processed in real time? On failures can backend, recovery batch process be used for eventual consistency or is the data of no value if it's not served in real time?

data is important, loss is not acceptable (better way one pack will be processed again than get lost completely)
I have a tree structure from multiple streams. the first three levels I important processing in real time, all subsequent I admit or a long data processing or deferred (but to keep all data for me in the first place).
there is one exception:
throughout the tree there is one branch which needs to be served in real time, but data integrity after the third level I priority (I can assure you that data processing it will be fast)
can draw a diagram if this scenario is not the end understandable

Why not initially considered an option of saving the key grain of the consumer as subscription handle?

"Instead, I'd suggest storing the stream guid and namespace, then matching those with the related data in the handle to associate the correct handle during grain activation."
Do I understand correctly that you propose to do cosumer state and to store basic information about the stream to which it relates, including subscription handle?

"Do I understand correctly that you propose to do cosumer state and to store basic information about the stream to which it relates, including subscription handle?"
Yes, but I don't advise actually persisting the subscription handle. Instead, persist the guid and namespace, and get the actual handle from GetAllSubscriptionHandles. This is because serializing the handle will likely cause issues if/when we change the subscription handle. The persistent stream provider (and its subscription handles) is still very much in flux.

I got confused a little. Let's consider the following scenario:
I have a stream and two grain of consumer who have subscribed (C1 and C2, with keys guid1 and guid2, respectively).
therefore GetAllSubscriptionHandles().Count == 2 and we know what belongs HandleId1 C1 and C2 HandleId2
now I restart silo
C1 is activated, but it is not subscribed on the stream, but I know that GetAllSubscriptionHandles().Count == 2, but I don't know what HandleId belongs (belonged) to C1, so I don't know to what Handle I need to call ResumeAsync in the current consumer C1

Thanks for the scenario.
I was under the impression you were talking about multiple consumers in a single grain, so I think clarifying the scenario will help.

I'll restate what I think you're saying:
-two stream consumer grains C1, C2.
-one stream S1
-C1 is subscribed to S1 using handle H1.
-C2 is subscribed to S1 using handle H2.
-restart
-How does C1 know which handle to resume?
-How does C2 know which handle to resume?
Is the above correct?

"therefore GetAllSubscriptionHandles().Count == 2"
This should not be the case. GetAllSubscriptionHandles called on an IAsyncStream from a grain should only return the subscriptions that grain has on that stream.
So when C1 calls S1.GetAllSubscriptionHandles() it should only find H1.
When C2 calls S1.GetAllSubscriptionHandles() it should only find H2.

To verify this you can log the handles during the subscribe/GetAllSubscriptionHandles/resume calls. The handle's ToString has been overridden to aid in this kind of debugging.

If there are multiple handles, the this means the grain must have, at some point, subscribed multiple times. In this case, you can resume any of the previous handles, and should unsubscribe from the rest.

If this is not the behavior you are seeing, then please let me know.

Yes that's right, you understood correctly my scenario, and all correctly explained and described, I am convinced of this in practice
I immediately raises a number of "provocative questions" :)
1)is it correct that the same consumer can subscribe to a stream multiple times and appropriately handle several times the same message? (if I remember correctly, in this scenario, the error occurred the second subscription (before the fix))
2)is it correct that two different consumer can process one and the same "message"?
2.1)how to implement scenario when the first consumer will get the first message from the stream, and the second will take next?
3)how the consumer may inquire about other consumer the same steam?
4) can a single consumer to be subscribed to 2 different stream? (came to mind when I wrote these questions :))

I'm going to assume, as before, when you say consumer you are referring to a consumer grain.

1) Yes. However, this will come at the cost of the stream infrastructure actually delivering the event to each subscription the consumer creates. That is, each subscription is independent, so the stream infrastructure will send each event on a stream to every subscription. Since grains are usually not reentrant, this can lead to timeouts if there are large numbers of subscriptions or long event processing time. Rather than doing this, it would be more efficient to have one subscription per consumer and perform all the related processing on the events from the single subscription.

2) Each event (message) is delivered per subscription, not per consumer.
For instance.
Consumers C1, C2.
Stream S1.
Consumer C1 subscribes to S1 twice, H11, and H12.
Consumer C2 subscribes to S1 once on H2.
An event E is produced on S1.
E is sent to C1 on H11
E is sent to C1 on H12
E is sent to C2 on H2.

2.1) No native support for this. One possible solution is to write a consumer grain that 'dispatches' work to other grains in a round-robin fashion.

3) No native support for this. If consumers need to know about each other, you'll need to create some sort of consumer registry.
The pubsub system knows about subscribers, but it does not expose this data natively, nor does the stream infrastructure allow one to provide one's own pubsub system. If this is an extensibility point you need for this capability, it is probably significant enough for it's own issue.

4) Yes. A consumer grain can subscribe to multiple streams, the same stream multiple times, and even the same handler (onNext) can be used with multiple subscriptions. In general it's better to think of stream delivery in terms of subscriptions rather than consumer grains. We often mix this terminology because a single subscription per grain is the most common (and tested) usage pattern.

Now all is clear, many thanks for your detailed answer!
By the way I it is difficult to imagine a real situation in which I need to implement scenario 1, maybe I'm also not sure about the support scenario 2.
Bad that is not supported scenario 2.1, I understand you can send in a small stream, and not just in other grains.
The creation of a large number of small streams will not hurt the system? and whether it is expedient to increase the number of queues in the provider? and in what ratio?

"Bad that is not supported scenario 2.1"
Why is this scenario important? Performance?

"The creation of a large number of small streams will not hurt the system?"
Large and small are relative. Can you provide approximate numbers? Also, streams tend to come and go, so how many streams do you expect to be active at any point in time?
For instance

  • 600000 concurrent streams
  • 10 queues
  • each stream lasting 1 minute
  • The above means there are about 1000 new streams per second.
  • With a grain per stream this means 1000 grain activations per second.

The above scenario is not alarming, but knowing one's expected stream topology is very helpful in capacity planning.
There is setup and teardown cost involved in each stream, so fewer, larger, more active streams should be more efficient, but in general lots of small streams isn't necessarily a bad thing.

"number of queues"
The queue count will mostly be related to the capacity of your underlying queueing tech. For instance if you're queue SLA supports 10MB/second per queue then you'll need to figure out how many messages you expect to send, how big your messages are and from that determine how many queues you'll need (then double it! :) )

"Why is this scenario important? Performance?"
I think it's more popular and frequently used scenario in practice. Besides, it is more difficult to implement (in comparison with the situation if the scenario 2 is not supported and I would need to implement it).
Once again, I can't even imagine the real situation in which you are using the scenario 1. If I correctly understand, and the example does not support scenario one 1, the need for GetAllSubscriptionHandles() will disappear, as there will be always only one subscriber, but they may appear sense to use GetAllSubscriptionHandles() as getting all the grain consumer (subscribers), not just the current consumer grain.

If there is a common scenario we're not familiar with I'd like to understand it. If you don't mind exploring this more, I'd like to get a deeper understanding of 2.1.

Does the below scenario cover the behavior you'd want.

  • Stream S1
  • Consumer grains C1, C2, C3 all subscribe to S1.
  • 5 events are produced onto S1: e1, e2, e3, e4, e5.
  • e1 goes to C1
  • e2 goes to C2
  • e3 goes to C3
  • e4 goes to C1
  • e5 goes to C2
  • Another consumer grain C4 subscribes to S1.
  • 5 events are produced onto S1: e6, e7, e8, e9, e10.
  • e6 goes to C3
  • e7 goes to C4
  • e8 goes to C1
  • e9 goes to C2
  • e10 goes to C3

Is this the type of event processing behavior you'd prefer?
If so, this seems to indicate that there is no relation between events. For instance, e6 and e7 are completely independent. Is this the case?

the more I think about real use scenarios 1 and 2, the more I have questions (though I have little interest), for example:
For instance.
Consumers C1, C2.
Stream S1.
Consumer C1 subscribes to S1 twice, H11, and H12.
C2 Consumer subscribes to S1 once on H2.
An event E1 is produced on S1.
An event E2 is produced on S1.
E1 is sent to C1 on H11
E1 is sent to C1 on H12
E1 is sent to C2 on H2.

Time1:
Processed H11 E1
H12 E1 is not processed
H2 E1 is not processed

邪1) H11 can move to E2 ? (I will assume no, because it's one grain consumer, the concept is correct)
b1)E1 flagged for deletion is removed from the queue? (I will assume Yes, then restarting silo I lose the data for H2 and H12, but maybe not, then look further)

Time2:
Processed H11 E1
Processed H12 E1
H2 E1 is not processed
邪2) H11 can move to E2 ? (I will assume Yes, otherwise I just don't understand the meaning of expected H2)
b2)E1 flagged for deletion is removed from the queue? (I will assume Yes, then restarting silo will I lose my data for H2, if not, then I have to rerun H11 and H12)

Maybe all of what I have said above is nonsense, and I really need to sleep :)

"If there is a common scenario we're not familiar with I'd like to understand it. If you don't mind exploring this more, I'd like to get a deeper understanding of 2.1."
Yes, you correctly describe the behavior

To use the analogy:
It is a container of water (a stream), from which pump out the water N the number of pumps (consumer grain). And for example rain is a producer in a certain period of time can fill the capacity, but maybe not :)

"H11 can move to E2 ?"
Yes. Each subscription is independent. Once E1 is delivered on H11, the stream system will attempt to deliver E2.
Since E1 is being delivered to C1 via H11 and H12, assuming C1 is not a reentrant grain, the delivery of E2 will most likely be delayed as the grain is processing E1 on H12, though this behavior is indeterminate.

"E1 flagged for deletion is removed from the queue?"
E1 will not be flagged for deletion until E1 is purged from the message cache. In general, this means that this behavior is determined by the cache implementation. Specifically, in the simple message cache implementation you are using, this means that E1 will not be purged until it is delivered to all of it's consumers (H11, H12, and H2).

"To use the analogy"
Good analogy, but in our inception of streams context matters. The streams we tent to process are a related set of data that occur over time that we want derive conclusions from.
For instance, John's position stream:
e1 - John is at position p1 a time t1
e2 - John is at position p2 a time t2
...
eN - John is at position pN a time tN

From this stream of data, we can map where john when, how fast he got there, his max acceleration, his max velocity, how close we was to running into Jim at the mall (assuming we have Jim's stream)...
In this type of stream of information, order and context matters. Processing each individual piece of data tells us very little. It's the patterns that emerge from the data that are of interest.

The use case you are interested in is interesting and we should support it, but it's not the type of scenarios we initially targeted, so there may be some tinkering needed.. :/

if I understood you correctly, you consider one stream as a single object and process state (parameters/properties) of one object? Multiple objects, multiple streams? Created a new object, created a new stream?

about 2 scenarios:
why in the situation when two grain consumer processes one the same message you're using to stream, but not for example ObserverSubscriptionManager<IGrainObserver>?

"if I understood you correctly, you consider one stream as a single object and process state"
Not exactly. What a 'stream' is has not been clearly defined . I'm not defining a stream as much as describing the stream processing patterns we initially targeted. There are many stream processing patterns and over time we would like to support a wide range, but we had to start somewhere. Unfortunately the patterns we initially targeted don't fit your needs.

There are (at least) a couple options:
1) You could ignore the streaming infrastructure and develop a queue reader role that uses the actor model to scale event processing.
2) You could use the streaming infrastructure as it is, attempting to work around it's shortcomings.
3) You could work with us and the community to update the streaming infrastructure to support the event processing pattern you're interested in.

Obviously, I favor 3, as it would help us improve our streaming tech. Others in the community have asked about the pattern you're using, so I've reason to believe you may be able to find community support for this effort.

"for example ObserverSubscriptionManager "
One of the requirements of the streaming infrastructure is that consumers be independent. That is, if a single consumer is encountering issues (slow storage calls, processing errors, bugs, ...) it does not effect other consumers.

In the observer pattern, a single message is fanned out to all observers, then the next message is fanned out, and so on... If a single observer takes 30 seconds to process the message, all observers (even those processing the message immediately) must wait for the slow observer to finish before the next message is sent. Similarly, if a single receiver times-out or throws an error, the sender must deal with that before sending the next message, even though the other observers are ready for the next message.

The stream consumers to not have these restrictions. Each consumer can consume data as fast or slow as it wants, without regard to other consumers.

If I choose option 3, what should I do?

In general terms, we're talking about a different event delivery pattern. In the persistent stream provider, this behavior is the responsibility of the PersistentStreamPullingAgent. This agent reads events from the adapter and delivers them to consumers. This is the behavior you need changed. I suggest familiarizing yourself with this code then proposing a set of changes to introduce the behaviors you need (a rough design). With a concrete proposal, we and the community can offer design and coding support.

At the high level, I think the proposed change would be along one of the following lines of thought:
1) If changes are mainly confined to the pulling agent, we could componentize the agent, add an additional agent that supports the behavior you prefer, then allow the provider or adapter to determine which pulling agent is to be used.
2) If the changes are more invasive, significantly influencing other streaming infrastructure (stream pub/sub, stream grain extensions, the adapter api, queue balancer), then you may need to introduce and entirely new stream provider that reuses as much of the existing infrastructure as is practicable, while adding new behaviors where needed.

My hope (and suspicion) is that the necessary changes can be confined to the agent behavior (option 1).

OK, this weekend will see what I can do

Continuing the theme, question...
Noticed a trend that MessagesDeliveredAsync method is invoked before the complete method OnNextAsync (consumer), is this correct?

I'm retracting this post, as I misunderstood the question and don't want the response to confuse future readers.
I was under the impression @Merroy was asking about the call order in the agent logic that makes it look like MessagesDeliveredAsync is called before event delivery, which is not what he was asking about.
I'm leaving the response below for historical reasons. It is correct in context of the agent behavior, but does not address @Merroy's question.

============================ retracted =======================================
That's correct. We try to purge the cache prior to adding more data into it (see usage of AddToCache later in the function). Once we purge the events from the cache we tell the receiver they have been delivered. This assumes that the cache will not purge the messages until they have been delivered.

This loop continues until the agent is shutdown, so if messages are delivered and can be purged from the cache, they will be purged the next loop.
============================ retracted =======================================

How can I provide the save (fault tolerance) data which are in the process? state consumer grain?

I apologize, but I don't understand the question. Can you elaborate?
What do you want to save? Stream state? Events?
What kind of faults are you trying to recover from? Event processing? Timeouts? Event delivery? Queue Errors?

Jason, I think you misunderstood the above question. That is of course NOT correct, we first deliver to consumers and only after delete from queue. Otherwise we can break the at least once delivery guarantee of azure queue stream provider.

Of course, bug is possible.
Or maybe what merroy observes is for different messages.

@gabikliot It is quite possible I misunderstood the question.

"MessagesDeliveredAsync method is invoked before the complete method OnNextAsync (consumer), is this correct"

@Merroy, If you add logging in the MessagesDeliveredAsync and OnNextAsync, do you see events MessagesDeliveredAsync being called on an event before the OnNextAsync is resolved for that event? If it is, this is a bug, as Gabriel has stated.

Translating as usual to the scenarios:
There is a method

C# private async Task OnNextAsync(T message, StreamSequenceToken streamSequenceToken) { await DoSomething1(message); await DoSomething2(message); }

1)In queue (stream) 1 message
2)褋onsumer call OnNextAsync
3)receiver's call MessagesDeliveredAsync
4)In queue (stream) 0 messages
5)consumer call await DoSomething1(from OnNextAsync)
6)stream/silo falling with error
7)I lose "message" forever

I'm not sure I understand.
In scenario, message was enqueued into stream 1 (1), processed (2), then removed from the queue (3). All good.
Then messages were enqueued into stream 0 (4), but never removed from the receiver (hence, not deleted from azure queue) so where did the data loss come from?

In general, If the OnNextAsync throws an exception, the agent should redeliver the event. This does not necessarily prevent data loss.

The stream delivery error handling works as such:
1) If the OnNextAsync call or any other event delivery error (timeout for instance) occurs while delivering an event to a consumer, Orleans retries delivering the event until a (configurable) max retry period (default of 1 minute) has been exceeded.
2) Once this limit has passed, the grains OnErrorAsync will be called reporting that the event could not be delivered.
3) The adapter's stream failure handler (IStreamFailureHandler) OnDeliveryFailure call will be made.
4) If the subscription is an explicit subscription and the IStreamFailureHandler,ShouldFaultSubsriptionOnError is true, the subscription will be put into a faulted state and no more events will be delivered on that subscription. For azure queue, this could lead to data loss.
5) If the subscription is an implicit subscription or the IStreamFailureHandler,ShouldFaultSubsriptionOnError is false, the failed event is skipped and Orleans attempts to deliver the next event in the stream. For azure queue, this could lead to data loss.
If the OnNextAsync throws and exception, the agent should redeliver the event. This does not necessarily prevent data loss.

Note that in both of the data loss error cases, both the grain and the adapters failure handler are notified.

In case of silo crashes, any events that were in the queue cache (being processed, or pending processing) should be reread from the queue receiver when the silo comes back up.

Does the behavior you see, differ from what I've described here?

For exception handling, all is well, no issues.
I'm more interested in this point:

In scenario, message was enqueued into stream 1 (1), processed (2), then removed from the queue (3). All good.
Then messages were enqueued into stream 0 (4), but never removed from the receiver (hence, not deleted from azure queue) so where did the data loss come from?

It seems to me or there is contradiction in your words?

then removed from the queue (3). All good.
but never removed from the receiver (hence, not deleted from azure queue)

I do not understand a bit, MessagesDeliveredAsync is called after the completion of the method OnNextAsync or during the beginning of his work (if a review option which is in the queue only one message)?
In other words, the message is deleted from the queue after processing or at the time "start processing" (when the message was delivered cosumer grain)?

"the message is deleted from the queue after processing or at the time "start processing" (when the message was delivered cosumer grain"
The intended behavior is that the event is deleted from the queue -after- it is processed. All consumers OnNextAsync calls should be completed on the event, -before- it is deleted from the queue.

If this is not the behavior you are seeing, please let us know, as that would indicate a bug.

So I guess this is a bug
I can see the situation when the event is removed before completing call OnNextAsync

Can you create and issue for this. I'll look into it.

Ok, this relates to the same problem?
There is an interesting situation.
I have a movement structure of the message
P1->Stream1->C1->P2->Stream2->C2
I is sent to P1, for example, 100 messages
Has the following behavior:
the segment P1->Stream1->C1->P2->Stream2 all messages pass so quickly, for example 1 second.
cut Stream2->C2, is handled by "long" on each message, for example, 5 seconds. In the end, all messages are processed 500 seconds

Detail:
1)after 2 seconds after starting I know Stream1 contains 0 messages and Stream2 contains 100 messages (one of which is in process)
2)after 501 seconds I know that Stream1 and Stream2 empty

Summary:
501 second later, I get a call MessagesDeliveredAsync, in which the removal of 200 messages

Question:
1)why didn't the deletion occurred when Stream1 became empty?
2)why did not occur periodical deletion in the process Stream2 (why so long)?

It would be very helpful if you created a test scenario that reproduces this issue and submitted it as a pr. Thanks.

@Merroy Cache purges events in order and in bulk. We don't purge the cache on an item by item basis. Inside the cache are 'buckets' of events, and a bucket is not purged until all of the events in the bucket have been delivered. There are multiple buckets in the cache and newer buckets are not purged until older ones are purged.

For notation purposes let's call s1eN an event in stream1 and s2eN an event in stream two.
The events in a bucket may look like this s1e1,s1e2,...,s1eN,s2e1,s1eN+1,...
If a bucket looked like this, none of the events in the bucket would be purged until all were processed. Since s2e1 takes 500 seconds to process. All of the events in the bucket would be held until S2e1 was done processing. Even if the next bucket contained only events from stream1 and none from stream2, it would still be held in the cache until s2e1 was purged because buckets are purged in order.

it turns out that, for example, restarting the silo, at the outlet of C2 I get duplicate messages (a total of 200 messages, 100 original replacements) or if I restart silo again and I get 400 messages?

"restarting the silo ... I get duplicate messages"
This is expected behavior, and will vary by queue cache size.

how can this be minimized? to reduce the cache size? if Yes, what is the probability to catch QueueCacheMissException?

from @gabikliot "It would be very helpful if you created a test scenario"
I suggest testing this as an isolated test of the SimpleQueueCache as that would be simpler than testing this from the grain level. My strong suspicion is that this is a SimpleQueueCache bug.

@Merroy "probability to catch QueueCacheMissException?"
Using the SimpleQueueCache, cache size should not increase QueueCacheMissException likelihood. Depending on the number of streams you have per partition, it is likely to hinder throughput.

in my opinion it is also necessary to support the scenario (without duplicates), even if this would reduce bandwidth

as an idea
Why those messages which have already been processed mark it as processed, and keep the cache in persistent stream. After "restart silo" restores the cache and not send to reprocess marked?

@Merroy Ensuring the streams never deliver duplicates is non-trivial.
Solutions that have been used to deal with duplicates tend to follow one of the following patterns:

  • Idempotent data processing.
  • Encode ordering info into the data that can be used to detect duplicates at the processing layer.

@Merroy @jason-bragg I used to own the Amazon SQS service, and spent a lot of time looking into guaranteeing exactly-once delivery. The short answer is that it requires a unicorn (more precisely, a perfect distributed consensus algorithm that works in the face of arbitrary Byzantine failures). This is particularly true in a system like Orleans where a component on a failed node will be automatically recreated.

Idempotency is generally the best approach, but it does require that all instances of the receiver agree on whether or not a specific message has been processed. Even if there's never more than one receiver active at a time, this is hard because it requires persisting a unique message ID as part of processing the message -- and doing it transactionally along with any other outputs of the message processing.

All you can really do is reduce the likelihood of duplicate delivery. Since you can't be perfect, this then becomes a question of trade-offs: generally, the more work the platform does to minimize duplicates, the higher the performance impact. If your application just can't handle duplicates, then you need to code for duplicate delivery regardless of what the platform is doing for you, so it's a question of perf cost in the platform versus perf cost in the application. If your application can handle a low likelihood of duplicates, then it's good if the platform can meet your threshold because then the application code doesn't have to deal with it at all.

One nice thing about the Orleans model is that you could build different providers that have different likelihoods of duplicate delivery, with different costs, and let the application developer pick the appropriate one for each message stream.

@alan-geller +1
So when I said it was 'non-trivial', yeah, umm. I was understating the point :)

"One nice thing about the Orleans model is that you could build different providers that have different likelihoods of duplicate delivery."
This is very true, though developing a stream provider from scratch can be daunting. The IQueueAdapterFactory pattern used by the PersistentStreamProvider was intended to help mitigate this. Unfortunately, the event delivery logic is not yet customizable through the adapter pattern.

"Since you can't be perfect, this then becomes a question of trade-offs"
Regarding the limiting of duplicates, one approach would be to make your custom version of the azure queue receiver not delete the message in the MessagesDeliveredAsync, but instead generated IAsyncDisposable events that deleted themselves from the queue upon being disposed. Once the OnNextAsync completed processing the event, it could dispose of it. This would allow the application control of the queue cleanup, rather than relying on the streaming infrastructure. This would be best implemented with consumer grains that were locally placed (located on the same silo as the queue they came from).

The last suggestion means you are breaking location transparency, a big no no in Orleans world (this is a comment only on Jason last suggestion about async disposable and local grains, not about Alan's point).

At the high level, I think the proposed change would be along one of the following lines of thought:
1) If changes are mainly confined to the pulling agent, we could componentize the agent, add an additional agent that supports the behavior you prefer, then allow the provider or adapter to determine which pulling agent is to be used.
2) If the changes are more invasive, significantly influencing other streaming infrastructure (stream pub/sub, stream grain extensions, the adapter api, queue balancer), then you may need to introduce and entirely new stream provider that reuses as much of the existing infrastructure as is practicable, while adding new behaviors where needed.

My hope (and suspicion) is that the necessary changes can be confined to the agent behavior (option 1).

@jason-bragg you are right, option 1
I studied the code, and if I'm not mistaken,the change needs to be made in PersistentStreamPullingAgent (major changes in RunConsumerCursor) and/or add a new agent.

PS in the course of examining the code, I had the idea to extend the functionality, namely to add a subscription to the consumer on OnNextBatchAsync
One can consider to integrate in the subscription of the consumer to ObserverSubscriptionManager (), i.e. to realize the quick transfer of the responsibility for the message stream (the consumer), for example in state grain, without waiting
although the existing functionality already allows you to implement this script, if I'm not mistaken

"to be made in PersistentStreamPullingAgent (major changes in RunConsumerCursor) and/or add a new agent."
My previous (brief) investigation into this left me with the impression that PersistentStreamPullingAgent performs three different behaviors.
A) Queue processor - It is managed by the PersistentStreamPullingManager and IStreamQueueBalancer to determine what queue it processes, and starts/stops processing of that queue.
B) Stream Producer - It inherits from IStreamProducerExtension and interacts with the stream pub/sub system (IStreamPubSub) to publish streams.
C) Event dispatcher - it reads from the queue and dispatches events to consumers (IStreamProducerExtension).

The behavior you seem to want to modify is C.
If you want to keep the same queue management infrastructure but swap out behaviors, you'll likely also need to address A.
If you're addressing A, and C, B will almost automatically be isolated as well.

What are your thoughts on the following:
1) Generalize the PersistentStreamPullingManager into some generic QueueProcessingManager that manages queue processors which conform to some minimal queue processing interface (IQueueProcessor?). The PersistentStreamPullingAgent as a system target and its stop/start and other management abilities would need to be moved into a shell that called into an IQueueProcessor to perform any actual queue processing logic. This would introduce a general queue processing capability to Orleans that supported queue balancing and queue processing running within a system target.
2) Move the stream logic for PersistentStreamPullingAgent into a PersistentStreamQueueProcessor that fits into the shiny new Orleans queue processing framework.
3) Add your logic to queue processor.

  • a) If you're in a hurry, make some sort of BroadcastStreamQueueProcessor that is just like PersistentStreamQueueProcessor but has the delivery mechanism you need.
  • or b) Further fracture internals of PersistentStreamQueueProcessor to include a IStreamPublisher and a IStreamEventDispatcher, that separates the interactions with the pub/sub from event dispatching, then add some sort of BroadcastStreamEventDispatcher that has the delivery mechanism you need.

Note that 3.b from previous post not only allows you to add the delivery behavior you want, but allows anyone using Orleans streams to introduce their own custom stream delivery logic without needing to change Orleans.
3.a, and any similar change, requires changes to Orleans because the pubsub system and related interfaces are all internal. By separating the stream publishing and event delivery, we can allow customization of the delivery logic without updates to Orleans.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

jt4000 picture jt4000  路  3Comments

bobanco picture bobanco  路  3Comments

Liversage picture Liversage  路  4Comments

galvesribeiro picture galvesribeiro  路  4Comments

Vlad-Stryapko picture Vlad-Stryapko  路  3Comments