Úvod do jarného cloudového prúdu

1. Prehľad

Spring Cloud Stream je rámec postavený na vrchole Spring Boot a Spring Integration pomáha pri vytváraní mikroslužieb riadených udalosťami alebo správami.

V tomto článku si predstavíme koncepty a konštrukcie Spring Cloud Stream s niekoľkými jednoduchými príkladmi.

2. Maven závislosti

Na začiatok budeme musieť pridať Spring Cloud Starter Stream so závislosťou sprostredkovateľa RabbitMQ Maven ako middleware pre zasielanie správ pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

A pridáme závislosť modulu z Maven Central, aby sme povolili aj podporu JUnit:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Hlavné koncepty

Architektúra mikroslužieb sa riadi zásadou „inteligentné koncové body a nemé rúry“. Komunikáciu medzi koncovými bodmi riadia strany zaoberajúce sa odosielaním middlewaru ako RabbitMQ alebo Apache Kafka. Služby komunikujú zverejnením doménových udalostí prostredníctvom týchto koncových bodov alebo kanálov.

Poďme sa prejsť pojmami, ktoré tvoria rámec Spring Cloud Stream, spolu so základnými paradigmami, ktoré si musíme uvedomiť, aby sme mohli budovať služby založené na správach.

3.1. Konštruuje

Pozrime sa na jednoduchú službu v Spring Cloud Stream, ktorá počúva vstup záväzné a odošle odpoveď do výkon väzba:

@SpringBootApplication @EnableBinding (Processor.class) verejná trieda MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) public LogMessage enrichLogMessage (LogMessage log) {return new LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

Anotácia @EnableBinding nakonfiguruje aplikáciu tak, aby viazala kanály VSTUP a VÝKON definované v rámci rozhrania procesor. Oba kanály sú väzby, ktoré je možné nakonfigurovať na použitie konkrétneho middlewaru pre správy alebo spojiva.

Pozrime sa na definíciu všetkých týchto pojmov:

  • Viazania - súbor rozhraní, ktoré deklaratívne identifikujú vstupné a výstupné kanály
  • Binder - implementácia messaging-middleware ako Kafka alebo RabbitMQ
  • Kanál - predstavuje komunikačné potrubie medzi messaging-middleware a aplikáciou
  • StreamListeners - metódy spracovania správ v beanoch, ktoré budú automaticky vyvolané na správu z kanála po MessageConverter vykonáva serializáciu / deserializáciu medzi udalosťami špecifickými pre middleware a typmi doménových objektov / POJO
  • Mesšalvia Schémy - používajú sa na serializáciu a deserializáciu správ, tieto schémy je možné staticky načítať z miesta alebo načítať dynamicky, čo podporuje vývoj typov doménových objektov

3.2. Komunikačné vzorce

Správy určené pre destinácie doručuje Publish-Subscribe vzor správ. Vydavatelia kategorizujú správy do tém, z ktorých každá je označená menom. Predplatitelia prejavujú záujem o jednu alebo viac tém. Middleware filtruje správy a doručuje predplatiteľom zaujímavé témy.

Teraz je možné zoskupiť predplatiteľov. A skupina spotrebiteľov je skupina predplatiteľov alebo spotrebiteľov označená a ID skupiny, v rámci ktorého sa správy z témy alebo jej oddielu doručujú spôsobom vyváženým z hľadiska zaťaženia.

4. Programovací model

Táto časť popisuje základy tvorby aplikácií Spring Cloud Stream.

4.1. Funkčné testovanie

Podpora testu je implementácia spojiva, ktorá umožňuje interakciu s kanálmi a kontrolu správ.

Pošleme správu vyššie uvedenému enrichLogMessage služba a skontrolujte, či odpoveď obsahuje text “[1]: “ na začiatku správy:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext verejná trieda MyLoggerApplicationTests {@Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (new LogMessage ("This is my message")) .build ()); Užitočné zaťaženie objektu = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Toto je moja správa", payload.toString ()); }}

4.2. Vlastné kanály

Vo vyššie uvedenom príklade sme použili procesor rozhranie poskytované službou Spring Cloud, ktorá má iba jeden vstupný a jeden výstupný kanál.

Ak potrebujeme niečo iné, napríklad jeden vstupný a dva výstupné kanály, môžeme vytvoriť vlastný procesor:

verejné rozhranie MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @Output MessageChannel anotherOutput (); }

Jar pre nás zabezpečí správnu implementáciu tohto rozhrania. Názvy kanálov je možné nastaviť pomocou anotácií ako v @Output („myOutput“).

V opačnom prípade bude Spring používať názvy metód ako názvy kanálov. Preto máme zavolané tri kanály myInput, myOutputa anotherOutput.

Teraz si predstavme, že chceme smerovať správy na jeden výstup, ak je hodnota menšia ako 10 a do iného výstupu je hodnota väčšia alebo rovná 10:

@Autowired súkromný procesor MyProcessor; @StreamListener (MyProcessor.INPUT) public void routeValues ​​(Integer val) {if (val <10) {processor.anOutput (). Send (message (val)); } else {processor.anotherOutput (). send (message (val)); }} private static final Message message (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Podmienené odoslanie

Pomocou @StreamListener anotáciu, môžeme tiež filtrovať správy, ktoré od spotrebiteľa očakávame pomocou akejkoľvek podmienky, ktorú definujeme pomocou výrazov SpEL.

Ako príklad by sme mohli použiť podmienené dispečing ako ďalší prístup na smerovanie správ na rôzne výstupy:

@Autowired súkromný procesor MyProcessor; @StreamListener (target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput (Integer val) {processor.anotherOutput (). Send (message (val)); }

Jediný obmedzením tohto prístupu je, že tieto metódy nesmú vracať hodnotu.

5. Inštalácia

Poďme nastaviť aplikáciu, ktorá spracuje správu od brokera RabbitMQ.

5.1. Konfigurácia spojiva

Môžeme nakonfigurovať našu aplikáciu tak, aby používala predvolenú implementáciu spojiva pomocou META-INF / jarné spojivá:

králik: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Alebo môžeme pridať knižnicu väzieb pre RabbitMQ do cesty triedy zahrnutím táto závislosť:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

Pokiaľ nie je poskytnutá implementácia spojiva, Spring použije priamu komunikáciu medzi kanálmi.

5.2. Konfigurácia RabbitMQ

Ak chcete nakonfigurovať príklad v časti 3.1 na použitie spojiva RabbitMQ, musíme aktualizovať aplikácia.yml umiestnený na src / main / resources:

jar: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: port : 5672 používateľské meno: heslo: virtuálny hostiteľ: /

The vstup väzba použije burzu tzv fronty.log.správya výkon záväzné použije výmenu front.pretty.log.messages. Obe väzby budú používať spojivo tzv local_rabbit.

Upozorňujeme, že nemusíme vopred vytvárať burzy alebo fronty RabbitMQ. Pri spustení aplikácie obe burzy sa vytvárajú automaticky.

Na otestovanie aplikácie môžeme na zverejnenie správy použiť webovú stránku správy RabbitMQ. V Zverejniť správu panel výmeny fronty.log.správy, musíme zadať požiadavku vo formáte JSON.

5.3. Prispôsobenie prevodu správ

Spring Cloud Stream nám umožňuje použiť konverziu správ pre konkrétne typy obsahu. Vo vyššie uvedenom príklade chceme namiesto použitia formátu JSON poskytnúť obyčajný text.

Aby sme to dosiahli, urobíme to použiť vlastnú transformáciu na LogMessage pomocou a MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) verejná trieda MyLoggerServiceApplication {// ... @Bean public MessageConverter providesTextPlainMessageConverter () {return new TextPlainMessageConverter (); } // ...}
verejná trieda TextPlainMessageConverter rozširuje AbstractMessageConverter {public TextPlainMessageConverter () {super (nový MimeType ("text", "obyčajný")); } @Override chránené logické podpory (Class clazz) {return (LogMessage.class == clazz); } @Override chránený objekt convertFromInternal (správa správy, trieda targetClass, prevod objektuHint) {Object payload = message.getPayload (); String text = užitočné zaťaženie reťazca? (String) užitočné zaťaženie: nový reťazec ((byte []) užitočné zaťaženie); vrátiť novú LogMessage (textovú); }}

Po uplatnení týchto zmien sa vrátime späť na Zverejniť správu panel, ak nastavíme hlavičku “typy obsahu“Až„text / obyčajný“A užitočné zaťaženie do„Ahoj svet“, Malo by to fungovať ako predtým.

5.4. Skupiny spotrebiteľov

Pri spustení viacerých inštancií našej aplikácie zakaždým, keď je na vstupnom kanáli nová správa, budú všetci predplatitelia informovaní.

Väčšinou potrebujeme, aby bola správa spracovaná iba raz. Spring Cloud Stream implementuje toto správanie prostredníctvom skupín spotrebiteľov.

Na umožnenie tohto správania môže každá spotrebiteľská väzba používať znak jarný mrak.prúd.viazania..skupina vlastnosť na zadanie názvu skupiny:

jar: cloud: stream: väzby: vstup: cieľ: queue.log.messages binder: local_rabbit skupina: logMessageConsumers ...

6. Mikroslužby riadené správami

V tejto časti predstavujeme všetky požadované funkcie na spustenie našich aplikácií Spring Cloud Stream v kontexte mikroslužieb.

6.1. Škálovanie

Keď je spustených viac aplikácií, je dôležité zabezpečiť správne rozdelenie údajov medzi spotrebiteľov. Za týmto účelom poskytuje Spring Cloud Stream dve vlastnosti:

  • jar.oblačno.prúd.instanciePočet - počet spustených aplikácií
  • spring.cloud.stream.instanceIndex - index aktuálnej aplikácie

Napríklad, ak sme nasadili dve inštancie vyššie uvedeného MyLoggerServiceApplication žiadosť, vlastníctvo jar.oblačno.prúd.instanciePočet by mala byť 2 pre obidve aplikácie a vlastníctvo spring.cloud.stream.instanceIndex by mali byť 0, respektíve 1.

Tieto vlastnosti sa nastavia automaticky, ak nasadíme aplikácie Spring Cloud Stream pomocou Spring Data Flow, ako je popísané v tomto článku.

6.2. Delenie na oddiely

Udalosti domény môžu byť Rozdelené správ. To pomáha, keď sme zväčšenie úložného priestoru a zlepšenie výkonu aplikácií.

Udalosť domény má zvyčajne kľúč oddielu, takže končí v rovnakom oddiele so súvisiacimi správami.

Povedzme, že chceme, aby boli správy denníka rozdelené podľa prvého písmena v správe, ktorým by bol kľúč oddielu, a boli zoskupené do dvoch oddielov.

Pre správy denníka, ktoré začínajú, by existoval jeden oddiel A-M a ďalší oddiel pre N-Z. Toto je možné nakonfigurovať pomocou dvoch vlastností:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - výraz na rozdelenie užitočného zaťaženia
  • jarný oblak.prúdové_viazania.výstup.výrobca.partitionCount - počet skupín

Niekedy je výraz na rozdelenie príliš zložitý na to, aby sa dal napísať iba do jedného riadku. V týchto prípadoch môžeme pomocou vlastnosti napísať našu vlastnú stratégiu oddielov spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Ukazovateľ zdravia

V kontexte mikroslužieb tiež musíme zistiť, kedy je služba nefunkčná alebo či zlyháva. Ubytovacie zariadenie poskytuje Spring Cloud Stream správa.zdravie.prijímače.povolené povoliť zdravotné ukazovatele pre spojivá.

Pri spustení aplikácie môžeme zisťovať zdravotný stav na //: / zdravie.

7. Záver

V tomto tutoriáli sme predstavili hlavné koncepty Spring Cloud Stream a ukázali sme si, ako ho používať prostredníctvom niekoľkých jednoduchých príkladov cez RabbitMQ. Viac informácií o Spring Cloud Stream nájdete tu.

Zdrojový kód tohto článku nájdete na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found