I am trying to get topic/partition info from GroupInfo.Member.MemberAssignment. This property is a byte[], and I would like to know the best way to decode it. I tried several string encodings but could not parse the topics out cleanly.
Link: https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.GroupMemberInfo.html
This page says that the format of this data depends on the protocol type. What is this "protocol type" supposed to be?
I believe the format for consumers is:
MemberAssignment => Version PartitionAssignment UserData
  Version => int16
  PartitionAssignment => [Topic [Partition]]
    Topic => string
    Partition => int32
  UserData => bytes
This is in standard Kafka protocol notation, which you can look up. The value equals the most recent assignment sent in the SyncGroupResponse from the group leader. The UserData contents depend on the partition assignor and are typically empty.
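A strongly typed decoder for the layout above might look roughly like the sketch below. It is not part of Confluent.Kafka: the type and method names (`ConsumerMemberAssignment`, `TopicPartitions`, `ConsumerMemberAssignmentParser.Parse`) are hypothetical, it assumes .NET 5 or later (records, span-based `BinaryPrimitives` and `Encoding.UTF8.GetString`), and it treats UserData as optional as a defensive assumption.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Text;

// Hypothetical typed model of a consumer-protocol MemberAssignment (illustrative names).
public sealed record TopicPartitions(string Topic, IReadOnlyList<int> Partitions);
public sealed record ConsumerMemberAssignment(short Version, IReadOnlyList<TopicPartitions> Topics, byte[] UserData);

public static class ConsumerMemberAssignmentParser
{
    // Layout: Version(INT16) [Topic(STRING) [Partition(INT32)]] UserData(BYTES), all big-endian.
    // Slice() throws ArgumentOutOfRangeException if the buffer is truncated.
    public static ConsumerMemberAssignment Parse(ReadOnlySpan<byte> buf)
    {
        int pos = 0;
        short version = ReadInt16(buf, ref pos);

        int topicCount = ReadInt32(buf, ref pos); // ARRAY: INT32 count prefix
        var topics = new List<TopicPartitions>();
        for (int t = 0; t < topicCount; t++)
        {
            short nameLen = ReadInt16(buf, ref pos); // STRING: INT16 length prefix
            if (nameLen < 0) throw new FormatException("Negative topic name length.");
            string topic = Encoding.UTF8.GetString(buf.Slice(pos, nameLen));
            pos += nameLen;

            int partitionCount = ReadInt32(buf, ref pos);
            var partitions = new List<int>();
            for (int p = 0; p < partitionCount; p++)
                partitions.Add(ReadInt32(buf, ref pos));
            topics.Add(new TopicPartitions(topic, partitions));
        }

        // UserData: INT32 length (-1 means null) followed by that many bytes.
        // Treated as optional here (assumption), in case an encoder omits it.
        byte[] userData = null;
        if (buf.Length - pos >= 4)
        {
            int userDataLen = ReadInt32(buf, ref pos);
            if (userDataLen >= 0)
                userData = buf.Slice(pos, userDataLen).ToArray();
        }
        return new ConsumerMemberAssignment(version, topics, userData);
    }

    private static short ReadInt16(ReadOnlySpan<byte> buf, ref int pos)
    {
        short value = BinaryPrimitives.ReadInt16BigEndian(buf.Slice(pos, 2));
        pos += 2;
        return value;
    }

    private static int ReadInt32(ReadOnlySpan<byte> buf, ref int pos)
    {
        int value = BinaryPrimitives.ReadInt32BigEndian(buf.Slice(pos, 4));
        pos += 4;
        return value;
    }
}
```

For example, the 27-byte buffer `00 00 | 00 00 00 01 | 00 03 66 6F 6F | 00 00 00 02 | 00 00 00 00 | 00 00 00 07 | FF FF FF FF` parses as version 0, topic "foo" with partitions 0 and 7, and null UserData. Using `BinaryPrimitives` avoids manual byte swapping and works regardless of host endianness.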
We haven't yet implemented deserialization of this in librdkafka or in any of the non-Java clients.
Thank you!
@mhowlett is there an issue we can track for implementing deserialization of MemberAssignment into a strongly typed object? If there is no such issue, can we convert this one from a question into a feature request? It would be great to have one implementation of this deserialization instead of everyone writing their own.
Here's a little method I wrote to decode it:
using System;
using System.Text;

public static class MemberAssignmentDecoder {
    public static string DecodeMemberAssignment(byte[] b) {
        /*
        https://kafka.apache.org/protocol
        STRING Represents a sequence of characters. First the length N is given as an INT16. Then N bytes follow which are the UTF-8 encoding of the character sequence. Length must not be negative.
        INT16 Represents an integer between -2^15 and 2^15-1 inclusive. The values are encoded using two bytes in network byte order (big-endian).
        INT32 Represents an integer between -2^31 and 2^31-1 inclusive. The values are encoded using four bytes in network byte order (big-endian).
        BYTES Represents a raw sequence of bytes. First the length N is given as an INT32. Then N bytes follow.
        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
        MemberAssignment => Version PartitionAssignment UserData
          Version => int16
          PartitionAssignment => [Topic [Partition]]
            Topic => string
            Partition => int32
          UserData => bytes
        Note: [] denotes an ARRAY: an INT32 count N followed by N items of the same type.
        UserData is not decoded here.
        */
        var s = new StringBuilder();
        try {
            short version = SwapEndianness(BitConverter.ToInt16(b, 0)); // read but not used
            int num_topic_assignments = SwapEndianness(BitConverter.ToInt32(b, 2));
            int i = 6;
            for (int t = 0; t < num_topic_assignments; t++) {
                short topic_len = SwapEndianness(BitConverter.ToInt16(b, i));
                string topic = Encoding.UTF8.GetString(b, i + 2, topic_len);
                i += topic_len + 2;
                int num_partition = SwapEndianness(BitConverter.ToInt32(b, i));
                i += 4; // skip past the partition count
                if (s.Length > 0) s.Append("; ");
                s.Append($"{topic}: ");
                for (int j = 0; j < num_partition; j++) {
                    s.Append(SwapEndianness(BitConverter.ToInt32(b, i)));
                    if (j < num_partition - 1) s.Append(',');
                    i += 4; // move past this partition so the next topic is read from the right offset
                }
            }
            return s.ToString();
        } catch {
            return ""; // malformed or truncated assignment
        }
    }

    // Both swaps assume a little-endian host (BitConverter.IsLittleEndian == true).
    public static int SwapEndianness(int value) {
        var b1 = (value >> 0) & 0xff;
        var b2 = (value >> 8) & 0xff;
        var b3 = (value >> 16) & 0xff;
        var b4 = (value >> 24) & 0xff;
        return b1 << 24 | b2 << 16 | b3 << 8 | b4 << 0;
    }

    public static short SwapEndianness(short i) {
        // Mask each byte before shifting: (i << 8) + (i >> 8) sign-extends
        // and gives wrong results when the high byte is 0x80 or above.
        return (short)(((i & 0xff) << 8) | ((i >> 8) & 0xff));
    }
}
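One way to exercise a decoder like this without a running cluster is to hand-build assignment buffers in the same layout. The sketch below is a hypothetical test helper (`MemberAssignmentBuilder.Build` is not a Confluent.Kafka API); it assumes .NET Core 2.1 or later for `BinaryPrimitives` and `Stream.Write(ReadOnlySpan<byte>)`, and always writes a UserData field (length -1 when null).

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical test helper: encodes a consumer-protocol MemberAssignment
// (Version, [Topic [Partition]], UserData) in big-endian byte order.
public static class MemberAssignmentBuilder
{
    public static byte[] Build(short version,
                               IReadOnlyList<(string Topic, int[] Partitions)> topics,
                               byte[] userData = null)
    {
        using var ms = new MemoryStream();
        WriteInt16(ms, version);
        WriteInt32(ms, topics.Count);                   // ARRAY: INT32 count prefix
        foreach (var (topic, partitions) in topics)
        {
            byte[] name = Encoding.UTF8.GetBytes(topic);
            WriteInt16(ms, checked((short)name.Length)); // STRING: INT16 length prefix
            ms.Write(name, 0, name.Length);
            WriteInt32(ms, partitions.Length);
            foreach (int p in partitions)
                WriteInt32(ms, p);
        }
        // BYTES: INT32 length (-1 for null), then the raw bytes
        WriteInt32(ms, userData == null ? -1 : userData.Length);
        if (userData != null)
            ms.Write(userData, 0, userData.Length);
        return ms.ToArray();
    }

    private static void WriteInt16(Stream s, short value)
    {
        Span<byte> tmp = stackalloc byte[2];
        BinaryPrimitives.WriteInt16BigEndian(tmp, value);
        s.Write(tmp);
    }

    private static void WriteInt32(Stream s, int value)
    {
        Span<byte> tmp = stackalloc byte[4];
        BinaryPrimitives.WriteInt32BigEndian(tmp, value);
        s.Write(tmp);
    }
}
```

Feeding a decoder buffers with two or more topics, including a topic with zero partitions, is a quick check that the read offset advances correctly from one topic to the next.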