Integrácia pružiny s AWS Kinesis

1. Úvod

Kinesis je nástroj na zhromažďovanie, spracovanie a analýzu dátových tokov v reálnom čase vyvinutý spoločnosťou Amazon. Jednou z jeho hlavných výhod je, že pomáha pri vývoji aplikácií riadených udalosťami.

V tomto tutoriáli sa pozrieme na niekoľko knižníc, ktoré umožnite našej aplikácii Spring vyrábať a spotrebúvať záznamy z Kinesis Stream. Príklady kódu ukážu základnú funkčnosť, ale nepredstavujú kód pripravený na výrobu.

2. Predpoklad

Než pôjdeme ďalej, musíme urobiť dve veci.

Prvým je vytvorenie jarného projektu, pretože cieľom je interakcia s programom Kinesis z jarného projektu.

Druhým je vytvorenie kinezického dátového toku. Môžeme to urobiť z webového prehliadača v našom účte AWS. Jednou z alternatív pre fanúšikov AWS CLI medzi nami je použitie príkazového riadku. Pretože s ním budeme interagovať z kódu, musíme mať po ruke aj poverenia AWS IAM, prístupový kľúč a tajný kľúč a oblasť.

Všetci naši producenti vytvoria fiktívne záznamy IP adries, zatiaľ čo spotrebitelia tieto hodnoty prečítajú a uvedú ich v konzole aplikácie.

3. AWS SDK pre Javu

Úplne prvou knižnicou, ktorú použijeme, je AWS SDK pre Javu. Jeho výhodou je, že nám umožňuje spravovať mnoho častí práce s dátovými prúdmi Kinesis. Môžeme čítať dáta, vyrábať dáta, vytvárať dátové toky a zmenšovať toky dát. Nevýhodou je, že ak chceme mať kód pripravený na výrobu, budeme musieť kódovať aspekty, ako je napríklad zmena formátu, spracovanie chýb alebo démon, aby sme udržali zákazníka nažive.

3.1. Maven závislosť

Závislosť amazon-kinesis-client Maven prinesie všetko, čo potrebujeme, aby sme mali fungujúce príklady. Teraz ju pridáme do našej pom.xml spis:

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Jarné nastavenie

Zopakujme AmazonKinesis objekt potrebný na interakciu s našim Kinezným prúdom. Vytvoríme to ako @Bean vo vnútri nášho @SpringBootApplication trieda:

@Bean public AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = nový BasicAWSCredentials (accessKey, secretKey); return AmazonKinesisClientBuilder.standard () .withCredentials (new AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

Ďalej definujeme aws.access.key a aws.secret.key, potrebné pre miestny stroj, v application.properties:

aws.access.key = môj-aws-prístupový kľúč-ide-tu aws.secret.key = môj-aws-tajný-kľúč-ide-tu

A prečítame si ich pomocou @Hodnota anotácia:

@Value ("$ {aws.access.key}") private String accessKey; @Value ("$ {aws.secret.key}") private String secretKey;

Pre jednoduchosť sa budeme spoliehať na @Naplánovaný metódy vytvárania a spotreby záznamov.

3.3. Spotrebiteľ

The AWS SDK Kinesis Consumer používa pull model, čo znamená, že náš kód bude čerpať záznamy zo zlomkov dátového toku Kinesis:

GetRecordsRequest recordsRequest = nový GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> new String (record.getData (). array ())) .forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

The GetRecordsRequest objekt vytvorí požiadavku na dáta toku. V našom príklade sme definovali limit 25 záznamov na žiadosť a pokračujeme v čítaní, kým nebude čo čítať.

Môžeme si tiež všimnúť, že pre svoju iteráciu sme použili a GetShardIteratorResult objekt. Tento objekt sme vytvorili vo vnútri a @PostConstrucMetóda tak, aby sme okamžite začali sledovať záznamy:

súkromný GetShardIteratorResult shardIterator; @PostConstruct private void buildShardIterator () {GetShardIteratorRequest readShardsRequest = nový GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. Výrobca

Pozrime sa, ako na to spracovávať vytváranie záznamov pre náš dátový tok Kinesis.

Dáta vkladáme pomocou a PutRecordsRequest objekt. Pre tento nový objekt pridáme zoznam, ktorý obsahuje viac PutRecordsRequestEntry objekty:

Zoznam položiek = IntStream.range (1 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry záznam = nový PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); návratový záznam;}). collect (Collectors.toList ()); PutRecordsRequest createRecordsRequest = nový PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (záznamy); kinesis.putRecords (createRecordsRequest);

Vytvorili sme základného spotrebiteľa a producenta simulovaných záznamov IP. Teraz už zostáva iba spustiť náš jarný projekt a pozrieť si adresy IP uvedené v našej aplikačnej konzole.

4. KCL a KPL

Kinesis Client Library (KCL) je knižnica, ktorá zjednodušuje konzumáciu záznamov. Je to tiež vrstva abstrakcie nad AWS SDK Java API pre kinezické dátové toky. V zákulisí knižnica spracováva rozloženie záťaže v mnohých inštanciách, reaguje na zlyhania inštancie, kontrolne boduje spracované záznamy a reaguje na prelomenie.

KPL Kinesis Producer Library (KPL) je knižnica užitočná na zápis do dátového toku Kinesis. Poskytuje tiež vrstvu abstrakcie, ktorá sedí nad AWS SDK Java API pre Kinesis Data Streams. Pre lepší výkon knižnica automaticky spracúva logiku dávkovania, viacerých vlákien a opakuje to.

KCL a KPL majú hlavnú výhodu v tom, že sa ľahko používajú, aby sme sa mohli sústrediť na produkciu a konzumáciu záznamov.

4.1. Maven závislosti

V prípade potreby je možné tieto dve knižnice v našom projekte priniesť osobitne. Ak chcete do nášho projektu Maven zahrnúť KPL a KCL, musíme aktualizovať náš súbor pom.xml:

 com.amazonaws amazon-kinesis-producent 0.13.1 com.amazonaws amazon-kinesis-klient 1.11.2 

4.2. Jarné nastavenie

Jedinou jarnou prípravou, ktorú potrebujeme, je zabezpečiť, aby sme mali po ruke poverenia IAM. Hodnoty pre aws.access.key a aws.secret.key sú definované v našom application.properties súbor, aby sme si ich mohli prečítať pomocou @Hodnota keď treba.

4.3. Spotrebiteľ

Najprv urobíme vytvoriť triedu, ktorá implementuje IRecordProcessor rozhranie a definuje našu logiku spôsobu zaobchádzania so záznamami dátového toku Kinesis, ktorá ich má vytlačiť v konzole:

verejná trieda IpProcessor implementuje IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (record -> System.out.println () .array ()))); } @Override vypnutie verejnej neplatnosti (ShutdownInput shutdownInput) {}}

Ďalším krokom je definovať továrenskú triedu, ktorá implementuje IRecordProcessorFactory rozhranie a vráti predtým vytvorené IpProcesor objekt:

public class IpProcessorFactory implementuje IRecordProcessorFactory {@Override public IRecordProcessor createProcessor () {return new IpProcessor (); }}

A teraz posledný krok, použijeme a Pracovník namietať, aby sme definovali náš kanál spotrebiteľov. Potrebujeme KinesisClientLibConfiguration objekt, ktorý v prípade potreby definuje poverenia IAM a oblasť AWS.

Prejdeme KinesisClientLibConfigurationa naše IpProcessorFactory objekt, na náš Pracovník a potom ho spustite v samostatnom vlákne. Túto logiku uchovávania záznamov udržiavame stále nažive pomocou protokolu Pracovník triedy, takže teraz neustále čítame nové záznamy:

BasicAWSCredentials awsCredentials = nové BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = nový KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, nový AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); final Worker worker = new Worker.Builder () .recordProcessorFactory (nový IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. Výrobca

Poďme si teraz definovať Konfigurácia KinesisProducer objekt, pridaním poverení IAM a oblasti AWS:

BasicAWSCredentials awsCredentials = nové BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration () .setCredentialsProvider (new AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName ()); this.kinesisProducer = nový KinesisProducer (producerConfig);

Zahrnieme kinesisVýrobca objekt predtým vytvorený v a @Naplánovaný nepretržite pracovať a vytvárať záznamy pre náš dátový tok Kinesis:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("" 192.168.0. "+ IpSuffix) .getBytes ())). ForEach (entry -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, záznam) );

5. Kinesis Spring Stream Stream Binder

Už sme videli dve knižnice, obe vytvorené mimo jarného ekosystému. No teraz Pozrite sa, ako môže kinesis Spring Cloud Stream Binder ďalej zjednodušiť náš život pri budovaní na vrchole Spring Cloud Stream.

5.1. Maven závislosť

Závislosť Maven, ktorú musíme definovať v našej aplikácii pre Spring Cloud Stream Binder Kinesis, je:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Jarné nastavenie

Pri spustení na EC2 sa požadované vlastnosti AWS objavia automaticky, takže nie je potrebné ich definovať. Pretože spustíme naše príklady na lokálnom počítači, musíme pre náš účet AWS definovať náš prístupový kľúč, tajný kľúč a oblasť IAM. Zakázali sme tiež automatickú detekciu názvu zásobníka CloudFormation pre aplikáciu:

cloud.aws.credentials.access-key = môj-aws-prístupový kľúč cloud.aws.credentials.secret-key = môj-aws-tajný-kľúč cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = false

Spring Cloud Stream je dodávaný s tromi rozhraniami, ktoré môžeme použiť v našej väzbe streamu:

  • The drez je na príjem dát
  • The Zdroj sa používa na zverejňovanie záznamov
  • The procesor je kombináciou oboch

Ak je to potrebné, môžeme si definovať aj naše vlastné rozhrania.

5.3. Spotrebiteľ

Definovanie spotrebiteľa je práca zložená z dvoch častí. Najskôr definujeme v application.properties, dátový tok, z ktorého budeme spotrebovávať:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-group spring.cloud.stream.bindings.input.content-type = text / plain

A potom definujme jar @ Komponent trieda. Anotácia @EnableBinding (Sink.class) nám umožní čítať z toku Kinesis pomocou metódy anotovanej s @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) verejná trieda IpConsumer {@StreamListener (Sink.INPUT) public void consume (String ip) {System.out.println (ip); }}

5.4. Výrobca

Producent sa dá rozdeliť aj na dve časti. Najprv musíme definovať naše vlastnosti streamu vo vnútri application.properties:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

A potom pridáme @EnableBinding (Source.class) na jar @ Komponent a vytvárať nové testovacie správy každých pár sekúnd:

@Component @EnableBinding (Source.class) verejná trieda IpProducer {@Autowired súkromný zdrojový zdroj; @Scheduled (fixedDelay = 3000L) private void produce () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (entry -> source.output (). Send ( MessageBuilder.withPayload (entry) .build ())); }}

To je všetko, čo potrebujeme, aby Spring Cloud Stream Binder Kinesis fungoval. Aplikáciu môžeme jednoducho spustiť hneď teraz.

6. Záver

V tomto článku sme videli, ako integrovať náš projekt Spring s dvoma knižnicami AWS pre interakciu s kinezisovým dátovým prúdom. Tiež sme videli, ako používať knižnicu Spring Cloud Stream Binder Kinesis, aby sme implementáciu ešte uľahčili.

Zdrojový kód tohto článku nájdete na stránkach Github.