I'm still investigating, but in the meantime I wanted to get this issue started.
I'm noticing that after executing a task graph with large inputs and a small output, my worker memory stays high. In the example below we generate 20 large (100 MB) bytestrings, take a 5-byte slice of each, and sum the first byte of every slice.
So the final result returned to the client is small, a single Python int. The only large objects should be the initially generated bytestrings.
The console output is:
Memory usage [before]
{'tcp://192.168.7.20:50533': '30.92 MB', 'tcp://192.168.7.20:50534': '30.95 MB'}
running
Memory usage [after]
{'tcp://192.168.7.20:50533': '231.97 MB',
'tcp://192.168.7.20:50534': '232.63 MB'}
Memory usage [after]
{'tcp://192.168.7.20:50533': '232.05 MB',
'tcp://192.168.7.20:50534': '232.63 MB'}
To test whether the scheduler or a worker is holding a reference to the data, I submitted a bunch of tiny inc tasks to one of the workers. The memory on that worker does settle down:
Memory usage [final]
{'tcp://192.168.7.20:52114': '232.77 MB',
'tcp://192.168.7.20:52115': '49.73 MB'}
That's at least consistent with the worker or scheduler holding a reference to the data, but there could be many other causes. I'm still debugging.
The number of inc tasks, 2731, seems to be significant. With 2730 inc tasks, I don't see any memory reduction on that worker.
import time
from dask.utils import parse_bytes, format_bytes
import pprint
import string
import toolz
from distributed import Client, wait

N = parse_bytes("100 Mb")
I = 20

def inc(x):
    return x + 1

def f(x, n=N):
    time.sleep(0.05)
    return string.ascii_letters[x % 52].encode() * n

def g(x):
    time.sleep(0.02)
    return x[:5]

def h(*args):
    return sum(x[0] for x in args)

def get_mem(dask_worker):
    return dask_worker.monitor.proc.memory_info().rss

def main():
    dsk = {}
    for i in range(I):
        dsk[f'a-{i}'] = (f, i, N)
        dsk[f'b-{i}'] = (g, f'a-{i}')
    dsk['c-0'] = (h,) + tuple(f'b-{i}' for i in range(I))

    with Client(n_workers=2, threads_per_worker=1, memory_limit='500Mb', processes=True) as client:
        print("Memory usage [before]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        print("running")
        client.get(dsk, keys=["c-0"])
        time.sleep(2)  # let things settle

        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        time.sleep(5)  # settle some more?
        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        print("clear things?")
        futures = client.map(inc, range(2731), pure=False)
        wait(futures)
        del futures

        print("Memory usage [final]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

if __name__ == '__main__':
    main()
Another thing that might be worth doing is to add all values placed into Worker.data to a weakref.WeakValueDictionary as well:
import weakref

class Worker:  # sketch of a change to distributed's Worker; existing code omitted
    def __init__(self, *args, **kwargs):
        ...  # existing initialization
        self.weak_data = weakref.WeakValueDictionary()

    def put_key_in_memory(self, key, value):
        self.data[key] = value
        self.weak_data[key] = value
Then, after seeing things flush through, you could check on the references to the items in the weak_data dictionary. This could even become a test fairly easily if we were to implement a custom mapping.
Regardless, thanks for investigating here. Issues like this have come up over the years without much resolution.
> Another thing that might be worth doing is to add all values placed into Worker.data also into a weakref.WeakValueDictionary
Surprisingly(?) this exacerbates the memory leak. My computation doesn't even complete because the workers are killed for exceeding the memory limit. I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either...
> I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either...
Right, but you would be able to look at the objects that have stayed around in weak_data, and then track their references:
import gc

if worker.weak_data:
    obj = list(worker.weak_data.values())[0]
    print(gc.get_referrers(obj))
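For reference, here is a self-contained illustration of what `gc.get_referrers` reports. The `Payload` class and the `cache`/`history` containers are made up for the example. In a worker, they would be whatever is unexpectedly holding on to a result. Note that `get_referrers` only searches objects tracked by the garbage collector. Its output also includes incidental referrers, such as the module's globals dict.

```python
import gc


class Payload:
    """Made-up stand-in for a task result that should have been released."""


value = Payload()
cache = {"task-1": value}  # e.g. a forgotten dict entry
history = [value]          # e.g. a list of recent results

referrers = gc.get_referrers(value)
# Besides `cache` and `history`, this typically also lists the module globals dict
print(sorted(type(r).__name__ for r in referrers))
```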
Will give that a shot.
Another (failed) attempt: I made a custom object, Foo,
import string
import sys

class Foo:
    def __init__(self, x, n=N):  # N as defined in the script above
        self.thing = string.ascii_letters[x % 52].encode() * n

    def __getitem__(self, index):
        return self.thing[index]

    def __sizeof__(self):
        return sys.getsizeof(self.thing) + sys.getsizeof(object())

and ran objgraph.by_type on the workers, nannies, and scheduler. It didn't find any :/
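One thing to keep in mind when interpreting that result: as far as I know, `objgraph.by_type` scans `gc.get_objects()`, which only returns objects tracked by the cyclic garbage collector. Instances of classes like `Foo` are tracked, but plain `bytes` objects are not. A direct search for leaked bytestrings would therefore always come back empty, which is presumably why a wrapper like `Foo` is needed in the first place. The sketch below uses a simplified `Foo` and a hypothetical `instances_of` helper to show the difference:

```python
import gc


class Foo:
    """Simplified version of the Foo wrapper above."""

    def __init__(self, n):
        self.thing = b"x" * n


def instances_of(type_name):
    """Hypothetical objgraph.by_type-like helper: live GC-tracked objects of a given type name."""
    return [obj for obj in gc.get_objects() if type(obj).__name__ == type_name]


foo = Foo(1_000_000)
print(gc.is_tracked(foo))        # True: class instances are tracked
print(gc.is_tracked(foo.thing))  # False: bytes are not, so gc.get_objects() never returns them
print(len(instances_of("Foo")))  # 1 here, while `foo` is alive
```

So finding no `Foo` instances with `by_type` does suggest that the wrappers themselves were released.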
You should probably also be aware of https://github.com/dask/dask/issues/3530
Yes, that's a possible culprit.
Interestingly, I'm not seeing the memory lingering when running my original script on a linux system.
I don't have any insights to offer, but I'm eager to hear any ideas about this; I'm often running into situations where I perform some kind of aggregation over a large dataset only to find I've permanently leaked 2TB of RAM.


I've run into issues previously with numpy arrays requiring an explicit gc.collect() before being released. I'm curious - does a gc.collect() work for you @bnaul to free up the memory?
I guess the below would work:
def collect():
    import gc
    gc.collect()

client.run(collect)
Edit: I should mention that this wasn't with dask but the symptoms sound familiar...
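An explicit `gc.collect()` sometimes helps and often doesn't, for a specific reason. CPython frees most objects as soon as their reference count drops to zero, and the cyclic collector only matters for objects caught in reference cycles. Here is a small self-contained demonstration; the `Node` class is made up for the example:

```python
import gc
import weakref


class Node:
    def __init__(self):
        self.self_ref = self  # a reference cycle: refcounting alone can never free this


gc.disable()  # keep automatic collection from running during the demo
try:
    node = Node()
    alive = weakref.ref(node)
    del node
    print(alive() is not None)  # True: the cycle keeps the object alive
    gc.collect()
    print(alive() is None)      # True: the cyclic collector reclaimed it
finally:
    gc.enable()
```

If the lingering memory is not held by cycles, `gc.collect()` won't change a worker's RSS. The same is true if the memory is still referenced somewhere, or if Python has freed it but the allocator hasn't returned it to the OS.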
@dhirschfeld I have tried manually garbage collecting after reading the other issue Matt linked above and didn't see an improvement. Appreciate the suggestion though. This is using pandas read_csv for all the IO but I'm fairly confident I see the same behavior w/ other methods.
My guess here is that Dask isn't tracking any of the leaked data, and that we're in a situation where the next thing to do is to use normal Python methods to detect memory leaks (like the gc module). This is made more complex by the fact that we have to do this investigation in other processes. Using the client.run function is probably helpful here:
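def f(dask_worker):
    # Number of keys the worker is currently holding in memory
    return len(dask_worker.data)

client.run(f)

def f():
    import gc
    import pandas as pd
    # Number of live pandas DataFrames in the worker process
    return len([obj for obj in gc.get_objects() if isinstance(obj, pd.DataFrame)])

client.run(f)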
...
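Building on the second function, a before/after snapshot of object counts by type makes growth easier to spot than a single count. This is a minimal sketch. `type_counts` is a hypothetical helper; on a cluster it would be run with `client.run(type_counts)` before and after a computation. Like the functions above, it only sees objects tracked by the garbage collector.

```python
import gc
from collections import Counter


def type_counts():
    """Count live GC-tracked objects by type name in the current process."""
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())


class Record:
    """Made-up object type used to simulate a leak."""


before = type_counts()
leak = [Record() for _ in range(1000)]  # simulated leak: 1000 objects kept alive
after = type_counts()

# Counter subtraction keeps only positive differences, i.e. types that grew
growth = after - before
print(growth["Record"])  # 1000
print(growth.most_common(3))
```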
Any progress on this issue? Thanks.
@jsanjay63 this GitHub issue should reflect the current state of things.
What is the workaround for this in the real world? Do people not use clusters with long-running workers? Or are people okay with the worker eventually dying and the task getting retried?
Small sample example to reproduce this issue: https://stackoverflow.com/questions/64046973/dask-memory-leakage-issue-with-json-and-requests
Same problem here.
Is there a way to overcome this while we wait for the fix?
At the moment I'm calling client.restart() every time.
You might be interested in testing out this PR https://github.com/dask/distributed/pull/4221 . Reporting success or failures would be welcome. Note, this PR is still in flux and subject to change
Done but unfortunately no success. Thanks for the suggestion
@quasiben EDIT:
I was wrong: I was using the wrong branch when I tested the PR you suggested. It does half the job, since it successfully kills the workers when they reach 80% of the memory limit, but the computation still fails with an error:
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
1849 exc = CancelledError(key)
1850 else:
-> 1851 raise exception.with_traceback(traceback)
1852 raise exc
1853 if errors == "skip":
KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>)
This problem is indeed a big one. It prevents me from using Dask in production, where I have a very long-running task and 200 GB of memory get used up in no time.
Is there anything at all you can suggest to investigate the issue or try to mitigate it?
I already tried the suggested PR without success.
My structure consists of nested multiprocessing across several layers:
level A: A1
level B: B1 B2
level C: C1 C2 C3 C4
level D: D1 D2 D3 D4 D5 D6 D7 D8
A1 runs in parallel (B1, B2)
B1 runs in parallel (C1, C2)
B2 runs in parallel (C3, C4)
C1 runs in parallel (D1, D2)
C2 runs in parallel (D3, D4)
C3 runs in parallel (D6, D5)
C4 runs in parallel (D7, D8)
Everything works fine except that once the tasks in the innermost layer, D, complete, the memory is never released and it accumulates until the kernel dies.
In this situation of nested processes I cannot even restart the client in the inner layers because this will end up affecting the whole computation. So for me there is really no solution here.
Any help would be much appreciated.
I'm also experiencing some kind of memory leak, though it might not be related. I'm using Dask distributed only as a job scheduler, not even passing any substantial data: the input is just a filename, there is no return value, and the job itself only calls plain pandas and numpy. This way I'm processing 4000 (almost equally sized) files on a 40-core machine in about 45 minutes.
With Dask distributed, the memory usage continuously increases until the work is done. At that point it's consuming 40 GB and the memory is not freed. I see "distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?" in the logs, which makes sense because Dask is not involved in any data manipulation.
The strange thing is that I don't see a memory leak when using multiprocessing.Pool to dispatch those jobs (and I also get better CPU usage). It still consumes about 10 GB, which is not entirely freed, but the usage stays almost constant the whole time.
So it seems that Dask is not directly involved, yet it somehow makes the difference.
I'm running Debian buster, python3.7 and latest libraries (dask==2020.12.0, numpy==1.19.5, pandas==1.2.0). (Python3.8 seems to make no difference.)
htop screenshots are attached (leak-screenshots.zip).
See the code...
import time

import fsspec
import pandas as pd
from distributed import Client, LocalCluster, wait

def compute_profile_from_file(filepath):
    df = pd.read_csv(filepath, compression="gzip", sep=";", names=("eid", "t", "src", "spd"))
    # I observe the memory leak even when just reading the data.
    # As I add more processing steps, more memory is leaked.
    ...
    df.to_parquet(...)

def main_dask():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")
    client = Client(LocalCluster(n_workers=40, threads_per_worker=1))
    results = set()
    for filepath in filepaths:
        # Keep at most 40 tasks in flight; references to finished futures are dropped
        if len(results) == 40:
            _, results = wait(results, return_when="FIRST_COMPLETED")
        job = client.submit(compute_profile_from_file, filepath)
        results.add(job)
    client.gather(results)
    del results
    time.sleep(24 * 3600)  # keep the process alive to observe memory usage

def main_mp():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")
    import multiprocessing as mp
    mp.set_start_method("spawn")
    pool = mp.Pool(40)
    pool.map(compute_profile_from_file, filepaths)
    time.sleep(24 * 3600)

if __name__ == "__main__":
    main_dask()  # or: main_mp()
Any update here? We are also trying to use Dask in production, but this is causing some major issues for us.
This issue, and probably a few others scattered across dask / distributed, should have the current state of things.
FWIW, I'm not able to reproduce my original issue now, at least not on a different machine. Previous attempts were on macOS, but on Linux under WSL2 I see
Memory usage [before]
{'tcp://127.0.0.1:33421': '54.06 MB', 'tcp://127.0.0.1:40071': '56.51 MB'}
running
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.10 MB', 'tcp://127.0.0.1:40071': '56.91 MB'}
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.50 MB', 'tcp://127.0.0.1:40071': '57.72 MB'}
clear things?
Memory usage [final]
{'tcp://127.0.0.1:33421': '65.99 MB', 'tcp://127.0.0.1:40071': '68.57 MB'}
Previously the [after] was in the hundreds of megabytes.
I also learned about sys._debugmallocstats(), which might be worth looking into if this is a memory fragmentation issue.
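If fragmentation or allocator behavior is the culprit, two points may help. First, `sys._debugmallocstats()` reports on CPython's own small-object allocator (pymalloc), which only handles small requests of a few hundred bytes at most. Large buffers like the 100 MB bytestrings in the original script come straight from the system `malloc`, so that is the layer to inspect. Second, on Linux with glibc, one commonly suggested mitigation (also described in the Dask documentation on worker memory management) is to ask glibc to return freed heap memory to the OS with `malloc_trim`. The sketch below assumes glibc. On macOS or musl-based systems it simply returns 0.

```python
import ctypes
import sys


def trim_memory() -> int:
    """Ask glibc's allocator to hand freed heap memory back to the OS.

    Returns 1 if some memory was released, 0 otherwise (including on platforms
    where glibc's malloc_trim is unavailable, e.g. macOS or musl-based Linux).
    """
    if not sys.platform.startswith("linux"):
        return 0
    try:
        libc = ctypes.CDLL("libc.so.6")
        return int(libc.malloc_trim(0))
    except (OSError, AttributeError):
        return 0


print(trim_memory())
```

On a cluster, this would be run on every worker with `client.run(trim_memory)`. If RSS drops noticeably afterwards, that points to allocator retention rather than a true reference leak.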
Is there a simple, effective way to kill all Dask processes/workers hogging memory, either from the command line or directly in Python? I thought this happened automatically when a Python command-line call completes or when execution is interrupted, but I guess not. I executed this set of commands several times in VS Code while debugging and wasn't aware that memory wasn't being freed on each iteration. Now 73% of the RAM is blocked and I have no idea how to free it. Can someone please help?
COMMANDS:
import dask.dataframe as dd
df = dd.read_sql_table(parameterdict["table"],connstring,index_col='payee',columns=parameterdict["selectlist"])
df = df.compute()
MY CONFIGURATION:
Please help?

Just to chime in: I'm having a similar issue in long-running jobs (a very big job made of many small tasks). Worker logs say something like 'memory full but no data to spill to disk'. It's very hard to diagnose because it only really has an impact after many hours. A 7-hour job completed, but a 14-hour one got stuck because all the workers' memory filled up, so they stopped accepting or processing tasks. Since there was nothing to spill to disk, they couldn't free their memory either.
Interestingly, the problem first appeared much sooner when I was using Numba to accelerate my algorithm. I then switched to Cython, which seemed to cure it, but when I ran a much bigger job the problem still appeared :( Whatever it is must still be happening, just much more slowly.