Runtime: API Proposal: PipeReader and PipeWriter implementations over a Stream

Created on 19 Feb 2018 · 15 Comments · Source: dotnet/runtime

This will help with adoption of pipelines. The usual adapter code involves creating two pipes and two async loops: one that reads from the Stream and writes into a pipe, and one that reads from a pipe and writes into the Stream.
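For context, one of those hand-written loops (the Stream -> Pipe direction) might look roughly like the sketch below. It uses only the existing `Pipe`/`PipeWriter` surface (`GetMemory`, `Advance`, `FlushAsync`, `CompleteAsync`). The names `StreamAdapter` and `FillPipeAsync` and the 4096-byte size hint are assumptions made for illustration, not part of the proposal.

```C#
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: the Stream -> Pipe half of the usual adapter code.
public static class StreamAdapter
{
    public static async Task FillPipeAsync(
        Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
    {
        const int sizeHint = 4096; // arbitrary size chosen for this sketch

        while (true)
        {
            // Ask the pipe for memory and read from the stream directly into it.
            Memory<byte> memory = writer.GetMemory(sizeHint);
            int bytesRead = await stream.ReadAsync(memory, cancellationToken);
            if (bytesRead == 0)
            {
                break; // end of stream
            }

            // Tell the pipe how many bytes were actually written.
            writer.Advance(bytesRead);

            // Make the data visible to the reader; stop if the reader has completed.
            FlushResult result = await writer.FlushAsync(cancellationToken);
            if (result.IsCompleted)
            {
                break;
            }
        }

        // Signal the reader that no more data is coming.
        await writer.CompleteAsync();
    }
}
```

A consumer would then read from the matching `pipe.Reader`. The opposite direction needs a mirror-image loop that reads from a pipe and calls `Stream.WriteAsync`, which is the duplication this proposal aims to remove.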

There are 2 directions: Stream -> Pipe and Pipe -> Stream.

  • Read only
  • Write only

```C#
public class PipeReader
{
    // Creates a PipeReader that reads from the given Stream.
    public static PipeReader Create(Stream stream, StreamPipeReaderOptions readerOptions = null);

    public virtual Stream AsStream();
    public virtual Task CopyToAsync(Stream stream, CancellationToken cancellationToken = default);
}

public class PipeWriter
{
    // Creates a PipeWriter that writes to the given Stream.
    public static PipeWriter Create(Stream stream, StreamPipeWriterOptions writerOptions = null);

    public virtual Stream AsStream();
    protected internal virtual Task CopyFromAsync(Stream stream, CancellationToken cancellationToken = default);
}

public class StreamPipeWriterOptions
{
    private const int DefaultMinimumBufferSize = 4096;

    public StreamPipeWriterOptions(MemoryPool<byte> pool = null, int minimumBufferSize = DefaultMinimumBufferSize)
    {
        Pool = pool;
        MinimumBufferSize = minimumBufferSize;
    }

    public int MinimumBufferSize { get; }
    public MemoryPool<byte> Pool { get; }
}

public class StreamPipeReaderOptions
{
    private const int DefaultBufferSize = 4096;
    private const int DefaultMinimumReadSize = 1024;

    // The parameter is named "pool" so that it matches the assignment below
    // and the StreamPipeWriterOptions constructor.
    public StreamPipeReaderOptions(MemoryPool<byte> pool = null, int bufferSize = DefaultBufferSize, int minimumReadSize = DefaultMinimumReadSize)
    {
        Pool = pool;
        BufferSize = bufferSize;
        MinimumReadSize = minimumReadSize;
    }

    public int BufferSize { get; }
    public int MinimumReadSize { get; }
    public MemoryPool<byte> Pool { get; }
}
```

```C#
public static class StreamPipeExtensions
{
    // Lets callers write stream.CopyToAsync(pipeWriter), mirroring Stream.CopyToAsync(Stream).
    public static Task CopyToAsync(this Stream stream, PipeWriter pipeWriter, CancellationToken cancellationToken = default)
    {
        return pipeWriter.CopyFromAsync(stream, cancellationToken);
    }
}
```
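As a rough illustration of how the read side of this API could be consumed, the sketch below wraps a Stream with `PipeReader.Create` and drains it. `StreamPipeUsage` and `CountBytesAsync` are hypothetical names. The option values simply spell out the defaults proposed above.

```C#
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical usage sketch for the proposed PipeReader.Create(Stream, ...) factory.
public static class StreamPipeUsage
{
    // Reads the stream to the end through a PipeReader and returns the number of bytes seen.
    public static async Task<long> CountBytesAsync(Stream stream)
    {
        // 4096 and 1024 mirror the defaults in the proposal; they are passed explicitly for clarity.
        var options = new StreamPipeReaderOptions(bufferSize: 4096, minimumReadSize: 1024);
        PipeReader reader = PipeReader.Create(stream, options);

        long total = 0;
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            total += result.Buffer.Length;

            // Mark everything as consumed so the reader can reuse its buffers.
            reader.AdvanceTo(result.Buffer.End);

            if (result.IsCompleted)
            {
                break; // the underlying stream reported end of data
            }
        }

        await reader.CompleteAsync();
        return total;
    }
}
```

No `Pipe` and no copy loop appear in the calling code. The adapter logic lives behind the factory method.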

NOTES:

  • Some streams buffer internally and we may end up copying from the Stream's internal buffer into the pipe's buffers.
  • Even if we can avoid that, we'll end up allocating a Task per read and write operation (though some streams cache the result of the previous operation).
  • We end up paying per-read/write costs in general (e.g. FileStream allocates via ThreadPoolBoundHandle.AllocateNativeOverlapped per read/write pair).

Most of the per read/write costs can be mitigated by using CopyToAsync (if overridden by the Stream) but there are some downsides there as well.

  • With the default implementation, CopyToAsync allocates an internal buffer if the Stream doesn't already have one and passes that buffer to the other stream. With the default pipe implementation, we end up copying the Stream's buffer into the pipe's buffer, which might be fine but is a bit unfortunate.

We can avoid some of these overheads if we implement a PipeReader on top of CopyToAsync that doesn't use the Pipe internally. The idea here is that we call CopyToAsync on a fake stream that forwards WriteAsync calls to the PipeReader consumer. This implementation would pass buffers directly from the Stream to the consumer. If the consumer doesn't process the entire buffer, only the unconsumed portion is copied into an internal buffer for the next read.
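The "fake stream" part of that idea could be sketched as follows. This is only an assumption about its shape: `ForwardingStream` is a hypothetical name, the consumer is modelled as a plain callback, and the bookkeeping for partially consumed buffers described above is left out.

```C#
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical write-only stream that hands every buffer it receives to a callback
// instead of storing it. Passing it to source.CopyToAsync(...) pushes the source's
// buffers straight to the consumer.
public sealed class ForwardingStream : Stream
{
    private readonly Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> _onWrite;

    public ForwardingStream(Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> onWrite)
        => _onWrite = onWrite ?? throw new ArgumentNullException(nameof(onWrite));

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    // Both async overloads forward the caller's buffer without copying it.
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
        => _onWrite(buffer, cancellationToken);

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => _onWrite(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

    // Synchronous writes block on the callback; acceptable for a sketch only.
    public override void Write(byte[] buffer, int offset, int count)
        => WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

    public override void Flush() { }
    public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

A call such as `await source.CopyToAsync(new ForwardingStream(callback))` then drives the callback once per chunk. A real PipeReader built this way would additionally have to suspend the copy until the consumer calls AdvanceTo.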

The write side isn't as problematic because we need to be able to allocate memory to write into the Stream so reusing the pipe isn't so bad here. The implementation here would likely be using a Pipe internally, then writing to the Stream on FlushAsync.
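The buffer-then-flush behaviour on the write side can be observed with a small sketch like the one below. It assumes the `PipeWriter.Create(Stream)` factory proposed above and a destination whose `Length` can be read (for example a MemoryStream). `WriterSketch` and `WriteBufferedAsync` are hypothetical names.

```C#
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical sketch: data written through the PipeWriter is buffered and only
// reaches the underlying Stream when FlushAsync is called.
public static class WriterSketch
{
    // Returns the destination length observed before and after the flush.
    public static async Task<(long BeforeFlush, long AfterFlush)> WriteBufferedAsync(
        Stream destination, byte[] payload)
    {
        PipeWriter writer = PipeWriter.Create(destination);

        // Copy the payload into memory owned by the writer.
        Memory<byte> memory = writer.GetMemory(payload.Length);
        payload.CopyTo(memory);
        writer.Advance(payload.Length);
        long before = destination.Length;

        // Push the buffered bytes into the Stream.
        await writer.FlushAsync();
        long after = destination.Length;

        await writer.CompleteAsync();
        return (before, after);
    }
}
```

Because the writer needs memory to hand out from GetMemory anyway, the buffering costs little here, which matches the reasoning above.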

Other implementations

api-approved area-System.IO.Pipelines

All 15 comments

@joshfree Unassigning @pakrym for now. Once this is triaged for a milestone, we can decide on an owner.

FYI I have working tested versions of these in Pipelines.Sockets.Unofficial, because I need them for StackExchange.Redis (in the absence of a TLS pipe API, we're using SslStream here); everything StreamConnector.* here: https://github.com/mgravell/Pipelines.Sockets.Unofficial/tree/master/src/Pipelines.Sockets.Unofficial

I wrote up extension methods to do this: https://github.com/AArnott/Nerdbank.Streams/pull/11

We need an API design for this. We'll do this in 3.0.

     public static (Pipe left, Pipe right) CreateDuplexPipe(PipeOptions oneOptions, PipeOptions twoOptions);

It looks like you're divided between calling the pipes left and right vs. one and two. Perhaps pick a naming scheme that can be applied to both parameters and named tuple items.

> It looks like you're divided between calling the pipes left and right vs. one and two. Perhaps pick a naming scheme that can be applied to both parameters and named tuple items.

I expect we'll spend the most time on this in the API review meeting. It was one of the more contentious APIs:

https://github.com/aspnet/KestrelHttpServer/blob/release/2.2/src/Kestrel.Core/Internal/DuplexPipe.cs
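To make the naming discussion concrete, a pair of cross-wired pipes could be sketched as below. This is an illustrative assumption rather than the reviewed API or the linked Kestrel code: the `DuplexPipe` type, the `CreateConnectionPair` method, and the `Transport`/`Application` tuple names are hypothetical. `IDuplexPipe`, `Pipe`, and `PipeOptions` are the existing System.IO.Pipelines types.

```C#
using System.IO.Pipelines;

// Hypothetical IDuplexPipe built from the reader of one Pipe and the writer of another.
public sealed class DuplexPipe : IDuplexPipe
{
    public DuplexPipe(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    public PipeReader Input { get; }
    public PipeWriter Output { get; }

    // Cross-wires two pipes so that whatever one side writes, the other side reads.
    public static (IDuplexPipe Transport, IDuplexPipe Application) CreateConnectionPair(
        PipeOptions inputOptions, PipeOptions outputOptions)
    {
        var input = new Pipe(inputOptions);
        var output = new Pipe(outputOptions);

        // The transport reads what the application writes, and vice versa.
        var transport = new DuplexPipe(output.Reader, input.Writer);
        var application = new DuplexPipe(input.Reader, output.Writer);
        return (transport, application);
    }
}
```

Naming each end by its role and exposing `Input`/`Output` on both avoids having to decide what "left" and "right" mean, which is the concern raised in this thread.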

> public static (Pipe left, Pipe right) CreateDuplexPipe

:bulb: According to the latest API Design Review video, public APIs that return Tuples should use PropertyCase for naming their elements.

Also, I feel like naming them "left" and "right" is rather unintuitive. Since it makes a duplex pipe, why not return one already initialized as the "Reader" and the other as the "Writer"?

Or input / output

@terrajobst After implementing this, I think we should have an overload of CopyFromAsync that takes a bufferSize argument, and also add an overload on StreamPipeExtensions to flow the same bufferSize argument. We'd default to 0, which means whatever the PipeWriter.GetMemory() default is.

cc @pakrym

@stephentoub @terrajobst what are your thoughts on making the stream wrappers public? That way, we can derive them and disable Synchronous IO for our implementations in ASP.NET Core.

> what are your thoughts on making the stream wrappers public? That way, we can derive them and disable Synchronous IO for our implementations in ASP.NET Core.

We discussed this in the API review, and @davidfowl seemed comfortable with not exposing them. I think the general sentiment was that it'd be better to start with just these virtuals, and if exposing the concrete types seemed useful in the future, we could choose to do so subsequently.

That said, I'm ok with it if it's valuable. But I suggest we treat that as a separate API issue that's proposed/reviewed like any other, even if it's prioritized.

We'll do it once we finish this initial pass, there are a few API tweaks I want to add/make after finishing this implementation.

@davidfowl, is this done such that this issue can be closed, or is it tracking additional work beyond https://github.com/dotnet/corefx/pull/35399?

There's one part of the work that I still have to do: writing the Stream -> Pipe adapter.
