meerschaum.utils
The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6The utils module contains utility functions. 7These include tools from primary utilities (get_pipes) 8to miscellaneous helper functions. 9""" 10 11__all__ = ( 12 'daemon', 13 'dataframe', 14 'debug', 15 'dtypes', 16 'formatting', 17 'interactive', 18 'misc', 19 'networking', 20 'packages', 21 'pool', 22 'process', 23 'prompt', 24 'schedule', 25 'sql', 26 'threading', 27 'typing', 28 'venv', 29 'warnings', 30 'yaml', 31 "get_pipes", 32 "fetch_pipes_keys", 33) 34from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def
get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]], List[meerschaum.core.Pipe.Pipe]]:
19def get_pipes( 20 connector_keys: Union[str, List[str], None] = None, 21 metric_keys: Union[str, List[str], None] = None, 22 location_keys: Union[str, List[str], None] = None, 23 tags: Optional[List[str]] = None, 24 params: Optional[Dict[str, Any]] = None, 25 mrsm_instance: Union[str, InstanceConnector, None] = None, 26 instance: Union[str, InstanceConnector, None] = None, 27 as_list: bool = False, 28 method: str = 'registered', 29 wait: bool = False, 30 debug: bool = False, 31 **kw: Any 32 ) -> Union[PipesDict, List[mrsm.Pipe]]: 33 """ 34 Return a dictionary or list of `meerschaum.Pipe` objects. 35 36 Parameters 37 ---------- 38 connector_keys: Union[str, List[str], None], default None 39 String or list of connector keys. 40 If omitted or is `'*'`, fetch all possible keys. 41 If a string begins with `'_'`, select keys that do NOT match the string. 42 43 metric_keys: Union[str, List[str], None], default None 44 String or list of metric keys. See `connector_keys` for formatting. 45 46 location_keys: Union[str, List[str], None], default None 47 String or list of location keys. See `connector_keys` for formatting. 48 49 tags: Optional[List[str]], default None 50 If provided, only include pipes with these tags. 51 52 params: Optional[Dict[str, Any]], default None 53 Dictionary of additional parameters to search by. 54 Params are parsed into a SQL WHERE clause. 55 E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'` 56 57 mrsm_instance: Union[str, InstanceConnector, None], default None 58 Connector keys for the Meerschaum instance of the pipes. 59 Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or 60 `meerschaum.connectors.api.APIConnector.APIConnector`. 61 62 as_list: bool, default False 63 If `True`, return pipes in a list instead of a hierarchical dictionary. 64 `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}` 65 `True` : `[Pipe]` 66 67 method: str, default 'registered' 68 Available options: `['registered', 'explicit', 'all']` 69 If `'registered'` (default), create pipes based on registered keys in the connector's pipes table 70 (API or SQL connector, depends on mrsm_instance). 71 If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys 72 instead of consulting the pipes table. Useful for creating non-existent pipes. 73 If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`. 74 **NOTE:** Method `'all'` is not implemented! 75 76 wait: bool, default False 77 Wait for a connection before getting Pipes. Should only be true for cases where the 78 database might not be running (like the API). 79 80 **kw: Any: 81 Keyword arguments to pass to the `meerschaum.Pipe` constructor. 82 83 84 Returns 85 ------- 86 A dictionary of dictionaries and `meerschaum.Pipe` objects 87 in the connector, metric, location hierarchy. 88 If `as_list` is `True`, return a list of `meerschaum.Pipe` objects. 89 90 Examples 91 -------- 92 ``` 93 >>> ### Manual definition: 94 >>> pipes = { 95 ... <connector_keys>: { 96 ... <metric_key>: { 97 ... <location_key>: Pipe( 98 ... <connector_keys>, 99 ... <metric_key>, 100 ... <location_key>, 101 ... ), 102 ... }, 103 ... }, 104 ... }, 105 >>> ### Accessing a single pipe: 106 >>> pipes['sql:main']['weather'][None] 107 >>> ### Return a list instead: 108 >>> get_pipes(as_list=True) 109 [sql_main_weather] 110 >>> 111 ``` 112 """ 113 114 from meerschaum.config import get_config 115 from meerschaum.utils.warnings import error 116 from meerschaum.utils.misc import filter_keywords 117 118 if connector_keys is None: 119 connector_keys = [] 120 if metric_keys is None: 121 metric_keys = [] 122 if location_keys is None: 123 location_keys = [] 124 if params is None: 125 params = {} 126 if tags is None: 127 tags = [] 128 129 if isinstance(connector_keys, str): 130 connector_keys = [connector_keys] 131 if isinstance(metric_keys, str): 132 metric_keys = [metric_keys] 133 if isinstance(location_keys, str): 134 location_keys = [location_keys] 135 136 ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`). 137 ### If `wait`, wait until a connection is made 138 if mrsm_instance is None: 139 mrsm_instance = instance 140 if mrsm_instance is None: 141 mrsm_instance = get_config('meerschaum', 'instance', patch=True) 142 if isinstance(mrsm_instance, str): 143 from meerschaum.connectors.parse import parse_instance_keys 144 connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug) 145 else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work 146 from meerschaum.connectors import instance_types 147 valid_connector = False 148 if hasattr(mrsm_instance, 'type'): 149 if mrsm_instance.type in instance_types: 150 valid_connector = True 151 if not valid_connector: 152 error(f"Invalid instance connector: {mrsm_instance}") 153 connector = mrsm_instance 154 if debug: 155 from meerschaum.utils.debug import dprint 156 dprint(f"Using instance connector: {connector}") 157 if not connector: 158 error(f"Could not create connector from keys: '{mrsm_instance}'") 159 160 ### Get a list of tuples for the keys needed to build pipes. 161 result = fetch_pipes_keys( 162 method, 163 connector, 164 connector_keys = connector_keys, 165 metric_keys = metric_keys, 166 location_keys = location_keys, 167 tags = tags, 168 params = params, 169 debug = debug 170 ) 171 if result is None: 172 error(f"Unable to build pipes!") 173 174 ### Populate the `pipes` dictionary with Pipes based on the keys 175 ### obtained from the chosen `method`. 176 from meerschaum import Pipe 177 pipes = {} 178 for ck, mk, lk in result: 179 if ck not in pipes: 180 pipes[ck] = {} 181 182 if mk not in pipes[ck]: 183 pipes[ck][mk] = {} 184 185 pipes[ck][mk][lk] = Pipe( 186 ck, mk, lk, 187 mrsm_instance = connector, 188 debug = debug, 189 **filter_keywords(Pipe, **kw) 190 ) 191 192 if not as_list: 193 return pipes 194 from meerschaum.utils.misc import flatten_pipes_dict 195 return flatten_pipes_dict(pipes)
Return a dictionary or list of meerschaum.Pipe
objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is
'*'
, fetch all possible keys. If a string begins with'_'
, select keys that do NOT match the string. - metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See
connector_keys
for formatting. - location_keys (Union[str, List[str], None], default None):
String or list of location keys. See
connector_keys
for formatting. - tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g.
{'a': 1, 'b': 2}
equates to'WHERE a = 1 AND b = 2'
- mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a
meerschaum.connectors.sql.SQLConnector.SQLConnector
ormeerschaum.connectors.api.APIConnector.APIConnector
. - as_list (bool, default False):
If
True
, return pipes in a list instead of a hierarchical dictionary.False
:{connector_keys: {metric_key: {location_key: Pipe}}}
True
:[Pipe]
- method (str, default 'registered'):
Available options:
['registered', 'explicit', 'all']
If'registered'
(default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If'explicit'
, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If'all'
, create pipes from predefined metrics and locations. Requiredconnector_keys
. NOTE: Method'all'
is not implemented! - wait (bool, default False): Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
- **kw (Any:):
Keyword arguments to pass to the
meerschaum.Pipe
constructor.
Returns
- A dictionary of dictionaries and
meerschaum.Pipe
objects - in the connector, metric, location hierarchy.
- If
as_list
isTrue
, return a list ofmeerschaum.Pipe
objects.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
def
fetch_pipes_keys( method: str, connector: meerschaum.connectors.Connector.Connector, **kw: Any) -> List[Tuple[str, str, str]]:
198def fetch_pipes_keys( 199 method: str, 200 connector: 'meerschaum.connectors.Connector', 201 **kw: Any 202 ) -> List[Tuple[str, str, str]]: 203 """ 204 Fetch keys for pipes according to a method. 205 206 Parameters 207 ---------- 208 method: str 209 The method by which to fetch the keys. See `get_pipes()` above. 210 211 connector: meerschaum.connectors.Connector 212 The connector to use to fetch the keys. 213 Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector` 214 or `meerschaum.connectors.api.APIConnector.APIConnector`. 215 216 connector_keys: Optional[List[str]], default None 217 The list of `connector_keys` to filter by. 218 219 metric_keys: Optional[List[str]], default None 220 The list of `metric_keys` to filter by. 221 222 location_keys: Optional[List[str]], default None 223 The list of `location_keys` to filter by. 224 225 params: Optional[Dict[str, Any]], default None 226 A dictionary of parameters to filter by. 227 228 debug: bool 229 Verbosity toggle. 230 231 Returns 232 ------- 233 A list of tuples of strings (or `None` for `location_key`) 234 in the form `(connector_keys, metric_key, location_key)`. 235 236 Examples 237 -------- 238 >>> fetch_pipes_keys(metric_keys=['weather']) 239 [('sql:main', 'weather', None)] 240 """ 241 from meerschaum.utils.warnings import error 242 243 def _registered( 244 connector_keys: Optional[List[str]] = None, 245 metric_keys: Optional[List[str]] = None, 246 location_keys: Optional[List[str]] = None, 247 tags: Optional[List[str]] = None, 248 params: Optional[Dict[str, Any]] = None, 249 debug: bool = False, 250 **kw 251 ) -> List[Tuple[str, str, str]]: 252 """ 253 Get keys from the pipes table or the API directly. 254 Builds query or URL based on provided keys and parameters. 255 256 Only works for SQL and API Connectors. 257 """ 258 if connector_keys is None: 259 connector_keys = [] 260 if metric_keys is None: 261 metric_keys = [] 262 if location_keys is None: 263 location_keys = [] 264 if params is None: 265 params = {} 266 if tags is None: 267 tags = [] 268 269 return connector.fetch_pipes_keys( 270 connector_keys = connector_keys, 271 metric_keys = metric_keys, 272 location_keys = location_keys, 273 tags = tags, 274 params = params, 275 debug = debug 276 ) 277 278 def _explicit( 279 connector_keys: Optional[List[str]] = None, 280 metric_keys: Optional[List[str]] = None, 281 location_keys: Optional[List[str]] = None, 282 params: Optional[Dict[str, Any]] = None, 283 tags: Optional[List[str]] = None, 284 debug: bool = False, 285 **kw 286 ) -> List[Tuple[str, str, str]]: 287 """ 288 Explicitly build Pipes based on provided keys. 289 Raises an error if `connector_keys` or `metric_keys` is empty, 290 and assumes `location_keys = [None]` if empty. 291 """ 292 293 if connector_keys is None: 294 connector_keys = [] 295 if metric_keys is None: 296 metric_keys = [] 297 if location_keys is None: 298 location_keys = [] 299 if params is None: 300 params = {} 301 302 if not isinstance(connector_keys, list): 303 connector_keys = [connector_keys] 304 if not isinstance(metric_keys, list): 305 metric_keys = [metric_keys] 306 if not isinstance(location_keys, list): 307 location_keys = [location_keys] 308 309 missing_keys = [] 310 if len(connector_keys) == 0: 311 missing_keys.append('connector_keys') 312 if len(metric_keys) == 0: 313 missing_keys.append('metric_keys') 314 if len(location_keys) == 0: 315 location_keys.append(None) 316 if len(missing_keys) > 0: 317 error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'" 318 error_message += "\nSee --help for information for passing parameters." 319 error(error_message) 320 result = [] 321 for ck in connector_keys: 322 for mk in metric_keys: 323 for lk in location_keys: 324 result.append((ck, mk, lk)) 325 return result 326 327 _method_functions = { 328 'registered' : _registered, 329 'explicit' : _explicit, 330 } 331 if method not in _method_functions: 332 error(f"Method '{method}' is not supported!", NotImplementedError) 333 return _method_functions[method](**kw)
Fetch keys for pipes according to a method.
Parameters
- method (str):
The method by which to fetch the keys. See
get_pipes()
above. - connector (meerschaum.connectors.Connector):
The connector to use to fetch the keys.
Must be of type
meerschaum.connectors.sql.SQLConnector.SQLConnector
ormeerschaum.connectors.api.APIConnector.APIConnector
. - connector_keys (Optional[List[str]], default None):
The list of
connector_keys
to filter by. - metric_keys (Optional[List[str]], default None):
The list of
metric_keys
to filter by. - location_keys (Optional[List[str]], default None):
The list of
location_keys
to filter by. - params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
- debug (bool): Verbosity toggle.
Returns
- A list of tuples of strings (or
None
forlocation_key
) - in the form
(connector_keys, metric_key, location_key)
.
Examples
>>> fetch_pipes_keys(metric_keys=['weather'])
[('sql:main', 'weather', None)]