Limbile JVM, cum ar fi Java și Scala, au capacitatea de a rula cod simultan folosind Thread clasă. Firele sunt notoriu complexe și foarte predispuse la erori, astfel încât să aveți o înțelegere solidă a modului în care funcționează este esențial.

Să începem cu Javadoc pentru Thread.sleep:

Provoacă executarea curentă fir a dormi (temporar încetează executarea) pentru numărul specificat de milisecunde

Care sunt implicațiile cease execution, de asemenea cunoscut ca si blocare, si ce inseamna asta? Este rau? Și dacă da, putem realiza somn neblocant?

Ce vom acoperi în acest articol

Această postare acoperă o mulțime de teren și, sperăm, veți învăța o mulțime de lucruri interesante.

  • Ce se întâmplă la Nivelul sistemului de operare când dormiți?
  • problemă cu somnul
  • Project Loom și fire virtuale
  • Programare funcțională și proiectare
  • ZIO Biblioteca Scala pentru concurență

Da, toate acestea apar mai jos.

Dar mai întâi, să începem cu acest fragment Scala simplu pe care îl vom schimba pe tot parcursul postării pentru a realiza ceea ce dorim:

println("a")
Thread.sleep(1000)
println("b")

Este destul de simplu: tipărește „a” și apoi 10 secunde mai târziu în tipărește „b”

Să ne concentrăm asupra Thread.sleep și încearcă să înțelegi CUM realizează somnul. Odată ce vom înțelege cum, vom putea vedea problema și o vom defini mai concret.

Cum funcționează somnul la nivelul sistemului de operare?

Iată ce se întâmplă când suni Thread.sleep sub capotă.

  • Apelează API-ul thread al sistemului de operare subiacent
  • Deoarece JVM folosește o mapare unu la unu între firele Java și kernel, solicită sistemului de operare să renunțe la „drepturile” firului la CPU pentru timpul specificat
  • Când s-a scurs timpul, programatorul de sistem de operare va trezi firul printr-o întrerupere (acest lucru este eficient) și îi va atribui o porțiune CPU pentru a-i permite să reia rularea

Punctul critic aici este că firul adormit este complet scos și nu este refolosibil în timpul somnului.

Limitările firelor

Iată câteva limitări importante care vin cu firele:

  • Există o limită a numărului de fire pe care le puteți crea. După aproximativ 30K, veți primi această eroare:
java.lang.OutOfMemoryError : unable to create new native Thread
  • Subiectele JVM pot fi scumpe de creat în ceea ce privește memoria, deoarece vin cu o stivă dedicată
  • Prea multe fire JVM vor suporta cheltuieli generale din cauza comutatoarelor contextuale costisitoare și a modului în care acestea împart resurse hardware finite

Acum, că înțelegem mai multe despre ceea ce se întâmplă în culise, să revenim la problema somnului.

Problema cu somnul

Să definim problema mai concret și să rulăm un fragment pentru a arăta problema cu care ne confruntăm. Vom folosi această funcție pentru a ilustra punctul:

def task(id: Int): Runnable = () => 
{
  println(s"${Thread.currentThread().getName()} start-$id")
  Thread.sleep(10000)
  println(s"${Thread.currentThread().getName()} end-$id")
}

Această funcție simplă va

  • imprimare start urmat de ID-ul firului
  • doarme 10 secunde
  • imprimare end urmat de ID-ul firului

Misiunea dvs. dacă o acceptați este să rulați 2 sarcini concomitent cu 1 fir

Vrem să executăm 2 sarcini simultan, ceea ce înseamnă că întregul program ar trebui să dureze în total 10 secunde. Dar avem doar 1 fir disponibil.

Ești pregătit pentru această provocare?

Să ne jucăm puțin cu numărul de sarcini și fire pentru a obține o înțelegere exactă a problemei.

1 task -> 1 thread

new Thread(task(1)).start()
12:11:08 INFO  Thread-0 start-1
12:11:18 INFO  Thread-0 end-1

Hai să aprindem jvisualvm pentru a verifica ce face firul:

Cum se foloseste Threadsleep fara blocare pe JVM

Puteți vedea că firul-0 este în mov sleeping stat.

Dacă apăsați butonul de descărcare a firului, se va imprima acest lucru:

"Thread-0" #13 prio=5 os_prio=31 tid=0x00007f9a3e0e2000 nid=0x5b03 waiting on condition [0x0000700004ac8000]  
  java.lang.Thread.State: TIMED_WAITING (sleeping)
  at java.lang.Thread.sleep(Native Method)
  at example.Blog$.$anonfun$task$1(Blog.scala:7)
  at example.Blog$$$Lambda$2/1359484306.run(Unknown Source)
  at java.lang.Thread.run(Thread.java:748)
  Locked ownable synchronizers:        - None

În mod clar, acest fir nu mai poate fi utilizat până când nu termină de dormit.

2 tasks -> 1 thread

Să ilustrăm problema executând 2 astfel de activități cu un singur fir disponibil:

import java.util.concurrent.Executors

// an executor with only 1 thread available
val oneThreadExecutor = Executors.newFixedThreadPool(1)

// send 2 tasks to the executor
(1 to 2).foreach(id =>
   oneThreadExecutor.execute(task(id)))

Obținem această ieșire:

2020.09.28 21:49:56 INFO  pool-1-thread-1 start-1
2020.09.28 21:50:07 INFO  pool-1-thread-1 end-1
2020.09.28 21:50:07 INFO  pool-1-thread-1 start-2
2020.09.28 21:50:17 INFO  pool-1-thread-1 end-2
1612172949 529 Cum se foloseste Threadsleep fara blocare pe JVM

Puteți vedea culoarea mov (stare de dormit) pentru pool-1-thread-1. Sarcinile nu au de ales decât să ruleze una după alta, deoarece firul este scos de fiecare dată Thread.sleep este folosit.

2 tasks -> 2 threads

Să rulăm același cod cu 2 fire disponibile. Obținem acest lucru:

// an executor with 2 threads available
val oneThreadExecutor = Executors.newFixedThreadPool(2)

// send 2 tasks to the executor
(1 to 2).foreach(id =>
   oneThreadExecutor.execute(task(id)))
2020.09.28 22:42:04 INFO  pool-1-thread-2 start-2
2020.09.28 22:42:04 INFO  pool-1-thread-1 start-1
2020.09.28 22:42:14 INFO  pool-1-thread-1 end-1
2020.09.28 22:42:14 INFO  pool-1-thread-2 end-2

Fiecare fir poate rula câte o sarcină la un moment dat. În sfârșit, am realizat ceea ce ne-am dorit, executând simultan 2 sarcini și întregul program sa încheiat în 10 secunde.

1612172949 75 Cum se foloseste Threadsleep fara blocare pe JVM

A fost ușor, deoarece am folosit 2 fire (pool-1-thread-1 și pool-1-thread-2), dar vrem să facem același lucru cu doar 1 thread.

Să identificăm problema și apoi să găsim o soluție.

Problema : Thread.sleep is blocking

Acum înțelegem că nu putem folosi Thread.sleep – blochează firul.

Acest lucru îl face inutilizabil până când se reia, împiedicându-ne să executăm simultan 2 sarcini.

Din fericire, există soluții, pe care le vom discuta în continuare.

Prima soluție: actualizați JVM cu Project Loom

Am menționat mai devreme că firele JVM mapează unul la unu firele de operare. Iar această greșeală fatală de proiectare ne conduce aici.

Project Loom își propune să corecteze acest lucru prin adăugarea de fire virtuale.

Iată codul nostru rescris folosind fire virtuale de la Loom:

Thread.startVirtualThread(() -> {
  System.out.println("a")  
  Thread.sleep(1000)  
  System.out.println("b")
});

Lucrul uimitor este că Thread.sleep nu se va mai bloca! Este pe deplin asincron. Și, pe deasupra, firele virtuale sunt foarte ieftine. Ați putea crea sute de mii dintre ele fără cheltuieli generale sau limitări.

Toate problemele noastre sunt rezolvate acum – în afară de faptul că Project Loom nu va fi disponibil până cel puțin JDK 17 (începând cu acum programat pentru septembrie 2021).

Ei bine, să ne întoarcem și să încercăm să rezolvăm problema somnului cu ceea ce ne oferă în prezent JVM.

Insight key: Puteți exprima somnul în ceea ce privește programarea unei sarcini în viitor

Dacă îi spui șefului tău că ești ocupat și îți vei relua munca în 10 minute, șeful tău nu știe că ești pe cale să faci un pui de somn. Văd doar că ți-ai început munca dimineața, apoi ai făcut o pauză timp de 10 minute, apoi ai reluat.

Acest:

start
sleep(10)
end

este echivalent din exterior cu acesta:

start
resumeIn(10s, end)

Ceea ce am făcut mai sus este să PROGRAMA sarcina se va încheia în 10 secunde.

Gata, nu mai avem nevoie să dormim. În schimb, trebuie doar să putem programa lucrurile în viitor.

Am redus o problemă cu alta, una care este mai ușoară și are o soluție mai simplă.

Problema de programare

Din fericire pentru noi, programarea sarcinilor este foarte simplă. Trebuie doar să schimbăm executorul după cum urmează:

val oneThreadScheduleExecutor = Executors.newScheduledThreadPool(1)

Acum putem folosi schedule funcție în loc de execute:

oneThreadScheduleExecutor.schedule
(task(1),10, TimeUnit.SECONDS)

Ei bine, asta nu este exact ceea ce ne dorim. Vrem să împărțim imprimarea de început și de sfârșit cu 10 secunde, așa că să schimbăm funcția noastră de sarcină după cum urmează:

def nonBlockingTask(id: Int): Runnable = () => {
  println(s"${Thread.currentThread().getName()} start-$id")
  val endTask: Runnable = () => 
  {
    println(s"${Thread.currentThread().getName()} end-$id")
  }
  //instead of Thread.sleep for 10s, we schedule it in the future, no     more blocking!
  oneThreadScheduleExecutor.schedule(endTask, 10, TimeUnit.SECONDS)
  }
2020.09.28 23:35:45 INFO  pool-1-thread-1 start-1
2020.09.28 23:35:45 INFO  pool-1-thread-1 start-2
2020.09.28 23:35:56 INFO  pool-1-thread-1 end-1
2020.09.28 23:35:56 INFO  pool-1-thread-1 end-2

Da! Am reusit! Doar un fir și 2 sarcini simultane care „dorm” 10 secunde fiecare.

Bine, dar nu poți scrie cu adevărat codul de genul acesta. Ce se întâmplă dacă doriți o altă sarcină la mijloc, după cum urmează:

00:00:00 start
00:00:10 middle
00:00:20 end

Ar trebui să schimbați implementarea programului nonBlockingTask și adăugați un alt apel la schedule acolo. Și asta va deveni destul de dezordonat foarte repede.

Cum se utilizează programarea funcțională pentru a scrie un DSL cu un somn fără blocare

Programarea funcțională în Scala este o bucurie, iar scrierea unui DSL (limbaj specific domeniului) folosind principiile FP este destul de ușoară.

Să începem la sfârșit. Am dori ca programul nostru final să arate cam așa:

def nonBlockingFunctionalTask(id: Int) = {
  Print(id,"start") andThen 
  Print(id,"middle").sleep(1000) andThen
  Print(id,"end").sleep(1000)
}

Acest mini-limbaj va atinge exact același comportament ca soluția noastră anterioară, dar fără a expune toate internele urâte ale executorului și thread-urilor programate.

Modelul

Să ne definim tipurile de date:

object Task {
sealed trait Task { self =>
  def andThen(other: Task) = AndThen(self,other)
  def sleep(millis: Long) = Sleep(self,millis)
}
  
case class AndThen(t1: Task, t2: Task) extends Task
case class Print(id: Int, value: String) extends Task 
case class Sleep(t1: Task, millis: Long) extends Task

În FP, tipurile de date conțin doar date și nu au comportament. Deci, acest întreg cod nu face nimic, ci doar surprinde structura limbajului și informațiile pe care le dorim.

Avem nevoie de 2 funcții:

  • sleep a face o sarcină să doarmă
  • andThen să lanțeze sarcini

Observați că implementarea lor nu face nimic. Îl înfășoară doar în clasa corectă și atât.

Să ne folosim de noi nonBlockingFunctionalTask funcţie:

import Task._
//create 2 tasks, this does not run them, no threads involved here
(1 to 2).toList.map(nonBlockingFunctionalTask)

Este o descriere a problemei. Nu face nimic, doar construiește o listă cu 2 sarcini, fiecare descriind ce trebuie făcut.

Dacă imprimăm rezultatul în REPL, obținem acest lucru:

res3: List[Task] = List(
//first task  
AndThen(AndThen(Print(1,start),Sleep(Print(1,middle),10000)),Sleep(Print(1,end),10000)), 
//second task  
AndThen(AndThen(Print(2,start),Sleep(Print(2,middle),10000)),Sleep(Print(2,end),10000))
)

Să scriem interpreter care va transforma acest copac într-unul care execută de fapt sarcinile.

Interpretul

În FP funcția care transformă o descriere într-un program executabil se numește interpreter. Acesta ia descrierea programului, modelul și îl interpretează într-o formă executabilă. Aici va executa și programa sarcinile direct.

Mai întâi avem nevoie de un Stack care ne va permite să codificăm dependențele dintre sarcini. Cred că start >>= middle >>= end fiecare va fi împins în stivă și apoi va apărea în ordinea executării. Acest lucru va fi evident în implementare.

Și acum interpretul (nu vă faceți griji dacă nu înțelegeți acest cod, este puțin complicat, vine o soluție mai simplă):

def interpret(task: Task, executor: ScheduledExecutorService): Unit = {
  def loop(current: Task, stack: Stack[Task]): Unit =
  current match {
    case AndThen(t1, t2) =>
      loop(t1,stack.push(t2))
    case Print(id, value) =>  
      stack.pop match {
        case Some((t2, b)) => 
          executor.execute(() => {
          println(s"${Thread.currentThread().getName()} $value-$id")
          })   
        loop(t2,b)
        case None => 
          executor.execute(() => {
          println(s"${Thread.currentThread().getName()} $value-$id")
          })
    case Sleep(t1,millis) => 
      val r: Runnable = () =>{loop(t1,stack)}
      executor.schedule(r, millis, TimeUnit.MILLISECONDS)
}
loop(task,Nil)
}

Iar rezultatul este ceea ce ne dorim:

2020.09.29 00:06:39 INFO  pool-1-thread-1 start-1
2020.09.29 00:06:39 INFO  pool-1-thread-1 start-2
2020.09.29 00:06:50 INFO  pool-1-thread-1 middle-1
2020.09.29 00:06:50 INFO  pool-1-thread-1 middle-2
2020.09.29 00:07:00 INFO  pool-1-thread-1 end-1
2020.09.29 00:07:00 INFO  pool-1-thread-1 end-2

Un fir care rulează 2 sarcini de somn simultane. Este mult cod și multă muncă. Ca de obicei, ar trebui să vă întrebați întotdeauna dacă există o bibliotecă care rezolvă deja această problemă. Se pare că există: ZIO.

Somn fără blocare în ZIO

ZIO este o bibliotecă funcțională pentru programare asincronă și concurentă. Funcționează într-un mod similar cu micul nostru DSL, deoarece vă oferă câteva tipuri pe care le puteți combina și potrivi pentru a descrie programul dvs. și nimic mai mult.

Și apoi ne oferă un interpret care vă permite să rulați un program ZIO.

După cum am spus, acest model de interpret este omniprezent în lumea FP. Odată ce o primești, ți se deschide o nouă lume.

ZIO.sleep – o versiune mai bună a Thread.sleep

ZIO ne dă ZIO.sleep funcție, o versiune non-blocantă a Thread.sleep. Iată funcția noastră scrisă folosind ZIO:

import zio._
import zio.console._
import zio.duration._
object ZIOApp extends zio.App {
def zioTask(id: Int) = 
  for {
  _ <- putStrLn(s"${Thread.currentThread().getName()} start-$id")
  _ <- ZIO.sleep(10.seconds)
  _ <- putStrLn(s"${Thread.currentThread().getName()} end-$id")
} yield ()

Seamănă izbitor cu primul fragment:

def task(id: Int): Runnable = () => 
{
  println(s"${Thread.currentThread().getName()} start-$id")
  Thread.sleep(10000)
  println(s"${Thread.currentThread().getName()} end-$id")
}

Diferența clară este for sintaxă care ne permite să lanțăm declarații cu ZIO tip. Este foarte asemănător cu andThen funcție din mini-limbajul anterior.

La fel ca înainte cu mini-limbajul nostru, acest program este doar o descriere. Sunt date pure și nu face nimic. Pentru a face ceva avem nevoie de interpret.

Interpretul ZIO

Pentru a interpreta un program ZIO, trebuie doar să extindeți ZIO.App interfață și puneți-l în run metoda și ZIO va avea grijă să-l ruleze, astfel:

object ZIOApp extends zio.App 
{ 
override def run(args: List[String]) = {
  ZIO
  //start 2 ZIO tasks in parallel
  .foreachPar((1 to 2))(zioTasks)
  //complete program when done
  .as(ExitCode.success) 
}

Și obținem această ieșire – sarcinile se finalizează corect în 10 secunde:

2020.09.29 00:45:12 INFO  zio-default-async-3-1594199808 start-2
2020.09.29 00:45:12 INFO  zio-default-async-2-1594199808 start-1
2020.09.29 00:45:33 INFO  zio-default-async-7-1594199808 end-1
2020.09.29 00:45:33 INFO  zio-default-async-8-1594199808 end-2

Concluzii

  • Fiecare fir JVM se mapează la un fir de operare, într-un moda unu la unu. Și aceasta este rădăcina multor probleme.
  • Thread.sleep este rău! Aceasta blochează firul curent și îl face inutilizabil pentru lucrări ulterioare.
  • Project Loom (care va fi disponibil în JDK 17) va rezolva o mulțime de probleme. Iată o discuție interesantă despre asta.
  • Poți să folosești ScheduledExecutorService pentru a realiza somn neblocant.
  • Poți să folosești Programare funcțională pentru modelarea unui limbaj unde a dormi nu este blocant.
  • Biblioteca ZIO oferă un somn neblocant în afara cutiei.