PubSub
Reproduced locally, but the same behaviour occurs on GKE.
I have a fairly simple subscriber (I tried both synchronous and asynchronous mode) that should constantly check for new messages and handle them in bulk.
This is the code you can run to reproduce:
```go
package main

import (
	"context"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	ctx := context.Background()
	projectID := os.Getenv("GCLOUD_PROJECT_ID")
	subscriptionID := os.Getenv("SUBSCRIPTION_ID")

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	sub := client.Subscription(subscriptionID)
	// sub.ReceiveSettings.Synchronous = true
	// sub.ReceiveSettings.MaxOutstandingMessages = 0
	// sub.ReceiveSettings.MaxOutstandingBytes = 1000
	// sub.ReceiveSettings.MaxExtension = 10

	for {
		// Receive messages constantly, in 5-second windows.
		log.Infof("check for new messages in subscription %v...", subscriptionID)
		tctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Create a channel to handle messages as they come in.
		cm := make(chan *pubsub.Message)
		messages := 0
		finished := false

		go func() {
			for {
				select {
				case msg := <-cm:
					log.Infof("received message: %v", string(msg.Data))
					messages++
					// Make acks conditional: these deferred calls run when
					// this goroutine returns, i.e. after tctx is done.
					defer func() {
						if finished {
							msg.Ack()
						} else {
							msg.Nack()
						}
					}()
				case <-tctx.Done():
					if messages > 0 {
						log.Info("here we do something, only if it succeeds we acknowledge the messages")
						finished = true
					}
					log.Infof("number of messages: %v", messages)
					messages = 0
					return
				}
			}
		}()

		// Receive blocks until the context is cancelled or an error occurs.
		err = sub.Receive(tctx, func(ctx context.Context, msg *pubsub.Message) {
			cm <- msg
		})
		if err != nil && status.Code(err) != codes.Canceled {
			log.Errorf("Receive: %v", err)
		}
		// Release the timeout's resources here; a defer inside an endless
		// loop would never run and would accumulate.
		cancel()
		close(cm)
	}
}
```
This program constantly checks for new messages in the subscription. When new messages arrive, it picks up as many as possible within 5 seconds, processes them and acknowledges them. If a lot of messages arrive in the subscription, the program should still be able to handle them by processing one batch after another.
Everything works fine if there are only a few messages in the subscription. If I put load on it, say by publishing 2000 messages, the program gets stuck after logging "number of messages: X".
I realized that my application doesn't get stuck if I run the receiver synchronously by setting sub.ReceiveSettings.Synchronous = true. Then the only thing that does not work is deferring the acknowledgment. I want to defer it because I only want to acknowledge all messages (say, 1000 messages processed within the 5-second timeout) if and only if certain operations succeed on ALL of them. Can I achieve that somehow?
Is it possible at all to have a plain synchronous approach in Go, like in the 90s? This is available in Python; is there any reason not to implement it in Go?
Hi, thanks for filing an issue!
I was able to replicate the client hanging in asynchronous mode and will be looking into it. In the meantime, I wanted to clarify your statement about synchronous mode + defer.
> Then the only thing that does not work is to defer the acknowledgment. I want to defer it as I only want to acknowledge all messages (say, I have processed 1000 messages within the timeout of 5secs) if and only if certain operations succeed on ALL of these messages
Could you elaborate on this a bit more? After I enabled synchronous mode, the messages seemed to be acknowledged properly in the defer statement.
Hi @hongalex, thanks for your reply. Sorry for my late response, I had a lot to do last week.
It only works with very few messages; if you increase the throughput, it is not able to process all of them:

I've used a simple script to generate 200 messages:
```bash
#!/bin/bash
for i in {1..200}; do
  gcloud pubsub topics publish projects/my-project/topics/test --message "test ${i}" &
done
wait
echo "all messages published"
```
In my real use case, more than 40000 messages can get published to Pub/Sub, resulting in more than 500 unacknowledged messages.
I played a bit with the subscription configuration. When I increased the acknowledgment deadline to 600 seconds, my synchronous subscriber got stuck: the logs showed 127 acknowledged messages (out of 200 sent), but the UI showed 150 unacked messages. I killed the subscriber, and after that it started processing and acking messages again. Now there are 26 unacked messages in the subscription and, at least, the subscriber is not stuck. Still, it does not pick up the remaining unacked messages (maybe Pub/Sub only redelivers them after 600 seconds?).
Either way, it behaves very strangely and not in a reliable, predictable way.
@hongalex have you been able to reproduce the issue?
Hi, sorry, I've been slightly busy and haven't had time to delve into this more deeply. Regarding your point about the 600s ack deadline, you are correct: Pub/Sub will wait up to 600s before attempting to redeliver the messages, since until then it's still "possible" for your client to acknowledge them.
No problem, @hongalex.
Maybe it's "possible", but being unable to consume just 200 messages still isn't expected behaviour, is it? I wrote a simple subscriber in Python that doesn't have these issues.
Definitely. I wanted to clarify why Pub/Sub doesn't redeliver immediately if your subscription's ack deadline is too high. Let me look into this more and I'll try to have an update by tomorrow.
Something I noticed that was kind of weird that you might want to optimize:
By calling sub.Receive in a for loop, you're creating a lot of streams in async mode. This could lead to poor performance and odd race conditions between messages expiring or being acked in your deferred function and the stream teardown. I recommend moving the call outside the outer for loop, but continuing to send the results over a channel.
Another thing I wasn't quite sure about is when exactly messages are nacked in your example. If there are messages to be ack'd (messages > 0), finished is always set to true, which implies messages are never nacked. This isn't really a problem if your subscription's ack deadline is set to something lower (maybe 30 seconds or so).
Thank you for further investigation.
Regarding your first point: I have it in the for loop as I want my service to constantly listen to Pub/Sub. As I don't have a push subscription but pull subscription, I need to have it in a for loop, don't I? Otherwise my service will consume once without continuing afterwards.
Regarding the second point: yeah that's true, that isn't really a good example ;). Actually the if messages > 0 is not the only condition. I basically want to ensure this:
```go
if messages > 0 {
	log.Info("here we do something, only if it succeeds we acknowledge the messages")
	// Write the messages to an external storage and ensure it succeeded.
	finished = writeSucceeded
}
```
> As I don't have a push subscription but pull subscription, I need to have it in a for loop, don't I? Otherwise my service will consume once without continuing afterwards.

The subscription.Receive call will continue to pull messages via the stream until the context passed in is cancelled. Assuming you never cancel the context and your application keeps running, a single Receive call is enough. Things are slightly trickier if you want multiple processes pulling messages for load balancing, but that doesn't seem necessary here.
Since each Receive call opens up one or more streams, this could lead to significant overhead with the constant setup/teardown every 5 seconds.
Okay, that makes sense and also explains my weird observations.
The reason I used a context with a timeout is that I don't want to write every single Pub/Sub message to the storage individually (if lots of messages arrive in a very short time). I would rather first "collect", say, 500 messages and then write all of them to the external storage in one batch.
On the other hand, the service should also be able to handle times where there are only few messages arriving and shouldn't wait to write to the storage until 500 messages have arrived.
Does it make sense?
Yeah that sounds reasonable, though I recommend separating the logic of batching from the cancellation of the "Receive" context. Instead, have a single Receive call that sends messages to a channel. Then, you can have another goroutine that handles messages coming from that channel (which batches messages and acks them).
Here's an example of what I mean.
That sounds super reasonable. Thanks a lot for your help!