de Jayvardhan Reddy

Scufundați-vă în interiorul Spark și în arhitectură

Scufundati va in interiorul Spark si in arhitectura
Credite de imagine: spark.apache.org

Apache Spark este un cadru de calcul de cluster cu scop general distribuit open-source. O aplicație spark este un proces JVM care rulează un cod de utilizator folosind spark ca o bibliotecă terță parte.

Ca parte a acestui blog, voi arăta modul în care Spark funcționează pe arhitectura Yarn cu un exemplu și diversele procese de fundal care sunt implicate, precum:

  • Spark Context
  • Yarn Resource Manager, Application Master și lansarea executanților (containere).
  • Configurarea variabilelor de mediu, a resurselor de locuri de muncă.
  • RPC bazat pe CoarseGrainedExecutorBackend și Netty.
  • SparkListeners.
  • Executarea unui loc de muncă (plan logic, plan fizic).
  • Spark-WebUI.

Spark Context

Contextul Spark este primul nivel al punctului de intrare și centrul oricărei aplicații spark. Scânteie nu este altceva decât un REPL bazat pe Scala cu binare scânteie care va crea un obiect sc numit context scânteie.

Putem lansa învelișul de scânteie așa cum se arată mai jos:

spark-shell --master yarn  --conf spark.ui.port=12345  --num-executors 3  --executor-cores 2  --executor-memory 500M

Ca parte a spark-shell, am menționat numii executanți. Acestea indică numărul de noduri lucrător care urmează să fie utilizate și numărul de nuclee pentru fiecare dintre aceste noduri lucrător pentru a executa sarcini în paralel.

Sau puteți lansa spark shell utilizând configurația implicită.

spark-shell --master yarn

Configurațiile sunt prezente ca parte a spark-env.sh

1612045090 394 Scufundati va in interiorul Spark si in arhitectura

Programul nostru Driver este executat pe nodul Gateway, care nu este altceva decât o scânteie. Acesta va crea un context spark și va lansa o aplicație.

1612045090 436 Scufundati va in interiorul Spark si in arhitectura

Obiectul context spark poate fi accesat folosind sc.

1612045091 17 Scufundati va in interiorul Spark si in arhitectura

După crearea contextului Spark, așteaptă resursele. Odată ce resursele sunt disponibile, contextul Spark configurează servicii interne și stabilește o conexiune la un mediu de execuție Spark.

Yarn Resource Manager, Application Master și lansarea executanților (containere).

Odată ce contextul Spark este creat, acesta va verifica cu Manager cluster și lansați Aplicație Master adică, lansează un container și înregistrează manipulatoare de semnal.

1612045091 378 Scufundati va in interiorul Spark si in arhitectura
1612045091 674 Scufundati va in interiorul Spark si in arhitectura

Odată ce aplicația Master este pornită, ea stabilește o conexiune cu driverul.

1612045092 397 Scufundati va in interiorul Spark si in arhitectura

Apoi, ApplicationMasterEndPoint declanșează o aplicație proxy pentru a se conecta la managerul de resurse.

1612045092 833 Scufundati va in interiorul Spark si in arhitectura

Acum, containerul de fire va efectua operațiunile de mai jos, așa cum se arată în diagramă.

1612045092 741 Scufundati va in interiorul Spark si in arhitectura
Credite de imagine: jaceklaskowski.gitbooks.io

ii) YarnRMClient se va înregistra la aplicația Master.

1612045093 842 Scufundati va in interiorul Spark si in arhitectura

iii) YarnAllocator: Va solicita 3 containere de executare, fiecare cu 2 nuclee și 884 MB memorie, inclusiv 384 MB overhead

1612045093 806 Scufundati va in interiorul Spark si in arhitectura

iv) AM pornește firul Reporter

1612045093 165 Scufundati va in interiorul Spark si in arhitectura

Acum, Yarn Allocator primește jetoane de la Driver pentru a lansa nodurile Executor și a porni containerele.

1612045094 139 Scufundati va in interiorul Spark si in arhitectura

Configurarea variabilelor de mediu, resursele jobului și lansarea containerelor.

De fiecare dată când este lansat un container, face următoarele 3 lucruri în fiecare dintre acestea.

  • Configurarea variabilelor env

Spark Runtime Environment (SparkEnv) este mediul de execuție cu serviciile Spark care sunt utilizate pentru a interacționa între ele pentru a stabili o platformă de calcul distribuită pentru o aplicație Spark.

1612045094 778 Scufundati va in interiorul Spark si in arhitectura
1612045094 970 Scufundati va in interiorul Spark si in arhitectura
  • Configurarea resurselor de locuri de muncă
1612045095 654 Scufundati va in interiorul Spark si in arhitectura
  • Lansarea containerului
1612045095 358 Scufundati va in interiorul Spark si in arhitectura

Contextul de lansare a executorului YARN atribuie fiecărui executant un ID de executant pentru a identifica executantul corespunzător (prin Spark WebUI) și pornește un CoarseGrainedExecutorBackend.

1612045096 257 Scufundati va in interiorul Spark si in arhitectura

RPC bazat pe CoarseGrainedExecutorBackend și Netty.

După obținerea resurselor de la Resource Manager, vom vedea executorul pornind

1612045096 641 Scufundati va in interiorul Spark si in arhitectura

CoarseGrainedExecutorBackend este un ExecutorBackend care controlează ciclul de viață al unui singur executant. Trimite statutul executantului șoferului.

Când ExecutorRunnable este pornit, CoarseGrainedExecutorBackend înregistrează punctul final Executor RPC și handlerele de semnal pentru a comunica cu driverul (adică cu punctul final CoarseGrainedScheduler RPC) și pentru a informa că este gata să lanseze sarcini.

1612045096 31 Scufundati va in interiorul Spark si in arhitectura

RPC bazat pe Netty – Se folosește pentru a comunica între nodurile lucrătorilor, a scânteia contextul, a executanților.

1612045097 727 Scufundati va in interiorul Spark si in arhitectura

NettyRPCEndPoint este utilizat pentru a urmări starea rezultatului nodului lucrător.

RpcEndpointAddress este adresa logică pentru un punct final înregistrat într-un mediu RPC, cu RpcAddress și nume.

Este în formatul prezentat mai jos:

1612045097 888 Scufundati va in interiorul Spark si in arhitectura

Acesta este primul moment când CoarseGrainedExecutorBackend inițiază comunicarea cu driverul disponibil la driverUrl prin RpcEnv.

1612045098 423 Scufundati va in interiorul Spark si in arhitectura

SparkListeners

1612045098 149 Scufundati va in interiorul Spark si in arhitectura
Credite de imagine: jaceklaskowski.gitbooks.io

SparkListener (ascultător Scheduler) este o clasă care ascultă evenimentele de execuție din DAGScheduler de la Spark și înregistrează toate informațiile despre evenimente ale unei aplicații, cum ar fi executorul, detaliile alocării driverului, împreună cu joburile, etapele și sarcinile și alte modificări ale proprietăților mediului.

SparkContext pornește LiveListenerBus care se află în interiorul driverului. Înregistrează JobProgressListener cu LiveListenerBus care colectează toate datele pentru a afișa statisticile în interfața spark.

În mod implicit, doar ascultătorul pentru WebUI ar fi activat, dar dacă dorim să adăugăm alți ascultători, îl putem folosi spark.extraListeners.

Spark vine cu doi ascultători care prezintă majoritatea activităților

i) StatsReportListener

ii) EventLoggingListener

EventLoggingListener: Dacă doriți să analizați în continuare performanța aplicațiilor dvs. dincolo de ceea ce este disponibil ca parte a serverului istoric Spark, puteți procesa datele jurnalului de evenimente. Jurnalul de evenimente Spark înregistrează informații despre lucrările / etapele / sarcinile procesate. Poate fi activat așa cum se arată mai jos …

1612045098 255 Scufundati va in interiorul Spark si in arhitectura

Fișierul jurnal de evenimente poate fi citit așa cum se arată mai jos

  • Driverul Spark se conectează la valorile sarcinii de lucru / perf din job în directorul spark.evenLog.dir ca fișiere JSON.
  • Există un fișier pentru fiecare aplicație, numele fișierelor conțin ID-ul aplicației (prin urmare, inclusiv un timestamp) application_1540458187951_38909.
1612045099 283 Scufundati va in interiorul Spark si in arhitectura

Afișează tipul de evenimente și numărul de intrări pentru fiecare.

1612045099 794 Scufundati va in interiorul Spark si in arhitectura

Acum, să adăugăm StatsReportListener la scânteie.extraListeners și verificați starea postului.

Activați nivelul de înregistrare INFO pentru jurnalul org.apache.spark.scheduler.StatsReportListener pentru a vedea evenimentele Spark.

1612045099 930 Scufundati va in interiorul Spark si in arhitectura

Pentru a activa ascultătorul, îl înregistrați la SparkContext. Se poate face în două moduri.

i) Utilizarea metodei SparkContext.addSparkListener (ascultător: SparkListener) în aplicația dvs. Spark.

Faceți clic pe link pentru a implementa ascultători personalizați – CustomListener

ii) Folosind opțiunea linie de comandă conf

1612045100 143 Scufundati va in interiorul Spark si in arhitectura

Să citim un fișier eșantion și să efectuăm o operație de numărare pentru a vedea StatsReportListener.

1612045100 644 Scufundati va in interiorul Spark si in arhitectura

Executarea unui loc de muncă (plan logic, plan fizic).

În Spark, RDD (set de date distribuit rezistent) este primul nivel al stratului de abstractizare. Este o colecție de elemente partiționate de-a lungul nodurilor clusterului care pot fi operate în paralel. RDD-urile pot fi create în 2 moduri.

i) Pparalelizant o colecție existentă în programul dvs. de driver

1612045100 809 Scufundati va in interiorul Spark si in arhitectura

ii) Referirea unui set de date într-un sistem de stocare extern

1612045101 861 Scufundati va in interiorul Spark si in arhitectura

RDD-urile sunt create fie utilizând un fișier în sistemul de fișiere Hadoop, fie o colecție Scala existentă în programul driver și transformându-l.

Să luăm un fragment de eșantion așa cum se arată mai jos

1612045101 937 Scufundati va in interiorul Spark si in arhitectura

Executarea fragmentului de mai sus are loc în 2 faze.

6.1 Planul logic: În această fază, un RDD este creat folosind un set de transformări, Ține evidența acestor transformări în programul driver prin construirea unui lanț de calcul (o serie de RDD) ca un grafic al transformărilor pentru a produce un RDD numit a Liniage Graph.

Transformările pot fi împărțite în continuare în 2 tipuri

  • Transformare îngustă: O conductă de operații care poate fi executată ca o singură etapă și nu necesită ca datele să fie amestecate pe partiții – de exemplu, Hartă, filtru etc.
1612045101 320 Scufundati va in interiorul Spark si in arhitectura

Acum datele vor fi citite în driver folosind variabila de difuzare.

1612045102 443 Scufundati va in interiorul Spark si in arhitectura
  • Transformare largă: Aici fiecare operație necesită amestecarea datelor, de acum înainte pentru fiecare transformare largă va fi creată o nouă etapă – de exemplu, reduceByKey etc.
1612045102 515 Scufundati va in interiorul Spark si in arhitectura

Putem vizualiza graficul genealogiei folosind toDebugString

1612045103 769 Scufundati va in interiorul Spark si in arhitectura

6.2 Plan fizic: În această fază, odată ce declanșăm o acțiune pe RDD, The Programator DAG privește linia RDD și vine cu cel mai bun plan de execuție cu etape și sarcini împreună cu TaskSchedulerImpl și execută lucrarea într-un set de sarcini în paralel.

1612045103 986 Scufundati va in interiorul Spark si in arhitectura

Odată ce efectuăm o operațiune de acțiune, SparkContext declanșează o lucrare și înregistrează RDD până la prima etapă (adică înainte de orice transformări largi) ca parte a programatorului DAGScheduler.

1612045103 143 Scufundati va in interiorul Spark si in arhitectura

Acum, înainte de a trece la etapa următoare (Transformări largi), va verifica dacă există date de partiție care urmează să fie amestecate și dacă există rezultate ale operației părinte lipsă de care depinde, dacă lipsește o astfel de etapă, atunci se re- execută acea parte a operației utilizând DAG (Graficul aciclic direcționat), care îl face tolerant la erori.

1612045104 317 Scufundati va in interiorul Spark si in arhitectura

În cazul sarcinilor lipsă, atribuie sarcini executanților.

1612045104 398 Scufundati va in interiorul Spark si in arhitectura

Fiecare sarcină este atribuită CoarseGrainedExecutorBackend a executantului.

1612045105 128 Scufundati va in interiorul Spark si in arhitectura

Obține informațiile despre bloc de la Namenode.

1612045105 511 Scufundati va in interiorul Spark si in arhitectura

acum, efectuează calculul și returnează rezultatul.

1612045105 459 Scufundati va in interiorul Spark si in arhitectura

Apoi, DAGScheduler caută etapele nou rulabile și declanșează operațiunea etapei următoare (reduceByKey).

1612045106 22 Scufundati va in interiorul Spark si in arhitectura

ShuffleBlockFetcherIterator primește blocarea blocurilor.

1612045106 868 Scufundati va in interiorul Spark si in arhitectura

Acum operația de reducere este împărțită în 2 sarcini și executată.

1612045106 683 Scufundati va in interiorul Spark si in arhitectura

La finalizarea fiecărei sarcini, executantul returnează rezultatul înapoi șoferului.

1612045107 539 Scufundati va in interiorul Spark si in arhitectura

După finalizarea lucrării, rezultatul este afișat.

1612045107 76 Scufundati va in interiorul Spark si in arhitectura

Spark-WebUI

Spark-UI ajută la înțelegerea fluxului de execuție a codului și a timpului necesar pentru a finaliza o anumită lucrare. Vizualizarea ajută la descoperirea oricăror probleme de bază care au loc în timpul execuției și la optimizarea aplicației spark.

Vom vedea vizualizarea Spark-UI ca parte a celei anterioare pasul 6.

Odată ce lucrarea este finalizată, puteți vedea detaliile lucrului, cum ar fi numărul de etape, numărul de sarcini care au fost programate în timpul execuției unui job.

1612045107 224 Scufundati va in interiorul Spark si in arhitectura

Dând clic pe joburile finalizate, putem vizualiza vizualizarea DAG, adică diferitele transformări largi și înguste ca parte a acesteia.

1612045108 160 Scufundati va in interiorul Spark si in arhitectura

Puteți vedea timpul de execuție luat de fiecare etapă.

1612045108 975 Scufundati va in interiorul Spark si in arhitectura

Când faceți clic pe o etapă specială ca parte a lucrării, va afișa detaliile complete cu privire la locul în care se află blocurile de date, dimensiunea datelor, executorul utilizat, memoria utilizată și timpul necesar pentru a finaliza o anumită sarcină. De asemenea, arată numărul de amestecuri care au loc.

1612045109 731 Scufundati va in interiorul Spark si in arhitectura

Mai departe, putem face clic pe fila Executori pentru a vizualiza Executorul și driverul folosit.

1612045109 667 Scufundati va in interiorul Spark si in arhitectura

Acum că am văzut cum funcționează Spark intern, puteți determina fluxul de execuție utilizând Spark UI, jurnale și modificând Spark EventListeners pentru a determina soluția optimă la trimiterea unui job Spark.

Notă: Comenzile care au fost executate legate de această postare sunt adăugate ca parte a mea GIT cont.

În mod similar, puteți citi mai multe aici:

Dacă doriți și dvs., vă puteți conecta cu mine pe LinkedIn – Jayvardhan Reddy.

Dacă ți-a plăcut să-l citești, poți să dai clic pe clap și să îi anunți pe ceilalți. Dacă doriți să mai adaug ceva, vă rugăm să nu ezitați să lăsați un răspuns?