Úvod do Netty

1. Úvod

V tomto článku sa pozrieme na Netty - asynchrónny rámec sieťových aplikácií riadený udalosťami.

Hlavným účelom Netty je budovanie vysoko výkonných protokolových serverov založených na NIO (alebo prípadne NIO.2) s oddelením a voľným prepojením sieťových a obchodných logických komponentov. Môže implementovať všeobecne známy protokol, napríklad HTTP, alebo váš vlastný špecifický protokol.

2. Základné koncepty

Netty je neblokujúci rámec. To vedie k vysokej priepustnosti v porovnaní s blokovaním IO. Pochopenie neblokujúcich IO je zásadné pre pochopenie základných komponentov Netty a ich vzťahov.

2.1. Kanál

Kanál je základom Java NIO. Predstavuje otvorené spojenie, ktoré umožňuje operácie IO, ako je čítanie a zápis.

2.2. Budúcnosť

Každá operácia IO na a Kanál v Netty neblokuje.

To znamená, že každá operácia sa vráti okamžite po ukončení hovoru. Existuje Budúcnosť rozhranie v štandardnej knižnici Java, ale nie je to vhodné pre Netty účely - môžeme sa opýtať iba na Budúcnosť o dokončení operácie alebo o zablokovaní aktuálneho vlákna, kým sa operácia neskončí.

Preto Netty má svoje vlastné ChannelFuture rozhranie. Môžeme poslať spätné volanie na ChannelFuture ktorá bude vyzvaná po ukončení prevádzky.

2.3. Udalosti a spracovatelia

Netty používa aplikačnú paradigmu založenú na udalostiach, takže potrubím spracovania údajov je reťazec udalostí prechádzajúcich manipulátormi. Udalosti a obslužné programy môžu súvisieť s tokom prichádzajúcich a odchádzajúcich údajov. Prichádzajúce udalosti môžu byť nasledujúce:

  • Aktivácia a deaktivácia kanála
  • Prečítajte si prevádzkové udalosti
  • Výnimočné udalosti
  • Udalosti používateľov

Odchádzajúce udalosti sú jednoduchšie a vo všeobecnosti súvisia s otvorením / zatvorením spojenia a zápisom / vyprázdnením údajov.

Sieťové aplikácie pozostávajú z niekoľkých sieťových a aplikačných logických udalostí a ich obslužných rutín. Základné rozhrania pre obslužné rutiny udalostí kanála sú ChannelHandler a jeho predkovia ChannelOutboundHandler a ChannelInboundHandler.

Netty poskytuje obrovskú hierarchiu implementácií ChannelHandler. Za zmienku stojí adaptéry, ktoré sú iba prázdnymi implementáciami, napr. ChannelInboundHandlerAdapter a ChannelOutboundHandlerAdapter. Tieto adaptéry by sme mohli rozšíriť, keď potrebujeme spracovať iba podmnožinu všetkých udalostí.

Existuje tiež veľa implementácií konkrétnych protokolov, ako je HTTP, napr. HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. Bolo by dobré sa s nimi zoznámiť v Nettyho Javadocu.

2.4. Kodéry a dekodéry

Keď pracujeme so sieťovým protokolom, musíme vykonať serializáciu a deserializáciu údajov. Za týmto účelom Netty predstavuje špeciálne rozšírenia ChannelInboundHandler pre dekodéry ktoré sú schopné dekódovať prichádzajúce dáta. Základná trieda väčšiny dekodérov je ByteToMessageDecoder.

Na kódovanie odchádzajúcich údajov má Netty prípony ChannelOutboundHandler zavolal kódovače. MessageToByteEncoder je základom pre väčšinu implementácií kódovacieho zariadenia. Správu môžeme previesť z bajtovej sekvencie na objekt Java a naopak pomocou kódovacích a dekódovacích prostriedkov.

3. Ukážka serverovej aplikácie

Vytvorme projekt predstavujúci jednoduchý protokolový server, ktorý prijme požiadavku, vykoná výpočet a odošle odpoveď.

3.1. Závislosti

Najskôr musíme zabezpečiť závislosť od Netty v našom pom.xml:

 io.netty netty-all 4.1.10. Konečné 

Najnovšiu verziu nájdeme na serveri Maven Central.

3.2. Dátový model

Trieda údajov žiadosti by mala nasledujúcu štruktúru:

verejná trieda RequestData {private int intValue; private String stringValue; // štandardné getre a setre}

Predpokladajme, že server prijme požiadavku a vráti intValue vynásobené 2. Odpoveď by mala hodnotu single int:

verejná trieda ResponseData {private int intValue; // štandardné getre a setre}

3.3. Vyžiadajte si dekodér

Teraz musíme pre naše protokolové správy vytvoriť kodéry a dekodéry.

Je potrebné poznamenať, že Netty pracuje s prijímacou pamäťou socketu, ktorý je reprezentovaný nie ako rad, ale ako skupina bajtov. To znamená, že náš prichádzajúci obslužný program je možné zavolať, keď server neprijme úplnú správu.

Pred spracovaním sa musíme ubezpečiť, že sme dostali celú správu a existuje veľa spôsobov, ako to urobiť.

Najskôr môžeme vytvoriť dočasný ByteBuf a pripojte k nej všetky prichádzajúce bajty, kým nezískame požadovaný počet bajtov:

verejná trieda SimpleProcessingHandler rozširuje ChannelInboundHandlerAdapter {private ByteBuf tmp; @Override public void handlerAdded (ChannelHandlerContext ctx) {System.out.println ("Bol pridaný obslužný program"); tmp = ctx.alloc (). buffer (4); } @Override public void handlerRemoved (ChannelHandlerContext ctx) {System.out.println ("Handler odstranený"); tmp.release (); tmp = null; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; tmp.writeBytes (m); m.vydanie (); if (tmp.readableBytes ()> = 4) {// spracovanie žiadosti RequestData requestData = new RequestData (); requestData.setIntValue (tmp.readInt ()); ResponseData responseData = new ResponseData (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture future = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); }}}

Vyššie uvedený príklad vyzerá trochu čudne, ale pomáha nám pochopiť, ako funguje Netty. Každá metóda nášho obslužného programu sa volá, keď dôjde k jeho zodpovedajúcej udalosti. Po pridaní obslužnej rutiny teda inicializujeme vyrovnávaciu pamäť, vyplníme ju údajmi o prijatí nových bajtov a začneme ju spracúvať, keď získame dostatok údajov.

Zámerne sme nepoužili a hodnota reťazca - dekódovanie takýmto spôsobom by bolo zbytočne zložité. Preto Netty poskytuje užitočné triedy dekodérov, ktoré sú implementáciami ChannelInboundHandler: ByteToMessageDecoder a ReplayingDecoder.

Ako sme si poznamenali vyššie, pomocou Netty môžeme vytvoriť kanál na spracovanie kanálov. Môžeme teda dať náš dekodér ako prvý obslužný program a spracovateľ logiky spracovania môže prísť za ním.

Ďalej sa zobrazuje dekodér pre RequestData:

verejná trieda RequestDecoder rozširuje ReplayingDecoder {private final Charset charset = Charset.forName ("UTF-8"); @Override chránený void decode (ChannelHandlerContext ctx, ByteBuf in, List out) vyvolá Exception {RequestData data = new RequestData (); data.setIntValue (in.readInt ()); int strLen = in.readInt (); data.setStringValue (in.readCharSequence (strLen, charset) .toString ()); out.add (údaje); }}

Myšlienka tohto dekodéra je dosť jednoduchá. Používa implementáciu ByteBuf čo spôsobí výnimku, keď v pamäti nie je dostatok údajov na operáciu čítania.

Keď sa zachytí výnimka, vyrovnávacia pamäť sa previnie na začiatok a dekodér čaká na novú časť údajov. Dekódovanie sa zastaví, keď von zoznam nie je prázdny po dekódovať exekúcia.

3.4. Kodér odpovedí

Okrem dekódovania RequestData musíme správu zakódovať. Táto operácia je jednoduchšia, pretože keď dôjde k operácii zápisu, máme úplné údaje o správe.

Môžeme zapisovať údaje do Kanál v našom hlavnom obslužnom programe alebo môžeme oddeliť logiku a vytvoriť obslužný program rozširujúci sa MessageToByteEncoder ktorý zachytí zápis ResponseData prevádzka:

verejná trieda ResponseDataEncoder rozširuje MessageToByteEncoder {@Override chránené neplatné kódovanie (ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) vyvolá výnimku {out.writeInt (msg.getIntValue ()); }}

3.5. Vyžiadajte si spracovanie

Pretože sme dekódovanie a kódovanie vykonávali v samostatných obslužných programoch, bolo treba zmeniť naše ProcessingHandler:

verejná trieda ProcessingHandler rozširuje ChannelInboundHandlerAdapter {@Override public void channelRead (ChannelHandlerContext ctx, Object msg) vyvolá výnimku {RequestData requestData = (RequestData) msg; ResponseData responseData = new ResponseData (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture future = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); System.out.println (requestData); }}

3.6. Bootstrap servera

Teraz to dajme dohromady a spustime náš server:

public class NettyServer {private int port; // konštruktor public static void main (String [] args) vyvolá výnimku {int port = args.length> 0? Integer.parseInt (args [0]); : 8080; nový NettyServer (port) .run (); } public void run () vyvolá výnimku {EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup workerGroup = new NioEventLoopGroup (); skúste {ServerBootstrap b = nový ServerBootstrap (); b.group (bossGroup, workerGroup) .channel (NioServerSocketChannel.class) .childHandler (new ChannelInitializer () {@Override public void initChannel (SocketChannel ch) vyvolá výnimku {ch.pipeline (). addLast (nový RequestDecoder (), nový ResponseData) (), nový ProcessingHandler ());}}). option (ChannelOption.SO_BACKLOG, 128) .childOption (ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind (port) .sync (); f.channel (). closeFuture (). sync (); } nakoniec {workerGroup.shutdownGracefully (); bossGroup.shutdownGracefully (); }}}

Podrobnosti o triedach použitých vo vyššie uvedenom príklade bootstrap servera nájdete v ich Javadoc. Najzaujímavejšou časťou je tento riadok:

ch.pipeline (). addLast (nový RequestDecoder (), nový ResponseDataEncoder (), nový ProcessingHandler ());

Tu definujeme prichádzajúce a odchádzajúce obslužné rutiny, ktoré budú spracovávať požiadavky a výstup v správnom poradí.

4. Klientská aplikácia

Klient by mal vykonávať reverzné kódovanie a dekódovanie, takže musíme mať a RequestDataEncoder a ResponseDataDecoder:

public class RequestDataEncoder rozširuje MessageToByteEncoder {private final Charset charset = Charset.forName ("UTF-8"); @Override chránené void encode (ChannelHandlerContext ctx, RequestData msg, ByteBuf out) vyvolá Exception {out.writeInt (msg.getIntValue ()); out.writeInt (msg.getStringValue (). dĺžka ()); out.writeCharSequence (msg.getStringValue (), znaková sada); }}
public class ResponseDataDecoder extends ReplayingDecoder {@Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {ResponseData data = new ResponseData (); data.setIntValue (in.readInt ()); out.add (údaje); }}

Musíme tiež definovať a ClientHandler ktorý odošle požiadavku a prijme odpoveď zo servera:

verejná trieda ClientHandler rozširuje ChannelInboundHandlerAdapter {@Override public void channelActive (ChannelHandlerContext ctx) vyvolá výnimku {RequestData msg = new RequestData (); msg.setIntValue (123); msg.setStringValue ("celá práca a žiadna hra robí z Jacka nudného chlapca"); ChannelFuture future = ctx.writeAndFlush (správa); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println ((ResponseData) msg); ctx.close (); }}

Teraz poďme bootstrap klienta:

public class NettyClient {public static void main (String [] args) hodí Exception {String host = "localhost"; int port = 8080; EventLoopGroup workerGroup = new NioEventLoopGroup (); skus {Bootstrap b = new Bootstrap (); b.skupina (pracovnaSkupina); b.kanál (NioSocketChannel.class); b.volba (ChannelOption.SO_KEEPALIVE, true); b.handler (nový ChannelInitializer () {@Override public void initChannel (SocketChannel ch) hodí výnimku {ch.pipeline (). addLast (nový RequestDataEncoder (), nový ResponseDataDecoder (), nový ClientHandler ());}}); ChannelFuture f = b.connect (hostiteľ, port) .sync (); f.channel (). closeFuture (). sync (); } nakoniec {workerGroup.shutdownGracefully (); }}}

Ako vidíme, s bootstrappingom servera je veľa spoločných vecí.

Teraz môžeme spustiť hlavnú metódu klienta a pozrieť sa na výstup z konzoly. Podľa očakávania sme sa dočkali ResponseData s intValue rovná sa 246.

5. Záver

V tomto článku sme mali rýchly úvod do Netty. Ukázali sme jeho základné komponenty ako napr Kanál a ChannelHandler. Tiež sme vytvorili jednoduchý neblokujúci protokolový server a klienta pre tento server.

Všetky vzorky kódu sú ako vždy k dispozícii na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found