From gitter chat here
We started to migrate 1.4 -> 1.5 (2.0 deemed too big rework and too many breaking changes)
and discovered that we need to make some changes in our code base. They are more or less reasonable like DI for constructors and some removed defaults. There is one change in 1.5 that looks strange and unreasonable and I'd like to understand what was the reasoning behind it:
Previously (in 1.4) ImplicitStreamSubscriptionAttribute has a public Namespace property on it and it was quite nice way to use subscription attribute itself to subscribe instance of the class to streams:
we are using reflection to create shared code in BaseStreamListener class which simply reflected over attributes and produced the list of "namespaces" to subscribe - thus we have centralized way to subscribe to the stream in base class - reusable piece of logic.
in 1.5. Namespace property was replaced by Predicate and for simple scenario like ours - all what we need is to pass namespace string in ctor, which then will be converted to ExactMatchStreamNamespacePredicate (which design is also leaves question as it's internal and doesn't expose any helpful information to subscribe to streams in the stream listening classes).
From my understanding - we still need to subscribe instance to the stream in runtime, and call something like this:
foreach(var streamNamespace in allNamespaces)
var inputStream = StreamFactory.GetStream<TMessageType>(primaryKey, streamProvider, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_streams.Add(inputStream);
_handles.Add(handle);
}
At least this is how we are working with streams in 1.4.
In _raw_ Orleans 1.5 we need to add ImplicitStreamSubscription("fooNamespace") attribute to the class but ALSO to keep a separate collection of namespaces to subscribe in runtime like in above example. It feels that this is not a well-thought decision and some practical functionality was removed without adding any obvious way to achieve the same results.
In 1.4. we don't need to maintain any separate collection - it was beautifully simple to scrap all string values from attributes on the class:
protected virtual IEnumerable<string> getNamespacesFromAttributes()
{
var allNamespaces = this.GetType()
.GetCustomAttributes(typeof(ImplicitStreamSubscriptionAttribute), false)
.Select(attrib => ((ImplicitStreamSubscriptionAttribute) attrib).Namespace)
.ToList();
return allNamespaces;
}
and the attributes on top of the class were a single source of truth for namespaces this stream is subscribed to.
We can simply add back this functionality by creating
/// <summary>
/// The reason why this class exists is that between 1.4 and 1.5 ImplicitStreamSubscriptionAttribute lost it's property "Namespace" that we were using in getNamespacesFromAttributes method of BaseListenerGrain.
/// This class simply returns it back.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class DrawboardImplicitStreamSubscriptionAttribute : ImplicitStreamSubscriptionAttribute
{
/// <summary>
/// Legacy compartible property Namespace to make subscription easier
/// </summary>
public string Namespace { get; }
/// <summary>Used to subscribe to all stream namespaces.</summary>
public DrawboardImplicitStreamSubscriptionAttribute() : base()
{
this.Namespace = string.Empty;
}
/// <summary>Used to subscribe to the specified stream namespace.</summary>
/// <param name="streamNamespace">The stream namespace to subscribe.</param>
public DrawboardImplicitStreamSubscriptionAttribute(string streamNamespace) : base(streamNamespace)
{
this.Namespace = streamNamespace;
}
}
And ditch all ImplicitStreamSubscriptionAttribute in favour of our own one, and this is what we are doing now. For me it's a code smell to introduce your own attributes to solve the problem that was perfectly solved in older versions of framework.
But the question is - what was the reason behind this breaking change ? It feels that it doesn't affect anything, but was simply to break public contract that was in use (across multiple 1.x version) without any obvious alternative or any afterthoughts of how this is going to affect existing deployments.
Also, If there is a better way to implement this in non 2.0 world - keen to hear in comments.
For implicit subscriptions, the subscriptions always exist, and the grain is always subscribed to them, it's just a question of attaching the observable to the stream. For that, one needs to know which stream the grain is subscribed to. To make this cleaner, we added a new capability. One needs only have their grain implement the IStreamSubscriptionObserver interface.
Something like
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
await handleFactory.Create<IFruit>().ResumeAsync(OnNext);
}
The system will now tell the grain about it's subscriptions by calling the OnSubscribed for each stream attempting to deliver events to the grain. If you want to know more about the stream prior to connecting the observer, the IStreamSubscriptionHandleFactory has information in it, like stream id and provider name.
Comparable code to what you have should be something like:
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = await handleFactory.Create<TMessageType>().ResumeAsync(
ReceiveCommand, OnCommandError, OnCommandCompleted);
_handles.Add(handle);
}
if you want to cache that actual stream (for some reason) you can add
var inputStream = StreamFactory.GetStream<TMessageType>(handleFactory.StreamId.Guid,
handleFactory.ProviderName, handleFactory.StreamId.Namespace);
_streams.Add(inputStream);
Thanks @jason-bragg 馃憤 , this looks clean.
1.5.4. implementation passes a bit different object than in your example - an actual handle, not factory, which looks even better:
public async Task OnSubscribed(StreamSubscriptionHandle<TMessageType> handle)
{
await handle.ResumeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_handles.Add(handle);
}
On caching actual stream - I think it was done before we realised that for a proper Unsubscribe we don't need an actual IAsyncStream<TMessageType> object, but only a handle. We can cache handle in the existing interface which is using to unsubscribe close websocket clients on deactivation.
One thing that I don't 100% understand - with this new code - how runtime selects which stream to chose by name? Previously it was in the code of obtaining stream reference e.g.:
var streamProviderName = "EventhubStreamProvider1";
var inputStream = StreamFactory.GetStream<TMessageType>(primaryKey, streamProviderName, streamNamespace);
But how it works with this IStreamSubscriptionObserver - how Orleans infers which specific stream implementation (out of all registered ones ) to use for this subscription?
Hmm.. this isn't good news.. The original implementation that returned a handle had a bug in it. The changes to the public surface were there to address that. I thought we'd pushed the fix in a 1.5.x release, but I'm not confident.
@xiazen, do you recall what we fixed and in what release?
Umm @jason-bragg does it means that 1.5 doesn't work well with streams or it's just your solution that would not work ?
Seems that this is what I guessed - I tried to debug streams locally - proposed solution does not work. reverted to previous idea with our own custom attribute and old subscription code in OnActivateAsync(). I recon this is quite an issue...
@centur, 1.5.x works with streams, but, due to a bug, the IStreamSubscriptionObserver abstraction only works for sms. :( I think your original work around is probably the best solution until you move to 2.0, unfortunately.. :/
the IStreamSubscriptionObserver bug fix change is included in 2.0 I think. But not 1.5.x
Would it make sense to release 1.5.5 with the fix?
if we can. I don't see why not. Ideally it should, since the feature and the bug is released within 1.5.0. So in theory the fix should be included in 1.5.x
@jason-bragg that may not happen very soon (migration to 2.0). I tried to do it in a separate branch - changes are quite painful and very broad, we haven't got to 1.5.4 yet (battling with DI currently) so having this fix in 1.5.5 would be nice.
We're looking at a 1.5.5 release. If we go forward with it, this will definitely be included.
@xiazen @jason-bragg What commit from master do we need to backport to 1.5.5 for this?
@centur We ported the relevant PRs (#3843 and #3851) to https://github.com/dotnet/orleans/tree/1.5.5. Can you pull, build, and try it to confirm that this will resolve the issue for you?
@centur We just published 1.5.5. I think we can close this now. Please reopen if that's not the case.
Sorry just get a chance to get to it, really, last week was a bit packed.
First outcome - Cloud service silo does not start with this error 馃憤
[WaWorkerHost.exe] Starting Orleans.Runtime.Host.AzureSilo v1.5.5.0 (Release).
[WaWorkerHost.exe] [2018-09-11 02:03:00.398 GMT 12 ERROR 100105 OrleansSiloHost ] !!!!!!!!!! ERROR starting Orleans silo name=AzureCloudServiceHost.DBCloud.ActorsHost_IN_0 Exception=
Exc level 0: System.TypeLoadException: Could not load type 'Orleans.Streams.Core.IStreamSubscriptionObserver`1' from assembly 'Orleans, Version=1.5.5.0, Culture=neutral, PublicKeyToken=null'.
at Orleans.Runtime.Silo..ctor(SiloInitializationParameters initializationParams)
at Orleans.Runtime.Host.SiloHost.InitializeOrleansSilo()
Exc level 0: System.TypeLoadException: Could not load type 'Orleans.Streams.Core.IStreamSubscriptionObserver`1' from assembly 'Orleans, Version=1.5.5.0, Culture=neutral, PublicKeyToken=null'.
at Orleans.Runtime.Silo..ctor(SiloInitializationParameters initializationParams)
at Orleans.Runtime.Host.SiloHost.InitializeOrleansSilo()
[WaWorkerHost.exe] [2018-09-11 02:03:00.421 GMT 12 ERROR 100105 OrleansSiloHost ] !!!!!!!!!! ERROR starting Orleans silo name=AzureCloudServiceHost.DBCloud.ActorsHost_IN_0 Exception=
Exc level 0: System.InvalidOperationException: Cannot start silo AzureCloudServiceHost.DBCloud.ActorsHost_IN_0 due to prior initialization error
at Orleans.Runtime.Host.SiloHost.StartOrleansSilo(Boolean catchExceptions)
Exc level 0: System.InvalidOperationException: Cannot start silo AzureCloudServiceHost.DBCloud.ActorsHost_IN_0 due to prior initialization error
at Orleans.Runtime.Host.SiloHost.StartOrleansSilo(Boolean catchExceptions)
[WaWorkerHost.exe] [2018-09-11 02:03:00.438 GMT 12 ERROR 100285 OrleansAzureSilo ] !!!!!!!!!! Failed to start Orleans silo 'AzureCloudServiceHost.DBCloud.ActorsHost_IN_0' as a Secondary node.
[runtime] Unhandled Exception:
System.Exception: Silo failed to start correctly - aborting
at Orleans.Runtime.Host.AzureSilo.RunImpl(Nullable`1 cancellationToken)
at Orleans.Runtime.Host.AzureSilo.Run(CancellationToken cancellationToken)
at DBCloud.ActorsHost.WorkerRole.Run() in C:\Projects\bullclip-backend\DBCloud.ActorsHost\WorkerRole.cs:line 85
at Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.StartRoleInternal()
at Microsoft.WindowsAzure.ServiceRuntime.Implementation.Loader.RoleRuntimeBridge.<StartRole>b__2()
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
Orleans.dll version in the package looks valid (old Cloud services still require us to have valid <assemblyBinding><dependentAssembly>.... setup, so it always a check for me)

So we are staying on 1.5.4 for now, it's in production already.
But I still can't understand how it supposed to work (again :( ) in new model:
1.5.4 has generic interface IStreamSubscriptionObserver<TMessageType> in 1.5.5 it's non-generic IStreamSubscriptionObserver - is this an accidental or intentional breaking change ? I understand that it's quite unlikely anyone used it as it was not working properly.
Generic version of interface kinda makes sense - in pre 1.5.x it was StreamFactory.GetStream<TMessageType>(primaryKey, streamProvider, streamNamespace); .
And in our code I have that generic TMessageType available ( in all our classes that are implicitly subscribing to streams) so I can re-subscribe as @jason-bragg posted above - var handle = await handleFactory.Create<TMessageType>().ResumeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted); . I remember it was invoked for every attribute instance on the class.
But what I can't understand - how this factory resolves which exact stream provider to use? In 1.4 it was an explicit parameter in this code:
var inputStream = StreamFactory.GetStream<TMessageType>(primaryKey, streamProvider, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
Right there, first line, streamProvider - name of the provider from config. No magic, if one has 10 Azure queue-backed provider instances registered - he must call these 2 lines for each of 10 provider.
From everything I saw so far in 1.5.x, new streaming has no way to resolve provider by name. There is no such parameters in the interface and class itself is not setting it anywhere. Can someone points me to that magic in the code which is used to select a specific events stream for subscription (if we are implementation-specific - specific set of azure queues)
@sergeybykov I can't reopen this issue. only team members can.
1.5.4 has generic interface IStreamSubscriptionObserver
in 1.5.5 it's non-generic IStreamSubscriptionObserver - is this an accidental or intentional breaking change ? I understand that it's quite unlikely anyone used it as it was not working properly.
Yes it is a breaking change, because 1.5.4 IStreamSubscriptionObserver<TMessageType> has a bug so this feature doesn't work. We fixed it by introducing the factory in 1.5.5. The reason for the change is, if I remember correctly, IStreamSubscriptionObserver<TMessageType> has trouble handle the case where TMessageType is a base class or an interface, but the message sent is a child class of the base class or an interface implementation. The streaming infra has difficulty figure out the correct IStreamSubscriptionObserver<TMessageType> to use, since at this point it only knows the message's concrete type. Ofc we can use reflection to figure out interfaces this class implements, but we have no way to know which one. So we introduced the factory pattern in 1.5.5, hand this job to application code, since application would know which TMessageType it is expecting. Hence this breaking change. Does this make sense?
how this factory resolves which exact stream provider to use?
The IStreamSubscriptionHandleFactory has a getter for stream provider name, so the consumer grain will know which stream provider and which stream it is dealing with, when OnSubscribed called.
From everything I saw so far in 1.5.x, new streaming has no way to resolve provider by name.
In 1.5, it has ways to resolve provider by name, for stream provider specificly, I believe you can call GetStreamProvider method on Grains and cluster client
As for the assembly loading bug... hmm.... is it happening with OrleansSiloHost?
Sorry for the lack of documentation on IStreamSubscriptionObserver<TMessageType> feature, and probably some docs missing in 1.5. streaming. Did you guys successfully migrated to 1.5.4?
@xiazen Assembly loading bug - from stack trace above I would assume so
System.TypeLoadException: Could not load type 'Orleans.Streams.Core.IStreamSubscriptionObserver`1' from assembly 'Orleans, Version=1.5.5.0, Culture=neutral, PublicKeyToken=null'.
at Orleans.Runtime.Silo..ctor(SiloInitializationParameters initializationParams)
at Orleans.Runtime.Host.SiloHost.InitializeOrleansSilo()
this not even getting into our code, it's at the point of constructing silo host in the role.
On others - this still does not clarify anything with regards to my question - how this interface figures out, which exact named stream to use in subscription. if the syntax of new re-subscribing in this issue is correct - name is currently removed from the process flow. And it breaks model in my head.
To illustrate this with a bit more specific context:
We have multiple (5 different) Azure Queue backed streams with different queue names and polling frequencies. Call it AQ_1_Provider to AQ_5_Provider.
In some cases we are sending same message, say UserCreated class instance into multiple streams.
Now for each of those streams we have multiple listeners (grains which have been implicitly subscribed to a stream and specific namespace).
In listeners we usually subscribed to streams by provider name - e.g. AQ_3 and AQ_5 and name of the message (which is usually a message type name, in the case above, Namespace on attribute was assigned to nameof(UserCreated) )
So with this general setup - I understand that to subscribe to UserCreated messages (read namespace) from AQ_3 stream - I need both values in 1.4.x How come it to some magical resolution that in 1.5 and new system we don't use name of a stream provider AQ_3 anymore. How that even works ? There is no named registration or named DI injection in our grains. How it picks correct Stream provider registration ?
How come it to some magical resolution that in 1.5 and new system we don't use name of a stream provider AQ_3 anymore.
Sorry for confused you somehow, but I never said we doesn't need a stream provider name to begin within 1.5. When you set up the subscription usingStreamSubscriptionManager on behalf of the grain, you will need go give streaming infra the stream provider name, stream id and grain ref . This is how stream infra knows what stream provider name to set on the IStreamSubscriptionHandleFactory when it sees a message sent to the consumer grain. Does this answer your question, or you are confused about something else?
@xiazen can you provide an example how grain subscribes to a specific streamprovider with it's name. In the sample above by @jason-bragg here, StreamProvider name is not mentioned anywhere
right now it can subscribe in 3 ways:
1 explicit subscribe to a stream in its grain code :
var streamProvider = GetStreamProvider(GeneratedStreamTestConstants.StreamProviderName);
stream = streamProvider.GetStream<GeneratedEvent>(State.StreamGuid, State.StreamNamespace);
2 explicit subscribe using IStreamSubscriptionManager, whose interface as follows
public interface IStreamSubscriptionManager
{
Task<StreamSubscription> AddSubscription(string streamProviderName, IStreamIdentity streamId, GrainReference grainRef);
Task RemoveSubscription(string streamProviderName, IStreamIdentity streamId, Guid subscriptionId);
Task<IEnumerable<StreamSubscription>> GetSubscriptions(string streamProviderName, IStreamIdentity StreamId);
}
3 implicit subscribe
On Jason's example, he is trying to talk about how to use "subscribe on behalf", aka implicit subscribe or subscribe using IStreamSubscriptionManager, because in this case, something else subscribe a grain to a consumer on their behalf, so the grain need to give its subscription a handle. use the IStreamSubscriptionHandleFactory and IStreamSubscriptionObserver pattern is a new way doing it, which we introduced in 1.5. The old way of subscribe (implicit and explicit) is still there. We didn't delete that to my knowledge. He didn't mention stream provider because he wasn't trying to talk about subscribing, he was trying to talk about after something subscribe on behalf a grain, what is the new way to handle it.
Does this make sense?
close this due to inactivity. Please reopen if still have problems
No, It doesn't make sense, at all.
Upfront note - I said this already above after the first closure of this issue - non-orleans team members can not re-open issues in this repository, so the moment you close it - no one, except core team members, can re-open it again. This either need to be re-configured on repository level or if closing issues is a default way to communicate with externals - let me know.
I'm trying to understand quite simple thing - how to convert code that was used for IMPLICIT grain subscription in 1.4 to 1.5.x version. Either I'm not in the same terminology and flow context or this issue keep falling off topic to Explicit and new magical ways to subscribe. We don't use explicit, so snippets for explicit flow don't make things clearer.
I want to help and confirm that this issue has been fixed in 1.5.5 but I don't know how. There is no documentation explains how to implicitly subscribe a grain to a stream, especially in our case - we have multiple named streams of the same type that we are using.
I spent some time after your last comment (and this includes some cleanuprebuildrestart magic which somehow resolved that blocking problem with assembly loading problem I mentioned above) trying push everything here is something coherent in my head and failed again:
I tried to inject IStreamSubscriptionManager into our grain to obtain IStreamId from it
public HealthCheckListenerGrain(IStreamSubscriptionManager subMgr)
{
_subMgr = subMgr ?? throw new ArgumentNullException(nameof(subMgr));
}
This fails with exception that - IStreamSubscriptionManager is not registered in DI in 1.5.5.
global::Orleans.Streams.IStreamConsumerExtension:DeliverBatch() Exc level 0: System.InvalidOperationException: Unable to resolve service for type 'Orleans.Streams.Core.IStreamSubscriptionManager' while attempting to activate 'DBCloud.ActorCollection.Health.HealthCheckListenerGrain'. at Microsoft.Extensions.Internal.ActivatorUtilities.GetService(IServiceProvider sp, Type type, Type requiredBy, Boolean isDefaultParameterRequired) at lambda_method(Closure , IServiceProvider , Object[] ) at Orleans.Runtime.GrainCreator.CreateGrainInstance(IGrainActivationContext context) at Orleans.Runtime.Catalog.CreateGrainInstance(String grainTypeName, ActivationData data, String genericArguments) at Orleans.Runtime.Catalog.SetupActivationInstance(ActivationData result, String grainType, String genericArguments) at Orleans.Runtime.Catalog.GetOrCreateActivation(ActivationAddress address, Boolean newPlacement, String grainType, String genericArguments, Dictionary`2 requestContextData, Task& activatedPromise) at Orleans.Runtime.Dispatcher.ReceiveMessage(Message message)
This brings me back to where I already was - for subscription I can only implement IStreamSubscriptionObserver on this grain and try to figure way from it. But that method signature doesnt have enough information to subscribe a grain to the stream properly.
Implementing this interface doesn't have any effect in 1.5.5 - it's method not even being invoked. In 1.5.4 method that @jason-bragg mentioned was at least called by Orleans at some point.
[DrawboardImplicitStreamSubscription(nameof(HealthCheckStreamEvt))]
[ImplicitStreamSubscription(nameof(HealthCheckStreamEvt))]
public class HealthCheckListenerGrain : BaseListenerGrain<CoreEvent>, ICoreEventsListener, IStreamSubscriptionObserver
{
/// <summary> Empty ctor </summary>
public HealthCheckListenerGrain()
{
RethrowGeneralException = true;
StreamProviderName = StreamNames.StronglyTypedCoreEvents_Provider;
}
/// <summary> Receive the command from the stream and process it </summary>
/// <param name="cmd"> The command to process </param>
/// <returns> </returns>
public override async Task ProcessCommand(CoreEvent cmd)
{
await onCommand(cmd as dynamic);
}
private async Task onCommand(HealthCheckStreamEvt cmd)
{
GrainLogger.LogInfo($"Stream listener: {nameof(HealthCheckStreamEvt)} Received.", nameof(HealthCheckListenerGrain));
await Task.CompletedTask;
}
public Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
// Don't know what to do here, lets try to see how it's called.
return Task.CompletedTask;
}
}
I'm sorry about closing this issue, I probably skipped your comment about not being able to open issue. There's so many information in above comments, I'm lost on where the issue is exactly. But it seems like your are going back to your first comment on why the breaking change in ImplicitSubscriptionAttr? Is this correct? It seems like the breaking change brought your some trouble , and your are wondering why the breaking change. but at the same time, you already have a solution for that. So this issue is not urgent. Is this correct?
But why does this have to be with IStreamSubscriptionObserver ? I explained before why we introduced this interface in above comments, that it is a new feature which doesn't affect the existing implicit subscribe feature. But it seems like your are less interested in this interface, but more interested on how to do implicit subscribe correctly in 1.5, while migrated from 1.4. Is this correct?
If yes, if I just focus on explain why the breaking change in ImplicitSubscriptionAttr, will that be enough for this issue?
it seems the second issue is , you tried to use implicit subscribe with IStreamSubscriptionObserver and it doesn't seems to work. is this correct?
btw, if you use implicit subscribe to subscribe, then you don't need to use IStreamSubscriptionManager, which is for explicit subscribe.
Let's restart, might be simpler to start from scratch. Here is my chronology:
Using Implicit stream subscription a lot and our codebase critically depends on it to work properly, can get into details but they are less interesting.
Updated 1.4 to 1.5. Discovered that Namespace attribute is missing. Our implementation to attach handlers to subscription stopped to work. Found a workaround - inherit attribute, create our own DrawboardImplicitStreamSubscriptionAttribute with Namespace property, update our codebase (132 unique usages, to illustrate why it's a pain, but Re# helps ). Done. Golden.
Asked how to use Implicit subscription in new model of 1.5.4, got a response from @jason-bragg . Tried it - it didn't work, example was a bit different from real signatures in 1.5.4
Discovered that some bug fix was not backported.
Bugfix was backported in 1.5.5
Tried 1.5.5. - our project was broken because of assembly binding error originated from orleans.
While it was presumably broken, asked how old model of subscription in 1.4 was changed to new 1.5 model (answer is - it was not changed, new model was added), tried to describe my mental model of how we are using streams and how this old model translates to new interface signatures.
Failed few times because in old model even for Implicit subscription we still have to use StreamProviderName, while in new model it's somehow always subscribed.
Tried to experiment with new model interfaces and can't create anything that would work.
And here is my questions:
foreach(var streamNamespace in getNamespacesFromAttributes())
{
var inputStream = StreamFactory.GetStream<TMessageType>(primaryKey, streamProvider, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_streams.Add(inputStream);
}
From the conversation in this issue - Jason provided this example (grain implements IStreamSubscriptionObserver):
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = await handleFactory.Create<TMessageType>().ResumeAsync(
ReceiveCommand, OnCommandError, OnCommandCompleted);
_handles.Add(handle);
}
This interface method is not being invoked in 1.5.5.
Regardless of the above 鈽濓笍 statement, my second question is
OnSubscribed() call of a grain if nowhere in these calls we are using ProviderNames?You said that ProviderName is used in IStreamSubscriptionManager
public interface IStreamSubscriptionManager
{
Task<StreamSubscription> AddSubscription(string streamProviderName, IStreamIdentity streamId, GrainReference grainRef);
Task RemoveSubscription(string streamProviderName, IStreamIdentity streamId, Guid subscriptionId);
Task<IEnumerable<StreamSubscription>> GetSubscriptions(string streamProviderName, IStreamIdentity StreamId);
}
I tried to inject it into my grain via DI - attempt failed because this interface is not registered in DI.
Maybe this is an exact culprit and new way of attaching handlers to an Implicit stream in 1.5.5 is still broken ?
Upfront note - I said this already above after the first closure of this issue - non-orleans team members can not re-open issues in this repository, so the moment you close it - no one, except core team members, can re-open it again.
Hmm, I don't believe we changed anything. I looked through the repo settings, and I do not see anything related to permission to open/reopen issues. Any idea where this is configured? We definitely want at least authors of issues to be free to reopen them, and I think this is how it used to work.
Looks like this is a well known problem that we were unaware of - the author can reopen a closed issue only if it was closed by them. If a maintainer closes the issue, the author cannot reopen it. https://github.com/isaacs/github/issues/583. So all our comments saying "please reopen if the issue hasn't been resolved" aren't working. Plus some issue get auto-closed by associated PRs.
Looks like we need to be more careful with closing issues and keep the unfortunate limitation in mind.
Regarding the following code's equivalent
foreach(var streamNamespace in getNamespacesFromAttributes())
{
var inputStream = StreamFactory.GetStream<TMessageType>(primaryKey, streamProvider, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_streams.Add(inputStream);
}
its equivalent in 1.5 is as following
foreach (var streamNamespace in getNamespacesFromAttributes())
{
var inputStream = this.GetStreamProvider(streamProvider).GetStream<TMessageType>(primaryKey, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_streams.Add(inputStream);
}
Regarding to point 2 :
We have a case when we register multiple named instances of a particular StreamProviderType (instances have different initial configuration). How does Orleans know which instance to inject into
OnSubscribed() call of a grain if nowhere in these calls we are using ProviderNames?
Orleans knows that from the producer. Because the producer need to produce on certain stream, with a certain providerName, streamId and namespace. Orleans internally know which message is related to which stream. So when orleans deliver this message to the consumer, then it would tell OnSubscribe() that which stream it is, including its streamId, streamProviderName and namespace. Application code can retrieve that from IStreamSubscriptionHandleFactory.ProviderName, IStreamSubscriptionHandleFactory.StreamId etc. I mentioned this before, not sure if you missed it. So if a consumer consumers stream from multiple stream provider, then your OnSubscribe code can look like this :
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
**var streamProvider = handleFactory.ProviderName. //from here you can decide which ReceiveCommand, OnCommentError delegate to give on following call.**
var handle = await handleFactory.Create<TMessageType>().ResumeAsync(
ReceiveCommand, OnCommandError, OnCommandCompleted);
_handles.Add(handle);
}
I do verified this ISubscriptionObserver works in master, whose streaming code is equivalent to 15.5. I tested it with implicit subscribe. I suspect it not working on your end may due to the assembly loading error. If the type isn't being loaded, then it probably won't work
System.TypeLoadException: Could not load type 'Orleans.Streams.Core.IStreamSubscriptionObserver`1' from assembly 'Orleans, Version=1.5.5.0, Culture=neutral, PublicKeyToken=null'.
at Orleans.Runtime.Silo..ctor(SiloInitializationParameters initializationParams)
at Orleans.Runtime.Host.SiloHost.InitializeOrleansSilo()
With that being said, ISubscriptionObserver isn't required to be used with implicit subscribe, it is just another way. If you don't want to use it, you can still use the way I mentioned in my previous comment.
Regarding
You said that ProviderName is used in IStreamSubscriptionManager
I tried to inject it into my grain via DI - attempt failed because this interface is not registered in DI.
Maybe this is an exact culprit and new way of attaching handlers to an Implicit stream in 1.5.5 is still broken ?
I want to first clear out that IStreamSubscriptionManager is for explicit subscribe. You probably know that, but I just want to make sure I don't confuse you again. It is used to subscribe on behalf of the consumer without having activating the consumer and subscribe.
To get IStreamSubscriptionManager, you need to do a dance like this
serviceProvider.GetService<IStreamSubscriptionManagerAdmin>().GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
Sorry for not clearing that out earlier. IStreamSubscriptionManagerAdmin is in DI, user need to get StreamSubscriptionManager from it.
With that being said, the old fashioned way in 1.4 for explicit subscribe still exisits, this is just a new feature.
I'm interested in that assembly loading error:
System.TypeLoadException: Could not load type 'Orleans.Streams.Core.IStreamSubscriptionObserver`1' from assembly 'Orleans, Version=1.5.5.0, Culture=neutral, PublicKeyToken=null'.
at Orleans.Runtime.Silo..ctor(SiloInitializationParameters initializationParams)
at Orleans.Runtime.Host.SiloHost.InitializeOrleansSilo()
which might be the core of why you didn't see OnSubscribe got invoked. I'd love to sort that out as well if you can provide more details or repro
On this:
its equivalent in 1.5 is as following
foreach (var streamNamespace in getNamespacesFromAttributes())
{
var inputStream = this.GetStreamProvider(streamProvider).GetStream(primaryKey, streamNamespace);
var handle = await inputStream.SubscribeAsync(ReceiveCommand, OnCommandError, OnCommandCompleted);
_streams.Add(inputStream);
}
This is really my own code (except that in v1.4 code we are using abstraction StreamFactory.GetStream<TMessageType> of that this.GetStreamProvider method for making it injectable for our unit tests) and it's relies on a known streamNamespace.
In 1.5.5 - there is no way application code can extract that Namespace as string from attribute.
How this code expected to look like with raw Orleans functionality in 1.5.5, when namespace defined in attribute is not available to app code anymore? This is the only important question here
I want to first clear out that IStreamSubscriptionManager is for explicit subscribe. You probably know that, but I just want to make sure I don't confuse you again. It is used to subscribe on behalf of the consumer without having activating the consumer and subscribe.
I didn't. With this knowledge half of my questions above don't make sense ...
How this code expected to look like with raw Orleans functionality in 1.5.5, when namespace defined in attribute is not available to app code anymore? This is the only important question here
Yes it isn't anymore, unless one is doing some hacky reflection code. When we made that change, I don't think we had your use case in mind, as in we didn't know people would use the ImplicitSubscribeAttr that way. Most of our users have a separate collection of streamNamespace kept aside. I'm sorry that brought you inconvenience. Is it a big inconvenience for you?
Now I understand why @jason-bragg brought up that ISubscriptionObserver, because if you don't want to keep a separate list of streamNameSpace, then ISubscriptionObserver is a good alternative. If you want to know more on it, or don't understand how it work, I can dive more in it. But for this comment I don't want to bring more noise, since that was proven confusing in our past convo :) .
So in short, if you just want to migrate to 1.5, I think keep a sperate collection of streamNameSpace is indeed inconvenient, but OK. Do you disagree? Unless it is super inconvenient for you, then please elaborate why. And we will see what we can do.
I do disagree. Keeping such namespaces in a separate collection is not a good practice, and removing it without a reason is a wrong decision.
Keeping namespaces in a separate file (say namespaces class with consts) require information duplication which leads to error prone code and multiple sources of truth.
For example - to implicitly subscribe to said Namespace1..Namespace7 out of entire collection of [1..50] Namespaces one need to list 1..7 as 7 entries of attribute on the grain class AND duplicate this whole collection somewhere inside the class, so application code can attach correct handlers to these always-exist subscriptions. Now if you want to add or remove namespaces - you need to modify both collections - list of attributes and in-class copy.
Such approach in our codebase is just begging for subtle bugs - getting these collections out of sync (ant it's very easy when you have 10-15 namespaces) would cause incorrect behaviours - grain would not receive certain messages (no handlers attached as there is an attribute on a grain, but in-class collection entry is missing) or receive messages it should not, and no longer knows how to handle them.
As I mentioned before - we restored Namespace property in our own attribute. We already migrated to 1.5.4 with our custom attribute, it's just quite annoying that simple and harmless properties are quietly taken out without reasoning or consideration how this components were actually used. Figuring what's broken between minor revision upgrades is frustrating and taking too much time and energy that can be spent elsewhere.
I want to understand how to use ISubscriptionObserver for such implicit subscriptions as in my understanding it's a new "way forward" for streams, that will replace Namespace-based approach. It's not critical for us to know it right now as we sorted out all problems and running production release on 1.5.4. Is it described somewhere in documentation ? I can't find anything by ISubscriptionObserver keyword.
Figuring what's broken between minor revision upgrades is frustrating and taking too much time and energy that can be spent elsewhere.
I apologize about that, we aren't following strict sematic versioning practice in 1.x releases. We are trying to follow that in our 2.x release , hopefully the experience will be better there.
Regarding the ISubscriptionObserver, we don't have documentation about it yet. I do have a test which uses ISubscriptionObserver with ImplicitSubscribe. I think it can be a good sample for this feature. The commit is here : https://github.com/xiazen/orleans/commit/1503474950826b4a3e7caea646cef30afba4246d
The ImplicitSubscribeGrain should be a good starting point to understand how to use ISubscriptionObserver with ImplicitSubscribe. In the test suit I wrote, I do have test using multiple stream providers , which I think might be useful to you as well.
Haven't submit this as a PR to merge the tests to master, since I'm sorting out some internal test infra issues. But I plan to merge it into master soon.
close due to inactivity. Feel free to reopen.
I'm not sure which activity was expected from me on this topic. You said - there is no documentation, just a test suite with samples to figure out everything yourself. Any progress in a documentation space to highlight this breaking change ?
no plan on the docs yet. I can move this to backlog or certain milestone, and add doc tag on it. But generally We highlight breaking change in release note, not so much in documentation.
I can reopen this. I closed it merely due to inactivity. You didn't reply to my last comment for a long time, so I think it might be solved in your side. Not everyone remembers to close issues when the issue is solved. Close due to inactivity is merely designed to solve that problem.