Python version (python --version) and google-cloud-python version (pip show google-cloud, pip show google-<service>, or pip freeze):
google-cloud-pubsub==0.28.4
grpcio==1.6.3
Stacktrace if available
import os
import sys
import logging
import traceback
import json
import time
import argparse

import grequests
import grpc

from item_store import ItemStore
from google.cloud.pubsub_v1.subscriber.policy.thread import Policy
from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)
requests = ItemStore()
# address, token, live_forever, and sleep_interval are configured elsewhere
# in the original script and are not shown here.


class OurPolicy(Policy):
    """Flag exceptions that the google code does not recover from.

    We occasionally see errors that the google code doesn't recover from,
    so we set a flag that lets the outer thread respond by restarting the
    client, e.g.:

    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated
    with (StatusCode.UNAVAILABLE, OS Error)>
    """

    _exception_caught = None

    def __init__(self, *args, **kws):
        super(OurPolicy, self).__init__(*args, **kws)

    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, then we want to retry by returning
        # None instead of raising.
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        if code_value == deadline_exceeded:
            return
        # Remember the exception so the main thread can restart the client.
        OurPolicy._exception_caught = exc
        # The base class will just re-raise exc.
        return super(OurPolicy, self).on_exception(exc)


class InvalidSchemaException(Exception):
    pass


def log_unhandled_exception(exc_type, exc_value, exc_traceback):
    logger.error('Unhandled exception', exc_info=(exc_type, exc_value, exc_traceback))


sys.excepthook = log_unhandled_exception


def send_to_data_insertion(message):
    # Queue an async POST of the message payload; the queued requests are
    # sent in batches by grequests.map() in the main loop below.
    requests.add(grequests.post(
        'http://%s:8080/url_here=%s' % (address, token),
        data=message.data.decode('utf-8')))


############### Subscriber logic here ####################

def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        message.ack()
        try:
            send_to_data_insertion(message)
        except InvalidSchemaException:
            return
        except Exception:
            return

    while live_forever:
        if subscriber is None:
            logger.warning('Starting pubsub subscriber client')
            subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
        subscriber.subscribe(subscription_path).open(callback=callback)
        try:
            while True:
                grequests.map(requests.getAll())
                time.sleep(sleep_interval)
                if OurPolicy._exception_caught:
                    exc = OurPolicy._exception_caught
                    OurPolicy._exception_caught = None
                    raise exc
        except KeyboardInterrupt:
            break
        except Exception:
            # Drop the client so it gets recreated on the next iteration.
            subscriber = None
        # Otherwise, sleep for one interval and re-enter the outer loop.
        time.sleep(sleep_interval)

#####################################################


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('project', help='Google cloud project ID')
    parser.add_argument('subscription', help='Google cloud subscription name')
    args = parser.parse_args()
    receive_messages(args.project, args.subscription)
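Note that item_store.ItemStore above is the reporter's own helper and isn't shown in the thread. A minimal thread-safe sketch consistent with how it is used here (add() called from the subscriber callback thread, getAll() drained by the main loop) could look like the following; it is purely illustrative, not the original module:

import threading

class ItemStore(object):
    """Minimal stand-in for the unshown item_store.ItemStore helper."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def add(self, item):
        # Called from the Pub/Sub subscriber callback thread.
        with self._lock:
            self._items.append(item)

    def getAll(self):
        # Called from the main loop; returns and clears everything queued so far.
        with self._lock:
            items, self._items = self._items, []
        return items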

Don't have any solutions for you, but I'm also trying out the new API and seeing similar issues.
I'm also seeing StatusCode.UNAVAILABLE killing the consumer thread. My approach was just to let the policy itself retry by returning None from on_exception, the same as for DEADLINE_EXCEEDED. I don't understand why this (or something similar) isn't the default policy's behavior: occasional UNAVAILABLE responses seem expected and inevitable, yet the default policy lets the thread die with no way to detect or recover it from the main thread.
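A minimal sketch of that approach (the class name is mine; it reuses the Policy base class imported in the report above):

import grpc
from google.cloud.pubsub_v1.subscriber.policy.thread import Policy

_RETRYABLE_CODES = (grpc.StatusCode.DEADLINE_EXCEEDED, grpc.StatusCode.UNAVAILABLE)

class RetryingPolicy(Policy):
    def on_exception(self, exc):
        # Swallow DEADLINE_EXCEEDED and UNAVAILABLE by returning None so the
        # consumer thread keeps running; anything else falls through to the
        # default behavior, which re-raises and kills the thread.
        code = getattr(exc, 'code', lambda: None)()
        if code in _RETRYABLE_CODES:
            return
        return super(RetryingPolicy, self).on_exception(exc)

Passing policy_class=RetryingPolicy to pubsub_v1.SubscriberClient then keeps the background thread alive across UNAVAILABLE, though it does nothing about the duplicate deliveries mentioned below.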
In any case, that modification of on_exception definitely helps. I am still seeing a very large number of repeat messages, though I haven't done thorough enough testing to even pin it down to the library or my own code.
EDIT: I see there are several similar issues already opened. Between this, #4186, and #3886 upgrading Pub/Sub has been a real pain, but the old API holds back every other client :/
I think this issue is a duplicate of #4238, but leaving open just in case (and because this is a really useful bug report).
I think I have a handle on the root cause of this and am working on it this week. I actually got a fix in for the exception handling issue last week, but we have not released it to PyPI yet.
More info on this:
What it appears to do is drop the ack rate to between 0/s and 10/s while StreamingPullOperations climb into the thousands. It also has thousands of ModifyAckDeadline events. So it seems to keep repulling (?) the same events and extending their deadline so they don't get resent to other subscribers.
Any updates on this?
@eoltean Still working on it. :-/
What it appears to do is drop the ack rate to between 0/s and 10/s while StreamingPullOperations climb into the thousands. It also has thousands of ModifyAckDeadline events. So it seems to keep repulling (?) the same events and extending their deadline so they don't get resent to other subscribers.
Let me make sure I understand this correctly. You are saying that your acks cease going through (except in small amounts) while the modacks do?
Unless I misunderstand the graph, then yes that's what is going on. These are all the same time period. The slow burndown in unacked messages is the python client. When the unacked messages dropped suddenly, that is when we switched to the Go client for testing.
For most of the period of these graphs the ack rate is from 0 to 10/s with a few random spikes where it hit several hundred per second before dropping.
(Graphs attached; images not reproduced here.)
Ignore from-data-insertion-svc, that is another Topic/Sub using the same client code.
Zoomed (two time periods, one with 0 acks, one with acks)
(Zoomed graphs attached; images not reproduced here.)
Could someone recommend a version of the pubsub client which doesn't have this bug? I've just started using pubsub and this bug makes it unusable. Thanks.
Note for those debugging this issue: I only see this problem on GKE and not when running locally.
This is happening both locally and on GKE for us
@adamlofts The last version that appears to work correctly was before the API rewrite. So the 0.27.x releases.
@anorth2 I've downgraded and the subscriber is working nicely. Thanks.
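For anyone else downgrading: the pin itself is just the version spec below (0.27.0 is the release cited later in this thread). Note that 0.27.x exposes the older pull-based API, so subscriber code has to be rewritten for it as well.

$ pip install "google-cloud-pubsub==0.27.0"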
(Status update: Work still in progress trying to fix this.)
Any updates on this? Our team is trying to decide if the production version of our project can rely on the Python client or if we need to switch to Go/Java.
@lukesneeringer I see a couple of the other issues are being resolved; does that affect this issue at all? Are there any other updates? It's been over 3 weeks since the last one.
@eoltean I'll try to get to this soon. Hopefully it'll be reproducible (since @anorth2 provided so much code, it seems there are plenty of details).
We might be running into a related problem. We are currently rewriting the psq package to suit our needs, and I am having trouble acking messages after an exception has occurred. Is the message somehow auto-nacked after an exception is thrown?
If I start a multi-process worker, I can watch the task getting redelivered to the other processes, only for them to fail as well.
google-cloud==0.30.0
google-cloud-pubsub==0.29.2
@classmethod
def restore(cls, message):
    """Restore task from dumped data.

    Args:
        message (google.cloud.pubsub_v1.subscriber.message.Message):

    Returns:
        psq.task.Task: task instance for worker
    """
    try:
        # todo: psq: implement TaskRegistry and a sane json serialization to avoid import problems
        task = unpickle(message.data)  # type: Task
        task.message = message
        return task
    except UnpickleError:
        #
        # import broken, it's dead, Jim.
        #
        message.ack()  # <-- this should discard the poor message, shouldn't it?
        logger.exception('Failed to unpickle task {}.'.format(message.message_id))
ERROR 2017-12-05 10:07:15,998 psq.task Failed to unpickle task 101585298217325.
Traceback (most recent call last):
File "/home/andre/Projects/psq/src/psq/utils.py", line 47, in unpickle
obj = loads(pickled_string)
AttributeError: Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/andre/Projects/psq/src/psq/task.py", line 99, in restore
task = unpickle(message.data)
File "/home/andre/Projects/psq/src/psq/utils.py", line 49, in unpickle
raise UnpickleError('Could not unpickle', pickled_string, e)
psq.utils.UnpickleError: ('Could not unpickle', b'\x80\x04\x95\x96\x00\x00\x00\x00\x00\x00\x00\x8c\x08psq.task\x94\x8c\x04Task\x94\x93\x94)\x81\x94}\x94(\x8c\x06kwargs\x94}\x94\x8c\x01f\x94\x8c\x15psq.tests.worker_test\x94\x8c\tmark_done\x94\x93\x94\x8c\x06result\x94N\x8c\x04args\x94)\x8c\x02id\x94\x8c\x011\x94\x8c\x07retries\x94K\x00\x8c\x07message\x94N\x8c\x06status\x94\x8c\x06queued\x94ub.', AttributeError("Can't get attribute 'mark_done' on <module 'psq.tests.worker_test' from '/home/andre/Projects/psq/src/psq/tests/worker_test.py'>",))
psq hasn't yet been updated to use the latest pubsub package. In fact, the latest pubsub package has many of the features that psq implements. Also worth noting that psq isn't an official Google product, just an open-source project.
Any updates on this?
I might be seeing this issue as well. I have a single subscriber listening to infrequent changes in a GCS bucket using a PubSub topic. My subscriber callback looks like this:
def add_to_queue(message):
    message_queue.put(message)
    message.ack()
Old messages keep getting delivered over and over, and eventually I think the un-ack()'d backlog blocks new messages from being processed.

I'm using pubsub 0.29.3, cloud 0.31.0, core 0.28.0, grpcio 1.7.3 and Python 3.6.3.
Edit: I rewrote my script using google-cloud-pubsub==0.27.0 as mentioned by @anorth2, and it works correctly. I'll also try the newly released 0.29.4 to see how it works.
Edit 2: 0.29.4 seemed to handle message pulling a bit better but not perfectly (once it hung on receiving messages, but they seemed to be acknowledged when I restarted the app). However, it exhibits the separate behavior of eventually consuming 100% CPU that I reported in #4563, so that forced me to use 0.27.0 anyway.
To provide an update, we're currently on 0.29.4 and seeing the same (if not worse) behavior as before. Hitting 14 GB of memory usage after about 10 minutes, only acking at around 5-10/s, 2000-7000+ StreamingPullOperations/second:
(Graph attached; image not reproduced here.)
@anorth2, there is a spinlock bug in gRPC that I think may be the culprit. A PR (https://github.com/grpc/grpc/pull/13665) is out as a potential fix and I am going to install grpcio from source based on this PR to see if it resolves the issue.
Can you run a strace on the impacted pthread? For example, when 25626 was my parent process:
$ ps auxw -L
USER PID LWP %CPU NLWP %MEM VSZ RSS TTY STAT START TIME COMMAND
...
${USER} 25626 25626 0.2 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:13 .../bin/python no-messages-too/script.py
${USER} 25626 25631 0.1 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:06 .../bin/python no-messages-too/script.py
${USER} 25626 25632 0.0 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:00 .../bin/python no-messages-too/script.py
${USER} 25626 25639 0.1 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:05 .../bin/python no-messages-too/script.py
${USER} 25626 25657 0.0 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:00 .../bin/python no-messages-too/script.py
${USER} 25626 25658 0.0 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:00 .../bin/python no-messages-too/script.py
${USER} 25626 25659 0.0 9 0.3 816352 53276 pts/2 Sl+ 20:30 0:00 .../bin/python no-messages-too/script.py
${USER} 25626 5033 99.1 9 0.3 816352 53276 pts/2 Rl+ 21:46 0:35 .../bin/python no-messages-too/script.py
${USER} 25626 5034 0.0 9 0.3 816352 53276 pts/2 Sl+ 21:46 0:00 .../bin/python no-messages-too/script.py
...
I identified 5033 as the runaway pthread and attaching to it shows the POLLIN/POLLHUP issue:
$ sudo strace -p 5033
...
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254451018}) = 0
write(2, "D1213 21:47:11.254451018 5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254488163}) = 0
write(2, "D1213 21:47:11.254488163 5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 373944159}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254563047}) = 0
write(2, "D1213 21:47:11.254563047 5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254597548}) = 0
write(2, "D1213 21:47:11.254597548 5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374077123}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254708226}) = 0
write(2, "D1213 21:47:11.254708226 5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254742499}) = 0
write(2, "D1213 21:47:11.254742499 5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374197185}) = 0
poll([{fd=6, events=POLLIN}, {fd=7, events=0}, {fd=9, events=POLLIN}], 3, 190) = 1 ([{fd=7, revents=POLLHUP}])
clock_gettime(CLOCK_REALTIME, {1513230431, 254828449}) = 0
write(2, "D1213 21:47:11.254828449 5033"..., 78) = 78
clock_gettime(CLOCK_REALTIME, {1513230431, 254868180}) = 0
write(2, "D1213 21:47:11.254868180 5033"..., 96) = 96
clock_gettime(CLOCK_MONOTONIC, {596307, 374335746}) = 0
...
The current example I have to reproduce the CPU spike usually takes an hour to appear; I'd love to get a spike within 10 minutes so I could run a reproducible case more often. Ping me on Hangouts and maybe we can figure something out?
I have backported that fix to the v1.7.3 tag and will be installing from source and running it now to see if it solves the spinlock issue.
So I just ran my "do-nothing" reproducible case (thanks to @dmontag) that reliably thrashes the CPU after 65 minutes. I can confirm that after installing a custom grpcio with https://github.com/grpc/grpc/pull/13665, the thrashing goes away!
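(The exact "do-nothing" case isn't included in the thread; a sketch of that style of reproduction, using the same subscribe/open API as the code at the top of this issue, with placeholder project and subscription names, might look like this:)

import time
from google.cloud import pubsub_v1

def callback(message):
    # Do nothing with the payload; just ack, so only the client machinery
    # (streaming pull, leasing, gRPC polling) is exercised.
    message.ack()

subscriber = pubsub_v1.SubscriberClient()
path = subscriber.subscription_path('my-project', 'my-subscription')
subscriber.subscribe(path).open(callback=callback)

# Keep the main thread alive and watch CPU, e.g. with `ps auxw -L` as above.
while True:
    time.sleep(60)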
I have created a manylinux 64-bit wheel that includes the backport: https://github.com/dhermes/google-cloud-pubsub-performance/blob/master/grpcio-1.7.4.dev1-cp36-cp36m-manylinux1_x86_64.whl
Feel free to install it, but "buyer beware": this isn't a wheel endorsed by the grpcio folks. The wheel was created via:
$ docker run \
> --rm \
> --tty \
> --interactive \
> --volume $(pwd):/var/wheels/ \
> quay.io/pypa/manylinux1_x86_64:latest \
> /bin/bash
% cd tmp/
% git clone https://github.com/dhermes/grpc
% cd grpc
% git checkout 1.7.3-with-13665
% git submodule update --init
% /opt/python/cp36-cp36m/bin/python3.6 -m pip install --upgrade pip wheel
% /opt/python/cp36-cp36m/bin/python3.6 -m pip install --requirement requirements.txt
% export REPO_ROOT=$(pwd)
% export GRPC_PYTHON_BUILD_WITH_CYTHON=1
% /opt/python/cp36-cp36m/bin/python3.6 -m pip wheel . --wheel-dir $(pwd)
% auditwheel repair grpcio-1.7.4.dev1-cp36-cp36m-linux_x86_64.whl --wheel-dir /var/wheels/
All of these commands will work on your target machine except for auditwheel, but if you build a custom wheel on your target machine (i.e. a wheel not intended for "general distribution") then you don't need to use auditwheel. Also note you'll need to change the relevant Python path from /opt/python/cp36-cp36m/bin/python3.6.
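Installing the resulting (or the prebuilt) wheel then looks something like this; the version check is just a sanity check and assumes the custom build reports itself as 1.7.4.dev1:

$ pip install --upgrade grpcio-1.7.4.dev1-cp36-cp36m-manylinux1_x86_64.whl
$ python -c "import grpc; print(grpc.__version__)"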
@anorth2 Would you mind trying with the patched grpcio==1.7.4.dev1?
I'm closing this issue in favor of #4600. The original report "PubSub: Subscriber Client stops acking messages after several seconds" has been resolved by the addition of Consumer.pause() and Consumer.resume() in #4558.
@anorth2 Can we move future discussion there? I'll move my last pending question there for "completeness".