Dacă ai citit povestea mea anterioară despre Scalachain, probabil ați observat că este departe de a fi un sistem distribuit. Îi lipsesc toate caracteristicile pentru a lucra corect cu alte noduri. Adăugați că un blockchain compus dintr-un singur nod este inutil. Din acest motiv, am decis că este timpul să lucrez la această problemă.

Deoarece Scalachain este alimentat de Akka, de ce să nu profitați de ocazia de a juca cu Akka Cluster? Am creat un proiect simplu să te joci puțin Clusterul Akka, și în această poveste voi împărtăși învățăturile mele. Vom crea un cluster de trei noduri, folosind Cluster Aware Routers pentru a echilibra sarcina dintre ei. Totul va rula într-un container Docker și vom folosi docker-compose pentru o implementare ușoară.

Ok, hai să ne rostogolim! ?

Introducere rapidă la Clusterul Akka

Akka Cluster oferă un mare sprijin pentru crearea de aplicații distribuite. Cel mai bun caz de utilizare este atunci când aveți un nod pe care doriți să îl reproduceți de N ori într-un mediu distribuit. Aceasta înseamnă că toate cele N noduri sunt colegii care rulează același cod. Clusterul Akka vă oferă imediat descoperirea membrilor din același cluster. Folosind Cluster Aware Routers este posibil să se echilibreze mesajele dintre actorii din diferite noduri. De asemenea, este posibil să alegeți politica de echilibrare, făcând echilibrarea sarcinii o bucată de tort!

De fapt, puteți alege între două tipuri de routere:

Router de grup – Actorii la care să trimită mesajele – numite routees – sunt specificate folosind calea lor de actor. Routerele partajează rutele create în cluster. Vom folosi un Router de grup în acest exemplu.

Cum sa faci o aplicatie simpla cu Akka Cluster
Router de grup

Router pentru piscină – Rutele sunt create și implementate de router, deci sunt copiii acestuia în ierarhia actorilor. Traseele nu sunt partajate între rute. Acest lucru este ideal pentru un scenariu de replică primară, în care fiecare ruter este principalul și rute sale replici.

1612126150 969 Cum sa faci o aplicatie simpla cu Akka Cluster
Router pentru piscină

Acesta este doar vârful aisbergului, așa că vă invit să citiți documentație oficială pentru mai multe informații.

Un cluster pentru calcule matematice

Să ne imaginăm un scenariu de utilizare. Să presupunem că proiectăm un sistem pentru a executa calcule matematice la cerere. Sistemul este implementat online, deci are nevoie de un API REST pentru a primi cererile de calcul. Un procesor intern gestionează aceste solicitări, executând calculul și returnând rezultatul.

În acest moment procesorul poate calcula numai fișierul Numărul lui Fibonacci. Decidem să folosim un grup de noduri pentru a distribui sarcina între noduri și pentru a îmbunătăți performanța. Akka Cluster va gestiona dinamica clusterului și echilibrarea încărcării între noduri. OK suna bine!

Ierarhia actorilor

În primul rând, trebuie să ne definim ierarhia actorilor. Sistemul poate fi împărțit în trei părți funcționale: lociga afacerii, gestionarea clusterelor, si nodul în sine. Există, de asemenea, Server dar nu este un actor și vom lucra la asta mai târziu.

Lociga afacerii

Aplicația ar trebui să facă calcule matematice. Putem defini un simplu Processor actor pentru a gestiona toate sarcinile de calcul. Fiecare calcul pe care îl susținem poate fi implementat într-un anumit actor, care va fi un copil al Processor unu. În acest fel, aplicația este modulară și mai ușor de extins și întreținut. Chiar acum singurul copil al Processor va fi ProcessorFibonacci actor. Presupun că poți ghici care este sarcina sa. Acest lucru ar trebui să fie suficient pentru a începe.

Managementul clusterelor

Pentru a gestiona clusterul avem nevoie de un ClusterManager. Sună simplu, nu? Acest actor se ocupă de tot ceea ce este legat de cluster, cum ar fi returnarea membrilor săi atunci când i se cere. Ar fi util să înregistrăm ce se întâmplă în interiorul clusterului, așa că definim un ClusterListener actor. Acesta este un copil al ClusterManagerși se abonează la evenimentele cluster care le înregistrează.

Nodul

Node actorul este rădăcina ierarhiei noastre. Punctul de intrare al sistemului nostru este cel care comunică cu API-ul. Processor si ClusterManager sunt copiii săi, împreună cu ProcessorRouter actor. Acesta este echilibratorul de sarcină al sistemului, distribuind sarcina între Processors. Îl vom configura ca un Cluster Aware Router, deci fiecare ProcessorRouter poate trimite mesaje către Processorpe fiecare nod.

1612126150 339 Cum sa faci o aplicatie simpla cu Akka Cluster
Ierarhia actorilor

Implementarea actorului

E timpul să ne punem în aplicare actorii! La început punem în aplicare actorii legați de logica de afaceri a sistemului. Mergem apoi pe actorii pentru managementul clusterului și pe actorul rădăcină (Node) în cele din urmă.

Procesor Fibonacci

Acest actor execută calculul numărului Fibonacci. Primește un Compute mesaj care conține numărul de calculat și referința actorului la care să răspundă. Referința este importantă, deoarece pot exista diferiți actori care solicită. Amintiți-vă că lucrăm într-un mediu distribuit!

Odata ce Compute mesajul este primit, fibonacci funcția calculează rezultatul. Îl înfășurăm într-un ProcessorResponse obiectul de a furniza informații despre nodul care a executat calculul. Acest lucru va fi util mai târziu pentru a vedea politica de „round-robin” în acțiune.

Rezultatul este apoi trimis actorului la care ar trebui să răspundem. Ușor de păros.

object ProcessorFibonacci {
  sealed trait ProcessorFibonacciMessage
  case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage

  def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId))

  def fibonacci(x: Int): BigInt = {
    @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match {
      case 0 => prev
      case 1 => next
      case _ => fibHelper(x - 1, next, next + prev)
    }
    fibHelper(x)
  }
}

class ProcessorFibonacci(nodeId: String) extends Actor {
  import ProcessorFibonacci._

  override def receive: Receive = {
    case Compute(value, replyTo) => {
      replyTo ! ProcessorResponse(nodeId, fibonacci(value))
    }
  }
}

Procesor

Processor actorul gestionează subprocesoarele specifice, precum cel din Fibonacci. Ar trebui să instanțeze sub-procesoarele și să le transmită solicitările. În acest moment avem doar un sub-procesor, deci Processor primește un fel de mesaj: ComputeFibonacci. Acest mesaj conține numărul Fibonacci de calculat. Odată primit, numărul de calculat este trimis către un FibonacciProcessor, împreună cu referința sender().

object Processor {

  sealed trait ProcessorMessage

  case class ComputeFibonacci(n: Int) extends ProcessorMessage

  def props(nodeId: String) = Props(new Processor(nodeId))
}

class Processor(nodeId: String) extends Actor {
  import Processor._

  val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci")

  override def receive: Receive = {
    case ComputeFibonacci(value) => {
      val replyTo = sender()
      fibonacciProcessor ! Compute(value, replyTo)
    }
  }
}

ClusterListener

Am dori să înregistrăm informații utile despre ceea ce se întâmplă în cluster. Acest lucru ne-ar putea ajuta să depanăm sistemul dacă este nevoie. Acesta este scopul ClusterListener actor. Înainte de a începe, se abonează la mesajele de eveniment ale clusterului. Actorul reacționează la mesaje de genul MemberUp, UnreachableMember, sau MemberRemoved, înregistrarea evenimentului corespunzător. Când ClusterListener este oprit, se dezabonează de la evenimentele cluster.

object ClusterListener {
  def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster))
}

class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging {

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Node {} - Member is Up: {}", nodeId, member.address)
    case UnreachableMember(member) =>
      log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member)
    case MemberRemoved(member, previousStatus) =>
      log.info(s"Node {} - Member is Removed: {} after {}",
        nodeId, member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}

ClusterManager

Actorul responsabil de gestionarea clusterului este ClusterManager. Se creează ClusterListener actor și furnizează lista membrilor grupului la cerere. Ar putea fi extins pentru a adăuga mai multe funcționalități, dar în acest moment este suficient.

object ClusterManager {

  sealed trait ClusterMessage
  case object GetMembers extends ClusterMessage

  def props(nodeId: String) = Props(new ClusterManager(nodeId))
}

class ClusterManager(nodeId: String) extends Actor with ActorLogging {

  val cluster: Cluster = Cluster(context.system)
  val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener")

  override def receive: Receive = {
    case GetMembers => {
      sender() ! cluster.state.members.filter(_.status == MemberStatus.up)
        .map(_.address.toString)
        .toList
    }
  }
}

ProcessorRouter

Echilibrarea sarcinii între procesoare este gestionată de ProcessorRouter. Este creat de Node actor, dar de data aceasta toate informațiile necesare sunt furnizate în configurația sistemului.

class Node(nodeId: String) extends Actor {

  //...
  
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  
  //...
}

Să analizăm partea relevantă din application.conf fişier.

akka {
  actor {
    ...
    deployment {
      /node/processorRouter {
        router = round-robin-group
        routees.paths = ["/user/node/processor"]
        cluster {
          enabled = on
          allow-local-routees = on
        }
      }
    }
  }
  ...
}

Primul lucru este să specificați calea către actorul routerului, adică /node/processorRouter. În interiorul acelei proprietăți putem configura comportamentul routerului:

  • router: aceasta este politica pentru echilibrarea încărcării mesajelor. Am ales round-robin-group, dar sunt multe altele.
  • routees.paths: acestea sunt căile către actorii care vor primi mesajele gestionate de router. Spunem: „Când primiți un mesaj, căutați actorii care corespund acestor căi. Alegeți una conform politicii și transmiteți-i mesajul. ” Deoarece folosim Cluster Aware Routers, rutele pot fi pe orice nod al clusterului.
  • cluster.enabled: operăm într-un cluster? Raspunsul este on, desigur!
  • cluster.allow-local-routees: aici permitem routerului să aleagă o rută în nodul său.

Folosind această configurație putem crea un router pentru a încărca echilibrul de lucru între procesoarele noastre.

Nodul

Rădăcina ierarhiei actorilor noștri este Node. Creează actori pentru copii – ClusterManager, Processor, și ProcessorRouter – și redirecționează mesajele către cel corect. Nimic complex aici.

object Node {

  sealed trait NodeMessage

  case class GetFibonacci(n: Int)

  case object GetClusterMembers

  def props(nodeId: String) = Props(new Node(nodeId))
}

class Node(nodeId: String) extends Actor {

  val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor")
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager")

  override def receive: Receive = {
    case GetClusterMembers => clusterManager forward GetMembers
    case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value)
  }
}

Server și API

Fiecare nod al clusterului nostru rulează un server capabil să primească cereri. Server creează sistemul nostru actor și este configurat prin application.conf fişier.

object Server extends App with NodeRoutes {

  implicit val system: ActorSystem = ActorSystem("cluster-playground")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val config: Config = ConfigFactory.load()
  val address = config.getString("http.ip")
  val port = config.getInt("http.port")
  val nodeId = config.getString("clustering.ip")

  val node: ActorRef = system.actorOf(Node.props(nodeId), "node")

  lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes

  Http().bindAndHandle(routes, address, port)
  println(s"Node $nodeId is listening at http://$address:$port")

  Await.result(system.whenTerminated, Duration.Inf)

}

Akka HTTP alimentează serverul în sine și API-ul REST, expunând trei puncte finale simple. Aceste puncte finale sunt definite în NodeRoutes trăsătură.

Primul este /health, pentru a verifica starea unui nod. Răspunde cu un 200 OK dacă nodul funcționează

lazy val healthRoute: Route = pathPrefix("health") {
    concat(
      pathEnd {
        concat(
          get {
            complete(StatusCodes.OK)
          }
        )
      }
    )
  }

/status/members punctul final răspunde cu membrii activi actuali ai clusterului.

lazy val statusRoutes: Route = pathPrefix("status") {
    concat(
      pathPrefix("members") {
        concat(
          pathEnd {
            concat(
              get {
                val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]]
                onSuccess(membersFuture) { members =>
                  complete(StatusCodes.OK, members)
                }
              }
            )
          }
        )
      }
    )
  }

Ultimul (dar nu cel mai mic) este /process/fibonacci/n punct final, folosit pentru a solicita numărul Fibonacci al n.

lazy val processRoutes: Route = pathPrefix("process") {
    concat(
      pathPrefix("fibonacci") {
        concat(
          path(IntNumber) { n =>
            pathEnd {
              concat(
                get {
                  val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse]
                  onSuccess(processFuture) { response =>
                    complete(StatusCodes.OK, response)
                  }
                }
              )
            }
          }
        )
      }
    )
  }

Răspunde cu un ProcessorResponse care conține rezultatul, împreună cu ID-ul nodului în care a avut loc calculul.

Configurare cluster

Odată ce avem toți actorii noștri, trebuie să configurăm sistemul pentru a rula ca un cluster! application.conf fișierul este locul unde are loc magia. Voi împărți în bucăți pentru a-l prezenta mai bine, dar puteți găsi fișierul complet Aici.

Să începem să definim câteva variabile utile.

clustering {
  ip = "127.0.0.1"
  ip = ${?CLUSTER_IP}

  port = 2552
  port = ${?CLUSTER_PORT}

  seed-ip = "127.0.0.1"
  seed-ip = ${?CLUSTER_SEED_IP}

  seed-port = 2552
  seed-port = ${?CLUSTER_SEED_PORT}

  cluster.name = "cluster-playground"
}

Aici definim pur și simplu ip-ul și portul nodurilor și semințelor, precum și numele cluster-ului. Setăm o valoare implicită, apoi o înlocuim dacă este specificată una nouă. Configurația clusterului este următoarea.

akka {
  actor {
    provider = "cluster"
    ...
    /* router configuration */
    ...
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = ${clustering.ip}
      port = ${clustering.port}
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
    ]
    auto-down-unreachable-after = 10s
  }
}
...
/* server vars */
...
/* cluster vars */
}

Akka Cluster este construit pe partea de sus a Akka Remoting, deci trebuie să-l configurăm corect. În primul rând, specificăm că vom folosi Akka Cluster spunând asta provider = "cluster". Apoi ne legăm cluster.ip și cluster.port la hostname și port din netty cadru web.

Clusterul necesită câteva noduri de semințe ca puncte de intrare. Le-am așezat în seed-nodes matrice, în format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. În acest moment avem un singur nod de semințe, dar putem adăuga mai multe mai târziu.

auto-down-unreachable-after proprietatea stabilește un membru ca fiind jos după ce nu este accesibil pentru o perioadă de timp. Acest lucru trebuie utilizat numai în timpul dezvoltării, după cum se explică în documentație oficială.

Ok, clusterul este configurat, putem trece la pasul următor: andocare și implementare!

Dockerizarea și implementarea

Pentru a crea containerul Docker al nodului nostru, îl putem folosi sbt-native-packager. Instalarea sa este ușoară: adăugați addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") la plugin.sbt fișier în project/ pliant. Acest instrument uimitor are un plugin pentru crearea containerelor Docker. ne permite să configurăm proprietățile fișierului nostru Docker în build.sbt fişier.

// other build.sbt properties

enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
enablePlugins(AshScriptPlugin)

mainClass in Compile := Some("com.elleflorio.cluster.playground.Server")
dockerBaseImage := "java:8-jre-alpine"
version in Docker := "latest"
dockerExposedPorts := Seq(8000)
dockerRepository := Some("elleflorio")

Odată ce am configurat pluginul, putem crea imaginea de andocare care rulează comanda sbt docker:publishLocal. Rulați comanda și gustați magia …?

Avem imaginea Docker a nodului nostru, acum trebuie să o implementăm și să verificăm dacă totul funcționează bine. Cel mai simplu mod este să creați un fișier docker-compose fișier care va genera o sămânță și câteva alte noduri.

version: '3.5'

networks:
  cluster-network:

services:
  seed:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '2552:2552'
      - '8000:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: seed
      CLUSTER_SEED_IP: seed

  node1:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8001:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node1
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552

  node2:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8002:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node2
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552

Nu voi petrece timp trecând prin el, deoarece este destul de simplu.

Să-l rulăm!

E timpul să ne testăm munca! Odată ce rulăm docker-compose up comanda, vom avea un cluster de trei noduri în funcțiune. seed va răspunde cererilor din port :8000, in timp ce node1 și node2 la port :8001 și :8002. Joacă puțin cu diferitele puncte finale. Veți vedea că cererile pentru un număr Fibonacci vor fi calculate de un nod diferit de fiecare dată, în urma unei politici round-robin. Este bine, suntem mândri de munca noastră și putem ieși la o bere pentru a sărbători! ?

Concluzie

Am terminat aici! Am învățat o mulțime de lucruri în aceste zece minute:

  • Ce este clusterul Akka și ce poate face pentru noi.
  • Cum să creați o aplicație distribuită cu aceasta.
  • Cum se configurează un router de grup pentru echilibrarea încărcării în cluster.
  • Cum să Dockerize totul și să-l implementeze folosind docker-compose.

Aplicația completă o puteți găsi în Repo GitHub. Simțiți-vă liber să contribuiți sau să vă jucați cu el după cum doriți! ?

Te văd! ?