Iterating over web socket messages with async / await in Swift

Published on: January 24, 2023

In iOS 13, we gained the ability to easily send and receive data using web sockets through URLSession. With async/await, we gained the ability to fetch data from servers using the await keyword and we can iterate over asynchronous sequences using async for loops.

We can even read data from a URL one line at a time by calling the lines property on URL:

let url = URL(string: "https://donnywals.com")!

for try await line in url.lines {
    // use line
}

While this is really cool and allows us to build apps that ingest data in real time if the server supports streaming bodies, we cannot use the lines property to set up a web socket connection and listen for incoming messages and potentially send messages over the same connection too.

In this post, you will learn everything you need to know about building your own mechanism to conveniently iterate over messages from a web socket asynchronously. We will leverage some existing functionality from URLSessionWebSocketTask and AsyncThrowingStream to build our own AsyncSequence that conveniently wraps our URLSessionWebSocketTask.

Note that the resulting code has only had relatively limited testing done so I cannot guarantee that the provided solution will be 100% correct for everything you throw at it. If you find any issues with the final code, feel free to contact me. Bonus points if you’re able to provide some ideas for a potential fix.

Using a web socket without async / await

Before we get started, let's quickly review how to use a web socket without async/await. The code details are outlined in this post. Be sure to read it if you want to learn more about using web sockets in your apps.


let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
socketConnection.resume()

func setReceiveHandler() {
    socketConnection.receive { result in
        defer { self.setReceiveHandler() }

        do {
            let message = try result.get()
            switch message {
            case let .string(string):
                print(string)
            case let .data(data):
                print(data)
            @unknown default:
                print("unkown message received")
            }
        } catch {
            // handle the error
            print(error)
        }
    }
}

setReceiveHandler()

Notice how, to receive messages from the socket, I must call receive with a completion handler. This method only allows me to receive a single incoming message, so I must re-set my handler after receiving a message to automatically begin listening for the next message.

This is a great example of a situation where an async for loop such as for try await message in socketConnection would make a lot of sense. Unfortunately, this is not possible out of the box. However, URLSessionWebSocketTask provides some form of support for async / await so we’re not entirely out of luck.

A basic implementation of web sockets with async / await

While URLSessionWebSocketTask doesn’t expose an AsyncSequence that emits incoming messages out of the box, it does come with an async version of the receive method you saw earlier.

This allows us to rewrite the example above as an async method as follows:

func setReceiveHandler() async {
    do {
        let message = try await socketConnection.receive()

        switch message {
        case let .string(string):
          print(string)
        case let .data(data):
          print(data)
        @unknown default:
          print("unkown message received")
        }
    } catch {
        print(error)
    }

    await setReceiveHandler()
}

This code works just fine, except we don’t really have a means to stop the recursion here. The code you saw earlier actually has the exact same issue; there’s no condition to stop listening for web socket messages even if the web socket connection has already been closed.

We could improve our code by only recursing if:

  1. We didn’t encounter any errors
  2. The socket connection is still active

This would look a bit as follows:

func setReceiveHandler() async {
    guard socketConnection.closeCode == .invalid else {
        return
    }

    do {
        let message = try await socketConnection.receive()

        switch message {
        case let .string(string):
          print(string)
        case let .data(data):
          print(data)
        @unknown default:
          print("unkown message received")
        }

        await setReceiveHandler()
    } catch {
        print(error)
    }
}

An open web socket’s closed code is always said to invalid to signal that the connection has not (yet) been closed. We can leverage this to check that our connection is still active before waiting for the next message to be received.

This is much better already because we respect closed sockets and failures much nicer now, but we could improve the readability of this code a tiny bit by leveraging a while loop instead of recursively calling the setReceiveHandler function:

func setReceiveHandler() async {
    var isActive = true

    while isActive && socketConnection.closeCode == .invalid {
        do {
            let message = try await socketConnection.receive()

            switch message {
            case let .string(string):
              print(string)
            case let .data(data):
              print(data)
            @unknown default:
              print("unkown message received")
            }
        } catch {
            print(error)
            isActive = false
        }
    }
}

To me, this version of the code is slightly easier to read but that might not be the case for you. It’s functionally equivalent so you can choose to use whichever option suits you best.

While this code works, I’m not quite happy with where we’ve landed right now. There’s a lot of logic in this function and I would prefer to separate handling the incoming values from the calls to socketConnection.receive() somehow. Ideally, I should be able to write the following:

do {
    for try await message in socketConnection {
        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unkown message received")
      }
} catch {
    // handle error
}

This is much, much nicer from a call-site perspective and it would allow us to put the ugly bits elsewhere.

To do this, we can leverage the power of AsyncStream which allows us to build a custom async sequence of values.

Using AsyncStream to emit web socket messages

Given our end goal, there are a few ways for us to get where we want to be. The easiest way would be to write a function in an extension on URLSessionWebSocketTask that would encapsulate the while loop you saw earlier. This implementation would look as follows:

typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, Error>

public extension URLSessionWebSocketTask {    
    var stream: WebSocketStream {
        return WebSocketStream { continuation in
            Task {
                var isAlive = true

                while isAlive && closeCode == .invalid {
                    do {
                        let value = try await receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }
}

To make the code a little bit easier to read, I’ve defined a typealias for my AsyncThrowingStream so we don’t have to look at the same long type signature all over the place.

The code above creates an instance of AsyncThrowingStream that asynchronously awaits new values from the web socket as long as the web socket is considered active and hasn't been closed. To emit incoming messages and potential errors, the continuation's yield and finish methods are used. These methods will either emit a new value (yield) or end the stream of values with an error (finish).

This code works great in many situations, but there is one issue. If we decide to close the web socket connection from the app's side by calling cancel(with:reason:) on our socketConnection, our WebSocketStream does not end. Instead, it will be stuck waiting for messages, and the call site will be stuck too.

Task {
    try await Task.sleep(for: .seconds(5))
    try await socketConnection.cancel(with: .goingAway, reason: nil)
}

Task {    
    do {
        for try await message in socketConnection.stream2 {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this would never be printed")
}

If everything works as expected, our web socket connection will close after five seconds. At that point, our for loop should end and our print statement should execute, since the asynchronous stream is no longer active. Unfortunately, this is not the case, so we need to find a better way to model our stream.

URLSessionWebSocketTask does not provide a way for us to detect cancellation. So, I have found that it is best to use an object that wraps the URLSessionWebSocketTask, and to cancel the task through that object. This allows us to both end the async stream we are providing to callers and close the web socket connection with one method call.

Here’s what that object looks like:

class SocketStream: AsyncSequence {
    typealias AsyncIterator = WebSocketStream.Iterator
    typealias Element = URLSessionWebSocketTask.Message

    private var continuation: WebSocketStream.Continuation?
    private let task: URLSessionWebSocketTask

    private lazy var stream: WebSocketStream = {
        return WebSocketStream { continuation in
            self.continuation = continuation

            Task {
                var isAlive = true

                while isAlive && task.closeCode == .invalid {
                    do {
                        let value = try await task.receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }()

    init(task: URLSessionWebSocketTask) {
        self.task = task
        task.resume()
    }

    deinit {
        continuation?.finish()
    }

    func makeAsyncIterator() -> AsyncIterator {
        return stream.makeAsyncIterator()
    }

    func cancel() async throws {
        task.cancel(with: .goingAway, reason: nil)
        continuation?.finish()
    }
}

There’s a bunch of code here, but it’s not too bad. The first few lines are all about setting up some type aliases and properties for convenience. The lazy var stream is essentially the exact same code that you’ve already in the URLSessionWebSocketTask extension from before.

When our SocketStream's deinit is called we make sure that we end our stream. There’s also a cancel method that closes the socket connection as well as the stream. Because SocketStream conforms to AsyncSequence we must provide an Iterator object that’s used when we try to iterate over our SocketStreams. We simply ask our internal stream object to make an iterator and use that as our return value.

Using the code above looks as follows:

let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
let stream = SocketStream(task: socketConnection)

Task {  
    do {
        for try await message in stream {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this will be printed once the stream ends")
}

To cancel our stream after 5 seconds just like before, you can run the following task in parallel with our iterating task:

Task {
    try await Task.sleep(for: .seconds(5))
    try await stream.cancel()
}

Task {
    // iterate...
}

While this is pretty cool, we do have a bit of an issue here because of the following bit of code:

private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation

        Task {
            var isAlive = true

            while isAlive && task.closeCode == .invalid {
                do {
                    let value = try await task.receive()
                    continuation.yield(value)
                } catch {
                    continuation.finish(throwing: error)
                    isAlive = false
                }
            }
        }
    }
}()

The task that we run our while loop in won’t end unless we end our stream from within our catch block. If we manually close the web socket connection using the cancel method we write earlier, the call to receive() will never receive an error nor a value which means that it will be stuck forever.

The most reliable way to fix this is to go back to the callback based version of receive to drive your async stream:

private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation
        waitForNextValue()
    }
}()

private func waitForNextValue() {
    guard task.closeCode == .invalid else {
        continuation?.finish()
        return
    }

    task.receive(completionHandler: { [weak self] result in
        guard let continuation = self?.continuation else {
            return
        }

        do {
            let message = try result.get()
            continuation.yield(message)
            self?.waitForNextValue()
        } catch {
            continuation.finish(throwing: error)
        }
    })
}

With this approach we don’t have any lingering tasks, and our call site is as clean and concise as ever; we’ve only changed some of our internal logic.

In Summary

Swift Concurrency provides many useful features for writing better code, and Apple quickly adopted async / await for existing APIs. However, some APIs that would be useful are missing, such as iterating over web socket messages.

In this post, you learned how to use async streams to create an async sequence that emits web socket messages. You first saw a fully async / await version that was neat, but had memory and task lifecycle issues. Then, you saw a version that combines a callback-based approach with the async stream.

The result is an easy way to iterate over incoming web socket messages with async / await. If you have any questions, comments, or improvements for this post, please don't hesitate to reach out to me on Twitter.