MQTT klient v Jave

1. Prehľad

V tomto tutoriále uvidíme, ako môžeme pridať správy MQTT do projektu Java pomocou knižníc poskytovaných projektom Eclipse Paho.

2. MQTT základný náter

MQTT (MQ Telemetry Transport) je protokol pre zasielanie správ ktorá bola vyvinutá s cieľom vyriešiť potrebu jednoduchej a ľahkej metódy na prenos údajov do / z nízkoenergetických zariadení, napríklad tých, ktoré sa používajú v priemyselných aplikáciách.

So zvýšenou popularitou zariadení IoT (Internet of Things) zaznamenal MQTT zvýšené používanie, čo viedlo k jeho štandardizácii prostredníctvom OASIS a ISO.

Protokol podporuje jediný vzor správ, a to vzor Publish-Subscribe: každá správa odoslaná klientom obsahuje súvisiacu „tému“, ktorú sprostredkovateľ používa na smerovanie k predplateným klientom. Názvy tém môžu byť jednoduché reťazce ako „oiltemp„Alebo reťazec podobný ceste“motor / 1 / ot“.

Za účelom prijímania správ sa klient prihlási na odber jednej alebo viacerých tém pomocou presného názvu alebo reťazca obsahujúceho jeden z podporovaných zástupných znakov („#“ pre viacúrovňové témy a „+“ pre jednu úroveň).

3. Nastavenie projektu

Aby sme mohli zahrnúť knižnicu Paho do projektu Maven, musíme pridať nasledujúcu závislosť:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Najnovšiu verziu modulu knižnice Java Eclipse Paho je možné stiahnuť z Maven Central.

4. Nastavenie klienta

Pri používaní knižnice Paho je prvou vecou, ​​ktorú musíme urobiť, aby sme mohli odosielať a / alebo prijímať správy od sprostredkovateľa MQTT, získať implementáciu IMqttClient rozhranie. Toto rozhranie obsahuje všetky metódy vyžadované aplikáciou na nadviazanie spojenia so serverom, na odosielanie a prijímanie správ.

Paho vychádza z krabice s dvoma implementáciami tohto rozhrania, asynchrónnym (MqttAsyncClient) a synchrónny (MqttClient).V našom prípade sa zameriame na synchrónnu verziu, ktorá má jednoduchšiu sémantiku.

Samotné nastavenie je dvojkrokový proces: najskôr vytvoríme inštanciu súboru MqttClient triedy a potom ho pripojíme k nášmu serveru. Nasledujúca podkapitola podrobne popisuje tieto kroky.

4.1. Vytvorenie nového IMqttClient Inštancia

Nasledujúci úryvok kódu ukazuje, ako vytvoriť nový IMqttClient synchrónna inštancia:

Reťazec publisherId = UUID.randomUUID (). ToString (); IMqttClient publisher = nový MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

V tomto prípade, používame najjednoduchší dostupný konštruktor, ktorý berie adresu koncového bodu nášho MQTT brokera a identifikátor klienta, ktorá jednoznačne identifikuje nášho klienta.

V našom prípade sme použili náhodný UUID, takže pri každom spustení sa vygeneruje nový identifikátor klienta.

Paho tiež poskytuje ďalšie konštruktory, ktoré môžeme použiť na prispôsobenie mechanizmu vytrvalosti používaného na ukladanie nepotvrdených správ a / alebo ScheduledExecutorService slúži na spustenie úloh na pozadí vyžadovaných implementáciou protokolového modulu.

Koncový bod servera, ktorý používame, je verejný sprostredkovateľ MQTT hostený v projekte Paho, ktorá umožňuje komukoľvek s pripojením na internet testovať klientov bez potreby akejkoľvek autentifikácie.

4.2. Pripojenie k serveru

Náš novo vytvorený MqttClient inštancia nie je pripojená k serveru. Robíme to tak, že zavoláme jeho pripojiť () metóda, voliteľne absolvovanie a MqttConnectOptions inštancia, ktorá nám umožňuje prispôsobiť niektoré aspekty protokolu.

Tieto možnosti môžeme použiť najmä na odovzdanie ďalších informácií, ako sú bezpečnostné poverenia, režim obnovenia relácie, režim opätovného pripojenia atď.

The MqttConnectionOptions trieda vystavuje tieto možnosti ako jednoduché vlastnosti, ktoré môžeme nastaviť pomocou bežných metód nastavenia. Musíme iba nastaviť vlastnosti požadované pre náš scenár - tie zvyšné budú mať predvolené hodnoty.

Kód použitý na nadviazanie spojenia so serverom zvyčajne vyzerá takto:

Možnosti MqttConnectOptions = nové MqttConnectOptions (); options.setAutomaticReconnect (true); options.setCleanSession (true); options.setConnectionTimeout (10); publisher.connect (možnosti);

Tu definujeme naše možnosti pripojenia tak, aby:

  • Knižnica sa v prípade zlyhania siete automaticky pokúsi znova pripojiť k serveru
  • Zahodí neodoslané správy z predchádzajúceho spustenia
  • Časový limit pripojenia je nastavený na 10 sekúnd

5. Posielanie správ

Odosielanie správ pomocou už pripojeného MqttClient je veľmi priamy. Používame jeden z zverejniť () varianty metód na odoslanie užitočného zaťaženia, ktorým je vždy bajtové pole, na danú témupomocou jednej z nasledujúcich možností kvality služieb:

  • 0 - sémantika „nanajvýš raz“, známa tiež ako „oheň a zabudni“. Túto možnosť použite, keď je strata správy prijateľná, pretože nevyžaduje žiadny druh potvrdenia alebo vytrvalosti
  • 1 - sémantika „aspoň raz“. Túto možnosť použite, ak je strata správy neprijateľná a vaši predplatitelia môžu spracovávať duplikáty
  • 2 - sémantika „presne raz“. Túto možnosť použite, ak je strata správy neprijateľná a vaši predplatitelia nemôžu spracovať duplikáty

V našom vzorovom projekte Senzor teploty motora triedy hrá rolu simulovaného senzora, ktorý vytvára nové čítanie teploty zakaždým, keď ho vyvoláme hovor () metóda.

Táto trieda implementuje Vyvolávateľná rozhranie, aby sme ho mohli ľahko použiť s jedným z ExecutorService implementácie dostupné v java.util.concurrent balenie:

public class EngineTemperatureSensor implementuje Callable {// ... súkromní členovia vynechali public EngineTemperatureSensor (klient IMqttClient) {this.client = klient; } @Override public Void call () vyvolá výnimku {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (true); client.publish (TÉMA, správa); návrat null; } private MqttMessage readEngineTemp () {double temp = 80 + rnd.nextDouble () * 20.0; byte [] užitočné zaťaženie = String.format ("T:% 04.2f", teplota) .getBytes (); vrátiť novú MqttMessage (užitočné zaťaženie); }}

The MqttMessage zapuzdruje samotné užitočné zaťaženie, požadovanú kvalitu služieb a tiež ponechané príznak pre správu. Tento príznak naznačuje sprostredkovateľovi, že by si mal túto správu ponechať, kým ju odberateľ nespotrebuje.

Túto funkciu môžeme použiť na implementáciu správania „posledného známeho dobrého“, takže keď sa nový predplatiteľ pripojí k serveru, okamžite dostane zadržanú správu.

6. Príjem správ

Za účelom prijímania správ od MQTT brokera, musíme použiť jednu z prihlásiť sa na odber () varianty metód, ktoré nám umožňujú špecifikovať:

  • Jeden alebo viac filtrov tém pre správy, ktoré chceme dostávať
  • Súvisiaca QoS
  • Obsluha spätného volania na spracovanie prijatých správ

V nasledujúcom príklade si ukážeme, ako pridať poslucháča správ k existujúcemu IMqttClient napríklad na príjem správ z danej témy. Používame a CountDownLatch ako synchronizačný mechanizmus medzi našim spätným volaním a hlavným vláknom vykonávania, ktorý ho znižuje pri každom príchode novej správy.

Vo vzorovom kóde sme použili iný IMqttClient napríklad na príjem správ. Urobili sme to len preto, aby sme objasnili, ktorý klient čo robí, ale nejde o Paho obmedzenie - ak chcete, môžete rovnakého klienta použiť na publikovanie a prijímanie správ:

CountDownLatch receivedSignal = nový CountDownLatch (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] payload = msg.getPayload (); // ... manipulácia s payloadom vynechaná receiveSignal.countDown ();}); receiveSignal.await (1, TimeUnit.MINUTES);

The prihlásiť sa na odber () variant použitý vyššie trvá - IMqttMessageListener inštancia ako druhý argument.

V našom prípade použijeme jednoduchú funkciu lambda, ktorá spracuje užitočné zaťaženie a zníži počítadlo. Ak v stanovenom časovom okne (1 minúta) nepríde dostatok správ, čakať () metóda spôsobí výnimku.

Pri používaní služby Paho nemusíme výslovne potvrdzovať prijatie správy. Ak sa spätné volanie vráti normálne, Paho predpokladá jeho úspešnú spotrebu a odošle potvrdenie na server.

Ak spätné volanie hodí Výnimka, bude klient odstavený. Toto bude mať za následok stratu všetkých správ odosielaných s úrovňou QoS 0.

Správy odosielané s úrovňou QoS úrovne 1 alebo 2 budú serverom znova odosielané, keď sa klient znovu pripojí a znovu sa prihlási na odber témy.

7. Záver

V tomto článku sme demonštrovali, ako môžeme pridať podporu pre protokol MQTT do našich aplikácií Java pomocou knižnice poskytnutej projektom Eclipse Paho.

Táto knižnica spracováva všetky podrobnosti protokolu na nízkej úrovni, čo nám umožňuje sústrediť sa na ďalšie aspekty nášho riešenia a zároveň ponecháva dostatočný priestor na prispôsobenie dôležitých aspektov jeho vnútorných funkcií, ako je napríklad vytrvalosť správ.

Kód uvedený v tomto článku je k dispozícii na stránkach GitHub.


$config[zx-auto] not found$config[zx-overlay] not found