Confluent-kafka-dotnet: Consumer subscribed to multiple topics only fetches message to a single topic

Created on 24 May 2019  路  1Comment  路  Source: confluentinc/confluent-kafka-dotnet

Description

Consumer subscribed to multiple topics only fetches message to a single topic.

How to reproduce

Confluent.Kafka nuget version - 1.0.0
Kafka type - Kafka enabled Azure event hub

Following is the consumer code -

```csharp
public static void Run_Consume(string brokerList, List topics, CancellationToken cancellationToken, string connStr, string cacertlocation)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
SslCaLocation = cacertlocation,

            GroupId = "consumer-dotnet",
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };

        const int commitPeriod = 5;
        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
            })
            .Build())
        {
            consumer.Subscribe(topics);
            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);

                        if (consumeResult.IsPartitionEOF)
                        {
                            Console.WriteLine(
                                $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                            continue;
                        }

                        Console.WriteLine("--------------------------------------------------------------------------------------");
                        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
                        Console.WriteLine("--------------------------------------------------------------------------------------");

                        if (consumeResult.Offset % commitPeriod == 0)
                        {
                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine($"Commit error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Closing consumer.");
                consumer.Close();
            }
        }
    }

```

question

>All comments

Was this page helpful?
0 / 5 - 0 ratings

Related issues

nitinpi picture nitinpi  路  4Comments

zoeysaurusrex picture zoeysaurusrex  路  4Comments

Eibwen picture Eibwen  路  3Comments

MihaiComan87 picture MihaiComan87  路  3Comments

andreas-soroko picture andreas-soroko  路  3Comments