r2dbc-pool connection not released after cancel

I have run into strange behaviour with R2DBC Pool: we create a large number of threads and send them to the R2DBC pool to acquire a DB connection. When all of the connections in the pool are in use, the threads queue until an idle connection becomes available, which happens when previously used connections are released. If we cancel those threads while they are waiting for an idle connection, the following happens:

  • even though they are cancelled, a few threads acquire a connection and go through their normal DB processing
  • MOST IMPORTANTLY: some of the connections are acquired and never get released, even after all the threads have been cancelled and none are active anymore.

Consequently, some connections never return to the idle state. They remain acquired and block subsequent connection requests from using them, and they stay locked until we restart the service.

It is important to mention that we run a query against the database at the time we acquire the connection (we have a multi-tenant database and issue a SET SCHEMA at connection acquisition to select the correct tenant).

I made a program to reproduce the issue.

For testing, I use a pool with maxConnection=2. After calling the test method a few times (controller.test), some connections in the pool remain acquired indefinitely (they should all have been released, either by onCancel or by the close statements handled by Spring). This can easily be observed by monitoring the pool over JMX.

I suppose that the cancellation request propagates to connectionPool.create(), but some iterations seem to have enough time to finish the preQuery before receiving the cancellation, which makes the connection available for Spring to use. In these cases the cancellation is not seen in the TestConnectionFactory and, roughly one time in three, Spring does not call connection.close, leaving that connection acquired.

@Slf4j
@RestController
public class TestController {
    private final TestRepo1 testRepo1;

    @Autowired
    public TestController(
            TestRepo1 testRepo1
    ) {
        this.testRepo1 = testRepo1;
    }

    @GetMapping("test")
    Mono<Void> test(
    ) {
        // Creates 100 Monos: element 0 throws after 2 seconds, the other 99 query the database.
        return Mono
                .when(
                        IntStream.range(0, 100)
                                .mapToObj(i -> Mono.defer(() ->
                                        i == 0 ? // the first element throws an error after 2 seconds, cancelling all queries that have not completed yet.
                                                Mono.just(0)
                                                        .delayElement(Duration.ofMillis(2000))
                                                        .doOnNext(x -> log.info("{} -> throw", x))
                                                        .then(Mono.error(new Exception("FAIL"))) :
                                                testRepo1.query(String.valueOf(i)))
                                )
                                .collect(Collectors.toList())
                )
                .then()
                .onErrorResume(e -> Mono.empty()); // avoid propagating error to http response.
    }
}
@Slf4j
public class TestConnectionFactory implements ConnectionFactory {
    private final ConnectionPool connectionPool;

    TestConnectionFactory(ConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }

    @Override
    public Publisher<? extends Connection> create() {
        return createTenantConnection()
                .doOnNext(x -> log.info("creation transaction done"))
                .doOnCancel(() -> log.info("cancel while creation"));
    }

    private Mono<Connection> createTenantConnection() {
        return connectionPool.create()
                .flatMap(connection -> preQuery(connection));
    }

    private Mono<Connection> preQuery(Connection connection) {
        return Mono.from(connection
                .createStatement("SELECT 1;") // enough to produce the error, in our real code, this is a SET SCHEMA XXX
                .execute())
                .doOnCancel(() -> log.info("cancel during preQuery"))
                .thenReturn(connection);
    }

    @Override
    public ConnectionFactoryMetadata getMetadata() {
        return connectionPool.getMetadata();
    }
}
@Configuration
public class MyConfiguration {
    @Bean
    @Scope("singleton")
    ConnectionFactory connectionFactory(
            ConnectionPool connectionPool
    ) {
        return new TestConnectionFactory(connectionPool);
    }
}
@Slf4j
@Repository
public class TestRepo1 {
    // simple query waiting 1 second
    private static final String QUERY = "SELECT pg_sleep(1);";

    private final DatabaseClient databaseClient;

    @Autowired
    public TestRepo1(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }

    public Mono<Void> query(String msg) {
        log.info("start query {}", msg);
        return databaseClient.execute(QUERY)
                .map(row -> "result")
                .first()
                .doOnCancel(() -> log.info("cancel query {}", msg))
                .doOnNext(x -> log.info("query {} result", msg))
                .then()
                .doOnTerminate(() -> log.info("terminate {}", msg));
    }
}

We use org.springframework.boot 2.3.5.RELEASE with io.r2dbc:r2dbc-postgresql and io.r2dbc:r2dbc-pool.

We tried upgrading to io.r2dbc:r2dbc-postgresql 0.8.8.RELEASE and io.r2dbc:r2dbc-pool 0.9.0.M1, but the result remains the same.



Solution 1:[1]

As explained in this article about using jOOQ with R2DBC, a good way to manage resources with R2DBC is to use Flux.usingWhen(), e.g.

Flux.usingWhen(
        pool.create(),
        c -> c.createStatement("SELECT col FROM my_table").execute(),
        c -> c.close()
    )
    .flatMap(it -> it.map((r, m) -> r.get(0, String.class)))
    .doOnNext(System.out::println)
    .subscribe();

This was also recommended on the R2DBC mailing list and will hopefully be documented on the r2dbc.io website in the future.
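
Applied to the question's TestConnectionFactory, a minimal sketch of the same idea could look like the method below. It is only an illustration and assumes the five-argument Mono.usingWhen overload from Reactor, so that the pooled connection is closed (i.e. returned to the pool) when the pre-query errors or is cancelled, and is only handed downstream on successful completion:

private Mono<Connection> createTenantConnection() {
    return Mono.usingWhen(
            connectionPool.create(),                               // acquire a connection from the pool
            this::preQuery,                                        // run the SET SCHEMA / SELECT 1 pre-query
            connection -> Mono.empty(),                            // success: keep the connection open for the caller
            (connection, error) -> Mono.from(connection.close()),  // error: release it back to the pool
            connection -> Mono.from(connection.close()));          // cancel: release it back to the pool
}

Note that this only protects the acquisition phase; once the connection has been emitted to Spring, it is still up to Spring (or the caller) to close it.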

Solution 2:[2]

If you use Spring to manage R2DBC connections, you should also use the Spring transaction manager, which properly handles all the related resources (connections, pools, etc.), at least in theory. In practice I still see resources leak in production occasionally (less than once every few months); I think this happens because of https://github.com/r2dbc/r2dbc-pool/issues/140 and that I should update the r2dbc-pool version. The same issue could be the cause of your problem. In any case, you have to close resources properly, otherwise connections will leak. I know of only two options here:

  1. use Flux.usingWhen - as already proposed
  2. use Spring Transaction Manager

To use the Spring Transaction Manager you first have to create it. The easiest and most convenient way is to use the corresponding Spring Boot auto-configuration: see the class R2dbcTransactionManagerAutoConfiguration from the org.springframework.boot:spring-boot-autoconfigure library (you can also use R2dbcAutoConfiguration to create the connection factory automatically from the configuration YAML file instead of declaring the bean manually).
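
If you prefer to declare the bean yourself instead of relying on the auto-configuration, a minimal sketch could look like this (R2dbcTransactionManager lives in org.springframework.r2dbc.connection in Spring Framework 5.3+, and in org.springframework.data.r2dbc.connection in the Spring Data R2DBC 1.1 line used by Boot 2.3):

@Configuration
@EnableTransactionManagement // not needed with Spring Boot, which enables it automatically
public class TransactionConfig {

    // Reactive transaction manager that drives @Transactional for R2DBC connections.
    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}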

Once you have created the transaction manager, create a service that encapsulates your repositories and wraps the required logic in a transaction context. For example:

class Service {
  private final Repository repo;
  private final Repository2 repo2;
   ...
  @Transactional // R2DBC
  public Mono<Result> process(…) {
    return repo.save(e).zipWith(
      repo2.save(e2), (t, t2) -> t2);  
  }
}

You can annotate the whole class or particular methods, whichever you prefer.

In your example you could simply annotate the repository itself, but I do not recommend that: repositories represent simple operations against particular tables, so they should not know about the transaction context, since a single transaction may involve several calls to several different repositories.
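
Applied to the code from the question, a hypothetical service could look like the sketch below (TestService and queryInTransaction are illustrative names, not from the original post); the controller would then call this service instead of the repository directly.

@Service
public class TestService {
    private final TestRepo1 testRepo1;

    public TestService(TestRepo1 testRepo1) {
        this.testRepo1 = testRepo1;
    }

    // The reactive transaction manager binds the connection to the subscription
    // and releases it when the transaction completes, fails or is cancelled.
    @Transactional
    public Mono<Void> queryInTransaction(String msg) {
        return testRepo1.query(msg);
    }
}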

You can also watch the tech talk I prepared last year: https://www.youtube.com/watch?v=_1QPCoCsCTY&t=3s

I am now preparing an update on this topic.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Lukas Eder
Solution 2: Anton Kotov