I have some legacy protobuf data, which uses its own implementation of
oneof-like functionality.
Unfortunately, the type tag is serialized as two bytes in little-endian order.
The format is otherwise very regular: just a bunch of messages, each prefixed
with a varint-encoded length.
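For concreteness, a record looks roughly like this to a reader (a sketch; the function name is illustrative, and I'm assuming the two-byte tag precedes each varint length):

```go
package legacy

import (
	"bufio"
	"encoding/binary"
	"io"
)

// readRecord reads one record of the legacy framing: a two-byte
// little-endian type tag (assumed to come first), a varint-encoded
// payload length, then the payload itself.
func readRecord(r *bufio.Reader) (tag uint16, payload []byte, err error) {
	var hdr [2]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return 0, nil, err
	}
	tag = binary.LittleEndian.Uint16(hdr[:])

	n, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, nil, err
	}
	payload = make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return 0, nil, err
	}
	return tag, payload, nil
}
```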
In order to read/write such data with proto.Buffer, I'd like to read/write raw
[]byte from/to the Buffer. Right now that's impossible unless I recreate
the whole implementation of (*Buffer).DecodeMessage on top of []byte,
which I'd prefer not to do. Another alternative is to track the position manually
and use (*Buffer).Bytes to access the raw bytes, but then I can't make the
Buffer skip the consumed raw bytes other than by resetting it.
Could we make proto.Buffer implement io.Reader/io.Writer instead? I can
contribute the required change if this is reasonable.
/cc: @dsymonds
Thanks.
The protocol buffer wire format isn't a streaming protocol and doesn't lend itself to io.Reader/io.Writer. It would have to slurp it all into an internal bytes.Buffer anyway, so there's no gain.
For my use case, it's not streaming; I'm reading the entire stream
into proto.Buffer to decode it.
It's just that sometimes you need to read/write non-protobuf objects from
the proto.Buffer, and you don't want to reimplement every method of
proto.Buffer on a custom buffer (I also want to use the batch
allocation feature of proto.Buffer).
Right now, once the data is in a proto.Buffer, you don't have a way
to get it back out or to learn the current position. At the very least
we should consider providing a method that exposes the index field.
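Something like this, say (a hypothetical addition inside package proto, not part of the current API):

```go
// Index reports the Buffer's current position in its backing slice.
// Hypothetical accessor; index is the Buffer's existing unexported
// position field.
func (p *Buffer) Index() int {
	return p.index
}
```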
proto.Buffer is for reading protos, not for reading non-protos, sorry.
OK. No io.Reader/io.Writer then.
But I still think exposing the index field is worthwhile.
For example, to see how much I've consumed or how
much has been written.
As an example, I want to make sure that I've decoded
the whole buffer. The current workaround is to call
(*Buffer).DecodeVarint and see if the result is io.EOF,
but of course that's not correct.
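In other words, something like this (a sketch of the imperfect workaround; the function name is mine):

```go
package check

import (
	"fmt"

	"github.com/golang/protobuf/proto"
)

// fullyConsumed sketches the workaround described above: probe the
// Buffer with DecodeVarint and treat any error as "nothing left". It
// misfires when the leftover bytes happen to decode as a valid varint,
// which is why an exposed index would be better.
func fullyConsumed(buf *proto.Buffer) error {
	if _, err := buf.DecodeVarint(); err == nil {
		return fmt.Errorf("trailing bytes remain after decoding")
	}
	return nil
}
```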
For writing, the Bytes method will tell you how much.
I'm also interested in the reading side to verify that
I've consumed everything in the buffer.
I imagine Bytes could return the remaining []byte
unconsumed, but that's a breaking change.
Well, that's what DecodeMessage or Unmarshal are for. They'll either consume all the bytes, or return an error.
Hi, I would like to dig this up again, as I have a really good use case for it, namely Google's own AdExchange Publisher settings here: https://developers.google.com/ad-exchange/rtb/downloads/publisher-settings-proto.
Publisher settings are just a series of repeated messages that are provided to clients as a gzipped file. Our file is ~80M gzipped and 200M raw. Instead of having to load the whole file into memory first, it would be nice to consume the repeated messages from an io.Reader. This is my workaround at the moment:
```go
package main

import (
	"bufio"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"os"

	"github.com/golang/protobuf/proto"

	pb "example.com/publishersettings" // hypothetical import path for the generated package
)

func parse() error {
	f, err := os.Open("./publisher_settings.pb.gz")
	if err != nil {
		return err
	}
	defer f.Close()

	z, err := gzip.NewReader(f)
	if err != nil {
		return err
	}
	defer z.Close()

	var (
		b    []byte // scratch buffer, reused across iterations
		list []pb.PublisherSettings
	)
	r := bufio.NewReader(z)
	for {
		// Each record starts with a varint carrying the field tag and wire type.
		u, err := binary.ReadUvarint(r)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		if tag := u >> 3; tag != 1 {
			return fmt.Errorf("unexpected field tag %d", tag)
		}
		if wire := u & 0x07; wire != proto.WireBytes {
			return proto.ErrInternalBadWireType
		}

		// Then the varint-encoded length of the message payload.
		sz, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		n := int(sz)
		if cap(b) < n {
			b = make([]byte, 0, n)
		}
		buf := b[:n]
		if _, err := io.ReadFull(r, buf); err != nil {
			return err
		}

		var msg pb.PublisherSettings
		if err := proto.Unmarshal(buf, &msg); err != nil {
			return err
		}
		list = append(list, msg)
	}
	log.Println("read", len(list))
	return nil
}

func main() {
	if err := parse(); err != nil {
		log.Fatal(err)
	}
}
```
@awalterschulze thanks, we are heavy users of that, except it's not really the same protocol: we have an extra varint (the field/wire indicator) in the stream...
Oh wow sorry I didn't spot that. I just assumed it was the same.
I welcome a pull request with another reader/writer implementation that covers this case.
These are really easy to maintain :)
IIUC, you are essentially reading a message that is equivalent to: message { repeated PublisherSettings settings = 1; }
I understand your use case and your desire to avoid slurping in all 200M, but this is specialized enough that your workaround logic seems fine. An additional 40 lines is not that bad.
Ended up on this issue because I was looking for the same thing.
My use case is multiple processes that communicate through anonymous pipes.
I thought it could be a good way to start learning about protobufs, because I was looking for a way to serialize data in a streaming fashion, which I assumed was the default.
Although I have nothing to contribute to this issue, I'll be looking for a way to make it work another way, maybe by writing the result of Size() as a network-byte-order uint32, so the reader knows how many bytes to read? This is obviously just an idea that came to mind while reading the API.
OK, I've tried it and it's pretty reliable. The goal is to do something like I suggested: you start by sending the message's size as an int32, then write the message itself.
The opposite has to be implemented on the other side.
It doesn't include any error-recovery mechanism; if something is off by even one byte, it's over.
In short, as long as it's pipes or a Unix socket, I guess it's fine. Use TCP for the rest.
I'll try to come up with a library that does that.
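Roughly what I have in mind, as a sketch (the function names are just illustrative, not a published API):

```go
package framing

import (
	"encoding/binary"
	"io"

	"github.com/golang/protobuf/proto"
)

// writeFrame sends the marshaled size as a big-endian (network order)
// uint32, followed by the message bytes.
func writeFrame(w io.Writer, msg proto.Message) error {
	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(data)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err = w.Write(data)
	return err
}

// readFrame is the mirror image: read the 4-byte length, then exactly
// that many bytes, and unmarshal. A single wrong byte in the length
// desynchronizes the stream; there is no recovery mechanism.
func readFrame(r io.Reader, msg proto.Message) error {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	data := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, data); err != nil {
		return err
	}
	return proto.Unmarshal(data, msg)
}
```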
@tehmoon no need, there is a library for this already, see https://godoc.org/github.com/gogo/protobuf/io.
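A quick usage sketch of that package's varint-delimited framing (the helper names here are mine; see the godoc for the exact API):

```go
package pipes

import (
	"io"

	protoio "github.com/gogo/protobuf/io"
	"github.com/gogo/protobuf/proto"
)

// sendAll writes each message with the package's varint-delimited framing.
func sendAll(w io.Writer, msgs []proto.Message) error {
	dw := protoio.NewDelimitedWriter(w)
	defer dw.Close()
	for _, m := range msgs {
		if err := dw.WriteMsg(m); err != nil {
			return err
		}
	}
	return nil
}

// recvAll reads framed messages until EOF. newMsg allocates the concrete
// message type; the 1<<20 argument caps a single message's size.
func recvAll(r io.Reader, newMsg func() proto.Message) ([]proto.Message, error) {
	dr := protoio.NewDelimitedReader(r, 1<<20)
	defer dr.Close()
	var msgs []proto.Message
	for {
		m := newMsg()
		err := dr.ReadMsg(m)
		if err == io.EOF {
			return msgs, nil
		}
		if err != nil {
			return msgs, err
		}
		msgs = append(msgs, m)
	}
}
```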
The problem described here is not about a stream of messages but about a single message with many repeated fields. It would be nice to be able to read a single message from a reader.
@dim thank you for sharing this! It looks perfect.
Sorry, I misunderstood the problem in this issue. I should have re-read the whole thread before posting; the answers were there.