Runtime: Add CopyToAsync with Func overload to Stream

Created on 23 Jun 2017 · 29 Comments · Source: dotnet/runtime

Latest proposal

```c#
public abstract class Stream
{
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize);
    public virtual Task CopyToAsync(Func<object, ReadOnlyBuffer<byte>, CancellationToken, Task> callback, object state, int bufferSize, CancellationToken cancellationToken);
    ...
}
```

See https://github.com/dotnet/corefx/issues/21472#issuecomment-321669769 for details.

## Original Proposal

Today `Stream` has `CopyToAsync`, which acts more like a write operation than a read operation, but it can also be used as a way to have a `Stream` push data to a consumer. Instead of forcing people to write a dummy `Stream` implementation that handles incoming data in `WriteAsync`, a callback could be provided to receive the data that the `Stream` has to hand out.

API shape:

```C#
public class Stream
{
    public Task CopyToAsync(Func<object, ArraySegment<byte>, Task> callback, object state);
    public Task CopyToAsync(Func<object, ArraySegment<byte>, CancellationToken, Task> callback, CancellationToken cancellationToken, object state);
    public Task CopyToAsync(Func<object, ArraySegment<byte>, CancellationToken, Task> callback, Int32 bufferSize, CancellationToken cancellationToken, object state);
}
```

The underlying implementation would just have a dummy stream that forwarded to callback on calls to WriteAsync:

```C#
CopyToAsync(new InternalCopyToAsyncStream(callback, state), bufferSize, cancellationToken);
```

/cc @stephentoub

api-approved area-System.IO

All 29 comments

Thanks, David. Seems reasonable. Codified, I understand you're talking about something like this (except as an instance overload on Stream rather than as an extension method):
```C#
public static class StreamExtensions
{
    public static Task CopyToAsync(this Stream source, Func<object, ArraySegment<byte>, CancellationToken, Task> func, object state, int bufferSize, CancellationToken cancellationToken) =>
        source.CopyToAsync(new WriteCallbackStream(func, state), bufferSize, cancellationToken);

    // Write-only stream that forwards every WriteAsync call to the callback.
    private sealed class WriteCallbackStream : Stream
    {
        private readonly Func<object, ArraySegment<byte>, CancellationToken, Task> _func;
        private readonly object _state;

        public WriteCallbackStream(Func<object, ArraySegment<byte>, CancellationToken, Task> func, object state)
        {
            _func = func;
            _state = state;
        }

        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
            _func(_state, new ArraySegment<byte>(buffer, offset, count), cancellationToken);

        public override bool CanRead => false;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
        public override void Flush() => throw new NotSupportedException();
        public override long Length => throw new NotSupportedException();
        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
        public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    }
}
```

I get that object state is good for performance, but since this is a convenience method, shouldn't there also be an overload (or three) without state? I.e.:

```c#
public Task CopyToAsync(Func<ArraySegment<byte>, Task> callback);
```

Also, wouldn't it be better to make the state generic? I always dislike having to cast in the callback and that the `state` parameter is not type-checked. The method would then be:

```c#
public Task CopyToAsync<TState>(Func<TState, ArraySegment<byte>, Task> callback, TState state);
```

Sure it can be generic in this case. I'm always hesitant about generic methods that are virtual but this could just be an extension method so there's no overhead.
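To make the extension-method idea concrete, here is a minimal sketch of a generic, non-virtual helper layered on the existing `Stream.CopyToAsync(Stream, int, CancellationToken)`. The names `StreamCallbackExtensions` and `CallbackWriteStream<TState>` are hypothetical, the default buffer size of 81920 is an assumption, and `Flush` is a no-op here purely as a defensive choice; none of this is an API from the proposal itself.

```c#
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper type; not part of the proposal or of .NET.
public static class StreamCallbackExtensions
{
    // Generic over TState, so the callback needs no cast. Because this is an
    // extension method rather than a virtual member, no generic virtual
    // dispatch is involved.
    public static Task CopyToAsync<TState>(
        this Stream source,
        Func<TState, ArraySegment<byte>, CancellationToken, Task> callback,
        TState state,
        int bufferSize = 81920, // assumed default
        CancellationToken cancellationToken = default)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (callback is null) throw new ArgumentNullException(nameof(callback));

        // Resolves to the existing instance method Stream.CopyToAsync(Stream, int, CancellationToken).
        return source.CopyToAsync(new CallbackWriteStream<TState>(callback, state), bufferSize, cancellationToken);
    }

    // Write-only stream that forwards each WriteAsync call to the callback.
    private sealed class CallbackWriteStream<TState> : Stream
    {
        private readonly Func<TState, ArraySegment<byte>, CancellationToken, Task> _callback;
        private readonly TState _state;

        public CallbackWriteStream(Func<TState, ArraySegment<byte>, CancellationToken, Task> callback, TState state)
        {
            _callback = callback;
            _state = state;
        }

        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
            _callback(_state, new ArraySegment<byte>(buffer, offset, count), cancellationToken);

        public override bool CanRead => false;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
        public override void Flush() { } // nothing is buffered here
        public override long Length => throw new NotSupportedException();
        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
        public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    }
}
```

A caller could then write `await source.CopyToAsync((list, segment, ct) => { list.AddRange(segment); return Task.CompletedTask; }, received);` with `received` typed as `List<byte>`, and `TState` is inferred from the state argument.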

It would be interesting to understand how common a need such a helper actually is. I understand how it would fit into ASP.NET. Would other folks benefit from this?

I want to get push-based IO via CopyToAsync plumbed through the system (WebSocket, Stream, StreamReader). That way we can build a system where buffer allocations are managed by the Stream, StreamReader, and WebSocket implementations, so that parsers can just consume data without managing buffer pools.

I've been thinking about this more, and I agree this is a worthwhile thing to add, with a few tweaks:

  • We should add a synchronous CopyTo in addition to the asynchronous CopyToAsync.
  • They should be virtual: the base implementation can allocate an intermediate stream as was shown earlier, but derived implementations can do better than that in various situations and avoid that.
  • They should use ReadOnlyBuffer<byte> rather than ArraySegment<byte>. That way, streams that wrap something other than arrays don't have to allocate/copy to an array to benefit from this, e.g. UnmanagedMemoryStream, and the target is less able to mutate the data from the source.

As such, I suggest the following:
```C#
public abstract class Stream
{
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize);
    public virtual Task CopyToAsync(Func<object, ReadOnlyBuffer<byte>, CancellationToken, Task> callback, object state, int bufferSize, CancellationToken cancellationToken);

    ...
}
```
We could also consider making the state be a generic TState instead of an object. And if we feel passing in a buffer size is onerous, we could provide a default one, either by making the argument optional with a default value (and doing the same for CancellationToken on CopyToAsync), or adding non-virtual overloads that delegate to this with a value.
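As a rough illustration of why making these members virtual could pay off, the sketch below uses a hypothetical `CallbackCopyStream` base class standing in for `Stream` (the proposed member would live on `Stream` itself). As a simplification, the base implementation here is a plain read loop over a pooled buffer rather than the intermediate-stream approach, and the hypothetical `ArrayBackedStream` override hands out its backing array directly with no copy.

```c#
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;

// Hypothetical stand-in for Stream carrying the proposed virtual member.
public abstract class CallbackCopyStream : Stream
{
    // Fallback: read into a pooled buffer and push each chunk to the callback.
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize)
    {
        if (callback is null) throw new ArgumentNullException(nameof(callback));
        if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));

        byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
        try
        {
            int read;
            while ((read = Read(buffer, 0, buffer.Length)) > 0)
            {
                callback(new ReadOnlySpan<byte>(buffer, 0, read), state);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}

// Hypothetical derived stream that already owns its data as an array.
public sealed class ArrayBackedStream : CallbackCopyStream
{
    private readonly byte[] _data;
    private int _position;

    public ArrayBackedStream(byte[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));

    // Override: expose the remaining bytes in a single callback, without copying.
    public override void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize)
    {
        if (callback is null) throw new ArgumentNullException(nameof(callback));

        int remaining = _data.Length - _position;
        if (remaining > 0)
        {
            callback(new ReadOnlySpan<byte>(_data, _position, remaining), state);
            _position = _data.Length;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = Math.Min(count, _data.Length - _position);
        Buffer.BlockCopy(_data, _position, buffer, offset, n);
        _position += n;
        return n;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => _data.Length;
    public override long Position { get => _position; set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

With the override in place, the callback is invoked once with all remaining data regardless of `bufferSize`, whereas the fallback would invoke it once per chunk read.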

In corefx itself, there are several places we could use this. For example:


EDIT 9/9/17: Based on API review discussions, changed CopyTo to work with a ReadOnlySpan.

@stephentoub Would using Span<byte> instead of Buffer<byte> make sense for CopyTo? Though it would require using a custom delegate type (not Action) and it would also mean that CopyTo and CopyToAsync would be inconsistent (one would use Span, the other Buffer).

> Would using Span instead of Buffer make sense for CopyTo?

Potentially... but the two reasons you outlined were why I suggested ReadOnlyBuffer<byte> instead. I don't feel strongly about it, though; I'd be ok with using ReadOnlySpan<byte> (and a custom delegate) if folks wanted to stick with the "always-use-span-for-sync" approach.

> We could also consider making the state be a generic TState instead of an object.

Apparently generic virtual dispatch is the worst. That's why I avoided it here.

/cc @vancem @jkotas

API looks good, but we should swap the arguments passed to the func to match the ReadOnlySpanAction<T>:

```C#
public abstract class Stream
{
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize);
    public virtual Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state, int bufferSize, CancellationToken cancellationToken);
    ...
}
```

Does this also include an overload similar to dotnet/corefx#23312 for completeness?

FWIW, these are pretty advanced APIs; I'm not entirely sure completeness is a good thing in this case. But I don't feel strongly about it either way.

@terrajobst, @KrzysztofCwalina, @weshaggard? The question is whether we should have:
```C#
public abstract class Stream
{
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize);

    public virtual Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state, int bufferSize, CancellationToken cancellationToken);
    ...
}
```

or:
```C#
public abstract class Stream
{
    public void CopyTo(ReadOnlySpanAction<byte, object> callback, object state);
    public virtual void CopyTo(ReadOnlySpanAction<byte, object> callback, object state, int bufferSize);

    public Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state);
    public Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state, int bufferSize);
    public Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state, CancellationToken cancellationToken);
    public virtual Task CopyToAsync(Func<ReadOnlyBuffer<byte>, object, CancellationToken, Task> callback, object state, int bufferSize, CancellationToken cancellationToken);
    ...
}
```

I would start small and expand when we find a need. As @stephentoub said, these are pretty corner case APIs.

I agreed we should start small here.

I am willing to help on that topic.
I propose to split the work into multiple successive tasks:

  • [x] Implement in System.Private.CoreLib in coreclr (https://github.com/dotnet/coreclr/pull/27639)
  • [x] Expose from System.Runtime contract in corefx (https://github.com/dotnet/runtime/pull/48)
  • [x] Add tests to System.IO tests in corefx (https://github.com/dotnet/runtime/pull/390)
  • [ ] Use it in HashAlgorithm.ComputeHash(Stream) (https://github.com/dotnet/runtime/pull/391)
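For the last task, a callback-based copy would let hashing consume stream data without an intermediate destination stream. The sketch below is only an approximation of that idea: `CallbackHashing` and its private `CopyTo` helper are hypothetical stand-ins (a simple read loop) for the proposed `Stream.CopyTo` overload, and the 4096-byte default buffer is an assumption. `IncrementalHash` and `ReadOnlySpanAction<T, TArg>` are existing .NET types.

```c#
using System;
using System.Buffers;
using System.IO;
using System.Security.Cryptography;

// Hypothetical helper type for illustration only.
public static class CallbackHashing
{
    // Stand-in for the proposed Stream.CopyTo(ReadOnlySpanAction<byte, object>, object, int):
    // reads chunks and pushes each one to the callback.
    private static void CopyTo(Stream source, ReadOnlySpanAction<byte, object> callback, object state, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            callback(new ReadOnlySpan<byte>(buffer, 0, read), state);
        }
    }

    public static byte[] ComputeSha256(Stream source, int bufferSize = 4096)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));

        using (IncrementalHash hash = IncrementalHash.CreateHash(HashAlgorithmName.SHA256))
        {
            // The hash travels as the state object, so the lambda captures nothing.
            CopyTo(source, (span, state) => ((IncrementalHash)state).AppendData(span), hash, bufferSize);
            return hash.GetHashAndReset();
        }
    }
}
```

The cast from `object` in the callback is the cost of the non-generic state parameter in the approved shape.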

The approved API is now implemented.
@davidfowl Does it fit your initial need? Do you expect additional changes?

Nothing yet. I'll give it a play next week

@davidfowl Did you get a chance to validate yet?

Repo consolidation plus asp.net has a super old CoreFx build, not yet

After answering https://github.com/dotnet/runtime/issues/2062, I wonder whether we should enhance the implemented API to make it more "user friendly" by:

  • Moving state to a generic TState in order to avoid casting from object
  • Making bufferSize and cancellationToken optional parameters
  • Adding a CopyToAsync overload taking a sync action as parameter for use cases where we only want the read operation to be async.

This would lead to the following API:

```C#
public abstract class Stream
{
    public void CopyTo<TState>(ReadOnlySpanAction<byte, TState> callback, TState state);
    public virtual void CopyTo<TState>(ReadOnlySpanAction<byte, TState> callback, TState state, int bufferSize);

    public Task CopyToAsync<TState>(ReadOnlySpanAction<byte, TState> callback, TState state, CancellationToken cancellationToken = default);
    public virtual Task CopyToAsync<TState>(ReadOnlySpanAction<byte, TState> callback, TState state, int bufferSize, CancellationToken cancellationToken = default);

    public Task CopyToAsync<TState>(Func<ReadOnlyMemory<byte>, TState, CancellationToken, ValueTask> callback, TState state, CancellationToken cancellationToken = default);
    public virtual Task CopyToAsync<TState>(Func<ReadOnlyMemory<byte>, TState, CancellationToken, ValueTask> callback, TState state, int bufferSize, CancellationToken cancellationToken = default);
}
```

I can contribute to making this happen in the coming release.

@davidfowl What is your opinion on this?

This is a more advanced API. It does not need to be the pinnacle of ease of use.

> Moving state to a generic TState in order to avoid casting from object

This would make it a generic virtual method. That has non-trivial performance implications and should be avoided.

> Add a CopyToAsync overload taking a sync action as parameter for use cases where we only want the read operation to be async.

From my perspective this is unnecessary complication. I already wonder whether we should be removing this altogether, not adding more. We have very few places where we can take advantage of it, very few streams that override it, and if you use the API on a stream that doesn't, you may very well be paying more than you could otherwise, aka a pit of failure. I would like not to extend it further without _really_ strong motivation.

@stephentoub @manandre all these PRs got merged, are there still any stream classes that need to get a CopyToAsync override, or can we consider this API Proposal fixed and close it?

For reference, here's Stephen's reply with a list of streams that we wanted to benefit from this new API, and it seems they were all addressed in Emmanuel's PRs.

The initial intent of this API change request has been met. I propose to close it.

I've left it open because I'm not convinced there's enough value in actually keeping the API, and I think we should consider removing it prior to shipping .NET 5. Very few streams are currently overriding it, and there's currently zero consumption of it anywhere in the stack as far as I can tell. That's not promising for adding a new virtual like this to Stream. Are we expecting more overrides, and if so, where? Are we expecting more consumption anywhere in the stack?

@davidfowl there seemed to be interest in consuming these APIs in ASP.NET. Were you able to use them for your needs?

Ping @davidfowl

We haven't used this at all

Should we revert this? We probably don't have any plan to use it in the short term (.NET 5) either. We've changed strategy a bit here.
