Julia: Feature request: a work stealing threaded for loop

Created on 14 Mar 2017 · 11 Comments · Source: JuliaLang/julia

The current @threads for loop breaks the work into equal-length chunks, one per thread, and runs them on the available pooled threads. This could be improved with a work-stealing technique like the one used by Rust's Rayon, in which the work to be done is queued and threads that have some free time pop work from this queue.
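To make "equal-length chunks" concrete, here is a minimal sketch of how the Julia 0.6 @threads loop assigns iterations to threads. It is reconstructed from the len/rem/f/l computation in the lowered code quoted later in this thread. The helper name static_chunk is hypothetical; this is not a Base function.

```julia
# Hypothetical helper: which slice of 1:lenr does thread `tid` (1-based) run
# when the loop is split statically across `nthreads` threads?
function static_chunk(lenr::Int, nthreads::Int, tid::Int)
    len, rem = divrem(lenr, nthreads)    # base chunk length and leftover items
    if len == 0
        rem < tid && return 1:0          # more threads than items: this thread idles
        len, rem = 1, 0
    end
    f = 1 + (tid - 1) * len              # first index of this thread's chunk
    l = f + len - 1                      # last index of this thread's chunk
    if rem > 0                           # the first `rem` threads take one extra item
        if tid <= rem
            f += tid - 1
            l += tid
        else
            f += rem
            l += rem
        end
    end
    return f:l
end

# 10 iterations on 4 threads: chunks 1:3, 4:6, 7:8, 9:10
for tid in 1:4
    println(tid, " => ", static_chunk(10, 4, tid))
end
```

The assignment is fixed before the loop starts, so if the iterations in one chunk are expensive, the threads owning the other chunks have nothing left to do once they finish.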

Work stealing can result in faster parallel execution due to these significant advantages:

1) More efficient cache utilization: all threads work on nearby memory locations. Unlike the current implementation, where memory is divided into large chunks, with work stealing each thread works on a single item at a time.

2) Load balancing: when an item takes longer to process, the other threads do not wait for it to finish; instead, they keep stealing work. In the current implementation, one thread can finish its chunk significantly faster than the others and then sit idle until all of them are done.
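A Rayon-style parallel loop is typically built on recursive splitting: halve the range, hand one half to another task, and keep going until the pieces are small. The sketch below does this with Threads.@spawn, assuming Julia 1.3 or later. It relies on Julia's own task scheduler to place the spawned tasks on free threads; that scheduler is not necessarily implemented as Rayon-style per-thread work-stealing deques, so treat this as an illustration of the splitting pattern only. The names parallel_foreach and basesize are hypothetical.

```julia
# Hypothetical divide-and-conquer parallel loop (requires Julia >= 1.3).
# Ranges longer than `basesize` are split in half; the left half runs in a
# spawned task while the current task handles the right half.
function parallel_foreach(f, r::UnitRange{Int}; basesize::Int = 64)
    if length(r) <= basesize
        foreach(f, r)                    # small enough: run serially
        return nothing
    end
    mid = first(r) + length(r) ÷ 2 - 1
    t = Threads.@spawn parallel_foreach(f, first(r):mid; basesize = basesize)
    parallel_foreach(f, (mid + 1):last(r); basesize = basesize)
    wait(t)                              # join: both halves must be finished
    return nothing
end

out = zeros(Int, 1000)
parallel_foreach(i -> (out[i] = i^2), 1:1000; basesize = 50)
```

Because the work is split into many small tasks instead of one chunk per thread, a thread that finishes early can pick up another pending piece rather than waiting.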

I haven't written an implementation yet; I just wanted to start a discussion.

multithreading

All 11 comments

The current implementation can be used to implement this, of course; see ImageFiltering, which queues up a task list and then uses @threads to iterate over the list, each thread grabbing the next task.
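The pattern described above can be sketched as follows: launch one @threads iteration per thread, and have each one repeatedly claim the next unclaimed item from a shared atomic counter until the list is exhausted. This is dynamic self-scheduling through a single shared counter, not per-thread work-stealing deques, and it is not ImageFiltering's actual code. The function name process_tasks! is hypothetical; Julia 1.x is assumed.

```julia
# Hypothetical sketch: each thread grabs the next task index from a shared counter.
function process_tasks!(results, tasks, work)
    counter = Threads.Atomic{Int}(1)              # index of the next unclaimed task
    Threads.@threads for w in 1:Threads.nthreads()
        while true
            k = Threads.atomic_add!(counter, 1)   # returns the value before the add
            k > length(tasks) && break            # list exhausted: this worker stops
            results[k] = work(tasks[k])           # each k is claimed exactly once
        end
    end
    return results
end

tasks = collect(1:100)
results = process_tasks!(zeros(Int, 100), tasks, x -> 2x)
```

A thread that draws cheap items simply claims more of them, which gives the load-balancing benefit without changing how @threads itself schedules.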

This is on our roadmap for multithreading.

Admittedly it's not obvious, but this is implied by issues such as #14494 and #18335. We could use some refactoring of the issues on this topic.

Standard work stealing does not improve cache efficiency; on the contrary. You need some very clever locality-aware tricks to get that. See "The Data Locality of Work Stealing", for instance.

Load balancing, for sure.

But I'm hoping we'll get the best of all worlds with PDF scheduling. Coming soon!

This sounds like cool work!
@JeffBezanson, would it be okay if I opened a post on Discourse about a general roadmap/to-do list for the future design of multithreading?
Some of us on Gitter today were wondering whether there was a handy reference for the roadmap you mention.
It could help us keep up with the developing ideas.
Thanks!

(@kpamnany, thanks for sharing the parallel depth-first (PDF) scheduling paper! If you can recommend additional material along those lines, please share!)

I did some tests with nested threaded loops, and performance is severely degraded. Here are the code and the benchmark results. The code is taken from the tests of the safe nested threading fix.

Edit: I assumed that 0.6-rc2 includes the "band aid". Perhaps not?

versioninfo()
Julia Version 0.6.0-rc2.0
Commit 68e911be53* (2017-05-18 02:31 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i5-4570 CPU @ 3.20GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, haswell)
function testnestedloop()
  a = zeros(Int, 10000, 10000)
  Threads.@threads for i in 1:10000
    Threads.@threads for j in 1:10000
      a[j, i] = i + j
    end
  end
  return a
end

function testloop()
  a = zeros(Int, 10000, 10000)
  for i in 1:10000
    for j in 1:10000
      a[j, i] = i + j
    end
  end
  return a
end
using BenchmarkTools
@benchmark testnestedloop()
BenchmarkTools.Trial: 
  memory estimate:  5.05 GiB
  allocs estimate:  236655143
  --------------
  minimum time:     13.538 s (7.93% GC)
  median time:      13.538 s (7.93% GC)
  mean time:        13.538 s (7.93% GC)
  maximum time:     13.538 s (7.93% GC)
  --------------
  samples:          1
  evals/sample:     1

vs.

@benchmark testloop()
BenchmarkTools.Trial: 
  memory estimate:  762.94 MiB
  allocs estimate:  2
  --------------
  minimum time:     155.824 ms (1.54% GC)
  median time:      163.314 ms (1.47% GC)
  mean time:        170.175 ms (5.50% GC)
  maximum time:     233.299 ms (31.00% GC)
  --------------
  samples:          30
  evals/sample:     1

Removing the inner Threads.@threads brings the performance back.

julia> function testnestedloop()
           a = zeros(Int, 10000, 10000)
           Threads.@threads for i in 1:10000
               for j in 1:10000
                   a[j, i] = i + j
               end
           end
           return a
       end
testnestedloop (generic function with 1 method)

julia> @benchmark testnestedloop()
BenchmarkTools.Trial: 
  memory estimate:  762.94 MiB
  allocs estimate:  3
  --------------
  minimum time:     149.190 ms (0.00% GC)
  median time:      157.355 ms (0.00% GC)
  mean time:        171.763 ms (8.29% GC)
  maximum time:     261.659 ms (33.20% GC)
  --------------
  samples:          30
  evals/sample:     1

The band-aid is meant to make it work, not necessarily to make it fast. It won't be faster than a normal loop, and it still has all the other problems (e.g. closure lowering). The slowdown has nothing to do with scheduling, though. Given the large number of allocations, it is due to https://github.com/JuliaLang/julia/issues/15276 and can be worked around in a similar way.

I just checked locally, and that's indeed the case. The lowered AST for the outer loop body is:

Variables:
  #self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}
  onethread::Bool
  i@_3::Core.Box
  i@_4::Int64
  range::Core.Box
  threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}}
  #temp#@_7::Int64
  r@_8::UnitRange{Int64}
  lenr::Int64
  tid::Int64
  #temp#@_11::Int64
  len::Int64
  rem::Int64
  f::Int64
  l::Int64
  #temp#@_16::Bool
  z@_17::Int64
  b@_18::Bool
  #temp#@_19::Int64
  z@_20::Int64
  b@_21::Bool
  #temp#@_22::Int64
  r@_23::Int64
  ret::Int64

Body:
  begin
      NewvarNode(:(tid::Int64))
      NewvarNode(:(len::Int64))
      NewvarNode(:(rem::Int64))
      NewvarNode(:(f::Int64))
      NewvarNode(:(l::Int64))
      r@_8::UnitRange{Int64} = (Core.getfield)(#self#::##2#threadsfor_fun#1{Array{Int64,2},UnitRange{Int64}}, Symbol("#1#range"))::UnitRange{Int64}
      #= line 31 =#
      $(Expr(:inbounds, false))
      # meta: location range.jl length 393
      # meta: location checked.jl checked_sub 221
      SSAValue(9) = (Base.Checked.checked_ssub_int)((Core.getfield)(r@_8::UnitRange{Int64}, :stop)::Int64, (Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64)::Tuple{Int64,Bool}
      SSAValue(12) = (Base.getfield)(SSAValue(9), 1)::Int64
      SSAValue(14) = (Base.getfield)(SSAValue(9), 2)::Bool
      #= line 222 =#
      unless SSAValue(14) goto 17
      (Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
      17: 
      # meta: pop location
      # meta: location checked.jl checked_add 164
      SSAValue(16) = (Base.Checked.checked_sadd_int)(SSAValue(12), 1)::Tuple{Int64,Bool}
      SSAValue(19) = (Base.getfield)(SSAValue(16), 1)::Int64
      SSAValue(21) = (Base.getfield)(SSAValue(16), 2)::Bool
      #= line 165 =#
      unless SSAValue(21) goto 26
      (Base.Checked.throw)($(QuoteNode(OverflowError())))::Union{}
      26: 
      # meta: pop location
      # meta: pop location
      $(Expr(:inbounds, :pop))
      #= line 33 =#
      unless onethread::Bool goto 38
      #= line 34 =#
      tid::Int64 = 1
      #= line 35 =#
      len::Int64 = SSAValue(19)
      rem::Int64 = 0
      goto 60
      38: 
      #= line 37 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl threadid 10
      SSAValue(23) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      tid::Int64 = (Base.add_int)((Base.sext_int)(Int64, SSAValue(23))::Int64, 1)::Int64
      #= line 38 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl nthreads 19
      SSAValue(24) = (Base.Threads.cglobal)(:jl_n_threads, Base.Threads.Cint)::Ptr{Int32}
      SSAValue(25) = (Base.pointerref)(SSAValue(24), 1, 1)::Int32
      # meta: pop location
      $(Expr(:inbounds, :pop))
      SSAValue(26) = (Base.sext_int)(Int64, SSAValue(25))::Int64
      SSAValue(30) = (Base.checked_sdiv_int)(SSAValue(19), SSAValue(26))::Int64
      SSAValue(31) = (Base.checked_srem_int)(SSAValue(19), SSAValue(26))::Int64
      SSAValue(32) = (Base.add_int)(1, 1)::Int64
      len::Int64 = SSAValue(30)
      SSAValue(33) = (Base.add_int)(2, 1)::Int64
      rem::Int64 = SSAValue(31)
      60: 
      #= line 41 =#
      unless (len::Int64 === 0)::Bool goto 71
      #= line 42 =#
      unless (Base.slt_int)(rem::Int64, tid::Int64)::Bool goto 67
      #= line 43 =#
      return
      67: 
      #= line 45 =#
      len::Int64 = 1
      rem::Int64 = 0
      71: 
      #= line 48 =#
      f::Int64 = (Base.add_int)(1, (Base.mul_int)((Base.sub_int)(tid::Int64, 1)::Int64, len::Int64)::Int64)::Int64
      #= line 49 =#
      l::Int64 = (Base.sub_int)((Base.add_int)(f::Int64, len::Int64)::Int64, 1)::Int64
      #= line 51 =#
      unless (Base.slt_int)(0, rem::Int64)::Bool goto 90
      #= line 52 =#
      unless (Base.sle_int)(tid::Int64, rem::Int64)::Bool goto 85
      #= line 53 =#
      f::Int64 = (Base.add_int)(f::Int64, (Base.sub_int)(tid::Int64, 1)::Int64)::Int64
      #= line 54 =#
      l::Int64 = (Base.add_int)(l::Int64, tid::Int64)::Int64
      goto 90
      85: 
      #= line 56 =#
      f::Int64 = (Base.add_int)(f::Int64, rem::Int64)::Int64
      #= line 57 =#
      l::Int64 = (Base.add_int)(l::Int64, rem::Int64)::Int64
      90: 
      #= line 61 =#
      SSAValue(34) = f::Int64
      SSAValue(35) = (Base.select_value)((Base.sle_int)(f::Int64, l::Int64)::Bool, l::Int64, (Base.sub_int)(f::Int64, 1)::Int64)::Int64
      #temp#@_7::Int64 = SSAValue(34)
      95: 
      unless (Base.not_int)((#temp#@_7::Int64 === (Base.add_int)(SSAValue(35), 1)::Int64)::Bool)::Bool goto 166
      i@_3::Core.Box = $(Expr(:new, :(Core.Box)))
      range::Core.Box = $(Expr(:new, :(Core.Box)))
      SSAValue(36) = #temp#@_7::Int64
      SSAValue(37) = (Base.add_int)(#temp#@_7::Int64, 1)::Int64
      i@_4::Int64 = SSAValue(36)
      #temp#@_7::Int64 = SSAValue(37)
      #= line 62 =#
      $(Expr(:inbounds, false))
      # meta: location abstractarray.jl unsafe_getindex 884
      $(Expr(:inbounds, true))
      SSAValue(27) = i@_4::Int64
      $(Expr(:inbounds, false))
      # meta: location range.jl getindex 476
      ret::Int64 = (Base.sub_int)((Base.add_int)((Core.getfield)(r@_8::UnitRange{Int64}, :start)::Int64, SSAValue(27))::Int64, 1)::Int64
      #= line 477 =#
      112: 
      113: 
      # meta: pop location
      $(Expr(:inbounds, :pop))
      $(Expr(:inbounds, :pop))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      (Core.setfield!)(i@_3::Core.Box, :contents, ret::Int64)::Int64
      #= line 63 =#
      # meta: location REPL[1]
      #= line 4 =#
      # meta: location threadingconstructs.jl
      #= line 28 =#
      SSAValue(6) = $(Expr(:new, UnitRange{Int64}, 1, :((Base.select_value)((Base.sle_int)(1, 10000)::Bool, 10000, (Base.sub_int)(1, 1)::Int64)::Int64)))
      (Core.setfield!)(range::Core.Box, :contents, SSAValue(6))::UnitRange{Int64}
      #= line 29 =#
      threadsfor_fun::##14#threadsfor_fun#2{Array{Int64,2}} = $(Expr(:new, ##14#threadsfor_fun#2{Array{Int64,2}}, :(i@_3), :(range), :((Core.getfield)(#self#, :a)::Array{Int64,2})))
      #= line 67 =#
      $(Expr(:inbounds, false))
      # meta: location threadingconstructs.jl threadid 10
      SSAValue(29) = $(Expr(:foreigncall, :(:jl_threadid), Int16, svec()))
      # meta: pop location
      $(Expr(:inbounds, :pop))
      SSAValue(7) = (Base.not_int)(((Base.add_int)((Base.sext_int)(Int64, SSAValue(29))::Int64, 1)::Int64 === 1)::Bool)::Bool
      unless SSAValue(7) goto 139
      #temp#@_16::Bool = SSAValue(7)
      goto 141
      139:
      #temp#@_16::Bool = (Core.getfield)(Base.Threads.in_threaded_loop, :x)::Bool
      141:
      unless #temp#@_16::Bool goto 146
      #= line 69 =#
      $(Expr(:invoke, MethodInstance for (::##14#threadsfor_fun#2{Array{Int64,2}})(::Bool), :(threadsfor_fun), true))::Void
      goto 161
      146:
      #= line 71 =#
      $(Expr(:inbounds, false))
      # meta: location refpointer.jl setindex! 121
      (Core.setfield!)(Base.Threads.in_threaded_loop, :x, true)::Bool
      # meta: pop location
      $(Expr(:inbounds, :pop))
      #= line 73 =#
      $(Expr(:foreigncall, :(:jl_threading_run), Ref{Void}, svec(Any), :(threadsfor_fun), 0))
      #= line 74 =#
      $(Expr(:inbounds, false))
      # meta: location refpointer.jl setindex! 121
      (Core.setfield!)(Base.Threads.in_threaded_loop, :x, false)::Bool
      # meta: pop location
      $(Expr(:inbounds, :pop))
      161:
      # meta: pop location
      # meta: pop location
      164:
      goto 95
      166:
      return
  end::Void

Note that range and i are boxed (they have type Core.Box in the variable list above).
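To illustrate what "boxed" means, here is a minimal, standalone sketch (Julia 1.x assumed; the function names are made up for illustration). A variable that a closure captures and that is reassigned afterwards must be shared by reference, so lowering stores it in a Core.Box, an untyped heap-allocated container. That costs an allocation and defeats type inference on every access. Rebinding the value to a fresh variable that is assigned exactly once avoids the box.

```julia
# Hypothetical example: `x` is reassigned after the closure captures it,
# so lowering places it in a Core.Box shared by the function and the closure.
function boxed_closure()
    x = 1
    f = () -> x
    x = 2            # reassignment after capture forces `x` into a Core.Box
    return f
end

# Hypothetical example: the closure captures `y`, which is assigned exactly once,
# so it is stored as a plain, concretely typed field.
function unboxed_closure()
    x = 1
    x = 2
    let y = x
        return () -> y
    end
end

boxed_closure()()                          # returns 2: the closure sees the update
fieldtype(typeof(boxed_closure()), :x)     # Core.Box
fieldtype(typeof(unboxed_closure()), :y)   # Int (Int64 on 64-bit systems)
```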

Thanks for the feedback and info!

I wasn't expecting faster loops with nested threading, but was surprised by the results and was wondering how to solve that. :)
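One common way to work around #15276 in the nested loop above is a function barrier: move the inner threaded loop into its own function. Then i arrives as an argument instead of being a reassigned variable captured by the inner closure, and the inner macro's own locals (such as its range) are created fresh in each call instead of being reassigned inside the outer loop body. This is a sketch assuming Julia 1.3 or later, where nested @threads is allowed (on some versions the inner loop may simply run serially). The names fill_column! and testnestedloop_barrier are hypothetical, and it does not make the nested version faster than the plain loop.

```julia
# Hypothetical function barrier: the inner threaded loop gets its own function,
# so nothing it captures is reassigned by the outer loop.
function fill_column!(a, i)
    Threads.@threads for j in axes(a, 1)
        a[j, i] = i + j
    end
    return nothing
end

function testnestedloop_barrier(n = 10_000)
    a = zeros(Int, n, n)
    Threads.@threads for i in 1:n
        fill_column!(a, i)               # i is passed by value, never boxed
    end
    return a
end

a = testnestedloop_barrier(40)
```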

Should we close this in favour of the more recent #32207?

FYI, FLoops.jl has a parallel for loop with load balancing, a very flexible reduction syntax, and an extensible executor infrastructure.
