Orleans: Best Practices Question: How to limit concurrency in re-entrant grain?

Created on 31 Jan 2017 · 12 Comments · Source: dotnet/orleans

I'm working on Ship, an application that integrates with GitHub and has to respect their API limits. Things we need on a _cluster-wide, per-user basis_:

  • Ability to limit concurrent requests to GitHub.
  • Ability to smooth requests (maximum x/sec, somewhat even spacing)

GitHub publicly discloses some limits, but is secretive about others. We've had to tweak things a lot.

Our current solution uses a grain for each user, and routes all requests to GitHub through the user grains. The grain is re-entrant, with concurrency internally limited by a SemaphoreSlim, and smoothing implemented with Task.Delay(...). For now we've completely disabled the smoothing, and are only enforcing concurrency limits.

The grain looks something like this:
```C#
[Reentrant]
public class GitHubActor : Grain, IGitHubActor, IGitHubClient, IDisposable {
  public const int MaxConcurrentRequests = 2;

  private volatile GitHubRateLimit _rateLimit; // Do not update directly; use UpdateRateLimit.

  private void UpdateRateLimit(GitHubRateLimit rateLimit) {
    lock (this) {
      // Only accept a newer reset or a lower remaining count.
      if (_rateLimit == null
        || _rateLimit.Reset < rateLimit.Reset
        || _rateLimit.Remaining > rateLimit.Remaining) {
        _rateLimit = rateLimit;
      }
    }
  }

  public Task<GitHubResponse<Account>> User(GitHubCacheDetails cacheOptions = null) {
    var request = new GitHubRequest("user", cacheOptions);
    return Fetch<Account>(request);
  }

  private DateTimeOffset? _dropRequestsUntil;
  private volatile bool _dropRequestAbuse;
  private volatile bool _lastRequestLimited;
  private readonly SemaphoreSlim _maxConcurrentRequests = new SemaphoreSlim(MaxConcurrentRequests);

  private async Task<GitHubResponse<T>> Fetch<T>(GitHubRequest request) {
    if (_dropRequestsUntil != null && _dropRequestsUntil.Value > DateTimeOffset.UtcNow) {
      var rateException = new GitHubRateException(UserInfo, request.Uri, _rateLimit, _dropRequestAbuse);
      if (!_lastRequestLimited) {
        _lastRequestLimited = true;
        rateException.Report(userInfo: UserInfo);
      }
      throw rateException;
    }

    // Clear flag since we made it this far.
    _lastRequestLimited = false;

    await _maxConcurrentRequests.WaitAsync();

    GitHubResponse<T> result;
    try {
      result = await SharedHandler.Fetch<T>(this, request);
    } finally {
      _maxConcurrentRequests.Release();
    }

    // Token revocation and abuse handling.
    var response = result as GitHubResponse;
    bool abuse = false;
    DateTimeOffset? limitUntil = null;
    if (response?.Status == HttpStatusCode.Unauthorized) {
      using (var ctx = _shipContextFactory.CreateInstance()) {
        await ctx.RevokeAccessToken(AccessToken);
      }
      DeactivateOnIdle();
    } else if (response?.Error?.IsAbuse == true) {
      abuse = true;
      limitUntil = response.RetryAfter ?? DateTimeOffset.UtcNow.AddSeconds(60); // Default to 60 seconds.
    } else if (_rateLimit?.IsExceeded == true) {
      limitUntil = _rateLimit.Reset;
    }

    if (limitUntil != null) {
      _dropRequestAbuse = abuse;
      _dropRequestsUntil = limitUntil;
      using (var context = _shipContextFactory.CreateInstance()) {
        // Assumes a rate limit has already been observed for this token.
        var oldRate = _rateLimit;
        var newRate = new GitHubRateLimit(
          oldRate.AccessToken,
          oldRate.Limit,
          Math.Min(oldRate.Remaining, GitHubRateLimit.RateLimitFloor - 1),
          limitUntil.Value);

        // Record in DB for sync notification.
        await context.UpdateRateLimit(newRate);
      }
    } else if (response?.RateLimit != null) {
      // Normal rate-limit tracking.
      UpdateRateLimit(response.RateLimit);
    }

    return result;
  }
}
```

This appears to work, and work well. The only problem is that occasionally we'll see a lot of timeout-related exceptions:

System.TimeoutException: Response did not arrive on time in 00:00:30 for message: Request S10.1.0.4:11111:_grn/_/_@_->S10.1.0.7:11111:_grn/_/_@_#_: global::RealArtists.ShipHub.ActorInterfaces.GitHub.IGitHubActor:Labels(). Target History is: .

and warnings:

321 WARNING 100157 CallbackData 10.1.0.8:11111] Response did not arrive on time in 00:00:30 for message: Request S10.1.0.8:11111:_grn/_/_@_->S10.1.0.7:11111:_grn/_/_@_#1744900: global::RealArtists.ShipHub.ActorInterfaces.GitHub.IGitHubActor:Repository(). Target History is: . About to break its promise.

My best guess is that messages sometimes back up, and without shedding, timeouts will continue until the request rate decreases and things can catch back up. My thought was to wait for the semaphore for at most (message timeout - already elapsed time - 10) seconds and drop requests that can't be fulfilled in time on the floor.
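For reference, a minimal sketch of that shedding idea. Orleans doesn't expose the caller's send time to the grain, so this version measures from when the grain begins processing, and the 30-second timeout and 10-second margin are hard-coded assumptions rather than Orleans settings:

```C#
// Hypothetical sketch: give up on the semaphore before the caller's
// timeout fires, instead of servicing an already-broken promise.
private static readonly TimeSpan MessageTimeout = TimeSpan.FromSeconds(30); // assumed
private static readonly TimeSpan SafetyMargin = TimeSpan.FromSeconds(10);   // assumed

private async Task<GitHubResponse<T>> FetchWithShedding<T>(GitHubRequest request) {
  var started = DateTimeOffset.UtcNow; // best available proxy for "already elapsed"

  // ... the existing _dropRequestsUntil checks go here ...

  var budget = MessageTimeout - (DateTimeOffset.UtcNow - started) - SafetyMargin;
  if (budget <= TimeSpan.Zero || !await _maxConcurrentRequests.WaitAsync(budget)) {
    // No free slot in time; fail fast rather than service a request
    // whose caller has already (or will have) timed out.
    throw new TimeoutException($"Shedding {request.Uri}: no free slot within {budget}.");
  }

  try {
    return await SharedHandler.Fetch<T>(this, request);
  } finally {
    _maxConcurrentRequests.Release();
  }
}
```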

Can you tell if I'm doing anything obviously wrong? Is there a better approach I should be using? I can provide server logs if that's helpful, but would need to know what information in the grain/message identifiers could potentially be user info, so I can anonymize it.

question

All 12 comments

I don't see anything obviously wrong, and your assumptions about the problem seem reasonable. I'd suggest adding some instrumentation (counters, logs..?) to be more confident, but that's your call.

In the code above, I don't see either of the calls from the timeout and warning (Labels(), Repository()). What makes you suspect the listed code?

Do you know how long the SharedHandler.Fetch calls are taking? Is it possible that this could be performance hiccups on a downstream service?

As far as a fix, how critical is it that the latest data be returned? I ask because the grain can act like a cache, caching the previous response, and only fetch a new version if the previous fetch has completed.
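A minimal sketch of that caching idea, using the User call from the question; the freshness window and field names here are made up, and concurrent callers are coalesced onto a single outstanding fetch:

```C#
// Hypothetical sketch: serve the previous response while it's fresh,
// and let at most one fetch per grain be outstanding at a time.
private static readonly TimeSpan CacheWindow = TimeSpan.FromSeconds(30); // made-up knob
private GitHubResponse<Account> _cachedUser;
private DateTimeOffset _cachedAt;
private Task<GitHubResponse<Account>> _userFetch;

public async Task<GitHubResponse<Account>> User(GitHubCacheDetails cacheOptions = null) {
  if (_cachedUser != null && DateTimeOffset.UtcNow - _cachedAt < CacheWindow) {
    return _cachedUser; // possibly stale, but never blocks on GitHub
  }

  // Safe without locks: grain turns run on a single-threaded scheduler.
  if (_userFetch == null) {
    _userFetch = FetchAndCacheUser(cacheOptions);
  }
  return await _userFetch;
}

private async Task<GitHubResponse<Account>> FetchAndCacheUser(GitHubCacheDetails cacheOptions) {
  try {
    var result = await Fetch<Account>(new GitHubRequest("user", cacheOptions));
    _cachedUser = result;
    _cachedAt = DateTimeOffset.UtcNow;
    return result;
  } finally {
    _userFetch = null; // allow the next refresh
  }
}
```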

> In the code above, I don't see either of the calls from the timeout and warning (Labels(), Repository()). What makes you suspect the listed code?

They're similar to the User function above, just simple wrappers around a call to GitHub.

I suspect I've done something wrong because it appears that once a grain enters the timeout-throwing state, it remains there for long periods of time, and occasionally requires a silo restart.

When messages start to back up for a grain, are those that can't be delivered before the timeout dropped? Or am I potentially servicing a backlog of already failed requests?

> or am I potentially servicing a backlog of already failed requests?

The timeout triggers on the caller side, but the message may remain in the grain queue, so yes, I believe it is possible a grain could be servicing calls that have already timed out.

If your service's requests-per-second requirements are greater than GitHub's supported rate, I don't see how you'll be able to meet them without caching data in the grains. While this means introducing the possibility of serving stale data, I don't know what other options there are.

You may be able to throttle the calls more intelligently by using a stateless worker (with a single instance per silo) to prioritize and rate-limit GitHub calls by silo. For instance, if you have 10 silos and GitHub's max request rate is 10,000/s, each silo would be limited to 1,000 requests per second. With this knowledge, a new grain (with no cached state) could be prioritized over a grain with state 100ms old, and a grain with 100ms-old state could be prioritized over a grain with 10ms-old state. This would allow smarter caching but would still serve at least some stale data under load.
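A rough sketch of that per-silo throttle, assuming a `[StatelessWorker(1)]` cap (at most one activation per silo; stateless-worker calls always stay silo-local) and a made-up 1,000 requests/second budget; the prioritization logic is left out:

```C#
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;

public interface ISiloThrottle : IGrainWithIntegerKey {
  Task<bool> TryAcquire();
}

[StatelessWorker(1)] // at most one activation per silo
public class SiloThrottle : Grain, ISiloThrottle {
  private const int RequestsPerSecond = 1000; // assumed per-silo budget

  private int _issuedThisWindow;
  private DateTimeOffset _windowStart = DateTimeOffset.UtcNow;

  public Task<bool> TryAcquire() {
    var now = DateTimeOffset.UtcNow;
    if (now - _windowStart >= TimeSpan.FromSeconds(1)) {
      _windowStart = now;     // roll over to a new one-second window
      _issuedThisWindow = 0;
    }
    if (_issuedThisWindow >= RequestsPerSecond) {
      return Task.FromResult(false); // over budget; caller should defer or shed
    }
    _issuedThisWindow++;
    return Task.FromResult(true);
  }
}
```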

@kogir The concurrency limit is per user, not across users, correct?

I was suggesting on Gitter to try an alternative approach with an explicit queue of requests and no semaphore. The grain would still have to be reentrant, but Fetch would create a TaskCompletionSource<T>, add a struct with it, the received GitHubRequest, and a timestamp to a queue. It would send the request to GitHub immediately only if the concurrency counter (a simple int, no lock, no semaphore) hasn't been reached, and return the Task<T> of the created TaskCompletionSource<T>.

Upon completion of a call to GitHub (after its await), the code needs to resolve the respective TaskCompletionSource<T> with the response value, remove the request's struct from the queue, and send the next request waiting in the queue to GitHub; if the queue is empty, decrement the concurrency counter instead.

In addition, you can set up a timer to scan the queue and report its size, oldest request age, and any concerns. It could also cancel and remove any requests from the queue if necessary.
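Putting the suggestion into code, a minimal sketch might look like the following; the names (`_pending`, `MaxInFlight`, `Enqueue`) are illustrative, and the plain int and Queue are safe only because grain turns run on a single-threaded scheduler:

```C#
private readonly Queue<Func<Task>> _pending = new Queue<Func<Task>>();
private int _inFlight;
private const int MaxInFlight = 2;

private Task<GitHubResponse<T>> Enqueue<T>(GitHubRequest request) {
  var tcs = new TaskCompletionSource<GitHubResponse<T>>();

  // The work item performs the GitHub call and resolves the TCS.
  Func<Task> work = async () => {
    try {
      tcs.SetResult(await SharedHandler.Fetch<T>(this, request));
    } catch (Exception ex) {
      tcs.SetException(ex);
    } finally {
      // Hand the slot to the next queued request, or free it.
      if (_pending.Count > 0) {
        var next = _pending.Dequeue();
        var ignored = next(); // continues on the grain's scheduler
      } else {
        _inFlight--;
      }
    }
  };

  if (_inFlight < MaxInFlight) {
    _inFlight++;
    var ignored = work();   // dispatch to GitHub immediately
  } else {
    _pending.Enqueue(work); // wait for a slot
  }

  return tcs.Task;
}
```

Carrying a timestamp alongside each queued item would let the optional timer scan for stale entries, report queue depth and oldest-request age, and cancel the corresponding TaskCompletionSources.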

@jason-bragg:

> The timeout triggers on the caller side, but the message may remain in the grain queue, so yes.

Ah, ok. I may either need to shed them myself or take a different approach. The callers are ok with failures. Also, most of the calls support 304 Not Modified responses - I suppose I could return that when I'm not able to get the latest data before the timeout.

> caching data in the grains

I'd not thought of this, but it's an interesting idea. It would result in a lot of duplication of data in memory though.


@sergeybykov:

> The concurrency limit is per user, not across users, correct?

Yes.

Just to make sure I understand your suggestion:

  • The internal queue would replace the grain message queue. As quickly as the grain can process incoming messages, either drop them or queue a work item. If queuing, return a TaskCompletionSource<T> that will be completed in the future.
  • Use a timer task to monitor and perform maintenance on the queue. It can cancel requests that can't be satisfied, and log metrics and warnings.

What services the queue though? Does the timer actually dispatch requests, or should I start my own run loop with Task.Factory.StartNew()?


I'm beginning to think I may have implemented this with a lot of baggage from previous attempts. Is there a more "Native Orleans" way to do this? Should the calling grains instead be expressing interest in a result and receiving it via streams or some other mechanism instead? Right now the callers are basically polling for updates via the GitHubActor, but the GitHubActor could do the polling itself and just dispatch notifications.

> The internal queue would replace the grain message queue.

Yes, effectively. But it will make the queue explicit and visible for logging, monitoring, etc.

> Use a timer task to monitor and perform maintenance on the queue. It can cancel requests that can't be satisfied, and log metrics and warnings.

The timer part is optional. No additional maintenance of the queue is really required if the 'callback' part of the method (after awaiting a call to GitHub) removes completed requests from the queue.

There is a "resource governance" pattern for this kind of throttling scenario, which employs a fixed set of non-reentrant grains per target resource; all requests to the resource are forwarded via the 'resource governor' grains. For example, it is used to restrict the number of open connections to a SQL database.

The challenge with your scenario is that you have throttling per user, and having 2 (or N) 'resource governor' grains for each user grain seems excessive. If instead the limit was per service, then it'd be easy to forward traffic from all user grains via N 'resource governor' grains shared by all user grains.
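To illustrate the pattern for the per-service case, a hypothetical pair of types (none of this is an Orleans-provided API): N non-reentrant governor grains, each processing one call at a time, with callers fanned out across them by hash:

```C#
public interface IGitHubGovernor : IGrainWithIntegerKey {
  Task<GitHubResponse<Account>> User(string accessToken, GitHubCacheDetails cacheOptions);
}

// Deliberately NOT [Reentrant]: each governor services one call at a
// time, so N governors give a hard cluster-wide concurrency limit of N.
public class GitHubGovernor : Grain, IGitHubGovernor {
  public async Task<GitHubResponse<Account>> User(string accessToken, GitHubCacheDetails cacheOptions) {
    var request = new GitHubRequest("user", cacheOptions);
    return await GitHubHandler.Fetch<Account>(accessToken, request); // hypothetical shared handler
  }
}

// Caller side (inside a user grain): fan out over the governor pool.
private const int GovernorCount = 8; // assumed pool size

private IGitHubGovernor PickGovernor(GitHubRequest request) {
  var index = (uint)request.Uri.GetHashCode() % GovernorCount;
  return GrainFactory.GetGrain<IGitHubGovernor>(index);
}
```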

> Should the calling grains instead be expressing interest in a result and receiving it via streams or some other mechanism instead? Right now the callers are basically polling for updates via the GitHubActor, but the GitHubActor could do the polling itself and just dispatch notifications.

Good questions. Clients polling for updates isn't a very scalable or efficient pattern. Using streams or observers to send updates to clients is more efficient.

> It would result in a lot of duplication of data in memory.

Agreed. The service would need to cache data for all active users. Not knowing target load or data size, I can't speculate on the practicality of this.

I don't know your requirements, but I see two problems that may necessitate some architectural changes:
1) Resource Management - Efficient and judicious use of limited GitHub requests. See Sergey's resource governance comments.
2) Overload - How does the system handle requests beyond the GitHub limit? If clients are ok with failures, doing nothing may be acceptable, though probably not preferable. Caching data in grains can allow the service to at least serve possibly stale data (if that is acceptable and memory limits allow), while prioritizing GitHub requests for un-cached or old data could minimize failures or stale responses.

If the GitHub request limits do not scale with your load, the success of your service may well hinge on how well you manage that resource, as it directly influences scalability.

Ok, thanks for all the advice and suggestions. I'll test out a few different approaches. The biggest thing I missed is that the messages could back up when overloaded. Obvious in retrospect.

A queueing system somewhat like what Sergey was suggesting can be seen in https://github.com/dotnet/orleans/blob/master/src/Orleans/Async/AsyncSerialExecutor.cs

For posterity:

It was (obviously) the messages backing up. Why that didn't immediately occur to me I don't know. Thanks for pointing it out.

We instrumented the code to track this and are going to change a number of things, none of which are related to Orleans specifically. Among them will be a priority queue to expedite user-interactive requests, an evolution of @sergeybykov's suggestion, and perhaps a way to register for callbacks when less important requests can eventually be completed.

This is entirely a failure of our app code/design, and Orleans is working as intended (and documented) - the best way to work 😄

@kogir Thanks for the follow-up. 👍 If you feel like writing a short blog post about the learning and related patterns, you are welcome to post on https://blogs.msdn.microsoft.com/orleans/.

Well, I'm still working through it, but once I have a solution I'll see if I can write up something worthwhile, perhaps with sample code. Thanks!
