Currently, I have several test files that all require a dask client, which is set up via a pytest fixture:
from dask.distributed import Client, LocalCluster
import pytest

@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()
This fixture is defined at the top of each test file, and the dask_client is then accessed with:
def test_one(dask_client):
    ...

def test_two(dask_client):
    ...

def test_three(dask_client):
    ...
Based on my reading of the PyTest documentation, it is my understanding that the dask_client is created once at the start of the execution of the test file (with scope="module"), each test within the test file is executed, and then the dask_client is torn down before the next test file (that also requires a dask_client) does the same thing.
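As a rough mental model (a simplified simulation, not pytest's real implementation), a `yield` fixture with `scope="module"` behaves like a generator that pytest advances once before the first test in a file and once more after the last one. The `fake_dask_client` fixture and `run_module` driver below are hypothetical and only record events instead of starting a cluster.

```python
def fake_dask_client(events):
    # Setup: runs once, before the first test in the module that requests it
    events.append("setup")
    yield "client"
    # Teardown: runs once, after the last test in the module
    events.append("teardown")


def run_module(test_names, events):
    """Drive the fixture roughly the way a module-scoped fixture is driven."""
    gen = fake_dask_client(events)
    client = next(gen)  # execute the setup code up to the yield
    for name in test_names:
        events.append(f"{name}({client})")
    # Resume the generator so the code after `yield` runs; the default
    # argument swallows the StopIteration raised when the generator ends
    next(gen, None)
    return events


log = []
run_module(["test_one", "test_two", "test_three"], log)
run_module(["test_four"], log)  # a second test file gets a fresh setup/teardown
print(log)
```

Each test file therefore pays for its own cluster startup and shutdown, and the shutdown of one file's cluster can overlap with the startup of the next.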
Since the LocalCluster is set up with n_workers=2, threads_per_worker=2, I naively expected the maximum number of cores to be 2 and the number of threads per core to also be 2. However, according to Activity Monitor on my 13" MacBook Pro, the number of threads climbs to 16 for a single process:
[Screenshot: Activity Monitor listing several Python processes, one of them with 16 threads]
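Note that `threads_per_worker` sets how many threads each worker process uses to run tasks, whereas Activity Monitor counts every OS thread in a process, including threads the runtime uses for its own work (Dask processes typically run extra internal threads, for example for the event loop; the exact breakdown behind the count of 16 is not verified here). The standard-library sketch below uses `concurrent.futures.ThreadPoolExecutor` as a hypothetical stand-in for one worker and shows the same effect on a small scale: 2 task threads plus the main thread already make at least 3 threads in the process.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a worker configured with threads_per_worker=2
TASK_THREADS = 2

# A barrier forces both pool threads to be alive at the same time
barrier = threading.Barrier(TASK_THREADS + 1)


def task():
    barrier.wait()  # block until every task thread and the main thread arrive
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=TASK_THREADS) as pool:
    futures = [pool.submit(task) for _ in range(TASK_THREADS)]
    # The main thread is the last party at the barrier, so once it passes,
    # all TASK_THREADS pool threads exist alongside the main thread.
    barrier.wait()
    threads_while_busy = threading.active_count()
    names = {f.result() for f in futures}

print(threads_while_busy, len(names))
```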
Note that I don't have any other Python processes running. All of the Python processes shown in the screenshot appear to come from tests starting and stopping while the dask_client teardown catches up. However, occasionally, simply re-running the exact same test suite multiple times produces a CancelledError:
../../miniconda3/lib/python3.7/site-packages/distributed/client.py:1885: in gather
    asynchronous=asynchronous,
../../miniconda3/lib/python3.7/site-packages/distributed/client.py:767: in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
../../miniconda3/lib/python3.7/site-packages/distributed/utils.py:345: in sync
    raise exc.with_traceback(tb)
../../miniconda3/lib/python3.7/site-packages/distributed/utils.py:329: in f
    result[0] = yield future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.gen.Runner object at 0x1c27e07860>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None
                    try:
>                       value = future.result()
E                       concurrent.futures._base.CancelledError

../../miniconda3/lib/python3.7/site-packages/tornado/gen.py:735: CancelledError
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
============================================================ 1 failed, 175 passed in 52.44s ============================================================
Error: pytest encountered exit code 1
Based on my past experience, a CancelledError is common when running a distributed cluster whose machines have different Python packages installed. However, in this case we are running a LocalCluster, and it appears that all of the resources are being used up and tornado is hanging. Again, the CancelledError happens sporadically when I re-run the exact same test suite multiple times.
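For context, `CancelledError` is what a future raises from `result()` when it was cancelled before producing a value, for example when the machinery behind it is shut down while a caller is still waiting. The standard-library sketch below reproduces that with `concurrent.futures`; it is an analogy, not a diagnosis of the Dask/tornado failure above: a queued task is cancelled while the only worker thread is busy, and asking for its result raises `CancelledError`.

```python
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor

release = threading.Event()

with ThreadPoolExecutor(max_workers=1) as pool:
    busy = pool.submit(release.wait)    # occupies the single worker thread
    pending = pool.submit(sum, [1, 2])  # queued behind it, not started yet
    was_cancelled = pending.cancel()    # only succeeds for futures that have not started
    try:
        pending.result()
        outcome = "finished"
    except CancelledError:
        outcome = "cancelled"
    release.set()                       # let the busy task finish so the pool can shut down

print(was_cancelled, outcome, busy.result())
```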
I'm guessing that I'm doing things incorrectly or my assumptions are incorrect. Is there a correct/proper way to use Dask LocalCluster with PyTest so that all tests are limited to only 2 cores and 2 threads per core (instead of getting up to 16 threads)?
Initially, my hacky workaround was to limit the number of tests in each test file, which resulted in a test suite with many separate test files (each setting up and tearing down its own dask_client) but only a handful of tests per file. This seemed to keep the number of threads from climbing. However, this workaround is no longer sufficient, and I'm still seeing the same CancelledError as my test suite grows. I've also tried restarting the cluster in between tests, adding a few seconds of sleep after teardown, and setting up/tearing down the dask_client at the test level, but these significantly slow down the test suite.
The test suite can be found here
Sorry, I haven't looked too closely at the tests in your repo, but is restructuring the tests to be more similar to distributed's an option? Specifically, using the gen_cluster decorator, as described in https://distributed.dask.org/en/latest/develop.html#writing-tests (that's a bit out of date, going to update it now)?
I strongly recommend using the dask.distributed testing harness if it makes sense for your situation. We test for and clean up a lot of things there. If you don't want to use gen_cluster (because it's async), there are also pytest fixtures for clients that are running synchronously with external processes.
I have a few questions:
1. How does using LocalCluster for my testing previously compare with using @gen_cluster? I'm using dask.distributed to perform an embarrassingly parallel (chunked) computation, so I want to test that the single-threaded computation yields the same result as the manually chunked, embarrassingly parallel one. Essentially, the function(s) that I need to test take in a dask client, perform some chunked computation, and return the results. So, it will look something like this:

@numba.njit(parallel=True)
def _some_func(chunk):
    # Do some computation on the chunk of data
    return result

def some_func(dask_client, x):
    ...
    chunks = split_array_into_chunks(x)
    futures = []
    for chunk in chunks:
        futures.append(
            dask_client.submit(_some_func, chunk)
        )
    results = dask_client.gather(futures)
    return results

2. How do I pass in data and use pytest marks with @gen_cluster? Without it, I can usually pass in data and filter warnings in pytest with:

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x):
    y = some_func(x)
    ...
    npt.assert_almost_equal(the_correct_answer, y)
I can get the @gen_cluster example in the dask.distributed docs to work but I can't seem to figure out how to combine both pytest.mark and @gen_cluster together in one test.
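The chunk/submit/gather pattern in `some_func` can be sketched without Dask, along with the kind of check the test wants: the chunked result should match the single-threaded one. In this sketch `concurrent.futures.ThreadPoolExecutor` stands in for the Dask client (both expose `submit`; the list comprehension over `.result()` plays the role of `client.gather`), and `split_array_into_chunks` and the sum-of-squares `_some_func` are hypothetical placeholders.

```python
from concurrent.futures import ThreadPoolExecutor


def split_array_into_chunks(x, n_chunks):
    """Hypothetical helper: split a list into n_chunks contiguous pieces."""
    size, rem = divmod(len(x), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        stop = start + size + (1 if i < rem else 0)
        chunks.append(x[start:stop])
        start = stop
    return chunks


def _some_func(chunk):
    # Placeholder computation on one chunk (a sum of squares here)
    return sum(v * v for v in chunk)


def some_func(client, x, n_chunks=4):
    # `client` is anything with a submit() method that returns futures
    futures = [client.submit(_some_func, chunk)
               for chunk in split_array_into_chunks(x, n_chunks)]
    # Equivalent of client.gather(futures) for concurrent.futures
    return [f.result() for f in futures]


x = list(range(1000))
with ThreadPoolExecutor(max_workers=2) as pool:
    chunked = sum(some_func(pool, x))
single = _some_func(x)  # single-threaded reference result
assert chunked == single
print(chunked)
```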
Some marks will work (e.g. filterwarnings). I think parametrize causes issues, so the actual test is written in a closure: https://github.com/dask/distributed/blob/b049bd71f8ef28adb96aa0cdd91254242c38ea2c/distributed/tests/test_actor.py#L50-L86.
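A plain-Python sketch of the closure pattern: `parametrize` supplies `x` to the outer function, and the inner function, which a decorator wraps to provide extra resources (as `gen_cluster(client=True)` provides `c, s, a, b`), captures `x` from the enclosing scope. The decorator `with_fake_cluster` and the outer `fake_test_some_func` are hypothetical. Note that the outer function has to call the inner one; defining it alone runs nothing.

```python
import functools


def with_fake_cluster(func):
    """Hypothetical decorator standing in for gen_cluster(client=True).

    It turns a function of (c, s, a, b) into a zero-argument callable that
    sets up resources, runs the function, and tears the resources down.
    """
    @functools.wraps(func)
    def runner():
        resources = {"c": "client", "s": "scheduler", "a": "worker-a", "b": "worker-b"}
        try:
            return func(resources["c"], resources["s"], resources["a"], resources["b"])
        finally:
            resources.clear()  # teardown always runs, even if the test body fails
    return runner


def fake_test_some_func(x):  # in pytest, x would come from @pytest.mark.parametrize
    @with_fake_cluster
    def test(c, s, a, b):
        # x is captured from the enclosing function's scope; the value is
        # returned only so this sketch can be checked (a real test would assert)
        return (c, x * 2)

    return test()  # the closure must be invoked explicitly


results = [fake_test_some_func(x) for x in [1, 2, 3]]
print(results)
```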
@TomAugspurger that was the missing link! I think I'm now able to transition everything over to using @gen_cluster. Thank you for your help!
So, I was able to port all of my tests over to using @gen_cluster and all of the tests pass. In my testing, I separate it into two parts:
For the first part, all of the tests pass. For the second part (coverage testing), all of the tests pass as well, but for the tests that use @gen_cluster, coverage cannot verify that the Python functions under test are actually called. I'm guessing that it's because those Python functions are being run in a different forked process. Is there a way to limit this to only 1 process and 2 threads per process? This would have to be controlled either at the command line (when calling py.test) or via some environment variable, rather than at the code level.
By default gen_cluster uses the Worker class, which runs things in the same process, but in different threads. If you want to use processes then you should add the Worker=Nanny keyword.
Maybe coverage doesn't respect code run in other threads? I'm not sure.
Hmmm, according to the docs for coverage.py
Coverage.py can measure multi-threaded programs by default. If you are using more exotic concurrency, with the multiprocessing, greenlet, eventlet, or gevent libraries, then coverage.py will get very confused. Use the --concurrency switch to properly measure programs using these libraries. Give it a value of multiprocessing, thread, greenlet, eventlet, or gevent. Values other than thread require the C extension.
So, in theory this should work out of the box with @gen_cluster if dask.distributed is running in threads.
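The standard-library hook that makes thread-level measurement possible is `threading.settrace`, which installs a trace function in every thread started afterwards (`sys.settrace` only affects the calling thread). The sketch below records which functions are entered in a newly started thread; it illustrates the mechanism only and is not coverage.py's code.

```python
import threading

called = []


def tracer(frame, event, arg):
    # Global trace function: invoked with event "call" whenever a new frame starts
    if event == "call":
        called.append(frame.f_code.co_name)
    return None  # no line-by-line tracing needed for this sketch


def work():
    return 42


threading.settrace(tracer)   # applies to threads started from now on
t = threading.Thread(target=work)
t.start()
t.join()
threading.settrace(None)     # stop installing the tracer in new threads

print("work" in called)
```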
Then I don't know. gen_cluster by default runs everything in the same process.
Okay, I ran a couple more tests, and it looks like the problem is with the closure (or nested function). For some reason, coverage.py isn't able to pick up anything inside the inner function test():
test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        y = some_func(c, x)  # Coverage can't seem to see this function call
        ...
        npt.assert_almost_equal(the_correct_answer, y)
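One detail worth checking in snippets like the one above is whether the inner `test` is ever called: Python only executes a function body when the function is invoked, so a closure that is defined but never called records nothing, for coverage or anything else. A minimal illustration, with a hypothetical call list standing in for coverage's bookkeeping:

```python
calls = []


def some_func(x):
    calls.append(x)  # stands in for "coverage saw this line run"
    return x + 1


def fake_test_defined_only(x):
    def test():
        return some_func(x)
    # `test` is defined but never invoked, so some_func never runs


def fake_test_defined_and_called(x):
    def test():
        return some_func(x)
    return test()  # invoking the closure actually executes its body


fake_test_defined_only(1)
after_defined_only = list(calls)
fake_test_defined_and_called(2)
print(after_defined_only, calls)
```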
This works fine (i.e., coverage can easily detect the some_func call), but this setup is what was causing the CancelledError in the first place:
@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x, dask_client):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        # Do nothing in the closure
        return

    y = some_func(dask_client, x)
    ...
    npt.assert_almost_equal(the_correct_answer, y)
But when I move the some_func call inside the closure (while still using the client generated by the pytest fixture), coverage cannot detect the some_func call:
@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x, dask_client):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        # Note that we use a dask_client from the pytest fixture above
        y = some_func(dask_client, x)  # Coverage can't seem to see this function call
        ...
        npt.assert_almost_equal(the_correct_answer, y)
Is there a more manual way that I could perform the same setup/teardown that would avoid the closure? So, it would look something like:
def test_some_func(x):
    # Add dask client setup code here
    y = some_func(c, x)  # Coverage can't seem to see this function call
    ...
    npt.assert_almost_equal(the_correct_answer, y)
    # Add dask client tear down code
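A sketch of that manual shape, with the resources created and released inside the test body through a `with` block so teardown runs even when an assertion fails. `ThreadPoolExecutor` stands in for the cluster and client here, and `fake_test_some_func` and `_square` are hypothetical; `LocalCluster` and `Client` can also be used as context managers in the same position.

```python
from concurrent.futures import ThreadPoolExecutor


def _square(v):
    return v * v


def fake_test_some_func(x):
    # Setup: the executor (stand-in for cluster + client) lives only inside this block
    with ThreadPoolExecutor(max_workers=2) as client:
        futures = [client.submit(_square, v) for v in x]
        y = [f.result() for f in futures]
    # Teardown has already happened: leaving the block called client.shutdown(wait=True)
    assert y == [v * v for v in x]
    return client  # returned only so the sketch can check that teardown ran


pool = fake_test_some_func([1, 2, 3])
# Submitting to a shut-down executor raises RuntimeError, confirming teardown ran
try:
    pool.submit(_square, 4)
    torn_down = False
except RuntimeError:
    torn_down = True
print(torn_down)
```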
It may be worth making gen_cluster a proper pytest fixture or mark so that we play more nicely with parametrize and others. I'm not really familiar with how that's done though.
gen_cluster today takes some keywords, so I'm not sure how easy it would be to make it into a proper fixture, unfortunately. When I need to interact with parametrize and others, I tend to create a scheduler/workers/client manually within the test with async with context managers, and I also include the cleanup context manager.
@mrocklin Can you point me to any good examples where you used async with context managers along with the cleanup context manager when interacting with parametrize? I could try emulating them in my code.
If you grep for async with Scheduler you'll find some. Here is an example:

@pytest.mark.asyncio
@pytest.mark.parametrize("Worker", [Worker, Nanny])
async def test_protocol_from_scheduler_address(Worker):
    ucp = pytest.importorskip("ucp")

    async with Scheduler(protocol="ucx") as s:
        assert s.address.startswith("ucx://")
        async with Worker(s.address) as w:
            assert w.address.startswith("ucx://")
            async with Client(s.address, asynchronous=True) as c:
                info = c.scheduler_info()
                assert info["address"].startswith("ucx://")
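The nested `async with` blocks in that example tear resources down in reverse order of creation (client, then worker, then scheduler), even if an assertion inside fails. A standard-library sketch of that ordering, using a hypothetical `resource` context manager built with `contextlib.asynccontextmanager` in place of `Scheduler`, `Worker`, and `Client`, and a plain loop in place of `parametrize`:

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def resource(name):
    """Hypothetical stand-in for the Scheduler/Worker/Client async context managers."""
    events.append(f"start {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")  # runs on normal exit and on errors


async def run_test(param):
    async with resource("scheduler") as s:
        async with resource(f"worker[{param}]") as w:
            async with resource("client") as c:
                events.append(f"test body with {s}, {w}, {c}")


for param in ["Worker", "Nanny"]:  # plays the role of @pytest.mark.parametrize
    asyncio.run(run_test(param))
print(events)
```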
One thing that is still perplexing from my original code is that when I did:
@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

def test_some_func_1():
    ...

def test_some_func_2():
    ...
This successfully sets up the LocalCluster and tears it down after all of the tests are run. However, the number of threads gets up to double digits (when I expected only 2 threads):
[Screenshot: Activity Monitor thread count for the Python process]
Is this normal? Is there something else that I need to do in order to tear down the LocalCluster properly or more elegantly?
That sounds like unexpected behavior, but also not entirely unexpected. Cleaning things up reliably is hard. You might want to take a look at the cleanup fixture, which waits until everything gets properly cleaned up. It's also included in our testing fixtures, which ensure that resources like threads/processes/file-descriptors all get cleaned up.
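A rough standard-library sketch of the idea behind such a cleanup check: record the threads alive before a test, then after teardown poll until no extra threads remain, giving up after a timeout. This is not distributed's `cleanup` implementation; `wait_for_thread_cleanup` is a hypothetical helper and `ThreadPoolExecutor` stands in for the cluster.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def wait_for_thread_cleanup(baseline, timeout=5.0, interval=0.05):
    """Return True once only threads from `baseline` remain alive, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        extra = [t for t in threading.enumerate() if t not in baseline]
        if not extra:
            return True
        time.sleep(interval)
    return False


before = set(threading.enumerate())
pool = ThreadPoolExecutor(max_workers=2)
list(pool.map(abs, [-1, -2, -3]))  # force the pool to start its threads
leaked = not wait_for_thread_cleanup(before, timeout=0.2)  # pool not shut down yet
pool.shutdown(wait=True)                                   # proper teardown joins the threads
cleaned = wait_for_thread_cleanup(before)
print(leaked, cleaned)
```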
@seanlaw your approach of defining a cluster fixture instead of using the client one is brilliant. I proposed adopting that in https://github.com/microsoft/LightGBM/pull/4159 which was merged today and reduced the CI time from 20 minutes to 3 minutes. The folks at xgboost are looking into adopting it as well (https://github.com/dmlc/xgboost/issues/6816).
I think this approach should be in https://distributed.dask.org/en/latest/develop.html#writing-tests
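A standard-library sketch of the structure described here: one expensive, shared resource created once per session (the "cluster") and a cheap handle created per test (the "client"). `ThreadPoolExecutor` stands in for `LocalCluster` and the hypothetical `Handle` class for `Client`; in a real conftest.py the first would presumably be a `scope="session"` fixture and the second a function-scoped fixture connecting to it. The sketch shows that the number of task threads stays bounded by the single shared pool no matter how many tests run.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Handle:
    """Hypothetical lightweight per-test handle (stand-in for Client(cluster))."""

    def __init__(self, pool):
        self._pool = pool

    def submit(self, fn, *args):
        return self._pool.submit(fn, *args)

    def close(self):
        pass  # closing a handle does not tear down the shared pool


def thread_name(_):
    return threading.current_thread().name


# "Session" setup: created once (stand-in for LocalCluster(n_workers=2, ...))
cluster = ThreadPoolExecutor(max_workers=2, thread_name_prefix="cluster")
seen_threads = set()
for _ in range(50):  # fifty "tests", each with its own cheap handle
    client = Handle(cluster)
    futures = [client.submit(thread_name, i) for i in range(4)]
    seen_threads.update(f.result() for f in futures)
    client.close()
cluster.shutdown(wait=True)  # "session" teardown: runs once at the very end
print(len(seen_threads))
```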