Confluent-kafka-dotnet: Transaction in Confluent.Kafka at Constumer

Created on 30 Jun 2019  路  4Comments  路  Source: confluentinc/confluent-kafka-dotnet

Description

I'm using Apache Kafka as a Message Processor and using Confluent.Kafka in Asp.Net Core as a consumer.

I want to consume messages and save in database, Apparently, I need to a transaction for commit or rollback message in Queue

How to reproduce

`using (var c = new ConsumerBuilder(conf).Build())
{
c.Subscribe("testtopic");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token); // I NEED TRANSACTION HERE

                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }

`

Checklist

Please provide the following information:

  • [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version.
  • [x] Apache Kafka version.
  • [ ] Client configuration.
  • [ ] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [ ] Critical issue.
question

Most helpful comment

you can't transactionally write data to your database and commit offsets to kafka unless you also use your database to manage kafka offsets (which you could do).

however, if you want to 'upsert' into the database (or any idempotent operation), it's likely the best solution for you is to set EnableAutoCommit to false, then use Commit after you've successfully written the data to your external database. You might also consider setting EnableAutoOffsetStore to false and using StoreOffsets instead of Commit.

All 4 comments

you can't transactionally write data to your database and commit offsets to kafka unless you also use your database to manage kafka offsets (which you could do).

however, if you want to 'upsert' into the database (or any idempotent operation), it's likely the best solution for you is to set EnableAutoCommit to false, then use Commit after you've successfully written the data to your external database. You might also consider setting EnableAutoOffsetStore to false and using StoreOffsets instead of Commit.

@mhowlett Any idea when producer transactions will be available in the dotnet kafka lib ? Don't know whether it's appropriate or not to post this question here. Will remove and create a separate question if requested.

Thanks!

this was merged in v1.4.0

Was this page helpful?
0 / 5 - 0 ratings