Sprievodca po java.util.concurrent.BlockingQueue

1. Prehľad

V tomto článku sa pozrieme na jeden z najužitočnejších konštruktov java.util.concurrent vyriešiť súčasný problém výrobca - spotrebiteľ. Pozrime sa na API BlockingQueue rozhranie a ako metódy z tohto rozhrania uľahčujú písanie súbežných programov.

Ďalej v článku si ukážeme príklad jednoduchého programu, ktorý má viac výrobných vlákien a viac spotrebiteľských vlákien.

2. BlockingQueue Typy

Môžeme rozlišovať dva typy BlockingQueue:

  • neobmedzený rad - môže rásť takmer donekonečna
  • ohraničený rad - s definovanou maximálnou kapacitou

2.1. Bez obmedzenia

Vytváranie neviazaných front je jednoduché:

BlockingQueue blockingQueue = new LinkedBlockingDeque ();

Kapacita blockingQueue bude nastavený na Celé číslo.MAX_VALUE. Všetky operácie, ktoré pridajú prvok do neobmedzeného frontu, sa nikdy nezablokujú, a tak by mohol narásť do veľmi veľkej veľkosti.

Najdôležitejšou vecou pri navrhovaní programu producent-spotrebiteľ pomocou neobmedzenej BlockingQueue je, že spotrebitelia by mali byť schopní spotrebovať správy tak rýchlo, ako výrobcovia pridávajú správy do frontu. Inak by sa pamäť mohla zaplniť a dostali by sme Nedostatok pamäte výnimkou.

2.2. Ohraničený front

Druhým typom frontov je ohraničený front. Takéto fronty môžeme vytvoriť odovzdaním kapacity ako argumentu konštruktoru:

BlockingQueue blockingQueue = nový LinkedBlockingDeque (10);

Tu máme blockingQueue ktorá má kapacitu rovnú 10. Znamená to, že keď sa producent pokúsi pridať prvok do už úplnej fronty, v závislosti od metódy, ktorá sa použila na jeho pridanie (ponuka (), pridať () alebo put ()), bude sa blokovať, kým nebude k dispozícii miesto na vloženie objektu. V opačnom prípade operácie zlyhajú.

Používanie ohraničenej fronty je dobrý spôsob, ako navrhnúť súbežné programy, pretože keď vložíme prvok do už plnej fronty, tieto operácie musia počkať, kým sa spotrebitelia chytia a vo fronte uvoľnia miesto. Dáva nám škrtenie bez akejkoľvek námahy z našej strany.

3. BlockingQueue API

V dokumente sú dva typy metód BlockingQueue rozhraniemetódy zodpovedné za pridávanie prvkov do frontu a metódy, ktoré tieto prvky načítajú. Každá metóda z týchto dvoch skupín sa chová odlišne, ak je rad plný / prázdny.

3.1. Pridávanie prvkov

  • pridať () - vracia pravda ak bolo vloženie úspešné, inak hodí an IllegalStateException
  • put () - vloží zadaný prvok do poradia a v prípade potreby čaká na voľný slot
  • ponuka () - vracia pravda ak bolo vloženie úspešné, inak nepravdivé
  • ponuka (napr. dlhá doba, jednotka TimeUnit) - sa pokúsi vložiť prvok do poradia a v stanovenom časovom limite čaká na dostupný slot

3.2. Načítavanie prvkov

  • vziať () - čaká na hlavný prvok vo fronte a odstráni ho. Ak je poradie prázdne, zablokuje sa a čaká na sprístupnenie prvku
  • anketa (dlhý časový limit, jednotka TimeUnit) - načíta a odstráni hlavu frontu a v prípade potreby čaká na zadaný čas čakania, kým bude prvok k dispozícii. Vráti sa nulový po uplynutí časového limitu

Tieto metódy sú najdôležitejšou stavebnou jednotkou systému BlockingQueue rozhranie pri vytváraní programov výrobca - spotrebiteľ.

4. Príklad s viacvláknovým výrobcom a spotrebiteľom

Vytvorme program, ktorý sa skladá z dvoch častí - producenta a spotrebiteľa.

Producent bude vyrábať náhodné číslo od 0 do 100 a toto číslo vloží do a BlockingQueue. Budeme mať 4 vlákna výrobcu a budeme používať put () spôsob blokovania, kým nebude vo fronte miesto.

Je potrebné pamätať na to, že musíme zabrániť tomu, aby naše spotrebiteľské vlákna čakali na neurčitý čas, kým sa nejaký prvok objaví v rade.

Dobrou technikou na signalizáciu od výrobcu spotrebiteľovi, že už nie sú žiadne ďalšie správy, je spracovanie, je poslať špeciálnu správu nazvanú jedovatá pilulka. Musíme poslať toľko tabletiek na jed, koľko máme spotrebiteľov. Keď potom spotrebiteľ vezme túto špeciálnu správu o otrave práškom z frontu, ladne dokončí vykonávanie.

Pozrime sa na triedu producentov:

public class NumbersProducer implementuje Runnable {private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer (BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run () {try {generateNumbers (); } catch (InterruptedException e) {Thread.currentThread (). interrupt (); }} private void generateNumbers () hodí InterruptedException {for (int i = 0; i <100; i ++) {numbersQueue.put (ThreadLocalRandom.current (). nextInt (100)); } for (int j = 0; j <poisonPillPerProducer; j ++) {numbersQueue.put (poisonPill); }}}

Náš výrobca konštruktérov berie ako argument BlockingQueue ktorý sa používa na koordináciu spracovania medzi výrobcom a spotrebiteľom. Vidíme túto metódu generateNumbers () umiestni 100 prvkov do poradia. Chce to tiež správu s otravou tabletkou, aby ste vedeli, aký typ správy musí byť zaradený do frontu, keď bude vykonanie ukončené. Túto správu je potrebné uviesť poisonPillPerProducer krát do poradia.

Každý spotrebiteľ si vezme prvok z a BlockingQueue použitím vziať () metóda, takže bude blokovať, kým sa vo fronte nenachádza prvok. Po užití Celé číslo z frontu skontroluje, či je správa jedovatou tabletkou, ak áno, potom je vykonanie vlákna ukončené. V opačnom prípade výsledok vytlačí na štandardný výstup spolu s názvom aktuálneho vlákna.

Takto získate prehľad o vnútornom fungovaní našich spotrebiteľov:

verejná trieda NumbersConsumer implementuje Runnable {private BlockingQueue queue; private final int poisonPill; public NumbersConsumer (blokovanie fronty frontu, int poisonPill) {this.queue = fronta; this.poisonPill = poisonPill; } public void run () {try {while (true) {Integer number = queue.take (); if (number.equals (poisonPill)) {return; } System.out.println (Thread.currentThread (). GetName () + "výsledok:" + číslo); }} catch (InterruptedException e) {Thread.currentThread (). interrupt (); }}}

Dôležité je všimnúť si použitie frontu. Rovnako ako v konštruktore producenta, aj rad sa odovzdáva ako argument. Môžeme to urobiť, pretože BlockingQueue môžu byť zdieľané medzi vláknami bez akejkoľvek výslovnej synchronizácie.

Teraz, keď máme svojho výrobcu a spotrebiteľa, môžeme zahájiť náš program. Musíme definovať kapacitu frontu a nastavili sme ju na 100 prvkov.

Chceme mať 4 vlákna výrobcu a počet vlákien spotrebiteľov sa bude rovnať počtu dostupných procesorov:

int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime (). availableProcessors (); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS% N_PRODUCERS; Fronta BlockingQueue = nová LinkedBlockingQueue (BOUND); pre (int i = 1; i <N_PRODUCERS; i ++) {new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer)). start (); } for (int j = 0; j <N_CONSUMERS; j ++) {new Thread (new NumbersConsumer (queue, poisonPill)). start (); } new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer + mod)). start (); 

BlockingQueue sa vytvára pomocou konštruktu s kapacitou. Vytvárame 4 výrobcov a N spotrebiteľov. Správu o našej tabletke s jedom označujeme ako Celé číslo.MAX_VALUE pretože takúto hodnotu náš výrobca za bežných pracovných podmienok nikdy neodošle. Najdôležitejšie je všimnúť si to tu BlockingQueue sa používa na koordináciu práce medzi nimi.

Po spustení programu budú náhodne umiestňovať 4 vlákna výrobcu Celé čísla v BlockingQueue a spotrebitelia budú brať tieto prvky z poradia. Každé vlákno vytlačí na štandardný výstup názov vlákna spolu s výsledkom.

5. Záver

Tento článok ukazuje praktické využitie BlockingQueue a vysvetľuje metódy, ktoré sa používajú na pridávanie a načítanie prvkov z neho. Tiež sme si ukázali, ako vytvoriť viacvláknový program producent - spotrebiteľ pomocou BlockingQueue koordinovať prácu medzi výrobcami a spotrebiteľmi.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt založený na Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found