Úvod do RxJava

1. Prehľad

V tomto článku sa zameriame na použitie Reactive Extensions (Rx) v Jave na zostavenie a skonzumovanie sekvencií údajov.

Na prvý pohľad môže API vyzerať podobne ako Java 8 Streams, ale v skutočnosti je oveľa flexibilnejšie a plynulejšie, čo z neho robí výkonnú paradigmu programovania.

Ak si chcete prečítať viac o RxJava, pozrite si tento zápis.

2. Inštalácia

Ak chcete v našom projekte Maven použiť RxJava, budeme musieť do nášho systému pridať nasledujúcu závislosť pom.xml:

 io.reactivex rxjava $ {rx.java.version} 

Alebo pre projekt Gradle:

kompilujte 'io.reactivex.rxjava: rxjava: x.y.z'

3. Funkčné reaktívne koncepty

Na jednej strane, funkčné programovanie je proces vytvárania softvéru skladaním čistých funkcií, vyhýbaním sa zdieľanému stavu, premenlivým údajom a vedľajším účinkom.

Na druhej strane, reaktívne programovanie je paradigma asynchrónneho programovania zaoberajúca sa dátovými tokmi a šírením zmien.

Spolu, funkčné reaktívne programovanie tvorí kombináciu funkčných a reaktívnych techník, ktoré môžu predstavovať elegantný prístup k programovaniu založenému na udalostiach - s hodnotami, ktoré sa časom menia a kde spotrebiteľ reaguje na dáta tak, ako prichádzajú.

Táto technológia spája rôzne implementácie jej základných princípov, niektorí autori prišli s dokumentom, ktorý definuje bežnú slovnú zásobu pre popis nového typu aplikácií.

3.1. Reaktívny manifest

Reaktívny manifest je online dokument, ktorý stanovuje vysoký štandard pre aplikácie v priemysle vývoja softvéru. Zjednodušene povedané, reaktívne systémy sú:

  • Responzívne - systémy by mali reagovať včas
  • Správa riadená - systémy by mali používať asynchronné odovzdávanie správ medzi komponentmi, aby sa zabezpečilo voľné spojenie
  • Elastické - systémy by mali zostať citlivé aj pri vysokom zaťažení
  • Odolné - systémy by mali zostať citlivé, ak niektoré komponenty zlyhajú

4. Pozorovateľné

Pri práci je potrebné pochopiť dva kľúčové typy Rx:

  • Pozorovateľné predstavuje akýkoľvek objekt, ktorý môže získať údaje zo zdroja údajov a ktorého stav môže byť zaujímavý takým spôsobom, že záujem môžu zaregistrovať iné objekty
  • An pozorovateľ je akýkoľvek objekt, ktorý si želá byť informovaný, keď sa zmení stav iného objektu

An pozorovateľ odoberá Pozorovateľné postupnosť. Sekvencia odošle položky do pozorovateľ jeden po druhom.

The pozorovateľ spracováva každú z nich pred spracovaním ďalšej. Ak veľa udalostí prichádza asynchrónne, musia byť uložené v rade alebo zrušené.

V Rx, an pozorovateľ nikdy nebude volaný s položkou mimo poradia alebo bude volaný pred vrátením spätného volania pre predchádzajúcu položku.

4.1. Druhy Pozorovateľné

Existujú dva typy:

  • Neblokujúce - je podporovaná asynchrónna exekúcia a je možné ju kedykoľvek zrušiť v streame udalosti. V tomto článku sa zameriame predovšetkým na tento druh typu
  • Blokovanie - všetko ďalej hovory pozorovateľov budú synchrónne a uprostred toku udalostí nie je možné zrušiť ich odber. Vždy môžeme previesť Pozorovateľné do a Blokovanie pozorovateľné, pomocou metódy toBlocking:
BlockingObservable blockingObservable = observable.toBlocking ();

4.2. Operátorov

An operátor je funkcia, ktorá berie jednu Observable (zdroj) ako prvý argument a vráti ďalší Pozorovateľné (cieľ). Potom pre každú položku, ktorú pozorovateľ zdroja vydá, použije na túto položku funkciu a potom vydá výsledok na cieľovom mieste. Pozorovateľné.

Operátorov je možné spojiť dohromady a vytvoriť tak zložité dátové toky, ktoré filtrujú udalosti na základe určitých kritérií. Na toho istého možno použiť viac operátorov pozorovateľný.

Nie je ťažké sa dostať do situácie, v ktorej by Pozorovateľné emituje položky rýchlejšie ako operátor alebo pozorovateľ ich môže konzumovať. Viac informácií o protitlaku si môžete prečítať tu.

4.3. Vytvorte pozorovateľné

Základný operátor len vyrába Pozorovateľné ktorá pred dokončením emituje jednu generickú inštanciu, String "Ahoj". Keď chceme získať informácie z Pozorovateľné, implementujeme pozorovateľ rozhranie a potom zavolajte na odber požadovaného Pozorovateľné:

Observable observable = Observable.just ("Ahoj"); observable.subscribe (s -> result = s); assertTrue (result.equals ("Hello"));

4.4. OnNext, OnError, a Zapnuté Dokončené

Existujú tri metódy na pozorovateľ rozhranie, o ktorom chceme vedieť:

  1. OnNext sa volá na našu pozorovateľ zakaždým, keď je nová udalosť zverejnená v prílohe Pozorovateľné. Toto je metóda, pri ktorej pri každej udalosti vykonáme nejakú akciu
  2. Zapnuté Dokončené sa volá, keď je sled udalostí spojený s Pozorovateľné je kompletný, čo naznačuje, že by sme už nemali čakať ďalej vyzýva nášho pozorovateľa
  3. OnError sa volá, keď sa počas RxJava rámcový kód alebo náš kód na spracovanie udalostí

Návratová hodnota pre Pozorovateľnéprihlásiť sa na odber metóda je a prihlásiť sa na odber rozhranie:

Reťazec [] písmená = {"a", "b", "c", "d", "e", "f", "g"}; Pozorovateľný pozorovateľný = Pozorovateľný.z (písmen); observable.subscribe (i -> result + = i, // OnNext Throwable :: printStackTrace, // OnError () -> result + = "_Completed" // OnCompleted); assertTrue (result.equals ("abcdefg_Completed"));

5. Pozorovateľné transformácie a podmienené operátory

5.1. Mapa

Moperátor ap transformuje položky emitované pomocou Pozorovateľné aplikáciou funkcie na každú položku.

Predpokladajme, že existuje deklarované pole reťazcov, ktoré obsahuje niektoré písmená z abecedy, a chceme ich vytlačiť v režime veľkých písmen:

Pozorovateľné. Z (písmen). Mapa (String :: toUpperCase). Odhlásiť sa (písmeno -> výsledok + = písmeno); assertTrue (result.equals ("ABCDEFG"));

The flatMap možno použiť na sploštenie Pozorovateľné kedykoľvek skončíme s vnorenými Pozorovateľné.

Viac podrobností o rozdiele medzi mapa a flatMap nájdete tu.

Za predpokladu, že máme metódu, ktorá vráti Pozorovateľné zo zoznamu reťazcov. Teraz budeme tlačiť na každý reťazec z nového Pozorovateľné zoznam titulov podľa čoho Predplatiteľ vidí:

Pozorovateľné getTitle () {návrat Observable.from (titleList); } Observable.just ("book1", "book2") .flatMap (s -> getTitle ()) .subscribe (l -> vysledok + = l); assertTrue (result.equals ("titletitle"));

5.2. Skenovať

The operátor skenovania aaplikuje funkciu na každú položku emitovanú kódom Pozorovateľné postupne a vydáva každú nasledujúcu hodnotu.

Umožňuje nám prenášať stav z udalosti na udalosť:

Reťazec [] písmená = {"a", "b", "c"}; Pozorovateľné.z (písmen) .scan (nový StringBuilder (), StringBuilder :: append) .subscribe (total -> result + = total.toString ()); assertTrue (result.equals ("aababc"));

5.3. GroupBy

Zoskupiť podľa operátor nám umožňuje klasifikovať udalosti vo vstupe Pozorovateľné do výstupných kategórií.

Predpokladajme, že sme vytvorili pole celých čísel od 0 do 10, potom platí zoskupiť podľa tým sa rozdelia do kategórií dokonca a zvláštny:

Observable.from (numbers) .groupBy (i -> 0 == (i% 2)? "EVEN": "ODD") .subscribe (group -> group.subscribe ((number) -> {if (group.getKey) () .toString (). equals ("EVEN")) {EVEN [0] + = number;} else {ODD [0] + = number;}})); assertTrue (EVEN [0] .equals ("0246810")); assertTrue (ODD [0] .equals ("13579"));

5.4. Filtrovať

Prevádzkovateľ filter vydáva iba tie položky z pozorovateľný že prejsť a predikát test.

Poďme teda filtrovať celé pole pre nepárne čísla:

Pozorovateľné.z (čísel) .filtre (i -> (i% 2 == 1)). Prihlásiť sa na odber (i -> výsledok + = i); assertTrue (result.equals ("13579"));

5.5. Podmienení operátori

DefaultIfEmpty vydáva položku zo zdroja Pozorovateľné, alebo predvolená položka, ak je zdroj Pozorovateľné je prázdny:

Observable.empty () .defaultIfEmpty ("Pozorovateľné je prázdne") .subscribe (s -> výsledok + = s); assertTrue (result.equals ("Pozorovateľné je prázdne"));

Nasledujúci kód vydáva prvé písmeno abecedy „a ' pretože pole písmená nie je prázdny a na prvom mieste obsahuje:

Observable.from (letters) .defaultIfEmpty ("Observable is empty") .first () .subscribe (s -> result + = s); assertTrue (result.equals ("a"));

TakeWhile operátor zahodí položky emitované Pozorovateľné keď sa zadaná podmienka stane nepravdivou:

Pozorovateľné.z (čísel) .takeWhile (i -> i súčet [0] + = s); assertTrue (suma [0] == 10);

Samozrejme, existuje viac ďalších operátorov, ktorí by mohli pokryť naše potreby Contain, SkipWhile, SkipUntil, TakeUntil, atď.

6. Pripojiteľné pozorovateľné

A Pripojiteľné Pozorovateľné pripomína obyčajný Pozorovateľné, až na to, že nezačne vydávať položky, keď sa prihlásite na odber, ale iba keď spojiť operátor.

Týmto spôsobom môžeme čakať, kým sa všetci zamýšľaní pozorovatelia prihlásia k Pozorovateľné pred Pozorovateľné začne emitovať položky:

Reťazec [] result = {""}; ConnectableObservable connectable = Observable.interval (200, TimeUnit.MILLISECONDS) .publish (); connectable.subscribe (i -> výsledok [0] + = i); assertFalse (výsledok [0] .equals ("01")); connectable.connect (); Závit. Spánok (500); assertTrue (výsledok [0] .equals ("01"));

7. Slobodný

Slobodný je ako Pozorovateľné ktorý namiesto emisie sérií hodnôt vydá jednu hodnotu alebo chybové hlásenie.

Pri tomto zdroji údajov môžeme na prihlásenie použiť iba dve metódy:

  • Úspech vracia a Slobodný ktorá volá aj metódu, ktorú zadáme
  • OnError tiež vracia a Slobodný ktorá predplatiteľom okamžite oznámi chybu
Reťazec [] result = {""}; Single single = Observable.just ("Hello") .toSingle () .doOnSuccess (i -> výsledok [0] + = i) .doOnError (chyba -> {hodiť novú RuntimeException (error.getMessage ());}); single.subscribe (); assertTrue (výsledok [0] .equals („ahoj“));

8. Predmety

A Predmet je súčasne dva prvky, a predplatiteľ a an pozorovateľný. Ako predplatiteľ môže byť subjekt použitý na zverejnenie udalostí pochádzajúcich z viac ako jedného pozorovateľného subjektu.

A pretože je tiež pozorovateľný, udalosti od viacerých predplatiteľov môžu byť vrátené ako ich udalosti každému, kto ich pozoruje.

V nasledujúcom príklade sa pozrieme na to, ako budú môcť pozorovatelia vidieť udalosti, ku ktorým dôjde po prihlásení na odber:

Celé číslo predplatiteľ1 = 0; Celé číslo predplatiteľ2 = 0; Observer getFirstObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber1 + = value; } @Override public void onError (Throwable e) {System.out.println ("chyba"); } @Override public void onCompleted () {System.out.println ("Účastník1 dokončený"); }}; } Observer getSecondObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber2 + = value; } @Override public void onError (Throwable e) {System.out.println ("chyba"); } @Override public void onCompleted () {System.out.println ("Subscriber2 Complete"); }}; } PublishSubject subject = PublishSubject.create (); subject.subscribe (getFirstObserver ()); subject.onNext (1); subject.onNext (2); subject.onNext (3); subject.subscribe (getSecondObserver ()); subject.onNext (4); subject.onCompleted (); assertTrue (predplatiteľ1 + predplatiteľ2 == 14)

9. Správa zdrojov

Použitím Táto operácia nám umožňuje priradiť zdroje, ako napríklad databázové pripojenie JDBC, sieťové pripojenie alebo otvorené súbory k našim pozorovateľným položkám.

Tu uvádzame v komentároch kroky, ktoré musíme urobiť na dosiahnutie tohto cieľa, a tiež príklad implementácie:

Reťazec [] result = {""}; Pozorovateľné hodnoty = Observable.using (() -> "MyResource", r -> {návrat Observable.create (o -> {for (Character c: r.toCharArray ()) {o.onNext (c);} o. onCompleted ();});}, r -> System.out.println ("Zlikvidované:" + r)); hodnoty.subscribe (v -> výsledok [0] + = v, e -> výsledok [0] + = e); assertTrue (výsledok [0] .equals ("MyResource"));

10. Záver

V tomto článku sme hovorili o tom, ako používať knižnicu RxJava a tiež ako preskúmať jej najdôležitejšie funkcie.

Celý zdrojový kód projektu vrátane všetkých tu použitých vzorových kódov nájdete na Githube.


$config[zx-auto] not found$config[zx-overlay] not found