Budovanie dátového toku s spoločnosťami Flink a Kafka

1. Prehľad

Apache Flink je rámec spracovania prúdov, ktorý sa dá ľahko použiť v prostredí Java. Apache Kafka je systém na spracovanie distribuovaného toku, ktorý podporuje vysokú odolnosť proti chybám.

V tomto výučbe sa pozrieme na to, ako vybudovať dátový kanál pomocou týchto dvoch technológií.

2. Inštalácia

Informácie o inštalácii a konfigurácii servera Apache Kafka nájdete v oficiálnom sprievodcovi. Po inštalácii môžeme pomocou nasledujúcich príkazov vytvoriť nové témy s názvom flink_input a flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikačný faktor 1 - oddiely 1 \ --topic flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikačný faktor 1 - oddiely 1 \ --topický flink_input

V záujme tohto tutoriálu použijeme predvolenú konfiguráciu a predvolené porty pre Apache Kafka.

3. Využitie Flink

Apache Flink umožňuje technológiu spracovania toku v reálnom čase. Rámec umožňuje použitie viacerých systémov tretích strán ako zdrojov prúdu alebo prepadov.

Vo Flink - sú k dispozícii rôzne konektory:

  • Apache Kafka (zdroj / umývadlo)
  • Apache Cassandra (umývadlo)
  • Streamovanie Amazon Kinesis (zdroj / umývadlo)
  • Elasticsearch (drez)
  • Hadoop FileSystem (umývadlo)
  • RabbitMQ (zdroj / drez)
  • Apache NiFi (zdroj / drez)
  • Twitter Streaming API (zdroj)

Ak chcete do nášho projektu pridať Flink, musíme zahrnúť nasledujúce závislosti Maven:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-konektor-kafka-0.11_2.11 1.5.0 

Pridanie týchto závislostí nám umožní konzumovať a vyrábať z a na témy Kafka. Aktuálnu verziu aplikácie Flink nájdete na serveri Maven Central.

4. Spotrebiteľ reťazca Kafka

Aby sme mohli spotrebovať údaje z Kafky pomocou Flinku, musíme uviesť tému a adresu Kafku. Mali by sme tiež poskytnúť ID skupiny, ktoré sa použije na zadržanie kompenzácií, aby sme od začiatku nemuseli vždy načítať všetky údaje.

Poďme vytvoriť statickú metódu, ktorá umožní vytvorenie FlinkKafkaConsumer jednoduchšie:

public static FlinkKafkaConsumer011 createStringConsumerForTopic (Reťazcová téma, Reťazec kafkaAddress, Reťazec kafkaGroup) {Vlastnosti rekvizity = nové Vlastnosti (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 consumer = nový FlinkKafkaConsumer011 (téma, nový SimpleStringSchema (), rekvizity); spätný spotrebiteľ; }

Táto metóda vyžaduje a téma, adresa kafka, a kafkaGroup a vytvára FlinkKafkaConsumer ktorá bude spotrebovávať údaje z danej témy ako a String odkedy sme použili SimpleStringSchema na dekódovanie dát.

Číslo 011 v mene triedy odkazuje na verziu Kafka.

5. Kafka String Producer

Na získanie údajov pre Kafku je potrebné uviesť adresu a tému Kafky, ktoré chceme použiť. Opäť môžeme vytvoriť statickú metódu, ktorá nám pomôže vytvoriť producentov pre rôzne témy:

public static FlinkKafkaProducer011 createStringProducer (Reťazec topic, Reťazec kafkaAddress) {vrátiť nový FlinkKafkaProducer011 (kafkaAddress, topic, new SimpleStringSchema ()); }

Táto metóda trvá iba téma a kafkaAddress ako argumenty, pretože pri výrobe témy Kafka nie je potrebné uvádzať ID skupiny.

6. Spracovanie toku reťazcov

Keď máme plne funkčného spotrebiteľa a výrobcu, môžeme sa pokúsiť spracovať údaje z Kafky a potom uložiť naše výsledky späť do Kafky. Celý zoznam funkcií, ktoré možno použiť na spracovanie streamu, nájdete tu.

V tomto príklade použijeme slová v každej položke Kafka veľké a potom ich zapíšeme späť do Kafky.

Z tohto dôvodu musíme vytvoriť zvyk MapFunction:

public class WordsCapitalizer implementuje MapFunction {@Override public String map (String s) {return s.toUpperCase (); }}

Po vytvorení funkcie ju môžeme použiť pri spracovaní streamu:

public static void capitalize () {String inputTopic = "flink_input"; Reťazec outputTopic = "flink_output"; Reťazec consumerGroup = "baeldung"; Reťazcová adresa = "localhost: 9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, address, consumerGroup); DataStream stringInputStream = prostredie .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, address); stringInputStream .map (new WordsCapitalizer ()) .addSink (flinkKafkaProducer); }

Aplikácia načíta údaje z flink_input téma, vykonajte operácie so streamom a potom výsledky uložte do flink_output téma v Kafke.

Už sme videli, ako sa vysporiadať s reťazcami pomocou Flinku a Kafku. Často je však potrebné vykonávať operácie na vlastných objektoch. Uvidíme, ako na to v ďalších kapitolách.

7. Deserializácia vlastných objektov

Nasledujúca trieda predstavuje jednoduchú správu s informáciami o odosielateľovi a príjemcovi:

@JsonSerialize verejná trieda InputMessage {odosielateľ reťazca; Príjemca reťazca; LocalDateTime sentAt; Reťazcová správa; }

Predtým sme používali SimpleStringSchema deserializovať správy od Kafku, ale teraz chceme deserializovať údaje priamo na vlastné objekty.

Potrebujeme na to zvyk Schéma deserializácie:

verejná trieda InputMessageDeserializationSchema implementuje DeserializationSchema {statický ObjectMapper objectMapper = nový ObjectMapper () .registerModule (nový JavaTimeModule ()); @Override public InputMessage deserialize (byte [] bajtov) hodí IOException {return objectMapper.readValue (bytes, InputMessage.class); } @Override public boolean isEndOfStream (InputMessage inputMessage) {return false; } @Override public TypeInformation getProducedType () {return TypeInformation.of (InputMessage.class); }}

Tu predpokladáme, že správy sa v Kafke uchovávajú ako JSON.

Keďže máme pole typu LocalDateTime, musíme špecifikovať JavaTimeModule, ktorá sa stará o mapovanie LocalDateTime namieta voči JSON.

Flinkové schémy nemôžu obsahovať polia, ktoré nie sú serializovateľné pretože všetky operátory (ako schémy alebo funkcie) sú serializované na začiatku úlohy.

Podobné problémy sú aj v Apache Spark. Jednou zo známych opráv tohto problému je inicializácia polí ako statický, ako sme to urobili ObjectMapper vyššie. Nie je to najkrajšie riešenie, ale je pomerne jednoduché a robí svoju prácu.

Metóda isEndOfStream je možné použiť v osobitnom prípade, keď by sa prúd mal spracovávať iba do prijatia určitých konkrétnych údajov. Ale v našom prípade to nie je potrebné.

8. Serializácia vlastných objektov

Teraz predpokladajme, že chceme, aby náš systém mal možnosť vytvorenia zálohy správ. Chceme, aby bol proces automatický, a každá záloha by mala pozostávať zo správ odoslaných počas jedného celého dňa.

Záložnej správe by mal byť priradený aj jedinečný identifikátor.

Na tento účel môžeme vytvoriť nasledujúcu triedu:

public class Backup {@JsonProperty ("inputMessages") Zoznam inputMessages; @JsonProperty ("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; verejné zálohovanie (zoznam inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

Upozorňujeme, že mechanizmus generovania UUID nie je dokonalý, pretože umožňuje duplikáty. To však na rozsah tohto príkladu stačí.

Chceme zachrániť naše Zálohovanie objekt ako JSON pre Kafku, takže musíme vytvoriť náš SerializationSchema:

verejná trieda BackupSerializationSchema implementuje SerializationSchema {ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger (BackupSerializationSchema.class); @Override public byte [] serialize (Backup backupMessage) {if (objectMapper == null) {objectMapper = new ObjectMapper () .registerModule (new JavaTimeModule ()); } skúsiť {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } catch (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("Nepodarilo sa analyzovať JSON", e); } vrátiť nový bajt [0]; }}

9. Správy s časovou pečiatkou

Pretože chceme vytvoriť zálohu pre všetky správy každého dňa, je potrebné správy označiť časovou značkou.

Flink poskytuje tri rôzne časové charakteristiky EventTime, ProcessingTime, a Čas prijatia.

V našom prípade musíme použiť čas, v ktorom bola správa odoslaná, takže použijeme EventTime.

Použit EventTimepotrebujeme a Priraďovateľ časovej pečiatky ktorý extrahuje časové značky z našich vstupných údajov:

verejná trieda InputMessageTimestampAssigner implementuje AssignerWithPunctuatedWatermarks {@Override public long extractTimestamp (prvok InputMessage, long previousElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); návratový prvok.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark (InputMessage lastElement, long extrahovanýTimestamp) {vrátiť nový vodoznak (extrahovanýtimestamp - 1500); }}

Musíme transformovať svoje LocalDateTime do EpochaSecond pretože toto je formát očakávaný Flinkom. Po priradení časových pečiatok budú všetky operácie založené na čase využívať čas od odoslaný pole pôsobiť.

Pretože spoločnosť Flink očakáva, že časové pečiatky budú v milisekundách a toEpochSecond () vráti čas v sekundách, ktorý sme potrebovali vynásobiť číslom 1000, takže Flink vytvorí okná správne.

Flink definuje koncept a Vodoznak. Vodoznaky sú užitočné v prípade údajov, ktoré neprídu v poradí, v akom boli odoslané. Vodoznak definuje maximálnu latenciu, ktorá je povolená pri spracovaní prvkov.

Prvky, ktoré majú časové pečiatky nižšie ako vodoznak, sa nespracujú vôbec.

10. Vytváranie časových Windows

Aby sme sa uistili, že naša záloha zhromažďuje iba správy odoslané počas jedného dňa, môžeme použiť timeWindowAll metóda v streame, ktorá rozdelí správy do okien.

Stále však budeme musieť agregovať správy z každého okna a vrátiť ich ako Zálohovanie.

Aby sme to dosiahli, budeme potrebovať zvyk Agregovaná funkcia:

verejná trieda BackupAggregator implementuje AggregateFunction {@Override public List createAccumulator () {return new ArrayList (); } @Override public List add (InputMessage inputMessage, List inputMessages) {inputMessages.add (inputMessage); návrat inputMessages; } @Override public Backup getResult (List inputMessages) {return new Backup (inputMessages, LocalDateTime.now ()); } @Override verejné zlúčenie zoznamu (zoznam inputMessages, zoznam acc1) {inputMessages.addAll (acc1); návrat inputMessages; }}

11. Agregácia záloh

Po priradení správnych časových značiek a implementácii našich Agregovaná funkcia, môžeme konečne vziať náš vstup Kafku a spracovať ho:

public static void createBackup () vyvolá výnimku {String inputTopic = "flink_input"; Reťazec outputTopic = "flink_output"; Reťazec consumerGroup = "baeldung"; Reťazec kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment (); environment.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (nový InputMessageTimestampAssigner ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (new BackupAggregator ()) .addSink (flinkKafkaProducer); environment.execute (); }

12. Záver

V tomto článku sme si predstavili, ako vytvoriť jednoduchý dátový kanál pomocou aplikácií Apache Flink a Apache Kafka.

Ako vždy, kód nájdete na Githube.