Úvod do Apache Storm
1. Prehľad
Tento tutoriál bude úvodom do Apache Storm, distribuovaný výpočtový systém v reálnom čase.
Zameriame sa a pokryjeme:
- Čo to vlastne Apache Storm je a aké problémy rieši
- Jeho architektúra a
- Ako ho použiť v projekte
2. Čo je Apache Storm?
Apache Storm je bezplatný a otvorený distribuovaný systém pre výpočty v reálnom čase.
Poskytuje odolnosť voči chybám, škálovateľnosť a zaručuje spracovanie údajov a je obzvlášť dobrý pri spracovaní neobmedzených tokov údajov.
Niektoré dobré prípady použitia Stormu môžu byť spracovanie operácií s kreditnými kartami na detekciu podvodov alebo spracovanie dát z inteligentných domov na detekciu chybných senzorov.
Storm umožňuje integráciu s rôznymi databázami a systémami čakania na fronte dostupnými na trhu.
3. Maven závislosť
Predtým, ako použijeme Apache Storm, musíme do nášho projektu zahrnúť závislosť typu storm-core:
poskytnuté org.apache.storm storm-core 1.2.2
Mali by sme používať iba poskytnutý rozsah ak máme v úmysle spustiť našu aplikáciu na klastri Storm.
Na lokálne spustenie aplikácie môžeme použiť takzvaný lokálny režim, ktorý bude simulovať klaster Storm v lokálnom procese, v takom prípade by sme mali odstrániť za predpokladu.
4. Dátový model
Dátový model Apache Storm sa skladá z dvoch prvkov: n-tice a streamy.
4.1. Násobný
A Násobný je usporiadaný zoznam pomenovaných polí s dynamickými typmi. To znamená, že nemusíme výslovne deklarovať typy polí.
Storm musí vedieť, ako serializovať všetky hodnoty, ktoré sa používajú v n-tici. V predvolenom nastavení už môže serializovať primitívne typy, Struny a bajt polia.
A keďže Storm používa serializáciu Kryo, musíme zaregistrovať serializátor pomocou Konfig používať vlastné typy. Môžeme to urobiť jedným z dvoch spôsobov:
Najskôr môžeme triedu zaregistrovať na serializáciu pomocou jej celého názvu:
Config config = nový Config (); config.registerSerialization (User.class);
V takom prípade Kryo serializuje triedu pomocou FieldSerializer. Predvolene sa tým serializujú všetky neprechodné polia triedy, súkromné aj verejné.
Alebo namiesto toho môžeme poskytnúť triedu na serializáciu aj serializátor, ktorý chce spoločnosť Storm pre túto triedu použiť:
Config config = nový Config (); config.registerSerialization (User.class, UserSerializer.class);
Aby sme vytvorili vlastný serializátor, musíme rozšíriť generickú triedu Serializátor ktorá má dve metódy napíš a čítať.
4.2. Prúd
A Prúd je základnou abstrakciou v ekosystéme Búrky. The Prúd je neobmedzená sekvencia n-tic.
Storms umožňuje paralelné spracovanie viacerých prúdov.
Každý stream má identifikátor, ktorý je poskytnutý a pridelený počas deklarácie.
5. Topológia
Logika aplikácie Storm v reálnom čase je zabalená do topológie. Topológia sa skladá z chrliče a skrutky.
5.1. Chrlič
Chrliče sú zdrojom prúdov. Vydávajú n-tice podľa topológie.
Tice možno čítať z rôznych externých systémov, ako sú Kafka, Kestrel alebo ActiveMQ.
Chrliče môžu byť spoľahlivé alebo nespoľahlivý. Spoľahlivý Znamená, že hubica môže odpovedať na tú n-ticu, ktorú Storm nedokázal spracovať. Nespoľahlivé Znamená, že hubica neodpovedá, pretože bude na vydávanie n-tíc používať mechanizmus oheň-a-zabudni.
Na vytvorenie vlastného výtoku je potrebné implementovať IRichSpout rozhranie alebo rozšíriť ktorúkoľvek triedu, ktorá už rozhranie implementuje, napríklad abstrakt BaseRichSpout trieda.
Vytvorme nespoľahlivý výtok:
verejná trieda RandomIntSpout rozširuje BaseRichSpout {private Random random; private SpoutOutputCollector outputCollector; @Override public void open (Mapa mapy, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); outputCollector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); outputCollector.emit (nové hodnoty (random.nextInt (), System.currentTimeMillis ())); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("randomInt", "timestamp")); }}
Náš zvyk RandomIntSpout vygeneruje každú sekundu náhodné celé číslo a časovú značku.
5.2. Bolt
Skrutky spracovávajú n-tice v prúde. Môžu vykonávať rôzne operácie, ako je filtrovanie, agregácie alebo vlastné funkcie.
Niektoré operácie vyžadujú viac krokov, a preto v takýchto prípadoch budeme musieť použiť viac skrutiek.
Na vytvorenie zvyku Bolt, musíme implementovať IRichBolt alebo pre jednoduchšie operácie IBasicBolt rozhranie.
Na implementáciu je k dispozícii aj niekoľko pomocných tried Bolt. V takom prípade použijeme BaseBasicBolt:
public class PrintingBolt extends BaseBasicBolt {@Override public void execute (Tuple n-tice, BasicOutputCollector basicOutputCollector) {System.out.println (n-tice); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}
Tento zvyk PrintingBolt jednoducho vytlačí všetky n-tice na konzolu.
6. Vytvorenie jednoduchej topológie
Poďme tieto myšlienky spojiť do jednoduchej topológie. Naša topológia bude mať jeden výtok a tri skrutky.
6.1. RandomNumberSpout
Na začiatku vytvoríme nespoľahlivý výtok. Každú sekundu vygeneruje náhodné celé čísla z rozsahu (0,100):
verejná trieda RandomNumberSpout rozširuje BaseRichSpout {private Random random; súkromný zberač SpoutOutputCollector; @Override public void open (Mapa mapy, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); collector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); int operation = random.nextInt (101); long timestamp = System.currentTimeMillis (); Hodnoty hodnoty = nové hodnoty (prevádzka, časová pečiatka); collector.emit (hodnoty); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("operation", "timestamp")); }}
6.2. FilteringBolt
Ďalej vytvoríme skrutku, ktorá odfiltruje všetky prvky pomocou prevádzka rovná sa 0:
public class FilteringBolt extends BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); if (operácia> 0) {basicOutputCollector.emit (tuple.getValues ()); }} @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("operation", "timestamp")); }}
6.3. AggregatingBolt
Ďalej si vytvoríme komplikovanejšiu Bolt ktorá spojí všetky pozitívne operácie z každého dňa.
Na tento účel použijeme konkrétnu triedu vytvorenú špeciálne na implementáciu skrutiek, ktoré fungujú v systéme Windows, a nie v jednej n-tici: BaseWindowedBolt.
Windows sú základným konceptom pri spracovaní toku, ktorý rozdeľuje nekonečné prúdy na konečné časti. Potom môžeme použiť výpočty pre každý blok. Spravidla existujú dva typy okien:
Časové okná sa používajú na zoskupenie prvkov z daného časového obdobia pomocou časových značiek. Časové okná môžu obsahovať iný počet prvkov.
Počet okien sa používa na vytvorenie okien s definovanou veľkosťou. V takom prípade budú mať všetky okná rovnakú veľkosť a okno bude nebudú emitované, ak je menej prvkov ako je definovaná veľkosť.
Náš AggregatingBolt vygeneruje súčet všetkých pozitívnych operácií z a časové okno spolu s jeho začiatočnými a koncovými časovými značkami:
verejná trieda AggregatingBolt rozširuje BaseWindowedBolt {private OutputCollector outputCollector; @Override public void prepare (mapa stormConf, kontext TopologyContext, kolektor OutputCollector) {this.outputCollector = collector; } @Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute (TupleWindow tupleWindow) {List tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (this :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operácia")) .sum (); Dlhý začiatokTimestamp = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Hodnoty hodnoty = nové hodnoty (sumOfOperations, beginTimestamp, endTimestamp); outputCollector.emit (hodnoty); } private Long getTimestamp (Tuple tuple) {return tuple.getLongByField ("timestamp"); }}
Upozorňujeme, že v tomto prípade je bezpečné získať prvý prvok zoznamu priamo. Je to tak preto, lebo každé okno sa počíta pomocou znaku časová značka pole Násobný, tak musí byť aspoň jeden prvok v každom okne.
6.4. FileWritingBolt
Nakoniec vytvoríme skrutku, ktorá zoberie všetky prvky sumOfOperations väčšie ako 2000, serializujte ich a zapíšte do súboru:
public class FileWritingBolt extends BaseRichBolt {public static Logger logger = LoggerFactory.getLogger (FileWritingBolt.class); súkromný spisovateľ BufferedWriter; private String filePath; súkromný ObjectMapper objectMapper; @Override public void cleanup () {try {writer.close (); } catch (IOException e) {logger.error ("Nepodarilo sa zavrieť program!"); }} @Override public void preparation (mapa, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = nový ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try {writer = new BufferedWriter (new FileWriter (filePath)); } catch (IOException e) {logger.error ("Nepodarilo sa otvoriť súbor na zápis.", e); }} @Override public void execute (Tuple n-tica) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); dlhý začiatočnýTimestamp = tuple.getLongByField ("začínajúciTimestamp"); long endTimestamp = tuple.getLongByField ("endTimestamp"); if (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = new AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); try {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); writer.newLine (); writer.flush (); } catch (IOException e) {logger.error ("Nepodarilo sa zapísať údaje do súboru.", e); }}} // verejný konštruktor a ďalšie metódy}
Upozorňujeme, že výstup nemusíme deklarovať, pretože to bude posledná skrutka v našej topológii
6.5. Spustenie topológie
Nakoniec môžeme všetko spojiť a spustiť našu topológiu:
public static void runTopology () {builder TopologyBuilder = nový TopologyBuilder (); Spout random = new RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Filtrovanie skrutiek = nový FilteringBolt (); builder.setBolt ("filteringBolt", filtrovanie) .shuffleGrouping ("randomNumberSpout"); Agregácia skrutiek = new AggregatingBolt () .withTimestampField ("timestamp") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", agregácia) .shuffleGrouping ("filteringBolt"); Reťazec filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt (filePath); builder.setBolt ("fileBolt", file) .shuffleGrouping ("aggregatingBolt"); Config config = nový Config (); config.setDebug (false); Klaster LocalCluster = nový LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }
Ak chcete, aby dátový tok prechádzal každým kúskom v topológii, musíme naznačiť, ako ich spojiť. shuffleGroup nám umožňuje uviesť tieto údaje pre filteringBolt bude pochádzať z randomNumberSpout.
Pre každý Bolt, musíme pridať shuffleGroup ktorá definuje zdroj prvkov pre túto skrutku. Zdrojom prvkov môže byť a Chrlič alebo iný Bolt. A ak nastavíme rovnaký zdroj pre viac ako jednu skrutku, zdroj bude emitovať všetky prvky do každého z nich.
V tomto prípade bude naša topológia používať znak LocalCluster spustiť prácu lokálne.
7. Záver
V tomto tutoriáli sme predstavili Apache Storm, distribuovaný výpočtový systém v reálnom čase. Vytvorili sme výtok, niekoľko skrutiek a spojili sme ich do úplnej topológie.
A ako vždy, všetky ukážky kódu nájdete na GitHub.