Sprievodca po CountDownLatch v Jave

1. Úvod

V tomto článku uvedieme sprievodcu CountDownLatch triedy a na niekoľkých praktických príkladoch ukážte, ako sa dá využiť.

V zásade pomocou a CountDownLatch môžeme spôsobiť blokovanie vlákna, kým dané vlákna nedokončia danú úlohu.

2. Použitie pri súbežnom programovaní

Jednoducho povedané, a CountDownLatchpult pole, ktoré môžete podľa potreby znižovať. Potom ho môžeme použiť na blokovanie volajúceho vlákna, kým sa neodpočíta na nulu.

Keby sme robili nejaké paralelné spracovanie, mohli by sme vytvoriť inštanciu CountDownLatch s rovnakou hodnotou pre počítadlo ako počet vlákien, s ktorými chceme pracovať. Potom sme mohli zavolať odpočítavanie () po dokončení každého vlákna, čím sa zaručí, že bude závislé vlákno volať čakať () bude blokované, kým nebudú pracovné vlákna dokončené.

3. Čakanie na dokončenie skupiny vlákien

Vyskúšajme tento vzor vytvorením a Pracovník a pomocou a CountDownLatch po dokončení signalizuje pole:

verejná trieda Worker implementuje Runnable {private List outputScraper; private CountDownLatch countDownLatch; public Worker (List outputScraper, CountDownLatch countDownLatch) {this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run () {doSomeWork (); outputScraper.add ("Odpočítavané"); countDownLatch.countDown (); }}

Potom vytvorme test, aby sme dokázali, že môžeme získať a CountDownLatch čakať na Pracovník prípady na dokončenie:

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion () hodí InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch countDownLatch = nový CountDownLatch (5); Zoznam pracovníkov = Stream .generate (() -> nové vlákno (nový pracovník (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); countDownLatch.await (); outputScraper.add ("západka uvoľnená"); assertThat (outputScraper) .containsExactly ("Odpočítané", "Odpočítané", "Odpočítané", "Odpočítané", "Odpočítané", "Západka uvoľnená"); }

Prirodzene „uvoľnená západka“ bude vždy posledným výstupom - pretože závisí od CountDownLatch uvoľnenie.

Všimnite si, že ak sme nezavolali čakať (), neboli by sme schopní zaručiť poradie vykonania vlákien, takže test by náhodne zlyhal.

4. Skupina vlákien čakajúca na začiatok

Ak by sme si vzali predchádzajúci príklad, ale tentoraz sme začali tisíce vlákien namiesto piatich, je pravdepodobné, že mnoho z tých starších dokončí spracovanie skôr, ako sme vôbec zavolali štart () na tých neskorších. To by mohlo sťažiť pokus o reprodukciu problému súbežnosti, pretože by sme nedokázali zabezpečiť, aby všetky naše vlákna bežali paralelne.

Aby sme to obišli, poďme CountdownLatch pracovať inak ako v predchádzajúcom príklade. Namiesto blokovania nadradeného vlákna, kým nebudú dokončené niektoré podradené vlákna, môžeme blokovať každé podradené vlákno, kým sa nezačnú všetky ostatné.

Upravme naše run () metóda tak blokuje pred spracovaním:

verejná trieda WaitingWorker implementuje Runnable {private List outputScraper; private CountDownLatch readyThreadCounter; súkromné ​​CountDownLatch volanieThreadBlocker; súkromné ​​CountDownLatch vyplnenéThreadCounter; public WaitingWorker (zoznam outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch vyplnenýThreadCounter) {this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = CompleteThreadCounter; } @Override public void run () {readyThreadCounter.countDown (); skúste {callingThreadBlocker.await (); doSomeWork (); outputScraper.add ("Odpočítavané"); } catch (InterruptedException e) {e.printStackTrace (); } konečne {completedThreadCounter.countDown (); }}}

Teraz upravme náš test tak, aby blokoval všetky Robotníci začali, odblokujú Robotníci, a potom blokuje až do Robotníci skončili:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime () hodí InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch readyThreadCounter = nový CountDownLatch (5); CountDownLatch callingThreadBlocker = nový CountDownLatch (1); CountDownLatch completedThreadCounter = nový CountDownLatch (5); Zoznam pracovníkov = Stream .generate (() -> nové vlákno (nové WaitingWorker (outputScraper, readyThreadCounter, callingThreadBlocker, CompleteThreadCounter))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); readyThreadCounter.await (); outputScraper.add ("Pracovníci pripravení"); callingThreadBlocker.countDown (); CompleteThreadCounter.await (); outputScraper.add ("Pracovníci sú dokončení"); assertThat (outputScraper) .containsExactly ("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete"); }

Tento vzor je skutočne užitočný pri pokuse o reprodukciu chýb súbežnosti, pretože ho možno použiť na vynútenie tisícov vlákien, aby sa pokúsili paralelne vykonať určitú logiku.

5. Ukončenie a CountdownLatch Skoro

Niekedy sa môžeme dostať do situácie, keď Robotníci ukončiť omylom pred odpočítaním CountDownLatch. To by mohlo viesť k tomu, že nikdy nedosiahne nulu a čakať () nikdy nekončiaci:

@Override public void run () {if (true) {throw new RuntimeException ("Oh drahá, som BrokenWorker"); } countDownLatch.countDown (); outputScraper.add ("Odpočítavané"); }

Upravme náš skorší test tak, aby používal a BrokenWorker, aby sa ukázalo ako čakať () bude navždy blokovaný:

@ Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck () vyvolá InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch countDownLatch = nový CountDownLatch (5); Zoznam pracovníkov = Stream .generate (() -> nové vlákno (nové BrokenWorker (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); countDownLatch.await (); }

Je zrejmé, že nejde o správanie, ktoré chceme - pre aplikáciu by bolo oveľa lepšie pokračovať, než by ju mohlo nekonečne blokovať.

Aby sme to obišli, pridajme k výzve na adresu argument časový limit čakať ().

boolean dokončené = countDownLatch.await (3L, TimeUnit.SECONDS); assertThat (dokončené) .isFalse ();

Ako vidíme, test nakoniec vyprší a čakať () vráti sa nepravdivé.

6. Záver

V tomto rýchlom sprievodcovi sme si ukázali, ako môžeme používať a CountDownLatch za účelom blokovania vlákna, kým iné vlákna nedokončia nejaké spracovanie.

Ukázali sme tiež, ako sa dá použiť na ladenie problémov so súbežnosťou tým, že sa zabezpečí, že vlákna budú bežať paralelne.

Implementáciu týchto príkladov možno nájsť na GitHub; toto je projekt založený na Maven, takže by mal byť ľahko spustiteľný tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found