Sprievodca po Java SynchronousQueue

1. Prehľad

V tomto článku sa pozrieme na SynchronousQueue z java.util.concurrent balíček.

Zjednodušene povedané, táto implementácia nám umožňuje výmenu informácií medzi vláknami spôsobom bezpečným pre vlákna.

2. Prehľad API

The SynchronousQueue iba má dve podporované operácie: vziať () a put (), a obaja blokujú.

Napríklad, ak chceme do fronty pridať prvok, musíme zavolať znak put () metóda. Táto metóda bude blokovaná, kým nejaké iné vlákno nezavolá vziať () metóda, ktorá signalizuje, že je pripravený na prijatie prvku.

Napriek tomu SynchronousQueue má rozhranie frontu, mali by sme o ňom uvažovať ako o výmennom bode pre jeden prvok medzi dvoma vláknami, v ktorom jedno vlákno odovzdáva prvok a iné vlákno tento prvok berie.

3. Implementácia odovzdaní pomocou zdieľanej premennej

Aby ste zistili, prečo SynchronousQueue môže byť také užitočné, implementujeme logiku pomocou zdieľanej premennej medzi dvoma vláknami a potom túto logiku prepíšeme pomocou SynchronousQueue vďaka čomu je náš kód oveľa jednoduchší a čitateľnejší.

Povedzme, že máme dve vlákna - producent a spotrebiteľ - a keď producent nastavuje hodnotu zdieľanej premennej, chceme túto skutočnosť signalizovať spotrebiteľskému vláknu. Ďalej bude spotrebiteľské vlákno načítať hodnotu zo zdieľanej premennej.

Použijeme CountDownLatch na koordináciu týchto dvoch vlákien, aby sa zabránilo situácii, keď spotrebiteľ získa hodnotu zdieľanej premennej, ktorá ešte nebola nastavená.

Definujeme a sharedState premenná a a CountDownLatch ktoré sa použijú na koordináciu spracovania:

ExecutorService vykonávateľ = Executors.newFixedThreadPool (2); AtomicInteger sharedState = nový AtomicInteger (); CountDownLatch countDownLatch = nový CountDownLatch (1);

Výrobca uloží do súboru náhodné celé číslo sharedState premenná a vykonajte countDown () metóda na countDownLatch, signalizácia spotrebiteľovi, že môže načítať hodnotu z sharedState:

Spustiteľný producent = () -> {Celé číslo vyrobenéElement = ThreadLocalRandom .current () .nextInt (); sharedState.set (producedElement); countDownLatch.countDown (); };

Spotrebiteľ počká na countDownLatch pomocou čakať () metóda. Keď výrobca signalizuje, že premenná bola nastavená, spotrebiteľ ju získa z sharedState:

Spustiteľný spotrebiteľ = () -> {try {countDownLatch.await (); Celé číslo consumedElement = sharedState.get (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

V neposlednom rade začnime náš program:

Exekútor.výkon (výrobca); Exekútor.výkon (spotrebiteľ); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (countDownLatch.getCount (), 0);

Vyprodukuje nasledujúci výstup:

Uloženie prvku: -1507375353 do bodu výmeny spotrebovalo prvok: -1507375353 z bodu výmeny

Vidíme, že ide o veľa kódu na implementáciu takej jednoduchej funkcie, ako je výmena prvku medzi dvoma vláknami. V nasledujúcej časti sa pokúsime urobiť to lepšie.

4. Implementácia odovzdaní pomocou SynchronousQueue

Poďme teraz implementovať rovnakú funkcionalitu ako v predchádzajúcej časti, ale s a SynchronousQueue. Má to dvojitý efekt, pretože ho môžeme použiť na výmenu stavu medzi vláknami a na koordináciu tejto akcie, aby sme nemuseli používať nič iné ako SynchronousQueue.

Najskôr definujeme poradie:

ExecutorService vykonávateľ = Executors.newFixedThreadPool (2); SynchronousQueue queue = nový SynchronousQueue ();

Výrobca zavolá a put () metóda, ktorá bude blokovať, kým iné vlákno nezoberie prvok z frontu:

Spustiteľný producent = () -> {Celé číslo vyrobenéElement = ThreadLocalRandom .current () .nextInt (); try {queue.put (producedElement); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Spotrebiteľ tento prvok jednoducho vyhľadá pomocou vziať () metóda:

Spustiteľný spotrebiteľ = () -> {try {Integer consumedElement = queue.take (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Ďalej začneme náš program:

Exekútor.výkon (výrobca); Exekútor.výkon (spotrebiteľ); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (queue.size (), 0);

Vyprodukuje nasledujúci výstup:

Uloženie prvku: 339626897 do bodu výmeny spotrebovalo prvok: 339626897 z bodu výmeny

Môžeme vidieť, že a SynchronousQueue sa používa ako bod výmeny medzi vláknami, čo je oveľa lepšie a zrozumiteľnejšie ako v predchádzajúcom príklade, ktorý používal zdieľaný stav spolu s CountDownLatch.

5. Záver

V tomto rýchlom výučbe sme sa pozreli na SynchronousQueue konštrukt. Vytvorili sme program, ktorý si vymieňa údaje medzi dvoma vláknami pomocou zdieľaného stavu, a potom sme tento program prepísali, aby sme využili tento SynchronousQueue konštrukt. Toto slúži ako výmenný bod, ktorý koordinuje vlákno výrobcu a spotrebiteľa.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found