Module meerschaum.connectors.mqtt.MQTTConnector
Implement the Meerschaum Connector to connect to MQTT brokers.
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Implement the Meerschaum Connector to connect to MQTT brokers.
"""
from meerschaum.connectors.Connector import Connector
class MQTTConnector(Connector):
    """Meerschaum connector for MQTT brokers.

    The connector builds a `paho.mqtt.client` `Client` from its connection
    attributes. `host` is checked with `verify_attributes`; `port` falls back
    to 1883 and `keepalive` to 60 when they are not set. The optional
    attributes `transport`, `clean_session`, `username` and `password` are
    forwarded to the client when present.
    """

    from ._subscribe import subscribe
    from ._fetch import fetch

    def __init__(
            self,
            label : str = 'main',
            debug : bool = False,
            **kw
        ):
        """Read the connection attributes and build the MQTT client.

        Parameters
        ----------
        label : str :
            Label of the connection, forwarded to the base connector.
            (Default value = 'main')
        debug : bool :
            Accepted for interface compatibility; not used by this constructor.
            (Default value = False)
        **kw :
            Extra keyword arguments forwarded to the base connector.
        """
        super().__init__('mqtt', label=label, **kw)

        self.verify_attributes({'host'})

        ### default port for MQTT is 1883
        if 'port' not in self.__dict__:
            self.port = 1883

        if 'keepalive' not in self.__dict__:
            self.keepalive = 60

        from meerschaum.utils.packages import attempt_import
        mqtt = attempt_import('paho.mqtt.client')

        ### default: 'tcp'. Can also be 'websockets'
        transport = self.__dict__.get('transport', 'tcp')

        ### tell the broker to delete client information on disconnect
        clean_session = self.__dict__.get('clean_session', True)

        self.client = mqtt.Client(
            clean_session = clean_session,
            transport = transport,
        )

        ### if username and password provided, pass to client
        if 'username' in self.__dict__ and 'password' in self.__dict__:
            self.client.username_pw_set(username=self.username, password=self.password)

        ### keep a record of the last messages per topic in case we want to omit duplicate values
        self._last_msgs = dict()

    def __del__(self):
        """Disconnect the client when the connector is garbage-collected."""
        ### `__init__` may have raised before `self.client` was assigned
        ### (e.g. a failed attribute check or import); `__del__` still runs
        ### on the half-built object, so only disconnect if a client exists.
        client = self.__dict__.get('client')
        if client is not None:
            client.disconnect()
Classes
class MQTTConnector (label: str = 'main', debug: bool = False, **kw)
-
The base connector class to hold connection attributes.
Parameters
type
:str
- The type of the connection. Used as a key in config.yaml to get attributes. Supported values are 'sql', 'api', 'mqtt', 'plugin'.
label
:str
- The label for the connection. Used as a key within config.yaml
pandas
:str
- Custom pandas implementation name. E.g. May change to modin.pandas. NOTE: This is experimental!
Run `mrsm edit config` to edit connectors in the YAML file:
meerschaum:
  connections:
    {type}:
      {label}:
        ### attributes go here
Expand source code
class MQTTConnector(Connector):
    """Meerschaum connector for MQTT brokers.

    The connector builds a `paho.mqtt.client` `Client` from its connection
    attributes. `host` is checked with `verify_attributes`; `port` falls back
    to 1883 and `keepalive` to 60 when they are not set. The optional
    attributes `transport`, `clean_session`, `username` and `password` are
    forwarded to the client when present.
    """

    from ._subscribe import subscribe
    from ._fetch import fetch

    def __init__(
            self,
            label : str = 'main',
            debug : bool = False,
            **kw
        ):
        """Read the connection attributes and build the MQTT client.

        Parameters
        ----------
        label : str :
            Label of the connection, forwarded to the base connector.
            (Default value = 'main')
        debug : bool :
            Accepted for interface compatibility; not used by this constructor.
            (Default value = False)
        **kw :
            Extra keyword arguments forwarded to the base connector.
        """
        super().__init__('mqtt', label=label, **kw)

        self.verify_attributes({'host'})

        ### default port for MQTT is 1883
        if 'port' not in self.__dict__:
            self.port = 1883

        if 'keepalive' not in self.__dict__:
            self.keepalive = 60

        from meerschaum.utils.packages import attempt_import
        mqtt = attempt_import('paho.mqtt.client')

        ### default: 'tcp'. Can also be 'websockets'
        transport = self.__dict__.get('transport', 'tcp')

        ### tell the broker to delete client information on disconnect
        clean_session = self.__dict__.get('clean_session', True)

        self.client = mqtt.Client(
            clean_session = clean_session,
            transport = transport,
        )

        ### if username and password provided, pass to client
        if 'username' in self.__dict__ and 'password' in self.__dict__:
            self.client.username_pw_set(username=self.username, password=self.password)

        ### keep a record of the last messages per topic in case we want to omit duplicate values
        self._last_msgs = dict()

    def __del__(self):
        """Disconnect the client when the connector is garbage-collected."""
        ### `__init__` may have raised before `self.client` was assigned
        ### (e.g. a failed attribute check or import); `__del__` still runs
        ### on the half-built object, so only disconnect if a client exists.
        client = self.__dict__.get('client')
        if client is not None:
            client.disconnect()
Ancestors
- meerschaum.connectors.Connector.Connector
Methods
def fetch(self, pipe: Pipe, callback: function = None, debug: bool = False, **kw) ‑> None
-
Subscribe to a topic, parse the JSON when messages come in, and send data to Pipe.
Unlike other fetch functions, MQTT fetch depends on callbacks and calls pipe.sync() directly, rather than being a subroutine like SQL or API.
Parameters
pipe
:'meerschaum.Pipe' :
callback
:'function' :
- (Default value = None)
debug
:bool :
- (Default value = False)
**kw :
Returns
Expand source code
def fetch(
        self,
        pipe : 'meerschaum.Pipe',
        callback : 'function' = None,
        debug : bool = False,
        **kw
    ) -> 'None':
    """Subscribe to the pipe's MQTT topic and sync each incoming message.

    The topic and the `skip_duplicates` flag are read from the pipe's
    `fetch` parameters. Rather than returning data, this method registers a
    callback through `self.subscribe`; the built-in callback parses each
    message into a DataFrame and calls `pipe.sync()` itself.

    Parameters
    ----------
    pipe : 'meerschaum.Pipe' :
        Pipe whose `parameters['fetch']` holds the subscription settings.
        If the `fetch` key is missing, a warning is issued and nothing
        is subscribed.
    callback : 'function' :
        Optional replacement for the built-in callback; it receives the
        message as a string.
        (Default value = None)
    debug : bool :
        Verbosity toggle, forwarded to the helpers and to `self.subscribe`.
        (Default value = False)
    **kw :
        Accepted but not used here.

    Returns
    -------
    None

    """
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.debug import dprint

    ### nothing to do without fetch instructions
    if 'fetch' not in pipe.parameters:
        warn(f"Parameters for pipe {pipe} must include \"fetch\".")
        return None

    fetch_params = pipe.parameters.get('fetch', {})

    ### fall back to the wildcard topic when none is configured
    topic = fetch_params.get('topic', None)
    if topic is None:
        warn(f"Missing topic from parameters for pipe {pipe}. Defaulting to \"#\" (all possible topics!).")
        topic = '#'

    ### default: only include values that have changed
    skip_duplicates = fetch_params.get('skip_duplicates', True)

    def _sync_message(msg : str):
        """Turn one message into a DataFrame and sync it to the pipe."""
        from meerschaum.utils.packages import import_pandas
        from meerschaum.utils.misc import parse_df_datetimes, df_from_literal
        pd = import_pandas()

        ### first attempt: treat the message as JSON
        frame = None
        try:
            frame = parse_df_datetimes(pd.read_json(msg))
        except Exception:
            ### best effort only; the literal parser below is the fallback
            pass

        ### second attempt: parse the message as a literal value
        if frame is None:
            frame = df_from_literal(pipe, msg, debug=debug)

        if debug:
            dprint(f"{frame}")

        pipe.sync(frame, debug=debug)

    ### the caller's callback, when given, replaces the built-in one
    on_message = _sync_message if callback is None else callback

    ### subscribe to the Pipe's topic
    self.subscribe(
        topic,
        on_message,
        skip_duplicates = skip_duplicates,
        debug = debug
    )
def subscribe(self, topic: str = '#', callback: "'function'" = None, skip_duplicates=True, forever: bool = False, debug: bool = False, **kw)
-
Subscribe to an MQTT topic and execute a callback function.
topic : str : '#' MQTT topic to subscribe to. Default is all available topics. (WARNING: this may have unexpected results!)
callback : function Callback function must take only one string parameter.
skip_duplicates : bool : True If True, only execute the callback function if the message value is different from the last one received.
forever : bool : False If `forever` is True, block the main thread in a loop. Otherwise spin up a new thread for the duration of the main thread.
Parameters
topic
:str :
- (Default value = '#')
callback
:'function' :
- (Default value = None)
- skip_duplicates :
- (Default value = True)
forever
:bool :
- (Default value = False)
debug
:bool :
- (Default value = False)
**kw :
Returns
Expand source code
def subscribe(
        self,
        topic : str = '#',
        callback : 'function' = None,
        skip_duplicates = True,
        forever : bool = False,
        debug : bool = False,
        **kw
    ):
    """Subscribe to an MQTT topic and execute a callback function.

    Parameters
    ----------
    topic : str :
        MQTT topic to subscribe to. The default subscribes to all available
        topics. (WARNING: this may have unexpected results!)
        (Default value = '#')
    callback : 'function' :
        Callback function; it must take only one string parameter (the
        UTF-8 decoded payload). When None, messages are printed with `dprint`.
        (Default value = None)
    skip_duplicates :
        If True, only execute the callback function if the message value
        is different from the last one received on the same topic.
        (Default value = True)
    forever : bool :
        If True, run the client's network loop with `loop_forever()`;
        otherwise start it with `loop_start()`.
        (Default value = False)
    debug : bool :
        Verbosity toggle.
        (Default value = False)
    **kw :
        Accepted but not used here.

    Returns
    -------
    None

    """
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    ### default callback action: debug print message
    def default_callback(msg : str):
        dprint(msg)

    if callback is None:
        callback = default_callback

    ### decode the payload and execute the callback function with the string as the only parameter
    def _parse_message(client, userdata, message):
        """Decode the payload as UTF-8 and hand it to the callback.

        When `skip_duplicates` is True, the callback is skipped if the
        payload equals the last payload recorded for the message's topic.
        """
        execute_callback = True
        if skip_duplicates:
            ### check if the current message is different from the last
            if message.topic not in self._last_msgs:
                execute_callback = True
            else:
                execute_callback = (self._last_msgs[message.topic] != message.payload)
        self._last_msgs[message.topic] = message.payload
        if execute_callback:
            return callback(message.payload.decode('utf-8'))
        dprint("Message on topic " + f'"{topic}"' + " has not changed since the last message. Skipping...")

    def _subscribe_on_connect(client, userdata, flags, rc):
        """Subscribe to the topic when connecting (resubscribes in case of disconnect)."""
        if rc > 0:
            warn(f"Received return code {rc} from '{self.host}' on topic '{topic}'.")
            if rc == 5:
                warn(f"Are the credentials for '{self}' correct?", stack=False)
        if debug:
            dprint("Subscribed to " + f'"{topic}"' + ". Starting network loop...")
        client.subscribe(topic)

    self.client.on_message = _parse_message
    self.client.on_connect = _subscribe_on_connect

    try:
        self.client.connect(self.host, self.port, self.keepalive)
    except Exception:
        ### the host must be interpolated: the original literal lacked the
        ### `f` prefix and printed the text "{self.host}" verbatim
        error(
            "Failed to connect to MQTT broker " + f'"{self.host}"' +
            " with connector: " + f"{self}"
        )

    ### start a new thread that fires callback when messages are received
    if not forever:
        self.client.loop_start()
    else:
        self.client.loop_forever()