Scala are o bibliotecă de streaming foarte specială numită FS2 (Fluxuri funcționale pentru Scala). Această bibliotecă întruchipează toate avantajele programării funcționale (FP). Înțelegând obiectivele sale de proiectare, veți primi expunerea la ideile de bază care fac FP atât de atrăgător.

FS2 are un tip central: Stream[Effect,Output]

S-ar putea să obțineți de la acest tip că este un Stream și că emite valori de tip Output.

Întrebarea evidentă aici este ce este Effect? Care este legătura dintre Effect și Output? Și ce avantaje are FS2 față de alte biblioteci de streaming?

Prezentare generală

Voi începe prin a revizui ce probleme rezolvă FS2. Apoi compar List și Stream cu mai multe exemple de cod. După aceea, mă voi concentra asupra modului de utilizare Stream cu un DB sau orice alt IO. Acolo strălucește FS2 și unde Effectse utilizează tipul. Odată ce vei înțelege ce Effect adică avantajele programării funcționale ar trebui să vă fie evidente.

ad-banner

La sfârșitul acestei postări veți primi răspunsuri la următoarele întrebări:

  • Ce probleme pot rezolva cu FS2?
  • Cu ce ​​pot face Stream acea List nu poti ?
  • Cum pot alimenta date dintr-un API / fișier / DB către Stream ?
  • Ce este asta Effect tipul și cum se raportează la programarea funcțională?

Notă: Codul este în Scala și ar trebui să fie de înțeles chiar și fără cunoștințe prealabile despre sintaxă.

Ce probleme pot rezolva cu FS2?

  1. Streaming I / O: încărcarea seturilor de date mari, care nu ar încapea în memorie și funcționează pe ele fără a arunca heap-ul.
  2. Flux de control (neacoperit): mutarea datelor dintr-un / mai multe DB-uri / fișiere / API-uri către altele într-un mod declarativ frumos.
  3. Concurență (neacoperită): Rulați diferite fluxuri în paralel și faceți-i să comunice împreună. De exemplu, încărcarea datelor din mai multe fișiere și prelucrarea acestora simultan, spre deosebire de secvențial. Puteți face câteva lucruri avansate aici. Fluxurile pot comunica împreună pe parcursul etapa de procesare și nu numai la sfârșit.

List vs. Stream

List este cea mai cunoscută și utilizată structură de date. Pentru a avea o idee despre diferența față de un FS2 Stream, vom trece prin câteva cazuri de utilizare. Vom vedea cum Stream poate rezolva probleme care List nu poti.

Datele dvs. sunt prea mari și nu se încadrează în memorie

Să presupunem că aveți un fișier foarte mare (40 GB) fahrenheit.txt. Fișierul are o temperatură pe fiecare linie și doriți să îl convertiți în celsius.txt.

Încărcarea unui fișier mare folosind List

import scala.io.Source
val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList
java.lang.OutOfMemoryError: Java heap space
  java.util.Arrays.copyOfRange(Arrays.java:3664)
  java.lang.String.<init>(String.java:207)
  java.io.BufferedReader.readLine(BufferedReader.java:356)
  java.io.BufferedReader.readLine(BufferedReader.java:389)

List eșuează lamentabil, deoarece, desigur, fișierul este prea mare pentru a se potrivi în memorie. Dacă sunteți curios, puteți merge să verificați soluția completă folosind Stream Aici – dar face asta mai târziu, citește mai departe 🙂

Când List nu va reuși … Transmiteți în salvare!

Să presupunem că am reușit să-mi citesc fișierul și vreau să îl scriu înapoi. Aș dori să păstrez structura liniei. Trebuie să inserez un caracter de linie nouă n după fiecare temperatură.

Pot folosi intersperse combinator pentru a face asta

import fs2._
Stream(1,2,3,4).intersperse("n").toList

Un alt simpatic este zipWithNext

scala> Stream(1,2,3,4).zipWithNext.toList
res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

Împachetează lucruri consecutive, foarte utile dacă doriți eliminați duplicatele consecutive.

Acestea sunt doar câteva dintre cele foarte utile, aici este lista plina.

Evident Stream poate face o mulțime de lucruri care List nu poate, dar cea mai bună caracteristică vine în secțiunea următoare, este vorba despre modul de utilizare Stream în lumea reală cu DB-uri / fișiere / API-uri …

Cum pot alimenta date dintr-un API / fișier / DB către Stream?

Să spunem doar pentru moment că acesta este programul nostru

scala> Stream(1,2,3)
res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

Ce face aceasta Pure Rău? Iată scaladocul din codul sursă:

/**
    * Indicates that a stream evaluates no effects.
    *
    * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`.
*/
type Pure[A] <: Nothing

Nu înseamnă efecte, ok …, dar Ce este un efect? și mai precis care este efectul programului nostru Stream(1,2,3)?

Acest program nu are literalmente efect în lume. Singurul său efect va fi să-ți facă procesorul să funcționeze și să consume puțină energie !! Nu afectează lumea din jurul tău.

Prin afectarea lumii mă refer la asta consumă orice resursă semnificativă, cum ar fi un fișier, o bază de date sau acesta produce orice ca un fișier, încărcarea unor date undeva, scrierea la terminalul dvs. și așa mai departe.

Cum transform o Pure transmite la ceva util?

Să presupunem că vreau să încarc ID-uri de utilizator dintr-un DB, mi se oferă această funcție, face un apel către DB și returnează userId Long.

import scala.concurrent.Future
def loadUserIdByName(userName: String): Future[Long] = ???

Se întoarce un Future ceea ce indică faptul că acest apel este asincron și valoarea va fi disponibilă la un moment dat în viitor. Înfășoară valoarea returnată de DB.

am asta Pure curent.

scala> val names = Stream("bob", "alice", "joe")
names: fs2.Stream[fs2.Pure,String] = Stream(..)

Cum obțin un Stream de id-uri?

Abordarea naivă ar fi folosirea map funcție, ar trebui să ruleze funcția pentru fiecare valoare din Stream.

scala> userIdsFromDB.compile
res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@fc0f18da

Încă m-am întors un Pure! Am dat Stream o funcție care afectează lumea și încă am primit un Pure, nu mișto … Ar fi fost îngrijit dacă FS2 ar fi detectat automat că loadUserIdByName funcția are un efect pe lume și mi-a returnat ceva care NU este Pure dar nu funcționează așa. Trebuie să utilizați un combinator special în loc de map: trebuie sa folosesti evalMap.

scala> userIdsFromDB.toList
<console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
       userIdsFromDB.toList
                     ^

Nu mai Pure! avem Future în schimb, da! Ce s-a intamplat?

A luat:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

Și am schimbat tipurile de flux la

  • Stream[Future, Long]

A separat Future și l-am izolat! Partea stângă care era Effect parametrul de tip este acum concretul Future tip.

Șmecherie îngrijită, dar cum mă ajută?

Tocmai ai fost martor la adevăr separarea preocupărilor. Puteți continua să operați pe flux cu tot ceea ce este frumos List cum ar fi combinatorii și nu trebuie să vă faceți griji dacă DB nu funcționează, este lent sau toate lucrurile care au legătură cu rețeaua (efectul).

Totul funcționează până când vreau să îl folosesc toList pentru a recupera valorile

scala> userIdsFromDB.toList
<console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
       userIdsFromDB.toList
                     ^

Ce???!!! Aș putea să jur că am folosit toList înainte și a funcționat, cum se poate spune asta toList nu este membru al fs2.Stream[Future,String] mai mult? Este ca și cum această funcție a fost eliminată în momentul în care am început să folosesc un flux efectiv, este impresionant! Dar cum îmi recuperez valorile?

scala> userIdsFromDB.compile
res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@fc0f18da

Mai întâi folosim compile să le spun Stream pentru a combina toate efectele într-unul singur, efectiv pliază toate apelurile către loadUserIdByName într-un mare Future. Acest lucru este necesar în cadrul și va deveni evident de ce acest pas este necesar în curând.

Acum toList ar trebui să funcționeze

scala> userIdsFromDB.compile.toList
<console>:18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future]
       userIdsFromDB.compile.toList
                             ^

Ce?! compilatorul încă se plânge. Asta-i pentru că Future nu este un bun Effect tip – rupe filosofia separării preocupărilor, așa cum este explicat în următoarea secțiune foarte importantă.

IMPORTANT: SINGURUL lucru care trebuie luat de la această postare

Un punct cheie aici este că DB nu a fost apelat în acest moment. Nu s-a întâmplat nimic, programul complet nu produce nimic.

def loadUserIdByName(userName: String): Future[Long] = ???
Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Separarea descrierii programului de evaluare

Da, ar putea fi surprinzător, dar tema principală din FP este separarea

  • Descriere programului dvs.: un exemplu bun este programul pe care tocmai l-am scris, este o descriere pură a problemei „Vă dau nume și un DB, dați-mi înapoi ID-uri”

Si

  • Execuţie al programului dvs.: rularea codului real și solicitarea acestuia să meargă la DB

Încă o dată programul nostru nu are literalmente efect pe lume, în afară de a vă încălzi computerul, exact ca al nostru Pure curent.

Se numește cod care nu are efect pur și despre asta este vorba despre toată programarea funcțională: scrierea programelor cu funcții care sunt pur. Bravo, acum știi ce înseamnă FP.

De ce ai vrea să scrii cod în acest fel? Simplu: pentru a realiza separarea preocupărilor între părțile IO și restul codului nostru.

Acum hai să reparăm programul și să ne ocupăm de asta Future problemă.

După cum am spus Future este un rău Effect de tip, merge împotriva principiului separării preocupărilor. Într-adevăr, Future este nerăbdător în Scala: în momentul în care îl creați, începe să se execute pe un fir, nu aveți controlul execuției și astfel se rupe. FS2 este conștient de acest lucru și nu vă permite să compilați. Pentru a remedia acest lucru, trebuie să folosim un tip numit IO care ne înfășoară răul Future.

Asta ne aduce la ultima parte, ce este aceasta IO tip? și cum obțin în cele din urmă lista mea usedIds înapoi?

scala> import cats.effect.IO
import cats.effect.IO
scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList
res8: cats.effect.IO[List[Long]] = IO$2104439279

Acum ne dă înapoi un List dar totuși, nu ne-am recuperat actele de identitate, așa că trebuie să lipsească un ultim lucru.

O biblioteca de streaming cu o superputere FS2 si programare

Ce face IO foarte rău?

IO vine de la biblioteca cu efect de pisici. Mai întâi să terminăm programul și, în cele din urmă, să scoatem ID-urile din DB.

scala> userIds.compile.toList.unsafeRunSync
<console>:18: error: not found: value userIds
       userIds.compile.toList.unsafeRunSync
       ^

Dovada că face ceva este faptul că eșuează.

loadUserIdByName(userName: String): Future[Long] = ???

Când ??? este numit, veți obține această excepție, înseamnă că funcția a fost executată (spre deosebire de înainte, când am subliniat că nimic nu se întâmplă cu adevărat). Când implementăm această funcție, va merge la DB și va încărca ID-urile și va avea un efect pe lume (rețea / sistem de fișiere).

IO[Long] este un Descriere de Cum pentru a obține o valoare de tip Long și cu siguranță implică efectuarea unor I / O, adică accesarea rețelei, încărcarea unui fișier, …

Este Cum și nu Ce. Acesta descrie cum să obțineți valoarea din rețea. Dacă doriți să executați această descriere, puteți utiliza unsafeRunSync (sau alte funcții prefixate unsafe). Puteți ghici de ce sunt numiți astfel: într-adevăr, un apel către o bază de date este inerent nesigur, deoarece ar putea eșua dacă, de exemplu, conexiunea dvs. la internet este întreruptă.

Recapitulare

Să aruncăm o ultimă privire Stream[Effect,Output].

Output este tipul pe care îl emite fluxul (poate fi un flux de String, Long sau orice tip ați definit).

Effect este modul (rețeta) de a produce Output (adică mergeți la DB și dați-mi un id de tip Long).

Este important să înțelegeți că, dacă aceste tipuri sunt separate pentru a ușura lucrurile, descompunerea unei probleme în subprobleme vă permite să argumentați independent de subprobleme. Puteți apoi să le rezolvați și să le combinați soluțiile.

Legătura dintre aceste 2 tipuri este următoarea:

Pentru ca Stream să emită un element de tip

  • Output

Trebuie să evalueze un tip

  • Effect

Un tip special care codifică o acțiune eficientă ca valoare de tip IO, acest IO valoarea permite separarea a 2 preocupări:

  • Descriere:IO este o valoare imuabilă simplă, este o rețetă pentru a obține un tip A făcând un fel de IO (rețea / sistem de fișiere / …)
  • Execuţie: in asa fel incatIO pentru a face ceva, trebuie executați-l / rulați-l folosind io.unsafeRunSync

Punând totul împreună

Stream[IO,Long] spune:

Acesta este un Stream care emite valori de tip Long și pentru a face acest lucru, trebuie să ruleze un efectiv funcție care produceIO[Long] pentru fiecare valoare.

Sunt multe detalii în acest tip foarte scurt. Cu cât obțineți mai multe detalii despre cum se întâmplă lucrurile, cu atât faceți mai puține erori.

Concluzii

  • Stream este un super încărcat versiune a List
  • Stream(1,2,3) este de tip Stream[Pure, Int] , al doilea tip Int este tipul tuturor valorilor pe care le va emite acest flux
  • Pure înseamnă nu efect în lume. Pur și simplu îți face procesorul să funcționeze și consumă puțină energie, dar pe lângă asta nu afectează lumea din jur.
  • Utilizare evalMap în loc de map atunci când doriți să aplicați o funcție care are un efect ca loadUserIdByName la o Stream.
  • Stream[IO, Long] separă preocupările de Ce și Cum, permițându-vă să lucrați numai cu valorile și să nu vă faceți griji cu privire la cum să le obțineți (încărcarea din db).
  • Separarea descrierii programului de evaluare este un aspect cheie al PC.
  • Toate programele cu care scrii Stream nu va face nimic până nu îl folosiți unsafeRunSync. Înainte de aceasta, codul dvs. este eficient pur.
  • IO[Long] este un tip de efect care vă spune: veți obține Long valori din IO (ar putea fi un fișier, rețeaua, consola …). Este o descriere și nu un wrapper! R
  • Future nu respectă această filozofie și, prin urmare, nu este compatibil cu FS2, trebuie să utilizați IO tastați în schimb.

Videoclipuri FS2