Rozdiel medzi RxJava API a Java 9 Flow API

1. Úvod

Rozhranie Java Flow API bolo predstavené v prostredí Java 9 ako implementácia špecifikácie Reactive Stream.

V tomto tutoriáli najskôr preskúmame reaktívne prúdy. Potom sa dozvieme o jeho vzťahu k RxJava a Flow API.

2. Čo sú reaktívne toky?

Reaktívny manifest predstavil Reaktívne prúdy, aby špecifikoval štandard pre asynchrónne spracovanie prúdov s neblokujúcim spätným tlakom.

Rozsah špecifikácie reaktívneho toku je vymedziť minimálna sada rozhraní na dosiahnutie týchto cieľov:

  • org.reactivestreams.Publisher je poskytovateľ údajov, ktorý zverejňuje údaje predplatiteľom na základe ich dopytu

  • org.reactivestreams.Predplatiteľ je spotrebiteľom údajov - údaje môže prijímať po prihlásení k odberu vydavateľa

  • org.reactivestreams.Predplatné sa vytvorí, keď vydavateľ prijme predplatiteľa

  • org.reactivestreams.Procesor je predplatiteľom aj vydavateľom - predplatí si vydavateľa, spracuje údaje a potom predá spracované údaje predplatiteľovi

Flow API pochádza zo špecifikácie. RxJava to predchádza, ale od verzie 2.0 podporuje aj RxJava.

Pôjdeme hlboko do oboch, ale najskôr sa pozrime na praktický prípad použitia.

3. Prípad použitia

V tomto tutoriále použijeme ako príklad použitia video službu živého vysielania.

Video naživo, na rozdiel od videa na požiadanie, nezávisí od spotrebiteľa. Server preto publikuje stream vlastným tempom a je zodpovednosťou spotrebiteľa sa prispôsobiť.

V najjednoduchšej forme náš model pozostáva z vydavateľa videostreamu a videoprehrávača ako predplatiteľa.

Poďme realizovať VideoFrame ako naša údajová položka:

verejná trieda VideoFrame {súkromné ​​dlhé číslo; // ďalšie dátové polia // konštruktor, getri, nastavovatelia}

Potom si postupne poďme prejsť naše implementácie Flow API a RxJava.

4. Implementácia pomocou Flow API

Flow API v JDK 9 zodpovedajú špecifikácii reaktívnych tokov. Ak aplikácia Flow API vyžaduje na začiatku N položiek, potom vydavateľ postúpi predplatiteľovi najviac N položiek.

Rozhrania Flow API sú všetky v java.util.concurrent.Flow rozhranie. Sú sémanticky ekvivalentné s ich príslušnými kolegami z Reaktívnych prúdov.

Poďme realizovať VideoStreamServer ako vydavateľ VideoFrame.

verejná trieda VideoStreamServer rozširuje SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

Rozšírili sme našu VideoStreamServer od SubmissionPublisher namiesto priamej implementácie Tok :: Vydavateľ. SubmissionPublisher je implementácia JDK Tok :: Vydavateľ pre asynchrónnu komunikáciu s predplatiteľmi, takže umožňuje našim VideoStreamServer emitovať vlastným tempom.

Je to tiež užitočné pri spätnom tlaku a vyrovnávacej pamäti, pretože keď SubmissionPublisher :: prihlásiť sa na odber zavolal, vytvorí inštanciu Predplatné do vyrovnávacej pamätea potom pridá nové predplatné do svojho reťazca predplatných. BufferedSubscription môže ukladať vydané položky do vyrovnávacej pamäte SubmissionPublisher # maxBufferCapacity.

Teraz poďme definovať Video prehrávač, ktorý spotrebováva prúd VideoFrame. Preto sa musí implementovať Tok :: Predplatiteľ.

verejná trieda VideoPlayer implementuje Flow.Subscriber {Flow.Subscription predplatné = null; @Override public void onSubscribe (predplatné Flow.Subscription) {this.subscription = predplatné; predplatné.požiadavka (1); } @Override public void onNext (položka VideoFrame) {log.info ("play # {}", item.getNumber ()); predplatné.požiadavka (1); } @Override public void onError (Throwable throwable) {log.error ("Pri streamovaní videa sa vyskytla chyba: {}", throwable.getMessage ()); } @Override public void onComplete () {log.error ("Video skončilo"); }}

Video prehrávač odoberá VideoStreamServer, potom po úspešnom predplatnom Video prehrávač::onSubscribe metóda sa volá a požaduje jeden rámec. Video prehrávač:: onNext prijme rámec a žiadosti o nový. Počet požadovaných rámcov závisí od prípadu použitia a Predplatiteľ implementácie.

Na záver si dáme veci dokopy:

VideoStreamServer streamServer = nový VideoStreamServer (); streamServer.subscribe (nový VideoPlayer ()); // odosielanie videozáznamov ScheduledExecutorService executor = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = nový AtomicLong (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (new VideoFrame (frameNumber.getAndIncrement ()), (subscriber, videoFrame) -> {subscriber.onError (new RuntimeException ("Frame #" + videoFrame.getNumber () + " spadol kvôli protitlaku ")); návrat true;});}, 0, 1, TimeUnit.MILLISECONDS); spánok (1 000);

5. Implementácia pomocou RxJava

RxJava je implementácia ReactiveX v prostredí Java. Cieľom projektu ReactiveX (alebo Reactive Extensions) je poskytnúť koncept reaktívneho programovania. Je to kombinácia vzoru Observer, vzoru Iterator a funkčného programovania.

Posledná hlavná verzia pre RxJava je 3.x. RxJava podporuje Reactive Streams od verzie 2.x s jeho Tekutý base class, ale je to významnejšia množina ako Reactive Streams s niekoľkými základnými triedami ako Tekutý, Pozorovateľné, Slobodný, Dokončiteľné.

Tekutý ako zložka súladu s reaktívnym prúdom je tok 0 až N položiek so spracovaním protitlaku. Tekutý predlžuje Vydavateľ z reaktívnych prúdov. Preto mnoho operátorov RxJava prijíma Vydavateľ priamo a umožňujú priamu spoluprácu s inými implementáciami Reactive Streams.

Teraz urobme náš generátor videostreamu, ktorý je nekonečným lenivým streamom:

Stream videoStream = Stream.iterate (nový VideoFrame (0), videoFrame -> {// spánok po dobu 1ms; návrat nového VideoFrame (videoFrame.getNumber () + 1);}));

Potom definujeme a Tekutý inštancia na generovanie rámcov na samostatnom vlákne:

Tekuté .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Je dôležité poznamenať, že nám stačí nekonečný prúd, ale ak potrebujeme flexibilnejší spôsob generovania nášho prúdu, potom Flowable.create je dobrá voľba.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (new VideoFrame (frame.incrementAndGet ())); / / spánok po dobu 1 ms, aby sa simulovalo oneskorenie}}}, / * Tu nastavte stratégiu protitlaku * /)

V ďalšom kroku sa program VideoPlayer prihlási na odber tohto programu Flowable a sleduje položky v samostatnom vlákne.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .subscribe (item -> {log.info ("play #" + item.getNumber ()); // spánok po dobu 30 ms na simulovanie zobrazenia rámca}) ;

A nakoniec nakonfigurujeme stratégiu pre protitlak. Ak chceme zastaviť video v prípade straty rámca, musíme ho použiť BackpressureOverflowStrategy :: CHYBA keď je vyrovnávacia pamäť plná.

Flowable .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors.nes) > {log.info ("play #" + item.getNumber ()); // spánok po dobu 30 ms, aby sa simulovalo zobrazenie rámca});

6. Porovnanie RxJava a Flow API

Aj v týchto dvoch jednoduchých implementáciách môžeme vidieť, ako je rozhranie API RxJava bohaté, najmä pokiaľ ide o správu vyrovnávacej pamäte, spracovanie chýb a stratégiu spätného tlaku. Dáva nám viac možností a menej riadkov kódu pomocou svojho plynulého rozhrania API. Teraz zvážime komplikovanejšie prípady.

Predpokladajme, že náš prehrávač nedokáže zobraziť videozáznamy bez kodeku. Preto s Flow API musíme implementovať a procesor simulovať kodek a sedieť medzi serverom a hráčom. S RxJava to dokážeme Tekuté :: flatMap alebo Tekuté :: mapa.

Alebo si predstavme, že náš prehrávač bude vysielať aj zvuk v živom preklade, takže musíme kombinovať prúdy videa a zvuku od samostatných vydavateľov. S RxJava môžeme použiť Flowable :: combineLatest, ale s Flow API to nie je ľahká úloha.

Aj keď je možné napísať zvyk procesor ktorá sa prihlási na odber oboch streamov a odošle spojené dáta do nášho Video prehrávač. Implementácia je však bolesťou hlavy.

7. Prečo Flow API?

V tejto chvíli si môžeme položiť otázku, aká filozofia stojí za API Flow?

Ak hľadáme v JDK použitie Flow API, niečo nájdeme v java.net.http a jdk.internal.net.http.

Ďalej nájdeme adaptéry v projekte reaktora alebo balíku reaktívnych prúdov. Napríklad, org.reactivestreams.FlowAdapters má metódy na prevod rozhraní Flow API na reaktívny prúd a naopak. Preto pomáha interoperabilite medzi Flow API a knižnicami s podporou reaktívneho toku.

Všetky tieto skutočnosti nám pomáhajú pochopiť účel Flow API: Bolo vytvorené ako skupina reaktívnych špecifikovaných rozhraní v JDK bez prenosu na tretie strany. Okrem toho Java očakáva, že Flow API bude akceptované ako štandardné rozhranie pre reaktívnu špecifikáciu a bude použité v JDK alebo iných knižniciach založených na Jave, ktoré implementujú reaktívnu špecifikáciu pre middleware a utility.

8. Závery

V tomto výučbe máme úvod do špecifikácií reaktívneho toku, rozhrania Flow API a RxJava.

Ďalej sme videli praktický príklad implementácií Flow API a RxJava pre živý videostream.

Ale všetky aspekty Flow API a RxJava sa páčia Tok :: Procesor, Tekuté :: mapa a Tekuté :: flatMap alebo protitlakové stratégie tu nie sú zahrnuté.

Ako vždy nájdete kompletný kód tutoriálu na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found