Hi,
I am trying to implement a simple task queue using Pub/Sub, and I think I am running into a bug with _FlowControl_.
I have a few workers that each need to execute a callback one message at a time (without concurrency), and I use _FlowControl_ with max_messages=1 for that. I expect every worker to receive one message, process it, ack it, and then pull another one.
```python
import time

from google.cloud import pubsub
from google.cloud.pubsub import types

subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id='my-project-id',
    sub='my-subscription',
)

flow_control = types.FlowControl(max_messages=1)
subscription = subscriber.subscribe(subscription_name, flow_control=flow_control)

def callback(message):
    print('Received')
    time.sleep(5)
    print('Done')
    message.ack()

future = subscription.open(callback)
future.result()
```
But when I publish a few messages using the following code, they are all received and processed concurrently by a single worker.
```python
from google.cloud import pubsub

publisher = pubsub.PublisherClient()
topic = publisher.topic_path('my-project-id', 'my-topic')

for i in range(1, 5):
    publisher.publish(topic, b'my-message')
```
Am I missing something, or is this a bug?
Thanks!
My environment details
I have the same problem on Python 3.5
This is expected behavior. Essentially, the API does not give us one message at a time; it gives us around 50. With the default executor, you will process about 10 at a time. This means that flow control values below a certain threshold have no effect.
This generally isn't a huge issue. You can make processing serial by having each callback acquire a shared lock; your subscriber will just have several messages waiting to be processed, which is normal.
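For example, here is a minimal sketch of the shared-lock approach (hypothetical; the sleep just simulates work, and the callback signature matches the one above):

```python
import threading
import time

# One lock shared by all callback invocations: even if the subscriber
# dispatches several callbacks concurrently, only one message body is
# processed at a time.
processing_lock = threading.Lock()

def callback(message):
    with processing_lock:
        print('Received')
        time.sleep(1)  # simulate work
        print('Done')
        message.ack()
```

The other callbacks simply block on the lock while holding their (already-received) messages.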
You could also make it serial by providing your own executor with max_workers=1, but the subscriber will still queue up pending messages to be processed.
```python
import concurrent.futures

import google.cloud.pubsub_v1.types
import google.cloud.pubsub_v1.policy.thread

flow_control = google.cloud.pubsub_v1.types.FlowControl(...)
policy = google.cloud.pubsub_v1.policy.thread.Policy(
    subscriber, subscription,
    executor=concurrent.futures.ThreadPoolExecutor(max_workers=1),
)
policy.open(callback)
```
It might be useful to describe why you need to process messages serially. It's definitely not out of the question for us to consider such a feature.
/cc @kir-titievsky
@jonparrott we had a similar issue reported in Node a while back and we received an OK from the PubSub team to nack() any overflow messages. Should we be removing this functionality to align with the Python client?
That is a great question for @kir-titievsky, I'll start an internal thread.
Thanks for your answer, but the issue with the shared lock is that it doesn't prevent a worker from receiving several messages and holding them until it finishes processing the first one.
Is there any way to distribute 1000 messages to 1000 workers with Pub/Sub so that each worker receives exactly one message?
My use case is an embarrassingly parallel task, splittable into 1000 sub-tasks, where each sub-task consumes the entire memory of my worker machine.
I have tried to nack() every undesirable message, but it seems that other workers can't see them until the current message finishes processing. That means I cannot parallelize my task.
I see. We're chatting internally on what we want to do here.
Ok thanks.
@Etendard7 ... in the meantime, if you are able to receive messages using a POST endpoint, Pub/Sub's push subscribers will do exactly what you are asking for. What do you think?
Thanks.
I ended up using one subscription for every worker so that they all receive every message. I made them ignore the messages not addressed to them using a message attribute containing the target worker id.
It's not optimal but it works for me.
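For anyone curious, a hypothetical sketch of that workaround (the attribute name `target_worker`, the `WORKER_ID` value, and `process` are all made-up names for illustration; since each worker has its own subscription, acking an unaddressed message only removes that worker's copy):

```python
WORKER_ID = 'worker-42'  # this worker's id (hypothetical)

def process(message):
    # Placeholder for the actual sub-task.
    print('processing', message.data)

def callback(message):
    # Every worker sees every message (one subscription per worker);
    # skip messages addressed to someone else.
    if message.attributes.get('target_worker') != WORKER_ID:
        message.ack()  # ack so this worker's subscription doesn't redeliver it
        return
    process(message)
    message.ack()

# The publisher side would set the attribute when publishing, e.g.:
# publisher.publish(topic, b'payload', target_worker='worker-42')
```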
@Etendard7 glad you found a workaround.
After discussing internally, the Python client's current behavior is preferred. If we were to nack overflow messages, we would increase the likelihood of duplicate messages.
I'm closing this for now, but feel free to continue discussing.
For anyone who stumbles across this, this has been fixed here: https://github.com/googleapis/google-cloud-python/issues/7677