'Flow message not delivered in unit test

I have a consumer that reads messages off MutableSharedFlow (which acts as an EventBus in my application). I am trying to write a unit test to show that passing a message into the Flow triggers my Listener.

This is my Flow definition:

class MessageBus {

    private val _messages = MutableSharedFlow<Message>()
    val messages = _messages.asSharedFlow()

    suspend fun send(message: Message) {
        _messages.emit(message)
    }
}

Here is the Listener:

class Listener(private val messageBus: MessageBus) {

    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    init {
        scope.launch {
            messageBus.messages.collectLatest { message ->
                when (message) {
                    is CustomMessage -> handleCustomMessage(message)
                }
            }
        }
    }

And finally here is my unit test:

class CommandTest {
    @Test
    fun `should process CustomMessage`(): Unit = runBlocking {
        val messageBus = MessageBus()
        val listener = Listener(messageBus)
        messageBus.send(CustomMessage("test command"))
        //argumentCaptor...verify[removed for brevity]
    }
}

Unfortunately the above code does not trigger the break point in my Listener (breakpoint on line init is triggered, but a message is never received and no breakpoints triggered in the collectLatest block).

I even tried adding a Thread.sleep(5_000) before the verify statement but the result is the same. Am I missing something obvious with how coroutines work?

Edit: if it matters this is not an Android project. Simply Kotlin + Ktor



Solution 1:[1]

I imagine that since the code is in the init block in the Listener once you initialize val listener = Listener(messageBus, this) in the test it reads all messages and at this point you have none then in the next line you emit a message messageBus.send(CustomMessage("test command")) but your launch block should have finished by then. You can emit the message first or place your launch in an loop or in a different method that can be called after you emit the message

Solution 2:[2]

First of all I would recomend reading this article about how to test flows in Android.

Secondly in your example the issues arise from having the scope inside the Listener hardcoded. You should pass the scope as a parameter and inject it in the test:

class Listener(private val messageBus: MessageBus, private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()))

class CommandTest {
    @Test
    fun `should process CustomMessage`(): Unit = runBlockingTest {
        val messageBus = MessageBus()
        val listener = Listener(messageBus, this)
        messageBus.send(CustomMessage("test command"))
        //argumentCaptor...verify[removed for brevity]
    }
}

I would also recomend using runBlockingTest instead of runBlocking so your tests don't have to actually wait. It will also fail in case any coroutines are left running once the test finishes.

Solution 3:[3]

You could use something like this

class Emitter {
  private val emitter: MutableSharedFlow<String> = MutableSharedFlow()

  suspend fun publish(messages: Flow<String>) = messages.onEach {
      emitter.emit(it)
  }.collect()

  fun stream(): Flow<String> = emitter
}

the collect at the end of your onEach will be used to trigger the collection initially as a terminal operation... I need further understanding on emit because it does not work as I expect in all cases and when used in this way you have initially it does not post anything in your Flow unless you collect first to process

Then in your collector itself

class Collector {

suspend fun collect(emitter: Emitter): Unit = coroutineScope {
    println("Starting collection...")
    emitter.stream().collect { println("collecting message: $it") }
  }
}

then your main (or test)

fun main() = runBlocking {
  withContext(Dispatchers.Default + Job()) {
    val emitter = Emitter()
    val collector = Collector()

    launch {
        collector.collect(emitter)
    }

    emitter.publish(listOf("article#1", "article#2", "article#3", "article#4").asFlow())
  }
}

output:

Starting collection...
collecting message: article#1
collecting message: article#2
collecting message: article#3
collecting message: article#4

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Alain Plana
Solution 2 Eric Martori
Solution 3 Alain Plana