Learn reactive programming frameworks by building your own

Mar 13, 2019

Reactive programming frameworks like ReactiveCocoa and RxSwift have become quite popular in the iOS world in the last few years. Their value lies in expressing application logic in terms of event streams that trigger side effects: for example, screen updates when new data is retrieved from the server.

In that example, fetching the data would be the source of events. These events could then be fed to a chain of observers, which would parse, persist, combine and format the data to be displayed on the user interface.

Even though these frameworks can be powerful, learning to use them to their full potential involves a steep learning curve, which can be a barrier to adoption. To understand the mechanics behind these somewhat magical tools, I decided to create my own reactive framework from scratch.

The result of my work is a simple library called YARF (Yet Another Reactive Framework), which we are going to rebuild together. If you follow along, you’ll walk away with a better understanding of third-party frameworks and how to integrate them within your code.

In this first post, we’ll create some of the basic functionality present in these frameworks. (We’ll create additional building blocks in future posts, so stay tuned!)

Most of my experience using a reactive framework is with RxSwift, so most of my references will naturally relate more to that framework.

The observer pattern

The foundation stone is the first stone set in a masonry foundation. Every other stone is set in relation to this one, which makes it the most important building block.

The observer pattern is the foundation stone of reactive programming frameworks like RxSwift. Usually, the entire framework is built around the implementation of that pattern, so we are going to start there.

Here is the definition from Wikipedia:

“The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.”

Swift basic implementation

In Swift, the most obvious approach is to implement the observer pattern using protocols that represent the contract between the observable and the observer. In the example below, we implement the observable class Car (which notifies its listeners about changes in speed) and an observer for it: the Speedometer (which prints the values it receives).

import Foundation

protocol CarObserver { // Observer protocol declaration
  func car(_ car: Car, didUpdateSpeedTo speed: Float)
}

final class Car {
  private var observers: [CarObserver] = [] // List of registered observers

  func add(observer: CarObserver) {
    observers.append(observer)
  }
  //...
}

final class Speedometer: CarObserver {
  func car(_ car: Car, didUpdateSpeedTo speed: Float) {
    print("Speed: \(speed)")
  }
}

In order for the Speedometer to receive speed update events, we need to notify it of the changes by calling the method from our observer protocol. For example, we could have a speed property and notify our observers in the didSet like this:

final class Car {
  var speed: Float = 0 {
    didSet {
      // Notify every registered observer of the new speed
      for observer in observers { observer.car(self, didUpdateSpeedTo: speed) }
    }
  }
  //...
}
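Putting the two fragments above together, a minimal runnable sketch of the protocol-based version could look like the following. The RecordingSpeedometer type and its readings property are hypothetical names introduced only for this illustration.

```swift
protocol CarObserver {
  func car(_ car: Car, didUpdateSpeedTo speed: Float)
}

final class Car {
  private var observers: [CarObserver] = [] // List of registered observers

  var speed: Float = 0 {
    didSet {
      // Notify every registered observer of the new speed
      for observer in observers { observer.car(self, didUpdateSpeedTo: speed) }
    }
  }

  func add(observer: CarObserver) {
    observers.append(observer)
  }
}

// Hypothetical observer that prints and records every value it receives
final class RecordingSpeedometer: CarObserver {
  private(set) var readings: [Float] = []

  func car(_ car: Car, didUpdateSpeedTo speed: Float) {
    readings.append(speed)
    print("Speed: \(speed)")
  }
}

let car = Car()
let speedometer = RecordingSpeedometer()
car.add(observer: speedometer)

car.speed = 50 // prints "Speed: 50.0"
car.speed = 80 // prints "Speed: 80.0"
```

Note that every new behavior needs its own type conforming to CarObserver, which is the overhead the next section tries to avoid.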

A functional approach to implementing the observer pattern

In languages like Swift, where we have functions as first-class citizens and such functions can be declared as in-line closures, there is another way to represent this contract without using protocols.

typealias SpeedUpdateHandler = (Float) -> ()

final class Car {
  private var observers: [SpeedUpdateHandler] = [] // List of registered observers

  func add(observer: @escaping SpeedUpdateHandler) {
    observers.append(observer)
  }
  //...
}

In this example, instances of SpeedUpdateHandler will be our observers, and we do not need to declare a new class or struct to implement a protocol every time we need a different behavior to be triggered by an event. In RxSwift, this would be a closure you would pass as a parameter to subscribe methods.
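As a rough sketch of what this buys us, the snippet below registers two unrelated behaviors as closures without declaring any new types. The speed property with its didSet is assumed to be the same as in the protocol version, and the log array exists only to make the effect visible.

```swift
typealias SpeedUpdateHandler = (Float) -> ()

final class Car {
  private var observers: [SpeedUpdateHandler] = [] // List of registered observers

  var speed: Float = 0 {
    didSet {
      // Each observer is just a function, so we call it directly
      for observer in observers { observer(speed) }
    }
  }

  func add(observer: @escaping SpeedUpdateHandler) {
    observers.append(observer)
  }
}

let car = Car()
var log: [String] = [] // Hypothetical sink used only for this demo

// First behavior: record every speed update
car.add { speed in
  log.append("Speed: \(speed)")
}

// Second behavior: warn only above an arbitrary threshold
car.add { speed in
  guard speed > 100 else { return }
  log.append("Too fast!")
}

car.speed = 60
car.speed = 120
print(log) // ["Speed: 60.0", "Speed: 120.0", "Too fast!"]
```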

OK, but how do we stop observing events now?

Closures in Swift are reference types. However, for various reasons that are beyond the scope of this article, Swift doesn’t support testing closures and functions for equality.

Given that restriction, we need an alternative way to identify our observers in the list so we can remove them. So, we need to wrap the observer function in something we can identify.

Here is what such a wrapper would look like:

typealias ObserverToken = String

class SpeedUpdateHandlerWrapper {
  let identity: ObserverToken = UUID().uuidString
  let observer: SpeedUpdateHandler

  init(wrapping wrapped: @escaping SpeedUpdateHandler) {
    self.observer = wrapped
  }
}

In the example above, we could have used a struct instead of a class and gotten the same results, but there is a reason to choose a class (more on that later).

We also need to update our Car class to use our wrapper internally and return the wrapper token so we can request the removal of our observers later.

final class Car {
  private var observers: [SpeedUpdateHandlerWrapper] = []
  var speed: Float = 0 {
    didSet { for wrapper in observers { wrapper.observer(speed) } }
  }

  func add(observer: @escaping SpeedUpdateHandler) -> ObserverToken {
    let wrapper = SpeedUpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    return wrapper.identity
  }

  func removeObserver(token: ObserverToken) {
    observers.removeAll { $0.identity == token }
  }
  //...
}

The add(observer:) function now instantiates the wrapper to enclose our observer block and returns the generated identity of the instance. You might also have noticed that, instead of calling the observer directly, we now need to access it through the observer property of our wrapper.
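To see the token in action, here is a small self-contained sketch that subscribes, receives one update, and then unsubscribes. It repeats the wrapper and Car definitions from above so it can run on its own; the received array is a hypothetical helper used only to observe the result.

```swift
import Foundation

typealias SpeedUpdateHandler = (Float) -> ()
typealias ObserverToken = String

final class SpeedUpdateHandlerWrapper {
  let identity: ObserverToken = UUID().uuidString
  let observer: SpeedUpdateHandler

  init(wrapping wrapped: @escaping SpeedUpdateHandler) {
    self.observer = wrapped
  }
}

final class Car {
  private var observers: [SpeedUpdateHandlerWrapper] = []
  var speed: Float = 0 {
    didSet { for wrapper in observers { wrapper.observer(speed) } }
  }

  func add(observer: @escaping SpeedUpdateHandler) -> ObserverToken {
    let wrapper = SpeedUpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    return wrapper.identity
  }

  func removeObserver(token: ObserverToken) {
    observers.removeAll { $0.identity == token }
  }
}

let car = Car()
var received: [Float] = [] // Hypothetical sink used only for this demo

let token = car.add { speed in received.append(speed) }
car.speed = 10                   // delivered to the observer
car.removeObserver(token: token) // the observer is identified by its token
car.speed = 20                   // no longer delivered

print(received) // [10.0]
```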

Housekeeping

Now we are able to add and remove observers for our events, but the closures keep strong references to their context variables (unless otherwise specified) and are being stored in arrays that keep strong references to them.

This means if we do not unsubscribe before the object holding the token is deallocated, that observer will stay in the list forever and might keep references to things we don’t want it to.

Unexpected behavior may arise when that observer receives new events, so we need to do our housekeeping and unsubscribe all our observers when the token holder is deallocated (deinit is called).
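The problem can be reproduced with a short sketch. ForgetfulSpeedometer and the observerCount property are hypothetical names added for this demonstration; the rest mirrors the Car and wrapper shown earlier.

```swift
import Foundation

typealias SpeedUpdateHandler = (Float) -> ()
typealias ObserverToken = String

final class SpeedUpdateHandlerWrapper {
  let identity: ObserverToken = UUID().uuidString
  let observer: SpeedUpdateHandler

  init(wrapping wrapped: @escaping SpeedUpdateHandler) {
    self.observer = wrapped
  }
}

final class Car {
  private var observers: [SpeedUpdateHandlerWrapper] = []
  var observerCount: Int { observers.count } // Hypothetical helper for the demo
  var speed: Float = 0 {
    didSet { for wrapper in observers { wrapper.observer(speed) } }
  }

  func add(observer: @escaping SpeedUpdateHandler) -> ObserverToken {
    let wrapper = SpeedUpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    return wrapper.identity
  }

  func removeObserver(token: ObserverToken) {
    observers.removeAll { $0.identity == token }
  }
}

// Hypothetical token holder that never calls removeObserver
final class ForgetfulSpeedometer {
  let observerToken: ObserverToken

  init(car: Car) {
    observerToken = car.add { speed in
      print("Stale observer received \(speed)")
    }
  }
}

let car = Car()
var speedometer: ForgetfulSpeedometer? = ForgetfulSpeedometer(car: car)
speedometer = nil // the holder is gone...

car.speed = 10           // ...but this still prints "Stale observer received 10.0"
print(car.observerCount) // 1
```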

Let’s update our Speedometer example to use our new interface for observing speed changes and put the call to removeObserver in our deinit.

class Speedometer {
  private let car: Car // Keep the car around so deinit can unsubscribe
  var observerToken: ObserverToken!

  init(car: Car) {
    self.car = car
    self.observerToken = car.add { [weak self] speed in
      self?.show(speed: speed)
    }
  }

  deinit {
    car.removeObserver(token: observerToken)
  }
  //...
}

But wait… there is a better way!

Using the deinit for that works, but it’s easy to forget to do it. So, what if those subscriptions could remove themselves when the token holder is deallocated?

To achieve that, we need to transform our ObserverToken into our subscription holder and the token holder will simply keep it alive by keeping a reference to it. This way, when the token holder is deallocated, the token itself is deallocated along with it and removes the subscription in the process.

typealias SpeedUpdateHandler = (Float) -> ()

class ObserverToken {
  let unsubscribe: () -> ()

  init(unsubscribe: @escaping () -> ()) {
    self.unsubscribe = unsubscribe
  }

  deinit { unsubscribe() }
}
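A quick way to convince ourselves that this works is to release a token and watch the callback fire. In this sketch the events array is a hypothetical stand-in for real unsubscription logic.

```swift
final class ObserverToken {
  let unsubscribe: () -> ()

  init(unsubscribe: @escaping () -> ()) {
    self.unsubscribe = unsubscribe
  }

  // Runs when the last strong reference to the token goes away
  deinit { unsubscribe() }
}

var events: [String] = [] // Hypothetical log used only for this demo

var token: ObserverToken? = ObserverToken(unsubscribe: {
  events.append("unsubscribed")
})

events.append("token alive")
token = nil // dropping the only reference triggers deinit
events.append("after release")

print(events) // ["token alive", "unsubscribed", "after release"]
```

This is also why the wrapper and the token are classes: only reference types have a deinit to hook into.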

We also need to update our Car to give our tokens the callback to remove the subscription.

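final class Car {
  func add(observer: @escaping SpeedUpdateHandler) -> ObserverToken {
    let wrapper = SpeedUpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    // Capture self weakly so an outstanding token does not keep the car alive.
    // removeObserver(token:) now takes the wrapper's String identity.
    return ObserverToken(unsubscribe: { [weak self] in
      self?.removeObserver(token: wrapper.identity)
    })
  }
  //...
}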

Making your reactive component reusable

Now that we know how to create an observable object and how to manage our subscriptions to events, we can leverage this knowledge to create a way to easily add observable events to any class or struct.

We are going to create a generic implementation in our reactive framework that is going to be somewhat like a middle man, where we register our observers and also post events to be pushed to those observers.

typealias UpdateHandler<T> = (T) -> ()

class UpdateHandlerWrapper<T> {
  let identity: String = UUID().uuidString
  let observer: UpdateHandler<T>

  init(wrapping wrapped: @escaping UpdateHandler<T>) {
    self.observer = wrapped
  }
}

final class ObservableEvent<T> {
  private var observers: [UpdateHandlerWrapper<T>] = [] // List of registered observers
  func post(update: T) {
    for wrapper in observers { // Notify all observers of the change
      wrapper.observer(update)
    }
  }

  func add(observer: @escaping UpdateHandler<T>) -> ObserverToken {
    let wrapper = UpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    // Capture self weakly so a live token does not keep the event alive
    return ObserverToken(unsubscribe: { [weak self] in
      self?.removeObserver(token: wrapper.identity)
    })
  }

  private func removeObserver(token: String) {
    observers.removeAll { $0.identity == token }
  }
}
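Because the component is generic, it is not tied to speeds or cars. The sketch below reuses it for String messages; it repeats the definitions above (with a weak capture of self in the unsubscribe closure, which is my own choice here) so that it runs on its own. The messages and received names are hypothetical.

```swift
import Foundation

typealias UpdateHandler<T> = (T) -> ()

final class ObserverToken {
  let unsubscribe: () -> ()
  init(unsubscribe: @escaping () -> ()) { self.unsubscribe = unsubscribe }
  deinit { unsubscribe() }
}

final class UpdateHandlerWrapper<T> {
  let identity: String = UUID().uuidString
  let observer: UpdateHandler<T>

  init(wrapping wrapped: @escaping UpdateHandler<T>) {
    self.observer = wrapped
  }
}

final class ObservableEvent<T> {
  private var observers: [UpdateHandlerWrapper<T>] = []

  func post(update: T) {
    for wrapper in observers { wrapper.observer(update) }
  }

  func add(observer: @escaping UpdateHandler<T>) -> ObserverToken {
    let wrapper = UpdateHandlerWrapper(wrapping: observer)
    observers.append(wrapper)
    // Weak capture: the token should not keep the event alive
    return ObserverToken(unsubscribe: { [weak self] in
      self?.removeObserver(token: wrapper.identity)
    })
  }

  private func removeObserver(token: String) {
    observers.removeAll { $0.identity == token }
  }
}

let messages = ObservableEvent<String>() // Same component, different payload type
var received: [String] = []

var token: ObserverToken? = messages.add { message in received.append(message) }
messages.post(update: "hello")   // delivered
token = nil                      // releasing the token unsubscribes
messages.post(update: "ignored") // not delivered

print(received) // ["hello"]
```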

And here is how our Car class looks when we use the component we just created for notifying speed updates:

final class Car {
  let speedUpdates = ObservableEvent<Float>() // The generic parameter must be spelled out
  var speed: Float = 0 {
    didSet { speedUpdates.post(update: speed) }
  }

  func accelerate(amount: Float) {
    speed += amount
  }
}

Last but not least, here is an example of how we’d put it all together to implement our Speedometer.

class Speedometer {
  func show(speed: Float) {
    print("New speed \(speed)")
  }

  var observerToken: ObserverToken!

  init(car: Car) {
    self.observerToken = car.speedUpdates.add { [weak self] speed in
      self?.show(speed: speed)
    }
  }
}

let car = Car()

var speedometer: Speedometer? = Speedometer(car: car)

car.accelerate(amount: 10) // calls to method that triggers updates
car.accelerate(amount: 20)
speedometer = nil // deallocating speedometer removes observer
car.accelerate(amount: 30) // new updates do NOT get printed

/*
  // CONSOLE OUTPUT:
  New speed 10.0
  New speed 30.0
  */

Conclusion

We just created a generic implementation of an observable. This is pretty much the most basic component of libraries such as RxSwift, although it is still a long way from a full-featured framework. Even so, you should now have a better understanding of the basic machinery behind it.

You can find a working playground of the reactive framework with all the code above in my GitHub repo here.

In the next post of this series about reactive programming frameworks, we’ll use the ObservableEvent to create an ObservableProperty and create basic operators to interact with it. Meanwhile, if you have any feedback or I missed anything, please leave a comment below.