meerschaum.utils.daemon
Manage Daemons via the `Daemon` class.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Manage Daemons via the `Daemon` class.
"""

from __future__ import annotations

import os
import pathlib
import shutil
import json
import datetime
import threading
import shlex

from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Dict, Union
from meerschaum.utils.daemon.StdinFile import StdinFile
from meerschaum.utils.daemon.Daemon import Daemon
from meerschaum.utils.daemon.RotatingFile import RotatingFile
from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
from meerschaum.utils.daemon._names import get_new_daemon_name


__all__ = (
    'daemon_action',
    'daemon_entry',
    'get_daemons',
    'get_daemon_ids',
    'get_running_daemons',
    'get_stopped_daemons',
    'get_paused_daemons',
    'get_filtered_daemons',
    'get_new_daemon_name',
    'run_daemon',
    'running_in_daemon',
    'Daemon',
    'StdinFile',
    'RotatingFile',
    'FileDescriptorInterceptor',
)

_daemons = {}


def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
    """Parse sysargs and execute a Meerschaum action as a daemon.

    Parameters
    ----------
    sysargs: Optional[List[str]], default None
        The command line arguments used in a Meerschaum action.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum._internal.entry import entry
    _args = {}
    ### Guard against `sysargs=None` (the default).
    if '--name' in (sysargs or []) or '--job-name' in (sysargs or []):
        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        _args = parse_arguments(sysargs)
    filtered_sysargs = [arg for arg in (sysargs or []) if arg not in ('-d', '--daemon')]
    try:
        label = shlex.join(filtered_sysargs) if sysargs else None
    except Exception:
        label = ' '.join(filtered_sysargs) if sysargs else None

    name = _args.get('name', None)
    daemon = None
    if name:
        try:
            daemon = Daemon(daemon_id=name)
        except Exception:
            daemon = None

    if daemon is not None:
        existing_sysargs = daemon.properties['target']['args'][0]
        existing_kwargs = parse_arguments(existing_sysargs)

        ### Remove sysargs because flags are aliased.
        _ = _args.pop('daemon', None)
        _ = _args.pop('sysargs', None)
        _ = _args.pop('filtered_sysargs', None)
        debug = _args.pop('debug', None)
        _args['sub_args'] = sorted(_args.get('sub_args', []))
        _ = existing_kwargs.pop('daemon', None)
        _ = existing_kwargs.pop('sysargs', None)
        _ = existing_kwargs.pop('filtered_sysargs', None)
        _ = existing_kwargs.pop('debug', None)
        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))

        ### Only run if the kwargs equal or no actions are provided.
        if existing_kwargs == _args or not _args.get('action', []):
            if daemon.status == 'running':
                return True, f"Daemon '{daemon}' is already running."
            return daemon.run(
                debug=debug,
                allow_dirty_run=True,
            )

    success_tuple = run_daemon(
        entry,
        filtered_sysargs,
        daemon_id=_args.get('name', None) if _args else None,
        label=label,
        keep_daemon_output=('--rm' not in (sysargs or [])),
    )
    return success_tuple


def daemon_action(**kw) -> SuccessTuple:
    """Execute a Meerschaum action as a daemon."""
    from meerschaum.utils.packages import run_python_package
    from meerschaum.utils.threading import Thread
    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
    from meerschaum.actions import get_action

    kw['daemon'] = True
    kw['shell'] = False

    action = kw.get('action', None)
    if action and get_action(action) is None:
        if not kw.get('allow_shell_job') and not kw.get('force'):
            return False, (
                f"Action '{action}' isn't recognized.\n\n"
                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  "
                + "to enable shell commands to run as Meerschaum jobs."
            )

    sysargs = parse_dict_to_sysargs(kw)
    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
    return rc == 0, msg


def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """Execute a function as a daemon."""
    daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=[arg for arg in args],
        target_kw=kw,
        label=label,
    )
    return daemon.run(
        keep_daemon_output=keep_daemon_output,
        allow_dirty_run=allow_dirty_run,
    )


def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, sorted by status (stopped, then paused,
    then running) and by their process timestamps.
    """
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]
    daemons_status = {daemon: daemon.status for daemon in daemons}
    running_daemons = {
        daemon: daemons_status[daemon]
        for daemon in daemons
        if daemons_status[daemon] == 'running'
    }
    paused_daemons = {
        daemon: daemons_status[daemon]
        for daemon in daemons
        if daemons_status[daemon] == 'paused'
    }
    stopped_daemons = {
        daemon: daemons_status[daemon]
        for daemon in daemons
        if daemons_status[daemon] == 'stopped'
    }
    daemons_began = {
        daemon: daemon.properties.get('process', {}).get('began', '9999')
        for daemon in daemons
    }
    daemons_paused = {
        daemon: daemon.properties.get('process', {}).get('paused', '9999')
        for daemon in daemons
    }
    daemons_ended = {
        daemon: daemon.properties.get('process', {}).get('ended', '9999')
        for daemon in daemons
    }
    sorted_stopped_daemons = [
        daemon
        for daemon in sorted(stopped_daemons, key=lambda x: daemons_ended[x])
    ]
    sorted_paused_daemons = [
        daemon
        for daemon in sorted(paused_daemons, key=lambda x: daemons_paused[x])
    ]
    sorted_running_daemons = [
        daemon
        for daemon in sorted(running_daemons, key=lambda x: daemons_began[x])
    ]
    return sorted_stopped_daemons + sorted_paused_daemons + sorted_running_daemons


def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.
    """
    from meerschaum.config._paths import DAEMON_RESOURCES_PATH
    return [
        daemon_dir
        for daemon_dir in sorted(os.listdir(DAEMON_RESOURCES_PATH))
        if (DAEMON_RESOURCES_PATH / daemon_dir / 'properties.json').exists()
    ]


def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of currently running daemons.
    """
    if daemons is None:
        daemons = get_daemons()
    return [
        d
        for d in daemons
        if d.status == 'running'
    ]


def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of active but paused daemons.
    """
    if daemons is None:
        daemons = get_daemons()
    return [
        d
        for d in daemons
        if d.status == 'paused'
    ]


def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of stopped daemons.
    """
    if daemons is None:
        daemons = get_daemons()

    return [
        d
        for d in daemons
        if d.status == 'stopped'
    ]


def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return a list of `Daemons` filtered by a list of `daemon_ids`.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all `Daemons` (from `get_daemons()`).

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of `daemon_ids` to include. If `filter_list` is `None` or empty,
        return all `Daemons`.

    warn: bool, default False
        If `True`, raise warnings for non-existent `daemon_ids`.

    Returns
    -------
    A list of Daemon objects.
    """
    if not filter_list:
        daemons = get_daemons()
        return [d for d in daemons if not d.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            d = Daemon(daemon_id=d_id)
            _exists = d.path.exists()
        except Exception:
            _exists = False
        if not _exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(d)
    return daemons


def running_in_daemon() -> bool:
    """
    Return whether the current thread is running in a Daemon context.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    return daemon_env_var in os.environ


def get_current_daemon() -> Union[Daemon, None]:
    """
    If running within a daemon context, return the corresponding `Daemon`.
    Otherwise return `None`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    daemon_id = os.environ.get(daemon_env_var, None)
    if daemon_id is None:
        return None

    return _daemons.get(daemon_id, Daemon(daemon_id=daemon_id))
def daemon_action(**kw) -> SuccessTuple:
Execute a Meerschaum action as a daemon.
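For instance, spawning a `sync pipes` job from Python might look like the following sketch (the keyword arguments shown are assumptions based on the standard Meerschaum flags and are converted to sysargs via `parse_dict_to_sysargs`):

    from meerschaum.utils.daemon import daemon_action

    # Spawn the equivalent of `mrsm sync pipes -d` as a background job.
    success, msg = daemon_action(action=['sync', 'pipes'])
    print(success, msg)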
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
- sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
- A SuccessTuple.
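This is the entry point behind the `-d` / `--daemon` flags. A minimal sketch (the job name 'my-job' is hypothetical):

    from meerschaum.utils.daemon import daemon_entry

    # Equivalent to running `mrsm sync pipes -d --name my-job` from the shell.
    success, msg = daemon_entry(['sync', 'pipes', '-d', '--name', 'my-job'])
    print(success, msg)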
def get_daemons() -> List[Daemon]:
Return all existing Daemons, sorted by status (stopped, then paused, then running) and by their process timestamps.
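Because `get_daemons()` reads every daemon's properties from disk, the status helpers below accept the resulting list to avoid redundant reads. A small sketch:

    from meerschaum.utils.daemon import get_daemons, get_running_daemons

    daemons = get_daemons()  # stopped daemons first, then paused, then running
    running = get_running_daemons(daemons)  # reuse the list rather than re-scanning disk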
def get_daemon_ids() -> List[str]:
Return the IDs of all daemons on disk.
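Only directories under `DAEMON_RESOURCES_PATH` that contain a `properties.json` file count as daemons. For example:

    from meerschaum.utils.daemon import Daemon, get_daemon_ids

    # Build Daemon objects for everything on disk.
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]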
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
Return a list of currently running daemons.
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
Return a list of stopped daemons.
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
Return a list of active but paused daemons.
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
Return a list of Daemons filtered by a list of daemon_ids. Only Daemons that exist are returned.
If filter_list is None or empty, return all Daemons (from get_daemons()).
Parameters
- filter_list (Optional[List[str]], default None): List of daemon_ids to include. If filter_list is None or empty, return all Daemons.
- warn (bool, default False): If True, raise warnings for non-existent daemon_ids.
Returns
- A list of Daemon objects.
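A short sketch (the IDs 'abc' and 'def' are hypothetical; with warn=True, missing IDs emit warnings rather than raising):

    from meerschaum.utils.daemon import get_filtered_daemons

    daemons = get_filtered_daemons(['abc', 'def'], warn=True)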
def get_new_daemon_name() -> str:
    """
    Generate a new random name until a unique one is found
    (up to ~6000 maximum possibilities).
    """
    from meerschaum.config._paths import DAEMON_RESOURCES_PATH
    existing_names = (
        os.listdir(DAEMON_RESOURCES_PATH)
        if DAEMON_RESOURCES_PATH.exists()
        else []
    )
    while True:
        _name = generate_random_name()
        if _name in existing_names:
            continue
        break
    return _name
Generate a new random name until a unique one is found (up to ~6000 maximum possibilities).
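For example:

    from meerschaum.utils.daemon import get_new_daemon_name

    name = get_new_daemon_name()  # a random name not already present on disk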
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
Execute a function as a daemon.
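A minimal sketch of daemonizing an arbitrary function (the `loop_forever` target is hypothetical; positional arguments become `target_args` and extra keyword arguments become `target_kw`):

    import time
    from meerschaum.utils.daemon import run_daemon

    def loop_forever(msg: str = 'hello'):
        while True:
            print(msg)
            time.sleep(1)

    # Runs `loop_forever(msg='hi')` in a detached background process.
    run_daemon(loop_forever, msg='hi', label='looper')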
def running_in_daemon() -> bool:
Return whether the current thread is running in a Daemon context.
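This checks for the daemon ID environment variable which `Daemon._run_exit()` sets in the child process, e.g.:

    from meerschaum.utils.daemon import running_in_daemon, get_current_daemon

    if running_in_daemon():
        daemon = get_current_daemon()  # the Daemon wrapping this process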
class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemon import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """

    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a daemon_id is provided and already exists, read from its pickle file.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            if instance.pickle_path.exists():
                instance = instance.read_pickle()
        return instance

    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon from a properties dictionary.
        """
        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
        if not properties_path.exists():
            raise OSError(f"Properties file '{properties_path}' does not exist.")

        try:
            with open(properties_path, 'r', encoding='utf-8') as f:
                properties = json.load(f)
        except Exception:
            properties = {}

        if not properties:
            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

        daemon_id = properties_path.parent.name
        target_cf = properties.get('target', {})
        target_module_name = target_cf.get('module', None)
        target_function_name = target_cf.get('name', None)
        target_args = target_cf.get('args', None)
        target_kw = target_cf.get('kw', None)
        label = properties.get('label', None)

        if None in [
            target_module_name,
            target_function_name,
            target_args,
            target_kw,
        ]:
            raise ValueError("Missing target function information.")

        target_module = importlib.import_module(target_module_name)
        target_function = getattr(target_module, target_function_name)

        return Daemon(
            daemon_id=daemon_id,
            target=target_function,
            target_args=target_args,
            target_kw=target_kw,
            properties=properties,
            label=label,
        )

    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process

    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        """
        import platform
        import sys
        import os
        import traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        _daemons.append(self)

        logs_cf = self.properties.get('logs', {})
        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
        if log_refresh_seconds is None:
            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
        )

        capture_stdin = logs_cf.get('stdin', True)
        cwd = self.properties.get('cwd', os.getcwd())

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ### KeyboardInterrupts themselves.
        ### The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            stdin=(self.stdin_file if capture_stdin else None),
            working_directory=cwd,
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        if capture_stdin and sys.stdin is None:
            raise OSError("Cannot daemonize without stdin.")

        try:
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                if capture_stdin:
                    sys.stdin = self.stdin_file
                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    result = False, traceback.format_exc()
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = False, str(e)
                finally:
                    _results[self.daemon_id] = result
                    self.properties['result'] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    try:
                        if self.pid is None and self.pid_path.exists():
                            self.pid_path.unlink()
                    except Exception:
                        pass

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            daemon_error = traceback.format_exc()
            from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(
                    f"Error in Daemon '{self}':\n\n"
                    f"{sys.stdin=}\n"
                    f"{self.stdin_file_path=}\n"
                    f"{self.stdin_file_path.exists()=}\n\n"
                    f"{daemon_error}\n\n"
                )
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")

    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp.

        write_properties: bool, default True
            If `True`, persist the properties to disk immediately after capturing the timestamp.
        """
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()

    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        wait: bool = False,
        timeout: Union[int, float] = 4,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        wait: bool, default False
            If `True`, block until `Daemon.status` is running (or the timeout expires).

        timeout: Union[int, float], default 4
            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."

        self.mkdir_if_not_exists(allow_dirty_run)
        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            return _write_pickle_success_tuple

        _launch_daemon_code = (
            "from meerschaum.utils.daemon import Daemon, _daemons; "
            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
            f"_daemons['{self.daemon_id}'] = daemon; "
            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
            "allow_dirty_run=True)"
        )
        env = dict(os.environ)
        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
        msg = (
            "Success"
            if _launch_success_bool
            else f"Failed to start daemon '{self.daemon_id}'."
        )
        if not wait or not _launch_success_bool:
            return _launch_success_bool, msg

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds()

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
        """
        Forcibly terminate a running daemon.
        Sends a SIGTERM signal to the process.

        Parameters
        ----------
        timeout: Union[int, float, None], default 8
            How many seconds to wait for the process to terminate.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        if self.status != 'paused':
            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
            if success:
                self._write_stop_file('kill')
                self.stdin_file.close()
                self._remove_blocking_stdin_file()
                return success, msg

        if self.status == 'stopped':
            self._write_stop_file('kill')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
            return True, "Process has already stopped."

        psutil = attempt_import('psutil')
        process = self.process
        try:
            process.terminate()
            process.kill()
            process.wait(timeout=timeout)
        except Exception as e:
            return False, f"Failed to kill job {self} ({process}) with exception: {e}"

        try:
            if process.status():
                return False, f"Failed to stop daemon '{self}' ({process})."
        except psutil.NoSuchProcess:
            pass

        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception:
                pass

        self._write_stop_file('kill')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        return True, "Success"

    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
        """Gracefully quit a running daemon."""
        if self.status == 'paused':
            return self.kill(timeout)

        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
        if signal_success:
            self._write_stop_file('quit')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
        return signal_success, signal_msg

    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        self._remove_blocking_stdin_file()

        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        self._remove_stop_file()
        try:
            if self.process is None:
                return False, f"Cannot resume daemon '{self.daemon_id}'."

            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def _write_stop_file(self, action: str) -> SuccessTuple:
        """Write the stop file timestamp and action."""
        if action not in ('quit', 'kill', 'pause'):
            return False, f"Unsupported action '{action}'."

        if not self.stop_path.parent.exists():
            self.stop_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.stop_path, 'w+', encoding='utf-8') as f:
            json.dump(
                {
                    'stop_time': datetime.now(timezone.utc).isoformat(),
                    'action': action,
                },
                f
            )

        return True, "Success"

    def _remove_stop_file(self) -> SuccessTuple:
        """Remove the stop file."""
        if not self.stop_path.exists():
            return True, "Stop file does not exist."

        try:
            self.stop_path.unlink()
        except Exception as e:
            return False, f"Failed to remove stop file:\n{e}"

        return True, "Success"

    def _read_stop_file(self) -> Dict[str, Any]:
        """
        Read the stop file if it exists.
        """
        if not self.stop_path.exists():
            return {}

        try:
            with open(self.stop_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data
        except Exception:
            return {}

    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
        """
        Remove the blocking STDIN file if it exists.
        """
        try:
            if self.blocking_stdin_file_path.exists():
                self.blocking_stdin_file_path.unlink()
        except Exception as e:
            return False, str(e)

        return True, "Success"

    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.
        """
        from meerschaum.utils.process import signal_handler
        signal_handler(signal_number, stack_frame)

        timer = self.__dict__.get('_log_refresh_timer', None)
        if timer is not None:
            timer.cancel()

        daemon_context = self.__dict__.get('_daemon_context', None)
        if daemon_context is not None:
            daemon_context.close()

        _close_pools()
        raise SystemExit(0)

    def _send_signal(
        self,
        signal_to_send,
        timeout: Union[float, int, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """Send a signal to the daemon process.

        Parameters
        ----------
        signal_to_send:
            The signal to send to the daemon, e.g. `signal.SIGINT`.

        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to terminate.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        try:
            pid = self.pid
            if pid is None:
                return (
                    False,
                    f"Daemon '{self.daemon_id}' is not running, "
                    + f"cannot send signal '{signal_to_send}'."
                )

            os.kill(pid, signal_to_send)
        except Exception:
            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if not self.status == 'running':
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
        """Create the Daemon's directory.
        If `allow_dirty_run` is `False` and the directory already exists,
        raise a `FileExistsError`.
        """
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            _already_exists = any(os.scandir(self.path))
        except FileExistsError:
            _already_exists = True

        if _already_exists and not allow_dirty_run:
            error(
                f"Daemon '{self.daemon_id}' already exists. " +
                "To allow this daemon to run, do one of the following:\n"
                + "  - Execute `daemon.cleanup()`.\n"
                + f"  - Delete the directory '{self.path}'.\n"
                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
                FileExistsError,
            )

    @property
    def process(self) -> Union['psutil.Process', None]:
        """
        Return the psutil process for the Daemon.
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
                process_exists = True
            except Exception:
                process_exists = False
            if not process_exists:
                _ = self.__dict__.pop('_process', None)
                try:
                    if self.pid_path.exists():
                        self.pid_path.unlink()
                except Exception:
                    pass
                return None
        return self._process

    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil', lazy=False)
        try:
            if self.process.status() == 'stopped':
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception:
                    pass
            return 'stopped'

        return 'running'

    @classmethod
    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return a Daemon's path from its `daemon_id`.
        """
        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
        return DAEMON_RESOURCES_PATH / daemon_id

    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)

    @classmethod
    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return the `properties.json` path for a given `daemon_id`.
        """
        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)

    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'

    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
        """
        logs_cf = self.properties.get('logs', None) or {}
        if 'path' not in logs_cf:
            from meerschaum.config.paths import LOGS_RESOURCES_PATH
            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

        return pathlib.Path(logs_cf['path'])

    @property
    def stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        return self.path / 'input.stdin'

    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin file path.
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'

    @property
    def prompt_kwargs_file_path(self) -> pathlib.Path:
        """
        Return the file path to the kwargs for the invoking `prompt()`.
        """
        return self.path / 'prompt_kwargs.json'

    @property
    def log_offset_path(self) -> pathlib.Path:
        """
        Return the log offset file path.
        """
        from meerschaum.config.paths import LOGS_RESOURCES_PATH
        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

    @property
    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager.
        """
        if '_log_offset_lock' in self.__dict__:
            return self._log_offset_lock

        fasteners = attempt_import('fasteners')
        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
        return self._log_offset_lock

    @property
    def rotating_log(self) -> RotatingFile:
        """
        The rotating log file for the daemon's output.
        """
        if '_rotating_log' in self.__dict__:
            return self._rotating_log

        logs_cf = self.properties.get('logs', None) or {}
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        timestamp_format = logs_cf.get('timestamp_format', None)
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')

        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')

        max_file_size = logs_cf.get('max_file_size', None)
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')

        redirect_streams = logs_cf.get('redirect_streams', True)

        self._rotating_log = RotatingFile(
            self.log_path,
            redirect_streams=redirect_streams,
            write_timestamps=write_timestamps,
            timestamp_format=timestamp_format,
            num_files_to_keep=num_files_to_keep,
            max_file_size=max_file_size,
        )
        return self._rotating_log

    @property
    def stdin_file(self):
        """
        Return the file handler for the stdin file.
        """
        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
            return _stdin_file

        self._stdin_file = StdinFile(
            self.stdin_file_path,
            lock_file_path=self.blocking_stdin_file_path,
        )
        return self._stdin_file

    @property
    def log_text(self) -> Union[str, None]:
        """
        Read the log files and return their contents.
        Returns `None` if the log file does not exist.
        """
        logs_cf = self.properties.get('logs', None) or {}
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        timestamp_format = logs_cf.get('timestamp_format', None)
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')

        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')

        max_file_size = logs_cf.get('max_file_size', None)
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')

        new_rotating_log = RotatingFile(
            self.rotating_log.file_path,
            num_files_to_keep=num_files_to_keep,
            max_file_size=max_file_size,
            write_timestamps=write_timestamps,
            timestamp_format=timestamp_format,
        )
        return new_rotating_log.read()

    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines

    def _read_log_offset(self) -> Tuple[int, int]:
        """
        Return the current log offset cursor.

        Returns
        -------
        A tuple of the form (`subfile_index`, `position`).
        """
        if not self.log_offset_path.exists():
            return 0, 0

        try:
            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
                cursor_text = f.read()
            cursor_parts = cursor_text.split(' ')
            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
            return subfile_index, subfile_position
        except Exception as e:
            warn(f"Failed to read cursor:\n{e}")

        return 0, 0

    def _write_log_offset(self) -> None:
        """
        Write the current log offset file.
        """
        with self.log_offset_lock:
            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
                subfile_index = self.rotating_log._cursor[0]
                subfile_position = self.rotating_log._cursor[1]
                f.write(f"{subfile_index} {subfile_position}")

    @property
    def pid(self) -> Union[int, None]:
        """
        Read the PID file and return its contents.
        Returns `None` if the PID file does not exist.
        """
        if not self.pid_path.exists():
            return None
        try:
            with open(self.pid_path, 'r', encoding='utf-8') as f:
                text = f.read()
            if len(text) == 0:
                return None
            pid = int(text.rstrip())
        except Exception as e:
            warn(e)
            text = None
            pid = None
        return pid

    @property
    def pid_path(self) -> pathlib.Path:
        """
        Return the path to a file containing the PID for this Daemon.
        """
        return self.path / 'process.pid'

    @property
    def pid_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager.
        """
        if '_pid_lock' in self.__dict__:
            return self._pid_lock

        fasteners = attempt_import('fasteners')
        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
        return self._pid_lock

    @property
    def pickle_path(self) -> pathlib.Path:
        """
        Return the path for the pickle file.
        """
        return self.path / 'pickle.pkl'

    def read_properties(self) -> Optional[Dict[str, Any]]:
        """Read the properties JSON file and return the dictionary."""
        if not self.properties_path.exists():
            return None
        try:
            with open(self.properties_path, 'r', encoding='utf-8') as file:
                properties = json.load(file)
        except Exception:
            properties = {}

        return properties or {}

    def read_pickle(self) -> Daemon:
        """Read a Daemon's pickle file and return the `Daemon`."""
        import pickle
        import traceback
        if not self.pickle_path.exists():
            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

        if self.pickle_path.stat().st_size == 0:
            error(f"Pickle was empty for daemon '{self.daemon_id}'.")

        try:
            with open(self.pickle_path, 'rb') as pickle_file:
                daemon = pickle.load(pickle_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)
            daemon = None
            traceback.print_exception(type(e), e, e.__traceback__)
        if not success:
            error(msg)
        return daemon

    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the contents of the properties JSON file.
        """
        try:
            _file_properties = self.read_properties() or {}
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        if (
            self._properties.get('result', None) is None
            and _file_properties.get('result', None) is not None
        ):
            _ = self._properties.pop('result', None)

        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties

    @property
    def hidden(self) -> bool:
        """
        Return a bool indicating whether this Daemon should be displayed.
        """
        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

    def write_properties(self) -> SuccessTuple:
        """Write the properties dictionary to the properties JSON file
        (only if self.properties exists).
        """
        from meerschaum.utils.misc import generate_password
        success, msg = (
            False,
            f"No properties to write for daemon '{self.daemon_id}'."
        )
        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
        props = self.properties
        if props is not None:
            try:
                self.path.mkdir(parents=True, exist_ok=True)
                if self.properties_path.exists():
                    self.properties_path.rename(backup_path)
                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                    json.dump(props, properties_file)
                success, msg = True, 'Success'
            except Exception as e:
                success, msg = False, str(e)

        try:
            if backup_path.exists():
                if not success:
                    backup_path.rename(self.properties_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)

        return success, msg

    def write_pickle(self) -> SuccessTuple:
        """Write the pickle file for the daemon."""
        import pickle
        import traceback
        from meerschaum.utils.misc import generate_password

        if not self.pickle:
            return True, "Success"

        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            if self.pickle_path.exists():
                self.pickle_path.rename(backup_path)
            with open(self.pickle_path, 'wb+') as pickle_file:
                pickle.dump(self, pickle_file)
            success, msg = True, "Success"
        except Exception as e:
            success, msg = False, str(e)
            traceback.print_exception(type(e), e, e.__traceback__)
        try:
            if backup_path.exists():
                if not success:
                    backup_path.rename(self.pickle_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)
        return success, msg

    def _setup(
        self,
        allow_dirty_run: bool = False,
    ) -> None:
        """
        Update properties before starting the Daemon.
        """
        if self.properties is None:
            self._properties = {}

        self._properties.update({
            'target': {
                'name': self.target.__name__,
                'module': self.target.__module__,
                'args': self.target_args,
                'kw': self.target_kw,
            },
        })
        self.mkdir_if_not_exists(allow_dirty_run)
        _write_properties_success_tuple = self.write_properties()
        if not _write_properties_success_tuple[0]:
            error(_write_properties_success_tuple[1])

        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            error(_write_pickle_success_tuple[1])

    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
        """
        Remove a daemon's directory after execution.

        Parameters
        ----------
        keep_logs: bool, default False
            If `True`, skip deleting the daemon's log files.

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        if self.path.exists():
            try:
                shutil.rmtree(self.path)
            except Exception as e:
                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        if not keep_logs:
            self.rotating_log.delete()
            try:
                if self.log_offset_path.exists():
                    self.log_offset_path.unlink()
            except Exception as e:
                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        return True, "Success"

    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
        """
        Return the timeout value to use. Use `--timeout-seconds` if provided,
        else the configured default (8).
        """
        if isinstance(timeout, (int, float)):
            return timeout
        return get_config('jobs', 'timeout_seconds')

    def get_check_timeout_interval_seconds(
        self,
        check_timeout_interval: Union[int, float, None] = None,
    ) -> Union[int, float]:
        """
        Return the interval value to check the status of timeouts.
        """
        if isinstance(check_timeout_interval, (int, float)):
            return check_timeout_interval
        return get_config('jobs', 'check_timeout_interval_seconds')

    @property
    def target_args(self) -> Union[Tuple[Any], None]:
        """
        Return the positional arguments to pass to the target function.
        """
        target_args = (
            self.__dict__.get('_target_args', None)
            or self.properties.get('target', {}).get('args', None)
        )
        if target_args is None:
            return tuple([])

        return tuple(target_args)

    @property
    def target_kw(self) -> Union[Dict[str, Any], None]:
        """
        Return the keyword arguments to pass to the target function.
        """
        target_kw = (
            self.__dict__.get('_target_kw', None)
            or self.properties.get('target', {}).get('kw', None)
        )
        if target_kw is None:
            return {}

        return {key: val for key, val in target_kw.items()}

    def __getstate__(self):
        """
        Pickle this Daemon.
        """
        dill = attempt_import('dill')
        return {
            'target': dill.dumps(self.target),
            'target_args': self.target_args,
            'target_kw': self.target_kw,
            'daemon_id': self.daemon_id,
            'label': self.label,
            'properties': self.properties,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Restore this Daemon from a pickled state.
        If the properties file exists, skip the old pickled version.
        """
        dill = attempt_import('dill')
        _state['target'] = dill.loads(_state['target'])
        self._pickle = True
        daemon_id = _state.get('daemon_id', None)
        if not daemon_id:
            raise ValueError("Need a daemon_id to un-pickle a Daemon.")

        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
        ignore_properties = properties_path.exists()
        if ignore_properties:
            _state = {
                key: val
                for key, val in _state.items()
                if key != 'properties'
            }
        self.__init__(**_state)

    def __repr__(self):
        return str(self)

    def __str__(self):
        return self.daemon_id

    def __eq__(self, other):
        if not isinstance(other, Daemon):
            return False
        return self.daemon_id == other.daemon_id

    def __hash__(self):
        return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi
2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi
2024-07-29 18:03 |
2024-07-29 18:05 | hi
2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we're un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None): Build a `Daemon` from an existing `daemon_id`. If `daemon_id` is provided, other arguments are ignored and are derived from the existing pickled `Daemon`.
- label (Optional[str], default None): Label string to help identify a daemon. If `None`, use the function name instead.
- properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
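As a brief illustration, a minimal sketch of creating a daemon and later reattaching to it by ID (the daemon_id 'my-job' is hypothetical):

from meerschaum.utils.daemon import Daemon

### Create and launch a new daemon around a target function.
daemon = Daemon(print, target_args=('hello',), daemon_id='my-job')
success, msg = daemon.run()

### Once its files exist on disk, the daemon may be rebuilt from its ID alone;
### the target, arguments, and label are restored from the pickle / properties files.
same_daemon = Daemon(daemon_id='my-job')
assert same_daemon == daemon    ### Equality compares daemon_id.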
    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon built from its properties file.
        """
        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
        if not properties_path.exists():
            raise OSError(f"Properties file '{properties_path}' does not exist.")

        try:
            with open(properties_path, 'r', encoding='utf-8') as f:
                properties = json.load(f)
        except Exception:
            properties = {}

        if not properties:
            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

        daemon_id = properties_path.parent.name
        target_cf = properties.get('target', {})
        target_module_name = target_cf.get('module', None)
        target_function_name = target_cf.get('name', None)
        target_args = target_cf.get('args', None)
        target_kw = target_cf.get('kw', None)
        label = properties.get('label', None)

        if None in [
            target_module_name,
            target_function_name,
            target_args,
            target_kw,
        ]:
            raise ValueError("Missing target function information.")

        target_module = importlib.import_module(target_module_name)
        target_function = getattr(target_module, target_function_name)

        return Daemon(
            daemon_id=daemon_id,
            target=target_function,
            target_args=target_args,
            target_kw=target_kw,
            properties=properties,
            label=label,
        )
Return a Daemon built from its properties file.
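A short sketch of recovering a daemon when only its properties.json survives (the daemon_id 'my-job' is again hypothetical):

from meerschaum.utils.daemon import Daemon

daemon = Daemon.from_properties_file('my-job')
daemon.write_pickle()    ### Persist a fresh pickle for future reattachment.

Note that the target's module must be importable, since the function is resolved via importlib.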
    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        wait: bool = False,
        timeout: Union[int, float] = 4,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        wait: bool, default False
            If `True`, block until `Daemon.status` is running (or the timeout expires).

        timeout: Union[int, float], default 4
            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."

        self.mkdir_if_not_exists(allow_dirty_run)
        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            return _write_pickle_success_tuple

        _launch_daemon_code = (
            "from meerschaum.utils.daemon import Daemon, _daemons; "
            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
            f"_daemons['{self.daemon_id}'] = daemon; "
            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
            "allow_dirty_run=True)"
        )
        env = dict(os.environ)
        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
        msg = (
            "Success"
            if _launch_success_bool
            else f"Failed to start daemon '{self.daemon_id}'."
        )
        if not wait or not _launch_success_bool:
            return _launch_success_bool, msg

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds()

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True): If `False`, delete the daemon's output directory upon exiting.
- allow_dirty_run (bool, default False): If `True`, run the daemon, even if the `daemon_id` directory exists. This option is dangerous because if the same `daemon_id` runs concurrently, the last to finish will overwrite the output of the first.
- wait (bool, default False): If `True`, block until `Daemon.status` is running (or the timeout expires).
- timeout (Union[int, float], default 4): If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
Returns
- A SuccessTuple indicating success.
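A minimal sketch of launching a daemon and blocking until it reports as running (the timeout values are illustrative):

import time
from meerschaum.utils.daemon import Daemon

daemon = Daemon(time.sleep, target_args=(60,))
success, msg = daemon.run(wait=True, timeout=8)
print(success, msg, daemon.status)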
    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
        """
        Forcibly terminate a running daemon.
        Sends a SIGTERM signal to the process.

        Parameters
        ----------
        timeout: Union[int, float, None], default 8
            How many seconds to wait for the process to terminate.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        if self.status != 'paused':
            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
            if success:
                self._write_stop_file('kill')
                self.stdin_file.close()
                self._remove_blocking_stdin_file()
                return success, msg

        if self.status == 'stopped':
            self._write_stop_file('kill')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
            return True, "Process has already stopped."

        psutil = attempt_import('psutil')
        process = self.process
        try:
            process.terminate()
            process.kill()
            process.wait(timeout=timeout)
        except Exception as e:
            return False, f"Failed to kill job {self} ({process}) with exception: {e}"

        try:
            if process.status():
                return False, f"Failed to stop daemon '{self}' ({process})."
        except psutil.NoSuchProcess:
            pass

        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception:
                pass

        self._write_stop_file('kill')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
        """Gracefully quit a running daemon."""
        if self.status == 'paused':
            return self.kill(timeout)

        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
        if signal_success:
            self._write_stop_file('quit')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
        return signal_success, signal_msg
Gracefully quit a running daemon.
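A quick sketch contrasting the two stop paths, assuming a running daemon as in the sketches above:

success, msg = daemon.quit()        ### Graceful: sends SIGINT and writes a stop file.
if not success:
    success, msg = daemon.kill()    ### Forcible: SIGTERM, then psutil terminate/kill.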
    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        self._remove_blocking_stdin_file()

        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        self._remove_stop_file()
        try:
            if self.process is None:
                return False, f"Cannot resume daemon '{self.daemon_id}'."

            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
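A minimal pause/resume round trip, again assuming a long-running daemon from the earlier sketches:

success, msg = daemon.pause()
print(daemon.status)    ### 'paused' (the process is suspended via psutil).
success, msg = daemon.resume()
print(daemon.status)    ### 'running'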
    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
        """Create the Daemon's directory.
        If `allow_dirty_run` is `False` and the directory already exists,
        raise a `FileExistsError`.
        """
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            _already_exists = any(os.scandir(self.path))
        except FileExistsError:
            _already_exists = True

        if _already_exists and not allow_dirty_run:
            error(
                f"Daemon '{self.daemon_id}' already exists. " +
                "To allow this daemon to run, do one of the following:\n"
                + " - Execute `daemon.cleanup()`.\n"
                + f" - Delete the directory '{self.path}'.\n"
                + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
                FileExistsError,
            )
Create the Daemon's directory.
If `allow_dirty_run` is `False` and the directory already exists, raise a `FileExistsError`.
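For example, re-running a finished daemon requires either cleaning up its directory or explicitly allowing a dirty run:

daemon.cleanup()                    ### Remove the daemon's directory first, or...
daemon.run(allow_dirty_run=True)    ### ...overwrite the existing directory in place.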
    @property
    def process(self) -> Union['psutil.Process', None]:
        """
        Return the psutil process for the Daemon.
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
                process_exists = True
            except Exception:
                process_exists = False
            if not process_exists:
                _ = self.__dict__.pop('_process', None)
                try:
                    if self.pid_path.exists():
                        self.pid_path.unlink()
                except Exception:
                    pass
                return None
        return self._process
Return the psutil process for the Daemon.
    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil', lazy=False)
        try:
            if self.process.status() == 'stopped':
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception:
                    pass
            return 'stopped'

        return 'running'
Return the running status of this Daemon.
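The status string is always one of 'running', 'paused', or 'stopped', so a simple polling loop might look like this (the sleep interval is illustrative):

import time

while daemon.status == 'running':
    time.sleep(1)
print(f"Daemon '{daemon}' finished with result: {daemon.properties.get('result', None)}")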
    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the `properties.json` path for this Daemon.
    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
        """
        logs_cf = self.properties.get('logs', None) or {}
        if 'path' not in logs_cf:
            from meerschaum.config.paths import LOGS_RESOURCES_PATH
            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

        return pathlib.Path(logs_cf['path'])
Return the log path.
    @property
    def stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        return self.path / 'input.stdin'
Return the stdin file path.
    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin file path.
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'
Return the blocking stdin file path.
    @property
    def prompt_kwargs_file_path(self) -> pathlib.Path:
        """
        Return the file path to the kwargs for the invoking `prompt()`.
        """
        return self.path / 'prompt_kwargs.json'
Return the file path to the kwargs for the invoking `prompt()`.
    @property
    def log_offset_path(self) -> pathlib.Path:
        """
        Return the log offset file path.
        """
        from meerschaum.config.paths import LOGS_RESOURCES_PATH
        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
    @property
    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager.
        """
        if '_log_offset_lock' in self.__dict__:
            return self._log_offset_lock

        fasteners = attempt_import('fasteners')
        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
        return self._log_offset_lock
Return the process lock context manager.
    @property
    def rotating_log(self) -> RotatingFile:
        """
        The rotating log file for the daemon's output.
        """
        if '_rotating_log' in self.__dict__:
            return self._rotating_log

        logs_cf = self.properties.get('logs', None) or {}
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        timestamp_format = logs_cf.get('timestamp_format', None)
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')

        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')

        max_file_size = logs_cf.get('max_file_size', None)
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')

        redirect_streams = logs_cf.get('redirect_streams', True)

        self._rotating_log = RotatingFile(
            self.log_path,
            redirect_streams=redirect_streams,
            write_timestamps=write_timestamps,
            timestamp_format=timestamp_format,
            num_files_to_keep=num_files_to_keep,
            max_file_size=max_file_size,
        )
        return self._rotating_log
The rotating log file for the daemon's output.
    @property
    def stdin_file(self):
        """
        Return the file handler for the stdin file.
        """
        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
            return _stdin_file

        self._stdin_file = StdinFile(
            self.stdin_file_path,
            lock_file_path=self.blocking_stdin_file_path,
        )
        return self._stdin_file
Return the file handler for the stdin file.
    @property
    def log_text(self) -> Union[str, None]:
        """
        Read the log files and return their contents.
        Returns `None` if the log file does not exist.
        """
        logs_cf = self.properties.get('logs', None) or {}
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        timestamp_format = logs_cf.get('timestamp_format', None)
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')

        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')

        max_file_size = logs_cf.get('max_file_size', None)
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')

        new_rotating_log = RotatingFile(
            self.rotating_log.file_path,
            num_files_to_keep=num_files_to_keep,
            max_file_size=max_file_size,
            write_timestamps=write_timestamps,
            timestamp_format=timestamp_format,
        )
        return new_rotating_log.read()
Read the log files and return their contents.
Returns `None` if the log file does not exist.
    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of `self.rotating_log`.
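Because the offset is persisted to disk, repeated calls only return lines written since the previous call, much like tail -f. A rough polling sketch:

import time

while daemon.status == 'running':
    for line in daemon.readlines():    ### Only new lines since the last call.
        print(line, end='')
    time.sleep(1)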
    @property
    def pid(self) -> Union[int, None]:
        """
        Read the PID file and return its contents.
        Returns `None` if the PID file does not exist.
        """
        if not self.pid_path.exists():
            return None
        try:
            with open(self.pid_path, 'r', encoding='utf-8') as f:
                text = f.read()
                if len(text) == 0:
                    return None
                pid = int(text.rstrip())
        except Exception as e:
            warn(e)
            text = None
            pid = None
        return pid
Read the PID file and return its contents.
Returns `None` if the PID file does not exist.
    @property
    def pid_path(self) -> pathlib.Path:
        """
        Return the path to a file containing the PID for this Daemon.
        """
        return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
    @property
    def pid_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager.
        """
        if '_pid_lock' in self.__dict__:
            return self._pid_lock

        fasteners = attempt_import('fasteners')
        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
        return self._pid_lock
Return the process lock context manager.
    @property
    def pickle_path(self) -> pathlib.Path:
        """
        Return the path for the pickle file.
        """
        return self.path / 'pickle.pkl'
Return the path for the pickle file.
    def read_properties(self) -> Optional[Dict[str, Any]]:
        """Read the properties JSON file and return the dictionary."""
        if not self.properties_path.exists():
            return None
        try:
            with open(self.properties_path, 'r', encoding='utf-8') as file:
                properties = json.load(file)
        except Exception:
            properties = {}

        return properties or {}
Read the properties JSON file and return the dictionary.
    def read_pickle(self) -> Daemon:
        """Read a Daemon's pickle file and return the `Daemon`."""
        import pickle
        import traceback
        if not self.pickle_path.exists():
            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

        if self.pickle_path.stat().st_size == 0:
            error(f"Pickle was empty for daemon '{self.daemon_id}'.")

        try:
            with open(self.pickle_path, 'rb') as pickle_file:
                daemon = pickle.load(pickle_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)
            daemon = None
            traceback.print_exception(type(e), e, e.__traceback__)
        if not success:
            error(msg)
        return daemon
Read a Daemon's pickle file and return the `Daemon`.
    @property
    def properties(self) -> Dict[str, Any]:
Return the contents of the properties JSON file.
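The in-memory dictionary is patched on top of the on-disk file, so updates made by the child process (such as its result) become visible to the parent. To go the other way, mutate the dictionary and persist it; a small sketch:

daemon.properties['label'] = 'a new label'    ### Update in memory...
success, msg = daemon.write_properties()      ### ...then persist to properties.json.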
    def write_properties(self) -> SuccessTuple:
Write the properties dictionary to the properties JSON file (only if self.properties exists).
    def write_pickle(self) -> SuccessTuple:
Write the pickle file for the daemon.
    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False): If `True`, skip deleting the daemon's log files.

Returns

- A `SuccessTuple` indicating success.
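For instance, to remove a finished daemon's directory while keeping its logs around for inspection (the rotating log files live under the logs directory, not the daemon's own directory):

success, msg = daemon.cleanup(keep_logs=True)
print(daemon.log_text)    ### The log files survive the cleanup.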
    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
Return the timeout value to use. Use `--timeout-seconds` if provided, else the configured default (8).
    def get_check_timeout_interval_seconds(
        self,
        check_timeout_interval: Union[int, float, None] = None,
    ) -> Union[int, float]:
Return the interval value to check the status of timeouts.
    @property
    def target_args(self) -> Union[Tuple[Any], None]:
Return the positional arguments to pass to the target function.
    @property
    def target_kw(self) -> Union[Dict[str, Any], None]:
Return the keyword arguments to pass to the target function.
class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Optional[pathlib.Path] = None,
        decode: bool = True,
        refresh_seconds: Union[int, float, None] = None,
    ):
        if isinstance(file_path, str):
            file_path = pathlib.Path(file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self.sel = selectors.DefaultSelector()
        self.decode = decode
        self._write_fp = None
        self._refresh_seconds = refresh_seconds

    @property
    def encoding(self):
        return 'utf-8'

    @property
    def file_handler(self):
        """
        Return the read file handler to the provided file path.
        """
        if self._file_handler is not None:
            return self._file_handler

        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            os.mkfifo(self.file_path.as_posix(), mode=0o600)

        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
        self.sel.register(self._file_handler, selectors.EVENT_READ)
        return self._file_handler

    def write(self, data):
        if self._write_fp is None:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            if not self.file_path.exists():
                os.mkfifo(self.file_path.as_posix(), mode=0o600)
            self._write_fp = open(self.file_path, 'wb')

        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            self._write_fp.write(data)
            self._write_fp.flush()
        except BrokenPipeError:
            pass

    def fileno(self):
        fileno = self.file_handler.fileno()
        return fileno

    def read(self, size=-1):
        """
        Read from the FIFO pipe, blocking on EOFError.
        """
        _ = self.file_handler
        while True:
            try:
                data = self._file_handler.read(size)
                if data:
                    try:
                        if self.blocking_file_path.exists():
                            self.blocking_file_path.unlink()
                    except Exception:
                        warn(traceback.format_exc())
                    return data.decode('utf-8') if self.decode else data
            except (OSError, EOFError):
                pass

            if not self.blocking_file_path.exists():
                self.blocking_file_path.touch()
            time.sleep(self.refresh_seconds)

    def readline(self, size=-1):
        line = '' if self.decode else b''
        while True:
            data = self.read(1)
            if not data or ((data == '\n') if self.decode else (data == b'\n')):
                break
            line += data

        return line

    def close(self):
        if self._file_handler is not None:
            self.sel.unregister(self._file_handler)
            self._file_handler.close()
            try:
                os.close(self._fd)
            except OSError:
                pass
            self._file_handler = None
            self._fd = None

        if self._write_fp is not None:
            try:
                self._write_fp.close()
            except BrokenPipeError:
                pass
            self._write_fp = None

        try:
            if self.blocking_file_path.exists():
                self.blocking_file_path.unlink()
        except Exception:
            pass
        super().close()

    def is_open(self):
        return self._file_handler is not None

    def isatty(self) -> bool:
        return False

    @property
    def refresh_seconds(self) -> Union[int, float]:
        """
        How many seconds between checking for blocking functions.
        """
        if not self._refresh_seconds:
            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
        return self._refresh_seconds

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
Redirect user input into a Daemon's context.
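A hedged sketch of feeding input to a daemonized function that blocks on input(), assuming the Daemon wires the child's sys.stdin to this FIFO (the target below is illustrative):

from meerschaum.utils.daemon import Daemon

daemon = Daemon(input, target_args=('Enter a value: ',))
daemon.run()

### The child blocks on the FIFO; writing a line unblocks it.
daemon.stdin_file.write('hello\n')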
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Optional[pathlib.Path] = None,
        decode: bool = True,
        refresh_seconds: Union[int, float, None] = None,
    ):
    @property
    def file_handler(self):
Return the read file handler to the provided file path.
    def write(self, data):
Write data to the FIFO pipe, encoding strings as UTF-8.
The pipe is opened for writing on first use; `BrokenPipeError` is silently ignored.
    def fileno(self):

Return the underlying file descriptor if one exists.
Raise OSError if the IO object does not use a file descriptor.
    def read(self, size=-1):
Read from the FIFO pipe, blocking on EOFError.
    def readline(self, size=-1):
Read until newline or EOF and return the line.
Return an empty string if EOF is hit immediately.
Note this implementation reads one character at a time and ignores `size`.
    def close(self):
Flush and close the IO object.
This method has no effect if the file is already closed.
    def isatty(self) -> bool:

Return whether this is an 'interactive' stream (always `False` for `StdinFile`).
    @property
    def refresh_seconds(self) -> Union[int, float]:
How many seconds between checking for blocking functions.
class RotatingFile(io.IOBase):
    """
    A `RotatingFile` may be treated like a normal file-like object.
    Under the hood, however, it will create new sub-files and delete old ones.
    """

    SEEK_BACK_ATTEMPTS: int = 5

    def __init__(
        self,
        file_path: pathlib.Path,
        num_files_to_keep: Optional[int] = None,
        max_file_size: Optional[int] = None,
        redirect_streams: bool = False,
        write_timestamps: bool = False,
        timestamp_format: Optional[str] = None,
        write_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Create a file-like object which manages other files.

        Parameters
        ----------
        num_files_to_keep: int, default None
            How many sub-files to keep at any given time.
            Defaults to the configured value (5).

        max_file_size: int, default None
            How large in bytes each sub-file can grow before another file is created.
            Note that this is not a hard limit but rather a threshold
            which may be slightly exceeded.
            Defaults to the configured value (100_000).

        redirect_streams: bool, default False
            If `True`, redirect previous file streams when opening a new file descriptor.

            NOTE: Only set this to `True` if you are entering into a daemon context.
            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.

        write_timestamps: bool, default False
            If `True`, prepend the current UTC timestamp to each line of the file.

        timestamp_format: str, default None
            If `write_timestamps` is `True`, use this format for the timestamps.
            Defaults to `'%Y-%m-%d %H:%M'`.

        write_callback: Optional[Callable[[str], None]], default None
            If provided, execute this callback with the data to be written.
        """
        self.file_path = pathlib.Path(file_path)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
        if num_files_to_keep < 1:
            raise ValueError("At least 1 file must be kept.")
        if max_file_size < 100:
            raise ValueError("Subfiles must contain at least 100 bytes.")

        self.num_files_to_keep = num_files_to_keep
        self.max_file_size = max_file_size
        self.redirect_streams = redirect_streams
        self.write_timestamps = write_timestamps
        self.timestamp_format = timestamp_format
        self.write_callback = write_callback
        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')

        ### When subfiles are opened, map from their index to the file objects.
        self.subfile_objects = {}
        self._redirected_subfile_objects = {}
        self._current_file_obj = None
        self._previous_file_obj = None

        ### When reading, keep track of the file index and position.
        self._cursor: Tuple[int, int] = (0, 0)

        ### Don't forget to close any stray files.
        atexit.register(self.close)

    def fileno(self):
        """
        Return the file descriptor for the latest subfile.
        """
        self.refresh_files(start_interception=False)
        return self._current_file_obj.fileno()

    def get_latest_subfile_path(self) -> pathlib.Path:
        """
        Return the path for the latest subfile to which to write into.
        """
        return self.get_subfile_path_from_index(
            self.get_latest_subfile_index()
        )

    def get_remaining_subfile_size(self, subfile_index: int) -> int:
        """
        Return the remaining buffer size for a subfile.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        Returns
        -------
        The remaining size in bytes.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return self.max_file_size

        return self.max_file_size - os.path.getsize(subfile_path)

    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
        """
        Return whether a given subfile is too large.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        potential_new_len: int, default 0
            The length of a potential write of new data.

        Returns
        -------
        A bool indicating the subfile is or will be too large.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return False

        self.flush()

        return (
            (os.path.getsize(subfile_path) + potential_new_len)
            >=
            self.max_file_size
        )

    def get_latest_subfile_index(self) -> int:
        """
        Return the latest existing subfile index.
        If no index may be found, return -1.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        latest_index = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        return latest_index

    def get_index_from_subfile_name(self, subfile_name: str) -> int:
        """
        Return the index from a given subfile name.
        If the file name cannot be parsed, return -1.
        """
        try:
            return int(subfile_name.replace(self.file_path.name + '.', ''))
        except Exception:
            return -1

    def get_subfile_name_from_index(self, subfile_index: int) -> str:
        """
        Return the subfile name from the given index.
        """
        return f'{self.file_path.name}.{subfile_index}'

    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
        """
        Return the subfile's path from its index.
        """
        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

    def get_existing_subfile_indices(self) -> List[int]:
        """
        Return a list of subfile indices which exist on disk.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
        """
        Return a list of file paths that match the input filename pattern.
        """
        if not self.file_path.parent.exists():
            return []

        subfile_names_indices = sorted(
            [
                (file_name, self.get_index_from_subfile_name(file_name))
                for file_name in os.listdir(self.file_path.parent)
                if (
                    file_name.startswith(self.file_path.name)
                    and re.match(self.subfile_regex_pattern, file_name)
                )
            ],
            key=lambda x: x[1],
        )
        return [
            (self.file_path.parent / file_name)
            for file_name, _ in subfile_names_indices
        ]

    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> '_io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of a potential write of new data.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
                if start_interception and self.write_timestamps:
                    self.start_log_fd_interception()

        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            self.increment_subfiles()

        return self._current_file_obj

    def increment_subfiles(self, increment_by: int = 1):
        """
        Create a new subfile and switch the file pointer over.
        """
        latest_subfile_index = self.get_latest_subfile_index()
        old_subfile_index = latest_subfile_index
        new_subfile_index = old_subfile_index + increment_by
        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
        self._previous_file_obj = self._current_file_obj
        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
        self.subfile_objects[new_subfile_index] = self._current_file_obj
        self.flush()

        if self.redirect_streams:
            if self._previous_file_obj is not None:
                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
            self.close(unused_only=True)

        ### Sanity check in case writing somehow fails.
        if self._previous_file_obj is self._current_file_obj:
            self._previous_file_obj = None

        self.delete(unused_only=True)

    def close(self, unused_only: bool = False) -> None:
        """
        Close any open file descriptors.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only close file descriptors not currently in use.
        """
        self.stop_log_fd_interception(unused_only=unused_only)
        subfile_indices = sorted(self.subfile_objects.keys())
        for subfile_index in subfile_indices:
            subfile_object = self.subfile_objects[subfile_index]
            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
                continue
            try:
                if not subfile_object.closed:
                    subfile_object.close()
            except Exception:
                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

            _ = self.subfile_objects.pop(subfile_index, None)
            if self.redirect_streams:
                _ = self._redirected_subfile_objects.pop(subfile_index, None)

        if not unused_only:
            self._previous_file_obj = None
            self._current_file_obj = None

    def get_timestamp_prefix_str(self) -> str:
        """
        Return the current minute prefix string.
        """
        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

    def write(self, data: str) -> None:
        """
        Write the given text into the latest subfile.
        If the subfile will be too large, create a new subfile.
        If too many subfiles exist at once, the oldest one will be deleted.

        NOTE: This will not split data across multiple files.
        As such, if data is larger than max_file_size, then the corresponding subfile
        may exceed this limit.
        """
        try:
            if callable(self.write_callback):
                self.write_callback(data)
        except Exception:
            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")

        try:
            self.file_path.parent.mkdir(exist_ok=True, parents=True)
            if isinstance(data, bytes):
                data = data.decode('utf-8')

            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
            suffix_str = "\n" if self.write_timestamps else ""
            self.refresh_files(
                potential_new_len=len(prefix_str + data + suffix_str),
                start_interception=self.write_timestamps,
            )
            try:
                if prefix_str:
                    self._current_file_obj.write(prefix_str)
                self._current_file_obj.write(data)
                if suffix_str:
                    self._current_file_obj.write(suffix_str)
            except BrokenPipeError:
                warn("BrokenPipeError encountered. The daemon may have been terminated.")
                return
            except Exception:
                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
            self.flush()
            self.delete(unused_only=True)
        except Exception as e:
            warn(f"Unexpected error in RotatingFile.write: {e}")

    def delete(self, unused_only: bool = False) -> None:
        """
        Delete old subfiles.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only delete subfiles which are no longer needed.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
            return

        self.flush()
        self.close(unused_only=unused_only)

        end_ix = (
            (-1 * self.num_files_to_keep)
            if unused_only
            else len(existing_subfile_paths)
        )
        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)

            try:
                subfile_path_to_delete.unlink()
            except Exception:
                warn(
                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                    + f"{traceback.format_exc()}"
                )

    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer

    def refresh_cursor(self) -> None:
        """
        Update the cursor to the latest subfile index and file.tell() value.
        """
        self.flush()
        existing_subfile_paths = self.get_existing_subfile_paths()
        current_ix = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
        self._cursor = (current_ix, position)

    def readlines(self) -> List[str]:
        """
        Return a list of lines of text.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            subfile_lines = []
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines

    def seekable(self) -> bool:
        return True

    def seek(self, position: int) -> None:
        """
        Seek to the given position in the logs stream.
        """
        existing_subfile_indices = self.get_existing_subfile_indices()
        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
        if position == 0:
            self._cursor = (min_ix, 0)
            return

        self._cursor = (max_ix, position)
        if self._current_file_obj is not None:
            self._current_file_obj.seek(position)

    def flush(self) -> None:
        """
        Flush any open subfiles.
        """
        for subfile_index, subfile_object in self.subfile_objects.items():
            if not subfile_object.closed:
                try:
                    subfile_object.flush()
                except Exception:
                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")

        if self.redirect_streams:
            try:
                sys.stdout.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
            try:
                sys.stderr.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

    def start_log_fd_interception(self):
        """
        Start the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        self._stdout_interceptor = FileDescriptorInterceptor(
            sys.stdout.fileno(),
            self.get_timestamp_prefix_str,
        )
        self._stderr_interceptor = FileDescriptorInterceptor(
            sys.stderr.fileno(),
            self.get_timestamp_prefix_str,
        )

        self._stdout_interceptor_thread = Thread(
            target=self._stdout_interceptor.start_interception,
            daemon=True,
        )
        self._stderr_interceptor_thread = Thread(
            target=self._stderr_interceptor.start_interception,
            daemon=True,
        )
        self._stdout_interceptor_thread.start()
        self._stderr_interceptor_thread.start()
        self._intercepting = True

        if '_interceptor_threads' not in self.__dict__:
            self._interceptor_threads = []
        if '_interceptors' not in self.__dict__:
            self._interceptors = []
        self._interceptor_threads.extend([
            self._stdout_interceptor_thread,
            self._stderr_interceptor_thread,
        ])
        self._interceptors.extend([
            self._stdout_interceptor,
            self._stderr_interceptor,
        ])
        self.stop_log_fd_interception(unused_only=True)

    def stop_log_fd_interception(self, unused_only: bool = False):
        """
        Stop the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        interceptors = self.__dict__.get('_interceptors', [])
        interceptor_threads = self.__dict__.get('_interceptor_threads', [])

        end_ix = len(interceptors) if not unused_only else -2

        for interceptor in interceptors[:end_ix]:
            interceptor.stop_interception()
        del interceptors[:end_ix]

        for thread in interceptor_threads[:end_ix]:
            try:
                thread.join()
            except Exception:
                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
        del interceptor_threads[:end_ix]

    def touch(self):
        """
        Touch the latest subfile.
        """
        subfile_path = self.get_latest_subfile_path()
        subfile_path.touch()

    def isatty(self) -> bool:
        return True

    def __repr__(self) -> str:
        """
        Return basic info for this `RotatingFile`.
        """
        return (
            "RotatingFile("
            + f"'{self.file_path.as_posix()}', "
            + f"num_files_to_keep={self.num_files_to_keep}, "
            + f"max_file_size={self.max_file_size})"
        )
A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
35 def __init__( 36 self, 37 file_path: pathlib.Path, 38 num_files_to_keep: Optional[int] = None, 39 max_file_size: Optional[int] = None, 40 redirect_streams: bool = False, 41 write_timestamps: bool = False, 42 timestamp_format: Optional[str] = None, 43 write_callback: Optional[Callable[[str], None]] = None, 44 ): 45 """ 46 Create a file-like object which manages other files. 47 48 Parameters 49 ---------- 50 num_files_to_keep: int, default None 51 How many sub-files to keep at any given time. 52 Defaults to the configured value (5). 53 54 max_file_size: int, default None 55 How large in bytes each sub-file can grow before another file is created. 56 Note that this is not a hard limit but rather a threshold 57 which may be slightly exceeded. 58 Defaults to the configured value (100_000). 59 60 redirect_streams: bool, default False 61 If `True`, redirect previous file streams when opening a new file descriptor. 62 63 NOTE: Only set this to `True` if you are entering into a daemon context. 64 Doing so will redirect `sys.stdout` and `sys.stderr` into the log files. 65 66 write_timestamps: bool, default False 67 If `True`, prepend the current UTC timestamp to each line of the file. 68 69 timestamp_format: str, default None 70 If `write_timestamps` is `True`, use this format for the timestamps. 71 Defaults to `'%Y-%m-%d %H:%M'`. 72 73 write_callback: Optional[Callable[[str], None]], default None 74 If provided, execute this callback with the data to be written. 75 """ 76 self.file_path = pathlib.Path(file_path) 77 if num_files_to_keep is None: 78 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 79 if max_file_size is None: 80 max_file_size = get_config('jobs', 'logs', 'max_file_size') 81 if timestamp_format is None: 82 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 83 if num_files_to_keep < 1: 84 raise ValueError("At least 1 file must be kept.") 85 if max_file_size < 100: 86 raise ValueError("Subfiles must contain at least 100 bytes.") 87 88 self.num_files_to_keep = num_files_to_keep 89 self.max_file_size = max_file_size 90 self.redirect_streams = redirect_streams 91 self.write_timestamps = write_timestamps 92 self.timestamp_format = timestamp_format 93 self.write_callback = write_callback 94 self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?') 95 96 ### When subfiles are opened, map from their index to the file objects. 97 self.subfile_objects = {} 98 self._redirected_subfile_objects = {} 99 self._current_file_obj = None 100 self._previous_file_obj = None 101 102 ### When reading, keep track of the file index and position. 103 self._cursor: Tuple[int, int] = (0, 0) 104 105 ### Don't forget to close any stray files. 106 atexit.register(self.close)
Create a file-like object which manages other files.
Parameters
- num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
- max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
- redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor. NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.
- write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
- timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
- write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
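For example, a minimal usage sketch (hypothetical path; explicit sizes avoid relying on the configured defaults, and redirect_streams/write_timestamps are left off since they are intended for daemon contexts):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    log_path = pathlib.Path('/tmp/example.log')  # hypothetical path
    rf = RotatingFile(log_path, num_files_to_keep=3, max_file_size=10_000)
    rf.write('starting up\n')
    print(rf.get_existing_subfile_paths())  # [PosixPath('/tmp/example.log.0')]
    rf.close()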
109 def fileno(self): 110 """ 111 Return the file descriptor for the latest subfile. 112 """ 113 self.refresh_files(start_interception=False) 114 return self._current_file_obj.fileno()
Return the file descriptor for the latest subfile.
117 def get_latest_subfile_path(self) -> pathlib.Path: 118 """ 119 Return the path for the latest subfile to which to write into. 120 """ 121 return self.get_subfile_path_from_index( 122 self.get_latest_subfile_index() 123 )
Return the path of the latest subfile to write into.
126 def get_remaining_subfile_size(self, subfile_index: int) -> int: 127 """ 128 Return the remaining buffer size for a subfile. 129 130 Parameters 131 --------- 132 subfile_index: int 133 The index of the subfile to be checked. 134 135 Returns 136 ------- 137 The remaining size in bytes. 138 """ 139 subfile_path = self.get_subfile_path_from_index(subfile_index) 140 if not subfile_path.exists(): 141 return self.max_file_size 142 143 return self.max_file_size - os.path.getsize(subfile_path)
Return the remaining buffer size for a subfile.
Parameters
- subfile_index (int): The index of the subfile to be checked.
Returns
- The remaining size in bytes.
146 def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool: 147 """ 148 Return whether a given subfile is too large. 149 150 Parameters 151 ---------- 152 subfile_index: int 153 The index of the subfile to be checked. 154 155 potential_new_len: int, default 0 156 The length of a potential write of new data. 157 158 Returns 159 ------- 160 A bool indicating the subfile is or will be too large. 161 """ 162 subfile_path = self.get_subfile_path_from_index(subfile_index) 163 if not subfile_path.exists(): 164 return False 165 166 self.flush() 167 168 return ( 169 (os.path.getsize(subfile_path) + potential_new_len) 170 >= 171 self.max_file_size 172 )
Return whether a given subfile is too large.
Parameters
- subfile_index (int): The index of the subfile to be checked.
- potential_new_len (int, default 0): The length of a potential write of new data.
Returns
- A bool indicating the subfile is or will be too large.
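Continuing the hypothetical rf from the earlier sketch, both helpers reduce to size arithmetic against the subfile on disk (values hedged, since they depend on what has been flushed):

    # Bytes left before subfile 0 would trigger a rotation,
    # e.g. 9988 once the 12-byte write above reaches disk.
    print(rf.get_remaining_subfile_size(0))

    # True once on-disk size + potential_new_len reaches max_file_size.
    print(rf.is_subfile_too_large(0, potential_new_len=512))  # False here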
175 def get_latest_subfile_index(self) -> int: 176 """ 177 Return the latest existing subfile index. 178 If no index may be found, return -1. 179 """ 180 existing_subfile_paths = self.get_existing_subfile_paths() 181 latest_index = ( 182 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 183 if existing_subfile_paths 184 else 0 185 ) 186 return latest_index
Return the latest existing subfile index. If no subfiles exist, 0 is returned.
189 def get_index_from_subfile_name(self, subfile_name: str) -> int: 190 """ 191 Return the index from a given subfile name. 192 If the file name cannot be parsed, return -1. 193 """ 194 try: 195 return int(subfile_name.replace(self.file_path.name + '.', '')) 196 except Exception: 197 return -1
Return the index from a given subfile name. If the file name cannot be parsed, return -1.
200 def get_subfile_name_from_index(self, subfile_index: int) -> str: 201 """ 202 Return the subfile name from the given index. 203 """ 204 return f'{self.file_path.name}.{subfile_index}'
Return the subfile name from the given index.
207 def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path: 208 """ 209 Return the subfile's path from its index. 210 """ 211 return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
Return the subfile's path from its index.
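The naming scheme is simply the base file name plus a numeric suffix, so names and indices round-trip. A quick sketch with hypothetical paths:

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    rf = RotatingFile(pathlib.Path('/tmp/job.log'), num_files_to_keep=5, max_file_size=10_000)
    print(rf.get_subfile_name_from_index(3))            # 'job.log.3'
    print(rf.get_subfile_path_from_index(3))            # PosixPath('/tmp/job.log.3')
    print(rf.get_index_from_subfile_name('job.log.3'))  # 3
    print(rf.get_index_from_subfile_name('other.txt'))  # -1 (unparsable)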
214 def get_existing_subfile_indices(self) -> List[int]: 215 """ 216 Return of list of subfile indices which exist on disk. 217 """ 218 existing_subfile_paths = self.get_existing_subfile_paths() 219 return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
Return a list of subfile indices which exist on disk.
222 def get_existing_subfile_paths(self) -> List[pathlib.Path]: 223 """ 224 Return a list of file paths that match the input filename pattern. 225 """ 226 if not self.file_path.parent.exists(): 227 return [] 228 229 subfile_names_indices = sorted( 230 [ 231 (file_name, self.get_index_from_subfile_name(file_name)) 232 for file_name in os.listdir(self.file_path.parent) 233 if ( 234 file_name.startswith(self.file_path.name) 235 and re.match(self.subfile_regex_pattern, file_name) 236 ) 237 ], 238 key=lambda x: x[1], 239 ) 240 return [ 241 (self.file_path.parent / file_name) 242 for file_name, _ in subfile_names_indices 243 ]
Return a list of file paths that match the input filename pattern.
246 def refresh_files( 247 self, 248 potential_new_len: int = 0, 249 start_interception: bool = False, 250 ) -> '_io.TextUIWrapper': 251 """ 252 Check the state of the subfiles. 253 If the latest subfile is too large, create a new file and delete old ones. 254 255 Parameters 256 ---------- 257 potential_new_len: int, default 0 258 259 start_interception: bool, default False 260 If `True`, kick off the file interception threads. 261 """ 262 self.flush() 263 264 latest_subfile_index = self.get_latest_subfile_index() 265 latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index) 266 267 ### First run with existing log files: open the most recent log file. 268 is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None) 269 270 ### Sometimes a new file is created but output doesn't switch over. 271 lost_latest_handle = ( 272 self._current_file_obj is not None 273 and 274 self.get_index_from_subfile_name(self._current_file_obj.name) == -1 275 ) 276 if is_first_run_with_logs or lost_latest_handle: 277 self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8') 278 if self.redirect_streams: 279 try: 280 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 281 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 282 except OSError: 283 warn( 284 f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}" 285 ) 286 if start_interception and self.write_timestamps: 287 self.start_log_fd_interception() 288 289 create_new_file = ( 290 (latest_subfile_index == -1) 291 or 292 self._current_file_obj is None 293 or 294 self.is_subfile_too_large(latest_subfile_index, potential_new_len) 295 ) 296 if create_new_file: 297 self.increment_subfiles() 298 299 return self._current_file_obj
Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.
Parameters
- potential_new_len (int, default 0): The length of the data about to be written, used to check whether the latest subfile would grow past max_file_size.
- start_interception (bool, default False): If True, kick off the file interception threads.
301 def increment_subfiles(self, increment_by: int = 1): 302 """ 303 Create a new subfile and switch the file pointer over. 304 """ 305 latest_subfile_index = self.get_latest_subfile_index() 306 old_subfile_index = latest_subfile_index 307 new_subfile_index = old_subfile_index + increment_by 308 new_file_path = self.get_subfile_path_from_index(new_subfile_index) 309 self._previous_file_obj = self._current_file_obj 310 self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8') 311 self.subfile_objects[new_subfile_index] = self._current_file_obj 312 self.flush() 313 314 if self.redirect_streams: 315 if self._previous_file_obj is not None: 316 self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj 317 daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj) 318 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 319 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 320 self.close(unused_only=True) 321 322 ### Sanity check in case writing somehow fails. 323 if self._previous_file_obj is self._current_file_obj: 324 self._previous_file_obj = None 325 326 self.delete(unused_only=True)
Create a new subfile and switch the file pointer over.
328 def close(self, unused_only: bool = False) -> None: 329 """ 330 Close any open file descriptors. 331 332 Parameters 333 ---------- 334 unused_only: bool, default False 335 If `True`, only close file descriptors not currently in use. 336 """ 337 self.stop_log_fd_interception(unused_only=unused_only) 338 subfile_indices = sorted(self.subfile_objects.keys()) 339 for subfile_index in subfile_indices: 340 subfile_object = self.subfile_objects[subfile_index] 341 if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj): 342 continue 343 try: 344 if not subfile_object.closed: 345 subfile_object.close() 346 except Exception: 347 warn(f"Failed to close an open subfile:\n{traceback.format_exc()}") 348 349 _ = self.subfile_objects.pop(subfile_index, None) 350 if self.redirect_streams: 351 _ = self._redirected_subfile_objects.pop(subfile_index, None) 352 353 if not unused_only: 354 self._previous_file_obj = None 355 self._current_file_obj = None
Close any open file descriptors.
Parameters
- unused_only (bool, default False): If True, only close file descriptors not currently in use.
358 def get_timestamp_prefix_str(self) -> str: 359 """ 360 Return the current minute prefix string. 361 """ 362 return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
Return the timestamp prefix string for the current minute.
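For illustration, a prefix built with the documented default format '%Y-%m-%d %H:%M' looks like this (a sketch mirroring the method, not meerschaum code):

    from datetime import datetime, timezone

    # Same construction as get_timestamp_prefix_str() with the default format.
    prefix = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M') + ' | '
    print(prefix)  # e.g. '2024-06-01 12:34 | '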
365 def write(self, data: str) -> None: 366 """ 367 Write the given text into the latest subfile. 368 If the subfile will be too large, create a new subfile. 369 If too many subfiles exist at once, the oldest one will be deleted. 370 371 NOTE: This will not split data across multiple files. 372 As such, if data is larger than max_file_size, then the corresponding subfile 373 may exceed this limit. 374 """ 375 try: 376 if callable(self.write_callback): 377 self.write_callback(data) 378 except Exception: 379 warn(f"Failed to execute write callback:\n{traceback.format_exc()}") 380 381 try: 382 self.file_path.parent.mkdir(exist_ok=True, parents=True) 383 if isinstance(data, bytes): 384 data = data.decode('utf-8') 385 386 prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else "" 387 suffix_str = "\n" if self.write_timestamps else "" 388 self.refresh_files( 389 potential_new_len = len(prefix_str + data + suffix_str), 390 start_interception = self.write_timestamps, 391 ) 392 try: 393 if prefix_str: 394 self._current_file_obj.write(prefix_str) 395 self._current_file_obj.write(data) 396 if suffix_str: 397 self._current_file_obj.write(suffix_str) 398 except BrokenPipeError: 399 warn("BrokenPipeError encountered. The daemon may have been terminated.") 400 return 401 except Exception: 402 warn(f"Failed to write to subfile:\n{traceback.format_exc()}") 403 self.flush() 404 self.delete(unused_only=True) 405 except Exception as e: 406 warn(f"Unexpected error in RotatingFile.write: {e}")
Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.
NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
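To see rotation in action, here is a sketch (hypothetical path) in which every write alone exceeds max_file_size, so each write lands in a fresh subfile and older subfiles are pruned down to num_files_to_keep:

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    rf = RotatingFile(pathlib.Path('/tmp/rotate.log'), num_files_to_keep=2, max_file_size=4000)
    for i in range(4):
        rf.write(f'chunk {i}: ' + ('x' * 5000) + '\n')  # ~5 KB > max_file_size

    # Only the newest subfiles survive, e.g. ['rotate.log.3', 'rotate.log.4'].
    print([p.name for p in rf.get_existing_subfile_paths()])
    rf.close()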
409 def delete(self, unused_only: bool = False) -> None: 410 """ 411 Delete old subfiles. 412 413 Parameters 414 ---------- 415 unused_only: bool, default False 416 If `True`, only delete subfiles which are no longer needed. 417 """ 418 existing_subfile_paths = self.get_existing_subfile_paths() 419 if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep: 420 return 421 422 self.flush() 423 self.close(unused_only=unused_only) 424 425 end_ix = ( 426 (-1 * self.num_files_to_keep) 427 if unused_only 428 else len(existing_subfile_paths) 429 ) 430 for subfile_path_to_delete in existing_subfile_paths[0:end_ix]: 431 subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name) 432 433 try: 434 subfile_path_to_delete.unlink() 435 except Exception: 436 warn( 437 f"Unable to delete subfile '{subfile_path_to_delete}':\n" 438 + f"{traceback.format_exc()}" 439 )
Delete old subfiles.
Parameters
- unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
442 def read(self, *args, **kwargs) -> str: 443 """ 444 Read the contents of the existing subfiles. 445 """ 446 existing_subfile_indices = [ 447 self.get_index_from_subfile_name(subfile_path.name) 448 for subfile_path in self.get_existing_subfile_paths() 449 ] 450 paths_to_read = [ 451 self.get_subfile_path_from_index(subfile_index) 452 for subfile_index in existing_subfile_indices 453 if subfile_index >= self._cursor[0] 454 ] 455 buffer = '' 456 refresh_cursor = True 457 for subfile_path in paths_to_read: 458 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 459 seek_ix = ( 460 self._cursor[1] 461 if subfile_index == self._cursor[0] 462 else 0 463 ) 464 465 if ( 466 subfile_index in self.subfile_objects 467 and 468 subfile_index not in self._redirected_subfile_objects 469 ): 470 subfile_object = self.subfile_objects[subfile_index] 471 for i in range(self.SEEK_BACK_ATTEMPTS): 472 try: 473 subfile_object.seek(max(seek_ix - i, 0)) 474 buffer += subfile_object.read() 475 except UnicodeDecodeError: 476 continue 477 break 478 else: 479 with open(subfile_path, 'r', encoding='utf-8') as f: 480 for i in range(self.SEEK_BACK_ATTEMPTS): 481 try: 482 f.seek(max(seek_ix - i, 0)) 483 buffer += f.read() 484 except UnicodeDecodeError: 485 continue 486 break 487 488 ### Handle the case when no files have yet been opened. 489 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 490 self._cursor = (subfile_index, f.tell()) 491 refresh_cursor = False 492 493 if refresh_cursor: 494 self.refresh_cursor() 495 return buffer
Read the contents of the existing subfiles.
498 def refresh_cursor(self) -> None: 499 """ 500 Update the cursor to the latest subfile index and file.tell() value. 501 """ 502 self.flush() 503 existing_subfile_paths = self.get_existing_subfile_paths() 504 current_ix = ( 505 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 506 if existing_subfile_paths 507 else 0 508 ) 509 position = self._current_file_obj.tell() if self._current_file_obj is not None else 0 510 self._cursor = (current_ix, position)
Update the cursor to the latest subfile index and file.tell() value.
513 def readlines(self) -> List[str]: 514 """ 515 Return a list of lines of text. 516 """ 517 existing_subfile_indices = [ 518 self.get_index_from_subfile_name(subfile_path.name) 519 for subfile_path in self.get_existing_subfile_paths() 520 ] 521 paths_to_read = [ 522 self.get_subfile_path_from_index(subfile_index) 523 for subfile_index in existing_subfile_indices 524 if subfile_index >= self._cursor[0] 525 ] 526 527 lines = [] 528 refresh_cursor = True 529 for subfile_path in paths_to_read: 530 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 531 seek_ix = ( 532 self._cursor[1] 533 if subfile_index == self._cursor[0] 534 else 0 535 ) 536 537 subfile_lines = [] 538 if ( 539 subfile_index in self.subfile_objects 540 and 541 subfile_index not in self._redirected_subfile_objects 542 ): 543 subfile_object = self.subfile_objects[subfile_index] 544 for i in range(self.SEEK_BACK_ATTEMPTS): 545 try: 546 subfile_object.seek(max((seek_ix - i), 0)) 547 subfile_lines = subfile_object.readlines() 548 except UnicodeDecodeError: 549 continue 550 break 551 else: 552 with open(subfile_path, 'r', encoding='utf-8') as f: 553 for i in range(self.SEEK_BACK_ATTEMPTS): 554 try: 555 f.seek(max(seek_ix - i, 0)) 556 subfile_lines = f.readlines() 557 except UnicodeDecodeError: 558 continue 559 break 560 561 ### Handle the case when no files have yet been opened. 562 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 563 self._cursor = (subfile_index, f.tell()) 564 refresh_cursor = False 565 566 ### Sometimes a line may span multiple files. 567 if lines and subfile_lines and not lines[-1].endswith('\n'): 568 lines[-1] += subfile_lines[0] 569 new_lines = subfile_lines[1:] 570 else: 571 new_lines = subfile_lines 572 lines.extend(new_lines) 573 574 if refresh_cursor: 575 self.refresh_cursor() 576 return lines
Return a list of lines of text.
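Because a single line may straddle a rotation boundary, readlines() stitches the last line of one subfile to the first line of the next. A reading sketch with hand-made subfiles (hypothetical paths):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    base = pathlib.Path('/tmp/read_demo.log')
    base.with_name('read_demo.log.0').write_text('alpha\nbeta (no newline')
    base.with_name('read_demo.log.1').write_text(', continued)\ngamma\n')

    rf = RotatingFile(base, num_files_to_keep=5, max_file_size=10_000)
    print(rf.readlines())
    # ['alpha\n', 'beta (no newline, continued)\n', 'gamma\n']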
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
583 def seek(self, position: int) -> None: 584 """ 585 Seek to the beginning of the logs stream. 586 """ 587 existing_subfile_indices = self.get_existing_subfile_indices() 588 min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0 589 max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0 590 if position == 0: 591 self._cursor = (min_ix, 0) 592 return 593 594 self._cursor = (max_ix, position) 595 if self._current_file_obj is not None: 596 self._current_file_obj.seek(position)
Seek within the logs stream. A position of 0 rewinds the cursor to the oldest subfile; any other position is applied to the latest subfile.
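Continuing the reading sketch above: reads advance an internal cursor, so seek(0) rewinds to the oldest subfile before reading everything again:

    rf.seek(0)        # cursor back to the start of the oldest subfile
    print(rf.read())  # 'alpha\nbeta (no newline, continued)\ngamma\n'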
599 def flush(self) -> None: 600 """ 601 Flush any open subfiles. 602 """ 603 for subfile_index, subfile_object in self.subfile_objects.items(): 604 if not subfile_object.closed: 605 try: 606 subfile_object.flush() 607 except Exception: 608 warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}") 609 610 if self.redirect_streams: 611 try: 612 sys.stdout.flush() 613 except BrokenPipeError: 614 pass 615 except Exception: 616 warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}") 617 try: 618 sys.stderr.flush() 619 except BrokenPipeError: 620 pass 621 except Exception: 622 warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
Flush any open subfiles.
625 def start_log_fd_interception(self): 626 """ 627 Start the file descriptor monitoring threads. 628 """ 629 if not self.write_timestamps: 630 return 631 632 self._stdout_interceptor = FileDescriptorInterceptor( 633 sys.stdout.fileno(), 634 self.get_timestamp_prefix_str, 635 ) 636 self._stderr_interceptor = FileDescriptorInterceptor( 637 sys.stderr.fileno(), 638 self.get_timestamp_prefix_str, 639 ) 640 641 self._stdout_interceptor_thread = Thread( 642 target = self._stdout_interceptor.start_interception, 643 daemon = True, 644 ) 645 self._stderr_interceptor_thread = Thread( 646 target = self._stderr_interceptor.start_interception, 647 daemon = True, 648 ) 649 self._stdout_interceptor_thread.start() 650 self._stderr_interceptor_thread.start() 651 self._intercepting = True 652 653 if '_interceptor_threads' not in self.__dict__: 654 self._interceptor_threads = [] 655 if '_interceptors' not in self.__dict__: 656 self._interceptors = [] 657 self._interceptor_threads.extend([ 658 self._stdout_interceptor_thread, 659 self._stderr_interceptor_thread, 660 ]) 661 self._interceptors.extend([ 662 self._stdout_interceptor, 663 self._stderr_interceptor, 664 ]) 665 self.stop_log_fd_interception(unused_only=True)
Start the file descriptor monitoring threads.
668 def stop_log_fd_interception(self, unused_only: bool = False): 669 """ 670 Stop the file descriptor monitoring threads. 671 """ 672 if not self.write_timestamps: 673 return 674 675 interceptors = self.__dict__.get('_interceptors', []) 676 interceptor_threads = self.__dict__.get('_interceptor_threads', []) 677 678 end_ix = len(interceptors) if not unused_only else -2 679 680 for interceptor in interceptors[:end_ix]: 681 interceptor.stop_interception() 682 del interceptors[:end_ix] 683 684 for thread in interceptor_threads[:end_ix]: 685 try: 686 thread.join() 687 except Exception: 688 warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}") 689 del interceptor_threads[:end_ix]
Stop the file descriptor monitoring threads.
23 class FileDescriptorInterceptor: 24 """ 25 A management class to intercept data written to a file descriptor. 26 """ 27 def __init__( 28 self, 29 file_descriptor: int, 30 injection_hook: Callable[[], str], 31 ): 32 """ 33 Parameters 34 ---------- 35 file_descriptor: int 36 The OS file descriptor from which to read. 37 38 injection_hook: Callable[[], str] 39 A callable which returns a string to be injected into the written data. 40 """ 41 self.stop_event = Event() 42 self.injection_hook = injection_hook 43 self.original_file_descriptor = file_descriptor 44 self.new_file_descriptor = os.dup(file_descriptor) 45 self.read_pipe, self.write_pipe = os.pipe() 46 self.signal_read_pipe, self.signal_write_pipe = os.pipe() 47 os.dup2(self.write_pipe, file_descriptor) 48 49 def start_interception(self): 50 """ 51 Read from the file descriptor and write the modified data after injection. 52 53 NOTE: This is blocking and is meant to be run in a thread. 54 """ 55 os.set_blocking(self.read_pipe, False) 56 os.set_blocking(self.signal_read_pipe, False) 57 is_first_read = True 58 while not self.stop_event.is_set(): 59 try: 60 rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1) 61 if self.signal_read_pipe in rlist: 62 break 63 if not rlist: 64 continue 65 data = os.read(self.read_pipe, 1024) 66 if not data: 67 break 68 except BlockingIOError: 69 continue 70 except OSError as e: 71 if e.errno == errno.EBADF: 72 ### File descriptor is closed. 73 pass 74 elif e.errno == errno.EINTR: 75 continue # Interrupted system call, just try again 76 else: 77 warn(f"OSError in FileDescriptorInterceptor: {e}") 78 break 79 80 try: 81 last_char_is_newline = data[-1:] == b'\n' 82 83 injected_str = self.injection_hook() 84 injected_bytes = injected_str.encode('utf-8') 85 86 if is_first_read: 87 data = b'\n' + data 88 is_first_read = False 89 90 modified_data = ( 91 (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n') 92 if last_char_is_newline 93 else data.replace(b'\n', b'\n' + injected_bytes) 94 ) 95 os.write(self.new_file_descriptor, modified_data) 96 except (BrokenPipeError, OSError): 97 break 98 except Exception: 99 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 100 f.write(traceback.format_exc()) 101 break 102 103 104 def stop_interception(self): 105 """ 106 Close the new file descriptors.
107 """ 108 self.stop_event.set() 109 os.write(self.signal_write_pipe, b'\0') 110 try: 111 os.close(self.new_file_descriptor) 112 except OSError as e: 113 if e.errno != FD_CLOSED: 114 warn( 115 "Error while trying to close the duplicated file descriptor:\n" 116 + f"{traceback.format_exc()}" 117 ) 118 119 try: 120 os.close(self.write_pipe) 121 except OSError as e: 122 if e.errno != FD_CLOSED: 123 warn( 124 "Error while trying to close the write-pipe " 125 + "to the intercepted file descriptor:\n" 126 + f"{traceback.format_exc()}" 127 ) 128 try: 129 os.close(self.read_pipe) 130 except OSError as e: 131 if e.errno != FD_CLOSED: 132 warn( 133 "Error while trying to close the read-pipe " 134 + "to the intercepted file descriptor:\n" 135 + f"{traceback.format_exc()}" 136 ) 137 138 try: 139 os.close(self.signal_read_pipe) 140 except OSError as e: 141 if e.errno != FD_CLOSED: 142 warn( 143 "Error while trying to close the signal-read-pipe " 144 + "to the intercepted file descriptor:\n" 145 + f"{traceback.format_exc()}" 146 ) 147 148 try: 149 os.close(self.signal_write_pipe) 150 except OSError as e: 151 if e.errno != FD_CLOSED: 152 warn( 153 "Error while trying to close the signal-write-pipe " 154 + "to the intercepted file descriptor:\n" 155 + f"{traceback.format_exc()}" 156 )
A management class to intercept data written to a file descriptor.
27 def __init__( 28 self, 29 file_descriptor: int, 30 injection_hook: Callable[[], str], 31 ): 32 """ 33 Parameters 34 ---------- 35 file_descriptor: int 36 The OS file descriptor from which to read. 37 38 injection_hook: Callable[[], str] 39 A callable which returns a string to be injected into the written data. 40 """ 41 self.stop_event = Event() 42 self.injection_hook = injection_hook 43 self.original_file_descriptor = file_descriptor 44 self.new_file_descriptor = os.dup(file_descriptor) 45 self.read_pipe, self.write_pipe = os.pipe() 46 self.signal_read_pipe, self.signal_write_pipe = os.pipe() 47 os.dup2(self.write_pipe, file_descriptor)
Parameters
- file_descriptor (int): The OS file descriptor from which to read.
- injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
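The constructor uses the classic interception pattern: duplicate the real descriptor to keep a handle on it, then point the original descriptor at a pipe whose read end a monitoring thread can drain. A standalone sketch of the same mechanism using only the standard library (not meerschaum code):

    import os
    import sys

    target_fd = sys.stdout.fileno()
    saved_fd = os.dup(target_fd)           # keep a handle on the real stdout
    read_pipe, write_pipe = os.pipe()
    os.dup2(write_pipe, target_fd)         # writes to fd 1 now land in the pipe

    os.write(target_fd, b'intercepted\n')  # goes into the pipe, not the terminal
    data = os.read(read_pipe, 1024)
    os.write(saved_fd, b'[prefix] ' + data)  # forward the modified bytes onward

    os.dup2(saved_fd, target_fd)           # restore stdout
    for fd in (saved_fd, read_pipe, write_pipe):
        os.close(fd)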
49 def start_interception(self): 50 """ 51 Read from the file descriptor and write the modified data after injection. 52 53 NOTE: This is blocking and is meant to be run in a thread. 54 """ 55 os.set_blocking(self.read_pipe, False) 56 os.set_blocking(self.signal_read_pipe, False) 57 is_first_read = True 58 while not self.stop_event.is_set(): 59 try: 60 rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1) 61 if self.signal_read_pipe in rlist: 62 break 63 if not rlist: 64 continue 65 data = os.read(self.read_pipe, 1024) 66 if not data: 67 break 68 except BlockingIOError: 69 continue 70 except OSError as e: 71 if e.errno == errno.EBADF: 72 ### File descriptor is closed. 73 pass 74 elif e.errno == errno.EINTR: 75 continue # Interrupted system call, just try again 76 else: 77 warn(f"OSError in FileDescriptorInterceptor: {e}") 78 break 79 80 try: 81 last_char_is_newline = data[-1:] == b'\n' 82 83 injected_str = self.injection_hook() 84 injected_bytes = injected_str.encode('utf-8') 85 86 if is_first_read: 87 data = b'\n' + data 88 is_first_read = False 89 90 modified_data = ( 91 (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n') 92 if last_char_is_newline 93 else data.replace(b'\n', b'\n' + injected_bytes) 94 ) 95 os.write(self.new_file_descriptor, modified_data) 96 except (BrokenPipeError, OSError): 97 break 98 except Exception: 99 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 100 f.write(traceback.format_exc()) 101 break
Read from the file descriptor and write the modified data after injection.
NOTE: This is blocking and is meant to be run in a thread.
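A hedged usage sketch mirroring how RotatingFile drives this class: run the blocking loop in a daemon thread and inject a fixed prefix (here the hypothetical '[demo] ') after each newline:

    import sys
    import time
    import threading
    from meerschaum.utils.daemon import FileDescriptorInterceptor

    interceptor = FileDescriptorInterceptor(sys.stdout.fileno(), lambda: '[demo] ')
    thread = threading.Thread(target=interceptor.start_interception, daemon=True)
    thread.start()

    print('hello')
    print('world')
    sys.stdout.flush()  # push the lines into the intercepted pipe
    time.sleep(0.2)     # give the reader thread a moment to drain it

    interceptor.stop_interception()  # signal the loop and close the pipes
    thread.join()
    # NOTE: fd 1 is left pointing at the (now closed) pipe; RotatingFile only
    # uses this inside daemon contexts where streams are already redirected.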
104 def stop_interception(self): 105 """ 106 Close the new file descriptors. 107 """ 108 self.stop_event.set() 109 os.write(self.signal_write_pipe, b'\0') 110 try: 111 os.close(self.new_file_descriptor) 112 except OSError as e: 113 if e.errno != FD_CLOSED: 114 warn( 115 "Error while trying to close the duplicated file descriptor:\n" 116 + f"{traceback.format_exc()}" 117 ) 118 119 try: 120 os.close(self.write_pipe) 121 except OSError as e: 122 if e.errno != FD_CLOSED: 123 warn( 124 "Error while trying to close the write-pipe " 125 + "to the intercepted file descriptor:\n" 126 + f"{traceback.format_exc()}" 127 ) 128 try: 129 os.close(self.read_pipe) 130 except OSError as e: 131 if e.errno != FD_CLOSED: 132 warn( 133 "Error while trying to close the read-pipe " 134 + "to the intercepted file descriptor:\n" 135 + f"{traceback.format_exc()}" 136 ) 137 138 try: 139 os.close(self.signal_read_pipe) 140 except OSError as e: 141 if e.errno != FD_CLOSED: 142 warn( 143 "Error while trying to close the signal-read-pipe " 144 + "to the intercepted file descriptor:\n" 145 + f"{traceback.format_exc()}" 146 ) 147 148 try: 149 os.close(self.signal_write_pipe) 150 except OSError as e: 151 if e.errno != FD_CLOSED: 152 warn( 153 "Error while trying to close the signal-write-pipe " 154 + "to the intercepted file descriptor:\n" 155 + f"{traceback.format_exc()}" 156 )
Close the new file descriptors.