static ScheduledThreadPoolExecutor in CompletableFuture.Delayer

In Java 9, the new method completeOnTimeout was introduced in the CompletableFuture class:

public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                                              TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(
                                       new DelayedCompleter<T>(this, value),
                                       timeout, unit)));
    return this;
}

What I do not understand is why it uses the static ScheduledThreadPoolExecutor inside its implementation:

    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        return delayer.schedule(command, delay, unit);
    }

Where

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }

To me this is a very strange approach, as it can become a bottleneck for the whole application: a single ScheduledThreadPoolExecutor with only one thread in its pool, shared by all CompletableFuture tasks?

What am I missing here?

P.S. It looks like:

  1. the authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor,

  2. and this apparently led to such a solution with a static variable, because it is very inefficient to create a new executor for each CompletableFuture.

But my doubt remains, as I find the general approach strange.



Solution 1:[1]

You are right, this could become a bottleneck, but not for the completion itself, which is merely setting a variable in the CompletableFuture. That single thread could complete millions of futures in a second. The critical aspect is that the completion could trigger the evaluation of dependent stages within the completing thread.

So

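// Needs: java.util.concurrent.*, java.util.concurrent.locks.LockSupport
// An executor that silently drops every task, so supplyAsync never completes on its own
Executor neverDone = r -> {};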
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

will print

set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s

Using the …Async chaining methods will eliminate the issue:

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

will print

set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s

The conclusion is that when you have a potentially lengthy evaluation, you should always chain it via one of the …Async methods. Given the absence of control over the executing thread when using the methods without the “…Async” suffix (it could also be the thread calling the chaining method or any other thread calling a “completion method”, see also this answer), this is what you should always do.
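
The "thread calling the chaining method" case can be illustrated with a minimal sketch, assuming a future that is already completed when the dependent stage is attached. The class name ChainingThreadDemo and its method are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ChainingThreadDemo {
    /** Returns the name of the thread that ran a non-async dependent of an already completed future. */
    static String threadRunningDependent() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("foo");
        // The future is already complete, so thenApply evaluates the function
        // right away, in the thread that calls thenApply.
        return done.thenApply(s -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("dependent: " + threadRunningDependent());
    }
}
```

Both lines should show the same thread name, which is why a lengthy function chained without the …Async suffix can end up blocking whichever thread happens to attach it.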

Solution 2:[2]

For sure, this is a question to be answered by the authors. Anyway, here's my opinion on the matter.

What I do not understand is why does it use the static ScheduledThreadPoolExecutor inside its implementation:

...

To me this is a very strange approach, as it can become a bottleneck for the whole application: a single ScheduledThreadPoolExecutor with only one thread in its pool, shared by all CompletableFuture tasks?

You're right. The single thread of that ScheduledThreadPoolExecutor can end up running arbitrary code. Specifically, orTimeout() and completeOnTimeout() will call completeExceptionally() and complete() respectively, which, by default, run dependents synchronously in the completing thread.
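
A small sketch of that synchronous behavior, assuming a plain CompletableFuture completed from a named thread. SynchronousDependentDemo and the thread name "my-completer" are hypothetical and only serve the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class SynchronousDependentDemo {
    /** Completes a future from a thread with the given name and reports which thread ran the dependent. */
    static String completeFrom(String threadName) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();
        // Attached while the future is still incomplete, so the action is deferred.
        future.thenAccept(v -> ranOn.set(Thread.currentThread().getName()));

        Thread completer = new Thread(() -> future.complete("done"), threadName);
        completer.start();
        try {
            completer.join(); // complete() has run the dependent before the thread ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(completeFrom("my-completer")); // prints "my-completer"
    }
}
```

With completeOnTimeout(), the role of the completer thread is played by the single delay scheduler thread.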

To avoid this behavior, you must use your own CompletionStage or subclass of CompletableFuture which makes non-*Async methods always call *Async methods. This is much easier since Java 9 by overriding newIncompleteFuture().
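
A rough sketch of such a subclass, not a complete implementation: AsyncOnlyFuture is a hypothetical name, and only thenApply() and thenAccept() are redirected here. A real version would have to override the remaining non-async methods in the same way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical subclass that redirects non-async chaining methods to their async variants. */
class AsyncOnlyFuture<T> extends CompletableFuture<T> {
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        // Stages derived from this future are of the same subclass.
        return new AsyncOnlyFuture<>();
    }

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        return thenApplyAsync(fn); // runs on defaultExecutor()
    }

    @Override
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return thenAcceptAsync(action);
    }
    // The other non-async methods (thenCompose, whenComplete, handle, ...) are
    // left untouched in this sketch.

    /** Returns the name of the thread that ran a dependent after a timeout completion. */
    static String threadOfDependent() {
        return new AsyncOnlyFuture<String>()
            .completeOnTimeout("timeout", 100, TimeUnit.MILLISECONDS)
            .thenApply(s -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(threadOfDependent());
    }
}
```

The printed thread should belong to the default executor rather than to CompletableFutureDelayScheduler, because the timeout thread only completes the future and submits the dependent function.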

It looks like:

1) authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor,

When ForkJoinPool appeared in Java 7, it lacked a common thread pool. Java 8 introduced the static commonPool(), used by default (among others) in the introduced CompletableFuture and Stream classes.

It seems they were reluctant to expose a common scheduled executor. This would be just as useful as the common thread pool to avoid having many rarely used scheduled executors spread out.

If you need delayed tasks with fixed delays, then CompletableFuture.delayedExecutor() is probably good enough, at the cost of a small overhead for the wrapper objects.
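
For the fixed-delay case, a minimal sketch could look like this. The class name, the constant and the 200 ms delay are arbitrary choices for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

final class DelayedExecutorDemo {
    // One wrapper executor, reused for every task that needs the same delay.
    static final Executor AFTER_200_MS =
        CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS);

    static CompletableFuture<String> delayedGreeting() {
        // The supplier is handed to the default executor once the delay has elapsed.
        return CompletableFuture.supplyAsync(() -> "hello", AFTER_200_MS);
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        String value = delayedGreeting().join();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
        System.out.println(value + " after about " + elapsedMs + " ms");
    }
}
```

The delay counts from the moment a task is submitted to the wrapper, not from the creation of the wrapper, so a single instance can be shared.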

For variable intervals, there's the extra overhead of creating a wrapper Executor each time, but there are already a few created objects along the way, such as new instances of the internal Canceller, Timeout, DelayedCompleter and TaskSubmitter classes.

How often do we need to delay many tasks with variable intervals? Purely asynchronous code may do it all the time for varying timeouts, but since the scheduled executor itself is not exposed, we either accept this overhead or use yet another static scheduler.
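
The "yet another static scheduler" option might be sketched as follows. Everything here (AppTimeouts, the thread name, the helper signature) is hypothetical application code, not part of the JDK; the scheduler thread only hands the completion over to a worker executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class AppTimeouts {
    // Hypothetical application-wide scheduler with a single daemon thread.
    private static final ScheduledThreadPoolExecutor SCHEDULER;
    static {
        SCHEDULER = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "app-timeout-scheduler");
            t.setDaemon(true);
            return t;
        });
        SCHEDULER.setRemoveOnCancelPolicy(true);
    }

    /** Completes the future with a fallback after the delay, on the given worker executor. */
    static <T> CompletableFuture<T> completeOnTimeout(
            CompletableFuture<T> future, T fallback, long delay, TimeUnit unit, Executor worker) {
        // The scheduler thread only submits a task; complete() and any
        // synchronous dependents run on the worker executor.
        ScheduledFuture<?> timer = SCHEDULER.schedule(
            () -> worker.execute(() -> future.complete(fallback)), delay, unit);
        // Drop the pending timer once the future completes by other means.
        future.whenComplete((v, e) -> timer.cancel(false));
        return future;
    }

    /** Returns the name of the thread that ran a dependent after a timeout completion. */
    static String threadOfDependent() {
        CompletableFuture<String> never = new CompletableFuture<>();
        return completeOnTimeout(never, "timeout", 100, TimeUnit.MILLISECONDS,
                                 ForkJoinPool.commonPool())
            .thenApply(s -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(threadOfDependent());
    }
}
```

The trade-off is one more long-lived thread in the application, in exchange for control over where timeout completions and their dependents run.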

2) and this apparently led to such a solution with the static variable, because it is very inefficient to create a new executor for each CompletableFuture.

Exactly.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Holger
Solution 2 acelent