_(EDIT: Updated 1/4/2019 based on IAsyncEnumerable design changes.)_
When we originally prototyped System.Threading.Channels, we included IAsyncEnumerable<T> support with a temporary IAsyncEnumerable<T> implementation,, but as IAsyncEnumerable<T> wasn't actually available when System.Threading.Channels shipped its first release, we cut that support with the idea of adding it back later. Now that IAsyncEnumerable<T> is available as of .NET Core 3.0, it's now that "later".
Proposal
Add the following interface implementation:
```C#
public abstract class ChannelReader
: IAsyncEnumerable
{
…
public virtual IAsyncEnumerator
}
The base implementation will simply provide an async enumerator veneer over the other channel reader methods, e.g.
```C#
public virtual async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
while (TryRead(out T item))
yield return item;
}
but derived implementations can override to provide a more efficient implementation if applicable.
With that, you'll be able to await foreach over a channel reader, e.g.
```C#
await foreach (T item in channel.Reader)
{
… // handle each item read from the channel
}
or using various extensions on `IAsyncEnumerable<T>`, such as from an async LINQ implementation like that in Ix.NET:
```C#
await foreach (T item in channel.Reader.WhereAsync(…))
{
… // handle each filtered item from the channel
}
Open issues:
BlockingCollection<T>, we chose to use "consuming" in a name (GetConsumingEnumerable), to highlight that this enumerable actually mutates as it pulls out data. Should we do the same thing here, e.g. exposing an AsConsumingAsyncEnumerable that returns an IAsyncEnumerable<T> rather than implementing the interface directly?What is the downside to using do-while instead
do
{
while (TryRead(out T item))
{
yield return item;
}
}
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false));
Was interested in extensions that provided this kind of feature set. Good? Bad?
channel.Read((item,index)=>{ /* do stuff */ });
channel.Read(async (item,index)=>{ /* do stuff */ await something; });
The underlying extension would basically be the previous comment.
Didn't want to code a lib just for this purpose if the functionality already existed, and was curious if it seemed futile once async enumerables were available.
Here's what I've got for now:
https://github.com/electricessence/Open.ChannelExtensions
https://www.nuget.org/packages/Open.ChannelExtensions
IAsyncEnumerable is superior to using callbacks. Why add them?
@davidfowl...
What if you could setup your channel like so:
channel
.Write(writer=>/* is run when writing is ready and stops if complete is called */)
.Read((item,localIndex)=>/* receives the item */);
I'm simply contemplating simplicity and readability for simple producer consumer scenarios.
And I'm asking if it's worth it as I have not had exposure to C# 8.
What if you could setup your channel like so
I think you'll find that makes the code actually harder to write/read/maintain for anything but the simplest of use cases. Of course, if it works for your needs and fits your desired style, go for it: you should absolutely be able to write that on top. I just don't think we should provide that built-in.
What is the downside to using do-while instead
A do-while would be fine. Which is better would depend on the expected input. It really only affects the first time through the loop, and whether the first call you make is to TryRead or WaitForReadAsync... if you believe it's more likely that data will already be available, TryRead first is better... if you believe it's more likely that data will not already be available, then WaitForReadAsync first is better. But, as this only affects the first iteration, any difference is likely to be in the noise, ammortized across the processing of the entire sequence.
@stephentoub
Just wrapped this up:
https://github.com/electricessence/Open.ChannelExtensions
https://www.nuget.org/packages/Open.ChannelExtensions
For the work I've been doing with throttling and pipelining, something like this seems quite valid:
await Channel
.CreateBounded<T>(10)
.SourceAsync(source /* IEnumerable<Task<T>> */) /* returns ChannelReader<T> */
.PipeAsync(asyncTransform01, 5) /* returns ChannelReader<TOut> */
.Pipe(transform02, 2) /* returns ChannelReader<TOut> */
.ReadAllAsync(finalTransformedValue => {
// Do something.
});
I did my best to follow patterns you have inside Channels for early exit ValueTask etc.
I will eventually add IAsyncEnumerable<T> when it's included.
If there was something that already did this and I missed it, please let me know.Would welcome any feedback or comments.
@stephentoub is this happening in preview3?
The code is easy. Just need to be reviewed.
We’re looking at implementing this in SignalR and we’re adapting right now
We concluded that exposing it as a method is better as it's "destructive", i.e. it drains the items. We settled on ReadAllAsync().
That name sounds better then GetConsumingAsyncEnumerable() 😄
I would have expected GetConsumingAsyncEnumerable() given that there's already precedent in BlockingCollection<T>.
I didn't hear this come up in the video, but DbDataReader implements IEnumerable directly and destructively.
https://docs.microsoft.com/dotnet/api/system.data.common.dbdatareader
https://github.com/dotnet/corefx/blob/bc0cfc49d1a8c1681fb09603f3389ef65342d542/src/System.Data.SqlClient/src/System/Data/SqlClient/SqlDataReader.cs#L1176-L1179
I think this may have been to enable LINQ over IDataRecord instances. I've never used it because, when I discovered it six years ago, I knew I wanted IAsyncEnumerable.
Thanks, @jnm2; I wasn't aware of that one.
I know this is closed, but is there any reason why this in only supported in a netcoreapp project? I thought this was a C# 8 feature so should be able to use netstandard2.1
Or have I missed something somewhere?
The support wasn't available in netstandard2.1 when this was added. I assume a netstandard2.1 configuration/build could be added to the package.
cc: @terrajobst
🥇
@stephentoub Will netstandard2.1 support be added?
Will netstandard2.1 support be added?
Don't know. I opened https://github.com/dotnet/runtime/issues/817.
Most helpful comment
We’re looking at implementing this in SignalR and we’re adapting right now