'How to collect multiple results from concurrently working for loop?
I would like to run a function with lots of different variables
Let assume my function is :
async fn do_the_hard_job(my_input: u16) {
...
// do the hard job that takes some time
if my_condition {
println!("{}", input);
}
}
And I would like to run this function for many inputs; let's say my inputs are
inputs = stream::iter(1..=u16::MAX);
Since the function spends some cycles i would like to run all inputs concurrently as possible. So I could run this like
inputs.for_each_concurrent(0, |input| do_the_hard_job(input))
.await;
So far so good; i ran all the inputs with the function and get my output on stdout
. But what if I want the output to be written to a certain output file ?
I can not open a file and append into it in do_the_hard_job
function. That would mess things. I cannot add file as a parameter since the method will be run concurrently which one will borrow the mutable file.
I have tried returning the value instead of printing in the method and then collect returned vales ; like this :
let mut return_values:Vec<u16> = Vec::new();
inputs
.for_each_concurrent(0, |input| async move {
if let done = do_the_hard_job(port).await{
if done > 0 {
return_values.push(port);
}
}}).await;
Sadly that didn't work. What can i try to achieve my goal ?
Edit : I prepared a reproducer for the problem : https://github.com/kursatkobya/bite-sized-qualms/tree/main/concurrent-write
Solution 1:[1]
You can combine then
and collect
to get the results:
use futures::{stream, StreamExt};
use std::{
time::Duration,
};
#[tokio::main]
async fn main() {
let inputs = stream::iter(0..=10);
// Finaly the one below does not work
let output_values: Vec<u16> = inputs
.then(|input: u16| async move {
let result = f(input).await;
result
}).collect::<Vec<u16>>().await;
println!("{:?}", output_values);
}
async fn f(input: u16) -> u16 {
tokio::time::sleep(Duration::from_millis(200)).await;
input
}
Solution 2:[2]
you can use zip()
to zipping two streams together
use std::sync::Arc;
use async_stream::stream;
use futures::{Future, Stream, StreamExt};
use tokio::{sync::Mutex, time::sleep};
fn foo() -> impl Stream<Item = impl Future<Output = i32>> {
stream! {
for i in 0..10 {
yield async move {
sleep(std::time::Duration::from_millis(1000)).await;
i
}
}
}
}
#[tokio::main]
async fn main() {
let s = Box::pin(foo());
let v = Arc::new(Mutex::new(Vec::<i32>::new()));
let z = {
let _v = v.clone();
s.zip(stream! {
loop {
yield _v.clone();
}
})
};
z.for_each_concurrent(2, |(fut, v)| async move {
let num = fut.await;
println!("{}", num);
v.lock().await.push(num);
})
.await;
println!("{:?}", v);
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Netwave |
Solution 2 | rw YAN |