Unul dintre dezavantajele nodului este că este cu un singur fir. Desigur, există o cale în jurul ei – și anume un modul numit grup. Clusterul ne permite să răspândim aplicația noastră pe mai multe fire.

Acum, însă, se prezintă o nouă problemă. Vedeți, codul nostru care rulează în mai multe instanțe are de fapt unele dezavantaje semnificative. Unul dintre ei nu are state globale.

În mod normal, într-o instanță cu un singur fir, acest lucru nu ar fi o îngrijorare. Pentru noi acum schimbă totul.

Să vedem de ce.

Deci care este problema?

Aplicația noastră este un chat online simplu care rulează pe patru fire. Aceasta permite utilizatorului să fie conectat în același timp pe telefonul și computerul său.

Imaginați-vă că avem socluri configurate exact așa cum le-am fi setat pentru un fir. Cu alte cuvinte, acum avem un mare stat global cu socluri.

Când utilizatorul se conectează pe computerul său, site-ul web deschide conexiunea cu o instanță Socket.io pe serverul nostru. Soclul este stocat în starea firului # 3.

Acum, imaginați-vă că utilizatorul merge la bucătărie pentru a lua o gustare și își ia telefonul cu ei – dorind în mod natural să continue să trimită mesaje text cu prietenii lor online.

Telefonul lor se conectează la firul 4, iar soclul este salvat în starea firului.

Trimiterea unui mesaj de pe telefonul său nu îi va ajuta utilizatorului. Doar persoanele din thread-ul 3 vor putea vedea mesajul. Acest lucru se datorează faptului că soclurile salvate pe firul # 3 nu sunt cumva stocate magic și pe firele # 1, # 2 și # 4.

Destul de amuzant, chiar și utilizatorul nu își va vedea mesajele pe computer după ce se întorc din bucătărie.

Desigur, atunci când reîmprospătează site-ul, am putea trimite o solicitare GET și să preluăm ultimele 50 de mesaje, dar nu putem spune cu adevărat că este modul „dinamic”, nu-i așa?

De ce se întâmplă asta?

Răspândirea serverului nostru pe mai multe fire este într-un fel echivalează cu a avea mai multe servere separate. Nu știu despre existența celuilalt și cu siguranță nu împărtășesc nicio amintire. Aceasta înseamnă că un obiect dintr-o instanță nu există pe cealaltă.

Soclurile salvate în firul # 3 nu sunt neapărat toate soclurile pe care utilizatorul le folosește în acest moment. Dacă prietenii utilizatorului sunt pe fire diferite, nu vor vedea mesajele utilizatorului decât dacă reîmprospătează site-ul web.

În mod ideal, am dori să anunțăm alte instanțe despre un eveniment pentru utilizator. Astfel putem fi siguri că fiecare dispozitiv conectat primește actualizări live.

O solutie

Putem notifica alte fire folosind Redis‘publica / abona paradigma de mesagerie (pubsub).

Redis este un open source (BSD-licențiat) structură de date în memorie magazin. Poate fi folosit ca bază de date, cache și broker de mesaje.

Aceasta înseamnă că putem folosi Redis pentru a avea evenimente distribuite între instanțele noastre.

Rețineți că, în mod normal, probabil că vom stoca întreaga noastră structură în cadrul Redis. Cu toate acestea, deoarece structura nu este serializabilă și trebuie menținută „în viață” în interiorul memoriei, vom stoca o parte din ea în fiecare instanță.

Fluxul

Să ne gândim acum la pașii în care vom gestiona un eveniment de intrare.

  1. Evenimentul a fost numit mesaj vine într-una din prizele noastre – în acest fel, nu trebuie să ascultăm fiecare eveniment posibil.
  2. În interiorul obiectului transmis către gestionarul acestui eveniment ca argument, putem găsi numele evenimentului. De exemplu, Trimite mesaj.on('message', ({ event }) =>{}).
  3. Dacă există un handler pentru acest nume, îl vom executa.
  4. Handlerul poate executa expediere cu un răspuns.
  5. Expedierea trimite evenimentul de răspuns la pub-ul nostru Redis. De acolo devine emis pentru fiecare dintre instanțele noastre.
  6. Fiecare instanță o emite în socketsState, asigurându-se că fiecare client conectat va primi evenimentul.

Pare complicat, știu, dar suportă-mă.

Implementare

Aici este repertoriu cu mediul gata, astfel încât să nu trebuiască să instalăm și să configurăm totul noi înșine.

În primul rând, vom configura un server cu Expres.

import * as moduleAlias from 'module-alias';

moduleAlias.addAliases({
  src: __dirname,
});

import * as express from 'express';
import * as http from 'http';
import * as socketio from 'socket.io';

const port = 7999;

const app = express();
const server = http.createServer(app);
const io = initSocket(socketio(server).of('/socket'));

server.listen(port, () => {
  console.log(`Listening on port ${port}.`);
});

Creăm o aplicație Express, server HTTP și socketuri init.

Acum ne putem concentra pe adăugarea soclurilor.

Trecem de Instanța serverului Socket.io la funcția noastră în care setăm mijlocul.

const initSocket = (instance: socketio.Namespace): socketio.Namespace =>
  instance.use(onAuth).use(onConnection);

onAuth

onAuth funcția imită pur și simplu o autorizație falsă. În cazul nostru, este bazat pe simboluri.

Personal, l-aș înlocui probabil cu JWT în viitor, dar nu va fi aplicat în niciun fel.

const onAuth: SocketMiddleware = (socket, next) => {
  const { token, id }: { token: string; id: string } =
    socket.request._query || socket.request.headers;

  if (!token) {
    return next(new Error('Authorization failed, no token has been provided!'));
  }

  // mock
  const user = checkToken(token, id);

  socket.user = user;

  return next();
};

Acum, să trecem la onConnection middleware.

onConnection

const onConnection: SocketMiddleware = (socket, next) => {
  if (!socket.user) {
    return next(new Error('Something went wrong.'));
  }

  const { id } = socket.user;

  socketsState.add(id, socket);

  socket.on('message', ({ event, args }) => {
    const handler = handlers[event];

    if (!handler) {
      return null;
    }

    return handler && handler({ id, args });
  });

  socket.on('disconnect', () => {
    return socketsState.remove(id, socket);
  });

  return next();
};

Aici vedem că preluăm cele ale utilizatorului id, care a fost setat în middleware-ul anterior și salvați-l în socketsState, cu cheia fiind id-ul și valoarea fiind o serie de sockets.

Apoi, ascultăm pentru mesaj eveniment. Întreaga noastră logică se bazează pe asta – fiecare eveniment pe care ni-l trimite frontendul va fi numit: mesaj.

Numele evenimentului va fi trimis în interiorul obiectului de argumente – așa cum sa menționat mai sus.

Manipulatori

După cum puteți vedea în onConnection, în special în ascultătorul pentru evenimentul mesajului, căutăm un handler bazat pe numele evenimentului.

Al nostru manipulatori este pur și simplu un obiect în care cheia este numele evenimentului și valoarea este funcția. Îl vom folosi pentru a asculta evenimentele și a răspunde în consecință.

const dispatchTypes = {
  MESSAGE_SENT: 'message_sent',
  POST_UPDATED_NOTIFICATION: 'post_updated_notification',
};

interface Handlers {
  [key: string]: ({ id, args }: { id: string; args: any }) => any;
}

const handlers: Handlers = {
  sendMessage: async ({ id, args }) => {
    // await sendMessageToUser();

    dispatch({
      id,
      event: dispatchTypes.MESSAGE_SENT,
      args: {
        message: `A message from user with id: ${id} has been send`,
      },
    });
  },
  postUpdated: async ({ id, args }) => {
    dispatch({
      id,
      event: dispatchTypes.POST_UPDATED_NOTIFICATION,
      args: {
        message: 'A post you have been mentioned in has been updated',
      },
    });
  },
};

export = handlers;

De asemenea, mai târziu, vom adăuga fișierul expediere funcționează și folosește-l pentru a trimite evenimentul peste instanțe.

SocketsState

Cunoaștem interfața statului nostru, dar încă nu am implementat-o.

Adăugăm metode pentru adăugarea și eliminarea unui socket, precum și pentru emiterea unui eveniment.

import * as socketio from 'socket.io';

interface SocketsState {
  [id: string]: socketio.Socket[];
}

const socketsState: SocketsState = {};

const add = (id: string, socket: socketio.Socket) => {
  if (!socketsState[id]) {
    socketsState[id] = [];
  }

  socketsState[id] = [...socketsState[id], socket];

  return socketsState[id];
};

const remove = (id: string, socket: socketio.Socket) => {
  if (!socketsState[id]) {
    return null;
  }

  socketsState[id] = socketsState[id].filter((s) => s !== socket);

  if (!socketsState[id].length) {
    socketsState[id] = undefined;
  }

  return null;
};

const emit = ({
  event,
  id,
  args,
}: {
  event: string;
  id: string;
  args: any;
}) => {
  if (!socketsState[id]) {
    return null;
  }

  socketsState[id].forEach((socket) =>
    socket.emit('message', { event, id, args }),
  );

  return null;
};

export { add, remove, emit };

adăuga funcția verifică dacă statul are o proprietate care este egală cu ID-ul utilizatorului. Dacă acesta este cazul, îl adăugăm pur și simplu la matricea noastră deja existentă. În caz contrar, creăm mai întâi o nouă matrice.

elimina funcția verifică, de asemenea, dacă statul are ID-ul utilizatorului în proprietățile sale. Dacă nu – nu face nimic. În caz contrar, filtrează matricea pentru a elimina soclul din matrice. Apoi, dacă matricea este goală, o elimină din stare, setând proprietatea la nedefinit.

Redis ‘pubsub

Pentru crearea noastră pubsub vom folosi pachetul numit nod-redis-pubsub.

import * as NRP from 'node-redis-pubsub';

const client = new NRP({
  port: 6379,
  scope: 'message',
});

export = client;

Adăugarea expedierii

Ok, acum nu mai rămâne decât să adăugați funcția de expediere …

const dispatch = ({
  event,
  id,
  args,
}: {
  event: string;
  id: string;
  args: any;
}) => pubsub.emit('outgoing_socket_message', { event, id, args });

… și adăugați un ascultător pentru outgoing_socket_message. În acest fel, fiecare instanță primește evenimentul și îl trimite la soclurile utilizatorului.

pubsub.on('outgoing_socket_message', ({ event, id, args }) =>
  socketsState.emit({ event, id, args }),
);

Făcând totul multi-threaded

În cele din urmă, să adăugăm codul necesar pentru ca serverul nostru să fie cu mai multe fire.

import * as os from 'os';
import * as cluster from 'cluster';

const spawn = () => {
  const numWorkes = os.cpus().length;

  for (let i = 0; i < numWorkes; i += 1) {
    cluster.fork();
  }

  cluster.on('online', () => {
    console.log('Worker spawned');
  });

  cluster.on('exit', (worker, code, status) => {
    if (code === 0 || worker.exitedAfterDisconnect) {
      console.log(`Worker ${worker.process.pid} finished his job.`);
      return null;
    }

    console.log(
      `Worker ${
        worker.process.pid
      } crashed with code ${code} and status ${status}.`,
    );
    return cluster.fork();
  });
};

export { spawn };
import * as moduleAlias from 'module-alias';

moduleAlias.addAliases({
  src: __dirname,
});

import * as express from 'express';
import * as http from 'http';
import * as cluster from 'cluster';
import * as socketio from 'socket.io';
import * as killPort from 'kill-port';
import { initSocket } from 'src/common/socket';
import { spawn } from 'src/clusters';

const port = 7999;

if (cluster.isMaster) {
  killPort(port).then(spawn);
} else {
  const app = express();
  const server = http.createServer(app);
  const io = initSocket(socketio(server).of('/socket'));

  server.listen(port, () => {
    console.log(`Listening on port ${port}.`);
  });
}

Notă: Trebuie să omorâm portul, pentru că după ce l-am părăsit Nodemon proces cu Ctrl + c doar atârnă acolo.

Cu un pic de ajustare, avem acum prize de lucru în toate cazurile. Ca rezultat: un server mult mai eficient.

Vă mulțumesc foarte mult pentru citire!

Apreciez că totul ar putea părea copleșitor la început și obositor să ia totul dintr-o dată. Având în vedere acest lucru, vă încurajez să citiți codul din nou în întregime și să îl meditați în ansamblu.

Dacă aveți întrebări sau comentarii, nu ezitați să le puneți în secțiunea de comentarii de mai jos sau să-mi trimiteți un mesaj.

Verifică-mi social media!

Alătură-te newsletter-ului meu!

Publicat inițial la www.mcieslar.com pe 10 septembrie 2018.