If you develop an Angular application, one of the first things you hear about is the RxJS library. And it's true: I haven't seen an Angular application without RxJS, and it's hard to imagine one unless it's nothing but markup. So the question is: why do you need RxJS for Angular applications? Why is it so important? Let's dig into it.
To create an Observable, you need data. This data can vary by its source, resulting in two different types of Observables: hot and cold. If your data is produced by the Observable itself, it's a cold Observable; if your data is produced outside it, it's a hot one.
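To make the distinction concrete, here is a minimal sketch. The names cold$, hot$, listeners, and emit are illustrative assumptions, not part of RxJS; the hand-rolled listener set only stands in for any outside producer, such as DOM events or a WebSocket.

```typescript
import { Observable } from 'rxjs';

// Cold: the data (a counter) is created inside the Observable,
// so each subscription starts its own independent producer.
const cold$ = new Observable<number>(subscriber => {
  let counter = 0;
  subscriber.next(++counter);
  subscriber.next(++counter);
  subscriber.complete();
});

const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe(value => coldA.push(value));
cold$.subscribe(value => coldB.push(value));
console.log(coldA, coldB); // both subscribers received 1 and 2

// Hot: the data is produced outside the Observable (hypothetical producer).
const listeners = new Set<(value: number) => void>();
const emit = (value: number): void => listeners.forEach(listener => listener(value));

const hot$ = new Observable<number>(subscriber => {
  const listener = (value: number): void => subscriber.next(value);
  listeners.add(listener);
  // Teardown: stop listening when the subscriber unsubscribes.
  return () => { listeners.delete(listener); };
});

const hotA: number[] = [];
const hotB: number[] = [];
hot$.subscribe(value => hotA.push(value));
emit(1);                                   // only the first subscriber is listening
hot$.subscribe(value => hotB.push(value)); // the late subscriber missed 1
emit(2);
console.log(hotA, hotB); // hotA holds 1 and 2, hotB holds only 2
```

The cold subscribers each get the full sequence from the start, while the hot subscribers share one producer and only see what is emitted while they are subscribed.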
There are four types of Subjects:
Subject. The base type, which multicasts to multiple observers. It holds neither an initial nor a current value, so new subscribers receive only the values emitted after they subscribe:
import { Subject } from 'rxjs';
const subject = new Subject<string>();
subject.subscribe(data => console.log(`Subscriber A: ${data}`));
subject.next('Hello');
// Output => Subscriber A: Hello
BehaviorSubject. BehaviorSubject requires a default value at the time of its creation. It always holds a current value (initially that default), which is emitted to each subscriber as soon as it subscribes. When new values are emitted, they are pushed to all current subscribers:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject('First');
subject.subscribe(data => console.log(data));
// Output => First
subject.next('Second');
// Output => Second
ReplaySubject. It lets you specify a buffer size, which determines how many of the most recent values are replayed to new subscribers:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(2);
subject.next('First');
subject.next('Second');
subject.next('Third');
subject.subscribe(data => console.log(data));
// Output => Second, Third
AsyncSubject. AsyncSubject delivers only the last value it received, and only once it completes. Until it completes, it emits nothing to its subscribers:
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.next('First');
subject.next('Second');
subject.next('Third');
subject.complete();
subject.subscribe(data => console.log(data));
// Output => Third
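The four types differ mainly in what a late subscriber gets. The sketch below runs the same scenario against each of them; lateSubscriberSees is a hypothetical helper written for this comparison, not an RxJS function.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Hypothetical helper: emits 'A', 'B', 'C', subscribes only afterwards,
// then completes the subject. Returns what the late subscriber received.
function lateSubscriberSees(subject: Subject<string>): string[] {
  const received: string[] = [];
  subject.next('A');
  subject.next('B');
  subject.next('C');
  subject.subscribe(value => received.push(value)); // subscribes after the emissions
  subject.complete();
  return received;
}

// Plain Subject: nothing is stored, so the late subscriber gets nothing.
const fromSubject = lateSubscriberSees(new Subject<string>());
// BehaviorSubject: the current value ('C') is delivered on subscribe.
const fromBehavior = lateSubscriberSees(new BehaviorSubject<string>('initial'));
// ReplaySubject(2): the last two buffered values are replayed.
const fromReplay = lateSubscriberSees(new ReplaySubject<string>(2));
// AsyncSubject: only the last value, and only when complete() is called.
const fromAsync = lateSubscriberSees(new AsyncSubject<string>());

console.log(fromSubject);  // []
console.log(fromBehavior); // ['C']
console.log(fromReplay);   // ['B', 'C']
console.log(fromAsync);    // ['C']
```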
Operators are an integral part of RxJS, allowing us to transform, combine, manipulate, and work with our Observables in various ways. They are simply functions that take an input Observable, transform it in some way, and return a new Observable. You can chain operators together using the pipe() method on an Observable. The order in which you place your operators in the pipe does matter.
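A small sketch of why the order matters: the same two operators, applied to the same source, give different results depending on which one runs first. The variable names are illustrative, and importing operators directly from 'rxjs' assumes RxJS 7.2 or later.

```typescript
import { of, filter, map } from 'rxjs';

const source$ = of(1, 2, 3, 4);

// filter first: keep the even numbers (2, 4), then add one to each.
const filterThenMap: number[] = [];
source$
  .pipe(
    filter(n => n % 2 === 0),
    map(n => n + 1),
  )
  .subscribe(value => filterThenMap.push(value));

// map first: add one to every number (2, 3, 4, 5), then keep the even ones.
const mapThenFilter: number[] = [];
source$
  .pipe(
    map(n => n + 1),
    filter(n => n % 2 === 0),
  )
  .subscribe(value => mapThenFilter.push(value));

console.log(filterThenMap); // [3, 5]
console.log(mapThenFilter); // [2, 4]
```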
map. This works like the array method of the same name in JavaScript: it transforms each value in the data stream with a given projection function:
import { of, map } from 'rxjs';
const source = of(1, 2, 3);
const example = source.pipe(map(val => val + 10));
example.subscribe(val => console.log(val));
// Output: 11, 12, 13
filter. Just like its array counterpart, filter only emits values from the source Observable that satisfy a specified condition:
import { of, filter } from 'rxjs';
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(filter(num => num % 2 === 0));
example.subscribe(val => console.log(val));
// Output: 2, 4
catchError. It catches errors that occur in your Observable and lets you return a replacement Observable:
import { of, throwError, catchError } from 'rxjs';
const source = throwError(() => new Error('This is an error!'));
const example = source.pipe(catchError(err => of(`I caught: ${err.message}`)));
example.subscribe(val => console.log(val));
// Output: I caught: This is an error!
of. This creation function converts its arguments to an Observable sequence:
import { of } from 'rxjs';
const source = of(1, 2, 3);
source.subscribe(val => console.log(val));
// Output: 1, 2, 3
from. This creation function turns an array, promise, or iterable into an Observable:
import { from } from 'rxjs';
const array = [1, 2, 3, 4, 5];
const source = from(array);
source.subscribe(val => console.log(val));
// Output: 1, 2, 3, 4, 5
take. This operator emits only the first count values from the source Observable, then completes:
import { of, take } from 'rxjs';
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(take(3));
example.subscribe(val => console.log(val));
// Output: 1, 2, 3
Caution! Failing to unsubscribe from Observables can lead to memory leaks. This usually happens when an Observable continues to produce values that are no longer needed but are still being consumed due to an active subscription. To prevent this, it's important to unsubscribe when the data is no longer needed. Operators like take and takeUntil can be used to automatically complete an Observable and unsubscribe from it once a certain condition is met. However, remember that not all Observables require manual unsubscribing, particularly those that complete or emit a finite number of values.
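As a rough sketch of the takeUntil approach: ticks$ and destroy$ are hypothetical stand-ins, the first for a long-lived source and the second for a notifier that a component might fire from ngOnDestroy. No Angular is used here so the snippet stays runnable on its own.

```typescript
import { Subject, takeUntil } from 'rxjs';

// Hypothetical long-lived source and a notifier that signals "stop".
const ticks$ = new Subject<number>();
const destroy$ = new Subject<void>();

const received: number[] = [];
const subscription = ticks$
  .pipe(takeUntil(destroy$)) // mirror ticks$ until destroy$ emits
  .subscribe(value => received.push(value));

ticks$.next(1);
ticks$.next(2);

// In a component this would typically happen in ngOnDestroy.
destroy$.next();
destroy$.complete();

ticks$.next(3); // ignored: the subscription has already been closed

console.log(received);            // [1, 2]
console.log(subscription.closed); // true
```

One notifier can be shared by every subscription in a component, which avoids keeping a separate Subscription reference for each of them.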
import { map } from 'rxjs';
this.http.get('//api.example.com/data')
.pipe(map(({ results }) => results))
.subscribe(data => console.log(data));
import { debounceTime } from 'rxjs';
this.form.get('name').valueChanges.subscribe(newValue => console.log(newValue));
// will output value from 'name' control on every change
this.form.get('query').valueChanges
.pipe(debounceTime(500))
.subscribe(query => this.performSearch(query));
// will invoke 'performSearch' after 500ms of last typed value
import { BehaviorSubject } from 'rxjs';
// In a shared service
private dataSubject = new BehaviorSubject(null);
public data$ = this.dataSubject.asObservable();
public updateData(newData) {
this.dataSubject.next(newData);
}
// In component A
this.sharedService.updateData('Hello from Component A');
// In component B
this.sharedService.data$.subscribe(data => console.log(data)); // 'Hello from Component A'
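The fragments above can be assembled into one self-contained sketch. It uses a plain class so that it runs without Angular; in a real application the service would typically be decorated with @Injectable and injected into both components. The string | null typing and the seenByB array are assumptions added for illustration.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Plain class standing in for an Angular service (no @Injectable here).
class SharedService {
  // The subject stays private so only the service can push new values.
  private readonly dataSubject = new BehaviorSubject<string | null>(null);
  // Consumers get a read-only Observable view of the subject.
  readonly data$: Observable<string | null> = this.dataSubject.asObservable();

  updateData(newData: string): void {
    this.dataSubject.next(newData);
  }
}

const sharedService = new SharedService();
const seenByB: Array<string | null> = [];

// "Component B" subscribes and immediately receives the initial null.
sharedService.data$.subscribe(data => seenByB.push(data));

// "Component A" publishes a new value, which is pushed to B.
sharedService.updateData('Hello from Component A');

console.log(seenByB); // [null, 'Hello from Component A']
```

Because the subject is a BehaviorSubject, a subscriber also receives the initial null, so consumers may want to ignore that first value.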
These links can help you master the RxJS library: