meerschaum.utils.daemon

Manage Daemons via the Daemon class.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Manage Daemons via the `Daemon` class.
  7"""
  8
  9from __future__ import annotations
 10import os, pathlib, shutil, json, datetime, threading, shlex
 11from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict
 12from meerschaum.config._paths import DAEMON_RESOURCES_PATH
 13from meerschaum.utils.daemon.Daemon import Daemon
 14from meerschaum.utils.daemon.RotatingFile import RotatingFile
 15
 16
 17def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
 18    """Parse sysargs and execute a Meerschaum action as a daemon.
 19
 20    Parameters
 21    ----------
 22    sysargs: Optional[List[str]], default None
 23        The command line arguments used in a Meerschaum action.
 24
 25    Returns
 26    -------
 27    A SuccessTuple.
 28    """
 29    from meerschaum._internal.entry import entry
 30    _args = {}
 31    if '--name' in sysargs or '--job-name' in sysargs:
 32        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 33        _args = parse_arguments(sysargs)
 34    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
 35    try:
 36        label = shlex.join(filtered_sysargs) if sysargs else None
 37    except Exception as e:
 38        label = ' '.join(filtered_sysargs) if sysargs else None
 39
 40    name = _args.get('name', None)
 41    daemon = None
 42    if name:
 43        try:
 44            daemon = Daemon(daemon_id=name)
 45        except Exception as e:
 46            daemon = None
 47
 48    if daemon is not None:
 49        existing_sysargs = daemon.properties['target']['args'][0]
 50        existing_kwargs = parse_arguments(existing_sysargs)
 51
 52        ### Remove sysargs because flags are aliased.
 53        _ = _args.pop('daemon', None)
 54        _ = _args.pop('sysargs', None)
 55        _ = _args.pop('filtered_sysargs', None)
 56        debug = _args.pop('debug', None)
 57        _args['sub_args'] = sorted(_args.get('sub_args', []))
 58        _ = existing_kwargs.pop('daemon', None)
 59        _ = existing_kwargs.pop('sysargs', None)
 60        _ = existing_kwargs.pop('filtered_sysargs', None)
 61        _ = existing_kwargs.pop('debug', None)
 62        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
 63
 64        ### Only run if the kwargs equal or no actions are provided.
 65        if existing_kwargs == _args or not _args.get('action', []):
 66            return daemon.run(
 67                debug = debug,
 68                allow_dirty_run = True,
 69            )
 70
 71    success_tuple = run_daemon(
 72        entry,
 73        filtered_sysargs,
 74        daemon_id = _args.get('name', None) if _args else None,
 75        label = label,
 76        keep_daemon_output = ('--rm' not in sysargs)
 77    )
 78    if not isinstance(success_tuple, tuple):
 79        success_tuple = False, str(success_tuple)
 80    return success_tuple
 81
 82
 83def daemon_action(**kw) -> SuccessTuple:
 84    """Execute a Meerschaum action as a daemon."""
 85    from meerschaum.utils.packages import run_python_package
 86    from meerschaum.utils.threading import Thread
 87    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 88    from meerschaum.actions import get_action
 89
 90    kw['daemon'] = True
 91    kw['shell'] = False
 92
 93    action = kw.get('action', None)
 94    if action and get_action(action) is None:
 95        if not kw.get('allow_shell_job') and not kw.get('force'):
 96            return False, (
 97                f"Action '{action}' isn't recognized.\n\n"
 98                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
 99                + "to enable shell commands to run as Meerschaum jobs."
100            )
101
102    sysargs = parse_dict_to_sysargs(kw)
103    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
104    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
105    return rc == 0, msg
106
107
108def run_daemon(
109        func: Callable[[Any], Any],
110        *args,
111        daemon_id: Optional[str] = None,
112        keep_daemon_output: bool = False,
113        allow_dirty_run: bool = False,
114        label: Optional[str] = None,
115        **kw
116    ) -> Any:
117    """Execute a function as a daemon."""
118    daemon = Daemon(
119        func,
120        daemon_id = daemon_id,
121        target_args = [arg for arg in args],
122        target_kw = kw,
123        label = label,
124    )
125    return daemon.run(
126        keep_daemon_output = keep_daemon_output,
127        allow_dirty_run = allow_dirty_run,
128    )
129
130
131def get_daemons() -> List[Daemon]:
132    """
133    Return all existing Daemons, sorted by end time.
134    """
135    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
136    daemons_status = {daemon: daemon.status for daemon in daemons}
137    running_daemons = {
138        daemon: daemons_status[daemon]
139        for daemon in daemons
140        if daemons_status[daemon] == 'running'
141    }
142    paused_daemons = {
143        daemon: daemons_status[daemon]
144        for daemon in daemons
145        if daemons_status[daemon] == 'paused'
146    }
147    stopped_daemons = {
148        daemon: daemons_status[daemon]
149        for daemon in daemons
150        if daemons_status[daemon] == 'stopped'
151    }
152    daemons_began = {
153        daemon: daemon.properties.get('process', {}).get('began', '9999')
154        for daemon in daemons
155    }
156    daemons_paused = {
157        daemon: daemon.properties.get('process', {}).get('paused', '9999')
158        for daemon in daemons
159    }
160    daemons_ended = {
161        daemon: daemon.properties.get('process', {}).get('ended', '9999')
162        for daemon in daemons
163    }
164    sorted_stopped_daemons = [
165        daemon
166        for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x])
167    ]
168    sorted_paused_daemons = [
169        daemon
170        for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x])
171    ]
172    sorted_running_daemons = [
173        daemon
174        for daemon in sorted(running_daemons, key=lambda x: daemons_began[x])
175    ]
176    return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons
177
178
179def get_daemon_ids() -> List[str]:
180    """
181    Return the IDs of all daemons on disk.
182    """
183    return sorted(os.listdir(DAEMON_RESOURCES_PATH))
184
185
186def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
187    """
188    Return a list of currently running daemons.
189    """
190    if daemons is None:
191        daemons = get_daemons()
192    return [
193        d
194        for d in daemons
195        if d.status == 'running'
196    ]
197
198
199def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
200    """
201    Return a list of active but paused daemons.
202    """
203    if daemons is None:
204        daemons = get_daemons()
205    return [
206        d
207        for d in daemons
208        if d.status == 'paused'
209    ]
210
211
212def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
213    """
214    Return a list of stopped daemons.
215    """
216    if daemons is None:
217        daemons = get_daemons()
218
219    return [
220        d
221        for d in daemons
222        if d.status == 'stopped'
223    ]
224
225
226def get_filtered_daemons(
227        filter_list: Optional[List[str]] = None,
228        warn: bool = False,
229    ) -> List[Daemon]:
230    """Return a list of `Daemons` filtered by a list of `daemon_ids`.
231    Only `Daemons` that exist are returned.
232    
233    If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`).
234
235    Parameters
236    ----------
237    filter_list: Optional[List[str]], default None
238        List of `daemon_ids` to include. If `daemon_ids` is `None` or empty,
239        return all `Daemons`.
240
241    warn: bool, default False
242        If `True`, raise warnings for non-existent `daemon_ids`.
243
244    Returns
245    -------
246    A list of Daemon objects.
247
248    """
249    if not filter_list:
250        daemons = get_daemons()
251        return [d for d in daemons if not d.hidden]
252    from meerschaum.utils.warnings import warn as _warn
253    daemons = []
254    for d_id in filter_list:
255        try:
256            d = Daemon(daemon_id=d_id)
257            _exists = d.path.exists()
258        except Exception as e:
259            _exists = False
260        if not _exists:
261            if warn:
262                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
263            continue
264        if d.hidden:
265            pass
266        daemons.append(d)
267    return daemons
def daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
18def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
19    """Parse sysargs and execute a Meerschaum action as a daemon.
20
21    Parameters
22    ----------
23    sysargs: Optional[List[str]], default None
24        The command line arguments used in a Meerschaum action.
25
26    Returns
27    -------
28    A SuccessTuple.
29    """
30    from meerschaum._internal.entry import entry
31    _args = {}
32    if '--name' in sysargs or '--job-name' in sysargs:
33        from meerschaum._internal.arguments._parse_arguments import parse_arguments
34        _args = parse_arguments(sysargs)
35    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
36    try:
37        label = shlex.join(filtered_sysargs) if sysargs else None
38    except Exception as e:
39        label = ' '.join(filtered_sysargs) if sysargs else None
40
41    name = _args.get('name', None)
42    daemon = None
43    if name:
44        try:
45            daemon = Daemon(daemon_id=name)
46        except Exception as e:
47            daemon = None
48
49    if daemon is not None:
50        existing_sysargs = daemon.properties['target']['args'][0]
51        existing_kwargs = parse_arguments(existing_sysargs)
52
53        ### Remove sysargs because flags are aliased.
54        _ = _args.pop('daemon', None)
55        _ = _args.pop('sysargs', None)
56        _ = _args.pop('filtered_sysargs', None)
57        debug = _args.pop('debug', None)
58        _args['sub_args'] = sorted(_args.get('sub_args', []))
59        _ = existing_kwargs.pop('daemon', None)
60        _ = existing_kwargs.pop('sysargs', None)
61        _ = existing_kwargs.pop('filtered_sysargs', None)
62        _ = existing_kwargs.pop('debug', None)
63        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))
64
65        ### Only run if the kwargs equal or no actions are provided.
66        if existing_kwargs == _args or not _args.get('action', []):
67            return daemon.run(
68                debug = debug,
69                allow_dirty_run = True,
70            )
71
72    success_tuple = run_daemon(
73        entry,
74        filtered_sysargs,
75        daemon_id = _args.get('name', None) if _args else None,
76        label = label,
77        keep_daemon_output = ('--rm' not in sysargs)
78    )
79    if not isinstance(success_tuple, tuple):
80        success_tuple = False, str(success_tuple)
81    return success_tuple

Parse sysargs and execute a Meerschaum action as a daemon.

Parameters
  • sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
  • A SuccessTuple.
def daemon_action(**kw) -> Tuple[bool, str]:
 84def daemon_action(**kw) -> SuccessTuple:
 85    """Execute a Meerschaum action as a daemon."""
 86    from meerschaum.utils.packages import run_python_package
 87    from meerschaum.utils.threading import Thread
 88    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
 89    from meerschaum.actions import get_action
 90
 91    kw['daemon'] = True
 92    kw['shell'] = False
 93
 94    action = kw.get('action', None)
 95    if action and get_action(action) is None:
 96        if not kw.get('allow_shell_job') and not kw.get('force'):
 97            return False, (
 98                f"Action '{action}' isn't recognized.\n\n"
 99                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  " 
100                + "to enable shell commands to run as Meerschaum jobs."
101            )
102
103    sysargs = parse_dict_to_sysargs(kw)
104    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
105    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
106    return rc == 0, msg

Execute a Meerschaum action as a daemon.

def run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = False, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
109def run_daemon(
110        func: Callable[[Any], Any],
111        *args,
112        daemon_id: Optional[str] = None,
113        keep_daemon_output: bool = False,
114        allow_dirty_run: bool = False,
115        label: Optional[str] = None,
116        **kw
117    ) -> Any:
118    """Execute a function as a daemon."""
119    daemon = Daemon(
120        func,
121        daemon_id = daemon_id,
122        target_args = [arg for arg in args],
123        target_kw = kw,
124        label = label,
125    )
126    return daemon.run(
127        keep_daemon_output = keep_daemon_output,
128        allow_dirty_run = allow_dirty_run,
129    )

Execute a function as a daemon.

def get_daemons() -> List[meerschaum.utils.daemon.Daemon.Daemon]:
132def get_daemons() -> List[Daemon]:
133    """
134    Return all existing Daemons, sorted by end time.
135    """
136    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
137    daemons_status = {daemon: daemon.status for daemon in daemons}
138    running_daemons = {
139        daemon: daemons_status[daemon]
140        for daemon in daemons
141        if daemons_status[daemon] == 'running'
142    }
143    paused_daemons = {
144        daemon: daemons_status[daemon]
145        for daemon in daemons
146        if daemons_status[daemon] == 'paused'
147    }
148    stopped_daemons = {
149        daemon: daemons_status[daemon]
150        for daemon in daemons
151        if daemons_status[daemon] == 'stopped'
152    }
153    daemons_began = {
154        daemon: daemon.properties.get('process', {}).get('began', '9999')
155        for daemon in daemons
156    }
157    daemons_paused = {
158        daemon: daemon.properties.get('process', {}).get('paused', '9999')
159        for daemon in daemons
160    }
161    daemons_ended = {
162        daemon: daemon.properties.get('process', {}).get('ended', '9999')
163        for daemon in daemons
164    }
165    sorted_stopped_daemons = [
166        daemon
167        for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x])
168    ]
169    sorted_paused_daemons = [
170        daemon
171        for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x])
172    ]
173    sorted_running_daemons = [
174        daemon
175        for daemon in sorted(running_daemons, key=lambda x: daemons_began[x])
176    ]
177    return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons

Return all existing Daemons, sorted by end time.

def get_daemon_ids() -> List[str]:
180def get_daemon_ids() -> List[str]:
181    """
182    Return the IDs of all daemons on disk.
183    """
184    return sorted(os.listdir(DAEMON_RESOURCES_PATH))

Return the IDs of all daemons on disk.

def get_running_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
187def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
188    """
189    Return a list of currently running daemons.
190    """
191    if daemons is None:
192        daemons = get_daemons()
193    return [
194        d
195        for d in daemons
196        if d.status == 'running'
197    ]

Return a list of currently running daemons.

def get_paused_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
200def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
201    """
202    Return a list of active but paused daemons.
203    """
204    if daemons is None:
205        daemons = get_daemons()
206    return [
207        d
208        for d in daemons
209        if d.status == 'paused'
210    ]

Return a list of active but paused daemons.

def get_stopped_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
213def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
214    """
215    Return a list of stopped daemons.
216    """
217    if daemons is None:
218        daemons = get_daemons()
219
220    return [
221        d
222        for d in daemons
223        if d.status == 'stopped'
224    ]

Return a list of stopped daemons.

def get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
227def get_filtered_daemons(
228        filter_list: Optional[List[str]] = None,
229        warn: bool = False,
230    ) -> List[Daemon]:
231    """Return a list of `Daemons` filtered by a list of `daemon_ids`.
232    Only `Daemons` that exist are returned.
233    
234    If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`).
235
236    Parameters
237    ----------
238    filter_list: Optional[List[str]], default None
239        List of `daemon_ids` to include. If `daemon_ids` is `None` or empty,
240        return all `Daemons`.
241
242    warn: bool, default False
243        If `True`, raise warnings for non-existent `daemon_ids`.
244
245    Returns
246    -------
247    A list of Daemon objects.
248
249    """
250    if not filter_list:
251        daemons = get_daemons()
252        return [d for d in daemons if not d.hidden]
253    from meerschaum.utils.warnings import warn as _warn
254    daemons = []
255    for d_id in filter_list:
256        try:
257            d = Daemon(daemon_id=d_id)
258            _exists = d.path.exists()
259        except Exception as e:
260            _exists = False
261        if not _exists:
262            if warn:
263                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
264            continue
265        if d.hidden:
266            pass
267        daemons.append(d)
268    return daemons

Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.

If filter_list is None or empty, return all Daemons (from get_daemons()).

Parameters
  • filter_list (Optional[List[str]], default None): List of daemon_ids to include. If daemon_ids is None or empty, return all Daemons.
  • warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
  • A list of Daemon objects.