Úvod do Apache Flink s Java
1. Prehľad
Apache Flink je rámec pre spracovanie veľkých dát, ktorý umožňuje programátorom veľmi efektívne a škálovateľne spracovať obrovské množstvo dát.
V tomto článku si predstavíme niektoré z nich základné koncepty API a štandardné transformácie údajov dostupné v Apache Flink Java API. Plynulý štýl tohto API umožňuje ľahkú prácu s centrálnym konštruktom Flink - distribuovanou kolekciou.
Najskôr sa pozrieme na Flink Množina údajov Transformácie API a použiť ich na implementáciu programu počítania slov. Potom sa krátko pozrieme na Flink DataStream API, ktoré umožňuje spracovávať prúdy udalostí v reálnom čase.
2. Závislosť od Maven
Na začiatok budeme musieť pridať závislosti Maven flink-java a flink-test-utils knižnice:
org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test
3. Základné koncepty API
Pri práci s Flink musíme vedieť niekoľko vecí týkajúcich sa jeho API:
- Každý program Flink vykonáva transformácie na distribuovaných zbierkach údajov. K dispozícii sú rôzne funkcie na transformáciu údajov, vrátane filtrovania, mapovania, pripájania, zoskupovania a agregácie
- A drez operácia vo Flinku spustí vykonanie toku za účelom dosiahnutia požadovaného výsledku programu, ako je napríklad uloženie výsledku do súborového systému alebo jeho vytlačenie na štandardný výstup
- Flink transformácie sú lenivé, čo znamená, že sa nevykonajú, kým a drez operácia je vyvolaná
- Apache Flink API podporuje dva režimy prevádzky - dávkový a v reálnom čase. Ak pracujete s obmedzeným zdrojom údajov, ktorý je možné spracovať v dávkovom režime, použijete Množina údajov API. Ak chcete spracovať neobmedzené toky údajov v reálnom čase, budete musieť použiť DataStream API
4. Transformácie rozhrania DataSet API
Vstupným bodom do programu Flink je inštancia ExecutionEnvironment trieda - definuje kontext, v ktorom sa program vykonáva.
Vytvorme ExecutionEnvironment na spustenie nášho spracovania:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();
Upozorňujeme, že keď aplikáciu spustíte na lokálnom počítači, vykoná spracovanie na lokálnom JVM. Ak chcete spustiť spracovanie na klastri strojov, budete si musieť na tieto stroje nainštalovať Apache Flink a nakonfigurovať ExecutionEnvironment podľa toho.
4.1. Vytvorenie množiny údajov
Aby sme mohli začať vykonávať transformácie údajov, musíme dodať nášmu programu údaje.
Vytvorme inštanciu súboru Množina údajov triedy pomocou nášho ExecutionEnvironmentement:
Množiny DataSet = env.fromElements (1, 29, 40, 50);
Môžete vytvoriť Množina údajov z viacerých zdrojov, ako napríklad Apache Kafka, súbor CSV, súbor alebo prakticky akýkoľvek iný zdroj údajov.
4.2. Filtrujte a redukujte
Po vytvorení inštancie súboru Množina údajov triedy, môžete na ňu použiť transformácie.
Povedzme, že chcete filtrovať čísla, ktoré sú nad určitou hranicou, a potom ich všetky sčítať. Môžete použiť filter () a znížiť () transformácie na dosiahnutie tohto cieľa:
medzná hodnota int = 30; Zoznam zhromaždiť = sumy .filtre (a -> a> prahová hodnota) .reduce ((integer, t1) -> integer + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90);
Všimnite si, že zbierať () metóda je a drez operácia, ktorá spustí skutočné transformácie údajov.
4.3. Mapa
Povedzme, že máte Množina údajov z Osoba objekty:
private static class Osoba {private int age; súkromné meno reťazca; // štandardné konštruktory / getre / setre}
Ďalej vytvoríme a Množina údajov týchto objektov:
DataSet personDataSource = env.fromCollection (Arrays.asList (nová osoba (23, "Tom"), nová osoba (75, "Michael"))));
Predpokladajme, že chcete extrahovať iba súbor Vek pole z každého objektu zbierky. Môžete použiť mapa () transformácia získať iba konkrétne pole Osoba trieda:
Zoznam vekových skupín = personDataSource .map (p -> p.age) .collect (); assertThat (vek) .hasSize (2); tvrdiťTo (vek) .obsahuje (23, 75);
4.4. Pripojte sa
Ak máte dva súbory údajov, môžete sa k nim pripojiť id lúka. Na tento účel môžete použiť pripojiť sa () transformácia.
Vytvorme zbierky transakcií a adries používateľa:
Adresa Tuple3 = nová Tuple3 (1, "5th Avenue", "London"); Množina údajov adresy = env.fromElements (adresa); Tuple2 firstTransaction = nový Tuple2 (1, „Transaction_1“); Množina údajov transakcie = env.fromElements (firstTransaction, new Tuple2 (12, "Transaction_2"));
Prvé pole v oboch n-ticiach je Celé číslo typu, a toto je id pole, na ktorom chceme spojiť obidve množiny údajov.
Ak chcete vykonať skutočnú logiku spájania, musíme implementovať a KeySelector rozhranie pre adresu a transakciu:
súkromná statická trieda IdKeySelectorTransaction implementuje KeySelector {@Override public Integer getKey (hodnota Tuple2) {návratová hodnota.f0; }} súkromná statická trieda IdKeySelectorAddress implementuje KeySelector {@Override public Integer getKey (hodnota Tuple3) {návratová hodnota.f0; }}
Každý selektor vracia iba pole, v ktorom by sa malo vykonať spojenie.
Bohužiaľ tu nie je možné použiť výrazy lambda, pretože Flink potrebuje informácie o všeobecnom type.
Ďalej implementujme logiku zlučovania pomocou týchto selektorov:
Zoznam<>> join = transakcie.join (adresy). kde (nový IdKeySelectorTransaction ()) .equalTo (nový IdKeySelectorAddress ()) .collect (); assertThat (spojené) .hasSize (1); assertThat (spojené). obsahuje (nový Tuple2 (firstTransaction, adresa));
4.5. Triediť
Povedzme, že máte nasledujúcu zbierku súborov Tuple2:
Tuple2 secondPerson = nový Tuple2 (4, "Tom"); Tuple2 thirdPerson = nový Tuple2 (5, "Scott"); Tuple2 quarterPerson = nový Tuple2 (200, "Michael"); Tuple2 firstPerson = nový Tuple2 (1, "Jack"); Množina údajov transakcie = env.fromElements (quarterPerson, secondPerson, thirdPerson, firstPerson);
Ak chcete túto kolekciu zoradiť podľa prvého poľa n-tice, môžete použiť sortPartitions () transformácia:
Zoznam triedené = transakcie .sortPartition (new IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (zoradené). obsahuje Presne (firstPerson, secondPerson, tretíPerson, štvrtýPerson);
5. Počet slov
Problém počtu slov je problém, ktorý sa bežne používa na prezentáciu schopností rámcov spracovania veľkých dát. Základné riešenie spočíva v počítaní výskytov slov pri zadávaní textu. Použime Flink na implementáciu riešenia tohto problému.
Ako prvý krok v našom riešení vytvoríme a LineSplitter trieda, ktorá rozdeľuje náš vstup na tokeny (slová), pričom pre každý token zhromažďuje a Tuple2 párov kľúč - hodnota. V každej z týchto n-tic je kľúčom slovo nájdené v texte a hodnotou je celé číslo (1).
Táto trieda implementuje FlatMapFunction rozhranie, ktoré zaberá String ako vstup a produkuje a Tuple2:
verejná trieda LineSplitter implementuje FlatMapFunction {@Override public void flatMap (hodnota reťazca, Collector out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0). forEach (token -> out.collect (nový Tuple2 (token) , 1))); }}
Voláme zbierať () metóda na Zberateľ triedy na posunutie údajov vpred v potrubí spracovania.
Naším ďalším a posledným krokom je zoskupiť n-tice podľa ich prvých prvkov (slov) a potom vykonať a súčet agregovať na druhých prvkoch a vytvoriť počet výskytov slova:
verejný statický súbor údajov startWordCount (ExecutionEnvironment env, zoznam riadkov) vyvolá výnimku {DataSet text = env.fromCollection (riadky); vrátiť text.flatMap (nový LineSplitter ()) .groupBy (0) .agregát (Aggregations.SUM, 1); }
Používame tri typy Flinkových transformácií: flatMap (), groupBy ()a agregát ().
Poďme napísať test, ktorý tvrdí, že implementácia počtu slov funguje podľa očakávaní:
List lines = Arrays.asList ("Toto je prvá veta", "Toto je druhá veta s jedným slovom"); Množina údajov result = WordCount.startWordCount (env, riadky); Zoznam collect = result.collect (); assertThat (collect) .containsExactlyInAnyOrder (nový Tuple2 ("a", 3), nový Tuple2 ("veta", 2), nový Tuple2 ("slovo", 1), nový Tuple2 ("je", 2), nový Tuple2 ( „this“, 2), new Tuple2 ("second", 1), new Tuple2 ("first", 1), new Tuple2 ("with", 1), new Tuple2 ("one", 1));
6. Rozhranie DataStream API
6.1. Vytvorenie DataStream
Apache Flink podporuje spracovanie streamov udalostí aj prostredníctvom svojho rozhrania DataStream API. Ak chceme začať konzumovať udalosti, musíme najskôr použiť StreamExecutionEnvironment trieda:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();
Ďalej môžeme vytvoriť prúd udalostí pomocou prevedenieŽivotné prostredie z rôznych zdrojov. Môže to byť nejaký autobus so správami Apache Kafka, ale v tomto príklade jednoducho vytvoríme zdroj z niekoľkých prvkov reťazca:
DataStream dataStream = executionEnvironment.fromElements ("Toto je prvá veta", "Toto je druhá veta s jedným slovom");
Transformácie môžeme použiť na každý prvok prvku DataStream ako v normále Množina údajov trieda:
SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);
Na spustenie vykonania je potrebné vyvolať operáciu umývania, ako je napr print () ktorý iba vytlačí výsledok transformácií na štandardný výstup, nasledujúci s vykonať () metóda na StreamExecutionEnvironment trieda:
upperCase.print (); env.execute ();
Vyprodukuje nasledujúci výstup:
1> TOTO JE PRVÁ VETA 2> TOTO JE DRUHÁ VETA S JEDNÝM SLOVOM
6.2. Prezentácia udalostí
Pri spracovávaní toku udalostí v reálnom čase môže byť niekedy potrebné zoskupiť udalosti a použiť nejaký výpočet v okne týchto udalostí.
Predpokladajme, že máme prúd udalostí, kde každá udalosť predstavuje dvojicu pozostávajúcu z čísla udalosti a časovej pečiatky, keď bola udalosť odoslaná do nášho systému, a že môžeme tolerovať udalosti, ktoré sú mimo poradia, ale iba ak nie sú mešká viac ako dvadsať sekúnd.
V tomto príklade najskôr vytvoríme prúd simulujúci dve udalosti, ktoré sú od seba vzdialené niekoľko minút, a definujeme extraktor časovej pečiatky, ktorý určuje náš prah oneskorenia:
SingleOutputStreamOperator windowed = env.fromElements (nový Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), nový Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (nový BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});
Ďalej definujeme operáciu okna na zoskupenie našich udalostí do päťsekundových okien a na tieto udalosti použijeme transformáciu:
SingleOutputStreamOperator znížené = okenné .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); zmenšený.tlač ();
Dostane posledný prvok z každého päťsekundového okna, takže sa vytlačí:
1> (15,1491221519)
Upozorňujeme, že druhú udalosť nevidíme, pretože dorazila neskôr ako zadaný prah oneskorenia.
7. Záver
V tomto článku sme predstavili rámec Apache Flink a pozreli sa na niektoré transformácie dodávané s jeho API.
Implementovali sme program počítania slov pomocou plynulého a funkčného rozhrania DataSet API spoločnosti Flink. Potom sme sa pozreli na API DataStream a implementovali sme jednoduchú transformáciu v reálnom čase na prúd udalostí.
Implementáciu všetkých týchto príkladov a útržkov kódu nájdete na GitHub - jedná sa o projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.