Dask: Dask multiprocessing scheduler much slower than implementation with built-in multiprocessing

Created on 16 Nov 2016 · 4 comments · Source: dask/dask

Hi,

I was playing around with dask and its delayed() function, trying to do some simple benchmarks of parallel operations. I came up with a quick and dirty script to load in some XML, modify it and serialise it.
I tested it on a 4-core laptop and played around with the number of workers (anywhere from 2 to 4). My implementation in dask was ~1.8x slower than single-threaded, and ~4x slower than using the built-in multiprocessing module.

I first assumed that I was somehow using the wrong scheduler, but then I confirmed that it was forking the correct number of system processes. I did notice, though, that it never utilised them (in CPU percentage terms) as much as the built-in multiprocessing module did.

All 4 comments

The attached file contains the code in question:

test-xml.txt

Here's what's happening here:

Pool.map splits the input data into batches, which can be more efficient than sending single elements (but not this much more efficient). In addition, pickle memoizes objects within a single message: it sends an object once and then refers back to it. This means that under multiprocessing you're serializing the vast_xml data roughly once per batch, not once per element.
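The memoization effect can be seen with the standard library alone. The sketch below uses a made-up 1 MB string in place of vast_xml and compares one pickle of a batch that refers to the same string 100 times against 100 separate pickles of that string, which is roughly what happens when every call becomes its own task.

```python
import pickle

# Hypothetical stand-in for the large shared XML payload (about 1 MB).
big_xml = "<root>" + "x" * 1_000_000 + "</root>"

# One pickle of a batch holding 100 references to the *same* object:
# the memo writes the string once, then emits short back-references.
batched_size = len(pickle.dumps([big_xml] * 100))

# 100 independent pickles, one per element: no memo is shared between
# separate dumps() calls, so the full string is written out every time.
per_element_size = sum(len(pickle.dumps(big_xml)) for _ in range(100))

print(f"one batch:    {batched_size:>12,} bytes")
print(f"100 separate: {per_element_size:>12,} bytes")
```

The batched pickle stays close to the size of a single copy of the string, while the per-element total grows linearly with the number of elements.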

In contrast, dask ends up serializing the data for every call, since each call is a separate task. This is eating up processing time. I'm not seeing the same disparity you are, though (not sure why yet). On my Mac the dask time is roughly 1.5x slower than multiprocessing, but still roughly 3x faster than single-threaded.
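To put rough numbers on batches versus separate tasks: in CPython, Pool.map called without a chunksize picks one that splits the input into about four chunks per worker process, and each chunk is pickled as one message. The helper below mirrors that heuristic as found in CPython's multiprocessing/pool.py (an implementation detail that may change between versions) and compares the message count with one task per element. The 1000-blob, 4-worker workload is hypothetical.

```python
import math


def pool_map_default_chunksize(n_items: int, n_processes: int) -> int:
    """Mirror CPython's default Pool.map chunksize (an implementation detail)."""
    chunksize, extra = divmod(n_items, n_processes * 4)
    if extra:
        chunksize += 1
    return chunksize


def messages_sent(n_items: int, chunksize: int) -> int:
    """Number of pickled task messages needed to ship n_items in chunks."""
    return math.ceil(n_items / chunksize) if n_items else 0


n_items, n_procs = 1000, 4  # hypothetical workload: 1000 XML blobs, 4 workers
cs = pool_map_default_chunksize(n_items, n_procs)
print("Pool.map chunksize:", cs)                           # 63
print("Pool.map messages:", messages_sent(n_items, cs))    # 16
print("one task per element:", messages_sent(n_items, 1))  # 1000
```

Since the pickle memo only works within one message, an object shared by every element would be serialized about 16 times in the batched case rather than 1000 times.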

Suggestions:

  • Perhaps just keep using multiprocessing. For simple, embarrassingly parallel tasks like this, it works just as well.
  • I suspect that in your real workflow you'll be loading data from files. In general, it's advisable to let dask do the data loading for you. This reduces worker-to-worker communication, as you're not sending large chunks of data around, and it also lets dask minimize memory usage, as you're not loading everything into memory at once.
  • Batch your data, so that each task does more work. This could be as simple as defining:
```python
from dask import delayed

@delayed(pure=False)
def parse_many(data):
    # parse_xml is the per-blob parsing function from your script
    return [parse_xml(i) for i in data]
```

and then applying it to lists of XML blobs.

  • Alternatively, dask.bag might be sufficient for your needs. It is designed for working with collections of Python objects (such as strings) and applying arbitrary transformations to them. A quick mock-up might look like:
```python
import dask.bag as db

# list_of_files, load_file and file_to_multiple_blobs are placeholders for your own code
files = db.from_sequence(list_of_files)
text = files.map(load_file)                         # contents of each file
blobs = text.map(file_to_multiple_blobs).flatten()  # one XML blob per element
results = blobs.map(parse_xml)
results.compute()
```

Note that if you pass in a large list of blobs to from_sequence, dask ends up hashing your data to create a deterministic key name, which for large strings might be a bit slow.
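For the batching suggestion, the only missing piece is a way to split the blobs into lists. A minimal sketch, assuming the blobs fit in a Python list; chunked is a hypothetical helper (on Python 3.12+, itertools.batched does much the same but yields tuples), and the commented lines show how its output could feed parse_many, with blobs and parse_xml coming from your own script.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Usage with dask (assumes blobs, parse_xml and parse_many from above):
#   import dask
#   tasks = [parse_many(batch) for batch in chunked(blobs, 250)]
#   nested = dask.compute(*tasks, scheduler="processes")
#   results = [r for batch in nested for r in batch]
```

The batch size of 250 is arbitrary; the aim is a task count that is a small multiple of the number of workers, so each task amortizes its serialization cost over many blobs.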

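You could also try the distributed scheduler, which runs fine on a single machine:
```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
# Upon creation, `Client` registers itself as the default scheduler, so
# all dask computations will use it by default. You can also use it
# explicitly by passing get=client.get wherever you would pass the
# multiprocessing scheduler (get=dask.multiprocessing.get); newer dask
# versions use the scheduler= keyword instead.
```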

For many problems the distributed scheduler will perform better than the multiprocessing scheduler, as it's smarter about data locality. For a simple map like this, though, it shouldn't make a huge difference.


Playing around with your code, I created 3 other versions:

  • One that uses dask.delayed, but batches the operations
  • One that uses dask.bag
  • One that uses dask.bag, plus a trick to avoid hashing the data

For the ones where it matters, I separated the graph build time and compute time, so you can see the difference. Results are here: https://gist.github.com/jcrist/b785a193d6c099385f08c8001a88dcb5. In summary, the multiprocessing scheduler has more overhead than a process pool, but it also does more. We see roughly 1.3x overhead in the best case.
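The gist's benchmark code isn't reproduced here. As a rough sketch of timing the two phases separately, assuming a recent dask with dask.bag installed (parse_blob, time_phases and the generated blobs are hypothetical stand-ins, not the code from the gist):

```python
import time

import dask.bag as db


def parse_blob(blob: str) -> int:
    # Hypothetical stand-in for parse_xml: cheap work on each string.
    return blob.count("<")


def time_phases(blobs, scheduler="processes", npartitions=8):
    """Return (results, build_seconds, compute_seconds) for a bag map."""
    t0 = time.perf_counter()
    # Graph construction: from_sequence tokenizes (hashes) its input here,
    # which is the cost noted above for large strings.
    bag = db.from_sequence(blobs, npartitions=npartitions).map(parse_blob)
    t1 = time.perf_counter()
    results = bag.compute(scheduler=scheduler)
    t2 = time.perf_counter()
    return results, t1 - t0, t2 - t1


if __name__ == "__main__":
    blobs = ["<a><b/></a>"] * 10_000
    results, build_s, compute_s = time_phases(blobs)
    print(f"build {build_s:.3f}s, compute {compute_s:.3f}s")
```

Passing scheduler="sync" runs everything in the calling thread, which is handy for checking correctness before comparing the "processes" timings against a plain multiprocessing.Pool.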

Closing. Feel free to reopen if needed.

Thanks for the thorough response. When I get some time I will sit down and play around with this.
Cheers.
