Google-cloud-node: is flowControl.maxMessages option honored in 0.14.x to limit calls to handler

Created on 1 Oct 2017 · 28 comments · Source: googleapis/google-cloud-node

Environment details

  • OS: Linux (ubuntu)
  • Node.js version: 8.5.0
  • npm version: 5.3.0
  • google-cloud-node version: @google-cloud/pubsub 0.14.2

Steps to reproduce

Is it possible to get or limit the inflight messages using the new 0.14.x pubsub library?

I'm calling something like the following:
this.subscription = this.topic.subscription(this.options.name, this.options.subscription);

Even when I create the subscription with topic.subscription and the following options, it appears to call my handler with as many messages as there are in the queue.

{
  flowControl: {
    maxMessages: 1
  }
}


This happens because the PubSub API doesn't allow us to limit how many results it sends us, so we use those numbers as an indicator to stop making requests to the API once they are hit or exceeded. Unfortunately, in this case it's easy to exceed them.
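A toy simulation makes the problem concrete. It assumes, as described above, that the server decides how many messages each response carries and that the client can only stop requesting (pause) between responses. `simulatePull` is a hypothetical illustration, not code from @google-cloud/pubsub.

```javascript
// Toy model (not library code): each entry in `responses` is the number of
// messages the server chose to put in one streaming-pull response.
function simulatePull(responses, maxMessages) {
  let inFlight = 0; // messages emitted to the handler but not yet acked
  let peak = 0;     // highest inFlight value observed
  let paused = false;

  for (const batchSize of responses) {
    if (paused) break; // a paused stream receives no further responses
    // Every message in a response is emitted; the client could not ask for fewer.
    for (let i = 0; i < batchSize; i++) {
      inFlight++;
      peak = Math.max(peak, inFlight);
    }
    // The flow-control check only runs after the whole response was handled.
    if (inFlight >= maxMessages) paused = true;
  }
  return { inFlight, peak, paused };
}

console.log(simulatePull([3, 2], 1)); // { inFlight: 3, peak: 3, paused: true }
```

Pausing stops further requests, but the messages already in hand still exceed the limit.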

@lukesneeringer should we only emit the specified number of messages listed in maxMessages and store the rest in memory while modifying the ack deadline along the way?

/cc @jganetsk

Thanks @callmehiphop. This seems like a behavior change from the 0.13 client, where I could explicitly process one thing at a time if I wanted. For my specific use case, I can only do one thing at a time because of how intensive my jobs are.

> should we only emit the specified number of messages listed in maxMessages and store the rest in memory while modifying the ack deadline along the way?

We should not do this. The point of flow control is to prevent the process from dying because it ran out of memory. Storing messages in memory defeats that purpose, and it's not very useful to the user to have a message in memory but not to emit it.

> Is it possible to get or limit the inflight messages using the new 0.14.x pubsub library?

It sounds like the user wants flow control to be on, not off. But it's supposed to be on by default, so is there a bug in flow control?

> It sounds like the user wants flow control to be on, not off. But it's supposed to be on by default, so is there a bug in flow control?

Maybe? @jganetsk I'm a little lost on how we are supposed to process payloads whose message sizes exceed the flow control limits. As it stands, all the flowControl knobs do is control when we pause/resume connections. If we don't want overflow messages in memory, should we be nacking them?

We have a lower bound on flow control limits, iirc; can you take a look at what those are? I think it's OK to overflow by 1 message (which means we can overflow by at most 10MB, since that's the max message size in Pub/Sub), but beyond that we should probably nack any additional overflowing messages.

I don't think there are bounds on any of the flow control limits. We have defaults of:

  • maxBytes - 20% of available memory
  • maxMessages - Infinity

But I do not believe that there were any bounds specified in the design doc.
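A minimal sketch of how such defaults could be resolved against user-supplied options. It assumes "available memory" means `os.freemem()`; the library may compute it differently, and `defaultFlowControl` / `resolveFlowControl` are hypothetical names.

```javascript
const os = require('os');

// Defaults as described above; using os.freemem() for "available memory"
// is an assumption of this sketch.
function defaultFlowControl() {
  return {
    maxBytes: Math.floor(os.freemem() * 0.2), // 20% of currently free memory
    maxMessages: Infinity,                    // no message-count limit by default
  };
}

// Options passed by the caller override the defaults key by key.
function resolveFlowControl(userOptions = {}) {
  return Object.assign(defaultFlowControl(), userOptions);
}

console.log(resolveFlowControl({ maxMessages: 1 }));
```

With `maxMessages` left at `Infinity`, only `maxBytes` ever triggers a pause, which is why the limit has to be set explicitly for one-at-a-time processing.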

I might have been thinking of the limits on batching settings. Regardless, my points still hold: overflowing by 1 message is OK, and anything beyond that should be nacked.

@jganetsk sounds good, thanks for the input!

@jganetsk thanks for taking the time to comment on this issue. I'm not sure I completely follow what you and @callmehiphop were discussing, so maybe it will resolve this... I wanted to clarify what I want to happen: I want maxMessages on and set to emit 1 message at a time. It doesn't appear to honor that.

Thank you again.

@evilpacket I just cut a new release (0.14.4) that should make maxMessages behave as you intend. Thanks for reporting this!

Hello @callmehiphop,

I'm encountering the same issue as @evilpacket with version 0.14.4.
My simple test code is:

// Runs inside an async method; this.pubsub is a PubSub client instance
const options = { flowControl: { maxMessages: 1 }, ackDeadlineSeconds: 20 };
const [subscription] = await this.pubsub.createSubscription('any-topic', 'my-sub', options);
subscription.on('message', (message) => {
  console.log('MSG ID', new Date(), message.id);
  // Simulate 18 seconds of work, then acknowledge
  setTimeout(() => {
    console.log('acking', new Date(), message.id);
    message.ack();
  }, 18000);
});

The library seems to respect ackDeadlineSeconds but not maxMessages, as the output shows:

MSG ID 2017-10-09T15:45:19.934Z 152454070363484
MSG ID 2017-10-09T15:45:21.167Z 152453547904853 // wrong, we got 2 messages at the same time
acking 2017-10-09T15:45:37.939Z 152454070363484
MSG ID 2017-10-09T15:45:37.941Z 152454417672984
acking 2017-10-09T15:45:39.167Z 152453547904853
MSG ID 2017-10-09T15:45:39.168Z 152454417672984
acking 2017-10-09T15:45:55.946Z 152454417672984
MSG ID 2017-10-09T15:45:55.947Z 152450191168159
acking 2017-10-09T15:45:57.173Z 152454417672984
MSG ID 2017-10-09T15:45:57.174Z 152457459578813
MSG ID 2017-10-09T15:45:57.174Z 152457459578813 // wrong, we now have 3 unacked messages in our hands
acking 2017-10-09T15:46:13.951Z 152450191168159
acking 2017-10-09T15:46:15.176Z 152457459578813
acking 2017-10-09T15:46:15.177Z 152457459578813

Any idea about how we can get this "flow control" working as expected?

Thank you!
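One way to check whether maxMessages is honored, without reading timestamps by eye, is to wrap the handler and record the peak number of unacked messages. This is a hypothetical helper (`trackInFlight` is not a library API); it assumes each message exposes an `id` and an assignable `ack()` method, and it does not track `nack()`. It could be attached with `subscription.on('message', wrapped)`.

```javascript
// Hypothetical diagnostic: wraps a 'message' handler so that each message's
// ack() also decrements a counter, and records the peak unacked count.
function trackInFlight(handler) {
  const stats = { inFlight: 0, peak: 0 };
  const wrapped = (message) => {
    stats.inFlight++;
    stats.peak = Math.max(stats.peak, stats.inFlight);
    const originalAck = message.ack.bind(message);
    let acked = false;
    message.ack = () => {
      if (!acked) { // count each message only once, even if acked twice
        acked = true;
        stats.inFlight--;
      }
      return originalAck();
    };
    return handler(message);
  };
  return { wrapped, stats };
}

// Usage with fake messages standing in for Pub/Sub messages:
const { wrapped, stats } = trackInFlight((message) => {
  setTimeout(() => message.ack(), 10);
});
const fake = (id) => ({ id, ack() {} });
wrapped(fake('a'));
wrapped(fake('b'));
console.log(stats.peak); // 2
```

With maxMessages set to 1, any `stats.peak` above 1 shows the limit was not respected.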

@thomas-hilaire the latest release allows a single message to overflow, meaning that at any given time you could have maxMessages + 1 unacked messages, which would explain your logs.

Maybe we can overflow by 1 message on maxBytes, but I don't think we should overflow ever on maxMessages.

@jganetsk ah, I may have misunderstood.

Hello,

> I just cut a new release (0.14.4) that should make maxMessages behave as you intend.

@callmehiphop from this, I understood that you had removed the overflow on maxMessages. For me, having multiple messages in memory is not a real problem. However, I expect to receive them one by one; this is a common pattern that most programmers expect from a pub/sub system.

If you plan to nack overflowed messages, I would like to know where a nacked message goes in the subscription queue: to the head, to the tail, or somewhere in between? (I can't find this information in the documentation.) I don't expect strict ordering, but we can't send every overflowed message to the tail. In that case, I would prefer the library to keep the message in memory while the first message is still unacked.

Thanks for your help!

@thomas-hilaire There are no ordering guarantees with pub/sub. If you nack a message, it will be redelivered sooner or later. Probably sooner. And for example, if you have multiple processes pulling in parallel (each with maxMessages = 1), the message might be redelivered to another process as soon as that process finishes with whatever one message it was processing.

But maxMessages = 1 is not necessarily a common pattern with users of Cloud Pub/Sub, who often demand high throughput, and therefore need to process many messages simultaneously. Are you sure maxMessages = 1 is necessary for your use case?

Thanks for your explanations, the nack behavior that you explained fits my needs.

Setting maxMessages = 1 helps me implement the task-queue/worker pattern; I don't really need high throughput. I could code this behavior myself in each microservice, but having it directly in the library would be much more convenient, as I already have a lot of them in production.
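Until the library honors the limit, the one-job-at-a-time pattern can be approximated in user code by serializing the handler. This sketch assumes messages expose `ack()` and `nack()`, and that the client keeps extending ack deadlines for emitted but unacked messages; if it does not, queued messages may expire and be redelivered. `createSerialWorker` and `runIntensiveJob` are hypothetical names. Messages still accumulate in memory, which is the trade-off discussed earlier in the thread.

```javascript
// Serializes a message handler so that at most one job runs at a time.
// Assumes each message has ack() and nack(); createSerialWorker is hypothetical.
function createSerialWorker(processJob) {
  const queue = []; // messages waiting for their turn, in arrival order
  let busy = false; // true while drain() is working through the queue

  async function drain() {
    if (busy) return; // the running drain() will pick up newly queued messages
    busy = true;
    try {
      while (queue.length > 0) {
        const message = queue.shift();
        try {
          await processJob(message);
          message.ack();
        } catch (err) {
          message.nack(); // hand the message back so it can be redelivered
        }
      }
    } finally {
      busy = false; // always release, even if nack() itself throws
    }
  }

  return (message) => {
    queue.push(message);
    drain().catch((err) => console.error('worker error', err));
  };
}

// Usage (runIntensiveJob is a hypothetical job function):
// subscription.on('message', createSerialWorker(async (message) => {
//   await runIntensiveJob(message.data);
// }));
```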

@callmehiphop Can you remind me why nacks are necessary to solve this problem, for maxMessages in particular? As soon as numMessages == maxMessages, the library should "slam the brakes" and immediately suspend all connection streams. Because Node is single-threaded, I wouldn't expect any more message events to be emitted after this point. But maybe my assumption is wrong here?

Note that in our code review, I was very particular that we deal with maxMessages differently from maxBytes and make sure we don't have this kind of off-by-one bug. The cases around maxMessages all used == and would take action immediately on the arrival of the message that reached the limit, while the maxBytes cases would accept messages as long as numBytes < maxBytes, with the possibility of going over the limit.
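The asymmetry between the two checks can be sketched as follows, combined with the nack-on-overflow policy proposed earlier in the thread. `admit`, `shouldPause`, and the `state` counters are hypothetical; the sketch uses `>=` rather than `==` so that a counter that somehow passed the limit is still caught, and it omits decrementing the counters on ack/nack.

```javascript
// Decides what to do with a newly arrived message of `size` bytes, given
// counters for messages emitted but not yet acked. Returns 'emit' or 'nack'.
function admit(state, size, limits) {
  // maxMessages: never exceeded; accept only while strictly below the limit.
  if (state.numMessages >= limits.maxMessages) return 'nack';
  // maxBytes: accept while still under the limit, even if this message
  // pushes the total over it (overflow by at most one message).
  if (state.numBytes >= limits.maxBytes) return 'nack';
  state.numMessages += 1;
  state.numBytes += size;
  return 'emit';
}

// The stream should be paused as soon as either limit is reached.
function shouldPause(state, limits) {
  return state.numMessages >= limits.maxMessages || state.numBytes >= limits.maxBytes;
}

const state = { numMessages: 0, numBytes: 0 };
const limits = { maxMessages: 1, maxBytes: Infinity };
console.log(admit(state, 100, limits)); // emit
console.log(admit(state, 100, limits)); // nack
console.log(shouldPause(state, limits)); // true
```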

I think it's because the API sends more messages than the user asked for: https://github.com/GoogleCloudPlatform/google-cloud-node/issues/2638#issuecomment-333385590

@callmehiphop Actually, I might again be confusing flow control and batching. We need to be pretty careful about all these off-by-one bugs in both batching and flow control.

Hello, thanks for your update, but unfortunately I still encounter the issue with version 0.14.5 of the library.

I ran the same simple test as in my previous comment (https://github.com/GoogleCloudPlatform/google-cloud-node/issues/2638#issuecomment-335206769) and I still get more messages than maxMessages. My limit is 1; I often have 2, and sometimes even 3 or more at the same time.

I also noticed a more important issue, the same message can be delivered many times!

Note that in my tests, messages are acked after an 18-second wait, and I don't encounter these issues when the wait is around 10 ms (the ack deadline is 60 seconds).

GET 2017-10-16T09:39:31.070Z 160876962074917
ACK 2017-10-16T09:39:49.076Z 160876962074917

GET 2017-10-16T09:39:49.078Z 160883478723260
ACK 2017-10-16T09:40:07.083Z 160883478723260

GET 2017-10-16T09:40:07.084Z 160876273930628
ACK 2017-10-16T09:40:25.085Z 160876273930628

GET 2017-10-16T09:40:25.086Z 160877349571364
ACK 2017-10-16T09:40:43.089Z 160877349571364

// BAD, I’m getting the same message as the one before
GET 2017-10-16T09:40:43.091Z 160877349571364
ACK 2017-10-16T09:41:01.090Z 160877349571364

// BAD, I’m getting two messages in my hands
GET 2017-10-16T09:41:01.091Z 160882282679698
GET 2017-10-16T09:41:01.091Z 160876273930628
ACK 2017-10-16T09:41:19.091Z 160882282679698
ACK 2017-10-16T09:41:19.092Z 160876273930628

// BAD, I’m getting the same message as the one just acked
// AND, I'm getting it thrice
// AND, I’m getting 4 messages in my hands
GET 2017-10-16T09:41:19.092Z 160882282679698
GET 2017-10-16T09:41:19.093Z 160882282679698
GET 2017-10-16T09:41:19.093Z 160882282679698
GET 2017-10-16T09:41:19.093Z 160876273930628
ACK 2017-10-16T09:41:37.096Z 160882282679698
ACK 2017-10-16T09:41:37.097Z 160882282679698
ACK 2017-10-16T09:41:37.098Z 160882282679698
ACK 2017-10-16T09:41:37.098Z 160876273930628

// BAD, the same message delivered for the fifth time.
GET 2017-10-16T09:41:37.099Z 160882282679698
ACK 2017-10-16T09:41:55.099Z 160882282679698

Thank you!

@thomas-hilaire the redelivery of acked messages is something that the API team is aware of and I believe a fix is coming soon. I'll update this issue (and likely put out a new release) when the change goes live.

@callmehiphop was the redelivery of acked messages fixed? Cheers in advance!

@rhodgkins it should be as of 0.14.8. I believe we just released a new version (0.15) as well.

I still get redelivery of acked messages with release 0.15. It seems that the longer the ack call is delayed, the more often redelivery occurs. I don't understand why it's so easy for me to reproduce and not for others; this is a really annoying issue (I have to track in a DB which messages have already been processed).

@callmehiphop should we open a new issue about that, or reopen this one?

Otherwise, the 0.15 release is great, and I can confirm that the flowControl.maxMessages option is now honored!
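Until redelivery is fixed, a handler can be made idempotent by skipping message IDs it has already seen, which is the DB-tracking workaround described above. In this sketch an in-memory `Set` stands in for that database, so it does not survive restarts or span multiple processes; `dedupe` is a hypothetical name, and messages are assumed to expose `id`, `ack()`, and `nack()`. Duplicates that arrive while the first copy is still being processed (like the three simultaneous deliveries in the earlier log) are dropped as well.

```javascript
// Wraps an async handler so each message id is processed at most once.
// `done` stands in for a persistent store of processed ids.
function dedupe(handler, done = new Set()) {
  const inProgress = new Set(); // ids currently being processed
  return async (message) => {
    if (done.has(message.id) || inProgress.has(message.id)) {
      // Duplicate delivery: drop it. Acking here assumes the first
      // delivery will finish successfully.
      message.ack();
      return 'duplicate';
    }
    inProgress.add(message.id);
    try {
      await handler(message);
      done.add(message.id); // record only after the work succeeded
      message.ack();
      return 'processed';
    } catch (err) {
      message.nack(); // let Pub/Sub redeliver it
      return 'failed';
    } finally {
      inProgress.delete(message.id);
    }
  };
}

// Usage: subscription.on('message', dedupe(async (message) => { /* work */ }));
```

Acking duplicates immediately is a design choice; a stricter version could leave them unacked until the first delivery's outcome is known.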

@thomas-hilaire I'm getting the same issue with redelivery, I raised a separate issue if that helps - #2756!

@thomas-hilaire @rhodgkins I've moved your issue to https://github.com/googleapis/nodejs-pubsub/issues/2, so we can track it there.
