Sprievodca Apache Crunch

1. Úvod

V tomto výučbe si ukážeme Apache Crunch s príkladom aplikácie na spracovanie údajov. Túto aplikáciu spustíme pomocou rámca MapReduce.

Začneme krátkym vysvetlením niektorých konceptov Apache Crunch. Potom skočíme do ukážkovej aplikácie. V tejto aplikácii urobíme spracovanie textu:

  • Najskôr si prečítame riadky z textového súboru
  • Neskôr ich rozdelíme na slová a odstránime niektoré bežné slová
  • Potom zoskupíme zostávajúce slová, aby sme získali zoznam jedinečných slov a ich počet
  • Nakoniec tento zoznam napíšeme do textového súboru

2. Čo je to Crunch?

MapReduce je distribuovaný paralelný programovací rámec pre spracovanie veľkého množstva údajov na klastri serverov. Softvérové ​​rámce ako Hadoop a Spark implementujú MapReduce.

Crunch poskytuje rámec pre písanie, testovanie a spustenie potrubí MapReduce v Jave. Tu nepíšeme priamo úlohy MapReduce. Namiesto toho definujeme dátový kanál (t. J. Operácie na vykonávanie krokov vstupu, spracovania a výstupu) pomocou rozhraní Crunch API. Crunch Planner ich namapuje na úlohy MapReduce a podľa potreby ich vykoná.

Preto je každý dátový kanál Crunch koordinovaný inštanciou súboru Potrubie rozhranie. Toto rozhranie tiež definuje metódy čítania údajov do potrubia prostredníctvom Zdroj inštancie a zápis dát z potrubia do Cieľ inštancie.

Máme 3 rozhrania na reprezentáciu údajov:

  1. PCzber - nemenná, distribuovaná zbierka prvkov
  2. PTable<>, V> - nemenný, distribuovaný, neusporiadaný multimapa kľúčov a hodnôt
  3. PGroupedTable<>, V> - distribuovaná, triedená mapa kľúčov typu K až Iterable V, ktoré môžu byť iterované presne raz

DoFn je základná trieda pre všetky funkcie spracovania údajov. Zodpovedá to Mapovač, Reduktor a Kombinátor triedy v MapReduce. Väčšinu času stráveného vývojom trávime jeho písaním a testovaním logických výpočtov.

Teraz, keď už Crunch poznáme, použijeme ho na zostavenie ukážkovej aplikácie.

3. Pripravenie krízového projektu

Najskôr si založme Crunch Project s Mavenom. Môžeme tak urobiť dvoma spôsobmi:

  1. Pridajte požadované závislosti do súboru pom.xml súbor existujúceho projektu
  2. Na vytvorenie štartovacieho projektu použite archetyp

Poďme sa rýchlo pozrieť na oba prístupy.

3.1. Maven závislosti

Ak chcete pridať Crunch do existujúceho projektu, pridajme požadované závislosti do súboru pom.xml spis.

Najskôr pridajme chrumkavé jadro knižnica:

 org.apache.crunch crunch-core 0.15.0 

Ďalej pridajme hadoop-klient knižnica pre komunikáciu s Hadoop. Používame verziu zodpovedajúcu inštalácii Hadoop:

 poskytnutý org.apache.hadoop hadoop-client 2.2.0 

Môžeme skontrolovať, či Maven Central obsahuje najnovšie verzie knižníc crunch-core a hadoop-client.

3.2. Mavenský archetyp

Ďalším prístupom je rýchle vygenerovanie štartovacieho projektu pomocou archetypu Maven poskytnutého Crunchom:

mvn archetyp: generate -Dfilter = org.apache.crunch: crunch-archetype 

Na výzvu vyššie uvedeného príkazu poskytneme verziu Crunch a podrobnosti artefaktu projektu.

4. Crunch Pipeline Setup

Po nastavení projektu musíme vytvoriť a Potrubie objekt. Crunch má 3 Potrubie implementácie:

  • MRPipeline - vykonáva sa v rámci programu Hadoop MapReduce
  • SparkPipeline - vykonáva sa ako séria potrubí Spark
  • MemPipeline - vykonáva v pamäti klienta a je vhodný na testovanie jednotiek

Zvyčajne vyvíjame a testujeme pomocou inštancie MemPipeline. Neskôr použijeme inštanciu MRPipeline alebo SparkPipeline na skutočné vykonanie.

Ak by sme potrebovali potrubie v pamäti, mohli by sme použiť statickú metódu getInstance získať MemPipeline inštancia:

Pipeline pipeline = MemPipeline.getInstance ();

Ale teraz si vytvorme inštanciu MRPipeline na spustenie aplikácie pomocou Hadoop:

Pipeline pipeline = nový MRPipeline (WordCount.class, getConf ());

5. Prečítajte si vstupné údaje

Po vytvorení objektu pipeline chceme načítať vstupné údaje. The Potrubie rozhranie poskytuje pohodlnú metódu na čítanie vstupu z textového súboru, readTextFile (pathName).

Zavolajme túto metódu na čítanie vstupného textového súboru:

PCollection lines = pipeline.readTextFile (inputPath);

Vyššie uvedený kód číta textový súbor ako zbierku súborov String.

Ako ďalší krok napíšeme testovací prípad na čítanie vstupu:

@Test public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection lines = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .size ()); }

V tomto teste overujeme, či pri čítaní textového súboru dostaneme očakávaný počet riadkov.

6. Kroky spracovania údajov

Po prečítaní vstupných údajov ich musíme spracovať. Crunch API obsahuje množstvo podtried triedy DoFn zvládnuť bežné scenáre spracovania údajov:

  • FilterFn - filtruje členov zbierky na základe boolovských podmienok
  • MapFn - mapuje každý vstupný záznam na presne jeden výstupný záznam
  • CombineFn - kombinuje niekoľko hodnôt do jednej hodnoty
  • JoinFn - vykonáva spojenia ako vnútorné spojenie, ľavé vonkajšie spojenie, pravé vonkajšie spojenie a úplné vonkajšie spojenie

Implementujme nasledujúcu logiku spracovania údajov pomocou týchto tried:

  1. Každý riadok vo vstupnom súbore rozdeľte na slová
  2. Odstráňte ukončovacie slová
  3. Spočítajte jedinečné slová

6.1. Rozdelte riadok textu na slová

Najskôr si vytvorme Tokenizer triedy na rozdelenie riadku na slová.

Predĺžime DoFn trieda. Táto trieda má abstraktnú metódu nazvanú procesu. Táto metóda spracováva vstupné záznamy z a PCzber a odošle výstup do Vysielač.

Logiku rozdelenia musíme implementovať do tejto metódy:

public class Tokenizer extends DoFn {private static final Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @Override public void process (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}} 

Vo vyššie uvedenej implementácii sme použili Rozdeľovač triedy z knižnice Guava na extrahovanie slov z riadku.

Ďalej napíšeme jednotkový test pre Tokenizer trieda:

@RunWith (MockitoJUnitRunner.class) verejná trieda TokenizerUnitTest {@Mock súkromný vysielač; @Test public void givenTokenizer_whenLineProcessed_thenOnlyExectedWordsEmitted () {Tokenizer splitter = nový Tokenizer (); splitter.process ("ahoj svet", vysielač); verify (emitor) .emit ("ahoj"); verify (emitor) .emit ("svet"); verifyNoMoreInteractions (emitor); }}

Vyššie uvedený test overuje, či sú vrátené správne slová.

Nakoniec pomocou tejto triedy rozdelíme riadky načítané zo vstupného textového súboru.

The paralelnéDo metóda PCzber rozhranie aplikuje dané DoFn ku všetkým prvkom a vráti nový PCzber.

Zavolajme túto metódu na kolekciu liniek a odovzdajme inštanciu Tokenizer:

PCollection words = lines.parallelDo (new Tokenizer (), Writables.strings ()); 

Vo výsledku dostaneme zoznam slov vo vstupnom textovom súbore. V ďalšom kroku odstránime zastavovacie slová.

6.2. Odstrániť zastavovacie slová

Podobne ako v predchádzajúcom kroku si vytvorme a StopWordFilter triedy na odfiltrovanie stop slov.

Avšak predĺžime FilterFn namiesto DoFn. FilterFn má abstraktnú metódu nazvanú súhlasiť. Logiku filtrovania musíme implementovať do tejto metódy:

public class StopWordFilter rozširuje FilterFn {// anglické stop slová, požičané od Lucene. súkromná statická konečná množina STOP_WORDS = ImmutableSet .copyOf (nový reťazec [] {"a", "a", "sú", "ako", "o", "byť", "ale", "od", "pre" , „ak“, „in“, „do“, „je“, „to“, „nie“, „nie“, „z“, „zapnuté“, „alebo“, „s“, „také“, „ t "," that "," the "," their "," then "," there "," these "," they "," this "," to "," was "," will "," with " }); @ Override public boolean accept (reťazcové slovo) {návrat! STOP_WORDS.contains (slovo); }}

Ďalej napíšeme jednotkový test pre StopWordFilter trieda:

public class StopWordFilterUnitTest {@Test public void givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("the")); assertFalse (filter.accept ("a")); } @Test public void givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = nový StopWordFilter (); assertTrue (filter.accept ("Hello")); assertTrue (filter.accept ("svet")); } @Test public void givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("This", "is", "a", "test", "vety"); PCollection noStopWords = words.filter (new StopWordFilter ()); assertEquals (ImmutableList.of ("Tento", "test", "veta"), Lists.newArrayList (noStopWords.materialize ())); }}

Tento test overuje, či sa filtračná logika vykonáva správne.

Nakoniec použijeme StopWordFilter na filtrovanie zoznamu slov vygenerovaných v predchádzajúcom kroku. The filter metóda PCzber rozhranie aplikuje dané FilterFn ku všetkým prvkom a vráti nový PCzber.

Zavolajme túto metódu na zbieranie slov a odovzdajme inštanciu StopWordFilter:

PCollection noStopWords = words.filter (new StopWordFilter ());

Vo výsledku dostaneme filtrovanú zbierku slov.

6.3. Počítajte jedinečné slová

Po získaní filtrovanej zbierky slov chceme spočítať, ako často sa každé slovo vyskytuje. PCzber rozhranie má množstvo metód na vykonávanie bežných agregácií:

  • min - vráti minimálny prvok zbierky
  • max - vráti maximálny prvok zbierky
  • dĺžka - vráti počet prvkov v zbierke
  • počítať - vráti a PTable ktorá obsahuje počet jednotlivých prvkov zbierky

Použime počítať metóda na získanie jedinečných slov spolu s ich počtom:

// Metóda count použije sériu primitívov Crunch a vráti // mapu jedinečných slov vo vstupnom PCollection na ich počet. Počty tabuliek PT = noStopWords.count ();

7. Zadajte výstup

Výsledkom predchádzajúcich krokov je, že máme tabuľku slov a ich počty. Tento výsledok chceme zapísať do textového súboru. The Potrubie rozhranie poskytuje pohodlné metódy na zápis výstupu:

void write (PCollection collection, Target target); void write (zbierka PCollection, Target target, Target.WriteMode writeMode); void writeTextFile (kolekcia PCollection, reťazec pathName);

Preto nazvime writeTextFile metóda:

pipeline.writeTextFile (counts, outputPath); 

8. Spravujte vykonávanie potrubia

Všetky doterajšie kroky práve definovali dátový kanál. Nebol prečítaný ani spracovaný žiadny vstup. To je preto, že Crunch používa model lenivého prevedenia.

To nespustí úlohy MapReduce, kým sa na rozhraní Pipeline nevyvolá metóda, ktorá riadi plánovanie a vykonávanie úloh:

  • bežať - pripraví plán vykonania na vytvorenie požadovaných výstupov a potom ho vykoná synchrónne
  • hotový - spustí všetky zostávajúce úlohy potrebné na generovanie výstupov a potom vyčistí všetky vytvorené prechodné dátové súbory
  • runAsync - je podobný spôsobu spustenia, ale vykonáva sa neblokujúcim spôsobom

Preto nazvime hotový metóda na vykonanie kanálu ako úlohy MapReduce:

PipelineResult result = pipeline.done (); 

Vyššie uvedené vyhlásenie spustí úlohy MapReduce na načítanie vstupu, ich spracovanie a zapísanie výsledku do výstupného adresára.

9. Zostavenie potrubia

Doteraz sme vyvinuli a jednotkovo otestovali logiku na načítanie vstupných údajov, ich spracovanie a zápis do výstupného súboru.

Ďalej ich spojíme a zostavíme celý dátový kanál:

public int run (String [] args) hodí Exception {String inputPath = args [0]; Reťazec outputPath = args [1]; // Vytvorenie objektu na koordináciu vytvárania a vykonávania potrubia. Pipeline pipeline = nový MRPipeline (WordCount.class, getConf ()); // Odkaz na daný textový súbor ako na zbierku reťazcov. PCollection lines = pipeline.readTextFile (inputPath); // Definujte funkciu, ktorá rozdelí každý riadok v PCollection reťazcov na // PCollection zložený z jednotlivých slov v súbore. // Druhý argument nastavuje formát serializácie. PCollection words = lines.parallelDo (new Tokenizer (), Writables.strings ()); // Vezmite zbierku slov a odstráňte známe zastavovacie slová. PCollection noStopWords = words.filter (new StopWordFilter ()); // Metóda count použije sériu primitívov Crunch a vráti // mapu jedinečných slov vo vstupnom PCollection na ich počet. Počty tabuliek PT = noStopWords.count (); // Poraďte kanálu, aby výsledné počty zapísal do textového súboru. pipeline.writeTextFile (counts, outputPath); // Vykonajte kanál ako MapReduce. PipelineResult result = pipeline.done (); vrátiť výsledok.podarilo sa ()? 0: 1; }

10. Konfigurácia spustenia Hadoop

Dátový kanál je tak pripravený.

Na jeho spustenie však potrebujeme kód. Preto napíšme hlavný spôsob spustenia aplikácie:

public class WordCount extends Configured implements Tool {public static void main (String [] args) throws Exception {ToolRunner.run (new Configuration (), new WordCount (), args); }

ToolRunner.run analyzuje konfiguráciu Hadoop z príkazového riadku a vykoná úlohu MapReduce.

11. Spustite aplikáciu

Kompletná aplikácia je teraz pripravená. Spustíme nasledujúci príkaz na jeho zostavenie:

balíček mvn 

Výsledkom vyššie uvedeného príkazu je, že v cieľovom adresári dostaneme zabalenú aplikáciu a špeciálny zásobník úloh.

Použime tento job jar na vykonanie aplikácie na Hadoop:

terč hadoop / crunch-1.0-SNAPSHOT-job.jar 

Aplikácia načíta vstupný súbor a zapíše výsledok do výstupného súboru. Výstupný súbor obsahuje jedinečné slová a ich počty podobné týmto:

[Pridať, 1] [Pridané, 1] [Obdiv, 1] [Priznáva, 1] [Príspevok, 1]

Okrem Hadoopu môžeme aplikáciu spustiť v rámci IDE, ako samostatnú aplikáciu alebo ako jednotkové testy.

12. Záver

V tomto tutoriáli sme vytvorili aplikáciu na spracovanie dát bežiacu na MapReduce. Apache Crunch uľahčuje písanie, testovanie a vykonávanie potrubí MapReduce v Jave.

Celý zdrojový kód nájdete ako obvykle na serveri Github.


$config[zx-auto] not found$config[zx-overlay] not found