Úvod do Apache Flink s Java

1. Prehľad

Apache Flink je rámec pre spracovanie veľkých dát, ktorý umožňuje programátorom veľmi efektívne a škálovateľne spracovať obrovské množstvo dát.

V tomto článku si predstavíme niektoré z nich základné koncepty API a štandardné transformácie údajov dostupné v Apache Flink Java API. Plynulý štýl tohto API umožňuje ľahkú prácu s centrálnym konštruktom Flink - distribuovanou kolekciou.

Najskôr sa pozrieme na Flink Množina údajov Transformácie API a použiť ich na implementáciu programu počítania slov. Potom sa krátko pozrieme na Flink DataStream API, ktoré umožňuje spracovávať prúdy udalostí v reálnom čase.

2. Závislosť od Maven

Na začiatok budeme musieť pridať závislosti Maven flink-java a flink-test-utils knižnice:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Základné koncepty API

Pri práci s Flink musíme vedieť niekoľko vecí týkajúcich sa jeho API:

  • Každý program Flink vykonáva transformácie na distribuovaných zbierkach údajov. K dispozícii sú rôzne funkcie na transformáciu údajov, vrátane filtrovania, mapovania, pripájania, zoskupovania a agregácie
  • A drez operácia vo Flinku spustí vykonanie toku za účelom dosiahnutia požadovaného výsledku programu, ako je napríklad uloženie výsledku do súborového systému alebo jeho vytlačenie na štandardný výstup
  • Flink transformácie sú lenivé, čo znamená, že sa nevykonajú, kým a drez operácia je vyvolaná
  • Apache Flink API podporuje dva režimy prevádzky - dávkový a v reálnom čase. Ak pracujete s obmedzeným zdrojom údajov, ktorý je možné spracovať v dávkovom režime, použijete Množina údajov API. Ak chcete spracovať neobmedzené toky údajov v reálnom čase, budete musieť použiť DataStream API

4. Transformácie rozhrania DataSet API

Vstupným bodom do programu Flink je inštancia ExecutionEnvironment trieda - definuje kontext, v ktorom sa program vykonáva.

Vytvorme ExecutionEnvironment na spustenie nášho spracovania:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Upozorňujeme, že keď aplikáciu spustíte na lokálnom počítači, vykoná spracovanie na lokálnom JVM. Ak chcete spustiť spracovanie na klastri strojov, budete si musieť na tieto stroje nainštalovať Apache Flink a nakonfigurovať ExecutionEnvironment podľa toho.

4.1. Vytvorenie množiny údajov

Aby sme mohli začať vykonávať transformácie údajov, musíme dodať nášmu programu údaje.

Vytvorme inštanciu súboru Množina údajov triedy pomocou nášho ExecutionEnvironmentement:

Množiny DataSet = env.fromElements (1, 29, 40, 50);

Môžete vytvoriť Množina údajov z viacerých zdrojov, ako napríklad Apache Kafka, súbor CSV, súbor alebo prakticky akýkoľvek iný zdroj údajov.

4.2. Filtrujte a redukujte

Po vytvorení inštancie súboru Množina údajov triedy, môžete na ňu použiť transformácie.

Povedzme, že chcete filtrovať čísla, ktoré sú nad určitou hranicou, a potom ich všetky sčítať. Môžete použiť filter () a znížiť () transformácie na dosiahnutie tohto cieľa:

medzná hodnota int = 30; Zoznam zhromaždiť = sumy .filtre (a -> a> prahová hodnota) .reduce ((integer, t1) -> integer + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90); 

Všimnite si, že zbierať () metóda je a drez operácia, ktorá spustí skutočné transformácie údajov.

4.3. Mapa

Povedzme, že máte Množina údajov z Osoba objekty:

private static class Osoba {private int age; súkromné ​​meno reťazca; // štandardné konštruktory / getre / setre}

Ďalej vytvoríme a Množina údajov týchto objektov:

DataSet personDataSource = env.fromCollection (Arrays.asList (nová osoba (23, "Tom"), nová osoba (75, "Michael"))));

Predpokladajme, že chcete extrahovať iba súbor Vek pole z každého objektu zbierky. Môžete použiť mapa () transformácia získať iba konkrétne pole Osoba trieda:

Zoznam vekových skupín = personDataSource .map (p -> p.age) .collect (); assertThat (vek) .hasSize (2); tvrdiťTo (vek) .obsahuje (23, 75);

4.4. Pripojte sa

Ak máte dva súbory údajov, môžete sa k nim pripojiť id lúka. Na tento účel môžete použiť pripojiť sa () transformácia.

Vytvorme zbierky transakcií a adries používateľa:

Adresa Tuple3 = nová Tuple3 (1, "5th Avenue", "London"); Množina údajov adresy = env.fromElements (adresa); Tuple2 firstTransaction = nový Tuple2 (1, „Transaction_1“); Množina údajov transakcie = env.fromElements (firstTransaction, new Tuple2 (12, "Transaction_2")); 

Prvé pole v oboch n-ticiach je Celé číslo typu, a toto je id pole, na ktorom chceme spojiť obidve množiny údajov.

Ak chcete vykonať skutočnú logiku spájania, musíme implementovať a KeySelector rozhranie pre adresu a transakciu:

súkromná statická trieda IdKeySelectorTransaction implementuje KeySelector {@Override public Integer getKey (hodnota Tuple2) {návratová hodnota.f0; }} súkromná statická trieda IdKeySelectorAddress implementuje KeySelector {@Override public Integer getKey (hodnota Tuple3) {návratová hodnota.f0; }}

Každý selektor vracia iba pole, v ktorom by sa malo vykonať spojenie.

Bohužiaľ tu nie je možné použiť výrazy lambda, pretože Flink potrebuje informácie o všeobecnom type.

Ďalej implementujme logiku zlučovania pomocou týchto selektorov:

Zoznam<>> join = transakcie.join (adresy). kde (nový IdKeySelectorTransaction ()) .equalTo (nový IdKeySelectorAddress ()) .collect (); assertThat (spojené) .hasSize (1); assertThat (spojené). obsahuje (nový Tuple2 (firstTransaction, adresa)); 

4.5. Triediť

Povedzme, že máte nasledujúcu zbierku súborov Tuple2:

Tuple2 secondPerson = nový Tuple2 (4, "Tom"); Tuple2 thirdPerson = nový Tuple2 (5, "Scott"); Tuple2 quarterPerson = nový Tuple2 (200, "Michael"); Tuple2 firstPerson = nový Tuple2 (1, "Jack"); Množina údajov transakcie = env.fromElements (quarterPerson, secondPerson, thirdPerson, firstPerson); 

Ak chcete túto kolekciu zoradiť podľa prvého poľa n-tice, môžete použiť sortPartitions () transformácia:

Zoznam triedené = transakcie .sortPartition (new IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (zoradené). obsahuje Presne (firstPerson, secondPerson, tretíPerson, štvrtýPerson);

5. Počet slov

Problém počtu slov je problém, ktorý sa bežne používa na prezentáciu schopností rámcov spracovania veľkých dát. Základné riešenie spočíva v počítaní výskytov slov pri zadávaní textu. Použime Flink na implementáciu riešenia tohto problému.

Ako prvý krok v našom riešení vytvoríme a LineSplitter trieda, ktorá rozdeľuje náš vstup na tokeny (slová), pričom pre každý token zhromažďuje a Tuple2 párov kľúč - hodnota. V každej z týchto n-tic je kľúčom slovo nájdené v texte a hodnotou je celé číslo (1).

Táto trieda implementuje FlatMapFunction rozhranie, ktoré zaberá String ako vstup a produkuje a Tuple2:

verejná trieda LineSplitter implementuje FlatMapFunction {@Override public void flatMap (hodnota reťazca, Collector out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0). forEach (token -> out.collect (nový Tuple2 (token) , 1))); }}

Voláme zbierať () metóda na Zberateľ triedy na posunutie údajov vpred v potrubí spracovania.

Naším ďalším a posledným krokom je zoskupiť n-tice podľa ich prvých prvkov (slov) a potom vykonať a súčet agregovať na druhých prvkoch a vytvoriť počet výskytov slova:

verejný statický súbor údajov startWordCount (ExecutionEnvironment env, zoznam riadkov) vyvolá výnimku {DataSet text = env.fromCollection (riadky); vrátiť text.flatMap (nový LineSplitter ()) .groupBy (0) .agregát (Aggregations.SUM, 1); }

Používame tri typy Flinkových transformácií: flatMap (), groupBy ()a agregát ().

Poďme napísať test, ktorý tvrdí, že implementácia počtu slov funguje podľa očakávaní:

List lines = Arrays.asList ("Toto je prvá veta", "Toto je druhá veta s jedným slovom"); Množina údajov result = WordCount.startWordCount (env, riadky); Zoznam collect = result.collect (); assertThat (collect) .containsExactlyInAnyOrder (nový Tuple2 ("a", 3), nový Tuple2 ("veta", 2), nový Tuple2 ("slovo", 1), nový Tuple2 ("je", 2), nový Tuple2 ( „this“, 2), new Tuple2 ("second", 1), new Tuple2 ("first", 1), new Tuple2 ("with", 1), new Tuple2 ("one", 1));

6. Rozhranie DataStream API

6.1. Vytvorenie DataStream

Apache Flink podporuje spracovanie streamov udalostí aj prostredníctvom svojho rozhrania DataStream API. Ak chceme začať konzumovať udalosti, musíme najskôr použiť StreamExecutionEnvironment trieda:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Ďalej môžeme vytvoriť prúd udalostí pomocou prevedenieŽivotné prostredie z rôznych zdrojov. Môže to byť nejaký autobus so správami Apache Kafka, ale v tomto príklade jednoducho vytvoríme zdroj z niekoľkých prvkov reťazca:

DataStream dataStream = executionEnvironment.fromElements ("Toto je prvá veta", "Toto je druhá veta s jedným slovom");

Transformácie môžeme použiť na každý prvok prvku DataStream ako v normále Množina údajov trieda:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

Na spustenie vykonania je potrebné vyvolať operáciu umývania, ako je napr print () ktorý iba vytlačí výsledok transformácií na štandardný výstup, nasledujúci s vykonať () metóda na StreamExecutionEnvironment trieda:

upperCase.print (); env.execute ();

Vyprodukuje nasledujúci výstup:

1> TOTO JE PRVÁ VETA 2> TOTO JE DRUHÁ VETA S JEDNÝM SLOVOM

6.2. Prezentácia udalostí

Pri spracovávaní toku udalostí v reálnom čase môže byť niekedy potrebné zoskupiť udalosti a použiť nejaký výpočet v okne týchto udalostí.

Predpokladajme, že máme prúd udalostí, kde každá udalosť predstavuje dvojicu pozostávajúcu z čísla udalosti a časovej pečiatky, keď bola udalosť odoslaná do nášho systému, a že môžeme tolerovať udalosti, ktoré sú mimo poradia, ale iba ak nie sú mešká viac ako dvadsať sekúnd.

V tomto príklade najskôr vytvoríme prúd simulujúci dve udalosti, ktoré sú od seba vzdialené niekoľko minút, a definujeme extraktor časovej pečiatky, ktorý určuje náš prah oneskorenia:

SingleOutputStreamOperator windowed = env.fromElements (nový Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), nový Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (nový BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Ďalej definujeme operáciu okna na zoskupenie našich udalostí do päťsekundových okien a na tieto udalosti použijeme transformáciu:

SingleOutputStreamOperator znížené = okenné .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); zmenšený.tlač ();

Dostane posledný prvok z každého päťsekundového okna, takže sa vytlačí:

1> (15,1491221519)

Upozorňujeme, že druhú udalosť nevidíme, pretože dorazila neskôr ako zadaný prah oneskorenia.

7. Záver

V tomto článku sme predstavili rámec Apache Flink a pozreli sa na niektoré transformácie dodávané s jeho API.

Implementovali sme program počítania slov pomocou plynulého a funkčného rozhrania DataSet API spoločnosti Flink. Potom sme sa pozreli na API DataStream a implementovali sme jednoduchú transformáciu v reálnom čase na prúd udalostí.

Implementáciu všetkých týchto príkladov a útržkov kódu nájdete na GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found