'Possible to insert values into stream asynchronously in dart?
I have a function which downloads the RSS Feed and parses it. After that, I iterate through each team and do some processing on it. For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?
High level code:
Stream<String> processRSSFeedItems() async* {
RSSFeed rssFeed = RSSFeed("some URL");
for(int itemIndex = 0; itemIndex < rssFeed.items.lengthl itemIndex++){
String processedItem = await processRssItem(rssFeed.items[itemIndex]);
yield processedItem;
}
}
void refreshRssFeed(){
var stream = processRSSFeedItems();
stream.listen((item) => setState(() => something(item));
}
Is there any way to call processRssItem() for all items without waiting for the previous process to complete and yield them as they arrive?
Solution 1:[1]
You can add the elements to a StreamController as they arrive, and return the stream.
It is considered best practice that streams should wait for subscribers before starting their work. This is achieved by using the onListen
callback of the StreamController
.
import 'dart:async';
...
Stream<String> processRSSFeedItems() {
late StreamController<String> controller;
void startProcessRssFeedItems() {
RSSFeed rssFeed = RSSFeed("some URL");
int workers = rssFeed.items.length;
for (int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++) {
processRssItem(rssFeed.items[itemIndex]).then((processedItem) {
controller.add(processedItem);
if (--workers == 0) {
controller.close();
}
});
}
}
controller = StreamController<String>(
onListen: startProcessRssFeedItems,
);
return controller.stream;
}
Solution 2:[2]
You can do
List<Future<String>> futures = <Future<String>>[];
for(int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++){
futures.add(processRssItem(rssFeed.items[itemIndex]));
}
// Waits for multiple futures to complete and collects their results.
final result = await Future.wait<String>(futures);
for(var i = 0 ; i < result.length; i++) {
yield result[i];
}
Solution 3:[3]
From your question:
"For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?"
"yield" in this context is equivalent to "return". So Stream will not fit your use-case because Streams are designed to be listened for values iteratively, not for "all the items once". For "all the items once" you should use Future
and return
the value, not yield
. Using while
loop instead of using for
loop on List<Future>
will process them one-by-one:
Future getAllWhenProcessed()async{
var items = rssFeed.items;
var count = items.length;
var results = [];
while(count > 0){
results.add(await processRssItem(items[--count]));
}
return results;
}
Then just
getAllWhenProcessed().then((value){ updateUi(); });
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Rahul |
Solution 3 |