Rx.Net - process groups asynchronously and in parallel with a constrained concurrency
I'm playing with System.Reactive, trying to solve the following task:
- Break an incoming stream of strings into groups
- Items in each group must be processed asynchronously and sequentially
- Groups must be processed in parallel
- No more than N groups must be processed at the same time
- Ideally, w/o using sync primitives
Here is the best I've figured out so far -
TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
    .GroupBy(item => item)
    .SelectMany(g => g
        .Select(item => Observable.FromAsync(() => onNextAsync(item)))
        .ObserveOn(scheduler)
        .Concat())
    .Subscribe();
Any idea how to achieve it w/o a scheduler? Couldn't make it work via Merge()
Solution 1:[1]
To solve this problem using exclusively Rx tools, ideally you would like to have something like this:
source
    .GroupBy(item => item.Key)
    .Select(group => group
        .Select(item => Observable.FromAsync(() => ProcessAsync(item)))
        .Merge(1))
    .Merge(maxConcurrent: N)
    .Wait();
The inner Merge(1) would enforce the exclusive processing within each group, and the outer Merge(N) would enforce the global maximum concurrency policy. Unfortunately this doesn't work, because the outer Merge(N) restricts the subscriptions to the inner sequences (the IGroupedObservable<TKey, TElement>s), not to their individual elements. This is not what you want. The result is that only the first N groups will be processed, and the elements of all other groups will be ignored. The GroupBy operator creates hot subsequences, and if you don't subscribe to them immediately you'll lose elements.
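To make the lost-elements problem concrete, here is a minimal sketch of the query above with N = 2 and three groups. The class name, the sample input and the `Task.Delay` stand-in for the real work are all assumptions made for illustration, and the code assumes the System.Reactive package is referenced. With this input the third group ("c") should not be subscribed until one of the first two finishes, by which time its elements have already gone by.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class LostGroupsDemo
{
    // Runs the "ideal" query with N = 2 over three groups and returns
    // the items that were actually processed.
    public static List<string> Run()
    {
        var processed = new List<string>();
        // Hypothetical sample input: three groups, two items each.
        var source = new[] { "a", "b", "c", "a", "b", "c" }.ToObservable();

        source
            .GroupBy(item => item)
            .Select(group => group
                .Select(item => Observable.FromAsync(async () =>
                {
                    await Task.Delay(50); // stand-in for the real async work
                    lock (processed) processed.Add(item);
                }))
                .Merge(1))   // one item at a time within a group
            .Merge(2)        // at most two groups subscribed at a time
            .Wait();

        return processed;
    }
}
```

Calling `LostGroupsDemo.Run()` is expected to return only "a" and "b" items, which is the behavior described above: the late subscription to the "c" group finds a sequence that has already completed.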
In order for the outer Merge(N) to work as desired, you'll have to merge freely all the inner sequences that are produced by the Observable.FromAsync, and have some other mechanism to serialize the processing of each group. One idea is to implement a special Select operator that emits an Observable.FromAsync only after the previous one is completed. Below is such an implementation, based on the Zip operator. The Zip operator maintains internally two hidden buffers, so that it can produce pairs from two sequences that might emit elements with different frequencies. This buffering is exactly what we need in order to avoid losing elements.
private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> selector)
{
    // The subject emits one "token" each time the next element may be released.
    var subject = new BehaviorSubject<Unit>(default);
    // Guard against concurrent OnNext calls on the subject.
    var synchronizedSubject = Observer.Synchronize(subject);
    return source
        .Zip(subject, (item, _) => item)
        .Select(item => selector(item).Do(
            _ => { },
            _ => synchronizedSubject.OnNext(default),    // on error
            () => synchronizedSubject.OnNext(default))); // on completion
}
The BehaviorSubject<T> initially contains one element, so the first pair will be produced immediately. The second pair will not be produced before the first element has been processed. The same goes for the third pair and the second element, etc.
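The gating effect of Zip combined with a BehaviorSubject can be observed in isolation. The following is a small sketch, not part of the original answer: the class name and the counting logic are hypothetical, and the tokens are pushed by hand where the operator above pushes them when an inner sequence terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipGateDemo
{
    // Returns how many items had passed the gate initially,
    // after one extra token, and after two extra tokens.
    public static (int Initially, int AfterOne, int AfterTwo) Run()
    {
        // The subject starts with one token, so one item can pass right away.
        var tokens = new BehaviorSubject<Unit>(Unit.Default);
        var released = new List<int>();

        using var subscription = Observable.Range(1, 3)
            .Zip(tokens, (item, _) => item)
            .Subscribe(item => released.Add(item));

        int initially = released.Count;  // items 2 and 3 wait in Zip's buffer
        tokens.OnNext(Unit.Default);     // simulates "previous item finished"
        int afterOne = released.Count;
        tokens.OnNext(Unit.Default);
        int afterTwo = released.Count;

        return (initially, afterOne, afterTwo);
    }
}
```

Each token releases exactly one buffered item, so nothing is lost even though the source emitted all three items before any token beyond the first was available.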
You could then use this operator to solve the problem like this:
source
    .GroupBy(item => item.Key)
    .SelectMany(group => group.SelectOneByOne(
        item => Observable.FromAsync(() => ProcessAsync(item))))
    .Merge(maxConcurrent: N)
    .Wait();
The above solution is presented only for the purpose of answering the question. I don't think that I would trust it in a production environment.
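For anyone who wants to experiment with Solution 1, here is a self-contained sketch that wires the SelectOneByOne operator into a small test harness. The harness is an assumption and not part of the original answer: the class name, the sample items (grouped by their first character), the `Task.Delay` workload and the counters are all hypothetical. It records the highest number of items in flight overall and per group.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class OneByOneDemo
{
    // The operator from the answer, repeated so the sketch compiles on its own.
    private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector)
    {
        var subject = new BehaviorSubject<Unit>(default);
        var synchronizedSubject = Observer.Synchronize(subject);
        return source
            .Zip(subject, (item, _) => item)
            .Select(item => selector(item).Do(
                _ => { },
                _ => synchronizedSubject.OnNext(default),
                () => synchronizedSubject.OnNext(default)));
    }

    public static (int MaxGlobal, int MaxPerGroup, int Processed) Run(int n)
    {
        var gate = new object();
        var activePerGroup = new Dictionary<char, int>();
        int active = 0, maxGlobal = 0, maxPerGroup = 0, processed = 0;

        // Hypothetical workload: tracks concurrency, then simulates async work.
        async Task<string> ProcessAsync(string item)
        {
            char key = item[0];
            lock (gate)
            {
                activePerGroup.TryGetValue(key, out int current);
                activePerGroup[key] = current + 1;
                maxPerGroup = Math.Max(maxPerGroup, current + 1);
                maxGlobal = Math.Max(maxGlobal, ++active);
            }
            await Task.Delay(20);
            lock (gate)
            {
                activePerGroup[key]--;
                active--;
                processed++;
            }
            return item;
        }

        new[] { "a1", "b1", "c1", "a2", "b2", "c2", "d1", "a3" }
            .ToObservable()
            .GroupBy(item => item[0])
            .SelectMany(group => group.SelectOneByOne(
                item => Observable.FromAsync(() => ProcessAsync(item))))
            .Merge(maxConcurrent: n)
            .Wait();

        return (maxGlobal, maxPerGroup, processed);
    }
}
```

With `OneByOneDemo.Run(2)` one would expect all eight items to be processed, never more than one item per group at a time, and never more than two items overall. A harness like this only exercises the happy path; it says nothing about errors or cancellation, which is where the author's caution about production use applies.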
Solution 2:[2]
The easiest way to enforce the "No more than N groups must be processed at the same time" limitation is probably to use a SemaphoreSlim. So instead of this:
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())
...you can do this:
var semaphore = new SemaphoreSlim(N, N);
//...
.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
    await semaphore.WaitAsync();
    try { return await onNextAsync(item); }
    finally { semaphore.Release(); }
})).Merge(1))
Btw, in the current Rx version (5.0.0) I don't trust the Concat operator, and I prefer to use Merge(1) instead.
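Put together, the semaphore approach could look like the sketch below. Everything around the query is an assumption added for illustration: the class name, the sample items (grouped by their first character), and the local `OnNextAsync` function, which stands in for the question's `onNextAsync` and counts how many items are in flight.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreDemo
{
    public static (int MaxGlobal, int Processed) Run(int n)
    {
        var semaphore = new SemaphoreSlim(n, n);
        var gate = new object();
        int active = 0, maxGlobal = 0, processed = 0;

        // Hypothetical stand-in for the question's onNextAsync.
        async Task<string> OnNextAsync(string item)
        {
            lock (gate) maxGlobal = Math.Max(maxGlobal, ++active);
            await Task.Delay(20);
            lock (gate) { active--; processed++; }
            return item;
        }

        new[] { "a1", "b1", "c1", "a2", "b2", "c2", "d1", "a3" }
            .ToObservable()
            .GroupBy(item => item[0])
            .SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
            {
                // At most n items, and therefore at most n groups, get past here.
                await semaphore.WaitAsync();
                try { return await OnNextAsync(item); }
                finally { semaphore.Release(); }
            })).Merge(1)) // one item at a time within a group
            .Wait();

        return (maxGlobal, processed);
    }
}
```

Because every group is subscribed immediately by SelectMany, no elements are lost; the semaphore only delays the start of the work. `SemaphoreDemo.Run(2)` should therefore process all eight items with at most two running at any moment.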
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Theodor Zoulias |