meerschaum.utils.daemon

Manage Daemons via the Daemon class.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Manage Daemons via the `Daemon` class.
  7"""
  8
  9from __future__ import annotations
 10import os, pathlib, shutil, json, datetime, threading, shlex
 11from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict
 12from meerschaum.config._paths import DAEMON_RESOURCES_PATH
 13from meerschaum.utils.daemon.StdinFile import StdinFile
 14from meerschaum.utils.daemon.Daemon import Daemon
 15from meerschaum.utils.daemon.RotatingFile import RotatingFile
 16from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 17from meerschaum.utils.daemon._names import get_new_daemon_name
 18
 19
 20def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 21    """Parse sysargs and execute a Meerschaum action as a daemon.
 22
 23    Parameters
 24    ----------
 25    sysargs: Optional[List[str]], default None
 26        The command line arguments used in a Meerschaum action.
 27
 28    Returns
 29    -------
 30    A SuccessTuple.
 31    """
 32    from meerschaum._internal.entry import entry
 33    _args = {}
 34    if '--name' in sysargs or '--job-name' in sysargs:
 35        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 36        _args = parse_arguments(sysargs)
 37    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 38    try:
 39        label = shlex.join(filtered_sysargs) if sysargs else None
 40    except Exception as e:
 41        label = ' '.join(filtered_sysargs) if sysargs else None
 42
 43    name = _args.get('name', None)
 44    daemon = None
 45    if name:
 46        try:
 47            daemon = Daemon(daemon_id=name)
 48        except Exception as e:
 49            daemon = None
 50
 51    if daemon is not None:
 52        existing_sysargs = daemon.properties['target']['args'][0]
 53        existing_kwargs = parse_arguments(existing_sysargs)
 54
 55        ### Remove sysargs because flags are aliased.
 56        _ = _args.pop('daemon', None)
 57        _ = _args.pop('sysargs', None)
 58        _ = _args.pop('filtered_sysargs', None)
 59        debug = _args.pop('debug', None)
 60        _args['sub_args'] = sorted(_args.get('sub_args', []))
 61        _ = existing_kwargs.pop('daemon', None)
 62        _ = existing_kwargs.pop('sysargs', None)
 63        _ = existing_kwargs.pop('filtered_sysargs', None)
 64        _ = existing_kwargs.pop('debug', None)
 65        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 66
 67        ### Only run if the kwargs equal or no actions are provided.
 68        if existing_kwargs == _args or not _args.get('action', []):
 69            if daemon.status == 'running':
 70                return True, f"Daemon '{daemon}' is already running."
 71            return daemon.run(
 72                debug=debug,
 73                allow_dirty_run=True,
 74            )
 75
 76    success_tuple = run_daemon(
 77        entry,
 78        filtered_sysargs,
 79        daemon_id=_args.get('name', None) if _args else None,
 80        label=label,
 81        keep_daemon_output=('--rm' not in (sysargs or [])),
 82    )
 83    return success_tuple
 84
 85
 86def daemon_action(**kw) -> SuccessTuple:
 87    """Execute a Meerschaum action as a daemon."""
 88    from meerschaum.utils.packages import run_python_package
 89    from meerschaum.utils.threading import Thread
 90    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 91    from meerschaum.actions import get_action
 92
 93    kw['daemon'] = True
 94    kw['shell'] = False
 95
 96    action = kw.get('action', None)
 97    if action and get_action(action) is None:
 98        if not kw.get('allow_shell_job') and not kw.get('force'):
 99            return False, (
100                f"Action '{action}' isn't recognized.\n\n"
101                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
102                + "to enable shell commands to run as Meerschaum jobs."
103            )
104
105    sysargs = parse_dict_to_sysargs(kw)
106    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
107    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
108    return rc == 0, msg
109
110
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target handed to the `Daemon`.

    *args
        Positional arguments stored as the Daemon's `target_args`.

    daemon_id: Optional[str], default None
        The ID passed to the `Daemon` constructor.

    keep_daemon_output: bool, default True
        Forwarded to `Daemon.run()`.

    allow_dirty_run: bool, default False
        Forwarded to `Daemon.run()`.

    label: Optional[str], default None
        The label passed to the `Daemon` constructor.

    **kw
        Keyword arguments stored as the Daemon's `target_kw`.

    Returns
    -------
    The result of `Daemon.run()`.
    """
    run_options = {
        'keep_daemon_output': keep_daemon_output,
        'allow_dirty_run': allow_dirty_run,
    }
    new_daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=list(args),
        target_kw=kw,
        label=label,
    )
    return new_daemon.run(**run_options)
132
133
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` timestamp),
    then paused daemons (ordered by `paused`), then running daemons
    (ordered by `began`). A missing timestamp sorts as `'9999'`.
    Daemons with any other status are not returned.
    """
    all_daemons = [Daemon(daemon_id=daemon_id) for daemon_id in get_daemon_ids()]
    status_of = {daemon: daemon.status for daemon in all_daemons}

    def _timestamps(key):
        ### Map every daemon to the given `process` timestamp from its properties.
        return {
            daemon: daemon.properties.get('process', {}).get(key, '9999')
            for daemon in all_daemons
        }

    began_at = _timestamps('began')
    paused_at = _timestamps('paused')
    ended_at = _timestamps('ended')

    def _ordered(status, timestamps):
        ### Select the daemons with the given status, ordered by their timestamp.
        matching = [
            daemon
            for daemon, daemon_status in status_of.items()
            if daemon_status == status
        ]
        matching.sort(key=timestamps.__getitem__)
        return matching

    return (
        _ordered('stopped', ended_at)
        + _ordered('paused', paused_at)
        + _ordered('running', began_at)
    )
180
181
def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    An entry of `DAEMON_RESOURCES_PATH` counts as a daemon
    when it contains a `properties.json` file.
    IDs are returned in sorted order.
    """
    daemon_ids = []
    for entry_name in sorted(os.listdir(DAEMON_RESOURCES_PATH)):
        properties_path = DAEMON_RESOURCES_PATH / entry_name / 'properties.json'
        if properties_path.exists():
            daemon_ids.append(entry_name)
    return daemon_ids
191
192
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    running = []
    for candidate in candidates:
        if candidate.status == 'running':
            running.append(candidate)
    return running
204
205
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    paused = []
    for candidate in candidates:
        if candidate.status == 'paused':
            paused.append(candidate)
    return paused
217
218
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    stopped = []
    for candidate in candidates:
        if candidate.status == 'stopped':
            stopped.append(candidate)
    return stopped
231
232
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return a list of `Daemons` filtered by a list of daemon IDs.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`). Daemons named explicitly in `filter_list`
    are returned even when they are hidden.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit a warning for each non-existent daemon ID.

    Returns
    -------
    A list of Daemon objects, in the order given by `filter_list`.

    """
    if not filter_list:
        return [daemon for daemon in get_daemons() if not daemon.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            daemon = Daemon(daemon_id=d_id)
            exists = daemon.path.exists()
        except Exception:
            ### A daemon which cannot be loaded is treated as non-existent.
            daemon, exists = None, False
        if not exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(daemon)
    return daemons
277
278
def running_in_daemon() -> bool:
    """
    Return whether the code is running in a Daemon context, i.e. whether
    the environment variable named by
    `STATIC_CONFIG['environment']['daemon_id']` is present in `os.environ`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    return STATIC_CONFIG['environment']['daemon_id'] in os.environ
def daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
    """Parse sysargs and execute a Meerschaum action as a daemon.

    If `--name` or `--job-name` is present, the arguments are parsed and an
    existing daemon with that name is looked up. When one is found and the
    parsed arguments match its stored arguments (or no action was provided),
    the existing daemon is re-run instead of creating a new one.
    Otherwise a new daemon is started through `run_daemon()`.

    Parameters
    ----------
    sysargs: Optional[List[str]], default None
        The command line arguments used in a Meerschaum action.
        `None` is treated as an empty list.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum._internal.entry import entry
    ### Imported unconditionally because it is needed both for parsing `sysargs`
    ### and for parsing the arguments stored on an existing daemon.
    from meerschaum._internal.arguments._parse_arguments import parse_arguments

    ### The signature permits `None`, so normalize before any membership tests.
    if sysargs is None:
        sysargs = []

    _args = {}
    if '--name' in sysargs or '--job-name' in sysargs:
        _args = parse_arguments(sysargs)
    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
    try:
        label = shlex.join(filtered_sysargs) if sysargs else None
    except Exception:
        ### Fall back to a plain join if `shlex.join()` cannot be used.
        label = ' '.join(filtered_sysargs) if sysargs else None

    name = _args.get('name', None)
    daemon = None
    if name:
        try:
            daemon = Daemon(daemon_id=name)
        except Exception:
            ### A daemon that cannot be loaded is treated as not existing.
            daemon = None

    if daemon is not None:
        existing_sysargs = daemon.properties['target']['args'][0]
        existing_kwargs = parse_arguments(existing_sysargs)

        ### Remove sysargs because flags are aliased.
        debug = _args.pop('debug', None)
        existing_kwargs.pop('debug', None)
        for _kwargs in (_args, existing_kwargs):
            for _key in ('daemon', 'sysargs', 'filtered_sysargs'):
                _kwargs.pop(_key, None)
            ### Sort so the comparison below ignores the order of sub-arguments.
            _kwargs['sub_args'] = sorted(_kwargs.get('sub_args', []))

        ### Only run if the kwargs equal or no actions are provided.
        if existing_kwargs == _args or not _args.get('action', []):
            if daemon.status == 'running':
                return True, f"Daemon '{daemon}' is already running."
            return daemon.run(
                debug=debug,
                allow_dirty_run=True,
            )

    return run_daemon(
        entry,
        filtered_sysargs,
        daemon_id=name,
        label=label,
        keep_daemon_output=('--rm' not in sysargs),
    )

Parse sysargs and execute a Meerschaum action as a daemon.

Parameters
  • sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
  • A SuccessTuple.
def daemon_action(**kw) -> Tuple[bool, str]:
 87def daemon_action(**kw) -> SuccessTuple:
 88    """Execute a Meerschaum action as a daemon."""
 89    from meerschaum.utils.packages import run_python_package
 90    from meerschaum.utils.threading import Thread
 91    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 92    from meerschaum.actions import get_action
 93
 94    kw['daemon'] = True
 95    kw['shell'] = False
 96
 97    action = kw.get('action', None)
 98    if action and get_action(action) is None:
 99        if not kw.get('allow_shell_job') and not kw.get('force'):
100            return False, (
101                f"Action '{action}' isn't recognized.\n\n"
102                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
103                + "to enable shell commands to run as Meerschaum jobs."
104            )
105
106    sysargs = parse_dict_to_sysargs(kw)
107    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
108    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
109    return rc == 0, msg

Execute a Meerschaum action as a daemon.

def run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target handed to the `Daemon`.

    *args
        Positional arguments stored as the Daemon's `target_args`.

    daemon_id: Optional[str], default None
        The ID passed to the `Daemon` constructor.

    keep_daemon_output: bool, default True
        Forwarded to `Daemon.run()`.

    allow_dirty_run: bool, default False
        Forwarded to `Daemon.run()`.

    label: Optional[str], default None
        The label passed to the `Daemon` constructor.

    **kw
        Keyword arguments stored as the Daemon's `target_kw`.

    Returns
    -------
    The result of `Daemon.run()`.
    """
    run_options = {
        'keep_daemon_output': keep_daemon_output,
        'allow_dirty_run': allow_dirty_run,
    }
    new_daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=list(args),
        target_kw=kw,
        label=label,
    )
    return new_daemon.run(**run_options)

Execute a function as a daemon.

def get_daemons() -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` timestamp),
    then paused daemons (ordered by `paused`), then running daemons
    (ordered by `began`). A missing timestamp sorts as `'9999'`.
    Daemons with any other status are not returned.
    """
    all_daemons = [Daemon(daemon_id=daemon_id) for daemon_id in get_daemon_ids()]
    status_of = {daemon: daemon.status for daemon in all_daemons}

    def _timestamps(key):
        ### Map every daemon to the given `process` timestamp from its properties.
        return {
            daemon: daemon.properties.get('process', {}).get(key, '9999')
            for daemon in all_daemons
        }

    began_at = _timestamps('began')
    paused_at = _timestamps('paused')
    ended_at = _timestamps('ended')

    def _ordered(status, timestamps):
        ### Select the daemons with the given status, ordered by their timestamp.
        matching = [
            daemon
            for daemon, daemon_status in status_of.items()
            if daemon_status == status
        ]
        matching.sort(key=timestamps.__getitem__)
        return matching

    return (
        _ordered('stopped', ended_at)
        + _ordered('paused', paused_at)
        + _ordered('running', began_at)
    )

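Return all existing Daemons, grouped by status: stopped daemons first (sorted by end time), then paused daemons (sorted by pause time), then running daemons (sorted by start time).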

def get_daemon_ids() -> List[str]:
def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    An entry of `DAEMON_RESOURCES_PATH` counts as a daemon
    when it contains a `properties.json` file.
    IDs are returned in sorted order.
    """
    daemon_ids = []
    for entry_name in sorted(os.listdir(DAEMON_RESOURCES_PATH)):
        properties_path = DAEMON_RESOURCES_PATH / entry_name / 'properties.json'
        if properties_path.exists():
            daemon_ids.append(entry_name)
    return daemon_ids

Return the IDs of all daemons on disk.

def get_running_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    running = []
    for candidate in candidates:
        if candidate.status == 'running':
            running.append(candidate)
    return running

Return a list of currently running daemons.

def get_paused_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    paused = []
    for candidate in candidates:
        if candidate.status == 'paused':
            paused.append(candidate)
    return paused

Return a list of active but paused daemons.

def get_stopped_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. If `None`, use the result of `get_daemons()`.
    """
    candidates = get_daemons() if daemons is None else daemons
    stopped = []
    for candidate in candidates:
        if candidate.status == 'stopped':
            stopped.append(candidate)
    return stopped

Return a list of stopped daemons.

def get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return a list of `Daemons` filtered by a list of daemon IDs.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`). Daemons named explicitly in `filter_list`
    are returned even when they are hidden.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit a warning for each non-existent daemon ID.

    Returns
    -------
    A list of Daemon objects, in the order given by `filter_list`.

    """
    if not filter_list:
        return [daemon for daemon in get_daemons() if not daemon.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            daemon = Daemon(daemon_id=d_id)
            exists = daemon.path.exists()
        except Exception:
            ### A daemon which cannot be loaded is treated as non-existent.
            daemon, exists = None, False
        if not exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(daemon)
    return daemons

Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.

If filter_list is None or empty, return all Daemons (from get_daemons()).

Parameters
  • filter_list (Optional[List[str]], default None): List of daemon_ids to include. If filter_list is None or empty, return all Daemons.
  • warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
  • A list of Daemon objects.
def running_in_daemon() -> bool:
def running_in_daemon() -> bool:
    """
    Return whether the code is running in a Daemon context, i.e. whether
    the environment variable named by
    `STATIC_CONFIG['environment']['daemon_id']` is present in `os.environ`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    return STATIC_CONFIG['environment']['daemon_id'] in os.environ

Return whether the current thread is running in a Daemon context.