Apache Spark: Rozdiely medzi dátovými rámcami, množinami údajov a RDD

1. Prehľad

Apache Spark je rýchly a distribuovaný systém na spracovanie údajov. Robí spracovanie údajov v pamäti a využíva ukladanie do pamäte cache a optimalizované vykonávanie, čo vedie k rýchlemu výkonu. Poskytuje rozhrania API na vysokej úrovni pre populárne programovacie jazyky ako Scala, Python, Java a R.

V tomto rýchlom návode si prejdeme tri základné koncepty Sparku: dátové rámce, množiny údajov a RDD.

2. Dátový rámec

Spark SQL predstavil od Spark 1.3 tabuľkovú abstrakciu dát, ktorá sa nazýva DataFrame. Od tej doby sa stala jednou z najdôležitejších funkcií Sparku. Toto API je užitočné, keď chceme spracovať štruktúrované a pološtruktúrované distribuované údaje.

V časti 3 si povieme Resilient Distributed Datasets (RDD). Údajové rámce ukladajú údaje účinnejším spôsobom ako RDD, je to preto, lebo využívajú nezmeniteľné, pamäťové, odolné, distribuované a paralelné možnosti RDD, ale na údaje tiež používajú schému. DataFrames tiež prevádzajú kód SQL na optimalizované operácie RDD na nízkej úrovni.

DataFrames môžeme vytvoriť tromi spôsobmi:

  • Konverzia existujúcich RDD
  • Spúšťajú sa dotazy SQL
  • Načítavajú sa externé údaje

Predstavil sa tím Spark SparkSession vo verzii 2.0 zjednocuje všetky rôzne kontexty a zaisťuje, že sa vývojári nebudú musieť starať o vytváranie rôznych kontextov:

SparkSession session = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

Budeme analyzovať Tourist.csv spis:

Dataset data = dataFrameReader.option ("hlavička", "pravda") .csv ("data / Tourist.csv");

Odkedy sa Spark 2.0 DataFrame stal a Množina údajov typu Riadok, takže môžeme použiť DataFrame ako alias pre a Množina údajov.

Môžeme vybrať konkrétne stĺpce, ktoré nás zaujímajú. Môžeme tiež filtrovať a zoskupiť podľa daného stĺpca:

data.select (col ("krajina"), col ("rok"), col ("hodnota")) .show (); data.filter (col ("krajina"). equalTo ("Mexiko")) .show (); data.groupBy (col ("country")) .count () .show ();

3. Množiny údajov

Dátová sada je sada štruktúrovaných údajov so silným typom. Poskytujú známy objektovo orientovaný štýl programovania a výhody bezpečnosti typov, pretože súbory údajov môžu skontrolovať syntax a zachytiť chyby v čase kompilácie.

Množina údajov je rozšírením DataFrame, môžeme ho teda považovať za netypové zobrazenie množiny údajov.

Tím Spark vydal Množina údajov API v Spark 1.6 a ako už spomenuli: „cieľom Spark Datasets je poskytnúť API, ktoré používateľom umožní ľahko vyjadrovať transformácie v doménach objektov a zároveň poskytne výkon a robustnosť výkonného nástroja Spark SQL“.

Najskôr budeme musieť vytvoriť triedu typu TouristData:

public class TouristData {private String region; súkromná reťazcová krajina; súkromný reťazcový rok; súkromná sláčiková séria; súkromná dvojitá hodnota; súkromné ​​poznámky pod čiarou; súkromný zdroj reťazcov; // ... zakladatelia a zakladatelia}

Na mapovanie každého z našich záznamov na zadaný typ budeme musieť použiť kódovač. Kodéry prekladajú medzi objektmi Java a interným binárnym formátom Sparku:

// Inicializácia SparkSession a načítanie dát Dátová sada responseWithSelectedColumns = data.select (col ("region"), col ("krajina"), col ("rok"), col ("séria"), col ("hodnota"). Cast („dvojitá“), col („poznámky pod čiarou“), col („zdroj“)); Dataset typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

Rovnako ako v prípade DataFrame môžeme filtrovať a zoskupovať podľa konkrétnych stĺpcov:

typedDataset.filter ((FilterFunction) record -> record.getCountry () .equals ("Nórsko")) .show (); typedDataset.groupBy (typedDataset.col ("krajina")). účet () .show ();

Môžeme tiež robiť operácie ako filter podľa stĺpca, ktorý zodpovedá určitému rozsahu alebo vypočítava súčet konkrétneho stĺpca, aby sme získali jeho celkovú hodnotu:

typedDataset.filter ((FilterFunction) record -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("výdavky")) .groupBy ("krajina") .agg (suma ("hodnota")) .show ();

4. RDD

Resilient Distributed Dataset alebo RDD je Sparkova primárna programátorská abstrakcia. Predstavuje kolekciu prvkov, ktorá je: nemenný, odolný a distribuovaný.

RDD zapuzdruje veľký súbor údajov, Spark automaticky distribuuje údaje obsiahnuté v RDD po ​​celom našom klastri a paralelizuje operácie, ktoré na nich vykonávame.

RDD môžeme vytvárať iba pomocou operácií s dátami v stabilnom úložisku alebo operáciami na iných RDD.

Tolerancia chýb je nevyhnutná, keď pracujeme s veľkými množinami údajov a údaje sú distribuované na klastrových strojoch. RDD sú odolné vďaka vstavanej mechanike zotavenia po chybe spoločnosti Spark. Spark sa spolieha na skutočnosť, že RDD si pamätajú, ako boli vytvorené, aby sme mohli ľahko vystopovať pôvod a obnoviť oddiel.

Na RDD môžeme robiť dva typy operácií: Transformácie a akcie.

4.1. Premeny

Môžeme použiť Transformácie na RDD na manipuláciu s jeho údajmi. Po vykonaní tejto manipulácie dostaneme úplne nový RDD, pretože RDD sú nemenné objekty.

Skontrolujeme, ako implementovať Map a Filter, dve z najbežnejších transformácií.

Najprv musíme vytvoriť a JavaSparkContext a načítať údaje ako RDD z Tourist.csv spis:

SparkConf conf = nový SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = nový JavaSparkContext (conf); JavaRDD turisti = sc.textFile ("data / Tourist.csv");

Ďalej použijeme mapovú funkciu na získanie názvu krajiny z každého záznamu a prevedenie názvu na veľké písmená. Túto novo vygenerovanú množinu údajov môžeme uložiť ako textový súbor na disk:

JavaRDD upperCaseCountries = turisti.map (riadok -> {reťazec [] stĺpce = line.split (COMMA_DELIMITER); vrátiť stĺpce [1] .toUpperCase ();}). Different (); upperCaseCountries.saveAsTextFile ("data / output / uppercase.txt");

Ak chceme vybrať iba konkrétnu krajinu, môžeme použiť funkciu filtra na našich pôvodných turistov RDD:

JavaRDD turistiInMexico = turisti .filter (riadok -> riadok.split (COMMA_DELIMITER) [1] .equals ("Mexiko")); turistovInMexico.saveAsTextFile ("data / output / touristInMexico.txt");

4.2. Akcie

Akcie po vykonaní určitého výpočtu údajov vrátia konečnú hodnotu alebo uložia výsledky na disk.

Dve z opakovane používaných akcií v programe Spark sú spočítať a znížiť.

Počítajme celkový počet krajín v našom súbore CSV:

// Inicializácia kontextu iskru a načítanie dát JavaRDD countries = tourist.map (line -> {String [] columns = line.split (COMMA_DELIMITER); návratové stĺpce [1];}). Different (); Long numberOfCountries = countries.count ();

Teraz vypočítame celkové výdavky podľa krajín. Budeme musieť filtrovať záznamy obsahujúce v ich popise výdavky.

Namiesto použitia a JavaRDD, použijeme a JavaPairRDD. Pár RDD je typ RDD, ktorý dokáže ukladať páry kľúč - hodnota. Poďme to skontrolovať ďalej:

JavaRDD touristExpenditure = turisti .filter (riadok -> riadok.split (COMMA_DELIMITER) [3] .obsahuje („výdavky“)); JavaPairRDD výdajPairRdd = turistiExpendiment .mapToPair (riadok -> {reťazec [] stĺpce = line.split (COMMA_DELIMITER); vrátiť nový Tuple2 (stĺpce [1], Double.valueOf (stĺpce [6])));}); Zoznam totalByCountry = výdajPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Záver

Ak to zhrnieme, mali by sme použiť DataFrames alebo datasety, keď potrebujeme API špecifické pre doménu, potrebujeme výrazy na vysokej úrovni, ako napríklad agregácia, súčet alebo dotazy SQL. Alebo keď chceme typovú bezpečnosť v čase kompilácie.

Na druhej strane by sme RDD mali používať, keď sú dáta neštruktúrované a nepotrebujeme implementovať konkrétnu schému alebo keď potrebujeme transformácie a akcie na nízkej úrovni.

Všetky vzorky kódu sú ako vždy k dispozícii na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found