meerschaum.utils.interactive

Interactive "wizards" to guide the user.

 1#! /usr/bin/env python3
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interactive "wizards" to guide the user.
 7"""
 8
 9from __future__ import annotations
10
def select_pipes(
        yes: bool = False,
        force: bool = False,
        debug: bool = False,
    ) -> List[Pipe]:
    """
    Prompt the user for the keys to identify a list of pipes.

    The wizard asks for an instance, then narrows the pipes registered on
    that instance by connector keys, metric keys, and location keys.
    The resulting pipes are printed, and the wizard starts over from the
    instance prompt if the user does not confirm them.

    Parameters
    ----------
    yes: bool, default False
        Forwarded to `yes_no()` for the final confirmation prompt.

    force: bool, default False
        If `True`, skip the final confirmation prompt and accept the selection.

    debug: bool, default False
        Forwarded to `clear_screen()` and `get_pipes()`.

    Returns
    -------
    The result of `flatten_pipes_dict()` applied to the selected pipes.
    """
    from meerschaum.utils.misc import get_connector_labels, flatten_pipes_dict
    from meerschaum.utils.prompt import choose, yes_no
    from meerschaum.utils import get_pipes
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import pprint_pipes
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import warn
    from meerschaum.connectors import instance_types

    def _only_option(options):
        """Pre-select the option when it is the only one; otherwise no default."""
        return options[0] if len(options) == 1 else None

    ### Loop (rather than recurse) so repeated rejections cannot exhaust the stack.
    while True:
        clear_screen(debug=debug)

        ### Keep asking until an instance with at least one registered pipe is chosen.
        while True:
            instance = choose(
                "On which instance are the pipes stored?",
                get_connector_labels(*instance_types),
                numeric=True,
                default=get_config('meerschaum', 'instance'),
            )
            pipes_dict = get_pipes(instance=instance, debug=debug)
            if pipes_dict:
                break
            warn(f"There are no pipes registered on instance '{instance}'.", stack=False)
            print("    Please choose another instance.")

        ### The top-level keys of the pipes dictionary are the connector keys.
        conn_keys = sorted(pipes_dict)
        clear_screen(debug=debug)
        chosen_conn_keys = choose(
            "What are the connectors for the pipes?",
            conn_keys,
            numeric=True,
            multiple=True,
            default=_only_option(conn_keys),
        )

        pipes_dict = get_pipes(chosen_conn_keys, instance=instance, debug=debug)
        ### A metric key may exist under several connectors; offer each one only once.
        metric_keys = sorted({
            mk
            for metrics in pipes_dict.values()
            for mk in metrics
        })
        clear_screen(debug=debug)
        chosen_metric_keys = choose(
            "What are the metrics for the pipes?",
            metric_keys,
            numeric=True,
            multiple=True,
            default=_only_option(metric_keys),
        )

        pipes_dict = get_pipes(
            chosen_conn_keys,
            chosen_metric_keys,
            instance=instance,
            debug=debug,
        )
        ### Location keys are stringified for the menu (a `None` location becomes 'None')
        ### and de-duplicated, since many pipes may share the same location.
        location_keys = sorted({
            str(lk)
            for metrics in pipes_dict.values()
            for locations in metrics.values()
            for lk in locations
        })
        clear_screen(debug=debug)
        chosen_location_labels = choose(
            "What are the locations for the pipes?",
            location_keys,
            numeric=True,
            multiple=True,
            default=_only_option(location_keys),
        )
        ### Translate the 'None' label back into a real `None` location key.
        chosen_location_keys = [
            (lk if lk != 'None' else None)
            for lk in chosen_location_labels
        ]

        pipes_dict = get_pipes(
            chosen_conn_keys,
            chosen_metric_keys,
            chosen_location_keys,
            instance=instance,
            debug=debug,
        )
        clear_screen(debug=debug)
        pprint_pipes(pipes_dict)
        if force or yes_no("Choose these pipes?", yes=yes):
            return flatten_pipes_dict(pipes_dict)
def select_pipes( yes: bool = False, force: bool = False, debug: bool = False) -> 'List[Pipe]':
def select_pipes(
        yes: bool = False,
        force: bool = False,
        debug: bool = False,
    ) -> List[Pipe]:
    """
    Prompt the user for the keys to identify a list of pipes.

    Steps, each shown on a cleared screen:

    1. Choose an instance (re-asked until one with registered pipes is picked).
    2. Choose one or more connector keys.
    3. Choose one or more metric keys.
    4. Choose one or more location keys.
    5. Print the matching pipes and ask for confirmation.

    If the selection is not confirmed, the steps are repeated from the start.

    Parameters
    ----------
    yes: bool, default False
        Passed through to `yes_no()` when confirming the selection.

    force: bool, default False
        When `True`, the confirmation question is not asked.

    debug: bool, default False
        Passed through to `clear_screen()` and `get_pipes()`.

    Returns
    -------
    Whatever `flatten_pipes_dict()` returns for the confirmed pipes dictionary.
    """
    from meerschaum.utils.misc import get_connector_labels, flatten_pipes_dict
    from meerschaum.utils.prompt import choose, yes_no
    from meerschaum.utils import get_pipes
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import pprint_pipes
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import warn
    from meerschaum.connectors import instance_types

    confirmed = False
    ### Iterate instead of recursing so each retry does not add a stack frame.
    while not confirmed:
        clear_screen(debug=debug)
        pipes_dict = None
        while not pipes_dict:
            instance = choose(
                "On which instance are the pipes stored?",
                get_connector_labels(*instance_types),
                numeric=True,
                default=get_config('meerschaum', 'instance'),
            )
            pipes_dict = get_pipes(instance=instance, debug=debug)
            if not pipes_dict:
                warn(f"There are no pipes registered on instance '{instance}'.", stack=False)
                print("    Please choose another instance.")

        conn_keys = sorted(pipes_dict)
        clear_screen(debug=debug)
        chosen_conn_keys = choose(
            "What are the connectors for the pipes?",
            conn_keys,
            numeric=True,
            multiple=True,
            default=_default_choice(conn_keys),
        )

        pipes_dict = get_pipes(chosen_conn_keys, instance=instance, debug=debug)
        metric_keys = _unique_metric_keys(pipes_dict)
        clear_screen(debug=debug)
        chosen_metric_keys = choose(
            "What are the metrics for the pipes?",
            metric_keys,
            numeric=True,
            multiple=True,
            default=_default_choice(metric_keys),
        )

        pipes_dict = get_pipes(
            chosen_conn_keys,
            chosen_metric_keys,
            instance=instance,
            debug=debug,
        )
        location_labels = _unique_location_labels(pipes_dict)
        clear_screen(debug=debug)
        picked_labels = choose(
            "What are the locations for the pipes?",
            location_labels,
            numeric=True,
            multiple=True,
            default=_default_choice(location_labels),
        )
        ### The menu shows the text 'None' for a `None` location; map it back here.
        chosen_location_keys = [
            (None if label == 'None' else label)
            for label in picked_labels
        ]

        pipes_dict = get_pipes(
            chosen_conn_keys,
            chosen_metric_keys,
            chosen_location_keys,
            instance=instance,
            debug=debug,
        )
        clear_screen(debug=debug)
        pprint_pipes(pipes_dict)
        confirmed = force or yes_no("Choose these pipes?", yes=yes)

    return flatten_pipes_dict(pipes_dict)


def _default_choice(options):
    """Return the sole option as the menu default, or `None` if there is not exactly one."""
    return options[0] if len(options) == 1 else None


def _unique_metric_keys(pipes_dict):
    """Return the sorted metric keys found under any connector, each listed once."""
    return sorted({
        mk
        for metrics in pipes_dict.values()
        for mk in metrics
    })


def _unique_location_labels(pipes_dict):
    """
    Return the sorted location keys found under any connector and metric,
    converted to strings and each listed once.
    """
    return sorted({
        str(lk)
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for lk in locations
    })

Prompt the user for the keys to identify a list of pipes.