Confluent-kafka-dotnet: Partition assignment and offset semantics

Created on 3 May 2018  ·  10 Comments  ·  Source: confluentinc/confluent-kafka-dotnet

Description

I am trying to understand how to structure my program with regard to offset management.

  1. The first thing to notice is that the current implementation of the Confluent Kafka client uses the librdkafka function rd_kafka_consume0, which stores the offset first and then returns the message to the consumer:
    https://github.com/edenhill/librdkafka/blob/72b4e4169d7e1e2229595083bf31149fa01277cb/src/rdkafka.c#L2053
    As a result, the offset is stored before processing is complete, which means that message loss is possible.
    Is it a correct conclusion that "enable.auto.commit" can cause lost messages?

  2. My message handling is asynchronous, so I have to commit offsets manually. If I turn "enable.auto.commit" off, call StoreOffsets from the async handler, and call Commit() from a timer, would I replicate autocommit in my async handler and achieve an "at least once" guarantee?

My question, though, is: are StoreOffset and Commit() thread safe? The documentation does not mention the thread safety of these functions.

  3. I am having a hard time understanding the concept of Consumer.OnPartitionsAssigned. The documentation says "You should typically call the Consumer.Assign", but why? In English this API reads as "Upon partition assignment you need to assign partitions", which is nonsense. If partitions have been assigned, why do I have to assign them? My only possible explanation is that OnPartitionsAssigned is in fact "OnOffsetDiscovered" and the driver would not assign discovered partitions automatically.

Bottom line: I do not understand what is going on, nor the reasoning behind this API.

Checklist

Please provide the following information:

  • [x] Confluent.Kafka nuget version: 0.11.4

All 10 comments

yeah, this is a bit confusing and needs better documentation.

  1. that's right.
  2. yes - you can turn enable.auto.commit off, use StoreOffsets after you've handled a message and use Commit from a timer to achieve at least once guarantees.
    2b. yes - those operations are thread safe.
  3. you are right - there's the notion of being assigned a set of partitions in the consumer group, and assigning to a set of partitions to actually read from. the same terminology is used for both. usually they will be the same, but this is not enforced/required.

it does in fact sound like you've got it pretty well figured out :-)

To add to what Matt said:
You can use StoreOffsets in conjunction with enable.auto.commit=true if you set enable.auto.offset.store=false. That disables the offset store you pointed out in the librdkafka source (the one that happens before the message is passed to the app) and lets you decide yourself when the message offset should be stored (locally, in memory) for a future commit (auto or manual). This way your last manually stored offset for each partition will be automatically committed by the consumer using the existing auto.commit.interval.ms timer.

OnPartitionsAssigned is triggered on group rebalance when your consumer has received a new partition assignment from the group leader, but prior to the consumer fetching committed offsets for those partitions. This allows the application to perform its own offset selection or storage logic. If you're happy with the default behaviour of using the committed offsets, you don't need to set an OnPartitionsAssigned delegate. But if you do set one, you must call Assign() to let the consumer know you've handled the assignment, which also lets you set a manual starting offset for each partition.

I ran an experiment with (F# code):

      |> Config.config "auto.commit.interval.ms" "10000"
      |> Config.config "enable.auto.commit" "true"
      |> Config.config "enable.auto.offset.store" "false"

and stored offsets after the async handler completes, using consumer.StoreOffsets([| maxOffset |]). It works as intended. Closing the issue.
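To illustrate why the point at which an offset is stored matters, here is a small library-free Python model. It is only a sketch: `OffsetStoreModel`, `store`, `commit_tick` and `run` are hypothetical names made up for this illustration, not part of confluent-kafka-dotnet or librdkafka, and the commit timer is assumed to fire once per message just to keep the trace short.

```python
class OffsetStoreModel:
    """Toy model of one partition's offset bookkeeping (not the real client)."""

    def __init__(self):
        self.stored = None     # offset held in memory, waiting for a commit
        self.committed = None  # position a restarted consumer would resume from

    def store(self, offset):
        # Kafka convention: the committed value is the NEXT offset to read.
        self.stored = offset + 1

    def commit_tick(self):
        # Stands in for the auto.commit.interval.ms timer firing.
        if self.stored is not None:
            self.committed = self.stored

    def resume_position(self):
        return 0 if self.committed is None else self.committed


def run(store_before_processing):
    """Deliver offsets 0..2 and crash while offset 2 is being processed."""
    model = OffsetStoreModel()
    processed = []
    for offset in range(3):
        if store_before_processing:
            model.store(offset)      # offset stored on delivery
        model.commit_tick()          # assumption: timer fires once per message
        if offset == 2:
            break                    # simulated crash in the middle of processing
        processed.append(offset)
        if not store_before_processing:
            model.store(offset)      # offset stored only after the handler finished
    return processed, model.resume_position()
```

With `run(True)` the consumer would resume at offset 3 although offset 2 was never processed, so that message is lost. With `run(False)` it resumes at offset 2, so the message is delivered again, which is the "at least once" behaviour discussed above.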

Does enable.auto.commit=true with enable.auto.offset.store=false really work for asynchronous processing, when Consumer.StoreOffset(Message message) can be called out of order? Is there some magic in auto commit that knows the maximum offset that can be committed, or do you have to figure it out on your own? That is, the largest offset reachable from the last committed one without any gaps.
Pardon my skepticism, as I have seen manual offset management at work with the Kafka spout in Apache Storm. Way too complex and buggy! :-)

The last offset to be stored will be the one that is committed - Kafka has no in-built mechanism for handling per-message acks.

That was my concern. In that case, the last offset to be stored has to be the largest contiguous processed offset, and it is up to the application to ensure this if there is no magic in the confluent-kafka-dotnet library, right? I don't see an easy way to handle this when processing messages asynchronously, short of the inefficient approach of processing N messages at a time and waiting for all of them to complete successfully.
If I want to have a pool of workers that can continuously process messages, then this becomes a daunting task, judging by the Kafka spout implementation codebase.
Say the application is processing 10 messages asynchronously and the auto commit interval kicks in when the messages with the following offsets have completed processing, in this order: 9, 8, 7, 4, 5, 6, 1, 2. Offsets 3 and 10 have not completed yet. Ideally offset 2 should be committed, but then there are a lot of other considerations around handling 3 and 10 and processing new messages.

yes, you'd need some sort of ordered list/map thing to keep track of each message individually.
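A minimal sketch of such a tracker, in library-free Python, might look like the following. `ContiguousOffsetTracker` and its methods are hypothetical names used only for illustration; the sketch assumes one tracker per partition and that offsets are delivered without gaps starting at `first_offset`.

```python
import threading


class ContiguousOffsetTracker:
    """Tracks out-of-order completions for one partition (illustrative only)."""

    def __init__(self, first_offset):
        self._first = first_offset
        self._next = first_offset   # lowest offset not yet known to be complete
        self._done = set()          # completed offsets that sit above a gap
        self._lock = threading.Lock()

    def complete(self, offset):
        """Record that processing of `offset` finished (any order, any thread)."""
        with self._lock:
            self._done.add(offset)
            # Advance past every completed offset that is now contiguous.
            while self._next in self._done:
                self._done.remove(self._next)
                self._next += 1

    def highest_contiguous(self):
        """Largest completed offset with nothing unfinished below it, or None."""
        with self._lock:
            if self._next == self._first:
                return None
            return self._next - 1
```

For the example above (offsets 1 to 10, completions arriving as 9, 8, 7, 4, 5, 6, 1, 2), `highest_contiguous()` returns 2; once offset 3 completes it jumps to 9. That value is what the application would store for a later commit. Whether the client expects that offset or the next one is something to check against the client documentation.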

in some scenarios, it may work to increase the number of partitions (and have one consumer per partition).

this is one thing traditional messaging systems give you that Kafka doesn't. but traditional messaging systems are also much slower of course.

Maybe set it up so that messages are processed in batches, and allow a new batch to start before the previous one has completed. Doing the commit offset bookkeeping at the scale of a batch would make it more efficient (I've not done this at all, just thinking out loud).
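One way the batch idea could be sketched, again in library-free Python: keep only a pending counter per batch instead of an entry per message. `BatchTracker` and its methods are hypothetical names, and the sketch assumes batches are started in offset order and each covers a consecutive range of offsets from one partition. It is not thread safe as written.

```python
from collections import deque


class BatchTracker:
    """Per-batch completion bookkeeping for one partition (illustrative only)."""

    def __init__(self):
        self._batches = deque()  # oldest batch first

    def start_batch(self, offsets):
        """Register a new batch and return a handle for reporting completions."""
        batch = {"last": max(offsets), "pending": len(offsets)}
        self._batches.append(batch)
        return batch

    def complete_one(self, batch):
        """Report that one message of `batch` finished processing."""
        batch["pending"] -= 1

    def pop_committable(self):
        """Last offset of the newest fully finished leading batch, or None."""
        last = None
        # Only leading batches count: a finished batch behind an unfinished
        # one must wait, otherwise a commit would skip unprocessed messages.
        while self._batches and self._batches[0]["pending"] == 0:
            last = self._batches.popleft()["last"]
        return last
```

A later batch may finish first, but `pop_committable()` returns nothing until every earlier batch is done, so the stored offset never moves past unprocessed messages.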

Asynchronous does not necessarily mean out of order.

"Asynchronous does not necessarily mean out of order."

I meant the completion order of offsets in asynchronous processing.
