Kombinovanie pozorovateľných súborov v RxJava

1. Úvod

V tomto rýchlom návode si ukážeme rôzne spôsoby kombinovania Pozorovateľné v RxJava.

Ak ste v RxJave nováčikom, určite si najskôr vyskúšajte tento úvodný návod.

Poďme hneď na to.

2. Pozorovateľné

Pozorovateľné sekvencie alebo jednoducho Pozorovateľné, sú reprezentácie asynchrónnych dátových tokov.

Vychádzajú zo vzoru pozorovateľa, v ktorom sa objekt nazýva Pozorovateľ, Prihlasuje sa na odber položiek emitovaných spoločnosťou Pozorovateľné.

Predplatné neblokuje ako Pozorovateľ stojí za reakciu na čokoľvek Pozorovateľné bude emitovať v budúcnosti. To zase uľahčuje súbežnosť.

Tu je jednoduchá ukážka v RxJava:

Pozorovateľné .from (nový reťazec [] {"John", "Doe"}) .subscribe (name -> System.out.println ("Hello" + meno))

3. Kombinácia pozorovateľných

Pri programovaní pomocou reaktívneho rámca je bežné kombinovať rôzne Pozorovateľné.

Napríklad vo webovej aplikácii možno budeme musieť získať dve sady asynchrónnych dátových tokov, ktoré sú navzájom nezávislé.

Namiesto čakania na dokončenie predchádzajúceho streamu pred vyžiadaním ďalšieho streamu môžeme zavolať obidve naraz a prihlásiť sa na odber kombinovaných streamov.

V tejto časti si rozoberieme niektoré z rôznych spôsobov, ako môžeme kombinovať viac Pozorovateľné v RxJava a rôzne prípady použitia, na ktoré sa každá metóda vzťahuje.

3.1. Zlúčiť

Môžeme použiť zlúčiť operátor kombinovať výstup z viacerých Pozorovateľné aby sa správali ako jeden:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = nový TestSubscriber (); Observable.merge (Observable.from (new String [] {"Hello", "World"}), Observable.from (new String [] {"I love", "RxJava"})) .subscribe (testSubscriber); testSubscriber.assertValues ​​("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

The mergeDelayError metóda je rovnaká ako zlúčiť v tom, že kombinuje viac Pozorovateľné do jedného, ​​ale ak sa počas zlúčenia vyskytnú chyby, umožní to bezchybným položkám pokračovať pred rozšírením chýb:

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = nový TestSubscriber (); Observable.mergeDelayError (Observable.from (new String [] {"hello", "world"}), Observable.error (new RuntimeException ("Some exception")), Observable.from (new String [] {"rxjava"} )) .prihlásiť (testSubscriber); testSubscriber.assertValues ​​("ahoj", "svet", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

Vyššie uvedený príklad vydáva všetky bezchybné hodnoty:

ahoj svet rxjava

Upozorňujeme, že ak použijeme zlúčiť namiesto mergeDelayError, Stringrxjava ” nebudú emitované, pretože zlúčiť okamžite zastaví tok údajov z Pozorovateľné keď sa vyskytne chyba.

3.3. PSČ

The PSČ metóda rozšírenia združuje dve postupnosti hodnôt ako páry:

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults () {List zippedStrings = new ArrayList (); Observable.zip (Observable.from (new String [] {"Simple", "Moderate", "Complex"}), Observable.from (new String [] {"Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + "" + str2) .subscribe (zippedStrings :: add); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). isEqualTo (3); assertThat (zippedStrings) .contains ("Jednoduché riešenia", "Mierny úspech", "Komplexná hierarchia"); }

3.4. PSČ s intervalom

V tomto príklade zazipujeme stream pomocou interval čo v skutočnosti oneskorí emisiu prvkov prvého prúdu:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = nový TestSubscriber (); Pozorovateľné údaje = Observable.just ("jeden", "dva", "tri", "štyri", "päť"); Pozorovateľný interval = Pozorovateľný.interval (1L, TimeUnit.SECONDS); Pozorovateľný .zip (dáta, interval, (strData, tick) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Subscribe (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = jeden", "[1] = dva", "[2] = tri", "[3] = štyri", "[4] = päť"); }

4. Zhrnutie

V tomto článku sme videli niekoľko metód kombinovania Pozorovateľné s RxJava. Môžete sa dozvedieť o ďalších metódach, ako je kombinovaťNajnovšie, pripojiť sa, groupJoin, switchOnNext, v oficiálnej dokumentácii RxJava.

Zdrojový kód tohto článku je ako vždy k dispozícii v našom repozitári GitHub.


$config[zx-auto] not found$config[zx-overlay] not found