The kafka input used for the azure filebeat module has stopped receiving any logs recently.
Debugging the issue showed that Azure is returning an OffsetFetchResponse with an empty partition table, which causes sarama to give up since it has no blocks to fetch.
We suspect a change on the Azure Event Hub side, possibly in the message format.
Existing eventhubs and new eventhubs show the same behavior (subscription type irrelevant).
The specific failure in sarama is in offset_manager.go:fetchInitialOffset. In this function the call to broker.FetchOffset is successful, but instead of returning a nonempty OffsetFetchResponse as described here: OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]], we get ["topic", []], i.e. an empty map for the second entry. This causes a failure when it tries to fetch the corresponding block with resp.GetBlock.
The description of OffsetFetchResponse seems to suggest the map shouldn't be empty:
if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1.
In our case we see no offset field at all, but we aren't confident yet whether the Azure response is actually incorrect, or just exercising a corner case of the Kafka spec that sarama doesn't recognize.
More details: If we spoof the offset response by filling in the missing partition with an initial offset of -1, then the consumer can read the topic as usual. In addition, after any consumer group offset is committed, the event hub _will_ return it on future calls, i.e. an active consumer group will work with an unmodified Filebeat. This is why we observed some setups failing but not others: active consumer groups with a committed offset continued working after the change, but any newly-created consumer group fails.
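To make the spoofing idea concrete, here is a minimal sketch in Go. It does not use sarama itself: `offsetBlock`, `offsetFetchResponse`, `getBlock`, and `initialOffset` are hypothetical stand-ins, loosely modeled on the response shape described above, and this is not necessarily how a real fix would be structured. The only point is the fallback: when the requested partition is missing from the response, treat it as "no committed offset" (-1 with empty metadata) instead of failing.

```go
package main

import "fmt"

// offsetBlock is a hypothetical stand-in for one partition entry of an
// OffsetFetchResponse (Offset, Metadata, ErrorCode).
type offsetBlock struct {
	Offset   int64
	Metadata string
	Err      int16
}

// offsetFetchResponse is a hypothetical stand-in for the decoded response:
// topic name -> partition id -> block.
type offsetFetchResponse struct {
	Blocks map[string]map[int32]*offsetBlock
}

// getBlock returns nil when the topic or the partition is absent.
// Indexing a nil inner map is safe in Go and yields the zero value (nil).
func (r *offsetFetchResponse) getBlock(topic string, partition int32) *offsetBlock {
	if r == nil || r.Blocks == nil {
		return nil
	}
	return r.Blocks[topic][partition]
}

// initialOffset treats a missing block as "no committed offset" and
// returns -1 with empty metadata, which is what an uninitialized
// consumer group is expected to see.
func initialOffset(resp *offsetFetchResponse, topic string, partition int32) (int64, string) {
	block := resp.getBlock(topic, partition)
	if block == nil {
		return -1, ""
	}
	return block.Offset, block.Metadata
}

func main() {
	// Shape of the Azure response from this report: topic present, no partitions.
	azure := &offsetFetchResponse{
		Blocks: map[string]map[int32]*offsetBlock{"kafka-west": {}},
	}
	offset, metadata := initialOffset(azure, "kafka-west", 0)
	fmt.Printf("offset=%d metadata=%q\n", offset, metadata)
}
```

A committed offset still takes precedence in this sketch: the fallback only applies when the broker omits the partition entirely.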
The fundamental change seems to be that in the past, an Azure event hub would return an explicit offset map for uninitialized consumer groups (which is in keeping with my understanding of the Kafka spec, and with sarama's implementation), whereas now it only returns an offset map for topic / partition pairs that have received an explicit commit.
The remaining questions are:
If Azure considers this a bug then we can probably look forward to a fix soon. Otherwise we will need to coordinate with sarama, and might need to vendor a custom workaround in the short term.
Here are some captures of raw Kafka OffsetFetchRequest / OffsetFetchResponse pairs exhibiting the problem, in base64 plus annotation.
Baseline (running against Apache Kafka):
OffsetFetchRequest: AAp0ZXN0LWdyb3VwAAAAAQAKdGVzdC10b3BpYwAAAAEAAAAA
00000000: 000a 7465 7374 2d67 726f 7570 0000 0001 ..test-group....
00000010: 000a 7465 7374 2d74 6f70 6963 0000 0001 ..test-topic....
00000020: 0000 0000 ....
OffsetFetchResponse: AAAAAQAKdGVzdC10b3BpYwAAAAEAAAAA//////////8AAAAA
00000000: 0000 0001 000a 7465 7374 2d74 6f70 6963 ......test-topic
00000010: 0000 0001 0000 0000 ffff ffff ffff ffff ................
00000020: 0000 0000 ....
Running against an Azure Event Hub:
OffsetFetchRequest: AAgkRGVmYXVsdAAAAAEACmthZmthLXdlc3QAAAABAAAAAA==
00000000: 0008 2444 6566 6175 6c74 0000 0001 000a ..$Default......
00000010: 6b61 666b 612d 7765 7374 0000 0001 0000 kafka-west......
00000020: 0000 ..
OffsetFetchResponse: AAAAAQAKa2Fma2Etd2VzdAAAAAA=
00000000: 0000 0001 000a 6b61 666b 612d 7765 7374 ......kafka-west
00000010: 0000 0000 ....
Both of these were run against new consumer groups that had never read or committed an offset before.
As described in this guide, an OffsetFetchRequest is structured as ConsumerGroup [TopicName [Partition]] and an OffsetFetchResponse is structured [TopicName [Partition Offset Metadata ErrorCode]].
Interpreting the raw bytes above:
Baseline (Apache Kafka): the client sends ["test-group", [("test-topic", [0])]] (requesting an offset for partition 0 of test-topic for the group test-group) and gets back [("test-topic", [(0, -1, "", 0)])]. (This is also what Azure used to return for new consumer groups, at least through late October.)

Azure Event Hub: the client sends ["$Default", [("kafka-west", [0])]] (requesting an offset for partition 0 of kafka-west for the group $Default) and gets back [("kafka-west", [])] (an empty list rather than an initial offset for the requested partition).

This discrepancy is consistent for any new Azure consumer group (not just $Default) and for both single and multiple partitions. We haven't seen it in any other Kafka implementation, and in Azure the new behavior started recently: likely within the last two weeks, but certainly within the last three.
Hey - we've fixed this on the EH side. Will update thread when we've deployed the change.
@faec has implemented a workaround on our side (inside the sarama package) which we have released in 7.5.
@arerlend, thank you for the update. Please keep us posted on the deployment time; we would like to test any changes there.
@faec, we received confirmation that the fix has now been deployed to all regions.
@faec, do you still want to keep this issue open?
We are currently using the azure-eventhub input in the azure filebeat module.