Building an AsyncSequence with AsyncStream.makeStream

Published on: March 25, 2024

A while ago I published a post that explains how you can use AsyncStream to build your own asynchronous sequences in Swift Concurrency. Since writing that post, a new approach to creating AsyncStream objects has been introduced to make building streams more convenient.

In this post, I’ll expand on what we’ve already covered in the previous post so that we don’t have to go over everything from scratch.

By the end of this post you will understand the new and more convenient makeStream method that was added to AsyncStream. You’ll learn how and when it makes sense to build your own async streams, and I will reiterate some of their gotchas to help you avoid mistakes that I’ve had to make in the past.

Reviewing the older situation

While I won’t explain the old approach in detail, I think it makes sense to go over it briefly to refresh your memory. And if you weren’t familiar with the old approach, it will help put the improvements in Swift 5.9 into perspective.

Pre-Swift 5.9 we could create our AsyncStream objects as follows:

let stream = AsyncStream(unfolding: {
    return Int.random(in: 0..<Int.max)
})

The approach shown here is the simplest way to build an async stream but also the least flexible.

In short, the closure that we pass to unfolding here will be called every time we’re expected to asynchronously produce a new value for our stream. Once the value is produced, you return it so that the for loop iterating over this sequence can use the value. To terminate your async stream, you return nil from your closure to indicate that there are no further values to be produced.
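To make the "return nil to terminate" part concrete, here’s a minimal sketch of a stream that counts down and then ends. The Countdown actor is a hypothetical helper that I’m only using here to hold the mutable counter safely; it’s not part of AsyncStream.

```swift
// Hypothetical helper: an actor that hands out 3, 2, 1 and then nil.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

let countdown = Countdown(from: 3)

// The unfolding closure is called once for every value the loop asks for.
// Returning nil ends the stream.
let countdownStream = AsyncStream<Int>(unfolding: {
    await countdown.next()
})

var received: [Int] = []
for await value in countdownStream {
    received.append(value)
}
print(received) // prints "[3, 2, 1]"
```

Once the closure returns nil, the for loop ends on its own and the code after the loop runs.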

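This approach lacks some flexibility: values can only be produced when the stream asks for them, which makes it a poor fit for bridging things like delegate based code into Swift Concurrency, where values arrive whenever the delegate is called.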

A more useful and flexible way to build an AsyncStream that can bridge a callback based API like CLLocationManagerDelegate looks as follows:

import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    // The stream is created lazily so the closure can refer to self.
    lazy var stream: AsyncStream<CLLocation> = {
        AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
            // Store the continuation so the delegate method can use it later.
            self.continuation = continuation
        }
    }()
    var continuation: AsyncStream<CLLocation>.Continuation?

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        for location in locations {
            continuation?.yield(location)
        }
    }
}

This code does a little bit more than build an async stream so let’s go over it in a bit more detail.

First, there’s a lazy var that’s used to create an instance of AsyncStream. When we create the async stream, we pass the AsyncStream initializer a closure. This closure receives a continuation object that we can use to push values onto our AsyncStream. Because we’re bridging a callback based API we need access to the continuation from outside of the initial closure so we assign the continuation to a var on the AsyncLocationStream object.

Next, we have the didUpdateLocations delegate method. From that method, we call yield on the continuation to push every received location onto our AsyncStream, which allows anybody that’s writing a for loop over the stream property to receive locations. Here’s what that would look like in a simplified example:

let locationStream = AsyncLocationStream()

for await value in locationStream.stream {
  print("location received", value)
}

While this all works perfectly fine, there’s this optional continuation that we’re dealing with. Luckily, the new makeStream approach takes care of this.

Creating a stream with makeStream

In essence, a makeStream based AsyncStream works identically to the one you saw earlier.

We still work with a continuation that’s used to yield values to whoever is iterating our stream. In order to end the stream we call finish on the continuation, and to handle someone cancelling their Task or breaking out of the for loop you can still use onTermination on the continuation to perform cleanup. We’ll take a look at onTermination in the next section.

For now, let’s focus on seeing how makeStream allows us to rewrite the example you just saw to be a bit cleaner.

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
  let stream: AsyncStream<CLLocation>
  private let continuation: AsyncStream<CLLocation>.Continuation

  override init() {
    let (stream, continuation) = AsyncStream.makeStream(of: CLLocation.self)
    self.stream = stream
    self.continuation = continuation

    super.init()
  }

  func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
    for location in locations {
      continuation.yield(location)
    }
  }
}

We’ve written a little bit more code than we had before but the code we have now is slightly cleaner and more readable.

Instead of a lazy var and an optional, we can now define two let properties, which fits much better with what we’re trying to do. Additionally, we create our AsyncStream and its continuation in a single line of code instead of needing a closure just to copy the continuation onto our class.

Everything else remains pretty much the same. We still call yield to push values onto our stream, and we still use finish to end our continuation (we’re not calling that in the snippet above).
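Since finish isn’t called in the snippet above, here’s a small standalone sketch of what ending a makeStream based stream looks like. It uses plain Int values instead of locations and relies on the default buffering behavior, where values that are yielded before anybody iterates are buffered for the first consumer.

```swift
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// Nobody is iterating yet, so these values are buffered.
continuation.yield(1)
continuation.yield(2)

// finish() marks the end of the stream. Values that were already
// buffered are still delivered before the loop ends.
continuation.finish()

var numbers: [Int] = []
for await value in stream {
    numbers.append(value)
}
print(numbers) // prints "[1, 2]"
```

Without the call to finish, the for loop would receive both values and then wait indefinitely for a third one.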

While this is all very convenient, AsyncStream.makeStream comes with the same memory and lifecycle related issues as its older counterparts. Let’s take a brief look at these issues and how to fix them in the next section.

Avoiding memory leaks and infinite loops

When we’re iterating an async sequence from within a task, it’s reasonable to expect that at some point the object we’re iterating goes out of scope and that our iteration stops.

For example, if we’re leveraging the AsyncLocationStream you saw before from within a ViewModel we’d want the location updates to stop automatically whenever the screen, its ViewModel, and the AsyncLocationStream go out of scope.

In reality, these objects will go out of scope but any task that’s iterating the AsyncLocationStream's stream won’t end until the stream’s continuation is explicitly ended. I've explored this phenomenon more in depth in this post where I dig into lifecycle management for async sequences.

Let’s look at an example that demonstrates this effect. We’ll look at a dummy LocationProvider first.

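import Combine
import Foundation

class LocationProvider {
  let locations: AsyncStream<UUID>
  private let continuation: AsyncStream<UUID>.Continuation
  // This has to be a var because it's assigned in startUpdates(), after init.
  private var cancellable: AnyCancellable?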

  init() {
    let stream = AsyncStream.makeStream(of: UUID.self)
    locations = stream.stream
    continuation = stream.continuation
  }

  deinit {
    print("location provider is gone")
  }

  func startUpdates() {
    cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .sink(receiveValue: { [weak self] _ in
        print("will send")
        self?.continuation.yield(UUID())
      })
  }
}

The object above creates an AsyncStream just like you saw before. When we call startUpdates we start simulating receiving location updates. Every second, we send a new unique UUID onto our stream.

To make the test realistic, I’ve added a MyViewModel object that would normally serve as the interface in between the location provider and the view:

class MyViewModel {
  let locationProvider = LocationProvider()

  var locations: AsyncStream<UUID> {
    locationProvider.locations
  }

  deinit {
    print("view model is gone")
  }

  init() {
    locationProvider.startUpdates()
  }
}

We’re not doing anything special in this code so let’s move on to creating the test scenario itself:

var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
  guard let locations = viewModel?.locations else { return }

  print("before for loop")
  for await location in locations {
    print(location)
  }
  print("after for loop")
}

Task {
  try await Task.sleep(for: .seconds(2))
  viewModel = nil
}

In our test, we set up two tasks. The first one iterates over our AsyncStream and prints a string before and after the loop.

We have a second task that runs in parallel. This task will wait for two seconds and then it sets the viewModel property to nil. This simulates a screen going away and the view model being deallocated because of it.

Let’s look at the printed results for this code:

before for loop
will send
B9BED2DE-B929-47A6-B47D-C28AD723FCB1
will send
FCE7DAD1-D47C-4D03-81FD-42B0BA38F976
view model is gone
location provider is gone

Notice how we’re not seeing "after for loop" printed here.

This means that while the view model and location provider both get deallocated as expected, we’re not seeing the for loop end like we’d want to.

To fix this, we need to make sure that we finish our continuation when the location provider is deallocated:

class LocationProvider {
  // ...

  deinit {
    print("location provider is gone")
    continuation.finish()
  }

  // ...
}

In the deinit for LocationProvider we can call continuation.finish() which will fix the leak that we just saw. If we run the code again, we’ll see the following output:

before for loop
will send
B3DE2994-E0E1-4397-B04E-448047315133
will send
D790D3FA-FE40-4182-9F58-1FEC93335F18
view model is gone
location provider is gone
after for loop

So that fixed our for loop sitting and waiting for a value that would never come (and our Task being stuck forever as a result). However, we’re not out of the woods yet. Let’s change the test setup a little bit. Instead of deallocating the view model, let’s try cancelling the Task that we created to iterate the AsyncStream.

var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
  guard let locations = viewModel?.locations else { return }

  print("before for loop")
  for await location in locations {
    print(location)
  }
  print("after for loop")
}

Task {
  try await Task.sleep(for: .seconds(2))
  sampleTask.cancel()
}

Running the code now results in the following output:

before for loop
will send
0B6E962F-F2ED-4C33-8155-140DB94F3AE0
will send
1E195613-2CE1-4763-80C4-590083E4353E
after for loop
will send
will send
will send
will send

So while our loop ended, the location updates don’t stop. We can add an onTermination closure to our continuation to be notified of an ended for loop (which happens when you cancel a Task that’s iterating an async sequence):

class LocationProvider {
  // ...

  func startUpdates() {
    cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .sink(receiveValue: { [weak self] _ in
        print("will send")
        self?.continuation.yield(UUID())
      })

    continuation.onTermination = { [weak self] _ in
      self?.cancellable = nil
    }
  }
}

With this code in place, we can now handle both a task getting cancelled as well as our LocationProvider being deallocated.
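The onTermination closure also receives an argument that tells you why the stream ended, which the snippet above ignores. Here’s a minimal sketch, separate from the LocationProvider and using made-up names, that inspects that reason and shows what happens when you yield after the stream has terminated.

```swift
let (ticks, tickContinuation) = AsyncStream.makeStream(of: Int.self)

// onTermination receives the reason the stream ended.
tickContinuation.onTermination = { reason in
    switch reason {
    case .cancelled:
        print("the iterating task was cancelled, stop producing values")
    case .finished:
        print("finish() was called on the continuation")
    @unknown default:
        print("unknown termination reason")
    }
}

let consumer = Task {
    for await tick in ticks {
        print("received", tick)
    }
}

// Cancelling the task that iterates the stream terminates the stream.
consumer.cancel()
await consumer.value

// Once a stream has terminated, yield no longer enqueues values.
let lateResult = tickContinuation.yield(1)
if case .terminated = lateResult {
    print("late value was discarded")
}
```

Checking the reason is optional. For a provider like the one in this post, the cleanup is the same in both cases, which is why ignoring the argument works fine there.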

Whenever you’re writing your own async streams it’s important that you test what happens when the owner of your continuation is deallocated (you’ll usually want to finish your continuation) or when the for loop that iterates your stream is ended (you’ll want to perform some cleanup as needed).
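If you want to check the deallocation case without timers or sleeps, a sketch like the following can help. NumberSource is a hypothetical stand-in for your own stream owner; the only assumption is that it finishes its continuation in deinit, like the fixed LocationProvider does.

```swift
// Hypothetical owner of a continuation that finishes its stream on deinit.
final class NumberSource {
    let numbers: AsyncStream<Int>
    private let continuation: AsyncStream<Int>.Continuation

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        self.numbers = stream
        self.continuation = continuation
    }

    func send(_ value: Int) {
        continuation.yield(value)
    }

    deinit {
        continuation.finish()
    }
}

func makeStreamAndDropOwner() -> AsyncStream<Int> {
    let source = NumberSource()
    source.send(1)
    source.send(2)
    // Only the stream is returned. The source itself is released when
    // this function returns, so deinit runs and finishes the stream.
    return source.numbers
}

var seen: [Int] = []
for await number in makeStreamAndDropOwner() {
    seen.append(number)
}
print(seen) // prints "[1, 2]"
```

If you remove the finish call from deinit, the loop never ends and the print is never reached, which is exactly the situation you want a test to catch.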

Making mistakes here is quite easy so be sure to keep an eye out!

In Summary

In this post, you saw the new and more convenient AsyncStream.makeStream method in action. You learned that this method is an alternative to the less convenient closure based AsyncStream initializer, which forced us to manually store a continuation outside of the closure and would usually lead to having a lazy var for the stream and an optional for the continuation.

After showing you how you can use AsyncStream.makeStream, you learned about some of the gotchas that come with async streams in general. I showed you how you can test for these gotchas, and how you can fix them to make sure that your streams end and clean up as and when you expect.
