Apache RocketMQ s Spring Boot

1. Úvod

V tomto tutoriáli vytvoríme producenta a spotrebiteľa správ pomocou platformy Spring Boot a Apache RocketMQ, čo je open-source platforma pre distribuované zasielanie správ a streamovanie údajov.

2. Závislosti

Pre projekty Maven musíme pridať závislosť RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Produkcia správ

Pre náš príklad vytvoríme základného producenta správ, ktorý bude odosielať udalosti vždy, keď používateľ pridá alebo odstráni položku z nákupného košíka.

Najskôr nastavíme umiestnenie nášho servera a názov skupiny v našom application.properties:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = skupina výrobcov-vozidiel

Upozorňujeme, že ak by sme mali viac ako jeden menný server, mohli by sme ich uviesť v zozname rád hostiteľ: port; hostiteľ: port.

Aby sme to zjednodušili, vytvoríme teraz CommandLineRunner a počas spustenia aplikácie vygenerovať niekoľko udalostí:

@SpringBootApplication verejná trieda CartEventProducer implementuje CommandLineRunner {@Autowired private RocketMQTemplate rocketMQTemplate; public static void main (String [] args) {SpringApplication.run (CartEventProducer.class, args); } public void run (String ... args) hodí Exception {rocketMQTemplate.convertAndSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)); rocketMQTemplate.convertAndSend ("cart-item-add-topic", nový CartItemEvent ("počítač", 2)); rocketMQTemplate.convertAndSend ("cart-item-removed-topic", new CartItemEvent ("bike", 1)); }}

The CartItemEvent pozostáva iba z dvoch vlastností - id položky a množstvo:

trieda CartItemEvent {private String itemId; súkromné ​​množstvo; // konštruktor, getri a nastavovatelia}

Vo vyššie uvedenom príklade používame convertAndSend () metóda, všeobecná metóda definovaná AbstractMessageSendingTemplate abstraktná trieda, na odoslanie udalostí nášho košíka. Trvá to dva parametre: Cieľ, ktorým je v našom prípade názov témy, a užitočné zaťaženie správy.

4. Spotrebiteľ správy

Spotrebovanie správ RocketMQ je také jednoduché ako vytváranie komponentu Spring s poznámkami @RocketMQMessageListener a implementácia RocketMQListener rozhranie:

@SpringBootApplication verejná trieda CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") verejná trieda CardItemAddConsumer implementuje RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Pridávanie položky: {}", addItemEvent); // additional logic}} @Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") verejná trieda CardItemRemoveConsumer implementuje RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Odstraňuje sa položka: {}", removeItemEvent); // ďalšia logika}}}

Musíme vytvoriť samostatný komponent pre každú tému správ, ktorú počúvame. V každom z týchto poslucháčov definujeme názov témy a názov skupiny spotrebiteľov prostredníctvom @RocketMQMessageListener anotácia.

5. Synchrónny a asynchrónny prenos

V predchádzajúcich príkladoch sme použili convertAndSend spôsob odosielania našich správ. Máme však niekoľko ďalších možností.

Mohli sme napríklad zavolať syncSend čo sa líši od convertAndSend pretože sa vracia SendResult objekt.

Môže sa použiť napríklad na overenie, či bola naša správa úspešne odoslaná, alebo na získanie jej ID:

public void run (String ... args) vyvolá Výnimku {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", new CartItemEvent ("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("cart-item-removed-topic", new CartItemEvent ("bike", 1)); }

Páči sa mi to convertAndSend, táto metóda sa vráti iba po dokončení postupu odosielania.

Synchrónny prenos by sme mali používať v prípadoch vyžadujúcich vysokú spoľahlivosť, ako sú dôležité notifikačné správy alebo SMS notifikácie.

Na druhej strane môžeme chcieť odoslať správu asynchrónne a po dokončení odosielania dostať upozornenie.

Môžeme to urobiť pomocou asyncSend, ktorá trvá a SendCallback ako parameter a vráti sa okamžite:

rocketMQTemplate.asyncSend ("cart-item-add-topic", nový CartItemEvent ("bike", 1), nový SendCallback () {@Override public void onSuccess (SendResult sendResult) {log.error ("Úspešne odoslaná položka košíka") ;} @Override public void onException (Throwable throwable) {log.error ("Výnimka počas odosielania položky košíka", throwable);}});

Asynchrónny prenos používame v prípadoch vyžadujúcich vysokú priepustnosť.

Nakoniec môžeme použiť scenáre, kde máme veľmi vysoké požiadavky na priepustnosť sendOneWay namiesto asyncSend. sendOneWay sa líši od asyncSend v tom, že nezaručuje odoslanie správy.

Jednosmerný prenos je možné použiť aj na bežné prípady spoľahlivosti, napríklad na zhromažďovanie protokolov.

6. Posielanie správ v transakcii

RocketMQ nám poskytuje možnosť odosielať správy v rámci transakcie. Môžeme to urobiť pomocou sendInTransaction () metóda:

MessageBuilder.withPayload (new CartItemEvent ("bike", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("test-transakcia", "topic-name", msg, null);

Musíme tiež implementovať a RocketMQLocalTransactionListener rozhranie:

@RocketMQTransactionListener (txProducerGroup = "test-transaction") trieda TransactionListenerImpl implementuje RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) {// ... lokálny transakčný proces, návrat ROLLBACK, COMMIT UNOUNKONATOLÍCIA, COMMITUNOUNKNÁVKA, COMMITUNOUNKNIHÁTKA, COMMITUNOUNKNIČKA, COMMIT alebo } @Override public RocketMQLocalTransactionState checkLocalTransaction (Správa msg) {// ... skontrolovať stav transakcie a vrátiť ROLLBACK, COMMIT alebo UNKNOWN vrátiť RocketMQLocalTransactionState.COMMIT; }}

V sendMessageInTransaction (), prvým parametrom je názov transakcie. Musí to byť to isté ako @RocketMQTransactionListenerČlenské pole txProducerGroup.

7. Konfigurácia producenta správ

Môžeme tiež nakonfigurovať aspekty samotného producenta správ:

  • rocketmq.producer.send-message-timeout: Časový limit odoslania správy v milisekundách - predvolená hodnota je 3 000
  • rocketmq.producer.compress-message-body-threshold: Prahová hodnota, nad ktorou RocketMQ komprimuje správy - predvolená hodnota je 1024.
  • rocketmq.producer.max-message-size: Maximálna veľkosť správy v bajtoch - predvolená hodnota je 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: Maximálny počet pokusov o interné vykonanie v asynchrónnom režime pred odoslaním zlyhania - predvolená hodnota je 2.
  • rocketmq.producer.retry-next-server: Označuje, či sa má pokúsiť skúsiť iného sprostredkovateľa pri internom odoslaní zlyhania - predvolená hodnota je nepravdivé.
  • rocketmq.producer.retry-times-when-send-failed: Maximálny počet pokusov o interné vykonanie v asynchrónnom režime pred odoslaním zlyhania - predvolená hodnota je 2.

8. Záver

V tomto článku sme sa naučili, ako odosielať a spotrebúvať správy pomocou serverov Apache RocketMQ a Spring Boot. Ako vždy je všetok zdrojový kód k dispozícii na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found