Efcore: SaveChangesAsync exhausts connection pool

Created on 17 Nov 2020  ·  10 Comments  ·  Source: dotnet/efcore

I have produced a small repo that reproduces the problem here:
https://github.com/FlukeFan/EfCoreBug_SaveChangesAsync/tree/master

If required, modify the connection string in DemoContext.cs (defaults to Server=.;Database=AsyncDemo;Trusted_Connection=True)

To replicate, run "dotnet test"

The test (which is run multiple times in parallel) is:

        [Fact]
        public async Task SaveChangesAsync_Fails()
        {
            int aId;

            using (var ctx = new DemoContext())
            {
                var entityA = ctx.AEntities.Add(new EntityA()).Entity;
                ctx.SaveChanges();
                aId = entityA.Id;

                ctx.BEntities.Add(new EntityB { EntityAId = aId });
                ctx.SaveChanges();

                var a = ctx.AEntities.Single(e => e.Id == aId);
                var bToRemove = ctx.BEntities.Where(e => e.EntityAId == a.Id).ToList();
                ctx.BEntities.RemoveRange(bToRemove);

                await Task.CompletedTask;

                await ctx.SaveChangesAsync(); // tests fail, but they pass with this commented out

                ctx.AEntities.Remove(a);
                ctx.SaveChanges();
            }

            using (var ctx = new DemoContext())
            {
                var queriedEntity = ctx.AEntities.SingleOrDefault(e => e.Id == aId);
                Assert.Null(queriedEntity);
            }
        }

Note, if the await ctx.SaveChangesAsync(); is removed, or replaced with ctx.SaveChanges(), then the test run passes.

SDK: 3.1.403
EF Core version: 3.1.10
Database provider: Microsoft.EntityFrameworkCore.SqlServer
Target framework: .NET Core 3.1
Operating system: Windows + SQL Server 2019 Developer Edition

closed-question customer-reported

All 10 comments

With the SaveChangesAsync() removed, or replaced with SaveChanges(), I get the output:

Test Run Successful.
Total tests: 131
     Passed: 131
 Total time: 3.9353 Seconds

Using SaveChangesAsync(), I get exceptions like this:

   System.InvalidOperationException : Timeout expired.  The timeout period elapsed prior to obtaining a connection from the pool.  This may have occurred because all pooled connections were in use and max pool size was reached.
  Stack Trace:
     at Microsoft.Data.Common.ADP.ExceptionWithStackTrace(Exception e)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenDbConnectionAsync(Boolean errorsExpected, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenDbConnectionAsync(Boolean errorsExpected, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenAsync(CancellationToken cancellationToken, Boolean errorsExpected)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.BeginTransactionAsync(IsolationLevel isolationLevel, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.BeginTransactionAsync(CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(DbContext _, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.SqlServer.Storage.Internal.SqlServerExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
   at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
   at SaveChangesAsyncTests.SaveChangesAsyncTests.SaveChangesAsync_Fails() in C:\Work\oss\EfCoreBug_SaveChangesAsync\SaveChangesTests.cs:line 32
--- End of stack trace from previous location where exception was thrown ---

Test Run Failed.
Total tests: 131
     Passed: 30
     Failed: 101
 Total time: 4.2280 Minutes

This looks by-design - your test suite really is trying to open more than 100 connections in parallel, which may result in connection pool exhaustion. It's up to your application to throttle new database connections, or else to increase the timeout enough so that connection timeouts don't occur.

As to why SaveChangesAsync triggers a failure and synchronous SaveChanges doesn't, that's an artifact of how xunit manages test parallelization. Whenever a test yields asynchronously, xunit automatically runs another test, since the CPU is free as the first test is waiting for I/O; this means that all 130 of your tests get executed concurrently. If the test doesn't yield (because it's entirely synchronous), xunit will not continue running tests, and the exhaustion doesn't occur.

In other words, it should be very easy to reproduce pool exhaustion with SaveChanges simply by running 130 SaveChanges in parallel, e.g. by doing Task.Run 130 times rather than relying on xunit.

@roji I'm not sure your assumptions of how XUnit manages parallelization are correct.

Note, there is (was) an assembly attribute that ensures that XUnit only allows 2 tests to run at a single time (regardless of sync/async): [assembly: CollectionBehavior(MaxParallelThreads = 2)]

I have now replaced this with a semaphore with a count of 2:

        // ensure only 2 tests are running at any given time
        private static Semaphore _semaphore = new Semaphore(2, 2);

        [Fact]
        public async Task SaveChangesAsync_Fails()
        {
            int aId;
            var semaphoreAcquired = false;

            try
            {
                semaphoreAcquired = _semaphore.WaitOne(TimeSpan.FromSeconds(10));
                Assert.True(semaphoreAcquired);
...
            }
            finally
            {
                if (semaphoreAcquired)
                    _semaphore.Release();
            }

... and EF Core still exhibits the same behaviour. The semaphore ensures there are only two connections/tests running in parallel at any given time.

> Note, there is (was) an assembly attribute that ensures that XUnit only allows 2 tests to run at a single time (regardless of sync/async): [assembly: CollectionBehavior(MaxParallelThreads = 2)]

Yep, but the crucial point is what it means for a test to be running. When your test calls SaveChanges, and that takes 10 seconds, your test is running (since the thread is blocked). When your test calls SaveChangesAsync instead, the thread yields, and xunit will run another test, since for those 10 seconds your test is no longer running in the sense of using CPU.

If you want to submit the full sample with the semaphore, I can take a look. However, may I suggest simply writing your code as a console program and launching the tasks yourself with Task.Run, to cut xunit out of the discussion? That's as trivial as something like:

    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        public static async Task Main()
        {
            // Launch 150 work items on the thread pool and wait for all of them.
            await Task.WhenAll(Enumerable.Range(0, 150).Select(i => Task.Run(() =>
            {
                // code
            })));
        }
    }

> ... When your test calls SaveChangesAsync ... the thread yields ... and xunit will run another test....

I believe the [assembly: CollectionBehavior(MaxParallelThreads = 2)] prevents XUnit from starting more than 2 tests concurrently.

I have updated the code: https://github.com/FlukeFan/EfCoreBug_SaveChangesAsync/tree/master

It now uses the semaphore, and no XUnit. I parameterized the call to the await:

If you run dotnet run skipAwait it completes successfully.
If you run dotnet run it locks up threads.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace SaveChangesAsyncTests
{
    static class Program
    {
        // ensure only 2 threads are running at any given time
        private static Semaphore _semaphore = new Semaphore(2, 2);

        static async Task Main(string[] args)
        {
            var skipAwait = args.Length == 1 && args[0].Equals("skipAwait", StringComparison.OrdinalIgnoreCase);

            await Task.WhenAll(Enumerable.Range(0, 150).Select(i => Task.Run(async () =>
            {
                int aId;
                var semaphoreAcquired = false;

                try
                {
                    semaphoreAcquired = _semaphore.WaitOne(TimeSpan.FromSeconds(10));

                    if (!semaphoreAcquired)
                        throw new Exception("Semaphore not acquired");

                    using (var ctx = new DemoContext())
                    {
                        var entityA = ctx.AEntities.Add(new EntityA()).Entity;
                        ctx.SaveChanges();
                        aId = entityA.Id;

                        ctx.BEntities.Add(new EntityB { EntityAId = aId });
                        ctx.SaveChanges();

                        var a = ctx.AEntities.Single(e => e.Id == aId);
                        var bToRemove = ctx.BEntities.Where(e => e.EntityAId == a.Id).ToList();
                        ctx.BEntities.RemoveRange(bToRemove);

                        await Task.CompletedTask;

                        if (!skipAwait)
                            await ctx.SaveChangesAsync(); // passes with this commented out

                        ctx.AEntities.Remove(a);
                        ctx.SaveChanges();
                    }

                    using (var ctx = new DemoContext())
                    {
                        var queriedEntity = ctx.AEntities.SingleOrDefault(e => e.Id == aId);

                        if (queriedEntity != null)
                            throw new Exception("Entity not deleted");
                    }
                }
                finally
                {
                    if (semaphoreAcquired)
                        _semaphore.Release();
                }
            })));

            Console.WriteLine("Success");
        }
    }
}

Model:

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace SaveChangesAsyncTests
{
    public class EntityA
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
    }

    public class EntityB
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }

        public int EntityAId { get; set; }

        public virtual EntityA EntityA { get; set; }
    }
}

Context:

using System.Linq;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;

namespace SaveChangesAsyncTests
{
    public class DemoContext : DbContext
    {
        private static object _lock = new object();
        private static bool _created;

        public DemoContext()
        {
            if (!_created)
                lock (_lock)
                    if (!_created)
                    {
                        Database.EnsureDeleted();
                        Database.EnsureCreated();
                        _created = true;
                    }
        }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            base.OnConfiguring(optionsBuilder);

            var cn = new SqlConnectionStringBuilder
            {
                ConnectionString = "Server=.;Database=AsyncDemo;Trusted_Connection=True",
                ConnectTimeout = 5, // so tests fail quicker
                MaxPoolSize = 30,
            }.ConnectionString;

            optionsBuilder.UseSqlServer(cn, o => o.CommandTimeout(5));
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);

            // turn off cascading deletes
            foreach (var relationship in modelBuilder.Model.GetEntityTypes().SelectMany(e => e.GetForeignKeys()))
                relationship.DeleteBehavior = DeleteBehavior.Restrict;
        }

        public DbSet<EntityA> AEntities { get; set; }
        public DbSet<EntityB> BEntities { get; set; }
    }
}

First, re xunit behavior. Take a look at the following sample test code, which resembles your original code sample. Each test asynchronously does nothing for 1 second:

```c#
using System.Threading.Tasks;
using Xunit;

[assembly: CollectionBehavior(DisableTestParallelization = true)]
// [assembly: CollectionBehavior(MaxParallelThreads = 1)]

public class MyTest
{
    [Fact]
    public Task FooTest() => Task.Delay(1000);
}

// Each subclass inherits FooTest, giving 10 test classes in total.
public class F001 : MyTest { }
public class F002 : MyTest { }
public class F003 : MyTest { }
public class F004 : MyTest { }
public class F005 : MyTest { }
public class F006 : MyTest { }
public class F007 : MyTest { }
public class F008 : MyTest { }
public class F009 : MyTest { }
```

With DisableTestParallelization=true, running this takes 12 seconds, since tests aren't parallelized at all.

With MaxParallelThreads=1, which sounds like the same thing, this runs in around 2.7 seconds (rather than 1 second, because of startup overhead etc.). However, if you change Task.Delay(1000) to Thread.Sleep(1000), which synchronously blocks the thread, the tests now take a full 10 seconds. This shows how differently xunit treats tests that block synchronously and tests that yield asynchronously. The logic behind this is that while a test is waiting for I/O (or otherwise waiting asynchronously), there's no reason to let the CPU sit idle, so another test can be executed.
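The same contrast can be seen without xunit at all. The following is only a minimal sketch, not xunit's scheduler: the class name `YieldVersusBlock`, the method names, and the 100 ms delay are made up for illustration. It starts ten waits from a single caller, first with Task.Delay (which yields) and then with Thread.Sleep (which blocks the calling thread).

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration; none of these names come from xunit or EF Core.
static class YieldVersusBlock
{
    // Waits that yield: all of them overlap, so the total is roughly one delay.
    public static async Task<long> RunYieldingAsync(int count, int delayMs)
    {
        var sw = Stopwatch.StartNew();
        await Task.WhenAll(Enumerable.Range(0, count).Select(_ => Task.Delay(delayMs)));
        return sw.ElapsedMilliseconds;
    }

    // Waits that block the calling thread: they run back to back,
    // so the total is roughly count * delayMs.
    public static long RunBlocking(int count, int delayMs)
    {
        var sw = Stopwatch.StartNew();
        for (var i = 0; i < count; i++)
            Thread.Sleep(delayMs);
        return sw.ElapsedMilliseconds;
    }

    static async Task Main()
    {
        Console.WriteLine($"yielding: {await RunYieldingAsync(10, 100)} ms");
        Console.WriteLine($"blocking: {RunBlocking(10, 100)} ms");
    }
}
```

The yielding run should finish in about the time of a single delay, while the blocking run should take about ten times as long. That is the same shape as the 2.7 seconds versus 10 seconds reported above.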

tl;dr your code sample with the semaphore is hanging because you are synchronously blocking thread pool threads, so none are available to handle the continuation code after SaveChangesAsync returns.

When you do Task.Run, you are handing off code to run in the .NET thread pool, which is a limited, global resource. In addition, every time you do an await, the code afterwards is again scheduled to run on that same thread pool. What your program does is:

  • Queue up 150 tasks to be executed in the thread pool.
  • The first two start executing, acquire the semaphore, and then reach SaveChangesAsync and yield.
  • At the same time, the remaining thread-pool threads (there are about 10 at program startup IIRC) also start executing your code, and immediately block synchronously on the semaphore.
  • When SaveChangesAsync completes for your first two tasks, there are no TP threads free to run the continuations, so you are in a pseudo-deadlock: the first 2 tasks are waiting for a free TP thread, but all TP threads are synchronously blocked on the semaphore, which must first be released by the first 2 tasks.
  • At that point the TP detects that it's starved, and enters "hill climbing": it will slowly spawn new threads, around 1 per second, and each one of those again gets blocked on the semaphore (because tasks are queued to the TP in FIFO order).
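The starvation described in these steps can be observed without a database. This is a rough sketch under stated assumptions: `StarvationSketch` and `MeasureQueueDelayAsync` are hypothetical names, a ManualResetEventSlim stands in for the blocking Semaphore.WaitOne call, and the pool is assumed to start with roughly one thread per processor. The exact delay depends on the runtime version and the machine, so no specific number is claimed.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of thread-pool starvation; not code from the issue.
static class StarvationSketch
{
    // Parks `blockers` thread-pool work items on a synchronous wait, then
    // measures how long a trivial work item queued afterwards has to wait.
    public static async Task<long> MeasureQueueDelayAsync(int blockers)
    {
        using var gate = new ManualResetEventSlim(false);

        // Each work item blocks its thread-pool thread, like Semaphore.WaitOne.
        var blocked = Enumerable.Range(0, blockers)
            .Select(_ => Task.Run(() => gate.Wait()))
            .ToArray();

        var sw = Stopwatch.StartNew();
        // This does no work, but it cannot start until the pool has a free thread.
        await Task.Run(() => { });
        var waitedMs = sw.ElapsedMilliseconds;

        // Release the blocked work items so the program can exit.
        gate.Set();
        await Task.WhenAll(blocked);
        return waitedMs;
    }

    static async Task Main()
    {
        // Slightly more blockers than the assumed initial thread count.
        var blockers = Environment.ProcessorCount + 2;
        var waitedMs = await MeasureQueueDelayAsync(blockers);
        Console.WriteLine($"trivial work item waited {waitedMs} ms");
    }
}
```

With only a few blockers beyond the initial thread count, the trivial work item is merely delayed. In the program discussed here there are 150 blocked work items and the threads that could release them are themselves waiting for the pool, which is why it appears to hang.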

Attached below is your same code, except that I've changed all synchronous blocking to async. Note that this meant switching from Semaphore (which is seldom-used) to SemaphoreSlim, which also supports WaitAsync.

Unfortunately, async isn't a trivial thing :) But a great rule of thumb is to never mix sync and async (i.e. be 100% async or 100% sync, preferably the former), and never block thread-pool threads.


Full code sample which works

```c#
using System;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;

namespace SaveChangesAsyncTests
{
    static class Program
    {
        // ensure only 2 threads are running at any given time
        private static SemaphoreSlim _semaphore = new SemaphoreSlim(2, 2);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Starting");
            var skipAwait = args.Length == 1 && args[0].Equals("skipAwait", StringComparison.OrdinalIgnoreCase);

            await Task.WhenAll(Enumerable.Range(0, 150).Select(i => Task.Run(async () =>
            {
                Console.WriteLine($"{i} starting");
                int aId;

                // semaphoreAcquired = _semaphore.WaitOne(TimeSpan.FromSeconds(10));
                var semaphoreAcquired = await _semaphore.WaitAsync(TimeSpan.FromSeconds(10));
                if (!semaphoreAcquired)
                    throw new Exception("Semaphore not acquired");

                Console.WriteLine($"{i} got lock");

                try
                {
                    using (var ctx = new DemoContext())
                    {
                        var entityA = ctx.AEntities.Add(new EntityA()).Entity;
                        await ctx.SaveChangesAsync();
                        aId = entityA.Id;

                        ctx.BEntities.Add(new EntityB { EntityAId = aId });
                        await ctx.SaveChangesAsync();

                        var a = await ctx.AEntities.SingleAsync(e => e.Id == aId);
                        var bToRemove = await ctx.BEntities.Where(e => e.EntityAId == a.Id).ToListAsync();
                        ctx.BEntities.RemoveRange(bToRemove);

                        await Task.CompletedTask;

                        if (!skipAwait)
                            await ctx.SaveChangesAsync();

                        Console.WriteLine($"{i} out of SaveChangesAsync");

                        ctx.AEntities.Remove(a);
                        await ctx.SaveChangesAsync();
                    }

                    using (var ctx = new DemoContext())
                    {
                        var queriedEntity = await ctx.AEntities.SingleOrDefaultAsync(e => e.Id == aId);

                        if (queriedEntity != null)
                            throw new Exception("Entity not deleted");
                    }
                }
                finally
                {
                    if (semaphoreAcquired)
                        _semaphore.Release();
                }
            })));

            Console.WriteLine("Success");
        }
    }
}

namespace SaveChangesAsyncTests
{
    public class DemoContext : DbContext
    {
        private static object _lock = new object();
        private static bool _created;

        public DemoContext()
        {
            if (!_created)
                lock (_lock)
                    if (!_created)
                    {
                        Database.EnsureDeleted();
                        Database.EnsureCreated();
                        _created = true;
                    }
        }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            base.OnConfiguring(optionsBuilder);

            var cn = new SqlConnectionStringBuilder
            {
                ConnectionString = "Server=localhost;Database=test;User=SA;Password=Abcd5678;Connect Timeout=60;ConnectRetryCount=0",
                ConnectTimeout = 5, // so tests fail quicker
                MaxPoolSize = 30,
            }.ConnectionString;

            optionsBuilder.UseSqlServer(cn, o => o.CommandTimeout(5));
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);

            // turn off cascading deletes
            foreach (var relationship in modelBuilder.Model.GetEntityTypes().SelectMany(e => e.GetForeignKeys()))
                relationship.DeleteBehavior = DeleteBehavior.Restrict;
        }

        public DbSet<EntityA> AEntities { get; set; }
        public DbSet<EntityB> BEntities { get; set; }
    }
}

namespace SaveChangesAsyncTests
{
    public class EntityA
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
    }

    public class EntityB
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }

        public int EntityAId { get; set; }

        public virtual EntityA EntityA { get; set; }
    }
}
```
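Stripped of the database, the throttling pattern in the sample above comes down to awaiting SemaphoreSlim.WaitAsync instead of blocking on Semaphore.WaitOne. The sketch below is a simplified illustration, not the code from the issue: `ThrottleSketch`, `RunAsync`, and the 20 ms Task.Delay standing in for SaveChangesAsync are all assumptions. It records the highest number of operations that were in flight at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration; Task.Delay stands in for real database I/O.
static class ThrottleSketch
{
    // Runs `taskCount` simulated I/O operations with at most `limit` in flight
    // and returns the highest concurrency that was actually observed.
    public static async Task<int> RunAsync(int taskCount, int limit)
    {
        using var semaphore = new SemaphoreSlim(limit, limit);
        var gate = new object();
        int inFlight = 0, maxInFlight = 0;

        await Task.WhenAll(Enumerable.Range(0, taskCount).Select(_ => Task.Run(async () =>
        {
            // WaitAsync yields to the caller instead of parking a thread-pool thread.
            await semaphore.WaitAsync();
            try
            {
                lock (gate)
                {
                    inFlight++;
                    maxInFlight = Math.Max(maxInFlight, inFlight);
                }

                await Task.Delay(20); // stand-in for SaveChangesAsync

                lock (gate)
                {
                    inFlight--;
                }
            }
            finally
            {
                semaphore.Release();
            }
        })));

        return maxInFlight;
    }

    static async Task Main()
    {
        var observed = await RunAsync(taskCount: 150, limit: 2);
        Console.WriteLine($"max concurrent operations: {observed}");
    }
}
```

Because no thread is blocked while waiting for the semaphore, the continuations after each await always find a free thread-pool thread, and the observed concurrency never exceeds the limit.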

re XUnit behaviour:

I see now - my understanding was wrong. Thanks for the update.

> Unfortunately, async isn't a trivial thing :)

I see the problem with the Semaphore; I'm currently investigating reproducing the problem I've had without XUnit. Thanks.

I can't manage to reproduce the problem I was seeing. We still have a problem somewhere in our codebase, but I'm back to square one on finding it.

I'm sorry to have wasted your time on this.

Thanks for your time and patience with me, and specifically thanks for explaining the XUnit threading behaviour to me.

No problem @FlukeFan, good luck finding your issue...
