Redux-Observabil este un middleware bazat pe RxJS pentru Redux care permite dezvoltatorilor să lucreze cu acțiuni asincronizate. Este o alternativă la redux-thunk și redux-saga.

Acest articol acoperă elementele de bază ale RxJS, modul de configurare Redux-Observables și câteva dintre cazurile sale practice de utilizare. Dar înainte de asta, trebuie să înțelegem Model de observator.

Model de observator

În modelul Observator, un obiect numit „Observabil” sau „Subiect”, menține o colecție de abonați numită „Observatori”. Când starea subiecților se schimbă, ea notifică toți observatorii săi.

În JavaScript, cel mai simplu exemplu ar fi emițătoarele de evenimente și gestionarele de evenimente.

Când o faci .addEventListener, împingeți un observator în colecția de observatori a subiectului. Ori de câte ori se întâmplă evenimentul, subiectul notifică toți observatorii.

Un ghid pentru incepatori pentru RxJS si Redux observabil
Model de observator

RxJS

Conform site-ului oficial,

RxJS este implementarea JavaScript a ReactiveX, o bibliotecă pentru compunerea de programe asincrone și bazate pe evenimente utilizând secvențe observabile.

În termeni simpli, RxJS este o implementare a modelului Observer. De asemenea, extinde tiparul de observator oferind operatori care ne permit să compunem observabile și subiecte într-o manieră declarativă.

Observatorii, observabilele, operatorii și subiectele sunt elementele de bază ale RxJS. Deci, să ne uităm la fiecare în detaliu acum.

Observatori

Observatorii sunt obiecte care se pot abona la Observabile și subiecte. După abonare, pot primi notificări de trei tipuri – următoare, eroare și completă.

Orice obiect cu următoarea structură poate fi folosit ca observator.

interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

Când observabilul împinge următoarea, eroarea și notificările complete, ale observatorului .next, .error, și .complete se invocă metode.

Observabile

Observabilele sunt obiecte care pot emite date pe o perioadă de timp. Poate fi reprezentat folosind „diagrama de marmură”.

1612179850 997 Un ghid pentru incepatori pentru RxJS si Redux observabil
Finalizat cu succes Observabil

În cazul în care linia orizontală reprezintă timpul, nodurile circulare reprezintă datele emise de Observabil, iar linia verticală indică faptul că Observabilul s-a finalizat cu succes.

1612179850 985 Un ghid pentru incepatori pentru RxJS si Redux observabil
Observabil cu o eroare

Observabilele pot întâmpina o eroare. Crucea reprezintă eroarea emisă de Observabil.

Stările „finalizate” și „eroare” sunt definitive. Asta înseamnă că Observables nu pot emite date după finalizarea cu succes sau întâmpinarea unei erori.

Crearea unui observabil

Observabilele sunt create folosind new Observable constructor care ia un argument – funcția subscribe. Observabilele pot fi create și folosind unii operatori, dar despre asta vom vorbi mai târziu când vom vorbi despre Operatori.

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
   // Subscribe function 
});

Abonarea la un Observable

Observabilele pot fi abonate folosind lor .subscribe metoda și trecerea unui observator.

observable.subscribe({
    next: (x) => console.log(x),
    error: (x) => console.log(x),
    complete: () => console.log('completed');
});

Executarea unui observabil

Funcția de subscriere pe care am trecut-o către new Observable constructorul este executat de fiecare dată când Observable este subscris.

Funcția de abonare acceptă un argument – Abonatul. Abonatul seamănă cu structura unui observator și are aceleași 3 metode: .next, .error, și .complete.

Observabilele pot trimite date către observator folosind .next metodă. Dacă observabilul s-a finalizat cu succes, poate notifica observatorul folosind .complete metodă. Dacă observabilul a întâmpinat o eroare, poate împinge eroarea către observator folosind .error metodă.

// Create an Observable
const observable = new Observable(subscriber => {
   subscriber.next('first data');
   subscriber.next('second data');
   setTimeout(() => {
       subscriber.next('after 1 second - last data');
       subscriber.complete();
       subscriber.next('data after completion'); // <-- ignored
   }, 1000);
   subscriber.next('third data');
});

// Subscribe to the Observable
observable.subscribe({
    next: (x) => console.log(x),
    error: (x) => console.log(x),
    complete: () => console.log('completed')
});

// Outputs:
//
// first data
// second data
// third data
// after 1 second - last data
// completed

Observabilele sunt Unicast

Observabile sunt unicast, ceea ce înseamnă că Observables poate avea cel mult un abonat. Când un Observator se abonează la un Observabil, acesta primește o copie a Observabilului care are propria cale de execuție, făcând Observabilele unicast.

Este ca și cum ai viziona un videoclip YouTube. Toți spectatorii vizionează același conținut video, dar pot viziona diferite segmente ale videoclipului.

Exemplu: permiteți-ne să creăm un Observable care emite 1-10 în 10 secunde. Apoi, abonați-vă la Observable o dată imediat și din nou după 5 secunde.

// Create an Observable that emits data every second for 10 seconds
const observable = new Observable(subscriber => {
	let count = 1;
    const interval = setInterval(() => {
		subscriber.next(count++);
        
        if (count > 10) {
        	clearInterval(interval);   
        }
    }, 1000);
});

// Subscribe to the Observable
observable.subscribe({
	next: value => {
        console.log(`Observer 1: ${value}`);
    }
});

// After 5 seconds subscribe again
setTimeout(() => {
    observable.subscribe({
        next: value => {
            console.log(`Observer 2: ${value}`);
        }
    });
}, 5000);

/* Output

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 1: 6
Observer 2: 2
Observer 1: 7
Observer 2: 3
Observer 1: 8
Observer 2: 4
Observer 1: 9
Observer 2: 5
Observer 1: 10
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10

*/

În ieșire, puteți observa că al doilea observator a început să tipărească de la 1, chiar dacă s-a abonat după 5 secunde. Acest lucru se întâmplă deoarece al doilea Observator a primit o copie a Observabilului a cărei funcție de abonare a fost invocată din nou. Aceasta ilustrează comportamentul unicast al Observables.

Subiecte

Un subiect este un tip special de observabil.

Crearea unui subiect

Un subiect este creat folosind new Subject constructor.

import { Subject } from 'rxjs';

// Create a subject
const subject = new Subject();

Abonarea la un subiect

Abonarea la un subiect este similară cu abonarea la un observabil: utilizați .subscribe metoda și treceți un observator.

subject.subscribe({
    next: (x) => console.log(x),
    error: (x) => console.log(x),
    complete: () => console.log("done")
});

Executarea unui subiect

Spre deosebire de Observables, un subiect își numește propriul .next, .error, și .complete metode de transmitere a datelor către observatori.

// Push data to all observers
subject.next('first data');

// Push error to all observers
subject.error('oops something went wrong');

// Complete
subject.complete('done');

Subiecții sunt Multicast

Subiecții sunt multidifuziune: mai mulți observatori împărtășesc același subiect și calea sa de execuție. Înseamnă că toate notificările sunt transmise tuturor observatorilor. Este ca și cum ai viziona un program live. Toți spectatorii urmăresc același segment al aceluiași conținut în același timp.

Exemplu: haideți să creăm un subiect care emite de la 1 la 10 în 10 secunde. Apoi, abonați-vă la Observable o dată imediat și din nou după 5 secunde.

// Create a subject
const subject = new Subject();

let count = 1;
const interval = setInterval(() => {
    subscriber.next(count++);
    if (count > 10) {
        clearInterval(interval);
    }
}, 1000);

// Subscribe to the subjects
subject.subscribe(data => {
    console.log(`Observer 1: ${data}`);
});

// After 5 seconds subscribe again
setTimeout(() => {
    subject.subscribe(data => {
    	console.log(`Observer 2: ${data}`);
	});
}, 5000);

/* OUTPUT

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9
Observer 1: 10
Observer 2: 10

*/

În rezultat, puteți observa că al doilea observator a început să tipărească de la 5 în loc să înceapă de la 1. Acest lucru se întâmplă deoarece al doilea observator împarte același subiect. De când s-a abonat după 5 secunde, subiectul a terminat deja emiterea de la 1 la 4. Aceasta ilustrează comportamentul multicast al unui subiect.

Subiecții sunt atât observabili, cât și observatori

Subiecții au .next, .error și .complete metode. Asta înseamnă că urmează structura Observatorilor. Prin urmare, un subiect poate fi folosit și ca observator și transmis către .subscribe funcția Observabilelor sau a altor Subiecte.

Exemplu: să creăm un Observabil și un Subiect. Apoi, abonați-vă la Observable folosind subiectul ca observator. În cele din urmă, abonați-vă la subiect. Toate valorile emise de Observabil vor fi împinse către Subiect, iar Subiectul va transmite valorile primite către toți Observatorii săi.

// Create an Observable that emits data every second
const observable = new Observable(subscriber => {
   let count = 1;
   const interval = setInterval(() => {
       subscriber.next(count++);
       
       if (count > 5) {
        	clearInterval(interval);   
       }
   }, 1000);
});

// Create a subject
const subject = new Subject();

// Use the Subject as Observer and subscribe to the Observable
observable.subscribe(subject);

// Subscribe to the subject
subject.subscribe({
    next: value => console.log(value)
});

/* Output

1
2
3
4
5

*/

Operatori

Operatorii sunt ceea ce face RxJS util. Operatorii sunt funcții pure care returnează un nou Observabil. Acestea pot fi clasificate în 2 categorii principale:

  1. Operatori de creație
  2. Operatori conductabili

Operatori de creație

Operatorii de creație sunt funcții care pot crea un nou Observabil.

Exemplu: putem crea un Observable care emite fiecare element al unui tablou folosind from operator.

const observable = from([2, 30, 5, 22, 60, 1]);

observable.subscribe({
    next: (value) => console.log("Received", value),
    error: (err) => console.log(err),
    complete: () => console.log("done")
});

/* OUTPUTS

Received 2
Received 30
Received 5
Received 22
Received 60
Received 1
done

*/

Același lucru poate fi observabil folosind diagrama de marmură.

1612179850 256 Un ghid pentru incepatori pentru RxJS si Redux observabil

Operatori conductabili

Operatorii conductabili sunt funcții care iau un observabil ca intrare și returnează un nou observabil cu un comportament modificat.

Exemplu: să luăm Observabilul pe care l-am creat folosind from operator. Acum, folosind acest Observable, putem pentru a crea un nou Observable care emite numai numere mai mari de 10 folosind filter operator.

const greaterThanTen = observable.pipe(filter(x => x > 10));

greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));

// OUTPUT
// 11
// 12
// 13
// 14
// 15

Același lucru poate fi reprezentat folosind diagrama de marmură.

1612179850 265 Un ghid pentru incepatori pentru RxJS si Redux observabil

Există mult mai mulți operatori utili acolo. Puteți vedea lista completă a operatorilor împreună cu exemple în documentația oficială RxJS Aici.

Este crucial să înțelegem toți operatorii utilizați în mod obișnuit. Iată câțiva operatori pe care îi folosesc des:

  1. mergeMap
  2. switchMap
  3. exhaustMap
  4. map
  5. catchError
  6. startWith
  7. delay
  8. debounce
  9. throttle
  10. interval
  11. from
  12. of

Redux Observables

Conform site-ului oficial,

RxJS– middleware bazat pe Redux. Compuneți și anulați acțiuni asincronizate pentru a crea efecte secundare și multe altele.

În Redux, ori de câte ori este expediată o acțiune, aceasta rulează prin toate funcțiile reductorului și se returnează o nouă stare.

Redux-observable ia toate aceste acțiuni expediate și stări noi și creează două observabile din acesta – Acțiuni observabile action$, și statele observabile state$.

Acțiunile observabile vor emite toate acțiunile care sunt expediate folosind store.dispatch(). Statele observabile vor emite toate noile obiecte de stare returnate de reductorul rădăcină.

Epopee

Conform site-ului oficial,

Este o funcție care ia un flux de acțiuni și returnează un flux de acțiuni. Acțiuni în, acțiuni în afara.

Epopeile sunt funcții care pot fi utilizate pentru a vă abona la Acțiuni și Observabile ale statelor. Odată abonate, epopeile vor primi fluxul de acțiuni și stări ca intrare și trebuie să returneze un flux de acțiuni ca ieșire. Actions In – Actions Out.

const someEpic = (action$, state$) => { 
    return action$.pipe( // subscribe to actions observable
        map(action => { // Receive every action, Actions In
            return someOtherAction(); // return an action, Actions Out
        })
    )
}

Este important să înțelegem că toate acțiunile primite în Epic au deja a terminat de rulat prin reductoare.

Într-o Epic, putem folosi orice tipare observabile RxJS, iar acest lucru face util redux-observabile.

Exemplu: putem folosi .filter operator pentru a crea un nou observabil intermediar. În mod similar, putem crea orice număr de observabile intermediare, dar rezultatul final al observabilului final trebuie să fie o acțiune, altfel o excepție va fi ridicată de redux-observabil.

const sampleEpic = (action$, state$) => {
    return action$.pipe(
    	filter(action => action.payload.age >= 18), // can create intermediate observables and streams
        map(value => above18(value)) // where above18 is an action creator
    );
}

Fiecare acțiune emisă de Epics este trimisă imediat folosind store.dispatch().

Înființat

Mai întâi, să instalăm dependențele.

npm install --save rxjs redux-observable

Creați un folder separat numit epics să păstrez toate epopeile. Creați un fișier nou index.js în interiorul epics folder și combinați toate epopeile folosind combineEpics funcție pentru a crea epopeea rădăcină. Apoi exportați epopeea rădăcină.

import { combineEpics } from 'redux-observable';
import { epic1 } from './epic1';
import { epic2 } from './epic2';

const epic1 = (action$, state$) => {
 ...   
}
 
const epic2 = (action$, state$) => {
 ...   
}
 
export default combineEpics(epic1, epic2);

Creați un middleware epic folosind createEpicMiddleware funcția și treceți-o la createStore Funcția de redux.

import { createEpicMiddleware } from 'redux-observable';
import { createStore, applyMiddleware } from 'redux';
import rootEpic from './rootEpics';

const epicMiddleware = createEpicMiddlware();

const store = createStore(
    rootReducer,
    applyMiddleware(epicMiddlware)
);

În cele din urmă, treceți rădăcina epică la middleware-ul epic .run metodă.

epicMiddleware.run(rootEpic);

Unele cazuri practice

RxJS are o mare curbă de învățare, iar configurarea observabilă redux înrăutățește procesul deja dureros de configurare Redux. Tot ceea ce face ca Redux observabil să arate ca o exagerare. Iată însă câteva cazuri practice de utilizare care vă pot răzgândi.

De-a lungul acestei secțiuni, voi compara redux-observables cu redux-thunk pentru a arăta cum redux-observables poate fi util în cazuri de utilizare complexe. Nu urăsc redux-thunk, îl ador și îl folosesc în fiecare zi!

1. Efectuați apeluri API

Utilizare caz: Efectuați un apel API pentru a prelua comentariile unei postări. Afișați încărcătoarele când apelul API este în desfășurare și gestionați, de asemenea, erorile API.

O implementare redux-thunk va arăta astfel,

function getComments(postId){
    return (dispatch) => {
        dispatch(getCommentsInProgress());
        axios.get(`/v1/api/posts/${postId}/comments`).then(response => {
            dispatch(getCommentsSuccess(response.data.comments));
        }).catch(() => {
            dispatch(getCommentsFailed());
        });
    }
}

iar acest lucru este absolut corect. Dar creatorul de acțiune este umflat.

Putem scrie un Epic pentru a-l implementa folosind redux-observables.

const getCommentsEpic = (action$, state$) => action$.pipe(
    ofType('GET_COMMENTS'),
    mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe(
        map(response => getCommentsSuccess(response.data.comments)),
        catchError(() => getCommentsFailed()),
        startWith(getCommentsInProgress())
    )
);

Acum ne permite să avem un creator de acțiune simplu și curat ca acesta,

function getComments(postId) {
    return {
        type: 'GET_COMMENTS',
        payload: {
            postId
        }
    }
}

2. Solicitați dezabonarea

Utilizare caz: Furnizați completarea automată pentru un câmp de text apelând un API ori de câte ori se modifică valoarea câmpului de text. Apelul API trebuie efectuat la o secundă după ce utilizatorul a încetat să scrie.

O implementare redux-thunk va arăta astfel,

let timeout;

function valueChanged(value) {
    return dispatch => {
        dispatch(loadSuggestionsInProgress());
        dispatch({
            type: 'VALUE_CHANGED',
            payload: {
                value
            }
        });

        // If changed again within 1 second, cancel the timeout
        timeout && clearTimeout(timeout);

        // Make API Call after 1 second
        timeout = setTimeout(() => {
        	axios.get(`/suggestions?q=${value}`)
                .then(response =>
                      dispatch(loadSuggestionsSuccess(response.data.suggestions)))
                .catch(() => dispatch(loadSuggestionsFailed()))
        }, 1000, value);
    }
}

Necesită o variabilă globală timeout. Când începem să utilizăm variabile globale, creatorii noștri de acțiune nu mai sunt funcții pure. De asemenea, devine dificil să testați unitatea creatorii de acțiune care utilizează o variabilă globală.

Putem implementa același lucru cu redux-observable folosind .debounce operator.

const loadSuggestionsEpic = (action$, state$) => action$.pipe(
    ofType('VALUE_CHANGED'),
    debounce(1000),
    mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
    	map(response => loadSuggestionsSuccess(response.data.suggestions)),
        catchError(() => loadSuggestionsFailed())
    )),
    startWith(loadSuggestionsInProgress())
);

Acum, creatorii noștri de acțiune pot fi curățați și, mai important, pot fi din nou funcții pure.

function valueChanged(value) {
    return {
        type: 'VALUE_CHANGED',
        payload: {
            value
        }
    }
}

3. Solicitați anularea

Utilizare caz: Continuând cazul de utilizare anterior, presupunem că utilizatorul nu a tastat nimic timp de 1 secundă și am făcut primul apel API pentru a prelua sugestiile.

Să presupunem că API-ul în sine durează în medie 2-3 secunde pentru a returna rezultatul. Acum, dacă utilizatorul tastează ceva în timp ce primul apel API este în desfășurare, după o secundă, vom face al doilea API. Putem ajunge să avem două apeluri API în același timp și poate crea o condiție de cursă.

Pentru a evita acest lucru, trebuie să anulăm primul apel API înainte de a efectua al doilea apel API.

O implementare redux-thunk va arăta astfel,

let timeout;
var cancelToken = axios.cancelToken;
let apiCall;

function valueChanged(value) {    
    return dispatch => {
        dispatch(loadSuggestionsInProgress());
        dispatch({
            type: 'VALUE_CHANGED',
            payload: {
                value
            }
        });

        // If changed again within 1 second, cancel the timeout
        timeout && clearTimeout(timeout);

        // Make API Call after 1 second
        timeout = setTimeout(() => {
            // Cancel the existing API
            apiCall && apiCall.cancel('Operation cancelled');
            
            // Generate a new token
            apiCall = cancelToken.source();
            
            
            axios.get(`/suggestions?q=${value}`, {
                cancelToken: apiCall.token
            })
                .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions)))
                .catch(() => dispatch(loadSuggestionsFailed()))
     
        }, 1000, value);
    }
}

Acum, este nevoie de o altă variabilă globală pentru a stoca simbolul de anulare Axios. Mai multe variabile globale = mai multe funcții impure!

Pentru a implementa același lucru utilizând redux-observable, tot ce trebuie să facem este să înlocuim .mergeMap cu .switchMap.

const loadSuggestionsEpic = (action$, state$) => action$.pipe(
    ofType('VALUE_CHANGED'),
    throttle(1000),
    switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(
    	map(response => loadSuggestionsSuccess(response.data.suggestions)),
        catchError(() => loadSuggestionsFailed())
    )),
    startWith(loadSuggestionsInProgress())
);

Deoarece nu necesită nicio modificare a creatorilor noștri de acțiune, aceștia pot continua să fie funcții pure.

În mod similar, există multe cazuri de utilizare în care Redux-Observables strălucește de fapt! De exemplu, sondarea unui API, afișarea snack-barurilor, gestionarea conexiunilor WebSocket, etc.

A concluziona

Dacă dezvoltați o aplicație Redux care implică astfel de cazuri de utilizare complexe, este foarte recomandat să utilizați Redux-Observables. La urma urmei, beneficiile utilizării acesteia sunt direct proporționale cu complexitatea aplicației dvs. și este evident din cazurile de utilizare practice menționate mai sus.

Cred cu tărie că folosirea setului corect de biblioteci ne va ajuta dezvoltați aplicații mult mai curate și care pot fi întreținuteși, pe termen lung, beneficiile utilizării acestora vor depăși dezavantajele.