The purpose of this issue is to track cases of semaphore leaks, with a view to hopefully getting to the root cause and fixing it (assuming it is a problem!). This issue is also meant to hold any useful notes made whilst narrowing down the problems.
Things that would be really helpful:
dask-scheduler and dask-workers. SIGINT).
Many thanks for your help.
@stuartarchibald, have you found a simple way to reproduce?
Happening to me. This program will trigger it:
import sys
from distributed import Client
client = Client()
sys.exit()
I have the latest versions of dask and distributed installed (see below for the exact versions of all packages in the conda environment). Nothing related to Python, Dask, or Bokeh is running (this returns nothing):
$ ps -ef | grep -i -e bokeh -e python -e dask
Here is the output that I get before the program "hangs" (in the midst of the Client() object creation):
Traceback (most recent call last):
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 178, in main
_serve_one(s, listener, alive_r, handler)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 212, in _serve_one
code = spawn._main(child_r)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 114, in _main
prepare(preparation_data)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/Users/ijstokes/code/anaconda-download-data/semaphore_tracker_problem.py", line 4, in <module>
client = Client()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 400, in __init__
self.start(timeout=timeout)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 435, in start
sync(self.loop, self._start, **kwargs)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 223, in sync
six.reraise(*error[0])
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/six.py", line 686, in reraise
raise value
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
result[0] = yield make_coro()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 478, in _start
yield self.cluster._start()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 149, in _start
services=self.worker_services, **self.worker_kwargs)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 155, in _start_all_workers
yield [self._start_worker(**kwargs) for i in range(n_workers)]
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 183, in _start_worker
yield w._start()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 104, in _start
response = yield self.instantiate()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 217, in instantiate
self.process.start()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/context.py", line 291, in _Popen
return Popen(process_obj)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 35, in __init__
super().__init__(process_obj)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
After I press CTRL-C I get this output:
^CTraceback (most recent call last):
File "semaphore_tracker_problem.py", line 4, in <module>
client = Client()
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 400, in __init__
self.start(timeout=timeout)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 435, in start
sync(self.loop, self._start, **kwargs)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 221, in sync
e.wait(1000000)
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
Traceback (most recent call last):
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/threading.py", line 299, in wait
File "<string>", line 1, in <module>
gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 164, in main
rfds = [key.fileobj for (key, events) in selector.select()]
File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/selectors.py", line 577, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 192 leaked semaphores to clean up at shutdown
len(cache))
Here is my conda environment:
$ conda list --explicit
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: osx-64
@EXPLICIT
https://repo.continuum.io/pkgs/free/osx-64/bokeh-0.12.5-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/click-6.7-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/cloudpickle-0.2.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/dask-0.14.3-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/distributed-1.16.3-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/heapdict-1.0.0-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/jinja2-2.9.6-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/locket-0.2.0-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/markupsafe-0.23-py36_2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/mkl-2017.0.1-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/msgpack-python-0.4.8-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/numpy-1.13.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/openssl-1.0.2l-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pandas-0.20.2-np113py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/partd-0.3.8-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pip-9.0.1-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/psutil-5.2.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/python-3.6.1-2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/python-dateutil-2.6.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pytz-2017.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pyyaml-3.12-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/readline-6.2-2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/requests-2.14.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/setuptools-27.2.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/six-1.10.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sortedcollections-0.5.3-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sortedcontainers-1.5.7-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sqlite-3.13.0-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tblib-1.3.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tk-8.5.18-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/toolz-0.8.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tornado-4.5.1-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/wheel-0.29.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/xz-5.2.2-1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/yaml-0.1.6-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/zict-0.1.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/zlib-1.2.8-3.tar.bz2
And in case you want to try reproducing it:
$ conda list -e
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: osx-64
bokeh=0.12.5=py36_1
click=6.7=py36_0
cloudpickle=0.2.2=py36_0
dask=0.14.3=py36_1
distributed=1.16.3=py36_0
heapdict=1.0.0=py36_1
jinja2=2.9.6=py36_0
locket=0.2.0=py36_1
markupsafe=0.23=py36_2
mkl=2017.0.1=0
msgpack-python=0.4.8=py36_0
numpy=1.13.0=py36_0
openssl=1.0.2l=0
pandas=0.20.2=np113py36_0
partd=0.3.8=py36_0
pip=9.0.1=py36_1
psutil=5.2.2=py36_0
python=3.6.1=2
python-dateutil=2.6.0=py36_0
pytz=2017.2=py36_0
pyyaml=3.12=py36_0
readline=6.2=2
requests=2.14.2=py36_0
setuptools=27.2.0=py36_0
six=1.10.0=py36_0
sortedcollections=0.5.3=py36_0
sortedcontainers=1.5.7=py36_0
sqlite=3.13.0=0
tblib=1.3.2=py36_0
tk=8.5.18=0
toolz=0.8.2=py36_0
tornado=4.5.1=py36_0
wheel=0.29.0=py36_0
xz=5.2.2=1
yaml=0.1.6=0
zict=0.1.2=py36_0
zlib=1.2.8=3
@ijstokes, please wrap your code in if __name__ == '__main__' and retry.
@pitrou, that should have been obvious to me. Thanks for the advice. I did that and things worked fine. The actual script that I am using, however, ran for 10 minutes and then failed with the semaphore_tracker error, but the cause seems reasonably clear: a CSV parsing error.
In case it is helpful, here is the stack trace (if it isn't, feel free to edit the comment and remove it):
/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/worker.py:636: DtypeWarning: Columns (2) have mixed types. Specify dtype option on import or set low_memory=False.
result = function(*args, **kwargs)
distributed.utils - ERROR - could not convert string to float: '1486589$30.0'
Traceback (most recent call last):
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
result[0] = yield make_coro()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1551, in _get
result = yield self._gather(packed)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 996, in _gather
traceback)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/compatibility.py", line 47, in apply
return func(*args, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 61, in pandas_read_text
coerce_dtypes(df, dtypes)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 103, in coerce_dtypes
df[c] = df[c].astype(dtypes[c])
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/util/_decorators.py", line 91, in wrapper
return func(*args, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/generic.py", line 3299, in astype
**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3224, in astype
return self.apply('astype', dtype=dtype, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3091, in apply
applied = getattr(b, f)(**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 471, in astype
**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 521, in _astype
values = astype_nansafe(values.ravel(), dtype, copy=True)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/dtypes/cast.py", line 636, in astype_nansafe
return arr.astype(dtype)
ValueError: could not convert string to float: '1486589$30.0'
Traceback (most recent call last):
File "scripts/loganalysis.py", line 26, in <module>
total = 0
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/core.py", line 2492, in iterrows
df = self.get_partition(i).compute()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/base.py", line 96, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/base.py", line 203, in compute
results = get(dsk, keys, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1590, in get
resources=resources)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 223, in sync
six.reraise(*error[0])
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 686, in reraise
raise value
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
result[0] = yield make_coro()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1551, in _get
result = yield self._gather(packed)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 996, in _gather
traceback)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/compatibility.py", line 47, in apply
return func(*args, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 61, in pandas_read_text
coerce_dtypes(df, dtypes)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 103, in coerce_dtypes
df[c] = df[c].astype(dtypes[c])
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/util/_decorators.py", line 91, in wrapper
return func(*args, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/generic.py", line 3299, in astype
**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3224, in astype
return self.apply('astype', dtype=dtype, **kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3091, in apply
applied = getattr(b, f)(**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 471, in astype
**kwargs)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 521, in _astype
values = astype_nansafe(values.ravel(), dtype, copy=True)
File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/dtypes/cast.py", line 636, in astype_nansafe
return arr.astype(dtype)
ValueError: could not convert string to float: '1486589$30.0'
distributed.client - WARNING - Client report stream closed to scheduler
/Users/ijstokes/anaconda/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 150 leaked semaphores to clean up at shutdown
len(cache))
@ijstokes, would you be able to post a small standalone script (and accompanying CSV file) for me to try and reproduce? Thanks!
The semaphore_tracker.py module is an integral part of the forkserver, for the reasons noted here: https://github.com/python/cpython/blob/master/Lib/multiprocessing/semaphore_tracker.py#L2. I'm of the view that the semaphore_tracker's report of semaphores to clean up at shutdown is valid and correct if a worker forked from the forkserver that created the semaphores has been killed by other means, e.g. by a signal (note that I am guessing at the dask.distributed internal implementation here!). Sending a signal that cannot be caught (e.g. SIGKILL) directly to the semaphore_tracker.py process whilst in this state does cause a leak, but this is very unlikely to happen unless someone goes out of their way to send the signal directly to that process.
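To make the mechanism concrete, here is a toy, in-memory model of what I understand the tracker to do: each process that creates a named semaphore sends it a REGISTER message, sends UNREGISTER when it unlinks the semaphore, and whatever names are still registered when the tracker's pipe closes are reported (and, in the real tracker, unlinked). The `CMD:name` message format mirrors my reading of CPython's semaphore_tracker.py and should be treated as an assumption; `ToySemaphoreTracker` is a hypothetical name, and the sketch never touches real semaphores.

```python
import warnings


class ToySemaphoreTracker:
    """Simplified model of multiprocessing's semaphore tracker.

    Assumption: messages look like b"REGISTER:<name>\n" and
    b"UNREGISTER:<name>\n". No real semaphores are created or unlinked.
    """

    def __init__(self):
        self.cache = set()  # names registered but not yet unregistered

    def handle(self, line):
        # Each line is one command a client process wrote down the pipe.
        cmd, name = line.strip().split(b":")
        if cmd == b"REGISTER":
            self.cache.add(name)
        elif cmd == b"UNREGISTER":
            self.cache.discard(name)
        else:
            raise RuntimeError("unrecognized command %r" % cmd)

    def shutdown(self):
        # Called once every writer has closed the pipe (EOF). Anything still
        # registered belongs to a process that never cleaned up after itself,
        # e.g. a worker that was killed by a signal.
        leaked = sorted(self.cache)
        if leaked:
            warnings.warn(
                "semaphore_tracker: There appear to be %d leaked semaphores "
                "to clean up at shutdown" % len(leaked)
            )
        # The real tracker would sem_unlink() each leaked name at this point.
        self.cache.clear()
        return leaked


# A worker registers two semaphores but dies before unregistering one of them.
tracker = ToySemaphoreTracker()
for msg in [b"REGISTER:/mp-aaaa\n", b"REGISTER:/mp-bbbb\n", b"UNREGISTER:/mp-aaaa\n"]:
    tracker.handle(msg)
print(tracker.shutdown())
```

In this model the warning is a symptom, not the leak itself: the tracker is doing its job by cleaning up after a process that exited uncleanly.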
I suspect that an inadvertent SIGKILL sent directly to the semaphore_tracker.py process would cause a leak regardless of its state; this could happen if a user sends SIGKILL to all running Python processes via the killall command.
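The reason SIGKILL leaves no chance to clean up is a POSIX rule rather than anything specific to dask: SIGKILL (and SIGSTOP) cannot be caught, blocked, or ignored, so the kernel rejects any attempt to install a handler for them. A small POSIX-only check illustrates this; `can_install_handler` is a hypothetical helper, and it must run in the main thread because Python only allows `signal.signal()` there.

```python
import signal


def can_install_handler(signum):
    """Return True if a Python-level handler can be installed for `signum`.

    POSIX only. For SIGKILL/SIGSTOP the underlying sigaction() call fails
    with EINVAL, which Python surfaces as OSError.
    """
    previous = signal.getsignal(signum)
    try:
        signal.signal(signum, lambda *args: None)
    except OSError:
        return False
    # Restore the old disposition; None means it was not set from Python,
    # in which case fall back to the default action.
    signal.signal(signum, previous if previous is not None else signal.SIG_DFL)
    return True


print("SIGTERM catchable:", can_install_handler(signal.SIGTERM))
print("SIGKILL catchable:", can_install_handler(signal.SIGKILL))
```

On Linux or macOS this should report SIGTERM as catchable and SIGKILL as not, which is why `killall -9` bypasses every cleanup path, including the tracker's.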
Below are two samples: the first produces the message noted by @ijstokes; the second demonstrates a leak.
Here's a sample to produce the message:
#!/bin/bash
function count_semaphores() {
ls /dev/shm/sem.mp-* > /dev/null 2>&1
if [ $? -eq 0 ];then
sem_count=`ls /dev/shm/sem.mp-*|wc -l`
else
sem_count=0
fi
echo ""
echo "Semaphores count: ${sem_count}"
echo ""
}
function cleandask {
echo "Cleaning up"
for pid in ${worker_pid} ${scheduler_pid}; do
if ps -p ${pid} > /dev/null; then
echo "Sending SIGTERM to ${pid}"
kill -15 ${pid}
fi
done
}
dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo &
scheduler_pid=$!
dask-worker --scheduler-file ~/.dask_scheduler_foo &
worker_pid=$!
trap cleandask EXIT
count_semaphores
# wait a bit for dask
echo "Sleeping..."
sleep 15
count_semaphores
sleep 5
# signal the worker pid with SIGINT and wait for it to finish
echo "Sending SIGINT to worker"
kill -2 ${worker_pid}
wait ${worker_pid}
sleep 5
count_semaphores
exit 0
which reports:
$ bash timing_attack.sh
Semaphores count: 0
Sleeping...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
distributed.scheduler - INFO - http at: 127.0.0.1:9786
distributed.scheduler - INFO - bokeh at: 127.0.0.1:8787
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-mbrqkzfa
distributed.scheduler - INFO - -----------------------------------------------
distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:35783'
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:35826
distributed.worker - INFO - nanny at: 127.0.0.1:35783
distributed.worker - INFO - http at: 127.0.0.1:42934
distributed.worker - INFO - bokeh at: 127.0.0.1:8789
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 4.92 GB
distributed.worker - INFO - Local Directory: /tmp/nanny-y5s_2oja/worker-bi4p90bu
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:35826
distributed.worker - INFO - Registered to: tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35826
distributed.nanny - INFO - Nanny 'tcp://127.0.0.1:35783' starts worker process 'tcp://127.0.0.1:35826'
Semaphores count: 8
Sending SIGINT to worker
distributed.dask_worker - INFO - End worker
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:35826
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Removed worker tcp://127.0.0.1:35826
<path>/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 5 leaked semaphores to clean up at shutdown
len(cache))
Semaphores count: 0
Cleaning up
Sending SIGTERM to 5679
distributed.scheduler - INFO - End scheduler at 'tcp://127.0.0.1:8786'
This demonstrates a user running killall -9 against the Python executable (${mypython} in the script):
#!/bin/bash
function count_semaphores() {
ls /dev/shm/sem.mp-* > /dev/null 2>&1
if [ $? -eq 0 ];then
sem_count=`ls /dev/shm/sem.mp-*|wc -l`
else
sem_count=0
fi
echo ""
echo "Semaphores count: ${sem_count}"
echo ""
}
function cleandask {
echo "Cleaning up"
for pid in ${worker_pid} ${scheduler_pid}; do
if ps -p ${pid} > /dev/null; then
echo "Sending SIGTERM to ${pid}"
kill -15 ${pid}
fi
done
}
dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo &
scheduler_pid=$!
dask-worker --scheduler-file ~/.dask_scheduler_foo &
worker_pid=$!
mypython=`which python`
trap cleandask EXIT
count_semaphores
# wait a bit for dask
echo "Sleeping..."
sleep 15
count_semaphores
sleep 5
# use killall with SIGKILL against python
echo "killall -9 ${mypython}"
killall -9 ${mypython}
wait ${worker_pid}
sleep 5
count_semaphores
exit 0
which produces the following (with an ls /dev/shm/sem.mp-* run afterwards):
$ bash timing_attack.sh; ls /dev/shm/sem.mp-*
Semaphores count: 0
Sleeping...
distributed.scheduler - INFO - -----------------------------------------------
distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:43815'
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
distributed.scheduler - INFO - http at: 127.0.0.1:9786
distributed.scheduler - INFO - bokeh at: 127.0.0.1:8787
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-22eg4ek7
distributed.scheduler - INFO - -----------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:45014
distributed.worker - INFO - nanny at: 127.0.0.1:43815
distributed.worker - INFO - http at: 127.0.0.1:40271
distributed.worker - INFO - bokeh at: 127.0.0.1:8789
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 4.92 GB
distributed.worker - INFO - Local Directory: /tmp/nanny-m_82kphw/worker-tgnm2dta
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:45014
distributed.worker - INFO - Registered to: tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45014
distributed.nanny - INFO - Nanny 'tcp://127.0.0.1:43815' starts worker process 'tcp://127.0.0.1:45014'
Semaphores count: 8
killall -9 <path>/bin/python
timing_attack.sh: line 43: 7882 Killed dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo
timing_attack.sh: line 43: 7883 Killed dask-worker --scheduler-file ~/.dask_scheduler_foo
Semaphores count: 8
Cleaning up
/dev/shm/sem.mp-35h9qgk9 /dev/shm/sem.mp-8phhzt84 /dev/shm/sem.mp-9zf45tsh /dev/shm/sem.mp-dtadz8sg
/dev/shm/sem.mp-60f905kl /dev/shm/sem.mp-98ut9s4s /dev/shm/sem.mp-bvdx50qy /dev/shm/sem.mp-em91_tkn
I suspect that this is where some of the leak reports are coming from.
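If leftovers like the eight /dev/shm/sem.mp-* files above do accumulate, they can be removed by hand. Below is a minimal sketch, assuming Linux with glibc, where a named semaphore "/mp-xxxx" is backed by the file /dev/shm/sem.mp-xxxx, so unlinking that file has the same effect as sem_unlink(). The helper names are hypothetical, and the sketch defaults to a dry run because deleting a semaphore that a live process still uses would break that process.

```python
import glob
import os


def find_stale_mp_semaphores(shm_dir="/dev/shm"):
    """List multiprocessing semaphore files in `shm_dir`.

    Assumes the Linux/glibc layout seen above: sem.mp-<random suffix>.
    On systems without /dev/shm (e.g. macOS) this simply returns [].
    """
    return sorted(glob.glob(os.path.join(shm_dir, "sem.mp-*")))


def remove_stale_mp_semaphores(shm_dir="/dev/shm", dry_run=True):
    """Return the candidate files and, unless dry_run, unlink them.

    Only run with dry_run=False once no Python process that might still
    be using these semaphores is alive.
    """
    paths = find_stale_mp_semaphores(shm_dir)
    if not dry_run:
        for path in paths:
            os.unlink(path)
    return paths


print(remove_stale_mp_semaphores())  # dry run: list only, delete nothing
```

Calling it with the default dry_run=True only lists candidates; pass dry_run=False after confirming that no dask or other multiprocessing-based Python processes are running.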
I believe I am running into this issue. I can start a Jupyter notebook with the following single cell:
from distributed import LocalCluster
import time
with LocalCluster():
    time.sleep(10)
As soon as my processes spin up, I see this:
├─┬ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│ ├─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35.zi
│ │ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ │ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ │ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ │ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ │ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ │ └─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│ └─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)
If I allow the cell to finish, afterwards I see this:
├─┬ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│ ├─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│ ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35.zi
│ └─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)
If I interrupt the cell, I get this:
├─┬ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│ ├─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│ ├─ python
│ └─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)
And the next time I try to run the cell after interrupting it, I get this in the notebook:
/envs/arch/lib/python3.5/multiprocessing/forkserver.py in connect_to_new_process(self, fds)
64 raise ValueError('too many fds')
65 with socket.socket(socket.AF_UNIX) as client:
---> 66 client.connect(self._forkserver_address)
67 parent_r, child_w = os.pipe()
68 child_r, parent_w = os.pipe()
ConnectionRefusedError: [Errno 111] Connection refused
The semaphore_tracker process doesn't respond to SIGTERM; I have to send it SIGKILL. After that, if I try to evaluate the cell, I get this:
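My understanding (an assumption worth checking against the CPython source for the Python version in use) is that the tracker's main() deliberately sets SIGINT and SIGTERM to SIG_IGN, so that it survives Ctrl-C and a plain `killall python`, and that it exits only when its pipe reaches EOF. The effect of SIG_IGN can be seen in-process with this POSIX-only sketch; the function name is hypothetical and it must run in the main thread.

```python
import os
import signal


def survives_sigterm_when_ignored():
    """Ignore SIGTERM, send it to ourselves, and report whether we survived."""
    previous = signal.signal(signal.SIGTERM, signal.SIG_IGN)
    try:
        # With the default disposition this would terminate the process.
        os.kill(os.getpid(), signal.SIGTERM)
        return True  # still running: the kernel discarded the signal
    finally:
        # Restore the old disposition; None means it was not set from Python.
        signal.signal(signal.SIGTERM, previous if previous is not None else signal.SIG_DFL)


print(survives_sigterm_when_ignored())
```

If that assumption holds, the tracker ignoring SIGTERM is intended behaviour; the broken pipe below then follows from SIGKILLing the tracker while the kernel process still holds a file descriptor to the pipe it used to talk to it.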
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py", line 172, in _start_worker
yield w._start()
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/envs/arch/lib/python3.5/site-packages/distributed/nanny.py", line 112, in _start
response = yield self.instantiate()
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/envs/arch/lib/python3.5/site-packages/distributed/nanny.py", line 207, in instantiate
q = mp_context.Queue()
File "/envs/arch/lib/python3.5/multiprocessing/context.py", line 101, in Queue
return Queue(maxsize, ctx=self.get_context())
File "/envs/arch/lib/python3.5/multiprocessing/queues.py", line 42, in __init__
self._rlock = ctx.Lock()
File "/envs/arch/lib/python3.5/multiprocessing/context.py", line 66, in Lock
return Lock(ctx=self.get_context())
File "/envs/arch/lib/python3.5/multiprocessing/synchronize.py", line 163, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/envs/arch/lib/python3.5/multiprocessing/synchronize.py", line 81, in __init__
register(self._semlock.name)
File "/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py", line 69, in register
self._send('REGISTER', name)
File "/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py", line 82, in _send
nbytes = os.write(self._fd, msg)
BrokenPipeError: [Errno 32] Broken pipe
---------------------------------------------------------------------------
BrokenPipeError Traceback (most recent call last)
<ipython-input-5-5cd5d4e17e92> in <module>()
2 import time
3
----> 4 with LocalCluster():
5 time.sleep(60)
/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in __init__(self, n_workers, threads_per_worker, processes, loop, start, ip, scheduler_port, silence_logs, diagnostics_port, services, worker_services, nanny, **worker_kwargs)
113
114 if start:
--> 115 sync(self.loop, self._start, ip)
116
117 clusters_to_close.add(self)
/envs/arch/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
221 e.wait(1000000)
222 if error[0]:
--> 223 six.reraise(*error[0])
224 else:
225 return result[0]
/envs/arch/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/envs/arch/lib/python3.5/site-packages/distributed/utils.py in f()
210 raise RuntimeError("sync() called from thread of running loop")
211 yield gen.moment
--> 212 result[0] = yield make_coro()
213 except Exception as exc:
214 logger.exception(exc)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start(self, ip)
145 yield self._start_all_workers(
146 self.n_workers, ncores=self.threads_per_worker,
--> 147 services=self.worker_services, **self.worker_kwargs)
148
149 self.status = 'running'
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start_all_workers(self, n_workers, **kwargs)
151 @gen.coroutine
152 def _start_all_workers(self, n_workers, **kwargs):
--> 153 yield [self._start_worker(**kwargs) for i in range(n_workers)]
154
155 @gen.coroutine
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in callback(f)
826 for f in children:
827 try:
--> 828 result_list.append(f.result())
829 except Exception as e:
830 if future.done():
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start_worker(self, port, processes, death_timeout, **kwargs)
170 death_timeout=death_timeout,
171 silence_logs=self.silence_logs, **kwargs)
--> 172 yield w._start()
173
174 self.workers.append(w)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/envs/arch/lib/python3.5/site-packages/distributed/nanny.py in _start(self, addr_or_port)
110
111 logger.info(' Start Nanny at: %r', self.address)
--> 112 response = yield self.instantiate()
113 if response == 'OK':
114 self.loop.add_callback(self._watch)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/envs/arch/lib/python3.5/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
305 try:
306 orig_stack_contexts = stack_context._state.contexts
--> 307 yielded = next(result)
308 if stack_context._state.contexts is not orig_stack_contexts:
309 yielded = TracebackFuture()
/envs/arch/lib/python3.5/site-packages/distributed/nanny.py in instantiate(self, comm)
205 raise ValueError("Existing process still alive. Please kill first")
206
--> 207 q = mp_context.Queue()
208 self.process = mp_context.Process(
209 target=run_worker_fork,
/envs/arch/lib/python3.5/multiprocessing/context.py in Queue(self, maxsize)
99 '''Returns a queue object'''
100 from .queues import Queue
--> 101 return Queue(maxsize, ctx=self.get_context())
102
103 def JoinableQueue(self, maxsize=0):
/envs/arch/lib/python3.5/multiprocessing/queues.py in __init__(self, maxsize, ctx)
40 self._maxsize = maxsize
41 self._reader, self._writer = connection.Pipe(duplex=False)
---> 42 self._rlock = ctx.Lock()
43 self._opid = os.getpid()
44 if sys.platform == 'win32':
/envs/arch/lib/python3.5/multiprocessing/context.py in Lock(self)
64 '''Returns a non-recursive lock object'''
65 from .synchronize import Lock
---> 66 return Lock(ctx=self.get_context())
67
68 def RLock(self):
/envs/arch/lib/python3.5/multiprocessing/synchronize.py in __init__(self, ctx)
161
162 def __init__(self, *, ctx):
--> 163 SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
164
165 def __repr__(self):
/envs/arch/lib/python3.5/multiprocessing/synchronize.py in __init__(self, kind, value, maxvalue, ctx)
79 # process shuts down we unlink the semaphore name
80 from .semaphore_tracker import register
---> 81 register(self._semlock.name)
82 util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
83 exitpriority=0)
/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py in register(self, name)
67 def register(self, name):
68 '''Register name of semaphore with semaphore tracker.'''
---> 69 self._send('REGISTER', name)
70
71 def unregister(self, name):
/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py in _send(self, cmd, name)
80 # bytes are atomic, and that PIPE_BUF >= 512
81 raise ValueError('name too long')
---> 82 nbytes = os.write(self._fd, msg)
83 assert nbytes == len(msg)
84
BrokenPipeError: [Errno 32] Broken pipe
Finally, if I restart the IPython kernel, I get back to a working state.
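The following is a minimal POSIX-only sketch of where the BrokenPipeError plausibly comes from. This is my reading of the traceback, not a confirmed diagnosis. register() writes a message to a pipe whose read end is held by the semaphore_tracker process. Once that process has been killed, the next write from the kernel fails with EPIPE. The function name is made up for illustration, and os.close(r) merely stands in for the tracker dying.

```python
import errno
import os


def write_to_dead_reader():
    """Return the errno raised when writing to a pipe that has no reader left."""
    r, w = os.pipe()
    os.close(r)  # stand-in for the semaphore_tracker process being killed
    try:
        # Roughly the shape of the message register() sends: b"CMD:name\n"
        os.write(w, b"REGISTER:/mp-example\n")
    except BrokenPipeError as exc:
        return exc.errno
    finally:
        os.close(w)
    return None


print(write_to_dead_reader() == errno.EPIPE)  # True on Linux/macOS
```

Python ignores SIGPIPE by default, so the failed write surfaces as a BrokenPipeError instead of killing the process. This is also consistent with a kernel restart fixing things: presumably the new kernel starts a new tracker with a new pipe.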
My versions are:
- dask=0.15.0=py35_0
- distributed=1.17.0=py35_0
- ipython=5.3.0=py35_0
- ipython_genutils=0.2.0=py35_0
- jupyter=1.0.0=py35_3
- jupyter_client=5.0.1=py35_0
- jupyter_console=5.1.0=py35_0
- jupyter_core=4.3.0=py35_0
- pyzmq=16.0.2=py35_0
- tornado=4.5.1=py35_0
On Linux version 4.1.35-pv-ts2 (gcc version 4.7.2 (Debian 4.7.2-5) ) #1 SMP Mon Dec 5 21:15:47 GMT 2016
Any updates on this issue? It appears to still be a problem with the latest version of Distributed.
No updates from me. If someone wanted to look into it I would be grateful.
@jakirkham Would you care to post the precise symptoms? Also which OS and which version of Python?
@ijstokes, please wrap your code in an if __name__ == '__main__': block and retry.
@pitrou, why does this solve the issue?
@2gotgrossman because of how multiprocessing works under the hood. This is explained here:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
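Here is a small stdlib-only illustration of that mechanism. With the spawn and forkserver start methods, the child prepares itself by re-running the parent's main script through runpy.run_path(..., run_name="__mp_main__"). Top-level code therefore runs again in the child, while code under the guard does not. The script text and the events list are hypothetical stand-ins for a script that creates a Client(); nothing is actually spawned here.

```python
import os
import runpy
import tempfile
import textwrap

# Hypothetical user script: the guarded line is where Client() would go.
SCRIPT = textwrap.dedent("""
    events.append("top-level code ran")

    if __name__ == '__main__':
        events.append("guarded code ran")
""")


def run_as(run_name):
    """Execute SCRIPT via runpy under the given __name__ and report what ran."""
    events = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "script.py")
        with open(path, "w") as f:
            f.write(SCRIPT)
        runpy.run_path(path, init_globals={"events": events}, run_name=run_name)
    return events


print(run_as("__main__"))     # ['top-level code ran', 'guarded code ran']
print(run_as("__mp_main__"))  # ['top-level code ran']
```

In the unguarded script at the top of this thread, Client() sits at top level, so every child that re-imports the script tries to start a new cluster of its own.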
When I first started using dask I was getting very strange performance problems, and @mrocklin helped me track them down to needing to use spawn rather than forkserver on my laptop (we believed that Fedora was the issue). Fast forward ~1 year: I just got a fresh laptop and have been installing everything from scratch.
I tried dask again with the default forkserver configuration instead of setting spawn. I also noticed that the default number of workers has changed recently, so starting a LocalCluster now gives me 4 workers for 12 cores rather than 1 per core.
I found that with forkserver I still get worker deaths, but they are less severe than before, and everything recovers and keeps working after the initial death. That worker death is associated with:
/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
Other things seem to have improved with my OS, and I now get crash reports for that worker death from abrtd. Here's one backtrace object in case it's useful:
{ "signal": 11
, "executable": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
, "stacktrace":
[ { "crash_thread": true
, "frames":
[ { "address": 139764484833953
, "build_id_offset": 25249
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/libffi.so.6.0.4"
}
, { "address": 139764484835224
, "build_id_offset": 26520
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/libffi.so.6.0.4"
} ]
}
, { "frames":
[ { "address": 139764718262743
, "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
, "build_id_offset": 1038807
, "function_name": "epoll_wait"
, "file_name": "/lib64/libc.so.6"
}
, { "address": 139764493963126
, "build_id_offset": 12150
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/lib-dynload/select.cpython-36m-x86_64-linux-gnu.so"
} ]
}
, { "frames":
[ { "address": 139764718226703
, "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
, "build_id_offset": 1002767
, "function_name": "__select"
, "file_name": "/lib64/libc.so.6"
}
, { "address": 94140253262932
, "build_id_offset": 1492052
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
} ]
}
, { "frames":
[ { "address": 139764719156724
, "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
, "build_id_offset": 73204
, "function_name": "read"
, "file_name": "/lib64/libpthread.so.0"
}
, { "address": 94140253258391
, "build_id_offset": 1487511
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
} ]
}
, { "frames":
[ { "address": 139764718216225
, "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
, "build_id_offset": 992289
, "function_name": "__poll"
, "file_name": "/lib64/libc.so.6"
}
, { "address": 139764493963972
, "build_id_offset": 12996
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/lib-dynload/select.cpython-36m-x86_64-linux-gnu.so"
} ]
}
, { "frames":
[ { "address": 139764719153350
, "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
, "build_id_offset": 69830
, "function_name": "do_futex_wait.constprop.1"
, "file_name": "/lib64/libpthread.so.0"
}
, { "address": 139764719153592
, "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
, "build_id_offset": 70072
, "function_name": "__new_sem_wait_slow.constprop.0"
, "file_name": "/lib64/libpthread.so.0"
}
, { "address": 94140252744790
, "build_id_offset": 973910
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
} ]
}
, { "frames":
[ { "address": 139764719153350
, "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
, "build_id_offset": 69830
, "function_name": "do_futex_wait.constprop.1"
, "file_name": "/lib64/libpthread.so.0"
}
, { "address": 139764719153592
, "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
, "build_id_offset": 70072
, "function_name": "__new_sem_wait_slow.constprop.0"
, "file_name": "/lib64/libpthread.so.0"
}
, { "address": 94140252744790
, "build_id_offset": 973910
, "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
} ]
} ]
}
In the meantime, I'm going to continue using spawn as it's clearly still more stable on my machine and seems to be only fractionally slower.
(if this isn't helpful, I'm happy to completely delete my comment, I don't want to clog up a thread, but I thought this was related)
Your comments are always welcome :)
I see the same warning message, UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.
Sometimes it also shows a BrokenPipeError:
$ python fail.py
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
send_bytes(obj)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
$ python fail.py
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
$ python fail.py
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
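The following is a deliberately simplified model of the tracker's bookkeeping, meant to make this warning easier to interpret. It was written for this thread and is not copied from CPython, and the class name is hypothetical. As I understand semaphore_tracker.py, a process sends REGISTER:<name> when a named semaphore is created and UNREGISTER:<name> when it is cleaned up. When the pipe reaches EOF, every name still registered is reported as leaked and then unlinked. So "1 leaked semaphores" means one semaphore was registered but its owner exited without unregistering it.

```python
import warnings


class ToyTracker:
    """Simplified model of semaphore_tracker's bookkeeping (not the CPython code)."""

    def __init__(self):
        self.cache = set()  # names registered but not yet unregistered

    def feed(self, line):
        # Each message looks like b"REGISTER:/mp-xyz\n" or b"UNREGISTER:/mp-xyz\n".
        cmd, name = line.strip().split(b":")
        if cmd == b"REGISTER":
            self.cache.add(name)
        elif cmd == b"UNREGISTER":
            self.cache.remove(name)
        else:
            raise RuntimeError("unrecognized command %r" % cmd)

    def shutdown(self):
        # Runs when the pipe hits EOF, i.e. every writer has exited.
        # Whatever is still registered was never cleaned up by its owner.
        leaked = sorted(self.cache)
        if leaked:
            warnings.warn(
                "toy tracker: There appear to be %d leaked semaphores "
                "to clean up at shutdown" % len(leaked)
            )
        self.cache.clear()  # the real tracker would also sem_unlink() each name
        return leaked


tracker = ToyTracker()
tracker.feed(b"REGISTER:/mp-abc\n")
tracker.feed(b"REGISTER:/mp-def\n")
tracker.feed(b"UNREGISTER:/mp-abc\n")
print(tracker.shutdown())  # [b'/mp-def'], after a UserWarning about 1 leaked semaphore
```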
You can reproduce the warning message with the following code (save as fail.py):
from pathlib import Path
import dask.array as da
#import threading

class ZarrReader:
    def __init__(self, input_folder, pattern="*.zarr"):
        try:
            self.input_path = next(Path(input_folder).glob(pattern))
        except StopIteration:
            raise RuntimeError(
                "'{}' folder doesn't have Zarr file".format(input_folder)
            )
        # self._lock = threading.Lock()

    def _tile_generator(self):
        level_array = da.from_zarr(
            str(self.input_path)
        )
        block_row_size, block_col_size, _ = level_array.numblocks
        for block_row in range(block_row_size):
            for block_col in range(block_col_size):
                block_array = level_array.blocks[block_row, block_col].compute()
                yield level_array, block_array, (
                    0,
                    block_col,
                    block_row,
                )

    def generator(self):
        gen = self._tile_generator()
        while True:
            # with self._lock:
            try:
                item = next(gen)
            except StopIteration:
                # Since Python 3.7 (PEP 479) a StopIteration escaping a
                # generator becomes RuntimeError, so end the loop explicitly.
                return
            yield item

def load_zarr():
    for level_array, block_array, index in ZarrReader(
        '.',
        pattern='fail.zarr',
    ).generator():
        b = block_array

if __name__ == '__main__':
    if len(list(Path('.').glob('fail.zarr'))) == 0:
        from numcodecs import Blosc
        compressor = Blosc()
        x = da.random.random((1000, 1000, 3), chunks=(100, 100, 3))
        x.to_zarr('fail.zarr', compressor=compressor, overwrite=True)
    from dask.distributed import LocalCluster, Client
    # cluster = LocalCluster(host='0.0.0.0', n_workers=1, threads_per_worker=12)
    # client = Client(cluster)
    client = Client(threads_per_worker=1)
    load_zarr()
With client = Client(processes=False, threads_per_worker=1) instead of client = Client(threads_per_worker=1), the problem doesn't occur, because the workers then run in the same process.
The following versions of dask/zarr were used:
zarr==2.4.0
dask==2.16.0
distributed==2.16.0
Python version is Python 3.6.9 on Ubuntu 18.04.
Please help resolve the issue!
@gigony, I wonder if this is not actually a Dask issue, but a Zarr issue. In particular issue ( https://github.com/zarr-developers/numcodecs/issues/230 ) looks similar. Are you able to reproduce this without using Dask (using Zarr only)?
@jakirkham
You're right, it looks like a Zarr issue (with npy_stack it doesn't happen), but I'm not sure whether it is caused by Zarr itself or by its combination with the dask.distributed package.
With npy_stack, the warning message is not shown:
from pathlib import Path
import dask.array as da

class ZarrReader:
    def __init__(self, input_folder, pattern="*.zarr"):
        try:
            self.input_path = next(Path(input_folder).glob(pattern))
        except StopIteration:
            raise RuntimeError(
                "'{}' folder doesn't have {} file".format(input_folder, pattern)
            )
        # self._lock = threading.Lock()

    def _tile_generator(self):
        if str(self.input_path).endswith('zarr'):
            level_array = da.from_zarr(
                str(self.input_path)
            )
        else:
            level_array = da.from_npy_stack(
                str(self.input_path)
            )
        block_array = level_array.blocks[0, 0].compute()
        yield block_array

    def generator(self):
        gen = self._tile_generator()
        while True:
            # with self._lock:
            try:
                item = next(gen)
            except StopIteration:
                # Avoid the PEP 479 RuntimeError on Python 3.7+.
                return
            yield item

def load_zarr(pattern="fail.zarr"):
    for block_array in ZarrReader(
        '.',
        pattern=pattern,
    ).generator():
        b = block_array
    print("Done:", b.shape)

if __name__ == '__main__':
    if len(list(Path('.').glob('fail.zarr'))) == 0:
        from numcodecs import Blosc
        compressor = Blosc()
        x = da.random.random((100, 100, 3), chunks=(100, 100, 3))
        x.to_zarr('fail.zarr', compressor=compressor, overwrite=True)
    if len(list(Path('.').glob('fail.npystack'))) == 0:
        x = da.random.random((100, 100, 3), chunks=(100, 100, 3))
        da.to_npy_stack('fail.npystack', x)
    from dask.distributed import LocalCluster, Client
    # cluster = LocalCluster(host='0.0.0.0', n_workers=1, threads_per_worker=1)
    # client = Client(cluster)  # 'fail'
    client = Client(threads_per_worker=1)  # 'fail'
    # client = Client(processes=False, threads_per_worker=1)  # 'not fail'
    load_zarr()  # 'fail' with zarr
    #load_zarr('fail.npystack')  # 'not fail' with npystack
FYI, I have attached some debugging information showing where the leaked semaphore was registered: the semaphore created by from numcodecs import blosc was never unregistered during execution.
FrameInfo(frame=<frame object at 0x174b838>, filename='/home/gbae/.virtualenvs/dp/lib/python3.6/site-packages/numcodecs/__init__.py', lineno=45, function='<module>',
code_context=[' from numcodecs import blosc\n'], index=0),
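The exact debugging code wasn't posted, so this is a guess at one way to collect this kind of information. You can wrap the tracker's register function so that each call records the stack with inspect.stack(), which returns FrameInfo records like the one shown. The helper name record_callers is hypothetical, and the demo wraps a dummy function rather than the real tracker.

```python
import functools
import inspect


def record_callers(func, log):
    """Wrap func so each call appends (first positional arg, caller frames) to log."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # inspect.stack()[0] is this wrapper; keep only the frames above it.
        log.append((args[0] if args else None, inspect.stack()[1:]))
        return func(*args, **kwargs)

    return wrapper


# Demo with a dummy stand-in for the tracker's register() function.
def fake_register(name):
    return name


log = []
fake_register = record_callers(fake_register, log)


def create_semaphore():
    fake_register("/mp-demo")


create_semaphore()
name, frames = log[0]
print(name, frames[0].function)  # /mp-demo create_semaphore
```

To use this for real, you would patch multiprocessing.semaphore_tracker.register (Python 3.6/3.7) or multiprocessing.resource_tracker.register (3.8+) before the suspect import runs. This is hypothetical. The traceback earlier in the thread shows SemLock importing register inside __init__, so a module-level patch should be picked up, though I have not verified this across versions.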
Thanks for checking! Yeah I think it is this Numcodecs issue ( https://github.com/zarr-developers/numcodecs/issues/230 ). @quasiben ran into the same thing yesterday.
I am now hitting this error too, using dask_jobqueue and dask.distributed on a SLURM cluster.
The context of the error is the following: I am trying to evaluate a TensorFlow model using the tf.data API. At some point I use tf.py_function to load the data from h5 files, because TensorFlow's own h5 loading functions are not mature yet.
The loading is done in parallel by tf, so I guess that's where the multiprocessing forking happens.
The links in this comment point to my actual code, which is a bit complex. I will try to build a reproducible example very soon, but given the nature of this issue I figured that describing my problem was a good place to start.
The issue is accompanied by a lot of memory warnings like:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
/gpfslocalsup/pub/anaconda-py3/2019.10/envs/fastmri-tf-2.1.0/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
The worker is then killed and relaunched, and the same thing happens again, this time without a relaunch.
The error I get on the client side is:
KilledWorker: ('evaluate_ncpdnet-037ab7e3fb5540921de119c8e6a20d4f', <Worker 'tcp://10.148.5.101:39901', name: 3, memory: 0, processing: 1>)
But I guess this is just because the worker has been killed due to the memory leak.
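For reference, here is a small sketch of the arithmetic behind these messages. It assumes the default worker memory fractions from distributed's configuration (distributed.worker.memory.target/spill/pause/terminate = 0.60/0.70/0.80/0.95). These defaults may differ between versions, so check your distributed.yaml. As I understand it, the "no data to store to disk" warning is logged once process memory passes the spill fraction. The nanny restarts the worker past the terminate fraction. With an 80 GB limit, 75.99 GB is already about 95%, so a restart follows as soon as usage creeps slightly higher. The helper name is hypothetical.

```python
# Assumed defaults for distributed.worker.memory.* (check your version's
# distributed.yaml); each is a fraction of the worker's memory limit.
DEFAULT_FRACTIONS = {
    "target": 0.60,     # start moving stored data to disk
    "spill": 0.70,      # spill based on process memory
    "pause": 0.80,      # stop running new tasks
    "terminate": 0.95,  # the nanny restarts the worker
}


def memory_thresholds(limit_gb, fractions=DEFAULT_FRACTIONS):
    """Return the absolute threshold (in GB) for each fraction."""
    return {name: round(limit_gb * frac, 2) for name, frac in fractions.items()}


print(memory_thresholds(80.0))
# {'target': 48.0, 'spill': 56.0, 'pause': 64.0, 'terminate': 76.0}
print(round(75.99 / 80.0, 4))  # 0.9499
```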
When running locally, with LocalCluster, I have the memory error, and the restarting workers, but I don't have the semaphore issue.
However, the issue clearly comes from using dask.distributed, because if I simply run the function as is, there is no problem.
So I have an example, but it's not really minimal; here it is:
# imports
from dask.distributed import Client, LocalCluster
import h5py
import numpy as np
import tensorflow as tf

# cluster/client
cluster = LocalCluster()
cluster.scale(1)
client = Client(cluster)

# data shapes
K_shape_single_coil = (35, 640, 322)
I_shape = (35, 320, 320)
contrast = 'CORPD_FBK'
# NOTE: K_shape_multi_coil is not defined in this snippet; only the default
# multicoil=False, train=True path of create_data is exercised below.

# data creation
def create_data(filename, multicoil=False, train=True):
    k_shape = K_shape_single_coil
    if multicoil:
        k_shape = K_shape_multi_coil
    if train:
        image_ds = "reconstruction_esc"
        if multicoil:
            image_ds = "reconstruction_rss"
        image = np.random.normal(size=I_shape)
        image = image.astype(np.float32)
    else:
        mask_shape = [K_shape_multi_coil[-1]]
        mask = np.random.choice(a=[True, False], size=mask_shape)
        af = np.sum(mask.astype(int)) / mask_shape[0]
    kspace = np.random.normal(size=k_shape) + 1j * np.random.normal(size=k_shape)
    kspace = kspace.astype(np.complex64)
    with h5py.File(filename, "w") as h5_obj:
        h5_obj.create_dataset("kspace", data=kspace)
        if train:
            h5_obj.create_dataset(image_ds, data=image)
        else:
            h5_obj.create_dataset('mask', data=mask)
        h5_obj.attrs['acquisition'] = contrast
    if not train:
        return af

n_h5 = 200  # only works with a large amount of data
for i in range(n_h5):
    create_data(f'test_{i}.h5')

# dataset functions
def from_train_file_to_image_and_kspace_and_contrast(filename):
    with h5py.File(filename, 'r') as h5_obj:
        kspace = h5_obj['kspace'][()]
        image = h5_obj['reconstruction_esc'][()]
        contrast = h5_obj.attrs['acquisition']
    return image, kspace, contrast

def handle_data(images, kspaces):
    pad_sizes = [
        (0, 0),  # batch dimension
        (0, 0),  # coil dimension
    ] + [
        (0, 800),  # nx
        (0, 800),  # ny
    ]
    kspace_padded = tf.pad(kspaces[:, None], pad_sizes)
    nc_kspace = tf.signal.fft2d(kspace_padded)
    return nc_kspace

IM_SIZE = (640, 400)

def very_simple_fun(**kwargs):
    image_size = (640, 400)
    acq_type = 'radial'
    compute_dcomp = False
    acq_kwargs = {'us': 4}

    def _tf_filename_to_image_and_kspace_and_contrast(filename):
        def _from_train_file_to_image_and_kspace_and_contrast_tensor_to_tensor(filename):
            filename_str = filename.numpy()
            image, kspace, contrast = from_train_file_to_image_and_kspace_and_contrast(
                filename_str,
            )
            return tf.convert_to_tensor(image), tf.convert_to_tensor(kspace), tf.convert_to_tensor(contrast)
        [image, kspace, contrast] = tf.py_function(
            _from_train_file_to_image_and_kspace_and_contrast_tensor_to_tensor,
            [filename],
            [tf.float32, tf.complex64, tf.string],
        )
        return image, kspace, contrast

    path = './'
    files_ds = tf.data.Dataset.list_files(f'{path}*.h5', shuffle=False)
    files_ds = files_ds.shuffle(
        buffer_size=1000,
        seed=0,
        reshuffle_each_iteration=False,
    )
    image_and_kspace_and_contrast_ds = files_ds.map(
        _tf_filename_to_image_and_kspace_and_contrast,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    image_and_kspace_ds = image_and_kspace_and_contrast_ds.map(
        lambda image, kspace, tf_contrast: (image, kspace),
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    masked_kspace_ds = image_and_kspace_ds.map(
        handle_data,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    ).repeat()
    val_set = masked_kspace_ds.take(1)
    res = next(val_set.as_numpy_iterator())
    return res

# function submission
future = client.submit(
    very_simple_fun,
)
# bug zone
res = client.gather(future)
It only happens with enough files and when the last map uses num_parallel_calls=tf.data.experimental.AUTOTUNE.
I am sorry for the length of this example, but the code I trimmed was far bigger than this before. If it's a problem I can try to simplify it further.
You can check that calling very_simple_fun() causes no error.
Versions:
If you need any others don't hesitate to ask.
Just to be clear, this example doesn't give the classical semaphore issue, but on the cluster I have seen this memory leak associated with the semaphore issue so I figured they were the same.
Also, when running in Colab (that is, without processes), the error doesn't appear.
The purpose of this issue is to track cases of semaphore leaks with view of hopefully getting to the root cause and fixing it (assuming it is a problem!). This issue is to also hold any useful notes whilst narrowing down the problems.
Things that would be really helpful:
- Anyone being able to supply a concrete reproducer of a semaphore leak coming from dask/distributed.
- If the leak is not (easily) reproducible, anecdotal evidence about what was running at the time, including things like:
  - The setup of dask-scheduler and dask-workers.
  - Roughly what the application code was doing at the time.
  - If any signals were sent to the code (e.g. SIGINT).
  - The reported message.
Many thanks for your help.
Do we know what is causing this issue?
Regardless, I have one minimal example with PyTorch:
def minimal(nb_iterations=1000):
    from torchmeta.datasets.helpers import omniglot
    from torchmeta.utils.data import BatchMetaDataLoader
    dataset = omniglot("data", ways=5, shots=5, test_shots=15, meta_train=True, download=True)
    dataloader = BatchMetaDataLoader(dataset, batch_size=16, num_workers=4)
    i = 0
    for batch in dataloader:
        train_inputs, train_targets = batch["train"]
        print(f'--\ni = {i}')
        print('Train inputs shape: {0}'.format(train_inputs.shape))  # (16, 25, 1, 28, 28)
        print('Train targets shape: {0}'.format(train_targets.shape))  # (16, 25)
        test_inputs, test_targets = batch["test"]
        print('Test inputs shape: {0}'.format(test_inputs.shape))  # (16, 75, 1, 28, 28)
        print('Test targets shape: {0}'.format(test_targets.shape))  # (16, 75)
        i += 1
        if i > nb_iterations:
            return  # halt

if __name__ == '__main__':
    import time
    from uutils import report_times
    start = time.time()
    #test_torchmeta_good_accumulator()
    minimal()
    #minimal_miniImagenet()
    time_passed_msg, _, _, _ = report_times(start)
    print(time_passed_msg)
I run it in debug mode in VS Code. Sometimes it raises the error and other times it does not. It's a weird error.
Run this; it should install everything you need:
conda install -y pytorch torchvision -c pytorch
conda install -y torchvision
pip install torchmeta
Oh I think my error only happens when I halt the script in the middle. So far I can't reproduce it if I let the script end by itself.
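That fits the traceback earlier in this thread. SemLock.__init__ registers the semaphore and attaches a util.Finalize(..., exitpriority=0) that unlinks and unregisters it. multiprocessing.util runs those exit finalizers from an atexit hook. If the interpreter is stopped abruptly, the hook never runs, and the tracker sees EOF with the name still registered, so it warns. Below is a minimal stdlib sketch of the atexit half. Here os._exit stands in for an abrupt stop; whether a debugger's stop button behaves this way is an assumption. The print call is only a placeholder for the real cleanup.

```python
import subprocess
import sys

# Child program: registers an exit hook, then either exits normally or
# calls os._exit(), which skips atexit handlers (as a hard kill would).
CHILD = """
import atexit, os, sys
atexit.register(print, "finalizer ran")
if sys.argv[1] == "abrupt":
    os._exit(0)
"""


def run(mode):
    result = subprocess.run(
        [sys.executable, "-c", CHILD, mode],
        stdout=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return result.stdout.strip()


print(repr(run("normal")))  # 'finalizer ran'
print(repr(run("abrupt")))  # ''
```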