Úvod do KafkaStreams v Jave

1. Prehľad

V tomto článku sa pozrieme na KafkaStreams knižnica.

KafkaStreams je vyvinuté tvorcami Apache Kafky. Primárnym cieľom tohto softvéru je umožniť programátorom vytvárať efektívne streamovacie aplikácie v reálnom čase, ktoré by mohli fungovať ako Microservices.

KafkaStreams umožňuje nám konzumovať z tém Kafky, analyzovať alebo transformovať údaje a prípadne ich odosielať do inej témy Kafka.

Demonštrovať KafkaStreams, vytvoríme jednoduchú aplikáciu, ktorá načíta vety z témy, spočíta výskyty slov a vytlačí počet za slovo.

Je dôležité si uvedomiť, že KafkaStreams knižnica nie je reaktívna a nemá podporu pre asynchronné operácie a spracovanie protitlaku.

2. Závislosť od Maven

Ak chcete začať písať logiku spracovania streamu pomocou KafkaStreams, musíme pridať závislosť na kafka-potoky a kafka-klienti:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-klienti 1.0.0 

Tiež musíme mať nainštalovaný a spustený Apache Kafka, pretože budeme používať tému Kafka. Táto téma bude zdrojom údajov pre našu prácu so streamovaním.

Môžeme si stiahnuť Kafku a ďalšie požadované závislosti z oficiálnych webových stránok.

3. Konfigurácia vstupu KafkaStreams

Prvá vec, ktorú urobíme, je definícia vstupnej témy Kafka.

Môžeme použiť Sútok nástroj, ktorý sme stiahli - obsahuje server Kafka. Obsahuje tiež kafka-konzola-producent ktoré môžeme použiť na zverejnenie správ pre Kafku.

Na začiatok spustíme náš kafkovský klaster:

./spokojný štart

Hneď ako Kafka začne, môžeme definovať náš zdroj údajov a názov našej aplikácie pomocou APPLICATION_ID_CONFIG:

Reťazec inputTopic = "inputTopic";
Vlastnosti streamsConfiguration = nové vlastnosti (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Rozhodujúcim konfiguračným parametrom je BOOTSTRAP_SERVER_CONFIG. Toto je adresa URL našej miestnej inštancie Kafka, ktorú sme práve spustili:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Ďalej musíme odovzdať typ kľúča a hodnotu správ, ktoré sa budú spotrebovávať inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Streamové spracovanie je často stavové. Ak chceme uložiť medzivýsledky, musíme určiť STATE_DIR_CONFIG parameter.

V našom teste používame lokálny súborový systém:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Budovanie streamovacej topológie

Keď sme definovali našu vstupnú tému, môžeme vytvoriť streamovaciu topológiu - to je definícia toho, ako by sa mali udalosti spracovávať a transformovať.

V našom príklade by sme chceli implementovať počítadlo slov. Za každú vetu zaslanú inputTopic, chceme ho rozdeliť na slová a vypočítať výskyt každého slova.

Môžeme použiť inštanciu KStreamsBuilder triedy, aby sme začali konštruovať našu topológiu:

Builder KStreamBuilder = nový KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Pattern pattern = Pattern.compile ("\ W +", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(hodnota -> Arrays.asList (pattern.split (value.toLowerCase ()))) .groupBy ((kľúč, slovo) -> slovo). Účet ();

Ak chcete implementovať počet slov, najskôr je potrebné rozdeliť hodnoty pomocou regulárneho výrazu.

Metóda split vracia pole. Používame flatMapValues ​​() vyrovnať to. V opačnom prípade by sme skončili so zoznamom polí a bolo by nepohodlné písať kód pomocou takejto štruktúry.

Nakoniec agregujeme hodnoty pre každé slovo a voláme count () ktorá vypočíta výskyty konkrétneho slova.

5. Spracovanie výsledkov

Počet slov v našich vstupných správach sme už vypočítali. Teraz si vytlačíme výsledky na štandardný výstup pomocou pre každý() metóda:

wordCounts .foreach ((w, c) -> System.out.println ("slovo:" + w + "->" + c));

V produkcii môže často taká streamovaná úloha zverejniť výstup na inú tému Kafka.

Mohli by sme to urobiť pomocou metóda ():

Reťazec outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

The Serde class nám dáva predkonfigurované serializátory pre typy Java, ktoré sa použijú na serializáciu objektov na pole bajtov. Pole bajtov sa potom odošle na tému Kafka.

Používame String ako kľúč k našej téme a Dlhé ako hodnota pre skutočný počet. The do () metóda uloží výsledné dáta do outputTopic.

6. Spustenie úlohy KafkaStream

Do tohto bodu sme vytvorili topológiu, ktorú je možné vykonať. Práca sa však ešte nezačala.

Svoju prácu musíme začať výslovne zavolaním na číslo štart () metóda na KafkaStreams inštancia:

KafkaStreams streams = nové KafkaStreams (builder, streamsConfiguration); streams.start (); Závit. Spánok (30 000); streams.close ();

Upozorňujeme, že na dokončenie úlohy čakáme 30 sekúnd. V scenári zo skutočného sveta by táto úloha bežala neustále a pri ich príchode by sa spracovávali udalosti z Kafky.

Svoju prácu si môžeme vyskúšať zverejnením niektorých udalostí na našu tému Kafka.

Začnime a kafka-konzola-producent a ručne pošlite niektoré udalosti našej službe inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost: 9092> "this is a pony"> "this is a horse and pony" 

Týmto spôsobom sme pre Kafku zverejnili dve udalosti. Naša aplikácia tieto udalosti spotrebuje a vytlačí nasledujúci výstup:

slovo: -> 1 slovo: toto -> 1 slovo: je -> 1 slovo: a -> 1 slovo: pony -> 1 slovo: -> 2 slovo: toto -> 2 slovo: je -> 2 slovo: a - > 2 slová: kôň -> 1 slovo: a -> 1 slovo: poník -> 2

Vidíme, že keď prišla prvá správa, slovo poníka došlo iba raz. Ale keď sme poslali druhú správu, slovo poníka došlo k druhému tlačeniu: “slovo: poník -> 2 ″.

6. Záver

Tento článok pojednáva o tom, ako vytvoriť aplikáciu na spracovanie primárneho toku pomocou Apache Kafka ako zdroja údajov a KafkaStreams knižnica ako knižnica spracovania toku.

Všetky tieto príklady a útržky kódu nájdete v projekte GitHub - toto je projekt Maven, takže by malo byť ľahké ho importovať a spustiť tak, ako je.


$config[zx-auto] not found$config[zx-overlay] not found