'static ScheduledThreadPoolExecutor in CompletableFuture.Delayer
In Java 9, the new method completeOnTimeout was introduced in the CompletableFuture class:
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                                              TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(
                         new DelayedCompleter<T>(this, value),
                         timeout, unit)));
    return this;
}
What I do not understand is why it uses the static ScheduledThreadPoolExecutor inside its implementation:
static ScheduledFuture<?> delay(Runnable command, long delay,
                                TimeUnit unit) {
    return delayer.schedule(command, delay, unit);
}
Where
static final ScheduledThreadPoolExecutor delayer;
static {
    (delayer = new ScheduledThreadPoolExecutor(
        1, new DaemonThreadFactory())).
        setRemoveOnCancelPolicy(true);
}
To me this is a very strange approach, as it can become a bottleneck for the whole application: a single ScheduledThreadPoolExecutor with only one thread in its pool, shared by all possible CompletableFuture tasks?
What am I missing here?
P.S. It looks like the authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor, and this apparently led to such a solution with a static variable, because it would be very inefficient to create a new executor for each CompletableFuture.
But my doubt still remains, as I find the general approach strange.
Solution 1:[1]
You are right, this could become a bottleneck, but not for the completion itself, which is merely setting a variable in the CompletableFuture. That single thread could complete millions of futures in a second. The critical aspect is that the completion could trigger the evaluation of dependent stages within the completing thread.
So
Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
will print
set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s
Using the …Async chaining methods will eliminate the issue:
Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");
will print
set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s
The conclusion is that when you have a potentially lengthy evaluation, you should always chain it via one of the …Async methods. Given the absence of control over the executing thread when using the methods without the “…Async” suffix (it could also be the thread calling the chaining method or any other thread calling a “completion method”, see also this answer), this is what you should always do.
Solution 2:[2]
For sure, this is a question to be answered by the authors. Anyway, here's my opinion on the matter.
What I do not understand is why does it use the static ScheduledThreadPoolExecutor inside its implementation: ...
For me it is a very strange approach, as it can become a bottleneck for the whole application: the only one ScheduledThreadPoolExecutor with the only one thread keeping inside the pool for all possible CompletableFuture tasks?
You're right. The ScheduledThreadPoolExecutor can run arbitrary code. Specifically, orTimeout() and completeOnTimeout() will call completeExceptionally() and complete(), which, by default, call dependents synchronously.
To avoid this behavior, you must use your own CompletionStage or subclass of CompletableFuture which makes non-*Async methods always call *Async methods. This is much easier since Java 9 by overriding newIncompleteFuture().
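A minimal sketch of that approach (the class name is my own, and only one method is redirected as an example; a real implementation would do the same for every non-Async chaining method):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch: a CompletableFuture whose dependents never run inline in the
// completing thread, because non-Async methods delegate to Async variants.
class AsyncOnlyFuture<T> extends CompletableFuture<T> {
    // Since Java 9, chaining methods obtain their result stage from this
    // factory, so every derived stage keeps the AsyncOnlyFuture behavior.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new AsyncOnlyFuture<>();
    }

    // Redirect one non-Async method as an example; repeat for the other
    // thenXxx/whenXxx/handle methods as needed.
    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        return thenApplyAsync(fn);
    }
}
```

With this, a completion performed by the shared delayer thread hands the dependent function to the default executor instead of running it inline.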
It looks like:
1) authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor,
When ForkJoinPool appeared in Java 7, it lacked a common thread pool. Java 8 introduced the static commonPool(), used by default (among others) in the introduced CompletableFuture and Stream classes.
It seems they were reluctant to expose a common scheduled executor. This would be just as useful as the common thread pool to avoid having many rarely used scheduled executors spread out.
If you need delayed tasks with static intervals, then CompletableFuture.delayedExecutor() is probably good enough, given a small overhead of wrapping objects.
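For instance (my own illustration, not from the answer), one wrapper executor with a fixed delay can be created once and reused for many tasks:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class DelayedExecutorDemo {
    public static void main(String[] args) {
        // One reusable wrapper executor for a fixed 100 ms delay; tasks
        // submitted to it are handed to the common pool after the delay.
        Executor delayed =
            CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS);

        long t0 = System.nanoTime();
        String result = CompletableFuture.supplyAsync(() -> "done", delayed).join();
        long elapsedMs = (System.nanoTime() - t0) / 1_000_000;

        System.out.println(result + " after ~" + elapsedMs + " ms");
    }
}
```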
For variable intervals, there's the extra overhead of creating a wrapper Executor each time, but there are already a few objects created along the way, such as new instances of the internal Canceller, Timeout, DelayedCompleter and TaskSubmitter classes.
How often do we need to delay many tasks in variable intervals? Pure asynchronous code may do it all the time for varying timeouts, but since we don't have the scheduled executor itself exposed, either we accept this overhead or we use yet another static scheduler.
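Such an application-owned scheduler could look like this (class and method names are hypothetical): a single daemon thread handling all variable-delay timeouts, mirroring what the JDK's internal Delayer does, but under the application's control:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class AppDelayer {
    // A single daemon thread, like the JDK's internal delayer, but owned by
    // the application so its load and lifecycle are ours to manage.
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "app-delayer");
            t.setDaemon(true);
            return t;
        });

    // Returns a future that fails with TimeoutException after the given
    // delay; combine it with applyToEither for per-call variable timeouts.
    public static <T> CompletableFuture<T> timeoutAfter(long delay, TimeUnit unit) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        SCHEDULER.schedule(
            () -> cf.completeExceptionally(new TimeoutException()), delay, unit);
        return cf;
    }

    private AppDelayer() {}
}
```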
2) and this apparently leaded to such a solution with the static variable, because it is very inefficient to create a new executor for each CompletableFuture.
Exactly.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Holger
Solution 2 | acelent