I get the error "Inputs contain futures that were created by another client" when I try to join a published dataset within a client different from the one that originally published it. The whole flow can be summarised as follows:
Client 1
df = dd.read_csv(...)
client.persist(df)
client.publish_dataset(ds_name=df)
Client 2
df = client.get_dataset("ds_name")
df2 = dd.read_csv(...)
df2.join(df)
client.persist(df2)
Is this expected behavior?
For reference, this check was added in the commit https://github.com/dask/distributed/commit/c02ea63ca2d0225ef52382e69165343d02f197cd#diff-96a27223dc91b5c9ea3d03684d79ad3f, which is part of the pull request https://github.com/dask/distributed/pull/2227.
Your code for publishing the dataset should look like this:
df = dd.read_csv(...)
df = client.persist(df)
client.publish_dataset(my_dataset=df)
This is because client.persist() returns a new collection backed by futures, and it is that returned collection you need to publish. In your flow example you call client.persist() without assigning its result to anything. Please check the examples in the docs (https://distributed.readthedocs.io/en/latest/api.html#distributed.Client.publish_dataset) and post your actual code.
Sorry for not being precise enough.
The code I use is indeed:
names = dd.read_csv('/shared-store-path/names.csv')
names = names.set_index("ID")
names = client1.persist(names)
client1.publish_dataset(names=names)
I have attached a full example that can be used to reproduce the behavior. Below is the output from the notebook I used to test it:
from dask.distributed import Client
from dask.distributed import wait
import dask.dataframe as dd
client1 = Client("scheduler-address:8786")
client2 = Client("scheduler-address:8786")
names = dd.read_csv('/shared-store-path/names.csv')
names = names.set_index("ID")
names = client1.persist(names)
wait(names)
DoneAndNotDoneFutures(done={
client1.publish_dataset(names=names)
roles = dd.read_csv('/shared-store-path/roles.csv')
roles = roles.set_index("ID")
names_dataset = client2.get_dataset("names")
roles = roles.join(names_dataset)
roles.head()
    Role       Name
ID
1   Developer  John
2   Developer  Doe
3   Developer  Johnny
4   Developer  Doey
5   Tester     Hello
client2.persist(roles)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-20-7cb62cd7bd89> in <module>()
----> 1 client2.persist(roles)
/misc/anaconda3-amp/lib/python3.6/site-packages/distributed/client.py in persist(self, collections, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, **kwargs)
2561 user_priority=priority,
2562 fifo_timeout=fifo_timeout,
-> 2563 actors=actors)
2564
2565 postpersists = [c.__dask_postpersist__() for c in collections]
/misc/anaconda3-amp/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2210 msg = ("Inputs contain futures that were created by "
2211 "another client.")
-> 2212 raise ValueError(msg)
2213
2214 if restrictions:
ValueError: Inputs contain futures that were created by another client.
Okay, thanks for the full example. I will try to run it tomorrow and see what happens.
Actually, I have reproduced your error, though not with your code. It is enough to publish a dataset with one client, then get that dataset with a second client (in a different process), modify it, and call client.persist(): it throws the error above. I think this is a general problem. My small example:
from dask.distributed import Client
import dask.dataframe as dd
client = Client('localhost:8786')
client.restart()
df = dd.read_parquet('test_data/data.parquet')
df = client.persist(df)
client.publish_dataset(test=df)
Now open a new terminal and run:
client = Client(...)
df = client.get_dataset('test')
new_df = df['some'] > 0.5
new_df = client.persist(new_df)
# error here...
Do you have any ideas @mrocklin ?
Python: 3.6.6
Dask: 0.20.1
Dask distributed: 1.24.1
Any updates/suggestions about this problem @mrocklin @martindurant ?
(I am at a conference and unlikely to be able to look into this right now)
Did anything ever happen with this ticket? I seem to have run into the same situation and don't see any information about it anywhere else.
My situation: I persist a dataset the first time I load it, and someone else picks it up and adds more to the graph, something akin to:
df = client.get_dataset('test')
new_df = df['some'] > 0.5
new_df = client.compute(new_df)
and it gives me this error.
The original posting above used versions that are now somewhat out of date. Would you mind trying with the latest versions? I just ran a very similar test and didn't see any error.
Sounds good. After looking further, I think I am getting something slightly different; I will try to package up an example. I was able to get it working by not creating my own client and instead using get_client on my computer, to make sure the same client was used for the client.compute call, but the behavior I was seeing didn't make sense to me.
I will attempt to get something small to illustrate the situation shortly
We're also having trouble with this check. A loader script publishes a number of datasets, and another application launches tasks that use .loc[...].compute() to retrieve part of a published dataset as a pandas dataframe and then perform some calculations with it. The way I see it, if the published dataset contains futures and a dask-worker retrieves it, then future.client is always a different object from the worker's client, so the future.client is self check will always fail. So how are we supposed to use published datasets from within workers?
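To make the failure mode concrete, here is a deliberately simplified, pure-Python model of the ownership check. ToyClient and ToyFuture are hypothetical stand-ins, not the real classes (the real check is raised from Client._graph_to_futures, as the tracebacks in this thread show); they only mimic the idea that every future in a submitted graph must belong to the submitting client object, and that a future rebound to the local client would pass.

```python
class ToyClient:
    """Hypothetical stand-in for a scheduler client (NOT distributed.Client)."""

    def __init__(self, name):
        self.name = name

    def submit_graph(self, graph):
        # Simplified ownership check: every future in the graph must have been
        # created by (or rebound to) this exact client object.
        for value in graph.values():
            if isinstance(value, ToyFuture) and value.client is not self:
                raise ValueError(
                    "Inputs contain futures that were created by another client."
                )
        return "submitted"


class ToyFuture:
    """Hypothetical future that remembers the client object that owns it."""

    def __init__(self, key, client):
        self.key = key
        self.client = client

    def rebind(self, client):
        # Same key, new owner: what re-attaching on deserialization would do.
        return ToyFuture(self.key, client)


loader = ToyClient("loader")         # e.g. the script that published the data
worker_side = ToyClient("worker")    # e.g. the client inside a task on a worker

published = {"partition-0": ToyFuture("partition-0", loader)}

# Submitting futures still owned by `loader` through `worker_side` fails...
try:
    worker_side.submit_graph(published)
    outcome_without_rebind = "submitted"
except ValueError as exc:
    outcome_without_rebind = str(exc)

# ...while rebinding them to the submitting client passes the check.
rebound = {key: fut.rebind(worker_side) for key, fut in published.items()}
outcome_with_rebind = worker_side.submit_graph(rebound)
```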
future.client should become the client on the local worker.
That's not the case: the futures in the retrieved dataframe have a client with a different id (which changes every session):
In [1]: from distributed import Client
In [2]: cl = Client("*redacted*")
In [3]: cl.id
Out[3]: 'Client-99fd4b86-8c42-11e9-b54a-48df3757a8a0'
In [4]: df = cl.get_dataset('test')
In [5]: list(df.dask.values())[0].client.id
Out[5]: 'Client-aaa7eacc-8c42-11e9-b54a-48df3757a8a0'
In [7]: dask.__version__
Out[7]: '1.2.2'
In [9]: distributed.__version__
Out[9]: '1.28.1'
Can anyone provide a reproducible example? This doesn't do it, I suspect because we aren't getting references to the client in the task graph:
from distributed import LocalCluster, Client
import dask.array as da

def main():
    cluster = LocalCluster(dashboard_address=None)
    data = da.ones(3)
    with Client(cluster) as a:
        a.publish_dataset(data=data)
        with Client(cluster, set_as_default=False) as b:
            print(b.get_dataset('data').compute())

if __name__ == '__main__':
    main()
Oh @tshatrov, can you repeat your test in https://github.com/dask/distributed/issues/2336#issuecomment-500818270 after calling optimize on the graph? That's when things would be replaced: https://github.com/dask/distributed/blob/587be8d48536f52453594eebd1a23becf864ccf9/distributed/client.py#L2519
Steps to reproduce:
Run dask-scheduler and a dask-worker pointing at the dask-scheduler. Open ipython and do the following:
import dask
from dask.distributed import Client
uri = <dask-scheduler's uri>
cl = Client(uri)
ts = dask.datasets.timeseries()
ts = ts.persist()
cl.publish_dataset(timeseries=ts)
def get_ts():
    from dask.distributed import worker_client
    with worker_client() as client:
        return client.get_dataset('timeseries').compute()
cl.submit(get_ts).result()
As a result I get ValueError: Inputs contain futures that were created by another client.
I can't reproduce this on LocalCluster.
I have not figured out how to use optimize_insert_futures to make this not fail. As far as I can see, it doesn't change the dataframe.
Thanks @tshatrov. FWIW, I can't reproduce the ValueError locally with that. Perhaps others can.
Any updates on this issue? I am facing the same problem as @tshatrov.
Hm, the example code also works OK for me. I wonder what might be different on your system, @pranav-kohli - what exact versions are you using?
I am using the following setup
dask==1.1.0
distributed==1.25.2
You should maybe try updating Dask and distributed and see if the problem persists.
As I posted above the problem persists on the most recently released versions.
This is what I used to replicate the issue on the latest versions:
dask = 2.0.0
distributed = 2.0.1
import pandas as pd
import dask.dataframe as dd
from distributed import Client

def getCompute(df):
    #print(df.id)
    print(list(df.dask.values())[0].client.id)
    df.compute()

daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
print(daskClient.id)
print(list(df.dask.values())[0].client.id)
future = daskClient.submit(getCompute, df)
The printed ids are different, so the future.client is self check fails.
Thanks @pranav-kohli. Unfortunately, this is the output I get for a lightly modified version of your script :/
Client-7ef457a4-9bfa-11e9-abd5-186590cd1c87
import pandas as pd
from distributed import Client
import dask.dataframe as dd

def getCompute(df):
    #print(df.id)
    print(list(df.dask.values())[0].client.id)
    df.compute()

if __name__ == '__main__':
    daskClient = Client()
    df = pd.DataFrame(data=[[1]], columns=['a'])
    df = dd.from_pandas(df, npartitions=1)
    df = df.persist()
    print(daskClient.id)
    print(list(df.dask.values())[0].client.id)
    future = daskClient.submit(getCompute, df)
@TomAugspurger Actually, in this test case we are printing the same id twice:
print(daskClient.id)
print(list(df.dask.values())[0].client.id)
Can you check your dask-worker log, which shows what the getCompute function prints?
My dask-worker log shows a different client id.
Is anybody who's able to reproduce this issue locally able to debug further?
Here is a reproducer:
from distributed.deploy.ssh2 import SSHCluster
from distributed import Client
import pandas as pd
import dask.dataframe as dd

def getCompute(df):
    return df.compute()

async def f():
    async with SSHCluster(
        hosts=["localhost", "localhost", "localhost"],
        worker_kwargs={"nthreads": 4},
        connect_kwargs={"known_hosts": None},
        asynchronous=True,
    ) as cluster:
        client = await Client("localhost:8786", asynchronous=True)
        df = pd.DataFrame(data=[[1]], columns=["a"])
        df = dd.from_pandas(df, npartitions=1)
        df = df.persist()
        future = client.submit(getCompute, df)
        await future

if __name__ == "__main__":
    import asyncio
    asyncio.get_event_loop().run_until_complete(f())
Traceback (most recent call last):
File "foo.py", line 28, in <module>
asyncio.get_event_loop().run_until_complete(f())
File "/Users/mrocklin/miniconda/envs/dev/lib/python3.7/asyncio/base_events.py", line 573, in run_until_complete
return future.result()
File "foo.py", line 23, in f
await future
File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 232, in _result
six.reraise(*exc)
File "/Users/mrocklin/miniconda/envs/dev/lib/python3.7/site-packages/six.py", line 692, in reraise
raise value.with_traceback(tb)
File "foo.py", line 8, in getCompute
return df.compute()
File "/Users/mrocklin/workspace/dask/dask/base.py", line 175, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/mrocklin/workspace/dask/dask/base.py", line 446, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 2500, in get
actors=actors,
File "/Users/mrocklin/workspace/distributed/distributed/client.py", line 2389, in _graph_to_futures
raise ValueError(msg)
ValueError: Inputs contain futures that were created by another client.
I am able to reproduce this following @tshatrov's instructions. I've slightly modified them to use get_client.
import distributed
client = distributed.Client('tcp://localhost:8786')
import dask
ts = dask.datasets.timeseries()
ts = ts.persist()
client.publish_dataset(timeseries=ts)
def get_ts():
    from dask.distributed import get_client
    with get_client() as client:
        return client.get_dataset('timeseries').compute()
client.submit(get_ts).result()
ValueError: Inputs contain futures that were created by another client.
Dask and distributed 2.1.0.
Here is a new minimal reproducible example. The key here is that the client connects to the scheduler on a different IP from the worker.
# Run the dask scheduler on a machine
$ dask-scheduler
# Connect to that scheduler with a worker on the same machine via localhost
$ dask-worker localhost:8786
# Connect to the cluster on a different IP
import distributed
client = distributed.Client('10.1.2.3:8786') # Or whatever the LAN IP is
# Persist some data and publish it as a dataset
import dask
df = dask.datasets.timeseries().persist()
client.publish_dataset(df=df)
# Try and grab the dataset from within a delayed task
@dask.delayed
def remote_head():
    client = distributed.get_client()
    df = client.get_dataset('df')
    return df.head()
remote_head().compute()
# This results in an error
ValueError: Inputs contain futures that were created by another client.
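If you suspect the address mismatch described above, one way to check it is to compare the scheduler address the client dialled with the one each worker dialled. This is a diagnostic sketch, assuming the rpc objects held as client.scheduler and dask_worker.scheduler expose an address attribute in your distributed version; the helper name scheduler_address_views is hypothetical. It is demonstrated on an in-process LocalCluster, where the two views always agree; on a deployment like the one above you would expect them to differ.

```python
from dask.distributed import Client, LocalCluster


def scheduler_address_views(client):
    """Return (address the client dialled, {worker: address that worker dialled})."""
    client_view = client.scheduler.address
    # Client.run passes the local Worker object to any function that accepts
    # a `dask_worker` argument, and returns {worker_address: result}.
    worker_views = client.run(lambda dask_worker: dask_worker.scheduler.address)
    return client_view, worker_views


cluster = LocalCluster(n_workers=2, processes=False, dashboard_address=None)
client = Client(cluster)
client_view, worker_views = scheduler_address_views(client)
client.close()
cluster.close()
```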
I'm stumbling on the same problem on distributed 2.7.0.
Another minimal example:
import distributed
from dask import delayed
client = distributed.Client("localhost:8786")
a, = client.persist([delayed(1, pure=True)])
print(a) # Delayed('int-62645d78d66e2508256b7ab60a38b944')
print(a.compute()) # 1
print(client.compute([a])[0].result()) # 1
client.publish_dataset(foo=a)
b = client.get_dataset("foo")
print(b) # Delayed('int-62645d78d66e2508256b7ab60a38b944')
print(b.compute()) # 1
print(client.compute([b])[0].result()) # ValueError: Inputs contain futures that were created by another client.
The issue disappears if I connect to a LocalCluster instead.
Workaround:
print(client.gather([b.dask[b.key]])) # 1
This issue is particularly troublesome when using the asynchronous client together with non-trivial collections (read: anything other than a delayed), since the compute() method does not work (https://github.com/dask/dask/issues/5580) so one would have to reassemble the output by hand starting from output of the futures.
Still facing the same issue with Dask 2.6.0 and Distributed 2.6.0.
import pandas as pd
import dask.dataframe as dd
from distributed import Client
def getCompute(df):
    print("client id inside")
    print(list(df.dask.values())[0].client.id)
    df.compute()
daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
print(daskClient.id)
future = daskClient.submit(getCompute, df)
print(future.result())
Error:
ValueError: Inputs contain futures that were created by another client.
@pranav-kohli I'm not able to reproduce the error with the latest dask and distributed release (version 2.12.0):
In [1]: import pandas as pd
...: import dask.dataframe as dd
...: from distributed import Client
...:
...: def getCompute(df):
...:     print("client id inside")
...:     print(list(df.dask.values())[0].client.id)
...:     df.compute()
...:
...: daskClient = Client()
...: df = pd.DataFrame(data=[[1]], columns=['a'])
...: df = dd.from_pandas(df, npartitions=1)
...: df = df.persist()
...: print(daskClient.id)
...: future = daskClient.submit(getCompute, df)
...: print(future.result())
Client-cb589a84-63a4-11ea-9e5d-a0999b120aab
client id inside
Client-worker-ccab6e5c-63a4-11ea-9e6b-a0999b120aab
None
Can you update those packages and see if the problem persists?
Isn't this code using LocalCluster? We already know this bug does not reproduce on a LocalCluster.
LocalCluster uses separate processes by default, just like any other deployment.
Ah, I missed that when reading through the previous comments. I am able to reproduce the ValueError when not using LocalCluster
How weird! Now that should give us something to diagnose by, but I am pretty mystified.
I wonder, @jrbourbeau , do you get the problem with LocalCluster if you use spawn (which would end up with the client objects having different IDs in each worker) ?
The default multiprocessing method was updated to spawn in https://github.com/dask/distributed/pull/3461. I checked that the problem does not occur with LocalCluster when using either spawn or forkserver multiprocessing methods
I have the same issue with the latest versions. Does anyone have any ideas?
dask==2.16.0
distributed==2.16.0
@pborgen my PR resolves the issue for get_dataset() specifically, which is what the opening post was about. What you're doing in that snippet is sending over an arbitrary Python object that happens to contain Futures and expecting them to be recreated correctly when the Worker deserializes them. That is a different problem, albeit a related one.
Is there currently an issue to capture this?
@pborgen there is one now: https://github.com/dask/distributed/issues/3790
Note that, as a workaround, you can use publish, which is currently the only sanctioned way to move collections across clients:
import pandas as pd
import dask.dataframe as dd
from distributed import Client, get_client

def getCompute(name):
    # Look the collection up by name with this worker's own client,
    # instead of shipping the collection (and its futures) as an argument.
    df = get_client().get_dataset(name)
    df.compute()

daskClient = Client('localhost:8786')
df = pd.DataFrame(data=[[1]], columns=['a'])
df = dd.from_pandas(df, npartitions=1)
df = df.persist()
daskClient.publish_dataset(foo=df)
try:
    future = daskClient.submit(getCompute, "foo")
    future.result()
finally:
    daskClient.unpublish_dataset("foo")
The downside is that, if for any reason the Client is SIGKILL'ed or loses network connectivity before the end of the computation, you'll end up with a memory leak on the cluster.
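The try/finally above can be packaged as a small context manager so that every task launch pairs publish with unpublish. This is a sketch: the helper name published is hypothetical, it only calls Client.publish_dataset and Client.unpublish_dataset, and, like the original try/finally, it cannot prevent the leak when the client process is SIGKILL'ed or loses connectivity.

```python
from contextlib import contextmanager


@contextmanager
def published(client, name, collection):
    """Publish `collection` under `name` for the duration of a with-block.

    Hypothetical helper: the dataset is unpublished on normal exit and when
    the body raises, but not if the process is killed outright.
    """
    client.publish_dataset(**{name: collection})
    try:
        yield name
    finally:
        client.unpublish_dataset(name)


# Usage, assuming daskClient, df and getCompute from the example above:
# with published(daskClient, "foo", df) as name:
#     daskClient.submit(getCompute, name).result()
```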
Thanks for your help. I upgraded my dev box and all the machines in the cluster to 2.16.0. I am using the dask CLI, with one machine dedicated to the scheduler and 3 other machines running dask workers. When I run with one worker everything works fine, but with 2 workers I get the error below:
Error:
File "c:\python38\lib\site-packages\dask\base.py", line 166, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "c:\python38\lib\site-packages\dask\base.py", line 444, in compute
results = schedule(dsk, keys, **kwargs)
File "c:\python38\lib\site-packages\distributed\client.py", line 2646, in get
used as an optimization to avoid recomputation.
File "c:\python38\lib\site-packages\distributed\client.py", line 2543, in _graph_to_futures
ValueError: Inputs contain futures that were created by another client.
Versions:
dask==2.16.0
distributed==2.16.0
@pborgen run what? the POC that you posted, or my latest one with publish_dataset/get_dataset?
I am running my own code, which is pretty similar to what you posted. Your code seems to run fine on my cluster. I am going to try running your code with a very large dask dataframe to see if I can reproduce the error.
I should say that this error only happens after things have been running for a minute or so. I am also using a ddf created from a 140 MB parquet file with about 40 million rows, and inside my task I am querying this very large parquet file.
@pborgen, if you can create a minimal reproducible example, please file a new issue and we can discuss there.
def task_big_data(id):
    from dask.distributed import get_client
    dataset = get_client().get_dataset('my_dataset')
    # This is where I get the error. It runs for a while first; maybe it fails when the task runs on another worker?
    df = dataset[dataset.ID == id].compute()
Thanks for your help. I will try to get this to you today or tomorrow.
Do you know of a way to programmatically create a very large dataframe, with at least 40 million rows and 8 columns?
There are some functions for doing this in dask.datasets (e.g. dask.datasets.timeseries). You could also use dask.dataframe.from_dask_array and dask.array.random to make some large random numeric dataframes.