meerschaum.utils
The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6The utils module contains utility functions. 7These include tools from primary utilities (get_pipes) 8to miscellaneous helper functions. 9""" 10 11__all__ = ( 12 'daemon', 13 'dataframe', 14 'debug', 15 'dtypes', 16 'formatting', 17 'interactive', 18 'misc', 19 'networking', 20 'packages', 21 'pipes', 22 'pool', 23 'process', 24 'prompt', 25 'schedule', 26 'sql', 27 'threading', 28 'typing', 29 'venv', 30 'warnings', 31 'yaml', 32 "get_pipes", 33 "fetch_pipes_keys", 34) 35from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def
get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, datetime_dtypes: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    datetime_dtypes: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    as_tags_dict: bool = False,
    method: str = 'registered',
    workers: Optional[int] = None,
    debug: bool = False,
    _cache_parameters: bool = True,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, List[mrsm.Pipe]]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
         If provided, only include pipes with these tags.

    datetime_dtypes: Optional[List[str]], default None
        If provided, only include pipes with the corresponding `datetime` axis dtypes.
        Accepted values are `datetime`, `int`, `None` (or `null`, etc.).
        Values are matched case-insensitively.
        May be negated by `_`.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    instance: Union[str, InstanceConnector, None], default None
        Alias for `mrsm_instance`; only consulted when `mrsm_instance` is `None`.
        If both are `None`, the configured default instance is used.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    as_tags_dict: bool, default False
        If `True`, return a dictionary mapping tags to lists of pipes.
        Pipes with multiple tags will be repeated.
        Ignored if `as_list` is `True`.

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    workers: Optional[int], default None
        If provided (and `as_tags_dict` is `True`), set the number of workers for the pool
        to fetch tags.
        Only takes effect if the instance connector supports multi-threading

    debug: bool, default False
        Verbosity toggle. Forwarded to the connector, `fetch_pipes_keys()`, and each `Pipe`.

    _cache_parameters: bool, default True
        Not read by this function's body; accepted for interface compatibility.

    **kw: Any:
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
        A dictionary of dictionaries and `meerschaum.Pipe` objects
        in the connector, metric, location hierarchy.
        If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
        If `as_tags_dict` is `True`, return a dictionary mapping tags to lists of pipes.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [Pipe('sql:main', 'weather')]
    >>> get_pipes(as_tags_dict=True)
    {'gvl': [Pipe('sql:main', 'weather')]}
    ```
    """
    import json
    from collections import defaultdict
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords, separate_negation_values
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.pipes import replace_pipes_syntax
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import value_is_null, get_current_timestamp
    from meerschaum import Pipe

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    if datetime_dtypes:
        if isinstance(datetime_dtypes, str):
            datetime_dtypes = [datetime_dtypes]
        for _dt in datetime_dtypes:
            # NOTE(review): `lstrip` strips a character set, not a prefix; this is
            # equivalent only for a single-character negation prefix — confirm.
            _clean = str(_dt).lstrip(negation_prefix).lower()
            if _clean not in ('datetime', 'int') and not value_is_null(_clean):
                error(f"Invalid datetime dtype '{_dt}'.")

    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    params = {} if params is None else params
    tags = [] if tags is None else tags

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
    else:
        from meerschaum.connectors import instance_types
        valid_connector = (
            hasattr(mrsm_instance, 'type')
            and mrsm_instance.type in instance_types
        )
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        workers=workers,
        debug=debug,
    )
    if result is None:
        error("Unable to build pipes!")
    ### A dictionary result is keyed by pipe ID; a list result carries no IDs.
    result_items: List[Tuple] = (
        list(result.items())
        if isinstance(result, dict)
        else [(None, keys_tuple) for keys_tuple in result]
    )

    def _dtype_matches(clean_d, dt_col, dt_typ) -> bool:
        """Return whether a requested dtype matches a pipe's datetime axis."""
        ### Lower-case to agree with the (case-insensitive) validation above.
        clean_d = str(clean_d).lower()
        if not dt_col:
            ### No datetime axis: only a null-like request matches.
            return value_is_null(clean_d)
        dt_typ_is_int = 'int' in str(dt_typ).lower()
        return (
            (clean_d == 'int' and dt_typ_is_int)
            or
            (clean_d == 'datetime' and not dt_typ_is_int)
        )

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    in_dtypes, ex_dtypes = separate_negation_values(datetime_dtypes or [])
    pipes: PipesDict = {}
    for pipe_id, keys_tuple in result_items:
        ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2]
        ### The optional fourth element is either a list of tags
        ### or the parameters (as a dictionary or a JSON string).
        pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None
        if isinstance(pipe_tags_or_parameters, str):
            pipe_parameters = json.loads(pipe_tags_or_parameters)
        elif isinstance(pipe_tags_or_parameters, dict):
            pipe_parameters = pipe_tags_or_parameters
        else:
            pipe_parameters = None

        ### Read tags from the *parsed* parameters so that JSON-string payloads
        ### yield tags just like dictionary payloads do.
        if isinstance(pipe_tags_or_parameters, list):
            pipe_tags = pipe_tags_or_parameters
        elif isinstance(pipe_parameters, dict):
            pipe_tags = pipe_parameters.get('tags', [])
        else:
            pipe_tags = None

        pipe = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            parameters=pipe_parameters,
            tags=pipe_tags,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )
        ### Stash the fetched tags so `as_tags_dict` can reuse them below.
        pipe.__dict__['_tags'] = pipe_tags
        if pipe_id is not None:
            pipe._cache_value('_id', pipe_id, memory_only=True, debug=debug)
        if pipe_parameters is not None:
            ### Seed the in-memory attributes cache with what was just fetched.
            now = get_current_timestamp('ms', as_int=True) / 1000
            full_attributes = {
                'connector_keys': ck,
                'metric_key': mk,
                'location_key': lk,
                'parameters': pipe_parameters,
            }
            if pipe_id is not None:
                full_attributes['pipe_id'] = pipe_id
            pipe._cache_value('attributes', full_attributes, memory_only=True, debug=debug)
            pipe._cache_value('_attributes_sync_time', now, memory_only=True, debug=debug)

        if datetime_dtypes:
            if pipe_parameters is None:
                pipe_parameters = pipe.get_parameters(debug=debug)
            columns_val = (pipe_parameters or {}).get('columns', {}) or {}
            if isinstance(columns_val, str) and 'Pipe(' in columns_val:
                columns_val = replace_pipes_syntax(columns_val)

            ### A non-dictionary `columns` value cannot name a datetime axis;
            ### treat it as absent rather than raising `AttributeError`.
            dt_col = (
                columns_val.get('datetime', None)
                if isinstance(columns_val, dict)
                else None
            )
            dt_typ = (
                ((pipe_parameters or {}).get('dtypes', None) or {}).get(dt_col, None)
                if dt_col
                else None
            )

            in_match = not in_dtypes or any(
                _dtype_matches(d, dt_col, dt_typ) for d in in_dtypes
            )
            ex_match = bool(ex_dtypes) and any(
                _dtype_matches(d, dt_col, dt_typ) for d in ex_dtypes
            )
            if not in_match or ex_match:
                continue

        pipes.setdefault(ck, {}).setdefault(mk, {})[lk] = pipe

    if not as_list and not as_tags_dict:
        return pipes

    from meerschaum.utils.pipes import flatten_pipes_dict
    pipes_list = flatten_pipes_dict(pipes)
    if as_list:
        return pipes_list

    pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1))

    def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]:
        """Return the pipe with its tags, preferring the tags fetched above."""
        _tags = pipe.__dict__.get('_tags', None)
        gathered_tags = _tags if _tags is not None else pipe.tags
        return pipe, (gathered_tags or [])

    tags_pipes = defaultdict(list)
    pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list))
    for pipe, gathered_tags in pipes_tags.items():
        for tag in (gathered_tags or []):
            tags_pipes[tag].append(pipe)

    return dict(tags_pipes)
Return a dictionary or list of meerschaum.Pipe objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is `'*'`, fetch all possible keys. If a string begins with `'_'`, select keys that do NOT match the string.
- metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See `connector_keys` for formatting.
- location_keys (Union[str, List[str], None], default None):
String or list of location keys. See `connector_keys` for formatting.
- tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- datetime_dtypes (Optional[List[str]], default None):
If provided, only include pipes with the corresponding `datetime` axis dtypes. Accepted values are `datetime`, `int`, `None` (or `null`, etc.). May be negated by `_`.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.
- mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or `meerschaum.connectors.api.APIConnector.APIConnector`.
- as_list (bool, default False):
If `True`, return pipes in a list instead of a hierarchical dictionary. `False`: `{connector_keys: {metric_key: {location_key: Pipe}}}`; `True`: `[Pipe]`.
- as_tags_dict (bool, default False):
If `True`, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated.
- method (str, default 'registered'):
Available options: `['registered', 'explicit', 'all']`. If `'registered'` (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`. NOTE: Method `'all'` is not implemented!
- workers (Optional[int], default None):
If provided (and `as_tags_dict` is `True`), set the number of workers for the pool to fetch tags. Only takes effect if the instance connector supports multi-threading.
- **kw (Any):
Keyword arguments to pass to the `meerschaum.Pipe` constructor.
Returns
- A dictionary of dictionaries and `meerschaum.Pipe` objects in the connector, metric, location hierarchy.
- If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
- If `as_tags_dict` is `True`, return a dictionary mapping tags to lists of pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': [Pipe('sql:main', 'weather')]}
def
fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
def fetch_pipes_keys(
    method: str,
    connector: 'mrsm.connectors.InstanceConnector',
    **kw: Any
) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
    """
    Fetch keys for pipes according to a method.

    Parameters
    ----------
    method: str
        The method by which to fetch the keys: `'registered'` or `'explicit'`.
        See `get_pipes()` above.

    connector: meerschaum.connectors.InstanceConnector
        The connector to use to fetch the keys (only consulted by `'registered'`).
        Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector`
        or `meerschaum.connectors.api.APIConnector.APIConnector`.

    connector_keys: Optional[List[str]], default None
        The list of `connector_keys` to filter by.

    metric_keys: Optional[List[str]], default None
        The list of `metric_keys` to filter by.

    location_keys: Optional[List[str]], default None
        The list of `location_keys` to filter by.

    tags: Optional[List[str]], default None
        The list of tags to filter by (forwarded by `'registered'`,
        ignored by `'explicit'`).

    params: Optional[Dict[str, Any]], default None
        A dictionary of parameters to filter by.

    debug: bool
        Verbosity toggle.

    Returns
    -------
    A list or a dictionary (with pipe IDs as indices) of tuples of strings (or `None` for `location_key`)
    in the form `(connector_keys, metric_key, location_key)`.
    Optionally the parameters or tags may be returned alongside the keys.
    Note the return value depends on the instance connector implementation.

    Raises
    ------
    Calls `meerschaum.utils.warnings.error()` (with `NotImplementedError`)
    if `method` is not supported, and (for `'explicit'`) if `connector_keys`
    or `metric_keys` is empty.

    Examples
    --------
    >>> fetch_pipes_keys('registered', connector, metric_keys=['weather'])
    {1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}
    >>> fetch_pipes_keys('explicit', connector, connector_keys=['sql:main'], metric_keys=['weather'])
    [('sql:main', 'weather', None)]
    """

    def _registered(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Get keys from the pipes table or the API directly.
        Builds query or URL based on provided keys and parameters.

        Only works for SQL and API Connectors.
        """
        ### Any extra keyword arguments (e.g. `workers`) are intentionally dropped.
        return connector.fetch_pipes_keys(
            connector_keys=([] if connector_keys is None else connector_keys),
            metric_keys=([] if metric_keys is None else metric_keys),
            location_keys=([] if location_keys is None else location_keys),
            tags=([] if tags is None else tags),
            params=({} if params is None else params),
            debug=debug,
        )

    def _explicit(
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> List[Tuple[str, str, str]]:
        """
        Explicitly build Pipes based on provided keys.
        Raises an error if `connector_keys` or `metric_keys` is empty,
        and assumes `location_keys = [None]` if empty.
        """

        def _as_list(keys: Any) -> List[Any]:
            """Coerce `None` to an empty list and wrap any non-list value in a list."""
            if keys is None:
                return []
            return keys if isinstance(keys, list) else [keys]

        connector_keys = _as_list(connector_keys)
        metric_keys = _as_list(metric_keys)
        location_keys = _as_list(location_keys)

        missing_keys = [
            name
            for name, keys in (
                ('connector_keys', connector_keys),
                ('metric_keys', metric_keys),
            )
            if not keys
        ]
        if not location_keys:
            ### Rebind rather than append so the caller's list is never mutated.
            location_keys = [None]
        if missing_keys:
            ### Imported lazily: only needed on the failure path.
            from meerschaum.utils.warnings import error
            error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'"
            error_message += "\nSee --help for information for passing parameters."
            error(error_message)

        return [
            (ck, mk, lk)
            for ck in connector_keys
            for mk in metric_keys
            for lk in location_keys
        ]

    _method_functions = {
        'registered': _registered,
        'explicit': _explicit,
    }
    if method not in _method_functions:
        ### Imported lazily: only needed on the failure path.
        from meerschaum.utils.warnings import error
        error(f"Method '{method}' is not supported!", NotImplementedError)
    return _method_functions[method](**kw)
Fetch keys for pipes according to a method.
Parameters
- method (str):
The method by which to fetch the keys. See `get_pipes()` above.
- connector (meerschaum.connectors.InstanceConnector):
The connector to use to fetch the keys.
Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector` or `meerschaum.connectors.api.APIConnector.APIConnector`.
- connector_keys (Optional[List[str]], default None):
The list of `connector_keys` to filter by.
- metric_keys (Optional[List[str]], default None):
The list of `metric_keys` to filter by.
- location_keys (Optional[List[str]], default None):
The list of `location_keys` to filter by.
- params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
- debug (bool): Verbosity toggle.
Returns
- A list or a dictionary (with pipe IDs as indices) of tuples of strings (or `None` for `location_key`) in the form `(connector_keys, metric_key, location_key)`.
- Optionally the parameters or tags may be returned alongside the keys.
- Note the return value depends on the instance connector implementation.
Examples
>>> fetch_pipes_keys('registered', connector, metric_keys=['weather'])
{1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}