Julia: Proposal: allow for immutable structures that don't get passed to each batch of a pmap.

Created on 18 May 2017 · 20 comments · Source: JuliaLang/julia

Consider the following MWE:

function dosomethingwitharr(bigarray, n)
    l = floor(Int, abs(sin(n)) * (n-1)) + 1
    return bigarray'[l,:]
end
function doparallel(bigarray, inds)
    inc = @parallel (+) for i in inds
        dosomethingwitharr(bigarray, i)
    end
    return inc
end

function dopmap(bigarray, inds)
    inc = sum(pmap((s) -> begin
        v = dosomethingwitharr(bigarray, s)
        return v
    end, inds))
    return inc
end

I get the following results with -p4:

julia> @everywhere include("testpar.jl")

julia> bigarr = rand(5000,5000);

julia> @time y = doparallel(bigarr, 1:1000);
 80.115457 seconds (256.56 k allocations: 14.034 MiB)

julia> @time z = dopmap(bigarr, 1:1000);
197.356092 seconds (956.73 k allocations: 49.034 MiB, 0.05% gc time)

julia> all(isapprox.(y, z))
true

My guess is that bigarr is being passed on each iteration of pmap but only once with @parallel, though I don't know why that should be the case.

Also: if dosomethingwitharr() allocates Float64s on the order of size(bigarr, 1), the performance difference grows to roughly 10x instead of ~3x.
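The capture can be made visible with plain serialization: a closure over a large matrix serializes to roughly the size of the matrix, which is what pmap ships on each remote call. A minimal sketch (the matrix and helper names here are illustrative, not from the MWE):

```julia
using Serialization

# Hypothetical stand-in for bigarr: ~8 MB of Float64s.
big = rand(1000, 1000)

# Closures capture local variables, so build one that captures the matrix.
make_closure(A) = s -> A[s, 1]
capturing = make_closure(big)   # carries the whole matrix with it
plain     = s -> s + 1          # captures nothing

# pmap serializes the mapped function for each remote call/batch;
# measuring the serialized size shows what gets shipped each time.
function serialized_size(f)
    io = IOBuffer()
    serialize(io, f)
    return length(take!(io))
end

println(serialized_size(capturing))  # on the order of 8 MB
println(serialized_size(plain))      # a few hundred bytes
```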

parallel


All 20 comments

Following @ChrisRackauckas' suggestion to add type assertions, I rewrote the code a bit:

function dosomethingwitharr(bigarray::AbstractMatrix{Float64}, n::Int)::Vector{Float64}
    l = floor(Int, abs(sin(n)) * (n-1)) + 1
    return bigarray'[l,:]
end
function doparallel(bigarray::AbstractMatrix{Float64}, inds::AbstractVector{Int})::Vector{Float64}
    inc = @parallel (+) for i in inds
        dosomethingwitharr(bigarray, i)
    end
    return inc
end

function dopmap(bigarray::AbstractMatrix{Float64}, inds::AbstractVector{Int})::Vector{Float64}
    inc = sum(pmap((s) -> begin
        v = dosomethingwitharr(bigarray, s)
        return v
    end, inds))
    return inc
end

but it didn't help:

julia> @time y = doparallel(bigarr, 1:1000);
 90.298477 seconds (306.32 k allocations: 16.782 MiB)

julia> @time z = dopmap(bigarr, 1:1000);
205.351537 seconds (967.17 k allocations: 125.852 MiB, 0.12% gc time)

julia> all(isapprox.(y,z))
true

You get

julia> all(isapprox.(y, z))
true

so the results are consistent, right? Is it the difference in @time that worries you? The two constructs are pretty different and that is explained in https://docs.julialang.org/en/stable/manual/parallel-computing/#parallel-map-and-loops.

The results are consistent (which is good). The differences in @time are concerning. I understand that pmap should be preferred to @parallel. How do I make equivalent (performant) code?

One more data point: to remove all @code_warntype warnings about Any, I type asserted inc. Here are the results:

julia> @time y = doparallel(bigarr, 1:1000);
 86.938445 seconds (306.63 k allocations: 16.927 MiB)

julia> @time z = dopmap(bigarr, 1:1000);
218.840166 seconds (967.51 k allocations: 126.146 MiB, 0.10% gc time)

julia> @code_warntype doparallel(bigarr, 1:1000);
Variables:
  #self#::#doparallel
  bigarray::Array{Float64,2}
  inds::UnitRange{Int64}
  inc::Array{Float64,1}
  #7::##7#8{Array{Float64,2}}

Body:
  begin
      # meta: location distributed/macros.jl # line 157:
      #7::##7#8{Array{Float64,2}} = $(Expr(:new, ##7#8{Array{Float64,2}}, :(bigarray)))
      # meta: pop location
      SSAValue(2) = $(Expr(:invoke, MethodInstance for preduce(::Function, ::Function, ::UnitRange{Int64}), :(Base.Distributed.preduce), :(Main.+), :(#7), :(inds)))
      inc::Array{Float64,1} = (Core.typeassert)((Base.convert)(Array{Float64,1}, SSAValue(2))::Any, Array{Float64,1})::Array{Float64,1} # line 9:
      return inc::Array{Float64,1}
  end::Array{Float64,1}

julia> @code_warntype dopmap(bigarr, 1:1000);
Variables:
  #self#::#dopmap
  bigarray::Array{Float64,2}
  inds::UnitRange{Int64}
  inc::Array{Float64,1}
  #9::##9#10{Array{Float64,2}}

Body:
  begin
      #9::##9#10{Array{Float64,2}} = $(Expr(:new, ##9#10{Array{Float64,2}}, :(bigarray)))
      SSAValue(2) = $(Expr(:invoke, MethodInstance for #pmap#213(::Array{Any,1}, ::Function, ::Function, ::UnitRange{Int64}), :(Base.Distributed.#pmap#213), :($(Expr(:foreigncall, :(:jl_alloc_array_1d), Array{Any,1}, svec(Any, Int64), Array{Any,1}, 0, 0, 0))), :(Main.pmap), :(#9), :(inds)))
      SSAValue(3) = (Main.sum)(SSAValue(2))::Any
      inc::Array{Float64,1} = (Core.typeassert)((Base.convert)(Array{Float64,1}, SSAValue(3))::Any, Array{Float64,1})::Array{Float64,1} # line 17:
      return inc::Array{Float64,1}
  end::Array{Float64,1}

julia> all(isapprox.(y,z))
true

I suggested @sbromberger use pmap because in his actual case the iterations are much longer. But even then (the case where pmap should do better, and in many ways it has better behavior thanks to its work sharing) it still has this issue, which we have boiled down to the closure enclosing a large data structure. At least going by the description of pmap in the manual, it should be faster when iteration times get large, even if a large piece of data is sent.

So it looks like pmap is re-sending the data structure (bigarray) with each batch. When I increase the batch size to div(n, nprocs()-1), I get equivalent performance between pmap and @parallel:

julia> @time z = dopmap(bigarr, 1:1000; batch_size=(div(1000, (nprocs()-1))));
 90.059483 seconds (29.89 k allocations: 77.633 MiB, 0.11% gc time)

But this obviously negates any advantage of using pmap over @parallel, and the @parallel code is easier to read anyway.

I found that using a CachingPool with pmap (https://docs.julialang.org/en/latest/stdlib/parallel.html#Base.Distributed.CachingPool) with your MWE returns slightly better times than @parallel.
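For readers landing here later, the suggestion looks roughly like this on current Julia (≥ 1.0), where Distributed is a stdlib and pmap takes the pool as its second argument; bigarr here is a small stand-in for the real data:

```julia
using Distributed
addprocs(2)

bigarr = rand(2000, 2000)  # stand-in for the large matrix

# A CachingPool ships each closure (and the data it references) to a
# worker once and reuses the cached copy on subsequent calls, instead
# of re-serializing it for every batch.
wp = CachingPool(workers())
z = pmap(s -> sum(bigarr[s, :]), wp, 1:100)
```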

I found using a CachingPool with pmap

Is there a reason not to use a CachingPool by default?

With caching pool:

julia> @time y = doparallel(bigarr, 1:500);
 45.159037 seconds (306.62 k allocations: 16.797 MiB)

julia> @time z = dopmap(bigarr, 1:500; batch_size=1);
 41.522238 seconds (162.17 k allocations: 45.769 MiB, 0.07% gc time)

julia> @time z = dopmap(bigarr, 1:500; batch_size=(div(500, (nprocs()-1))));
 48.243192 seconds (240.77 k allocations: 55.799 MiB, 0.05% gc time)

Proposal: create a CachingPool if one isn't already passed in to pmap.

julia> @time pmap(identity, 1:10^4);
  0.599005 seconds (927.26 k allocations: 27.170 MiB, 1.52% gc time)

julia> @time pmap(cp, identity, 1:10^4);
  0.781404 seconds (1.17 M allocations: 33.237 MiB, 1.18% gc time)

The CachingPool introduces a slight overhead as seen above which is offset only when the closure is capturing a large amount of data.

It seems to me that most parallel operations in the real world will involve non-trivial data transfer. I propose to make CachingPool the default, overridable via a pmap option.

Edit: especially if we're deprecating @parallel.

The CachingPool introduces a slight overhead as seen above which is offset only when the closure is capturing a large amount of data.

If that's a worst-case scenario, then it's still not bad. pmap is supposed to be used for expensive parallel calculations, so worrying about a small slowdown in the essentially zero-calculation case is a non-issue. If anything, it should cache by default and have a keyword to turn that off.
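A sketch of what "cache by default" could look like as a user-side wrapper (cached_pmap is a hypothetical name, not an existing API):

```julia
using Distributed

# Hypothetical wrapper: run pmap over a CachingPool unless the
# caller opts out via the keyword.
function cached_pmap(f, c...; cache::Bool = true, kwargs...)
    pool = cache ? CachingPool(workers()) : default_worker_pool()
    return pmap(f, pool, c...; kwargs...)
end
```

Note that a real implementation would reuse one pool across calls; creating a fresh CachingPool per call, as this sketch does, discards the cache between pmap invocations.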

oops.

I suggest we close this issue and open a new one to discuss whether a CachingPool should be the default for pmap.

Thanks @amitmurthy - I'll do that.

Just an update: it turns out that, even with CachingPool, @parallel still beats pmap in my real-world code. I'm now fiddling with batch sizes to try to optimize, but this is a bit of strange behavior.

Not surprising - if the workload is balanced, @parallel will result in the minimum number of remote calls. pmap is more appropriate for larger unbalanced workloads.

@amitmurthy thanks. I managed to get slightly better performance with pmap when batch_size is set to max(1, div(nloop, nworkers() / 2)). I'll keep experimenting.

@sbromberger Is batch_size=max(1, div(nloop, nworkers() / 2)) giving better results than batch_size=max(1, div(nloop, nworkers()))?
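For concreteness, the two heuristics give quite different batch sizes; the numbers below are illustrative, assuming nloop = 1000 and 4 workers (with integer division throughout, so batch_size stays an Int):

```julia
# Comparing the two batch-size heuristics discussed above.
nloop, nw = 1000, 4
b_half = max(1, nloop ÷ (nw ÷ 2))  # 1000 ÷ 2 = 500 → only two batches in total
b_full = max(1, nloop ÷ nw)        # 1000 ÷ 4 = 250 → one batch per worker
println((b_half, b_full))
```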

