Distributed: Leaking semaphores tracking issue.

Created on 12 May 2017  ·  24 Comments  ·  Source: dask/distributed

The purpose of this issue is to track cases of semaphore leaks, with a view to finding the root cause and fixing it (assuming it is a problem!). This issue is also a place to hold any useful notes whilst narrowing down the problems.

Things that would be really helpful:

  • Anyone being able to supply a concrete reproducer of a semaphore leak coming from dask/distributed.
  • If the leak is not (easily) reproducible, anecdotal evidence about what was running at the time including things like:

    • The setup of dask-scheduler and dask-workers.

    • Roughly what the application code was doing at the time.

    • If any signals were sent to the code (e.g. SIGINT).

    • The reported message.

Many thanks for your help.

All 24 comments

@stuartarchibald, have you found a simple way to reproduce?

Happening to me. This program will trigger it:

import sys
from distributed import Client

client = Client()

sys.exit()

I have the latest versions of dask and distributed installed (see below for exact versions of all packages in the conda environment). Nothing related to Python, Dask, or Bokeh is running (the following returns nothing):

$ ps -ef | grep -i -e bokeh -e python -e dask

Here is the output that I get before the program "hangs" (in the midst of the Client() object creation):

Traceback (most recent call last):
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 178, in main
    _serve_one(s, listener, alive_r, handler)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 212, in _serve_one
    code = spawn._main(child_r)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 114, in _main
    prepare(preparation_data)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/ijstokes/code/anaconda-download-data/semaphore_tracker_problem.py", line 4, in <module>
    client = Client()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 400, in __init__
    self.start(timeout=timeout)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 435, in start
    sync(self.loop, self._start, **kwargs)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 223, in sync
    six.reraise(*error[0])
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
    result[0] = yield make_coro()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 478, in _start
    yield self.cluster._start()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 149, in _start
    services=self.worker_services, **self.worker_kwargs)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 155, in _start_all_workers
    yield [self._start_worker(**kwargs) for i in range(n_workers)]
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
    result_list.append(f.result())
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/deploy/local.py", line 183, in _start_worker
    yield w._start()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 104, in _start
    response = yield self.instantiate()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/nanny.py", line 217, in instantiate
    self.process.start()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

After I press CTRL-C I get this output:

^CTraceback (most recent call last):
  File "semaphore_tracker_problem.py", line 4, in <module>
    client = Client()
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 400, in __init__
    self.start(timeout=timeout)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/client.py", line 435, in start
    sync(self.loop, self._start, **kwargs)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/site-packages/distributed/utils.py", line 221, in sync
    e.wait(1000000)
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
Traceback (most recent call last):
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/threading.py", line 299, in wait
  File "<string>", line 1, in <module>
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/forkserver.py", line 164, in main
    rfds = [key.fileobj for (key, events) in selector.select()]
  File "/Users/ijstokes/anaconda/envs/test/lib/python3.6/selectors.py", line 577, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
/Users/ijstokes/anaconda/envs/test/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 192 leaked semaphores to clean up at shutdown
  len(cache))

Here is my conda environment:

$ conda list --explicit
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: osx-64
@EXPLICIT
https://repo.continuum.io/pkgs/free/osx-64/bokeh-0.12.5-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/click-6.7-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/cloudpickle-0.2.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/dask-0.14.3-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/distributed-1.16.3-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/heapdict-1.0.0-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/jinja2-2.9.6-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/locket-0.2.0-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/markupsafe-0.23-py36_2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/mkl-2017.0.1-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/msgpack-python-0.4.8-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/numpy-1.13.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/openssl-1.0.2l-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pandas-0.20.2-np113py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/partd-0.3.8-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pip-9.0.1-py36_1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/psutil-5.2.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/python-3.6.1-2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/python-dateutil-2.6.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pytz-2017.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/pyyaml-3.12-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/readline-6.2-2.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/requests-2.14.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/setuptools-27.2.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/six-1.10.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sortedcollections-0.5.3-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sortedcontainers-1.5.7-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/sqlite-3.13.0-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tblib-1.3.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tk-8.5.18-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/toolz-0.8.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/tornado-4.5.1-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/wheel-0.29.0-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/xz-5.2.2-1.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/yaml-0.1.6-0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/zict-0.1.2-py36_0.tar.bz2
https://repo.continuum.io/pkgs/free/osx-64/zlib-1.2.8-3.tar.bz2

And in case you want to try reproducing it:

$ conda list -e
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: osx-64
bokeh=0.12.5=py36_1
click=6.7=py36_0
cloudpickle=0.2.2=py36_0
dask=0.14.3=py36_1
distributed=1.16.3=py36_0
heapdict=1.0.0=py36_1
jinja2=2.9.6=py36_0
locket=0.2.0=py36_1
markupsafe=0.23=py36_2
mkl=2017.0.1=0
msgpack-python=0.4.8=py36_0
numpy=1.13.0=py36_0
openssl=1.0.2l=0
pandas=0.20.2=np113py36_0
partd=0.3.8=py36_0
pip=9.0.1=py36_1
psutil=5.2.2=py36_0
python=3.6.1=2
python-dateutil=2.6.0=py36_0
pytz=2017.2=py36_0
pyyaml=3.12=py36_0
readline=6.2=2
requests=2.14.2=py36_0
setuptools=27.2.0=py36_0
six=1.10.0=py36_0
sortedcollections=0.5.3=py36_0
sortedcontainers=1.5.7=py36_0
sqlite=3.13.0=0
tblib=1.3.2=py36_0
tk=8.5.18=0
toolz=0.8.2=py36_0
tornado=4.5.1=py36_0
wheel=0.29.0=py36_0
xz=5.2.2=1
yaml=0.1.6=0
zict=0.1.2=py36_0
zlib=1.2.8=3

@ijstokes, please wrap your code in if __name__ == '__main__' and retry.
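
Why the guard helps: the traceback above shows the child process re-running the main script through runpy with run_name="__mp_main__", so an unguarded Client() would be executed again in every child. Below is a minimal sketch of that mechanism using only the standard library. The script text and the helper name guarded_block_ran are made up for illustration, and no dask code is run.

```python
import os
import runpy
import tempfile

# Hypothetical stand-in for a user script; in the real script the guarded
# block is where Client() would be created.
SCRIPT = '''
ran_guarded_block = False
if __name__ == "__main__":
    ran_guarded_block = True
'''


def guarded_block_ran(run_name):
    """Run SCRIPT under the given module name; report whether the guarded block ran."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo_script.py")
        with open(path, "w") as fh:
            fh.write(SCRIPT)
        namespace = runpy.run_path(path, run_name=run_name)
    return namespace["ran_guarded_block"]


# Started directly by the user: the guarded block runs.
print(guarded_block_ran("__main__"))
# Re-run by a forkserver/spawn child as "__mp_main__": the guarded block is skipped.
print(guarded_block_ran("__mp_main__"))
```

With the guard in place, the child can re-run the script without trying to start another cluster while it is still bootstrapping.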

@pitrou, that should have been obvious to me. Thanks for the advice. I did that and things worked fine. The actual script that I am using, however, ran for 10 minutes and then failed with the semaphore_tracker error, but the cause seems reasonably clear: a CSV parsing error.

In case it is helpful, here is the stack trace (and if it isn't, feel free to edit the comment and remove it):

/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/worker.py:636: DtypeWarning: Columns (2) have mixed types. Specify dtype option on import or set low_memory=False.
  result = function(*args, **kwargs)
distributed.utils - ERROR - could not convert string to float: '1486589$30.0'
Traceback (most recent call last):
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
    result[0] = yield make_coro()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1551, in _get
    result = yield self._gather(packed)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 996, in _gather
    traceback)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/compatibility.py", line 47, in apply
    return func(*args, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 61, in pandas_read_text
    coerce_dtypes(df, dtypes)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 103, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/util/_decorators.py", line 91, in wrapper
    return func(*args, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/generic.py", line 3299, in astype
    **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3224, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3091, in apply
    applied = getattr(b, f)(**kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 471, in astype
    **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 521, in _astype
    values = astype_nansafe(values.ravel(), dtype, copy=True)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/dtypes/cast.py", line 636, in astype_nansafe
    return arr.astype(dtype)
ValueError: could not convert string to float: '1486589$30.0'
Traceback (most recent call last):
  File "scripts/loganalysis.py", line 26, in <module>
    total = 0
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/core.py", line 2492, in iterrows
    df = self.get_partition(i).compute()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/base.py", line 96, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/base.py", line 203, in compute
    results = get(dsk, keys, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1590, in get
    resources=resources)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 223, in sync
    six.reraise(*error[0])
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
    result[0] = yield make_coro()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1551, in _get
    result = yield self._gather(packed)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/distributed/client.py", line 996, in _gather
    traceback)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/compatibility.py", line 47, in apply
    return func(*args, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 61, in pandas_read_text
    coerce_dtypes(df, dtypes)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 103, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/util/_decorators.py", line 91, in wrapper
    return func(*args, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/generic.py", line 3299, in astype
    **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3224, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 3091, in apply
    applied = getattr(b, f)(**kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 471, in astype
    **kwargs)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/internals.py", line 521, in _astype
    values = astype_nansafe(values.ravel(), dtype, copy=True)
  File "/Users/ijstokes/anaconda/lib/python3.6/site-packages/pandas/core/dtypes/cast.py", line 636, in astype_nansafe
    return arr.astype(dtype)
ValueError: could not convert string to float: '1486589$30.0'
distributed.client - WARNING - Client report stream closed to scheduler
/Users/ijstokes/anaconda/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 150 leaked semaphores to clean up at shutdown
  len(cache))
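
The ValueError itself is easy to reproduce outside dask, since '1486589$30.0' is not a valid float literal. Here is a small sketch for locating such values before a column is coerced. The helper unparseable_floats and the sample values other than the one from the traceback are made up for illustration.

```python
def unparseable_floats(values):
    """Return (index, value) pairs for the entries that float() rejects."""
    bad = []
    for index, value in enumerate(values):
        try:
            float(value)
        except ValueError:
            bad.append((index, value))
    return bad


# Only the middle value comes from the traceback; the others are invented.
column = ["12.5", "1486589$30.0", "30.0"]
print(unparseable_floats(column))
```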

@ijstokes, would you be able to post a small standalone script (and accompanying CSV file) for me to try and reproduce? Thanks!

semaphore_tracker.py is an integral part of the forkserver, for the reasons noted here: https://github.com/python/cpython/blob/master/Lib/multiprocessing/semaphore_tracker.py#L2 . In my view, the semaphore_tracker reporting that there are semaphores to clean up at shutdown is valid and correct if a worker forked from the forkserver created the semaphores and was then killed by other means, e.g. a signal (note that I am guessing at the dask.distributed internal implementation!). Sending the semaphore_tracker.py process a signal that it cannot catch whilst it is in this state does cause a leak, but that is very unlikely to happen unless someone goes out of their way to send the signal directly to that process.
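
As a rough mental model of the bookkeeping described above, here is a toy sketch. It is not CPython's implementation: the class ToyTracker and its methods are invented for illustration, whereas the real tracker runs as a separate process and unlinks the leftover named semaphores.

```python
import warnings


class ToyTracker:
    """Toy model of a semaphore tracker: remembers names until they are unregistered."""

    def __init__(self):
        self._cache = set()

    def register(self, name):
        # Called when a process creates a named semaphore.
        self._cache.add(name)

    def unregister(self, name):
        # Called when the owning process cleans the semaphore up itself.
        self._cache.discard(name)

    def shutdown(self):
        """Warn about whatever is still registered and return those names, sorted."""
        leaked = sorted(self._cache)
        if leaked:
            warnings.warn(
                "There appear to be %d leaked semaphores to clean up at shutdown"
                % len(leaked)
            )
        # The real tracker would unlink each leftover name at this point.
        self._cache.clear()
        return leaked


tracker = ToyTracker()
for name in ("/mp-a", "/mp-b", "/mp-c"):
    tracker.register(name)
tracker.unregister("/mp-a")  # this owner exited cleanly
# The owners of /mp-b and /mp-c were killed before they could unregister.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    leftover = tracker.shutdown()
print(leftover, len(caught))
```

In this model the warning is a sign that cleanup is happening, not that anything was left behind on the system.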

I would suspect that an inadvertent SIGKILL sent directly to the semaphore_tracker.py process in any state would leak semaphores; this could happen if a user sends SIGKILL to all running Python processes via the killall command.
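
What makes SIGKILL different is that a process cannot install a handler for it, so the tracker gets no chance to clean up. Here is a small POSIX-only sketch. The helper name can_install_handler is made up for illustration, and it has to be called from the main thread.

```python
import signal


def can_install_handler(signum):
    """Return True if this process may install a Python handler for signum."""
    try:
        previous = signal.signal(signum, lambda signo, frame: None)
    except OSError:
        # The OS refuses handlers for SIGKILL (and SIGSTOP).
        return False
    # Put the original handler back so the probe has no lasting effect.
    if previous is not None:
        signal.signal(signum, previous)
    return True


print(can_install_handler(signal.SIGTERM))
print(can_install_handler(signal.SIGKILL))
```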

Below are two samples: the first produces the message noted by @ijstokes; the second demonstrates a leak.

Here's a sample to produce the message:

#!/bin/bash

function count_semaphores() {
    ls /dev/shm/sem.mp-* > /dev/null 2>&1
    if [ $? -eq 0 ];then
        sem_count=`ls /dev/shm/sem.mp-*|wc -l`
    else
        sem_count=0
    fi
    echo ""
    echo "Semaphores count: ${sem_count}"
    echo ""
}

function cleandask {
    echo "Cleaning up"
    for pid in ${worker_pid} ${scheduler_pid}; do
        if ps -p  ${pid} > /dev/null; then
            echo "Sending SIGTERM to ${pid}"
            kill -15 ${pid}
        fi
    done
}

dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo &
scheduler_pid=$!
dask-worker --scheduler-file ~/.dask_scheduler_foo &
worker_pid=$!

trap cleandask EXIT

count_semaphores
# wait a bit for dask
echo "Sleeping..."
sleep 15
count_semaphores
sleep 5
# signal the worker pid with SIGINT and wait for it to finish
echo "Sending SIGINT to worker"
kill -2 ${worker_pid}
wait ${worker_pid}
sleep 5
count_semaphores
exit 0

which reports:

$ bash timing_attack.sh 

Semaphores count: 0

Sleeping...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:      tcp://127.0.0.1:8786
distributed.scheduler - INFO -        http at:            127.0.0.1:9786
distributed.scheduler - INFO -       bokeh at:            127.0.0.1:8787
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-mbrqkzfa
distributed.scheduler - INFO - -----------------------------------------------
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:35783'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:35826
distributed.worker - INFO -              nanny at:            127.0.0.1:35783
distributed.worker - INFO -               http at:            127.0.0.1:42934
distributed.worker - INFO -              bokeh at:            127.0.0.1:8789
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    4.92 GB
distributed.worker - INFO -       Local Directory: /tmp/nanny-y5s_2oja/worker-bi4p90bu
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:35826
distributed.worker - INFO -         Registered to:             tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35826
distributed.nanny - INFO - Nanny 'tcp://127.0.0.1:35783' starts worker process 'tcp://127.0.0.1:35826'

Semaphores count: 8

Sending SIGINT to worker
distributed.dask_worker - INFO - End worker
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:35826
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Removed worker tcp://127.0.0.1:35826
<path>/lib/python3.6/multiprocessing/semaphore_tracker.py:129: UserWarning: semaphore_tracker: There appear to be 5 leaked semaphores to clean up at shutdown
  len(cache))

Semaphores count: 0

Cleaning up
Sending SIGTERM to 5679
distributed.scheduler - INFO - End scheduler at 'tcp://127.0.0.1:8786'

This demonstrates a user using killall -9 ${PYTHON}:

#!/bin/bash

function count_semaphores() {
    ls /dev/shm/sem.mp-* > /dev/null 2>&1
    if [ $? -eq 0 ];then
        sem_count=`ls /dev/shm/sem.mp-*|wc -l`
    else
        sem_count=0
    fi
    echo ""
    echo "Semaphores count: ${sem_count}"
    echo ""
}

function cleandask {
    echo "Cleaning up"
    for pid in ${worker_pid} ${scheduler_pid}; do
        if ps -p  ${pid} > /dev/null; then
            echo "Sending SIGTERM to ${pid}"
            kill -15 ${pid}
        fi
    done
}

dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo &
scheduler_pid=$!
dask-worker --scheduler-file ~/.dask_scheduler_foo &
worker_pid=$!

mypython=`which python`

trap cleandask EXIT

count_semaphores
# wait a bit for dask
echo "Sleeping..."
sleep 15
count_semaphores
sleep 5
# use killall with SIGKILL against python
echo "killall -9 ${mypython}"
killall -9 ${mypython}
wait ${worker_pid}
sleep 5
count_semaphores
exit 0

which produces (with an ls /dev/shm/sem.mp-* run afterwards):

$ bash timing_attack.sh; ls /dev/shm/sem.mp-*

Semaphores count: 0

Sleeping...
distributed.scheduler - INFO - -----------------------------------------------
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:43815'
distributed.scheduler - INFO -   Scheduler at:      tcp://127.0.0.1:8786
distributed.scheduler - INFO -        http at:            127.0.0.1:9786
distributed.scheduler - INFO -       bokeh at:            127.0.0.1:8787
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-22eg4ek7
distributed.scheduler - INFO - -----------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45014
distributed.worker - INFO -              nanny at:            127.0.0.1:43815
distributed.worker - INFO -               http at:            127.0.0.1:40271
distributed.worker - INFO -              bokeh at:            127.0.0.1:8789
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    4.92 GB
distributed.worker - INFO -       Local Directory: /tmp/nanny-m_82kphw/worker-tgnm2dta
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:45014
distributed.worker - INFO -         Registered to:             tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45014
distributed.nanny - INFO - Nanny 'tcp://127.0.0.1:43815' starts worker process 'tcp://127.0.0.1:45014'

Semaphores count: 8

killall -9 <path>/bin/python
timing_attack.sh: line 43:  7882 Killed                  dask-scheduler --host tcp://127.0.0.1 --scheduler-file ~/.dask_scheduler_foo
timing_attack.sh: line 43:  7883 Killed                  dask-worker --scheduler-file ~/.dask_scheduler_foo

Semaphores count: 8

Cleaning up
/dev/shm/sem.mp-35h9qgk9  /dev/shm/sem.mp-8phhzt84  /dev/shm/sem.mp-9zf45tsh  /dev/shm/sem.mp-dtadz8sg
/dev/shm/sem.mp-60f905kl  /dev/shm/sem.mp-98ut9s4s  /dev/shm/sem.mp-bvdx50qy  /dev/shm/sem.mp-em91_tkn

That is where I suspect some of the leak reports are coming from.
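
For anyone who wants to check for leftovers from Python instead of bash, the count_semaphores function used in both scripts can be sketched as follows. This assumes a Linux system where named semaphores appear as /dev/shm/sem.mp-* (macOS has no /dev/shm). The function name count_mp_semaphores is made up for illustration.

```python
import glob
import os
import tempfile


def count_mp_semaphores(shm_dir="/dev/shm"):
    """Count files named sem.mp-* in shm_dir, like count_semaphores in the scripts."""
    return len(glob.glob(os.path.join(shm_dir, "sem.mp-*")))


# Demonstrate against a scratch directory rather than the real /dev/shm.
with tempfile.TemporaryDirectory() as scratch:
    for name in ("sem.mp-35h9qgk9", "sem.mp-8phhzt84", "unrelated.txt"):
        open(os.path.join(scratch, name), "w").close()
    demo_count = count_mp_semaphores(scratch)
print(demo_count)
```

Calling count_mp_semaphores() with no argument before and after a run gives the same before/after comparison as the scripts above.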

I am running into this issue, I believe. I can start a jupyter notebook with the following single cell:

from distributed import LocalCluster
import time

with LocalCluster():
    time.sleep(10)

As soon as my processes spin up, I see this:

│└─ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│   └─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│      ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35.zi
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      │  ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35
│      ├─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)

If I allow the cell to finish, I have this afterwards:

│└─ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│   └─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│      ├─ /envs/arch/bin/python -c from multiprocessing.forkserver import main; main(74, 75, ['distributed', 'pkg_resources'], **{'sys_path': ['/home/aklein/.ipython/notebooks', '/envs/arch/lib/python35.zi
│      ├─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)

If I interrupt the cell, I get this:

│└─ /envs/arch/bin/python /envs/arch/bin/jupyter notebook
│   └─ /envs/arch/bin/python -m ipykernel_launcher -f /run/user/13586/jupyter/kernel-8643517c-b2d9-4821-8775-af89d34b8a28.json
│      ├─ python
│      ├─ /envs/arch/bin/python -c from multiprocessing.semaphore_tracker import main;main(71)

And next time I try to run after I interrupt, I get this in the notebook:

    /envs/arch/lib/python3.5/multiprocessing/forkserver.py in connect_to_new_process(self, fds)
     64             raise ValueError('too many fds')
     65         with socket.socket(socket.AF_UNIX) as client:
---> 66             client.connect(self._forkserver_address)
     67             parent_r, child_w = os.pipe()
     68             child_r, parent_w = os.pipe()

ConnectionRefusedError: [Errno 111] Connection refused

The semaphore_tracker process doesn't respond to SIGTERM; I have to send it a SIGKILL. After that, if I try to evaluate the cell, I get this:

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 828, in callback
    result_list.append(f.result())
  File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py", line 172, in _start_worker
    yield w._start()
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/envs/arch/lib/python3.5/site-packages/distributed/nanny.py", line 112, in _start
    response = yield self.instantiate()
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/envs/arch/lib/python3.5/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/envs/arch/lib/python3.5/site-packages/distributed/nanny.py", line 207, in instantiate
    q = mp_context.Queue()
  File "/envs/arch/lib/python3.5/multiprocessing/context.py", line 101, in Queue
    return Queue(maxsize, ctx=self.get_context())
  File "/envs/arch/lib/python3.5/multiprocessing/queues.py", line 42, in __init__
    self._rlock = ctx.Lock()
  File "/envs/arch/lib/python3.5/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
  File "/envs/arch/lib/python3.5/multiprocessing/synchronize.py", line 163, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/envs/arch/lib/python3.5/multiprocessing/synchronize.py", line 81, in __init__
    register(self._semlock.name)
  File "/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py", line 69, in register
    self._send('REGISTER', name)
  File "/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py", line 82, in _send
    nbytes = os.write(self._fd, msg)
BrokenPipeError: [Errno 32] Broken pipe
---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
<ipython-input-5-5cd5d4e17e92> in <module>()
      2 import time
      3 
----> 4 with LocalCluster():
      5     time.sleep(60)

/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in __init__(self, n_workers, threads_per_worker, processes, loop, start, ip, scheduler_port, silence_logs, diagnostics_port, services, worker_services, nanny, **worker_kwargs)
    113 
    114         if start:
--> 115             sync(self.loop, self._start, ip)
    116 
    117         clusters_to_close.add(self)

/envs/arch/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    221         e.wait(1000000)
    222     if error[0]:
--> 223         six.reraise(*error[0])
    224     else:
    225         return result[0]

/envs/arch/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

/envs/arch/lib/python3.5/site-packages/distributed/utils.py in f()
    210                 raise RuntimeError("sync() called from thread of running loop")
    211             yield gen.moment
--> 212             result[0] = yield make_coro()
    213         except Exception as exc:
    214             logger.exception(exc)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start(self, ip)
    145         yield self._start_all_workers(
    146             self.n_workers, ncores=self.threads_per_worker,
--> 147             services=self.worker_services, **self.worker_kwargs)
    148 
    149         self.status = 'running'

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start_all_workers(self, n_workers, **kwargs)
    151     @gen.coroutine
    152     def _start_all_workers(self, n_workers, **kwargs):
--> 153         yield [self._start_worker(**kwargs) for i in range(n_workers)]
    154 
    155     @gen.coroutine

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in callback(f)
    826             for f in children:
    827                 try:
--> 828                     result_list.append(f.result())
    829                 except Exception as e:
    830                     if future.done():

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/envs/arch/lib/python3.5/site-packages/distributed/deploy/local.py in _start_worker(self, port, processes, death_timeout, **kwargs)
    170               death_timeout=death_timeout,
    171               silence_logs=self.silence_logs, **kwargs)
--> 172         yield w._start()
    173 
    174         self.workers.append(w)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/envs/arch/lib/python3.5/site-packages/distributed/nanny.py in _start(self, addr_or_port)
    110 
    111         logger.info('        Start Nanny at: %r', self.address)
--> 112         response = yield self.instantiate()
    113         if response == 'OK':
    114             self.loop.add_callback(self._watch)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/envs/arch/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/envs/arch/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/envs/arch/lib/python3.5/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    305                 try:
    306                     orig_stack_contexts = stack_context._state.contexts
--> 307                     yielded = next(result)
    308                     if stack_context._state.contexts is not orig_stack_contexts:
    309                         yielded = TracebackFuture()

/envs/arch/lib/python3.5/site-packages/distributed/nanny.py in instantiate(self, comm)
    205                 raise ValueError("Existing process still alive. Please kill first")
    206 
--> 207             q = mp_context.Queue()
    208             self.process = mp_context.Process(
    209                 target=run_worker_fork,

/envs/arch/lib/python3.5/multiprocessing/context.py in Queue(self, maxsize)
     99         '''Returns a queue object'''
    100         from .queues import Queue
--> 101         return Queue(maxsize, ctx=self.get_context())
    102 
    103     def JoinableQueue(self, maxsize=0):

/envs/arch/lib/python3.5/multiprocessing/queues.py in __init__(self, maxsize, ctx)
     40         self._maxsize = maxsize
     41         self._reader, self._writer = connection.Pipe(duplex=False)
---> 42         self._rlock = ctx.Lock()
     43         self._opid = os.getpid()
     44         if sys.platform == 'win32':

/envs/arch/lib/python3.5/multiprocessing/context.py in Lock(self)
     64         '''Returns a non-recursive lock object'''
     65         from .synchronize import Lock
---> 66         return Lock(ctx=self.get_context())
     67 
     68     def RLock(self):

/envs/arch/lib/python3.5/multiprocessing/synchronize.py in __init__(self, ctx)
    161 
    162     def __init__(self, *, ctx):
--> 163         SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
    164 
    165     def __repr__(self):

/envs/arch/lib/python3.5/multiprocessing/synchronize.py in __init__(self, kind, value, maxvalue, ctx)
     79             # process shuts down we unlink the semaphore name
     80             from .semaphore_tracker import register
---> 81             register(self._semlock.name)
     82             util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
     83                           exitpriority=0)

/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py in register(self, name)
     67     def register(self, name):
     68         '''Register name of semaphore with semaphore tracker.'''
---> 69         self._send('REGISTER', name)
     70 
     71     def unregister(self, name):

/envs/arch/lib/python3.5/multiprocessing/semaphore_tracker.py in _send(self, cmd, name)
     80             # bytes are atomic, and that PIPE_BUF >= 512
     81             raise ValueError('name too long')
---> 82         nbytes = os.write(self._fd, msg)
     83         assert nbytes == len(msg)
     84 

BrokenPipeError: [Errno 32] Broken pipe

Finally, if I restart the ipython kernel, I go back to a happy state.

My versions are:

- dask=0.15.0=py35_0
- distributed=1.17.0=py35_0
- ipython=5.3.0=py35_0
- ipython_genutils=0.2.0=py35_0
- jupyter=1.0.0=py35_3
- jupyter_client=5.0.1=py35_0
- jupyter_console=5.1.0=py35_0
- jupyter_core=4.3.0=py35_0
- pyzmq=16.0.2=py35_0
- tornado=4.5.1=py35_0

On Linux version 4.1.35-pv-ts2 (gcc version 4.7.2 (Debian 4.7.2-5) ) #1 SMP Mon Dec 5 21:15:47 GMT 2016

Any updates on this issue? Appears to still be a problem with the latest version of Distributed.

No updates from me. If someone wanted to look into it I would be grateful.

@jakirkham Would you care to post the precise symptoms? Also which OS and which version of Python?

@ijstokes, please wrap your code in if __name__ == '__main__' and retry.

@pitrou, why does this solve the issue?

@2gotgrossman because of how multiprocessing works under the hood. This is explained here:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
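To make that a little more concrete, here is a minimal standard-library sketch (not Dask's own code; work and main are names made up for illustration). With the spawn and forkserver start methods, a child process re-imports the main module, so any module-level statement that itself starts processes would run again in the child. Putting the process creation under the guard keeps the module safe to import.

```python
import multiprocessing as mp


def work():
    # Runs in the child process.
    print("hello from the child")


def main():
    # "spawn" starts a fresh interpreter that re-imports this module,
    # so nothing at import time may start new processes.
    ctx = mp.get_context("spawn")
    proc = ctx.Process(target=work)
    proc.start()
    proc.join()
    return proc.exitcode


if __name__ == "__main__":
    # Only the process that was launched as a script gets here.
    # Children import the module under a different __name__ and skip it.
    main()
```

The same shape applies to the earlier reproducer: creating the Client inside a function called from under the guard, rather than at module level, is what the suggestion amounts to.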

When I first started using dask I was getting very strange performance problems and @mrocklin helped me track it down to needing to use spawn not forkserver on my laptop (we believed that Fedora was the issue). Fast forward ~1 year and I just got a fresh laptop and have been freshly installing things.

I tried dask again with default config forkserver instead of setting spawn. I noticed that the default number of workers has also been changed recently so starting a LocalCluster gives me 4 workers for 12 cores rather than 1 per core.

I found that on forkserver I get worker death, but not as bad as before and everything recovers and keeps working after the initial death. That worker death is associated with

/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

Other things seem to have improved with my OS, and I now have crash reporting for that worker death. I have the output from abrtd. Here's one backtrace object in case it's useful:

{   "signal": 11
,   "executable": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
,   "stacktrace":
      [ {   "crash_thread": true
        ,   "frames":
              [ {   "address": 139764484833953
                ,   "build_id_offset": 25249
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/libffi.so.6.0.4"
                }
              , {   "address": 139764484835224
                ,   "build_id_offset": 26520
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/libffi.so.6.0.4"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764718262743
                ,   "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
                ,   "build_id_offset": 1038807
                ,   "function_name": "epoll_wait"
                ,   "file_name": "/lib64/libc.so.6"
                }
              , {   "address": 139764493963126
                ,   "build_id_offset": 12150
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/lib-dynload/select.cpython-36m-x86_64-linux-gnu.so"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764718226703
                ,   "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
                ,   "build_id_offset": 1002767
                ,   "function_name": "__select"
                ,   "file_name": "/lib64/libc.so.6"
                }
              , {   "address": 94140253262932
                ,   "build_id_offset": 1492052
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764719156724
                ,   "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
                ,   "build_id_offset": 73204
                ,   "function_name": "read"
                ,   "file_name": "/lib64/libpthread.so.0"
                }
              , {   "address": 94140253258391
                ,   "build_id_offset": 1487511
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764718216225
                ,   "build_id": "534a2a62ff86a0e893ef6748dff3018af42ab3ab"
                ,   "build_id_offset": 992289
                ,   "function_name": "__poll"
                ,   "file_name": "/lib64/libc.so.6"
                }
              , {   "address": 139764493963972
                ,   "build_id_offset": 12996
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/lib-dynload/select.cpython-36m-x86_64-linux-gnu.so"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764719153350
                ,   "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
                ,   "build_id_offset": 69830
                ,   "function_name": "do_futex_wait.constprop.1"
                ,   "file_name": "/lib64/libpthread.so.0"
                }
              , {   "address": 139764719153592
                ,   "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
                ,   "build_id_offset": 70072
                ,   "function_name": "__new_sem_wait_slow.constprop.0"
                ,   "file_name": "/lib64/libpthread.so.0"
                }
              , {   "address": 94140252744790
                ,   "build_id_offset": 973910
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
                } ]
        }
      , {   "frames":
              [ {   "address": 139764719153350
                ,   "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
                ,   "build_id_offset": 69830
                ,   "function_name": "do_futex_wait.constprop.1"
                ,   "file_name": "/lib64/libpthread.so.0"
                }
              , {   "address": 139764719153592
                ,   "build_id": "624da64f4feb34bf56b25751c611682469d8bcb4"
                ,   "build_id_offset": 70072
                ,   "function_name": "__new_sem_wait_slow.constprop.0"
                ,   "file_name": "/lib64/libpthread.so.0"
                }
              , {   "address": 94140252744790
                ,   "build_id_offset": 973910
                ,   "file_name": "/home/bird/miniconda3/envs/ovscrptd/bin/python3.6"
                } ]
        } ]
}

In the meantime, I'm going to continue using spawn as it's clearly still more stable on my machine and seems to be only fractionally slower.

(if this isn't helpful, I'm happy to completely delete my comment, I don't want to clog up a thread, but I thought this was related)

Your comments are always welcome :)

I see the same warning message UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown len(cache)).

Sometimes, it also showed BrokenPipeError.

$ python fail.py

/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

$ python fail.py
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

$ python fail.py
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

You can reproduce the warning message with the following code (save as fail.py):

from pathlib import Path
import dask.array as da
#import threading

class ZarrReader:
    def __init__(self, input_folder, pattern="*.zarr"):
        try:
            self.input_path = next(Path(input_folder).glob(pattern))
        except StopIteration:
            raise RuntimeError(
                "'{}' folder doesn't have Zarr file".format(input_folder)
            )
        # self._lock = threading.Lock()

    def _tile_generator(self):
        level_array = da.from_zarr(
            str(self.input_path)
        )
        block_row_size, block_col_size, _ = level_array.numblocks
        for block_row in range(block_row_size):
            for block_col in range(block_col_size):
                block_array = level_array.blocks[block_row, block_col].compute()
                yield level_array, block_array, (
                    0,
                    block_col,
                    block_row,
                )

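    def generator(self):
        gen = self._tile_generator()
        # A for loop ends cleanly when gen is exhausted; next(gen) inside
        # `while True` raises RuntimeError on Python 3.7+ (PEP 479).
        for item in gen:
            # with self._lock:
                yield item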


def load_zarr():
    for level_array, block_array, index in ZarrReader(
        '.',
        pattern='fail.zarr',
    ).generator():
        b = block_array


if __name__ == '__main__':
    if len(list(Path('.').glob('fail.zarr'))) == 0:
        from numcodecs import Blosc
        compressor = Blosc()
        x = da.random.random((1000, 1000, 3), chunks=(100, 100,3))
        x.to_zarr('fail.zarr', compressor=compressor, overwrite=True)

    from dask.distributed import LocalCluster, Client
    # cluster = LocalCluster(host='0.0.0.0', n_workers=1, threads_per_worker=12)
    # client = Client(cluster)
    client = Client(threads_per_worker=1)

    load_zarr()

With client = Client(processes=False, threads_per_worker=1) instead of client = Client(threads_per_worker=1), the problem doesn't occur, as it uses the same process for the workers.

The following versions of dask/zarr were used.

zarr==2.4.0
dask==2.16.0
distributed==2.16.0

Python version is Python 3.6.9 on Ubuntu 18.04.

Please help resolve the issue!

@gigony, I wonder if this is not actually a Dask issue, but a Zarr issue. In particular issue ( https://github.com/zarr-developers/numcodecs/issues/230 ) looks similar. Are you able to reproduce this without using Dask (using Zarr only)?

@jakirkham
You're right, it looks like a Zarr issue (it doesn't happen with npy_stack), but I'm not sure whether it is caused by Zarr itself or by the combination with the dask.distributed package.

With npy_stack, it doesn't show the warning message.

from pathlib import Path
import dask.array as da


class ZarrReader:
    def __init__(self, input_folder, pattern="*.zarr"):
        try:
            self.input_path = next(Path(input_folder).glob(pattern))
        except StopIteration:
            raise RuntimeError(
                "'{}' folder doesn't have {} file".format(input_folder, pattern)
            )
        # self._lock = threading.Lock()

    def _tile_generator(self):
        if str(self.input_path).endswith('zarr'):
            level_array = da.from_zarr(
                str(self.input_path)
            )
        else:
            level_array = da.from_npy_stack(
                str(self.input_path)
            )
        block_array = level_array.blocks[0,0].compute()
        yield block_array

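    def generator(self):
        gen = self._tile_generator()
        # A for loop ends cleanly when gen is exhausted; next(gen) inside
        # `while True` raises RuntimeError on Python 3.7+ (PEP 479).
        for item in gen:
            # with self._lock:
                yield item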


def load_zarr(pattern="fail.zarr"):
    for block_array in ZarrReader(
        '.',
        pattern=pattern,
    ).generator():
        b = block_array
        print("Done:", b.shape)



if __name__ == '__main__':
    if len(list(Path('.').glob('fail.zarr'))) == 0:
        from numcodecs import Blosc
        compressor = Blosc()
        x = da.random.random((100, 100, 3), chunks=(100, 100,3))
        x.to_zarr('fail.zarr', compressor=compressor, overwrite=True)
    if len(list(Path('.').glob('fail.npystack'))) == 0:
        x = da.random.random((100, 100, 3), chunks=(100, 100,3))
        da.to_npy_stack('fail.npystack', x)


    from dask.distributed import LocalCluster, Client
    # cluster = LocalCluster(host='0.0.0.0', n_workers=1, threads_per_worker=1)
    # client = Client(cluster)              # 'fail'
    client = Client(threads_per_worker=1)   # 'fail'
    # client = Client(processes=False, threads_per_worker=1)   # 'not fail'

    load_zarr() # 'fail' with zarr

    #load_zarr('fail.npystack') # 'not fail' with npystack

FYI, I have attached debugging information (how the leaking semaphore was registered). The semaphore created by from numcodecs import blosc was not unregistered during execution.

FrameInfo(frame=<frame object at 0x174b838>, filename='/home/gbae/.virtualenvs/dp/lib/python3.6/site-packages/numcodecs/__init__.py', lineno=45, function='<module>', 
  code_context=['    from numcodecs import blosc\n'], index=0), 

leaked_semaphore.txt
full_semaphore_trace.txt
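For anyone who wants to collect a similar trace, the general idea is to wrap the function that registers semaphores and record inspect.stack() on every call. The sketch below only demonstrates the technique on a stand-in: register here is a hypothetical placeholder, not the real tracker function (on Python 3.6 that one lives in multiprocessing.semaphore_tracker, as the tracebacks earlier in this thread show), and record_callers and create_lock are made-up names.

```python
import inspect
from collections import defaultdict

# Semaphore name -> list of "file:line in function" strings, one per frame.
registrations = defaultdict(list)


def record_callers(func):
    """Wrap func(name, ...) so that each call stores the caller's stack."""
    def wrapper(name, *args, **kwargs):
        # Frame 0 is this wrapper; everything after it is the call chain.
        for info in inspect.stack()[1:]:
            registrations[name].append(
                "{}:{} in {}".format(info.filename, info.lineno, info.function)
            )
        return func(name, *args, **kwargs)
    return wrapper


@record_callers
def register(name):
    """Hypothetical stand-in for the tracker's register function."""
    return name


def create_lock():
    # Pretend this is library code that creates a named semaphore.
    return register("/mp-example")


create_lock()
for line in registrations["/mp-example"]:
    print(line)
```

Names that were registered but never unregistered by the end of the run are the candidates for the leak, and the recorded frames point at the import or call that created them.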

Thanks for checking! Yeah I think it is this Numcodecs issue ( https://github.com/zarr-developers/numcodecs/issues/230 ). @quasiben ran into the same thing yesterday.

I am just now having this error, using dask_jobqueue, dask.distributed on a SLURM cluster.
The context of the error is the following: I am trying to evaluate a tensorflow model using the tf.data API. At some point I am using tf.py_function to load the data from h5 files because the tensorflow loading functions for h5 are not mature yet.
The loading process is done in parallel in tf so I guess that's where the multiprocessing forking happens.

All the links in this issue are to my actual code, which is a bit complex. I will try to build a reproducible example very soon, but given the nature of this issue I figured that describing my problem a bit was a good place to start.

The issue is accompanied by a lot of memory warnings like:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 75.99 GB -- Worker memory limit: 80.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
/gpfslocalsup/pub/anaconda-py3/2019.10/envs/fastmri-tf-2.1.0/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

The worker is then killed, relaunched and the same thing happens, this time without relaunch.
The error I get on the client side is:

KilledWorker: ('evaluate_ncpdnet-037ab7e3fb5540921de119c8e6a20d4f', <Worker 'tcp://10.148.5.101:39901', name: 3, memory: 0, processing: 1>)

But I guess this is just because the worker has been killed due to the memory leak.

EDIT

When running locally, with LocalCluster, I have the memory error, and the restarting workers, but I don't have the semaphore issue.
However, it's clear that the issue comes from using dask.distributed because if I simply run the function as is there is no problem.

So I have an example but it's not really minimal, here it is:

# imports
from dask.distributed import Client, LocalCluster
import h5py
import numpy as np
import tensorflow as tf

# cluster/client
cluster = LocalCluster()
cluster.scale(1)
client = Client(cluster)

# data shapes
K_shape_single_coil = (35, 640, 322)
I_shape = (35, 320, 320)
contrast = 'CORPD_FBK'

# data creation
def create_data(filename, multicoil=False, train=True):
    k_shape = K_shape_single_coil
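    if multicoil:
        # NOTE: K_shape_multi_coil is not defined in this snippet; this branch is
        # not reached below because create_data is only called with its defaults.
        k_shape = K_shape_multi_coil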
    if train:
        image_ds = "reconstruction_esc"
        if multicoil:
            image_ds = "reconstruction_rss"
        image = np.random.normal(size=I_shape)
        image = image.astype(np.float32)
    else:
        mask_shape = [K_shape_multi_coil[-1]]
        mask = np.random.choice(a=[True, False], size=mask_shape)
        af = np.sum(mask.astype(int)) / mask_shape[0]
    kspace = np.random.normal(size=k_shape) + 1j * np.random.normal(size=k_shape)
    kspace = kspace.astype(np.complex64)
    with h5py.File(filename, "w") as h5_obj:
        h5_obj.create_dataset("kspace", data=kspace)
        if train:
            h5_obj.create_dataset(image_ds, data=image)
        else:
            h5_obj.create_dataset('mask', data=mask)
        h5_obj.attrs['acquisition'] = contrast
    if not train:
        return af

n_h5 = 200  # only works with a large amount of data
for i in range(n_h5):
    create_data(f'test_{i}.h5')


# dataset functions
def from_train_file_to_image_and_kspace_and_contrast(filename):
    with h5py.File(filename, 'r') as h5_obj:
        kspace = h5_obj['kspace'][()]
        image = h5_obj['reconstruction_esc'][()]
        contrast = h5_obj.attrs['acquisition']
    return image, kspace, contrast


def handle_data(images, kspaces):
    pad_sizes = [
        (0, 0),  # batch dimension
        (0, 0),  # coil dimension
    ] + [
        (0, 800),  # nx
        (0, 800),  # ny
    ]
    kspace_padded = tf.pad(kspaces[:, None], pad_sizes)
    nc_kspace = tf.signal.fft2d(kspace_padded)
    return nc_kspace



IM_SIZE = (640, 400)

def very_simple_fun(**kwargs):
    image_size = (640, 400)
    acq_type='radial'
    compute_dcomp = False
    acq_kwargs = {'us': 4}
    def _tf_filename_to_image_and_kspace_and_contrast(filename):
        def _from_train_file_to_image_and_kspace_and_contrast_tensor_to_tensor(filename):
            filename_str = filename.numpy()
            image, kspace, contrast = from_train_file_to_image_and_kspace_and_contrast(
                filename_str,
            )
            return tf.convert_to_tensor(image), tf.convert_to_tensor(kspace), tf.convert_to_tensor(contrast)
        [image, kspace, contrast] = tf.py_function(
            _from_train_file_to_image_and_kspace_and_contrast_tensor_to_tensor,
            [filename],
            [tf.float32, tf.complex64, tf.string],
        )
        return image, kspace, contrast

    path = './'
    files_ds = tf.data.Dataset.list_files(f'{path}*.h5', shuffle=False)
    files_ds = files_ds.shuffle(
        buffer_size=1000,
        seed=0,
        reshuffle_each_iteration=False,
    )
    image_and_kspace_and_contrast_ds = files_ds.map(
        _tf_filename_to_image_and_kspace_and_contrast,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    image_and_kspace_ds = image_and_kspace_and_contrast_ds.map(
        lambda image, kspace, tf_contrast: (image, kspace),
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )

    masked_kspace_ds = image_and_kspace_ds.map(
        handle_data,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    ).repeat()
    val_set = masked_kspace_ds.take(1)
    res = next(val_set.as_numpy_iterator())
    return res

# function submission
future = client.submit(
    very_simple_fun,
)

# bug zone
res = client.gather(future)

It only happens with enough examples and when the last map uses num_parallel_calls=tf.data.experimental.AUTOTUNE.

I am sorry for the length of this example, but the code I trimmed it down from was far bigger. If it's a problem I can try to simplify it further.

You can check that calling very_simple_fun() causes no error.

Versions:

  • dask-distributed: 2.20
  • tensorflow: 2.2
  • python: 3.6.8

If you need any others don't hesitate to ask.

EDIT

Just to be clear, this example doesn't give the classical semaphore issue, but on the cluster I have seen this memory leak associated with the semaphore issue so I figured they were the same.

Also when running in colab, that is without processes, the error doesn't appear.

Do we know what is causing this issue?

Regardless, I have one minimal example with PyTorch:

def minimal(nb_iteraions=1000):
    from torchmeta.datasets.helpers import omniglot
    from torchmeta.utils.data import BatchMetaDataLoader

    dataset = omniglot("data", ways=5, shots=5, test_shots=15, meta_train=True, download=True)
    dataloader = BatchMetaDataLoader(dataset, batch_size=16, num_workers=4)

    i = 0
    for batch in dataloader:
        train_inputs, train_targets = batch["train"]
        print(f'--\ni = {i}')
        print('Train inputs shape: {0}'.format(train_inputs.shape))    # (16, 25, 1, 28, 28)
        print('Train targets shape: {0}'.format(train_targets.shape))  # (16, 25)

        test_inputs, test_targets = batch["test"]
        print('Test inputs shape: {0}'.format(test_inputs.shape))      # (16, 75, 1, 28, 28)
        print('Test targets shape: {0}'.format(test_targets.shape))    # (16, 75)
        i += 1
        if i > nb_iteraions:
            return # halt

if __name__ == '__main__':
    import time
    from uutils import report_times

    start = time.time()
    #test_torchmeta_good_accumulator()
    minimal()
    #minimal_miniImagenet()
    time_passed_msg, _, _, _ = report_times(start)
    print(time_passed_msg)

I run it in debug mode in VS Code. Sometimes it raises the error, other times it does not. It's a weird error.

Running this should install everything you need:

conda install -y pytorch torchvision -c pytorch
conda install -y torchvision
pip install torchmeta

Oh, I think my error only happens when I halt the script in the middle. So far I can't reproduce it if I let the script end by itself.
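
If halting mid-run is the trigger, one thing worth trying is making sure cleanup code still runs when the process is asked to stop. Below is a minimal sketch, assuming a POSIX system and that the halt arrives as SIGTERM; a hard kill (SIGKILL) cannot be intercepted this way. The child script and the helper name `run_and_halt` are made up for illustration, and whether this actually avoids the semaphore warning in the PyTorch example above is untested.

```python
import signal
import subprocess
import sys
import textwrap

# Hypothetical child script: converts SIGTERM into SystemExit so that
# the `finally` block (standing in for real resource cleanup) still runs.
CHILD = textwrap.dedent("""
    import signal, time

    def on_term(signum, frame):
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, on_term)
    try:
        print("ready", flush=True)
        while True:
            time.sleep(0.1)  # stand-in for the long-running loop
    finally:
        print("cleanup done", flush=True)
""")


def run_and_halt():
    """Start the child, halt it mid-run, and return (exit code, remaining output)."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdout=subprocess.PIPE,
        text=True,
    )
    # Wait until the child has installed its handler and entered the loop.
    ready = proc.stdout.readline().strip()
    if ready != "ready":
        proc.kill()
        raise RuntimeError("child did not start as expected")
    proc.send_signal(signal.SIGTERM)
    out, _ = proc.communicate(timeout=30)
    return proc.returncode, out.strip()


if __name__ == "__main__":
    print(run_and_halt())
```

The point of the design is only that the signal becomes an ordinary exception, so `finally` blocks and context managers get a chance to release whatever the process was holding before it exits.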
