Leonardo Maia Pugliese
Holy Swift

Holy Swift

Swift and Combine: Which thread runs my sink closure?

Swift and Combine: Which thread runs my sink closure?

We never know when a message will come

Leonardo Maia Pugliese's photo
Leonardo Maia Pugliese
·Mar 9, 2022·

8 min read

Subscribe to my newsletter and never miss my upcoming articles

Table of contents

  • The Painting
  • The Problem
  • General Tips for Threading in Combine
  • Summary

Hallo allemaal, Leo hier.

Last week we started to dive more into Apple's Combine framework and a very important question was raised. When you use the reactive programming paradigm you probably will end with the pub-sub paradigm, implying that you will have an object that will publish some data and another object that will receive that data.

As you can imagine a lot of things are intrinsically reactive on mobile development. For example, when you open your app all the UI elements are waiting for user input to react to that action, or even when the user request something to the server the app is waiting for the completion of the request to react to the new data and show the info.

Those examples occur all along with mobile development and we should be aware of how important is to master the reactive programming way. Today we explore the question in the title and see what we can discover in this wonderful land of threading and Combine in Swift.

Let's code, but first...

The Painting

This painting is a 1534 masterpiece called The Annunciation painted by Lorenzo Lotto.

Lorenzo Lotto (c. 1480 – 1556/57) was an Italian painter, draughtsman, and illustrator, traditionally placed in the Venetian school, though much of his career was spent in other North Italian cities. While he was active during the High Renaissance, his nervous and eccentric posings and distortions represent a transitional stage to the first Florentine and Roman Mannerists of the 16th century.

I chose this painting because you never know when an angel will publish a message, right?

The Problem

We want to know on what thread my sink closure on Combine Framework will run

First of all, let's set up the environment to run our tests. Create a new project and copy-paste the code below to your ViewController:


class ViewController: UIViewController {

private let viewModel = LoginViewModel()
private var cancellables = Set<AnyCancellable>()

override func viewDidLoad() {
    super.viewDidLoad()
    view.backgroundColor = .red

    print("The SINKED in the thread : \(Thread.current)")

    viewModel
        .userPublisher
        .sink { user in
            print("The user is: \(user) on thread: \(Thread.current)")
        }.store(in: &cancellables)

    viewModel.fetchUser() 

    print("Finished viewDidLoad")
}

}


This example will have a lot of prints to show us what threads are being used. This code is getting the publisher from the view model and sinking whichever data it sends. But if this keeps listening to the publisher, on which thread does it sink after all?

Let's build our LoginViewModel to start our testings:


struct LoginViewModel {

private let userSubject = PassthroughSubject<User,Never>()

var userPublisher: AnyPublisher<User,Never> {
    userSubject.eraseToAnyPublisher()
}

func fetchUser() {
    print("The SEND thread is: \(Thread.current)")
    userSubject.send(User(name: "Pepijn", age: 26))
}

}

struct User { let name: String let age: Int }


The result is:

Screenshot 2022-03-07 at 09.02.30.png

Well... Looks like if you don't specify on which thread you will receive the sink, it uses the thread the sink was created as default. But is it?

What happens if the LoginViewModel sends the User model through a background queue? Let's see:

        DispatchQueue.global().async {
            print("The SEND thread: \(Thread.current)")
            userSubject.send(User(name: "Alan", age: 26))
        }

You can check below the result:

Screenshot 2022-03-08 at 07.31.09.png

As you can see, if you send the User in a background queue the PassthroughSubject sends and the sink closure will run in that exact queue. In other words, if you have any UI thing occurring on that sink, you can be sure that you will have problems, my friend.

The first learning here is: By default, the sink closure will run on the same thread that the data is created.

Let's solve that problem and continue to explore the Publisher.

Exploring the Receive(on::) function

Apple has a solution for that problem, is the receive function operator. This function specifies the Scheduler on which to receive elements from the publisher.

But wait, what is a Scheduler? TheScheduler is a protocol that defines when and how to execute a closure. It is sort of new because it's only for iOS 13 or higher. You can create your Schedulers just by conforming to that protocol.

Another great news is that is pretty convenient that DispatchQueue conforms to the Schedule protocol, do you know where we are going with this right? This enables receiving things on the main thread and the background thread easily! Let's try both:

The next example we'll send from a background queue but now we will have control on which thread we will be sinking:

Receive on the main thread:

class ViewController: UIViewController {

    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red

        print("The PUBLISHER thread: \(Thread.current)")

        viewModel
            .userPublisher
            .receive(on: DispatchQueue.main) // added this to the sink configuration
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)

        viewModel.fetchUser()

        print("Finished viewDidLoad")
    }
}

struct LoginViewModel {

    private let userSubject = PassthroughSubject<User,Never>()

    var userPublisher: AnyPublisher<User,Never> {
        userSubject.eraseToAnyPublisher()
    }

    func fetchUser() {
        DispatchQueue.global().async {
            print("The SEND thread: \(Thread.current)")
            userSubject.send(User(name: "Mike", age: 26))
        }
    }
}

Resulting in:

Screenshot 2022-03-08 at 07.49.04.png

Now we are receiving our data from the publisher in the main thread.

But we can do another way around, imagine that the LoginViewModel sends in the main thread but we want to receive in a background queue:

Receive on a background thread:

class ViewController: UIViewController {
    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red

        print("The PUBLISHER thread: \(Thread.current)")

        viewModel
            .userPublisher
            .receive(on: DispatchQueue.global())
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)

        viewModel.fetchUser()

        print("Finished viewDidLoad")
    }
}


struct LoginViewModel {

    private let userSubject = PassthroughSubject<User,Never>()

    var userPublisher: AnyPublisher<User,Never> {
        userSubject.eraseToAnyPublisher()
    }

    func fetchUser() {

        print("The SEND thread: \(Thread.current)")
        userSubject.send(User(name: "Mike", age: 32))
    }
}

Resulting in:

Screenshot 2022-03-08 at 07.55.12.png

Now we learned! If the thread of the sink closure is relevant for the sinking process you should specify on which scheduler it will run.

A curiosity about the receive function and schedulers is that you can play with it:

        viewModel
            .userPublisher
            .map({ _ in
                print("The map thread: \(Thread.current)")
            })
            .receive(on: DispatchQueue.global())
            .map({ _ in
                print("The map thread: \(Thread.current)")
            })
            .receive(on: DispatchQueue.main)
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)

Generating this:

Screenshot 2022-03-08 at 08.17.34.png

This happens because all the operators after the receive function will run in the same Scheduler that was set in that function. In other words, the receive function defines on which Scheduler all the operators after it will run.

This way our little play is a good example where: we are sending in the main thread, then the first map receives it in the main thread, after that the second map receives in a background thread and we finally sink it in the main thread again. Crazy right?

Testing the Subscribe function with Custom Publishers

Furthermore, we also have control over how we subscribe to publishers using the subscribe(on::) function. On our example, that power is not relevant, but imagine that you create a custom publisher that the subscription process is somewhat slow:

struct LongTaskPublisher: Publisher {
    typealias Output = User
    typealias Failure = Never

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(executeForLongTime())
        subscriber.receive(completion: .finished)
    }

    private func executeForLongTime() -> User {
        for _ in 0...10000000 {}
        return User(name: "Ana", age: 27)
    }
}

Let's test this in the viewDidLoad and see what happens:

class ViewController: UIViewController {

    private let viewModel = LoginViewModel()
    private var cancellables = Set<AnyCancellable>()

    override func viewDidLoad() {
        super.viewDidLoad()
        view.backgroundColor = .red

        print("The PUBLISHER thread: \(Thread.current)")

        viewModel
            .longTaskPublisher
            .sink { user in
                print("The SINK thread: \(Thread.current)")
            }.store(in: &cancellables)

        print("Finished viewDidLoad")
    }
}

If you are following through, you will check that the UI takes a few seconds to appear all red.

Now you can think: "This is easy! I just learned that we can receive the values on the background thread, let's do this."

And you add the receive(on: DispatchQueue.global()) to that pipeline... but it still freezes the app. What is happening after all? The problem here is the subscription process. We can unfreeze our app just by setting the subscription thread as a background thread:

        viewModel
            .longTaskPublisher
            .subscribe(on: DispatchQueue.global())
            .receive(on: DispatchQueue.main)
            .sink { user in
                print("The SINK thread: \(Thread.current) with user: \(user)")
            }.store(in: &cancellables)

And now having our desired outcome:

Screenshot 2022-03-08 at 08.33.38.png

General Tips for Threading in Combine

Now let's gather all the data we collected until now into some advice:

  • Be mindful of which thread you will run your sink closures. If you don't define the default Scheduler will be used and it's based on where the data is created.
  • Never rely on the default Scheduler to run your code if it changes UI.
  • A good pattern for your async code is to subscribe in the background thread and receive on the main thread.

Summary

Today we explored how threading works in Apple's Combine Framework and how you can avoid UI crashes and freezes. Threading is an amazing topic in iOS development that I'm intrigued by, and almost every day I learn new things about this topic.

That's all my people, I hope you liked it as I enjoyed writing this article. If you want to support this blog you can Buy Me a Coffee or just leave a comment saying hello. You can also sponsor posts and I'm open to writing freelancing! Just reach me in LinkedIn or Twitter for details.

Thanks for the reading and... That's all folks.

credits: image

Did you find this article valuable?

Support Leonardo Maia Pugliese by becoming a sponsor. Any amount is appreciated!

Learn more about Hashnode Sponsors
 
Share this