Runtime: Proposal: Add IAsyncEnumerable<T> support to System.Threading.Channels

Created on 10 Oct 2018 · 20 Comments · Source: dotnet/runtime

_(EDIT: Updated 1/4/2019 based on IAsyncEnumerable design changes.)_

When we originally prototyped System.Threading.Channels, we included IAsyncEnumerable<T> support with a temporary IAsyncEnumerable<T> implementation, but as IAsyncEnumerable<T> wasn't actually available when System.Threading.Channels shipped its first release, we cut that support with the idea of adding it back later. Now that IAsyncEnumerable<T> is available as of .NET Core 3.0, that "later" has arrived.

Proposal

Add the following interface implementation:
```C#
public abstract class ChannelReader<T> // existing type
    : IAsyncEnumerable<T> // new interface implementation
{
    …
    public virtual IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
```

The base implementation will simply provide an async enumerator veneer over the other channel reader methods, e.g.
```C#
public virtual async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
    while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        while (TryRead(out T item))
            yield return item;
}
```

but derived implementations can override to provide a more efficient implementation if applicable.

With that, you'll be able to await foreach over a channel reader, e.g.
```C#
await foreach (T item in channel.Reader)
{
    … // handle each item read from the channel
}
```

or using various extensions on `IAsyncEnumerable<T>`, such as from an async LINQ implementation like that in Ix.NET:
```C#
await foreach (T item in channel.Reader.WhereAsync(…))
{
    … // handle each filtered item from the channel
}
```

Open issues:

  • Developers generally think of enumeration as going through existing data rather than mutating, but enumerating the reader would mutate the channel, pulling data out of it. When we faced that same question with BlockingCollection<T>, we chose to use "consuming" in a name (GetConsumingEnumerable), to highlight that this enumerable actually mutates as it pulls out data. Should we do the same thing here, e.g. exposing an AsConsumingAsyncEnumerable that returns an IAsyncEnumerable<T> rather than implementing the interface directly?
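The alternative raised in this open issue could look roughly like the following. This is only a sketch: the class name `ChannelReaderSketchExtensions` is hypothetical, `AsConsumingAsyncEnumerable` is the name floated above rather than an existing API, and the body simply reuses the veneer loop from the proposal.

```C#
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

// Hypothetical helper type; not part of System.Threading.Channels.
public static class ChannelReaderSketchExtensions
{
    // Hypothetical method name taken from the open issue. Enumerating the
    // result removes items from the channel, hence "consuming".
    public static async IAsyncEnumerable<T> AsConsumingAsyncEnumerable<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Wait until data is available or the channel completes.
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Drain everything that is currently buffered.
            while (reader.TryRead(out T item))
            {
                yield return item;
            }
        }
    }
}
```

With a method like this, the call site names the draining behavior explicitly, e.g. `await foreach (T item in channel.Reader.AsConsumingAsyncEnumerable())`, instead of relying on the reader itself being enumerable.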

Labels: api-approved, area-System.Threading.Channels

All 20 comments

What is the downside to using do-while instead?

```C#
do
{
    while (TryRead(out T item))
    {
        yield return item;
    }
}
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false));
```

Was interested in extensions that provided this kind of feature set. Good? Bad?

```C#
channel.Read((item, index) => { /* do stuff */ });
channel.Read(async (item, index) => { /* do stuff */ await something; });
```

The underlying extension would basically be the previous comment.

Didn't want to code a lib just for this purpose if the functionality already existed, and was curious if it seemed futile once async enumerables were available.

IAsyncEnumerable is superior to using callbacks. Why add them?

@davidfowl...
What if you could set up your channel like so:

```C#
channel
    .Write(writer => /* is run when writing is ready and stops if complete is called */)
    .Read((item, localIndex) => /* receives the item */);
```

I'm simply contemplating simplicity and readability for simple producer consumer scenarios.
And I'm asking if it's worth it as I have not had exposure to C# 8.

> What if you could set up your channel like so

I think you'll find that makes the code actually harder to write/read/maintain for anything but the simplest of use cases. Of course, if it works for your needs and fits your desired style, go for it: you should absolutely be able to write that on top. I just don't think we should provide that built-in.
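As an illustration of writing such a helper on top of the existing reader methods, here is a minimal sketch. The names `ChannelCallbackSketch` and `ReadEachAsync` are hypothetical and not part of any library; the loop is the same wait-then-drain pattern shown in the proposal.

```C#
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper type; not part of System.Threading.Channels.
public static class ChannelCallbackSketch
{
    // Invokes the callback for each item together with its zero-based index,
    // and returns the number of items read once the channel completes.
    public static async Task<long> ReadEachAsync<T>(
        this ChannelReader<T> reader,
        Func<T, long, ValueTask> callback,
        CancellationToken cancellationToken = default)
    {
        long index = 0;
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (reader.TryRead(out T item))
            {
                await callback(item, index++).ConfigureAwait(false);
            }
        }
        return index;
    }
}
```

Anything beyond per-item handling (early exit, filtering, batching) needs another parameter or overload in this style, whereas a caller's own `await foreach` loop can express those with ordinary control flow.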

> What is the downside to using do-while instead

A do-while would be fine. Which is better would depend on the expected input. It really only affects the first time through the loop, and whether the first call you make is to TryRead or WaitToReadAsync: if you believe it's more likely that data will already be available, TryRead first is better; if you believe it's more likely that data will not already be available, then WaitToReadAsync first is better. But, as this only affects the first iteration, any difference is likely to be in the noise, amortized across the processing of the entire sequence.

@stephentoub
Just wrapped this up:
https://github.com/electricessence/Open.ChannelExtensions
https://www.nuget.org/packages/Open.ChannelExtensions

For the work I've been doing with throttling and pipelining, something like this seems quite valid:

```C#
await Channel
    .CreateBounded<T>(10)
    .SourceAsync(source /* IEnumerable<Task<T>> */) /* returns ChannelReader<T> */
    .PipeAsync(asyncTransform01, 5) /* returns ChannelReader<TOut> */
    .Pipe(transform02, 2) /* returns ChannelReader<TOut> */
    .ReadAllAsync(finalTransformedValue => {
        // Do something.
    });
```

I did my best to follow patterns you have inside Channels for early exit ValueTask etc.
I will eventually add IAsyncEnumerable<T> when it's included.
If there was something that already did this and I missed it, please let me know. Would welcome any feedback or comments.

@stephentoub is this happening in preview3?

The code is easy. Just need to be reviewed.

We’re looking at implementing this in SignalR and we’re adapting right now

Video

We concluded that exposing it as a method is better as it's "destructive", i.e. it drains the items. We settled on ReadAllAsync().
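For reference, a minimal usage sketch of the resulting `ChannelReader<T>.ReadAllAsync()` method. The demo type `ReadAllAsyncDemo` and its `DrainAsync` method are hypothetical names used only for illustration; the channel calls themselves are the standard System.Threading.Channels API on .NET Core 3.0 or later.

```C#
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo type for illustration only.
public static class ReadAllAsyncDemo
{
    public static async Task<List<int>> DrainAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Producer: write a few items, then mark the channel as complete
        // so that the consuming loop below can finish.
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 3; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            channel.Writer.Complete();
        });

        // Consumer: ReadAllAsync drains the channel; each item yielded here
        // has been removed from it.
        var seen = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            seen.Add(item);
        }

        await producer;
        return seen;
    }
}
```

The loop ends only after the writer calls `Complete()` and all buffered items have been read, which is the "destructive" behavior the method name is meant to signal.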

That name sounds better than GetConsumingAsyncEnumerable() 😄

I would have expected GetConsumingAsyncEnumerable() given that there's already precedent in BlockingCollection<T>.

I didn't hear this come up in the video, but DbDataReader implements IEnumerable directly and destructively.

https://docs.microsoft.com/dotnet/api/system.data.common.dbdatareader
https://github.com/dotnet/corefx/blob/bc0cfc49d1a8c1681fb09603f3389ef65342d542/src/System.Data.SqlClient/src/System/Data/SqlClient/SqlDataReader.cs#L1176-L1179

I think this may have been to enable LINQ over IDataRecord instances. I've never used it because, when I discovered it six years ago, I knew I wanted IAsyncEnumerable.

Thanks, @jnm2; I wasn't aware of that one.

I know this is closed, but is there any reason why this is only supported in a netcoreapp project? I thought this was a C# 8 feature, so I should be able to use netstandard2.1.

Or have I missed something somewhere?

The support wasn't available in netstandard2.1 when this was added. I assume a netstandard2.1 configuration/build could be added to the package.
cc: @terrajobst

🥇

@stephentoub Will netstandard2.1 support be added?

> Will netstandard2.1 support be added?

Don't know. I opened https://github.com/dotnet/runtime/issues/817.
