Type erasure in Swift Concurrency AsyncStream

While experimenting with Swift concurrency, I would like a clean API that exposes an async sequence of a given element type together with a throttled version of it:

  var intStream: AsyncStream<Int> {
    AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
      Task.detached {
        for _ in 0..<100 {
          try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
          continuation.yield(Int.random(in: 1...10))
        }
        continuation.finish()
      }
    }
  }
  
  var throttledIntStream: AsyncStream<Int> {
    intStream.throttle(for: .seconds(2))
  }

But this does not compile, because throttle returns its own type:

Error: Cannot convert return expression of type 'AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int>' to return type 'AsyncStream<Int>'

To get type erasure I could do

var throttledIntStream: some AsyncSequence {
  intStream.debounce(for: Duration.seconds(2))
}

but then I lose the element type information as well, which I would like to keep.

Any suggestions on how best to solve that?

Edit: This thread points to the solution I want, but I guess I will need to wait for it: https://forums.swift.org/t/anyasyncsequence/50828/2



Solution 1:[1]

What is the problem with using the concrete type directly?

var throttledIntStream: AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int> {
  intStream.throttle(for: .seconds(2))
}

Where did you get AsyncThrottleSequence from? Anyway, you can extend AsyncStream with something like:

extension AsyncStream {
  typealias throttled = AsyncThrottleSequence<AsyncStream<Element>, ContinuousClock, Element>
}

And use it as:

var throttledIntStream: AsyncStream<Int>.throttled {
  intStream.throttle(for: .seconds(2))
}

Solution 2:[2]

You could define an extension on AsyncSequence that wraps the underlying sequence in an AsyncStream/AsyncThrowingStream:

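extension AsyncSequence {
    // Use this adaptor when the underlying sequence can throw.
    func toAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
        var asyncIterator = self.makeAsyncIterator()
        
        // Unfolding initializer: each pull from the stream pulls one element from the iterator.
        return AsyncThrowingStream<Element, Error> {
            try await asyncIterator.next()
        }
    }
    
    // Use this adaptor only when the underlying sequence never throws:
    // `try!` traps at runtime if it does.
    func toAsyncStream() -> AsyncStream<Element> {
        var asyncIterator = self.makeAsyncIterator()
        return AsyncStream<Element> {
            try! await asyncIterator.next()
        }
    }
}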

Which you can use like this:

  var intStream: AsyncStream<Int> {
    AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
      Task.detached {
        for _ in 0..<100 {
          try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
          continuation.yield(Int.random(in: 1...10))
        }
        continuation.finish()
      }
    }
  }
  
  var throttledIntStream: AsyncStream<Int> {
    intStream.throttle(for: .seconds(2)).toAsyncStream()
  }

The big drawback of this approach is that, to my knowledge, there is no way of knowing whether the underlying async sequence is throwing or non-throwing, so one cannot implement a solution that is generic over the failure type.

This means that it is entirely the programmer's responsibility to choose the suitable adaptor, toAsyncStream or toAsyncThrowingStream. Picking toAsyncStream for a sequence that does throw crashes at runtime because of the try!.
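If the crash risk of try! is a concern, one possible alternative is to drive the underlying sequence from a Task and simply finish the stream when it throws. The following is only a sketch under stated assumptions: eraseToStream and DemoError are hypothetical names that appear neither in the question nor in any library, the Sendable constraints are there to keep stricter concurrency checking happy, and errors are silently dropped, which may or may not be acceptable for your use case.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Hypothetical helper (not part of the standard library):
    /// erases `Self` to `AsyncStream<Element>` without using `try!`.
    func eraseToStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            // Drive the underlying sequence from its own task.
            let task = Task {
                do {
                    for try await element in self {
                        continuation.yield(element)
                    }
                } catch {
                    // Assumption: an error is dropped and just ends the stream.
                }
                continuation.finish()
            }
            // Stop pulling from the underlying sequence once the consumer goes away.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}

// Hypothetical error type, used only for the demonstration below.
struct DemoError: Error {}

// Collects everything the erased stream delivers before the failure.
func demo() async -> [Int] {
    let failing = AsyncThrowingStream<Int, Error> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish(throwing: DemoError())
    }
    var collected: [Int] = []
    for await value in failing.eraseToStream() {
        collected.append(value)
    }
    return collected
}
```

Note the trade-off: the task pulls elements eagerly and the stream uses the default unbounded buffering policy, so the back pressure that the unfolding-based adaptors preserve is lost here.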

As you mentioned in your question, your problem will probably be solved once better opaque return types land.
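For what it is worth, my understanding is that Swift 6 (SE-0421) gives AsyncSequence primary associated types for Element and Failure, which allows an opaque return type that keeps the element type. The sketch below assumes a Swift 6 toolchain and, on Apple platforms, the OS versions named in the availability attribute; evenValues and collectEvens are hypothetical names. It uses the standard library's filter rather than throttle, because whether the throttle sequence from swift-async-algorithms satisfies a Never failure constraint depends on the package version, which I have not verified.

```swift
// Assumption: requires Swift 6 and, on Apple platforms, the OS versions below.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func evenValues(of source: AsyncStream<Int>) -> some AsyncSequence<Int, Never> {
    // The concrete type (AsyncFilterSequence<AsyncStream<Int>>) stays hidden,
    // but callers still see Int elements and a non-throwing sequence.
    source.filter { $0.isMultiple(of: 2) }
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func collectEvens() async -> [Int] {
    let source = AsyncStream<Int> { continuation in
        for i in 1...6 { continuation.yield(i) }
        continuation.finish()
    }
    var result: [Int] = []
    // No `try` is needed because the failure type is Never.
    for await value in evenValues(of: source) {
        result.append(value)
    }
    return result
}
```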

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Cy-4AH
Solution 2 Louis Lac