meerschaum.utils.pipes

Define utilities for working with pipes.

  1#! /usr/bin/env python3
  2# vim:fenc=utf-8
  3
  4"""
  5Define utilities for working with pipes.
  6"""
  7
  8from __future__ import annotations
  9
 10from typing import Any, Dict, Callable
 11import re
 12import json
 13import ast
 14import copy
 15import uuid
 16
 17from meerschaum.utils.typing import PipesDict, Optional
 18import meerschaum as mrsm
 19
 20
 21def get_pipe_from_value(
 22    value: str | dict[str, str],
 23    _pipe: mrsm.Pipe | None = None,
 24) -> mrsm.Pipe | None:
 25    """
 26    Given an input value (string or dictionary), return the corresponding pipe.
 27
 28    Parameters
 29    ----------
 30    value: str | dict[str, str]
 31        The definition keys for the pipe to be constructed.
 32        This may either be the literal constructor string or a dictionary of constructor kwargs.
 33
 34    _pipe: mrsm.Pipe | None, default None
 35        If provided, use the instance connector for this reference pipe
 36        if an explicit instance connector is undefined.
 37
 38    Returns
 39    -------
 40    A `Pipe` corresponding to the given keys.
 41    """
 42    from meerschaum.utils.warnings import warn
 43    if isinstance(value, mrsm.Pipe):
 44        return value
 45
 46    if isinstance(value, str):
 47        return get_pipe_from_string(value, _pipe=_pipe)
 48
 49    if not isinstance(value, dict):
 50        raise ValueError("Expecting kwargs for a Pipe.")
 51
 52    if (
 53        _pipe is not None
 54        and not (
 55            'instance' in value
 56            or 'mrsm_instance' in value
 57            or 'instance_keys' in value
 58        )
 59    ):
 60        value = {**value}
 61        value['instance'] = _pipe.instance_keys
 62
 63    try:
 64        pipe = mrsm.Pipe(**value)
 65    except Exception as e:
 66        warn(f"Failed build pipe from value '{value}':\n{e}")
 67        return None
 68
 69    return pipe
 70
 71
 72def get_pipe_from_string(
 73    string: str,
 74    _pipe: mrsm.Pipe | None = None,
 75) -> mrsm.Pipe | None:
 76    """
 77    If a string is equal to a `Pipe` constructor, return the pipe.
 78
 79    Parameters
 80    ----------
 81    string: str
 82        The string containing the constructor syntax.
 83        Must be an exact match.
 84
 85    _pipe: mrsm.Pipe | None, default None
 86        If provided, use the instance connector for this reference pipe
 87        if an explicit instance connector is undefined.
 88
 89    Returns
 90    -------
 91    A `Pipe` corresponding the defined pipe in the input string.
 92    
 93    Examples
 94    --------
 95    >>> get_pipe_from_string('Pipe("foo", "bar")')
 96    Pipe('foo', 'bar')
 97    >>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")')
 98    Pipe('spam', 'eggs', instance='sql:local')
 99    """
100    from meerschaum.utils.warnings import warn
101    from meerschaum.utils.misc import parse_arguments_str
102    pattern = r'(?:mrsm\.)?Pipe\((.*?)\)'
103    matches = list(re.finditer(pattern, string))
104    if not matches:
105        return None
106
107    try:
108        match = matches[0]
109        args_str = match.group(1)
110        args, kwargs = parse_arguments_str(args_str)
111        if (
112            _pipe is not None
113            and not (
114                'instance' in kwargs
115                or 'mrsm_instance' in kwargs
116                or 'instance_keys' in kwargs
117            )
118        ):
119            kwargs['instance'] = _pipe.instance_keys
120        pipe = mrsm.Pipe(*args, **kwargs)
121    except Exception as e:
122        warn(f"Failed to build pipe from string '{string}': \n{e}")
123        return None
124
125    return pipe
126
127
128def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe, _pipe=None):
129    """
130    Safely evaluate the access chain on a Pipe.
131    """
132    expr = f"pipe{access_chain}"
133    if pipe == _pipe and access_chain.lstrip('.').startswith('parameters'):
134        parameters_access_chain = access_chain.lstrip('.').split('parameters', maxsplit=1)[-1]
135        expr = f"pipe.attributes['parameters']{parameters_access_chain}"
136    tree = ast.parse(expr, mode='eval')
137
138    def _eval(node, context):
139        if isinstance(node, ast.Expression):
140            return _eval(node.body, context)
141
142        elif isinstance(node, ast.Name):
143            if node.id == "pipe":
144                return context
145            raise ValueError(f"Unknown variable: {node.id}")
146
147        elif isinstance(node, ast.Attribute):
148            value = _eval(node.value, context)
149            return getattr(value, node.attr)
150
151        elif isinstance(node, ast.Subscript):
152            value = _eval(node.value, context)
153            key = _eval(node.slice, context) if isinstance(node.slice, ast.Index) else _eval(node.slice, context)
154            return value[key]
155
156        elif isinstance(node, ast.Constant):  # Python 3.8+
157            return node.value
158
159        elif isinstance(node, ast.Str):  # Older Python
160            return node.s
161
162        elif isinstance(node, ast.Index):  # Older Python AST style
163            return _eval(node.value, context)
164
165        else:
166            raise TypeError(f"Unsupported AST node: {ast.dump(node)}")
167
168    return _eval(tree, pipe)
169
170
171
172def _evaluate_pipe_access_chain_from_match(pipe_match: re.Match, _pipe=None) -> Any:
173    """
174    Helper function to evaluate a pipe from a regex match object.
175    """
176    from meerschaum.utils.warnings import warn
177    from meerschaum.utils.misc import parse_arguments_str
178    from meerschaum.utils.sql import sql_item_name
179    try:
180        if 'self' in pipe_match.group(0) and _pipe is not None:
181            pipe = _pipe
182            access_chain = pipe_match.group(1)
183        else:
184            args_str = pipe_match.group(1)
185            access_chain = pipe_match.group(2)
186            args, kwargs = parse_arguments_str(args_str)
187            if (
188                _pipe is not None
189                and not (
190                    'instance' in kwargs
191                    or 'mrsm_instance' in kwargs
192                    or 'instance_keys' in kwargs
193                )
194            ):
195                kwargs['instance'] = _pipe.instance_keys
196            pipe = mrsm.Pipe(*args, **kwargs)
197    except Exception as e:
198        warn(f"Failed to parse pipe from template string:\n{e}")
199        raise e
200
201    if not access_chain:
202        target = pipe.target
203        schema = (
204            pipe.instance_connector.get_pipe_schema(pipe)
205            if hasattr(pipe.instance_connector, 'get_pipe_schema')
206            else None
207        )
208        return (
209            sql_item_name(target, pipe.instance_connector.flavor, schema)
210            if pipe.instance_connector.type == 'sql'
211            else pipe.target
212        )
213
214    return evaluate_pipe_access_chain(access_chain, pipe, _pipe=_pipe)
215
216
217def replace_pipes_syntax(text: str, _pipe=None) -> Any:
218    """
219    Parse a string containing the `{{ Pipe() }}` syntax.
220    """
221    from meerschaum.utils.warnings import warn
222    from meerschaum.utils.dtypes import json_serialize_value
223    pattern = r'\{\{\s*(?:mrsm\.)?Pipe\((.*?)\)((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
224    self_pattern = r'\{\{\s*self((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
225
226    matches = list(re.finditer(pattern, text))
227    if _pipe is not None:
228        self_matches = list(re.finditer(self_pattern, text))
229        matches.extend(self_matches)
230    if not matches:
231        return text
232
233    placeholders = {}
234    for match in matches:
235        placeholder = f"__mrsm_pipe_placeholder_{uuid.uuid4().hex}__"
236        placeholders[placeholder] = match
237
238    substituted_text = text
239    for placeholder, match in placeholders.items():
240        substituted_text = substituted_text.replace(match.group(0), placeholder)
241
242    resolved_values = {}
243    for placeholder, match in placeholders.items():
244        try:
245            resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match(
246                match,
247                _pipe=_pipe,
248            )
249        except Exception:
250            import traceback
251            warn(f"Failed to resolve pipe syntax '{match.group(0)}': {traceback.format_exc()}")
252            resolved_values[placeholder] = match.group(0)
253
254    if len(matches) == 1:
255        match = matches[0]
256        placeholder = list(placeholders.keys())[0]
257        if text.strip() == match.group(0):
258            return resolved_values[placeholder]
259
260    final_text = substituted_text
261    for placeholder, value in resolved_values.items():
262        if isinstance(value, (dict, list, bool, int, float)) or value is None:
263            final_text = final_text.replace(placeholder, json.dumps(value, default=json_serialize_value))
264        else:
265            final_text = final_text.replace(placeholder, str(value))
266
267    return final_text
268
269
270def replace_pipes_in_dict(
271    pipes: Optional[PipesDict] = None,
272    func: Callable[[Any], Any] = str,
273    debug: bool = False,
274    **kw
275) -> PipesDict:
276    """
277    Replace the Pipes in a Pipes dict with the result of another function.
278
279    Parameters
280    ----------
281    pipes: Optional[PipesDict], default None
282        The pipes dict to be processed.
283
284    func: Callable[[Any], Any], default str
285        The function to be applied to every pipe.
286        Defaults to the string constructor.
287
288    debug: bool, default False
289        Verbosity toggle.
290
291    Returns
292    -------
293    A dictionary where every pipe is replaced with the output of a function.
294
295    """
296    def change_dict(d: Dict[Any, Any]) -> None:
297        for k, v in d.items():
298            if isinstance(v, dict):
299                change_dict(v)
300            elif isinstance(v, list):
301                d[k] = [func(i) for i in v]
302            elif isinstance(v, tuple):
303                d[k] = tuple([func(i) for i in v])
304            else:
305                d[k] = func(v)
306
307    if pipes is None:
308        from meerschaum import get_pipes
309        pipes = get_pipes(debug=debug, **kw)
310
311    result = copy.deepcopy(pipes)
312    change_dict(result)
313    return result
314
315
316def is_pipe_registered(
317    pipe: mrsm.Pipe,
318    pipes: PipesDict,
319    debug: bool = False
320) -> bool:
321    """
322    Check if a Pipe is inside the pipes dictionary.
323
324    Parameters
325    ----------
326    pipe: meerschaum.Pipe
327        The pipe to see if it's in the dictionary.
328
329    pipes: PipesDict
330        The dictionary to search inside.
331
332    debug: bool, default False
333        Verbosity toggle.
334
335    Returns
336    -------
337    A bool indicating whether the pipe is inside the dictionary.
338    """
339    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
340    try:
341        return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
342    except KeyError:
343        return False
344
345
346def pipes_dict_from_list(pipes_list: list[mrsm.Pipe]) -> PipesDict:
347    """
348    Return a PipesDict from a list of pipes.
349    Note that all pipes must share the same instance connector.
350
351    Examples
352    --------
353    >>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')]
354    >>> pipes_dict_from_list(pipes)
355    {
356        'a': {
357            'b': {
358                None: Pipe('a', 'b')
359                'c': Pipe('a', 'b', 'c')
360            }
361        }
362    }
363    """
364    from meerschaum.utils.warnings import warn
365    from meerschaum.utils.misc import items_str
366    if not pipes_list:
367        return {}
368
369    instance_keys = pipes_list[0].instance_keys
370    same_pipes = [pipe for pipe in pipes_list if pipe.instance_keys == instance_keys]
371    diff_pipes = [pipe for pipe in pipes_list if pipe.instance_keys != instance_keys]
372    if diff_pipes:
373        warn(
374            f"Skipping pipes with instance keys different from '{instance_keys}':\n"
375            f"{items_str(diff_pipes, quotes=False)}"
376        )
377
378    pipes_dict: PipesDict = {}
379    for pipe in same_pipes:
380        if pipe.connector_keys not in pipes_dict:
381            pipes_dict[pipe.connector_keys] = {}
382        if pipe.metric_key not in pipes_dict[pipe.connector_key]:
383            pipes_dict[pipe.connector_keys][pipe.metric_key] = {}
384        pipes_dict[pipe.connector_keys][pipe.metric_key][pipe.location_key] = pipe
385
386    return pipes_dict
387
388
389def flatten_pipes_dict(pipes_dict: PipesDict) -> list[mrsm.Pipe]:
390    """
391    Convert the standard pipes dictionary into a list.
392
393    Parameters
394    ----------
395    pipes_dict: PipesDict
396        The pipes dictionary to be flattened.
397
398    Returns
399    -------
400    A list of `Pipe` objects.
401
402    """
403    pipes_list = []
404    for ck in pipes_dict.values():
405        for mk in ck.values():
406            pipes_list.extend(list(mk.values()))
407    return pipes_list
def get_pipe_from_value( value: str | dict[str, str], _pipe: meerschaum.Pipe | None = None) -> meerschaum.Pipe | None:
22def get_pipe_from_value(
23    value: str | dict[str, str],
24    _pipe: mrsm.Pipe | None = None,
25) -> mrsm.Pipe | None:
26    """
27    Given an input value (string or dictionary), return the corresponding pipe.
28
29    Parameters
30    ----------
31    value: str | dict[str, str]
32        The definition keys for the pipe to be constructed.
33        This may either be the literal constructor string or a dictionary of constructor kwargs.
34
35    _pipe: mrsm.Pipe | None, default None
36        If provided, use the instance connector for this reference pipe
37        if an explicit instance connector is undefined.
38
39    Returns
40    -------
41    A `Pipe` corresponding to the given keys.
42    """
43    from meerschaum.utils.warnings import warn
44    if isinstance(value, mrsm.Pipe):
45        return value
46
47    if isinstance(value, str):
48        return get_pipe_from_string(value, _pipe=_pipe)
49
50    if not isinstance(value, dict):
51        raise ValueError("Expecting kwargs for a Pipe.")
52
53    if (
54        _pipe is not None
55        and not (
56            'instance' in value
57            or 'mrsm_instance' in value
58            or 'instance_keys' in value
59        )
60    ):
61        value = {**value}
62        value['instance'] = _pipe.instance_keys
63
64    try:
65        pipe = mrsm.Pipe(**value)
66    except Exception as e:
67        warn(f"Failed build pipe from value '{value}':\n{e}")
68        return None
69
70    return pipe

Given an input value (string or dictionary), return the corresponding pipe.

Parameters
  • value (str | dict[str, str]): The definition keys for the pipe to be constructed. This may either be the literal constructor string or a dictionary of constructor kwargs.
  • _pipe (mrsm.Pipe | None, default None): If provided, use the instance connector for this reference pipe if an explicit instance connector is undefined.
Returns
  • A Pipe corresponding to the given keys.
def get_pipe_from_string( string: str, _pipe: meerschaum.Pipe | None = None) -> meerschaum.Pipe | None:
 73def get_pipe_from_string(
 74    string: str,
 75    _pipe: mrsm.Pipe | None = None,
 76) -> mrsm.Pipe | None:
 77    """
 78    If a string is equal to a `Pipe` constructor, return the pipe.
 79
 80    Parameters
 81    ----------
 82    string: str
 83        The string containing the constructor syntax.
 84        Must be an exact match.
 85
 86    _pipe: mrsm.Pipe | None, default None
 87        If provided, use the instance connector for this reference pipe
 88        if an explicit instance connector is undefined.
 89
 90    Returns
 91    -------
 92    A `Pipe` corresponding the defined pipe in the input string.
 93    
 94    Examples
 95    --------
 96    >>> get_pipe_from_string('Pipe("foo", "bar")')
 97    Pipe('foo', 'bar')
 98    >>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")')
 99    Pipe('spam', 'eggs', instance='sql:local')
100    """
101    from meerschaum.utils.warnings import warn
102    from meerschaum.utils.misc import parse_arguments_str
103    pattern = r'(?:mrsm\.)?Pipe\((.*?)\)'
104    matches = list(re.finditer(pattern, string))
105    if not matches:
106        return None
107
108    try:
109        match = matches[0]
110        args_str = match.group(1)
111        args, kwargs = parse_arguments_str(args_str)
112        if (
113            _pipe is not None
114            and not (
115                'instance' in kwargs
116                or 'mrsm_instance' in kwargs
117                or 'instance_keys' in kwargs
118            )
119        ):
120            kwargs['instance'] = _pipe.instance_keys
121        pipe = mrsm.Pipe(*args, **kwargs)
122    except Exception as e:
123        warn(f"Failed to build pipe from string '{string}': \n{e}")
124        return None
125
126    return pipe

If a string is equal to a Pipe constructor, return the pipe.

Parameters
  • string (str): The string containing the constructor syntax. Must be an exact match.
  • _pipe (mrsm.Pipe | None, default None): If provided, use the instance connector for this reference pipe if an explicit instance connector is undefined.
Returns
  • A Pipe corresponding the defined pipe in the input string.
Examples
>>> get_pipe_from_string('Pipe("foo", "bar")')
Pipe('foo', 'bar')
>>> get_pipe_from_string('mrsm.Pipe("spam", "eggs", instance="sql:local")')
Pipe('spam', 'eggs', instance='sql:local')
def evaluate_pipe_access_chain(access_chain: str, pipe: meerschaum.Pipe, _pipe=None):
129def evaluate_pipe_access_chain(access_chain: str, pipe: mrsm.Pipe, _pipe=None):
130    """
131    Safely evaluate the access chain on a Pipe.
132    """
133    expr = f"pipe{access_chain}"
134    if pipe == _pipe and access_chain.lstrip('.').startswith('parameters'):
135        parameters_access_chain = access_chain.lstrip('.').split('parameters', maxsplit=1)[-1]
136        expr = f"pipe.attributes['parameters']{parameters_access_chain}"
137    tree = ast.parse(expr, mode='eval')
138
139    def _eval(node, context):
140        if isinstance(node, ast.Expression):
141            return _eval(node.body, context)
142
143        elif isinstance(node, ast.Name):
144            if node.id == "pipe":
145                return context
146            raise ValueError(f"Unknown variable: {node.id}")
147
148        elif isinstance(node, ast.Attribute):
149            value = _eval(node.value, context)
150            return getattr(value, node.attr)
151
152        elif isinstance(node, ast.Subscript):
153            value = _eval(node.value, context)
154            key = _eval(node.slice, context) if isinstance(node.slice, ast.Index) else _eval(node.slice, context)
155            return value[key]
156
157        elif isinstance(node, ast.Constant):  # Python 3.8+
158            return node.value
159
160        elif isinstance(node, ast.Str):  # Older Python
161            return node.s
162
163        elif isinstance(node, ast.Index):  # Older Python AST style
164            return _eval(node.value, context)
165
166        else:
167            raise TypeError(f"Unsupported AST node: {ast.dump(node)}")
168
169    return _eval(tree, pipe)

Safely evaluate the access chain on a Pipe.

def replace_pipes_syntax(text: str, _pipe=None) -> Any:
218def replace_pipes_syntax(text: str, _pipe=None) -> Any:
219    """
220    Parse a string containing the `{{ Pipe() }}` syntax.
221    """
222    from meerschaum.utils.warnings import warn
223    from meerschaum.utils.dtypes import json_serialize_value
224    pattern = r'\{\{\s*(?:mrsm\.)?Pipe\((.*?)\)((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
225    self_pattern = r'\{\{\s*self((?:\.[\w]+|\[[^\]]+\])*)\s*\}\}'
226
227    matches = list(re.finditer(pattern, text))
228    if _pipe is not None:
229        self_matches = list(re.finditer(self_pattern, text))
230        matches.extend(self_matches)
231    if not matches:
232        return text
233
234    placeholders = {}
235    for match in matches:
236        placeholder = f"__mrsm_pipe_placeholder_{uuid.uuid4().hex}__"
237        placeholders[placeholder] = match
238
239    substituted_text = text
240    for placeholder, match in placeholders.items():
241        substituted_text = substituted_text.replace(match.group(0), placeholder)
242
243    resolved_values = {}
244    for placeholder, match in placeholders.items():
245        try:
246            resolved_values[placeholder] = _evaluate_pipe_access_chain_from_match(
247                match,
248                _pipe=_pipe,
249            )
250        except Exception:
251            import traceback
252            warn(f"Failed to resolve pipe syntax '{match.group(0)}': {traceback.format_exc()}")
253            resolved_values[placeholder] = match.group(0)
254
255    if len(matches) == 1:
256        match = matches[0]
257        placeholder = list(placeholders.keys())[0]
258        if text.strip() == match.group(0):
259            return resolved_values[placeholder]
260
261    final_text = substituted_text
262    for placeholder, value in resolved_values.items():
263        if isinstance(value, (dict, list, bool, int, float)) or value is None:
264            final_text = final_text.replace(placeholder, json.dumps(value, default=json_serialize_value))
265        else:
266            final_text = final_text.replace(placeholder, str(value))
267
268    return final_text

Parse a string containing the {{ Pipe() }} syntax.

def replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]] = None, func: Callable[[Any], Any] = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]:
271def replace_pipes_in_dict(
272    pipes: Optional[PipesDict] = None,
273    func: Callable[[Any], Any] = str,
274    debug: bool = False,
275    **kw
276) -> PipesDict:
277    """
278    Replace the Pipes in a Pipes dict with the result of another function.
279
280    Parameters
281    ----------
282    pipes: Optional[PipesDict], default None
283        The pipes dict to be processed.
284
285    func: Callable[[Any], Any], default str
286        The function to be applied to every pipe.
287        Defaults to the string constructor.
288
289    debug: bool, default False
290        Verbosity toggle.
291
292    Returns
293    -------
294    A dictionary where every pipe is replaced with the output of a function.
295
296    """
297    def change_dict(d: Dict[Any, Any]) -> None:
298        for k, v in d.items():
299            if isinstance(v, dict):
300                change_dict(v)
301            elif isinstance(v, list):
302                d[k] = [func(i) for i in v]
303            elif isinstance(v, tuple):
304                d[k] = tuple([func(i) for i in v])
305            else:
306                d[k] = func(v)
307
308    if pipes is None:
309        from meerschaum import get_pipes
310        pipes = get_pipes(debug=debug, **kw)
311
312    result = copy.deepcopy(pipes)
313    change_dict(result)
314    return result

Replace the Pipes in a Pipes dict with the result of another function.

Parameters
  • pipes (Optional[PipesDict], default None): The pipes dict to be processed.
  • func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary where every pipe is replaced with the output of a function.
def is_pipe_registered( pipe: meerschaum.Pipe, pipes: Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], debug: bool = False) -> bool:
317def is_pipe_registered(
318    pipe: mrsm.Pipe,
319    pipes: PipesDict,
320    debug: bool = False
321) -> bool:
322    """
323    Check if a Pipe is inside the pipes dictionary.
324
325    Parameters
326    ----------
327    pipe: meerschaum.Pipe
328        The pipe to see if it's in the dictionary.
329
330    pipes: PipesDict
331        The dictionary to search inside.
332
333    debug: bool, default False
334        Verbosity toggle.
335
336    Returns
337    -------
338    A bool indicating whether the pipe is inside the dictionary.
339    """
340    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
341    try:
342        return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
343    except KeyError:
344        return False

Check if a Pipe is inside the pipes dictionary.

Parameters
  • pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
  • pipes (PipesDict): The dictionary to search inside.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating whether the pipe is inside the dictionary.
def pipes_dict_from_list( pipes_list: list[meerschaum.Pipe]) -> Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]:
347def pipes_dict_from_list(pipes_list: list[mrsm.Pipe]) -> PipesDict:
348    """
349    Return a PipesDict from a list of pipes.
350    Note that all pipes must share the same instance connector.
351
352    Examples
353    --------
354    >>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')]
355    >>> pipes_dict_from_list(pipes)
356    {
357        'a': {
358            'b': {
359                None: Pipe('a', 'b')
360                'c': Pipe('a', 'b', 'c')
361            }
362        }
363    }
364    """
365    from meerschaum.utils.warnings import warn
366    from meerschaum.utils.misc import items_str
367    if not pipes_list:
368        return {}
369
370    instance_keys = pipes_list[0].instance_keys
371    same_pipes = [pipe for pipe in pipes_list if pipe.instance_keys == instance_keys]
372    diff_pipes = [pipe for pipe in pipes_list if pipe.instance_keys != instance_keys]
373    if diff_pipes:
374        warn(
375            f"Skipping pipes with instance keys different from '{instance_keys}':\n"
376            f"{items_str(diff_pipes, quotes=False)}"
377        )
378
379    pipes_dict: PipesDict = {}
380    for pipe in same_pipes:
381        if pipe.connector_keys not in pipes_dict:
382            pipes_dict[pipe.connector_keys] = {}
383        if pipe.metric_key not in pipes_dict[pipe.connector_key]:
384            pipes_dict[pipe.connector_keys][pipe.metric_key] = {}
385        pipes_dict[pipe.connector_keys][pipe.metric_key][pipe.location_key] = pipe
386
387    return pipes_dict

Return a PipesDict from a list of pipes. Note that all pipes must share the same instance connector.

Examples
>>> pipes_list = [mrsm.Pipe('a', 'b'), mrsm.Pipe('a', 'b', 'c')]
>>> pipes_dict_from_list(pipes)
{
    'a': {
        'b': {
            None: Pipe('a', 'b')
            'c': Pipe('a', 'b', 'c')
        }
    }
}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]) -> list[meerschaum.Pipe]:
390def flatten_pipes_dict(pipes_dict: PipesDict) -> list[mrsm.Pipe]:
391    """
392    Convert the standard pipes dictionary into a list.
393
394    Parameters
395    ----------
396    pipes_dict: PipesDict
397        The pipes dictionary to be flattened.
398
399    Returns
400    -------
401    A list of `Pipe` objects.
402
403    """
404    pipes_list = []
405    for ck in pipes_dict.values():
406        for mk in ck.values():
407            pipes_list.extend(list(mk.values()))
408    return pipes_list

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.