Often, the publisher client stops working, but the stack trace below never reaches our code, so no exception surfaces. The result is that the application hangs without failing any health check. For now we have set up an alert that fires when this log appears in Stackdriver, but that is a poor workaround.
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 259, in monitor
    return self._commit()
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 207, in _commit
    self._messages,
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/gapic/publisher_client.py", line 398, in publish
    request, retry=retry, timeout=timeout, metadata=metadata)
  File "/usr/lib/python2.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
    on_error=on_error,
  File "/usr/lib/python2.7/site-packages/google/api_core/retry.py", line 199, in retry_target
    last_exc,
  File "/usr/lib/python2.7/site-packages/six.py", line 737, in raise_from
    raise value
RetryError: Deadline of 600.0s exceeded while calling <functools.partial object at 0x7f34dc9c9838>, last exception: 503 Connect Failed
Alpine 3.8
Python 2.7.14
bernhard==0.2.6
boto==2.27.0
cachetools==2.1.0
certifi==2019.3.9
chardet==3.0.4
click-replayer==0.0.1
configobj==4.7.2
debugtrace==0.0.1
enum34==1.1.6
funcsigs==1.0.2
futures==3.2.0
google-api-core==1.9.0
google-auth==1.6.3
google-cloud-pubsub==0.38.0
googleapis-common-protos==1.5.9
grpc-google-iam-v1==0.11.4
grpcio==1.20.0
idna==2.8
meld3==1.0.2
mock==2.0.0
MySQL-python==1.2.5
pbr==5.1.3
protobuf==3.7.1
py==1.8.0
pyasn1==0.4.5
pyasn1-modules==0.2.4
pygeoip==0.2.7
pyparsing==2.2.0
pytest==3.2.2
python-dateutil==2.2
pytz==2019.1
raven==4.0.4
redirecting==0.0.1
redis==2.10.1
requests==2.21.0
rsa==4.0
six==1.12.0
skimpubsub==1.1.4
SQLAlchemy==1.1.15
supervisor==3.2.4
ua-parser==0.8.0
ujson==1.35
urllib3==1.24.2
user-agents==1.1.0
uWSGI==2.0.14
Werkzeug==0.9.6
It happens mostly in one particular zone, asia-southeast1-b, but in general it occurs in all regions (US, EU, etc.).
@jam182 Thank you for reporting this. From the error description, this seems to be the same issue as #7709?
Edit: Sorry, this is the publisher client, not subscriber. But the symptom seems similar, and both issues might have the same underlying cause.
@jam182 Just as a sanity check, what does your publisher client code look like? Does it use the Future object returned by the publisher.publish() call and interact with it?
Something like the following:
import time

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

while True:
    msg = b"some message"
    future = publisher.publish(topic_path, msg)
    try:
        result = future.result()
    except Exception as ex:
        # handle the exception, e.g. log it and retry
        pass
    time.sleep(3)
I was able to replicate the reported behavior (an error not surfacing to the publisher code) by faking an error in the underlying channel while also ignoring the Future object returned by the publisher.publish() call, i.e. when the client script never called future.result().
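This swallowed-error behavior is easy to demonstrate with a plain concurrent.futures stand-in. This is not the Pub/Sub client itself, just a sketch of the same fire-and-forget pattern:

```python
import concurrent.futures

def fake_publish():
    # Stand-in for the library's background commit raising an error.
    raise RuntimeError("503 Connect Failed")

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(fake_publish)   # fire-and-forget: error stays trapped
executor.shutdown(wait=True)

# No traceback has surfaced so far; only an explicit result() call raises it.
try:
    future.result()
except RuntimeError as exc:
    print("surfaced only via result():", exc)
```

If the `future.result()` call at the end were omitted, the script would finish without any visible error, which matches the reported symptom.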
I also tried an alternative approach that uses future.add_done_callback():
def my_callback(future):
    result = future.result()
    # do something with result

...
while True:
    ...
    future = publisher.publish(topic_path, msg)
    future.add_done_callback(my_callback)
    ...
In this case, the exception occurred in the callback (on the future.result() line), but the main thread nevertheless kept running, because the callback is invoked in the background by another thread; the client script never terminated despite the error.
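The same stand-in illustrates why the main thread survives: the callback runs on a different thread, so an exception observed inside it never propagates to the caller. A minimal sketch, again using concurrent.futures rather than the real client:

```python
import concurrent.futures

def failing_task():
    raise RuntimeError("503 Connect Failed")

observed = []

def my_callback(future):
    try:
        future.result()                 # re-raises the task's exception here
    except RuntimeError as exc:
        observed.append(str(exc))       # the main thread never sees this directly

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(failing_task)
    future.add_done_callback(my_callback)

# The with-block waited for the worker, so the callback has run by now.
print("main thread kept running; callback observed:", observed)
```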
Could any of these two scenarios be applicable to your case?
I'd say it's a combination of both.
We publish a message and delegate the double-checking of all the futures to a separate thread, just in case Pub/Sub ever goes down.
The code looks something like this:
import logging
from functools import partial
from threading import Thread

from Queue import Queue, Full  # Python 2; on Python 3 this is the queue module

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

events = Queue(maxsize=100000)


def check_response():
    while True:
        logger.debug('Checking the queue')
        message = events.get(block=True)
        logger.debug('Checking the message')
        _callback = partial(callback, message)
        message.response.add_done_callback(_callback)


def callback(message, future):
    try:
        logger.debug('Callback is waiting for the future')
        message_id = future.result(timeout=2)
        logger.debug('Future resolved: %s', message_id)
    except Exception:
        logger.exception('Failed to publish message. Retry.')
        message.retry()


class Message(object):
    """This represents one item in the events queue."""

    __slots__ = ('response', 'publish_func', 'record')

    def __init__(self, response, publish_func, record):
        self.response = response
        self.publish_func = publish_func
        self.record = record

    def retry(self):
        self.response = self.publish_func(self.record)
        queue_size = events.qsize()
        try:
            events.put_nowait(self)
        except Full:
            logger.error('Events queue is full. Messages are not being checked anymore.')


class PubSubLogger(object):
    def __init__(self, config):
        # PubSubTopicClient returns the Google publisher client wrapper that
        # always publishes to a specific project/topic
        self.pub_sub_client = PubSubTopicClient(config['topic_name'])
        Thread(target=check_response).start()

    def log_event(self, event):
        try:
            events.put_nowait(
                Message(
                    self.pub_sub_client.publish_message(event),
                    self.pub_sub_client.publish_message,
                    event,
                )
            )
        except Full:
            logger.error('Clicks queue is full. Messages are not being checked anymore.')
We never actually see the "Events queue is full" log message, though.
The other thing I should probably mention is that the above code runs in a uWSGI mule worker.
UPDATE: the pip freeze I posted shows google-cloud-pubsub==0.38.0, but I just noticed that 0.40.0 is out, which includes #7071.
I updated our pubsub version to 0.40.0 and this time I actually got the RetryError exception propagated.
I think the way you reproduced the error was without calling future.result(), but I believe making that call is a good enough way to propagate the exception?
The only concern is: in our retry logic we reuse the same client object to issue a new publish() call after a failure. Is the client going to try to reconnect, or will it fail every time from now on? My guess is it shouldn't keep failing, because I can see in the logs that a message had to be retried yet the retry queue never got full, which means the message was eventually sent. We want the checker thread not to die, but to keep checking and retrying to send messages.
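For what it's worth, one way to keep a checker loop like check_response() alive through individual failures is to trap per-item errors inside the loop. The sketch below uses assumed names (an events queue, a handle function) and a None sentinel just so it can terminate; it is not the code from the snippet above:

```python
import logging
from queue import Queue  # Python 3; the module is named Queue on Python 2

logger = logging.getLogger(__name__)

def drain_events(events, handle):
    """Pull items until a None sentinel arrives; log per-item errors
    instead of letting them kill the checker thread."""
    handled = 0
    while True:
        item = events.get(block=True)
        if item is None:
            break
        try:
            handle(item)            # e.g. attach the done-callback here
            handled += 1
        except Exception:
            logger.exception('Checker loop error; continuing')
    return handled
```

A faulty item then only costs a log line, and the loop keeps processing the rest of the queue.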
@jam182 Glad to hear that updating to 0.40.0 works now! :+1: (that was the version I tested with).
I think the way you reproduced the error was without calling future.result(), but I believe making that call is a good enough way to propagate the exception?
Indeed, the only way I managed to reproduce the bug was to not call future.result() anywhere. I basically just wanted to check that the publisher client was not being used incorrectly, e.g. like below:
try:
    client.publish(...)
except Exception as exc:
    # does not work this way...
    pass
The only concern is: in our retry logic we reuse the same client object to issue a new publish() call after a failure. Is the client going to try to reconnect, or will it fail every time from now on?
If you repeat the call with the same client instance, the same underlying channel is used (the channel is not recreated on subsequent publish() calls). It is thus likely that the error will persist and the client will enter another retry cycle, eventually either succeeding or raising another RetryError after the timeout.
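A RetryError after a failed publish can itself be retried with backoff. The sketch below is generic Python, not the client API: publish_func stands in for the client.publish(...) call, and RuntimeError is a placeholder for google.api_core.exceptions.RetryError:

```python
import time

def publish_with_retry(publish_func, record, max_attempts=5, base_delay=1.0):
    """Re-issue a publish after a (placeholder) transient error,
    backing off exponentially between attempts."""
    for attempt in range(max_attempts):
        try:
            return publish_func(record)
        except RuntimeError:                      # placeholder for RetryError
            if attempt == max_attempts - 1:
                raise                             # give up: the error persisted
            time.sleep(base_delay * (2 ** attempt))
```

Since each publish() call already retries internally for up to its own deadline, this outer loop is literally "retrying the retry", as described below.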
FWIW, a RetryError indicates that the underlying error was transient and expected to eventually resolve itself, but retrying took too long. Re-issuing a publish request with the same client therefore means "retrying the retry", and would have a chance of eventually succeeding, IMO.
On the other hand, if a non-retryable error occurs, the underlying machinery will not attempt to retry the request, and a different exception will be propagated up to the custom script. "Manually" retrying on such errors is probably futile.
Closing this, as it has been resolved by upgrading pubsub to the latest version.
@jam182 Thank you for the effort on your side, too, and for providing all the details. Should the issue re-emerge, however, feel free to come back and re-open it. Thanks again!