'Why can I not access class variables from onComplete in Flow API Subscriber
Using the Java Flow API. I have a chain of Processor
s and a terminal Subscriber
.
I have coded up Subscriber
s that have kept a count of items received that work fine with a SubmissionPublisher
.
However, when I pass values on from a Processor
(which extends SubmissionPublisher
) to the Subscriber
below, no output is printed to the console.
If I remove minValue.get()
the text is printed to the console.
Why does the calling class variable cause onComplete
not to execute as expected?
public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error + throwable.getMessage() \nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion - Min Value is : "+minValue.get());
}
}
Edit - adding Minimal Reproduceable Example
public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);
publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}
Solution 1:[1]
The issue was that the main thread was exiting before the subscribers onComplete() was called.
If I add a sleep after calling publisher.close() the subscriber.onComplete() gets time to execute on its own thread.
Solution 2:[2]
minValue.get() returns an int
while item is an Integer
object. Maybe that's the problem.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | JNeff |
Solution 2 | Gerd |