meerschaum.utils

The utils module contains utility functions, ranging from primary utilities (e.g. get_pipes) to miscellaneous helper functions.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6The utils module contains utility functions.
 7These include tools from primary utilities (get_pipes)
 8to miscellaneous helper functions.
 9"""
10
11__all__ = (
12    'daemon',
13    'dataframe',
14    'debug',
15    'dtypes',
16    'formatting',
17    'interactive',
18    'misc',
19    'networking',
20    'packages',
21    'pipes',
22    'pool',
23    'process',
24    'prompt',
25    'schedule',
26    'sql',
27    'threading',
28    'typing',
29    'venv',
30    'warnings',
31    'yaml',
32    "get_pipes",
33    "fetch_pipes_keys",
34)
35from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, datetime_dtypes: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
 29def get_pipes(
 30    connector_keys: Union[str, List[str], None] = None,
 31    metric_keys: Union[str, List[str], None] = None,
 32    location_keys: Union[str, List[str], None] = None,
 33    tags: Optional[List[str]] = None,
 34    datetime_dtypes: Optional[List[str]] = None,
 35    params: Optional[Dict[str, Any]] = None,
 36    mrsm_instance: Union[str, InstanceConnector, None] = None,
 37    instance: Union[str, InstanceConnector, None] = None,
 38    as_list: bool = False,
 39    as_tags_dict: bool = False,
 40    method: str = 'registered',
 41    workers: Optional[int] = None,
 42    debug: bool = False,
 43    _cache_parameters: bool = True,
 44    **kw: Any
 45) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, mrsm.Pipe]]:
 46    """
 47    Return a dictionary or list of `meerschaum.Pipe` objects.
 48
 49    Parameters
 50    ----------
 51    connector_keys: Union[str, List[str], None], default None
 52        String or list of connector keys.
 53        If omitted or is `'*'`, fetch all possible keys.
 54        If a string begins with `'_'`, select keys that do NOT match the string.
 55
 56    metric_keys: Union[str, List[str], None], default None
 57        String or list of metric keys. See `connector_keys` for formatting.
 58
 59    location_keys: Union[str, List[str], None], default None
 60        String or list of location keys. See `connector_keys` for formatting.
 61
 62    tags: Optional[List[str]], default None
 63        If provided, only include pipes with these tags.
 64
 65    datetime_dtypes: Optional[List[str]], default None
 66        If provided, only include pipes with the corresponding `datetime` axis dtypes.
 67        Accepted values are `datetime`, `int`, `None` (or `null`, etc.).
 68        May be negated by `_`.
 69
 70    params: Optional[Dict[str, Any]], default None
 71        Dictionary of additional parameters to search by.
 72        Params are parsed into a SQL WHERE clause.
 73        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 74
 75    mrsm_instance: Union[str, InstanceConnector, None], default None
 76        Connector keys for the Meerschaum instance of the pipes.
 77        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 78        `meerschaum.connectors.api.APIConnector.APIConnector`.
 79        
 80    as_list: bool, default False
 81        If `True`, return pipes in a list instead of a hierarchical dictionary.
 82        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 83        `True`  : `[Pipe]`
 84
 85    as_tags_dict: bool, default False
 86        If `True`, return a dictionary mapping tags to pipes.
 87        Pipes with multiple tags will be repeated.
 88
 89    method: str, default 'registered'
 90        Available options: `['registered', 'explicit', 'all']`
 91        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 92        (API or SQL connector, depends on mrsm_instance).
 93        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 94        instead of consulting the pipes table. Useful for creating non-existent pipes.
 95        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
 96        **NOTE:** Method `'all'` is not implemented!
 97
 98    workers: Optional[int], default None
 99        If provided (and `as_tags_dict` is `True`), set the number of workers for the pool
100        to fetch tags.
101        Only takes effect if the instance connector supports multi-threading
102
103    **kw: Any:
104        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
105
106    Returns
107    -------
108    A dictionary of dictionaries and `meerschaum.Pipe` objects
109    in the connector, metric, location hierarchy.
110    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
111    If `as_tags_dict` is `True`, return a dictionary mapping tags to pipes.
112
113    Examples
114    --------
115    ```
116    >>> ### Manual definition:
117    >>> pipes = {
118    ...     <connector_keys>: {
119    ...         <metric_key>: {
120    ...             <location_key>: Pipe(
121    ...                 <connector_keys>,
122    ...                 <metric_key>,
123    ...                 <location_key>,
124    ...             ),
125    ...         },
126    ...     },
127    ... },
128    >>> ### Accessing a single pipe:
129    >>> pipes['sql:main']['weather'][None]
130    >>> ### Return a list instead:
131    >>> get_pipes(as_list=True)
132    [Pipe('sql:main', 'weather')]
133    >>> get_pipes(as_tags_dict=True)
134    {'gvl': Pipe('sql:main', 'weather')}
135    ```
136    """
137    import json
138    from collections import defaultdict
139    from meerschaum.config import get_config
140    from meerschaum.config.static import STATIC_CONFIG
141    from meerschaum.utils.warnings import error
142    from meerschaum.utils.misc import filter_keywords, separate_negation_values
143    from meerschaum.utils.pool import get_pool
144    from meerschaum.utils.pipes import replace_pipes_syntax
145    from meerschaum.utils.debug import dprint
146    from meerschaum.utils.dtypes import value_is_null, get_current_timestamp
147    from meerschaum import Pipe
148
149    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
150    if datetime_dtypes:
151        if isinstance(datetime_dtypes, str):
152            datetime_dtypes = [datetime_dtypes]
153        for _dt in datetime_dtypes:
154            _clean = str(_dt).lstrip(negation_prefix).lower()
155            if _clean not in ('datetime', 'int') and not value_is_null(_clean):
156                error(f"Invalid datetime dtype '{_dt}'.")
157
158    if connector_keys is None:
159        connector_keys = []
160    if metric_keys is None:
161        metric_keys = []
162    if location_keys is None:
163        location_keys = []
164    if params is None:
165        params = {}
166    if tags is None:
167        tags = []
168
169    if isinstance(connector_keys, str):
170        connector_keys = [connector_keys]
171    if isinstance(metric_keys, str):
172        metric_keys = [metric_keys]
173    if isinstance(location_keys, str):
174        location_keys = [location_keys]
175
176    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
177    if mrsm_instance is None:
178        mrsm_instance = instance
179    if mrsm_instance is None:
180        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
181    if isinstance(mrsm_instance, str):
182        from meerschaum.connectors.parse import parse_instance_keys
183        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
184    else:
185        from meerschaum.connectors import instance_types
186        valid_connector = False
187        if hasattr(mrsm_instance, 'type'):
188            if mrsm_instance.type in instance_types:
189                valid_connector = True
190        if not valid_connector:
191            error(f"Invalid instance connector: {mrsm_instance}")
192        connector = mrsm_instance
193    if debug:
194        dprint(f"Using instance connector: {connector}")
195    if not connector:
196        error(f"Could not create connector from keys: '{mrsm_instance}'")
197
198    ### Get a list of tuples for the keys needed to build pipes.
199    result = fetch_pipes_keys(
200        method,
201        connector,
202        connector_keys = connector_keys,
203        metric_keys = metric_keys,
204        location_keys = location_keys,
205        tags = tags,
206        params = params,
207        workers = workers,
208        debug = debug
209    )
210    if result is None:
211        error("Unable to build pipes!")
212    result_items: List[Tuple] = (
213        list(result.items())
214        if isinstance(result, dict)
215        else [(None, keys_tuple) for keys_tuple in result]
216    )
217
218    ### Populate the `pipes` dictionary with Pipes based on the keys
219    ### obtained from the chosen `method`.
220    in_dtypes, ex_dtypes = separate_negation_values(datetime_dtypes or [])
221    pipes: PipesDict = {}
222    for pipe_id, keys_tuple in result_items:
223        ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2]
224        pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None
225        pipe_parameters = (
226            pipe_tags_or_parameters
227            if isinstance(pipe_tags_or_parameters, (dict, str))
228            else None
229        )
230        if isinstance(pipe_parameters, str):
231            pipe_parameters = json.loads(pipe_parameters)
232        pipe_tags = (
233            pipe_tags_or_parameters
234            if isinstance(pipe_tags_or_parameters, list)
235            else (
236                pipe_tags_or_parameters.get('tags', [])
237                if isinstance(pipe_tags_or_parameters, dict)
238                else None
239            )
240        )
241
242        pipe = Pipe(
243            ck, mk, lk,
244            mrsm_instance = connector,
245            parameters = pipe_parameters,
246            tags = pipe_tags,
247            debug = debug,
248            **filter_keywords(Pipe, **kw)
249        )
250        pipe.__dict__['_tags'] = pipe_tags
251        if pipe_id is not None:
252            pipe._cache_value('_id', pipe_id, memory_only=True, debug=debug)
253        if pipe_parameters is not None:
254            now = get_current_timestamp('ms', as_int=True) / 1000
255            full_attributes = {
256                'connector_keys': ck,
257                'metric_key': mk,
258                'location_key': lk,
259                'parameters': pipe_parameters,
260            }
261            if pipe_id is not None:
262                full_attributes['pipe_id'] = pipe_id
263            pipe._cache_value('attributes', full_attributes, memory_only=True, debug=debug)
264            pipe._cache_value('_attributes_sync_time', now, memory_only=True, debug=debug)
265        if datetime_dtypes:
266            if pipe_parameters is None:
267                pipe_parameters = pipe.get_parameters(debug=debug)
268            columns_val = (pipe_parameters or {}).get('columns', {}) or {}
269            if isinstance(columns_val, str) and 'Pipe(' in columns_val:
270                columns_val = replace_pipes_syntax(columns_val)
271
272            dt_col = columns_val.get('datetime', None)
273            dt_typ = (
274                ((pipe_parameters or {}).get('dtypes', None) or {}).get(dt_col, None)
275                if dt_col
276                else None
277            )
278
279            def _dtype_matches(clean_d):
280                if not dt_col:
281                    return value_is_null(clean_d)
282                return (
283                    (clean_d == 'int' and 'int' in str(dt_typ).lower())
284                    or
285                    (clean_d == 'datetime' and 'int' not in str(dt_typ).lower())
286                )
287
288            in_match = not in_dtypes or any(_dtype_matches(d) for d in in_dtypes)
289            ex_match = bool(ex_dtypes and any(_dtype_matches(d) for d in ex_dtypes))
290            keep_pipe = in_match and not ex_match
291
292            if not keep_pipe:
293                continue
294
295        if ck not in pipes:
296            pipes[ck] = {}
297
298        if mk not in pipes[ck]:
299            pipes[ck][mk] = {}
300
301
302        pipes[ck][mk][lk] = pipe
303
304    if not as_list and not as_tags_dict:
305        return pipes
306
307    from meerschaum.utils.pipes import flatten_pipes_dict
308    pipes_list = flatten_pipes_dict(pipes)
309    if as_list:
310        return pipes_list
311
312    pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1))
313    def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]:
314        _tags = pipe.__dict__.get('_tags', None)
315        gathered_tags = _tags if _tags is not None else pipe.tags
316        return pipe, (gathered_tags or [])
317
318    tags_pipes = defaultdict(lambda: [])
319    pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list))
320    for pipe, tags in pipes_tags.items():
321        for tag in (tags or []):
322            tags_pipes[tag].append(pipe)
323
324    return dict(tags_pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • datetime_dtypes (Optional[List[str]], default None): If provided, only include pipes with the corresponding datetime axis dtypes. Accepted values are datetime, int, None (or null, etc.). May be negated by _.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • as_tags_dict (bool, default False): If True, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated.
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • workers (Optional[int], default None): If provided (and as_tags_dict is True), set the number of workers for the pool to fetch tags. Only takes effect if the instance connector supports multi-threading.
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy.
  • If as_list is True, return a list of meerschaum.Pipe objects.
  • If as_tags_dict is True, return a dictionary mapping tags to lists of pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': [Pipe('sql:main', 'weather')]}
def fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
def fetch_pipes_keys(
    method: str,
    connector: 'mrsm.connectors.InstanceConnector',
    **kw: Any
) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys. See `get_pipes()` above.
        Supported values are `'registered'` and `'explicit'`.

    connector: meerschaum.connectors.InstanceConnector
        The connector to use to fetch the keys.
        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
        or `meerschaum.connectors.api.APIConnector.APIConnector`.
        Only consulted by the `'registered'` method.

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by (ignored by the `'explicit'` method).

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by (ignored by the `'explicit'` method).

    debug: bool
        Verbosity toggle.

    Any other keyword arguments (e.g. `workers`) are accepted and ignored.

    Returns
    -------
    A list or a dictionary (with pipe IDs as indices) of tuples of strings (or `None` for `location_key`)
    in the form `(connector_keys, metric_key, location_key)`.
    Optionally the parameters or tags may be returned alongside the keys.
    Note the return value depends on the instance connector implementation.

    Examples
    --------
    >>> fetch_pipes_keys('registered', connector, metric_keys=['weather'])
    {1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}
    >>> fetch_pipes_keys('explicit', connector, connector_keys=['sql:main'], metric_keys=['weather'])
    [('sql:main', 'weather', None)]
    """

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> Union[List[Tuple], Dict[Any, Tuple]]:
        """
        Get keys from the pipes table or the API directly,
        delegating to `connector.fetch_pipes_keys()`.

        Only works for SQL and API Connectors.
        """
        return connector.fetch_pipes_keys(
            connector_keys=(connector_keys if connector_keys is not None else []),
            metric_keys=(metric_keys if metric_keys is not None else []),
            location_keys=(location_keys if location_keys is not None else []),
            tags=(tags if tags is not None else []),
            params=(params if params is not None else {}),
            debug=debug,
        )

    def _as_list(keys: Any) -> List[Any]:
        """Coerce `None` to `[]` and any non-list value to a one-element list."""
        if keys is None:
            return []
        return keys if isinstance(keys, list) else [keys]

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, Optional[str]]]:
        """
        Explicitly build keys tuples from the provided keys (the cartesian product).
        Raises an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.
        `params` and `tags` are not used.
        """
        connector_keys = _as_list(connector_keys)
        metric_keys = _as_list(metric_keys)
        location_keys = _as_list(location_keys)

        missing_keys = [
            name
            for name, keys in (('connector_keys', connector_keys), ('metric_keys', metric_keys))
            if len(keys) == 0
        ]
        if len(location_keys) == 0:
            ### Rebind rather than append so a caller-supplied list is never mutated.
            location_keys = [None]
        if missing_keys:
            from meerschaum.utils.warnings import error
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        return [
            (ck, mk, lk)
            for ck in connector_keys
            for mk in metric_keys
            for lk in location_keys
        ]

    _method_functions = {
        'registered': _registered,
        'explicit': _explicit,
    }
    if method not in _method_functions:
        from meerschaum.utils.warnings import error
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)

Fetch keys for pipes according to a method.

Parameters
Returns
  • A list or a dictionary (with pipe IDs as indices) of tuples of strings (or None for location_key) in the form (connector_keys, metric_key, location_key).
  • Optionally the parameters or tags may be returned alongside the keys.
  • Note the return value depends on the instance connector implementation.
Examples
>>> fetch_pipes_keys('registered', connector, metric_keys=['weather'])
{1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}