'How to elegantly perform multiple effects in parallel with ZIO

I know that I can use

import zio.Task

def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
  a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }

def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
  zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }

to execute 3 or 4 tasks in parallel, but I wounder if there is a more elegant solution?



Solution 1:[1]

You could just use ZIO.collectAllPar with list of tasks:

def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)

Then you could use it like:

val t1 = Task.effect{
  Thread.sleep(100)
  println("t1 started")
  Thread.sleep(1000)
  1
}

val t2 = Task.effect{
  println("t2 started")
  Thread.sleep(1000)
  2
}


val t3 = Task.effect{
  println("t3 started")
  Thread.sleep(1000)
  3
}

(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))

and it will run all your tasks concurrently.

A generic solution using tuples instead of a list would be hard to achieve in Scala 2 without shapeless. It would change in Scala 3, because then they could be handled as heterogenous lists.

Solution 2:[2]

Also note that there is the <&> combinator. This is an alias for zipPar. This will yield a tuple, and if you use for comprehensions I would suggest to have a look at better-monadic-for which fixes the issues with tuples in for comprehensions

Here's an example of using the <&> combinator with map:

(t1 <&> t2 <&> t3 <&> t4) map { case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4" }

ZIO.collectAllPar and ZIO.collectAllParN only work when all the ZIO have the same return type. That was not the question.

Solution 3:[3]

Adding to Krzysztof At?asik's answer, there is also collectAllParN which works like collectAllPAr but lets you specify the maximum number of fibers to use:

 val a = Task {
      println("t1 started")
      Thread.sleep(2000)
      println("t1 finished")
      1
    }
    val b = Task {
      println("t2 started")
      Thread.sleep(1000)
      println("t2 finished")
      2
    }
    val c = Task {
      println("t3 started")
      Thread.sleep(3000)
      println("t3 finished")
      3
    }
    val d = Task {
      println("t4 started")
      Thread.sleep(1000)
      println("t4 finished")
      4
    }

And you can run it so:

 Task.collectAllParN(4)(List(a, b, c, d))

This is specially useful if you have many (hundreds or thousands) parallel tasks so you can avoid overflow and memory errors. Go ahead and change the number of fibers to use to 2 or 3 and see for yourself how execution changes.

Another option for parallel execution is to put tasks on a ZQueue and fork them once your consumer(s) receive them.

Solution 4:[4]

Since ZIO 2.x you can use foreachPar that has ability to control parallelism with withParallelism. A simple example might look something like this

ZIO.foreachPar(urls)(download).withParallelism(8)

The withParallelismUnbounded method can be used when we want to run a parallel effect with an unbounded maximum number of fibers:

ZIO.foreachPar(urls)(download).withParallelismUnbounded

All parallelism operators ending in N, such as foreachParN and collectAllParN, have now been deprecated in 2.x.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Krzysztof Atłasik
Solution 2 Fredrik Skogberg
Solution 3
Solution 4 Oto Brglez