Spoľahlivé zasielanie správ so skupinami JGroups

1. Prehľad

JGroups je Java API pre spoľahlivú výmenu správ. Je vybavený jednoduchým rozhraním, ktoré poskytuje:

  • flexibilný zásobník protokolov vrátane TCP a UDP
  • fragmentácia a opätovné zostavenie veľkých správ
  • spoľahlivý unicast a multicast
  • detekcia poruchy
  • riadenie prietoku

Rovnako ako mnoho ďalších funkcií.

V tomto tutoriáli vytvoríme jednoduchú aplikáciu na výmenu String správy medzi aplikáciami a dodávanie zdieľaného stavu novým aplikáciám po ich pripojení k sieti.

2. Inštalácia

2.1. Maven závislosť

Musíme do našej pridať jednu závislosť pom.xml:

 org.jgroups jgroups 4.0.10.Final 

Najnovšiu verziu knižnice môžete skontrolovať na serveri Maven Central.

2.2. Siete

Skupiny JG sa pokúsia predvolene použiť protokol IPV6. V závislosti od našej konfigurácie systému to môže mať za následok, že aplikácie nebudú schopné komunikovať.

Aby sme sa tomu vyhli, nastavíme java.net.preferIPv4Stack do pravda pri spustení našich aplikácií tu:

java -Djava.net.preferIPv4Stack = true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Naše pripojenie k sieti JGroups je a JChannel. Kanál sa pripája ku klastru a odosiela a prijíma správy, ako aj informácie o stave siete.

3.1. Vytvára sa kanál

Vytvárame a JChannel s cestou k konfiguračnému súboru. Ak vynecháme názov súboru, vyhľadá sa udp.xml v aktuálnom pracovnom adresári.

Vytvoríme kanál s explicitne pomenovaným konfiguračným súborom:

Kanál JChannel = nový JChannel ("src / main / resources / udp.xml"); 

Konfigurácia JGroups môže byť veľmi komplikovaná, ale predvolená konfigurácia UDP a TCP je dostatočná pre väčšinu aplikácií. Súbor pre UDP sme zahrnuli do nášho kódu a použijeme ho pre tento tutoriál.

Viac informácií o konfigurácii prenosu nájdete v príručke JGroups tu.

3.2. Pripojenie kanála

Po vytvorení nášho kanála sa musíme pripojiť ku klastru. Klaster je skupina uzlov, ktoré si vymieňajú správy.

Pripojenie ku klastru vyžaduje názov klastra:

channel.connect ("Baeldung"); 

Prvý uzol, ktorý sa pokúsi pripojiť ku klastru, ho vytvorí, ak neexistuje. Tento proces uvidíme v akcii nižšie.

3.3. Pomenovanie kanála

Uzly sú identifikované menom, aby mohli rovesníci odosielať smerované správy a dostávať oznámenia o tom, kto do klastra vstupuje a odchádza z neho. Skupiny J priradia meno automaticky, alebo si môžeme nastaviť vlastný:

channel.name ("user1");

Tieto názvy nižšie použijeme na sledovanie toho, kedy uzly vstupujú a opúšťajú klaster.

3.4. Zatvára sa kanál

Vyčistenie kanála je nevyhnutné, ak chceme, aby kolegovia dostávali včasné oznámenia o tom, že sme vystúpili.

Uzatvárame a JChannel s jeho blízkou metódou:

channel.close ()

4. Zmeny zobrazenia klastra

Po vytvorení JChannel sme teraz pripravení sledovať stav rovnocenných serverov v klastri a vymieňať si s nimi správy.

JGroups udržuje stav klastra vo vnútri vyhliadka trieda. Každý kanál má jeden vyhliadka siete. Keď sa zobrazenie zmení, zobrazí sa prostredníctvom viewAccepted () zavolaj späť.

Pre tento tutoriál rozšírime ReceiverAdaptor Trieda API, ktorá implementuje všetky metódy rozhrania vyžadované pre aplikáciu.

Je to odporúčaný spôsob implementácie spätných volaní.

Pridajme viewAccepted do našej aplikácie:

public void viewAccepted (View newView) {private View lastView; if (lastView == null) {System.out.println ("Prijaté počiatočné zobrazenie:"); newView.forEach (System.out :: println); } else {System.out.println ("Prijaté nové zobrazenie."); Zoznam newMembers = View.newMembers (lastView, newView); System.out.println ("Noví členovia:"); newMembers.forEach (System.out :: println); Zoznam exMembers = View.leftMembers (lastView, newView); System.out.println ("Opustení členovia:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Každý vyhliadka obsahuje a Zoznam z Adresa objekty predstavujúce každého člena klastra. JGroups ponúka pohodlné metódy na porovnávanie jedného pohľadu na druhé, ktoré používame na zisťovanie nových alebo ukončených členov klastra.

5. Posielanie správ

Spracovanie správ v JGroups je jednoduché. A Správa obsahuje a bajt pole a Adresa predmety zodpovedajúce odosielateľovi a príjemcovi.

Pre tento tutoriál používame Struny čítať z príkazového riadku, ale je ľahké zistiť, ako by si aplikácia mohla vymieňať iné dátové typy.

5.1. Vysielať správy

A Správa je vytvorený s cieľom a bajtovým poľom; JChannel nastaví pre nás odosielateľa. Ak je cieľ nulový, správu dostane celý klaster.

Prijmeme text z príkazového riadku a pošleme ho do klastra:

System.out.print ("Zadajte správu:"); Reťazec line = in.readLine (). ToLowerCase (); Správa správa = nová správa (null, line.getBytes ()); channel.send (správa); 

Ak spustíme viac inštancií nášho programu a odošleme túto správu (po implementácii príjem () nižšie), dostali by ho všetci, vrátane odosielateľa.

5.2. Blokujeme naše správy

Ak nechceme vidieť naše správy, môžeme si k tomu nastaviť vlastnosť:

channel.setDiscardOwnMessages (true); 

Keď spustíme predchádzajúci test, odosielateľ správy nedostane svoju vysielanú správu.

5.3. Priame správy

Odoslanie priamej správy vyžaduje platný údaj Adresa. Ak máme na mysli uzly podľa názvu, potrebujeme spôsob, ako vyhľadať Adresa. Našťastie máme vyhliadka pre to.

Súčasný vyhliadka je vždy k dispozícii na webe JChannel:

private Voliteľné getAddress (názov reťazca) {Zobraziť view = channel.view (); vrátiť view.getMembers (). stream () .filter (adresa -> name.equals (address.toString ())) .findAny (); } 

Adresa mená sú dostupné prostredníctvom triedy natiahnuť() metóda, takže iba prehľadávame Zoznam členov klastra pre požadované meno.

Takže môžeme z konzoly prijať meno, nájsť priradený cieľ a poslať priamu správu:

Cieľová adresa = null; System.out.print ("Zadajte cieľ:"); Reťazec destinationName = in.readLine (). ToLowerCase (); destination = getAddress (destinationName) .orElseThrow (() -> new Exception ("Destination not found"); Message message = new Message (destination, "Hi there!"); channel.send (message); 

6. Príjem správ

Môžeme posielať správy, pridajme sa teraz a skúsme ich teraz prijať.

Prepíšme to ReceiverAdaptor's metóda prázdneho príjmu:

public void receive (správa správy) {String line = správa prijatá od: "+ message.getSrc () +" do: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (riadok);} 

Pretože vieme, že správa obsahuje a String, môžeme bezpečne prejsť getObject () do System.out.

7. Štátna burza

Keď uzol vstúpi do siete, bude pravdepodobne potrebné načítať informácie o stave klastra. Spoločnosť JGroups na to poskytuje mechanizmus prenosu stavu.

Keď sa uzol pripojí ku klastru, jednoducho zavolá getState (). Klaster zvyčajne získava stav od najstaršieho člena skupiny - koordinátora.

Pridajte k našej aplikácii počet vysielaných správ. Pridáme novú premennú člena a zvýšime ju dovnútra príjem ():

súkromné ​​celé číslo messageCount = 0; public void receive (správa správy) {String line = "Správa prijatá od:" + message.getSrc () + "do:" + message.getDest () + "->" + message.getObject (); System.out.println (riadok); if (message.getDest () == null) {messageCount ++; System.out.println ("Počet správ:" + messageCount); }} 

Skontrolujeme, či a nulový cieľ, pretože ak spočítame priame správy, každý uzol bude mať iné číslo.

Ďalej prepíšeme ďalšie dve metódy v ReceiverAdaptor:

public void setState (vstup InputStream) {try {messageCount = Util.objectFromStream (nový DataInputStream (vstup)); } catch (Výnimka e) {System.out.println ("Chyba stavu deserializácie!"); } System.out.println (messageCount + "je aktuálny počet správ."); } public void getState (výstup OutputStream) vyvolá výnimku {Util.objectToStream (messageCount, nový DataOutputStream (výstup)); } 

Podobne ako správy, prenosy JGroups prenášajú stav ako pole bajtov.

JGroups dodáva InputStream koordinátorovi, aby napísal stav, a OutputStream pre načítanie nového uzla. API poskytuje triedy pohodlia na serializáciu a deserializáciu údajov.

Upozorňujeme, že v produkčnom kóde musí byť prístup k informáciám o stave bezpečný pre vlákna.

Nakoniec pridáme hovor na getState () k nášmu spusteniu po pripojení ku klastru:

channel.connect (clusterName); channel.getState (null, 0); 

getState () prijme cieľ, od ktorého žiada o stav, a časový limit v milisekundách. A nulový cieľ označuje koordinátora a 0 znamená, že časový limit nevyprší.

Keď spustíme túto aplikáciu s dvojicou uzlov a vymeníme si vysielané správy, uvidíme prírastok počtu správ.

Ak potom pridáme tretieho klienta alebo zastavíme a jedného z nich spustíme, uvidíme, ako novo pripojený uzol vytlačí správny počet správ.

8. Záver

V tomto tutoriáli sme pomocou JGroups vytvorili aplikáciu na výmenu správ. Použili sme API na sledovanie, ktoré uzly sa pripojili a opustili klaster, a tiež na prenos stavu klastra do nového uzla, keď sa pripojil.

Ukážky kódu, ako vždy, nájdete na GitHub.


$config[zx-auto] not found$config[zx-overlay] not found