Confluent-kafka-dotnet: Expose Seek() functionality in the API

Created on 11 Aug 2017 · 9 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

Currently, using CommitAsync to rewind and replay a particular partition within a given assignment does not work, because the broker does not send the messages again. This happens particularly when the committed offset is within the low watermark or equals the previously committed offset.

Would it be possible to expose the seek API so that the stream can be repositioned?
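The behavior in this report follows from the fact that a consumer tracks two separate offsets per partition. The fetch position determines what the next poll returns. The committed offset determines where the group resumes after a restart or rebalance.

The toy model below is a minimal sketch built on that assumption. `PartitionModel` and its methods are hypothetical names used for illustration; they are not the Confluent.Kafka API. The model's `seek` corresponds to the `Consumer.Seek` method requested in this issue.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PartitionModel:
    """Hypothetical toy model of one partition as seen by one consumer.

    Not the Confluent.Kafka API: it only illustrates that the fetch
    position and the committed offset are tracked separately.
    """
    log: List[str]
    position: int = 0   # offset of the next message poll() will return
    committed: int = 0  # offset the group would resume from after a restart

    def poll(self) -> Optional[str]:
        # Fetching always continues from the in-memory position.
        if self.position >= len(self.log):
            return None
        msg = self.log[self.position]
        self.position += 1
        return msg

    def commit(self, offset: int) -> None:
        # Committing records a resume point; the fetch position is untouched.
        self.committed = offset

    def seek(self, offset: int) -> None:
        # Seeking moves the fetch position, so the next poll() re-reads.
        self.position = offset


p = PartitionModel(log=["m0", "m1", "m2"])
first_two = [p.poll(), p.poll()]  # reads m0 and m1
p.commit(0)                       # attempt to "rewind" by committing
after_commit = p.poll()           # still m2: commit did not rewind
p.seek(0)
after_seek = p.poll()             # m0 again: seek did
```

In this model the committed offset would only matter when a consumer starts (or is reassigned) and has no in-memory position yet. This is why restarting the program replays the messages but committing within the same session does not.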

Checklist

Please provide the following information:

  • [x] Confluent.Kafka nuget version: 0.11.0
  • [x] Apache Kafka version: 0.11
  • [x] Client configuration:
    {
      { "bootstrap.servers", },
      { "api.version.request", true },
      { "max.in.flight.requests.per.connection", 1 },
      { "enable.auto.commit", false },
      { "enable.auto.offset.store", false },
      { "max.partition.fetch.bytes", 512 },
      { "session.timeout.ms", 60000 },
      { "heartbeat.interval.ms", 5000 }
    }
  • [x] Operating system: Windows Server 2012
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue
Labels: enhancement, question


All 9 comments

Is the CommitAsync behavior something new you have noticed in moving from 0.9.5 to 0.11.0?

Hi mhowlett,

Unfortunately, my tests prior to switching to 0.11.0 did not cover replay. I have no way to know except by downgrading the framework. Should I try that?

Best Regards,
Jonathan

It might be worth swapping out the librdkafka.dll binary for the one from the 0.9.5 librdkafka.redist NuGet package. Sorry, I haven't had a chance to look into this in detail, but it could be related to another issue I think we have, and if so this might fix it.

Thanks for the feedback - this is important, I'll get to it.

No problem, I'll try to test it tomorrow or Friday and post back the results.

Apparently the behavior is the same with the 0.9.5 dll.

I think this has to do with how I use the client. I load messages into a ConcurrentQueue and dequeue them for processing. If processing a message from the queue throws an exception, I roll back a DB transaction. By that time, the messages are out of the queue and have been garbage-collected. I then try to rewind to the previously committed offset to retrieve them again. This is not a "replay" per se, since it happens within the same session. If I stop my program and start it again, I do receive the messages again.
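The rollback pattern described above can be modeled in a few lines. Buffered messages are processed inside a transaction. On failure, the buffer is discarded and the fetch position is moved back to the last committed offset, so that the next poll redelivers the messages.

This is a sketch under stated assumptions. `ToyConsumer`, `process_with_rollback` and the in-memory `store` are hypothetical stand-ins for the consumer, the processing loop and the database. The rewind step is the seek this issue asks for, which committing alone cannot provide. Following the Kafka convention, the committed offset is the offset of the next message to read.

```python
from collections import deque
from typing import Callable, Deque, List


class ToyConsumer:
    """Hypothetical single-partition consumer; not the Confluent.Kafka API."""

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.position = 0   # next offset to fetch
        self.committed = 0  # next offset to read after a restart

    def poll_batch(self, n: int) -> List[str]:
        batch = self.log[self.position:self.position + n]
        self.position += len(batch)
        return batch

    def commit(self) -> None:
        self.committed = self.position

    def seek(self, offset: int) -> None:
        self.position = offset


def process_with_rollback(consumer: ToyConsumer,
                          handle: Callable[[str], str],
                          batch_size: int,
                          store: List[str]) -> None:
    queue: Deque[str] = deque(consumer.poll_batch(batch_size))
    txn: List[str] = []                # stands in for the DB transaction
    try:
        while queue:
            txn.append(handle(queue.popleft()))
        store.extend(txn)              # "commit" the DB transaction
        consumer.commit()              # then commit the Kafka offset
    except Exception:
        # txn is dropped (rollback) and buffered messages are discarded,
        # so rewind the fetch position to fetch them again.
        queue.clear()
        consumer.seek(consumer.committed)


failed_once = set()


def handle(msg: str) -> str:
    # Fails the first time it sees "m1" to simulate a DB error.
    if msg == "m1" and msg not in failed_once:
        failed_once.add(msg)
        raise RuntimeError("simulated DB failure")
    return msg.upper()


store: List[str] = []
c = ToyConsumer(["m0", "m1", "m2"])
process_with_rollback(c, handle, 3, store)  # fails on m1, rewinds to offset 0
process_with_rollback(c, handle, 3, store)  # replays m0..m2 and commits
```

Committing the database transaction before the Kafka offset means that a crash between the two steps causes redelivery rather than loss. Processing is therefore at-least-once.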

Would committing an offset from before the start of the current session work? That is workable but hardly optimal, since new partitions could get stuck the same way.

Any other options for this use-case ?

OK, I've had a chance to read this properly now :-) Yes, committing won't rewind the consumer. You could re-assign the partition, but that is hardly optimal. Consumer.Seek will be very easy to add (librdkafka already exposes it); I'll try to do this soon.

Hi @mhowlett,
Has there been any progress on adding Consumer.Seek? It would greatly ease my use cases as well. Please 👍

I did the implementation but didn't complete testing due to a bug in librdkafka. However, you probably don't care about that. I opened a WIP PR (#315) so you can get a NuGet package with Consumer.Seek in it; see README.md for instructions on how to get it.

Thank you, mhowlett. The tests on our end are green and we are moving to production with ci-48; Seek() works like a charm.
