'How apply pagination in reactive Spring Data?

In Spring Data, we have PagingAndSortingRepository which inherits from CrudRepository. In reactive Spring Data, we only have ReactiveSortingRepository which inherits from ReactiveCrudRepository. How could we make pagination in a reactive way ? Will we able to make this in future with ReactivePagingAndSortingRepository for instance?



Solution 1:[1]

Reactive Spring Data MongoDB repositories do not provide paging in the sense of paging how it's designed for imperative repositories. Imperative paging requires additional details while fetching a page. In particular:

  • The number of returned records for a paging query
  • Optionally, total count of records the query yields if the number of returned records is zero or matches the page size to calculate the overall number of pages

Both aspects do not fit to the notion of efficient, non-blocking resource usage. Waiting until all records are received (to determine the first chunk of paging details) would remove a huge part of the benefits you get by reactive data access. Additionally, executing a count query is rather expensive, and increases the lag until you're able to process data.

You can still fetch chunks of data yourself by passing a Pageable (PageRequest) to repository query methods:

interface ReactivePersonRepository extends Repository<Person, Long> {

  Flux<Person> findByFirstnameOrderByLastname(String firstname, Pageable pageable);
}

Spring Data will apply pagination to the query by translating Pageable to LIMIT and OFFSET.

References:

Solution 2:[2]

import com.thepracticaldeveloper.reactiveweb.domain.Quote;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

public interface QuoteMongoReactiveRepository extends ReactiveCrudRepository<Quote, String> {

    @Query("{ id: { $exists: true }}")
    Flux<Quote> retrieveAllQuotesPaged(final Pageable page);
}

more details , you could check here

Solution 3:[3]

I created a service with this method for anyone that may still be looking for a solution:

@Resource
private UserRepository userRepository; //Extends ReactiveSortingRepository<User, String>

public Mono<Page<User>> findAllUsersPaged(Pageable pageable) {
        return this.userRepository.count()
                .flatMap(userCount -> {
                    return this.userRepository.findAll(pageable.getSort())
                            .buffer(pageable.getPageSize(),(pageable.getPageNumber() + 1))
                            .elementAt(pageable.getPageNumber(), new ArrayList<>())
                            .map(users -> new PageImpl<User>(users, pageable, userCount));
                });
    }

Solution 4:[4]

I have created another approach using @kn3l solution (without using @Query ):

fun findByIdNotNull(page: Pageable): Flux< Quote>

It creates the same query without using @Query method

Solution 5:[5]

public Mono<Page<ChatUser>> findByChannelIdPageable(String channelId, Integer page, Integer size) {
    Pageable pageable = PageRequest.of(page, size, Sort.by(Sort.Direction.DESC, "chatChannels.joinedTime"));
    Criteria criteria = new Criteria("chatChannels.chatChannelId").is(channelId);
    Query query = new Query().with(pageable);
    query.addCriteria(criteria);
    Flux<ChatUser> chatUserFlux = reactiveMongoTemplate.find(query, ChatUser.class, "chatUser");
    Mono<Long> countMono = reactiveMongoTemplate.count(Query.of(query).limit(-1).skip(-1), ChatUser.class);
    return Mono.zip(chatUserFlux.collectList(),countMono).map(tuple2 -> {
        return PageableExecutionUtils.getPage(
                tuple2.getT1(),
                pageable,
                () -> tuple2.getT2());
    });
}

Solution 6:[6]

I've faced same issue and end up having similar approach as the above but changed slightly the code as I use Query DSL, following example if someone needed.

@Repository
public interface PersonRepository extends ReactiveMongoRepository<Person, String>, ReactiveQuerydslPredicateExecutor<Person> {

    default Flux<Person> applyPagination(Flux<Person> persons, Pageable pageable) {
    return persons.buffer(pageable.getPageSize(), (pageable.getPageNumber() + 1))
        .elementAt(pageable.getPageNumber(), new ArrayList<>())
        .flatMapMany(Flux::fromIterable);
    }

}


public Flux<Person> findAll(Pageable pageable, Predicate predicate) {
    return personRepository.applyPagination(personRepository.findAll(predicate), pageable);
}

Solution 7:[7]

Searching for some idea for reactive pageable repositories I had seen solutions that will result in horrible boiler plate code so I ended up with this (did not tried yet in real life but should work fine, or maybe it can be inspiration for your solution)

So … let's create a brand new toolbox class with such method

    public static 
           <R extends PageableForReactiveMongo<S, K>, S, T, K> Mono<Page<T>>
           pageableForReactiveMongo(Pageable pageable, 
                                             R repository, Class<T> clazzTo) {
        return repository.count()
                .flatMap(c ->
                        repository.findOderByLimitedTo(pageable.getSort(),
                                              pageable.getPageNumber() + 1)
                                .buffer(pageable.getPageSize(), (pageable.getPageNumber() + 1))
                                .elementAt(pageable.getPageNumber(), new ArrayList<>())
                                .map(r -> mapToPage(pageable, c, r, clazzTo))
                );
    }

and it will need also something like that:

    private static <S, T> Page<T> mapToPage(Pageable pageable, Long userCount, Collection<S> collection, Class<T> clazzTo) {
        return new PageImpl<>(
                collection.stream()
                        .map(r -> mapper.map(r, clazzTo))
                        .collect(Collectors.toList())
                , pageable, userCount);
    }

and then we need also an abstract layer wrapping reactive repositories

public interface PageableForReactiveMongo<D, K> extends ReactiveMongoRepository<D, K> {
    Flux<D> findOderByLimitedTo(Sort sort, int i);
}

to let it instantiate by spring

@Repository
interface ControllerRepository extends PageableForReactiveMongo<ControllerDocument, String> {
}

And finally use it many many many times like that

public Mono<Page<Controller>> findAllControllers(Pageable pageable) {
    return getFromPageableForReactiveMongo(pageable, controllerRepository, Controller.class);
}

this is how your code can look like :) please tell me if it is fine, or helped out with something

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 mp911de
Solution 2 kn3l
Solution 3 Sam1am
Solution 4 CRISTIAN ROMERO MATESANZ
Solution 5 yaoandw
Solution 6 user2669657
Solution 7