Google-cloud-python: firestore: Implement Watch

Created on 1 Dec 2017 · 6 comments · Source: googleapis/google-cloud-python

The Firestore Python client should gain the ability to receive realtime updates as outlined in the specification document.

Please take a look at Node: https://github.com/googleapis/nodejs-firestore/blob/master/src/watch.js
and Java: https://github.com/GoogleCloudPlatform/google-cloud-java/pull/2665

Labels: feature request, firestore

All 6 comments

@schmidt-sebastian @dhermes I started working on some translation from JS, but I'm not really sure how everything interacts, and there are some event handlers that I'm not sure how to convert from JS to Python without knowing more of the code. Anyway, feel free to take a look and let me know if this is just way over my head, or if you are willing to help me get through it. I also looked at the Java code, which is implemented a bit differently from the JS. This is just very preliminary work.

https://github.com/chemelnucfin/google-cloud-python/commit/57871e16c245bd358bfab33966b8679e63bcbdda

@dhermes
So it looks like a basic producer-consumer model, unless I'm missing something. I start a queue when I start a watch and block until I get an item in the queue, then return it.

But the question I have is how do I tell grpc to send the ListenResponse to the queue that I have? Also, would it be better to have one queue per Watch instance or one queue per system?

Also seems to me that threads would be ok and I don't need multiprocessing?
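The producer-consumer model described above can be sketched with the standard library alone; the `consume` helper and the sample "change" strings are illustrative, not part of the Firestore client:

```python
import queue
import threading

STOP = object()  # sentinel telling the consumer to exit


def consume(q, results):
    """Block on the queue and collect items until STOP arrives."""
    while True:
        item = q.get()  # blocks until an item is available
        if item is STOP:
            break
        results.append(item)


q = queue.Queue()
results = []
worker = threading.Thread(target=consume, args=(q, results))
worker.start()

# The "producer" side: in a real watch these would be change notifications.
for change in ("doc-added", "doc-modified", "doc-removed"):
    q.put(change)
q.put(STOP)
worker.join()
print(results)  # ['doc-added', 'doc-modified', 'doc-removed']
```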

@chemelnucfin I'm not very familiar with the feature. To start, which RPC does watch correspond to? (The only bidirectional streaming RPCs are Listen and Write, and it seems Listen is the closer synonym for Watch.)

A basic implementation would look like this:

import threading
import uuid

from six.moves import queue as queue_mod

from google.cloud import firestore


STOP = uuid.uuid4()


def make_request_gen(queue):
    while True:
        request = queue.get()  # Blocks "forever"
        if request == STOP:
            break

        yield request


def do_watch(queue, firestore_api):
    request_gen = make_request_gen(queue)
    response_gen = firestore_api.listen(request_gen)

    for response in response_gen:
        do_something(response)  # placeholder: process each ListenResponse

    queue.put(STOP)


client = firestore.Client()
firestore_api = client._firestore_api

queue = queue_mod.Queue()
thread = threading.Thread(
    target=do_watch, args=(queue, firestore_api), name='Meh')
thread.start()
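The sentinel-terminated request generator in that sketch can be exercised without gRPC or Firestore at all; feeding a plain queue shows how make_request_gen yields until STOP arrives (the string stands in for a real ListenRequest message):

```python
import queue
import uuid

STOP = uuid.uuid4()  # unique sentinel, as in the snippet above


def make_request_gen(q):
    """Yield requests from the queue until the STOP sentinel shows up."""
    while True:
        request = q.get()  # blocks "forever"
        if request == STOP:
            break
        yield request


q = queue.Queue()
q.put("initial ListenRequest")  # in real use, a ListenRequest proto
q.put(STOP)

requests = list(make_request_gen(q))
print(requests)  # ['initial ListenRequest']
```

gRPC consumes such a generator lazily, so in the real call the generator keeps blocking on `q.get()` until the caller pushes STOP.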

Also seems to me that threads would be ok and I don't need multiprocessing?

I would leave it open so that users can specify which type of concurrency they want (with the above that would mean swapping out the queue and the thread for other primitives)
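Swapping primitives is cheap because queue.Queue and multiprocessing.Queue share the same put/get interface, so the worker code need not change; a sketch (the drain and run_with helpers are illustrative, and a string sentinel is used so it can cross process boundaries):

```python
import multiprocessing
import queue
import threading

STOP = "stop"  # string sentinel: picklable, so it works for both queue types


def drain(q):
    """Collect items until STOP; works with either queue flavor."""
    items = []
    while True:
        item = q.get()
        if item == STOP:
            return items
        items.append(item)


def run_with(q):
    # Feed the queue from a thread, drain it in the main thread.
    feeder = threading.Thread(target=lambda: [q.put(x) for x in (1, 2, STOP)])
    feeder.start()
    result = drain(q)
    feeder.join()
    return result


thread_result = run_with(queue.Queue())
process_result = run_with(multiprocessing.Queue())
print(thread_result, process_result)  # [1, 2] [1, 2]
```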

But the question I have is how do I tell grpc to send the ListenResponse to the queue that I have?

The queue isn't how responses arrive; gRPC hands ListenResponse messages back through the iterator returned by listen (see the later comment).

Also, would it be better to have one queue per Watch instance or one queue per system?

The queue is for your requests. It seems in many cases you'd only ever send a single initial ListenRequest so the queue is just a way for you to send STOP (which is actually necessary).

I don't think you'd need a queue for the ListenResponse messages that come in via response_gen, you just need a way to process them (i.e. do_something). But that is me speaking from ignorance of how the "watch" feature is supposed to work.

Yes, it corresponds to listen.

So basically if I do the listen(request_gen), then grpc will send results into the queue whenever a change happens. ok, that's what I needed to know.

thanks.

then grpc will send results into the queue whenever a change happens

No. The queue is owned by the caller and is used to send requests. The only place ListenResponse messages get sent is into the response_gen iterator (an instance of the _Rendezvous god-class).

If you'd like to put them in a queue, that's where do_something comes in.
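One way to hand responses back to the caller is a second, caller-owned queue that do_something feeds. Here response_gen is faked with a plain iterator, since the real one comes from firestore_api.listen(...); this is a sketch of the pattern, not the library's API:

```python
import queue

response_queue = queue.Queue()


def do_something(response):
    # Hand each ListenResponse to whoever owns response_queue.
    response_queue.put(response)


# Stand-in for the _Rendezvous iterator returned by listen(request_gen).
fake_response_gen = iter(["change-1", "change-2"])

for response in fake_response_gen:
    do_something(response)

received = [response_queue.get_nowait() for _ in range(response_queue.qsize())]
print(received)  # ['change-1', 'change-2']
```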

This is on the internal feature backlog.
