meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable
 15from meerschaum.utils.formatting._shell import make_header
 16from meerschaum.utils.formatting._pprint import pprint
 17from meerschaum.utils.formatting._dataframe import pprint_df, format_dataframe
 18from meerschaum.utils.formatting._pipes import (
 19    pprint_pipes,
 20    highlight_pipes,
 21    format_pipe_success_tuple,
 22    print_pipes_results,
 23    extract_stats_from_message,
 24    pipe_repr,
 25)
 26from meerschaum.utils.threading import Lock, RLock
 27
 28_attrs = {
 29    'ANSI': None,
 30    'UNICODE': None,
 31    'CHARSET': None,
 32    'RESET': '\033[0m',
 33}
 34__all__ = sorted([
 35    'ANSI', 'CHARSET', 'UNICODE', 'RESET',
 36    'colored',
 37    'translate_rich_to_termcolor',
 38    'get_console',
 39    'format_success_tuple',
 40    'print_tuple',
 41    'print_options',
 42    'fill_ansi',
 43    'pprint',
 44    'pprint_df',
 45    'format_dataframe',
 46    'highlight_pipes',
 47    'pprint_pipes',
 48    'make_header',
 49    'pipe_repr',
 50    'print_pipes_results',
 51    'extract_stats_from_message',
 52    'format_bytes',
 53])
 54__pdoc__ = {}
 55_locks = {
 56    '_colorama_init': RLock(),
 57}
 58
 59
 60def colored_fallback(*args, **kw):
 61    return ' '.join(args)
 62
 63
 64def format_bytes(num_bytes: Optional[Union[int, float]], precision: int = 1) -> str:
 65    """
 66    Return a human-readable representation of a number of bytes.
 67
 68    Parameters
 69    ----------
 70    num_bytes: Optional[Union[int, float]]
 71        The number of bytes to format. If `None`, return `'?'`.
 72
 73    precision: int, default 1
 74        The number of decimal places to display for non-byte units.
 75
 76    Returns
 77    -------
 78    A human-readable string such as `'1.2 MB'` or `'340.0 kB'`.
 79
 80    Examples
 81    --------
 82    >>> format_bytes(0)
 83    '0 B'
 84    >>> format_bytes(1536)
 85    '1.5 kB'
 86    >>> format_bytes(None)
 87    '?'
 88    """
 89    if num_bytes is None:
 90        return '?'
 91    try:
 92        value = float(num_bytes)
 93    except (TypeError, ValueError):
 94        return '?'
 95
 96    sign = '-' if value < 0 else ''
 97    value = abs(value)
 98    units = ('B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB')
 99    unit_index = 0
100    while value >= 1000.0 and unit_index < len(units) - 1:
101        value /= 1000.0
102        unit_index += 1
103
104    if unit_index == 0:
105        return f"{sign}{int(value)} {units[unit_index]}"
106    return f"{sign}{value:.{precision}f} {units[unit_index]}"
107
108def translate_rich_to_termcolor(*colors) -> tuple:
109    """Translate between rich and more_termcolor terminology."""
110    _colors = []
111    for c in colors:
112        _c_list = []
113        ### handle 'bright'
114        c = c.replace('bright_', 'bright ')
115
116        ### handle 'on'
117        if ' on ' in c:
118            _on = c.split(' on ')
119            _colors.append(_on[0])
120            for _c in _on[1:]:
121                _c_list.append('on ' + _c)
122        else:
123            _c_list += [c]
124
125        _colors += _c_list
126
127    return tuple(_colors)
128
129
130def rich_text_to_str(text: 'rich.text.Text') -> str:
131    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
132    _console = get_console()
133    if _console is None:
134        return str(text)
135    with console.capture() as cap:
136        console.print(text)
137    string = cap.get()
138    return string[:-1]
139
140
141def _init():
142    """
143    Initial color settings (mostly for Windows).
144    """
145    if platform.system() != "Windows":
146        return
147    if 'PYTHONIOENCODING' not in os.environ:
148        os.environ['PYTHONIOENCODING'] = 'utf-8'
149    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
150        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
151    sys.stdin.reconfigure(encoding='utf-8')
152    sys.stdout.reconfigure(encoding='utf-8')
153    sys.stderr.reconfigure(encoding='utf-8')
154
155    from ctypes import windll
156    k = windll.kernel32
157    k.SetConsoleMode(k.GetStdHandle(-11), 7)
158    os.system("color")
159
160    from meerschaum.utils.packages import attempt_import
161    ### init colorama for Windows color output
162    colorama, more_termcolor = attempt_import(
163        'colorama',
164        'more_termcolor',
165        lazy = False,
166        warn = False,
167        color = False,
168    )
169    try:
170        colorama.init(autoreset=False)
171        success = True
172    except Exception:
173        import traceback
174        traceback.print_exc()
175        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
176        success = False
177
178    if more_termcolor is None:
179        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
180        success = False
181
182    return success
183
184_colorama_init = False
185def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
186    """Apply colors and rich styles to a string.
187    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
188    Otherwise attempt to use the legacy `more_termcolor.colored` method.
189
190    Parameters
191    ----------
192    text: str
193        The string to apply formatting to.
194        
195    *colors:
196        A list of colors to pass to `more_termcolor.colored()`.
197        Has no effect if `style` is provided.
198
199    style: str, default None
200        If provided, pass to `rich` for processing.
201
202    as_rich_text: bool, default False
203        If `True`, return a `rich.Text` object.
204        `style` must be provided.
205        
206    **kw:
207        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
208        
209
210    Returns
211    -------
212    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
213
214    """
215    from meerschaum.utils.packages import import_rich, attempt_import
216    global _colorama_init
217    _init()
218    with _locks['_colorama_init']:
219        if not _colorama_init:
220            _colorama_init = _init()
221
222    if 'style' in kw:
223        rich = import_rich()
224        rich_text = attempt_import('rich.text')
225        text_obj = rich_text.Text(text, **kw)
226        if as_rich_text:
227            return text_obj
228        return rich_text_to_str(text_obj)
229
230    more_termcolor = attempt_import('more_termcolor', lazy=False)
231    try:
232        colored_text = more_termcolor.colored(text, *colors, **kw)
233    except Exception as e:
234        colored_text = None
235
236    if colored_text is not None:
237        return colored_text
238
239    try:
240        _colors = translate_rich_to_termcolor(*colors)
241        colored_text = more_termcolor.colored(text, *_colors, **kw)
242    except Exception as e:
243        colored_text = None
244
245    if colored_text is None:
246        ### NOTE: warn here?
247        return text
248
249    return colored_text
250
251console = None
252def get_console():
253    """Get the rich console."""
254    global console
255    from meerschaum.utils.packages import import_rich, attempt_import
256    rich = import_rich()
257    rich_console = attempt_import('rich.console')
258    try:
259        console = rich_console.Console(force_terminal=True, color_system='truecolor')
260    except Exception:
261        import traceback
262        traceback.print_exc()
263        console = None
264    return console
265
266
267def print_tuple(
268    tup: mrsm.SuccessTuple,
269    skip_common: bool = True,
270    common_only: bool = False,
271    upper_padding: int = 1,
272    lower_padding: int = 1,
273    left_padding: int = 1,
274    calm: bool = False,
275    _progress: Optional['rich.progress.Progress'] = None,
276) -> None:
277    """
278    Format `meerschaum.utils.typing.SuccessTuple`.
279
280    Parameters
281    ----------
282    skip_common: bool, default True
283        If `True`, do not print common success tuples (i.e. `(True, "Success")`).
284
285    common_only: bool, default False
286        If `True`, only print if the success tuple is common.
287
288    upper_padding: int, default 0
289        How many newlines to prepend to the message.
290
291    lower_padding: int, default 0
292        How many newlines to append to the message.
293
294    left_padding: int, default 1
295        How mant spaces to preprend to the message.
296
297    calm: bool, default False
298        If `True`, use the default emoji and color scheme.
299
300    """
301    from meerschaum._internal.static import STATIC_CONFIG
302    do_print = True
303
304    omit_messages = STATIC_CONFIG['system']['success']['ignore']
305
306    if common_only:
307        skip_common = False
308        do_print = tup[1].strip() in omit_messages
309
310    if skip_common:
311        do_print = tup[1].strip() not in omit_messages
312
313    if not do_print:
314        return
315
316    print(format_success_tuple(
317        tup,
318        upper_padding=upper_padding,
319        lower_padding=lower_padding,
320        calm=calm,
321        _progress=_progress,
322    ))
323
324
325def format_success_tuple(
326    tup: mrsm.SuccessTuple,
327    upper_padding: int = 0,
328    lower_padding: int = 0,
329    left_padding: int = 1,
330    calm: bool = False,
331    _progress: Optional['rich.progress.Progress'] = None,
332) -> str:
333    """
334    Format `meerschaum.utils.typing.SuccessTuple`.
335
336    Parameters
337    ----------
338    upper_padding: int, default 0
339        How many newlines to prepend to the message.
340
341    lower_padding: int, default 0
342        How many newlines to append to the message.
343
344    left_padding: int, default 1
345        How mant spaces to preprend to the message.
346
347    calm: bool, default False
348        If `True`, use the default emoji and color scheme.
349    """
350    _init()
351    try:
352        status = 'success' if tup[0] else 'failure'
353    except TypeError:
354        status = 'failure'
355        tup = None, None
356
357    if calm:
358        status += '_calm'
359
360    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
361    from meerschaum.config import get_config
362    status_config = get_config('formatting', status, patch=True)
363
364    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
365    lines = msg.split('\n')
366    lines = [lines[0]] + [
367        (('    ' + line if not line.startswith(' ') else line))
368        for line in lines[1:]
369    ]
370    if ANSI:
371        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
372
373    msg = '\n'.join(lines)
374    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
375    return msg
376
377
378def print_options(
379    options: Optional[Iterable[Any]] = None,
380    nopretty: bool = False,
381    no_rich: bool = False,
382    name: str = 'options',
383    header: Optional[str] = None,
384    num_cols: Optional[int] = None,
385    adjust_cols: bool = True,
386    sort_options: bool = False,
387    number_options: bool = False,
388    **kw
389) -> None:
390    """
391    Print items in an iterable as a fancy table.
392
393    Parameters
394    ----------
395    options: Optional[Dict[str, Any]], default None
396        The iterable to be printed.
397
398    nopretty: bool, default False
399        If `True`, don't use fancy formatting.
400
401    no_rich: bool, default False
402        If `True`, don't use `rich` to format the output.
403
404    name: str, default 'options'
405        The text in the default header after `'Available'`.
406
407    header: Optional[str], default None
408        If provided, override `name` and use this as the header text.
409
410    num_cols: Optional[int], default None
411        How many columns in the table. Depends on the terminal size. If `None`, use 8.
412
413    adjust_cols: bool, default True
414        If `True`, adjust the number of columns depending on the terminal size.
415
416    sort_options: bool, default False
417        If `True`, print the options in sorted order.
418
419    number_options: bool, default False
420        If `True`, print the option's number in the list (1 index).
421
422    """
423    from meerschaum.utils.packages import import_rich
424    from meerschaum.utils.formatting import highlight_pipes
425    from meerschaum.utils.misc import get_cols_lines, string_width
426
427    if options is None:
428        options = {}
429    _options = []
430    for o in options:
431        _options.append(str(o))
432    if sort_options:
433        _options = sorted(_options)
434    _header = f"\nAvailable {name}" if header is None else header
435
436    if num_cols is None:
437        num_cols = 8
438
439    def _print_options_no_rich():
440        if not nopretty:
441            print()
442            print(make_header(_header))
443        ### print actions
444        for i, option in enumerate(_options):
445            marker = '-' if not number_options else (str(i + 1) + '.')
446            if not nopretty:
447                print(f"  {marker} ", end="")
448            print(option)
449        if not nopretty:
450            print()
451
452    rich = import_rich()
453    if rich is None or nopretty or no_rich:
454        _print_options_no_rich()
455        return None
456
457    ### Prevent too many options from being truncated on small terminals.
458    if adjust_cols and _options:
459        _cols, _lines = get_cols_lines()
460        while num_cols > 1:
461            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
462            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
463            if num_too_big > int(len(_options) / 3):
464                num_cols -= 1
465                continue
466            break
467
468    from meerschaum.utils.packages import attempt_import
469    rich_table = attempt_import('rich.table')
470    Text = attempt_import('rich.text').Text
471    box = attempt_import('rich.box')
472    Table = rich_table.Table
473
474    if _header is not None:
475        table = Table(
476            title=_header,
477            box=box.SIMPLE,
478            show_header=False,
479            show_footer=False,
480            title_style='',
481            expand = True,
482        )
483    else:
484        table = Table.grid(padding=(0, 2))
485    for i in range(num_cols):
486        table.add_column()
487
488    if len(_options) < 12:
489        ### If fewer than 12 items, use a single column
490        for i, option in enumerate(_options):
491            item = highlight_pipes(option)
492            if number_options:
493                item = str(i + 1) + '. ' + item
494            table.add_row(Text.from_ansi(item))
495    else:
496        ### Otherwise, use multiple columns as before
497        num_rows = (len(_options) + num_cols - 1) // num_cols
498        item_ix = 0
499        for i in range(num_rows):
500            row = []
501            for j in range(num_cols):
502                index = i + j * num_rows
503                if index < len(_options):
504                    item = highlight_pipes(_options[index])
505                    if number_options:
506                        item = str(i + 1) + '. ' + item
507                    row.append(Text.from_ansi(item))
508                    item_ix += 1
509                else:
510                    row.append('')
511            table.add_row(*row)
512
513    get_console().print(table)
514    return None
515
516
517def fill_ansi(string: str, style: str = '') -> str:
518    """
519    Fill in non-formatted segments of ANSI text.
520
521    Parameters
522    ----------
523    string: str
524        A string which contains ANSI escape codes.
525
526    style: str
527        Style arguments to pass to `rich.text.Text`.
528
529    Returns
530    -------
531    A string with ANSI styling applied to the segments which don't yet have a style applied.
532    """
533    from meerschaum.utils.packages import import_rich, attempt_import
534    from meerschaum.utils.misc import iterate_chunks
535    _ = import_rich()
536    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
537    Text = rich_text.Text
538    try:
539        msg = Text.from_ansi(string)
540    except AttributeError:
541        import traceback
542        traceback.print_exc()
543        msg = ''
544
545    plain_indices = []
546    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
547        left = left_span.end
548        right = right_span.start if not isinstance(right_span, int) else right_span
549        if left != right:
550            plain_indices.append((left, right))
551    if msg.spans:
552        if msg.spans[0].start != 0:
553            plain_indices = [(0, msg.spans[0].start)] + plain_indices
554        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
555            plain_indices.append((msg.spans[-1].end, len(msg)))
556
557    if plain_indices:
558        for left, right in plain_indices:
559            msg.stylize(style, left, right)
560    else:
561        msg = Text(str(msg), style)
562
563    return rich_text_to_str(msg)
564
565
566def __getattr__(name: str) -> str:
567    """
568    Lazily load module-level variables.
569    """
570    if name.startswith('__') and name.endswith('__'):
571        raise AttributeError("Cannot import dunders from this module.")
572
573    if name in _attrs:
574        if _attrs[name] is not None:
575            return _attrs[name]
576        from meerschaum.config import get_config
577        if name.lower() in get_config('formatting'):
578            _attrs[name] = get_config('formatting', name.lower())
579        elif name == 'CHARSET':
580            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
581        return _attrs[name]
582    
583    if name == '__wrapped__':
584        import sys
585        return sys.modules[__name__]
586    if name == '__all__':
587        return __all__
588
589    try:
590        return globals()[name]
591    except KeyError:
592        raise AttributeError(f"Could not find '{name}'")
ANSI
CHARSET
RESET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
186def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
187    """Apply colors and rich styles to a string.
188    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
189    Otherwise attempt to use the legacy `more_termcolor.colored` method.
190
191    Parameters
192    ----------
193    text: str
194        The string to apply formatting to.
195        
196    *colors:
197        A list of colors to pass to `more_termcolor.colored()`.
198        Has no effect if `style` is provided.
199
200    style: str, default None
201        If provided, pass to `rich` for processing.
202
203    as_rich_text: bool, default False
204        If `True`, return a `rich.Text` object.
205        `style` must be provided.
206        
207    **kw:
208        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
209        
210
211    Returns
212    -------
213    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
214
215    """
216    from meerschaum.utils.packages import import_rich, attempt_import
217    global _colorama_init
218    _init()
219    with _locks['_colorama_init']:
220        if not _colorama_init:
221            _colorama_init = _init()
222
223    if 'style' in kw:
224        rich = import_rich()
225        rich_text = attempt_import('rich.text')
226        text_obj = rich_text.Text(text, **kw)
227        if as_rich_text:
228            return text_obj
229        return rich_text_to_str(text_obj)
230
231    more_termcolor = attempt_import('more_termcolor', lazy=False)
232    try:
233        colored_text = more_termcolor.colored(text, *colors, **kw)
234    except Exception as e:
235        colored_text = None
236
237    if colored_text is not None:
238        return colored_text
239
240    try:
241        _colors = translate_rich_to_termcolor(*colors)
242        colored_text = more_termcolor.colored(text, *_colors, **kw)
243    except Exception as e:
244        colored_text = None
245
246    if colored_text is None:
247        ### NOTE: warn here?
248        return text
249
250    return colored_text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors:: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw:: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
487def extract_stats_from_message(
488    message: str,
489    stat_keys: Optional[List[str]] = None,
490) -> Dict[str, int]:
491    """
492    Given a sync message, return the insert, update, upsert stats from within.
493
494    Parameters
495    ----------
496    message: str
497        The message to parse for statistics.
498
499    stat_keys: Optional[List[str]], default None
500        If provided, search for these words (case insensitive) in the message.
501        Defaults to `['inserted', 'updated', 'upserted']`.
502
503    Returns
504    -------
505    A dictionary mapping the stat keys to the total number of rows affected.
506    """
507    stat_keys = stat_keys or ['inserted', 'updated', 'upserted', 'checked']
508    lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')]
509    message_stats = {
510        stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats)
511        for stat_key in stat_keys
512    }
513    return message_stats

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
518def fill_ansi(string: str, style: str = '') -> str:
519    """
520    Fill in non-formatted segments of ANSI text.
521
522    Parameters
523    ----------
524    string: str
525        A string which contains ANSI escape codes.
526
527    style: str
528        Style arguments to pass to `rich.text.Text`.
529
530    Returns
531    -------
532    A string with ANSI styling applied to the segments which don't yet have a style applied.
533    """
534    from meerschaum.utils.packages import import_rich, attempt_import
535    from meerschaum.utils.misc import iterate_chunks
536    _ = import_rich()
537    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
538    Text = rich_text.Text
539    try:
540        msg = Text.from_ansi(string)
541    except AttributeError:
542        import traceback
543        traceback.print_exc()
544        msg = ''
545
546    plain_indices = []
547    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
548        left = left_span.end
549        right = right_span.start if not isinstance(right_span, int) else right_span
550        if left != right:
551            plain_indices.append((left, right))
552    if msg.spans:
553        if msg.spans[0].start != 0:
554            plain_indices = [(0, msg.spans[0].start)] + plain_indices
555        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
556            plain_indices.append((msg.spans[-1].end, len(msg)))
557
558    if plain_indices:
559        for left, right in plain_indices:
560            msg.stylize(style, left, right)
561    else:
562        msg = Text(str(msg), style)
563
564    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_bytes(num_bytes: Union[int, float, NoneType], precision: int = 1) -> str:
 65def format_bytes(num_bytes: Optional[Union[int, float]], precision: int = 1) -> str:
 66    """
 67    Return a human-readable representation of a number of bytes.
 68
 69    Parameters
 70    ----------
 71    num_bytes: Optional[Union[int, float]]
 72        The number of bytes to format. If `None`, return `'?'`.
 73
 74    precision: int, default 1
 75        The number of decimal places to display for non-byte units.
 76
 77    Returns
 78    -------
 79    A human-readable string such as `'1.2 MB'` or `'340.0 kB'`.
 80
 81    Examples
 82    --------
 83    >>> format_bytes(0)
 84    '0 B'
 85    >>> format_bytes(1536)
 86    '1.5 kB'
 87    >>> format_bytes(None)
 88    '?'
 89    """
 90    if num_bytes is None:
 91        return '?'
 92    try:
 93        value = float(num_bytes)
 94    except (TypeError, ValueError):
 95        return '?'
 96
 97    sign = '-' if value < 0 else ''
 98    value = abs(value)
 99    units = ('B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB')
100    unit_index = 0
101    while value >= 1000.0 and unit_index < len(units) - 1:
102        value /= 1000.0
103        unit_index += 1
104
105    if unit_index == 0:
106        return f"{sign}{int(value)} {units[unit_index]}"
107    return f"{sign}{value:.{precision}f} {units[unit_index]}"

Return a human-readable representation of a number of bytes.

Parameters
  • num_bytes (Optional[Union[int, float]]): The number of bytes to format. If None, return '?'.
  • precision (int, default 1): The number of decimal places to display for non-byte units.
Returns
  • A human-readable string such as '1.2 MB' or '340.0 kB'.
Examples
>>> format_bytes(0)
'0 B'
>>> format_bytes(1536)
'1.5 kB'
>>> format_bytes(None)
'?'
def format_dataframe(df: Any, max_rows: int | None = None) -> str:
20def format_dataframe(df: Any, max_rows: int | None = None) -> str:
21    """
22    Return a full, untruncated string representation of a DataFrame.
23
24    Parameters
25    ----------
26    df: pandas.DataFrame
27        The DataFrame to format.
28
29    max_rows: int | None, default None
30        If set, only render the first `max_rows` rows and append a note.
31        `None` renders every row.
32
33    Returns
34    -------
35    A Markdown table (falling back to a plain fixed-width table if `tabulate`
36    is unavailable), followed by a `[rows x columns]` shape footer.
37    """
38    from meerschaum.utils.packages import attempt_import
39    pd = attempt_import('pandas', lazy=False)
40
41    n_rows, n_cols = df.shape
42    render_df = df if (max_rows is None or n_rows <= max_rows) else df.head(max_rows)
43    truncated_rows = n_rows - len(render_df)
44
45    ### Never let pandas insert `...` for columns, rows, or cell contents.
46    options = [
47        'display.max_columns', None,
48        'display.max_rows', None,
49        'display.width', None,
50        'display.max_colwidth', None,
51    ]
52
53    body = None
54    with pd.option_context(*options):
55        try:
56            tabulate = attempt_import('tabulate', warn=False)
57            if tabulate is not None:
58                body = render_df.to_markdown(index=False)
59        except Exception:
60            body = None
61
62        if body is None:
63            ### Fall back to the built-in fixed-width table (no extra dependency).
64            body = render_df.to_string(index=False, max_rows=None, max_cols=None)
65
66    footer = (
67        f"\n[{n_rows} row{'s' if n_rows != 1 else ''} "
68        f"x {n_cols} column{'s' if n_cols != 1 else ''}]"
69    )
70    if truncated_rows > 0:
71        footer = (
72            f"\n[showing first {len(render_df)} of {n_rows} rows "
73            f"x {n_cols} column{'s' if n_cols != 1 else ''}]"
74        )
75    return body + footer

Return a full, untruncated string representation of a DataFrame.

Parameters
  • df (pandas.DataFrame): The DataFrame to format.
  • max_rows (int | None, default None): If set, only render the first max_rows rows and append a note. None renders every row.
Returns
  • A Markdown table (falling back to a plain fixed-width table if tabulate
  • is unavailable), followed by a [rows x columns] shape footer.
def format_success_tuple( tup: Tuple[bool, str], upper_padding: int = 0, lower_padding: int = 0, left_padding: int = 1, calm: bool = False, _progress: Optional[rich.progress.Progress] = None) -> str:
326def format_success_tuple(
327    tup: mrsm.SuccessTuple,
328    upper_padding: int = 0,
329    lower_padding: int = 0,
330    left_padding: int = 1,
331    calm: bool = False,
332    _progress: Optional['rich.progress.Progress'] = None,
333) -> str:
334    """
335    Format `meerschaum.utils.typing.SuccessTuple`.
336
337    Parameters
338    ----------
339    upper_padding: int, default 0
340        How many newlines to prepend to the message.
341
342    lower_padding: int, default 0
343        How many newlines to append to the message.
344
345    left_padding: int, default 1
346        How mant spaces to preprend to the message.
347
348    calm: bool, default False
349        If `True`, use the default emoji and color scheme.
350    """
351    _init()
352    try:
353        status = 'success' if tup[0] else 'failure'
354    except TypeError:
355        status = 'failure'
356        tup = None, None
357
358    if calm:
359        status += '_calm'
360
361    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
362    from meerschaum.config import get_config
363    status_config = get_config('formatting', status, patch=True)
364
365    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
366    lines = msg.split('\n')
367    lines = [lines[0]] + [
368        (('    ' + line if not line.startswith(' ') else line))
369        for line in lines[1:]
370    ]
371    if ANSI:
372        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
373
374    msg = '\n'.join(lines)
375    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
376    return msg

Format meerschaum.utils.typing.SuccessTuple.

Parameters
  • upper_padding (int, default 0): How many newlines to prepend to the message.
  • lower_padding (int, default 0): How many newlines to append to the message.
  • left_padding (int, default 1): How mant spaces to preprend to the message.
  • calm (bool, default False): If True, use the default emoji and color scheme.
def get_console():
253def get_console():
254    """Get the rich console."""
255    global console
256    from meerschaum.utils.packages import import_rich, attempt_import
257    rich = import_rich()
258    rich_console = attempt_import('rich.console')
259    try:
260        console = rich_console.Console(force_terminal=True, color_system='truecolor')
261    except Exception:
262        import traceback
263        traceback.print_exc()
264        console = None
265    return console

Get the rich console.

def highlight_pipes(message: str) -> str:
345def highlight_pipes(message: str) -> str:
346    """
347    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.
348    """
349    if 'Pipe(' not in message:
350        return message
351
352    from meerschaum.utils.misc import parse_arguments_str
353    segments = message.split('Pipe(')
354    msg = ''
355    for i, segment in enumerate(segments):
356        if i == 0:
357            msg += segment
358            continue
359
360        paren_index = segment.find(')')
361        if paren_index == -1:
362            msg += 'Pipe(' + segment
363            continue
364        
365        pipe_args_str = segment[:paren_index]
366        try:
367            args, kwargs = parse_arguments_str(pipe_args_str)
368            pipe_dict = {
369                'connector_keys': args[0],
370                'metric_key': args[1],
371            }
372            if len(args) > 2:
373                pipe_dict['location_key'] = args[2]
374            if 'instance' in kwargs:
375                pipe_dict['instance_keys'] = kwargs['instance']
376            
377            _to_add = pipe_repr(pipe_dict) + segment[paren_index + 1:]
378        except Exception:
379            _to_add = 'Pipe(' + segment
380        msg += _to_add
381    return msg

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header( message: str, ruler: str = '─', left_pad: int = 2, top: bool = False, top_pad: int = 1) -> str:
17def make_header(
18    message: str,
19    ruler: str = '─',
20    left_pad: int = 2,
21    top: bool = False,
22    top_pad: int = 1,
23) -> str:
24    """
25    Format a message string with a ruler or box.
26    Length of the ruler is the length of the longest word.
27    
28    Example:
29        'My\nheader' -> '  My\n  header\n  ──────'
30    """
31
32    from meerschaum.utils.formatting import ANSI, UNICODE, colored
33    if not UNICODE:
34        ruler = '-'
35    words = message.split('\n')
36    max_length = 0
37    for w in words:
38        length = len(w)
39        if length > max_length:
40            max_length = length
41
42    left_buffer = left_pad * ' '
43
44    return (
45        (('\n' * top_pad) if top else "")
46        + left_buffer
47        + (((ruler * max_length) + '\n') if top else "")
48        + message.replace('\n', '\n' + left_buffer)
49        + "\n"
50        + left_buffer
51        + (ruler * max_length)
52    )

Format a message string with a ruler or box. Length of the ruler is the length of the longest word.

Example:
    'My

header' -> ' My header ──────'

def pipe_repr( pipe: Union[meerschaum.Pipe, Dict[str, Any]], as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
280def pipe_repr(
281    pipe: Union[mrsm.Pipe, Dict[str, Any]],
282    as_rich_text: bool = False,
283    ansi: Optional[bool] = None,
284) -> Union[str, 'rich.text.Text']:
285    """
286    Return a formatted string for representing a `meerschaum.Pipe`.
287    """
288    from meerschaum.utils.formatting import ANSI, colored, rich_text_to_str
289    from meerschaum.utils.packages import import_rich, attempt_import
290    import meerschaum as mrsm
291
292    _ = import_rich()
293    Text = attempt_import('rich.text').Text
294
295    if isinstance(pipe, mrsm.Pipe):
296        connector_keys = pipe.connector_keys
297        metric_key = pipe.metric_key
298        location_key = pipe.location_key
299        instance_keys = pipe.instance_keys
300    else:
301        connector_keys = pipe.get('connector_keys')
302        metric_key = pipe.get('metric_key')
303        location_key = pipe.get('location_key')
304        instance_keys = pipe.get('instance_keys', get_config('meerschaum', 'instance'))
305
306    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
307    if not ANSI or (ansi is False):
308        styles = {k: '' for k in styles}
309    _pipe_style_prefix, _pipe_style_suffix = (
310        (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe']
311        else ('', '')
312    )
313    text_obj = (
314        Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix)
315        + colored(("'" + connector_keys + "'"), style=styles['connector'], as_rich_text=True)
316        + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix)
317        + colored(("'" + metric_key + "'"), style=styles['metric'], as_rich_text=True)
318        + (
319            (
320                colored(', ', style=styles['punctuation'], as_rich_text=True)
321                + colored(
322                    ("'" + location_key + "'"),
323                    style=styles['location'], as_rich_text=True
324                )
325            ) if location_key is not None
326            else colored('', style='', as_rich_text=True)
327        ) + (
328            ( ### Add the `instance=` argument.
329                colored(', instance=', style=styles['punctuation'], as_rich_text=True)
330                + colored(
331                    ("'" + instance_keys + "'"),
332                    style=styles['instance'], as_rich_text=True
333                )
334            ) if instance_keys != get_config('meerschaum', 'instance')
335            else colored('', style='', as_rich_text=True)
336        )
337        + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix)
338    )
339    if as_rich_text:
340        return text_obj
341    return rich_text_to_str(text_obj).replace('\n', '')

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11    *args,
 12    detect_password: bool = True,
 13    nopretty: bool = False,
 14    **kw
 15) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    import copy
 21    import json
 22    from meerschaum.utils.packages import attempt_import, import_rich
 23    from meerschaum.utils.formatting import ANSI, get_console, print_tuple
 24    from meerschaum.utils.warnings import error
 25    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 26    from collections import OrderedDict
 27
 28    if (
 29        len(args) == 1
 30        and
 31        isinstance(args[0], tuple)
 32        and
 33        len(args[0]) == 2
 34        and
 35        isinstance(args[0][0], bool)
 36        and
 37        isinstance(args[0][1], str)
 38    ):
 39        return print_tuple(args[0], **filter_keywords(print_tuple, **kw))
 40
 41    modify = True
 42    rich_pprint = None
 43    if ANSI and not nopretty:
 44        rich = import_rich()
 45        if rich is not None:
 46            rich_pretty = attempt_import('rich.pretty')
 47        if rich_pretty is not None:
 48            def _rich_pprint(*args, **kw):
 49                _console = get_console()
 50                _kw = filter_keywords(_console.print, **kw)
 51                _console.print(*args, **_kw)
 52            rich_pprint = _rich_pprint
 53    elif not nopretty:
 54        pprintpp = attempt_import('pprintpp', warn=False)
 55        try:
 56            _pprint = pprintpp.pprint
 57        except Exception :
 58            import pprint as _pprint_module
 59            _pprint = _pprint_module.pprint
 60
 61    func = (
 62        _pprint if rich_pprint is None else rich_pprint
 63    ) if not nopretty else print
 64
 65    try:
 66        args_copy = copy.deepcopy(args)
 67    except Exception:
 68        args_copy = args
 69        modify = False
 70
 71    _args = []
 72    for a in args:
 73        c = a
 74        ### convert OrderedDict into dict
 75        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 76            c = dict_from_od(copy.deepcopy(c))
 77        _args.append(c)
 78    args = _args
 79
 80    _args = list(args)
 81    if detect_password and modify:
 82        _args = []
 83        for a in args:
 84            c = a
 85            if isinstance(c, dict):
 86                c = replace_password(copy.deepcopy(c))
 87            if nopretty:
 88                try:
 89                    c = json.dumps(c)
 90                    is_json = True
 91                except Exception:
 92                    is_json = False
 93                if not is_json:
 94                    try:
 95                        c = str(c)
 96                    except Exception:
 97                        pass
 98            _args.append(c)
 99
100    ### filter out unsupported keywords
101    func_kw = filter_keywords(func, **kw) if not nopretty else {}
102    error_msg = None
103    try:
104        func(*_args, **func_kw)
105    except Exception as e:
106        error_msg = e
107    if error_msg is not None:
108        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_df(df: Any, max_rows: int | None = None) -> None:
78def pprint_df(df: Any, max_rows: int | None = None) -> None:
79    """
80    Print a DataFrame in full (no column truncation), Markdown-formatted.
81
82    See `format_dataframe` for details.
83    """
84    print(format_dataframe(df, max_rows=max_rows))

Print a DataFrame in full (no column truncation), Markdown-formatted.

See format_dataframe for details.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]]) -> None:
 17def pprint_pipes(pipes: PipesDict) -> None:
 18    """Print a stylized tree of a Pipes dictionary.
 19    Supports ANSI and UNICODE global settings."""
 20    from meerschaum.utils.warnings import error
 21    from meerschaum.utils.packages import attempt_import, import_rich
 22    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
 23    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
 24    import copy
 25    rich = import_rich('rich', warn=False)
 26    Text = None
 27    if rich is not None:
 28        rich_text = attempt_import('rich.text', lazy=False)
 29        Text = rich_text.Text
 30
 31    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
 32    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
 33    if not ANSI:
 34        styles = {k: '' for k in styles}
 35    print()
 36
 37    def ascii_print_pipes():
 38        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
 39        (though rich should auto-install when `attempt_import()` is called)."""
 40        asciitree = attempt_import('asciitree')
 41        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
 42        for conn_keys, metrics in pipes.items():
 43            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
 44            if Text is not None:
 45                replace_dict['connector'][_colored_conn_key] = (
 46                    Text(conn_keys, style=styles['connector'])
 47                )
 48            ascii_dict[_colored_conn_key] = {}
 49            for metric, locations in metrics.items():
 50                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
 51                if Text is not None:
 52                    replace_dict['metric'][_colored_metric_key] = (
 53                        Text(metric, style=styles['metric'])
 54                    )
 55                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
 56                for location, pipe in locations.items():
 57                    _location_style = styles[('none' if location is None else 'location')]
 58                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
 59                    _colored_location = colored(
 60                        icons['location'] + str(location), style=_location_style
 61                    )
 62                    _colored_location_key = _colored_location + pipe_addendum
 63                    if Text is not None:
 64                        replace_dict['location'][_colored_location] = (
 65                            Text(str(location), style=_location_style)
 66                        )
 67                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}
 68
 69        tree = asciitree.LeftAligned()
 70        output = ''
 71        cols = []
 72
 73        ### This is pretty terrible, unreadable code.
 74        ### Please know that I'm normally better than this.
 75        key_str = (
 76            (Text("     ") if Text is not None else "     ") +
 77            (
 78                Text("Key", style='underline') if Text is not None else
 79                colored("Key", style='underline')
 80            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
 81            (
 82                Text("Connector", style=styles['connector']) if Text is not None else
 83                colored("Connector", style=styles['connector'])
 84            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
 85            (
 86                Text("Metric", style=styles['metric']) if Text is not None else
 87                colored("Metric", style=styles['metric'])
 88            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
 89            (
 90                Text("Location", style=styles['location']) if Text is not None else
 91                colored("Location", style=styles['location'])
 92            ) + (Text('\n\n') if Text is not None else '\n\n')
 93        )
 94
 95        output += str(key_str)
 96        cols.append(key_str)
 97
 98        def replace_tree_text(tree_str : str) -> Text:
 99            """Replace the colored words with stylized Text instead.
100            Is not executed if ANSI and UNICODE are disabled."""
101            tree_text = Text(tree_str) if Text is not None else None
102            for k, v in replace_dict.items():
103                for _colored, _text in v.items():
104                    parts = []
105                    lines = tree_text.split(_colored)
106                    for part in lines:
107                        parts += [part, _text]
108                    if lines[-1] != Text(''):
109                        parts = parts[:-1]
110                    _tree_text = Text('')
111                    for part in parts:
112                        _tree_text += part
113                    tree_text = _tree_text
114            return tree_text
115
116        tree_output = ""
117        for k, v in ascii_dict.items():
118            branch = {k : v}
119            tree_output += tree(branch) + '\n\n'
120            if not UNICODE and not ANSI:
121                _col = (Text(tree(branch)) if Text is not None else tree(branch))
122            else:
123                _col = replace_tree_text(tree(branch))
124            cols.append(_col)
125        if len(output) > 0:
126            tree_output = tree_output[:-2]
127        output += tree_output
128
129        if rich is None:
130            return print(output)
131
132        rich_columns = attempt_import('rich.columns')
133        Columns = rich_columns.Columns
134        columns = Columns(cols)
135        get_console().print(columns)
136
137    if not UNICODE:
138        return ascii_print_pipes()
139
140    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
141        'rich.panel',
142        'rich.tree',
143        'rich.text',
144        'rich.columns',
145        'rich.table',
146    )
147    from rich import box
148    Panel = rich_panel.Panel
149    Tree = rich_tree.Tree
150    Text = rich_text.Text
151    Columns = rich_columns.Columns
152    Table = rich_table.Table
153
154    key_panel = Panel(
155        (
156            Text("\n") +
157            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
158            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
159            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
160        ),
161        title = Text(icons['key'] + "Keys", style=styles['guide']),
162        border_style = styles['guide'],
163        expand = True
164    )
165
166    cols = []
167    conn_trees = {}
168    metric_trees = {}
169    pipes = sorted_dict(pipes)
170    for conn_keys, metrics in pipes.items():
171        conn_trees[conn_keys] = Tree(
172            Text(
173                icons['connector'] + conn_keys,
174                style = styles['connector'],
175            ),
176            guide_style = styles['connector']
177        )
178        metric_trees[conn_keys] = {}
179        for metric, locations in metrics.items():
180            metric_trees[conn_keys][metric] = Tree(
181                Text(
182                    icons['metric'] + metric,
183                    style = styles['metric']
184                ),
185                guide_style = styles['metric']
186            )
187            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
188            for location, pipe in locations.items():
189                _location = (
190                    Text(str(location), style=styles['none']) if location is None
191                    else Text(location, style=styles['location'])
192                )
193                _location = (
194                    Text(icons['location'])
195                    + _location + Text('\n')
196                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
197                )
198                metric_trees[conn_keys][metric].add(_location)
199
200    cols += [key_panel]
201    for k, t in conn_trees.items():
202        cols.append(t)
203
204    columns = Columns(cols)
205    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
109def translate_rich_to_termcolor(*colors) -> tuple:
110    """Translate between rich and more_termcolor terminology."""
111    _colors = []
112    for c in colors:
113        _c_list = []
114        ### handle 'bright'
115        c = c.replace('bright_', 'bright ')
116
117        ### handle 'on'
118        if ' on ' in c:
119            _on = c.split(' on ')
120            _colors.append(_on[0])
121            for _c in _on[1:]:
122                _c_list.append('on ' + _c)
123        else:
124            _c_list += [c]
125
126        _colors += _c_list
127
128    return tuple(_colors)

Translate between rich and more_termcolor terminology.