meerschaum.utils

The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6The utils module contains utility functions.
 7These include tools from primary utilities (get_pipes)
 8to miscellaneous helper functions.
 9"""
10
11__all__ = (
12    'daemon',
13    'dataframe',
14    'debug',
15    'dtypes',
16    'formatting',
17    'interactive',
18    'misc',
19    'networking',
20    'packages',
21    'pool',
22    'process',
23    'prompt',
24    'schedule',
25    'sql',
26    'threading',
27    'typing',
28    'venv',
29    'warnings',
30    'yaml',
31    "get_pipes",
32    "fetch_pipes_keys",
33)
34from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]], List[meerschaum.core.Pipe.Pipe]]:
 19def get_pipes(
 20        connector_keys: Union[str, List[str], None] = None,
 21        metric_keys: Union[str, List[str], None] = None,
 22        location_keys: Union[str, List[str], None] = None,
 23        tags: Optional[List[str]] = None,
 24        params: Optional[Dict[str, Any]] = None,
 25        mrsm_instance: Union[str, InstanceConnector, None] = None,
 26        instance: Union[str, InstanceConnector, None] = None,
 27        as_list: bool = False,
 28        method: str = 'registered',
 29        wait: bool = False,
 30        debug: bool = False,
 31        **kw: Any
 32    ) -> Union[PipesDict, List[mrsm.Pipe]]:
 33    """
 34    Return a dictionary or list of `meerschaum.Pipe` objects.
 35
 36    Parameters
 37    ----------
 38    connector_keys: Union[str, List[str], None], default None
 39        String or list of connector keys.
 40        If omitted or is `'*'`, fetch all possible keys.
 41        If a string begins with `'_'`, select keys that do NOT match the string.
 42
 43    metric_keys: Union[str, List[str], None], default None
 44        String or list of metric keys. See `connector_keys` for formatting.
 45
 46    location_keys: Union[str, List[str], None], default None
 47        String or list of location keys. See `connector_keys` for formatting.
 48
 49    tags: Optional[List[str]], default None
 50         If provided, only include pipes with these tags.
 51
 52    params: Optional[Dict[str, Any]], default None
 53        Dictionary of additional parameters to search by.
 54        Params are parsed into a SQL WHERE clause.
 55        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 56
 57    mrsm_instance: Union[str, InstanceConnector, None], default None
 58        Connector keys for the Meerschaum instance of the pipes.
 59        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 60        `meerschaum.connectors.api.APIConnector.APIConnector`.
 61        
 62    as_list: bool, default False
 63        If `True`, return pipes in a list instead of a hierarchical dictionary.
 64        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 65        `True`  : `[Pipe]`
 66
 67    method: str, default 'registered'
 68        Available options: `['registered', 'explicit', 'all']`
 69        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 70        (API or SQL connector, depends on mrsm_instance).
 71        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 72        instead of consulting the pipes table. Useful for creating non-existent pipes.
 73        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
 74        **NOTE:** Method `'all'` is not implemented!
 75
 76    wait: bool, default False
 77        Wait for a connection before getting Pipes. Should only be true for cases where the
 78        database might not be running (like the API).
 79
 80    **kw: Any:
 81        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 82        
 83
 84    Returns
 85    -------
 86    A dictionary of dictionaries and `meerschaum.Pipe` objects
 87    in the connector, metric, location hierarchy.
 88    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 89
 90    Examples
 91    --------
 92    ```
 93    >>> ### Manual definition:
 94    >>> pipes = {
 95    ...     <connector_keys>: {
 96    ...         <metric_key>: {
 97    ...             <location_key>: Pipe(
 98    ...                 <connector_keys>,
 99    ...                 <metric_key>,
100    ...                 <location_key>,
101    ...             ),
102    ...         },
103    ...     },
104    ... },
105    >>> ### Accessing a single pipe:
106    >>> pipes['sql:main']['weather'][None]
107    >>> ### Return a list instead:
108    >>> get_pipes(as_list=True)
109    [sql_main_weather]
110    >>> 
111    ```
112    """
113
114    from meerschaum.config import get_config
115    from meerschaum.utils.warnings import error
116    from meerschaum.utils.misc import filter_keywords
117
118    if connector_keys is None:
119        connector_keys = []
120    if metric_keys is None:
121        metric_keys = []
122    if location_keys is None:
123        location_keys = []
124    if params is None:
125        params = {}
126    if tags is None:
127        tags = []
128
129    if isinstance(connector_keys, str):
130        connector_keys = [connector_keys]
131    if isinstance(metric_keys, str):
132        metric_keys = [metric_keys]
133    if isinstance(location_keys, str):
134        location_keys = [location_keys]
135
136    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
137    ### If `wait`, wait until a connection is made
138    if mrsm_instance is None:
139        mrsm_instance = instance
140    if mrsm_instance is None:
141        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
142    if isinstance(mrsm_instance, str):
143        from meerschaum.connectors.parse import parse_instance_keys
144        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
145    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work
146        from meerschaum.connectors import instance_types
147        valid_connector = False
148        if hasattr(mrsm_instance, 'type'):
149            if mrsm_instance.type in instance_types:
150                valid_connector = True
151        if not valid_connector:
152            error(f"Invalid instance connector: {mrsm_instance}")
153        connector = mrsm_instance
154    if debug:
155        from meerschaum.utils.debug import dprint
156        dprint(f"Using instance connector: {connector}")
157    if not connector:
158        error(f"Could not create connector from keys: '{mrsm_instance}'")
159
160    ### Get a list of tuples for the keys needed to build pipes.
161    result = fetch_pipes_keys(
162        method,
163        connector,
164        connector_keys = connector_keys,
165        metric_keys = metric_keys,
166        location_keys = location_keys,
167        tags = tags,
168        params = params,
169        debug = debug
170    )
171    if result is None:
172        error(f"Unable to build pipes!")
173
174    ### Populate the `pipes` dictionary with Pipes based on the keys
175    ### obtained from the chosen `method`.
176    from meerschaum import Pipe
177    pipes = {}
178    for ck, mk, lk in result:
179        if ck not in pipes:
180            pipes[ck] = {}
181
182        if mk not in pipes[ck]:
183            pipes[ck][mk] = {}
184
185        pipes[ck][mk][lk] = Pipe(
186            ck, mk, lk,
187            mrsm_instance = connector,
188            debug = debug,
189            **filter_keywords(Pipe, **kw)
190        )
191
192    if not as_list:
193        return pipes
194    from meerschaum.utils.misc import flatten_pipes_dict
195    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Required connector_keys. NOTE: Method 'all' is not implemented!
  • wait (bool, default False): Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
  • **kw (Any:): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects
  • in the connector, metric, location hierarchy.
  • If as_list is True, return a list of meerschaum.Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
def fetch_pipes_keys( method: str, connector: meerschaum.connectors.Connector.Connector, **kw: Any) -> List[Tuple[str, str, str]]:
198def fetch_pipes_keys(
199        method: str,
200        connector: 'meerschaum.connectors.Connector',
201        **kw: Any
202    ) -> List[Tuple[str, str, str]]:
203    """
204    Fetch keys for pipes according to a method.
205
206    Parameters
207    ----------
208    method: str
209        The method by which to fetch the keys. See `get_pipes()` above.
210
211    connector: meerschaum.connectors.Connector
212        The connector to use to fetch the keys.
213        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
214        or `meerschaum.connectors.api.APIConnector.APIConnector`.
215
216    connector_keys: Optional[List[str]], default None
217        The list of `connector_keys` to filter by.
218
219    metric_keys: Optional[List[str]], default None
220        The list of `metric_keys` to filter by.
221
222    location_keys: Optional[List[str]], default None
223        The list of `location_keys` to filter by.
224
225    params: Optional[Dict[str, Any]], default None
226        A dictionary of parameters to filter by.
227
228    debug: bool
229        Verbosity toggle.
230
231    Returns
232    -------
233    A list of tuples of strings (or `None` for `location_key`)
234    in the form `(connector_keys, metric_key, location_key)`.
235    
236    Examples
237    --------
238    >>> fetch_pipes_keys(metric_keys=['weather'])
239    [('sql:main', 'weather', None)]
240    """
241    from meerschaum.utils.warnings import error
242
243    def _registered(
244            connector_keys: Optional[List[str]] = None,
245            metric_keys: Optional[List[str]] = None,
246            location_keys: Optional[List[str]] = None,
247            tags: Optional[List[str]] = None,
248            params: Optional[Dict[str, Any]] = None,
249            debug: bool = False,
250            **kw
251        ) -> List[Tuple[str, str, str]]:
252        """
253        Get keys from the pipes table or the API directly.
254        Builds query or URL based on provided keys and parameters.
255        
256        Only works for SQL and API Connectors.
257        """
258        if connector_keys is None:
259            connector_keys = []
260        if metric_keys is None:
261            metric_keys = []
262        if location_keys is None:
263            location_keys = []
264        if params is None:
265            params = {}
266        if tags is None:
267            tags = []
268
269        return connector.fetch_pipes_keys(
270            connector_keys = connector_keys,
271            metric_keys = metric_keys,
272            location_keys = location_keys,
273            tags = tags,
274            params = params,
275            debug = debug
276        )
277
278    def _explicit(
279            connector_keys: Optional[List[str]] = None,
280            metric_keys: Optional[List[str]] = None,
281            location_keys: Optional[List[str]] = None,
282            params: Optional[Dict[str, Any]] = None,
283            tags: Optional[List[str]] = None,
284            debug: bool = False,
285            **kw
286        ) -> List[Tuple[str, str, str]]:
287        """
288        Explicitly build Pipes based on provided keys.
289        Raises an error if `connector_keys` or `metric_keys` is empty,
290        and assumes `location_keys = [None]` if empty.
291        """
292
293        if connector_keys is None:
294            connector_keys = []
295        if metric_keys is None:
296            metric_keys = []
297        if location_keys is None:
298            location_keys = []
299        if params is None:
300            params = {}
301
302        if not isinstance(connector_keys, list):
303            connector_keys = [connector_keys]
304        if not isinstance(metric_keys, list):
305            metric_keys = [metric_keys]
306        if not isinstance(location_keys, list):
307            location_keys = [location_keys]
308
309        missing_keys = []
310        if len(connector_keys) == 0:
311            missing_keys.append('connector_keys')
312        if len(metric_keys) == 0:
313            missing_keys.append('metric_keys')
314        if len(location_keys) == 0:
315            location_keys.append(None)
316        if len(missing_keys) > 0:
317            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
318            error_message += "\nSee --help for information for passing parameters."
319            error(error_message)
320        result = []
321        for ck in connector_keys:
322            for mk in metric_keys:
323                for lk in location_keys:
324                    result.append((ck, mk, lk))
325        return result
326
327    _method_functions = {
328        'registered' : _registered,
329        'explicit'   : _explicit,
330    }
331    if method not in _method_functions:
332        error(f"Method '{method}' is not supported!", NotImplementedError)
333    return _method_functions[method](**kw)

Fetch keys for pipes according to a method.

Parameters
  • method (str): The method by which to fetch the keys. See get_pipes() above.
  • connector (meerschaum.connectors.Connector): The connector to use to fetch the keys. Must be of type meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • connector_keys (Optional[List[str]], default None): The list of connector_keys to filter by.
  • metric_keys (Optional[List[str]], default None): The list of metric_keys to filter by.
  • location_keys (Optional[List[str]], default None): The list of location_keys to filter by.
  • params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
  • debug (bool): Verbosity toggle.
Returns
  • A list of tuples of strings (or None for location_key)
  • in the form (connector_keys, metric_key, location_key).
Examples
>>> fetch_pipes_keys(metric_keys=['weather'])
[('sql:main', 'weather', None)]