Confluent-kafka-dotnet: Consumer reassignment to Offset.End does not work as expected

Created on 18 Apr 2020 · 13 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

When a consumer is assigned to the end of a partition, then unassigned, and some time later assigned to the end of the partition again, it resumes consuming from the offset at which it was unassigned. The expected behaviour is to consume from the current end of the partition.

How to reproduce

```C#
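using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

// kafkaBrokers, testTopic (a TopicPartition) and logger are defined elsewhere.
var producer =
    new ProducerBuilder<Null, string>(
            new ProducerConfig
            {
                BootstrapServers = kafkaBrokers,
                Acks = Acks.None
            })
        .Build();

// Produce one message per second; the value is the current UTC time.
Task.Run(
    async () =>
    {
        while (true)
        {
            producer.Produce(
                testTopic,
                new Message<Null, string>
                {
                    Value = DateTime.UtcNow.ToString()
                });
            await Task.Delay(1000);
        }
    });

var consumer =
    new ConsumerBuilder<Ignore, string>(
            new ConsumerConfig
            {
                GroupId = Guid.NewGuid().ToString(),
                BootstrapServers = kafkaBrokers
            })
        .Build();

// Consume on a background thread (this runs concurrently with the Assign calls below).
Task.Run(
    () =>
    {
        while (true)
        {
            var consumeResult = consumer.Consume();
            logger.Info($"Consumed: {consumeResult.Value}");
        }
    });

consumer.Assign(new TopicPartitionOffset(testTopic, Offset.End));
logger.Info("Subscribed.");

// After 5 s: remove the assignment.
Task.Delay(TimeSpan.FromSeconds(5)).ContinueWith(t =>
{
    consumer.Assign(Enumerable.Empty<TopicPartition>());
    logger.Info("Unsubscribed.");
});

// After 10 s: assign to the end of the partition again.
Task.Delay(TimeSpan.FromSeconds(10)).ContinueWith(t =>
{
    consumer.Assign(new TopicPartitionOffset(testTopic, Offset.End));
    logger.Info("Subscribed.");
});

// After 15 s: remove the assignment again.
Task.Delay(TimeSpan.FromSeconds(15)).ContinueWith(t =>
{
    consumer.Assign(Enumerable.Empty<TopicPartition>());
    logger.Info("Unsubscribed.");
});
```

Actual result:

```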
2020-04-18 13:27:29,853 INFO [1]: Log - Subscribed.
2020-04-18 13:27:31,115 INFO [5]: Log - Consumed: 18.04.2020 13:27:30
2020-04-18 13:27:32,159 INFO [5]: Log - Consumed: 18.04.2020 13:27:31
2020-04-18 13:27:33,209 INFO [5]: Log - Consumed: 18.04.2020 13:27:32
2020-04-18 13:27:34,111 INFO [5]: Log - Consumed: 18.04.2020 13:27:33
2020-04-18 13:27:34,863 INFO [4]: Log - Unsubscribed.
2020-04-18 13:27:39,870 INFO [4]: Log - Subscribed.
2020-04-18 13:27:39,999 INFO [5]: Log - Consumed: 18.04.2020 13:27:34
2020-04-18 13:27:40,000 INFO [5]: Log - Consumed: 18.04.2020 13:27:35
2020-04-18 13:27:40,000 INFO [5]: Log - Consumed: 18.04.2020 13:27:36
2020-04-18 13:27:40,000 INFO [5]: Log - Consumed: 18.04.2020 13:27:37
2020-04-18 13:27:40,000 INFO [5]: Log - Consumed: 18.04.2020 13:27:38
2020-04-18 13:27:40,130 INFO [5]: Log - Consumed: 18.04.2020 13:27:39
2020-04-18 13:27:41,182 INFO [5]: Log - Consumed: 18.04.2020 13:27:40
2020-04-18 13:27:42,231 INFO [5]: Log - Consumed: 18.04.2020 13:27:41
2020-04-18 13:27:43,133 INFO [5]: Log - Consumed: 18.04.2020 13:27:42
2020-04-18 13:27:44,181 INFO [5]: Log - Consumed: 18.04.2020 13:27:43
2020-04-18 13:27:44,870 INFO [8]: Log - Unsubscribed.
```

Expected result:

```
2020-04-18 13:27:29,853 INFO [1]: Log - Subscribed.
2020-04-18 13:27:31,115 INFO [5]: Log - Consumed: 18.04.2020 13:27:30
2020-04-18 13:27:32,159 INFO [5]: Log - Consumed: 18.04.2020 13:27:31
2020-04-18 13:27:33,209 INFO [5]: Log - Consumed: 18.04.2020 13:27:32
2020-04-18 13:27:34,111 INFO [5]: Log - Consumed: 18.04.2020 13:27:33
2020-04-18 13:27:34,863 INFO [4]: Log - Unsubscribed.
2020-04-18 13:27:39,870 INFO [4]: Log - Subscribed.
2020-04-18 13:27:40,130 INFO [5]: Log - Consumed: 18.04.2020 13:27:39
2020-04-18 13:27:41,182 INFO [5]: Log - Consumed: 18.04.2020 13:27:40
2020-04-18 13:27:42,231 INFO [5]: Log - Consumed: 18.04.2020 13:27:41
2020-04-18 13:27:43,133 INFO [5]: Log - Consumed: 18.04.2020 13:27:42
2020-04-18 13:27:44,181 INFO [5]: Log - Consumed: 18.04.2020 13:27:43
2020-04-18 13:27:44,870 INFO [8]: Log - Unsubscribed.
```

bug

All 13 comments

You're calling Assign concurrently with Consume, and I suspect that is the problem, since librdkafka was not designed to be used that way. I believe this should work if you interleave the calls (use Consume with a timeout). Without having thought about or investigated this too much, I think we should probably throw an exception in this scenario. Marking as an enhancement to implement that.
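One way to interleave the calls while still changing assignments from other threads is to queue the changes and let the consuming thread apply them between `Consume(timeout)` calls. This is a hypothetical sketch (`ConsumeLoop`, `Enqueue` and `Run` are not Confluent.Kafka APIs). It takes delegates rather than an `IConsumer` so it can be exercised without a broker; in real use `consume` would be `t => consumer.Consume(t)` and each queued action a call such as `consumer.Assign(...)`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: other threads enqueue assignment changes, and the single
// consuming thread applies them between consume calls, so Assign and Consume
// never run at the same time.
public sealed class ConsumeLoop<TResult> where TResult : class
{
    private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();
    private readonly Func<TimeSpan, TResult> _consume; // e.g. t => consumer.Consume(t)
    private readonly Action<TResult> _handle;
    private readonly TimeSpan _timeout;

    public ConsumeLoop(Func<TimeSpan, TResult> consume, Action<TResult> handle, TimeSpan timeout)
    {
        _consume = consume;
        _handle = handle;
        _timeout = timeout;
    }

    // Safe to call from any thread; the action runs later on the consuming thread.
    public void Enqueue(Action assignmentChange) => _pending.Enqueue(assignmentChange);

    // Must run on exactly one thread until cancellation is requested.
    public void Run(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Apply queued Assign/Unassign calls before polling again.
            while (_pending.TryDequeue(out var change))
                change();

            // A bounded timeout means queued changes are picked up promptly.
            var result = _consume(_timeout);
            if (result != null)
                _handle(result);
        }
    }
}
```

Because every queued `Assign` executes on the same thread as `Consume`, the two calls are serialized by construction rather than by locks or events.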

@mhowlett this is a rather simplified example. In production code I actually have Assign and Consume synchronized: I pause the consumer thread before assigning and resume it afterwards.

What I see here is that Offset.End means two different things depending on whether the consumer instance was already assigned to the TP. If it was not, the latest offset is fetched from the broker. If it was, the offset from before the unassignment is used.

I work around this by querying the latest offset from the brokers before assignment, but IMO the behaviour of Offset.End is very confusing and may cause issues for users who don't know how it really works.

At the very least this behaviour should be documented, but maybe it's worth considering making Assign query the latest offset when the user specifies Offset.End.
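A sketch of the workaround described above, querying the high watermark and assigning an explicit offset instead of `Offset.End`, written as an extension method. `AssignAtCurrentEnd` is a hypothetical name; `QueryWatermarkOffsets` and `Assign` are the same Confluent.Kafka calls used in the reproductions in this thread. It costs one broker round trip per partition, and `QueryWatermarkOffsets` should throw a `KafkaException` if that query fails or times out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical extension (not part of Confluent.Kafka): resolves "end of partition"
// to a concrete offset at call time instead of passing Offset.End to Assign.
public static class ConsumerAssignExtensions
{
    public static void AssignAtCurrentEnd<TKey, TValue>(
        this IConsumer<TKey, TValue> consumer,
        IEnumerable<TopicPartition> partitions,
        TimeSpan queryTimeout)
    {
        List<TopicPartitionOffset> offsets = partitions
            .Select(tp =>
            {
                // Broker round trip for this partition's low/high watermarks.
                WatermarkOffsets watermarks = consumer.QueryWatermarkOffsets(tp, queryTimeout);
                // High watermark: the offset just past the last currently readable message.
                return new TopicPartitionOffset(tp, watermarks.High);
            })
            .ToList();

        // In this issue's reproductions, explicit offsets were honoured after
        // reassignment where Offset.End was not.
        consumer.Assign(offsets);
    }
}
```

Messages produced between the query and the assignment get offsets at or above `High`, so they are still delivered, which matches the "end at assignment time" semantics the reporter expected.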

I agree, this sounds like a bug in librdkafka.

Can you share a more detailed program flow with regard to pause, assign, resume, etc.?

@edenhill sure, I will share it tomorrow. It's basically a consumer on a separate thread plus two ManualResetEventSlim objects: the first one lets the consumer thread know when to pause, and the second one when to unpause. This was done to synchronize access to the dictionary where I store the offset for each TP the consumer is assigned to. But it also means that Assign and Consume are never called simultaneously.
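A minimal, broker-free sketch of a two-`ManualResetEventSlim` pause/resume handshake like the one described above, assuming one worker thread and one controlling thread. `PausableWorker` is a hypothetical name and this is not the author's code; `step` stands in for one `consumer.Consume(timeout)` call plus message handling. `Pause()` returns only after the worker has parked, so shared state such as the offsets dictionary can be modified safely until `Resume()` is called.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: one worker thread repeatedly runs `step`; a controlling
// thread can park it with Pause() and release it with Resume().
public sealed class PausableWorker
{
    // Set = the worker may run; reset = the worker should park.
    private readonly ManualResetEventSlim _working = new ManualResetEventSlim(false);
    // Set by the worker once it is parked and no longer inside `step`.
    private readonly ManualResetEventSlim _paused = new ManualResetEventSlim(false);
    private readonly Action _step;

    public PausableWorker(Action step) => _step = step;

    public void Run(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!_working.IsSet)
            {
                _paused.Set();                 // tell the controller we are parked
                try { _working.Wait(token); }  // park until Resume() or cancellation
                catch (OperationCanceledException) { break; }
                _paused.Reset();
                continue;                      // re-check the flag before running a step
            }

            _step();                           // e.g. one Consume(timeout) call
        }
    }

    // Returns only after the worker has parked.
    public void Pause()
    {
        _working.Reset();
        _paused.Wait();
    }

    public void Resume() => _working.Set();
}
```

Passing the cancellation token to `Wait` lets a parked worker exit on shutdown instead of blocking until someone calls `Resume()`.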

I just made a test for this: #1256. You mentioned pausing the consumer, so I included that (and also tried a few other variants). These all pass for me. It would be good to replicate the issue on a single thread to rule out synchronization bugs. Can you make a variation of that integration test that fails?

@mhowlett sorry, I confused you when I talked about pausing/unpausing. Here is what my consumer actually looks like:
```C#
public interface IKafkaConsumerListener<T>
{
    void OnConsumed(T value);
}

public interface IKafkaPartitioner<TValue>
{
    int MaxPartitions { get; }
    Func<TValue, string> KeySelector { get; }
    Partition GetPartition(TValue message);
    Partition GetPartition(string partitionKey);
}

public class KafkaConsumer<T> : IDisposable
{
private static readonly ILog Log = LogManager.GetLogger("KafkaConsumer");

private readonly object _lock = new object();
private readonly SubscriptionsList _subscriptions = new SubscriptionsList();
private readonly Dictionary<TopicPartition, Offset> _topicPartitionOffsets = new Dictionary<TopicPartition, Offset>();
private readonly IKafkaPartitioner<T> _partitioner;
private readonly IKafkaConsumerListener<T> _listener;
private readonly IConsumer<Ignore, T> _consumer;

// Consumer state
private bool _consuming => _consumerWorking.IsSet;
private bool _disposed;
private CancellationTokenSource _consumeCancel = new CancellationTokenSource();
private TimeSpan _poolInterval = TimeSpan.FromMilliseconds(50);
private Task _consumeTask;
private readonly ManualResetEventSlim _consumerWorking = new ManualResetEventSlim(false);
private readonly ManualResetEventSlim _consumerPaused = new ManualResetEventSlim(false);

public KafkaConsumer(
    BrokerEndpoints brokerEndpoints,
    IDeserializer<T> deserializer,
    IKafkaPartitioner<T> partitioner,
    IKafkaConsumerListener<T> listener)
{
    _partitioner = partitioner;
    _listener = listener;
    _consumer =
        new ConsumerBuilder<Ignore, T>(
                new ConsumerConfig
                {
                    GroupId = Guid.NewGuid().ToString(),
                    BootstrapServers = brokerEndpoints.ToCsvString(),
                    EnableAutoCommit = false,
                    EnableAutoOffsetStore = false,
                    ConsumeResultFields = "topic",
                    AutoOffsetReset = AutoOffsetReset.Latest,
                    FetchWaitMaxMs = 50,
                    FetchErrorBackoffMs = 100
                })
            .SetLogHandler((_, m) => Log.Debug($"{m.Level}: {m.Name} {m.Message}"))
            .SetErrorHandler((_, e) => Log.Error(e))
            .SetValueDeserializer(deserializer)
            .Build();
}

public void Subscribe(string topic, string partitionKey)
{
    lock (_lock)
    {
        if (_disposed)
            throw new ObjectDisposedException("Consumer was disposed");

        var topicPartition = new TopicPartition(topic, _partitioner.GetPartition(partitionKey));
        if (_subscriptions.Contains(topicPartition, partitionKey)) return;

        StopConsumer();
        if (!_subscriptions.Contains(topicPartition))
        {
            _topicPartitionOffsets.Add(topicPartition, GetLatestOffset(topicPartition));
            Log.Debug($"Subscribed to {topic}({partitionKey}){topicPartition.Partition}");
        }
        _subscriptions.Add(topicPartition, partitionKey);
        StartConsumer();
    }
}

public void Unsubscribe(string topic, string partitionKey)
{
    lock (_lock)
    {
        if (_disposed)
            throw new ObjectDisposedException("Consumer was disposed");

        var topicPartition = new TopicPartition(topic, _partitioner.GetPartition(partitionKey));
        if (!_subscriptions.Contains(topicPartition, partitionKey)) return;

        StopConsumer();
        _subscriptions.Remove(topicPartition, partitionKey);
        if (!_subscriptions.Contains(topicPartition))
        {
            _topicPartitionOffsets.Remove(topicPartition);
            Log.Debug($"Unsubscribed from {topic}({partitionKey}){topicPartition.Partition}");
        }
        StartConsumer();
    }
}

private Offset GetLatestOffset(TopicPartition topicPartition)
{
    var offset = _consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
    return offset.High - 1;
}

private void StartConsumer()
{
    if (_consuming)
        throw new InvalidOperationException();

    _consumer.Assign(_topicPartitionOffsets.Select(kv => new TopicPartitionOffset(kv.Key, kv.Value + 1)));
    _consumerWorking.Set();
    if (_consumeTask == null)
    {
        _consumeTask = Task.Run(
        async () =>
        {
            Log.Debug("Consumer started");

            while (!_consumeCancel.IsCancellationRequested)
            {
                if (!_consumerWorking.IsSet)
                {
                    _consumerPaused.Set();
                    _consumerWorking.Wait();
                    _consumerPaused.Reset();
                }

                ConsumeResult<Ignore, T> consumeResult = null;
                try
                {
                    consumeResult = _consumer.Consume(TimeSpan.Zero);
                }
                catch (ConsumeException e)
                {
                    Log.Error(e.Error);
                    // Kafka client handles most of the errors internally, so we just need to keep consuming
                }

                if (consumeResult == null)
                {
                    try
                    {
                        await Task.Delay(_poolInterval, _consumeCancel.Token);
                    }
                    catch (TaskCanceledException)
                    {
                        // ignore
                    }

                    continue;
                }

                _topicPartitionOffsets[consumeResult.TopicPartition] = consumeResult.Offset;

                var partitionKey = _partitioner.KeySelector(consumeResult.Value);
                if (!_subscriptions.Contains(consumeResult.TopicPartition, partitionKey))
                {
                    continue;
                }

                try
                {
                    _listener.OnConsumed(consumeResult.Value);
                }
                catch (Exception e)
                {
                    Log.Error("Consumer listener error", e);
                }
            }

            _consumer.Close();
            Log.Debug("Consumer stopped");
        });
    }
}

private void StopConsumer()
{
    if (!_consuming)
        return;

    _consumerWorking.Reset();
    _consumerPaused.Wait();
}

public void Dispose()
{
    lock (_lock)
    {
        if (_disposed)
            return;

        _consumeCancel.Cancel();
        _disposed = true;
        Log.Debug("Consumer disposed");
    }
}

private class SubscriptionsList
{
    private readonly Dictionary<TopicPartition, HashSet<string>> _items = new Dictionary<TopicPartition, HashSet<string>>();

    public bool Contains(TopicPartition topicPartition) => _items.ContainsKey(topicPartition);
    public bool Contains(TopicPartition topicPartition, string partitionKey) =>
        _items.TryGetValue(topicPartition, out var partitions) && partitions.Contains(partitionKey);

    public void Add(TopicPartition topicPartition, string partitionKey)
    {
        if (_items.TryGetValue(topicPartition, out var partitionKeys))
            partitionKeys.Add(partitionKey);
        else
            _items[topicPartition] = new HashSet<string> { partitionKey };
    }

    public void Remove(TopicPartition topicPartition, string partitionKey)
    {
        _items[topicPartition].Remove(partitionKey);
        if (_items[topicPartition].Count == 0)
            _items.Remove(topicPartition);
    }
}

}
```

So I only pause the consumer thread, not the consumer itself. Note that the whole point of this class is to let the client subscribe to multiple TopicPartitions dynamically, and to deliver only the subset of messages in a TP that the client is interested in (by using `partitionKey`). Hopefully this is not too crazy 😄.

This example already includes a workaround for the Offset.End problem that this ticket is about; see the GetLatestOffset method. Anyway, I will provide a better example of the Offset.End problem later today.

@mhowlett here is a better reproduction, which does not access the consumer concurrently:
```C#
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging; // logger.LogInformation

// kafkaBrokers, testTopic (a TopicPartition) and logger are defined elsewhere.
var producer =
    new ProducerBuilder<Null, string>(
            new ProducerConfig
            {
                BootstrapServers = kafkaBrokers,
                Acks = Acks.None
            })
        .Build();

// Produce one message per second in the background.
Task.Run(
    async () =>
    {
        while (true)
        {
            producer.Produce(
                testTopic,
                new Message<Null, string>
                {
                    Value = DateTime.UtcNow.ToString()
                });
            await Task.Delay(1000);
        }
    });

var consumer =
    new ConsumerBuilder<Ignore, string>(
            new ConsumerConfig
            {
                GroupId = Guid.NewGuid().ToString(),
                BootstrapServers = kafkaBrokers
            })
        .Build();

// All Consume and Assign calls below happen on this one thread.
void Consume(int n)
{
    for (int i = 0; i < n; i++)
    {
        Thread.Sleep(1000);
        var consumeResult = consumer.Consume(TimeSpan.FromSeconds(0.5));
        logger.LogInformation($"Consumed: {consumeResult?.Offset} {consumeResult?.Value ?? "nothing"}");
    }
}

consumer.Assign(new TopicPartitionOffset(testTopic, Offset.End));
logger.LogInformation("Subscribed.");

Consume(5);

consumer.Assign(Enumerable.Empty<TopicPartition>());
logger.LogInformation("Unsubscribed.");

Consume(5);

consumer.Assign(new TopicPartitionOffset(testTopic, Offset.End));
logger.LogInformation("Subscribed.");

Consume(5);
```

This code produces the following log:

```
2020-04-21 22:56:16,983 INFO [1]: Test.Program - Subscribed.
2020-04-21 22:56:18,498 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:19,501 INFO [1]: Test.Program - Consumed: 6702 21.04.2020 19:56:18
2020-04-21 22:56:20,502 INFO [1]: Test.Program - Consumed: 6703 21.04.2020 19:56:19
2020-04-21 22:56:21,503 INFO [1]: Test.Program - Consumed: 6704 21.04.2020 19:56:20
2020-04-21 22:56:22,504 INFO [1]: Test.Program - Consumed: 6705 21.04.2020 19:56:22
2020-04-21 22:56:22,506 INFO [1]: Test.Program - Unsubscribed.
2020-04-21 22:56:24,008 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:25,510 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:27,011 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:28,513 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:30,015 INFO [1]: Test.Program - Consumed: nothing
2020-04-21 22:56:30,016 INFO [1]: Test.Program - Subscribed.
2020-04-21 22:56:31,017 INFO [1]: Test.Program - Consumed: 6706 21.04.2020 19:56:23
2020-04-21 22:56:32,018 INFO [1]: Test.Program - Consumed: 6707 21.04.2020 19:56:24
2020-04-21 22:56:33,020 INFO [1]: Test.Program - Consumed: 6708 21.04.2020 19:56:25
2020-04-21 22:56:34,021 INFO [1]: Test.Program - Consumed: 6709 21.04.2020 19:56:26
2020-04-21 22:56:35,023 INFO [1]: Test.Program - Consumed: 6710 21.04.2020 19:56:27
```

As you can see, even though I subscribe to `Offset.End`, it still does not give me the most recent messages. On the other hand, consider the following code:

```C#
consumer.Assign(new TopicPartitionOffset(testTopic, Offset.End));
logger.LogInformation("Subscribed.");

Consume(5);

consumer.Assign(Enumerable.Empty<TopicPartition>());
logger.LogInformation("Unsubscribed.");

Consume(5);

var offset = consumer.QueryWatermarkOffsets(testTopic, TimeSpan.FromSeconds(2));
consumer.Assign(new TopicPartitionOffset(testTopic, offset.High));
logger.LogInformation("Subscribed.");

Consume(5);
```

This way I get the latest messages:

```
2020-04-21 22:58:51,518 INFO  [1]: Test.Program - Subscribed.
2020-04-21 22:58:52,805 INFO  [1]: Test.Program - Consumed: 6852 21.04.2020 19:58:52
2020-04-21 22:58:53,806 INFO  [1]: Test.Program - Consumed: 6853 21.04.2020 19:58:53
2020-04-21 22:58:54,808 INFO  [1]: Test.Program - Consumed: 6854 21.04.2020 19:58:54
2020-04-21 22:58:55,809 INFO  [1]: Test.Program - Consumed: 6855 21.04.2020 19:58:55
2020-04-21 22:58:56,810 INFO  [1]: Test.Program - Consumed: 6856 21.04.2020 19:58:56
2020-04-21 22:58:56,811 INFO  [1]: Test.Program - Unsubscribed.
2020-04-21 22:58:58,313 INFO  [1]: Test.Program - Consumed:  nothing
2020-04-21 22:58:59,814 INFO  [1]: Test.Program - Consumed:  nothing
2020-04-21 22:59:01,316 INFO  [1]: Test.Program - Consumed:  nothing
2020-04-21 22:59:02,817 INFO  [1]: Test.Program - Consumed:  nothing
2020-04-21 22:59:04,318 INFO  [1]: Test.Program - Consumed:  nothing
2020-04-21 22:59:04,614 INFO  [1]: Test.Program - Subscribed.
2020-04-21 22:59:05,616 INFO  [1]: Test.Program - Consumed: 6864 21.04.2020 19:59:04
2020-04-21 22:59:06,617 INFO  [1]: Test.Program - Consumed: 6865 21.04.2020 19:59:05
2020-04-21 22:59:07,618 INFO  [1]: Test.Program - Consumed: 6866 21.04.2020 19:59:06
2020-04-21 22:59:08,618 INFO  [1]: Test.Program - Consumed: 6867 21.04.2020 19:59:07
2020-04-21 22:59:09,619 INFO  [1]: Test.Program - Consumed: 6868 21.04.2020 19:59:08
```

Thanks for the attention to detail. I have replicated this and made some progress in turning your scenario into an integration test case here: https://github.com/mhowlett/confluent-kafka-dotnet/blob/assign2test/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Assign2.cs

It does indeed seem to be a bug.

I wonder if this will be fixed by https://github.com/edenhill/librdkafka/issues/2782?

Thanks for following up. https://github.com/edenhill/librdkafka/issues/2782 is fixed in 1.4.2, but unfortunately I can still replicate the problem using the Consumer_Assign2 test above. FYI: @edenhill

The issue is here (identified by @edenhill): https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_offset.c#L793. The cached offset is being used when it isn't valid. Expect a fix in 1.5.

Fixed in https://github.com/edenhill/librdkafka/pull/2876 (will be released in v1.5).
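If you keep the watermark workaround until you can upgrade, a version guard could decide when it is still needed. This is a minimal sketch under stated assumptions: `OffsetEndWorkaround` and `IsNeeded` are hypothetical names, it assumes the fix ships in librdkafka 1.5.0 as stated above, and it relies on the `0xMMmmrrxx` version layout that Confluent.Kafka documents for `Library.Version`. Treating every 1.5.0 build, pre-releases included, as fixed is also an assumption.

```csharp
using System;

// Hypothetical helper: decides whether the Offset.End workaround is still needed.
public static class OffsetEndWorkaround
{
    // librdkafka packs its version as 0xMMmmrrxx (major, minor, revision, pre-release id).
    // 0x01050000 is the lowest value any 1.5.0 build can have; assumed to contain the fix.
    private const int FirstFixedVersion = 0x01050000;

    public static bool IsNeeded(int librdkafkaVersion) => librdkafkaVersion < FirstFixedVersion;
}
```

In application code the argument would be `Confluent.Kafka.Library.Version`. When `IsNeeded` returns true, query `QueryWatermarkOffsets` and assign `High` explicitly, as in the second reproduction above; otherwise `Offset.End` can be passed directly.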
