meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13from meerschaum.utils.typing import Optional, Union, Any
 14from meerschaum.utils.formatting._shell import make_header
 15from meerschaum.utils.formatting._pprint import pprint
 16from meerschaum.utils.formatting._pipes import (
 17    pprint_pipes,
 18    highlight_pipes,
 19    format_pipe_success_tuple,
 20    print_pipes_results,
 21    extract_stats_from_message,
 22    pipe_repr,
 23)
 24from meerschaum.utils.threading import Lock, RLock
 25
 26_attrs = {
 27    'ANSI': None,
 28    'UNICODE': None,
 29    'CHARSET': None,
 30}
 31__all__ = sorted([
 32    'ANSI', 'CHARSET', 'UNICODE',
 33    'colored',
 34    'translate_rich_to_termcolor',
 35    'get_console',
 36    'print_tuple',
 37    'print_options',
 38    'fill_ansi',
 39    'pprint',
 40    'highlight_pipes',
 41    'pprint_pipes',
 42    'make_header',
 43    'pipe_repr',
 44    'print_pipes_results',
 45    'extract_stats_from_message',
 46])
 47__pdoc__ = {}
 48_locks = {
 49    '_colorama_init': RLock(),
 50}
 51
 52
 53def colored_fallback(*args, **kw):
 54    return ' '.join(args)
 55
 56def translate_rich_to_termcolor(*colors) -> tuple:
 57    """Translate between rich and more_termcolor terminology.
 58    This is probably prone to breaking.
 59
 60    Parameters
 61    ----------
 62    *colors :
 63        
 64
 65    Returns
 66    -------
 67
 68    """
 69    _colors = []
 70    for c in colors:
 71        _c_list = []
 72        ### handle 'bright'
 73        c = c.replace('bright_', 'bright ')
 74
 75        ### handle 'on'
 76        if ' on ' in c:
 77            _on = c.split(' on ')
 78            _colors.append(_on[0])
 79            for _c in _on[1:]:
 80                _c_list.append('on ' + _c)
 81        else:
 82            _c_list += [c]
 83
 84        _colors += _c_list
 85
 86    return tuple(_colors)
 87
 88
 89def rich_text_to_str(text: 'rich.text.Text') -> str:
 90    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
 91    _console = get_console()
 92    if _console is None:
 93        return str(text)
 94    with console.capture() as cap:
 95        console.print(text)
 96    string = cap.get()
 97    return string[:-1]
 98
 99
100def _init():
101    """
102    Initial color settings (mostly for Windows).
103    """
104    if platform.system() != "Windows":
105        return
106    if 'PYTHONIOENCODING' not in os.environ:
107        os.environ['PYTHONIOENCODING'] = 'utf-8'
108    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
109        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
110    sys.stdin.reconfigure(encoding='utf-8')
111    sys.stdout.reconfigure(encoding='utf-8')
112    sys.stderr.reconfigure(encoding='utf-8')
113
114    from ctypes import windll
115    k = windll.kernel32
116    k.SetConsoleMode(k.GetStdHandle(-11), 7)
117    os.system("color")
118
119    from meerschaum.utils.packages import attempt_import
120    ### init colorama for Windows color output
121    colorama, more_termcolor = attempt_import(
122        'colorama',
123        'more_termcolor',
124        lazy = False,
125        warn = False,
126        color = False,
127    )
128    try:
129        colorama.init(autoreset=False)
130        success = True
131    except Exception as e:
132        import traceback
133        traceback.print_exc()
134        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
135        success = False
136
137    if more_termcolor is None:
138        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
139        success = False
140
141    return success
142
143_colorama_init = False
144def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
145    """Apply colors and rich styles to a string.
146    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
147    Otherwise attempt to use the legacy `more_termcolor.colored` method.
148
149    Parameters
150    ----------
151    text: str
152        The string to apply formatting to.
153        
154    *colors:
155        A list of colors to pass to `more_termcolor.colored()`.
156        Has no effect if `style` is provided.
157
158    style: str, default None
159        If provided, pass to `rich` for processing.
160
161    as_rich_text: bool, default False
162        If `True`, return a `rich.Text` object.
163        `style` must be provided.
164        
165    **kw:
166        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
167        
168
169    Returns
170    -------
171    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
172
173    """
174    from meerschaum.utils.packages import import_rich, attempt_import
175    global _colorama_init
176    _init()
177    with _locks['_colorama_init']:
178        if not _colorama_init:
179            _colorama_init = _init()
180
181    if 'style' in kw:
182        rich = import_rich()
183        rich_text = attempt_import('rich.text')
184        text_obj = rich_text.Text(text, **kw)
185        if as_rich_text:
186            return text_obj
187        return rich_text_to_str(text_obj)
188
189    more_termcolor = attempt_import('more_termcolor', lazy=False)
190    try:
191        colored_text = more_termcolor.colored(text, *colors, **kw)
192    except Exception as e:
193        colored_text = None
194
195    if colored_text is not None:
196        return colored_text
197
198    try:
199        _colors = translate_rich_to_termcolor(*colors)
200        colored_text = more_termcolor.colored(text, *_colors, **kw)
201    except Exception as e:
202        colored_text = None
203
204    if colored_text is None:
205        ### NOTE: warn here?
206        return text
207
208    return colored_text
209
210console = None
211def get_console():
212    """Get the rich console."""
213    global console
214    from meerschaum.utils.packages import import_rich, attempt_import
215    rich = import_rich()
216    rich_console = attempt_import('rich.console')
217    try:
218        console = rich_console.Console(force_terminal=True, color_system='truecolor')
219    except Exception as e:
220        console = None
221    return console
222
223
224def print_tuple(
225        tup: tuple,
226        skip_common: bool = True,
227        common_only: bool = False,
228        upper_padding: int = 0,
229        lower_padding: int = 0,
230        calm: bool = False,
231        _progress: Optional['rich.progress.Progress'] = None,
232    ) -> None:
233    """
234    Print `meerschaum.utils.typing.SuccessTuple`.
235
236    Parameters
237    ----------
238    skip_common: bool, default True
239        If `True`, do not print common success tuples (i.e. `(True, "Success")`).
240
241    common_only: bool, default False
242        If `True`, only print if the success tuple is common.
243
244    upper_padding: int, default 0
245        How many newlines to prepend to the message.
246
247    lower_padding: int, default 0
248        How many newlines to append to the message.
249
250    calm: bool, default False
251        If `True`, use the default emoji and color scheme.
252    """
253    from meerschaum.config.static import STATIC_CONFIG
254    _init()
255    try:
256        status = 'success' if tup[0] else 'failure'
257    except TypeError:
258        status = 'failure'
259        tup = None, None
260
261    if calm:
262        status += '_calm'
263
264    omit_messages = STATIC_CONFIG['system']['success']['ignore']
265
266    do_print = True
267
268    if common_only:
269        skip_common = False
270        do_print = tup[1] in omit_messages
271
272    if skip_common:
273        do_print = tup[1] not in omit_messages
274
275    if do_print:
276        ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
277        from meerschaum.config import get_config
278        status_config = get_config('formatting', status, patch=True)
279
280        msg = ' ' + status_config[CHARSET]['icon'] + ' ' + str(tup[1])
281        lines = msg.split('\n')
282        lines = [lines[0]] + [
283            (('    ' + line if not line.startswith(' ') else line))
284            for line in lines[1:]
285        ]
286        if ANSI:
287            lines[0] = fill_ansi(highlight_pipes(lines[0]), **status_config['ansi']['rich'])
288        msg = '\n'.join(lines)
289        msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
290        print(msg)
291
292
293def print_options(
294        options: Optional[Dict[str, Any]] = None,
295        nopretty: bool = False,
296        no_rich: bool = False,
297        name: str = 'options',
298        header: Optional[str] = None,
299        num_cols: Optional[int] = None,
300        adjust_cols: bool = True,
301        **kw
302    ) -> None:
303    """
304    Print items in an iterable as a fancy table.
305
306    Parameters
307    ----------
308    options: Optional[Dict[str, Any]], default None
309        The iterable to be printed.
310
311    nopretty: bool, default False
312        If `True`, don't use fancy formatting.
313
314    no_rich: bool, default False
315        If `True`, don't use `rich` to format the output.
316
317    name: str, default 'options'
318        The text in the default header after `'Available'`.
319
320    header: Optional[str], default None
321        If provided, override `name` and use this as the header text.
322
323    num_cols: Optional[int], default None
324        How many columns in the table. Depends on the terminal size. If `None`, use 8.
325
326    adjust_cols: bool, default True
327        If `True`, adjust the number of columns depending on the terminal size.
328
329    """
330    import os
331    from meerschaum.utils.packages import import_rich
332    from meerschaum.utils.formatting import make_header, highlight_pipes
333    from meerschaum.actions import actions as _actions
334    from meerschaum.utils.misc import get_cols_lines, string_width, iterate_chunks
335
336
337    if options is None:
338        options = {}
339    _options = []
340    for o in options:
341        _options.append(str(o))
342    _header = f"Available {name}" if header is None else header
343
344    if num_cols is None:
345        num_cols = 8
346
347    def _print_options_no_rich():
348        if not nopretty:
349            print()
350            print(make_header(_header))
351        ### print actions
352        for option in sorted(_options):
353            if not nopretty:
354                print("  - ", end="")
355            print(option)
356        if not nopretty:
357            print()
358
359    rich = import_rich()
360    if rich is None or nopretty or no_rich:
361        _print_options_no_rich()
362        return None
363
364    ### Prevent too many options from being truncated on small terminals.
365    if adjust_cols and _options:
366        _cols, _lines = get_cols_lines()
367        while num_cols > 1:
368            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
369            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
370            if num_too_big > int(len(_options) / 3):
371                num_cols -= 1
372                continue
373            break
374
375    from meerschaum.utils.formatting import pprint, get_console
376    from meerschaum.utils.packages import attempt_import
377    rich_columns = attempt_import('rich.columns')
378    rich_panel = attempt_import('rich.panel')
379    rich_table = attempt_import('rich.table')
380    Text = attempt_import('rich.text').Text
381    box = attempt_import('rich.box')
382    Panel = rich_panel.Panel
383    Columns = rich_columns.Columns
384    Table = rich_table.Table
385
386    if _header is not None:
387        table = Table(
388            title = '\n' + _header,
389            box = box.SIMPLE,
390            show_header = False,
391            show_footer = False,
392            title_style = '',
393            expand = True,
394        )
395    else:
396        table = Table.grid(padding=(0, 2))
397    for i in range(num_cols):
398        table.add_column()
399
400    chunks = iterate_chunks(
401        [Text.from_ansi(highlight_pipes(o)) for o in sorted(_options)],
402        num_cols,
403        fillvalue=''
404    )
405    for c in chunks:
406        table.add_row(*c)
407
408    get_console().print(table)
409    return None
410
411
412def fill_ansi(string: str, style: str = '') -> str:
413    """
414    Fill in non-formatted segments of ANSI text.
415
416    Parameters
417    ----------
418    string: str
419        A string which contains ANSI escape codes.
420
421    style: str
422        Style arguments to pass to `rich.text.Text`.
423
424    Returns
425    -------
426    A string with ANSI styling applied to the segments which don't yet have a style applied.
427    """
428    from meerschaum.utils.packages import import_rich, attempt_import
429    from meerschaum.utils.misc import iterate_chunks
430    rich = import_rich()
431    Text = attempt_import('rich.text').Text
432    try:
433        msg = Text.from_ansi(string)
434    except AttributeError as e:
435        import traceback
436        traceback.print_stack()
437        msg = ''
438    plain_indices = []
439    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
440        left = left_span.end
441        right = right_span.start if not isinstance(right_span, int) else right_span
442        if left != right:
443            plain_indices.append((left, right))
444    if msg.spans:
445        if msg.spans[0].start != 0:
446            plain_indices = [(0, msg.spans[0].start)] + plain_indices
447        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
448            plain_indices.append((msg.spans[-1].end, len(msg)))
449
450    if plain_indices:
451        for left, right in plain_indices:
452            msg.stylize(style, left, right)
453    else:
454        msg = Text(str(msg), style)
455
456    return rich_text_to_str(msg)
457
458def __getattr__(name: str) -> str:
459    """
460    Lazily load module-level variables.
461    """
462    if name.startswith('__') and name.endswith('__'):
463        raise AttributeError("Cannot import dunders from this module.")
464
465    if name in _attrs:
466        if _attrs[name] is not None:
467            return _attrs[name]
468        from meerschaum.config import get_config
469        if name.lower() in get_config('formatting'):
470            _attrs[name] = get_config('formatting', name.lower())
471        elif name == 'CHARSET':
472            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
473        return _attrs[name]
474    
475    if name == '__wrapped__':
476        import sys
477        return sys.modules[__name__]
478    if name == '__all__':
479        return __all__
480
481    try:
482        return globals()[name]
483    except KeyError:
484        raise AttributeError(f"Could not find '{name}'")
ANSI
CHARSET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
145def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
146    """Apply colors and rich styles to a string.
147    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
148    Otherwise attempt to use the legacy `more_termcolor.colored` method.
149
150    Parameters
151    ----------
152    text: str
153        The string to apply formatting to.
154        
155    *colors:
156        A list of colors to pass to `more_termcolor.colored()`.
157        Has no effect if `style` is provided.
158
159    style: str, default None
160        If provided, pass to `rich` for processing.
161
162    as_rich_text: bool, default False
163        If `True`, return a `rich.Text` object.
164        `style` must be provided.
165        
166    **kw:
167        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
168        
169
170    Returns
171    -------
172    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
173
174    """
175    from meerschaum.utils.packages import import_rich, attempt_import
176    global _colorama_init
177    _init()
178    with _locks['_colorama_init']:
179        if not _colorama_init:
180            _colorama_init = _init()
181
182    if 'style' in kw:
183        rich = import_rich()
184        rich_text = attempt_import('rich.text')
185        text_obj = rich_text.Text(text, **kw)
186        if as_rich_text:
187            return text_obj
188        return rich_text_to_str(text_obj)
189
190    more_termcolor = attempt_import('more_termcolor', lazy=False)
191    try:
192        colored_text = more_termcolor.colored(text, *colors, **kw)
193    except Exception as e:
194        colored_text = None
195
196    if colored_text is not None:
197        return colored_text
198
199    try:
200        _colors = translate_rich_to_termcolor(*colors)
201        colored_text = more_termcolor.colored(text, *_colors, **kw)
202    except Exception as e:
203        colored_text = None
204
205    if colored_text is None:
206        ### NOTE: warn here?
207        return text
208
209    return colored_text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors:: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw:: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
485def extract_stats_from_message(
486        message: str,
487        stat_keys: Optional[List[str]] = None,
488    ) -> Dict[str, int]:
489    """
490    Given a sync message, return the insert, update, upsert stats from within.
491
492    Parameters
493    ----------
494    message: str
495        The message to parse for statistics.
496
497    stat_keys: Optional[List[str]], default None
498        If provided, search for these words (case insensitive) in the message.
499        Defaults to `['inserted', 'updated', 'upserted']`.
500
501    Returns
502    -------
503    A dictionary mapping the stat keys to the total number of rows affected.
504    """
505    stat_keys = stat_keys or ['inserted', 'updated', 'upserted']
506    lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')]
507    message_stats = {
508        stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats)
509        for stat_key in stat_keys
510    }
511    return message_stats

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
413def fill_ansi(string: str, style: str = '') -> str:
414    """
415    Fill in non-formatted segments of ANSI text.
416
417    Parameters
418    ----------
419    string: str
420        A string which contains ANSI escape codes.
421
422    style: str
423        Style arguments to pass to `rich.text.Text`.
424
425    Returns
426    -------
427    A string with ANSI styling applied to the segments which don't yet have a style applied.
428    """
429    from meerschaum.utils.packages import import_rich, attempt_import
430    from meerschaum.utils.misc import iterate_chunks
431    rich = import_rich()
432    Text = attempt_import('rich.text').Text
433    try:
434        msg = Text.from_ansi(string)
435    except AttributeError as e:
436        import traceback
437        traceback.print_stack()
438        msg = ''
439    plain_indices = []
440    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
441        left = left_span.end
442        right = right_span.start if not isinstance(right_span, int) else right_span
443        if left != right:
444            plain_indices.append((left, right))
445    if msg.spans:
446        if msg.spans[0].start != 0:
447            plain_indices = [(0, msg.spans[0].start)] + plain_indices
448        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
449            plain_indices.append((msg.spans[-1].end, len(msg)))
450
451    if plain_indices:
452        for left, right in plain_indices:
453            msg.stylize(style, left, right)
454    else:
455        msg = Text(str(msg), style)
456
457    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def get_console():
212def get_console():
213    """Get the rich console."""
214    global console
215    from meerschaum.utils.packages import import_rich, attempt_import
216    rich = import_rich()
217    rich_console = attempt_import('rich.console')
218    try:
219        console = rich_console.Console(force_terminal=True, color_system='truecolor')
220    except Exception as e:
221        console = None
222    return console

Get the rich console.

def highlight_pipes(message: str) -> str:
330def highlight_pipes(message: str) -> str:
331    """
332    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.
333    """
334    if 'Pipe(' not in message:
335        return message
336
337    from meerschaum import Pipe
338    segments = message.split('Pipe(')
339    msg = ''
340    _d = {}
341    for i, segment in enumerate(segments):
342        comma_index = segment.find(',')
343        paren_index = segment.find(')')
344        single_quote_index = segment.find("'")
345        double_quote_index = segment.find('"')
346
347        has_comma = comma_index != -1
348        has_paren = paren_index != -1
349        has_single_quote = single_quote_index != -1
350        has_double_quote = double_quote_index != -1
351        has_quote = has_single_quote or has_double_quote
352        quote_index = (
353            min(single_quote_index, double_quote_index)
354            if has_double_quote and has_single_quote
355            else (single_quote_index if has_single_quote else double_quote_index)
356        )
357
358        has_pipe = (
359            has_comma
360            and
361            has_paren
362            and
363            has_quote
364            and not
365            (comma_index > paren_index or quote_index > paren_index)
366        )
367
368        if has_pipe:
369            code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
370            try:
371                exec(code)
372                _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:]
373                _ = _d.pop('pipe', None)
374            except Exception as e:
375                _to_add = 'Pipe(' + segment
376            msg += _to_add
377            continue
378        msg += segment
379    return msg

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header(message: str, ruler: str = '─') -> str:
14def make_header(
15        message : str,
16        ruler : str = '─',
17    ) -> str:
18    """Format a message string with a ruler.
19    Length of the ruler is the length of the longest word.
20    
21    Example:
22        'My\nheader' -> 'My\nheader\n──────'
23    """
24
25    from meerschaum.utils.formatting import ANSI, UNICODE, colored
26    if not UNICODE:
27        ruler = '-'
28    words = message.split('\n')
29    max_length = 0
30    for w in words:
31        length = len(w)
32        if length > max_length:
33            max_length = length
34
35    s = message + "\n"
36    for i in range(max_length):
37        s += ruler
38    return s

Format a message string with a ruler. Length of the ruler is the length of the longest word.

Example:
    'My

header' -> 'My header ──────'

def pipe_repr( pipe: meerschaum.core.Pipe.Pipe, as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
279def pipe_repr(
280        pipe: mrsm.Pipe,
281        as_rich_text: bool = False,
282        ansi: Optional[bool] = None,
283    ) -> Union[str, 'rich.text.Text']:
284    """
285    Return a formatted string for representing a `meerschaum.Pipe`.
286    """
287    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
288    from meerschaum.utils.packages import import_rich, attempt_import
289    rich = import_rich()
290    Text = attempt_import('rich.text').Text
291
292    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
293    if not ANSI or (ansi is False):
294        styles = {k: '' for k in styles}
295    _pipe_style_prefix, _pipe_style_suffix = (
296        (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe']
297        else ('', '')
298    )
299    text_obj = (
300        Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix)
301        + colored(("'" + pipe.connector_keys + "'"), style=styles['connector'], as_rich_text=True)
302        + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix)
303        + colored(("'" + pipe.metric_key + "'"), style=styles['metric'], as_rich_text=True)
304        + (
305            (
306                colored(', ', style=styles['punctuation'], as_rich_text=True)
307                + colored(
308                    ("'" + pipe.location_key + "'"),
309                    style=styles['location'], as_rich_text=True
310                )
311            ) if pipe.location_key is not None
312            else colored('', style='', as_rich_text=True)
313        ) + (
314            ( ### Add the `instance=` argument.
315                colored(', instance=', style=styles['punctuation'], as_rich_text=True)
316                + colored(
317                    ("'" + pipe.instance_keys + "'"),
318                    style=styles['instance'], as_rich_text=True
319                )
320            ) if pipe.instance_keys != get_config('meerschaum', 'instance')
321            else colored('', style='', as_rich_text=True)
322        )
323        + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix)
324    )
325    if as_rich_text:
326        return text_obj
327    return rich_text_to_str(text_obj)

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11        *args,
 12        detect_password: bool = True,
 13        nopretty: bool = False,
 14        **kw
 15    ) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
 22    from meerschaum.utils.warnings import error
 23    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 24    from collections import OrderedDict
 25    import copy, json
 26
 27    if (
 28        len(args) == 1
 29        and
 30        isinstance(args[0], tuple)
 31        and
 32        len(args[0]) == 2
 33        and
 34        isinstance(args[0][0], bool)
 35        and
 36        isinstance(args[0][1], str)
 37    ):
 38        return print_tuple(args[0])
 39
 40    modify = True
 41    rich_pprint = None
 42    if ANSI and not nopretty:
 43        rich = import_rich()
 44        if rich is not None:
 45            rich_pretty = attempt_import('rich.pretty')
 46        if rich_pretty is not None:
 47            def _rich_pprint(*args, **kw):
 48                _console = get_console()
 49                _kw = filter_keywords(_console.print, **kw)
 50                _console.print(*args, **_kw)
 51            rich_pprint = _rich_pprint
 52    elif not nopretty:
 53        pprintpp = attempt_import('pprintpp', warn=False)
 54        try:
 55            _pprint = pprintpp.pprint
 56        except Exception as e:
 57            import pprint as _pprint_module
 58            _pprint = _pprint_module.pprint
 59
 60    func = (
 61        _pprint if rich_pprint is None else rich_pprint
 62    ) if not nopretty else print
 63
 64    try:
 65        args_copy = copy.deepcopy(args)
 66    except Exception as e:
 67        args_copy = args
 68        modify = False
 69    _args = []
 70    for a in args:
 71        c = a
 72        ### convert OrderedDict into dict
 73        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 74            c = dict_from_od(copy.deepcopy(c))
 75        _args.append(c)
 76    args = _args
 77
 78    _args = list(args)
 79    if detect_password and modify:
 80        _args = []
 81        for a in args:
 82            c = a
 83            if isinstance(c, dict):
 84                c = replace_password(copy.deepcopy(c))
 85            if nopretty:
 86                try:
 87                    c = json.dumps(c)
 88                    is_json = True
 89                except Exception as e:
 90                    is_json = False
 91                if not is_json:
 92                    try:
 93                        c = str(c)
 94                    except Exception as e:
 95                        pass
 96            _args.append(c)
 97
 98    ### filter out unsupported keywords
 99    func_kw = filter_keywords(func, **kw) if not nopretty else {}
100    error_msg = None
101    try:
102        func(*_args, **func_kw)
103    except Exception as e:
104        error_msg = e
105    if error_msg is not None:
106        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]]) -> None:
 16def pprint_pipes(pipes: PipesDict) -> None:
 17    """Print a stylized tree of a Pipes dictionary.
 18    Supports ANSI and UNICODE global settings."""
 19    from meerschaum.utils.warnings import error
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
 22    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
 23    import copy
 24    rich = import_rich('rich', warn=False)
 25    Text = None
 26    if rich is not None:
 27        rich_text = attempt_import('rich.text', lazy=False)
 28        Text = rich_text.Text
 29
 30    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
 31    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
 32    if not ANSI:
 33        styles = {k: '' for k in styles}
 34    print()
 35
 36    def ascii_print_pipes():
 37        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
 38        (though rich should auto-install when `attempt_import()` is called)."""
 39        asciitree = attempt_import('asciitree')
 40        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
 41        for conn_keys, metrics in pipes.items():
 42            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
 43            if Text is not None:
 44                replace_dict['connector'][_colored_conn_key] = (
 45                    Text(conn_keys, style=styles['connector'])
 46                )
 47            ascii_dict[_colored_conn_key] = {}
 48            for metric, locations in metrics.items():
 49                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
 50                if Text is not None:
 51                    replace_dict['metric'][_colored_metric_key] = (
 52                        Text(metric, style=styles['metric'])
 53                    )
 54                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
 55                for location, pipe in locations.items():
 56                    _location_style = styles[('none' if location is None else 'location')]
 57                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
 58                    _colored_location = colored(
 59                        icons['location'] + str(location), style=_location_style
 60                    )
 61                    _colored_location_key = _colored_location + pipe_addendum
 62                    if Text is not None:
 63                        replace_dict['location'][_colored_location] = (
 64                            Text(str(location), style=_location_style)
 65                        )
 66                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}
 67
 68        tree = asciitree.LeftAligned()
 69        output = ''
 70        cols = []
 71
 72        ### This is pretty terrible, unreadable code.
 73        ### Please know that I'm normally better than this.
 74        key_str = (
 75            (Text("     ") if Text is not None else "     ") +
 76            (
 77                Text("Key", style='underline') if Text is not None else
 78                colored("Key", style='underline')
 79            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
 80            (
 81                Text("Connector", style=styles['connector']) if Text is not None else
 82                colored("Connector", style=styles['connector'])
 83            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
 84            (
 85                Text("Metric", style=styles['metric']) if Text is not None else
 86                colored("Metric", style=styles['metric'])
 87            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
 88            (
 89                Text("Location", style=styles['location']) if Text is not None else
 90                colored("Location", style=styles['location'])
 91            ) + (Text('\n\n') if Text is not None else '\n\n')
 92        )
 93
 94        output += str(key_str)
 95        cols.append(key_str)
 96
 97        def replace_tree_text(tree_str : str) -> Text:
 98            """Replace the colored words with stylized Text instead.
 99            Is not executed if ANSI and UNICODE are disabled."""
100            tree_text = Text(tree_str) if Text is not None else None
101            for k, v in replace_dict.items():
102                for _colored, _text in v.items():
103                    parts = []
104                    lines = tree_text.split(_colored)
105                    for part in lines:
106                        parts += [part, _text]
107                    if lines[-1] != Text(''):
108                        parts = parts[:-1]
109                    _tree_text = Text('')
110                    for part in parts:
111                        _tree_text += part
112                    tree_text = _tree_text
113            return tree_text
114
115        tree_output = ""
116        for k, v in ascii_dict.items():
117            branch = {k : v}
118            tree_output += tree(branch) + '\n\n'
119            if not UNICODE and not ANSI:
120                _col = (Text(tree(branch)) if Text is not None else tree(branch))
121            else:
122                _col = replace_tree_text(tree(branch))
123            cols.append(_col)
124        if len(output) > 0:
125            tree_output = tree_output[:-2]
126        output += tree_output
127
128        if rich is None:
129            return print(output)
130
131        rich_columns = attempt_import('rich.columns')
132        Columns = rich_columns.Columns
133        columns = Columns(cols)
134        get_console().print(columns)
135
136    if not UNICODE:
137        return ascii_print_pipes()
138
139    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
140        'rich.panel',
141        'rich.tree',
142        'rich.text',
143        'rich.columns',
144        'rich.table',
145    )
146    from rich import box
147    Panel = rich_panel.Panel
148    Tree = rich_tree.Tree
149    Text = rich_text.Text
150    Columns = rich_columns.Columns
151    Table = rich_table.Table
152
153    key_panel = Panel(
154        (
155            Text("\n") +
156            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
157            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
158            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
159        ),
160        title = Text(icons['key'] + "Keys", style=styles['guide']),
161        border_style = styles['guide'],
162        expand = True
163    )
164
165    cols = []
166    conn_trees = {}
167    metric_trees = {}
168    pipes = sorted_dict(pipes)
169    for conn_keys, metrics in pipes.items():
170        conn_trees[conn_keys] = Tree(
171            Text(
172                icons['connector'] + conn_keys,
173                style = styles['connector'],
174            ),
175            guide_style = styles['connector']
176        )
177        metric_trees[conn_keys] = {}
178        for metric, locations in metrics.items():
179            metric_trees[conn_keys][metric] = Tree(
180                Text(
181                    icons['metric'] + metric,
182                    style = styles['metric']
183                ),
184                guide_style = styles['metric']
185            )
186            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
187            for location, pipe in locations.items():
188                _location = (
189                    Text(str(location), style=styles['none']) if location is None
190                    else Text(location, style=styles['location'])
191                )
192                _location = (
193                    Text(icons['location'])
194                    + _location + Text('\n')
195                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
196                )
197                metric_trees[conn_keys][metric].add(_location)
198
199    cols += [key_panel]
200    for k, t in conn_trees.items():
201        cols.append(t)
202
203    columns = Columns(cols)
204    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
57def translate_rich_to_termcolor(*colors) -> tuple:
58    """Translate between rich and more_termcolor terminology.
59    This is probably prone to breaking.
60
61    Parameters
62    ----------
63    *colors :
64        
65
66    Returns
67    -------
68
69    """
70    _colors = []
71    for c in colors:
72        _c_list = []
73        ### handle 'bright'
74        c = c.replace('bright_', 'bright ')
75
76        ### handle 'on'
77        if ' on ' in c:
78            _on = c.split(' on ')
79            _colors.append(_on[0])
80            for _c in _on[1:]:
81                _c_list.append('on ' + _c)
82        else:
83            _c_list += [c]
84
85        _colors += _c_list
86
87    return tuple(_colors)

Translate between rich and more_termcolor terminology. This is probably prone to breaking.

Parameters
  • *colors :
  • Returns
  • -------