How to elegantly perform multiple effects in parallel with ZIO
I know that I can use
import zio.Task

def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
  a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }

def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
  zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }
to execute 3 or 4 tasks in parallel, but I wonder if there is a more elegant solution?
Solution 1:[1]
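You could just use ZIO.collectAllPar with a list of tasks:
import zio.{Task, ZIO}

// .toList keeps the result a List on ZIO versions where collectAllPar preserves the input collection type
def collectTasks(tasks: Task[Int]*): Task[List[Int]] = ZIO.collectAllPar(tasks.toList)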
Then you could use it like:
val t1 = Task.effect {
  Thread.sleep(100)
  println("t1 started")
  Thread.sleep(1000)
  1
}
val t2 = Task.effect {
  println("t2 started")
  Thread.sleep(1000)
  2
}
val t3 = Task.effect {
  println("t3 started")
  Thread.sleep(1000)
  3
}
// ZIO 1.0.x; pre-1.0 release candidates used (new DefaultRuntime {}).unsafeRun(...) instead
zio.Runtime.default.unsafeRun(collectTasks(t1, t2, t3))
and it will run all your tasks concurrently.
A generic solution using tuples instead of a list would be hard to achieve in Scala 2 without shapeless. This changes in Scala 3, where tuples can be handled as heterogeneous lists.
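For readers on ZIO 2.x, where Task.effect has been renamed to ZIO.attempt and programs are usually run through ZIOAppDefault, the collectAllPar approach from this answer might be sketched as follows. The task helper and the object name are made up for illustration, and ZIO.sleep stands in for Thread.sleep so that no thread is blocked.

```scala
import zio._

object CollectAllParExample extends ZIOAppDefault {
  // Hypothetical task: announces itself, sleeps without blocking a thread, then returns n.
  def task(n: Int): Task[Int] =
    Console.printLine(s"t$n started") *> ZIO.sleep(1.second).as(n)

  // Runs the three tasks in parallel; the results come back in input order.
  def program: Task[List[Int]] =
    ZIO.collectAllPar(List(task(1), task(2), task(3)))

  def run = program.flatMap(results => Console.printLine(results))
}
```

The order of the "started" lines can vary between runs, since the tasks run on separate fibers.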
Solution 2:[2]
Also note that there is the <&> combinator, which is an alias for zipPar. It yields a tuple, and if you use for comprehensions I would suggest having a look at better-monadic-for, which fixes the issues with tuples in for comprehensions.
Here's an example of using the <&> combinator with map:
(t1 <&> t2 <&> t3 <&> t4) map {
  case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4"
}
ZIO.collectAllPar and ZIO.collectAllParN only work when all the ZIO effects have the same return type, which is not what the question asked about.
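By contrast, <&> does handle effects with different result types. A minimal sketch, assuming ZIO 2.x, where <&> flattens chained results into a single tuple so an ordinary tuple pattern is enough; the four effects and the object name are placeholders invented for this example.

```scala
import zio._

object ZipParExample extends ZIOAppDefault {
  // Hypothetical effects with four different result types.
  def name: Task[String]  = ZIO.attempt("zio")
  def count: Task[Int]    = ZIO.attempt(3)
  def ratio: Task[Double] = ZIO.attempt(0.5)
  def flag: Task[Boolean] = ZIO.attempt(true)

  // Assumes ZIO 2.x: chained <&> yields a flat 4-tuple rather than nested pairs.
  def combined: Task[(String, Int, Double, Boolean)] =
    name <&> count <&> ratio <&> flag

  def run = combined.flatMap { case (n, c, r, f) =>
    Console.printLine(s"$n, $c, $r, $f")
  }
}
```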
Solution 3:[3]
Adding to Krzysztof Atłasik's answer, there is also collectAllParN, which works like collectAllPar but lets you specify the maximum number of fibers to use:
val a = Task {
  println("t1 started")
  Thread.sleep(2000)
  println("t1 finished")
  1
}
val b = Task {
  println("t2 started")
  Thread.sleep(1000)
  println("t2 finished")
  2
}
val c = Task {
  println("t3 started")
  Thread.sleep(3000)
  println("t3 finished")
  3
}
val d = Task {
  println("t4 started")
  Thread.sleep(1000)
  println("t4 finished")
  4
}
And you can run it like this:
Task.collectAllParN(4)(List(a, b, c, d))
This is especially useful if you have many (hundreds or thousands of) parallel tasks, since bounding the number of fibers helps you avoid overflow and memory errors. Go ahead and change the number of fibers to 2 or 3 and see for yourself how execution changes.
Another option for parallel execution is to put tasks on a ZQueue and fork them once your consumer(s) receive them.
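The queue-based option could look roughly like the sketch below. It assumes ZIO 2.x (where the queue type is Queue) and a single consumer; the job helper and the object name are hypothetical and only stand in for real work.

```scala
import zio._

object QueueForkExample extends ZIOAppDefault {
  // Hypothetical unit of work: waits briefly, then returns n doubled.
  def job(n: Int): UIO[Int] = ZIO.sleep(200.millis).as(n * 2)

  def program: UIO[List[Int]] =
    for {
      queue   <- Queue.unbounded[UIO[Int]]
      _       <- queue.offerAll(List(job(1), job(2), job(3)))
      // Consumer: take each task off the queue and fork it right away.
      fibers  <- ZIO.foreach(List(1, 2, 3))(_ => queue.take.flatMap(_.fork))
      // Wait for all forked tasks and collect their results.
      results <- ZIO.foreach(fibers)(_.join)
    } yield results

  def run = program.flatMap(results => Console.printLine(results))
}
```

Because each task is forked as soon as it is taken, the three jobs overlap instead of running one after another.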
Solution 4:[4]
Since ZIO 2.x you can use foreachPar, whose parallelism can be controlled with withParallelism. A simple example might look something like this:
ZIO.foreachPar(urls)(download).withParallelism(8)
The withParallelismUnbounded method can be used when we want to run a parallel effect with an unbounded maximum number of fibers:
ZIO.foreachPar(urls)(download).withParallelismUnbounded
All parallelism operators ending in N, such as foreachParN and collectAllParN, have now been deprecated in 2.x.
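The snippets in this answer leave urls and download undefined. A self-contained sketch, assuming ZIO 2.x, could look like this; the URL list and the download stub are hypothetical placeholders and perform no real network access.

```scala
import zio._

object BoundedParallelism extends ZIOAppDefault {
  // Hypothetical inputs, for illustration only.
  val urls: List[String] =
    List("a", "b", "c", "d").map(p => s"https://example.com/$p")

  // Hypothetical stand-in for a download: waits, then returns the URL length.
  def download(url: String): Task[Int] =
    ZIO.sleep(500.millis) *> ZIO.attempt(url.length)

  // At most two downloads run at the same time; results keep the input order.
  def program: Task[List[Int]] =
    ZIO.foreachPar(urls)(download).withParallelism(2)

  def run = program.flatMap(sizes => Console.printLine(sizes))
}
```

Changing the argument of withParallelism, or swapping it for withParallelismUnbounded, changes how many downloads overlap without touching the rest of the program.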
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Krzysztof Atłasik |
Solution 2 | Fredrik Skogberg |
Solution 3 | |
Solution 4 | Oto Brglez |