Presne raz Spracovanie v Kafke pomocou Javy

1. Prehľad

V tejto príručke sa pozrieme na to, ako na to Spoločnosť Kafka zaisťuje dodávku medzi výrobcami a spotrebiteľmi okamžite prostredníctvom novo zavedeného transakčného rozhrania API.

Ďalej použijeme toto API na implementáciu producentov a spotrebiteľov transakcií, aby sme v príklade WordCount dosiahli úplné doručenie naraz.

2. Doručenie správy v Kafke

Z dôvodu rôznych zlyhaní nemôžu systémy správ zaručiť doručenie správ medzi výrobcami a spotrebiteľmi. V závislosti od toho, ako klientske aplikácie interagujú s takýmito systémami, je možná nasledujúca sémantika správ:

  • Ak systém správ nikdy duplikuje správu, ale príležitostná správa by mohla chýbať, hovoríme tomu nanajvýš raz
  • Alebo, ak nikdy nezmešká správu, ale môže duplikovať príležitostnú správu, hovoríme jej aspoň raz
  • Ale ak vždy doručuje všetky správy bez duplikácie, to je presne-raz

Kafka spočiatku podporovala doručovanie správ iba nanajvýš a najmenej raz.

Avšak zavedenie Transakcií medzi maklérmi Kafka a klientskymi aplikáciami zaisťuje doručenie v Kafke okamžite. Aby sme tomu lepšie porozumeli, poďme rýchlo skontrolovať API transakčného klienta.

3. Závislosti Maven

Aby sme mohli pracovať s transakčným API, budeme v našom pom potrebovať klienta Java Kafka:

 org.apache.kafka kafka-klienti 2.0.0 

4. Transakčné konzumovať-transformovať-vyrábať Slučka

Pre náš príklad spotrebujeme správy zo vstupnej témy, vety.

Potom pre každú vetu spočítame každé slovo a jednotlivé počty slov pošleme na výstupnú tému, počíta.

V príklade budeme predpokladať, že v transakcii už sú k dispozícii údaje o transakciách vety téma.

4.1. Producent uvedomujúci si transakcie

Najprv teda pridajme typického producenta Kafky.

Vlastnosti producerProps = nové Vlastnosti (); producerProps.put ("bootstrap.servers", "localhost: 9092");

Ďalej však musíme určiť a transakčné.id a povoliť idempotencia:

producerProps.put ("enable.idempotence", "true"); producerProps.put ("transactional.id", "prod-1"); KafkaProducer producer = nový KafkaProducer (producerProps);

Pretože sme povolili idempotenciu, Kafka použije toto ID transakcie ako súčasť svojho algoritmu na deduplikovať každú správu tohto výrobcupošle, zabezpečenie idempotencie.

Jednoducho povedané, ak výrobca omylom pošle rovnakú správu spoločnosti Kafka viackrát, tieto nastavenia mu umožnia všimnúť si to.

Všetko, čo musíme urobiť, je uistite sa, že ID transakcie je pre každého producenta odlišné, aj keď je konzistentný pri všetkých reštartoch.

4.2. Povolenie producenta pre transakcie

Keď sme pripravení, musíme tiež zavolať initTransaction pripraviť výrobcu na použitie transakcií:

producer.initTransactions ();

To zaregistruje výrobcu u sprostredkovateľa ako ten, ktorý môže používať transakcie, jeho identifikáciu podľa jeho transakčné.id a poradové číslo alebo epocha. Maklér ich následne použije na zapísanie akýchkoľvek akcií do protokolu transakcií.

A následne sprostredkovateľ odstráni z tohto denníka všetky akcie, ktoré patria producentovi s rovnakým ID transakcie a staršímepocha, ich predpokladom je, že pochádzajú zo zaniknutých transakcií.

4.3. Zákazník informovaný o transakciách

Keď skonzumujeme, môžeme si prečítať všetky správy v tematickej oblasti v poradí. Predsa, môžeme označiť pomocou izolácia.úroveň že by sme mali počkať na prečítanie transakčných správ, kým nebude vykonaná príslušná transakcia:

Vlastnosti consumerProps = nové Vlastnosti (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "false"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer consumer = nový KafkaConsumer (consumerProps); spotrebiteľ.subscribe (singleton („vety“));

Pomocou hodnoty read_committed zaisťuje, že nebudeme čítať žiadne transakčné správy pred dokončením transakcie.

Predvolená hodnota je izolácia.úroveň je read_uncommitted.

4.4. Spotreba a transformácia transakciou

Teraz, keď máme výrobcu aj spotrebiteľa nakonfigurovaného na transakčné písanie aj čítanie, môžeme konzumovať záznamy z našej vstupnej témy a počítať každé slovo v každom zázname:

Záznamy ConsumerRecords = consumer.poll (ofSeconds (60)); Mapa wordCountMap = records.records (new TopicPartition ("input", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (word, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Upozorňujeme, že s vyššie uvedeným kódom nie je nič transakčné. Ale, odkedy sme použili read_committed, to znamená, že tento spotrebiteľ nebude čítať žiadne správy, ktoré boli zapísané na vstupnú tému v tej istej transakcii, kým nebudú všetky napísané.

Teraz môžeme do výstupnej témy poslať vypočítaný počet slov.

Pozrime sa, ako môžeme dosiahnuť naše výsledky, a to aj transakčne.

4.5. Poslať API

Aby sme odoslali naše počty ako nové správy, ale v rovnakej transakcii, zavoláme beginTransaction:

producer.beginTransaction ();

Potom môžeme každú z nich zapísať do našej témy „počty“, pričom kľúčom bude slovo a počtom bude hodnota:

wordCountMap.forEach ((kľúč, hodnota) -> producent.send (nový ProducerRecord ("počty", kľúč, hodnota.toString ())));

Upozorňujeme, že pretože výrobca môže údaje rozdeliť podľa kľúča, znamená to transakčné správy môžu obsahovať viac oddielov, z ktorých každú čítajú samostatní spotrebitelia. Sprostredkovateľ Kafka preto uloží zoznam všetkých aktualizovaných oddielov pre transakciu.

Všimnite si tiež, že v rámci transakcie môže producent použiť viac vlákien na paralelné odoslanie záznamov.

4.6. Viazanie kompenzácií

A nakoniec sa musíme zaviazať k vyrovnaniu, ktoré sme práve skončili so spotrebou. Pri transakciách potvrdzujeme kompenzácie späť na vstupnú tému, z ktorej sme ich čítali, ako je to bežné. Aj keď my poslať ich na transakciu výrobcu.

Toto všetko môžeme urobiť v jednom hovore, ale najskôr musíme vypočítať posuny pre každý oddiel témy:

Map offsetsToCommit = nový HashMap (); pre (oblasť TopicPartition: records.partitions ()) {zoznam partitionedRecords = records.records (partition); long offset = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (oblasť, nová OffsetAndMetadata (offset + 1)); }

Upozorňujeme, že transakciou sa zaviažeme, je nadchádzajúci offset, čo znamená, že musíme pridať 1.

Potom môžeme do transakcie poslať naše vypočítané kompenzácie:

producer.sendOffsetsToTransaction (offsetsToCommit, "my-group-id");

4.7. Zaviazanie alebo prerušenie transakcie

A nakoniec môžeme spáchať transakciu, ktorá atomicky zapíše vyrovnania na spotrebné ofsety ako aj k samotnej transakcii:

producer.commitTransaction ();

Týmto sa vyprázdnia všetky správy vo vyrovnávacej pamäti do príslušných oddielov. Sprostredkovateľ Kafka navyše sprístupňuje spotrebiteľom všetky správy z tejto transakcie.

Samozrejme, ak sa počas spracovania niečo pokazí, napríklad ak zachytíme výnimku, môžeme zavolať abortTransaction:

skúste {// ... prečítať zo vstupnej témy // ... transformovať // ... zapísať na výstupnú tému producent.commitTransaction (); } catch (Výnimka e) {producer.abortTransaction (); }

A zahodiť všetky správy vo vyrovnávacej pamäti a odstrániť transakciu od sprostredkovateľa.

Ak sa nedopustíme ani nevyrušíme skôr, ako je nakonfigurovaný sprostredkovateľom max.transaction.timeout.ms, broker Kafka samotnú transakciu preruší. Predvolená hodnota pre toto vlastníctvo je 900 000 milisekúnd alebo 15 minút.

5. Iné konzumovať-transformovať-vyrábať Slučky

To, čo sme práve videli, je základné konzumovať-transformovať-vyrábať slučka, ktorá číta a píše do rovnakého kafkovského klastra.

Naopak, aplikácie, ktoré musia čítať a zapisovať do rôznych klastrov Kafka, musia používať staršie commitSync a commitAsync API. Aplikácie budú zvyčajne ukladať kompenzácie spotrebiteľov do externého úložiska stavu, aby sa zachovala transakčnosť.

6. Záver

Pre aplikácie kritické z hľadiska dát je často nevyhnutné komplexné spracovanie naraz.

V tomto návode videli sme, ako pomocou Kafku robíme presne toto, pomocou transakciía na ilustráciu princípu sme implementovali príklad počítania slov založený na transakciách.

Neváhajte a vyskúšajte všetky ukážky kódu na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found