Azure-functions-durable-extension: Question: Using Durable Function for Streaming job

Created on 25 Apr 2019 · 10 Comments · Source: Azure/azure-functions-durable-extension

We need to ingest a large number of messages coming from vehicles and find behavior patterns. Each message contains the vehicle identifier and values indicating position, speed and direction.

The objective is to detect given trajectories in the stream of messages. We need to detect when each vehicle has gone through a series of positions, with given speeds and directions.

We thought of the following solution:

  1. Start an eternal orchestrator function per vehicle.
  2. A client function ingests the messages from EventHub and raises an event to the corresponding orchestrator function.
  3. Each orchestrator function invokes an activity function that extracts the values, integrates the trajectory, and verifies whether it matches the expected behavior.
  4. If the expected behavior is found, send a notification to a system.
  5. If not, return the current state to the orchestrator, which can save it until the next message arrives.

Is this the right way to solve the problem with Durable Functions? Can this approach scale to a very large number of vehicles (~50,000), each sending 1 message a second?

Thanks!

question

All 10 comments

@cgillum and @sebastianburckhardt: This sounds like a job for entities.

@philemon33 I'm by no means an authority on this, but I do have some experience of running DF orchestrations in a high-throughput scenario...


50,000 events a second is a hell of a lot of events to be processing in DF in its own right. But before you go further, you might want to estimate the financial cost of ingesting 50,000 events a second into Event Hub. If I remember correctly, EH can ingest a maximum of 1,000 events a second per Throughput Unit, so you're going to be paying for 50 TUs, which isn't cheap if you're operating at this scale more often than not. Apologies if you've already considered this!

But anyway, assuming that's all fine. I have a few questions...

  • What is your typical acceptable latency between the telemetry being pushed from the IoT device and being processed by the orchestration?
  • Can you afford a backlog of events during times of peak load, and if so, what would your peak latency be?

I'll assume that your objective is to 'react' within 5 minutes of an event coming from the source, and that you can tolerate some backlog during peak.

If possible, you could aggregate the telemetry using something like Stream Analytics and window functions. The ASA job would output one Storage Queue message or Service Bus message for each of your active vehicles, every 4 minutes. You could even look at utilising the geospatial functions in ASA to do some of the heavy lifting - very cool! Anyway, this would drastically reduce the number of events coming into your Durable Function.

In your Durable Function App, you would receive those queue messages and raise them as Durable Function events into your awaiting orchestrations.

Now you can process the aggregation rather than each individual event. Obviously, this means losing some resolution in your data, but if that is an acceptable trade-off you will reduce the necessary throughput from 50,000 events a second to ~208 a second (50,000 vehicles / 240 seconds) if my maths is right, which is a massive saving.
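The ~208 figure can be checked with a few lines of plain C#. This is only a sketch of the arithmetic: the class and method names are made up for illustration, and it assumes every one of the 50,000 vehicles emits exactly one aggregated message per 4-minute window.

```csharp
using System;

public static class ThroughputEstimate
{
    // Events per second reaching the orchestrations when each vehicle
    // contributes one (aggregated) message per window.
    public static double EventsPerSecond(int vehicles, TimeSpan window)
        => vehicles / window.TotalSeconds;

    public static void Main()
    {
        // Raw stream: one message per vehicle per second.
        double raw = EventsPerSecond(50_000, TimeSpan.FromSeconds(1));

        // Aggregated stream: one message per vehicle per 4-minute window.
        double windowed = EventsPerSecond(50_000, TimeSpan.FromMinutes(4));

        Console.WriteLine($"raw: {raw:F0}/s, 4-minute windows: {windowed:F1}/s");
    }
}
```

Shorter windows give better resolution at the cost of more events, so the window length is the knob to tune against the latency you can accept.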

At 208 events a second, I think you have a chance, but it all depends on what the business logic is doing inside your orchestration.

You could tweak the partition setting to get a greater throughput across control-queues (up to 16 max) if you're getting close to your desired throughput. Do some testing here and see if this helps.

If this doesn't work then you might want to look into partitioning your work across many independent Durable Function Apps, but this really is a nuclear option in many ways.


If you can't batch your incoming events and demand realtime processing of those events, well, this is a huge huge scenario, and my experience runs out at this point :)

I'd be amazed if you could achieve that level of throughput when you factor in the run-time Azure Storage churn.

To combat storage churn, you're going to want to use extended sessions to keep your 50,000 orchestrations loaded in memory, so that you have a chance of reacting at 1-second intervals.

Given that orchestrations can only be spread across a maximum of 16 hosts, that means 3,125 concurrent orchestrations per host. I feel this would definitely require some testing, as the default is capped at just 10 max concurrent orchestrations per host.
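The knobs mentioned so far (partition count, extended sessions, orchestrator concurrency) all live in host.json. The fragment below is a hedged sketch assuming the Durable Functions 2.x schema, where partitionCount sits under storageProvider; in the 1.x extension it sits directly under durableTask. The values are illustrative starting points for testing, not recommendations.

```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "partitionCount": 16
      },
      "extendedSessionsEnabled": true,
      "extendedSessionIdleTimeoutInSeconds": 30,
      "maxConcurrentOrchestratorFunctions": 3125
    }
  }
}
```

Note that partitionCount cannot be changed on an existing task hub, so it is worth settling on a value before going to production. Whether a host can actually hold 3,125 orchestrations in memory is exactly the thing that needs load testing.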

Feels like it's starting to get complex at this point. Alternatively, I'd start looking at a Service Fabric native cluster on some really beefy VMs with an Actor per vehicle, or something like that...

Hope this all helps.

@olitomlinson,

To your questions:
1) Most messages won't trigger the pattern detection, so there is no real pressure on latency. However, when a pattern is detected, the notification needs to happen in real time (let's say under 2s), so batching is not an option.
2) 50,000 is the very highest peak; operations will usually run with 3,000 vehicles. But traffic is expected to grow exponentially, so we will soon reach 10,000 vehicles, and within 3 years we will hit 40,000 or 50,000. Given the real-time nature of the requirement as described above, the peak is the standard to align on, hence my initial figure of 50,000.

Cost is not an issue; we are currently consuming over 100 TUs. But from what you say, it appears DF won't scale anywhere near those numbers. I was thinking (wrongly) that with the right EH partitioning (maybe 50 partitions delivering 1,000 messages per second each) and enough control queues (16), it would be sufficient. But I guess I overestimated that by a lot.

@philemon33

So even at just 3,000 events a second across 16 control queues, that's 188 events a second per host. There are some old benchmarks in the docs somewhere stating that a single-core host could handle approx 50 events a second with extended sessions enabled. So your mileage may vary!
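To put the per-host numbers side by side, here is a small sketch of the same division for the three fleet sizes mentioned in this thread. It assumes the best case of one host per control queue (16 hosts) and takes the ~50 events a second benchmark as quoted from memory above, so treat the ratios as rough orientation only. The names are hypothetical.

```csharp
using System;

public static class PerHostLoad
{
    // Events per second each host must absorb when the load is spread
    // evenly over the control-queue partitions (one host per partition).
    public static double EventsPerHost(double eventsPerSecond, int partitions)
        => eventsPerSecond / partitions;

    public static void Main()
    {
        const int partitions = 16;          // maximum partition count
        const double benchmarkPerHost = 50; // figure recalled in the thread, not verified

        foreach (var total in new[] { 3_000, 10_000, 50_000 })
        {
            double perHost = EventsPerHost(total, partitions);
            double ratio = perHost / benchmarkPerHost;
            Console.WriteLine($"{total}/s total: {perHost:F1}/s per host, {ratio:F1}x the benchmark");
        }
    }
}
```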

Off-topic, but how are you going to offer near-real-time service levels when you are dealing with IoT in the field? Flaky 4G signal, vehicles going through tunnels etc. are going to impact your ability to offer notifications with a 2s latency. I'm just curious about your use case, as I work in telco, so defining and honouring SLAs/SLOs is a large part of my life, and building real-time systems at scale can be a super costly & challenging endeavour! If you can get the business to agree to less real-time expectations, it will give you greater flexibility in your technology approach.

@kashimiz, what are entities?

@olitomlinson

Just read this article and was wondering if there is any reason Durable Functions cannot get to that level of performance?

Thanks!

@philemon33 I don't think you will have a problem ingesting the events _into_ the Azure Function itself from an Event Hubs -> Azure Function perspective.

The challenge is determining the throughput of raising events _inside_ Durable Functions, i.e. awaiting them in the orchestrator with WaitForExternalEvent and raising them from the client with RaiseEventAsync.

I visualise your code looking something like this?

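using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

[FunctionName("Vehicle_Telemetry_Loop")]
public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext context)
{
    // One eternal orchestration per vehicle; the instance ID doubles as the vehicle ID.
    var vehicleId = context.InstanceId;
    var state = context.GetInput<VehicleState>(); // synchronous, nothing to await

    // "event" is a reserved word in C#, so the payload is named "telemetry".
    // VehicleTelemetry is a placeholder type for the message payload.
    var telemetry = await context.WaitForExternalEvent<VehicleTelemetry>($"vehicle_telemetry_event_{vehicleId}");
    var result = ProcessVehicleTelemetry(state, telemetry);

    if (result.SomeConditionWasMet)
    {
        // Function name first, then the input.
        await context.CallActivityAsync("NotifyStakeHolders", result);
    }

    // ContinueAsNew returns void; it restarts the orchestration with the new state as input.
    context.ContinueAsNew(result.newState);
}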

You need to do some testing to figure out whether your Durable Function is capable of handling 3,000 events a second. If it can't, the control queues are going to build up, and you will not be able to maintain your 2-second SLOs as the orchestrations fall behind.

3,000 / 16 hosts is still 188 events a second per host. My gut is telling me this is quite the challenge, due to how DF uses Storage Queues for inter-communication of events and for orchestration re-entrancy.

Extended sessions would be something to look at if you can't get the throughput you require from the default setup of DF, but this brings other challenges, such as memory pressure.

Once you've found out how many vehicles you can reliably run on a single Durable Function App (as I mentioned in my previous comment), you _could_ partition your vehicles into completely independent, separately deployable Function Apps, as long as you are okay with the management overhead of course.

@cgillum am I a million miles off with my analysis here?

@olitomlinson

This makes sense. For my understanding, is the state of an orchestration function shared across hosts?

@philemon33

In principle, no, but it can move to other hosts.

Say you set a partitionCount of 16 and your app isn't doing much traffic: 1 host will be provisioned, and that host will acquire a lease on all 16 partitions. As your app receives more traffic and the host scales out, the leases are eventually renewed and rebalanced among the hosts. The best-case scenario is that each host leases just one of the 16 partitions.

The InstanceId that you assign to your orchestration is _hashed_, and the framework then deterministically places all events for that InstanceId into 1 of the 16 control queues. Each control queue is leased by a single host, so you can guarantee that the event will be picked up by the correct host (where your orchestration is running).
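As a rough illustration of that placement, the sketch below hashes an instance ID and takes the result modulo the partition count. The exact hash function the framework uses is an implementation detail; 32-bit FNV-1a is used here purely as an assumption for illustration, and the class and method names are hypothetical.

```csharp
using System;
using System.Text;

public static class PartitionPlacement
{
    // 32-bit FNV-1a over the UTF-8 bytes of the value.
    public static uint Fnv1a(string value)
    {
        unchecked
        {
            uint hash = 2166136261;          // FNV offset basis
            foreach (byte b in Encoding.UTF8.GetBytes(value))
            {
                hash ^= b;
                hash *= 16777619;            // FNV prime, wraps on overflow
            }
            return hash;
        }
    }

    // The same instance ID always maps to the same control queue index.
    public static int ControlQueueIndex(string instanceId, int partitionCount)
        => (int)(Fnv1a(instanceId) % (uint)partitionCount);

    public static void Main()
    {
        var vehicleId = "1f3558f8-71fc-4657-bf5c-bc4ee2661a30";
        Console.WriteLine(ControlQueueIndex(vehicleId, 16));
        Console.WriteLine(ControlQueueIndex(vehicleId, 16)); // same index again
    }
}
```

Because placement depends only on the ID and the partition count, every message for one vehicle lands on the same queue, which is what lets a single host own that vehicle's orchestration at any given time.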


Thinking about your situation some more... you might have to do some testing with continuously raising events with the same event name. For example, if you did use code like:

var telemetry = await context.WaitForExternalEvent<VehicleTelemetry>($"vehicle_telemetry_event_{vehicleId}");

This would be expecting an event name like vehicle_telemetry_event_1f3558f8-71fc-4657-bf5c-bc4ee2661a30

Given a scenario where _many_ events were backlogged for a particular vehicleId, I'm not sure whether the runtime would maintain a queue of all the events and dequeue them one by one as the eternal orchestration loops, or whether it would process 1 event, recognise that all the other queued events have the same event name, and then drop them. Definitely one for @cgillum to clarify.

I would suggest trying durable entities for streaming. When you repeatedly send a signal to entities (signals are effectively one-way messages) it gives you stream-like behavior. For instance,

  • operations delivered to entities are processed in batches.
  • after batches are processed, they are cleared from storage (so you don't have ever-growing histories)
  • operations sent from orchestrations to entities, or from entities to entities, are processed in order and effectively-once.
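For a feel of what that could look like, here is a minimal sketch of one entity per vehicle, assuming the function-based entity API of the Durable Functions 2.x extension. The Telemetry and VehicleState shapes, the speed-threshold "pattern", and the "NotifyStakeHoldersOrchestration" orchestrator name are all hypothetical placeholders; the real trajectory matching would replace Advance.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

// Hypothetical shapes; the thread does not define the message or the state.
public class Telemetry
{
    public string VehicleId { get; set; }
    public double Speed { get; set; }
}

public class VehicleState
{
    public int MatchedSteps { get; set; }
}

public static class VehicleEntity
{
    // Placeholder for the real trajectory matching: counts consecutive
    // readings above a speed threshold and reports a match at three.
    public static bool Advance(VehicleState state, Telemetry reading)
    {
        state.MatchedSteps = reading.Speed > 100 ? state.MatchedSteps + 1 : 0;
        return state.MatchedSteps >= 3;
    }

    // One entity per vehicle; the entity key is the vehicle identifier.
    [FunctionName("Vehicle")]
    public static void Run([EntityTrigger] IDurableEntityContext ctx)
    {
        if (ctx.OperationName != "telemetry") return;

        var state = ctx.GetState(() => new VehicleState());
        var reading = ctx.GetInput<Telemetry>();

        if (Advance(state, reading))
        {
            // Hypothetical orchestrator that sends the notification.
            ctx.StartNewOrchestration("NotifyStakeHoldersOrchestration", ctx.EntityKey);
            state.MatchedSteps = 0;
        }

        ctx.SetState(state);
    }

    // Called by whatever function ingests the stream: a one-way signal per message.
    public static Task ForwardAsync(IDurableEntityClient client, Telemetry reading)
        => client.SignalEntityAsync(new EntityId("Vehicle", reading.VehicleId), "telemetry", reading);
}
```

The entity keeps only the current state rather than a growing history, so there is no need for the ContinueAsNew loop that the orchestrator version relies on. Whether this reaches the throughput discussed above still needs to be measured.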