Začíname so spracovaním streamov s tokom údajov Spring Cloud

1. Úvod

Tok dát jarného cloudu je cloudový natívny programovací a operačný model pre mikroslužby zložiteľných údajov.

S Tok dát jarného cloudu, môžu vývojári vytvárať a organizovať dátové kanály pre bežné prípady použitia, ako je príjem dát, analýza v reálnom čase a import / export dát.

Tieto dátové kanály majú dve príchute, streamovacie a dávkové dátové kanály.

V prvom prípade sa nespútané množstvo dát spotrebuje alebo vytvorí prostredníctvom middlewaru správ. Zatiaľ čo v druhom prípade krátkodobá úloha spracuje konečný súbor údajov a potom sa ukončí.

Tento článok sa zameria na spracovanie streamingu.

2. Architektonický prehľad

Kľúčovými komponentmi tohto typu architektúry sú Aplikácie, Server toku údajova cieľový modul runtime.

Okrem týchto kľúčových komponentov tiež zvyčajne máme a Shell toku údajov a a sprostredkovateľ správ v rámci architektúry.

Pozrime sa na všetky tieto komponenty podrobnejšie.

2.1. Aplikácie

Streamovanie dátových tokov zvyčajne zahrnuje náročné udalosti z externých systémov, spracovanie údajov a perzistenciu polyglotov. Tieto fázy sa bežne označujú ako Zdroj, procesora drez v Jarný mrak terminológia:

  • Zdroj: je aplikácia, ktorá využíva udalosti
  • Procesor: spotrebúva údaje z Zdroj, vykoná na ňom nejaké spracovanie a odošle spracované údaje do ďalšej pripravovanej aplikácie
  • Drez: buď konzumuje z a Zdroj alebo procesor a zapíše údaje do požadovanej vrstvy perzistencie

Tieto aplikácie je možné zbaliť dvoma spôsobmi:

  • Spring Boot uber-jar, ktorý je hostený v úložisku maven, súbore, http alebo v akejkoľvek inej implementácii jarných zdrojov (táto metóda bude použitá v tomto článku)
  • Docker

Mnoho zdrojov, aplikácií procesorov a umývadiel pre bežné prípady použitia (napr. Jdbc, hdfs, http, router) je už poskytnutých a pripravených na použitie Tok dát jarného cloudu tím.

2.2. Beh programu

Na vykonanie týchto aplikácií je tiež potrebný runtime. Podporované runtime sú:

  • Cloud Foundry
  • Apache PRIADZE
  • Kubernetes
  • Apache Mesos
  • Lokálny server pre vývoj (ktorý sa použije v tomto článku)

2.3. Server toku údajov

Komponent zodpovedný za nasadenie aplikácií za behu je Server toku údajov. Existuje Server toku údajov spustiteľný balík poskytnutý pre každý z cieľových runtime.

The Server toku údajov je zodpovedný za tlmočenie:

  • Stream DSL, ktorý popisuje logický tok údajov cez viac aplikácií.
  • Manifest nasadenia, ktorý popisuje mapovanie aplikácií do modulu runtime.

2.4. Shell toku údajov

Data Flow Shell je klientom servera Data Flow. Shell nám umožňuje vykonať príkaz DSL potrebný na interakciu so serverom.

Napríklad DSL na opis toku údajov zo zdroja http do jdbc drezu bude napísaný ako „http | jdbc ”. Tieto mená v DSL sú registrované u Server toku údajov a mapujte na artefakty aplikácie, ktoré môžu byť hostené v úložiskách Maven alebo Docker.

Jar ponúka aj grafické rozhranie s názvom Flo, na vytváranie a monitorovanie streamovacích dátových potrubí. Jeho použitie však nie je predmetom diskusie o tomto článku.

2.5. Sprostredkovateľ správ

Ako sme videli v príklade predchádzajúcej časti, do definície toku údajov sme použili symbol potrubia. Symbol prepojenia predstavuje komunikáciu medzi týmito dvoma aplikáciami prostredníctvom middlewaru správ.

To znamená, že potrebujeme sprostredkovateľa správ, ktorý je v cieľovom prostredí funkčný.

Podporovaní sú dvaja sprostredkovatelia middleware správ:

  • Apache Kafka
  • RabbitMQ

Takže teraz, keď máme prehľad o architektonických komponentoch - je čas postaviť náš prvý plynovod na spracovanie toku.

3. Nainštalujte si Sprostredkovateľa správ

Ako sme videli, aplikácie, ktoré sa pripravujú, na komunikáciu potrebujú middleware na odosielanie správ. Na účely tohto článku si povieme ďalej RabbitMQ.

Všetky podrobnosti o inštalácii nájdete podľa pokynov na oficiálnych stránkach.

4. Lokálny server toku údajov

Na urýchlenie procesu generovania našich aplikácií použijeme Spring Initializr; s jeho pomocou môžeme získať naše Jarná topánka aplikácie za pár minút.

Po prechode na webovú stránku jednoducho vyberte a Skupina a an Artefakt názov.

Po dokončení kliknite na tlačidlo Vytvoriť projekt na spustenie sťahovania artefaktu Maven.

Po dokončení sťahovania rozbaľte projekt a naimportujte ho ako projekt Maven do svojho IDE podľa vášho výberu.

Pridajme do projektu závislosť Maven. Ako budeme potrebovať Lokálny server toku údajov knižnice, pridajme závislosť spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Teraz musíme anotovať Jarná topánka hlavná trieda s @EnableDataFlowServer anotácia:

@EnableDataFlowServer @SpringBootApplication verejná trieda SpringDataFlowServerApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

To je všetko. Náš Lokálny server toku údajov je pripravený na vykonanie:

mvn spring-boot: spustiť

Aplikácia sa spustí na porte 9393.

5. Shell toku údajov

Opäť choďte na Spring Initializr a zvoľte a Skupina a Artefakt názov.

Po stiahnutí a importovaní projektu pridajme závislosť spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Teraz musíme pridať @EnableDataFlowShell anotácia k Jarná topánka hlavná trieda:

@EnableDataFlowShell @SpringBootApplication verejná trieda SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

Teraz môžeme spustiť shell:

mvn spring-boot: spustiť

Po spustení shellu môžeme napísať Pomoc príkazom vo výzve zobrazíte kompletný zoznam príkazov, ktoré môžeme vykonať.

6. Zdrojová aplikácia

Podobne na Initializr teraz vytvoríme jednoduchú aplikáciu a pridáme a Stream králik závislosť zvaná jarný-oblak-štartovací prúd-králik:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

Potom pridáme @EnableBinding (Source.class) anotácia k Jarná topánka hlavná trieda:

@EnableBinding (Source.class) @SpringBootApplication verejná trieda SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Teraz musíme definovať zdroj údajov, ktoré sa musia spracovať. Týmto zdrojom môže byť akékoľvek potenciálne nekonečné pracovné zaťaženie (údaje senzorov internetu vecí, spracovanie udalostí 24 hodín denne, 7 dní v týždni, príjem údajov z online transakcií).

V našej vzorovej aplikácii vyrábame jednu udalosť (pre jednoduchosť novú časovú pečiatku) každých 10 sekúnd s a Poller.

The @InboundChannelAdapter anotácia odošle správu na výstupný kanál zdroja a použije návratovú hodnotu ako užitočné zaťaženie správy:

@Bean @InboundChannelAdapter (hodnota = Source.OUTPUT, poller = @Poller (fixedDelay = "10 000", maxMessagesPerPoll = "1")) verejný MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (nový dátum (). GetTime ()). build (); } 

Náš zdroj údajov je pripravený.

7. Aplikácia procesora

Ďalej - vytvoríme aplikáciu a pridáme a Stream králik závislosť.

Potom pridáme @EnableBinding (Processor.class) anotácia k Jarná topánka hlavná trieda:

@EnableBinding (Processor.class) @SpringBootApplication verejná trieda SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Ďalej musíme definovať metódu na spracovanie údajov pochádzajúcich zo zdrojovej aplikácie.

Ak chcete definovať transformátor, musíme túto metódu anotovať @Transformátor anotácia:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) transformácia verejného objektu (dlhá časová pečiatka) {DateFormat dateFormat = nový SimpleDateFormat ("rrrr / MM / dd hh: mm: rr"); Reťazec date = dateFormat.format (timestamp); dátum návratu; }

Prevedie časovú značku zo „vstupného“ kanálu na formátovaný dátum, ktorý sa odošle na „výstupný“ kanál.

8. Aplikácia Sink

Poslednou aplikáciou, ktorá sa vytvorí, je aplikácia Sink.

Opäť choďte na Spring Initializr a zvoľte a Skupina, an Artefakt názov. Po stiahnutí projektu pridajme a Stream králik závislosť.

Potom pridajte @EnableBinding (Sink.class) anotácia k Jarná topánka hlavná trieda:

@EnableBinding (Sink.class) @SpringBootApplication verejná trieda SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Teraz potrebujeme metódu na zachytávanie správ prichádzajúcich z aplikácie procesora.

Aby sme to dosiahli, musíme pridať @StreamListener (Sink.INPUT) anotácia k našej metóde:

@StreamListener (Sink.INPUT) public void loggerSink (dátum reťazca) {logger.info ("Prijaté:" + dátum); }

Metóda jednoducho vytlačí časovú pečiatku transformovanú vo formáte dátumu do súboru protokolu.

9. Zaregistrujte si aplikáciu Stream

Prostredie Spring Cloud Data Flow Shell nám umožňuje zaregistrovať streamovanú aplikáciu v registri aplikácií pomocou registrácia aplikácie príkaz.

Musíme zadať jedinečný názov, typ aplikácie a URI, ktoré je možné vyriešiť na artefakt aplikácie. Ako typ uveďte „zdroj“, “procesor„Alebo“drez“.

Pri poskytovaní URI so schémou maven by mal formát zodpovedať nasledujúcemu:

maven: //: [: [:]]:

Ak chcete zaregistrovať Zdroj, procesor a drez predtým vytvorené aplikácie, prejdite na stránku Jarný cloudový dátový tok a z príkazového riadka zadajte nasledujúce príkazy:

register aplikácie - name time-source --typ source --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT app register --name time -procesor - typ procesora --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processor: jar: 0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven: //com.baeldung.spring.cloud: spring-data-flow-loging-sink: jar: 0.0.1-SNAPSHOT 

10. Vytvorte a nasaďte stream

Ak chcete vytvoriť novú definíciu streamu, prejdite na stránku Jarný cloudový dátový tok a spustite nasledujúci príkaz shellu:

stream create --name time-to-log --definition 'time-source | časový procesor | drez na ťažbu dreva

Toto definuje stream s názvom time-to-log na základe výrazu DSL „Zdroj času | časový procesor | drez na ťažbu dreva.

Potom na nasadenie streamu vykonajte nasledujúci príkaz shellu:

stream nasadiť --name čas do prihlásenia

The Server toku údajov rieši zdroj času, časový procesora drevorubač namapovať súradnice a pomocou nich spustiť zdroj času, časový procesor a drevorubač aplikácie streamu.

Ak je stream správne nasadený, uvidíte v Server toku údajov protokoly, že moduly boli spustené a navzájom spojené:

2016-08-24 12:29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: nasadenie aplikácie time-to-log.logging-sink inštancia 0 Záznamy budú v PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12:29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: nasadenie aplikácie time-to-log.time-processor instance 0 Záznamy budú v PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processor 2016-08-24 12:29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: nasadenie aplikácie time-to-log.time-source instance 0 Záznamy budú v PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Kontrola výsledku

V tomto príklade zdroj jednoducho každú sekundu odošle aktuálnu časovú pečiatku ako správu, procesor ju naformátuje a výstup protokolu naformátuje načasovanú časovú pečiatku pomocou rámca protokolovania.

Súbory denníka sa nachádzajú v adresári zobrazenom v priečinku Server toku údajovVýstup protokolu, ako je uvedené vyššie. Aby sme videli výsledok, môžeme záznam ohraničiť:

tail -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12:40: 42,029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Prijaté: 2016/08/24 11:40:01 2016-08-24 12:40: 52,035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Prijaté: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Prijaté: 2016/08 / 24 11:40:21

12. Záver

V tomto článku sme videli, ako vytvoriť dátový kanál na spracovanie toku pomocou protokolu Tok dát jarného cloudu.

Videli sme tiež úlohu Zdroj, procesor a drez aplikácie vo vnútri streamu a ako pripojiť a zviazať tento modul vo vnútri a Server toku údajov prostredníctvom použitia Shell toku údajov.

Vzorový kód nájdete v projekte GitHub.


$config[zx-auto] not found$config[zx-overlay] not found