Úvod do služby Netflix Mantis

1. Prehľad

V tomto článku sa pozrieme na platformu Mantis vyvinutú spoločnosťou Netflix.

Preskúmame hlavné koncepty Mantis vytvorením, spustením a vyšetrením úlohy spracovania prúdu.

2. Čo je Mantis?

Mantis je platforma pre vytváranie aplikácií na spracovanie streamov (pracovných miest). Poskytuje jednoduchý spôsob, ako riadiť zavádzanie a životný cyklus pracovných miest. Navyše uľahčuje alokáciu zdrojov, objavovanie a komunikáciu medzi týmito úlohami.

Preto sa vývojári môžu sústrediť na skutočnú obchodnú logiku, pričom majú podporu a robustná a škálovateľná platforma na vykonávanie ich neblokujúcich aplikácií s veľkým objemom a nízkou latenciou.

Práca Mantis sa skladá z troch odlišných častí:

  • the zdroj, zodpovedný za načítanie údajov z externého zdroja
  • jeden alebo viac etapy, zodpovedný za spracovanie prúdov prichádzajúcich udalostí
  • a a drez ktorý zhromažďuje spracované údaje

Poďme teraz preskúmať každú z nich.

3. Nastavenie a závislosti

Začnime pridaním mantis-runtime a jackson-databind závislosti:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

Teraz, keď nastavíme zdroj údajov našej úlohy, implementujme Mantis Zdroj rozhranie:

verejná trieda RandomLogSource implementuje zdroj {@Override public Observable call (Kontextový kontext, index indexu) {návrat Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } private String createRandomLogEvent (Long tick) {// vygenerovať náhodný reťazec vstupu do protokolu ...}}

Ako vidíme, jednoducho generuje náhodné položky protokolu niekoľkokrát za sekundu.

4. Naše prvé zamestnanie

Poďme teraz vytvoriť úlohu Mantis, ktorá jednoducho zhromažďuje udalosti protokolu z našej RandomLogSource. Neskôr pridáme skupinové a agregačné transformácie pre komplexnejší a zaujímavejší výsledok.

Na začiatok si vytvoríme a LogEvent subjekt:

verejná trieda LogEvent implementuje JsonType {private Long index; súkromná úroveň reťazca; súkromná reťazcová správa; // ...}

Potom pridajme naše TransformLogStage.

Je to jednoduchá etapa, ktorá implementuje rozhranie ScalarComputation a rozdelí položku protokolu na zostavenie LogEvent. Filtruje tiež všetky nesprávne naformátované reťazce:

verejná trieda TransformLogStage implementuje ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). mapa (LogEvent :: nový); }}

4.1. Spustenie úlohy

V tomto okamihu máme dostatok stavebných kameňov na zostavenie našej práce Mantis:

verejná trieda LogCollectingJob rozširuje MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), new ScalarToScalar.Config ()) .sink (Sinks.eagerSubscribe (Sinks.eagerSubscribe (Sinks.eagerSubscribe (Sinks.eagerSubscribe (Sinks.eagerSubscribe)) LogEvent :: toJsonString))) .metadata (nové Metadata.Builder (). Build ()) .create (); }}

Poďme sa bližšie pozrieť na našu prácu.

Ako vidíme, rozširuje sa MantisJobProvider. Spočiatku získava údaje z našej RandomLogSource a uplatňuje TransformLogStage k načítaným údajom. Na záver odošle spracované údaje do zabudovaného drezu, ktorý sa dychtivo prihlási na odber a dodáva údaje cez SSE.

Teraz nakonfigurujme našu prácu tak, aby sa lokálne vykonávala pri štarte:

@SpringBootApplication verejná trieda MantisApplication implementuje CommandLineRunner {// ... @Override public void run (String ... args) {LocalJobExecutorNetworked.execute (new LogCollectingJob (). GetJobInstance ()); }}

Spustíme aplikáciu. Uvidíme správu ako:

... Poskytuje moderné umývadlo servera HTTP SSE na porte: 86XX

Pripojme sa teraz k drezu pomocou zvlnenie:

$ curl localhost: 86XX data: {"index": 86, "level": "WARN", "message": "pokus o prihlásenie"} data: {"index": 87, "level": "ERROR", "správa ":" user created "} data: {" index ": 88," level ":" INFO "," message ":" user created "} data: {" index ": 89," level ":" INFO ", "message": "pokus o prihlásenie"} data: {"index": 90, "level": "INFO", "message": "user created"} data: {"index": 91, "level": "CHYBA "," message ":" user created "} data: {" index ": 92," level ":" WARN "," message ":" pokus o prihlásenie "} data: {" index ": 93," level ": "INFO", "správa": "používateľ vytvorený"} ...

4.2. Konfigurácia drezu

Doteraz sme na zhromažďovanie našich spracovaných údajov používali zabudovaný drez. Uvidíme, či môžeme nášmu scenáru pridať väčšiu flexibilitu poskytnutím vlastného umývadla.

Čo ak by sme napríklad chceli filtrovať protokoly podľa správa?

Vytvorme a LogSink ktorý vykonáva drez rozhranie:

verejná trieda LogSink implementuje Sink {@Override public void call (kontextový kontext, PortRequest portRequest, pozorovateľný logEventObservable) {SelfDocumentingSink drez = nový ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicate (filterByL) (filterByL) ; logEventObservable.subscribe (); sink.call (kontext, portRequest, logEventObservable); } private Predicate filterByLogMessage () {return new Predicate ("filter by message", parameters -> {if (parameters! = null && parameters.containsKey ("filter")) {return logEvent -> logEvent.getMessage (). contains ( parametre.get ("filter"). get (0));} vratit logEvent -> true;}); }}

V tejto implementácii drezu sme nakonfigurovali predikát, ktorý používa filter parameter na načítanie iba protokolov, ktoré obsahujú text nastavený v priečinku filter parameter:

$ curl localhost: 8874? filter = prihlasovacie údaje: {"index": 93, "level": "CHYBA", "správa": "pokus o prihlásenie"} údaje: {"index": 95, "level": "INFO "," message ":" pokus o prihlásenie "} údaje: {" index ": 97," level ":" CHYBA "," správa ":" pokus o prihlásenie "} ...

Poznámka Mantis tiež ponúka výkonný dotazovací jazyk MQL, ktorý je možné použiť na dopytovanie, transformáciu a analýzu údajov o prúdoch spôsobom SQL.

5. Etapové reťazenie

Predpokladajme, že nás teraz zaujíma koľko CHYBA, POZOR, alebo INFO logovacie záznamy, ktoré máme v danom časovom intervale. K tomu pridáme k našej práci ďalšie dve etapy a spojíme ich dohromady.

5.1. Zoskupenie

Najskôr vytvorme a GroupLogStage.

Táto etapa je a ToGroupComputation implementácia, ktorá dostane a LogEvent streamovať dáta z existujúcich TransformLogStage. Potom zoskupí položky podľa úrovne prihlásenia a odošle ich do ďalšej fázy:

verejná trieda GroupLogStage implementuje ToGroupComputation {@Override public Observable call (Kontextový kontext, Pozorovateľný logEvent) {návrat logEvent.map (log -> nový MantisGroup (log.getLevel (), log)); } public static ScalarToGroup.Config config () {return new ScalarToGroup.Config () .description ("Zoskupiť údaje udalosti podľa úrovne") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

Vytvorili sme tiež vlastnú konfiguráciu fázy poskytnutím popisu, kodeku, ktorý sa má použiť na serializáciu výstupu, a umožnili sme, aby metóda volania tejto fázy bežala súbežne pomocou concurrentInput ().

Je potrebné si uvedomiť, že táto fáza je horizontálne škálovateľná. To znamená, že môžeme spustiť toľko inštancií tejto fázy, koľko je potrebné. Za zmienku tiež stojí, že keď sú nasadené v klastri Mantis, táto fáza odosiela údaje do ďalšej fázy, takže všetky udalosti patriace do konkrétnej skupiny pristanú rovnakému pracovníkovi v ďalšej fáze.

5.2. Agregácia

Predtým, ako prejdeme k ďalšej fáze, pridajme najskôr a LogAggregate subjekt:

verejná trieda LogAggregate implementuje JsonType {súkromné ​​konečné celé číslo; súkromná konečná úroveň reťazca; }

Poďme teraz vytvoriť poslednú fázu v reťazci.

Táto etapa sa implementuje GroupToScalarComputation a transformuje prúd logovacích skupín na skalárny LogAggregate. Robí to spočítaním, koľkokrát sa každý typ denníka objaví v streame. Okrem toho má aj LogAggregationDuration parameter, ktorým je možné riadiť veľkosť agregačného okna:

verejná trieda CountLogStage implementuje GroupToScalarComputation {private int duration; @Override public void init (kontextový kontext) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @Override verejné pozorovateľné volanie (kontextový kontext, pozorovateľné mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> new LogAggregate (count, group.getKey ())))); } public static GroupToScalar.Config config () {return new GroupToScalar.Config () .description ("sum events for a log level") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } verejny staticky zoznam getParameters () {Zoznam parametre = new ArrayList (); params.add (new IntParameter () .name ("LogAggregationDuration") .description ("veľkosť okna pre agregáciu v milisekundách") .validator (Validators.range (100, 10 000)) .defaultValue (5000) .build ()); návratové parametre; }}

5.3. Nakonfigurujte a spustite úlohu

Teraz zostáva iba nakonfigurovať našu prácu:

verejná trieda LogAggregationJob rozširuje MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) .stage (new GroupLogStage (), GroupLogStage (), GroupLogStage (), GroupLogStage (), )) .stage (new CountLogStage (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (nové Metadata.Builder (). build ()). vytvoriť (); }}

Hneď ako spustíme aplikáciu a vykonáme našu novú úlohu, môžeme vidieť načítanie protokolov každých pár sekúnd:

$ curl localhost: 8133 data: {"count": 3, "level": "ERROR"} data: {"count": 13, "level": "INFO"} data: {"count": 4, "level ":" WARN "} data: {" count ": 8," level ":" ERROR "} data: {" count ": 5," level ":" INFO "} data: {" count ": 7," úroveň ":" UPOZORNENIE "} ...

6. Záver

Ak to zhrnieme, v tomto článku sme videli, čo je Netflix Mantis a na čo sa dá použiť. Ďalej sme sa pozreli na hlavné koncepty, použili sme ich na vytváranie úloh a preskúmali sme vlastné konfigurácie pre rôzne scenáre.

Celý kód je ako vždy k dispozícii na stránkach GitHub.


$config[zx-auto] not found$config[zx-overlay] not found