Connect to MQTT Mosquitto Broker in Node.js

Node.js is asynchronous and event driven, as you will see later. Rather than waiting for the client to connect, we register listeners on the client and define the callback each one runs when its event fires. Let’s start by installing the tools we need to run our script, then create our MQTT client.
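
The listener pattern described here is the same one Node.js’s built-in EventEmitter uses. Here is a minimal sketch of it. It uses a stand-in emitter rather than a real MQTT client (the name fakeClient is made up for illustration) to show that a callback is registered first and only runs when the event fires:

```javascript
const { EventEmitter } = require('events');

// Stand-in for a client object; hypothetical, for illustration only.
const fakeClient = new EventEmitter();
const log = [];

// Register a listener: nothing runs yet, we only say what should happen.
fakeClient.on('connect', function () {
    log.push('connected');
});

log.push('listener registered');

// Fire the event; the callback registered above runs now.
fakeClient.emit('connect');

console.log(log.join(' -> '));
```

With a real client, the library emits the event for you when the broker replies.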

Installing MQTT.js

First, install the MQTT.js client library in your project using this command:

npm install mqtt --save

You can also install the package globally to get its command line tools:

npm install mqtt -g

Connect to MQTT Broker

Let’s create a script called “mqtt-connect.js” that we will use to connect and send messages. To connect to the MQTT broker, we first need to load the MQTT.js library:

var mqtt = require('mqtt');

Next, we define our client and connect to the MQTT broker. Rather than running our own broker, we will use the public Mosquitto test broker at test.mosquitto.org. Add this line to connect to it:

var client = mqtt.connect("mqtt://test.mosquitto.org", {clientId:"mqtt-tester"});

Set MQTT On_connect Event

Our MQTT broker will acknowledge our connect request with a CONNACK message. When that reply arrives, the client emits a 'connect' event, which plays the role of an on_connect() callback, so let’s register a handler for it with:

client.on('connect', function() {
    console.log("Connected!");
});

Now let’s run our script by going back to the terminal and running:

node mqtt-connect.js

You will notice that the script prints its message and then the cursor stays on the next line without returning to the prompt:

Connected!
_

This is because the client ran our callback and is now waiting in the event loop. The script has not ended: the open connection to the broker keeps it running. To finish cleanly, add this inside the callback, after logging to the console:

client.end();
process.exit(0);

This ends the connection with the broker and exits the script with code 0, meaning no error.
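
One caveat: exiting straight after client.end() may not leave the client time to finish disconnecting. MQTT.js also accepts a callback as the last argument of end(), so a possible variation is to exit from that callback instead. The sketch below assumes that end(force, options, callback) form. The helper name disconnectThen and the stub client are made up so the example runs without a broker:

```javascript
// Disconnect first, then run `done` once the client reports it has closed.
// `client` is any object with an MQTT.js-style end(force, options, callback).
function disconnectThen(client, done) {
    client.end(false, {}, done);
}

// Stand-in client so the sketch runs without a broker (hypothetical).
const stubClient = {
    ended: false,
    end: function (force, options, callback) {
        this.ended = true;
        callback();
    }
};

let finished = false;
disconnectThen(stubClient, function () {
    finished = true;
    console.log('disconnected, safe to exit');
});
```

In the real script this could be used as disconnectThen(client, function() { process.exit(0); });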

Create Publisher

We publish to the MQTT Mosquitto broker by calling the client.publish() method with a topic and a message. Its options and callback arguments are optional, so we leave them out here and simply print our topic and message to the console. It is recommended to publish only while you are connected to the broker, so we also add a check before publishing:

function publish(topic, message) {
    console.log("publishing: ", message);

    if (client.connected == true) {
        console.log("publishing topic: " + topic + ", message: " + message);
        client.publish(topic, message);
    }
}
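
If you want to know whether a publish went through, the optional arguments come in handy. The sketch below assumes the publish(topic, message, options, callback) form of MQTT.js. The helper publishIfConnected and the stub client are illustrative names, used so the example runs without a broker:

```javascript
// Publish only while connected and report the outcome through `onDone`.
// `client` is any object shaped like an MQTT.js client (connected, publish).
function publishIfConnected(client, topic, message, onDone) {
    if (!client.connected) {
        onDone(new Error('not connected'));
        return false;
    }
    // options and callback are the optional third and fourth arguments
    client.publish(topic, message, { qos: 0 }, onDone);
    return true;
}

// Stand-in client so the sketch runs without a broker (hypothetical).
const stubPublisher = {
    connected: true,
    sent: [],
    publish: function (topic, message, options, callback) {
        this.sent.push({ topic: topic, message: message, qos: options.qos });
        callback(null);
    }
};

const results = [];
publishIfConnected(stubPublisher, 'topic1', 'message1', function (err) {
    results.push(err ? err.message : 'published');
});

stubPublisher.connected = false;
publishIfConnected(stubPublisher, 'topic1', 'message2', function (err) {
    results.push(err ? err.message : 'published');
});

console.log(results);
```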

You can also publish directly from inside your on_connect() callback like so:

client.on('connect', function() {
    console.log("Is connected ? " + client.connected);
    if (client.connected == true) {
        console.log("publishing message...");
        client.publish("test topic", "test message");
    }
});

But let’s publish at regular intervals instead. We need an array of topics and messages to loop through, and an interval timer for each one:

var timerIds = [];
var topic="test topic";
var message="test message";
var message_list = ["message1", "message2", "message3"];
var topic_list = ["topic1", "topic2", "topic3"];
topic_list.forEach((element, index) => {
    // keep every timer id so each interval can be cleared later
    timerIds.push(setInterval(function() {publish(element, message_list[index]);}, 1000));
});

We just created an array of topics and messages, looped through each element, and scheduled a call to our publish() method at one-second intervals.
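
One thing the loop leaves open is how to stop the timers again. A possible sketch is shown below. The helper names buildSchedule and startPublishing are made up, the publish function is a stand-in that only logs, and the interval is shortened to 100 ms so the demo finishes quickly. It starts one timer per topic and hands back a single function that clears them all:

```javascript
// Pair each topic with the message at the same index.
function buildSchedule(topics, messages) {
    return topics.map(function (topic, index) {
        return { topic: topic, message: messages[index] };
    });
}

// Start one interval timer per entry and return a function that stops them all.
// `publishFn` stands in for the publish() function of the script.
function startPublishing(schedule, publishFn, intervalMs) {
    const timers = schedule.map(function (entry) {
        return setInterval(function () {
            publishFn(entry.topic, entry.message);
        }, intervalMs);
    });
    return function stop() {
        timers.forEach(function (timer) { clearInterval(timer); });
    };
}

const schedule = buildSchedule(['topic1', 'topic2', 'topic3'],
                               ['message1', 'message2', 'message3']);
const stop = startPublishing(schedule, function (topic, message) {
    console.log('publishing topic: ' + topic + ', message: ' + message);
}, 100);

// Clear every timer shortly afterwards so the demo exits on its own.
setTimeout(stop, 350);
```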

Subscribe to Messages

To receive messages, we subscribe to the topics our publishers use. You can subscribe to a single topic, an array of topics, or an Object. We subscribe by calling client.subscribe():

client.subscribe(topic);
client.subscribe(topic_list);
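
For reference, the Object form maps each topic name to its options, such as { qos: 1 }. The sketch below only builds such an object and does not talk to a broker. The helper name toSubscriptionMap is made up, and the QoS value is an arbitrary example:

```javascript
// Build the object form accepted by client.subscribe():
// keys are topic names, values are { qos: <0|1|2> }.
function toSubscriptionMap(topics, qos) {
    const map = {};
    topics.forEach(function (topic) {
        map[topic] = { qos: qos };
    });
    return map;
}

const subscriptions = toSubscriptionMap(['topic1', 'topic2', 'topic3'], 1);
console.log(subscriptions);
// With a connected client this could be passed as: client.subscribe(subscriptions);
```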

You can also pass QoS settings for a topic or for an array of topics, but we won’t focus on that just yet. Subscribing alone is not enough: you will not see any messages on those topics until you register a handler for them. To listen for incoming messages, register a callback for the ‘message’ event; it receives the topic and the message:

client.on('message', function(topic, message) {
    console.log("message is: " + message);
    console.log("topic is: " + topic);
});
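
Note that MQTT.js hands the message to the callback as a Buffer, not a string. The string concatenation in the handler converts it implicitly, but you may prefer to decode it explicitly. Here is a small sketch; the helper decodePayload and the JSON payload are assumptions for illustration, not something the broker requires:

```javascript
// Convert an incoming payload (a Buffer) to text, and to JSON when possible.
function decodePayload(message) {
    const text = message.toString('utf8');
    try {
        return { text: text, json: JSON.parse(text) };
    } catch (err) {
        // not valid JSON: keep only the plain text
        return { text: text, json: null };
    }
}

const plain = decodePayload(Buffer.from('message1'));
const structured = decodePayload(Buffer.from('{"temp": 21.5}'));
console.log(plain.text);
console.log(structured.json.temp);
```

Inside the ‘message’ handler you would call decodePayload(message) before logging.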

Handling Connection Errors

We should also handle failures to connect. To do so, we register a listener for ‘error’ events. On a connection failure we exit immediately with an error code, so we do not attempt any publishing or subscribing against a disconnected broker:

client.on('error', function(error) {
    console.log("Unable to connect: " + error);
    process.exit(1);
});

And That’s a Wrap!

We now have a script that can connect to an MQTT broker, subscribe to message topics, publish messages on several topics, and exit on connection failures. I hope you’ve been testing continuously as you went through this guide so you can see what went wrong, if anything. If not, then no worries! Here is the final script we just made:

var mqtt = require('mqtt');
var msgCount = 0;

// connect to public test.mosquitto.org broker
var client = mqtt.connect("mqtt://test.mosquitto.org", {clientId:"mqtt-tester"});

// receive incoming messages
client.on('message', function(topic, message) {
    console.log("message is: " + message);
    console.log("topic is: " + topic);
});

// on_connect
client.on('connect', function() {
    console.log("Is connected ? " + client.connected);
});

// on_error
client.on('error', function(error) {
    console.log("Unable to connect: " + error);
    process.exit(1);
});

// publish messages
function publish(topic, message) {
    console.log("publishing: ", message);

    if (client.connected == true) {
        console.log("publishing topic: " + topic + ", message: " + message);
        client.publish(topic, message);
    }

    msgCount += 1;
    if (msgCount > 2) {
        console.log("message count > 2, end script");
        // stop every interval timer before disconnecting
        timerIds.forEach(function(id) {clearInterval(id);});
        client.end();
        process.exit(0);
    }
}

var timerIds = [];
var topic="test topic";
var message="test message";
var message_list = ["message1", "message2", "message3"];
var topic_list = ["topic1", "topic2", "topic3"];
console.log("subscribing to topics...");
client.subscribe(topic);
client.subscribe(topic_list);
topic_list.forEach((element, index) => {
    // keep every timer id so each interval can be cleared later
    timerIds.push(setInterval(function() {publish(element, message_list[index]);}, 1000));
});
timerIds.push(setInterval(function() {publish(topic, message);}, 1000));
console.log("end of script");

You will notice that the “end of script” console log displays before the connect and publish logs. This is a testament to the asynchronous nature of Node.js! Play around with it and try other method options, adding multiple publishers and subscribers on various topics and QoS levels.

Thanks!!!