Testovanie Kafky a Spring Boot

1. Prehľad

Apache Kafka je výkonný, distribuovaný systém spracovania toku odolný voči chybám. V predchádzajúcom tutoriáli sme sa naučili, ako pracovať s Springom a Kafkom.

V tomto návode nadviažeme na ten predchádzajúci a naučíme sa, ako napísať spoľahlivé, samostatné integračné testy, ktoré sa nespoliehajú na spustený externý server Kafka.

Najprv začneme, ale pozrieme sa na to, ako používať a konfigurovať vloženú inštanciu Kafky. Potom uvidíme, ako môžeme z našich testov využiť populárny framework Testcontainers.

2. Závislosti

Samozrejme, budeme musieť pridať štandard jar-kafka závislosť na našom pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.VYDANIE 

Potom budeme špeciálne pre naše testy potrebovať ďalšie dve závislosti. Najskôr pridáme jarna-kafkova-skuska artefakt:

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

A na záver pridáme závislosť Testcontainers Kafka, ktorá je tiež k dispozícii na serveri Maven Central:

 org.testcontainers kafka 1.15.0 test 

Teraz, keď máme nakonfigurované všetky potrebné závislosti, môžeme pomocou Kafky napísať jednoduchú aplikáciu Spring Boot.

3. Jednoduchá aplikácia výrobcu a spotrebiteľa Kafka

V celom tomto výučbe sa zameriame na naše testy na jednoduchú aplikáciu Spring Boot Kafka medzi výrobcom a spotrebiteľom.

Začnime definovaním nášho vstupného bodu aplikácie:

@SpringBootApplication @EnableAutoConfiguration verejná trieda KafkaProducerConsumerApplication {public static void main (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Ako vidíme, jedná sa o štandardnú aplikáciu Spring Boot. Pokiaľ je to možné, chceme využiť predvolené konfiguračné hodnoty. V tejto súvislosti využívame @EnableAutoConfiguration anotácia na automatickú konfiguráciu našej aplikácie.

3.1. Nastavenie výrobcu

Ďalej uvažujme o fazuli producenta, ktorú použijeme na odosielanie správ na danú tému Kafka:

@Component public class KafkaProducer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send (téma reťazca, užitočné zaťaženie reťazca) {LOGGER.info ("odosielanie užitočného zaťaženia =" {} "do témy =" {} "", užitočné zaťaženie, téma); kafkaTemplate.send (téma, užitočné zaťaženie); }}

Náš KafkaVýrobca fazuľa definovaná vyššie je iba obalom okolo Šablóna Kafka trieda. Táto trieda poskytuje operácie bezpečné na vysokej úrovni pre vlákna, napríklad odosielanie údajov na zadanú tému, čo je presne to, čo robíme v našom poslať metóda.

3.2. Spotrebiteľské nastavenie

Rovnako teraz definujeme jednoduchý fazuľa spotrebiteľa, ktorá bude počúvať tému Kafka a bude dostávať správy:

@Component public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); súkromná západka CountDownLatch = nová CountDownLatch (1); private String užitočné zaťaženie = null; @KafkaListener (topic = "$ {test.topic}") public void receive (ConsumerRecord consumerRecord) {LOGGER.info ("receive payload =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); latch.countDown (); } public CountDownLatch getLatch () {návratová západka; } public String getPayload () {návrat užitočné zaťaženie; }}

Náš jednoduchý spotrebiteľ používa @KafkaListener anotácia k prijímať metóda počúvania správ na danú tému. Uvidíme neskôr, ako nakonfigurujeme test.topický z našich testov.

Ďalej metóda prijímania ukladá obsah správy do našej fazule a znižuje počet západka premenná. Táto premenná je jednoduché pole počítadla bezpečné pre vlákna, ktoré použijeme neskôr z našich testov, aby sme sa uistili, že sme úspešne dostali správu.

Teraz, keď máme implementovanú našu jednoduchú aplikáciu Kafka pomocou Spring Boot, pozrime sa, ako môžeme písať integračné testy.

4. Slovo o testovaní

Všeobecne by sme pri písaní čistých integračných testov nemali závisieť od externých služieb, ktoré by sme nemuseli ovládať alebo by mohli náhle prestať fungovať.. To by mohlo mať nepriaznivé účinky na výsledky našich testov.

Podobne, ak sme závislí od externej služby, v tomto prípade od bežiaceho sprostredkovateľa Kafka, pravdepodobne ju nebudeme schopní nastaviť, ovládať a zbúrať tak, ako by sme chceli, z našich testov.

4.1. Vlastnosti aplikácie

Z našich testov použijeme veľmi ľahkú sadu vlastností konfigurácie aplikácie. Tieto vlastnosti si zadefinujeme v našom src / test / resources / application.yml spis:

jar: kafka: spotrebiteľ: auto-offset-reset: najskoršie ID skupiny: test baeldung: téma: téma vloženého testu

Toto je minimálna sada vlastností, ktoré potrebujeme pri práci s vloženou inštanciou Kafky alebo miestnym sprostredkovateľom.

Väčšina z nich je samozrejmých, ale ten, ktorý by sme mali osobitne zdôrazniť, je spotrebiteľské vlastníctvo auto-offset-reset: najskôr. Táto vlastnosť zaisťuje, že naša skupina spotrebiteľov dostane správy, ktoré odosielame, pretože kontajner sa môže spustiť po dokončení odosielania.

Ďalej nakonfigurujeme vlastnosť témy s hodnotou embedded-test-topic, čo je téma, ktorú použijeme z našich testov.

5. Testovanie pomocou zabudovanej Kafky

V tejto časti sa pozrieme na to, ako použiť inštanciu Kafka v pamäti na spustenie našich testov. Toto je tiež známe ako Embedded Kafka.

Závislosť jarna-kafkova-skuska predtým sme pridali niekoľko užitočných nástrojov na pomoc s testovaním našej aplikácie. Najdôležitejšie je, že obsahuje EmbeddedKafkaBroker trieda.

S týmto vedomím poďme ďalej a napíšme náš prvý test integrácie:

@SpringBootTest @DirtiesContext @EmbeddedKafka (partitions = 1, brokerProperties = {"listers = PLAINTEXT: // localhost: 9092", "port = 9092"}) trieda EmbeddedKafkaIntegrationTest {@Autowired súkromný spotrebiteľ KafkaConsumer; @Autowired private KafkaProducer producent; @Value ("$ {test.topic}") súkromná téma reťazca; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () vyvolá výnimku {producer.send (téma, "Odosielanie pomocou vlastného jednoduchého programu KafkaProducer"); consumer.getLatch (). await (10 000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Prejdime si kľúčové časti nášho testu. Najprv začneme zdobením našej testovacej triedy dvoma celkom štandardnými jarnými anotáciami:

  • The @SpringBootTest anotácia zabezpečí, že náš testovací bootstrap zachytí kontext aplikácie Spring
  • Používame tiež @DirtiesContext anotácia, ktorá zabezpečí, že sa tento kontext vyčistí a vynuluje medzi rôznymi testami

Tu prichádza rozhodujúca časť, ktorú používame @EmbeddedKafka anotácia na vloženie inštancie súboru EmbeddedKafkaBroker do našich testov. Okrem toho je k dispozícii niekoľko vlastností, ktoré môžeme použiť na konfiguráciu vloženého uzla Kafka:

  • priečky - toto je počet oddielov použitých na tému. Aby boli veci pekné a jednoduché, chceme, aby sa z našich testov používal iba jeden
  • brokerVlastnosti - ďalšie vlastnosti pre sprostredkovateľa Kafka. Znovu udržujeme veci jednoduché a určujeme poslucháča obyčajného textu a číslo portu

Ďalej automaticky drôtujeme naše spotrebiteľ a producent triedy a nakonfigurovať tému tak, aby používala hodnotu z nášho application.properties.

Za posledný kúsok skladačky jednoducho pošleme správu na našu testovaciu tému a overíme si, že správa bola prijatá a obsahuje názov našej testovacej témy.

Keď spustíme náš test, uvidíme medzi podrobným výstupom Spring:

... 12: 45: 35.099 [main] INFO cbkafka.embedded.KafkaProducer - odosielanie užitočného zaťaženia = "Odosielanie pomocou nášho vlastného jednoduchého programu KafkaProducer" do topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - prijaté užitočné zaťaženie = 'ConsumerRecord (topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, veľkosť serializovaného kľúča = -1, veľkosť serializovanej hodnoty = 41, hlavičky = RecordHeaders (hlavičky = [], isReadOnly = false),  key = null, value = Sending with our own simple KafkaProducer) ' 

To potvrdzuje, že náš test funguje správne. Úžasné! Teraz máme spôsob, ako písať samostatné, nezávislé integračné testy pomocou sprostredkovateľa Kafka v pamäti.

6. Testovanie Kafky pomocou TestContainers

Niekedy môžeme vidieť malé rozdiely medzi skutočnou externou službou a vloženou inštanciou služby v pamäti, ktorá bola špeciálne poskytnutá na účely testovania. Aj keď je to nepravdepodobné, mohlo by sa tiež stať, že port použitý v našom teste môže byť obsadený, čo spôsobí poruchu.

Z tohto dôvodu v tejto časti uvidíme variáciu nášho predchádzajúceho prístupu k testovaniu pomocou rámca Testcontainers. Uvidíme, ako z nášho integračného testu vytvoriť inštanciu a spravovať externého sprostredkovateľa Apache Kafka hosteného vo vnútri kontajnera Docker.

Poďme definovať ďalší integračný test, ktorý bude dosť podobný tomu, ktorý sme videli v predchádzajúcej časti:

@RunWith (SpringRunner.class) @import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (triedy = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest {@ClassRule public static KafkaContainer Kafka = nový KafkaContainer (DockerImageName .parse ("confluentinc / cp-kafka: 5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producent; @Value ("$ {test.topic}") súkromná téma reťazca; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () vyvolá výnimku {producer.send (téma, "Odosielanie s vlastným radičom"); consumer.getLatch (). await (10 000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Poďme sa teraz pozrieť na rozdiely. Vyhlasujeme kafka pole, čo je štandardná JUnit @ClassRule. Toto pole je inštanciou súboru Kontajner Kafka triedy, ktorá pripraví a bude riadiť životný cyklus nášho kontajnera s prevádzkou Kafka.

Aby sa zabránilo kolíziám portov, Testcontainers pridelí číslo portu dynamicky, keď sa spustí náš kontajner ukotvenia. Z tohto dôvodu poskytujeme zákaznícku a výrobnú továrenskú konfiguráciu pomocou triedy Konfigurácia KafkaTestContainers:

@Bean public Map consumerConfigs () {Map rekvizity = nový HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, „najskoršie“); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // viac štandardných rekvizít na vrátenie konfigurácie; } @Bean public ProducerFactory producerFactory () {Map configProps = nový HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // viac štandardnej konfigurácie vráti new DefaultKafkaProducerFactory (configProps); }

Na túto konfiguráciu potom odkazujeme prostredníctvom servera @Import anotáciu na začiatku nášho testu.

Dôvod je ten, že potrebujeme spôsob, ako vložiť adresu servera do našej aplikácie, ktorá sa, ako už bolo spomenuté, generuje dynamicky. Dosiahneme to volaním getBootstrapServers () metóda, ktorá vráti umiestnenie bootstrap servera:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Teraz, keď spustíme náš test, mali by sme vidieť, že Testcontainers robí niekoľko vecí:

  • Skontroluje naše miestne nastavenie Dockeru.
  • Potiahne confluentinc / cp-kafka: 5.4.3 obrázok ukotvenia, ak je to potrebné
  • Spustí nový kontajner a čaká, až bude pripravený
  • Nakoniec po ukončení nášho testu vypne a vymaže kontajner

Opäť sa to potvrdzuje kontrolou výstupného testu:

13: 33: 10.396 [hlavné] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Vytvorenie kontajnera pre obrázok: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [hlavné] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - Počnúc nádobu s ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13: 33: 10,785 [hlavný] INFO 🐳 [confluentinc / CP-Kafka: 5.4.3] - kontajner confluentinc / CP-Kafka: 5.4.3 je predvolený: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! Fungujúci test integrácie využívajúci kontajner ukotvenia Kafka.

7. Záver

V tomto článku sme sa dozvedeli o niekoľkých prístupoch k testovaniu aplikácií Kafka pomocou nástroja Spring Boot. V prvom prístupe sme videli, ako nakonfigurovať a používať lokálneho sprostredkovateľa Kafka v pamäti.

Potom sme videli, ako použiť Testcontainers na nastavenie externého sprostredkovateľa Kafka bežiaceho vo vnútri dokovacieho kontajnera z našich testov.

Celý zdrojový kód článku je ako vždy k dispozícii na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found