Understanding Connectable Observable Sequences in RxSwift

Apr 9, 2019

[Editor’s note: This is the first of two posts on sharing subscriptions in RxSwift, to help developers learn how to use the replay and share operators with RxSwift’s playground examples.]

Whether you’re a rookie or a wizard in RxSwift, you will eventually forget (or find out) that the chain of operators is re-executed with each new subscription. Consider the following code:

import Foundation
import RxSwift

let observable = Observable<Int>.create { observer -> Disposable in
  // Simulating a very resource-expensive operation
  print("Creating observable...")
  DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 10, execute: {
    observer.onNext(1)
    observer.onCompleted()
  })
  return Disposables.create()
}

let subscriptionA = observable.subscribe()
let subscriptionB = observable.subscribe()

Listing 1: Example observable simulating a resource-expensive operation.

If you run the snippet above, you will see that “Creating observable...” is printed twice! That’s because the closure passed to the create method is executed once per subscription, and the last two lines of the example subscribe twice.
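To see the re-execution without RxSwift or a 10-second wait, here is a minimal sketch in plain Swift. ColdSource is a hypothetical toy type, not part of RxSwift; it assumes that a “cold” source is just a stored producer closure that runs once per subscribe call, which is the behavior Listing 1 demonstrates.

```swift
// ColdSource is a hypothetical toy type, not RxSwift API. It stores a
// producer closure and runs it again for every subscriber, like the
// closure passed to Observable.create in Listing 1.
struct ColdSource<Element> {
    let producer: (@escaping (Element) -> Void) -> Void

    func subscribe(_ onNext: @escaping (Element) -> Void) {
        producer(onNext) // the producer runs once per subscribe call
    }
}

var creationCount = 0

let source = ColdSource<Int> { emit in
    creationCount += 1 // stands in for print("Creating observable...")
    emit(1)
}

var received: [String] = []
source.subscribe { received.append("A -> \($0)") }
source.subscribe { received.append("B -> \($0)") }

print(creationCount) // 2: the producer ran once for each subscription
print(received)      // ["A -> 1", "B -> 1"]
```

If the producer were a network request, creationCount would be the number of requests sent.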

This is a contrived example, but imagine that the operation inside the create closure is a network request. You would be performing the same request multiple times! This is where the share operator comes into play. Applying it makes all subscribers share a single subscription, so the observable isn’t recreated and the chain of operators (if any) isn’t re-executed for each one.

let sharedSubscription = observable.share()
let subscriptionA = sharedSubscription.subscribe()
let subscriptionB = sharedSubscription.subscribe()

Listing 2: Basic usage of share operator.

Seems pretty easy, right? It is indeed, but things get complicated as we start to use share, replay, share with replay, share with scope, etc. Suddenly, you find yourself totally confused about how to use all of these operators properly.

To help overcome the confusion, I’ll explain how these operators work and how they should be used. The code I’m sharing is mostly an adaptation of the code found in the Connectable Operators playground within the RxSwift repo.

This first post focuses on the auxiliary functions used to perform the analyses and on the publish, replay and refCount operators. Those operators are fundamental for understanding the share operator, which is the focus of the second post.

These posts are written for developers with at least a basic understanding of the Swift 5.0 language and the RxSwift 4.5 library. Let’s get started!

Explaining the structure of code examples

Before going too far, I first need to explain how the code examples are structured:

_ = printSecondBoundary()
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

_ = intSequence.printNext("Subscription A")
delay(2) { _ = intSequence.printNext("Subscription B") }
delay(4) { _ = intSequence.printNext("Subscription C") }

Listing 3: Basic structure of the code studied in this series.

The interval operator is used to create intSequence, an infinite sequence of Ints that starts at 0 and emits a new value every second. Since I will perform time-based analysis, keep in mind that intSequence takes 1 second to emit its first value.

And because many subscriptions to intSequence are required, I defined the printNext(_:) function within an extension of ObservableType to keep the code concise. It subscribes to an observable and prints each emitted value, prefixed by the tag it receives as a parameter (if any).

extension ObservableType {
  func printNext(_ tag: String? = nil) -> Disposable {
    return subscribe(onNext: { value in
      if let tag = tag {
        print("\(tag) -> \(value)")
      } else {
        print("\(value)")
      }
    })
  }
}

Listing 4: printNext function, syntactic sugar for subscribing and printing emitted values to the console.

RxSwift’s playground also provides the delay(_:closure:) function, which executes the given closure after the given amount of time. Since both this function and intSequence are time-based, the relative order of the closure’s execution and intSequence’s emissions may vary between runs. To keep the output consistent, I added an offset that makes the closure execute slightly before intSequence emits a value. The result is the delay(_:offset:closure:) function depicted in Listing 5.

func delay(_ delay: Double, offset: Double = 0.1, closure: @escaping () -> Void) {
  DispatchQueue.main.asyncAfter(deadline: .now() + delay - offset) {
    closure()
  }
}

Listing 5: Modified delay function, which receives an additional offset parameter.

To help visualize the output, I also added the printSecondBoundary() function, which groups the logs that happen between one second and the next. To build that function, I used another interval operator delayed by 0.1 seconds, so that each boundary is printed just after intSequence emits a value. Listing 6 shows the definition of that function and Output 1 presents example logs obtained with its usage.

func printSecondBoundary() -> Disposable {
  return Observable
    .interval(1, scheduler: MainScheduler.instance)
    .delay(0.1, scheduler: MainScheduler.instance)
    .subscribe(onNext: { index in
      print("----- t\(index + 1) -----\n")
    })
}

Listing 6: printSecondBoundary function, for time grouping event logs.

This happened at some point between second 0 and 1
----- t1 -----

This happened at some point between second 1 and 2
This happened at some point between second 1 and 2
----- t2 -----

This happened at some point between second 2 and 3
----- t3 -----

----- t4 -----

...

Output 1: Example output of using printSecondBoundary function.

To make things crystal clear, let’s briefly inspect the behavior of these auxiliary functions together. intSequence emits values at one-second intervals (e.g. 1.0, 2.0, etc.), while the delay function dispatches code to execute just before (e.g. 0.9, 1.9, etc.) and printSecondBoundary() closes the boundary just after (e.g. 1.1, 2.1, etc.) each interval. Output 2 depicts this behavior.

0.9 -> subscriptions (printNext) usually happen here due to the delay function
1.0 -> intSequence emits a value
1.1 -> printSecondBoundary() closes the boundary (i.e. ----- t1 -----)

Output 2: Behavior of the time-based auxiliary functions.

In this series, the subscriptions mostly take place inside a delay function. Therefore, keep in mind that a subscription delayed by T actually happens slightly before T.
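The ordering in Output 2 can also be reproduced with plain arithmetic. The sketch below uses no RxSwift and no timers; TimedEvent and timeline(seconds:subscriptionDelays:offset:) are hypothetical helpers that place each event at the time the auxiliary functions would schedule it (emissions at whole seconds, boundaries 0.1 s later, delayed closures 0.1 s early) and then sort them.

```swift
import Foundation

// Hypothetical helpers (not part of RxSwift or its playground). They only
// compute when each auxiliary function would fire and sort the results.
struct TimedEvent {
    let time: Double
    let label: String
}

func timeline(seconds: Int, subscriptionDelays: [Double], offset: Double = 0.1) -> [TimedEvent] {
    var events: [TimedEvent] = []
    for value in 0..<seconds {
        // interval(1) emits `value` at t = value + 1
        let emitTime = Double(value + 1)
        events.append(TimedEvent(time: emitTime, label: "intSequence emits \(value)"))
        // printSecondBoundary() fires 0.1 s after each emission
        events.append(TimedEvent(time: emitTime + 0.1, label: "----- t\(value + 1) -----"))
    }
    for delay in subscriptionDelays {
        // delay(_:offset:closure:) runs its closure `offset` seconds early
        events.append(TimedEvent(time: delay - offset, label: "closure scheduled with delay(\(delay))"))
    }
    return events.sorted { $0.time < $1.time }
}

for event in timeline(seconds: 3, subscriptionDelays: [2]) {
    print(String(format: "%.1f", event.time), event.label)
}
```

The closure scheduled with delay(2) lands at 1.9, after the t1 boundary (1.1) and before the emission at 2.0, which is the ordering Output 2 describes.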

Connectable Observable sequences

Now that we’ve covered the auxiliary functions, there’s only one topic left before having fun with share: Connectable Observable sequences. As defined by the RxSwift community:

“Connectable Observable sequences resemble ordinary Observable sequences, except that they do not begin emitting elements when subscribed to, but instead, only when their connect() method is called. In this way, you can wait for all intended subscribers to subscribe to a connectable Observable sequence before it begins emitting elements.”

Conceptually, that’s pretty self-explanatory. But let’s see what that really looks like in practice with some examples using publish, replay, and refCount operators.

Creating Connectable Observable sequences

The publish operator wraps a standard Observable into a ConnectableObservable. As per the definition, this wrapper is an Observable that starts emitting values only after its connect() method is called. That behavior is shown in Listing 7 and Output 3 below:

_ = printSecondBoundary()
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .publish()

_ = intSequence.printNext("Subscription A")

delay(2) { _ = intSequence.connect() }
delay(4) { _ = intSequence.printNext("Subscription B") }

Listing 7: Example of the publish operator.

----- t1 -----

----- t2 -----

Subscription A -> 0
----- t3 -----

Subscription A -> 1
Subscription B -> 1
----- t4 -----

Subscription A -> 2
Subscription B -> 2
----- t5 -----

Output 3: Logs of Listing 7.

The first thing to notice here is that, despite A subscribing at t0 (before any delay), no values are printed until t3. That’s because values are emitted only after connecting, which happens at t2.

Second, notice that when B subscribes at t4, it receives the same value as A right away. What happens here is that the publish operator turns the original sequence into a single shared sequence (similar to the share operator we’ll see in Part 2), which started when we connected at t2. Also, remember that discussion about the delay function? B subscribes slightly before the next value (1) is emitted, which is why it prints in the t4 group.

There can only be zero or one shared subscription at any given time, never more. Two or more subscribers actually share that single subscription.
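The publish and connect() behavior can be modeled with a small synchronous sketch. ToyPublish is a hypothetical class, not RxSwift’s ConnectableObservable, and its tick() method is an assumption standing in for one second of interval(1). It illustrates three points: subscribing only registers an observer, nothing flows until connect(), and every observer receives the same values from the one underlying sequence, so a late subscriber only sees what comes after it joins.

```swift
// ToyPublish is a hypothetical, synchronous model of publish()/connect(),
// not RxSwift API. tick() stands in for one second of interval(1).
final class ToyPublish {
    private var observers: [(Int) -> Void] = []
    private var isConnected = false
    private var nextValue = 0

    func subscribe(_ onNext: @escaping (Int) -> Void) {
        observers.append(onNext) // subscribing alone starts nothing
    }

    func connect() {
        isConnected = true // only now does the underlying sequence run
    }

    func tick() {
        guard isConnected else { return } // not connected yet: nothing is emitted
        let value = nextValue
        nextValue += 1
        observers.forEach { $0(value) } // one value, delivered to every observer
    }
}

var received: [String] = []
let published = ToyPublish()
published.subscribe { received.append("A -> \($0)") } // A subscribes at t0
published.tick()                                      // ignored: not connected
published.connect()                                   // like delay(2) { connect() }
published.tick()                                      // A -> 0
published.subscribe { received.append("B -> \($0)") } // B joins later
published.tick()                                      // A -> 1, B -> 1
print(received) // ["A -> 0", "A -> 1", "B -> 1"]
```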

Also, did you notice that connect() has a return value I just discarded? Connecting a ConnectableObservable actually means subscribing to the underlying Observable it wraps. Thus, that return value is just a Disposable, which we can use for disconnecting (disposing of the underlying subscription). Let’s play with that:

_ = printSecondBoundary()
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .debug("Subscription S")
  .publish()

let subscriptionA = intSequence.debug("Subscription A").subscribe()

var subscriptionS: Disposable!
delay(2) { subscriptionS = intSequence.connect() }
delay(4) { subscriptionS.dispose() }
delay(6) { _ = intSequence.connect() }

Listing 8: Example of disposing the underlying subscription of a ConnectableObservable.

2019-03-05 15:15:13.575: Subscription A -> subscribed
----- t1 -----

2019-03-05 15:15:15.555: Subscription S -> subscribed
----- t2 -----

2019-03-05 15:15:16.557: Subscription S -> Event next(0)
2019-03-05 15:15:16.558: Subscription A -> Event next(0)
----- t3 -----

2019-03-05 15:15:17.516: Subscription S -> isDisposed
----- t4 -----

----- t5 -----

2019-03-05 15:15:19.515: Subscription S -> subscribed
----- t6 -----

2019-03-05 15:15:20.516: Subscription S -> Event next(0)
----- t7 -----

Output 4: Logs from execution of Listing 8.

In Listing 8, I am using the debug operator instead of the printNext(_:) function, mainly to verify the behavior of the underlying subscription, which I called S. Notice that S is subscribed to when connect() is called. After that, every value emitted by S is passed along to A (which subscribes to the wrapping ConnectableObservable).

Moreover, when S is disposed at t4, both S and A stop printing values. Notice, however, that A itself is not disposed. Also, when S is subscribed to again at t6, it starts receiving values again at t7 (restarting from 0), but A doesn’t. Pay extra attention to this, or you will wind up with a subscription in limbo.
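Here is a plain-Swift sketch of that limbo effect. DisconnectableToyPublish and ObserverBox are hypothetical and much simpler than RxSwift; connect() returns a closure that plays the role of the Disposable. The model assumes that disposing the connection also throws away the box holding the downstream observers, so a later connect() starts a fresh underlying sequence that earlier observers never hear from. That assumption was chosen to reproduce Output 4, not copied from RxSwift’s source.

```swift
// DisconnectableToyPublish is a hypothetical model, not RxSwift API.
// Assumption chosen to reproduce Output 4: disposing the connection also
// discards the box holding the downstream observers.
final class ObserverBox {
    var observers: [(Int) -> Void] = []
}

final class DisconnectableToyPublish {
    private var box = ObserverBox()
    private var isConnected = false
    private var nextValue = 0
    private(set) var sourceSubscriptions = 0 // like "Subscription S -> subscribed"
    private(set) var sourceLog: [Int] = []   // values emitted by the underlying sequence

    func subscribe(_ onNext: @escaping (Int) -> Void) {
        box.observers.append(onNext)
    }

    /// The returned closure plays the role of the Disposable from connect().
    func connect() -> () -> Void {
        isConnected = true
        nextValue = 0 // a fresh subscription to interval(1) starts at 0
        sourceSubscriptions += 1
        return { [weak self] in
            guard let self = self else { return }
            self.isConnected = false
            self.box = ObserverBox() // earlier observers become unreachable
        }
    }

    /// One tick of the underlying interval(1).
    func tick() {
        guard isConnected else { return }
        let value = nextValue
        nextValue += 1
        sourceLog.append(value)
        box.observers.forEach { $0(value) }
    }
}

var receivedByA: [Int] = []
let connectable = DisconnectableToyPublish()
connectable.subscribe { receivedByA.append($0) } // A subscribes at t0

let disposeS = connectable.connect() // delay(2): S subscribed
connectable.tick()                   // S and A get 0
disposeS()                           // delay(4): S disposed
connectable.tick()                   // nothing: disconnected
_ = connectable.connect()            // delay(6): S subscribed again
connectable.tick()                   // S gets 0 again; A gets nothing

print(receivedByA)                   // [0]
print(connectable.sourceLog)         // [0, 0]
```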

Buffering elements in a Connectable Observable sequence

The replay operator is very similar to publish. The difference is that it keeps a buffer of the most recently emitted elements and replays them to new subscribers. Let’s look at an example followed by its output:

_ = printSecondBoundary()
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(1)

_ = intSequence.printNext("Subscription A")
delay(2) { _ = intSequence.connect() }
delay(4) { _ = intSequence.printNext("Subscription B") }

Listing 9: Example of the replay operator.

----- t1 -----

----- t2 -----

Subscription A -> 0
----- t3 -----

Subscription B -> 0
Subscription A -> 1
Subscription B -> 1
----- t4 -----

Subscription A -> 2
Subscription B -> 2
----- t5 -----

Output 5: Logs from execution of Listing 9.

Note that both the code example and the output are very similar to the ones for the publish operator (Listing 7 and Output 3). The only difference is in the t4 group, where B additionally prints 0. That’s because the replay operator buffered the last emitted element, 0, and re-emitted it upon B’s subscription. After that, A and B print the same values, since they’re sharing the same single subscription.
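Since a buffer is the main ingredient replay adds on top of publish, a sketch needs only a few more lines than a publish model. ToyReplay is a hypothetical plain-Swift model, not RxSwift’s implementation: it keeps the last bufferSize values and hands them to each new subscriber before any live values. With a buffer size of 1 it reproduces the order of the values in Output 5.

```swift
// ToyReplay is a hypothetical, synchronous model of replay(_:), not RxSwift
// API. tick() stands in for one second of interval(1).
final class ToyReplay {
    private let bufferSize: Int
    private var buffer: [Int] = []
    private var observers: [(Int) -> Void] = []
    private var isConnected = false
    private var nextValue = 0

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    func subscribe(_ onNext: @escaping (Int) -> Void) {
        buffer.forEach(onNext) // replay buffered values first
        observers.append(onNext)
    }

    func connect() {
        isConnected = true
    }

    func tick() {
        guard isConnected else { return }
        let value = nextValue
        nextValue += 1
        buffer.append(value)
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize) // keep only the latest values
        }
        observers.forEach { $0(value) }
    }
}

var received: [String] = []
let replayed = ToyReplay(bufferSize: 1)
replayed.subscribe { received.append("A -> \($0)") } // A subscribes at t0
replayed.connect()                                   // like delay(2) { connect() }
replayed.tick()                                      // A -> 0 (0 is also buffered)
replayed.subscribe { received.append("B -> \($0)") } // B -> 0, replayed at once
replayed.tick()                                      // A -> 1, B -> 1
print(received) // ["A -> 0", "B -> 0", "A -> 1", "B -> 1"]
```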

There is much more to understand about the replay operator. For example, it is sometimes confused with the share(replay:scope:) operator, but I’ll save that disambiguation and other details for the second part of this series.

Tracking subscriptions in Connectable Observable sequences

There is also an operator called refCount, which performs roughly the opposite operation of publish: it turns a ConnectableObservable back into a standard Observable. It also tracks the number of subscribers to decide whether to connect or disconnect the underlying connectable observable. The rule is straightforward:

If the number of subscribers changes from 0 to 1, it connects. If that number changes from 1 to 0, it disconnects. In any other case, it simply shares the existing subscription.
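The counting rule is small enough to write down directly. ToyRefCount is a hypothetical sketch of the bookkeeping only (no values are emitted), not RxSwift’s RefCount type: connect on the 0 -> 1 transition, disconnect on the 1 -> 0 transition, and otherwise keep the existing connection.

```swift
// ToyRefCount is a hypothetical model of refCount's bookkeeping, not RxSwift API.
final class ToyRefCount {
    private(set) var subscriberCount = 0
    private(set) var isConnected = false
    private(set) var connectCount = 0

    func subscribe() {
        subscriberCount += 1
        if subscriberCount == 1 { // 0 -> 1: connect
            isConnected = true
            connectCount += 1
        }
    }

    func dispose() {
        precondition(subscriberCount > 0, "dispose() without a matching subscribe()")
        subscriberCount -= 1
        if subscriberCount == 0 { // 1 -> 0: disconnect
            isConnected = false
        }
    }
}

let refCounted = ToyRefCount()
refCounted.subscribe() // A: 0 -> 1, connects
refCounted.subscribe() // B: 1 -> 2, shares the connection
refCounted.dispose()   // A leaves: 2 -> 1, still connected
refCounted.dispose()   // B leaves: 1 -> 0, disconnects
refCounted.subscribe() // C: 0 -> 1, connects again
print(refCounted.connectCount) // 2
```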

Let’s take a look at Listing 10 and Output 6 to better understand this rule:

_ = printSecondBoundary()
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .publish().refCount()

let subscriptionA = intSequence.printNext("Subscription A")
var subscriptionB: Disposable!

delay(2) { subscriptionB = intSequence.printNext("Subscription B") }
delay(4) { subscriptionA.dispose() }
delay(6) { subscriptionB.dispose() }
delay(8) { _ = intSequence.printNext("Subscription C") }

Listing 10: Example of refCount operator.

Subscription A -> 0
----- t1 -----

Subscription A -> 1
Subscription B -> 1
----- t2 -----

Subscription A -> 2
Subscription B -> 2
----- t3 -----

Subscription B -> 3
----- t4 -----

Subscription B -> 4
----- t5 -----

----- t6 -----

----- t7 -----

----- t8 -----

Subscription C -> 0
----- t9 -----

Output 6: Logs from execution of Listing 10.

Here we have three subscriptions. First, A subscribes at t0. At this point, refCount’s internal counter goes from 0 to 1, causing the connection of the underlying connectable observable and the actual creation of the sequence. A then starts printing values at t1.

At t2, B subscribes. Now the counter goes from 1 to 2, so the previously created shared subscription is reused. A and B now print the same values.

By t6, both A and B are disposed, causing the counter to reach 0 again. Thus, the underlying connectable observable is disconnected. Notice that when C subscribes at t8, it starts printing from 0 since the Connectable Observable had to be reconnected and the sequence recreated.
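To see why C restarts from 0, the sketch below combines the counting rule with a manual tick() standing in for interval(1). ToySharedInterval and its token-based dispose(_:) are hypothetical plain-Swift stand-ins for publish().refCount() and Disposable, not RxSwift API; the order of the calls mirrors the delays in Listing 10.

```swift
// ToySharedInterval is a hypothetical model of publish().refCount() on
// interval(1), not RxSwift API. An empty observer dictionary means "disconnected".
final class ToySharedInterval {
    private var observers: [Int: (Int) -> Void] = [:]
    private var nextToken = 0
    private var nextValue = 0

    /// Returns a token that is later passed to dispose(_:).
    func subscribe(_ onNext: @escaping (Int) -> Void) -> Int {
        if observers.isEmpty {
            nextValue = 0 // 0 -> 1 subscribers: (re)connect, a new interval starts at 0
        }
        let token = nextToken
        nextToken += 1
        observers[token] = onNext
        return token
    }

    func dispose(_ token: Int) {
        observers[token] = nil // removing the last observer disconnects
    }

    func tick() {
        guard !observers.isEmpty else { return } // disconnected: nothing runs
        let value = nextValue
        nextValue += 1
        for token in observers.keys.sorted() { // subscription order keeps the log deterministic
            observers[token]?(value)
        }
    }
}

var received: [String] = []
let shared = ToySharedInterval()
let a = shared.subscribe { received.append("A -> \($0)") } // t0
shared.tick()                                              // A -> 0
let b = shared.subscribe { received.append("B -> \($0)") } // delay(2)
shared.tick()                                              // A -> 1, B -> 1
shared.tick()                                              // A -> 2, B -> 2
shared.dispose(a)                                          // delay(4)
shared.tick()                                              // B -> 3
shared.tick()                                              // B -> 4
shared.dispose(b)                                          // delay(6): count drops to 0
shared.tick()                                              // nothing
shared.tick()                                              // nothing
_ = shared.subscribe { received.append("C -> \($0)") }     // delay(8): reconnects
shared.tick()                                              // C -> 0
print(received)
```

The resulting log has the same values as Output 6: A and B share values while either one is subscribed, and C, arriving after the count dropped to 0, triggers a fresh sequence that starts at 0.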

Next up: the share operator

In this blog post, I introduced Connectable Observable sequences and explored in detail the behavior of the publish, replay and refCount operators by performing time-based analysis on code adapted from RxSwift’s playground. There’s also another connectable operator called multicast that I won’t cover in this series because it is not commonly used (and it is less important for understanding shared subscriptions).

In the second post, we will use the same connectable operators and auxiliary functions to look in-depth at the share operator. I’ll explain its key features and how it differs from the replay operator. Make sure to follow us on Twitter, Facebook and LinkedIn to find out when my next post is published!