Hi,
I was playing around with dask and its `delayed()` function, trying to do some simple benchmarks of parallel operations. I came up with a quick and dirty script to load some XML, modify it and serialise it.
I tested it on a 4-core laptop and varied the number of workers (anywhere from 2 to 4). My dask implementation was ~1.8x slower than single-threaded, and ~4x slower than using the built-in multiprocessing module.
I first assumed that I was somehow using the wrong scheduler, but then confirmed that it was forking the correct number of system processes. I did notice, though, that it never utilised them (percentage-wise) as much as the built-in multiprocessing module did.
The attached file contains the code in question:
Here's what's happening:
`Pool.map` splits the input data into batches, which can be more efficient than sending single elements, but not enough to explain a gap this large. The bigger factor is that `pickle` memoizes objects: it serializes a given object once and then refers back to it. This means that under `multiprocessing` you're only serializing the `vast_xml` data to each worker process once.
In contrast, dask ends up serializing the data for every argument, since each call is submitted separately. This is what eats up the processing time. I'm not seeing the same disparity you are, though (not sure why yet). On my Mac the dask version is roughly 1.5x slower than multiprocessing, but still roughly 3x faster than single-threaded.
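To make the memoization point concrete, here is a small stdlib-only sketch. The 1 MB `vast_xml` string is a hypothetical stand-in for your data; the sketch compares pickling one batch that references the same object eight times against pickling each of the eight arguments on its own.

```python
import pickle

# Hypothetical stand-in for the large XML payload shared by every call.
vast_xml = "<root>" + "x" * 1_000_000 + "</root>"

# Eight arguments that all reference the *same* string object.
args = [vast_xml] * 8

# Pickling the whole batch in one message (roughly what Pool.map does per chunk):
# the pickler's memo writes the string once, then emits back-references.
batched_size = len(pickle.dumps(args))

# Pickling each argument separately (roughly one message per task):
# the full string is written out every time.
separate_size = sum(len(pickle.dumps(a)) for a in args)

print(f"batched:  {batched_size:,} bytes")
print(f"separate: {separate_size:,} bytes")
```

The exact sizes depend on the pickle protocol, but the separate case should come out at roughly eight times the batched one.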
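Suggestions:
Batch the operations, so that each task parses several XML blobs instead of one:
```python
from dask import delayed

@delayed(pure=False)
def parse_many(data):
    # parse_xml is the per-blob function from your script
    return [parse_xml(i) for i in data]
```
and then apply it to lists of XML blobs.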
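A slightly fuller sketch of the batching idea, assuming a hypothetical `parse_xml` that parses, tweaks and re-serializes one blob with `xml.etree.ElementTree` (your real function will differ). `parse_in_batches` and its `batch_size` default are illustrative names, not part of dask.

```python
import xml.etree.ElementTree as ET

import dask
from dask import delayed


def parse_xml(blob):
    # Hypothetical per-blob work: parse, tweak and re-serialize one XML blob.
    root = ET.fromstring(blob)
    root.set("processed", "yes")
    return ET.tostring(root, encoding="unicode")


@delayed(pure=False)
def parse_many(data):
    # One task handles a whole batch, so scheduling and serialization
    # overhead is paid once per batch rather than once per blob.
    return [parse_xml(i) for i in data]


def parse_in_batches(blobs, batch_size=100, scheduler="processes"):
    # Split the input into consecutive batches (the last one may be shorter).
    batches = [blobs[i:i + batch_size] for i in range(0, len(blobs), batch_size)]
    tasks = [parse_many(batch) for batch in batches]
    # dask.compute returns a tuple with one list per batch; flatten it back.
    per_batch = dask.compute(*tasks, scheduler=scheduler)
    return [result for batch in per_batch for result in batch]
```

Pass `scheduler="sync"` when debugging. With `"processes"`, dask may start fresh worker processes, so call it from under `if __name__ == "__main__":`. `batch_size` is the knob to tune: too small and you are back to per-blob overhead, too large and some workers sit idle.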
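`dask.bag` might be sufficient for your needs. It is designed for working with collections of Python objects (such as strings) and applying arbitrary transformations to them. A quick mock-up might look like:
```python
import dask.bag as db

# list_of_files, load_file, file_to_multiple_blobs and parse_xml are placeholders
files = db.from_sequence(list_of_files)
text = files.map(load_file)
# split each file's text into several XML blobs, then flatten into one bag
# (.flatten() was called .concat() in older dask releases)
blobs = text.map(file_to_multiple_blobs).flatten()
results = blobs.map(parse_xml)
results.compute()
```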
Note that if you pass in a large list of blobs to from_sequence, dask ends up hashing your data to create a deterministic key name, which for large strings might be a bit slow.
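A runnable version of the `dask.bag` route, again with a hypothetical `parse_xml`. Here `npartitions` controls how many blobs end up in each task, which plays the same role as the batch size in the `delayed` version; `parse_with_bag` is an illustrative name.

```python
import xml.etree.ElementTree as ET

import dask.bag as db


def parse_xml(blob):
    # Hypothetical per-blob work: parse, tweak and re-serialize one XML blob.
    root = ET.fromstring(blob)
    root.set("processed", "yes")
    return ET.tostring(root, encoding="unicode")


def parse_with_bag(blobs, npartitions=4, scheduler="processes"):
    # from_sequence splits the list into roughly npartitions chunks; each chunk
    # is one task, so per-task overhead is paid per partition, not per blob.
    bag = db.from_sequence(blobs, npartitions=npartitions)
    # map is lazy; compute() runs it and returns a plain Python list.
    return bag.map(parse_xml).compute(scheduler=scheduler)
```

If hashing large blobs in `from_sequence` turns out to be costly, feeding the bag small items such as file names and loading the data inside `map` (as in the mock-up above) keeps the hashed input small.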
You could also try the distributed scheduler, which works fine on a single machine:
```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
# Upon creation, `Client` registers itself as the default scheduler, so
# all dask computations will use it by default. You can also use it
# explicitly, by passing `client.get` everywhere you would pass the
# multiprocessing scheduler.
```
For many problems the distributed scheduler will perform better than the multiprocessing scheduler, as it's smarter about data-locality. For a simple map like this though it shouldn't make a huge difference.
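A sketch of the same batched parsing on a `LocalCluster`, using `client.map` to submit one task per batch and `client.gather` to collect the results. `parse_on_cluster`, the plain-function `parse_many` and the `parse_xml` body are hypothetical; the `processes` flag lets you try threads instead of worker processes.

```python
import xml.etree.ElementTree as ET

from distributed import Client, LocalCluster


def parse_xml(blob):
    # Hypothetical per-blob work: parse, tweak and re-serialize one XML blob.
    root = ET.fromstring(blob)
    root.set("processed", "yes")
    return ET.tostring(root, encoding="unicode")


def parse_many(data):
    # Plain function (not delayed): client.map wraps each call in a task.
    return [parse_xml(blob) for blob in data]


def parse_on_cluster(blobs, batch_size=100, n_workers=4, processes=True):
    # Group the blobs so each submitted task handles a whole batch.
    batches = [blobs[i:i + batch_size] for i in range(0, len(blobs), batch_size)]
    with LocalCluster(
        n_workers=n_workers, threads_per_worker=1, processes=processes
    ) as cluster, Client(cluster) as client:
        futures = client.map(parse_many, batches)  # one future per batch
        per_batch = client.gather(futures)  # blocks until all batches finish
    # Flatten the per-batch lists back into one list, preserving order.
    return [result for batch in per_batch for result in batch]
```

The `with` blocks shut the client and cluster down once the results are back. As with the process scheduler, keep the call under `if __name__ == "__main__":` when `processes=True`.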
Playing around with your code, I created 3 other versions:
- `dask.delayed`, but batching the operations
- `dask.bag`
- `dask.bag`, plus a trick to avoid hashing the data

For the ones where it matters, I separated the graph build time and compute time, so you can see the difference. Results are here: https://gist.github.com/jcrist/b785a193d6c099385f08c8001a88dcb5. In summary, the multiprocessing scheduler has more overhead than a process pool, but it also does more. We see roughly 1.3x overhead in the best case.
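One way to separate graph build time from compute time, as described above, is to time the two phases independently. This is a minimal sketch, not the code from the gist: `time_phases` is a hypothetical helper taking a zero-argument callable that builds a dask collection, and the usage lines time a `dask.bag` pipeline over a toy list of blobs.

```python
import time
import xml.etree.ElementTree as ET

import dask
import dask.bag as db


def parse_xml(blob):
    # Hypothetical per-blob work: parse, tweak and re-serialize one XML blob.
    root = ET.fromstring(blob)
    root.set("processed", "yes")
    return ET.tostring(root, encoding="unicode")


def time_phases(build_graph, scheduler="processes"):
    """Return (result, build_seconds, compute_seconds).

    build_graph is a zero-argument callable returning a dask collection.
    """
    start = time.perf_counter()
    collection = build_graph()  # lazy: only constructs the task graph
    built = time.perf_counter()
    (result,) = dask.compute(collection, scheduler=scheduler)  # actually runs it
    done = time.perf_counter()
    return result, built - start, done - built


# Usage on a toy input; "sync" keeps the demo single-process and deterministic.
blobs = ["<doc><item/></doc>"] * 1000
result, build_s, compute_s = time_phases(
    lambda: db.from_sequence(blobs, npartitions=4).map(parse_xml),
    scheduler="sync",
)
print(f"build: {build_s:.4f}s  compute: {compute_s:.4f}s")
```

For a bag, the build phase includes `from_sequence` tokenizing the input, so it is the phase to watch when the blobs are large.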
Closing. Feel free to reopen if needed.
Thanks for the thorough response. When I get some time I will sit down and play around with this.
Cheers.