Concurrency: Detecting async-unsafe code (e.g. by detecting a forked ExecutionContext)

I have a multi-threaded codebase that manipulates a stateful object that cannot be safely used concurrently.

However, it's hard to prevent concurrent usage, and easy to make mistakes, especially with all the various (useful!) async methods and possibilities that modern C# provides.

I can and do wrap the object and detect when simultaneous (and thus incorrect) access occurs, but that check is fragile, which makes it unattractive as a runtime check. When two or more async methods use the shared object, the check only triggers if they happen to access it at exactly the same time, so potentially faulty code will only rarely fault.

Worse, since the object is stateful, some state-transition patterns are broken even without temporally overlapping access. Consider two threads that each do something like: get-value-from-object; perform-slow-computation-on-value; write-value-to-object. No concurrency sanity check on the object will trigger if two async contexts interleave their (read or write) accesses such that each access happens in a short window and doesn't coincide with any other access. Essentially, this is a form of transaction isolation violation, but in C# code rather than in a database.
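
To make that pattern concrete, here is a minimal sketch. SharedState and LostUpdateDemo are made-up names for illustration, and a TaskCompletionSource stands in for the slow computation so that both reads are guaranteed to finish before either write.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-in for the stateful object.
public sealed class SharedState
{
    public int Value;
}

public static class LostUpdateDemo
{
    public static async Task<int> RunAsync()
    {
        var state = new SharedState();
        // Stands in for the slow computation; completed only after both reads are done.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        Task first = ReadComputeWriteAsync(state, gate.Task);
        Task second = ReadComputeWriteAsync(state, gate.Task);
        gate.SetResult(true);

        await Task.WhenAll(first, second);
        return state.Value; // 1, although two increments were requested
    }

    private static async Task ReadComputeWriteAsync(SharedState state, Task slowComputation)
    {
        int snapshot = state.Value;   // get-value-from-object
        await slowComputation;        // perform-slow-computation-on-value
        state.Value = snapshot + 1;   // write-value-to-object
    }
}
```

Both calls read 0 before either one writes, so one increment is lost no matter how the two writes are scheduled.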

I want to see such errors more reliably, i.e. have some kind of fail-fast pattern that ideally lets the programmer making the buggy change see the error right away, and not after a tricky debugging session in production.

.NET has a call-stack-like concept for async; this is clearly visible in the debugger, and there appears to be some level of API support for (or at least interaction with) it in the form of ExecutionContext, which enables things like AsyncLocal<>, if I understand correctly.
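
As a minimal sketch of that flow behavior (AsyncLocalFlowDemo and its members are made-up names): a value set in a caller is visible in the async methods it calls, while a value set inside an async method is not visible to the caller once that method returns.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalFlowDemo
{
    private static readonly AsyncLocal<string> Scope = new AsyncLocal<string>();

    // Returns what the child saw, and what the parent sees after the child finished.
    public static async Task<string[]> RunAsync()
    {
        Scope.Value = "parent";
        string seenByChild = await ChildAsync();
        string seenAfterChild = Scope.Value; // the child's change did not flow back
        return new[] { seenByChild, seenAfterChild };
    }

    private static async Task<string> ChildAsync()
    {
        string inherited = Scope.Value; // flows down from the caller
        Scope.Value = "child";          // only visible in this method and whatever it calls
        await Task.Yield();
        return inherited;
    }
}
```

Both array entries are "parent". Because the value is copied into each branch rather than shared, reading it does not by itself reveal whether the current flow has forked.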

However, I have not found a way to detect when the conceptual async call stack turns into a tree, i.e. when the stateful object is accessed from two diverging branches of that async context. If I could detect such a tree-like context, I could fail fast whenever an access to the stateful object is both preceded and followed by an access on another branch.

Is there some way to detect such async call context forking? Alternatively, how can I best fail-fast when mutable state is accessed from multiple async contexts, even when such access is not reliably temporally overlapping?



Solution 1:[1]

Is there some way to detect such async call context forking?

Yes

If your class has a member representing its state (e.g. private bool _isProcessing;), you can check that member to detect concurrent calls. Make sure you only read and write that member from a safe context, such as inside a lock(_someMutex) block.

Next Steps

Depending on what you want to do when you detect concurrent calls, you can implement your class in different ways.

If you want to handle concurrent calls by queuing them, you can simply wrap the body of each method in an async lock. You don't need an _isProcessing member with this approach.

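private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

// Waits for exclusive access; disposing the result releases it again.
private async Task<IDisposable> LockAsync()
{
    await _semaphore.WaitAsync();
    return new ActionDisposable(() => _semaphore.Release());
}

// ActionDisposable is not part of the BCL. This minimal version is an assumption
// about the helper used above: it runs the given action once on Dispose.
private sealed class ActionDisposable : IDisposable
{
    private Action _onDispose;
    public ActionDisposable(Action onDispose) => _onDispose = onDispose;
    public void Dispose() => Interlocked.Exchange(ref _onDispose, null)?.Invoke();
}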

public async Task SomeAsyncMethod1()
{
    using (await LockAsync())
    {
        // Do some async stuff.
    }
}

public async Task SomeAsyncMethod2()
{
    using (await LockAsync())
    {
        // Do some other async stuff.
    }
}

If you want to reject concurrent calls instead, the _isProcessing field becomes useful.

private bool _isProcessing;
private readonly object _mutex = new object();

public async Task SomeAsyncMethod1()
{
    lock (_mutex)
    {
        if (_isProcessing)
        {
            // Concurrent call detected.
            // Deal with it the way you want. (return null, throw an exception, whatever.)
            throw new InvalidOperationException("Concurrent calls are not supported.");
        }

        _isProcessing = true;
    }

    try
    {
        // Do some async stuff.
    }
    finally
    {
        // Always reset the flag, even if the work above throws.
        lock (_mutex)
        {
            _isProcessing = false;
        }
    }
}

public async Task SomeAsyncMethod2()
{
    lock (_mutex)
    {
        if (_isProcessing)
        {
            // Concurrent call detected.
            // Deal with it the way you want. (return null, throw an exception, whatever.)
            throw new InvalidOperationException("Concurrent calls are not supported.");
        }

        _isProcessing = true;
    }

    try
    {
        // Do some other async stuff.
    }
    finally
    {
        // Always reset the flag, even if the work above throws.
        lock (_mutex)
        {
            _isProcessing = false;
        }
    }
}
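
Note that the _isProcessing flag only catches calls that overlap in time. For the get-value / slow-computation / write-value pattern from the question, a version stamp might help: every write bumps a counter, and a write based on a stale read throws. This is only a sketch under that assumption; VersionedState and its members are hypothetical names, not an existing API.

```csharp
using System;

public sealed class VersionedState<T>
{
    private readonly object _gate = new object();
    private T _value;
    private long _version;

    public VersionedState(T initial)
    {
        _value = initial;
    }

    // Returns the current value together with the version it was read at.
    public (T Value, long Version) Read()
    {
        lock (_gate)
        {
            return (_value, _version);
        }
    }

    // Writes only if nobody else has written since expectedVersion; otherwise fails fast.
    public void Write(T newValue, long expectedVersion)
    {
        lock (_gate)
        {
            if (_version != expectedVersion)
            {
                throw new InvalidOperationException(
                    "State was modified since it was read; this write is based on a stale value.");
            }

            _value = newValue;
            _version++;
        }
    }
}
```

A caller would keep the version returned by Read() and pass it back to Write(). If another flow wrote in between, the second writer gets an exception on every run, not only when the two accesses happen to coincide.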

Solution 2:[2]

Maybe something like this can help? In your wrapper object you'd check AsyncLock.CanProceed; if it is false, the caller didn't obtain a lock.

It would be possible to extend the code below to block while waiting for a lock.

using System;
using System.Threading;
using System.Diagnostics;
using System.Threading.Tasks;
                    
public class Program {
    // Note: a Random instance is not thread-safe; on .NET 6+ Random.Shared is a safer choice.
    static Random _rnd = new();
    
    public static async Task Main() {
        for (int i = 0; i < 9; i++) {
            var t1 = TestTask("1");
            var t2 = TestTask("2");
            var t3 = SubTask("3");
            
            await Task.WhenAll(t1, t2, t3);
        }
    }
    
    static async Task TestTask(string id) {
        try {
            await Task.Delay(_rnd.Next(50,250));
            using (var alock = await AsyncLock.RequestLock()) {
                Console.WriteLine($"TASK {id} {alock is not null} {AsyncLock.CanProceed}");
                await Task.Delay(_rnd.Next(50,250));
                await SubTask(id);
            }
        } catch {
            Console.WriteLine($"LOCK FAILED {id}");
        }
    }
    
    static async Task SubTask(string id) {
        try {
            await Task.Delay(_rnd.Next(50,250));
            if (AsyncLock.CanProceed) {
                Console.WriteLine($"TASK {id} SUB CAN PROCEED");
                return;
            } else {
                using (var alock = await AsyncLock.RequestLock()) {
                    Console.WriteLine($"TASK {id} SUB {alock is not null} {AsyncLock.CanProceed}");
                    await Task.Delay(50);
                }
            }
        } catch {
            Console.WriteLine($"LOCK FAILED {id} SUB");
        }
    }
}

public class AsyncLock: IDisposable {
    private static AsyncLock _lockGlobal;
    private static AsyncLocal<AsyncLock> _lockAsync = new();
    private static object _mutex = new();
    
    public bool IsDisposed { get; private set; }
    
    public static Task<AsyncLock> RequestLock() => RequestLock(CancellationToken.None);
    
    public static Task<AsyncLock> RequestLock(CancellationToken cancellationToken) {
        lock (_mutex) {
            if (_lockGlobal is not null) {
                Console.WriteLine("BLOCK");
                // The lock is already held: fail fast with an already-canceled task.
                return Task.FromCanceled<AsyncLock>(new CancellationToken(canceled: true));
            }
            
            Console.WriteLine("LOCK");
            return Task.FromResult(_lockAsync.Value = _lockGlobal = new AsyncLock());
        }
    }
    
    public static bool CanProceed => _lockAsync.Value is not null;
    
    private AsyncLock() {}
    
    public void Dispose() {
        lock (_mutex) {
            if (IsDisposed) return;
            Console.WriteLine("DISPOSE");
            
            Debug.Assert(_lockGlobal == this);
            Debug.Assert(_lockAsync.Value == this);
            
            IsDisposed = true;
            _lockAsync.Value = _lockGlobal = null;
        }
    }
}
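
The blocking variant mentioned above could look roughly like the sketch below. It is not a drop-in extension of AsyncLock: ExclusiveScope and RunAsync are hypothetical names, and the work is passed in as a delegate because an AsyncLocal value set inside an async method does not flow back to its caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveScope
{
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);
    private static readonly AsyncLocal<bool> InsideScope = new AsyncLocal<bool>();

    // True only in code that runs (directly or indirectly) inside RunAsync's body.
    public static bool CanProceed => InsideScope.Value;

    public static async Task RunAsync(Func<Task> body)
    {
        if (InsideScope.Value)
        {
            // Nested call from a flow that already holds the lock: do not wait again.
            await body();
            return;
        }

        await Gate.WaitAsync(); // waits instead of failing when the lock is taken
        try
        {
            InsideScope.Value = true; // flows into body and everything it awaits
            await body();
        }
        finally
        {
            InsideScope.Value = false;
            Gate.Release();
        }
    }
}
```

The wrapper object would still check ExclusiveScope.CanProceed on every access. Like the code above, this does not notice a fork inside the body: tasks started there inherit the flag, including any that outlive the scope.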

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution     Source
Solution 1   Batesias
Solution 2   Herman