RxJS Error and Completed events demystified

Motivation

RxJS documentation is huge and relatively hard to grasp. Some entry-level stuff is just omitted as "obvious". But neither error nor completion handling can be called such in RxJS. One could imagine a dozen of possible behaviors satisfying the same terminology.

So if terms are not self-speaking we should build a mental model of the RxJS behavior. And we will do that by some sort of mental reverse-engineering.

We're going to rediscover design decisions on which RxJS were built upon and which honestly should be listed in docs in plain text.

Introduction

To be short I will call Stream an instance of Observable or Subject where both behave equally. Take Observable and Subject (without quotes) as instances of corresponding classes.

It's useful to view Stream as Promise relative.

According to docs Promise can be either pending or settled. Settled Promise is either fullfilled or rejected.

By analogy, Stream can be either "passive" (Cold), "active" (Hot and Cold) or "settled" (Hot and Cold). Settled Stream can be either "completed" or "failed".

It means that onError and onCompleted events are mututally exclusive. Each one of them settles Stream immediately.

According to my tests, there seems to be no difference on "settlement" behavior between Hot and Cold Observables. So I will omit those tests for brevity.

Here and below phrase "Stream completes" should be read as "Stream emits onCompleted event to it's Observers". Phrase "Stream fails" should be read as "Stream emits onError event to it's Observers".

Just as Promise, Stream can be terminated without error or completion if program terminates or by different reasons. Keep that in mind as it may be counter-intuitive at start.

Here and below I assume that every chunk of code we test should start with

let {Observable, Subject} = require("rx")

function Listener(name) {
  return {
    onNext: function (v) {
      console.log(name + ".onNext:", v)
    },
    onError: function (err) {
      console.log(name + ".onError:", err.message)
    },
    onCompleted: function () {
      console.log(name + ".onCompleted!")
    },
  }
}

Completion

Experiments

1. Finite Observable completes naturally.

let obs = Observable.of(1, 2)
obs.subscribe(Listener("obs"))
obs.onNext: 1
obs.onNext: 2
obs.onCompleted!
(exit)

2. Infinite Observable never completes. Termination by Ctrl-C or process.exit does not settle Observable.

let obs = Observable.interval(100)
obs.subscribe(Listener("obs"))
setTimeout(process.exit, 500)
obs.onNext: 1
obs.onNext: 2
^C
obs.onNext: 1
obs.onNext: 2
obs.onNext: 3
obs.onNext: 4
(exit)

3. Subject does not block process from natural exit. Natural exit does not settle Subject.

let subj = new Subject()
subj.subscribe(Listener("subj"))
(exit)

4. Subject can be manually completed.

let subj = new Subject()
subj.subscribe(Listener("subj"))
setTimeout(() => subj.onCompleted(), 500)
subj.onCompleted!

5. Completion settles Observable.

Remark: dunno how to demonstrate

6. Completion settles Subject.

let subj = new Subject()
subj.subscribe(Listener("obs"))
subj.onCompleted()
subj.onNext(1) // no effect
obs.onCompleted!

7. Completion propagates downstream from Observable to Observable.

let obs1 = Observable.of(1, 2)
let obs2 = obs1.map(x => x)
obs2.subscribe(Listener("obs"))
obs.onNext: 1
obs.onNext: 2
obs.onCompleted!

8. Completion propagates downstream from Subject to Subject.

let subj1 = new Subject()
let subj2 = new Subject()
subj1.subscribe(subj2)
subj2.subscribe(Listener("subj2"))
(exit)
let subj1 = new Subject()
let subj2 = new Subject()
subj1.subscribe(subj2)
subj2.subscribe(Listener("subj2"))
subj1.onCompleted()
subj2.onCompleted!

9. Completion propagates downstream from Subject to Observable.

let subj = new Subject()
let obs = subj.map(x => x)
obs.subscribe(Listener("obs"))
(exit)
let subj = new Subject()
let obs = subj.map(x => x)
obs.subscribe(Listener("obs"))
subj.onCompleted()
obs.onCompleted!

10. Completion propagates downstream from Observable to Subject.

let obs = Observable.of(1, 2)
let subj = new Subject()
subj.subscribe(Listener("subj")) // order of subscription
obs.subscribe(subj)              // matters here!
subj.onNext: 1
subj.onNext: 2
subj.onCompleted!

11. Subject subscribed to infinite Observable never completes. Interruption by Ctrl-C or process.exit does not settle Subject.

let obs = Observable.interval(100)
let subj = new Subject()
subj.subscribe(Listener("subj")) // order of subscriptions
obs.subscribe(subj)              // matters here!
subj.onNext: 0
subj.onNext: 1
subj.onNext: 2
^C

12. Error does not complete Observable.

let obs = Observable.throw(Error("foo"))
obs.subscribe(Listener("obs"))
obs.onError: foo

13. Disposal does not settle Observable.

let obs = Observable.interval(100)
let subs = obs.subscribe(Listener("obs"))
setTimeout(() => subs.dispose(), 500)
obs.onNext: 0
obs.onNext: 1
obs.onNext: 2
obs.onNext: 3

Rules

1. Completion should not be taken as "event at the end of the program" or something. It's an event which may happen or not.

Three possible ways for Stream to complete:

Process termination / subscriber disposal may leave Stream unsettled.

2. Completion always terminate Stream. Nothing happens in a Stream after completion.

3. Completion always propagate downstream. Observable completes if/when upstream Observable or Subject completes. Subject completes if/when upstream Observable or Subject completes.

4. There is no special difference between Hot and Cold Observable on completion handling.

Error

Here and below I assume that every chunk of code we test should start with

// ... (put Completion starter here) ...

function brokenMapper(v) {
  if (v == 2) {
    throw Error("oops")
  } else {
    return v
  }
}

function request(url) {
  if (url == "url1") {
    return Observable.throw("foo")
  } else {
    return Observable.just("[response for " + url + "]")
  }
}

Experiments

1. Uncaught Error fails finite Observable.

let obs = Observable.of(1, 2, 3).map(brokenMapper)
obs.subscribe(Listener("obs"))
obs.onNext: 1
obs.onError: oops

2. Caught Error completes finite Observable.

let obs = Observable.of(1, 2, 3).map(brokenMapper).catch(Observable.just("foo"))
obs.subscribe(Listener("obs"))
obs.onNext: 1
obs.onNext: foo
obs.onCompleted!

3. Uncaught Error fails infinite Observable.

let obs = Observable.interval(100).map(brokenMapper)
obs.subscribe(Listener("obs"))
obs.onNext: 1
obs.onError: oops

4. Caught Error completes infinite Observable.

let obs = Observable.interval(100).map(brokenMapper).catch(Observable.just("foo"))
obs.subscribe(Listener("obs"))
obs.onNext: 0
obs.onNext: 1
obs.onNext: foo
obs.onCompleted!

5. Uncaught Error fails Subject.

let subj = new Subject()
subj.subscribe(Listener("subj"))
subj.onError(Error("oops"))
subj.onNext(1) // no effect
subj.onError: oops

6. No way to catch Error in Subject with basic constructor.

7. Error propagates downstream from Observable to Observable.

let obs1 = Observable.interval(100)
let obs2 = obs1.map(brokenMapper)
obs2.subscribe((Listener("obs2")))
obs2.onNext: 0
obs2.onNext: 1
obs2.onError: oops

8. Error propagates downstream from Subject to Subject.

let subj1 = new Subject()
let subj2 = new Subject()
subj1.subscribe(subj2)
subj2.subscribe(Listener("subj2"))
subj1.onError(Error("oops"))
subj1.onNext(1) // no effect
subj2.onError: oops

9. Error propagates downstream from Observable to Subject.

let obs = Observable.interval(100).map(brokenMapper)
let subj = new Subject()
subj.subscribe(Listener("subj")) // order of subscription
obs.subscribe(subj)              // matters here!
subj.onNext: 0
subj.onNext: 1
subj.onError: oops

10. Error propagates downstream from Subject to Observable.

let subj = new Subject()
let obs = subj.map(x => x)
obs.subscribe((Listener("obs")))
subj.onError(Error("oops"))
subj.onNext(1) // no effect
obs.onError: oops

Read more about different catch options here.

11. Subscription to failed Stream fail

let subj = new Subject()

subj.subscribe(console.log, console.log, () => console.log("end"))
subj.onError(Error("boom!"))

subj.subscribe(console.log) // throws
subj.onNext("foo")
Error: boom!

Rules

1. Just as Completion, Error is an event which may happen or not.
The main difference is that Error can possibly be caught.

Three possible ways for Stream to fail:

Process termination / subscriber disposal may leave Stream unsettled.

2. Uncaught Error always terminate Stream. Nothing happens in a Stream after uncaught error.

3. There are multiple options to catch errors, retry, etc. deserving a separate article.

4. Error always propagate downstream. Observable fails if/when upstream Observable or Subject fails. Subject fails if/when upstream Observable or Subject fails.

5. There is no special difference between hot and cold Observable on error handling.

6. There is no special difference between finite and infinite Observables on error handling.

Conclusion

Error and Completion events behave similarly and can be described with a little set of rules. This is a sign of a good design. They are mutually exclusive events which mean only one or none of them can happen until program terminates. The most productive step to understand behavior of Observable and Subject on errors and completions is to draw an analogy between them and Promises until proper specification comes.