Fsharp: Async.Sleep and Async.Parallel interact in a surprising way

Created on 16 Sep 2019  路  7Comments  路  Source: dotnet/fsharp

Repro steps

async {
            let x = Seq.initInfinite id |> Seq.take 10 |> Seq.toList
            let delay i = 
                async { 
                    //System.Threading.Thread.Sleep (500)
                    do! Async.Sleep (500)
                    printf "%d\r\n" (i)                
                }

            do! x |> List.map delay |> fun x -> Async.Parallel(x,1) |> Async.Ignore
            return 0
        }
    |> Async.RunSynchronously

This prints completely raced things in console.
Everytask runs at once (or sleep is basically ignored/turned to nothing).
What's interesting is Thread.Sleep, works as expected, each task evaluated in turn.
What's more interesting is leaving just

    async {
            printf "%d\r\rn"
    }

This gets evaluated in order.

Expected behavior

Parallel with a limit of 1 only every runs 1 async at a time.

Actual behavior

Parallel seems to start the next Async as soon as it hits an 'async' part.

Known workarounds

None. I wouldn't call sleeping a thread a good workaround :).
For Sequential, we do use our own implementation that uses recursion, but was thinking about switching to use inbuilt functions.

Related information

Provide any related information (optional):

  • Operating system: Windows 10
  • .NET Runtime kind (.NET Core, .NET Framework, Mono): Net Framework 4.6.1 and .Net Core App 2.2 (others untested)
  • Editing Tools (e.g. Visual Studio Version, Visual Studio): Visual Studio 2019
  • FSharp.Core 4.7.0
Area-Library bug

All 7 comments

What is Async.Parallel(x,1)? I don't believe such an override exists in the core library.

https://github.com/dotnet/fsharp/blob/897afd3dad3bfe58a0495713e99a8094098c18c5/src/fsharp/FSharp.Core/async.fsi#L177

Looks like it was added via #6357

Particularly it adds Sequential, which me and a co-worker found in FSharp 4.7 that's come in via nuget.org, published about 1 month ago. So it is super new :)

That looks like a clear bug.

Currently, maxDegreeOfParallelism only seems to limit the amount of worker threads used to _start_ the computations, but continuations scheduled by those have unbound parallelism. I don't think that was the API intented by the RFC. I'd expect behaviour similar to

F# async { use sem = new SemaphoreSlim(maxDegreeOfParallelism) return! Async.Parallel(seq { for c in computations -> async { do! Async.AwaitTask(sem.WaitAsync()) try return! c finally sem.Release() |> ignore } }) }

although queueing the computations when we actually enter the semaphore would be somewhat better.

I don't think that was the API intented by the RFC.

Indeed, this pretty much invalidates its purpose.

Yeh this is clearly broken (async code is not easy to write it seems :disappointed: ). My intention was to limit the number of active threads to the parallel limit. I explciitly didn't just use the semaphore approach above though because that results in Seq.length number of tasks being sent to the thread pool most of which will just be blocked on the semaphore. I was trying to be smarter and copy what the C# parallel libraries do when given a MaxDegreeOfParallelism so that if you passed in a large sequence of tasks you wouldn't end up with hundreds of blocked threads waiting.

I think the current code is mostly salvageable, just need to instead of while looping, incrementing and starting the next async instead do that increment and next start in the continuation functions that were invoking the asnycs with.

@btrepp you can use [0..9] instead of Seq.initInfinite id |> Seq.take 10 |> Seq.toList! :)

Was this page helpful?
0 / 5 - 0 ratings