Azure-sdk-for-python: [Event Hubs] event handler callback supports on_event_batch (list of events)

Created on 17 Dec 2019  ·  9 Comments  ·  Source: Azure/azure-sdk-for-python

Is your feature request related to a problem? Please describe.

Whether the event handler should receive a batch of events or a single event has been widely discussed.
Currently, in the 5.0.0 GA release, the JS event handler accepts a list of events, while the other three languages use a single event.
In Dec 2019, Elena reached out to 11 customers who had used the Event Hubs preview. Receiving events in batches was one of their main wishes. (Elena's email was sent on 12/16/2019 with the title "Event Hubs SDK - customer outreach".)

Describe the solution you'd like

Currently Python has EventHubConsumerClient.receive(on_event, **kwargs).
The new feature adds a new method, receive_batch(on_event_batch, **kwargs), to EventHubConsumerClient. It calls the user callback as on_event_batch(partition_context, event_batch).
For flow control, **kwargs will carry three parameters: prefetch (the link credit size), max_batch_size and max_wait_time.
max_batch_size is the expected size of the list of events delivered within max_wait_time.
max_wait_time is the longest wait between two calls to the user callback on_event_batch. receive_batch accumulates up to max_batch_size events and calls on_event_batch with that list once the batch is full or max_wait_time has elapsed; the list may be empty.

A max_wait_time of 0 means do not wait: call the callback immediately, after at most one fetch from the event hub. If the number of prefetched events is larger than max_batch_size, no fetch is needed. Otherwise, fetch only once from the event hub and call the callback with up to max_batch_size events.

Usually on_event_batch receives a list of max_batch_size events, but it may receive an empty list or a list with fewer than max_batch_size events.
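The accumulation rule described above can be illustrated with a small, self-contained sketch. It is not the SDK's implementation: an asyncio.Queue stands in for the prefetch buffer, and receive_batch_sketch, its rounds parameter and the single-argument callback are hypothetical names chosen for the illustration. The "at most one fetch from the event hub" rule for max_wait_time=0 is not modelled; with a zero wait the sketch simply drains whatever is already buffered.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List


async def receive_batch_sketch(
    queue: "asyncio.Queue[Any]",
    on_event_batch: Callable[[List[Any]], Awaitable[None]],
    max_batch_size: int,
    max_wait_time: float,
    rounds: int,
) -> None:
    """Call on_event_batch `rounds` times, each time with at most max_batch_size events."""
    for _ in range(rounds):
        batch: List[Any] = []
        deadline = time.monotonic() + max_wait_time
        while len(batch) < max_batch_size:
            remaining = deadline - time.monotonic()
            try:
                if remaining <= 0:
                    # Time is up (or max_wait_time is 0): take only buffered events.
                    batch.append(queue.get_nowait())
                else:
                    # Wait for the next event, but never past the deadline.
                    batch.append(await asyncio.wait_for(queue.get(), remaining))
            except (asyncio.QueueEmpty, asyncio.TimeoutError):
                break
        await on_event_batch(batch)  # the batch may be short or empty


async def demo() -> List[List[int]]:
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)  # five events are already "prefetched"
    received: List[List[int]] = []

    async def on_event_batch(events: List[int]) -> None:
        received.append(list(events))

    await receive_batch_sketch(
        queue, on_event_batch, max_batch_size=3, max_wait_time=0.05, rounds=3
    )
    return received


batches = asyncio.run(demo())
print(batches)
```

In this run the first call gets a full batch, the second gets the two remaining events after the wait expires, and the third gets an empty list, which are the three cases the description distinguishes.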

A user can call both receive and receive_batch from one EventHubConsumerClient.

The API would look like this:

class EventHubConsumerClient(ClientBase):

    ...

    async def receive_batch(
        self,
        on_event_batch: Callable[["PartitionContext", List["EventData"]], Awaitable[None]],
        max_batch_size: int, 
        max_wait_time: float,
        *,
        partition_id: Optional[str] = None,
        owner_level: Optional[int] = None,
        prefetch: int = 300,
        track_last_enqueued_event_properties: bool = False,
        starting_position: Optional[
            Union[str, int, datetime.datetime, Dict[str, Any]]
        ] = None,
        starting_position_inclusive: Union[bool, Dict[str, bool]] = False,
        on_error: Optional[
            Callable[["PartitionContext", Exception], Awaitable[None]]
        ] = None,
        on_partition_initialize: Optional[
            Callable[["PartitionContext"], Awaitable[None]]
        ] = None,
        on_partition_close: Optional[
            Callable[["PartitionContext", "CloseReason"], Awaitable[None]]
        ] = None
    ) -> None:
        ...

Sample code

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

# CONNECTION_STR, CONSUMER_GROUP and EVENTHUB_NAME are placeholders to be defined by the user

# define the callback
async def on_event_batch(partition_context, event_batch):
    if len(event_batch) > 0:
        pass  # do something
    else:  # event_batch size is 0
        pass  # do something else

async def main():
    # create the consumer client
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
    )
    # call the receive_batch
    async with consumer_client:
        await consumer_client.receive_batch(on_event_batch, prefetch=300, max_batch_size=100, max_wait_time=3)

asyncio.run(main())

Describe alternatives you've considered

  1. Give EventHubConsumerClient.receive(on_event, **kwargs) a "batch" mode, so that on_event can accept either a single event or a batch. But this adds complexity for users, since any one user will use either receive_batch or receive. Plus, max_batch_size would be meaningless for a single-event callback.

Additional context
None

Client Event Hubs

All 9 comments

Will performance be markedly different between the two methods? Or is this just for convenience?

Any concern that we might be overloading a term (batch) we're using in other places to refer to an actual class (EventDataBatch)? Or is the type of arg to on_event_batch actually a class, not just an array or list?

Will performance be markedly different between the two methods? Or is this just for convenience?

Not much difference in performance; there could be a slight improvement. It is mainly for convenience.

Any concern that we might be overloading a term (batch) we're using in other places to refer to an actual class (EventDataBatch)? Or is the type of arg to on_event_batch actually a class, not just an array or list?

This is a good point. The current design is a list of EventData instead of a different class.

Sorry to intrude here - I was just looking to see whether this feature existed.

Will performance be markedly different between the two methods? Or is this just for convenience?

Unless I'm missing something, performance would improve for users of the library. For instance, if I'm saving each event to a database, it would be faster for me to save them as a batch instead of individually, without having to do any acrobatics.

Plus, updating the partition state for every event is "unwise", according to the Microsoft Azure Event Hubs C# documentation, and that's pretty much what we're being encouraged to do when processing events individually.

It is critical to consider checkpointing in context - __it would be unwise to checkpoint every message. The storage account used for checkpointing probably would not handle this load__, but more importantly checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus queue might be a better option than an event hub.

Thanks for working on this feature!

Sorry to intrude here - I was just looking to see whether this feature existed.

Not intruding at all - this is why we do our discussions in public :)

Will performance be markedly different between the two methods? Or is this just for convenience?

Unless I'm missing something, performance would improve for users of the library. For instance, if I'm saving each event to a database, it would be faster for me to save them as a batch instead of individually, without having to do any acrobatics.

You are 100% correct and that's a _strong_ argument in favor of this particular feature. The Azure JavaScript SDK (which I work on) already does this (delivers messages in batches) so I'm actually a strong proponent in favor. :)

Plus, updating the partition state for every event is "unwise", according to the Microsoft Azure Event Hubs C# documentation, and that's pretty much what we're being encouraged to do when processing events individually.

Yes! Now the questions for you are:

  • If this was merely a convenience wrapper around doing a list.append() on your behalf would it still be useful?
  • Since you were looking for this method what were you seeking beyond the typical "message at a time" model that we're currently offering?

Hey Richard - thanks a lot for the response!

If this was merely a convenience wrapper around doing a list.append() on your behalf would it still be useful?

Yeah, I think it would be. Without that sort of guidance it's not immediately clear what sort of structure my program should have to get that behavior. For instance - you mention a list.append(). Does this mean that the event receiver is intended to be wrapped up in a class? Would we have to keep track of the most recent event per partition? Seems like something that the library is intended to abstract away.

Something like this is what comes to mind - but I think it comes with a lot of problems.

class EventReceiver:

    def __init__(self):
        self.database_connection = None
        self.events = []
        self.last_partition_context = {}
        self.max_batch_size = 10

    def event_receive_callback(self, partition_context: PartitionContext, event):
        self.events.append(event)
        self.last_partition_context[partition_context.partition_id] = partition_context

        if len(self.events) % self.max_batch_size == 0:
            # commit events to database
            # update partition and then clear events for that partition??
            # what happens if there's a lag between max batch sizes?
            pass

    def receive(self):
        consumer = EventHubConsumerClient.from_connection_string(
            connection_str, consumer_group=consumer_group, checkpoint_store=checkpoint_store)

        with consumer:
            consumer.receive(on_event=self.event_receive_callback)

Whereas something like a consumer.receive_batch(on_events=batch_callback) makes a lot more sense (to me) and doesn't require a class in order to work. Not that I'm against classes - something like the C# implementation would be fine - it's just not how the current implementation seems to be set up.

def batch_callback(partition_context, events):
    for event in events:
        pass  # process event

    # commit processed events
    partition_context.update_checkpoint(events[-1])

consumer.receive_batch(on_events=batch_callback)

The events[-1] doesn't feel too great, so maybe another param in the batch_callback function would be nice.

Since you were looking for this method what were you seeking beyond the typical "message at a time" model that we're currently offering?

I'm looking to not overwhelm the checkpoint store, and to have the option to batch database calls in a way that feels like I'm not reinventing the purpose of the library.
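The two goals in this comment (one database call and one checkpoint per batch) can be sketched without the SDK. Everything below is a stand-in for illustration: FakePartitionContext, save_many and saved_batches are hypothetical names, and only update_checkpoint(event) mirrors a method that the real PartitionContext provides. The empty-batch guard assumes the callback may be invoked with no events when max_wait_time elapses.

```python
from typing import Any, List


class FakePartitionContext:
    """Hypothetical stand-in for PartitionContext; records checkpoint calls."""

    def __init__(self, partition_id: str) -> None:
        self.partition_id = partition_id
        self.checkpoints: List[Any] = []

    def update_checkpoint(self, event: Any) -> None:
        self.checkpoints.append(event)


saved_batches: List[List[Any]] = []  # stand-in for a database table


def save_many(events: List[Any]) -> None:
    """Hypothetical bulk write: one call per batch instead of one per event."""
    saved_batches.append(list(events))


def batch_callback(partition_context: FakePartitionContext, events: List[Any]) -> None:
    if not events:
        return  # nothing arrived in time: nothing to save or checkpoint
    save_many(events)
    # Checkpoint once per batch, at the last event that was saved.
    partition_context.update_checkpoint(events[-1])


ctx = FakePartitionContext("0")
batch_callback(ctx, ["e1", "e2", "e3"])
batch_callback(ctx, [])
print(saved_batches, ctx.checkpoints)
```

Three events result in a single write and a single checkpoint, and the empty batch touches neither the database nor the checkpoint store.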

@kdazzle
Really appreciate this wonderful discussion!
For "The events[-1] doesn't feel too great, so maybe another param in the batch_callback function would be nice.", are you thinking of another param like "last_event" so that it's easier to call update_checkpoint with it? If yes, another option is to have update_checkpoint(event=None), so you just call update_checkpoint() and by default it works on the last event. Users would also keep the flexibility to call update_checkpoint(event), where event is any event from the list, if they want to.

BTW, a related question: do you expect on_event_batch (or batch_callback in your code snippet) to be called when max_wait_time elapses but no event has been received from the event hub?
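A minimal sketch of the update_checkpoint(event=None) option, under assumptions: BatchPartitionContext, its _set_batch helper and the checkpointed list are hypothetical names made up for this illustration and do not reflect how the SDK stores checkpoints. The choice to do nothing when the current batch is empty is also an assumption, since the thread leaves that case open.

```python
from typing import Any, List, Optional


class BatchPartitionContext:
    """Hypothetical context that remembers the batch last handed to the callback."""

    def __init__(self) -> None:
        self._current_batch: List[Any] = []
        self.checkpointed: List[Any] = []  # stand-in for the checkpoint store

    def _set_batch(self, events: List[Any]) -> None:
        # Would be called by the receiver just before invoking the user callback.
        self._current_batch = events

    def update_checkpoint(self, event: Optional[Any] = None) -> None:
        if event is None:
            if not self._current_batch:
                return  # empty batch: nothing to checkpoint (assumed behaviour)
            event = self._current_batch[-1]  # default to the last event
        self.checkpointed.append(event)


ctx = BatchPartitionContext()
ctx._set_batch(["a", "b", "c"])
ctx.update_checkpoint()      # defaults to the last event, "c"
ctx.update_checkpoint("a")   # an explicit event is still accepted
ctx._set_batch([])
ctx.update_checkpoint()      # no-op for an empty batch
print(ctx.checkpointed)
```

With a default like this the callback no longer needs events[-1], while callers who want to checkpoint a specific event can still pass it.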
