Dask: memory leak using Dask DataFrame

Created on 22 Oct 2020 · 4 Comments · Source: dask/dask

👋 hello from Chicago!

What happened:

I ran a very small workflow that created a Dask DataFrame with dd.read_csv(), persist()ed it, and then called .compute() on it. Afterwards, a substantial amount of memory was not freed in the cluster.

This memory was not freed even after removing the client that ran the workflow and all references to the Dask DataFrame (by restarting the kernel of the Jupyter notebook that the client code runs in).

For a workflow that had peak memory usage around 10GB, I observed increasing memory usage across runs. The values below are memory usage in the cluster after restarting the kernel in the client notebook.

  • cluster startup: 101.7 MB
  • after run 1: 2.45 GB
  • after run 2: 2.87 GB
  • after run 3: 3.05 GB

What you expected to happen:

Based on my previous experience and the documentation in https://distributed.dask.org/en/latest/memory.html#managing-memory, I expected that any memory allocated for some tasks would be freed after those tasks completed and all client references to those tasks' results were removed.
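
To make that expectation concrete, here is a toy model of the bookkeeping being described: a key stays in memory only while some client still holds a reference to it. It is a sketch for illustration, not distributed's actual implementation; the class, method names, key names, and sizes are all hypothetical.

```python
# Toy model (NOT distributed's implementation): a key stays in memory only
# while at least one client still wants it.
from collections import defaultdict


class ToyScheduler:
    def __init__(self):
        self.who_wants = defaultdict(set)  # key -> clients holding a reference
        self.memory = {}                   # key -> size in bytes

    def persist(self, client, key, nbytes):
        """Record that `client` holds a reference to `key`."""
        self.memory[key] = nbytes
        self.who_wants[key].add(client)

    def remove_client(self, client):
        """Drop all of a client's references and free keys nobody wants."""
        for key in list(self.who_wants):
            self.who_wants[key].discard(client)
            if not self.who_wants[key]:
                del self.who_wants[key]
                del self.memory[key]

    def used_bytes(self):
        return sum(self.memory.values())


scheduler = ToyScheduler()
scheduler.persist("notebook-client", "ddf-partition-0", 1_000_000)
scheduler.persist("notebook-client", "ddf-partition-1", 2_000_000)
before = scheduler.used_bytes()

# The kernel restart removes the only client that referenced the data.
scheduler.remove_client("notebook-client")
after = scheduler.used_bytes()
print(before, after)
```

In this model the cluster returns to zero bytes of tracked data once the only client goes away, which is the behavior the report expected to see after a kernel restart.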

Minimal Complete Verifiable Example:

I found that the easiest way I could reproduce this was with dask_cloudprovider.FargateCluster, but I did observe the same behavior on a dask_kubernetes.KubeCluster.

  1. create a conda environment and Jupyter kernel to work in
    env.yml
name: daskdev
channels:
  - defaults
dependencies:
  - argon2-cffi=20.1.0
  - async_generator=1.10
  - attrs=20.2.0
  - backcall=0.2.0
  - bleach=3.2.1
  - ca-certificates
  - certifi=2020.6.20
  - cffi=1.14.3
  - decorator=4.4.2
  - defusedxml=0.6.0
  - entrypoints=0.3
  - ipykernel=5.3.4
  - ipython=7.18.1
  - ipython_genutils=0.2.0
  - jedi=0.17.2
  - jinja2=2.11.2
  - jsonschema=3.2.0
  - jupyter_client=6.1.7
  - jupyter_core=4.6.3
  - jupyterlab_pygments=0.1.2
  - libedit=3.1.20191231
  - libffi=3.3
  - libsodium=1.0.18
  - markupsafe=1.1.1
  - mistune=0.8.4
  - nb_conda_kernels
  - nbclient=0.5.1
  - nbconvert=6.0.7
  - nbformat=5.0.8
  - ncurses=6.2
  - nest-asyncio=1.4.1
  - notebook=6.1.4
  - openssl=1.1.1h
  - packaging=20.4
  - pandoc=2.11
  - pandocfilters=1.4.2
  - parso=0.7.0
  - pexpect=4.8.0
  - pickleshare=0.7.5
  - pip=20.2.4
  - prometheus_client=0.8.0
  - prompt-toolkit=3.0.8
  - ptyprocess=0.6.0
  - pycparser=2.20
  - pygments=2.7.1
  - pyparsing=2.4.7
  - pyrsistent=0.17.3
  - python=3.8.5
  - python-dateutil=2.8.1
  - pyzmq=19.0.2
  - readline=8.0
  - send2trash=1.5.0
  - setuptools=50.3.0
  - six=1.15.0
  - sqlite=3.33.0
  - terminado=0.9.1
  - testpath=0.4.4
  - tk=8.6.10
  - tornado=6.0.4
  - traitlets=5.0.5
  - wcwidth=0.2.5
  - webencodings=0.5.1
  - wheel=0.35.1
  - xz=5.2.5
  - zeromq=4.3.3
  - zipp=3.3.1
  - zlib=1.2.11
  - pip:
    - adal==1.2.5
    - aiobotocore==1.1.2
    - aiohttp==3.6.3
    - aioitertools==0.7.0
    - applicationinsights==0.11.9
    - async-timeout==3.0.1
    - azure-common==1.1.25
    - azure-core==1.8.2
    - azure-graphrbac==0.61.1
    - azure-identity==1.4.1
    - azure-mgmt-authorization==0.61.0
    - azure-mgmt-containerregistry==2.8.0
    - azure-mgmt-keyvault==2.2.0
    - azure-mgmt-resource==10.2.0
    - azure-mgmt-storage==11.2.0
    - azureml-automl-core==1.16.0
    - azureml-core==1.16.0.post1
    - azureml-dataprep==2.3.4
    - azureml-dataprep-native==23.0.0
    - azureml-dataprep-rslex==1.1.3
    - azureml-dataset-runtime==1.16.0
    - azureml-pipeline==1.16.0
    - azureml-pipeline-core==1.16.0
    - azureml-pipeline-steps==1.16.0
    - azureml-sdk==1.16.0
    - azureml-telemetry==1.16.0
    - azureml-train==1.16.0
    - azureml-train-automl-client==1.16.0
    - azureml-train-core==1.16.0
    - azureml-train-restclients-hyperdrive==1.16.0
    - backports-tempfile==1.0
    - backports-weakref==1.0.post1
    - blosc==1.9.2
    - botocore==1.17.44
    - chardet==3.0.4
    - click==7.1.2
    - cloudpickle==1.6.0
    - contextlib2==0.6.0.post1
    - cryptography==3.1.1
    - dask==2.30.0
    - dask-cloudprovider==0.4.1
    - dask-glm==0.2.0
    - dask-ml==1.6.0
    - distributed==2.30.0
    - distro==1.5.0
    - docker==4.3.1
    - docutils==0.15.2
    - dotnetcore2==2.1.17
    - fsspec==0.8.4
    - fusepy==3.0.1
    - heapdict==1.0.1
    - idna==2.10
    - isodate==0.6.0
    - jeepney==0.4.3
    - jmespath==0.10.0
    - joblib==0.17.0
    - jsonpickle==1.4.1
    - llvmlite==0.34.0
    - lz4==3.1.0
    - msal==1.5.1
    - msal-extensions==0.2.2
    - msgpack==1.0.0
    - msrest==0.6.19
    - msrestazure==0.6.4
    - multidict==4.7.6
    - multipledispatch==0.6.0
    - ndg-httpsclient==0.5.1
    - numba==0.51.2
    - numpy==1.19.0
    - oauthlib==3.1.0
    - pandas==1.1.0
    - pathspec==0.8.0
    - portalocker==1.7.1
    - psutil==5.7.2
    - pyarrow==1.0.1
    - pyasn1==0.4.8
    - pyjwt==1.7.1
    - pyopenssl==19.1.0
    - pytz==2020.1
    - pyyaml==5.3.1
    - requests==2.24.0
    - requests-oauthlib==1.3.0
    - ruamel-yaml==0.16.12
    - ruamel-yaml-clib==0.2.2
    - s3fs==0.5.1
    - scikit-learn==0.23.2
    - scipy==1.5.3
    - secretstorage==3.1.2
    - sortedcontainers==2.2.2
    - tblib==1.7.0
    - threadpoolctl==2.1.0
    - toolz==0.11.1
    - typing-extensions==3.7.4.3
    - urllib3==1.25.11
    - websocket-client==0.57.0
    - wrapt==1.12.1
    - yarl==1.5.1
    - zict==2.0.0

conda env create --name daskdev --file env.yml
source activate daskdev
conda install -y nb_conda_kernels
python -m ipykernel install \
    --user \
    --name daskdev \
    --display-name "Python (daskdev)"
  1. authenticate with AWS, then start Jupyter Lab

    jupyter lab
    
  2. start up a new notebook using that kernel you just created.

from dask_cloudprovider import FargateCluster

# valid memory + vcpu combinations:
# * https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html
cluster = FargateCluster(
    image="daskdev/dask:2.30.0",
    worker_mem=30720,
    scheduler_mem=30720,
    scheduler_cpu=4096,
    n_workers=1,
    fargate_use_private_ip=False,
    scheduler_timeout="45 minutes",
    environment={
        "EXTRA_PIP_PACKAGES": "s3fs"
    }
)

print(cluster.dashboard_link)
print(cluster.scheduler_address)
  1. Open a new browser window with the result of print(cluster.dashboard_link). Keep it open.

    • record the current memory usage in the cluster. It will probably be on the order of 100 MB
  2. Copy the output of print(cluster.scheduler_address). You'll need it in the next step.

  3. in another notebook using the kernel created in step 1, run this code (after pasting in the scheduler address)
import dask.dataframe as dd
from dask.distributed import Client, wait

scheduler_address = "PASTE-SCHEDULER-ADDRESS-HERE"  # result of cluster.scheduler_address from the other notebook
client = Client(address=scheduler_address)

def get_data_frame():
    ddf = dd.read_csv(
        urlpath="s3://nyc-tlc/trip data/yellow_tripdata_2018-1*.csv",
        storage_options={'anon': True}
    )
    ddf = ddf.persist()
    _ = wait(ddf)
    return ddf

def summarize(ddf):
    computed = ddf.compute()
    print(computed.describe())

ddf = get_data_frame()
summarize(ddf)
  1. When summarize() finishes running successfully, restart this notebook's kernel. Return to the Dask dashboard, refresh the page (just to be safe), and record the memory usage in the cluster.
  2. Repeat steps 5-6 several times. You should see that after each kernel restart on the client notebook, the cluster has higher memory utilization than it did after the previous restart.
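
The memory readings in these steps can also be taken programmatically instead of from the dashboard. The helper below is a minimal sketch. It assumes that the dict returned by client.scheduler_info() has a "workers" mapping whose entries carry a "metrics" dict with a "memory" value in bytes; that layout is an assumption about distributed and may differ between versions. The worker address in the example is made up, and the total may not match the dashboard figure exactly.

```python
def cluster_memory_bytes(scheduler_info):
    """Sum the memory reported by each worker in a scheduler_info()-style dict.

    Assumes each worker entry looks like {"metrics": {"memory": <bytes>}};
    workers without that metric are counted as 0.
    """
    workers = scheduler_info.get("workers", {})
    return sum(
        info.get("metrics", {}).get("memory", 0) for info in workers.values()
    )


# Stand-in for client.scheduler_info(); the address is hypothetical.
example_info = {
    "workers": {
        "tcp://10.0.0.1:1234": {"metrics": {"memory": 101_700_000}},
    }
}

total = cluster_memory_bytes(example_info)
print(f"{total / 1e6:.1f} MB")
```

Against a live cluster the call would be cluster_memory_bytes(client.scheduler_info()), run once after startup and again after each kernel restart.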

Doing this, I observed the following pattern of memory usage:

  • cluster startup: 101.7 MB
  • after run 1: 2.45 GB
  • after run 2: 2.87 GB
  • after run 3: 3.05 GB

The peak memory usage I observed while running the example code was around 10GB, so around 3GB being leaked seems noteworthy.
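
For reference, the growth between those readings can be worked out with plain arithmetic. The snippet below uses only the numbers listed above and measures nothing new; 101.7 MB is written as 0.1017 GB on the assumption that the dashboard uses decimal units.

```python
# Cluster memory after each kernel restart, in GB, copied from the list above.
readings_gb = [
    ("cluster startup", 0.1017),
    ("after run 1", 2.45),
    ("after run 2", 2.87),
    ("after run 3", 3.05),
]

# Growth of each reading over the previous one.
growth = []
for (_, prev), (label, cur) in zip(readings_gb, readings_gb[1:]):
    delta = round(cur - prev, 4)
    growth.append((label, delta))
    print(f"{label}: +{delta:.2f} GB")

# Total memory still held compared with the freshly started cluster.
retained_gb = round(readings_gb[-1][1] - readings_gb[0][1], 4)
print(f"retained over baseline: {retained_gb:.2f} GB")
```

The first run accounts for most of the retained memory (about 2.35 GB), and each later run adds less (0.42 GB, then 0.18 GB).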

Anything else we need to know?

I originally started investigating this today because I was running a workload with prefect's DaskExecutor and noticed that each run had less memory available to it than the previous one. After coming up with the reproducible example in this issue, I've convinced myself that the problem I saw has nothing to do with how prefect does its Dask stuff.

I did try to do my due diligence looking for other reports of this issue. There are many similar ones, but I didn't see any that involved s3fs (which mine indirectly does because it calls dd.read_csv() on an s3:// path) or which use DataFrame.persist(). Related issues that I found:

I did also consult "Managing Memory" from the Dask docs and found that VERY helpful!

Environment:

See the env.yml I provided above for the client environment. The scheduler + workers in my example ran in containers using the daskdev/dask:2.30.0 docker image.

  • Dask version:

    • dask 2.30.0

    • distributed 2.30.0

    • s3fs 0.5.1

    • dask-cloudprovider 0.4.1

  • Python version:

    • 3.8

  • Operating System:

    • client on macOS 10.14.6, scheduler and workers in the daskdev/dask:2.30.0 image.

  • Install method (conda, pip, source):

    • pip


Thanks for your time and consideration!

All 4 comments

I haven't looked closely, but could https://github.com/pandas-dev/pandas/issues/19941 be related?

If it is _not_ CSV related, @jameslamb, you could test this with dask.datasets.timeseries() (which will generate a random timeseries on the fly) and see if you are still observing a memory leak.

I think there might be something missing in garbage collection. It would be my expectation that if all references to an object are gone then it should be deleted from the cluster even if it's been persisted.

I have a simplified example that doesn't use csvs. In one notebook:

from dask.distributed import LocalCluster

cluster = LocalCluster()
cluster

In another:

from dask.datasets import timeseries
from dask.distributed import Client, wait

client = Client("tcp://127.0.0.1:41813")

wait(timeseries().persist());

Then if you restart the kernel in the second notebook you'd notice that the cluster holds on to about 700MiB of memory. That is the point at which I expected the memory to get freed up since there are no more references to the data.

Workaround

If you call client.restart(), which restarts the worker processes, the data gets deleted, but that seems kind of intense.

client = Client("tcp://127.0.0.1:41813")
client.restart()
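
Something lighter to try before a full restart is to force a garbage-collection pass on each worker and ask the C allocator to hand free memory back to the operating system. This is only a sketch, and nothing in this thread confirms that it releases the retained memory. It assumes the workers run on Linux with glibc (malloc_trim is glibc-specific) and relies on client.run() to execute a function on every worker. The helper names are hypothetical.

```python
import ctypes
import gc


def collect_and_trim():
    """Run on a worker: force a GC pass, then ask glibc to return free memory.

    malloc_trim only exists in glibc (typical Linux images); elsewhere the
    trim step is skipped and reported as None.
    """
    collected = gc.collect()
    try:
        trimmed = ctypes.CDLL("libc.so.6").malloc_trim(0)
    except (OSError, AttributeError):
        trimmed = None
    return {"collected": collected, "trimmed": trimmed}


def free_worker_memory(client):
    """Call collect_and_trim on every worker via client.run()."""
    return client.run(collect_and_trim)


# The helper can be tried in the local process without a cluster.
local_result = collect_and_trim()
print(local_result)
```

With a connected client, free_worker_memory(client) returns one result per worker address. Unlike client.restart(), this leaves persisted data and running work in place.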

Just to add that you definitely shouldn't expect everything to be cleared up. In my case, having executed @jsignell's code, I have about 100MB per process (versus ~50MB for fresh processes), which might be accounted for by imports alone.
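
A rough way to check how much of that baseline comes from imports is to measure the allocations a module leaves behind when it is first imported. The sketch below uses the standard-library tracemalloc module, which tracks only allocations made through Python's memory allocators, so it undercounts memory taken by C extensions. The helper name is hypothetical, and decimal is used only because it ships with Python.

```python
import importlib
import tracemalloc


def import_cost_bytes(module_name):
    """Return bytes of tracked allocations still held after importing a module.

    A module that is already imported costs roughly nothing, so call this in
    a fresh process for a meaningful number.
    """
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    importlib.import_module(module_name)
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return after - before


cost = import_cost_bytes("decimal")
print(f"importing decimal kept about {cost} bytes allocated")
```

On a worker, the same function could be pointed at a module such as pandas before any data has been loaded, to see how much of the post-run footprint is import overhead and how much is retained data.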
