Repro steps
async {
    let x = Seq.initInfinite id |> Seq.take 10 |> Seq.toList
    let delay i =
        async {
            //System.Threading.Thread.Sleep (500)
            do! Async.Sleep (500)
            printf "%d\r\n" (i)
        }
    do! x |> List.map delay |> fun x -> Async.Parallel(x,1) |> Async.Ignore
    return 0
}
|> Async.RunSynchronously
This prints the numbers to the console in a racy, out-of-order fashion.
Every task runs at once (or the sleep is effectively ignored).
What's interesting is that Thread.Sleep works as expected: each task is evaluated in turn.
What's more interesting is that leaving just
async {
    printf "%d\r\n" i
}
This gets evaluated in order.
Expected behavior
Parallel with a limit of 1 only ever runs one async at a time.
Actual behavior
Parallel seems to start the next async as soon as the current one hits an asynchronous operation (such as Async.Sleep).
Known workarounds
None. I wouldn't call sleeping a thread a good workaround :).
For Sequential, we use our own implementation that uses recursion, but were thinking about switching to the built-in functions.
Related information
What is Async.Parallel(x,1)? I don't believe such an overload exists in the core library.
Looks like it was added via #6357
In particular, it adds Sequential, which a co-worker and I found in the FSharp 4.7 package that came in via nuget.org, published about a month ago. So it is super new :)
That looks like a clear bug.
Currently, maxDegreeOfParallelism only seems to limit the number of worker threads used to _start_ the computations, but the continuations scheduled by those computations have unbounded parallelism. I don't think that was the API intended by the RFC. I'd expect behaviour similar to:
open System.Threading

async {
    use sem = new SemaphoreSlim(maxDegreeOfParallelism)
    return! Async.Parallel(seq {
        for c in computations -> async {
            do! Async.AwaitTask(sem.WaitAsync())
            try return! c finally sem.Release() |> ignore
        }
    })
}
although queueing the computations when we actually enter the semaphore would be somewhat better.
I don't think that was the API intended by the RFC.
Indeed, this pretty much invalidates its purpose.
Yeah, this is clearly broken (async code is not easy to write, it seems :disappointed: ). My intention was to limit the number of active threads to the parallel limit. I explicitly didn't use the semaphore approach above, though, because it results in Seq.length tasks being sent to the thread pool, most of which will just be blocked on the semaphore. I was trying to be smarter and copy what the C# parallel libraries do when given a MaxDegreeOfParallelism, so that if you passed in a large sequence of tasks you wouldn't end up with hundreds of blocked threads waiting.
I think the current code is mostly salvageable: instead of incrementing and starting the next async in a while loop, we need to do that increment and start in the continuation functions that we're invoking the asyncs with.
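To illustrate the idea of only starting the next computation once a previous one has fully completed, here is a minimal sketch written against the public Async API. It is not the FSharp.Core implementation and not necessarily how the fix would look internally; the name parallelThrottled is hypothetical. It starts at most maxDegreeOfParallelism worker asyncs, and each worker claims the next index only after its current computation (including any continuations after an Async.Sleep) has finished, so no more than that many computations are ever in flight and nothing sits blocked on a semaphore.

```fsharp
open System.Threading

// Hypothetical helper for illustration only; not part of FSharp.Core.
let parallelThrottled (maxDegreeOfParallelism: int) (computations: seq<Async<'T>>) : Async<'T[]> =
    if maxDegreeOfParallelism < 1 then
        invalidArg "maxDegreeOfParallelism" "must be at least 1"
    async {
        let work = Seq.toArray computations
        let results = Array.zeroCreate<'T> work.Length
        // Index of the most recently claimed computation; -1 means none yet.
        let next = ref (-1)
        // A worker claims the next unclaimed index, runs that computation to
        // completion, stores the result, and only then claims another one.
        let rec worker () =
            async {
                let i = Interlocked.Increment next
                if i < work.Length then
                    let! r = work.[i]
                    results.[i] <- r
                    return! worker ()
            }
        let workerCount = min maxDegreeOfParallelism work.Length
        do! Async.Parallel [ for _ in 1 .. workerCount -> worker () ] |> Async.Ignore
        return results
    }
```

With this sketch, the repro above would become x |> List.map delay |> parallelThrottled 1 |> Async.Ignore, and the ten delayed prints should then run one after another rather than all at once.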
@btrepp you can use [0..9] instead of Seq.initInfinite id |> Seq.take 10 |> Seq.toList :)