Úvod do konektorov Kafka

1. Prehľad

Apache Kafka® je distribuovaná streamovacia platforma. V predchádzajúcom tutoriáli sme diskutovali o tom, ako implementovať spotrebiteľov a producentov Kafky pomocou Spring.

V tomto tutoriále sa dozvieme, ako používať konektory Kafka.

Pozrime sa na:

  • Rôzne typy konektorov Kafka
  • Funkcie a režimy Kafka Connect
  • Konfigurácia konektorov pomocou súborov vlastností, ako aj rozhrania REST API

2. Základy konektorov Kafka Connect a Kafka

Kafka Connect je rámec na pripojenie Kafky k externým systémom ako sú databázy, obchody s kľúčmi a hodnotami, hľadané indexy a súborové systémy, pomocou tzv Konektory.

Konektory Kafka sú komponenty pripravené na použitie, ktoré nám môžu pomôcť importovať dáta z externých systémov do tém Kafky a exportovať dáta z tém Kafky do externých systémov. Môžeme použiť existujúce implementácie konektorov pre bežné zdroje údajov a umývadlá alebo implementovať naše vlastné konektory.

A zdrojový konektor zhromažďuje údaje zo systému. Zdrojovými systémami môžu byť celé databázy, tabuľky streamov alebo sprostredkovatelia správ. Konektor zdroja môže tiež zhromažďovať metriky z aplikačných serverov do tém Kafka, čím sa dáta sprístupnia na spracovanie streamu s nízkou latenciou.

A konektor umývadla dodáva dáta z tém Kafky do iných systémov, čo môžu byť indexy ako Elasticsearch, dávkové systémy ako Hadoop alebo akýkoľvek druh databázy.

Niektoré konektory udržuje komunita, zatiaľ čo iné podporuje spoločnosť Confluent alebo jej partneri. Naozaj nájdeme konektory pre najpopulárnejšie systémy, ako sú S3, JDBC a Cassandra, aby sme vymenovali aspoň niektoré.

3. Funkcie

Medzi funkcie Kafka Connect patria:

  • Rámec pre pripojenie externých systémov k Kafke - zjednodušuje vývoj, nasadenie a správu konektorov
  • Distribuované a samostatné režimy - pomáha nám nasadiť veľké klastre využitím distribuovanej povahy Kafky, ako aj nastavení pre vývoj, testovanie a nasadenie v malom rozsahu
  • Rozhranie REST - konektory môžeme spravovať pomocou rozhrania REST API
  • Automatická správa ofsetu - Kafka Connect nám pomáha zvládnuť proces offsetového potvrdenia, čo nám šetrí problémy s manuálnou implementáciou tejto chyby náchylnej časti vývoja konektorov
  • Štandardne distribuované a škálovateľné - Kafka Connect používa existujúci protokol pre správu skupín; môžeme pridať ďalších pracovníkov na zväčšenie klastra Kafka Connect
  • Streamovanie a dávková integrácia - Kafka Connect je ideálne riešenie na premostenie streamovacích a dávkových dátových systémov v súvislosti s existujúcimi možnosťami spoločnosti Kafka.
  • Transformácie - umožňujú nám jednoduché a ľahké úpravy jednotlivých správ

4. Inštalácia

Namiesto použitia obyčajnej distribúcie Kafka si stiahneme Confluent Platform, distribúciu Kafka, ktorú poskytuje spoločnosť Confluent, Inc., ktorá stojí za spoločnosťou Kafka. Confluent Platform prichádza s niektorými ďalšími nástrojmi a klientmi v porovnaní s obyčajnou Kafkou, ako aj s niektorými ďalšími vopred pripravenými konektormi.

Pre náš prípad postačuje edícia Open Source, ktorú nájdete na stránkach Confluent.

5. Rýchly štart Kafka Connect

Pre začiatočníkov si povieme niečo o princípe Kafka Connect, pomocou svojich najzákladnejších konektorov, ktorými sú súbor zdroj konektor a pilník drez konektor.

Confluent Platform je pohodlne dodávaný s oboma týmito konektormi, ako aj s referenčnými konfiguráciami.

5.1. Konfigurácia zdrojového konektora

Pre zdrojový konektor je referenčná konfigurácia k dispozícii na $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

name = local-file-source connector.class = FileStreamSource tasks.max = 1 topic = connect-test file = test.txt

Táto konfigurácia má niektoré vlastnosti, ktoré sú spoločné pre všetky zdrojové konektory:

  • názov je užívateľom zadaný názov pre inštanciu konektora
  • konektor.trieda určuje implementačnú triedu, v podstate druh konektora
  • úlohy.max určuje, koľko inštancií nášho zdrojového konektora by malo bežať paralelne, a
  • téma definuje tému, na ktorú má konektor poslať výstup

V tomto prípade máme aj atribút špecifický pre konektor:

  • spis definuje súbor, z ktorého má konektor načítať vstup

Aby to potom fungovalo, vytvorme si základný súbor s určitým obsahom:

echo -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Upozorňujeme, že pracovný adresár je $ CONFLUENT_HOME.

5.2. Konfigurácia umývadlového konektora

Pre náš drezový konektor použijeme referenčnú konfiguráciu na $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

name = local-file-sink connector.class = FileStreamSink tasks.max = 1 súbor = test.sink.txt topic = connect-test

Logicky obsahuje úplne rovnaké parametre, aj keď tentokrát konektor.trieda špecifikuje implementáciu konektora umývadla a spis je miesto, kam má konektor zapisovať obsah.

5.3. Worker Config

Nakoniec musíme nakonfigurovať pracovníka Connect, ktorý bude integrovať naše dva konektory a bude pracovať na čítaní zo zdrojového konektora a zápise do konektora drezu.

Na to môžeme použiť $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10 000 plugin.path = / share / java

Poznač si to plugin.path môže obsahovať zoznam ciest, kde sú k dispozícii implementácie konektorov

Pretože budeme používať konektory dodávané s Kafkou, môžeme nastaviť plugin.path do $ CONFLUENT_HOME / share / java. Pri práci s Windows môže byť potrebné poskytnúť absolútnu cestu.

Pre ostatné parametre môžeme ponechať predvolené hodnoty:

  • bootstrap.servers obsahuje adresy sprostredkovateľov spoločnosti Kafka
  • prevodník kľúčov a hodnota.konvertor definovať triedy prevodníkov, ktoré serializujú a deserializujú údaje, ktoré prúdia zo zdroja do Kafky a potom z Kafky do drezu
  • key.converter.schemas.enable a hodnota.konvertor.schémy.povoliť sú nastavenia špecifické pre prevodník
  • offset.storage.file.filename je najdôležitejším nastavením pri spustení aplikácie Connect v samostatnom režime: definuje, kam má aplikácia Connect ukladať svoje údaje o ofsetu
  • offset.flush.interval.ms definuje interval, v ktorom sa pracovník pokúsi zaviazať kompenzácie úloh

A zoznam parametrov je dosť vyspelý, takže kompletný zoznam nájdete v oficiálnej dokumentácii.

5.4. Kafka Connect v samostatnom režime

A s tým môžeme zahájiť prvé nastavenie konektorov:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. vlastnosti

Najskôr môžeme skontrolovať obsah témy pomocou príkazového riadku:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topický test pripojenia --od začiatku

Ako vidíme, zdrojový konektor prevzal údaje z test.txt súbor, transformoval na JSON a poslal ho Kafkovi:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

A ak sa pozrieme na priečinok $ CONFLUENT_HOME, môžeme vidieť, že súbor test.sink.txt bol vytvorený tu:

mačka $ CONFLUENT_HOME / test.sink.txt foo bar

Keď konektor umývadla extrahuje hodnotu z užitočné zaťaženie atribút a zapíše ho do cieľového súboru, údajov v test.sink.txt má obsah originálu test.txt spis.

Teraz pridajme ďalšie riadky do test.txt.

Keď to urobíme, zistíme, že zdrojový konektor detekuje tieto zmeny automaticky.

Musíme sa len ubezpečiť, že na koniec vložíte nový riadok, inak zdrojový konektor nebude brať do úvahy posledný riadok.

V tomto okamihu zastavme proces pripojenia, pretože začneme pripojenie distribuovaný režim v niekoľkých riadkoch.

6. Pripojte REST API

Doteraz sme všetky konfigurácie robili tak, že sme cez príkazový riadok odovzdávali súbory vlastností. Pretože je však Connect navrhnutý tak, aby fungoval ako služba, je k dispozícii aj REST API.

Predvolene je k dispozícii na // localhost: 8083. Niekoľko koncových bodov:

  • GET / konektory - vráti zoznam všetkých použitých konektorov
  • GET / konektory / {name} - vráti podrobnosti o konkrétnom konektore
  • POST / konektory - vytvorí nový konektor; telo žiadosti by mal byť objekt JSON obsahujúci pole názvu reťazca a pole konfigurácie objektu s konfiguračnými parametrami konektora
  • GET / konektory / {name} / stav - vráti aktuálny stav konektora - vrátane toho, či je spustený, zlyhal alebo bol pozastavený - akému pracovníkovi je priradený, informácie o chybe, ak zlyhal, a stav všetkých jeho úloh
  • DELETE / konektory / {name} - vymaže konektor, ladne zastaví všetky úlohy a vymaže jeho konfiguráciu
  • GET / plug-in pluginy - vráti zoznam konektorových doplnkov nainštalovaných v klastri Kafka Connect

Oficiálna dokumentácia obsahuje zoznam všetkých koncových bodov.

Na vytvorenie nových konektorov v nasledujúcej časti použijeme rozhranie REST API.

7. Kafka Connect v distribuovanom režime

Samostatný režim funguje perfektne pre vývoj a testovanie, ako aj pre menšie nastavenia. Ak však chceme naplno využiť distribuovanú povahu Kafky, musíme spustiť Connect v distribuovanom režime.

Týmto spôsobom sa nastavenia konektorov a metadáta ukladajú v témach Kafka namiesto v systéme súborov. Výsledkom je, že uzly pracovníkov sú skutočne bez štátnej príslušnosti.

7.1. Začína sa pripájať

Referenčnú konfiguráciu pre distribuovaný režim nájdete na $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parametre sú väčšinou rovnaké ako v prípade samostatného režimu. Existuje iba niekoľko rozdielov:

  • skupina.id definuje názov skupiny klastrov Pripojiť. Hodnota sa musí líšiť od ID skupiny spotrebiteľov
  • offset.storage.topic, config.storage.topic a status.storage.topic definovať témy pre tieto nastavenia. Pre každú tému môžeme tiež definovať faktor replikácie

Oficiálna dokumentácia opäť poskytuje zoznam so všetkými parametrami.

Pripojenie môžeme spustiť v distribuovanom režime nasledovne:

$ CONFLUENT_HOME / bin / connect-distribuovaný $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Pridávanie konektorov pomocou rozhrania REST API

Teraz sme v porovnaní so samostatným príkazom na spustenie neprešli ako konfigurácia žiadnou konfiguráciou konektora. Namiesto toho musíme konektory vytvoriť pomocou rozhrania REST API.

Aby sme nastavili náš príklad z minulosti, musíme poslať dve POST požiadavky na // localhost: 8083 / konektory obsahujú nasledujúce štruktúry JSON.

Najskôr musíme vytvoriť telo zdrojového konektora POST ako súbor JSON. Tu to nazveme connect-file-source.json:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-distribution.txt", "téma ":" connect-distribuovaný "}}

Všimnite si, ako to vyzerá dosť podobne ako v referenčnom konfiguračnom súbore, ktorý sme použili prvýkrát.

A potom to zverejníme:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "typ obsahu: application / json" \ -X POST // localhost: 8083 / konektory

Potom urobíme to isté pre konektor umývadla a zavoláme súbor connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "tasks.max": 1, "file": "test-distribution.sink.txt", "topic": "connect-distribuovaný"}}

A vykonajte POST ako predtým:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / konektory

V prípade potreby môžeme overiť, či toto nastavenie funguje správne:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topické pripojenie distribuované --od začiatku {"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

A ak sa pozrieme na priečinok $ CONFLUENT_HOME, môžeme vidieť, že súbor test-distribuovaný.sink.txt bol vytvorený tu:

mačka $ CONFLUENT_HOME / test-distribution.sink.txt foo bar

Po vyskúšaní distribuovaného nastavenia urobme vyčistenie odstránením dvoch konektorov:

curl -X DELETE // localhost: 8083 / konektory / zdroj lokálneho súboru curl -X DELETE // localhost: 8083 / konektory / lokálny súbor-umývadlo

8. Transformácia údajov

8.1. Podporované transformácie

Transformácie nám umožňujú vykonávať jednoduché a ľahké úpravy jednotlivých správ.

Kafka Connect podporuje nasledujúce vstavané transformácie:

  • InsertField - Pridajte pole pomocou statických údajov alebo zaznamenajte metadáta
  • ReplaceField - Filtrovanie alebo premenovanie polí
  • MaskField - Nahraďte pole platnou nulovou hodnotou pre typ (napríklad nula alebo prázdny reťazec)
  • HoistField - Celú udalosť zabaľte ako jedno pole do štruktúry alebo mapy
  • ExtractField - Extrahujte konkrétne pole zo štruktúry a mapy a do výsledkov zahrňte iba toto pole
  • SetSchemaMetadata - Upravte názov alebo verziu schémy
  • TimestampRouter - Upraviť tému záznamu na základe pôvodnej témy a časovej pečiatky
  • RegexRouter - Upravte tému záznamu na základe pôvodnej témy, náhradného reťazca a regulárneho výrazu

Transformácia sa konfiguruje pomocou nasledujúcich parametrov:

  • transformuje - Čiarkami oddelený zoznam aliasov pre transformácie
  • transformuje. $ alias.typ - Názov triedy pre transformáciu
  • transformuje. $ alias. $ transitionSpecificConfig - Konfigurácia pre príslušnú transformáciu

8.2. Aplikácia transformátora

Ak chcete otestovať niektoré transformačné funkcie, pripravme nasledujúce dve transformácie:

  • Najskôr zabalme celú správu ako štruktúru JSON
  • Potom do tejto štruktúry pridáme pole

Pred aplikáciou našich transformácií musíme nakonfigurovať Connect tak, aby používal JSON bez schémy, a to úpravou connect-distribuované.vlastnosti:

key.converter.schemas.enable = nepravdivá hodnota.converter.schemas.enable = false

Potom musíme reštartovať Connect, opäť v distribuovanom režime:

$ CONFLUENT_HOME / bin / connect-distribuovaný $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Opäť musíme vytvoriť telo zdrojového konektora POST ako súbor JSON. Tu to nazveme connect-file-source-transform.json.

Okrem už známych parametrov pridáme niekoľko riadkov pre dve požadované transformácie:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-transformácia.txt", "téma ":" connect-transformácia "," transformuje ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-zdroj-súboru "}}

Potom vykonajme POST:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / konektory

Napíšme niekoľko riadkov k nášmu test-transformácia.txt:

Foo Bar

Ak teraz skontrolujeme pripojiť-transformácia téma, mali by sme dostať nasledujúce riadky:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. Používanie konektorov Ready

Po použití týchto jednoduchých konektorov sa pozrime na pokročilejšie konektory pripravené na použitie a na spôsob ich inštalácie.

9.1. Kde nájdete konektory

Vopred vyrobené konektory sú k dispozícii z rôznych zdrojov:

  • Niekoľko konektorov je dodávaných s obyčajným serverom Apache Kafka (zdroj a umývadlo pre súbory a konzolu)
  • Niektoré ďalšie konektory sú dodávané s platformou Confluent Platform (ElasticSearch, HDFS, JDBC a AWS S3)
  • Vyskúšajte tiež Confluent Hub, ktorý je akýmsi obchodom s aplikáciami pre konektory Kafka. Počet ponúkaných konektorov neustále rastie:
    • Konfluentné konektory (vyvinuté, testované, zdokumentované a sú spoločnosťou Confluent plne podporované)
    • Certifikované konektory (implementované treťou stranou a certifikované spoločnosťou Confluent)
    • Komunitou vyvinuté a podporované konektory
  • Okrem toho Confluent poskytuje aj stránku konektorov, s niektorými konektormi, ktoré sú tiež k dispozícii v Confluent Hub, ale tiež s niekoľkými ďalšími komunitnými konektormi.
  • A nakoniec sú tu aj predajcovia, ktorí poskytujú konektory ako súčasť svojho produktu. Napríklad Landoop poskytuje streamovaciu knižnicu s názvom Lenses, ktorá obsahuje aj sadu ~ 25 konektorov otvoreného zdroja (mnohé z nich sú tiež krížovo uvedené na iných miestach)

9.2. Inštalácia konektorov z Confluent Hub

Podniková verzia servera Confluent poskytuje skript na inštaláciu konektorov a ďalších komponentov z centra Confluent Hub (skript nie je súčasťou verzie Open Source). Ak používame podnikovú verziu, môžeme konektor nainštalovať pomocou nasledujúceho príkazu:

$ CONFLUENT_HOME / bin / confluent-hub nainštalovať confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. Ručná inštalácia konektorov

Ak potrebujeme konektor, ktorý nie je k dispozícii na serveri Confluent Hub, alebo ak máme verziu softvéru Confluent s otvoreným zdrojom, môžeme požadované konektory nainštalovať ručne. Za týmto účelom musíme stiahnuť a rozbaliť konektor, ako aj presunúť zahrnuté libs do priečinka určeného ako plugin.path.

Pre každý konektor by archív mal obsahovať dva pre nás zaujímavé priečinky:

  • The lib priečinok obsahuje konektorovú nádobu, napríklad kafka-connect-mqtt-1.0.0-preview.jar, ako aj niekoľko ďalších pohárov vyžadovaných konektorom
  • The atď priečinok obsahuje jeden alebo viac referenčných konfiguračných súborov

Musíme presunúť lib priečinok do $ CONFLUENT_HOME / share / javaalebo ktorúkoľvek cestu, ktorú sme zadali ako plugin.path v connect-standalone.properties a connect-distribuované.vlastnosti. Môže to mať tiež zmysel premenovať priečinok na niečo zmysluplné.

Môžeme použiť konfiguračné súbory z atď buď ich odkazom pri štarte v samostatnom režime, alebo môžeme iba chytiť vlastnosti a vytvoriť z nich súbor JSON.

10. Záver

V tomto tutoriáli sme sa pozreli na to, ako nainštalovať a používať Kafka Connect.

Pozreli sme sa na typy konektorov, zdrojové aj umývadlové. Pozreli sme sa tiež na niektoré funkcie a režimy, v ktorých môže Connect pracovať. Potom sme skontrolovali transformátory. A nakoniec sme sa dozvedeli, kde sa dá dostať a ako inštalovať vlastné konektory.

Konfiguračné súbory nájdete ako vždy na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found