The Internet Of Things, With RabbitMQ, Node.Js, MQTT And AMQP
Introduction
Ths is the second post in a 2 part series, where I explore how to setup a dockerised RabbitMQ instance with MQTT support and outline how to talk to devices/brokers with MQTT and AMQP in a polyglot manner.
Part one can be found here:
- The Internet of things, Dockerising RabbitMQ and MQTT
Source code can be found here:
- the-internet-of-things-rabbitmq-mqtt
In this post
- A Polyglot Setup
- RabbitMQ and MQTT
- Creating an MQTT Publisher
- Creating an MQTT Subscriber
- Mapping MQTT to AMQP
- Creating an AMQP Publisher
- Creating an AMQP Subscriber
A Polyglot Setup
In a previous post we explained how to run an MQTT enabled RabbitMQ instance in a docker container.
Now we have RabbitMQ up and running; lets take a look at the below snippets which will allow polyglot communication between AMQP and MQTT. The examples are written in node.js.
Modules used:
In this post we will demonstrate the following scenarios:
- MQTT publish -> MQTT subscriber
- MQTT publish -> AMQP subscriber
- AMQP publish -> AMQP subscriber
- AMQP publish -> MQTT subscriber
RabbitMQ and MQTT
RabbitMQ supports MQTT via plugin.
MQTT has no concept of queues and exchanges; only topics
. RabbitMQ does a pretty good job of mapping MQTT concepts, MQTT uses a topic exchange, and MQTT topics
map to routing keys.
Creating an MQTT subscriber
The MQTT subscriber creates a connection on port 1883
, this binds to the default amq.topic
exchange. As options we pass
{
clientId: 'mqtt-sub-',
clean: false
}
Lets break this down.
clientId
ensures the queue is not deleted, requires clean:false
. The value mqtt-sub-
provided is prefixed to the queue name.
clean
, controls if queues are auto-deleted. This means that when the client disconnects, any subscriptions it has will remain and any subsequent qos 1 messages will be stored until a subscriber reconnects.
When we subscribe
we pass the topic
name or routing key
, airasoul
and a qos: 1
.
qos
or Quality of Service sets the subscription level, if you do not provide a qos, two queues will be created, qos1
queue and a qos2
queue.
qos: 0
orat most once
uses a non-durable, auto-delete queue that will be deleted when the client disconnects. A message is sent immediately on return from publish.qos: 1
orat least once
, uses durable queues. Delivery is complete when an acknowledgment to the publisher has been received from the broker.
The result of this creates a durable queue called, mqtt-subscription-mqtt-sub-qos1
.
We then define a handler to listen to message
, this handler receives messages and outputs to console.
var mqtt = require('mqtt');
var url = 'mqtt://localhost:1883';
var client = mqtt.connect(url, { clientId: 'mqtt-sub-', clean: false });
client.on('connect', function () {
client.subscribe('airasoul', { qos: 1 });
});
client.on('message', function (topic, message) {
console.log('received message ', message.toString());
});
If you start this subscriber up, you should see the following in the RabbitMQ management console.
$ node mqtt-subscriber.js
As you can see, we have a single queue mqtt-subscription-mqtt-sub-qos1
, if you click on this queue, you can see the queue's details. Under details
you can see its a durable
queue, and under bindings you can see its bound to the default amq.topic
exchange, with a routing key airasoul
.
Creating an MQTT publisher
The MQTT publisher connects on port 1883
and creates a client which connects to the default amq.topic
exchange. When we call publish we pass the topic airasoul
. We have to JSON.stringify
the message, and we pass the qos level qos: 1
. The retain
option specified in the documentation for mqtt.js has no relevance in RabbitMQs MQTT implementation.
var mqtt = require('mqtt');
var url = 'mqtt://localhost:1883';
var payload = {
deviceId : '8675309'
};
var client = mqtt.connect(url);
client.on('connect', function () {
client.publish('airasoul', JSON.stringify(payload), { qos: 1 }, function() {
client.end();
process.exit();
});
});
With the subscriber running; start this publisher, and you will see a message sent via the publisher, on the subscriber terminal.
$ node mqtt-publisher.js
Feel free to play around with this. Stop the subscriber and make sure the queue persists, you can also publish messages without the subscriber running and see these in the management console, under queues. The messages
ready
section will contain a count of the messages sent. Run the subscriber again, and all of the messages will be consumed.
Mapping MQTT to AMQP
RabbitMQ maps MQTT to a default amq.topic
exchange, and generates a queue, mqtt-subscription-mqtt-sub-qos1
based on the options specified. In order for an AMQP
subscriber to consume from this queue, or for MQTT
to consume from a queue created by an AMQP
subscriber, the queues and exchanges need to mimic this MQTT
setup.
Creating an AMQP subscriber
The following AMQP subscriber, simply creates a connection, passing as option a url
for AMQP on port 5672
and importantly, we set defaultExchangeName: 'amq.topic'
, which sets the default exchange to a topic
exchange.
We create a queue, called airasoul-queue
, which is a durable: true
queue, this will survive a RabbitMQ restart, autoDelete: false
will ensure the queue remains after all subscribers disconnect from it, and its no longer in use. We also bind the queue with a routing key airasoul
.
This queue will receive messages sent to the default exchange, with a routing key airasoul
.
When we subscribe we set ack: true, prefetchCount: 1
, which ensures the sever only sends 1 message at a time; a call to shift()
, informs the server to send the next message. The message is simply output to console.
var amqp = require('amqp');
var url = 'amqp://localhost:5672'
var connection = amqp.createConnection({url: url }, { defaultExchangeName: 'amq.topic' });
connection.on('ready', function () {
connection.queue('airasoul-queue', { durable: true, autoDelete: false }, function (q) {
q.bind('airasoul');
q.subscribe({ ack: true, prefetchCount: 1 }, function (message) {
console.log('received message', message);
q.shift();
});
});
});
If you start this subscriber up, you should see the following in the RabbitMQ management console.
$ node amqp-subscriber.js
As you can see, we have a single queue airasoul-queue
, if you click on this queue, you can see the queue's details. Under details
you can see its a durable
queue, and under bindings you can see its bound to the default amq.topic
exchange, with a routing key airasoul
.
Creating an AMQP publisher
The AMQP publisher, uses the same connection options, as the subscriber. When we publish a message, all queues that have been bound to the default exchange with a routing key
set to airasoul
will receive it.
var amqp = require('amqp');
var url = 'amqp://localhost:5672'
var payload = {
deviceId : '8675309'
};
var connection = amqp.createConnection({url: url}, { defaultExchangeName: 'amq.topic' });
connection.on('ready', function () {
connection.publish('airasoul', payload);
setTimeout(function(){
connection.disconnect();
process.exit();
}, 500);
});
With the subscriber running; start this publisher, and you will see a message sent via the publisher, on the subscriber terminal.
$ node amqp-publisher.js
Once again, feel free to play around with this; but this time have mqtt-publisher
and amqp-publisher
publish to amqp-subscriber
. Also we can have mqtt-publisher
and amqp-publisher
publish to mqtt-subscriber
.
And there you have it, polyglot messaging with RabbitMQ.