Distributed: Trigger callback on dask.distributed.Variable.set

Created on 1 Nov 2019 · 5 Comments · Source: dask/distributed

Hello, I'm cross-posting this from https://stackoverflow.com/questions/58648497/trigger-callback-on-dask-distributed-variable-set for better visibility, but please close this issue if it is more appropriate for discussion on Stack Overflow.

I am working on a Python app that requires data synchronization between a remote location (like EC2) and a local location (like my laptop). Some of the data involves very large arrays, and I'm already using dask successfully elsewhere in the app, so I'd like to use dask for this process too.

I'm currently trying to use a dask.distributed.Variable to sync the data, either scattering the data first if it is an array or just setting it directly. I can then call get() to retrieve the data, but what I'd really like is to connect a callback that runs on my local machine whenever data is set on the remote machine, or vice versa.

For example something like:

```python
# On remote
from dask.distributed import LocalCluster, Client, Variable

cluster = LocalCluster()
client = Client(cluster)

# Define variable
aa = Variable('a')
```

```python
# On local
from dask.distributed import Client, Variable

# Connect to the remote scheduler (the address of the remote cluster.scheduler)
client = Client(cluster.scheduler.address)

# Define variable
aa = Variable('a')

# Define my custom callback
def my_callback():
    print(aa.get())

# Do some connection magic ?????
```

and then, if this runs

```python
# On remote
aa.set(1)
```

my callback is triggered on my local machine.

Is such a paradigm possible here?

I've also been exploring the Pub/Sub model described at https://docs.dask.org/en/latest/futures.html#publish-subscribe, but I'm still struggling to get it all working. I don't understand how to use async and await, but they might be important here.
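For readers who, like the asker, are new to `async`/`await`, here is a minimal, dask-free sketch of the waiting behaviour the `await sub.get()` loop in the answer relies on. It is illustrative only. An `asyncio.Queue` stands in for a subscriber: `await queue.get()` suspends the coroutine without blocking the event loop, and the coroutine resumes once a producer puts a value. `producer`, `consumer`, and the `None` sentinel are hypothetical names for this example.

```python
import asyncio


async def producer(queue: asyncio.Queue) -> None:
    # Simulates the remote side publishing values some time apart.
    for value in (1, 2, 3):
        await asyncio.sleep(0.01)
        await queue.put(value)
    await queue.put(None)  # sentinel: no more values


async def consumer(queue: asyncio.Queue, received: list) -> None:
    # Each `await queue.get()` suspends this coroutine (without blocking
    # the event loop) until the producer has put something.
    while True:
        value = await queue.get()
        if value is None:
            break
        received.append(value)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    # Run both coroutines concurrently on the same event loop.
    await asyncio.gather(producer(queue), consumer(queue, received))
    return received


print(asyncio.run(main()))  # [1, 2, 3]
```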

All 5 comments

As you suspect, I think that you should probably use Pub/Sub here and learn a little bit about async/await.

Here is a small example:

```python
# Process one
from dask.distributed import Client, LocalCluster, Pub

cluster = LocalCluster(n_workers=0)  # Make a scheduler, but we don't need any workers for this
print(cluster.scheduler.address)  # process 2 connects to this address
client = Client(cluster)
pub = Pub("x")
```

```python
# Process 2
import asyncio
from dask.distributed import Client, Sub

async def f():
    # Use the address printed by process one
    async with Client("tcp://127.0.0.1:58887", asynchronous=True) as client:
        sub = Sub("x")
        while True:
            value = await sub.get()  # wait until a value is published on "x"
            print(value)

asyncio.get_event_loop().run_until_complete(f())
```

```python
# Process one again
import numpy as np

pub.put(123)
x = np.arange(5)
pub.put(x)
```

Output on process 2:

```
123
[0 1 2 3 4]
```

You can run many of these coroutines at once. Your asyncio event loop could run in a separate thread. Or you may already have an event loop running somewhere for your other application, and you can add these coroutines to it directly (this is greatly preferred).
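One way to get the callback behaviour asked for in the question is sketched below; treat it as a sketch under stated assumptions, not dask's API. It runs an asyncio event loop in a daemon thread and schedules a coroutine on that loop which awaits each new value and passes it to an ordinary callback. To keep the sketch self-contained, an `asyncio.Queue` stands in for `Sub("x")` (both are awaited via `.get()`), and `loop.call_soon_threadsafe(queue.put_nowait, ...)` stands in for `pub.put(...)`. `start_background_loop`, `listen`, `make_queue`, `my_callback`, and the `None` stop sentinel are hypothetical names. With dask, the asynchronous `Client` and `Sub` would presumably need to be created inside a coroutine running on that background loop.

```python
import asyncio
import threading
from typing import Any, Callable, List, Tuple


def start_background_loop() -> Tuple[asyncio.AbstractEventLoop, threading.Thread]:
    # Create a fresh event loop and run it forever in a daemon thread,
    # leaving the main thread (e.g. a GUI or REPL) free.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def listen(source: Any, callback: Callable[[Any], None]) -> None:
    # `source` is anything with an awaitable .get(). In this sketch it is an
    # asyncio.Queue standing in for a dask Sub; each value goes to callback.
    while True:
        value = await source.get()
        if value is None:  # stop sentinel used only by this sketch
            break
        callback(value)


async def make_queue() -> asyncio.Queue:
    # Create the queue on the background loop itself.
    return asyncio.Queue()


received: List[Any] = []


def my_callback(value: Any) -> None:
    # Hypothetical user callback; it runs on the background loop's thread.
    received.append(value)
    print("callback got", value)


loop, thread = start_background_loop()
queue = asyncio.run_coroutine_threadsafe(make_queue(), loop).result()
listener = asyncio.run_coroutine_threadsafe(listen(queue, my_callback), loop)

# Stand-in for pub.put(...) calls: hand values to the loop thread-safely.
for item in (123, [0, 1, 2, 3, 4]):
    loop.call_soon_threadsafe(queue.put_nowait, item)
loop.call_soon_threadsafe(queue.put_nowait, None)

listener.result(timeout=5)  # wait until the listener has drained the queue
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Because the callback runs on the event loop's thread, a slow or blocking callback delays every later message. Heavy work is better handed off to another thread or executor.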

I've been able to get this working inside two separate Jupyter notebooks running locally! Thanks so much @mrocklin.

One note: to use this inside a Jupyter notebook, you must use

```python
asyncio.get_event_loop().create_task(f())
```

because run_until_complete() is not allowed there, for reasons described here: https://stackoverflow.com/questions/47518874/how-do-i-run-python-asyncio-code-in-a-jupyter-notebook

Right. Jupyter already has an event loop running, so you should use that. It may be that your application also has such an event loop; if so, you should be opportunistic about using it.
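Below is a small, dask-free illustration of the point above, assuming Python 3.7+. Inside code that is already running on an event loop (as a notebook cell runs under the loop the Jupyter kernel keeps running), `run_until_complete()` raises `RuntimeError`, while `create_task()` simply schedules the coroutine on the existing loop. Here `asyncio.run(app())` only simulates that already-running loop, and `f` and `app` are placeholder names standing in for the subscriber coroutine and the host application.

```python
import asyncio


async def f(results: list) -> None:
    # Placeholder for the subscriber coroutine from the answer above.
    await asyncio.sleep(0.01)
    results.append("f ran")


async def app() -> list:
    # Simulates an environment (such as a Jupyter kernel) whose loop is running.
    loop = asyncio.get_running_loop()
    results: list = []

    coro = f(results)
    try:
        loop.run_until_complete(coro)  # not allowed on a running loop
    except RuntimeError as exc:
        results.append(f"run_until_complete failed: {exc}")
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning

    # Works: schedule f on the loop that is already running.
    task = loop.create_task(f(results))
    await task  # in a notebook the task could simply be left running
    return results


results = asyncio.run(app())
print(results)
```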

(In reply to Nicholas Sofroniew's comment above, Sat, Nov 2, 2019, 12:29 PM.)

OK to close, @sofroniewn?

Yes, good to close. Thanks for the help!
