Úvod do aplikácie Hazelcast Jet

1. Úvod

V tomto výučbe sa dozvieme niečo o Hazelcast Jet. Je to distribuovaný procesor na spracovanie údajov poskytovaný spoločnosťou Hazelcast, Inc. a je postavený na vrchole Hazelcast IMDG.

Ak sa chcete dozvedieť viac o Hazelcast IMDG, tu je článok pre začiatok.

2. Čo je Hazelcast Jet?

Hazelcast Jet je distribuovaný nástroj na spracovanie údajov, ktorý s údajmi zaobchádza ako s prúdmi. Môže spracovávať údaje uložené v databáze alebo súboroch, ako aj údaje streamované serverom Kafka.

Okrem toho môže vykonávať agregačné funkcie nad nekonečnými dátovými tokmi rozdelením tokov na podmnožiny a aplikáciou agregácie na každú podmnožinu. Tento koncept je v terminológii Jet známy ako windowing.

Môžeme nasadiť Jet do klastra strojov a potom mu odovzdať naše úlohy spracovania údajov. Jet umožní všetkým členom klastra automaticky spracovávať údaje. Každý člen klastra spotrebúva časť údajov, čo uľahčuje škálovanie na ľubovoľnú úroveň priepustnosti.

Tu sú typické prípady použitia pre Hazelcast Jet:

  • Spracovanie toku v reálnom čase
  • Rýchle dávkové spracovanie
  • Spracovanie streamov Java 8 distribuovaným spôsobom
  • Spracovanie údajov v mikroslužbách

3. Inštalácia

Ak chcete nastaviť Hazelcast Jet v našom prostredí, stačí do nášho pridať jednu závislosť Maven pom.xml.

Robíme to takto:

 com.hazelcast.jet hazelcast-jet 4.2 

Zahrnutím tejto závislosti sa stiahne 10 MB súbor jar, ktorý nám poskytne všetku infraštruktúru, ktorú potrebujeme na vybudovanie distribuovaného potrubia na spracovanie údajov.

Najnovšiu verziu aplikácie Hazelcast Jet nájdete tu.

4. Ukážka aplikácie

Ak sa chceme o Hazelcast Jet dozvedieť viac, vytvoríme ukážkovú aplikáciu, ktorá prevezme zadanie viet a slova, ktoré sa v týchto vetách nájdu, a vráti počet zadaných slov v týchto vetách.

4.1. Potrubie

Pipeline tvorí základný konštrukt pre aplikáciu Jet. Spracovanie v potrubí sa riadi týmito krokmi:

  • načítať údaje zo zdroja
  • transformovať údaje
  • zapisovat data do umyvadla

Pre našu aplikáciu bude kanál čítať z distribuovaného Zoznam, použite transformáciu zoskupenia a agregácie a nakoniec zapíšte do distribuovaného Mapa.

Náš postup napíšeme takto:

private Pipeline createPipeLine () {Pipeline p = Pipeline.create (); p.readFrom (Sources.list (LIST_NAME)) .flatMap (word -> traverseArray (word.toLowerCase (). split ("\ W +"))) .filter (word ->! word.isEmpty ()) .groupingKey (wholeItem ()) .aggregate (countting ()) .writeTo (Sinks.map (MAP_NAME)); návrat p; }

Keď sme čítali zo zdroja, prechádzame údajmi a pomocou regulárneho výrazu ich rozdelíme okolo priestoru. Po tom odfiltrujeme prázdne miesta.

Nakoniec slová zoskupíme, spojíme a výsledky zapíšeme do a Mapa.

4.2. Práca

Teraz, keď je náš kanál definovaný, vytvoríme úlohu na vykonanie potrubia.

Takto píšeme a countWord funkcia, ktorá akceptuje parametre a vráti počet:

public Long countWord (Zoznam viet, Reťazcové slovo) {long count = 0; JetInstance jet = Jet.newJetInstance (); skúsiť {List textList = jet.getList (LIST_NAME); textList.addAll (vety); Potrubie p = createPipeLine (); jet.newJob (p) .join (); Počty máp = jet.getMap (MAP_NAME); count = countts.get (slovo); } nakoniec {Jet.shutdownAll (); } počet vrátení; }

Najskôr vytvoríme inštanciu Jet, aby sme vytvorili našu úlohu a použili kanál. Ďalej skopírujeme vstup Zoznam do distribuovaného zoznamu, aby bol k dispozícii vo všetkých inštanciách.

Potom zadáme úlohu pomocou ropovodu, ktorý sme postavili vyššie. Metóda Nová práca() vráti spustiteľnú úlohu, ktorú spustí Jet asynchrónne. The pripojiť sa metóda čaká na dokončenie úlohy a hodí výnimkou ak je úloha dokončená s chybou.

Po dokončení úlohy sa výsledky načítajú distribuovane Mapa, ako sme definovali v našom potrubí. Takže dostaneme Mapa z inštancie Jet a získajte proti nej počty slova.

Nakoniec sme ukončili inštanciu Jet. Je dôležité ho vypnúť po skončení našej exekúcie, napr Inštancia Jet spúšťa svoje vlastné vlákna. V opačnom prípade bude náš proces Java stále živý aj po ukončení našej metódy.

Tu je test jednotky, ktorý testuje kód, ktorý sme napísali pre Jet:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord () {Zoznam viet = nový ArrayList (); vety.add ("Prvá sekunda bola v poriadku, ale druhá sekunda bola tvrdá."); WordCounter wordCounter = nový WordCounter (); long countSecond = wordCounter.countWord (vety, "druhé"); assertEquals (3, countSecond); }

5. Záver

V tomto článku sme sa dozvedeli niečo o Hazelcast Jet. Ak sa chcete dozvedieť viac informácií o ňom a jeho funkciách, prečítajte si príručku.

Ako obvykle, kód príkladov použitých v tomto článku nájdete na Githube.


$config[zx-auto] not found$config[zx-overlay] not found