meerschaum.utils.daemon

Manage Daemons via the Daemon class.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Manage Daemons via the `Daemon` class.
  7"""
  8
  9from __future__ import annotations
 10import os, pathlib, shutil, json, datetime, threading, shlex
 11from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict
 12from meerschaum.config._paths import DAEMON_RESOURCES_PATH
 13from meerschaum.utils.daemon.Daemon import Daemon
 14from meerschaum.utils.daemon.RotatingFile import RotatingFile
 15from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 16
 17
 18def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 19    """Parse sysargs and execute a Meerschaum action as a daemon.
 20
 21    Parameters
 22    ----------
 23    sysargs: Optional[List[str]], default None
 24        The command line arguments used in a Meerschaum action.
 25
 26    Returns
 27    -------
 28    A SuccessTuple.
 29    """
 30    from meerschaum._internal.entry import entry
 31    _args = {}
 32    if '--name' in sysargs or '--job-name' in sysargs:
 33        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 34        _args = parse_arguments(sysargs)
 35    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 36    try:
 37        label = shlex.join(filtered_sysargs) if sysargs else None
 38    except Exception as e:
 39        label = ' '.join(filtered_sysargs) if sysargs else None
 40
 41    name = _args.get('name', None)
 42    daemon = None
 43    if name:
 44        try:
 45            daemon = Daemon(daemon_id=name)
 46        except Exception as e:
 47            daemon = None
 48
 49    if daemon is not None:
 50        existing_sysargs = daemon.properties['target']['args'][0]
 51        existing_kwargs = parse_arguments(existing_sysargs)
 52
 53        ### Remove sysargs because flags are aliased.
 54        _ = _args.pop('daemon', None)
 55        _ = _args.pop('sysargs', None)
 56        _ = _args.pop('filtered_sysargs', None)
 57        debug = _args.pop('debug', None)
 58        _args['sub_args'] = sorted(_args.get('sub_args', []))
 59        _ = existing_kwargs.pop('daemon', None)
 60        _ = existing_kwargs.pop('sysargs', None)
 61        _ = existing_kwargs.pop('filtered_sysargs', None)
 62        _ = existing_kwargs.pop('debug', None)
 63        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 64
 65        ### Only run if the kwargs equal or no actions are provided.
 66        if existing_kwargs == _args or not _args.get('action', []):
 67            if daemon.status == 'running':
 68                return True, f"Daemon '{daemon}' is already running."
 69            return daemon.run(
 70                debug=debug,
 71                allow_dirty_run=True,
 72            )
 73
 74    success_tuple = run_daemon(
 75        entry,
 76        filtered_sysargs,
 77        daemon_id=_args.get('name', None) if _args else None,
 78        label=label,
 79        keep_daemon_output=('--rm' not in sysargs),
 80    )
 81    return success_tuple
 82
 83
 84def daemon_action(**kw) -> SuccessTuple:
 85    """Execute a Meerschaum action as a daemon."""
 86    from meerschaum.utils.packages import run_python_package
 87    from meerschaum.utils.threading import Thread
 88    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 89    from meerschaum.actions import get_action
 90
 91    kw['daemon'] = True
 92    kw['shell'] = False
 93
 94    action = kw.get('action', None)
 95    if action and get_action(action) is None:
 96        if not kw.get('allow_shell_job') and not kw.get('force'):
 97            return False, (
 98                f"Action '{action}' isn't recognized.\n\n"
 99                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
100                + "to enable shell commands to run as Meerschaum jobs."
101            )
102
103    sysargs = parse_dict_to_sysargs(kw)
104    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
105    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
106    return rc == 0, msg
107
108
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """
    Execute a function as a daemon.

    A `Daemon` is built around `func`, with the positional arguments passed
    as `target_args` and the remaining keyword arguments as `target_kw`.
    The result of the daemon's `run()` is returned.
    """
    daemon_options = {
        'daemon_id': daemon_id,
        'target_args': list(args),
        'target_kw': kw,
        'label': label,
    }
    new_daemon = Daemon(func, **daemon_options)

    run_options = {
        'keep_daemon_output': keep_daemon_output,
        'allow_dirty_run': allow_dirty_run,
    }
    return new_daemon.run(**run_options)
130
131
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` value), followed by
    paused daemons (ordered by `paused`) and running daemons (ordered by `began`).
    A missing value sorts as `'9999'`.
    Daemons with any other status are not returned.
    """
    all_daemons = [Daemon(daemon_id=daemon_id) for daemon_id in get_daemon_ids()]
    status_of = {daemon: daemon.status for daemon in all_daemons}

    def _process_values(key: str) -> Dict[Daemon, Any]:
        ### Look up one entry of each daemon's `process` properties.
        return {
            daemon: daemon.properties.get('process', {}).get(key, '9999')
            for daemon in all_daemons
        }

    ### Pairs of (status, `process` key used to order daemons of that status).
    ordering = (
        ('stopped', 'ended'),
        ('paused', 'paused'),
        ('running', 'began'),
    )

    ordered_daemons = []
    for status, process_key in ordering:
        sort_values = _process_values(process_key)
        group = [daemon for daemon in status_of if status_of[daemon] == status]
        group.sort(key=sort_values.__getitem__)
        ordered_daemons.extend(group)
    return ordered_daemons
178
179
def get_daemon_ids() -> List[str]:
    """
    Return the sorted names of the entries in `DAEMON_RESOURCES_PATH`
    (the IDs of all daemons on disk).
    """
    daemon_ids = os.listdir(DAEMON_RESOURCES_PATH)
    daemon_ids.sort()
    return daemon_ids
185
186
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_running(daemon) -> bool:
        return daemon.status == 'running'

    return list(filter(_is_running, candidates))
198
199
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_paused(daemon) -> bool:
        return daemon.status == 'paused'

    return list(filter(_is_paused, candidates))
211
212
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_stopped(daemon) -> bool:
        return daemon.status == 'stopped'

    return list(filter(_is_stopped, candidates))
225
226
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """Return a list of `Daemons` filtered by a list of daemon IDs.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`). Daemons named explicitly in `filter_list`
    are returned even when they are hidden.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, issue a warning for each daemon ID that does not exist.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        return [d for d in get_daemons() if not d.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            d = Daemon(daemon_id=d_id)
            _exists = d.path.exists()
        except Exception:
            ### A daemon which cannot be built or inspected is treated as non-existent.
            d, _exists = None, False
        if not _exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(d)
    return daemons
269
270
def running_in_daemon() -> bool:
    """
    Return whether the environment variable named by
    `STATIC_CONFIG['environment']['daemon_id']` is set,
    i.e. whether the current process is running in a Daemon context.
    """
    from meerschaum.config.static import STATIC_CONFIG
    environment_config = STATIC_CONFIG['environment']
    return environment_config['daemon_id'] in os.environ
def daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
    """Parse sysargs and execute a Meerschaum action as a daemon.

    When `--name` or `--job-name` is present and a `Daemon` can be built for
    that name, the stored arguments of that daemon are compared with the
    new ones. If they match (or no action was provided), the existing daemon
    is run again instead of creating a new one.

    Parameters
    ----------
    sysargs: Optional[List[str]], default None
        The command line arguments used in a Meerschaum action.
        `None` is treated as an empty list.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum._internal.entry import entry
    ### Imported unconditionally because it is needed both for the new
    ### arguments and for the arguments stored on an existing daemon.
    from meerschaum._internal.arguments._parse_arguments import parse_arguments

    ### The default of `None` would otherwise fail on the membership tests below.
    if sysargs is None:
        sysargs = []

    _args = {}
    if '--name' in sysargs or '--job-name' in sysargs:
        _args = parse_arguments(sysargs)
    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
    try:
        label = shlex.join(filtered_sysargs) if sysargs else None
    except Exception:
        ### Fall back to a plain join if `shlex.join` is unavailable or fails.
        label = ' '.join(filtered_sysargs) if sysargs else None

    name = _args.get('name', None)
    daemon = None
    if name:
        try:
            daemon = Daemon(daemon_id=name)
        except Exception:
            daemon = None

    if daemon is not None:
        existing_sysargs = daemon.properties['target']['args'][0]
        existing_kwargs = parse_arguments(existing_sysargs)

        ### Remove sysargs because flags are aliased.
        for _key in ('daemon', 'sysargs', 'filtered_sysargs'):
            _args.pop(_key, None)
            existing_kwargs.pop(_key, None)
        debug = _args.pop('debug', None)
        existing_kwargs.pop('debug', None)

        ### Sort the sub-arguments so that ordering does not affect the comparison.
        _args['sub_args'] = sorted(_args.get('sub_args', []))
        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))

        ### Only run if the kwargs equal or no actions are provided.
        if existing_kwargs == _args or not _args.get('action', []):
            if daemon.status == 'running':
                return True, f"Daemon '{daemon}' is already running."
            return daemon.run(
                debug=debug,
                allow_dirty_run=True,
            )

    success_tuple = run_daemon(
        entry,
        filtered_sysargs,
        daemon_id=_args.get('name', None) if _args else None,
        label=label,
        keep_daemon_output=('--rm' not in sysargs),
    )
    return success_tuple

Parse sysargs and execute a Meerschaum action as a daemon.

Parameters
  • sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
  • A SuccessTuple.
def daemon_action(**kw) -> Tuple[bool, str]:
 85def daemon_action(**kw) -> SuccessTuple:
 86    """Execute a Meerschaum action as a daemon."""
 87    from meerschaum.utils.packages import run_python_package
 88    from meerschaum.utils.threading import Thread
 89    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 90    from meerschaum.actions import get_action
 91
 92    kw['daemon'] = True
 93    kw['shell'] = False
 94
 95    action = kw.get('action', None)
 96    if action and get_action(action) is None:
 97        if not kw.get('allow_shell_job') and not kw.get('force'):
 98            return False, (
 99                f"Action '{action}' isn't recognized.\n\n"
100                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
101                + "to enable shell commands to run as Meerschaum jobs."
102            )
103
104    sysargs = parse_dict_to_sysargs(kw)
105    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
106    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
107    return rc == 0, msg

Execute a Meerschaum action as a daemon.

def run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """
    Execute a function as a daemon.

    A `Daemon` is built around `func`, with the positional arguments passed
    as `target_args` and the remaining keyword arguments as `target_kw`.
    The result of the daemon's `run()` is returned.
    """
    daemon_options = {
        'daemon_id': daemon_id,
        'target_args': list(args),
        'target_kw': kw,
        'label': label,
    }
    new_daemon = Daemon(func, **daemon_options)

    run_options = {
        'keep_daemon_output': keep_daemon_output,
        'allow_dirty_run': allow_dirty_run,
    }
    return new_daemon.run(**run_options)

Execute a function as a daemon.

def get_daemons() -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (ordered by their `ended` value), followed by
    paused daemons (ordered by `paused`) and running daemons (ordered by `began`).
    A missing value sorts as `'9999'`.
    Daemons with any other status are not returned.
    """
    all_daemons = [Daemon(daemon_id=daemon_id) for daemon_id in get_daemon_ids()]
    status_of = {daemon: daemon.status for daemon in all_daemons}

    def _process_values(key: str) -> Dict[Daemon, Any]:
        ### Look up one entry of each daemon's `process` properties.
        return {
            daemon: daemon.properties.get('process', {}).get(key, '9999')
            for daemon in all_daemons
        }

    ### Pairs of (status, `process` key used to order daemons of that status).
    ordering = (
        ('stopped', 'ended'),
        ('paused', 'paused'),
        ('running', 'began'),
    )

    ordered_daemons = []
    for status, process_key in ordering:
        sort_values = _process_values(process_key)
        group = [daemon for daemon in status_of if status_of[daemon] == status]
        group.sort(key=sort_values.__getitem__)
        ordered_daemons.extend(group)
    return ordered_daemons

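Return all existing Daemons grouped by status: stopped daemons first (sorted by end time), then paused daemons (sorted by pause time), then running daemons (sorted by start time).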

def get_daemon_ids() -> List[str]:
def get_daemon_ids() -> List[str]:
    """
    Return the sorted names of the entries in `DAEMON_RESOURCES_PATH`
    (the IDs of all daemons on disk).
    """
    daemon_ids = os.listdir(DAEMON_RESOURCES_PATH)
    daemon_ids.sort()
    return daemon_ids

Return the IDs of all daemons on disk.

def get_running_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'running'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_running(daemon) -> bool:
        return daemon.status == 'running'

    return list(filter(_is_running, candidates))

Return a list of currently running daemons.

def get_paused_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'paused'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_paused(daemon) -> bool:
        return daemon.status == 'paused'

    return list(filter(_is_paused, candidates))

Return a list of active but paused daemons.

def get_stopped_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return the daemons whose status is `'stopped'`.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    candidates = get_daemons() if daemons is None else daemons

    def _is_stopped(daemon) -> bool:
        return daemon.status == 'stopped'

    return list(filter(_is_stopped, candidates))

Return a list of stopped daemons.

def get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """Return a list of `Daemons` filtered by a list of daemon IDs.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`). Daemons named explicitly in `filter_list`
    are returned even when they are hidden.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of daemon IDs to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, issue a warning for each daemon ID that does not exist.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        return [d for d in get_daemons() if not d.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            d = Daemon(daemon_id=d_id)
            _exists = d.path.exists()
        except Exception:
            ### A daemon which cannot be built or inspected is treated as non-existent.
            d, _exists = None, False
        if not _exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(d)
    return daemons

Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.

If filter_list is None or empty, return all non-hidden Daemons (from get_daemons()).

Parameters
  • filter_list (Optional[List[str]], default None): List of daemon IDs to include. If filter_list is None or empty, return all non-hidden Daemons.
  • warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
  • A list of Daemon objects.
def running_in_daemon() -> bool:
def running_in_daemon() -> bool:
    """
    Return whether the environment variable named by
    `STATIC_CONFIG['environment']['daemon_id']` is set,
    i.e. whether the current process is running in a Daemon context.
    """
    from meerschaum.config.static import STATIC_CONFIG
    environment_config = STATIC_CONFIG['environment']
    return environment_config['daemon_id'] in os.environ

Return whether the current thread is running in a Daemon context.