meerschaum.utils

The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6The utils module contains utility functions.
 7These include tools from primary utilities (get_pipes)
 8to miscellaneous helper functions.
 9"""
10
# Public names of `meerschaum.utils`:
# the utility submodules first, then the functions re-exported from `_get_pipes`.
__all__ = (
    # Submodules
    'daemon',
    'dataframe',
    'debug',
    'dtypes',
    'formatting',
    'interactive',
    'misc',
    'networking',
    'packages',
    'pipes',
    'pool',
    'process',
    'prompt',
    'schedule',
    'sql',
    'threading',
    'typing',
    'venv',
    'warnings',
    'yaml',
    # Functions
    'get_pipes',
    'fetch_pipes_keys',
)
35from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, targets: Optional[List[str]] = None, datetime_dtypes: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, as_targets_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
 29def get_pipes(
 30    connector_keys: Union[str, List[str], None] = None,
 31    metric_keys: Union[str, List[str], None] = None,
 32    location_keys: Union[str, List[str], None] = None,
 33    tags: Optional[List[str]] = None,
 34    targets: Optional[List[str]] = None,
 35    datetime_dtypes: Optional[List[str]] = None,
 36    params: Optional[Dict[str, Any]] = None,
 37    mrsm_instance: Union[str, InstanceConnector, None] = None,
 38    instance: Union[str, InstanceConnector, None] = None,
 39    as_list: bool = False,
 40    as_tags_dict: bool = False,
 41    as_targets_dict: bool = False,
 42    method: str = 'registered',
 43    workers: Optional[int] = None,
 44    debug: bool = False,
 45    _cache_parameters: bool = True,
 46    **kw: Any
 47) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, mrsm.Pipe]]:
 48    """
 49    Return a dictionary or list of `meerschaum.Pipe` objects.
 50
 51    Parameters
 52    ----------
 53    connector_keys: Union[str, List[str], None], default None
 54        String or list of connector keys.
 55        If omitted or is `'*'`, fetch all possible keys.
 56        If a string begins with `'_'`, select keys that do NOT match the string.
 57
 58    metric_keys: Union[str, List[str], None], default None
 59        String or list of metric keys. See `connector_keys` for formatting.
 60
 61    location_keys: Union[str, List[str], None], default None
 62        String or list of location keys. See `connector_keys` for formatting.
 63
 64    tags: Optional[List[str]], default None
 65        If provided, only include pipes with these tags.
 66
 67    datetime_dtypes: Optional[List[str]], default None
 68        If provided, only include pipes with the corresponding `datetime` axis dtypes.
 69        Accepted values are `datetime`, `int`, `None` (or `null`, etc.).
 70        May be negated by `_`.
 71
 72    params: Optional[Dict[str, Any]], default None
 73        Dictionary of additional parameters to search by.
 74        Params are parsed into a SQL WHERE clause.
 75        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 76
 77    mrsm_instance: Union[str, InstanceConnector, None], default None
 78        Connector keys for the Meerschaum instance of the pipes.
 79        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 80        `meerschaum.connectors.api.APIConnector.APIConnector`.
 81        
 82    as_list: bool, default False
 83        If `True`, return pipes in a list instead of a hierarchical dictionary.
 84        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 85        `True`  : `[Pipe]`
 86
 87    as_tags_dict: bool, default False
 88        If `True`, return a dictionary mapping tags to pipes.
 89        Pipes with multiple tags will be repeated.
 90
 91    as_targets_dict: bool, default False
 92        If `True`, return a dictionary mapping `(schema, target)` tuples to pipes.
 93        Pipes sharing the same target across different schemata are grouped separately.
 94
 95    method: str, default 'registered'
 96        Available options: `['registered', 'explicit', 'all']`
 97        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 98        (API or SQL connector, depends on mrsm_instance).
 99        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
100        instead of consulting the pipes table. Useful for creating non-existent pipes.
101        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
102        **NOTE:** Method `'all'` is not implemented!
103
104    workers: Optional[int], default None
105        If provided (and `as_tags_dict` or `as_targets_dict` is `True`), set the number of workers
106        for the pool to fetch tags or targets.
107        Only takes effect if the instance connector supports multi-threading.
108
109    **kw: Any:
110        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
111
112    Returns
113    -------
114    A dictionary of dictionaries and `meerschaum.Pipe` objects
115    in the connector, metric, location hierarchy.
116    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
117    If `as_tags_dict` is `True`, return a dictionary mapping tags to pipes.
118    If `as_targets_dict` is `True`, return a dictionary mapping targets to pipes.
119
120    Examples
121    --------
122    ```
123    >>> ### Manual definition:
124    >>> pipes = {
125    ...     <connector_keys>: {
126    ...         <metric_key>: {
127    ...             <location_key>: Pipe(
128    ...                 <connector_keys>,
129    ...                 <metric_key>,
130    ...                 <location_key>,
131    ...             ),
132    ...         },
133    ...     },
134    ... },
135    >>> ### Accessing a single pipe:
136    >>> pipes['sql:main']['weather'][None]
137    >>> ### Return a list instead:
138    >>> get_pipes(as_list=True)
139    [Pipe('sql:main', 'weather')]
140    >>> get_pipes(as_tags_dict=True)
141    {'gvl': Pipe('sql:main', 'weather')}
142    ```
143    """
144    import json
145    from collections import defaultdict
146    from meerschaum.config import get_config
147    from meerschaum.config.static import STATIC_CONFIG
148    from meerschaum.utils.warnings import error
149    from meerschaum.utils.misc import filter_keywords, separate_negation_values
150    from meerschaum.utils.pool import get_pool
151    from meerschaum.utils.pipes import replace_pipes_syntax
152    from meerschaum.utils.debug import dprint
153    from meerschaum.utils.dtypes import value_is_null, get_current_timestamp
154    from meerschaum import Pipe
155
156    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
157    if datetime_dtypes:
158        if isinstance(datetime_dtypes, str):
159            datetime_dtypes = [datetime_dtypes]
160        for _dt in datetime_dtypes:
161            _clean = str(_dt).lstrip(negation_prefix).lower()
162            if _clean not in ('datetime', 'int') and not value_is_null(_clean):
163                error(f"Invalid datetime dtype '{_dt}'.")
164
165    if connector_keys is None:
166        connector_keys = []
167    if metric_keys is None:
168        metric_keys = []
169    if location_keys is None:
170        location_keys = []
171    if params is None:
172        params = {}
173    if tags is None:
174        tags = []
175
176    if isinstance(connector_keys, str):
177        connector_keys = [connector_keys]
178    if isinstance(metric_keys, str):
179        metric_keys = [metric_keys]
180    if isinstance(location_keys, str):
181        location_keys = [location_keys]
182
183    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
184    if mrsm_instance is None:
185        mrsm_instance = instance
186    if mrsm_instance is None:
187        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
188    if isinstance(mrsm_instance, str):
189        from meerschaum.connectors.parse import parse_instance_keys
190        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
191    else:
192        from meerschaum.connectors import instance_types
193        valid_connector = False
194        if hasattr(mrsm_instance, 'type'):
195            if mrsm_instance.type in instance_types:
196                valid_connector = True
197        if not valid_connector:
198            error(f"Invalid instance connector: {mrsm_instance}")
199        connector = mrsm_instance
200    if debug:
201        dprint(f"Using instance connector: {connector}")
202    if not connector:
203        error(f"Could not create connector from keys: '{mrsm_instance}'")
204
205    ### Get a list of tuples for the keys needed to build pipes.
206    result = fetch_pipes_keys(
207        method,
208        connector,
209        connector_keys = connector_keys,
210        metric_keys = metric_keys,
211        location_keys = location_keys,
212        tags = tags,
213        params = params,
214        workers = workers,
215        debug = debug
216    )
217    if result is None:
218        error("Unable to build pipes!")
219    result_items: List[Tuple] = (
220        list(result.items())
221        if isinstance(result, dict)
222        else [(None, keys_tuple) for keys_tuple in result]
223    )
224
225    ### Populate the `pipes` dictionary with Pipes based on the keys
226    ### obtained from the chosen `method`.
227    in_dtypes, ex_dtypes = separate_negation_values(datetime_dtypes or [])
228    in_targets, ex_targets = separate_negation_values(targets or [])
229    pipes: PipesDict = {}
230    targets_pipes: Dict[Tuple[Optional[str], str], List[mrsm.Pipe]] = defaultdict(lambda: [])
231    connector_schema = getattr(connector, 'schema', None)
232    connector_is_sql = getattr(connector, 'type', None) == 'sql'
233    connector_flavor = getattr(connector, 'flavor', None)
234    for pipe_id, keys_tuple in result_items:
235        ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2]
236        pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None
237        pipe_parameters = (
238            pipe_tags_or_parameters
239            if isinstance(pipe_tags_or_parameters, (dict, str))
240            else None
241        )
242        if isinstance(pipe_parameters, str):
243            pipe_parameters = json.loads(pipe_parameters)
244        pipe_tags = (
245            pipe_tags_or_parameters
246            if isinstance(pipe_tags_or_parameters, list)
247            else (
248                pipe_tags_or_parameters.get('tags', [])
249                if isinstance(pipe_tags_or_parameters, dict)
250                else None
251            )
252        )
253
254        pipe = Pipe(
255            ck, mk, lk,
256            mrsm_instance = connector,
257            parameters = pipe_parameters,
258            tags = pipe_tags,
259            debug = debug,
260            **filter_keywords(Pipe, **kw)
261        )
262        pipe.__dict__['_tags'] = pipe_tags
263        if pipe_id is not None:
264            pipe._cache_value('_id', pipe_id, memory_only=True, debug=debug)
265        if pipe_parameters is not None:
266            now = get_current_timestamp('ms', as_int=True) / 1000
267            full_attributes = {
268                'connector_keys': ck,
269                'metric_key': mk,
270                'location_key': lk,
271                'parameters': pipe_parameters,
272            }
273            if pipe_id is not None:
274                full_attributes['pipe_id'] = pipe_id
275            pipe._cache_value('attributes', full_attributes, memory_only=True, debug=debug)
276            pipe._cache_value('_attributes_sync_time', now, memory_only=True, debug=debug)
277
278        if datetime_dtypes or targets:
279            parameters_str = str(pipe_parameters)
280            if pipe_parameters is None or 'MRSM{' in parameters_str or 'Pipe(' in parameters_str:
281                pipe_parameters = pipe.get_parameters(debug=debug)
282
283        keep_pipe = True
284
285        if datetime_dtypes:
286            columns_val = (pipe_parameters or {}).get('columns', {}) or {}
287            dt_col = columns_val.get('datetime', None)
288            pipe_dtypes = (
289                ((pipe_parameters or {}).get('dtypes', None) or {})
290                if dt_col
291                else None
292            )
293            dt_typ = pipe_dtypes.get(dt_col, None) if dt_col else None
294
295            def _dtype_matches(clean_d):
296                if not dt_col:
297                    return value_is_null(clean_d)
298                return (
299                    (clean_d == 'int' and 'int' in str(dt_typ).lower())
300                    or
301                    (clean_d == 'datetime' and 'int' not in str(dt_typ).lower())
302                )
303
304            in_match = not in_dtypes or any(_dtype_matches(d) for d in in_dtypes)
305            ex_match = bool(ex_dtypes and any(_dtype_matches(d) for d in ex_dtypes))
306            keep_pipe = keep_pipe and in_match and not ex_match
307            if not keep_pipe:
308                continue
309
310        if targets:
311            pipe_target = pipe.target
312            in_target_match = not in_targets or any(t == pipe_target for t in in_targets)
313            ex_target_match = bool(ex_targets and any(t == pipe_target for t in ex_targets))
314            keep_pipe = keep_pipe and in_target_match and not ex_target_match
315            if not keep_pipe:
316                continue
317
318        if ck not in pipes:
319            pipes[ck] = {}
320
321        if mk not in pipes[ck]:
322            pipes[ck][mk] = {}
323
324
325        pipes[ck][mk][lk] = pipe
326
327        if as_targets_dict:
328            raw_params = pipe_parameters if isinstance(pipe_parameters, dict) else {}
329            schema = raw_params.get('schema') or connector_schema
330            explicit_target = (
331                raw_params.get('target')
332                or raw_params.get('target_name')
333                or raw_params.get('target_table')
334                or raw_params.get('target_table_name')
335            )
336            if explicit_target:
337                target_name = (
338                    replace_pipes_syntax(explicit_target, _pipe=pipe)
339                    if isinstance(explicit_target, str) and '{{' in explicit_target
340                    else explicit_target
341                )
342            else:
343                target_name = pipe._target_legacy()
344                if connector_is_sql and connector_flavor:
345                    from meerschaum.utils.sql import truncate_item_name
346                    target_name = truncate_item_name(target_name, connector_flavor)
347            targets_pipes[(schema, target_name)].append(pipe)
348
349    if not as_list and not as_tags_dict and not as_targets_dict:
350        return pipes
351
352    from meerschaum.utils.pipes import flatten_pipes_dict
353    pipes_list = flatten_pipes_dict(pipes)
354    if as_list:
355        return pipes_list
356
357    pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1))
358
359    def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]:
360        _tags = pipe.__dict__.get('_tags', None)
361        gathered_tags = _tags if _tags is not None else pipe.tags
362        return pipe, (gathered_tags or [])
363
364    if as_tags_dict:
365        tags_pipes = defaultdict(lambda: [])
366        pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list))
367        for pipe, tags in pipes_tags.items():
368            for tag in (tags or []):
369                tags_pipes[tag].append(pipe)
370
371        return dict(tags_pipes)
372
373    if as_targets_dict:
374        return dict(targets_pipes)
375
376    raise NotImplementedError("No futher options for returning pipes.")

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • datetime_dtypes (Optional[List[str]], default None): If provided, only include pipes with the corresponding datetime axis dtypes. Accepted values are datetime, int, None (or null, etc.). May be negated by _.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • as_tags_dict (bool, default False): If True, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated.
  • as_targets_dict (bool, default False): If True, return a dictionary mapping (schema, target) tuples to pipes. Pipes sharing the same target across different schemata are grouped separately.
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • workers (Optional[int], default None): If provided (and as_tags_dict or as_targets_dict is True), set the number of workers for the pool to fetch tags or targets. Only takes effect if the instance connector supports multi-threading.
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects
  • in the connector, metric, location hierarchy.
  • If as_list is True, return a list of meerschaum.Pipe objects.
  • If as_tags_dict is True, return a dictionary mapping tags to pipes.
  • If as_targets_dict is True, return a dictionary mapping targets to pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': [Pipe('sql:main', 'weather')]}
def fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
def fetch_pipes_keys(
    method: str,
    connector: 'mrsm.connectors.InstanceConnector',
    **kw: Any
) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys: `'registered'` or `'explicit'`.
        See `get_pipes()` above.

    connector: meerschaum.connectors.InstanceConnector
        The connector to use to fetch the keys.
        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
        or `meerschaum.connectors.api.APIConnector.APIConnector`.

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by (forwarded to the connector for `'registered'`).

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by.

    debug: bool
        Verbosity toggle.

    Returns
    -------
    A list or a dictionary (with pipe IDs as indices) of tuples of strings (or `None` for `location_key`)
    in the form `(connector_keys, metric_key, location_key)`.
    Optionally the parameters or tags may be returned alongside the keys.
    Note the return value depends on the instance connector implementation.

    Raises
    ------
    Calls `error()` with `NotImplementedError` if `method` is not supported,
    and calls `error()` if `'explicit'` is missing `connector_keys` or `metric_keys`.

    Examples
    --------
    >>> fetch_pipes_keys('registered', conn, metric_keys=['weather'])
    {1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}
    """
    from itertools import product
    from meerschaum.utils.warnings import error

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Get keys from the pipes table or the API directly.
        Builds query or URL based on provided keys and parameters.

        Only works for SQL and API Connectors.
        """
        ### Delegate to the connector, substituting empty containers for omitted filters.
        return connector.fetch_pipes_keys(
            connector_keys=([] if connector_keys is None else connector_keys),
            metric_keys=([] if metric_keys is None else metric_keys),
            location_keys=([] if location_keys is None else location_keys),
            tags=([] if tags is None else tags),
            params=({} if params is None else params),
            debug=debug,
        )

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Explicitly build Pipes based on provided keys.
        Raises an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.
        """
        if connector_keys is None:
            connector_keys = []
        if metric_keys is None:
            metric_keys = []
        if location_keys is None:
            location_keys = []

        ### Wrap scalars (anything that is not a list) into single-item lists.
        if not isinstance(connector_keys, list):
            connector_keys = [connector_keys]
        if not isinstance(metric_keys, list):
            metric_keys = [metric_keys]
        if not isinstance(location_keys, list):
            location_keys = [location_keys]

        missing_keys = []
        if not connector_keys:
            missing_keys.append('connector_keys')
        if not metric_keys:
            missing_keys.append('metric_keys')
        if not location_keys:
            ### Rebind instead of appending so the caller's list is never mutated.
            location_keys = [None]
        if missing_keys:
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        ### Every (connector, metric, location) combination, in nested-loop order.
        return list(product(connector_keys, metric_keys, location_keys))

    _method_functions = {
        'registered' : _registered,
        'explicit'   : _explicit,
    }
    if method not in _method_functions:
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)

Fetch keys for pipes according to a method.

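Parameters
  • method (str): The method by which to fetch the keys. See get_pipes() above.
  • connector (meerschaum.connectors.InstanceConnector): The connector to use to fetch the keys. Must be of type meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • connector_keys (Optional[List[str]], default None): The list of connector_keys to filter by.
  • metric_keys (Optional[List[str]], default None): The list of metric_keys to filter by.
  • location_keys (Optional[List[str]], default None): The list of location_keys to filter by.
  • params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
  • debug (bool): Verbosity toggle.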
Returns
  • A list or a dictionary (with pipe IDs as indices) of tuples of strings (or None for location_key)
  • in the form (connector_keys, metric_key, location_key).
  • Optionally the parameters or tags may be returned alongside the keys.
  • Note the return value depends on the instance connector implementation.
Examples
>>> fetch_pipes_keys('registered', conn, metric_keys=['weather'])
{1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}