Úvod do aktívnej zóny reaktora

1. Úvod

Reactor Core je knižnica Java 8, ktorá implementuje model reaktívneho programovania. Je postavený na vrchole špecifikácie reaktívnych prúdov, štandardu pre vytváranie reaktívnych aplikácií.

Z pozadia nereaktívneho vývoja Javy môže byť reaktívna cesta dosť strmá krivka učenia. To bude náročnejšie pri porovnaní s Java 8 Prúd API, pretože si ich možno pomýliť s rovnakými abstrakciami na vysokej úrovni.

V tomto článku sa pokúsime demystifikovať túto paradigmu. Urobíme malé kroky cez reaktor, až kým nevytvoríme obraz o tom, ako zostaviť reaktívny kód, čím položíme základ pre ďalšie články, ktoré majú prísť v neskoršej sérii.

2. Špecifikácia reaktívnych prúdov

Predtým, ako sa pozrieme na reaktor, mali by sme sa pozrieť na špecifikáciu reaktívnych prúdov. Toto implementuje reaktor a vytvára základy pre knižnicu.

Reactive Streams je v podstate špecifikácia pre asynchrónne spracovanie toku.

Inými slovami, systém, v ktorom sa veľa udalostí vyrába a spotrebúva asynchrónne. Pomyslite na to, že do finančnej aplikácie prichádza prúd tisícov aktualizácií akcií za sekundu, a na to, aby na tieto aktualizácie musela reagovať včas.

Jedným z hlavných cieľov tohto riešenia je vyriešiť problém protitlaku. Ak máme výrobcu, ktorý emituje udalosti spotrebiteľovi rýchlejšie, ako ich dokáže spracovať, potom bude spotrebiteľ nakoniec zahltený udalosťami, ktorým dôjde systémové prostriedky.

Protitlak znamená, že náš spotrebiteľ by mal byť schopný povedať výrobcovi, koľko údajov má poslať, aby sa tomu zabránilo. To je to, čo je uvedené v špecifikácii.

3. Závislosti Maven

Než začneme, pridajme naše závislosti Maven:

 io.projectreactor reaktor-jadro 3.3.9RELEASE ch.qos.logback logback-classic 1.1.3 

Pridávame tiež Logback ako závislosť. Je to preto, že budeme zaznamenávať výstup reaktora, aby sme lepšie porozumeli toku údajov.

4. Produkcia toku dát

Aby bola aplikácia reaktívna, musí byť v prvom rade schopná vytvoriť prúd dát.

Môže to byť niečo ako príklad aktualizácie skladu, ktorý sme uviedli skôr. Bez týchto údajov by sme nemali na čo reagovať, a preto je toto prvý logický krok.

Reaktívne jadro nám poskytuje dva dátové typy, ktoré nám to umožňujú.

4.1. Flux

Prvý spôsob, ako to urobiť, je a Flux. Je to prúd, ktorý môže vyžarovať 0..n prvkov. Skúsme vytvoriť jednoduchý:

Flux just = Flux.just (1, 2, 3, 4);

V tomto prípade máme statický prúd štyroch prvkov.

4.2. Mono

Druhý spôsob, ako to urobiť, je a Mono, čo je prúd 0..1 prvkov. Skúsme vytvoriť inštanciu jedného:

Mono just = Mono.just (1);

Toto vyzerá a správa sa takmer rovnako ako Flux, iba tentoraz sme obmedzený na najviac jeden prvok.

4.3. Prečo nielen Flux?

Pred ďalším experimentovaním stojí za to zdôrazniť, prečo máme tieto dva typy údajov.

Najskôr je potrebné poznamenať, že tak a Flux a Mono sú implementácie reaktívnych prúdov Vydavateľ rozhranie. Obe triedy vyhovujú špecifikácii a na ich miesto by sme mohli použiť toto rozhranie:

Publisher just = Mono.just ("foo");

Ale v skutočnosti je poznanie tejto mohutnosti užitočné. Je to tak preto, lebo niekoľko operácií má zmysel iba pre jeden z týchto dvoch typov a pretože môže byť expresívnejšie (predstavte si findOne () v úložisku).

5. Prihlásenie na odber streamu

Teraz máme prehľad na vysokej úrovni o tom, ako vytvoriť prúd údajov, musíme sa ho prihlásiť na odber, aby mohol emitovať prvky.

5.1. Zbieranie prvkov

Použime prihlásiť sa na odber () metóda na zhromaždenie všetkých prvkov v streame:

Zoznam prvkov = new ArrayList (); Flux.just (1, 2, 3, 4) .log () .subscribe (elements :: add); assertThat (elements) .containsExactly (1, 2, 3, 4);

Dáta začnú prúdiť, až keď sa prihlásime na odber. Všimnite si, že sme pridali aj nejaké protokolovanie, ktoré bude užitočné, keď sa pozrieme na to, čo sa deje v zákulisí.

5.2. Tok prvkov

Po prihlásení na miesto ho môžeme použiť na vizualizáciu toku údajov naším prúdom:

20: 25: 19,550 [hlavný] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25: 19,553 [hlavný] INFO reactor.Flux.Array.1 - | požiadavka (bez obmedzenia) 20: 25: 19,553 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 25: 19,553 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 25: 19,553 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 25: 19,553 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 25: 19,553 [hlavný] INFO reaktor.Flux.Array.1 - | onComplete ()

V prvom rade všetko beží na hlavnom vlákne. Nejdeme o tom nijako podrobnejšie, pretože súbežnosti sa budeme ďalej venovať v tomto článku. Robí to však jednoduchými, pretože môžeme pracovať so všetkým v poriadku.

Teraz si poďme prejsť postupnosťou, ktorú sme si po jednom prihlásili:

  1. onSubscribe () - Toto sa volá, keď sa prihlásime na odber nášho streamu
  2. požiadavka (bez obmedzenia) - Keď voláme prihlásiť sa na odber, v zákulisí, ktoré tvoríme Predplatné. Toto predplatné vyžaduje prvky zo streamu. V takom prípade je predvolené nastavenie bez obmedzenia, To znamená, že vyžaduje všetky dostupné prvky
  3. onNext () - Toto sa volá pri každom jednom prvku
  4. onComplete () - Toto sa nazýva posledné po prijatí posledného prvku. Vlastne existuje onError () tiež, ktorý by sa volal, ak existuje výnimka, ale v tomto prípade nie je

Toto je tok stanovený v Predplatiteľ rozhranie ako súčasť špecifikácie reaktívnych prúdov a v skutočnosti to bolo to, čo sa stalo inštanciou zákulisia našej výzvy onSubscribe (). Je to užitočná metóda, ale aby sme lepšie pochopili, čo sa deje, poskytneme a Predplatiteľ priamo rozhranie:

Flux.just (1, 2, 3, 4) .log () .subscribe (nový Subscriber () {@Override public void onSubscribe (Subscription s) {s.request (Long.MAX_VALUE);} @Override public void onNext ( Celé číslo celé číslo) {elements.add (integer);} @Override public void onError (Throwable t) {} @Override public void onComplete () {}});

Vidíme, že každá možná etapa vo vyššie uvedených vývojových mapách mapuje metódu v Predplatiteľ implementácia. Stáva sa len to, že Flux nám poskytol pomocnú metódu na zníženie tejto výrečnosti.

5.3. Porovnanie s Java 8 Prúdy

Stále by sa mohlo zdať, že máme niečo synonymom pre Java 8 Prúd robiť zbierať:

Zhromaždený zoznam = Stream.of (1, 2, 3, 4) .collect (toList ());

Len my nie.

Hlavný rozdiel je v tom, že Reactive je push model, zatiaľ čo Java 8 Prúdy sú ťahový model. Reaktívnym prístupom sú udalosti tlačil predplatiteľom, keď vstúpia.

Ďalšia vec, ktorú si treba všimnúť, je Prúdy operátor terminálu je práve ten, terminál, ktorý stiahne všetky dáta a vráti výsledok. S Reactive by sme mohli mať nekonečný prúd prichádzajúci z externého zdroja, s viacerými účastníkmi pripojenými a odstránenými ad hoc. Môžeme tiež robiť veci, ako je kombinovanie prúdov, škrtiacich prúdov a použitie protitlaku, ktorému sa budeme venovať ďalej.

6. Protitlak

Ďalšou vecou, ​​ktorú by sme mali zvážiť, je protitlak. V našom príklade predplatiteľ hovorí výrobcovi, aby tlačil každý jeden prvok naraz. To by mohlo skončiť pre odberateľa ohromujúcim a spotrebujúcim všetky jeho zdroje.

Protitlak je, keď následný podnik môže povedať, že dodávateľský priemysel mu má posielať menej dát, aby zabránil jeho preťaženiu..

Môžeme upraviť naše Predplatiteľ implementácia uplatniť protitlak. Povedzme upstream, aby poslal iba dva prvky naraz pomocou požiadavka ():

Flux.just (1, 2, 3, 4) .log () .subscribe (new Subscriber () {private Subscription s; int onNextAmount; @Override public void onSubscribe (Subscription s) {this.s = s; s.request (2);} @Override public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @Override public void onError (Throwable t) {} @Override public void onComplete () {}});

Teraz, ak náš kód spustíme znova, uvidíme požiadavka (2) sa volá a za nimi nasledujú dve onNext () volá teda požiadavka (2) ešte raz.

23: 31: 15 395 [hlavný] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchrónne taviteľné] FluxArray.ArraySubscription) 23:31: 15,397 [hlavné] INFO reactor.Flux.Array.1 - | požiadavka (2) 23: 31: 15.397 [hlavná] INFO reaktor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [hlavný] INFO reaktor.Flux.Array.1 - | požiadavka (2) 23: 31: 15.398 [hlavný] INFO reactor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [hlavný] INFO reaktor.Flux.Array.1 - | požiadavka (2) 23: 31: 15.398 [hlavná] INFO reaktor.Flux.Array.1 - | onComplete ()

Jedná sa v podstate o reaktívny protitlak. Žiadame horný tok, aby tlačil iba na určité množstvo prvkov, a to len vtedy, keď sme pripravení.

Ak si predstavíme, že sa nám streamujú tweety z twitteru, potom by bolo na rade proti prúdu, aby sa rozhodla, čo urobí. Ak prichádzali tweety, ale neexistujú žiadne požiadavky od odberateľa, potom by odberateľ mohol upustiť od položiek, uložiť ich do medzipamäte alebo do inej stratégie.

7. Prevádzka v prúde

Môžeme tiež vykonávať operácie s údajmi v našom streame a reagovať na udalosti, ako uznáme za vhodné.

7.1. Mapovanie údajov v prúde

Jednoduchá operácia, ktorú môžeme vykonať, je použitie transformácie. V takom prípade zdvojnásobíme všetky čísla v našom streame:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribe (elements :: add);

mapa () sa uplatní, keď onNext () sa volá.

7.2. Kombinácia dvoch prúdov

Potom môžeme urobiť veci zaujímavejšími spojením iného streamu s týmto streamom. Skúsme to pomocou PSČ() funkcie:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (jeden, dva) -> String.format ("Prvý tok:% d, Druhý tok:% d", jeden, dva)) .subscribe (elements :: add); assertThat (elements) .containsExactly ("Prvý tok: 2, Druhý tok: 0", "Prvý tok: 4, Druhý tok: 1", "Prvý tok: 6, Druhý tok: 2", "Prvý tok: 8, Druhý Tok: 3 ");

Tu tvoríme ďalší Flux ktorá sa zvyšuje o jednu a streamuje ju spolu s našou pôvodnou. Ich vzájomnú spoluprácu môžeme vidieť pri kontrole protokolov:

20: 04: 38,064 [hlavný] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38,065 [hlavný] INFO reactor.Flux.Array.1 - | onNext (1) 20: 04: 38,066 [hlavný] INFO reaktor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38,066 [hlavný] INFO reactor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [hlavný] INFO reaktor.Flux.Range.2 - | onNext (1) 20: 04: 38,067 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [hlavný] INFO reaktor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [hlavný] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 04: 38,067 [hlavný] INFO reaktor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [hlavný] INFO reaktor.Flux.Array.1 - | onComplete () 20: 04: 38,067 [hlavný] INFO reaktor.Flux.Array.1 - | cancel () 20: 04: 38.067 [hlavný] INFO reactor.Flux.Range.2 - | Zrušiť()

Všimnite si, ako teraz máme jedno predplatné na Flux. The onNext () hovory sa tiež striedajú, takže index každého prvku v streame sa bude zhodovať, keď použijeme znak PSČ() funkcie.

8. Horúce toky

V súčasnosti sme sa zameriavali predovšetkým na studené toky. Jedná sa o statické prúdy s pevnou dĺžkou, s ktorými sa dá ľahko pracovať. Realistickejším prípadom použitia reaktívnych látok môže byť niečo, čo sa deje nekonečne dlho.

Mohli by sme mať napríklad prúd pohybov myši, na ktorý je treba neustále reagovať, alebo poslať twitter. Tieto typy prúdov sa nazývajú horúce prúdy, pretože stále prebiehajú a je možné si ich predplatiť kedykoľvek, pričom sa nezačne začiatok údajov.

8.1. Vytvorenie a Pripojiteľný tok

Jedným zo spôsobov, ako vytvoriť horúci prúd, je prevedenie studeného prúdu do jedného. Vytvorme a Flux ktorý trvá večne a výstup výsledkov do konzoly, ktorá by simulovala nekonečný tok údajov prichádzajúcich z externého zdroja:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Volaním zverejniť () dostávame a Pripojiteľný tok. To znamená, že volanie prihlásiť sa na odber () nespôsobí, že začne emitovať, čo nám umožní pridať viac predplatných:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

Ak sa pokúsime spustiť tento kód, nič sa nestane. Až keď zavoláme pripojiť (), že Flux začne emitovať:

publish.connect ();

8.2. Škrtenie

Ak spustíme náš kód, naša konzola bude zavalená protokolovaním. Toto simuluje situáciu, keď sa našim zákazníkom odovzdáva príliš veľa údajov. Skúsme to obísť pomocou škrtenia:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .sample (ofSeconds (2)) .publish ();

Tu sme predstavili a ukážka () metóda s intervalom dvoch sekúnd. Teraz budú hodnoty odosielané nášmu predplatiteľovi iba každé dve sekundy, čo znamená, že konzola bude oveľa menej hektická.

Existuje samozrejme niekoľko stratégií na zníženie množstva údajov odosielaných po prúde, ako napríklad vytváranie okien a ukladanie do vyrovnávacej pamäte, pre tento článok však budú vynechané.

9. Súbežnosť

Všetky naše vyššie uvedené príklady sú momentálne spustené v hlavnom vlákne. Ak však chceme, môžeme určiť, na ktorom vlákne náš kód beží. The Plánovač rozhranie poskytuje abstrakciu okolo asynchrónneho kódu, pre ktorý je k dispozícii veľa implementácií. Skúsme sa prihlásiť na odber iného vlákna ako main:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elements :: add);

The Paralelne Plánovač spôsobí, že naše predplatné bude spustené v inom vlákne, čo dokážeme pri pohľade na protokoly. Vidíme, že prvý záznam pochádza z hlavný vlákno a Flux beží v inom vlákne s názvom paralelne-1.

20:03:27.505 [hlavný] DEBUG reactor.util.Loggers $ LoggerFactory - pomocou protokolovacieho rámca Slf4j 20: 03: 27,529 [paralelne-1] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27,531 [paralelne-1] INFO reaktor.Flux.Array.1 - | požiadavka (bez obmedzenia) 20: 03: 27,531 [paralelne-1] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 03: 27.531 [paralelne-1] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 03: 27.531 [paralelne-1] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [paralelne-1] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [paralelne-1] INFO reaktor.Flux.Array.1 - | onComplete ()

Súbežnosť je oveľa zaujímavejšia ako táto a bude stáť za to, aby sme ju preskúmali v inom článku.

10. Záver

V tomto článku sme poskytli komplexný prehľad o reaktívnom jadre na vysokej úrovni. Vysvetlili sme, ako môžeme publikovať a prihlásiť sa na odber streamov, použiť protitlak, pracovať s streammi a tiež asynchrónne spracovávať údaje. Toto by, dúfajme, malo položiť základ pre písanie reaktívnych aplikácií.

Neskoršie články v tejto sérii sa budú venovať pokročilejším koncepciám súbežnosti a ďalším reaktívnym konceptom. Je tu aj ďalší článok, ktorý sa venuje reaktoru s pružinou.

Zdrojový kód našej aplikácie je k dispozícii na stránkach GitHub; toto je projekt Maven, ktorý by mal byť schopný bežať tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found