Recursively execute a paginated network call with Combine

Published on: June 15, 2020

Last week, my attention was caught by a question that Dennis Parussini asked on Twitter. Dennis wanted to recursively make calls to a paginated API to load all pages of data before rendering UI. Since I love Combine and interesting problems I immediately started thinking about ways to achieve this using a nice, clean API. And then I realized that this is a non-trivial task that was worth exploring.

In this week's post, I would like to share my thought process and solution with you, hoping you'll learn something new about Combine in the process.

Understanding the problem and setting a goal

Whenever I get to work on a problem like this, I always start by writing the code I would like to write when using an API or abstraction that I've written. In this case, I would like to be able to write something like the following code to fetch all pages from the paginated endpoint:

networking.loadPages()
  .sink(receiveCompletion: { _ in
    // handle errors
  }, receiveValue: { items in
    print(items)
  })
  .store(in: &cancellables)

In this case, it doesn't matter to me what networking is, or what kind of object owns it. In other words, I don't care about the architecture this would be used in. All I really care about is that I have an object that implements loadPages(), and that loadPages() returns a publisher that emits the data for all pages at once. I don't want to receive all intermediate pages in my sink. The publisher should complete immediately after delivering my complete data set.

The tricky bit here is that this means that in loadPages() we'll need to somehow create a publisher that collects the responses from several network calls, bundles them into one big result, and outputs them to the created publisher.

Since I didn't have access to an API that would give me paginated responses I decided that a very naive abstraction would be sufficient. The abstraction uses a Response object that looks as follows:

struct Response {
  var hasMorePages = true
  var items = [Item(), Item()]
}

struct Item {}

My loader should keep making more requests until it receives a Response that has its hasMorePages set to false. At that point, the chain is considered complete and the publisher created in loadPages() should emit all fetched values and complete.

The starting point for my experimentation would look like this:

class RecursiveLoader {
  var requestsMade = 0
  var cancellables = Set<AnyCancellable>()

  init() { }

  private func loadPage() -> AnyPublisher<Response, Never> {
    // this would be the individual network call
    Future { promise in
      DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
        self.requestsMade += 1
        if self.requestsMade < 5 {
          return promise(.success(Response()))
        } else {
          return promise(.success(Response(hasMorePages: false)))
        }
      }
    }.eraseToAnyPublisher()
  }
}

This setup is fairly simple. I have a loadPage() function that will load an individual page depending on the number of requests I have already made. In the real implementation, this would be replaced by a network call but for my purposes, this would do. What matters is that I have a publisher that emits a Response object that I can use to determine whether I need to load another page or not.

So now that I knew what I wanted to write and had set up some scaffolding it was time to write the solution.

Finding an appropriate solution

Attempt one: a simplified version

My initial thought was to use Combine's reduce operator on some kind of publisher that would emit responses or arrays of items as they came in. When you apply reduce to a publisher in Combine you can accumulate all emitted values into one new value that's emitted when the upstream publisher completes. This sounds perfect for my purposes so I started my experimentation with that as a base thought. However, instead of making loadPage() return a publisher I wanted to simplify everything a little bit. To load all pages I would create an instance of RecursiveLoader, subscribe to a publisher that I'd define as a property on RecursiveLoader and tell it to begin loading:

let loader = RecursiveLoader()

loader.finishedPublisher
  .sink(receiveValue: { items in
    print("items \(items.count)")
  })
  .store(in: &cancellables)

loader.initiateLoadSequence()

While it's not at all what I wanted to write, this was a basic idea that I considered to be approachable enough to start with.

Whenever I'm solving complicated problems or experimenting with new ideas, I tend to get something working first before I go back to my initial design to see how I can adapt the prototype to match it. By doing this I make sure that I keep typing and trying things rather than fighting the system to immediately get the implementation I wanted.

With the simplified end goal in place, I started writing some code. First, I needed to define the finishedPublisher and a skeleton for initiateLoadSequence():

class RecursiveLoader {
  var requestsMade = 0

  private let loadedPagePublisher = PassthroughSubject<Response, Never>()
  let finishedPublisher: AnyPublisher<[Item], Never>

  var cancellables = Set<AnyCancellable>()

  init() {
    self.finishedPublisher = loadedPagePublisher
      .reduce([Item](), { allItems, response in
        // append, so items stay in the order their pages were loaded
        return allItems + response.items
      })
      .eraseToAnyPublisher()
  }

  func initiateLoadSequence() {
    // do something
  }
}

I defined two publishers on RecursiveLoader instead of one. The private loadedPagePublisher is where I decided I would publish pages as they came in from the network. The finishedPublisher takes the loadedPagePublisher and applies the reduce operator. That way, once I complete the loadedPagePublisher, the finishedPublisher will emit an array of Item objects. Pretty cool, right?
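To make the behavior of reduce concrete, here is a minimal sketch that is separate from RecursiveLoader. The subject and the numbers in it are made up for illustration. The only point is that nothing reaches the sink until the upstream subject completes:

```swift
import Combine

// Hypothetical stand-in for a subject that publishes pages as they come in.
let pageSubject = PassthroughSubject<[Int], Never>()
var received = [[Int]]()

let cancellable = pageSubject
  .reduce([Int](), { all, page in all + page })
  .sink(receiveValue: { received.append($0) })

pageSubject.send([1, 2])
pageSubject.send([3, 4])

// reduce is still accumulating, so the sink has not seen anything yet.
let countBeforeCompletion = received.count

// Completing the subject makes reduce emit its accumulated value once.
pageSubject.send(completion: .finished)

print(countBeforeCompletion) // 0
print(received)              // [[1, 2, 3, 4]]
```

The same thing happens with finishedPublisher: it stays silent until loadedPagePublisher is completed.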

At this point I came up with the following implementation for initiateLoadSequence():

func initiateLoadSequence() {
  loadPage()
    .sink(receiveValue: { response in
      self.loadedPagePublisher.send(response)

      if response.hasMorePages == false {
        self.loadedPagePublisher.send(completion: .finished)
      } else {
        self.initiateLoadSequence()
      }
    })
    .store(in: &cancellables)
}

In initiateLoadSequence() I call loadPage() and subscribe to the publisher returned by loadPage(). When I receive a response I forward that response to loadedPagePublisher and if we don't have any more pages to load, I complete the loadedPagePublisher so the finishedPublisher emits its array of Item objects. If we do have more pages to load, I call self.initiateLoadSequence() again to load the next page.

This example works, but I don't think it's great. An instance of RecursiveLoader can only load all pages once, and users of this object need to subscribe to finishedPublisher before calling initiateLoadSequence() to prevent dropping events, because loadedPagePublisher is a PassthroughSubject and it drops every value that's sent while it has no subscribers. For loadedPagePublisher to have subscribers, users of RecursiveLoader must subscribe to finishedPublisher since that publisher is built upon loadedPagePublisher.
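The dropped events are easy to reproduce in isolation. In this small sketch (the subject and values are hypothetical and not part of RecursiveLoader) the first value is sent before anybody subscribes, and it never arrives:

```swift
import Combine

let earlySubject = PassthroughSubject<Int, Never>()

// Nobody is subscribed yet, so this value is dropped.
earlySubject.send(1)

var receivedValues = [Int]()
let subscription = earlySubject.sink(receiveValue: { receivedValues.append($0) })

// Now there is a subscriber with unlimited demand, so this value is delivered.
earlySubject.send(2)

print(receivedValues) // [2]
```

This is why the order of "subscribe first, then start loading" matters so much in this first attempt.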

That said, this first attempt did show me that using reduce is a good idea, and I also like the idea of having a publisher that I publish fetched results onto so another publisher can reduce over it to collect all responses returned by the paginated API.

Attempt two: the solution I wanted to write

Since I had an okay first attempt that just had a couple of issues, I figured I wanted to push that idea forward and make it work as I had initially intended. To do this, I wanted to get rid of finishedPublisher and loadedPagePublisher because those made my RecursiveLoader into a non-reusable object that can only load all pages once. Instead, I figured that I could write a function loadPages() that would create a publisher in its own scope and then pass that publisher to a function that would load an individual page and send its result to the publisher it was given.

Let me show you what I mean by showing you the end result of my second attempt at implementing this functionality:

class RecursiveLoader {
  var requestsMade = 0
  var cancellables = Set<AnyCancellable>()

  init() { }

  private func loadPage() -> AnyPublisher<Response, Never> {
    // unchanged from the original
  }

  private func performPageLoad(using publisher: PassthroughSubject<Response, Never>) {
    loadPage().sink(receiveValue: { [weak self] response in
      publisher.send(response)

      if response.hasMorePages {
        self?.performPageLoad(using: publisher)
      } else {
        self?.requestsMade = 0
        publisher.send(completion: .finished)
      }
    }).store(in: &cancellables)
  }

  func loadPages() -> AnyPublisher<[Item], Never> {
    let intermediatePublisher = PassthroughSubject<Response, Never>()

    return intermediatePublisher
      .reduce([Item](), { allItems, response in
        // append, so items stay in the order their pages were loaded
        return allItems + response.items
      })
      .handleEvents(receiveSubscription: { [weak self] _ in
        self?.performPageLoad(using: intermediatePublisher)
      })
      .eraseToAnyPublisher()
  }
}

As you can see, I no longer have any publishers defined as properties of RecursiveLoader. Instead, loadPages() now returns an AnyPublisher<[Item], Never> that I can subscribe to directly, which is much cleaner. Inside loadPages() I create a subject that performPageLoad(using:) uses to push new responses onto. The loadPages() method doesn't return this intermediate subject directly. It applies a reduce to it first so all intermediate responses are collected into a single array of items.

I also use the handleEvents() function to hook into receiveSubscription. This allows me to kick off the page loading as soon as the publisher returned by loadPages() is subscribed to. By doing this, users of loadPages() don't have to kick off any loading manually and they can't forget to subscribe before starting the loading process like they could in my initial attempt.
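As a minimal illustration of that hook, the sketch below uses a plain Just publisher and a made-up loadingStarted flag instead of the real loader. It only shows when the receiveSubscription closure runs:

```swift
import Combine

// Hypothetical flag that stands in for "page loading has been kicked off".
var loadingStarted = false

// Creating the publisher does not run the receiveSubscription closure.
let lazyPublisher = Just("page")
  .handleEvents(receiveSubscription: { _ in
    loadingStarted = true
  })

let startedBeforeSubscribing = loadingStarted

// Subscribing does run it.
let lazyCancellable = lazyPublisher.sink(receiveValue: { _ in })
let startedAfterSubscribing = loadingStarted

print(startedBeforeSubscribing) // false
print(startedAfterSubscribing)  // true
```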

The performPageLoad(using:) takes a PassthroughSubject<Response, Never> as its argument. Inside of this method, I call loadPage() and subscribe to its result. I then send the received result using the received subject and complete it if there are no more pages to load. If there are more pages to load, I call performPageLoad(using:) again, and pass the same subject along to that method so that next call will also publish its result on the same passthrough subject so I can reduce it into my collection of items.

Using this approach looks exactly as I wanted:

let networking = RecursiveLoader()
networking.loadPages()
  .sink(receiveCompletion: { _ in
    // handle errors
  }, receiveValue: { items in
    print(items)
  })
  .store(in: &cancellables)

There are still some things I'm not entirely happy with in this implementation. For example, performPageLoad(using:) must emit its values asynchronously. For an implementation like this where you rely on the network that's not a problem. But if you modify my loadPage() method and remove the delay that I have added before completing my Future, you'll find that a number of items are dropped because the PassthroughSubject didn't forward them into the reduce since the subscription to the publisher created by loadPages() wasn't set up just yet. The reason for this is that receiveSubscription is called just before the subscription is completely set up and established.
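This timing problem can be reproduced without any networking. The sketch below is a stripped-down, hypothetical setup (it is not the RecursiveLoader code) that sends a value synchronously from inside receiveSubscription. At that point the sink has not requested any values yet, so the subject drops it:

```swift
import Combine

let timingSubject = PassthroughSubject<Int, Never>()
var delivered = [Int]()

let timingCancellable = timingSubject
  .handleEvents(receiveSubscription: { _ in
    // Sent while the subscription is still being set up: the sink
    // has not requested any values yet, so the subject drops this one.
    timingSubject.send(1)
  })
  .sink(receiveValue: { delivered.append($0) })

// Sent after the subscription is fully established: this one is delivered.
timingSubject.send(2)

print(delivered) // [2]
```

With the artificial delay in loadPage(), every send happens after the subscription is established, which is why the second attempt only works for asynchronous results.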

Additionally, I subscribe to the publisher created by loadPage() in performPageLoad(using:) which is also not ideal, but doesn't directly harm the implementation.

Luckily, we can do better.

Attempt three: community help

After publishing the initial version of this article, a reader reached out to me with a very clean, and in hindsight, obvious solution to this problem that fixes both issues I had with my own second attempt. This solution gets rid of the need to subscribe to the publisher created in loadPage() entirely and also ensures that no matter how loadPage() generates its result, all results are always collected and forwarded.

To make this solution work, the RecursiveLoader skeleton needs to be modified slightly compared to my earlier version:

struct Response {
  var hasMorePages = true
  var items = [Item(), Item()]
  var nextPageIndex = 0
}

class RecursiveLoader {
  init() { }

  private func loadPage(withIndex index: Int) -> AnyPublisher<Response, Never> {
    // this would be the individual network call
    Future { promise in
      DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
        let nextIndex = index + 1
        if nextIndex < 5 {
          return promise(.success(Response(nextPageIndex: nextIndex)))
        } else {
          return promise(.success(Response(hasMorePages: false)))
        }
      }
    }.eraseToAnyPublisher()
  }

  // loadPages() is covered in the next snippet
}

The loader no longer tracks the number of requests it has made. The loadPage() method is now loadPage(withIndex:). This index represents the page that should be loaded. In this case I want to load 5 pages and then complete the chain. The Response object now has a nextPageIndex that's used to represent the next index that should be loaded. So in this case I will start with an index of 0 and create new Response objects until I reach index 4 which is the fifth page because I started counting at 0.
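To double check the counting, here is a small sketch that walks through the same index logic without Combine. The fakeResponse(forIndex:) function is a hypothetical synchronous stand-in that mirrors the branching in loadPage(withIndex:):

```swift
struct Item {}

struct Response {
  var hasMorePages = true
  var items = [Item(), Item()]
  var nextPageIndex = 0
}

// Hypothetical synchronous stand-in for loadPage(withIndex:):
// same branching, but no Future and no delay.
func fakeResponse(forIndex index: Int) -> Response {
  let nextIndex = index + 1
  if nextIndex < 5 {
    return Response(nextPageIndex: nextIndex)
  } else {
    return Response(hasMorePages: false)
  }
}

var loadedIndexes = [Int]()
var itemCount = 0
var index = 0

// Keep following nextPageIndex until a response reports no more pages.
while true {
  let response = fakeResponse(forIndex: index)
  loadedIndexes.append(index)
  itemCount += response.items.count

  guard response.hasMorePages else { break }
  index = response.nextPageIndex
}

print(loadedIndexes) // [0, 1, 2, 3, 4]
print(itemCount)     // 10
```

Five pages with two items each means the final array should hold ten items.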

The loadPages() still does all of the work but it's modified as follows:

func loadPages() -> AnyPublisher<[Item], Never> {
  let pageIndexPublisher = CurrentValueSubject<Int, Never>(0)

  return pageIndexPublisher
    .flatMap({ index in
      return self.loadPage(withIndex: index)
    })
    .handleEvents(receiveOutput: { (response: Response) in
      if response.hasMorePages {
        pageIndexPublisher.send(response.nextPageIndex)
      } else {
        pageIndexPublisher.send(completion: .finished)
      }
    })
    .reduce([Item](), { allItems, response in
      // append, so items stay in the order their pages were loaded
      return allItems + response.items
    })
    .eraseToAnyPublisher()
}

Inside loadPages() a CurrentValueSubject is used to drive the loading of pages. Since we want to start loading pages when somebody subscribes to the publisher created by loadPages(), a CurrentValueSubject makes sense because it emits its current (initial) value once it receives a subscriber. The publisher returned by loadPages() applies a flatMap to pageIndexPublisher. Inside of the flatMap, the page index emitted by pageIndexPublisher is used to create a new loadPage publisher that will load the page at a certain index. After the flatMap, handleEvents(receiveOutput:) is used to determine whether the nextPageIndex should be sent through the pageIndexPublisher or if the pageIndexPublisher should be completed. When the nextPageIndex is emitted by the pageIndexPublisher, this triggers another call to loadPage(withIndex:) in the flatMap.

Since we still use a reduce after handleEvents(receiveOutput:), all results from the flatMap are still collected and an array of Item objects is still emitted when pageIndexPublisher completes.

I can imagine that this is slightly mindbending so let's go through it step by step.

When the publisher that's returned by loadPages() receives a subscriber, pageIndexPublisher immediately emits its initial value: 0. This value is transformed into a publisher using flatMap by returning a publisher created by loadPage(withIndex:). The loadPage(withIndex:) method fakes a network request and produces a Response value.

This Response is passed to handleEvents(receiveOutput:), where it's inspected to see if there are more pages to be loaded. If more pages need to be loaded, pageIndexPublisher emits the index for the next page which will be forwarded into flatMap so it can be converted into a new network call. If there are no further pages available, the pageIndexPublisher sends a completion event.

After the Response is inspected by handleEvents(receiveOutput:), it is forwarded to the reduce where the Response object's items property is used to build an array of Item objects. The reduce will keep collecting items until the pageIndexPublisher sends its completion event.
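The steps above can be traced with a small, self-contained sketch. Everything in it (TracedLoader, Page, fetchPage, the three-page limit) is hypothetical and only exists to record which indexes get requested. It assumes asynchronous responses, just like the fake loader in this post:

```swift
import Combine
import Foundation

// Hypothetical page type: one number per page, and the next index (nil on the last page).
struct Page {
  let numbers: [Int]
  let next: Int?
}

final class TracedLoader {
  private(set) var requestedIndexes = [Int]()

  private func fetchPage(_ index: Int) -> AnyPublisher<Page, Never> {
    requestedIndexes.append(index) // record every request flatMap triggers
    return Future<Page, Never> { promise in
      DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) {
        promise(.success(Page(numbers: [index], next: index < 2 ? index + 1 : nil)))
      }
    }.eraseToAnyPublisher()
  }

  func fetchAll() -> AnyPublisher<[Int], Never> {
    let indexSubject = CurrentValueSubject<Int, Never>(0)

    return indexSubject
      .flatMap { index in self.fetchPage(index) }
      .handleEvents(receiveOutput: { page in
        // Feed the next index back into the subject, or finish the chain.
        if let next = page.next {
          indexSubject.send(next)
        } else {
          indexSubject.send(completion: .finished)
        }
      })
      .reduce([Int]()) { all, page in all + page.numbers }
      .eraseToAnyPublisher()
  }
}

let tracedLoader = TracedLoader()
let finished = DispatchSemaphore(value: 0)
var allNumbers = [Int]()

let tracedCancellable = tracedLoader.fetchAll().sink(receiveValue: { numbers in
  allNumbers = numbers
  finished.signal()
})

finished.wait() // block until the reduced result has arrived

print(tracedLoader.requestedIndexes) // [0, 1, 2]
print(allNumbers)                    // [0, 1, 2]
```

Each response triggers exactly one new request until the last page completes the subject, at which point reduce emits everything it collected.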

In Summary

This blog post was a fun one to write, especially because I didn't know I was going to write it until I did. I hope I've been able to give you a glimpse into the thought process that I use when I design and implement solutions to complicated problems. By coming up with an ideal call-site first I usually already get a good sense of what my implementation should look like. And by throwing that ideal implementation aside for a moment and getting something that works first, I always get a good sense of what works and what doesn't without worrying too much about the result.

If you have any questions or feedback for me, don't be scared and send me a message on Twitter.

I did not mention who gave me the tip to use a CurrentValueSubject and a flatMap in my solution because they preferred to remain anonymous.
