Azure-functions-durable-extension: [Enhancement] Enhance external event use cases to support classic scenarios

Created on 3 Jul 2018 · 7 comments · Source: Azure/azure-functions-durable-extension

As explained in the warning here: _"If there is no orchestration instance with the specified instance ID or if the instance is not waiting on the specified event name, the event message is discarded."_

Considering that limitation:
1) It becomes impossible to handle orchestrations that receive a variable (unknown) number of events, because a new event could arrive between two WaitForExternalEvent calls.
2) Unless the mechanism that raises the external events guarantees ordering, every expected event must be awaited with WaitForExternalEvent at the very beginning to make sure that none are discarded.

Suggestions:
A) Since events are the way to interact with a running orchestration, I would suggest making the event behavior more flexible. With the current limitations I cannot, for example, notify an orchestration each time a message is received on a queue unless I know up front how many messages will be received.
B) Maybe adding a new concept, such as an Orchestration Queue, would be acceptable?

To show how easy it is to lose an event, consider the following code: only the first event will be processed. I suspect that if the orchestration does not start executing right away, the first event would be lost as well.

Versions:

  • Microsoft.Azure.WebJobs.Extensions.DurableTask: 1.5.0
  • Microsoft.NET.Sdk.Functions: 1.0.14
```csharp
using System.Collections.Generic;
using System.Linq;
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace FunctionAppStartAndEvent
{
    public static class OrchestratorFunction
    {
        // Event loop: handle MyEvent messages one at a time until a "stop" message arrives.
        [FunctionName(nameof(OrchestratorFunction))]
        public static async Task<DateTime> Run([OrchestrationTrigger] DurableOrchestrationContextBase context, ILogger logger)
        {
            var eventCount = 0;
            do
            {
                var newEvent = await context.WaitForExternalEvent<MyEvent>(nameof(MyEvent));
                if (!context.IsReplaying) logger.LogInformation($"** Event {eventCount} received. Message: {newEvent.Message}");
                eventCount++;
                if (newEvent.Message == "stop")
                {
                    break;
                }
                // Events raised while this activity call is pending are the ones that get lost.
                await context.CallActivityAsync<string>(nameof(LogActivityFunction), newEvent.Message);
            } while (true);
            return context.CurrentUtcDateTime;
        }
    }

    public static class LogActivityFunction
    {
        [FunctionName(nameof(LogActivityFunction))]
        public static string Run([ActivityTrigger] string message, ILogger logger)
        {
            logger.LogInformation($"** ACTIVITY: {message}");
            return $"Step {message}!";
        }
    }

    public class MyEvent
    {
        public string Message { get; set; }
    }

    public static class StartWorkflow
    {
        // Starts the orchestration, then immediately raises ten events followed by "stop".
        [FunctionName(nameof(StartWorkflow))]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "StartWorkflow")] HttpRequestMessage req,
            [OrchestrationClient] DurableOrchestrationClient starter,
            ILogger logger)
        {
            var instanceId = $"{DateTime.UtcNow:O}_{Guid.NewGuid():N}";
            logger.LogInformation("** Starting workflow");
            logger.LogInformation($"** Instance Id: {instanceId}");
            await starter.StartNewAsync(nameof(OrchestratorFunction), instanceId, DateTime.UtcNow);

            logger.LogInformation("** Raising Events");
            for (int i = 0; i < 10; i++)
            {
                await starter.RaiseEventAsync(instanceId, nameof(MyEvent), new MyEvent
                {
                    Message = $"EVENT {i}"
                });
            }

            await starter.RaiseEventAsync(instanceId, nameof(MyEvent), new MyEvent
            {
                Message = "stop"
            });

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}
```
Labels: bug, dtfx, enhancement, fix-ready


All 7 comments

If the example above doesn't work, then it's probably a bug. I don't see any reason for those events to be dropped.

I took a look and confirmed that this does NOT work correctly today due to some race conditions in our message processing logic. We definitely want to make this scenario work.

So good news and bad news:

  • Good news: I was able to locally fix some message ordering issues that your test case uncovered. This will help ensure a predictable behavior when an orchestrator starts and needs to process multiple external events at the same time.
  • Bad news: It turns out I misunderstood the specific problem your test case was illustrating (regarding message loss). I hadn't realized this before, but I think it suffers from the same fundamental problem as the old ContinueAsNew race condition with RaiseEventAsync.

I didn't realize what was going on until I fixed the initial message ordering bug. The problematic behavior here is that any pending, unconsumed external events will be discarded as soon as the orchestrator function hits an await. In your case, the activity function call is where all but the first external event is likely to be discarded. This makes logical sense since the orchestrator is unable to take any further action until the activity function call completes.

I assumed that the "solution" would be to remove the activity function call, but it turns out there is another message ordering bug which is a little harder to fix because Azure Storage records the exact same insertion time for messages which arrive at around the same time. I think some sort of client-side sequence numbers will be needed to fix that.
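One way to picture the client-side sequence numbers mentioned above: the sender stamps every event with a strictly increasing counter, and the receiver orders by enqueue time first and by sequence number second, so messages that Azure Storage recorded with an identical insertion time still come out in send order. This is a minimal, hypothetical sketch in plain C#, assuming a single sender; `SequencedEvent`, `EventSender` and `EventOrdering` are illustrative names, not part of the Durable Functions API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical event envelope: the storage timestamp alone cannot order events that
// arrive at nearly the same moment, so a sender-assigned sequence number is added.
public sealed record SequencedEvent(string Name, string Payload, DateTime EnqueuedUtc, long Sequence);

// Hypothetical sender that assigns a strictly increasing sequence number (single sender assumed).
public sealed class EventSender
{
    private long _next;

    public SequencedEvent Create(string name, string payload, DateTime enqueuedUtc) =>
        new SequencedEvent(name, payload, enqueuedUtc, Interlocked.Increment(ref _next));
}

public static class EventOrdering
{
    // Order by timestamp, then break ties (identical insertion times) by sequence number.
    public static IReadOnlyList<SequencedEvent> Order(IEnumerable<SequencedEvent> received) =>
        received.OrderBy(e => e.EnqueuedUtc).ThenBy(e => e.Sequence).ToList();
}
```

Sequence numbers from different senders are not comparable, so a real fix would also need to decide how to merge events from multiple clients.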

In summary, I can make a partial fix that can help deal with the ordering problem. The problem of dropped events because of an awaited orchestrator function, however, will be more involved.

@cgillum, any chance this gets prioritized? Currently, events can be dropped, so they cannot be used reliably to feed a workflow, which is a pain for anything more complex than the samples. We really need a reliable way to communicate with the workflow; otherwise we will be stuck with simple, standalone workflows. It makes it impossible to build even a simple state machine that reliably waits on external events. This could probably be flagged as a bug instead of an enhancement, since people will certainly fall into that trap.

Consider the following example: commands (start, suspend, stop, insert) are received through Event Grid and submitted to the workflow for processing.

Currently, that scenario would force us to store incoming commands in separate storage, raise an event to notify the workflow that a new command is available, and still poll that storage on a timer at regular intervals, because some events can be lost depending on timing.

I still think that something similar to an Orchestration Queue concept would help with that limitation.

The more I think about this, the more I think we will need to introduce a new way of handling external events. The problem with the current design is that it requires an orchestrator to await the external event at the time it arrives. Otherwise we don't have any way to deliver the message.

Another model we could consider is a callback model. For example, instead of WaitForExternalEventAsync, we could have an OnExternalEvent method that takes a synchronous callback parameter. This would allow an event to be handled at any time, not just when the orchestrator logic is explicitly blocking and waiting for it. It would be more complicated than the current model, but it might solve some of these fundamental issues.
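To make the callback idea concrete, here is a minimal sketch in plain C#. `CallbackEventHub`, `OnExternalEvent` and `Deliver` are hypothetical names (no such method exists in the extension), and payloads are plain strings for brevity. The point it illustrates is that a handler registered once receives every event as it arrives, instead of only the events that happen to arrive while the orchestrator is awaiting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the proposed callback style: handlers are registered once,
// and every incoming event is dispatched to its handler immediately, regardless of
// what the orchestrator logic happens to be awaiting at that moment.
public sealed class CallbackEventHub
{
    private readonly Dictionary<string, Action<string>> _handlers = new();

    // Illustrative stand-in for the proposed OnExternalEvent method (not a real API).
    public void OnExternalEvent(string name, Action<string> handler) => _handlers[name] = handler;

    // Returns false when no handler is registered for the name, i.e. the event would still be lost.
    public bool Deliver(string name, string payload)
    {
        if (!_handlers.TryGetValue(name, out var handler))
        {
            return false;
        }
        handler(payload);
        return true;
    }
}
```

The trade-off is that the handler runs outside the linear async flow of the orchestrator, so any state it mutates has to be reconciled with the main logic, which is presumably part of why this model would be more complicated.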

Thoughts?

I thought about this further and think there is actually a simple solution to the discarded event problem that doesn't require any programming model changes.

When an orchestration instance receives an event that it cannot process, it can simply buffer it in memory. If the orchestrator code later calls WaitForExternalEventAsync<T>() with a matching event name, that buffered event can be used to complete the async task immediately. This allows events to be received and saved at any time, not just while the orchestrator is in the act of awaiting the external event. This should also enable the event loop pattern mentioned above.

I think this could be done as a simple change to WaitForExternalEventAsync and RaiseEvent.
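The buffering idea can be sketched as follows, assuming a single-threaded orchestrator (so no locking) and string payloads. `ExternalEventBuffer`, `Raise` and `WaitFor` are hypothetical names standing in for the `RaiseEvent` and `WaitForExternalEventAsync<T>()` sides mentioned in the comment; this illustrates the technique and is not the extension's actual implementation.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical per-instance event buffer: events that arrive while nobody is waiting
// are queued instead of discarded, and a later wait for the same event name completes
// immediately from the queue (FIFO per event name). Not thread-safe by design.
public sealed class ExternalEventBuffer
{
    private readonly Dictionary<string, Queue<string>> _buffered = new();
    private readonly Dictionary<string, Queue<TaskCompletionSource<string>>> _waiters = new();

    // Stand-in for the RaiseEvent side: hand the event to a waiter if one exists, otherwise buffer it.
    public void Raise(string name, string payload)
    {
        if (_waiters.TryGetValue(name, out var waiters) && waiters.Count > 0)
        {
            waiters.Dequeue().SetResult(payload);
            return;
        }
        if (!_buffered.TryGetValue(name, out var queue))
        {
            queue = new Queue<string>();
            _buffered[name] = queue;
        }
        queue.Enqueue(payload);
    }

    // Stand-in for the wait side: consume a buffered event if available, otherwise register a waiter.
    public Task<string> WaitFor(string name)
    {
        if (_buffered.TryGetValue(name, out var queue) && queue.Count > 0)
        {
            return Task.FromResult(queue.Dequeue());
        }
        if (!_waiters.TryGetValue(name, out var waiters))
        {
            waiters = new Queue<TaskCompletionSource<string>>();
            _waiters[name] = waiters;
        }
        var tcs = new TaskCompletionSource<string>();
        waiters.Enqueue(tcs);
        return tcs.Task;
    }
}
```

Because events are consumed in arrival order per event name, a loop like the one in the original repro could drain every event, including those raised while an activity call was in flight, as long as the events themselves arrive in order.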

Resolved with v1.8.0 release.
