Hallo allemaal, Leo hier. Today we will discuss which thread runs my sink closure using Combine and Swift.

Last week we started to dive more into Apple’s Combine framework and a very important question was raised. When you use the reactive programming paradigm you probably will end with the pub-sub paradigm, implying that you will have an object that will publish some data and another object that will receive that data.

As you can imagine a lot of things are intrinsically reactive on mobile development. For example, when you open your app all the UI elements are waiting for user input to *react* to that action, or even when the user request something to the server the app is waiting for the completion of the request to *react* to the new data and show the info.

Those examples occur all along with mobile development and we should be aware of how important is to master reactive programming way. Today we explore the question in the title and see what we can discover in this wonderful land of threading and Combine in Swift.

Let’s code, but first…

 

Painting of The Day

This painting is a 1534 masterpiece called The Annunciation painted by Lorenzo Lotto.

Lorenzo Lotto (c. 1480 – 1556/57) was an Italian painter, draughtsman, and illustrator, traditionally placed in the Venetian school, though much of his career was spent in other North Italian cities. While he was active during the High Renaissance, his nervous and eccentric posings and distortions represent a transitional stage to the first Florentine and Roman Mannerists of the 16th century.

I chose this painting because you never know when an angel will publish a message, right?

 

The Problem – Which thread runs my sink closure?

We want to know on what thread my sink closure on Combine Framework will run

First of all, let’s set up the environment to run our tests. Create a new project and copy-paste the code below to your ViewController:

class ViewController: UIViewController {
    
    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red
        
        print("The SINKED in the thread : \(Thread.current)")
        
        viewModel
            .userPublisher
            .sink { user in
                print("The user is: \(user) on thread: \(Thread.current)")
            }.store(in: &cancellables)
        
        viewModel.fetchUser()
        
        print("Finished viewDidLoad")
    }
}

This example will have a lot of prints to show us what threads are being used. This code is getting the publisher from the view model and sinking whichever data it sends. But if this keeps listening to the publisher, on which thread does it sink after all?

Let’s build our LoginViewModel to start our tests:

struct LoginViewModel {
    
    private let userSubject = PassthroughSubject<User,Never>()
    
    var userPublisher: AnyPublisher<User,Never> {
        userSubject.eraseToAnyPublisher()
    }
    
    func fetchUser() {
        print("The SEND thread is: \(Thread.current)")
        userSubject.send(User(name: "Pepijn", age: 26))
    }
}

struct User {
    let name: String
    let age: Int
}

The result is:

Which thread runs my sink closure image result example 1

Well… Looks like if you don’t specify on which thread you will receive the sink, it uses the thread the sink was created as default. But is it?

What happens if the LoginViewModel sends the User model through a background queue? Let’s see:

DispatchQueue.global().async {
    print("The SEND thread: \(Thread.current)")
    userSubject.send(User(name: "Alan", age: 26))
}

You can check below the result:

Which thread runs my sink closure image result example 2

As you can see, if you send the User in a background queue the PassthroughSubject sends and the sink closure will run in that exact queue. In other words, if you have any UI thing occurring on that sink, you can be sure that you will have problems, my friend.

The first learning here is: By default, the sink closure will run on the same thread that the data is created.

Let’s solve that problem and continue to explore the Publisher.

 

Exploring the Receive(on::) function – Which thread runs my sink closure?

Apple has a solution for that problem, which is the receive function operator. This function specifies the Scheduler on which to receive elements from the publisher.

But wait, what is a Scheduler? TheScheduler is a protocol that defines when and how to execute a closure. It is sort of new because it’s only for iOS 13 or higher. You can create your Schedulers just by conforming to that protocol.

Another great news is that is pretty convenient that DispatchQueue conforms to the Schedule protocol, do you know where we are going with this right? This enables receiving things on the main thread and the background thread easily! Let’s try both.

The next example we’ll send from a background queue but now we will have control over which thread we will be sinking.

 

Receive on the main thread

class ViewController: UIViewController {
    
    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red
        
        print("The PUBLISHER thread: \(Thread.current)")
        
        viewModel
            .userPublisher
            .receive(on: DispatchQueue.main) // added this to the sink configuration
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)
        
        viewModel.fetchUser()
        
        print("Finished viewDidLoad")
    }
}

struct LoginViewModel {
    
    private let userSubject = PassthroughSubject<User,Never>()
    
    var userPublisher: AnyPublisher<User,Never> {
        userSubject.eraseToAnyPublisher()
    }
    
    func fetchUser() {
        DispatchQueue.global().async {
            print("The SEND thread: \(Thread.current)")
            userSubject.send(User(name: "Mike", age: 26))
        }
    }
}

Resulting in:

Which thread runs my sink closure image result example 3

Now we are receiving our data from the publisher in the main thread.

But we can do another way around, imagine that the LoginViewModel sends in the main thread but we want to receive in a background queue.

 

Receive on a background thread

class ViewController: UIViewController {
    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red
        
        print("The PUBLISHER thread: \(Thread.current)")
        
        viewModel
            .userPublisher
            .receive(on: DispatchQueue.global())
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)
        
        viewModel.fetchUser()
        
        print("Finished viewDidLoad")
    }
}


struct LoginViewModel {
    
    private let userSubject = PassthroughSubject<User,Never>()
    
    var userPublisher: AnyPublisher<User,Never> {
        userSubject.eraseToAnyPublisher()
    }
    
    func fetchUser() {
        
        print("The SEND thread: \(Thread.current)")
        userSubject.send(User(name: "Mike", age: 32))
    }
}

Resulting in:

Which thread runs my sink closure image result example 4

Now we learned! If the thread of the sink closure is relevant for the sinking process you should specify on which scheduler it will run.

A curiosity about the receive function and schedulers that you can play with it:

viewModel
    .userPublisher
    .map({ _ in
        print("The map thread: \(Thread.current)")
    })
    .receive(on: DispatchQueue.global())
    .map({ _ in
        print("The map thread: \(Thread.current)")
    })
    .receive(on: DispatchQueue.main)
    .sink { user in
        print("The SINK thread: \(Thread.current) with user: \(user)")
    }.store(in: &cancellables)

Generating this:

result of a lot of receive in combine framework example

This happens because all the operators after the receive function will run in the same Scheduler that was set in that function. In other words, the receive function defines on which Scheduler all the operators after it will run.

This way our little play is a good example where: we are sending in the main thread, then the first map receives it in the main thread, after that the second map receives in a background thread and we finally sink it in the main thread again. Crazy right?

 

Testing the Subscribe function with Custom Publishers

Furthermore, we also have control over how we subscribe to publishers using the `subscribe(on::)` function. In our example, that power is not relevant, but imagine that you create a custom publisher that the subscription process is somewhat slow:

struct LongTaskPublisher: Publisher {
    typealias Output = User
    typealias Failure = Never
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(executeForLongTime())
        subscriber.receive(completion: .finished)
    }
    
    private func executeForLongTime() -> User {
        for _ in 0...10000000 {}
        return User(name: "Ana", age: 27)
    }
}

Let’s test this in the viewDidLoad and see what happens:

class ViewController: UIViewController {
    
    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red
        
        print("The PUBLISHER thread: \(Thread.current)")
        
        viewModel
            .longTaskPublisher
            .sink { user in
                print("The SINK thread: \(Thread.current)")
            }.store(in: &cancellables)
        
        print("Finished viewDidLoad")
    }
} 

If you are following through, you will check that the UI takes a few seconds to appear all red.

Now you can think: “This is easy! I just learned that we can receive the values on the background thread, let’s do this.”

And you add the receive(on: DispatchQueue.global()) to that pipeline… but it still freezes the app. What is happening after all? The problem here is the subscription process. We can unfreeze our app just by setting the subscription thread as a background thread:

viewModel
    .longTaskPublisher
    .subscribe(on: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .sink { user in
        print("The SINK thread: \(Thread.current) with user: \(user)")
    }.store(in: &cancellables)

And now having our desired outcome:

final combine thread main example image

 

General Tips for Threading in Combine

Now let’s gather all the data we collected until now into some advice:

  • Be mindful of which thread you will run your sink closures. If you don’t define the default Scheduler will be used and it’s based on where the data is created.
  • Never rely on the default Scheduler to run your code if it changes UI.
  • A good pattern for your async code is to subscribe in the background thread and receive on the main thread.

 

Summary – Which thread runs my sink closure?

Today we explored how threading works in Apple’s Combine Framework and how you can avoid UI crashes and freezes. Threading is an amazing topic in iOS development that I’m intrigued by, and almost every day I learn new things about this topic. Now you know how main and other threads work with Combine and how to use them in your project to take maximum advantage of all framework features on multithreading.

That’s all my people, I hope you liked reading this article as much as I enjoyed writing it. If you want to support this blog you can Buy Me a Coffee or leave a comment saying hello. You can also sponsor posts and I’m open to freelance writing! You can reach me on LinkedIn or Twitter and send me an e-mail through the contact page.

Thanks for reading and… That’s all folks.

Credits:

title image