meerschaum.utils.daemon

Manage Daemons via the Daemon class.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Manage Daemons via the `Daemon` class.
  7"""
  8
  9from __future__ import annotations
 10
 11import os
 12import pathlib
 13import shutil
 14import json
 15import datetime
 16import threading
 17import shlex
 18
 19from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict, Union
 20from meerschaum.utils.daemon.StdinFile import StdinFile
 21from meerschaum.utils.daemon.Daemon import Daemon
 22from meerschaum.utils.daemon.RotatingFile import RotatingFile
 23from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 24from meerschaum.utils.daemon._names import get_new_daemon_name
 25
 26
 27__all__ = (
 28    'daemon_action',
 29    'daemon_entry',
 30    'get_daemons',
 31    'get_daemon_ids',
 32    'get_running_daemons',
 33    'get_stopped_daemons',
 34    'get_paused_daemons',
 35    'get_filtered_daemons',
 36    'get_new_daemon_name',
 37    'run_daemon',
 38    'running_in_daemon',
 39    'Daemon',
 40    'StdinFile',
 41    'RotatingFile',
 42    'FileDescriptorInterceptor',
 43)
 44
 45_daemons = {}
 46
 47
 48def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 49    """Parse sysargs and execute a Meerschaum action as a daemon.
 50
 51    Parameters
 52    ----------
 53    sysargs: Optional[List[str]], default None
 54        The command line arguments used in a Meerschaum action.
 55
 56    Returns
 57    -------
 58    A SuccessTuple.
 59    """
 60    from meerschaum._internal.entry import entry
 61    _args = {}
 62    if '--name' in sysargs or '--job-name' in sysargs:
 63        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 64        _args = parse_arguments(sysargs)
 65    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 66    try:
 67        label = shlex.join(filtered_sysargs) if sysargs else None
 68    except Exception:
 69        label = ' '.join(filtered_sysargs) if sysargs else None
 70
 71    name = _args.get('name', None)
 72    daemon = None
 73    if name:
 74        try:
 75            daemon = Daemon(daemon_id=name)
 76        except Exception:
 77            daemon = None
 78
 79    if daemon is not None:
 80        existing_sysargs = daemon.properties['target']['args'][0]
 81        existing_kwargs = parse_arguments(existing_sysargs)
 82
 83        ### Remove sysargs because flags are aliased.
 84        _ = _args.pop('daemon', None)
 85        _ = _args.pop('sysargs', None)
 86        _ = _args.pop('filtered_sysargs', None)
 87        debug = _args.pop('debug', None)
 88        _args['sub_args'] = sorted(_args.get('sub_args', []))
 89        _ = existing_kwargs.pop('daemon', None)
 90        _ = existing_kwargs.pop('sysargs', None)
 91        _ = existing_kwargs.pop('filtered_sysargs', None)
 92        _ = existing_kwargs.pop('debug', None)
 93        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 94
 95        ### Only run if the kwargs are equal or no actions are provided.
 96        if existing_kwargs == _args or not _args.get('action', []):
 97            if daemon.status == 'running':
 98                return True, f"Daemon '{daemon}' is already running."
 99            return daemon.run(
100                debug=debug,
101                allow_dirty_run=True,
102            )
103
104    success_tuple = run_daemon(
105        entry,
106        filtered_sysargs,
107        daemon_id=_args.get('name', None) if _args else None,
108        label=label,
109        keep_daemon_output=('--rm' not in (sysargs or [])),
110    )
111    return success_tuple
112
113
114def daemon_action(**kw) -> SuccessTuple:
115    """Execute a Meerschaum action as a daemon."""
116    from meerschaum.utils.packages import run_python_package
117    from meerschaum.utils.threading import Thread
118    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
119    from meerschaum.actions import get_action
120
121    kw['daemon'] = True
122    kw['shell'] = False
123
124    action = kw.get('action', None)
125    if action and get_action(action) is None:
126        if not kw.get('allow_shell_job') and not kw.get('force'):
127            return False, (
128                f"Action '{action}' isn't recognized.\n\n"
129                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
130                + "to enable shell commands to run as Meerschaum jobs."
131            )
132
133    sysargs = parse_dict_to_sysargs(kw)
134    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
135    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
136    return rc == 0, msg
137
138
139def run_daemon(
140    func: Callable[[Any], Any],
141    *args,
142    daemon_id: Optional[str] = None,
143    keep_daemon_output: bool = True,
144    allow_dirty_run: bool = False,
145    label: Optional[str] = None,
146    **kw
147) -> Any:
148    """Execute a function as a daemon."""
149    daemon = Daemon(
150        func,
151        daemon_id=daemon_id,
152        target_args=[arg for arg in args],
153        target_kw=kw,
154        label=label,
155    )
156    return daemon.run(
157        keep_daemon_output=keep_daemon_output,
158        allow_dirty_run=allow_dirty_run,
159    )
160
161
162def get_daemons() -> List[Daemon]:
163    """
164    Return all existing Daemons, sorted by end time.
165    """
166    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
167    daemons_status = {daemon: daemon.status for daemon in daemons}
168    running_daemons = {
169        daemon: daemons_status[daemon]
170        for daemon in daemons
171        if daemons_status[daemon] == 'running'
172    }
173    paused_daemons = {
174        daemon: daemons_status[daemon]
175        for daemon in daemons
176        if daemons_status[daemon] == 'paused'
177    }
178    stopped_daemons = {
179        daemon: daemons_status[daemon]
180        for daemon in daemons
181        if daemons_status[daemon] == 'stopped'
182    }
183    daemons_began = {
184        daemon: daemon.properties.get('process', {}).get('began', '9999')
185        for daemon in daemons
186    }
187    daemons_paused = {
188        daemon: daemon.properties.get('process', {}).get('paused', '9999')
189        for daemon in daemons
190    }
191    daemons_ended = {
192        daemon: daemon.properties.get('process', {}).get('ended', '9999')
193        for daemon in daemons
194    }
195    sorted_stopped_daemons = [
196        daemon
197        for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x])
198    ]
199    sorted_paused_daemons = [
200        daemon
201        for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x])
202    ]
203    sorted_running_daemons = [
204        daemon
205        for daemon in sorted(running_daemons, key=lambda x: daemons_began[x])
206    ]
207    return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons
208
209
210def get_daemon_ids() -> List[str]:
211    """
212    Return the IDs of all daemons on disk.
213    """
214    from meerschaum.config._paths import DAEMON_RESOURCES_PATH
215    return [
216        daemon_dir
217        for daemon_dir in sorted(os.listdir(DAEMON_RESOURCES_PATH))
218        if (DAEMON_RESOURCES_PATH / daemon_dir / 'properties.json').exists()
219    ]
220
221
222def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
223    """
224    Return a list of currently running daemons.
225    """
226    if daemons is None:
227        daemons = get_daemons()
228    return [
229        d
230        for d in daemons
231        if d.status == 'running'
232    ]
233
234
235def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
236    """
237    Return a list of active but paused daemons.
238    """
239    if daemons is None:
240        daemons = get_daemons()
241    return [
242        d
243        for d in daemons
244        if d.status == 'paused'
245    ]
246
247
248def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
249    """
250    Return a list of stopped daemons.
251    """
252    if daemons is None:
253        daemons = get_daemons()
254
255    return [
256        d
257        for d in daemons
258        if d.status == 'stopped'
259    ]
260
261
262def get_filtered_daemons(
263    filter_list: Optional[List[str]] = None,
264    warn: bool = False,
265) -> List[Daemon]:
266    """
267    Return a list of `Daemons` filtered by a list of `daemon_ids`.
268    Only `Daemons` that exist are returned.
269    
270    If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`).
271
272    Parameters
273    ----------
274    filter_list: Optional[List[str]], default None
275        List of `daemon_ids` to include. If `filter_list` is `None` or empty,
276        return all `Daemons`.
277
278    warn: bool, default False
279        If `True`, raise warnings for non-existent `daemon_ids`.
280
281    Returns
282    -------
283    A list of Daemon objects.
284
285    """
286    if not filter_list:
287        daemons = get_daemons()
288        return [d for d in daemons if not d.hidden]
289
290    from meerschaum.utils.warnings import warn as _warn
291    daemons = []
292    for d_id in filter_list:
293        try:
294            d = Daemon(daemon_id=d_id)
295            _exists = d.path.exists()
296        except Exception:
297            _exists = False
298        if not _exists:
299            if warn:
300                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
301            continue
302        daemons.append(d)
303    return daemons
304
305
306def running_in_daemon() -> bool:
307    """
308    Return whether the current thread is running in a Daemon context.
309    """
310    from meerschaum._internal.static import STATIC_CONFIG
311    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
312    return daemon_env_var in os.environ
313
314
315def get_current_daemon() -> Union[Daemon, None]:
316    """
317    If running within a daemon context, return the corresponding `Daemon`.
318    Otherwise return `None`.
319    """
320    from meerschaum._internal.static import STATIC_CONFIG
321    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
322    daemon_id = os.environ.get(daemon_env_var, None)
323    if daemon_id is None:
324        return None
325
326    return _daemons.get(daemon_id, Daemon(daemon_id=daemon_id))
def daemon_action(**kw) -> Tuple[bool, str]:

Execute a Meerschaum action as a daemon.
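
A minimal usage sketch, assuming a valid Meerschaum action such as `show version` (the keyword arguments mirror the parsed command-line flags):

>>> from meerschaum.utils.daemon import daemon_action
>>> success, msg = daemon_action(action=['show', 'version'])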

def daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:

Parse sysargs and execute a Meerschaum action as a daemon.

Parameters
  • sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
  • A SuccessTuple.
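
A hedged sketch: pass raw sysargs (as from `sys.argv[1:]`); the `-d`/`--daemon` flags are filtered out before the job is launched. The job name below is hypothetical:

>>> from meerschaum.utils.daemon import daemon_entry
>>> success, msg = daemon_entry(['show', 'version', '-d', '--name', 'example-job'])
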
def get_daemons() -> List[Daemon]:

Return all existing Daemons, sorted by end time.
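
A short sketch listing every daemon with its status (the returned order is stopped, then paused, then running):

>>> from meerschaum.utils.daemon import get_daemons
>>> for daemon in get_daemons():
...     print(daemon.daemon_id, daemon.status)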

def get_daemon_ids() -> List[str]:

Return the IDs of all daemons on disk.
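
Only directories that contain a `properties.json` file count as daemons. A quick sketch (the output is hypothetical):

>>> from meerschaum.utils.daemon import get_daemon_ids
>>> get_daemon_ids()
['example-job']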

def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:

Return a list of currently running daemons.

def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:

Return a list of stopped daemons.

def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:

Return a list of active but paused daemons.
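
The three status getters share one pattern: take an optional pre-built list (to avoid re-reading daemons from disk) and filter on `Daemon.status`. A combined sketch:

>>> from meerschaum.utils.daemon import (
...     get_daemons,
...     get_running_daemons,
...     get_paused_daemons,
...     get_stopped_daemons,
... )
>>> daemons = get_daemons()
>>> running = get_running_daemons(daemons)
>>> paused = get_paused_daemons(daemons)
>>> stopped = get_stopped_daemons(daemons)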

def get_filtered_daemons(filter_list: Optional[List[str]] = None, warn: bool = False) -> List[Daemon]:

Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.

If filter_list is None or empty, return all Daemons (from get_daemons()).

Parameters
  • filter_list (Optional[List[str]], default None): List of daemon_ids to include. If filter_list is None or empty, return all Daemons.
  • warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
  • A list of Daemon objects.
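
A sketch with a hypothetical daemon_id; IDs that don't resolve to an existing daemon are skipped (and warned about when warn=True):

>>> from meerschaum.utils.daemon import get_filtered_daemons
>>> daemons = get_filtered_daemons(['example-job', 'does-not-exist'], warn=True)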
def get_new_daemon_name() -> str:
117def get_new_daemon_name() -> str:
118    """
119    Generate a new random name until a unique one is found
120    (up to ~6000 maximum possibilities).
121    """
122    from meerschaum.config._paths import DAEMON_RESOURCES_PATH
123    existing_names = (
124        os.listdir(DAEMON_RESOURCES_PATH)
125        if DAEMON_RESOURCES_PATH.exists()
126        else []
127    )
128    while True:
129        _name = generate_random_name()
130        if _name in existing_names:
131            continue
132        break
133    return _name

Generate a new random name until a unique one is found (up to ~6000 maximum possibilities).
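
A sketch; the generated name is random, so the value below is illustrative only:

>>> from meerschaum.utils.daemon import get_new_daemon_name
>>> get_new_daemon_name()
'happy_seagull'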

def run_daemon(func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:

Execute a function as a daemon.
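
A sketch of daemonizing an arbitrary function. The target should be importable by module path so the daemon process can reconstruct it; `my_module.my_func` and the `daemon_id` below are hypothetical (a random name is generated when `daemon_id` is omitted):

>>> from meerschaum.utils.daemon import run_daemon
>>> from my_module import my_func  # hypothetical importable target
>>> success, msg = run_daemon(my_func, 'some_arg', daemon_id='example-job')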

def running_in_daemon() -> bool:

Return whether the current thread is running in a Daemon context.
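
A guard sketch combining `running_in_daemon` with `get_current_daemon` (defined in the module source above):

>>> from meerschaum.utils.daemon import running_in_daemon, get_current_daemon
>>> if running_in_daemon():
...     daemon = get_current_daemon()
...     print(f"Running inside daemon '{daemon.daemon_id}'.")
... else:
...     print("Not running in a daemon context.")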

class Daemon:
  42class Daemon:
  43    """
  44    Daemonize Python functions into background processes.
  45
  46    Examples
  47    --------
  48    >>> import meerschaum as mrsm
  49    >>> from meerschaum.utils.daemon import Daemon
  50    >>> daemon = Daemon(print, ('hi',))
  51    >>> success, msg = daemon.run()
  52    >>> print(daemon.log_text)
  53
  54    2024-07-29 18:03 | hi
  55    2024-07-29 18:03 |
  56    >>> daemon.run(allow_dirty_run=True)
  57    >>> print(daemon.log_text)
  58
  59    2024-07-29 18:03 | hi
  60    2024-07-29 18:03 |
  61    2024-07-29 18:05 | hi
  62    2024-07-29 18:05 |
  63    >>> mrsm.pprint(daemon.properties)
  64    {
  65        'label': 'print',
  66        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  67        'result': None,
  68        'process': {'ended': '2024-07-29T18:03:33.752806'}
  69    }
  70
  71    """
  72
  73    def __new__(
  74        cls,
  75        *args,
  76        daemon_id: Optional[str] = None,
  77        **kw
  78    ):
  79        """
  80        If a daemon_id is provided and already exists, read from its pickle file.
  81        """
  82        instance = super(Daemon, cls).__new__(cls)
  83        if daemon_id is not None:
  84            instance.daemon_id = daemon_id
  85            if instance.pickle_path.exists():
  86                instance = instance.read_pickle()
  87        return instance
  88
  89    @classmethod
  90    def from_properties_file(cls, daemon_id: str) -> Daemon:
  91        """
  92        Return a Daemon from a properties dictionary.
  93        """
  94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  95        if not properties_path.exists():
  96            raise OSError(f"Properties file '{properties_path}' does not exist.")
  97
  98        try:
  99            with open(properties_path, 'r', encoding='utf-8') as f:
 100                properties = json.load(f)
 101        except Exception:
 102            properties = {}
 103
 104        if not properties:
 105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 106
 107        daemon_id = properties_path.parent.name
 108        target_cf = properties.get('target', {})
 109        target_module_name = target_cf.get('module', None)
 110        target_function_name = target_cf.get('name', None)
 111        target_args = target_cf.get('args', None)
 112        target_kw = target_cf.get('kw', None)
 113        label = properties.get('label', None)
 114
 115        if None in [
 116            target_module_name,
 117            target_function_name,
 118            target_args,
 119            target_kw,
 120        ]:
 121            raise ValueError("Missing target function information.")
 122
 123        target_module = importlib.import_module(target_module_name)
 124        target_function = getattr(target_module, target_function_name)
 125
 126        return Daemon(
 127            daemon_id=daemon_id,
 128            target=target_function,
 129            target_args=target_args,
 130            target_kw=target_kw,
 131            properties=properties,
 132            label=label,
 133        )
 134
 135
 136    def __init__(
 137        self,
 138        target: Optional[Callable[[Any], Any]] = None,
 139        target_args: Union[List[Any], Tuple[Any], None] = None,
 140        target_kw: Optional[Dict[str, Any]] = None,
 141        env: Optional[Dict[str, str]] = None,
 142        daemon_id: Optional[str] = None,
 143        label: Optional[str] = None,
 144        properties: Optional[Dict[str, Any]] = None,
 145        pickle: bool = True,
 146    ):
 147        """
 148        Parameters
 149        ----------
 150        target: Optional[Callable[[Any], Any]], default None
 151            The function to execute in a child process.
 152
 153        target_args: Union[List[Any], Tuple[Any], None], default None
 154            Positional arguments to pass to the target function.
 155
 156        target_kw: Optional[Dict[str, Any]], default None
 157            Keyword arguments to pass to the target function.
 158
 159        env: Optional[Dict[str, str]], default None
 160            If provided, set these environment variables in the daemon process.
 161
 162        daemon_id: Optional[str], default None
 163            Build a `Daemon` from an existing `daemon_id`.
 164            If `daemon_id` is provided, other arguments are ignored and are derived
 165            from the existing pickled `Daemon`.
 166
 167        label: Optional[str], default None
 168            Label string to help identify a daemon.
 169            If `None`, use the function name instead.
 170
 171        properties: Optional[Dict[str, Any]], default None
 172            Override reading from the properties JSON by providing an existing dictionary.
 173        """
 174        _pickle = self.__dict__.get('_pickle', False)
 175        if daemon_id is not None:
 176            self.daemon_id = daemon_id
 177            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 178
 179                if not self.properties_path.exists():
 180                    raise Exception(
 181                        f"Daemon '{self.daemon_id}' does not exist. "
 182                        + "Pass a target to create a new Daemon."
 183                    )
 184
 185                try:
 186                    new_daemon = self.from_properties_file(daemon_id)
 187                except Exception:
 188                    new_daemon = None
 189
 190                if new_daemon is not None:
 191                    new_daemon.write_pickle()
 192                    target = new_daemon.target
 193                    target_args = new_daemon.target_args
 194                    target_kw = new_daemon.target_kw
 195                    label = new_daemon.label
 196                    self._properties = new_daemon.properties
 197                else:
 198                    try:
 199                        self.properties_path.unlink()
 200                    except Exception:
 201                        pass
 202
 203                    raise Exception(
 204                        f"Could not recover daemon '{self.daemon_id}' "
 205                        + "from its properties file."
 206                    )
 207
 208        if 'target' not in self.__dict__:
 209            if target is None:
 210                error("Cannot create a Daemon without a target.")
 211            self.target = target
 212
 213        self.pickle = pickle
 214
 215        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
 216        if '_target_args' not in self.__dict__:
 217            self._target_args = target_args
 218        if '_target_kw' not in self.__dict__:
 219            self._target_kw = target_kw
 220
 221        if 'label' not in self.__dict__:
 222            if label is None:
 223                label = (
 224                    self.target.__name__ if '__name__' in self.target.__dir__()
 225                        else str(self.target)
 226                )
 227            self.label = label
 228        elif label is not None:
 229            self.label = label
 230
 231        if 'daemon_id' not in self.__dict__:
 232            self.daemon_id = get_new_daemon_name()
 233        if '_properties' not in self.__dict__:
 234            self._properties = properties
 235        elif properties:
 236            if self._properties is None:
 237                self._properties = {}
 238            self._properties.update(properties)
 239        if self._properties is None:
 240            self._properties = {}
 241
 242        self._properties.update({'label': self.label})
 243        if env:
 244            self._properties.update({'env': env})
 245
 246        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 247        _ = self.process
 248
 249
 250    def _run_exit(
 251        self,
 252        keep_daemon_output: bool = True,
 253        allow_dirty_run: bool = False,
 254    ) -> Any:
 255        """Run the daemon's target function.
 256        NOTE: This WILL EXIT the parent process!
 257
 258        Parameters
 259        ----------
 260        keep_daemon_output: bool, default True
 261            If `False`, delete the daemon's output directory upon exiting.
 262
 263        allow_dirty_run: bool, default False
 264            If `True`, run the daemon, even if the `daemon_id` directory exists.
 265            This option is dangerous because if the same `daemon_id` runs twice,
 266            the last to finish will overwrite the output of the first.
 267
 268        Returns
 269        -------
 270        Nothing — this will exit the parent process.
 271        """
 272        import platform
 273        import sys
 274        import os
 275        import traceback
 276        from meerschaum.utils.warnings import warn
 277        from meerschaum.config import get_config
 278        daemon = attempt_import('daemon')
 279        lines = get_config('jobs', 'terminal', 'lines')
 280        columns = get_config('jobs', 'terminal', 'columns')
 281
 282        if platform.system() == 'Windows':
 283            return False, "Windows is no longer supported."
 284
 285        self._setup(allow_dirty_run)
 286
 287        _daemons.append(self)
 288
 289        logs_cf = self.properties.get('logs', {})
 290        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
 291        if log_refresh_seconds is None:
 292            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 293        write_timestamps = logs_cf.get('write_timestamps', None)
 294        if write_timestamps is None:
 295            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 296
 297        self._log_refresh_timer = RepeatTimer(
 298            log_refresh_seconds,
 299            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
 300        )
 301
 302        capture_stdin = logs_cf.get('stdin', True)
 303        cwd = self.properties.get('cwd', os.getcwd())
 304
 305        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 306        ###       KeyboardInterrupts themselves.
 307        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 308        self._daemon_context = daemon.DaemonContext(
 309            pidfile=self.pid_lock,
 310            stdout=self.rotating_log,
 311            stderr=self.rotating_log,
 312            stdin=(self.stdin_file if capture_stdin else None),
 313            working_directory=cwd,
 314            detach_process=True,
 315            files_preserve=list(self.rotating_log.subfile_objects.values()),
 316            signal_map={
 317                signal.SIGTERM: self._handle_sigterm,
 318            },
 319        )
 320
 321        if capture_stdin and sys.stdin is None:
 322            raise OSError("Cannot daemonize without stdin.")
 323
 324        try:
 325            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 326            with self._daemon_context:
 327                if capture_stdin:
 328                    sys.stdin = self.stdin_file
 329                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 330                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 331                os.environ['PYTHONUNBUFFERED'] = '1'
 332
 333                ### Allow the user to override environment variables.
 334                env = self.properties.get('env', {})
 335                if env and isinstance(env, dict):
 336                    os.environ.update({str(k): str(v) for k, v in env.items()})
 337
 338                self.rotating_log.refresh_files(start_interception=True)
 339                result = None
 340                try:
 341                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 342                        f.write(str(os.getpid()))
 343
 344                    ### NOTE: The timer fails to start for remote actions to localhost.
 345                    try:
 346                        if not self._log_refresh_timer.is_running():
 347                            self._log_refresh_timer.start()
 348                    except Exception:
 349                        pass
 350
 351                    self.properties['result'] = None
 352                    self._capture_process_timestamp('began')
 353                    result = self.target(*self.target_args, **self.target_kw)
 354                    self.properties['result'] = result
 355                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 356                    result = False, traceback.format_exc()
 357                except Exception as e:
 358                    warn(
 359                        f"Exception in daemon target function: {traceback.format_exc()}",
 360                    )
 361                    result = False, str(e)
 362                finally:
 363                    _results[self.daemon_id] = result
 364                    self.properties['result'] = result
 365
 366                    if keep_daemon_output:
 367                        self._capture_process_timestamp('ended')
 368                    else:
 369                        self.cleanup()
 370
 371                    self._log_refresh_timer.cancel()
 372                    try:
 373                        if self.pid is None and self.pid_path.exists():
 374                            self.pid_path.unlink()
 375                    except Exception:
 376                        pass
 377
 378                    if is_success_tuple(result):
 379                        try:
 380                            mrsm.pprint(result)
 381                        except BrokenPipeError:
 382                            pass
 383
 384        except Exception:
 385            daemon_error = traceback.format_exc()
 386            from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
 387            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 388                f.write(
 389                    f"Error in Daemon '{self}':\n\n"
 390                    f"{sys.stdin=}\n"
 391                    f"{self.stdin_file_path=}\n"
 392                    f"{self.stdin_file_path.exists()=}\n\n"
 393                    f"{daemon_error}\n\n"
 394                )
 395            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 396
 397    def _capture_process_timestamp(
 398        self,
 399        process_key: str,
 400        write_properties: bool = True,
 401    ) -> None:
 402        """
 403        Record the current timestamp to the parameters `process:<process_key>`.
 404
 405        Parameters
 406        ----------
 407        process_key: str
 408            Under which key to store the timestamp.
 409
 410        write_properties: bool, default True
 411            If `True` persist the properties to disk immediately after capturing the timestamp.
 412        """
 413        if 'process' not in self.properties:
 414            self.properties['process'] = {}
 415
 416        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 417            raise ValueError(f"Invalid key '{process_key}'.")
 418
 419        self.properties['process'][process_key] = (
 420            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 421        )
 422        if write_properties:
 423            self.write_properties()
 424
 425    def run(
 426        self,
 427        keep_daemon_output: bool = True,
 428        allow_dirty_run: bool = False,
 429        wait: bool = False,
 430        timeout: Union[int, float] = 4,
 431        debug: bool = False,
 432    ) -> SuccessTuple:
 433        """Run the daemon as a child process and continue executing the parent.
 434
 435        Parameters
 436        ----------
 437        keep_daemon_output: bool, default True
 438            If `False`, delete the daemon's output directory upon exiting.
 439
 440        allow_dirty_run: bool, default False
 441            If `True`, run the daemon, even if the `daemon_id` directory exists.
 442            This option is dangerous because if the same `daemon_id` runs concurrently,
 443            the last to finish will overwrite the output of the first.
 444
 445        wait: bool, default False
 446            If `True`, block until `Daemon.status` is running (or the timeout expires).
 447
 448        timeout: Union[int, float], default 4
 449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 450
 451        Returns
 452        -------
 453        A SuccessTuple indicating success.
 454
 455        """
 456        import platform
 457        if platform.system() == 'Windows':
 458            return False, "Cannot run background jobs on Windows."
 459
 460        ### The daemon might exist and be paused.
 461        if self.status == 'paused':
 462            return self.resume()
 463
 464        self._remove_stop_file()
 465        if self.status == 'running':
 466            return True, f"Daemon '{self}' is already running."
 467
 468        self.mkdir_if_not_exists(allow_dirty_run)
 469        _write_pickle_success_tuple = self.write_pickle()
 470        if not _write_pickle_success_tuple[0]:
 471            return _write_pickle_success_tuple
 472
 473        _launch_daemon_code = (
 474            "from meerschaum.utils.daemon import Daemon, _daemons; "
 475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 476            f"_daemons['{self.daemon_id}'] = daemon; "
 477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 478            "allow_dirty_run=True)"
 479        )
 480        env = dict(os.environ)
 481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 482        msg = (
 483            "Success"
 484            if _launch_success_bool
 485            else f"Failed to start daemon '{self.daemon_id}'."
 486        )
 487        if not wait or not _launch_success_bool:
 488            return _launch_success_bool, msg
 489
 490        timeout = self.get_timeout_seconds(timeout)
 491        check_timeout_interval = self.get_check_timeout_interval_seconds()
 492
 493        if not timeout:
 494            success = self.status == 'running'
 495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 496            if success:
 497                self._capture_process_timestamp('began')
 498            return success, msg
 499
 500        begin = time.perf_counter()
 501        while (time.perf_counter() - begin) < timeout:
 502            if self.status == 'running':
 503                self._capture_process_timestamp('began')
 504                return True, "Success"
 505            time.sleep(check_timeout_interval)
 506
 507        return False, (
 508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 509            + ('s' if timeout != 1 else '') + '.'
 510        )
 511
 512
 513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 514        """
 515        Forcibly terminate a running daemon.
 516        Sends a SIGTERM signal to the process.
 517
 518        Parameters
 519        ----------
 520        timeout: Union[int, float, None], default 8
 521            How many seconds to wait for the process to terminate.
 522
 523        Returns
 524        -------
 525        A SuccessTuple indicating success.
 526        """
 527        if self.status != 'paused':
 528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 529            if success:
 530                self._write_stop_file('kill')
 531                self.stdin_file.close()
 532                self._remove_blocking_stdin_file()
 533                return success, msg
 534
 535        if self.status == 'stopped':
 536            self._write_stop_file('kill')
 537            self.stdin_file.close()
 538            self._remove_blocking_stdin_file()
 539            return True, "Process has already stopped."
 540
 541        psutil = attempt_import('psutil')
 542        process = self.process
 543        try:
 544            process.terminate()
 545            process.kill()
 546            process.wait(timeout=timeout)
 547        except Exception as e:
 548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 549
 550        try:
 551            if process.status():
 552                return False, f"Failed to stop daemon '{self}' ({process})."
 553        except psutil.NoSuchProcess:
 554            pass
 555
 556        if self.pid_path.exists():
 557            try:
 558                self.pid_path.unlink()
 559            except Exception:
 560                pass
 561
 562        self._write_stop_file('kill')
 563        self.stdin_file.close()
 564        self._remove_blocking_stdin_file()
 565        return True, "Success"
 566
 567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 568        """Gracefully quit a running daemon."""
 569        if self.status == 'paused':
 570            return self.kill(timeout)
 571
 572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 573        if signal_success:
 574            self._write_stop_file('quit')
 575            self.stdin_file.close()
 576            self._remove_blocking_stdin_file()
 577        return signal_success, signal_msg
 578
 579    def pause(
 580        self,
 581        timeout: Union[int, float, None] = None,
 582        check_timeout_interval: Union[float, int, None] = None,
 583    ) -> SuccessTuple:
 584        """
 585        Pause the daemon if it is running.
 586
 587        Parameters
 588        ----------
 589        timeout: Union[float, int, None], default None
 590            The maximum number of seconds to wait for a process to suspend.
 591
 592        check_timeout_interval: Union[float, int, None], default None
 593            The number of seconds to wait between checking if the process is still running.
 594
 595        Returns
 596        -------
 597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 598        """
 599        self._remove_blocking_stdin_file()
 600
 601        if self.process is None:
 602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 603
 604        if self.status == 'paused':
 605            return True, f"Daemon '{self.daemon_id}' is already paused."
 606
 607        self._write_stop_file('pause')
 608        self.stdin_file.close()
 609        self._remove_blocking_stdin_file()
 610        try:
 611            self.process.suspend()
 612        except Exception as e:
 613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 614
 615        timeout = self.get_timeout_seconds(timeout)
 616        check_timeout_interval = self.get_check_timeout_interval_seconds(
 617            check_timeout_interval
 618        )
 619
 620        psutil = attempt_import('psutil')
 621
 622        if not timeout:
 623            try:
 624                success = self.process.status() == 'stopped'
 625            except psutil.NoSuchProcess:
 626                success = True
 627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 628            if success:
 629                self._capture_process_timestamp('paused')
 630            return success, msg
 631
 632        begin = time.perf_counter()
 633        while (time.perf_counter() - begin) < timeout:
 634            try:
 635                if self.process.status() == 'stopped':
 636                    self._capture_process_timestamp('paused')
 637                    return True, "Success"
 638            except psutil.NoSuchProcess as e:
 639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 640            time.sleep(check_timeout_interval)
 641
 642        return False, (
 643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 644            + ('s' if timeout != 1 else '') + '.'
 645        )
 646
 647    def resume(
 648        self,
 649        timeout: Union[int, float, None] = None,
 650        check_timeout_interval: Union[float, int, None] = None,
 651    ) -> SuccessTuple:
 652        """
 653        Resume the daemon if it is paused.
 654
 655        Parameters
 656        ----------
 657        timeout: Union[float, int, None], default None
 658            The maximum number of seconds to wait for a process to resume.
 659
 660        check_timeout_interval: Union[float, int, None], default None
 661            The number of seconds to wait between checking if the process is still stopped.
 662
 663        Returns
 664        -------
 665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 666        """
 667        if self.status == 'running':
 668            return True, f"Daemon '{self.daemon_id}' is already running."
 669
 670        if self.status == 'stopped':
 671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 672
 673        self._remove_stop_file()
 674        try:
 675            if self.process is None:
 676                return False, f"Cannot resume daemon '{self.daemon_id}'."
 677
 678            self.process.resume()
 679        except Exception as e:
 680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 681
 682        timeout = self.get_timeout_seconds(timeout)
 683        check_timeout_interval = self.get_check_timeout_interval_seconds(
 684            check_timeout_interval
 685        )
 686
 687        if not timeout:
 688            success = self.status == 'running'
 689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 690            if success:
 691                self._capture_process_timestamp('began')
 692            return success, msg
 693
 694        begin = time.perf_counter()
 695        while (time.perf_counter() - begin) < timeout:
 696            if self.status == 'running':
 697                self._capture_process_timestamp('began')
 698                return True, "Success"
 699            time.sleep(check_timeout_interval)
 700
 701        return False, (
 702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 703            + ('s' if timeout != 1 else '') + '.'
 704        )
 705
 706    def _write_stop_file(self, action: str) -> SuccessTuple:
 707        """Write the stop file timestamp and action."""
 708        if action not in ('quit', 'kill', 'pause'):
 709            return False, f"Unsupported action '{action}'."
 710
 711        if not self.stop_path.parent.exists():
 712            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 713
 714        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 715            json.dump(
 716                {
 717                    'stop_time': datetime.now(timezone.utc).isoformat(),
 718                    'action': action,
 719                },
 720                f
 721            )
 722
 723        return True, "Success"
 724
 725    def _remove_stop_file(self) -> SuccessTuple:
 726        """Remove the stop file"""
 727        if not self.stop_path.exists():
 728            return True, "Stop file does not exist."
 729
 730        try:
 731            self.stop_path.unlink()
 732        except Exception as e:
 733            return False, f"Failed to remove stop file:\n{e}"
 734
 735        return True, "Success"
 736
 737    def _read_stop_file(self) -> Dict[str, Any]:
 738        """
 739        Read the stop file if it exists.
 740        """
 741        if not self.stop_path.exists():
 742            return {}
 743
 744        try:
 745            with open(self.stop_path, 'r', encoding='utf-8') as f:
 746                data = json.load(f)
 747            return data
 748        except Exception:
 749            return {}
 750
 751    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 752        """
 753        Remove the blocking STDIN file if it exists.
 754        """
 755        try:
 756            if self.blocking_stdin_file_path.exists():
 757                self.blocking_stdin_file_path.unlink()
 758        except Exception as e:
 759            return False, str(e)
 760
 761        return True, "Success"
 762
 763    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 764        """
 765        Handle `SIGTERM` within the `Daemon` context.
 766        This method is injected into the `DaemonContext`.
 767        """
 768        from meerschaum.utils.process import signal_handler
 769        signal_handler(signal_number, stack_frame)
 770
 771        timer = self.__dict__.get('_log_refresh_timer', None)
 772        if timer is not None:
 773            timer.cancel()
 774
 775        daemon_context = self.__dict__.get('_daemon_context', None)
 776        if daemon_context is not None:
 777            daemon_context.close()
 778
 779        _close_pools()
 780        raise SystemExit(0)
 781
 782    def _send_signal(
 783        self,
 784        signal_to_send,
 785        timeout: Union[float, int, None] = None,
 786        check_timeout_interval: Union[float, int, None] = None,
 787    ) -> SuccessTuple:
 788        """Send a signal to the daemon process.
 789
 790        Parameters
 791        ----------
 792        signal_to_send:
 793            The signal to send to the daemon, e.g. `signal.SIGINT`.
 794
 795        timeout: Union[float, int, None], default None
 796            The maximum number of seconds to wait for a process to terminate.
 797
 798        check_timeout_interval: Union[float, int, None], default None
 799            The number of seconds to wait between checking if the process is still running.
 800
 801        Returns
 802        -------
 803        A SuccessTuple indicating success.
 804        """
 805        try:
 806            pid = self.pid
 807            if pid is None:
 808                return (
 809                    False,
 810                    f"Daemon '{self.daemon_id}' is not running, "
 811                    + f"cannot send signal '{signal_to_send}'."
 812                )
 813            
 814            os.kill(pid, signal_to_send)
 815        except Exception:
 816            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 817
 818        timeout = self.get_timeout_seconds(timeout)
 819        check_timeout_interval = self.get_check_timeout_interval_seconds(
 820            check_timeout_interval
 821        )
 822
 823        if not timeout:
 824            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."
 825
 826        begin = time.perf_counter()
 827        while (time.perf_counter() - begin) < timeout:
 828            if not self.status == 'running':
 829                return True, "Success"
 830            time.sleep(check_timeout_interval)
 831
 832        return False, (
 833            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 834            + ('s' if timeout != 1 else '') + '.'
 835        )
 836
 837    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 838        """Create the Daemon's directory.
 839        If `allow_dirty_run` is `False` and the directory already exists,
 840        raise a `FileExistsError`.
 841        """
 842        try:
 843            self.path.mkdir(parents=True, exist_ok=True)
 844            _already_exists = any(os.scandir(self.path))
 845        except FileExistsError:
 846            _already_exists = True
 847
 848        if _already_exists and not allow_dirty_run:
 849            error(
 850                f"Daemon '{self.daemon_id}' already exists. " +
 851                "To allow this daemon to run, do one of the following:\n"
 852                + "  - Execute `daemon.cleanup()`.\n"
 853                + f"  - Delete the directory '{self.path}'.\n"
 854                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 855                FileExistsError,
 856            )
 857
 858    @property
 859    def process(self) -> Union['psutil.Process', None]:
 860        """
 861        Return the psutil process for the Daemon.
 862        """
 863        psutil = attempt_import('psutil')
 864        pid = self.pid
 865        if pid is None:
 866            return None
 867        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 868            try:
 869                self._process = psutil.Process(int(pid))
 870                process_exists = True
 871            except Exception:
 872                process_exists = False
 873            if not process_exists:
 874                _ = self.__dict__.pop('_process', None)
 875                try:
 876                    if self.pid_path.exists():
 877                        self.pid_path.unlink()
 878                except Exception:
 879                    pass
 880                return None
 881        return self._process
 882
 883    @property
 884    def status(self) -> str:
 885        """
 886        Return the running status of this Daemon.
 887        """
 888        if self.process is None:
 889            return 'stopped'
 890
 891        psutil = attempt_import('psutil', lazy=False)
 892        try:
 893            if self.process.status() == 'stopped':
 894                return 'paused'
 895            if self.process.status() == 'zombie':
 896                raise psutil.NoSuchProcess(self.process.pid)
 897        except (psutil.NoSuchProcess, AttributeError):
 898            if self.pid_path.exists():
 899                try:
 900                    self.pid_path.unlink()
 901                except Exception:
 902                    pass
 903            return 'stopped'
 904
 905        return 'running'
 906
 907    @classmethod
 908    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 909        """
 910        Return a Daemon's path from its `daemon_id`.
 911        """
 912        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
 913        return DAEMON_RESOURCES_PATH / daemon_id
 914
 915    @property
 916    def path(self) -> pathlib.Path:
 917        """
 918        Return the path for this Daemon's directory.
 919        """
 920        return self._get_path_from_daemon_id(self.daemon_id)
 921
 922    @classmethod
 923    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 924        """
 925        Return the `properties.json` path for a given `daemon_id`.
 926        """
 927        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 928
 929    @property
 930    def properties_path(self) -> pathlib.Path:
 931        """
 932        Return the `properties.json` path for this Daemon.
 933        """
 934        return self._get_properties_path_from_daemon_id(self.daemon_id)
 935
 936    @property
 937    def stop_path(self) -> pathlib.Path:
 938        """
 939        Return the path for the stop file (created when manually stopped).
 940        """
 941        return self.path / '.stop.json'
 942
 943    @property
 944    def log_path(self) -> pathlib.Path:
 945        """
 946        Return the log path.
 947        """
 948        logs_cf = self.properties.get('logs', None) or {}
 949        if 'path' not in logs_cf:
 950            from meerschaum.config.paths import LOGS_RESOURCES_PATH
 951            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 952
 953        return pathlib.Path(logs_cf['path'])
 954
 955    @property
 956    def stdin_file_path(self) -> pathlib.Path:
 957        """
 958        Return the stdin file path.
 959        """
 960        return self.path / 'input.stdin'
 961
 962    @property
 963    def blocking_stdin_file_path(self) -> pathlib.Path:
 964        """
 965        Return the blocking stdin file path.
 966        """
 967        if '_blocking_stdin_file_path' in self.__dict__:
 968            return self._blocking_stdin_file_path
 969
 970        return self.path / 'input.stdin.block'
 971
 972    @property
 973    def prompt_kwargs_file_path(self) -> pathlib.Path:
 974        """
 975        Return the file path to the kwargs for the invoking `prompt()`.
 976        """
 977        return self.path / 'prompt_kwargs.json'
 978
 979    @property
 980    def log_offset_path(self) -> pathlib.Path:
 981        """
 982        Return the log offset file path.
 983        """
 984        from meerschaum.config.paths import LOGS_RESOURCES_PATH
 985        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 986
 987    @property
 988    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 989        """
 990        Return the process lock context manager.
 991        """
 992        if '_log_offset_lock' in self.__dict__:
 993            return self._log_offset_lock
 994
 995        fasteners = attempt_import('fasteners')
 996        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
 997        return self._log_offset_lock
 998
 999    @property
1000    def rotating_log(self) -> RotatingFile:
1001        """
1002        The rotating log file for the daemon's output.
1003        """
1004        if '_rotating_log' in self.__dict__:
1005            return self._rotating_log
1006
1007        logs_cf = self.properties.get('logs', None) or {}
1008        write_timestamps = logs_cf.get('write_timestamps', None)
1009        if write_timestamps is None:
1010            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1011
1012        timestamp_format = logs_cf.get('timestamp_format', None)
1013        if timestamp_format is None:
1014            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1015
1016        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1017        if num_files_to_keep is None:
1018            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1019
1020        max_file_size = logs_cf.get('max_file_size', None)
1021        if max_file_size is None:
1022            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1023
1024        redirect_streams = logs_cf.get('redirect_streams', True)
1025
1026        self._rotating_log = RotatingFile(
1027            self.log_path,
1028            redirect_streams=redirect_streams,
1029            write_timestamps=write_timestamps,
1030            timestamp_format=timestamp_format,
1031            num_files_to_keep=num_files_to_keep,
1032            max_file_size=max_file_size,
1033        )
1034        return self._rotating_log
1035
1036    @property
1037    def stdin_file(self):
1038        """
1039        Return the file handler for the stdin file.
1040        """
1041        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1042            return _stdin_file
1043
1044        self._stdin_file = StdinFile(
1045            self.stdin_file_path,
1046            lock_file_path=self.blocking_stdin_file_path,
1047        )
1048        return self._stdin_file
1049
1050    @property
1051    def log_text(self) -> Union[str, None]:
1052        """
1053        Read the log files and return their contents.
1054        Returns `None` if the log file does not exist.
1055        """
1056        logs_cf = self.properties.get('logs', None) or {}
1057        write_timestamps = logs_cf.get('write_timestamps', None)
1058        if write_timestamps is None:
1059            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1060
1061        timestamp_format = logs_cf.get('timestamp_format', None)
1062        if timestamp_format is None:
1063            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1064
1065        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1066        if num_files_to_keep is None:
1067            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1068
1069        max_file_size = logs_cf.get('max_file_size', None)
1070        if max_file_size is None:
1071            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1072
1073        new_rotating_log = RotatingFile(
1074            self.rotating_log.file_path,
1075            num_files_to_keep=num_files_to_keep,
1076            max_file_size=max_file_size,
1077            write_timestamps=write_timestamps,
1078            timestamp_format=timestamp_format,
1079        )
1080        return new_rotating_log.read()
1081
1082    def readlines(self) -> List[str]:
1083        """
1084        Read the next log lines, persisting the cursor for later use.
1085        Note this will alter the cursor of `self.rotating_log`.
1086        """
1087        self.rotating_log._cursor = self._read_log_offset()
1088        lines = self.rotating_log.readlines()
1089        self._write_log_offset()
1090        return lines
1091
1092    def _read_log_offset(self) -> Tuple[int, int]:
1093        """
1094        Return the current log offset cursor.
1095
1096        Returns
1097        -------
1098        A tuple of the form (`subfile_index`, `position`).
1099        """
1100        if not self.log_offset_path.exists():
1101            return 0, 0
1102
1103        try:
1104            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1105                cursor_text = f.read()
1106            cursor_parts = cursor_text.split(' ')
1107            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1108            return subfile_index, subfile_position
1109        except Exception as e:
1110            warn(f"Failed to read cursor:\n{e}")
1111        return 0, 0
1112
1113    def _write_log_offset(self) -> None:
1114        """
1115        Write the current log offset file.
1116        """
1117        with self.log_offset_lock:
1118            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1119                subfile_index = self.rotating_log._cursor[0]
1120                subfile_position = self.rotating_log._cursor[1]
1121                f.write(f"{subfile_index} {subfile_position}")
1122
1123    @property
1124    def pid(self) -> Union[int, None]:
1125        """
1126        Read the PID file and return its contents.
1127        Returns `None` if the PID file does not exist.
1128        """
1129        if not self.pid_path.exists():
1130            return None
1131        try:
1132            with open(self.pid_path, 'r', encoding='utf-8') as f:
1133                text = f.read()
1134            if len(text) == 0:
1135                return None
1136            pid = int(text.rstrip())
1137        except Exception as e:
1138            warn(e)
1139            text = None
1140            pid = None
1141        return pid
1142
1143    @property
1144    def pid_path(self) -> pathlib.Path:
1145        """
1146        Return the path to a file containing the PID for this Daemon.
1147        """
1148        return self.path / 'process.pid'
1149
1150    @property
1151    def pid_lock(self) -> 'fasteners.InterProcessLock':
1152        """
1153        Return the process lock context manager.
1154        """
1155        if '_pid_lock' in self.__dict__:
1156            return self._pid_lock
1157
1158        fasteners = attempt_import('fasteners')
1159        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1160        return self._pid_lock
1161
1162    @property
1163    def pickle_path(self) -> pathlib.Path:
1164        """
1165        Return the path for the pickle file.
1166        """
1167        return self.path / 'pickle.pkl'
1168
1169    def read_properties(self) -> Optional[Dict[str, Any]]:
1170        """Read the properties JSON file and return the dictionary."""
1171        if not self.properties_path.exists():
1172            return None
1173        try:
1174            with open(self.properties_path, 'r', encoding='utf-8') as file:
1175                properties = json.load(file)
1176        except Exception:
1177            properties = {}
1178        
1179        return properties or {}
1180
1181    def read_pickle(self) -> Daemon:
1182        """Read a Daemon's pickle file and return the `Daemon`."""
1183        import pickle
1184        import traceback
1185        if not self.pickle_path.exists():
1186            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1187
1188        if self.pickle_path.stat().st_size == 0:
1189            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1190
1191        try:
1192            with open(self.pickle_path, 'rb') as pickle_file:
1193                daemon = pickle.load(pickle_file)
1194            success, msg = True, 'Success'
1195        except Exception as e:
1196            success, msg = False, str(e)
1197            daemon = None
1198            traceback.print_exception(type(e), e, e.__traceback__)
1199        if not success:
1200            error(msg)
1201        return daemon
1202
1203    @property
1204    def properties(self) -> Dict[str, Any]:
1205        """
1206        Return the contents of the properties JSON file.
1207        """
1208        try:
1209            _file_properties = self.read_properties() or {}
1210        except Exception:
1211            traceback.print_exc()
1212            _file_properties = {}
1213
1214        if not self._properties:
1215            self._properties = _file_properties
1216
1217        if self._properties is None:
1218            self._properties = {}
1219
1220        if (
1221            self._properties.get('result', None) is None
1222            and _file_properties.get('result', None) is not None
1223        ):
1224            _ = self._properties.pop('result', None)
1225
1226        if _file_properties is not None:
1227            self._properties = apply_patch_to_config(
1228                _file_properties,
1229                self._properties,
1230            )
1231
1232        return self._properties
1233
1234    @property
1235    def hidden(self) -> bool:
1236        """
1237        Return a bool indicating whether this Daemon should be hidden (i.e., not displayed).
1238        """
1239        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1240
1241    def write_properties(self) -> SuccessTuple:
1242        """Write the properties dictionary to the properties JSON file
1243        (only if self.properties exists).
1244        """
1245        from meerschaum.utils.misc import generate_password
1246        success, msg = (
1247            False,
1248            f"No properties to write for daemon '{self.daemon_id}'."
1249        )
1250        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1251        props = self.properties
1252        if props is not None:
1253            try:
1254                self.path.mkdir(parents=True, exist_ok=True)
1255                if self.properties_path.exists():
1256                    self.properties_path.rename(backup_path)
1257                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1258                    json.dump(props, properties_file)
1259                success, msg = True, 'Success'
1260            except Exception as e:
1261                success, msg = False, str(e)
1262
1263        try:
1264            if backup_path.exists():
1265                if not success:
1266                    backup_path.rename(self.properties_path)
1267                else:
1268                    backup_path.unlink()
1269        except Exception as e:
1270            success, msg = False, str(e)
1271
1272        return success, msg
1273
1274    def write_pickle(self) -> SuccessTuple:
1275        """Write the pickle file for the daemon."""
1276        import pickle
1277        import traceback
1278        from meerschaum.utils.misc import generate_password
1279
1280        if not self.pickle:
1281            return True, "Success"
1282
1283        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1284        try:
1285            self.path.mkdir(parents=True, exist_ok=True)
1286            if self.pickle_path.exists():
1287                self.pickle_path.rename(backup_path)
1288            with open(self.pickle_path, 'wb+') as pickle_file:
1289                pickle.dump(self, pickle_file)
1290            success, msg = True, "Success"
1291        except Exception as e:
1292            success, msg = False, str(e)
1293            traceback.print_exception(type(e), e, e.__traceback__)
1294        try:
1295            if backup_path.exists():
1296                if not success:
1297                    backup_path.rename(self.pickle_path)
1298                else:
1299                    backup_path.unlink()
1300        except Exception as e:
1301            success, msg = False, str(e)
1302        return success, msg
1303
1304
1305    def _setup(
1306        self,
1307        allow_dirty_run: bool = False,
1308    ) -> None:
1309        """
1310        Update properties before starting the Daemon.
1311        """
1312        if self.properties is None:
1313            self._properties = {}
1314
1315        self._properties.update({
1316            'target': {
1317                'name': self.target.__name__,
1318                'module': self.target.__module__,
1319                'args': self.target_args,
1320                'kw': self.target_kw,
1321            },
1322        })
1323        self.mkdir_if_not_exists(allow_dirty_run)
1324        _write_properties_success_tuple = self.write_properties()
1325        if not _write_properties_success_tuple[0]:
1326            error(_write_properties_success_tuple[1])
1327
1328        _write_pickle_success_tuple = self.write_pickle()
1329        if not _write_pickle_success_tuple[0]:
1330            error(_write_pickle_success_tuple[1])
1331
1332    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1333        """
1334        Remove a daemon's directory after execution.
1335
1336        Parameters
1337        ----------
1338        keep_logs: bool, default False
1339            If `True`, skip deleting the daemon's log files.
1340
1341        Returns
1342        -------
1343        A `SuccessTuple` indicating success.
1344        """
1345        if self.path.exists():
1346            try:
1347                shutil.rmtree(self.path)
1348            except Exception as e:
1349                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1350                warn(msg)
1351                return False, msg
1352        if not keep_logs:
1353            self.rotating_log.delete()
1354            try:
1355                if self.log_offset_path.exists():
1356                    self.log_offset_path.unlink()
1357            except Exception as e:
1358                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1359                warn(msg)
1360                return False, msg
1361        return True, "Success"
1362
1363
1364    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1365        """
1366        Return the timeout value to use. Use `timeout` if provided,
1367        else the configured `jobs:timeout_seconds` default (8).
1368        """
1369        if isinstance(timeout, (int, float)):
1370            return timeout
1371        return get_config('jobs', 'timeout_seconds')
1372
1373
1374    def get_check_timeout_interval_seconds(
1375        self,
1376        check_timeout_interval: Union[int, float, None] = None,
1377    ) -> Union[int, float]:
1378        """
1379        Return the interval value to check the status of timeouts.
1380        """
1381        if isinstance(check_timeout_interval, (int, float)):
1382            return check_timeout_interval
1383        return get_config('jobs', 'check_timeout_interval_seconds')
1384
1385    @property
1386    def target_args(self) -> Union[Tuple[Any], None]:
1387        """
1388        Return the positional arguments to pass to the target function.
1389        """
1390        target_args = (
1391            self.__dict__.get('_target_args', None)
1392            or self.properties.get('target', {}).get('args', None)
1393        )
1394        if target_args is None:
1395            return tuple([])
1396
1397        return tuple(target_args)
1398
1399    @property
1400    def target_kw(self) -> Union[Dict[str, Any], None]:
1401        """
1402        Return the keyword arguments to pass to the target function.
1403        """
1404        target_kw = (
1405            self.__dict__.get('_target_kw', None)
1406            or self.properties.get('target', {}).get('kw', None)
1407        )
1408        if target_kw is None:
1409            return {}
1410
1411        return {key: val for key, val in target_kw.items()}
1412
1413    def __getstate__(self):
1414        """
1415        Pickle this Daemon.
1416        """
1417        dill = attempt_import('dill')
1418        return {
1419            'target': dill.dumps(self.target),
1420            'target_args': self.target_args,
1421            'target_kw': self.target_kw,
1422            'daemon_id': self.daemon_id,
1423            'label': self.label,
1424            'properties': self.properties,
1425        }
1426
1427    def __setstate__(self, _state: Dict[str, Any]):
1428        """
1429        Restore this Daemon from a pickled state.
1430        If the properties file exists, skip the old pickled version.
1431        """
1432        dill = attempt_import('dill')
1433        _state['target'] = dill.loads(_state['target'])
1434        self._pickle = True
1435        daemon_id = _state.get('daemon_id', None)
1436        if not daemon_id:
1437            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1438
1439        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1440        ignore_properties = properties_path.exists()
1441        if ignore_properties:
1442            _state = {
1443                key: val
1444                for key, val in _state.items()
1445                if key != 'properties'
1446            }
1447        self.__init__(**_state)
1448
1449
1450    def __repr__(self):
1451        return str(self)
1452
1453    def __str__(self):
1454        return self.daemon_id
1455
1456    def __eq__(self, other):
1457        if not isinstance(other, Daemon):
1458            return False
1459        return self.daemon_id == other.daemon_id
1460
1461    def __hash__(self):
1462        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi
2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi
2024-07-29 18:03 |
2024-07-29 18:05 | hi
2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, pickle: bool = True)
136    def __init__(
137        self,
138        target: Optional[Callable[[Any], Any]] = None,
139        target_args: Union[List[Any], Tuple[Any], None] = None,
140        target_kw: Optional[Dict[str, Any]] = None,
141        env: Optional[Dict[str, str]] = None,
142        daemon_id: Optional[str] = None,
143        label: Optional[str] = None,
144        properties: Optional[Dict[str, Any]] = None,
145        pickle: bool = True,
146    ):
147        """
148        Parameters
149        ----------
150        target: Optional[Callable[[Any], Any]], default None
151            The function to execute in a child process.
152
153        target_args: Union[List[Any], Tuple[Any], None], default None
154            Positional arguments to pass to the target function.
155
156        target_kw: Optional[Dict[str, Any]], default None
157            Keyword arguments to pass to the target function.
158
159        env: Optional[Dict[str, str]], default None
160            If provided, set these environment variables in the daemon process.
161
162        daemon_id: Optional[str], default None
163            Build a `Daemon` from an existing `daemon_id`.
164            If `daemon_id` is provided, other arguments are ignored and are derived
165            from the existing pickled `Daemon`.
166
167        label: Optional[str], default None
168            Label string to help identify a daemon.
169            If `None`, use the function name instead.
170
171        properties: Optional[Dict[str, Any]], default None
172            Override reading from the properties JSON by providing an existing dictionary.
173        """
174        _pickle = self.__dict__.get('_pickle', False)
175        if daemon_id is not None:
176            self.daemon_id = daemon_id
177            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
178
179                if not self.properties_path.exists():
180                    raise Exception(
181                        f"Daemon '{self.daemon_id}' does not exist. "
182                        + "Pass a target to create a new Daemon."
183                    )
184
185                try:
186                    new_daemon = self.from_properties_file(daemon_id)
187                except Exception:
188                    new_daemon = None
189
190                if new_daemon is not None:
191                    new_daemon.write_pickle()
192                    target = new_daemon.target
193                    target_args = new_daemon.target_args
194                    target_kw = new_daemon.target_kw
195                    label = new_daemon.label
196                    self._properties = new_daemon.properties
197                else:
198                    try:
199                        self.properties_path.unlink()
200                    except Exception:
201                        pass
202
203                    raise Exception(
204                        f"Could not recover daemon '{self.daemon_id}' "
205                        + "from its properties file."
206                    )
207
208        if 'target' not in self.__dict__:
209            if target is None:
210                error("Cannot create a Daemon without a target.")
211            self.target = target
212
213        self.pickle = pickle
214
215        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
216        if '_target_args' not in self.__dict__:
217            self._target_args = target_args
218        if '_target_kw' not in self.__dict__:
219            self._target_kw = target_kw
220
221        if 'label' not in self.__dict__:
222            if label is None:
223                label = (
224                    self.target.__name__ if '__name__' in self.target.__dir__()
225                        else str(self.target)
226                )
227            self.label = label
228        elif label is not None:
229            self.label = label
230
231        if 'daemon_id' not in self.__dict__:
232            self.daemon_id = get_new_daemon_name()
233        if '_properties' not in self.__dict__:
234            self._properties = properties
235        elif properties:
236            if self._properties is None:
237                self._properties = {}
238            self._properties.update(properties)
239        if self._properties is None:
240            self._properties = {}
241
242        self._properties.update({'label': self.label})
243        if env:
244            self._properties.update({'env': env})
245
246        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
247        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identify a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
  • pickle (bool, default True): If False, skip writing the daemon's pickle file.
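
A minimal construction sketch (the function, arguments, and label here are illustrative):

>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, target_args=('hello',), label='print-hello')
>>> daemon.label
'print-hello'
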
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 89    @classmethod
 90    def from_properties_file(cls, daemon_id: str) -> Daemon:
 91        """
 92        Return a Daemon from its properties file.
 93        """
 94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 95        if not properties_path.exists():
 96            raise OSError(f"Properties file '{properties_path}' does not exist.")
 97
 98        try:
 99            with open(properties_path, 'r', encoding='utf-8') as f:
100                properties = json.load(f)
101        except Exception:
102            properties = {}
103
104        if not properties:
105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
106
107        daemon_id = properties_path.parent.name
108        target_cf = properties.get('target', {})
109        target_module_name = target_cf.get('module', None)
110        target_function_name = target_cf.get('name', None)
111        target_args = target_cf.get('args', None)
112        target_kw = target_cf.get('kw', None)
113        label = properties.get('label', None)
114
115        if None in [
116            target_module_name,
117            target_function_name,
118            target_args,
119            target_kw,
120        ]:
121            raise ValueError("Missing target function information.")
122
123        target_module = importlib.import_module(target_module_name)
124        target_function = getattr(target_module, target_function_name)
125
126        return Daemon(
127            daemon_id=daemon_id,
128            target=target_function,
129            target_args=target_args,
130            target_kw=target_kw,
131            properties=properties,
132            label=label,
133        )

Return a Daemon from its properties file.

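A hedged sketch, assuming a daemon with this (hypothetical) id has already written its properties.json:

>>> daemon = Daemon.from_properties_file('my-daemon')  # raises OSError if the file is missing
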
pickle
def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, wait: bool = False, timeout: Union[int, float] = 4, debug: bool = False) -> Tuple[bool, str]:
425    def run(
426        self,
427        keep_daemon_output: bool = True,
428        allow_dirty_run: bool = False,
429        wait: bool = False,
430        timeout: Union[int, float] = 4,
431        debug: bool = False,
432    ) -> SuccessTuple:
433        """Run the daemon as a child process and continue executing the parent.
434
435        Parameters
436        ----------
437        keep_daemon_output: bool, default True
438            If `False`, delete the daemon's output directory upon exiting.
439
440        allow_dirty_run: bool, default False
441            If `True`, run the daemon, even if the `daemon_id` directory exists.
442            This option is dangerous because if the same `daemon_id` runs concurrently,
443            the last to finish will overwrite the output of the first.
444
445        wait: bool, default False
446            If `True`, block until `Daemon.status` is running (or the timeout expires).
447
448        timeout: Union[int, float], default 4
449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
450
451        Returns
452        -------
453        A SuccessTuple indicating success.
454
455        """
456        import platform
457        if platform.system() == 'Windows':
458            return False, "Cannot run background jobs on Windows."
459
460        ### The daemon might exist and be paused.
461        if self.status == 'paused':
462            return self.resume()
463
464        self._remove_stop_file()
465        if self.status == 'running':
466            return True, f"Daemon '{self}' is already running."
467
468        self.mkdir_if_not_exists(allow_dirty_run)
469        _write_pickle_success_tuple = self.write_pickle()
470        if not _write_pickle_success_tuple[0]:
471            return _write_pickle_success_tuple
472
473        _launch_daemon_code = (
474            "from meerschaum.utils.daemon import Daemon, _daemons; "
475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
476            f"_daemons['{self.daemon_id}'] = daemon; "
477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
478            "allow_dirty_run=True)"
479        )
480        env = dict(os.environ)
481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
482        msg = (
483            "Success"
484            if _launch_success_bool
485            else f"Failed to start daemon '{self.daemon_id}'."
486        )
487        if not wait or not _launch_success_bool:
488            return _launch_success_bool, msg
489
490        timeout = self.get_timeout_seconds(timeout)
491        check_timeout_interval = self.get_check_timeout_interval_seconds()
492
493        if not timeout:
494            success = self.status == 'running'
495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
496            if success:
497                self._capture_process_timestamp('began')
498            return success, msg
499
500        begin = time.perf_counter()
501        while (time.perf_counter() - begin) < timeout:
502            if self.status == 'running':
503                self._capture_process_timestamp('began')
504                return True, "Success"
505            time.sleep(check_timeout_interval)
506
507        return False, (
508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
509            + ('s' if timeout != 1 else '') + '.'
510        )

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
  • wait (bool, default False): If True, block until Daemon.status is running (or the timeout expires).
  • timeout (Union[int, float], default 4): If wait is True, block for up to timeout seconds before returning a failure.
Returns
  • A SuccessTuple indicating success.
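
A hedged launch sketch (flag values illustrative); note that on Windows this returns a failure tuple immediately, since background jobs are unsupported there:

>>> success, msg = daemon.run(wait=True, timeout=8, allow_dirty_run=True)
>>> daemon.status  # 'running' on success
'running'
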
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
514        """
515        Forcibly terminate a running daemon.
516        Sends a SIGTERM signal to the process, escalating to SIGKILL if it does not exit.
517
518        Parameters
519        ----------
520        timeout: Union[int, float, None], default 8
521            How many seconds to wait for the process to terminate.
522
523        Returns
524        -------
525        A SuccessTuple indicating success.
526        """
527        if self.status != 'paused':
528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
529            if success:
530                self._write_stop_file('kill')
531                self.stdin_file.close()
532                self._remove_blocking_stdin_file()
533                return success, msg
534
535        if self.status == 'stopped':
536            self._write_stop_file('kill')
537            self.stdin_file.close()
538            self._remove_blocking_stdin_file()
539            return True, "Process has already stopped."
540
541        psutil = attempt_import('psutil')
542        process = self.process
543        try:
544            process.terminate()
545            process.kill()
546            process.wait(timeout=timeout)
547        except Exception as e:
548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
549
550        try:
551            if process.status():
552                return False, f"Failed to stop daemon '{self}' ({process})."
553        except psutil.NoSuchProcess:
554            pass
555
556        if self.pid_path.exists():
557            try:
558                self.pid_path.unlink()
559            except Exception:
560                pass
561
562        self._write_stop_file('kill')
563        self.stdin_file.close()
564        self._remove_blocking_stdin_file()
565        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process, escalating to SIGKILL if it does not exit.

Parameters
  • timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
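
A brief sketch; kill() is the forceful counterpart to quit():

>>> success, msg = daemon.kill(timeout=8)
>>> daemon.status
'stopped'
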
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
568        """Gracefully quit a running daemon."""
569        if self.status == 'paused':
570            return self.kill(timeout)
571
572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
573        if signal_success:
574            self._write_stop_file('quit')
575            self.stdin_file.close()
576            self._remove_blocking_stdin_file()
577        return signal_success, signal_msg

Gracefully quit a running daemon.

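Unlike kill(), quit() only sends SIGINT so the target may clean up, and a paused daemon is killed instead (see the source above). A brief sketch:

>>> success, msg = daemon.quit()
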
def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
579    def pause(
580        self,
581        timeout: Union[int, float, None] = None,
582        check_timeout_interval: Union[float, int, None] = None,
583    ) -> SuccessTuple:
584        """
585        Pause the daemon if it is running.
586
587        Parameters
588        ----------
589        timeout: Union[float, int, None], default None
590            The maximum number of seconds to wait for a process to suspend.
591
592        check_timeout_interval: Union[float, int, None], default None
593            The number of seconds to wait between checking if the process is still running.
594
595        Returns
596        -------
597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
598        """
599        self._remove_blocking_stdin_file()
600
601        if self.process is None:
602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
603
604        if self.status == 'paused':
605            return True, f"Daemon '{self.daemon_id}' is already paused."
606
607        self._write_stop_file('pause')
608        self.stdin_file.close()
609        self._remove_blocking_stdin_file()
610        try:
611            self.process.suspend()
612        except Exception as e:
613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
614
615        timeout = self.get_timeout_seconds(timeout)
616        check_timeout_interval = self.get_check_timeout_interval_seconds(
617            check_timeout_interval
618        )
619
620        psutil = attempt_import('psutil')
621
622        if not timeout:
623            try:
624                success = self.process.status() == 'stopped'
625            except psutil.NoSuchProcess:
626                success = True
627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
628            if success:
629                self._capture_process_timestamp('paused')
630            return success, msg
631
632        begin = time.perf_counter()
633        while (time.perf_counter() - begin) < timeout:
634            try:
635                if self.process.status() == 'stopped':
636                    self._capture_process_timestamp('paused')
637                    return True, "Success"
638            except psutil.NoSuchProcess as e:
639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
640            time.sleep(check_timeout_interval)
641
642        return False, (
643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
644            + ('s' if timeout != 1 else '') + '.'
645        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
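
A brief sketch; Daemon.status reports a suspended process as 'paused' (mapping psutil's 'stopped' state):

>>> success, msg = daemon.pause()
>>> daemon.status
'paused'
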
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
647    def resume(
648        self,
649        timeout: Union[int, float, None] = None,
650        check_timeout_interval: Union[float, int, None] = None,
651    ) -> SuccessTuple:
652        """
653        Resume the daemon if it is paused.
654
655        Parameters
656        ----------
657        timeout: Union[float, int, None], default None
658            The maximum number of seconds to wait for a process to resume.
659
660        check_timeout_interval: Union[float, int, None], default None
661            The number of seconds to wait between checking if the process is still stopped.
662
663        Returns
664        -------
665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
666        """
667        if self.status == 'running':
668            return True, f"Daemon '{self.daemon_id}' is already running."
669
670        if self.status == 'stopped':
671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
672
673        self._remove_stop_file()
674        try:
675            if self.process is None:
676                return False, f"Cannot resume daemon '{self.daemon_id}'."
677
678            self.process.resume()
679        except Exception as e:
680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
681
682        timeout = self.get_timeout_seconds(timeout)
683        check_timeout_interval = self.get_check_timeout_interval_seconds(
684            check_timeout_interval
685        )
686
687        if not timeout:
688            success = self.status == 'running'
689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
690            if success:
691                self._capture_process_timestamp('began')
692            return success, msg
693
694        begin = time.perf_counter()
695        while (time.perf_counter() - begin) < timeout:
696            if self.status == 'running':
697                self._capture_process_timestamp('began')
698                return True, "Success"
699            time.sleep(check_timeout_interval)
700
701        return False, (
702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
703            + ('s' if timeout != 1 else '') + '.'
704        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
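
Resuming applies only to paused daemons; a stopped daemon must be run() again. A brief sketch:

>>> success, msg = daemon.resume()
>>> daemon.status
'running'
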
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
837    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
838        """Create the Daemon's directory.
839        If `allow_dirty_run` is `False` and the directory already exists,
840        raise a `FileExistsError`.
841        """
842        try:
843            self.path.mkdir(parents=True, exist_ok=True)
844            _already_exists = any(os.scandir(self.path))
845        except FileExistsError:
846            _already_exists = True
847
848        if _already_exists and not allow_dirty_run:
849            error(
850                f"Daemon '{self.daemon_id}' already exists. " +
851                "To allow this daemon to run, do one of the following:\n"
852                + "  - Execute `daemon.cleanup()`.\n"
853                + f"  - Delete the directory '{self.path}'.\n"
854                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
855                FileExistsError,
856            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

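A sketch of the guard's behavior (a "dirty" directory is any non-empty daemon directory):

>>> daemon.mkdir_if_not_exists(allow_dirty_run=True)   # succeeds even if the directory exists
>>> daemon.mkdir_if_not_exists(allow_dirty_run=False)  # raises FileExistsError if the directory is non-empty
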
process: "Union['psutil.Process', None]"
858    @property
859    def process(self) -> Union['psutil.Process', None]:
860        """
861        Return the psutil process for the Daemon.
862        """
863        psutil = attempt_import('psutil')
864        pid = self.pid
865        if pid is None:
866            return None
867        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
868            try:
869                self._process = psutil.Process(int(pid))
870                process_exists = True
871            except Exception:
872                process_exists = False
873            if not process_exists:
874                _ = self.__dict__.pop('_process', None)
875                try:
876                    if self.pid_path.exists():
877                        self.pid_path.unlink()
878                except Exception:
879                    pass
880                return None
881        return self._process

Return the psutil process for the Daemon.

status: str
883    @property
884    def status(self) -> str:
885        """
886        Return the running status of this Daemon.
887        """
888        if self.process is None:
889            return 'stopped'
890
891        psutil = attempt_import('psutil', lazy=False)
892        try:
893            if self.process.status() == 'stopped':
894                return 'paused'
895            if self.process.status() == 'zombie':
896                raise psutil.NoSuchProcess(self.process.pid)
897        except (psutil.NoSuchProcess, AttributeError):
898            if self.pid_path.exists():
899                try:
900                    self.pid_path.unlink()
901                except Exception:
902                    pass
903            return 'stopped'
904
905        return 'running'

Return the running status of this Daemon.

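Since the status is always one of 'stopped', 'paused', or 'running', simple polling loops can key off it; a minimal sketch:

>>> import time
>>> while daemon.status == 'running':
...     time.sleep(1)
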
path: pathlib.Path
915    @property
916    def path(self) -> pathlib.Path:
917        """
918        Return the path for this Daemon's directory.
919        """
920        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
929    @property
930    def properties_path(self) -> pathlib.Path:
931        """
 932        Return the `properties.json` path for this Daemon.
933        """
934        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the properties.json path for this Daemon.

stop_path: pathlib.Path
936    @property
937    def stop_path(self) -> pathlib.Path:
938        """
939        Return the path for the stop file (created when manually stopped).
940        """
941        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
943    @property
944    def log_path(self) -> pathlib.Path:
945        """
946        Return the log path.
947        """
948        logs_cf = self.properties.get('logs', None) or {}
949        if 'path' not in logs_cf:
950            from meerschaum.config.paths import LOGS_RESOURCES_PATH
951            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
952
953        return pathlib.Path(logs_cf['path'])

Return the log path.

stdin_file_path: pathlib.Path
955    @property
956    def stdin_file_path(self) -> pathlib.Path:
957        """
958        Return the stdin file path.
959        """
960        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
962    @property
963    def blocking_stdin_file_path(self) -> pathlib.Path:
964        """
 965        Return the blocking stdin file path.
966        """
967        if '_blocking_stdin_file_path' in self.__dict__:
968            return self._blocking_stdin_file_path
969
970        return self.path / 'input.stdin.block'

Return the blocking stdin file path.

prompt_kwargs_file_path: pathlib.Path
972    @property
973    def prompt_kwargs_file_path(self) -> pathlib.Path:
974        """
975        Return the file path to the kwargs for the invoking `prompt()`.
976        """
977        return self.path / 'prompt_kwargs.json'

Return the file path to the kwargs for the invoking prompt().

log_offset_path: pathlib.Path
979    @property
980    def log_offset_path(self) -> pathlib.Path:
981        """
982        Return the log offset file path.
983        """
984        from meerschaum.config.paths import LOGS_RESOURCES_PATH
985        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

log_offset_lock: "'fasteners.InterProcessLock'"
987    @property
988    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
989        """
990        Return the process lock context manager.
991        """
992        if '_log_offset_lock' in self.__dict__:
993            return self._log_offset_lock
994
995        fasteners = attempt_import('fasteners')
996        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
997        return self._log_offset_lock

Return the process lock context manager.

rotating_log: RotatingFile
 999    @property
1000    def rotating_log(self) -> RotatingFile:
1001        """
1002        The rotating log file for the daemon's output.
1003        """
1004        if '_rotating_log' in self.__dict__:
1005            return self._rotating_log
1006
1007        logs_cf = self.properties.get('logs', None) or {}
1008        write_timestamps = logs_cf.get('write_timestamps', None)
1009        if write_timestamps is None:
1010            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1011
1012        timestamp_format = logs_cf.get('timestamp_format', None)
1013        if timestamp_format is None:
1014            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1015
1016        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1017        if num_files_to_keep is None:
1018            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1019
1020        max_file_size = logs_cf.get('max_file_size', None)
1021        if max_file_size is None:
1022            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1023
1024        redirect_streams = logs_cf.get('redirect_streams', True)
1025
1026        self._rotating_log = RotatingFile(
1027            self.log_path,
1028            redirect_streams=redirect_streams,
1029            write_timestamps=write_timestamps,
1030            timestamp_format=timestamp_format,
1031            num_files_to_keep=num_files_to_keep,
1032            max_file_size=max_file_size,
1033        )
1034        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
1036    @property
1037    def stdin_file(self):
1038        """
1039        Return the file handler for the stdin file.
1040        """
1041        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1042            return _stdin_file
1043
1044        self._stdin_file = StdinFile(
1045            self.stdin_file_path,
1046            lock_file_path=self.blocking_stdin_file_path,
1047        )
1048        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
1050    @property
1051    def log_text(self) -> Union[str, None]:
1052        """
1053        Read the log files and return their contents.
1054        Returns `None` if the log file does not exist.
1055        """
1056        logs_cf = self.properties.get('logs', None) or {}
1057        write_timestamps = logs_cf.get('write_timestamps', None)
1058        if write_timestamps is None:
1059            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1060
1061        timestamp_format = logs_cf.get('timestamp_format', None)
1062        if timestamp_format is None:
1063            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1064
1065        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1066        if num_files_to_keep is None:
1067            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1068
1069        max_file_size = logs_cf.get('max_file_size', None)
1070        if max_file_size is None:
1071            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1072
1073        new_rotating_log = RotatingFile(
1074            self.rotating_log.file_path,
1075            num_files_to_keep=num_files_to_keep,
1076            max_file_size=max_file_size,
1077            write_timestamps=write_timestamps,
1078            timestamp_format=timestamp_format,
1079        )
1080        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
1082    def readlines(self) -> List[str]:
1083        """
1084        Read the next log lines, persisting the cursor for later use.
1085        Note this will alter the cursor of `self.rotating_log`.
1086        """
1087        self.rotating_log._cursor = self._read_log_offset()
1088        lines = self.rotating_log.readlines()
1089        self._write_log_offset()
1090        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

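Because the cursor is persisted to the log offset file, repeated calls (even across processes) yield only the lines written since the last read; a minimal tailing sketch:

>>> for line in daemon.readlines():
...     print(line, end='')
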
pid: Optional[int]
1123    @property
1124    def pid(self) -> Union[int, None]:
1125        """
1126        Read the PID file and return its contents.
1127        Returns `None` if the PID file does not exist.
1128        """
1129        if not self.pid_path.exists():
1130            return None
1131        try:
1132            with open(self.pid_path, 'r', encoding='utf-8') as f:
1133                text = f.read()
1134            if len(text) == 0:
1135                return None
1136            pid = int(text.rstrip())
1137        except Exception as e:
1138            warn(e)
1139            text = None
1140            pid = None
1141        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
1143    @property
1144    def pid_path(self) -> pathlib.Path:
1145        """
1146        Return the path to a file containing the PID for this Daemon.
1147        """
1148        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
1150    @property
1151    def pid_lock(self) -> 'fasteners.InterProcessLock':
1152        """
1153        Return the process lock context manager.
1154        """
1155        if '_pid_lock' in self.__dict__:
1156            return self._pid_lock
1157
1158        fasteners = attempt_import('fasteners')
1159        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1160        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
1162    @property
1163    def pickle_path(self) -> pathlib.Path:
1164        """
1165        Return the path for the pickle file.
1166        """
1167        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
1169    def read_properties(self) -> Optional[Dict[str, Any]]:
1170        """Read the properties JSON file and return the dictionary."""
1171        if not self.properties_path.exists():
1172            return None
1173        try:
1174            with open(self.properties_path, 'r', encoding='utf-8') as file:
1175                properties = json.load(file)
1176        except Exception:
1177            properties = {}
1178        
1179        return properties or {}

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1181    def read_pickle(self) -> Daemon:
1182        """Read a Daemon's pickle file and return the `Daemon`."""
1183        import pickle
1184        import traceback
1185        if not self.pickle_path.exists():
1186            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1187
1188        if self.pickle_path.stat().st_size == 0:
1189            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1190
1191        try:
1192            with open(self.pickle_path, 'rb') as pickle_file:
1193                daemon = pickle.load(pickle_file)
1194            success, msg = True, 'Success'
1195        except Exception as e:
1196            success, msg = False, str(e)
1197            daemon = None
1198            traceback.print_exception(type(e), e, e.__traceback__)
1199        if not success:
1200            error(msg)
1201        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1203    @property
1204    def properties(self) -> Dict[str, Any]:
1205        """
1206        Return the contents of the properties JSON file.
1207        """
1208        try:
1209            _file_properties = self.read_properties() or {}
1210        except Exception:
1211            traceback.print_exc()
1212            _file_properties = {}
1213
1214        if not self._properties:
1215            self._properties = _file_properties
1216
1217        if self._properties is None:
1218            self._properties = {}
1219
1220        if (
1221            self._properties.get('result', None) is None
1222            and _file_properties.get('result', None) is not None
1223        ):
1224            _ = self._properties.pop('result', None)
1225
1226        if _file_properties is not None:
1227            self._properties = apply_patch_to_config(
1228                _file_properties,
1229                self._properties,
1230            )
1231
1232        return self._properties

Return the contents of the properties JSON file.

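On each access, the on-disk JSON is re-read and used as the base, with the in-memory dictionary patched on top (the special case above lets a non-None 'result' from the file win over an in-memory None). A brief sketch of the round trip (the key is hypothetical):

>>> daemon.properties['custom_key'] = 'custom_value'
>>> daemon.properties['custom_key']  # survives subsequent re-reads from disk
'custom_value'
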
hidden: bool
1234    @property
1235    def hidden(self) -> bool:
1236        """
1237        Return a bool indicating whether this Daemon should be hidden (i.e., not displayed).
1238        """
1239        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be hidden (i.e., not displayed).

def write_properties(self) -> Tuple[bool, str]:
1241    def write_properties(self) -> SuccessTuple:
1242        """Write the properties dictionary to the properties JSON file
1243        (only if self.properties exists).
1244        """
1245        from meerschaum.utils.misc import generate_password
1246        success, msg = (
1247            False,
1248            f"No properties to write for daemon '{self.daemon_id}'."
1249        )
1250        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1251        props = self.properties
1252        if props is not None:
1253            try:
1254                self.path.mkdir(parents=True, exist_ok=True)
1255                if self.properties_path.exists():
1256                    self.properties_path.rename(backup_path)
1257                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1258                    json.dump(props, properties_file)
1259                success, msg = True, 'Success'
1260            except Exception as e:
1261                success, msg = False, str(e)
1262
1263        try:
1264            if backup_path.exists():
1265                if not success:
1266                    backup_path.rename(self.properties_path)
1267                else:
1268                    backup_path.unlink()
1269        except Exception as e:
1270            success, msg = False, str(e)
1271
1272        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).
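
The write is guarded by a rename-aside backup: the existing JSON is renamed to a randomly named backup file, the new contents are written, and the backup is restored on failure or deleted on success. A minimal sketch of persisting a change:

    daemon = Daemon(daemon_id='example')  # hypothetical ID
    daemon.properties['custom_key'] = 'custom value'
    success, msg = daemon.write_properties()
    if not success:
        print(f"Failed to persist properties: {msg}")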

def write_pickle(self) -> Tuple[bool, str]:
1274    def write_pickle(self) -> SuccessTuple:
1275        """Write the pickle file for the daemon."""
1276        import pickle
1277        import traceback
1278        from meerschaum.utils.misc import generate_password
1279
1280        if not self.pickle:
1281            return True, "Success"
1282
1283        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1284        try:
1285            self.path.mkdir(parents=True, exist_ok=True)
1286            if self.pickle_path.exists():
1287                self.pickle_path.rename(backup_path)
1288            with open(self.pickle_path, 'wb+') as pickle_file:
1289                pickle.dump(self, pickle_file)
1290            success, msg = True, "Success"
1291        except Exception as e:
1292            success, msg = False, str(e)
1293            traceback.print_exception(type(e), e, e.__traceback__)
1294        try:
1295            if backup_path.exists():
1296                if not success:
1297                    backup_path.rename(self.pickle_path)
1298                else:
1299                    backup_path.unlink()
1300        except Exception as e:
1301            success, msg = False, str(e)
1302        return success, msg

Write the pickle file for the daemon.
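
Note that the method returns success without writing when `self.pickle` is falsy; otherwise the same rename-aside backup pattern as `write_properties` protects the existing pickle. A round-trip sketch:

    daemon = Daemon(daemon_id='example')  # hypothetical ID

    # A no-op (but still successful) when `daemon.pickle` is falsy.
    success, msg = daemon.write_pickle()
    if success and daemon.pickle_path.exists():
        restored = daemon.read_pickle()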

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1332    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1333        """
1334        Remove a daemon's directory after execution.
1335
1336        Parameters
1337        ----------
1338        keep_logs: bool, default False
1339            If `True`, skip deleting the daemon's log files.
1340
1341        Returns
1342        -------
1343        A `SuccessTuple` indicating success.
1344        """
1345        if self.path.exists():
1346            try:
1347                shutil.rmtree(self.path)
1348            except Exception as e:
1349                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1350                warn(msg)
1351                return False, msg
1352        if not keep_logs:
1353            self.rotating_log.delete()
1354            try:
1355                if self.log_offset_path.exists():
1356                    self.log_offset_path.unlink()
1357            except Exception as e:
1358                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1359                warn(msg)
1360                return False, msg
1361        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
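
A sketch of tearing down a finished daemon while keeping its log files, assuming the hypothetical daemon has already stopped:

    daemon = Daemon(daemon_id='example')  # hypothetical ID
    success, msg = daemon.cleanup(keep_logs=True)
    if not success:
        print(msg)
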
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1364    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1365        """
1366        Return the timeout value to use. Use `--timeout-seconds` if provided,
1367        else the configured default (8).
1368        """
1369        if isinstance(timeout, (int, float)):
1370            return timeout
1371        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).
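
For example (illustrative values; the fallback comes from `get_config('jobs', 'timeout_seconds')`):

    daemon = Daemon(daemon_id='example')  # hypothetical ID
    daemon.get_timeout_seconds(12.5)  # -> 12.5
    daemon.get_timeout_seconds()      # -> the configured default (8)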

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1374    def get_check_timeout_interval_seconds(
1375        self,
1376        check_timeout_interval: Union[int, float, None] = None,
1377    ) -> Union[int, float]:
1378        """
1379        Return the interval value to check the status of timeouts.
1380        """
1381        if isinstance(check_timeout_interval, (int, float)):
1382            return check_timeout_interval
1383        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1385    @property
1386    def target_args(self) -> Union[Tuple[Any], None]:
1387        """
1388        Return the positional arguments to pass to the target function.
1389        """
1390        target_args = (
1391            self.__dict__.get('_target_args', None)
1392            or self.properties.get('target', {}).get('args', None)
1393        )
1394        if target_args is None:
1395            return tuple([])
1396
1397        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1399    @property
1400    def target_kw(self) -> Union[Dict[str, Any], None]:
1401        """
1402        Return the keyword arguments to pass to the target function.
1403        """
1404        target_kw = (
1405            self.__dict__.get('_target_kw', None)
1406            or self.properties.get('target', {}).get('kw', None)
1407        )
1408        if target_kw is None:
1409            return {}
1410
1411        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.
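
Taken together, these two properties let a caller reconstruct roughly how the daemon invokes its target. A sketch, reusing the hypothetical ID:

    daemon = Daemon(daemon_id='example')  # hypothetical ID
    args = daemon.target_args  # () when nothing is stored
    kw = daemon.target_kw      # {} when nothing is stored
    # The daemon executes approximately: daemon.target(*args, **kw)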

class StdinFile(io.TextIOBase):
 22class StdinFile(io.TextIOBase):
 23    """
 24    Redirect user input into a Daemon's context.
 25    """
 26    def __init__(
 27        self,
 28        file_path: Union[pathlib.Path, str],
 29        lock_file_path: Optional[pathlib.Path] = None,
 30        decode: bool = True,
 31        refresh_seconds: Union[int, float, None] = None,
 32    ):
 33        if isinstance(file_path, str):
 34            file_path = pathlib.Path(file_path)
 35
 36        self.file_path = file_path
 37        self.blocking_file_path = (
 38            lock_file_path
 39            if lock_file_path is not None
 40            else (file_path.parent / (file_path.name + '.block'))
 41        )
 42        self._file_handler = None
 43        self._fd = None
 44        self.sel = selectors.DefaultSelector()
 45        self.decode = decode
 46        self._write_fp = None
 47        self._refresh_seconds = refresh_seconds
 48
 49    @property
 50    def encoding(self):
 51        return 'utf-8'
 52
 53    @property
 54    def file_handler(self):
 55        """
 56        Return the read file handler to the provided file path.
 57        """
 58        if self._file_handler is not None:
 59            return self._file_handler
 60
 61        if not self.file_path.exists():
 62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
 64
 65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 67        self.sel.register(self._file_handler, selectors.EVENT_READ)
 68        return self._file_handler
 69
 70    def write(self, data):
 71        if self._write_fp is None:
 72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 73            if not self.file_path.exists():
 74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
 75            self._write_fp = open(self.file_path, 'wb')
 76
 77        if isinstance(data, str):
 78            data = data.encode('utf-8')
 79        try:
 80            self._write_fp.write(data)
 81            self._write_fp.flush()
 82        except BrokenPipeError:
 83            pass
 84
 85    def fileno(self):
 86        fileno = self.file_handler.fileno()
 87        return fileno
 88
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)
110
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line
120
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()
145
146    def is_open(self):
147        return self._file_handler is not None
148
149    def isatty(self) -> bool:
150        return False
151
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds
160
161    def __str__(self) -> str:
162        return f"StdinFile('{self.file_path}')"
163
164    def __repr__(self) -> str:
165        return str(self)

Redirect user input into a Daemon's context.
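
A minimal sketch of feeding input through the FIFO, assuming a hypothetical path; in practice the writer and the reader live in separate processes, and the class is POSIX-only since it relies on `os.mkfifo`:

    import pathlib
    from meerschaum.utils.daemon import StdinFile

    stdin_file = StdinFile(pathlib.Path('/tmp/example-daemon/input.stdin'))  # hypothetical path

    # Reader side (normally inside the daemon): lazily creates and opens the FIFO.
    _ = stdin_file.file_handler

    # Writer side (normally another process): feed a line of input.
    stdin_file.write('hello\n')

    line = stdin_file.readline()  # -> 'hello'
    stdin_file.close()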

StdinFile( file_path: Union[pathlib.Path, str], lock_file_path: Optional[pathlib.Path] = None, decode: bool = True, refresh_seconds: Union[int, float, NoneType] = None)
26    def __init__(
27        self,
28        file_path: Union[pathlib.Path, str],
29        lock_file_path: Optional[pathlib.Path] = None,
30        decode: bool = True,
31        refresh_seconds: Union[int, float, None] = None,
32    ):
33        if isinstance(file_path, str):
34            file_path = pathlib.Path(file_path)
35
36        self.file_path = file_path
37        self.blocking_file_path = (
38            lock_file_path
39            if lock_file_path is not None
40            else (file_path.parent / (file_path.name + '.block'))
41        )
42        self._file_handler = None
43        self._fd = None
44        self.sel = selectors.DefaultSelector()
45        self.decode = decode
46        self._write_fp = None
47        self._refresh_seconds = refresh_seconds
file_path
blocking_file_path
sel
decode
encoding
49    @property
50    def encoding(self):
51        return 'utf-8'

Encoding of the text stream.

Subclasses should override.

file_handler
53    @property
54    def file_handler(self):
55        """
56        Return the read file handler to the provided file path.
57        """
58        if self._file_handler is not None:
59            return self._file_handler
60
61        if not self.file_path.exists():
62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
64
65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
67        self.sel.register(self._file_handler, selectors.EVENT_READ)
68        return self._file_handler

Return the read file handler to the provided file path.

def write(self, data):
70    def write(self, data):
71        if self._write_fp is None:
72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
73            if not self.file_path.exists():
74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
75            self._write_fp = open(self.file_path, 'wb')
76
77        if isinstance(data, str):
78            data = data.encode('utf-8')
79        try:
80            self._write_fp.write(data)
81            self._write_fp.flush()
82        except BrokenPipeError:
83            pass

Write string s to stream.

Return the number of characters written (which is always equal to the length of the string).

def fileno(self):
85    def fileno(self):
86        fileno = self.file_handler.fileno()
87        return fileno

Return underlying file descriptor if one exists.

Raise OSError if the IO object does not use a file descriptor.

def read(self, size=-1):
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)

Read from the FIFO pipe, blocking on EOFError.
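
While no data is available, `read` touches a `.block` file next to the FIFO and sleeps for `refresh_seconds`, removing the file once data arrives. An outside process can therefore treat the file's presence as "the daemon is blocking on input". A sketch of that check, reusing the hypothetical path above:

    import pathlib

    fifo_path = pathlib.Path('/tmp/example-daemon/input.stdin')  # hypothetical path
    block_path = fifo_path.parent / (fifo_path.name + '.block')

    # True while StdinFile.read() is polling for input.
    waiting_for_input = block_path.exists()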

def readline(self, size=-1):
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line

Read until newline or EOF.

Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.

def close(self):
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()

Flush and close the IO object.

This method has no effect if the file is already closed.

def is_open(self):
146    def is_open(self):
147        return self._file_handler is not None

def isatty(self) -> bool:
149    def isatty(self) -> bool:
150        return False

Return whether this is an 'interactive' stream.

Return False if it can't be determined.

refresh_seconds: Union[int, float]
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds

How many seconds between checking for blocking functions.

class RotatingFile(io.IOBase):
 27class RotatingFile(io.IOBase):
 28    """
 29    A `RotatingFile` may be treated like a normal file-like object.
 30    Under the hood, however, it will create new sub-files and delete old ones.
 31    """
 32
 33    SEEK_BACK_ATTEMPTS: int = 5
 34
 35    def __init__(
 36        self,
 37        file_path: pathlib.Path,
 38        num_files_to_keep: Optional[int] = None,
 39        max_file_size: Optional[int] = None,
 40        redirect_streams: bool = False,
 41        write_timestamps: bool = False,
 42        timestamp_format: Optional[str] = None,
 43        write_callback: Optional[Callable[[str], None]] = None,
 44    ):
 45        """
 46        Create a file-like object which manages other files.
 47
 48        Parameters
 49        ----------
 50        num_files_to_keep: int, default None
 51            How many sub-files to keep at any given time.
 52            Defaults to the configured value (5).
 53
 54        max_file_size: int, default None
 55            How large in bytes each sub-file can grow before another file is created.
 56            Note that this is not a hard limit but rather a threshold
 57            which may be slightly exceeded.
 58            Defaults to the configured value (100_000).
 59
 60        redirect_streams: bool, default False
 61            If `True`, redirect previous file streams when opening a new file descriptor.
 62            
 63            NOTE: Only set this to `True` if you are entering into a daemon context.
 64            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 65
 66        write_timestamps: bool, default False
 67            If `True`, prepend the current UTC timestamp to each line of the file.
 68
 69        timestamp_format: str, default None
 70            If `write_timestamps` is `True`, use this format for the timestamps.
 71            Defaults to `'%Y-%m-%d %H:%M'`.
 72
 73        write_callback: Optional[Callable[[str], None]], default None
 74            If provided, execute this callback with the data to be written.
 75        """
 76        self.file_path = pathlib.Path(file_path)
 77        if num_files_to_keep is None:
 78            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 79        if max_file_size is None:
 80            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 81        if timestamp_format is None:
 82            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 83        if num_files_to_keep < 1:
 84            raise ValueError("At least 1 file must be kept.")
 85        if max_file_size < 100:
 86            raise ValueError("Subfiles must contain at least 100 bytes.")
 87
 88        self.num_files_to_keep = num_files_to_keep
 89        self.max_file_size = max_file_size
 90        self.redirect_streams = redirect_streams
 91        self.write_timestamps = write_timestamps
 92        self.timestamp_format = timestamp_format
 93        self.write_callback = write_callback
 94        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')
 95
 96        ### When subfiles are opened, map from their index to the file objects.
 97        self.subfile_objects = {}
 98        self._redirected_subfile_objects = {}
 99        self._current_file_obj = None
100        self._previous_file_obj = None
101
102        ### When reading, keep track of the file index and position.
103        self._cursor: Tuple[int, int] = (0, 0)
104
105        ### Don't forget to close any stray files.
106        atexit.register(self.close)
107
108
109    def fileno(self):
110        """
111        Return the file descriptor for the latest subfile.
112        """
113        self.refresh_files(start_interception=False)
114        return self._current_file_obj.fileno()
115
116
117    def get_latest_subfile_path(self) -> pathlib.Path:
118        """
119        Return the path of the latest subfile to write into.
120        """
121        return self.get_subfile_path_from_index(
122            self.get_latest_subfile_index()
123        )
124
125
126    def get_remaining_subfile_size(self, subfile_index: int) -> int:
127        """
128        Return the remaining buffer size for a subfile.
129
130        Parameters
131        ----------
132        subfile_index: int
133            The index of the subfile to be checked.
134
135        Returns
136        -------
137        The remaining size in bytes.
138        """
139        subfile_path = self.get_subfile_path_from_index(subfile_index)
140        if not subfile_path.exists():
141            return self.max_file_size
142
143        return self.max_file_size - os.path.getsize(subfile_path)
144
145
146    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
147        """
148        Return whether a given subfile is too large.
149
150        Parameters
151        ----------
152        subfile_index: int
153            The index of the subfile to be checked.
154
155        potential_new_len: int, default 0
156            The length of a potential write of new data.
157
158        Returns
159        -------
160        A bool indicating the subfile is or will be too large.
161        """
162        subfile_path = self.get_subfile_path_from_index(subfile_index)
163        if not subfile_path.exists():
164            return False
165
166        self.flush()
167
168        return (
169            (os.path.getsize(subfile_path) + potential_new_len)
170            >=
171            self.max_file_size
172        )
173
174
175    def get_latest_subfile_index(self) -> int:
176        """
177        Return the latest existing subfile index.
178        If no subfiles exist, return 0; if the name cannot be parsed, return -1.
179        """
180        existing_subfile_paths = self.get_existing_subfile_paths()
181        latest_index = (
182            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
183            if existing_subfile_paths
184            else 0
185        )
186        return latest_index
187
188
189    def get_index_from_subfile_name(self, subfile_name: str) -> int:
190        """
191        Return the index from a given subfile name.
192        If the file name cannot be parsed, return -1.
193        """
194        try:
195            return int(subfile_name.replace(self.file_path.name + '.', ''))
196        except Exception:
197            return -1
198
199
200    def get_subfile_name_from_index(self, subfile_index: int) -> str:
201        """
202        Return the subfile name from the given index.
203        """
204        return f'{self.file_path.name}.{subfile_index}'
205
206
207    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
208        """
209        Return the subfile's path from its index.
210        """
211        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
212
213
214    def get_existing_subfile_indices(self) -> List[int]:
215        """
216        Return a list of subfile indices which exist on disk.
217        """
218        existing_subfile_paths = self.get_existing_subfile_paths()
219        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
220
221
222    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
223        """
224        Return a list of file paths that match the input filename pattern.
225        """
226        if not self.file_path.parent.exists():
227            return []
228
229        subfile_names_indices = sorted(
230            [
231                (file_name, self.get_index_from_subfile_name(file_name))
232                for file_name in os.listdir(self.file_path.parent)
233                if (
234                    file_name.startswith(self.file_path.name)
235                    and re.match(self.subfile_regex_pattern, file_name)
236                )
237            ],
238            key=lambda x: x[1],
239        )
240        return [
241            (self.file_path.parent / file_name)
242            for file_name, _ in subfile_names_indices
243        ]
244
245
246    def refresh_files(
247        self,
248        potential_new_len: int = 0,
249        start_interception: bool = False,
250    ) -> '_io.TextIOWrapper':
251        """
252        Check the state of the subfiles.
253        If the latest subfile is too large, create a new file and delete old ones.
254
255        Parameters
256        ----------
257        potential_new_len: int, default 0
258            The length of a potential write of new data.
259        start_interception: bool, default False
260            If `True`, kick off the file interception threads.
261        """
262        self.flush()
263
264        latest_subfile_index = self.get_latest_subfile_index()
265        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
266
267        ### First run with existing log files: open the most recent log file.
268        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
269
270        ### Sometimes a new file is created but output doesn't switch over.
271        lost_latest_handle = (
272            self._current_file_obj is not None
273            and
274            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
275        )
276        if is_first_run_with_logs or lost_latest_handle:
277            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
278            if self.redirect_streams:
279                try:
280                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
281                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
282                except OSError:
283                    warn(
284                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
285                    )
286                if start_interception and self.write_timestamps:
287                    self.start_log_fd_interception()
288
289        create_new_file = (
290            (latest_subfile_index == -1)
291            or
292            self._current_file_obj is None
293            or
294            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
295        )
296        if create_new_file:
297            self.increment_subfiles()
298
299        return self._current_file_obj
300
301    def increment_subfiles(self, increment_by: int = 1):
302        """
303        Create a new subfile and switch the file pointer over.
304        """
305        latest_subfile_index = self.get_latest_subfile_index()
306        old_subfile_index = latest_subfile_index
307        new_subfile_index = old_subfile_index + increment_by
308        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
309        self._previous_file_obj = self._current_file_obj
310        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
311        self.subfile_objects[new_subfile_index] = self._current_file_obj
312        self.flush()
313
314        if self.redirect_streams:
315            if self._previous_file_obj is not None:
316                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
317                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
318            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
319            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
320        self.close(unused_only=True)
321
322        ### Sanity check in case writing somehow fails.
323        if self._previous_file_obj is self._current_file_obj:
324            self._previous_file_obj = None
325
326        self.delete(unused_only=True)
327
328    def close(self, unused_only: bool = False) -> None:
329        """
330        Close any open file descriptors.
331
332        Parameters
333        ----------
334        unused_only: bool, default False
335            If `True`, only close file descriptors not currently in use.
336        """
337        self.stop_log_fd_interception(unused_only=unused_only)
338        subfile_indices = sorted(self.subfile_objects.keys())
339        for subfile_index in subfile_indices:
340            subfile_object = self.subfile_objects[subfile_index]
341            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
342                continue
343            try:
344                if not subfile_object.closed:
345                    subfile_object.close()
346            except Exception:
347                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
348
349            _ = self.subfile_objects.pop(subfile_index, None)
350            if self.redirect_streams:
351                _ = self._redirected_subfile_objects.pop(subfile_index, None)
352
353        if not unused_only:
354            self._previous_file_obj = None
355            self._current_file_obj = None
356
357
358    def get_timestamp_prefix_str(self) -> str:
359        """
360        Return the current minute prefix string.
361        """
362        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
363
364
365    def write(self, data: str) -> None:
366        """
367        Write the given text into the latest subfile.
368        If the subfile will be too large, create a new subfile.
369        If too many subfiles exist at once, the oldest one will be deleted.
370
371        NOTE: This will not split data across multiple files.
372        As such, if data is larger than max_file_size, then the corresponding subfile
373        may exceed this limit.
374        """
375        try:
376            if callable(self.write_callback):
377                self.write_callback(data)
378        except Exception:
379            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
380
381        try:
382            self.file_path.parent.mkdir(exist_ok=True, parents=True)
383            if isinstance(data, bytes):
384                data = data.decode('utf-8')
385
386            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
387            suffix_str = "\n" if self.write_timestamps else ""
388            self.refresh_files(
389                potential_new_len = len(prefix_str + data + suffix_str),
390                start_interception = self.write_timestamps,
391            )
392            try:
393                if prefix_str:
394                    self._current_file_obj.write(prefix_str)
395                self._current_file_obj.write(data)
396                if suffix_str:
397                    self._current_file_obj.write(suffix_str)
398            except BrokenPipeError:
399                warn("BrokenPipeError encountered. The daemon may have been terminated.")
400                return
401            except Exception:
402                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
403            self.flush()
404            self.delete(unused_only=True)
405        except Exception as e:
406            warn(f"Unexpected error in RotatingFile.write: {e}")
407
408
409    def delete(self, unused_only: bool = False) -> None:
410        """
411        Delete old subfiles.
412
413        Parameters
414        ----------
415        unused_only: bool, default False
416            If `True`, only delete subfiles which are no longer needed.
417        """
418        existing_subfile_paths = self.get_existing_subfile_paths()
419        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
420            return
421
422        self.flush()
423        self.close(unused_only=unused_only)
424
425        end_ix = (
426            (-1 * self.num_files_to_keep)
427            if unused_only
428            else len(existing_subfile_paths)
429        )
430        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
431            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
432
433            try:
434                subfile_path_to_delete.unlink()
435            except Exception:
436                warn(
437                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
438                    + f"{traceback.format_exc()}"
439                )
440
441
442    def read(self, *args, **kwargs) -> str:
443        """
444        Read the contents of the existing subfiles.
445        """
446        existing_subfile_indices = [
447            self.get_index_from_subfile_name(subfile_path.name)
448            for subfile_path in self.get_existing_subfile_paths()
449        ]
450        paths_to_read = [
451            self.get_subfile_path_from_index(subfile_index)
452            for subfile_index in existing_subfile_indices
453            if subfile_index >= self._cursor[0]
454        ]
455        buffer = ''
456        refresh_cursor = True
457        for subfile_path in paths_to_read:
458            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
459            seek_ix = (
460                self._cursor[1]
461                if subfile_index == self._cursor[0]
462                else 0
463            )
464
465            if (
466                subfile_index in self.subfile_objects
467                and
468                subfile_index not in self._redirected_subfile_objects
469            ):
470                subfile_object = self.subfile_objects[subfile_index]
471                for i in range(self.SEEK_BACK_ATTEMPTS):
472                    try:
473                        subfile_object.seek(max(seek_ix - i, 0))
474                        buffer += subfile_object.read()
475                    except UnicodeDecodeError:
476                        continue
477                    break
478            else:
479                with open(subfile_path, 'r', encoding='utf-8') as f:
480                    for i in range(self.SEEK_BACK_ATTEMPTS):
481                        try:
482                            f.seek(max(seek_ix - i, 0))
483                            buffer += f.read()
484                        except UnicodeDecodeError:
485                            continue
486                        break
487
488                    ### Handle the case when no files have yet been opened.
489                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
490                        self._cursor = (subfile_index, f.tell())
491                        refresh_cursor = False
492
493        if refresh_cursor:
494            self.refresh_cursor()
495        return buffer
496
497
498    def refresh_cursor(self) -> None:
499        """
500        Update the cursor to the latest subfile index and file.tell() value.
501        """
502        self.flush()
503        existing_subfile_paths = self.get_existing_subfile_paths()
504        current_ix = (
505            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
506            if existing_subfile_paths
507            else 0
508        )
509        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
510        self._cursor = (current_ix, position)
511
512
513    def readlines(self) -> List[str]:
514        """
515        Return a list of lines of text.
516        """
517        existing_subfile_indices = [
518            self.get_index_from_subfile_name(subfile_path.name)
519            for subfile_path in self.get_existing_subfile_paths()
520        ]
521        paths_to_read = [
522            self.get_subfile_path_from_index(subfile_index)
523            for subfile_index in existing_subfile_indices
524            if subfile_index >= self._cursor[0]
525        ]
526
527        lines = []
528        refresh_cursor = True
529        for subfile_path in paths_to_read:
530            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
531            seek_ix = (
532                self._cursor[1]
533                if subfile_index == self._cursor[0]
534                else 0
535            )
536
537            subfile_lines = []
538            if (
539                subfile_index in self.subfile_objects
540                and
541                subfile_index not in self._redirected_subfile_objects
542            ):
543                subfile_object = self.subfile_objects[subfile_index]
544                for i in range(self.SEEK_BACK_ATTEMPTS):
545                    try:
546                        subfile_object.seek(max((seek_ix - i), 0))
547                        subfile_lines = subfile_object.readlines()
548                    except UnicodeDecodeError:
549                        continue
550                    break
551            else:
552                with open(subfile_path, 'r', encoding='utf-8') as f:
553                    for i in range(self.SEEK_BACK_ATTEMPTS):
554                        try:
555                            f.seek(max(seek_ix - i, 0))
556                            subfile_lines = f.readlines()
557                        except UnicodeDecodeError:
558                            continue
559                        break
560
561                    ### Handle the case when no files have yet been opened.
562                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
563                        self._cursor = (subfile_index, f.tell())
564                        refresh_cursor = False
565
566            ### Sometimes a line may span multiple files.
567            if lines and subfile_lines and not lines[-1].endswith('\n'):
568                lines[-1] += subfile_lines[0]
569                new_lines = subfile_lines[1:]
570            else:
571                new_lines = subfile_lines
572            lines.extend(new_lines)
573
574        if refresh_cursor:
575            self.refresh_cursor()
576        return lines
577
578
579    def seekable(self) -> bool:
580        return True
581
582
583    def seek(self, position: int) -> None:
584        """
585        Seek to the beginning of the logs stream.
586        """
587        existing_subfile_indices = self.get_existing_subfile_indices()
588        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
589        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
590        if position == 0:
591            self._cursor = (min_ix, 0)
592            return
593
594        self._cursor = (max_ix, position)
595        if self._current_file_obj is not None:
596            self._current_file_obj.seek(position)
597
598    
599    def flush(self) -> None:
600        """
601        Flush any open subfiles.
602        """
603        for subfile_index, subfile_object in self.subfile_objects.items():
604            if not subfile_object.closed:
605                try:
606                    subfile_object.flush()
607                except Exception:
608                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
609
610        if self.redirect_streams:
611            try:
612                sys.stdout.flush()
613            except BrokenPipeError:
614                pass
615            except Exception:
616                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
617            try:
618                sys.stderr.flush()
619            except BrokenPipeError:
620                pass
621            except Exception:
622                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
623
624
625    def start_log_fd_interception(self):
626        """
627        Start the file descriptor monitoring threads.
628        """
629        if not self.write_timestamps:
630            return
631
632        self._stdout_interceptor = FileDescriptorInterceptor(
633            sys.stdout.fileno(),
634            self.get_timestamp_prefix_str,
635        )
636        self._stderr_interceptor = FileDescriptorInterceptor(
637            sys.stderr.fileno(),
638            self.get_timestamp_prefix_str,
639        )
640
641        self._stdout_interceptor_thread = Thread(
642            target = self._stdout_interceptor.start_interception,
643            daemon = True,
644        )
645        self._stderr_interceptor_thread = Thread(
646            target = self._stderr_interceptor.start_interception,
647            daemon = True,
648        )
649        self._stdout_interceptor_thread.start()
650        self._stderr_interceptor_thread.start()
651        self._intercepting = True
652
653        if '_interceptor_threads' not in self.__dict__:
654            self._interceptor_threads = []
655        if '_interceptors' not in self.__dict__:
656            self._interceptors = []
657        self._interceptor_threads.extend([
658            self._stdout_interceptor_thread,
659            self._stderr_interceptor_thread,
660        ])
661        self._interceptors.extend([
662            self._stdout_interceptor,
663            self._stderr_interceptor,
664        ])
665        self.stop_log_fd_interception(unused_only=True)
666
667
668    def stop_log_fd_interception(self, unused_only: bool = False):
669        """
670        Stop the file descriptor monitoring threads.
671        """
672        if not self.write_timestamps:
673            return
674
675        interceptors = self.__dict__.get('_interceptors', [])
676        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
677
678        end_ix = len(interceptors) if not unused_only else -2
679
680        for interceptor in interceptors[:end_ix]:
681            interceptor.stop_interception()
682        del interceptors[:end_ix]
683
684        for thread in interceptor_threads[:end_ix]:
685            try:
686                thread.join()
687            except Exception:
688                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
689        del interceptor_threads[:end_ix]
690
691    def touch(self):
692        """
693        Touch the latest subfile.
694        """
695        subfile_path = self.get_latest_subfile_path()
696        subfile_path.touch()
697
698    def isatty(self) -> bool:
699        return True
700
701    def __repr__(self) -> str:
702        """
703        Return basic info for this `RotatingFile`.
704        """
705        return (
706            "RotatingFile("
707            + f"'{self.file_path.as_posix()}', "
708            + f"num_files_to_keep={self.num_files_to_keep}, "
709            + f"max_file_size={self.max_file_size})"
710        )

A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
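
A minimal sketch of writing through a `RotatingFile` and reading everything back, with a small `max_file_size` to force rotation (the path and sizes are illustrative):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        pathlib.Path('/tmp/example-logs/job.log'),  # hypothetical path
        num_files_to_keep=3,
        max_file_size=1_000,
    )
    for i in range(500):
        log.write(f"line {i}\n")  # writes land in job.log.1, job.log.2, ... as rotation occurs

    log.seek(0)
    contents = log.read()  # concatenates the surviving subfiles
    log.close()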

RotatingFile( file_path: pathlib.Path, num_files_to_keep: Optional[int] = None, max_file_size: Optional[int] = None, redirect_streams: bool = False, write_timestamps: bool = False, timestamp_format: Optional[str] = None, write_callback: Optional[Callable[[str], NoneType]] = None)
 35    def __init__(
 36        self,
 37        file_path: pathlib.Path,
 38        num_files_to_keep: Optional[int] = None,
 39        max_file_size: Optional[int] = None,
 40        redirect_streams: bool = False,
 41        write_timestamps: bool = False,
 42        timestamp_format: Optional[str] = None,
 43        write_callback: Optional[Callable[[str], None]] = None,
 44    ):
 45        """
 46        Create a file-like object which manages other files.
 47
 48        Parameters
 49        ----------
 50        num_files_to_keep: int, default None
 51            How many sub-files to keep at any given time.
 52            Defaults to the configured value (5).
 53
 54        max_file_size: int, default None
 55            How large in bytes each sub-file can grow before another file is created.
 56            Note that this is not a hard limit but rather a threshold
 57            which may be slightly exceeded.
 58            Defaults to the configured value (100_000).
 59
 60        redirect_streams: bool, default False
 61            If `True`, redirect previous file streams when opening a new file descriptor.
 62            
 63            NOTE: Only set this to `True` if you are entering into a daemon context.
 64            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 65
 66        write_timestamps: bool, default False
 67            If `True`, prepend the current UTC timestamp to each line of the file.
 68
 69        timestamp_format: str, default None
 70            If `write_timestamps` is `True`, use this format for the timestamps.
 71            Defaults to `'%Y-%m-%d %H:%M'`.
 72
 73        write_callback: Optional[Callable[[str], None]], default None
 74            If provided, execute this callback with the data to be written.
 75        """
 76        self.file_path = pathlib.Path(file_path)
 77        if num_files_to_keep is None:
 78            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 79        if max_file_size is None:
 80            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 81        if timestamp_format is None:
 82            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 83        if num_files_to_keep < 1:
 84            raise ValueError("At least 1 file must be kept.")
 85        if max_file_size < 100:
 86            raise ValueError("Subfiles must contain at least 100 bytes.")
 87
 88        self.num_files_to_keep = num_files_to_keep
 89        self.max_file_size = max_file_size
 90        self.redirect_streams = redirect_streams
 91        self.write_timestamps = write_timestamps
 92        self.timestamp_format = timestamp_format
 93        self.write_callback = write_callback
 94        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')
 95
 96        ### When subfiles are opened, map from their index to the file objects.
 97        self.subfile_objects = {}
 98        self._redirected_subfile_objects = {}
 99        self._current_file_obj = None
100        self._previous_file_obj = None
101
102        ### When reading, keep track of the file index and position.
103        self._cursor: Tuple[int, int] = (0, 0)
104
105        ### Don't forget to close any stray files.
106        atexit.register(self.close)

Create a file-like object which manages other files.

Parameters
  • num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
  • max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
  • redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor.

    NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.

  • write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
  • timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
  • write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
SEEK_BACK_ATTEMPTS: int = 5
file_path
num_files_to_keep
max_file_size
redirect_streams
write_timestamps
timestamp_format
write_callback
subfile_regex_pattern
subfile_objects
def fileno(self):
109    def fileno(self):
110        """
111        Return the file descriptor for the latest subfile.
112        """
113        self.refresh_files(start_interception=False)
114        return self._current_file_obj.fileno()

Return the file descriptor for the latest subfile.

def get_latest_subfile_path(self) -> pathlib.Path:
117    def get_latest_subfile_path(self) -> pathlib.Path:
118        """
119        Return the path of the latest subfile to write into.
120        """
121        return self.get_subfile_path_from_index(
122            self.get_latest_subfile_index()
123        )

Return the path of the latest subfile to write into.

def get_remaining_subfile_size(self, subfile_index: int) -> int:
126    def get_remaining_subfile_size(self, subfile_index: int) -> int:
127        """
128        Return the remaining buffer size for a subfile.
129
130        Parameters
131        ----------
132        subfile_index: int
133            The index of the subfile to be checked.
134
135        Returns
136        -------
137        The remaining size in bytes.
138        """
139        subfile_path = self.get_subfile_path_from_index(subfile_index)
140        if not subfile_path.exists():
141            return self.max_file_size
142
143        return self.max_file_size - os.path.getsize(subfile_path)

Return the remaining buffer size for a subfile.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
Returns
  • The remaining size in bytes.
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
146    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
147        """
148        Return whether a given subfile is too large.
149
150        Parameters
151        ----------
152        subfile_index: int
153            The index of the subfile to be checked.
154
155        potential_new_len: int, default 0
156            The length of a potential write of new data.
157
158        Returns
159        -------
160        A bool indicating the subfile is or will be too large.
161        """
162        subfile_path = self.get_subfile_path_from_index(subfile_index)
163        if not subfile_path.exists():
164            return False
165
166        self.flush()
167
168        return (
169            (os.path.getsize(subfile_path) + potential_new_len)
170            >=
171            self.max_file_size
172        )

Return whether a given subfile is too large.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
  • potential_new_len (int, default 0): The length of a potential write of new data.
Returns
  • A bool indicating the subfile is or will be too large.
def get_latest_subfile_index(self) -> int:
175    def get_latest_subfile_index(self) -> int:
176        """
177        Return the latest existing subfile index.
178        If no subfiles exist, return 0; if the name cannot be parsed, return -1.
179        """
180        existing_subfile_paths = self.get_existing_subfile_paths()
181        latest_index = (
182            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
183            if existing_subfile_paths
184            else 0
185        )
186        return latest_index

Return the latest existing subfile index. If no subfiles exist, return 0; if the name cannot be parsed, return -1.

def get_index_from_subfile_name(self, subfile_name: str) -> int:
189    def get_index_from_subfile_name(self, subfile_name: str) -> int:
190        """
191        Return the index from a given subfile name.
192        If the file name cannot be parsed, return -1.
193        """
194        try:
195            return int(subfile_name.replace(self.file_path.name + '.', ''))
196        except Exception:
197            return -1

Return the index from a given subfile name. If the file name cannot be parsed, return -1.

def get_subfile_name_from_index(self, subfile_index: int) -> str:
200    def get_subfile_name_from_index(self, subfile_index: int) -> str:
201        """
202        Return the subfile name from the given index.
203        """
204        return f'{self.file_path.name}.{subfile_index}'

Return the subfile name from the given index.

def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
207    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
208        """
209        Return the subfile's path from its index.
210        """
211        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

Return the subfile's path from its index.

def get_existing_subfile_indices(self) -> List[int]:
214    def get_existing_subfile_indices(self) -> List[int]:
215        """
216        Return a list of subfile indices which exist on disk.
217        """
218        existing_subfile_paths = self.get_existing_subfile_paths()
219        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

Return a list of subfile indices which exist on disk.

def get_existing_subfile_paths(self) -> List[pathlib.Path]:
222    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
223        """
224        Return a list of file paths that match the input filename pattern.
225        """
226        if not self.file_path.parent.exists():
227            return []
228
229        subfile_names_indices = sorted(
230            [
231                (file_name, self.get_index_from_subfile_name(file_name))
232                for file_name in os.listdir(self.file_path.parent)
233                if (
234                    file_name.startswith(self.file_path.name)
235                    and re.match(self.subfile_regex_pattern, file_name)
236                )
237            ],
238            key=lambda x: x[1],
239        )
240        return [
241            (self.file_path.parent / file_name)
242            for file_name, _ in subfile_names_indices
243        ]

Return a list of file paths that match the input filename pattern.

def refresh_files( self, potential_new_len: int = 0, start_interception: bool = False) -> '_io.TextIOWrapper':
246    def refresh_files(
247        self,
248        potential_new_len: int = 0,
249        start_interception: bool = False,
250    ) -> '_io.TextIOWrapper':
251        """
252        Check the state of the subfiles.
253        If the latest subfile is too large, create a new file and delete old ones.
254
255        Parameters
256        ----------
257        potential_new_len: int, default 0
258            The length of a potential write of new data.
259        start_interception: bool, default False
260            If `True`, kick off the file interception threads.
261        """
262        self.flush()
263
264        latest_subfile_index = self.get_latest_subfile_index()
265        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
266
267        ### First run with existing log files: open the most recent log file.
268        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
269
270        ### Sometimes a new file is created but output doesn't switch over.
271        lost_latest_handle = (
272            self._current_file_obj is not None
273            and
274            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
275        )
276        if is_first_run_with_logs or lost_latest_handle:
277            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
278            if self.redirect_streams:
279                try:
280                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
281                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
282                except OSError:
283                    warn(
284                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
285                    )
286                if start_interception and self.write_timestamps:
287                    self.start_log_fd_interception()
288
289        create_new_file = (
290            (latest_subfile_index == -1)
291            or
292            self._current_file_obj is None
293            or
294            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
295        )
296        if create_new_file:
297            self.increment_subfiles()
298
299        return self._current_file_obj

Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.

Parameters
  • potential_new_len (int, default 0): The length of the data about to be written; used to check whether the current subfile would exceed the maximum size.

  • start_interception (bool, default False): If True, kick off the file interception threads.
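For example, write() (below) calls this method before every write so that a valid handle exists and rotation happens up front. A minimal sketch of doing the same by hand (the constructor keyword arguments are assumptions inferred from attributes used in this class, not a documented signature):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    rf = RotatingFile(pathlib.Path('/tmp/mrsm-logs/job.log'), max_file_size=1024)
    data = "hello, world\n"
    handle = rf.refresh_files(potential_new_len=len(data))  # may rotate first
    handle.write(data)
    rf.flush()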

def increment_subfiles(self, increment_by: int = 1):
301    def increment_subfiles(self, increment_by: int = 1):
302        """
303        Create a new subfile and switch the file pointer over.
304        """
305        latest_subfile_index = self.get_latest_subfile_index()
306        old_subfile_index = latest_subfile_index
307        new_subfile_index = old_subfile_index + increment_by
308        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
309        self._previous_file_obj = self._current_file_obj
310        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
311        self.subfile_objects[new_subfile_index] = self._current_file_obj
312        self.flush()
313
314        if self.redirect_streams:
315            if self._previous_file_obj is not None:
316                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
317                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
318            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
319            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
320        self.close(unused_only=True)
321
322        ### Sanity check in case writing somehow fails.
323        if self._previous_file_obj is self._current_file_obj:
324            self._previous_file_obj = None
325
326        self.delete(unused_only=True)

Create a new subfile and switch the file pointer over.

def close(self, unused_only: bool = False) -> None:
328    def close(self, unused_only: bool = False) -> None:
329        """
330        Close any open file descriptors.
331
332        Parameters
333        ----------
334        unused_only: bool, default False
335            If `True`, only close file descriptors not currently in use.
336        """
337        self.stop_log_fd_interception(unused_only=unused_only)
338        subfile_indices = sorted(self.subfile_objects.keys())
339        for subfile_index in subfile_indices:
340            subfile_object = self.subfile_objects[subfile_index]
341            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
342                continue
343            try:
344                if not subfile_object.closed:
345                    subfile_object.close()
346            except Exception:
347                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
348
349            _ = self.subfile_objects.pop(subfile_index, None)
350            if self.redirect_streams:
351                _ = self._redirected_subfile_objects.pop(subfile_index, None)
352
353        if not unused_only:
354            self._previous_file_obj = None
355            self._current_file_obj = None

Close any open file descriptors.

Parameters
  • unused_only (bool, default False): If True, only close file descriptors not currently in use.
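For instance (with rf a RotatingFile as in the sketches above):

    rf.close(unused_only=True)  # keep the current and previous handles open
    rf.close()                  # close everything and reset both handles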

def get_timestamp_prefix_str(self) -> str:
358    def get_timestamp_prefix_str(self) -> str:
359        """
360        Return the current minute prefix string.
361        """
362        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

Return the current minute prefix string.

def write(self, data: str) -> None:
365    def write(self, data: str) -> None:
366        """
367        Write the given text into the latest subfile.
368        If the subfile will be too large, create a new subfile.
369        If too many subfiles exist at once, the oldest one will be deleted.
370
371        NOTE: This will not split data across multiple files.
372        As such, if data is larger than max_file_size, then the corresponding subfile
373        may exceed this limit.
374        """
375        try:
376            if callable(self.write_callback):
377                self.write_callback(data)
378        except Exception:
379            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
380
381        try:
382            self.file_path.parent.mkdir(exist_ok=True, parents=True)
383            if isinstance(data, bytes):
384                data = data.decode('utf-8')
385
386            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
387            suffix_str = "\n" if self.write_timestamps else ""
388            self.refresh_files(
389                potential_new_len = len(prefix_str + data + suffix_str),
390                start_interception = self.write_timestamps,
391            )
392            try:
393                if prefix_str:
394                    self._current_file_obj.write(prefix_str)
395                self._current_file_obj.write(data)
396                if suffix_str:
397                    self._current_file_obj.write(suffix_str)
398            except BrokenPipeError:
399                warn("BrokenPipeError encountered. The daemon may have been terminated.")
400                return
401            except Exception:
402                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
403            self.flush()
404            self.delete(unused_only=True)
405        except Exception as e:
406            warn(f"Unexpected error in RotatingFile.write: {e}")

Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.

NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
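A usage sketch showing rotation and pruning together (again, the constructor keyword arguments are assumptions inferred from attributes referenced in this class):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    rf = RotatingFile(
        pathlib.Path('/tmp/mrsm-logs/job.log'),
        max_file_size=1024,    # rotate once a subfile approaches 1 KiB
        num_files_to_keep=3,   # prune all but the three newest subfiles
    )
    for i in range(200):
        rf.write(f"line {i}\n")   # rotates and prunes as needed
    rf.flush()
    rf.close()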

def delete(self, unused_only: bool = False) -> None:
409    def delete(self, unused_only: bool = False) -> None:
410        """
411        Delete old subfiles.
412
413        Parameters
414        ----------
415        unused_only: bool, default False
416            If `True`, only delete subfiles which are no longer needed.
417        """
418        existing_subfile_paths = self.get_existing_subfile_paths()
419        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
420            return
421
422        self.flush()
423        self.close(unused_only=unused_only)
424
425        end_ix = (
426            (-1 * self.num_files_to_keep)
427            if unused_only
428            else len(existing_subfile_paths)
429        )
430        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
431            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
432
433            try:
434                subfile_path_to_delete.unlink()
435            except Exception:
436                warn(
437                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
438                    + f"{traceback.format_exc()}"
439                )

Delete old subfiles.

Parameters
  • unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
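For instance (rf as above):

    rf.delete(unused_only=True)  # prune down to num_files_to_keep subfiles
    rf.delete()                  # remove every subfile on disk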

def read(self, *args, **kwargs) -> str:
442    def read(self, *args, **kwargs) -> str:
443        """
444        Read the contents of the existing subfiles.
445        """
446        existing_subfile_indices = [
447            self.get_index_from_subfile_name(subfile_path.name)
448            for subfile_path in self.get_existing_subfile_paths()
449        ]
450        paths_to_read = [
451            self.get_subfile_path_from_index(subfile_index)
452            for subfile_index in existing_subfile_indices
453            if subfile_index >= self._cursor[0]
454        ]
455        buffer = ''
456        refresh_cursor = True
457        for subfile_path in paths_to_read:
458            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
459            seek_ix = (
460                self._cursor[1]
461                if subfile_index == self._cursor[0]
462                else 0
463            )
464
465            if (
466                subfile_index in self.subfile_objects
467                and
468                subfile_index not in self._redirected_subfile_objects
469            ):
470                subfile_object = self.subfile_objects[subfile_index]
471                for i in range(self.SEEK_BACK_ATTEMPTS):
472                    try:
473                        subfile_object.seek(max(seek_ix - i, 0))
474                        buffer += subfile_object.read()
475                    except UnicodeDecodeError:
476                        continue
477                    break
478            else:
479                with open(subfile_path, 'r', encoding='utf-8') as f:
480                    for i in range(self.SEEK_BACK_ATTEMPTS):
481                        try:
482                            f.seek(max(seek_ix - i, 0))
483                            buffer += f.read()
484                        except UnicodeDecodeError:
485                            continue
486                        break
487
488                    ### Handle the case when no files have yet been opened.
489                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
490                        self._cursor = (subfile_index, f.tell())
491                        refresh_cursor = False
492
493        if refresh_cursor:
494            self.refresh_cursor()
495        return buffer

Read the contents of the existing subfiles.
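Continuing the sketch above, rewind and read everything back:

    rf.seek(0)            # move the cursor to the earliest kept subfile
    contents = rf.read()  # concatenation of all kept subfiles
    print(contents)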

def refresh_cursor(self) -> None:
498    def refresh_cursor(self) -> None:
499        """
500        Update the cursor to the latest subfile index and file.tell() value.
501        """
502        self.flush()
503        existing_subfile_paths = self.get_existing_subfile_paths()
504        current_ix = (
505            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
506            if existing_subfile_paths
507            else 0
508        )
509        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
510        self._cursor = (current_ix, position)

Update the cursor to the latest subfile index and file.tell() value.

def readlines(self) -> List[str]:
513    def readlines(self) -> List[str]:
514        """
515        Return a list of lines of text.
516        """
517        existing_subfile_indices = [
518            self.get_index_from_subfile_name(subfile_path.name)
519            for subfile_path in self.get_existing_subfile_paths()
520        ]
521        paths_to_read = [
522            self.get_subfile_path_from_index(subfile_index)
523            for subfile_index in existing_subfile_indices
524            if subfile_index >= self._cursor[0]
525        ]
526
527        lines = []
528        refresh_cursor = True
529        for subfile_path in paths_to_read:
530            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
531            seek_ix = (
532                self._cursor[1]
533                if subfile_index == self._cursor[0]
534                else 0
535            )
536
537            subfile_lines = []
538            if (
539                subfile_index in self.subfile_objects
540                and
541                subfile_index not in self._redirected_subfile_objects
542            ):
543                subfile_object = self.subfile_objects[subfile_index]
544                for i in range(self.SEEK_BACK_ATTEMPTS):
545                    try:
546                        subfile_object.seek(max((seek_ix - i), 0))
547                        subfile_lines = subfile_object.readlines()
548                    except UnicodeDecodeError:
549                        continue
550                    break
551            else:
552                with open(subfile_path, 'r', encoding='utf-8') as f:
553                    for i in range(self.SEEK_BACK_ATTEMPTS):
554                        try:
555                            f.seek(max(seek_ix - i, 0))
556                            subfile_lines = f.readlines()
557                        except UnicodeDecodeError:
558                            continue
559                        break
560
561                    ### Handle the case when no files have yet been opened.
562                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
563                        self._cursor = (subfile_index, f.tell())
564                        refresh_cursor = False
565
566            ### Sometimes a line may span multiple files.
567            if lines and subfile_lines and not lines[-1].endswith('\n'):
568                lines[-1] += subfile_lines[0]
569                new_lines = subfile_lines[1:]
570            else:
571                new_lines = subfile_lines
572            lines.extend(new_lines)
573
574        if refresh_cursor:
575            self.refresh_cursor()
576        return lines

Return a list of lines of text.
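Because the cursor advances after each read, successive calls return only the output written since the previous call, which enables a tail-like pattern (behavior suggested by the cursor logic above):

    rf.seek(0)
    print(rf.readlines())      # every line from the earliest kept subfile
    rf.write("another line\n")
    print(rf.readlines())      # only the newly written line(s)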

def seekable(self) -> bool:
579    def seekable(self) -> bool:
580        return True

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def seek(self, position: int) -> None:
583    def seek(self, position: int) -> None:
584        """
585        Seek to the given position in the logs stream (0 rewinds to the earliest subfile; other positions apply to the latest).
586        """
587        existing_subfile_indices = self.get_existing_subfile_indices()
588        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
589        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
590        if position == 0:
591            self._cursor = (min_ix, 0)
592            return
593
594        self._cursor = (max_ix, position)
595        if self._current_file_obj is not None:
596            self._current_file_obj.seek(position)

Seek to the given position in the logs stream. A position of 0 rewinds to the beginning of the earliest subfile; any other position applies to the latest subfile.

def flush(self) -> None:
599    def flush(self) -> None:
600        """
601        Flush any open subfiles.
602        """
603        for subfile_index, subfile_object in self.subfile_objects.items():
604            if not subfile_object.closed:
605                try:
606                    subfile_object.flush()
607                except Exception:
608                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
609
610        if self.redirect_streams:
611            try:
612                sys.stdout.flush()
613            except BrokenPipeError:
614                pass
615            except Exception:
616                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
617            try:
618                sys.stderr.flush()
619            except BrokenPipeError:
620                pass
621            except Exception:
622                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

Flush any open subfiles.

def start_log_fd_interception(self):
625    def start_log_fd_interception(self):
626        """
627        Start the file descriptor monitoring threads.
628        """
629        if not self.write_timestamps:
630            return
631
632        self._stdout_interceptor = FileDescriptorInterceptor(
633            sys.stdout.fileno(),
634            self.get_timestamp_prefix_str,
635        )
636        self._stderr_interceptor = FileDescriptorInterceptor(
637            sys.stderr.fileno(),
638            self.get_timestamp_prefix_str,
639        )
640
641        self._stdout_interceptor_thread = Thread(
642            target = self._stdout_interceptor.start_interception,
643            daemon = True,
644        )
645        self._stderr_interceptor_thread = Thread(
646            target = self._stderr_interceptor.start_interception,
647            daemon = True,
648        )
649        self._stdout_interceptor_thread.start()
650        self._stderr_interceptor_thread.start()
651        self._intercepting = True
652
653        if '_interceptor_threads' not in self.__dict__:
654            self._interceptor_threads = []
655        if '_interceptors' not in self.__dict__:
656            self._interceptors = []
657        self._interceptor_threads.extend([
658            self._stdout_interceptor_thread,
659            self._stderr_interceptor_thread,
660        ])
661        self._interceptors.extend([
662            self._stdout_interceptor,
663            self._stderr_interceptor,
664        ])
665        self.stop_log_fd_interception(unused_only=True)

Start the file descriptor monitoring threads.

def stop_log_fd_interception(self, unused_only: bool = False):
668    def stop_log_fd_interception(self, unused_only: bool = False):
669        """
670        Stop the file descriptor monitoring threads.
671        """
672        if not self.write_timestamps:
673            return
674
675        interceptors = self.__dict__.get('_interceptors', [])
676        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
677
678        end_ix = len(interceptors) if not unused_only else -2
679
680        for interceptor in interceptors[:end_ix]:
681            interceptor.stop_interception()
682        del interceptors[:end_ix]
683
684        for thread in interceptor_threads[:end_ix]:
685            try:
686                thread.join()
687            except Exception:
688                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
689        del interceptor_threads[:end_ix]

Stop the file descriptor monitoring threads.

def touch(self):
691    def touch(self):
692        """
693        Touch the latest subfile.
694        """
695        subfile_path = self.get_latest_subfile_path()
696        subfile_path.touch()

Touch the latest subfile.

def isatty(self) -> bool:
698    def isatty(self) -> bool:
699        return True

Return whether this is an 'interactive' stream.

Return False if it can't be determined.

class FileDescriptorInterceptor:
 23class FileDescriptorInterceptor:
 24    """
 25    A management class to intercept data written to a file descriptor.
 26    """
 27    def __init__(
 28        self,
 29        file_descriptor: int,
 30        injection_hook: Callable[[], str],
 31    ):
 32        """
 33        Parameters
 34        ----------
 35        file_descriptor: int
 36            The OS file descriptor from which to read.
 37
 38        injection_hook: Callable[[], str]
 39            A callable which returns a string to be injected into the written data.
 40        """
 41        self.stop_event = Event()
 42        self.injection_hook = injection_hook
 43        self.original_file_descriptor = file_descriptor
 44        self.new_file_descriptor = os.dup(file_descriptor)
 45        self.read_pipe, self.write_pipe = os.pipe()
 46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
 47        os.dup2(self.write_pipe, file_descriptor)
 48
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                last_char_is_newline = data[-1:] == b'\n'  # NOTE: slice, since data[-1] is an int
 82
 83                injected_str = self.injection_hook()
 84                injected_bytes = injected_str.encode('utf-8')
 85
 86                if is_first_read:
 87                    data = b'\n' + data
 88                    is_first_read = False
 89
 90                modified_data = (
 91                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 92                    if last_char_is_newline
 93                    else data.replace(b'\n', b'\n' + injected_bytes)
 94                )
 95                os.write(self.new_file_descriptor, modified_data)
 96            except (BrokenPipeError, OSError):
 97                break
 98            except Exception:
 99                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
100                    f.write(traceback.format_exc())
101                break
102
103
104    def stop_interception(self):
105        """
106        Close the new file descriptors.
107        """
108        self.stop_event.set()
109        os.write(self.signal_write_pipe, b'\0')
110        try:
111            os.close(self.new_file_descriptor)
112        except OSError as e:
113            if e.errno != FD_CLOSED:
114                warn(
115                    "Error while trying to close the duplicated file descriptor:\n"
116                    + f"{traceback.format_exc()}"
117                )
118
119        try:
120            os.close(self.write_pipe)
121        except OSError as e:
122            if e.errno != FD_CLOSED:
123                warn(
124                    "Error while trying to close the write-pipe "
125                    + "to the intercepted file descriptor:\n"
126                    + f"{traceback.format_exc()}"
127                )
128        try:
129            os.close(self.read_pipe)
130        except OSError as e:
131            if e.errno != FD_CLOSED:
132                warn(
133                    "Error while trying to close the read-pipe "
134                    + "to the intercepted file descriptor:\n"
135                    + f"{traceback.format_exc()}"
136                )
137
138        try:
139            os.close(self.signal_read_pipe)
140        except OSError as e:
141            if e.errno != FD_CLOSED:
142                warn(
143                    "Error while trying to close the signal-read-pipe "
144                    + "to the intercepted file descriptor:\n"
145                    + f"{traceback.format_exc()}"
146                )
147
148        try:
149            os.close(self.signal_write_pipe)
150        except OSError as e:
151            if e.errno != FD_CLOSED:
152                warn(
153                    "Error while trying to close the signal-write-pipe "
154                    + "to the intercepted file descriptor:\n"
155                    + f"{traceback.format_exc()}"
156                )

A management class to intercept data written to a file descriptor.

FileDescriptorInterceptor(file_descriptor: int, injection_hook: Callable[[], str])
27    def __init__(
28        self,
29        file_descriptor: int,
30        injection_hook: Callable[[], str],
31    ):
32        """
33        Parameters
34        ----------
35        file_descriptor: int
36            The OS file descriptor from which to read.
37
38        injection_hook: Callable[[], str]
39            A callable which returns a string to be injected into the written data.
40        """
41        self.stop_event = Event()
42        self.injection_hook = injection_hook
43        self.original_file_descriptor = file_descriptor
44        self.new_file_descriptor = os.dup(file_descriptor)
45        self.read_pipe, self.write_pipe = os.pipe()
46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
47        os.dup2(self.write_pipe, file_descriptor)

Parameters
  • file_descriptor (int): The OS file descriptor from which to read.
  • injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.

Attributes
  • stop_event: An Event used to signal the interception loop to stop.
  • injection_hook: The injection hook passed to the constructor.
  • original_file_descriptor: The file descriptor being intercepted.
  • new_file_descriptor: A duplicate of the original file descriptor, to which the modified data is written.
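A hypothetical end-to-end sketch using only the constructor and methods shown here: intercept a pipe's write end and prefix each line with a timestamp. The pipe stands in for a real log file descriptor.

    import os
    import time
    import threading
    import datetime
    from meerschaum.utils.daemon import FileDescriptorInterceptor

    def timestamp_prefix() -> str:
        return datetime.datetime.now().strftime('%H:%M') + ' | '

    read_fd, write_fd = os.pipe()   # stand-in for a real log file descriptor
    interceptor = FileDescriptorInterceptor(write_fd, timestamp_prefix)
    thread = threading.Thread(target=interceptor.start_interception, daemon=True)
    thread.start()

    os.write(write_fd, b"hello\nworld\n")  # re-routed through the interceptor
    time.sleep(0.3)                        # let the select() loop drain the pipe
    print(os.read(read_fd, 4096))          # prefixed data lands on the dup'd fd
    interceptor.stop_interception()
    thread.join()

Note the design: the dedicated signal pipe exists so that stop_interception() can wake the select() loop immediately instead of waiting for its 0.1-second timeout.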

def start_interception(self):
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                last_char_is_newline = data[-1:] == b'\n'  # NOTE: slice, since data[-1] is an int
 82
 83                injected_str = self.injection_hook()
 84                injected_bytes = injected_str.encode('utf-8')
 85
 86                if is_first_read:
 87                    data = b'\n' + data
 88                    is_first_read = False
 89
 90                modified_data = (
 91                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 92                    if last_char_is_newline
 93                    else data.replace(b'\n', b'\n' + injected_bytes)
 94                )
 95                os.write(self.new_file_descriptor, modified_data)
 96            except (BrokenPipeError, OSError):
 97                break
 98            except Exception:
 99                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
100                    f.write(traceback.format_exc())
101                break

Read from the file descriptor and write the modified data after injection.

NOTE: This is blocking and is meant to be run in a thread.
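The injection rule applied inside the loop, restated as a standalone function (the function name is hypothetical; endswith() is equivalent to the byte-slice comparison above):

    def inject_after_newlines(data: bytes, injected: bytes) -> bytes:
        """Insert `injected` after every newline, except a trailing one."""
        if data.endswith(b'\n'):
            return data[:-1].replace(b'\n', b'\n' + injected) + b'\n'
        return data.replace(b'\n', b'\n' + injected)

    assert inject_after_newlines(b"a\nb\n", b"T | ") == b"a\nT | b\n"

Combined with the single b'\n' prepended on the first read, this effectively prefixes every line with the injection hook's output (e.g., a timestamp).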

def stop_interception(self):
104    def stop_interception(self):
105        """
106        Close the new file descriptors.
107        """
108        self.stop_event.set()
109        os.write(self.signal_write_pipe, b'\0')
110        try:
111            os.close(self.new_file_descriptor)
112        except OSError as e:
113            if e.errno != FD_CLOSED:
114                warn(
115                    "Error while trying to close the duplicated file descriptor:\n"
116                    + f"{traceback.format_exc()}"
117                )
118
119        try:
120            os.close(self.write_pipe)
121        except OSError as e:
122            if e.errno != FD_CLOSED:
123                warn(
124                    "Error while trying to close the write-pipe "
125                    + "to the intercepted file descriptor:\n"
126                    + f"{traceback.format_exc()}"
127                )
128        try:
129            os.close(self.read_pipe)
130        except OSError as e:
131            if e.errno != FD_CLOSED:
132                warn(
133                    "Error while trying to close the read-pipe "
134                    + "to the intercepted file descriptor:\n"
135                    + f"{traceback.format_exc()}"
136                )
137
138        try:
139            os.close(self.signal_read_pipe)
140        except OSError as e:
141            if e.errno != FD_CLOSED:
142                warn(
143                    "Error while trying to close the signal-read-pipe "
144                    + "to the intercepted file descriptor:\n"
145                    + f"{traceback.format_exc()}"
146                )
147
148        try:
149            os.close(self.signal_write_pipe)
150        except OSError as e:
151            if e.errno != FD_CLOSED:
152                warn(
153                    "Error while trying to close the signal-write-pipe "
154                    + "to the intercepted file descriptor:\n"
155                    + f"{traceback.format_exc()}"
156                )

Close the new file descriptors.