Read from MQTT

We are now going to start to implement MQTT readings.

You should still have your docker-compose up and running. If not, you should start it now.

Create a basic application

Let's modify our index.mjs as displayed below (remember to save).

import mqtt from "mqtt";

const config = {
    mqttHost: process.env.MQTT_HOST ?? "127.0.0.1",
    mqttPort: process.env.MQTT_PORT ? parseInt(process.env.MQTT_PORT) : 1883,
    mqttInTopic: process.env.MQTT_IN_TOPIC ?? "fld/+/r/#",
    mqttReconnectPeriod: 5000,
    csvPath: process.env.CSV_PATH ?? "/tmp/log.csv"
};

const messageHandler = async (topic, message) => {
    console.log(topic, message); // To change later
};

const mqttBrokerSetup = (messageHandler) => {
    console.log(`Connecting to mqtt broker at ${config.mqttHost}:${config.mqttPort}`);

    const mqttClient = mqtt.connect(`mqtt://${config.mqttHost}:${config.mqttPort}`, {
        reconnectPeriod: config.mqttReconnectPeriod
    });

    mqttClient.on("connect", function () {
        console.log("connected to mqtt broker");

        // subscribe to configured topics
        mqttClient.subscribe(config.mqttInTopic, function (error) {
            if (error) {
                console.error(`Cannot subscribe to ${config.mqttInTopic}`, error);
            }
        });
    });

    mqttClient.on("message", function (topic, message) {
        try {
            messageHandler(topic, message.toString());
        } catch (error) {
            console.error(error);
        }
    });
};

const main = async () => {
    // connect to mqtt broker and subscribe to topics
    mqttBrokerSetup(messageHandler);
};

main();

Start the application

We need to add the mqtt package to our project.

npm install --save mqtt

And we can now start the modified project.

npm run start

You should see in the console a list of MQTT messages, along with their topic names.

Connecting to mqtt broker at 127.0.0.1:1883
connected to mqtt broker
fld/modbus/r/rnd1 {"value":2185,"ts":1605577425381}
fld/modbus/r/rnd2 {"value":447,"ts":1605577425382}
fld/modbus/r/rnd3 {"value":6,"ts":1605577425383}
fld/modbus/r/nornd {"value":0,"ts":1605576545161}
fld/modbus/r/rnd1 {"value":2190,"ts":1605577426886}
fld/modbus/r/rnd2 {"value":445,"ts":1605577426887}
...

Environment variables

Let's analyze the code we wrote.

const config = {
    mqttHost: process.env.MQTT_HOST ?? "127.0.0.1",
    mqttPort: process.env.MQTT_PORT ? parseInt(process.env.MQTT_PORT) : 1883,
    mqttInTopic: process.env.MQTT_IN_TOPIC ?? "fld/+/r/#",
    mqttReconnectPeriod: 5000,
    csvPath: process.env.CSV_PATH ?? "/tmp/log.csv"
};

We are initializing the config object with default values if no environment variable is defined.

The environment variables are the primary mechanism in use in IOhubTM to provide configuration to the containers.

Modify the main function adding the environment variables log.

const main = async () => {
    // show configuration in use
    console.log("Actual configuration");
    console.table(config);

    // connect to mqtt broker and subscribe to topics
    mqttBrokerSetup(messageHandler);
};

Stop the running application with ctrl-c and execute

export MQTT_PORT=12345

and restart the application

npm run start

You should see the following text in the console

Actual configuration
┌─────────────────────┬────────────────┐
│       (index)       │     Values     │
├─────────────────────┼────────────────┤
│      mqttHost       │  '127.0.0.1'   │
│      mqttPort       │     12345      │
│     mqttInTopic     │  'fld/+/r/#'   │
│ mqttReconnectPeriod │      5000      │
│       csvPath       │ '/tmp/log.csv' │
└─────────────────────┴────────────────┘
Connecting to mqtt broker at 127.0.0.1:12345

with mqttPort overridden by the value in MQTT_PORT.

If you have the wrong port provided, the application will no longer connect to the MQTT broker.

ctrl-c again to stop the application.

Run export MQTT_PORT=1883 to fix it (or just unset the variable and use the default value with unset MQTT_PORT).

and restart the application

npm run start

and you should be back to normal.

Explain the MQTT messages

Each message we read from the MQTT broker has a predefined format, explained below.

MQTT Topics

You can see the topic format in the console.

Each MQTT incoming message from field drivers is in a topic with the following format:

fld/<protocol>/r/<address>

Using the topic name, you can find out what information you are getting.

MQTT Values

You can see the value format in the console.

Each messaging coming from field drivers is a JSON object with fields value (containing the actual value) and ts (the Unix timestamp in milliseconds, of the reading event).

Parse MQTT messages

We can now extract the protocol and value portions from the topic name. We will use those values as CSV columns, together with the value and timestamp.

Add the function messageParser

const messageParser = (topic, message) => {
    // extract protocol and measurement
    const topicRegexp = /^([^\/]+)\/([^\/]+)\/{0,1}.*\/([^\/]+)$/;
    const match = topicRegexp.exec(topic);
    if (!match || match.length < 4) {
        console.error("bad topic, cannot extract protocol");
        throw new Error("bad topic, cannot extract protocol");
    }
    const protocol = match[2];
    const measurement = match[3];

    // create object with all fields
    try {
        const messageObject = JSON.parse(message);
        return { ...messageObject, protocol, measurement };
    } catch (error) {
        console.error("Cannot parse message `${message}` as a valid json", error);
        throw new Error("Cannot parse message as a valid json");
    }
};

We can rewrite messageHandler, now using messageParser.

const messageHandler = async (topic, message) => {
    try {
        // parse message from string message received
        const parsedMessage = messageParser(topic, message);
        console.table(parsedMessage);

        // save to csv
        // TODO
    } catch (error) {
        console.log(error);
    }
};

Restart the application and you should get a series of formatted messages like the one below.

(index)Values
value3
ts1605578777757
protocol'modbus'
measurement'rnd3'

We now have all the information to save into CSV columns.