meerschaum.utils.daemon

Manage Daemons via the Daemon class.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Manage Daemons via the `Daemon` class.
  7"""
  8
  9from __future__ import annotations
 10
 11import os
 12
 13from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Union
 14from meerschaum.utils.daemon.StdinFile import StdinFile
 15from meerschaum.utils.daemon.Daemon import Daemon
 16from meerschaum.utils.daemon.RotatingFile import RotatingFile
 17from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 18from meerschaum.utils.daemon._names import get_new_daemon_name
 19
 20
### Public names of this module (what `from meerschaum.utils.daemon import *` provides).
__all__ = (
    'daemon_action',
    'daemon_entry',
    'get_daemons',
    'get_daemon_ids',
    'get_running_daemons',
    'get_stopped_daemons',
    'get_paused_daemons',
    'get_filtered_daemons',
    'get_new_daemon_name',
    'run_daemon',
    'running_in_daemon',
    'Daemon',
    'StdinFile',
    'RotatingFile',
    'FileDescriptorInterceptor',
)

### Module-level mapping consulted by `get_current_daemon()` (looked up by daemon ID).
### NOTE(review): nothing in this module adds entries to it; confirm where it is populated.
_daemons = {}
 40
 41
 42def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 43    """Parse sysargs and execute a Meerschaum action as a daemon.
 44
 45    Parameters
 46    ----------
 47    sysargs: Optional[List[str]], default None
 48        The command line arguments used in a Meerschaum action.
 49
 50    Returns
 51    -------
 52    A SuccessTuple.
 53    """
 54    from meerschaum._internal.entry import entry
 55    _args = {}
 56    if '--name' in sysargs or '--job-name' in sysargs:
 57        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 58        _args = parse_arguments(sysargs)
 59    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 60    try:
 61        label = shlex.join(filtered_sysargs) if sysargs else None
 62    except Exception:
 63        label = ' '.join(filtered_sysargs) if sysargs else None
 64
 65    name = _args.get('name', None)
 66    daemon = None
 67    if name:
 68        try:
 69            daemon = Daemon(daemon_id=name)
 70        except Exception:
 71            daemon = None
 72
 73    if daemon is not None:
 74        existing_sysargs = daemon.properties['target']['args'][0]
 75        existing_kwargs = parse_arguments(existing_sysargs)
 76
 77        ### Remove sysargs because flags are aliased.
 78        _ = _args.pop('daemon', None)
 79        _ = _args.pop('sysargs', None)
 80        _ = _args.pop('filtered_sysargs', None)
 81        debug = _args.pop('debug', None)
 82        _args['sub_args'] = sorted(_args.get('sub_args', []))
 83        _ = existing_kwargs.pop('daemon', None)
 84        _ = existing_kwargs.pop('sysargs', None)
 85        _ = existing_kwargs.pop('filtered_sysargs', None)
 86        _ = existing_kwargs.pop('debug', None)
 87        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 88
 89        ### Only run if the kwargs equal or no actions are provided.
 90        if existing_kwargs == _args or not _args.get('action', []):
 91            if daemon.status == 'running':
 92                return True, f"Daemon '{daemon}' is already running."
 93            return daemon.run(
 94                debug=debug,
 95                allow_dirty_run=True,
 96            )
 97
 98    success_tuple = run_daemon(
 99        entry,
100        filtered_sysargs,
101        daemon_id=_args.get('name', None) if _args else None,
102        label=label,
103        keep_daemon_output=('--rm' not in (sysargs or [])),
104    )
105    return success_tuple
106
107
def daemon_action(**kw) -> SuccessTuple:
    """Execute a Meerschaum action as a daemon.

    The keyword arguments are converted back into sysargs (with `daemon=True`
    and `shell=False` forced) and executed via
    `run_python_package('meerschaum', ...)`.

    Parameters
    ----------
    **kw
        Parsed Meerschaum arguments. If `action` is not a recognized action,
        either `allow_shell_job` or `force` must be truthy.

    Returns
    -------
    A SuccessTuple: `True` when the return code is `0`,
    otherwise `False` with a message containing the return code.
    """
    from meerschaum.utils.packages import run_python_package
    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
    from meerschaum.actions import get_action

    kw['daemon'] = True
    kw['shell'] = False

    ### Unrecognized actions are treated as shell commands and must be explicitly allowed.
    action = kw.get('action', None)
    if action and get_action(action) is None:
        if not kw.get('allow_shell_job') and not kw.get('force'):
            return False, (
                f"Action '{action}' isn't recognized.\n\n"
                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
                + "to enable shell commands to run as Meerschaum jobs."
            )

    sysargs = parse_dict_to_sysargs(kw)
    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
    return rc == 0, msg
131
132
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target function for the new `Daemon`.

    *args
        Positional arguments forwarded to `func` (as `target_args`).

    daemon_id: Optional[str], default None
        The ID to give to the `Daemon`.

    keep_daemon_output: bool, default True
        Passed through to `Daemon.run()`.

    allow_dirty_run: bool, default False
        Passed through to `Daemon.run()`.

    label: Optional[str], default None
        The label to give to the `Daemon`.

    **kw
        Keyword arguments forwarded to `func` (as `target_kw`).

    Returns
    -------
    Whatever `Daemon.run()` returns.
    """
    positional_args = list(args)
    new_daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=positional_args,
        target_kw=kw,
        label=label,
    )
    run_result = new_daemon.run(
        keep_daemon_output=keep_daemon_output,
        allow_dirty_run=allow_dirty_run,
    )
    return run_result
154
155
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` timestamp),
    then paused daemons (ordered by `paused`),
    then running daemons (ordered by `began`).
    A daemon missing the relevant timestamp sorts as `'9999'`,
    and daemons reporting any other status are omitted.
    """
    ### Map each status (in output order) to the process timestamp used for sorting.
    status_sort_keys = {
        'stopped': 'ended',
        'paused': 'paused',
        'running': 'began',
    }
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]

    grouped = {status: [] for status in status_sort_keys}
    for daemon in daemons:
        ### Read the status once so that a daemon cannot land in two groups.
        status = daemon.status
        if status not in status_sort_keys:
            continue
        timestamp = daemon.properties.get('process', {}).get(
            status_sort_keys[status],
            '9999',
        )
        grouped[status].append((timestamp, daemon))

    ### `sorted()` is stable, so ties keep the order from `get_daemon_ids()`.
    return [
        daemon
        for status in status_sort_keys
        for _, daemon in sorted(grouped[status], key=lambda pair: pair[0])
    ]
202
203
def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    A daemon ID is the name of an entry under `DAEMON_RESOURCES_PATH`
    which contains a `properties.json` file. IDs are returned in sorted order.
    An empty list is returned if the resources directory does not exist.
    """
    import meerschaum.config.paths as paths
    resources_path = paths.DAEMON_RESOURCES_PATH

    ### Guard against a missing directory (as `get_new_daemon_name()` does).
    if not resources_path.exists():
        return []

    return [
        daemon_dir
        for daemon_dir in sorted(os.listdir(resources_path))
        if (resources_path / daemon_dir / 'properties.json').exists()
    ]
214
215
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    running = []
    for candidate in candidates:
        if candidate.status == 'running':
            running.append(candidate)
    return running
227
228
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    paused = []
    for candidate in candidates:
        if candidate.status == 'paused':
            paused.append(candidate)
    return paused
240
241
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    stopped = []
    for candidate in candidates:
        if candidate.status == 'stopped':
            stopped.append(candidate)
    return stopped
254
255
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return the `Daemons` named in `filter_list`, skipping any which do not exist.

    If `filter_list` is `None` or empty, return every non-hidden `Daemon`
    from `get_daemons()`.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        The daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit a warning for each daemon ID which does not exist.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        return [daemon for daemon in get_daemons() if not daemon.hidden]

    from meerschaum.utils.warnings import warn as _warn
    found = []
    for d_id in filter_list:
        ### A daemon which cannot be constructed is treated as non-existent.
        try:
            candidate = Daemon(daemon_id=d_id)
            exists = candidate.path.exists()
        except Exception:
            candidate, exists = None, False

        if exists:
            found.append(candidate)
        elif warn:
            _warn(f"Daemon '{d_id}' does not exist.", stack=False)
    return found
298
299
def running_in_daemon() -> bool:
    """
    Return whether the daemon ID environment variable is set.

    The variable's name is read from `STATIC_CONFIG['environment']['daemon_id']`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    environment_config = STATIC_CONFIG['environment']
    env_var_name = environment_config['daemon_id']
    is_set = env_var_name in os.environ
    return is_set
307
308
def get_current_daemon() -> Union[Daemon, None]:
    """
    If running within a daemon context, return the corresponding `Daemon`.
    Otherwise return `None`.

    The daemon ID is read from the environment variable named by
    `STATIC_CONFIG['environment']['daemon_id']`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    daemon_id = os.environ.get(daemon_env_var, None)
    if daemon_id is None:
        return None

    ### Only build a new `Daemon` on a cache miss
    ### (a `dict.get()` default would be constructed on every call).
    if daemon_id in _daemons:
        return _daemons[daemon_id]
    return Daemon(daemon_id=daemon_id)
def daemon_action(**kw) -> Tuple[bool, str]:
def daemon_action(**kw) -> SuccessTuple:
    """Execute a Meerschaum action as a daemon.

    The keyword arguments are converted back into sysargs (with `daemon=True`
    and `shell=False` forced) and executed via
    `run_python_package('meerschaum', ...)`.

    Parameters
    ----------
    **kw
        Parsed Meerschaum arguments. If `action` is not a recognized action,
        either `allow_shell_job` or `force` must be truthy.

    Returns
    -------
    A SuccessTuple: `True` when the return code is `0`,
    otherwise `False` with a message containing the return code.
    """
    from meerschaum.utils.packages import run_python_package
    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
    from meerschaum.actions import get_action

    kw['daemon'] = True
    kw['shell'] = False

    ### Unrecognized actions are treated as shell commands and must be explicitly allowed.
    action = kw.get('action', None)
    if action and get_action(action) is None:
        if not kw.get('allow_shell_job') and not kw.get('force'):
            return False, (
                f"Action '{action}' isn't recognized.\n\n"
                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
                + "to enable shell commands to run as Meerschaum jobs."
            )

    sysargs = parse_dict_to_sysargs(kw)
    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
    return rc == 0, msg

Execute a Meerschaum action as a daemon.

def daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
 43def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 44    """Parse sysargs and execute a Meerschaum action as a daemon.
 45
 46    Parameters
 47    ----------
 48    sysargs: Optional[List[str]], default None
 49        The command line arguments used in a Meerschaum action.
 50
 51    Returns
 52    -------
 53    A SuccessTuple.
 54    """
 55    from meerschaum._internal.entry import entry
 56    _args = {}
 57    if '--name' in sysargs or '--job-name' in sysargs:
 58        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 59        _args = parse_arguments(sysargs)
 60    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 61    try:
 62        label = shlex.join(filtered_sysargs) if sysargs else None
 63    except Exception:
 64        label = ' '.join(filtered_sysargs) if sysargs else None
 65
 66    name = _args.get('name', None)
 67    daemon = None
 68    if name:
 69        try:
 70            daemon = Daemon(daemon_id=name)
 71        except Exception:
 72            daemon = None
 73
 74    if daemon is not None:
 75        existing_sysargs = daemon.properties['target']['args'][0]
 76        existing_kwargs = parse_arguments(existing_sysargs)
 77
 78        ### Remove sysargs because flags are aliased.
 79        _ = _args.pop('daemon', None)
 80        _ = _args.pop('sysargs', None)
 81        _ = _args.pop('filtered_sysargs', None)
 82        debug = _args.pop('debug', None)
 83        _args['sub_args'] = sorted(_args.get('sub_args', []))
 84        _ = existing_kwargs.pop('daemon', None)
 85        _ = existing_kwargs.pop('sysargs', None)
 86        _ = existing_kwargs.pop('filtered_sysargs', None)
 87        _ = existing_kwargs.pop('debug', None)
 88        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 89
 90        ### Only run if the kwargs equal or no actions are provided.
 91        if existing_kwargs == _args or not _args.get('action', []):
 92            if daemon.status == 'running':
 93                return True, f"Daemon '{daemon}' is already running."
 94            return daemon.run(
 95                debug=debug,
 96                allow_dirty_run=True,
 97            )
 98
 99    success_tuple = run_daemon(
100        entry,
101        filtered_sysargs,
102        daemon_id=_args.get('name', None) if _args else None,
103        label=label,
104        keep_daemon_output=('--rm' not in (sysargs or [])),
105    )
106    return success_tuple

Parse sysargs and execute a Meerschaum action as a daemon.

Parameters
  • sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
  • A SuccessTuple.
def get_daemons() -> List[Daemon]:
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` timestamp),
    then paused daemons (ordered by `paused`),
    then running daemons (ordered by `began`).
    A daemon missing the relevant timestamp sorts as `'9999'`,
    and daemons reporting any other status are omitted.
    """
    ### Map each status (in output order) to the process timestamp used for sorting.
    status_sort_keys = {
        'stopped': 'ended',
        'paused': 'paused',
        'running': 'began',
    }
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]

    grouped = {status: [] for status in status_sort_keys}
    for daemon in daemons:
        ### Read the status once so that a daemon cannot land in two groups.
        status = daemon.status
        if status not in status_sort_keys:
            continue
        timestamp = daemon.properties.get('process', {}).get(
            status_sort_keys[status],
            '9999',
        )
        grouped[status].append((timestamp, daemon))

    ### `sorted()` is stable, so ties keep the order from `get_daemon_ids()`.
    return [
        daemon
        for status in status_sort_keys
        for _, daemon in sorted(grouped[status], key=lambda pair: pair[0])
    ]

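Return all existing Daemons, grouped by status: stopped daemons first (sorted by end time), then paused daemons (sorted by pause time), then running daemons (sorted by start time).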

def get_daemon_ids() -> List[str]:
def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    A daemon ID is the name of an entry under `DAEMON_RESOURCES_PATH`
    which contains a `properties.json` file. IDs are returned in sorted order.
    An empty list is returned if the resources directory does not exist.
    """
    import meerschaum.config.paths as paths
    resources_path = paths.DAEMON_RESOURCES_PATH

    ### Guard against a missing directory (as `get_new_daemon_name()` does).
    if not resources_path.exists():
        return []

    return [
        daemon_dir
        for daemon_dir in sorted(os.listdir(resources_path))
        if (resources_path / daemon_dir / 'properties.json').exists()
    ]

Return the IDs of all daemons on disk.

def get_running_daemons( daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    running = []
    for candidate in candidates:
        if candidate.status == 'running':
            running.append(candidate)
    return running

Return a list of currently running daemons.

def get_stopped_daemons( daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    stopped = []
    for candidate in candidates:
        if candidate.status == 'stopped':
            stopped.append(candidate)
    return stopped

Return a list of stopped daemons.

def get_paused_daemons( daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to check. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    paused = []
    for candidate in candidates:
        if candidate.status == 'paused':
            paused.append(candidate)
    return paused

Return a list of active but paused daemons.

def get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[Daemon]:
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return the `Daemons` named in `filter_list`, skipping any which do not exist.

    If `filter_list` is `None` or empty, return every non-hidden `Daemon`
    from `get_daemons()`.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        The daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit a warning for each daemon ID which does not exist.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        return [daemon for daemon in get_daemons() if not daemon.hidden]

    from meerschaum.utils.warnings import warn as _warn
    found = []
    for d_id in filter_list:
        ### A daemon which cannot be constructed is treated as non-existent.
        try:
            candidate = Daemon(daemon_id=d_id)
            exists = candidate.path.exists()
        except Exception:
            candidate, exists = None, False

        if exists:
            found.append(candidate)
        elif warn:
            _warn(f"Daemon '{d_id}' does not exist.", stack=False)
    return found

Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.

If filter_list is None or empty, return all Daemons (from get_daemons()).

Parameters
  • filter_list (Optional[List[str]], default None): List of daemon_ids to include. If filter_list is None or empty, return all Daemons.
  • warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
  • A list of Daemon objects.
def get_new_daemon_name() -> str:
def get_new_daemon_name() -> str:
    """
    Return a randomly generated daemon name which is not already taken.

    Names are drawn from `generate_random_name()` until one is found which does not
    match an existing entry under `DAEMON_RESOURCES_PATH`
    (up to ~6000 maximum possibilities).
    """
    import meerschaum.config.paths as paths
    resources_path = paths.DAEMON_RESOURCES_PATH
    taken_names = os.listdir(resources_path) if resources_path.exists() else []

    ### Keep drawing names until one does not collide with an existing entry.
    candidate = generate_random_name()
    while candidate in taken_names:
        candidate = generate_random_name()
    return candidate

Generate a new random name until a unique one is found (up to ~6000 maximum possibilities).

def run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target function for the new `Daemon`.

    *args
        Positional arguments forwarded to `func` (as `target_args`).

    daemon_id: Optional[str], default None
        The ID to give to the `Daemon`.

    keep_daemon_output: bool, default True
        Passed through to `Daemon.run()`.

    allow_dirty_run: bool, default False
        Passed through to `Daemon.run()`.

    label: Optional[str], default None
        The label to give to the `Daemon`.

    **kw
        Keyword arguments forwarded to `func` (as `target_kw`).

    Returns
    -------
    Whatever `Daemon.run()` returns.
    """
    positional_args = list(args)
    new_daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=positional_args,
        target_kw=kw,
        label=label,
    )
    run_result = new_daemon.run(
        keep_daemon_output=keep_daemon_output,
        allow_dirty_run=allow_dirty_run,
    )
    return run_result

Execute a function as a daemon.

def running_in_daemon() -> bool:
def running_in_daemon() -> bool:
    """
    Return whether the daemon ID environment variable is set.

    The variable's name is read from `STATIC_CONFIG['environment']['daemon_id']`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    environment_config = STATIC_CONFIG['environment']
    env_var_name = environment_config['daemon_id']
    is_set = env_var_name in os.environ
    return is_set

Return whether the current thread is running in a Daemon context.

class Daemon:
  42class Daemon:
  43    """
  44    Daemonize Python functions into background processes.
  45
  46    Examples
  47    --------
  48    >>> import meerschaum as mrsm
  49    >>> from meerschaum.utils.daemon import Daemon
  50    >>> daemon = Daemon(print, ('hi',))
  51    >>> success, msg = daemon.run()
  52    >>> print(daemon.log_text)
  53
  54    2024-07-29 18:03 | hi
  55    2024-07-29 18:03 |
  56    >>> daemon.run(allow_dirty_run=True)
  57    >>> print(daemon.log_text)
  58
  59    2024-07-29 18:03 | hi
  60    2024-07-29 18:03 |
  61    2024-07-29 18:05 | hi
  62    2024-07-29 18:05 |
  63    >>> mrsm.pprint(daemon.properties)
  64    {
  65        'label': 'print',
  66        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  67        'result': None,
  68        'process': {'ended': '2024-07-29T18:03:33.752806'}
  69    }
  70
  71    """
  72
  73    def __new__(
  74        cls,
  75        *args,
  76        daemon_id: Optional[str] = None,
  77        **kw
  78    ):
  79        """
  80        If a daemon_id is provided and already exists, read from its pickle file.
  81        """
  82        instance = super(Daemon, cls).__new__(cls)
  83        if daemon_id is not None:
  84            instance.daemon_id = daemon_id
  85            if instance.pickle_path.exists():
  86                instance = instance.read_pickle()
  87        return instance
  88
  89    @classmethod
  90    def from_properties_file(cls, daemon_id: str) -> Daemon:
  91        """
  92        Return a Daemon from a properties dictionary.
  93        """
  94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  95        if not properties_path.exists():
  96            raise OSError(f"Properties file '{properties_path}' does not exist.")
  97
  98        try:
  99            with open(properties_path, 'r', encoding='utf-8') as f:
 100                properties = json.load(f)
 101        except Exception:
 102            properties = {}
 103
 104        if not properties:
 105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 106
 107        daemon_id = properties_path.parent.name
 108        target_cf = properties.get('target', {})
 109        target_module_name = target_cf.get('module', None)
 110        target_function_name = target_cf.get('name', None)
 111        target_args = target_cf.get('args', None)
 112        target_kw = target_cf.get('kw', None)
 113        label = properties.get('label', None)
 114
 115        if None in [
 116            target_module_name,
 117            target_function_name,
 118            target_args,
 119            target_kw,
 120        ]:
 121            raise ValueError("Missing target function information.")
 122
 123        target_module = importlib.import_module(target_module_name)
 124        target_function = getattr(target_module, target_function_name)
 125
 126        return Daemon(
 127            daemon_id=daemon_id,
 128            target=target_function,
 129            target_args=target_args,
 130            target_kw=target_kw,
 131            properties=properties,
 132            label=label,
 133        )
 134
 135
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Initialize the Daemon, keeping any attributes already restored from a pickle.

        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        pickle: bool, default True
            Stored on the instance as `self.pickle`.
            NOTE(review): presumably controls whether the daemon is pickled to disk; confirm.

        Raises
        ------
        Exception
            If `daemon_id` is given without a `target` and neither a pickle file
            nor a properties file exists, or if the daemon cannot be recovered
            from its properties file (which is then deleted).
        """
        ### NOTE(review): `_pickle` is not referenced again within this method.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle, no target passed, and nothing restored: recover from the properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Write the recovered daemon's pickle and adopt its values.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: remove the unusable properties file (best effort).
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                ### NOTE(review): `error()` presumably raises; confirm against its definition.
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        ### An explicitly passed label always wins; otherwise keep any existing label
        ### or fall back to the target's name.
        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        ### Merge any passed properties into existing ones and ensure a dictionary exists.
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
 245
 246        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 247        _ = self.process
 248
 249
 250    def _run_exit(
 251        self,
 252        keep_daemon_output: bool = True,
 253        allow_dirty_run: bool = False,
 254    ) -> Any:
 255        """Run the daemon's target function.
 256        NOTE: This WILL EXIT the parent process!
 257
 258        Parameters
 259        ----------
 260        keep_daemon_output: bool, default True
 261            If `False`, delete the daemon's output directory upon exiting.
 262
 263        allow_dirty_run, bool, default False:
 264            If `True`, run the daemon, even if the `daemon_id` directory exists.
 265            This option is dangerous because if the same `daemon_id` runs twice,
 266            the last to finish will overwrite the output of the first.
 267
 268        Returns
 269        -------
 270        Nothing — this will exit the parent process.
 271        """
 272        import platform
 273        import sys
 274        import os
 275        import traceback
 276        from meerschaum.utils.warnings import warn
 277        from meerschaum.config import get_config
 278        daemon = attempt_import('daemon')
 279        lines = get_config('jobs', 'terminal', 'lines')
 280        columns = get_config('jobs', 'terminal', 'columns')
 281
 282        if platform.system() == 'Windows':
 283            return False, "Windows is no longer supported."
 284
 285        self._setup(allow_dirty_run)
 286
 287        _daemons.append(self)
 288
 289        logs_cf = self.properties.get('logs', {})
 290        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
 291        if log_refresh_seconds is None:
 292            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 293        write_timestamps = logs_cf.get('write_timestamps', None)
 294        if write_timestamps is None:
 295            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 296
 297        self._log_refresh_timer = RepeatTimer(
 298            log_refresh_seconds,
 299            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
 300        )
 301
 302        capture_stdin = logs_cf.get('stdin', True)
 303        cwd = self.properties.get('cwd', os.getcwd())
 304
 305        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 306        ###       KeyboardInterrupts themselves.
 307        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 308        self._daemon_context = daemon.DaemonContext(
 309            pidfile=self.pid_lock,
 310            stdout=self.rotating_log,
 311            stderr=self.rotating_log,
 312            stdin=(self.stdin_file if capture_stdin else None),
 313            working_directory=cwd,
 314            detach_process=True,
 315            files_preserve=list(self.rotating_log.subfile_objects.values()),
 316            signal_map={
 317                signal.SIGTERM: self._handle_sigterm,
 318            },
 319        )
 320
 321        if capture_stdin and sys.stdin is None:
 322            raise OSError("Cannot daemonize without stdin.")
 323
 324        try:
 325            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 326            with self._daemon_context:
 327                if capture_stdin:
 328                    sys.stdin = self.stdin_file
 329                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 330                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 331                os.environ['PYTHONUNBUFFERED'] = '1'
 332
 333                ### Allow the user to override environment variables.
 334                env = self.properties.get('env', {})
 335                if env and isinstance(env, dict):
 336                    os.environ.update({str(k): str(v) for k, v in env.items()})
 337
 338                self.rotating_log.refresh_files(start_interception=True)
 339                result = None
 340                try:
 341                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 342                        f.write(str(os.getpid()))
 343
 344                    ### NOTE: The timer fails to start for remote actions to localhost.
 345                    try:
 346                        if not self._log_refresh_timer.is_running():
 347                            self._log_refresh_timer.start()
 348                    except Exception:
 349                        pass
 350
 351                    self.properties['result'] = None
 352                    self._capture_process_timestamp('began')
 353                    result = self.target(*self.target_args, **self.target_kw)
 354                    self.properties['result'] = result
 355                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 356                    result = False, traceback.format_exc()
 357                except Exception as e:
 358                    warn(
 359                        f"Exception in daemon target function: {traceback.format_exc()}",
 360                    )
 361                    result = False, str(e)
 362                finally:
 363                    _results[self.daemon_id] = result
 364                    self.properties['result'] = result
 365
 366                    if keep_daemon_output:
 367                        self._capture_process_timestamp('ended')
 368                    else:
 369                        self.cleanup()
 370
 371                    self._log_refresh_timer.cancel()
 372                    try:
 373                        if self.pid is None and self.pid_path.exists():
 374                            self.pid_path.unlink()
 375                    except Exception:
 376                        pass
 377
 378                    if is_success_tuple(result):
 379                        try:
 380                            mrsm.pprint(result)
 381                        except BrokenPipeError:
 382                            pass
 383
 384        except Exception:
 385            daemon_error = traceback.format_exc()
 386            import meerschaum.config.paths as paths
 387            with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 388                f.write(
 389                    f"Error in Daemon '{self}':\n\n"
 390                    f"{sys.stdin=}\n"
 391                    f"{self.stdin_file_path=}\n"
 392                    f"{self.stdin_file_path.exists()=}\n\n"
 393                    f"{daemon_error}\n\n"
 394                )
 395            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 396
 397    def _capture_process_timestamp(
 398        self,
 399        process_key: str,
 400        write_properties: bool = True,
 401    ) -> None:
 402        """
 403        Record the current timestamp to the parameters `process:<process_key>`.
 404
 405        Parameters
 406        ----------
 407        process_key: str
 408            Under which key to store the timestamp.
 409
 410        write_properties: bool, default True
 411            If `True` persist the properties to disk immediately after capturing the timestamp.
 412        """
 413        if 'process' not in self.properties:
 414            self.properties['process'] = {}
 415
 416        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 417            raise ValueError(f"Invalid key '{process_key}'.")
 418
 419        self.properties['process'][process_key] = (
 420            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 421        )
 422        if write_properties:
 423            self.write_properties()
 424
 425    def run(
 426        self,
 427        keep_daemon_output: bool = True,
 428        allow_dirty_run: bool = False,
 429        wait: bool = False,
 430        timeout: Union[int, float] = 4,
 431        debug: bool = False,
 432    ) -> SuccessTuple:
 433        """Run the daemon as a child process and continue executing the parent.
 434
 435        Parameters
 436        ----------
 437        keep_daemon_output: bool, default True
 438            If `False`, delete the daemon's output directory upon exiting.
 439
 440        allow_dirty_run: bool, default False
 441            If `True`, run the daemon, even if the `daemon_id` directory exists.
 442            This option is dangerous because if the same `daemon_id` runs concurrently,
 443            the last to finish will overwrite the output of the first.
 444
 445        wait: bool, default True
 446            If `True`, block until `Daemon.status` is running (or the timeout expires).
 447
 448        timeout: Union[int, float], default 4
 449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 450
 451        Returns
 452        -------
 453        A SuccessTuple indicating success.
 454
 455        """
 456        import platform
 457        if platform.system() == 'Windows':
 458            return False, "Cannot run background jobs on Windows."
 459
 460        ### The daemon might exist and be paused.
 461        if self.status == 'paused':
 462            return self.resume()
 463
 464        self._remove_stop_file()
 465        if self.status == 'running':
 466            return True, f"Daemon '{self}' is already running."
 467
 468        self.mkdir_if_not_exists(allow_dirty_run)
 469        _write_pickle_success_tuple = self.write_pickle()
 470        if not _write_pickle_success_tuple[0]:
 471            return _write_pickle_success_tuple
 472
 473        _launch_daemon_code = (
 474            "from meerschaum.utils.daemon import Daemon, _daemons; "
 475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 476            f"_daemons['{self.daemon_id}'] = daemon; "
 477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 478            "allow_dirty_run=True)"
 479        )
 480        env = dict(os.environ)
 481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 482        msg = (
 483            "Success"
 484            if _launch_success_bool
 485            else f"Failed to start daemon '{self.daemon_id}'."
 486        )
 487        if not wait or not _launch_success_bool:
 488            return _launch_success_bool, msg
 489
 490        timeout = self.get_timeout_seconds(timeout)
 491        check_timeout_interval = self.get_check_timeout_interval_seconds()
 492
 493        if not timeout:
 494            success = self.status == 'running'
 495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 496            if success:
 497                self._capture_process_timestamp('began')
 498            return success, msg
 499
 500        begin = time.perf_counter()
 501        while (time.perf_counter() - begin) < timeout:
 502            if self.status == 'running':
 503                self._capture_process_timestamp('began')
 504                return True, "Success"
 505            time.sleep(check_timeout_interval)
 506
 507        return False, (
 508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 509            + ('s' if timeout != 1 else '') + '.'
 510        )
 511
 512
 513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 514        """
 515        Forcibly terminate a running daemon.
 516        Sends a SIGTERM signal to the process.
 517
 518        Parameters
 519        ----------
 520        timeout: Optional[int], default 3
 521            How many seconds to wait for the process to terminate.
 522
 523        Returns
 524        -------
 525        A SuccessTuple indicating success.
 526        """
 527        if self.status != 'paused':
 528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 529            if success:
 530                self._write_stop_file('kill')
 531                self.stdin_file.close()
 532                self._remove_blocking_stdin_file()
 533                return success, msg
 534
 535        if self.status == 'stopped':
 536            self._write_stop_file('kill')
 537            self.stdin_file.close()
 538            self._remove_blocking_stdin_file()
 539            return True, "Process has already stopped."
 540
 541        psutil = attempt_import('psutil')
 542        process = self.process
 543        try:
 544            process.terminate()
 545            process.kill()
 546            process.wait(timeout=timeout)
 547        except Exception as e:
 548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 549
 550        try:
 551            if process.status():
 552                return False, "Failed to stop daemon '{self}' ({process})."
 553        except psutil.NoSuchProcess:
 554            pass
 555
 556        if self.pid_path.exists():
 557            try:
 558                self.pid_path.unlink()
 559            except Exception:
 560                pass
 561
 562        self._write_stop_file('kill')
 563        self.stdin_file.close()
 564        self._remove_blocking_stdin_file()
 565        return True, "Success"
 566
 567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 568        """Gracefully quit a running daemon."""
 569        if self.status == 'paused':
 570            return self.kill(timeout)
 571
 572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 573        if signal_success:
 574            self._write_stop_file('quit')
 575            self.stdin_file.close()
 576            self._remove_blocking_stdin_file()
 577        return signal_success, signal_msg
 578
 579    def pause(
 580        self,
 581        timeout: Union[int, float, None] = None,
 582        check_timeout_interval: Union[float, int, None] = None,
 583    ) -> SuccessTuple:
 584        """
 585        Pause the daemon if it is running.
 586
 587        Parameters
 588        ----------
 589        timeout: Union[float, int, None], default None
 590            The maximum number of seconds to wait for a process to suspend.
 591
 592        check_timeout_interval: Union[float, int, None], default None
 593            The number of seconds to wait between checking if the process is still running.
 594
 595        Returns
 596        -------
 597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 598        """
 599        self._remove_blocking_stdin_file()
 600
 601        if self.process is None:
 602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 603
 604        if self.status == 'paused':
 605            return True, f"Daemon '{self.daemon_id}' is already paused."
 606
 607        self._write_stop_file('pause')
 608        self.stdin_file.close()
 609        self._remove_blocking_stdin_file()
 610        try:
 611            self.process.suspend()
 612        except Exception as e:
 613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 614
 615        timeout = self.get_timeout_seconds(timeout)
 616        check_timeout_interval = self.get_check_timeout_interval_seconds(
 617            check_timeout_interval
 618        )
 619
 620        psutil = attempt_import('psutil')
 621
 622        if not timeout:
 623            try:
 624                success = self.process.status() == 'stopped'
 625            except psutil.NoSuchProcess:
 626                success = True
 627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 628            if success:
 629                self._capture_process_timestamp('paused')
 630            return success, msg
 631
 632        begin = time.perf_counter()
 633        while (time.perf_counter() - begin) < timeout:
 634            try:
 635                if self.process.status() == 'stopped':
 636                    self._capture_process_timestamp('paused')
 637                    return True, "Success"
 638            except psutil.NoSuchProcess as e:
 639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 640            time.sleep(check_timeout_interval)
 641
 642        return False, (
 643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 644            + ('s' if timeout != 1 else '') + '.'
 645        )
 646
 647    def resume(
 648        self,
 649        timeout: Union[int, float, None] = None,
 650        check_timeout_interval: Union[float, int, None] = None,
 651    ) -> SuccessTuple:
 652        """
 653        Resume the daemon if it is paused.
 654
 655        Parameters
 656        ----------
 657        timeout: Union[float, int, None], default None
 658            The maximum number of seconds to wait for a process to resume.
 659
 660        check_timeout_interval: Union[float, int, None], default None
 661            The number of seconds to wait between checking if the process is still stopped.
 662
 663        Returns
 664        -------
 665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 666        """
 667        if self.status == 'running':
 668            return True, f"Daemon '{self.daemon_id}' is already running."
 669
 670        if self.status == 'stopped':
 671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 672
 673        self._remove_stop_file()
 674        try:
 675            if self.process is None:
 676                return False, f"Cannot resume daemon '{self.daemon_id}'."
 677
 678            self.process.resume()
 679        except Exception as e:
 680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 681
 682        timeout = self.get_timeout_seconds(timeout)
 683        check_timeout_interval = self.get_check_timeout_interval_seconds(
 684            check_timeout_interval
 685        )
 686
 687        if not timeout:
 688            success = self.status == 'running'
 689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 690            if success:
 691                self._capture_process_timestamp('began')
 692            return success, msg
 693
 694        begin = time.perf_counter()
 695        while (time.perf_counter() - begin) < timeout:
 696            if self.status == 'running':
 697                self._capture_process_timestamp('began')
 698                return True, "Success"
 699            time.sleep(check_timeout_interval)
 700
 701        return False, (
 702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 703            + ('s' if timeout != 1 else '') + '.'
 704        )
 705
 706    def _write_stop_file(self, action: str) -> SuccessTuple:
 707        """Write the stop file timestamp and action."""
 708        if action not in ('quit', 'kill', 'pause'):
 709            return False, f"Unsupported action '{action}'."
 710
 711        if not self.stop_path.parent.exists():
 712            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 713
 714        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 715            json.dump(
 716                {
 717                    'stop_time': datetime.now(timezone.utc).isoformat(),
 718                    'action': action,
 719                },
 720                f
 721            )
 722
 723        return True, "Success"
 724
 725    def _remove_stop_file(self) -> SuccessTuple:
 726        """Remove the stop file"""
 727        if not self.stop_path.exists():
 728            return True, "Stop file does not exist."
 729
 730        try:
 731            self.stop_path.unlink()
 732        except Exception as e:
 733            return False, f"Failed to remove stop file:\n{e}"
 734
 735        return True, "Success"
 736
 737    def _read_stop_file(self) -> Dict[str, Any]:
 738        """
 739        Read the stop file if it exists.
 740        """
 741        if not self.stop_path.exists():
 742            return {}
 743
 744        try:
 745            with open(self.stop_path, 'r', encoding='utf-8') as f:
 746                data = json.load(f)
 747            return data
 748        except Exception:
 749            return {}
 750
 751    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 752        """
 753        Remove the blocking STDIN file if it exists.
 754        """
 755        try:
 756            if self.blocking_stdin_file_path.exists():
 757                self.blocking_stdin_file_path.unlink()
 758        except Exception as e:
 759            return False, str(e)
 760
 761        return True, "Success"
 762
 763    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 764        """
 765        Handle `SIGTERM` within the `Daemon` context.
 766        This method is injected into the `DaemonContext`.
 767        """
 768        from meerschaum.utils.process import signal_handler
 769        from meerschaum.utils.threading import request_stop, interrupt_threads
 770        signal_handler(signal_number, stack_frame)
 771
 772        ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind
 773        ### any worker threads so they cannot keep the process alive as a zombie.
 774        request_stop()
 775        interrupt_threads(SystemExit)
 776
 777        timer = self.__dict__.get('_log_refresh_timer', None)
 778        if timer is not None:
 779            timer.cancel()
 780
 781        daemon_context = self.__dict__.get('_daemon_context', None)
 782        if daemon_context is not None:
 783            daemon_context.close()
 784
 785        _close_pools()
 786        raise SystemExit(0)
 787
 788    def _send_signal(
 789        self,
 790        signal_to_send,
 791        timeout: Union[float, int, None] = None,
 792        check_timeout_interval: Union[float, int, None] = None,
 793    ) -> SuccessTuple:
 794        """Send a signal to the daemon process.
 795
 796        Parameters
 797        ----------
 798        signal_to_send:
 799            The signal the send to the daemon, e.g. `signals.SIGINT`.
 800
 801        timeout: Union[float, int, None], default None
 802            The maximum number of seconds to wait for a process to terminate.
 803
 804        check_timeout_interval: Union[float, int, None], default None
 805            The number of seconds to wait between checking if the process is still running.
 806
 807        Returns
 808        -------
 809        A SuccessTuple indicating success.
 810        """
 811        try:
 812            pid = self.pid
 813            if pid is None:
 814                return (
 815                    False,
 816                    f"Daemon '{self.daemon_id}' is not running, "
 817                    + f"cannot send signal '{signal_to_send}'."
 818                )
 819            
 820            os.kill(pid, signal_to_send)
 821        except Exception:
 822            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 823
 824        timeout = self.get_timeout_seconds(timeout)
 825        check_timeout_interval = self.get_check_timeout_interval_seconds(
 826            check_timeout_interval
 827        )
 828
 829        if not timeout:
 830            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."
 831
 832        begin = time.perf_counter()
 833        while (time.perf_counter() - begin) < timeout:
 834            if not self.status == 'running':
 835                return True, "Success"
 836            time.sleep(check_timeout_interval)
 837
 838        return False, (
 839            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 840            + ('s' if timeout != 1 else '') + '.'
 841        )
 842
 843    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 844        """Create the Daemon's directory.
 845        If `allow_dirty_run` is `False` and the directory already exists,
 846        raise a `FileExistsError`.
 847        """
 848        try:
 849            self.path.mkdir(parents=True, exist_ok=True)
 850            _already_exists = any(os.scandir(self.path))
 851        except FileExistsError:
 852            _already_exists = True
 853
 854        if _already_exists and not allow_dirty_run:
 855            error(
 856                f"Daemon '{self.daemon_id}' already exists. " +
 857                "To allow this daemon to run, do one of the following:\n"
 858                + "  - Execute `daemon.cleanup()`.\n"
 859                + f"  - Delete the directory '{self.path}'.\n"
 860                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 861                FileExistsError,
 862            )
 863
 864    @property
 865    def process(self) -> Union['psutil.Process', None]:
 866        """
 867        Return the psutil process for the Daemon.
 868        """
 869        psutil = attempt_import('psutil')
 870        pid = self.pid
 871        if pid is None:
 872            return None
 873        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 874            try:
 875                self._process = psutil.Process(int(pid))
 876                process_exists = True
 877            except Exception:
 878                process_exists = False
 879            if not process_exists:
 880                _ = self.__dict__.pop('_process', None)
 881                try:
 882                    if self.pid_path.exists():
 883                        self.pid_path.unlink()
 884                except Exception:
 885                    pass
 886                return None
 887        return self._process
 888
 889    @property
 890    def status(self) -> str:
 891        """
 892        Return the running status of this Daemon.
 893        """
 894        if self.process is None:
 895            return 'stopped'
 896
 897        psutil = attempt_import('psutil', lazy=False)
 898        try:
 899            if self.process.status() == 'stopped':
 900                return 'paused'
 901            if self.process.status() == 'zombie':
 902                raise psutil.NoSuchProcess(self.process.pid)
 903        except (psutil.NoSuchProcess, AttributeError):
 904            if self.pid_path.exists():
 905                try:
 906                    self.pid_path.unlink()
 907                except Exception:
 908                    pass
 909            return 'stopped'
 910
 911        return 'running'
 912
 913    @classmethod
 914    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 915        """
 916        Return a Daemon's path from its `daemon_id`.
 917        """
 918        import meerschaum.config.paths as paths
 919        return paths.DAEMON_RESOURCES_PATH / daemon_id
 920
 921    @property
 922    def path(self) -> pathlib.Path:
 923        """
 924        Return the path for this Daemon's directory.
 925        """
 926        return self._get_path_from_daemon_id(self.daemon_id)
 927
 928    @classmethod
 929    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 930        """
 931        Return the `properties.json` path for a given `daemon_id`.
 932        """
 933        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 934
 935    @property
 936    def properties_path(self) -> pathlib.Path:
 937        """
 938        Return the `propterties.json` path for this Daemon.
 939        """
 940        return self._get_properties_path_from_daemon_id(self.daemon_id)
 941
 942    @property
 943    def stop_path(self) -> pathlib.Path:
 944        """
 945        Return the path for the stop file (created when manually stopped).
 946        """
 947        return self.path / '.stop.json'
 948
 949    @property
 950    def log_path(self) -> pathlib.Path:
 951        """
 952        Return the log path.
 953        """
 954        logs_cf = self.properties.get('logs', None) or {}
 955        if 'path' not in logs_cf:
 956            import meerschaum.config.paths as paths
 957            return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 958
 959        return pathlib.Path(logs_cf['path'])
 960
 961    @property
 962    def stdin_file_path(self) -> pathlib.Path:
 963        """
 964        Return the stdin file path.
 965        """
 966        return self.path / 'input.stdin'
 967
 968    @property
 969    def blocking_stdin_file_path(self) -> pathlib.Path:
 970        """
 971        Return the stdin file path.
 972        """
 973        if '_blocking_stdin_file_path' in self.__dict__:
 974            return self._blocking_stdin_file_path
 975
 976        return self.path / 'input.stdin.block'
 977
 978    @property
 979    def prompt_kwargs_file_path(self) -> pathlib.Path:
 980        """
 981        Return the file path to the kwargs for the invoking `prompt()`.
 982        """
 983        return self.path / 'prompt_kwargs.json'
 984
 985    @property
 986    def log_offset_path(self) -> pathlib.Path:
 987        """
 988        Return the log offset file path.
 989        """
 990        import meerschaum.config.paths as paths
 991        return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 992
 993    @property
 994    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 995        """
 996        Return the process lock context manager.
 997        """
 998        if '_log_offset_lock' in self.__dict__:
 999            return self._log_offset_lock
1000
1001        fasteners = attempt_import('fasteners')
1002        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
1003        return self._log_offset_lock
1004
1005    @property
1006    def rotating_log(self) -> RotatingFile:
1007        """
1008        The rotating log file for the daemon's output.
1009        """
1010        if '_rotating_log' in self.__dict__:
1011            return self._rotating_log
1012
1013        logs_cf = self.properties.get('logs', None) or {}
1014        write_timestamps = logs_cf.get('write_timestamps', None)
1015        if write_timestamps is None:
1016            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1017
1018        timestamp_format = logs_cf.get('timestamp_format', None)
1019        if timestamp_format is None:
1020            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1021
1022        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1023        if num_files_to_keep is None:
1024            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1025
1026        max_file_size = logs_cf.get('max_file_size', None)
1027        if max_file_size is None:
1028            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1029
1030        redirect_streams = logs_cf.get('redirect_streams', True)
1031
1032        self._rotating_log = RotatingFile(
1033            self.log_path,
1034            redirect_streams=redirect_streams,
1035            write_timestamps=write_timestamps,
1036            timestamp_format=timestamp_format,
1037            num_files_to_keep=num_files_to_keep,
1038            max_file_size=max_file_size,
1039        )
1040        return self._rotating_log
1041
1042    @property
1043    def stdin_file(self):
1044        """
1045        Return the file handler for the stdin file.
1046        """
1047        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1048            return _stdin_file
1049
1050        self._stdin_file = StdinFile(
1051            self.stdin_file_path,
1052            lock_file_path=self.blocking_stdin_file_path,
1053        )
1054        return self._stdin_file
1055
1056    @property
1057    def log_text(self) -> Union[str, None]:
1058        """
1059        Read the log files and return their contents.
1060        Returns `None` if the log file does not exist.
1061        """
1062        logs_cf = self.properties.get('logs', None) or {}
1063        write_timestamps = logs_cf.get('write_timestamps', None)
1064        if write_timestamps is None:
1065            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1066
1067        timestamp_format = logs_cf.get('timestamp_format', None)
1068        if timestamp_format is None:
1069            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1070
1071        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1072        if num_files_to_keep is None:
1073            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1074
1075        max_file_size = logs_cf.get('max_file_size', None)
1076        if max_file_size is None:
1077            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1078
1079        new_rotating_log = RotatingFile(
1080            self.rotating_log.file_path,
1081            num_files_to_keep=num_files_to_keep,
1082            max_file_size=max_file_size,
1083            write_timestamps=write_timestamps,
1084            timestamp_format=timestamp_format,
1085        )
1086        return new_rotating_log.read()
1087
1088    def readlines(self) -> List[str]:
1089        """
1090        Read the next log lines, persisting the cursor for later use.
1091        Note this will alter the cursor of `self.rotating_log`.
1092        """
1093        self.rotating_log._cursor = self._read_log_offset()
1094        lines = self.rotating_log.readlines()
1095        self._write_log_offset()
1096        return lines
1097
1098    def _read_log_offset(self) -> Tuple[int, int]:
1099        """
1100        Return the current log offset cursor.
1101
1102        Returns
1103        -------
1104        A tuple of the form (`subfile_index`, `position`).
1105        """
1106        if not self.log_offset_path.exists():
1107            return 0, 0
1108
1109        try:
1110            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1111                cursor_text = f.read()
1112            cursor_parts = cursor_text.split(' ')
1113            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1114            return subfile_index, subfile_position
1115        except Exception as e:
1116            warn(f"Failed to read cursor:\n{e}")
1117        return 0, 0
1118
1119    def _write_log_offset(self) -> None:
1120        """
1121        Write the current log offset file.
1122        """
1123        with self.log_offset_lock:
1124            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1125                subfile_index = self.rotating_log._cursor[0]
1126                subfile_position = self.rotating_log._cursor[1]
1127                f.write(f"{subfile_index} {subfile_position}")
1128
1129    @property
1130    def pid(self) -> Union[int, None]:
1131        """
1132        Read the PID file and return its contents.
1133        Returns `None` if the PID file does not exist.
1134        """
1135        if not self.pid_path.exists():
1136            return None
1137        try:
1138            with open(self.pid_path, 'r', encoding='utf-8') as f:
1139                text = f.read()
1140            if len(text) == 0:
1141                return None
1142            pid = int(text.rstrip())
1143        except Exception as e:
1144            warn(e)
1145            text = None
1146            pid = None
1147        return pid
1148
1149    @property
1150    def pid_path(self) -> pathlib.Path:
1151        """
1152        Return the path to a file containing the PID for this Daemon.
1153        """
1154        return self.path / 'process.pid'
1155
1156    @property
1157    def pid_lock(self) -> 'fasteners.InterProcessLock':
1158        """
1159        Return the process lock context manager.
1160        """
1161        if '_pid_lock' in self.__dict__:
1162            return self._pid_lock
1163
1164        fasteners = attempt_import('fasteners')
1165        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1166        return self._pid_lock
1167
1168    @property
1169    def pickle_path(self) -> pathlib.Path:
1170        """
1171        Return the path for the pickle file.
1172        """
1173        return self.path / 'pickle.pkl'
1174
1175    def read_properties(self) -> Optional[Dict[str, Any]]:
1176        """Read the properties JSON file and return the dictionary."""
1177        if not self.properties_path.exists():
1178            return None
1179        try:
1180            with open(self.properties_path, 'r', encoding='utf-8') as file:
1181                properties = json.load(file)
1182        except Exception:
1183            properties = {}
1184        
1185        return properties or {}
1186
1187    def read_pickle(self) -> Daemon:
1188        """Read a Daemon's pickle file and return the `Daemon`."""
1189        import pickle
1190        import traceback
1191        if not self.pickle_path.exists():
1192            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1193
1194        if self.pickle_path.stat().st_size == 0:
1195            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1196
1197        try:
1198            with open(self.pickle_path, 'rb') as pickle_file:
1199                daemon = pickle.load(pickle_file)
1200            success, msg = True, 'Success'
1201        except Exception as e:
1202            success, msg = False, str(e)
1203            daemon = None
1204            traceback.print_exception(type(e), e, e.__traceback__)
1205        if not success:
1206            error(msg)
1207        return daemon
1208
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return this Daemon's properties, combined with the properties JSON file.

        The file is re-read (via `read_properties()`) on every access, and the
        result of `apply_patch_to_config(file_properties, self._properties)`
        is stored back on `self._properties` before being returned.
        """
        ### A failed read is treated as an empty file (the traceback is printed).
        try:
            _file_properties = self.read_properties() or {}
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        ### Fall back to the file's contents when nothing is held in memory.
        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        ### Remove an in-memory `result` of `None` when the file has a real result.
        ### NOTE(review): presumably so the patch below does not overwrite
        ### the file's value with `None` (confirm against `apply_patch_to_config`).
        if (
            self._properties.get('result', None) is None
            and _file_properties.get('result', None) is not None
        ):
            _ = self._properties.pop('result', None)

        ### The file's properties are the base and the in-memory properties are the patch.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties
1239
1240    @property
1241    def hidden(self) -> bool:
1242        """
1243        Return a bool indicating whether this Daemon should be displayed.
1244        """
1245        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1246
1247    def write_properties(self) -> SuccessTuple:
1248        """Write the properties dictionary to the properties JSON file
1249        (only if self.properties exists).
1250        """
1251        from meerschaum.utils.misc import generate_password
1252        success, msg = (
1253            False,
1254            f"No properties to write for daemon '{self.daemon_id}'."
1255        )
1256        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1257        props = self.properties
1258        if props is not None:
1259            try:
1260                self.path.mkdir(parents=True, exist_ok=True)
1261                if self.properties_path.exists():
1262                    self.properties_path.rename(backup_path)
1263                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1264                    json.dump(props, properties_file)
1265                success, msg = True, 'Success'
1266            except Exception as e:
1267                success, msg = False, str(e)
1268
1269        try:
1270            if backup_path.exists():
1271                if not success:
1272                    backup_path.rename(self.properties_path)
1273                else:
1274                    backup_path.unlink()
1275        except Exception as e:
1276            success, msg = False, str(e)
1277
1278        return success, msg
1279
1280    def write_pickle(self) -> SuccessTuple:
1281        """Write the pickle file for the daemon."""
1282        import pickle
1283        import traceback
1284        from meerschaum.utils.misc import generate_password
1285
1286        if not self.pickle:
1287            return True, "Success"
1288
1289        from meerschaum._internal.entry import _shells
1290        if _shells:
1291            from meerschaum._internal.shell.Shell import revert_input
1292            revert_input()
1293
1294        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1295        try:
1296            self.path.mkdir(parents=True, exist_ok=True)
1297            if self.pickle_path.exists():
1298                self.pickle_path.rename(backup_path)
1299            with open(self.pickle_path, 'wb+') as pickle_file:
1300                pickle.dump(self, pickle_file)
1301            success, msg = True, "Success"
1302        except Exception as e:
1303            success, msg = False, str(e)
1304            traceback.print_exception(type(e), e, e.__traceback__)
1305        try:
1306            if backup_path.exists():
1307                if not success:
1308                    backup_path.rename(self.pickle_path)
1309                else:
1310                    backup_path.unlink()
1311        except Exception as e:
1312            success, msg = False, str(e)
1313        return success, msg
1314
1315
    def _setup(
        self,
        allow_dirty_run: bool = False,
    ) -> None:
        """
        Update properties before starting the Daemon.

        Records the target's name, module, and arguments under the `target` key,
        ensures the daemon's directory exists, then writes the properties
        and pickle files, calling `error()` if either write fails.

        Parameters
        ----------
        allow_dirty_run: bool, default False
            Passed through to `mkdir_if_not_exists()`.
        """
        if self.properties is None:
            self._properties = {}

        ### NOTE(review): `self.target_args` / `self.target_kw` may read `self.properties`,
        ### whose getter rebinds `self._properties`. Confirm this update lands on the
        ### dictionary which is later written.
        self._properties.update({
            'target': {
                'name': self.target.__name__,
                'module': self.target.__module__,
                'args': self.target_args,
                'kw': self.target_kw,
            },
        })
        self.mkdir_if_not_exists(allow_dirty_run)
        _write_properties_success_tuple = self.write_properties()
        if not _write_properties_success_tuple[0]:
            error(_write_properties_success_tuple[1])

        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            error(_write_pickle_success_tuple[1])
1342
1343    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1344        """
1345        Remove a daemon's directory after execution.
1346
1347        Parameters
1348        ----------
1349        keep_logs: bool, default False
1350            If `True`, skip deleting the daemon's log files.
1351
1352        Returns
1353        -------
1354        A `SuccessTuple` indicating success.
1355        """
1356        if self.path.exists():
1357            try:
1358                shutil.rmtree(self.path)
1359            except Exception as e:
1360                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1361                warn(msg)
1362                return False, msg
1363        if not keep_logs:
1364            self.rotating_log.delete()
1365            try:
1366                if self.log_offset_path.exists():
1367                    self.log_offset_path.unlink()
1368            except Exception as e:
1369                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1370                warn(msg)
1371                return False, msg
1372        return True, "Success"
1373
1374
1375    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1376        """
1377        Return the timeout value to use. Use `--timeout-seconds` if provided,
1378        else the configured default (8).
1379        """
1380        if isinstance(timeout, (int, float)):
1381            return timeout
1382        return get_config('jobs', 'timeout_seconds')
1383
1384
1385    def get_check_timeout_interval_seconds(
1386        self,
1387        check_timeout_interval: Union[int, float, None] = None,
1388    ) -> Union[int, float]:
1389        """
1390        Return the interval value to check the status of timeouts.
1391        """
1392        if isinstance(check_timeout_interval, (int, float)):
1393            return check_timeout_interval
1394        return get_config('jobs', 'check_timeout_interval_seconds')
1395
1396    @property
1397    def target_args(self) -> Union[Tuple[Any], None]:
1398        """
1399        Return the positional arguments to pass to the target function.
1400        """
1401        target_args = (
1402            self.__dict__.get('_target_args', None)
1403            or self.properties.get('target', {}).get('args', None)
1404        )
1405        if target_args is None:
1406            return tuple([])
1407
1408        return tuple(target_args)
1409
1410    @property
1411    def target_kw(self) -> Union[Dict[str, Any], None]:
1412        """
1413        Return the keyword arguments to pass to the target function.
1414        """
1415        target_kw = (
1416            self.__dict__.get('_target_kw', None)
1417            or self.properties.get('target', {}).get('kw', None)
1418        )
1419        if target_kw is None:
1420            return {}
1421
1422        return {key: val for key, val in target_kw.items()}
1423
1424    @staticmethod
1425    def _get_target_reference(target) -> Union[Dict[str, str], None]:
1426        """
1427        If `target` is an importable, top-level function (e.g. `entry`), return a
1428        ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the
1429        daemon process instead of pickled by value.
1430
1431        Pickling such a target by value serializes its entire global graph, which can
1432        reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and
1433        raise `TypeError: cannot pickle '_asyncio.Task' object`. dill also falls back to
1434        by-value pickling whenever it cannot match the function by identity — which
1435        happens when two copies of a package are importable (a stale install shadowing
1436        a venv), so `byref=True` alone is not enough. Returns `None` for closures,
1437        lambdas, and anything not importable, which fall back to dill.
1438        """
1439        module = getattr(target, '__module__', None)
1440        qualname = getattr(target, '__qualname__', None)
1441        if not module or not qualname:
1442            return None
1443        if '<locals>' in qualname or '<lambda>' in qualname:
1444            return None
1445        return {'module': module, 'qualname': qualname}
1446
1447    @staticmethod
1448    def _load_target_reference(target_ref: Dict[str, str]):
1449        """
1450        Re-import a target from a `{'module': ..., 'qualname': ...}` reference.
1451        """
1452        import importlib
1453        obj = importlib.import_module(target_ref['module'])
1454        for part in target_ref['qualname'].split('.'):
1455            obj = getattr(obj, part)
1456        return obj
1457
1458    def __getstate__(self):
1459        """
1460        Pickle this Daemon.
1461        """
1462        dill = attempt_import('dill')
1463        state = {
1464            'target_args': self.target_args,
1465            'target_kw': self.target_kw,
1466            'daemon_id': self.daemon_id,
1467            'label': self.label,
1468            'properties': self.properties,
1469        }
1470        target_ref = self._get_target_reference(self.target)
1471        if target_ref is not None:
1472            ### Store by reference (re-imported in the daemon process), not by value.
1473            state['target'] = None
1474            state['target_ref'] = target_ref
1475        else:
1476            state['target'] = dill.dumps(self.target, byref=True)
1477        return state
1478
1479    def __setstate__(self, _state: Dict[str, Any]):
1480        """
1481        Restore this Daemon from a pickled state.
1482        If the properties file exists, skip the old pickled version.
1483        """
1484        dill = attempt_import('dill')
1485        target_ref = _state.pop('target_ref', None)
1486        if target_ref is not None and _state.get('target', None) is None:
1487            _state['target'] = self._load_target_reference(target_ref)
1488        else:
1489            _state['target'] = dill.loads(_state['target'])
1490        self._pickle = True
1491        daemon_id = _state.get('daemon_id', None)
1492        if not daemon_id:
1493            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1494
1495        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1496        ignore_properties = properties_path.exists()
1497        if ignore_properties:
1498            _state = {
1499                key: val
1500                for key, val in _state.items()
1501                if key != 'properties'
1502            }
1503        self.__init__(**_state)
1504
1505
    def __repr__(self):
        """Return the same text as `str(self)`."""
        return str(self)
1508
    def __str__(self):
        """Return this Daemon's `daemon_id`."""
        return self.daemon_id
1511
1512    def __eq__(self, other):
1513        if not isinstance(other, Daemon):
1514            return False
1515        return self.daemon_id == other.daemon_id
1516
1517    def __hash__(self):
1518        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, pickle: bool = True)
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        pickle: bool, default True
            Stored as `self.pickle`. If `False`, `write_pickle()` returns success
            without writing the pickle file.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle file and no target: try to recover from the properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Adopt the recovered daemon's attributes and persist its pickle.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: delete the unusable properties file (best-effort).
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        ### An explicitly provided label always overrides an existing one.
        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        ### Provided properties are merged into any properties already set on the instance.
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identify a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 89    @classmethod
 90    def from_properties_file(cls, daemon_id: str) -> Daemon:
 91        """
 92        Return a Daemon from a properties dictionary.
 93        """
 94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 95        if not properties_path.exists():
 96            raise OSError(f"Properties file '{properties_path}' does not exist.")
 97
 98        try:
 99            with open(properties_path, 'r', encoding='utf-8') as f:
100                properties = json.load(f)
101        except Exception:
102            properties = {}
103
104        if not properties:
105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
106
107        daemon_id = properties_path.parent.name
108        target_cf = properties.get('target', {})
109        target_module_name = target_cf.get('module', None)
110        target_function_name = target_cf.get('name', None)
111        target_args = target_cf.get('args', None)
112        target_kw = target_cf.get('kw', None)
113        label = properties.get('label', None)
114
115        if None in [
116            target_module_name,
117            target_function_name,
118            target_args,
119            target_kw,
120        ]:
121            raise ValueError("Missing target function information.")
122
123        target_module = importlib.import_module(target_module_name)
124        target_function = getattr(target_module, target_function_name)
125
126        return Daemon(
127            daemon_id=daemon_id,
128            target=target_function,
129            target_args=target_args,
130            target_kw=target_kw,
131            properties=properties,
132            label=label,
133        )

Return a Daemon built from the properties JSON file of an existing daemon_id.

pickle
def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, wait: bool = False, timeout: Union[int, float] = 4, debug: bool = False) -> Tuple[bool, str]:
425    def run(
426        self,
427        keep_daemon_output: bool = True,
428        allow_dirty_run: bool = False,
429        wait: bool = False,
430        timeout: Union[int, float] = 4,
431        debug: bool = False,
432    ) -> SuccessTuple:
433        """Run the daemon as a child process and continue executing the parent.
434
435        Parameters
436        ----------
437        keep_daemon_output: bool, default True
438            If `False`, delete the daemon's output directory upon exiting.
439
440        allow_dirty_run: bool, default False
441            If `True`, run the daemon, even if the `daemon_id` directory exists.
442            This option is dangerous because if the same `daemon_id` runs concurrently,
443            the last to finish will overwrite the output of the first.
444
445        wait: bool, default True
446            If `True`, block until `Daemon.status` is running (or the timeout expires).
447
448        timeout: Union[int, float], default 4
449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
450
451        Returns
452        -------
453        A SuccessTuple indicating success.
454
455        """
456        import platform
457        if platform.system() == 'Windows':
458            return False, "Cannot run background jobs on Windows."
459
460        ### The daemon might exist and be paused.
461        if self.status == 'paused':
462            return self.resume()
463
464        self._remove_stop_file()
465        if self.status == 'running':
466            return True, f"Daemon '{self}' is already running."
467
468        self.mkdir_if_not_exists(allow_dirty_run)
469        _write_pickle_success_tuple = self.write_pickle()
470        if not _write_pickle_success_tuple[0]:
471            return _write_pickle_success_tuple
472
473        _launch_daemon_code = (
474            "from meerschaum.utils.daemon import Daemon, _daemons; "
475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
476            f"_daemons['{self.daemon_id}'] = daemon; "
477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
478            "allow_dirty_run=True)"
479        )
480        env = dict(os.environ)
481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
482        msg = (
483            "Success"
484            if _launch_success_bool
485            else f"Failed to start daemon '{self.daemon_id}'."
486        )
487        if not wait or not _launch_success_bool:
488            return _launch_success_bool, msg
489
490        timeout = self.get_timeout_seconds(timeout)
491        check_timeout_interval = self.get_check_timeout_interval_seconds()
492
493        if not timeout:
494            success = self.status == 'running'
495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
496            if success:
497                self._capture_process_timestamp('began')
498            return success, msg
499
500        begin = time.perf_counter()
501        while (time.perf_counter() - begin) < timeout:
502            if self.status == 'running':
503                self._capture_process_timestamp('began')
504                return True, "Success"
505            time.sleep(check_timeout_interval)
506
507        return False, (
508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
509            + ('s' if timeout != 1 else '') + '.'
510        )

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
  • wait (bool, default False): If True, block until Daemon.status is running (or the timeout expires).
  • timeout (Union[int, float], default 4): If wait is True, block for up to timeout seconds before returning a failure.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
514        """
515        Forcibly terminate a running daemon.
516        Sends a SIGTERM signal to the process.
517
518        Parameters
519        ----------
520        timeout: Optional[int], default 3
521            How many seconds to wait for the process to terminate.
522
523        Returns
524        -------
525        A SuccessTuple indicating success.
526        """
527        if self.status != 'paused':
528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
529            if success:
530                self._write_stop_file('kill')
531                self.stdin_file.close()
532                self._remove_blocking_stdin_file()
533                return success, msg
534
535        if self.status == 'stopped':
536            self._write_stop_file('kill')
537            self.stdin_file.close()
538            self._remove_blocking_stdin_file()
539            return True, "Process has already stopped."
540
541        psutil = attempt_import('psutil')
542        process = self.process
543        try:
544            process.terminate()
545            process.kill()
546            process.wait(timeout=timeout)
547        except Exception as e:
548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
549
550        try:
551            if process.status():
552                return False, "Failed to stop daemon '{self}' ({process})."
553        except psutil.NoSuchProcess:
554            pass
555
556        if self.pid_path.exists():
557            try:
558                self.pid_path.unlink()
559            except Exception:
560                pass
561
562        self._write_stop_file('kill')
563        self.stdin_file.close()
564        self._remove_blocking_stdin_file()
565        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
568        """Gracefully quit a running daemon."""
569        if self.status == 'paused':
570            return self.kill(timeout)
571
572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
573        if signal_success:
574            self._write_stop_file('quit')
575            self.stdin_file.close()
576            self._remove_blocking_stdin_file()
577        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
579    def pause(
580        self,
581        timeout: Union[int, float, None] = None,
582        check_timeout_interval: Union[float, int, None] = None,
583    ) -> SuccessTuple:
584        """
585        Pause the daemon if it is running.
586
587        Parameters
588        ----------
589        timeout: Union[float, int, None], default None
590            The maximum number of seconds to wait for a process to suspend.
591
592        check_timeout_interval: Union[float, int, None], default None
593            The number of seconds to wait between checking if the process is still running.
594
595        Returns
596        -------
597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
598        """
599        self._remove_blocking_stdin_file()
600
601        if self.process is None:
602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
603
604        if self.status == 'paused':
605            return True, f"Daemon '{self.daemon_id}' is already paused."
606
607        self._write_stop_file('pause')
608        self.stdin_file.close()
609        self._remove_blocking_stdin_file()
610        try:
611            self.process.suspend()
612        except Exception as e:
613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
614
615        timeout = self.get_timeout_seconds(timeout)
616        check_timeout_interval = self.get_check_timeout_interval_seconds(
617            check_timeout_interval
618        )
619
620        psutil = attempt_import('psutil')
621
622        if not timeout:
623            try:
624                success = self.process.status() == 'stopped'
625            except psutil.NoSuchProcess:
626                success = True
627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
628            if success:
629                self._capture_process_timestamp('paused')
630            return success, msg
631
632        begin = time.perf_counter()
633        while (time.perf_counter() - begin) < timeout:
634            try:
635                if self.process.status() == 'stopped':
636                    self._capture_process_timestamp('paused')
637                    return True, "Success"
638            except psutil.NoSuchProcess as e:
639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
640            time.sleep(check_timeout_interval)
641
642        return False, (
643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
644            + ('s' if timeout != 1 else '') + '.'
645        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
647    def resume(
648        self,
649        timeout: Union[int, float, None] = None,
650        check_timeout_interval: Union[float, int, None] = None,
651    ) -> SuccessTuple:
652        """
653        Resume the daemon if it is paused.
654
655        Parameters
656        ----------
657        timeout: Union[float, int, None], default None
658            The maximum number of seconds to wait for a process to resume.
659
660        check_timeout_interval: Union[float, int, None], default None
661            The number of seconds to wait between checking if the process is still stopped.
662
663        Returns
664        -------
665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
666        """
667        if self.status == 'running':
668            return True, f"Daemon '{self.daemon_id}' is already running."
669
670        if self.status == 'stopped':
671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
672
673        self._remove_stop_file()
674        try:
675            if self.process is None:
676                return False, f"Cannot resume daemon '{self.daemon_id}'."
677
678            self.process.resume()
679        except Exception as e:
680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
681
682        timeout = self.get_timeout_seconds(timeout)
683        check_timeout_interval = self.get_check_timeout_interval_seconds(
684            check_timeout_interval
685        )
686
687        if not timeout:
688            success = self.status == 'running'
689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
690            if success:
691                self._capture_process_timestamp('began')
692            return success, msg
693
694        begin = time.perf_counter()
695        while (time.perf_counter() - begin) < timeout:
696            if self.status == 'running':
697                self._capture_process_timestamp('began')
698                return True, "Success"
699            time.sleep(check_timeout_interval)
700
701        return False, (
702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
703            + ('s' if timeout != 1 else '') + '.'
704        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
843    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
844        """Create the Daemon's directory.
845        If `allow_dirty_run` is `False` and the directory already exists,
846        raise a `FileExistsError`.
847        """
848        try:
849            self.path.mkdir(parents=True, exist_ok=True)
850            _already_exists = any(os.scandir(self.path))
851        except FileExistsError:
852            _already_exists = True
853
854        if _already_exists and not allow_dirty_run:
855            error(
856                f"Daemon '{self.daemon_id}' already exists. " +
857                "To allow this daemon to run, do one of the following:\n"
858                + "  - Execute `daemon.cleanup()`.\n"
859                + f"  - Delete the directory '{self.path}'.\n"
860                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
861                FileExistsError,
862            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: "Union['psutil.Process', None]"
864    @property
865    def process(self) -> Union['psutil.Process', None]:
866        """
867        Return the psutil process for the Daemon.
868        """
869        psutil = attempt_import('psutil')
870        pid = self.pid
871        if pid is None:
872            return None
873        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
874            try:
875                self._process = psutil.Process(int(pid))
876                process_exists = True
877            except Exception:
878                process_exists = False
879            if not process_exists:
880                _ = self.__dict__.pop('_process', None)
881                try:
882                    if self.pid_path.exists():
883                        self.pid_path.unlink()
884                except Exception:
885                    pass
886                return None
887        return self._process

Return the psutil process for the Daemon.

status: str
    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon.

        Returns
        -------
        One of `'stopped'`, `'paused'` or `'running'`.
        """
        # Without a resolvable process the daemon is considered stopped.
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil', lazy=False)
        try:
            # A process status of 'stopped' is reported as 'paused'.
            if self.process.status() == 'stopped':
                return 'paused'
            # Treat zombies like missing processes so the handler below cleans up.
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            # NOTE(review): AttributeError presumably covers `self.process` turning
            # into `None` between the checks above — confirm.
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception:
                    pass
            return 'stopped'

        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
921    @property
922    def path(self) -> pathlib.Path:
923        """
924        Return the path for this Daemon's directory.
925        """
926        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
935    @property
936    def properties_path(self) -> pathlib.Path:
937        """
938        Return the `propterties.json` path for this Daemon.
939        """
940        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the properties.json path for this Daemon.

stop_path: pathlib.Path
942    @property
943    def stop_path(self) -> pathlib.Path:
944        """
945        Return the path for the stop file (created when manually stopped).
946        """
947        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
949    @property
950    def log_path(self) -> pathlib.Path:
951        """
952        Return the log path.
953        """
954        logs_cf = self.properties.get('logs', None) or {}
955        if 'path' not in logs_cf:
956            import meerschaum.config.paths as paths
957            return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
958
959        return pathlib.Path(logs_cf['path'])

Return the log path.

stdin_file_path: pathlib.Path
961    @property
962    def stdin_file_path(self) -> pathlib.Path:
963        """
964        Return the stdin file path.
965        """
966        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
968    @property
969    def blocking_stdin_file_path(self) -> pathlib.Path:
970        """
971        Return the stdin file path.
972        """
973        if '_blocking_stdin_file_path' in self.__dict__:
974            return self._blocking_stdin_file_path
975
976        return self.path / 'input.stdin.block'

Return the blocking stdin file path (input.stdin.block).

prompt_kwargs_file_path: pathlib.Path
978    @property
979    def prompt_kwargs_file_path(self) -> pathlib.Path:
980        """
981        Return the file path to the kwargs for the invoking `prompt()`.
982        """
983        return self.path / 'prompt_kwargs.json'

Return the file path to the kwargs for the invoking prompt().

log_offset_path: pathlib.Path
985    @property
986    def log_offset_path(self) -> pathlib.Path:
987        """
988        Return the log offset file path.
989        """
990        import meerschaum.config.paths as paths
991        return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

log_offset_lock: "'fasteners.InterProcessLock'"
 993    @property
 994    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 995        """
 996        Return the process lock context manager.
 997        """
 998        if '_log_offset_lock' in self.__dict__:
 999            return self._log_offset_lock
1000
1001        fasteners = attempt_import('fasteners')
1002        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
1003        return self._log_offset_lock

Return the process lock context manager.

rotating_log: RotatingFile
1005    @property
1006    def rotating_log(self) -> RotatingFile:
1007        """
1008        The rotating log file for the daemon's output.
1009        """
1010        if '_rotating_log' in self.__dict__:
1011            return self._rotating_log
1012
1013        logs_cf = self.properties.get('logs', None) or {}
1014        write_timestamps = logs_cf.get('write_timestamps', None)
1015        if write_timestamps is None:
1016            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1017
1018        timestamp_format = logs_cf.get('timestamp_format', None)
1019        if timestamp_format is None:
1020            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1021
1022        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1023        if num_files_to_keep is None:
1024            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1025
1026        max_file_size = logs_cf.get('max_file_size', None)
1027        if max_file_size is None:
1028            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1029
1030        redirect_streams = logs_cf.get('redirect_streams', True)
1031
1032        self._rotating_log = RotatingFile(
1033            self.log_path,
1034            redirect_streams=redirect_streams,
1035            write_timestamps=write_timestamps,
1036            timestamp_format=timestamp_format,
1037            num_files_to_keep=num_files_to_keep,
1038            max_file_size=max_file_size,
1039        )
1040        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
1042    @property
1043    def stdin_file(self):
1044        """
1045        Return the file handler for the stdin file.
1046        """
1047        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1048            return _stdin_file
1049
1050        self._stdin_file = StdinFile(
1051            self.stdin_file_path,
1052            lock_file_path=self.blocking_stdin_file_path,
1053        )
1054        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
1056    @property
1057    def log_text(self) -> Union[str, None]:
1058        """
1059        Read the log files and return their contents.
1060        Returns `None` if the log file does not exist.
1061        """
1062        logs_cf = self.properties.get('logs', None) or {}
1063        write_timestamps = logs_cf.get('write_timestamps', None)
1064        if write_timestamps is None:
1065            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1066
1067        timestamp_format = logs_cf.get('timestamp_format', None)
1068        if timestamp_format is None:
1069            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1070
1071        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1072        if num_files_to_keep is None:
1073            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1074
1075        max_file_size = logs_cf.get('max_file_size', None)
1076        if max_file_size is None:
1077            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1078
1079        new_rotating_log = RotatingFile(
1080            self.rotating_log.file_path,
1081            num_files_to_keep=num_files_to_keep,
1082            max_file_size=max_file_size,
1083            write_timestamps=write_timestamps,
1084            timestamp_format=timestamp_format,
1085        )
1086        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
1088    def readlines(self) -> List[str]:
1089        """
1090        Read the next log lines, persisting the cursor for later use.
1091        Note this will alter the cursor of `self.rotating_log`.
1092        """
1093        self.rotating_log._cursor = self._read_log_offset()
1094        lines = self.rotating_log.readlines()
1095        self._write_log_offset()
1096        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
1129    @property
1130    def pid(self) -> Union[int, None]:
1131        """
1132        Read the PID file and return its contents.
1133        Returns `None` if the PID file does not exist.
1134        """
1135        if not self.pid_path.exists():
1136            return None
1137        try:
1138            with open(self.pid_path, 'r', encoding='utf-8') as f:
1139                text = f.read()
1140            if len(text) == 0:
1141                return None
1142            pid = int(text.rstrip())
1143        except Exception as e:
1144            warn(e)
1145            text = None
1146            pid = None
1147        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
1149    @property
1150    def pid_path(self) -> pathlib.Path:
1151        """
1152        Return the path to a file containing the PID for this Daemon.
1153        """
1154        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
1156    @property
1157    def pid_lock(self) -> 'fasteners.InterProcessLock':
1158        """
1159        Return the process lock context manager.
1160        """
1161        if '_pid_lock' in self.__dict__:
1162            return self._pid_lock
1163
1164        fasteners = attempt_import('fasteners')
1165        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1166        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
1168    @property
1169    def pickle_path(self) -> pathlib.Path:
1170        """
1171        Return the path for the pickle file.
1172        """
1173        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
1175    def read_properties(self) -> Optional[Dict[str, Any]]:
1176        """Read the properties JSON file and return the dictionary."""
1177        if not self.properties_path.exists():
1178            return None
1179        try:
1180            with open(self.properties_path, 'r', encoding='utf-8') as file:
1181                properties = json.load(file)
1182        except Exception:
1183            properties = {}
1184        
1185        return properties or {}

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1187    def read_pickle(self) -> Daemon:
1188        """Read a Daemon's pickle file and return the `Daemon`."""
1189        import pickle
1190        import traceback
1191        if not self.pickle_path.exists():
1192            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1193
1194        if self.pickle_path.stat().st_size == 0:
1195            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1196
1197        try:
1198            with open(self.pickle_path, 'rb') as pickle_file:
1199                daemon = pickle.load(pickle_file)
1200            success, msg = True, 'Success'
1201        except Exception as e:
1202            success, msg = False, str(e)
1203            daemon = None
1204            traceback.print_exception(type(e), e, e.__traceback__)
1205        if not success:
1206            error(msg)
1207        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the Daemon's properties, merged with the contents of the
        properties JSON file.

        The result is stored back on `self._properties` on every access.
        """
        # Load what is on disk; a read failure is printed and treated as "no properties".
        try:
            _file_properties = self.read_properties() or {}
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        # With nothing in memory, start from the file contents.
        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        # Drop an in-memory `result` of `None` when the file holds a real result,
        # so the key is absent from `self._properties` before the patch below.
        if (
            self._properties.get('result', None) is None
            and _file_properties.get('result', None) is not None
        ):
            _ = self._properties.pop('result', None)

        # NOTE(review): presumably `self._properties` is patched on top of the file
        # contents (in-memory values win) — confirm against `apply_patch_to_config`.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1240    @property
1241    def hidden(self) -> bool:
1242        """
1243        Return a bool indicating whether this Daemon should be displayed.
1244        """
1245        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1247    def write_properties(self) -> SuccessTuple:
1248        """Write the properties dictionary to the properties JSON file
1249        (only if self.properties exists).
1250        """
1251        from meerschaum.utils.misc import generate_password
1252        success, msg = (
1253            False,
1254            f"No properties to write for daemon '{self.daemon_id}'."
1255        )
1256        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1257        props = self.properties
1258        if props is not None:
1259            try:
1260                self.path.mkdir(parents=True, exist_ok=True)
1261                if self.properties_path.exists():
1262                    self.properties_path.rename(backup_path)
1263                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1264                    json.dump(props, properties_file)
1265                success, msg = True, 'Success'
1266            except Exception as e:
1267                success, msg = False, str(e)
1268
1269        try:
1270            if backup_path.exists():
1271                if not success:
1272                    backup_path.rename(self.properties_path)
1273                else:
1274                    backup_path.unlink()
1275        except Exception as e:
1276            success, msg = False, str(e)
1277
1278        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1280    def write_pickle(self) -> SuccessTuple:
1281        """Write the pickle file for the daemon."""
1282        import pickle
1283        import traceback
1284        from meerschaum.utils.misc import generate_password
1285
1286        if not self.pickle:
1287            return True, "Success"
1288
1289        from meerschaum._internal.entry import _shells
1290        if _shells:
1291            from meerschaum._internal.shell.Shell import revert_input
1292            revert_input()
1293
1294        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1295        try:
1296            self.path.mkdir(parents=True, exist_ok=True)
1297            if self.pickle_path.exists():
1298                self.pickle_path.rename(backup_path)
1299            with open(self.pickle_path, 'wb+') as pickle_file:
1300                pickle.dump(self, pickle_file)
1301            success, msg = True, "Success"
1302        except Exception as e:
1303            success, msg = False, str(e)
1304            traceback.print_exception(type(e), e, e.__traceback__)
1305        try:
1306            if backup_path.exists():
1307                if not success:
1308                    backup_path.rename(self.pickle_path)
1309                else:
1310                    backup_path.unlink()
1311        except Exception as e:
1312            success, msg = False, str(e)
1313        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1343    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1344        """
1345        Remove a daemon's directory after execution.
1346
1347        Parameters
1348        ----------
1349        keep_logs: bool, default False
1350            If `True`, skip deleting the daemon's log files.
1351
1352        Returns
1353        -------
1354        A `SuccessTuple` indicating success.
1355        """
1356        if self.path.exists():
1357            try:
1358                shutil.rmtree(self.path)
1359            except Exception as e:
1360                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1361                warn(msg)
1362                return False, msg
1363        if not keep_logs:
1364            self.rotating_log.delete()
1365            try:
1366                if self.log_offset_path.exists():
1367                    self.log_offset_path.unlink()
1368            except Exception as e:
1369                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1370                warn(msg)
1371                return False, msg
1372        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1375    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1376        """
1377        Return the timeout value to use. Use `--timeout-seconds` if provided,
1378        else the configured default (8).
1379        """
1380        if isinstance(timeout, (int, float)):
1381            return timeout
1382        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1385    def get_check_timeout_interval_seconds(
1386        self,
1387        check_timeout_interval: Union[int, float, None] = None,
1388    ) -> Union[int, float]:
1389        """
1390        Return the interval value to check the status of timeouts.
1391        """
1392        if isinstance(check_timeout_interval, (int, float)):
1393            return check_timeout_interval
1394        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1396    @property
1397    def target_args(self) -> Union[Tuple[Any], None]:
1398        """
1399        Return the positional arguments to pass to the target function.
1400        """
1401        target_args = (
1402            self.__dict__.get('_target_args', None)
1403            or self.properties.get('target', {}).get('args', None)
1404        )
1405        if target_args is None:
1406            return tuple([])
1407
1408        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1410    @property
1411    def target_kw(self) -> Union[Dict[str, Any], None]:
1412        """
1413        Return the keyword arguments to pass to the target function.
1414        """
1415        target_kw = (
1416            self.__dict__.get('_target_kw', None)
1417            or self.properties.get('target', {}).get('kw', None)
1418        )
1419        if target_kw is None:
1420            return {}
1421
1422        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.

class StdinFile(io.TextIOBase):
 22class StdinFile(io.TextIOBase):
 23    """
 24    Redirect user input into a Daemon's context.
 25    """
 26    def __init__(
 27        self,
 28        file_path: Union[pathlib.Path, str],
 29        lock_file_path: Optional[pathlib.Path] = None,
 30        decode: bool = True,
 31        refresh_seconds: Union[int, float, None] = None,
 32    ):
 33        if isinstance(file_path, str):
 34            file_path = pathlib.Path(file_path)
 35
 36        self.file_path = file_path
 37        self.blocking_file_path = (
 38            lock_file_path
 39            if lock_file_path is not None
 40            else (file_path.parent / (file_path.name + '.block'))
 41        )
 42        self._file_handler = None
 43        self._fd = None
 44        self.sel = selectors.DefaultSelector()
 45        self.decode = decode
 46        self._write_fp = None
 47        self._refresh_seconds = refresh_seconds
 48
 49    @property
 50    def encoding(self):
 51        return 'utf-8'
 52
 53    @property
 54    def file_handler(self):
 55        """
 56        Return the read file handler to the provided file path.
 57        """
 58        if self._file_handler is not None:
 59            return self._file_handler
 60
 61        if not self.file_path.exists():
 62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
 64
 65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 67        self.sel.register(self._file_handler, selectors.EVENT_READ)
 68        return self._file_handler
 69
 70    def write(self, data):
 71        if self._write_fp is None:
 72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 73            if not self.file_path.exists():
 74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
 75            self._write_fp = open(self.file_path, 'wb')
 76
 77        if isinstance(data, str):
 78            data = data.encode('utf-8')
 79        try:
 80            self._write_fp.write(data)
 81            self._write_fp.flush()
 82        except BrokenPipeError:
 83            pass
 84
 85    def fileno(self):
 86        fileno = self.file_handler.fileno()
 87        return fileno
 88
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)
110
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line
120
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()
145
146    def is_open(self):
147        return self._file_handler is not None
148
149    def isatty(self) -> bool:
150        return False
151
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds
160
161    def __str__(self) -> str:
162        return f"StdinFile('{self.file_path}')"
163
164    def __repr__(self) -> str:
165        return str(self)

Redirect user input into a Daemon's context.

StdinFile( file_path: Union[pathlib.Path, str], lock_file_path: Optional[pathlib.Path] = None, decode: bool = True, refresh_seconds: Union[int, float, NoneType] = None)
26    def __init__(
27        self,
28        file_path: Union[pathlib.Path, str],
29        lock_file_path: Optional[pathlib.Path] = None,
30        decode: bool = True,
31        refresh_seconds: Union[int, float, None] = None,
32    ):
33        if isinstance(file_path, str):
34            file_path = pathlib.Path(file_path)
35
36        self.file_path = file_path
37        self.blocking_file_path = (
38            lock_file_path
39            if lock_file_path is not None
40            else (file_path.parent / (file_path.name + '.block'))
41        )
42        self._file_handler = None
43        self._fd = None
44        self.sel = selectors.DefaultSelector()
45        self.decode = decode
46        self._write_fp = None
47        self._refresh_seconds = refresh_seconds
file_path
blocking_file_path
sel
decode
encoding
    @property
    def encoding(self):
        """
        Return the encoding of the text stream (always `'utf-8'`).
        """
        return 'utf-8'

Encoding of the text stream.

Subclasses should override.

file_handler
53    @property
54    def file_handler(self):
55        """
56        Return the read file handler to the provided file path.
57        """
58        if self._file_handler is not None:
59            return self._file_handler
60
61        if not self.file_path.exists():
62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
64
65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
67        self.sel.register(self._file_handler, selectors.EVENT_READ)
68        return self._file_handler

Return the read file handler to the provided file path.

def write(self, data):
70    def write(self, data):
71        if self._write_fp is None:
72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
73            if not self.file_path.exists():
74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
75            self._write_fp = open(self.file_path, 'wb')
76
77        if isinstance(data, str):
78            data = data.encode('utf-8')
79        try:
80            self._write_fp.write(data)
81            self._write_fp.flush()
82        except BrokenPipeError:
83            pass

Write string s to stream.

Return the number of characters written (which is always equal to the length of the string).

def fileno(self):
85    def fileno(self):
86        fileno = self.file_handler.fileno()
87        return fileno

Return underlying file descriptor if one exists.

Raise OSError if the IO object does not use a file descriptor.

def read(self, size=-1):
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)

Read from the FIFO pipe, blocking on EOFError.

def readline(self, size=-1):
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line

Read until newline or EOF.

Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.

def close(self):
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()

Flush and close the IO object.

This method has no effect if the file is already closed.

def is_open(self):
146    def is_open(self):
147        return self._file_handler is not None
def isatty(self) -> bool:
    def isatty(self) -> bool:
        """
        Always report that this stream is not an interactive terminal.
        """
        return False

Return whether this is an 'interactive' stream.

Return False if it can't be determined.

refresh_seconds: Union[int, float]
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds

How many seconds between checking for blocking functions.

class RotatingFile(io.IOBase):
 27class RotatingFile(io.IOBase):
 28    """
 29    A `RotatingFile` may be treated like a normal file-like object.
 30    Under the hood, however, it will create new sub-files and delete old ones.
 31    """
 32
 33    SEEK_BACK_ATTEMPTS: int = 5
 34
 35    def __init__(
 36        self,
 37        file_path: pathlib.Path,
 38        num_files_to_keep: Optional[int] = None,
 39        max_file_size: Optional[int] = None,
 40        redirect_streams: bool = False,
 41        write_timestamps: bool = False,
 42        timestamp_format: Optional[str] = None,
 43        write_callback: Optional[Callable[[str], None]] = None,
 44    ):
 45        """
 46        Create a file-like object which manages other files.
 47
 48        Parameters
 49        ----------
 50        num_files_to_keep: int, default None
 51            How many sub-files to keep at any given time.
 52            Defaults to the configured value (5).
 53
 54        max_file_size: int, default None
 55            How large in bytes each sub-file can grow before another file is created.
 56            Note that this is not a hard limit but rather a threshold
 57            which may be slightly exceeded.
 58            Defaults to the configured value (100_000).
 59
 60        redirect_streams: bool, default False
 61            If `True`, redirect previous file streams when opening a new file descriptor.
 62            
 63            NOTE: Only set this to `True` if you are entering into a daemon context.
 64            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 65
 66        write_timestamps: bool, default False
 67            If `True`, prepend the current UTC timestamp to each line of the file.
 68
 69        timestamp_format: str, default None
 70            If `write_timestamps` is `True`, use this format for the timestamps.
 71            Defaults to `'%Y-%m-%d %H:%M'`.
 72
 73        write_callback: Optional[Callable[[str], None]], default None
 74            If provided, execute this callback with the data to be written.
 75        """
 76        self.file_path = pathlib.Path(file_path)
 77        if num_files_to_keep is None:
 78            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 79        if max_file_size is None:
 80            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 81        if timestamp_format is None:
 82            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 83        if num_files_to_keep < 1:
 84            raise ValueError("At least 1 file must be kept.")
 85        if max_file_size < 100:
 86            raise ValueError("Subfiles must contain at least 100 bytes.")
 87
 88        self.num_files_to_keep = num_files_to_keep
 89        self.max_file_size = max_file_size
 90        self.redirect_streams = redirect_streams
 91        self.write_timestamps = write_timestamps
 92        self.timestamp_format = timestamp_format
 93        self.write_callback = write_callback
 94        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')
 95
 96        ### When subfiles are opened, map from their index to the file objects.
 97        self.subfile_objects = {}
 98        self._redirected_subfile_objects = {}
 99        self._current_file_obj = None
100        self._previous_file_obj = None
101
102        ### When reading, keep track of the file index and position.
103        self._cursor: Tuple[int, int] = (0, 0)
104
105        ### Don't forget to close any stray files.
106        atexit.register(self.close)
107
108
109    def fileno(self):
110        """
111        Return the file descriptor for the latest subfile.
112        """
113        self.refresh_files(start_interception=False)
114        return self._current_file_obj.fileno()
115
116
117    def get_latest_subfile_path(self) -> pathlib.Path:
118        """
119        Return the path for the latest subfile to which to write into.
120        """
121        return self.get_subfile_path_from_index(
122            self.get_latest_subfile_index()
123        )
124
125
126    def get_remaining_subfile_size(self, subfile_index: int) -> int:
127        """
128        Return the remaining buffer size for a subfile.
129
130        Parameters
131        ---------
132        subfile_index: int
133            The index of the subfile to be checked.
134
135        Returns
136        -------
137        The remaining size in bytes.
138        """
139        subfile_path = self.get_subfile_path_from_index(subfile_index)
140        if not subfile_path.exists():
141            return self.max_file_size
142
143        return self.max_file_size - os.path.getsize(subfile_path)
144
145
146    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
147        """
148        Return whether a given subfile is too large.
149
150        Parameters
151        ----------
152        subfile_index: int
153            The index of the subfile to be checked.
154
155        potential_new_len: int, default 0
156            The length of a potential write of new data.
157
158        Returns
159        -------
160        A bool indicating the subfile is or will be too large.
161        """
162        subfile_path = self.get_subfile_path_from_index(subfile_index)
163        if not subfile_path.exists():
164            return False
165
166        self.flush()
167
168        return (
169            (os.path.getsize(subfile_path) + potential_new_len)
170            >=
171            self.max_file_size
172        )
173
174
175    def get_latest_subfile_index(self) -> int:
176        """
177        Return the latest existing subfile index.
178        If no index may be found, return -1.
179        """
180        existing_subfile_paths = self.get_existing_subfile_paths()
181        latest_index = (
182            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
183            if existing_subfile_paths
184            else 0
185        )
186        return latest_index
187
188
189    def get_index_from_subfile_name(self, subfile_name: str) -> int:
190        """
191        Return the index from a given subfile name.
192        If the file name cannot be parsed, return -1.
193        """
194        try:
195            return int(subfile_name.replace(self.file_path.name + '.', ''))
196        except Exception:
197            return -1
198
199
200    def get_subfile_name_from_index(self, subfile_index: int) -> str:
201        """
202        Return the subfile name from the given index.
203        """
204        return f'{self.file_path.name}.{subfile_index}'
205
206
207    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
208        """
209        Return the subfile's path from its index.
210        """
211        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
212
213
214    def get_existing_subfile_indices(self) -> List[int]:
215        """
216        Return of list of subfile indices which exist on disk.
217        """
218        existing_subfile_paths = self.get_existing_subfile_paths()
219        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
220
221
222    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
223        """
224        Return a list of file paths that match the input filename pattern.
225        """
226        if not self.file_path.parent.exists():
227            return []
228
229        subfile_names_indices = sorted(
230            [
231                (file_name, self.get_index_from_subfile_name(file_name))
232                for file_name in os.listdir(self.file_path.parent)
233                if (
234                    file_name.startswith(self.file_path.name)
235                    and re.match(self.subfile_regex_pattern, file_name)
236                )
237            ],
238            key=lambda x: x[1],
239        )
240        return [
241            (self.file_path.parent / file_name)
242            for file_name, _ in subfile_names_indices
243        ]
244
245
    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> 'io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of the data about to be written.
            Forwarded to `is_subfile_too_large()` to decide whether to rotate.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.
            Only takes effect when the latest subfile is (re)opened here
            and both `redirect_streams` and `write_timestamps` are set.

        Returns
        -------
        The current subfile object (`self._current_file_obj`).
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        ### NOTE(review): `_current_file_obj.name` is passed unmodified to
        ### `get_index_from_subfile_name()`; if it holds a full path rather than a bare
        ### file name, this check presumably evaluates to `True` on every call -- confirm.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            ### NOTE(review): the handle opened here is not added to `subfile_objects`,
            ### so `flush()` and `close()` do not see it -- confirm this is intended.
            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
                if start_interception and self.write_timestamps:
                    self.start_log_fd_interception()

        ### Rotate when there is no usable handle or the pending write would
        ### push the latest subfile to `max_file_size`.
        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            self.increment_subfiles()

        return self._current_file_obj
300
    def increment_subfiles(self, increment_by: int = 1):
        """
        Create a new subfile and switch the file pointer over.

        Parameters
        ----------
        increment_by: int, default 1
            How far past the latest existing subfile index the new subfile's index lies.
        """
        latest_subfile_index = self.get_latest_subfile_index()
        old_subfile_index = latest_subfile_index
        new_subfile_index = old_subfile_index + increment_by
        new_file_path = self.get_subfile_path_from_index(new_subfile_index)

        ### Keep a reference to the outgoing handle and track the new one by its index.
        self._previous_file_obj = self._current_file_obj
        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
        self.subfile_objects[new_subfile_index] = self._current_file_obj
        self.flush()

        if self.redirect_streams:
            if self._previous_file_obj is not None:
                ### Remember that the old handle now points at the new file.
                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)

        ### Close every tracked handle other than the previous and current ones.
        self.close(unused_only=True)

        ### Sanity check in case writing somehow fails.
        if self._previous_file_obj is self._current_file_obj:
            self._previous_file_obj = None

        ### Remove subfiles beyond `num_files_to_keep`.
        self.delete(unused_only=True)
327
328    def close(self, unused_only: bool = False) -> None:
329        """
330        Close any open file descriptors.
331
332        Parameters
333        ----------
334        unused_only: bool, default False
335            If `True`, only close file descriptors not currently in use.
336        """
337        self.stop_log_fd_interception(unused_only=unused_only)
338        subfile_indices = sorted(self.subfile_objects.keys())
339        for subfile_index in subfile_indices:
340            subfile_object = self.subfile_objects[subfile_index]
341            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
342                continue
343            try:
344                if not subfile_object.closed:
345                    subfile_object.close()
346            except Exception:
347                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
348
349            _ = self.subfile_objects.pop(subfile_index, None)
350            if self.redirect_streams:
351                _ = self._redirected_subfile_objects.pop(subfile_index, None)
352
353        if not unused_only:
354            self._previous_file_obj = None
355            self._current_file_obj = None
356
357
358    def get_timestamp_prefix_str(self) -> str:
359        """
360        Return the current minute prefix string.
361        """
362        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
363
364
365    def write(self, data: str) -> None:
366        """
367        Write the given text into the latest subfile.
368        If the subfile will be too large, create a new subfile.
369        If too many subfiles exist at once, the oldest one will be deleted.
370
371        NOTE: This will not split data across multiple files.
372        As such, if data is larger than max_file_size, then the corresponding subfile
373        may exceed this limit.
374        """
375        try:
376            if callable(self.write_callback):
377                self.write_callback(data)
378        except Exception:
379            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
380
381        try:
382            self.file_path.parent.mkdir(exist_ok=True, parents=True)
383            if isinstance(data, bytes):
384                data = data.decode('utf-8')
385
386            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
387            suffix_str = "\n" if self.write_timestamps else ""
388            self.refresh_files(
389                potential_new_len = len(prefix_str + data + suffix_str),
390                start_interception = self.write_timestamps,
391            )
392            try:
393                if prefix_str:
394                    self._current_file_obj.write(prefix_str)
395                self._current_file_obj.write(data)
396                if suffix_str:
397                    self._current_file_obj.write(suffix_str)
398            except BrokenPipeError:
399                warn("BrokenPipeError encountered. The daemon may have been terminated.")
400                return
401            except Exception:
402                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
403            self.flush()
404            self.delete(unused_only=True)
405        except Exception as e:
406            warn(f"Unexpected error in RotatingFile.write: {e}")
407
408
409    def delete(self, unused_only: bool = False) -> None:
410        """
411        Delete old subfiles.
412
413        Parameters
414        ----------
415        unused_only: bool, default False
416            If `True`, only delete subfiles which are no longer needed.
417        """
418        existing_subfile_paths = self.get_existing_subfile_paths()
419        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
420            return
421
422        self.flush()
423        self.close(unused_only=unused_only)
424
425        end_ix = (
426            (-1 * self.num_files_to_keep)
427            if unused_only
428            else len(existing_subfile_paths)
429        )
430        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
431            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
432
433            try:
434                subfile_path_to_delete.unlink()
435            except Exception:
436                warn(
437                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
438                    + f"{traceback.format_exc()}"
439                )
440
441
    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles, starting from the cursor.

        Subfiles with an index below the cursor's index are skipped.
        The cursor's own subfile is read from the cursor's position
        and later subfiles are read from the beginning.
        The positional and keyword arguments are accepted but not used.

        Returns
        -------
        The concatenated text which was read.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            ### Reuse a tracked handle unless it was recorded as redirected;
            ### otherwise read the file on disk through a fresh handle.
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                ### On a decoding error, retry from one position earlier each time,
                ### up to `SEEK_BACK_ATTEMPTS` attempts.
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer
496
497
498    def refresh_cursor(self) -> None:
499        """
500        Update the cursor to the latest subfile index and file.tell() value.
501        """
502        self.flush()
503        existing_subfile_paths = self.get_existing_subfile_paths()
504        current_ix = (
505            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
506            if existing_subfile_paths
507            else 0
508        )
509        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
510        self._cursor = (current_ix, position)
511
512
    def readlines(self) -> List[str]:
        """
        Return a list of lines of text read from the existing subfiles,
        starting from the cursor.

        Subfiles with an index below the cursor's index are skipped.
        The cursor's own subfile is read from the cursor's position
        and later subfiles are read from the beginning.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            subfile_lines = []
            ### Reuse a tracked handle unless it was recorded as redirected;
            ### otherwise read the file on disk through a fresh handle.
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                ### On a decoding error, retry from one position earlier each time,
                ### up to `SEEK_BACK_ATTEMPTS` attempts.
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            ### In that case, glue the first new line onto the unfinished previous line.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines
577
578
    def seekable(self) -> bool:
        """
        Always report that this stream supports `seek()`.
        """
        return True
581
582
583    def seek(self, position: int) -> None:
584        """
585        Seek to the beginning of the logs stream.
586        """
587        existing_subfile_indices = self.get_existing_subfile_indices()
588        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
589        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
590        if position == 0:
591            self._cursor = (min_ix, 0)
592            return
593
594        self._cursor = (max_ix, position)
595        if self._current_file_obj is not None:
596            self._current_file_obj.seek(position)
597
598    
599    def flush(self) -> None:
600        """
601        Flush any open subfiles.
602        """
603        for subfile_index, subfile_object in self.subfile_objects.items():
604            if not subfile_object.closed:
605                try:
606                    subfile_object.flush()
607                except Exception:
608                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
609
610        if self.redirect_streams:
611            try:
612                sys.stdout.flush()
613            except BrokenPipeError:
614                pass
615            except Exception:
616                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
617            try:
618                sys.stderr.flush()
619            except BrokenPipeError:
620                pass
621            except Exception:
622                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
623
624
625    def start_log_fd_interception(self):
626        """
627        Start the file descriptor monitoring threads.
628        """
629        if not self.write_timestamps:
630            return
631
632        self._stdout_interceptor = FileDescriptorInterceptor(
633            sys.stdout.fileno(),
634            self.get_timestamp_prefix_str,
635        )
636        self._stderr_interceptor = FileDescriptorInterceptor(
637            sys.stderr.fileno(),
638            self.get_timestamp_prefix_str,
639        )
640
641        self._stdout_interceptor_thread = Thread(
642            target = self._stdout_interceptor.start_interception,
643            daemon = True,
644        )
645        self._stderr_interceptor_thread = Thread(
646            target = self._stderr_interceptor.start_interception,
647            daemon = True,
648        )
649        self._stdout_interceptor_thread.start()
650        self._stderr_interceptor_thread.start()
651        self._intercepting = True
652
653        if '_interceptor_threads' not in self.__dict__:
654            self._interceptor_threads = []
655        if '_interceptors' not in self.__dict__:
656            self._interceptors = []
657        self._interceptor_threads.extend([
658            self._stdout_interceptor_thread,
659            self._stderr_interceptor_thread,
660        ])
661        self._interceptors.extend([
662            self._stdout_interceptor,
663            self._stderr_interceptor,
664        ])
665        self.stop_log_fd_interception(unused_only=True)
666
667
668    def stop_log_fd_interception(self, unused_only: bool = False):
669        """
670        Stop the file descriptor monitoring threads.
671        """
672        if not self.write_timestamps:
673            return
674
675        interceptors = self.__dict__.get('_interceptors', [])
676        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
677
678        end_ix = len(interceptors) if not unused_only else -2
679
680        for interceptor in interceptors[:end_ix]:
681            interceptor.stop_interception()
682        del interceptors[:end_ix]
683
684        for thread in interceptor_threads[:end_ix]:
685            try:
686                thread.join()
687            except Exception:
688                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
689        del interceptor_threads[:end_ix]
690
691    def touch(self):
692        """
693        Touch the latest subfile.
694        """
695        subfile_path = self.get_latest_subfile_path()
696        subfile_path.touch()
697
    def isatty(self) -> bool:
        """
        Always report that this stream is an interactive terminal.

        NOTE(review): presumably so that output written into the logs is formatted
        as it would be for a terminal -- confirm against the callers.
        """
        return True
700
701    def __repr__(self) -> str:
702        """
703        Return basic info for this `RotatingFile`.
704        """
705        return (
706            "RotatingFile("
707            + f"'{self.file_path.as_posix()}', "
708            + f"num_files_to_keep={self.num_files_to_keep}, "
709            + f"max_file_size={self.max_file_size})"
710        )

A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.

RotatingFile( file_path: pathlib.Path, num_files_to_keep: Optional[int] = None, max_file_size: Optional[int] = None, redirect_streams: bool = False, write_timestamps: bool = False, timestamp_format: Optional[str] = None, write_callback: Optional[Callable[[str], NoneType]] = None)
 35    def __init__(
 36        self,
 37        file_path: pathlib.Path,
 38        num_files_to_keep: Optional[int] = None,
 39        max_file_size: Optional[int] = None,
 40        redirect_streams: bool = False,
 41        write_timestamps: bool = False,
 42        timestamp_format: Optional[str] = None,
 43        write_callback: Optional[Callable[[str], None]] = None,
 44    ):
 45        """
 46        Create a file-like object which manages other files.
 47
 48        Parameters
 49        ----------
 50        num_files_to_keep: int, default None
 51            How many sub-files to keep at any given time.
 52            Defaults to the configured value (5).
 53
 54        max_file_size: int, default None
 55            How large in bytes each sub-file can grow before another file is created.
 56            Note that this is not a hard limit but rather a threshold
 57            which may be slightly exceeded.
 58            Defaults to the configured value (100_000).
 59
 60        redirect_streams: bool, default False
 61            If `True`, redirect previous file streams when opening a new file descriptor.
 62            
 63            NOTE: Only set this to `True` if you are entering into a daemon context.
 64            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 65
 66        write_timestamps: bool, default False
 67            If `True`, prepend the current UTC timestamp to each line of the file.
 68
 69        timestamp_format: str, default None
 70            If `write_timestamps` is `True`, use this format for the timestamps.
 71            Defaults to `'%Y-%m-%d %H:%M'`.
 72
 73        write_callback: Optional[Callable[[str], None]], default None
 74            If provided, execute this callback with the data to be written.
 75        """
 76        self.file_path = pathlib.Path(file_path)
 77        if num_files_to_keep is None:
 78            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 79        if max_file_size is None:
 80            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 81        if timestamp_format is None:
 82            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 83        if num_files_to_keep < 1:
 84            raise ValueError("At least 1 file must be kept.")
 85        if max_file_size < 100:
 86            raise ValueError("Subfiles must contain at least 100 bytes.")
 87
 88        self.num_files_to_keep = num_files_to_keep
 89        self.max_file_size = max_file_size
 90        self.redirect_streams = redirect_streams
 91        self.write_timestamps = write_timestamps
 92        self.timestamp_format = timestamp_format
 93        self.write_callback = write_callback
 94        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')
 95
 96        ### When subfiles are opened, map from their index to the file objects.
 97        self.subfile_objects = {}
 98        self._redirected_subfile_objects = {}
 99        self._current_file_obj = None
100        self._previous_file_obj = None
101
102        ### When reading, keep track of the file index and position.
103        self._cursor: Tuple[int, int] = (0, 0)
104
105        ### Don't forget to close any stray files.
106        atexit.register(self.close)

Create a file-like object which manages other files.

Parameters
  • num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
  • max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
  • redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor.

    NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.

  • write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
  • timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
  • write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
SEEK_BACK_ATTEMPTS: int = 5
file_path
num_files_to_keep
max_file_size
redirect_streams
write_timestamps
timestamp_format
write_callback
subfile_regex_pattern
subfile_objects
def fileno(self):
109    def fileno(self):
110        """
111        Return the file descriptor for the latest subfile.
112        """
113        self.refresh_files(start_interception=False)
114        return self._current_file_obj.fileno()

Return the file descriptor for the latest subfile.

def get_latest_subfile_path(self) -> pathlib.Path:
117    def get_latest_subfile_path(self) -> pathlib.Path:
118        """
119        Return the path for the latest subfile to which to write into.
120        """
121        return self.get_subfile_path_from_index(
122            self.get_latest_subfile_index()
123        )

Return the path for the latest subfile to which to write into.

def get_remaining_subfile_size(self, subfile_index: int) -> int:
126    def get_remaining_subfile_size(self, subfile_index: int) -> int:
127        """
128        Return the remaining buffer size for a subfile.
129
130        Parameters
131        ---------
132        subfile_index: int
133            The index of the subfile to be checked.
134
135        Returns
136        -------
137        The remaining size in bytes.
138        """
139        subfile_path = self.get_subfile_path_from_index(subfile_index)
140        if not subfile_path.exists():
141            return self.max_file_size
142
143        return self.max_file_size - os.path.getsize(subfile_path)

Return the remaining buffer size for a subfile.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
Returns
  • The remaining size in bytes.
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
146    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
147        """
148        Return whether a given subfile is too large.
149
150        Parameters
151        ----------
152        subfile_index: int
153            The index of the subfile to be checked.
154
155        potential_new_len: int, default 0
156            The length of a potential write of new data.
157
158        Returns
159        -------
160        A bool indicating the subfile is or will be too large.
161        """
162        subfile_path = self.get_subfile_path_from_index(subfile_index)
163        if not subfile_path.exists():
164            return False
165
166        self.flush()
167
168        return (
169            (os.path.getsize(subfile_path) + potential_new_len)
170            >=
171            self.max_file_size
172        )

Return whether a given subfile is too large.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
  • potential_new_len (int, default 0): The length of a potential write of new data.
Returns
  • A bool indicating the subfile is or will be too large.
def get_latest_subfile_index(self) -> int:
175    def get_latest_subfile_index(self) -> int:
176        """
177        Return the latest existing subfile index.
178        If no index may be found, return -1.
179        """
180        existing_subfile_paths = self.get_existing_subfile_paths()
181        latest_index = (
182            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
183            if existing_subfile_paths
184            else 0
185        )
186        return latest_index

Return the latest existing subfile index. If no subfiles exist, return 0.

def get_index_from_subfile_name(self, subfile_name: str) -> int:
189    def get_index_from_subfile_name(self, subfile_name: str) -> int:
190        """
191        Return the index from a given subfile name.
192        If the file name cannot be parsed, return -1.
193        """
194        try:
195            return int(subfile_name.replace(self.file_path.name + '.', ''))
196        except Exception:
197            return -1

Return the index from a given subfile name. If the file name cannot be parsed, return -1.

def get_subfile_name_from_index(self, subfile_index: int) -> str:
200    def get_subfile_name_from_index(self, subfile_index: int) -> str:
201        """
202        Return the subfile name from the given index.
203        """
204        return f'{self.file_path.name}.{subfile_index}'

Return the subfile name from the given index.

def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
207    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
208        """
209        Return the subfile's path from its index.
210        """
211        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

Return the subfile's path from its index.

def get_existing_subfile_indices(self) -> List[int]:
214    def get_existing_subfile_indices(self) -> List[int]:
215        """
216        Return of list of subfile indices which exist on disk.
217        """
218        existing_subfile_paths = self.get_existing_subfile_paths()
219        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

Return a list of subfile indices which exist on disk.

def get_existing_subfile_paths(self) -> List[pathlib.Path]:
222    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
223        """
224        Return a list of file paths that match the input filename pattern.
225        """
226        if not self.file_path.parent.exists():
227            return []
228
229        subfile_names_indices = sorted(
230            [
231                (file_name, self.get_index_from_subfile_name(file_name))
232                for file_name in os.listdir(self.file_path.parent)
233                if (
234                    file_name.startswith(self.file_path.name)
235                    and re.match(self.subfile_regex_pattern, file_name)
236                )
237            ],
238            key=lambda x: x[1],
239        )
240        return [
241            (self.file_path.parent / file_name)
242            for file_name, _ in subfile_names_indices
243        ]

Return a list of file paths that match the input filename pattern.

def refresh_files( self, potential_new_len: int = 0, start_interception: bool = False) -> _io.TextIOWrapper:
246    def refresh_files(
247        self,
248        potential_new_len: int = 0,
249        start_interception: bool = False,
250    ) -> 'io.TextIOWrapper':
251        """
252        Check the state of the subfiles.
253        If the latest subfile is too large, create a new file and delete old ones.
254
255        Parameters
256        ----------
257        potential_new_len: int, default 0
258
259        start_interception: bool, default False
260            If `True`, kick off the file interception threads.
261        """
262        self.flush()
263
264        latest_subfile_index = self.get_latest_subfile_index()
265        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
266
267        ### First run with existing log files: open the most recent log file.
268        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
269
270        ### Sometimes a new file is created but output doesn't switch over.
271        lost_latest_handle = (
272            self._current_file_obj is not None
273            and
274            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
275        )
276        if is_first_run_with_logs or lost_latest_handle:
277            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
278            if self.redirect_streams:
279                try:
280                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
281                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
282                except OSError:
283                    warn(
284                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
285                    )
286                if start_interception and self.write_timestamps:
287                    self.start_log_fd_interception()
288
289        create_new_file = (
290            (latest_subfile_index == -1)
291            or
292            self._current_file_obj is None
293            or
294            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
295        )
296        if create_new_file:
297            self.increment_subfiles()
298
299        return self._current_file_obj

Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.

Parameters
  • potential_new_len (int, default 0): The length of a potential write of new data, counted towards the size check of the latest subfile.

  • start_interception (bool, default False): If True, kick off the file interception threads.

def increment_subfiles(self, increment_by: int = 1):
301    def increment_subfiles(self, increment_by: int = 1):
302        """
303        Create a new subfile and switch the file pointer over.
304        """
305        latest_subfile_index = self.get_latest_subfile_index()
306        old_subfile_index = latest_subfile_index
307        new_subfile_index = old_subfile_index + increment_by
308        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
309        self._previous_file_obj = self._current_file_obj
310        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
311        self.subfile_objects[new_subfile_index] = self._current_file_obj
312        self.flush()
313
314        if self.redirect_streams:
315            if self._previous_file_obj is not None:
316                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
317                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
318            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
319            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
320        self.close(unused_only=True)
321
322        ### Sanity check in case writing somehow fails.
323        if self._previous_file_obj is self._current_file_obj:
324            self._previous_file_obj = None
325
326        self.delete(unused_only=True)

Create a new subfile and switch the file pointer over.

def close(self, unused_only: bool = False) -> None:
328    def close(self, unused_only: bool = False) -> None:
329        """
330        Close any open file descriptors.
331
332        Parameters
333        ----------
334        unused_only: bool, default False
335            If `True`, only close file descriptors not currently in use.
336        """
337        self.stop_log_fd_interception(unused_only=unused_only)
338        subfile_indices = sorted(self.subfile_objects.keys())
339        for subfile_index in subfile_indices:
340            subfile_object = self.subfile_objects[subfile_index]
341            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
342                continue
343            try:
344                if not subfile_object.closed:
345                    subfile_object.close()
346            except Exception:
347                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
348
349            _ = self.subfile_objects.pop(subfile_index, None)
350            if self.redirect_streams:
351                _ = self._redirected_subfile_objects.pop(subfile_index, None)
352
353        if not unused_only:
354            self._previous_file_obj = None
355            self._current_file_obj = None

Close any open file descriptors.

Parameters
  • unused_only (bool, default False): If True, only close file descriptors not currently in use.
def get_timestamp_prefix_str(self) -> str:
358    def get_timestamp_prefix_str(self) -> str:
359        """
360        Return the current minute prefix string.
361        """
362        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

Return the current minute prefix string.

def write(self, data: str) -> None:
365    def write(self, data: str) -> None:
366        """
367        Write the given text into the latest subfile.
368        If the subfile will be too large, create a new subfile.
369        If too many subfiles exist at once, the oldest one will be deleted.
370
371        NOTE: This will not split data across multiple files.
372        As such, if data is larger than max_file_size, then the corresponding subfile
373        may exceed this limit.
374        """
375        try:
376            if callable(self.write_callback):
377                self.write_callback(data)
378        except Exception:
379            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
380
381        try:
382            self.file_path.parent.mkdir(exist_ok=True, parents=True)
383            if isinstance(data, bytes):
384                data = data.decode('utf-8')
385
386            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
387            suffix_str = "\n" if self.write_timestamps else ""
388            self.refresh_files(
389                potential_new_len = len(prefix_str + data + suffix_str),
390                start_interception = self.write_timestamps,
391            )
392            try:
393                if prefix_str:
394                    self._current_file_obj.write(prefix_str)
395                self._current_file_obj.write(data)
396                if suffix_str:
397                    self._current_file_obj.write(suffix_str)
398            except BrokenPipeError:
399                warn("BrokenPipeError encountered. The daemon may have been terminated.")
400                return
401            except Exception:
402                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
403            self.flush()
404            self.delete(unused_only=True)
405        except Exception as e:
406            warn(f"Unexpected error in RotatingFile.write: {e}")

Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.

NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.

def delete(self, unused_only: bool = False) -> None:
409    def delete(self, unused_only: bool = False) -> None:
410        """
411        Delete old subfiles.
412
413        Parameters
414        ----------
415        unused_only: bool, default False
416            If `True`, only delete subfiles which are no longer needed.
417        """
418        existing_subfile_paths = self.get_existing_subfile_paths()
419        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
420            return
421
422        self.flush()
423        self.close(unused_only=unused_only)
424
425        end_ix = (
426            (-1 * self.num_files_to_keep)
427            if unused_only
428            else len(existing_subfile_paths)
429        )
430        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
431            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
432
433            try:
434                subfile_path_to_delete.unlink()
435            except Exception:
436                warn(
437                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
438                    + f"{traceback.format_exc()}"
439                )

Delete old subfiles.

Parameters
  • unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
def read(self, *args, **kwargs) -> str:
442    def read(self, *args, **kwargs) -> str:
443        """
444        Read the contents of the existing subfiles.
445        """
446        existing_subfile_indices = [
447            self.get_index_from_subfile_name(subfile_path.name)
448            for subfile_path in self.get_existing_subfile_paths()
449        ]
450        paths_to_read = [
451            self.get_subfile_path_from_index(subfile_index)
452            for subfile_index in existing_subfile_indices
453            if subfile_index >= self._cursor[0]
454        ]
455        buffer = ''
456        refresh_cursor = True
457        for subfile_path in paths_to_read:
458            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
459            seek_ix = (
460                self._cursor[1]
461                if subfile_index == self._cursor[0]
462                else 0
463            )
464
465            if (
466                subfile_index in self.subfile_objects
467                and
468                subfile_index not in self._redirected_subfile_objects
469            ):
470                subfile_object = self.subfile_objects[subfile_index]
471                for i in range(self.SEEK_BACK_ATTEMPTS):
472                    try:
473                        subfile_object.seek(max(seek_ix - i, 0))
474                        buffer += subfile_object.read()
475                    except UnicodeDecodeError:
476                        continue
477                    break
478            else:
479                with open(subfile_path, 'r', encoding='utf-8') as f:
480                    for i in range(self.SEEK_BACK_ATTEMPTS):
481                        try:
482                            f.seek(max(seek_ix - i, 0))
483                            buffer += f.read()
484                        except UnicodeDecodeError:
485                            continue
486                        break
487
488                    ### Handle the case when no files have yet been opened.
489                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
490                        self._cursor = (subfile_index, f.tell())
491                        refresh_cursor = False
492
493        if refresh_cursor:
494            self.refresh_cursor()
495        return buffer

Read the contents of the existing subfiles.

def refresh_cursor(self) -> None:
498    def refresh_cursor(self) -> None:
499        """
500        Update the cursor to the latest subfile index and file.tell() value.
501        """
502        self.flush()
503        existing_subfile_paths = self.get_existing_subfile_paths()
504        current_ix = (
505            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
506            if existing_subfile_paths
507            else 0
508        )
509        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
510        self._cursor = (current_ix, position)

Update the cursor to the latest subfile index and file.tell() value.

def readlines(self) -> List[str]:
513    def readlines(self) -> List[str]:
514        """
515        Return a list of lines of text.
516        """
517        existing_subfile_indices = [
518            self.get_index_from_subfile_name(subfile_path.name)
519            for subfile_path in self.get_existing_subfile_paths()
520        ]
521        paths_to_read = [
522            self.get_subfile_path_from_index(subfile_index)
523            for subfile_index in existing_subfile_indices
524            if subfile_index >= self._cursor[0]
525        ]
526
527        lines = []
528        refresh_cursor = True
529        for subfile_path in paths_to_read:
530            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
531            seek_ix = (
532                self._cursor[1]
533                if subfile_index == self._cursor[0]
534                else 0
535            )
536
537            subfile_lines = []
538            if (
539                subfile_index in self.subfile_objects
540                and
541                subfile_index not in self._redirected_subfile_objects
542            ):
543                subfile_object = self.subfile_objects[subfile_index]
544                for i in range(self.SEEK_BACK_ATTEMPTS):
545                    try:
546                        subfile_object.seek(max((seek_ix - i), 0))
547                        subfile_lines = subfile_object.readlines()
548                    except UnicodeDecodeError:
549                        continue
550                    break
551            else:
552                with open(subfile_path, 'r', encoding='utf-8') as f:
553                    for i in range(self.SEEK_BACK_ATTEMPTS):
554                        try:
555                            f.seek(max(seek_ix - i, 0))
556                            subfile_lines = f.readlines()
557                        except UnicodeDecodeError:
558                            continue
559                        break
560
561                    ### Handle the case when no files have yet been opened.
562                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
563                        self._cursor = (subfile_index, f.tell())
564                        refresh_cursor = False
565
566            ### Sometimes a line may span multiple files.
567            if lines and subfile_lines and not lines[-1].endswith('\n'):
568                lines[-1] += subfile_lines[0]
569                new_lines = subfile_lines[1:]
570            else:
571                new_lines = subfile_lines
572            lines.extend(new_lines)
573
574        if refresh_cursor:
575            self.refresh_cursor()
576        return lines

Return a list of lines of text.

def seekable(self) -> bool:
579    def seekable(self) -> bool:
580        return True

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def seek(self, position: int) -> None:
583    def seek(self, position: int) -> None:
584        """
585        Seek to the beginning of the logs stream.
586        """
587        existing_subfile_indices = self.get_existing_subfile_indices()
588        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
589        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
590        if position == 0:
591            self._cursor = (min_ix, 0)
592            return
593
594        self._cursor = (max_ix, position)
595        if self._current_file_obj is not None:
596            self._current_file_obj.seek(position)

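Move the read cursor. A position of 0 seeks to the beginning of the logs stream (the start of the first existing subfile); any other position is applied to the last existing subfile.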

def flush(self) -> None:
599    def flush(self) -> None:
600        """
601        Flush any open subfiles.
602        """
603        for subfile_index, subfile_object in self.subfile_objects.items():
604            if not subfile_object.closed:
605                try:
606                    subfile_object.flush()
607                except Exception:
608                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
609
610        if self.redirect_streams:
611            try:
612                sys.stdout.flush()
613            except BrokenPipeError:
614                pass
615            except Exception:
616                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
617            try:
618                sys.stderr.flush()
619            except BrokenPipeError:
620                pass
621            except Exception:
622                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

Flush any open subfiles.

def start_log_fd_interception(self):
625    def start_log_fd_interception(self):
626        """
627        Start the file descriptor monitoring threads.
628        """
629        if not self.write_timestamps:
630            return
631
632        self._stdout_interceptor = FileDescriptorInterceptor(
633            sys.stdout.fileno(),
634            self.get_timestamp_prefix_str,
635        )
636        self._stderr_interceptor = FileDescriptorInterceptor(
637            sys.stderr.fileno(),
638            self.get_timestamp_prefix_str,
639        )
640
641        self._stdout_interceptor_thread = Thread(
642            target = self._stdout_interceptor.start_interception,
643            daemon = True,
644        )
645        self._stderr_interceptor_thread = Thread(
646            target = self._stderr_interceptor.start_interception,
647            daemon = True,
648        )
649        self._stdout_interceptor_thread.start()
650        self._stderr_interceptor_thread.start()
651        self._intercepting = True
652
653        if '_interceptor_threads' not in self.__dict__:
654            self._interceptor_threads = []
655        if '_interceptors' not in self.__dict__:
656            self._interceptors = []
657        self._interceptor_threads.extend([
658            self._stdout_interceptor_thread,
659            self._stderr_interceptor_thread,
660        ])
661        self._interceptors.extend([
662            self._stdout_interceptor,
663            self._stderr_interceptor,
664        ])
665        self.stop_log_fd_interception(unused_only=True)

Start the file descriptor monitoring threads.

def stop_log_fd_interception(self, unused_only: bool = False):
668    def stop_log_fd_interception(self, unused_only: bool = False):
669        """
670        Stop the file descriptor monitoring threads.
671        """
672        if not self.write_timestamps:
673            return
674
675        interceptors = self.__dict__.get('_interceptors', [])
676        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
677
678        end_ix = len(interceptors) if not unused_only else -2
679
680        for interceptor in interceptors[:end_ix]:
681            interceptor.stop_interception()
682        del interceptors[:end_ix]
683
684        for thread in interceptor_threads[:end_ix]:
685            try:
686                thread.join()
687            except Exception:
688                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
689        del interceptor_threads[:end_ix]

Stop the file descriptor monitoring threads.

def touch(self):
691    def touch(self):
692        """
693        Touch the latest subfile.
694        """
695        subfile_path = self.get_latest_subfile_path()
696        subfile_path.touch()

Touch the latest subfile.

def isatty(self) -> bool:
698    def isatty(self) -> bool:
699        return True

Return whether this is an 'interactive' stream.

Return False if it can't be determined.

class FileDescriptorInterceptor:
 23class FileDescriptorInterceptor:
 24    """
 25    A management class to intercept data written to a file descriptor.
 26    """
 27    def __init__(
 28        self,
 29        file_descriptor: int,
 30        injection_hook: Callable[[], str],
 31    ):
 32        """
 33        Parameters
 34        ----------
 35        file_descriptor: int
 36            The OS file descriptor from which to read.
 37
 38        injection_hook: Callable[[], str]
 39            A callable which returns a string to be injected into the written data.
 40        """
 41        self.stop_event = Event()
 42        self.injection_hook = injection_hook
 43        self.original_file_descriptor = file_descriptor
 44        self.new_file_descriptor = os.dup(file_descriptor)
 45        self.read_pipe, self.write_pipe = os.pipe()
 46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
 47        os.dup2(self.write_pipe, file_descriptor)
 48
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                last_char_is_newline = data[-1] == b'\n'
 82
 83                injected_str = self.injection_hook()
 84                injected_bytes = injected_str.encode('utf-8')
 85
 86                if is_first_read:
 87                    data = b'\n' + data
 88                    is_first_read = False
 89
 90                modified_data = (
 91                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 92                    if last_char_is_newline
 93                    else data.replace(b'\n', b'\n' + injected_bytes)
 94                )
 95                os.write(self.new_file_descriptor, modified_data)
 96            except (BrokenPipeError, OSError):
 97                break
 98            except Exception:
 99                with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
100                    f.write(traceback.format_exc())
101                break
102
103
104    def stop_interception(self):
105        """
106        Close the new file descriptors.
107        """
108        self.stop_event.set()
109        os.write(self.signal_write_pipe, b'\0')
110        try:
111            os.close(self.new_file_descriptor)
112        except OSError as e:
113            if e.errno != FD_CLOSED:
114                warn(
115                    "Error while trying to close the duplicated file descriptor:\n"
116                    + f"{traceback.format_exc()}"
117                )
118
119        try:
120            os.close(self.write_pipe)
121        except OSError as e:
122            if e.errno != FD_CLOSED:
123                warn(
124                    "Error while trying to close the write-pipe "
125                    + "to the intercepted file descriptor:\n"
126                    + f"{traceback.format_exc()}"
127                )
128        try:
129            os.close(self.read_pipe)
130        except OSError as e:
131            if e.errno != FD_CLOSED:
132                warn(
133                    "Error while trying to close the read-pipe "
134                    + "to the intercepted file descriptor:\n"
135                    + f"{traceback.format_exc()}"
136                )
137
138        try:
139            os.close(self.signal_read_pipe)
140        except OSError as e:
141            if e.errno != FD_CLOSED:
142                warn(
143                    "Error while trying to close the signal-read-pipe "
144                    + "to the intercepted file descriptor:\n"
145                    + f"{traceback.format_exc()}"
146                )
147
148        try:
149            os.close(self.signal_write_pipe)
150        except OSError as e:
151            if e.errno != FD_CLOSED:
152                warn(
153                    "Error while trying to close the signal-write-pipe "
154                    + "to the intercepted file descriptor:\n"
155                    + f"{traceback.format_exc()}"
156                )

A management class to intercept data written to a file descriptor.

FileDescriptorInterceptor(file_descriptor: int, injection_hook: Callable[[], str])
27    def __init__(
28        self,
29        file_descriptor: int,
30        injection_hook: Callable[[], str],
31    ):
32        """
33        Parameters
34        ----------
35        file_descriptor: int
36            The OS file descriptor from which to read.
37
38        injection_hook: Callable[[], str]
39            A callable which returns a string to be injected into the written data.
40        """
41        self.stop_event = Event()
42        self.injection_hook = injection_hook
43        self.original_file_descriptor = file_descriptor
44        self.new_file_descriptor = os.dup(file_descriptor)
45        self.read_pipe, self.write_pipe = os.pipe()
46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
47        os.dup2(self.write_pipe, file_descriptor)
Parameters
  • file_descriptor (int): The OS file descriptor from which to read.
  • injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
stop_event
injection_hook
original_file_descriptor
new_file_descriptor
def start_interception(self):
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                last_char_is_newline = data[-1] == b'\n'
 82
 83                injected_str = self.injection_hook()
 84                injected_bytes = injected_str.encode('utf-8')
 85
 86                if is_first_read:
 87                    data = b'\n' + data
 88                    is_first_read = False
 89
 90                modified_data = (
 91                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 92                    if last_char_is_newline
 93                    else data.replace(b'\n', b'\n' + injected_bytes)
 94                )
 95                os.write(self.new_file_descriptor, modified_data)
 96            except (BrokenPipeError, OSError):
 97                break
 98            except Exception:
 99                with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
100                    f.write(traceback.format_exc())
101                break

Read from the file descriptor and write the modified data after injection.

NOTE: This is blocking and is meant to be run in a thread.

def stop_interception(self):
104    def stop_interception(self):
105        """
106        Close the new file descriptors.
107        """
108        self.stop_event.set()
109        os.write(self.signal_write_pipe, b'\0')
110        try:
111            os.close(self.new_file_descriptor)
112        except OSError as e:
113            if e.errno != FD_CLOSED:
114                warn(
115                    "Error while trying to close the duplicated file descriptor:\n"
116                    + f"{traceback.format_exc()}"
117                )
118
119        try:
120            os.close(self.write_pipe)
121        except OSError as e:
122            if e.errno != FD_CLOSED:
123                warn(
124                    "Error while trying to close the write-pipe "
125                    + "to the intercepted file descriptor:\n"
126                    + f"{traceback.format_exc()}"
127                )
128        try:
129            os.close(self.read_pipe)
130        except OSError as e:
131            if e.errno != FD_CLOSED:
132                warn(
133                    "Error while trying to close the read-pipe "
134                    + "to the intercepted file descriptor:\n"
135                    + f"{traceback.format_exc()}"
136                )
137
138        try:
139            os.close(self.signal_read_pipe)
140        except OSError as e:
141            if e.errno != FD_CLOSED:
142                warn(
143                    "Error while trying to close the signal-read-pipe "
144                    + "to the intercepted file descriptor:\n"
145                    + f"{traceback.format_exc()}"
146                )
147
148        try:
149            os.close(self.signal_write_pipe)
150        except OSError as e:
151            if e.errno != FD_CLOSED:
152                warn(
153                    "Error while trying to close the signal-write-pipe "
154                    + "to the intercepted file descriptor:\n"
155                    + f"{traceback.format_exc()}"
156                )

Close the new file descriptors.