meerschaum.utils.daemon
Manage Daemons via the Daemon
class.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage Daemons via the `Daemon` class. 7""" 8 9from __future__ import annotations 10import os, pathlib, shutil, json, datetime, threading, shlex 11from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict 12from meerschaum.config._paths import DAEMON_RESOURCES_PATH 13from meerschaum.utils.daemon.Daemon import Daemon 14from meerschaum.utils.daemon.RotatingFile import RotatingFile 15from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor 16 17 18def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple: 19 """Parse sysargs and execute a Meerschaum action as a daemon. 20 21 Parameters 22 ---------- 23 sysargs: Optional[List[str]], default None 24 The command line arguments used in a Meerschaum action. 25 26 Returns 27 ------- 28 A SuccessTuple. 29 """ 30 from meerschaum._internal.entry import entry 31 _args = {} 32 if '--name' in sysargs or '--job-name' in sysargs: 33 from meerschaum._internal.arguments._parse_arguments import parse_arguments 34 _args = parse_arguments(sysargs) 35 filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')] 36 try: 37 label = shlex.join(filtered_sysargs) if sysargs else None 38 except Exception as e: 39 label = ' '.join(filtered_sysargs) if sysargs else None 40 41 name = _args.get('name', None) 42 daemon = None 43 if name: 44 try: 45 daemon = Daemon(daemon_id=name) 46 except Exception as e: 47 daemon = None 48 49 if daemon is not None: 50 existing_sysargs = daemon.properties['target']['args'][0] 51 existing_kwargs = parse_arguments(existing_sysargs) 52 53 ### Remove sysargs because flags are aliased. 54 _ = _args.pop('daemon', None) 55 _ = _args.pop('sysargs', None) 56 _ = _args.pop('filtered_sysargs', None) 57 debug = _args.pop('debug', None) 58 _args['sub_args'] = sorted(_args.get('sub_args', [])) 59 _ = existing_kwargs.pop('daemon', None) 60 _ = existing_kwargs.pop('sysargs', None) 61 _ = existing_kwargs.pop('filtered_sysargs', None) 62 _ = existing_kwargs.pop('debug', None) 63 existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', [])) 64 65 ### Only run if the kwargs equal or no actions are provided. 66 if existing_kwargs == _args or not _args.get('action', []): 67 if daemon.status == 'running': 68 return True, f"Daemon '{daemon}' is already running." 69 return daemon.run( 70 debug=debug, 71 allow_dirty_run=True, 72 ) 73 74 success_tuple = run_daemon( 75 entry, 76 filtered_sysargs, 77 daemon_id=_args.get('name', None) if _args else None, 78 label=label, 79 keep_daemon_output=('--rm' not in sysargs), 80 ) 81 return success_tuple 82 83 84def daemon_action(**kw) -> SuccessTuple: 85 """Execute a Meerschaum action as a daemon.""" 86 from meerschaum.utils.packages import run_python_package 87 from meerschaum.utils.threading import Thread 88 from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs 89 from meerschaum.actions import get_action 90 91 kw['daemon'] = True 92 kw['shell'] = False 93 94 action = kw.get('action', None) 95 if action and get_action(action) is None: 96 if not kw.get('allow_shell_job') and not kw.get('force'): 97 return False, ( 98 f"Action '{action}' isn't recognized.\n\n" 99 + " Include `--allow-shell-job`, `--force`, or `-f`\n " 100 + "to enable shell commands to run as Meerschaum jobs." 101 ) 102 103 sysargs = parse_dict_to_sysargs(kw) 104 rc = run_python_package('meerschaum', sysargs, venv=None, debug=False) 105 msg = "Success" if rc == 0 else f"Daemon returned code: {rc}" 106 return rc == 0, msg 107 108 109def run_daemon( 110 func: Callable[[Any], Any], 111 *args, 112 daemon_id: Optional[str] = None, 113 keep_daemon_output: bool = True, 114 allow_dirty_run: bool = False, 115 label: Optional[str] = None, 116 **kw 117) -> Any: 118 """Execute a function as a daemon.""" 119 daemon = Daemon( 120 func, 121 daemon_id=daemon_id, 122 target_args=[arg for arg in args], 123 target_kw=kw, 124 label=label, 125 ) 126 return daemon.run( 127 keep_daemon_output=keep_daemon_output, 128 allow_dirty_run=allow_dirty_run, 129 ) 130 131 132def get_daemons() -> List[Daemon]: 133 """ 134 Return all existing Daemons, sorted by end time. 135 """ 136 daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()] 137 daemons_status = {daemon: daemon.status for daemon in daemons} 138 running_daemons = { 139 daemon: daemons_status[daemon] 140 for daemon in daemons 141 if daemons_status[daemon] == 'running' 142 } 143 paused_daemons = { 144 daemon: daemons_status[daemon] 145 for daemon in daemons 146 if daemons_status[daemon] == 'paused' 147 } 148 stopped_daemons = { 149 daemon: daemons_status[daemon] 150 for daemon in daemons 151 if daemons_status[daemon] == 'stopped' 152 } 153 daemons_began = { 154 daemon: daemon.properties.get('process', {}).get('began', '9999') 155 for daemon in daemons 156 } 157 daemons_paused = { 158 daemon: daemon.properties.get('process', {}).get('paused', '9999') 159 for daemon in daemons 160 } 161 daemons_ended = { 162 daemon: daemon.properties.get('process', {}).get('ended', '9999') 163 for daemon in daemons 164 } 165 sorted_stopped_daemons = [ 166 daemon 167 for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x]) 168 ] 169 sorted_paused_daemons = [ 170 daemon 171 for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x]) 172 ] 173 sorted_running_daemons = [ 174 daemon 175 for daemon in sorted(running_daemons, key=lambda x: daemons_began[x]) 176 ] 177 return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons 178 179 180def get_daemon_ids() -> List[str]: 181 """ 182 Return the IDs of all daemons on disk. 183 """ 184 return sorted(os.listdir(DAEMON_RESOURCES_PATH)) 185 186 187def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 188 """ 189 Return a list of currently running daemons. 190 """ 191 if daemons is None: 192 daemons = get_daemons() 193 return [ 194 d 195 for d in daemons 196 if d.status == 'running' 197 ] 198 199 200def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 201 """ 202 Return a list of active but paused daemons. 203 """ 204 if daemons is None: 205 daemons = get_daemons() 206 return [ 207 d 208 for d in daemons 209 if d.status == 'paused' 210 ] 211 212 213def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 214 """ 215 Return a list of stopped daemons. 216 """ 217 if daemons is None: 218 daemons = get_daemons() 219 220 return [ 221 d 222 for d in daemons 223 if d.status == 'stopped' 224 ] 225 226 227def get_filtered_daemons( 228 filter_list: Optional[List[str]] = None, 229 warn: bool = False, 230 ) -> List[Daemon]: 231 """Return a list of `Daemons` filtered by a list of `daemon_ids`. 232 Only `Daemons` that exist are returned. 233 234 If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`). 235 236 Parameters 237 ---------- 238 filter_list: Optional[List[str]], default None 239 List of `daemon_ids` to include. If `daemon_ids` is `None` or empty, 240 return all `Daemons`. 241 242 warn: bool, default False 243 If `True`, raise warnings for non-existent `daemon_ids`. 244 245 Returns 246 ------- 247 A list of Daemon objects. 248 249 """ 250 if not filter_list: 251 daemons = get_daemons() 252 return [d for d in daemons if not d.hidden] 253 from meerschaum.utils.warnings import warn as _warn 254 daemons = [] 255 for d_id in filter_list: 256 try: 257 d = Daemon(daemon_id=d_id) 258 _exists = d.path.exists() 259 except Exception as e: 260 _exists = False 261 if not _exists: 262 if warn: 263 _warn(f"Daemon '{d_id}' does not exist.", stack=False) 264 continue 265 if d.hidden: 266 pass 267 daemons.append(d) 268 return daemons 269 270 271def running_in_daemon() -> bool: 272 """ 273 Return whether the current thread is running in a Daemon context. 274 """ 275 from meerschaum.config.static import STATIC_CONFIG 276 daemon_env_var = STATIC_CONFIG['environment']['daemon_id'] 277 return daemon_env_var in os.environ
def
daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
19def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple: 20 """Parse sysargs and execute a Meerschaum action as a daemon. 21 22 Parameters 23 ---------- 24 sysargs: Optional[List[str]], default None 25 The command line arguments used in a Meerschaum action. 26 27 Returns 28 ------- 29 A SuccessTuple. 30 """ 31 from meerschaum._internal.entry import entry 32 _args = {} 33 if '--name' in sysargs or '--job-name' in sysargs: 34 from meerschaum._internal.arguments._parse_arguments import parse_arguments 35 _args = parse_arguments(sysargs) 36 filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')] 37 try: 38 label = shlex.join(filtered_sysargs) if sysargs else None 39 except Exception as e: 40 label = ' '.join(filtered_sysargs) if sysargs else None 41 42 name = _args.get('name', None) 43 daemon = None 44 if name: 45 try: 46 daemon = Daemon(daemon_id=name) 47 except Exception as e: 48 daemon = None 49 50 if daemon is not None: 51 existing_sysargs = daemon.properties['target']['args'][0] 52 existing_kwargs = parse_arguments(existing_sysargs) 53 54 ### Remove sysargs because flags are aliased. 55 _ = _args.pop('daemon', None) 56 _ = _args.pop('sysargs', None) 57 _ = _args.pop('filtered_sysargs', None) 58 debug = _args.pop('debug', None) 59 _args['sub_args'] = sorted(_args.get('sub_args', [])) 60 _ = existing_kwargs.pop('daemon', None) 61 _ = existing_kwargs.pop('sysargs', None) 62 _ = existing_kwargs.pop('filtered_sysargs', None) 63 _ = existing_kwargs.pop('debug', None) 64 existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', [])) 65 66 ### Only run if the kwargs equal or no actions are provided. 67 if existing_kwargs == _args or not _args.get('action', []): 68 if daemon.status == 'running': 69 return True, f"Daemon '{daemon}' is already running." 70 return daemon.run( 71 debug=debug, 72 allow_dirty_run=True, 73 ) 74 75 success_tuple = run_daemon( 76 entry, 77 filtered_sysargs, 78 daemon_id=_args.get('name', None) if _args else None, 79 label=label, 80 keep_daemon_output=('--rm' not in sysargs), 81 ) 82 return success_tuple
Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
- sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
- A SuccessTuple.
def
daemon_action(**kw) -> Tuple[bool, str]:
85def daemon_action(**kw) -> SuccessTuple: 86 """Execute a Meerschaum action as a daemon.""" 87 from meerschaum.utils.packages import run_python_package 88 from meerschaum.utils.threading import Thread 89 from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs 90 from meerschaum.actions import get_action 91 92 kw['daemon'] = True 93 kw['shell'] = False 94 95 action = kw.get('action', None) 96 if action and get_action(action) is None: 97 if not kw.get('allow_shell_job') and not kw.get('force'): 98 return False, ( 99 f"Action '{action}' isn't recognized.\n\n" 100 + " Include `--allow-shell-job`, `--force`, or `-f`\n " 101 + "to enable shell commands to run as Meerschaum jobs." 102 ) 103 104 sysargs = parse_dict_to_sysargs(kw) 105 rc = run_python_package('meerschaum', sysargs, venv=None, debug=False) 106 msg = "Success" if rc == 0 else f"Daemon returned code: {rc}" 107 return rc == 0, msg
Execute a Meerschaum action as a daemon.
def
run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
110def run_daemon( 111 func: Callable[[Any], Any], 112 *args, 113 daemon_id: Optional[str] = None, 114 keep_daemon_output: bool = True, 115 allow_dirty_run: bool = False, 116 label: Optional[str] = None, 117 **kw 118) -> Any: 119 """Execute a function as a daemon.""" 120 daemon = Daemon( 121 func, 122 daemon_id=daemon_id, 123 target_args=[arg for arg in args], 124 target_kw=kw, 125 label=label, 126 ) 127 return daemon.run( 128 keep_daemon_output=keep_daemon_output, 129 allow_dirty_run=allow_dirty_run, 130 )
Execute a function as a daemon.
133def get_daemons() -> List[Daemon]: 134 """ 135 Return all existing Daemons, sorted by end time. 136 """ 137 daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()] 138 daemons_status = {daemon: daemon.status for daemon in daemons} 139 running_daemons = { 140 daemon: daemons_status[daemon] 141 for daemon in daemons 142 if daemons_status[daemon] == 'running' 143 } 144 paused_daemons = { 145 daemon: daemons_status[daemon] 146 for daemon in daemons 147 if daemons_status[daemon] == 'paused' 148 } 149 stopped_daemons = { 150 daemon: daemons_status[daemon] 151 for daemon in daemons 152 if daemons_status[daemon] == 'stopped' 153 } 154 daemons_began = { 155 daemon: daemon.properties.get('process', {}).get('began', '9999') 156 for daemon in daemons 157 } 158 daemons_paused = { 159 daemon: daemon.properties.get('process', {}).get('paused', '9999') 160 for daemon in daemons 161 } 162 daemons_ended = { 163 daemon: daemon.properties.get('process', {}).get('ended', '9999') 164 for daemon in daemons 165 } 166 sorted_stopped_daemons = [ 167 daemon 168 for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x]) 169 ] 170 sorted_paused_daemons = [ 171 daemon 172 for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x]) 173 ] 174 sorted_running_daemons = [ 175 daemon 176 for daemon in sorted(running_daemons, key=lambda x: daemons_began[x]) 177 ] 178 return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons
Return all existing Daemons, sorted by end time.
def
get_daemon_ids() -> List[str]:
181def get_daemon_ids() -> List[str]: 182 """ 183 Return the IDs of all daemons on disk. 184 """ 185 return sorted(os.listdir(DAEMON_RESOURCES_PATH))
Return the IDs of all daemons on disk.
def
get_running_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
188def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 189 """ 190 Return a list of currently running daemons. 191 """ 192 if daemons is None: 193 daemons = get_daemons() 194 return [ 195 d 196 for d in daemons 197 if d.status == 'running' 198 ]
Return a list of currently running daemons.
def
get_paused_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
201def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 202 """ 203 Return a list of active but paused daemons. 204 """ 205 if daemons is None: 206 daemons = get_daemons() 207 return [ 208 d 209 for d in daemons 210 if d.status == 'paused' 211 ]
Return a list of active but paused daemons.
def
get_stopped_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
214def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 215 """ 216 Return a list of stopped daemons. 217 """ 218 if daemons is None: 219 daemons = get_daemons() 220 221 return [ 222 d 223 for d in daemons 224 if d.status == 'stopped' 225 ]
Return a list of stopped daemons.
def
get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
228def get_filtered_daemons( 229 filter_list: Optional[List[str]] = None, 230 warn: bool = False, 231 ) -> List[Daemon]: 232 """Return a list of `Daemons` filtered by a list of `daemon_ids`. 233 Only `Daemons` that exist are returned. 234 235 If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`). 236 237 Parameters 238 ---------- 239 filter_list: Optional[List[str]], default None 240 List of `daemon_ids` to include. If `daemon_ids` is `None` or empty, 241 return all `Daemons`. 242 243 warn: bool, default False 244 If `True`, raise warnings for non-existent `daemon_ids`. 245 246 Returns 247 ------- 248 A list of Daemon objects. 249 250 """ 251 if not filter_list: 252 daemons = get_daemons() 253 return [d for d in daemons if not d.hidden] 254 from meerschaum.utils.warnings import warn as _warn 255 daemons = [] 256 for d_id in filter_list: 257 try: 258 d = Daemon(daemon_id=d_id) 259 _exists = d.path.exists() 260 except Exception as e: 261 _exists = False 262 if not _exists: 263 if warn: 264 _warn(f"Daemon '{d_id}' does not exist.", stack=False) 265 continue 266 if d.hidden: 267 pass 268 daemons.append(d) 269 return daemons
Return a list of Daemons
filtered by a list of daemon_ids
.
Only Daemons
that exist are returned.
If filter_list
is None
or empty, return all Daemons
(from get_daemons()
).
Parameters
- filter_list (Optional[List[str]], default None):
List of
daemon_ids
to include. Ifdaemon_ids
isNone
or empty, return allDaemons
. - warn (bool, default False):
If
True
, raise warnings for non-existentdaemon_ids
.
Returns
- A list of Daemon objects.
def
running_in_daemon() -> bool:
272def running_in_daemon() -> bool: 273 """ 274 Return whether the current thread is running in a Daemon context. 275 """ 276 from meerschaum.config.static import STATIC_CONFIG 277 daemon_env_var = STATIC_CONFIG['environment']['daemon_id'] 278 return daemon_env_var in os.environ
Return whether the current thread is running in a Daemon context.