Azure-sdk-for-java: [BUG] EventHub Consumer stops consuming messages until we restart

Created on 10 Dec 2020  路  18Comments  路  Source: Azure/azure-sdk-for-java

Describe the bug
EventHub consumer stops consuming messages until we restart the consumer. Have deployed my consumer in Azure Kubernetes (AKS). Initially, it consumes a few messages, all of a sudden it stops consuming. If we restart the consumer, it works like charm. Until we restart it, all the messages sit in EH. Even it is 2-3 days, none of the messages is consumed.

Exception or Stack Trace
This is the consumer log just captured before restarting:
consumerlogmessages.txt
Picked up this stack trace from log:

2020-12-03 14:56:02.126 [ERROR] [Loggers$Slf4JLogger:319] Scheduler worker in group main failed with an uncaught exception java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'takeUntil' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)

To Reproduce
This has happened very often.

Code Snippet

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.roo.connect.soa.config.IEventHubReceiverConfig;
import com.roo.connect.soa.log.ServiceLogger;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class IEventHubReceiver implements Runnable {

    private EventProcessorClient eventProcessorClient;
    private String consumerGroupName="";
    private static final String IEVENTHUBRECIEVER = "IEventHubReceiver : ";

    @Autowired
    private ServiceLogger log;

    public IEventHubReceiver(IEventHubReceiverConfig config){

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(config.getStorageConnectionString())
                .containerName(config.getStorageContainerName())
                .buildAsyncClient();

        eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(config.getConsumerGroup())
                .connectionString(config.getConnectionString())
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    EventData eventData = eventContext.getEventData();
                    String receivedData = eventData.getBodyAsString();
                    consumerGroupName = config.getConsumerGroup();
                    log.info("IEventHubReceiver - received from : " + consumerGroupName + " : receivedData : " + receivedData);
                    onEvents(eventData);
                    if(isAlive()) {
                        //if onEvent is success, context updates the 'checkpoint'
                        eventContext.updateCheckpoint();
                    }
                })
                .processError(errorContext -> {
                    log.error(IEVENTHUBRECIEVER + consumerGroupName + " - Error occurred while processing events :: " + errorContext.getThrowable().getMessage());
                })
                .buildEventProcessorClient();
    }

    //This method is implemented in child classes with the logic to process the eventData
    public abstract void onEvents(EventData eventData);

    public void stop() {

        eventProcessorClient.stop();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client stopped...");
    }

    public boolean isAlive() {

        boolean isAlive = eventProcessorClient.isRunning();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - isAlive:: " + isAlive);
        return isAlive;
    }

    @Override
    public void run() {

        // This will start the processor. It will start processing events from all partitions.
        eventProcessorClient.start();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client started...");
    }
}

Expected behavior
I could see a few connection exception errors in logs, but I am wondering why it stopped consuming messages forever. Immediately after restarting messages started consuming.

Setup (please complete the following information):

  • OS: Ubuntu (AKS)
  • IDE : IntelliJ
  • Version of the Library used
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.3.1</version>
    </dependency>

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • [x] Bug Description Added
  • [ ] Repro Steps Added
  • [x] Setup information Added
Client Event Hubs customer-reported question

Most helpful comment

Hey @jyyy-57 , the PR associated with this issue, https://github.com/Azure/azure-sdk-for-java/pull/19924, we are currently stress testing for the next three days. We plan to merge it on Monday if all goes well.

All 18 comments

We met with the same problem. We even set the RetryOptions to make it keep trying when error happens. While our log shows the same exception periodically throws out and for several days no message was received. When we restart the application, the message can be consumed normally again.

Could anyone throw some light here, please?

In 5.3.0, we added a watchdog functionality that would check to see if connection is alive. If it is not, we鈥檒l return the partition to the pool. That way, another processor can reclaim it and begin processing again.

https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md#530-2020-10-12

Thanks for your information.

we are already using 5.3.1 version as shown below, still, we face this issue.

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.3.1</version>
    </dependency>

We've another customer who has been able to repro the same issue with latest version 5.5.0. They could repro the issue by disconnecting network for couple of mins and then connecting back.
We see SDK is able to take ownership of partition back, however keeps on failing with TimeoutException until application is restarted,

2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Starting next iteration of load balancer
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of ownership records 10, number of partitions 10
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of active ownership records 10
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [DEBUG] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Current partition distribution 570672db-f55a-4bed-ad0b-f50d447a02e0=[0,1,2,3,4,5,6,7,8,9]
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of active event processors 1
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Expected min partitions per event processor = 10, expected number of event processors with additional partition = 0
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Load is balanced with this event processor owning 10 partitions
****```
2021-02-16 18:56:49.233 [single-1] [INFO] com.azure.core.amqp.implementation.ReactorExecutor - Unable to acquire dispose reactor semaphore within timeout.
2021-02-16 18:56:49.233 [single-1] [DEBUG] com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - Attempted 1 times to get a new AMQP connection
2021-02-16 18:56:49.233 [single-1] [WARN] com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - Retry #1. Transient error occurred. Retrying after 800 ms.
Operation timed out, errorContext[NAMESPACE: s00199-enscdx-pd-us-account-na1.servicebus.windows.net]
com.azure.core.amqp.exception.AmqpException: Operation timed out, errorContext[NAMESPACE: xxxx-pd-us-account-na1.servicebus.windows.net]
```****

Related to #15976, #17568

@conniey Exact same issue seen in azure-messaging-eventhubs dependency version - 5.3.1 Can we expect a fix on this shortly or any temporary workaround that you can suggest?

It's not a very satisfactory workaround, but we track the time when we last received messages on each partition and trigger a restart using a liveness probe if too much time has elapsed since the last message. In Event Hubs with low message volumes this does cause a lot of unnecessary restarts, but this is far better than losing messages altogether.

So this isn't lost in the fray, we dug some more into this and the existing Track 1 library and noticed that our third party dependency doesn't always propagate to its children that the underlying transport is closed. It's possible our consumers believe they are still alive, even though the connection is not. I'm looking into a fix where we can propagate the connection error to all its children (receiver links) and close them.

@conniey From the perspective of a consumer client application that continuously listens to the eventhub to process any incoming message, this is a super critical issue. Any message that is not able to be picked by the consumer client is basically a kind of data loss and processing resumes only after we do an application restart which we can't afford to do in production environment. Can we expect a fix for this issue anytime soon as we have a production release in week's time and this issue is a blocker for us?

@gandhirajan

Any message that is not able to be picked by the consumer client is basically a kind of data loss and processing resumes only after we do an application restart which we can't afford to do in production environment.

Can you explain this a little more? Event Hubs doesn't remove any events from the stream. Events leave the stream when your "Message Retention" policy has elapsed for your Event Hub. That's one of the reasons we have a durable store for checkpointing, so if your application restarts, you'll understand where in the stream you last processed a message from the hub.

@conniey Hi, Thanks for the response. I do agree that the message is retained in event hub till the retention period is elapsed. But in case of near real time processing of events from eventhub, if the consumer client stops processing incoming messages till application restart then it's actually a kind of data loss for us.

In our use case, we are routing all the incoming messages to the IoT hubs as events to an external event hub. Also we have a spring boot consumer client application that listens to the external event hub and processes the incoming events to the eventhub near real time. The processing of incoming events is time bound as it has IoT device info at that point in time and actually it's obsolete to process these messages at later point in time. So in our use case, we can't afford to stop processing incoming messages to eventhub abruptly. Can we expect a SDK fix for this anytime soon?

@conniey Hi, I could see a merged PR - https://github.com/Azure/azure-sdk-for-java/pull/19585 related to this issue. Can we expect a fix for this issue in the next version?

So this isn't lost in the fray, we dug some more into this and the existing Track 1 library and noticed that our third party dependency doesn't always propagate to its children that the underlying transport is closed. It's possible our consumers believe they are still alive, even though the connection is not. I'm looking into a fix where we can propagate the connection error to all its children (receiver links) and close them.

@conniey If the connection is not alive but the consumers still hold it, will the server detect it and reassign the partition to other consumers?

Hey @conniey, could you please share updates on this issue? I believe we're facing a similar issue. We're using BlobContainerAsyncClient and EventProcessorClientBuilder. We have iot hub built-in endpoint. We need to keep eventProcessorClient open to receive the message in the built-in endpoint., but we notice that load balancing is done regularly, but until I restart the service it is not picking up new events. But it's hard to reproduce and debug, as it is intermittent and usually happens 10-14 days after we restart the service/client. Also, the log always shows "Load balancing completed successfully", even when it cannot pick any new messages.
the version:

                <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
            <version>1.1.1</version>
        </dependency>

Hey @jyyy-57 , the PR associated with this issue, https://github.com/Azure/azure-sdk-for-java/pull/19924, we are currently stress testing for the next three days. We plan to merge it on Monday if all goes well.

@conniey Hi, will the fix take care of automatically reestablishing the connection to the partition returned to the pool, or do the application need to implement any additional logic on top of this fix?

@conniey Hi, will the fix take care of automatically reestablishing the connection to the partition returned to the pool, or do the application need to implement any additional logic on top of this fix?

Yes. The partition load balancer periodically checks for ownership and it'll notice that this partition is unclaimed, then will re claim it automatically.

Was this page helpful?
0 / 5 - 0 ratings