RxJava 2 - tekutá

1. Úvod

RxJava je implementácia Java Reactive Extensions Java, ktorá nám umožňuje písať asynchrónne aplikácie riadené udalosťami. Viac informácií o tom, ako používať RxJava, nájdete v našom úvodnom článku tu.

RxJava 2 bola prepísaná úplne od nuly, čo prinieslo niekoľko nových funkcií; niektoré z nich boli vytvorené ako reakcia na problémy, ktoré existovali v predchádzajúcej verzii rámca.

Jednou z takýchto funkcií je io.reactivex.Flowable.

2. Pozorovateľné vs. Tekutý

V predchádzajúcej verzii RxJava bola iba jedna základná trieda pre prácu so zdrojmi uvedomujúcimi si spätný tlak a bez neho - Pozorovateľné.

RxJava 2 zaviedla jasný rozdiel medzi týmito dvoma druhmi zdrojov - zdroje pracujúce na protitlak sú teraz reprezentované pomocou vyhradenej triedy - Tekutý.

Pozorovateľné zdroje nepodporujú protitlak. Z tohto dôvodu by sme ho mali používať na zdroje, ktoré iba konzumujeme a nemôžeme na ne mať vplyv.

Pokiaľ máme do činenia s veľkým počtom prvkov, môžu sa vyskytnúť dva možné scenáre spojené s protitlakom v závislosti od typu Pozorovateľné.

V prípade použitia tzv chladný Pozorovateľné“, udalosti sa vysielajú lenivo, takže sme v bezpečí pred preplnením pozorovateľa.

Pri použití a horúci Pozorovateľnétoto však bude aj naďalej emitovať udalosti, aj keď spotrebiteľ nestíha.

3. Vytvorenie a Tekutý

Existujú rôzne spôsoby, ako vytvoriť Tekutý. Pohodlne pre nás tieto metódy vyzerajú podobne ako metódy v Pozorovateľné v prvej verzii RxJava.

3.1. Jednoduché Tekutý

Môžeme vytvoriť Tekutý pomocou iba () metóda podobne, ako by sme mohli s Pozorovateľné:

Flowable integerFlowable = Flowable.just (1, 2, 3, 4);

Aj keď sa používa iba () je celkom jednoduchý, vytvorenie a Tekutý zo statických údajov a používa sa na účely testovania.

3.2. Tekutý od Pozorovateľné

Keď máme Pozorovateľné môžeme to ľahko premeniť na Tekutý pomocou toFlowable () metóda:

Observable integerObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);

Všimnite si, že aby sme mohli vykonať konverziu, musíme obohatiť Pozorovateľné s BackpressureStrategy. Dostupné stratégie si popíšeme v nasledujúcej časti.

3.3. Tekutý od FlowableOnSubscribe

RxJava 2 predstavila funkčné rozhranie FlowableOnSubscribe, čo predstavuje a Tekutý ktorá začne emitovať udalosti potom, ako sa spotrebiteľ prihlási na odber.

Z tohto dôvodu budú všetci klienti dostávať rovnaký súbor udalostí FlowableOnSubscribe bezpečné pre tlak.

Keď máme FlowableOnSubscribe môžeme ho použiť na vytvorenie Tekutý:

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);

Dokumentácia popisuje oveľa viac metód na vytvorenie Tekutý.

4. TekutýBackpressureStrategy

Niektoré metódy ako toFlowable () alebo vytvoriť () vziať a BackpressureStrategy ako argument.

The BackpressureStrategy je výčet, ktorý definuje protitlakové správanie, ktoré použijeme na naše Tekutý.

Môže ukladať udalosti do medzipamäte alebo ich zrušiť alebo vôbec neimplementovať žiadne správanie. V poslednom prípade budeme za jeho definovanie zodpovední pomocou operátorov spätného tlaku.

BackpressureStrategy je podobný BackpressureMode prítomný v predchádzajúcej verzii RxJava.

V RxJava 2 je k dispozícii päť rôznych stratégií.

4.1. Nárazník

Ak použijeme BackpressureStrategy.BUFFER, zdroj bude ukladať do vyrovnávacej pamäte všetky udalosti, kým ich účastník nedokáže spotrebovať:

public void thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertEquals (testList, receiveInts); }

Je to podobné ako s vyvolaním onBackpressureBuffer () metóda na Tekutý, ale neumožňuje výslovne definovať veľkosť vyrovnávacej pamäte alebo akciu onOverflow.

4.2. Pokles

Môžeme použiť BackpressureStrategy.DROP zahodiť udalosti, ktoré nie je možné spotrebovať, namiesto ich ukladania do vyrovnávacej pamäte.

Opäť je to podobné ako pri použití onBackpressureDrop() na Tekutý:

public void whenDropStrategyUsed_thenOnBackpressureDropped () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (receiveInts.size () <testList.size ()); assertThat (! receiveInts.contains (100000)); }

4.3. Najnovšie

Pomocou BackpressureStrategy.LATEST prinúti zdroj uchovávať iba najnovšie udalosti, a teda prepíše všetky predchádzajúce hodnoty, ak spotrebiteľ nestíha:

public void whenLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (receiveInts.size () <testList.size ()); assertThat (receiveInts.contains (100000)); }

BackpressureStrategy.LATEST a BackpressureStrategy.DROP vyzerajú veľmi podobne, keď sa pozrieme na kód.

Avšak BackpressureStrategy.LATEST prepíše prvky, ktoré náš predplatiteľ nedokáže spracovať, a ponechá si iba tie najnovšie, odtiaľ pochádza aj názov.

BackpressureStrategy.DROP, na druhej strane zahodí prvky, s ktorými sa nedá manipulovať. To znamená, že najnovšie prvky nebudú nevyhnutne emitované.

4.4. Chyba

Keď používame BackpressureStrategy.ERROR, jednoducho to hovoríme neočakávame, že dôjde k protitlaku. V dôsledku toho a MissingBackpressureException Ak spotrebiteľ nedokáže držať krok so zdrojom, mali by byť vyhodení:

public void whenErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); Odberateľ TestSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

4.5. Chýba

Ak použijeme BackpressureStrategy.MISSING, zdroj bude tlačiť prvky bez zahodenia alebo medzipamäte.

Následný prúd bude musieť v tomto prípade čeliť pretečeniu:

public void whenMissingStrategyUsed_thenException () {Observable observable = Observable.range (1, 100000); Odberateľ TestSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

V našich testoch sme výnimoční MissingbackpressureException pre oba CHYBA a CHÝBAJÚ stratégie. Pretože obaja vyvolajú takúto výnimku, keď dôjde k preplneniu vnútornej vyrovnávacej pamäte zdroja.

Je však potrebné poznamenať, že obaja majú iný účel.

Prvý by sme mali použiť, keď vôbec neočakávame protitlak, a chceme, aby zdroj v prípade, že nastane, hodil výnimku.

Posledný z nich by sa dal použiť, ak nechceme špecifikovať predvolené správanie pri vytváraní súboru Tekutý. Neskôr to definujeme pomocou operátorov protitlaku.

5. Zhrnutie

V tomto tutoriáli sme predstavili novú triedu uvedenú v RxJava 2 zavolal Tekutý.

Ak sa chcete dozvedieť viac informácií o Tekutý sám o sebe a je to API, ktoré môžeme odkázať na dokumentáciu.

Ako vždy všetky vzorky kódu nájdete na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found