meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Optional, Union, Any, Dict
 15from meerschaum.utils.formatting._shell import make_header
 16from meerschaum.utils.formatting._pprint import pprint
 17from meerschaum.utils.formatting._pipes import (
 18    pprint_pipes,
 19    highlight_pipes,
 20    format_pipe_success_tuple,
 21    print_pipes_results,
 22    extract_stats_from_message,
 23    pipe_repr,
 24)
 25from meerschaum.utils.threading import Lock, RLock
 26
### Lazily resolved formatting flags. `None` means "not loaded yet";
### the module-level `__getattr__` fills these in on first access.
_attrs = {
    'ANSI': None,
    'UNICODE': None,
    'CHARSET': None,
}
### Names exported by this module, sorted.
__all__ = sorted([
    'ANSI', 'CHARSET', 'UNICODE',
    'colored',
    'translate_rich_to_termcolor',
    'get_console',
    'format_success_tuple',
    'print_tuple',
    'print_options',
    'fill_ansi',
    'pprint',
    'highlight_pipes',
    'pprint_pipes',
    'make_header',
    'pipe_repr',
    'print_pipes_results',
    'extract_stats_from_message',
])
### NOTE(review): presumably per-member documentation overrides for pdoc; empty here.
__pdoc__ = {}
### Locks guarding shared module state; `'_colorama_init'` is held by `colored()`
### while it checks and sets the `_colorama_init` flag.
_locks = {
    '_colorama_init': RLock(),
}
 53
 54
 55def colored_fallback(*args, **kw):
 56    return ' '.join(args)
 57
 58def translate_rich_to_termcolor(*colors) -> tuple:
 59    """Translate between rich and more_termcolor terminology.
 60    This is probably prone to breaking.
 61
 62    Parameters
 63    ----------
 64    *colors :
 65        
 66
 67    Returns
 68    -------
 69
 70    """
 71    _colors = []
 72    for c in colors:
 73        _c_list = []
 74        ### handle 'bright'
 75        c = c.replace('bright_', 'bright ')
 76
 77        ### handle 'on'
 78        if ' on ' in c:
 79            _on = c.split(' on ')
 80            _colors.append(_on[0])
 81            for _c in _on[1:]:
 82                _c_list.append('on ' + _c)
 83        else:
 84            _c_list += [c]
 85
 86        _colors += _c_list
 87
 88    return tuple(_colors)
 89
 90
 91def rich_text_to_str(text: 'rich.text.Text') -> str:
 92    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
 93    _console = get_console()
 94    if _console is None:
 95        return str(text)
 96    with console.capture() as cap:
 97        console.print(text)
 98    string = cap.get()
 99    return string[:-1]
100
101
def _init() -> bool:
    """
    Initial color settings (mostly for Windows).

    Returns
    -------
    `True` when there was nothing to set up (non-Windows) or the Windows console
    was initialized. `False` when `colorama` failed to initialize or
    `more_termcolor` is unavailable, in which case `ANSI` and `UNICODE` are
    disabled and `CHARSET` is set to `'ascii'`.
    """
    if platform.system() != "Windows":
        ### Nothing to configure; report success so callers may cache the result.
        return True
    if 'PYTHONIOENCODING' not in os.environ:
        os.environ['PYTHONIOENCODING'] = 'utf-8'
    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
    ### The standard streams may have been replaced or be missing,
    ### so only reconfigure the ones which support it.
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        reconfigure = getattr(stream, 'reconfigure', None)
        if reconfigure is not None:
            reconfigure(encoding='utf-8')

    from ctypes import windll
    k = windll.kernel32
    ### -11 is the standard output handle; mode 7 sets the three lowest console mode flags.
    k.SetConsoleMode(k.GetStdHandle(-11), 7)
    os.system("color")

    from meerschaum.utils.packages import attempt_import
    ### init colorama for Windows color output
    colorama, more_termcolor = attempt_import(
        'colorama',
        'more_termcolor',
        lazy = False,
        warn = False,
        color = False,
    )
    try:
        colorama.init(autoreset=False)
        success = True
    except Exception:
        import traceback
        traceback.print_exc()
        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
        success = False

    if more_termcolor is None:
        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
        success = False

    return success
144
_colorama_init = False
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.

    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    If `more_termcolor` cannot color the text, `text` is returned unchanged.

    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Run the platform setup only until it reports success;
    ### the lock keeps concurrent first calls from racing on the flag.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    ### First try the colors as given, then retry with rich names translated.
    for translate in (False, True):
        try:
            _colors = translate_rich_to_termcolor(*colors) if translate else colors
            colored_text = more_termcolor.colored(text, *_colors, **kw)
        except Exception:
            colored_text = None
        if colored_text is not None:
            return colored_text

    ### NOTE: warn here?
    return text
211
console = None
def get_console():
    """
    Build a rich console, store it in the module-level `console`, and return it.

    Returns
    -------
    A new `rich.console.Console` (forced terminal, truecolor),
    or `None` if the console could not be constructed.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    rich_console = attempt_import('rich.console')
    try:
        new_console = rich_console.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        new_console = None
    console = new_console
    return new_console
224
225
def print_tuple(
    tup: mrsm.SuccessTuple,
    skip_common: bool = True,
    common_only: bool = False,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> None:
    """
    Print a formatted `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success tuple to print; its second element is the message.

    skip_common: bool, default True
        If `True`, do not print common success tuples (i.e. `(True, "Success")`).

    common_only: bool, default False
        If `True`, only print if the success tuple is common.
        Overrides `skip_common`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    _progress: Optional[rich.progress.Progress], default None
        Passed through to `format_success_tuple()`.

    """
    from meerschaum.config.static import STATIC_CONFIG
    do_print = True

    ### Messages considered "common" (not worth printing by default).
    omit_messages = STATIC_CONFIG['system']['success']['ignore']

    if common_only:
        skip_common = False
        do_print = tup[1] in omit_messages

    if skip_common:
        do_print = tup[1] not in omit_messages

    if not do_print:
        return

    print(format_success_tuple(
        tup,
        upper_padding=upper_padding,
        lower_padding=lower_padding,
        ### Forward the caller's padding instead of silently falling back to the default.
        left_padding=left_padding,
        calm=calm,
        _progress=_progress,
    ))
282
283
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Build the display string for a `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success tuple to format. If it cannot be indexed,
        it is treated as a failure with the message `None`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    _progress: Optional[rich.progress.Progress], default None
        Not used by this function.

    Returns
    -------
    The icon and message, with continuation lines indented and the padding applied.
    """
    from meerschaum.config.static import STATIC_CONFIG
    _init()
    try:
        status = 'success' if tup[0] else 'failure'
    except TypeError:
        status, tup = 'failure', (None, None)

    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    icon = status_config[CHARSET]['icon']
    first_line, *continuation = ((' ' * left_padding) + icon + ' ' + str(tup[1])).split('\n')

    ### Indent continuation lines unless they already begin with a space.
    formatted = [first_line]
    for line in continuation:
        formatted.append(line if line.startswith(' ') else '    ' + line)

    ### Only the first line is styled.
    if ANSI:
        formatted[0] = fill_ansi(highlight_pipes(formatted[0]), **status_config['ansi']['rich'])

    body = '\n'.join(formatted)
    return ('\n' * upper_padding) + body + ('\n' * lower_padding)
336
337
def print_options(
    options: Optional[Dict[str, Any]] = None,
    nopretty: bool = False,
    no_rich: bool = False,
    name: str = 'options',
    header: Optional[str] = None,
    num_cols: Optional[int] = None,
    adjust_cols: bool = True,
    sort_options: bool = False,
    **kw
) -> None:
    """
    Print items in an iterable as a fancy table.

    Parameters
    ----------
    options: Optional[Dict[str, Any]], default None
        The iterable to be printed. Each item is converted with `str()`;
        iterating a dictionary yields its keys.

    nopretty: bool, default False
        If `True`, don't use fancy formatting.

    no_rich: bool, default False
        If `True`, don't use `rich` to format the output.

    name: str, default 'options'
        The text in the default header after `'Available'`.

    header: Optional[str], default None
        If provided, override `name` and use this as the header text.

    num_cols: Optional[int], default None
        How many columns in the table. Depends on the terminal size. If `None`, use 8.

    adjust_cols: bool, default True
        If `True`, adjust the number of columns depending on the terminal size.

    sort_options: bool, default False
        If `True`, sort the stringified options before printing.

    **kw:
        Accepted and not used by this function.

    """
    import os
    from meerschaum.utils.packages import import_rich
    from meerschaum.utils.formatting import make_header, highlight_pipes
    from meerschaum.actions import actions as _actions
    from meerschaum.utils.misc import get_cols_lines, string_width, iterate_chunks


    if options is None:
        options = {}
    _options = []
    for o in options:
        _options.append(str(o))
    if sort_options:
        _options = sorted(_options)
    ### `_header` is always a string from here on (either the default or `header`).
    _header = f"\nAvailable {name}" if header is None else header

    if num_cols is None:
        num_cols = 8

    def _print_options_no_rich():
        ### Plain fallback: one option per line, as a bulleted list unless `nopretty`.
        if not nopretty:
            print()
            print(make_header(_header))
        ### print actions
        for option in _options:
            if not nopretty:
                print("  - ", end="")
            print(option)
        if not nopretty:
            print()

    rich = import_rich()
    if rich is None or nopretty or no_rich:
        _print_options_no_rich()
        return None

    ### Prevent too many options from being truncated on small terminals.
    if adjust_cols and _options:
        _cols, _lines = get_cols_lines()
        ### Drop columns until at most a third of the options are wider than one cell.
        while num_cols > 1:
            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
            if num_too_big > int(len(_options) / 3):
                num_cols -= 1
                continue
            break

    from meerschaum.utils.formatting import pprint, get_console
    from meerschaum.utils.packages import attempt_import
    rich_columns = attempt_import('rich.columns')
    rich_panel = attempt_import('rich.panel')
    rich_table = attempt_import('rich.table')
    Text = attempt_import('rich.text').Text
    box = attempt_import('rich.box')
    Panel = rich_panel.Panel
    Columns = rich_columns.Columns
    Table = rich_table.Table

    ### NOTE(review): `_header` is never `None` (see its assignment above),
    ### so the `Table.grid` branch below is not reached.
    if _header is not None:
        table = Table(
            title = _header,
            box = box.SIMPLE,
            show_header = False,
            show_footer = False,
            title_style = '',
            expand = True,
        )
    else:
        table = Table.grid(padding=(0, 2))
    for i in range(num_cols):
        table.add_column()

    if len(_options) < 12:
        # If fewer than 12 items, use a single column
        for option in _options:
            table.add_row(Text.from_ansi(highlight_pipes(option)))
    else:
        # Otherwise, use multiple columns as before
        ### Options fill the table column by column: row `i`, column `j` holds `i + j * num_rows`.
        num_rows = (len(_options) + num_cols - 1) // num_cols
        for i in range(num_rows):
            row = []
            for j in range(num_cols):
                index = i + j * num_rows
                if index < len(_options):
                    row.append(Text.from_ansi(highlight_pipes(_options[index])))
                else:
                    ### Pad the final rows when the options don't fill the grid.
                    row.append('')
            table.add_row(*row)

    get_console().print(table)
    return None
467
468
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the string cannot be parsed (`AttributeError` from `Text.from_ansi`),
    `string` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### Without a parsed `Text` there are no spans to fill in,
        ### so hand back the input instead of failing on a plain string below.
        return string

    ### Collect the (start, end) ranges which lie between styled spans.
    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        ### The fill value (an int) stands in for a missing right-hand span.
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        ### Unstyled text before the first span.
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        ### Unstyled text after the last span, unless already covered.
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        ### No gaps were found, so style the whole text.
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)
514
515
def __getattr__(name: str) -> Any:
    """
    Lazily load module-level variables.

    Parameters
    ----------
    name: str
        The attribute being looked up on this module.

    Returns
    -------
    For the names in `_attrs` (`ANSI`, `UNICODE`, `CHARSET`): the cached value,
    loading it from the `'formatting'` configuration on first access
    (`CHARSET` falls back to `'unicode'` or `'ascii'` depending on `UNICODE`).
    For any other name: the matching module global.

    Raises
    ------
    AttributeError
        If `name` is a dunder or no such global exists.
    """
    ### Dunder lookups are always refused, so no dunder-specific handling is needed below.
    if name.startswith('__') and name.endswith('__'):
        raise AttributeError("Cannot import dunders from this module.")

    if name in _attrs:
        if _attrs[name] is not None:
            return _attrs[name]
        from meerschaum.config import get_config
        if name.lower() in get_config('formatting'):
            _attrs[name] = get_config('formatting', name.lower())
        elif name == 'CHARSET':
            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
        return _attrs[name]

    try:
        return globals()[name]
    except KeyError:
        raise AttributeError(f"Could not find '{name}'")
ANSI
CHARSET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.

    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    If `more_termcolor` cannot color the text, `text` is returned unchanged.

    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Run the platform setup only until it reports success;
    ### the lock keeps concurrent first calls from racing on the flag.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    ### First try the colors as given, then retry with rich names translated.
    for translate in (False, True):
        try:
            _colors = translate_rich_to_termcolor(*colors) if translate else colors
            colored_text = more_termcolor.colored(text, *_colors, **kw)
        except Exception:
            colored_text = None
        if colored_text is not None:
            return colored_text

    ### NOTE: warn here?
    return text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
def extract_stats_from_message(
        message: str,
        stat_keys: Optional[List[str]] = None,
    ) -> Dict[str, int]:
    """
    Sum the per-line statistics (e.g. rows inserted, updated, upserted) found in a sync message.

    Parameters
    ----------
    message: str
        The message to parse for statistics. Each line is parsed separately.

    stat_keys: Optional[List[str]], default None
        If provided, search for these words (case insensitive) in the message.
        Defaults to `['inserted', 'updated', 'upserted']`.

    Returns
    -------
    A dictionary mapping the stat keys to the total number of rows affected.
    """
    if not stat_keys:
        stat_keys = ['inserted', 'updated', 'upserted']

    per_line_stats = [
        extract_stats_from_line(line, stat_keys)
        for line in message.split('\n')
    ]

    totals = {}
    for key in stat_keys:
        ### Lines which don't mention a key contribute 0.
        totals[key] = sum(line_stats.get(key, 0) for line_stats in per_line_stats)
    return totals

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the string cannot be parsed (`AttributeError` from `Text.from_ansi`),
    `string` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### Without a parsed `Text` there are no spans to fill in,
        ### so hand back the input instead of failing on a plain string below.
        return string

    ### Collect the (start, end) ranges which lie between styled spans.
    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        ### The fill value (an int) stands in for a missing right-hand span.
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        ### Unstyled text before the first span.
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        ### Unstyled text after the last span, unless already covered.
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        ### No gaps were found, so style the whole text.
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple( tup: Tuple[bool, str], upper_padding: int = 0, lower_padding: int = 0, left_padding: int = 1, calm: bool = False, _progress: Optional[rich.progress.Progress] = None) -> str:
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Build the display string for a `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success tuple to format. If it cannot be indexed,
        it is treated as a failure with the message `None`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    _progress: Optional[rich.progress.Progress], default None
        Not used by this function.

    Returns
    -------
    The icon and message, with continuation lines indented and the padding applied.
    """
    from meerschaum.config.static import STATIC_CONFIG
    _init()
    try:
        status = 'success' if tup[0] else 'failure'
    except TypeError:
        status, tup = 'failure', (None, None)

    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    icon = status_config[CHARSET]['icon']
    first_line, *continuation = ((' ' * left_padding) + icon + ' ' + str(tup[1])).split('\n')

    ### Indent continuation lines unless they already begin with a space.
    formatted = [first_line]
    for line in continuation:
        formatted.append(line if line.startswith(' ') else '    ' + line)

    ### Only the first line is styled.
    if ANSI:
        formatted[0] = fill_ansi(highlight_pipes(formatted[0]), **status_config['ansi']['rich'])

    body = '\n'.join(formatted)
    return ('\n' * upper_padding) + body + ('\n' * lower_padding)

Format meerschaum.SuccessTuple.

Parameters
  • upper_padding (int, default 0): How many newlines to prepend to the message.
  • lower_padding (int, default 0): How many newlines to append to the message.
  • left_padding (int, default 1): How many spaces to prepend to the message.
  • calm (bool, default False): If True, use the default emoji and color scheme.
def get_console():
def get_console():
    """
    Build a rich console, store it in the module-level `console`, and return it.

    Returns
    -------
    A new `rich.console.Console` (forced terminal, truecolor),
    or `None` if the console could not be constructed.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    rich_console = attempt_import('rich.console')
    try:
        new_console = rich_console.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        new_console = None
    console = new_console
    return new_console

Get the rich console.

def highlight_pipes(message: str) -> str:
def highlight_pipes(message: str) -> str:
    """
    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.

    Parameters
    ----------
    message: str
        The message which may contain text such as `Pipe('a', 'b')`.

    Returns
    -------
    The message with each parsable `Pipe(...)` replaced by its `pipe_repr()`.
    Occurrences of `Pipe(` which cannot be parsed are left exactly as written.
    """
    if 'Pipe(' not in message:
        return message

    ### The text before the first `Pipe(` can never be a pipe's arguments, so keep it verbatim.
    head, *segments = message.split('Pipe(')
    msg = head
    _d = {}
    for segment in segments:
        comma_index = segment.find(',')
        paren_index = segment.find(')')
        quote_indices = [
            index
            for index in (segment.find("'"), segment.find('"'))
            if index != -1
        ]

        ### A candidate needs a comma and a quote before the closing parenthesis.
        has_pipe = (
            comma_index != -1
            and paren_index != -1
            and bool(quote_indices)
            and comma_index < paren_index
            and min(quote_indices) < paren_index
        )

        if not has_pipe:
            ### Restore the delimiter consumed by `split()` so no text is lost.
            msg += 'Pipe(' + segment
            continue

        from meerschaum import Pipe
        ### `Pipe` and `_d` must keep these names: the executed code refers to them.
        code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
        try:
            ### NOTE(review): `exec` evaluates text taken from `message`.
            ### If messages can carry untrusted content, this should be replaced
            ### with a parser which only accepts literals.
            exec(code)
            _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:]
            _ = _d.pop('pipe', None)
        except Exception:
            _to_add = 'Pipe(' + segment
        msg += _to_add
    return msg

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header(message: str, ruler: str = '─') -> str:
def make_header(
        message: str,
        ruler: str = '─',
    ) -> str:
    """Format a message string with a ruler.
    The ruler is as long as the longest line of the message.

    Example:
        'My\\nheader' -> 'My\\nheader\\n──────'

    Parameters
    ----------
    message: str
        The header text (may span several lines).

    ruler: str, default '─'
        The character to repeat beneath the message.
        Replaced with `'-'` when `UNICODE` is disabled.

    Returns
    -------
    The message followed by a newline and the ruler.
    """

    from meerschaum.utils.formatting import ANSI, UNICODE, colored
    ruler_char = ruler if UNICODE else '-'
    widest = max((len(line) for line in message.split('\n')), default=0)
    return message + "\n" + ruler_char * widest

Format a message string with a ruler. The ruler is as long as the longest line of the message.

Example:
    'My\nheader' -> 'My\nheader\n──────'

def pipe_repr( pipe: meerschaum.Pipe, as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
def pipe_repr(
        pipe: mrsm.Pipe,
        as_rich_text: bool = False,
        ansi: Optional[bool] = None,
    ) -> Union[str, 'rich.text.Text']:
    """
    Return a formatted string for representing a `meerschaum.Pipe`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to represent; its connector, metric, location, and instance keys are read.

    as_rich_text: bool, default False
        If `True`, return the `rich.text.Text` object instead of a string.

    ansi: Optional[bool], default None
        If `False`, build the representation without styles.

    Returns
    -------
    The representation `Pipe('connector', 'metric'[, 'location'][, instance='keys'])`,
    styled according to the configured `__repr__` styles when `ANSI` is enabled.
    """
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    Text = attempt_import('rich.text').Text

    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
    if not ANSI or (ansi is False):
        styles = {key: '' for key in styles}

    ### Markup tags which wrap the structural parts (`Pipe(`, `, `, `)`).
    if styles['Pipe']:
        open_tag, close_tag = ("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")
    else:
        open_tag, close_tag = '', ''

    def _structural(fragment):
        return Text.from_markup(open_tag + fragment + close_tag)

    def _styled(fragment, style):
        return colored(fragment, style=style, as_rich_text=True)

    text_obj = _structural("Pipe(")
    text_obj = text_obj + _styled(("'" + pipe.connector_keys + "'"), styles['connector'])
    text_obj = text_obj + _structural(", ")
    text_obj = text_obj + _styled(("'" + pipe.metric_key + "'"), styles['metric'])

    ### The location is only shown when it is set.
    if pipe.location_key is not None:
        location_part = (
            _styled(', ', styles['punctuation'])
            + _styled(("'" + pipe.location_key + "'"), styles['location'])
        )
    else:
        location_part = _styled('', '')
    text_obj = text_obj + location_part

    ### Add the `instance=` argument when it differs from the configured instance.
    if pipe.instance_keys != get_config('meerschaum', 'instance'):
        instance_part = (
            _styled(', instance=', styles['punctuation'])
            + _styled(("'" + pipe.instance_keys + "'"), styles['instance'])
        )
    else:
        instance_part = _styled('', '')
    text_obj = text_obj + instance_part

    text_obj = text_obj + _structural(")")
    if as_rich_text:
        return text_obj
    return rich_text_to_str(text_obj)

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11        *args,
 12        detect_password: bool = True,
 13        nopretty: bool = False,
 14        **kw
 15    ) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
 22    from meerschaum.utils.warnings import error
 23    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 24    from collections import OrderedDict
 25    import copy, json
 26
 27    if (
 28        len(args) == 1
 29        and
 30        isinstance(args[0], tuple)
 31        and
 32        len(args[0]) == 2
 33        and
 34        isinstance(args[0][0], bool)
 35        and
 36        isinstance(args[0][1], str)
 37    ):
 38        return print_tuple(args[0])
 39
 40    modify = True
 41    rich_pprint = None
 42    if ANSI and not nopretty:
 43        rich = import_rich()
 44        if rich is not None:
 45            rich_pretty = attempt_import('rich.pretty')
 46        if rich_pretty is not None:
 47            def _rich_pprint(*args, **kw):
 48                _console = get_console()
 49                _kw = filter_keywords(_console.print, **kw)
 50                _console.print(*args, **_kw)
 51            rich_pprint = _rich_pprint
 52    elif not nopretty:
 53        pprintpp = attempt_import('pprintpp', warn=False)
 54        try:
 55            _pprint = pprintpp.pprint
 56        except Exception as e:
 57            import pprint as _pprint_module
 58            _pprint = _pprint_module.pprint
 59
 60    func = (
 61        _pprint if rich_pprint is None else rich_pprint
 62    ) if not nopretty else print
 63
 64    try:
 65        args_copy = copy.deepcopy(args)
 66    except Exception as e:
 67        args_copy = args
 68        modify = False
 69    _args = []
 70    for a in args:
 71        c = a
 72        ### convert OrderedDict into dict
 73        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 74            c = dict_from_od(copy.deepcopy(c))
 75        _args.append(c)
 76    args = _args
 77
 78    _args = list(args)
 79    if detect_password and modify:
 80        _args = []
 81        for a in args:
 82            c = a
 83            if isinstance(c, dict):
 84                c = replace_password(copy.deepcopy(c))
 85            if nopretty:
 86                try:
 87                    c = json.dumps(c)
 88                    is_json = True
 89                except Exception as e:
 90                    is_json = False
 91                if not is_json:
 92                    try:
 93                        c = str(c)
 94                    except Exception as e:
 95                        pass
 96            _args.append(c)
 97
 98    ### filter out unsupported keywords
 99    func_kw = filter_keywords(func, **kw) if not nopretty else {}
100    error_msg = None
101    try:
102        func(*_args, **func_kw)
103    except Exception as e:
104        error_msg = e
105    if error_msg is not None:
106        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> None:
def pprint_pipes(pipes: PipesDict) -> None:
    """Print a stylized tree of a Pipes dictionary.
    Supports ANSI and UNICODE global settings.

    When `UNICODE` is disabled, the tree is drawn with `asciitree`
    (and laid out in rich columns if rich could be imported).
    Otherwise a rich `Tree` is built per connector and printed next to a key panel.

    Parameters
    ----------
    pipes : PipesDict
        A nested dictionary which is iterated as
        connector keys -> metric keys -> location keys -> pipe.
        A location key may be `None`.

    Returns
    -------
    `None`. The output is written to the console.

    """
    ### NOTE(review): `error`, `replace_pipes_in_dict`, `pprint` and `copy`
    ### are imported here but not referenced in this function body.
    from meerschaum.utils.warnings import error
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
    import copy
    rich = import_rich('rich', warn=False)
    ### `Text` stays `None` when rich is unavailable; the ASCII printer checks for this.
    Text = None
    if rich is not None:
        rich_text = attempt_import('rich.text', lazy=False)
        Text = rich_text.Text

    ### NOTE(review): `get_config` (and `pipe_repr` below) are not imported in this function;
    ### presumably they are available at module level. Confirm against the file's imports.
    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
    if not ANSI:
        ### Keep the keys but blank out every style so lookups below still succeed.
        styles = {k: '' for k in styles}
    print()

    def ascii_print_pipes():
        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
        (though rich should auto-install when `attempt_import()` is called)."""
        asciitree = attempt_import('asciitree')
        ### `ascii_dict` is the nested tree of colored strings handed to asciitree.
        ### `replace_dict` maps each colored string to a rich `Text` (only filled if rich exists).
        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
        for conn_keys, metrics in pipes.items():
            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
            if Text is not None:
                replace_dict['connector'][_colored_conn_key] = (
                    Text(conn_keys, style=styles['connector'])
                )
            ascii_dict[_colored_conn_key] = {}
            for metric, locations in metrics.items():
                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
                if Text is not None:
                    replace_dict['metric'][_colored_metric_key] = (
                        Text(metric, style=styles['metric'])
                    )
                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
                for location, pipe in locations.items():
                    ### A `None` location uses the 'none' style instead of 'location'.
                    _location_style = styles[('none' if location is None else 'location')]
                    ### The pipe's repr is printed on its own line beneath the location.
                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
                    _colored_location = colored(
                        icons['location'] + str(location), style=_location_style
                    )
                    _colored_location_key = _colored_location + pipe_addendum
                    if Text is not None:
                        ### Only the location label (not the repr addendum) is swapped for `Text`.
                        replace_dict['location'][_colored_location] = (
                            Text(str(location), style=_location_style)
                        )
                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}

        tree = asciitree.LeftAligned()
        output = ''
        cols = []

        ### Build the legend ("Key") block.
        ### Every piece is a rich `Text` when rich is available and a plain / `colored` string otherwise.
        key_str = (
            (Text("     ") if Text is not None else "     ") +
            (
                Text("Key", style='underline') if Text is not None else
                colored("Key", style='underline')
            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
            (
                Text("Connector", style=styles['connector']) if Text is not None else
                colored("Connector", style=styles['connector'])
            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
            (
                Text("Metric", style=styles['metric']) if Text is not None else
                colored("Metric", style=styles['metric'])
            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
            (
                Text("Location", style=styles['location']) if Text is not None else
                colored("Location", style=styles['location'])
            ) + (Text('\n\n') if Text is not None else '\n\n')
        )

        ### `output` is the plain-string fallback; `cols` feeds the rich `Columns` layout.
        output += str(key_str)
        cols.append(key_str)

        def replace_tree_text(tree_str : str) -> Text:
            """Replace the colored words with stylized Text instead.
            Is not executed if ANSI and UNICODE are disabled.

            Returns `None` when rich (`Text`) is unavailable; in that case
            `replace_dict` is empty, so the loops below do not run."""
            tree_text = Text(tree_str) if Text is not None else None
            for k, v in replace_dict.items():
                for _colored, _text in v.items():
                    ### Split on the colored string and re-join the pieces with the `Text` version.
                    parts = []
                    lines = tree_text.split(_colored)
                    for part in lines:
                        parts += [part, _text]
                    ### Drop the replacement appended after the final piece
                    ### unless that piece is empty.
                    if lines[-1] != Text(''):
                        parts = parts[:-1]
                    _tree_text = Text('')
                    for part in parts:
                        _tree_text += part
                    tree_text = _tree_text
            return tree_text

        tree_output = ""
        ### Render one tree (and one column) per connector.
        for k, v in ascii_dict.items():
            branch = {k : v}
            tree_output += tree(branch) + '\n\n'
            if not UNICODE and not ANSI:
                _col = (Text(tree(branch)) if Text is not None else tree(branch))
            else:
                _col = replace_tree_text(tree(branch))
            cols.append(_col)
        ### `output` already holds the key block here, so this always trims the trailing '\n\n'.
        if len(output) > 0:
            tree_output = tree_output[:-2]
        output += tree_output

        ### Without rich, print the plain string and stop.
        if rich is None:
            return print(output)

        rich_columns = attempt_import('rich.columns')
        Columns = rich_columns.Columns
        columns = Columns(cols)
        get_console().print(columns)

    if not UNICODE:
        return ascii_print_pipes()

    ### Unicode output: build rich trees.
    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
        'rich.panel',
        'rich.tree',
        'rich.text',
        'rich.columns',
        'rich.table',
    )
    from rich import box
    Panel = rich_panel.Panel
    Tree = rich_tree.Tree
    Text = rich_text.Text
    Columns = rich_columns.Columns
    Table = rich_table.Table

    ### The legend panel shown before the connector trees.
    key_panel = Panel(
        (
            Text("\n") +
            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
        ),
        title = Text(icons['key'] + "Keys", style=styles['guide']),
        border_style = styles['guide'],
        expand = True
    )

    cols = []
    ### `conn_trees` holds one root tree per connector;
    ### `metric_trees` holds the metric sub-trees, keyed by connector then metric.
    conn_trees = {}
    metric_trees = {}
    pipes = sorted_dict(pipes)
    for conn_keys, metrics in pipes.items():
        conn_trees[conn_keys] = Tree(
            Text(
                icons['connector'] + conn_keys,
                style = styles['connector'],
            ),
            guide_style = styles['connector']
        )
        metric_trees[conn_keys] = {}
        for metric, locations in metrics.items():
            metric_trees[conn_keys][metric] = Tree(
                Text(
                    icons['metric'] + metric,
                    style = styles['metric']
                ),
                guide_style = styles['metric']
            )
            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
            for location, pipe in locations.items():
                ### A `None` location is shown as the string 'None' in the 'none' style.
                _location = (
                    Text(str(location), style=styles['none']) if location is None
                    else Text(location, style=styles['location'])
                )
                ### Each leaf is the location label followed by the pipe's repr on the next line.
                _location = (
                    Text(icons['location'])
                    + _location + Text('\n')
                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
                )
                metric_trees[conn_keys][metric].add(_location)

    ### The key panel comes first, followed by one column per connector tree.
    cols += [key_panel]
    for k, t in conn_trees.items():
        cols.append(t)

    columns = Columns(cols)
    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
def translate_rich_to_termcolor(*colors) -> tuple:
    """Translate between rich and more_termcolor terminology.
    This is probably prone to breaking.

    Parameters
    ----------
    *colors :
        Rich-style color strings, e.g. `'bright_red on blue'`.

    Returns
    -------
    A tuple of the translated color strings, in order.
    Every `'bright_'` is replaced with `'bright '`, and each `' on '` clause
    becomes its own `'on <color>'` entry after the foreground color.

    """
    translated = []
    for color in colors:
        ### handle 'bright'
        normalized = color.replace('bright_', 'bright ')

        ### handle 'on': the first piece is the foreground, the rest are backgrounds.
        foreground, *backgrounds = normalized.split(' on ')
        translated.append(foreground)
        translated.extend('on ' + background for background in backgrounds)

    return tuple(translated)

Translate between rich and more_termcolor terminology. This is probably prone to breaking.

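Parameters
  • *colors : Rich-style color strings, e.g. 'bright_red on blue'.

Returns
  • A tuple of the translated color strings: 'bright_' becomes 'bright ', and each ' on ' clause becomes its own 'on <color>' entry.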