Google-cloud-python: FlowControl / max_messages is not working as expected

Created on 22 Feb 2018  ·  11 Comments  ·  Source: googleapis/google-cloud-python

Hi,

I am trying to implement a simple task queue using Pub/Sub and I think I am running into a bug with _FlowControl_.

I have a few workers that each need to execute a callback on one message at a time (no concurrency), so I use _FlowControl_ with max_messages=1. I expect every worker to receive one message, process it, ack it, then pull another one.

import time
from google.cloud import pubsub
from google.cloud.pubsub import types

subscriber = pubsub.SubscriberClient()

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id='my-project-id',
    sub='my-subscription'
)

flow_control = types.FlowControl(max_messages=1)

subscription = subscriber.subscribe(subscription_name, flow_control=flow_control)

def callback(message):
    print('Received')    
    time.sleep(5)
    print('Done')
    message.ack()

future = subscription.open(callback)

future.result()

But when I publish a few messages using the following code, they are all received by a single worker and processed concurrently.

from google.cloud import pubsub
from google.cloud.pubsub import types

publisher = pubsub.PublisherClient()

topic = publisher.topic_path('my-project-id', 'my-topic')

for i in range(1,5):    
    publisher.publish(topic, b'my-message')

Am I missing something or is this a bug?

Thanks!

My environment details

  • OS: Ubuntu 17.10
  • Python: 3.6
  • google-cloud-pubsub: 0.30.1


All 11 comments

I have the same problem on Python 3.5

This is expected behavior. Essentially, the API does not give us one message at a time - it gives us around 50. With the default executor, you will process about 10 at a time. This means that flow control values below a certain threshold have no effect.

This generally isn't a huge issue. You can make processing serial by having each callback acquire a shared lock - your subscriber will just have several messages waiting to be processed, which is normal.
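A minimal sketch of that shared-lock approach, assuming `process` stands in for your real handler (the name and factory function are illustrative, not part of the library API):

```python
import threading

# All callback threads queue on one process-wide lock, so the work
# runs for one message at a time even though the subscriber delivers
# several messages concurrently.
_processing_lock = threading.Lock()

def make_serial_callback(process):
    def callback(message):
        with _processing_lock:
            process(message)   # executes serially across all threads
            message.ack()
    return callback
```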

You could also make it serial by providing your own executor with max_workers=1, but the subscriber will still queue up pending messages to be processed.

import concurrent.futures

import google.cloud.pubsub_v1.types
import google.cloud.pubsub_v1.policy.thread

flow_control = google.cloud.pubsub_v1.types.FlowControl(...)
policy = google.cloud.pubsub_v1.policy.thread.Policy(
    subscriber, subscription,
    executor=concurrent.futures.ThreadPoolExecutor(max_workers=1))

policy.open(callback)

It might be useful to describe why you need to process messages serially. It's definitely not out of the question for us to consider such a feature.

/cc @kir-titievsky

@jonparrott we had a similar issue reported in Node a while back and we received an OK from the PubSub team to nack() any overflow messages. Should we be removing this functionality to align with the Python client?

That is a great question for @kir-titievsky, I'll start an internal thread.

Thanks for your answer, but the issue with the shared lock is that it doesn't prevent a worker from receiving several messages and holding them until it has finished processing the first one.

Is there any way to distribute 1000 messages to 1000 workers with Pub/Sub so that each worker receives exactly one message?

My use case is an embarrassingly parallel task, splittable into 1000 sub-tasks, each of which consumes the entire memory of my worker machine.

I have tried to nack() every undesirable message, but it seems that other workers can't see them until the current message has finished processing. That means I cannot parallelize my task.

I see. We're chatting internally on what we want to do here.

Ok thanks.

@Etendard7 ... in the meantime, if you are able to receive messages using a POST endpoint, Pub/Sub's push subscribers will do exactly what you are asking for. What do you think?
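For context, a push subscription POSTs each message to your endpoint as a JSON envelope, and any 2xx response acknowledges it. A hedged sketch of such a handler, assuming a generic HTTP layer hands you the raw request body (`handle_push` and `process` are illustrative names, not library API):

```python
import base64
import json

# Pub/Sub push delivery: the request body is a JSON envelope whose
# message data is base64-encoded. Returning a 2xx status acks the
# message; any other status causes Pub/Sub to redeliver it.
def handle_push(body_bytes, process):
    envelope = json.loads(body_bytes)
    data = base64.b64decode(envelope['message']['data'])
    process(data)   # exactly one message per HTTP request
    return 204      # 2xx == ack
```

Because each worker only sees one HTTP request at a time, this gives the one-message-per-worker distribution asked about above.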

Thanks.
I ended up using one subscription per worker so that they all receive every message. I made each worker ignore the messages not addressed to it, using a message attribute containing the target worker id.
It's not optimal but it works for me.
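The workaround above can be sketched as a filtering callback; the `worker_id` attribute name is an assumption for illustration:

```python
# Each worker has its own subscription and therefore sees every
# message; it acks-and-skips anything addressed to another worker.
WORKER_ID = 'worker-42'
processed = []

def callback(message):
    if message.attributes.get('worker_id') != WORKER_ID:
        message.ack()  # addressed to another worker; acknowledge and skip
        return
    processed.append(message.data)  # ... do the real work here ...
    message.ack()
```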

@Etendard7 glad you found a workaround.

After discussing internally, the Python client's current behavior is preferred. If we were to nack overflow messages, we would increase the likelihood of duplicate messages.

I'm closing this for now, but feel free to continue discussing.

For anyone who stumbles across this, this has been fixed here: https://github.com/googleapis/google-cloud-python/issues/7677

