Hi,
This is less a clear bug report than a writeup of some debugging I recently did around memory-leak-like issues while running code with dask. I hope it will save someone a bit of time in the future.
I'm trying to use dask to run some simple code in parallel, as a better multiprocessing. Roughly this:
from dask import delayed

def fun(x):
    return x + 1

tasks = [delayed(fun)(i) for i in range(1000)]
futs = client.compute(tasks)  # client is a dask.distributed.Client
In practice fun is a bit more complex, and reads data from s3, does a bit of computation and writes results back to s3.
When running the real example in parallel, I saw really slow scheduling, and workers slowly ran out of memory. The problem seemed to scale roughly with the number of tasks. Each task takes around 30 seconds on a single core. With 10 tasks it all worked perfectly, 500 still worked, 1000 were borderline, and 10000 certainly didn't work.
My workers died with various exceptions related to memory usage, e.g.
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
After a bit of debugging, this seems to be caused by a helper class that got passed into fun. A minimal example that breaks looks like this:
import s3fs
from dask import delayed
from dask.distributed import progress

class S3FsWrapper(object):
    def __init__(self):
        self.fs = s3fs.S3FileSystem()

    def get_s3fs(self):
        return self.fs

fs = S3FsWrapper()

def fun(fs, x):
    # would do something with fs here, but not necessary to trigger OOM
    return x + 1

tasks = [delayed(fun)(fs, i) for i in range(1000)]
futs = client.compute(tasks)
progress(futs)
Running this will take ages, and depending on how much RAM you have will most likely crash.
So I had a look at where all this memory goes with pympler.muppy:
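from pympler import summary, muppy

def debug_mem():
    # Runs on each worker: summarize all live Python objects by type.
    from pympler import summary, muppy
    all_objects = muppy.get_objects()
    s = summary.summarize(all_objects)
    return s

# client.run returns a dict mapping worker address to that worker's summary.
s = client.run(debug_mem)
summary.print_(list(s.values())[0])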
types | # objects | total size
======================================================= | =========== | ============
<class 'collections.OrderedDict | 372201 | 163.90 MB
<class 'str | 598212 | 48.18 MB
<class 'dict | 90573 | 23.05 MB
<class 'list | 95265 | 7.50 MB
<class '_io.BufferedWriter | 3 | 4.25 MB
<class 'code | 25294 | 3.49 MB
<class 'type | 3317 | 3.39 MB
<class 'botocore.hooks.NodeList | 19000 | 1.45 MB
<class 'tuple | 22560 | 1.42 MB
<class 'set | 2921 | 1.28 MB
<class 'cell | 22473 | 1.03 MB
<class 'botocore.docs.docstring.ClientMethodDocstring | 7700 | 789.55 KB
<class 'weakref | 5410 | 422.66 KB
<class 'botocore.model.OperationModel | 7700 | 421.09 KB
<class 'int | 8753 | 261.06 KB
It looks like every task instance has loaded its own copy of botocore. All the strings contain AWS API descriptions, and I suspect the OrderedDicts are similar.
So this is how far I've gotten. Runnable notebook is at https://github.com/ah-/notebooks/blob/master/dask_oom.ipynb.
I have some ideas what exactly is going on underneath, but I'd be grateful for a clear explanation, and maybe some hints how to avoid this.
I suspect this isn't actually a dask bug but a side-effect of how data is serialised and passed around.
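A minimal sketch of the suspected mechanism, using only the standard library. `HeavyClient` is a hypothetical stand-in for an object such as `s3fs.S3FileSystem` whose pickled form is small but which rebuilds large internal state whenever it is unpickled. It is not the real s3fs class, and this illustrates an assumption about the cause rather than a confirmed diagnosis.

```python
import pickle


class HeavyClient:
    """Hypothetical stand-in for a client that pickles small but rebuilds big."""

    def __init__(self, region="us-east-1"):
        self.region = region
        self._build_state()

    def _build_state(self):
        # Pretend this is a large API description loaded at connect time.
        self.api_model = {f"operation_{i}": "x" * 100 for i in range(1000)}

    def __getstate__(self):
        # Only the small configuration is serialized.
        return {"region": self.region}

    def __setstate__(self, state):
        # Every deserialization rebuilds the large state from scratch.
        self.region = state["region"]
        self._build_state()


client_obj = HeavyClient()
payload = pickle.dumps(client_obj)

# One deserialization per task, as would happen if each task carried its own copy.
copies = [pickle.loads(payload) for _ in range(50)]

distinct_models = len({id(c.api_model) for c in copies})
print(len(payload), "bytes serialized;", distinct_models, "separate in-memory models")
```

If something like this is what happens, memory on a worker would grow with the number of tasks it has deserialized, even though each serialized task is tiny.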
Thank you for raising this.
First question: if you change the code as follows, does the memory of your workers still climb?
# before
tasks = [delayed(fun)(fs, i) for i in range(1000)]
futs = client.compute(tasks)
# after
from dask.distributed import fire_and_forget
tasks = [delayed(fun)(fs, i) for i in range(1000)]
fire_and_forget(client.compute(tasks))
Yeah, so if I del futs in between calls the boto things go away. I agree that we shouldn't be getting thousands of these (this probably has to do with serializing and deserializing boto objects) but I'm not surprised that the result of the task and the function stay in memory while they are still possibly needed.
In [9]: def debug_mem():
   ...:     from pympler import summary, muppy
   ...:     all_objects = muppy.get_objects()
   ...:     s = summary.summarize(all_objects)
   ...:     return s
   ...:
   ...: s = client.run(debug_mem)
   ...:
   ...: from pympler import summary, muppy
   ...: summary.print_(list(s.values())[0])
   ...:
types | # objects | total size
================================= | =========== | ============
<class 'dict | 44753 | 13.10 MB
<class 'str | 94103 | 12.05 MB
<class 'list | 48956 | 3.63 MB
<class 'code | 23752 | 3.28 MB
<class 'type | 2985 | 3.03 MB
<class 'collections.OrderedDict | 3230 | 1.46 MB
<class 'set | 2654 | 1.07 MB
<class 'botocore.hooks.NodeList | 12450 | 972.66 KB
<class 'tuple | 14660 | 949.68 KB
<class 'weakref | 5080 | 396.88 KB
<class 'cell | 7526 | 352.78 KB
<class 'bokeh.model.MetaModel | 243 | 252.84 KB
<class 'getset_descriptor | 3530 | 248.20 KB
<class 'int | 8055 | 226.12 KB
function (__init__) | 1677 | 222.73 KB
In [10]: del futs
In [11]: def debug_mem():
    ...:     from pympler import summary, muppy
    ...:     all_objects = muppy.get_objects()
    ...:     s = summary.summarize(all_objects)
    ...:     return s
    ...:
    ...: s = client.run(debug_mem)
    ...:
    ...: from pympler import summary, muppy
    ...: summary.print_(list(s.values())[0])
    ...:
types | # objects | total size
================================= | =========== | ============
<class 'str | 75600 | 10.95 MB
<class 'dict | 16256 | 6.22 MB
<class 'code | 23375 | 3.23 MB
<class 'type | 2860 | 2.91 MB
<class 'collections.OrderedDict | 3230 | 1.46 MB
<class 'set | 2529 | 1.04 MB
<class 'list | 11456 | 966.04 KB
<class 'tuple | 13049 | 849.97 KB
<class 'weakref | 4955 | 387.11 KB
<class 'bokeh.model.MetaModel | 243 | 252.84 KB
<class 'cell | 5276 | 247.31 KB
<class 'getset_descriptor | 3280 | 230.62 KB
<class 'int | 7623 | 214.31 KB
function (__init__) | 1552 | 206.12 KB
<class 'wrapper_descriptor | 2552 | 199.38 KB
It's a bit better with fire_and_forget. After it's done it does clean up and memory usage goes down to normal levels, and there aren't tons of strs and OrderedDicts around anymore. However, while running it still eats tons of memory and doesn't run for larger numbers of tasks.
Using simple functions like def fun(i), I'm unable to get a large number of objects to leak, other than the 10000 or so that are due to an internal log. This tends to be 100MB or so at most.
I'm also curious what version of distributed and Python you're running on.
I'm not seeing any OrderedDicts, but suspect that this is a Python version difference.
I'm sorry, that's not true. I do see OrderedDicts when engaging the s3fs code.
Yep, by cleaning up futures it runs much better. However, this for example still kills my workers before it has a chance to finish:
def fun(fs, x):
    return x + 1

tasks = [delayed(fun)(fs, i) for i in range(10000)]
fire_and_forget(client.compute(tasks))
The scheduling etc. is also noticeably slower if fs is passed in. I'm running Anaconda Python 3.6.4 on osx and linux, other packages:
dask 0.17.1 py_2 conda-forge
dask-core 0.17.1 py_0 conda-forge
distributed 1.21.3 py36_0 conda-forge
boto 2.48.0 py36hdbc59ac_1
boto3 1.6.7 py_0 conda-forge
botocore 1.9.7 py_0 conda-forge
cloudpickle 0.5.2 py_0 conda-forge
I find that surprising. My system is happy to run that computation with an order of magnitude more tasks without an issue.
Any interesting logs by any chance? How are you starting your client?
https://github.com/ah-/notebooks/blob/master/dask_oom_2.ipynb
It might just be that the 2GB it got were too little to temporarily import x number of botocore instances.
You might consider diving into https://github.com/dask/s3fs and seeing if there is a reasonable place to reuse botocore instances across S3FileSystem objects :)
On Tue, Mar 27, 2018 at 6:37 PM, Andreas Heider wrote:
> https://github.com/ah-/notebooks/blob/master/dask_oom_2.ipynb
> It might just be that the 2GB it got were too little to temporarily import x number of botocore instances.
OK, I can reproduce the issue. Some details:
# Limiting worker memory helps when reproducing this on larger systems
cluster = LocalCluster(..., memory_limit='2GB')
When I watch the diagnostic dashboard, I notice that memory jumps up quickly before any of the computations start. I suspect this means that the memory cost isn't in the results of the worker; it's in the deserialized versions of the tasks themselves (the many Python functions). Generally we don't have any controls on data like this that we expect to be small. I am not surprised to learn that Dask crashes here.
One thing that may help here would be if we were to do a bit of caching on deserialization. "Hey, I've seen this huge string of bytes recently, it turned into this function, I'll just return that immediately rather than deserialize it again." But this will likely have complications of its own.
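The caching idea can be sketched with the standard library alone. This is not how distributed deserializes tasks; `cached_loads` is a hypothetical helper that only illustrates "same bytes in, same object out".

```python
import pickle
from functools import lru_cache


@lru_cache(maxsize=128)
def cached_loads(payload: bytes):
    """Deserialize payload, reusing the result for byte-identical input."""
    return pickle.loads(payload)


payload = pickle.dumps({"endpoint": "s3", "retries": 3})

first = cached_loads(payload)     # deserialized once
second = cached_loads(payload)    # served from the cache
uncached = pickle.loads(payload)  # a plain loads() builds a new object

print(first is second)
print(first is uncached)
```

Two of the likely complications are visible even here: every caller shares one mutable object, so a change made by one task is seen by the others, and the cache keeps both the bytes and the object alive until the entry is evicted.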
I'm tempted to say "just don't send hundreds of thousands of tasks that close over non-trivial data."
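One way to follow that advice, sketched under assumptions: pass only a small piece of configuration into each task and build the expensive object once per process through a cached factory. `ExpensiveFilesystem` and `get_filesystem` are hypothetical names; in the real case the factory would return something like `s3fs.S3FileSystem()`. The tasks run in a plain loop here for illustration. With dask, the helper would presumably need to live in a module that the workers can import.

```python
from functools import lru_cache


class ExpensiveFilesystem:
    """Hypothetical stand-in for an expensive client such as an S3 filesystem."""

    instances_created = 0

    def __init__(self, region):
        ExpensiveFilesystem.instances_created += 1
        self.region = region


@lru_cache(maxsize=None)
def get_filesystem(region):
    # Built at most once per process for each distinct region.
    return ExpensiveFilesystem(region)


def fun(region, x):
    fs = get_filesystem(region)  # would do something with fs here
    return x + 1


results = [fun("us-east-1", i) for i in range(1000)]
print(ExpensiveFilesystem.instances_created)
```

Each task now carries only a short string, and the number of heavy objects is bounded by the number of worker processes rather than by the number of tasks.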
%time len(cloudpickle.dumps(fs))
CPU times: user 0 ns, sys: 4.06 ms, total: 4.06 ms
Wall time: 3.32 ms
Out[10]:
911
This isn't that big when serialized, but the serialization time is non-trivial and I wouldn't be surprised if it's much bigger when in memory.
I recommend raising an issue upstream on s3fs noting that creating many S3FileSystems (or at least deserializing them) seems to make many botocore objects, and asking if there is a place where you can help to correct the issue.
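As a rough idea of what such a fix could look like, here is a sketch of instance caching keyed by constructor arguments. `CachedFilesystem` and its `endpoint` parameter are hypothetical and this is not s3fs code; the `object()` placeholder stands in for whatever botocore state the real class holds.

```python
import pickle


class CachedFilesystem:
    """Hypothetical filesystem wrapper; not the real s3fs.S3FileSystem."""

    _instances = {}

    def __new__(cls, endpoint=None):
        # Return the existing instance for these arguments if there is one.
        if endpoint not in cls._instances:
            inst = super().__new__(cls)
            inst.endpoint = endpoint
            inst.session = object()  # stands in for a botocore session
            cls._instances[endpoint] = inst
        return cls._instances[endpoint]

    def __reduce__(self):
        # Unpickling calls the constructor, so it goes through the cache.
        return (type(self), (self.endpoint,))


fs = CachedFilesystem(endpoint="example")
copies = [pickle.loads(pickle.dumps(fs)) for _ in range(100)]
print(all(c is fs for c in copies), len(CachedFilesystem._instances))
```

With this shape, deserializing the same filesystem many times within one process yields the one cached instance instead of many separate sessions.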