In my use of channels recently I'm very commonly repeating a simple consumer pattern.
It would be good to have this in the code base so it got reviewed by the community and kept up to date.
var reader = _channel.Reader;
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (reader.TryRead(out T item))
{
// Consume item.
}
}
I do not know the best way to include this as a proper set of extensions, but just a few examples:
public static async Task Consume(this ChannelReader<T> source, Action<T> consumer)
{
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (reader.TryRead(out T item)) consumer(item);
}
}
public static async Task Consume(this ChannelReader<T> source, Func<T, Task> consumer, CancellationToken token)
{
while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
{
await consumer(await ReadAsync(token));
}
}
This will get solved when the next version of c# adds support for async foreach.
Right. We explicitly didn't add anything here yet as we intend to do so with support ala:
C#
foreach await (var item in channel.Reader)
{
...
}
I see, so something similar to AsyncEnumerable will solve the 'next' and 'complete' problem.
Most helpful comment
Right. We explicitly didn't add anything here yet as we intend to do so with support ala:
C# foreach await (var item in channel.Reader) { ... }