We're seeing a grain that is implicitly subscribed to multiple streams suddenly stop having its OnNextAsync() method called. Other grains explicitly subscribed to one of the streams continue to work as expected. Other implicitly subscribed grains continue to work as expected.
Orleans Version:
Microsoft Orleans Client => 2.0.3
Microsoft Orleans Core => 2.0.3
Microsoft Orleans Core_Legacy => 2.0.3
Microsoft Orleans Core_Abstractions => 2.0.0
Microsoft Orleans OrleansCodeGenerator_Build => 2.0.3
Microsoft Orleans OrleansProviders => 2.0.4
Microsoft Orleans Runtime => 2.0.4
Microsoft Orleans Server => 2.0.4
Microsoft Orleans Runtime_Legacy => 2.0.4
Microsoft Orleans TestingHost => 2.0.4
I have a grain class defined similar to:
```c#
[StorageProvider(ProviderName = DocumentPersistence.ServiceLifetimeRedisStorage)]
[ImplicitStreamSubscription(DocumentStreams.DocumentA)]
[ImplicitStreamSubscription(DocumentStreams.DocumentB)]
public class MyGrain : Grain,
    IMyGrain,
    IAsyncObserver<IEnumerable<DocumentA>>,
    IAsyncObserver<IEnumerable<DocumentB>>
{
    public override async Task OnActivateAsync()
    {
        // Implicit subscriptions still need an observer attached on activation.
        var id = this.GetPrimaryKey();
        var provider = GetStreamProvider(DocumentStreams.StreamProvider);
        var documentAStream = provider.GetDocumentStream<DocumentA>(id);
        await documentAStream.SubscribeAsync(this);
        var documentBStream = provider.GetDocumentStream<DocumentB>(id);
        await documentBStream.SubscribeAsync(this);
    }

    public async Task HandleDocument(DocumentA change)
    {
        // ...
    }

    public async Task HandleDocument(DocumentB change)
    {
        // ...
    }

    public async Task OnNextAsync(IEnumerable<DocumentA> items, StreamSequenceToken token = null)
    {
        var tasks = new List<Task>();
        foreach (var document in items)
        {
            tasks.Add(HandleDocument(document));
        }
        await Task.WhenAll(tasks);
    }

    public async Task OnNextAsync(IEnumerable<DocumentB> items, StreamSequenceToken token = null)
    {
        var tasks = new List<Task>();
        foreach (var document in items)
        {
            tasks.Add(HandleDocument(document));
        }
        await Task.WhenAll(tasks);
    }

    public Task OnCompletedAsync() => Task.CompletedTask;

    public Task OnErrorAsync(Exception ex)
    {
        GetSerilog().Error(ex, "An exception occurred while receiving stream data");
        return Task.CompletedTask;
    }
}
```
_Most_ of the time all is well: OnActivateAsync() fires, followed by OnNextAsync() calls, as one would expect. However, after being left running overnight, the grain no longer receives any OnNextAsync() calls at all. Other grains that are subsequently and explicitly subscribed to the streams continue to work.
I should point out that we are using a redis stream and storage provider; however, all other stream subscriptions (both explicit and implicit) are working fine. It's, seemingly, only this grain with _two_ implicit subscriptions that is no longer receiving stream messages.
Some things I've tried to suss out what is actually going on:
I set GrainCollectionOptions.CollectionAge = TimeSpan.FromMinutes(1) to force grain collection. I was able to repro the issue, but still only after letting it run overnight. Once in this state, even after waiting a number of minutes, I was never able to get OnActivateAsync() to fire in the grain. This seems to imply that either the grain was inaccessible due to a deadlock or busy waiting, or the grain was never activated and never sent the message. Given that the grain is _relatively_ simple and devoid of any locks, I suspect it's the latter.
You have probably checked this, but it would be silly not to ask: if OnNextAsync wasn't called, how about OnErrorAsync? Was it called?
I should have added that - I did indeed check that.
Also, regarding the tasks.Add(HandleDocument(document)); call in OnNextAsync: do you see any abnormal logs about the HandleDocument task?
I haven't seen anything strange or abnormal in the logs. We have info on down turned on for Orleans logging.
Is this multi-implicit-subscribed grain type the only subscriber to the stream? Or does the stream have other types of subscriber grains, and only this multi-implicit-subscribed grain type has trouble receiving messages?
And I assume you checked in redis queue that this particular stream has message flowing in? In other words, the producer is producing messages to this particular stream.
Any abnormal or related logs, or a repro, would help me.
@xiazen I'll get back to you tomorrow when I can dig in a little more.
For some information:
I did confirm that the redis queues (and underlying redis subscriptions for each queue) were still established. This was corroborated by other pubsub streams still working and actively sending items through. I should also point out that some of those streams received the _same_ item that the multiple subscribed stream should have received but didn't.
This answers your first question about if the multi-implicitly-subscribed stream was the only consumer of its streams - no. There are other subscribers (albeit only explicit) that did receive stream items.
I can't answer the second with 100% confidence yet as I haven't yet monitored the redis activity to make sure all queues were receiving produced messages. However, because the other explicit streams received the items I'm inclined to believe that the underlying queues received them.
Looking into this more now - the AdapterReceiver is definitely getting the message via Redis. Occasionally the message just doesn't get forwarded to the implicit subscription. The explicit ones seem to always receive the updates. I'm still digging in to try to figure out exactly why it's sporadic.
Does your IStreamFailureHandler log on delivery or subscription failures? If so, is it reporting any?
Heh, that's set to a no-op. I'll set one up.
@jason-bragg
Nope, neither of the two failure methods are hit.
For sanity, can you also log the HandleId of the stream subscription handles returned by the SubscribeAsync calls.
We use a hash function that we expect to be unique, but let's double check to be safe.
I'm also curious if the A stream should be using the DocumentA namespaces, and B stream the DocumentB namespace? This may be the case, as I don't know what GetDocumentStream does.. :)
```c#
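// Reconstructed sketch: the generic arguments and parameter list were lost in this
// copy of the thread, so the element type and parameters below are inferred from
// the GetDocumentStream<DocumentA>(id) calls in the grain above.
public static IAsyncStream<IEnumerable<TDocument>> GetDocumentStream<TDocument>(this IStreamProvider provider, Guid id)
    where TDocument : BaseDocumentModel
{
    return provider.GetStream<IEnumerable<TDocument>>(id, DocumentStream(typeof(TDocument)));
}

public static string DocumentStream(Type type)
{
    return $"Collected-{type.ToDocumentType()}";
}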
```
So, the namespace would be "Collected-DocumentA" or "Collected-DocumentB" depending on the type.
Grabbing the Handle IDs now.
Stream=Collected-DocumentA-d2b4a3e9-6e77-4bce-a20c-fc5fbdd1ad63-SMSProvider
HandleId=853ed9f2-26ff-787a-24b3-7869af8773de
Stream=Collected-DocumentB-d2b4a3e9-6e77-4bce-a20c-fc5fbdd1ad63-SMSProvider
HandleId=853ed9f2-2567-b909-24b3-7869af8773de
SMSProvider is the provider name despite it being the RedisStreamProvider.
Same results when using a non "SMSProvider" name: random messages are missed.
Stream=Collected-User-d2b4a3e9-6e77-4bce-a20c-fc5fbdd1ad63-RedisProvider
HandleId=853ed9f2-26ff-787a-24b3-7869707508a6
Stream=Collected-EmergencyResource-d2b4a3e9-6e77-4bce-a20c-fc5fbdd1ad63-RedisProvider
HandleId=853ed9f2-2567-b909-24b3-7869707508a6
Hmm.. Ok, so the subscription mgmt looks good. So either the issue is on the delivery side (pulling agent + adapter) or the grain side (HandleDocument). It seems easiest to rule out the grain side first. Can you replace the OnNextAsync logic with just an await Task.Delay(10) (10 ms) followed by a log line? If the issue persists with nothing but an async operation and a log line, then we can be reasonably confident the issue is not on the grain side. Agreed?
I have breakpoints set in the adapter's GetQueueMessagesAsync() method and in the grain's OnNextAsync() methods. The GetQueueMessagesAsync() method _always_ gets hit with the expected message when I repro the issue. Arbitrarily, the OnNextAsync() method does or doesn't get hit.
That seems sufficient to prove the method isn't being called? I can do the logging if you want but it'll be a pain as we log a healthy amount of other things.
Clarification, are you seeing this issue with both SMS and Redis providers?
Yeah, I was planning on getting around to that. I can flip and see if it's repro-able with the SMSProvider.
Wasn't suggesting you test it with SMS, though it could be useful to know that. I was mainly checking my assumptions. Most of the code paths I've been walking were for persistent streams.
I've only observed it with the redis provider right now. I'm running with SMS now. It'll be useful to know if it's inside the redis provider or not.
I've confirmed the issue also occurs with the SMSProvider. Which is both great and bad news. Great for confidence in the Redis provider. Unfortunate that it's in the underlying stream logic. :(
If you can repro using the sms, then we can add a test case for this in our sms tests... this is really good news!
For clarification we need to:
Set up single grain to subscribe to 2 implicit streams, as defined earlier in this issue.
Send events to both streams.
Ensure that all events are processed.
Is that correct?
Yeah - there's a small amount of overhead load that our system imposes on the stream handler as well but I'm not sure that is necessary to duplicate the issue. A more representative picture of our current setup including this (presumably unnecessary) load is:
Oh!! wait.. I have a theory. We have batching interfaces for async observers and observables. They are untested but still on the public surface (unfortunately). On the IEnumerable
IAsyncObserver<IEnumerable<DocumentA>> documentAStream = provider.GetDocumentStream<DocumentA>(id);
await documentAStream.OnNextAsync(docs);
If you just call OnNextAsync on the IAsyncStream itself with an enumerable, it will use the batch version of OnNextAsync which, as stated, is not tested.
EDIT: NM. batch calls have different names :/
Question: are the stream IDs (guids and namespaces) the same for the streams of DocumentA and streams of IEnumerable
Good question but they're not. :) The IEnumerables are "Collected-Xxx" whereas the individual streams are "Xxx".
I originally implemented using the batch APIs then ran into "Sorry, this isn't implemented" exceptions. :)
hrrm... not able to repro yet. What is the concrete container passed to IEnumerable
I'm using List..?
It's a list; List<DocumentChange<DocumentType>> where DocumentChange is:
```c#
public class DocumentChange<TDocumentType> : IDocumentChange<TDocumentType>
{
    public TDocumentType Old { get; set; }
    public TDocumentType New { get; set; }
}
```
The actual type passed around for streams is IEnumerable<IDocumentChange<DocumentType>>.
One other possible difference between your test and my repro case is the data being passed around is quite a bit larger. Probably a couple orders of magnitude larger.
Update: I was able to locate the issue and determine that this was not an issue with Orleans or the streaming infrastructure :).
@berdon - it would be helpful if you'd share in general what was the issue. Maybe we could learn from it as well.
Thanks in advance
An unfortunate return instead of continue in the OnNextAsync. So, just a trivial programmer error. :(
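The actual handler was never posted, so the following is only a minimal sketch of how a `return` in place of a `continue` inside a batch loop can make events look like they were never delivered. The class name, the `ShouldSkip` filter, and the integer items are all hypothetical stand-ins; no Orleans types are involved.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchHandlerSketch
{
    // Hypothetical filter standing in for whatever condition guarded the early exit.
    private static bool ShouldSkip(int item) => item % 2 == 0;

    // Buggy shape: `return` leaves the whole method, so every item after the
    // first skipped one is silently dropped.
    public static async Task<List<int>> HandleWithReturn(IEnumerable<int> items)
    {
        var handled = new List<int>();
        foreach (var item in items)
        {
            if (ShouldSkip(item))
            {
                return handled; // abandons the rest of the batch
            }
            await Task.Yield(); // stand-in for real asynchronous work
            handled.Add(item);
        }
        return handled;
    }

    // Intended shape: `continue` skips only the current item.
    public static async Task<List<int>> HandleWithContinue(IEnumerable<int> items)
    {
        var handled = new List<int>();
        foreach (var item in items)
        {
            if (ShouldSkip(item))
            {
                continue; // move on to the next item in the batch
            }
            await Task.Yield(); // stand-in for real asynchronous work
            handled.Add(item);
        }
        return handled;
    }

    public static async Task Main()
    {
        var batch = new[] { 1, 2, 3, 5 };
        Console.WriteLine(string.Join(",", await HandleWithReturn(batch)));   // prints "1"
        Console.WriteLine(string.Join(",", await HandleWithContinue(batch))); // prints "1,3,5"
    }
}
```

Because the early `return` completes the method normally, nothing is thrown and nothing reaches OnErrorAsync or a failure handler, which would be consistent with the clean logs reported earlier in the thread.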