This will help with adoption of pipelines. Today the usual adapter code creates two pipes and two async loops: one loop reads from the Stream and writes into a pipe, and the other reads from a pipe and writes into the Stream.
There are 2 directions: Stream -> Pipe and Pipe -> Stream.
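For reference, a minimal sketch of that hand-written adapter follows. It assumes System.IO.Pipelines on .NET Core 3.0 or later; the `ManualStreamAdapter` class and its method names are hypothetical and not part of the proposal.

```C#
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: the kind of boilerplate the proposed APIs are meant to replace.
public static class ManualStreamAdapter
{
    // Stream -> Pipe: read from the stream directly into memory owned by the pipe.
    public static async Task CopyStreamToPipeAsync(Stream stream, PipeWriter writer, CancellationToken ct = default)
    {
        Exception error = null;
        try
        {
            while (true)
            {
                Memory<byte> buffer = writer.GetMemory(4096);
                int read = await stream.ReadAsync(buffer, ct);
                if (read == 0) break; // end of stream

                writer.Advance(read);
                FlushResult result = await writer.FlushAsync(ct);
                if (result.IsCompleted || result.IsCanceled) break; // reader stopped
            }
        }
        catch (Exception ex)
        {
            error = ex; // surfaced to the reader through CompleteAsync
        }
        finally
        {
            await writer.CompleteAsync(error);
        }
    }

    // Pipe -> Stream: drain each segment the pipe hands out into the stream.
    public static async Task CopyPipeToStreamAsync(PipeReader reader, Stream stream, CancellationToken ct = default)
    {
        Exception error = null;
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(ct);
                ReadOnlySequence<byte> buffer = result.Buffer;
                foreach (ReadOnlyMemory<byte> segment in buffer)
                {
                    await stream.WriteAsync(segment, ct);
                }
                reader.AdvanceTo(buffer.End); // everything was consumed
                if (result.IsCompleted || result.IsCanceled) break;
            }
            await stream.FlushAsync(ct);
        }
        catch (Exception ex)
        {
            error = ex; // surfaced to the writer through CompleteAsync
        }
        finally
        {
            await reader.CompleteAsync(error);
        }
    }
}
```

A full duplex adapter runs both loops concurrently, each over its own Pipe, which is where the two pipes and two loops come from.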
```C#
public class PipeReader
{
    public static PipeReader Create(Stream stream, StreamPipeReaderOptions readerOptions = null);
    public virtual Stream AsStream();
    public virtual Task CopyToAsync(Stream stream, CancellationToken cancellationToken = default);
}

public class PipeWriter
{
    public static PipeWriter Create(Stream stream, StreamPipeWriterOptions writerOptions = null);
    public virtual Stream AsStream();
    protected internal virtual Task CopyFromAsync(Stream stream, CancellationToken cancellationToken = default);
}

public class StreamPipeWriterOptions
{
    private const int DefaultMinimumBufferSize = 4096;

    public StreamPipeWriterOptions(MemoryPool<byte> pool = null, int minimumBufferSize = DefaultMinimumBufferSize)
    {
        Pool = pool;
        MinimumBufferSize = minimumBufferSize;
    }

    public int MinimumBufferSize { get; }
    public MemoryPool<byte> Pool { get; }
}

public class StreamPipeReaderOptions
{
    private const int DefaultBufferSize = 4096;
    private const int DefaultMinimumReadSize = 1024;

    public StreamPipeReaderOptions(MemoryPool<byte> memoryPool = null, int bufferSize = DefaultBufferSize, int minimumReadSize = DefaultMinimumReadSize)
    {
        Pool = memoryPool;
        BufferSize = bufferSize;
        MinimumReadSize = minimumReadSize;
    }

    public int BufferSize { get; }
    public int MinimumReadSize { get; }
    public MemoryPool<byte> Pool { get; }
}
```
```C#
public static class StreamPipeExtensions
{
    public static Task CopyToAsync(this Stream stream, PipeWriter pipeWriter, CancellationToken cancellationToken = default)
    {
        return pipeWriter.CopyFromAsync(stream, cancellationToken);
    }
}
```
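Assuming the signatures proposed above, usage might look like the following sketch. `ProposedApiUsage` and `RoundTripAsync` are hypothetical names used only for illustration.

```C#
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical usage sketch of the proposed factory methods.
public static class ProposedApiUsage
{
    public static async Task<byte[]> RoundTripAsync(byte[] payload)
    {
        // Pipe -> Stream: wrap a stream in a PipeWriter and write through it.
        var written = new MemoryStream();
        PipeWriter writer = PipeWriter.Create(written);
        await writer.WriteAsync(payload);
        await writer.CompleteAsync();

        // ToArray is used because completing the writer may close the wrapped stream;
        // MemoryStream.ToArray still works after the stream has been closed.
        byte[] bytes = written.ToArray();

        // Stream -> Pipe: wrap a stream in a PipeReader and drain it.
        PipeReader reader = PipeReader.Create(new MemoryStream(bytes));
        var result = new MemoryStream();
        await reader.CopyToAsync(result);
        await reader.CompleteAsync();
        return result.ToArray();
    }
}
```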
NOTES:
Most of the per-read/write costs can be mitigated by using CopyToAsync (if overridden by the Stream), but there are some downsides there as well.
We can avoid some of these overheads by implementing a PipeReader on top of CopyToAsync that doesn't use a Pipe internally. The idea is to call CopyToAsync on a fake Stream that forwards WriteAsync calls to the PipeReader consumer. This implementation would pass buffers directly from the Stream to the consumer. If the consumer doesn't process the entire buffer, only the unconsumed portion is copied into an internal buffer for the next read.
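The forwarding part of that idea can be sketched as a write-only Stream that hands each buffer to a callback. This is an illustration only: `ForwardingStream` and `ForwardingStreamUsage` are hypothetical names, and the sketch leaves out the PipeReader surface and the internal buffer for unconsumed data.

```C#
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical write-only stream: every write is forwarded to a consumer callback.
public sealed class ForwardingStream : Stream
{
    private readonly Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> _onWrite;

    public ForwardingStream(Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> onWrite)
        => _onWrite = onWrite ?? throw new ArgumentNullException(nameof(onWrite));

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Synchronous writes block on the callback; a real implementation might reject them instead.
    public override void Write(byte[] buffer, int offset, int count)
        => _onWrite(new ReadOnlyMemory<byte>(buffer, offset, count), CancellationToken.None)
            .AsTask().GetAwaiter().GetResult();

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => _onWrite(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
        => _onWrite(buffer, cancellationToken);
}

public static class ForwardingStreamUsage
{
    // The source stream's CopyToAsync drives the loop; the callback sees each buffer without an extra copy.
    public static async Task<long> CountBytesAsync(Stream source)
    {
        long total = 0;
        var sink = new ForwardingStream((chunk, ct) => { total += chunk.Length; return default; });
        await source.CopyToAsync(sink);
        return total;
    }
}
```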
The write side isn't as problematic: we need to allocate memory to write into the Stream anyway, so reusing the Pipe isn't so bad here. The implementation would likely use a Pipe internally and write to the Stream on FlushAsync.
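A rough sketch of that write-side shape follows, assuming System.IO.Pipelines on .NET Core 3.0 or later. `PipeBackedStreamWriter` is a hypothetical name, and the sketch disables the inner Pipe's backpressure so that the flush can drain it inline.

```C#
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical PipeWriter that buffers in an inner Pipe and writes to the Stream on FlushAsync.
public sealed class PipeBackedStreamWriter : PipeWriter
{
    // Thresholds of 0 turn off backpressure, so the inner FlushAsync never waits on a reader.
    private readonly Pipe _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0));
    private readonly Stream _stream;

    public PipeBackedStreamWriter(Stream stream)
        => _stream = stream ?? throw new ArgumentNullException(nameof(stream));

    // Buffer management is delegated to the inner Pipe.
    public override Memory<byte> GetMemory(int sizeHint = 0) => _pipe.Writer.GetMemory(sizeHint);
    public override Span<byte> GetSpan(int sizeHint = 0) => _pipe.Writer.GetSpan(sizeHint);
    public override void Advance(int bytes) => _pipe.Writer.Advance(bytes);
    public override void CancelPendingFlush() => _pipe.Writer.CancelPendingFlush();

    public override void Complete(Exception exception = null)
    {
        _pipe.Writer.Complete(exception);
        _pipe.Reader.Complete(exception);
    }

    public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
    {
        // Commit what the caller has written so far to the inner Pipe.
        FlushResult flush = await _pipe.Writer.FlushAsync(cancellationToken);

        // Drain the committed bytes into the Stream.
        if (_pipe.Reader.TryRead(out ReadResult read))
        {
            foreach (ReadOnlyMemory<byte> segment in read.Buffer)
            {
                await _stream.WriteAsync(segment, cancellationToken);
            }
            _pipe.Reader.AdvanceTo(read.Buffer.End);
            await _stream.FlushAsync(cancellationToken);
        }
        return flush;
    }
}
```

Nothing reaches the Stream until FlushAsync is called, which matches the behavior described in the note above.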
Other implementations
@joshfree Unassigning @pakrym for now. Once this is triaged for a milestone, we can decide on an owner.
FYI I have working tested versions of these in Pipelines.Sockets.Unofficial, because I need them for StackExchange.Redis (in the absence of a TLS pipe API, we're using SslStream here); everything StreamConnector.* here: https://github.com/mgravell/Pipelines.Sockets.Unofficial/tree/master/src/Pipelines.Sockets.Unofficial
I wrote up extension methods to do this: https://github.com/AArnott/Nerdbank.Streams/pull/11
We need an API design for this. We'll do this in 3.0.
`public static (Pipe left, Pipe right) CreateDuplexPipe(PipeOptions oneOptions, PipeOptions twoOptions);`
It looks like you're divided between calling the pipes left and right vs. one and two. Perhaps pick a naming scheme that can be applied to both parameters and named tuple items.
I expect we'll spend the most time on this in the API review meeting. It was one of the more contentious APIs:
https://github.com/aspnet/KestrelHttpServer/blob/release/2.2/src/Kestrel.Core/Internal/DuplexPipe.cs
`public static (Pipe left, Pipe right) CreateDuplexPipe`
:bulb: According to the latest API Design Review video, public APIs that return tuples should use PascalCase (property-style casing) for naming their elements.
Also, I feel like naming them "left" and "right" is rather unintuitive. Since it makes a duplex pipe, why not return one already initialized as the "Reader" and the other as the "Writer"?
Or input / output
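To make the naming discussion concrete, here is one possible shape, sketched on top of the existing `IDuplexPipe` interface. The `DuplexPipeEnd` and `DuplexPipeSketch` types and the `Transport`/`Application` element names are hypothetical and are not what the proposal specifies.

```C#
using System.IO.Pipelines;

// Hypothetical IDuplexPipe implementation that pairs one reader with one writer.
public sealed class DuplexPipeEnd : IDuplexPipe
{
    public DuplexPipeEnd(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    public PipeReader Input { get; }
    public PipeWriter Output { get; }
}

public static class DuplexPipeSketch
{
    // Two pipes are cross-wired so that each end reads what the other end writes.
    public static (IDuplexPipe Transport, IDuplexPipe Application) CreateDuplexPipe(
        PipeOptions inputOptions, PipeOptions outputOptions)
    {
        var input = new Pipe(inputOptions);
        var output = new Pipe(outputOptions);

        var transport = new DuplexPipeEnd(output.Reader, input.Writer);
        var application = new DuplexPipeEnd(input.Reader, output.Writer);
        return (transport, application);
    }
}
```

Each end exposes `Input` and `Output` from its own point of view, which sidesteps the left/right question, and the tuple elements use PascalCase names.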
Notes are here: https://github.com/dotnet/apireviews/pull/90
@terrajobst After implementing this, I think we should have an overload of CopyFromAsync that takes a bufferSize argument, and also add an overload on StreamPipeExtensions to flow the same bufferSize argument. We'd default to 0, which means whatever the PipeWriter.GetMemory() default is.
cc @pakrym
@stephentoub @terrajobst what are your thoughts on making the stream wrappers public? That way, we can derive them and disable Synchronous IO for our implementations in ASP.NET Core.
We discussed this in the API review, and @davidfowl seemed comfortable with not exposing them. I think the general sentiment was that it'd be better to start with just these virtuals, and if exposing the concrete types seemed useful in the future, we could choose to do so subsequently.
That said, I'm ok with it if it's valuable. But I suggest we treat that as a separate API issue that's proposed/reviewed like any other, even if it's prioritized.
We'll do it once we finish this initial pass; there are a few API tweaks I want to make after finishing this implementation.
@davidfowl, is this done such that this issue can be closed, or is it tracking additional work beyond https://github.com/dotnet/corefx/pull/35399?
There's one part of the work that I still have to do: writing the Stream -> Pipe adapter.