Google Pub/Sub

Description

ezvpn-mqtt-googlepubsub listens to an MQTT broker and sends data to a specific Google Pub/Sub topic. From Google Pub/Sub you can also send messages to IOhub™ to write data directly to the field.

Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.

You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines.

Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale. Pub/Sub servers run in all Google Cloud regions around the world.

The core concepts are:

  • Topic: A named resource to which messages are sent by publishers (here, ezvpn-mqtt-googlepubsub).

  • Subscription: A named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. Used to send commands to the field.

  • Message: The combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.

  • Message attribute: A key-value pair that a publisher can define for a message.

Google Pub/Sub Schema

How to use it

ezvpn-mqtt-googlepubsub is a Docker container image pre-configured to communicate with ezvpn-mqtt and the Google Pub/Sub service. To use it, you need to enable Pub/Sub on a Google account, install the ezvpn-mqtt service with a field container, and configure ezvpn-mqtt-googlepubsub with your Google Pub/Sub credentials and the required settings.

All data read from the field is sent to a specific topic in Google Pub/Sub. In the other direction, by publishing a message to the topic behind the subscription, you can write values to the field. Communication through this container is therefore bidirectional.

To create the credentials, you need a Google Cloud Platform (GCP) account. Check this link to create a GCP account if you don't have one already.

Create a Google Pub/Sub service account

Create a new service account to grant permission to Pub/Sub service.

Create service account

Grant permission using the Cloud Pub/Sub Service Agent role.

Create service account role

Create a key in JSON format and download the file to your PC. Use the content of the file to authenticate with the Google Pub/Sub service by setting it in the environment variable PUBSUB_AUTHENTICATION.
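The downloaded key file is multi-line JSON, which can be awkward to paste into an environment variable. The sketch below checks that the file looks like a service account key and returns it as single-line JSON. The function name key_file_to_env_value is hypothetical, and the list of checked fields is an assumption based on the usual layout of Google service account keys; this page does not state which fields the container reads.

```python
import json

# Fields that a Google service account key file normally contains (assumption).
REQUIRED_KEY_FIELDS = ("type", "project_id", "private_key", "client_email")


def key_file_to_env_value(key_json: str) -> str:
    """Validate a service account key and return it as single-line JSON."""
    key = json.loads(key_json)
    missing = [name for name in REQUIRED_KEY_FIELDS if name not in key]
    if missing:
        raise ValueError("key file is missing fields: " + ", ".join(missing))
    if key["type"] != "service_account":
        raise ValueError("key file is not a service account key")
    # Compact separators keep the whole value on one line.
    return json.dumps(key, separators=(",", ":"))
```

The returned string is equivalent JSON, so it can be used wherever the original file content would be used as the value of PUBSUB_AUTHENTICATION.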

Create key

Now you have successfully created the authentication.

Configure Pub/Sub for inbound communication

To receive messages from IOhub™ in the Google Pub/Sub service, you need to create a new topic and set its name in the PUBSUB_TOPIC environment variable of the IOhub™ configuration. For example, you can use the name iohub-fld-topic.

Create new topic

Now, you need to create a new subscription and bind it to the topic created above. For example, we can use the iohub-fld-sub name and bind it to iohub-fld-topic topic name.

Create new subscription

Configure Pub/Sub for outbound communication

To send messages from the Google Pub/Sub service to IOhub™, you need to create a new topic for commands, together with the subscription referenced by the PUBSUB_SUBSCRIPTION environment variable in the IOhub™ configuration. For example, you can use the name iohub-cmd-topic for the topic.

Create new topic

Now, you need to create a new subscription and bind it to the topic created above. For example, we can use the iohub-cmd-sub name and bind it to iohub-cmd-topic topic name.

Create new subscription

You have now created the topics and subscriptions needed to send data to and receive data from IOhub™.
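Google Cloud tools and client libraries usually refer to topics and subscriptions by their fully qualified resource names. As a small sketch, the helpers below build those names for the example resources used on this page. The project id my-project is a placeholder, and whether the container expects the short name or the full path in PUBSUB_TOPIC and PUBSUB_SUBSCRIPTION is not stated here.

```python
def topic_path(project_id: str, topic: str) -> str:
    """Return the fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic}"


def subscription_path(project_id: str, subscription: str) -> str:
    """Return the fully qualified Pub/Sub subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription}"


PROJECT_ID = "my-project"  # placeholder, replace with your own project id

RESOURCES = {
    "field_topic": topic_path(PROJECT_ID, "iohub-fld-topic"),
    "field_subscription": subscription_path(PROJECT_ID, "iohub-fld-sub"),
    "command_topic": topic_path(PROJECT_ID, "iohub-cmd-topic"),
    "command_subscription": subscription_path(PROJECT_ID, "iohub-cmd-sub"),
}
```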

Please note, you do not need to open any ports or change any network configuration to communicate from the cloud to your machine!

Pub/Sub public message format

The message is sent to PUBSUB_TOPIC in the following format:

{
    "protocol": "<protocol>",
    "measurement": "<measurement>",
    "value": <value>,
    "ts": <timestamp>
}

e.g.

{
    "protocol": "modbus",
    "measurement": "temp1",
    "value": 1240,
    "ts": 1602887221913
}
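On the consuming side, a message in this format can be decoded with a few lines of Python. This is a minimal sketch: FieldMessage and parse_field_message are hypothetical names, and the conversion in timestamp assumes that ts is a Unix epoch expressed in milliseconds, which the example value suggests but this page does not state.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class FieldMessage:
    protocol: str
    measurement: str
    value: Any
    ts: int

    @property
    def timestamp(self) -> datetime:
        # Assumption: ts is a Unix epoch in milliseconds.
        return datetime.fromtimestamp(self.ts / 1000, tz=timezone.utc)


def parse_field_message(data: bytes) -> FieldMessage:
    """Decode the JSON body of a message published to PUBSUB_TOPIC."""
    body = json.loads(data)
    # Extra fields (for example metadata) are ignored by this sketch.
    return FieldMessage(
        protocol=body["protocol"],
        measurement=body["measurement"],
        value=body["value"],
        ts=body["ts"],
    )
```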

Embedded Metadata

If the environment variable METADATA is defined, its content (a JSON string) is sent together with the payload, under the metadata field.

In the example below, the METADATA value is:

{
    "company": "ACME",
    "machine_name": "cutter 23",
    "serial_number": "1234567890"
}

and the message sent will be:

{
    "protocol": "modbus",
    "measurement": "temp1",
    "value": 1240,
    "ts": 1602887221913,
    "metadata": {
        "company": "ACME",
        "machine_name": "cutter 23",
        "serial_number": "1234567890"
    }
}
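The effect of METADATA on the payload can be sketched as follows. This only illustrates the behavior described above and is not the container's actual code; with_embedded_metadata is a hypothetical name.

```python
import json
import os
from typing import Mapping


def with_embedded_metadata(payload: dict, environ: Mapping[str, str] = os.environ) -> dict:
    """Return a copy of payload with the parsed METADATA value attached."""
    raw = environ.get("METADATA")
    if not raw:
        # METADATA not set: the payload is sent unchanged.
        return dict(payload)
    # METADATA holds a JSON string; it is parsed and nested under "metadata".
    return {**payload, "metadata": json.loads(raw)}
```

Passing a plain dictionary as environ makes the function easy to try without touching the real environment.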

Detached Metadata

To reduce traffic, you can avoid sending the metadata along with every message. The detached metadata is sent once, on container startup, to a specific topic. Each payload message then contains an id that allows the consumer to map the payload to the metadata.

If USE_METADATA_CHANNEL is true, the following payload will be sent to the topic METADATA_TOPIC:

{
    "id": "<the value of METADATA_ID>",
    "metadata": "<the value of METADATA_FOR_CHANNEL>"
}

On each write, the standard payload is augmented with metadataId (set to the value of the environment variable METADATA_ID), resulting in a payload like the following:

{
    "protocol": "modbus",
    "measurement": "temp1",
    "value": 1240,
    "ts": 1602887221913,
    "metadataId": <the value of METADATA_ID>
}

Using the metadataId added to each payload, together with the standard fields (protocol, measurement), you can retrieve any information you need while post-processing your data. The metadata can have any structure you like: whatever you write in it is forwarded unchanged, so you can use it to look up incoming messages. For example:

{
    "id": 123456,
    "metadata": {
        "global": {
            "machineName": "Cutter 123",
            "serialNumber": "A12345"
        },
        "measurements": {
            "modbus": {
                "temp1": {
                    "sensorType": "Temperature",
                    "sensorModel": "PT100",
                    "description": "Outlet flange"
                },
                "temp2": {
                    "sensorType": "Temperature",
                    "sensorModel": "J",
                    "description": "Inlet flange",
                    "unitOfM": "C"
                },
                "inPressure": {
                    "sensorType": "Pressure transducer",
                    "sensorModel": "Gefran ME 150",
                    "description": "Inlet flange"
                }
            },
            "opcua": {
                "mainplc.outkg": {
                    "description": "output kilograms"
                },
                "mainplc.humidity": {
                    "sensorType": "Humidity sensor",
                    "description": "Pellets moisture"
                }
            }
        }
    }
}
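A consumer could resolve the metadata for an incoming message along these lines. The sketch assumes the metadata follows the example structure above (global, then measurements by protocol and measurement name), which is only one possible layout. The names register_metadata and lookup_measurement_metadata are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def register_metadata(registry: Dict[Any, dict], payload: dict) -> None:
    """Store a payload received on METADATA_TOPIC, keyed by its id."""
    metadata = payload["metadata"]
    if isinstance(metadata, str):
        # The metadata may arrive as a JSON string; decode it if so.
        metadata = json.loads(metadata)
    registry[payload["id"]] = metadata


def lookup_measurement_metadata(registry: Dict[Any, dict], message: dict) -> Optional[dict]:
    """Return the global and per-measurement metadata for a data message."""
    entry = registry.get(message.get("metadataId"))
    if entry is None:
        return None
    by_protocol = entry.get("measurements", {}).get(message["protocol"], {})
    return {
        "global": entry.get("global", {}),
        "measurement": by_protocol.get(message["measurement"], {}),
    }
```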

Please note that the embedded mechanism keeps working as before: the content of the METADATA environment variable is added to the payload sent to Pub/Sub. If you do not want embedded metadata, leave the METADATA environment variable unset and nothing is added. The two mechanisms can be used in parallel, in which case you end up with a payload like the one below (with both metadataId and metadata populated):

{
    "protocol": "modbus",
    "measurement": "temp1",
    "value": 1240,
    "ts": 1602887221913,
    "metadataId": 123456,
    "metadata": {
        "company": "ACME",
        "machine_name": "cutter 23",
        "serial_number": "1234567890"
    }
}

Buffer and Batch mode

You can enable buffering with the MQTT_WANT_QOS2 environment variable.

By default, messages are sent to the Google Pub/Sub server individually. If you have a lot of messages to send, you can group them using the BATCH_MODE functionality.

Pub/Sub subscription message format

Each message published to the topic behind the subscription PUBSUB_SUBSCRIPTION is processed. Messages must be in JSON format.

The expected JSON format is:

{
   "protocol": "<protocol>",
   "address": "<address>",
   "value": <value>
}

e.g.

{
   "protocol": "modbus",
   "address": "address80",
   "value": 123.45
}
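A command body in this format can be produced as shown in the sketch below. build_command is a hypothetical helper; it only builds the JSON bytes to publish, and leaves the actual publishing to whichever Pub/Sub client you use.

```python
import json
from typing import Any


def build_command(protocol: str, address: str, value: Any) -> bytes:
    """Return the UTF-8 encoded JSON body of a write command."""
    if not protocol or not address:
        raise ValueError("protocol and address must be non-empty")
    command = {"protocol": protocol, "address": address, "value": value}
    return json.dumps(command).encode("utf-8")
```

For instance, build_command("modbus", "address80", 123.45) yields the bytes of the example message above.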

You can develop your own application using Pub/Sub to store data from the field and to send commands to the field. For example, you can create an application that shows the temperature in real time and changes the setpoint of the machine.

Troubleshooting

Check incoming data on the Pub/Sub web interface

You can check incoming data from the Google Pub/Sub web interface. Click the topic name (iohub-fld-topic in the example), then the VIEW MESSAGES button, and select the subscription. Click PULL to see the incoming messages.

Check incoming data for test on your console

To read the incoming messages on a Google Pub/Sub subscription, you can use our subscriber logger.

Send a command for test

You can send a command from the Google Pub/Sub web interface to test communication from Pub/Sub to IOhub™. Click the topic name (iohub-cmd-topic in the example), then the + PUBLISH MESSAGE button, and click Publish single message. In Message body, paste the JSON with your data and click the PUBLISH button.

Environment variables

When you start the container, you can adjust the instance's configuration by passing one or more environment variables to the docker run command.

MQTT connection

  • MQTT_HOST: MQTT host. Defaults to 127.0.0.1
  • MQTT_PORT: MQTT port. Defaults to 1883
  • MQTT_IN_TOPIC: IOhub™ MQTT topic to subscribe to (e.g. fld/modbus/r/#). Defaults to fld/+/r/#. To subscribe to multiple topics, define each topic on a separate line.
  • MQTT_WANT_QOS2: ask MQTT to upgrade each incoming message on MQTT_IN_TOPIC to QoS 2 (delivery guaranteed). MQTT will honor the request only if persistence is enabled.
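To check which MQTT topics a given MQTT_IN_TOPIC value would cover, the standard MQTT wildcard rules can be sketched as follows: + matches exactly one level and # matches all remaining levels. This is a simplified illustration (it ignores the special handling of topics starting with $) and is not the container's code; the function names are hypothetical.

```python
from typing import List


def parse_topic_filters(raw: str) -> List[str]:
    """Split a multi-line MQTT_IN_TOPIC value into individual filters."""
    return [line.strip() for line in raw.splitlines() if line.strip()]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic is covered by topic_filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: covers all remaining levels, even none.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)
```

With the default filter fld/+/r/#, the topic fld/modbus/r/temp1 matches, while fld/modbus/w/temp1 does not.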

PubSub connection

  • PUBSUB_AUTHENTICATION (Required): content of the JSON key file of the Google Pub/Sub service account.
  • PUBSUB_TOPIC: Pub/Sub topic. Destination of messages read from MQTT_IN_TOPIC. If absent, messages are not sent to Pub/Sub.
  • PUBSUB_SUBSCRIPTION: Pub/Sub subscription. Messages read from PUBSUB_SUBSCRIPTION are interpreted and sent to the broker at MQTT_HOST:MQTT_PORT, on the topic indicated by the message. If absent, no subscription is activated.

Embedded Metadata

  • METADATA: JSON string. If present, it is added to the standard payload sent to Pub/Sub under the field metadata. Can be any valid JSON type (number, string, boolean, object, array, ...).

Detached Metadata

  • USE_METADATA_CHANNEL: if true, detached metadata is enabled. Defaults to false.
  • METADATA_FOR_CHANNEL: the content of the metadata property, in the payload sent to METADATA_TOPIC. Ignored if USE_METADATA_CHANNEL is not true.
  • METADATA_ID: the metadata instance id. It should be different in each iohub application. Ignored if USE_METADATA_CHANNEL is not true.
  • METADATA_TOPIC: the Pub/Sub topic that receives the metadata. Ignored if USE_METADATA_CHANNEL is not true.

Batch mode

  • BATCH_MODE: If true, messages are sent grouped in batches. Defaults to false.
  • BATCH_MAX_MSG_SIZE: Maximum number of messages to buffer before sending a payload. Used only if BATCH_MODE is true. Defaults to 100.
  • BATCH_MAX_BYTE_SIZE: Maximum number of bytes to buffer before sending a payload. Used only if BATCH_MODE is true. Defaults to 1 * 1024 * 1024 (1 MiB).
  • BATCH_MAX_WAIT: Maximum time, in milliseconds, to wait before sending a payload. Used only if BATCH_MODE is true. Defaults to 10000.
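The three batch limits interact as "whichever is reached first". The sketch below illustrates that logic with a small in-memory buffer that uses the defaults listed above. It is only a model under that assumption: how the container batches internally, and what a batched payload looks like on Pub/Sub, is not documented on this page. BatchBuffer and its methods are hypothetical names.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Collect messages until a count, size, or age limit is reached."""

    def __init__(self, max_messages: int = 100, max_bytes: int = 1024 * 1024,
                 max_wait_ms: int = 10000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self.max_wait_ms = max_wait_ms
        self._clock = clock
        self._messages: List[bytes] = []
        self._bytes = 0
        self._first_at: Optional[float] = None

    def add(self, message: bytes) -> Optional[List[bytes]]:
        """Buffer a message; return the batch if a count or size limit is hit."""
        if self._first_at is None:
            self._first_at = self._clock()
        self._messages.append(message)
        self._bytes += len(message)
        if len(self._messages) >= self.max_messages or self._bytes >= self.max_bytes:
            return self.flush()
        return None

    def poll(self) -> Optional[List[bytes]]:
        """Return the pending batch if its oldest message waited too long."""
        if self._first_at is None:
            return None
        waited_ms = (self._clock() - self._first_at) * 1000
        return self.flush() if waited_ms >= self.max_wait_ms else None

    def flush(self) -> List[bytes]:
        """Return all buffered messages and reset the buffer."""
        batch, self._messages = self._messages, []
        self._bytes = 0
        self._first_at = None
        return batch
```

A caller would invoke add for every incoming message and poll periodically, publishing whatever batch either call returns.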

Other variables

  • SKIP_RETAIN: If true, discard all incoming MQTT messages with retain flag set to true. Defaults to false.

With SKIP_RETAIN set to false, each client that subscribes to a topic pattern receives the retained message immediately after they subscribe.

The broker stores only one retained message per topic.

Docker container details

Image: us-central1-docker.pkg.dev/ez-shared/iohub/iohub-mqtt-googlepubsub

Supported architecture: amd64
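As a rough illustration of passing the environment variables described above to docker run, the sketch below assembles the command line from a dictionary. The values shown (broker address, key placeholder) are hypothetical, no image tag is given because this page does not list one, and docker_run_command is a hypothetical helper rather than part of the product.

```python
import shlex
from typing import Dict, List

IMAGE = "us-central1-docker.pkg.dev/ez-shared/iohub/iohub-mqtt-googlepubsub"


def docker_run_command(env: Dict[str, str], image: str = IMAGE) -> List[str]:
    """Build a 'docker run' argument list with one -e option per variable."""
    argv = ["docker", "run", "-d"]
    for name, value in env.items():
        argv += ["-e", f"{name}={value}"]
    argv.append(image)
    return argv


example_env = {
    "MQTT_HOST": "10.0.0.5",  # hypothetical broker address
    "PUBSUB_AUTHENTICATION": "<content of the JSON key file>",  # placeholder
    "PUBSUB_TOPIC": "iohub-fld-topic",
    "PUBSUB_SUBSCRIPTION": "iohub-cmd-sub",
}

# shlex.join quotes each argument so the line can be pasted into a shell.
command_line = shlex.join(docker_run_command(example_env))
```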

Changelog

v1.0.5
  • Smaller image
v1.0.4
  • USE_METADATA_CHANNEL Environment variable added
  • METADATA_FOR_CHANNEL Environment variable added
  • METADATA_ID Environment variable added
  • METADATA_TOPIC Environment variable added
v1.0.3
  • MQTT_IN_TOPIC accepts multiple topics
v1.0.2
  • MQTT_WANT_QOS2 Environment variable added
  • Batch mode added
  • SKIP_RETAIN Environment variable added
v1.0.1
  • METADATA Environment Variable added
v1.0.0
  • First Release