ETL s dátovým tokom Spring Cloud

1. Prehľad

Spring Cloud Data Flow je cloudová natívna sada nástrojov na vytváranie dátových potrubí a dávkových procesov v reálnom čase. Tok dát Spring Cloud je pripravený na použitie v rade prípadov použitia spracovania údajov, ako je jednoduchý import / export, spracovanie ETL, streamovanie udalostí a prediktívna analýza.

V tomto tutoriále sa dozvieme príklad extrakcie transformácie a načítania (ETL) v reálnom čase pomocou toku prúdov, ktorý extrahuje údaje z databázy JDBC, transformuje ich na jednoduché POJO a načíta do MongoDB.

2. Spracovanie ETL a toku udalostí

ETL - extrakt, transformácia a načítanie - sa bežne označoval ako proces, ktorý hromadne načítava údaje z niekoľkých databáz a systémov do spoločného dátového skladu. V tomto dátovom sklade je možné vykonávať náročné analýzy analýzy údajov bez toho, aby bol ohrozený celkový výkon systému.

Nové trendy však menia spôsob, akým sa to deje. ETL má stále úlohu pri prenose údajov do dátových skladov a dátových jazier.

V dnešnej dobe sa to dá urobiť pomocou prúdy v architektúre udalostí a prúdov pomocou Spring Cloud Data Flow.

3. Jarný cloudový dátový tok

S Spring Cloud Data Flow (SCDF) môžu vývojári vytvárať dátové kanály v dvoch príchutiach:

  • Aplikácie streamov v reálnom čase s dlhou životnosťou využívajúce Spring Cloud Stream
  • Krátkodobé dávkové aplikácie úloh využívajúce Spring Cloud Task

V tomto článku sa budeme venovať prvej, dlhovekej streamovacej aplikácii založenej na Spring Cloud Stream.

3.1. Aplikácie Spring Cloud Stream

Potrubie toku SCDF sa skladá z krokov, kdekaždý krok je aplikácia postavená v štýle Spring Boot pomocou mikrorámca Spring Cloud Stream. Tieto aplikácie sú integrované pomocou middlewaru na odosielanie správ, ako je Apache Kafka alebo RabbitMQ.

Tieto aplikácie sú rozdelené do zdrojov, procesorov a výleviek. V porovnaní s procesom ETL by sme mohli povedať, že zdrojom je „extrakt“, procesor „transformátor“ a drez časť „záťaž“.

V niektorých prípadoch môžeme použiť spúšťač aplikácií v jednom alebo viacerých krokoch potrubia. To znamená, že by sme krok nemuseli implementovať novú aplikáciu, ale namiesto toho nakonfigurovať už implementovaný existujúci spúšťač aplikácií.

Zoznam spúšťacích aplikácií nájdete tu.

3.2. Server jarných cloudových dátových tokov

Posledným kúskom architektúry je Spring Cloud Data Flow Server. Server SCDF vykonáva nasadenie aplikácií a toku potrubí pomocou špecifikácie Spring Cloud Deployer. Táto špecifikácia podporuje cloudovú natívnu príchuť SCDF nasadením do radu moderných runtime, ako sú Kubernetes, Apache Mesos, Yarn a Cloud Foundry.

Stream môžeme spustiť aj ako lokálne nasadenie.

Viac informácií o architektúre SCDF nájdete tu.

4. Nastavenie prostredia

Skôr ako začneme, musíme vyberte kúsky tohto zložitého nasadenia. Prvou časťou, ktorú je potrebné definovať, je server SCDF.

Na testovanie na miestny vývoj použijeme SCDF Server Local. Pre produkčné nasadenie si môžeme neskôr zvoliť cloudový natívny runtime, ako je SCDF Server Kubernetes. Zoznam runtime serverov nájdete tu.

Teraz skontrolujeme systémové požiadavky na spustenie tohto servera.

4.1. Požiadavky na systém

Aby sme mohli prevádzkovať SCDF Server, budeme musieť definovať a nastaviť dve závislosti:

  • - middleware pre zasielanie správ a -
  • RDBMS.

Pre middlevér na odosielanie správ budeme pracovať s RabbitMQ a ako RDBMS zvolíme PostgreSQL na ukladanie našich definícií potrubného toku.

Pre spustenie RabbitMQ si stiahnite najnovšiu verziu tu a spustite inštanciu RabbitMQ pomocou predvolenej konfigurácie alebo spustite nasledujúci príkaz Docker:

docker run --name dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-management

Ako posledný krok nastavenia nainštalujte a spustite PostgreSQL RDBMS na predvolenom porte 5432. Potom vytvorte databázu, do ktorej môže SCDF ukladať svoje definície prúdov pomocou nasledujúceho skriptu:

CREATE DATABASE dátový tok;

4.2. Jarný cloudový server toku údajov lokálne

Pre spustenie lokálneho servera SCDF môžeme zvoliť spustenie servera pomocou ukotviť-zložiť, alebo ho môžeme spustiť ako aplikáciu Java.

Tu spustíme SCDF Server Local ako aplikáciu Java. Pre konfiguráciu aplikácie musíme definovať konfiguráciu ako parametre aplikácie Java. Na systémovej ceste budeme potrebovať Java 8.

Aby sme mohli hostovať poháre a závislosti, musíme vytvoriť domovský priečinok pre náš server SCDF a stiahnuť do tohto priečinka lokálnu distribúciu servera SCDF. Tu si môžete stiahnuť najnovšiu distribúciu servera SCDF Server Local.

Tiež musíme vytvoriť priečinok lib a vložiť tam ovládač JDBC. Najnovšia verzia ovládača PostgreSQL je k dispozícii tu.

Nakoniec spustíme lokálny server SCDF:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = hosť \ --spring.rabbitmq.password = hosť

To, či je spustené, môžeme skontrolovať na tejto adrese URL:

// localhost: 9393 / dashboard

4.3. Jarný cloudový dátový tok

SCDF Shell je a nástroj príkazového riadku, ktorý umožňuje ľahké zostavenie a nasadenie našich aplikácií a potrubí. Tieto príkazy Shell prebiehajú cez REST API servera Spring Cloud Data Flow Server.

Stiahnite si najnovšiu verziu nádoby do svojho domovského priečinka SCDF, ktorý je k dispozícii tu. Po dokončení spustite nasledujúci príkaz (podľa potreby aktualizujte verziu):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | „_ \ | „__ | | „_ \ / _` | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _` | __ / _` | | | _ | | / _ \ \ / \ / / \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Vitajte na shell Spring Cloud Data Flow. Ak potrebujete pomoc, stlačte TAB alebo napíšte „help“. dataflow:>

Ak namiesto „dátový tok:> ” dostanete „server-unknown:> ” v poslednom riadku nespúšťate server SCDF na serveri localhost. V takom prípade sa spustením nasledujúceho príkazu pripojte k inému hostiteľovi:

server-unknown:> server konfigurácie toku údajov // {host}

Teraz je Shell pripojený k serveru SCDF a my môžeme spúšťať naše príkazy.

Prvá vec, ktorú musíme v Shell urobiť, je importovať spúšťače aplikácií. Najnovšiu verziu programu RabbitMQ + Maven nájdete tu v Spring Boot 2.0.x a spustite nasledujúci príkaz (znova aktualizujte verziu, tu “Darwin-SR1", podľa potreby):

$ dataflow:> import aplikácií --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Na kontrolu nainštalovaných aplikácií spustite nasledujúci príkaz Shell:

$ dataflow:> zoznam aplikácií

Vo výsledku by sa nám mala zobraziť tabuľka obsahujúca všetky nainštalované aplikácie.

SCDF tiež ponúka grafické rozhranie s názvom Flo, ku ktorým máme prístup na tejto adrese: // localhost: 9393 / dashboard. Jeho použitie však nie je v rozsahu tohto článku.

5. Zostavenie potrubia ETL

Poďme teraz vytvoriť náš prúdový tok. Na tento účel použijeme štartovací program zdroja JDBC na extrahovanie informácií z našej relačnej databázy.

Tiež vytvoríme vlastný procesor na transformáciu informačnej štruktúry a vlastný drez na načítanie našich údajov do MongoDB.

5.1. Extrakt - Príprava relačnej databázy na extrakciu

Vytvorme databázu s názvom crm a tabuľku s menom zákazník:

VYTVORIŤ DATABÁZU crm;
CREATE TABLE customer (id bigint NOT NULL, imported boolean DEFAULT false, customer_name character varying (50), PRIMARY KEY (id))

Upozorňujeme, že používame príznak dovezené, ktorá uloží, ktorý záznam už bol importovaný. Ak je to potrebné, mohli by sme tieto informácie tiež uložiť do inej tabuľky.

Teraz vložme niekoľko údajov:

INSERT INTO customer (id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. Transformácia - mapovanie JDBC Polia do MongoDB Štruktúra polí

Pre transformačný krok urobíme jednoduchý preklad poľa Meno zákazníka zo zdrojovej tabuľky do nového poľa názov. Tu je možné vykonať ďalšie transformácie, ale nechajme si príklad krátky.

Za týmto účelom vytvoríme nový projekt s názvom transformácia zákazníka. Najjednoduchšie to urobíte tak, že na vytvorenie projektu použijete web Spring Initializr. Po prechode na webovú stránku vyberte názov skupiny a artefaktu. Použijeme com.zákazník a zákaznícka transformácia, resp.

Po dokončení kliknite na tlačidlo „Generovať projekt“ a stiahnite si projekt. Potom projekt rozbaľte a importujte do svojho obľúbeného IDE a pridajte do neho nasledujúcu závislosť pom.xml:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 

Teraz sme nastavení na začatie kódovania prevodu názvu poľa. Za týmto účelom vytvoríme Zákazník triedy fungovať ako adaptér. Táto trieda dostane Meno zákazníka cez setName () metódou a svoju hodnotu odošle cez getName metóda.

The @JsonProperty anotácie urobia transformáciu pri deserializácii z JSON na Java:

public class Zákazník {private Long id; súkromné ​​meno reťazca; @JsonProperty ("customer_name") public void setName (názov reťazca) {this.name = meno; } @JsonProperty ("name") public String getName () {návratové meno; } // Zakladatelia a zakladatelia}

Procesor musí prijímať údaje zo vstupu, vykonať transformáciu a spojiť výsledok s výstupným kanálom. Vytvorme na to triedu:

importovať org.springframework.cloud.stream.annotation.EnableBinding; importovať org.springframework.cloud.stream.messaging.procesor; importovať org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) verejná trieda CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) verejný zákazník convertToPojo (užitočné zaťaženie zákazníka) {návrat užitočné zaťaženie; }}

Vo vyššie uvedenom kóde môžeme pozorovať, že transformácia nastáva automaticky. Vstup prijíma údaje ako JSON a Jackson ich deserializuje na a Zákazník objekt pomocou nastaviť metódy.

Opak je pre výstup, dáta sú serializované do JSON pomocou dostať metódy.

5.3. Load - Sink v MongoDB

Podobne ako v transformačnom kroku, vytvoríme ďalší projekt maven, teraz s menom zákazník-mongodb-drez. Opäť prístup k jarnej Initializr, pre skupinu zvoliť com.zákazníka pre Artefakt vyberte customer-mongodb-sink. Potom napíšte MongoDB do vyhľadávacieho poľa závislostí a stiahnite si projekt.

Ďalej ho rozbaľte a importujte do svojho obľúbeného IDE.

Potom pridajte rovnakú závislosť navyše ako v priečinku transformácia zákazníka projekt.

Teraz vytvoríme ďalší Zákazník triedy, na príjem vstupov v tomto kroku:

importovať org.springframework.data.mongodb.core.mapping.Document; @Document (kolekcia = "zákazník") verejná trieda Zákazník {private Long id; súkromné ​​meno reťazca; // Getters and Setters}

Na potopenie Zákazník, vytvoríme triedu poslucháča, ktorá uloží entitu zákazníka pomocou Repozitár zákazníka:

@EnableBinding (Sink.class) verejná trieda CustomerListener {@Autowired súkromné ​​úložisko CustomerRepository; @StreamListener (Sink.INPUT) public void save (zákazník zákazníka) {repository.save (zákazník); }}

A Repozitár zákazníka, v tomto prípade je a Úložisko Mongo z jarných údajov:

importovať org.springframework.data.mongodb.repository.MongoRepository; importovať org.springframework.stereotyp.Repository; @Repository verejné rozhranie CustomerRepository rozširuje MongoRepository {} 

5.4. Definícia streamu

Teraz, obe vlastné aplikácie sú pripravené na registráciu na serveri SCDF. Aby ste to dosiahli, kompilujte oba projekty pomocou príkazu Maven inštalácia mvn.

Potom ich zaregistrujeme pomocou Spring Cloud Data Flow Shell:

registrácia aplikácie - názov transformácie zákazníka - typ procesora --uri maven: //com.zákazník: transformácia zákazníka: 0,0.1-SNAPSHOT
register aplikácií --name customer-mongodb-sink --type sink --uri maven: //com.customer: customer-mongodb-sink: jar: 0.0.1-SNAPSHOT

Na záver skontrolujme, či sú aplikácie uložené v SCDF, spustíme príkaz zoznamu aplikácií v shelli:

zoznam aplikácií

Vo výsledku by sme sa mali vo výslednej tabuľke pozrieť obe aplikácie.

5.4.1. Streamujte jazyk špecifický pre doménu kanálu - DSL

DSL definuje konfiguráciu a tok údajov medzi aplikáciami. SCDF DSL je jednoduchý. V prvom slove definujeme názov aplikácie a potom konfigurácie.

Syntax tiež predstavuje syntax Pipeline inšpirovanú Unixom, ktorá na pripojenie viacerých aplikácií využíva zvislé pruhy, známe aj ako „pipe“:

http --port = 8181 | log

Takto sa vytvorí aplikácia HTTP obsluhovaná na porte 8181, ktorá odošle akékoľvek prijaté užitočné množstvo tela do protokolu.

Teraz sa pozrime, ako vytvoriť definíciu toku DSL zdroja JDBC.

5.4.2. Definícia zdrojového toku JDBC

Kľúčové konfigurácie pre zdroj JDBC sú dopyt a aktualizovať.dopyt vyberie neprečítané záznamy, zatiaľ čo aktualizovať zmení príznak, aby sa zabránilo opätovnému načítaniu aktuálnych záznamov.

Definujeme tiež zdroj JDBC, ktorý bude dopytovať s pevným oneskorením 30 sekúnd a maximálnym počtom dotazov bude 1 000 riadkov. Na záver definujeme konfigurácie pripojenia, napríklad ovládač, používateľské meno, heslo a adresu URL pripojenia:

jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE imported = false' --update = 'UPDATE public.customer SET imported = true WHERE id in (: id)' --max-lines-per-poll = 1000 --fixed-delay = 30 --time-unit = SECONDS --driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - heslo = postgres

Viac vlastností konfigurácie zdroja JDBC nájdete tu.

5.4.3. Zákazník Definícia MongoDB Sink Stream

Pretože sme nedefinovali konfigurácie pripojenia v application.properties z customer-mongodb-sink, nakonfigurujeme pomocou parametrov DSL.

Naša aplikácia je úplne založená na MongoDataAutoConfiguration. Tu si môžete pozrieť ďalšie možné konfigurácie. V zásade definujeme jar.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. Vytvorte a nasaďte stream

Najskôr vytvorte konečnú definíciu streamu, vráťte sa do prostredia Shell a vykonajte nasledujúci príkaz (bez zalomenia riadku, práve boli vložené kvôli čitateľnosti):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE imported = false' --fixed-delay = 30 --max-lines-per-poll = 1 000 --update = 'AKTUALIZOVANÁ SÚPRAVA zákazníka = true WHERE id in (: id)' --time-unit = SECONDS --password = postgres --driver-class-name = org.postgresql.Driver --username = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Tento prúd DSL definuje prúd s názvom jdbc-to-mongodb. Ďalšie, nasadíme stream podľa jeho názvu:

stream nasadiť --name jdbc-to-mongodb 

Nakoniec by sme mali vidieť umiestnenie všetkých dostupných protokolov vo výstupe protokolu:

Denníky budú v {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Denníky budú v {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform Záznamy budú v {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Záver

V tomto článku sme videli úplný príklad dátového toku ETL pomocou Spring Cloud Data Flow.

Najpozoruhodnejšie je, že sme videli konfigurácie spúšťača aplikácií, vytvorili sme potrubie toku ETL pomocou Spring Cloud Data Flow Shell a implementovali sme vlastné aplikácie pre naše čítanie, transformáciu a zápis dát.

Vzorový kód nájdete ako vždy v projekte GitHub.


$config[zx-auto] not found$config[zx-overlay] not found