Why does Java's concurrent grouping collector use a ConcurrentMap but not a concurrent List implementation?

Consider the signature of groupingByConcurrent:

static <T,K> Collector<T,?,ConcurrentMap<K,List<T>>> groupingByConcurrent(Function<? super T,? extends K> classifier)

Consider the diagram below, which shows how parallel threads might add elements to the lists for their respective keys.

[Image: diagram of parallel threads filling the per-key lists]

  • Since multiple threads may be adding to the same list at any given point in time, why doesn't the implementation internally use a concurrent implementation of List?
  • The API documentation states that it makes no guarantees about the thread safety of the returned lists. But shouldn't the list implementation be thread-safe for the overall collector to actually be concurrent?


Solution 1:[1]

Short answer:

There are two types of concurrent list implementations in the JDK: synchronized wrappers around regular lists (such as Collections.synchronizedList()) and java.util.concurrent.CopyOnWriteArrayList. A synchronized wrapper is just a regular list with a lock around each call, which gains nothing here because the collector already synchronizes on the list itself. CopyOnWriteArrayList, on the other hand, was designed for cases where reads are far more frequent than writes, which is the opposite of what a collector does.

Long answer:

If we look at how groupingByConcurrent(Function<? super T,? extends K> classifier) works under the hood, we see that it just calls:

return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());

So by default it uses the regular toList() collector, which is not concurrent (its characteristics do not include Collector.Characteristics.CONCURRENT).
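The characteristics can be inspected directly. The following is a minimal sketch; the class and method names (CharacteristicsDemo, toListCharacteristics, groupingCharacteristics) are made up for illustration, and only the Collectors calls come from the JDK.

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the JDK.
final class CharacteristicsDemo {

    // Characteristics reported by the plain toList() downstream collector.
    static Set<Collector.Characteristics> toListCharacteristics() {
        return Collectors.toList().characteristics();
    }

    // Characteristics reported by the one-argument groupingByConcurrent collector.
    static Set<Collector.Characteristics> groupingCharacteristics() {
        return Collectors.groupingByConcurrent((String s) -> s.length()).characteristics();
    }

    public static void main(String[] args) {
        // toList() is not flagged CONCURRENT: prints false
        System.out.println(toListCharacteristics()
                .contains(Collector.Characteristics.CONCURRENT));
        // The grouping collector itself is flagged CONCURRENT: prints true
        System.out.println(groupingCharacteristics()
                .contains(Collector.Characteristics.CONCURRENT));
    }
}
```

So the outer collector advertises concurrency while the downstream list collector does not, which is why the grouping collector has to protect the lists itself.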

If you call the three-argument method above directly, you can supply a concurrent downstream collector if you want. For example, you could implement a collector that adds to a java.util.concurrent.CopyOnWriteArrayList if you find it somehow beneficial, but it copies the underlying array on every insertion, so it most likely won't improve performance. The other thread-safe list implementations essentially just wrap each call in additional synchronization (like Collections.synchronizedList()).
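As an illustration of supplying your own concurrent downstream collector, here is a rough sketch. The helper toCopyOnWriteList() and the class name are hypothetical (nothing like them ships with the JDK), and the example is meant to show the mechanics, not to recommend this approach.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the JDK.
final class CopyOnWriteDownstreamDemo {

    // Hypothetical helper: a downstream collector flagged CONCURRENT that
    // accumulates into a CopyOnWriteArrayList (every add copies the array).
    static <T> Collector<T, ?, List<T>> toCopyOnWriteList() {
        return Collector.<T, List<T>>of(
                CopyOnWriteArrayList::new,
                List::add,
                (left, right) -> { left.addAll(right); return left; },
                Collector.Characteristics.CONCURRENT);
    }

    // Groups 0..n-1 by last digit using the concurrent downstream collector.
    static ConcurrentMap<Integer, List<Integer>> groupByLastDigit(int n) {
        return IntStream.range(0, n).boxed().parallel()
                .collect(Collectors.groupingByConcurrent(
                        i -> i % 10,
                        ConcurrentHashMap::new,
                        toCopyOnWriteList()));
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, List<Integer>> groups = groupByLastDigit(1_000);
        // Element order inside each list is not guaranteed.
        groups.forEach((digit, values) ->
                System.out.println(digit + " -> " + values.size() + " elements"));
    }
}
```

Because the downstream collector reports CONCURRENT, groupingByConcurrent calls its accumulator without any extra locking, so the thread safety comes entirely from CopyOnWriteArrayList, at the cost of one array copy per added element.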

And that is basically what happens inside the Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream) method. Here is the relevant snippet from the JDK:

    if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
        accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(resultContainer, t);
        };
    }
    else {
        accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            synchronized (resultContainer) {
                downstreamAccumulator.accept(resultContainer, t);
            }
        };
    }

It checks whether the downstream collector is concurrent and, if it is not, wraps the mutating call in a synchronized block. Because the lock is the per-key result container, threads only contend when they add to the same key's list.
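The synchronized branch of the snippet can be mirrored by hand to see why a plain ArrayList is enough. This is only a sketch of the same idea outside the collector framework; the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the JDK.
final class SynchronizedAccumulatorSketch {

    // Groups 0..n-1 by last digit from many threads, using plain ArrayLists
    // that are guarded by a synchronized block on the list itself.
    static ConcurrentMap<Integer, List<Integer>> groupByLastDigit(int n) {
        ConcurrentMap<Integer, List<Integer>> groups = new ConcurrentHashMap<>();
        IntStream.range(0, n).parallel().forEach(i -> {
            // The concurrent map makes creating the per-key list atomic.
            List<Integer> bucket = groups.computeIfAbsent(i % 10, k -> new ArrayList<>());
            // The per-list lock serializes writers to the same key only.
            synchronized (bucket) {
                bucket.add(i);
            }
        });
        return groups;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, List<Integer>> groups = groupByLastDigit(100_000);
        // Element order inside each list is not guaranteed.
        groups.forEach((digit, values) ->
                System.out.println(digit + " -> " + values.size() + " elements"));
    }
}
```

Removing the synchronized block here would make the unsynchronized ArrayList writes race with each other, which is exactly the situation the JDK snippet guards against.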

This means that a concurrent list would only add a second, redundant layer of synchronization, since the built-in thread-safe list implementations in the JDK are mostly just synchronized versions of regular lists.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1