'Possible to insert values into stream asynchronously in dart?

I have a function which downloads the RSS Feed and parses it. After that, I iterate through each team and do some processing on it. For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?

High level code:

Stream<String> processRSSFeedItems() async* {
    RSSFeed rssFeed = RSSFeed("some URL");
    for(int itemIndex = 0; itemIndex < rssFeed.items.lengthl itemIndex++){
         String processedItem = await processRssItem(rssFeed.items[itemIndex]);
         yield processedItem;
    }
}

void refreshRssFeed(){
    var stream = processRSSFeedItems();
    stream.listen((item) => setState(() => something(item));
}

Is there any way to call processRssItem() for all items without waiting for the previous process to complete and yield them as they arrive?



Solution 1:[1]

You can add the elements to a StreamController as they arrive, and return the stream.

It is considered best practice that streams should wait for subscribers before starting their work. This is achieved by using the onListen callback of the StreamController.

import 'dart:async';
...

Stream<String> processRSSFeedItems() {
  late StreamController<String> controller;

  void startProcessRssFeedItems() {
    RSSFeed rssFeed = RSSFeed("some URL");
    int workers = rssFeed.items.length;
    for (int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++) {
      processRssItem(rssFeed.items[itemIndex]).then((processedItem) {
        controller.add(processedItem);
        if (--workers == 0) {
          controller.close();
        }
      });
    }
  }

  controller = StreamController<String>(
    onListen: startProcessRssFeedItems,
  );

  return controller.stream;
}

Solution 2:[2]

You can do

List<Future<String>> futures = <Future<String>>[];
for(int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++){
     futures.add(processRssItem(rssFeed.items[itemIndex]));
}

// Waits for multiple futures to complete and collects their results.
final result = await Future.wait<String>(futures);

for(var i = 0 ; i < result.length; i++) {
    yield result[i];
}

Solution 3:[3]

From your question:

"For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?"

"yield" in this context is equivalent to "return". So Stream will not fit your use-case because Streams are designed to be listened for values iteratively, not for "all the items once". For "all the items once" you should use Future and return the value, not yield. Using while loop instead of using for loop on List<Future> will process them one-by-one:

    Future getAllWhenProcessed()async{
      var items = rssFeed.items;
      var count = items.length;
      var results = [];
      while(count > 0){
         results.add(await processRssItem(items[--count]));
       }
      return results;
    }

Then just

getAllWhenProcessed().then((value){ updateUi(); });

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Rahul
Solution 3