meerschaum.utils

The utils module contains utility functions. These range from primary utilities (such as get_pipes) to miscellaneous helper functions.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
The utils module contains utility functions.
These range from primary utilities (such as `get_pipes`)
to miscellaneous helper functions.
"""

# Public names of this package, i.e. what `from meerschaum.utils import *` exposes.
__all__ = (
    # NOTE(review): the entries below are presumably the names of this
    # package's submodules (e.g. `meerschaum.utils.warnings`) -- confirm
    # against the package layout.
    'daemon',
    'dataframe',
    'debug',
    'dtypes',
    'formatting',
    'interactive',
    'misc',
    'networking',
    'packages',
    'pool',
    'process',
    'prompt',
    'schedule',
    'sql',
    'threading',
    'typing',
    'venv',
    'warnings',
    'yaml',
    # Functions re-exported from `meerschaum.utils._get_pipes` by the import below.
    "get_pipes",
    "fetch_pipes_keys",
)
# Re-export the two primary utilities at the package level.
from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
 29def get_pipes(
 30    connector_keys: Union[str, List[str], None] = None,
 31    metric_keys: Union[str, List[str], None] = None,
 32    location_keys: Union[str, List[str], None] = None,
 33    tags: Optional[List[str]] = None,
 34    params: Optional[Dict[str, Any]] = None,
 35    mrsm_instance: Union[str, InstanceConnector, None] = None,
 36    instance: Union[str, InstanceConnector, None] = None,
 37    as_list: bool = False,
 38    as_tags_dict: bool = False,
 39    method: str = 'registered',
 40    workers: Optional[int] = None,
 41    debug: bool = False,
 42    _cache_parameters: bool = True,
 43    **kw: Any
 44) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, mrsm.Pipe]]:
 45    """
 46    Return a dictionary or list of `meerschaum.Pipe` objects.
 47
 48    Parameters
 49    ----------
 50    connector_keys: Union[str, List[str], None], default None
 51        String or list of connector keys.
 52        If omitted or is `'*'`, fetch all possible keys.
 53        If a string begins with `'_'`, select keys that do NOT match the string.
 54
 55    metric_keys: Union[str, List[str], None], default None
 56        String or list of metric keys. See `connector_keys` for formatting.
 57
 58    location_keys: Union[str, List[str], None], default None
 59        String or list of location keys. See `connector_keys` for formatting.
 60
 61    tags: Optional[List[str]], default None
 62         If provided, only include pipes with these tags.
 63
 64    params: Optional[Dict[str, Any]], default None
 65        Dictionary of additional parameters to search by.
 66        Params are parsed into a SQL WHERE clause.
 67        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 68
 69    mrsm_instance: Union[str, InstanceConnector, None], default None
 70        Connector keys for the Meerschaum instance of the pipes.
 71        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 72        `meerschaum.connectors.api.APIConnector.APIConnector`.
 73        
 74    as_list: bool, default False
 75        If `True`, return pipes in a list instead of a hierarchical dictionary.
 76        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 77        `True`  : `[Pipe]`
 78
 79    as_tags_dict: bool, default False
 80        If `True`, return a dictionary mapping tags to pipes.
 81        Pipes with multiple tags will be repeated.
 82
 83    method: str, default 'registered'
 84        Available options: `['registered', 'explicit', 'all']`
 85        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 86        (API or SQL connector, depends on mrsm_instance).
 87        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 88        instead of consulting the pipes table. Useful for creating non-existent pipes.
 89        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
 90        **NOTE:** Method `'all'` is not implemented!
 91
 92    workers: Optional[int], default None
 93        If provided (and `as_tags_dict` is `True`), set the number of workers for the pool
 94        to fetch tags.
 95        Only takes effect if the instance connector supports multi-threading
 96
 97    **kw: Any:
 98        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 99
100    Returns
101    -------
102    A dictionary of dictionaries and `meerschaum.Pipe` objects
103    in the connector, metric, location hierarchy.
104    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
105    If `as_tags_dict` is `True`, return a dictionary mapping tags to pipes.
106
107    Examples
108    --------
109    ```
110    >>> ### Manual definition:
111    >>> pipes = {
112    ...     <connector_keys>: {
113    ...         <metric_key>: {
114    ...             <location_key>: Pipe(
115    ...                 <connector_keys>,
116    ...                 <metric_key>,
117    ...                 <location_key>,
118    ...             ),
119    ...         },
120    ...     },
121    ... },
122    >>> ### Accessing a single pipe:
123    >>> pipes['sql:main']['weather'][None]
124    >>> ### Return a list instead:
125    >>> get_pipes(as_list=True)
126    [Pipe('sql:main', 'weather')]
127    >>> get_pipes(as_tags_dict=True)
128    {'gvl': Pipe('sql:main', 'weather')}
129    ```
130    """
131
132    import json
133    from collections import defaultdict
134    from meerschaum.config import get_config
135    from meerschaum.utils.warnings import error
136    from meerschaum.utils.misc import filter_keywords
137    from meerschaum.utils.pool import get_pool
138
139    if connector_keys is None:
140        connector_keys = []
141    if metric_keys is None:
142        metric_keys = []
143    if location_keys is None:
144        location_keys = []
145    if params is None:
146        params = {}
147    if tags is None:
148        tags = []
149
150    if isinstance(connector_keys, str):
151        connector_keys = [connector_keys]
152    if isinstance(metric_keys, str):
153        metric_keys = [metric_keys]
154    if isinstance(location_keys, str):
155        location_keys = [location_keys]
156
157    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
158    if mrsm_instance is None:
159        mrsm_instance = instance
160    if mrsm_instance is None:
161        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
162    if isinstance(mrsm_instance, str):
163        from meerschaum.connectors.parse import parse_instance_keys
164        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
165    else:
166        from meerschaum.connectors import instance_types
167        valid_connector = False
168        if hasattr(mrsm_instance, 'type'):
169            if mrsm_instance.type in instance_types:
170                valid_connector = True
171        if not valid_connector:
172            error(f"Invalid instance connector: {mrsm_instance}")
173        connector = mrsm_instance
174    if debug:
175        from meerschaum.utils.debug import dprint
176        dprint(f"Using instance connector: {connector}")
177    if not connector:
178        error(f"Could not create connector from keys: '{mrsm_instance}'")
179
180    ### Get a list of tuples for the keys needed to build pipes.
181    result = fetch_pipes_keys(
182        method,
183        connector,
184        connector_keys = connector_keys,
185        metric_keys = metric_keys,
186        location_keys = location_keys,
187        tags = tags,
188        params = params,
189        workers = workers,
190        debug = debug
191    )
192    if result is None:
193        error("Unable to build pipes!")
194
195    ### Populate the `pipes` dictionary with Pipes based on the keys
196    ### obtained from the chosen `method`.
197    from meerschaum import Pipe
198    pipes = {}
199    for keys_tuple in result:
200        ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2]
201        pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None
202        pipe_parameters = (
203            pipe_tags_or_parameters
204            if isinstance(pipe_tags_or_parameters, (dict, str))
205            else None
206        )
207        if isinstance(pipe_parameters, str):
208            pipe_parameters = json.loads(pipe_parameters)
209        pipe_tags = (
210            pipe_tags_or_parameters
211            if isinstance(pipe_tags_or_parameters, list)
212            else (
213                pipe_tags_or_parameters.get('tags', [])
214                if isinstance(pipe_tags_or_parameters, dict)
215                else None
216            )
217        )
218
219        if ck not in pipes:
220            pipes[ck] = {}
221
222        if mk not in pipes[ck]:
223            pipes[ck][mk] = {}
224
225        pipe = Pipe(
226            ck, mk, lk,
227            mrsm_instance = connector,
228            parameters = pipe_parameters,
229            tags = pipe_tags,
230            debug = debug,
231            **filter_keywords(Pipe, **kw)
232        )
233        pipe.__dict__['_tags'] = pipe_tags
234        pipes[ck][mk][lk] = pipe
235
236    if not as_list and not as_tags_dict:
237        return pipes
238
239    from meerschaum.utils.misc import flatten_pipes_dict
240    pipes_list = flatten_pipes_dict(pipes)
241    if as_list:
242        return pipes_list
243
244    pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1))
245    def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]:
246        _tags = pipe.__dict__.get('_tags', None)
247        gathered_tags = _tags if _tags is not None else pipe.tags
248        return pipe, (gathered_tags or [])
249
250    tags_pipes = defaultdict(lambda: [])
251    pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list))
252    for pipe, tags in pipes_tags.items():
253        for tag in (tags or []):
254            tags_pipes[tag].append(pipe)
255
256    return dict(tags_pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • as_tags_dict (bool, default False): If True, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated.
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • workers (Optional[int], default None): If provided (and as_tags_dict is True), set the number of workers for the pool to fetch tags. Only takes effect if the instance connector supports multi-threading.
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy.
  • If as_list is True, return a list of meerschaum.Pipe objects.
  • If as_tags_dict is True, return a dictionary mapping tags to pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': [Pipe('sql:main', 'weather')]}
def fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> List[Tuple[str, str, str]]:
def fetch_pipes_keys(
    method: str,
    connector: 'mrsm.connectors.InstanceConnector',
    **kw: Any
) -> List[Tuple[str, str, str]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys: `'registered'` or `'explicit'`.
        See `get_pipes()` above.

    connector: meerschaum.connectors.InstanceConnector
        The connector to use to fetch the keys.
        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
        or `meerschaum.connectors.api.APIConnector.APIConnector`.
        Only used by the `'registered'` method.

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by.
        Forwarded to the connector by `'registered'`; ignored by `'explicit'`.

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by.

    debug: bool
        Verbosity toggle.

    Returns
    -------
    A list of tuples of strings (or `None` for `location_key`)
    in the form `(connector_keys, metric_key, location_key)`.
    For `'registered'`, the value returned by `connector.fetch_pipes_keys()`
    is passed through unchanged.

    Examples
    --------
    >>> fetch_pipes_keys('explicit', conn, connector_keys=['sql:main'], metric_keys=['weather'])
    [('sql:main', 'weather', None)]
    """
    from meerschaum.utils.warnings import error

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Get keys from the pipes table or the API directly.
        Builds query or URL based on provided keys and parameters.

        Only works for SQL and API Connectors.
        """
        ### Omitted filters are sent to the connector as empty containers;
        ### any extra keyword arguments in `**kw` are not forwarded.
        return connector.fetch_pipes_keys(
            connector_keys = [] if connector_keys is None else connector_keys,
            metric_keys = [] if metric_keys is None else metric_keys,
            location_keys = [] if location_keys is None else location_keys,
            tags = [] if tags is None else tags,
            params = {} if params is None else params,
            debug = debug
        )

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Explicitly build Pipes based on provided keys.
        Raises an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.
        """
        if connector_keys is None:
            connector_keys = []
        if metric_keys is None:
            metric_keys = []
        if location_keys is None:
            location_keys = []

        ### Wrap single (non-list) values so the product below can iterate them.
        if not isinstance(connector_keys, list):
            connector_keys = [connector_keys]
        if not isinstance(metric_keys, list):
            metric_keys = [metric_keys]
        if not isinstance(location_keys, list):
            location_keys = [location_keys]

        ### Rebind rather than append so the caller's (empty) list is never mutated.
        if not location_keys:
            location_keys = [None]

        missing_keys = []
        if not connector_keys:
            missing_keys.append('connector_keys')
        if not metric_keys:
            missing_keys.append('metric_keys')
        if missing_keys:
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        ### Every (connector, metric, location) combination, connector-major order.
        return [
            (ck, mk, lk)
            for ck in connector_keys
            for mk in metric_keys
            for lk in location_keys
        ]

    ### Dispatch table of the supported methods.
    _method_functions = {
        'registered' : _registered,
        'explicit'   : _explicit,
    }
    if method not in _method_functions:
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)

Fetch keys for pipes according to a method.

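Parameters
  • method (str): The method by which to fetch the keys. See get_pipes() above.
  • connector (meerschaum.connectors.InstanceConnector): The connector to use to fetch the keys. Must be of type meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • connector_keys (Optional[List[str]], default None): The list of connector_keys to filter by.
  • metric_keys (Optional[List[str]], default None): The list of metric_keys to filter by.
  • location_keys (Optional[List[str]], default None): The list of location_keys to filter by.
  • params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
  • debug (bool): Verbosity toggle.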
Returns
  • A list of tuples of strings (or None for location_key)
  • in the form (connector_keys, metric_key, location_key).
Examples
>>> fetch_pipes_keys('explicit', conn, connector_keys=['sql:main'], metric_keys=['weather'])
[('sql:main', 'weather', None)]