Reaktívne systémy v Jave

1. Úvod

V tomto tutoriále pochopíme základy vytvárania reaktívnych systémov v Jave pomocou Spring a ďalších nástrojov a rámcov.

V tomto procese budeme diskutovať o tom, ako je reaktívne programovanie iba hnacou silou pri vytváraní reaktívneho systému. To nám pomôže pochopiť dôvody pre vytváranie reaktívnych systémov a rôznych špecifikácií, knižníc a štandardov, ktoré pri tom inšpirovala.

2. Čo sú reaktívne systémy?

Za posledných niekoľko desaťročí došlo v technologickom prostredí k niekoľkým prerušeniam, ktoré viedli k úplnej transformácii spôsobu, akým vidíme hodnotu v technológii. Výpočtový svet pred internetom si nikdy nemohol predstaviť spôsoby a prostriedky, ktorými zmení náš dnešný deň.

Vďaka dosahu internetu na masy a neustále sa vyvíjajúcim skúsenostiam, ktoré sľubuje, musia byť aplikační architekti v strehu, aby uspokojili ich dopyt.

V zásade to znamená, že nikdy nemôžeme navrhnúť aplikáciu tak, ako sme boli zvyknutí predtým. A vysoko responzívna aplikácia už nie je luxusom, ale nutnosťou.

Aj to čelí náhodným poruchám a nepredvídateľnému zaťaženiu. Potreba hodiny nie je len dosiahnuť správny výsledok, ale dosiahnuť ju aj rýchlo! Je dosť dôležité riadiť úžasné používateľské zážitky, ktoré sľubujeme.

To je to, čo vytvára potrebu architektonického štýlu, ktorý nám môže dať reaktívne systémy.

2.1. Reaktívny manifest

V roku 2013 tím vývojárov pod vedením Jonasa Bonera sa spojil, aby definoval súbor základných princípov v dokumente známom ako Reaktívny manifest. To je to, čo položilo základ pre štýl architektúry pre vytvorenie Reactive Systems. Od tej doby tento manifest získal veľký záujem komunity vývojárov.

Tento dokument v zásade predpisuje recept na to, aby bol reaktívny systém flexibilný, voľne spojený a škálovateľný. Vďaka tomu sa tieto systémy ľahko vyvíjajú, sú odolné voči poruchám a hlavne sú veľmi pohotové, čo je základom pre neuveriteľné používateľské skúsenosti.

Aký je teda tento tajný recept? No, sotva to je tajomstvom! Manifest definuje základné charakteristiky alebo princípy reaktívneho systému:

  • Reagovať: Reaktívny systém by mal poskytovať rýchly a konzistentný čas odozvy, a teda konzistentnú kvalitu služieb
  • Odolný: Reaktívny systém by mal zostať citlivý v prípade náhodných porúch prostredníctvom replikácie a izolácie
  • Elastické: Takýto systém by mal zostať citlivý pri nepredvídateľnom zaťažení prostredníctvom nákladovo efektívnej škálovateľnosti
  • Správa riadená: Mal by sa spoliehať na asynchrónne posielanie správ medzi komponentmi systému

Tieto princípy znejú jednoducho a rozumne, ale implementácia v zložitej podnikovej architektúre nie je vždy ľahšia. V tomto tutoriáli vyvinieme vzorový systém v Jave s ohľadom na tieto princípy!

3. Čo je reaktívne programovanie?

Než budeme pokračovať, je dôležité pochopiť rozdiel medzi reaktívnym programovaním a reaktívnymi systémami. Oba tieto výrazy používame pomerne často a ľahko chápeme jeden za druhého nesprávne. Ako sme už videli skôr, reaktívne systémy sú výsledkom špecifického architektonického štýlu.

Naproti tomu reaktívne programovanie je paradigma programovania, v ktorej sa kladie dôraz na vývoj asynchrónnych a neblokujúcich komponentov. Jadrom reaktívneho programovania je dátový tok, ktorý môžeme pozorovať a reagovať na neho, dokonca môžeme na neho tiež vyvinúť protitlak. To vedie k neblokujúcemu spusteniu, a tým k lepšej škálovateľnosti s menším počtom vlákien vykonania.

To teraz neznamená, že reaktívne systémy a reaktívne programovanie sa navzájom vylučujú. Reaktívne programovanie je v skutočnosti dôležitým krokom k realizácii reaktívneho systému, ale nie je to všetko!

3.1. Reaktívne prúdy

Reactive Streams je komunitná iniciatíva, ktorá začala v roku 2013 až poskytujú štandard pre spracovanie asynchrónneho toku s neblokujúcim spätným tlakom. Cieľom bolo definovať množinu rozhraní, metód a protokolov, ktoré môžu popisovať potrebné operácie a entity.

Odvtedy sa objavilo niekoľko implementácií vo viacerých programovacích jazykoch, ktoré zodpovedajú špecifikácii reaktívnych prúdov. Patria sem napríklad Akka Streams, Ratpack a Vert.x.

3.2. Reaktívne knižnice pre Javu

Jedným z počiatočných cieľov za reaktívnymi prúdmi malo byť nakoniec zahrnutie ako oficiálna štandardná knižnica Java. Vo výsledku je špecifikácia reaktívnych tokov sémanticky ekvivalentná s knižnicou Java Flow zavedenou v prostredí Java 9.

Okrem toho existuje niekoľko populárnych možností implementácie reaktívneho programovania v Jave:

  • Reaktívne rozšírenia: Populárne známe ako ReactiveX, poskytujú API pre asynchrónne programovanie so sledovateľnými prúdmi. Sú k dispozícii pre viaceré programovacie jazyky a platformy, vrátane Javy, ktorá je známa ako RxJava
  • Projektový reaktor: Toto je ďalšia reaktívna knižnica založená na špecifikácii reaktívnych prúdov zameraná na budovanie neaplikácií na JVM. Stáva sa tiež základom reaktívneho komína v jarnom ekosystéme

4. Jednoduchá aplikácia

Na účely tohto tutoriálu vyvinieme jednoduchú aplikáciu založenú na architektúre mikroslužieb s minimálnym rozhraním. Architektúra aplikácie by mala mať dostatok prvkov na vytvorenie reaktívneho systému.

Pre našu aplikáciu prijmeme end-to-end reaktívne programovanie a ďalšie vzory a nástroje na dosiahnutie základných charakteristík reaktívneho systému.

4.1. Architektúra

Začneme definíciou jednoduchá aplikačná architektúra, ktorá nemusí nevyhnutne vykazovať vlastnosti reaktívnych systémov. Ďalej budeme robiť potrebné zmeny, aby sme tieto vlastnosti dosiahli jeden po druhom.

Najprv teda začnime definovaním jednoduchej architektúry:

Jedná sa o celkom jednoduchú architektúru, ktorá má veľa mikroslužieb na uľahčenie obchodného použitia, kde môžeme zadať objednávku. Má tiež rozhranie pre užívateľskú skúsenosť a všetka komunikácia prebieha ako REST cez HTTP. Okrem toho každá mikroslužba spravuje svoje údaje v jednotlivých databázach, čo je postup známy ako databáza podľa služieb.

Pokračujeme ďalej a v nasledujúcich pododdieloch vytvoríme túto jednoduchú aplikáciu. Toto bude naše základne pre pochopenie omylov tejto architektúry a spôsoby a prostriedky na prijatie zásad a postupov, aby sme to mohli transformovať do reaktívneho systému.

4.3. Mikroslužba zásob

Mikroslužba zásob bude zodpovedný za správu zoznamu produktov a ich aktuálnych zásob. Umožní to tiež zmenu stavu zásob pri spracovávaní objednávok. Na vývoj tejto služby použijeme Spring Boot s MongoDB.

Začnime definovaním kontrolóra, ktorý odhalí niektoré koncové body:

@GetMapping public List getAllProducts () {return productService.getProducts (); } @PostMapping public Order processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping public Order revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

a služba na zapuzdrenie našej obchodnej logiky:

@Transactional public Order handleOrder (objednávka) produkt: "+ l.getProductId ())); if (p.getStock ()> = l.getQuantity ()) {p.setStock (p.getStock () - l.getQuantity ()); productRepository.save ( p);} else {throw new RuntimeException ("Product is out of stock:" + l.getProductId ());}}); vrátiť order.setOrderStatus (OrderStatus.SUCCESS); } @Transactional public Order revertOrder (Order order) {order.getLineItems () .forEach (l -> {Product p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Could not find produkt: "+ l.getProductId ())); p.setStock (p.getStock () + l.getQuantity ()); productRepository.save (p);}); vrátiť order.setOrderStatus (OrderStatus.SUCCESS); }

Všimnite si, že sme pretrvávanie subjektov v rámci transakcie, ktorá zaručuje, že v prípade výnimiek nedôjde k nekonzistentnému stavu.

Okrem toho budeme musieť definovať aj entity domény, rozhranie úložiska a množstvo konfiguračných tried potrebných na to, aby všetko fungovalo správne.

Pretože sa ale jedná väčšinou o štandardné verzie, nebudeme sa nimi prechádzať a dá sa o nich hovoriť v úložisku GitHub poskytnutom v poslednej časti tohto článku.

4.4. Prepravná mikroslužba

Mikroslužba na prepravu sa tiež nebude veľmi líšiť. Toto bude zodpovedný za kontrolu, či je možné pre objednávku vygenerovať zásielku a vytvorte ho, ak je to možné.

Rovnako ako predtým definujeme kontrolór, ktorý odhalí naše koncové body, v skutočnosti iba jeden koncový bod:

@PostMapping verejný proces objednávky (@RequestBody objednávka objednávky) {return shippingService.handleOrder (objednávka); }

a služba na zapuzdrenie obchodnej logiky súvisiacej so zásielkou objednávky:

public Order handleOrder (objednávka objednávky) {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime.now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now () .plusDays (1); } else {hodiť novú RuntimeException ("Aktuálny čas je mimo limitov na zadanie objednávky."); } shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate)); vrátiť order.setShippingDate (shippingDate) .setOrderStatus (OrderStatus.SUCCESS); }

Naša jednoduchá prepravná služba iba kontroluje platné časové okno na zadávanie objednávok. Vyhneme sa diskusii o zvyšku štandardného kódu ako predtým.

4.5. Objednajte si Microservice

Nakoniec definujeme mikroslužbu objednávky, ktorá bude zodpovedný za vytvorenie novej objednávky okrem iného. Je zaujímavé, že bude hrať aj ako služba orchestrátora, kde bude komunikovať so službou zásob a prepravnou službou pre objednávku.

Definujme náš kontrolór s požadovanými koncovými bodmi:

@PostMapping verejná objednávka vytvorenie (@RequestBody objednávka objednávky) {Objednávka spracovanáOrder = orderService.createOrder (objednávka); if (OrderStatus.FAILURE.equals (processingOrder.getOrderStatus ())) {hodiť novú RuntimeException ("Spracovanie objednávky zlyhalo, skúste to znova neskôr."); } návrat spracovanýObjednávka; } @GetMapping verejný zoznam getAll () {návrat orderService.getOrders (); }

A služba na zapuzdrenie obchodnej logiky súvisiacej s objednávkami:

public Order createOrder (Order order) {boolean success = true; Objednávka savedOrder = orderRepository.save (objednávka); Objednávka inventoryResponse = null; try {inventoryResponse = restTemplate.postForObject (inventoryServiceUrl, order, Order.class); } catch (Exception ex) {success = false; } Objednať shippingResponse = null; try {shippingResponse = restTemplate.postForObject (shippingServiceUrl, order, Order.class); } catch (Exception ex) {success = false; HttpEntity deleteRequest = nový HttpEntity (poradie); ResponseEntity deleteResponse = restTemplate.exchange (inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } if (success) {savedOrder.setOrderStatus (OrderStatus.SUCCESS); savedOrder.setShippingDate (shippingResponse.getShippingDate ()); } else {savedOrder.setOrderStatus (OrderStatus.FAILURE); } vrátiť orderRepository.save (savedOrder); } public List getOrders () {return orderRepository.findAll (); }

Vybavovanie objednávok, pri ktorých organizujeme hovory do skladových a prepravných služieb, nie je ani zďaleka ideálne. Distribuovaný transakcie s viacerými mikroslužbami je komplexná téma sama osebe a presahuje rámec tohto tutoriálu.

Neskôr v tomto tutoriále však uvidíme, ako sa reaktívny systém môže do istej miery vyhnúť potrebe distribuovaných transakcií.

Rovnako ako predtým, nebudeme prechádzať zvyškom štandardného kódu. Na to sa však dá odkazovať v repo GitHub.

4.6. Front-end

Aby bola diskusia kompletná, pridajme tiež používateľské rozhranie. Užívateľské rozhranie bude založené na Angular a bude to jednoduchá jednostránková aplikácia.

Budeme musieť vytvorte jednoduchý komponent v Angular na zvládnutie vytvárania a načítania objednávok. Osobitne dôležitá je časť, kde na vytvorenie objednávky voláme naše API:

createOrder () {let headers = new HttpHeaders ({'Content-Type': 'application / json'}); let options = {headers: headers} this.http.post ('// localhost: 8080 / api / commands', this.form.value, options) .subscribe ((response) => {this.response = response}, (chyba) => {this.error = chyba})}

Vyššie uvedený úryvok kódu očakáva, že údaje o objednávke budú zachytené vo forme a budú dostupné v rozsahu komponentu. Angular ponúka fantastickú podporu pre vytváranie jednoduchých až zložitých formulárov pomocou reaktívnych formulárov a formulárov riadených šablónami.

Dôležitá je tiež časť, kde dostávame predtým vytvorené objednávky:

getOrders () {this.previousOrders = this.http.get ('' // localhost: 8080 / api / orders '')}}

Upozorňujeme, že modul Angular HTTP je asynchrónnej povahy, a teda vracia RxJS Pozorovateľnés. Odpoveď môžeme podľa nášho názoru zvládnuť tak, že ich predáme cez asynchronnú rúru:

Vaše doteraz zadané objednávky:

  • ID objednávky: {{order.id}}, stav objednávky: {{order.orderStatus}}, správa o objednávke: {{order.responseMessage}}

Angular bude samozrejme na svoju prácu vyžadovať šablóny, štýly a konfigurácie, ale na tie sa dá odkazovať v úložisku GitHub. Upozorňujeme, že sme tu zoskupili všetko do jednej súčasti, čo by v ideálnom prípade nemalo robiť.

Pre tento tutoriál však tieto obavy nie sú v rozsahu pôsobnosti.

4.7. Nasadenie aplikácie

Teraz, keď sme vytvorili všetky jednotlivé časti aplikácie, ako by sme ich mali nasadiť? Vždy to môžeme urobiť manuálne. Mali by sme však byť opatrní, aby to čoskoro mohlo byť nudné.

Pre tento tutoriál použijeme Docker Compose to vytvoriť a nasadiť našu aplikáciu na Docker Machine. To si bude vyžadovať, aby sme do každej služby pridali štandardný súbor Dockerfile a vytvorili súbor Docker Compose pre celú aplikáciu.

Pozrime sa, ako na to docker-compose.yml súbor vyzerá:

verzia: „3“ služby: frontend: build: ./ frontend porty: - „80:80“ objednávková služba: build: ./order-service porty: - „8080: 8080“ inventarizačná služba: build: ./inventár -služobné porty: - „8081: 8081“ prepravná služba: zostavenie: ./shipping-service porty: - „8082: 8082“

Toto je pomerne štandardná definícia služieb v Docker Compose a nevyžaduje si nijakú zvláštnu pozornosť.

4.8. Problémy s touto architektúrou

Teraz, keď máme zavedenú jednoduchú aplikáciu s viacerými službami interagujúcimi navzájom, môžeme diskutovať o problémoch v tejto architektúre. V nasledujúcich častiach sa pokúsime vyriešiť niektoré veci a nakoniec sa dostaneme do stavu, v ktorom by sme našu aplikáciu transformovali na reaktívny systém!

Aj keď táto aplikácia zďaleka nie je produkčným softvérom a existuje niekoľko problémov, urobíme to zamerať sa na problémy súvisiace s motiváciou pre reaktívne systémy:

  • Zlyhanie inventarizačnej alebo prepravnej služby môže mať kaskádový efekt
  • Hovory do externých systémov a databáz majú blokujúcu povahu
  • Nasadenie nedokáže automaticky zvládnuť poruchy a kolísavé zaťaženia

5. Reaktívne programovanie

Blokovanie hovorov v ľubovoľnom programe často výsledkom sú kritické zdroje, ktoré čakajú na to, čo sa stane. Patria sem volania do databáz, volania do webových služieb a volania v súborovom systéme. Ak dokážeme uvoľniť vlákna vykonávania z tohto čakania a poskytneme mechanizmus na spätné krúženie, akonáhle budú k dispozícii výsledky, prinesie to oveľa lepšie využitie zdrojov.

To pre nás robí prijatie paradigmy reaktívneho programovania. Aj keď je možné pri mnohých z týchto hovorov prejsť na reaktívnu knižnicu, nemusí byť možné všetko. Pre nás našťastie Spring výrazne uľahčuje použitie reaktívneho programovania s MongoDB a REST API:

Spoločnosť Spring Data Mongo podporuje reaktívny prístup prostredníctvom ovládača Java MongoDB Reactive Streams. To poskytuje ReactiveMongoTemplate a ReactiveMongoRepository, ktoré obidve majú rozsiahlu funkcionalitu mapovania.

Spring WebFlux poskytuje webový rámec reaktívneho zásobníka pre Spring, ktorý umožňuje neblokujúci kód a protitlak Reactive Streams. Využíva reaktor ako svoju reaktívnu knižnicu. Ďalej poskytuje Webový klient na vykonávanie požiadaviek HTTP s protitlakom reaktívnych prúdov. Ako knižnica klientov HTTP používa Reactor Netty.

5.1. Inventúrna služba

Začneme zmenou našich koncových bodov tak, aby vydávali reaktívnych vydavateľov:

@GetMapping public Flux getAllProducts () {return productService.getProducts (); }
@PostMapping public Mono processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping public Mono revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

Je zrejmé, že budeme musieť urobiť potrebné zmeny aj v službe:

@Transactional public Mono handleOrder (objednávka objednávky) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = objednávka. getLineItems (). stream () .filter (l -> l.getProductId (). sa rovná (p.getId ())) .findAny (). get () .getQuantity (); if (p.getStock ()> = q) {p.setStock (p.getStock () - q); return productRepository.save (p);} else {return Mono.error (new RuntimeException ("Product is out of stock:" + p.getId ()) );}}). potom (Mono.just (order.setOrderStatus ("ÚSPECH")))); } @Transactional public Mono revertOrder (objednávka objednávky) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = objednávka .getLineItems (). stream () .filter (l -> l.getProductId (). sa rovná (p.getId ())) .findAny (). get () .getQuantity (); p.setStock (p.getStock ( ) + q); vrátiť productRepository.save (p);}). potom (Mono.just (order.setOrderStatus ("ÚSPECH"))); }

5.2. Prepravná služba

Podobne zmeníme koncový bod našej prepravnej služby:

@PostMapping verejný Mono proces (@RequestBody objednávka objednávky) {návrat shippingService.handleOrder (objednávka); }

A zodpovedajúce zmeny v službe s cieľom využiť reaktívne programovanie:

public Mono handleOrder (Objednávka objednávky) {return Mono.just (objednávka) .flatMap (o -> {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime .now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now (). plusDays (1);} else {return Mono.error (new RuntimeException ("Aktuálny čas je vypnutý) limity pre zadanie objednávky. "));} vrátiť shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate));}) .map (s -> order.setShippingDate (s. getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS)); }

5.3. Objednať službu

V koncových bodoch objednávkovej služby budeme musieť urobiť podobné zmeny:

@PostMapping public Mono create (@RequestBody Order order) {return orderService.createOrder (order) .flatMap (o -> {if (OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return Mono.error (new RuntimeException ( „Spracovanie objednávky zlyhalo, skúste to znova neskôr.“ + O.getResponseMessage ()));} else {return Mono.just (o);}}); } @GetMapping public Flux getAll () {return orderService.getOrders (); }

Zmeny v službe budú zapojené viac, pretože budeme musieť využiť jar Webový klient na vyvolanie reaktívnych koncových bodov zásob a prepravy:

public Mono createOrder (objednávka objednávky) {návrat Mono.just (objednávka) .flatMap (orderRepository :: save) .flatMap (o -> {návrat webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();}) .onErrorResume (chyba -> {návrat Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ())) {návrat webClient.method (HttpMethod.POST) .uri (shippingServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();} else { návrat Mono.just (o);}}) .onErrorResume (chyba -> {návrat webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (objednávka)) .retrieve () .bodyToMono (objednávka .class) .map (o -> o.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .map (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ( ))) {návrat order.setShippingDate (o.getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS);} else {návrat order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (o.getResponseMessage ()); }}) .flatMap (orderRepository :: save); } public Flux getOrders () {return orderRepository.findAll (); }

Toto orchestrácia s reaktívnymi API nie je ľahké a často je náchylná na chyby a je ťažké ju odladiť. Ako sa to dá zjednodušiť, uvidíme v nasledujúcej časti.

5.4. Front-end

Teraz, keď sú naše rozhrania API schopné streamovať udalosti, ktoré sa vyskytujú, je celkom prirodzené, že by sme to mali mať možnosť využiť aj v našom klientskom rozhraní. Angular našťastie podporuje EventSource, rozhranie pre udalosti odoslané serverom.

Pozrime sa, ako môžeme stiahnuť a spracovať všetky naše predchádzajúce objednávky ako prúd udalostí:

getOrderStream () {return Observable.create ((observer) => {let eventSource = new EventSource ('// localhost: 8080 / api / commands') eventSource.onmessage = (event) => {let json = JSON.parse ( event.data) this.orders.push (json) this._zone.run (() => {observer.next (this.orders)})} eventSource.onerror = (chyba) => {if (eventSource.readyState = == 0) {eventSource.close () this._zone.run (() => {observer.complete ()})} else {this._zone.run (() => {observer.error ('chyba EventSource: '+ chyba)})}}})}

6. Architektúra riadená správami

Prvý problém, ktorý budeme riešiť, sa týka komunikácie medzi službami. Práve teraz, tieto komunikácie sú synchrónne, čo predstavuje niekoľko problémov. Patria sem napríklad kaskádové zlyhania, zložitá orchestrácia a distribuované transakcie.

Zrejmým spôsobom riešenia tohto problému je asynchrónna komunikácia. A sprostredkovateľ správ pre uľahčenie všetkej komunikácie medzi službami môže urobiť tento trik za nás. Použijeme Kafku ako nášho sprostredkovateľa správ a program Spring for Kafka na výrobu a konzumáciu správ:

Na vytvorenie a spotrebovanie správ o objednávkach s rôznymi stavmi objednávok, aby mohli služby reagovať, použijeme jednu tému.

Pozrime sa, ako sa každá služba musí zmeniť.

6.1. Inventúrna služba

Začnime definovaním producenta správ pre našu inventárnu službu:

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage (objednávka objednávky) {this.kafkaTemplate.send ("objednávky", objednávka); }

Ďalej budeme musieť definovať spotrebiteľa správ pre inventarizačnú službu, aby reagoval na rôzne správy týkajúce sa tejto témy:

@KafkaListener (topic = "objednávky", groupId = "inventár") public void consume (objednávka objednávky) vyvolá IOException {if (OrderStatus.RESERVE_INVENTORY.equals (order.getOrderStatus ())) {productService.handleOrder (objednávka) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_SUCCESS))}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_FAILURE) .setResponseMessage (e.getget) }). prihlásiť sa na odber (); } else if (OrderStatus.REVERT_INVENTORY.equals (order.getOrderStatus ())) {productService.revertOrder (order) .doOnSuccess (o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_SUCCESS))}}) e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage (e.getMessage ()));}). subscribe (); }}

To tiež znamená, že teraz môžeme bezpečne vylúčiť niektoré nadbytočné koncové body z nášho kontrolóra. Tieto zmeny sú dostatočné na dosiahnutie asynchrónnej komunikácie v našej aplikácii.

6.2. Prepravná služba

Zmeny v prepravnej službe sú relatívne podobné tým, ktoré sme urobili predtým v prípade inventarizačnej služby. Producent správ je rovnaký a spotrebiteľ správ je špecifický pre logiku prepravy:

@KafkaListener (topic = "orders", groupId = "shipping") public void consume (Order order) throws IOException {if (OrderStatus.PREPARE_SHIPPING.equals (order.getOrderStatus ())) {shippingService.handleOrder (order) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING_SUCCESS) .setShippingDate (o.getShippingDate ()));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPP). (e.getMessage ()));}). prihlásiť sa (); }}

Teraz môžeme bezpečne vylúčiť všetky koncové body v našom kontroléri, pretože ich už nepotrebujeme.

6.3. Objednať službu

Zmeny v objednávkovej službe budú o niečo viac zapojené, pretože práve tu sme robili všetku orchestráciu skôr.

Producent správ však zostáva nezmenený a spotrebiteľ správ preberá logiku špecifickú pre službu objednávok:

@KafkaListener (topic = "objednávky", groupId = "objednávky") public void consume (objednávka objednávky) vyvolá IOException {if (OrderStatus.INITIATION_SUCCESS.equals (order.getOrderStatus ())) {orderRepository.findById (order.getId () ). mapa (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.RESERVE_INVENTORY)); vrátiť o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (objednávka) : uložiť) .subscribe (); } else if ("INVENTORY-SUCCESS" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.PREPARE_SHIPPING)) ; návrat o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } else if ("SHIPPING-FAILURE" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.REVERT_INVENTORY)) ; návrat o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } else {orderRepository.findById (order.getId ()) .map (o -> {návrat o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save ) .subscribe (); }}

The spotrebiteľ tu iba reaguje na správy s rôznymi stavmi objednávky. To je to, čo nám dáva choreografiu medzi rôznymi službami.

Nakoniec bude potrebné zmeniť aj našu objednávkovú službu, aby podporila túto choreografiu:

public Mono createOrder (objednávka objednávky) {return Mono.just (objednávka) .flatMap (orderRepository :: save) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.INITIATION_SUCCESS)); návrat o;}). onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (orderRepository :: save); }

Upozorňujeme, že je to oveľa jednoduchšie ako služba, ktorú sme museli písať s reaktívnymi koncovými bodmi v poslednej časti. Asynchrónne choreografia často vedie k oveľa jednoduchšiemu kódu, aj keď to stojí za cenu prípadnej konzistencie a zložitého ladenia a monitorovania. Ako môžeme hádať, náš front-end už nezíska okamžitý konečný stav objednávky.

7. Služba orchestrácie kontajnerov

Posledný kúsok skladačky, ktorú chceme vyriešiť, súvisí s nasadením.

To, čo v aplikácii chceme, je dostatočná nadbytočnosť a tendencia k automatickému zväčšovaniu alebo zmenšovaniu v závislosti od potreby.

Už sme dosiahli kontajnerizáciu služieb cez Docker a spravujeme medzi nimi závislosti prostredníctvom Docker Compose. Aj keď sú to samé o sebe fantastické nástroje, nepomáhajú nám dosiahnuť to, čo chceme.

Preto sme Potrebujete službu orchestrácie kontajnerov, ktorá sa v našej aplikácii postará o nadbytočnosť a škálovateľnosť. Aj keď existuje niekoľko možností, jednou z populárnych je napríklad Kubernetes. Spoločnosť Kubernetes nám poskytuje cloudovo-agnostický spôsob dosiahnutia vysoko škálovateľného nasadenia kontajnerovaných pracovných záťaží.

Kubernetes zabalí kontajnery ako Docker do podov, ktoré sú najmenšou jednotkou nasadenia. Ďalej môžeme použiť Deployment na deklaratívny popis požadovaného stavu.

Nasadením sa vytvorí sada ReplicaSets, ktorá je interne zodpovedná za uvedenie podov. Môžeme opísať minimálny počet identických podov, ktoré by mali byť v prevádzke kedykoľvek. To poskytuje nadbytočnosť a tým aj vysokú dostupnosť.

Pozrime sa, ako môžeme definovať nasadenie Kubernetes pre naše aplikácie:

apiVersion: apps / v1 kind: Deployment metadata: name: inventory-deployment spec: replicas: 3 selector: matchLabels: name: inventory-deployment template: metadata: labels: name: inventory-deployment spec: containers: - name: inventory image: inventory-service-async: najnovšie porty: - containerPort: 8081 --- apiVerzia: druh aplikácií / v1: metaúdaje nasadenia: názov: špecifikácia prepravy-nasadenia: repliky: 3 selektor: matchLabels: názov: šablóna pre prepravu-nasadenie: metadáta: štítky : name: shipping-deployment spec: containers: - name: shipping image: shipping-service-async: latest ports: - containerPort: 8082 --- apiVersion: apps / v1 kind: Deployment metadata: name: order-deployment spec: replicas : 3 selector: matchLabels: name: order-deployment template: metadata: labels: name: order-deployment spec: containers: - name: order image: order-service-async: latest ports: - containerPort: 8080

Tu deklarujeme naše nasadenie na udržanie troch identických replík podov kedykoľvek. Aj keď je to dobrý spôsob, ako pridať nadbytočnosť, nemusí to stačiť na rôzne zaťaženia. Kubernetes poskytuje ďalší zdroj známy ako Horizontálny pod automatický škálovač, ktorý dokáže škálovať počet podov v nasadení na základe pozorovaných metrík ako využitie procesora.

Upozorňujeme, že sme práve pokryli aspekty škálovateľnosti aplikácie hostenej v klastri Kubernetes. To nevyhnutne neznamená, že samotný klaster, ktorý je ním podložený, je škálovateľný. Vytvorenie klastra Kubernetes s vysokou dostupnosťou je netriviálna úloha a presahuje rámec tohto tutoriálu.

8. Výsledný reaktívny systém

Teraz, keď sme v našej architektúre vykonali niekoľko vylepšení, je možno čas vyhodnotiť to vzhľadom na definíciu reaktívneho systému. Vyhodnotenie ponecháme oproti štyrom charakteristikám reaktívnych systémov, o ktorých sme hovorili už skôr v tutoriále:

  • Reagovať: Prijatie paradigmy reaktívneho programovania by nám malo pomôcť dosiahnuť úplné blokovanie, a teda responzívnu aplikáciu
  • Odolný: Nasadenie Kubernetes s ReplicaSet požadovaného počtu podov by malo poskytovať odolnosť proti náhodným zlyhaniam
  • Elastické: Klaster a zdroje Kubernetes by nám mali poskytnúť potrebnú podporu, aby sme boli elastickí voči nepredvídateľným zaťaženiam
  • Správa riadená: Tu by nám malo pomôcť to, že všetka komunikácia medzi službami bude prebiehať asynchrónne prostredníctvom sprostredkovateľa Kafka

Aj keď to vyzerá celkom sľubne, ešte to zďaleka nie je na konci. Úprimne povedané, snahou o skutočne reaktívny systém by malo byť neustále zlepšovanie. Vo vysoko komplexnej infraštruktúre, kde je naša aplikácia len malou časťou, nikdy nemôžeme zabrániť tomu, čo môže zlyhať.

Reaktívny systém tak bude požadovať spoľahlivosť od každej časti, ktorá robí celok. Od fyzickej siete po služby infraštruktúry, ako je DNS, by všetky mali zapadať do radu, aby nám pomohli dosiahnuť konečný cieľ.

Často nemusí byť možné, aby sme všetky tieto časti spravovali a poskytovali potrebné záruky. A toto je miesto Spravovaná cloudová infraštruktúra pomáha zmierniť našu bolesť. Môžeme si vybrať z množstva služieb ako IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) a PaaS (Platform-as-a-Service) na delegovanie zodpovedností na externé strany. Toto nám ponecháva zodpovednosť za našu aplikáciu, pokiaľ je to možné.

9. Záver

V tomto tutoriáli sme si prešli základy reaktívnych systémov a aké sú ich porovnania s reaktívnym programovaním. Vytvorili sme jednoduchú aplikáciu s viacerými mikroslužbami a zvýraznili sme problémy, ktoré chceme vyriešiť pomocou reaktívneho systému.

Ďalej sme pokračovali v zavádzaní reaktívneho programovania, architektúry založenej na správach a služby orchestrácie kontajnerov v architektúre na realizáciu reaktívneho systému.

Na záver sme diskutovali o výslednej architektúre a o tom, ako zostáva cestou k reaktívnemu systému! Tento tutoriál nám nepredstavuje všetky nástroje, rámce alebo vzory, ktoré nám môžu pomôcť vytvoriť reaktívny systém, ale zoznamuje nás s cestou.

Ako obvykle, zdrojový kód tohto článku nájdete na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found