'How to have a publisher emit only to the last subscriber in Combine

Is there a way to have the publisher emit a value only to the latest subscriber/observer?

An example for that would be; a manager class that can be subscribed to by multiple observers. When an event occurs, I would like only the latest subscriber to be observed. As far as I know, there is no way for the publisher to keep track of its subscribers but my knowledge regarding Combine and reactive programming is limited so I am unsure if this is possible in the first place.



Solution 1:[1]

You are right. Unfortunately, there is no way to list/track subscribers of a publisher. To solve your problem, you have to implement a custom publisher. There are two possibilities here. Either you implement a custom publisher with the Publisher protocol, but Apple advises against this (see here), or you create a custom publisher with already existing types, as Apple recommends. I have prepared an example for the second option.

The logic is very simple. We create a publisher with a PassthroughSubject inside (it can also be a CurrentValueSubject). Then we implement the methods typical of a PassthroughSubject and use them to overwrite the same methods of the PassthroughSubject, which is inside our class. In the sink method we store all returning subscriptions BUT before we add a new subscription to the Set, we go through all the already cached subscriptions and cancel them. This way we achieve the goal that only the last subscription works.


// The subscriptions will be cached in the publisher.
// To avoid strong references, I use the WeakBox recommendation from the Swift forum.
struct WeakBox<T: AnyObject & Hashable>: Hashable {
    weak var item: T?

    func hash(into hasher: inout Hasher) {
        hasher.combine(item)
    }
}

class MyPublisher<T, E: Error> {
    private let subject = PassthroughSubject<T, E>()
    private var subscriptions = Set<WeakBox<AnyCancellable>>()
    
    deinit {
        subscriptions.removeAll()
    }
    
    public func send(_ input: T) {
        subject.send(input)
    }
    
    public func send(completion: Subscribers.Completion<E>) {
        subject.send(completion: completion)
    }
    
    public func sink(receiveCompletion receivedCompletion: @escaping (Subscribers.Completion<E>) -> Void, receiveValue receivedValue: @escaping (T) -> Void) -> AnyCancellable {
        let subscription = subject
            .sink(receiveCompletion: { completion in
                receivedCompletion(completion)
            }, receiveValue: { value in
                receivedValue(value)
            })
        
        // Cancel previous subscriptions.
        subscriptions.forEach { $0.item?.cancel() }
        
        // Add new subscription.
        subscriptions.insert(WeakBox(item: subscription))
        
        return subscription
    }
}

I tested the class in Playground as follows.

let publisher = MyPublisher<Int, Never>()

let firstSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("1st subscription completion \(completion)")
    }, receiveValue: { value in
        print("1st subscription value \(value)")
    })

let secondSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("2st subscription completion \(completion)")
    }, receiveValue: { value in
        print("2st subscription value \(value)")
    })

let thirdSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("3st subscription completion \(completion)")
    }, receiveValue: { value in
        print("3st subscription value \(value)")
    })

publisher.send(123)

Console output:

3st subscription value 123

If you comment out the line subscriptions.forEach { $0.cancel() }, then you get:

3st subscription value 123
1st subscription value 123
2st subscription value 123

Hopefully I could help you.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1