meerschaum.utils.daemon
Manage Daemons via the Daemon
class.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage Daemons via the `Daemon` class. 7""" 8 9from __future__ import annotations 10import os, pathlib, shutil, json, datetime, threading, shlex 11from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict 12from meerschaum.config._paths import DAEMON_RESOURCES_PATH 13from meerschaum.utils.daemon.StdinFile import StdinFile 14from meerschaum.utils.daemon.Daemon import Daemon 15from meerschaum.utils.daemon.RotatingFile import RotatingFile 16from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor 17from meerschaum.utils.daemon._names import get_new_daemon_name 18 19 20def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple: 21 """Parse sysargs and execute a Meerschaum action as a daemon. 22 23 Parameters 24 ---------- 25 sysargs: Optional[List[str]], default None 26 The command line arguments used in a Meerschaum action. 27 28 Returns 29 ------- 30 A SuccessTuple. 31 """ 32 from meerschaum._internal.entry import entry 33 _args = {} 34 if '--name' in sysargs or '--job-name' in sysargs: 35 from meerschaum._internal.arguments._parse_arguments import parse_arguments 36 _args = parse_arguments(sysargs) 37 filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')] 38 try: 39 label = shlex.join(filtered_sysargs) if sysargs else None 40 except Exception as e: 41 label = ' '.join(filtered_sysargs) if sysargs else None 42 43 name = _args.get('name', None) 44 daemon = None 45 if name: 46 try: 47 daemon = Daemon(daemon_id=name) 48 except Exception as e: 49 daemon = None 50 51 if daemon is not None: 52 existing_sysargs = daemon.properties['target']['args'][0] 53 existing_kwargs = parse_arguments(existing_sysargs) 54 55 ### Remove sysargs because flags are aliased. 56 _ = _args.pop('daemon', None) 57 _ = _args.pop('sysargs', None) 58 _ = _args.pop('filtered_sysargs', None) 59 debug = _args.pop('debug', None) 60 _args['sub_args'] = sorted(_args.get('sub_args', [])) 61 _ = existing_kwargs.pop('daemon', None) 62 _ = existing_kwargs.pop('sysargs', None) 63 _ = existing_kwargs.pop('filtered_sysargs', None) 64 _ = existing_kwargs.pop('debug', None) 65 existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', [])) 66 67 ### Only run if the kwargs equal or no actions are provided. 68 if existing_kwargs == _args or not _args.get('action', []): 69 if daemon.status == 'running': 70 return True, f"Daemon '{daemon}' is already running." 71 return daemon.run( 72 debug=debug, 73 allow_dirty_run=True, 74 ) 75 76 success_tuple = run_daemon( 77 entry, 78 filtered_sysargs, 79 daemon_id=_args.get('name', None) if _args else None, 80 label=label, 81 keep_daemon_output=('--rm' not in (sysargs or [])), 82 ) 83 return success_tuple 84 85 86def daemon_action(**kw) -> SuccessTuple: 87 """Execute a Meerschaum action as a daemon.""" 88 from meerschaum.utils.packages import run_python_package 89 from meerschaum.utils.threading import Thread 90 from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs 91 from meerschaum.actions import get_action 92 93 kw['daemon'] = True 94 kw['shell'] = False 95 96 action = kw.get('action', None) 97 if action and get_action(action) is None: 98 if not kw.get('allow_shell_job') and not kw.get('force'): 99 return False, ( 100 f"Action '{action}' isn't recognized.\n\n" 101 + " Include `--allow-shell-job`, `--force`, or `-f`\n " 102 + "to enable shell commands to run as Meerschaum jobs." 103 ) 104 105 sysargs = parse_dict_to_sysargs(kw) 106 rc = run_python_package('meerschaum', sysargs, venv=None, debug=False) 107 msg = "Success" if rc == 0 else f"Daemon returned code: {rc}" 108 return rc == 0, msg 109 110 111def run_daemon( 112 func: Callable[[Any], Any], 113 *args, 114 daemon_id: Optional[str] = None, 115 keep_daemon_output: bool = True, 116 allow_dirty_run: bool = False, 117 label: Optional[str] = None, 118 **kw 119) -> Any: 120 """Execute a function as a daemon.""" 121 daemon = Daemon( 122 func, 123 daemon_id=daemon_id, 124 target_args=[arg for arg in args], 125 target_kw=kw, 126 label=label, 127 ) 128 return daemon.run( 129 keep_daemon_output=keep_daemon_output, 130 allow_dirty_run=allow_dirty_run, 131 ) 132 133 134def get_daemons() -> List[Daemon]: 135 """ 136 Return all existing Daemons, sorted by end time. 137 """ 138 daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()] 139 daemons_status = {daemon: daemon.status for daemon in daemons} 140 running_daemons = { 141 daemon: daemons_status[daemon] 142 for daemon in daemons 143 if daemons_status[daemon] == 'running' 144 } 145 paused_daemons = { 146 daemon: daemons_status[daemon] 147 for daemon in daemons 148 if daemons_status[daemon] == 'paused' 149 } 150 stopped_daemons = { 151 daemon: daemons_status[daemon] 152 for daemon in daemons 153 if daemons_status[daemon] == 'stopped' 154 } 155 daemons_began = { 156 daemon: daemon.properties.get('process', {}).get('began', '9999') 157 for daemon in daemons 158 } 159 daemons_paused = { 160 daemon: daemon.properties.get('process', {}).get('paused', '9999') 161 for daemon in daemons 162 } 163 daemons_ended = { 164 daemon: daemon.properties.get('process', {}).get('ended', '9999') 165 for daemon in daemons 166 } 167 sorted_stopped_daemons = [ 168 daemon 169 for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x]) 170 ] 171 sorted_paused_daemons = [ 172 daemon 173 for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x]) 174 ] 175 sorted_running_daemons = [ 176 daemon 177 for daemon in sorted(running_daemons, key=lambda x: daemons_began[x]) 178 ] 179 return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons 180 181 182def get_daemon_ids() -> List[str]: 183 """ 184 Return the IDs of all daemons on disk. 185 """ 186 return [ 187 daemon_dir 188 for daemon_dir in sorted(os.listdir(DAEMON_RESOURCES_PATH)) 189 if (DAEMON_RESOURCES_PATH / daemon_dir / 'properties.json').exists() 190 ] 191 192 193def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 194 """ 195 Return a list of currently running daemons. 196 """ 197 if daemons is None: 198 daemons = get_daemons() 199 return [ 200 d 201 for d in daemons 202 if d.status == 'running' 203 ] 204 205 206def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 207 """ 208 Return a list of active but paused daemons. 209 """ 210 if daemons is None: 211 daemons = get_daemons() 212 return [ 213 d 214 for d in daemons 215 if d.status == 'paused' 216 ] 217 218 219def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 220 """ 221 Return a list of stopped daemons. 222 """ 223 if daemons is None: 224 daemons = get_daemons() 225 226 return [ 227 d 228 for d in daemons 229 if d.status == 'stopped' 230 ] 231 232 233def get_filtered_daemons( 234 filter_list: Optional[List[str]] = None, 235 warn: bool = False, 236) -> List[Daemon]: 237 """ 238 Return a list of `Daemons` filtered by a list of `daemon_ids`. 239 Only `Daemons` that exist are returned. 240 241 If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`). 242 243 Parameters 244 ---------- 245 filter_list: Optional[List[str]], default None 246 List of `daemon_ids` to include. If `daemon_ids` is `None` or empty, 247 return all `Daemons`. 248 249 warn: bool, default False 250 If `True`, raise warnings for non-existent `daemon_ids`. 251 252 Returns 253 ------- 254 A list of Daemon objects. 255 256 """ 257 if not filter_list: 258 daemons = get_daemons() 259 return [d for d in daemons if not d.hidden] 260 261 from meerschaum.utils.warnings import warn as _warn 262 daemons = [] 263 for d_id in filter_list: 264 try: 265 d = Daemon(daemon_id=d_id) 266 _exists = d.path.exists() 267 except Exception: 268 _exists = False 269 if not _exists: 270 if warn: 271 _warn(f"Daemon '{d_id}' does not exist.", stack=False) 272 continue 273 if d.hidden: 274 pass 275 daemons.append(d) 276 return daemons 277 278 279def running_in_daemon() -> bool: 280 """ 281 Return whether the current thread is running in a Daemon context. 282 """ 283 from meerschaum.config.static import STATIC_CONFIG 284 daemon_env_var = STATIC_CONFIG['environment']['daemon_id'] 285 return daemon_env_var in os.environ
def
daemon_entry(sysargs: Optional[List[str]] = None) -> Tuple[bool, str]:
21def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple: 22 """Parse sysargs and execute a Meerschaum action as a daemon. 23 24 Parameters 25 ---------- 26 sysargs: Optional[List[str]], default None 27 The command line arguments used in a Meerschaum action. 28 29 Returns 30 ------- 31 A SuccessTuple. 32 """ 33 from meerschaum._internal.entry import entry 34 _args = {} 35 if '--name' in sysargs or '--job-name' in sysargs: 36 from meerschaum._internal.arguments._parse_arguments import parse_arguments 37 _args = parse_arguments(sysargs) 38 filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')] 39 try: 40 label = shlex.join(filtered_sysargs) if sysargs else None 41 except Exception as e: 42 label = ' '.join(filtered_sysargs) if sysargs else None 43 44 name = _args.get('name', None) 45 daemon = None 46 if name: 47 try: 48 daemon = Daemon(daemon_id=name) 49 except Exception as e: 50 daemon = None 51 52 if daemon is not None: 53 existing_sysargs = daemon.properties['target']['args'][0] 54 existing_kwargs = parse_arguments(existing_sysargs) 55 56 ### Remove sysargs because flags are aliased. 57 _ = _args.pop('daemon', None) 58 _ = _args.pop('sysargs', None) 59 _ = _args.pop('filtered_sysargs', None) 60 debug = _args.pop('debug', None) 61 _args['sub_args'] = sorted(_args.get('sub_args', [])) 62 _ = existing_kwargs.pop('daemon', None) 63 _ = existing_kwargs.pop('sysargs', None) 64 _ = existing_kwargs.pop('filtered_sysargs', None) 65 _ = existing_kwargs.pop('debug', None) 66 existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', [])) 67 68 ### Only run if the kwargs equal or no actions are provided. 69 if existing_kwargs == _args or not _args.get('action', []): 70 if daemon.status == 'running': 71 return True, f"Daemon '{daemon}' is already running." 72 return daemon.run( 73 debug=debug, 74 allow_dirty_run=True, 75 ) 76 77 success_tuple = run_daemon( 78 entry, 79 filtered_sysargs, 80 daemon_id=_args.get('name', None) if _args else None, 81 label=label, 82 keep_daemon_output=('--rm' not in (sysargs or [])), 83 ) 84 return success_tuple
Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
- sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
- A SuccessTuple.
def
daemon_action(**kw) -> Tuple[bool, str]:
87def daemon_action(**kw) -> SuccessTuple: 88 """Execute a Meerschaum action as a daemon.""" 89 from meerschaum.utils.packages import run_python_package 90 from meerschaum.utils.threading import Thread 91 from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs 92 from meerschaum.actions import get_action 93 94 kw['daemon'] = True 95 kw['shell'] = False 96 97 action = kw.get('action', None) 98 if action and get_action(action) is None: 99 if not kw.get('allow_shell_job') and not kw.get('force'): 100 return False, ( 101 f"Action '{action}' isn't recognized.\n\n" 102 + " Include `--allow-shell-job`, `--force`, or `-f`\n " 103 + "to enable shell commands to run as Meerschaum jobs." 104 ) 105 106 sysargs = parse_dict_to_sysargs(kw) 107 rc = run_python_package('meerschaum', sysargs, venv=None, debug=False) 108 msg = "Success" if rc == 0 else f"Daemon returned code: {rc}" 109 return rc == 0, msg
Execute a Meerschaum action as a daemon.
def
run_daemon( func: Callable[[Any], Any], *args, daemon_id: Optional[str] = None, keep_daemon_output: bool = True, allow_dirty_run: bool = False, label: Optional[str] = None, **kw) -> Any:
112def run_daemon( 113 func: Callable[[Any], Any], 114 *args, 115 daemon_id: Optional[str] = None, 116 keep_daemon_output: bool = True, 117 allow_dirty_run: bool = False, 118 label: Optional[str] = None, 119 **kw 120) -> Any: 121 """Execute a function as a daemon.""" 122 daemon = Daemon( 123 func, 124 daemon_id=daemon_id, 125 target_args=[arg for arg in args], 126 target_kw=kw, 127 label=label, 128 ) 129 return daemon.run( 130 keep_daemon_output=keep_daemon_output, 131 allow_dirty_run=allow_dirty_run, 132 )
Execute a function as a daemon.
135def get_daemons() -> List[Daemon]: 136 """ 137 Return all existing Daemons, sorted by end time. 138 """ 139 daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()] 140 daemons_status = {daemon: daemon.status for daemon in daemons} 141 running_daemons = { 142 daemon: daemons_status[daemon] 143 for daemon in daemons 144 if daemons_status[daemon] == 'running' 145 } 146 paused_daemons = { 147 daemon: daemons_status[daemon] 148 for daemon in daemons 149 if daemons_status[daemon] == 'paused' 150 } 151 stopped_daemons = { 152 daemon: daemons_status[daemon] 153 for daemon in daemons 154 if daemons_status[daemon] == 'stopped' 155 } 156 daemons_began = { 157 daemon: daemon.properties.get('process', {}).get('began', '9999') 158 for daemon in daemons 159 } 160 daemons_paused = { 161 daemon: daemon.properties.get('process', {}).get('paused', '9999') 162 for daemon in daemons 163 } 164 daemons_ended = { 165 daemon: daemon.properties.get('process', {}).get('ended', '9999') 166 for daemon in daemons 167 } 168 sorted_stopped_daemons = [ 169 daemon 170 for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x]) 171 ] 172 sorted_paused_daemons = [ 173 daemon 174 for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x]) 175 ] 176 sorted_running_daemons = [ 177 daemon 178 for daemon in sorted(running_daemons, key=lambda x: daemons_began[x]) 179 ] 180 return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons
Return all existing Daemons, sorted by end time.
def
get_daemon_ids() -> List[str]:
183def get_daemon_ids() -> List[str]: 184 """ 185 Return the IDs of all daemons on disk. 186 """ 187 return [ 188 daemon_dir 189 for daemon_dir in sorted(os.listdir(DAEMON_RESOURCES_PATH)) 190 if (DAEMON_RESOURCES_PATH / daemon_dir / 'properties.json').exists() 191 ]
Return the IDs of all daemons on disk.
def
get_running_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
194def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 195 """ 196 Return a list of currently running daemons. 197 """ 198 if daemons is None: 199 daemons = get_daemons() 200 return [ 201 d 202 for d in daemons 203 if d.status == 'running' 204 ]
Return a list of currently running daemons.
def
get_paused_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
207def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 208 """ 209 Return a list of active but paused daemons. 210 """ 211 if daemons is None: 212 daemons = get_daemons() 213 return [ 214 d 215 for d in daemons 216 if d.status == 'paused' 217 ]
Return a list of active but paused daemons.
def
get_stopped_daemons( daemons: Optional[List[meerschaum.utils.daemon.Daemon.Daemon]] = None) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
220def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]: 221 """ 222 Return a list of stopped daemons. 223 """ 224 if daemons is None: 225 daemons = get_daemons() 226 227 return [ 228 d 229 for d in daemons 230 if d.status == 'stopped' 231 ]
Return a list of stopped daemons.
def
get_filtered_daemons( filter_list: Optional[List[str]] = None, warn: bool = False) -> List[meerschaum.utils.daemon.Daemon.Daemon]:
234def get_filtered_daemons( 235 filter_list: Optional[List[str]] = None, 236 warn: bool = False, 237) -> List[Daemon]: 238 """ 239 Return a list of `Daemons` filtered by a list of `daemon_ids`. 240 Only `Daemons` that exist are returned. 241 242 If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`). 243 244 Parameters 245 ---------- 246 filter_list: Optional[List[str]], default None 247 List of `daemon_ids` to include. If `daemon_ids` is `None` or empty, 248 return all `Daemons`. 249 250 warn: bool, default False 251 If `True`, raise warnings for non-existent `daemon_ids`. 252 253 Returns 254 ------- 255 A list of Daemon objects. 256 257 """ 258 if not filter_list: 259 daemons = get_daemons() 260 return [d for d in daemons if not d.hidden] 261 262 from meerschaum.utils.warnings import warn as _warn 263 daemons = [] 264 for d_id in filter_list: 265 try: 266 d = Daemon(daemon_id=d_id) 267 _exists = d.path.exists() 268 except Exception: 269 _exists = False 270 if not _exists: 271 if warn: 272 _warn(f"Daemon '{d_id}' does not exist.", stack=False) 273 continue 274 if d.hidden: 275 pass 276 daemons.append(d) 277 return daemons
Return a list of Daemons
filtered by a list of daemon_ids
.
Only Daemons
that exist are returned.
If filter_list
is None
or empty, return all Daemons
(from get_daemons()
).
Parameters
- filter_list (Optional[List[str]], default None):
List of
daemon_ids
to include. Ifdaemon_ids
isNone
or empty, return allDaemons
. - warn (bool, default False):
If
True
, raise warnings for non-existentdaemon_ids
.
Returns
- A list of Daemon objects.
def
running_in_daemon() -> bool:
280def running_in_daemon() -> bool: 281 """ 282 Return whether the current thread is running in a Daemon context. 283 """ 284 from meerschaum.config.static import STATIC_CONFIG 285 daemon_env_var = STATIC_CONFIG['environment']['daemon_id'] 286 return daemon_env_var in os.environ
Return whether the current thread is running in a Daemon context.