Plánovače v RxJava

1. Prehľad

V tomto článku sa zameriame na rôzne typy Plánovače ktoré použijeme pri písaní multithreadingových programov založených na RxJava Pozorovateľné prihlásenie na odber a dodržiavaťOn metódy.

Plánovače dať príležitosť určiť, kde a pravdepodobne kedy sa majú vykonať úlohy spojené s prevádzkou systému Pozorovateľné reťaz.

Môžeme získať a Plánovač z továrenských metód opísaných v tejto triede Plánovače.

2. Predvolené správanie vlákna

Predvolene,Rx je jednovláknový z čoho vyplýva, že an Pozorovateľné a reťaz operátorov, ktorú na ňu môžeme použiť, upozorní svojich pozorovateľov rovnakým vláknom, na akom je prihlásiť sa na odber () metóda sa volá.

The dodržiavaťOn a prihlásiť sa na odber metódy berú ako argument a Plánovač, to, ako už názov napovedá, je nástroj, ktorý môžeme použiť na plánovanie jednotlivých akcií.

Vytvoríme našu implementáciu a Plánovač pomocou vytvoriťPracovník metóda, ktorá vráti a Plánovač. Pracovník. A pracovník prijíma akcie a vykonáva ich postupne na jednom vlákne.

Svojím spôsobom a pracovník je a Ssamotný cheduler, ale nebudeme to označovať ako a Plánovač aby nedošlo k zámene.

2.1. Plánovanie akcie

Môžeme naplánovať prácu na ľubovoľnom Plánovač vytvorením nového pracovník a plánovanie niektorých akcií:

Scheduler scheduler = Schedulers.immediate (); Pracovník Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> vysledok + = "akcia"); Assert.assertTrue (result.equals ("akcia"));

Akcia je potom zaradená do frontu vo vlákne, ku ktorému je pracovník priradený.

2.2. Zrušenie žaloby

Plánovač. Pracovník predlžuje Predplatné. Volá sa odhlásiť metóda na a pracovník bude mať za následok vyprázdnenie frontu a zrušenie všetkých čakajúcich úloh. Vidíme to na príklade:

Plánovač plánovača = Schedulers.newThread (); Pracovník Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> výsledok + = "Druhá_akcia"); Assert.assertTrue (result.equals ("Prvá_akcia"));

Druhá úloha sa nikdy nevykoná, pretože tá pred ňou zrušila celú operáciu. Akcie, ktoré sa práve vykonávali, budú prerušené.

3. Schedulers.newThread

Tento plánovač jednoducho spustí nové vlákno zakaždým, keď je o to požiadané prostredníctvom subscribeOn () alebo ObservOn ().

Ťažko to bude dobrá voľba, a to nielen z dôvodu latencie pri spustení vlákna, ale aj preto, že sa toto vlákno nepoužíva opakovane:

Observable.just ("Hello") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()) .subscribe (s - > result1 + = Thread.currentThread (). getName ()); Závit. Spánok (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2"));

Keď Pracovník je hotové, vlákno sa jednoducho ukončí. Toto Plánovač možno použiť iba vtedy, keď sú úlohy hrubozrnné: ich dokončenie trvá veľa času, ale je ich veľmi málo, takže je nepravdepodobné, že sa vlákna znovu použijú vôbec.

Plánovač plánovača = Schedulers.newThread (); Pracovník Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Závit. Spánok (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

Keď sme naplánovali pracovník na a NewThreadScheduler, videli sme, že pracovník bol viazaný na konkrétne vlákno.

4. Plánovače. Okamžité

Plánovače. Okamžité je špeciálny plánovač, ktorý blokuje asynchrónne vyvolanie úlohy v klientskom vlákne a vráti sa po dokončení akcie:

Scheduler scheduler = Schedulers.immediate (); Pracovník Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Závit. Spánok (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

V skutočnosti je prihlásenie na odber Pozorovateľné cez okamžitý plánovač má zvyčajne rovnaký účinok ako neprihlásenie na odber žiadneho konkrétneho produktu Scheduler vôbec:

Observable.just ("Hello") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> výsledok + = Thread.currentThread (). GetName ()); Závit. Spánok (500); Assert.assertTrue (result.equals ("hlavný"));

5. Plánovače.trampolína

The trampolínaPlánovač je veľmi podobný okamžitá pretože tiež plánuje úlohy v rovnakom vlákne a efektívne blokuje.

Nadchádzajúca úloha sa však vykoná, keď sa dokončia všetky predtým naplánované úlohy:

Observable.just (2, 4, 6, 8) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> výsledok + = "" + i); Observable.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> výsledok + = "" + i); Závit. Spánok (500); Assert.assertTrue (result.equals ("246813579"));

Okamžitá okamžite vyvolá danú úlohu, zatiaľ čo trampolína čaká na dokončenie aktuálnej úlohy.

The trampolína‘S pracovník vykoná každú úlohu vo vlákne, ktorá naplánovala prvú úlohu. Prvá výzva na číslo harmonogram blokuje, kým sa fronta nevyprázdni:

Scheduler scheduler = Schedulers.trampoline (); Pracovník Scheduler.Worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> result + = "_worker_"); result + = "_middleEnd";}); result + = "_mainEnd";}); Závit. Spánok (500); Assert.assertTrue (result .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Plánovače. Z

Plánovače sú vnútorne zložitejšie ako Exekútori od java.util.concurrent - takže bola potrebná samostatná abstrakcia.

Ale pretože sú si koncepčne dosť podobné, neprekvapivo existuje obal, ktorý sa môže otočiť Exekútor do Plánovač pomocou od továrenská metóda:

private ThreadFactory threadFactory (String pattern) {return new ThreadFactoryBuilder () .setNameFormat (pattern) .build (); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements () hodí InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d"))); Scheduler schedulerA = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Scheduler schedulerB = Schedulers.from (poolB); Observable observable = Observable.create (subscriber -> {subscriber.onNext ("Alfa"); subscriber.onNext ("Beta"); subscriber.onCompleted ();}) ;; pozorovateľné .subscribeOn (schedulerA) .subscribeOn (schedulerB) .subscribe (x -> výsledok + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> výsledok + = "_Completed "); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

Plánovač B. sa používa na krátke obdobie, ale sotva naplánuje novú akciu na plánovačA, ktorá vykonáva všetku prácu. Teda viacnásobné subscribeOn metódy sú nielen ignorované, ale predstavujú aj malú réžiu.

7. Schedulers.io

Toto Plánovač je podobný ako newThread až na to, že už spustené vlákna sa recyklujú a môžu prípadne zvládnuť budúce požiadavky.

Táto implementácia funguje obdobne ako ThreadPoolExecutor od java.util.concurrent s neobmedzenou zásobou vlákien. Zakaždým nový pracovník je požadované, je spustené nové vlákno (a neskôr je nejaký čas nečinné) alebo je nečinné znovu použité:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> výsledok + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

Musíme byť opatrní pri neobmedzených zdrojoch akéhokoľvek druhu - v prípade pomalých alebo nereagujúcich externých závislostí, ako sú webové služby, ioplánovač môže spustiť obrovské množstvo vlákien, čo vedie k tomu, že naša vlastná aplikácia prestane reagovať.

V praxi po Schedulers.io je takmer vždy lepšou voľbou.

8. Výpočet plánovača

Výpočet Scheduler predvolene obmedzuje počet paralelne prebiehajúcich vlákien na hodnotu availableProcessors (), ako sa uvádza v Runtime.getRuntime () úžitková trieda.

Mali by sme teda použiť a plánovač výpočtov keď sú úlohy úplne spojené s CPU; to znamená, že vyžadujú výpočtový výkon a nemajú blokovací kód.

Používa neobmedzený front pred každým vláknom, takže ak je úloha naplánovaná, ale všetky jadrá sú obsadené, bude zaradená do frontu. Poradie tesne pred každým vláknom sa však bude neustále zväčšovať:

Observable.just ("výpočet") .subscribeOn (Schedulers.computation ()) .subscribe (i -> výsledok + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

Ak z nejakého dôvodu potrebujeme iný počet vlákien ako predvolený, vždy môžeme použiť rx.scheduler.max-výpočtové vlákna systémový majetok.

Tým, že vezmeme menej vlákien, môžeme zaistiť, že vždy bude jedno alebo viac jadier CPU nečinných, a to aj pri vysokom zaťažení, výpočet fond vlákien server nenasýtil. Jednoducho nie je možné mať viac výpočtových vlákien ako jadier.

9. Plánovače.test

Toto Plánovač sa používa iba na testovacie účely a v produkčnom kóde sa už nikdy neuvidíme. Jeho hlavnou výhodou je schopnosť posunúť čas, ľubovoľne simulujúci plynutie času:

Zoznam písmen = Arrays.asList ("A", "B", "C"); Plánovač TestScheduler = Schedulers.test (); Predplatiteľ TestSubscriber = nový TestSubscriber (); Pozorovateľné začiarknutie = Pozorovateľný .interval (1, TimeUnit.SECONDS, plánovač); Pozorovateľné.z (písmená) .zipWith (zaškrtnutie, (reťazec, index) -> index + "-" + reťazec) .subscribeOn (plánovač) .subscribe (predplatiteľ); subscriber.assertNoValues ​​(); subscriber.assertNotCompleted (); scheduler.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); scheduler.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C"));

10. Predvolené plánovače

Niektoré Pozorovateľné operátori v RxJava majú alternatívne formy, ktoré nám umožňujú nastaviť ktoré Plánovač operátor použije na svoju prevádzku. Ostatné nepôsobia nijako zvlášť Plánovač alebo fungujú na konkrétnom zlyhaní Plánovač.

Napríklad meškanie operátor prijíma udalosti proti prúdu a po danom čase ich tlačí za prúd. Je zrejmé, že počas tohto obdobia nemôže držať pôvodné vlákno, takže musí používať iné vlákno Plánovač:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Scheduler schedulerA = Schedulers.from (poolA); Observable.just ('A', 'B') .delay (1, TimeUnit.SECONDS, schedulerA) .subscribe (i -> result + = Thread.currentThread (). GetName () + i + ""); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

Bez dodania zvyku plánovačA, všetci operátori nižšie meškanie by použil výpočtový plánovač.

Ďalší dôležití operátori, ktorí podporujú zvyk Plánovačenárazník, interval, rozsah, časovač, preskočiť, vziať, čas vypršala niekoľko ďalších. Ak neposkytneme a Plánovač týmto operátorom, výpočet je využitý plánovač, čo je vo väčšine prípadov bezpečné predvolené nastavenie.

11. Záver

V skutočne reaktívnych aplikáciách, pre ktoré sú všetky dlhotrvajúce operácie asynchrónne, veľmi málo vlákien, a teda Plánovače sú potrebné.

Ovládanie plánovačov je nevyhnutné na písanie škálovateľného a bezpečného kódu pomocou RxJava. Rozdiel medzi prihlásiť sa na odber a dodržiavaťOn je obzvlášť dôležitá pri vysokom zaťažení, kde sa každá úloha musí vykonať presne, keď očakávame.

V neposlednom rade si musíme byť istí Plánovače použitý v následnom smere môže držať krok s lo reklamou vygenerovanou Plánovače upstrea m. Viac informácií nájdete v tomto článku o protitlaku.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found