RxJava 2 - tekutá
1. Úvod
RxJava je implementácia Java Reactive Extensions Java, ktorá nám umožňuje písať asynchrónne aplikácie riadené udalosťami. Viac informácií o tom, ako používať RxJava, nájdete v našom úvodnom článku tu.
RxJava 2 bola prepísaná úplne od nuly, čo prinieslo niekoľko nových funkcií; niektoré z nich boli vytvorené ako reakcia na problémy, ktoré existovali v predchádzajúcej verzii rámca.
Jednou z takýchto funkcií je io.reactivex.Flowable.
2. Pozorovateľné vs. Tekutý
V predchádzajúcej verzii RxJava bola iba jedna základná trieda pre prácu so zdrojmi uvedomujúcimi si spätný tlak a bez neho - Pozorovateľné.
RxJava 2 zaviedla jasný rozdiel medzi týmito dvoma druhmi zdrojov - zdroje pracujúce na protitlak sú teraz reprezentované pomocou vyhradenej triedy - Tekutý.
Pozorovateľné zdroje nepodporujú protitlak. Z tohto dôvodu by sme ho mali používať na zdroje, ktoré iba konzumujeme a nemôžeme na ne mať vplyv.
Pokiaľ máme do činenia s veľkým počtom prvkov, môžu sa vyskytnúť dva možné scenáre spojené s protitlakom v závislosti od typu Pozorovateľné.
V prípade použitia tzv “chladný Pozorovateľné“, udalosti sa vysielajú lenivo, takže sme v bezpečí pred preplnením pozorovateľa.
Pri použití a “horúci Pozorovateľné” toto však bude aj naďalej emitovať udalosti, aj keď spotrebiteľ nestíha.
3. Vytvorenie a Tekutý
Existujú rôzne spôsoby, ako vytvoriť Tekutý. Pohodlne pre nás tieto metódy vyzerajú podobne ako metódy v Pozorovateľné v prvej verzii RxJava.
3.1. Jednoduché Tekutý
Môžeme vytvoriť Tekutý pomocou iba () metóda podobne, ako by sme mohli s Pozorovateľné:
Flowable integerFlowable = Flowable.just (1, 2, 3, 4);
Aj keď sa používa iba () je celkom jednoduchý, vytvorenie a Tekutý zo statických údajov a používa sa na účely testovania.
3.2. Tekutý od Pozorovateľné
Keď máme Pozorovateľné môžeme to ľahko premeniť na Tekutý pomocou toFlowable () metóda:
Observable integerObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);
Všimnite si, že aby sme mohli vykonať konverziu, musíme obohatiť Pozorovateľné s BackpressureStrategy. Dostupné stratégie si popíšeme v nasledujúcej časti.
3.3. Tekutý od FlowableOnSubscribe
RxJava 2 predstavila funkčné rozhranie FlowableOnSubscribe, čo predstavuje a Tekutý ktorá začne emitovať udalosti potom, ako sa spotrebiteľ prihlási na odber.
Z tohto dôvodu budú všetci klienti dostávať rovnaký súbor udalostí FlowableOnSubscribe bezpečné pre tlak.
Keď máme FlowableOnSubscribe môžeme ho použiť na vytvorenie Tekutý:
FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);
Dokumentácia popisuje oveľa viac metód na vytvorenie Tekutý.
4. TekutýBackpressureStrategy
Niektoré metódy ako toFlowable () alebo vytvoriť () vziať a BackpressureStrategy ako argument.
The BackpressureStrategy je výčet, ktorý definuje protitlakové správanie, ktoré použijeme na naše Tekutý.
Môže ukladať udalosti do medzipamäte alebo ich zrušiť alebo vôbec neimplementovať žiadne správanie. V poslednom prípade budeme za jeho definovanie zodpovední pomocou operátorov spätného tlaku.
BackpressureStrategy je podobný BackpressureMode prítomný v predchádzajúcej verzii RxJava.
V RxJava 2 je k dispozícii päť rôznych stratégií.
4.1. Nárazník
Ak použijeme BackpressureStrategy.BUFFER, zdroj bude ukladať do vyrovnávacej pamäte všetky udalosti, kým ich účastník nedokáže spotrebovať:
public void thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertEquals (testList, receiveInts); }
Je to podobné ako s vyvolaním onBackpressureBuffer () metóda na Tekutý, ale neumožňuje výslovne definovať veľkosť vyrovnávacej pamäte alebo akciu onOverflow.
4.2. Pokles
Môžeme použiť BackpressureStrategy.DROP zahodiť udalosti, ktoré nie je možné spotrebovať, namiesto ich ukladania do vyrovnávacej pamäte.
Opäť je to podobné ako pri použití onBackpressureDrop() na Tekutý:
public void whenDropStrategyUsed_thenOnBackpressureDropped () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (receiveInts.size () <testList.size ()); assertThat (! receiveInts.contains (100000)); }
4.3. Najnovšie
Pomocou BackpressureStrategy.LATEST prinúti zdroj uchovávať iba najnovšie udalosti, a teda prepíše všetky predchádzajúce hodnoty, ak spotrebiteľ nestíha:
public void whenLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Zoznam receiveInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (receiveInts.size () <testList.size ()); assertThat (receiveInts.contains (100000)); }
BackpressureStrategy.LATEST a BackpressureStrategy.DROP vyzerajú veľmi podobne, keď sa pozrieme na kód.
Avšak BackpressureStrategy.LATEST prepíše prvky, ktoré náš predplatiteľ nedokáže spracovať, a ponechá si iba tie najnovšie, odtiaľ pochádza aj názov.
BackpressureStrategy.DROP, na druhej strane zahodí prvky, s ktorými sa nedá manipulovať. To znamená, že najnovšie prvky nebudú nevyhnutne emitované.
4.4. Chyba
Keď používame BackpressureStrategy.ERROR, jednoducho to hovoríme neočakávame, že dôjde k protitlaku. V dôsledku toho a MissingBackpressureException Ak spotrebiteľ nedokáže držať krok so zdrojom, mali by byť vyhodení:
public void whenErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); Odberateľ TestSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }
4.5. Chýba
Ak použijeme BackpressureStrategy.MISSING, zdroj bude tlačiť prvky bez zahodenia alebo medzipamäte.
Následný prúd bude musieť v tomto prípade čeliť pretečeniu:
public void whenMissingStrategyUsed_thenException () {Observable observable = Observable.range (1, 100000); Odberateľ TestSubscriber = pozorovateľný .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }
V našich testoch sme výnimoční MissingbackpressureException pre oba CHYBA a CHÝBAJÚ stratégie. Pretože obaja vyvolajú takúto výnimku, keď dôjde k preplneniu vnútornej vyrovnávacej pamäte zdroja.
Je však potrebné poznamenať, že obaja majú iný účel.
Prvý by sme mali použiť, keď vôbec neočakávame protitlak, a chceme, aby zdroj v prípade, že nastane, hodil výnimku.
Posledný z nich by sa dal použiť, ak nechceme špecifikovať predvolené správanie pri vytváraní súboru Tekutý. Neskôr to definujeme pomocou operátorov protitlaku.
5. Zhrnutie
V tomto tutoriáli sme predstavili novú triedu uvedenú v RxJava 2 zavolal Tekutý.
Ak sa chcete dozvedieť viac informácií o Tekutý sám o sebe a je to API, ktoré môžeme odkázať na dokumentáciu.
Ako vždy všetky vzorky kódu nájdete na GitHub.