Prehľad súboru java.util.concurrent

1. Prehľad

The java.util.concurrent balík poskytuje nástroje na vytváranie súbežných aplikácií.

V tomto článku urobíme prehľad celého balíka.

2. Hlavné komponenty

The java.util.concurrent obsahuje príliš veľa funkcií na to, aby sme ich mohli prediskutovať v jednom dokumente. V tomto článku sa zameriame hlavne na niektoré z najužitočnejších nástrojov z tohto balíka, ako napríklad:

  • Exekútor
  • ExecutorService
  • ScheduledExecutorService
  • Budúcnosť
  • CountDownLatch
  • CyclicBarrier
  • Semafor
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Zámky
  • Phaser

Tiež tu nájdete veľa venovaných článkov pre jednotlivé triedy.

2.1. Exekútor

Exekútor je rozhranie, ktoré predstavuje objekt, ktorý vykonáva zadané úlohy.

Závisí od konkrétnej implementácie (odkiaľ je spustené vyvolanie), či sa má úloha spustiť na novom alebo aktuálnom vlákne. Preto pomocou tohto rozhrania môžeme oddeliť tok vykonávania úloh od skutočného mechanizmu vykonávania úloh.

Tu je potrebné poznamenať, že Exekútor nevyžaduje striktne asynchrónne vykonávanie úlohy. V najjednoduchšom prípade môže exekútor predloženú úlohu vyvolať okamžite vo vyvolávacom vlákne.

Potrebujeme vytvoriť vyvolávač na vytvorenie inštancie vykonávateľa:

public class Invoker implementuje Exekútor {@Override public void execute (Runnable r) {r.run (); }}

Teraz môžeme použiť tento vyvolávač na vykonanie úlohy.

public void execute () {Exekútor exekútor = nový Vyvolávač (); executor.execute (() -> {// úloha, ktorá sa má vykonať}); }

Tu je potrebné poznamenať, že ak exekútor nemôže prijať úlohu na vykonanie, vrhne sa RejectedExecutionException.

2.2. ExecutorService

ExecutorService je kompletné riešenie pre asynchrónne spracovanie. Spravuje front v pamäti a plánuje odoslané úlohy na základe dostupnosti vlákna.

Použit Exekútorská služba, musíme jeden vytvoriť Spustiteľné trieda.

verejná trieda Implementácia úlohy spustiteľná {@Override public void run () {// podrobnosti úlohy}}

Teraz môžeme vytvoriť ExecutorService inštancie a zadať túto úlohu. V čase vytvorenia musíme určiť veľkosť fondu vlákien.

ExecutorService vykonávateľ = Executors.newFixedThreadPool (10);

Ak chceme vytvoriť jednovláknové ExecutorService napríklad môžeme použiť newSingleThreadExecutor (ThreadFactory threadFactory) vytvoriť inštanciu.

Po vytvorení exekútora ho môžeme použiť na zadanie úlohy.

public void execute () {executor.submit (new Task ()); }

Môžeme tiež vytvoriť Spustiteľné napríklad pri zadávaní úlohy.

Exekútor.submit (() -> {nová úloha ();});

Dodáva sa tiež s dvoma hotovými metódami ukončenia vykonania. Prvý z nich je vypnúť(); počká, kým sa dokončí vykonávanie všetkých zadaných úloh. Druhá metóda je shutdownNow () ktoréh okamžite ukončí všetky čakajúce / vykonávajúce úlohy.

Existuje aj iná metóda awaitTermination (dlhý časový limit, jednotka TimeUnit) ktoré násilne blokuje, kým všetky úlohy nedokončia vykonávanie po spustení udalosti vypnutia alebo po uplynutí časového limitu vykonania, alebo kým nie je prerušené samotné vlákno vykonania,

try {executor.awaitTermination (20l, TimeUnit.NANOSECONDS); } catch (InterruptedException e) {e.printStackTrace (); }

2.3. ScheduledExecutorService

ScheduledExecutorService je podobné rozhranie ako Exekútorská služba, ale môže vykonávať úlohy pravidelne.

Exekútor a ExecutorServiceMetódy sú naplánované na mieste bez zavedenia umelého zdržania. Nula alebo akákoľvek záporná hodnota znamená, že požiadavka musí byť vykonaná okamžite.

Môžeme použiť oboje Spustiteľné a Vyvolávateľná rozhranie na definovanie úlohy.

public void execute () {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor (); Future future = executorService.schedule (() -> {// ... return "Hello world";}, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule (() -> {// ...}, 1, TimeUnit.SECONDS); executorService.shutdown (); }

ScheduledExecutorService môže tiež naplánovať úlohu po určitom stanovenom oneskorení:

executorService.scheduleAtFixedRate (() -> {// ...}, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay (() -> {// ...}, 1, 10, TimeUnit.SECONDS);

Tu je scheduleAtFixedRate (príkaz Runnable, long initialDelay, long period, TimeUnit unit) metóda vytvára a vykonáva periodickú akciu, ktorá je vyvolaná najskôr po zadanom počiatočnom oneskorení a následne s daným obdobím až do vypnutia inštancie služby.

The scheduleWithFixedDelay (príkaz Runnable, dlhé initialDelay, dlhé oneskorenie, jednotka TimeUnit) metóda vytvára a vykonáva periodickú akciu, ktorá je vyvolaná najskôr po zadanom počiatočnom oneskorení a opakovane s daným oneskorením medzi ukončením vykonávajúceho a vyvolaním ďalšej.

2.4. Budúcnosť

Budúcnosť sa používa na vyjadrenie výsledku asynchrónnej operácie. Dodáva sa s metódami na kontrolu, či je alebo nie je dokončená asynchrónna operácia, na získanie vypočítaného výsledku atď.

A čo viac, zrušiť (boolean mayInterruptIfRunning) API zruší operáciu a uvoľní vykonávajúce vlákno. Ak je hodnota mayInterruptIfRunning je pravda, vlákno vykonávajúce úlohu bude okamžite ukončené.

V opačnom prípade bude možné dokončiť prebiehajúce úlohy.

Nižšie uvedený úryvok kódu môžeme použiť na vytvorenie budúcej inštancie:

public void invoke () {ExecutorService executorService = Executors.newFixedThreadPool (10); Budúca budúcnosť = executorService.submit (() -> {// ... Thread.sleep (10 000l); návrat "Hello world";}); }

Pomocou nasledujúceho úryvku kódu môžeme skontrolovať, či je budúci výsledok pripravený, a načítať údaje, ak je výpočet vykonaný:

if (future.isDone () &&! future.isCancelled ()) {try {str = future.get (); } catch (InterruptedException | ExecutionException e) {e.printStackTrace (); }}

Môžeme tiež určiť časový limit pre danú operáciu. Ak úloha trvá dlhšie ako tento čas, a Výnimka časového limitu je hodená:

try {future.get (10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) {e.printStackTrace (); }

2.5. CountDownLatch

CountDownLatch (zavedené v 5. JDK) je trieda nástrojov, ktorá blokuje skupinu vlákien, kým sa nedokončí nejaká operácia.

A CountDownLatch sa inicializuje pomocou pult (Celé číslo typ); toto počítadlo klesá, keď závislé vlákna dokončia vykonávanie. Ale akonáhle počítadlo dosiahne nulu, uvoľnia sa ďalšie vlákna.

Môžete sa dozvedieť viac o CountDownLatch tu.

2.6. CyclicBarrier

CyclicBarrier funguje takmer rovnako ako CountDownLatch až na to, že to môžeme znovu použiť. Na rozdiel od CountDownLatch, umožňuje viacerým vláknam čakať na seba navzájom pomocou čakať () bariérovou metódou) pred vyvolaním záverečnej úlohy.

Musíme vytvoriť Spustiteľné inštancia úlohy na spustenie bariérovej podmienky:

verejná trieda Úloha implementuje Runnable {private CyclicBarrier bariéra; public Task (bariéra CyclicBarrier) {this.barrier = bariéra; } @Override public void run () {try {LOG.info (Thread.currentThread (). GetName () + "čaká"); bariéra.await (); LOG.info (Thread.currentThread (). GetName () + "je vydané"); } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace (); }}}

Teraz môžeme vyvolať niektoré vlákna, aby sme dosiahli stav bariéry:

public void start () {CyclicBarrier cyklickýBariér = nový CyclicBarrier (3, () -> {// ... LOG.info ("Všetky predchádzajúce úlohy sú splnené");}); Vlákno t1 = nové vlákno (nová úloha (cyclicBarrier), "T1"); Vlákno t2 = nové vlákno (nová úloha (cyclicBarrier), "T2"); Vlákno t3 = nové vlákno (nová úloha (cyclicBarrier), "T3"); if (! cyclicBarrier.isBroken ()) {t1.start (); t2.start (); t3.start (); }}

Tu je je zlomené() metóda kontroluje, či sa nejaké vlákno počas vykonávania prerušilo. Túto kontrolu by sme mali vykonať vždy pred vykonaním skutočného procesu.

2.7. Semafor

The Semafor sa používa na blokovanie prístupu na úrovni vlákna k niektorej časti fyzického alebo logického prostriedku. Semafor obsahuje súbor povolení; vždy, keď sa vlákno pokúša vstúpiť do kritickej sekcie, musí skontrolovať semafor, či je alebo nie je povolenie k dispozícii.

Ak povolenie nie je k dispozícii (prostredníctvom tryAcquire ()), vlákno nesmie skákať do kritickej časti; ak je však povolenie k dispozícii, prístup sa udelí a počítadlo povolení sa zníži.

Akonáhle vykonávajúce vlákno uvoľní kritickú časť, opäť sa zvýši počítadlo povolení (urobené uvoľniť () metóda).

Časový limit pre získanie prístupu môžeme určiť pomocou tryAcquire (dlhý časový limit, jednotka TimeUnit) metóda.

Môžeme tiež skontrolovať počet dostupných povolení alebo počet vlákien čakajúcich na získanie semaforu.

Na implementáciu semaforu je možné použiť nasledujúci úryvok kódu:

statický semafor semafor = nový semafor (10); public void execute () hodí InterruptedException {LOG.info ("Dostupné povolenie:" + semaphore.availablePermits ()); LOG.info ("Počet vlákien čakajúcich na získanie:" + semaphore.getQueueLength ()); if (semaphore.tryAcquire ()) {vyskúšať {// ...} konečne {semaphore.release (); }}}

Môžeme implementovať a Mutex ako dátová štruktúra pomocou Semafor. Viac podrobností nájdete tu.

2.8. ThreadFactory

Ako už názov napovedá, ThreadFactory funguje ako vlákno (neexistujúce) združenie, ktoré na požiadanie vytvára nové vlákno. Eliminuje potrebu veľkého množstva štandardných kódovaní pre implementáciu efektívnych mechanizmov vytvárania vlákien.

Môžeme definovať a ThreadFactory:

verejná trieda BaeldungThreadFactory implementuje ThreadFactory {private int threadId; súkromné ​​meno reťazca; public BaeldungThreadFactory (názov reťazca) {threadId = 1; this.name = meno; } @Override public Thread newThread (Runnable r) {Thread t = new Thread (r, name + "-Thread_" + threadId); LOG.info ("vytvorené nové vlákno s id:" + threadId + "a menom:" + t.getName ()); threadId ++; návrat t; }}

Môžeme to využiť newThread (spustiteľný r) metóda na vytvorenie nového vlákna za behu:

Továreň BaeldungThreadFactory = nová BaeldungThreadFactory ("BaeldungThreadFactory"); for (int i = 0; i <10; i ++) {Thread t = factory.newThread (new Task ()); t.start (); }

2.9. BlockingQueue

V asynchrónnom programovaní je jedným z najbežnejších integračných vzorov model výrobca-spotrebiteľ. The java.util.concurrent balík je dodávaný s dátovou štruktúrou známou ako BlockingQueue - čo môže byť v týchto asynchrónnych scenároch veľmi užitočné.

Viac informácií a funkčný príklad sú k dispozícii tu.

2.10. DelayQueue

DelayQueue je front blokujúcich nekonečných prvkov, kde je možné prvok vytiahnuť, iba ak je dokončený čas vypršania platnosti (známy ako oneskorenie definované používateľom). Preto je najvrchnejší prvok (hlava) bude mať najväčšie oneskorenie čiastky a bude vyzvaný ako posledný.

Viac informácií a funkčný príklad sú k dispozícii tu.

2.11. Zámky

Niet divu, Zamknúť je nástroj na blokovanie prístupu iných vlákien k určitému segmentu kódu, okrem vlákna, ktoré ich práve vykonáva.

Hlavný rozdiel medzi zámkom a synchronizovaným blokom je v tom, že synchronizovaný blok je plne obsiahnutý v metóde; operáciu uzamknutia () a odomknutie () rozhrania Lock API však môžeme mať samostatnými metódami.

Viac informácií a funkčný príklad sú k dispozícii tu.

2.12. Phaser

Phaser je flexibilnejšie riešenie ako CyclicBarrier a CountDownLatch - slúži ako opakovane použiteľná bariéra, na ktorú musí dynamický počet vlákien čakať pred ďalším spustením. Môžeme koordinovať viac fáz vykonania, opätovné použitie a Phaser pre každú fázu programu.

Viac informácií a funkčný príklad sú k dispozícii tu.

3. Záver

V tomto prehľadnom článku na vysokej úrovni sme sa zamerali na rôzne dostupné nástroje java.util.concurrent balíček.

Celý zdrojový kód je ako vždy k dispozícii na serveri GitHub.


$config[zx-auto] not found$config[zx-overlay] not found