Contrary to what's in the reference, the correct return type for publish() should be google.cloud.pubsub_v1.publisher.futures.Future. I didn't find a reference for it but only some reference for google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture.
People won't be able to use of Python's concurrent library's wait() method on google.cloud.pubsub_v1.publisher.futures.Future. But our doc implies they can because we say the return type is concurrent.futures.Future.
from concurrent.futures import wait
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
# future has type `google.cloud.pubsub_v1.publisher.futures.Future`
future = publisher.publish('projects/{PROJECT_ID}/topics/{TOPIC_NAME}', data=b'rain')
# wait(fs, timeout=None, return_when='ALL_COMPLETED') expects a sequence of `concurrent.futures.Future`.
wait([future])
Here is the error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 257, in wait
with _AcquireFutures(fs):
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 146, in __enter__
future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'
I tried in both Python 2 and 3.
@theacodes can you comment?
Yeah we should update it. It should additionally say that it conforms to the concurrent.futures.Future interface and can be used in similar ways, but it not completely compatible with concurrent.futures tools such as wait().
Thanks! Listing a couple of places in the doc where publisherfuture is mentioned:
I have actually got another case where the returned future not being a concurrent.futures.Future hurts: asyncio in the standard library has some utilities for using blocking futures together with asyncio async futures, like asyncio.wrap_future, which check on isinstance(f, concurrent.futures.Future) to distinguish the future types.
I currently have to resort to monkeypatching the base classes list of google.cloud.pubsub_v1.publisher.futures.Future to make it pass the checks, and the method itself works fine. If actually using/inheriting from concurrent.futures.Future is not possible, can at least some utilities for working together with asyncio be provided together with the custom future implementation?
@himikof we can't inherit from concurent.futures.Future because it brings in a ton of stuff. I'm happy for us to add utilities to make working across this stuff possible. It would be relatively low priority for us right now, but we would more than welcome contributions to get it done sooner.
Thanks!
If you end up here trying to figure out how to make a PubSub future _act_ like a concurrent Future, hopefully this code snippet will help
if __name__ == "__main__":
asyncio.set_event_loop(uvloop.new_event_loop())
s1 = app.create_server(host="0.0.0.0", port=8090)
task = asyncio.ensure_future(s1)
print('creating subscription to pubsub')
pubsub_future = subscriber.subscribe(subscription_path, callback=callback)
# this is not really a future
# so monkey patch
pubsub_future._asyncio_future_blocking = True
pubsub_future.__class__._asyncio_future_blocking = True
s2 = asyncio.wrap_future(pubsub_future)
task2 = asyncio.ensure_future(s2)
try:
loop.run_forever()
except:
loop.stop()
If you end up here trying to figure out how to make a PubSub future _act_ like a concurrent Future, hopefully this code snippet will help
if __name__ == "__main__": asyncio.set_event_loop(uvloop.new_event_loop()) s1 = app.create_server(host="0.0.0.0", port=8090) task = asyncio.ensure_future(s1) print('creating subscription to pubsub') pubsub_future = subscriber.subscribe(subscription_path, callback=callback) # this is not really a future # so monkey patch pubsub_future._asyncio_future_blocking = True pubsub_future.__class__._asyncio_future_blocking = True s2 = asyncio.wrap_future(pubsub_future) task2 = asyncio.ensure_future(s2) try: loop.run_forever() except: loop.stop()
@sam-qordoba What is a loop in your example?
Ahh sorry, that was some code I had for using asyncio to run a Sanic web server and pubsub listener in the same process. The main part is
pubsub_future = subscriber.subscribe(
subscription_path,
callback=callback)
# Google implemented a custom, psuedo-future
# need monkey patch for it to work with asyncio
pubsub_future._asyncio_future_blocking = True
pubsub_future.__class__._asyncio_future_blocking = True
real_pubsub_future = asyncio.wrap_future(pubsub_future)
Most helpful comment
I have actually got another case where the returned future not being a
concurrent.futures.Futurehurts: asyncio in the standard library has some utilities for using blocking futures together with asyncio async futures, likeasyncio.wrap_future, which check onisinstance(f, concurrent.futures.Future)to distinguish the future types.I currently have to resort to monkeypatching the base classes list of
google.cloud.pubsub_v1.publisher.futures.Futureto make it pass the checks, and the method itself works fine. If actually using/inheriting fromconcurrent.futures.Futureis not possible, can at least some utilities for working together with asyncio be provided together with the custom future implementation?