Understanding Combine’s publishers and subscribers

Published on: January 13, 2020

In my previous post about Combine, I introduced you to Functional Reactive Programming (FRP) and showed you how you can subscribe to Combine publishers using the sink(receiveCompletion:receiveValue:) method. I also showed you how you can transform the output of publishers using some of their built-in operators like map and collect. This week I want to focus more on how Combine publishers and subscribers work under the hood.

By the end of today's post, you should have a clear picture of what Combine's core components are, and how they relate to each other. The topics covered in today's post are the following:

  • Learning how publishers and subscribers are tied together
  • Creating a custom subscriber
  • Writing a custom publisher

Learning how publishers and subscribers are tied together

In last week's post I explained that publishers send values to their subscribers, and that a publisher can emit one or more values, but that they only emit a single completion (or error) event. This explanation was good enough for an introduction, but the reality is a little bit more nuanced.

A publisher/subscriber relationship in Combine is solidified in a third object, the subscription. When a subscriber is created and subscribes to a publisher, the publisher creates a subscription object and passes a reference to that subscription to the subscriber. The subscriber then requests a number of values from the subscription in order to begin receiving those values. This is done by calling request(_:) on the subscription. The number of values that a subscriber wants to receive is communicated using a Subscribers.Demand instance. A demand can be any of the following values:

  • Subscribers.Demand.none
  • Subscribers.Demand.unlimited
  • Subscribers.Demand.max(Int)

When you subscribe to a publisher by calling sink, the created subscriber calls request(_:) on its subscription with a value of Subscribers.Demand.unlimited because it wants to receive all values that are published, no matter how many. In most cases, this is exactly what you want, but sometimes a subscriber might want to limit the number of items it takes from a publisher. If you want to limit the number of items received from a publisher, you can call request(_:) with Subscribers.Demand.max(Int), where Int is replaced with the number of items that should be received. As new values come in, a subscriber can update the number of values it wants to receive from the publisher. Note that this number can only be incremented. This means that a subscriber that requests a demand of .unlimited will always receive all values from the publisher it's subscribed to. If you call request(_:) with a demand of .max(1) and subsequently call it with a demand of .max(10), the subscriber can receive up to 11 items in total, since demands are always additive.
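Here's a small sketch of additive demand. It assumes a synchronous publisher like (0...9).publisher, which sends values as soon as there is demand for them. RecordingSubscriber is a hypothetical name. It holds on to its subscription so that it can request more values after its initial request.

```swift
import Combine

// Records everything it receives so demand behavior can be inspected.
// RecordingSubscriber is a hypothetical name used only in this sketch.
final class RecordingSubscriber: Subscriber {
  typealias Input = Int
  typealias Failure = Never

  // Kept so that more demand can be requested later.
  private(set) var subscription: Subscription?
  private(set) var values: [Int] = []
  private(set) var didComplete = false

  func receive(subscription: Subscription) {
    self.subscription = subscription
    // Initial demand: a single value.
    subscription.request(.max(1))
  }

  func receive(_ input: Int) -> Subscribers.Demand {
    values.append(input)
    // Returning .none adds nothing to the outstanding demand.
    return .none
  }

  func receive(completion: Subscribers.Completion<Never>) {
    didComplete = true
  }
}

let recorder = RecordingSubscriber()
(0...9).publisher.subscribe(recorder)
// Only the first value (0) has been delivered at this point.

// Add two more to the demand; the subscription now sends 1 and 2.
recorder.subscription?.request(.max(2))
```

In total, three values arrive: one from the initial .max(1) and two from the later .max(2). Because seven values remain in the sequence, the completion event never arrives. The subscriber and the subscription retain each other here, which keeps both alive for the duration of this experiment.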

When a new value is generated for a publisher, it's the subscription's job to mediate between the publisher and the subscriber and to make sure that subscribers don't receive more values than they request. Subscriptions are also responsible for retaining and releasing subscribers. The following diagram visualizes the complex relationship between subscribers, publishers, and subscriptions:

[Diagram: The subscriber flow]

As you can see in the image above, the publisher doesn't really do an awful lot. It's mostly the subscription and the subscriber that interact with each other. In many cases, you will not need the publisher itself once a subscription is established, but this is not a given. The subscriber will ask the subscription for values, and the subscription will take care of sending these values to the subscriber. This does not mean that the subscription must do all of the work, that it must generate the values on its own, or that you can't use the publisher to generate values at all. You're free to implement your logic as you see fit as long as your implementation fits the contract that is established by the way objects relate to each other in Combine. For example, it's perfectly valid for a subscription to ask a publisher for the "next" value if that makes sense. It's equally valid for a subscription to execute a URLRequest and forward its result to a subscriber without consulting the publisher at all as long as it doesn't break the established contract.
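You can watch this conversation between subscriber and subscription with Combine's print(_:to:) operator. It logs every subscription, demand request, value, and completion event that passes through it. The sketch below sends that log to a small collector. LogCollector is a hypothetical helper and not part of Combine. The exact wording of the log lines may also vary between OS versions.

```swift
import Combine

// Collects whatever the print operator writes. It's a class so that the
// operator's copy of the stream still writes into this same instance.
final class LogCollector: TextOutputStream {
  private(set) var text = ""

  func write(_ string: String) {
    text += string
  }
}

let log = LogCollector()

// (1...3).publisher emits synchronously, so the whole exchange between
// the sink and its subscription happens before `sink` returns.
let cancellable = (1...3).publisher
  .print("trace", to: log)
  .sink { _ in }

print(log.text)
```

You should see the subscription arrive first, followed by the sink's unlimited demand request, the three values, and finally the completion event.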

To deepen your understanding of how publishers, subscriptions, and subscribers relate to each other, I will show you how you can implement your own version of a URLSession.DataTaskPublisher. I briefly showed how you can use DataTaskPublisher in an earlier post I wrote about supporting low data mode in your app. My purpose today is not to teach you how to properly do networking in Combine. If you want to learn more about networking in Combine, you can check out this post. My purpose today is merely to show you how you can create your own publisher, subscription and subscriber triad from scratch. We'll start by implementing a custom subscriber. After that, I will show you how to implement a custom publisher and subscription.

Creating a custom subscriber

In the previous section, I explained that subscribers request (or demand) a number of values from a subscription object. You also learned that a publisher receives a subscriber, creates a subscription and ties the subscription and subscriber together. When you implement your own subscriber, part of this process happens outside of the subscriber, but there is also some work you need to do in your custom subscriber. Before we implement our own subscriber, let's look at the protocol requirements of the Subscriber protocol:

public protocol Subscriber : CustomCombineIdentifierConvertible {

  associatedtype Input
  associatedtype Failure : Error

  func receive(subscription: Subscription)
  func receive(_ input: Self.Input) -> Subscribers.Demand
  func receive(completion: Subscribers.Completion<Self.Failure>)
}

The Subscriber protocol has two associated types, Input and Failure. The concrete values for these associated types must match those of the publisher that it wants to subscribe to. If a subscriber expects Int as its Input, it can't subscribe to a publisher that has String as its Output. The same is true for the Failure associated type.

The three different flavors of receive that are required by the protocol are each used at a different point in the subscription lifecycle. The first, receive(subscription:), is called when a publisher creates and assigns a subscription object to the subscriber. At that point, the subscriber communicates its initial demand to the subscription. I will demonstrate this in a moment.

Next, receive(_:) is called every time the subscription pushes a new value to the subscriber. The subscriber returns a Subscribers.Demand to the subscription to communicate how many additional items it wants to receive. As mentioned before, this demand is added to any demands that were sent to the subscription earlier. When the final value has been sent, or when an error occurs, the subscription calls receive(completion:) with either .finished or .failure(error). This method is only called once and concludes the stream of new values.

Before we implement a custom subscriber for the data task publisher that I mentioned at the end of the previous section, I want to show you a simple subscriber that takes a sequence of Int values:

import Combine

class IntSubscriber: Subscriber {
  typealias Input = Int
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription")
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)")
    return .none
  }

  func receive(completion: Subscribers.Completion<Never>) {
    print("Received completion: \(completion)")
  }
}

(0...6).publisher.subscribe(IntSubscriber())

The typealias declarations in the code above specify that this subscriber takes Int as its input. Because Never is its failure type, it expects that the publishers it subscribes to can never fail. In other words, this subscriber only works with publishers that emit Int values and no errors.

In the receive(subscription:) method, the subscriber receives a subscription and immediately requests a demand of .max(1). This means that this subscriber wants to receive a single value. The receive(_:) method receives a value and then returns .none. This means that we don't want to increase the subscriber's demand when we receive a new value. If we returned a demand of .max(1) here instead, we would increase the demand by one and receive the next value. The last method, receive(completion:), is called when the publisher finishes or fails.
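To see the effect of returning a demand from receive(_:), here's a variation of IntSubscriber that keeps returning .max(1) until it has collected a given number of values. PullingSubscriber and its limit parameter are hypothetical names used only in this sketch.

```swift
import Combine

// Pulls values one at a time by returning .max(1) from receive(_:)
// until it has collected `limit` values.
final class PullingSubscriber: Subscriber {
  typealias Input = Int
  typealias Failure = Never

  let limit: Int
  private(set) var values: [Int] = []
  private(set) var didComplete = false

  init(limit: Int) {
    self.limit = limit
  }

  func receive(subscription: Subscription) {
    // Start by asking for a single value, just like IntSubscriber.
    subscription.request(.max(1))
  }

  func receive(_ input: Int) -> Subscribers.Demand {
    values.append(input)
    // Each .max(1) is added to the outstanding demand, so the next value is sent.
    return values.count < limit ? .max(1) : .none
  }

  func receive(completion: Subscribers.Completion<Never>) {
    didComplete = true
  }
}

let threeValues = PullingSubscriber(limit: 3)
(0...6).publisher.subscribe(threeValues)

let allValues = PullingSubscriber(limit: 100)
(0...6).publisher.subscribe(allValues)
```

With a limit of 3, the subscriber stops after 0, 1, and 2 and never sees a completion event. With a limit that exceeds the length of the sequence, the demand never runs out, so the subscriber receives all seven values followed by .finished.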

If you put this code in a playground, you'll find that the output is the following:

Received subscription
Received input: 0

As expected, we only receive a single value here. What's interesting is that we never receive a completion event. By calling subscription.request(.max(1)), the subscriber explicitly communicates that it wants to receive a single value. No more, no less. Once the subscription has pushed that single value to the subscriber and the demand is not increased in response, the subscription stops sending values. The remaining values are never delivered, and because the sequence is never exhausted, the completion event never arrives either. If we want to receive all the values that are published in the preceding example, we need to update the receive(subscription:) method as follows:

func receive(subscription: Subscription) {
  print("Received subscription")
  subscription.request(.unlimited)
}

By requesting an unlimited number of values, all values are pushed from the subscription to the subscriber until the publisher sends a completion event. Makes sense, right? Let's look at the subscriber that's used in the next section when I show you how to build a custom version of a DataTaskPublisher:

// Skeleton for a subscriber, we'll finish it later in the article

class DecodableDataTaskSubscriber<Input: Decodable>: Subscriber {
  typealias Failure = Error

  func receive(subscription: Subscription) {
    print("Received subscription")
    subscription.request(.unlimited)
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received value: \(input)")
    return .none
  }

  func receive(completion: Subscribers.Completion<Error>) {
    print("Received completion \(completion)")
  }
}

This subscriber is very similar to the IntSubscriber. The main difference is that Input is defined as a generic parameter instead of through a typealias. The Failure type is Error instead of Never because network requests, and decoding their responses, can fail.

Note:
The subscriber in this code snippet is not yet in a properly usable state. I will show you how to finish and use this subscriber later. First, I want to show you how to create your own publisher and subscription objects.

Writing a custom publisher

Before we continue, I want to make sure you understand that writing a custom publisher is not something that you'll likely do in practice. It's also something that Apple does not recommend you do. In most cases, the default publishers and subjects that Combine exposes are more than good enough. In fact, I'm sure that the implementation I'm about to show you is not perfect, and that's okay. The purpose of the code in this section is to give you an idea of how Combine works under the hood by implementing a rudimentary publisher, not to write a clever, usable publisher that you might want to use in your projects.
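For comparison, here is roughly what you'd reach for in practice: Combine's built-in dataTaskPublisher(for:) composed with map and decode(type:decoder:). This is a sketch. The Post model and the postPublisher(for:) function are hypothetical names. The Just-based part only exercises the decoding step, so it runs without a network connection.

```swift
import Foundation
import Combine

// Hypothetical model used only for this sketch.
struct Post: Decodable, Equatable {
  let id: Int
  let title: String
}

// Fetch and decode using only publishers and operators that ship with Combine.
func postPublisher(for request: URLRequest) -> AnyPublisher<Post, Error> {
  URLSession.shared.dataTaskPublisher(for: request)
    .map(\.data)
    .decode(type: Post.self, decoder: JSONDecoder())
    .eraseToAnyPublisher()
}

// The decoding half can be tried without a network call by starting from Just.
let json = Data(#"{"id": 1, "title": "Hello"}"#.utf8)
var decoded: Post?
let cancellable = Just(json)
  .decode(type: Post.self, decoder: JSONDecoder())
  .sink(receiveCompletion: { _ in }, receiveValue: { decoded = $0 })
```

Because decode(type:decoder:) changes the failure type to Error, the composed publisher surfaces both URLError and decoding errors through the same failure channel. The custom publisher built in this section does the same thing.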

Like I mentioned before, in this post I will show you a custom version of a DataTaskPublisher. This custom publisher will automatically decode Data into a Decodable model. The first step in building a custom publisher is to define the publisher itself. Before we do this, let's look at the Publisher protocol:

public protocol Publisher {

  associatedtype Output
  associatedtype Failure : Error

  func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

The most interesting part of the Publisher protocol is the receive(subscriber:) method. This method constrains the generic subscriber S by requiring that S conforms to the Subscriber protocol, and it ensures that the Failure and Output of the Publisher match the Failure and Input of the Subscriber. This receive(subscriber:) method is called immediately when the subscribe(_:) method is called on the publisher. In the receive(subscriber:) method, you are expected to create a Subscription object and pass it to the subscriber.

The following code defines the custom data task publisher I wanted to show you:

import Foundation
import Combine

extension URLSession {
  struct DecodedDataTaskPublisher<Output: Decodable>: Publisher {
    typealias Failure = Error

    let urlRequest: URLRequest

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
      let subscription = DecodedDataTaskSubscription(urlRequest: urlRequest, subscriber: subscriber)
      subscriber.receive(subscription: subscription)
    }
  }

  func decodedDataTaskPublisher<Output: Decodable>(for urlRequest: URLRequest) -> DecodedDataTaskPublisher<Output> {
    return DecodedDataTaskPublisher<Output>(urlRequest: urlRequest)
  }
}

The preceding code defines a DecodedDataTaskPublisher<Output: Decodable> struct in an extension on URLSession. It also defines a convenience method that enables users of this custom publisher to create a DecodedDataTaskPublisher in a way that's similar to how a normal DataTaskPublisher is created. The DecodedDataTaskPublisher has a property that holds the URLRequest that should be executed. In the receive(subscriber:) method, an instance of DecodedDataTaskSubscription is created. The subscription object receives the URLRequest and the subscriber, and the subscription is passed to the subscriber's receive(subscription:) method that you saw in the previous section.

All that's left to complete our publisher, subscription and subscriber triad is to write the subscription object. Let's look at the Subscription protocol first:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
  func request(_ demand: Subscribers.Demand)
}

The Subscription protocol itself only requires a request(_:) method to be implemented. Because Subscription inherits from the Cancellable protocol, subscriptions are also required to implement a cancel() method that's used to free up any resources that the subscription holds on to, like the subscriber itself for example. The request(_:) method is used by the subscriber to communicate the number of values the subscriber wants to receive and when you write a custom subscription it's your responsibility to honor this demand correctly and to not send more values than requested by the subscriber.
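To see what honoring demand looks like in isolation, here's a minimal synchronous sketch of a publisher and subscription that replay a fixed array. ArrayPublisher and ArraySubscription are hypothetical names. The subscription keeps a running total of outstanding demand, sends only as many elements as have been requested, and adds whatever the subscriber returns from receive(_:) to that total. It is intentionally simple and is neither thread-safe nor safe against reentrant request(_:) calls.

```swift
import Combine

// A hypothetical publisher that replays a fixed array of elements.
struct ArrayPublisher<Element>: Publisher {
  typealias Output = Element
  typealias Failure = Never

  let elements: [Element]

  func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
    let subscription = ArraySubscription<Element, S>(elements: elements, subscriber: subscriber)
    subscriber.receive(subscription: subscription)
  }
}

final class ArraySubscription<Element, S: Subscriber>: Subscription
  where S.Input == Element, S.Failure == Never {

  private let elements: [Element]
  private var subscriber: S?
  private var index = 0
  // Demand that has been requested but not yet fulfilled.
  private var outstanding = Subscribers.Demand.none

  init(elements: [Element], subscriber: S) {
    self.elements = elements
    self.subscriber = subscriber
  }

  func request(_ demand: Subscribers.Demand) {
    // Demand is additive: add the new request to what's still outstanding.
    outstanding += demand

    // Only send while there is demand left and elements remain.
    while outstanding > 0, index < elements.count, let subscriber = subscriber {
      let element = elements[index]
      index += 1
      outstanding -= 1
      // Whatever the subscriber returns is added to the outstanding demand.
      outstanding += subscriber.receive(element)
    }

    // Completion doesn't consume demand; send it once every element went out.
    if index == elements.count, let subscriber = subscriber {
      self.subscriber = nil
      subscriber.receive(completion: .finished)
    }
  }

  func cancel() {
    // Dropping the subscriber stops any further delivery.
    subscriber = nil
  }
}

// Unlimited demand via sink: all elements, then .finished.
var all: [Int] = []
var allFinished = false
let cancellable = ArrayPublisher(elements: [1, 2, 3]).sink(
  receiveCompletion: { _ in allFinished = true },
  receiveValue: { all.append($0) }
)

// Limited demand: .max(2) yields two elements and no completion.
var limited: [Int] = []
var limitedFinished = false
ArrayPublisher(elements: [1, 2, 3]).subscribe(AnySubscriber<Int, Never>(
  receiveSubscription: { $0.request(.max(2)) },
  receiveValue: { limited.append($0); return .none },
  receiveCompletion: { _ in limitedFinished = true }
))
```

Checking outstanding > 0 before subtracting matters, because subtracting from an exhausted demand would trap.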

Now that you know a bit about the Subscription protocol, let's look at the implementation of the DecodedDataTaskSubscription object:

extension URLSession.DecodedDataTaskPublisher {
  // Output is the publisher's own generic parameter, so the subscription only
  // needs to be generic over the subscriber type instead of redeclaring Output.
  class DecodedDataTaskSubscription<S: Subscriber>: Subscription
    where S.Input == Output, S.Failure == Error {

    private let urlRequest: URLRequest
    private var subscriber: S?

    init(urlRequest: URLRequest, subscriber: S) {
      self.urlRequest = urlRequest
      self.subscriber = subscriber
    }

    func request(_ demand: Subscribers.Demand) {
      // A data task produces at most one value, so any positive demand starts it.
      if demand > 0 {
        URLSession.shared.dataTask(with: urlRequest) { [weak self] data, response, error in
          // Release the subscriber once the response has been handled.
          defer { self?.cancel() }

          if let data = data {
            do {
              let result = try JSONDecoder().decode(Output.self, from: data)
              // The returned demand is ignored; no further values will follow.
              _ = self?.subscriber?.receive(result)
              self?.subscriber?.receive(completion: .finished)
            } catch {
              self?.subscriber?.receive(completion: .failure(error))
            }
          } else if let error = error {
            self?.subscriber?.receive(completion: .failure(error))
          }
        }.resume()
      }
    }

    func cancel() {
      subscriber = nil
    }
  }
}

There's a lot of code in the preceding snippet. The most important part of this code is the request(_:) method. If this method is called with a demand that's larger than zero, a regular data task is created and kicked off. The completion handler for this data task uses the defer statement to call cancel on the subscription after the response from the data task is handled. We do this because once the data task is completed, the subscription will emit no further values because its job is done.

If the data task succeeded, and the callback receives data, an attempt is made to decode the data into the Output type. If this succeeds, the resulting object is passed to the subscriber so it can receive and handle the decoded response. Immediately after, we send a completion event to indicate that we finished successfully. If the decoding fails, or if we receive an error instead of data, we complete the subscription with .failure(error) so the subscriber can handle the error.
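Because DecodedDataTaskSubscription treats a decoding error exactly like a network error, it can help to look at that step on its own. The sketch below runs the same kind of JSONDecoder call against two made-up response bodies. Greeting and decodeGreeting(from:) are hypothetical names and not part of the post's code.

```swift
import Foundation

// Hypothetical model, standing in for whatever Output the publisher decodes.
struct Greeting: Decodable {
  let message: String
}

// Mirrors the decoding step in request(_:): the decoded value on success,
// or the error that would be forwarded to the subscriber as .failure(error).
func decodeGreeting(from data: Data) -> Result<Greeting, Error> {
  Result { try JSONDecoder().decode(Greeting.self, from: data) }
}

let htmlBody = Data("<!DOCTYPE html><html></html>".utf8)
let jsonBody = Data(#"{"message": "Hi"}"#.utf8)

let htmlResult = decodeGreeting(from: htmlBody)
let jsonResult = decodeGreeting(from: jsonBody)
```

An HTML body isn't valid JSON, so the subscriber would receive .failure(error) instead of a value.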

Before we attempt to use our custom subscriber, publisher and subscription together, let's try to use just the publisher and subscription first. We can do this by subscribing to the custom publisher using sink:

struct SomeModel: Decodable {}
var cancellable: AnyCancellable?

func makeTheRequest() {
  let request = URLRequest(url: URL(string: "https://www.donnywals.com")!)
  let publisher: URLSession.DecodedDataTaskPublisher<SomeModel> = URLSession.shared.decodedDataTaskPublisher(for: request)
  cancellable = publisher.sink(receiveCompletion: { completion in
    print("Received completion: \(completion)")
  }, receiveValue: { value in
    print("Received value: \(value)")
  })
}

makeTheRequest()

In the preceding example, I wrapped the code to make a request using the custom publisher in a function to simulate real-world usage. If you remove cancellable = from the code above, you will notice that the AnyCancellable returned by sink is not retained, and the completion and value closures are never called. If you run the above code as-is, the completion closure is called with a failure event. This is expected because the endpoint https://www.donnywals.com does not return a valid JSON response. While it's pretty neat that our custom publisher and subscription seem to work as intended, let's try to use it with our custom subscriber instead of sink:

struct SomeModel: Decodable {}
var cancellable: AnyCancellable?

func makeTheRequest() {
  let request = URLRequest(url: URL(string: "https://www.donnywals.com")!)
  let publisher: URLSession.DecodedDataTaskPublisher<SomeModel> = URLSession.shared.decodedDataTaskPublisher(for: request)
  let subscriber = DecodableDataTaskSubscriber<SomeModel>()
  publisher.subscribe(subscriber)
}

makeTheRequest()

If you run the code above, you'll find that only the Received subscription statement from the custom subscriber is printed. It appears that the subscriber is deallocated before it receives any events from the subscription. To confirm this, add the following code to DecodableDataTaskSubscriber:

deinit {
  Swift.print("deinit subscriber")
}

This will show you when the subscriber is deallocated, and if you run the code again you'll see that deinit subscriber is printed immediately after Received subscription. The reason for this is simple and complex at the same time.

When you call subscribe(_:) on a publisher, it creates a subscription and passes this subscription to the subscriber. Our DecodedDataTaskPublisher does not retain the subscription it creates, which means that the subscription will be deallocated once the publisher's subscribe(_:) method finishes, unless another object retains the subscription.

When the subscription is initialized, it receives the subscriber object that it will push values to, and the subscription stores this subscriber object for future reference. The subscriber itself only accesses the subscription once, when its receive(subscription:) method is called. In our current implementation, it does not retain the subscription at all. And since the subscription itself is also not held on to outside of makeTheRequest, both the subscription and the subscriber are deallocated by the time makeTheRequest exits.

Based on the description above, we need an object to hold on to our subscription to prevent it from being deallocated. Preferably, this object is Cancellable so it can be used to cancel the subscription if needed. The following excerpt from Apple's documentation on Subscriber suggests that the subscriber itself is a good candidate for retaining the subscription and canceling it later:

You connect a subscriber to a publisher by calling the publisher’s subscribe(_:) method. After making this call, the publisher invokes the subscriber’s receive(subscription:) method. This gives the subscriber a Subscription instance, which it uses to demand elements from the publisher, and to optionally cancel the subscription.

Update the DecodableDataTaskSubscriber so its implementation looks as follows:

class DecodableDataTaskSubscriber<Input: Decodable>: Subscriber, Cancellable {
  typealias Failure = Error

  var subscription: Subscription?

  func receive(subscription: Subscription) {
    print("Received subscription")
    self.subscription = subscription
    subscription.request(.unlimited)
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received value: \(input)")
    return .none
  }

  func receive(completion: Subscribers.Completion<Error>) {
    print("Received completion \(completion)")
    cancel()
  }

  func cancel() {
    subscription?.cancel()
    subscription = nil
  }
}

By conforming DecodableDataTaskSubscriber to Cancellable and holding on to the subscription it receives in receive(subscription:), we create a retain cycle that keeps both objects alive: the subscriber holds on to its subscription, and the subscription holds on to the subscriber. By canceling the subscription and setting it to nil when the subscriber's cancel() method is called, we make sure to break the cycle when needed. Because receive(completion:) calls cancel(), the cycle is broken as soon as the request completes. Surprisingly enough, I have found that Subscribers.Sink seems to work in the same way. The major difference is that when you use sink to subscribe to a publisher, the Subscribers.Sink is immediately wrapped in an AnyCancellable instance that, when deallocated, calls cancel() on the cancellable object it wraps, which immediately breaks any retain cycles that may have existed.

With the code above, you have completed our triad of a custom publisher, subscription and subscriber. That's a huge achievement! Even though you won't be building all of these components yourself in most cases, it's good to know what happens on the inside and I hope you now know a lot more about Combine and what it does behind the curtain.
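You can reproduce the AnyCancellable behavior described earlier for Subscribers.Sink by wrapping your own Cancellable subscriber in AnyCancellable. This sketch uses a PassthroughSubject instead of a network request so it runs synchronously. RetainingSubscriber is a hypothetical Int-based stand-in for DecodableDataTaskSubscriber.

```swift
import Combine

// Same shape as DecodableDataTaskSubscriber: retains its subscription and
// breaks the cycle in cancel(). Int-based so it runs without networking.
final class RetainingSubscriber: Subscriber, Cancellable {
  typealias Input = Int
  typealias Failure = Never

  private var subscription: Subscription?
  private(set) var values: [Int] = []

  func receive(subscription: Subscription) {
    self.subscription = subscription
    subscription.request(.unlimited)
  }

  func receive(_ input: Int) -> Subscribers.Demand {
    values.append(input)
    return .none
  }

  func receive(completion: Subscribers.Completion<Never>) {
    cancel()
  }

  func cancel() {
    subscription?.cancel()
    subscription = nil
  }
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = RetainingSubscriber()
subject.subscribe(subscriber)

// Wrap the subscriber the same way sink wraps Subscribers.Sink.
var cancellable: AnyCancellable? = AnyCancellable(subscriber)

subject.send(1)
subject.send(2)

// Releasing the AnyCancellable calls cancel() on the wrapped subscriber,
// which cancels the subscription, so later values are no longer delivered.
cancellable = nil
subject.send(3)
```

After the AnyCancellable is released, the subscriber has received only 1 and 2. This is the same cleanup that sink gives you for free.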

In summary

Today's post is a long one, but it's also an important one. I hope that I was able to show you how publishers, subscribers, and subscriptions work together in the Combine framework. All publishers that you use in Combine are built by composing the same building blocks in one way or the other. What's important is that you understand what these building blocks are, and what role they fulfill.

To summarize, when a subscriber wants to subscribe to a publisher, the publisher creates a subscription. The subscriber is passed to this subscription. The subscription is then passed to the subscriber so the subscriber can ask for an initial number of items. The subscription will then push values to the subscriber, and the subscriber can adjust its demand every time a value is received. If the subscriber asks for a number of items that is greater than or equal to the number of items pushed by the subscription, it will eventually receive a completion or error event. If the subscriber demands fewer values than the subscription would push, the subscriber will not receive a completion event.

Since this post is quite long and complex, I would recommend that you let this sink in for a moment. If you're confused or overwhelmed, that's okay. Come back to this post again at a later time. And most importantly, try to follow along with the code presented in this post. Put it in a Playground and experiment with it. It will eventually make sense.

If you have any questions for me, or if you have feedback, make sure to let me know on Twitter.
