Efcore: Move to new IAsyncEnumerable in the C# 8 timeframe

Created on 17 May 2018  ·  19 Comments  ·  Source: dotnet/efcore

If plans land as we hope, there will be support for async streams integrated into the languages and runtime. We want to move from the current definition of the interface in System.Interactive.Async to wherever the new interface lands.

closed-fixed type-enhancement

All 19 comments

Updated design for the new IAsyncEnumerable<T> interface at https://github.com/dotnet/corefx/issues/32640.

@divega - Since this feature requires .NET Standard 2.1, which .NET Framework 4.8 will not support, does that mean we are not supporting .NET Framework for anything past 2.2?

If so we should also consider using the new default interface feature of C# 8. That will allow us to more easily change out public interfaces in point releases without breaking things.

@pmiddleton The current plan is that EF Core 3.0 will still target .NET Framework. We're still working on the details of how C# 8 features play with this.

A few notes from things I have investigated so far:

  1. As expected, the C# 8.0 compiler uses pattern matching for await foreach. Therefore it is possible to bring your own types that match the IAsyncEnumerable interface and things will work:

``` c#
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Dev16Test
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // No interface is implemented: the compiler binds await foreach
            // to the method shapes of the types below.
            var xs = new FakeAsyncEnumerable<int>();
            await foreach (var x in xs)
            {
            }
        }
    }

    public class FakeAsyncEnumerable<T>
    {
        // The default value lets await foreach call this method without arguments.
        public FakeAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            return new FakeAsyncEnumerator<T>();
        }
    }

    public class FakeAsyncEnumerator<T>
    {
        public T Current { get; }

        // Always reports the end of the sequence, so the loop body never runs.
        public ValueTask<bool> MoveNextAsync()
        {
            return new ValueTask<bool>(false);
        }
    }
}
```

  2. Also as expected, it is not possible to use arbitrary types for iterators. You have to use the standard IAsyncEnumerable<T> or IAsyncEnumerator<T>. Otherwise you get an error like:

`Error CS1624 The body of 'Program.FakeEnumerator()' cannot be an iterator block because 'IFakeAsyncEnumerable' is not an iterator interface type`

  3. It seems that the identity of ValueTask<T> is preserved between the existing package and the definition of the type in .NET Core 3.0.0. This means it could be possible to define IAsyncEnumerable/tor interfaces in a package in a way that they match the identity of the types in .NET Core 3.0.

I made some more progress investigating what the compiler supports. It turns out that the compiler can create async iterators using user-provided definitions of IAsyncEnumerable<T>. It requires providing some additional compiler helper types, but it does not require having the same strong name as the types included in .NET Core 3.0.

Here is a minimum working example. It is a .NET Core 2.2 Console application.

Warning: this is a hack and not necessarily something we should push for or recommend customers to do. It is only data to use in our discussions.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Dev16Test
{
    class Program
    {
        static async Task Main(string[] args)
        {

            await foreach (var x in MyIterator())
            {
                Console.WriteLine($"The answer is {x}");
            }

        }

        public static async IAsyncEnumerable<int> MyIterator()
        {
            await Task.Delay(1000);
            yield return 42;
        }
    }
}

namespace System.Collections.Generic

{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }

}

namespace System
{ 
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

namespace System.Runtime.CompilerServices
{

    using System.Runtime.InteropServices;
    using System.Threading;
    using System.Threading.Tasks;

    [StructLayout(LayoutKind.Auto)]
    public struct AsyncIteratorMethodBuilder
    {


        private AsyncTaskMethodBuilder _methodBuilder; 

        public static AsyncIteratorMethodBuilder Create() => default; 

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void MoveNext<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine =>
            _methodBuilder.Start(ref stateMachine);

        public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : INotifyCompletion
            where TStateMachine : IAsyncStateMachine =>
            _methodBuilder.AwaitOnCompleted(ref awaiter, ref stateMachine);

        public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : ICriticalNotifyCompletion
            where TStateMachine : IAsyncStateMachine =>
            _methodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);

        public void Complete() => _methodBuilder.SetResult();

        internal object ObjectIdForDebugger => null; // was supposed to return something internal
    }
}



namespace System.Threading.Tasks.Sources
{
    using System.Diagnostics;
    using System.Runtime.ExceptionServices;
    using System.Runtime.InteropServices;
    [StructLayout(LayoutKind.Auto)]
    public struct ManualResetValueTaskSourceCore<TResult>
    {

        private Action<object> _continuation;
        private object _continuationState;
        private ExecutionContext _executionContext;
        private object _capturedContext;
        private bool _completed;
        private TResult _result;
        private ExceptionDispatchInfo _error;
        private short _version;

        public bool RunContinuationsAsynchronously { get; set; }

        public void Reset()
        {
            _version++;
            _completed = false;
            _result = default;
            _error = null;
            _executionContext = null;
            _capturedContext = null;
            _continuation = null;
            _continuationState = null;
        }

        public void SetResult(TResult result)
        {
            _result = result;
            SignalCompletion();
        }

        public void SetException(Exception error)
        {
            _error = ExceptionDispatchInfo.Capture(error);
            SignalCompletion();
        }

        public short Version => _version;

        public ValueTaskSourceStatus GetStatus(short token)
        {
            ValidateToken(token);
            return
                !_completed ? ValueTaskSourceStatus.Pending :
                _error == null ? ValueTaskSourceStatus.Succeeded :
                _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
                ValueTaskSourceStatus.Faulted;
        }

        [StackTraceHidden]
        public TResult GetResult(short token)
        {
            ValidateToken(token);
            if (!_completed)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }

            _error?.Throw();
            return _result;
        }


        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }
            ValidateToken(token);

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                _executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                SynchronizationContext sc = SynchronizationContext.Current;
                if (sc != null && sc.GetType() != typeof(SynchronizationContext))
                {
                    _capturedContext = sc;
                }
                else
                {
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        _capturedContext = ts;
                    }
                }
            }

            object oldContinuation = _continuation;
            if (oldContinuation == null)
            {
                _continuationState = state;
                oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
            }

            if (oldContinuation != null)
            {
                if (!ReferenceEquals(oldContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
                {
                    ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
                }

                switch (_capturedContext)
                {
                    case null:
                        if (_executionContext != null)
                        {
                            ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true);
                        }
                        else
                        {
                            ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true); // Was supposed to call UnsafeQueueUserWorkItem
                        }
                        break;

                    case SynchronizationContext sc:
                        sc.Post(s =>
                        {
                            var tuple = (Tuple<Action<object>, object>)s;
                            tuple.Item1(tuple.Item2);
                        }, Tuple.Create(continuation, state));
                        break;

                    case TaskScheduler ts:
                        Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                        break;
                }
            }
        }

        private void ValidateToken(short token)
        {
            if (token != _version)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }
        }

        private void SignalCompletion()
        {
            if (_completed)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }
            _completed = true;

            if (_continuation != null || Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel, null) != null)
            {
                if (_executionContext != null)
                {
                    ExecutionContext.Run(
                        _executionContext,
                        (s => ((ManualResetValueTaskSourceCore<TResult>)s).InvokeContinuation()), // to-do: file issue for public Run overload that takes generic callback to use ref
                        this);
                }
                else
                {
                    InvokeContinuation();
                }
            }
        }

        private void InvokeContinuation()
        {
            switch (_capturedContext)
            {
                case null:
                    if (RunContinuationsAsynchronously)
                    {
                        if (_executionContext != null)
                        {
                            ThreadPool.QueueUserWorkItem(_continuation, _continuationState, preferLocal: true);
                        }
                        else
                        {
                            ThreadPool.QueueUserWorkItem(_continuation, _continuationState, preferLocal: true); // was supposed to call UnsafeQueueUserWorkItem
                        }
                    }
                    else
                    {
                        _continuation(_continuationState);
                    }
                    break;

                case SynchronizationContext sc:
                    sc.Post(s =>
                    {
                        var state = (Tuple<Action<object>, object>)s;
                        state.Item1(state.Item2);
                    }, Tuple.Create(_continuation, _continuationState));
                    break;

                case TaskScheduler ts:
                    Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                    break;
            }
        }
    }

    internal static class ManualResetValueTaskSourceCoreShared 
    {
        [StackTraceHidden]
        internal static void ThrowInvalidOperationException() => throw new InvalidOperationException();

        internal static readonly Action<object> s_sentinel = CompletionSentinel;
        private static void CompletionSentinel(object _) 
        {
            Debug.Fail("The sentinel delegate should never be invoked.");
            ThrowInvalidOperationException();
        }
    }
}

namespace System.Diagnostics
{
    [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method | AttributeTargets.Constructor | AttributeTargets.Struct, Inherited = false)]
    internal sealed class StackTraceHiddenAttribute : Attribute
    {
        public StackTraceHiddenAttribute() { }
    }
}

Do these types just need to be used by the EF library code, or does the consumer of EF need them too? I'm guessing the latter, so that the EF custom IAsyncEnumerable can be consumed correctly by the compiler.

What happens then if another library decides it needs its own IAsyncEnumerable? Massive type collisions?

@pmiddleton I'm no expert, but I think there shouldn't be an issue with multiple versions of these types existing within the same application. Types are implicitly namespaced in their assembly (so it's fine to have two System.IAsyncDisposable types residing in different assemblies). The compiler seems to only care about the API shape, so as long as your types have the proper methods on them, everything should function well. But someone with more knowledge should probably confirm this.

This is fine as long as they are internal, but if your application references two public types with the same name and namespace, it requires extern alias directives at the top of every file that references IAsyncEnumerable (☹) and special command line arguments to be passed to csc.exe. Since the assemblies are coming from NuGet packages, this requires a bit of MSBuild hackery:

https://github.com/aspnet/EntityFrameworkCore/blob/master/Directory.Build.targets

All in all, a frustrating experience that I tried once and decided to never put up with again.

@pmiddleton @roji @jnm2 Unlike await foreach, the async iterators part of the compiler seems to care about the types being in the exact same namespaces as the types in .NET Core 3.0/.NET Standard 2.1.

The only way I see this approach working is if we set up multi-targeting on the package containing the types so that they are not present at all for .NET Standard 2.1 compliant platforms. The package would probably need to be something we all agree to standardize on. Otherwise the type collisions that @jnm2 and @pmiddleton mention would happen often.

We are following up on this with the language and .NET Core teams because we believe there might be other hidden complexities in this approach that we are missing.

FWIW, the code I pasted above is .NET Core 2.2 compatible code. To make it work with .NET Standard 2.0, I had to make a slightly different set of compromises: https://github.com/divega/MyAsyncEnumerables.

Only supporting netcoreapp3.0 is very good.

The whole .NET ecosystem is larger than you think. WebAssembly, Unity games and Android/iOS apps are based on Mono.

@RainingNight @ysmoradi the way things currently look, it will be possible to support async streams by targeting .NET Standard 2.1 (which will be implemented by .NET Core 3.0, but also by other platforms). So there shouldn't be any reason to suppose this feature won't be available outside of .NET Core 3.0.

By removing the Ix-Async dependency and moving to netstandard2.1, we are now using the new IAsyncEnumerable.
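For readers who have not used the final feature yet, here is a minimal sketch of producing and consuming the platform-provided IAsyncEnumerable<T>, assuming a target that ships it (.NET Core 3.0 or another .NET Standard 2.1 implementation) and a C# 8 compiler. The names `AsyncStreamDemo`, `CountAsync` and `CollectAsync` are made up for illustration; they are not EF Core APIs.

``` c#
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Async iterator: the compiler generates the IAsyncEnumerable<int> implementation.
    // [EnumeratorCancellation] routes the token given to GetAsyncEnumerator or
    // WithCancellation into this parameter.
    public static async IAsyncEnumerable<int> CountAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10, cancellationToken); // stand-in for real async work
            yield return i;
        }
    }

    // Consumer: drains any IAsyncEnumerable<T> into a list with await foreach.
    public static async Task<List<T>> CollectAsync<T>(
        IAsyncEnumerable<T> source,
        CancellationToken cancellationToken = default)
    {
        var results = new List<T>();
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            results.Add(item);
        }
        return results;
    }

    public static async Task Main()
    {
        var items = await CollectAsync(CountAsync(3));
        Console.WriteLine(string.Join(", ", items)); // prints "0, 1, 2"
    }
}
```

Because `CollectAsync` only depends on the interface, the same consumer would work with any source that exposes IAsyncEnumerable<T>, which is the point of standardizing on the platform type.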

@divega I tried your workaround, but at compilation time I get a warning.
This code conflicts with the types in Microsoft.Bcl.AsyncInterfaces (<- System.Linq.Async <- System.Interactive.Async <- Microsoft.EntityFrameworkCore.2.2.6 <- Oracle.EntityFrameworkCore.2.19.70).
At runtime I still get the same exception.
Is there a way to compile so that your code replaces the one in Microsoft.Bcl.AsyncInterfaces?

Note: My main project targets .NET Framework 4.6.1 and the entity layer is in a .NET Standard 2.0 project.

@sinsedrix EF Core support for IAsyncEnumerable was introduced in version 3.0 (as indicated by the issue milestone), but you seem to be using 2.2. Note that EF Core 3.0 can run on older versions of .NET Core - the two aren't version-locked. You will have to upgrade to a newer version of EF Core, at the least.

@roji I'm stuck with EF Core 2.x since the latest stable Oracle.EntityFrameworkCore (2.19.70) is not compatible with EF Core 3.x.

@sinsedrix they've very recently released a beta supporting EF Core 3.1, you may want to give that a try - though be aware that in general, upgrading from 2.2 to 3.x can be non-trivial for some types of usage.

Thanks @roji, I'm giving it a try. Hoping that a stable build will be released soon.
