Runtime: Add support for asynchronously (de)serializing IAsyncEnumerable<T>

Created on 13 Jun 2019 · 22 Comments · Source: dotnet/runtime

It would serialize to a JSON array, but in a streaming manner. This would be very useful for things like MVC that want to support returning Entity Framework queries (which implement IAsyncEnumerable) to the response stream without buffering the entire enumeration first (which is what is currently being implemented in https://github.com/aspnet/AspNetCore/pull/11118/files).
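To illustrate what "serialize to a JSON array in a streaming manner" could mean when done by hand, here is a minimal sketch that assumes only System.Text.Json's `Utf8JsonWriter` and the `JsonSerializer.Serialize(Utf8JsonWriter, ...)` overload. `AsyncArrayWriter` and `WriteJsonArrayAsync` are hypothetical names, not System.Text.Json APIs. The writer is flushed after every element, so each item reaches the stream before the next one is requested.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncArrayWriter
{
    // Hypothetical helper: writes the items of an async sequence as one JSON array,
    // pulling one element at a time and never holding the whole sequence in memory.
    public static async Task WriteJsonArrayAsync<T>(
        Stream output,
        IAsyncEnumerable<T> items,
        JsonSerializerOptions options = null,
        CancellationToken cancellationToken = default)
    {
        await using var writer = new Utf8JsonWriter(output);
        writer.WriteStartArray();

        await foreach (T item in items.WithCancellation(cancellationToken))
        {
            // Serialize one element into the writer's internal buffer...
            JsonSerializer.Serialize(writer, item, options);
            // ...then push it to the stream before asking for the next element.
            await writer.FlushAsync(cancellationToken);
        }

        writer.WriteEndArray();
        await writer.FlushAsync(cancellationToken);
    }
}
```

Flushing per element favors time-to-first-byte over throughput; a real implementation would probably flush only once some number of bytes had accumulated.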

area-System.Text.Json


cc @steveharter, @pranavkm, @layomia

Could you tell me whether the issue I'm seeing with some quick code for this is related to HttpClient or to MVC? https://github.com/aspnet/AspNetCore/issues/12883#issuecomment-518353206

From @tangkhaiphuong in https://github.com/dotnet/corefx/issues/41378

I started from the ASP.NET Core 3.0 API sample project and implemented the route like this:

    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private static readonly string[] Summaries = new[]
        {
            "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
        };

        private readonly ILogger<WeatherForecastController> _logger;

        public WeatherForecastController(ILogger<WeatherForecastController> logger)
        {
            _logger = logger;
        }

        [HttpGet]
        public async IAsyncEnumerable<WeatherForecast> Get()
        {
            var rng = new Random();
            for (var index = 0; index < 10; ++index)
            {
                yield return new WeatherForecast
                {
                    Date = DateTime.Now.AddDays(index),
                    TemperatureC = rng.Next(-20, 55),
                    Summary = Summaries[rng.Next(Summaries.Length)]
                };
                await Task.Delay(2000).ConfigureAwait(false);
            }
        }
    }

Then I hit F5 in VS 2019. In the browser, at http://localhost:53237/weatherforecast, I have to wait 20 s and then receive the full JSON array. I also found that the response headers include Content-Length.
I expected ASP.NET Core to stream the JSON items continuously instead of waiting to fill the whole buffer and then returning the whole large JSON.

Is there any configuration that allows streaming data chunks to the client immediately, without blocking and waiting?

Hi,

Can you try on Linux as well as Windows? I’ve got an issue open where I see this on Windows, but it streams when Linux is the host.

Cheers,

Ross

For IAsyncEnumerable<T> I recently created this issue which appears to be a duplicate: https://github.com/dotnet/corefx/issues/41358

@steveharter dotnet/runtime#1569 has only deserialization APIs; it might be good to consolidate these two issues.

I would love to see this. It would be a major perf enhancement for both memory use and time to first data for clients. Clients could also handle the stream themselves if they wanted, parsing the JSON as it arrives instead of waiting for the entire response, which has a perf benefit too.

The key is to make sure that the entire MVC pipeline can handle the IAsyncEnumerable so that if you return ActionResult it will stream it to the client.

And it should also work on a wrapper object. So if I have:

public class BaseResponse<T> {
    public IAsyncEnumerable<T> Result { get; set; }
}

It should serialize this over the wire to the client in a streaming manner, without ever loading the entire thing into memory, when I return ActionResult with a base response that has an IAsyncEnumerable in it. That is, MVC needs to stream the response instead of just dumping the string produced by the serializer, and the serializer needs to stream the object as it goes, handle the nested IAsyncEnumerable properly, and stream the array out.

To me that's the acceptance test for this one: stream it out as JSON incrementally in both of those cases, and the client can either wait for the entire string and parse it, or grab the stream and deserialize it incrementally with a JSON reader.
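For comparison, System.Text.Json in .NET 6 and later can serialize IAsyncEnumerable<T> values, including one nested in a property of a wrapper object, but only through the async `SerializeAsync` methods. A minimal sketch of the wrapper case, assuming .NET 6 or later; `BaseResponse<T>` mirrors the class above, and `NestedDemo`, `Numbers`, and `SerializeResponseAsync` are illustrative names.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public class BaseResponse<T>
{
    public IAsyncEnumerable<T> Result { get; set; }
}

public static class NestedDemo
{
    // Illustrative producer: yields 1..count, awaiting between items.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Requires .NET 6+: SerializeAsync enumerates the nested IAsyncEnumerable<T>
    // and writes it as a JSON array. The synchronous Serialize overloads do not support this.
    public static async Task<string> SerializeResponseAsync(int count)
    {
        var response = new BaseResponse<int> { Result = Numbers(count) };
        using var stream = new MemoryStream();
        await JsonSerializer.SerializeAsync(stream, response);
        return Encoding.UTF8.GetString(stream.ToArray());
    }
}
```

This covers only the serializer half of the acceptance test; whether MVC writes the result to the client incrementally is a separate question.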

From @steveharter in https://github.com/dotnet/runtime/issues/1569:

This is a placeholder to uptake the recent IAsyncEnumerable feature for the serializer.

This includes creating an enumerable class that implements IAsyncEnumerable<T> and accepts a Stream in its constructor (other parameters TBD) containing the actual JSON, which can then be read asynchronously:

public class SerializerAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    public SerializerAsyncEnumerable(Stream stream);
    public SerializerAsyncEnumerable(Stream stream, JsonSerializerOptions options);
...
}

and also likely exposed from the JsonSerializer static class:

public static class JsonSerializer
{
...
    public static SerializerAsyncEnumerable<T> DeserializeEnumerableAsync<T>(Stream stream);
    public static SerializerAsyncEnumerable<T> DeserializeEnumerableAsync<T>(Stream stream, JsonSerializerOptions options);
...
}

The implementation will leverage the same patterns as in the existing async methods which preserve the unprocessed data from the stream (since the stream just returns raw JSON which won't align with object instances) and the "state" for continuing object deserialization as new JSON is obtained.
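For comparison, a method in this spirit later shipped: starting with .NET 6, `JsonSerializer.DeserializeAsyncEnumerable<TValue>` reads the elements of a top-level JSON array from a Stream one at a time. A minimal consumer, assuming .NET 6 or later; `StreamingReadDemo.ForEachAsync` is an illustrative helper, not part of the library.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class StreamingReadDemo
{
    // Requires .NET 6+. Each element is handed to onItem as soon as it has been
    // deserialized, rather than after the whole array has been read.
    public static async Task<int> ForEachAsync<T>(
        Stream utf8Json, Action<T> onItem, CancellationToken cancellationToken = default)
    {
        int count = 0;
        await foreach (T item in JsonSerializer.DeserializeAsyncEnumerable<T>(utf8Json, cancellationToken: cancellationToken))
        {
            onItem(item);
            count++;
        }
        return count;
    }
}
```

Note that this handles only a root-level array, which sidesteps the nested-property problem raised later in this thread.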

Not saying this isn't a desirable feature, but doesn't IEnumerable<T> already stream elements to the client? The difference between IEnumerable<T> and IAsyncEnumerable<T> is that each time it retrieves the next element in the enumeration, the non-async version blocks the thread, while the async version can release the thread during blocking operations. But, this blocking only applies _while retrieving that one element_. So, it's still better to be async, but if you serialize an IEnumerable<T>, you'll get elements being written to the body stream _as they come out of the enumeration, in realtime_. The real problem is that the body stream has some buffering on it, and you need to flush it periodically, otherwise you'll only receive data in big chunks each time the buffer fills up.

This tiny test app shows that JsonSerializer will stream an IEnumerable just fine -- the IEnumerable in question, produced by StreamSomeResults, has no end, so if JsonSerializer were expecting to buffer it, it would never produce any output. The internals of JsonSerializer dynamically grow the buffer, so the output is a bit choppy, but the fact that you're seeing it means that the data is being streamed. :-)

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace TestJsonSerializerStreamEnumerable
{
    class Program
    {
        static IEnumerable<string> StreamSomeResults()
        {
            while (true)
            {
                yield return Guid.NewGuid().ToString();
                Thread.Sleep(50);
            }
        }

        static async Task Main(string[] args)
        {
            var serializerOptions = new JsonSerializerOptions();

            serializerOptions.DefaultBufferSize = 1;

            var stdout = Console.OpenStandardOutput(bufferSize: 1);

            await JsonSerializer.SerializeAsync(stdout, StreamSomeResults(), serializerOptions);
        }
    }
}

So, yes, support for IAsyncEnumerable is a good thing, but the original post's premise seems to be flawed. The specific example listed, of streaming Entity Framework query results, could be implemented in the interim with a simple adapter that wraps an IAsyncEnumerable and waits synchronously on its results:

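// Untested code
using System;
using System.Collections;
using System.Collections.Generic;

public class AsyncEnumerableSyncWrapper<T> : IEnumerable<T>
{
  private readonly IAsyncEnumerable<T> _wrapped;

  public AsyncEnumerableSyncWrapper(IAsyncEnumerable<T> toWrap) { _wrapped = toWrap; }

  // GetAsyncEnumerator returns the enumerator directly; there is nothing to await here
  public IEnumerator<T> GetEnumerator() => new AsyncEnumeratorSyncWrapper<T>(_wrapped.GetAsyncEnumerator());
  IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public class AsyncEnumeratorSyncWrapper<T> : IEnumerator<T>
{
  private readonly IAsyncEnumerator<T> _wrapped;

  public AsyncEnumeratorSyncWrapper(IAsyncEnumerator<T> toWrap) { _wrapped = toWrap; }

  public T Current => _wrapped.Current;
  object IEnumerator.Current => Current;

  // Blocking on an incomplete ValueTask is not supported, so convert to Task first
  public bool MoveNext() => _wrapped.MoveNextAsync().AsTask().GetAwaiter().GetResult();
  public void Dispose() => _wrapped.DisposeAsync().AsTask().GetAwaiter().GetResult();
  public void Reset() => throw new NotSupportedException();
}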
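The same adapter can also be written as a C# iterator method, which supplies the IEnumerable<T>/IEnumerator<T> plumbing automatically. A minimal sketch; `SyncOverAsync.AsBlocking` is a hypothetical name (.NET 7 and later include a comparable built-in, `ToBlockingEnumerable`). Like the classes above, it blocks the calling thread while each element is produced, and it converts each ValueTask to a Task before blocking on it.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncOverAsync
{
    // Wraps an IAsyncEnumerable<T> so code that only understands IEnumerable<T>
    // (such as JsonSerializer on older runtimes) can pull from it one item at a time.
    public static IEnumerable<T> AsBlocking<T>(IAsyncEnumerable<T> source)
    {
        IAsyncEnumerator<T> e = source.GetAsyncEnumerator();
        try
        {
            // Block until the next element (or the end of the sequence) is available.
            while (e.MoveNextAsync().AsTask().GetAwaiter().GetResult())
            {
                yield return e.Current;
            }
        }
        finally
        {
            e.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }
    }
}
```

Passing `SyncOverAsync.AsBlocking(query)` to `JsonSerializer.SerializeAsync` would then serialize elements as they are pulled, at the cost of one blocked thread per in-flight element.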

The purpose of this is the following:

Database query is executed, mapped to DTOs as part of the select, and then sent to the client.

Right now, the entire record set is loaded into memory and returned as an IEnumerable that is then sent on to the client in whatever serialization format you request, and the client may or may not stream that.

In the proposed scenario, that database query is returned as an IAsyncEnumerable instead, and the data streams record by record into the serializer, which in turn streams it out of the HTTP response.

In the latter, the response starts sooner, and only the memory needed for a single record is required at any given time, instead of the memory for the entire record set.

Notice how much more efficient and responsive the latter is versus the former?

That's what this is about: streaming data from a source to the client without having to load the entire source into memory, which isn't possible right now with ASP.NET Core except for files. Using IAsyncEnumerable, and ensuring that the entire serialization and response pipeline supports it without caching in memory, solves this limitation and in real-world scenarios means a HUGE perf improvement on your most expensive operations.

(and since almost all REST calls are reads in most real-world scenarios, this has a huge impact on scalability and responsiveness for most APIs)

@JohnGalt1717 not to mention it throws after 8192 records (IIRC)

That's all true but missing my point. It's not buffering and blowing up because it's IEnumerable and not IAsyncEnumerable. The async vs non-async only applies to how _individual records_ are retrieved from the enumerator. System.Text.Json absolutely should learn how to work with IAsyncEnumerable, but it doesn't need to be async in order to _not buffer_. The thing that's buffering is ASP.NET Core's route handling, not System.Text.Json and not because of whether the enumerable is async or not.

This issue's description is:

It would serialize to a JSON array, but in a streaming manner. This would be very useful for things like MVC that want to support returning entity framework queries (which implement IAsyncEnumerable) to the response stream without buffering the entire enumeration first (which is what is currently being implemented https://github.com/aspnet/AspNetCore/pull/11118/files).

The premise of this description is that JsonSerializer cannot stream serialized results if they're IEnumerable, and that it needs to be IAsyncEnumerable to make this work, but this premise is wrong. The buffering isn't happening in the serializer at all, and JsonSerializer is perfectly capable of streaming individual items from an IEnumerable.

There _is_ a problem, though, in that JsonSerializer.Serialize, when passed an IAsyncEnumerable, simply writes {} to the output stream.

This code illustrates this:

public interface IDataSource
{
  IEnumerable<string> GetData();
  IAsyncEnumerable<string> GetAsyncData();
}

[Route("/v1")]
public class TestController
{
  IDataSource _dataSource;

  public TestController(IDataSource dataSource) { _dataSource = dataSource; }

  [HttpGet("implicit")]
  public IEnumerable<string> ThisWillBufferTheEntireResultSet()
    => _dataSource.GetData();

  [HttpGet("broken")]
  public IAsyncEnumerable<string> ThisWillNotSerializeCorrectly()
    => _dataSource.GetAsyncData();

  [HttpGet("explicit")]
  public EnumerableDataResult ThisWillReturnDataAsItBecomesAvailableEvenThoughItIsNotAsync()
    => new EnumerableDataResult(_dataSource.GetData());

  [HttpGet("streaming")]
  public UnbufferedEnumerableDataResult ThisWillReturnDataItemsImmediatelyOneByOneWithNoBuffering()
    => new UnbufferedEnumerableDataResult(_dataSource.GetData());
}

public class EnumerableDataResult : IActionResult
{
  IEnumerable<string> _data;
  public EnumerableDataResult(IEnumerable<string> data) { _data = data; }

  public async Task ExecuteResultAsync(ActionContext context)
  {
    // NB: the cancellation token is important! It must be passed by name, because the third positional parameter is the JsonSerializerOptions
    await JsonSerializer.SerializeAsync(context.HttpContext.Response.Body, _data, cancellationToken: context.HttpContext.RequestAborted);
  }
}

public class UnbufferedEnumerableDataResult : IActionResult
{
  IEnumerable<string> _data;
  public UnbufferedEnumerableDataResult(IEnumerable<string> data) { _data = data; }

  static readonly byte[] JSONArrayStart = new byte[] { (byte)'[' };
  static readonly byte[] JSONArraySeparator = new byte[] { (byte)',' };
  static readonly byte[] JSONArrayEnd = new byte[] { (byte)']' };

  public async Task ExecuteResultAsync(ActionContext context)
  {
    // NB: uses of context.HttpContext.RequestAborted here are important!
    await context.HttpContext.Response.Body.WriteAsync(JSONArrayStart, context.HttpContext.RequestAborted);

    bool first = true;

    foreach (string item in _data)
    {
      // Write the separator before every item except the first, so the array never ends in a trailing comma
      if (!first)
        await context.HttpContext.Response.Body.WriteAsync(JSONArraySeparator, context.HttpContext.RequestAborted);

      first = false;

      await JsonSerializer.SerializeAsync(context.HttpContext.Response.Body, item, cancellationToken: context.HttpContext.RequestAborted);

      await context.HttpContext.Response.Body.FlushAsync(context.HttpContext.RequestAborted);
    }

    await context.HttpContext.Response.Body.WriteAsync(JSONArrayEnd, context.HttpContext.RequestAborted);
  }
}

I don't believe that's actually true. I've tried it, and if I return an IAsyncEnumerable from EF Core on the MVC endpoint, it loads the entire thing into memory and then streams it to the client via the serializer.

If I have this:

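// SomeEntity stands in for a generic type argument that was lost from the original snippet
public IAsyncEnumerable<SomeEntity> SomeMethod() {
    return someEfCoreResult.AsAsyncEnumerable();
}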

It should stream record by record through the pipeline as a streamed serialized result. It absolutely doesn't do anything of the sort right now; the serializer gets the ENTIRE result set in memory and then serializes it.

That's what this ticket is partially about. The other part is making the entire pipeline non-blocking, so that it runs in its own thread the entire way from the EF Core query to the client receiving the data in a stream.

And it should do so without me having to jump through hoops in the action result method, and it should pick the format based on the Accept header.

Oh? I tried passing IAsyncEnumerable into JsonSerializer and all it ever gives back is {}. Maybe ASP.NET Core has special handling in between, though, in the way that it is buffering the response. Again, the problem is ASP.NET Core buffering the response, _not_ the serialization infrastructure, and whether it is async or not is irrelevant.

Making something async doesn't make it run in its own thread. It is in fact exactly the opposite. Without async, the only pattern that is really available for concurrent execution is to dedicate a thread to the task. With async, the task is divorced from threads, so that it acquires a thread pool thread for each little bit of work it needs to do, immediately releasing it when the thread is blocking, and depending on the kernel to know how to schedule the continuation to execute when the blocking operation unblocks.
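To make the "divorced from threads" point concrete, here is a small sketch that starts many asynchronous waits at once and times them. Because an awaiting method holds no thread while its delay is pending, 1,000 overlapping 100 ms waits should finish in roughly 100 ms of wall-clock time rather than needing 1,000 threads. `ConcurrencyDemo` is a hypothetical name, and exact timings will vary by machine.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // One "operation": no thread is held while the delay is pending. When the delay
    // ends, the continuation runs on whichever thread-pool thread is free.
    private static async Task OperationAsync(int delayMs)
    {
        await Task.Delay(delayMs);
    }

    // Starts `count` operations concurrently and returns the total wall-clock time.
    public static async Task<TimeSpan> RunAsync(int count, int delayMs)
    {
        var stopwatch = Stopwatch.StartNew();
        await Task.WhenAll(Enumerable.Range(0, count).Select(_ => OperationAsync(delayMs)));
        return stopwatch.Elapsed;
    }
}
```

Replacing `await Task.Delay(delayMs)` with `Thread.Sleep(delayMs)` would tie up a pool thread per operation, and the total time would then depend on how fast the pool can inject threads.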

An async method that is forced to retrieve its data from a non-async IEnumerable will have the following characteristics:

  • It operates asynchronously whenever it is writing data.
  • When it is obtaining data from the data source, _for each individual record_, the thread obtained for that chunk of processing will block if necessary before returning.
  • No resource is hoarded for the duration of the entire enumeration process.

It is true that no blocking at all would be even better, and IAsyncEnumerable would mean that _for each of those individual record retrievals_, calls into EF that ended up blocking would temporarily release the thread to the thread pool.

But it is categorically not the case that using IEnumerable intrinsically means a thread will be tied up for the entire duration. The problem arises specifically when you pass the IEnumerable (or IAsyncEnumerable) back up to the ASP.NET Core infrastructure and let it handle it, because _that_ is the point that buffers the entire result set before it starts sending anything.

I don't know the internals at all, so this could be entirely wrong, but my _speculation_ is that making the ASP.NET Core controller infrastructure stream its results out instead of buffering them would be a _major_ refactoring and not a simple change at all, requiring that control be inverted across different layers of the flow. Without that change, it is largely irrelevant whether IEnumerable or IAsyncEnumerable is used, because ASP.NET Core is going to transfer the entire enumeration into its own buffer before passing it on.

But what wouldn't be a major refactoring would be to have JsonSerializer support IAsyncEnumerable.

As I understand it, the example I showed with IActionResults is the canonical way to stream results from an ASP.NET Core controller.

@logiclrd is right that there are two kinds of “async” happening here:

  1. Asynchronous IO (writing the data async)
  2. Producing each item asynchronously

ASP.NET is currently buffering to avoid having to implement custom array serialization, that should be in the JSON serializer (hence this issue).

I also agree that for EF it’s not as big a deal: if results are produced “quickly” enough, then the blocking happens only while each item is being produced, not while it is being written to the response.

That’s not great for the general case of streaming, though, as it’ll block a thread while waiting for the next result to be produced.

I don't think ASP.NET is buffering only because of System.Text.Json not supporting IAsyncEnumerable, though. I will double-check this, but I'm pretty sure it has been my observation that ASP.NET also buffers the entire set when you return a plain IEnumerable from a controller.

Okay, I tested it and I was wrong. In my earlier testing, I just didn't wait long enough for it to fill the buffer. The test had a delay between objects returned, and that delay meant the data rate was low enough that it could run for quite some time without producing any output.

There _is in fact a serious bug_, though: If you return an IEnumerable that produces data indefinitely from your controller, then it appears that ASP.NET Core never stops reading from it. This is due to the fact that ASP.NET Core suppresses all write errors. I believe the fix for this is to supply HttpContext.RequestAborted as the cancellationToken parameter for JsonSerializer.SerializeAsync.
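A small sketch of that fix outside ASP.NET Core, assuming nothing beyond System.Text.Json: an endless IEnumerable is serialized into a MemoryStream with a cancellation token, and the producer cancels the token after a fixed number of items to stand in for the client disconnecting (in ASP.NET Core the token would be HttpContext.RequestAborted). `CancellationDemo`, `Endless`, and `SerializeUntilCanceledAsync` are hypothetical names. Without the token, the call would keep enumerating indefinitely.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // An endless producer. It cancels `cts` after `cancelAfter` items to simulate
    // the client going away partway through the response.
    public static IEnumerable<string> Endless(CancellationTokenSource cts, int cancelAfter)
    {
        for (long i = 0; ; i++)
        {
            if (i == cancelAfter) cts.Cancel();
            yield return Guid.NewGuid().ToString();
        }
    }

    // Returns true if serialization stopped because the token was canceled.
    public static async Task<bool> SerializeUntilCanceledAsync(int cancelAfter)
    {
        using var cts = new CancellationTokenSource();
        using var sink = new MemoryStream();
        try
        {
            // The token is observed when the serializer flushes its buffer to the stream.
            await JsonSerializer.SerializeAsync(sink, Endless(cts, cancelAfter), cancellationToken: cts.Token);
            return false; // not reached for an endless sequence
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

Cancellation is not instantaneous: the serializer notices it at its next flush, so a few more items may be enumerated after the token fires.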

So, maybe the solution in https://github.com/aspnet/AspNetCore/pull/11118/files is wrong. If it used the adapter I showed in my first comment on this issue, then it could pull records from an IAsyncEnumerable<T> one at a time, while serializing. The adapter could be passed into JsonSerializer, which knows what to do with any IEnumerable<T>. This would effectively work around the fact that JsonSerializer doesn't know how to handle IAsyncEnumerable<T>, and could be torn out if/when System.Text.Json gets fixed upstream.

I initially arrived at this issue with ASP.NET Core-coloured glasses, and feel a bit foolish now, as it seems clear that this issue was in fact created for the upstream problem in System.Text.Json itself. The things I wrote were ultimately not technically wrong, but they were in response to misunderstanding the ticket's description in the first place.

I see that @steveharter self-assigned this issue back in November, but there haven't been further updates w.r.t. an implementation. I have done some experimentation locally and would like to consider submitting a PR to make System.Text.Json support serializing IAsyncEnumerable.

The preceding issue #1569 talks about deserializing IAsyncEnumerable values. The solution it proposes seems reasonable _except_ for doing it out of a Stream. I can't think of a way to do it without buffering, because you could have a data type such as:

public class Ariannus
{
  public IAsyncEnumerable<Person> BlackAndWhite { get; set; }
  public IAsyncEnumerable<Person> WhiteAndBlack { get; set; }
}

In order to stream the deserialization of this, the underlying data stream would need to be in two places at once, because there's no way to know if the caller wants BlackAndWhite first or WhiteAndBlack first -- or indeed interleaves the two. The situation is further complicated by the possibility of nesting.

I can't think of anything better than to make a List<T> that implements IAsyncEnumerable and just returns every call with a completed task wrapping already-deserialized data.
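The fallback just described, a List<T> that also implements IAsyncEnumerable<T> and hands back already-completed tasks, could look roughly like this. `BufferedAsyncEnumerable<T>` is a hypothetical name. Every `MoveNextAsync` returns a `ValueTask<bool>` that is already complete, so consumers using `await foreach` never actually wait.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type: already-deserialized items exposed as an IAsyncEnumerable<T>.
public class BufferedAsyncEnumerable<T> : List<T>, IAsyncEnumerable<T>
{
    public BufferedAsyncEnumerable(IEnumerable<T> items) : base(items) { }

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(this, cancellationToken);

    private sealed class Enumerator : IAsyncEnumerator<T>
    {
        private readonly List<T> _items;
        private readonly CancellationToken _token;
        private int _index = -1;

        public Enumerator(List<T> items, CancellationToken token)
        {
            _items = items;
            _token = token;
        }

        public T Current => _items[_index];

        // The result is known immediately, so wrap it in a completed ValueTask<bool>.
        public ValueTask<bool> MoveNextAsync()
        {
            _token.ThrowIfCancellationRequested();
            return new ValueTask<bool>(++_index < _items.Count);
        }

        // Nothing to release.
        public ValueTask DisposeAsync() => default;
    }
}
```

The trade-off is the one stated above: the whole payload has to be deserialized and held in memory before the first element is handed out.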

@steveharter \ @layomia would you mind pinging me once this feature is in? We'll need to teach MVC to take advantage of this feature once it lands.

For what it's worth, here's how I'm doing it in an ASP.NET Core app where I stream large amounts of data (way too much for the default MaxIAsyncEnumerableBufferLimit, which I don't want to increase). If anyone has improvements or remarks, feel free to comment. I have not done any benchmarking (because it's not important to my project), but I didn't notice any performance hits from this.

public class JsonStreamingResult<T> : IActionResult
{
    public object Data { get; }
    public JsonSerializerOptions SerializerOptions { get; set; }

    public JsonStreamingResult(IEnumerable<T> data)
    {
        Data = data ?? throw new ArgumentNullException(nameof(data));
    }

    public JsonStreamingResult(IAsyncEnumerable<T> data)
    {
        Data = data ?? throw new ArgumentNullException(nameof(data));
    }

    public Task ExecuteResultAsync(ActionContext context)
    {
        IJsonStreamingResultExecutor<T> executor = context.HttpContext.RequestServices.GetRequiredService<IJsonStreamingResultExecutor<T>>();

        return executor.ExecuteAsync(context, this);
    }
}

/// <summary>
/// Contains methods for creating instances of <see cref="JsonStreamingResult{T}"/>.
/// They are for convenience but also for anonymous types because there it's necessary
/// to infer the generic type since you can't specify it.
/// </summary>
public static class JsonStreamingResult
{
    public static JsonStreamingResult<T> Create<T>(IEnumerable<T> value, JsonSerializerOptions serializerOptions = null) => new JsonStreamingResult<T>(value)
    {
        SerializerOptions = serializerOptions
    };

    public static JsonStreamingResult<T> Create<T>(IAsyncEnumerable<T> value, JsonSerializerOptions serializerOptions = null) => new JsonStreamingResult<T>(value)
    {
        SerializerOptions = serializerOptions
    };
}

// This exists so an open generic type can be registered for automatic resolution
public interface IJsonStreamingResultExecutor<T> : IActionResultExecutor<JsonStreamingResult<T>>
{
}

internal class JsonStreamingResultExecutor<T> : IJsonStreamingResultExecutor<T>
{
    private const int BufferSizeThreshold = 4 * 1024 * 1024;

    public async Task ExecuteAsync(ActionContext context, JsonStreamingResult<T> result)
    {
        HttpResponse response = context.HttpContext.Response;
        response.StatusCode = (int)HttpStatusCode.OK;
        response.ContentType = $"{MediaTypeNames.Application.Json};charset={Encoding.UTF8.WebName}";

        // https://github.com/dotnet/runtime/issues/38055
        // Once that's implemented, this can be simplified and optimized a lot
        if (result.Data is IAsyncEnumerable<T> asyncEnumerable)
        {
            using MemoryStream memoryStream = new MemoryStream();
            Utf8JsonWriter writer = new Utf8JsonWriter(memoryStream);

            try
            {
                writer.WriteStartArray();

                await foreach (T value in asyncEnumerable)
                {
                    // There's no async overload that takes an Utf8JsonWriter
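                    JsonSerializer.Serialize(writer, value, result.SerializerOptions);

                    // Push anything still buffered inside the writer into memoryStream before measuring it
                    await writer.FlushAsync(context.HttpContext.RequestAborted);

                    if (memoryStream.Length >= BufferSizeThreshold)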
                    {
                        memoryStream.Position = 0;
                        await memoryStream.CopyToAsync(response.Body, context.HttpContext.RequestAborted);
                        memoryStream.Position = 0;
                        memoryStream.SetLength(0);
                    }
                }

                writer.WriteEndArray();
            }
            finally
            {
                await writer.DisposeAsync().ConfigureAwait(false); // flushes the array-end as well
                memoryStream.Position = 0;
                await memoryStream.CopyToAsync(response.Body, context.HttpContext.RequestAborted);
            }
        }
        else
        {
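            // Pass RequestAborted so serialization stops if the client disconnects
            await JsonSerializer.SerializeAsync(response.Body, (IEnumerable<T>)result.Data, result.SerializerOptions, context.HttpContext.RequestAborted).ConfigureAwait(false);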
        }
    }
}

And in ConfigureServices:

services.AddSingleton(typeof(IJsonStreamingResultExecutor<>), typeof(JsonStreamingResultExecutor<>));

Now you can just return a JsonStreamingResult from your controller and it'll stream your data without loading everything into memory. Adjust the buffer size (BufferSizeThreshold) as you wish; 4 MB seemed reasonable to me (and my use case), but I didn't experiment with other values.

public IActionResult Get()
{
    var data = myContext.MyDataset
                        .Where(x => x.Foo > whatever)
                        .Select(x => new
                        {
                            x.Foo,
                            x.Bar
                        });

    return JsonStreamingResult.Create(data);
}

EDIT:

  • English is hard
  • You might be able to add some more .ConfigureAwait(false)s but I didn't test it so I'll leave it like it is.
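The JsonStreamingResultExecutor<T> above trades latency for fewer writes: items accumulate in a MemoryStream and are copied to the response only once BufferSizeThreshold bytes have built up. Here is a framework-free sketch of the same batching idea that assumes only System.Text.Json. `BatchingJsonArrayWriter` and its `thresholdBytes` parameter are hypothetical names. Small thresholds approach per-item streaming, and large ones give fewer, bigger writes.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class BatchingJsonArrayWriter
{
    public static async Task WriteAsync<T>(
        Stream destination,
        IAsyncEnumerable<T> items,
        int thresholdBytes,
        JsonSerializerOptions options = null,
        CancellationToken cancellationToken = default)
    {
        using var buffer = new MemoryStream();

        await using (var writer = new Utf8JsonWriter(buffer))
        {
            writer.WriteStartArray();
            await foreach (T item in items.WithCancellation(cancellationToken))
            {
                JsonSerializer.Serialize(writer, item, options);
                writer.Flush(); // move the writer's pending bytes into `buffer`

                if (buffer.Length >= thresholdBytes)
                {
                    // Ship this batch, then reuse the MemoryStream for the next one.
                    buffer.Position = 0;
                    await buffer.CopyToAsync(destination, cancellationToken);
                    buffer.SetLength(0);
                }
            }
            writer.WriteEndArray();
        } // disposing the writer flushes the closing ']' into `buffer`

        // Send whatever is left (at least the closing bracket).
        buffer.Position = 0;
        await buffer.CopyToAsync(destination, cancellationToken);
    }
}
```

`SetLength(0)` also moves the position back to 0, so the writer keeps appending at the start of the reused buffer.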

In case anyone wants to do it, here is how I am turning an IAsyncEnumerable into Server-Sent Events, using an action filter:

internal class AsyncJsonActionFilter : IAsyncActionFilter
{
    public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionExecutionDelegate next)
    {
        var action = context.ActionDescriptor as ControllerActionDescriptor;
        var returnType = action?.MethodInfo?.ReturnType;

        // returnType is null when the action is not a controller action, so check before using it
        if (returnType != null && returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>))
        {
            var parameters = action.MethodInfo.GetParameters()
                .Select(x =>
                {
                    if (context.ActionArguments.TryGetValue(x.Name, out var p))
                    {
                        return p;
                    }

                    if (x.ParameterType.GetTypeInfo().IsValueType)
                    {
                        return Activator.CreateInstance(x.ParameterType);
                    }

                    return null;
                }).ToArray();

            var result = action.MethodInfo.Invoke(context.Controller, parameters);

            MethodInfo method = typeof(AsyncJsonActionFilter)
                .GetMethod(nameof(AsyncJsonActionFilter.WriteEventStream), BindingFlags.NonPublic | BindingFlags.Instance);

            MethodInfo generic = method.MakeGenericMethod(returnType.GetGenericArguments().First());

            // WriteEventStream is async: await the returned Task, otherwise the filter
            // returns before the event stream has been written
            await (Task)generic.Invoke(this, new object[] { context.HttpContext.Response, result });
        }
        else
        {
            await next();
        }
    }

    private async Task WriteEventStream<T>(HttpResponse response, IAsyncEnumerable<T> stream)
    {
        // Stop producing events when the client disconnects
        var cancellationToken = response.HttpContext.RequestAborted;

        response.StatusCode = 200;
        response.ContentType = "text/event-stream";

        await foreach (var item in stream.WithCancellation(cancellationToken))
        {
            var messageJson = JsonSerializer.Serialize(item);
            await response.WriteAsync($"data:{messageJson}\n\n", cancellationToken);
            await response.Body.FlushAsync(cancellationToken);
        }

        response.Body.Close();
    }
}
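The Server-Sent Events framing used in WriteEventStream is simple enough to sketch without ASP.NET Core: each item becomes one `data:` line followed by a blank line. Here is a minimal version that writes to any Stream, assuming single-line JSON payloads (which JsonSerializer produces unless indented output is enabled). `SseWriter` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class SseWriter
{
    public static async Task WriteEventsAsync<T>(
        Stream output, IAsyncEnumerable<T> items, CancellationToken cancellationToken = default)
    {
        await foreach (T item in items.WithCancellation(cancellationToken))
        {
            // One event per item: "data:<json>" terminated by an empty line.
            string json = JsonSerializer.Serialize(item);
            byte[] frame = Encoding.UTF8.GetBytes($"data:{json}\n\n");
            await output.WriteAsync(frame, 0, frame.Length, cancellationToken);

            // Flush so the client sees this event now rather than when a buffer fills.
            await output.FlushAsync(cancellationToken);
        }
    }
}
```

Unlike the JSON-array approaches earlier in the thread, each event here is a complete JSON document, so a browser's EventSource can parse items independently as they arrive.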