ConcurrentQueue<T> is an unbounded, thread-safe queue, where the primary operations are Enqueue and TryDequeue. It's one of the more valuable concurrent collection types. However, unbounded collections aren't always desirable. For example, consider wanting to use a concurrent queue for object pooling. If you want to ensure you don't store more than N objects, this is difficult or impossible to achieve efficiently with ConcurrentQueue<T>, which will automatically grow its storage to store the item being added if there's insufficient room.
ConcurrentQueue<T>, however, is actually implemented as a wrapper for a bounded queue, internally called ConcurrentQueue<T>.Segment.
https://github.com/dotnet/corefx/blob/9c468a08151402a68732c784b0502437b808df9f/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentQueue.cs#L820
In essence, ConcurrentQueue&lt;T&gt;.Segment provides the bounded queue behavior, and ConcurrentQueue&lt;T&gt; layers unbounded semantics on top of it.
We should clean up the Segment APIs and expose it as
```C#
namespace System.Collections.Concurrent
{
    public sealed class BoundedConcurrentQueue<T>
    {
        public BoundedConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }
        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}
```
The current implementation is fast, and is able to achieve that speed in part by eschewing some functionality that would weigh it down non-trivially, e.g. enumeration. That's why it doesn't implement any interfaces like IEnumerable<T> or IReadOnlyCollection<T> that would force us to add such behavior and slow it down. This collection would be very specialized and used purely for its ability to have items enqueued and dequeued quickly. (Methods like TryPeek, ToArray, CopyTo, GetEnumerator, etc., all require the ability to look at data in the queue without removing it, and in the current implementation, that requires marking the segment as "preserving for observation", which means nothing will ever be removed from the queue; this has the effect of continuing to allow enqueues until the segment is full, but since dequeues don't end up removing data, at that point nothing further can be enqueued, even if everything is dequeued. ConcurrentQueue<T> deals with this simply by creating a new segment, but that doesn't work for the segment itself.)
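To make the proposed semantics concrete, here is a minimal coarse-lock sketch of the surface area above. This is purely illustrative: the actual internal Segment is lock-free and far more efficient, and nothing here reflects its real implementation — it only shows the contract callers would code against.

```C#
using System;
using System.Collections.Generic;

// Semantic sketch only: a coarse-lock bounded queue exposing the proposed surface.
// The real ConcurrentQueue<T>.Segment is lock-free; this is not it.
public sealed class BoundedConcurrentQueue<T>
{
    private readonly Queue<T> _queue;
    private readonly object _lock = new object();

    public BoundedConcurrentQueue(int capacity)
    {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new ArgumentOutOfRangeException(nameof(capacity), "must be a power of 2 that's >= 2");
        Capacity = capacity;
        _queue = new Queue<T>(capacity);
    }

    public int Capacity { get; }
    public int Count { get { lock (_lock) return _queue.Count; } }
    public bool IsEmpty => Count == 0;

    public bool TryEnqueue(T item)
    {
        lock (_lock)
        {
            if (_queue.Count == Capacity) return false; // full: reject instead of growing
            _queue.Enqueue(item);
            return true;
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_lock)
        {
            if (_queue.Count == 0) { item = default; return false; }
            item = _queue.Dequeue();
            return true;
        }
    }
}

class Program
{
    static void Main()
    {
        var q = new BoundedConcurrentQueue<int>(capacity: 2);
        Console.WriteLine(q.TryEnqueue(1)); // True
        Console.WriteLine(q.TryEnqueue(2)); // True
        Console.WriteLine(q.TryEnqueue(3)); // False: full, nothing is grown or replaced
        q.TryDequeue(out int first);
        Console.WriteLine(first);           // 1: FIFO order preserved
    }
}
```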
EDIT @stephentoub 7/6/2018: See alternate proposal at https://github.com/dotnet/corefx/issues/24365#issuecomment-403074379.
cc: @kouvel, @tarekgh, @benaadams
For performance it would be desirable for boundedCapacity to be a power of 2 (to use & rather than %).
Would it be acceptable to have the capacity as:
`(int minCapacity) // Will be rounded up to nearest power of two`
Yes, we would want either to require the value be a power of 2 or be allowed to round it up to one. And I think it's best to require it, as that keeps the doors open in the future to allow for other sizes to be supported.
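As a concrete illustration of why the power-of-2 requirement matters: with a power-of-2 capacity, a ring-buffer index can be reduced with a single bitwise AND against `capacity - 1` instead of a comparatively expensive modulo/division.

```C#
using System;

class PowerOfTwoIndexing
{
    static void Main()
    {
        const int capacity = 8; // must be a power of 2
        const int mask = capacity - 1;

        // For power-of-2 capacities and non-negative indices,
        // (index & mask) equals (index % capacity), but compiles
        // to a single AND instead of a division.
        for (int index = 0; index < 20; index++)
        {
            if ((index & mask) != index % capacity)
                throw new Exception("mismatch");
        }
        Console.WriteLine("ok");
    }
}
```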
This is a very desirable type for object pooling (rather than array pooling)
One candidate use is "Improve performance of Immutable/SecureObjectPool/AllocFreeConcurrentStack" https://github.com/dotnet/corefx/issues/24337
However, testing-wise it would be better for it to be a more generic type with specific tests rather than being part of Immutable.
Other candidates:
Also the ObjectPools in aspnet https://github.com/aspnet/Common/tree/dev/src/Microsoft.Extensions.ObjectPool
System.Reflection.Internal.ObjectPool
https://github.com/dotnet/corefx/blob/master/src/System.Reflection.Metadata/src/System/Reflection/Internal/Utilities/ObjectPool%601.cs
System.Net.Http.HttpConnectionPool
https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/Managed/HttpConnectionPool.cs
System.Data.ProviderBase.DbConnectionPool
https://github.com/dotnet/corefx/blob/master/src/System.Data.SqlClient/src/System/Data/ProviderBase/DbConnectionPool.cs
Also items like SocketAsyncEventArgs docs suggest using a pool for them
Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
...
Reuse the context for another operation, put it back in the pool, or discard it.
However, there is no easy way to do a high-performance bounded pool in the framework (hence all the different implementations within the framework itself)
Note: the applications for a bounded non-blocking concurrent queue are much bigger than object pooling; it's just a common pattern, and it's very easy to put an object pool on top (with other semantics like auto-create when empty, clear, etc.)
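To illustrate how thin such a layer can be, here is a hypothetical pool built on ConcurrentQueue&lt;T&gt; plus an interlocked counter to approximate the bound (the names SimpleObjectPool, Rent, and Return are invented for this sketch). The proposed bounded type would replace the counter, its extra contention, and its best-effort race with a single TryEnqueue.

```C#
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical pool: ConcurrentQueue<T> plus an Interlocked counter to cap size.
// A built-in bounded queue would make the counter (and its race) unnecessary.
sealed class SimpleObjectPool<T> where T : class
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly Func<T> _factory;
    private readonly int _maxSize;
    private int _count;

    public SimpleObjectPool(Func<T> factory, int maxSize)
    {
        _factory = factory;
        _maxSize = maxSize;
    }

    public T Rent()
    {
        if (_items.TryDequeue(out T item))
        {
            Interlocked.Decrement(ref _count);
            return item;
        }
        return _factory(); // auto-create when empty
    }

    public void Return(T item)
    {
        // Best-effort bound: the increment and the enqueue are not atomic
        // together, so the pool can briefly exceed _maxSize under contention.
        if (Interlocked.Increment(ref _count) <= _maxSize)
            _items.Enqueue(item);
        else
            Interlocked.Decrement(ref _count); // pool full: drop the object
    }
}

class Program
{
    static void Main()
    {
        var pool = new SimpleObjectPool<object>(() => new object(), maxSize: 2);
        var a = pool.Rent();
        pool.Return(a);
        var b = pool.Rent();
        Console.WriteLine(ReferenceEquals(a, b)); // True: pooled instance reused
    }
}
```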
What about implementing IProducerConsumerCollection<T>? This type would seemingly fit that interface, except then it would also have to implement IEnumerable<T> and ICollection.
What would `void ICollection.Add(T item);` do? Throw when full?
Can an interface be inserted into an inheritance chain and be binary compat?
```C#
interface IBoundedCollection<T>
{
    bool TryAdd(T item);
    bool TryTake(out T item);
}

interface IProducerConsumerCollection<T> : IBoundedCollection<T>, IEnumerable<T>, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
}
```
I assume not; else IReadOnlyList&lt;T&gt; and IReadOnlyCollection&lt;T&gt; were put in the wrong place
@benaadams
What would `void ICollection.Add(T item);` do?
IProducerConsumerCollection<T> implements System.Collections.ICollection, but not System.Collections.Generic.ICollection<T>, so there is no Add.
This type would seemingly fit that interface
Yes, but in addition to IEnumerable being problematic, CopyTo and ToArray are problematic for the same reasons.
@stephentoub do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?
```C#
namespace System.Collections.Concurrent
{
    public sealed class ConcurrentQueue<T>
    {
        public ConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        ...
    }
}
```
do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?
A few concerns:
@stephentoub I am not pushing back on the new type but trying to evaluate if it is really needed. is it possible we can have a static method on ConcurrentQueue
ConcurrentQueue exposes Enqueue rather than TryEnqueue... what's the behavior of Enqueue if the queue is full?
I think we are defining a new behavior so I think it is ok to throw at that time. This is similar to the pattern we have in other classes anyway.
My only concern with having a new type is that developers have to treat BoundedConcurrentQueue and ConcurrentQueue as 2 different things while they are really the same thing with different settings or constraints.
trying to evaluate if it is really needed
That's a good thing to do :smile:
is it possible we can have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) and this method will just return an internal instance of BoundedConcurrentQueue (considering it is a subclass of ConcurrentQueue)
That would require the relevant methods to be made virtual, which can add non-trivial expense.
have to treat BoundedConcurrentQueue and ConcurrentQueue as 2 different things while they are really same thing but with different settings or constraints.
Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.
That would require the relevant methods to be made virtual, which can add non-trivial expense.
Agreed. I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we can write the implementation with a pattern that the JIT optimizes.
Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.
Usually, devs can avoid that by just using TryEnqueue, so it is up to the dev to decide what works better in their scenario.
I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we can write the implementation with a pattern that the JIT optimizes.
If you have a method DoWork(ConcurrentQueue<T> queue) that operates on that queue, unless DoWork gets inlined and the call site was able to determine the concrete type of the queue, there's no way the JIT could help avoid the virtuals here.
I'm also not understanding why adding such a factory is better in this case. 90% of the surface area of the type would need to throw for the special case, code would need to know how the type it's handed was constructed / what its constraint is, none of the interface implementations would work, etc. What is the benefit over exposing a new type?
What is the benefit over exposing a new type?
The only reason is what I mentioned: devs don't have to use 2 types which are really doing the exact same thing. Imagine someone wants to write a method which takes the queue as a parameter; having 2 types means they need to provide 2 methods doing exactly the same thing.
If you think having a factory method on the ConcurrentQueue will complicate the implementation and the behavior, then I am fine with 2 types at that time.
Is it possible to make an educated guess at what performance gain could be achieved (or even run a simple perf test using the internal ConcurrentQueue.Segment)?
@4creators current issue is you can't create a bounded queue with ConcurrentQueue; calling .Count rather than .IsEmpty causes an allocation; and also you'd have a race condition between .Count and .Enqueue.
In order to get round that you need to wrap it in another structure that does interlocks, but then you are taking a performance hit for the extra overhead - whereas it's already behaving in the desired way in the internal Segment without a race or extra overhead.
One case for which I had to create a bounded concurrent queue before was when I had multiple producers and a single consumer that was transferring results from producers over the network. Producers had to be throttled such that they don't cause too much memory to be used while the sole consumer is lagging behind. Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?
For that particular case it may still be beneficial to have a different implementation, as it may be possible to have a more efficient Dequeue in some cases since there is only one consumer.
Another requirement for that particular scenario was to bound the queue based on an estimation of memory usage rather than number of items, throttling on some memory usage bound. Would it be useful to allow some way for the user to determine the bound?
@kouvel
Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?
Can't you use BlockingCollection<T> for that already?
Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?
This is what the Channel.CreateBounded&lt;T&gt; API from the channels library on corefxlab (which is moving to corefx) provides, or at least the asynchronous version. BlockingCollection&lt;T&gt; already provides that for synchronous.
an estimation of memory usage rather than number of items
Would that really need to be built-in? I'd think such an estimation could just be accomplished by the user and translated by the user into a count. This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.
Can't you use BlockingCollection&lt;T&gt; for that already?
I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock
Channel.CreateBounded
I see, I'll take a look
I'd think such an estimation could just be accomplished by the user and translated by the user into a count.
The capacity would have to be guessed up-front, some items may be much larger than others so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.
This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.
Yea probably not worth folding that into this implementation
I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock
It does support concurrent enqueues, and it does use a lock, though I think there are ways around that. One of the things I'd like help with in the channels lib is improving the perf of the implementation, including removing some of the locking that's currently being done, e.g. currently the channel implementations all lock on enqueue (most avoid that on dequeue), but we should be able to avoid that in many cases.
The capacity would have to be guessed up-front, some items may be much larger than others so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.
You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?
@kouvel
Can't you use BlockingCollection for that already?
I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock
From the docs:
The Dispose method is not thread-safe. All other public and protected members of BlockingCollection&lt;T&gt; are thread-safe and may be used concurrently from multiple threads.
So, concurrent enqueues are definitely allowed.
It does support concurrent enqueues
I see. BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.
You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?
The bound could be based on the sum of opaque values that are passed in to Enqueue, which could for instance represent the size of the item being enqueued.
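Such a weighted bound could be layered on top of the queue rather than built in. A hypothetical sketch (WeightBoundedQueue is an invented name, and the interlocked total adds exactly the per-operation cost discussed above), where each enqueue carries an opaque weight and is rejected once the total outstanding weight would exceed the bound:

```C#
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical: bound by total "weight" (e.g. estimated bytes) rather than count.
sealed class WeightBoundedQueue<T>
{
    private readonly ConcurrentQueue<(T Item, long Weight)> _queue =
        new ConcurrentQueue<(T, long)>();
    private readonly long _bound;
    private long _weight;

    public WeightBoundedQueue(long bound) => _bound = bound;

    public bool TryEnqueue(T item, long weight)
    {
        // Optimistically claim the weight, then roll back if over budget.
        if (Interlocked.Add(ref _weight, weight) > _bound)
        {
            Interlocked.Add(ref _weight, -weight);
            return false;
        }
        _queue.Enqueue((item, weight));
        return true;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out var entry))
        {
            Interlocked.Add(ref _weight, -entry.Weight); // release the weight
            item = entry.Item;
            return true;
        }
        item = default;
        return false;
    }
}

class Program
{
    static void Main()
    {
        var q = new WeightBoundedQueue<string>(bound: 10);
        Console.WriteLine(q.TryEnqueue("big", 6));   // True: total weight 6
        Console.WriteLine(q.TryEnqueue("big2", 6));  // False: 12 would exceed 10
        Console.WriteLine(q.TryEnqueue("small", 3)); // True: total weight 9
    }
}
```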
BoundedChannel appears to be something close to what I was looking for.
BoundedChannel appears to be something close to what I was looking for.
I think there's more flexibility in that API. Note that a bounded channel is created from a BoundedChannelOptions, so if there's something other than a count you think should be provided there, let's chat about it. I currently have https://github.com/dotnet/corefxlab/pull/1805 out to revise the API surface area based on the API review we did.
Another random thought, if the main use case for BoundedConcurrentQueue is object pooling, where I imagine the order in which items are removed from the queue doesn't matter, why not call it ObjectPool? If we feel that the queue behavior is beneficial there could also be BoundedConcurrentQueue and if we further feel that they could benefit from the same implementation, that could be the case behind the scenes.
Is it possible to implement an ObjectPool which has less restrictions to be more efficient than a BoundedConcurrentQueue?
why not call it ObjectPool?
I think we do need to invest in an ObjectPool<T>, but I don't know that it has the same surface area / implementation as this BoundedConcurrentQueue<T> would. Object pooling often requires varying policies, one of which could be the super simple policy effectively implemented by this queue type, but it's not the only one. I don't think we want to burn the ObjectPool<T> name on this.
That said, it's certainly possible there's a better name than the one I proposed.
Is it possible to implement an ObjectPool which has less restrictions to be more efficient than a BoundedConcurrentQueue?
I think it depends on the scenario. For example, if you have a bunch of threads that are all just taking and returning objects, and are doing so at fairly consistent rates, the existing ConcurrentBag<T> can actually be a more efficient pool (after https://github.com/dotnet/corefx/pull/14254).
You might also not want to throw objects away, again depending on your scenario, in which case you might prefer ConcurrentQueue<T> (or ConcurrentBag<T>) over this. In theory ConcurrentStack<T> would also be good, and in fact it's used in coreclr/corefx for some object pooling today, but it suffers in its current implementation from an allocation per push, which means it's really only useful as a pool for super expensive objects.
If I need the queue behavior and I call TryEnqueue and it fails because the queue is full, what could I do as the user? I would like to block until the queue has space (if I require enqueuing the item), but otherwise, I would have to not care that I cannot enqueue something or just spin until it can be enqueued. Is that a realistic scenario outside of ObjectPool?
Forgot about ConcurrentBag, when would one use BoundedConcurrentQueue over ConcurrentBag (assuming ConcurrentBag's implementation is ideal)?
I guess ConcurrentBag is not bounded. But when would this type be more beneficial than a theoretical BoundedConcurrentBag / ObjectPool?
I would have to not care that I cannot enqueue something or just spin until it can be enqueued
Or process the item yourself, e.g. if this is a producer/consumer scenario and the queue is being used to hand off data from producer to consumer and the producer has charged ahead so far that the consumers can't keep up, the producer might be able to process the item itself (temporarily switch to being a consumer).
Or dequeue an item to make room for the new item, either dropping it (e.g. imagine you were processing stock ticks and you only cared to have the most recent ones) or processing it itself as in the above case.
Is that a realistic scenario outside of ObjectPool?
Yes. Object pooling is just one example of a scenario where you're ok with some data being lost, but there are others.
when would one use BoundedConcurrentQueue over ConcurrentBag (assuming ConcurrentBag's implementation is ideal)?
Ideal for what? Different situations demand different implementations. If the producers are different threads from the consumers, for example, ConcurrentBag<T> is a bad choice, as every take will be forced to steal, making it very expensive.
I guess ConcurrentBag is not bounded.
Right
Is the concern being expressed here about exposing such functionality at all? Or about the name of the type being chosen? We could even choose to expose it as it is now:
```C#
namespace System.Collections.Concurrent
{
    public class ConcurrentQueue<T>
    {
        public class Segment { ... }
    }
}
```
I see the main upside of that being it highlights the nicheness of the type. The main downsides I see are that a) it can't be released out-of-band, as it's modifying an existing type in corelib, and b) it might result in a strange design if we ever changed ConcurrentQueue&lt;T&gt;'s implementation again in the future.
Or process the item yourself
Or dequeue an item to make room for the new item
Yea that's a good point, I suppose producers could act as consumers temporarily and that could serve as a decent throttling mechanism in some cases.
Is the concern being expressed here about exposing such functionality at all? Or about the name of the type being chosen?
My main concern was that I didn't understand the use case. Based on above, I guess one scenario is that I need queue behavior, there are multiple producers and consumers, and producers are willing to act as consumers when the queue is full. I don't know how realistic that scenario is, but I agree that would be something this type could provide that doesn't overlap with other existing or potential future types.
@stephentoub Regarding naming the type ConcurrentQueue<T>.Segment: I'm fine with hiding niche types and making it a nested type is one way to do that. What I don't like is that the name does not describe what the type does or how people are supposed to use it, it describes how the current implementation of ConcurrentQueue<T> uses it.
BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.
BlockingCollection is a meta-type on top of IProducerConsumerCollection&lt;T&gt; that uses ConcurrentQueue&lt;T&gt; as its default backing store; it does extra work on top to add bounding (including 2 semaphores), so it's increasing the complexity for bounding rather than reducing it (though it provides additional functionality and can work on any IProducerConsumerCollection). And ConcurrentQueue is itself built on top of the bounded segment - so it's two extra data structures to return to the behaviour of the underlying one.
one scenario... there are multiple producers and consumers, and producers are willing to act as consumers when the queue is full
A shared Queue is generally a better data structure for pooling than a Stack, as the add and remove don't contend as much; it's more controlled than [ThreadStatic], though that has advantages also for contention, but isn't good when one thread is getting from the pool and a different thread is putting back.
Another scenario is non-blocking queuing; without backlog getting too big per queue (as an inverse to what the threadpool does)
For example N threads with bounded local queues; global unbounded overflow queue
New task:
- if the global queue is not empty (overflow state) -> Enqueue to the global queue
- else round-robin TryEnqueue to a local queue
  - if full, round-robin to the next and TryEnqueue
  - if all full -> Enqueue to the global queue

Thread processing:
- TryDequeue from the local queue
- else TryDequeue from the global queue
- else round-robin TryDequeue from other local queues (steal)
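A rough sketch of this scheme, using BlockingCollection&lt;T&gt; as a stand-in for the proposed bounded queue (heavier than the proposed type, but its no-timeout TryAdd/TryTake return false immediately when full/empty), with invented names Enqueue/TryDequeue and a simple in-order scan instead of true round-robin:

```C#
using System;
using System.Collections.Concurrent;

class Program
{
    // Two bounded local queues (capacity 2 each) and an unbounded global overflow.
    static readonly BlockingCollection<int>[] Local =
    {
        new BlockingCollection<int>(boundedCapacity: 2),
        new BlockingCollection<int>(boundedCapacity: 2),
    };
    static readonly ConcurrentQueue<int> Global = new ConcurrentQueue<int>();

    static void Enqueue(int item)
    {
        // Already overflowing: keep going to the global queue.
        if (!Global.IsEmpty) { Global.Enqueue(item); return; }

        // Try each local queue in order (a real implementation would round-robin);
        // the no-timeout TryAdd returns false immediately when the queue is full.
        foreach (var queue in Local)
            if (queue.TryAdd(item))
                return;

        Global.Enqueue(item); // all local queues full
    }

    static bool TryDequeue(int preferred, out int item)
    {
        if (Local[preferred].TryTake(out item)) return true; // own queue first
        if (Global.TryDequeue(out item)) return true;        // then global
        foreach (var queue in Local)                         // then steal
            if (queue.TryTake(out item))
                return true;
        return false;
    }

    static void Main()
    {
        for (int i = 1; i <= 6; i++) Enqueue(i); // 4 fit locally, 2 overflow
        Console.WriteLine(Global.Count);          // 2
    }
}
```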
@benaadams that makes sense, thanks
@stephentoub @KrzysztofCwalina
ConcurrentQueue<T> is a general purpose multiple readers/multiple writers unbounded queue - safe and convenient combination of properties.
BoundedConcurrentQueue&lt;T&gt; is a lower-level thing, exposed publicly for its performance characteristics. If there is a chance that other kinds of queues - like a single producer/single consumer one - get into CoreFx, it may be worth creating a new namespace for them now.
Would the addition of an Enqueue which overwrites the oldest item and also moves the head on by one be too much of a stretch? Then it would be a CircularBuffer&lt;T&gt; https://github.com/dotnet/corefx/issues/3126 ?
e.g. TryEnqueue doesn't write when full; Enqueue "removes" oldest and adds newest to end when full
Would the addition of an Enqueue which overwrites the oldest item and also moves the head on by one be too much of a stretch?
With the current implementation you would have tearing problems.
With the current implementation you would have tearing problems.
Probably should be two different data types then.
As you likely wouldn't (shouldn't) be using both Enqueue and TryEnqueue on the same instance - and it would add complication (like Stream with blocking and async methods that interact, though each instance is likely only used in one mode)
Should they be related naming-wise?
@stephentoub should we mark this one as ready for review?
FYI: The API review discussion was recorded - see https://youtu.be/BI3iXFT8H7E?t=6317 (5 min duration)
Conclusion: We want to wait on @stephentoub in the API review meeting (Feb).
This isn't blocked anymore?
Correct, @stephentoub is back, it is unblocked :)
@karelz is this approved now?
Nope, it was not critical for 2.1, so we didn't prioritize it.
@stephentoub: Can we make this a new ctor on the existing ConcurrentQueue<T> type?
Can we make this a new ctor on the existing ConcurrentQueue&lt;T&gt; type?
We could. Here's what the implementation would likely look like:
https://github.com/stephentoub/coreclr/commit/186dc3e3339306cbdb04d184fc18a92168324aae
If we went this route, the proposed additional surface area for the existing ConcurrentQueue<T> would be:
```C#
public class ConcurrentQueue<T>
{
    public ConcurrentQueue(int boundedCapacity);
    public int? BoundedCapacity { get; }
    public bool TryEnqueue(T item);
    ...
}
```
and there would be an impact on some existing surface area when constructed with this new constructor:
- IProducerConsumerCollection&lt;T&gt;.TryAdd would return false if there was no space available. This is in contrast to today, where it just calls Enqueue and thus always returns true.
- Enqueue on a full queue would spin until the item could be added (the alternative would be to throw if we don't want to spin).
- ToArray, CopyTo, ICollection.CopyTo, GetEnumerator, and IEnumerable.GetEnumerator would all throw InvalidOperationException.

I'm fine with this approach if that's the preferred way to go.
Personally, I'm fine with the first two bullet points but I don't understand the last. Could you elaborate? Reading the issue description it sounds like peeking could result in marking all data as "preserve" which would block dequeues from removing data and thus could result in a state where enqueues would fail and the collection is stuck. Is copying not an option?
There are also some concerns about the second bullet point although I find this acceptable as callers have to opt-in.
I don't understand the last. Could you elaborate?
You mean the bullet about ToArray/CopyTo/GetEnumerator/etc.? ConcurrentQueue is implemented as a linked list of circular buffers. If a segment fills up, a new segment is allocated and linked from the previous one, at which point no enqueueing happens into the previous one, only into the new one, and the old one will go away as soon as all items are dequeued from it and no one still has a reference to it.
The bounding feature is implemented by only having a single segment, ever, such that if the segment fills up, rather than creating a new one, it simply says "it's full, you can't enqueue", such that TryEnqueue returns false. If you could have multiple segments at the same time, then you don't have a good way to enforce the bound that sized the original segment. The moment the second segment is created, you've now got a bunch of items in the first segment, and any enqueuer can come along and enqueue to the second segment, so you've just increased the capacity / bound artificially and incorrectly.
Certain operations also cause a segment to no longer be viable for adds. For example, ConcurrentQueue supports enumerating the contents of the collection, and it does so with snapshot semantics, meaning the moment you call GetEnumerator, you're effectively creating a moment-in-time view of what was in the ConcurrentQueue... subsequent dequeues and enqueues will not affect what's returned from the enumerator. That means that a dequeue can't allow the element being dequeued to be overwritten if there's an active enumerator, because that enumerator will need to be able to get the data from the relevant slot at effectively any time in the future. Which means that once you start enumerating (which includes ToArray, CopyTo, etc.), the segments in the queue become blocked from any future enqueues. In a normal ConcurrentQueue, that's fine, subsequent enqueues will just create a new segment. But in a bounded queue where you're limited to a single segment and can't create additional segments, you're stuck. So, either a) you completely change the implementation and make it much slower to be able to handle this (e.g. all enqueues/dequeues are blocked while any thread is enumerating the queue), b) add additional synchronization to maintain a separate count for the number of items in the queue, c) once you enumerate the queue you say it can never have any more enqueues to it ever, or d) you say that enumerating operations are blocked.
And I'm suggesting that (d) is the best option if you want to build this bounding functionality into ConcurrentQueue rather than creating a new type. We could always fall back to (b) in the future if the constraints were prohibitive and we were willing to accept the additional cost (though it would likely add at least some cost even for the non-bounded case, which would be concerning).
- IProducerConsumerCollection&lt;T&gt;.TryAdd would return false if there was no space available.
- Enqueue on a full queue would spin until the item could be added.
- Most impactfully, ToArray, CopyTo, ICollection.CopyTo, GetEnumerator, and IEnumerable.GetEnumerator would all throw InvalidOperationException.
Too many changes in behavior because of a ctor param.
This functionality deserves to be a separate type.
Just occurred to me... as well as "don't enqueue if full" behaviour, I have other scenarios where "overwrite oldest if full" is also desirable (temporal data discarding the oldest if behind, live video/data streaming, networking with packet loss)
With that in mind; would this work instead?
```C#
namespace System.Collections.Generic
{
    public sealed class CircularQueue<T>
    {
        public CircularQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }
        public bool Enqueue(T item); // returns whether the queue was full, i.e. an item was overwritten
        public bool Enqueue(T item, out T discarded); // additionally returns the overwritten item
        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}

namespace System.Collections.Concurrent
{
    public sealed class ConcurrentCircularQueue<T>
    {
        public ConcurrentCircularQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }
        public bool Enqueue(T item); // returns whether the queue was full, i.e. an item was overwritten
        public bool Enqueue(T item, out T discarded); // additionally returns the overwritten item
        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}
```
Then more controversially....
```C#
public sealed partial class ConcurrentCircularQueue<T>
{
    // Completes when the item is queued (will wait if full)
    public Task EnqueueAsync(T item, CancellationToken token = default);
}
```
as well as don't enqueue if full behaviour, I have other scenarios where overwrite oldest if full is also desirable
Then more controversially
Why isn't Channels sufficient for this, where this exact behavior is already supported? If you have a more efficient implementation that can be used for the bounded implementation in Channels, excellent, please submit that as a PR. I'm not seeing how this can be done efficiently in the current ConcurrentQueue implementation.
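For reference, in the channels library as it eventually shipped (System.Threading.Channels), the overwrite-oldest policy is exposed directly via BoundedChannelFullMode.DropOldest:

```C#
using System;
using System.Threading.Channels;

class Program
{
    static void Main()
    {
        // Capacity-2 channel that drops the oldest queued item when full.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 1; i <= 4; i++)
            channel.Writer.TryWrite(i); // 1 and 2 get dropped as 3 and 4 arrive

        while (channel.Reader.TryRead(out int item))
            Console.WriteLine(item); // 3, then 4
    }
}
```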
Why isn't Channels sufficient for this,
It probably is :)
Are you just referring to the Async; or the overwrite oldest on Enqueue?
Enqueue on a full queue would spin until the item could be added (the alternative would be to throw if we don't want to spin).
Was suggesting alternative is to overwrite instead; however then you'd want a return value saying if it did overwrite and possibly the item that was overwritten. So it still wouldn't fit with the existing api.
So it still wouldn't fit with the existing api.
Or the existing implementation... how would overwrite be implemented safely and efficiently without harming the rest of the implementation?
Are you just referring to the Async; or the overwrite oldest on Enqueue?
Primarily the former, but also the latter.
In principle it would be:
```C#
bool Enqueue(T item, out T discard)
{
    while (true)
    {
        if (TryEnqueue(item))
        {
            discard = default;
            return false;
        }
        if (TryEnqueueDequeue(item, out discard))
        {
            return true;
        }
    }
}
```
Though I will admit TryEnqueueDequeue is close to step 2 in "How to draw an owl" 😄

Mostly I'm arguing for having a different data-structure rather than changing the behaviour of the existing ConcurrentQueue's methods based on a "mode" provided at construction.
A new data-structure would be more extensible for other methods that also fit its bounded mode of operation; whereas adding something like "overwrite when full" for Enqueue at a future date (after working out Step 2) would just make ConcurrentQueue more confusing.
A new data-structure would be more extensible for other methods that also fit its bounded mode of operation; whereas adding something like "overwrite when full" for Enqueue at a future date (after working out Step 2)
I'm just not understanding how this is different from Channel.CreateBounded<T>(...), which provides the exact support you're asking for.
I don't believe we should add any kind of asynchronous methods to ConcurrentQueue<T>. Changing the data structure to support a policy of overwriting some element would be complicated and very likely to not be pay-for-play. Changing the data structure to support a policy of dropping anything other than the head would similarly be problematic. It would be relatively simple to add a policy that supported dropping the oldest element, i.e. the next to be dequeued, but that could also be implemented on top just as easily (e.g. while (!TryEnqueue) TryDequeue), and since it would add at least some branching to the implementation, isn't warranted.
From my perspective, that means the only additional policy we might want to add would be the one expounded on in this issue, allowing a TryEnqueue to return false if the collection is full. But it does come with the additional constraints, where creating the queue in this mode would make some of the other APIs throw.
I don't have a strong preference about which way to go. I think we should add this functionality, it's just a question of whether to:
I don't believe we should add any kind of asynchronous methods to ...
Fine with that
Add it to the existing ConcurrentQueue... some APIs on ConcurrentQueue will throw if used
Don't think that's a good option
Expose a new type
This is good
Having a data structure change mode based on the constructor isn't very good for a consumer of that data structure; it's runtime/convention enforcement and erroring rather than compile-time. In some situations it's hard to avoid, but this would be just to avoid adding a new type.
There is a trade-off decision here: how much do we care about the type behaving like a collection (i.e. being enumerable) vs how much we care about performance. @stephentoub added this item as we're using it internally for building object pools. It seems there is an idea to make object pooling more first class. Let's look at this type when we're productizing an object pool to make sure we're building a type that doesn't end up being the worst of all worlds (not quite fast enough, not quite usable enough).