Azure-functions-durable-extension: At-most-once support for function calls

Created on 10 Nov 2020 · 6 Comments · Source: Azure/azure-functions-durable-extension

Is your feature request related to a problem? Please describe.

This is an ask that came in from one of our customers that has built a customer-facing integration platform on top of Durable Functions. Their customers use visual tools to build workflows that integrate with third-party SaaS services. It's very important for these customers that interruptions in storage availability do not result in duplicate executions without explicit permission from the customer.

Today, we advertise that Durable Functions guarantees at-least-once execution. This means transient network or compute failures could result in functions executing more than once. Customers are thus required to make their functions idempotent, which is not always possible or practical. This proposed feature would allow customers to opt out of at-least-once and instead opt into at-most-once execution guarantees.

Describe the solution you'd like

There are two parts to the proposed solution:

  1. A programming model for declaring that a function (activity, orchestrator, or entity) must be executed with at-most-once guarantees. In other words, if we receive a message with DequeueCount > 1, then we discard it and fail the orchestration. This ensures that any function that would have been triggered by this message will never be executed more than once.

  2. An API to rewind an orchestration to its last known healthy state. We actually already have this feature, but it's in preview. We would need to finish making it production-ready.

These two features together would allow an orchestration to fail if a potential duplicate execution is detected. A support engineer (i.e. the app owner) could then investigate the failure and then decide whether to resume the orchestration using the rewind API or to just let it stay terminated if replaying is not safe.
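
As a rough illustration of the first part, the check could look something like the sketch below. This is not the extension's actual dispatch code: `Guarantee`, `Message`, `DuplicateExecutionError` and `dispatch` are all hypothetical names, and the only assumption carried over from the proposal is that a message with DequeueCount > 1 targeting an at-most-once function is discarded and surfaces as a failure.

```python
from dataclasses import dataclass
from enum import Enum


class Guarantee(Enum):
    AT_LEAST_ONCE = "at-least-once"
    AT_MOST_ONCE = "at-most-once"


@dataclass
class Message:
    function_name: str
    dequeue_count: int  # 1 on first delivery, 2 or more on redelivery


class DuplicateExecutionError(Exception):
    """Raised when an at-most-once function might be about to run twice."""


def dispatch(message, policies, run):
    """Run the target function unless its at-most-once policy forbids a redelivery."""
    policy = policies.get(message.function_name, Guarantee.AT_LEAST_ONCE)
    if policy is Guarantee.AT_MOST_ONCE and message.dequeue_count > 1:
        # Discard the message and fail instead of risking a second execution.
        raise DuplicateExecutionError(message.function_name)
    return run(message.function_name)
```

Functions without a declared policy keep today's at-least-once behavior, so a redelivered message still runs them. The failure raised here is the point where a support engineer would step in and decide whether to rewind.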

Describe alternatives you've considered

As an alternative to automatically failing an orchestration if duplicates are detected (or perhaps in addition), we could add a property to the context interfaces that exposes the retry count of the current message. With this property, customer code can make its own decision about how to handle duplicate executions. This could be a simpler and more flexible solution. However, it would still require a way to stop and resume the orchestration via some manual intervention.

Another option to consider would be to introduce a proper suspend/resume feature. Customers have asked for a feature like this before, and it could also help us implement poison message handling. If a duplicate message is detected, instead of failing the orchestration, the orchestration could be "suspended". A new API could then be introduced that resumes a suspended orchestration. This would be overall a cleaner and potentially safer design since failed orchestrations have the downside of permanently discarding any new messages that arrive. However, it would potentially be a much larger work item.

enhancement needs-discussion

All 6 comments

_Yes! Yes! Yes!_ I'm all in for relaxing the programming constraints.

Before I can provide any feedback though, I have a few questions about the points you brought up.

On exposing the current message's retry-count

You mentioned that we could expose the current message's retry-count to the user and let them decide how to handle repeated executions with it.

I'm trying to make sure I understand this the same way you do. So let's say I have a 2-activity orchestrator where I want to exit the program on a network failure. Do you imagine something like the snippet below?

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # assume `context.retry_count` is derived from the current message's dequeue count (0 on first delivery)
    if context.retry_count != 0:  # I'm assuming the current message is to call Hello-Tokyo
        return -1
    yield context.call_activity("Hello", "Tokyo")

    # assume `context.retry_count` is derived from the current message's dequeue count (0 on first delivery)
    if context.retry_count != 0:  # I'm assuming the current message is to call Hello-Seattle
        return -1
    yield context.call_activity("Hello", "Seattle")

main = df.Orchestrator.create(orchestrator_function)

This kind of reminds me of using the is_replaying flag, but I'm not sure if that's deliberate.

On the difference between the suspend-and-resume API and the rewind API

The usage of a rewind API does make sense to me as a way to recover from failures. However, the suspend-and-resume API proposal confuses me because I do not understand how an orchestrator that's gotten into a "bad state" would recover via a temporary pause. Could you elaborate on what the expected usage of suspend-and-resume would look like? Do users have a way of redirecting their messages to a new orchestrator or something like that?

_Thanks!_

The retry count is probably more relevant on activities, to prevent duplicate executions there. Duplicate message detection for orchestrations feels decidedly less useful to me, as by design they are idempotent.

That makes sense.

I suppose that means we'd need to equip activities with an implicit "context" parameter as well, so they can use it to read their own retry count.

At least for C#, the default parameter actually is IDurableActivityContext, so that shouldn't be a problem. I believe JS already has an equivalent class as well.

@davidmrdavid Connor is correct that the retry count property is mainly intended for activity triggered functions, but would also be very useful for entities.

Suspend/resume probably deserves its own issue with a detailed description, but consider the following scenario:

  1. The orchestration runs and schedules an activity function, but we fail to checkpoint the completion of the activity function due to some failure.
  2. When the message that scheduled the activity function becomes visible again (i.e. after ~5 minutes) the DequeueCount will be set to 2 (this is done automatically by the Azure Storage service).
  3. After we dequeue the message and prepare to execute the activity function, we check the policy for that function to see if it is at-most-once. If it is, we stash the message somewhere safe (blob storage? history table? Details TBD) and put the orchestration in the "Suspended" state.
  4. Any new messages that arrive for a suspended orchestration similarly get stashed away. We continue to do this until something (details on what that "something" is are also TBD) causes the orchestration to be "resumed".
  5. Once resumed, we process all the stashed messages and continue execution as normal.

Some details would still need to be worked out, but it effectively gives us a feature for "pausing" an orchestration and then resuming it if/when the app owner thinks it is safe to do so.
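
To make the five steps above concrete, here is a minimal in-memory sketch of the stash-and-resume behavior. It is only a model of the idea: the `Orchestration` class, its `receive` and `resume` methods, and the in-memory `stash` list are hypothetical stand-ins for whatever storage and trigger the real design would end up using.

```python
from dataclasses import dataclass, field


@dataclass
class Orchestration:
    status: str = "Running"
    stash: list = field(default_factory=list)      # messages held while suspended
    processed: list = field(default_factory=list)  # messages actually handled

    def receive(self, message, dequeue_count=1, at_most_once=False):
        if self.status == "Suspended":
            # Step 4: anything arriving while suspended is stashed too.
            self.stash.append(message)
        elif at_most_once and dequeue_count > 1:
            # Step 3: possible duplicate of an at-most-once function,
            # so stash the message and suspend instead of executing.
            self.stash.append(message)
            self.status = "Suspended"
        else:
            self.processed.append(message)

    def resume(self):
        # Step 5: replay the stashed messages in arrival order.
        self.status = "Running"
        pending, self.stash = self.stash, []
        self.processed.extend(pending)
```

Unlike a failed orchestration, nothing is dropped here: messages that arrive during the suspension are still processed once the app owner resumes.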

@cgillum

If the queue's DequeueCount property were exposed to the activity, I would do something like this:

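// DequeueCount is 1 on the first delivery, so anything greater means a possible duplicate.
if (activityContext.DequeueCount > 1) {
   // HasAlreadyBeenPerformed() is a placeholder: call the 3rd party to see if the operation has already been performed
   bool alreadyPerformed = HasAlreadyBeenPerformed();
   if (alreadyPerformed)
      return;
   PerformAction();
}
else {
   PerformAction();
}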

To abstract the underlying queue delivery count, you could offer an extension point for user code to supply a delegate that decides whether the operation can be retried.

In this delegate I could then choose to:

  • If an API is available, automatically call that 3rd party API to check on the status and decide if a retry is safe to perform.
  • Wait for manual intervention triggered by a human, such as a 'resume' operation on the orchestration. This would be useful if the 3rd party does not expose an API that can be accessed programmatically.

[FunctionName("Orchestration")]
public async Task RunOrchestratorAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var data = context.GetInput<JObject>();
    var guaranteeOptions = new ActivityGuaranteeOptions()
    {
        Guarantee = ActivityGuarantee.AtMostOnce,
        AtMostOnceHandler = "ActivityA-AtMostOnceHandler"
    };

    await context.CallActivityAsync("Activity-A", guaranteeOptions, data);
}


[FunctionName("Activity-A")]
public async Task<FooResult> Run([ActivityTrigger] JObject request)
{
    // do the important operation in 3rd party system/app/datastore...

    return new FooResult{ Foo = "bar" };
}


[FunctionName("ActivityA-AtMostOnceHandler")]
public async Task<OrchestrationHint<FooResult>> Run([ActivityAtMostOnceHandlerTrigger] JObject request)
{
    // Call the 3rd party to see if the operation has already been done, and take this result as the source of truth.
    // (CheckThirdPartyAsync is a placeholder for that call.)
    bool alreadyPerformed = await CheckThirdPartyAsync(request);

    if (alreadyPerformed) {
        // Option 1: put the orchestration into an indefinite suspended state; it can be resumed with the DF HTTP API.
        // Need to be aware of what happens to any DurableTimers when going into suspended mode?!
        //   return OrchestrationHint.SuspendAndKeepExistingDurableTimers;
        //   return OrchestrationHint.SuspendAndInvalidateExistingDurableTimers;

        // Option 2: provide the necessary data back to the orchestration to continue, and don't try the activity again.
        return OrchestrationHint.ContinueWithActivityResult(new FooResult{ Foo = "bar" });
    }

    // Not performed yet: try the activity once more (deliveryCount increments).
    return OrchestrationHint.ContinueOnce;
}
