Azure-sdk-for-js: [Event Hubs] Out of date checkpoint errors

Created on 21 Mar 2019 · 11 Comments · Source: Azure/azure-sdk-for-js

Node: v10.15.1
@azure/event-processor-host: 1.0.6

Every so often I see the message "Ignoring out of date checkpoint with offset: 'n', sequenceNumber: n because currently persisted checkpoint has higher offset 'n', sequenceNumber n." I have only a single listener, and it stores its checkpoint in a storage container that only this listener can access.

I'm not sure how the checkpoint could be out of date when nothing else is writing to it. I write a checkpoint after every received event rather than batching (maybe this is the issue?), and I can expect a batch of around 50-100 events every minute.

Is this an "error" that I can ignore? Everything seems to function fine, but I am getting constant logs with this message and am not sure whether there is something I should be doing to mitigate it.

Client Event Hubs question

All 11 comments

Thanks for reporting @j0h, we will look into what would cause this error to show up and will let you know if we have any questions to understand your scenario better.

Hi @ShivangiReja, were you able to investigate this at all?

I apologize for the delay. I haven't had time for this yet, but I'll pick it up early next week and get back to you if I have any questions.

@j0h This is an expected error. We throw it when the sequence number of the current checkpoint is less than or equal to the sequence number of the stored checkpoint.
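To make that rule concrete, here is a minimal sketch of the comparison as described in this comment. The `CheckpointPosition` type and the `isOutOfDate` function are hypothetical names used only for illustration; they are not the SDK's internal code, which may differ in detail.

```typescript
// Hypothetical shape for illustration; the SDK's own checkpoint type may carry more fields.
interface CheckpointPosition {
  offset: string;
  sequenceNumber: number;
}

/**
 * Returns true when `candidate` would be treated as out of date, following the
 * rule stated above: its sequence number is less than or equal to the
 * sequence number of the persisted checkpoint.
 */
function isOutOfDate(
  candidate: CheckpointPosition,
  persisted: CheckpointPosition | undefined
): boolean {
  if (persisted === undefined) {
    // Nothing stored yet, so any checkpoint is newer.
    return false;
  }
  return candidate.sequenceNumber <= persisted.sequenceNumber;
}
```

Note that under this rule an equal sequence number also counts as out of date, so writing the same position twice would trigger the message.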

Thanks for looking into this @ShivangiReja. Do you think checkpointing after each event could cause a race condition? I'm currently seeing this error a lot. The majority of the time, the sequence number and offset are equal. I'm wondering whether to checkpoint only every 2nd event or less often.

Correct, you are seeing this error more often because you are checkpointing every single event. I would suggest checkpointing every 3rd event or, to be on the safe side, every 5th event.
Let me know if it works.
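One way to checkpoint only every Nth event is to keep a per-partition counter. The sketch below assumes a hypothetical helper class, `CheckpointCounter`, which is not part of `@azure/event-processor-host`; it only decides when a checkpoint is due and leaves the actual call to the message handler.

```typescript
// Hypothetical helper: tracks how many events have arrived per partition since
// the last checkpoint and reports when the configured interval is reached.
class CheckpointCounter {
  private readonly interval: number;
  private readonly counts = new Map<string, number>();

  constructor(interval: number) {
    if (!Number.isInteger(interval) || interval < 1) {
      throw new RangeError("interval must be a positive integer");
    }
    this.interval = interval;
  }

  /** Call once per received event; returns true on every `interval`-th event of a partition. */
  shouldCheckpoint(partitionId: string): boolean {
    const seen = (this.counts.get(partitionId) || 0) + 1;
    if (seen >= this.interval) {
      this.counts.set(partitionId, 0); // reset after signalling a checkpoint
      return true;
    }
    this.counts.set(partitionId, seen);
    return false;
  }
}
```

In a message handler this could be used as `if (counter.shouldCheckpoint(context.partitionId)) { await context.checkpointFromEventData(eventData); }`. Counting per partition rather than globally keeps a busy partition from consuming the checkpoint slots of a quiet one.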

@ShivangiReja I have been checkpointing every 5th event and noticed fewer errors, but some still occurred with the same offset and checkpoint number. I'd assume that if I were to receive 100 events in a burst, this error would still be thrown. So basically, would the checkpoint threshold have to be dynamic, or would we have to do a manual/external check on the checkpoint values before committing?
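The manual check mentioned here could be sketched as a small in-memory guard that remembers the highest sequence number already submitted for each partition. `CheckpointGuard` and `tryClaim` are hypothetical names, not SDK APIs, and the sketch assumes a single host process; it does not read the persisted checkpoint from storage.

```typescript
// Hypothetical helper, not part of @azure/event-processor-host.
class CheckpointGuard {
  // Highest sequence number already submitted, keyed by partition ID.
  private readonly highest = new Map<string, number>();

  /**
   * Records `sequenceNumber` and returns true if it is strictly higher than
   * anything claimed so far for `partitionId`; otherwise returns false so the
   * caller can skip a checkpoint that would be reported as out of date.
   */
  tryClaim(partitionId: string, sequenceNumber: number): boolean {
    const previous = this.highest.get(partitionId);
    if (previous !== undefined && sequenceNumber <= previous) {
      return false;
    }
    this.highest.set(partitionId, sequenceNumber);
    return true;
  }
}
```

A handler would call `guard.tryClaim(context.partitionId, eventData.sequenceNumber)` before checkpointing and skip the write when it returns false. Because the claim is recorded before the write completes, a failed write leaves the guard ahead of storage; whether that trade-off is acceptable depends on the application.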

@j0h await context.checkpoint(); is more likely to throw this error because it uses the sequence number of the latest message received by the SDK (which is not necessarily the message you want to checkpoint).

We suggest that you use await context.checkpointFromEventData(eventData) instead. This will use the sequence number of the message that you want to checkpoint, because you get to pass the message as a parameter.

We will update our samples and docs to reflect this.

Thanks for the suggestion @ShivangiReja. I have tried a few things. When using context.checkpointFromEventData, it would still throw, with the persisted offset off by 1. Combining that with batched checkpointing on a count of 10 would also throw, with the persisted offset off by 2 or more. I went back to using context.checkpoint and have been increasing the batch count, up to 20 at the moment, but it still throws from time to time.

Would checkpointFromEventData still work as you described with hubs that have multiple partitions? I'm not sure whether this is a common issue or an isolated case on my end, as I've not seen it mentioned elsewhere.

@j0h Yes, the await context.checkpointFromEventData(eventData) method works even if you have multiple partitions.

I'm adding sample code (checkpointing every single event). This sample works fine for me without any errors. Can you try using it?

Description of my Event Hub:
Number of partitions : 2
Number of messages: 300 in both the partitions (total 600 messages)
Consumer group: $Default

Sample Code:

import {
  EventProcessorHost, OnReceivedError, OnReceivedMessage, EventData, PartitionContext, delay
} from "@azure/event-processor-host";
import dotenv from "dotenv";
dotenv.config();

const path = process.env.EVENTHUB_NAME;
const storageCS = process.env.STORAGE_CONNECTION_STRING;
const ehCS = process.env.EVENTHUB_CONNECTION_STRING;
// Creates a unique storage container name for every run.
// If you wish to keep the same name between runs, use a fixed string instead.
const storageContainerName = EventProcessorHost.createHostName("test-container");
const ephName = "my-eph";

/**
 * The main function that executes the sample.
 */
async function main(): Promise<void> {
  // Please feel free to use the `./sendBatch.ts` sample to send messages to an EventHub.
  // Post that you can run this sample to start the EPH and see it in action.
  // 1. Start eph.
  const eph = await startEph(ephName);
  // 2. Sleep for 180 seconds. This will give time for eph to receive messages.
  await sleep(180);
  // 3. After 180 seconds stop eph.
  await stopEph(eph);
}

// calling the main().
main().catch((err) => {
  console.log("Exiting from main() due to an error: %O.", err);
});

/**
 * Sleeps for the given number of seconds.
 * @param timeInSeconds Time to sleep in seconds.
 */
async function sleep(timeInSeconds: number): Promise<void> {
  console.log(">>>>>> Sleeping for %d seconds..", timeInSeconds);
  await delay(timeInSeconds * 1000);
}

/**
 * Creates an EPH with the given name and starts the EPH.
 * @param ephName The name of the EPH.
 * @returns {Promise<EventProcessorHost>} Promise<EventProcessorHost>
 */
async function startEph(ephName: string): Promise<EventProcessorHost> {
  // Create the Event Processor Host
  const eph = EventProcessorHost.createFromConnectionString(
    EventProcessorHost.createHostName(ephName),
    storageCS!,
    storageContainerName,
    ehCS!,
    {
      eventHubPath: path,
      onEphError: (error) => {
        console.log(">>>>>>> [%s] Error: %O", ephName, error);
      }
    }
  );
  // Message handler: counts messages per partition and checkpoints every one of them.
  const partitionCount: { [x: string]: number } = {};
  const onMessage: OnReceivedMessage = async (context: PartitionContext, eventData: EventData) => {
    // Increment the per-partition message counter, starting from 0 on first use.
    partitionCount[context.partitionId] = (partitionCount[context.partitionId] || 0) + 1;
    const num = partitionCount[context.partitionId];
    try {
      console.log("$$$$ Attempting to checkpoint message number %d sequenceNumber %d partitions number: %O", num, eventData.sequenceNumber, eph.receivingFromPartitions.length);
      // Checkpoint using the position of this specific message.
      await context.checkpointFromEventData(eventData);
      console.log("$$$$ [%s] Successfully checkpointed message number %d", ephName, num);
    } catch (err) {
      console.log(">>>>> [%s] An error occurred while checkpointing msg number %d: %O", ephName, num, err);
    }
  };
  // Error handler
  const onError: OnReceivedError = (error) => {
    console.log(">>>>> [%s] Received Error: %O", ephName, error);
  };
  console.log(">>>>>> Starting the EPH - %s", ephName);
  await eph.start(onMessage, onError);
  return eph;
}

/**
 * Stops the given EventProcessorHost.
 * @param eph The event processor host.
 * @returns {Promise<void>} Promise<void>
 */
async function stopEph(eph: EventProcessorHost): Promise<void> {
  console.log(">>>>>> Stopping the EPH - '%s'.", eph.hostName);
  await eph.stop();
  console.log(">>>>>> Successfully stopped the EPH - '%s'.", eph.hostName);
}

Thanks a lot for the sample code @ShivangiReja, it works as expected with no errors. My implementation is the same, but I still see the errors come through. Since the sample works fine, I can only assume the problem is something on my end, and I will investigate further to see whether specific conditions are causing mine to throw. I'll close this issue for now and reopen it when and if I find out more. Appreciate all the support!
