How do I check in an operator that the current element is the last element?
Context:
To process a Flowable<Item>, I need to first process the first item and then, depending on that, either accumulate all items into a single item (reduce) OR apply a simple map on each item without any accumulation (map).
One way I can think of requires the operator to be aware that the current element is the last element. Is there any operator that knows whether the current element is the last one? I can't use buffer because then it will always fetch 2 elements, even when accumulation shouldn't be done.
AtomicReference<Item> itemRef = new AtomicReference<>();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source
    .flatMap(item -> {
        if (item.shouldBeAccumulated()) {
            // Accumulate data into reference
            itemRef.set(itemRef.get().addData(item.getData()));
            // Return empty to throw away consumed item
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    // Pseudocode: applyIfLastElement is the operator I am looking for
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    });
Solution 1:[1]
Below is how you can do it (in RxJava 2.x, which is very close to RxJava 3.x). The trick is to use defer (the best way to encapsulate state for a Flowable so that it can be subscribed to many times) and concatWith. defer also makes last lazily evaluated: because concatWith subscribes to last only after source completes, state is read once all items have been seen. Note also that, as a performance improvement you may not care about, I used one-element arrays instead of AtomicReference objects (to avoid unnecessary volatile reads, sets, etc.).
Flowable<Integer> result = Flowable.defer(() -> {
    // Per-subscription state: one-element arrays act as mutable holders
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    // Evaluated only when subscribed to, i.e. after source has completed
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source //
        .flatMap(x -> {
            if (state[0] != null || (isFirst[0] && shouldBeAccumulated(x))) {
                // accumulate; seed the state with the first item so it is not dropped
                state[0] = state[0] == null ? x : state[0] + x;
                isFirst[0] = false;
                return Flowable.empty();
            } else {
                isFirst[0] = false;
                return Flowable.just(x);
            }
        })
        .concatWith(last);
});
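To check what the pipeline above is expected to emit without pulling in RxJava, the same decision logic can be modelled in plain Java. This is only a sketch of the intended semantics (the first element picks the mode, and an accumulated value is emitted only once the source is exhausted), not an RxJava operator. The class name AccumulateOrMap, the method accumulateOrMap and its parameters are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class AccumulateOrMap {

    /**
     * Plain-Java model (hypothetical helper, not part of RxJava):
     * the first element decides whether everything is reduced to one
     * value or each element is mapped individually.
     */
    public static <T> List<T> accumulateOrMap(Iterable<T> source,
                                              Predicate<T> shouldAccumulate,
                                              BinaryOperator<T> accumulator,
                                              UnaryOperator<T> mapper) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            return out; // empty source: nothing to emit
        }
        T first = it.next();
        if (shouldAccumulate.test(first)) {
            // Reduce mode: seed with the first element, fold in the rest
            T state = first;
            while (it.hasNext()) {
                state = accumulator.apply(state, it.next());
            }
            out.add(state); // emitted only after the source is exhausted
        } else {
            // Map mode: transform each element, no accumulation
            out.add(mapper.apply(first));
            while (it.hasNext()) {
                out.add(mapper.apply(it.next()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Predicate<Integer> isEven = x -> x % 2 == 0;
        // First element is even, so all elements are summed: prints [9]
        System.out.println(accumulateOrMap(Arrays.asList(2, 3, 4), isEven, Integer::sum, x -> x * 10));
        // First element is odd, so each element is mapped: prints [10, 20, 30]
        System.out.println(accumulateOrMap(Arrays.asList(1, 2, 3), isEven, Integer::sum, x -> x * 10));
    }
}
```

In the question's terms, shouldAccumulate would correspond to Item.shouldBeAccumulated, accumulator to combining items via addData, and mapper to updateProperty; those mappings are assumptions about how Item behaves.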
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow