Google-cloud-python: pubsub - modify_ack_deadline does not work, message gets redelivered after 10 minutes always

Created on 21 Dec 2017 · 27 comments · Source: googleapis/google-cloud-python

  1. Specify the API at the beginning of the title (for example, "BigQuery: ...")
    General, Core, and Other are also allowed as types
    google-cloud-pubsub - 0.29.2
  2. OS type and version
    Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  3. Python version and virtual environment information python --version
    Python 2.7.12
  4. google-cloud-python version pip show google-cloud, pip show google-<service> or pip freeze
    google-cloud: 0.30.0
    google-cloud-pubsub - 0.29.2
  5. Stacktrace if available
  6. Steps to reproduce
    I need very long ack deadlines, since our consumer process can run for a long time per message. No matter what I set as ack_deadline_seconds in the subscription definition, or whether I call subscriber.modify_ack_deadline every minute, the message gets redelivered after exactly ten minutes.
  7. Code example

I have a test case with a simple subscriber:

import logging
import time

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
sname = 'projects/<project-name>/subscriptions/ack-test-topic-subscribe'
tname = 'projects/<project-name>/topics/ack-test-topic'


def callback(pubsub_message):
    logging.warn("Received message: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    i = 0
    # Hold the message for up to 480 minutes, asking for a 120 s deadline every minute.
    while i < 480:
        i = i + 1
        time.sleep(60)
        subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)
        logging.warn("Renewed: %s %s %s at %s", i, pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

    logging.warn("Ack: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    pubsub_message.ack()


# callback has to be defined before it is passed to open().
subscriber.create_subscription(sname, tname, ack_deadline_seconds=120)
subscription = subscriber.subscribe(
    sname  # , flow_control=flow_control
)
future = subscription.open(callback)

With DEBUG logging enabled, I see lease renewals every few seconds:
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:The current p99 value is 10 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Renewing lease for 5 ack IDs.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Snoozing lease management for 7.362112 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber._consumer:Sending request:

but the message still gets redelivered after 10 minutes.


Labels: bug, pubsub, p1, triaged for GA

All 27 comments

@sachin-shetty Could you try with google-cloud-python==0.30.0? There were a few bugs in flow control fixed between 0.29.2 and 0.29.4 (though it doesn't seem like that would be causing the issue you describe).

Also, I'm not 100% sure how the line

subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)

will interact with the lease management thread (i.e. the one that is logging "Renewing lease for ...") but that seems to be the problem.

The "expected" usage pattern would be that your callback only needs the state of the Message it receives:

pubsub_message.modify_ack_deadline(120.0)
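The usage pattern suggested here (extend the deadline through the Message the callback receives, rather than through the client) can be wrapped in a small helper that renews in the background while the real work runs. This is a minimal sketch, assuming only that the message object exposes `modify_ack_deadline(seconds)` and `ack()` as used in this thread; `run_with_lease`, `FakeMessage`, and the interval values are hypothetical names for illustration, and the fake stands in for a real Pub/Sub message so the sketch runs offline. Note that on the versions discussed in this thread, renewals like this did not prevent the 10-minute redelivery, so treat it as the intended usage shape rather than a fix.

```python
import threading


def run_with_lease(message, work, interval=60.0, extension=120):
    """Run work() while periodically extending message's ack deadline.

    Hypothetical helper: every `interval` seconds it calls
    message.modify_ack_deadline(extension); it acks only if work() succeeds.
    """
    done = threading.Event()

    def renew():
        # Event.wait returns False on timeout and True once done is set.
        while not done.wait(interval):
            message.modify_ack_deadline(extension)

    renewer = threading.Thread(target=renew)
    renewer.daemon = True
    renewer.start()
    try:
        result = work()
    finally:
        # Stop renewing whether work() returned or raised.
        done.set()
        renewer.join()
    # Not reached if work() raised, so a failed message is left to be redelivered.
    message.ack()
    return result


class FakeMessage(object):
    """Hypothetical stand-in that records calls instead of talking to Pub/Sub."""

    def __init__(self):
        self.extensions = []
        self.acked = False

    def modify_ack_deadline(self, seconds):
        self.extensions.append(seconds)

    def ack(self):
        self.acked = True
```

Acking only after `work()` returns is a deliberate choice: if the work raises, the renewals stop and the message is allowed to expire and be redelivered.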

Hi @dhermes

I tried the new version as well as pubsub_message.modify_ack_deadline. Same result: the message gets redelivered in exactly 10 minutes, even though I call pubsub_message.modify_ack_deadline(120) every minute.

And even though pubsub_message.modify_ack_deadline(120) is called, the debug logs still show 10s:

DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:The current p99 value is 10 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Renewing lease for 4 ack IDs.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Snoozing lease management for 8.668086 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber._consumer:Sending request:
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGmJoXFx1B1ALGXV6MCFrU0cACERZfndrOTNpWF9xAFQEHnR7YHRjWjvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGmJoXFx1B1ALGXV6MCFrU0cACEVZfndrOTNpWF9xDlAEGnV9aXxpUjvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGGJoXFx1B1ALGXV6MHdjDhUBCERZfndrOTNpWF9wA1UKHXt8Yn1tXzvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGGJoXFx1B1ALGXV6MHdjDhUBCEVZfndrOTNpWF9xAVELGXB1Z3RvUjvkncSLwfJoZh89WxJLLD4"

I tried commenting out _start_lease_worker() in subscriber/policy/thread.py; that stopped the lease worker from forcing a 10s ack deadline.

But the request sent to the server by subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120) still shows 10s in the debug logs. I see that 10s is hard-coded in base.py.

Is there a version I can downgrade to in order to get this working? I need a longer message ack time.
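Why would every renewal ask for 10s? One plausible model, consistent with the "The current p99 value is 10 seconds" log lines but not taken from the library source, is that the lease worker sets each deadline to the 99th percentile of observed ack latencies, clamped to a 10 s floor. Under that model, a callback that has not acked anything yet gives the histogram no data, so the deadline stays pinned at the floor. The sketch uses a hypothetical `LatencyHistogram` class; the 10 s floor and 600 s ceiling are assumptions for illustration.

```python
import math


class LatencyHistogram(object):
    """Hypothetical model of a p99-based lease deadline (not the library's code).

    Observed ack latencies are clamped to [floor, ceiling] seconds, and the
    percentile falls back to the floor when nothing has been recorded yet.
    """

    def __init__(self, floor=10, ceiling=600):
        self.floor = floor
        self.ceiling = ceiling
        self.samples = []

    def add(self, seconds):
        # Clamp each observation so no single sample leaves [floor, ceiling].
        self.samples.append(min(max(int(seconds), self.floor), self.ceiling))

    def percentile(self, percent):
        if not self.samples:
            # No acks observed yet: use the floor.
            return self.floor
        ordered = sorted(self.samples)
        # Nearest-rank percentile: smallest value covering `percent`% of samples.
        rank = int(math.ceil(percent / 100.0 * len(ordered)))
        return ordered[max(rank, 1) - 1]
```

In the test case above the callback does not ack for hours, so under this model the percentile never moves off the floor.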

I'm fairly certain this is a bug. The lease maintenance thread is trying to make sure a deadline gets extended for any messages that the subscriber is still responsible for, but in doing this extension it actually reduces the lease time for your message (in your opinion and mine, this is a mistake).
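To make "reduces the lease time" concrete: a modify-ack-deadline request sets the deadline relative to the moment it is sent; it does not add to whatever time is left. So when two writers renew the same ack ID, the most recent request wins, and a short lease-worker renewal can cut short a longer one from the callback. The toy timeline below assumes exactly that last-write-wins rule; `effective_deadline` and the event times are hypothetical, and this does not by itself explain the 10-minute redelivery.

```python
def effective_deadline(events):
    """Return when an ack ID expires, given (time_sent, seconds) modack events.

    Each request sets deadline = time_sent + seconds. Requests are applied in
    time order, so the most recent one wins even if it shortens the deadline.
    """
    deadline = None
    for sent_at, seconds in sorted(events):
        deadline = sent_at + seconds
    return deadline


# The callback asks for 120 s at t=60; the lease worker renews with 10 s at t=67.
timeline = [(60, 120), (67, 10)]
print(effective_deadline(timeline))  # 77, not 180
```

If the lease worker stopped renewing after t=67, the message would expire about 10 s later instead of at t=180.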

There is no equivalent high-throughput / highly concurrent implementation you can downgrade to. Here is every Pub / Sub release. Version 0.28.0 was when the rewrite happened, so if you downgrade to 0.27.0 or earlier you won't be subject to this bug, but the library was totally different at that point. (We don't have any published docs for that version of the library, but I'd be happy to work with you to show you how to build the docs locally.)

Thank you @dhermes. Do you think a fix will be available any time soon? I am trying to figure out whether I should build application-level logic to handle this or wait for a fix.

@explicitcall Thanks for the link, but be aware that the "temp" in that domain is not a typo (it is a staging domain used by @jonparrott).

@dhermes are there any plans to host documentation for multiple versions on the official docs site? That would be greatly appreciated.

@explicitcall No concrete plans, but I have ideas in my head once we can split google-cloud-python into N repositories (one for each subpackage). It's a difficult task right now that would require a HUGE investment of time making modifications to the RTD Sphinx theme.

Blocked by #4325?

@sachin-shetty This is a known server-side bug. Two fixes have rolled out server-side this week. You should see a lot less of this issue, but it might recur to an extent. There is a relatively straightforward workaround -- let me know if things are not any better for you (literally) starting today. Thanks for your patience!

@dhermes Let's keep this one open since if we don't solve this completely server-side, we'll need to address the use case some other way -- e.g. through documentation on explicit limits and alternatives.

Also: the receipt acks bug/FR will make this worse. So we may need to fix that to call this fixed.

I am still not able to extend an acknowledgement deadline using the google-cloud-pubsub==0.30.1 library. Can you please recommend a workaround?

@kir-titievsky I tried with 0.30.1 and still cannot get the message to be renewed past 10 minutes, no matter how many times I call modify_ack_deadline.

My test script:

import google.cloud.pubsub_v1 as pubsub
from google.cloud import exceptions
import logging
import time

subscriber = pubsub.SubscriberClient()
sname = 'projects/<project-name>/subscriptions/ack-test-topic-subscribe'
#logging.getLogger("google.cloud").setLevel(logging.DEBUG)

def create_topic(project_id, topic_name):
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    try:
        topic = publisher.create_topic(topic_path)
        logging.warn("Topic created: %s", topic)
    except exceptions.Conflict as e:
        logging.warn("Topic %s exists: %s", topic_path, repr(e))
    publisher.publish(topic_path, 'Published at: %s ' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

def callback(pubsub_message):
    logging.warn("Received message: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    i = 0
    #pubsub_message.ack()
    while i < 480:
        i = i + 1
        time.sleep(60)
        subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)
        pubsub_message.modify_ack_deadline(120)
        logging.warn("Renewed: %s %s %s at %s", i, pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

    logging.warn("Ack: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    pubsub_message.ack()

if __name__ == '__main__':
    tname = 'projects/<project-name>/topics/ack-test-topic'
    create_topic('<project-name>', 'ack-test-topic')
    try:
        subscriber.create_subscription(sname, tname, ack_deadline_seconds=120)
    except exceptions.Conflict as e:
        logging.warn("Subscriber %s exists: %s", sname, repr(e))

    flow_control = pubsub.types.FlowControl(max_messages=10)
    global subscription
    subscription = subscriber.subscribe(
        sname #, flow_control=flow_control
    )
    future = subscription.open(callback)
    while True:
        try:
            logging.warn("Waiting for messages on %s", sname)
            future.result()
            time.sleep(10)
        except Exception as ex:
            logging.exception("Error in blocked result")
            subscription.close()
            future = subscription.open(callback)
            logging.warn("Opened again: %s %s", tname, sname)

Output:

WARNING:root:Received message: 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:46:36
WARNING:root:Renewed: 1 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:47:36
WARNING:root:Renewed: 2 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:48:36
WARNING:root:Renewed: 3 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:49:37
WARNING:root:Renewed: 4 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:50:37
WARNING:root:Renewed: 5 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:51:37
WARNING:root:Renewed: 6 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:52:37
WARNING:root:Renewed: 7 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:53:38
WARNING:root:Renewed: 8 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:54:38
WARNING:root:Renewed: 9 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:55:39
WARNING:root:Received message: 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:56:35

The message gets redelivered at the 10th minute.

Please let me know of any workaround, this is a blocker for us.

Hi @kir-titievsky, can you please let me know the workaround?

Sachin, the code below worked for me with no duplicates. I have a vague suspicion that your error handling at the very bottom might be to blame for your results: the underlying code occasionally surfaces exceptions that come from closed connections, and it will rebuild the connection automatically. You, however, close the subscription and reopen it, which stops all renewals. Maybe try catching only specific exception types?

import google.cloud.pubsub_v1 as pubsub
from google.cloud import exceptions
import time
import logging,logging.handlers
from datetime import datetime
import sys,os

subscriber = pubsub.SubscriberClient()
sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'

def timestamp():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

def create_topic(project_id, topic_name):
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    publish_future = publisher.publish(topic_path, 'Published at: %s ' % timestamp())
    # wait for the publish to succeed
    logging.warn("PUB : %s at %s", publish_future.result(), timestamp())

def callback(pubsub_message):
    logging.warn("PULL: %s %s at %s", pubsub_message.message_id, pubsub_message.data, timestamp())
    time.sleep(60*12)
    logging.warn("ACK : %s %s at %s", pubsub_message.message_id, pubsub_message.data, timestamp())
    pubsub_message.ack()

if __name__ == '__main__':
    project = 'google.com:kir-learns-cloud'
    create_topic(project, 'sachin')
    subscriber = pubsub.SubscriberClient()
    subscriber.subscribe("projects/%s/subscriptions/%s"%(project, "sachin"), callback=callback)
    while True:
        time.sleep(10)                      

@kir-titievsky Sorry, no luck: with your script I get duplicates at 10 minutes.

WARNING:root:PUB : 27133513131810 at 2018-01-24 11:06:50
WARNING:root:PULL: 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:06:51
WARNING:root:PULL: 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:16:49
WARNING:root:ACK : 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:18:51

I only changed the project and topic in your script.

< sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'
---
> #sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'
> sname = 'projects/audit-bq/subscriptions/ack-test-topic-subscribe'
> tname = 'projects/audit-bq/topics/ack-test-topic'
28,29c30,31
<     project = 'google.com:kir-learns-cloud'
<     create_topic(project, 'sachin')
---
>     project = 'audit-bq'
>     create_topic(project, 'ack-test-topic')
31c33
<     subscriber.subscribe("projects/%s/subscriptions/%s"%(project, "sachin"), callback=callback)
---
>     subscriber.subscribe(sname, callback=callback)

I also tried this from a US server to see if that makes any difference, no luck.

@SeanMaday - can you please try with the script provided by @kir-titievsky since you are also hitting the same issue.

This could be a fluke, so it's worth trying with a few messages. If that does not work, you can try doing this with an older version of the client library [0], which uses a polling pull method. You would do something like:

import logging
import multiprocessing
import time

from google.cloud import pubsub  # google-cloud-pubsub 0.27.0 API

CHECK_INTERVAL_SEC = 60  # hypothetical placeholder; keep 2x this under 600 s


def log(text):
    # hypothetical placeholder for your own logging helper
    logging.warning(text)


def parse(file_name):
    # hypothetical placeholder for the long-running work on one file
    pass


def run(topic_name='parser-jobs'):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(topic_name)

    results = subscription.pull(return_immediately=False, max_messages=1)
    if not results:
        # The pull can still come back empty; nothing to do this round.
        return 1
    ack_id, message = results.pop()
    file_name = message.data.strip('"').strip()
    if not (file_name.startswith('gs://') and file_name.endswith('.gz')):
        subscription.acknowledge((ack_id,))
        log('Skipping following file because it does not look like a GCS URL ' + message.data)
        return 1
    log('Starting work on file %s' % file_name)
    parse_process = multiprocessing.Process(target=parse, args=(file_name,))
    parse_process.start()

    # Keep pushing the deadline out while the worker process is still running.
    while parse_process.is_alive():
        time.sleep(CHECK_INTERVAL_SEC)
        subscription.modify_ack_deadline((ack_id,), 2 * CHECK_INTERVAL_SEC)

    subscription.acknowledge((ack_id,))
    return 0

Let me know if and how this works out for you. And thanks for your patience.

[0] https://pypi.org/project/google-cloud-pubsub/0.27.0/ . Unfortunately, I don't remember off the top of my head which pip or easy_install command lets you specify the version.
You can build similar logic with the API client library (https://developers.google.com/api-client-library/python/apis/pubsub/v1), which has a lower-level API but is maintained, so it should not suffer from version conflicts (if you run into those).


--
Kir Titievsky | Product Manager | Google Cloud Pub/Sub
https://cloud.google.com/pubsub/overview

Thank you @kir-titievsky. I tested 0.27 and it looks better: I am able to hold on to a message longer, as long as I keep renewing it.

BTW, even nack does not work in 0.31. I fetched a message and nacked it at the 8th minute; the message was immediately redelivered, which is good, but it was redelivered again at the 10th minute. Something is really funky about 10 minutes in 0.31.

@sachin-shetty Sachin, 10 minutes is a magical number in that it's the maximum deadline you can set with a single modifyAckDeadline call. To keep the messages longer, you have to repeatedly call modifyAckDeadline. Were you doing that?
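Because one modifyAckDeadline call can ask for at most 600 seconds (10 minutes), holding a message for longer means renewing on a schedule, with each renewal outlasting the gap until the next one. A minimal sketch of that arithmetic, assuming a job of known length, a fixed renewal interval, and an initial (subscription) deadline that also exceeds the interval; `renewal_times` and its parameters are hypothetical names, and `MAX_ACK_DEADLINE` is just the 600 s limit written as a constant:

```python
MAX_ACK_DEADLINE = 600  # seconds; the largest deadline one modifyAckDeadline call accepts


def renewal_times(job_seconds, interval, extension):
    """Return the times (seconds after receipt) at which to renew the lease.

    Assumes the initial deadline already exceeds `interval`. Raises ValueError
    if `extension` is over the 600 s cap or would lapse before the next renewal.
    """
    if extension > MAX_ACK_DEADLINE:
        raise ValueError("extension %d s exceeds the %d s maximum"
                         % (extension, MAX_ACK_DEADLINE))
    if extension <= interval:
        raise ValueError("extension must be longer than the renewal interval")
    times = []
    t = interval
    while t < job_seconds:
        times.append(t)
        t += interval
    return times
```

For the 480-minute loop in the original test case (renew every 60 s with 120 s), this gives 479 renewals; no single call ever needs more than 120 s.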

@dhermes Is there something that the client library does at 10 min intervals?

Yes, I tried modifyAckDeadline on both the message and the subscription.

I have switched to 0.27 which is working well for me.

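@kir-titievsky the pydoc (https://github.com/GoogleCloudPlatform/google-cloud-python/blob/8616c45291e999ad83b8b45fe16d990f73cbed0f/pubsub/google/cloud/pubsub_v1/subscriber/message.py#L233-L236) says "This is not an extension", which is quite misleading if this function can indeed be called to "extend" the deadline.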

Juan, fair point. Any interest in submitting a small PR to correct this?


~@opyate How would you prefer that to be written? (Disclaimer: I wrote the original statement, and am happy to modify to make it clearer, but I could not think of a clearer-cut way to write it.)~

Never mind. You've already sent a PR.

Given that #4822 is merged, what work remains for this issue?

I think this is fine to close. We can always re-open or file a new issue.
