Úvod do Apache Kafka s jarom

Perzistencia hore

Práve som oznámil nové Naučte sa jar kurz zameraný na základy jari 5 a Spring Boot 2:

>> SKONTROLUJTE KURZ

1. Prehľad

Apache Kafka je distribuovaný a na chyby odolný systém spracovania toku.

V tomto článku sa budeme venovať jarnej podpore Kafky a úrovni abstrakcií, ktoré poskytuje cez natívne klientske rozhrania API klienta Kafka Java.

Jarná Kafka prináša jednoduchý a typický jarný programovací model šablón s a Šablóna Kafka a POJO riadené správami prostredníctvom @KafkaListener anotácia.

2. Inštalácia a nastavenie

Ak si chcete stiahnuť a nainštalovať Kafku, pozrite si tu oficiálneho sprievodcu.

Musíme tiež pridať jar-kafka závislosť na našom pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.VYDANIE 

Najnovšiu verziu tohto artefaktu nájdete tu.

Našou ukážkovou aplikáciou bude aplikácia Spring Boot.

Tento článok predpokladá, že server je spustený pomocou predvolenej konfigurácie a nezmenia sa žiadne porty servera.

3. Konfigurácia tém

Predtým sme používali nástroje príkazového riadku na vytváranie tém v Kafke, ako napríklad:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikačný faktor 1 - oddiely 1 \ --topický mytopický

Ale so zavedením AdminClient v Kafke teraz môžeme vytvárať témy programovo.

Musíme pridať KafkaAdmin Jarná fazuľa, ktorá automaticky pridá témy pre všetky fazule typu Nová téma:

@ Konfigurácia verejná trieda KafkaTopicConfig {@Value (hodnota = "$ {kafka.bootstrapAddress}") súkromný reťazec bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); vrátiť nový KafkaAdmin (konfigurácie); } @Bean public NewTopic topic1 () {návrat new NewTopic ("baeldung", 1, (short) 1); }}

4. Produkcia správ

Na vytváranie správ je potrebné najskôr nakonfigurovať a Producentská továreň ktorá stanovuje stratégiu pre vytvorenie Kafku Výrobca inštancie.

Potom potrebujeme a Šablóna Kafka ktorý obaľuje a Výrobca inštancia a poskytuje pohodlné metódy na odosielanie správ na témy Kafka.

Výrobca inštancie sú bezpečné z hľadiska vlákien, a preto použitie jednej inštancie v celom kontexte aplikácie poskytne vyšší výkon. V dôsledku toho Šablóna Kakfa inštancie sú tiež bezpečné z hľadiska vlákien a odporúča sa použitie jednej inštancie.

4.1. Konfigurácia výrobcu

@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); vrátiť nový DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (producerFactory ()); }}

4.2. Publikovanie správ

Správy môžeme posielať pomocou Šablóna Kafka trieda:

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage (reťazec msg) {kafkaTemplate.send (topicName, msg); }

The poslať API vráti a ListenableFuture objekt. Ak chceme zablokovať odosielacie vlákno a získať výsledok o odoslanej správe, môžeme zavolať na dostať API ListenableFuture objekt. Vlákno počká na výsledok, ale spomalí to výrobcu.

Kafka je platforma na rýchle spracovanie streamov. Preto je lepšie pracovať s výsledkami asynchrónne, aby nasledujúce správy nečakali na výsledok predchádzajúcej správy. Môžeme to urobiť pomocou spätného volania:

public void sendMessage (reťazcová správa) {ListenableFuture future = kafkaTemplate.send (topicName, message); future.addCallback (nový ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Sent message = [" + message + "] with offset = [" + result.getRecordMetadata (). Offset () + "]")) ; } @Override public void onFailure (Throwable ex) {System.out.println ("Nemožno odoslať správu = [" + správa + "] z dôvodu:" + ex.getMessage ()); }}); }

5. Konzumácia správ

5.1. Konfigurácia spotrebiteľa

Na príjem správ je potrebné nakonfigurovať a ConsumerFactory a a KafkaListenerContainerFactory. Len čo sú tieto fazule dostupné v továrni na výrobu jarných bôbov, môžu sa zákazníci používajúci POJO nakonfigurovať pomocou @KafkaListener anotácia.

@EnableKafka pre konfiguračnú triedu sa vyžaduje anotácia, aby sa umožnila detekcia @KafkaListener anotácia k jarným fazuľám:

@EnableKafka @Configuration verejná trieda KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); vrátiť nové DefaultKafkaConsumerFactory (rekvizity); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); vrátiť továreň; }}

5.2. Konzumácia správ

@KafkaListener (topic = "topicName", groupId = "foo") public void listenGroupFoo (reťazcová správa) {System.out.println ("prijatá správa v skupine foo:" + správa); }

Pre jednu tému je možné implementovať viac poslucháčov, každá s inou skupinou Id. Jeden spotrebiteľ môže ďalej počúvať správy z rôznych tém:

@KafkaListener (topic = "topic1, topic2", groupId = "foo")

Jar tiež podporuje načítanie jednej alebo viacerých hlavičiek správ pomocou @ Hlavička anotácia v poslucháčovi:

@KafkaListener (topic = "topicName") public void listenWithHeaders (@Payload String message, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Received Message:" + message "+" from partition: "+ oddiel);}

5.3. Konzumácia správ z konkrétneho oddielu

Ako ste si mohli všimnúť, vytvorili sme tému baeldung iba s jedným oddielom. Avšak pre tému s viacerými oddielmi a @KafkaListener sa môže výslovne prihlásiť na odber konkrétneho oddielu témy s počiatočným odsadením:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition (@Payload String message, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Received Message:" + message "+" from partition: "+ partition) ;}

Keďže initialOffset bol odoslaný na 0 v tomto poslucháčovi, všetky predtým spotrebované správy z oddielov 0 a troch sa znova spotrebujú pri každej inicializácii tohto poslucháča. Pokiaľ nastavenie offsetu nie je potrebné, môžeme použiť priečky majetok @TopicPartition anotácia na nastavenie iba oddielov bez odsadenia:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitions = {"0", "1"}))

5.4. Pridanie filtra správ pre poslucháčov

Poslucháčov je možné nakonfigurovať tak, aby konzumovali konkrétne typy správ, a to pridaním vlastného filtra. To je možné dosiahnuť nastavením a RecordFilterStrategy do KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (record -> record.value (). contains ("World")); vrátiť továreň; }

Poslucháč potom môže byť nakonfigurovaný na používanie tejto továrne na kontajnery:

@KafkaListener (topic = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter (reťazcová správa) {System.out.println ("Prijatá správa vo filtrovanom poslucháčovi:" + správa); }

V tomto poslucháčovi všetky správy zodpovedajúce filtru budú zahodené.

6. Vlastné prevádzače správ

Zatiaľ sme pokryli iba odosielanie a prijímanie reťazcov ako správ. Môžeme však tiež odosielať a prijímať vlastné objekty Java. To si vyžaduje nakonfigurovať vhodný serializátor v Producentská továreň a deserializátor v ConsumerFactory.

Pozrime sa na jednoduchú triedu fazule, ktoré pošleme ako správy:

public class Greeting {private String msg; súkromné ​​meno reťazca; // štandardní hľadači, zakladatelia a konštruktor}

6.1. Produkcia vlastných správ

V tomto príklade použijeme JsonSerializer. Pozrime sa na kód pre Producentská továreň a Šablóna Kafka:

@Bean public ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); vrátiť nový DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate greetingKafkaTemplate () {return new KafkaTemplate (greetingProducerFactory ()); }

Táto nová Šablóna Kafka možno použiť na odoslanie Pozdravujem vás správa:

kafkaTemplate.send (topicName, new Greeting ("Hello", "World"));

6.2. Spotrebovanie vlastných správ

Podobne upravme ConsumerFactory a KafkaListenerContainerFactory správna deserializácia pozdravu:

@Bean public ConsumerFactory greetingConsumerFactory () {// ... vrátiť nový DefaultKafkaConsumerFactory (rekvizity, nový StringDeserializer (), nový JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory pozdravKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (greetingConsumerFactory ()); vrátiť továreň; }

Serializátor a deserializátor JSON na jar-kafka používa knižnicu Jackson, ktorá je tiež voliteľnou závislosťou maven pre projekt jar-kafka. Pridajme to teda k nášmu pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Namiesto použitia najnovšej verzie Jacksona sa odporúča používať verziu, ktorá je pridaná do súboru pom.xml z jari-kafky.

Nakoniec musíme napísať poslucháča, ktorý konzumuje Pozdravujem vás správy:

@KafkaListener (topic = "topicName", containerFactory = "pozdravKafkaListenerContainerFactory") public void greetingListener (pozdrav s pozdravom) {// spracovanie pozdravu}

7. Záver

V tomto článku sme sa venovali základom jarnej podpory pre Apache Kafku. Krátko sme sa pozreli na triedy, ktoré sa používajú na odosielanie a prijímanie správ.

Kompletný zdrojový kód tohto článku nájdete na GitHub. Pred spustením kódu sa uistite, že je spustený server Kafka a témy sú vytvorené ručne.

Perzistencia dno

Práve som oznámil nové Naučte sa jar kurz zameraný na základy jari 5 a Spring Boot 2:

>> SKONTROLUJTE KURZ

$config[zx-auto] not found$config[zx-overlay] not found