Consumer subscribed to multiple topics only fetches message to a single topic.
Confluent.Kafka nuget version - 1.0.0
Kafka type - Kafka enabled Azure event hub
Following is the consumer code -
```csharp
public static void Run_Consume(string brokerList, List
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
SslCaLocation = cacertlocation,
GroupId = "consumer-dotnet",
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true
};
const int commitPeriod = 5;
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build())
{
consumer.Subscribe(topics);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
Console.WriteLine("--------------------------------------------------------------------------------------");
if (consumeResult.Offset % commitPeriod == 0)
{
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}
}
```
https://github.com/edenhill/librdkafka/wiki/FAQ#how-are-partitions-fetched