How does the Ktor WebSocket Flow API work?

I'm using Ktor for server-side development with WebSockets.

The documentation shows this example of using the incoming channel:

for (frame in incoming.mapNotNull { it as? Frame.Text }) {
    // some
}

But mapNotNull is marked as deprecated in favor of Flow. How should I use this API and what problems could be there? For example, a Flow is a cold stream, which means that the producer function is called on each collect. How does that work in the context of a WebSocket? Will it be reopened on second collect call, or maybe old messages will be delivered once after the next collect? How can I collect N messages, then stop collecting, then collect again?

Thanks in advance :)



Solution 1:[1]

How should I use this API and what problems could be there?

What I use, and what I have seen in one of the examples in the docs, is the consumeAsFlow() method called on a ReceiveChannel. Here is the entire snippet:

webSocket("/websocket") { //this: DefaultWebSocketServerSession
    incoming
        .consumeAsFlow()
        .map { receive(it) }
        .collect()
}

I haven't seen any major issues with this approach. One thing to be aware of (this applies to the non-flow approach as well) is that an exception thrown inside the flow propagates out of the webSocket block and breaks the WebSocket connection, which is usually not what you want. It might be worth wrapping the processing in a try-catch.
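
To illustrate the try-catch advice, here is a minimal sketch that catches failures per message, so one bad message does not end the whole flow. It runs without Ktor: a plain Channel of strings stands in for incoming, and handle and processAll are hypothetical names introduced only for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical message handler: fails on input that is not a number.
fun handle(text: String): Int = text.toInt()

// Handles every message; a failing message is skipped instead of ending the flow.
suspend fun processAll(incoming: ReceiveChannel<String>): List<Int> =
    incoming.consumeAsFlow()
        .mapNotNull { text ->
            try {
                handle(text)
            } catch (e: NumberFormatException) {
                null // log the bad message here and keep the session alive
            }
        }
        .toList() // a real session would typically end with collect() instead

fun main() = runBlocking {
    // Stand-in for `incoming`; a real session delivers Frame objects.
    val incoming = Channel<String>(Channel.UNLIMITED)
    listOf("1", "oops", "3").forEach { incoming.trySend(it) }
    incoming.close() // simulates the client disconnecting

    println(processAll(incoming)) // prints [1, 3]
}
```

Catching a specific exception type rather than Exception keeps CancellationException flowing, so the session can still be cancelled normally.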

Will it be reopened on second collect call, or maybe old messages will be delivered once after the next collect?

The WebSocket is opened before you even start consuming messages from the flow. Inside webSocket() {} you are in the context of a DefaultWebSocketServerSession, which is your connection management. Inside the flow you simply receive messages one by one as they arrive (after the connection has been established). If the connection breaks, the flow ends. The connection has to be re-established before you can process more messages, and that is done by the Route.webSocket() method, not by collecting again. I do recommend taking a look at its documentation comments.

If you wish to add some cleanup after the connection is closed, you can add a finally block like so:

webSocket("/chat") {
    try {
        incoming
            .consumeAsFlow()
            .map { receive(it, client) }
            .collect()
    } finally {
        // cleanup
    }
}

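In short: collect is called once per connection, and the operators in the flow (here, map) run once per received message. If there is no connection, the webSocket block never runs and collect is never called; if the connection breaks, the flow ends.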
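
The following sketch shows what "once" means here. It assumes a plain Channel of strings in place of incoming, and collectOnce and secondCollectFails are hypothetical helpers written for this example. The terminal operator runs a single time while the map lambda runs once per message, and a flow created with consumeAsFlow() rejects a second collection with an IllegalStateException.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects the flow once; returns the mapped messages and how often map ran.
suspend fun collectOnce(incoming: ReceiveChannel<String>): Pair<List<String>, Int> {
    var mapCalls = 0
    val messages = incoming.consumeAsFlow()
        .map { mapCalls++; "got $it" }
        .toList() // a single terminal call, just like collect()
    return messages to mapCalls
}

// Returns true if collecting the same consumeAsFlow() flow twice is rejected.
suspend fun secondCollectFails(incoming: ReceiveChannel<String>): Boolean {
    val flow = incoming.consumeAsFlow()
    flow.toList() // first collection drains the channel
    return try {
        flow.toList() // second collection of the same flow
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() = runBlocking {
    val first = Channel<String>(Channel.UNLIMITED)
    listOf("a", "b", "c").forEach { first.trySend(it) }
    first.close() // simulates the connection ending

    val (messages, mapCalls) = collectOnce(first)
    println(messages) // prints [got a, got b, got c]
    println(mapCalls) // prints 3

    val second = Channel<String>(Channel.UNLIMITED)
    second.close()
    println(secondCollectFails(second)) // prints true
}
```

So a second collect neither reopens the connection nor replays old messages.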

How can I collect N messages, then stop collecting, then collect again?

What is the use case for this? I don't think a flow is the right tool for it. You can of course take(n) items from a flow, but a flow created with consumeAsFlow() can be collected only once, so you won't be able to take any more from it afterwards.
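
If reading in batches is really needed, one possible approach is receiveAsFlow() from kotlinx.coroutines. Unlike consumeAsFlow(), it can be collected more than once and does not cancel the channel when a collection stops early. The sketch below assumes a plain Channel of strings in place of incoming, and nextBatch is a hypothetical helper, not part of Ktor.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the next n elements and leaves the channel open.
suspend fun <T> ReceiveChannel<T>.nextBatch(n: Int): List<T> =
    receiveAsFlow().take(n).toList()

fun main() = runBlocking {
    // Stand-in for `incoming`; a real session delivers Frame objects.
    val incoming = Channel<String>(Channel.UNLIMITED)
    listOf("a", "b", "c", "d", "e").forEach { incoming.trySend(it) }

    val first = incoming.nextBatch(2)  // stops collecting after two messages
    val second = incoming.nextBatch(2) // continues where the first batch stopped
    println(first)  // prints [a, b]
    println(second) // prints [c, d]
}
```

The helper applies take(n) directly to receiveAsFlow(). Putting a buffering operator such as buffer() or flowOn() in between could pull extra messages out of the channel before take(n) stops the collection, so those messages would be lost to the next batch.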

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 user3681304