Distributed: Worker memory not being freed when tasks complete

Created on 7 Jun 2019 · 24 Comments · Source: dask/distributed

I'm still investigating, but in the meantime I wanted to get this issue started.

I'm noticing that after executing a task graph with large inputs and a small output, my worker memory stays high. In the example below we

  1. Generate data (large byte strings)
  2. filter data (slice)
  3. reduce many tasks (sum)

So the final result returned to the client is small, a single Python int. The only large objects should be the initially generated bytestrings.

The console output below shows

  1. per-worker memory usage before the computation (~30 MB)
  2. per-worker memory usage right after the computation (~ 230 MB)
  3. per-worker memory usage 5 seconds after, in case things take some time to settle down. (~ 230 MB)
Memory usage [before]
{'tcp://192.168.7.20:50533': '30.92 MB', 'tcp://192.168.7.20:50534': '30.95 MB'}
running
Memory usage [after]
{'tcp://192.168.7.20:50533': '231.97 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}
Memory usage [after]
{'tcp://192.168.7.20:50533': '232.05 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}

In an effort to test whether the scheduler or worker is holding a reference to the data, I submit a bunch of tiny inc tasks to one of the workers. I notice that the memory on that worker does settle down:

Memory usage [final]
{'tcp://192.168.7.20:52114': '232.77 MB',
 'tcp://192.168.7.20:52115': '49.73 MB'}

That's at least consistent with the worker or scheduler holding a reference to the data, but there could be many other causes. I'm still debugging.

The number of inc tasks, 2731, seems to be significant. With 2730 inc tasks, I don't see any memory reduction on that worker.

import time
from dask.utils import parse_bytes, format_bytes
import pprint
import string
import toolz
from distributed import Client, wait

N = parse_bytes("100 Mb")
I = 20


def inc(x):
    return x + 1


def f(x, n=N):
    time.sleep(0.05)
    return string.ascii_letters[x % 52].encode() * n


def g(x):
    time.sleep(0.02)
    return x[:5]


def h(*args):
    return sum(x[0] for x in args)


def get_mem(dask_worker):
    return dask_worker.monitor.proc.memory_info().rss


def main():
    dsk = {}
    for i in range(I):
        dsk[f'a-{i}'] = (f, i, N)
        dsk[f'b-{i}'] = (g, f'a-{i}')
    dsk['c-0'] = (h,) + tuple(f'b-{i}' for i in range(I))

    with Client(n_workers=2, threads_per_worker=1, memory_limit='500Mb', processes=True) as client:
        print("Memory usage [before]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        print("running")
        client.get(dsk, keys=["c-0"])
        time.sleep(2)  # let things settle

        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        time.sleep(5)  # settle some more?
        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


        print("clear things?")
        futures = client.map(inc, range(2731), pure=False)
        wait(futures)
        del futures

        print("Memory usage [final]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


if __name__ == '__main__':
    main()


Another thing that might be worth doing is to add all values placed into Worker.data also into a weakref.WeakValueDictionary:

def __init__(self, ...):
    self.weak_data = weakref.WeakValueDictionary()

def put_key_in_memory(self, key, value):
    self.data[key] = value
    self.weak_data[key] = value

Then, after seeing things flush through, you could check on the references to the items in the weak_data dictionary. This could even become a test fairly easily if we were to implement a custom mapping.

Regardless, thanks for investigating here. Issues like this have come up over the years without much resolution.
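A minimal sketch of that custom-mapping idea, runnable outside of a worker: a plain dict stands in for Worker.data, and the names TrackingDict, lingering and Payload are hypothetical, not part of distributed. One caveat may matter for the reproducer: many built-ins, including bytes, int, str, tuple, list and dict, cannot be weakly referenced, so storing raw byte strings in a WeakValueDictionary raises TypeError. The sketch skips such values and uses a small wrapper class to demonstrate the check.

```python
import gc
import weakref
from collections.abc import MutableMapping


class TrackingDict(MutableMapping):
    """Hypothetical stand-in for Worker.data that mirrors values into a WeakValueDictionary."""

    def __init__(self):
        self._data = {}
        self.weak_data = weakref.WeakValueDictionary()

    def __setitem__(self, key, value):
        self._data[key] = value
        try:
            self.weak_data[key] = value
        except TypeError:
            # bytes, int, str, tuple, list, dict, ... cannot be weakly referenced.
            pass

    def __getitem__(self, key):
        return self._data[key]

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def lingering(self):
        """Keys already removed from the mapping whose values are still alive elsewhere."""
        gc.collect()
        return [k for k in list(self.weak_data.keys()) if k not in self._data]


class Payload:
    """Weak-referenceable wrapper, since raw bytes cannot be weakly referenced."""

    def __init__(self, blob):
        self.blob = blob
```

Any key reported by lingering() points at an object that something outside the mapping still references, which is exactly the object worth inspecting further.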

> Another thing that might be worth doing is to add all values placed into Worker.data also into a weakref.WeakValueDictionary

Surprisingly(?) this exacerbates the memory leak. My computation doesn't even complete because the workers are killed for exceeding the memory limit. I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either...

> I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either..

Right, but you would be able to look at the objects that have stayed around in weak_data, and then track their references:

import gc

if worker.weak_data:
    obj = list(worker.weak_data.values())[0]
    print(gc.get_referrers(obj))
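The raw output of gc.get_referrers can be long and hard to read. A small helper that tallies referrers by type is sketched below; referrer_summary is a hypothetical name, and the sketch assumes you already hold the suspect object (for example one pulled out of weak_data). Note that gc only reports referrers that are themselves tracked by the garbage collector.

```python
import gc
from collections import Counter


def referrer_summary(obj, ignore=()):
    """Count the types of gc-tracked objects that directly refer to `obj`.

    `ignore` holds objects (e.g. the caller's own containers) to leave out of the tally.
    """
    ignore_ids = {id(x) for x in ignore}
    counts = Counter()
    for ref in gc.get_referrers(obj):
        if id(ref) in ignore_ids:
            continue
        counts[type(ref).__name__] += 1
    return counts
```

A dominant entry such as a dict or a cell usually narrows down which container is keeping the object alive.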

Will give that a shot.

Another (failed) attempt: I made a custom object, Foo:

import string
import sys

from dask.utils import parse_bytes

N = parse_bytes("100 Mb")  # same N as in the original script


class Foo:
    def __init__(self, x, n=N):
        self.thing = string.ascii_letters[x % 52].encode() * n

    def __getitem__(self, index):
        return self.thing[index]

    def __sizeof__(self):
        return sys.getsizeof(self.thing) + sys.getsizeof(object())

and ran objgraph.by_type on the workers, nannies, and scheduler. It didn't find any :/

You should probably also be aware of https://github.com/dask/dask/issues/3530

> You should probably also be aware of dask/dask#3530

Yes, that's a possible culprit.

Interestingly, I'm not seeing the memory lingering when running my original script on a linux system.

I don't have any insights to offer, but I'm eager to hear any ideas about this; I often run into situations where I perform some kind of aggregation over a large dataset only to find I've permanently leaked 2TB of RAM.

[Two screenshots, 2019-06-13]

I've run into issues previously with numpy arrays requiring an explicit gc.collect() before being released. I'm curious - does a gc.collect() work for you @bnaul to free up the memory?

I guess the below would work:

def collect():
    import gc
    gc.collect()

client.run(collect)

Edit: I should mention that this wasn't with dask but the symptoms sound familiar...
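For context on when gc.collect() can help: CPython frees most objects as soon as their reference count drops to zero, and the cyclic collector is only needed for objects caught in reference cycles. The sketch below (hypothetical names Node and cycle_survives_until_collect) disables automatic collection so the effect is deterministic, and shows an unreachable cycle staying alive until gc.collect() runs. If the retained memory is not held by cycles, collecting will not release it, which may be consistent with the reply that follows.

```python
import gc
import weakref


class Node:
    def __init__(self):
        self.other = None


def make_cycle():
    """Build two Nodes that refer to each other and return a weak reference to one."""
    a, b = Node(), Node()
    a.other, b.other = b, a
    return weakref.ref(a)  # a and b become unreachable when this function returns


def cycle_survives_until_collect():
    was_enabled = gc.isenabled()
    gc.disable()  # stop automatic collection so the outcome does not depend on timing
    try:
        ref = make_cycle()
        alive_before = ref() is not None  # refcounts never reach zero inside a cycle
        gc.collect()
        alive_after = ref() is not None  # the cyclic collector has reclaimed it
    finally:
        if was_enabled:
            gc.enable()
    return alive_before, alive_after
```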

@dhirschfeld I have tried manually garbage collecting after reading the other issue Matt linked above and didn't see an improvement. Appreciate the suggestion though. This is using pandas read_csv for all the IO but I'm fairly confident I see the same behavior w/ other methods.

My guess here is that Dask isn't tracking any of the leaked data, and that we're in a situation where the next thing to do is to use normal Python methods to detect memory leaks (like the gc module). This is made more complex by the fact that we have to do this investigation in other processes. Using the client.run function is probably helpful here:

import gc

import pandas as pd


def f(dask_worker):
    return len(dask_worker.data)

client.run(f)

def f():
    return len([obj for obj in gc.get_objects() if isinstance(obj, pd.DataFrame)])

client.run(f)

...
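Building on the second snippet, one way to spot what is accumulating is to diff per-type object counts before and after a computation. The sketch below uses only the standard library; type_counts and growth are hypothetical helper names. On a cluster, client.run(type_counts) should return one snapshot per worker, since Counter objects pickle cleanly, though that usage is an untested assumption here. As with any gc-based census, objects the collector does not track (such as bytes or plain numbers) will not appear.

```python
import gc
from collections import Counter


def type_counts():
    """Snapshot: number of gc-tracked objects per type name."""
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())


def growth(before, after, min_delta=1):
    """Types whose object count grew by at least `min_delta` between two snapshots."""
    return {
        name: after[name] - before[name]
        for name in after
        if after[name] - before[name] >= min_delta
    }
```

Taking snapshots before and after, then printing growth(before, after, min_delta=100), keeps the output focused on types that grew substantially.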

Any progress on this issue? Thanks.

@jsanjay63 this GitHub issue should reflect the current state of things.

What is the workaround for this in the real world? Do people not use clusters with long-running workers? Or are people okay with the worker eventually dying and the task getting retried?

Same problem here.
Is there a way to work around this while we wait for a fix?

At the moment I am calling client.restart() every time.

You might be interested in testing out this PR: https://github.com/dask/distributed/pull/4221. Reporting successes or failures would be welcome. Note that this PR is still in flux and subject to change.

> You might be interested in testing out this PR #4221. Reporting success or failures would be welcome. Note, this PR is still in flux and subject to change

Done but unfortunately no success. Thanks for the suggestion

EDIT (@quasiben):
I was wrong: I was using the wrong branch when I tested the PR you suggested. It does half the job, since it successfully kills the workers when they reach 80% of the memory limit, but the computation still fails with an error:

    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>)

This problem is indeed a big one. It prevents me from using Dask in production, where I have a very long-running task and 200 GB of memory gets used up in no time.
Is there anything at all you can suggest to investigate the issue or mitigate it?

I already tried the suggested PR without success.
My structure consists of nested multiprocessing on several layers, like this:

level A                            A1
level B                  B1                 B2
level C             C1        C2       C3        C4
level D           D1  D2    D3  D4   D5  D6    D7  D8

A1 runs in parallel (B1, B2)

B1 runs in parallel (C1, C2)
B2 runs in parallel (C3, C4)

C1 runs in parallel (D1, D2)
C2 runs in parallel (D3, D4)
C3 runs in parallel (D6, D5)
C4 runs in parallel (D7, D8)

Everything works fine, except that once the tasks in the innermost layer D are completed, the memory never gets released, and it accumulates until the kernel dies.

In this situation of nested processes I cannot even restart the client in the inner layers because this will end up affecting the whole computation. So for me there is really no solution here.

Any help would be much appreciated.

I'm also experiencing some kind of memory leak, though it might not be related. I'm using Dask distributed only as a job scheduler, without passing any substantial data: the input is just a filename, there is no return value, and the job itself calls only plain pandas and numpy. This way I'm processing 4000 files (almost equally sized) on a 40-core machine in about 45 minutes.

With Dask distributed the memory usage continuously increases until the work is done. At that point it's consuming 40 GB and the memory is not freed. I see "distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?" in the logs, which seems right, because Dask is not involved in any data manipulation.

The strange thing is that I don't experience the memory leak when using multiprocessing.Pool to dispatch the same jobs (and I also get better CPU usage). It does consume about 10 GB that is not entirely freed, but the usage stays almost constant the whole time.

So it seems that Dask is not directly involved, yet it makes a difference somehow.

I'm running Debian buster, python3.7 and latest libraries (dask==2020.12.0, numpy==1.19.5, pandas==1.2.0). (Python3.8 seems to make no difference.)

Attached are htop screenshots: leak-screenshots.zip.


Here is the code:

import time

import fsspec
import pandas as pd
from distributed import Client, LocalCluster, wait


def compute_profile_from_file(filepath):
    df = pd.read_csv(filepath, compression="gzip", sep=";", names=("eid", "t", "src", "spd"))
    # I observe memory leak even if just reading the data.
    # As I add more processing steps more memory is leaked.
    ...
    df.to_parquet(...)


def main_dask():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    client = Client(LocalCluster(n_workers=40, threads_per_worker=1))
    results = set()

    for filepath in filepaths:
        # Keep at most 40 jobs in flight; wait() returns (done, not_done).
        if len(results) == 40:
            _, results = wait(results, return_when="FIRST_COMPLETED")

        job = client.submit(compute_profile_from_file, filepath)
        results.add(job)

    client.gather(results)
    del results

    time.sleep(24 * 3600)  # keep the process alive to watch memory in htop


def main_mp():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    import multiprocessing as mp
    mp.set_start_method("spawn")

    pool = mp.Pool(40)
    pool.map(compute_profile_from_file, filepaths)

    time.sleep(24 * 3600)


if __name__ == "__main__":
    # Run one variant at a time to compare memory behavior.
    main_dask()
    # main_mp()
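Given the log message that the worker has no data to spill, one way to check whether the growth corresponds to live Python allocations is the standard-library tracemalloc module. A sketch under that assumption follows; traced_call and allocate_and_drop are hypothetical names, and allocate_and_drop is only a stand-in for a task such as compute_profile_from_file. If the traced "current" value stays small while RSS in htop keeps climbing, the memory is probably retained by the allocator or by native code rather than by reachable Python objects. Allocations made outside Python's memory allocators are not visible to tracemalloc.

```python
import tracemalloc


def traced_call(func, *args, **kwargs):
    """Call func; return (result, bytes still traced afterwards, peak traced bytes).

    Starts and stops tracemalloc around the call, so run it where nothing
    else relies on tracemalloc already being active.
    """
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, current, peak


def allocate_and_drop(n_chunks=20, chunk_size=1_000_000):
    """Stand-in task: allocates about n_chunks * chunk_size bytes, then releases them."""
    chunks = [b"x" * chunk_size for _ in range(n_chunks)]
    return len(chunks)
```

A high peak with a low current value means the task's Python objects were released, even if the process RSS did not shrink.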

Any update here? We are also trying to use Dask in production, but this is causing some major issues for us.

This issue, and probably a few others scattered across dask / distributed, should have the current state of things.

FWIW, I'm not able to reproduce my original issue now, at least not on a different machine. Previous attempts were on macOS, but on Linux under WSL2 I see

Memory usage [before]
{'tcp://127.0.0.1:33421': '54.06 MB', 'tcp://127.0.0.1:40071': '56.51 MB'}
running
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.10 MB', 'tcp://127.0.0.1:40071': '56.91 MB'}
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.50 MB', 'tcp://127.0.0.1:40071': '57.72 MB'}
clear things?
Memory usage [final]
{'tcp://127.0.0.1:33421': '65.99 MB', 'tcp://127.0.0.1:40071': '68.57 MB'}

Previously the [after] was in the hundreds of megabytes.


I also learned about sys._debugmallocstats(), which might be worth looking into if this is a memory fragmentation issue.
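If fragmentation or allocator retention is the cause, a mitigation often suggested for Linux workers is to ask glibc to hand free heap pages back to the OS with malloc_trim. The sketch below assumes a glibc-based system; trim_memory is a hypothetical name, and on platforms without malloc_trim it just returns 0. It cannot free objects that are still referenced; it only affects memory that malloc already considers free.

```python
import ctypes
import ctypes.util


def trim_memory() -> int:
    """Call glibc's malloc_trim(0); returns 1 if memory was released, else 0.

    Returns 0 without doing anything when libc or malloc_trim is unavailable
    (e.g. macOS, Windows, or musl-based systems).
    """
    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return 0
    libc = ctypes.CDLL(libc_name)
    if not hasattr(libc, "malloc_trim"):
        return 0
    return libc.malloc_trim(0)
```

On a cluster it could be invoked on every worker as client.run(trim_memory), then compared against the per-worker RSS numbers above.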

Is there a simple, effective way to kill all Dask processes/workers that are hogging memory, either from the command line or directly in Python? I thought this happened automatically when a Python command-line call finished or when execution was interrupted, but apparently not. I executed this set of commands several times in VS Code while debugging and wasn't aware that memory wasn't being freed on each iteration. Now 73% of the RAM is blocked and I have no idea how to free it. Can someone please help?

COMMANDS:
import dask.dataframe as dd
df = dd.read_sql_table(parameterdict["table"],connstring,index_col='payee',columns=parameterdict["selectlist"])
df = df.compute()

MY CONFIGURATION:

  • Azure Data Science Virtual Machine, DS13 v2 instance (so it's not a cluster, just a single node I guess)
  • 56GB RAM
  • I'm using VS code on Python 3.6 with Dask 2.19 managed by Conda

Please help?


Just to chime in: I'm having a similar issue in long-running jobs (a very big job made of many small tasks). The worker logs say something like 'memory full but no data to spill to disk'. It's very hard to diagnose because it only really has an impact after many hours: a 7-hour job completed, but a 14-hour one got stuck because all the workers' memory filled up, so they stopped accepting or processing tasks, and since there was nothing to spill to disk they couldn't free their memory either.

Interestingly, the problem first appeared much sooner when I was using Numba to accelerate my algorithm. Switching to Cython seemed to cure it, but when I ran a much bigger job the problem appeared again :( Whatever it is must still be happening, just much more slowly.
