Getting reactor.pool.PoolShutdownException during save in database

The service uses org.springframework.r2dbc.core.DatabaseClient with reactor-pool and the r2dbc-mysql driver.

I insert into the database every 5-10 seconds (50-100 insert statements each time), and at random, roughly every 2-3 hours, I get reactor.pool.PoolShutdownException: Pool has been shut down. What might be the reason for this behavior?

Dependency versions:

  • r2dbc-pool: 0.8.8.RELEASE
  • r2dbc-mysql: 0.8.2
  • spring-r2dbc: 5.3.15

Stacktrace:

org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is reactor.pool.PoolShutdownException: Pool has been shut down
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3733)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:95)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:477)
at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:264)
at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
at reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:117)
at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:50)
at reactor.core.publisher.Mono.subscribe(Mono.java:4385)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:104)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:83)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)


Solution 1:[1]

Basically, it happens when too many connection acquisitions are pending.

Example: your connection pool size is 100 but you are trying to do 500 parallel inserts, so 400 of them will be in pending status.

In this situation, reactor-pool disposes of the connection pool. To avoid the issue, I control the number of parallel executions.


Actually I see two ways to handle this case:

  1. (The right way, in my opinion.) Control the flow of incoming messages by specifying the concurrency parameter on the operator (concurrency <= pool size):
   flux
       // some intermediate operators
       .flatMap({ databaseOperation(it) }, poolSize)

In this case, you won't have more parallel executions than your connection pool can afford.

  2. Use the delayUntil operator, which delays elements until there are unused connections (you can use the connection pool metrics to check that). I wouldn't recommend this approach because you can end up with an out-of-memory error if you are not controlling back-pressure, or you will have to drop some items if the buffer overflows.

Method that delays messages until there are available connections:

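import io.r2dbc.pool.ConnectionPool
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
import reactor.util.retry.Retry
import java.time.Duration
import java.util.function.Supplier

fun <T> Flux<T>.delayUntilHasAvailableConnections(pool: ConnectionPool): Flux<T> {
    // True while the pending-acquire queue is no longer than the maximum pool size.
    val hasAvailableConnections = Supplier {
        val metrics = pool.metrics.get()
        metrics.pendingAcquireSize() <= metrics.maxAllocatedSize
    }

    val connectionsExistsMono = Mono.fromSupplier(hasAvailableConnections)

    // Emit when the check passes; otherwise fail so that retryWhen re-runs the check after 5 seconds.
    val hasConnectionMono = connectionsExistsMono
        .handle { hasConnections, sink: SynchronousSink<Boolean> ->
            if (hasConnections) {
                sink.next(hasConnections)
            } else {
                sink.error(RuntimeException("No Connections"))
            }
        }.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5)))

    // Hold each element back until the check above succeeds.
    return delayUntil { hasConnectionMono }
}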

Usage:

   flux
       // some intermediate operators
       .delayUntilHasAvailableConnections(connectionPool)
       .flatMap { databaseOperation(it) }
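
To illustrate the first approach (capping the flatMap concurrency at the pool size), here is a minimal self-contained sketch. It only assumes reactor-core on the classpath. FakeDatabase and runBoundedInserts are hypothetical names invented for this example: the fake insert just waits 20 ms and records how many calls are in flight, so it stands in for a real DatabaseClient call rather than reproducing one.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real database call: it only tracks how many
// invocations are in flight at once and completes after a short delay.
class FakeDatabase {
    private val inFlight = AtomicInteger(0)
    val maxObserved = AtomicInteger(0)

    fun insert(item: Int): Mono<Int> = Mono.defer {
        // Runs at subscription time, i.e. when flatMap actually starts this insert.
        val now = inFlight.incrementAndGet()
        maxObserved.accumulateAndGet(now) { a, b -> maxOf(a, b) }
        Mono.delay(Duration.ofMillis(20))
            .map { item }
            // Release the slot before the result is propagated downstream.
            .doOnTerminate { inFlight.decrementAndGet() }
    }
}

// Runs `total` fake inserts with at most `poolSize` of them subscribed at once
// and returns the highest concurrency that was observed.
fun runBoundedInserts(total: Int, poolSize: Int): Int {
    val db = FakeDatabase()
    Flux.range(1, total)
        .flatMap({ db.insert(it) }, poolSize) // second argument = max concurrency
        .blockLast()
    return db.maxObserved.get()
}
```

Calling runBoundedInserts(500, 100) mirrors the numbers from the example above: 500 items are processed, but the observed concurrency should not exceed 100, because flatMap does not subscribe to more inner publishers than its concurrency argument allows.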

Solution 2:[2]

As Vladlen pointed out, reactor-pool can refuse new acquisition requests if there is a large queue of pending acquisitions for DB connections. In a Spring application with spring-r2dbc, this functionality is disabled by default and all connection attempts get queued.

Nevertheless, I got the same exception in my Spring application. My case was a more specific condition, but in case someone else stumbles upon this: the Spring Boot Actuator health check also checks connectivity to the database. If your application is deployed to Kubernetes, the request to the /actuator/health endpoint may not return in time if there is a large queue in front of the DB pool. This causes the Kubernetes readiness/liveness probes to fail with "Connection Timed out", making Kubernetes think the application is unhealthy (which is true, in a sense). In the end, Kubernetes kills the application, and all pending requests to the DB pool fail with the above exception.

My solution was to manually limit the load similar to what Vladlen pointed out:

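import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Runs inside a CoroutineScope; at most dbPoolSize DB operations run at once.
val coroutineLimit = Semaphore(dbPoolSize)
workItems.forEach {
    launch {
        coroutineLimit.withPermit {
            // My DB operation
        }
    }
}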
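
For readers who want to try the semaphore approach in isolation, the following is a minimal self-contained sketch, assuming kotlinx-coroutines-core is on the classpath. processWithLimit and processWithLimitBlocking are hypothetical helper names, and the delay(10) call is only a placeholder for the real DB operation; the counters exist solely to make the concurrency limit observable.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Launches one coroutine per work item but lets at most `dbPoolSize` of them
// run the (simulated) DB operation at once. Returns the peak concurrency seen.
suspend fun processWithLimit(workItems: List<Int>, dbPoolSize: Int): Int {
    val coroutineLimit = Semaphore(dbPoolSize)
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    // coroutineScope returns only after every launched child has finished.
    coroutineScope {
        workItems.forEach { _ ->
            launch {
                coroutineLimit.withPermit {
                    val now = inFlight.incrementAndGet()
                    peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                    delay(10) // placeholder for the real DB operation
                    inFlight.decrementAndGet()
                }
            }
        }
    }
    return peak.get()
}

// Blocking convenience wrapper for callers outside a coroutine.
fun processWithLimitBlocking(workItems: List<Int>, dbPoolSize: Int): Int =
    runBlocking { processWithLimit(workItems, dbPoolSize) }
```

All coroutines are still launched up front, so they occupy a little memory while waiting, but only dbPoolSize of them hold a permit at any moment. The queue in front of the connection pool therefore stays short, which is what keeps the health check responsive in the scenario described above.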

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Vladlen Gladis
Solution 2    TheMagican