meerschaum.utils

The utils module contains utility functions. These range from primary utilities (get_pipes) to miscellaneous helper functions.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6The utils module contains utility functions.
 7These include tools from primary utilities (get_pipes)
 8to miscellaneous helper functions.
 9"""
10
# Public names exported by `meerschaum.utils`.
__all__ = (
    # Presumably the names of this package's submodules (e.g. `debug`, `misc`,
    # and `warnings` are imported as `meerschaum.utils.<name>` elsewhere in
    # this module) -- TODO confirm for the remaining entries.
    'daemon',
    'dataframe',
    'debug',
    'dtypes',
    'formatting',
    'interactive',
    'misc',
    'networking',
    'packages',
    'pool',
    'process',
    'prompt',
    'schedule',
    'sql',
    'threading',
    'typing',
    'venv',
    'warnings',
    'yaml',
    # Functions re-exported from `meerschaum.utils._get_pipes`.
    'get_pipes',
    'fetch_pipes_keys',
)
34from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], List[meerschaum.Pipe]]:
 19def get_pipes(
 20    connector_keys: Union[str, List[str], None] = None,
 21    metric_keys: Union[str, List[str], None] = None,
 22    location_keys: Union[str, List[str], None] = None,
 23    tags: Optional[List[str]] = None,
 24    params: Optional[Dict[str, Any]] = None,
 25    mrsm_instance: Union[str, InstanceConnector, None] = None,
 26    instance: Union[str, InstanceConnector, None] = None,
 27    as_list: bool = False,
 28    method: str = 'registered',
 29    debug: bool = False,
 30    **kw: Any
 31) -> Union[PipesDict, List[mrsm.Pipe]]:
 32    """
 33    Return a dictionary or list of `meerschaum.Pipe` objects.
 34
 35    Parameters
 36    ----------
 37    connector_keys: Union[str, List[str], None], default None
 38        String or list of connector keys.
 39        If omitted or is `'*'`, fetch all possible keys.
 40        If a string begins with `'_'`, select keys that do NOT match the string.
 41
 42    metric_keys: Union[str, List[str], None], default None
 43        String or list of metric keys. See `connector_keys` for formatting.
 44
 45    location_keys: Union[str, List[str], None], default None
 46        String or list of location keys. See `connector_keys` for formatting.
 47
 48    tags: Optional[List[str]], default None
 49         If provided, only include pipes with these tags.
 50
 51    params: Optional[Dict[str, Any]], default None
 52        Dictionary of additional parameters to search by.
 53        Params are parsed into a SQL WHERE clause.
 54        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 55
 56    mrsm_instance: Union[str, InstanceConnector, None], default None
 57        Connector keys for the Meerschaum instance of the pipes.
 58        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 59        `meerschaum.connectors.api.APIConnector.APIConnector`.
 60        
 61    as_list: bool, default False
 62        If `True`, return pipes in a list instead of a hierarchical dictionary.
 63        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 64        `True`  : `[Pipe]`
 65
 66    method: str, default 'registered'
 67        Available options: `['registered', 'explicit', 'all']`
 68        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 69        (API or SQL connector, depends on mrsm_instance).
 70        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 71        instead of consulting the pipes table. Useful for creating non-existent pipes.
 72        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
 73        **NOTE:** Method `'all'` is not implemented!
 74
 75    **kw: Any:
 76        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 77        
 78
 79    Returns
 80    -------
 81    A dictionary of dictionaries and `meerschaum.Pipe` objects
 82    in the connector, metric, location hierarchy.
 83    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 84
 85    Examples
 86    --------
 87    ```
 88    >>> ### Manual definition:
 89    >>> pipes = {
 90    ...     <connector_keys>: {
 91    ...         <metric_key>: {
 92    ...             <location_key>: Pipe(
 93    ...                 <connector_keys>,
 94    ...                 <metric_key>,
 95    ...                 <location_key>,
 96    ...             ),
 97    ...         },
 98    ...     },
 99    ... },
100    >>> ### Accessing a single pipe:
101    >>> pipes['sql:main']['weather'][None]
102    >>> ### Return a list instead:
103    >>> get_pipes(as_list=True)
104    [sql_main_weather]
105    >>> 
106    ```
107    """
108
109    from meerschaum.config import get_config
110    from meerschaum.utils.warnings import error
111    from meerschaum.utils.misc import filter_keywords
112
113    if connector_keys is None:
114        connector_keys = []
115    if metric_keys is None:
116        metric_keys = []
117    if location_keys is None:
118        location_keys = []
119    if params is None:
120        params = {}
121    if tags is None:
122        tags = []
123
124    if isinstance(connector_keys, str):
125        connector_keys = [connector_keys]
126    if isinstance(metric_keys, str):
127        metric_keys = [metric_keys]
128    if isinstance(location_keys, str):
129        location_keys = [location_keys]
130
131    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
132    if mrsm_instance is None:
133        mrsm_instance = instance
134    if mrsm_instance is None:
135        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
136    if isinstance(mrsm_instance, str):
137        from meerschaum.connectors.parse import parse_instance_keys
138        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
139    else:
140        from meerschaum.connectors import instance_types
141        valid_connector = False
142        if hasattr(mrsm_instance, 'type'):
143            if mrsm_instance.type in instance_types:
144                valid_connector = True
145        if not valid_connector:
146            error(f"Invalid instance connector: {mrsm_instance}")
147        connector = mrsm_instance
148    if debug:
149        from meerschaum.utils.debug import dprint
150        dprint(f"Using instance connector: {connector}")
151    if not connector:
152        error(f"Could not create connector from keys: '{mrsm_instance}'")
153
154    ### Get a list of tuples for the keys needed to build pipes.
155    result = fetch_pipes_keys(
156        method,
157        connector,
158        connector_keys = connector_keys,
159        metric_keys = metric_keys,
160        location_keys = location_keys,
161        tags = tags,
162        params = params,
163        debug = debug
164    )
165    if result is None:
166        error(f"Unable to build pipes!")
167
168    ### Populate the `pipes` dictionary with Pipes based on the keys
169    ### obtained from the chosen `method`.
170    from meerschaum import Pipe
171    pipes = {}
172    for ck, mk, lk in result:
173        if ck not in pipes:
174            pipes[ck] = {}
175
176        if mk not in pipes[ck]:
177            pipes[ck][mk] = {}
178
179        pipes[ck][mk][lk] = Pipe(
180            ck, mk, lk,
181            mrsm_instance = connector,
182            debug = debug,
183            **filter_keywords(Pipe, **kw)
184        )
185
186    if not as_list:
187        return pipes
188    from meerschaum.utils.misc import flatten_pipes_dict
189    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Required connector_keys. NOTE: Method 'all' is not implemented!
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects
  • in the connector, metric, location hierarchy.
  • If as_list is True, return a list of meerschaum.Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
def fetch_pipes_keys( method: str, connector: meerschaum.Connector, **kw: Any) -> List[Tuple[str, str, str]]:
def fetch_pipes_keys(
    method: str,
    connector: 'meerschaum.connectors.Connector',
    **kw: Any
) -> List[Tuple[str, str, str]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys: `'registered'` or `'explicit'`.
        Any other value is reported through `meerschaum.utils.warnings.error`
        with `NotImplementedError`.

    connector: meerschaum.connectors.Connector
        The connector to use to fetch the keys.
        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
        or `meerschaum.connectors.api.APIConnector.APIConnector`.
        Only consulted by the `'registered'` method
        (via `connector.fetch_pipes_keys()`).

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by.
        Forwarded to the connector by `'registered'`; unused by `'explicit'`.

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by.
        Forwarded to the connector by `'registered'`; unused by `'explicit'`.

    debug: bool
        Verbosity toggle.

    Returns
    -------
    A list of tuples of strings (or `None` for `location_key`)
    in the form `(connector_keys, metric_key, location_key)`.

    Examples
    --------
    >>> fetch_pipes_keys(
    ...     'explicit', connector,
    ...     connector_keys=['sql:main'], metric_keys=['weather'],
    ... )
    [('sql:main', 'weather', None)]
    """

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Get keys from the pipes table or the API directly.
        Builds query or URL based on provided keys and parameters.

        Only works for SQL and API Connectors.
        """
        ### Omitted filters are passed to the connector as empty containers.
        return connector.fetch_pipes_keys(
            connector_keys = [] if connector_keys is None else connector_keys,
            metric_keys = [] if metric_keys is None else metric_keys,
            location_keys = [] if location_keys is None else location_keys,
            tags = [] if tags is None else tags,
            params = {} if params is None else params,
            debug = debug
        )

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Explicitly build Pipes based on provided keys.
        Reports an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.

        `params`, `tags`, and `debug` are accepted for signature parity
        with `_registered()` but are not used.
        """
        if connector_keys is None:
            connector_keys = []
        if metric_keys is None:
            metric_keys = []
        if location_keys is None:
            location_keys = []

        ### Anything that is not already a list is treated as a single key.
        if not isinstance(connector_keys, list):
            connector_keys = [connector_keys]
        if not isinstance(metric_keys, list):
            metric_keys = [metric_keys]
        if not isinstance(location_keys, list):
            location_keys = [location_keys]

        ### Rebind instead of appending so the caller's list is never mutated.
        if not location_keys:
            location_keys = [None]

        missing_keys = []
        if not connector_keys:
            missing_keys.append('connector_keys')
        if not metric_keys:
            missing_keys.append('metric_keys')
        if missing_keys:
            ### Imported lazily: only this failure path needs it.
            from meerschaum.utils.warnings import error
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        ### Every (connector, metric, location) combination, connector-major.
        return [
            (ck, mk, lk)
            for ck in connector_keys
            for mk in metric_keys
            for lk in location_keys
        ]

    _method_functions = {
        'registered' : _registered,
        'explicit'   : _explicit,
    }
    if method not in _method_functions:
        ### Imported lazily: only this failure path needs it.
        from meerschaum.utils.warnings import error
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)

Fetch keys for pipes according to a method.

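Parameters
  • method (str): The method by which to fetch the keys. See get_pipes() above.
  • connector (meerschaum.connectors.Connector): The connector to use to fetch the keys. Must be of type meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • connector_keys (Optional[List[str]], default None): The list of connector_keys to filter by.
  • metric_keys (Optional[List[str]], default None): The list of metric_keys to filter by.
  • location_keys (Optional[List[str]], default None): The list of location_keys to filter by.
  • params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
  • debug (bool): Verbosity toggle.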
Returns
  • A list of tuples of strings (or None for location_key)
  • in the form (connector_keys, metric_key, location_key).
Examples
>>> fetch_pipes_keys('explicit', connector, connector_keys=['sql:main'], metric_keys=['weather'])
[('sql:main', 'weather', None)]