How do I check in an operator that the current element is the last element?

Context: To process a Flowable<Item>, I need to look at the first item and then, depending on it, either accumulate all items into a single item (reduce) or apply a simple map to each item without any accumulation (map).

The one way I can think of requires the operator to know that the current element is the last element. Is there any operator that knows whether the current element is the last one? I can't use buffer, because then it will always fetch 2 elements even when no accumulation should be done.

AtomicReference<Item> itemRef = new AtomicReference<>();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source
    .flatMap(item -> {
        if (item.shouldBeAccumulated()) {
            // Accumulate data into the reference
            itemRef.set(itemRef.get().addData(item.getData()));
            // Return empty to throw away the consumed item
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    // applyIfLastElement does not exist; it stands for the operator I am looking for
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    });


Solution 1:[1]

Below is one way to do it (in RxJava 2.x, which is very close to RxJava 3.x). The trick is to use defer, which is the best way to encapsulate state for a Flowable so that it can be subscribed to many times, together with concatWith. defer also makes the evaluation of last lazy, so the accumulated state is only read once the source has completed. As a performance improvement that you may not care about, I used one-element arrays instead of AtomicReference objects, to avoid unnecessary volatile reads and writes.

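// RxJava 2.x imports; in RxJava 3.x the package is io.reactivex.rxjava3.core
import io.reactivex.Flowable;
import io.reactivex.Maybe;

Flowable<Integer> result = Flowable.defer(() -> {
    // per-subscription state, held in one-element arrays so the lambdas can mutate it
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    // evaluated only after the source completes: emits the accumulated value, if any
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source //
            .flatMap(x -> {
                if (state[0] != null || (isFirst[0] && shouldBeAccumulated(x))) {
                    // accumulate; the first accumulated item seeds the state
                    state[0] = state[0] == null ? x : state[0] + x;
                    isFirst[0] = false;
                    return Flowable.empty();
                } else {
                    isFirst[0] = false;
                    return Flowable.just(x);
                }
            })
            .concatWith(last);
});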
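
To make the decision logic above easier to check without an RxJava dependency, here is a minimal plain-Java sketch of the same state machine over a List. It is only an analogue, not the reactive solution itself: the class name FirstItemDecides, the process method, and mapItem (a stand-in for item.updateProperty() from the question) are all hypothetical names introduced for illustration. The local variables play the role of the per-subscription state created inside defer, and the final add plays the role of concatWith(last).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

public class FirstItemDecides {

    // Hypothetical stand-in for the per-item "map" step (item.updateProperty() in the question).
    static int mapItem(int x) {
        return x * 10;
    }

    // The first item decides the mode: accumulate everything into one value, or map each item.
    static List<Integer> process(List<Integer> source, IntPredicate shouldBeAccumulated) {
        List<Integer> out = new ArrayList<>();
        boolean isFirst = true;   // analogue of isFirst[0]
        Integer state = null;     // analogue of state[0]; non-null means "accumulating"
        for (int x : source) {
            if (state != null || (isFirst && shouldBeAccumulated.test(x))) {
                // accumulate; the first accumulated item seeds the state
                state = (state == null) ? x : state + x;
            } else {
                // pass the mapped item through immediately
                out.add(mapItem(x));
            }
            isFirst = false;
        }
        // analogue of concatWith(last): emit the accumulated value only at the end, if any
        if (state != null) {
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        // first item is even, so everything is summed
        System.out.println(process(Arrays.asList(2, 3, 4), x -> x % 2 == 0)); // prints [9]
        // first item is odd, so each item is mapped individually
        System.out.println(process(Arrays.asList(1, 2, 4), x -> x % 2 == 0)); // prints [10, 20, 40]
    }
}
```

Note that only the first item is tested with shouldBeAccumulated; later items follow whichever mode the first item selected, which matches the condition in the flatMap above.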

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1