Azure-sdk-for-python: azure-eventhub : Aggregate reception from multiple partitions into a single list

Created on 29 Sep 2020  ·  4 Comments  ·  Source: Azure/azure-sdk-for-python

  • Package Name: azure-eventhub
  • Package Version: 5.2.0
  • Operating System: Windows 10
  • Python Version: 3.7

I asked a Stack Overflow question about this earlier here. I'm currently trying to implement batch reading from an Azure Event Hub created with 32 partitions. More precisely, I'm trying to read a batch that includes all the events received in the last 60 minutes.

The problem is that consumer.receive_batch() invokes the on_event_batch callback once per batch, and each batch only contains events from a single partition. For example, 6 events in partition 6 trigger one call, while 7 events in partition 9 trigger another. I'd like all the events from every partition to trigger a single call of the callback or, at least, to be able to aggregate all the EventData from every partition between starting_position and now into a single list that I can process. I'm currently using something like this to put the EventHubConsumerClient into listening mode:

self.consumer = EventHubConsumerClient.from_connection_string(
    conn_str=conn_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name
)

with self.consumer as consumer:
    consumer.receive_batch(
        on_event_batch=on_event_callback_method,
        starting_position=timestamp_60_minutes_ago,
        starting_position_inclusive=True,
        max_batch_size=999999999999,
        max_wait_time=60  # give the receiver time to gather all the messages
    )
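
The snippet above does not show how timestamp_60_minutes_ago is built. A minimal sketch, assuming a timezone-aware UTC datetime is passed as starting_position (the helper name minutes_ago is hypothetical and not part of the SDK):

```python
from datetime import datetime, timedelta, timezone


def minutes_ago(minutes, now=None):
    """Return a timezone-aware UTC datetime `minutes` before `now`.

    `now` defaults to the current UTC time; it is a parameter only so the
    helper can be exercised with a fixed reference point.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now - timedelta(minutes=minutes)


# Value that could be passed as starting_position to receive_batch()
timestamp_60_minutes_ago = minutes_ago(60)
```

Using an aware datetime avoids any ambiguity about the local time zone of the machine running the consumer.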

I'm currently thinking about N threads (here N being 32), one per partition, then reducing the results into one unified list of events, but I'm not really sure how to proceed, or whether I'm heading down a rabbit hole. I'd be happy to have your view on that! Our current implementation relies on Databricks support for Event Hubs (which seems to be able to do it), but we'd like to take a bit of distance from it and use the official SDKs.

Client Event Hubs customer-reported needs-team-attention question

All 4 comments

Hey Dernat, thanks again for reaching out.

To quickly echo my initial response on SO and the follow-up: you have a few ways to do aggregation, depending on which semantics you care about, but it may be most effective to have a single consumer pushing to a unified Queue, which is then read from when appropriate (as opposed to 32 consumers all aggregating).

I've put together the following example to briefly show the pattern I'm describing. Don't hesitate to shout if you hit any issues, need clarification, or if this won't fit your constraints.
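
The linked example is not reproduced on this page. As a rough sketch of the single-consumer pattern described above (not the original gist), assuming the client behaves like EventHubConsumerClient, i.e. receive_batch() blocks until close() is called from another thread. The function name collect_events and its parameters are hypothetical:

```python
import queue
import threading
import time


def collect_events(client, wait_seconds, **receive_kwargs):
    """Receive for `wait_seconds`, then return all events as one flat list.

    `client` is assumed to expose a blocking
    receive_batch(on_event_batch=..., **kwargs) and a close() that makes
    receive_batch return, as EventHubConsumerClient does.
    """
    batches = queue.Queue()  # thread-safe hand-off between callback and caller

    def on_event_batch(partition_context, events):
        # Invoked once per single-partition batch; skip empty batches,
        # which can be delivered when max_wait_time elapses.
        if events:
            batches.put(list(events))

    worker = threading.Thread(
        target=client.receive_batch,
        kwargs={"on_event_batch": on_event_batch, **receive_kwargs},
        daemon=True,
    )
    worker.start()

    time.sleep(wait_seconds)  # aggregation window
    client.close()            # unblocks receive_batch in the worker
    worker.join()

    # The worker has stopped, so the queue can be drained without racing it.
    merged = []
    while True:
        try:
            merged.extend(batches.get_nowait())
        except queue.Empty:
            break
    return merged
```

With the real SDK this might be called as collect_events(consumer, 60, starting_position=timestamp_60_minutes_ago, starting_position_inclusive=True). Because no partition_id is passed, the one receive_batch() call reads from every partition, so a single thread and a single Queue are enough.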

@KieranBrantnerMagee, thanks a lot for the gist! It definitely helped 👌
For posterity, I ended up using the threading/Queue pattern, roughly as follows:

import logging
import threading
import time
from queue import Queue

from azure.eventhub import EventHubConsumerClient

logger = logging.getLogger(__name__)


class ReceiverClass:

    def _callback_process_data(self, partition_context, events):
        """
        Generic method used as a processing callback for all the events batches
        captured. This follows the Transform => ML => Post downstream workflow.
        """
        # Aggregate data into the aggregation Queue
        self.events_aggregation_queue.put(events)

    def receive_data(self):

        # Initialize the consumer
        self.consumer = EventHubConsumerClient.from_connection_string(
                    conn_str=self.config.connection_string,
                    consumer_group=self.config.consumer_group,
                    eventhub_name=self.config.eventhub_name
                )

        # Initialize the aggregation queue that gathers all the EventData together (the callback takes care of that)
        self.events_aggregation_queue = Queue()

        # Create a reception thread for each partition
        for partition_id in self.consumer.get_partition_ids():
            worker = threading.Thread(target=self.consumer.receive_batch,
                kwargs={"on_event_batch": self._callback_process_data,
                "starting_position": self.config.data_window.receiving_from_time,
                "starting_position_inclusive": True,
                "partition_id": partition_id})
            worker.start()

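        # Aggregate for N seconds, then close the consumer so that the blocking
        # receive_batch() calls in the worker threads return
        time.sleep(self.config.aggregation_wait_time_in_seconds)
        self.consumer.close()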

        # Drain the thread-safe Queue and flatten the batches into a single list
        events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
        events_data = [event_data for sublist in events_data_nested for event_data in sublist]
        logger.info(f"Received a list of {len(events_data)} EventData...")

        return events_data

This issue can be closed 👍

Glad to be of assistance! Don't hesitate to reach out if there's anything we can help with in the future.
