Úvod do Apache Storm

1. Prehľad

Tento tutoriál bude úvodom do Apache Storm, distribuovaný výpočtový systém v reálnom čase.

Zameriame sa a pokryjeme:

  • Čo to vlastne Apache Storm je a aké problémy rieši
  • Jeho architektúra a
  • Ako ho použiť v projekte

2. Čo je Apache Storm?

Apache Storm je bezplatný a otvorený distribuovaný systém pre výpočty v reálnom čase.

Poskytuje odolnosť voči chybám, škálovateľnosť a zaručuje spracovanie údajov a je obzvlášť dobrý pri spracovaní neobmedzených tokov údajov.

Niektoré dobré prípady použitia Stormu môžu byť spracovanie operácií s kreditnými kartami na detekciu podvodov alebo spracovanie dát z inteligentných domov na detekciu chybných senzorov.

Storm umožňuje integráciu s rôznymi databázami a systémami čakania na fronte dostupnými na trhu.

3. Maven závislosť

Predtým, ako použijeme Apache Storm, musíme do nášho projektu zahrnúť závislosť typu storm-core:

 poskytnuté org.apache.storm storm-core 1.2.2 

Mali by sme používať iba poskytnutý rozsah ak máme v úmysle spustiť našu aplikáciu na klastri Storm.

Na lokálne spustenie aplikácie môžeme použiť takzvaný lokálny režim, ktorý bude simulovať klaster Storm v lokálnom procese, v takom prípade by sme mali odstrániť za predpokladu.

4. Dátový model

Dátový model Apache Storm sa skladá z dvoch prvkov: n-tice a streamy.

4.1. Násobný

A Násobný je usporiadaný zoznam pomenovaných polí s dynamickými typmi. To znamená, že nemusíme výslovne deklarovať typy polí.

Storm musí vedieť, ako serializovať všetky hodnoty, ktoré sa používajú v n-tici. V predvolenom nastavení už môže serializovať primitívne typy, Struny a bajt polia.

A keďže Storm používa serializáciu Kryo, musíme zaregistrovať serializátor pomocou Konfig používať vlastné typy. Môžeme to urobiť jedným z dvoch spôsobov:

Najskôr môžeme triedu zaregistrovať na serializáciu pomocou jej celého názvu:

Config config = nový Config (); config.registerSerialization (User.class);

V takom prípade Kryo serializuje triedu pomocou FieldSerializer. Predvolene sa tým serializujú všetky neprechodné polia triedy, súkromné ​​aj verejné.

Alebo namiesto toho môžeme poskytnúť triedu na serializáciu aj serializátor, ktorý chce spoločnosť Storm pre túto triedu použiť:

Config config = nový Config (); config.registerSerialization (User.class, UserSerializer.class);

Aby sme vytvorili vlastný serializátor, musíme rozšíriť generickú triedu Serializátor ktorá má dve metódy napíš a čítať.

4.2. Prúd

A Prúd je základnou abstrakciou v ekosystéme Búrky. The Prúd je neobmedzená sekvencia n-tic.

Storms umožňuje paralelné spracovanie viacerých prúdov.

Každý stream má identifikátor, ktorý je poskytnutý a pridelený počas deklarácie.

5. Topológia

Logika aplikácie Storm v reálnom čase je zabalená do topológie. Topológia sa skladá z chrliče a skrutky.

5.1. Chrlič

Chrliče sú zdrojom prúdov. Vydávajú n-tice podľa topológie.

Tice možno čítať z rôznych externých systémov, ako sú Kafka, Kestrel alebo ActiveMQ.

Chrliče môžu byť spoľahlivé alebo nespoľahlivý. Spoľahlivý Znamená, že hubica môže odpovedať na tú n-ticu, ktorú Storm nedokázal spracovať. Nespoľahlivé Znamená, že hubica neodpovedá, pretože bude na vydávanie n-tíc používať mechanizmus oheň-a-zabudni.

Na vytvorenie vlastného výtoku je potrebné implementovať IRichSpout rozhranie alebo rozšíriť ktorúkoľvek triedu, ktorá už rozhranie implementuje, napríklad abstrakt BaseRichSpout trieda.

Vytvorme nespoľahlivý výtok:

verejná trieda RandomIntSpout rozširuje BaseRichSpout {private Random random; private SpoutOutputCollector outputCollector; @Override public void open (Mapa mapy, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); outputCollector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); outputCollector.emit (nové hodnoty (random.nextInt (), System.currentTimeMillis ())); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("randomInt", "timestamp")); }}

Náš zvyk RandomIntSpout vygeneruje každú sekundu náhodné celé číslo a časovú značku.

5.2. Bolt

Skrutky spracovávajú n-tice v prúde. Môžu vykonávať rôzne operácie, ako je filtrovanie, agregácie alebo vlastné funkcie.

Niektoré operácie vyžadujú viac krokov, a preto v takýchto prípadoch budeme musieť použiť viac skrutiek.

Na vytvorenie zvyku Bolt, musíme implementovať IRichBolt alebo pre jednoduchšie operácie IBasicBolt rozhranie.

Na implementáciu je k dispozícii aj niekoľko pomocných tried Bolt. V takom prípade použijeme BaseBasicBolt:

public class PrintingBolt extends BaseBasicBolt {@Override public void execute (Tuple n-tice, BasicOutputCollector basicOutputCollector) {System.out.println (n-tice); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Tento zvyk PrintingBolt jednoducho vytlačí všetky n-tice na konzolu.

6. Vytvorenie jednoduchej topológie

Poďme tieto myšlienky spojiť do jednoduchej topológie. Naša topológia bude mať jeden výtok a tri skrutky.

6.1. RandomNumberSpout

Na začiatku vytvoríme nespoľahlivý výtok. Každú sekundu vygeneruje náhodné celé čísla z rozsahu (0,100):

verejná trieda RandomNumberSpout rozširuje BaseRichSpout {private Random random; súkromný zberač SpoutOutputCollector; @Override public void open (Mapa mapy, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); collector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); int operation = random.nextInt (101); long timestamp = System.currentTimeMillis (); Hodnoty hodnoty = nové hodnoty (prevádzka, časová pečiatka); collector.emit (hodnoty); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("operation", "timestamp")); }}

6.2. FilteringBolt

Ďalej vytvoríme skrutku, ktorá odfiltruje všetky prvky pomocou prevádzka rovná sa 0:

public class FilteringBolt extends BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); if (operácia> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("operation", "timestamp")); }}

6.3. AggregatingBolt

Ďalej si vytvoríme komplikovanejšiu Bolt ktorá spojí všetky pozitívne operácie z každého dňa.

Na tento účel použijeme konkrétnu triedu vytvorenú špeciálne na implementáciu skrutiek, ktoré fungujú v systéme Windows, a nie v jednej n-tici: BaseWindowedBolt.

Windows sú základným konceptom pri spracovaní toku, ktorý rozdeľuje nekonečné prúdy na konečné časti. Potom môžeme použiť výpočty pre každý blok. Spravidla existujú dva typy okien:

Časové okná sa používajú na zoskupenie prvkov z daného časového obdobia pomocou časových značiek. Časové okná môžu obsahovať iný počet prvkov.

Počet okien sa používa na vytvorenie okien s definovanou veľkosťou. V takom prípade budú mať všetky okná rovnakú veľkosť a okno bude nebudú emitované, ak je menej prvkov ako je definovaná veľkosť.

Náš AggregatingBolt vygeneruje súčet všetkých pozitívnych operácií z a časové okno spolu s jeho začiatočnými a koncovými časovými značkami:

verejná trieda AggregatingBolt rozširuje BaseWindowedBolt {private OutputCollector outputCollector; @Override public void prepare (mapa stormConf, kontext TopologyContext, kolektor OutputCollector) {this.outputCollector = collector; } @Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute (TupleWindow tupleWindow) {List tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (this :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operácia")) .sum (); Dlhý začiatokTimestamp = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Hodnoty hodnoty = nové hodnoty (sumOfOperations, beginTimestamp, endTimestamp); outputCollector.emit (hodnoty); } private Long getTimestamp (Tuple tuple) {return tuple.getLongByField ("timestamp"); }}

Upozorňujeme, že v tomto prípade je bezpečné získať prvý prvok zoznamu priamo. Je to tak preto, lebo každé okno sa počíta pomocou znaku časová značka pole Násobný, tak musí byť aspoň jeden prvok v každom okne.

6.4. FileWritingBolt

Nakoniec vytvoríme skrutku, ktorá zoberie všetky prvky sumOfOperations väčšie ako 2000, serializujte ich a zapíšte do súboru:

public class FileWritingBolt extends BaseRichBolt {public static Logger logger = LoggerFactory.getLogger (FileWritingBolt.class); súkromný spisovateľ BufferedWriter; private String filePath; súkromný ObjectMapper objectMapper; @Override public void cleanup () {try {writer.close (); } catch (IOException e) {logger.error ("Nepodarilo sa zavrieť program!"); }} @Override public void preparation (mapa, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = nový ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try {writer = new BufferedWriter (new FileWriter (filePath)); } catch (IOException e) {logger.error ("Nepodarilo sa otvoriť súbor na zápis.", e); }} @Override public void execute (Tuple n-tica) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); dlhý začiatočnýTimestamp = tuple.getLongByField ("začínajúciTimestamp"); long endTimestamp = tuple.getLongByField ("endTimestamp"); if (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = new AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); try {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); writer.newLine (); writer.flush (); } catch (IOException e) {logger.error ("Nepodarilo sa zapísať údaje do súboru.", e); }}} // verejný konštruktor a ďalšie metódy}

Upozorňujeme, že výstup nemusíme deklarovať, pretože to bude posledná skrutka v našej topológii

6.5. Spustenie topológie

Nakoniec môžeme všetko spojiť a spustiť našu topológiu:

public static void runTopology () {builder TopologyBuilder = nový TopologyBuilder (); Spout random = new RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Filtrovanie skrutiek = nový FilteringBolt (); builder.setBolt ("filteringBolt", filtrovanie) .shuffleGrouping ("randomNumberSpout"); Agregácia skrutiek = new AggregatingBolt () .withTimestampField ("timestamp") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", agregácia) .shuffleGrouping ("filteringBolt"); Reťazec filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt (filePath); builder.setBolt ("fileBolt", file) .shuffleGrouping ("aggregatingBolt"); Config config = nový Config (); config.setDebug (false); Klaster LocalCluster = nový LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

Ak chcete, aby dátový tok prechádzal každým kúskom v topológii, musíme naznačiť, ako ich spojiť. shuffleGroup nám umožňuje uviesť tieto údaje pre filteringBolt bude pochádzať z randomNumberSpout.

Pre každý Bolt, musíme pridať shuffleGroup ktorá definuje zdroj prvkov pre túto skrutku. Zdrojom prvkov môže byť a Chrlič alebo iný Bolt. A ak nastavíme rovnaký zdroj pre viac ako jednu skrutku, zdroj bude emitovať všetky prvky do každého z nich.

V tomto prípade bude naša topológia používať znak LocalCluster spustiť prácu lokálne.

7. Záver

V tomto tutoriáli sme predstavili Apache Storm, distribuovaný výpočtový systém v reálnom čase. Vytvorili sme výtok, niekoľko skrutiek a spojili sme ich do úplnej topológie.

A ako vždy, všetky ukážky kódu nájdete na GitHub.