The current `@threads` for loop breaks the work into equal-length chunks and runs them on the available pooled threads. This could be improved by using a work-stealing technique like the one in Rust's Rayon, in which the work to be done is queued and threads with free time pop work from this queue.
Work stealing can result in faster parallel execution due to two significant advantages:
1) More efficient cache utilization: all threads use similar memory locations. Unlike the current implementation, where memory is divided into large chunks, with work stealing each thread works on a single item at a time.
2) Load balancing: when an item takes more time to process, the other threads do not wait for it to finish; instead they continue stealing work. In the current implementation one thread can finish its chunk significantly faster than the others and then sit idle waiting for the rest to finish, instead of working.
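As a rough illustration of point 2, the fixed chunking could be replaced by a dynamic scheme in which threads atomically claim one item at a time from a shared counter. This is a hypothetical sketch (`dynamic_foreach` is a made-up name), and it is self-scheduling through a shared counter rather than true per-thread-deque work stealing as in Rayon:

```julia
using Base.Threads

# Hypothetical sketch: instead of pre-assigning equal-length chunks,
# each thread repeatedly claims the next unprocessed index from a
# shared atomic counter, so one slow item never stalls the others.
function dynamic_foreach(f, n::Integer)
    next = Atomic{Int}(1)
    @threads for _ in 1:nthreads()
        while true
            i = atomic_add!(next, 1)  # returns the old value, then increments
            i > n && break
            f(i)
        end
    end
end
```

In real work stealing each thread has its own deque and idle threads steal from the tails of busy ones; this sketch only illustrates the load-balancing behaviour, not that structure.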
Haven't written an implementation yet, just wanted to start a discussion
The current implementation can be used to implement this, of course; see ImageFiltering, which queues up a task list and then uses `@threads` to iterate over the list, each thread grabbing the next task.
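That pattern might look roughly like the following sketch. This is an assumption about the approach described, not ImageFiltering's actual code, and it relies on `Channel` being safe to drain from multiple threads (Julia ≥ 1.3):

```julia
using Base.Threads

# Hypothetical sketch of the queued-task-list pattern: fill a Channel
# with all tasks up front, then let every thread drain it, each one
# grabbing whichever task comes next.
function run_task_list(f, tasks)
    queue = Channel{eltype(tasks)}(length(tasks))
    foreach(t -> put!(queue, t), tasks)
    close(queue)  # iteration below terminates once the queue is empty
    @threads for _ in 1:nthreads()
        for t in queue  # take!s tasks until the channel is drained
            f(t)
        end
    end
end
```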
This is on our roadmap for multithreading.
Admittedly it's not obvious, but this is implied by issues such as #14494 and #18335. We could use some refactoring of the issues on this topic.
Standard work stealing does not improve cache efficiency; on the contrary, you need some very clever tricks for locality awareness to get that. See "The Data Locality of Work Stealing", for instance.
Load balancing, for sure.
But I'm hoping we'll get the best of all worlds with PDF scheduling. Coming soon!
This sounds like cool work!
@JeffBezanson, would it be in order if I opened up a post on a general roadmap/to-do on Discourse with regards to future design of multithreading?
Some of us on Gitter today were wondering if there is a handy reference for the roadmap you mention.
It would help us keep up with the developing ideas.
Thanks!
(@kpamnany, thanks for sharing the parallel depth-first (PDF) scheduling paper! If you would recommend additional material along those lines, please share!)
I did some tests with nested threaded loops and performance is severely degraded. Here's the code and benchmarking; the code is taken from the tests of the fix that made nested threading safe.
Edit: I assumed that 0.6-rc2 includes the "band aid". Perhaps not?
```
versioninfo()

Julia Version 0.6.0-rc2.0
Commit 68e911be53* (2017-05-18 02:31 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i5-4570 CPU @ 3.20GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, haswell)
```
```julia
function testnestedloop()
    a = zeros(Int, 10000, 10000)
    Threads.@threads for i in 1:10000
        Threads.@threads for j in 1:10000
            a[j, i] = i + j
        end
    end
    return a
end

function testloop()
    a = zeros(Int, 10000, 10000)
    for i in 1:10000
        for j in 1:10000
            a[j, i] = i + j
        end
    end
    return a
end
```
```
@benchmark testnestedloop()

BenchmarkTools.Trial:
  memory estimate:  5.05 GiB
  allocs estimate:  236655143
  --------------
  minimum time:     13.538 s (7.93% GC)
  median time:      13.538 s (7.93% GC)
  mean time:        13.538 s (7.93% GC)
  maximum time:     13.538 s (7.93% GC)
  --------------
  samples:          1
  evals/sample:     1
```
vs.
```
@benchmark testloop()

BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  2
  --------------
  minimum time:     155.824 ms (1.54% GC)
  median time:      163.314 ms (1.47% GC)
  mean time:        170.175 ms (5.50% GC)
  maximum time:     233.299 ms (31.00% GC)
  --------------
  samples:          30
  evals/sample:     1
```
Getting rid of one of the `Threads.@threads` brings back the performance:
```julia
function testnestedloop()
    a = zeros(Int, 10000, 10000)
    Threads.@threads for i in 1:10000
        for j in 1:10000
            a[j, i] = i + j
        end
    end
    return a
end
testnestedloop (generic function with 1 method)

julia> @benchmark testnestedloop()
BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  3
  --------------
  minimum time:     149.190 ms (0.00% GC)
  median time:      157.355 ms (0.00% GC)
  mean time:        171.763 ms (8.29% GC)
  maximum time:     261.659 ms (33.20% GC)
  --------------
  samples:          30
  evals/sample:     1
```
The band aid is to make it work, not necessarily to make it fast. It won't be faster than a normal loop and it'll still have all the other problems (e.g. closure lowering). The slowdown has nothing to do with scheduling, though. Given the large number of allocations, this is due to https://github.com/JuliaLang/julia/issues/15276 and can be worked around in a similar way.
Just checked locally and that's indeed the case. The lowered AST for the first loop body is
Variables:
#self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}
onethread::Bool
i@_3::Core.Box
i@_4::Int64
range::Core.Box
threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}}
#temp#@_7::Int64
r@_8::UnitRange{Int64}
lenr::Int64
tid::Int64
#temp#@_11::Int64
len::Int64
rem::Int64
f::Int64
l::Int64
#temp#@_16::Bool
z@_17::Int64
b@_18::Bool
#temp#@_19::Int64
z@_20::Int64
b@_21::Bool
#temp#@_22::Int64
r@_23::Int64
ret::Int64
Body:
begin
NewvarNode(:(tid::Int64))
NewvarNode(:(len::Int64))
NewvarNode(:(rem::Int64))
NewvarNode(:(f::Int64))
NewvarNode(:(l::Int64))
r@_8::UnitRange{Int64} = (Core.getfield)(#self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}, Symbol("#1#range"))::UnitRange{Int64}
#= line 31 =#
$(Expr(:inbounds, false))
# meta: location range.jl length 393
# meta: location checked.jl checked_sub 221
SSAValue(9) = (Base.Checked.checked_ssub_int)((Core.getfield)(r@_8::UnitRange{Int64}, :stop)::Int64, (Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64)::Tuple{Int64,Bool}
SSAValue(12) = (Base.getfield)(SSAValue(9), 1)::Int64
SSAValue(14) = (Base.getfield)(SSAValue(9), 2)::Bool
#= line 222 =#
unless SSAValue(14) goto 17
(Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
17:
# meta: pop location
# meta: location checked.jl checked_add 164
SSAValue(16) = (Base.Checked.checked_sadd_int)(SSAValue(12), 1)::Tuple{Int64,Bool}
SSAValue(19) = (Base.getfield)(SSAValue(16), 1)::Int64
SSAValue(21) = (Base.getfield)(SSAValue(16), 2)::Bool
#= line 165 =#
unless SSAValue(21) goto 26
(Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
26:
# meta: pop location
# meta: pop location
$(Expr(:inbounds, :pop))
#= line 33 =#
unless onethread::Bool goto 38
#= line 34 =#
tid::Int64 = 1
#= line 35 =#
len::Int64 = SSAValue(19)
rem::Int64 = 0
goto 60
38:
#= line 37 =#
$(Expr(:inbounds, false))
# meta: location threadingconstructs.jl threadid 10
SSAValue(23) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
# meta: pop location
$(Expr(:inbounds, :pop))
tid::Int64 = (Base.add_int)((Base.sext_int)(Int64, SSAValue(23))::Int64, 1)::Int64
#= line 38 =#
$(Expr(:inbounds, false))
# meta: location threadingconstructs.jl nthreads 19
SSAValue(24) = (Base.Threads.cglobal)(:jl_n_threads, Base.Threads.Cint)::Ptr{Int32}
SSAValue(25) = (Base.pointerref)(SSAValue(24), 1, 1)::Int32
# meta: pop location
$(Expr(:inbounds, :pop))
SSAValue(26) = (Base.sext_int)(Int64, SSAValue(25))::Int64
SSAValue(30) = (Base.checked_sdiv_int)(SSAValue(19), SSAValue(26))::Int64
SSAValue(31) = (Base.checked_srem_int)(SSAValue(19), SSAValue(26))::Int64
SSAValue(32) = (Base.add_int)(1, 1)::Int64
len::Int64 = SSAValue(30)
SSAValue(33) = (Base.add_int)(2, 1)::Int64
rem::Int64 = SSAValue(31)
60:
#= line 41 =#
unless (len::Int64 === 0)::Bool goto 71
#= line 42 =#
unless (Base.slt_int)(rem::Int64, tid::Int64)::Bool goto 67
#= line 43 =#
return
67:
#= line 45 =#
len::Int64 = 1
rem::Int64 = 0
71:
#= line 48 =#
f::Int64 = (Base.add_int)(1, (Base.mul_int)((Base.sub_int)(tid::Int64, 1)::Int64, len::Int64)::Int64)::Int64
#= line 49 =#
l::Int64 = (Base.sub_int)((Base.add_int)(f::Int64, len::Int64)::Int64, 1)::Int64
#= line 51 =#
unless (Base.slt_int)(0, rem::Int64)::Bool goto 90
#= line 52 =#
unless (Base.sle_int)(tid::Int64, rem::Int64)::Bool goto 85
#= line 53 =#
f::Int64 = (Base.add_int)(f::Int64, (Base.sub_int)(tid::Int64, 1)::Int64)::Int64
#= line 54 =#
l::Int64 = (Base.add_int)(l::Int64, tid::Int64)::Int64
goto 90
85:
#= line 56 =#
f::Int64 = (Base.add_int)(f::Int64, rem::Int64)::Int64
#= line 57 =#
l::Int64 = (Base.add_int)(l::Int64, rem::Int64)::Int64
90:
#= line 61 =#
SSAValue(34) = f::Int64
SSAValue(35) = (Base.select_value)((Base.sle_int)(f::Int64, l::Int64)::Bool, l::Int64, (Base.sub_int)(f::Int64, 1)::Int64)::Int64
#temp#@_7::Int64 = SSAValue(34)
95:
unless (Base.not_int)((#temp#@_7::Int64 === (Base.add_int)(SSAValue(35), 1)::Int64)::Bool)::Bool goto 166
i@_3::Core.Box = $(Expr(:new, :(Core.Box)))
range::Core.Box = $(Expr(:new, :(Core.Box)))
SSAValue(36) = #temp#@_7::Int64
SSAValue(37) = (Base.add_int)(#temp#@_7::Int64, 1)::Int64
i@_4::Int64 = SSAValue(36)
#temp#@_7::Int64 = SSAValue(37)
#= line 62 =#
$(Expr(:inbounds, false))
# meta: location abstractarray.jl unsafe_getindex 884
$(Expr(:inbounds, true))
SSAValue(27) = i@_4::Int64
$(Expr(:inbounds, false))
# meta: location range.jl getindex 476
ret::Int64 = (Base.sub_int)((Base.add_int)((Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64, SSAValue(27))::Int64, 1)::Int64
#= line 477 =#
112:
113:
# meta: pop location
$(Expr(:inbounds, :pop))
$(Expr(:inbounds, :pop))
# meta: pop location
$(Expr(:inbounds, :pop))
(Core.setfield!)(i@_3::Core.Box, :contents, ret::Int64)::Int64
#= line 63 =#
# meta: location REPL[1]
#= line 4 =#
# meta: location threadingconstructs.jl
#= line 28 =#
SSAValue(6) = $(Expr(:new, UnitRange{Int64}, 1, :((Base.select_value)((Base.sle_int)(1, 10000)::Bool, 10000, (Base.sub_int)(1, 1)::Int64)::Int64)))
(Core.setfield!)(range::Core.Box, :contents, SSAValue(6))::UnitRange{Int64}
#= line 29 =#
threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}} = $(Expr(:new, ##14#threadsfor_fun#2{Array{Int64,2}}, :(i@_3), :(range), :((Core.getfield)(#self#, :a)::Array{Int64,2})))
#= line 67 =#
$(Expr(:inbounds, false))
# meta: location threadingconstructs.jl threadid 10
SSAValue(29) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
# meta: pop location
$(Expr(:inbounds, :pop))
SSAValue(7) = (Base.not_int)(((Base.add_int)((Base.sext_int)(Int64, SSAValue(29))::Int64, 1)::Int64 === 1)::Bool)::Bool
unless SSAValue(7) goto 139
#temp#@_16::Bool = SSAValue(7)
goto 141
139:
#temp#@_16::Bool = (Core.getfield)(Base.Threads.in_threaded_loop, :x)::Bool
141:
unless #temp#@_16::Bool goto 146
#= line 69 =#
$(Expr(:invoke, MethodInstance for (::##14#threadsfor_fun#2{Array{Int64,2}})(::Bool), :(threadsfor_fun), true))::Void
goto 161
146:
#= line 71 =#
$(Expr(:inbounds, false))
# meta: location refpointer.jl setindex! 121
(Core.setfield!)(Base.Threads.in_threaded_loop, :x, true)::Bool
# meta: pop location
$(Expr(:inbounds, :pop))
#= line 73 =#
$(Expr(:foreigncall, :(:jl_threading_run), Ref{Void}, svec(Any), :(threadsfor_fun), 0))
#= line 74 =#
$(Expr(:inbounds, false))
# meta: location refpointer.jl setindex! 121
(Core.setfield!)(Base.Threads.in_threaded_loop, :x, false)::Bool
# meta: pop location
$(Expr(:inbounds, :pop))
161:
# meta: pop location
# meta: pop location
164:
goto 95
166:
return
end::Void
Note that `range` and `i` are boxed.
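For reference, the usual workaround for julia#15276 in a case like this is to hoist the captured loop body into its own function, so the index is passed as an argument instead of being captured in a `Core.Box`. A hedged sketch (`fill_column!` is a made-up helper name, not code from this thread):

```julia
# Hypothetical workaround sketch for the boxed-capture slowdown:
# move the inner loop into a separate function so `i` is a plain
# argument rather than a boxed closure capture.
function fill_column!(a, i)
    for j in 1:size(a, 1)
        a[j, i] = i + j
    end
end

function testnestedloop_workaround()
    a = zeros(Int, 10000, 10000)
    Threads.@threads for i in 1:10000
        fill_column!(a, i)
    end
    return a
end
```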
Thanks for the feedback and info!
I wasn't expecting faster loops with nested threading, but I was surprised by the results and was wondering how to solve that. :)
Should we close this in favour of the more recent #32207?
FYI, FLoops.jl has a parallel `for` loop with load balancing, very flexible reduction syntax, and an extensible executor infrastructure.
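For example, a threaded reduction with FLoops might look like the following sketch (based on the FLoops.jl README; assumes the external package is installed):

```julia
using FLoops  # external package: JuliaFolds/FLoops.jl

# Sketch: @floop parallelizes the loop through its executor machinery,
# and @reduce declares a parallel-safe reduction on `s`.
function threaded_sum(xs)
    @floop for x in xs
        @reduce(s += x)
    end
    return s
end
```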