During thread contention, how can I speed up this ConcurrentQueue implementation which uses ReaderWriterLockSlim over a regular Queue<T>?

Question: How can I implement a faster thread-safe queue to support an object pool under heavy thread contention?

Scenario: My overall objective is a pure .NET implementation of a microservices host with performance as a feature: https://github.com/tcwicks/ChillX

Example use case: Unity ML-Agents using TorchSharp, but also able to utilize the processing power of multiple NVIDIA cards (in my case four 3090 cards). Performance is critical because PPO machine learning requires many millions of iterations before it even begins to learn; in other words, very tight loops serializing data messages hundreds of millions of times, as fast as possible. With the current code I get between 2 and 6 million iterations per day depending on the model, which is only enough data to utilize one of the four video cards at around 60%.

Custom implementation of ConcurrentQueue<T> using ReaderWriterLockSlim: https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Core/Structures/ThreadSafeQueue.cs

It turns out to be slightly faster than System.Collections.Concurrent.ConcurrentQueue<T>.

Note: both ConcurrentQueue<T> and the ThreadSafeQueue.cs above are extremely fast when there is NO thread contention. However, under high thread contention (for example, 4 threads in tight loops) both tend to struggle.

Objective: I am looking for a way to create a queue that performs faster than this under thread contention. Attempts at a lock-free queue (see below) did not perform any better.

I am aware that this may be considered a micro-optimization. The reason I need this to run faster is that this scenario is for machine learning (or other high-volume transaction processing) where I have multiple producers (agents) feeding requests (messages) or training data to a central consumer instance. For the machine learning scenario, the central instance executes these requests against Tensorflow.Net / TorchSharp running across four 3090 video cards. The Tensorflow.Net / TorchSharp model is Proximal Policy Optimization, so the result of each request then needs to be sent back to the producer (agent). I need to be able to serialize well over 200K messages per second to fully utilize the four video cards; however, I am only able to get a throughput of around 43K messages per second. Each message is multiple float[] arrays plus control parameters such as score (float), step (int), etc. The overall reason I need this performance is that I am trying to cut a one-month training run down to about one week.

I'm using the thread-safe queue as a singleton backing store for object pooling, which supports serialization across TCP sockets in a custom message queue implementation. The objects being pooled are management objects which wrap ArrayPool<byte> buffers in order to:

A) manage the renting and returning of ArrayPool<T> buffers https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Core/Structures/RentedBuffer.cs

B) Guarantee the returning of rented buffers in order to prevent memory leaks https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Core/Structures/RentedBufferContract.cs
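
For illustration, the pattern in (A) and (B) boils down to something like the following minimal sketch (hypothetical names, heavily simplified from the linked RentedBuffer): rent from ArrayPool<byte> on creation, and guarantee the return exactly once on Dispose.

using System;
using System.Buffers;
using System.Threading;

// Hypothetical, simplified sketch; not the actual RentedBuffer code.
public sealed class PooledBuffer : IDisposable
{
    private byte[] _array;
    public int Length { get; private set; }

    // The wrapper owns the rented array until disposed.
    public byte[] Array => _array ?? throw new ObjectDisposedException(nameof(PooledBuffer));

    public static PooledBuffer Rent(int minimumLength)
    {
        return new PooledBuffer
        {
            _array = ArrayPool<byte>.Shared.Rent(minimumLength),
            Length = minimumLength
        };
    }

    // Dispose returns the buffer to the pool exactly once, even if called twice.
    public void Dispose()
    {
        byte[] arr = Interlocked.Exchange(ref _array, null);
        if (arr != null) ArrayPool<byte>.Shared.Return(arr);
    }
}

It is these wrapper instances that get pooled, which is why the pool's backing queue sits directly on the hot path.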

Minimizing GC collection overhead is also a very important factor.

Object Pooling code is here: https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Core/Structures/ManagedPool.cs

Consider that serializing an object with, say, 30 array properties/fields marked for serialization results in not one buffer but many. This is because buffers are used both for the array properties themselves and for the byte buffers used during serialization. Furthermore, packing an object which has other object properties means nested levels of serialization, which requires even more buffers. The serializer itself is here:

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Serialization/TypedSerializer.cs

I'm using a custom extended version of BitConverter which reads from and writes to the aforementioned (pooled) rented buffers:

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Serialization/BitConverterExtended.cs
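
For context, the write path amounts to something like this minimal sketch (assumed shape only; not the repo's actual BitConverterExtended API): primitives are written straight into the rented array at a tracked offset, with no intermediate allocations.

using System;
using System.Buffers;
using System.Buffers.Binary;

// Hypothetical sketch of the write-into-rented-buffer pattern.
byte[] buffer = ArrayPool<byte>.Shared.Rent(64);
try
{
    int offset = 0;
    // Write a float followed by an int, advancing the offset manually.
    BinaryPrimitives.WriteSingleLittleEndian(buffer.AsSpan(offset, 4), 3.14f);
    offset += 4;
    BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(offset, 4), 42);
    offset += 4;
    // buffer[0..offset] now holds the serialized payload, ready for the socket.
}
finally
{
    ArrayPool<byte>.Shared.Return(buffer);
}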

Note: I realize I could use MemoryPool<T>; however, that does not solve my issue, because using MemoryPool<T> would still require creating yet another class to manage its renting and returning, as it does NOT auto-return when it goes out of scope.
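
For reference, this is the behavior in question: MemoryPool<T>.Shared.Rent hands back an IMemoryOwner<T>, and the memory only goes back to the pool when Dispose is called on that owner; there is no automatic return if the owner simply goes out of scope un-disposed.

using System;
using System.Buffers;

using (IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(4096))
{
    Memory<byte> memory = owner.Memory; // valid only until Dispose
    memory.Span[0] = 42;
} // Dispose() returns the memory to the pool; skip it and the rental leaks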

Note: for the end solution (message queue) I have already looked at ZeroMQ, RabbitMQ, etc., but I need faster speeds. In fact, I was previously using ZeroMQ with MessagePack. The issue with MessagePack + ZeroMQ is not their native speed itself; rather, it's the amount of allocations and therefore the GC collection overhead.

Hence I've written the serializer, socket transport, and object pooling from scratch, for which a thread-safe queue is a basic building block. In my current implementation, under heavy thread contention, the performance gains from the serializer are negated by the queue implementation, where I am using it for object pooling of the wrappers around ArrayPool<T> that manage their return and prevent memory leaks. These objects must be pooled, because otherwise we are back to the issue of GC collection overheads.

I tried Julian M Bucknall's lock-free queue: https://secondboyet.com/Articles/LockfreeQueue.html. However, his implementation generates an allocation for each queue insert, which creates a massive amount of garbage. The end result is 60% of time spent in GC.

I tried extending it with object pooling; however, since the object pool uses the above ThreadSafeQueue implementation, it is bottlenecked on the performance of that same queue.

The extended (optional object pooling) implementation of Julian M Bucknall's lock-free queue is here:

https://github.com/tcwicks/ChillX/blob/master/src/TestApps/ChillX.MQServer.Benchmark/LockFreeQueue.cs
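
As an aside on that circular dependency: the backing store for an object pool does not strictly have to be a queue at all. Below is a hedged sketch (my illustration, in the spirit of Microsoft.Extensions.ObjectPool's DefaultObjectPool, not code from the repo) of a pool whose slots are claimed and freed with Interlocked.CompareExchange, so the pool itself never takes a lock and never allocates on the hot path.

using System.Threading;

// Sketch: a fixed array of slots, each claimed/freed atomically with CAS.
public sealed class SlotPool<T> where T : class, new()
{
    private readonly T[] _slots;

    public SlotPool(int capacity) { _slots = new T[capacity]; }

    public T Rent()
    {
        T[] slots = _slots;
        for (int i = 0; i < slots.Length; i++)
        {
            T item = slots[i]; // a slightly stale read only costs us an allocation
            if (item != null &&
                Interlocked.CompareExchange(ref slots[i], null, item) == item)
            {
                return item;
            }
        }
        return new T(); // pool exhausted: fall back to allocation
    }

    public void Return(T item)
    {
        T[] slots = _slots;
        for (int i = 0; i < slots.Length; i++)
        {
            // Drop the item into the first free slot; if all are full, let the GC have it.
            if (Interlocked.CompareExchange(ref slots[i], item, null) == null)
            {
                return;
            }
        }
    }
}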

Next, I implemented a lock-free structure from scratch, without pooling, using a queue of ring buffers to minimize allocations. It is substantially faster than the LockFreeQueue above: 367 ms vs 607 ms for 1,000,000 queue/dequeue operations across 8 threads. However, it is still roughly 50% slower than the ThreadSafeQueue which uses ReaderWriterLockSlim. Note that LockFreeRingBufferQueue is a 99% lock-free implementation:

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Core/Structures/LockFreeRingBufferQueue.cs
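
For comparison, the textbook design in this space is a bounded MPMC ring buffer with a per-cell sequence number, after Dmitry Vyukov's well-known algorithm: producers and consumers each claim a position with a single CAS and then publish via the cell's sequence stamp. A minimal sketch of that idea follows (my own simplification, not the repo's LockFreeRingBufferQueue).

using System.Threading;

// Sketch of a bounded MPMC queue with per-cell sequence numbers
// (after Dmitry Vyukov). Capacity must be a power of two.
public sealed class BoundedMpmcQueue<T>
{
    private struct Cell
    {
        public long Sequence; // per-cell state/version stamp
        public T Item;
    }

    private readonly Cell[] _cells;
    private readonly int _mask;
    private long _enqueuePos;
    private long _dequeuePos;

    public BoundedMpmcQueue(int capacityPowerOfTwo)
    {
        _cells = new Cell[capacityPowerOfTwo];
        _mask = capacityPowerOfTwo - 1;
        for (int i = 0; i < _cells.Length; i++) _cells[i].Sequence = i;
    }

    public bool TryEnqueue(T item)
    {
        while (true)
        {
            long pos = Volatile.Read(ref _enqueuePos);
            ref Cell cell = ref _cells[(int)(pos & _mask)];
            long diff = Volatile.Read(ref cell.Sequence) - pos;
            if (diff == 0 &&
                Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
            {
                cell.Item = item;
                Volatile.Write(ref cell.Sequence, pos + 1); // publish to consumers
                return true;
            }
            if (diff < 0) return false; // queue is full
            // else: another producer claimed this cell first; retry
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            long pos = Volatile.Read(ref _dequeuePos);
            ref Cell cell = ref _cells[(int)(pos & _mask)];
            long diff = Volatile.Read(ref cell.Sequence) - (pos + 1);
            if (diff == 0 &&
                Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
            {
                item = cell.Item;
                cell.Item = default;
                Volatile.Write(ref cell.Sequence, pos + _mask + 1); // mark free for reuse
                return true;
            }
            if (diff < 0) { item = default; return false; } // queue is empty
        }
    }
}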

Any ideas or suggestions would be highly appreciated.

Benchmark Code is here: https://github.com/tcwicks/ChillX/blob/master/src/TestApps/ChillX.MQServer.Benchmark/Bench_Queue.cs

If running the benchmark app (link below), choose option 2 from the menu. https://github.com/tcwicks/ChillX/tree/master/src/TestApps/ChillX.MQServer.Benchmark

Here are the benchmark results using BenchmarkDotNet, comparing the above-mentioned LockFreeQueue (lock-free) versus ThreadSafeQueue (ReaderWriterLockSlim) versus the standard ConcurrentQueue:

Note: the ConcurrentQueueCount method checks the count before TryDequeue; ConcurrentQueueTry does a TryDequeue without checking the count.

/*
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19044.1645 (21H2)
AMD Ryzen Threadripper 3970X, 1 CPU, 64 logical and 32 physical cores
.NET SDK=6.0.202
  [Host]   : .NET 6.0.4 (6.0.422.16404), X64 RyuJIT  [AttachedDebugger]
  .NET 6.0 : .NET 6.0.4 (6.0.422.16404), X64 RyuJIT

Job=.NET 6.0  Runtime=.NET 6.0

|                 Method |           m_TestMode | numRepititions | numThreads |     Mean |    Error |   StdDev |     Gen 0 | Completed Work Items | Lock Contentions |     Gen 1 |     Gen 2 |    Allocated |
|----------------------- |--------------------- |--------------- |----------- |---------:|---------:|---------:|----------:|---------------------:|-----------------:|----------:|----------:|-------------:|
| Bench_QueuePerformance |        LockFreeQueue |        1000000 |          4 | 607.8 ms | 11.97 ms | 15.14 ms | 5000.0000 |                    - |                - | 3000.0000 | 1000.0000 | 40,002,976 B |
| Bench_QueuePerformance |      ThreadSafeQueue |        1000000 |          4 | 322.0 ms | 19.68 ms | 58.02 ms |         - |                    - |                - |         - |         - |        992 B |
| Bench_QueuePerformance | ConcurrentQueueCount |        1000000 |          4 | 421.2 ms |  8.38 ms | 14.90 ms |         - |                    - |                - |         - |         - |      9,360 B |
| Bench_QueuePerformance |   ConcurrentQueueTry |        1000000 |          4 | 330.1 ms |  6.82 ms | 20.01 ms |         - |                    - |          11.5000 |         - |         - |  1,116,912 B |
 */

Here are the benchmark results for LockFreeRingBufferQueue versus ThreadSafeQueue (ReaderWriterLockSlim) versus the standard ConcurrentQueue:

OS=Windows 10.0.19044.1645 (21H2)
AMD Ryzen Threadripper 3970X, 1 CPU, 64 logical and 32 physical cores
.NET SDK=6.0.202
  [Host]   : .NET 6.0.4 (6.0.422.16404), X64 RyuJIT  [AttachedDebugger]
  .NET 6.0 : .NET 6.0.4 (6.0.422.16404), X64 RyuJIT

Job=.NET 6.0  Runtime=.NET 6.0

| Method                 | m_TestMode           | numRepititions | numThreads | Mean      | Error      | StdDev     | Lock Contentions  |     Gen 0  |     Gen 1  |    Gen 2  | Allocated  |
|----------------------- |--------------------- |--------------- |----------- |----------:| ----------:| ----------:| -----------------:| ----------:| ----------:| ---------:| ----------:|
| Bench_QueuePerformance | RingBufferQueue      | 1000000        | 1          | 80.31 ms  | 3.194 ms   | 8.742 ms   | -                 | 3400.0000  | 1400.0000  | -         | 28,566 KB  |
| Bench_QueuePerformance | ThreadSafeQueue      | 1000000        | 1          | 96.60 ms  | 1.910 ms   | 3.901 ms   | -                 | -          | -          | -         | 1 KB       |
| Bench_QueuePerformance | ConcurrentQueueCount | 1000000        | 1          | 70.94 ms  | 1.410 ms   | 2.750 ms   | -                 | 500.0000   | 500.0000   | 500.0000  | 2,948 KB   |
| Bench_QueuePerformance | ConcurrentQueueTry   | 1000000        | 1          | 49.13 ms  | 2.274 ms   | 6.704 ms   | 0.1250            | -          | -          | -         | 283 KB     |
| Bench_QueuePerformance | RingBufferQueue      | 1000000        | 4          | 367.08 ms | 10.954 ms  | 32.128 ms  | -                 | 3000.0000  | 1000.0000  | -         | 28,566 KB  |
| Bench_QueuePerformance | ThreadSafeQueue      | 1000000        | 4          | 253.59 ms | 12.344 ms  | 36.398 ms  | -                 | -          | -          | -         | 1 KB       |
| Bench_QueuePerformance | ConcurrentQueueCount | 1000000        | 4          | 341.09 ms | 12.249 ms  | 36.116 ms  | 18.5000           | -          | -          | -         | 1,540 KB   |
| Bench_QueuePerformance | ConcurrentQueueTry   | 1000000        | 4          | 266.15 ms | 13.365 ms  | 39.408 ms  | 11.6667           | 333.3333   | 333.3333   | 333.3333  | 2,223 KB   |

Code implementation for each of the benchmarks:

4 threads are queueing items:

switch (m_TestMode)
{
    case TestMode.LockFreeQueue:
        for (int I = 0; I < numReps; I++)
        {
            m_queueLockFree.Enqueue(I);
            Interlocked.Increment(ref QueueSize);
        }
        break;
    case TestMode.ThreadSafeQueue:
        for (int I = 0; I < numReps; I++)
        {
            m_queueThreadSafe.Enqueue(I);
            Interlocked.Increment(ref QueueSize);
        }
        break;
    case TestMode.RingBufferQueue:
        for (int I = 0; I < numReps; I++)
        {
            m_queueThreadSafeRingBuffer.Enqueue(I);
            Interlocked.Increment(ref QueueSize);
        }
        break;
    // Note: both ConcurrentQueue modes share this same enqueue path;
    // they differ only on the dequeue side (see below).
    case TestMode.ConcurrentQueueCount:
        for (int I = 0; I < numReps; I++)
        {
            m_queueConcurrent.Enqueue(I);
            Interlocked.Increment(ref QueueSize);
        }
        break;
    case TestMode.ConcurrentQueueTry:
        for (int I = 0; I < numReps; I++)
        {
            m_queueConcurrent.Enqueue(I);
            Interlocked.Increment(ref QueueSize);
        }
        break;
}

Simultaneously, 4 threads are dequeuing items:

switch (m_TestMode)
{
    case TestMode.LockFreeQueue:
        while (ThreadsIsRunning)
        {
            success = false;
            while (!success)
            {
                if (m_queueLockFree.Count > 0)
                {
                    m_queueLockFree.DeQueue();
                    success = true;
                }
                else
                {
                    if (!ThreadsIsRunning) { break; }
                }
            }
            Interlocked.Decrement(ref QueueSize);
        }
        break;
    case TestMode.ThreadSafeQueue:
        while (ThreadsIsRunning)
        {
            success = false;
            while (!success)
            {
                if (m_queueThreadSafe.Count > 0)
                {
                    m_queueThreadSafe.DeQueue();
                    success = true;
                }
                else
                {
                    if (!ThreadsIsRunning) { break; }
                }
            }
            Interlocked.Decrement(ref QueueSize);
        }
        break;
    case TestMode.RingBufferQueue:
        while (ThreadsIsRunning)
        {
            success = false;
            while (!success)
            {
                m_queueThreadSafeRingBuffer.DeQueue(out success);
                if (!ThreadsIsRunning) { break; }
            }
            Interlocked.Decrement(ref QueueSize);
        }
        break;
    case TestMode.ConcurrentQueueCount:
        while (ThreadsIsRunning)
        {
            success = false;
            while (!success)
            {
                // In order to keep this fair we are also checking .Count property
                if (m_queueConcurrent.Count > 0 && m_queueConcurrent.TryDequeue(out item))
                {
                    success = true;
                }
                else
                {
                    if (!ThreadsIsRunning) { break; }
                }
            }
            Interlocked.Decrement(ref QueueSize);
        }
        break;
    case TestMode.ConcurrentQueueTry:
        while (ThreadsIsRunning)
        {
            success = false;
            while (!success)
            {
                if (m_queueConcurrent.TryDequeue(out item))
                {
                    success = true;
                }
                else
                {
                    if (!ThreadsIsRunning) { break; }
                }
            }
            Interlocked.Decrement(ref QueueSize);
        }
        break;
}


Solution 1:[1]

Here is my attempt to reproduce your observations. I create 8 worker threads, where each thread first enqueues and then dequeues 1,000 items from the queue, in a loop, for a specific duration (1,500 msec). At the end I measure the total number of enqueue/dequeue operations (by all workers) and normalize it to a per-second rate. I also display the memory allocations per second, as well as the LockContentionCount per second.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static void Main()
    {
        Debug.Assert(false, "Debug built"); // fires only in Debug builds, as a reminder to benchmark in Release
        const int workersCount = 8;
        const int durationOfEachTest = 1500;
        const int batchOperations = 1000;
        ThreadPool.SetMinThreads(workersCount, workersCount);
        Console.WriteLine($"Workers: {workersCount}, " +
            $"ProcessorCount: {Environment.ProcessorCount}");
        Console.WriteLine($"Duration of each test: {durationOfEachTest} msec, " +
            $"Batch enqueue/dequeue: {batchOperations:#,0}");

        // Warmup
        Test(null, new ConcurrentQueue<int>(), 100);
        //Test(null, new ConcurrentStack<int>(), 100);
        //Test(null, new ConcurrentBag<int>(), 100);
        Test(null, new MyConcurrentQueue<int>(), 100);

        Test("ConcurrentQueue", new ConcurrentQueue<int>(), durationOfEachTest);
        //Test("ConcurrentStack", new ConcurrentStack<int>(), durationOfEachTest);
        //Test("ConcurrentBag", new ConcurrentBag<int>(), durationOfEachTest);
        Test("Queue+Lock", new MyConcurrentQueue<int>(), durationOfEachTest);

        static void Test(string title, IProducerConsumerCollection<int> queue,
            int durationMsec)
        {
            if (title != null) Console.WriteLine();
            if (title != null) Console.WriteLine($"{title}");
            GC.Collect();
            GC.WaitForPendingFinalizers();
            var lcc0 = Monitor.LockContentionCount;
            var mem0 = GC.GetTotalAllocatedBytes(true);
            var stopwatch = Stopwatch.StartNew();
            var workers = Enumerable.Range(1, workersCount).Select(n => Task.Run(() =>
            {
                long outerLoops = 0;
                while (stopwatch.ElapsedMilliseconds < durationMsec)
                {
                    outerLoops++;
                    for (int i = 0; i < batchOperations; i++) queue.TryAdd(i);
                    for (int i = 0; i < batchOperations; i++) queue.TryTake(out _);
                }
                return outerLoops * batchOperations * 2;
            })).ToArray();
            long totalLoops = Task.WhenAll(workers).Result.Sum();
            var mem1 = GC.GetTotalAllocatedBytes(true);
            var lcc1 = Monitor.LockContentionCount;
            if (title != null) Console.WriteLine(
                $"Total operations per second: {(totalLoops * 1000L) / durationMsec:#,0}");
            if (title != null) Console.WriteLine(
                $"Allocated: {(mem1 - mem0) * 1000L / durationMsec:#,0} bytes per second, " +
                $"Lock contention per second: {(lcc1 - lcc0) * 1000L / durationMsec:#,0}");
            if (queue.Count > 0) throw new InvalidOperationException();
        }
    }

    private class MyConcurrentQueue<T> : IProducerConsumerCollection<T>
    {
        private readonly Queue<T> _queue = new();
        public bool TryAdd(T item)
        {
            lock (_queue) _queue.Enqueue(item);
            return true;
        }
        public bool TryTake(out T item)
        {
            lock (_queue)
            {
                if (_queue.Count == 0) { item = default; return false; }
                item = _queue.Dequeue(); return true;
            }
        }
        public int Count { get { lock (_queue) return _queue.Count; } }
        public bool IsSynchronized => throw new NotImplementedException();
        public object SyncRoot => throw new NotImplementedException();
        public void CopyTo(T[] array, int index) => throw new NotImplementedException();
        public void CopyTo(Array array, int index) => throw new NotImplementedException();
        public IEnumerator<T> GetEnumerator() => throw new NotImplementedException();
        IEnumerator IEnumerable.GetEnumerator() => throw new NotImplementedException();
        public T[] ToArray() => throw new NotImplementedException();
    }
}

Output on my PC:

Workers: 8, ProcessorCount: 4
Duration of each test: 1500 msec, Batch enqueue/dequeue: 1,000

ConcurrentQueue
Total operations per second: 15,700,000
Allocated: 2,799,621 bytes per second, Lock contention per second: 21

Queue+Lock
Total operations per second: 11,642,666
Allocated: 46,256 bytes per second, Lock contention per second: 10,310

Output on dotnetfiddle.net:

Workers: 8, ProcessorCount: 4
Duration of each test: 1500 msec, Batch enqueue/dequeue: 1,000

ConcurrentQueue
Total operations per second: 14,281,333
Allocated: 5,596,789 bytes per second, Lock contention per second: 24

Queue+Lock
Total operations per second: 22,566,666
Allocated: 46,768 bytes per second, Lock contention per second: 4,912

So on my PC the ConcurrentQueue<T> is a bit faster, while on dotnetfiddle.net it is a bit slower than a Queue+lock. I have no idea why. The lock contention is also higher on my PC, which primarily affects the Queue+lock implementation (it makes it slower). Changing the number of workers or the number of batch operations affects these measurements in various ways.

The ConcurrentQueue<T> allocates less than a byte per operation, which is not that much in the grand scheme of things. But of course the Queue+lock is the clear winner in this aspect, since it's essentially allocation-free.

Solution 2:[2]

If you want more throughput then stop causing unnecessary thread contention. Pair up your Consumers and Producers. Give each Consumer its own Queue and have a designated Producer add messages to it.

If your Production vs Consumption ratio is close to 1:1 then don't even bother with a Queue. Have a couple of slots in memory for each Producer to store messages and have the paired Consumer poll those slots.

Here's an example of the memory-slots idea vs ConcurrentQueue. The FastSwap class requires a 1-producer-to-1-consumer pairing.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;

namespace TestQueue
{
    class Program
    {
        const int tries = 10;
        const int tasks = 4; // producers and consumers
        const int iterations = 500000;
        static Dummy[] dummies = Enumerable.Range(0, iterations).Select(i => new Dummy() { X = i }).ToArray();
        static Stopwatch s1;
        static Stopwatch s2;
        static Task[] t1 = new Task[tasks * 2];
        static Task[] t2 = new Task[tasks * 2];
        static long a1;
        static long a2;
        static volatile int c1 = 0;
        static volatile int c2 = 0;
        static ConcurrentQueue<Dummy> q1;
        static FastSwap<Dummy> q2;
        static void Main(string[] args)
        {
            // 1
            a1 = 0;
            for (int t = 0; t < tries; t++)
            {
                q1 = new ConcurrentQueue<Dummy>();
                c1 = 0;
                s1 = new Stopwatch();
                GC.Collect();
                GC.WaitForPendingFinalizers();
                s1.Start();
                for (int i = 0; i < tasks * 2; i++)
                {
                    if ((i & 1) == 0)
                    { // producer
                        t1[i] = new Task(() =>
                        {
                            for (int x = 0; x < iterations; x++)
                            {
                                q1.Enqueue(dummies[x]);
                            }
                        });
                    }
                    else
                    { // consumer
                        t1[i] = new Task(() => {
                            Dummy tmp;
                            while (c1 < (iterations * tasks))
                            {
                                if (q1.TryDequeue(out tmp))
                                {
                                    Interlocked.Increment(ref c1);
                                }
                            }
                        });
                    }
                    t1[i].Start();
                }
                Task.WaitAll(t1);
                s1.Stop();
                a1 += s1.ElapsedMilliseconds;
                Console.WriteLine($"Test {t} Queue 1 result: {s1.ElapsedMilliseconds} ms");
            }
            Console.WriteLine($"Queue 1 avg: {a1 / tries} ms");
            // 2
            a2 = 0;
            for (int t = 0; t < tries; t++)
            {
                q2 = new FastSwap<Dummy>(tasks);
                c2 = 0;
                s2 = new Stopwatch();
                GC.Collect();
                GC.WaitForPendingFinalizers();
                s2.Start();
                for (int i = 0; i < tasks * 2; i++)
                {
                    if ((i & 1) == 0)
                    { // producer
                        int proIdx = i >> 1;
                        t2[i] = new Task(() =>
                        {
                            for (int x = 0; x < iterations; x++)
                            {
                                q2.Insert(proIdx, dummies[x]);
                            }
                        });
                    }
                    else
                    { // consumer
                        int conIdx = (i - 1) >> 1;
                        t2[i] = new Task(() => {
                            Dummy tmp;
                            while (c2 < (iterations * tasks))
                            {
                                if ((tmp = q2.Take(conIdx)) != null)
                                {
                                    Interlocked.Increment(ref c2);
                                }
                            }
                        });
                    }
                    t2[i].Start();
                }
                Task.WaitAll(t2);
                s2.Stop();
                a2 += s2.ElapsedMilliseconds;
                Console.WriteLine($"Test {t} Queue 2 result: {s2.ElapsedMilliseconds} ms");
            }
            Console.WriteLine($"Queue 2 avg: {a2 / tries} ms");
            //
            Console.ReadLine();
        }
        public class Dummy
        {
            public int X;
            public Dummy() { }
        }
        public class FastSwap<T> where T : class, new()
        {
            // Note: this relies on reference-sized writes being atomic in .NET;
            // for strict cross-thread visibility guarantees, the slot reads and
            // writes should arguably go through Volatile.Read/Volatile.Write.
            const int slots = 2; // slots per producer
            readonly int _producers;
            readonly T[] _buffer;
            public FastSwap(int producers)
            {
                _producers = producers;
                _buffer = new T[_producers * slots];
            }
            public void Insert(int producerIndex, T item)
            {
                while(true)
                {
                    // wait for a free slot
                    // unroll based on # of slots
                    if(_buffer[producerIndex] == null)
                    {
                        _buffer[producerIndex] = item;
                        return;
                    }
                    else if (_buffer[producerIndex + _producers] == null)
                    {
                        _buffer[producerIndex + _producers] = item;
                        return;
                    }
                }
            }
            public T Take(int producerIndex)
            {
                T tmp = null;
                //int r = 100;
                //while (--r != 0) 
                //{
                    // check the slots for data
                    // unroll based on # of slots
                    if (_buffer[producerIndex] != null)
                    {
                        tmp = _buffer[producerIndex];
                        _buffer[producerIndex] = null;
                        //break;
                    }
                    else if (_buffer[producerIndex + _producers] != null)
                    {
                        tmp = _buffer[producerIndex + _producers];
                        _buffer[producerIndex + _producers] = null;
                        //break;
                    }
                //}
                return tmp;
            }
        }
    }
}

The more speed you require the more single-use (less generic) your solution will end up being.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Sources
[1] Solution 1: Theodor Zoulias
[2] Solution 2: Louis Ricci