Dacă ai citit povestea mea anterioară despre Scalachain, probabil ați observat că este departe de a fi un sistem distribuit. Îi lipsesc toate caracteristicile pentru a lucra corect cu alte noduri. Adăugați că un blockchain compus dintr-un singur nod este inutil. Din acest motiv, am decis că este timpul să lucrez la această problemă.
Deoarece Scalachain este alimentat de Akka, de ce să nu profitați de ocazia de a juca cu Akka Cluster? Am creat un proiect simplu să te joci puțin Clusterul Akka, și în această poveste voi împărtăși învățăturile mele. Vom crea un cluster de trei noduri, folosind Cluster Aware Routers pentru a echilibra sarcina dintre ei. Totul va rula într-un container Docker și vom folosi docker-compose pentru o implementare ușoară.
Ok, hai să ne rostogolim! ?
Conţinut
Introducere rapidă la Clusterul Akka
Akka Cluster oferă un mare sprijin pentru crearea de aplicații distribuite. Cel mai bun caz de utilizare este atunci când aveți un nod pe care doriți să îl reproduceți de N ori într-un mediu distribuit. Aceasta înseamnă că toate cele N noduri sunt colegii care rulează același cod. Clusterul Akka vă oferă imediat descoperirea membrilor din același cluster. Folosind Cluster Aware Routers este posibil să se echilibreze mesajele dintre actorii din diferite noduri. De asemenea, este posibil să alegeți politica de echilibrare, făcând echilibrarea sarcinii o bucată de tort!
De fapt, puteți alege între două tipuri de routere:
Router de grup – Actorii la care să trimită mesajele – numite routees – sunt specificate folosind calea lor de actor. Routerele partajează rutele create în cluster. Vom folosi un Router de grup în acest exemplu.
Router pentru piscină – Rutele sunt create și implementate de router, deci sunt copiii acestuia în ierarhia actorilor. Traseele nu sunt partajate între rute. Acest lucru este ideal pentru un scenariu de replică primară, în care fiecare ruter este principalul și rute sale replici.
Acesta este doar vârful aisbergului, așa că vă invit să citiți documentație oficială pentru mai multe informații.
Un cluster pentru calcule matematice
Să ne imaginăm un scenariu de utilizare. Să presupunem că proiectăm un sistem pentru a executa calcule matematice la cerere. Sistemul este implementat online, deci are nevoie de un API REST pentru a primi cererile de calcul. Un procesor intern gestionează aceste solicitări, executând calculul și returnând rezultatul.
În acest moment procesorul poate calcula numai fișierul Numărul lui Fibonacci. Decidem să folosim un grup de noduri pentru a distribui sarcina între noduri și pentru a îmbunătăți performanța. Akka Cluster va gestiona dinamica clusterului și echilibrarea încărcării între noduri. OK suna bine!
Ierarhia actorilor
În primul rând, trebuie să ne definim ierarhia actorilor. Sistemul poate fi împărțit în trei părți funcționale: lociga afacerii, gestionarea clusterelor, si nodul în sine. Există, de asemenea, Server dar nu este un actor și vom lucra la asta mai târziu.
Lociga afacerii
Aplicația ar trebui să facă calcule matematice. Putem defini un simplu Processor
actor pentru a gestiona toate sarcinile de calcul. Fiecare calcul pe care îl susținem poate fi implementat într-un anumit actor, care va fi un copil al Processor
unu. În acest fel, aplicația este modulară și mai ușor de extins și întreținut. Chiar acum singurul copil al Processor
va fi ProcessorFibonacci
actor. Presupun că poți ghici care este sarcina sa. Acest lucru ar trebui să fie suficient pentru a începe.
Managementul clusterelor
Pentru a gestiona clusterul avem nevoie de un ClusterManager
. Sună simplu, nu? Acest actor se ocupă de tot ceea ce este legat de cluster, cum ar fi returnarea membrilor săi atunci când i se cere. Ar fi util să înregistrăm ce se întâmplă în interiorul clusterului, așa că definim un ClusterListener
actor. Acesta este un copil al ClusterManager
și se abonează la evenimentele cluster care le înregistrează.
Nodul
Node
actorul este rădăcina ierarhiei noastre. Punctul de intrare al sistemului nostru este cel care comunică cu API-ul. Processor
si ClusterManager
sunt copiii săi, împreună cu ProcessorRouter
actor. Acesta este echilibratorul de sarcină al sistemului, distribuind sarcina între Processor
s. Îl vom configura ca un Cluster Aware Router, deci fiecare ProcessorRouter
poate trimite mesaje către Processor
pe fiecare nod.
Implementarea actorului
E timpul să ne punem în aplicare actorii! La început punem în aplicare actorii legați de logica de afaceri a sistemului. Mergem apoi pe actorii pentru managementul clusterului și pe actorul rădăcină (Node
) în cele din urmă.
Procesor Fibonacci
Acest actor execută calculul numărului Fibonacci. Primește un Compute
mesaj care conține numărul de calculat și referința actorului la care să răspundă. Referința este importantă, deoarece pot exista diferiți actori care solicită. Amintiți-vă că lucrăm într-un mediu distribuit!
Odata ce Compute
mesajul este primit, fibonacci
funcția calculează rezultatul. Îl înfășurăm într-un ProcessorResponse
obiectul de a furniza informații despre nodul care a executat calculul. Acest lucru va fi util mai târziu pentru a vedea politica de „round-robin” în acțiune.
Rezultatul este apoi trimis actorului la care ar trebui să răspundem. Ușor de păros.
object ProcessorFibonacci {
sealed trait ProcessorFibonacciMessage
case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage
def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId))
def fibonacci(x: Int): BigInt = {
@tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match {
case 0 => prev
case 1 => next
case _ => fibHelper(x - 1, next, next + prev)
}
fibHelper(x)
}
}
class ProcessorFibonacci(nodeId: String) extends Actor {
import ProcessorFibonacci._
override def receive: Receive = {
case Compute(value, replyTo) => {
replyTo ! ProcessorResponse(nodeId, fibonacci(value))
}
}
}
Procesor
Processor
actorul gestionează subprocesoarele specifice, precum cel din Fibonacci. Ar trebui să instanțeze sub-procesoarele și să le transmită solicitările. În acest moment avem doar un sub-procesor, deci Processor
primește un fel de mesaj: ComputeFibonacci
. Acest mesaj conține numărul Fibonacci de calculat. Odată primit, numărul de calculat este trimis către un FibonacciProcessor
, împreună cu referința sender()
.
object Processor {
sealed trait ProcessorMessage
case class ComputeFibonacci(n: Int) extends ProcessorMessage
def props(nodeId: String) = Props(new Processor(nodeId))
}
class Processor(nodeId: String) extends Actor {
import Processor._
val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci")
override def receive: Receive = {
case ComputeFibonacci(value) => {
val replyTo = sender()
fibonacciProcessor ! Compute(value, replyTo)
}
}
}
ClusterListener
Am dori să înregistrăm informații utile despre ceea ce se întâmplă în cluster. Acest lucru ne-ar putea ajuta să depanăm sistemul dacă este nevoie. Acesta este scopul ClusterListener
actor. Înainte de a începe, se abonează la mesajele de eveniment ale clusterului. Actorul reacționează la mesaje de genul MemberUp
, UnreachableMember
, sau MemberRemoved
, înregistrarea evenimentului corespunzător. Când ClusterListener
este oprit, se dezabonează de la evenimentele cluster.
object ClusterListener {
def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster))
}
class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging {
override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case MemberUp(member) =>
log.info("Node {} - Member is Up: {}", nodeId, member.address)
case UnreachableMember(member) =>
log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member)
case MemberRemoved(member, previousStatus) =>
log.info(s"Node {} - Member is Removed: {} after {}",
nodeId, member.address, previousStatus)
case _: MemberEvent => // ignore
}
}
ClusterManager
Actorul responsabil de gestionarea clusterului este ClusterManager
. Se creează ClusterListener
actor și furnizează lista membrilor grupului la cerere. Ar putea fi extins pentru a adăuga mai multe funcționalități, dar în acest moment este suficient.
object ClusterManager {
sealed trait ClusterMessage
case object GetMembers extends ClusterMessage
def props(nodeId: String) = Props(new ClusterManager(nodeId))
}
class ClusterManager(nodeId: String) extends Actor with ActorLogging {
val cluster: Cluster = Cluster(context.system)
val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener")
override def receive: Receive = {
case GetMembers => {
sender() ! cluster.state.members.filter(_.status == MemberStatus.up)
.map(_.address.toString)
.toList
}
}
}
ProcessorRouter
Echilibrarea sarcinii între procesoare este gestionată de ProcessorRouter
. Este creat de Node
actor, dar de data aceasta toate informațiile necesare sunt furnizate în configurația sistemului.
class Node(nodeId: String) extends Actor {
//...
val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
//...
}
Să analizăm partea relevantă din application.conf
fişier.
akka {
actor {
...
deployment {
/node/processorRouter {
router = round-robin-group
routees.paths = ["/user/node/processor"]
cluster {
enabled = on
allow-local-routees = on
}
}
}
}
...
}
Primul lucru este să specificați calea către actorul routerului, adică /node/processorRouter
. În interiorul acelei proprietăți putem configura comportamentul routerului:
-
router
: aceasta este politica pentru echilibrarea încărcării mesajelor. Am alesround-robin-group
, dar sunt multe altele. -
routees.paths
: acestea sunt căile către actorii care vor primi mesajele gestionate de router. Spunem: „Când primiți un mesaj, căutați actorii care corespund acestor căi. Alegeți una conform politicii și transmiteți-i mesajul. ” Deoarece folosim Cluster Aware Routers, rutele pot fi pe orice nod al clusterului. -
cluster.enabled
: operăm într-un cluster? Raspunsul esteon
, desigur! -
cluster.allow-local-routees
: aici permitem routerului să aleagă o rută în nodul său.
Folosind această configurație putem crea un router pentru a încărca echilibrul de lucru între procesoarele noastre.
Nodul
Rădăcina ierarhiei actorilor noștri este Node
. Creează actori pentru copii – ClusterManager
, Processor
, și ProcessorRouter
– și redirecționează mesajele către cel corect. Nimic complex aici.
object Node {
sealed trait NodeMessage
case class GetFibonacci(n: Int)
case object GetClusterMembers
def props(nodeId: String) = Props(new Node(nodeId))
}
class Node(nodeId: String) extends Actor {
val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor")
val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager")
override def receive: Receive = {
case GetClusterMembers => clusterManager forward GetMembers
case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value)
}
}
Server și API
Fiecare nod al clusterului nostru rulează un server capabil să primească cereri. Server
creează sistemul nostru actor și este configurat prin application.conf
fişier.
object Server extends App with NodeRoutes {
implicit val system: ActorSystem = ActorSystem("cluster-playground")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val config: Config = ConfigFactory.load()
val address = config.getString("http.ip")
val port = config.getInt("http.port")
val nodeId = config.getString("clustering.ip")
val node: ActorRef = system.actorOf(Node.props(nodeId), "node")
lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes
Http().bindAndHandle(routes, address, port)
println(s"Node $nodeId is listening at http://$address:$port")
Await.result(system.whenTerminated, Duration.Inf)
}
Akka HTTP alimentează serverul în sine și API-ul REST, expunând trei puncte finale simple. Aceste puncte finale sunt definite în NodeRoutes
trăsătură.
Primul este /health
, pentru a verifica starea unui nod. Răspunde cu un 200 OK
dacă nodul funcționează
lazy val healthRoute: Route = pathPrefix("health") {
concat(
pathEnd {
concat(
get {
complete(StatusCodes.OK)
}
)
}
)
}
/status/members
punctul final răspunde cu membrii activi actuali ai clusterului.
lazy val statusRoutes: Route = pathPrefix("status") {
concat(
pathPrefix("members") {
concat(
pathEnd {
concat(
get {
val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]]
onSuccess(membersFuture) { members =>
complete(StatusCodes.OK, members)
}
}
)
}
)
}
)
}
Ultimul (dar nu cel mai mic) este /process/fibonacci/n
punct final, folosit pentru a solicita numărul Fibonacci al n
.
lazy val processRoutes: Route = pathPrefix("process") {
concat(
pathPrefix("fibonacci") {
concat(
path(IntNumber) { n =>
pathEnd {
concat(
get {
val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse]
onSuccess(processFuture) { response =>
complete(StatusCodes.OK, response)
}
}
)
}
}
)
}
)
}
Răspunde cu un ProcessorResponse
care conține rezultatul, împreună cu ID-ul nodului în care a avut loc calculul.
Configurare cluster
Odată ce avem toți actorii noștri, trebuie să configurăm sistemul pentru a rula ca un cluster! application.conf
fișierul este locul unde are loc magia. Voi împărți în bucăți pentru a-l prezenta mai bine, dar puteți găsi fișierul complet Aici.
Să începem să definim câteva variabile utile.
clustering {
ip = "127.0.0.1"
ip = ${?CLUSTER_IP}
port = 2552
port = ${?CLUSTER_PORT}
seed-ip = "127.0.0.1"
seed-ip = ${?CLUSTER_SEED_IP}
seed-port = 2552
seed-port = ${?CLUSTER_SEED_PORT}
cluster.name = "cluster-playground"
}
Aici definim pur și simplu ip-ul și portul nodurilor și semințelor, precum și numele cluster-ului. Setăm o valoare implicită, apoi o înlocuim dacă este specificată una nouă. Configurația clusterului este următoarea.
akka {
actor {
provider = "cluster"
...
/* router configuration */
...
}
remote {
log-remote-lifecycle-events = on
netty.tcp {
hostname = ${clustering.ip}
port = ${clustering.port}
}
}
cluster {
seed-nodes = [
"akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
]
auto-down-unreachable-after = 10s
}
}
...
/* server vars */
...
/* cluster vars */
}
Akka Cluster este construit pe partea de sus a Akka Remoting, deci trebuie să-l configurăm corect. În primul rând, specificăm că vom folosi Akka Cluster spunând asta provider = "cluster"
. Apoi ne legăm cluster.ip
și cluster.port
la hostname
și port
din netty
cadru web.
Clusterul necesită câteva noduri de semințe ca puncte de intrare. Le-am așezat în seed-nodes
matrice, în format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”
. În acest moment avem un singur nod de semințe, dar putem adăuga mai multe mai târziu.
auto-down-unreachable-after
proprietatea stabilește un membru ca fiind jos după ce nu este accesibil pentru o perioadă de timp. Acest lucru trebuie utilizat numai în timpul dezvoltării, după cum se explică în documentație oficială.
Ok, clusterul este configurat, putem trece la pasul următor: andocare și implementare!
Dockerizarea și implementarea
Pentru a crea containerul Docker al nodului nostru, îl putem folosi sbt-native-packager. Instalarea sa este ușoară: adăugați addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
la plugin.sbt
fișier în project/
pliant. Acest instrument uimitor are un plugin pentru crearea containerelor Docker. ne permite să configurăm proprietățile fișierului nostru Docker în build.sbt
fişier.
// other build.sbt properties
enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
enablePlugins(AshScriptPlugin)
mainClass in Compile := Some("com.elleflorio.cluster.playground.Server")
dockerBaseImage := "java:8-jre-alpine"
version in Docker := "latest"
dockerExposedPorts := Seq(8000)
dockerRepository := Some("elleflorio")
Odată ce am configurat pluginul, putem crea imaginea de andocare care rulează comanda sbt docker:publishLocal
. Rulați comanda și gustați magia …?
Avem imaginea Docker a nodului nostru, acum trebuie să o implementăm și să verificăm dacă totul funcționează bine. Cel mai simplu mod este să creați un fișier docker-compose
fișier care va genera o sămânță și câteva alte noduri.
version: '3.5'
networks:
cluster-network:
services:
seed:
networks:
- cluster-network
image: elleflorio/akka-cluster-playground
ports:
- '2552:2552'
- '8000:8000'
environment:
SERVER_IP: 0.0.0.0
CLUSTER_IP: seed
CLUSTER_SEED_IP: seed
node1:
networks:
- cluster-network
image: elleflorio/akka-cluster-playground
ports:
- '8001:8000'
environment:
SERVER_IP: 0.0.0.0
CLUSTER_IP: node1
CLUSTER_PORT: 1600
CLUSTER_SEED_IP: seed
CLUSTER_SEED_PORT: 2552
node2:
networks:
- cluster-network
image: elleflorio/akka-cluster-playground
ports:
- '8002:8000'
environment:
SERVER_IP: 0.0.0.0
CLUSTER_IP: node2
CLUSTER_PORT: 1600
CLUSTER_SEED_IP: seed
CLUSTER_SEED_PORT: 2552
Nu voi petrece timp trecând prin el, deoarece este destul de simplu.
Să-l rulăm!
E timpul să ne testăm munca! Odată ce rulăm docker-compose up
comanda, vom avea un cluster de trei noduri în funcțiune. seed
va răspunde cererilor din port :8000
, in timp ce node1
și node2
la port :8001
și :8002
. Joacă puțin cu diferitele puncte finale. Veți vedea că cererile pentru un număr Fibonacci vor fi calculate de un nod diferit de fiecare dată, în urma unei politici round-robin. Este bine, suntem mândri de munca noastră și putem ieși la o bere pentru a sărbători! ?
Concluzie
Am terminat aici! Am învățat o mulțime de lucruri în aceste zece minute:
- Ce este clusterul Akka și ce poate face pentru noi.
- Cum să creați o aplicație distribuită cu aceasta.
- Cum se configurează un router de grup pentru echilibrarea încărcării în cluster.
- Cum să Dockerize totul și să-l implementeze folosind docker-compose.
Aplicația completă o puteți găsi în Repo GitHub. Simțiți-vă liber să contribuiți sau să vă jucați cu el după cum doriți! ?
Te văd! ?
#Cum #să #faci #aplicație #simplă #Akka #Cluster
Cum să faci o aplicație simplă cu Akka Cluster