Úvod do MBassador

1. Prehľad

Jednoducho povedané, MBassador je vysoko výkonná zbernica udalostí využívajúca sémantiku publikovania a prihlasovania na odber.

Správy sa vysielajú jednému alebo viacerým kolegom bez predchádzajúceho vedomia počtu predplatiteľov alebo spôsobu, akým správu používajú.

2. Závislosť od Maven

Predtým, ako budeme môcť knižnicu používať, musíme pridať závislosť mbassador:

 net.engio mbassador 1.3.1 

3. Základné spracovanie udalostí

3.1. Jednoduchý príklad

Začneme jednoduchým príkladom zverejnenia správy:

súkromný dispečer MBassador = nový MBassador (); private String messageString; @Before public void prepareTests () {dispatcher.subscribe (this); } @Test public void whenStringDispatched_thenHandleString () {dispatcher.post ("TestString"). Now (); assertNotNull (messageString); assertEquals ("TestString", messageString); } @Handler public void handleString (reťazcová správa) {messageString = správa; } 

Na vrchole tejto testovacej triedy vidíme vytvorenie a MBassador s jeho predvoleným konštruktorom. Ďalej v @ Predtým metóda, voláme prihlásiť sa na odber () a odovzdať odkaz na samotnú triedu.

V prihlásiť sa na odber (), dispečer skontroluje predplatiteľa @Handler anotácie.

A v prvom teste zavoláme dispatcher.post (…). now () na odoslanie správy - ktorej výsledkom je handleString () byť volaný.

Tento počiatočný test demonštruje niekoľko dôležitých konceptov. akýkoľvek Objekt môže byť predplatiteľom, pokiaľ má jednu alebo viac metód anotovaných pomocou @Handler. Predplatiteľ môže mať akýkoľvek počet obslužných programov.

Používame testovacie objekty, ktoré sa predplatia na účely zjednodušenia, ale vo väčšine výrobných scenárov budú dispečeri v rôznych triedach od spotrebiteľov.

Metódy obslužného programu majú iba jeden vstupný parameter - správu a nemôžu hádzať žiadne kontrolované výnimky.

Podobne ako v prihlásiť sa na odber () metóda, metóda post akceptuje akékoľvek Objekt. Toto Objekt sa doručuje predplatiteľom.

Keď je správa zverejnená, doručí sa všetkým poslucháčom, ktorí si predplatili typ správy.

Pridajme ďalší obslužný program správ a odošleme iný typ správy:

private Integer messageInteger; @Test public void whenIntegerDispatched_thenHandleInteger () {dispatcher.post (42). Now (); assertNull (messageString); assertNotNull (messageInteger); assertTrue (42 == messageInteger); } @Handler public void handleInteger (celočíselná správa) {messageInteger = správa; } 

Podľa očakávania, keď expedujemean Celé číslo, handleInteger () sa volá a handleString () nie je. Na odoslanie viac ako jedného typu správy je možné použiť jedného dispečera.

3.2. Mŕtve správy

Kam teda smeruje správa, keď pre ňu neexistuje obslužný program? Pridajme nový obslužný program udalostí a potom pošleme tretí typ správy:

private Object deadEvent; @Test public void whenLongDispatched_thenDeadEvent () {dispatcher.post (42L) .now (); assertNull (messageString); assertNull (messageInteger); assertNotNull (deadEvent); assertTrue (deadEvent instanceof Long); assertTrue (42L == (Long) deadEvent); } @Handler public void handleDeadEvent (správa DeadMessage) {deadEvent = message.getMessage (); } 

V tomto teste odosielame a Dlhé namiesto Celé číslo. Ani jedno handleInteger () ani handleString () sa volajú, ale handleDeadEvent () je.

Ak pre správu neexistujú obslužné programy, zabalí sa do a DeadMessage objekt. Keďže sme pridali obslužný program pre Mŕtva správa, zachytávame to.

DeadMessage možno bezpečne ignorovať; ak aplikácia nemusí sledovať mŕtve správy, môže im byť dovolené nikam ísť.

4. Používanie hierarchie udalostí

Posielam String a Celé číslo udalosti sú limitujúce. Vytvorme niekoľko tried správ:

public class Message {} public class AckMessage extends Message {} public class RejectMessage extends Message {int code; // zakladatelia a prijímači}

Máme jednoduchú základnú triedu a dve triedy, ktoré ju rozširujú.

4.1. Posiela sa základná trieda Správa

Začneme s Správa diania:

súkromný dispečer MBassador = nový MBassador (); súkromná správa správy; súkromná AckMessage ackMessage; súkromná RejectMessage rejectMessage; @Before public void prepareTests () {dispatcher.subscribe (this); } @Test public void whenMessageDispatched_thenMessageHandled () {dispatcher.post (new Message ()). Now (); assertNotNull (správa); assertNull (ackMessage); assertNull (rejectMessage); } @Handler public void handleMessage (správa správy) {this.message = správa; } @Handler public void handleRejectMessage (správa RejectMessage) {rejectMessage = správa; } @Handler public void handleAckMessage (správa AckMessage) {ackMessage = správa; }

Objavte MBassador - vysoko výkonný autobusový podnikový autobus. To nás obmedzuje na používanie Správy ale pridáva ďalšiu vrstvu bezpečnosti typu.

Keď pošleme a Správa, handleMessage () prijíma to. Ostatní dvaja manipulátori nie.

4.2. Posielanie správy podtriedy

Pošleme a Odmietnuť správu:

@Test public void whenRejectDispatched_thenMessageAndRejectHandled () {dispatcher.post (new RejectMessage ()). Now (); assertNotNull (správa); assertNotNull (rejectMessage); assertNull (ackMessage); }

Keď pošleme a Odmietnuť správu oboje handleRejectMessage () a handleMessage () obdržať to.

Odkedy Odmietnuť správu predlžuje Správa, the Správa obslužný program ju prijal okrem RejectMessage psovod.

Overme toto správanie znakom AckMessage:

@Test public void whenAckDispatched_thenMessageAndAckHandled () {dispatcher.post (new AckMessage ()). Now (); assertNotNull (správa); assertNotNull (ackMessage); assertNull (rejectMessage); }

Rovnako, ako sme očakávali, keď pošleme správu AckMessage, obidve handleAckMessage () a handleMessage () obdržať to.

5. Filtrovanie správ

Usporiadanie správ podľa typu je už výkonná funkcia, ale môžeme ich ešte viac filtrovať.

5.1. Filtrovať podľa triedy a podtriedy

Keď sme zverejnili a Odmietnuť správu alebo AckMessage, udalosť sme dostali v obslužnej rutine udalosti pre konkrétny typ aj v základnej triede.

Tento problém s hierarchiou typov môžeme vyriešiť vytvorením Správa abstrakt a vytvorenie triedy ako napr Generická správa. Čo však v prípade, že tento luxus nemáme?

Môžeme použiť filtre správ:

súkromná správa baseMessage; subMessage súkromnej správy; @ Test public void whenMessageDispatched_thenMessageFiltered () {dispatcher.post (new Message ()). Now (); assertNotNull (baseMessage); assertNull (subMessage); } @Test public void whenRejectDispatched_thenRejectFiltered () {dispatcher.post (new RejectMessage ()). Now (); assertNotNull (subMessage); assertNull (baseMessage); } @Handler (filters = {@Filter (Filters.RejectSubtypes.class)}) public void handleBaseMessage (správa správy) {this.baseMessage = správa; } @Handler (filters = {@Filter (Filters.SubtypesOnly.class)}) public void handleSubMessage (správa správy) {this.subMessage = správa; }

The filtre parameter pre @Handler anotácia akceptuje a Trieda ktorý realizuje IMessageFilter. Knižnica ponúka dva príklady:

The Filtre.RejectSubtypes robí, ako naznačuje jeho názov: odfiltruje všetky podtypy. V tomto prípade to vidíme Odmietnuť správu nie je spracovaný používateľom handleBaseMessage ().

The Filtre. Iba podtypy robí aj to, ako naznačuje jeho názov: odfiltruje všetky základné typy. V tomto prípade to vidíme Správa nie je spracovaný používateľom handleSubMessage ().

5.2. IMessageFilter

The Filtre.RejectSubtypes a Filtre. Iba podtypy obaja realizujú IMessageFilter.

RejectSubTypes porovnáva triedu správy s jej definovanými typmi správ a na rozdiel od akýchkoľvek podtried umožňuje iba správy, ktoré sa rovnajú jednému z jej typov.

5.3. Filtrovať podľa podmienok

Našťastie existuje ľahší spôsob filtrovania správ. MBassador podporuje podmnožinu výrazov Java EL ako podmienky pre filtrovanie správ.

Poďme filtrovať a String správa na základe jej dĺžky:

private String testString; @Test public void whenLongStringDispatched_thenStringFiltered () {dispatcher.post ("foobar!"). Now (); assertNull (testString); } @Handler (condition = "msg.length () <7") public void handleStringMessage (reťazcová správa) {this.testString = správa; }

„Foobar!“ správa má sedem znakov a je filtrovaná. Pošleme kratšiu String:

 @Test public void whenShortStringDispatched_thenStringHandled () {dispatcher.post ("foobar"). Now (); assertNotNull (testString); }

Teraz má „foobar“ iba šesť znakov a prechádza sa ním.

Náš Odmietnuť správu obsahuje pole s prístupovým bodom. Napíšme na to filter:

súkromná RejectMessage rejectMessage; @Test public void whenWrongRejectDispatched_thenRejectFiltered () {RejectMessage testReject = new RejectMessage (); testReject.setCode (-1); dispatcher.post (testReject) .now (); assertNull (rejectMessage); assertNotNull (subMessage); assertEquals (-1, ((RejectMessage) subMessage) .getCode ()); } @Handler (condition = "msg.getCode ()! = -1") public void handleRejectMessage (RejectMessage rejectMessage) {this.rejectMessage = rejectMessage; }

Opäť tu môžeme vykonať dotaz na metódu na objekte a buď filtrovať správu, alebo nie.

5.4. Zachyťte filtrované správy

Podobný DeadEvents, možno budeme chcieť zachytiť a spracovať filtrované správy. Existuje aj vyhradený mechanizmus na zachytávanie filtrovaných udalostí. S filtrovanými udalosťami sa zaobchádza odlišne od „mŕtvych“ udalostí.

Poďme napísať test, ktorý to ilustruje:

private String testString; private FilteredMessagefiltrMessage; súkromná DeadMessage deadMessage; @Test public void whenLongStringDispatched_thenStringFiltered () {dispatcher.post ("foobar!"). Now (); assertNull (testString); assertNotNull (filtrovaná správa); assertTrue (filtrovanéMessage.getMessage () inštancia reťazca); assertNull (deadMessage); } @Handler (condition = "msg.length () <7") public void handleStringMessage (reťazcová správa) {this.testString = správa; } @Handler public void handleFilterMessage (správa FilteredMessage) {this.filteredMessage = správa; } @Handler public void handleDeadMessage (DeadMessage deadMessage) {this.deadMessage = deadMessage; } 

S pridaním a Filtrovaná správa psovod, môžeme sledovať Struny ktoré sú filtrované kvôli svojej dĺžke. The filterMessage obsahuje naše príliš dlhé String zatiaľ čo mŕtva správa zvyšky nulový.

6. Asynchrónne odoslanie a spracovanie správ

Doteraz všetky naše príklady používali synchronné odosielanie správ; keď sme volali post.now () správy boli doručené každému spracovateľovi v rovnakom vlákne, ako sme volali príspevok () od.

6.1. Asynchrónny dispečing

The MBassador.post () vráti SyncAsyncPostCommand. Táto trieda ponúka niekoľko metód, vrátane:

  • teraz () - odosielať správy synchrónne; hovor sa zablokuje, kým nebudú doručené všetky správy
  • asynchrónne () - vykoná zverejnenie správy asynchrónne

Použijme asynchrónne odoslanie v triede vzorky. V týchto testoch použijeme Awaitility na zjednodušenie kódu:

súkromný dispečer MBassador = nový MBassador (); private String testString; private AtomicBoolean ready = nový AtomicBoolean (false); @Test public void whenAsyncDispatched_thenMessageReceived () {dispatcher.post ("foobar"). Asynchronously (); await (). untilAtomic (pripravený, rovnáTo (pravda)); assertNotNull (testString); } @Handler public void handleStringMessage (reťazcová správa) {this.testString = správa; ready.set (true); }

Voláme asynchrónne () v tomto teste a použite znak AtomicBoolean ako vlajka s čakať () čakať na doručovacie vlákno na doručenie správy.

Ak hovor okomentujeme čakať (), riskujeme zlyhanie testu, pretože ho kontrolujeme testString pred dokončením dodávacieho vlákna.

6.2. Asynchrónne vyvolanie obslužného programu

Asynchrónne odoslanie umožňuje poskytovateľovi správ vrátiť sa k spracovaniu správy skôr, ako sú správy doručené každému obslužnému programu, ale napriek tomu zavolá každého obslužného programu v poradí a každý obslužný program musí čakať na dokončenie predchádzajúceho.

To môže viesť k problémom, ak jeden obslužný program vykoná nákladnú operáciu.

MBassador poskytuje mechanizmus na asynchrónne vyvolanie obslužnej rutiny. Obslužné nástroje nakonfigurované pre toto prijímajú správy do svojho vlákna:

private integer testInteger; private String invocationThreadName; private AtomicBoolean ready = nový AtomicBoolean (false); @Test public void whenHandlerAsync_thenHandled () {dispatcher.post (42) .now (); await (). untilAtomic (pripravené, rovnáTo (pravda)); assertNotNull (testInteger); assertFalse (Thread.currentThread (). getName (). equals (invocationThreadName)); } @Handler (delivery = Invoke.Asynchronously) public void handleIntegerMessage (celočíselná správa) {this.invocationThreadName = Thread.currentThread (). GetName (); this.testInteger = správa; ready.set (true); }

Obslužné programy môžu požiadať o asynchrónne vyvolanie pomocou nástroja delivery = Invoke.Asynchrónne majetok na Psovod anotácia. Overujeme to v našom teste porovnaním Závit mená v metóde odoslania a obsluha.

7. Prispôsobenie MBassadora

Doteraz sme používali inštanciu MBassador s jej predvolenou konfiguráciou. Správanie dispečera je možné upraviť anotáciami, podobnými tým, ktoré sme videli doteraz; na dokončenie tohto tutoriálu sa budeme venovať ešte niekoľkým.

7.1. Spracovanie výnimiek

Obslužné programy nemôžu definovať kontrolované výnimky. Namiesto toho môže byť dispečer vybavený IPublicationErrorHandler ako argument pre svojho konštruktéra:

verejná trieda MBassadorConfigurationTest implementuje IPublicationErrorHandler {súkromný dispečer MBassador; private String messageString; private Throwable errorCause; @Before public void prepareTests () {dispečer = nový MBassador (toto); dispečer.subscribe (this); } @Test public void whenErrorOccurs_thenErrorHandler () {dispatcher.post ("Chyba"). Now (); assertNull (messageString); assertNotNull (errorCause); } @Test public void whenNoErrorOccurs_thenStringHandler () {dispatcher.post ("Chyba"). Now (); assertNull (errorCause); assertNotNull (messageString); } @Handler public void handleString (String message) {if ("Error" .equals (message)) {throw new Error ("BOOM"); } messageString = správa; } @Override public void handleError (chyba PublicationError) {errorCause = error.getCause (). GetCause (); }}

Kedy handleString () hodí Chyba, je uložený do errorCause.

7.2. Priorita obsluhy

Manipulanti sa volajú v opačnom poradí ako sú pridané, ale nejde o správanie, na ktoré sa chceme spoľahnúť. Aj napriek možnosti volať v ich vláknach obsluhujúcim pracovníkom možno budeme potrebovať vedieť, v akom poradí budú volaní.

Prioritu obsluhy môžeme nastaviť explicitne:

private LinkedList list = new LinkedList (); @Test public void whenRejectDispatched_thenPriorityHandled () {dispatcher.post (new RejectMessage ()). Now (); // Položky by sa mali vypínať () v poradí s opačnou prioritou assertTrue (1 == list.pop ()); assertTrue (3 == list.pop ()); assertTrue (5 == list.pop ()); } @Handler (priorita = 5) public void handleRejectMessage5 (RejectMessage rejectMessage) {list.push (5); } @Handler (priorita = 3) public void handleRejectMessage3 (RejectMessage rejectMessage) {list.push (3); } @Handler (priorita = 2, rejectSubtypes = true) public void handleMessage (správa rejectMessage) logger.error ("obsluha odmietnutia # 3"); list.push (3); } @Handler (priorita = 0) public void handleRejectMessage0 (RejectMessage rejectMessage) {list.push (1); } 

Obslužné rutiny sa volajú od najvyššej priority po najnižšiu. Pracovníci s predvolenou prioritou, ktorá je nulová, sa volajú poslední. Vidíme, že čísla psovoda pop () vypnuté v opačnom poradí.

7.3. Odmietnite podtypy, jednoduchá cesta

Čo sa stalo handleMessage () vo vyššie uvedenom teste? Nemusíme používať RejectSubTypes.class filtrovať naše podtypy.

RejectSubTypes je boolovský príznak, ktorý poskytuje rovnaké filtrovanie ako trieda, ale s lepším výkonom ako IMessageFilter implementácia.

Stále však musíme použiť implementáciu založenú na filtroch iba na prijímanie podtypov.

8. Záver

MBassador je jednoduchá a priama knižnica na odovzdávanie správ medzi objektmi. Správy môžu byť organizované rôznymi spôsobmi a môžu byť odosielané synchrónne alebo asynchrónne.

Príklad je ako vždy k dispozícii v tomto projekte GitHub.