Reaktívne prúdy Java 9

1. Prehľad

V tomto článku sa pozrieme na reaktívne prúdy Java 9. Jednoducho povedané, budeme môcť používať Prietok triedy, ktorá obklopuje primárne stavebné bloky pre budovanie logiky spracovania reaktívneho prúdu.

Reaktívne prúdy je štandard pre spracovanie asynchrónneho toku s neblokujúcim spätným tlakom. Táto špecifikácia je definovaná v dokumente Reaktívny manifest, a existujú rôzne jeho implementácie, napríklad RxJava alebo Prúdy Akka.

2. Prehľad reaktívneho API

Postaviť a Prietok, môžeme použiť tri hlavné abstrakcie a zostaviť ich do logiky asynchrónneho spracovania.

Každý Prietok potrebuje spracovať udalosti, ktoré sú v nej publikované inštanciou Publisher; the Vydavateľ má jednu metódu - prihlásiť sa na odber ().

Ak ktorýkoľvek z predplatiteľov chce dostávať udalosti ním zverejnené, musí sa prihlásiť na odber daného udalosti Vydavateľ.

Príjemca správ musí implementovať Predplatiteľ rozhranie. Spravidla je to koniec pre každého Prietok spracovanie, pretože jeho inštancia už neposiela správy.

Môžeme sa zamyslieť Predplatiteľ ako Drez. Má štyri metódy, ktoré je potrebné prepísať - onSubscribe (), onNext (), onError (), a onComplete (). Na tie sa pozrieme v nasledujúcej časti.

Ak chceme transformovať prichádzajúcu správu a odovzdať ju ďalej do ďalšej Predplatiteľ, musíme implementovať procesor rozhranie. Funguje to ako a Predplatiteľ pretože prijíma správy a ako Vydavateľ pretože tieto správy spracováva a posiela ich na ďalšie spracovanie.

3. Publikovanie a konzumácia správ

Povedzme, že chceme vytvoriť jednoduchý Prietok, v ktorej máme a Vydavateľ zverejňovanie správ a jednoduché Predplatiteľ konzumácia správ hneď, ako dorazia - jedna po druhej.

Vytvorme EndSubscriber trieda. Musíme implementovať Predplatiteľ rozhranie. Ďalej prepíšeme požadované metódy.

The onSubscribe () metóda sa volá pred začiatkom spracovania. Inštancia Predplatné sa odovzdáva ako argument. Je to trieda, ktorá sa používa na riadenie toku správ medzi Predplatiteľ a Vydavateľ:

verejná trieda EndSubscriber implementuje Subscriber {súkromné ​​predplatné predplatného; public List consumedElements = new LinkedList (); @Override public void onSubscribe (predplatné predplatného) {this.subscription = predplatné; predplatné.požiadavka (1); }}

Tiež sme inicializovali prázdny Zoznam z spotrebovanéPrvky ktoré sa využijú pri testoch.

Teraz musíme implementovať zostávajúce metódy z Predplatiteľ rozhranie. Hlavná metóda tu je onNext () - volá sa vždy, keď Vydavateľ zverejňuje novú správu:

@Override public void onNext (položka T) {System.out.println ("Mám:" + položka); predplatné.požiadavka (1); }

Upozorňujeme, že keď sme začali predplatné v onSubscribe () metóda a keď sme spracovali správu, musíme zavolať požiadavka () metóda na Predplatné signalizovať, že prúd Predplatiteľ je pripravený spotrebovať viac správ.

Napokon musíme vykonať onError () - ktorý sa volá vždy, keď sa vloží nejaká výnimka do spracovania, rovnako ako onComplete () - zavolal, keď Vydavateľ je zatvorené:

@Override public void onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {System.out.println ("Hotovo"); }

Poďme napísať test na Spracovanie Prietok. Budeme používať SubmissionPublisher triedy - konštrukt z java.util.concurrent - ktorým sa vykonáva Vydavateľ rozhranie.

Budeme sa podriaďovať N prvky do Vydavateľ - ktoré naše EndSubscriber dostane:

@ Test public void whenSubscribeToIt_thenShouldConsumeAll () vyvolá InterruptedException {// dané SubmissionPublisher publisher = new SubmissionPublisher (); Predplatiteľ EndSubscriber = nový EndSubscriber (); publisher.subscribe (predplatiteľ); Zoznam položiek = List.of ("1", "x", "2", "x", "3", "x"); // keď assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (vydavateľ :: odoslať); publisher.close (); // potom await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Upozorňujeme, že voláme Zavrieť() metóda na inštancii EndSubscriber. Vyvolá to onComplete () spätné volanie na každom Predplatiteľ z daného Vydavateľ.

Spustenie tohto programu vyprodukuje nasledujúci výstup:

Mám: 1 Mám: x Mám: 2 Mám: x Mám: 3 Mám: x Hotovo

4. Transformácia správ

Povedzme, že chceme vybudovať podobnú logiku medzi a Vydavateľ a a Predplatiteľ, ale uplatniť aj určitú transformáciu.

Vytvoríme TransformProcessor triedy, ktorá implementuje procesor a rozširuje sa SubmissionPublisher - pretože to bude oboje Publisher a Sposlucháč.

Prejdeme do a Funkcia ktorý transformuje vstupy na výstupy:

verejná trieda TransformProcessor rozširuje SubmissionPublisher implementuje Flow.Processor {private Function function; súkromné ​​predplatné Flow.Subscription; public TransformProcessor (funkčná funkcia) {super (); this.function = function; } @Override public void onSubscribe (Flow.Subscription subscription) {this.subscription = subscription; predplatné.požiadavka (1); } @Override public void onNext (T item) {submit (function.apply (item)); predplatné.požiadavka (1); } @Override public void onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {close (); }}

Poďme teraz napíš rýchly test s tokom spracovania, v ktorom Vydavateľ vydáva String prvkov.

Náš TransformProcessor bude analyzovať String ako Celé číslo - čo znamená, že tu musí ku konverzii dôjsť:

@ Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll () vyvolá InterruptedException {// dané SubmissionPublisher publisher = new SubmissionPublisher (); TransformProcessor transformProcessor = nový TransformProcessor (Integer :: parseInt); Predplatiteľ EndSubscriber = nový EndSubscriber (); Položky zoznamu = List.of ("1", "2", "3"); List expectResult = List.of (1, 2, 3); // keď publisher.subscribe (transformProcessor); transformProcessor.subscribe (predplatiteľ); items.forEach (vydavateľ :: odoslať); publisher.close (); // potom await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (expectResult)); }

Upozorňujeme, že volanie na číslo Zavrieť() metóda na základe Vydavateľ spôsobí onComplete () metóda na TransformProcessor byť vyvolaný.

Pamätajte, že všetci vydavatelia v spracovateľskom reťazci musia byť uzatvorení týmto spôsobom.

5. Kontrola dopytu po správach pomocou Predplatné

Povedzme, že chceme spotrebovať iba prvý prvok z predplatného, ​​uplatniť určitú logiku a dokončiť spracovanie. Môžeme použiť požiadavka () spôsob, ako to dosiahnuť.

Upravme naše EndSubscriber spotrebovať iba N počet správ. Toto číslo odovzdáme ako howMuchMessagesConsume argument konštruktora:

verejná trieda EndSubscriber implementuje Subscriber {private AtomicInteger howMuchMessagesConsume; súkromné ​​predplatné; public List consumedElements = new LinkedList (); public EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = nový AtomicInteger (howMuchMessagesConsume); } @Override public void onSubscribe (Predplatné predplatného) {this.subscription = predplatné; predplatné.požiadavka (1); } @Override public void onNext (položka T) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Mám:" + položka); consumedElements.add (položka); if (howMuchMessagesConsume.get ()> 0) {subscription.request (1); }} // ...}

Môžeme požadovať prvky, ako dlho chceme.

Napíšme test, pri ktorom chceme spotrebovať iba jeden prvok z daného Predplatné:

@ Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne () vyvolá InterruptedException {// dané SubmissionPublisher publisher = new SubmissionPublisher (); Predplatiteľ EndSubscriber = nový EndSubscriber (1); publisher.subscribe (predplatiteľ); Zoznam položiek = List.of ("1", "x", "2", "x", "3", "x"); Zoznam sa očakáva = List.of ("1"); // keď assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (vydavateľ :: odoslať); publisher.close (); // potom await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (očakáva sa)); }

Napriek tomu vydavateľ vydáva šesť prvkov, náš EndSubscriber spotrebuje iba jeden prvok, pretože signalizuje dopyt po spracovaní iba tohto jediného prvku.

Použitím požiadavka () metóda na Predplatné, môžeme implementovať sofistikovanejší mechanizmus protitlaku na riadenie rýchlosti spotreby správy.

6. Záver

V tomto článku sme sa pozreli na reaktívne prúdy Java 9.

Videli sme, ako vytvoriť spracovanie Prietok pozostávajúci z a Vydavateľ a a Predplatiteľ. Vytvorili sme zložitejší tok spracovania s transformáciou prvkov pomocou Procesory.

Nakoniec sme použili Predplatné kontrolovať dopyt po prvkoch prostredníctvom Predplatiteľ.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found