Hi, is it possible and recommended to start an orchestrator from another one (fire and forget, using DurableOrchestrationClient StartNewAsync)? I would like to chain them without making one a sub-orchestration of the other.
My use case involves two orchestrators: the first one fans out activities, and I want to start the second one when all of these activities have finished.
Thanks!
Yes, you can definitely do that. Here is a very simple example of a generic activity function that can start any new orchestration instance and return the instance ID:
```c#
[FunctionName("StartNewOrchestrator")]
public static async Task<string> StartNewOrchestrator(
    [ActivityTrigger] StartOrchestrationArgs args,
    [OrchestrationClient] DurableOrchestrationClient starter,
    TraceWriter log)
{
    string instanceId = await starter.StartNewAsync(args.FunctionName, args.Input);
    log.Info($"Started orchestration with ID = '{instanceId}'.");
    return instanceId;
}
```
Note that StartOrchestrationArgs is a built-in type that you can use if you want (or not).
Isn't it easier to directly call
```c#
string instanceId = await starter.StartNewAsync(args.FunctionName, args.Input);
log.Info($"Started orchestration with ID = '{instanceId}'.");
```
in the orchestrator?
That's not allowed. It violates this important rule of orchestrator functions:
Orchestrator code must never initiate any async operation except by using the DurableOrchestrationContext API.
More info here: https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay
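To make the rule concrete, here is a minimal sketch of the allowed shape: the orchestrator only talks to the context, and the client call happens inside the activity shown earlier. The names FirstOrchestrator, Foo and SecondOrchestrator are placeholders, and the object initializer assumes that the FunctionName and Input properties of StartOrchestrationArgs are settable.

```c#
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class FirstOrchestratorSketch
{
    [FunctionName("FirstOrchestrator")] // placeholder name
    public static async Task Run(
        [OrchestrationTrigger] DurableOrchestrationContext ctx)
    {
        // fan out the activities and wait for all of them
        var tasks = new List<Task>();
        for (int i = 0; i < 10; i++)
        {
            tasks.Add(ctx.CallActivityAsync("Foo", null));
        }
        await Task.WhenAll(tasks);

        // Assumption: FunctionName and Input can be set through an initializer.
        var args = new StartOrchestrationArgs
        {
            FunctionName = "SecondOrchestrator", // placeholder name
            Input = null,
        };

        // The only async work the orchestrator initiates goes through ctx;
        // the activity is the one that calls StartNewAsync on the client.
        string instanceId =
            await ctx.CallActivityAsync<string>("StartNewOrchestrator", args);
    }
}
```

The first orchestrator finishes as soon as the activity returns the new instance ID, so the second one runs independently of it.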
OK, I thought only calls to non-durable-function APIs were forbidden. It would be nice to be able to fire-and-forget orchestrators from another orchestrator without adding an extra activity, for the case where you want to chain them.
Like CallSubOrchestrationAsync, we could have something like a callNextOrchestrator in the context API.
That's something we can look into. However, is there another way we could achieve your scenario?
For example, why wouldn't the following work for the scenario you describe?
```c#
public static async Task ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext ctx)
{
    // fan out the activities
    var tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        tasks.Add(ctx.CallActivityAsync("Foo", null));
    }

    // wait for all of them, then run the second orchestration
    await Task.WhenAll(tasks);
    await ctx.CallSubOrchestrationAsync("ChildOrchestration", null);
}
```
Is there a reason you need it to be "fire-and-forget"?
I was thinking of "fire-and-forget" because the second orchestrator is not logically a child of the first one. In the end, for other reasons, I decided to have a "meta-orchestrator" call these two orchestrators in sequence. Don't bother with it, as everything is fully functional without the "fire-and-forget" behaviour anyway.
There's one last part of my code where I want to use durable functions but don't know how. I've got a regular function that extracts files from archives and puts them in a queue for other functions to process. This happens daily, and I would like the "meta-orchestrator" mentioned above to start only when all archives have been extracted AND all files processed. At the moment I'm just starting them a few hours apart, hoping they don't overlap.
Would something like this work for your scenario?
```c#
public static async Task ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    // get the list of archives to process
    string[] archiveFiles = await context.CallActivityAsync<string[]>("GetArchiveFiles", null);

    // process all archives in parallel and wait for all to complete
    IEnumerable<Task> tasks = archiveFiles.Select(
        path => context.CallActivityAsync("ExtractAndProcessArchive", path));
    await Task.WhenAll(tasks);

    // start the meta-orchestrator after archive processing has finished
    await context.CallSubOrchestrationAsync("MetaOrchestrator", null);
}
```
I don't think it would work, as each file extracted from an archive needs to be processed by another function. It could work if I kept track of all extracted file names in a table (for example) and had an activity fetch these file names and fan out over them before calling the meta-orchestrator. But then files would no longer be processed as soon as they are extracted (it was nice to have processing run in parallel with extraction).
So the constraint is that extraction must happen in a separate function from processing? That's doable too with a slight modification:
```c#
public static async Task ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    // get the list of archives to process
    string[] archiveFiles = await context.CallActivityAsync<string[]>("GetArchiveFiles", null);

    // extract and then process each archive; the chains run in parallel
    // (assumes ExtractArchive returns a string that ProcessArchive takes as input)
    IEnumerable<Task> tasks = archiveFiles.Select(
        path => context.CallActivityAsync<string>("ExtractArchive", path)
            .ContinueWith(t => context.CallActivityAsync("ProcessArchive", t.Result))
            .Unwrap()); // ContinueWith yields Task<Task>; unwrap so WhenAll also waits for processing
    await Task.WhenAll(tasks);

    // start the meta-orchestrator after archive processing has finished
    await context.CallSubOrchestrationAsync("MetaOrchestrator", null);
}
```
Now extraction and processing are broken up into separate functions, and you still get parallelism between the two. This solution is understandably less obvious because I'm using ContinueWith while most examples use await, but it's a more advanced technique you can use to implement multiple sequential function chains in parallel within a single orchestrator function.
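One detail worth checking with this technique: when the continuation passed to ContinueWith itself returns a task, the call produces a Task<Task>, and Task.WhenAll then only waits for the outer task unless Unwrap is applied. The sketch below shows this with plain .NET tasks. ExtractAsync and ProcessAsync are hypothetical stand-ins for the activities, built on Task.Delay purely for illustration; inside a real orchestrator the tasks should come from the context API instead.

```c#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChainDemo
{
    // Hypothetical stand-in for an "extract" activity: returns the files found in an archive.
    static async Task<string[]> ExtractAsync(string archive)
    {
        await Task.Delay(50);
        return new[] { archive + "/a.xml", archive + "/b.xml" };
    }

    // Hypothetical stand-in for a "process" activity.
    static async Task<string> ProcessAsync(string file)
    {
        await Task.Delay(50);
        return file.ToUpperInvariant();
    }

    public static async Task<List<string>> RunAsync(string[] archives)
    {
        var processed = new List<string>();
        var gate = new object();

        // One extract-then-process chain per archive; the chains run in parallel.
        IEnumerable<Task> chains = archives.Select(archive =>
            ExtractAsync(archive)
                .ContinueWith(async t =>
                {
                    string[] results =
                        await Task.WhenAll(t.Result.Select(f => ProcessAsync(f)));
                    lock (gate) { processed.AddRange(results); }
                })
                .Unwrap()); // Task<Task> -> Task, so WhenAll waits for the inner work too

        await Task.WhenAll(chains);
        return processed;
    }

    public static async Task Main()
    {
        List<string> done = await RunAsync(new[] { "one.gz", "two.gz" });
        Console.WriteLine(done.Count); // prints 4
    }
}
```

Without the Unwrap call, RunAsync could return before any file has been processed, because each outer task completes as soon as the async lambda reaches its first await.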
I understand now that I was not clear enough on one point: the archives can contain up to thousands of small XML files, which is why I process them in a separate function. I understand how that would work for one file per archive, but I don't see how the information about many files could flow through ContinueWith.
This is the code of the extract function
```c#
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage.Queue;
using SharpCompress.Readers;

[FunctionName("ExtractArchive")]
public static async Task Run(
    [QueueTrigger("gz-to-extract", Connection = "AzureWebJobsStorage")] string archiveFullName,
    [Blob("archives/{queueTrigger}", FileAccess.Read, Connection = "AzureWebJobsStorage")] Stream myBlob,
    TraceWriter log)
{
    using (var reader = ReaderFactory.Open(myBlob))
    {
        while (reader.MoveToNextEntry())
        {
            if (!reader.Entry.IsDirectory && Path.GetExtension(reader.Entry.Key) == ".xml")
            {
                // extract file to blob storage
                string newPath = generateBlobPath();
                var blob = StorageService.GetXmlBlobReference(newPath);
                using (var writer = await blob.OpenWriteAsync())
                {
                    reader.WriteEntryTo(writer);
                }
                await blob.SetMetadataAsync();

                // add blobPath to queue for process function
                var queue = StorageService.GetCloudQueueReference("FileToConvertQueue");
                await queue.AddMessageAsync(new CloudQueueMessage(newPath));
            }
        }
    }
}
```
I've got another function that processes the `FileToConvertQueue`.
PS : thanks for your time!
Ah, sorry, I should have figured that out intuitively. I think the ContinueWith path might still work. How about this?
```c#
public static async Task ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    // get the list of archives to process
    string[] archiveFiles = await context.CallActivityAsync<string[]>("GetArchiveFiles", null);

    // process all archives in parallel and wait for all to complete
    IEnumerable<Task> extractThenProcessTasks = archiveFiles.Select(
        path => context.CallActivityAsync<string[]>("ExtractArchive", path)
            .ContinueWith(async t =>
            {
                // The result of extraction is a list of files to process.
                // These are all processed in parallel.
                IEnumerable<Task> processFileTasks =
                    t.Result.Select(file => context.CallActivityAsync("ProcessFile", file));
                await Task.WhenAll(processFileTasks);
            })
            .Unwrap()); // the async continuation yields Task<Task>; unwrap so WhenAll waits for the inner work
    await Task.WhenAll(extractThenProcessTasks);

    // start the meta-orchestrator after archive processing has finished
    await context.CallSubOrchestrationAsync("MetaOrchestrator", null);
}
```
The difference is that I assume ExtractArchive returns a list of the extracted files (instead of adding them to another queue). The orchestrator can then process the extracted files in parallel immediately after the extraction has completed, but without blocking any other extraction tasks.
Let me know if this is closer to what you're looking for. I think this is a great example and I'd love to share it more broadly if we think this is a relatively common pattern. :)
This could work, just three remarks:
Could we imagine an orchestrator being able to process streams? Then we could stream extracted file names while the archive is being extracted and process them without waiting. I'm not sure it could be done, though (given your answer in #79).
I've got no problem with sharing the example. Processing archives is common, but I had trouble finding a suitable cloud-based way to automate the whole thing. I can also say that extracting large or fragmented archives (more than 15 MB with a complex folder tree) can exceed the functions timeout. I only had a few of them to deal with, so I coded a local app to split them into smaller archives.
I don't see a problem with message size. The only data being passed back and forth is file paths, and I assume you don't have more than a few thousand files in a single archive.
Fan-in could cause some bottlenecks, but it will depend on how much work the orchestrator needs to do relative to your activity functions. Using sub-orchestrations would be one way to improve the throughput of my previous solution because then you have multiple threads processing return values concurrently. However, as you mentioned, if you don't need fan-in and don't need to know when the processing is complete, then offloading the processing to a queue would give you the fastest overall throughput.
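The sub-orchestration idea could be sketched roughly as follows, with one child orchestration per archive handling that archive's own fan-out and fan-in. ProcessOneArchive is a hypothetical name, and the sketch assumes, as in the previous example, that ExtractArchive returns the list of extracted file paths.

```c#
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class ArchiveOrchestrations
{
    [FunctionName("ParentOrchestrator")]
    public static async Task Parent(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        string[] archiveFiles =
            await context.CallActivityAsync<string[]>("GetArchiveFiles", null);

        // one sub-orchestration per archive; each does its own fan-in
        IEnumerable<Task> perArchive = archiveFiles.Select(
            path => context.CallSubOrchestrationAsync("ProcessOneArchive", path));
        await Task.WhenAll(perArchive);

        // start the meta-orchestrator after all archives are done
        await context.CallSubOrchestrationAsync("MetaOrchestrator", null);
    }

    [FunctionName("ProcessOneArchive")] // hypothetical child orchestration
    public static async Task ProcessOneArchive(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        string path = context.GetInput<string>();

        // assumption: ExtractArchive returns the extracted file paths
        string[] files =
            await context.CallActivityAsync<string[]>("ExtractArchive", path);

        // process this archive's files in parallel
        await Task.WhenAll(
            files.Select(file => context.CallActivityAsync("ProcessFile", file)));
    }
}
```

Because each child awaits its own extraction before fanning out, this version needs no ContinueWith, and the parent only has to collect one result per archive rather than one per file.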
Orchestrators can certainly process streams using the stateful singleton pattern if by "streams" you mean a series of discrete messages (e.g. names of files). However, I'm not sure whether we could make this scale the way you would want - I'd have to think about this further.
Well, I need to know when processing ends; being able to process files as soon as extraction starts is a bonus, though.