👋 hello from Chicago!
After running a very small workflow that created a Dask DataFrame with dd.read_csv(), persist()ed it, and then called .compute() on it, I found that a substantial amount of memory was not freed in the cluster.
This memory was not freed even after removing the client that ran the workflow and all references to the Dask DataFrame (by restarting the kernel of the Jupyter notebook that the client code runs in).
For a workflow that had peak memory usage around 10GB, I observed increasing memory usage across runs. The values below are memory usage in the cluster after restarting the kernel in the client notebook.
Based on my previous experience and the documentation in https://distributed.dask.org/en/latest/memory.html#managing-memory, I expected that any memory allocated for some tasks would be freed after those tasks completed and all client references to those tasks' results were removed.
I found that the easiest way I could reproduce this was with dask_cloudprovider.FargateCluster, but I did observe the same behavior on a dask_kubernetes.KubeCluster.
conda environment and Jupyter kernel to work in
name: daskdev
channels:
- defaults
dependencies:
- argon2-cffi=20.1.0
- async_generator=1.10
- attrs=20.2.0
- backcall=0.2.0
- bleach=3.2.1
- ca-certificates
- certifi=2020.6.20
- cffi=1.14.3
- decorator=4.4.2
- defusedxml=0.6.0
- entrypoints=0.3
- ipykernel=5.3.4
- ipython=7.18.1
- ipython_genutils=0.2.0
- jedi=0.17.2
- jinja2=2.11.2
- jsonschema=3.2.0
- jupyter_client=6.1.7
- jupyter_core=4.6.3
- jupyterlab_pygments=0.1.2
- libedit=3.1.20191231
- libffi=3.3
- libsodium=1.0.18
- markupsafe=1.1.1
- mistune=0.8.4
- nb_conda_kernels
- nbclient=0.5.1
- nbconvert=6.0.7
- nbformat=5.0.8
- ncurses=6.2
- nest-asyncio=1.4.1
- notebook=6.1.4
- openssl=1.1.1h
- packaging=20.4
- pandoc=2.11
- pandocfilters=1.4.2
- parso=0.7.0
- pexpect=4.8.0
- pickleshare=0.7.5
- pip=20.2.4
- prometheus_client=0.8.0
- prompt-toolkit=3.0.8
- ptyprocess=0.6.0
- pycparser=2.20
- pygments=2.7.1
- pyparsing=2.4.7
- pyrsistent=0.17.3
- python=3.8.5
- python-dateutil=2.8.1
- pyzmq=19.0.2
- readline=8.0
- send2trash=1.5.0
- setuptools=50.3.0
- six=1.15.0
- sqlite=3.33.0
- terminado=0.9.1
- testpath=0.4.4
- tk=8.6.10
- tornado=6.0.4
- traitlets=5.0.5
- wcwidth=0.2.5
- webencodings=0.5.1
- wheel=0.35.1
- xz=5.2.5
- zeromq=4.3.3
- zipp=3.3.1
- zlib=1.2.11
- pip:
  - adal==1.2.5
  - aiobotocore==1.1.2
  - aiohttp==3.6.3
  - aioitertools==0.7.0
  - applicationinsights==0.11.9
  - async-timeout==3.0.1
  - azure-common==1.1.25
  - azure-core==1.8.2
  - azure-graphrbac==0.61.1
  - azure-identity==1.4.1
  - azure-mgmt-authorization==0.61.0
  - azure-mgmt-containerregistry==2.8.0
  - azure-mgmt-keyvault==2.2.0
  - azure-mgmt-resource==10.2.0
  - azure-mgmt-storage==11.2.0
  - azureml-automl-core==1.16.0
  - azureml-core==1.16.0.post1
  - azureml-dataprep==2.3.4
  - azureml-dataprep-native==23.0.0
  - azureml-dataprep-rslex==1.1.3
  - azureml-dataset-runtime==1.16.0
  - azureml-pipeline==1.16.0
  - azureml-pipeline-core==1.16.0
  - azureml-pipeline-steps==1.16.0
  - azureml-sdk==1.16.0
  - azureml-telemetry==1.16.0
  - azureml-train==1.16.0
  - azureml-train-automl-client==1.16.0
  - azureml-train-core==1.16.0
  - azureml-train-restclients-hyperdrive==1.16.0
  - backports-tempfile==1.0
  - backports-weakref==1.0.post1
  - blosc==1.9.2
  - botocore==1.17.44
  - chardet==3.0.4
  - click==7.1.2
  - cloudpickle==1.6.0
  - contextlib2==0.6.0.post1
  - cryptography==3.1.1
  - dask==2.30.0
  - dask-cloudprovider==0.4.1
  - dask-glm==0.2.0
  - dask-ml==1.6.0
  - distributed==2.30.0
  - distro==1.5.0
  - docker==4.3.1
  - docutils==0.15.2
  - dotnetcore2==2.1.17
  - fsspec==0.8.4
  - fusepy==3.0.1
  - heapdict==1.0.1
  - idna==2.10
  - isodate==0.6.0
  - jeepney==0.4.3
  - jmespath==0.10.0
  - joblib==0.17.0
  - jsonpickle==1.4.1
  - llvmlite==0.34.0
  - lz4==3.1.0
  - msal==1.5.1
  - msal-extensions==0.2.2
  - msgpack==1.0.0
  - msrest==0.6.19
  - msrestazure==0.6.4
  - multidict==4.7.6
  - multipledispatch==0.6.0
  - ndg-httpsclient==0.5.1
  - numba==0.51.2
  - numpy==1.19.0
  - oauthlib==3.1.0
  - pandas==1.1.0
  - pathspec==0.8.0
  - portalocker==1.7.1
  - psutil==5.7.2
  - pyarrow==1.0.1
  - pyasn1==0.4.8
  - pyjwt==1.7.1
  - pyopenssl==19.1.0
  - pytz==2020.1
  - pyyaml==5.3.1
  - requests==2.24.0
  - requests-oauthlib==1.3.0
  - ruamel-yaml==0.16.12
  - ruamel-yaml-clib==0.2.2
  - s3fs==0.5.1
  - scikit-learn==0.23.2
  - scipy==1.5.3
  - secretstorage==3.1.2
  - sortedcontainers==2.2.2
  - tblib==1.7.0
  - threadpoolctl==2.1.0
  - toolz==0.11.1
  - typing-extensions==3.7.4.3
  - urllib3==1.25.11
  - websocket-client==0.57.0
  - wrapt==1.12.1
  - yarl==1.5.1
  - zict==2.0.0
conda env create --name daskdev --file env.yml
source activate daskdev
conda install -y nb_conda_kernels
python -m ipykernel install \
    --user \
    --name daskdev \
    --display-name "Python (daskdev)"
Authenticate with AWS, then start Jupyter Lab.
jupyter lab
Start up a new notebook using the kernel you just created.
from dask_cloudprovider import FargateCluster
# valid memory + vcpu combinations:
# * https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html
cluster = FargateCluster(
    image="daskdev/dask:2.30.0",
    worker_mem=30720,
    scheduler_mem=30720,
    scheduler_cpu=4096,
    n_workers=1,
    fargate_use_private_ip=False,
    scheduler_timeout="45 minutes",
    environment={
        "EXTRA_PIP_PACKAGES": "s3fs"
    }
)
print(cluster.dashboard_link)
print(cluster.scheduler_address)
Open a new browser window with the result of print(cluster.dashboard_link). Keep it open.

Copy the output of print(cluster.scheduler_address). You'll need it in the next step.
import dask.dataframe as dd
from dask.distributed import Client, wait

# PASTE the result of cluster.scheduler_address from the other notebook
scheduler_address = "PASTE-SCHEDULER-ADDRESS-HERE"
client = Client(address=scheduler_address)

def get_data_frame():
    ddf = dd.read_csv(
        urlpath="s3://nyc-tlc/trip data/yellow_tripdata_2018-1*.csv",
        storage_options={'anon': True}
    )
    # keep the partitions in cluster memory and block until they are all loaded
    ddf = ddf.persist()
    _ = wait(ddf)
    return ddf

def summarize(ddf):
    # pull the whole DataFrame back to the client as a pandas DataFrame
    computed = ddf.compute()
    print(computed.describe())

ddf = get_data_frame()
summarize(ddf)
Once summarize() finishes running successfully, restart this notebook's kernel. Return to the Dask dashboard, refresh the page (just to be safe), and record the memory usage in the cluster.
Doing this, I observed the following pattern of memory usage:
The peak memory usage I observed while running the example code was around 10GB, so around 3GB being leaked seems noteworthy.
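As an alternative to reading the numbers off the dashboard, the per-worker memory could probably be recorded from a client. The sketch below is not what was used for the measurements in this issue; it assumes psutil is importable on the workers, and `worker_rss_bytes` and `cluster_rss_gib` are hypothetical helper names. It only covers worker processes, not the scheduler.

```python
import os


def worker_rss_bytes():
    """Run on a worker: resident set size (bytes) of this worker process."""
    import psutil  # imported lazily so the import happens on the worker

    return psutil.Process(os.getpid()).memory_info().rss


def cluster_rss_gib(client):
    """Return {worker_address: RSS in GiB} for every worker in the cluster."""
    # Client.run executes the function on each worker and returns a dict
    # keyed by worker address.
    per_worker = client.run(worker_rss_bytes)
    return {addr: rss / 2**30 for addr, rss in per_worker.items()}
```

Calling `cluster_rss_gib(client)` from a fresh client after each kernel restart would give one reading per run that can be compared across runs.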
I originally started investigating this today because I was running a workload with prefect's DaskExecutor and noticed that each run had less memory available to it than the previous one. After coming up with the reproducible example in this issue, I've convinced myself that the problem I saw has nothing to do with how prefect does its Dask stuff.
I did try to do my due diligence looking for other reports of this issue. There are many similar ones, but I didn't see any that involved s3fs (which mine indirectly does because it uses DataFrame.read_csv()) or which use DataFrame.persist(). Related issues that I found:
I did also consult "Managing Memory" from the Dask docs and found that VERY helpful!
See the env.yml I provided above for the client environment. The scheduler + workers in my example ran in containers using the daskdev/dask:2.30.0 docker image.
dask 2.30.0
distributed 2.30.0
s3fs 0.5.1
dask-cloudprovider 0.4.1
install method: pip
Thanks for your time and consideration!
I haven't looked closely, but could https://github.com/pandas-dev/pandas/issues/19941 be related?
If it is _not_ CSV related, @jameslamb, you could test this with dask.datasets.timeseries() (which will generate a random timeseries on the fly) and see if you are still observing a memory leak.
I think there might be something missing in garbage collection. It would be my expectation that if all references to an object are gone then it should be deleted from the cluster even if it's been persisted.
I have a simplified example that doesn't use csvs. In one notebook:
from dask.distributed import LocalCluster
cluster = LocalCluster()
cluster
In another:
from dask.datasets import timeseries
from dask.distributed import Client, wait
client = Client("tcp://127.0.0.1:41813")
wait(timeseries().persist());
Then if you restart the kernel in the second notebook you'd notice that the cluster holds on to about 700MiB of memory. That is the point at which I expected the memory to get freed up since there are no more references to the data.
If you restart the client the data gets deleted, but that seems kind of intense.
client = Client("tcp://127.0.0.1:41813")
client.restart()
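Something less drastic that might be worth trying before a full restart is sketched below: run a garbage collection pass on every worker and then ask the C allocator to hand free heap pages back to the OS. This assumes the workers run CPython on Linux with glibc (where `malloc_trim` exists). `collect_and_trim` and `trim_cluster` are hypothetical helper names, and nothing in this thread shows that this actually recovers the memory in question.

```python
import ctypes
import gc


def collect_and_trim():
    """Run on a worker: force a GC pass, then ask glibc to release free heap pages.

    Returns (objects_collected, trim_result). trim_result is None when
    malloc_trim is unavailable, for example on non-glibc platforms.
    """
    collected = gc.collect()
    try:
        trimmed = ctypes.CDLL("libc.so.6").malloc_trim(0)
    except (OSError, AttributeError):
        trimmed = None
    return collected, trimmed


def trim_cluster(client):
    """Run collect_and_trim on every worker; returns {worker_address: result}."""
    return client.run(collect_and_trim)
```

If worker memory drops after `trim_cluster(client)`, that would suggest the data was already released by Dask and the memory was being held by the allocator rather than by live objects.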
Just to add that you definitely shouldn't expect everything to be cleared up. In my case, having executed @jsignell's code, I have about 100MB per process (versus ~50MB for fresh processes), which might be accounted for by imports alone.
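The comparison against fresh processes can be made explicit by taking one memory snapshot before any work is submitted and another after the references are gone, then looking at the difference per worker. A minimal sketch, where `memory_growth` is a hypothetical helper and both snapshots are assumed to be plain dicts mapping worker address to bytes:

```python
def memory_growth(baseline, after):
    """Per-worker growth in bytes between two {worker_address: rss_bytes} snapshots.

    Workers that are missing from the baseline are treated as starting at 0.
    """
    return {addr: rss - baseline.get(addr, 0) for addr, rss in after.items()}
```

Growth on the order of the import footprint is probably expected. Growth that keeps increasing from run to run is the pattern described at the top of this issue.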