How to collect multiple results from a concurrently running for loop?

I would like to run a function with many different inputs.

Let's assume my function is:

async fn do_the_hard_job(input: u16) {
    ...
    // do the hard job that takes some time
    if my_condition {
        println!("{}", input);
    }
}

And I would like to run this function for many inputs; let's say my inputs are:

let inputs = stream::iter(1..=u16::MAX);

Since the function takes some time, I would like to run the inputs as concurrently as possible. So I could run it like this:

inputs
    .for_each_concurrent(0, |input| do_the_hard_job(input))
    .await;

So far so good; I ran the function on all the inputs and got my output on stdout. But what if I want the output to be written to a specific output file?

I cannot open a file and append to it inside do_the_hard_job; that would make a mess of the output. I also cannot pass the file in as a parameter: the function runs concurrently, so only one call at a time could borrow the file mutably.

I have tried returning the value from the function instead of printing it, and then collecting the returned values, like this:

let mut return_values: Vec<u16> = Vec::new();
inputs
    // does not compile: the concurrent futures cannot all mutably share `return_values`
    .for_each_concurrent(0, |input| async move {
        let done = do_the_hard_job(input).await;
        if done > 0 {
            return_values.push(input);
        }
    })
    .await;

Sadly that didn't work. What can I try to achieve my goal?

Edit: I prepared a reproducer for the problem: https://github.com/kursatkobya/bite-sized-qualms/tree/main/concurrent-write



Solution 1:[1]

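You can combine then() and collect() to get the results. Note that then() awaits each future before starting the next one, so the inputs are processed one at a time, in input order; replacing then() with map() followed by buffer_unordered(n) runs up to n of them at once. The then() version: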

use futures::{stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let inputs = stream::iter(0..=10);

    // Run `f` on every input and collect the returned values into a Vec.
    let output_values: Vec<u16> = inputs
        .then(|input: u16| async move { f(input).await })
        .collect::<Vec<u16>>()
        .await;
    println!("{:?}", output_values);
}

async fn f(input: u16) -> u16 {
    tokio::time::sleep(Duration::from_millis(200)).await;
    input
}

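Once the values are collected into a Vec, as in the answer above, the file-writing concern from the question goes away, because the write happens in one place after the concurrent work has finished. Below is a minimal sketch of that last step. It assumes a hypothetical helper named write_results (not part of the original answer) that accepts any std::io::Write target, so a std::fs::File, a BufWriter, or an in-memory buffer would all work:

```rust
use std::io::{self, Write};

/// Hypothetical helper: writes one value per line to any writer.
fn write_results<W: Write>(out: &mut W, values: &[u16]) -> io::Result<()> {
    for value in values {
        writeln!(out, "{}", value)?;
    }
    out.flush()
}

fn main() -> io::Result<()> {
    // Stand-in for the Vec produced by `collect` in the answer.
    let output_values: Vec<u16> = vec![0, 1, 2];

    // An in-memory buffer is used here; a `std::fs::File` could be passed instead.
    let mut buffer: Vec<u8> = Vec::new();
    write_results(&mut buffer, &output_values)?;

    print!("{}", String::from_utf8_lossy(&buffer));
    Ok(())
}
```

Because only one piece of code ever touches the writer, no lock or shared mutable borrow is needed.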

Solution 2:[2]

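You can use zip() to pair the stream of futures with a second stream that yields clones of a shared Arc<Mutex<Vec>>, so that each concurrent task gets its own handle to the result vector: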

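use std::sync::Arc;
use std::time::Duration;

use async_stream::stream;
use futures::{Future, Stream, StreamExt};
use tokio::{sync::Mutex, time::sleep};

// A stream of futures; each one resolves to its index after one second.
fn foo() -> impl Stream<Item = impl Future<Output = i32>> {
    stream! {
        for i in 0..10 {
            yield async move {
                sleep(Duration::from_millis(1000)).await;
                i
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let s = Box::pin(foo());
    // Shared, lockable vector that every task pushes its result into.
    let v = Arc::new(Mutex::new(Vec::<i32>::new()));
    // Pair each future with its own clone of the Arc.
    let z = {
        let shared = v.clone();
        s.zip(stream! {
            loop {
                yield shared.clone();
            }
        })
    };

    // Run at most two futures at a time; results are pushed in completion order.
    z.for_each_concurrent(2, |(fut, v)| async move {
        let num = fut.await;
        println!("{}", num);
        v.lock().await.push(num);
    })
    .await;
    // Lock once more to print the collected values themselves.
    println!("{:?}", *v.lock().await);
}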
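An alternative to sharing a locked Vec is to have each worker send its result over a channel and let a single receiver gather them. The sketch below illustrates only that pattern and makes substitutions that are not in the original answer: it uses OS threads and std::sync::mpsc from the standard library rather than tokio tasks, and hard_job and run_all are hypothetical names standing in for the real work.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the slow job; returns its input unchanged.
fn hard_job(input: u16) -> u16 {
    thread::sleep(Duration::from_millis(10));
    input
}

/// Runs `hard_job` on every input in its own thread and gathers the results.
fn run_all(inputs: Vec<u16>) -> Vec<u16> {
    let (tx, rx) = mpsc::channel();

    for input in inputs {
        let tx = tx.clone();
        thread::spawn(move || {
            // Each worker owns its own sender, so no lock is needed.
            tx.send(hard_job(input)).expect("receiver dropped");
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    // Results arrive in completion order, so sort them for a stable result.
    let mut results: Vec<u16> = rx.into_iter().collect();
    results.sort_unstable();
    results
}

fn main() {
    let results = run_all((1..=5).collect());
    println!("{:?}", results);
}
```

Spawning one thread per input is only reasonable for small input sets. In an async program, tokio::sync::mpsc can play the same role, with the receiving side being the only code that writes to the file.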

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Netwave
Solution 2    rw YAN