Distributed: FFT on modestly large array results in KilledWorker

Created on 14 Feb 2018  Â·  12Comments  Â·  Source: dask/distributed

System Info

OS: GNU/Linux x86_64
Cores: 64
Memory: 252 GB
distributed 1.21.0
dask 0.17.0

Minimum Example

# Import needed libraries
import numpy as np
import distributed
import dask.array
# Create local cluster
client = distributed.Client()
# Create Numpy array
shape = (500, 500, 1000)
big_array= np.random.rand(*shape)
# Create Dask array with appropriate chunk size
chunks = (int(shape[0]/20), int(shape[1]/20), shape[2])
big_dask_array = dask.array.from_array(big_array, chunks=chunks)
# Compute FFT
fft = dask.array.fft.rfft(big_dask_array, axis=2,  n=(2 * shape[2] + 1))
foo = fft.compute()

Explanation and Traceback

When calculating a fast Fourier transform on a modestly large array (that still fits in memory), I get repeated error messages that look like,

tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:32854, threads: 1>>
Traceback (most recent call last):
  File "/storage-home/w/wtb2/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
    return self.callback()
  File "/storage-home/w/wtb2/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/nanny.py", line 251, in memory_monitor
    self.process.process.terminate()
AttributeError: 'NoneType' object has no attribute 'terminate'

followed by the error (and traceback) seen below. Looking at the Bokeh dashboard for the local cluster, the jobs either hang or continually restart.

I'm a bit confused as to why the worker is being killed. The array sizes I'm using here easily fit into memory. Adjusting the chunk size either way does not seem to make a difference . The traceback does not provide much insight. Any idea what is going on here?

Reducing the array shape to (475,475,1000), I don't seem to see this error. The tipping point seems to be around a (495,495).

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-7-2e30612ecfe9> in <module>()
----> 1 foo = fft_1.compute()

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    141         dask.base.compute
    142         """
--> 143         (result,) = compute(self, traverse=False, **kwargs)
    144         return result
    145 

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    390     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    391                     else (None, a) for a in args]
--> 392     results = get(dsk, keys, **kwargs)
    393     results_iter = iter(results)
    394     return tuple(a if f is None else f(next(results_iter), *a)

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
   2039                 secede()
   2040             try:
-> 2041                 results = self.gather(packed, asynchronous=asynchronous)
   2042             finally:
   2043                 for f in futures.values():

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1476             return self.sync(self._gather, futures, errors=errors,
   1477                              direct=direct, local_worker=local_worker,
-> 1478                              asynchronous=asynchronous)
   1479 
   1480     @gen.coroutine

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    601             return future
    602         else:
--> 603             return sync(self.loop, func, *args, **kwargs)
    604 
    605     def __repr__(self):

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/utils.py in f()
    235             yield gen.moment
    236             thread_state.asynchronous = True
--> 237             result[0] = yield make_coro()
    238         except Exception as exc:
    239             logger.exception(exc)

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1354                             six.reraise(type(exception),
   1355                                         exception,
-> 1356                                         traceback)
   1357                     if errors == 'skip':
   1358                         bad_keys.add(key)

~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ('array-original-4f69bbca3af6e278592118459b4e2c5d', 'tcp://127.0.0.1:41577')

Most helpful comment

Short: You might be running into a per-worker memory limit. Set worker-memory-terminate: False in ~/.dask/config.yml and see if it goes away. The default silence_logs level on LocalCluster might need to be turned down so that dummies like me don't spend an afternoon chasing dead workers before deciding to turn it down themselves.

Long: I think I had a similar issue to this. I had a long-running computation whose arguments/returns are pretty big (~100MB) and when I recently increased the size of these arguments a fair bit (up to ~300MB) I started getting KilledWorker exceptions.

It seemed to be dependent on problem size (with which both memory and computation time scales) and on the number of workers: this happened with a LocalCluster(6), but not (or not as reliably) with a LocalCluster(2) or LocalCluster(1).

Finally I turned logging on - which I should've done much sooner - and found that when the task failed, I got

distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 7297 was killed by unknown signal
distributed.scheduler - INFO - Worker 'tcp://127.0.0.1:34409' failed from closed comm: in <closed TCP>: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:34409
distributed.nanny - WARNING - Restarting worker

The root problem is that the process termination memory limit is evenly divided between processes, so with a lot of processes and big tasks you can easily run into it without saturating your system-level limits. Immediate solution is to set the worker memory termination limit to False.

Might also be a good idea to lower the default silence_logs level to 30 so warnings show up by default.

All 12 comments

I recommend trying with the local scheduler and watching memory use during
computation with the ResourceProfiler:
http://dask.pydata.org/en/latest/diagnostics.html#resourceprofiler

That might help to isolate potential causes of the issue. Also cc
@jakirkham due to the use of FFT

On Wed, Feb 14, 2018 at 1:21 AM, Will Barnes notifications@github.com
wrote:

System Info

OS: GNU/Linux x86_64
Cores: 64
Memory: 252 GB
distributed 1.21.0
dask 0.17.0
Minimum Example

Import needed librariesimport numpy as npimport distributedimport dask.array# Create local cluster

client = distributed.Client()# Create Numpy array
shape = (500, 500, 1000)
big_array= np.random.rand(*shape)# Create Dask array with appropriate chunk size
chunks = (int(shape[0]/20), int(shape[1]/20), shape[2])
big_dask_array = dask.array.from_array(big_array, chunks=chunks)# Compute FFT
fft = dask.array.fft.rfft(big_dask_array, axis=2, n=(2 * shape[2] + 1))
foo = fft.compute()

Explanation and Traceback

When calculating a fast Fourier transform on a modestly large array (that
still fits in memory), I get repeated error messages that look like,

tornado.application - ERROR - Exception in callback >
Traceback (most recent call last):
File "/storage-home/w/wtb2/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
return self.callback()
File "/storage-home/w/wtb2/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/nanny.py", line 251, in memory_monitor
self.process.process.terminate()AttributeError: 'NoneType' object has no attribute 'terminate'

followed by the error (and traceback) seen below. Looking at the Bokeh
dashboard for the local cluster, the jobs either hang or continually
restart.

I'm a bit confused as to why the worker is being killed. The array sizes
I'm using here easily fit into memory. Adjusting the chunk size either way
does not seem to make a difference . The traceback does not provide much
insight. Any idea what is going on here?

Reducing the array shape to (475,475,1000), I don't seem to see this
error. The tipping point seems to be around a (495,495).


KilledWorker Traceback (most recent call last) in ()----> 1 foo = fft_1.compute()
~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/dask/base.py in compute(self, *kwargs)
141 dask.base.compute
142 """--> 143 (result,) = compute(self, traverse=False, *
kwargs) 144 return result 145 ~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/dask/base.py in compute(args, *kwargs) 390 postcomputes = [a.__dask_postcompute__() if is_dask_collection(a) 391 else (None, a) for a in args]--> 392 results = get(dsk, keys, *kwargs) 393 results_iter = iter(results) 394 return tuple(a if f is None else f(next(results_iter), *a)~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, *kwargs) 2039 secede() 2040 try:-> 2041 results = self.gather(packed, asynchronous=asynchronous) 2042 finally: 2043 for f in futures.values():~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous) 1476 return self.sync(self._gather, futures, errors=errors, 1477 direct=direct, local_worker=local_worker,-> 1478 asynchronous=asynchronous) 1479 1480 @gen.coroutine~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in sync(self, func, args, *kwargs) 601 return future 602 else:--> 603 return sync(self.loop, func, args, *kwargs) 604 605 def __repr__(self):~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, args, *kwargs) 251 e.wait(10) 252 if error[0]:--> 253 six.reraise(error[0]) 254 else: 255 return result[0]~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/six.py in reraise(tp, value, tb) 691 if value.__traceback__ is not tb: 692 raise value.with_traceback(tb)--> 693 raise value 694 finally: 695 value = None~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/utils.py in f() 235 yield gen.moment 236 thread_state.asynchronous = True--> 237 result[0] = yield make_coro() 238 except Exception as exc: 239 logger.exception(exc)~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/gen.py in run(self) 1053 1054 try:-> 1055 value = future.result() 1056 except Exception: 1057 self.had_exception = True~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout) 236 if self._exc_info is not None: 237 try:--> 238 raise_exc_info(self._exc_info) 239 finally: 240 self = None~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/tornado/gen.py in run(self) 1061 if exc_info is not None: 1062 try:-> 1063 yielded = self.gen.throw(exc_info) 1064 finally: 1065 # Break up a reference to itself~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker) 1354 six.reraise(type(exception), 1355 exception,-> 1356 traceback) 1357 if errors == 'skip': 1358 bad_keys.add(key)~/anaconda3/envs/synthesizar/lib/python3.6/site-packages/six.py in reraise(tp, value, tb) 691 if value.__traceback__ is not tb: 692 raise value.with_traceback(tb)--> 693 raise value 694 finally: 695 value = NoneKilledWorker: ('array-original-4f69bbca3af6e278592118459b4e2c5d', 'tcp://127.0.0.1:41577')

—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/1760, or mute the thread
https://github.com/notifications/unsubscribe-auth/AASszJWNOyY9_cqadmaouyICEO57N8VVks5tUnt3gaJpZM4SE2aV
.

Thanks @mrocklin. I'll check this out. I've been monitoring the memory usage with typical command line tools (e.g. top, htop) during execution and those do not show memory usage anywhere near the total amount of memory of the machine. Maybe the ResourceProfiler will give a bit more insight

Ok so running it without creating a distributed.Client (i.e. using the local scheduler), I'm not seeing this error come up at all.

So the problem seems to be running with this with the LocalCluster. One solution of course is to just not create a client and just let the local scheduler handle it. I'm just confused as to why distributed kills the job. Any idea why this might be happening?

For completeness, the output from ResourceProfiler for dt=0.25 is,

[ResourceData(time=9168348.207994284, mem=4123.148288, cpu=0.0),
 ResourceData(time=9168348.471448295, mem=5141.21728, cpu=2616.1),
 ResourceData(time=9168348.723122692, mem=6666.149888, cpu=4965.4),
 ResourceData(time=9168348.974196846, mem=7912.640512, cpu=5926.5),
 ResourceData(time=9168349.224839108, mem=9476.050944, cpu=5685.2),
 ResourceData(time=9168349.475537471, mem=10017.488896, cpu=351.0),
 ResourceData(time=9168349.726117289, mem=10541.129728, cpu=99.8),
 ResourceData(time=9168349.97658792, mem=11068.555264, cpu=99.8),
 ResourceData(time=9168350.227070272, mem=11597.33248, cpu=99.8),
 ResourceData(time=9168350.47756516, mem=12134.490112, cpu=99.8),
 ResourceData(time=9168350.728043746, mem=12672.729088, cpu=99.8),
 ResourceData(time=9168350.978517156, mem=13207.18336, cpu=99.8),
 ResourceData(time=9168351.228988253, mem=12851.228672, cpu=103.8),
 ResourceData(time=9168351.479492525, mem=10318.5408, cpu=99.8)]

I next recommend running with the distributed scheduler within a single
process using Client(processes=False)

On Wed, Feb 14, 2018 at 2:59 PM, Will Barnes notifications@github.com
wrote:

Ok so running it without creating a distributed.Client (i.e. using the
local scheduler), I'm not seeing this error come up at all.

So the problem seems to be running with this with the LocalCluster. One
solution of course is to just not create a client and just let the local
scheduler handle it. I'm just confused as to why distributed kills the job.
Any idea why this might be happening?

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/1760#issuecomment-365727260,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AASszJv0V1IcnOkV50kH4wOrp8k_huOGks5tUzs7gaJpZM4SE2aV
.

Using Client(processes=False), no error is raised and the above example runs just fine.

Short: You might be running into a per-worker memory limit. Set worker-memory-terminate: False in ~/.dask/config.yml and see if it goes away. The default silence_logs level on LocalCluster might need to be turned down so that dummies like me don't spend an afternoon chasing dead workers before deciding to turn it down themselves.

Long: I think I had a similar issue to this. I had a long-running computation whose arguments/returns are pretty big (~100MB) and when I recently increased the size of these arguments a fair bit (up to ~300MB) I started getting KilledWorker exceptions.

It seemed to be dependent on problem size (with which both memory and computation time scales) and on the number of workers: this happened with a LocalCluster(6), but not (or not as reliably) with a LocalCluster(2) or LocalCluster(1).

Finally I turned logging on - which I should've done much sooner - and found that when the task failed, I got

distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 7297 was killed by unknown signal
distributed.scheduler - INFO - Worker 'tcp://127.0.0.1:34409' failed from closed comm: in <closed TCP>: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:34409
distributed.nanny - WARNING - Restarting worker

The root problem is that the process termination memory limit is evenly divided between processes, so with a lot of processes and big tasks you can easily run into it without saturating your system-level limits. Immediate solution is to set the worker memory termination limit to False.

Might also be a good idea to lower the default silence_logs level to 30 so warnings show up by default.

@andyljones 's analysis sounds promising to me. I also support the idea of changing the default level for silence_logs. @andyljones is this something you would be intersted in contributing?

Sure.

Thanks for the suggestion @andyljones. I will try that out. If this is indeed the problem, do you know why it might be happening when running with multiple processes, but not within a single process?

Because the memory limit is calculated as system_memory/num_processes. As an example:

from distributed import LocalCluster
for n in [1, 12]:
    cluster = LocalCluster(n)
    limit = cluster.workers[0].memory_limit/2**30
    print(f'Memory limit for {n} workers is {limit:.0f}GB')

# Memory limit for 1 workers is 126GB
# Memory limit for 12 workers is 10GB

I think it's due to this chunk of code but I'm not sure.

One can override the memory limit by passing the memory_limit keyword argument to LocalCluster.

tl;dr: Using memory_limit as suggested by @jakirkham and @andyljones solves this issue.

Additional explantion: With dask 0.18.0 and distributed 1.22.0, using the minimal example I originally posted, I now get the repeated warning
```python traceback
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 68829 was killed by unknown signal

followed by the error,
```python traceback
KilledWorker: ('array-original-f874af421ac213e98c4a116a470adba2', 'tcp://127.0.0.1:34788')

which is indeed more helpful than the previous error I was getting and clearly indicates the need for more memory per worker. Starting up a LocalCluster with more memory allotted per worker (and fewer workers) fixes this problem, e.g.

cluster = LocalCluster(n_workers=32,memory_limit='8GB')
client = Client(cluster)

(as opposed to 64 workers with 4GB each).

Was this page helpful?
0 / 5 - 0 ratings

Related issues

michaelnarodovitch picture michaelnarodovitch  Â·  35Comments

muammar picture muammar  Â·  46Comments

stuartarchibald picture stuartarchibald  Â·  24Comments

rabernat picture rabernat  Â·  37Comments

rabernat picture rabernat  Â·  29Comments