Javascript is required
Mokkapps LogoMichael Hoffmann

How I Write Marble Tests For RxJS Observables In Angular

Michael Hoffmann (Mokkapps) - Senior Frontend Developer (Freelancer)
Michael Hoffmann
Nov 12, 2018
9 min read
How I Write Marble Tests For RxJS Observables In Angular Image

I am a passionate Reactive Extensions user and mostly use them in RxJS, which is integrated into the Angular framework.

In Angular, I often use observables in my services and need to write tests for these asynchronous data streams.

Unfortunately, testing observables is hard, and to be honest, I often need more time to write unit tests for my streams than to implement them in the production code itself. But luckily, there exists an integrated solution for RxJS which helps write this kind of test: the so-called marble tests.

Marble testing is not difficult if you are already familiar with representing asynchronous data streams as marble diagrams. In this blog post, I want to introduce you to the concept of marble diagrams, the basics of marble testing, and examples of how I use them in my projects.

I will not provide a general introduction to RxJS in this article, but I can highly recommend this article to refresh the basics.

Marble Testing

There exists an official documentation about marble testing for RxJS users but it can be tough to get started since there were a lot of changes from v5 to v6. Therefore I want to start by explaining the basics, show an exemplary Angular test implementation and in the end, talk about some of the new RxJS 6 features.

Marble Diagrams

For easier visualization of RxJS observables, a new domain-specific language called "marble diagram" was introduced.

Marble Diagrams are visual representations of how operators work and include the input Observable(s), the operator and its parameters, and the output Observable.

The following image from the official documentation describes the anatomy of a marble diagram:

Angular Marble Diagram Anatomy

In a marble diagram, time flows to the right, and the diagram describes how values (“marbles”) are emitted on the Observable execution.

Marble Syntax

In RxJS marble tests, the marble diagrams are represented as a string containing a special syntax representing events happening over virtual time. The start of time (also called the zero frame) in any marble string is always represented by the first character in the string.

  • - time: 10 "frames" of time passage.
  • | complete: The successful completion of an observable. This is the observable producer signaling complete().
  • # error: An error terminating the observable. This is the observable producer signaling error().
  • "a" any character: All other characters represent a value being emitted by the producer signaling next().
  • () sync groupings: When multiple events need to be in the same frame synchronously, parentheses are used to group those events. You can group nested values, a completion or an error in this manner. The position of the initial ( determines the time at which its values are emitted.
  • ^ subscription point: (hot observables only) shows the point at which the tested observables will be subscribed to the hot observable. This is the "zero frame" for that observable, every frame before the ^ will be negative.


- or ------: Equivalent to Observable.never(), or an observable that never emits or completes

|: Equivalent to Observable.empty()

#: Equivalent to Observable.throw()

--a--: An observable that waits for 20 "frames", emits value a and then never completes.

--a--b--|: On frame 20 emit a, on frame 50 emit b, and on frame 80, complete

--a--b--#: On frame 20 emit a, on frame 50 emit b, and on frame 80, error

-a-^-b--|: In a hot observable, on frame -20 emit a, then on frame 20 emit b, and on frame 50, complete.

--(abc)-|: on frame 20, emit a, b, and c, then on frame 80 complete

-----(a|): on frame 50, emit a and complete.

A Practical Angular Example

As you now know the theoretical basis, I want to show you a real-world Angular example.

In this GitHub repository, I have implemented a basic test setup which I will now explain in detail. The Angular CLI project consists of these components and services:


This service provides a public getter getUsers(), which returns an Observable that emits a new username each second.

import { Injectable } from '@angular/core'import { Observable, interval } from 'rxjs'import { take, map } from 'rxjs/operators'@Injectable({  providedIn: 'root',})export class UserService {  private readonly testData = ['Anna', 'Bert', 'Chris']  get getUsers(): Observable<string> {    return interval(1000).pipe(      take(this.testData.length),      map((i) => this.testData[i])    )  }}


This service injects the above introduced UserService and provides the public getter getModifiedUsers. This getter also returns an Observable and maps the emitted usernames from userService.getUsers to make them more "mighty".

import { Injectable } from '@angular/core'import { map } from 'rxjs/operators'import { Observable } from 'rxjs'import { UserService } from './user.service'@Injectable({  providedIn: 'root',})export class AllMightyService {  get getModifiedUsers(): Observable<string> {    return this.userService.getUsers.pipe(map((user) => `Mighty ${user}`))  }  constructor(private userService: UserService) {}}


In our app.component.ts, we inject the UserService and update a list each time a new username is emitted from the getUsers Observable.

import { Component, OnDestroy, OnInit } from '@angular/core'import { Subscription } from 'rxjs'import { UserService } from './services/user.service'@Component({  selector: 'app-root',  templateUrl: './app.component.html',  styleUrls: ['./app.component.scss'],})export class AppComponent implements OnInit, OnDestroy {  title = 'MarbleDemo'  users: string[] = []  private subscription: Subscription | undefined  constructor(private userService: UserService) {}  ngOnInit() {    this.subscription = this.userService.getUsers.subscribe((user) => {      this.users.push(user)    })  }  ngOnDestroy() {    if (this.subscription) {      this.subscription.unsubscribe()    }  }}
<div style="text-align:center"><h1>Welcome to {{ title }}!</h1></div><h2>Here will users pop in asynchronously:</h2><ul>  <li class="user" *ngFor="let user of users"><h2>{{user}}</h2></li></ul>

Now we can write different unit tests for this project:

  • Test that the AppComponent shows the correct list of usernames
  • Test that the AllMightyService correctly maps and emits the usernames

Let us start with the unit test for the AppComponent.

In these tests, I am using the npm package jasmine-marbles which is a helper library that provides a neat API for marble tests if you are using jasmine (which is used per default in Angular).

Basic idea is to mock the public observables from the provided services and test our asynchronous data streams in a synchronous way.

We mock the UserService and the getUsers observable. In the test case we flush all observables by calling getTestScheduler().flush(). This means that after this line has been executed our mocked observable has emitted all of its events and we can run our test assertions. I will talk more about the TestScheduler after this example.

import { TestBed, async } from '@angular/core/testing'import { getTestScheduler, cold } from 'jasmine-marbles'import { AppComponent } from './app.component'import { UserService } from './services/user.service'import { By } from '@angular/platform-browser'describe('AppComponent', () => {  let userService: any  beforeEach(async(() => {    // Here we mock the UserService to a cold Observable emitting three names    userService = jasmine.createSpy('UserService')    userService.getUsers = cold('a-b-c', { a: 'Mike', b: 'Flo', c: 'Rolf' })    TestBed.configureTestingModule({      declarations: [AppComponent],      providers: [{ provide: UserService, useValue: userService }],    }).compileComponents()  }))  it('should correctly show all user names', async () => {    const fixture = TestBed.createComponent(AppComponent)    fixture.detectChanges() // trigger change detection    getTestScheduler().flush() // flush the observable    fixture.detectChanges() // trigger change detection again    const liElements = fixture.debugElement.queryAll(By.css('.user'))    expect(liElements.length).toBe(3)    expect(liElements[0].nativeElement.innerText).toBe('Mike')    expect(liElements[1].nativeElement.innerText).toBe('Flo')    expect(liElements[2].nativeElement.innerText).toBe('Rolf')  })})

In the next step, let us analyze a service test, in this case for the AllMightyService.

import { hot, cold } from 'jasmine-marbles'import { TestScheduler } from 'rxjs/testing'import { AllMightyService } from './all-mighty.service'import { fakeAsync } from '@angular/core/testing'describe('AllMightyService', () => {  let sut: AllMightyService  let userService: any  beforeEach(() => {    // we mock the getUsers Observable of the UserService    userService = jasmine.createSpy('UserService')    userService.getUsers = hot('^-a-b-c', {      a: 'Hans',      b: 'Martin',      c: 'Julia',    })    sut = new AllMightyService(userService)  })  it('should be created', () => {    expect(sut).toBeTruthy()  })  it('should correctly return mighty users (using jasmine-marbles)', () => {    // Here we define the Observable we expect to be returned by "getModifiedUsers"    const expectedObservable = cold('--a-b-c', {      a: 'Mighty Hans',      b: 'Mighty Martin',      c: 'Mighty Julia',    })    expect(sut.getModifiedUsers).toBeObservable(expectedObservable)  })})

The TestScheduler

As we already saw in the first AppComponent test, RxJS provides a TestScheduler for "time manipulation".

The internal schedulers control the emission order of events in RxJS. Most of the time, we do not have to care about the schedulers as they are handled mainly by RxJS internally. But we can provide a scheduler to operators, as we can see in the signature of the "delay" operator:

delay(delay: number | Date, scheduler: Scheduler): Observable

The last parameter is optional and defaults to the async Scheduler. RxJS includes the following Schedulers:

  • AsyncScheduler
  • AnimationFrameScheduler
  • AsapScheduler
  • QueueScheduler
  • TestScheduler
  • VirtualTimeScheduler

To avoid using real time in our test, we can pass the TestScheduler (who derives from the VirtualTimeScheduler) to our operator. The TestScheduler allows us to manipulate the time in our test cases and synchronously write asynchronous tests.

New RxJS 6 marble test features

In RxJS v5 there was nearly no documentation for the TestScheduler as it was mainly used internally by the library authors. Since RxJS 6 this has changed and we can now use the TestScheduler to write marble tests.

In previous RxJS versions, we had to pass the Scheduler to our operators in production code to be able to test them with virtual time manipulation.

getUsers(scheduler) {    const dummyData = Observable.from(['Anna', 'Bert', 'Chris']);    return dummyData.delay(1000, scheduler); // each user is emitted after 1 second}

As you can see, we are now mixing our productive code with logic, we only need for tests. The scheduler parameter is only added and used for the tests.

This issue is solved by the new run method. Every RxJS operator who uses the AsyncScheduler (for example "timer" or "debounce") will automatically use the TestScheduler when it is executed inside the run method and therefore uses virtual instead of real time.

This way, the same method above can be rewritten without the scheduler parameter and has no more test code inside the production code:

getUsers() {    const dummyData = Observable.from(['Anna', 'Bert', 'Chris');    return dummyData.delay(1000); // each user is emitted after 1 second}

A unit test for the AllMightyService's getModifiedUsers method using the new run method can look this way:

it('should correctly return mighty users (using RxJS 6 tools)', () => {  const scheduler = new TestScheduler((actual, expected) => {    // asserting the two objects are equal    expect(actual).toEqual(expected)  }) => {    const { expectObservable } = helpers    const coldObservable = scheduler.createHotObservable('^-a-b-c', {      a: 'Hans',      b: 'Martin',      c: 'Julia',    })    userService.getUsers = coldObservable    sut = new AllMightyService(userService)    const expectedMarble = '--a-b-c'    const expectedVales = {      a: 'Mighty Hans',      b: 'Mighty Martin',      c: 'Mighty Julia',    }    expectObservable(sut.getModifiedUsers).toBe(expectedMarble, expectedVales)  })})

It looks the same as in our jasmine-marble test above, but the new run method provides some interesting new features like the Time progression syntax.

At this time the TestScheduler can only be used to test code that uses timers, like delay/debounceTime/etc (i.e. it uses AsyncScheduler with delays > 1). If the code consumes a Promise or does scheduling with AsapScheduler/AnimationFrameScheduler/etc it cannot be reliably tested with TestScheduler, but instead should be tested more traditionally. See the Known Issues section for more details.


Marble diagrams are an established concept to visualize asynchronous data, as seen on the popular website RxMarbles. Using marble strings, we can also use this clean way to test our observables.

I recommend getting started by using helper libraries like jasmine-marbles as they are more beginner-friendly. You can combine your jasmine-marble tests with the new RxJS 6 features in the same project I demonstrate in my example project.

From my experience, I can tell you that it is worth learning marble testing as you can then test very complex observable streams, understandably.

I hope that you are now able to start using marble tests in your project and that you will begin enjoying writing unit tests for observables.