'Parallel processing values emitted by flow in Kotlin

Kotlin code

runBlocking {
    flow {
        for (i in 0..4) {
            println("Emit $i")
            emit(i)
        }}  .onEach { if (it%2 == 0) delay(200) // Block 1
                println("A: got $it")
            }
            .onEach { println("B: got $it") } // Block 2
            .collect()
}

print in console:

Emit 0
A: got 0
B: got 0
Emit 1
A: got 1
B: got 1
Emit 2
...

How can I run parallel process both block1 & block2 to get messages from block 2 before block 1 in half cases?



Solution 1:[1]

You could try to launch separate coroutines in those blocks:

private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

flow {
    for (i in 0..4) {
        println("Emit $i")
        emit(i)
    }
}.onEach { // Block 1
    scope.launch {
        if (it % 2 == 0) delay(200) 
        println("A: got $it")
    }

}.onEach { // Block 2
    scope.launch { 
        println("B: got $it") 
    }
}.launchIn(scope)

In this case Block 2 will be printed before Block 1. SupervisorJob needs here to prevent cancelling launched coroutines when one of them fails.

This solution doesn't guarantee the order, e.g. there can be logs in the next order:

Emit 0
Emit 1
Emit 2
Emit 3
Emit 4

B: got 0
A: got 1
B: got 1
B: got 2
A: got 3
B: got 3
B: got 4
A: got 0
A: got 2
A: got 4

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1