Module meerschaum.utils.daemon
Daemonize processes via daemoniker.
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Daemonize processes via daemoniker.
"""
from __future__ import annotations
import os, pathlib, shutil, json, datetime, threading, shlex
from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict
from meerschaum.config._paths import DAEMON_RESOURCES_PATH
from meerschaum.utils.daemon.Daemon import Daemon
from meerschaum.utils.daemon.Log import Log
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
"""Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
----------
sysargs: Optional[List[str]], default None
The command line arguments used in a Meerschaum action.
Returns
-------
A SuccessTuple.
"""
from meerschaum._internal.entry import entry
_args = None
if '--name' in sysargs or '--job-name' in sysargs:
from meerschaum._internal.arguments._parse_arguments import parse_arguments
_args = parse_arguments(sysargs)
filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
try:
label = shlex.join(filtered_sysargs) if sysargs else None
except Exception as e:
label = ' '.join(filtered_sysargs) if sysargs else None
success_tuple = run_daemon(
entry,
filtered_sysargs,
daemon_id = _args.get('name', None) if _args else None,
label = label,
keep_daemon_output = ('--rm' not in sysargs)
)
if not isinstance(success_tuple, tuple):
success_tuple = False, str(success_tuple)
return success_tuple
def daemon_action(**kw) -> SuccessTuple:
"""Execute a Meerschaum action as a daemon."""
from meerschaum.utils.packages import run_python_package
from meerschaum.utils.threading import Thread
from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
from meerschaum.actions import actions
kw['daemon'] = True
kw['shell'] = False
if kw.get('action', None) and kw.get('action')[0] not in actions:
if not kw.get('allow_shell_job') and not kw.get('force'):
return False, (
f"Action '{kw.get('action')[0]}' isn't recognized.\n\n"
+ " Include `--allow-shell-job`, `--force`, or `-f`\n "
+ "to enable shell commands to run as Meerschaum jobs."
)
sysargs = parse_dict_to_sysargs(kw)
rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
msg = "Success" if rc == 0 else f"Daemon for '{' '.join(sysargs)}' returned code: {rc}"
return rc == 0, msg
def run_daemon(
func: Callable[[Any], Any],
*args,
daemon_id: Optional[str] = None,
keep_daemon_output: bool = False,
allow_dirty_run: bool = False,
label: Optional[str] = None,
**kw
) -> Any:
"""Execute a function as a daemon."""
daemon = Daemon(
func,
daemon_id = daemon_id,
target_args = [arg for arg in args],
target_kw = kw,
label = label,
)
return daemon.run(
keep_daemon_output = keep_daemon_output,
allow_dirty_run = allow_dirty_run,
)
def get_daemons() -> List[Daemon]:
""" """
return [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
def get_daemon_ids() -> List[str]:
""" """
return os.listdir(DAEMON_RESOURCES_PATH)
def get_running_daemons(daemons : Optional[List[Daemon]] = None) -> List[Daemon]:
"""
Return a list of currently running daemons.
"""
if daemons is None:
daemons = get_daemons()
return [
d for d in daemons if d.pid_path.exists()
]
def get_stopped_daemons(
daemons : Optional[List[Daemon]] = None,
running_daemons : Optional[List[Daemon]] = None,
) -> List[Daemon]:
"""
Return a list of stopped daemons.
"""
if daemons is None:
daemons = get_daemons()
if running_daemons is None:
running_daemons = get_running_daemons(daemons)
return [d for d in daemons if d not in running_daemons]
def get_filtered_daemons(
filter_list: Optional[List[str]] = None,
warn: bool = False,
) -> List[Daemon]:
"""Return a list of `Daemons` filtered by a list of `daemon_ids`.
Only `Daemons` that exist are returned.
If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`).
Parameters
----------
filter_list: Optional[List[str]], default None
List of `daemon_ids` to include. If `daemon_ids` is `None` or empty,
return all `Daemons`.
warn: bool, default False
If `True`, raise warnings for non-existent `daemon_ids`.
Returns
-------
A list of Daemon objects.
"""
if not filter_list:
daemons = get_daemons()
return [d for d in daemons if not d.hidden]
from meerschaum.utils.warnings import warn as _warn
daemons = []
for d_id in filter_list:
try:
d = Daemon(daemon_id=d_id)
_exists = d.path.exists()
except Exception as e:
_exists = False
if not _exists:
if warn:
_warn(f"Daemon '{d_id}' does not exist.", stack=False)
continue
if d.hidden:
pass
daemons.append(d)
return daemons
Sub-modules
meerschaum.utils.daemon.Daemon
-
Manage running daemons via the Daemon class.
meerschaum.utils.daemon.Log
-
A logging class to keep track of the offset when printing unread lines.
Functions
def daemon_action(**kw) ‑> Tuple[bool, str]
-
Execute a Meerschaum action as a daemon.
Expand source code
def daemon_action(**kw) -> SuccessTuple: """Execute a Meerschaum action as a daemon.""" from meerschaum.utils.packages import run_python_package from meerschaum.utils.threading import Thread from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs from meerschaum.actions import actions kw['daemon'] = True kw['shell'] = False if kw.get('action', None) and kw.get('action')[0] not in actions: if not kw.get('allow_shell_job') and not kw.get('force'): return False, ( f"Action '{kw.get('action')[0]}' isn't recognized.\n\n" + " Include `--allow-shell-job`, `--force`, or `-f`\n " + "to enable shell commands to run as Meerschaum jobs." ) sysargs = parse_dict_to_sysargs(kw) rc = run_python_package('meerschaum', sysargs, venv=None, debug=False) msg = "Success" if rc == 0 else f"Daemon for '{' '.join(sysargs)}' returned code: {rc}" return rc == 0, msg
def daemon_entry(sysargs: Optional[List[str]] = None) ‑> Tuple[bool, str]
-
Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
sysargs
:Optional[List[str]]
, defaultNone
- The command line arguments used in a Meerschaum action.
Returns
A SuccessTuple.
Expand source code
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple: """Parse sysargs and execute a Meerschaum action as a daemon. Parameters ---------- sysargs: Optional[List[str]], default None The command line arguments used in a Meerschaum action. Returns ------- A SuccessTuple. """ from meerschaum._internal.entry import entry _args = None if '--name' in sysargs or '--job-name' in sysargs: from meerschaum._internal.arguments._parse_arguments import parse_arguments _args = parse_arguments(sysargs) filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')] try: label = shlex.join(filtered_sysargs) if sysargs else None except Exception as e: label = ' '.join(filtered_sysargs) if sysargs else None success_tuple = run_daemon( entry, filtered_sysargs, daemon_id = _args.get('name', None) if _args else None, label = label, keep_daemon_output = ('--rm' not in sysargs) ) if not isinstance(success_tuple, tuple): success_tuple = False, str(success_tuple) return success_tuple
def get_daemon_ids() ‑> List[str]
-
Expand source code
def get_daemon_ids() -> List[str]: """ """ return os.listdir(DAEMON_RESOURCES_PATH)
def get_daemons() ‑> List[Daemon]
-
Expand source code
def get_daemons() -> List[Daemon]: """ """ return [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
def get_filtered_daemons(filter_list: Optional[List[str]] = None, warn: bool = False) ‑> List[Daemon]
-
Return a list of
Daemons
filtered by a list ofdaemon_ids
. OnlyDaemons
that exist are returned.If
filter_list
isNone
or empty, return allDaemons
(fromget_daemons()
).Parameters
filter_list
:Optional[List[str]]
, defaultNone
- List of
daemon_ids
to include. Ifdaemon_ids
isNone
or empty, return allDaemons
. warn
:bool
, defaultFalse
- If
True
, raise warnings for non-existentdaemon_ids
.
Returns
A list of Daemon objects.
Expand source code
def get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False, ) -> List[Daemon]: """Return a list of `Daemons` filtered by a list of `daemon_ids`. Only `Daemons` that exist are returned. If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`). Parameters ---------- filter_list: Optional[List[str]], default None List of `daemon_ids` to include. If `daemon_ids` is `None` or empty, return all `Daemons`. warn: bool, default False If `True`, raise warnings for non-existent `daemon_ids`. Returns ------- A list of Daemon objects. """ if not filter_list: daemons = get_daemons() return [d for d in daemons if not d.hidden] from meerschaum.utils.warnings import warn as _warn daemons = [] for d_id in filter_list: try: d = Daemon(daemon_id=d_id) _exists = d.path.exists() except Exception as e: _exists = False if not _exists: if warn: _warn(f"Daemon '{d_id}' does not exist.", stack=False) continue if d.hidden: pass daemons.append(d) return daemons
def get_running_daemons(daemons: Optional[List[meerschaum.utils.daemon.Daemon]] = None) ‑> List[Daemon]
-
Return a list of currently running daemons.
Expand source code
def get_running_daemons(daemons : Optional[List[Daemon]] = None) -> List[Daemon]: """ Return a list of currently running daemons. """ if daemons is None: daemons = get_daemons() return [ d for d in daemons if d.pid_path.exists() ]
def get_stopped_daemons(daemons: Optional[List[meerschaum.utils.daemon.Daemon]] = None, running_daemons: Optional[List[meerschaum.utils.daemon.Daemon]] = None) ‑> List[Daemon]
-
Return a list of stopped daemons.
Expand source code
def get_stopped_daemons( daemons : Optional[List[Daemon]] = None, running_daemons : Optional[List[Daemon]] = None, ) -> List[Daemon]: """ Return a list of stopped daemons. """ if daemons is None: daemons = get_daemons() if running_daemons is None: running_daemons = get_running_daemons(daemons) return [d for d in daemons if d not in running_daemons]
def run_daemon(func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = False, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) ‑> Any
-
Execute a function as a daemon.
Expand source code
def run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = False, allow_dirty_run: bool = False, label: Optional[str] = None, **kw ) -> Any: """Execute a function as a daemon.""" daemon = Daemon( func, daemon_id = daemon_id, target_args = [arg for arg in args], target_kw = kw, label = label, ) return daemon.run( keep_daemon_output = keep_daemon_output, allow_dirty_run = allow_dirty_run, )