Runtime: Async parallel foreach

Created on 24 Dec 2018 · 74 comments · Source: dotnet/runtime

There are situations where an asynchronous parallel foreach is needed instead of the existing Parallel.ForEach(), and while it's not rocket science to implement yourself, I think it would be a nice feature to have in corefx. This should be seen as the asynchronous alternative to the existing Parallel.ForEach(), and it should take IEnumerable<T> as well as IAsyncEnumerable<T>.

There are examples for this on this issue already; here is one: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/

```c#
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}
```

### Proposed API

```c#
Task ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> asyncBody);

Task ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> asyncBody)
{
    // convert IEnumerable<T> to IAsyncEnumerable<T>
}
```
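For illustration, a call site under this proposal might look like the following (Parallel.ForEachAsync does not exist yet; urls, token, and httpClient are placeholders):

```c#
var options = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = token };

await Parallel.ForEachAsync(urls, options, async (url, ct) =>
{
    // httpClient, urls and token are assumed to exist in the caller's scope
    using var response = await httpClient.GetAsync(url, ct);
    response.EnsureSuccessStatusCode();
});
```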

### Closed questions

While I consider these questions closed, feel free to continue discussing them.

  • Should DOP be automatic or manual at all times?
    DOP should not be automatic in the case of an async parallel foreach. An async parallel foreach will most likely be used for IO, and the system has no way to know the right DOP for that. Thus urging the user in the docs to experiment with different DOP values is the best option. In other words: warn the user that the default DOP might be bad for their use case.
  • Should it accept IEnumerable?
    Yes, it should.
  • Should the partitioner concept be supported at all?
    No; we should ensure the specified DOP is adhered to exactly. Creating batches can lead to a smaller than expected effective DOP.
  • Should we process the source items sequentially or make no attempt?
    By default we should try to process source items sequentially to improve data locality. Reason being: HDD read performance will increase drastically, and cache usage will be improved. It should, however, be made very clear in the docs that the order cannot be guaranteed.
  • Should there be an alternative to Parallel.For?
    There should be one, but I'll leave that outside of this issue.
  • Should the funcs return ValueTask instead of Task?
    Yes; ValueTask is more accommodating than Task (it's cheaper to convert a Task to a ValueTask than the other way around; see the snippet after this list). Furthermore it should result in a performance increase.
  • Should TaskCreationOptions.DenyChildAttach be specified?
    Yes, this should be specified; see this comment for more info.
  • Should there be an overload without CancellationToken in the Func?
    No, there shouldn't be. It would have little to no benefit, but would cause a lot of complications (ambiguity, analysis becomes harder).
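The conversion-cost asymmetry behind the ValueTask answer can be shown in a few lines (SomethingAsync and SomethingElseAsync are hypothetical):

```c#
// Task -> ValueTask: cheap; the ValueTask simply wraps the existing Task.
Task t = SomethingAsync();
ValueTask wrapped = new ValueTask(t); // no extra allocation

// ValueTask -> Task: may allocate a Task, unless the ValueTask already wraps one.
ValueTask vt = SomethingElseAsync();
Task asTask = vt.AsTask();
```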
Labels: api-needs-work, area-System.Threading.Tasks


All 74 comments

A ForEachAsync method can have too many different implementations. Some users would like to create their own Partitioner<TSource> class, others do not need it in a ForEachAsync-like method, and some users would like to have their own absolutely specific logic in the Task.Run(async delegate ...) block.

@ichensky Compare this with the synchronous Parallel.ForEach:

  • Some users would like to use Partitioner, and there's an overload for that.
  • Some users will need completely custom logic, and so they can't use Parallel.ForEach.

But Parallel.ForEach still exists and is useful. I think the situation with Parallel.ForEachAsync is similar: we could create a set of overloads that are useful in the common cases. And that would still be useful, even if it wouldn't cover all cases.

@ichensky I fully agree with the way @svick explains it.
I would simply see this as the asynchronous version of Parallel.ForEach.

A set of overloads for the most common cases is needed for sure!

Hmm. Probably wants to be based on dotnet/corefx#32640 ?

@Clockwork-Muse Possibly. I think Parallel.ForEachAsync that somehow accepts IEnumerable<T> is important. One that accepts IAsyncEnumerable<T> would be useful too. If there was a simple built-in way to convert from IEnumerable<T> to IAsyncEnumerable<T>, then overloads that directly accept IEnumerable<T> might not be necessary. Does something like that exist or is it proposed?

@svick Yes accepting IAsyncEnumerable<T> sounds good!

Usually when making a ForEachAsync, e.g. over a list of files where inside we create tasks that download/process those files, it is useful to pass a custom parameter to the ForEachAsync-like function, such as the maximum number of tasks that can execute at one time (because of internet bandwidth, dependencies on other services, and so on). From time to time it is also needed that ForEachAsync not just waits until all tasks have finished but also returns the results of all those tasks.

This API is urgently needed. In particular, there is no way currently to process a set of "items" in parallel and in an async way. Parallel and PLINQ (.AsParallel()) cannot be used for this. I believe Dataflow can be used but it's awkward for something this simple.

The source for the code in the opening post is https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/.

I am very active on Stack Overflow and I see people needing this all the time. People then use very bad workarounds such as starting all items in parallel at the same time, then WhenAll them. So they start 10000 HTTP calls and wonder why it performs so poorly. Or, they execute items in batches. This can be much slower because as the batch completes item by item the effective DOP decreases. Or, they write very awkward looping code with collections of tasks and weird waiting schemes.

I needed this API multiple times myself. So I developed a solution based on the code in this article. I made the following enhancements, which the framework should also have (a sketch combining points 1-3 appears after this comment):

  1. A CancellationToken should be taken as an argument. When it is signaled no further loop iterations should be started.
  2. When an exception is thrown by one item the loop should be cancelled. When all items are done executing all exceptions should be thrown together in the form of an AggregateException.
  3. The worker delegate should receive a CancellationToken. This is a token that combines the externally passed token and the cancellation that can come from cancellation due to an exception.
  4. The partitioner should not do any batching. It must be ensured that the specified degree of parallelism is adhered to exactly. For IO, it is important to use the DOP specified by the caller. Batching can lead to the effective DOP being smaller than expected.
  5. Since we are starting tasks we should think about whether a TaskScheduler should be taken as an argument. The caller might want to do a lot of CPU work so he might want to schedule that. Alternatively, the caller can switch to a different scheduler as part of the delegate he passes in. So we might not want to natively support TaskScheduler.
  6. In my implementation I specified TaskCreationOptions.DenyChildAttach for those tasks. I'm not entirely sure about this.
  7. Do we need to think about the SynchronizationContext? As the code stands user code will always be called on the thread pool with no sync context.

Should the DOP be automatic? In my opinion, we have no choice but to force the caller to specify an explicit DOP. Usually, user code will perform IO as part of the loop (that's likely the reason for using async). The system has no way of knowing the right DOP for IO. Different devices (disk, SSD, web service) have vastly different characteristics. Also, the DOP might be intentionally low in order to not overload the system being called. IMO, no auto-tuning is possible. We cannot even make the parameter optional and default to the CPU count. The CPU count is unrelated to the ideal IO DOP! This is why IO on the thread pool can explode so radically. The auto-tuning heuristics can get into a state where they add unbounded amounts of threads (see these issues: https://github.com/dotnet/coreclr/issues/1754, https://github.com/dotnet/coreclr/issues/2184).
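A minimal sketch of how points 1-3 above might compose, assuming a SemaphoreSlim-based throttle; all names are illustrative, and this is not the proposed API itself:

```c#
// Sketch only: honors an external token and cancels remaining work when an item fails.
public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int dop, Func<T, CancellationToken, Task> body,
    CancellationToken cancellationToken = default)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    using var throttler = new SemaphoreSlim(dop);
    var tasks = new List<Task>();

    foreach (var item in source)
    {
        if (cts.IsCancellationRequested)
            break; // point 1: no further iterations are started

        await throttler.WaitAsync();
        tasks.Add(Task.Run(async () =>
        {
            try
            {
                await body(item, cts.Token); // point 3: combined token
            }
            catch
            {
                cts.Cancel(); // point 2: one failure cancels the loop
                throw;
            }
            finally
            {
                throttler.Release();
            }
        }));
    }

    var whenAll = Task.WhenAll(tasks);
    try
    {
        await whenAll; // awaiting rethrows only the first exception...
    }
    catch
    {
        if (whenAll.Exception != null)
            throw whenAll.Exception; // ...so surface the full AggregateException instead
        throw;
    }
}
```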

@tomesendam @gspp could you please add the full proposal according to the first step in the doc https://github.com/dotnet/corefx/blob/master/Documentation/project-docs/api-review-process.md. Thanks.

@tarekgh I will try when I find the time to work on this!

I agree this is urgently needed for the reasons @GSPP lists.

I'd also add that it's urgently needed because Parallel.ForEach() is a pit of failure when writing async code. It's more than happy to accept an async lambda that returns a Task. Only it doesn't do what the user expects (which is to continue only once the async functions within have all fully executed). Instead it will complete immediately, leaving the user's code silently broken, with a potentially hard-to-track-down issue waiting at runtime.

I'd go so far as to say that Parallel.ForEach should maybe throw an exception when passed a Task-returning method, or at the very least VS should ship with an analyzer that points out this is an anti-pattern.

Even then, this only solves half the problem. Users need a proper async API to call as an alternative.

Parallel.ForEach should maybe throw an exception when passed a Task returning method

It's not Task-returning though. It's async void as a lambda.

This is unlikely to make it into v3.0. Please apply the request I mentioned in my comment https://github.com/dotnet/corefx/issues/34233#issuecomment-455354106 to be able to look at it as a whole and move forward.

@tomesendam if you want I can open the api proposal issue

I think that a base parallel foreach should accept a Func that takes a CancellationToken.
```c#
Func<T, CancellationToken, Task> asyncBody;
```

The same CT that is passed in `ParallelOptions` can then be passed along.

There could be other overloads, not accepting a CT for the body that would call the "base" overload, discarding the CT.

So in essence:
```c#
Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody);
```

@MihaZupan Thanks! @tarekgh I'll look into everything next week as I have a week off then. But maybe I can find time this weekend!

@tarekgh Reason I'm putting this in a comment is to check if this is what you expect! If this is something you can work with I'll edit the original issue.

There are situations where an asynchronous parallel foreach is needed instead of the existing Parallel.ForEach(), and while it's not rocket science to implement yourself, I think it would be a nice feature to have in corefx. This should be seen as the asynchronous alternative to the existing Parallel.ForEach(), and it should take IEnumerable<T> as well as IAsyncEnumerable<T>.

There are examples for this on this issue already; here is one: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/

```c#
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}
```

### Proposed API

```c#
Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody);

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody)
{
    //possibly cast IEnumerable<T> to IAsyncEnumerable<T>
}
```

### Open questions

  • Should DOP be automatic or manual at all times? (@GSPP )
  • Should it accept IEnumerable?
  • Should TaskCreationOptions.DenyChildAttach be specified? (@GSPP)

As far as I understand it, conversion between IEnumerable and IAsyncEnumerable could be done quite easily (though not necessarily efficiently) via a simple proxy class.
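For illustration, a minimal proxy of that kind might look like the following; the System.Linq.Async package also ships a ToAsyncEnumerable extension for this:

```c#
// Illustrative only: exposes a synchronous sequence as an IAsyncEnumerable<T>.
public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(this IEnumerable<T> source)
{
    foreach (var item in source)
    {
        yield return item; // every MoveNextAsync completes synchronously
    }
    await Task.CompletedTask; // no genuine awaiting happens in this adapter
}
```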

A Parallel.ForEachAsync that had an IAsyncEnumerable as the source would pretty much be a running-until-canceled load balancer.

From what I gathered from the source code, implementing this "properly" is a non-trivial change, requiring changes to the Partitioner (I assume also the task replicator) and, of course, the Parallel class itself.

I think input from @stephentoub would be valuable here.

Should DOP be automatic or manual at all times?

I feel very strongly that it must be manual and a required parameter. See my comment (at the bottom). I'm speaking from experience and theoretically there.

Should it accept IEnumerable?

If you mean the non-generic version, no. But I think you meant the generic version. Definitely yes. Often, the input is a list of work items (not IO). E.g. URLs, file names, DTOs, ...

Of course, IAsyncEnumerable should be accepted as well.

From what I gathered from the source code, implementing this "properly" is a non-trivial change, requiring changes to the Partitioner, (I assume also Task replicator), and of course the Parallel class itself.

New open question: Should the partitioner concept be supported at all? I personally have never needed anything but a single items partitioner. For IO work, batch partitioning would likely not increase efficiency much.

Also note, that the default behavior must be no batching or chunking. I'm quoting my point 4:

The partitioner should not do any batching. It must be ensured that the specified degree of parallelism is adhered to exactly. For IO, it is important to use the DOP specified by the caller. Batching can lead to the effective DOP being smaller than expected.

The TL;DR is that IO needs exact DOP control. Anything in the way of that is a non-starter.

I agree on setting dop explicitly.

I too have never used the Partitioner directly. I don't think that it would be needed for IAsyncEnumerable, was just pointing it out as a part that would need changes during implementation.

We should think about the order that source items are processed. We cannot be fully deterministic here but we have choices:

  1. Make no attempt at ordering. Process IList and arrays using range partitioning for example.
  2. Try to process items sequentially.

Often, it is desirable to process items in approximate sequential order. For example, if you have a list of database keys you want to retrieve you likely want to send the queries to the database in order. That way data locality is increased and cache usage is better.

Or, if you want to read data blocks from a file you want to issue the IOs in ascending file position order so that the disk head sweeps sequentially over the disk (elevator algorithm).

The performance gains from this can be large. Data locality is important.

This is an argument for (2). An argument against (1) is that since this feature is mainly used for IO any small CPU optimization coming from (1) would not matter much.

I believe that (2) must be supported or else it can be a performance dealbreaker in some scenarios. We can independently decide what the default should be. My feeling is that only (2) should be supported.

I just wanted to create an issue for the exact same request.
There should be an async version for both Parallel.For and Parallel.ForEach.

@tomesendam do you think that adding one overload that supports IProgress would be fine?
Currently I use this:

```c#
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <param name="progress">Use for progress updates</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body, IProgress<T> progress = null)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext()) {
                    await body(partition.Current);
                    progress?.Report(partition.Current);
                }
            }
        }));
}
```

This single overload would be useful for progress notification (for example we want to know how many files are downloaded and notify the user on progress)

What do you guys think?

@Misiu you can send the progress from inside the body, right?

@GSPP I forgot about that :)
I guess there isn't any difference if we pass IProgress to a function call or to the body.
So an extra overload isn't needed; let's keep it simple.

@Misiu Sounds good to me
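For reference, the closure-based approach agreed on above might look like this (files and DownloadFileAsync are placeholders; ForEachAsync is the extension method from the comment above):

```c#
// Report progress from inside the body instead of through a dedicated overload.
IProgress<string> progress = new Progress<string>(name => Console.WriteLine($"done: {name}"));

await files.ForEachAsync(dop: 4, async file =>
{
    await DownloadFileAsync(file);
    progress.Report(file); // declared as IProgress<string>, so Report is accessible
});
```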

@GSPP do you have any tips on finalizing the suggestion? Or is the comment I posted earlier alright. I'm rather new to this that's why I'm asking!

@tomesendam I am not a team member but I have contributed multiple times by writing up a comprehensive treatment of some issue that I think the framework should solve. This is how I would approach this: I think you should now write down the full API surface in C#. Also, all relevant design points must be discussed and decided. I contributed some in my posts in this issue. You'd need to read through everything that was said here and write in your proposal how you would decide. I'd create a numbered list of all design aspects. State what the design question is, the possible decisions, their pros and cons, and your tentative decision.

This enables the team to then take your comprehensive summary into an internal meeting and produce feedback. This can lead to a final decision about this API. After that it can be implemented with no surprises left.

I think it's awesome that we are now making progress on this API! This is a very important feature that will come to be used a lot. We need to get the design exactly right. There is no second chance because compatibility forces design mistakes to be carried forever.

@tarekgh any chance this might get added with .NET Core 3.0?

@Misiu we are very late for adding such a feature in 3.0. Meanwhile you can use the workaround mentioned in the issue description.

Any update on this issue, btw?

@PureKrome honestly I've forgotten about this issue. I'll create the complete list of design decisions I can come up with by the end of today.

This most likely will not be enough so others should feel free to comment more design decisions and I'll add them to the list.

@tarekgh should I move this issue to the new repo? or will this be done by Microsoft?

Summary of the discussion so far

There are situations where an asynchronous parallel foreach is needed instead of the existing Parallel.ForEach(), and while it's not rocket science to implement yourself, I think it would be a nice feature to have in corefx. This should be seen as the asynchronous alternative to the existing Parallel.ForEach(), and it should take IEnumerable<T> as well as IAsyncEnumerable<T>.

There are examples for this on this issue already; here is one: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/

```c#
/// <summary>
/// Executes a foreach asynchronously.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <param name="dop">The degrees of parallelism.</param>
/// <param name="body">The body.</param>
/// <returns></returns>
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}
```

### Proposed API

```c#
Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IAsyncEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody);

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, Task> asyncBody)
{
    return ForEachAsync(source, parallelOptions, (work, ct) => asyncBody(work));
}

Task<ParallelLoopResult> ForEachAsync<T>(IEnumerable<T> source, ParallelOptions parallelOptions, Func<T, CancellationToken, Task> asyncBody)
{
    //convert IEnumerable<T> to IAsyncEnumerable<T>
}
```

### Closed questions

While I consider these questions closed, feel free to continue discussing them.

  • Should DOP be automatic or manual at all times?
    DOP should not be automatic in the case of an async parallel foreach. An async parallel foreach will most likely be used for IO, and the system has no way to know the right DOP for this. Thus forcing the user to experiment with different DOP values is the best option. If DOP has a default, it should be made really clear to the user that this might be bad (possibly through compile warnings?).
  • Should it accept IEnumerable?
    Yes, it should.
  • Should the partitioner concept be supported at all?
    No; we should ensure the specified DOP is adhered to exactly. Creating batches can lead to a smaller than expected effective DOP.
  • Should we process the source items sequentially or make no attempt?
    By default we should try to process source items sequentially to improve data locality. Reason being: HDD read performance will increase drastically, and cache usage will be improved.
  • Should there be an alternative to Parallel.For?
    There should be one, but I'll leave that outside of this issue.

### Open questions

  • Should TaskCreationOptions.DenyChildAttach be specified? (@GSPP)

@tomesendam transferred

By default we should try to process source items sequentially to improve data locality. Reason being: HDD read performance will increase drastically.

This assumes all of the following, at a minimum:

  1. The API is being primarily used to read from a local hard drive.
  2. The hard drive is a traditional disk drive, not an SSD.
  3. The hard drive has been defragmented; files are contiguous, and in directory traversal order.

... please correct me if I'm wrong, but I personally would expect this to be more likely to be used to make _web requests_...

Furthermore cache usage will be improved.

If you're speaking about CPU cache usage, can we really rely on that? It assumes that the allocated objects (especially the tasks) will be contiguous in memory. That doesn't strike me as something we can actually control.
(Please correct me if I'm wrong about the assumptions here)

@Clockwork-Muse You are not wrong. But do you suggest the default behavior should be different? We could also make no attempt at it, but would that actually result in any performance increase?

@tomesendam please move the proposal you mentioned https://github.com/dotnet/runtime/issues/1946#issuecomment-576464191 to the top of the issue. I think the proposal is reasonable now.

@stephentoub please let's know if you have any comment before we mark this issue ready for review.

@tomesendam I think I would personally explicitly state in documentation that the order of processing is not guaranteed. The easiest way to iterate over the source items is going to be in source collection order, as in your example. The problem is that once each or any task is created, all bets are off, and people may naively expect that:

```c#
// Side note: we probably want an overload without ParallelOptions
var result = someEnumerable.ForEachAsync(url => MakeDeleteRequestAsync(url));
```

... would be guaranteed to post the delete requests in sequence, which is absolutely untrue (even assuming that MakeDeleteRequest was somehow called in sequence, the server probably isn't going to receive them in sequence).

In the example you're showing, the partitioner is going to mess up your desire to read files sequentially. Which probably means you would want something documented about dop not being terribly helpful if you have only a single physical disk, and are reading from it - but this depends heavily on things outside programmer control, and possibly knowledge. It also means ignoring the possibility of things like the drive reordering operations, or having the ability to queue operations (if you were trying to delete files instead).

I agree with @Clockwork-Muse regarding explicitly mentioning in the docs that the order is not guaranteed.

@Clockwork-Muse @tarekgh Yes I agree it should be made clear to people. I'll update the proposal to include it, and move it to the top

Why do you think we want an overload without ParallelOptions? As discussed earlier, DOP should always be specified. Or are we too biased in our assumptions?

"simplified use case", mostly, along the lines of what PLINQ does.

In general the API seems fine. Some questions/comments:

  • I don't understand why the methods return a Task<ParallelLoopResult> rather than just Task. The members on ParallelLoopResult are fairly tied to a ParallelLoopState that gets passed around, and these methods as currently defined don't have one. For better or worse, the corresponding ForEach methods returned ParallelLoopResult for consistency with the overloads that did pass a ParallelLoopState to the delegates, but there aren't any such overloads here.
  • Should there be? Is there a similar need for it? I've not seen a ton of usage of those overloads of ForEach, so maybe it's not something we should propagate forward?
  • I suggest we reduce the number of overloads and just have a CancellationToken always passed into the delegate. You then end up with one for IEnumerable and one for IAsyncEnumerable.
  • I think the funcs should return ValueTask rather than Task. The primary concern with ValueTask is that it'll be consumed incorrectly, but here the consumer is the method we're implementing, and we'll just make sure to do it right ;) And returning ValueTask is more accommodating: Task can be converted to a ValueTask very cheaply, but it requires an allocation to convert a ValueTask (if it's not already wrapping a Task) into a Task.
  • Re: "DOP should not be automatic in te case of async parallel foreach"... how do you plan to do that? It defaults to -1 in ParallelOptions. Are you suggesting -1 would mean ProcessorCount or similar when used with ForEachAsync even though it means "no upper bound" with ForEach?
  • Re: "Should we process the source items sequentially or make no attempt?" I don't understand this. Can you elaborate?
  • Re: "Should TaskCreationOptions.DenyChildAttach be specified?" parent/child relationships with tasks is something I wish we could go back and uninvent. Very few people use it, and we pay for it regardless (in complexity if nothing else). I also don't think it's particularly relevant here: async methods in general do not play with the parent/child relationship concept. If we schedule tasks as part of the implementation, we should do so with DenyChildAttach, just as Run does.

@stephentoub

  • To answer your first question: you are right, I suppose there is no reason to have it be Task<ParallelLoopResult>. Cancelling from outside the loop should be good enough with just the cancellation token. I too have never used ParallelLoopState myself and cannot come up with a sensible use case.

  • ValueTask<T>: I never thought about this, but it could be better. If I'm right, this will increase the size of the caller's allocation but spare us creating a new allocation for each manually, synchronously returned Task? So if the user passed a method containing some caching logic this could improve performance. But would it not decrease performance slightly in all other cases?

  • DOP: A user should be forced to manually specify a DOP since there is no way of knowing what the right DOP is. But if we do try to set some DOP for the user, it should be made very clear that it might be a bad DOP. Through warnings perhaps? Forcing users to set it manually would require either adapting ParallelOptions or creating a new way of setting it. AsyncParallelOptions?

  • See this comment on "Should we process the source items sequentially or make no attempt?"

  • TaskCreationOptions.DenyChildAttach will be used then. I'll update the issue.

@tomesendam go ahead and update the final proposal so we can proceed with this issue.

@tarekgh it's updated to the current state

@tomesendam thanks for the update.

I think @stephentoub suggested the following

I suggest we reduce the number of overloads and just have a CancellationToken always passed into the delegate. You then end up with one for IEnumerable and one for IAsyncEnumerable.

But I am still seeing that you are proposing the other overloads. If you agree with @stephentoub's suggestion, please update the proposal one last time.

@tarekgh @stephentoub I do not agree on the CancellationToken. There might not always be a need for one thus simplifying use (a minor amount) is a good thing in my opinion. I'll remove it if this adds some major drawback I'm not seeing.

I do not agree on the CancellationToken. There might not always be a need for one thus simplifying use (a minor amount) is a good thing in my opinion. I'll remove it if this adds some major drawback I'm not seeing.

@tomesendam you may list this as an open issue in the proposal that can be discussed during the design review. I marked the issue ready for review.

@tarekgh will do! Thanks

There might not always be a need for one thus simplifying use (a minor amount) is a good thing in my opinion

First, if you really don't care about the token, we're talking about the difference between:
```C#
await Parallel.ForEachAsync(source, options, async (item, _) =>
{
    ...
});
```

and:

```C#
await Parallel.ForEachAsync(source, options, async item =>
{
    ...
});
```

That's a _very_ minor amount and is not worth exposing two additional overloads.

Second, without the cancellation token being exposed there, it'll be really easy for code to accidentally omit it. Static analysis tools can look at the signature and say "I see a CancellationToken is being passed into your delegate, but your call to XyzAsync which has an overload that takes a CancellationToken isn't using it; strongly consider passing it through". But with both of these overloads, it'd take much more complicated and fragile analysis to help the developer fall into the pit of success here when they accidentally choose the wrong one. We want to encourage rather than discourage passing through a CancellationToken you receive, and here the CancellationToken is being provided through the ParallelOptions.

Third, having multiple overloads actually causes ambiguity problems in some cases. Let's say you just want to run the body 1000 times, but you don't actually need any input data, so you pass in something like Enumerable.Range(0, 1000) and then for your lambda you write async delegate to avoid specifying any arguments. If there are multiple delegate types valid, this will produce a compilation error "The call is ambiguous between the following methods or properties".
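The ambiguity in question, sketched; DoWorkAsync and options are placeholders, and the two hypothetical overloads take Func<int, ValueTask> and Func<int, CancellationToken, ValueTask>:

```c#
// 'async delegate' with no parameter list is convertible to both delegate
// shapes, so overload resolution fails with CS0121 ("The call is ambiguous...").
await Parallel.ForEachAsync(Enumerable.Range(0, 1000), options, async delegate
{
    await DoWorkAsync();
});
```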

Finally, new methods need to prove their worth. We do not add new methods/overloads just because we can; we need to justify their value, and everything starts with negative value.

@stephentoub I'll remove the overloads. You're right, I overlooked these issues.

DOP: A user should be forced to manually specify a DOP since there is no way of knowing what the right DOP is.

We don't do that for the sync overloads (where there's also no foolproof way of knowing what the right DOP is), we shouldn't for the async overloads. Even for things that are I/O bound, we can still choose some reasonable default, either Environment.ProcessorCount or unbounded, and document that's the default; then if that maps to what the dev wants, great, they don't need to allocate and pass in a ParallelOptions just to use our defaults (keep in mind, too, that the default MaxDegreeOfParallelism in a ParallelOptions is unbounded, so if someone just does new ParallelOptions() to satisfy our signature, that's the default they're going to get... at least if we don't ask for a ParallelOptions, we can more reasonably choose another default). If that's inappropriate for the developer's needs, then they can choose to pass in a ParallelOptions and override our default choices. This is one place where I do believe having additional overloads is warranted. And then since you won't be providing a CancellationToken, the delegates there won't take one; any cancellation via a token would be achieved via closure, which is fine. We keep the simple cases simple and the more complicated cases more complicated.

But would it not decrease performance slightly in all other cases?

Potentially very slightly. I suspect it'll be a drop in the bucket. And the upside is potentially significant, especially depending on what we do with https://github.com/dotnet/coreclr/issues/27403.

Should we process the source items sequentially or make no attempt?
By default we should try to process source items sequentially to improve data locality. Reason being: HDD read performance will increase drastically. Furthermore cache usage will be improved. It should however be made very clear to people in docs, that the order cannot be guaranteed

I'm still not understanding. Both IEnumerable<T> and IAsyncEnumerable<T> are forward-only iterators, so via those interfaces the only way they can be consumed is sequentially (unless you're suggesting we try to type check for other interfaces?) In other words, without doing anything special, we'll be processing sequentially. What kind of consumption are you concerned we might encounter if we "make no attempt"? Or are you thinking about trying to ensure that all of the actual code execution runs serialized even when we allow I/O to be overlapped? That can be achieved by passing in a TaskScheduler like new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler; we should not bake in such behavior to the ForEachAsync, as there'd be no good way for the consumer to override it, whereas they can with the inverse.
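A sketch of that inverse, assuming the implementation honored ParallelOptions.TaskScheduler:

```c#
// Delegate code runs serialized on the exclusive scheduler,
// while awaited I/O can still overlap.
var pair = new ConcurrentExclusiveSchedulerPair();
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 8,
    TaskScheduler = pair.ExclusiveScheduler
};
```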

@stephentoub Having an overload without ParallelOptions and thus no CancellationToken sounds okay to me. But can we give the devs a warning through the compiler that the default DOP might be bad? Or via the optional Roslyn analyzers? @GSPP

Potentially very slightly. I suspect it'll be a drop in the bucket. And the upside is potentially significant, especially depending on what we do with dotnet/coreclr#27403.

I have included ValueTask in the suggestion.

we should not bake in such behavior to the ForEachAsync, as there'd be no good way for the consumer to override it, whereas they can with the inverse.

So we should leave this to the devs calling the code?

But can we give the devs a warning through the compiler that the default DOP might be bad?

The default is going to be unbounded if the developer just passes in new ParallelOptions(), so I'm not clear on what kind of warning you're hoping for when one isn't provided.

@stephentoub If we set it automatically based on cpu or something else. Then I would like there to be some kind of warning if that's possible to do. With this I mean either a compiler warning or from analyzers. I get that there currently isn't one but could or should there be one?

I'm still not understanding. Even for the overload that takes a ParallelOptions, the default is still unbounded, because new ParallelOptions() defaults to unbounded. Are you suggesting we always warn for every call to any of these overloads?

@stephentoub no, I'm suggesting it for the overload without ParallelOptions. And/or a warning if the DOP isn't manually changed in ParallelOptions (which probably isn't possible). That would be my ideal.

Or do you think any kind of warning is unnecessary? And that the documentation should suffice.

I'm suggesting it for the overload without parallel options

Why do you see that overload as being different from the one that takes ParallelOptions?

When a developer sees they need to pass in a ParallelOptions, they'll likely do:
```C#
await Parallel.ForEachAsync(source, new ParallelOptions(), i => { ... });
```
at which point they're getting the unlimited behavior.

You're concerned about the behavior when a ParallelOptions isn't provided but not when they get this default unlimited behavior?

Or do you think any kind of warning is unnecessary?

With the APIs shaped as they are, I don't think it makes sense to try to provide a warning. Adding an API that always triggers a warning suggests either the warning is bad or the API is bad.

@stephentoub

And/or a warning if the DOP isn't manually changed in ParallelOptions (which probably isn't possible)

What I mean is: if the DOP in ParallelOptions is the default, then give a warning. And don't get me wrong, I don't always want the warning to show. Only when there's no manually set DOP.

Only when there's no manually set DOP.

How are you going to write an analyzer to detect that?

Is there going to be a warning for this?
```C#
public static async Task FooAsync(ParallelOptions options) => Parallel.ForEachAsync(..., options, ...);
```
?

@stephentoub that's the question. I'm asking if that's possible at all.

I'm asking if that's possible at all.

Warnings for that would easily be wrong. You don't know whether a custom dop was stored into the options or not.

@stephentoub well if it's not possible then it's not possible. Then we should just make this clear enough in the documentation.

Re: "DOP should not be automatic in te case of async parallel foreach"... how do you plan to do that? It defaults to -1 in ParallelOptions. Are you suggesting -1 would mean ProcessorCount or similar when used with ForEachAsync even though it means "no upper bound" with ForEach?

Even for things that are I/O bound, we can still choose some reasonable default, either Environment.ProcessorCount or unbounded, and document that's the default;

Any IO workload has an optimal DOP. Deviating from that DOP can quickly become quite bad. Environment.ProcessorCount is mostly unrelated to the optimal DOP. The deviation will commonly be very bad.

For example, a 2 core microservice that tries to make 10000 HTTP requests to an external service will underutilize that service greatly. A process running on a 64 core box might DOS the receiving web service into downtime. And migrating the same application with the same configuration to a smaller or larger server will now change performance and safety drastically. This could happen in the presence of cloud autoscaling.

Using Environment.ProcessorCount is quite unsafe.

Unbounded for IO is even more dangerous and most certainly inappropriate. What would unbounded even mean? Certainly not starting everything at the same time; that is easy to do with Enumerable.Select. It would involve some heuristic, but at what point would the heuristic start to throttle? Maybe some hill-climbing scheme? That seems very brittle.

For the same reasons, doing IO on Parallel and PLINQ is very dangerous. Now with this new API, we know for sure that it is used for IO.

In my opinion, we should force the user to make a decision. For example, reject a -1 in the ParallelOptions or not use ParallelOptions at all.

Do we even need ParallelOptions? We need a required DOP argument and an optional CancellationToken argument. Maybe just add those two arguments to the method.

I'm still not understanding. Both IEnumerable and IAsyncEnumerable are forward-only iterators, so via those interfaces the only way they can be consumed is sequentially (unless you're suggesting we try to type check for other interfaces?) In other words, without doing anything special, we'll be processing sequentially.

I believe PLINQ and/or Parallel attempt to cast the input argument to IList<T> or array. If that is used for range partitioning this would introduce drastic deviations from the "natural" order of items. I think that's quite undesirable (for efficiency reasons as discussed above). We also don't want batching and, since this is about IO, small CPU overheads are not much of a concern. So this kind of IList<T> optimization should not be performed. Using the IEnumerable<T> in the obvious way will do exactly the right thing.

Now with this new API, we know for sure that it is used for IO.

It's the likely use case, but we don't know "for sure". Any of us can come up with a call where no I/O is involved.

Any IO workload has an optimal DOP.

In general, what's optimal is going to depend on many factors, some (most?) of which aren't known in most cases statically. You argue that Environment.ProcessorCount and unbounded are going to be wrong. I argue that it's going to be close to impossible for a developer to determine and pass in the "optimal" dop, except in very constrained cases where they're intimately aware of everything in the application, the hardware, the network, etc. If we force a dop to be provided, I can almost guarantee you we're going to see the vast majority of call sites passing in either Environment.ProcessorCount (or a constant factor of it) or some arbitrary number based on little more than sticking one's index finger in the air to see which way the wind is blowing. Allowing a dop to be provided is great, but I'm not seeing how not requiring one is any more dangerous than taking one; in fact, whether we take advantage of it or not, not taking one gives us some flexibility we lose if a strict bound is provided.

Using the IEnumerable in the obvious way will do exactly the right thing.

Right. And I expect it'll be rare (bordering on wrong) for types implementing IAsyncEnumerable<T> to also implement IList<T>, which is presumably what the guts of the implementation will be based around, so it'll naturally "do exactly the right thing".

Examples for a hardcoded DOP from my practical experience:

  1. When querying Amazon products from its API nightly for 1000 different search terms, I can decide a DOP value at development time and use that. If I go too high I will get banned from the API. The scenario is predictable.
  2. When querying from my relational database, I understand what hardware it is running on. I will pick a fairly low DOP to not crowd out other users of the system.
  3. When writing a web crawler, I will empirically determine at development time what DOP is best with respect to network usage. I can definitely set a good DOP for this by trying out different values.

None of these scenarios work with CPU count. It's not just suboptimal. The application fails in production.

If we force a dop to be provided, I can almost guarantee you we're going to see the vast majority of call sites passing in either Environment.ProcessorCount

I'd expect callers to become aware of the choice they need to make and just try a few different values.

You have given this a lot of thought. I firmly believe that is not the typical case. I expect the typical case is "I need to pass in some value here, what's a good default DOP, hmm, ProcessorCount seems like a reasonable stand-in, and I see lots of other code on the net using ProcessorCount as a default for dop, I'll use that.". Further, in most scenarios, ProcessorCount is going to be (much) lower than the optimal value for I/O-based concurrency, not higher.

I have done a lot of ETL-style data pipelines over time... That's where my interest in this comes from :smile:. It turns out that many production applications contain the basic pattern "take some data, transform it item by item, store it somewhere else".

@GSPP I think we should just provide a default but have documentation to warn devs about the pitfalls. Probably a good idea to put it in the XML comments/docs as well. When someone starts to dabble with parallel stuff they will most likely be benchmarking performance and trying out various configs anyway.

Even if we force someone to manually supply a DOP they'll be guessing the first time anyway, probably selecting, like @stephentoub said, some arbitrary number, most likely the CPU count.

@terrajobst why is this marked back as api-needs-work? I think we can go ahead with the proposed APIs, and later we can add the overloads which have the default DOP if needed. But I don't think we need to block on that.

Because the proposed design is still being debated.

@stephentoub I think we should proceed with the defaults and warn users in the docs/summary. When someone starts dabbling with this kind of stuff they will most likely be testing various configs anyway. And if they don't, they wouldn't do it when forced to supply a DOP either.

I use this....
```c#
/// <summary>
/// Executes a foreach asynchronously, throttled to the given degree of parallelism.
/// </summary>
public static async System.Threading.Tasks.Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParallelism,
    Func<T, System.Threading.Tasks.Task> body, CancellationToken token = default)
{
    var tasks = new List<System.Threading.Tasks.Task>();
    using var throttler = new SemaphoreSlim(degreeOfParallelism);
    foreach (var element in source)
    {
        await throttler.WaitAsync(token);
        tasks.Add(System.Threading.Tasks.Task.Run(async () =>
        {
            try
            {
                await body(element);
            }
            finally
            {
                throttler.Release();
            }
        }, token));
    }
    await System.Threading.Tasks.Task.WhenAll(tasks);
}
```
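A hypothetical call site for the helper above (httpClient is assumed to exist in scope):

```c#
var urls = new[] { "https://example.com/a", "https://example.com/b" };

await urls.ForEachAsync(degreeOfParallelism: 4, async url =>
{
    using var response = await httpClient.GetAsync(url);
    Console.WriteLine($"{url}: {response.StatusCode}");
});
```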

Can we revisit this for .NET 6?

I believe there is tremendous demand for such an API (as evidenced by the 63 positive votes at this time).

Even if no proposal in this thread seems quite right to the team, the demand for this capability seems to justify some progress in this direction.

The core of this proposal is to

  1. Process a set of items
  2. in parallel
  3. using an async API.

The API could take various shapes but we are longing for something.

As many APIs are rate-limited (e.g. 100 calls/min), could we consider having a variant based on time, which would take as parameters:

  • int numberOfCalls
  • TimeSpan time

@YohanSciubukgian I don't think this API is the correct place for such a feature. That's better suited for the devs using this API to do on their own.
