I asked about this earlier in a Stack Overflow question here. I'm currently trying to implement batch reading from an Azure Event Hub created with 32 partitions. More precisely, I'm trying to read a batch that includes all the events received in the last 60 minutes.
The problem is that consumer.receive_batch() calls the on_event_batch callback once per batch, and each batch only contains events from a single partition. For example, 6 events in partition 6 trigger one call while 7 events in partition 9 trigger another. I'd like all the events from every partition to trigger a single call of the callback or, at least, to be able to aggregate all the EventData from every partition between starting_position and now into a single list that I can process. I'm currently using something like this to put the EventHubConsumerClient into listening mode:
self.consumer = EventHubConsumerClient.from_connection_string(
    conn_str=conn_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name
)
with self.consumer as consumer:
    consumer.receive_batch(
        on_event_batch=on_event_callback_method,
        starting_position=timestamp_60_minutes_ago,
        starting_position_inclusive=True,
        max_batch_size=999999999999,
        max_wait_time=60  # give the receiver time to gather all the messages
    )
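For reference, `starting_position` accepts a `datetime`, so a value like `timestamp_60_minutes_ago` could be built along these lines. This is only a minimal sketch: the helper name `minutes_ago` and its `now` parameter are made up for illustration and are not part of the SDK.

```python
from datetime import datetime, timedelta, timezone


def minutes_ago(minutes, now=None):
    """Return a timezone-aware UTC datetime `minutes` before `now`.

    `now` defaults to the current UTC time; it is only a parameter so the
    helper is easy to test. (Hypothetical helper, not an SDK function.)
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now - timedelta(minutes=minutes)


# Value that could be passed as starting_position to receive_batch()
timestamp_60_minutes_ago = minutes_ago(60)
```

Using a timezone-aware UTC value avoids any ambiguity about which clock the 60-minute window refers to.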
I'm currently thinking about N threads (N being 32 here), each reading one partition, and then reducing the results into one unified list of events, but I'm not really sure how to proceed or whether I'm heading down a rabbit hole. I'd be happy to have your view on that! Our current implementation relies on the Databricks support for Event Hubs (which seems to be able to do it), but we'd like to move away from it and use the official SDKs.
Hey Dernat, thanks again for reaching out.
To quickly echo my initial response on SO and the follow-up: you have a few ways to do aggregation depending on what semantics you care about, but it may be most effective to have a single consumer pushing to a unified Queue, which is then read from when appropriate (as opposed to 32 consumers all aggregating).
I've put together the following example to briefly show the pattern I'm describing. Don't hesitate to shout if you hit any issues, need clarification, or if this won't fit your constraints.
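The linked example is not reproduced in this thread. As a rough stand-in, the "one callback feeding one shared queue" idea could be sketched as below, using only the standard library. The names `events_queue`, `collect_for` and `fake_partition` are hypothetical, and the fake partition threads merely simulate the SDK invoking the callback. In real use, `on_event_batch` would be passed to `receive_batch()`, which blocks and would therefore run in its own thread.

```python
import queue
import threading
import time

# Single shared queue that every callback invocation writes to
events_queue = queue.Queue()


def on_event_batch(partition_context, events):
    # Same (partition_context, events) signature as the receive_batch() callback
    for event in events:
        events_queue.put(event)


def collect_for(seconds, poll_interval=0.1):
    """Read from the shared queue until `seconds` have elapsed."""
    collected = []
    deadline = time.monotonic() + seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            collected.append(events_queue.get(timeout=min(poll_interval, remaining)))
        except queue.Empty:
            continue  # nothing arrived during this poll; keep waiting
    return collected


def fake_partition(partition_id):
    # Stand-in for the SDK delivering one batch from one partition
    on_event_batch(partition_id, [f"p{partition_id}-e{i}" for i in range(4)])


workers = [threading.Thread(target=fake_partition, args=(pid,)) for pid in range(3)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

all_events = collect_for(0.5)
```

Because `queue.Queue` is thread-safe, the callback needs no extra locking even when batches from several partitions arrive concurrently.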
@KieranBrantnerMagee, thanks a lot for the gist! It definitely helped 👌
For posterity, I ended up using the threading/Queue pattern, roughly as follows:
import logging
import threading
import time
from queue import Queue

from azure.eventhub import EventHubConsumerClient

logger = logging.getLogger(__name__)


class ReceiverClass:
    def _callback_process_data(self, partition_context, events):
        """
        Generic method used as a processing callback for all the events batches
        captured. This follows the Transform => ML => Post downstream workflow.
        """
        # Aggregate data into the aggregation Queue
        self.events_aggregation_queue.put(events)

    def receive_data(self):
        # Initialize the consumer
        self.consumer = EventHubConsumerClient.from_connection_string(
            conn_str=self.config.connection_string,
            consumer_group=self.config.consumer_group,
            eventhub_name=self.config.eventhub_name
        )
        # Initialize the aggregation queue that gathers all the EventData (the callback takes care of that)
        self.events_aggregation_queue = Queue()
        # Create a reception thread for each partition
        workers = []
        for partition_id in self.consumer.get_partition_ids():
            worker = threading.Thread(target=self.consumer.receive_batch,
                                      kwargs={"on_event_batch": self._callback_process_data,
                                              "starting_position": self.config.data_window.receiving_from_time,
                                              "starting_position_inclusive": True,
                                              "partition_id": partition_id})
            worker.start()
            workers.append(worker)
        # Aggregate for N seconds before stopping
        time.sleep(self.config.aggregation_wait_time_in_seconds)
        # Stop receiving so the blocking receive_batch() calls return, then wait for the threads
        self.consumer.close()
        for worker in workers:
            worker.join()
        # Collect the batches from the queue and flatten them into a single list
        events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
        events_data = [event_data for sublist in events_data_nested for event_data in sublist]
        logger.info(f"Received a list of {len(events_data)} EventData...")
        return events_data
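One caveat on the draining step: `Queue.qsize()` is documented as approximate, so sizing the loop with it can miss or block on items if producers are still running. A non-blocking drain is a possible alternative. The sketch below uses only the standard library; the helper name `drain` and the sample batches are made up for illustration.

```python
import queue


def drain(q):
    """Remove and return everything currently in `q` without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


# Example with two batches, as the callback would have queued them
batches = queue.Queue()
batches.put(["a", "b"])
batches.put(["c"])

# Flatten the list of batches into a single list of events
flat = [event for batch in drain(batches) for event in batch]
```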
This issue can be closed 👍
Glad to be of assistance! Don't hesitate to reach out if there's anything we can help with in the future.