Julia: Proposal: Make creation of CachingPool a default for pmap

Created on 18 May 2017 · 10 comments · Source: JuliaLang/julia

Ref #21940

In most parallel operations I've come across, there is at least some data that moves to the workers. Because it appears that pmap sends function arguments to the workers for every batch (determined by batch_size, default = 1), pmap's default behavior results in large inefficiencies when the functions called within pmap reference large data structures.

@amitmurthy has suggested, and testing has confirmed, that the use of a Distributed.CachingPool reduces the amount of data transferred for functions called within pmap.
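
For concreteness, here is a minimal sketch of the manual pattern being discussed; the worker count, data, and mapping function are illustrative, not taken from the issue:

    using Distributed
    addprocs(4)

    function run_with_pool(data)
        pool = CachingPool(workers())
        # The closure below captures `data`; with the CachingPool, the closure
        # (and the captured array) is sent to each worker only once, rather
        # than being reserialized for every batch as with a plain pmap call.
        pmap(pool, x -> sum(data) + x, 1:1000)
    end

    run_with_pool(rand(10^6))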

The use of caching pools incurs a small amount of overhead, which is more apparent when pmap doesn't result in data transfers of appreciable size. This is less common in real-world situations, though.

This proposal is to modify pmap so that, by default, it creates a caching pool spanning all the workers, with an optional override for when pmap is used without appreciable data transfer. This provides the expected performance gains from pmap under most real-world conditions out of the box, with the flexibility to tune things in the less frequent cases.

Right now, there is a proposal to deprecate @parallel for loops. @parallel does not suffer from this issue, and therefore provides better performance than pmap when data is processed by workers. This proposal would ensure that pmap is at least as performant as @parallel once the latter is deprecated.

Label: parallel

All 10 comments

Right now, there is a proposal to deprecate @parallel for loops. @parallel does not suffer from this issue, and therefore provides better performance than pmap when data is processed by workers. This proposal would ensure that pmap is at least as performant as @parallel once the latter is deprecated.

I think this is misguided. It is just a matter of the default value of batch_size. If you set batch_size = div(length(looprange), nworkers()) (ish), then pmap is essentially the same as @parallel except for the reduction (which could be solved by exporting a pmapreduce). You could argue that the default value of batch_size should change, but the optimal value is problem dependent.
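
For illustration, a minimal sketch of that comparison; the data and mapping function are made up:

    using Distributed
    addprocs(4)

    data = 1:10_000

    # One large batch per worker approximates @parallel's static chunking:
    r1 = pmap(x -> x^2, data; batch_size = div(length(data), nworkers()))

    # The default batch_size = 1 hands out one element at a time, which
    # load-balances better but serializes the mapping function far more often.
    r2 = pmap(x -> x^2, data)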

I really don't think there's a reason not to do this. The post which shows the overhead in pretty much a worst-case scenario wasn't even that bad:

https://github.com/JuliaLang/julia/issues/21940#issuecomment-302450158

In any case where the iterations are costly (which are the cases where you should use pmap), that doesn't matter, and in any case where the data is large, caching is much, much better. I think turning off caching should be something you can do with a keyword argument, but it should definitely cache by default. If anything, the manual should be very explicit about the fact that it does not cache, because that's very unintuitive behavior. Don't take it from me, take it from a whole thread of people who thought they knew parallelism but didn't know to use a CachingPool:

https://discourse.julialang.org/t/help-with-parallelism/3615

Because it appears that pmap sends function arguments to the workers for every batch

The function arguments have to be sent for each execution. A CachingPool ensures that the function itself is sent only once to each worker. This is beneficial when the function is a closure referencing a large chunk of data.

The function arguments have to be sent for each execution. A CachingPool ensures that the function itself is sent only once to each worker. This is beneficial when the function is a closure referencing a large chunk of data.

Yes, thanks for the clarification.
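
A compact sketch of that distinction (the names and sizes are illustrative):

    using Distributed
    addprocs(2)

    pool = CachingPool(workers())
    let big = rand(10^6)
        f = x -> x + sum(big)      # `f` captures the local array `big`
        # `f` (and therefore `big`) is cached once per worker by the pool;
        # the per-call argument `x` is still serialized for every execution.
        pmap(pool, f, 1:100)
    end
    Distributed.clear!(pool)       # release the cached closures on the workers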

@andreasnoack

I think this is misguided. It is just a matter of the default value of batch_size. If you set batch_size = div(length(looprange), nworkers()) (ish), then pmap is essentially the same as @parallel except for the reduction (which could be solved by exporting a pmapreduce).

Except for the fact that you can get better results from pmap with a CachingPool, because pmap does not allow workers to become or remain idle while there is still work in the queue (that is the primary advantage of a batch size of 1), and the CachingPool eliminates the per-batch data transfer.

Except for the fact that you can get better results from pmap with a CachingPool, because pmap does not allow workers to become or remain idle while there is still work in the queue (that is the primary advantage of a batch size of 1), and the CachingPool eliminates the per-batch data transfer.

I don't think we disagree on that. I'm just arguing for deprecation of @parallel for.

I'm just arguing for deprecation of @parallel for.

If this proposal is accepted, I'm 100% on board. Until it is, we can expect a whole bunch of confusion as folks reimplement their @parallel for loops using pmap with its defaults and get horrible performance.

Maybe we should just use a regular WorkerPool for named functions and a CachingPool for anonymous functions; only the latter would capture data. And we should explicitly document this behavior.
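
A purely illustrative sketch of how such a choice could be made; the field-count heuristic is an assumption for illustration, not anything in Base:

    using Distributed

    # Closures are instances of types with one field per captured local
    # variable, while named generic functions are singleton types with no
    # fields, so the field count is one (assumed) way to tell them apart.
    choose_pool(f) = fieldcount(typeof(f)) > 0 ? CachingPool(workers()) :
                                                 WorkerPool(workers())

    g = let d = rand(10^3)
        x -> x + sum(d)          # captures the local `d`
    end
    choose_pool(sin)             # named function, nothing captured -> WorkerPool
    choose_pool(g)               # closure with captured data -> CachingPool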

I did a short benchmark of a few pmap use cases relevant to this issue. Specifically, it times the use of closures vs. named functions, with a regular pmap vs. one using a CachingPool, for different lengths of the data referenced by the mapping function and different lengths of the range over which pmap is executed.

The actual computation is negligible; it just returns the length of the referenced array for each element. The benchmark compares the network and serialization overhead in pmap.

All runs with 4 workers.

Code here - https://gist.github.com/amitmurthy/aaeff92d70399c45d00a50414a54b85f

A. Regular pmap with closures pointing to global data:
results = 7×3 Named Array{Float64,2}
referenced array length ╲ pmap range length │    100    1000   10000
────────────────────────────────────────────┼───────────────────────
10^0 element Array{Float64,1}               │ 0.0065  0.0638  0.5933
10^1 element Array{Float64,1}               │ 0.0065  0.0628  0.6125
10^2 element Array{Float64,1}               │ 0.0071  0.0599  0.6247
10^3 element Array{Float64,1}               │ 0.0068  0.0628  0.6265
10^4 element Array{Float64,1}               │ 0.0073  0.0679  0.6622
10^5 element Array{Float64,1}               │ 0.0131  0.1203  1.1875
10^6 element Array{Float64,1}               │ 0.0798  0.7104   7.322

B. Regular pmap with defined functions pointing to global data:
results = 7×3 Named Array{Float64,2}
referenced array length ╲ pmap range length │    100    1000   10000
────────────────────────────────────────────┼───────────────────────
10^0 element Array{Float64,1}               │ 0.0055  0.0521  0.5385
10^1 element Array{Float64,1}               │ 0.0056  0.0519  0.5597
10^2 element Array{Float64,1}               │ 0.0058  0.0577  0.5582
10^3 element Array{Float64,1}               │ 0.0064  0.0589  0.5319
10^4 element Array{Float64,1}               │  0.006   0.052  0.5592
10^5 element Array{Float64,1}               │ 0.0063  0.0539  0.5591
10^6 element Array{Float64,1}               │ 0.0059   0.054  0.5275

C. Regular pmap with local data captured in a closure:
results = 7×3 Named Array{Float64,2}
referenced array length ╲ pmap range length │     100     1000    10000
────────────────────────────────────────────┼──────────────────────────
10^0 element Array{Float64,1}               │  0.0069   0.0687   0.6513
10^1 element Array{Float64,1}               │  0.0073   0.0626   0.6575
10^2 element Array{Float64,1}               │  0.0071   0.0658   0.6942
10^3 element Array{Float64,1}               │  0.0081   0.0768   0.7517
10^4 element Array{Float64,1}               │  0.0104   0.1057   0.9813
10^5 element Array{Float64,1}               │  0.0304   0.2659     2.49
10^6 element Array{Float64,1}               │  0.2453   2.0951  20.6758

D. pmap with a caching pool created for each call. Using closures:
results = 7×3 Named Array{Float64,2}
referenced array length ╲ pmap range length │    100    1000   10000
────────────────────────────────────────────┼───────────────────────
10^0 element Array{Float64,1}               │ 0.0075  0.0711  0.7222
10^1 element Array{Float64,1}               │ 0.0082  0.0692  0.7243
10^2 element Array{Float64,1}               │ 0.0073  0.0685  0.7189
10^3 element Array{Float64,1}               │ 0.0076  0.0681  0.7345
10^4 element Array{Float64,1}               │ 0.0077  0.0738  0.7313
10^5 element Array{Float64,1}               │  0.008  0.0716   0.733
10^6 element Array{Float64,1}               │ 0.0155  0.0811  0.7215

E. pmap with a caching pool reused across all calls, timing calls with already-cached closures:
results = 7×3 Named Array{Float64,2}
referenced array length ╲ pmap range length │    100    1000   10000
────────────────────────────────────────────┼───────────────────────
10^0 element Array{Float64,1}               │ 0.0069  0.0678  0.7176
10^1 element Array{Float64,1}               │ 0.0069  0.0727  0.6985
10^2 element Array{Float64,1}               │ 0.0077  0.0664  0.7229
10^3 element Array{Float64,1}               │ 0.0075  0.0666  0.7206
10^4 element Array{Float64,1}               │ 0.0071  0.0674  0.7353
10^5 element Array{Float64,1}               │ 0.0078  0.0692  0.7293
10^6 element Array{Float64,1}               │  0.007  0.0687  0.7237

Each row is for a given length of the array referenced by a closure/named function.

Each column is the length of the range over which pmap is executed.

The fastest, of course, is when the mapping function is a named function referring to global constants; these are the timings in block B.

The slowest is closures referencing local arrays (block C).

Closures referencing global arrays (block A) perform better, in fact comparably to named functions (block B) at smaller data sizes and range lengths, but there is a slowdown as the pmap range length and the referenced array length increase. The same behavior is seen with closures referencing local arrays (block C).

Using a CachingPool to cache closures does help (blocks D and E). The timings now depend only on the pmap range length and not on the data referenced by the closure. There also does not appear to be any appreciable overhead from the CachingPool at smaller sizes.

If a worker pool is unspecified, the default worker pool is used. The default pool is elastic in nature, i.e., at any time it consists of all workers currently available, and workers added while a pmap is active are automatically included in the pool. We need to add an option that makes CachingPool elastic before we can default to using it for pmap.
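
A small sketch of that difference under the current behavior (worker counts are illustrative):

    using Distributed

    addprocs(2)
    pool = CachingPool(workers())   # snapshots the two workers available right now

    addprocs(2)                     # these later workers are not part of `pool`
    pmap(pool, x -> x^2, 1:100)     # runs only on the original two workers
    pmap(x -> x^2, 1:100)           # default (elastic) pool: uses all four workers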
