Úvod do Apache Kafka s jarom
Práve som oznámil nové Naučte sa jar kurz zameraný na základy jari 5 a Spring Boot 2:
>> SKONTROLUJTE KURZ1. Prehľad
Apache Kafka je distribuovaný a na chyby odolný systém spracovania toku.
V tomto článku sa budeme venovať jarnej podpore Kafky a úrovni abstrakcií, ktoré poskytuje cez natívne klientske rozhrania API klienta Kafka Java.
Jarná Kafka prináša jednoduchý a typický jarný programovací model šablón s a Šablóna Kafka a POJO riadené správami prostredníctvom @KafkaListener anotácia.
2. Inštalácia a nastavenie
Ak si chcete stiahnuť a nainštalovať Kafku, pozrite si tu oficiálneho sprievodcu.
Musíme tiež pridať jar-kafka závislosť na našom pom.xml:
org.springframework.kafka spring-kafka 2.3.7.VYDANIE
Najnovšiu verziu tohto artefaktu nájdete tu.
Našou ukážkovou aplikáciou bude aplikácia Spring Boot.
Tento článok predpokladá, že server je spustený pomocou predvolenej konfigurácie a nezmenia sa žiadne porty servera.
3. Konfigurácia tém
Predtým sme používali nástroje príkazového riadku na vytváranie tém v Kafke, ako napríklad:
$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikačný faktor 1 - oddiely 1 \ --topický mytopický
Ale so zavedením AdminClient v Kafke teraz môžeme vytvárať témy programovo.
Musíme pridať KafkaAdmin Jarná fazuľa, ktorá automaticky pridá témy pre všetky fazule typu Nová téma:
@ Konfigurácia verejná trieda KafkaTopicConfig {@Value (hodnota = "$ {kafka.bootstrapAddress}") súkromný reťazec bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); vrátiť nový KafkaAdmin (konfigurácie); } @Bean public NewTopic topic1 () {návrat new NewTopic ("baeldung", 1, (short) 1); }}
4. Produkcia správ
Na vytváranie správ je potrebné najskôr nakonfigurovať a Producentská továreň ktorá stanovuje stratégiu pre vytvorenie Kafku Výrobca inštancie.
Potom potrebujeme a Šablóna Kafka ktorý obaľuje a Výrobca inštancia a poskytuje pohodlné metódy na odosielanie správ na témy Kafka.
Výrobca inštancie sú bezpečné z hľadiska vlákien, a preto použitie jednej inštancie v celom kontexte aplikácie poskytne vyšší výkon. V dôsledku toho Šablóna Kakfa inštancie sú tiež bezpečné z hľadiska vlákien a odporúča sa použitie jednej inštancie.
4.1. Konfigurácia výrobcu
@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); vrátiť nový DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (producerFactory ()); }}
4.2. Publikovanie správ
Správy môžeme posielať pomocou Šablóna Kafka trieda:
@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage (reťazec msg) {kafkaTemplate.send (topicName, msg); }
The poslať API vráti a ListenableFuture objekt. Ak chceme zablokovať odosielacie vlákno a získať výsledok o odoslanej správe, môžeme zavolať na dostať API ListenableFuture objekt. Vlákno počká na výsledok, ale spomalí to výrobcu.
Kafka je platforma na rýchle spracovanie streamov. Preto je lepšie pracovať s výsledkami asynchrónne, aby nasledujúce správy nečakali na výsledok predchádzajúcej správy. Môžeme to urobiť pomocou spätného volania:
public void sendMessage (reťazcová správa) {ListenableFuture future = kafkaTemplate.send (topicName, message); future.addCallback (nový ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Sent message = [" + message + "] with offset = [" + result.getRecordMetadata (). Offset () + "]")) ; } @Override public void onFailure (Throwable ex) {System.out.println ("Nemožno odoslať správu = [" + správa + "] z dôvodu:" + ex.getMessage ()); }}); }
5. Konzumácia správ
5.1. Konfigurácia spotrebiteľa
Na príjem správ je potrebné nakonfigurovať a ConsumerFactory a a KafkaListenerContainerFactory. Len čo sú tieto fazule dostupné v továrni na výrobu jarných bôbov, môžu sa zákazníci používajúci POJO nakonfigurovať pomocou @KafkaListener anotácia.
@EnableKafka pre konfiguračnú triedu sa vyžaduje anotácia, aby sa umožnila detekcia @KafkaListener anotácia k jarným fazuľám:
@EnableKafka @Configuration verejná trieda KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); vrátiť nové DefaultKafkaConsumerFactory (rekvizity); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); vrátiť továreň; }}
5.2. Konzumácia správ
@KafkaListener (topic = "topicName", groupId = "foo") public void listenGroupFoo (reťazcová správa) {System.out.println ("prijatá správa v skupine foo:" + správa); }
Pre jednu tému je možné implementovať viac poslucháčov, každá s inou skupinou Id. Jeden spotrebiteľ môže ďalej počúvať správy z rôznych tém:
@KafkaListener (topic = "topic1, topic2", groupId = "foo")
Jar tiež podporuje načítanie jednej alebo viacerých hlavičiek správ pomocou @ Hlavička anotácia v poslucháčovi:
@KafkaListener (topic = "topicName") public void listenWithHeaders (@Payload String message, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Received Message:" + message "+" from partition: "+ oddiel);}
5.3. Konzumácia správ z konkrétneho oddielu
Ako ste si mohli všimnúť, vytvorili sme tému baeldung iba s jedným oddielom. Avšak pre tému s viacerými oddielmi a @KafkaListener sa môže výslovne prihlásiť na odber konkrétneho oddielu témy s počiatočným odsadením:
@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition (@Payload String message, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Received Message:" + message "+" from partition: "+ partition) ;}
Keďže initialOffset bol odoslaný na 0 v tomto poslucháčovi, všetky predtým spotrebované správy z oddielov 0 a troch sa znova spotrebujú pri každej inicializácii tohto poslucháča. Pokiaľ nastavenie offsetu nie je potrebné, môžeme použiť priečky majetok @TopicPartition anotácia na nastavenie iba oddielov bez odsadenia:
@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitions = {"0", "1"}))
5.4. Pridanie filtra správ pre poslucháčov
Poslucháčov je možné nakonfigurovať tak, aby konzumovali konkrétne typy správ, a to pridaním vlastného filtra. To je možné dosiahnuť nastavením a RecordFilterStrategy do KafkaListenerContainerFactory:
@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (record -> record.value (). contains ("World")); vrátiť továreň; }
Poslucháč potom môže byť nakonfigurovaný na používanie tejto továrne na kontajnery:
@KafkaListener (topic = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter (reťazcová správa) {System.out.println ("Prijatá správa vo filtrovanom poslucháčovi:" + správa); }
V tomto poslucháčovi všetky správy zodpovedajúce filtru budú zahodené.
6. Vlastné prevádzače správ
Zatiaľ sme pokryli iba odosielanie a prijímanie reťazcov ako správ. Môžeme však tiež odosielať a prijímať vlastné objekty Java. To si vyžaduje nakonfigurovať vhodný serializátor v Producentská továreň a deserializátor v ConsumerFactory.
Pozrime sa na jednoduchú triedu fazule, ktoré pošleme ako správy:
public class Greeting {private String msg; súkromné meno reťazca; // štandardní hľadači, zakladatelia a konštruktor}
6.1. Produkcia vlastných správ
V tomto príklade použijeme JsonSerializer. Pozrime sa na kód pre Producentská továreň a Šablóna Kafka:
@Bean public ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); vrátiť nový DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate greetingKafkaTemplate () {return new KafkaTemplate (greetingProducerFactory ()); }
Táto nová Šablóna Kafka možno použiť na odoslanie Pozdravujem vás správa:
kafkaTemplate.send (topicName, new Greeting ("Hello", "World"));
6.2. Spotrebovanie vlastných správ
Podobne upravme ConsumerFactory a KafkaListenerContainerFactory správna deserializácia pozdravu:
@Bean public ConsumerFactory greetingConsumerFactory () {// ... vrátiť nový DefaultKafkaConsumerFactory (rekvizity, nový StringDeserializer (), nový JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory pozdravKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nový ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (greetingConsumerFactory ()); vrátiť továreň; }
Serializátor a deserializátor JSON na jar-kafka používa knižnicu Jackson, ktorá je tiež voliteľnou závislosťou maven pre projekt jar-kafka. Pridajme to teda k nášmu pom.xml:
com.fasterxml.jackson.core jackson-databind 2.9.7
Namiesto použitia najnovšej verzie Jacksona sa odporúča používať verziu, ktorá je pridaná do súboru pom.xml z jari-kafky.
Nakoniec musíme napísať poslucháča, ktorý konzumuje Pozdravujem vás správy:
@KafkaListener (topic = "topicName", containerFactory = "pozdravKafkaListenerContainerFactory") public void greetingListener (pozdrav s pozdravom) {// spracovanie pozdravu}
7. Záver
V tomto článku sme sa venovali základom jarnej podpory pre Apache Kafku. Krátko sme sa pozreli na triedy, ktoré sa používajú na odosielanie a prijímanie správ.
Kompletný zdrojový kód tohto článku nájdete na GitHub. Pred spustením kódu sa uistite, že je spustený server Kafka a témy sú vytvorené ručne.
Perzistencia dno