Zaoberanie sa protitlakom pomocou RxJava

1. Prehľad

V tomto článku sa pozrieme na to, ako nám knižnica RxJava pomáha zvládať protitlak.

Jednoducho povedané - RxJava využíva zavedením koncept reaktívnych prúdov Pozorovateľní, ku ktorému jednému alebo mnohým Pozorovatelia sa môže prihlásiť na odber. Zaoberať sa pravdepodobne nekonečnými prúdmi je veľmi náročné, pretože musíme čeliť problému protitlaku.

Nie je ťažké sa dostať do situácie, v ktorej Pozorovateľné emituje položky rýchlejšie, ako ich môže predplatiteľ spotrebovať. Pozrime sa na rôzne riešenia problému rastúceho buffera nespotrebovaných položiek.

2. Horúce Pozorovateľné Versus Cold Pozorovateľné

Najskôr si vytvorme jednoduchú funkciu spotrebiteľa, ktorá sa použije ako spotrebiteľ prvkov z Pozorovateľné ktoré definujeme neskôr:

public class ComputeFunction {public static void compute (Integer v) {try {System.out.println ("compute integer v:" + v); Závit. Spánok (1 000); } catch (InterruptedException e) {e.printStackTrace (); }}}

Náš vypočítať () funkcia jednoducho vytlačí argument. Je dôležité si tu všimnúť vyvolanie a Závit. Spánok (1 000) metóda - robíme to preto, aby sme napodobnili nejakú dlho bežiacu úlohu, ktorá spôsobí Pozorovateľné aby sa položky naplnili rýchlejšie Pozorovateľ ich môže konzumovať.

Máme dva typy Pozorovateľné - horúce a Chladný - ktoré sú úplne odlišné, pokiaľ ide o manipuláciu s protitlakom.

2.1. Chladný Pozorovateľné

Nádcha Pozorovateľné emituje konkrétnu postupnosť položiek, ale môže začať emitovať túto postupnosť, keď je Pozorovateľ považuje za pohodlné a za každú cenu Pozorovateľ želania bez narušenia celistvosti sekvencie. Chladný Pozorovateľné poskytuje položky lenivým spôsobom.

The Pozorovateľ berie prvky, až keď je pripravená na spracovanie tejto položky, a položky sa nemusia v pamäti ukladať do vyrovnávacej pamäte Pozorovateľné pretože sú požadované v móde.

Napríklad ak vytvoríte Pozorovateľné na základe statického rozsahu prvkov od jedného do jedného milióna, to Pozorovateľné by vydával rovnakú postupnosť položiek bez ohľadu na to, ako často sú tieto položky pozorované:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute);

Keď spustíme náš program, položky budú počítané pomocou Pozorovateľ lenivo a bude o ne požiadané v móde. The Schedulers.computation () metóda znamená, že chceme spustiť našu Pozorovateľ Linux v rámci oblasti výpočtových vlákien v systéme Windows RxJava.

Výstup programu bude pozostávať z výsledku a vypočítať () metóda vyvolaná po jednej položke z Pozorovateľné:

vypočítať celé číslo v: 1 vypočítať celé číslo v: 2 vypočítať celé číslo v: 3 vypočítať celé číslo v: 4 ...

Chladný Pozorovateľné nemusia mať žiadnu formu protitlaku, pretože pracujú ťahom. Príklady predmetov emitovaných prechladnutím Pozorovateľné môžu obsahovať výsledky databázového dotazu, načítania súborov alebo webovej požiadavky.

2.2. Horúce Pozorovateľné

Horúca Pozorovateľné začne generovať položky a ihneď po ich vytvorení ich vydá. Je to v rozpore s prechladnutím Pozorovateľné ťahový model spracovania. Horúce Pozorovateľné vydáva položky svojim vlastným tempom a je na jeho pozorovateľoch, aby držali krok.

Keď Pozorovateľ nie je schopný spotrebovať položky tak rýchlo, ako ich vyrába Pozorovateľné treba ich ukladať do vyrovnávacej pamäte alebo iným spôsobom s nimi manipulovať, pretože zaplnia pamäť a nakoniec spôsobia OutOfMemoryException.

Zvážme príklad horúceho Pozorovateľné, ktorá vyrába 1 milión položiek pre konečného spotrebiteľa, ktorý tieto položky spracováva. Keď vypočítať () metóda v Pozorovateľ spracovanie každej položky trvá nejaký čas, Pozorovateľné začína zapĺňať pamäť položkami, čo spôsobuje zlyhanie programu:

PublishSubject source = PublishSubject.create (); source.observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (zdroj :: onNext); 

Spustenie tohto programu zlyhá s MissingBackpressureException pretože sme nedefinovali spôsob riešenia nadprodukcie Pozorovateľné.

Príklady predmetov emitovaných horúcou látkou Pozorovateľné môžu zahŕňať udalosti myši a klávesnice, udalosti systému alebo ceny akcií.

3. Vyrovnávanie nadprodukcie Pozorovateľné

Prvý spôsob riešenia nadmernej výroby Pozorovateľné je definovať nejaký druh medzipamäte pre prvky, ktoré nemôže spracovať Pozorovateľ.

Môžeme to urobiť zavolaním na a vyrovnávacia pamäť () metóda:

PublishSubject source = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Definovanie medzipamäte s veľkosťou 1024 dá hodnotu Pozorovateľ nejaký čas dobehnúť zdroj nadmernej produkcie. Vyrovnávacia pamäť bude ukladať položky, ktoré ešte neboli spracované.

Môžeme zväčšiť veľkosť medzipamäte, aby sme mali dostatok priestoru pre vyrobené hodnoty.

Všimnite si však, že vo všeobecnosti toto môže byť iba dočasná oprava pretože k preplneniu môže stále dôjsť, ak zdroj nadprodukuje predpokladanú veľkosť vyrovnávacej pamäte.

4. Dávkové odosielanie položiek

Nadprodukované položky môžeme dávkovať v oknách N prvkov.

Kedy Pozorovateľné vyrába prvky rýchlejšie ako Pozorovateľ dokážeme ich spracovať, môžeme to zmierniť zoskupením vyrobených prvkov a odoslaním dávky prvkov do Pozorovateľ ktorý dokáže spracovať zbierku prvkov namiesto prvkov jeden po druhom:

PublishSubject source = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Použitím okno () metóda s argumentom 500, povie Pozorovateľné na zoskupenie prvkov do dávok veľkosti 500. Táto technika môže znížiť problém nadprodukcie Pozorovateľné kedy Pozorovateľ je schopný rýchlejšie spracovať dávku prvkov v porovnaní so spracovaním prvkov jeden po druhom.

5. Preskakovacie prvky

Ak niektoré z hodnôt vyprodukovaných Pozorovateľné môžeme bezpečne ignorovať, môžeme použiť vzorkovanie v konkrétnom čase a škrtiace operátory.

Metódy ukážka () a throttleFirst () berú trvanie ako parameter:

  • Spoločnosť sdostatok () metóda periodicky skúma postupnosť prvkov a vydáva poslednú položku, ktorá bola vyprodukovaná v rámci doby určenej ako parameter
  • The throttleFirst () metóda emituje prvú položku, ktorá bola vyprodukovaná po dobe určenej ako parameter

Trvanie je čas, po ktorom sa jeden konkrétny prvok vyberie zo sekvencie vyrobených prvkov. Stratégiu na zvládnutie protitlaku môžeme určiť preskočením prvkov:

PublishSubject source = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace);

Upresnili sme, že stratégia preskakovania prvkov bude a ukážka () metóda. Chceme vzorku sekvencie s trvaním 100 milisekúnd. Tento prvok bude emitovaný do Pozorovateľ.

Pamätajte však, že títo operátori znižujú iba mieru prijatia hodnoty následným podnikom Pozorovateľ a teda môžu stále viesť k MissingBackpressureException.

6. Zaobchádzanie s náplňou Pozorovateľné Nárazník

V prípade, že naše stratégie vzorkovania alebo dávkovania prvkov nepomôžu s vyplnením medzipamäte, musíme implementovať stratégiu riešenia prípadov, keď sa vyrovnávacia pamäť plní.

Musíme použiť onBackpressureBuffer () metóda prevencie BufferOverflowException.

The onBackpressureBuffer () metóda má tri argumenty: kapacita Pozorovateľné medzipamäť, metóda vyvolaná pri plnení medzipamäte a stratégia spracovania prvkov, ktoré je potrebné z medzipamäte vyradiť. Stratégie pre pretečenie sú v a Pretlak spätného tlaku trieda.

Pri vyplnení vyrovnávacej pamäte je možné vykonať 4 typy akcií:

  • ON_OVERFLOW_ERROR - toto je predvolené chovanie signalizujúce a BufferOverflowException keď je vyrovnávacia pamäť plná
  • ON_OVERFLOW_DEFAULT - v súčasnosti je to rovnaké ako ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - ak dôjde k pretečeniu, aktuálna hodnota bude jednoducho ignorovaná a iba staré hodnoty budú dodané po prúde Pozorovateľ žiadosti
  • ON_OVERFLOW_DROP_OLDEST - vypustí najstarší prvok z medzipamäte a pridá k nemu aktuálnu hodnotu

Pozrime sa, ako určiť túto stratégiu:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()) .subscribe (e -> {}, Throwable :: printStackTrace); 

Tu je našou stratégiou na riešenie pretečenia medzipamäte vypustenie najstaršieho prvku vo medzipamäti a pridanie najnovšej položky vyprodukovanej znakom Pozorovateľné.

Upozorňujeme, že posledné dve stratégie spôsobujú pri vypúšťaní prvkov diskontinuitu v prúde. Navyše nebudú signalizovať BufferOverflowException.

7. Vypustenie všetkých nadprodukovaných prvkov

Kedykoľvek po prúde Pozorovateľ nie je pripravený na príjem prvku, môžeme použiť onBackpressureDrop () metóda na vypustenie tohto prvku zo sekvencie.

Túto metódu si môžeme predstaviť ako onBackpressureBuffer () metóda s kapacitou vyrovnávacej pamäte nastavená na nulu so stratégiou ON_OVERFLOW_DROP_LATEST.

Tento operátor je užitočný, keď môžeme bezpečne ignorovať hodnoty zo zdroja Pozorovateľné (napríklad pohyby myši alebo aktuálne signály o polohe GPS), pretože neskôr budú k dispozícii aktuálnejšie hodnoty:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute) .subscribe (v -> {}, Throwable :: printStackTrace);

Metóda onBackpressureDrop () eliminuje problém nadprodukcie Pozorovateľné ale je potrebné ho používať opatrne.

8. Záver

V tomto článku sme sa zaoberali problémom nadprodukcie Pozorovateľné a spôsoby riešenia protitlaku. Pozerali sme sa na stratégie ukladania do vyrovnávacej pamäte, dávkovania a preskakovania prvkov, keď Pozorovateľ nie je schopný spotrebovať prvky tak rýchlo, ako ich produkuje Pozorovateľné.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found