Sprievodca po DelayQueue

1. Prehľad

V tomto článku sa pozrieme na DelayQueue konštruovať z java.util.concurrent balíček. Toto je blokovanie, ktoré by sa dalo použiť v programoch producent - spotrebiteľ.

Má veľmi užitočnú vlastnosť - keď chce spotrebiteľ vziať prvok z poradia, môže ho zobrať, až keď uplynie oneskorenie pre tento konkrétny prvok.

2. Vykonávanie Oneskorené pre Prvky v DelayQueue

Každý prvok, do ktorého chceme vložiť DelayQueue musí implementovať Oneskorené rozhranie. Povedzme, že chceme vytvoriť a DelayObject trieda. Inštancie tejto triedy budú zaradené do DelayQueue.

Prejdeme okolo String údaje a delayInMilliseconds as a argumenty jeho konštruktorovi:

verejná trieda DelayObject implementuje oneskorené {súkromné ​​dáta reťazca; private long startTime; public DelayObject (reťazcové dáta, dlhé delayInMilliseconds) {this.data = data; this.startTime = System.currentTimeMillis () + delayInMilliseconds; }

Definujeme a Doba spustenia - toto je čas, kedy by sa mal prvok spotrebovať z frontu. Ďalej musíme implementovať getDelay () metóda - mala by vrátiť zostávajúce oneskorenie spojené s týmto objektom v danej časovej jednotke.

Preto musíme použiť TimeUnit.convert () metóda na vrátenie zostávajúceho oneskorenia v správnom TimeUnit:

@Override public long getDelay (jednotka TimeUnit) {long diff = startTime - System.currentTimeMillis (); návrat unit.convert (diff, TimeUnit.MILLISECONDS); }

Keď sa spotrebiteľ pokúsi vziať nejaký prvok z poradia, DelayQueue vykoná getDelay () zistiť, či je možné tento prvok vrátiť z frontu. Ak getDelay () metóda vráti nulu alebo záporné číslo, znamená to, že by sa dala načítať z frontu.

Musíme tiež implementovať porovnať s() metóda, pretože prvky v DelayQueue budú zoradené podľa času expirácie. Položka, ktorej platnosť vyprší ako prvá, je umiestnená na čele frontu a prvok s najvyššou dobou platnosti je uchovaný na konci fronty:

@Override public int compareTo (oneskorené o) {návrat Ints.saturatedCast (this.startTime - ((DelayObject) o) .startTime); }

3. DelayQueue Codberateľ a výrobca

Aby sme mohli otestovať našu DelayQueue musíme implementovať logiku výrobcov a spotrebiteľov. Trieda producenta berie ako argument frontu, počet prvkov na vyprodukovanie a oneskorenie každej správy v milisekundách.

Potom, keď run () metóda je vyvolaná, dáva prvky do poradia a po každom vložení spí 500 milisekúnd:

verejná trieda DelayQueueProducer implementuje Runnable {private BlockingQueue queue; súkromné ​​celé číslo numberOfElementsToProduce; private Integer delayOfEachProducedMessageMilliseconds; // štandardný konštruktor @Override public void run () {for (int i = 0; i <numberOfElementsToProduce; i ++) {DelayObject object = new DelayObject (UUID.randomUUID (). toString (), delayOfEachProducedMessageMilliseconds); System.out.println ("Vložiť objekt:" + objekt); skus {queue.put (objekt); Závit. Spánok (500); } catch (InterruptedException tj.) {ie.printStackTrace (); }}}}

Spotrebiteľská implementácia je veľmi podobný, ale tiež sleduje počet správ, ktoré boli spotrebované:

verejná trieda DelayQueueConsumer implementuje Runnable {súkromný front BlockingQueue; súkromné ​​celé číslo numberOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger (); // štandardné konštruktory @Override public void run () {for (int i = 0; i <numberOfElementsToTake; i ++) {try {DelayObject object = queue.take (); numberOfConsumedElements.incrementAndGet (); System.out.println ("Zákazník berie:" + objekt); } catch (InterruptedException e) {e.printStackTrace (); }}}}

4. DelayQueue Test používania

Ak chcete vyskúšať správanie DelayQueue, vytvoríme jedno vlákno výrobcu a jedno spotrebiteľské vlákno.

Výrobca bude put () dva objekty do poradia s oneskorením 500 milisekúnd. Test potvrdzuje, že spotrebiteľ spotreboval dve správy:

@ Test public void givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay () vyvolá InterruptedException {// given ExecutorService executor = Executors.newFixedThreadPool (2); Fronta BlockingQueue = nová DelayQueue (); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; Zákazník DelayQueueConsumer = nový DelayQueueConsumer (front, numberOfElementsToProduce); Producent DelayQueueProducer = nový DelayQueueProducer (front, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // keď executor.submit (producent); Exekútor.submit (spotrebiteľ); // potom executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), numberOfElementsToProduce); }

Môžeme pozorovať, že spustenie tohto programu vyprodukuje nasledujúci výstup:

Vložte objekt: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 1494069868007} Zákazník: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 14940698687 údaje 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512} Spotreba spotrebiteľom: {data = 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512}

Výrobca umiestni objekt a po chvíli sa spotrebuje prvý objekt, pre ktorý vypršalo oneskorenie.

Rovnaká situácia nastala aj v prípade druhého prvku.

5. Spotrebiteľ nie je schopný spotrebovať v danom čase

Povedzme, že máme producenta, ktorý produkuje prvok, ktorý bude vyprší o 10 sekúnd:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; Zákazník DelayQueueConsumer = nový DelayQueueConsumer (front, numberOfElementsToProduce); Producent DelayQueueProducer = nový DelayQueueProducer (front, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Spustíme náš test, ktorý sa však ukončí po 5 sekundách. Vzhľadom na vlastnosti DelayQueue, spotrebiteľ nebude môcť správu z fronty spotrebovať, pretože platnosť prvku ešte nevypršala:

Exekútor. predložiť (výrobca); Exekútor.submit (spotrebiteľ); executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 0);

Upozorňujeme, že spotrebiteľ numberOfConsumedElements má hodnotu rovnú nule.

6. Výroba prvku s okamžitým vypršaním platnosti

Keď sa implementácie Oneskorené správa getDelay () metóda vráti záporné číslo, to znamená, že danému prvku už vypršala platnosť. V tejto situácii výrobca tento prvok okamžite spotrebuje.

Môžeme otestovať situáciu výroby prvku so záporným oneskorením:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; Zákazník DelayQueueConsumer = nový DelayQueueConsumer (front, numberOfElementsToProduce); Producent DelayQueueProducer = nový DelayQueueProducer (front, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Keď spustíme testovací prípad, spotrebiteľ spotrebuje prvok okamžite, pretože jeho platnosť už uplynula:

Exekútor. predložiť (výrobca); Exekútor.submit (spotrebiteľ); executor.awaitTermination (1, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 1);

7. Záver

V tomto článku sme sa pozreli na DelayQueue konštruovať z java.util.concurrent balíček.

Realizovali sme a Oneskorené prvok, ktorý bol vyrobený a spotrebovaný z frontu.

Využili sme našu implementáciu DelayQueue konzumovať prvky, ktorým uplynula doba platnosti.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - čo je projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found