Orleans: Stream throwing an exception

Created on 28 Aug 2016 · 13 Comments · Source: dotnet/orleans

I'm writing a tracking service where devices connect to the server via TCP, and I need to report the status of the devices to a company. I have a DeviceGrain where I deserialize the message, make some calculations and then, via a stream, report the data to a CompanyGrain. In my OnActivateAsync I have the following piece of code:

    _stream = GetStream();

    // When a new message arrives
    _stream.SubscribeAsync((data, token) =>
    {
        if (data.GetType().ToString().Contains("Alert"))
        {
            _notificationHub.Invoke("SendAlert", new object());
        }
        else if (data.GetType().ToString().Contains("Notification"))
        {
            var status = (StatusNotification)data;
            DeviceStatus deviceStatus;
            var exists = devicesStatus.TryGetValue(status.Id, out deviceStatus);
            if (exists)
            {
                devicesStatus[status.Id] = status.Status;
            }
            else
            {
                devicesStatus.Add(status.Id, status.Status);
            }
        }

        // The handler must return a Task.
        return TaskDone.Done;
    });

The method GetStream is written as follows:

    private IAsyncStream<object> GetStream()
    {
        var streamProvider = base.GetStreamProvider("SMSProvider");
        var companyStream = streamProvider.GetStream<object>(this.GetPrimaryKey(), "Devices");
        return companyStream;
    }

Everything works fine: the code inside the SubscribeAsync callback gets executed as it should (when a message arrives), but it always throws the exception shown below.

Extension not installed on grain GrainCollection.DeviceGrain attempting to invoke type Orleans.Streams.OrleansCodeGenStreamConsumerExtensionMethodInvoker from invokable Orleans.Runtime.ActivationData
Exception = Orleans.Runtime.GrainExtensionNotInstalledException: Extension not installed on grain GrainCollection.DeviceGrain attempting to invoke type Orleans.Streams.OrleansCodeGenStreamConsumerExtensionMethodInvoker from invokable Orleans.Runtime.ActivationData

question

All 13 comments

Is it possible you are missing await before subscribe?

I thought so, I added async and await and the error persisted.

Just to clarify, you changed the code to await _stream.SubscribeAsync((data, token) =>?

Exactly.

This is very strange; the lambda should be totally fine. Can you try defining the callback as a method on the grain class, just to compare?
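For readers following along, the suggestion to move the callback into a method could look roughly like the sketch below. It assumes the Orleans 1.x streaming API used elsewhere in this thread, and the ICompanyGrain and DeviceNotification types from the poster's project. The handler name OnDeviceNotificationAsync is hypothetical, and the handler body is only a placeholder.

```csharp
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

namespace GrainCollection
{
    [ImplicitStreamSubscription("Devices")]
    public class CompanyGrain : Grain, ICompanyGrain
    {
        public override async Task OnActivateAsync()
        {
            var streamProvider = GetStreamProvider("SMSProvider");
            var stream = streamProvider.GetStream<DeviceNotification>(this.GetPrimaryKey(), "Devices");

            // Pass a named method instead of a lambda; the call is still awaited.
            await stream.SubscribeAsync(OnDeviceNotificationAsync);
        }

        // Hypothetical handler name; the signature matches Func<T, StreamSequenceToken, Task>.
        private Task OnDeviceNotificationAsync(DeviceNotification data, StreamSequenceToken token)
        {
            // Placeholder: the real handling (alerts, status updates) would go here.
            return TaskDone.Done;
        }
    }
}
```

This is only a way to compare behavior between a lambda and a method group, not a fix in itself.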

As Sergey stated, you must await the subscribe call or this can occur. Assuming you are:

1. Is your grain marked reentrant? That could cause this behavior.
2. Are there any conditions under which your grain could activate without subscribing?
3. What stream provider are you using?
4. Are you using implicit subscriptions or explicit subscriptions? Looks like implicit.

@jason-bragg even if the grain is marked as reentrant, we should not be allowing reentrancy before activation has completed - should we? It's like allowing calls to a half-constructed object.

@miker1423 I think a larger code snippet would be useful here.

@jason-bragg answering your questions:
1. The grain isn't reentrant.
2. No, the stream is obtained inside the OnActivateAsync method, and the subscribe call is made there as well.
3. I'm using the simple message stream (SMS) provider.
4. I'm using implicit subscriptions.

@ReubenBond, the thing is that the only thing I'm doing in the OnActivateAsync method...
I'll omit all the trivial code, leaving only the stream part.

    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Streams;
    using Orleans.Runtime;

    namespace GrainCollection
    {
        [ImplicitStreamSubscription("Devices")]
        public class CompanyGrain : Grain, ICompanyGrain
        {
            private IAsyncStream<DeviceNotification> _stream;

            public override async Task OnActivateAsync()
            {
                _stream = GetStream();
                // Await the subscription so it is registered before activation completes.
                await _stream.SubscribeAsync((data, token) =>
                {
                    if (data.IsAlert) { TriggerAlerts(data); }
                    else { UpdateStatus(data); }
                    // The handler must return a Task; this assumes both helpers are synchronous.
                    return TaskDone.Done;
                });
            }

            private IAsyncStream<DeviceNotification> GetStream()
            {
                var streamProvider = base.GetStreamProvider("SMSProvider");
                var companyStream = streamProvider.GetStream<DeviceNotification>(this.GetPrimaryKey(), "Devices");
                return companyStream;
            }
        }
    }

@miker1423
The referenced exception reports that the grain having the issue is "GrainCollection.DeviceGrain", but the code you've posted is from GrainCollection.CompanyGrain. Does GrainCollection.DeviceGrain have an [ImplicitStreamSubscription("Devices")] attribute on it?

@jason-bragg sorry, my fault, wrong class, I'll edit it.
EDIT: I remembered why I showed that code: the exception is raised when the CompanyGrain is executed.

But here is the DeviceGrain code as well:

    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Streams;
    using Orleans.Runtime;
    using Orleans.Runtime.Configuration;

    namespace GrainCollection
    {
        [ImplicitStreamSubscription("Devices")]
        public class DeviceGrain : Grain, IDeviceGrain, IRemindable
        {
            private IAsyncStream<DeviceNotification> _stream;

            public override Task OnActivateAsync()
            {
                // GetStream() is part of the code omitted from this post.
                _stream = GetStream();
                return TaskDone.Done;
            }

            private Task UpdateLocation(Point newLocation)
            {
                // Return the publish task so the caller can await it.
                return _stream.OnNextAsync(new DeviceNotification {
                    Id = this.GetPrimaryKeyLong(),
                    Status = _status, // Private variable
                    Location = newLocation,
                    IsAlert = false
                });
            }
        }
    }

@miker1423 your DeviceGrain class has an [ImplicitStreamSubscription(...)] attribute, but you are not subscribing to that stream in OnActivateAsync.

TL;DR - Remove the ImplicitStreamSubscription attribute from the DeviceGrain.

The ImplicitStreamSubscription attribute is for grains that should be implicitly subscribed to a stream. The DeviceGrain produces on the stream but does not subscribe to it. Because the attribute is present on the grain, the streaming infrastructure tries to deliver events to DeviceGrain activations, and it raises the referenced exception because those activations do not subscribe to the stream during grain activation.
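A minimal sketch of what a producer-only DeviceGrain might look like after that change, assuming the Orleans 1.x API used in this thread and the IDeviceGrain and DeviceNotification types from the poster's project. The _companyId field and the ReportAsync method are hypothetical names introduced for illustration; the original post does not show how the device grain picks its stream key. IRemindable is left out only to keep the sketch short.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

namespace GrainCollection
{
    // No [ImplicitStreamSubscription] here: this grain only produces events.
    public class DeviceGrain : Grain, IDeviceGrain
    {
        private IAsyncStream<DeviceNotification> _stream;

        // Hypothetical: the Guid key of the CompanyGrain this device reports to.
        private Guid _companyId;

        public override Task OnActivateAsync()
        {
            var streamProvider = GetStreamProvider("SMSProvider");
            // Same provider name and namespace as the consumer side.
            _stream = streamProvider.GetStream<DeviceNotification>(_companyId, "Devices");
            return TaskDone.Done;
        }

        // Hypothetical helper: publishes one notification and returns the publish task.
        private Task ReportAsync(DeviceNotification notification)
        {
            return _stream.OnNextAsync(notification);
        }
    }
}
```

With the attribute kept only on CompanyGrain, events published on the "Devices" namespace are delivered to the CompanyGrain whose primary key matches the stream's Guid, and no DeviceGrain activation is expected to act as a consumer.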

Thanks! That did it. :smile:
