'Swift 5.5 Concurrency: how to serialize async Tasks to replace an OperationQueue with maxConcurrentOperationCount = 1?

I’m currently migrating my app to use the concurrency model in Swift. I want to serialize Tasks to make sure they are executed one after the other (no paralellism). In my use case, I want to listen to notifications posted by the NotificationCenter and execute a Task every time a new notification is posted. But I want to make sure no previous task is running. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.

For example, I’m using CloudKit with Core Data in my app and I use persistent history tracking to determine what changes have occurred in the store. In this Synchronizing a Local Store to the Cloud Sample Code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has a maximum number of operations set to 1.

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

When a Core Data notification is received, a new task is added to this serial operation queue. So if many notifications are received, they will all be performed one after the other one in a serial way.

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

In this Loading and Displaying a Large Data Feed Sample Code, Apple uses Tasks to handle history changes (in QuakesProvider).

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

I feel something is wrong in the second project as Tasks could happen in any order, and not necessarily in a serial order (contrary to the first project where the OperationQueue as a maxConcurrentOperationCount = 1).

Should we use an actor somewhere to make sure the methods are serially called?

I thought about an implementation like this but I’m not yet really comfortable with that:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

where the processRemoteStoreChange method would be called by when a new notification is received (AsyncSequence):

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}


Solution 1:[1]

If you want to get the behavior of an OperationQueue with a maxConcurrentOperationCount of 1 (a ”serial” operation queue), one can achieve that with an actor.

There are two patterns that you will see with a serial OperationQueue:

  1. The operations in the queue are, themselves, synchronous.

    If you are using the standard OperationQueue (i.e., you have not subclassed Operation that does manual KVO for isFinished, etc.), a simple actor achieves what we want. An actor will prevent concurrent execution.

    The key here, though, that this only works with synchronous methods (i.e., those methods that do not have await suspension points).

  2. The operations in the queue are asynchronous.

    One of the more advanced use-cases of operation queues is to handle dependencies between tasks that are, themselves, asynchronous. This is a more complicated scenario in operation queues, requiring a custom Operation subclass in which you manually handle the KVO of isFinished, etc. (See this answer for an example of that pattern.)

    The challenge in doing this with Swift concurrency is that actors are reentrant (see reentrancy discussion in SE-0306. If the actor’s method is asynchronous (i.e., with async-await) that introduces suspension points, i.e., where an await in one call will allow another async method to run on that actor.

    To achieve serial execution between separate async methods, you have to keep a reference to the prior Task and await that. But you have to be careful, where the scheduling method (add in my example, below) is synchronous.


Consider the following (which uses OS signposts so that I can graphically illustrate the behavior in Instruments):

import os.signpost

private let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {

    let example = Example()
    let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        startSynchronous()
    }

    @IBAction func didTapAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { await startSerializedAsynchronous() }
    }

    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    func startSerializedAsynchronous() async {
        await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
    }

    func synchronousExample(_ name: StaticString) {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
    }
}

With synchronous tasks (scenario 1), startSynchronous, is the simplest. Just call the synchronous method of the actor and you get serial execution.

With asynchronous tasks (scenario 2), startAsynchronous, if you have await suspension points, you lose sequential behaviors due to actor reentrancy.

But you can refine that asynchronous task pattern (scenario 3), by having an actor, SerialTasks in the above code, that keeps track of the previous task, awaiting it before starting the next task. A subtle point is that the add method is, itself, synchronous (although the closure it takes is asynchronous). This avoids subtle races if you add multiple tasks.

Running the above in Instruments, we can graphically see the execution, with ? signposts where tasks were initiated, and intervals showing when the tasks execute:

enter image description here

In short, if your actor is performing only synchronous tasks (which is your case), then the actor yields maxConcurrentOperationCount = 1 sort of behavior automatically. If the tasks are asynchronous, you simply need to await the prior tasks before starting the next one.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1