meerschaum.utils
The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6The utils module contains utility functions. 7These include tools from primary utilities (get_pipes) 8to miscellaneous helper functions. 9""" 10 11__all__ = ( 12 'daemon', 13 'dataframe', 14 'debug', 15 'dtypes', 16 'formatting', 17 'interactive', 18 'misc', 19 'networking', 20 'packages', 21 'pool', 22 'process', 23 'prompt', 24 'schedule', 25 'sql', 26 'threading', 27 'typing', 28 'venv', 29 'warnings', 30 'yaml', 31 "get_pipes", 32 "fetch_pipes_keys", 33) 34from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def
get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], List[meerschaum.Pipe]]:
19def get_pipes( 20 connector_keys: Union[str, List[str], None] = None, 21 metric_keys: Union[str, List[str], None] = None, 22 location_keys: Union[str, List[str], None] = None, 23 tags: Optional[List[str]] = None, 24 params: Optional[Dict[str, Any]] = None, 25 mrsm_instance: Union[str, InstanceConnector, None] = None, 26 instance: Union[str, InstanceConnector, None] = None, 27 as_list: bool = False, 28 method: str = 'registered', 29 debug: bool = False, 30 **kw: Any 31) -> Union[PipesDict, List[mrsm.Pipe]]: 32 """ 33 Return a dictionary or list of `meerschaum.Pipe` objects. 34 35 Parameters 36 ---------- 37 connector_keys: Union[str, List[str], None], default None 38 String or list of connector keys. 39 If omitted or is `'*'`, fetch all possible keys. 40 If a string begins with `'_'`, select keys that do NOT match the string. 41 42 metric_keys: Union[str, List[str], None], default None 43 String or list of metric keys. See `connector_keys` for formatting. 44 45 location_keys: Union[str, List[str], None], default None 46 String or list of location keys. See `connector_keys` for formatting. 47 48 tags: Optional[List[str]], default None 49 If provided, only include pipes with these tags. 50 51 params: Optional[Dict[str, Any]], default None 52 Dictionary of additional parameters to search by. 53 Params are parsed into a SQL WHERE clause. 54 E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'` 55 56 mrsm_instance: Union[str, InstanceConnector, None], default None 57 Connector keys for the Meerschaum instance of the pipes. 58 Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or 59 `meerschaum.connectors.api.APIConnector.APIConnector`. 60 61 as_list: bool, default False 62 If `True`, return pipes in a list instead of a hierarchical dictionary. 63 `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}` 64 `True` : `[Pipe]` 65 66 method: str, default 'registered' 67 Available options: `['registered', 'explicit', 'all']` 68 If `'registered'` (default), create pipes based on registered keys in the connector's pipes table 69 (API or SQL connector, depends on mrsm_instance). 70 If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys 71 instead of consulting the pipes table. Useful for creating non-existent pipes. 72 If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`. 73 **NOTE:** Method `'all'` is not implemented! 74 75 **kw: Any: 76 Keyword arguments to pass to the `meerschaum.Pipe` constructor. 77 78 79 Returns 80 ------- 81 A dictionary of dictionaries and `meerschaum.Pipe` objects 82 in the connector, metric, location hierarchy. 83 If `as_list` is `True`, return a list of `meerschaum.Pipe` objects. 84 85 Examples 86 -------- 87 ``` 88 >>> ### Manual definition: 89 >>> pipes = { 90 ... <connector_keys>: { 91 ... <metric_key>: { 92 ... <location_key>: Pipe( 93 ... <connector_keys>, 94 ... <metric_key>, 95 ... <location_key>, 96 ... ), 97 ... }, 98 ... }, 99 ... }, 100 >>> ### Accessing a single pipe: 101 >>> pipes['sql:main']['weather'][None] 102 >>> ### Return a list instead: 103 >>> get_pipes(as_list=True) 104 [sql_main_weather] 105 >>> 106 ``` 107 """ 108 109 from meerschaum.config import get_config 110 from meerschaum.utils.warnings import error 111 from meerschaum.utils.misc import filter_keywords 112 113 if connector_keys is None: 114 connector_keys = [] 115 if metric_keys is None: 116 metric_keys = [] 117 if location_keys is None: 118 location_keys = [] 119 if params is None: 120 params = {} 121 if tags is None: 122 tags = [] 123 124 if isinstance(connector_keys, str): 125 connector_keys = [connector_keys] 126 if isinstance(metric_keys, str): 127 metric_keys = [metric_keys] 128 if isinstance(location_keys, str): 129 location_keys = [location_keys] 130 131 ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`). 132 if mrsm_instance is None: 133 mrsm_instance = instance 134 if mrsm_instance is None: 135 mrsm_instance = get_config('meerschaum', 'instance', patch=True) 136 if isinstance(mrsm_instance, str): 137 from meerschaum.connectors.parse import parse_instance_keys 138 connector = parse_instance_keys(keys=mrsm_instance, debug=debug) 139 else: 140 from meerschaum.connectors import instance_types 141 valid_connector = False 142 if hasattr(mrsm_instance, 'type'): 143 if mrsm_instance.type in instance_types: 144 valid_connector = True 145 if not valid_connector: 146 error(f"Invalid instance connector: {mrsm_instance}") 147 connector = mrsm_instance 148 if debug: 149 from meerschaum.utils.debug import dprint 150 dprint(f"Using instance connector: {connector}") 151 if not connector: 152 error(f"Could not create connector from keys: '{mrsm_instance}'") 153 154 ### Get a list of tuples for the keys needed to build pipes. 155 result = fetch_pipes_keys( 156 method, 157 connector, 158 connector_keys = connector_keys, 159 metric_keys = metric_keys, 160 location_keys = location_keys, 161 tags = tags, 162 params = params, 163 debug = debug 164 ) 165 if result is None: 166 error(f"Unable to build pipes!") 167 168 ### Populate the `pipes` dictionary with Pipes based on the keys 169 ### obtained from the chosen `method`. 170 from meerschaum import Pipe 171 pipes = {} 172 for ck, mk, lk in result: 173 if ck not in pipes: 174 pipes[ck] = {} 175 176 if mk not in pipes[ck]: 177 pipes[ck][mk] = {} 178 179 pipes[ck][mk][lk] = Pipe( 180 ck, mk, lk, 181 mrsm_instance = connector, 182 debug = debug, 183 **filter_keywords(Pipe, **kw) 184 ) 185 186 if not as_list: 187 return pipes 188 from meerschaum.utils.misc import flatten_pipes_dict 189 return flatten_pipes_dict(pipes)
Return a dictionary or list of meerschaum.Pipe
objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is
'*'
, fetch all possible keys. If a string begins with'_'
, select keys that do NOT match the string. - metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See
connector_keys
for formatting. - location_keys (Union[str, List[str], None], default None):
String or list of location keys. See
connector_keys
for formatting. - tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g.
{'a': 1, 'b': 2}
equates to'WHERE a = 1 AND b = 2'
- mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a
meerschaum.connectors.sql.SQLConnector.SQLConnector
ormeerschaum.connectors.api.APIConnector.APIConnector
. - as_list (bool, default False):
If
True
, return pipes in a list instead of a hierarchical dictionary.False
:{connector_keys: {metric_key: {location_key: Pipe}}}
True
:[Pipe]
- method (str, default 'registered'):
Available options:
['registered', 'explicit', 'all']
If'registered'
(default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If'explicit'
, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If'all'
, create pipes from predefined metrics and locations. Requiredconnector_keys
. NOTE: Method'all'
is not implemented! - **kw (Any:):
Keyword arguments to pass to the
meerschaum.Pipe
constructor.
Returns
- A dictionary of dictionaries and
meerschaum.Pipe
objects - in the connector, metric, location hierarchy.
- If
as_list
isTrue
, return a list ofmeerschaum.Pipe
objects.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
def
fetch_pipes_keys( method: str, connector: meerschaum.Connector, **kw: Any) -> List[Tuple[str, str, str]]:
192def fetch_pipes_keys( 193 method: str, 194 connector: 'meerschaum.connectors.Connector', 195 **kw: Any 196 ) -> List[Tuple[str, str, str]]: 197 """ 198 Fetch keys for pipes according to a method. 199 200 Parameters 201 ---------- 202 method: str 203 The method by which to fetch the keys. See `get_pipes()` above. 204 205 connector: meerschaum.connectors.Connector 206 The connector to use to fetch the keys. 207 Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector` 208 or `meerschaum.connectors.api.APIConnector.APIConnector`. 209 210 connector_keys: Optional[List[str]], default None 211 The list of `connector_keys` to filter by. 212 213 metric_keys: Optional[List[str]], default None 214 The list of `metric_keys` to filter by. 215 216 location_keys: Optional[List[str]], default None 217 The list of `location_keys` to filter by. 218 219 params: Optional[Dict[str, Any]], default None 220 A dictionary of parameters to filter by. 221 222 debug: bool 223 Verbosity toggle. 224 225 Returns 226 ------- 227 A list of tuples of strings (or `None` for `location_key`) 228 in the form `(connector_keys, metric_key, location_key)`. 229 230 Examples 231 -------- 232 >>> fetch_pipes_keys(metric_keys=['weather']) 233 [('sql:main', 'weather', None)] 234 """ 235 from meerschaum.utils.warnings import error 236 237 def _registered( 238 connector_keys: Optional[List[str]] = None, 239 metric_keys: Optional[List[str]] = None, 240 location_keys: Optional[List[str]] = None, 241 tags: Optional[List[str]] = None, 242 params: Optional[Dict[str, Any]] = None, 243 debug: bool = False, 244 **kw 245 ) -> List[Tuple[str, str, str]]: 246 """ 247 Get keys from the pipes table or the API directly. 248 Builds query or URL based on provided keys and parameters. 249 250 Only works for SQL and API Connectors. 251 """ 252 if connector_keys is None: 253 connector_keys = [] 254 if metric_keys is None: 255 metric_keys = [] 256 if location_keys is None: 257 location_keys = [] 258 if params is None: 259 params = {} 260 if tags is None: 261 tags = [] 262 263 return connector.fetch_pipes_keys( 264 connector_keys = connector_keys, 265 metric_keys = metric_keys, 266 location_keys = location_keys, 267 tags = tags, 268 params = params, 269 debug = debug 270 ) 271 272 def _explicit( 273 connector_keys: Optional[List[str]] = None, 274 metric_keys: Optional[List[str]] = None, 275 location_keys: Optional[List[str]] = None, 276 params: Optional[Dict[str, Any]] = None, 277 tags: Optional[List[str]] = None, 278 debug: bool = False, 279 **kw 280 ) -> List[Tuple[str, str, str]]: 281 """ 282 Explicitly build Pipes based on provided keys. 283 Raises an error if `connector_keys` or `metric_keys` is empty, 284 and assumes `location_keys = [None]` if empty. 285 """ 286 287 if connector_keys is None: 288 connector_keys = [] 289 if metric_keys is None: 290 metric_keys = [] 291 if location_keys is None: 292 location_keys = [] 293 if params is None: 294 params = {} 295 296 if not isinstance(connector_keys, list): 297 connector_keys = [connector_keys] 298 if not isinstance(metric_keys, list): 299 metric_keys = [metric_keys] 300 if not isinstance(location_keys, list): 301 location_keys = [location_keys] 302 303 missing_keys = [] 304 if len(connector_keys) == 0: 305 missing_keys.append('connector_keys') 306 if len(metric_keys) == 0: 307 missing_keys.append('metric_keys') 308 if len(location_keys) == 0: 309 location_keys.append(None) 310 if len(missing_keys) > 0: 311 error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'" 312 error_message += "\nSee --help for information for passing parameters." 313 error(error_message) 314 result = [] 315 for ck in connector_keys: 316 for mk in metric_keys: 317 for lk in location_keys: 318 result.append((ck, mk, lk)) 319 return result 320 321 _method_functions = { 322 'registered' : _registered, 323 'explicit' : _explicit, 324 } 325 if method not in _method_functions: 326 error(f"Method '{method}' is not supported!", NotImplementedError) 327 return _method_functions[method](**kw)
Fetch keys for pipes according to a method.
Parameters
- method (str):
The method by which to fetch the keys. See
get_pipes()
above. - connector (meerschaum.connectors.Connector):
The connector to use to fetch the keys.
Must be of type
meerschaum.connectors.sql.SQLConnector.SQLConnector
ormeerschaum.connectors.api.APIConnector.APIConnector
. - connector_keys (Optional[List[str]], default None):
The list of
connector_keys
to filter by. - metric_keys (Optional[List[str]], default None):
The list of
metric_keys
to filter by. - location_keys (Optional[List[str]], default None):
The list of
location_keys
to filter by. - params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
- debug (bool): Verbosity toggle.
Returns
- A list of tuples of strings (or
None
forlocation_key
) - in the form
(connector_keys, metric_key, location_key)
.
Examples
>>> fetch_pipes_keys(metric_keys=['weather'])
[('sql:main', 'weather', None)]