'Parallel processing values emitted by flow in Kotlin
Kotlin code
runBlocking {
flow {
for (i in 0..4) {
println("Emit $i")
emit(i)
}} .onEach { if (it%2 == 0) delay(200) // Block 1
println("A: got $it")
}
.onEach { println("B: got $it") } // Block 2
.collect()
}
print in console:
Emit 0
A: got 0
B: got 0
Emit 1
A: got 1
B: got 1
Emit 2
...
How can I run parallel process both block1 & block2 to get messages from block 2 before block 1 in half cases?
Solution 1:[1]
You could try to launch separate coroutines in those blocks:
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
flow {
for (i in 0..4) {
println("Emit $i")
emit(i)
}
}.onEach { // Block 1
scope.launch {
if (it % 2 == 0) delay(200)
println("A: got $it")
}
}.onEach { // Block 2
scope.launch {
println("B: got $it")
}
}.launchIn(scope)
In this case Block 2
will be printed before Block 1
. SupervisorJob
needs here to prevent cancelling launched coroutines when one of them fails.
This solution doesn't guarantee the order, e.g. there can be logs in the next order:
Emit 0
Emit 1
Emit 2
Emit 3
Emit 4
B: got 0
A: got 1
B: got 1
B: got 2
A: got 3
B: got 3
B: got 4
A: got 0
A: got 2
A: got 4
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |