La bibliothèque RxJS, en raison de son large choix d'opérateurs, est à juste titre considérée comme un outil extrêmement puissant dans l'arsenal du développeur. Dans cet article, je souhaite vous présenter le concept des opérateurs personnalisés RxJS avec des exemples de mise en œuvre.

RxJS . , . RxJS .
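Un opérateur RxJS est une fonction qui reçoit un flux (observable) et renvoie un nouveau flux. Un opérateur personnalisé RxJS est donc une simple fonction JavaScript (TypeScript). Commençons par l'opérateur le plus simple (identity), qui renvoie le flux source tel quel :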
import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Minimal custom operator: takes the source observable and returns that
 * very same observable, untouched.
 *
 * Because its shape is `(source: Observable<T>) => Observable<T>`, it can
 * be handed directly to `pipe`, as in `source$.pipe(identity)`.
 *
 * @param source$ - The upstream observable.
 * @returns The same `source$` reference that was passed in.
 */
function identity<T>(source$: Observable<T>): Observable<T> {
return source$;
}
const results$ = source$.pipe(identity);
results$.subscribe(console.log);
// console output: 0, 1, 2
- .
( ) :
<Copy>
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Custom operator that prints every value passing through the stream,
 * prefixed with `log: `, and forwards the values unchanged.
 *
 * @param source$ - The upstream observable.
 * @returns An observable built from `source$` with a logging `tap` attached.
 */
function log<T>(source$: Observable<T>): Observable<T> {
  // Side-effect-only stage: `tap` observes each value without altering it.
  const printValue = tap<T>((value) => console.log(`log: ${value}`));

  return source$.pipe(printValue);
}
const results$ = source$.pipe(log);
results$.subscribe(console.log);
// console output: log: 0, log: 1, log: 2
source$, pipe.
. , . :
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Operator factory: builds a logging operator whose output is labelled
 * with the given tag.
 *
 * @param tag - Label inserted into every log line as `logWithTag(<tag>): `.
 * @returns A function that takes an observable and returns it with a
 *          logging `tap` attached; values are forwarded unchanged.
 */
function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  return function attachLogger(upstream$: Observable<T>): Observable<T> {
    return upstream$.pipe(
      tap((value) => {
        console.log(`logWithTag(${tag}): ${value}`);
      })
    );
  };
}
const results$ = source$.pipe(logWithTag("RxJS"));
results$.subscribe(console.log);
// console output: logWithTag(RxJS): 0, logWithTag(RxJS): 1, logWithTag(RxJS): 2
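Le type de retour peut être simplifié grâce à l'interface MonoTypeOperatorFunction fournie par RxJS. De plus, la fonction pipe permet de raccourcir l'implémentation :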
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Operator factory: builds a logging operator whose output is labelled
 * with the given tag. Same behaviour as the hand-written variant, but
 * expressed with the standalone `pipe` function and the
 * `MonoTypeOperatorFunction` type alias.
 *
 * @param tag - Label inserted into every log line as `logWithTag(<tag>): `.
 * @returns An operator that logs each value and forwards it unchanged.
 */
function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  const announce = (value: T): void => {
    console.log(`logWithTag(${tag}): ${value}`);
  };

  return pipe(tap(announce));
}
const results$ = source$.pipe(logWithTag("RxJS"));
results$.subscribe(console.log);
// console output: logWithTag(RxJS): 0, logWithTag(RxJS): 1, logWithTag(RxJS): 2
RxJS .
. :
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Operator factory that invokes `job` with the first value that flows
 * through the returned operator, and ignores every later value.
 *
 * NOTE: the "already fired" flag is created once per `tapOnce(...)` call,
 * not once per subscription. When the resulting observable is subscribed
 * to several times, all subscriptions share the flag, so `job` runs only
 * for the very first value seen by any of them.
 *
 * @param job - Callback invoked with the first value.
 * @returns An operator that forwards all values unchanged.
 */
function tapOnce<T>(job: Function): MonoTypeOperatorFunction<T> {
  // Lives in the factory closure, hence shared by every subscription.
  let pending = true;

  return pipe(
    tap((value) => {
      if (pending) {
        job(value);
        // Cleared only after `job` returns, matching the call order above.
        pending = false;
      }
    })
  );
}
const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));
results$.subscribe(console.log);
results$.subscribe(console.log);
// console output: First value emitted, 0, 0, 1, 1, 2, 2
Pour que chaque abonnement dispose de son propre état, on peut envelopper le flux dans la fonction defer :
import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";
const source$ = interval(1000).pipe(take(3));
/**
 * Operator factory that invokes `job` with the first value of each
 * subscription, and ignores every later value of that subscription.
 *
 * The "already fired" flag is declared inside the `defer` factory, so a
 * fresh flag is created whenever that factory runs instead of being shared
 * through the outer closure.
 *
 * @param job - Callback invoked with the first value.
 * @returns An operator that forwards all values unchanged.
 */
function tapOnceUnique<T>(job: Function): MonoTypeOperatorFunction<T> {
  return (upstream$) =>
    defer(() => {
      // Scoped to this factory invocation rather than to the operator call.
      let pending = true;

      const runJobOnFirst = tap<T>((value) => {
        if (pending) {
          job(value);
          pending = false;
        }
      });

      return upstream$.pipe(runJobOnFirst);
    });
}
const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));
results$.subscribe(console.log);
results$.subscribe(console.log);
// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
tapOnce
.
firstTruthy
:
import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";
const source1$ = of(0, "", "foo", 69);
/**
 * Operator that emits only the first truthy value of the source
 * (as decided by `Boolean(value)`).
 *
 * Built on the RxJS `first` operator. NOTE(review): per the RxJS
 * documentation, `first` with a predicate and no default value signals an
 * `EmptyError` when the source completes without a match — confirm this
 * is acceptable for callers.
 *
 * @returns An operator yielding the first truthy value.
 */
function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  const isTruthy = (value: T): boolean => Boolean(value);

  return pipe(first(isTruthy));
}
const result1$ = source1$.pipe(firstTruthy());
result1$.subscribe(console.log);
// console output: foo
evenMultiplied
:
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";
const source2$ = interval(10).pipe(take(3));
/**
 * Operator that keeps only even numbers and multiplies each of them by
 * `multiplier`.
 *
 * @param multiplier - Factor applied to every even value.
 * @returns An operator from numbers to numbers.
 */
function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  const isEven = (n: number): boolean => n % 2 === 0;
  const scale = (n: number): number => n * multiplier;

  return pipe(filter(isEven), map(scale));
}
const result2$ = source2$.pipe(evenMultiplied(3));
result2$.subscribe(console.log);
// console output: 0, 6
liveSearch
:
import { ObservableInput, of, OperatorFunction, pipe } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";
const source3$ = of("politics", "sport");
type DataProducer<T> = (q: string) => ObservableInput<T>;
/**
 * Operator for "search as you type" streams: debounces the incoming query
 * strings, drops a query equal to the previous one, and switches to the
 * data stream produced for the latest query.
 *
 * @param time - Value passed to `debounceTime`.
 * @param dataProducer - Maps a query string to the data to emit; handed
 *                       directly to `switchMap`.
 * @returns An operator turning a stream of query strings into a stream of
 *          results of type `R`.
 */
function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  const debounceQueries = debounceTime<string>(time);
  const skipRepeatedQuery = distinctUntilChanged<string>();
  // `dataProducer` is passed as-is so it receives exactly what `switchMap` supplies.
  const fetchForLatestQuery = switchMap(dataProducer);

  return pipe(debounceQueries, skipRepeatedQuery, fetchForLatestQuery);
}
const newsProducer = (q: string) =>
of(`Data fetched for ${q}`).pipe(delay(2000));
const result3$ = source3$.pipe(liveSearch(500, newsProducer));
result3$.subscribe(console.log);
// console output: Data fetched for sport
RxJS . , pipe-.
: [ ]
, - .
"JavaScript Developer. Professional". , , .