meerschaum.utils
The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for Meerschaum.

The contents range from primary utilities (such as `get_pipes`)
to miscellaneous helper functions.
"""

__all__ = (
    # Names with no import visible in this file
    # (presumably submodules of this package -- confirm against the package layout).
    "daemon",
    "dataframe",
    "debug",
    "dtypes",
    "formatting",
    "interactive",
    "misc",
    "networking",
    "packages",
    "pool",
    "process",
    "prompt",
    "schedule",
    "sql",
    "threading",
    "typing",
    "venv",
    "warnings",
    "yaml",
    # Functions re-exported from `meerschaum.utils._get_pipes` (see import below).
    "get_pipes",
    "fetch_pipes_keys",
)
from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def
get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    as_tags_dict: bool = False,
    method: str = 'registered',
    workers: Optional[int] = None,
    debug: bool = False,
    _cache_parameters: bool = True,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, List[mrsm.Pipe]]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Passed through to `fetch_pipes_keys()`.
        Per the original documentation, params are parsed into a SQL WHERE clause,
        e.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys (string) or a connector object for the Meerschaum instance
        of the pipes. A non-string value must have a `type` attribute contained in
        `meerschaum.connectors.instance_types`.
        If both `mrsm_instance` and `instance` are `None`, the configured
        `meerschaum:instance` value is used.

    instance: Union[str, InstanceConnector, None], default None
        Alias for `mrsm_instance`; only consulted when `mrsm_instance` is `None`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    as_tags_dict: bool, default False
        If `True` (and `as_list` is `False`), return a dictionary mapping each tag
        to the list of pipes carrying it.
        Pipes with multiple tags will be repeated.

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the
        connector's pipes table (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys,
        and location_keys instead of consulting the pipes table.
        Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations.
        Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    workers: Optional[int], default None
        Forwarded to `fetch_pipes_keys()`. If `as_tags_dict` is `True`, also sets the
        number of workers for the pool used to gather tags; a single worker is used
        when the connector's `IS_THREAD_SAFE` is falsy.

    debug: bool, default False
        Verbosity toggle. Forwarded to the connector parsing, `fetch_pipes_keys()`,
        and every constructed `Pipe`.

    _cache_parameters: bool, default True
        Accepted for interface compatibility; not referenced in this function's body.

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor
        (filtered with `filter_keywords` to those the constructor accepts).

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
    If `as_tags_dict` is `True`, return a dictionary mapping tags to lists of pipes.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [Pipe('sql:main', 'weather')]
    >>> get_pipes(as_tags_dict=True)
    {'gvl': [Pipe('sql:main', 'weather')]}
    ```
    """

    import json
    from collections import defaultdict
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords
    from meerschaum.utils.pool import get_pool

    ### Replace omitted filters with fresh empty containers.
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    ### A single key may be given as a bare string.
    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
    else:
        from meerschaum.connectors import instance_types
        valid_connector = (
            hasattr(mrsm_instance, 'type')
            and mrsm_instance.type in instance_types
        )
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        workers=workers,
        debug=debug,
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for keys_tuple in result:
        ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2]

        ### An optional fourth element carries either the pipe's tags (list)
        ### or its parameters (dict, or a JSON-encoded string).
        pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None
        pipe_parameters = (
            pipe_tags_or_parameters
            if isinstance(pipe_tags_or_parameters, (dict, str))
            else None
        )
        if isinstance(pipe_parameters, str):
            pipe_parameters = json.loads(pipe_parameters)

        ### Derive tags from the *parsed* parameters so JSON-string parameters
        ### are treated the same as dictionary parameters.
        if isinstance(pipe_tags_or_parameters, list):
            pipe_tags = pipe_tags_or_parameters
        elif isinstance(pipe_parameters, dict):
            pipe_tags = pipe_parameters.get('tags', [])
        else:
            pipe_tags = None

        pipe = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            parameters=pipe_parameters,
            tags=pipe_tags,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )
        ### Stash the already-known tags on the instance; when present,
        ### `gather_pipe_tags()` below uses them instead of reading `pipe.tags`.
        pipe.__dict__['_tags'] = pipe_tags
        pipes.setdefault(ck, {}).setdefault(mk, {})[lk] = pipe

    if not as_list and not as_tags_dict:
        return pipes

    from meerschaum.utils.misc import flatten_pipes_dict
    pipes_list = flatten_pipes_dict(pipes)
    if as_list:
        return pipes_list

    ### Only fan out across workers if the connector declares itself thread-safe.
    pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1))

    def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]:
        """Return the pipe with its tags, preferring the stashed `_tags` value."""
        _tags = pipe.__dict__.get('_tags', None)
        gathered_tags = _tags if _tags is not None else pipe.tags
        return pipe, (gathered_tags or [])

    tags_pipes = defaultdict(list)
    pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list))
    for pipe, gathered_tags in pipes_tags.items():
        for tag in gathered_tags:
            tags_pipes[tag].append(pipe)

    return dict(tags_pipes)
Return a dictionary or list of `meerschaum.Pipe` objects.
Parameters
- connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is `'*'`, fetch all possible keys. If a string begins with `'_'`, select keys that do NOT match the string.
- metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See `connector_keys` for formatting.
- location_keys (Union[str, List[str], None], default None): String or list of location keys. See `connector_keys` for formatting.
- tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.
- mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or `meerschaum.connectors.api.APIConnector.APIConnector`.
- as_list (bool, default False): If `True`, return pipes in a list instead of a hierarchical dictionary. `False`: `{connector_keys: {metric_key: {location_key: Pipe}}}`; `True`: `[Pipe]`.
- as_tags_dict (bool, default False): If `True`, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated.
- method (str, default 'registered'): Available options: `['registered', 'explicit', 'all']`. If `'registered'` (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`. NOTE: Method `'all'` is not implemented!
- workers (Optional[int], default None): If provided (and `as_tags_dict` is `True`), set the number of workers for the pool to fetch tags. Only takes effect if the instance connector supports multi-threading.
- **kw (Any): Keyword arguments to pass to the `meerschaum.Pipe` constructor.
Returns
- A dictionary of dictionaries and `meerschaum.Pipe` objects in the connector, metric, location hierarchy.
- If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
- If `as_tags_dict` is `True`, return a dictionary mapping tags to pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': [Pipe('sql:main', 'weather')]}
def
fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> List[Tuple[str, str, str]]:
def fetch_pipes_keys(
    method: str,
    connector: 'mrsm.connectors.InstanceConnector',
    **kw: Any
) -> List[Tuple[str, str, str]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys: `'registered'` or `'explicit'`.
        Any other value is reported through `error()` as a `NotImplementedError`.

    connector: meerschaum.connectors.InstanceConnector
        The connector to use to fetch the keys.
        Only used by the `'registered'` method, which calls
        `connector.fetch_pipes_keys()`.

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by (forwarded by the `'registered'` method only).

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by
        (forwarded by the `'registered'` method only).

    debug: bool
        Verbosity toggle.

    Returns
    -------
    For `'explicit'`: a list of `(connector_keys, metric_key, location_key)` tuples,
    one per combination of the provided keys (`location_key` is `None` when no
    location keys are given).
    For `'registered'`: whatever `connector.fetch_pipes_keys()` returns, unchanged.

    Examples
    --------
    >>> fetch_pipes_keys(
    ...     'explicit', conn,
    ...     connector_keys=['sql:main'], metric_keys=['weather'],
    ... )
    [('sql:main', 'weather', None)]
    """

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Get keys by delegating to `connector.fetch_pipes_keys()`.
        Omitted filters are replaced with empty containers; extra keywords are ignored.
        """
        return connector.fetch_pipes_keys(
            connector_keys=([] if connector_keys is None else connector_keys),
            metric_keys=([] if metric_keys is None else metric_keys),
            location_keys=([] if location_keys is None else location_keys),
            tags=([] if tags is None else tags),
            params=({} if params is None else params),
            debug=debug,
        )

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Explicitly build keys from the provided values.
        Reports an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.
        `params`, `tags`, and `debug` are accepted but not used here.
        """
        if connector_keys is None:
            connector_keys = []
        if metric_keys is None:
            metric_keys = []
        if location_keys is None:
            location_keys = []

        ### Wrap bare (non-list) values so a single key may be passed directly.
        if not isinstance(connector_keys, list):
            connector_keys = [connector_keys]
        if not isinstance(metric_keys, list):
            metric_keys = [metric_keys]
        if not isinstance(location_keys, list):
            location_keys = [location_keys]

        missing_keys = [
            name
            for name, keys in (
                ('connector_keys', connector_keys),
                ('metric_keys', metric_keys),
            )
            if len(keys) == 0
        ]
        if len(location_keys) == 0:
            ### Rebind rather than append so the caller's list is never mutated.
            location_keys = [None]

        if missing_keys:
            ### Deferred import: only needed when there is an error to report.
            from meerschaum.utils.warnings import error
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        return [
            (ck, mk, lk)
            for ck in connector_keys
            for mk in metric_keys
            for lk in location_keys
        ]

    _method_functions = {
        'registered': _registered,
        'explicit': _explicit,
    }
    if method not in _method_functions:
        ### Deferred import: only needed when there is an error to report.
        from meerschaum.utils.warnings import error
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)
Fetch keys for pipes according to a method.
Parameters
- method (str): The method by which to fetch the keys. See `get_pipes()` above.
- connector (meerschaum.connectors.InstanceConnector): The connector to use to fetch the keys. Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector` or `meerschaum.connectors.api.APIConnector.APIConnector`.
- connector_keys (Optional[List[str]], default None): The list of `connector_keys` to filter by.
- metric_keys (Optional[List[str]], default None): The list of `metric_keys` to filter by.
- location_keys (Optional[List[str]], default None): The list of `location_keys` to filter by.
- params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
- debug (bool): Verbosity toggle.
Returns
- A list of tuples of strings (or `None` for `location_key`) in the form `(connector_keys, metric_key, location_key)`.
Examples
>>> fetch_pipes_keys(metric_keys=['weather'])
[('sql:main', 'weather', None)]