Odosielanie správ RabbitMQ s jarným AMQP

1. Úvod

V tomto tutoriáli preskúmame koncept fanout a výmeny tém s jarnými AMQP a RabbitMQ.

Na vysokej úrovni fanoutové výmeny bude vysielať rovnakú správu do všetkých viazaných frontov, zatiaľ čo výmeny tém použiť smerovací kľúč pre odovzdávanie správ do konkrétneho viazaného frontu alebo frontov.

V tomto výučbe sa odporúča predchádzajúce čítanie správ s jarným programom AMQP.

2. Nastavenie výmeny fanoutov

Pripravme jednu výmenu fanoutu s dvoma radmi, ktoré sú k nej pripojené. Keď pošleme správu na túto burzu, obidve fronty správu dostanú. Naša výmenná služba fanout ignoruje akýkoľvek smerovací kľúč zahrnutý v správe.

Jarný AMQP nám umožňuje agregovať všetky vyhlásenia o frontoch, výmenách a väzbách do a Deklarovateľné objekt:

@Bean public Declarables fanoutBindings () {Queue fanoutQueue1 = new Queue ("fanout.queue1", false); Fronta fanoutQueue2 = nová fronta ("fanout.queue2", false); FanoutExchange fanoutExchange = nový FanoutExchange ("fanout.exchange"); vrátiť nové Declarables (fanoutQueue1, fanoutQueue2, fanoutExchange, bind (fanoutQueue1) .to (fanoutExchange), BindingBuilder.bind (fanoutQueue2) .to (fanoutExchange)); }

3. Nastavenie výmeny tém

Teraz tiež nastavíme výmenu tém s dvoma radmi, z ktorých každá má iný vzor väzby:

@Bean public Declarables topicBindings () {Queue topicQueue1 = nový front (topicQueue1Name, false); Fronta topicQueue2 = nový front (topicQueue2Name, false); TopicExchange topicExchange = nový TopicExchange (topicExchangeName); vrátiť nové Declarables (topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind (topicQueue1) .to (topicExchange) .with ("*. important. *"), BindingBuilder .bind (topicQueue2) .to (topicExchange) .with ("#. chyba")); }

Výmena tém nám umožňuje viazať na ňu fronty s rôznymi kľúčovými vzormi. Toto je veľmi flexibilné a umožňuje nám viazať viac frontov s rovnakým vzorom alebo dokonca viac vzorov do rovnakého radu.

Keď sa smerovací kľúč správy zhoduje so vzorom, bude umiestnený v poradí. Ak má front viac väzieb, ktoré sa zhodujú so smerovacím kľúčom správy, do frontu sa umiestni iba jedna kópia správy.

Naše vzory väzieb môžu používať hviezdičku („*“) na priradenie slova v konkrétnej pozícii alebo znak libry („#“) na priradenie nula alebo viacerých slov.

Takže náš topicQueue1 bude dostávať správy, ktoré majú smerovacie klávesy s trojslovným vzorom, pričom stredné slovo je „dôležité“ - napríklad: „User.important.error“ alebo „Blog.important.notification“.

A náš topicQueue2 bude dostávať správy, ktoré majú smerovacie kľúče končiace slovom chyba; zodpovedajúce príklady sú "chyba", „User.important.error“ alebo „Blog.post.save.error“.

4. Nastavenie producenta

Použijeme convertAndSend metóda KrálikŠablóna poslať naše vzorové správy:

 String message = "užitočné zaťaženie sa vysiela"; return args -> {rabbitTemplate.convertAndSend (FANOUT_EXCHANGE_NAME, "", "fanout" + správa); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, „téma dôležité varovať“ + správa); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, „téma dôležitá chyba“ + správa); };

The KrálikŠablóna poskytuje mnoho preťažených convertAndSend () metódy pre rôzne typy výmeny.

Keď pošleme správu na fanoutovú ústredňu, smerovací kľúč sa ignoruje a správa sa odošle do všetkých viazaných frontov.

Keď pošleme správu na burzu tém, musíme odovzdať smerovací kľúč. Na základe tohto smerovacieho kľúča bude správa doručená do konkrétnych frontov.

5. Konfigurácia spotrebiteľov

Nakoniec vytvorme štyroch spotrebiteľov - jedného pre každú frontu - na vyzdvihnutie vytvorených správ:

 @RabbitListener (fronty = {FANOUT_QUEUE_1_NAME}) verejné neplatné receiveMessageFromFanout1 (reťazcová správa) {System.out.println ("Prijatá správa pre fanout 1:" + správa); } @RabbitListener (fronty = {FANOUT_QUEUE_2_NAME}) verejné void receiveMessageFromFanout2 (reťazcová správa) {System.out.println ("Prijatá správa pre fanout 2:" + správa); } @RabbitListener (queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1 (reťazcová správa) {System.out.println ("Prijatá téma 1 (" + BINDING_PATTERN_IMPORTANT + ") správa:" + správa); } @RabbitListener (queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2 (reťazcová správa) {System.out.println ("Prijatá téma 2 (" + BINDING_PATTERN_ERROR + ") správa:" + správa); }

Spotrebiteľov konfigurujeme pomocou @RabbitListener anotácia. Jediným argumentom, ktorý sa tu predloží, je názov frontu. Spotrebitelia tu nie sú informovaní o výmene ani smerovacích kľúčoch.

6. Spustenie príkladu

Náš vzorový projekt je aplikácia Spring Boot, a tak inicializuje aplikáciu spolu s pripojením k RabbitMQ a nastaví všetky fronty, výmeny a väzby.

V predvolenom nastavení naša aplikácia očakáva inštanciu RabbitMQ bežiacu na localhost na porte 5672. Toto a ďalšie predvolené hodnoty môžeme upraviť v aplikácia.yaml.

Náš projekt vystavuje koncový bod HTTP na URI - / vysielať - ktorý prijíma POST so správou v tele žiadosti.

Keď pošleme požiadavku na tento URI s textom „Test“, mali by sme vo výstupe vidieť niečo podobné ako toto:

Prijatá správa fanout 1: vysielané užitočné zaťaženie fanout Prijatá správa témy 1 (*. Dôležité. *): Správa dôležité témy varovať užitočné zaťaženie Vysiela sa správa prijatá téma 2 (# .error): je odosielaná dôležitá správa dôležitej chyby Prijatá správa fanout 2: užitočné zaťaženie fanout je vysielaná Prijatá správa s témou 1 (*. dôležité. *): je vysielaná dôležitá téma s dôležitou chybou

Poradie, v ktorom sa tieto správy zobrazia, samozrejme nie je zaručené.

7. Záver

V tomto rýchlom výučbe sme sa venovali výmenám fanout a tém s Spring AMQP a RabbitMQ.

Kompletný zdrojový kód a všetky útržky kódu pre tento tutoriál sú k dispozícii v úložisku GitHub.


$config[zx-auto] not found$config[zx-overlay] not found