Stop Parallel.ForEachAsync

In C#, I want to stop a Parallel.ForEachAsync loop early (keeping in mind the differences between Stop and Break). With Parallel.ForEach I can do the following:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

However, since my processing needs to run asynchronously, I switched to Parallel.ForEachAsync. ForEachAsync does not have a Stop() method. I am able to break out of the loop as follows, but I'm wondering whether this is the most effective way of breaking the loop (in other words, the loop needs to stop ASAP when it receives the cancellation request).

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});


Solution 1:[1]

The Parallel.ForEachAsync lambda has a CancellationToken as its second parameter. This token is supplied by the API; it is not the same token that you passed in the ParallelOptions. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, then the best you can do is to call ThrowIfCancellationRequested at strategic places inside the lambda:

var cts = new CancellationTokenSource();
var options = new ParallelOptions() { CancellationToken = cts.Token };

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        //...
        ct.ThrowIfCancellationRequested();
        //...
        await ProcessAsync(item, ct);
        //...
        ct.ThrowIfCancellationRequested();
        //...
    });
}
catch (OperationCanceledException ex)
{
    // ...
}

The token provided as an argument to the lambda (ct in the example above) is canceled not only when the ParallelOptions.CancellationToken is canceled, but also when a ProcessAsync operation has failed. This mechanism allows faster propagation of exceptions. The parallel loop does not complete immediately when an error occurs, because it follows the principle of disallowing fire-and-forget operations: all operations started internally by the loop must complete before the whole loop completes, either successfully or with failure. The token in the lambda makes it possible to reduce this latency to a minimum.
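To see the failure-triggered cancellation described above, here is a minimal sketch (assuming .NET 6 or later and top-level statements). The item values, the delays, the simulated failure on item 1, and the observedCancellations counter are all hypothetical and exist only for illustration. One iteration throws, and the other in-flight iterations notice through ct instead of waiting forever:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var items = Enumerable.Range(1, 20);
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
int observedCancellations = 0; // hypothetical counter, for illustration only

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        if (item == 1)
        {
            // Simulate one iteration failing shortly after the loop starts.
            await Task.Delay(50, CancellationToken.None);
            throw new InvalidOperationException("Simulated failure");
        }

        try
        {
            // Without the token this wait would never end. Forwarding ct
            // lets the iteration end once another iteration has failed.
            await Task.Delay(Timeout.Infinite, ct);
        }
        catch (OperationCanceledException)
        {
            Interlocked.Increment(ref observedCancellations);
            throw;
        }
    });
}
catch (Exception ex)
{
    Console.WriteLine($"Loop ended with {ex.GetType().Name}: {ex.Message}");
    Console.WriteLine($"Iterations that observed cancellation: {observedCancellations}");
}
```

No external token is canceled in this sketch, so any cancellation the iterations observe comes from the loop itself reacting to the failure.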

Solution 2:[2]

You're going to need something like this:

await Parallel.ForEachAsync(items, async (item, state) =>
{
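    // Note: the second lambda parameter (named state here) is a
    // CancellationToken supplied by the loop, not a ParallelLoopState.
    await ProcessAsync(item, cancellationToken);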
});


async Task ProcessAsync(string item, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        //Process
    }
}
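One way to make the sketch above concrete, assuming .NET 6 or later and that the work can be split into short steps. The five steps, the 20 ms delay standing in for real work, the 200 ms CancelAfter timeout, and the processed and canceled variables are hypothetical. This variant forwards the token that the loop passes to the lambda rather than an outer variable, and throws on cancellation instead of returning silently, so the caller can tell a canceled run from a completed one:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(200)); // stands in for a user request
var options = new ParallelOptions { CancellationToken = cts.Token };

int processed = 0;      // hypothetical counter, for illustration only
bool canceled = false;

try
{
    await Parallel.ForEachAsync(Enumerable.Range(1, 1000), options, async (item, ct) =>
    {
        await ProcessAsync(item, ct);
        Interlocked.Increment(ref processed);
    });
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"Canceled: {canceled}, items fully processed: {processed}");

// Hypothetical stand-in for the real work: a few short, cancelable steps.
static async Task ProcessAsync(int item, CancellationToken ct)
{
    for (int step = 0; step < 5; step++)
    {
        ct.ThrowIfCancellationRequested(); // check between steps
        await Task.Delay(20, ct);          // forward the token to cancelable calls
    }
}
```

The shorter each step is, the sooner an in-flight item reacts to the request; items that have not started yet are never picked up once the token is canceled.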

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Enigmativity