airasoul.

The Internet Of Things, With RabbitMQ, Node.Js, MQTT And AMQP


Introduction

This is the second post in a two-part series, where I explore how to set up a Dockerised RabbitMQ instance with MQTT support and outline how to talk to devices and brokers over MQTT and AMQP in a polyglot manner.

Part one can be found here:
The Internet of things, Dockerising RabbitMQ and MQTT

Source code can be found here:
the-internet-of-things-rabbitmq-mqtt

In this post

  • A Polyglot Setup
  • RabbitMQ and MQTT
  • Creating an MQTT Subscriber
  • Creating an MQTT Publisher
  • Mapping MQTT to AMQP
  • Creating an AMQP Subscriber
  • Creating an AMQP Publisher

A Polyglot Setup

In the previous post we explained how to run an MQTT-enabled RabbitMQ instance in a Docker container.

Now that we have RabbitMQ up and running, let's take a look at the snippets below, which allow polyglot communication between AMQP and MQTT. The examples are written in Node.js.

Modules used:

  • mqtt
  • amqp

In this post we will demonstrate the following scenarios:

  • MQTT publish -> MQTT subscriber
  • MQTT publish -> AMQP subscriber
  • AMQP publish -> AMQP subscriber
  • AMQP publish -> MQTT subscriber

RabbitMQ and MQTT

RabbitMQ supports MQTT via a plugin.

MQTT has no concept of queues and exchanges, only topics. RabbitMQ does a pretty good job of mapping MQTT concepts onto AMQP: MQTT clients publish to a topic exchange, and MQTT topics map to routing keys.
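To make the topic to routing key mapping concrete, here is a minimal sketch of the translation as I understand it: MQTT separates topic levels with "/", AMQP routing keys separate words with ".", and the MQTT single-level wildcard "+" corresponds to the AMQP "*". The function name mqttTopicToRoutingKey is hypothetical and not part of any library, and the sketch assumes topic levels contain no dots.

```javascript
// Sketch only: approximates how an MQTT topic (or topic filter) lines up
// with an AMQP routing key (or binding key) on a topic exchange.
// Assumption: topic levels do not themselves contain '.' characters.
function mqttTopicToRoutingKey(topic) {
  return topic
    .split('/')                       // MQTT levels are separated by '/'
    .map(function (level) {
      return level === '+' ? '*' : level;  // '+' (one level) becomes '*'
    })
    .join('.');                       // AMQP words are separated by '.'
}

console.log(mqttTopicToRoutingKey('airasoul'));
console.log(mqttTopicToRoutingKey('home/+/temperature'));
```

The topic used in this post, airasoul, has a single level, so the topic and the routing key are the same string.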

Creating an MQTT subscriber

The MQTT subscriber creates a connection on port 1883, which binds to the default amq.topic exchange. We pass the following options:

{
   clientId: 'mqtt-sub-',
   clean: false
}

Let's break this down.

clientId identifies the client to the broker. A fixed clientId, combined with clean: false, ensures the queue is not deleted. The value provided, mqtt-sub-, becomes part of the queue name.

clean controls whether queues are auto-deleted. Setting clean: false means that when the client disconnects, any subscriptions it has will remain, and any subsequent qos 1 messages will be stored until the subscriber reconnects.

When we subscribe we pass the topic name (or routing key) airasoul and qos: 1.

qos, or Quality of Service, sets the subscription level. If you do not provide a qos, two queues will be created: a qos0 queue and a qos1 queue.

  • qos: 0 or at most once uses a non-durable, auto-delete queue that will be deleted when the client disconnects. A message is sent immediately on return from publish.
  • qos: 1 or at least once, uses durable queues. Delivery is complete when an acknowledgment to the publisher has been received from the broker.

The result is a durable queue called mqtt-subscription-mqtt-sub-qos1.
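The queue name follows a visible pattern: a fixed prefix, then the clientId, then the qos level. As a rough sketch, assuming the pattern inferred from the queue name shown in this post holds (it may differ between RabbitMQ versions), a hypothetical helper could predict the name so you know what to look for in the management console:

```javascript
// Sketch only: subscriptionQueueName is a made-up helper, not a RabbitMQ
// or mqtt.js API. The pattern is inferred from the queue name seen above.
function subscriptionQueueName(clientId, qos) {
  return 'mqtt-subscription-' + clientId + 'qos' + qos;
}

console.log(subscriptionQueueName('mqtt-sub-', 1));
// → mqtt-subscription-mqtt-sub-qos1
```

This also shows why the trailing dash in mqtt-sub- matters: the clientId is used as-is, so the dash ends up separating it from the qos suffix.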

We then define a handler to listen for messages; this handler receives each message and writes it to the console.

    var mqtt = require('mqtt');
    var url  = 'mqtt://localhost:1883';

    var client = mqtt.connect(url, { clientId: 'mqtt-sub-', clean: false });

    client.on('connect', function () {
      client.subscribe('airasoul', { qos: 1 });
    });

    client.on('message', function (topic, message) {
      console.log('received message ',  message.toString());
    });

Start the subscriber, then open the RabbitMQ management console.

$ node mqtt-subscriber.js

The console lists a single queue, mqtt-subscription-mqtt-sub-qos1. Click on the queue to see its details: under details you can see it's a durable queue, and under bindings you can see it's bound to the default amq.topic exchange with the routing key airasoul.

Creating an MQTT publisher

The MQTT publisher connects on port 1883 and creates a client which publishes to the default amq.topic exchange. When we call publish we pass the topic airasoul. We have to JSON.stringify the message, and we pass the qos level qos: 1. The retain option described in the mqtt.js documentation has no relevance in RabbitMQ's MQTT implementation.

    var mqtt = require('mqtt');
    var url  = 'mqtt://localhost:1883';

    var payload = {
      deviceId : '8675309'
    };

    var client = mqtt.connect(url);

    client.on('connect', function () {
      client.publish('airasoul', JSON.stringify(payload), { qos: 1 }, function() {
        client.end();
        process.exit();
      });
    });

With the subscriber running, start this publisher and you will see the published message appear on the subscriber terminal.

$ node mqtt-publisher.js

Feel free to play around with this. Stop the subscriber and make sure the queue persists. You can also publish messages without the subscriber running and see them in the management console under queues, where the messages ready section shows a count of the messages sent. Run the subscriber again, and all of the messages will be consumed.
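Because the publisher sends the payload through JSON.stringify, the subscriber receives it as a Buffer of JSON text. If you want the object back rather than a string, something like the following sketch could work. The helper names encodePayload and decodePayload are hypothetical, and returning null for malformed input is just one possible design choice.

```javascript
// Sketch only: hypothetical helpers for the publish and receive sides.

// Publisher side: turn an object into the JSON string that gets published.
function encodePayload(payload) {
  return JSON.stringify(payload);
}

// Subscriber side: message arrives as a Buffer; parse it defensively so one
// malformed message does not crash the handler.
function decodePayload(message) {
  try {
    return JSON.parse(message.toString());
  } catch (err) {
    return null; // assumption: callers treat null as "not valid JSON"
  }
}

// Simulate the round trip without a broker.
var wire = Buffer.from(encodePayload({ deviceId: '8675309' }));
console.log(decodePayload(wire).deviceId);
```

In the subscriber, decodePayload(message) would take the place of message.toString() inside the message handler.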

Mapping MQTT to AMQP

RabbitMQ maps MQTT to the default amq.topic exchange and generates a queue, mqtt-subscription-mqtt-sub-qos1, based on the options specified. For an AMQP subscriber to receive messages published over MQTT, or for an MQTT subscriber to receive messages published over AMQP, the AMQP queues and exchanges need to mimic this MQTT setup.
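The reason this works is that a topic exchange copies each message to every queue whose binding key matches the message's routing key, regardless of which protocol created the queue. The following is a simplified in-memory model of that matching, not RabbitMQ's implementation; matchesBinding and route are hypothetical names. It assumes the standard topic exchange rules, where "*" matches exactly one word and "#" matches zero or more words.

```javascript
// Sketch only: a toy model of topic exchange routing.
function matchesBinding(bindingKey, routingKey) {
  var pattern = bindingKey.split('.');
  var words = routingKey.split('.');

  function match(p, w) {
    if (p === pattern.length) { return w === words.length; }
    if (pattern[p] === '#') {
      // '#' may absorb zero or more of the remaining words.
      for (var next = w; next <= words.length; next++) {
        if (match(p + 1, next)) { return true; }
      }
      return false;
    }
    if (w === words.length) { return false; }
    if (pattern[p] === '*' || pattern[p] === words[w]) {
      return match(p + 1, w + 1);
    }
    return false;
  }

  return match(0, 0);
}

// One queue as created for an MQTT subscriber, one by an AMQP subscriber,
// both bound to amq.topic with the same key.
var bindings = [
  { queue: 'mqtt-subscription-mqtt-sub-qos1', key: 'airasoul' },
  { queue: 'airasoul-queue', key: 'airasoul' }
];

function route(routingKey) {
  return bindings
    .filter(function (b) { return matchesBinding(b.key, routingKey); })
    .map(function (b) { return b.queue; });
}

console.log(route('airasoul'));
```

Both queue names come back for the routing key airasoul, which is the behaviour the rest of this post relies on.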

Creating an AMQP subscriber

The following AMQP subscriber creates a connection, passing as an option a URL for AMQP on port 5672. Importantly, we set defaultExchangeName: 'amq.topic', which sets the default exchange to a topic exchange.

We create a queue called airasoul-queue. Setting durable: true means the queue will survive a RabbitMQ restart, and autoDelete: false ensures the queue remains after all subscribers have disconnected and it is no longer in use. We also bind the queue with the routing key airasoul.

This queue will receive messages sent to the default exchange with the routing key airasoul.

When we subscribe we set ack: true and prefetchCount: 1, which ensures the server only sends one message at a time; a call to shift() tells the server to send the next message. The message is simply output to the console.

    var amqp = require('amqp');
    var url  = 'amqp://localhost:5672';

    var connection = amqp.createConnection({url: url }, { defaultExchangeName: 'amq.topic' });

    connection.on('ready', function () {
        connection.queue('airasoul-queue', { durable: true, autoDelete: false }, function (q) {
            q.bind('airasoul');

            q.subscribe({ ack: true, prefetchCount: 1 }, function (message) {
                console.log('received message', message);
                q.shift();
            });
        });
    });
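To illustrate what ack: true with prefetchCount: 1 means in practice, here is a small in-memory model of the behaviour described above. It does not use the amqp module and is not how the broker is implemented; createPrefetchOneConsumer and its methods are hypothetical names, with shift() playing the role of q.shift().

```javascript
// Sketch only: models "one unacknowledged message at a time".
function createPrefetchOneConsumer(handler) {
  var pending = [];      // messages still held by the "server"
  var inFlight = false;  // true while a delivered message awaits its ack

  function deliverNext() {
    if (inFlight || pending.length === 0) { return; }
    inFlight = true;
    handler(pending.shift());
  }

  return {
    // The "server" receives a message destined for this queue.
    enqueue: function (message) {
      pending.push(message);
      deliverNext();
    },
    // Acknowledge the current message and ask for the next one.
    shift: function () {
      inFlight = false;
      deliverNext();
    }
  };
}

var received = [];
var consumer = createPrefetchOneConsumer(function (message) {
  received.push(message);
});

consumer.enqueue('a');
consumer.enqueue('b');
consumer.enqueue('c');
console.log(received); // only 'a' has been delivered so far

consumer.shift();
console.log(received); // 'b' arrives once 'a' is acknowledged
```

If the handler never calls shift(), the remaining messages stay on the server, which is what you would see as unacknowledged and ready counts in the management console.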

Start the subscriber, then open the RabbitMQ management console.

$ node amqp-subscriber.js

The console lists a single queue, airasoul-queue. Click on the queue to see its details: under details you can see it's a durable queue, and under bindings you can see it's bound to the default amq.topic exchange with the routing key airasoul.

Creating an AMQP publisher

The AMQP publisher uses the same connection options as the subscriber. When we publish a message, all queues bound to the default exchange with the routing key airasoul will receive it.

    var amqp = require('amqp');
    var url  = 'amqp://localhost:5672';

    var payload = {
      deviceId : '8675309'
    };

    var connection = amqp.createConnection({url: url},  { defaultExchangeName: 'amq.topic' });

    connection.on('ready', function () {
      connection.publish('airasoul', payload);

      setTimeout(function(){
        connection.disconnect();
        process.exit();
      }, 500);
    });

With the subscriber running, start this publisher and you will see the published message appear on the subscriber terminal.

$ node amqp-publisher.js

Once again, feel free to play around with this, but this time have both mqtt-publisher and amqp-publisher publish to amqp-subscriber. We can also have mqtt-publisher and amqp-publisher publish to mqtt-subscriber.

And there you have it, polyglot messaging with RabbitMQ.