meerschaum.utils.pipes
Define utilities for working with pipes.
1#! /usr/bin/env python3 2# vim:fenc=utf-8 3 4""" 5Define utilities for working with pipes. 6""" 7 8from __future__ import annotations 9 10from typing import Any, Dict, Callable 11import re 12import json 13import ast 14import copy 15import uuid 16 17from meerschaum.utils.typing import PipesDict, Optional 18import meerschaum as mrsm 19 20 21def get_pipe_from_value( 22 value: str | dict[str, str], 23 _pipe: mrsm.Pipe | None = None, 24) -> mrsm.Pipe | None: 25 """ 26 Given an input value (string or dictionary), return the corresponding pipe. 27 28 Parameters 29 ---------- 30 value: str | dict[str, str] 31 The definition keys for the pipe to be constructed. 32 This may either be the literal constructor string or a dictionary of constructor kwargs. 33 34 _pipe: mrsm.Pipe | None, default None 35 If provided, use the instance connector for this reference pipe 36 if an explicit instance connector is undefined. 37 38 Returns 39 ------- 40 A `Pipe` corresponding to the given keys. 41 """ 42 from meerschaum.utils.warnings import warn 43 if isinstance(value, mrsm.Pipe): 44 return value 45 46 if isinstance(value, str): 47 return get_pipe_from_string(value, _pipe=_pipe) 48 49 if not isinstance(value, dict): 50 raise ValueError("Expecting kwargs for a Pipe.") 51 52 if ( 53 _pipe is not None 54 and not ( 55 'instance' in value 56 or 'mrsm_instance' in value 57 or 'instance_keys' in value 58 ) 59 ): 60 value = {**value} 61 value['instance'] = _pipe.instance_keys 62 63 try: 64 pipe = mrsm.Pipe(**value) 65 except Exception as e: 66 warn(f"Failed build pipe from value '{value}':\n{e}") 67 return None 68 69 return pipe 70 71 72def get_pipe_from_string( 73 string: str, 74 _pipe: mrsm.Pipe | None = None, 75) -> mrsm.Pipe | None: 76 """ 77 If a string is equal to a `Pipe` constructor, return the pipe. 78 79 Parameters 80 ---------- 81 string: str 82 The string containing the constructor syntax. 83 Must be an exact match. 84 85 _pipe: mrsm.Pipe | None, default None 86 If provided, use the instance connector for this reference pipe 87 if an explicit instance connector is undefined. 88 89 Returns 90 ------- 91 A `Pipe` corresponding the defined pipe in the input string. 92 93 Examples 94 -------- 95 >>> get_pipe_from_string('Pipe("foo", "bar")') 96 Pipe('foo', 'bar') 97 >>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")') 98 Pipe('spam', 'eggs', instance='sql:local') 99 """ 100 from meerschaum.utils.warnings import warn 101 from meerschaum.utils.misc import parse_arguments_str 102 pattern = r'(?:mrsm\.)?Pipe\((.*?)\)' 103 matches = list(re.finditer(pattern, string)) 104 if not matches: 105 return None 106 107 try: 108 match = matches[0] 109 args_str = match.group(1) 110 args, kwargs = parse_arguments_str(args_str) 111 if ( 112 _pipe is not None 113 and not ( 114 'instance' in kwargs 115 or 'mrsm_instance' in kwargs 116 or 'instance_keys' in kwargs 117 ) 118 ): 119 kwargs['instance'] = _pipe.instance_keys 120 pipe = mrsm.Pipe(*args, **kwargs) 121 except Exception as e: 122 warn(f"Failed to build pipe from string '{string}': \n{e}") 123 return None 124 125 return pipe 126 127 128def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe, _pipe=None): 129 """ 130 Safely evaluate the access chain on a Pipe. 131 """ 132 expr = f"pipe{access_chain}" 133 if pipe == _pipe and access_chain.lstrip('.').startswith('parameters'): 134 parameters_access_chain = access_chain.lstrip('.').split('parameters', maxsplit=1)[-1] 135 expr = f"pipe.attributes['parameters']{parameters_access_chain}" 136 tree = ast.parse(expr, mode='eval') 137 138 def _eval(node, context): 139 if isinstance(node, ast.Expression): 140 return _eval(node.body, context) 141 142 elif isinstance(node, ast.Name): 143 if node.id == "pipe": 144 return context 145 raise ValueError(f"Unknown variable: {node.id}") 146 147 elif isinstance(node, ast.Attribute): 148 value = _eval(node.value, context) 149 return getattr(value, node.attr) 150 151 elif isinstance(node, ast.Subscript): 152 value = _eval(node.value, context) 153 key = _eval(node.slice, context) if isinstance(node.slice, ast.Index) else _eval(node.slice, context) 154 return value[key] 155 156 elif isinstance(node, ast.Constant): # Python 3.8+ 157 return node.value 158 159 elif isinstance(node, ast.Str): # Older Python 160 return node.s 161 162 elif isinstance(node, ast.Index): # Older Python AST style 163 return _eval(node.value, context) 164 165 else: 166 raise TypeError(f"Unsupported AST node: {ast.dump(node)}") 167 168 return _eval(tree, pipe) 169 170 171 172def _evaluate_pipe_access_chain_from_match(pipe_match: re.Match, _pipe=None) -> Any: 173 """ 174 Helper function to evaluate a pipe from a regex match object. 175 """ 176 from meerschaum.utils.warnings import warn 177 from meerschaum.utils.misc import parse_arguments_str 178 from meerschaum.utils.sql import sql_item_name 179 try: 180 if 'self' in pipe_match.group(0) and _pipe is not None: 181 pipe = _pipe 182 access_chain = pipe_match.group(1) 183 else: 184 args_str = pipe_match.group(1) 185 access_chain = pipe_match.group(2) 186 args, kwargs = parse_arguments_str(args_str) 187 if ( 188 _pipe is not None 189 and not ( 190 'instance' in kwargs 191 or 'mrsm_instance' in kwargs 192 or 'instance_keys' in kwargs 193 ) 194 ): 195 kwargs['instance'] = _pipe.instance_keys 196 pipe = mrsm.Pipe(*args, **kwargs) 197 except Exception as e: 198 warn(f"Failed to parse pipe from template string:\n{e}") 199 raise e 200 201 if not access_chain: 202 target = pipe.target 203 schema = ( 204 pipe.instance_connector.get_pipe_schema(pipe) 205 if hasattr(pipe.instance_connector, 'get_pipe_schema') 206 else None 207 ) 208 return ( 209 sql_item_name(target, pipe.instance_connector.flavor, schema) 210 if pipe.instance_connector.type == 'sql' 211 else pipe.target 212 ) 213 214 return evaluate_pipe_access_chain(access_chain, pipe, _pipe=_pipe) 215 216 217def replace_pipes_syntax(text: str, _pipe=None) -> Any: 218 """ 219 Parse a string containing the `{{ Pipe() }}` syntax. 220 """ 221 from meerschaum.utils.warnings import warn 222 from meerschaum.utils.dtypes import json_serialize_value 223 pattern = r'\{\{\s*(?:mrsm\.)?Pipe\((.*?)\)((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}' 224 self_pattern = r'\{\{\s*self((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}' 225 226 matches = list(re.finditer(pattern, text)) 227 if _pipe is not None: 228 self_matches = list(re.finditer(self_pattern, text)) 229 matches.extend(self_matches) 230 if not matches: 231 return text 232 233 placeholders = {} 234 for match in matches: 235 placeholder = f"__mrsm_pipe_placeholder_{uuid.uuid4().hex}__" 236 placeholders[placeholder] = match 237 238 substituted_text = text 239 for placeholder, match in placeholders.items(): 240 substituted_text = substituted_text.replace(match.group(0), placeholder) 241 242 resolved_values = {} 243 for placeholder, match in placeholders.items(): 244 try: 245 resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match( 246 match, 247 _pipe=_pipe, 248 ) 249 except Exception: 250 import traceback 251 warn(f"Failed to resolve pipe syntax '{match.group(0)}': {traceback.format_exc()}") 252 resolved_values[placeholder] = match.group(0) 253 254 if len(matches) == 1: 255 match = matches[0] 256 placeholder = list(placeholders.keys())[0] 257 if text.strip() == match.group(0): 258 return resolved_values[placeholder] 259 260 final_text = substituted_text 261 for placeholder, value in resolved_values.items(): 262 if isinstance(value, (dict, list, bool, int, float)) or value is None: 263 final_text = final_text.replace(placeholder, json.dumps(value, default=json_serialize_value)) 264 else: 265 final_text = final_text.replace(placeholder, str(value)) 266 267 return final_text 268 269 270def replace_pipes_in_dict( 271 pipes: Optional[PipesDict] = None, 272 func: Callable[[Any], Any] = str, 273 debug: bool = False, 274 **kw 275) -> PipesDict: 276 """ 277 Replace the Pipes in a Pipes dict with the result of another function. 278 279 Parameters 280 ---------- 281 pipes: Optional[PipesDict], default None 282 The pipes dict to be processed. 283 284 func: Callable[[Any], Any], default str 285 The function to be applied to every pipe. 286 Defaults to the string constructor. 287 288 debug: bool, default False 289 Verbosity toggle. 290 291 Returns 292 ------- 293 A dictionary where every pipe is replaced with the output of a function. 294 295 """ 296 def change_dict(d: Dict[Any, Any]) -> None: 297 for k, v in d.items(): 298 if isinstance(v, dict): 299 change_dict(v) 300 elif isinstance(v, list): 301 d[k] = [func(i) for i in v] 302 elif isinstance(v, tuple): 303 d[k] = tuple([func(i) for i in v]) 304 else: 305 d[k] = func(v) 306 307 if pipes is None: 308 from meerschaum import get_pipes 309 pipes = get_pipes(debug=debug, **kw) 310 311 result = copy.deepcopy(pipes) 312 change_dict(result) 313 return result 314 315 316def is_pipe_registered( 317 pipe: mrsm.Pipe, 318 pipes: PipesDict, 319 debug: bool = False 320) -> bool: 321 """ 322 Check if a Pipe is inside the pipes dictionary. 323 324 Parameters 325 ---------- 326 pipe: meerschaum.Pipe 327 The pipe to see if it's in the dictionary. 328 329 pipes: PipesDict 330 The dictionary to search inside. 331 332 debug: bool, default False 333 Verbosity toggle. 334 335 Returns 336 ------- 337 A bool indicating whether the pipe is inside the dictionary. 338 """ 339 ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key 340 try: 341 return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk] 342 except KeyError: 343 return False 344 345 346def pipes_dict_from_list(pipes_list: list[mrsm.Pipe]) -> PipesDict: 347 """ 348 Return a PipesDict from a list of pipes. 349 Note that all pipes must share the same instance connector. 350 351 Examples 352 -------- 353 >>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')] 354 >>> pipes_dict_from_list(pipes) 355 { 356 'a': { 357 'b': { 358 None: Pipe('a', 'b') 359 'c': Pipe('a', 'b', 'c') 360 } 361 } 362 } 363 """ 364 from meerschaum.utils.warnings import warn 365 from meerschaum.utils.misc import items_str 366 if not pipes_list: 367 return {} 368 369 instance_keys = pipes_list[0].instance_keys 370 same_pipes = [pipe for pipe in pipes_list if pipe.instance_keys == instance_keys] 371 diff_pipes = [pipe for pipe in pipes_list if pipe.instance_keys != instance_keys] 372 if diff_pipes: 373 warn( 374 f"Skipping pipes with instance keys different from '{instance_keys}':\n" 375 f"{items_str(diff_pipes, quotes=False)}" 376 ) 377 378 pipes_dict: PipesDict = {} 379 for pipe in same_pipes: 380 if pipe.connector_keys not in pipes_dict: 381 pipes_dict[pipe.connector_keys] = {} 382 if pipe.metric_key not in pipes_dict[pipe.connector_key]: 383 pipes_dict[pipe.connector_keys][pipe.metric_key] = {} 384 pipes_dict[pipe.connector_keys][pipe.metric_key][pipe.location_key] = pipe 385 386 return pipes_dict 387 388 389def flatten_pipes_dict(pipes_dict: PipesDict) -> list[mrsm.Pipe]: 390 """ 391 Convert the standard pipes dictionary into a list. 392 393 Parameters 394 ---------- 395 pipes_dict: PipesDict 396 The pipes dictionary to be flattened. 397 398 Returns 399 ------- 400 A list of `Pipe` objects. 401 402 """ 403 pipes_list = [] 404 for ck in pipes_dict.values(): 405 for mk in ck.values(): 406 pipes_list.extend(list(mk.values())) 407 return pipes_list
def
get_pipe_from_value( value: str | dict[str, str], _pipe: meerschaum.Pipe | None = None) -> meerschaum.Pipe | None:
22def get_pipe_from_value( 23 value: str | dict[str, str], 24 _pipe: mrsm.Pipe | None = None, 25) -> mrsm.Pipe | None: 26 """ 27 Given an input value (string or dictionary), return the corresponding pipe. 28 29 Parameters 30 ---------- 31 value: str | dict[str, str] 32 The definition keys for the pipe to be constructed. 33 This may either be the literal constructor string or a dictionary of constructor kwargs. 34 35 _pipe: mrsm.Pipe | None, default None 36 If provided, use the instance connector for this reference pipe 37 if an explicit instance connector is undefined. 38 39 Returns 40 ------- 41 A `Pipe` corresponding to the given keys. 42 """ 43 from meerschaum.utils.warnings import warn 44 if isinstance(value, mrsm.Pipe): 45 return value 46 47 if isinstance(value, str): 48 return get_pipe_from_string(value, _pipe=_pipe) 49 50 if not isinstance(value, dict): 51 raise ValueError("Expecting kwargs for a Pipe.") 52 53 if ( 54 _pipe is not None 55 and not ( 56 'instance' in value 57 or 'mrsm_instance' in value 58 or 'instance_keys' in value 59 ) 60 ): 61 value = {**value} 62 value['instance'] = _pipe.instance_keys 63 64 try: 65 pipe = mrsm.Pipe(**value) 66 except Exception as e: 67 warn(f"Failed build pipe from value '{value}':\n{e}") 68 return None 69 70 return pipe
Given an input value (string or dictionary), return the corresponding pipe.
Parameters
- value (str | dict[str, str]): The definition keys for the pipe to be constructed. This may either be the literal constructor string or a dictionary of constructor kwargs.
- _pipe (mrsm.Pipe | None, default None): If provided, use the instance connector for this reference pipe if an explicit instance connector is undefined.
Returns
- A
Pipecorresponding to the given keys.
def
get_pipe_from_string( string: str, _pipe: meerschaum.Pipe | None = None) -> meerschaum.Pipe | None:
73def get_pipe_from_string( 74 string: str, 75 _pipe: mrsm.Pipe | None = None, 76) -> mrsm.Pipe | None: 77 """ 78 If a string is equal to a `Pipe` constructor, return the pipe. 79 80 Parameters 81 ---------- 82 string: str 83 The string containing the constructor syntax. 84 Must be an exact match. 85 86 _pipe: mrsm.Pipe | None, default None 87 If provided, use the instance connector for this reference pipe 88 if an explicit instance connector is undefined. 89 90 Returns 91 ------- 92 A `Pipe` corresponding the defined pipe in the input string. 93 94 Examples 95 -------- 96 >>> get_pipe_from_string('Pipe("foo", "bar")') 97 Pipe('foo', 'bar') 98 >>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")') 99 Pipe('spam', 'eggs', instance='sql:local') 100 """ 101 from meerschaum.utils.warnings import warn 102 from meerschaum.utils.misc import parse_arguments_str 103 pattern = r'(?:mrsm\.)?Pipe\((.*?)\)' 104 matches = list(re.finditer(pattern, string)) 105 if not matches: 106 return None 107 108 try: 109 match = matches[0] 110 args_str = match.group(1) 111 args, kwargs = parse_arguments_str(args_str) 112 if ( 113 _pipe is not None 114 and not ( 115 'instance' in kwargs 116 or 'mrsm_instance' in kwargs 117 or 'instance_keys' in kwargs 118 ) 119 ): 120 kwargs['instance'] = _pipe.instance_keys 121 pipe = mrsm.Pipe(*args, **kwargs) 122 except Exception as e: 123 warn(f"Failed to build pipe from string '{string}': \n{e}") 124 return None 125 126 return pipe
If a string is equal to a Pipe constructor, return the pipe.
Parameters
- string (str): The string containing the constructor syntax. Must be an exact match.
- _pipe (mrsm.Pipe | None, default None): If provided, use the instance connector for this reference pipe if an explicit instance connector is undefined.
Returns
- A
Pipecorresponding the defined pipe in the input string.
Examples
>>> get_pipe_from_string('Pipe("foo", "bar")')
Pipe('foo', 'bar')
>>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")')
Pipe('spam', 'eggs', instance='sql:local')
129def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe, _pipe=None): 130 """ 131 Safely evaluate the access chain on a Pipe. 132 """ 133 expr = f"pipe{access_chain}" 134 if pipe == _pipe and access_chain.lstrip('.').startswith('parameters'): 135 parameters_access_chain = access_chain.lstrip('.').split('parameters', maxsplit=1)[-1] 136 expr = f"pipe.attributes['parameters']{parameters_access_chain}" 137 tree = ast.parse(expr, mode='eval') 138 139 def _eval(node, context): 140 if isinstance(node, ast.Expression): 141 return _eval(node.body, context) 142 143 elif isinstance(node, ast.Name): 144 if node.id == "pipe": 145 return context 146 raise ValueError(f"Unknown variable: {node.id}") 147 148 elif isinstance(node, ast.Attribute): 149 value = _eval(node.value, context) 150 return getattr(value, node.attr) 151 152 elif isinstance(node, ast.Subscript): 153 value = _eval(node.value, context) 154 key = _eval(node.slice, context) if isinstance(node.slice, ast.Index) else _eval(node.slice, context) 155 return value[key] 156 157 elif isinstance(node, ast.Constant): # Python 3.8+ 158 return node.value 159 160 elif isinstance(node, ast.Str): # Older Python 161 return node.s 162 163 elif isinstance(node, ast.Index): # Older Python AST style 164 return _eval(node.value, context) 165 166 else: 167 raise TypeError(f"Unsupported AST node: {ast.dump(node)}") 168 169 return _eval(tree, pipe)
Safely evaluate the access chain on a Pipe.
def
replace_pipes_syntax(text: str, _pipe=None) -> Any:
218def replace_pipes_syntax(text: str, _pipe=None) -> Any: 219 """ 220 Parse a string containing the `{{ Pipe() }}` syntax. 221 """ 222 from meerschaum.utils.warnings import warn 223 from meerschaum.utils.dtypes import json_serialize_value 224 pattern = r'\{\{\s*(?:mrsm\.)?Pipe\((.*?)\)((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}' 225 self_pattern = r'\{\{\s*self((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}' 226 227 matches = list(re.finditer(pattern, text)) 228 if _pipe is not None: 229 self_matches = list(re.finditer(self_pattern, text)) 230 matches.extend(self_matches) 231 if not matches: 232 return text 233 234 placeholders = {} 235 for match in matches: 236 placeholder = f"__mrsm_pipe_placeholder_{uuid.uuid4().hex}__" 237 placeholders[placeholder] = match 238 239 substituted_text = text 240 for placeholder, match in placeholders.items(): 241 substituted_text = substituted_text.replace(match.group(0), placeholder) 242 243 resolved_values = {} 244 for placeholder, match in placeholders.items(): 245 try: 246 resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match( 247 match, 248 _pipe=_pipe, 249 ) 250 except Exception: 251 import traceback 252 warn(f"Failed to resolve pipe syntax '{match.group(0)}': {traceback.format_exc()}") 253 resolved_values[placeholder] = match.group(0) 254 255 if len(matches) == 1: 256 match = matches[0] 257 placeholder = list(placeholders.keys())[0] 258 if text.strip() == match.group(0): 259 return resolved_values[placeholder] 260 261 final_text = substituted_text 262 for placeholder, value in resolved_values.items(): 263 if isinstance(value, (dict, list, bool, int, float)) or value is None: 264 final_text = final_text.replace(placeholder, json.dumps(value, default=json_serialize_value)) 265 else: 266 final_text = final_text.replace(placeholder, str(value)) 267 268 return final_text
Parse a string containing the {{ Pipe() }} syntax.
def
replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]] = None, func: Callable[[Any], Any] = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]:
271def replace_pipes_in_dict( 272 pipes: Optional[PipesDict] = None, 273 func: Callable[[Any], Any] = str, 274 debug: bool = False, 275 **kw 276) -> PipesDict: 277 """ 278 Replace the Pipes in a Pipes dict with the result of another function. 279 280 Parameters 281 ---------- 282 pipes: Optional[PipesDict], default None 283 The pipes dict to be processed. 284 285 func: Callable[[Any], Any], default str 286 The function to be applied to every pipe. 287 Defaults to the string constructor. 288 289 debug: bool, default False 290 Verbosity toggle. 291 292 Returns 293 ------- 294 A dictionary where every pipe is replaced with the output of a function. 295 296 """ 297 def change_dict(d: Dict[Any, Any]) -> None: 298 for k, v in d.items(): 299 if isinstance(v, dict): 300 change_dict(v) 301 elif isinstance(v, list): 302 d[k] = [func(i) for i in v] 303 elif isinstance(v, tuple): 304 d[k] = tuple([func(i) for i in v]) 305 else: 306 d[k] = func(v) 307 308 if pipes is None: 309 from meerschaum import get_pipes 310 pipes = get_pipes(debug=debug, **kw) 311 312 result = copy.deepcopy(pipes) 313 change_dict(result) 314 return result
Replace the Pipes in a Pipes dict with the result of another function.
Parameters
- pipes (Optional[PipesDict], default None): The pipes dict to be processed.
- func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary where every pipe is replaced with the output of a function.
def
is_pipe_registered( pipe: meerschaum.Pipe, pipes: Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], debug: bool = False) -> bool:
317def is_pipe_registered( 318 pipe: mrsm.Pipe, 319 pipes: PipesDict, 320 debug: bool = False 321) -> bool: 322 """ 323 Check if a Pipe is inside the pipes dictionary. 324 325 Parameters 326 ---------- 327 pipe: meerschaum.Pipe 328 The pipe to see if it's in the dictionary. 329 330 pipes: PipesDict 331 The dictionary to search inside. 332 333 debug: bool, default False 334 Verbosity toggle. 335 336 Returns 337 ------- 338 A bool indicating whether the pipe is inside the dictionary. 339 """ 340 ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key 341 try: 342 return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk] 343 except KeyError: 344 return False
Check if a Pipe is inside the pipes dictionary.
Parameters
- pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
- pipes (PipesDict): The dictionary to search inside.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating whether the pipe is inside the dictionary.
def
pipes_dict_from_list( pipes_list: list[meerschaum.Pipe]) -> Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]:
347def pipes_dict_from_list(pipes_list: list[mrsm.Pipe]) -> PipesDict: 348 """ 349 Return a PipesDict from a list of pipes. 350 Note that all pipes must share the same instance connector. 351 352 Examples 353 -------- 354 >>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')] 355 >>> pipes_dict_from_list(pipes) 356 { 357 'a': { 358 'b': { 359 None: Pipe('a', 'b') 360 'c': Pipe('a', 'b', 'c') 361 } 362 } 363 } 364 """ 365 from meerschaum.utils.warnings import warn 366 from meerschaum.utils.misc import items_str 367 if not pipes_list: 368 return {} 369 370 instance_keys = pipes_list[0].instance_keys 371 same_pipes = [pipe for pipe in pipes_list if pipe.instance_keys == instance_keys] 372 diff_pipes = [pipe for pipe in pipes_list if pipe.instance_keys != instance_keys] 373 if diff_pipes: 374 warn( 375 f"Skipping pipes with instance keys different from '{instance_keys}':\n" 376 f"{items_str(diff_pipes, quotes=False)}" 377 ) 378 379 pipes_dict: PipesDict = {} 380 for pipe in same_pipes: 381 if pipe.connector_keys not in pipes_dict: 382 pipes_dict[pipe.connector_keys] = {} 383 if pipe.metric_key not in pipes_dict[pipe.connector_key]: 384 pipes_dict[pipe.connector_keys][pipe.metric_key] = {} 385 pipes_dict[pipe.connector_keys][pipe.metric_key][pipe.location_key] = pipe 386 387 return pipes_dict
Return a PipesDict from a list of pipes. Note that all pipes must share the same instance connector.
Examples
>>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')]
>>> pipes_dict_from_list(pipes)
{
'a': {
'b': {
None: Pipe('a', 'b')
'c': Pipe('a', 'b', 'c')
}
}
}
def
flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]) -> list[meerschaum.Pipe]:
390def flatten_pipes_dict(pipes_dict: PipesDict) -> list[mrsm.Pipe]: 391 """ 392 Convert the standard pipes dictionary into a list. 393 394 Parameters 395 ---------- 396 pipes_dict: PipesDict 397 The pipes dictionary to be flattened. 398 399 Returns 400 ------- 401 A list of `Pipe` objects. 402 403 """ 404 pipes_list = [] 405 for ck in pipes_dict.values(): 406 for mk in ck.values(): 407 pipes_list.extend(list(mk.values())) 408 return pipes_list
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of
Pipeobjects.