'Type erasure in Swift Concurrency AsyncStream
Experimenting with swift concurrency, I would like to have a clean API for exposing an async sequence of a given element type and a throttled version of the same:
var intStream: AsyncStream<Int> {
AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
Task.detached {
for _ in 0..<100 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(Int.random(in: 1...10))
}
continuation.finish()
}
}
}
var throttledIntStream: AsyncStream<Int> {
intStream.throttle(for: .seconds(2))
}
But this does not work as throttle returns its own type:
Error:
Cannot convert return expression of type 'AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int>' to return type 'AsyncStream<Int>'
To get type erasure I could do
var throttledIntStream: some AsyncSequence {
intStream.debounce(for: Duration.seconds(2))
}
but then I lose the element type information as well, which I would like to keep.
Any suggestions how to best solve that?
Edit: This is pointing to the solution I want, but I guess I will need to wait https://forums.swift.org/t/anyasyncsequence/50828/2
Solution 1:[1]
What problem to use
var throttledIntStream: AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int>{
intStream.throttle(for: .seconds(2))
}
Where did you get AsyncThrottleSequence
?
Anyway you can extend AsyncStream with something like:
extension AsyncStream {
typealias throttled = AsyncThrottleSequence<AsyncStream<Element>, ContinuousClock, Element>
}
And use it as:
var throttledIntStream: AsyncStream<Int>.throttled {
intStream.throttle(for: .seconds(2))
}
Solution 2:[2]
You could define an extension on AsyncSequence
that wraps the underlying sequence in an AsyncStream
/AsyncThrowingStream
:
extension AsyncSequence {
func toAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
var asyncIterator = self.makeAsyncIterator()
return AsyncThrowingStream<Element, Error> {
try await asyncIterator.next()
}
}
func toAsyncStream() -> AsyncStream<Element> {
var asyncIterator = self.makeAsyncIterator()
return AsyncStream<Element> {
try! await asyncIterator.next()
}
}
}
Which you can use like this:
var intStream: AsyncStream<Int> {
AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
Task.detached {
for _ in 0..<100 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(Int.random(in: 1...10))
}
continuation.finish()
}
}
}
var throttledIntStream: AsyncStream<Int> {
intStream.throttle(for: .seconds(2)).toAsyncStream()
}
The big drawback of this approach is that to my knowledge there is no way of knowing if the underlying async sequence is a throwing or non-throwing one, thus one cannot implement a solution generic on the failure type.
This means that it is entirely the programmer's responsibility to chose the suitable adaptor toAsyncStream
or toAsyncThrowingStream
.
As you mentioned in your question, your problem will probably be solved when better opaque return types will land.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Cy-4AH |
Solution 2 | Louis Lac |