With a neat pipe function, you can string operators together and create amazing data pipelines. So buckle up: we're going to craft some custom pipeable RxJS operators!
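To make the "stringing together" idea concrete, here is a minimal sketch of left-to-right function composition, which is the core idea behind pipe. The names simplePipe, addOne, and double are hypothetical and exist only for illustration; this is not RxJS's actual implementation.

```typescript
// A unary function: one input, one output.
type Unary<A, B> = (input: A) => B;

// Hypothetical helper: composes functions left to right, like a tiny pipe().
function simplePipe<T>(...fns: Array<Unary<T, T>>): Unary<T, T> {
  // Feed the output of each function into the next one.
  return (input: T) => fns.reduce((acc, fn) => fn(acc), input);
}

const addOne = (n: number) => n + 1;
const double = (n: number) => n * 2;

const addOneThenDouble = simplePipe(addOne, double);
const piped = addOneThenDouble(3); // (3 + 1) * 2
console.log(piped); // 8
```

An RxJS operator plays the role of addOne or double here, except that its input and output are Observables rather than numbers.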
Consider this: you've got an observable spitting out numbers. You can use the map() operator to transform each number by squaring it:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
  map(num => num * num)
).subscribe(square => console.log(square));
// Output: 1, 4, 9, 16, 25
In this scenario, we whipped up an Observable numbers$ using the of() function. Then, we used the map() operator to transform each number by squaring it, giving us the squared numbers as the output. Easy, right?
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
  filter(number => number % 2 === 0),
  map(number => number * 2)
).subscribe(number => console.log(number));
// Output: 4, 8
We combined two operators, filter() and map(), to keep only the even numbers and double them. It's like Lego blocks for code!
import { map } from 'rxjs/operators';
function multiplyByTwo() {
  // An operator is a function that takes the source observable and returns a new one.
  return function (source) {
    return source.pipe(map(value => value * 2));
  };
}
In this snippet, we crafted a function named multiplyByTwo that churns out another function. This new function takes in an observable (source) and returns a new observable that doubles every value.
import { of } from 'rxjs';
of(1, 2, 3, 4, 5).pipe(multiplyByTwo()).subscribe(console.log);
// Output: 2, 4, 6, 8, 10
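If you write your operators in TypeScript, the same shape can be spelled out with the OperatorFunction type that RxJS exports. The sketch below is one way to do it; multiplyByTwoTyped is a hypothetical name chosen so it does not clash with the function above.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical typed variant of multiplyByTwo: numbers in, numbers out.
function multiplyByTwoTyped(): OperatorFunction<number, number> {
  return (source: Observable<number>) => source.pipe(map(value => value * 2));
}

const doubled: number[] = [];
// of() emits synchronously, so the array is filled as soon as subscribe() returns.
of(1, 2, 3).pipe(multiplyByTwoTyped()).subscribe(value => doubled.push(value));
console.log(doubled); // [2, 4, 6]
```

The return type tells both the compiler and the reader that the function fits into any pipe() call on an Observable of numbers.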
You can also create custom operators that accept parameters. Let's make our multiplyByTwo operator more generic:
function multiplyBy(factor) {
  return function (source) {
    return source.pipe(map(value => value * factor));
  };
}
of(1, 2, 3, 4, 5).pipe(multiplyBy(3)).subscribe(console.log);
// Output: 3, 6, 9, 12, 15
Can we build an operator from scratch? Absolutely! Let's rebuild the same multiplyBy operator, this time without using any of the existing operators:
import { Observable } from 'rxjs';
function multiplyBy(factor) {
  return function (source) {
    return new Observable(observer => {
      const subscription = source.subscribe({
        next(value) {
          observer.next(value * factor);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return {
        unsubscribe() {
          subscription.unsubscribe();
        }
      };
    });
  };
}
Whew! We did it! We crafted an operator from scratch that takes care of next(), error(), and complete(). This operator even cleans up after itself: when the consumer unsubscribes, it unsubscribes from the source as well. Don't forget this unsubscribe logic when implementing your own operators later.
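To see why the unsubscribe logic matters, here is a small sketch that records when the source's teardown runs. The names source$, passThrough, and events are hypothetical; passThrough is built the same way as the from-scratch multiplyBy, minus the multiplication.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// Hypothetical source that never completes and logs when it is torn down.
const source$ = new Observable<number>(observer => {
  observer.next(1);
  return () => { events.push('source torn down'); };
});

// Hypothetical pass-through operator written from scratch.
function passThrough<T>() {
  return (source: Observable<T>) =>
    new Observable<T>(observer => {
      const subscription = source.subscribe({
        next: value => observer.next(value),
        error: err => observer.error(err),
        complete: () => observer.complete(),
      });
      // Without this teardown, the source would keep running after unsubscribe.
      return () => subscription.unsubscribe();
    });
}

const sub = source$.pipe(passThrough<number>()).subscribe();
const countBefore = events.length;
console.log(countBefore); // 0: nothing has been torn down yet

sub.unsubscribe();
console.log(events); // ['source torn down']
```

If the operator returned nothing from the subscribe callback, the second log would stay empty and the source would leak.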
You're almost an RxJS operator maestro. But first, let's get some more practice in. I'll guide you through two more examples where we'll recreate the existing tap and delay operators.
Creating a custom tap operator is pretty straightforward:
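import { Observable, OperatorFunction } from 'rxjs';
function customTap<T>(callback: (value: T) => void): OperatorFunction<T, T> {
  return function(source) {
    return new Observable<T>(observer => {
      const subscription = source.subscribe({
        next(value) {
          // Run the side effect, then pass the value through unchanged.
          callback(value);
          observer.next(value);
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      });
      // Returning the subscription lets RxJS unsubscribe from the source on teardown.
      return subscription;
    });
  };
}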
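For comparison, this is the behaviour the custom version is meant to reproduce, shown with the built-in tap. The arrays seen and emitted are hypothetical names used only to capture what happens.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const seen: number[] = [];
const emitted: number[] = [];

of(1, 2, 3).pipe(
  // Side effect only: the value continues down the pipeline unchanged.
  tap(value => seen.push(value)),
  map(value => value * 10)
).subscribe(value => emitted.push(value));

console.log(seen);    // [1, 2, 3]
console.log(emitted); // [10, 20, 30]
```

Swapping tap for customTap in this pipeline should leave both arrays the same, which makes it a handy sanity check.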
Now, let's create a custom delay operator. This one needs JavaScript's setTimeout:
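import { Observable, OperatorFunction } from 'rxjs';
function customDelay<T>(delayDuration: number): OperatorFunction<T, T> {
  return function(source) {
    return new Observable<T>(observer => {
      const timers = new Set<ReturnType<typeof setTimeout>>();
      // Schedule a notification and remember its timer so teardown can cancel it.
      const schedule = (work: () => void) => {
        const id = setTimeout(() => { timers.delete(id); work(); }, delayDuration);
        timers.add(id);
      };
      const subscription = source.subscribe({
        next(value) { schedule(() => observer.next(value)); },
        error(err) { observer.error(err); },
        // Delay completion as well; otherwise it would arrive before the
        // pending values and they would be dropped.
        complete() { schedule(() => observer.complete()); }
      });
      return () => {
        subscription.unsubscribe();
        timers.forEach(id => clearTimeout(id));
      };
    });
  };
}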
In this case, we're using setTimeout to hold off on sending each value from the source observable for a certain amount of time. Cool, right?
We're going to design and recreate the well-known mergeMap and switchMap operators, which are some of the most commonly used built-in operators.
Making a mergeMap operator is a bit trickier. Here's a simple version:
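import { Observable, ObservableInput, OperatorFunction, Subscription, from } from 'rxjs';
function customMergeMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
  return function(source) {
    return new Observable<R>(observer => {
      let index = 0;
      let active = 0;        // inner Observables that are still running
      let outerDone = false; // whether the source has completed
      // Parent subscription: unsubscribing it tears down the outer and every inner subscription.
      const subscriptions = new Subscription();
      subscriptions.add(source.subscribe({
        next(value) {
          const innerObservable = from(projection(value, index++));
          active++;
          subscriptions.add(innerObservable.subscribe({
            next(innerValue) { observer.next(innerValue); },
            error(err) { observer.error(err); },
            complete() {
              active--;
              if (outerDone && active === 0) { observer.complete(); }
            }
          }));
        },
        error(err) { observer.error(err); },
        complete() {
          outerDone = true;
          // Complete only once every inner Observable has finished too.
          if (active === 0) { observer.complete(); }
        }
      }));
      return subscriptions;
    });
  };
}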
The customMergeMap function takes in the source Observable and, for each value it emits, uses the projection function to create an inner Observable. It then subscribes to this inner Observable and sends out its values.
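The "subscribe to every inner Observable" behaviour is easiest to see with Subjects, which let us emit by hand. This sketch uses the built-in mergeMap as the reference; outer$, a$, b$, and merged are hypothetical names.

```typescript
import { Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const outer$ = new Subject<string>();
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const merged: string[] = [];

outer$
  .pipe(mergeMap(key => (key === 'a' ? a$ : b$)))
  .subscribe(value => merged.push(value));

outer$.next('a'); // subscribes to a$
outer$.next('b'); // subscribes to b$, while a$ stays subscribed
a$.next('a1');
b$.next('b1');
a$.next('a2');    // still delivered: every inner subscription stays alive

console.log(merged); // ['a1', 'b1', 'a2']
```

Values from both inner Observables are interleaved in the order they arrive, which is exactly what a custom mergeMap needs to reproduce.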
Now, the switchMap operator works somewhat like mergeMap, but it cancels the inner subscription when the source sends out a new value:
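import { Observable, ObservableInput, OperatorFunction, Subscription, from } from 'rxjs';
function customSwitchMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
  return function(source) {
    return new Observable<R>(observer => {
      let index = 0;
      let innerSubscription: Subscription | undefined;
      let innerActive = false; // whether the current inner Observable is still running
      let outerDone = false;   // whether the source has completed
      const outerSubscription = source.subscribe({
        next(value) {
          // Cancel the previous inner Observable before switching to the new one.
          innerSubscription?.unsubscribe();
          const innerObservable = from(projection(value, index++));
          innerActive = true;
          innerSubscription = innerObservable.subscribe({
            next(innerValue) { observer.next(innerValue); },
            error(err) { observer.error(err); },
            complete() {
              innerActive = false;
              if (outerDone) { observer.complete(); }
            }
          });
        },
        error(err) { observer.error(err); },
        complete() {
          outerDone = true;
          // Let a still-running inner Observable finish before completing.
          if (!innerActive) { observer.complete(); }
        }
      });
      return () => {
        outerSubscription.unsubscribe();
        innerSubscription?.unsubscribe();
      };
    });
  };
}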
customSwitchMap uses a projection function to map each value from the source Observable to a new Observable. Every time the source emits a value, it unsubscribes from the current inner Observable and subscribes to a new one, hence "switching" Observables.
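The "switching" can be observed by hand with Subjects. This sketch uses the built-in switchMap as the reference behaviour; outer$, a$, b$, and switched are hypothetical names.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const outer$ = new Subject<string>();
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const switched: string[] = [];

outer$
  .pipe(switchMap(key => (key === 'a' ? a$ : b$)))
  .subscribe(value => switched.push(value));

outer$.next('a'); // subscribes to a$
a$.next('a1');
outer$.next('b'); // unsubscribes from a$, then subscribes to b$
a$.next('a2');    // dropped: a$ is no longer the active inner Observable
b$.next('b1');

console.log(switched); // ['a1', 'b1']
```

Only the most recent inner Observable is allowed to emit, so 'a2' never reaches the subscriber.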
Consider adding try/catch blocks to handle errors in user-provided functions:
try {
  const result = projection(value);
  observer.next(result);
} catch (err) {
  observer.error(err);
}
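Here is how that fragment might look inside a complete operator. This is a sketch under assumptions: safeMap is a hypothetical name for a map-like operator, and the projection deliberately throws on one value to show the error path.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';

// Hypothetical map-like operator that guards the user-provided callback.
function safeMap<T, R>(projection: (value: T) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(observer => {
      const subscription = source.subscribe({
        next(value) {
          try {
            observer.next(projection(value));
          } catch (err) {
            // Route the thrown error into the stream instead of letting it escape.
            observer.error(err);
          }
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); },
      });
      return () => subscription.unsubscribe();
    });
}

const outcome: string[] = [];
of(1, 2, 3).pipe(
  safeMap(n => {
    if (n === 2) { throw new Error('boom'); }
    return n * 10;
  })
).subscribe({
  next: value => outcome.push(`next ${value}`),
  error: (err: Error) => outcome.push(`error ${err.message}`),
  complete: () => outcome.push('complete'),
});

console.log(outcome); // ['next 10', 'error boom']
```

Once the error notification is sent, the subscriber is closed, so the remaining value and the completion are ignored.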
Handle the completion of inner Observables explicitly, too:
innerObservable.subscribe({
  next(innerValue) { observer.next(innerValue); },
  error(err) { observer.error(err); },
  complete() { /* Handle completion here */ }
});
And make sure the teardown function releases every subscription:
return () => {
  outerSubscription.unsubscribe();
  innerSubscription?.unsubscribe();
  /* Additional cleanup logic here */
};