Orleans: Reactive Queries

Created on 29 Jun 2016 · 16 comments · Source: dotnet/orleans

Hi all,

I'm interning at MSR this summer and will be working on what we (currently) call reactive queries. Since there seems to be quite some interest and previous ideas already, I decided to start a thread to get the community's feedback.

The main idea is to allow programmers to write declarative "queries" that get updated whenever the result of that query changes (where a query is actually just a method calling some other grain methods, for now). This declarative (pull-based) query is then transformed into a reactive (push-based) version by the runtime. Thus, executing such a query will set up the required subscriptions (dependency tracking) for you in the runtime, in order to get updated whenever the result of the query changes.

Concretely, at the client or consumer end one would write something like this:

``` c#
var Grain = GrainFactory.GetGrain<IMyGrain>(0);
Query<Result> Query = Grain.MyQuery(args).KeepAlive(2000, CancellationToken);
while (Interested) {
    Result result = await Query.NextUpdateAsync();
    // Use the entire result of the query (so not a delta change), e.g. to display on screen
}
```


And your query in your grain would look like this, for example:

``` c#
public class MyGrain : IGrainReactive {
    List<IOtherGrain> OtherGrains;

    public Query<Result> MyQuery(args) {
        return Task.WhenAll(this.OtherGrains.Select(g => g.OtherQuery()));
    }

    public Task AddGrain(IOtherGrain grain) {
        this.OtherGrains.Add(grain);
        return TaskDone.Done;
    }
}
```

This has the following semantics:

  • Whenever the result of the Query changes, this will be propagated to the client and Query.NextUpdateAsync() will return with the new result.
  • Even though multiple updates might have been propagated between two calls, .NextUpdateAsync() will just return the latest result (thus possibly skipping some versions of the state), i.e. we are only interested in the latest state.
  • These updates will stop whenever the invoked grain hasn't received a keep-alive notification for the configured time or when it is explicitly cancelled using the CancellationToken (this is recursively propagated to other grains on which this query depends).
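To make the latest-value semantics concrete, here is a minimal single-process sketch. `LatestValueCell` is an illustrative type, not part of Orleans or this proposal's API: many versions may be published between two reads, but a reader awaiting the next update only ever observes the most recent one.

``` c#
using System.Threading.Tasks;

// Hypothetical sketch of the NextUpdateAsync() semantics: a cell that holds
// only the latest published value. Intermediate versions are overwritten.
public class LatestValueCell<T>
{
    private readonly object _lock = new object();
    private T _latest;
    private bool _hasUnseen;
    private TaskCompletionSource<bool> _signal = NewSignal();

    private static TaskCompletionSource<bool> NewSignal() =>
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Publish a new result; an unread previous version is simply overwritten.
    public void Publish(T value)
    {
        lock (_lock)
        {
            _latest = value;
            _hasUnseen = true;
            _signal.TrySetResult(true); // wake up any waiting reader
        }
    }

    // Completes with the latest unseen value, skipping intermediate versions.
    public async Task<T> NextUpdateAsync()
    {
        while (true)
        {
            Task wait;
            lock (_lock)
            {
                if (_hasUnseen)
                {
                    _hasUnseen = false;
                    return _latest;
                }
                if (_signal.Task.IsCompleted) _signal = NewSignal();
                wait = _signal.Task;
            }
            await wait;
        }
    }
}
```

Publishing 1, 2, 3 in a row and then awaiting the next update yields only 3: versions 1 and 2 are skipped, matching the second bullet above.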

In order to achieve this, the runtime will perform the following actions:

  • When a query is initiated for the first time, it will be executed just as if it is a regular async method call, together with the following additions:

    • Every grain that calls a query will memoize the result of this query.

    • Every grain on which a query was called will set up a subscription to subscribe the caller to this query. The result of this query will also be memoized on this end.

  • Whenever a non-query is called on an IReactiveGrain, all the queries that other grains depend on from this grain will be re-executed. If the result of a query differs from the memoized result, the new value will be pushed to the dependents. They will recursively do the same thing, all the way up to the client that executed the query, where Query.NextUpdateAsync() will then return with the new result of the query. Finally, whenever a query is re-executed and it contains calls to other queries, the re-execution will use the memoized results instead of re-invoking those queries.

Thus, in the example code AddGrain(IOtherGrain grain) will trigger a re-execution of MyQuery(args) for this client, but it will re-use the memoized results from g.OtherQuery(). Re-executing the queries on every non-query method invocation is very conservative, but it is a good start that keeps the implementation simple.
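The memoize-and-invalidate scheme can be sketched in a single process (illustrative code, not the planned Orleans runtime): every query caches its result, a re-execution reads the cached results of its sub-queries instead of re-invoking them, and the new value is pushed to dependents only if it differs from the memoized one.

``` c#
using System;
using System.Collections.Generic;

// Hypothetical sketch: a memoized "query" cell with push-based invalidation.
public class ReactiveCell
{
    private readonly Func<int> _compute;
    private readonly List<ReactiveCell> _dependents = new List<ReactiveCell>();
    private bool _initialized;

    public int Cached { get; private set; }
    public int Executions { get; private set; }

    public ReactiveCell(Func<int> compute, params ReactiveCell[] dependencies)
    {
        _compute = compute;
        foreach (var d in dependencies) d._dependents.Add(this); // subscribe
        Recompute();
    }

    // Re-execute this query; propagate to dependents only when the result changed.
    public void Recompute()
    {
        Executions++;
        int result = _compute(); // reads dependencies' Cached values, no re-invocation
        if (_initialized && result == Cached) return; // unchanged: stop propagation
        _initialized = true;
        Cached = result;
        foreach (var dependent in _dependents) dependent.Recompute();
    }
}
```

For instance, a `sum` cell over two source cells only re-executes the source whose underlying state actually changed; the other source's memoized value is reused, mirroring how `MyQuery` reuses `g.OtherQuery()` results.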

(Note: Since the result of a query is memoized on both ends, update propagation can later be optimised to let the grains only communicate via diff operations.)
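That diff optimisation could, for instance, look like the following sketch (`ListDiff` is a made-up helper, not a proposed API): the producer computes a delta against its memoized previous result, and the consumer applies the delta to its own memoized copy to reconstruct the full value.

``` c#
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of diff-based update propagation: because both ends
// memoize the previous result, only the delta needs to cross the wire.
public static class ListDiff
{
    // Computed on the producer side against its memoized previous result.
    public static (List<T> Added, List<T> Removed) Diff<T>(List<T> oldList, List<T> newList)
    {
        var oldSet = new HashSet<T>(oldList);
        var newSet = new HashSet<T>(newList);
        return (newList.Where(x => !oldSet.Contains(x)).ToList(),
                oldList.Where(x => !newSet.Contains(x)).ToList());
    }

    // Applied on the consumer side to its memoized copy.
    public static List<T> Apply<T>(List<T> oldList, (List<T> Added, List<T> Removed) delta)
    {
        var removed = new HashSet<T>(delta.Removed);
        var result = oldList.Where(x => !removed.Contains(x)).ToList();
        result.AddRange(delta.Added);
        return result;
    }
}
```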

This proposal relates to https://github.com/dotnet/orleans/issues/940, except that we plan to go one step further by not requiring the programmer to manually construct an IObservable. Instead we keep track of when data that is dependent on might affect a query and then automatically propagate this.

We have a pretty good idea about the core of this concept, but the API through which it is exposed to the programmer is still in flux. So any feedback is very welcome.

Thanks for your opinion! :)

enhancement help wanted

All 16 comments

Very exciting, @ticup!

I wonder where the commonality between this and #940 are in terms of implementation. I'm not entirely sure how to implement #940 yet. @dVakulen's CancellationToken PR (#1569) might have some implementation hints.

Have you checked out DynamicData? There are also some similar projects which provide a programming model akin to KnockoutJS/Excel, where a dependency graph is built and values are intelligently recomputed as needed.

Would the grain return Query<T> or Task<Query<T>>? The await in your example shouldn't be there (unless you stick an async on that lambda, but it's best to just drop it).

Thanks for your thoughts and pointers @ReubenBond !

I wonder where the commonality between this and #940 are in terms of implementation. I'm not entirely sure how to implement #940 yet. @dVakulen's CancellationToken PR (#1569) might have some implementation hints.

I think a large part of the implementation of https://github.com/dotnet/orleans/issues/940 would be subsumed by this implementation, namely catching calls that return a Query or an IObservable and setting up the required subscriptions/cancellations for that call. I'll definitely go through the PR.

Have you checked out DynamicData? There are also some similar project which provide a programming model akin to KnockoutJS/Excel where a dependency graph is built and values are intelligently recomputed as needed.

I haven't seen DynamicData yet. But from what I can tell at first sight, it belongs with the other "reactive programming" models. In these models the programmer has to work with special reactive values and a set of special functions on top of them to create new reactive values. These values are either supplied (ISourceCache and ISourceList in DynamicData's case) or explicitly created by the programmer herself.

I haven't quite put my finger on the fundamental differences between our proposal and these models yet, but my current thoughts are that:
1) people don't have to explicitly work with the special reactive values, they are created for you, and
2) reactivity is triggered cross-grain, but transactional intra-grain (i.e. a method will always run on a consistent snapshot of the grain and the other queries it depends on). So you have an inherent composition there that is very hard to achieve in regular reactive models.

Would the grain return Query<T> or Task<Query<T>>? The await in your example shouldn't be there (unless you stick an async on that lambda, but it's best to just drop it).

That's 100% correct, the grain would return Query<T> and it is Query.NextUpdateAsync() that returns a Task<T>. So my first await should be removed (edited now).

@ticup I like this idea, especially the keep-alive functionality, which is great for production use.

There are a few points that I think could be important for the feature design:

  • Object Identity: How are memoized result objects and their contents identified and updated? In my approach I used a weak table to attach random guids to objects. Something like INotifyPropertyChanged and INotifyCollectionChanged for stream operations could be implemented.
  • Transactionality: How to ensure if a certain set of changes has been applied and propagated to the query result?
  • Query Execution: Considering re-usability, it might be beneficial not to store the query in a specific grain, but in a special query grain that is responsible for evaluating a certain query on all grains. If the respective assembly is loaded, it is even possible to pass a delegate across grains (one would have to check to what extent and under which conditions), or one sticks to transferring expressions with a library that can serialize expressions.
  • Query Restriction: Could it be enough to support the standard query operators? They should offer enough expressiveness.

Whenever the result of the Query changes, this will be propagated to the client and Query.NextUpdateAsync() will return with the new result.

This sounds like a possibility. However, imho the use case of data binding should be considered and references should be kept intact whenever possible. Let's take a UI example: if one binds a listbox to a collection of objects which is the result of a query, the collection should not be reattached but updated.

If you're interested in what I've already implemented of those features in Orleans.Containers (ignore that name, it should be called "Orleans.Streams.Stateful"), we can meet on Skype. Maybe you can reuse some of it :) I have some architectural documentation that is part of my thesis, which is why I can't release it to the public yet.

Hi all,

Sorry for the inactivity on this issue, but we had some internal reasons.
Now I'm 100% back and a lot has happened since last time. As a matter of fact, we now have a finished full-fledged implementation of this concept and we are moving into large-scale benchmarking next.

The important things first: we moved away from _Reactive Query_ to _Reactive Computation_. The semantics of a query were too overloaded for our goals.
Now, let me reiterate and/or clarify the goals and semantics of this feature.

Problem Context

When would you be interested in using a _Reactive Computation_ ?

  • You are interested in the result of a distributed computation (i.e. a bunch of, possibly recursive, grain calls).
  • You want to be notified when there is a new _latest value_ of that computation. This means you are only interested in the current or latest state of that computation and don't care about any previous or intermediate states.
  • The computation itself is pure, i.e. it does not alter grain state.

Examples

Some concrete examples are:

  • a social network timeline or feed (e.g. facebook, twitter, etc.).

    • You want to see a mix of your friends'/followers' posts, ranked by popularity and time.

    • If two people post something new at the same time, you are not interested in first seeing your timeline where the first post is added and then where both are added.

  • a multiplayer game.

    • You want to see the current state of the players you are currently interacting with.

    • If the server is under heavy load, you are not interested in showing exactly every move that all the other players performed. Instead, you want to show the current state as recognized by the server ASAP.

Solution

This is what a basic Reactive Computation looks like:

``` c#
var Rc = GrainFactory.StartReactiveComputation(async () => {
    var result1 = await grain1.method(1);
    var result2 = await grain1.method(2);
    var result3 = await grain2.method();
    return result1 + result2 + result3;
});

var It = Rc.GetResultEnumerator();

while (InterestedInResult) {
    var result = await It.NextResultAsync();
    // do something with result, e.g. push it to a web client through a WebSocket
}

Rc.Dispose();
```


In this example `grain1` and `grain2` are normal grains that, say, eventually return an integer. A grain can compute this integer from local state and/or by calling other grain methods, but it should not have side effects.

Using `Rc.GetResultEnumerator()` we can get any number of iterators that have one important method, `It.NextResultAsync()`. This call blocks until something has changed such that the result of our computation has changed, and then returns the new result. It's important to see that `result` is a number and always represents the entire result of the computation (i.e. `result1 + result2 + result3`). So it does not give you a delta between the previous result you were seeing and the new one, as you would typically get with, for example, streams!

### Semantics

This is what happens:
- Whenever you execute a reactive computation, we recursively track the calls on which this computation depends.
- Every call that stems from a reactive computation is cached in the silo of the caller.
- Notifying clients of the latest value then becomes a matter of (recursive) cache invalidation:
  - Whenever a method is invoked on a grain (outside of the context of a reactive computation, e.g. a normal grain invocation) and other silos have cached values of this grain's methods, this grain is marked as "possibly dirty".
  - The cached methods of this grain will be scheduled for re-execution.
  - When a method is re-executed, it will re-use the cached values it already has for all the other grain calls.
  - When the result of such re-execution differs from the cache, the methods that depend on this cache will be notified and they will also be scheduled for re-execution.
  - This continues recursively, all the way to the reactive computation created by the programmer, triggering the `.NextResultAsync()` of the enumerators.
- As long as the reactive computation is not disposed, it will keep polling (currently every 30sec) the dependencies to let them know it is still interested in the cached results.
- The other way around, when caches and computations see that they haven't been polled for a while, they will disappear.
- Lastly, because grain calls performed in context of a reactive computation are supposed to be pure, they can be executed interleaving with other invocations.

In the case of our example this means that when someone calls `grain2.SomeOtherMethod()` (not from within a reactive computation), `grain2.method()` will be re-executed. If its result has changed, the reactive computation will be notified and will re-execute with the new result. It will not re-initiate the `grain1.method(1)` and `grain1.method(2)` calls, but instead reuse the cached results for them.

Note: we currently overestimate when a grain's state might have changed in order to decide whether we need to reschedule executions. But we can easily add constructs that let the programmer take control of this and notify our system when the grain has changed. For persistent grains we could even hijack the `WriteStateAsync()` for this!

### Benefits
- Write **declarative computations** and automatically get notified when your result has changed. As opposed to either
   1) setting up and managing the subscriptions yourself or
   2) by polling and entirely re-executing the computation every X seconds.
- **Caching of method invocations** and thus a lower number of message calls:
  - If a complicated computation depends on multiple other calls, it will re-use cached values when it is re-executed.
  - This caching is silo-wide, thus if the same invocations get called a lot by different computations, they are all re-using the same cache.
- **Batching of re-execution**
  - If a grain changes a lot and computations depend on that grain's state, we will not trigger a re-execution of that grain's methods for every update that occurs. Instead, we will batch these executions such that we execute them just enough to always propagate the latest result. (If you are re-executing a computation and in the meantime 10 dependencies have changed, we only have to re-execute that computation once more instead of 10 times.)
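The batching behaviour can be sketched with a small coalescing re-executor (illustrative single-threaded code, not the Orleans scheduler): invalidations that arrive while a re-execution is in flight are folded into a single follow-up run, so N changes cost at most one extra execution rather than N.

``` c#
using System;

// Hypothetical sketch of coalescing re-execution: a dirty flag folds all
// invalidations that arrive mid-run into one follow-up execution.
public class CoalescingReexecutor
{
    private readonly Action _reexecute;
    private bool _running;
    private bool _dirty;

    public int ExecutionCount { get; private set; }

    public CoalescingReexecutor(Action reexecute) { _reexecute = reexecute; }

    // Called once per dependency change.
    public void Invalidate()
    {
        if (_running) { _dirty = true; return; } // coalesce into the pending run
        _running = true;
        do
        {
            _dirty = false;
            ExecutionCount++;
            _reexecute();
        } while (_dirty); // one follow-up run covers all changes seen meanwhile
        _running = false;
    }
}
```

If 10 dependencies change while a re-execution is running, only one more execution is scheduled, matching the parenthetical above.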

### Demo/Use-Case

A good use-case for this feature is in combination with a web client and a reactive rendering framework such as [react](https://facebook.github.io/react/). We already created a reactive web version of the Chirper sample using these technologies and the reactive computations. You can already have a look at it [here](https://github.com/ticup/AzureReactiveChirper).

The core of this application works as follows:

1) The server has a WebSocket server that dispatches incoming client messages. Whenever the client wants to see its timeline (the first 100 posts), the server executes the following code for that client:

``` c#
public async Task TimelineSubscribe(string username)
{
    TimelineSubscriptions.Add(username);

    var grain = GrainClient.GrainFactory.GetGrain<IUserGrain>(username);
    var Rc = GrainClient.GrainFactory.StartReactiveComputation(() =>
        grain.GetTimeline(100));

    var It = Rc.GetResultEnumerator();

    while (ClientConnected && TimelineSubscriptions.Contains(username))
    {
        var result = await It.NextResultAsync();
        Send(new { Type = "TimelineResult", Timeline = result });
    }
}
```

When the client is no longer interested, it's as simple as executing the following for that client:

``` c#
void TimelineUnsubscribe(string username)
{
    TimelineSubscriptions.Remove(username);
}
```


When somebody posts a message, it is as simple as:

``` c#
public async Task NewMessage(string username, string text)
{
    var grain = GrainClient.GrainFactory.GetGrain<IUserGrain>(username);
    var result = await grain.PostText(text);
}
```

The clients whose timelines now need to include this message will automatically be pushed the new timeline by the TimelineSubscribe loop above!

On the client side we use React to declaratively specify how a given timeline object should be displayed on the screen. Then, every time we get a new result for the timeline, we just notify React of it. React in turn does the necessary diffing to efficiently re-render the HTML.

Thanks for hearing me out on this!! :)

Feel free to leave your comments and feedback!

@bwanner A bit late, but here it goes:

Query Execution

Exactly, we currently have a dedicated scheduler per activation that does this and we do the caching of the results in a silo.

Query Restriction

It could be enough, but we feel that this way it's more powerful and simpler. The fewer new constructs we have to introduce, the better. This allows programmers to re-use their normal programming constructs in order to create queries.

Object Identity

Let's take a UI example: If one binds a listbox to a collection of objects, which is the result of a query, the collection should not be reattached but updated.

We believe there is a movement toward the declarative direction, where the programmer should not be concerned with diffing. Instead, the underlying tool should be responsible for deciding whether diffing is required and how it should be applied. This drastically decreases the complexity of the program. Look at the exploded popularity of React, for example. By doing smart diffing of new state under the hood for you, the programmer no longer has to worry about whether to add, remove or update an item in a list. Instead, React makes these decisions for the programmer.

Of course there are still plenty of applications where you would maybe still need object identity. But, with these reactive computations we are explicitly not doing that and hoping to drive some new declarative tools/applications :)

If you still want to have a skype, let me know!

Tim.

@ticup, Very interesting.

Can you elaborate some on how error handling occurs in this model?
If a grain deactivates (idle), and a non-reactive compute call reactivates the grain on a different silo, how does it get wired back up with the running reactive computations?
In case of silo restart, how do the reactive computations know they need to rebuild lost caches?
If a grain call, within a reactive computation, fails (bugs, timeouts, ...), how is this handled?

Whenever a method is invoked on a grain (outside of the context of a reactive computation, e.g. a normal grain invocation) of which other silos have a cached value of this grains' methods, this grain is marked as "possibly dirty".
The cached methods of this grain will be scheduled for re-execution.
When a method is re-executed, it will re-use the cached values it already has for all the other grain calls.
...
When the result of such re-execution differs from the cache, the methods that depend on this cache will be notified and they will also be scheduled for re-execution.

Is "possibly dirty" sent back to all of the silos interested in this grain, to schedule re-execution, or is the re-execution scheduled on the silo with the grain and interested silos only contacted if the cache changes after the local call is made?

Glad you're interested @jason-bragg, these are great questions!

Can you elaborate some on how error handling occurs in this model?

These questions about failure handling are exactly why we are interested in building this feature. Right now when programmers implement their own "ad-hoc" reactive system (by using some kind of pub/sub), these are things they also need to take into account, but which they often don't.

Our failure model goes as follows:

A reactive computation re-subscribes to its dependencies every x seconds (30 sec right now). This re-subscription is silo/activation oblivious. It simply performs a special "invocation message" that will go through the directory lookup etc. like a normal invocation.

The other way around, a reactive computation validates the computations that depend on it every Y seconds (60 sec right now). If it hasn't heard from a dependent for too long, it will remove that dependent. If there are no dependents left, the computation is disposed.
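This lease-style liveness protocol can be sketched as follows (illustrative names, not Orleans APIs): dependents re-subscribe periodically, and the computation drops any dependent whose keep-alive is overdue; once no dependents remain, it can be disposed.

``` c#
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the keep-alive/lease bookkeeping on the computation side.
public class DependentTracker
{
    private readonly TimeSpan _timeout;
    private readonly Dictionary<string, DateTime> _lastSeen = new Dictionary<string, DateTime>();

    public DependentTracker(TimeSpan timeout) { _timeout = timeout; }

    // Called whenever a dependent's periodic re-subscription message arrives.
    public void Resubscribe(string dependentId, DateTime now) => _lastSeen[dependentId] = now;

    // Called every Y seconds: prune overdue dependents.
    // Returns true when no dependents remain, i.e. the computation can be disposed.
    public bool PruneAndCheckDisposable(DateTime now)
    {
        var expired = _lastSeen.Where(kv => now - kv.Value > _timeout)
                               .Select(kv => kv.Key)
                               .ToList();
        foreach (var id in expired) _lastSeen.Remove(id);
        return _lastSeen.Count == 0;
    }
}
```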

So to answer your questions:

If a grain deactivates (idle), and a non-reactive compute call reactivates the grain on a different silo, how does it get wired back up with the running reactive computations?

The running reactive computation's re-subscription will eventually end up at the new activation, because it goes through the directory service.

In case of silo restart, how do the reactive computations know they need to rebuild lost caches?

Same thing: when the re-subscription of a dependency arrives at the restarted silo, the silo will detect that the requested computation is not there and (re)start it.

If a grain call, within a reactive computation, fails (bugs, timeouts, ...), how is this handled?

You can do two things:

  • You can catch the exception within the computation, i.e.

``` c#
GrainFactory.StartReactiveComputation(async () => {
    try {
        return await grain.method();
    } catch (Exception e) {
        // handle it here
        return ...;
    }
});
```

  • Or, if it isn't handled in the computation itself, it will be propagated all the way to NextResultAsync():

``` c#
try {
    var res = await it.NextResultAsync();
} catch (Exception e) {
    // handle exception here
}
```

Also, an exception does not stop the reactive computation; the exception is simply treated as the current result.
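That "exception as current result" behaviour can be sketched like this (a single-process illustration with made-up names): the result cell stores either a value or a captured exception; reading a failed result rethrows, but a later successful re-execution simply replaces the error.

``` c#
using System;
using System.Runtime.ExceptionServices;

// Hypothetical sketch: a result slot that holds either a value or an exception.
public class ResultOrError<T>
{
    private T _value;
    private ExceptionDispatchInfo _error;

    public void SetValue(T value) { _value = value; _error = null; }

    public void SetError(Exception e) => _error = ExceptionDispatchInfo.Capture(e);

    public T Read()
    {
        _error?.Throw(); // surface the failure to the awaiting reader
        return _value;
    }
}
```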

I hope this answers some of your questions!

@ticup
Thanks, that helps a lot. Interesting model.

  • preface: Some of the below questions may simply be out of scope for this model, as in, this model simply isn't intended to solve them, but I'm trying to get a sense of what kind of problems this would be applied to so..

Strict calculation requirements - ?
The pattern looks viable for real-time data processing, where the most recent results are more important than any previous calculations. It also looks like a good model for systems that value eventual consistency and availability over strict consistency. This, in itself, is quite useful, but I was curious if you'd considered situations with more strict calculation requirements? For instance, an end-game report where all the players' contributions to the game -must- be calculated before a player ranking can be calculated. (I realize that rankings can be calculated on the fly and changed, but assume we have only a single set of data we can present, so the results need to be final.)

Modifying queries - ?
How does this system handle queries that change? For instance, if I want to track the number of players in all active games, every time a new game was started the query would need to be modified to start asking that game grain how many players it had in it.

Performance - ?
Do you have performance targets for this system?
I suspect the tricky part will be reducing the overhead of the chatter in services with large numbers of low activity reactions.
One of the issues we ran into with optimizing streaming (other than the error handling issues) was that we needed to handle 10s to 100s of thousands of low activity streams (maybe one or two events per second). This turned out to be quite difficult to optimize in a distributed system. Lower numbers of active systems would have been far easier to optimize than high numbers of low activity ones.

Data loss/Recoverability - ?
How would this system be applied to calculations that are cumulative, like how many times Jim has rolled his Humvee? Assuming the Jim grain can be asked how many times he rolled his Humvee today. "It doesn't solve that problem" is a viable answer... but if it can be used for such calculations, how would data loss and recovery be handled?

@jason-bragg
Wow, even more interesting and useful questions!
A shot at answering them:

a good model for systems that value eventual consistency and availability over strict consistency.

This is spot on!

Strict calculation requirements

This is perfectly possible, because dependencies are dynamic and you can write any code in a computation (as long as it doesn't contain side-effects). This means you can add a test to the computation that checks whether the contributions are calculated or not and only then get the player's ranking:

``` c#
var Rc = GrainFactory.StartReactiveComputation(async () => {
    // displays the current game state on the main screen
    var State = await Game.GetState();
    DisplayMain(State);

    // displays the rankings on the side screen, if they are available
    if (await Game.ContributionsCalculated()) {
        var Ranking = await Rankings.GetRanking(User);
        DisplaySideBar(Ranking);
    } else {
        DisplaySideBar("...Loading");
    }
});
```


Once all the contributions are calculated, the computation will be re-executed and at that point it will get the ranking.

> **Modifying queries**

A computation is **re-triggered** both when _the result of calls it depends on has changed_ and when its _local state might have changed_. So we could implement your example as follows (in practice you would fan out the calls, but this version stays sequential for the sake of conciseness):

``` c#
class App : Grain, IApp {
    List<IGame> ActiveGames = new List<IGame>();

    public async Task<int> GetAllNumPlayers() {
        var total = 0;
        foreach (var game in ActiveGames)
            total += await game.GetNumPlayers();
        return total;
    }
}

// somewhere else (e.g. the client)
var App = GrainFactory.GetGrain<IApp>(...);
var Rc = GrainFactory.StartReactiveComputation(() => App.GetAllNumPlayers());
```

So whether an existing game's number of players changes (game.GetNumPlayers()) or a game is added/removed (ActiveGames), the new result will be recalculated! And remember that the calls are cached, so if ActiveGames changes because a game was added, we will only fetch the number of players for the new game and re-use the values we already had.

Performance

I suspect an application with a lot of low-activity computations would actually be an ideal target for this feature, because the overhead you get is 1) a re-subscription message every X seconds (which you can configure, so you can set this very high if required), and 2) the recalculation of the computations whenever state changes. So the overhead of 1) is a matter of configuration, and the overhead of 2) is a matter of properly (de)constructing your grains and computations. For example, you don't want to maintain a grain that contains a map of all users and then has computations attached for each user. Instead, you want a grain instance per user and very few computations per activation. This makes sure that only the relevant computations are re-triggered when data changes.

This is just a suspicion though; we are now starting to run benchmarks to get more insight into how it actually turns out!

Data loss/Recoverability

That implies that your computation contains side effects, e.g. updating the total number of rolls with those of today, which it explicitly shouldn't. So yes, my answer for now is that it doesn't solve that problem, right now :)

Please steer me right if I understood your questions wrong, this is very interesting!

@ticup
Thanks for the detailed response. It's an interesting programming model, that I'll need to think on more. As I work on other issues I'll keep it in mind and see how well it applies.

For the performance issues, I've been thinking on it and suspect the system can be optimized to be highly efficient, but the below statement gives me pause:

"For example, you don't want to maintain a grain that contains a map of all users and then has computations attached for each user. Instead, you want a grain instance per user and very few computations per activation. "

I agree with the first part, but not with the 'very few computations per activation'. There is a question of granularity of processing and data distribution here. For high-performance distributed computing, the work being distributed must be sufficient to justify the cost of distribution (usually of data), plus consolidation of results. Fine-grained computation can quickly become inefficient when distributed, unless the overhead is negligible (which it is not in Orleans). I'm confident you're aware of this, and remind you only because a programming model influences the patterns used. If it promotes inefficient solutions, it will be seen as inefficient even if this is not the case.

This model looks quite useful for constructing declarative queries in a distributed system, but if attention is not paid to performance, both in the efficiency of this system and in the queries written with it, it will be very, very easy to write applications that perform orders of magnitude slower in distributed environments than they would using only local resources.

All that said, please take it with a grain of salt, as I've not fully wrapped my brain around the model yet. :)

@ticup
For clarification, my interest in this model is that I think it would go very well with streaming. If streams of events are processed and stored into accumulated facts that can then trigger reactive declarative queries that update real-time displays... well, that would just be shiny!

@ticup Do you already have a plan for how you are going to share the code you created during your internship? A number of people are curious to see and interested in integrating it.

@sergeybykov Last time we spoke you preferred to put it in the contrib instead of the core code.
In order to put it in the contrib, we identified two required functionalities in the core:

1) user-defined system targets
2) different task context information within the same activation. Right now the same context information is re-used for every task that's scheduled on the same activation. We need to be able to distinguish two different routines within the same activation though. @jdom pointed me to something that might help for this, but I forgot by now... Would need to double check on this feature.

I'd be happy to coordinate either the entire pull request into the core code or to perform the necessary PR's to add it to the contrib.
On the other hand, the reactive code actually belongs to @sebastianburckhardt , so we need to see what his interests are?

That's right. @sebastianburckhardt and I are figuring this out as we speak. I should have started by asking him. :-) Will update the issue soon.

Looks like we'd need to break it down into a series of changes, some rather generic: user-level system targets, method caches; and some specific to the reactive queries.

I have thought about this some more... I don't quite see the value of breaking it apart. There are really just two components: the reactive caching mechanism, and the API. So we could separate the caching mechanism from the API, but it is not clear to me why that would be an advantage. The mechanism without an API is not that useful.

The reactive caching mechanism is the technical centerpiece, and where all the complexity lives. It uses the following runtime-internals:

  • a special scheduling mode for re-executing cached methods, which is used to detect when and how caches need to be updated
  • a system target used for delivering updates to caches
  • modifiers on the message objects that are used to create and maintain the relationship between caches (observers) and the grain being observed with minimal overhead (i.e. not requiring extra management messages to be sent)

These things are very tightly connected to deep runtime internals. I don't think we want to provide extension mechanisms for message formats or scheduler internals.

Very nice feature to have indeed. I think in general, having system targets as pinned grains as a feature for Orleans user code is interesting in its own right; however, we need to warn users of the detrimental effects it might have if they mess it up.
