Scala are o bibliotecă de streaming foarte specială numită FS2 (Fluxuri funcționale pentru Scala). Această bibliotecă întruchipează toate avantajele programării funcționale (FP). Înțelegând obiectivele sale de proiectare, veți primi expunerea la ideile de bază care fac FP atât de atrăgător.
FS2 are un tip central: Stream[Effect,Output]
S-ar putea să obțineți de la acest tip că este un Stream
și că emite valori de tip Output
.
Întrebarea evidentă aici este ce este Effect
? Care este legătura dintre Effect
și Output
? Și ce avantaje are FS2 față de alte biblioteci de streaming?
Conţinut
- 1 Prezentare generală
- 2 Ce probleme pot rezolva cu FS2?
- 3 List vs. Stream
- 4 Datele dvs. sunt prea mari și nu se încadrează în memorie
- 5 Încărcarea unui fișier mare folosind List
- 6 Când List nu va reuși … Transmiteți în salvare!
- 7 Cum pot alimenta date dintr-un API / fișier / DB către Stream?
- 8 Cum transform o Pure transmite la ceva util?
- 9 IMPORTANT: SINGURUL lucru care trebuie luat de la această postare
- 10 Separarea descrierii programului de evaluare
- 11 Ce face IO foarte rău?
- 12 Recapitulare
- 13 Concluzii
- 14 Videoclipuri FS2
Prezentare generală
Voi începe prin a revizui ce probleme rezolvă FS2. Apoi compar List
și Stream
cu mai multe exemple de cod. După aceea, mă voi concentra asupra modului de utilizare Stream
cu un DB sau orice alt IO. Acolo strălucește FS2 și unde Effect
se utilizează tipul. Odată ce vei înțelege ce Effect
adică avantajele programării funcționale ar trebui să vă fie evidente.
La sfârșitul acestei postări veți primi răspunsuri la următoarele întrebări:
- Ce probleme pot rezolva cu FS2?
- Cu ce pot face
Stream
aceaList
nu poti ? - Cum pot alimenta date dintr-un API / fișier / DB către
Stream
? - Ce este asta
Effect
tipul și cum se raportează la programarea funcțională?
Notă: Codul este în Scala și ar trebui să fie de înțeles chiar și fără cunoștințe prealabile despre sintaxă.
Ce probleme pot rezolva cu FS2?
- Streaming I / O: încărcarea seturilor de date mari, care nu ar încapea în memorie și funcționează pe ele fără a arunca heap-ul.
- Flux de control (neacoperit): mutarea datelor dintr-un / mai multe DB-uri / fișiere / API-uri către altele într-un mod declarativ frumos.
- Concurență (neacoperită): Rulați diferite fluxuri în paralel și faceți-i să comunice împreună. De exemplu, încărcarea datelor din mai multe fișiere și prelucrarea acestora simultan, spre deosebire de secvențial. Puteți face câteva lucruri avansate aici. Fluxurile pot comunica împreună pe parcursul etapa de procesare și nu numai la sfârșit.
List
vs. Stream
List
este cea mai cunoscută și utilizată structură de date. Pentru a avea o idee despre diferența față de un FS2 Stream
, vom trece prin câteva cazuri de utilizare. Vom vedea cum Stream
poate rezolva probleme care List
nu poti.
Datele dvs. sunt prea mari și nu se încadrează în memorie
Să presupunem că aveți un fișier foarte mare (40 GB) fahrenheit.txt
. Fișierul are o temperatură pe fiecare linie și doriți să îl convertiți în celsius.txt
.
Încărcarea unui fișier mare folosind List
import scala.io.Source
val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList
java.lang.OutOfMemoryError: Java heap space
java.util.Arrays.copyOfRange(Arrays.java:3664)
java.lang.String.<init>(String.java:207)
java.io.BufferedReader.readLine(BufferedReader.java:356)
java.io.BufferedReader.readLine(BufferedReader.java:389)
List
eșuează lamentabil, deoarece, desigur, fișierul este prea mare pentru a se potrivi în memorie. Dacă sunteți curios, puteți merge să verificați soluția completă folosind Stream
Aici – dar face asta mai târziu, citește mai departe 🙂
Când List nu va reuși … Transmiteți în salvare!
Să presupunem că am reușit să-mi citesc fișierul și vreau să îl scriu înapoi. Aș dori să păstrez structura liniei. Trebuie să inserez un caracter de linie nouă n
după fiecare temperatură.
Pot folosi intersperse
combinator pentru a face asta
import fs2._
Stream(1,2,3,4).intersperse("n").toList
Un alt simpatic este zipWithNext
scala> Stream(1,2,3,4).zipWithNext.toList
res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))
Împachetează lucruri consecutive, foarte utile dacă doriți eliminați duplicatele consecutive.
Acestea sunt doar câteva dintre cele foarte utile, aici este lista plina.
Evident Stream
poate face o mulțime de lucruri care List
nu poate, dar cea mai bună caracteristică vine în secțiunea următoare, este vorba despre modul de utilizare Stream
în lumea reală cu DB-uri / fișiere / API-uri …
Cum pot alimenta date dintr-un API / fișier / DB către Stream
?
Să spunem doar pentru moment că acesta este programul nostru
scala> Stream(1,2,3)
res2: fs2.Stream[fs2.Pure,Int] = Stream(..)
Ce face aceasta Pure
Rău? Iată scaladocul din codul sursă:
/**
* Indicates that a stream evaluates no effects.
*
* A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`.
*/
type Pure[A] <: Nothing
Nu înseamnă efecte, ok …, dar Ce este un efect? și mai precis care este efectul programului nostru Stream(1,2,3)
?
Acest program nu are literalmente efect în lume. Singurul său efect va fi să-ți facă procesorul să funcționeze și să consume puțină energie !! Nu afectează lumea din jurul tău.
Prin afectarea lumii mă refer la asta consumă orice resursă semnificativă, cum ar fi un fișier, o bază de date sau acesta produce orice ca un fișier, încărcarea unor date undeva, scrierea la terminalul dvs. și așa mai departe.
Cum transform o Pure
transmite la ceva util?
Să presupunem că vreau să încarc ID-uri de utilizator dintr-un DB, mi se oferă această funcție, face un apel către DB și returnează userId Long
.
import scala.concurrent.Future
def loadUserIdByName(userName: String): Future[Long] = ???
Se întoarce un Future
ceea ce indică faptul că acest apel este asincron și valoarea va fi disponibilă la un moment dat în viitor. Înfășoară valoarea returnată de DB.
am asta Pure
curent.
scala> val names = Stream("bob", "alice", "joe")
names: fs2.Stream[fs2.Pure,String] = Stream(..)
Cum obțin un Stream
de id-uri?
Abordarea naivă ar fi folosirea map
funcție, ar trebui să ruleze funcția pentru fiecare valoare din Stream
.
scala> userIdsFromDB.compile
res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@fc0f18da
Încă m-am întors un Pure
! Am dat Stream
o funcție care afectează lumea și încă am primit un Pure
, nu mișto … Ar fi fost îngrijit dacă FS2 ar fi detectat automat că loadUserIdByName
funcția are un efect pe lume și mi-a returnat ceva care NU este Pure
dar nu funcționează așa. Trebuie să utilizați un combinator special în loc de map
: trebuie sa folosesti evalMap
.
scala> userIdsFromDB.toList
<console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
userIdsFromDB.toList
^
Nu mai Pure
! avem Future
în schimb, da! Ce s-a intamplat?
A luat:
loadUserIdByName: Future[Long]
Stream[Pure, String]
Și am schimbat tipurile de flux la
Stream[Future, Long]
A separat Future
și l-am izolat! Partea stângă care era Effect
parametrul de tip este acum concretul Future
tip.
Șmecherie îngrijită, dar cum mă ajută?
Tocmai ai fost martor la adevăr separarea preocupărilor. Puteți continua să operați pe flux cu tot ceea ce este frumos List
cum ar fi combinatorii și nu trebuie să vă faceți griji dacă DB nu funcționează, este lent sau toate lucrurile care au legătură cu rețeaua (efectul).
Totul funcționează până când vreau să îl folosesc toList
pentru a recupera valorile
scala> userIdsFromDB.toList
<console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
userIdsFromDB.toList
^
Ce???!!! Aș putea să jur că am folosit toList
înainte și a funcționat, cum se poate spune asta toList
nu este membru al fs2.Stream[Future,String]
mai mult? Este ca și cum această funcție a fost eliminată în momentul în care am început să folosesc un flux efectiv, este impresionant! Dar cum îmi recuperez valorile?
scala> userIdsFromDB.compile
res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@fc0f18da
Mai întâi folosim compile
să le spun Stream
pentru a combina toate efectele într-unul singur, efectiv pliază toate apelurile către loadUserIdByName
într-un mare Future
. Acest lucru este necesar în cadrul și va deveni evident de ce acest pas este necesar în curând.
Acum toList
ar trebui să funcționeze
scala> userIdsFromDB.compile.toList
<console>:18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future]
userIdsFromDB.compile.toList
^
Ce?! compilatorul încă se plânge. Asta-i pentru că Future
nu este un bun Effect
tip – rupe filosofia separării preocupărilor, așa cum este explicat în următoarea secțiune foarte importantă.
IMPORTANT: SINGURUL lucru care trebuie luat de la această postare
Un punct cheie aici este că DB nu a fost apelat în acest moment. Nu s-a întâmplat nimic, programul complet nu produce nimic.
def loadUserIdByName(userName: String): Future[Long] = ???
Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile
Separarea descrierii programului de evaluare
Da, ar putea fi surprinzător, dar tema principală din FP este separarea
- Descriere programului dvs.: un exemplu bun este programul pe care tocmai l-am scris, este o descriere pură a problemei „Vă dau nume și un DB, dați-mi înapoi ID-uri”
Si
- Execuţie al programului dvs.: rularea codului real și solicitarea acestuia să meargă la DB
Încă o dată programul nostru nu are literalmente efect pe lume, în afară de a vă încălzi computerul, exact ca al nostru Pure
curent.
Se numește cod care nu are efect pur și despre asta este vorba despre toată programarea funcțională: scrierea programelor cu funcții care sunt pur. Bravo, acum știi ce înseamnă FP.
De ce ai vrea să scrii cod în acest fel? Simplu: pentru a realiza separarea preocupărilor între părțile IO și restul codului nostru.
Acum hai să reparăm programul și să ne ocupăm de asta Future
problemă.
După cum am spus Future
este un rău Effect
de tip, merge împotriva principiului separării preocupărilor. Într-adevăr, Future
este nerăbdător în Scala: în momentul în care îl creați, începe să se execute pe un fir, nu aveți controlul execuției și astfel se rupe. FS2 este conștient de acest lucru și nu vă permite să compilați. Pentru a remedia acest lucru, trebuie să folosim un tip numit IO
care ne înfășoară răul Future
.
Asta ne aduce la ultima parte, ce este aceasta IO
tip? și cum obțin în cele din urmă lista mea usedIds
înapoi?
scala> import cats.effect.IO
import cats.effect.IO
scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList
res8: cats.effect.IO[List[Long]] = IO$2104439279
Acum ne dă înapoi un List
dar totuși, nu ne-am recuperat actele de identitate, așa că trebuie să lipsească un ultim lucru.

Ce face IO
foarte rău?
IO
vine de la biblioteca cu efect de pisici. Mai întâi să terminăm programul și, în cele din urmă, să scoatem ID-urile din DB.
scala> userIds.compile.toList.unsafeRunSync
<console>:18: error: not found: value userIds
userIds.compile.toList.unsafeRunSync
^
Dovada că face ceva este faptul că eșuează.
loadUserIdByName(userName: String): Future[Long] = ???
Când ???
este numit, veți obține această excepție, înseamnă că funcția a fost executată (spre deosebire de înainte, când am subliniat că nimic nu se întâmplă cu adevărat). Când implementăm această funcție, va merge la DB și va încărca ID-urile și va avea un efect pe lume (rețea / sistem de fișiere).
IO[Long]
este un Descriere de Cum pentru a obține o valoare de tip Long
și cu siguranță implică efectuarea unor I / O, adică accesarea rețelei, încărcarea unui fișier, …
Este Cum și nu Ce. Acesta descrie cum să obțineți valoarea din rețea. Dacă doriți să executați această descriere, puteți utiliza unsafeRunSync
(sau alte funcții prefixate unsafe
). Puteți ghici de ce sunt numiți astfel: într-adevăr, un apel către o bază de date este inerent nesigur, deoarece ar putea eșua dacă, de exemplu, conexiunea dvs. la internet este întreruptă.
Recapitulare
Să aruncăm o ultimă privire Stream[Effect,Output]
.
Output
este tipul pe care îl emite fluxul (poate fi un flux de String
, Long
sau orice tip ați definit).
Effect
este modul (rețeta) de a produce Output
(adică mergeți la DB și dați-mi un id
de tip Long
).
Este important să înțelegeți că, dacă aceste tipuri sunt separate pentru a ușura lucrurile, descompunerea unei probleme în subprobleme vă permite să argumentați independent de subprobleme. Puteți apoi să le rezolvați și să le combinați soluțiile.
Legătura dintre aceste 2 tipuri este următoarea:
Pentru ca Stream
să emită un element de tip
Output
Trebuie să evalueze un tip
Effect
Un tip special care codifică o acțiune eficientă ca valoare de tip IO
, acest IO
valoarea permite separarea a 2 preocupări:
-
Descriere:
IO
este o valoare imuabilă simplă, este o rețetă pentru a obține un tipA
făcând un fel de IO (rețea / sistem de fișiere / …) -
Execuţie: in asa fel incat
IO
pentru a face ceva, trebuie executați-l / rulați-l folosindio.unsafeRunSync
Punând totul împreună
Stream[IO,Long]
spune:
Acesta este un Stream
care emite valori de tip Long
și pentru a face acest lucru, trebuie să ruleze un efectiv funcție care produceIO[Long]
pentru fiecare valoare.
Sunt multe detalii în acest tip foarte scurt. Cu cât obțineți mai multe detalii despre cum se întâmplă lucrurile, cu atât faceți mai puține erori.
Concluzii
-
Stream
este un super încărcat versiune aList
-
Stream(1,2,3)
este de tipStream[Pure, Int]
, al doilea tipInt
este tipul tuturor valorilor pe care le va emite acest flux -
Pure
înseamnă nu efect în lume. Pur și simplu îți face procesorul să funcționeze și consumă puțină energie, dar pe lângă asta nu afectează lumea din jur. - Utilizare
evalMap
în loc demap
atunci când doriți să aplicați o funcție care are un efect caloadUserIdByName
la oStream
. -
Stream[IO, Long]
separă preocupările de Ce și Cum, permițându-vă să lucrați numai cu valorile și să nu vă faceți griji cu privire la cum să le obțineți (încărcarea din db). - Separarea descrierii programului de evaluare este un aspect cheie al PC.
- Toate programele cu care scrii
Stream
nu va face nimic până nu îl folosițiunsafeRunSync
. Înainte de aceasta, codul dvs. este eficient pur. -
IO[Long]
este un tip de efect care vă spune: veți obțineLong
valori din IO (ar putea fi un fișier, rețeaua, consola …). Este o descriere și nu un wrapper! R -
Future
nu respectă această filozofie și, prin urmare, nu este compatibil cu FS2, trebuie să utilizațiIO
tastați în schimb.
Videoclipuri FS2
- Hands on screencast de Michael Pilquist: https://www.youtube.com/watch?v=B1wb4fIdtn4
- Discuție de Fabio Labella https://www.youtube.com/watch?v=x3GLwl1FxcA
#bibliotecă #streaming #superputere #FS2 #și #programare #funcțională
O bibliotecă de streaming cu o superputere: FS2 și programare funcțională