Úvod do RSocket

1. Úvod

V tomto výučbe sa najskôr pozrieme na RSocket a na to, ako umožňuje komunikáciu klient-server.

2. Čo je RSocket?

RSocket je binárny komunikačný protokol point-to-point určené na použitie v distribuovaných aplikáciách. V tomto zmysle poskytuje alternatívu k iným protokolom, ako je HTTP.

Úplné porovnanie medzi RSocket a ostatnými protokolmi presahuje rámec tohto článku. Namiesto toho sa zameriame na kľúčovú vlastnosť RSocket: jej modely interakcie.

RSocket poskytuje štyri modely interakcie. Z tohto dôvodu preskúmame každú z nich na príklade.

3. Závislosti Maven

RSocket potrebuje pre naše príklady iba dve priame závislosti:

 io.rsocket rsocket-jadro 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

Závislosti rsocket-core a rsocket-transport-netty sú k dispozícii na serveri Maven Central.

Dôležitá poznámka je, že knižnica RSocket často využíva reaktívne prúdy. The Flux a Mono tohto článku sú použité triedy, takže bude užitočné ich základné porozumenie.

4. Nastavenie servera

Najskôr vytvorme Server trieda:

public class Server {private final Jednorazový server; public Server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (nový RSocketImpl ())). transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start () .predplatné (); } public void dispose () {this.server.dispose (); } súkromná trieda RSocketImpl rozširuje AbstractRSocket {}}

Tu používame RSocketFactory nastaviť a počúvať soket TCP. Míňame svoj zvyk RSocketImpl vybavovať žiadosti klientov. Do metódy pridáme metódy RSocketImpl ako ideme.

Ďalej, na spustenie servera ho musíme iba vytvoriť:

Server server = nový server ();

Jedna inštancia servera dokáže spracovať viac pripojení. Výsledkom je, že iba jedna inštancia servera bude podporovať všetky naše príklady.

Keď skončíme, zlikvidovať metóda zastaví server a uvoľní port TCP.

4. Interakčné modely

4.1. Žiadosť / odpoveď

RSocket poskytuje model požiadavka / odpoveď - každá požiadavka dostane jednu odpoveď.

Pre tento model vytvoríme jednoduchú službu, ktorá vráti správu späť klientovi.

Začnime pridaním metódy k nášmu rozšíreniu AbstractRSocket, RSocketImpl:

@Override public Mono requestResponse (užitočné zaťaženie užitočné zaťaženie) {try {return Mono.just (užitočné zaťaženie); // odráža užitočné zaťaženie späť odosielateľovi} catch (Výnimka x) {návrat Mono.error (x); }}

The requestResponse metóda vráti pre každú požiadavku jeden výsledok, ako vidíme na Mono typ odpovede.

Užitočné zaťaženie je trieda, ktorá obsahuje obsah správy a metadáta. Používajú ho všetky modely interakcie. Obsah užitočného zaťaženia je binárny, ale existujú aj podporné metódy String- na základe obsahu.

Ďalej môžeme vytvoriť našu triedu klientov:

verejná trieda ReqResClient {súkromná konečná zásuvka RSocket; public ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (reťazec reťazca) {návrat soketu .requestResponse (DefaultPayload.create (reťazec)). mapa (Payload :: getDataUtf8) .block (); } public void dispose () {this.socket.dispose (); }}

Klient používa RSocketFactory.connect () metóda na zahájenie soketového spojenia so serverom. Používame requestResponse metóda na sokete na odoslanie užitočného zaťaženia na server.

Naše užitočné zaťaženie obsahuje String prešiel do klienta. Keď Mono príde odpoveď môžeme použiť getDataUtf8 () metóda prístupu k String obsah odpovede.

Na záver môžeme spustiť test integrácie, aby sme videli akciu / požiadavku v akcii. Pošleme a String na server a overte si to isté String sa vracia:

@Test public void whenSendingAString_thenRevceiveTheSameString () {ReqResClient client = new ReqResClient (); Reťazcový reťazec = "Hello RSocket"; assertEquals (string, client.callBlocking (string)); client.dispose (); }

4.2. Ohnivé a zabudni

S modelom „oheň a zabudni“ klient nedostane zo servera žiadnu odpoveď.

V tomto príklade bude klient odosielať simulované merania na server v 50ms intervaloch. Server zverejní merania.

Pridajme na náš server obslužnú rutinu typu fire-and-forget RSocketImpl trieda:

@Override public Mono fireAndForget (užitočné zaťaženie) {try {dataPublisher.publish (užitočné zaťaženie); // preposlať návratnosť užitočného zaťaženia Mono.empty (); } catch (Výnimka x) {návrat Mono.error (x); }}

Tento obslužný program vyzerá veľmi podobne ako obslužný program žiadosti / odpovede. Avšak fireAndForget vracia Mono namiesto Mono.

The dataPublisher je príkladom org.reactivestreams.Publisher. Týmto sprístupňuje užitočné zaťaženie predplatiteľom. Využijeme to v príklade požiadavky / streamu.

Ďalej vytvoríme klienta typu fire-and-forget:

verejná trieda FireNForgetClient {súkromná konečná zásuvka RSocket; súkromné ​​konečné zoznamové údaje; public FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** Poslať binárnu rýchlosť (float) každých 50ms * / public void sendData () {data = Collections.unmodifiableList (generateData ()); Flux.interval (Duration.ofMillis (50)) .take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

Nastavenie soketu je úplne rovnaké ako predtým.

The sendData () metóda používa a Flux stream na odoslanie viacerých správ. Pre každú správu vyvoláme socket :: fireAndForget.

Musíme sa prihlásiť na odber Mono odpoveď na každú správu. Ak sa potom zabudneme prihlásiť na odber socket :: fireAndForget nevykoná.

The flatMap operátor zabezpečí Neplatný odpovede sa predávajú predplatiteľovi, zatiaľ čo blockLast operátor vystupuje ako predplatiteľ.

Počkáme si do ďalšej časti, aby sme vykonali skúšku ohňom a zabudnutím. V tom okamihu vytvoríme klienta požiadavka / stream na príjem údajov, ktoré boli odoslané klientom typu fire-and-forget.

4.3. Žiadosť / Stream

V modeli request / stream jedna žiadosť môže dostať viac odpovedí. Aby sme to videli v praxi, môžeme stavať na príklade „oheň a zabudni“. Ak to chcete urobiť, požiadajme prúd, aby načítal merania, ktoré sme odoslali v predchádzajúcej časti.

Rovnako ako predtým, začnime pridaním nového poslucháča do RSocketImpl na serveri:

@Override public Flux requestStream (užitočné zaťaženie užitočné zaťaženie) {return Flux.from (dataPublisher); }

The requestStream obsluha vráti a Flux Prúd. Ako si pripomíname z predchádzajúcej časti, fireAndForget handler zverejnil prichádzajúce údaje do dataPublisher. Teraz vytvoríme Flux stream pomocou toho istého dataPublisher ako zdroj udalosti. Týmto spôsobom budú namerané údaje prúdiť asynchrónne z nášho klienta typu „fire-and-forget“ do nášho klienta typu „žiadosť / prúd“.

Ďalej vytvoríme klienta žiadosť / stream:

public class ReqStreamClient {private final RSocket socket; public ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } public void dispose () {this.socket.dispose (); }}

K serveru sa pripájame rovnakým spôsobom ako naši predchádzajúci klienti.

V getDataStream ()používame socket.requestStream () na príjem toku Flux zo servera. Z tohto prúdu extrahujeme Plavák hodnoty z binárnych údajov. Nakoniec sa prúd vráti volajúcemu, čo mu umožní prihlásiť sa na odber a spracovať výsledky.

Teraz poďme otestovať. Overíme spiatočnú cestu z požiaru a zabudnutia vyžiadať / streamovať.

Môžeme tvrdiť, že každá hodnota je prijatá v rovnakom poradí, v akom bola odoslaná. Potom môžeme tvrdiť, že dostávame rovnaký počet hodnôt, ktoré boli odoslané:

@Test public void whenSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = new FireNForgetClient (); ReqStreamClient streamClient = nový ReqStreamClient (); Zoznam údajov = fnfClient.getData (); List dataReceived = new ArrayList (); Jednorazové predplatné = streamClient.getDataStream () .index () .subscribe (tuple -> {assertEquals ("nesprávna hodnota", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. add (tuple.getT2 ());}, chyba -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... zbaviť klienta a predplatné assertEquals ("prijatý nesprávny počet dát", data.size (), dataReceived.size ()); }

4.4. Kanál

Model kanála poskytuje obojsmernú komunikáciu. V tomto modeli toky správ prúdia asynchrónne oboma smermi.

Vytvorme jednoduchú simuláciu hry, ktorá to otestuje. V tejto hre sa každá strana kanálu stane hráčom. Počas hry budú títo hráči posielať správy na druhú stranu v náhodných časových intervaloch. Opačná strana bude na správy reagovať.

Najskôr vytvoríme obslužný program na serveri. Rovnako ako predtým, pridávame do RSocketImpl:

@Override public Flux requestChannel (užitočné zaťaženie vydavateľa) {Flux.from (užitočné zaťaženie) .subscribe (gameController :: processPayload); návrat Flux.from (gameController); }

The requestChannel psovod má Užitočné zaťaženie streamy pre vstup aj výstup. The Vydavateľ vstupný parameter je tok užitočných zaťažení prijatých od klienta. Po príchode sa tieto užitočné zaťaženia odovzdajú úradu gameController :: processPayload funkcie.

Ako odpoveď vrátime inú Flux stream späť ku klientovi. Tento prúd je vytvorený z nášho gameController, ktorá je tiež a Vydavateľ.

Tu je zhrnutie GameController trieda:

public class GameController implementuje Publisher {@Override public void subscribe (Subscriber subscriber) {// odosielať správy Payload predplatiteľom v náhodných intervaloch} public void processPayload (Payload payload) {// reagovať na správy od druhého hráča}}

Keď GameController prijme predplatiteľa začne posielať správy tomuto predplatiteľovi.

Ďalej vytvoríme klienta:

verejná trieda ChannelClient {súkromná konečná zásuvka RSocket; súkromná záverečná GameController gameController; public ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = nový GameController ("klientsky hráč"); } public void playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } public void dispose () {this.socket.dispose (); }}

Ako sme videli v našich predchádzajúcich príkladoch, klient sa pripája k serveru rovnakým spôsobom ako ostatní klienti.

Klient si vytvorí vlastnú inštanciu súboru GameController.

Používame socket.requestChannel () poslať naše Užitočné zaťaženie stream na server. Server reaguje vlastným tokom užitočného zaťaženia.

Ako užitočné zaťaženie prijaté zo servera ich odovzdávame nášmu gameController :: processPayload psovod.

V našej hernej simulácii sú klient a server navzájom zrkadlovým obrazom. To znamená, každá strana posiela prúd Užitočné zaťaženie a príjem prúdu Užitočné zaťaženie z druhého konca.

Streamy bežia nezávisle, bez synchronizácie.

Nakoniec spustíme simuláciu v teste:

@Test public void whenRunningChannelGame_thenLogTheResults () {ChannelClient client = new ChannelClient (); client.playGame (); client.dispose (); }

5. Záver

V tomto úvodnom článku sme preskúmali modely interakcie poskytované programom RSocket. Celý zdrojový kód príkladov nájdete v našom úložisku Github.

Nezabudnite sa pozrieť na webovú stránku RSocket, kde nájdete hlbšiu diskusiu. Dobré podklady poskytujú najmä dokumenty FAQ a Motivations.