Budovanie dátového toku pomocou Kafky, Spark Streamingu a Cassandry

1. Prehľad

Apache Kafka je škálovateľná, vysoko výkonná platforma s nízkou latenciou umožňuje čítanie a zápis tokov údajov, ako je systém správ. Môžeme začať s Kafkou v Jave pomerne ľahko.

Spark Streaming je súčasťou platformy Apache Spark, ktorá umožňuje škálovateľné, vysoko výkonné a na chyby odolné spracovanie dátových tokov. Aj keď je program Spark napísaný v Scale, ponúka Java API, s ktorými môžete pracovať.

Apache Cassandra je a distribuované a široko-stĺpcové úložisko dát NoSQL. Viac podrobností o Cassandre je k dispozícii v našom predchádzajúcom článku.

V tomto tutoriáli ich skombinujeme a vytvoríme vysoko škálovateľný a odolný voči chybám v dátovom toku pre tok údajov v reálnom čase.

2. Inštalácie

Na začiatok budeme na spustenie aplikácie potrebovať lokálne nainštalované Kafky, Sparka a Cassandru na našom stroji. Postupne uvidíme, ako vyvinúť dátový kanál pomocou týchto platforiem.

Ponecháme však všetky predvolené konfigurácie vrátane portov pre všetky inštalácie, čo nám pomôže pri bezproblémovom fungovaní tutoriálu.

2.1. Kafka

Inštalácia Kafky na náš lokálny počítač je pomerne jednoduchá a dá sa nájsť ako súčasť oficiálnej dokumentácie. Budeme používať vydanie Kafky vo verzii 2.1.0.

Navyše, Kafka vyžaduje na spustenie Apache Zookeeper ale na účely tohto tutoriálu využijeme inštanciu Zookeeper s jedným uzlom zabalenú s Kafkou.

Keď sa nám podarí začať Zookeeper a Kafku lokálne sledovať oficiálneho sprievodcu, môžeme pokračovať v vytváraní našej témy s názvom „správy“:

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topic.bat --create \ --zookeeper localhost: 2181 \ --replikačný faktor 1 - oddiely 1 \ --topické správy

Uvedený skript je určený pre platformu Windows, ale sú k dispozícii podobné skripty aj pre platformy podobné systému Unix.

2.2. Iskra

Spark používa klientske knižnice Hadoop pre HDFS a YARN. V dôsledku toho zostavenie kompatibilných verzií všetkých týchto verzií môže byť veľmi zložité. Oficiálne stiahnutie Spark však prichádza vopred zabalené s populárnymi verziami Hadoop. V tomto výučbe budeme používať balík verzie 2.3.0 „vopred zostavený pre Apache Hadoop 2.7 a novšie“.

Po rozbalení správneho balíka programu Spark je možné na odoslanie aplikácií použiť dostupné skripty. Uvidíme to neskôr, keď vyvinieme našu aplikáciu v Spring Boot.

2.3. Cassandra

DataStax sprístupňuje komunitné vydanie Cassandry pre rôzne platformy vrátane Windows. Môžeme to stiahnuť a nainštalovať na náš lokálny počítač veľmi ľahko podľa oficiálnej dokumentácie. Budeme používať verziu 3.9.0.

Keď sa nám podarí nainštalovať a spustiť Cassandru na našom lokálnom počítači, môžeme pokračovať v vytváraní nášho priestoru kľúčov a tabuľky. To je možné vykonať pomocou CQL Shell, ktorý sa dodáva s našou inštaláciou:

VYTVORIŤ slovník KEYSPACE S REPLIKÁCIOU = {'class': 'SimpleStrategy', 'replication_factor': 1}; POUŽÍVAŤ slovnú zásobu; CREATE TABLE slová (text slova PRIMARY KEY, count int);

Upozorňujeme, že sme vytvorili menný priestor s názvom slovná zásoba a zavolal tam stôl slov s dvoma stĺpmi, slovoa počítať.

3. Závislosti

Závislosti Kafka a Spark môžeme do našej aplikácie integrovať cez Maven. Tieto závislosti stiahneme z Maven Central:

  • Core Spark
  • SQL Spark
  • Streamovanie Spark
  • Streamuje sa Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

A podľa toho ich môžeme pridať do nášho pom:

 org.apache.spark spark-core_2.11 2.3.0 poskytnuté org.apache.spark spark-sql_2.11 2.3.0 poskytnuté org.apache.spark spark-streaming_2.11 2.3.0 poskytnuté org.apache.spark spark-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark spark-cassandra-connector_2.11 2.3.0 com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Upozorňujeme, že niektoré z týchto závislostí sú označené ako za predpokladu v rozsahu. Je to tak preto, lebo ich sprístupní inštalácia Spark, kde pomocou príkazu spark-submit pošleme žiadosť o vykonanie.

4. Spark Streaming - Stratégie integrácie Kafka

V tomto okamihu stojí za to krátko hovoriť o integračných stratégiách pre Spark a Kafku.

Spoločnosť Kafka predstavila nové spotrebiteľské API medzi verziami 0.8 a 0.10. Preto sú pre obidve verzie sprostredkovateľa k dispozícii zodpovedajúce balíčky Spark Streaming. Je dôležité zvoliť správny balík v závislosti od dostupného sprostredkovateľa a požadovaných funkcií.

4.1. Streamovanie iskier Kafka 0.8

Verzia 0.8 je stabilné integračné API s možnosťou použitia prijímača alebo priameho prístupu. Nebudeme sa podrobne zaoberať týmito prístupmi, ktoré môžeme nájsť v oficiálnej dokumentácii. Je potrebné poznamenať, že tento balík je kompatibilný s verziou Kafka Broker verzie 0.8.2.1 alebo novšou.

4.2. Streamovanie iskier Kafka 0,10

Toto je momentálne v experimentálnom stave a je kompatibilné iba s verziami Kafka Broker verzie 0.10.0 alebo novšou. Tento balík ponúka iba priamy prístup a teraz využíva nové spotrebiteľské rozhranie API Kafka. Viac podrobností o tom nájdeme v oficiálnej dokumentácii. Dôležité je, že je nie je spätne kompatibilný so staršími verziami Kafka Broker.

Upozorňujeme, že pre tento tutoriál využijeme balík 0,10. Závislosť uvedená v predchádzajúcej časti sa vzťahuje iba na toto.

5. Vývoj dátového toku

Vytvoríme jednoduchú aplikáciu v Jave pomocou programu Spark, ktorá bude integrovaná s témou Kafka, ktorú sme vytvorili predtým. Aplikácia bude správy čítať ako uverejnené a v každej správe bude počítať počet slov. To sa potom aktualizuje v tabuľke Cassandra, ktorú sme vytvorili predtým.

Poďme si rýchlo predstaviť, ako budú údaje prúdiť:

5.1. Získanie JavaStreamingContext

Najskôr začneme inicializáciou JavaStreamingContext ktorý je vstupným bodom pre všetky aplikácie Spark Streaming:

SparkConf sparkConf = nový SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = nový JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Získanie DStream od Kafku

Teraz sa môžeme pripojiť k téme Kafka z JavaStreamingContext:

Mapa kafkaParams = nový HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "najnovšie"); kafkaParams.put ("enable.auto.commit", false); Témy zbierky = Arrays.asList ("správy"); JavaInputDStream messages = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Prihlásiť sa na odber (topic, kafkaParams));

Upozorňujeme, že tu musíme poskytnúť deserializátory kľúča a hodnoty. Pre bežné dátové typy ako String, deserializátor je predvolene k dispozícii. Ak však chceme načítať vlastné dátové typy, budeme musieť poskytnúť vlastné deserializátory.

Tu sme získali JavaInputDStream čo je implementácia diskretizovaných prúdov alebo DStreams, základná abstrakcia poskytovaná službou Spark Streaming. Interne DStreams nie je nič iné ako súvislá séria RDD.

5.3. Získané spracovanie DStream

Teraz vykonáme sériu operácií na JavaInputDStream na získanie frekvencie slov v správach:

Výsledky JavaPairDStream = správy .mapToPair (záznam -> nový Tuple2 (record.key (), record.value ())); Riadky JavaDStream = výsledky .map (tuple2 -> tuple2._2 ()); Slová JavaDStream = riadky .flatMap (x -> Arrays.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = slová .mapToPair (s -> nový Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Trvácne spracované DStream do Cassandry

Nakoniec môžeme spracované iterovať JavaPairDStream vložiť ich do našej tabuľky Cassandra:

wordCounts.foreachRDD (javaRdd -> {Map wordCountMap = javaRdd.collectAsMap (); for (String key: wordCountMap.keySet ()) {List wordList = Arrays.asList (new Word (key, wordCountMap.get (key)))); JavaRDD rdd = streamingContext.sparkContext (). Parallelize (wordList); javaFunctions (rdd) .writerBuilder ("slovník", "slová", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. Spustenie aplikácie

Pretože sa jedná o aplikáciu na spracovanie streamu, chceli by sme ju nechať bežať:

streamingContext.start (); streamingContext.awaitTermination ();

6. Využívanie kontrolných bodov

V aplikácii na spracovanie streamu je často užitočné zachovať stav medzi dávkami spracovávaných údajov.

Napríklad v našom predchádzajúcom pokuse sme schopní uložiť iba aktuálnu frekvenciu slov. Čo ak chceme namiesto toho uložiť kumulatívnu frekvenciu? Streamovanie iskier to umožňuje prostredníctvom konceptu nazývaného kontrolné body.

Teraz upravíme kanál, ktorý sme vytvorili skôr, aby sme využili kontrolné body:

Upozorňujeme, že kontrolné body budeme používať iba na reláciu spracovania údajov. To nezabezpečuje odolnosť voči chybám. Kontrolné body však možno použiť aj na odolnosť proti chybám.

V našej aplikácii je potrebných niekoľko zmien, aby sme využili kontrolné body. To zahŕňa poskytnutie JavaStreamingContext s umiestnením kontrolného bodu:

streamingContext.checkpoint ("./. kontrolný bod");

Tu používame lokálny súborový systém na ukladanie kontrolných bodov. Kvôli robustnosti by sa to však malo ukladať na miestach ako HDFS, S3 alebo Kafka. Viac informácií je k dispozícii v oficiálnej dokumentácii.

Ďalej budeme musieť načítať kontrolný bod a vytvoriť kumulatívny počet slov pri spracovaní každého oddielu pomocou mapovacej funkcie:

JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); výstup Tuple2 = nový Tuple2 (slovo, súčet); state.update (súčet); návratový výstup;}));

Keď dostaneme kumulatívny počet slov, môžeme pokračovať v iterácii a uložiť ich do Cassandry ako predtým.

Upozorňujeme, že zatiaľ čo údajové kontrolné stanovište je užitočné pre stavové spracovanie, prichádza s nákladmi na latenciu. Preto je potrebné používať to rozumne spolu s optimálnym intervalom kontroly.

7. Pochopenie kompenzácií

Ak si spomenieme na niektoré z Kafkových parametrov, ktoré sme nastavili skôr:

kafkaParams.put ("auto.offset.reset", "najnovšie"); kafkaParams.put ("enable.auto.commit", false);

Tieto to v podstate znamenajú nechceme sa automaticky zaviazať na posun a chceli by sme zvoliť najnovší posun vždy, keď sa inicializuje skupina spotrebiteľov. V dôsledku toho bude naša aplikácia schopná využívať iba správy odoslané počas obdobia, ktoré je v prevádzke.

Ak chceme spotrebovať všetky odoslané správy bez ohľadu na to, či bola aplikácia spustená alebo nie, a tiež chceme sledovať už odoslané správy, budeme musieť príslušne nakonfigurovať offset spolu s uložením stavu offsetu, aj keď je to pre tento tutoriál trochu mimo rozsahu.

Toto je tiež spôsob, akým Spark Streaming ponúka konkrétnu úroveň záruky, napríklad „presne raz“. To v podstate znamená, že každá správa zverejnená na tému Kafka bude Spark streamingom spracovaná iba raz.

8. Nasadenie aplikácie

Môžeme nasaďte našu aplikáciu pomocou skriptu Spark-submit ktorý je dodávaný vopred zabalený s inštaláciou Spark:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .jar

Pamätajte, že jar, ktorú vytvoríme pomocou Mavenu, by mala obsahovať závislosti, ktoré nie sú označené ako za predpokladu v rozsahu.

Po odoslaní tejto žiadosti a uverejnení niektorých správ v téme Kafka, ktorú sme vytvorili skôr, by sa mali zobraziť kumulatívne počty slov, ktoré sa zverejňujú v predtým vytvorenej tabuľke Cassandra.

9. Záver

Ak to zhrnieme, v tomto tutoriáli sme sa naučili, ako vytvoriť jednoduchý dátový kanál pomocou Kafky, Spark Streamingu a Cassandry. Tiež sme sa naučili, ako využiť kontrolné body v Spark Streamingu na udržanie stavu medzi dávkami.

Ako vždy, kód príkladov je k dispozícii na GitHub.