Confluent-kafka-dotnet: Decoding GroupMemberInfo.memberAssignment

Created on 4 Jun 2020  路  4Comments  路  Source: confluentinc/confluent-kafka-dotnet

Description

I am trying to get topic/partition info from GroupInfo.Member.MemberAssignment. This property is in byte[] and would like to know the best way to decode this. I tried different types of string encoding but was not able to parse the topics in a clean way.
Link: https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.GroupMemberInfo.html
This doc shows that the format of this data depends on the protocol type. I would like to know what this "protocol type" is supposed to be.

How to reproduce

Checklist

Please provide the following information:

  • [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version.
  • [ ] Apache Kafka version.
  • [ ] Client configuration.
  • [ ] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [ ] Critical issue.
question

Most helpful comment

@mhowlett is the there some issue we can track in terms of Implementing deserialization of MemberAssignment to strongly typed object? If there is no such issue can we convert this one from question to feature request? It would be great to have one implementation of this deserialization instead of everyone creating their own.

All 4 comments

I believe the format for consumers is:

MemberAssignment => Version PartitionAssignment
  Version => int16
  PartitionAssignment => [Topic [Partition]]
    Topic => string
    Partition => int32
  UserData => bytes

(in standard kafka protocol notation, you can look that up), and is equal the most recent assignments sent in the SyncGroupResponse from the group leader. user data will depend on the partition assignor (typically empty).

we haven't yet implemented deserializing this in librdkafka or any of the non-java clients.

Thank you!

@mhowlett is the there some issue we can track in terms of Implementing deserialization of MemberAssignment to strongly typed object? If there is no such issue can we convert this one from question to feature request? It would be great to have one implementation of this deserialization instead of everyone creating their own.

Here's a little method I wrote to decode it:

private string DecodeMemberAssignment(byte[] b) {
    /*
    https://kafka.apache.org/protocol
    STRING  Represents a sequence of characters. First the length N is given as an INT16. Then N bytes follow which are the UTF-8 encoding of the character sequence. Length must not be negative.
    INT16   Represents an integer between -2^15 and 2^15-1 inclusive. The values are encoded using two bytes in network byte order (big-endian).
    INT32   Represents an integer between -2^31 and 2^31-1 inclusive. The values are encoded using four bytes in network byte order (big-endian).
    BYTES   Represents a raw sequence of bytes. First the length N is given as an INT32. Then N bytes follow.

    https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 
    MemberAssignment => Version PartitionAssignment
      Version => int16
      PartitionAssignment => [Topic [Partition]]
        Topic => string
        Partition => int32
      UserData => bytes 

    Note: [] probably denotes a sequence of same type items, begining with Int32 of the # of such items
    */

    System.Text.UTF8Encoding enc = new UTF8Encoding();
    StringBuilder s = new StringBuilder();
    try {
        short version = SwapEndianness(BitConverter.ToInt16(b, 0));
        int num_topic_assignments = SwapEndianness(BitConverter.ToInt32(b, 2));
        int i = 6;
        for (int t = 0; t < num_topic_assignments; t++) {
            short topic_len = SwapEndianness(BitConverter.ToInt16(b, i));           
            byte[] str = new byte[topic_len];
            Array.Copy(b, i + 2, str, 0, topic_len);
            string topic = enc.GetString(str);
            i += (topic_len + 2);
            int num_partition = SwapEndianness(BitConverter.ToInt32(b, i));
            if (s.Length > 0) s.Append($"; ");
            s.Append($"{topic}: ");
            for (int j = 0; j < num_partition; j++) {
                i += 4;
                s.Append($"{SwapEndianness(BitConverter.ToInt32(b, i))}{(j < num_partition - 1 ? "," : "")}");
            }
        }
        return s.ToString();
    } catch {
        return "";
    }
}

public static int SwapEndianness(int value) {
    var b1 = (value >> 0) & 0xff;
    var b2 = (value >> 8) & 0xff;
    var b3 = (value >> 16) & 0xff;
    var b4 = (value >> 24) & 0xff;
    return b1 << 24 | b2 << 16 | b3 << 8 | b4 << 0;
}

public Int16 SwapEndianness(Int16 i) {
    return (Int16)((i << 8) + (i >> 8));
}
Was this page helpful?
0 / 5 - 0 ratings

Related issues

alfhv picture alfhv  路  3Comments

Duorman picture Duorman  路  3Comments

andreas-soroko picture andreas-soroko  路  3Comments

farodin91 picture farodin91  路  3Comments

vinodres picture vinodres  路  4Comments