Redux-Observabil este un middleware bazat pe RxJS pentru Redux care permite dezvoltatorilor să lucreze cu acțiuni asincronizate. Este o alternativă la redux-thunk și redux-saga.
Acest articol acoperă elementele de bază ale RxJS, modul de configurare Redux-Observables și câteva dintre cazurile sale practice de utilizare. Dar înainte de asta, trebuie să înțelegem Model de observator.
Table of Contents
Model de observator
În modelul Observator, un obiect numit „Observabil” sau „Subiect”, menține o colecție de abonați numită „Observatori”. Când starea subiecților se schimbă, ea notifică toți observatorii săi.
În JavaScript, cel mai simplu exemplu ar fi emițătoarele de evenimente și gestionarele de evenimente.
Când o faci .addEventListener
, împingeți un observator în colecția de observatori a subiectului. Ori de câte ori se întâmplă evenimentul, subiectul notifică toți observatorii.

RxJS
Conform site-ului oficial,
RxJS este implementarea JavaScript a ReactiveX, o bibliotecă pentru compunerea de programe asincrone și bazate pe evenimente utilizând secvențe observabile.
În termeni simpli, RxJS este o implementare a modelului Observer. De asemenea, extinde tiparul de observator oferind operatori care ne permit să compunem observabile și subiecte într-o manieră declarativă.
Observatorii, observabilele, operatorii și subiectele sunt elementele de bază ale RxJS. Deci, să ne uităm la fiecare în detaliu acum.
Observatori
Observatorii sunt obiecte care se pot abona la Observabile și subiecte. După abonare, pot primi notificări de trei tipuri – următoare, eroare și completă.
Orice obiect cu următoarea structură poate fi folosit ca observator.
interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
Când observabilul împinge următoarea, eroarea și notificările complete, ale observatorului .next
, .error
, și .complete
se invocă metode.
Observabile
Observabilele sunt obiecte care pot emite date pe o perioadă de timp. Poate fi reprezentat folosind „diagrama de marmură”.

În cazul în care linia orizontală reprezintă timpul, nodurile circulare reprezintă datele emise de Observabil, iar linia verticală indică faptul că Observabilul s-a finalizat cu succes.

Observabilele pot întâmpina o eroare. Crucea reprezintă eroarea emisă de Observabil.
Stările „finalizate” și „eroare” sunt definitive. Asta înseamnă că Observables nu pot emite date după finalizarea cu succes sau întâmpinarea unei erori.
Crearea unui observabil
Observabilele sunt create folosind new Observable
constructor care ia un argument – funcția subscribe. Observabilele pot fi create și folosind unii operatori, dar despre asta vom vorbi mai târziu când vom vorbi despre Operatori.
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
// Subscribe function
});
Abonarea la un Observable
Observabilele pot fi abonate folosind lor .subscribe
metoda și trecerea unui observator.
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed');
});
Executarea unui observabil
Funcția de subscriere pe care am trecut-o către new Observable
constructorul este executat de fiecare dată când Observable este subscris.
Funcția de abonare acceptă un argument – Abonatul. Abonatul seamănă cu structura unui observator și are aceleași 3 metode: .next
, .error
, și .complete
.
Observabilele pot trimite date către observator folosind .next
metodă. Dacă observabilul s-a finalizat cu succes, poate notifica observatorul folosind .complete
metodă. Dacă observabilul a întâmpinat o eroare, poate împinge eroarea către observator folosind .error
metodă.
// Create an Observable
const observable = new Observable(subscriber => {
subscriber.next('first data');
subscriber.next('second data');
setTimeout(() => {
subscriber.next('after 1 second - last data');
subscriber.complete();
subscriber.next('data after completion'); // <-- ignored
}, 1000);
subscriber.next('third data');
});
// Subscribe to the Observable
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed')
});
// Outputs:
//
// first data
// second data
// third data
// after 1 second - last data
// completed
Observabilele sunt Unicast
Observabile sunt unicast, ceea ce înseamnă că Observables poate avea cel mult un abonat. Când un Observator se abonează la un Observabil, acesta primește o copie a Observabilului care are propria cale de execuție, făcând Observabilele unicast.
Este ca și cum ai viziona un videoclip YouTube. Toți spectatorii vizionează același conținut video, dar pot viziona diferite segmente ale videoclipului.
Exemplu: permiteți-ne să creăm un Observable care emite 1-10 în 10 secunde. Apoi, abonați-vă la Observable o dată imediat și din nou după 5 secunde.
// Create an Observable that emits data every second for 10 seconds
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);
if (count > 10) {
clearInterval(interval);
}
}, 1000);
});
// Subscribe to the Observable
observable.subscribe({
next: value => {
console.log(`Observer 1: ${value}`);
}
});
// After 5 seconds subscribe again
setTimeout(() => {
observable.subscribe({
next: value => {
console.log(`Observer 2: ${value}`);
}
});
}, 5000);
/* Output
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 1: 6
Observer 2: 2
Observer 1: 7
Observer 2: 3
Observer 1: 8
Observer 2: 4
Observer 1: 9
Observer 2: 5
Observer 1: 10
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10
*/
În ieșire, puteți observa că al doilea observator a început să tipărească de la 1, chiar dacă s-a abonat după 5 secunde. Acest lucru se întâmplă deoarece al doilea Observator a primit o copie a Observabilului a cărei funcție de abonare a fost invocată din nou. Aceasta ilustrează comportamentul unicast al Observables.
Subiecte
Un subiect este un tip special de observabil.
Crearea unui subiect
Un subiect este creat folosind new Subject
constructor.
import { Subject } from 'rxjs';
// Create a subject
const subject = new Subject();
Abonarea la un subiect
Abonarea la un subiect este similară cu abonarea la un observabil: utilizați .subscribe
metoda și treceți un observator.
subject.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log("done")
});
Executarea unui subiect
Spre deosebire de Observables, un subiect își numește propriul .next
, .error
, și .complete
metode de transmitere a datelor către observatori.
// Push data to all observers
subject.next('first data');
// Push error to all observers
subject.error('oops something went wrong');
// Complete
subject.complete('done');
Subiecții sunt Multicast
Subiecții sunt multidifuziune: mai mulți observatori împărtășesc același subiect și calea sa de execuție. Înseamnă că toate notificările sunt transmise tuturor observatorilor. Este ca și cum ai viziona un program live. Toți spectatorii urmăresc același segment al aceluiași conținut în același timp.
Exemplu: haideți să creăm un subiect care emite de la 1 la 10 în 10 secunde. Apoi, abonați-vă la Observable o dată imediat și din nou după 5 secunde.
// Create a subject
const subject = new Subject();
let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);
if (count > 10) {
clearInterval(interval);
}
}, 1000);
// Subscribe to the subjects
subject.subscribe(data => {
console.log(`Observer 1: ${data}`);
});
// After 5 seconds subscribe again
setTimeout(() => {
subject.subscribe(data => {
console.log(`Observer 2: ${data}`);
});
}, 5000);
/* OUTPUT
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9
Observer 1: 10
Observer 2: 10
*/
În rezultat, puteți observa că al doilea observator a început să tipărească de la 5 în loc să înceapă de la 1. Acest lucru se întâmplă deoarece al doilea observator împarte același subiect. De când s-a abonat după 5 secunde, subiectul a terminat deja emiterea de la 1 la 4. Aceasta ilustrează comportamentul multicast al unui subiect.
Subiecții sunt atât observabili, cât și observatori
Subiecții au .next
, .error
și .complete
metode. Asta înseamnă că urmează structura Observatorilor. Prin urmare, un subiect poate fi folosit și ca observator și transmis către .subscribe
funcția Observabilelor sau a altor Subiecte.
Exemplu: să creăm un Observabil și un Subiect. Apoi, abonați-vă la Observable folosind subiectul ca observator. În cele din urmă, abonați-vă la subiect. Toate valorile emise de Observabil vor fi împinse către Subiect, iar Subiectul va transmite valorile primite către toți Observatorii săi.
// Create an Observable that emits data every second
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);
if (count > 5) {
clearInterval(interval);
}
}, 1000);
});
// Create a subject
const subject = new Subject();
// Use the Subject as Observer and subscribe to the Observable
observable.subscribe(subject);
// Subscribe to the subject
subject.subscribe({
next: value => console.log(value)
});
/* Output
1
2
3
4
5
*/
Operatori
Operatorii sunt ceea ce face RxJS util. Operatorii sunt funcții pure care returnează un nou Observabil. Acestea pot fi clasificate în 2 categorii principale:
- Operatori de creație
- Operatori conductabili
Operatori de creație
Operatorii de creație sunt funcții care pot crea un nou Observabil.
Exemplu: putem crea un Observable care emite fiecare element al unui tablou folosind from
operator.
const observable = from([2, 30, 5, 22, 60, 1]);
observable.subscribe({
next: (value) => console.log("Received", value),
error: (err) => console.log(err),
complete: () => console.log("done")
});
/* OUTPUTS
Received 2
Received 30
Received 5
Received 22
Received 60
Received 1
done
*/
Același lucru poate fi observabil folosind diagrama de marmură.

Operatori conductabili
Operatorii conductabili sunt funcții care iau un observabil ca intrare și returnează un nou observabil cu un comportament modificat.
Exemplu: să luăm Observabilul pe care l-am creat folosind from
operator. Acum, folosind acest Observable, putem pentru a crea un nou Observable care emite numai numere mai mari de 10 folosind filter
operator.
const greaterThanTen = observable.pipe(filter(x => x > 10));
greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));
// OUTPUT
// 11
// 12
// 13
// 14
// 15
Același lucru poate fi reprezentat folosind diagrama de marmură.

Există mult mai mulți operatori utili acolo. Puteți vedea lista completă a operatorilor împreună cu exemple în documentația oficială RxJS Aici.
Este crucial să înțelegem toți operatorii utilizați în mod obișnuit. Iată câțiva operatori pe care îi folosesc des:
mergeMap
switchMap
exhaustMap
map
catchError
startWith
delay
debounce
throttle
interval
from
of
Redux Observables
Conform site-ului oficial,
RxJS– middleware bazat pe Redux. Compuneți și anulați acțiuni asincronizate pentru a crea efecte secundare și multe altele.
În Redux, ori de câte ori este expediată o acțiune, aceasta rulează prin toate funcțiile reductorului și se returnează o nouă stare.
Redux-observable ia toate aceste acțiuni expediate și stări noi și creează două observabile din acesta – Acțiuni observabile action$
, și statele observabile state$
.
Acțiunile observabile vor emite toate acțiunile care sunt expediate folosind store.dispatch()
. Statele observabile vor emite toate noile obiecte de stare returnate de reductorul rădăcină.
Epopee
Conform site-ului oficial,
Este o funcție care ia un flux de acțiuni și returnează un flux de acțiuni. Acțiuni în, acțiuni în afara.
Epopeile sunt funcții care pot fi utilizate pentru a vă abona la Acțiuni și Observabile ale statelor. Odată abonate, epopeile vor primi fluxul de acțiuni și stări ca intrare și trebuie să returneze un flux de acțiuni ca ieșire. Actions In – Actions Out.
const someEpic = (action$, state$) => {
return action$.pipe( // subscribe to actions observable
map(action => { // Receive every action, Actions In
return someOtherAction(); // return an action, Actions Out
})
)
}
Este important să înțelegem că toate acțiunile primite în Epic au deja a terminat de rulat prin reductoare.
Într-o Epic, putem folosi orice tipare observabile RxJS, iar acest lucru face util redux-observabile.
Exemplu: putem folosi .filter
operator pentru a crea un nou observabil intermediar. În mod similar, putem crea orice număr de observabile intermediare, dar rezultatul final al observabilului final trebuie să fie o acțiune, altfel o excepție va fi ridicată de redux-observabil.
const sampleEpic = (action$, state$) => {
return action$.pipe(
filter(action => action.payload.age >= 18), // can create intermediate observables and streams
map(value => above18(value)) // where above18 is an action creator
);
}
Fiecare acțiune emisă de Epics este trimisă imediat folosind store.dispatch()
.
Înființat
Mai întâi, să instalăm dependențele.
npm install --save rxjs redux-observable
Creați un folder separat numit epics
să păstrez toate epopeile. Creați un fișier nou index.js
în interiorul epics
folder și combinați toate epopeile folosind combineEpics
funcție pentru a crea epopeea rădăcină. Apoi exportați epopeea rădăcină.
import { combineEpics } from 'redux-observable';
import { epic1 } from './epic1';
import { epic2 } from './epic2';
const epic1 = (action$, state$) => {
...
}
const epic2 = (action$, state$) => {
...
}
export default combineEpics(epic1, epic2);
Creați un middleware epic folosind createEpicMiddleware
funcția și treceți-o la createStore
Funcția de redux.
import { createEpicMiddleware } from 'redux-observable';
import { createStore, applyMiddleware } from 'redux';
import rootEpic from './rootEpics';
const epicMiddleware = createEpicMiddlware();
const store = createStore(
rootReducer,
applyMiddleware(epicMiddlware)
);
În cele din urmă, treceți rădăcina epică la middleware-ul epic .run
metodă.
epicMiddleware.run(rootEpic);
Unele cazuri practice
RxJS are o mare curbă de învățare, iar configurarea observabilă redux înrăutățește procesul deja dureros de configurare Redux. Tot ceea ce face ca Redux observabil să arate ca o exagerare. Iată însă câteva cazuri practice de utilizare care vă pot răzgândi.
De-a lungul acestei secțiuni, voi compara redux-observables cu redux-thunk pentru a arăta cum redux-observables poate fi util în cazuri de utilizare complexe. Nu urăsc redux-thunk, îl ador și îl folosesc în fiecare zi!
1. Efectuați apeluri API
Utilizare caz: Efectuați un apel API pentru a prelua comentariile unei postări. Afișați încărcătoarele când apelul API este în desfășurare și gestionați, de asemenea, erorile API.
O implementare redux-thunk va arăta astfel,
function getComments(postId){
return (dispatch) => {
dispatch(getCommentsInProgress());
axios.get(`/v1/api/posts/${postId}/comments`).then(response => {
dispatch(getCommentsSuccess(response.data.comments));
}).catch(() => {
dispatch(getCommentsFailed());
});
}
}
iar acest lucru este absolut corect. Dar creatorul de acțiune este umflat.
Putem scrie un Epic pentru a-l implementa folosind redux-observables.
const getCommentsEpic = (action$, state$) => action$.pipe(
ofType('GET_COMMENTS'),
mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe(
map(response => getCommentsSuccess(response.data.comments)),
catchError(() => getCommentsFailed()),
startWith(getCommentsInProgress())
)
);
Acum ne permite să avem un creator de acțiune simplu și curat ca acesta,
function getComments(postId) {
return {
type: 'GET_COMMENTS',
payload: {
postId
}
}
}
2. Solicitați dezabonarea
Utilizare caz: Furnizați completarea automată pentru un câmp de text apelând un API ori de câte ori se modifică valoarea câmpului de text. Apelul API trebuie efectuat la o secundă după ce utilizatorul a încetat să scrie.
O implementare redux-thunk va arăta astfel,
let timeout;
function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});
// If changed again within 1 second, cancel the timeout
timeout && clearTimeout(timeout);
// Make API Call after 1 second
timeout = setTimeout(() => {
axios.get(`/suggestions?q=${value}`)
.then(response =>
dispatch(loadSuggestionsSuccess(response.data.suggestions)))
.catch(() => dispatch(loadSuggestionsFailed()))
}, 1000, value);
}
}
Necesită o variabilă globală timeout
. Când începem să utilizăm variabile globale, creatorii noștri de acțiune nu mai sunt funcții pure. De asemenea, devine dificil să testați unitatea creatorii de acțiune care utilizează o variabilă globală.
Putem implementa același lucru cu redux-observable folosind .debounce
operator.
const loadSuggestionsEpic = (action$, state$) => action$.pipe(
ofType('VALUE_CHANGED'),
debounce(1000),
mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);
Acum, creatorii noștri de acțiune pot fi curățați și, mai important, pot fi din nou funcții pure.
function valueChanged(value) {
return {
type: 'VALUE_CHANGED',
payload: {
value
}
}
}
3. Solicitați anularea
Utilizare caz: Continuând cazul de utilizare anterior, presupunem că utilizatorul nu a tastat nimic timp de 1 secundă și am făcut primul apel API pentru a prelua sugestiile.
Să presupunem că API-ul în sine durează în medie 2-3 secunde pentru a returna rezultatul. Acum, dacă utilizatorul tastează ceva în timp ce primul apel API este în desfășurare, după o secundă, vom face al doilea API. Putem ajunge să avem două apeluri API în același timp și poate crea o condiție de cursă.
Pentru a evita acest lucru, trebuie să anulăm primul apel API înainte de a efectua al doilea apel API.
O implementare redux-thunk va arăta astfel,
let timeout;
var cancelToken = axios.cancelToken;
let apiCall;
function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});
// If changed again within 1 second, cancel the timeout
timeout && clearTimeout(timeout);
// Make API Call after 1 second
timeout = setTimeout(() => {
// Cancel the existing API
apiCall && apiCall.cancel('Operation cancelled');
// Generate a new token
apiCall = cancelToken.source();
axios.get(`/suggestions?q=${value}`, {
cancelToken: apiCall.token
})
.then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions)))
.catch(() => dispatch(loadSuggestionsFailed()))
}, 1000, value);
}
}
Acum, este nevoie de o altă variabilă globală pentru a stoca simbolul de anulare Axios. Mai multe variabile globale = mai multe funcții impure!
Pentru a implementa același lucru utilizând redux-observable, tot ce trebuie să facem este să înlocuim .mergeMap
cu .switchMap
.
const loadSuggestionsEpic = (action$, state$) => action$.pipe(
ofType('VALUE_CHANGED'),
throttle(1000),
switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);
Deoarece nu necesită nicio modificare a creatorilor noștri de acțiune, aceștia pot continua să fie funcții pure.
În mod similar, există multe cazuri de utilizare în care Redux-Observables strălucește de fapt! De exemplu, sondarea unui API, afișarea snack-barurilor, gestionarea conexiunilor WebSocket, etc.
A concluziona
Dacă dezvoltați o aplicație Redux care implică astfel de cazuri de utilizare complexe, este foarte recomandat să utilizați Redux-Observables. La urma urmei, beneficiile utilizării acesteia sunt direct proporționale cu complexitatea aplicației dvs. și este evident din cazurile de utilizare practice menționate mai sus.
Cred cu tărie că folosirea setului corect de biblioteci ne va ajuta dezvoltați aplicații mult mai curate și care pot fi întreținuteși, pe termen lung, beneficiile utilizării acestora vor depăși dezavantajele.