The Firestore Python client should gain the ability to receive realtime updates as outlined in the specification document.
Please take a look at Node: https://github.com/googleapis/nodejs-firestore/blob/master/src/watch.js
and Java: https://github.com/GoogleCloudPlatform/google-cloud-java/pull/2665
@schmidt-sebastian @dhermes I started translating the JS implementation, but I'm not sure how everything interacts, and there are some event handlers that I don't know how to convert from JS to Python without knowing more of the code. Feel free to take a look and let me know whether this is way over my head or whether you are willing to help me get through it. I also looked at the Java code, which is implemented a bit differently from the JS. This is very preliminary work:
https://github.com/chemelnucfin/google-cloud-python/commit/57871e16c245bd358bfab33966b8679e63bcbdda
@dhermes
So it looks like a basic producer-consumer model, unless I'm missing something. I start a queue when I start a watch, block until I get an item in the queue, then return it.
But the question I have is how do I tell grpc to send the ListenResponse to the queue that I have? Also, would it be better to have one queue per Watch instance or one queue per system?
Also seems to me that threads would be ok and I don't need multiprocessing?
@chemelnucfin I'm not very familiar with the feature. To start, which RPC does watch correspond to? (The only BIDI RPCs are Listen and Write and it seems Listen is a closer synonym to Watch)
A basic implementation would look like this:

    import threading
    import uuid

    from six.moves import queue as queue_mod

    from google.cloud import firestore

    STOP = uuid.uuid4()  # Unique sentinel that ends the request stream


    def make_request_gen(queue):
        while True:
            request = queue.get()  # Blocks "forever"
            if request == STOP:
                break
            yield request


    def do_watch(queue, firestore_api):
        request_gen = make_request_gen(queue)
        response_gen = firestore_api.listen(request_gen)
        for response in response_gen:
            do_something(response)  # Caller-supplied handler, not defined here
        queue.put(STOP)


    client = firestore.Client()
    firestore_api = client._firestore_api
    queue = queue_mod.Queue()
    thread = threading.Thread(
        target=do_watch, args=(queue, firestore_api), name='Meh')
    thread.start()

> Also seems to me that threads would be ok and I don't need multiprocessing?
I would leave it open so that users can specify which type of concurrency they want (with the above that would mean swapping out the queue and the thread for other primitives)
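One way to keep the concurrency choice open might look like the sketch below. The names `start_worker`, `queue_factory`, `spawn_factory` and `drain` are hypothetical and only illustrate passing the primitives in as arguments; nothing here is part of the Firestore client.

```python
import queue
import threading


def start_worker(target, queue_factory=queue.Queue,
                 spawn_factory=threading.Thread):
    """Create a queue and run target(work_queue) with the given primitives."""
    work_queue = queue_factory()
    worker = spawn_factory(target=target, args=(work_queue,))
    worker.start()
    return work_queue, worker


results = []


def drain(work_queue):
    # Collect items until the None sentinel arrives.
    for item in iter(work_queue.get, None):
        results.append(item)


work_queue, worker = start_worker(drain)
work_queue.put('a')
work_queue.put('b')
work_queue.put(None)  # sentinel: lets the worker exit
worker.join(timeout=5)
```

`multiprocessing.Queue` and `multiprocessing.Process` accept the same `target`/`args` arguments, so they could be passed in instead, though in-memory state such as `results` would not be shared across processes.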
> But the question I have is how do I tell grpc to send the ListenResponse to the queue that I have?
The
> Also, would it be better to have one queue per Watch instance or one queue per system?
The queue is for your requests. It seems in many cases you'd only ever send a single initial ListenRequest so the queue is just a way for you to send STOP (which is actually necessary).
I don't think you'd need a queue for the ListenResponse messages that come in via response_gen, you just need a way to process them (i.e. do_something). But that is me speaking from ignorance of how the "watch" feature is supposed to work.
Yes, it corresponds to listen.
So basically if I do the listen(request_gen), then grpc will send results into the queue whenever a change happens. ok, that's what I needed to know.
thanks.
> ... then grpc will send results into the queue whenever a change happens
No. The queue is owned by the caller to send requests. The only place ListenResponse-s get sent is into the response_gen iterator (it is an instance of the _Rendezvous god-class).
If you'd like to put them in a queue, that's where do_something comes in.
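A minimal sketch of that split, assuming a stand-in API object so it runs without Firestore: `FakeListenApi` is hypothetical and simply echoes one response per request, whereas the real `listen` call returns the gRPC response iterator. The request queue carries outgoing requests plus the `STOP` sentinel, and `do_something` is just the `put` method of a second queue.

```python
import queue
import threading

STOP = object()  # sentinel telling the request generator to finish


def make_request_gen(request_queue):
    """Yield requests from the queue until STOP is seen."""
    while True:
        request = request_queue.get()  # blocks until an item arrives
        if request is STOP:
            break
        yield request


class FakeListenApi(object):
    """Hypothetical stand-in for the real API, for offline illustration."""

    def listen(self, request_gen):
        # Echo one fake "response" per request.
        for request in request_gen:
            yield 'response-to-' + request


def do_watch(request_queue, api, do_something):
    response_gen = api.listen(make_request_gen(request_queue))
    for response in response_gen:
        do_something(response)  # responses only ever arrive here


request_queue = queue.Queue()   # caller -> API (requests and STOP)
response_queue = queue.Queue()  # filled by do_something, read by the caller

thread = threading.Thread(
    target=do_watch,
    args=(request_queue, FakeListenApi(), response_queue.put),
    name='watch-sketch')
thread.start()

request_queue.put('initial-listen-request')
first_response = response_queue.get(timeout=5)
request_queue.put(STOP)  # ends the request stream so the thread can exit
thread.join(timeout=5)
```

With this shape the caller never touches the response iterator directly; it only reads from `response_queue` and signals shutdown through `request_queue`.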
This is on the internal feature backlog.