Module meerschaum.connectors.mqtt.MQTTConnector

Implement the Meerschaum Connector to connect to MQTT brokers.

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Implement the Meerschaum Connector to connect to MQTT brokers.
"""

from meerschaum.connectors.Connector import Connector

class MQTTConnector(Connector):

    from ._subscribe import subscribe
    from ._fetch import fetch

    def __init__(
        self,
        label : str = 'main',
        debug : bool = False,
        **kw
    ):
        super().__init__('mqtt', label=label, **kw)

        self.verify_attributes({'host'})

        ### default port for MQTT is 1883
        if 'port' not in self.__dict__:
            self.port = 1883

        if 'keepalive' not in self.__dict__:
            self.keepalive = 60

        from meerschaum.utils.packages import attempt_import
        mqtt = attempt_import('paho.mqtt.client')

        ### default: 'tcp'. Can also be 'websockets'
        transport = 'tcp'
        if 'transport' in self.__dict__:
            transport = self.transport

        ### tell the broker to delete client information on disconnect
        clean_session = True
        if 'clean_session' in self.__dict__:
            clean_session = self.clean_session

        self.client = mqtt.Client(
            clean_session = clean_session,
            transport = transport,
        )

        ### if username and password provided, pass to client
        if 'username' in self.__dict__ and 'password' in self.__dict__:
            self.client.username_pw_set(username=self.username, password=self.password)

        ### keep a record of the last messages per topic in case we want to omit duplicate values
        self._last_msgs = dict()

    def __del__(self):
        self.client.disconnect()

Classes

class MQTTConnector (label: str = 'main', debug: bool = False, **kw)

The base connector class to hold connection attributes,

Parameters

type : str
The type of the connection. Used as a key in config.yaml to get attributes. Supported values are 'sql', 'api', 'mqtt', 'plugin'.
label : str
The label for the connection. Used as a key within config.yaml
pandas : str
Custom pandas implementation name. E.g. May change to modin.pandas. NOTE: This is experimental!

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
Expand source code
class MQTTConnector(Connector):

    from ._subscribe import subscribe
    from ._fetch import fetch

    def __init__(
        self,
        label : str = 'main',
        debug : bool = False,
        **kw
    ):
        super().__init__('mqtt', label=label, **kw)

        self.verify_attributes({'host'})

        ### default port for MQTT is 1883
        if 'port' not in self.__dict__:
            self.port = 1883

        if 'keepalive' not in self.__dict__:
            self.keepalive = 60

        from meerschaum.utils.packages import attempt_import
        mqtt = attempt_import('paho.mqtt.client')

        ### default: 'tcp'. Can also be 'websockets'
        transport = 'tcp'
        if 'transport' in self.__dict__:
            transport = self.transport

        ### tell the broker to delete client information on disconnect
        clean_session = True
        if 'clean_session' in self.__dict__:
            clean_session = self.clean_session

        self.client = mqtt.Client(
            clean_session = clean_session,
            transport = transport,
        )

        ### if username and password provided, pass to client
        if 'username' in self.__dict__ and 'password' in self.__dict__:
            self.client.username_pw_set(username=self.username, password=self.password)

        ### keep a record of the last messages per topic in case we want to omit duplicate values
        self._last_msgs = dict()

    def __del__(self):
        self.client.disconnect()

Ancestors

  • meerschaum.connectors.Connector.Connector

Methods

def fetch(self, pipe: Pipe, callback: function = None, debug: bool = False, **kw) ‑> None

Subscribe to a topic, parse the JSON when messages come in, and send data to Pipe.

Unlike other fetch functions, MQTT fetch depends on callbacks and calls pipe.sync() directly, rather than being a subroutine like SQL or API.

Parameters

pipe : 'meerschaum.Pipe' :
 
callback : 'function' :
(Default value = None)
debug : bool :
(Default value = False)

**kw :

Returns

Expand source code
def fetch(
        self,
        pipe : 'meerschaum.Pipe',
        callback : 'function' = None,
        debug : bool = False,
        **kw
    ) -> 'None':
    """Subscribe to a topic, parse the JSON when messages come in, and send data to Pipe.
    
    Unlike other fetch functions, MQTT fetch depends on callbacks and calls pipe.sync() directly,
    rather than being a subroutine like SQL or API.

    Parameters
    ----------
    pipe : 'meerschaum.Pipe' :
        
    callback : 'function' :
         (Default value = None)
    debug : bool :
         (Default value = False)
    **kw :
        

    Returns
    -------

    """
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.debug import dprint

    if 'fetch' not in pipe.parameters:
        warn(f"Parameters for pipe {pipe} must include \"fetch\".")
        return None
    
    instructions = pipe.parameters.get('fetch', {})

    topic = instructions.get('topic', None)
    if topic is None:
        warn(f"Missing topic from parameters for pipe {pipe}. Defaulting to \"#\" (all possible topics!).")
        topic = '#'

    ### default: only include values that have changed
    skip_duplicates = True
    if 'skip_duplicates' in instructions:
        skip_duplicates = instructions['skip_duplicates']

    ### callback is executed each time a message is published
    def _fetch_callback(msg : str):
        from meerschaum.utils.packages import import_pandas
        from meerschaum.utils.misc import parse_df_datetimes, df_from_literal
        pd = import_pandas()

        df = None
        ### first, try to parse JSON
        try:
            df = parse_df_datetimes(pd.read_json(msg))
        except Exception as e:
            pass

        ### if parsing JSON fails, see if we can parse it literally
        if df is None:
            df = df_from_literal(pipe, msg, debug=debug)

        if debug:
            dprint(f"{df}")
        pipe.sync(df, debug=debug)

    ### optional: user may override callback
    if callback is None:
        callback = _fetch_callback

    ### subscribe to the Pipe's topic
    self.subscribe(
        topic,
        callback,
        skip_duplicates = skip_duplicates,
        debug = debug
    )
def subscribe(self, topic: str = '#', callback: "'function'" = None, skip_duplicates=True, forever: bool = False, debug: bool = False, **kw)

Subscribe to an MQTT topic and execute a callback function.

topic : str : '#' MQTT topic to subscribe to. Default is all available topics. (WARNING: this may have unexpected results!)

callback : function Callback function must take only one string parameter.

skip_duplicates : bool : True If True, only execute the callback function if the message value is different from the last one received.

forever : bool : False If forever is True, block the main thread in a loop. Otherwise spin up a new thread for the duration of the main thread.

Parameters

topic : str :
(Default value = '#')
callback : 'function' :
(Default value = None)
skip_duplicates :
(Default value = True)
forever : bool :
(Default value = False)
debug : bool :
(Default value = False)

**kw :

Returns

Expand source code
def subscribe(
        self,
        topic : str = '#',
        callback : 'function' = None,
        skip_duplicates = True,
        forever : bool = False,
        debug : bool = False,
        **kw
    ):
    """Subscribe to an MQTT topic and execute a callback function.
    
    topic : str : '#'
        MQTT topic to subscribe to. Default is all available topics.
        (WARNING: this may have unexpected results!)
    
    callback : function
        Callback function must take only one string parameter.
    
    skip_duplicates : bool : True
        If True, only execute the callback function if the message value is different from the last
        one received.
    
    forever : bool : False
        If `forever` is True, block the main thread in a loop. Otherwise spin up a new thread
        for the duration of the main thread.

    Parameters
    ----------
    topic : str :
         (Default value = '#')
    callback : 'function' :
         (Default value = None)
    skip_duplicates :
         (Default value = True)
    forever : bool :
         (Default value = False)
    debug : bool :
         (Default value = False)
    **kw :
        

    Returns
    -------

    """
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    
    ### default callback action: debug print message
    def default_callback(msg : str):
        dprint(msg)

    if callback is None:
        callback = default_callback

    ### decode the payload and execute the callback function with the string as the only parameter
    def _parse_message(client, userdata, message):
        """Parse the payload (assuming it's a UTF-8 string. May add options to this later)
        and if skip_duplicates is True, check for a change in the payload.

        Parameters
        ----------
        client :
            
        userdata :
            
        message :
            

        Returns
        -------

        """
        execute_callback = True
        if skip_duplicates:
            ### check if the current message is different from the last
            if message.topic not in self._last_msgs:
                execute_callback = True
            else:
                execute_callback = (self._last_msgs[message.topic] != message.payload)
        self._last_msgs[message.topic] = message.payload
        if execute_callback:
            return callback(message.payload.decode('utf-8'))
        dprint("Message on topic " + f'"{topic}"' + " has not changed since the last message. Skipping...")

    def _subscribe_on_connect(client, userdata, flags, rc):
        """Subscribe to the topic when connecting (resubscribes in case of disconnect).

        Parameters
        ----------
        client :
            
        userdata :
            
        flags :
            
        rc :
            

        Returns
        -------

        """
        if rc > 0:
            warn(f"Received return code {rc} from '{self.host}' on topic '{topic}'.")
            if rc == 5:
                warn(f"Are the credentials for '{self}' correct?", stack=False)
        if debug:
            dprint("Subscribed to " + f'"{topic}"' + ". Starting network loop...")
        client.subscribe(topic)

    self.client.on_message = _parse_message
    self.client.on_connect = _subscribe_on_connect

    try:
        self.client.connect(self.host, self.port, self.keepalive)
    except Exception as e:
        error(
            "Failed to connect to MQTT broker " +
            '"{self.host}"' + " with connector: " + f"{self}"
        )

    ### start a new thread that fires callback when messages are received
    if not forever:
        self.client.loop_start()
    else:
        self.client.loop_forever()