Sprievodca prúdmi Akka

1. Prehľad

V tomto článku sa pozrieme na akka-potoky knižnica, ktorá je postavená na vrchole hereckého rámca Akka, ktorý dodržiava manifest reaktívnych prúdov. Rozhranie Akka Streams API nám umožňuje ľahko zostaviť toky transformácie údajov z nezávislých krokov.

Všetko spracovanie sa navyše deje reaktívnym, neblokujúcim a asynchrónnym spôsobom.

2. Maven závislosti

Na začiatok je potrebné pridať akka-prúd a akka-stream-testkit knižnice do našej pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

Aby sme mohli pracovať s prúdmi Akka, musíme si uvedomiť základné koncepty API:

  • Zdroj - - vstupný bod k spracovaniu v akka-prúd knižnica - inštanciu tejto triedy môžeme vytvoriť z viacerých zdrojov; môžeme napríklad použiť single () metóda, ak chceme vytvoriť a Zdroj od jedného String, alebo môžeme vytvoriť Zdroj z an Iterable prvkov
  • Prietok - hlavný stavebný blok spracovania - každý Prietok inštancia má jednu vstupnú a jednu výstupnú hodnotu
  • Materializátor - šMôžeme použiť jeden, ak chceme náš Prietok mať nejaké vedľajšie účinky, ako je protokolovanie alebo ukladanie výsledkov; najčastejšie prejdeme okolo Nepoužité alias ako a Materializátor naznačiť, že náš Prietok by nemal mať žiadne vedľajšie účinky
  • drez prevádzka - keď staviame a Prietok, nevykoná sa, kým nezaregistrujeme a drez prevádzka na ňom - ​​je to operácia terminálu, ktorá spúšťa všetky výpočty v celku Prietok

4. Tvorenie Toky v prúdoch Akka

Začnime zostavením jednoduchého príkladu, kde si ukážeme, ako na to vytvárať a kombinovať viac Prietoks - spracovať prúd celých čísel a vypočítať z okna priemerné pohyblivé okno celých párov.

Rozoberieme bodkočiarku String celých čísel ako vstup na vytvorenie našej akka-stream Zdroj pre príklad.

4.1. Pomocou a Prietok analyzovať vstup

Najskôr vytvorme a DataImporter triedy, ktorá bude mať inštanciu súboru ActorSystem ktoré neskôr použijeme na vytvorenie našej Prietok:

verejná trieda DataImporter {private ActorSystem actorSystem; // štandardné konštruktory, getre ...}

Ďalej vytvoríme a parseLine metóda, ktorá vygeneruje a Zoznam z Celé číslo z nášho vymedzeného vstupu String. Majte na pamäti, že tu používame rozhranie Java Stream API iba na analýzu:

private List parseLine (String line) {String [] fields = line.split (";"); return Arrays.stream (fields) .map (Integer :: parseInt) .collect (Collectors.toList ()); }

Naše počiatočné Prietok bude platiť parseLine na náš vstup k vytvoreniu a Prietok s typom vstupu String a typ výstupu Celé číslo:

private Flow parseContent () {návrat Flow.of (String.class) .mapConcat (this :: parseLine); }

Keď hovoríme parseLine () metóda, kompilátor vie, že argumentom pre túto funkciu lambda bude a String - rovnaký ako typ vstupu pre náš Prietok.

Upozorňujeme, že používame mapConcat () metóda - ekvivalentná s Java 8 flatMap () metóda - pretože chceme sploštiť Zoznam z Celé číslo vrátil sa parseLine () do a Prietok z Celé číslo takže ďalšie kroky v našom spracovaní sa nemusia zaoberať Zoznam.

4.2. Pomocou a Prietok vykonávať výpočty

V tomto okamihu máme svoje Prietok analyzovaných celých čísel. Teraz musíme implementujte logiku, ktorá zoskupí všetky vstupné prvky do párov a vypočíta priemer týchto párov.

Teraz budeme Vytvor Prietok z Celé číslos a zoskupiť ich pomocou zoskupené () metóda.

Ďalej chceme vypočítať priemer.

Pretože nás nezaujíma poradie, v akom budú tieto priemery spracované, môžeme mať priemery vypočítané paralelne s použitím viacerých vlákien pomocou mapAsyncUnordered () metóda, odovzdanie počtu vlákien ako argument k tejto metóde.

Akcia, ktorá bude postúpená ako lambda do Prietok treba vrátiť a CompletableFuture pretože táto akcia sa bude počítať asynchrónne v samostatnom vlákne:

private Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, integers -> CompletableFuture.supplyAsync (() -> integers.stream () .mapToDouble (v -> v). priemer () .orElse (-1,0))); }

Vypočítavame priemery v ôsmich paralelných vláknach. Upozorňujeme, že na výpočet priemeru používame rozhranie Java 8 Stream API.

4.3. Skladanie viacerých Toky do single Prietok

The Prietok API je plynulá abstrakcia, ktorá nám umožňuje zložiť viac Prietok prípadoch na dosiahnutie nášho konečného cieľa spracovania. Môžeme mať granulárne toky, kde napríklad jeden analyzuje JSON, iný robí určitú transformáciu a iný zhromažďuje nejaké štatistiky.

Takáto granularita nám pomôže vytvoriť viac testovateľných kódov, pretože každý krok spracovania môžeme testovať nezávisle.

Vyššie sme vytvorili dva toky, ktoré môžu fungovať nezávisle na sebe. Teraz ich chceme zostaviť spoločne.

Najprv chceme analyzovať náš vstup String, a potom chceme vypočítať priemer z prúdu prvkov.

Toky môžeme skladať pomocou cez() metóda:

Tok vypočítať Priemer () {vrátiť Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

Vytvorili sme a Prietok majúci typ vstupu String a ďalšie dva toky po ňom. The parseContent ()Prietok berie a String vstup a vráti Celé číslo ako výstup. The výpočetný tok () to berie Celé číslo a počíta priemerný výnos Dvojitý ako typ výstupu.

5. Pridanie drez do Prietok

Ako sme už spomenuli, do tejto chvíle celok Prietok ešte nie je vykonaná, pretože je lenivá. Na začatie vykonávania Prietok musíme definovať a drez. The drez operácia môže napríklad uložiť dáta do databázy alebo poslať výsledky do nejakej externej webovej služby.

Predpokladajme, že máme AverageRepository triedy s nasledujúcim uložiť () metóda, ktorá zapisuje výsledky do našej databázy:

CompletionStage save (dvojnásobný priemer) {return CompletableFuture.supplyAsync (() -> {// write to database return average;}); }

Teraz chceme vytvoriť drez operácie, ktoré používajú túto metódu na uloženie výsledkov našej Prietok spracovanie. Aby sme vytvorili našu Drez, najskôr potrebujeme Vytvor Prietok ktorý berie výsledok nášho spracovania ako typ vstupu. Ďalej chceme všetky naše výsledky uložiť do databázy.

Opäť nám nezáleží na usporiadaní prvkov, tak to môžeme vykonať uložiť () operácie paralelne pomocou mapAsyncUnordered () metóda.

Ak chcete vytvoriť drez z Prietok musíme zavolať toMat () s Sink.ignore () ako prvý argument a Keep.right () ako druhý, pretože chceme vrátiť stav spracovania:

súkromný drez storeAages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. Definovanie zdroja pre Prietok

Posledná vec, ktorú musíme urobiť, je urobiť Vytvor Zdroj zo vstupu String. Môžeme aplikovať a vypočítať priemer ()Prietok k tomuto zdroju pomocou cez() metóda.

Potom pridajte drez k spracovaniu, musíme zavolať runWith () metóda a odovzdať storeAders () Drez ktoré sme práve vytvorili:

CompletionStage CalcAverageForContent (obsah reťazca) {návrat Source.single (obsah) .via (CalcAverage ()) .runWith (storeAages (), ActorMaterializer.create (herecSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("Import bol dokončený");} else {e.printStackTrace ();}}); }

Všimnite si, že keď je spracovanie hotové, pridávame whenComplete () spätné volanie, pri ktorom môžeme vykonať určité kroky v závislosti od výsledku spracovania.

7. Testovanie Prúdy Akka

Naše spracovanie môžeme otestovať pomocou akka-stream-testkit.

Najlepším spôsobom, ako otestovať skutočnú logiku spracovania, je otestovať všetky Prietok logika a použitie TestSink spustiť výpočet a uplatniť výsledky.

V našom teste vytvárame Prietok ktoré chceme otestovať, a potom vytvárame a Zdroj z obsahu vstupného testu:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// daný testovaný tok = nový DataImporter (herecSystem) .calculateAverage (); Reťazcový vstup = "1; 9; 11; 0"; // keď Zdrojový tok = Zdroj.single (vstup) .via (testované); // potom flow .runWith (TestSink.probe (herecSystem), ActorMaterializer.create (herecSystem)) .request (4) .expectNextUnordered (5d, 5.5); }

Kontrolujeme, či očakávame štyri vstupné argumenty, a dva priemerné výsledky, ktoré dostaneme, môžu prísť v akomkoľvek poradí, pretože naše spracovanie sa deje asynchrónnym a paralelným spôsobom.

8. Záver

V tomto článku sme sa pozreli na akka-prúd knižnica.

Definovali sme proces, ktorý kombinuje viac Toky na výpočet kĺzavého priemeru prvkov. Potom sme definovali a Zdroj to je vstupný bod spracovania toku a drez ktoré spustí samotné spracovanie.

Nakoniec sme napísali test na naše spracovanie pomocou akka-stream-testkit.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete v projekte GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found