Hello,
My job is pulling messages from pubsub. It works fine until we get an error message like
transport: http2Client.notifyError got notified that the client transport was broken EOF..
Actually I'm not sure where this error message comes from, but once the job gets it,
it never receives a new message from pubsub (from the Next() call), and the job just keeps writing this error message.
Is there any way to debug this?
(I'm using the commit 9d965e63e8cceb1b5d7977a202f0fcb8866d6525)
Thanks,
I think the underlying grpc connection is broken, but MessageIterator.Next() doesn't return an error or retry. Is there any workaround to detect this and reopen the subscription?
I believe this may be related to #499. We don't have a fix yet. You say that MessageIterator.Next doesn't return an error -- does it just hang?
If so, one quick solution is to run Next with a timeout, and restart the subscription if you hit it. We're building an improved stream-based implementation of Pull, which may also help.
Yes, right. MessageIterator.Next() just hangs.
And yes, it would be very helpful to handle this if we could pass a timeout to Next().
You don't need to pass a timeout to Next. Do something like the following (untested):
First, create a context that you can cancel:
cctx, cancel := context.WithCancel(ctx)
defer cancel()
Use that context in your call to Pull:
iter, err := sub.Pull(cctx)
Run your call to Next with a timeout:
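done := make(chan struct{})
var msg *pubsub.Message
var nextErr error // err is already declared by the Pull call above
go func() {
	msg, nextErr = iter.Next()
	close(done) // closing never blocks, so this goroutine can exit even after a timeout
}()
select {
case <-time.After(1 * time.Minute):
	cancel() // cancel the context for the whole Pull, then restart the subscription
case <-done:
	// Next returned: check nextErr and use msg (only safe to read them here)
}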
Yes, I got your point.
I was thinking of doing a hack like this, but I meant it would be convenient if Next() provided it.
Just jumping into this thread. I'm experiencing the same issue and I'll be trying out the WithCancel trick too.
Restarting the subscription doesn't seem to be enough.
After I got this error, Subscription.Pull() sometimes got stuck.
I'm trying to close/recreate the client too.
Is there any update on this? In a simple for { it.Next() } loop we see this happening around once every 2 minutes while listening on a subscription that has nothing being sent on it.
I think this is a serious bug and MessageIterator.Next() should not hang in any case.
I'm still using the hack with cancel, but we have to keep reconnecting unnecessarily depending on traffic.
Got the same issue. No error is returned, just this in the logs:
2017/06/23 18:42:46 transport: http2Client.notifyError got notified that the client transport was broken EOF.
Quite annoying :) I will implement that timeout hack.
Update: is there any reasonable hack that lets us keep using func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error without waiting for this bug to be fixed?
I had a look at how the grpc client is initialised and remembered that I had already seen this issue some time ago in another project. Since it's a grpc client issue, you won't be able to fix it in the pubsub package, as no errors are returned by the grpc connection.
Fix it by adding an additional dialer:
import (
	"net"
	"time"

	"google.golang.org/grpc"
)

// WithKeepAliveDialer returns a DialOption whose dialer enables TCP
// keep-alive probes every 10 seconds on the underlying connection.
func WithKeepAliveDialer() grpc.DialOption {
	return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
		d := net.Dialer{Timeout: timeout, KeepAlive: 10 * time.Second}
		return d.Dial("tcp", addr)
	})
}
and initialise the pubsub client with it:
import (
	"context"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)
clientOption := option.WithGRPCDialOption(WithKeepAliveDialer())
client, err := pubsub.NewClient(context.Background(), opts.ProjectID, clientOption)
I'm assuming that our new streaming pull implementation and/or the dialer mentioned in the previous comment solve this problem.