Dávkové spracovanie Java EE 7

1. Úvod

Predstavte si, že sme museli ručne dokončiť úlohy, ako je spracovanie výplatných pások, výpočet úrokov a generovanie účtov. Stal by sa dosť nudným, náchylným na chyby a nekonečným zoznamom manuálnych úloh!

V tomto výučbe sa pozrieme na Java Batch Processing (JSR 352), súčasť platformy Jakarta EE, a skvelú špecifikáciu pre automatizáciu podobných úloh. Vývojárom aplikácií ponúka model pre vývoj robustných systémov dávkového spracovania, aby sa mohli sústrediť na obchodnú logiku.

2. Maven závislosti

Pretože JSR 352 je iba špecifikácia, budeme musieť zahrnúť jeho API a implementáciu jberet:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

Pridáme tiež databázu v pamäti, aby sme sa mohli pozrieť na niektoré realistickejšie scenáre.

3. Kľúčové koncepty

JSR 352 predstavuje niekoľko konceptov, na ktoré sa môžeme pozrieť takto:

Najprv si zadefinujeme každý kúsok:

  • Začínajúc vľavo máme JobOperator. To spravuje všetky aspekty spracovania úloh, ako je spustenie, zastavenie a reštartovanie
  • Ďalej tu máme Job. Úloha je logická zbierka krokov; zapuzdruje celý dávkový proces
  • Úloha bude obsahovať od 1 do n Kroks. Každý krok je samostatnou postupnou pracovnou jednotkou. Krok sa skladá z čítanie vstup, spracovanie tento vstup a písanie výkon
  • A v neposlednom rade tu máme JobRepository ktorý ukladá priebežné informácie o pracovných pozíciách. Pomáha sledovať úlohy, ich stav a výsledky ich dokončenia

Kroky majú o niečo viac podrobností, tak sa na to pozrime ďalej. Najprv sa pozrieme na Kus kroky a potom o Šaržas.

4. Vytvorenie bloku

Ako už bolo uvedené vyššie, kus je akýmsi krokom. Často použijeme blok na vyjadrenie operácie, ktorá sa vykonáva opakovane, povedzme nad množinou položiek. Je to niečo ako prechodné operácie z Java Streams.

Pri popise bloku musíme vyjadriť, odkiaľ majú byť predmety odobraté, ako ich spracovať a kam ich potom poslať.

4.1. Čítanie položiek

Aby sme si mohli prečítať položky, budeme musieť implementovať ItemReader.

V takom prípade vytvoríme čítačku, ktorá jednoducho vydá čísla 1 až 10:

@Pomenovaná verejná trieda SimpleChunkItemReader rozširuje AbstractItemReader {private Integer [] tokeny; počet súkromných celých čísel; @Inject JobContext jobContext; @Override public Integer readItem () vyvolá výnimku {if (count> = tokens.length) {return null; } jobContext.setTransientUserData (count); návratové tokeny [počet ++]; } @Override public void open (Serializable checkpoint) throws Exception {tokens = new Integer [] {1,2,3,4,5,6,7,8,9,10}; počet = 0; }}

Teraz tu iba čítame z interného stavu triedy. Ale samozrejme, readItem mohol vytiahnuť z databázy, zo súborového systému alebo z iného externého zdroja.

Upozorňujeme, že niektoré z týchto vnútorných stavov ukladáme pomocou JobContext # setTransientUserData () ktoré sa neskôr budú hodiť.

Nezabudnite tiež na kontrolný bod parameter. Aj to si opäť vyzdvihneme.

4.2. Spracovanie položiek

Samozrejme, dôvod, prečo sa rozhodujeme, je ten, že chceme vykonať nejaký druh operácie s našimi položkami!

Kedykoľvek sa vrátime nulový z procesora položiek, vypustíme túto položku z dávky.

Povedzme tu teda, že si chceme ponechať iba párne čísla. Môžeme použiť ItemProcessor ktorá odmieta nepárne návratom nulový:

@Pomenovaná verejná trieda SimpleChunkItemProcessor implementuje ItemProcessor {@Override public Integer processItem (Object t) {Integer item = (Integer) t; vrátiť položku% 2 == 0? položka: null; }}

processItem sa zavolá raz pre každú položku, ktorú náš ItemReader vyžaruje.

4.3. Písanie položiek

Nakoniec úloha vyvolá ItemWriter aby sme mohli písať naše transformované položky:

@Pomenovaná verejná trieda SimpleChunkWriter rozširuje AbstractItemWriter {Zoznam spracovaný = nový ArrayList (); @Override public void writeItems (zoznam položiek) vyvolá výnimku {items.stream (). Mapu (Integer.class :: cast) .forEach (spracované :: pridať); }} 

Aké je dlhé položky? Za chvíľu definujeme veľkosť bloku, ktorá určí veľkosť zoznamu, na ktorý sa bude posielať writeItems.

4.4. Definovanie bloku v práci

Teraz sme to všetko spojili do súboru XML pomocou jazyka JSL alebo Job Specification Language. Upozorňujeme, že uvedieme zoznam našich čítačiek, procesorov, blokov a tiež veľkosti blokov:

Veľkosť diskovej jednotky je to, ako často sa pokrok v diskovej jednotke potvrdzuje v úložisku úloh, ktorá je dôležitá na zaručenie dokončenia, by mala zlyhať časť systému.

Tento súbor budeme musieť umiestniť do META-INF / dávkové úlohy pre.jar súbory a v WEB-INF / triedy / META-INF / dávkové úlohy pre .vojna súbory.

Dali sme svojej práci identifikačné číslo „SimpleChunk“, tak to skúsme v jednotkovom teste.

Teraz sa úlohy vykonávajú asynchrónne, čo znemožňuje ich testovanie. Vo vzorke nezabudnite skontrolovať naše BatchTestHelper ktorý hlasuje a čaká na dokončenie úlohy:

@Test public void givenChunk_thenBatch_completesWithSuccess () vyvolá výnimku {JobOperator jobOperator = BatchRuntime.getJobOperator (); Dlhé prevedenieId = jobOperator.start ("simpleChunk", nové vlastnosti ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

Takže to sú kúsky. Poďme sa teraz pozrieť na dávky.

5. Vytvorenie dávky

Nie všetko zapadá do iteračného modelu úhľadne. Napríklad môžeme mať úlohu, ktorú jednoducho potrebujeme vyvolať raz, spustiť do konca a vrátiť stav ukončenia.

Zmluva o dávke je dosť jednoduchá:

@Pomenovaná verejná trieda SimpleBatchLet rozširuje AbstractBatchlet {@Override public String process () vyvolá výnimku {return BatchStatus.COMPLETED.toString (); }}

Rovnako ako JSL:

A môžeme to otestovať pomocou rovnakého prístupu ako predtým:

@Test public void givenBatchlet_thenBatch_completeWithSuccess () vyvolá výnimku {JobOperator jobOperator = BatchRuntime.getJobOperator (); Dlhé prevedenieId = jobOperator.start ("simpleBatchLet", nové vlastnosti ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Pozreli sme sa teda na niekoľko rôznych spôsobov implementácie krokov.

Teraz sa pozrime na mechanizmy pre značenie a zaručenie pokroku.

6. Vlastný kontrolný bod

Zlyhania sa musia stať uprostred práce. Mali by sme jednoducho začať od celej veci, alebo môžeme nejako začať tam, kde sme skončili?

Ako už názov napovedá, kontrolné body pomôžte nám pravidelne nastavovať záložku pre prípad zlyhania.

Koniec spracovania bloku je v predvolenom nastavení prirodzeným kontrolným bodom.

Môžeme si ho však prispôsobiť pomocou vlastných Kontrolný bodový algoritmus:

@Pomenovaná verejná trieda CustomCheckPoint rozširuje AbstractCheckpointAlgorithm {@Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint () vyvolá výnimku {int counterRead = (Integer) jobContext.getTransientUserData (); návrat CounterRead% 5 == 0; }}

Pamätáte si počet, ktorý sme predtým vložili do prechodných údajov? Tu, môžeme to vytiahnuť pomocou JobContext # getTransientUserDatauviesť, že sa chceme zaviazať na každom piatom spracovanom čísle.

Bez toho by došlo k potvrdeniu na konci každého bloku, alebo v našom prípade každého 3. čísla.

A potom to zladíme s pokladničný algoritmus smernica v našom XML pod naším blokom:

Vyskúšajme kód a znova si všimnime, že niektoré kroky štandardného postupu sú skryté BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree () vyvolá výnimku {// ... spustiť úlohu a počkať na dokončenie jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getCommitCount) .forEach (count -> assertEquals (3L .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Mohli by sme teda čakať počet potvrdení 2, pretože máme desať položiek a nakonfigurujeme ich tak, aby boli každé piate. Ale, rámec na konci vykoná ešte jedno záverečné čítanie zabezpečiť, aby bolo všetko spracované, čo nás vedie až k 3.

Ďalej sa pozrime, ako zaobchádzať s chybami.

7. Riešenie výnimiek

Predvolene, operátor práce označí našu prácu ako Zlyhal v prípade výnimky.

Zmeňme našu čítačku položiek, aby sme sa uistili, že zlyháva:

@Override public Integer readItem () vyvolá výnimku {if (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken (); hodiť nový RuntimeException (); } return null; }

A potom otestujte:

@Test public void whenChunkError_thenBatch_CompletesWithFailed () hodí výnimku {// ... spustiť prácu a počkať na dokončenie assertEquals (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

Toto predvolené správanie však môžeme prepísať niekoľkými spôsobmi:

  • skip-limit určuje počet výnimiek, ktoré bude tento krok pred zlyhaním ignorovať
  • retry-limit Určuje počet opakovaní pokusu operátora, aby zlyhal
  • skippable-exception-class určuje množinu výnimiek, ktoré bude spracovanie blokov ignorovať

Môžeme teda upraviť našu prácu tak, aby ju ignorovala RuntimeException, ako aj niekoľko ďalších, len pre ilustráciu:

A teraz náš kód prejde:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess () vyvolá výnimku {// ... spustiť úlohu a počkať na dokončenie jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getProcessSkipCount) .forEach (skipCountals - 1 .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. Vykonanie viacerých krokov

Už sme spomínali, že práca môže mať ľubovoľný počet krokov, tak sa pozrime teraz.

8.1. Spustenie ďalšieho kroku

Predvolene, každý krok je posledným krokom v práci.

Aby sme mohli vykonať ďalší krok v rámci dávkovej úlohy, budeme musieť explicitne určiť pomocou Ďalšie atribút v rámci definície kroku:

Ak tento atribút zabudneme, ďalší krok v poradí sa nezrealizuje.

A vidíme, ako to vyzerá v API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess () vyvolá výnimku {// ... spustiť úlohu a počkať na dokončenie assertEquals (2, jobOperator.getStepExecutions (executionId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. Toky

Postupnosť krokov môže byť tiež zapuzdrená do a tok. Po dokončení toku ide o celý tok, ktorý prechádza na vykonávací prvok. Prvky vo vnútri toku tiež nemôžu prechádzať na prvky mimo toku.

Môžeme povedzme vykonať dva kroky vo vnútri toku a potom vykonať prechod prechodu do izolovaného kroku:

Stále môžeme vidieť vykonávanie jednotlivých krokov nezávisle:

@Test public void givenFlow_thenBatch_CompleteWithSuccess () vyvolá výnimku {// ... spustiť úlohu a počkať na dokončenie assertEquals (3, jobOperator.getStepExecutions (executionId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. Rozhodnutia

Máme tiež podporu if / else v podobe rozhodnutia. Rozhodnutia poskytujú prispôsobený spôsob určovania postupnosti medzi krokmi, tokmi a rozdeleniami.

Rovnako ako kroky funguje aj na prechodových prvkoch ako napr Ďalšie ktoré môžu riadiť alebo ukončiť vykonávanie úlohy.

Pozrime sa, ako je možné úlohu nakonfigurovať:

akýkoľvek rozhodnutie prvok je potrebné nakonfigurovať s triedou, ktorá implementuje Rozhoduje. Jeho úlohou je vrátiť rozhodnutie ako a String.

Každý Ďalšie vo vnútri rozhodnutie je ako prípade v prepínač vyhlásenie.

8.4. Rozdeľuje sa

Rozdeľuje sa sú užitočné, pretože nám umožňujú vykonávať toky súčasne:

Samozrejme, to znamená, že objednávka nie je zaručená.

Potvrdíme, že stále všetci bežia. Kroky toku sa vykonajú v ľubovoľnom poradí, izolovaný krok však bude vždy posledný:

@Test public void givenSplit_thenBatch_CompletesWithSuccess () vyvolá výnimku {// ... spustiť úlohu a počkať na dokončenie Zoznam stepExecutions = jobOperator.getStepExecutions (executionId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. Rozdelenie práce

Môžeme tiež spotrebovať vlastnosti dávky v rámci nášho Java kódu, ktoré boli definované v našej práci.

Môžu mať rozsah na troch úrovniach - úloha, krok a dávkový artefakt.

Pozrime sa na niekoľko príkladov, ako ich konzumovali.

Keď chceme spotrebovať vlastnosti na úrovni práce:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

Toto je možné spotrebovať aj na stupňovitej úrovni:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

Keď chceme spotrebovať vlastnosti na úrovni dávkového artefaktu:

@Inject @BatchProperty (name = "name") private String nameString;

To sa hodí pri oddieloch.

Uvidíte, s rozdeleniami môžeme súčasne riadiť toky. Ale môžeme tiež prepážka krok do n sady položiek alebo samostatné vstupy, čo nám umožňuje ďalší spôsob rozdelenia práce na viac vlákien.

Aby sme pochopili segment práce, ktorú by mal robiť každý oddiel, môžeme kombinovať vlastnosti s oddielmi:

10. Zastavte a reštartujte

Teraz je to na definovanie pracovných miest. Poďme sa teraz na chvíľu baviť o ich správe.

Už sme v našich jednotkových testoch videli, že môžeme získať príklad JobOperator od BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

A potom môžeme začať pracovať:

Dlhé prevedenieId = jobOperator.start ("simpleBatchlet", nové vlastnosti ());

Prácu však môžeme aj zastaviť:

jobOperator.stop (executionId);

A nakoniec môžeme úlohu reštartovať:

executionId = jobOperator.restart (executionId, nové vlastnosti ());

Pozrime sa, ako môžeme zastaviť spustenú prácu:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped () vyvolá výnimku {JobOperator jobOperator = BatchRuntime.getJobOperator (); Dlhé prevedenieId = jobOperator.start ("simpleBatchLet", nové vlastnosti ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobOperator.stop (executionId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

A ak je šarža ZASTAVENÉ, potom ho môžeme reštartovať:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... spustiť a zastaviť úlohu assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); executionId = jobOperator.restart (jobExecution.getExecutionId (), nové vlastnosti ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (executionId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. Načítanie pracovných miest

Po zadaní dávkovej úlohy dávkový runtime vytvorí inštanciu Vykonanie úlohy sledovať to.

Ak chcete získať Vykonanie úlohy pre ID spustenia môžeme použiť JobOperator # getJobExecution (executionId) metóda.

A StepExecution poskytuje užitočné informácie na sledovanie vykonania kroku.

Ak chcete získať StepExecution pre ID spustenia môžeme použiť JobOperator # getStepExecutions (executionId) metóda.

A z toho môžeme získať niekoľko metrík tohto kroku StepExecution # getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics () vyvolá výnimku {// ... začať prácu a čakať na dokončenie assertTrue (jobOperator.getJobNames (). Contains ("simpleChunk")); assertTrue (jobOperator.getParameters (executionId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (executionId) .get (0); Map metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... a mnoho ďalších! }

12. Nevýhody

JSR 352 je výkonný, aj keď v mnohých oblastiach chýba:

  • Zdá sa, že nie je dostatok čitateľov a autorov, ktorí by mohli spracovávať ďalšie formáty, napríklad JSON
  • Neexistuje podpora generík
  • Delenie na oddiely podporuje iba jeden krok
  • API neponúka nič na podporu plánovania (aj keď J2EE má samostatný plánovací modul)
  • Vďaka svojej asynchrónnej povahe môže byť testovanie výzvou
  • API je dosť podrobné

13. Záver

V tomto článku sme sa pozreli na JSR 352 a dozvedeli sme sa o blokoch, dávkach, rozdeleniach, tokoch a oveľa viac. Napriek tomu sme povrch sotva poškriabali.

Demo kód ako vždy nájdete na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found