I'm using Apache Kafka as a message processor, with Confluent.Kafka in ASP.NET Core as the consumer.
I want to consume messages and save them to a database. Apparently I need a transaction so that I can commit or roll back a message in the queue.
```csharp
// Assumption: conf is a ConsumerConfig defined elsewhere; the <Ignore, string>
// type arguments were lost from the original post and are assumed here.
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
    c.Subscribe("testtopic");
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };
    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token); // I NEED TRANSACTION HERE
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ensure the consumer leaves the group cleanly and final offsets are committed.
        c.Close();
    }
}
```
You can't transactionally write data to your database and commit offsets to Kafka unless you also use your database to manage the Kafka offsets (which you could do).
However, if you can 'upsert' into the database (or use any other idempotent operation), the best solution is likely to set `EnableAutoCommit` to `false`, then call `Commit` after you've successfully written the data to your external database. A crash between the write and the commit then only causes a redelivery, which the idempotent write absorbs. You might also consider setting `EnableAutoOffsetStore` to `false` and using `StoreOffsets` instead of `Commit`.
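The commit-after-write approach described above could look roughly like the sketch below. It is not a definitive implementation: the broker address, group id, and the `UpsertIntoDatabase` helper are hypothetical placeholders, and the `<string, string>` type arguments are an assumption about the topic's data.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    // Hypothetical stand-in for an idempotent database write
    // (for example an upsert keyed on the message key). Throws on failure.
    static void UpsertIntoDatabase(ConsumeResult<string, string> cr)
    {
        Console.WriteLine($"Upserting '{cr.Message.Value}' from {cr.TopicPartitionOffset}");
    }

    static void Main()
    {
        var conf = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumed broker address
            GroupId = "test-consumer-group",     // assumed group id
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false             // offsets are committed manually below
        };

        using (var c = new ConsumerBuilder<string, string>(conf).Build())
        {
            c.Subscribe("testtopic");
            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            try
            {
                while (true)
                {
                    var cr = c.Consume(cts.Token);
                    UpsertIntoDatabase(cr); // if this throws, the commit below is skipped
                    c.Commit(cr);           // synchronous commit, only after the write succeeded
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl+C was pressed; fall through to Close().
            }
            finally
            {
                c.Close(); // leave the consumer group cleanly
            }
        }
    }
}
```

If the write fails, the offset is never committed, so the message is delivered again after a restart or rebalance. That gives at-least-once delivery, which is why the write needs to be idempotent. For the second variant, the idea would be to set `EnableAutoOffsetStore = false`, leave auto commit enabled, and store the offset after the write instead of calling `Commit`; in the 1.x client the method appears to be named `StoreOffset`, so check the exact name against the version you use.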
@mhowlett Any idea when producer transactions will be available in the dotnet Kafka lib? I don't know whether it's appropriate to post this question here; I'll remove it and create a separate question if requested.
Thanks!
Producer transactions are now in preview: https://github.com/confluentinc/confluent-kafka-dotnet/pull/1147
This was merged in v1.4.0.
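For reference, a consume-and-produce step using the transactional producer API might be sketched as follows. This assumes Confluent.Kafka v1.4.0 or later; the broker address, group id, transactional id, and the `outputtopic` name are hypothetical. Note that these transactions cover Kafka reads and writes only: they do not make a write to an external database atomic with the offset commit.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        var consumerConf = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",          // assumed broker address
            GroupId = "test-consumer-group",              // assumed group id
            EnableAutoCommit = false,                     // offsets go through the transaction
            IsolationLevel = IsolationLevel.ReadCommitted // skip messages from aborted transactions
        };
        var producerConf = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            TransactionalId = "example-transactional-id"  // assumed id, stable per producer instance
        };

        using (var consumer = new ConsumerBuilder<string, string>(consumerConf).Build())
        using (var producer = new ProducerBuilder<string, string>(producerConf).Build())
        {
            consumer.Subscribe("testtopic");
            producer.InitTransactions(TimeSpan.FromSeconds(30));

            // Process a single message to keep the sketch short.
            var cr = consumer.Consume(TimeSpan.FromSeconds(10));
            if (cr == null) return; // nothing arrived within the timeout

            producer.BeginTransaction();
            try
            {
                producer.Produce("outputtopic", new Message<string, string>
                {
                    Key = cr.Message.Key,
                    Value = cr.Message.Value
                });

                // The consumed offset is committed as part of the same transaction.
                producer.SendOffsetsToTransaction(
                    new[] { new TopicPartitionOffset(cr.TopicPartition, cr.Offset + 1) },
                    consumer.ConsumerGroupMetadata,
                    TimeSpan.FromSeconds(30));

                producer.CommitTransaction(TimeSpan.FromSeconds(30));
            }
            catch (KafkaException)
            {
                // Roll back both the produced message and the offset commit.
                producer.AbortTransaction(TimeSpan.FromSeconds(30));
                throw;
            }
        }
    }
}
```

A real application would loop over messages and distinguish fatal errors from abortable ones; this sketch aborts on any `KafkaException` for brevity.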